November 15, 2014 · javascript pub-sub design-patterns

Publish/Subscribe design pattern

Before we begin checkout working Pub/Sub tutorial from https://github.com/svlada/Sargarepa or download complete source code as zip archive. After obtaining the source code, open /Sargarepa/pubsub.html in your browser.

Table of contents:

  1. Introduction
  2. Publish/Subcribe design pattern in javascript
  3. Source code
  4. References

Introduction

Publish/Subcribe design pattern is built around simple concept. Essentially, we need three different entities: Publishers, Subscribers and EventBroker/Mediator.

Subcribers can subscribe to one or multiple channels / topics, and receive messages from multiple sources without the knowledge about publishers.

Publisher can publish messages / topics, without any knowledge about subscribers.

EventBroker / Mediator provides decoupling of publishers and subscribers.

Publish / Subcribe design pattern in javascript

I am going to explain implementation from top to bottom, revealing implementation details as we go.

We want to create one publisher and two subscriber objects for this example. EventBroker has responsability to manage subcriber objects and route messages from publisher to subscribers.

 window.pubSub = new PubSub(); // Mediator-EventBroker

 var inputStream = new InputStream(""); // Publisher

 var mailBox = new MailBox("online"); // Subcriber 1
 var mobileInbox = new MailBox("mobile inbox"); // Subcriber 2

Please note that we have placed EventBroker on global namespace. You should attach EventBroker to your application namespace. What other options do we have?

  1. We could inject EventBroker into every Publisher instance through setter / constructor.
  2. Publisher object can extend EventBroker and inherit the publish method.

How to publish message to message bus / channel?

InputStream is object that will serve as Publisher. After InputStream receives message, it should be able to send message to message bus / channel. Event Broker will broadcast received message to all subscribers subscribed to that channel.

 inputStream.receiveMessage("First message");

Let's see how our Publisher object look.

 var InputStream = function(message) {
     this.message = message || "";
 }

 InputStream.prototype.receiveMessage = function(message) {
     this.message = message;
 }

We could alter [InputStream.prototype.receiveMessage] method in the following way:

 InputStream.prototype.receiveMessage = function(message) {
     this.message = message;
     pubSub.publish('new message', { message: this.message });
 }

This approach will work. But we don't want to couple publish logic with Publisher source code. Nicer solution would be to mark arbitary Publisher method as "publishable" dynamically at runtime. Goal is to register callback function that will be called every time after execution of any "publishable" method, so we basically need to mimic AOP for javascript [PubSub.prototype.publishAfter].

 PubSub.prototype.publishAfter = function(target, targetFn, fn) {
     var temp = target[targetFn];
     target[targetFn] = function() {
         temp.apply(target, arguments);
         fn.apply(target, arguments);
     }
 }

How to use [PubSub.prototype.publishAfter] method?

You need to pass following arguments to [PubSub.prototype.publishAfter] method:

  1. target object/context - Object that contain publishable method.
  2. targetFn - This is name of the function we want to call. This parameter is passed as string value.
  3. callback - Function that will be called everytime after execution of publishable method.
 window.pubSub = new PubSub(); // Mediator-EventBroker 
 var inputStream = new InputStream(""); // Publisher

 pubSub.publishAfter(inputStream, "receiveMessage", function() {
     pubSub.publish('new message', { message: this.message });
 });

 inputStream.receiveMessage("First message");
 inputStream.receiveMessage("Second message");
 inputStream.receiveMessage("Third message");

You can make publishable any method in global / application namespace. Just provide correct context.

 window.fnPublisher = function (temp) {
     this.message = temp;
     p("I am publisher from namespace");
 }

 pubSub.publishAfter(this, "fnPublisher", function() {
     pubSub.publish('new message', { message: this.message });
 });

At this point we have completed logic needed on Publisher side. Now we are going to add one Subcriber object.

 var MailBox = function(type) {
     this.type = type;
     this.inbox = [];
 }

 MailBox.prototype.getInbox = function() {
     return this.inbox;
 }

 MailBox.prototype.setInbox = function(data) {
     p('Hey I am updated: ');
     p(data);
     return this.inbox.push(data);
 }

Subcriber doesnt have any knowledge about Publisher. We are going to add subscribe method onto the EventBroker prototype.

 window.pubSub = new PubSub(); // Mediator-EventBroker

 var mailBox = new MailBox("online"); // Subscriber 1
 var mobileInbox = new MailBox("mobile inbox"); // Subscriber 2

 pubSub.subscribe('new message', mailBox.setInbox, mailBox);
 pubSub.subscribe('new message', mobileInbox.setInbox, mobileInbox);

