JSON Lines Streaming
Vovk.ts includes first-class support for the JSON Lines format, a convenient way to implement "one request, many responses." JSON Lines is another output type that uses the iteration validation field and produces the application/jsonl content type if the client sends an Accept: application/jsonl header. If the Accept header doesn't include application/jsonl, the output is returned as text/plain so it's viewable when the endpoint URL is opened directly in a browser.
Because the response size is not known in advance, JSON Lines responses cannot be compressed with Gzip, Brotli, or other algorithms. Keep this in mind for large responses.
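To make the wire format concrete: each line of an application/jsonl body is a standalone JSON document, terminated by a newline. The helper below is a hypothetical illustration (not part of vovk) of how such a body maps to an array of objects:

```typescript
// Hypothetical helper (not part of vovk): parse an application/jsonl body
// into an array of objects. Each non-empty line is a standalone JSON document.
function parseJSONLines<T>(body: string): T[] {
  return body
    .split('\n')
    .filter((line) => line.trim() !== '')
    .map((line) => JSON.parse(line) as T);
}

const body = '{"message":"Hello,"}\n{"message":" World"}\n';
console.log(parseJSONLines<{ message: string }>(body));
// [ { message: 'Hello,' }, { message: ' World' } ]
```

The controller example below produces output in exactly this format, one JSON line per yielded value.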
import { z } from 'zod';
import { procedure, prefix, post, type VovkIteration } from 'vovk';
@prefix('stream')
export default class StreamController {
@post('completions')
static getJSONLines = procedure({
// ...
iteration: z.object({
message: z.string(),
}),
async *handle() {
const tokens: VovkIteration<typeof StreamController.getJSONLines>[] = [
{ message: 'Hello,' },
{ message: ' World' },
{ message: ' from' },
{ message: ' Stream' },
{ message: '!' },
];
for (const token of tokens) {
await new Promise((resolve) => setTimeout(resolve, 300));
yield token;
}
},
});
}

When used with a service, the iterable can be delegated using the yield* syntax:
import { z } from 'zod';
import { procedure, prefix, post } from 'vovk';
import StreamService from './StreamService';
@prefix('stream')
export default class StreamController {
@post('completions')
static getJSONLines = procedure({
// ...
iteration: z.object({
message: z.string(),
}),
async *handle() {
yield* StreamService.getJSONLines();
},
});
}

import type { VovkIteration } from 'vovk';
import type StreamController from './StreamController';
export default class StreamService {
static async *getJSONLines() {
const tokens: VovkIteration<typeof StreamController.getJSONLines>[] = [
{ message: 'Hello,' },
{ message: ' World' },
{ message: ' from' },
{ message: ' Stream' },
{ message: '!' },
];
for (const token of tokens) {
await new Promise((resolve) => setTimeout(resolve, 300));
yield token;
}
}
}

On the client side, the JSON Lines output can be consumed using disposable async iterators, allowing you to process each line as it arrives:
import { StreamRPC } from 'vovk-client';
using stream = await StreamRPC.getJSONLines();
for await (const { message } of stream) {
console.log('Received message:', message);
}

The iterable (represented as stream above), besides Symbol.asyncIterator, Symbol.dispose, and Symbol.asyncDispose, also provides:
- status: The HTTP response status (e.g., 200 for OK, 404 for Not Found).
- asPromise: A promise that resolves with an array of all emitted values when the stream completes.
- onIterate: Registers a callback for each iteration.
- abortController: An AbortController instance to abort the stream. When the stream is closed with abortController.abort(), it throws an AbortError on the stream reader that can be caught on the client side via the error's cause property.
- abortWithoutError: A method to abort the stream without throwing an error on the stream reader. This is useful when you want to stop processing the stream gracefully.
The using statement ensures the stream is aborted with stream.abortWithoutError('Stream disposed') when it goes out of scope.
console.log('Response status:', stream.status);
stream.onIterate((item) => {
console.log('Iterated item:', item);
});
if (someCondition) {
stream.abortWithoutError();
}
console.log('All messages:', await stream.asPromise());

OpenAI Chat Example
Create a procedure that delegates iterable output from OpenAI's streaming chat completions:
import { post, prefix, operation, type VovkRequest } from 'vovk';
import OpenAI from 'openai';
@prefix('openai')
export default class OpenAiController {
@operation({
summary: 'Create a chat completion',
})
@post('chat')
static async *createChatCompletion(
req: VovkRequest<{ messages: OpenAI.Chat.Completions.ChatCompletionMessageParam[] }>
) {
const { messages } = await req.json();
const openai = new OpenAI();
yield* await openai.chat.completions.create({
messages,
model: 'gpt-5-nano',
stream: true,
});
}
}

On the client side, consume the streamed completion as follows:
// ...
using completion = await OpenAiRPC.createChatCompletion({
body: { messages: [...messages, userMessage] },
});
for await (const part of completion) {
// ...
View full example on examples.vovk.dev »
JSONLinesResponse Class
If generators aren't suitable for JSON streaming in a particular case, use the JSONLinesResponse class, which inherits from Response and uses TransformStream#readable as the response body. This lower-level API is used internally to implement the generator logic described above.
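To illustrate the mechanics, here is a simplified sketch of how such a class can be built on top of TransformStream. This is an assumed simplification for illustration only, not vovk's actual implementation:

```typescript
// Simplified illustration (not vovk's actual implementation): a Response
// subclass whose body is the readable side of a TransformStream. send()
// writes one JSON line, close() ends the stream.
class SimpleJSONLinesResponse<T> extends Response {
  #writer: WritableStreamDefaultWriter<Uint8Array>;

  constructor() {
    const { readable, writable } = new TransformStream<Uint8Array, Uint8Array>();
    super(readable, { headers: { 'content-type': 'application/jsonl' } });
    this.#writer = writable.getWriter();
  }

  send(data: T) {
    // Serialize each message as a standalone JSON document followed by a newline
    void this.#writer.write(new TextEncoder().encode(JSON.stringify(data) + '\n'));
  }

  close() {
    void this.#writer.close();
  }
}

const demo = new SimpleJSONLinesResponse<{ message: string }>();
demo.send({ message: 'Hello,' });
demo.send({ message: ' World' });
demo.close();
// Reading the body back yields the two JSON lines
demo.text().then((text) => console.log(JSON.stringify(text)));
```

The writes are intentionally not awaited: the readable side is drained by whoever consumes the Response body, so the handler can return the response immediately while the stream is still being written.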
A service method in this case is implemented as a regular function that accepts a JSONLinesResponse instance as a handle used to send messages manually.
Here's what the streaming service might look like:
import type { JSONLinesResponse, VovkIteration } from 'vovk';
import type StreamController from './StreamController';
export type Token = VovkIteration<typeof StreamController.streamTokens>;
export default class StreamService {
static async streamTokens(resp: JSONLinesResponse<Token>) {
const tokens: Token[] = [{ message: 'Hello,' }, { message: ' World' }, { message: '!' }];
for (const token of tokens) {
await new Promise((resolve) => setTimeout(resolve, 300));
resp.send(token);
}
resp.close();
}
}

As you can see, tokens are sent with JSONLinesResponse#send, and when the stream completes, close it with JSONLinesResponse#close.
The controller class returns an instance of JSONLinesResponse, and the streaming is performed in a floating Promise above the return statement.
import { prefix, get, JSONLinesResponse } from 'vovk';
import StreamService, { type Token } from './StreamService';
@prefix('stream')
export default class StreamController {
@get('tokens')
static async streamTokens(req: Request) {
const resp = new JSONLinesResponse<Token>(req);
void StreamService.streamTokens(resp);
return resp;
}
}

The JSONLinesResponse class also provides a throw method that safely closes the stream and causes the client to rethrow the received error.
await resp.throw(new Error('Stream error'));

Progressive Response with progressive Function
A common use of the JSON Lines format is to sequentially send multiple data chunks (JSON lines) in response to a single request. This is useful for long-running operations, such as LLM completions, where you want to deliver partial results as they become available.
But what if you don't know which chunk will arrive first, second, and so on? In this case, you can use an experimental feature called "progressive response," inspired by Dan Abramov's proposal Progressive JSON, from which the "progressive" name originates.
Let's say you have two functions that return data after some random delay: getUsers and getTasks, implemented as static methods of a service class. In a real application, these could be API calls or queries to different databases.
With the help of the JSONLinesResponse class, we can create a simple service method that looks like this:
// ...
void Promise.all([
this.getUsers().then((users) => resp.send({ users })),
this.getTasks().then((tasks) => resp.send({ tasks })),
])
.then(resp.close)
.catch(resp.throw);
// ...

- Once getUsers() or getTasks() resolves, resp.send sends a JSON line to the client.
- When all promises resolve, resp.close closes the response stream.
- If any promise rejects, resp.throw sends an error response to the client.
The full implementation of the service module looks like this:
import { JSONLinesResponse, type VovkIteration } from 'vovk';
import type ProgressiveController from './ProgressiveController.ts';
export default class ProgressiveService {
static async getUsers() {
await new Promise((resolve) => setTimeout(resolve, Math.random() * 10_000));
return [
{ id: 1, name: 'John Doe' },
{ id: 2, name: 'Jane Smith' },
{ id: 3, name: 'Alice Johnson' },
{ id: 4, name: 'Bob Brown' },
{ id: 5, name: 'Charlie White' },
];
}
static async getTasks() {
await new Promise((resolve) => setTimeout(resolve, Math.random() * 10_000));
return [
{ id: 1, title: 'Task One', completed: false },
{ id: 2, title: 'Task Two', completed: true },
{ id: 3, title: 'Task Three', completed: false },
{ id: 4, title: 'Task Four', completed: true },
{ id: 5, title: 'Task Five', completed: false },
];
}
static streamProgressiveResponse(
resp: JSONLinesResponse<VovkIteration<typeof ProgressiveController.streamProgressiveResponse>>
) {
Promise.all([
this.getUsers().then((users) => resp.send({ users })),
this.getTasks().then((tasks) => resp.send({ tasks })),
])
.then(resp.close)
.catch(resp.throw);
}
}

The code above is fetched from the GitHub repository.
On the controller side, instantiate JSONLinesResponse, pass it to the service method, and return it as the response.
// ...
const response = new JSONLinesResponse(req);
void ProgressiveService.streamProgressiveResponse(response);
return response;
// ...

The full controller implementation with typing and validation looks like this:
import { endpoint, get, JSONLinesResponse, prefix, type VovkIteration } from 'vovk';
import { z } from 'zod';
import ProgressiveService from './ProgressiveService.ts';
@prefix('progressive')
export default class ProgressiveController {
@get('', { cors: true })
static streamProgressiveResponse = endpoint({
validateEachIteration: true,
iteration: z.union([
z.strictObject({
users: z.array(
z.strictObject({
id: z.number(),
name: z.string(),
})
),
}),
z.strictObject({
tasks: z.array(
z.strictObject({
id: z.number(),
title: z.string(),
completed: z.boolean(),
})
),
}),
]),
async handle(req) {
const response = new JSONLinesResponse<VovkIteration<typeof ProgressiveController.streamProgressiveResponse>>(
req
);
void ProgressiveService.streamProgressiveResponse(response);
return response;
},
});
}

The code above is fetched from the GitHub repository.
On the client side, use the progressive function from the vovk package, which creates a promise for each property of the resulting object. It accepts the RPC method to call (e.g., ProgressiveRPC.streamProgressiveResponse) and optional input parameters. The function returns an object with one promise per property, each of which can be awaited separately.
import { progressive } from 'vovk';
import { ProgressiveRPC } from 'vovk-client';

const { users: usersPromise, tasks: tasksPromise } = progressive(ProgressiveRPC.streamProgressiveResponse);

If the RPC method requires input parameters, you can pass them as the second argument:
const { users: usersPromise, tasks: tasksPromise } = progressive(ProgressiveRPC.streamProgressiveResponse, {
params: { id: '123' },
body: { hello: 'world' },
});

After that, the promises can be awaited separately, and the data will be available as soon as the corresponding JSON line is received from the server:
usersPromise.then(console.log).catch(console.error);
tasksPromise.then(console.log).catch(console.error);

Behind the scenes, progressive returns a Proxy that implements a get trap to return a promise for each accessed property.
- When a new JSON line arrives, the corresponding promise resolves with that data.
- If a JSON line arrives for a property without an existing promise, the promise is created and resolved (so it can be retrieved later).
- When the response closes, all unsettled promises are rejected with an error indicating that the connection closed before sending a value for that property.
- If the response errors, all unsettled promises are rejected with that error.
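The mechanism described above can be sketched roughly as follows. This is an assumed simplification for illustration, not the actual implementation of progressive:

```typescript
// Rough sketch (not vovk's actual implementation of `progressive`): a Proxy
// whose get trap lazily creates one deferred promise per property; incoming
// JSON lines resolve them, and closing the stream rejects the unsettled ones.
type Deferred = {
  promise: Promise<unknown>;
  resolve: (value: unknown) => void;
  reject: (error: unknown) => void;
};

function createProgressiveProxy() {
  const deferreds = new Map<string, Deferred>();
  const settledKeys = new Set<string>();

  const getDeferred = (key: string): Deferred => {
    let deferred = deferreds.get(key);
    if (!deferred) {
      let resolve!: (value: unknown) => void;
      let reject!: (error: unknown) => void;
      const promise = new Promise<unknown>((res, rej) => {
        resolve = res;
        reject = rej;
      });
      deferred = { promise, resolve, reject };
      deferreds.set(key, deferred);
    }
    return deferred;
  };

  return {
    // Accessing any property returns (and memoizes) a promise for it
    proxy: new Proxy({} as Record<string, Promise<unknown>>, {
      get: (_target, prop) => getDeferred(String(prop)).promise,
    }),
    // Called for each incoming JSON line, e.g. { users: [...] }
    onLine(line: Record<string, unknown>) {
      for (const [key, value] of Object.entries(line)) {
        settledKeys.add(key);
        getDeferred(key).resolve(value);
      }
    },
    // Called when the response closes: reject promises that never got a value
    onClose() {
      for (const [key, deferred] of deferreds) {
        if (!settledKeys.has(key)) {
          deferred.reject(new Error(`Connection closed before "${key}" was sent`));
        }
      }
    },
  };
}
```

Because the promises are memoized, accessing proxy.users before or after the corresponding line arrives yields the same promise, which matches the behavior described in the list above.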