
Understanding Microservices – Receiving messages from Queue

In my first post, I shared my understanding of how two services can communicate with each other asynchronously without depending on each other directly. In that hands-on tutorial, I created an Express (Node.js) application with a single endpoint that puts messages into the queue. That hands-on tutorial can be found here.

Microservices Communication - Receiving Messages

This hands-on is the second part of microservices communication, where I will demonstrate how to write code that receives messages from the queue. The demo application is written in C# and will perform the following tasks:

  • It will be a worker application
  • It will subscribe to the Received event provided by RabbitMQ.Client (i.e. a NuGet package that is used to communicate with RabbitMQ)
  • It will be receiving messages from the queue

 

Create worker Application

The very first step is to create the project. For this purpose, I will create a new application of the “worker” type. To create this type of application, you can issue the following command in a terminal (for example, the integrated terminal of Visual Studio Code):

dotnet new worker

Add the required Package i.e. RabbitMQ.Client

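In order to communicate with RabbitMQ, I need to add a NuGet package named “RabbitMQ.Client”. The code in this post uses the synchronous API of the package (CreateConnection, IModel), which is what the 6.x versions provide; version 7 moved to an async API, so if you want to follow along exactly, you can pin a 6.x version with the --version option. To add the package, use the following command: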

dotnet add package RabbitMQ.Client

Add the required namespaces

Since I will be using RabbitMQ.Client in my application, I need to import its namespaces. I have added the following namespaces to my Worker.cs file:

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;

At this point, you can build your project to check that it builds successfully. After that, open the “Worker.cs” file again and go to the method named “ExecuteAsync”. This method is called when our worker application starts.

Utilize RabbitMQ.Client

To communicate with the RabbitMQ server, the steps are almost identical to the ones I performed in my previous post, except that this time I am writing the code in C# instead of Node.js. Just to refresh, the following steps will be performed:

  • Establish a Connection with RabbitMQ 
  • Create Communication Channel
  • Create the targeted Queue
  • Create Consumer
  • Register a callback method for the Received event
  • Consume messages

Establish a connection

The code to establish a connection with RabbitMQ using the “RabbitMQ.Client” package is given below:-

// Create connection with the server
var factory = new ConnectionFactory(){HostName = "localhost"};
var connection = factory.CreateConnection();

In the above code, I create an object of type “ConnectionFactory”. After initialization, I call the CreateConnection method of the ConnectionFactory object to create the actual connection with RabbitMQ.

Create Communication Channel

Once the connection is established, the next step is to create the communication channel. To do so, I call the “CreateModel” method provided by the connection. The code to create the communication channel is given below:

// Create communication channel
IModel channel = connection.CreateModel();

Create the Queue

Once the channel is created, the next step is to create the queue from which we will be getting messages. Since all communication with RabbitMQ goes through the channel, I use the channel I have just created to declare the queue. The code is given below:

// Create the Queue
string queue_name = "demo-queue";
channel.QueueDeclare(queue: queue_name, durable: true, exclusive: false,
autoDelete: false, arguments: null);

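To create the queue, I call the QueueDeclare method provided by the communication channel. It takes the following parameters: the name of the queue; durable, which controls whether the queue survives a broker restart; exclusive, which restricts the queue to the connection that declared it; autoDelete, which removes the queue once its last consumer unsubscribes; and arguments, an optional dictionary of extra settings. If the queue already exists, these values must match the ones it was created with, otherwise the broker rejects the declaration.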

Create the Consumer

At this point, I have everything required to create the consumer. To create it, I use the EventingBasicConsumer class. The code is given below:

// Create Consumer
var _consumer = new EventingBasicConsumer(channel);

Subscribe to the "Received" event

Once the consumer is created, I need to subscribe to its “Received” event and point it to the method that will be invoked every time a message is received. The code to subscribe to the Received event is given below:

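// sender is the consumer; ea carries the delivered message
_consumer.Received += (sender, ea) =>
{
    // Body is a ReadOnlyMemory<byte>; copy it to an array and decode it as UTF-8
    var body = ea.Body.ToArray();
    var message = Encoding.UTF8.GetString(body);
    Console.WriteLine("Message received by the consumer: " + message);
};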

The anonymous method in the above code is executed whenever a message is received from the queue. In this method, we can do whatever we want with the received message, e.g. store it in our database or send it to someone as an email.

However, to keep things simple, I have just printed it on the console.
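If the message carries structured data, the handler usually needs to turn the raw bytes into an object before doing anything useful with it. Below is a minimal sketch of that step, assuming the publisher sends UTF-8 encoded JSON. The OrderMessage type, its fields, and the MessageDecoder helper are hypothetical names I made up for illustration; they are not part of RabbitMQ.Client or of my demo code.

```csharp
using System;
using System.Text;
using System.Text.Json;

// Hypothetical message shape (an assumption for this sketch).
public record OrderMessage(string Id, string Text);

public static class MessageDecoder
{
    // Converts the raw bytes delivered to the consumer into a typed object.
    // Returns null when the payload is not valid JSON.
    public static OrderMessage? Decode(byte[] body)
    {
        var json = Encoding.UTF8.GetString(body);
        try
        {
            // Case-insensitive matching lets {"id": ...} bind to the Id property.
            return JsonSerializer.Deserialize<OrderMessage>(json,
                new JsonSerializerOptions { PropertyNameCaseInsensitive = true });
        }
        catch (JsonException)
        {
            return null;
        }
    }
}
```

Inside the Received handler, this could be called as MessageDecoder.Decode(ea.Body.ToArray()), with a null result treated as a malformed message.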

Consume Message(s)

Now I have everything in place, including what to do when a message is received. But in order to actually receive messages from RabbitMQ, I need to call a method named “BasicConsume”, which is provided by the communication channel. Passing autoAck: true means the broker treats each message as acknowledged as soon as it is delivered. The code is given below:

channel.BasicConsume(queue: queue_name, autoAck: true, consumer: _consumer);
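With autoAck: true, a message is considered handled the moment it is delivered, so it is lost if the handler fails halfway. When that matters, one option is manual acknowledgement. The following is only a sketch, assuming the 6.x synchronous API used in this post; the ManualAckConsumer class and its handle parameter are hypothetical names for illustration.

```csharp
using System;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

public static class ManualAckConsumer
{
    // Attaches a consumer that acknowledges a message only after
    // the supplied handler has finished without throwing.
    public static void Start(IModel channel, string queueName, Action<string> handle)
    {
        // Deliver at most one unacknowledged message at a time to this consumer.
        channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

        var consumer = new EventingBasicConsumer(channel);
        consumer.Received += (sender, ea) =>
        {
            try
            {
                var message = Encoding.UTF8.GetString(ea.Body.ToArray());
                handle(message);
                // Success: tell the broker it can discard the message.
                channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
            }
            catch (Exception)
            {
                // Failure: reject the message and ask the broker to requeue it.
                channel.BasicNack(deliveryTag: ea.DeliveryTag, multiple: false, requeue: true);
            }
        };

        // autoAck: false switches the channel to manual acknowledgements.
        channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
    }
}
```

Note that requeue: true redelivers a message that always fails over and over again, so a real application would probably limit the retries or route such messages to a separate queue.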

Output

Now our code is complete. If we run this application, we should be able to see the received message in the terminal, as shown below:

Message is received and processed by the Message Consumer

In the above image, I posted a message to RabbitMQ by hitting an endpoint. As soon as the message arrived in the queue, it was picked up by the consumer application and printed on the terminal.

Complete Code

The entire code of “Worker.cs” file is given below:-

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;

namespace MessageConsumer;

public class Worker : BackgroundService
{
    private readonly ILogger<Worker> _logger;

    public Worker(ILogger<Worker> logger)
    {
        _logger = logger;
    }

    protected override Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // Establish connection
        var factory = new ConnectionFactory() { HostName = "localhost" };
        var connection = factory.CreateConnection();

        // Create the communication channel
        IModel channel = connection.CreateModel();

        // Create the queue
        string queue_name = "demo-queue";
        channel.QueueDeclare(queue: queue_name, durable: true, exclusive: false,
            autoDelete: false, arguments: null);

        // Create the consumer and subscribe to its Received event
        var _consumer = new EventingBasicConsumer(channel);
        _consumer.Received += (sender, ea) =>
        {
            var body = ea.Body.ToArray();
            var message = Encoding.UTF8.GetString(body);

            Console.WriteLine("Message Received: " + message);
        };

        // Start consuming; messages are delivered to the handler above
        channel.BasicConsume(queue: queue_name, autoAck: true, consumer: _consumer);

        // Nothing to await here: the client library invokes the handler in the background
        return Task.CompletedTask;
    }
}
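The Worker above never closes its connection or channel. That is fine for a demo, but a long-running service would normally release them when the host shuts down. Here is one possible variant, again only a sketch built on the same 6.x API; the class name DisposingWorker is hypothetical.

```csharp
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

public class DisposingWorker : BackgroundService
{
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        var factory = new ConnectionFactory { HostName = "localhost" };

        // 'using' disposes the channel and the connection when the method exits.
        using var connection = factory.CreateConnection();
        using var channel = connection.CreateModel();

        channel.QueueDeclare(queue: "demo-queue", durable: true, exclusive: false,
            autoDelete: false, arguments: null);

        var consumer = new EventingBasicConsumer(channel);
        consumer.Received += (sender, ea) =>
            Console.WriteLine("Message Received: " + Encoding.UTF8.GetString(ea.Body.ToArray()));

        channel.BasicConsume(queue: "demo-queue", autoAck: true, consumer: consumer);

        try
        {
            // Keep the method (and therefore the connection) alive until shutdown.
            await Task.Delay(Timeout.Infinite, stoppingToken);
        }
        catch (OperationCanceledException)
        {
            // Expected when the host is stopping.
        }
    }
}
```

The design choice here is to tie the lifetime of the connection to the lifetime of ExecuteAsync: the method waits on the stopping token, and leaving the method disposes both objects.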

Source Code

The entire code of this worker application is uploaded on GitHub. You can download it by clicking the following link: Download Source Code from GitHub

Video

During the demo for this post, I recorded my screen, and the complete video is shared below. It is the first time that I have prepared a video, so please excuse the presentation and video quality in case they are not up to the mark.

This Post Has 2 Comments

  1. Ali Zia

    What if the publisher needs a quick response from the subscriber?

    1. Shahid Riaz

      Thanks for asking this question.
      There are different ways to get a response back to the publisher.

      1- It could be point-to-point communication over HTTP, i.e. the request-response model. This method, however, introduces a direct dependency between the services. It is what you need in your scenario, where the publisher requires an immediate response. In this case, it is always recommended to apply the circuit breaker pattern.

      2- The second method is quite interesting, i.e. you want a response back at the publisher and you still want asynchronous communication through a message broker.
      In this approach, you can lay out your structure something like this: while publishing the message, include a correlation id. This id could be a universally unique id (UUID), and it will be part of the message. Once the subscriber processes that message, it generates a response and adds the same correlation id that it received in the request. It then sends the response to another queue, on which the publisher of the request is listening. Upon receiving a message on the response queue, the publisher matches it to its request based on the correlation id.

      Note:- Message brokers are usually used when an immediate response is not required, when no response is required at all, or when the subscriber needs a lot of time to process the request. But using the second method, it is quite possible to get the response.
      If you do not want the publisher to listen on a response queue, the publisher can instead call an endpoint to check the status at the subscriber, which could return responses like ‘in-process’, ‘completed’, ‘declined’, etc.
      While adopting any approach, we need to carefully consider the pros and cons of each.
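
      To make the second method a bit more concrete, here is a rough sketch of the publisher-side bookkeeping only: remembering which requests are waiting and matching an incoming response by its correlation id. The PendingReplies class is a hypothetical helper written for this reply; it is not part of RabbitMQ.Client, and the actual publishing and consuming code is left out.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical helper (an assumption for this sketch).
public class PendingReplies
{
    private readonly ConcurrentDictionary<string, TaskCompletionSource<string>> _pending = new();

    // Called by the publisher before sending a request: returns the correlation id
    // to attach to the message and a task that completes when the response arrives.
    public (string CorrelationId, Task<string> Reply) Register()
    {
        var id = Guid.NewGuid().ToString();
        var tcs = new TaskCompletionSource<string>(TaskCreationOptions.RunContinuationsAsynchronously);
        _pending[id] = tcs;
        return (id, tcs.Task);
    }

    // Called from the response-queue consumer: completes the matching request.
    // Returns false when the correlation id is unknown or already completed.
    public bool Complete(string correlationId, string replyBody)
    {
        if (_pending.TryRemove(correlationId, out var tcs))
        {
            return tcs.TrySetResult(replyBody);
        }
        return false;
    }
}
```

      The id does not have to travel inside the message body: in RabbitMQ.Client, the message properties (IBasicProperties) also have CorrelationId and ReplyTo fields that can carry the id and the name of the response queue.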