Method 1[PubSub.prototype.subscribe]` receives three arguments:

  1. topic/channel - First argument is channel name. Subscriber can subscribe to one or multiple topics.
  2. callback - Subscriber register callback function that will be called when new message arrives to message bus / channel
  3. target - Subscriber object

EventBroker(PubSub) maintain list of channels. Every channel has a name and an associated collection of subscribers.

 // Event Broker
 var PubSub = function() {
     this.channels = {};
 };

Most imporant part of EventBroker are publish and subscribe methods. In order to understand better the structure of Event broker let's see what the collection of channels may look at runtime when it is populated with topics and subscribers.

 this.channels = {
     "new message": [{ callback: function test() {}, target: MailBox }, {callback: fnSubscriber, target: null}],
     "channel-1": [...],
     "channel-2": [...],
 }

We can register method from global / application namespace or object method to be called on subscriber's part.

 PubSub.prototype.subscribe = function(topic, callback, target) {    
     if (!this.channels[topic]) {
         this.channels[topic] = [];
     }
     this.channels[topic].push({ callback: callback, target: target || null });
 }

Publish method traverses through subscriber collection and fires registered callbacks.

 PubSub.prototype.publish = function(topic, data) {
     for (var i = 0, len = this.channels[topic].length; i < len; i++) {
         var obj = this.channels[topic][i];
         if (obj.target) {
             createDelegate(obj.callback, obj.target)(data);
         } else {
             obj.callback(data);
         }
     }
 }

In order to bind context(this) to callback function we are going to create createDelegate factory function. Javascript doesn't have built-in delegate objects that can have function and "this" parameter. This behavior is easy to emulate. We pass function and target object as arguments to our createDelegate function, and as a result new function that will call original function with correct context is returned.

 function createDelegate(fn, target) {
     return function() {
         return fn.apply(target, arguments);
     }
 }

Subscriber can unsubscribe from any topic at any time by removing its callback from subscriber's queue.

 PubSub.prototype.unsubscribe = function(topic, callback, target) {
     for (var i = 0, len = this.channels[topic].length; i < len; i++) {
         var obj = this.channels[topic][i];

         if (target && obj["target"] == target && obj["callback"] == callback) {
             p('Removing delegate function');
             this.channels[topic].splice(i, 1);
             break;
         } else if (obj["callback"] == callback) {
             p('Removing regular function');
             this.channels[topic].splice(i, 1);
             break;
         }
     }
 }

Download complete source code as zip archive or checkout source from github. After you have obtained source, open /Sargarepa/pubsub.html in your browser.

Final notice: Working implementation uses RequireJS in order to organize project in smaller modules. I strongly encourage you to use RequireJS for your javascript project.

Source code listing

EventBroker

 "use strict"
 define(['app/util'], function(Util) {
     var PubSub = function() {
         this.channels = {};
     };

     PubSub.prototype.subscribe = function(topic, callback, target) {    
         if (!this.channels[topic]) {
             this.channels[topic] = [];
         }
         this.channels[topic].push({ callback: callback, target: target || null });
     }

     PubSub.prototype.publish = function(topic, data) {
         for (var i = 0, len = this.channels[topic].length; i < len; i++) {
             var obj = this.channels[topic][i];
             if (obj.target) {
                 Util.createDelegate(obj.callback, obj.target)(data);
             } else {
                 obj.callback(data);
             }
         }
     }

     PubSub.prototype.unsubscribe = function(topic, callback, target) {
         for (var i = 0, len = this.channels[topic].length; i < len; i++) {
             var obj = this.channels[topic][i];

             if (target && obj["target"] == target && obj["callback"] == callback) {
                 p('Removing delegate function');
                 this.channels[topic].splice(i, 1);
                 break;
             } else if (obj["callback"] == callback) {
                 p('Removing regular function');
                 this.channels[topic].splice(i, 1);
                 break;
             }
         }
     }

     PubSub.prototype.publishAfter = function(target, targetFn, fn) {
         var temp = target[targetFn];
         target[targetFn] = function() {
             temp.apply(target, arguments);
             fn.apply(target, arguments);
         }
     }
     return (PubSub);
 });

Publisher

 "use strict"
 define(function() {
     var InputStream = function(message) {
         this.message = message || "";
     }

     InputStream.prototype.receiveMessage = function(message) {
         this.message = message;
         // pubSub.publish('new message', { message: this.message });
     }

     return (InputStream);
 });

Subscriber

 "use strict"
 define(function() {
     var MailBox = function(type) {
         this.type = type;
         this.inbox = [];
     }

     MailBox.prototype.getInbox = function() {
         return this.inbox;
     }

     MailBox.prototype.setInbox = function(data) {
         p('Hey I am updated: ');
         p(data);
         return this.inbox.push(data);
     }

     return (MailBox);
 });

Util

 "use strict"
 define(function() {
     function createDelegate(fn, target) {
         return function() {
             return fn.apply(target, arguments);
         }
     }

     return {
         createDelegate: createDelegate
     }
 });

Code for testing EventBroker, Publisher and Subscriber

 require(['./common'], function (common) {

     require(['app/pubsub', 'app/util', 'app/mailbox', 'app/inputstream'], function(
         PubSub, Util, MailBox, InputStream) {

         window.pubSub = new PubSub();

         var inputStream = new InputStream("");

         var mailBox = new MailBox("online");
         var mobileInbox = new MailBox("mobile inbox");

         pubSub.subscribe('new message', mailBox.setInbox, mailBox);
         pubSub.subscribe('new message', mobileInbox.setInbox, mobileInbox);

         pubSub.publishAfter(inputStream, "receiveMessage", function() {
             pubSub.publish('new message', { message: this.message });
         });

         inputStream.receiveMessage("First message");

         pubSub.unsubscribe('new message', mailBox.setInbox, mailBox);
         pubSub.unsubscribe('new message', mobileInbox.setInbox, mobileInbox);

         p(pubSub);

         function fnSubscriber(data) {
             p('Hey I have recieved that message too');
             p(data);
         };

         // Test Global function as subscriber
         pubSub.subscribe('new message', fnSubscriber);
         inputStream.receiveMessage("Druga poruka");
         // pubSub.unsubscribe('new message', fnSubscriber);

         p(pubSub);

         // Test aop on function on global namespace
         window.fnPublisher = function (temp) {
             this.message = temp;
             p("I am publisher from namespace");
         }

         pubSub.publishAfter(this, "fnPublisher", function() {
             pubSub.publish('new message', { message: this.message });
         });

         fnPublisher("aop on global namespace published message");

     });     
 });

References

  1. Patterns For Large-Scale JavaScript Application Architecture
  2. Learning JavaScript Design Patterns
  • LinkedIn
  • Tumblr
  • Reddit
  • Google+
  • Pinterest
  • Pocket
Comments powered by Disqus