MEAN Stack – Avoid callback hell with RxJS

What is RxJS?

RxJS is a JavaScript library for reactive programming using Observables to simplify composing asynchronous code. Roughly put, the library allows subscribing to a stream of values (Observable) that change overtime and execute methods based on the changes in value.

What is callback hell?

Simply put, callback hell is the result of nesting too many success and error callback methods in asynchronous JavaScript code. To learn more, please read http://callbackhell.com/

What about Promises?

Promises alleviate the pain of using callbacks by chaining function calls to asynchronous methods.  However, when one method depends on execution of multiple asynchronous methods, nesting method calls is inevitable.

Enough with the 101, let’s see a use case:

We have an event registration form that allows users to register to my online event. The registration form contains the following fields: FirstName, LastName, EmailAddress,Country and Office

Requirement #1 – Users cannot register twice. User email address must be unique.

import * as express from "express";
import * as HttpStatus from "http-status-codes";
import * as mongodb from "mongodb";

function registerUser(req: express.Request, res: express.Response, next: express.NextFunction) {
  let user = req.body;
  let onError = (error) => { handleError(error, res) };
  findUser(user, res, insertUser, onError);
}

function findUser(user: any, res: express.Response, onFindSuccess: Function, onFindError: Function) {
  mongodb.MongoClient.connect("mongodb://localhost:27017/events").then(db => {
    db.collection("users").findOne({ emailAddress: user.emailAddress }).then(user => {
      if (user) {
        onFindSuccess(user, res, onFindError);
      }
      else {
        onFindError("Email already exists")
      }
    }).catch(error => onFindError(error));
  }).catch(error => onFindError(error));
}
function insertUser(user: any, res: express.Response, onInsertError: Function) {
  mongodb.MongoClient.connect("mongodb://localhost:27017/events").then(db => {
    db.collection("users")
      .insert(user)
      .then(user => {
        res.status(HttpStatus.CREATED).send("Registration Created");
      })
      .catch(error => onInsertError(error, res));
  }).catch(error => onInsertError(error, res));
}

function handleError(error: any, res: any) {
  res.status(HttpStatus.INTERNAL_SERVER_ERROR).send("Server Error");
}
Requirement #2 – Okay, that was not so bad. Now, how about also validating that countryId and officeId are also valid.
import * as express from "express";
import * as HttpStatus from "http-status-codes";
import * as mongodb from "mongodb";
function registerUser(req: express.Request, res: express.Response, next: express.NextFunction) {
  let user = req.body;
  let onError = (error) => { handleError(error, res) };
  findOffice(user, res, onError);
}
function findOffice(user: any, res: express.Response, onFindError: Function) {
  mongodb.MongoClient.connect("mongodb://localhost:27017/events").then(db => {
    db.collection("offices").findOne({ _id: new mongodb.ObjectID(user.officeId) })
    .then(office => {
      if (office) {
        findCountry(user, res, onFindError);
      }
      else {
        onFindError("Bad Request");
      }
    }).catch(error => onFindError(error));
  }).catch(error => onFindError(error));
}
function findCountry(user: any, res: express.Response, onFindError: Function) {
  mongodb.MongoClient.connect("mongodb://localhost:27017/events").then(db => {
      db.collection("countries").findOne({ _id: new mongodb.ObjectID(user.countryId) })
        .then(office => {
          if (office) {
            findUser(user, res, onFindError);
          }
          else {
            onFindError("Bad Request");
          }
        }).catch(error => onFindError(error));
  }).catch(error => onFindError(error));
}
function findUser(user: any, res: express.Response, onFindError: Function) {
  mongodb.MongoClient.connect("mongodb://localhost:27017/events").then(db => {
    db.collection("users")
      .findOne({ emailAddress: user.emailAddress })
      .then(user => {
        if (user) {
          insertUser(user, res, onFindError);
        }
        else {
          onFindError("Email already exists")
        }
      }).catch(error => onFindError(error));
  }).catch(error => onFindError(error));
}
function insertUser(user: any, res: express.Response, onInsertError: Function) {
  mongodb.MongoClient.connect("mongodb://localhost:27017/events").then(db => {
    db.collection("users")
      .insert(user)
      .then(user => {
        res.status(HttpStatus.CREATED).send("Registration Created");
      })
      .catch(error => onInsertError(error, res));
  }).catch(error => onInsertError(error, res));
}
function handleError(error: any, res: any) {
  res.status(HttpStatus.INTERNAL_SERVER_ERROR).send("Server Error");
}

You can see that things are starting to get a little insane.. What if I need to add one more validation? This type of code can become increasingly entangled with each modification.

Let’s try this again with RxJS:

Requirement #1 – Users cannot register twice. User email address must be unique.

import * as express from "express";
import * as HttpStatus from "http-status-codes";
import * as mongodb from "mongodb";
import * as rx from "rxjs";

