Understanding Microservices Communication using Message Broker

In my first post, I shared my understanding of microservices. That post can be found here: Understanding Microservices

Microservices Communication

When we break a large application into smaller microservices, sooner or later we face a situation where several microservices need to communicate with each other. Inter-microservice communication can be categorized as follows:-

  1. Synchronous or Point to Point Communication
  2. Responding to Events
  3. Responding to Messages

Synchronous or Point to Point Communication

This type of communication is pretty straightforward: microservices communicate with each other directly. It can be represented by the diagram below:-

Point to Point Async Communication

Note

Please consider the following points while implementing synchronous or point-to-point communication.

    • In point-to-point communication, services are coupled with each other and thus become dependent on each other. That is one of the reasons this style of communication is not recommended in the world of microservices.
    • Because of this coupling, the entire system can become unstable if any service that others depend on is down.
    • If this communication is a must, implement fault tolerance, a circuit breaker, or any other approach that gives you confidence that a service failure will be handled gracefully.
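
To make the circuit breaker idea more concrete, here is a minimal sketch. The class name, its options, and the injectable clock are my own illustrative choices and do not come from any library; a production system would more likely use an existing resilience package.

```javascript
// Minimal circuit breaker (illustrative sketch, not a library API).
// After `failureThreshold` consecutive failures the breaker "opens" and rejects
// calls immediately until `resetTimeoutMs` has passed.
class CircuitBreaker {
  constructor(action, { failureThreshold = 3, resetTimeoutMs = 5000, now = Date.now } = {}) {
    this.action = action;                 // async function that calls the remote service
    this.failureThreshold = failureThreshold;
    this.resetTimeoutMs = resetTimeoutMs;
    this.now = now;                       // injectable clock, handy for testing
    this.failures = 0;                    // consecutive failures seen so far
    this.openedAt = null;                 // time the breaker opened, or null while closed
  }

  async call(...args) {
    if (this.openedAt !== null && this.now() - this.openedAt < this.resetTimeoutMs) {
      throw new Error("Circuit is open: call rejected without contacting the service");
    }
    // Either the breaker is closed, or the timeout has elapsed and a trial call is allowed.
    try {
      const result = await this.action(...args);
      this.failures = 0;
      this.openedAt = null;               // a success closes the breaker again
      return result;
    } catch (err) {
      this.failures += 1;
      if (this.failures >= this.failureThreshold) {
        this.openedAt = this.now();       // open (or re-open) the breaker
      }
      throw err;
    }
  }
}
```

A caller would wrap the function that performs the HTTP request, for example `new CircuitBreaker(callOtherService)`, where `callOtherService` is a hypothetical function, and treat the "Circuit is open" error as a signal to fall back or retry later.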

Responding to Events and Responding to Messages

To reduce the coupling between services, the other option is asynchronous communication. Here we have two further options that work in a similar way:-

  • Responding to Events
  • Responding to Messages

In both methods, services communicate through events and messages and avoid direct communication with each other. This type of communication is helpful when an immediate response is not needed.

Responding to Events

In this communication mechanism, a service publishes an event to a queue. Once the event is published, the subscriber(s) of that event pick it up from the queue and perform their designated tasks. This is depicted in the following image:-

Event Driven Communication

In the above diagram, Microservice A is a service that performs a certain task. Whenever a new customer is created, it raises an event and publishes it to a queue inside the message broker. The other microservices that are interested in this event subscribe to the “new customer created” event. When a new customer is created, each subscribed microservice picks up the event and does its job. The publisher is not aware of what the other services do in response to that event.
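
The flow described above can be sketched without a real broker. The in-memory class below is only a stand-in I made up for illustration; the event name and the two subscriber services are hypothetical.

```javascript
// Tiny in-memory stand-in for a message broker (illustration only).
class InMemoryBroker {
  constructor() {
    this.subscribers = new Map(); // event name -> array of handler functions
  }

  subscribe(eventName, handler) {
    const handlers = this.subscribers.get(eventName) ?? [];
    handlers.push(handler);
    this.subscribers.set(eventName, handlers);
  }

  publish(eventName, payload) {
    // The publisher hands the event over and returns immediately;
    // the handlers run later, on the microtask queue.
    for (const handler of this.subscribers.get(eventName) ?? []) {
      queueMicrotask(() => handler(payload));
    }
  }
}

const broker = new InMemoryBroker();
const received = [];

// Two hypothetical subscribers interested in the same event.
broker.subscribe("customer.created", (c) => received.push(`email-service: welcome ${c.firstName}`));
broker.subscribe("customer.created", (c) => received.push(`billing-service: account for ${c.firstName}`));

// "Microservice A" publishes the event without knowing who is listening.
broker.publish("customer.created", { firstName: "Jane" });
```

The point of the sketch is the direction of knowledge: subscribers know the event name, while the publisher knows nothing about the subscribers.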

Responding to Messages

Responding to messages is almost identical to responding to events. However, it is a broader term that covers many communication mechanisms, including point-to-point communication. In “responding to events”, the publisher just publishes an event; it is not aware of the services subscribed to that event and does not need to be.

In the “responding to messages” mechanism, the publisher might be aware of the subscribed services. Here, a service can ask another service to perform a certain action. This request can be sent point-to-point or through a message broker.

There are some other differences as well, but to keep things simple: both are ways to decouple services from each other, which is one of the most important characteristics of a microservice architecture.

Message Broker Framework

In the discussion of async communication, I have mentioned the message broker many times. Message broker frameworks are the tools that we use to publish events and messages. They provide several features for this, usually built around queues. They also make it easy to implement the fan-out pattern. I will write a separate post on the fan-out pattern, but for now it is sufficient to know that in this pattern an event is broadcast to a “Topic” and consumed by many parties.
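
As a small preview of fan-out: in RabbitMQ, one way to get this behaviour is a “fanout” exchange, which copies every message to all queues bound to it. The sketch below assumes `channel` is an amqplib channel; the function names, the exchange name, and the non-durable settings are my own choices for illustration.

```javascript
// Fan-out sketch. `channel` is assumed to be an amqplib channel object.

// Publisher side: send one event to the exchange.
async function broadcastEvent(channel, exchangeName, event) {
  // Declare the exchange (a no-op if it already exists with the same settings).
  await channel.assertExchange(exchangeName, "fanout", { durable: false });
  // Fanout exchanges ignore the routing key, so it is left empty.
  return channel.publish(exchangeName, "", Buffer.from(JSON.stringify(event)));
}

// Subscriber side: each interested service binds its own queue to the exchange.
async function bindSubscriberQueue(channel, exchangeName, queueName) {
  await channel.assertExchange(exchangeName, "fanout", { durable: false });
  await channel.assertQueue(queueName, { durable: false });
  await channel.bindQueue(queueName, exchangeName, "");
}
```

Every queue bound through `bindSubscriberQueue` receives its own copy of each event sent with `broadcastEvent`.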

There are many message broker frameworks available; some are given below:-

  • Apache Kafka
  • RabbitMQ
  • Amazon SNS and Amazon SQS
  • Azure Service Bus
  • etc

Hands-on

To follow the hands-on with me, make sure you have the following on your machine:-

  • RabbitMQ Server
  • Node.js
  • Visual Studio Code or any editor of your choice

Create a Queue in RabbitMQ

Our first step is to create a queue. I will create a queue named “demo-queue” as shown below:-

Creating a queue in RabbitMQ

Open the RabbitMQ management UI and click on the Queues tab. It will present a screen that looks like the above image; the names of the queues may be different on your server. Click on “Add a new queue”. This opens a new section that asks for the inputs required to create a queue. This screen is given below:-

Provide the name of the Queue and click on Add Queue Button

In the “Name” field, I entered “demo-queue” and left everything else as default. Then I clicked on the “Add queue” button, which created the new queue. The newly created queue appears in the queues section, as shown in the image below:-

demo-queue is added

Create a Node.js Application (Publisher)

Now I will create a Node.js application. In this application, I will write code to put a message in the newly created queue. The steps to create a Node.js application are pretty simple. In case you need a refresher: select the folder of your choice and use the following command to create a Node.js application.

$ npm init -y

Install dependencies

To communicate with the RabbitMQ server, I need a package named “amqplib”. This package provides useful features to interact with RabbitMQ. If you are interested in knowing more about it, visit the official documentation. This package is installed just like any other package in a Node.js application, as shown below.

npm install amqplib
npm install dotenv

I’ve also installed the “dotenv” package in order to read data from the environment file. The package.json file now shows my installed dependencies:-

  "dependencies": {
    "amqplib": "^0.10.3",
    "dotenv": "^16.3.1"
  },
  "devDependencies": {
    "nodemon": "^3.0.1"
  }

Setup Project Structure

  • Create a folder named “src”
  • Add a file named “app.js” at the root of the src folder
  • Add the environment file (“.env”, the file dotenv reads by default) and add the following variables to it
    • username="your admin user"
    • password="password of your admin user"
    • queue_name="demo-queue"
    • RABBITMQ_HOST="URL of your RabbitMQ server"
    • RABBITMQ_PORT="5672"
  • Add a folder named “messagebroker” in the src folder
  • Add a file named “message-broker-rabbitmq.js” to the “messagebroker” folder

After performing the above steps, your project structure will look like the following:-

 

Project Structure
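
Since everything depends on these environment variables, it can help to fail fast when one of them is missing. The helper below is a sketch I added for illustration; the function name and the shape of the returned object are hypothetical, while the variable names are the ones listed above.

```javascript
// Reads the RabbitMQ settings from an environment-like object and
// throws early if any of them is missing (illustrative helper).
const REQUIRED_KEYS = ["username", "password", "queue_name", "RABBITMQ_HOST", "RABBITMQ_PORT"];

function readRabbitMqSettings(env) {
  const missing = REQUIRED_KEYS.filter((key) => !env[key]);
  if (missing.length > 0) {
    throw new Error(`Missing environment variables: ${missing.join(", ")}`);
  }
  return {
    username: env.username,
    password: env.password,
    queueName: env.queue_name,
    host: env.RABBITMQ_HOST,
    port: Number(env.RABBITMQ_PORT), // environment values are always strings
  };
}
```

It would typically be called as `readRabbitMqSettings(process.env)` after `dotenv.config()` has run.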

Code to Establish Connection and Getting Channel from RabbitMQ

To communicate with the RabbitMQ server, we need to establish a connection and open a communication channel. To do that, copy the following code into the message-broker-rabbitmq.js file:-

import amqp from "amqplib"; // ESM import (needs "type": "module" in package.json); client library for RabbitMQ
// Connects to the RabbitMQ server and returns a channel
async function connectWithRabbitMqAndGiveChannel(){
    try {
        const username = process.env.username;
        console.log(`User Name is ${username}`);
        const password = process.env.password;
        // The password is deliberately not logged: credentials should not end up in log output
        const host = process.env.RABBITMQ_HOST;
        console.log(`RabbitMQ Server URL is ${host}`);
        const port = process.env.RABBITMQ_PORT;
        console.log(`RabbitMQ Server Port is ${port}`);
        const url = `amqp://${username}:${password}@${host}:${port}`;
        console.log(`Connecting to RabbitMQ at ${host}:${port}`); // the full URL contains the password, so it is not logged
        const connection = await amqp.connect(url);
        connection.on('error', (err)=>{
            console.error(`Error ${err} occurred on the connection with RabbitMQ`);
        });
        const channel = await connection.createChannel();
        channel.on('error', (err)=>{
            console.error(`Error ${err} occurred on the channel`);
        });
        return channel;
    } catch (error) {
        console.error('Error occurred:', error);
        throw error; // rethrow so that the caller can detect the failure instead of receiving undefined
    }
}

Description of the Code

Establish Connection

In the code above, everything up to line 15 is self-explanatory: it reads the values of some environment variables and stores them in local variables.

To connect to the RabbitMQ server, I have used the connect method (on line 15), which is provided by the “amqplib” package. This method accepts the URL of the RabbitMQ server, which I built from the values in the environment file. “connect” is a promise-based API: it returns a promise that resolves to the connection object. I’ve used the keyword “await” when calling connect so that execution continues only once the promise has resolved and the connection is available. The connect method also accepts an optional second parameter with socket options. Please note that neither you nor I need to remember the input parameters of amqplib or of any other package; these details can be found in the documentation of the package. If you are interested in the details of amqplib, consult its official documentation.

 

Create Communication Channel

Once the connection is established, the next step is to create a communication channel. For this, amqplib provides the createChannel method on the connection object. This method is also asynchronous: on success, its promise resolves to the channel. The function then returns this channel, which I will use to send a message to the queue. This code is at line 19 of the snippet.

I’ve also registered a listener for the “error” event on both the connection and the channel, which prints the error to the console.
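
One thing to keep in mind is that every call to connectWithRabbitMqAndGiveChannel opens a new connection and a new channel. A possible way to reuse a single channel is to cache the promise. The helper below is only a sketch: its name is hypothetical, and it assumes the function passed in rejects when the connection fails.

```javascript
// Wraps an async channel factory so that the channel is opened only once
// and shared by all callers (illustrative helper, not part of amqplib).
function createChannelProvider(openChannel) {
  let channelPromise = null; // cached promise for the shared channel

  return function getChannel() {
    if (channelPromise === null) {
      channelPromise = openChannel().catch((err) => {
        channelPromise = null; // forget the failed attempt so the next call can retry
        throw err;
      });
    }
    return channelPromise;
  };
}
```

With this in place, `const getChannel = createChannelProvider(connectWithRabbitMqAndGiveChannel);` could be created once at module level, and each send would call `await getChannel()` instead of connecting again.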

Method to Send Message to demo-queue

Now in the same file, “message-broker-rabbitmq.js”, I will add a method whose responsibility is to add a message to the demo queue which I created in the RabbitMQ server. To do that, I again consulted the official documentation of the amqplib package and found a method named “sendToQueue”. This method is very simple to use, and the method I wrote around it is given below:-

// Method to send a message to the queue
async function PutMessageToQueue(message)
{
    const queue_name = process.env.queue_name;
    let ch = null;
    try
    {
        ch = await connectWithRabbitMqAndGiveChannel();
    }catch(error){
        console.error(`An error occurred while connecting to the queue named ${queue_name}`);
        return;
    }
    try{
        ch.sendToQueue(queue_name, Buffer.from(message)); // the message content must be a Buffer
    }catch(error){
        console.error(`Unable to send message to the queue ${queue_name}`);
        return;
    }
    console.log(`Message sent to the queue ${queue_name}`);
}

The above method calls the previous method, “connectWithRabbitMqAndGiveChannel”, to get a channel. On line 14, it invokes “sendToQueue” with the name of the queue, which in my case is demo-queue as set in the environment file. The second parameter is the actual message which I want to store in “demo-queue”. sendToQueue expects the message content as a Buffer, so I’ve used “Buffer.from” to create a new buffer from the message string. At the very end, I have exported the newly created method, PutMessageToQueue. The complete code is given below:-

import amqp from "amqplib"; // ESM import (needs "type": "module" in package.json); client library for RabbitMQ

// Connects to the RabbitMQ server and returns a channel
async function connectWithRabbitMqAndGiveChannel(){
    try {
        const username = process.env.username;
        console.log(`User Name is ${username}`);
        const password = process.env.password;
        // The password is deliberately not logged: credentials should not end up in log output
        const host = process.env.RABBITMQ_HOST;
        console.log(`RabbitMQ Server URL is ${host}`);
        const port = process.env.RABBITMQ_PORT;
        console.log(`RabbitMQ Server Port is ${port}`);
        const url = `amqp://${username}:${password}@${host}:${port}`;
        console.log(`Connecting to RabbitMQ at ${host}:${port}`); // the full URL contains the password, so it is not logged
        const connection = await amqp.connect(url);
        connection.on('error', (err)=>{
            console.error(`Error ${err} occurred on the connection with RabbitMQ`);
        });
        const channel = await connection.createChannel();
        channel.on('error', (err)=>{
            console.error(`Error ${err} occurred on the channel`);
        });
        return channel;
    } catch (error) {
        console.error('Error occurred:', error);
        throw error; // rethrow so that the caller can detect the failure instead of receiving undefined
    }
}
// Method to send a message to the queue
async function PutMessageToQueue(message)
{
    const queue_name = process.env.queue_name;
    let ch = null;
    try
    {
        ch = await connectWithRabbitMqAndGiveChannel();
    }catch(error){
        console.error(`An error occurred while connecting to the queue named ${queue_name}`);
        return;
    }
    try{
        ch.sendToQueue(queue_name, Buffer.from(message)); // the message content must be a Buffer
    }catch(error){
        console.error(`Unable to send message to the queue ${queue_name}`);
        return;
    }
    console.log(`Message sent to the queue ${queue_name}`);
}
export {PutMessageToQueue}
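
sendToQueue also accepts an optional third argument with message options. The sketch below shows two of them; `publishJson` is a hypothetical helper name of my own, and `channel` is assumed to be an amqplib channel.

```javascript
// Sends a JavaScript object to a queue as a persistent JSON message.
// `channel` is assumed to be an amqplib channel.
function publishJson(channel, queueName, payload) {
  const body = Buffer.from(JSON.stringify(payload));
  // sendToQueue returns false when the channel's write buffer is full; in that
  // case the caller should wait for the channel's 'drain' event before sending more.
  return channel.sendToQueue(queueName, body, {
    persistent: true,                // message survives a broker restart if the queue is durable
    contentType: "application/json", // tells consumers how to decode the body
  });
}
```

Taking an object instead of a string keeps the JSON.stringify call in one place, so callers cannot forget it.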

Publisher in Action

Now that we have the basic implementation for connecting to the RabbitMQ server and a method to send data to the queue, it is time to use them in our main application.

For that purpose, open the app.js file. Install Express (npm install express), as I want to expose an endpoint that will post the data to the queue. The complete code of app.js is given below:-

import express from "express";
import { PutMessageToQueue } from "./messagebroker/message-broker-rabbitmq.js";
import dotenv from "dotenv";

const PORT = "5059";
dotenv.config(); // load the variables from the environment file into process.env


const app = express();
app.use(express.json()); // parse JSON request bodies
// POST endpoint which passes the request body to PutMessageToQueue
app.post("/", (req,res)=>{
    const message = req.body;
    PutMessageToQueue(JSON.stringify(message)); // intentionally not awaited: the handler does not wait for the broker
    console.info("This code is executed right after the call to send the message to the queue and continues doing its job");
    for(let i = 0; i < 100; i++)
    {
        console.log(`${i} x 2 = ${i*2}`);
    }
    res.send(req.body);
});
app.listen(PORT, ()=>{
    console.info(`Server is listening on PORT ${PORT}`);
});

The above code is self-explanatory. It basically does the following things:-

  • It runs an Express server.
  • It has a POST endpoint. The request body is passed to our method, which is responsible for storing the message in the queue.
  • It also logs the multiplication table of 2 to the console.

This last step is just to make clear that once we send our message to “demo-queue”, our flow continues, because storing the message in the queue is asynchronous and we do not depend on some endpoint that performs a task against our posted message. Some other service will get the message from the queue and process it. That service will be our consumer service.
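
The “flow continues” behaviour can be seen in isolation with the small sketch below. It does not talk to RabbitMQ: `fakePutMessageToQueue` is a made-up stand-in that simulates the network round trip with a timer.

```javascript
const order = []; // records the order in which things happen

// Stand-in for PutMessageToQueue: the round trip to the broker is simulated with a timer.
async function fakePutMessageToQueue(message) {
  await new Promise((resolve) => setTimeout(resolve, 10));
  order.push(`queued: ${message}`);
}

function handleRequest(message) {
  const pending = fakePutMessageToQueue(message); // not awaited, just like in app.js
  order.push("handler continues");
  order.push("response sent");
  return pending; // returned only so that callers can wait for the send to finish
}

const done = handleRequest("hello");
done.then(() => console.log(order.join(" -> ")));
```

The handler finishes its own work first, and the simulated send completes afterwards, which is the same ordering you can observe in the console of the real application.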

demo-queue is empty

In the above image, we can see that there is no message stored in our demo-queue. Now let’s run the application and see if the message gets posted to the queue.

In the above video, I have created a simple payload in Insomnia. You can use any other API testing tool, such as Postman. The payload is a simple JSON object with the following attributes:-

  • firstName
  • lastName
  • Country

Then I clicked on the Send button. Please note that I am sending a POST request, as the endpoint in the Express server which calls the method to save messages to the queue is a POST endpoint.

At the first attempt, it did not work because I forgot to run the Node application 🙂

Once I started my server, the message was posted and the console also showed the same result.
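
If you prefer code over an API client, the same request can be sent from a small script. This is a sketch that assumes a Node.js version with the global `fetch` function available; `postCustomer` is a hypothetical helper name, and the third parameter exists only so that a different fetch implementation can be passed in.

```javascript
// Sends the customer payload to the publisher endpoint and returns the echoed body.
async function postCustomer(baseUrl, customer, fetchImpl = fetch) {
  const response = await fetchImpl(baseUrl, {
    method: "POST",
    headers: { "Content-Type": "application/json" }, // required so express.json() parses the body
    body: JSON.stringify(customer),
  });
  if (!response.ok) {
    throw new Error(`Request failed with status ${response.status}`);
  }
  return response.json();
}
```

With the server from app.js running, a call could look like `postCustomer("http://localhost:5059/", { firstName: "Jane", lastName: "Doe", Country: "Norway" })`, where the values are made up.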

Summary of hands-on

Up to this point, we are done with our publisher. It is a microservice that posts a message to a RabbitMQ queue. At this point, it is a simple POST endpoint which gets the data from the request body and posts it to a queue. Besides that, it also generates the multiplication table of 2. Although this endpoint is simple, the basic principles remain the same in a real application with more complex business logic.

You can also see that in this example there is no consumer defined for “demo-queue”. This means that although our publisher is sending messages to the queue, those messages are not processed yet. If I bypassed the queue and made a point-to-point call instead, the call to the other API would fail whenever the consumer application is not up, and unless that error is handled, it could crash our Node.js application.

Why asynchronous communication using message brokers

Asynchronous communication using message brokers has several advantages, and those advantages are also considered features of microservices. Some of them are given below:-

Improved and better resiliency

Services continue to function even if some other services are temporarily down, as they do not communicate with each other directly. Instead, they use intermediate queues. Once a service that was down becomes available again, it can start processing the messages from the queue. In my example, I did not even have a consumer for my queue, but my Node application is still up and running. Once I write my consumer application, it will start consuming messages from the queue.
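
To give an idea of what such a consumer could look like, here is a rough sketch. It is not the consumer of this series: `startConsumer` and `handleMessage` are hypothetical names, `channel` is assumed to be an amqplib channel, and rejecting failed messages without requeueing is just one possible choice.

```javascript
// Starts consuming JSON messages from a queue. `channel` is assumed to be an amqplib channel.
async function startConsumer(channel, queueName, handleMessage) {
  // Deliver at most one unacknowledged message at a time to this consumer.
  await channel.prefetch(1);
  return channel.consume(queueName, async (msg) => {
    if (msg === null) return; // null means the broker cancelled this consumer
    try {
      await handleMessage(JSON.parse(msg.content.toString()));
      channel.ack(msg); // processed successfully: the broker can remove the message
    } catch (err) {
      console.error("Failed to process message:", err);
      channel.nack(msg, false, false); // reject without requeue to avoid an endless retry loop
    }
  });
}
```

Messages that were published while no consumer was running stay in the queue, so a consumer started later with `startConsumer(channel, "demo-queue", ...)` would receive them one by one.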

Loose Coupling

By adding a queue as a buffer and avoiding direct communication between services, we achieve loose coupling.

Event-Driven Architecture

Asynchronous communication using queues is the foundation of event-driven architecture. By adopting it, services can react to events as they occur, which helps in building dynamic and reactive systems.

Use Cases

From all of the above, we can easily identify the use cases of asynchronous communication, which include the following:-

  • Microservices communication
  • Bulk operation where immediate response is not needed. 
  • Distribution and management of tasks in queue
  • etc

Source Code