Building Real-Time Read Models in Node.js with MongoDB Change Streams

Imagine you’re building an order management system where multiple users place and track orders in real time. Initially, you might keep dashboards updated by having each user’s browser poll the server every few seconds, triggering database queries to check for new orders or status changes.

The polling approach works for a small team but quickly breaks down at scale. With 100 users polling every 3 seconds, you’re executing around 2,000 queries per minute, most of which return no new data. Users still see outdated information between polls, while your database burns resources recomputing aggregations like “total pending orders” and “revenue by user,” even when nothing has changed. As more dashboards come online, the query load multiplies and creates a clear scalability bottleneck.
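
The arithmetic behind that figure is straightforward:

```javascript
// 100 users, each polling every 3 seconds
const users = 100;
const pollsPerUserPerMinute = 60 / 3; // 20 polls per user per minute
const queriesPerMinute = users * pollsPerUserPerMinute;
console.log(queriesPerMinute); // 2000 queries per minute, mostly redundant
```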

One of the best ways to handle this is MongoDB Change Streams. Change Streams provide a real-time feed of changes happening in your collections or databases. Instead of polling, your application subscribes to these events and receives a notification whenever a document is inserted, updated, or deleted. This allows your backend to react only when data actually changes, reducing redundant queries, connection load, and computation.
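
In its simplest form, a subscription is just a few lines. The sketch below assumes you already have a connected `Db` handle from the official driver; `onChange` is a callback you supply for each event:

```javascript
// Minimal sketch of a Change Stream subscription.
// Assumes `db` is a connected Db handle from the official MongoDB driver.
function subscribeToOrders(db, onChange) {
  // watch() opens a change stream on the collection; "updateLookup"
  // includes the full post-update document on update events
  const stream = db.collection("orders").watch([], { fullDocument: "updateLookup" });
  stream.on("change", onChange); // fires for inserts, updates, replaces, deletes
  return stream; // call stream.close() on shutdown
}
```

You'll build a fuller version of this pattern, with resume tokens and error handling, later in this tutorial.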

In this tutorial, you’ll build a Node.js application that demonstrates this concept in practice and can even be hosted on Galaxy.

Prerequisites

To follow along with this tutorial, you’ll need the following:

  • A basic understanding of JavaScript and Node.js
  • Node.js (v18 or later) installed
  • npm installed
  • A MongoDB Atlas cluster (Change Streams require a replica set or sharded cluster, which Atlas provides by default)

Project setup 

Open your terminal and navigate to where you want to create the project. Run:

Bash
mkdir model-demo
cd model-demo
npm init -y

These commands create the project folder, move into it, and generate a `package.json` for your project.

Next, you need to install a few dependencies for this project. Run this command in your terminal:

Bash
npm install express mongodb dotenv

This will install:

  • `express`: For handling HTTP routes
  • `mongodb`: The official driver for connecting to and querying MongoDB
  • `dotenv`: For loading environment variables from your `.env` file

You will also need a `.env` file at the root of your project to store your MongoDB connection string and port configuration. Create the file and add the following:

Bash
MONGO_URL=mongodb+srv://<username>:<password>@clusterstream.igawfzv.mongodb.net/model_demo?retryWrites=true&w=majority

PORT=4000

Replace:

  • `<username>`  with your MongoDB Atlas database user
  • `<password>`  with your user’s password

This connection string specifies which cluster to connect to and which database to use.

The final project structure will look like this:

Bash
model-demo/
 ├── app.js
 ├── routes/
 │   ├── orders.js
 │   └── orderViews.js
 ├── .env
 ├── projector.js
 └── package.json

Since this tutorial uses modern ES module syntax, you need to tell Node.js to treat the project as an ES module.

To do this, open your `package.json` file and add this line at the top level:

JSON
{
  "type": "module"
}

This enables support for `import` statements in Node.js. Without it, Node expects the older CommonJS syntax, and your `import` statements will throw an error.

Build the Backend and Real-Time Sync Engine

Create a file called `app.js` and add the following:

JavaScript
import express from "express";
import { MongoClient } from "mongodb";
import dotenv from "dotenv";
import ordersRouter from "./routes/orders.js";

dotenv.config();

const app = express();
const port = process.env.PORT || 4000;

const client = new MongoClient(process.env.MONGO_URL, {
 serverSelectionTimeoutMS: 10000,
});

async function startServer() {

 try {
   await client.connect();
   console.log("Connected to MongoDB");
   const db = client.db(process.env.DB_NAME || "model_demo");
   console.log("Using DB:", db.databaseName);
   app.use(express.json());
   app.use("/orders", ordersRouter(db)); // write-only routes for now
   app.listen(port, () => console.log(`Server running on port ${port}`));

 } catch (err) {
   console.error("MongoDB connection error:", err);
   process.exit(1);
 }

}

startServer();

Here, you import the required dependencies and connect to MongoDB with `MongoClient`. Once the connection is established, you select the database (`DB_NAME` from the environment, falling back to `model_demo`), mount the `/orders` route (which you’ll create next), and start the Express server.

Next, you need an API endpoint that allows your application to handle new order submissions. This route will accept incoming data, store it in the `orders` collection, and serve as the entry point for all write operations in your backend.

Create a folder called `routes`, and inside it, create a file called `orders.js`. Add the following:

JavaScript
import express from "express";

export default function ordersRouter(db) {

  const router = express.Router();
  const orders = db.collection("orders");

  // POST /orders -> insert a new order
  router.post("/", async (req, res) => {
    const result = await orders.insertOne(req.body);
    res.json({ insertedId: result.insertedId });
  });

  return router;
}

The code above defines a router that handles POST requests to the `/orders` endpoint. When you send JSON data to this route, it inserts a new document into the `orders` collection. 
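
For brevity, the route trusts whatever JSON it receives. In production you’d validate the payload before inserting; here’s a minimal sketch (the field names follow this tutorial’s orders, but the rules themselves are illustrative):

```javascript
// Illustrative payload validation for an incoming order.
function validateOrder(body = {}) {
  const errors = [];
  if (typeof body.userId !== "string" || body.userId.length === 0)
    errors.push("userId must be a non-empty string");
  if (typeof body.total !== "number" || body.total < 0)
    errors.push("total must be a non-negative number");
  if (typeof body.status !== "string")
    errors.push("status must be a string");
  return errors;
}

// In the route handler:
// const errors = validateOrder(req.body);
// if (errors.length) return res.status(400).json({ errors });
```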

You also need to create a background service that listens for real-time changes in the `orders` collection and keeps a separate `order_views` collection automatically updated.

In the root folder, create a file called `projector.js`. Add the following code:

JavaScript
import { MongoClient } from "mongodb";
import dotenv from "dotenv";

dotenv.config();

const client = new MongoClient(process.env.MONGO_URL, {
 serverSelectionTimeoutMS: 10000,
});

const SRC = "orders";
const VIEW = "order_views";
const STATE = "projector_state";

function buildView(order) {
  return {
    _id: order._id,
    userId: order.userId || "N/A",
    total: order.total ?? 0,
    status: order.status || "UNKNOWN",
    createdAt: order.createdAt || new Date(),
    summary: `User ${order.userId || "N/A"} → ${order.total ?? 0} (${
      order.status || "UNKNOWN"
    })`,
  };
}

async function ensureIndexes(db) {
 await db.collection(VIEW).createIndex({ createdAt: -1 });
 await db.collection(VIEW).createIndex({ userId: 1, createdAt: -1 });
}

async function saveResumeToken(db, token) {
 await db
   .collection(STATE)
   .updateOne(
     { _id: "orders->order_views" },
     { $set: { token } },
     { upsert: true }
   );
}

async function loadResumeToken(db) {
 const doc = await db
   .collection(STATE)
   .findOne({ _id: "orders->order_views" });
 return doc?.token;
}

async function processEvent(db, event) {
 const view = db.collection(VIEW);

 if (["insert", "update", "replace"].includes(event.operationType)) {
   const order = event.fullDocument;
   if (!order) return;
   const doc = buildView(order);
   await view.updateOne({ _id: doc._id }, { $set: doc }, { upsert: true });
   console.log("Synced:", String(doc._id));
 }

 if (event.operationType === "delete") {
   const id = event.documentKey?._id;
   await view.deleteOne({ _id: id });
   console.log("Removed:", String(id));
 }
}

async function startProjector() {

 await client.connect();
 
 const db = client.db(process.env.DB_NAME || "model_demo");
 console.log("Projector connected. DB:", db.databaseName);

 await ensureIndexes(db);

 const resumeAfter = await loadResumeToken(db);

 const options = {
   fullDocument: "updateLookup",
   ...(resumeAfter ? { resumeAfter } : {}),
 };

 console.log("Watching:", SRC, resumeAfter ? "(resume)" : "(fresh)");
 
 const stream = db.collection(SRC).watch([], options);

 stream.on("change", async (event) => {
   try {
     await processEvent(db, event);
     if (event._id) await saveResumeToken(db, event._id);
   } catch (err) {
     console.error("Event error:", err);
   }
 });

 stream.on("error", (err) => {
   console.error("Stream error:", err);
   setTimeout(() => startProjector().catch(console.error), 2000);
 });

 stream.on("end", () => {
   console.warn("Stream ended. Restarting.");
   startProjector().catch(console.error);
 });

}

startProjector().catch((e) => {
 console.error("Projector failed:", e);
 process.exit(1);
});

In the code above, the `watch()` method subscribes to the `orders` collection and emits an event whenever a document is inserted, updated, or deleted. Each time a new event occurs, the `buildView()` function formats the order and writes it into the `order_views` collection. The projector also stores a resume token in the `projector_state` collection, allowing it to resume where it left off if restarted. Finally, the `ensureIndexes()` function creates indexes on `order_views` to make queries faster and more efficient.
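
For reference, the state document the projector upserts into `projector_state` looks roughly like this (the `_data` value is an opaque, server-generated token; the one shown here is truncated and purely illustrative):

```json
{
  "_id": "orders->order_views",
  "token": { "_data": "8268F64B80..." }
}
```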

Start up your app by running this command in your terminal:

Bash
node app.js

You should see this output:

Bash
Connected to MongoDB
Using DB: model_demo
Server running on port 4000

Then in a new terminal, run this command to start the projector:

Bash
node projector.js

You should see this output:

Bash
Projector connected. DB: model_demo
Watching: orders (fresh)

Now you’re ready to test the setup.

In a third terminal, send a POST request:

Bash
curl -X POST http://localhost:4000/orders \
  -H "Content-Type: application/json" \
  -d '{"userId":"u401","total":199.5,"status":"PENDING","createdAt":"2025-10-20T00:00:00.000Z"}'

You should see:

Bash
Synced: 68f64b806246998ab7b44c99

This means the Change Stream captured the insert and updated the `order_views` collection in real time.

Expose Read Endpoints

Now that the real-time sync engine works, let’s add routes to fetch data from the read model.

Inside the routes folder, create a file called `orderViews.js`. Add the following code:

JavaScript
// routes/orderViews.js

import express from "express";

export default function orderViewsRouter(db) {
 const router = express.Router();
 const views = db.collection("order_views");

 // GET /order-views/recent?limit=20
 router.get("/recent", async (req, res) => {
   const limit = Math.min(Number(req.query.limit || 20), 100);

   const docs = await views
     .find({})
     .sort({ createdAt: -1 })
     .limit(limit)
     .project({ summary: 1, total: 1, status: 1, createdAt: 1, userId: 1 })
     .toArray();

   res.json(docs);
 });

 // GET /order-views/users/:id?limit=50
 router.get("/users/:id", async (req, res) => {
   const limit = Math.min(Number(req.query.limit || 50), 200);

   const docs = await views
     .find({ userId: req.params.id })
     .sort({ createdAt: -1 })
     .limit(limit)
     .project({ summary: 1, total: 1, status: 1, createdAt: 1, userId: 1 })
     .toArray();

   res.json(docs);
 });

 return router;
}

Finally, update `app.js` to include this router:

JavaScript
import orderViewsRouter from "./routes/orderViews.js";

app.use("/order-views", orderViewsRouter(db));

Your `app.js` should now look like this:

JavaScript
// app.js

import express from "express";
import { MongoClient } from "mongodb";
import dotenv from "dotenv";
import ordersRouter from "./routes/orders.js";
import orderViewsRouter from "./routes/orderViews.js"; // <-- add this

dotenv.config();

const app = express();
const port = process.env.PORT || 4000;
const client = new MongoClient(process.env.MONGO_URL, {
 serverSelectionTimeoutMS: 10000,
});

async function startServer() {
 try {

   await client.connect();
   console.log("Connected to MongoDB");
   const db = client.db(process.env.DB_NAME || "model_demo");
   console.log("Using DB:", db.databaseName);

   app.use(express.json());

   // Write endpoints
   app.use("/orders", ordersRouter(db));

   // Read-model endpoints (final step)
   app.use("/order-views", orderViewsRouter(db));
   app.listen(port, () => console.log(`Server running on port ${port}`));

 } catch (err) {
   console.error("MongoDB connection error:", err);
   process.exit(1);
 }

}

startServer();

Let’s now test the full flow. Restart `app.js` in your terminal, then insert a new order:

Bash
curl -X POST http://localhost:4000/orders \
  -H "Content-Type: application/json" \
  -d '{"userId":"u555","total":245.75,"status":"PENDING","createdAt":"2025-10-20T00:00:00.000Z"}'

You should see `Synced: <ObjectId>` in the projector terminal.

Next, let’s fetch recent orders:

Bash
curl "http://localhost:4000/order-views/recent?limit=10"

You should see a JSON array of order documents:

Bash
[{"_id":"68f6109c7edd1b6f139626eb","createdAt":"2025-11-20T00:00:00.000Z","status":"PENDING","summary":"User u100 → 149.99 (PENDING)","total":149.99,"userId":"u100"},{"_id":"69018efcec59d77b48b2b7a2","createdAt":"2025-10-28T00:00:00.000Z","status":"PENDING","summary":"User u901 → 120.5 (PENDING)","total":120.5,"userId":"u901"},{"_id":"690191a82d274a273dc34645","createdAt":"2025-10-28T00:00:00.000Z","status":"PENDING","summary":"User u999 → 77.5 (PENDING)","total":77.5,"userId":"u999"},{"_id":"690194450ac2114e655e422e","createdAt":"2025-10-28T00:00:00.000Z","status":"PENDING","summary":"User u999 → 77.5 (PENDING)","total":77.5,"userId":"u999"},{"_id":"68f64ba06246998ab7b44c9a","createdAt":"2025-10-20T00:00:00.000Z","status":"PENDING","summary":"User u401 → 199.5 (PENDING)","total":199.5,"userId":"u401"},{"_id":"68f6541605a2877364a52d57","createdAt":"2025-10-20T00:00:00.000Z","status":"PENDING","summary":"User u555 → 245.75 (PENDING)","total":245.75,"userId":"u555"},{"_id":"69012f2c90a7c9b5fe7d11c0","createdAt":"2024-10-20T00:00:00.000Z","status":"PENDING","summary":"User u401 → 199.5 (PENDING)","total":199.5,"userId":"u401"}] 

You can also fetch orders for a specific user:

Bash
curl "http://localhost:4000/order-views/users/u555?limit=10"


And you’ll see that user’s orders:

Bash
[{"_id":"68f6541605a2877364a52d57","createdAt":"2025-10-20T00:00:00.000Z","status":"PENDING","summary":"User u555 → 245.75 (PENDING)","total":245.75,"userId":"u555"}]

Performance Impact

According to a MongoDB community benchmark, Change Streams can process up to 22,000 events per second with latencies under 15 milliseconds on replica sets. This shows how efficiently MongoDB can stream real-time updates without relying on continuous polling.

In contrast, traditional polling approaches generate thousands of redundant queries per minute as clients repeatedly check for updates, adding unnecessary database load and latency. With Change Streams, your application only reacts when data actually changes, keeping dashboards responsive while significantly reducing overhead.
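
For contrast, here is roughly what the polling version looks like: the same query re-executes on a timer for every connected client, whether or not anything changed (a sketch; `onResult` is a hypothetical callback that would push results to the dashboard):

```javascript
// Polling sketch: re-runs the same query on an interval, per client.
function pollOrders(db, onResult, intervalMs = 3000) {
  return setInterval(async () => {
    const docs = await db.collection("orders")
      .find({})
      .sort({ createdAt: -1 })
      .limit(20)
      .toArray();
    onResult(docs); // ~20 queries per minute per client, changed or not
  }, intervalMs);
}
// clearInterval(handle) stops polling for that client
```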

That’s How to Build Real-Time Read Models with MongoDB Change Streams

In this tutorial, you built a Node.js application that listens to database changes and keeps a read-optimized collection automatically in sync. You connected to MongoDB, handled order writes, and created a projector service that updates a live `order_views` collection whenever data changes.

With MongoDB Change Streams and a lightweight Node.js setup, you can create responsive, cache-free architectures that deliver instant results. This same approach can power real-time dashboards, activity feeds, and analytics systems that stay up to date without extra complexity.

You can deploy this setup on Galaxy to manage and scale both your API and projector with ease. Galaxy makes it simple to monitor performance, handle scaling, and keep your real-time applications running reliably in production.