Real-time polling (experimental)

In the previous article we covered how to use the LLM Real-time API to get instant updates in the UI while the user talks to the app. On this page we cover an experimental polling feature that delivers changes made on the server by other users or by third-party services, such as MCP, so that the UI stays in sync with the server. To keep the app deployable to any hosting provider, we avoid non-HTTP protocols such as WebSockets and use simple HTTP polling powered by JSONLines instead.
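To make the wire format concrete, here is a minimal sketch of how a JSONLines stream can be consumed incrementally. The helper name `createJsonLinesParser` is hypothetical and not part of any library; the RPC client used later on this page exposes the stream as an async iterable, so this only illustrates the idea that every complete line is one JSON document.

```typescript
// Hypothetical helper (not a library API): incrementally parses a JSON Lines stream.
// Each complete line is one JSON document; a partial trailing line is buffered.
function createJsonLinesParser<T>(onItem: (item: T) => void) {
  let buffer = "";
  return {
    // Feed a decoded text chunk as it arrives from the HTTP response body.
    push(chunk: string): void {
      buffer += chunk;
      const lines = buffer.split("\n");
      // The last element is either "" or an incomplete line; keep it for the next chunk.
      buffer = lines.pop() ?? "";
      for (const line of lines) {
        const trimmed = line.trim();
        if (trimmed) onItem(JSON.parse(trimmed) as T);
      }
    },
  };
}

// Usage: network chunks may split a line at any position.
const received: Array<{ id: string }> = [];
const parser = createJsonLinesParser<{ id: string }>((item) => {
  received.push(item);
});
parser.push('{"id":"a"}\n{"id"');
parser.push(':"b"}\n');
```

Because each line is self-contained, the client can act on an update as soon as its line is complete, without waiting for the response to end.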

Redis DB as event bus

In theory, we could poll the main database for changes directly, but that would be inefficient in terms of both DB load and network traffic. Instead, we use a separate Redis database as an event bus: whenever something changes in the main database, we write a small event to Redis, and the polling service reads these events and forwards them to the clients.
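The event bus idea can be sketched without a Redis server. The class below is a hypothetical in-memory stand-in, not the actual DatabaseEventsService (which is not shown on this page): it keeps events ordered by timestamp and lets a reader ask for everything newer than a given moment. The `DBChange` field names follow the ones used by the code on this page; the field types are assumptions.

```typescript
// Shape of one event; field types are assumed for illustration.
type DBChange = {
  id: string;
  entityType: string;
  date: string; // ISO 8601 timestamp
  type: "create" | "update" | "delete";
};

// Hypothetical in-memory stand-in for the Redis event bus.
class InMemoryEventBus {
  private events: { score: number; change: DBChange }[] = [];

  // Store each change with its timestamp as a sortable score.
  write(changes: DBChange[]): void {
    for (const change of changes) {
      this.events.push({ score: new Date(change.date).getTime(), change });
    }
    this.events.sort((a, b) => a.score - b.score);
  }

  // Return changes strictly newer than the given timestamp (ms since epoch).
  readSince(sinceMs: number): DBChange[] {
    return this.events.filter((e) => e.score > sinceMs).map((e) => e.change);
  }

  // Drop events older than the retention window so the bus stays small.
  prune(olderThanMs: number): void {
    this.events = this.events.filter((e) => e.score >= olderThanMs);
  }
}

const bus = new InMemoryEventBus();
bus.write([
  { id: "1", entityType: "user", date: "2025-01-01T00:00:00.000Z", type: "create" },
  { id: "2", entityType: "task", date: "2025-01-01T00:00:05.000Z", type: "update" },
]);
const fresh = bus.readSince(Date.parse("2025-01-01T00:00:01.000Z"));
```

The events carry only identifiers and timestamps, so readers stay cheap; the full entities are fetched from the main database only when something actually changed.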

Since we use Prisma as our main database ORM, we can use Prisma Extensions to hook into database operations and write events to Redis. That’s where the DatabaseService mentioned in the previous article comes into play.

DatabaseService.ts
// DatabaseService.ts
import { PrismaClient } from "@prisma/client";
import { PrismaNeon } from "@prisma/adapter-neon";
import { neonConfig } from "@neondatabase/serverless";
import DBEventsService, { DBChange } from "./DatabaseEventsService";
import { BaseEntity } from "@/types";

// Outside Vercel, connect through a WebSocket proxy on port 5433 without TLS.
if (!process.env.VERCEL_ENV) {
  neonConfig.wsProxy = (host) => `${host}:5433/v1`;
  neonConfig.useSecureWebSocket = false;
  neonConfig.pipelineTLS = false;
  neonConfig.pipelineConnect = false;
}

export default class DatabaseService {
  // Lazily created, shared extended Prisma client.
  static get prisma() {
    return (this.#client ??= this.getClient());
  }
  static #client: ReturnType<typeof DatabaseService.getClient> | null = null;

  private static getClient() {
    const adapter = new PrismaNeon({
      connectionString: `${process.env.DATABASE_URL}`,
    });
    const prisma = new PrismaClient({ adapter });

    DBEventsService.startPolling();

    return prisma.$extends({
      name: "events",
      query: {
        $allModels: {
          async $allOperations({ model, operation, args, query }) {
            const result = (await query(args)) as BaseEntity;
            // Results that are not a single entity (lists, counts, null) produce no event.
            if (!result?.entityType) return result;

            const now = new Date().toISOString();
            const changes: DBChange[] = [];

            const makeChange = (
              entity: BaseEntity,
              type: DBChange["type"],
            ): DBChange => ({
              id: entity.id,
              entityType: entity.entityType,
              // Use an ISO string so milliseconds are preserved;
              // Date.prototype.toString() truncates to whole seconds.
              date:
                type === "delete" || !entity.updatedAt
                  ? now
                  : new Date(entity.updatedAt).toISOString(),
              type,
            });

            switch (operation) {
              case "create":
                if (result.id) changes.push(makeChange(result, "create"));
                break;

              case "update":
                if (result.id) changes.push(makeChange(result, "update"));
                break;

              case "delete":
                if (result.id) changes.push(makeChange(result, "delete"));
                break;

              // other operations like deleteMany should be implemented separately

              default:
                console.warn(
                  `Unhandled Prisma operation: ${operation} for model: ${model}`,
                );
                break;
            }

            // Publish the change to the Redis event bus.
            if (changes.length) {
              await DBEventsService.write(changes);
            }

            return result;
          },
        },
      },
    });
  }
}

As you can see, we create a Prisma client extension that hooks into the create, update, and delete operations of all models. Whenever one of these operations is performed, the DatabaseEventsService class writes an event to Redis containing the entity id, entity type, date, and operation type. This is enough information for the polling service to know what has changed.
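The mapping from an operation result to an event can also be expressed as a pure function, which makes it easy to test without a database. The sketch below is an illustration only: `toChange`, the local `BaseEntity` and `DBChange` types, and the field types are assumptions modelled on the fields described above, not the project's actual definitions.

```typescript
type ChangeType = "create" | "update" | "delete";

// Assumed shapes, modelled on the fields described above.
interface BaseEntity {
  id: string;
  entityType: string;
  updatedAt?: Date | string;
}

interface DBChange {
  id: string;
  entityType: string;
  date: string; // ISO 8601 timestamp
  type: ChangeType;
}

const TRACKED: readonly ChangeType[] = ["create", "update", "delete"];

function isTracked(operation: string): operation is ChangeType {
  return TRACKED.some((tracked) => tracked === operation);
}

// Hypothetical helper: returns the event for a tracked operation, or null otherwise.
function toChange(operation: string, entity: BaseEntity, now: Date): DBChange | null {
  if (!isTracked(operation)) return null;
  // Deleted rows have no meaningful updatedAt, so the current time is used instead.
  const date =
    operation === "delete" || !entity.updatedAt
      ? now.toISOString()
      : new Date(entity.updatedAt).toISOString();
  return { id: entity.id, entityType: entity.entityType, date, type: operation };
}

const now = new Date("2025-01-01T00:00:10.000Z");
const task: BaseEntity = {
  id: "42",
  entityType: "task",
  updatedAt: new Date("2025-01-01T00:00:05.123Z"),
};
const updated = toChange("update", task, now);
const deleted = toChange("delete", task, now);
const ignored = toChange("findMany", task, now);
```

Here `updated` carries the entity's own `updatedAt`, `deleted` carries the current time, and `ignored` is `null` because reads do not produce events.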

Polling endpoint

Now that we have a way to write events to Redis, we need a way to read them and send them to the clients. This is where the DatabasePollService and DatabasePollController come into play. The controller implements a single JSONLines endpoint that streams data to the clients as it becomes available. The service uses the JSONLinesResponse instance passed from the controller to send data to the clients and closes the connection after 30 seconds, so the clients need to reconnect.

DatabasePollService.ts
import { JSONLinesResponse, VovkIteration } from "vovk";
import type DatabasePollController from "./DatabasePollController";
import DatabaseEventsService, { DBChange } from "./DatabaseEventsService";
import DatabaseService from "./DatabaseService";
import { forEach, groupBy } from "lodash";

export default class DatabasePollService {
  static poll(
    resp: JSONLinesResponse<VovkIteration<typeof DatabasePollController.poll>>,
  ) {
    // Only entities updated after this moment are sent to the client.
    let asOldAs = new Date();

    const onChanges = (changes: DBChange[]) => {
      const deleted = changes.filter((change) => change.type === "delete");
      const createdOrUpdated = changes.filter(
        (change) => change.type === "create" || change.type === "update",
      );

      // Deletions are sent as lightweight markers; there is nothing left to fetch.
      for (const deletedEntity of deleted) {
        resp.send({
          id: deletedEntity.id,
          entityType: deletedEntity.entityType,
          __isDeleted: true,
        });
      }

      // Group by entityType and pick the change with the latest date in each group.
      forEach(groupBy(createdOrUpdated, "entityType"), (changes) => {
        const maxDateItem = changes.reduce(
          (max, change) => {
            const changeDate = new Date(change.date);
            return changeDate.getTime() > new Date(max.date).getTime()
              ? change
              : max;
          },
          { date: new Date(0) } as unknown as DBChange,
        );

        if (new Date(maxDateItem.date).getTime() > asOldAs.getTime()) {
          // Fetch the full entities from the main database and stream them.
          void DatabaseService.prisma[maxDateItem.entityType as "user"]
            .findMany({
              where: {
                updatedAt: {
                  gte: asOldAs,
                },
              },
            })
            .then((entities) => {
              for (const entity of entities) {
                resp.send(entity);
              }
            })
            .catch((error) => console.error("Polling query failed", error));
          asOldAs = new Date(maxDateItem.date);
        }
      });
    };

    DatabaseEventsService.emitter.on("db_updates", onChanges);

    // Close the stream after 30 seconds; the client is expected to reconnect.
    setTimeout(() => {
      // Assumes an EventEmitter-style off(); without it the listener would leak.
      DatabaseEventsService.emitter.off("db_updates", onChanges);
      resp.close();
    }, 30_000);
  }
}
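
The grouping step in the service picks, for each entity type, the most recent create or update event. As a rough sketch of that step in isolation, the same selection can be written without lodash. `latestPerEntityType` and the local `DBChange` type are hypothetical names used for illustration.

```typescript
// Assumed event shape, matching the fields used by the service.
type DBChange = {
  id: string;
  entityType: string;
  date: string; // ISO 8601 timestamp
  type: "create" | "update" | "delete";
};

// Hypothetical helper: newest create/update event per entity type.
function latestPerEntityType(changes: DBChange[]): Map<string, DBChange> {
  const latest = new Map<string, DBChange>();
  for (const change of changes) {
    // Deletions are handled separately and never trigger a refetch.
    if (change.type === "delete") continue;
    const current = latest.get(change.entityType);
    if (!current || Date.parse(change.date) > Date.parse(current.date)) {
      latest.set(change.entityType, change);
    }
  }
  return latest;
}

const latest = latestPerEntityType([
  { id: "1", entityType: "task", date: "2025-01-01T00:00:01.000Z", type: "create" },
  { id: "2", entityType: "task", date: "2025-01-01T00:00:03.000Z", type: "update" },
  { id: "3", entityType: "user", date: "2025-01-01T00:00:02.000Z", type: "update" },
  { id: "4", entityType: "user", date: "2025-01-01T00:00:09.000Z", type: "delete" },
]);
```

One query per entity type is then enough to fetch every row updated since the last cursor, regardless of how many events arrived in the batch.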

Client-side logic

On the client side, anywhere in the app (e.g. in a React component), we can call the DatabasePollRPC.poll() method to get a stream of database events. Like any JSONLines RPC method, it returns an async iterable that we can consume in a for await loop to receive new events as they arrive. Since the connection can be closed by the server or fail due to network issues, we wrap the polling logic in a retry loop with exponential backoff. As the fetcher is already set up, the body of the loop can be empty, because we don’t need to handle the data manually.

useEffect(() => {
  async function poll(retries = 0) {
    try {
      // Reconnect whenever the server closes the stream.
      while (true) {
        using iterable = await DatabasePollRPC.poll();
        for await (const iteration of iterable) {
          // Logging only; the body can stay empty.
          console.log("New DB update:", iteration);
        }
      }
    } catch (error) {
      if (retries < 5) {
        console.error("Polling failed, retrying...", error);
        // Exponential backoff: 2s, 4s, 8s, 16s, 32s.
        await new Promise((resolve) =>
          setTimeout(resolve, 2000 * 2 ** retries),
        );
        return poll(retries + 1);
      }
    }
  }

  void poll();
}, []);
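
The retry loop with exponential backoff described above can also be factored into a small reusable helper. This is a sketch under assumptions: `backoffDelay` and `pollWithBackoff` are hypothetical names, and the 2-second base delay, 30-second cap, and limit of 5 retries are illustrative values rather than settings taken from the project.

```typescript
// Delay before retry number `attempt` (0-based): base * 2^attempt, capped at maxMs.
function backoffDelay(attempt: number, baseMs = 2000, maxMs = 30_000): number {
  return Math.min(maxMs, baseMs * 2 ** attempt);
}

// Hypothetical helper: keeps reconnecting, backing off after consecutive failures.
async function pollWithBackoff(
  connect: () => Promise<void>,
  maxRetries = 5,
  sleep: (ms: number) => Promise<void> = (ms) =>
    new Promise((resolve) => setTimeout(resolve, ms)),
): Promise<void> {
  let attempt = 0;
  while (true) {
    try {
      // Resolves when the server closes the stream normally.
      await connect();
      // A clean session resets the backoff.
      attempt = 0;
    } catch (error) {
      // Give up after maxRetries consecutive failures.
      if (attempt >= maxRetries) throw error;
      await sleep(backoffDelay(attempt));
      attempt += 1;
    }
  }
}

const delays = [0, 1, 2, 3, 4].map((attempt) => backoffDelay(attempt));
```

With these defaults the delays are 2, 4, 8, 16, and 30 seconds. In a component, `connect` would open the stream with DatabasePollRPC.poll() and iterate it until the server closes the connection.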

Practical usage: Telegram bot

As an example of a third-party source of database changes, you can explore the TelegramService implementation and a small TelegramController. Coming soon…
