Publishing a message to an exchange or queue in RabbitMQ

The Devprime platform’s ‘PublishRabbitMQ’ method sends messages to an exchange on RabbitMQ or, when no exchange is specified, directly to a queue. It lets you specify the exchange type, routing key, and message properties so that messages are routed correctly to the associated queues.

Parameters

  • alias (string, optional): Name of the Stream alias as defined in the Stream Adapter configuration. The default value is “Stream1”.

  • exchangeName (string): Name of the exchange to which the message will be published.

  • exchangeType (ExchangeType?, optional): Type of the exchange. It can be:

    • Direct: Routes messages to queues with an exact routing key.
    • Fanout: Routes messages to all queues connected to the exchange.
    • Topic: Routes messages to queues based on routing key patterns.
    • Headers: Routes messages based on message headers.

If exchangeType is not specified, the type defined in the exchange configuration is used.

  • routingKey (string): Routing key used to direct messages from the exchange to the associated queues.

  • eventMessage (BasicEvent): Event message that will be published on the exchange.

  • customProperties (CustomMessageProperties, optional): Custom properties for the message, such as Time-to-Live (TTL), persistence, and other specific characteristics.

Return

  • void: This method does not return a value. It performs the publication of the message to the specified exchange.

Here are examples of how to use the PublishRabbitMQ method in your application:

1. Publishing directly to a queue without using an exchange
In this scenario, you do not need to specify an exchange. The routing key is the name of the target queue.

var routingKey = "myQueueName";
var eventName = "OrderCreated";
var eventData = new
{
    Message = "Demonstration"
};
Dp.Stream.PublishRabbitMQ(null, null, routingKey, eventName, eventData, null);

2. Publishing using Exchange Direct without CustomMessageProperties
To configure this scenario, you need to associate the Exchange “myExchangeName” of type Direct with the “myQueueName” queue.

var exchangeName = "myExchangeName";
var routingKey = "myQueueName";
var eventName = "OrderCreated";
var eventData = new
{
    Message = "Demonstration"
};

Dp.Stream.PublishRabbitMQ(exchangeName, ExchangeType.Direct,
    routingKey, eventName, eventData, null);

3. Publishing using Exchange Direct using TTL

To configure this scenario, you need to associate the Exchange “myExchangeName” of type Direct with the “myQueueName” queue. This example uses CustomMessageProperties to mark the message as persistent and to set its expiration time (TTL). The published event expires once that time has elapsed.

var exchangeName = "myExchangeName";
var routingKey = "myQueueName";
var eventName = "OrderCreated";
var eventData = new
{
    Message = "Demonstration"
};

var basicProperties = new CustomMessageProperties();
basicProperties.Persistent = true;
basicProperties.Expiration = 50000000; // TTL in milliseconds

Dp.Stream.PublishRabbitMQ(exchangeName, ExchangeType.Direct,
         routingKey, eventName, eventData, basicProperties);
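
The TTL above is a raw millisecond count, which is hard to read at a glance. The following is a minimal sketch of deriving it from a TimeSpan instead. The TtlExample class and ToTtlMilliseconds helper are hypothetical names introduced for illustration, not part of Devprime. The type of Expiration is not stated on this page, so the sketch returns a long and assumes a cast may be needed when assigning it.

```csharp
using System;

public static class TtlExample
{
    // Hypothetical helper: converts a TimeSpan into whole milliseconds,
    // the unit used for the TTL in the example above.
    public static long ToTtlMilliseconds(TimeSpan ttl)
    {
        if (ttl < TimeSpan.Zero)
            throw new ArgumentOutOfRangeException(nameof(ttl), "TTL cannot be negative.");
        return (long)ttl.TotalMilliseconds;
    }

    public static void Main()
    {
        // The literal used above, expressed as a duration (hh:mm:ss).
        Console.WriteLine(TimeSpan.FromSeconds(50000)); // 50,000,000 ms = 50,000 s

        // A 30-minute TTL expressed in milliseconds.
        Console.WriteLine(ToTtlMilliseconds(TimeSpan.FromMinutes(30)));
    }
}
```

The value 50000000 used above corresponds to 13 hours, 53 minutes, and 20 seconds.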

4. Publishing using Exchange Fanout with two queues as the target

To configure this scenario, you need to associate the Exchange “myExchangeName” of type Fanout with the queues “myQueueName1” and “myQueueName2”. In this case, you do not need to specify a queue name when publishing. The exchange automatically distributes the event to both queues.

var exchangeName = "myExchangeName";
var routingKey = ""; // This parameter is not used in Fanout
var eventName = "PaymentApproved";
var eventData = new
{
    Message = "Demonstration"
};

Dp.Stream.PublishRabbitMQ(exchangeName, ExchangeType.Fanout,
 routingKey, eventName, eventData, null);

5. Publishing using Exchange Topic with two queues as the target

To configure this scenario, you must associate the Exchange “myExchangeName” of type Topic with two queues:

  1. Bind to the queue “myQueueName1” with the Routing Key: "quick.ecommerce.#"
  2. Bind to the queue “myQueueName2” with the Routing Key: "#.logs"

In this example, we publish three messages, each with a different routingKey. The exchange distributes them so that one message is delivered to both queues, while each of the other two is delivered to only one queue.

var exchangeName = "myExchangeName";
var routingKey1 = "quick.ecommerce.logs"; // Destination: myQueueName1 and myQueueName2
var routingKey2 = "quick.ecommerce.approved"; // Destination: myQueueName1
var routingKey3 = "quick.financial.logs"; // Destination: myQueueName2
var eventName = "EcommerceEvents";
var eventData = new
{
    Message = "Demonstration"
};

// Publishing with routingKey1 for distribution to myQueueName1 and myQueueName2
Dp.Stream.PublishRabbitMQ(exchangeName, ExchangeType.Topic,
    routingKey1, eventName, eventData, null);

// Publishing with routingKey2 for distribution to myQueueName1
Dp.Stream.PublishRabbitMQ(exchangeName, ExchangeType.Topic,
    routingKey2, eventName, eventData, null);

// Publishing with routingKey3 for distribution to myQueueName2
Dp.Stream.PublishRabbitMQ(exchangeName, ExchangeType.Topic,
    routingKey3, eventName, eventData, null);
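
To see why the three routing keys above reach the queues they do, it can help to reproduce the topic matching rule locally. The following is a minimal sketch of that rule as RabbitMQ defines it: words are separated by dots, `*` matches exactly one word, and `#` matches zero or more words. The TopicMatch class is a hypothetical helper written for illustration. It is not part of Devprime or of the RabbitMQ client, and the broker performs the real matching.

```csharp
using System;

public static class TopicMatch
{
    // Returns true when routingKey satisfies bindingKey under topic exchange rules.
    public static bool IsMatch(string bindingKey, string routingKey)
    {
        return Match(SplitWords(bindingKey), 0, SplitWords(routingKey), 0);
    }

    // An empty key has no words. Otherwise words are separated by '.'.
    private static string[] SplitWords(string key)
    {
        return key.Length == 0 ? Array.Empty<string>() : key.Split('.');
    }

    private static bool Match(string[] pattern, int pi, string[] words, int wi)
    {
        // Pattern exhausted: a match only if every word was consumed too.
        if (pi == pattern.Length) return wi == words.Length;

        if (pattern[pi] == "#")
        {
            // '#' may absorb zero or more words, so try every possible length.
            for (int next = wi; next <= words.Length; next++)
            {
                if (Match(pattern, pi + 1, words, next)) return true;
            }
            return false;
        }

        // Any other pattern word needs a routing key word to compare against.
        if (wi == words.Length) return false;

        // '*' accepts any single word. A literal must be equal.
        if (pattern[pi] == "*" || pattern[pi] == words[wi])
        {
            return Match(pattern, pi + 1, words, wi + 1);
        }
        return false;
    }

    public static void Main()
    {
        string[] routingKeys =
        {
            "quick.ecommerce.logs",
            "quick.ecommerce.approved",
            "quick.financial.logs"
        };

        foreach (var key in routingKeys)
        {
            Console.WriteLine(key
                + " -> myQueueName1: " + IsMatch("quick.ecommerce.#", key)
                + ", myQueueName2: " + IsMatch("#.logs", key));
        }
    }
}
```

Running the sketch against the two binding keys from this scenario shows the first routing key matching both bindings and each of the other two matching only one.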

6. Publishing using Exchange Headers with two queues as the target
To configure this scenario, you need to associate the Exchange “myExchangeName” of type Headers with two queues. The first bind will be to the queue “myQueueName1” using the following key/value arguments:

  • Arguments 1: { "format", "pdf" }
  • Arguments 2: { "type", "report" }
  • Arguments 3: { "x-match", "all" }

The second binding is to the queue “myQueueName2”, with the following arguments:

  • Arguments 1: { "format", "zip" }
  • Arguments 2: { "x-match", "any" }

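Note that an exchange of type Headers does not use the routing key when binding. Routing is decided by the binding arguments, where "x-match" set to "all" requires every listed header to match and "any" requires at least one to match.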

In this example, we publish three messages, varying the values passed in Arguments. The exchange distributes the messages so that one of the events is delivered to “myQueueName1” and the other two to “myQueueName2”.

var exchangeName = "myExchangeName";
var routingKey = ""; // This parameter is not used in the Headers Exchange
var eventName = "ComplianceApprove";
var eventData = new
{
    Message = "Demonstration"
};

// Publishing with Headers for distribution to myQueueName1
var properties1 = new CustomMessageProperties();
properties1.Arguments = new Dictionary<string, object> 
{ 
    { "format", "pdf" }, 
    { "type", "report" } 
};
Dp.Stream.PublishRabbitMQ(exchangeName, ExchangeType.Headers,
 routingKey, eventName, eventData, properties1);

// Publishing with Headers for distribution to myQueueName2
var properties2 = new CustomMessageProperties();
properties2.Arguments = new Dictionary<string, object> 
{ 
    { "format", "zip" } 
};
Dp.Stream.PublishRabbitMQ(exchangeName, ExchangeType.Headers,
 routingKey, eventName, eventData, properties2);

// Publishing with Headers for distribution to myQueueName2
var properties3 = new CustomMessageProperties();
properties3.Arguments = new Dictionary<string, object> 
{ 
    { "format", "zip" }, 
    { "type", "image" } 
};
Dp.Stream.PublishRabbitMQ(exchangeName, ExchangeType.Headers,
 routingKey, eventName, eventData, properties3);
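
The outcome of the three publications above follows from the "x-match" argument on each binding. The following is a minimal sketch of that rule, assuming standard RabbitMQ behavior: "all" requires every binding header to be present with an equal value, "any" requires at least one, and binding arguments whose names start with "x-" are not compared. The HeadersMatch class is a hypothetical helper written for illustration. It is not part of Devprime or of the RabbitMQ client.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class HeadersMatch
{
    // Returns true when the message headers satisfy the binding arguments.
    public static bool IsMatch(
        IDictionary<string, object> bindingArgs,
        IDictionary<string, object> messageHeaders)
    {
        // "x-match" defaults to "all" when the binding does not set it.
        string mode = "all";
        if (bindingArgs.TryGetValue("x-match", out var raw) && raw != null)
        {
            mode = raw.ToString() ?? "all";
        }

        // Arguments prefixed with "x-" configure the match and are not compared.
        var required = bindingArgs
            .Where(kv => !kv.Key.StartsWith("x-", StringComparison.Ordinal))
            .ToList();

        // A header hits when the message carries the same key with an equal value.
        bool Hit(KeyValuePair<string, object> kv) =>
            messageHeaders.TryGetValue(kv.Key, out var value) && object.Equals(value, kv.Value);

        return mode == "any" ? required.Any(Hit) : required.All(Hit);
    }

    public static void Main()
    {
        var queue1 = new Dictionary<string, object>
        {
            { "format", "pdf" }, { "type", "report" }, { "x-match", "all" }
        };
        var queue2 = new Dictionary<string, object>
        {
            { "format", "zip" }, { "x-match", "any" }
        };

        // The same header sets used in the three publications above.
        var messages = new[]
        {
            new Dictionary<string, object> { { "format", "pdf" }, { "type", "report" } },
            new Dictionary<string, object> { { "format", "zip" } },
            new Dictionary<string, object> { { "format", "zip" }, { "type", "image" } }
        };

        for (int i = 0; i < messages.Length; i++)
        {
            Console.WriteLine("message " + (i + 1)
                + " -> myQueueName1: " + IsMatch(queue1, messages[i])
                + ", myQueueName2: " + IsMatch(queue2, messages[i]));
        }
    }
}
```

The third message reaches “myQueueName2” even though its "type" header is not part of that binding, because "any" only needs the "format" header to match.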

Considerations

  • Make sure that exchangeName is correctly configured and exists in RabbitMQ before attempting to publish the message.
  • Adjust exchangeType and customProperties as needed for your message publishing and routing strategy.
  • Native implementations of RabbitMQ are specific to this platform and cater to particular use cases. If you migrate to another streaming platform, you’ll need to adjust your code to use Devprime’s standard methods or the new platform’s specific methods.
Last modified November 20, 2024 (61099f59)