In my first post, I shared my understanding of how two services can communicate with each other asynchronously without having a direct dependency on each other. In that hands-on tutorial, I created an express node application having a single endpoint, which is used to put messages into the queue. That hands-on tutorial can be found here
Table of Contents
Microservices Communication - Receiving Messages
This hands-on is the second part of microservices communication where I will demonstrate how we can write code to receive a message from the queue. The demo application, which I will be creating is in C# and it will be performing the following tasks.
It will be a worker application
It will subscribe to the Receiving Events provided by a RabbitMQ.Client (i.e. a Nuget package that is used to communicate with the RabbitMQ)
It will be receiving messages from the queue
Create worker Application
The very first step is to create the project. For this purpose, I will create a new application of “worker” type. To create this type of application in Visual Studio Code, you can issue the following command:-
dotnet new worker
Add the required Package i.e. RabbitMQ.Client
In order to communicate with RabbitMQ, I need to add a Nuget package which is named “RabbitMQ.Client”. To add this package, use the following command:-
dotnet add package RabbitMQ.Client
Add the required namespaces
Since I will be using RabbitMQ.Client in my application, so I need to add these namespaces to my application. So I have added the following namespaces in my Worker.cs file
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
At this point, you can build your project to see if it’s building successfully. After that, open up your “worker.cs” file again and move toward a method named “ExecuteAsync“. This method is called when our worker application starts.
Utilize RabbitMQ.Client
In order to communicate with the RabbitMQ server, our steps are almost identical which I performed in my previous post, except with the difference that this time, I am doing code in C# instead if Node.js. Just to refresh, the following steps will be performed:-
Establish a Connection with RabbitMQ
Create Communication Channel
Create the targeted Queue
Create Consumer
Register a call back method against the Receiving event
Consumer Message
Establish a connection
The code to establish a connection with RabbitMQ using the “RabbitMQ.Client” package is given below:-
// Create connection with the server
var factory = new ConnectionFactory(){HostName = "localhost"};
var connection = factory.CreateConnection();
Hosting URLMy RabbitMQ is deployed on the same server, where I am developing this consumer application and that is why I mentioned HostName as "localhost". If you are following with me, you need to provide the URL of your RabbitMQ server, along with other parameters such as user name, password, and Port.
In the above code, I create the object of type “ConnectionFactory”. After Initialization, I called the CreateConnection method of the ConnectionFactory object to create the actual connection with the RabbitMQ.
Create Communication Channel
Once the connection is established, the next step is to create the communication channel. To create a communication channel, I will be calling a method “CreateModel” that is provided by the Connection. The code to create the communication channel is given below:-
// Create communication channel
IModel channel = connection.CreateModel();
Create the Queue
Once the channel is created, the next step is to create the queue, from where we will be getting messages. Since to communicate with RabbitMQ, we need the communication channel, so to create the queue, I will use the channel, which I have just created, and its code is given below:-
To create the queue, I called the QueueDeclare method provided by the communication channel. This method took some parameters which are “name of the queue, a boolean variable for durable, exclusive and for auto deletion and in the last arguments”
Why Queue is CreatedAlthough I already have the queue from where I need to get and process the message, it is recommended to create the queue in your code as well. The reason for doing it is because in the distributed environment you never know if the publisher has created the queue or not. So if we write the code on our end, we will have the confidence that we have the targetted queue. Because if the queue is not generated, our code will generate it and if the queue already exists on the server, it will not regenerate it.
Create the Consumer
Now at this point, I have everything which I require for the creation of the consumer. So it is a good time now, to create the consumer. To create the consumer, I will be utilizing an object EventingBasicConsumer. The code is given below:-
// Create Consumer
var _consumer = new EventingBasicConsumer(channel);
Subscribe to the "Received" events
I need to subscribe to the “Received” events, once the consumer is created. We need to point to the method which will be invoked always when a message is received. The code to subscribe to the Received event is given below:-
_consumer.Received += (p1, p2)=>{
var body = p2.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine("Message received to consumer is: " + message);
};
The anonymous method I wrote in the above code, will be executed as soon as some message is received from the queue. In this method, we can do whatever we want with the received message i.e. we can store it in our database, we can send it to someone as an email, etc.
However, to keep things, simple, I have just printed it on the console screen.
Consumer Message(s)
Now, I have everything in place, such as what to do when a message is received. But in order to receive a message(s) from the RabbitMQ, I need to call a method named “BasicConsume“. This method is provided by the communication channel. The code is given below:-
Now our code is completed. If we run this application, we should be able to see the received message in the terminal as shown below:-
In the above image, I have posted a message on my RabbitMQ, by hitting an endpoint. As soon as the message is received in the queue, it was picked up by the consumer application and the message is printed on the terminal.
Complete Code
The entire code of “Worker.cs” file is given below:-
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
namespace MessageConsumer;
public class Worker : BackgroundService
{
private readonly ILogger<Worker> _logger;
public Worker(ILogger<Worker> logger)
{
_logger = logger;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
// Establish connection
var factory = new ConnectionFactory(){HostName= "localhost"};
var connection = factory.CreateConnection();
// To Create the communication channel.
IModel channel = connection.CreateModel();
string queue_name = "demo-queue";
channel.QueueDeclare(queue: queue_name, durable: true, exclusive: false,
autoDelete: false,arguments: null);
var _consumer = new EventingBasicConsumer(channel);
_consumer.Received += (p1, p2)=>{
var body = p2.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine("Message Received: " + message);
};
channel.BasicConsume(queue: queue_name, autoAck: true, _consumer = _consumer);
}
}
Source Code
The entire code of this worker application is uploaded on the GitHub. You can download it by clicking the following:-
Download Source Code from githib
Video
During the demo of this post, I recorded my screen and shared the compete video in the following. It is the first time, that I prepared the video, so please ignore the presentation and video quality in case it is not up to the mark.
Thanks for asking this question.
There are different ways to achieve the response back to publisher.
1- It could be point to point communication over the http. i.e Request response model. In this method, we however induct direct dependencies. This is needed in your posted scenario where we need immediate reaponse back to publisher. In this case it is always recommended to induce circuit breaker pattern.
2- The second method is quite interesting, i.e you want immediate response back to publisher and still you need asynchronous communication by having some message broker.
In this approach, you can layout your structure something like this. “while publishing message, induct a corelation id. This id could be universal unique id. This Id will be part of the message. Once subscriber process that message, it will generate a reaponse and it will add the same corelation id in the response that it received in the request. Once the response is generated it will send back response in another queue where publisher of the request is listening. Upon receiving the message in the response queue, it will filter its response based on the corelation id.
Note:- This message broker are usually used when immediate response is not required or even the response is not required, or the subscriber needs too much time to process the request.. But using the second method, its quite possible to get the response.
In case, if you do not want publisher to listen on the response queue, the publisher can use an endpoint to see the status of the subscriber, that could return response like these. ‘in-process’, ‘completed’, ‘Declined’ etc.
While adopting any approach, we carefully needs to consider pros and cons of each approach
What if we are required to be quick responses in publishers from subscribers?
Thanks for asking this question.
There are different ways to achieve the response back to publisher.
1- It could be point to point communication over the http. i.e Request response model. In this method, we however induct direct dependencies. This is needed in your posted scenario where we need immediate reaponse back to publisher. In this case it is always recommended to induce circuit breaker pattern.
2- The second method is quite interesting, i.e you want immediate response back to publisher and still you need asynchronous communication by having some message broker.
In this approach, you can layout your structure something like this. “while publishing message, induct a corelation id. This id could be universal unique id. This Id will be part of the message. Once subscriber process that message, it will generate a reaponse and it will add the same corelation id in the response that it received in the request. Once the response is generated it will send back response in another queue where publisher of the request is listening. Upon receiving the message in the response queue, it will filter its response based on the corelation id.
Note:- This message broker are usually used when immediate response is not required or even the response is not required, or the subscriber needs too much time to process the request.. But using the second method, its quite possible to get the response.
In case, if you do not want publisher to listen on the response queue, the publisher can use an endpoint to see the status of the subscriber, that could return response like these. ‘in-process’, ‘completed’, ‘Declined’ etc.
While adopting any approach, we carefully needs to consider pros and cons of each approach