JSON Lines Streaming

Overview

Vovk.ts includes first‑class support for the JSON Lines format, a convenient way to implement “one request, many responses.” A JSON Lines procedure is another output type: its output is validated with the iteration field, and it responds with the application/jsonl content type when the client sends an Accept: application/jsonl header. If the Accept header doesn’t include application/jsonl, the output is returned as text/plain so that it is viewable when the endpoint URL is opened directly in a browser.
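The content negotiation described above can be sketched in a few lines. This is a minimal illustration only: jsonLinesContentType is a hypothetical helper, not part of the Vovk.ts API, and the exact matching rules Vovk.ts applies to the Accept header may differ.

```typescript
// Hypothetical helper (not a Vovk.ts export): picks the response content type
// from the request's Accept header.
function jsonLinesContentType(accept: string | null): string {
  // Split "a/b, c/d;q=0.9" into bare media types, dropping parameters such as q-values.
  const mediaTypes = (accept ?? '')
    .split(',')
    .map((part) => part.split(';')[0].trim().toLowerCase());

  return mediaTypes.includes('application/jsonl') ? 'application/jsonl' : 'text/plain';
}

console.log(jsonLinesContentType('application/jsonl')); // application/jsonl
console.log(jsonLinesContentType('text/html,application/xhtml+xml')); // text/plain
```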

The use cases for JSON Lines include:

  • Type-safe alternative to Server-Sent Events (SSE) for streaming data to clients.
  • Long‑running operations that produce multiple results over time, such as LLM completions or database polling.
  • Progressive data loading, where partial results are sent as they become available.
Important

Because the response size is not known in advance, JSON Lines responses cannot be compressed with Gzip, Brotli, or other algorithms. Keep this in mind for large responses.

Creating a JSON Lines Generator Procedure

To create a JSON Lines procedure, define a procedure as a generator or async generator function. Each yielded object is serialized to JSON and sent as a separate line in the response.

import { z } from 'zod';
import { procedure, prefix, post, type VovkIteration } from 'vovk';

@prefix('stream')
export default class StreamController {
  @post('completions')
  static getJSONLines = procedure({
    // ...
    iteration: z.object({
      message: z.string(),
    }),
  }).handle(async function* () {
    const tokens: VovkIteration<typeof StreamController.getJSONLines>[] = [
      { message: 'Hello,' },
      { message: ' World' },
      { message: ' from' },
      { message: ' Stream' },
      { message: '!' },
    ];

    for (const token of tokens) {
      await new Promise((resolve) => setTimeout(resolve, 300));
      yield token;
    }
  });
}
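To make the wire format concrete, the following framework-free sketch turns an async generator into JSON Lines text: one JSON document per line, each terminated by a newline. The names tokens, toJsonLines, and renderBody are illustrative assumptions; Vovk.ts performs the equivalent serialization internally and streams the lines instead of concatenating them.

```typescript
type Token = { message: string };

// Stand-in for a generator procedure body.
async function* tokens(): AsyncGenerator<Token> {
  yield { message: 'Hello,' };
  yield { message: ' World' };
}

// Serializes each yielded item to a single JSON line.
async function* toJsonLines<T>(items: AsyncIterable<T>): AsyncGenerator<string> {
  for await (const item of items) {
    yield JSON.stringify(item) + '\n';
  }
}

// Concatenates the lines only to show what the complete response body looks like.
async function renderBody(): Promise<string> {
  let body = '';
  for await (const line of toJsonLines(tokens())) {
    body += line;
  }
  return body;
}
```

Here renderBody() resolves to two lines, {"message":"Hello,"} and {"message":" World"}, each followed by a newline character.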

When used with a service, the iterable can be delegated using the yield* syntax:

src/modules/stream/StreamController.ts
import { z } from 'zod';
import { procedure, prefix, post } from 'vovk';
import StreamService from './StreamService';

@prefix('stream')
export default class StreamController {
  @post('completions')
  static getJSONLines = procedure({
    // ...
    iteration: z.object({
      message: z.string(),
    }),
    // The service returns an async iterable, so the delegating generator must be async too
  }).handle(async function* () {
    yield* StreamService.getJSONLines();
  });
}
src/modules/stream/StreamService.ts
import type { VovkIteration } from 'vovk';
// StreamController is a default export, so it is imported without braces
import type StreamController from './StreamController';

export default class StreamService {
  static async *getJSONLines() {
    const tokens: VovkIteration<typeof StreamController.getJSONLines>[] = [
      { message: 'Hello,' },
      { message: ' World' },
      { message: ' from' },
      { message: ' Stream' },
      { message: '!' },
    ];

    for (const token of tokens) {
      await new Promise((resolve) => setTimeout(resolve, 300));
      yield token;
    }
  }
}

On the client side, the JSON Lines output can be consumed with disposable async iterators to process each line as it arrives:

import { StreamRPC } from 'vovk-client';

using stream = await StreamRPC.getJSONLines();

for await (const { message } of stream) {
  console.log('Received message:', message);
}
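For clients that cannot use the generated RPC module, a JSON Lines body can also be parsed by hand. The sketch below is an assumption-laden illustration, not the vovk-client implementation: parseJsonLines is a hypothetical helper that accepts any async iterable of byte chunks and buffers partial lines, because a chunk boundary can fall in the middle of a JSON document.

```typescript
// Hypothetical helper (not part of vovk-client): parses JSON Lines from byte chunks.
async function* parseJsonLines<T>(chunks: AsyncIterable<Uint8Array>): AsyncGenerator<T> {
  const decoder = new TextDecoder();
  let buffer = '';

  for await (const chunk of chunks) {
    // stream: true keeps multi-byte characters that are split across chunks intact.
    buffer += decoder.decode(chunk, { stream: true });

    let newline = buffer.indexOf('\n');
    while (newline !== -1) {
      const line = buffer.slice(0, newline).trim();
      buffer = buffer.slice(newline + 1);
      if (line) yield JSON.parse(line) as T;
      newline = buffer.indexOf('\n');
    }
  }

  // Flush the decoder and handle a final line that has no trailing newline.
  buffer += decoder.decode();
  const rest = buffer.trim();
  if (rest) yield JSON.parse(rest) as T;
}
```

In runtimes where the response body is itself async-iterable (for example, recent Node.js versions), the body of a fetch response can be passed to this function directly; elsewhere it would need to be read through a stream reader first.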

The iterable (represented as stream above), besides Symbol.asyncIterator, Symbol.dispose, and Symbol.asyncDispose, also provides:

  • status: The HTTP response status (e.g., 200 for OK, 404 for Not Found).
  • asPromise: A promise that resolves with an array of all emitted values when the stream completes.
  • onIterate: Registers a callback for each iteration.
  • abortController: An AbortController instance that can be used to abort the stream. When the stream is closed with abortController.abort(), an AbortError is thrown on the stream reader; it can be caught on the client side and inspected via the error’s cause property.
  • abortSilently: A method to abort the stream without throwing an error on the stream reader. This is useful when you want to stop processing the stream gracefully.

The using statement ensures the stream is aborted with stream.abortSilently('Stream disposed') when it goes out of scope.

console.log('Response status:', stream.status);

stream.onIterate((item) => {
  console.log('Iterated item:', item);
});

if (someCondition) {
  stream.abortSilently();
}

console.log('All messages:', await stream.asPromise());
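The relationship between onIterate and asPromise can be pictured with a small stand-alone model. This is only a sketch of the semantics described above, under the assumption that callbacks fire once per item and the promise resolves with every item in order; observe is a hypothetical helper, not the way vovk-client is implemented.

```typescript
// Hypothetical model of onIterate/asPromise layered over any async iterable.
function observe<T>(source: AsyncIterable<T>) {
  const callbacks: Array<(item: T) => void> = [];
  let collected: Promise<T[]> | undefined;

  return {
    // Registers a callback that runs for each item as it arrives.
    onIterate(callback: (item: T) => void): void {
      callbacks.push(callback);
    },
    // Drains the source once and resolves with all items; repeated calls share the result.
    asPromise(): Promise<T[]> {
      if (!collected) {
        collected = (async () => {
          const items: T[] = [];
          for await (const item of source) {
            callbacks.forEach((callback) => callback(item));
            items.push(item);
          }
          return items;
        })();
      }
      return collected;
    },
  };
}
```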

OpenAI Chat Example

Create a procedure that delegates iterable output from OpenAI’s streaming chat completions:

src/modules/llm/LlmController.ts
import { post, prefix, operation, type VovkRequest } from 'vovk';
import OpenAI from 'openai';

@prefix('openai')
export default class OpenAiController {
  @operation({
    summary: 'Create a chat completion',
  })
  @post('chat')
  static async *createChatCompletion(
    req: VovkRequest<{ messages: OpenAI.Chat.Completions.ChatCompletionMessageParam[] }>
  ) {
    const { messages } = await req.json();
    const openai = new OpenAI();

    yield* await openai.chat.completions.create({
      messages,
      model: 'gpt-5-nano',
      stream: true,
    });
  }
}

On the client side, consume the streamed completion as follows:

// ...
using completion = await OpenAiRPC.createChatCompletion({
  body: { messages: [...messages, userMessage] },
});

for await (const part of completion) {
  // ...
}
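Inside the loop, each part is a streamed chat completion chunk, and the assistant’s text is typically assembled from the content deltas. The sketch below shows one way to do that. ChunkLike is a local stand-in that models only the choices[].delta.content field of OpenAI’s chunk type, and collectText is a hypothetical helper; neither comes from Vovk.ts or the openai package.

```typescript
// Local stand-in for the part of the streamed chunk shape used here.
type ChunkLike = {
  choices: Array<{ delta: { content?: string | null } }>;
};

// Concatenates the content deltas of all chunks into the final message text.
async function collectText(parts: AsyncIterable<ChunkLike>): Promise<string> {
  let text = '';
  for await (const part of parts) {
    // Some chunks carry no content (for example, role-only or final chunks).
    text += part.choices[0]?.delta?.content ?? '';
  }
  return text;
}
```

In a UI, the same loop would usually update state after each delta instead of waiting for the complete text.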

View full example on examples.vovk.dev » 

JSONLinesResponder Class

The JSONLinesResponder class is a lower-level API that powers the generator-based approach described above. It gives more control over the streaming logic by letting you send messages manually. Internally, it constructs a ReadableStream that is used as the response body.

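// Default Response with default headers
const responder = new JSONLinesResponder<IterationType>(req);

// Or, with a custom Response built by the getResponse callback
const responder = new JSONLinesResponder<IterationType>(
  req,
  ({ readableStream, headers }) => new Response(readableStream, { headers })
);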

The constructor accepts two optional parameters:

  • request?: Request | null – The incoming request object. If provided, the responder checks for the Accept: application/jsonl header and, when it is present, creates a headers record with Content-Type: application/jsonl. If the request is not provided or the header is missing, the content type defaults to text/plain.
  • getResponse?: (responder: JSONLinesResponder<T>) => Response – A function that constructs a custom Response object, for example to set custom headers or other response options. If not provided, the Response is created internally with default headers.

The responder instance provides the following members:

  • send(item: T): Promise<void> – Sends a JSON line to the client. The item is validated (if iteration is present; by default only the first item is validated unless validateEachIteration: true is set), serialized to JSON and followed by a newline character.
  • close(): void – Closes the response stream, indicating that no more data will be sent.
  • throw(err: Error): void – Sends an error message to the client and closes the stream.
  • response: Response – The underlying Response object that will be returned from the Next.js route handler.
  • headers: Record<string, string> – The headers record for the response, containing the Content-Type.
  • readableStream: ReadableStream<Uint8Array> – The readable stream used as the response body.
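To illustrate how such a responder can sit on top of a ReadableStream, here is a deliberately simplified model. MiniJsonLinesResponder is a hypothetical class written for this page, not Vovk’s implementation: it omits validation, content negotiation, and the throw method, whose wire format is not described here.

```typescript
// Simplified, hypothetical model of a JSON Lines responder.
class MiniJsonLinesResponder<T> {
  readonly readableStream: ReadableStream<Uint8Array>;
  readonly headers: Record<string, string> = { 'content-type': 'application/jsonl' };
  private controller!: ReadableStreamDefaultController<Uint8Array>;
  private readonly encoder = new TextEncoder();

  constructor() {
    this.readableStream = new ReadableStream<Uint8Array>({
      // start() runs synchronously during construction, so the controller
      // is available as soon as the constructor returns.
      start: (controller) => {
        this.controller = controller;
      },
    });
  }

  // Serializes the item as one JSON line and queues it on the stream.
  send(item: T): void {
    this.controller.enqueue(this.encoder.encode(JSON.stringify(item) + '\n'));
  }

  // Signals that no more lines will be sent.
  close(): void {
    this.controller.close();
  }

  // Wraps the stream in a Response, similar in spirit to the response member above.
  toResponse(): Response {
    return new Response(this.readableStream, { headers: this.headers });
  }
}

const miniResponder = new MiniJsonLinesResponder<{ message: string }>();
miniResponder.send({ message: 'Hello,' });
miniResponder.send({ message: ' World' });
miniResponder.close();
```

Reading miniResponder.toResponse() as text yields the two queued JSON lines in order.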

With JSONLinesResponder, a service method is implemented as a regular function (not a generator) that accepts a JSONLinesResponder instance and uses it to send messages manually.

src/modules/stream/StreamService.ts
import type { JSONLinesResponder, VovkIteration } from 'vovk';
import type StreamController from './StreamController';

export type Token = VovkIteration<typeof StreamController.streamTokens>;

export default class StreamService {
  static async streamTokens(responder: JSONLinesResponder<Token>) {
    const tokens: Token[] = [{ message: 'Hello,' }, { message: ' World' }, { message: '!' }];

    for (const token of tokens) {
      await new Promise((resolve) => setTimeout(resolve, 300));
      await responder.send(token);
    }

    // No more data will be sent
    responder.close();
  }
}

Tokens are sent with JSONLinesResponder#send, and when the stream completes, close it with JSONLinesResponder#close.

The controller method returns the JSONLinesResponder instance, while the streaming itself runs in a floating Promise that is started before the return statement.

import { prefix, get, JSONLinesResponder } from 'vovk';
import StreamService, { type Token } from './StreamService';

@prefix('stream')
export default class StreamController {
  @get('tokens')
  static async streamTokens(req: Request) {
    const responder = new JSONLinesResponder<Token>(req);

    // Not awaited on purpose: streaming continues after the responder is returned
    void StreamService.streamTokens(responder);

    return responder;
  }
}
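One consequence of the floating Promise is that a rejection inside the service is not observed by the controller. A possible way to handle this, sketched here under assumptions, is to forward failures to the responder’s throw method. ResponderLike is a structural stand-in for the send, close, and throw members listed above, and streamInBackground is a hypothetical helper, not a Vovk.ts export.

```typescript
// Structural stand-in for the responder members used by this sketch.
interface ResponderLike<T> {
  send(item: T): Promise<void> | void;
  close(): void;
  throw(err: Error): void;
}

// Starts the producer without awaiting it and forwards any failure to the client.
function streamInBackground<T>(
  responder: ResponderLike<T>,
  produce: (responder: ResponderLike<T>) => Promise<void>,
): void {
  // Deliberately not awaited: the caller returns the responder immediately.
  void produce(responder).catch((err: unknown) => {
    responder.throw(err instanceof Error ? err : new Error(String(err)));
  });
}
```

With a helper like this, the controller would call streamInBackground(responder, StreamService.streamTokens) in place of the bare void expression and then return the responder as before.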

The JSONLinesResponder class also provides a throw method that safely closes the stream and causes the client to rethrow the received error.

await responder.throw(new Error('Stream error'));