Text Streaming for LLMs
Vovk.ts provides two ways to implement text response streaming requred for applications that utilise the AI completions.
Async iterators
Controller methods can implement generators that use *
syntax and utilise yield
keyword instead of regular return
.
// /src/modules/stream/StreamController.ts
import { get, prefix } from 'vovk';
type Token = { message: string };
@prefix('stream')
export default class StreamController {
@get('tokens')
static async *streamTokens() {
const tokens: Token[] = [
{ message: 'Hello,' },
{ message: ' World' },
{ message: '!' },
];
for (const token of tokens) {
await new Promise((resolve) => setTimeout(resolve, 300));
yield token;
}
}
}
In order to refactor this code and utilise Back-end Service you can move the streaming logic to StreamService
static class.
// /src/modules/stream/StreamService.ts
type Token = { message: string };
export default class StreamService {
static async *streamTokens() {
const tokens: Token[] = [
{ message: 'Hello,' },
{ message: ' World' },
{ message: '!' },
];
for (const token of tokens) {
await new Promise((resolve) => setTimeout(resolve, 300));
yield token;
}
}
}
At the controller use yield*
syntax to delegate iterable returned from StreamService.streamTokens
.
import { get, prefix } from 'vovk';
import StreamService from './StreamService';
@prefix('stream')
export default class StreamController {
@get('tokens')
static async *streamTokens() {
yield* StreamService.streamTokens();
}
}
StreamResponse Class
In some cases it's too hard to use generators to implement response streaming. Vovk.ts introduces StreamResponse
class inherited from Response
class that uses TransformStream#readable
as body and adds required HTTP headers.
It's a lower-level API that is used behind the scenes to implement generator logic explained above.
Service method at this case is implemented as a regular function that accepts StreamResponse
instance as a pointer to send messages manually.
There is what the streaming service might look like:
// /src/modules/stream/StreamService.ts
import type { StreamResponse } from 'vovk';
export type Token = { message: string };
export default class StreamService {
static async streamTokens(resp: StreamResponse<Token>) {
const tokens: Token[] = [
{ message: 'Hello,' },
{ message: ' World' },
{ message: '!' },
];
for (const token of tokens) {
await new Promise((resolve) => setTimeout(resolve, 300));
resp.send(token);
}
resp.close();
}
}
As you can see tokens are sent using StreamResponse#send
method and, when the stream is completed, it needs to be closed with StreamResponse#close
.
The Controller Class returns an instance of StreamResponse
and the streaming is performed a floating Promise above the return
statement.
import { prefix, get, StreamResponse, type VovkRequest } from 'vovk';
import StreamService, { type Token } from './StreamService';
@prefix('stream')
export default class StreamController {
@get('tokens')
static async streamTokens() {
const resp = new StreamResponse<Token>();
void StreamService.streamTokens(resp);
return resp;
}
}
StreamResponse
class also provides throw
methods that safely closes the stream and makes the client to re-throw the received error.
await resp.throw(new Error('Stream error'));
Handling Stream Responses on the Client
Both ways of response streaming generate client method that returns a disposable async generator.
import { StreamController } from 'vovk-client';
{
using stream = await StreamController.streamTokens();
for await (const token of stream) {
console.log(token);
}
}
using
keyword (that you can freely replace by let
or const
) indicates that when code block is reached the end (in case of early break
or if the code block encountered an error) the stream is going to be closed by invoking stream.close()
method automatically. stream.close()
can also be called explicitly if needed.
To make sure that the stream is closed before moving to the next code block you can use await using
syntax that disposes the stream asynchronous way.
import { StreamController } from 'vovk-client';
{
await using stream = await StreamController.streamTokens();
// ...
}
// on this line stream is already closed