In this article I will demonstrate how we can use Observables to implement a basic pub sub example.

Angular 1.x supports custom event handling, but I always thought the implementation was a bit cumbersome. Through functions like emit and broadcast you can trigger custom events, but you have keep in mind which direction the event travels in the object hierarch – up or down. You also have to consider the reach of the event based on whether it was triggered via $rootScope or regular $scope.

Angular is really pushing the concept of Observables, so in this article I will show how to use an Observable to trigger custom events in order to implement basic pub sub.

I am still new to Observables, so I am far from an expert, but I mainly view it as an alternative way of looking at asynchronous event processing. The RxJs community has presented the idea that any series of events can be modeled as one or many asynchronous arrays. The array comparison is obvious since the Observable API is heavily influenced by standard array functions like map, forEach and filter. In fact it feels a bit like a lodash for anything asynchronous.

Starting to think in terms of Observables is a bit of a pivot, but the underlying concept is relatively straightforward and consists of two parts - Observer and Observable. The names are similar, but an Observable is something that emits events that can be observed by an Observer. In short the Observer observes and the Observable is producing events that are observed. The key idea is that the Observer may subscribe to events from the Observable for as long has the Observer is interested in the events.

RxJs offers a construct called Subject which is an object that can double as both an Observer and an Observable. In order to encapsulate the emitted event I have sub classed Subject and created my own CustomerEventEmitter class. It's technically not necessary to sub class Subject, but I like to encapsulate the event and control the API.

In my sample I have included a producer component where my Observable emits that a customer has been created. On the other side there is a consumer component where customers are processed.

I have wrapped the CustomerEventEmitter in the simple service shown below:

import {Subject } from 'rxjs/Subject'; import {Customer} from './customer'; export class CustomerEventEmitter extends Subject<Customer>{ constructor() { super(); } emit(value) { super.next(value); } } import {CustomerEventEmitter} from './customer-event-emitter'; export class PubSubService{ Stream:CustomerEventEmitter; constructor(){ this.Stream = new CustomerEventEmitter(); } }

The service is very simple since all it does it expose a stream of Customer objects. Keep in mind this Stream is the underlying Subject, which represents both Observer and Observable.

On the producer side I am calling the emit function to publish a new Customer.

import {Component,Input} from '@angular/core'; import {PubSubService} from './pub-sub-service'; import {Customer} from './customer'; @Component({ selector: 'producer', templateUrl: './components/pub-sub/producer.html' }) export class Producer { @Input() firstName = ''; @Input() lastName = ''; constructor(private pubSubService:PubSubService){ } createCustomer(){ let customer = new Customer(); customer.firstName = this.firstName; customer.lastName = this.lastName; this.pubSubService.Stream.emit(customer); } }

Finally on the subscriber side I am subscribing to the stream of Customer objects by calling subscribe().

import {Component,OnInit} from '@angular/core'; import {PubSubService} from './pub-sub-service'; import {Customer} from './customer'; @Component({ selector: 'consumer', templateUrl: './components/pub-sub/consumer.html' }) export class Consumer implements OnInit{ processed = []; subscription = null; constructor(private pubSubService:PubSubService){ } ngOnInit(){ this.subscription = this.pubSubService.Stream.subscribe(customer => this.processCustomer(customer)); } processCustomer(customer){ this.processed.push(customer); } stopProcessing(){ this.subscription.unsubscribe(); } }

The subscription will last for as long as you want, but you may stop it at any time by calling unsubscribe. You can have multiple subscriptions and unsubscribe them independently.

Anyway, this example just shows you the tip of the iceberg when it comes to the capabilities of Observables. In future posts I will try to include more advanced scenarios.

As always my code is available as both a live demo and full source code.