Progressive response 🧪
A common use of the JSON Lines format is to send multiple data chunks (JSON lines) sequentially in response to a single request. This is useful for long-running operations, such as LLM completions, where you want to send partial results as they become available.
But what if you don't know which chunk will come first, second, and so on? In this case, you can use an experimental feature that I call "progressive response". The "progressive" name is borrowed from Dan Abramov's proposal, Progressive JSON.
The progressive response feature can also be described as "partial response streaming", similar to Partial Prerendering, but instead of rendering UI, it "renders" response data in an unknown order. When all the data is sent, the response is closed.
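To make the "unknown order" idea concrete, here is a minimal sketch of what a consumer conceptually does with such a response. The helper name mergeJsonLines and the sample payloads are hypothetical and not part of vovk; the sketch only assumes that each line is a standalone JSON object carrying one top-level key.

```typescript
// Hypothetical helper, not part of vovk: parses a JSON Lines body and merges
// every line into one object, so the arrival order of the lines does not matter.
function mergeJsonLines(body: string): Record<string, unknown> {
  const result: Record<string, unknown> = {};
  for (const line of body.split('\n')) {
    if (line.trim() === '') continue; // skip blank lines, e.g. after a trailing newline
    Object.assign(result, JSON.parse(line) as Record<string, unknown>);
  }
  return result;
}

// Two possible bodies for the same response, differing only in line order.
const usersFirst = '{"users":[{"id":1}]}\n{"tasks":[{"id":7}]}\n';
const tasksFirst = '{"tasks":[{"id":7}]}\n{"users":[{"id":1}]}\n';

const mergedA = mergeJsonLines(usersFirst);
const mergedB = mergeJsonLines(tasksFirst);
console.log(mergedA, mergedB);
```

Both bodies merge into an object with the same users and tasks members, which is why the client can address the data by key rather than by position.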
Let's say you have two functions that return data after some random delay: getUsers and getTasks, implemented as static methods of a service class. In a real application, these could be API calls or queries to different databases. They could also be related by ID (for example, each task could carry the ID of the user it belongs to), but for this example we will keep them independent.
With the help of the JSONLinesResponse class, we can implement a simple method that looks like this:
// ...
void Promise.all([
  this.getUsers().then((users) => resp.send({ users })),
  this.getTasks().then((tasks) => resp.send({ tasks })),
])
  .then(resp.close)
  .catch(resp.throw);
// ...
- Once getUsers() or getTasks() resolves, the resp.send method is called with the data, which sends a JSON line to the client.
- When all promises are resolved, the resp.close method is called, which closes the response stream.
- If any of the promises is rejected, the resp.throw method is called, which sends an error response to the client.
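The flow described above can be tried in isolation with a stand-in object. The sketch below is only an illustration: FakeResponse, delay, and stream are hypothetical names, FakeResponse is not vovk's JSONLinesResponse, and the fixed delays replace the random ones so that the order is predictable.

```typescript
// Hypothetical stand-in for a streaming response; it only records events in order.
// It is NOT vovk's JSONLinesResponse, just enough surface to show the flow.
class FakeResponse<T> {
  readonly events: string[] = [];
  // Arrow-function properties keep `this` bound, so the methods of this
  // stand-in can be passed as bare callbacks, as in `.then(resp.close)`.
  send = (line: T): void => {
    this.events.push(`send ${JSON.stringify(line)}`);
  };
  close = (): void => {
    this.events.push('close');
  };
  throw = (err: unknown): void => {
    this.events.push(`throw ${String(err)}`);
  };
}

type Line = { users: string[] } | { tasks: string[] };

// Resolves with `value` after `ms` milliseconds.
const delay = <V>(ms: number, value: V): Promise<V> =>
  new Promise((resolve) => setTimeout(() => resolve(value), ms));

function stream(resp: FakeResponse<Line>): Promise<void> {
  return Promise.all([
    delay(30, ['John Doe']).then((users) => resp.send({ users })), // slower source
    delay(10, ['Task One']).then((tasks) => resp.send({ tasks })), // faster source
  ])
    .then(resp.close)
    .catch(resp.throw);
}

const resp = new FakeResponse<Line>();
const done = stream(resp).then(() => console.log(resp.events));
```

In this run the tasks line is recorded first because its source finishes sooner, the users line follows, and the close event comes last, once both promises have resolved.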
The full implementation of the service class looks like this:
import { JSONLinesResponse, type VovkIteration } from 'vovk';
import type ProgressiveController from './ProgressiveController';

export default class ProgressiveService {
  static async getUsers() {
    await new Promise((resolve) => setTimeout(resolve, Math.random() * 10_000));
    return [
      { id: 1, name: 'John Doe' },
      { id: 2, name: 'Jane Smith' },
      { id: 3, name: 'Alice Johnson' },
      { id: 4, name: 'Bob Brown' },
      { id: 5, name: 'Charlie White' },
    ];
  }

  static async getTasks() {
    await new Promise((resolve) => setTimeout(resolve, Math.random() * 10_000));
    return [
      { id: 1, title: 'Task One', completed: false },
      { id: 2, title: 'Task Two', completed: true },
      { id: 3, title: 'Task Three', completed: false },
      { id: 4, title: 'Task Four', completed: true },
      { id: 5, title: 'Task Five', completed: false },
    ];
  }

  static streamProgressiveResponse(
    resp: JSONLinesResponse<VovkIteration<typeof ProgressiveController.streamProgressiveResponse>>
  ) {
    void Promise.all([
      this.getUsers().then((users) => resp.send({ users })),
      this.getTasks().then((tasks) => resp.send({ tasks })),
    ])
      .then(resp.close)
      .catch(resp.throw);
  }
}
On the controller side, we need to instantiate the JSONLinesResponse class, pass it to the service method, and return it as the response.
// ...
const response = new JSONLinesResponse(req);
void ProgressiveService.streamProgressiveResponse(response);
return response;
// ...
The full controller implementation with typing and validation looks like this:
import { get, JSONLinesResponse, prefix, type VovkIteration } from 'vovk';
import { withZod } from 'vovk-zod';
import { z } from 'zod';
import ProgressiveService from './ProgressiveService';

@prefix('progressive')
export default class ProgressiveController {
  @get()
  static streamProgressiveResponse = withZod({
    validateEachIteration: true,
    iteration: z.union([
      z.strictObject({
        users: z.array(
          z.strictObject({
            id: z.number(),
            name: z.string(),
          })
        ),
      }),
      z.strictObject({
        tasks: z.array(
          z.strictObject({
            id: z.number(),
            title: z.string(),
            completed: z.boolean(),
          })
        ),
      }),
    ]),
    async handle(req) {
      const response = new JSONLinesResponse<VovkIteration<typeof ProgressiveController.streamProgressiveResponse>>(
        req
      );
      void ProgressiveService.streamProgressiveResponse(response);
      return response;
    },
  });
}
The server-side code is ready. Now we can implement the client-side code that consumes this progressive response. For this, we will use the progressive function from the vovk package. It accepts the RPC module method that we want to call (in this case, ProgressiveRPC.streamProgressiveResponse) and input parameters, if any. It returns an object with a promise for each member of the response, and each promise can be awaited separately.
const { users: usersPromise, tasks: tasksPromise } = progressive(ProgressiveRPC.streamProgressiveResponse);
If the RPC module method requires input parameters, you can pass them as the second argument:
const { users: usersPromise, tasks: tasksPromise } = progressive(ProgressiveRPC.streamProgressiveResponse, {
  params: { id: '123' },
  body: { hello: 'world' },
});
After that, the promises can be awaited separately, and each value becomes available as soon as the corresponding JSON line is received from the server:
usersPromise.then(console.log).catch(console.error);
tasksPromise.then(console.log).catch(console.error);
Behind the scenes, the progressive function returns a Proxy object whose get trap returns a promise for each accessed property.
- When the JSON Lines response sends a new line, the corresponding promise is resolved with the data from the line.
- When the JSON Lines response sends a new line but there is no corresponding promise yet, the promise is created and resolved with the data from the line (so you can retrieve it later from the progressive return value).
- When the JSON Lines response is closed, all unsettled promises are rejected with an error message indicating that the connection was closed without sending a value for the corresponding property.
- When the JSON Lines response fails with an error, all unsettled promises are rejected with the error from the response.
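The four rules above can be modelled in a few dozen lines. The following is a simplified, hypothetical sketch and not vovk's actual implementation: createProgressive, onLine, onClose, onError, and the error text are illustrative names, and the network layer is replaced by direct calls so that the behavior is easy to follow.

```typescript
// Hypothetical, simplified model of a Proxy-based progressive object.
// Not vovk's actual implementation; all names and error texts are illustrative.
type Deferred = {
  promise: Promise<unknown>;
  resolve: (value: unknown) => void;
  reject: (reason: unknown) => void;
  settled: boolean;
};

function createProgressive<T extends object>() {
  const deferreds = new Map<string, Deferred>();

  // Look up the deferred for a key, creating it on first use.
  const getDeferred = (key: string): Deferred => {
    let d = deferreds.get(key);
    if (!d) {
      let resolve!: (value: unknown) => void;
      let reject!: (reason: unknown) => void;
      const promise = new Promise<unknown>((res, rej) => {
        resolve = res;
        reject = rej;
      });
      promise.catch(() => {}); // keep never-awaited keys from raising unhandled rejections
      d = { promise, resolve, reject, settled: false };
      deferreds.set(key, d);
    }
    return d;
  };

  // Reject every promise that has not received a value yet.
  const rejectPending = (reason: (key: string) => unknown): void => {
    deferreds.forEach((d, key) => {
      if (!d.settled) {
        d.settled = true;
        d.reject(reason(key));
      }
    });
  };

  const result = new Proxy({} as Record<string, Promise<unknown>>, {
    // The get trap hands out one promise per property name.
    get: (_target, key) => getDeferred(String(key)).promise,
  }) as unknown as { [K in keyof T]: Promise<T[K]> };

  return {
    result,
    // A line arrived: resolve the promise of each key it carries,
    // creating the promise first if nobody has asked for it yet.
    onLine(line: Partial<T>): void {
      for (const [key, value] of Object.entries(line)) {
        const d = getDeferred(key);
        if (d.settled) continue;
        d.settled = true;
        d.resolve(value);
      }
    },
    // The stream closed normally: anything still pending never got a value.
    onClose: (): void => rejectPending((key) => new Error(`Closed without a value for "${key}"`)),
    // The stream failed: propagate the same error to everything still pending.
    onError: (err: unknown): void => rejectPending(() => err),
  };
}

// Usage: simulate two lines and a close, with one key that never arrives.
const { result, onLine, onClose } = createProgressive<{ users: string[]; tasks: string[]; stats: number }>();
const tasks = result.tasks; // requested before its line arrives
onLine({ tasks: ['Task One'] }); // resolves the promise above
onLine({ users: ['John Doe'] }); // nobody asked yet: the promise is created and resolved
const stats = result.stats; // requested, but no such line is ever sent
onClose(); // rejects `stats`

const outcome = Promise.all([tasks, result.users, stats.catch((e: Error) => e.message)]);
outcome.then(console.log);
```

One simplification to keep in mind: in this sketch, a property first accessed after the close stays pending forever, because the source text does not say how that case is handled.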