Real-time Push Notifications in a Distributed Environment with NodeJS, RethinkDB and Socket.IO

Building push-notifications using Web Sockets (socket.io or SignalR) has been discussed and implemented quite successfully. As I read and learned about RethinkDB, it got me wondering why there’s a need for a database that provides change-feeds to the clients connected to the database. Well, since most basic applications have a single back-end server, one might not derive a lot of benefit from using RethinkDB for getting push notifications to the server since the server would know when a change happened on the database as it performs commands and queries. However, in a distributed environment, let’s say multiple NodeJS servers interacting with a database, I speculate that it would be highly beneficial to use a database like RethinkDB which automatically updates all servers with the changes on a resource and thus allowing all servers to forward the notifications to the connected clients.

One such implementation is described below:

For the sake of simplicity, our application will allow us to manage contacts with four fields: FirstName, LastName, Email, and Age

The application being discussed allows multiple users to collaborate on individual resources while providing updates to all users are using the application. The key components of this applications are NodeJS (servers), Socket.IO (web sockets), AngularJS (ui) and RethinkDB (See Figure 1).

architecture
Figure 1: Application Architecture

In order to build our application, we can follow the following steps or refer to the source code on my GitHub.

STEP 1: Build CRUD API for Contacts using RethinkDB and NodeJS

The code below implements CRUD actions and exposes the following actions:

[GET] /api/contacts – Get all contacts
[POST] /api/contact – Create new contact
[GET] /api/contact/:id – Retrieve a contact
[PUT] /api/contact/:id – Update a contact
[DELETE] /api/contact/:id – Delete a contact

r = require('rethinkdb');

function ContactsAPI () {
  "use strict";
  var connectionInfo = { host: 'localhost', port: 28015 };
  function executeQuery(fn){
    r.connect(connectionInfo, fn);
  };
  this.GetContacts = function GetContacts(req,res){    
    function getContacts(err, conn) {
      if(err) { res.status(500).send(err); return; }
      r.table('contacts').run(conn, function(err, cursor)
      {
        if(err) { res.status(500).send(err); return; }
        cursor.toArray(function (err, results){
          if(err) { res.status(500).send(err); return; }
          res.status(200).send(results);
        })
      });
    };
    executeQuery(getContacts);
  };
  this.GetContact = function GetContact(req,res){    
    function getContact(err, conn) {
      if(err) { res.status(500).send(err); return; }
      r.table('contacts').get(req.params.id).run(conn, function(err, results)
      {
        if(err) { res.status(500).send(err); return; }
          res.status(200).send(results);
      });
    };
    executeQuery(getContact);
  };
  this.UpdateContact = function UpdateContact(req, res){
    function updateContact(err, conn) {
      if(err) { res.status(500).send(err); return; }
      r.table('contacts').filter(r.row("id").eq(req.params.id)).update(req.body).run(conn, function(err, results) {
          if(err) { res.status(500).send(err); return; }
          res.status(200).send(results);
      });
    };
    executeQuery(updateContact);
  };
  this.CreateContact = function CreateContact(req, res){
    function createContact(err, conn) {
      if(err) { res.status(500).send(err); return; }
      r.table('contacts').insert(req.body).run(conn, function(err, results) {
          if(err) { res.status(500).send(err); return; }
          res.status(201).send(results);
      });
    };
    executeQuery(createContact);
  };
  this.DeleteContact = function DeleteContact(req, res){
    function deleteContact(err, conn) {
      if(err) { res.status(500).send(err); return; }
      r.table('contacts').filter(r.row("id").eq(req.params.id)).delete().run(conn, function(err, results) {
          if(err) { res.status(500).send(err); return; }
          res.status(200).send(results);
      });
    };
    executeQuery(deleteContact);
  }
};

module.exports.ContactsAPI = ContactsAPI;

STEP 2: Subscribe to ChangeFeeds in RethinkDB with NodeJS

r = require('rethinkdb');

function Subscription (io) {
  "use strict";
  var connectionInfo = { host: 'localhost', port: 28015 };
  function executeQuery(fn){
    r.connect(connectionInfo, fn);
  };
  this.SubscribeContactsTable = function SubscribeContactsTable(){    
    function subscribeContactsTable(err, conn) {
      if(err) { throw err; }
      r.table('contacts').changes().run(conn, function(err, cursor){
        if(err) { throw err; }
        cursor.each(function(err, row){
          if (err) throw err;
          notifySubscribers(row);
        })
      })
    };
    executeQuery(subscribeContactsTable);
  };
  function notifySubscribers(row){ [1]
    console.log(JSON.stringify(row, null, 2));
    var defaultRoom = '#/'; [2]
    var room = '';
    var evt = '';
    if (!row.new_val){ [3]
      evt = 'deleted';
      room = '#/contact/' + row.old_val.id;
    }
    else if (!row.old_val){
      evt = 'created';
      room = '#/contact/' + row.old_val.id;
    }
    else{
      evt = 'updated';
      room = '#/contact/' + row.old_val.id;
    }
    io.to(defaultRoom).emit(evt, { data : row }); [4]
    io.to(room).emit(evt, { data : row }); [5]
  };
};

module.exports.Subscription = Subscription;

[1] – For each change in the batch (always one change in our application), this function will notify the subscribers.

[2] – SignalIO room names can be created and maintained based on the table name, and the resource id. Incidentally, these also map to the urls available in the application. The default room ‘#/’ maps to the main page of our application which allows the ui to appropriately let the users know that a change has happened.

[3] – When a change happens, RethinkDB sends the server the new value and the old value of the record. At this time, there’s no built-in way (that I am aware of) that lets the application know the type of event (Update, Create, Delete), therefore, it’s up to the application to determine this.

