Real-time polling (experimental)
In the previous article we covered how to use the LLM Real-time API to get instant updates in the UI while the user talks to the app. On this page we'll cover an experimental polling feature that delivers changes made on the server by other users or third-party services, such as MCP, so that the UI stays in sync with the server. To keep the app deployable to any hosting provider, we avoid non-HTTP protocols such as WebSockets and use plain HTTP polling powered by JSONLines instead.
Redis DB as event bus
Theoretically, we could just poll the main database for changes, but that would be inefficient in terms of DB load and network traffic. Instead, we use a separate Redis database as an event bus. Whenever something changes in the main database, we write a small event to Redis, and our polling service reads these events and sends them to the clients.
Since we use Prisma as our main database ORM, we can use Prisma Extensions to hook into database operations and write events to Redis. That's where the DatabaseService mentioned in the previous article comes into play.
// prismaService.ts
import { PrismaClient } from "@prisma/client";
import { PrismaNeon } from "@prisma/adapter-neon";
import { neonConfig } from "@neondatabase/serverless";
import DBEventsService, { DBChange } from "./DatabaseEventsService";
import { BaseEntity } from "@/types";

if (!process.env.VERCEL_ENV) {
  neonConfig.wsProxy = (host) => `${host}:5433/v1`;
  neonConfig.useSecureWebSocket = false;
  neonConfig.pipelineTLS = false;
  neonConfig.pipelineConnect = false;
}

export default class DatabaseService {
  static get prisma() {
    return (this.#client ??= this.getClient());
  }
  static #client: ReturnType<typeof DatabaseService.getClient> | null = null;

  private static getClient() {
    const adapter = new PrismaNeon({
      connectionString: `${process.env.DATABASE_URL}`,
    });
    const prisma = new PrismaClient({ adapter });

    DBEventsService.startPolling();

    return prisma.$extends({
      name: "events",
      query: {
        $allModels: {
          async $allOperations({ model, operation, args, query }) {
            const result = (await query(args)) as BaseEntity;
            if (!result?.entityType) return result;

            const now = new Date().toISOString();
            const changes: DBChange[] = [];

            const makeChange = (
              entity: BaseEntity,
              type: DBChange["type"],
            ) => ({
              id: entity.id,
              entityType: entity.entityType,
              date:
                type === "delete" ? now : (entity.updatedAt?.toString() ?? now),
              type,
            });

            switch (operation) {
              case "create":
                if (result.id) changes.push(makeChange(result, "create"));
                break;

              case "update":
                if (result.id) changes.push(makeChange(result, "update"));
                break;

              case "delete":
                if (result.id) changes.push(makeChange(result, "delete"));
                break;

              // other operations like deleteMany should be implemented separately

              default:
                console.warn(
                  `Unhandled Prisma operation: ${operation} for model: ${model}`,
                );
                break;
            }

            if (changes.length) {
              await DBEventsService.write(changes);
            }

            return result;
          },
        },
      },
    });
  }
}
import { EntityType } from "@prisma/client";
import mitt from "mitt";
import { createClient } from "redis";

export type DBChange = {
  id: string;
  entityType: EntityType;
  date: string;
  type: "create" | "update" | "delete";
};

export default class DatabaseEventsService {
  private static readonly DB_KEY = "db_updates";
  private static readonly INTERVAL = 1_000;
  private static lastTimestamp = Date.now();

  private static redisClient = createClient({
    url: process.env.REDIS_URL,
  });

  // our in-process event emitter
  public static emitter = mitt<{
    db_updates: DBChange[];
  }>();

  // ensure Redis is connected
  private static async connect() {
    if (!this.redisClient.isOpen) {
      // register the error handler before connecting,
      // so that errors raised while connecting are logged too
      this.redisClient.on("error", (err) => {
        console.error("Redis Client Error", err);
      });
      await this.redisClient.connect();
    }
  }

  // push a batch of updates into our ZSET, with score = timestamp
  public static async write(changes: DBChange[]) {
    if (changes.length === 0) return;

    await this.connect();

    // build array of { score, value } objects
    const entries = changes.map(({ id, entityType, type, date }) => ({
      score: Date.now(),
      value: JSON.stringify({ id, entityType, date, type }),
    }));

    // one multi(): batch ZADD + EXPIRE (the key expires after 60 seconds)
    await this.redisClient
      .multi()
      .zAdd(this.DB_KEY, entries)
      .expire(this.DB_KEY, (this.INTERVAL * 60) / 1000)
      .exec();
  }

  // start polling loop
  public static startPolling() {
    setInterval(async () => {
      await this.connect();

      const now = Date.now();

      // 1 single command: get everything with score ∈ (lastTimestamp, now]
      const raw = await this.redisClient.zRangeByScore(
        this.DB_KEY,
        this.lastTimestamp + 1,
        now,
      );

      if (raw.length > 0) {
        const updates = raw.map((s) => JSON.parse(s) as DBChange);

        // advance our cursor
        this.lastTimestamp = now;

        // emit
        this.emitter.emit("db_updates", updates);
      }
    }, this.INTERVAL);
  }
}
As you can see, we create a Prisma client extension that hooks into the create, update, and delete operations for all models. Whenever one of these operations is performed, we use the DatabaseEventsService class to write an event to Redis containing the entity id, entity type, date, and operation type. This is enough information for our polling service to know what has changed.
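To make the cursor logic of the event bus easier to follow, here is a minimal in-memory sketch of the same idea. InMemoryEventLog and readSince are hypothetical names used only for illustration: they stand in for the Redis sorted set and for one polling step, and are not part of the project.

```typescript
type DBChange = {
  id: string;
  entityType: string;
  date: string;
  type: "create" | "update" | "delete";
};

// Hypothetical in-memory stand-in for the Redis sorted set (ZSET).
class InMemoryEventLog {
  private entries: { score: number; value: string }[] = [];

  // Counterpart of ZADD: members are unique, so re-adding the same
  // serialized value only updates its score.
  add(score: number, change: DBChange): void {
    const value = JSON.stringify(change);
    this.entries = this.entries.filter((entry) => entry.value !== value);
    this.entries.push({ score, value });
    this.entries.sort((a, b) => a.score - b.score);
  }

  // Counterpart of ZRANGEBYSCORE: both bounds are inclusive.
  rangeByScore(min: number, max: number): DBChange[] {
    return this.entries
      .filter((entry) => entry.score >= min && entry.score <= max)
      .map((entry) => JSON.parse(entry.value) as DBChange);
  }
}

// One polling step: read everything with a score in (cursor, now]
// and advance the cursor only when something was found.
function readSince(log: InMemoryEventLog, cursor: number, now: number) {
  const changes = log.rangeByScore(cursor + 1, now);
  return { changes, cursor: changes.length > 0 ? now : cursor };
}
```

Passing `cursor + 1` as the lower bound is what turns the inclusive range query into a half-open interval, so an event that was already delivered is not read again on the next tick.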
Polling endpoint
Now that we have a way to write events to Redis, we need a way to read them and send them to the clients. This is where the DatabasePollService and DatabasePollController come into play. The controller implements a single JSONLines endpoint that streams data to the clients as it becomes available. The service uses the JSONLinesResponse instance passed from the controller to send data to the clients and closes the connection after 30 seconds, so the clients need to reconnect.
import { JSONLinesResponse, VovkIteration } from "vovk";
import type DatabasePollController from "./DatabasePollController";
import DatabaseEventsService, { DBChange } from "./DatabaseEventsService";
import DatabaseService from "./DatabaseService";
import { forEach, groupBy } from "lodash";

export default class DatabasePollService {
  static poll(
    resp: JSONLinesResponse<VovkIteration<typeof DatabasePollController.poll>>,
  ) {
    let asOldAs = new Date();

    const onUpdates = (changes: DBChange[]) => {
      const deleted = changes.filter((change) => change.type === "delete");
      const createdOrUpdated = changes.filter(
        (change) => change.type === "create" || change.type === "update",
      );

      for (const deletedEntity of deleted) {
        resp.send({
          id: deletedEntity.id,
          entityType: deletedEntity.entityType,
          __isDeleted: true,
        });
      }

      // group by entityType and pick the change with the latest date in each group
      forEach(groupBy(createdOrUpdated, "entityType"), (changes) => {
        const maxDateItem = changes.reduce(
          (max, change) => {
            const changeDate = new Date(change.date);
            return changeDate.getTime() > new Date(max.date).getTime()
              ? change
              : max;
          },
          { date: new Date(0) } as unknown as DBChange,
        );

        if (new Date(maxDateItem.date).getTime() > asOldAs.getTime()) {
          void DatabaseService.prisma[maxDateItem.entityType as "user"]
            .findMany({
              where: {
                updatedAt: {
                  gte: asOldAs,
                },
              },
            })
            .then((entities) => {
              for (const entity of entities) {
                resp.send(entity);
              }
            });
          asOldAs = new Date(maxDateItem.date);
        }
      });
    };

    DatabaseEventsService.emitter.on("db_updates", onUpdates);

    // close the stream after 30 seconds and unsubscribe, so the listener
    // does not keep sending to a closed response; the client reconnects
    setTimeout(() => {
      DatabaseEventsService.emitter.off("db_updates", onUpdates);
      resp.close();
    }, 30_000);
  }
}
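The grouping step in the service can be read as "for each entity type, find the most recent change". As a dependency-free sketch of that step alone, assuming ISO 8601 date strings, it could look like the following; latestPerEntityType and the simplified Change type are hypothetical and only illustrate the logic.

```typescript
type Change = { id: string; entityType: string; date: string };

// Returns, for each entity type, the change with the latest date.
function latestPerEntityType(changes: Change[]): Map<string, Change> {
  const latest = new Map<string, Change>();
  for (const change of changes) {
    const current = latest.get(change.entityType);
    if (!current || Date.parse(change.date) > Date.parse(current.date)) {
      latest.set(change.entityType, change);
    }
  }
  return latest;
}

const latest = latestPerEntityType([
  { id: "1", entityType: "user", date: "2024-01-01T10:00:00.000Z" },
  { id: "2", entityType: "user", date: "2024-01-01T10:00:05.000Z" },
  { id: "3", entityType: "task", date: "2024-01-01T09:59:00.000Z" },
]);
```

One query per entity type, bounded by the latest change date, is enough to fetch every updated row of that type, which is why the individual events do not need to carry the entity data themselves.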
import { EntityType } from "@prisma/client";
import { get, JSONLinesResponse, prefix, VovkIteration } from "vovk";
import { z } from "zod";
import DatabasePollService from "./DatabasePollService";
import { TaskSchema, UserSchema } from "../../../prisma/generated/schemas";
import { withZod } from "@/lib/withZod";

@prefix("poll")
export default class DatabasePollController {
  @get()
  static poll = withZod({
    iteration: z.union([
      z.object({
        id: z.uuid(),
        entityType: z.enum(EntityType),
        __isDeleted: z.boolean().optional(),
      }),
      UserSchema,
      TaskSchema,
    ]),
    async handle(req) {
      const response = new JSONLinesResponse<
        VovkIteration<typeof DatabasePollController.poll>
      >(req);

      void DatabasePollService.poll(response);

      return response;
    },
  });
}
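For readers unfamiliar with the wire format: JSON Lines is one JSON document per line, so a streamed response can be consumed incrementally even when a network chunk ends in the middle of a record. The generated RPC client takes care of this for you; the following is only a sketch of the idea, and JSONLinesParser is a hypothetical helper, not part of Vovk.

```typescript
// Incrementally turns text chunks into parsed JSON Lines records.
class JSONLinesParser<T> {
  // Holds the trailing, not yet complete line between chunks.
  private buffer = "";

  // Accepts the next chunk and returns every record completed by it.
  push(chunk: string): T[] {
    this.buffer += chunk;
    const lines = this.buffer.split("\n");
    // The last element is either "" or an incomplete line: keep it for later.
    this.buffer = lines.pop() ?? "";
    return lines
      .filter((line) => line.trim() !== "")
      .map((line) => JSON.parse(line) as T);
  }
}

type Update = { id: string; entityType?: string; __isDeleted?: boolean };

const parser = new JSONLinesParser<Update>();
// The second record is split across two chunks.
const firstBatch = parser.push('{"id":"1","entityType":"user"}\n{"id":"2","__isDel');
const secondBatch = parser.push('eted":true}\n');
```

Here `firstBatch` contains only the first record, and the second record is returned once its closing newline arrives in `secondBatch`.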
Client-side logic
On the client side, anywhere in the app (e.g. in a React component), we can use the DatabasePollRPC.poll() method to get a stream of database events. Like any JSONLines RPC method, it returns an async iterable that we can consume in a for await loop to receive new events as they arrive. Since the connection can be closed by the server or fail due to network issues, we wrap the polling logic in a loop that reconnects whenever the stream ends and, when a request fails, retries after a two-second delay, giving up after five retries. As the fetcher is already set, we don't need to handle the data manually, so the loop body could be empty; here it only logs each update.
useEffect(() => {
  async function poll(retries = 0) {
    try {
      while (true) {
        using iterable = await DatabasePollRPC.poll();
        for await (const iteration of iterable) {
          console.log("New DB update:", iteration);
        }
      }
    } catch (error) {
      if (retries < 5) {
        console.error("Polling failed, retrying...", error);
        await new Promise((resolve) => setTimeout(resolve, 2000));
        return poll(retries + 1);
      }
    }
  }
  void poll();
}, []);
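If the retry delay should grow with each failed attempt, an exponential backoff can be factored out into two small helpers. This is a sketch under assumptions: backoffDelay and retryWithBackoff are hypothetical names, and the 2-second base and 30-second cap are illustrative values rather than settings from the project.

```typescript
// Delay before the given retry attempt: baseMs * 2^attempt, capped at maxMs.
function backoffDelay(attempt: number, baseMs = 2_000, maxMs = 30_000): number {
  return Math.min(baseMs * 2 ** attempt, maxMs);
}

// Runs `task` until it resolves, waiting longer after each failure.
// Rethrows the last error once `maxRetries` retries have been used up.
async function retryWithBackoff<T>(
  task: () => Promise<T>,
  maxRetries = 5,
  sleep: (ms: number) => Promise<void> = (ms) =>
    new Promise((resolve) => setTimeout(resolve, ms)),
): Promise<T> {
  for (let attempt = 0; ; attempt++) {
    try {
      return await task();
    } catch (error) {
      if (attempt >= maxRetries) throw error;
      await sleep(backoffDelay(attempt));
    }
  }
}
```

In the effect above, the catch branch could wait for `backoffDelay(retries)` milliseconds instead of a fixed 2000. The `sleep` parameter exists only so that the waiting can be replaced in tests.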
Practical usage: Telegram bot
As an example of a third-party source of database changes, you can explore the TelegramService implementation and a small TelegramController. Coming soon…