JSON Lines Streaming
Vovk.ts includes first‑class support for the JSON Lines format, a convenient way to implement “one request, many responses.” It is well suited to LLM completions and opens the door to experiments such as progressive responses and polling. JSON Lines is another output type: it uses the iteration validation field and is served with the application/jsonl content type when the client sends an Accept: application/jsonl header. If the Accept header doesn’t include application/jsonl, the output is returned as text/plain so that it is viewable when the endpoint URL is opened directly in a browser.
Because the response size is not known in advance, JSON Lines responses cannot be compressed with Gzip, Brotli, or other algorithms. Keep this in mind for large responses.
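The content-type rule described above can be pictured with a small sketch. Both helpers are hypothetical and are not part of Vovk.ts; the substring check is a simplifying assumption, and the body format follows the general JSON Lines convention of one JSON document per newline-terminated line.

```typescript
// Hypothetical helper, not Vovk.ts code: picks the response content type
// from the request's Accept header, following the rule described above.
function pickContentType(accept: string | null): 'application/jsonl' | 'text/plain' {
  // Naive substring check; a real server might parse media ranges and q-values.
  return accept?.includes('application/jsonl') ? 'application/jsonl' : 'text/plain';
}

// Hypothetical helper: serializes values as JSON Lines,
// one JSON document per line, each terminated by a newline.
function toJSONLines(items: unknown[]): string {
  return items.map((item) => JSON.stringify(item) + '\n').join('');
}
```

In Vovk.ts this negotiation is handled by the framework; the controller only yields values.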
import { z } from 'zod';
import { prefix, post, type VovkIteration } from 'vovk';
import { withZod } from 'vovk-zod';

@prefix('stream')
export default class StreamController {
  @post('completions')
  static getJSONLines = withZod({
    // ...
    iteration: z.object({
      message: z.string(),
    }),
    // The handler must be an async generator (note the asterisk) to use yield.
    async *handle() {
      const tokens: VovkIteration<typeof StreamController.getJSONLines>[] = [
        { message: 'Hello,' },
        { message: ' World' },
        { message: ' from' },
        { message: ' Stream' },
        { message: '!' },
      ];
      for (const token of tokens) {
        // Simulate latency between tokens.
        await new Promise((resolve) => setTimeout(resolve, 300));
        yield token;
      }
    },
  });
}
When used with a service, the iterable can be delegated using the yield* syntax:
import { z } from 'zod';
import { prefix, post } from 'vovk';
import { withZod } from 'vovk-zod';
import StreamService from './StreamService';

@prefix('stream')
export default class StreamController {
  @post('completions')
  static getJSONLines = withZod({
    // ...
    iteration: z.object({
      message: z.string(),
    }),
    async *handle() {
      // Delegate iteration to the service's async generator.
      yield* StreamService.getJSONLines();
    },
  });
}

import type { VovkIteration } from 'vovk';
import type StreamController from './StreamController';

export default class StreamService {
  static async *getJSONLines() {
    const tokens: VovkIteration<typeof StreamController.getJSONLines>[] = [
      { message: 'Hello,' },
      { message: ' World' },
      { message: ' from' },
      { message: ' Stream' },
      { message: '!' },
    ];
    for (const token of tokens) {
      await new Promise((resolve) => setTimeout(resolve, 300));
      yield token;
    }
  }
}
On the client side, the JSON Lines output can be consumed using disposable async iterators, allowing you to process each line as it arrives:
import { StreamRPC } from 'vovk-client';
using stream = await StreamRPC.getJSONLines();
for await (const { message } of stream) {
  console.log('Received message:', message);
}
The iterable (represented as stream above), besides Symbol.asyncIterator, Symbol.dispose, and Symbol.asyncDispose, also provides:
- status: The HTTP response status (e.g., 200 for OK, 404 for Not Found).
- asPromise: A method that returns a promise resolving with an array of all emitted values when the stream completes.
- onIterate: Registers a callback for each iteration.
- abortController: An AbortController instance to abort the stream. When the stream is closed with abortController.abort(), it throws an AbortError.
The using statement ensures the stream is aborted with stream.abortController.abort('Stream disposed') when it goes out of scope.
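Where the using declaration is not available, roughly the same guarantee can be obtained with try/finally. The sketch below assumes only what this section states: the stream is async-iterable and exposes an abortController. AbortableStream and consume are hypothetical names, not part of vovk-client, and the real client type may differ.

```typescript
// Hypothetical stand-in for the object returned by an RPC call; only the
// members used here are modeled.
interface AbortableStream<T> extends AsyncIterable<T> {
  abortController: AbortController;
}

// Iterates the stream and always aborts it afterwards, whether the loop
// finished normally or exited early because of an error.
async function consume<T>(stream: AbortableStream<T>, onItem: (item: T) => void): Promise<void> {
  try {
    for await (const item of stream) {
      onItem(item);
    }
  } finally {
    // Roughly what disposal does when a `using` binding leaves scope.
    stream.abortController.abort('Stream disposed');
  }
}
```

A call such as consume(stream, (item) => console.log(item)) would then play the role of the using block shown earlier.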
console.log('Response status:', stream.status);
stream.onIterate((item) => {
  console.log('Iterated item:', item);
});
if (someCondition) {
  stream.abortController.abort();
}
console.log('All messages:', await stream.asPromise());

JSONLinesResponse Class
If generators aren’t suitable for JSON streaming in a particular case, use the JSONLinesResponse class, which inherits from Response and uses TransformStream#readable as the response body. This lower‑level API is used internally to implement the generator logic described above.
In this case, a service method is implemented as a regular function that accepts a JSONLinesResponse instance and uses it to send messages manually.
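To make the idea of a Response backed by TransformStream#readable more concrete, here is a minimal sketch. SketchJSONLinesResponse is a hypothetical class written for illustration; it is not Vovk's implementation, and its constructor and method signatures are assumptions.

```typescript
// Hypothetical sketch, not Vovk's source: a Response whose body is the
// readable side of a TransformStream, fed manually through the writable side.
class SketchJSONLinesResponse<T> extends Response {
  private writer: WritableStreamDefaultWriter<Uint8Array>;
  private encoder: TextEncoder;

  constructor() {
    // An identity TransformStream: whatever is written comes out of `readable`.
    const { readable, writable } = new TransformStream<Uint8Array, Uint8Array>();
    super(readable, { headers: { 'content-type': 'application/jsonl' } });
    this.writer = writable.getWriter();
    this.encoder = new TextEncoder();
  }

  // Serializes one value as a single newline-terminated line.
  send(item: T): Promise<void> {
    return this.writer.write(this.encoder.encode(JSON.stringify(item) + '\n'));
  }

  // Ends the stream after all previously written lines have been flushed.
  close(): Promise<void> {
    return this.writer.close();
  }
}
```

In this sketch the promise returned by send may not settle until the consumer starts reading the body, so a caller that needs to return the response first should not await it beforehand.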
Here’s what the streaming service might look like:
import type { JSONLinesResponse } from 'vovk';

export type Token = { message: string };

export default class StreamService {
  static async streamTokens(resp: JSONLinesResponse<Token>) {
    const tokens: Token[] = [{ message: 'Hello,' }, { message: ' World' }, { message: '!' }];
    for (const token of tokens) {
      await new Promise((resolve) => setTimeout(resolve, 300));
      resp.send(token);
    }
    resp.close();
  }
}
Tokens are sent with JSONLinesResponse#send; once all of them have been sent, the stream is closed with JSONLinesResponse#close.
The controller method returns the JSONLinesResponse instance, while the streaming itself runs in a floating (un-awaited) Promise started just before the return statement.
import { prefix, get, JSONLinesResponse } from 'vovk';
import StreamService, { type Token } from './StreamService';

@prefix('stream')
export default class StreamController {
  @get('tokens')
  static async streamTokens(req: Request) {
    const resp = new JSONLinesResponse<Token>(req);
    // Start streaming without awaiting so the response is returned immediately.
    void StreamService.streamTokens(resp);
    return resp;
  }
}
The JSONLinesResponse class also provides a throw method that safely closes the stream and causes the client to rethrow the received error.
await resp.throw(new Error('Stream error'));
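One way to use throw is to wrap the streaming loop in try/catch so that a failure is forwarded to the client instead of leaving the stream open. The sketch below is written against a hypothetical structural type, LineSink, that mirrors only the three methods named in this section (send, close, throw); their return types are assumptions, and streamTokensSafely is an illustrative name.

```typescript
// Hypothetical structural type covering the methods named in this section.
interface LineSink<T> {
  send(item: T): unknown;
  close(): unknown;
  throw(error: unknown): unknown;
}

type Token = { message: string };

// Streams tokens from `source`; on success the stream is closed, and on
// failure the error is forwarded to the client via `throw`.
async function streamTokensSafely(resp: LineSink<Token>, source: AsyncIterable<Token>): Promise<void> {
  try {
    for await (const token of source) {
      resp.send(token);
    }
    resp.close();
  } catch (error) {
    await resp.throw(error);
  }
}
```

Because the function only relies on the structural type, a JSONLinesResponse instance could presumably be passed as resp, provided its methods match these assumed signatures.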