Controller
Text Streaming

Text Streaming for LLMs

Vovk.ts provides two ways to implement text response streaming requred for applications that utilise the AI completions.

Async iterators

Controller methods can implement generators that use * syntax and utilise yield keyword instead of regular return.

// /src/modules/stream/StreamController.ts
import { get, prefix } from 'vovk';
 
type Token = { message: string };
 
@prefix('stream')
export default class StreamController {
  @get('tokens')
  static async *streamTokens() {
    const tokens: Token[] = [
      { message: 'Hello,' },
      { message: ' World' },
      { message: '!' },
    ];
 
    for (const token of tokens) {
      await new Promise((resolve) => setTimeout(resolve, 300));
      yield token;
    }
  }
}

In order to refactor this code and utilise Back-end Service you can move the streaming logic to StreamService static class.

// /src/modules/stream/StreamService.ts
type Token = { message: string };
 
export default class StreamService {
  static async *streamTokens() {
    const tokens: Token[] = [
      { message: 'Hello,' },
      { message: ' World' },
      { message: '!' },
    ];
 
    for (const token of tokens) {
      await new Promise((resolve) => setTimeout(resolve, 300));
      yield token;
    }
  }
}

At the controller use yield* syntax to delegate iterable returned from StreamService.streamTokens.

import { get, prefix } from 'vovk';
import StreamService from './StreamService';
 
@prefix('stream')
export default class StreamController {
  @get('tokens')
  static async *streamTokens() {
    yield* StreamService.streamTokens();
  }
}

StreamResponse Class

In some cases it's too hard to use generators to implement response streaming. Vovk.ts introduces StreamResponse class inherited from Response class that uses TransformStream#readable as body and adds required HTTP headers. It's a lower-level API that is used behind the scenes to implement generator logic explained above. Service method at this case is implemented as a regular function that accepts StreamResponse instance as a pointer to send messages manually.

There is what the streaming service might look like:

// /src/modules/stream/StreamService.ts
import type { StreamResponse } from 'vovk';
 
export type Token = { message: string };
 
export default class StreamService {
  static async streamTokens(resp: StreamResponse<Token>) {
    const tokens: Token[] = [
      { message: 'Hello,' },
      { message: ' World' },
      { message: '!' },
    ];
 
    for (const token of tokens) {
      await new Promise((resolve) => setTimeout(resolve, 300));
      resp.send(token);
    }
 
    resp.close();
  }
}

As you can see tokens are sent using StreamResponse#send method and, when the stream is completed, it needs to be closed with StreamResponse#close.

The Controller Class returns an instance of StreamResponse and the streaming is performed a floating Promise above the return statement.

import { prefix, get, StreamResponse, type VovkRequest } from 'vovk';
import StreamService, { type Token } from './StreamService';
 
@prefix('stream')
export default class StreamController {
  @get('tokens')
  static async streamTokens() {
    const resp = new StreamResponse<Token>();
 
    void StreamService.streamTokens(resp);
 
    return resp;
  }
}

StreamResponse class also provides throw methods that safely closes the stream and makes the client to re-throw the received error.

await resp.throw(new Error('Stream error'));

Handling Stream Responses on the Client

Both ways of response streaming generate client method that returns a disposable async generator.

import { StreamController } from 'vovk-client';
 
{
    using stream = await StreamController.streamTokens();
 
    for await (const token of stream) {
        console.log(token);
    }
}

using keyword (that you can freely replace by let or const) indicates that when code block is reached the end (in case of early break or if the code block encountered an error) the stream is going to be closed by invoking stream.close() method automatically. stream.close() can also be called explicitly if needed.

To make sure that the stream is closed before moving to the next code block you can use await using syntax that disposes the stream asynchronous way.

import { StreamController } from 'vovk-client';
 
{
    await using stream = await StreamController.streamTokens();
    // ...
}
// on this line stream is already closed