function registerUser(req: express.Request, res: express.Response, next: express.NextFunction) {
  let user = req.body;
  findUser(user).subscribe(success => {
    insertUser(user).subscribe(success => {
         res.status(HttpStatus.OK).send("Request Created");
      }, error => {
         res.status(HttpStatus.INTERNAL_SERVER_ERROR).send("Server Error");
      });
    }, error => {
       res.status(HttpStatus.INTERNAL_SERVER_ERROR).send("Server Error");
    });
}

function insertUser(user: any) {
  return rx.Observable.create((observer: rx.Observer<any>) => {
    mongodb.MongoClient.connect("mongodb://localhost:27017/events")
      .then((db: mongodb.Db) => {
        db.collection("users").insert(user)
          .then(result => {
            observer.next(result);
            observer.complete();
          }).catch((error) => onError(observer, error));
        }).catch((error) => onError(observer, error));
    });
}

function findUser(user: any): rx.Observable<any> {
  return rx.Observable.create((observer: rx.Observer<any>) => {
    mongodb.MongoClient.connect("mongodb://localhost:27017/events")
      .then((db: mongodb.Db) => {
        db.collection("users").findOne({ emailAddress: user.emailAddress })
          .then(result => {
            observer.next(result);
            observer.complete();
          }).catch((error) => onError(observer, error));
      }).catch((error) => onError(observer, error));
});
}

function onError(observer: rx.Observer<any>, error: any) {
  observer.error(error);
  observer.complete();
}

function handleError(error: any, res: any) {
  res.status(HttpStatus.INTERNAL_SERVER_ERROR).send("Server Error");
}

Requirement #2 – Now, how about also validating that countryId and officeId are also valid. This is where RxJS really shines.

import * as express from "express";
import * as HttpStatus from "http-status-codes";
import * as mongodb from "mongodb";
import * as rx from "rxjs";

function registerUser(req: express.Request, res: express.Response, next: express.NextFunction) {
  let user = req.body;
  let countryValidations = validateCountry(user.countryId);
  let officeValidations = validateOffice(user.officeId);
  let userValidations = findUser(user);

  // Validate all my rules
  let validations = rx.Observable.forkJoin(countryValidations, officeValidations, userValidations);

  // If all validations pass
  validations.filter(results => results.every(valid => valid))
    .subscribe(
      results => {
        insertUser(user).subscribe(
          result => res.status(HttpStatus.CREATED).send("Registration Created"),
          error => handleError(error, res)
        );
      },
      error => handleError(error, res)
  );

  // If any validations fail
  validations.filter(results => results.some(valid => !valid))
  .subscribe(results => {
    res.status(HttpStatus.BAD_REQUEST).send("Bad Request");
  });
}

function validateOffice(officeId: any): rx.Observable<boolean> {
  return rx.Observable.create((observer: rx.Observer<any>) => {
    mongodb.MongoClient.connect("mongodb://localhost:27017/events")
      .then((db: mongodb.Db) => {
        db.collection("offices").findOne({ _id: new mongodb.ObjectID(officeId) })
          .then(result => {
          observer.next(result);
          observer.complete();
        }).catch((error) => onError(observer, error));
    }).catch((error) => onError(observer, error));
  });
}

function validateCountry(countryId: any): rx.Observable<boolean> {
  return rx.Observable.create((observer: rx.Observer<any>) => {
    mongodb.MongoClient.connect("mongodb://localhost:27017/events")
      .then((db: mongodb.Db) => {
        db.collection("countries").findOne({ _id: new mongodb.ObjectID(countryId) })
          .then(result => {
            observer.next(result);
            observer.complete();
      }).catch((error) => onError(observer, error));
    }).catch((error) => onError(observer, error));
  });
}

function insertUser(user: any) {
  return rx.Observable.create((observer: rx.Observer<any>) => {
    mongodb.MongoClient.connect("mongodb://localhost:27017/events")
      .then((db: mongodb.Db) => {
        db.collection("users").insert(user)
          .then(result => {
             observer.next(result);
             observer.complete();
       }).catch((error) => onError(observer, error));
     }).catch((error) => onError(observer, error));
  });
}

function findUser(user: any): rx.Observable<any> {
  return rx.Observable.create((observer: rx.Observer<any>) => {
    mongodb.MongoClient.connect("mongodb://localhost:27017/events")
      .then((db: mongodb.Db) => {
        db.collection("users").findOne({ emailAddress: user.emailAddress })
          .then(result => {
            observer.next(result);
            observer.complete();
          }).catch((error) => onError(observer, error));
     }).catch((error) => onError(observer, error));
  });
}

function onError(observer: rx.Observer<any>, error: any) {
  observer.error(error);
  observer.complete();
}

function handleError(error: any, res: any) {
  res.status(HttpStatus.INTERNAL_SERVER_ERROR).send("Server Error");
}

By utilizing RxJS, we were able to decouple our validation methods so that each validation can happen independently and concurrently. We also made it possible to add or remove any validations in the future. Additionally, we can filter all validations so that the user is registered only when all validations fail, and alternatively we can return an error as soon as the first validation fails.

References:

RxJS – https://github.com/Reactive-Extensions/RxJS
JavasScript Callback Hell – http://callbackhell.com/
JavaScript Promises – https://developers.google.com/web/fundamentals/getting-started/primers/promises