[4] – Notify everyone that’s currently on the homepage that a change has happened in the database.

[5] – Notify everyone that’s currently browsing the resource that a change has happened.

STEP 3: Build UI for the application using Angular

The application consists of two controllers – ContactCtrl and StreamCtrl. ContactCtrl manages all the UI actions for the application and StreamCtrl manages the messages from SocketIO.

    var ctrls = angular.module('controllers', []);
    ctrls.controller('ContactCtrl', ['$scope', '$http', '$location', '$routeParams', 'ContactService',
      function($scope, $http, $location, $routeParams, ContactService) {
          //... removed for brevity
      }
    ]);
    ctrls.controller('StreamCtrl', ['$scope', '$http', 'SocketService', function($scope, $http, SocketService) {
      //... removed for brevity
    }]);

STEP 4: Integrate the UI with NodeJS so that SocketIO can push updates to the UI

The services factory is responsible for starting the SocketService and managing the messages sent by the server.

One think to note with this approach is that the socket service needs to trigger appropriate methods at marker [1] and [2], in order for the change to be applied to the $scope of the angular controller, we have to call the .$apply() method.

services.factory("SocketService", function(){
    var socket = io();
  socket.on('joined', function(msg){
    updateRoom(msg);
  });
  socket.on('updated', function(msg){
      updateContact(msg, 'updated');
  });
  socket.on('created', function(msg){
      updateContact(msg, 'created');
  });
  socket.on('deleted', function(msg){
      updateContact(msg, 'deleted');
  });
  function updateContact(msg, event){
      updateRoom(msg);
      if (window.location.hash == '#/'){
        [1]
        angular.element('[ng-controller="ContactCtrl"]').scope().Methods.GetContacts();
    }
    [2]
    angular.element('[ng-controller="ContactCtrl"]').scope().Methods.OnServerEvent(msg, event);
    angular.element('[ng-controller="ContactCtrl"]').scope().$apply();
  }
  function updateRoom(msg){
      // Is there a better way?
        angular.element('[ng-controller="StreamCtrl"]').scope().updates.push(msg);
    angular.element('[ng-controller="StreamCtrl"]').scope().$apply();
  }
  return { socket: socket }
});

STEP 5: Handle updates and display updates to the user

The SocketService triggers the OnServerEvent method which is responsible for providing the notification to the user.

 

ctrls.controller('ContactCtrl', ['$scope', '$http', '$location', '$routeParams', 'ContactService',
  function($scope, $http, $location, $routeParams, ContactService) {
      // ... removed for brevity
    $scope.Methods.OnServerEvent = function(msg, event){ [1]
      if (event == 'deleted') $scope.Methods.OnContactDeleted();
      if (event == 'updated' )//&& sessionStorage.awaitEvent != ('updated:' + $scope.contact.id))
      {
        $scope.Methods.OnContactUpdated(msg.data);
      }
    }
    $scope.Methods.OnContactDeleted = function(msg){ [2]
      if (window.location.hash == '#/') { $scope.Methods.GetContacts(); return; }
      alert("The contact you are editing has been deleted. This contact is no longer available.");
      window.location.href = '/#/';
    }
    $scope.Methods.OnContactUpdated = function(data){ [3]
      if (window.location.hash == '#/') { $scope.Methods.GetContacts(); return; }
      console.log(data);
      // Compare updated contact model
      var syncRequired = false;
      if (data.new_val.id == $scope.contact.id){
        if ((data.new_val.firstName != $scope.contact.firstName) || (data.new_val.lastName != $scope.contact.lastName) || (data.new_val.email != $scope.contact.email) || (data.new_val.age != $scope.contact.age)){
          syncRequired = true;
        }
      }
      if (syncRequired){
        var update = confirm("The current contact has been updated. would you like to update your page?")
        if (update){
          // two options - look through form's unchanged fields and update the unchanged fields only using angular's built-in methods
          // simpler way - update the whole model at once.
          $scope.contact = data.new_val;
        }
      }
    };
  }
]);

[1] – The OnServerEvent method routes to the OnContactDeleted or OnContactUpdated based on the event type.

[2] – The OnContactDeleted method lets the users on the home page know a resource was deleted and trigger the list of contacts to be reloaded. If a user is currently accessing the deleted resource, the user will be notified the resource is no longer available and be sent to the homepage.

[3] – The OnContactUpdated method lets the users on the home page know a resource was updated and trigger the list of contacts to be reloaded. If the user is currently accessing the updated resource, the user will be notified if they want to get the latest version and continue working.

Development Tips:

During local development, it’s possible to spin up new instances of the NodeJS server listening on different ports: eg: PORT=1234 supervisor app.js (or PORT=1234 node app.js)

Screenshots:

Homepage
Figure 2: Homepage
update notification
Figure 3: Update Notification
deleted notification
Figure 4: Delete Notification

 

Suggestions on existing implementations:

  • It’s possible one might ask how to make use of RethinkDB without having to build a new application. It’s possible to create a service that uses RethinkDB primarily as an event broadcaster and use it as an update provider to all dependent services.
  • Using RethinkDB benefits an application where servers are not only linearly scaled instances of the same piece of code (eg. web server or api service) but also when a variety of services depend on the changes happening in the database.

References:
RethinkDB – https://www.rethinkdb.com/
NodeJS – https://nodejs.org/en/
SocketIO – http://socket.io/
Github Source Code – https://github.com/saichaitanya88/pushnotification.prototype

Advertisements

One thought on “Real-time Push Notifications in a Distributed Environment with NodeJS, RethinkDB and Socket.IO

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s