In this article we will discuss how to combine socket.io with RxJs Observables in Angular.

We will implement a simple chat feature using socket based communication between client and server. Socket.io will take care of setting up the socket, but we will be using observables to receive and distribute chat messages from the server.

The goal is to build a chat server where we can open multiple browser windows and be able to send chat messages between the different browsers.

Server

First we will define a simple server using nodeJs/express.

let app = require('express')(); let http = require('http').Server(app); let io = require('socket.io')(http); io.on('connection', (socket) => { console.log('user connected'); socket.on('disconnect', function(){ console.log('user disconnected'); }); socket.on('add-message', (message) => { io.emit('message', {type:'new-message', text: message}); }); }); http.listen(5000, () => { console.log('started on port 5000'); });

The server is responsible for managing connections and notifying connected clients of new chat messages.

Client

In order to open a channel between client and server we have to initiate a socket connection from the client.

We want to encapsulate the socket communication in an observable, so the first step is to create ChatService where we define our observable.

import { Subject } from 'rxjs/Subject'; import { Observable } from 'rxjs/Observable'; import * as io from 'socket.io-client'; export class ChatService { private url = 'http://localhost:5000'; private socket; sendMessage(message){ this.socket.emit('add-message', message); } getMessages() { let observable = new Observable(observer => { this.socket = io(this.url); this.socket.on('message', (data) => { observer.next(data); }); return () => { this.socket.disconnect(); }; }) return observable; } }

The observable is configured to emit a value every time we receive a new chat message from the server. By calling observer.next(), subscribers to our observable will be notified of the new message.

We want to be able to disconnect from the chat, so our observable returns a subscription object that we can use to unsubscribe and break the connection.

Next we will create a component for the chat application.

import { Component, OnInit, OnDestroy } from '@angular/core'; import { Control } from '@angular/common'; import { ChatService } from './chat.service'; @Component({ moduleId: module.id, selector: 'chat', template: `<div *ngFor="let message of messages"> {{message.text}} </div> <input [(ngModel)]="message" /><button (click)="sendMessage()">Send</button>`, providers: [ChatService] }) export class ChatComponent implements OnInit, OnDestroy { messages = []; connection; message; constructor(private chatService:ChatService) {} sendMessage(){ this.chatService.sendMessage(this.message); this.message = ''; } ngOnInit() { this.connection = this.chatService.getMessages().subscribe(message => { this.messages.push(message); }) } ngOnDestroy() { this.connection.unsubscribe(); } }

The UI is simple with a just a textbox and a button. However, the important part to pay attention to is the subscription to the observable returned from chatService.getMessages().

The observable will emit a new value for every new chat message and update our list of displayed messages.

Any message sent via the UI is distributed to all connected clients – including the client who sent the message.

I have put the source code on Github.