JSON Lines streaming
Vovk.ts includes first-class support for the JSON Lines format, which is a convenient way to implement "one request, many responses". It's well suited to LLM completions, and it also opens the door to experiments such as progressive responses and polling. JSON Lines is another kind of output: it uses the iteration validation field and produces the application/jsonl content type if the client sends an Accept: application/jsonl header. If the Accept header doesn't include application/jsonl, the output is returned as text/plain so that it can be viewed when the endpoint URL is opened directly in the browser.
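The content-type rule described above can be sketched as a small function. This is only an illustration of the described behavior, not Vovk.ts source code: pickContentType is a hypothetical name, and treating the Accept header as a simple comma-separated list (ignoring wildcards such as */*) is an assumption.

```typescript
// Hypothetical helper, not part of Vovk.ts: picks the response content type
// from the request's Accept header, following the rule described in the text.
function pickContentType(accept: string | null): 'application/jsonl' | 'text/plain' {
  if (!accept) return 'text/plain';
  // An Accept header is a comma-separated list; entries may carry parameters such as ";q=0.9".
  const mediaTypes = accept
    .split(',')
    .map((entry) => (entry.split(';')[0] ?? '').trim().toLowerCase());
  // Assumption: only an explicit application/jsonl entry counts, wildcards do not.
  return mediaTypes.some((type) => type === 'application/jsonl')
    ? 'application/jsonl'
    : 'text/plain';
}
```

A browser navigating to the endpoint sends an Accept header without application/jsonl, so under this rule it would receive text/plain and display the lines as text.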
import { z } from 'zod';
import { prefix, post, type VovkIteration } from 'vovk';
import { withZod } from 'vovk-zod';

@prefix('stream')
export default class StreamController {
  @post('completions')
  static getJSONLines = withZod({
    // ...
    iteration: z.object({
      message: z.string(),
    }),
    // The handler must be an async generator (note the asterisk) in order to use yield
    async *handle() {
      const tokens: VovkIteration<typeof StreamController.getJSONLines>[] = [
        { message: 'Hello,' },
        { message: ' World' },
        { message: ' from' },
        { message: ' Stream' },
        { message: '!' },
      ];
      for (const token of tokens) {
        // Simulate a delay between tokens
        await new Promise((resolve) => setTimeout(resolve, 300));
        yield token;
      }
    },
  });
}
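To make the wire format concrete, the following sketch shows how the items yielded by a generator like the one above map to a JSON Lines body: one JSON document per line. It is a simplified illustration and not how Vovk.ts is implemented; toJSONLines and generateTokens are hypothetical names, and a real server writes each line as it is produced rather than buffering the whole body.

```typescript
type Iteration = { message: string };

// Stand-in for the controller's generator (same shape, shorter delay).
async function* generateTokens(): AsyncGenerator<Iteration> {
  const tokens: Iteration[] = [{ message: 'Hello,' }, { message: ' World' }, { message: '!' }];
  for (const token of tokens) {
    await new Promise((resolve) => setTimeout(resolve, 10));
    yield token;
  }
}

// Hypothetical helper: encodes every yielded item as one JSON document per line.
// Buffering is used here only to show the final body in one piece.
async function toJSONLines<T>(items: AsyncIterable<T>): Promise<string> {
  let body = '';
  for await (const item of items) {
    body += JSON.stringify(item) + '\n';
  }
  return body;
}
```

Calling toJSONLines(generateTokens()) resolves to three lines: {"message":"Hello,"}, {"message":" World"} and {"message":"!"}, each terminated by a newline.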
When used with a service class, the iterable can be delegated with the yield* syntax:
import { z } from 'zod';
import { prefix, post } from 'vovk';
import { withZod } from 'vovk-zod';
import StreamService from './StreamService';

@prefix('stream')
export default class StreamController {
  @post('completions')
  static getJSONLines = withZod({
    // ...
    iteration: z.object({
      message: z.string(),
    }),
    async *handle() {
      // Forward every item produced by the service generator
      yield* StreamService.getJSONLines();
    },
  });
}
import type { VovkIteration } from 'vovk';
// StreamController is a default export, so it is imported without braces
import type StreamController from './StreamController';

export default class StreamService {
  static async *getJSONLines() {
    const tokens: VovkIteration<typeof StreamController.getJSONLines>[] = [
      { message: 'Hello,' },
      { message: ' World' },
      { message: ' from' },
      { message: ' Stream' },
      { message: '!' },
    ];
    for (const token of tokens) {
      await new Promise((resolve) => setTimeout(resolve, 300));
      yield token;
    }
  }
}
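The delegation pattern above relies only on standard async generator semantics, so it can be tried in isolation. In this minimal sketch GreetingService, handle and collect are hypothetical names that do not come from Vovk.ts. It shows that yield* forwards every item of the inner generator and that the outer generator can still yield items of its own.

```typescript
type Iteration = { message: string };

// Hypothetical service: the inner async generator owns the streaming logic.
class GreetingService {
  static async *getMessages(): AsyncGenerator<Iteration> {
    yield { message: 'Hello,' };
    yield { message: ' World' };
  }
}

// Hypothetical handler: yield* forwards each inner item in order,
// then the handler appends one more item of its own.
async function* handle(): AsyncGenerator<Iteration> {
  yield* GreetingService.getMessages();
  yield { message: '!' };
}

// Small utility that drains an async iterable into an array.
async function collect<T>(items: AsyncIterable<T>): Promise<T[]> {
  const result: T[] = [];
  for await (const item of items) result.push(item);
  return result;
}
```

Here collect(handle()) resolves to three items whose messages join to "Hello, World!".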
On the client side, the JSON Lines output can be consumed using disposable async iterators, which lets you process each line as it arrives:
import { StreamRPC } from 'vovk-client';

using stream = await StreamRPC.getJSONLines();
for await (const { message } of stream) {
  console.log('Received message:', message);
}
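For comparison, here is a minimal sketch of what reading a JSON Lines body involves when no generated client is available. It is not the vovk-client implementation; readJSONLines is a hypothetical helper that relies only on the standard ReadableStream and TextDecoder APIs. The key detail is that a network chunk can end in the middle of a line, so the unfinished tail has to be buffered.

```typescript
// Hypothetical helper, not part of vovk-client: turns a byte stream into parsed JSON Lines items.
async function* readJSONLines<T>(body: ReadableStream<Uint8Array>): AsyncGenerator<T> {
  const reader = body.getReader();
  const decoder = new TextDecoder();
  let buffer = '';
  try {
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;
      // A chunk may end in the middle of a line, so keep the unfinished tail in the buffer.
      buffer += decoder.decode(value, { stream: true });
      const lines = buffer.split('\n');
      buffer = lines.pop() ?? '';
      for (const line of lines) {
        if (line.trim()) yield JSON.parse(line) as T;
      }
    }
    // Flush a final line that was not terminated by a newline.
    buffer += decoder.decode();
    if (buffer.trim()) yield JSON.parse(buffer) as T;
  } finally {
    reader.releaseLock();
  }
}
```

With fetch, the body of a response requested with the Accept: application/jsonl header could be passed to this helper and iterated with for await, in the same way as the stream returned by the generated client.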
JSONLinesResponse class
If generators aren't suitable for JSON streaming in a particular case, you can use the JSONLinesResponse class, which inherits from the Response class and uses TransformStream#readable as the response body.
It's a lower-level API that is used behind the scenes to implement the generator logic described above.
In this case, a service method is implemented as a regular function that accepts a JSONLinesResponse instance as a handle for sending messages manually.
Here is what the streaming service might look like:
import type { JSONLinesResponse } from 'vovk';

export type Token = { message: string };

export default class StreamService {
  static async streamTokens(resp: JSONLinesResponse<Token>) {
    const tokens: Token[] = [{ message: 'Hello,' }, { message: ' World' }, { message: '!' }];
    for (const token of tokens) {
      await new Promise((resolve) => setTimeout(resolve, 300));
      resp.send(token);
    }
    resp.close();
  }
}
As you can see, tokens are sent using the JSONLinesResponse#send method and, when the stream is completed, it needs to be closed with JSONLinesResponse#close.
The controller method returns an instance of JSONLinesResponse, and the streaming is performed by a floating Promise started just above the return statement.
import { prefix, get, JSONLinesResponse } from 'vovk';
import StreamService, { type Token } from './StreamService';

@prefix('stream')
export default class StreamController {
  @get('tokens')
  static async streamTokens() {
    const resp = new JSONLinesResponse<Token>();
    // Not awaited on purpose: the service keeps writing after the response is returned
    void StreamService.streamTokens(resp);
    return resp;
  }
}
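To illustrate how a Response subclass backed by TransformStream#readable can work together with a floating Promise, here is a simplified stand-in. SketchJSONLinesResponse and handler are hypothetical names, and the real JSONLinesResponse may differ in every detail. The sketch assumes a runtime with the standard Response, TransformStream and TextEncoder globals.

```typescript
// A simplified stand-in for illustration only, not Vovk's JSONLinesResponse.
class SketchJSONLinesResponse<T> extends Response {
  private readonly writer: WritableStreamDefaultWriter<Uint8Array>;
  private readonly encoder: TextEncoder;

  constructor() {
    const { readable, writable } = new TransformStream<Uint8Array, Uint8Array>();
    // The readable side becomes the response body; the writable side stays with this object.
    super(readable, { headers: { 'content-type': 'application/jsonl' } });
    this.writer = writable.getWriter();
    this.encoder = new TextEncoder();
  }

  // Writes one item as a single JSON line.
  send(item: T): Promise<void> {
    return this.writer.write(this.encoder.encode(JSON.stringify(item) + '\n'));
  }

  // Ends the stream so that the reader's iteration completes.
  close(): Promise<void> {
    return this.writer.close();
  }
}

function handler(): Response {
  const resp = new SketchJSONLinesResponse<{ message: string }>();
  // Floating promise: writing continues after the response object has been returned.
  void (async () => {
    await resp.send({ message: 'Hello,' });
    await resp.send({ message: ' World' });
    await resp.close();
  })();
  return resp;
}
```

Because the response is returned before anything is written, the consumer can start reading immediately; each write completes once the reader pulls the data.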
The JSONLinesResponse class also provides a throw method that safely closes the stream and makes the client re-throw the received error.
await resp.throw(new Error('Stream error'));
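One way an error could travel over a JSON Lines stream and be re-thrown by the reader is sketched below. The wire convention used here (a final line of the form { isError: true, reason }) is purely an assumption made for illustration, and it should not be read as the format Vovk.ts uses. encodeError, isErrorLine and parseLines are hypothetical names.

```typescript
type ErrorLine = { isError: true; reason: string };

// Hypothetical convention: the server writes a final line describing the error before closing.
function encodeError(error: Error): string {
  const line: ErrorLine = { isError: true, reason: error.message };
  return JSON.stringify(line) + '\n';
}

// Type guard that recognizes the hypothetical error line.
function isErrorLine(value: unknown): value is ErrorLine {
  return (
    typeof value === 'object' &&
    value !== null &&
    (value as { isError?: unknown }).isError === true
  );
}

// Client side: parses lines one by one and re-throws as soon as an error line shows up.
function* parseLines<T>(body: string): Generator<T> {
  for (const line of body.split('\n')) {
    if (!line.trim()) continue;
    const value: unknown = JSON.parse(line);
    if (isErrorLine(value)) throw new Error(value.reason);
    yield value as T;
  }
}
```

With this convention, a consumer wraps its loop in try/catch: items received before the error line are processed normally, and the error surfaces at the point in the iteration where it occurred.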