JSON Lines streaming

Vovk.ts includes first-class support for the JSON Lines format, which is a convenient way to implement "one request, many responses". It's well suited to LLM completions, and it also opens the door to experiments such as progressive responses and polling. JSON Lines is another kind of output: it uses the iteration validation field and is served with the application/jsonl content type when the client sends an Accept: application/jsonl header. If the Accept header doesn't include application/jsonl, the output is returned as text/plain so that it stays readable when the endpoint URL is opened directly in the browser.
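The content negotiation rule above can be sketched in a few lines. This is only an illustration of the described behaviour, not the Vovk.ts implementation: pickContentType and toJSONLine are hypothetical helper names, and the sketch assumes that a simple substring check on the Accept header is enough.

```typescript
// Hypothetical helper (not part of Vovk.ts): choose the response content type
// from the request's Accept header, following the rule described above.
function pickContentType(accept: string | null): 'application/jsonl' | 'text/plain' {
  return accept !== null && accept.includes('application/jsonl')
    ? 'application/jsonl'
    : 'text/plain';
}

// Hypothetical helper: JSON Lines is one JSON value per line, each ended by "\n".
function toJSONLine(value: unknown): string {
  return JSON.stringify(value) + '\n';
}

// Two iterations serialized the way they would appear on the wire.
const body = [{ message: 'Hello,' }, { message: ' World' }].map(toJSONLine).join('');
```

Either way the body is the same sequence of lines; only the declared content type differs, which is why the browser can still display it as plain text.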

import { z } from 'zod';
import { prefix, post, type VovkIteration } from 'vovk';
import { withZod } from 'vovk-zod';

@prefix('stream')
export default class StreamController {
  @post('completions')
  static getJSONLines = withZod({
    // ...
    iteration: z.object({
      message: z.string(),
    }),
    // Generator method (note the asterisk): each yielded value becomes one JSON line
    async *handle() {
      const tokens: VovkIteration<typeof StreamController.getJSONLines>[] = [
        { message: 'Hello,' },
        { message: ' World' },
        { message: ' from' },
        { message: ' Stream' },
        { message: '!' },
      ];

      for (const token of tokens) {
        // Simulate a delay between tokens
        await new Promise((resolve) => setTimeout(resolve, 300));
        yield token;
      }
    },
  });
}

When used with a service class, the iterable can be delegated with the yield* syntax:

src/modules/stream/StreamController.ts
import { z } from 'zod';
import { prefix, post } from 'vovk';
import { withZod } from 'vovk-zod';
import StreamService from './StreamService';

@prefix('stream')
export default class StreamController {
  @post('completions')
  static getJSONLines = withZod({
    // ...
    iteration: z.object({
      message: z.string(),
    }),
    async *handle() {
      // Forward every value produced by the service generator
      yield* StreamService.getJSONLines();
    },
  });
}
src/modules/stream/StreamService.ts
import type { VovkIteration } from 'vovk';
// StreamController is a default export, so it is imported without braces
import type StreamController from './StreamController';

export default class StreamService {
  static async *getJSONLines() {
    const tokens: VovkIteration<typeof StreamController.getJSONLines>[] = [
      { message: 'Hello,' },
      { message: ' World' },
      { message: ' from' },
      { message: ' Stream' },
      { message: '!' },
    ];

    for (const token of tokens) {
      // Simulate a delay between tokens
      await new Promise((resolve) => setTimeout(resolve, 300));
      yield token;
    }
  }
}
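To see the delegation in isolation, here is a minimal sketch in plain TypeScript with no Vovk.ts imports. The names produceTokens, handle and collectMessages are hypothetical stand-ins for the service method, the controller handler and a consumer.

```typescript
type Token = { message: string };

// Hypothetical stand-in for the service method: an async generator of tokens.
async function* produceTokens(): AsyncGenerator<Token> {
  for (const message of ['Hello,', ' World', '!']) {
    await new Promise((resolve) => setTimeout(resolve, 10));
    yield { message };
  }
}

// Hypothetical stand-in for the controller handler: yield* forwards each value
// of the inner generator to the caller as soon as it is produced.
async function* handle(): AsyncGenerator<Token> {
  yield* produceTokens();
}

// Consume the outer generator and gather the messages in arrival order.
async function collectMessages(): Promise<string[]> {
  const messages: string[] = [];
  for await (const { message } of handle()) messages.push(message);
  return messages;
}
```

The outer generator adds no buffering of its own, so the consumer receives each token at the moment the inner generator yields it.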

On the client side, the JSON Lines output can be consumed using disposable async iterators, which let you process each line as it arrives:

import { StreamRPC } from 'vovk-client';

using stream = await StreamRPC.getJSONLines();

for await (const { message } of stream) {
  console.log('Received message:', message);
}
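For readers curious about what consuming such a response involves, the following sketch parses a JSON Lines byte stream by hand using only standard web APIs. It is an illustration under assumptions, not the generated client's code: parseJSONLines, streamFromChunks and collectMessages are hypothetical names, and the sketch assumes UTF-8 text with "\n" as the line separator.

```typescript
// Hypothetical helper: turn a byte stream of JSON Lines into parsed values,
// yielding each value as soon as its full line has arrived.
async function* parseJSONLines<T>(stream: ReadableStream<Uint8Array>): AsyncGenerator<T> {
  const reader = stream.getReader();
  const decoder = new TextDecoder();
  let buffer = '';
  try {
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;
      // A network chunk may end in the middle of a line, so keep a buffer
      buffer += decoder.decode(value, { stream: true });
      let newline = buffer.indexOf('\n');
      while (newline !== -1) {
        const line = buffer.slice(0, newline).trim();
        buffer = buffer.slice(newline + 1);
        if (line) yield JSON.parse(line) as T;
        newline = buffer.indexOf('\n');
      }
    }
    // Handle a final line that has no trailing newline
    const rest = (buffer + decoder.decode()).trim();
    if (rest) yield JSON.parse(rest) as T;
  } finally {
    reader.releaseLock();
  }
}

// Hypothetical test helper: build a byte stream from arbitrary text chunks.
function streamFromChunks(chunks: string[]): ReadableStream<Uint8Array> {
  const encoder = new TextEncoder();
  return new ReadableStream<Uint8Array>({
    start(controller) {
      for (const chunk of chunks) controller.enqueue(encoder.encode(chunk));
      controller.close();
    },
  });
}

async function collectMessages(chunks: string[]): Promise<string[]> {
  const messages: string[] = [];
  for await (const { message } of parseJSONLines<{ message: string }>(streamFromChunks(chunks))) {
    messages.push(message);
  }
  return messages;
}
```

In an application the stream would typically come from the body of a fetch response requested with the Accept: application/jsonl header.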

JSONLinesResponse class

If generators aren't suitable for JSON streaming in a particular case, you can use the JSONLinesResponse class. It inherits from the Response class and uses TransformStream#readable as the response body.

It's a lower-level API that is used behind the scenes to implement the generator logic described above.
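The idea of a Response subclass backed by a TransformStream can be sketched as follows. This is a simplified illustration, not the actual JSONLinesResponse source: the class name SketchJSONLinesResponse, the demo function and the fixed application/jsonl header are assumptions made for the example.

```typescript
// Hypothetical sketch: a Response whose body is the readable side of a
// TransformStream, with send/close writing to the writable side.
class SketchJSONLinesResponse<T> extends Response {
  private readonly writer: WritableStreamDefaultWriter<Uint8Array>;
  private readonly encoder: TextEncoder;

  constructor() {
    const { readable, writable } = new TransformStream<Uint8Array, Uint8Array>();
    super(readable, { headers: { 'content-type': 'application/jsonl' } });
    this.writer = writable.getWriter();
    this.encoder = new TextEncoder();
  }

  // Serialize one item as a single JSON line
  send(item: T): Promise<void> {
    return this.writer.write(this.encoder.encode(JSON.stringify(item) + '\n'));
  }

  // Signal the end of the stream
  close(): Promise<void> {
    return this.writer.close();
  }
}

async function demo(): Promise<{ contentType: string | null; text: string }> {
  const resp = new SketchJSONLinesResponse<{ message: string }>();
  // The producer runs concurrently with the reader: a write only settles
  // once the reading side is ready to take the chunk.
  const producing = (async () => {
    await resp.send({ message: 'Hello,' });
    await resp.send({ message: ' World' });
    await resp.close();
  })();
  const text = await resp.text();
  await producing;
  return { contentType: resp.headers.get('content-type'), text };
}
```

Because the response object exists before any data is written, it can be handed to the framework immediately while a separate task keeps writing lines to it.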

In this case, a service method is implemented as a regular function that accepts a JSONLinesResponse instance as a handle for sending messages manually.

Here is what the streaming service might look like:

src/modules/stream/StreamService.ts
import type { JSONLinesResponse } from 'vovk';

export type Token = { message: string };

export default class StreamService {
  static async streamTokens(resp: JSONLinesResponse<Token>) {
    const tokens: Token[] = [{ message: 'Hello,' }, { message: ' World' }, { message: '!' }];

    for (const token of tokens) {
      // Simulate a delay between tokens
      await new Promise((resolve) => setTimeout(resolve, 300));
      resp.send(token);
    }

    // Always close the stream once all tokens are sent
    resp.close();
  }
}

As you can see, tokens are sent with the JSONLinesResponse#send method, and once the stream is complete it must be closed with JSONLinesResponse#close.

The controller method returns an instance of JSONLinesResponse, and the streaming is performed in a floating Promise started above the return statement.

import { prefix, get, JSONLinesResponse } from 'vovk';
import StreamService, { type Token } from './StreamService';

@prefix('stream')
export default class StreamController {
  @get('tokens')
  static async streamTokens() {
    const resp = new JSONLinesResponse<Token>();

    // Floating Promise: start streaming without awaiting it
    void StreamService.streamTokens(resp);

    return resp;
  }
}

The JSONLinesResponse class also provides a throw method that safely closes the stream and makes the client re-throw the received error.

await resp.throw(new Error('Stream error'));
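One way such in-band error propagation can work over a line-delimited stream is sketched below. The wire convention used here, a final line shaped like { isError: true, reason }, is purely an assumption for illustration and is not confirmed to be what Vovk.ts sends; serverLines, rethrowErrors and consume are hypothetical names.

```typescript
type Token = { message: string };
// Assumed error marker for this sketch only, not a documented Vovk.ts format
type ErrorLine = { isError: true; reason: string };

function isErrorLine(line: unknown): line is ErrorLine {
  return typeof line === 'object' && line !== null && (line as { isError?: unknown }).isError === true;
}

// Hypothetical server side: one regular line, then an error line that ends the stream.
async function* serverLines(): AsyncGenerator<Token | ErrorLine> {
  yield { message: 'Hello,' };
  yield { isError: true, reason: 'Stream error' };
}

// Hypothetical client side: pass regular lines through, re-throw error lines.
async function* rethrowErrors<T>(lines: AsyncIterable<T | ErrorLine>): AsyncGenerator<T> {
  for await (const line of lines) {
    if (isErrorLine(line)) throw new Error(line.reason);
    yield line as T;
  }
}

async function consume(): Promise<{ received: string[]; error: string | null }> {
  const received: string[] = [];
  try {
    for await (const { message } of rethrowErrors<Token>(serverLines())) received.push(message);
  } catch (e) {
    return { received, error: e instanceof Error ? e.message : String(e) };
  }
  return { received, error: null };
}
```

From the consumer's point of view, the failure surfaces as an ordinary exception inside the for await loop, after all lines sent before the error have been delivered.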