Realtime Database Polling
State normalization, a custom fetcher, and the corresponding backend implementation keep the UI up to date while the user interacts with the app, because the data returned by each HTTP request is applied to the state. The implementation also covers AI features such as Text Chat AI and Voice AI, explained in later articles. The rule of thumb is that all incoming data should be processed by the entity registry.
But what if the database is changed by other users or third-party services? How do we keep the UI in sync with backend data in real time? One way is to implement database polling powered by JSONLines. The server sends updates to clients whenever the database changes, and the client reconnects automatically when the connection is closed.
The component below demonstrates a simple polling example that receives incremental updates from the server every second. After 10 updates, the server closes the connection, and the client reconnects automatically. We can use the same approach to receive database updates in real time by having the server send updates whenever the database changes.
[Interactive demo: Poll Ticker]
A small delay (up to half a second) is expected due to the CORS preflight. See the Network tab in DevTools for details.
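For readers new to JSONLines: the server writes one JSON object per line, so the client can parse the stream incrementally as chunks arrive. The sketch below illustrates only that parsing step, assuming chunks may be split at arbitrary positions. createJSONLinesParser is a hypothetical helper written for this illustration; in the app itself the generated RPC client takes care of the stream.

```typescript
// Hypothetical helper (not part of any library): incrementally parses JSON Lines.
function createJSONLinesParser<T>(onItem: (item: T) => void) {
  let buffer = "";
  return {
    // Feed a raw text chunk; onItem is called once per complete line.
    push(chunk: string): void {
      buffer += chunk;
      const lines = buffer.split("\n");
      // The last element is an incomplete line (or ""); keep it for the next chunk.
      buffer = lines.pop() ?? "";
      for (const line of lines) {
        const trimmed = line.trim();
        if (trimmed) onItem(JSON.parse(trimmed) as T);
      }
    },
  };
}

// Usage: a line split across two chunks is emitted once it is complete.
const received: { tick: number }[] = [];
const parser = createJSONLinesParser<{ tick: number }>((item) => {
  received.push(item);
});
parser.push('{"tick":1}\n{"ti');
parser.push('ck":2}\n');
```

After both chunks are pushed, received holds the two parsed objects in order.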
Redis DB as event bus
While we could poll the main Postgres database for changes, that approach is inefficient. Instead, we use Redis as an event bus: whenever the main database changes, we write a small event to the Redis database. Our polling service reads these events every second and sends them to clients with the app open.
Because we use Prisma as our ORM, we can use Prisma Extensions to hook into database operations and write events to Redis. This is where the DatabaseService mentioned in the Endpoints article comes into play.
The existing implementation has the following limitations:
- Deletions need to be explicit, even if cascade deletions are handled automatically by the database. See the UserController.deleteUser method for more details.
- All write operations must select the updatedAt field for change detection to work properly.
- The list of supported write operations is limited to create, update, upsert, and delete for simplicity. Read operations are passed through as-is. For more complex operations, additional handling or abstraction is required.
import type { BaseEntity } from "@/types";
import { PrismaClient } from "@prisma/client";
import { PrismaNeon } from "@prisma/adapter-neon";
import DatabaseEventsService, { type DBChange } from "./DatabaseEventsService";
import "./neon-local"; // Setup Neon for local development

export default class DatabaseService {
  static get prisma() {
    return (this.#prisma ??= this.getClient());
  }

  static #prisma: ReturnType<typeof DatabaseService.getClient> | null = null;

  private static getClient() {
    const prisma = new PrismaClient({
      adapter: new PrismaNeon({
        connectionString: `${process.env.DATABASE_URL}`,
      }),
    });

    DatabaseEventsService.beginEmitting();

    return prisma
      .$extends({
        name: "timestamps",
        // Ensure createdAt and updatedAt are always ISO strings to match the generated Zod schemas
        result: {
          $allModels: {
            createdAt: {
              compute: (data: { createdAt: Date }) =>
                data.createdAt.toISOString(),
            },
            updatedAt: {
              compute: (data: { updatedAt: Date }) =>
                data.updatedAt.toISOString(),
            },
          },
        },
      })
      .$extends({
        name: "events",
        // Emit database change events for create, update, and delete operations
        query: {
          $allModels: {
            async $allOperations({ model, operation, args, query }) {
              const allowedOperations = [
                "create",
                "update",
                "delete",
                "upsert",
                "findMany",
                "findUnique",
                "findFirst",
                "findUniqueOrThrow",
                "findFirstOrThrow",
                "count",
                "aggregate",
                "groupBy",
              ] as const;
              type AllowedOperation = (typeof allowedOperations)[number];

              if (!allowedOperations.includes(operation as AllowedOperation)) {
                throw new Error(
                  `Unsupported database operation "${operation}" on model "${model}"`,
                );
              }

              const result = (await query(args)) as BaseEntity | BaseEntity[];
              const now = new Date().toISOString();
              let change: DBChange | null = null;

              const makeChange = (
                entity: BaseEntity,
                type: DBChange["type"],
              ) => ({
                id: entity.id,
                entityType: entity.entityType,
                date:
                  type === "delete"
                    ? now
                    : entity.updatedAt
                      ? new Date(entity.updatedAt).toISOString()
                      : now,
                type,
              });

              switch (operation as AllowedOperation) {
                case "create":
                  if ("entityType" in result)
                    change = makeChange(result, "create");
                  break;
                case "update":
                case "upsert":
                  if ("entityType" in result)
                    change = makeChange(result, "update");
                  break;
                case "delete":
                  if ("entityType" in result) {
                    change = makeChange(result, "delete");
                    // Automatically add __isDeleted flag to deletion results
                    Object.assign(result, { __isDeleted: true });
                  }
                  break;
                case "findMany":
                case "findUnique":
                case "findFirst":
                case "findUniqueOrThrow":
                case "findFirstOrThrow":
                case "count":
                case "aggregate":
                case "groupBy":
                  // no events
                  break;
                default:
                  console.warn(
                    `Unhandled Prisma operation: ${operation} for model: ${model}`,
                  );
                  break;
              }

              if (change) {
                await DatabaseEventsService.createChanges([change]);
              }

              return result;
            },
          },
        },
      });
  }
}
- The getClient method calls DatabaseEventsService.beginEmitting() to start emitting events. The beginEmitting function runs a setInterval that connects to Redis and periodically checks for new events. When a new event is found, it emits it via mitt.
- prisma.$extends hooks into some of the Prisma model operations, determines whether an operation modifies data, and if so calls await DatabaseEventsService.createChanges([change]) to persist a change entry in Redis. The change captures creates, updates, and deletions:
export type DBChange = {
  id: string;
  entityType: EntityType;
  date: string;
  type: 'create' | 'update' | 'delete';
};

The date field indicates when the change occurred to help clients fetch only the latest changes.
- For create, update, and upsert operations, it uses the updatedAt DB field (all write operations must select this field).
- For delete operations, it uses the current time.
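The date rule can be read in isolation as a small pure function. This is a sketch that mirrors the behavior described here, not the code used in the service; resolveChangeDate is a hypothetical name, and the fallback to the current time when updatedAt was not selected follows the makeChange helper shown earlier.

```typescript
type ChangeType = "create" | "update" | "delete";

// Hypothetical helper: deletions use the current time; other writes use
// updatedAt when it was selected, otherwise they fall back to the current time.
function resolveChangeDate(
  type: ChangeType,
  updatedAt: string | Date | undefined,
  now: Date = new Date(),
): string {
  if (type === "delete" || !updatedAt) return now.toISOString();
  return new Date(updatedAt).toISOString();
}

// Usage with fixed timestamps so the result is deterministic.
const fixedNow = new Date("2024-01-01T00:00:10.000Z");
const selectedUpdatedAt = "2024-01-01T00:00:05.000Z";
const updateDate = resolveChangeDate("update", selectedUpdatedAt, fixedNow);
const deleteDate = resolveChangeDate("delete", selectedUpdatedAt, fixedNow);
const fallbackDate = resolveChangeDate("create", undefined, fixedNow);
```

The fallback case shows why selecting updatedAt matters: without it, the change is stamped with the server clock rather than the value stored in the database.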
The delete operation also adds an __isDeleted property. The frontend checks this property to hide the deleted entity by setting enumerable: false on the entity registry item (see the State page).
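To make the enumerable: false idea concrete, here is a minimal sketch that assumes the registry is a plain object keyed by entity id. The real registry is described on the State page and may be structured differently; registry, Entity, and applyEntity are hypothetical names used only for illustration.

```typescript
type Entity = { id: string; entityType: string; __isDeleted?: boolean };

// Hypothetical registry: a plain object keyed by entity id.
const registry: Record<string, Entity> = {};

function applyEntity(entity: Entity): void {
  Object.defineProperty(registry, entity.id, {
    value: entity,
    // Deleted entities stay in the registry but are skipped by
    // Object.keys, Object.values, and for...in.
    enumerable: !entity.__isDeleted,
    configurable: true,
    writable: true,
  });
}

applyEntity({ id: "u1", entityType: "user" });
applyEntity({ id: "u2", entityType: "user" });
// A deletion event arrives for u2.
applyEntity({ id: "u2", entityType: "user", __isDeleted: true });

const visibleIds = Object.keys(registry);
const stillStored = registry["u2"]?.__isDeleted === true;
```

Lists rendered from Object.keys or Object.values no longer include u2, while code that already holds the id can still look it up directly.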
Operations like find... and count do not trigger changes and are passed through as-is.
In addition to beginEmitting and createChanges, DatabaseEventsService has a private connect method that ensures the Redis client is connected, and a public emitter (a mitt instance). The polling service (DatabasePollService, discussed next) subscribes to the emitter to be notified about new events.
import { EntityType } from "@prisma/client";
import mitt from "mitt";
import { createClient } from "redis";

export type DBChange = {
  id: string;
  entityType: EntityType;
  date: string;
  type: "create" | "update" | "delete";
};

export default class DatabaseEventsService {
  public static readonly DB_KEY = "db_updates";
  private static readonly INTERVAL = 1_000;
  private static lastTimestamp = Date.now();

  // Register the error listener once, before the first connect()
  private static redisClient = createClient({
    url: process.env.REDIS_URL,
  }).on("error", (err) => {
    console.error("Redis Client Error", err);
  });

  public static emitter = mitt<{
    [DatabaseEventsService.DB_KEY]: DBChange[];
  }>();

  // ensure Redis is connected
  private static async connect() {
    if (!this.redisClient.isOpen) {
      await this.redisClient.connect();
    }
  }

  // push updates into our ZSET, with score = write timestamp
  public static async createChanges(changes: DBChange[]) {
    if (changes.length === 0) return;
    await this.connect();

    // build array of { score, value } objects
    const entries = changes.map(({ id, entityType, type, date }) => ({
      score: Date.now(),
      value: JSON.stringify({ id, entityType, date, type }),
    }));

    // one multi(): batch ZADD + EXPIRE (key expires after 60 seconds without writes)
    await this.redisClient
      .multi()
      .zAdd(this.DB_KEY, entries)
      .expire(this.DB_KEY, (this.INTERVAL * 60) / 1000)
      .exec();
  }

  public static beginEmitting() {
    setInterval(async () => {
      try {
        await this.connect();
        const now = Date.now();

        // get everything with score ∈ (lastTimestamp, now]
        const raw = await this.redisClient.zRangeByScore(
          this.DB_KEY,
          this.lastTimestamp + 1,
          now,
        );

        this.lastTimestamp = now;

        if (raw.length > 0) {
          const updates = raw.map((s) => JSON.parse(s) as DBChange);
          this.emitter.emit(this.DB_KEY, updates);
        }
      } catch (err) {
        // Keep the timer alive; a rejected promise here would otherwise go unhandled
        console.error("Failed to read DB changes from Redis", err);
      }
    }, this.INTERVAL);
  }
}
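The windowed read in beginEmitting can be easier to follow without Redis. The sketch below replaces the sorted set with an in-memory stand-in to show how the (lastTimestamp, now] window avoids delivering the same entry twice. FakeSortedSet and createWindowReader are hypothetical names for this illustration only; the real service uses the Redis client calls shown above.

```typescript
type Entry = { score: number; value: string };

// In-memory stand-in for a Redis sorted set (illustration only).
class FakeSortedSet {
  private entries: Entry[] = [];

  add(score: number, value: string): void {
    this.entries.push({ score, value });
  }

  // Inclusive on both ends, like ZRANGEBYSCORE min max.
  rangeByScore(min: number, max: number): string[] {
    return this.entries
      .filter((e) => e.score >= min && e.score <= max)
      .sort((a, b) => a.score - b.score)
      .map((e) => e.value);
  }
}

// Reads the half-open window (lastTimestamp, now] and advances the cursor.
function createWindowReader(zset: FakeSortedSet, start: number) {
  let lastTimestamp = start;
  return (now: number): string[] => {
    const values = zset.rangeByScore(lastTimestamp + 1, now);
    lastTimestamp = now;
    return values;
  };
}

const zset = new FakeSortedSet();
const readWindow = createWindowReader(zset, 1000);
zset.add(1000, "a"); // at the cursor: excluded from the first window
zset.add(1500, "b");
zset.add(2000, "c"); // exactly at "now": included
const firstWindow = readWindow(2000);
zset.add(2500, "d");
const secondWindow = readWindow(3000);
```

Because scores are integer millisecond timestamps, querying from lastTimestamp + 1 is equivalent to an exclusive lower bound, so consecutive windows do not overlap.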
Polling controller and service
With Redis change entries and the change emitter in place, we can implement a polling endpoint that streams updates to clients in real time. The DatabasePollController exposes a single JSONLines endpoint, and DatabasePollService uses a JSONLinesResponder instance (received from the controller) to send data to clients. The service closes the connection safely after 30 seconds, so clients should reconnect.
DatabasePollService.ts
import type DatabasePollController from "./DatabasePollController";
import { JSONLinesResponder, type VovkIteration } from "vovk";
import { forEach, groupBy } from "lodash";
import DatabaseEventsService, { type DBChange } from "./DatabaseEventsService";
import DatabaseService from "./DatabaseService";

export default class PollService {
  static poll(
    responder: JSONLinesResponder<VovkIteration<typeof DatabasePollController.poll>>,
  ) {
    let asOldAs = new Date();
    // 10 minutes ago; TODO: use latest update date from registry
    asOldAs.setMinutes(asOldAs.getMinutes() - 10);

    const onChanges = (changes: DBChange[]) => {
      const deleted = changes.filter((change) => change.type === "delete");
      const createdOrUpdated = changes.filter(
        (change) => change.type === "create" || change.type === "update",
      );

      for (const deletedEntity of deleted) {
        void responder.send({
          id: deletedEntity.id,
          entityType: deletedEntity.entityType,
          __isDeleted: true,
        });
      }

      // group by entityType and pick the change with the maximum date for each entity type
      forEach(groupBy(createdOrUpdated, "entityType"), (changes) => {
        const maxDateItem = changes.reduce(
          (max, change) => {
            const changeDate = new Date(change.date);
            return changeDate.getTime() > new Date(max.date).getTime()
              ? change
              : max;
          },
          { date: new Date(0) } as unknown as DBChange,
        );

        if (new Date(maxDateItem.date).getTime() > asOldAs.getTime()) {
          void DatabaseService.prisma[maxDateItem.entityType as "user"]
            .findMany({
              where: {
                updatedAt: {
                  gt: asOldAs,
                },
              },
            })
            .then((entities) => {
              for (const entity of entities) {
                void responder.send(entity);
              }
            });
          asOldAs = new Date(maxDateItem.date);
        }
      });
    };

    DatabaseEventsService.emitter.on(DatabaseEventsService.DB_KEY, onChanges);

    // Close after 30 seconds and unsubscribe, so the handler does not outlive the connection
    setTimeout(() => {
      DatabaseEventsService.emitter.off(DatabaseEventsService.DB_KEY, onChanges);
      responder.close();
    }, 30_000);
  }
}

- When a delete DB change is emitted via DatabaseEventsService.emitter, the service sends an event with id, entityType, and __isDeleted: true. The frontend uses __isDeleted to hide the entity by making it non-enumerable in the registry.
- When an update or create change is emitted, the service fetches the full entity from Postgres (since Redis stores only metadata) and sends it to clients.
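The grouping step decides, per entity type, how recent the newest change is, so that one query per entity type is enough. A dependency-free sketch of that step is shown below. It assumes entityType is a plain string (the service uses the Prisma EntityType enum), and latestDatePerEntityType is a hypothetical name; the service itself uses lodash groupBy with a reduce.

```typescript
type DBChange = {
  id: string;
  entityType: string; // assumption: plain string instead of the Prisma enum
  date: string;
  type: "create" | "update" | "delete";
};

// Returns the most recent create/update date per entity type.
function latestDatePerEntityType(changes: DBChange[]): Map<string, Date> {
  const latest = new Map<string, Date>();
  for (const change of changes) {
    if (change.type === "delete") continue; // deletions are sent to clients directly
    const date = new Date(change.date);
    const current = latest.get(change.entityType);
    if (!current || date.getTime() > current.getTime()) {
      latest.set(change.entityType, date);
    }
  }
  return latest;
}

const latestByType = latestDatePerEntityType([
  { id: "1", entityType: "user", date: "2024-01-01T00:00:01.000Z", type: "create" },
  { id: "2", entityType: "user", date: "2024-01-01T00:00:03.000Z", type: "update" },
  { id: "3", entityType: "task", date: "2024-01-01T00:00:02.000Z", type: "update" },
  { id: "4", entityType: "task", date: "2024-01-01T00:00:09.000Z", type: "delete" },
]);
```

Each resulting date can then be compared with the cursor (asOldAs in the service) to decide whether a query for that entity type is needed.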
Client-side logic
On the client side (for example, in a React component), call DatabasePollRPC.poll() to receive a stream of database events. As with any JSONLines RPC method, it returns an async iterable that you can consume in a for await loop. Because the server may close the connection or a network error may occur, wrap the logic in a retry loop. Since the fetcher is already configured to pass incoming data to the entity registry, the loop body can be empty: you do not need to handle data manually.
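The reconnect pattern can be shown independently of React and the RPC client. The following sketch consumes any async iterable, reconnects when the stream ends, and gives up after a limited number of consecutive errors. consumeWithReconnect and its options are hypothetical names for this illustration, and resetting the retry counter after a clean stream end is a design choice of this sketch, not something stated by the source.

```typescript
// Hypothetical helper: keeps consuming streams until shouldStop() returns true.
async function consumeWithReconnect<T>(
  connect: () => AsyncIterable<T>,
  options: {
    onItem: (item: T) => void;
    shouldStop: () => boolean;
    maxRetries: number;
    retryDelayMs: number;
  },
): Promise<"stopped" | "failed"> {
  let retries = 0;
  while (!options.shouldStop()) {
    try {
      for await (const item of connect()) {
        options.onItem(item);
        if (options.shouldStop()) break;
      }
      retries = 0; // a clean stream end resets the error budget
    } catch {
      retries += 1;
      if (retries > options.maxRetries) return "failed";
      await new Promise((resolve) => setTimeout(resolve, options.retryDelayMs));
    }
  }
  return "stopped";
}

// Usage with a fake stream: the second connection fails once, then recovers.
let connections = 0;
async function* fakeStream(): AsyncGenerator<number> {
  connections += 1;
  if (connections === 2) throw new Error("network error");
  yield connections * 10;
  yield connections * 10 + 1;
}

const items: number[] = [];
const finished = consumeWithReconnect(fakeStream, {
  onItem: (item) => {
    items.push(item);
  },
  shouldStop: () => items.length >= 4,
  maxRetries: 5,
  retryDelayMs: 0,
});
```

In the app, the stop condition corresponds to the user disabling polling or the component unmounting, and the item handler can be a no-op because the fetcher already processes incoming data.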
The frontend code, besides the polling logic, also includes an on/off toggle persisted to localStorage, so users can enable or disable polling as needed.
Here’s the useDatabasePolling hook that implements the described logic, returning the [isPollingEnabled, setIsPollingEnabled, hasError] state tuple:
import { useEffect, useRef, useState } from "react";
import { DatabasePollRPC } from "vovk-client";

/**
 * Hook to manage database polling state.
 * @example const [isPollingEnabled, setIsPollingEnabled, hasError] = useDatabasePolling(false);
 */
export default function useDatabasePolling(initialValue = false) {
  const MAX_RETRIES = 5;
  const [isPollingEnabled, setIsPollingEnabled] = useState(initialValue);
  const [hasError, setHasError] = useState(false);
  // Parenthesized so the ref holds "function or null", not a function returning "void | null"
  const abortRef = useRef<(() => void) | null>(null);

  // Restore the persisted toggle state on mount
  useEffect(() => {
    const isEnabled = localStorage.getItem("isPollingEnabled");
    setIsPollingEnabled(isEnabled === "true");
  }, []);

  useEffect(() => {
    localStorage.setItem("isPollingEnabled", isPollingEnabled.toString());
    // Set by the cleanup function so that a pending retry does not restart polling
    let isCancelled = false;

    async function poll(retries = 0) {
      if (isCancelled) return;
      setHasError(false);
      if (!isPollingEnabled) {
        abortRef.current?.();
        return;
      }
      try {
        while (true) {
          console.log("Polling database for updates...");
          const iterable = await DatabasePollRPC.poll();
          abortRef.current = iterable.abortSilently;

          // The fetcher already processes incoming data, so the body only logs
          for await (const iteration of iterable) {
            console.log("New DB update:", iteration);
          }

          if (isCancelled || iterable.abortController.signal.aborted) {
            console.log("Polling aborted with abortSilently");
            break;
          }
        }
      } catch (error) {
        if (isCancelled) return;
        if (retries < MAX_RETRIES) {
          console.error("Polling failed, retrying...", error);
          await new Promise((resolve) => setTimeout(resolve, 2000));
          return poll(retries + 1);
        } else {
          console.error(
            "Max polling retries reached. Stopping polling.",
            error,
          );
          setHasError(true);
        }
      }
    }

    void poll();

    return () => {
      isCancelled = true;
      abortRef.current?.();
    };
  }, [isPollingEnabled]);

  return [isPollingEnabled, setIsPollingEnabled, hasError] as const;
}
Usage:
const [isPollingEnabled, setIsPollingEnabled, hasError] = useDatabasePolling(false);

From now on, when the database is changed by other users or third-party services, the frontend receives updates in real time and the entity registry is updated accordingly, keeping the UI in sync with the backend data.