Skip to Content
Real-Time Polling 🧪 🚧

TODO: deletions need to be explicit. TODO: add ticker.

Real-time polling (experimental)

In the previous article, we covered how to use the OpenAI Realtime API to deliver instant UI updates while the user interacts with the app. The LLM functions created with createLLMTools ran in the browser, where the AI performed HTTP requests to the server using RPC modules.

On this page, we explain how to use database polling to receive updates triggered by other users or third-party services (such as MCP), keeping the UI in sync with the database. To ensure the app can be deployed to any hosting provider, we avoid non-HTTP protocols such as WebSockets and instead use HTTP polling powered by JSONLines.

Redis DB as event bus

While we could poll the main Postgres database for changes, that approach is inefficient. Instead, we use Redis as an event bus. Whenever the main database changes, we write a small event to Redis. Our polling service reads these events every second and relays them to clients with the app open.

Because we use Prisma as our ORM, we rely on Prisma Extensions to hook into database operations and write events to Redis. This is where the DatabaseService mentioned in the previous article comes into play.

DatabaseService.ts
1import { PrismaClient } from "../../../prisma/generated/client";
2import { PrismaNeon } from "@prisma/adapter-neon";
3import DatabaseEventsService, { type DBChange } from "./DatabaseEventsService";
4import type { BaseEntity } from "@/types";
5
6export default class DatabaseService {
7 static get prisma() {
8 return (this.#prisma ??= this.getClient());
9 }
10 static #prisma: ReturnType<typeof DatabaseService.getClient> | null = null;
11
12 private static getClient() {
13 const prisma = new PrismaClient({
14 adapter: new PrismaNeon({
15 connectionString: `${process.env.DATABASE_URL}`,
16 }),
17 });
18
19 DatabaseEventsService.beginEmitting();
20
21 return prisma.$extends({
22 name: "events",
23 query: {
24 $allModels: {
25 async $allOperations({ model, operation, args, query }) {
26 const allowedOperations = [
27 "create",
28 "update",
29 "delete",
30 "findMany",
31 "findUnique",
32 "findFirst",
33 "count",
34 ] as const;
35 type AllowedOperation = (typeof allowedOperations)[number];
36 if (!allowedOperations.includes(operation as AllowedOperation)) {
37 throw new Error(
38 `Unsupported database operation "${operation}" on model "${model}"`,
39 );
40 }
41 const result = (await query(args)) as BaseEntity | BaseEntity[];
42
43 const now = new Date().toISOString();
44 let change: DBChange | null = null;
45
46 const makeChange = (
47 entity: BaseEntity,
48 type: DBChange["type"],
49 ) => ({
50 id: entity.id,
51 entityType: entity.entityType,
52 date:
53 type === "delete"
54 ? now
55 : entity.updatedAt
56 ? new Date(entity.updatedAt).toISOString()
57 : now,
58 type,
59 });
60
61 switch (operation as AllowedOperation) {
62 case "create":
63 if ("entityType" in result)
64 change = makeChange(result, "create");
65 break;
66
67 case "update":
68 if ("entityType" in result)
69 change = makeChange(result, "update");
70 break;
71
72 case "delete":
73 if ("entityType" in result) {
74 change = makeChange(result, "delete");
75 // Automatically add __isDeleted flag to deletion results
76 Object.assign(result, { __isDeleted: true });
77 }
78 break;
79
80 case "findMany":
81 case "findUnique":
82 case "findFirst":
83 case "count":
84 // no events
85 break;
86
87 default:
88 console.warn(
89 `Unhandled Prisma operation: ${operation} for model: ${model}`,
90 );
91 break;
92 }
93
94 if (change) {
95 await DatabaseEventsService.createChanges([change]);
96 }
97
98 return result;
99 },
100 },
101 },
102 });
103 }
104}
1import { PrismaClient } from "../../../prisma/generated/client";
2import { PrismaNeon } from "@prisma/adapter-neon";
3import DatabaseEventsService, { type DBChange } from "./DatabaseEventsService";
4import type { BaseEntity } from "@/types";
5
6export default class DatabaseService {
7 static get prisma() {
8 return (this.#prisma ??= this.getClient());
9 }
10 static #prisma: ReturnType<typeof DatabaseService.getClient> | null = null;
11
12 private static getClient() {
13 const prisma = new PrismaClient({
14 adapter: new PrismaNeon({
15 connectionString: `${process.env.DATABASE_URL}`,
16 }),
17 });
18
19 DatabaseEventsService.beginEmitting();
20
21 return prisma.$extends({
22 name: "events",
23 query: {
24 $allModels: {
25 async $allOperations({ model, operation, args, query }) {
26 const allowedOperations = [
27 "create",
28 "update",
29 "delete",
30 "findMany",
31 "findUnique",
32 "findFirst",
33 "count",
34 ] as const;
35 type AllowedOperation = (typeof allowedOperations)[number];
36 if (!allowedOperations.includes(operation as AllowedOperation)) {
37 throw new Error(
38 `Unsupported database operation "${operation}" on model "${model}"`,
39 );
40 }
41 const result = (await query(args)) as BaseEntity | BaseEntity[];
42
43 const now = new Date().toISOString();
44 let change: DBChange | null = null;
45
46 const makeChange = (
47 entity: BaseEntity,
48 type: DBChange["type"],
49 ) => ({
50 id: entity.id,
51 entityType: entity.entityType,
52 date:
53 type === "delete"
54 ? now
55 : entity.updatedAt
56 ? new Date(entity.updatedAt).toISOString()
57 : now,
58 type,
59 });
60
61 switch (operation as AllowedOperation) {
62 case "create":
63 if ("entityType" in result)
64 change = makeChange(result, "create");
65 break;
66
67 case "update":
68 if ("entityType" in result)
69 change = makeChange(result, "update");
70 break;
71
72 case "delete":
73 if ("entityType" in result) {
74 change = makeChange(result, "delete");
75 // Automatically add __isDeleted flag to deletion results
76 Object.assign(result, { __isDeleted: true });
77 }
78 break;
79
80 case "findMany":
81 case "findUnique":
82 case "findFirst":
83 case "count":
84 // no events
85 break;
86
87 default:
88 console.warn(
89 `Unhandled Prisma operation: ${operation} for model: ${model}`,
90 );
91 break;
92 }
93
94 if (change) {
95 await DatabaseEventsService.createChanges([change]);
96 }
97
98 return result;
99 },
100 },
101 },
102 });
103 }
104}
  • The getClient method calls DatabaseEventsService.beginEmitting() to start emitting events. beginEmitting runs a setInterval that connects to Redis and periodically checks for new events. When a new event is found, it emits it via mitt.
  • prisma.$extends hooks into all Prisma model operations, determines whether an operation modifies data, and if so calls await DatabaseEventsService.createChanges([change]) to persist a change entry in Redis. The change captures creates, updates, and deletions:
export type DBChange = { id: string; entityType: EntityType; date: string; type: "create" | "update" | "delete"; };

The date field indicates when the change occurred.

  • For create and update operations, it uses the updatedAt DB field (Important: all write operations must set this field).
  • For delete operations, it uses the current time.

The delete operation also adds an __isDeleted property. The front end checks this property to hide the deleted entity by setting enumerable: false on the entity registry item (see the previous article).

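Operations like find... and count do not trigger changes and are passed through as-is. Any operation outside the allow-list in the listing above (create, update, delete, findMany, findUnique, findFirst, count) throws an "Unsupported database operation" error before the query runs.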

In addition to beginEmitting and createChanges, DatabaseEventsService has a private connect method, used internally to make sure the Redis client is connected, and a public emitter (a mitt instance). The polling service (DatabasePollService, discussed next) subscribes to the emitter to be notified about new events.

DatabaseEventsService.ts
1import { EntityType } from "../../../prisma/generated/client";
2import mitt from "mitt";
3import { createClient } from "redis";
4
5export type DBChange = {
6 id: string;
7 entityType: EntityType;
8 date: string;
9 type: "create" | "update" | "delete";
10};
11
12export default class DatabaseEventsService {
13 public static readonly DB_KEY = "db_updates";
14
15 private static readonly INTERVAL = 1_000;
16 private static lastTimestamp = Date.now();
17
18 private static redisClient = createClient({
19 url: process.env.REDIS_URL,
20 });
21
22 public static emitter = mitt<{
23 [DatabaseEventsService.DB_KEY]: DBChange[];
24 }>();
25
26 // ensure Redis is connected
27 private static async connect() {
28 if (!this.redisClient.isOpen) {
29 await this.redisClient.connect();
30 this.redisClient.on("error", (err) => {
31 console.error("Redis Client Error", err);
32 });
33 }
34 }
35
36 // push one update into our ZSET, with score = timestamp
37 public static async createChanges(changes: DBChange[]) {
38 if (changes.length === 0) return;
39
40 await this.connect();
41
42 // build array of { score, value } objects
43 const entries = changes.map(({ id, entityType, type, date }) => ({
44 score: Date.now(),
45 value: JSON.stringify({ id, entityType, date, type }),
46 }));
47
48 // one multi(): batch ZADD + EXPIRE
49 await this.redisClient
50 .multi()
51 .zAdd(this.DB_KEY, entries)
52 .expire(this.DB_KEY, (this.INTERVAL * 60) / 1000)
53 .exec();
54 }
55
56 public static beginEmitting() {
57 setInterval(async () => {
58 await this.connect();
59
60 const now = Date.now();
61
62 // get everything with score ∈ (lastTimestamp, now]
63 const raw = await this.redisClient.zRangeByScore(
64 this.DB_KEY,
65 this.lastTimestamp + 1,
66 now,
67 );
68
69 this.lastTimestamp = now;
70
71 if (raw.length > 0) {
72 const updates = raw.map((s) => JSON.parse(s) as DBChange);
73 this.emitter.emit(this.DB_KEY, updates);
74 }
75 }, this.INTERVAL);
76 }
77}
1import { EntityType } from "../../../prisma/generated/client";
2import mitt from "mitt";
3import { createClient } from "redis";
4
5export type DBChange = {
6 id: string;
7 entityType: EntityType;
8 date: string;
9 type: "create" | "update" | "delete";
10};
11
12export default class DatabaseEventsService {
13 public static readonly DB_KEY = "db_updates";
14
15 private static readonly INTERVAL = 1_000;
16 private static lastTimestamp = Date.now();
17
18 private static redisClient = createClient({
19 url: process.env.REDIS_URL,
20 });
21
22 public static emitter = mitt<{
23 [DatabaseEventsService.DB_KEY]: DBChange[];
24 }>();
25
26 // ensure Redis is connected
27 private static async connect() {
28 if (!this.redisClient.isOpen) {
29 await this.redisClient.connect();
30 this.redisClient.on("error", (err) => {
31 console.error("Redis Client Error", err);
32 });
33 }
34 }
35
36 // push one update into our ZSET, with score = timestamp
37 public static async createChanges(changes: DBChange[]) {
38 if (changes.length === 0) return;
39
40 await this.connect();
41
42 // build array of { score, value } objects
43 const entries = changes.map(({ id, entityType, type, date }) => ({
44 score: Date.now(),
45 value: JSON.stringify({ id, entityType, date, type }),
46 }));
47
48 // one multi(): batch ZADD + EXPIRE
49 await this.redisClient
50 .multi()
51 .zAdd(this.DB_KEY, entries)
52 .expire(this.DB_KEY, (this.INTERVAL * 60) / 1000)
53 .exec();
54 }
55
56 public static beginEmitting() {
57 setInterval(async () => {
58 await this.connect();
59
60 const now = Date.now();
61
62 // get everything with score ∈ (lastTimestamp, now]
63 const raw = await this.redisClient.zRangeByScore(
64 this.DB_KEY,
65 this.lastTimestamp + 1,
66 now,
67 );
68
69 this.lastTimestamp = now;
70
71 if (raw.length > 0) {
72 const updates = raw.map((s) => JSON.parse(s) as DBChange);
73 this.emitter.emit(this.DB_KEY, updates);
74 }
75 }, this.INTERVAL);
76 }
77}

Polling controller and service

With Redis change entries and the change emitter in place, we can implement a polling endpoint that streams updates to clients in real time. The DatabasePollController exposes a single JSONLines endpoint, and DatabasePollService uses a JSONLinesResponse instance (received from the controller) to send data to clients. The service closes the connection safely after 30 seconds, so clients should reconnect.

DatabasePollService.ts
DatabasePollController.ts
1import { JSONLinesResponse, type VovkIteration } from "vovk";
2import { forEach, groupBy } from "lodash";
3import type DatabasePollController from "./DatabasePollController";
4import DatabaseEventsService, { type DBChange } from "./DatabaseEventsService";
5import DatabaseService from "./DatabaseService";
6
7export default class PollService {
8 static poll(
9 resp: JSONLinesResponse<VovkIteration<typeof DatabasePollController.poll>>,
10 ) {
11 setTimeout(resp.close, 30_000);
12
13 let asOldAs = new Date();
14 // 10 minutes ago; TODO: use latest update date from registry
15 asOldAs.setMinutes(asOldAs.getMinutes() - 10);
16
17 DatabaseEventsService.emitter.on(
18 DatabaseEventsService.DB_KEY,
19 (changes) => {
20 const deleted = changes.filter((change) => change.type === "delete");
21 const createdOrUpdated = changes.filter(
22 (change) => change.type === "create" || change.type === "update",
23 );
24
25 for (const deletedEntity of deleted) {
26 resp.send({
27 id: deletedEntity.id,
28 entityType: deletedEntity.entityType,
29 __isDeleted: true,
30 });
31 }
32 // group by entityType and date, so the date is maximum date for the given entity: { entityType: string, date: string }[]
33 forEach(groupBy(createdOrUpdated, "entityType"), (changes) => {
34 const maxDateItem = changes.reduce(
35 (max, change) => {
36 const changeDate = new Date(change.date);
37 return changeDate.getTime() > new Date(max.date).getTime()
38 ? change
39 : max;
40 },
41 { date: new Date(0) } as unknown as DBChange,
42 );
43
44 if (new Date(maxDateItem.date).getTime() > asOldAs.getTime()) {
45 void DatabaseService.prisma[maxDateItem.entityType as "user"]
46 .findMany({
47 where: {
48 updatedAt: {
49 gt: asOldAs,
50 },
51 },
52 })
53 .then((entities) => {
54 for (const entity of entities) {
55 resp.send(entity);
56 }
57 });
58 asOldAs = new Date(maxDateItem.date);
59 }
60 });
61 },
62 );
63 }
64}
1import { JSONLinesResponse, type VovkIteration } from "vovk";
2import { forEach, groupBy } from "lodash";
3import type DatabasePollController from "./DatabasePollController";
4import DatabaseEventsService, { type DBChange } from "./DatabaseEventsService";
5import DatabaseService from "./DatabaseService";
6
7export default class PollService {
8 static poll(
9 resp: JSONLinesResponse<VovkIteration<typeof DatabasePollController.poll>>,
10 ) {
11 setTimeout(resp.close, 30_000);
12
13 let asOldAs = new Date();
14 // 10 minutes ago; TODO: use latest update date from registry
15 asOldAs.setMinutes(asOldAs.getMinutes() - 10);
16
17 DatabaseEventsService.emitter.on(
18 DatabaseEventsService.DB_KEY,
19 (changes) => {
20 const deleted = changes.filter((change) => change.type === "delete");
21 const createdOrUpdated = changes.filter(
22 (change) => change.type === "create" || change.type === "update",
23 );
24
25 for (const deletedEntity of deleted) {
26 resp.send({
27 id: deletedEntity.id,
28 entityType: deletedEntity.entityType,
29 __isDeleted: true,
30 });
31 }
32 // group by entityType and date, so the date is maximum date for the given entity: { entityType: string, date: string }[]
33 forEach(groupBy(createdOrUpdated, "entityType"), (changes) => {
34 const maxDateItem = changes.reduce(
35 (max, change) => {
36 const changeDate = new Date(change.date);
37 return changeDate.getTime() > new Date(max.date).getTime()
38 ? change
39 : max;
40 },
41 { date: new Date(0) } as unknown as DBChange,
42 );
43
44 if (new Date(maxDateItem.date).getTime() > asOldAs.getTime()) {
45 void DatabaseService.prisma[maxDateItem.entityType as "user"]
46 .findMany({
47 where: {
48 updatedAt: {
49 gt: asOldAs,
50 },
51 },
52 })
53 .then((entities) => {
54 for (const entity of entities) {
55 resp.send(entity);
56 }
57 });
58 asOldAs = new Date(maxDateItem.date);
59 }
60 });
61 },
62 );
63 }
64}
  • When a delete DB change is emitted via DatabaseEventsService.emitter, the service sends an event with id, entityType, and __isDeleted: true. The front end uses __isDeleted to hide the entity by making it non-enumerable in the registry.
  • When an update or create change is emitted, the service queries Postgres for the entities of that type whose updatedAt is later than the last sync point (Redis stores only change metadata, not the entities themselves) and sends them to clients.

Client-side logic

On the client side (for example, in a React component), call DatabasePollRPC.poll() to receive a stream of database events. As with any JSONLines RPC method, it returns an async iterable that you can consume in a for await loop. Because the server may close the connection or a network error may occur, wrap the logic in a retry loop and avoid reconnecting on AbortError. Since the fetcher is already configured, the loop body can be empty—you do not need to handle data manually.

In addition to the polling logic, the front-end code includes an on/off toggle whose state is saved to localStorage, so the user can enable or disable polling as needed.

const [isPollingEnabled, setIsPollingEnabled] = useState(false); const pollingAbortControllerRef = useRef<AbortController | null>(null); useEffect(() => { const isEnabled = localStorage.getItem("isPollingEnabled"); setIsPollingEnabled(isEnabled === "true"); }, []); useEffect(() => { localStorage.setItem("isPollingEnabled", isPollingEnabled.toString()); async function poll(retries = 0) { if (!isPollingEnabled) { pollingAbortControllerRef.current?.abort(); return; } try { while (true) { console.log("START POLLING"); const iterable = await DatabasePollRPC.poll(); pollingAbortControllerRef.current = iterable.abortController; for await (const iteration of iterable) { console.log("New DB update:", iteration); } } } catch (error) { if ( retries < 5 && (error as Error & { cause?: Error }).cause?.name !== "AbortError" ) { console.error("Polling failed, retrying...", error); await new Promise((resolve) => setTimeout(resolve, 2000)); return poll(retries + 1); } } } void poll(); return () => { pollingAbortControllerRef.current?.abort(); }; }, [isPollingEnabled]);

Bonus: Telegram bot (unstable)

As an example of a third-party source of database changes, see the TelegramService and the accompanying TelegramController. It accepts text or voice messages, performs voice-to-text transcription if needed using the OpenAI Whisper API, and uses the same createLLMTools function on the server:

const { tools } = createLLMTools({ modules: { UserController, TaskController, }, });

The Telegram API library is implemented with OpenAPI Mixins and used as a TelegramRPC module to call Telegram API methods.

vovk.config.mjs
// @ts-check /** @type {import('vovk').VovkConfig} */ const config = { // ... outputConfig: { // ... segments: { telegram: { openAPIMixin: { source: { url: "https://raw.githubusercontent.com/sys-001/telegram-bot-api-versions/refs/heads/main/files/openapi/yaml/v183.yaml", fallback: ".openapi-cache/telegram.yaml", }, getModuleName: "TelegramRPC", getMethodName: ({ path }) => path.replace(/^\//, ""), errorMessageKey: "description", }, }, }, }, }; export default config;

The TelegramService class handles interaction with the Telegram API and generates AI responses using the Vercel AI SDK.

src/modules/telegram/TelegramService.ts
import OpenAI from "openai"; import { TelegramRPC } from "vovk-client"; const openai = new OpenAI(); export default class TelegramService { static get apiRoot() { const TELEGRAM_BOT_TOKEN = process.env.TELEGRAM_BOT_TOKEN; if (!TELEGRAM_BOT_TOKEN) { throw new Error("Missing TELEGRAM_BOT_TOKEN environment variable"); } return `https://api.telegram.org/bot${TELEGRAM_BOT_TOKEN}`; } // ... private static async generateAIResponse( chatId: number, userMessage: string, systemPrompt: string, ): Promise<{ botResponse: string; messages: ModelMessage[] }> { // Get chat history const history = await this.getChatHistory(chatId); const messages = [ ...this.formatHistoryForVercelAI(history), { role: "user", content: userMessage } as const, ]; const { tools } = createLLMTools({ modules: { UserController, TaskController, }, }); // Generate a response using Vercel AI SDK const { text } = await generateText({ model: vercelOpenAI("gpt-5"), system: systemPrompt, messages, stopWhen: stepCountIs(16), tools: { ...Object.fromEntries( tools.map(({ name, execute, description, parameters }) => [ name, tool<KnownAny, KnownAny>({ execute, description, inputSchema: jsonSchema(parameters as KnownAny), }), ]), ), }, }); const botResponse = text || "I couldn't generate a response."; // Add user message to history await this.addToHistory(chatId, "user", userMessage); // Add assistant response to history await this.addToHistory(chatId, "assistant", botResponse); messages.push({ role: "assistant", content: botResponse, }); return { botResponse, messages }; } private static async sendTextMessage( chatId: number, text: string, ): Promise<void> { await TelegramRPC.sendMessage({ body: { chat_id: chatId, text: text, parse_mode: "html", }, apiRoot: this.apiRoot, }); } private static async sendVoiceMessage( chatId: number, text: string, ): Promise<void> { try { // Generate speech from text using OpenAI TTS const speechResponse = await openai.audio.speech.create({ model: "tts-1", voice: "alloy", input: text, 
response_format: "opus", }); // Convert the response to a Buffer const voiceBuffer = Buffer.from(await speechResponse.arrayBuffer()); const formData = new FormData(); formData.append("chat_id", String(chatId)); formData.append( "voice", new Blob([voiceBuffer], { type: "audio/ogg" }), "voice.ogg", ); // Send the voice message await TelegramRPC.sendVoice({ body: formData, apiRoot: this.apiRoot, }); } catch (error) { console.error("Error generating voice message:", error); // Fallback to text message if voice generation fails await this.sendTextMessage(chatId, text); } } // ... }
Last updated on