Publicar uma mensagem em uma exchange ou fila no RabbitMQ

O método PublishRabbitMQ da plataforma Devprime é utilizado para enviar mensagens para uma exchange no RabbitMQ. Este método permite especificar o tipo de exchange, a chave de roteamento e as propriedades da mensagem, garantindo que as mensagens sejam roteadas corretamente para as filas associadas.

Parâmetros

  • alias (string, opcional): Parâmetro opcional com o nome do Stream Alias conforme definido na configuração do Stream Adapter. O valor padrão é “Stream1”.

  • exchangeName (string): Nome da exchange para a qual a mensagem será publicada.

  • exchangeType (ExchangeType?, opcional): Tipo da exchange. Pode ser:

    • Direct: Roteia mensagens para filas com uma chave de roteamento exata.
    • Fanout: Roteia mensagens para todas as filas ligadas à exchange.
    • Topic: Roteia mensagens para filas com base em padrões de chave de roteamento.
    • Headers: Roteia mensagens com base em cabeçalhos de mensagem.

    Se não for especificado, o padrão será usado conforme a configuração da exchange.

  • routingKey (string): Chave de roteamento usada para direcionar mensagens da exchange para as filas associadas.

  • eventMessage (BasicEvent): Mensagem do evento que será publicada na exchange.

  • customProperties (CustomMessageProperties, opcional): Propriedades personalizadas para a mensagem, como TTL (Time-to-Live), persistência e outras características específicas.

Retorno

  • void: Este método não retorna um valor. Ele executa a publicação da mensagem na exchange especificada.

Aqui está um exemplos de como usar o método PublishRabbitMQ na sua aplicação:

1. Publicação direta na fila sem utilizar a Exchange
Nesse cenário não é necessário informar uma exchange

1
2
3
4
5
6
7
var routingKey = "myQqueueName";
var eventName = "OrderCreated";
var eventData = new
    {
        Message = "Demonstration"
    };
Dp.Stream.PublishRabbitMQ(null, null, routingKey, eventName, eventData, null);

2. Publicação usando Exchange Direct sem CustomMessageProperties
Para configurar esse cenário, é necessário associar a Exchange “myExchangeName” do tipo Direct à fila “myQueueName”.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
var exchangeName = "myExchangeName";
var routingKey = "myQueueName";
var eventName = "OrderCreated";
var eventData = new
{
    Message = "Demonstration"
};

Dp.Stream.PublishRabbitMQ(exchangeName, ExchangeType.Direct,
    routingKey, eventName, eventData, null);

3. Publicação usando Exchange Direct usando TTL

Para configurar esse cenário, é necessário associar a Exchange “myExchangeName” do tipo Direct à fila “myQueueName”. Utilizamos CustomMessageProperties para definir os parâmetros de persistência e o tempo de expiração (TTL). O evento publicado terá um tempo determinado para expirar.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
var exchangeName = "myExchangeName";
var routingKey = "myQueueName";
var eventName = "OrderCreated";
var eventData = new
{
    Message = "Demonstration"
};

var basicProperties = new CustomMessageProperties();
basicProperties.Persistent = true;
basicProperties.Expiration = 50000000; // TTL in milliseconds

Dp.Stream.PublishRabbitMQ(exchangeName, ExchangeType.Direct,
         routingKey, eventName, eventData, basicProperties);

4. Publicação usando Exchange Fanout com duas filas como destino

Para configurar esse cenário, é necessário associar a Exchange “myExchangeName” do tipo Fanout às filas “myQueueName1” e “myQueueName2”. Neste caso, não é necessário especificar o nome da fila no Publish. A Exchange distribuirá o evento automaticamente para ambas as filas.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
var exchangeName = "myExchangeName";
var routingKey = ""; // This parameter is not used in Fanout
var eventName = "PaymentApproved";
var eventData = new
{
    Message = "Demonstration"
};

Dp.Stream.PublishRabbitMQ(exchangeName, ExchangeType.Fanout,
 routingKey, eventName, eventData, null);

5. Publicação usando Exchange Topic com duas filas como destino

Para configurar esse cenário, é necessário associar a Exchange “myExchangeName” do tipo Topic a duas filas:

  1. Bind para a fila “myQueueName1” com a Routing Key: "quick.ecommerce.#"
  2. Bind para a fila “myQueueName2” com a Routing Key: "#.logs"

Neste exemplo, faremos três publicações, variando a routingKey. A exchange irá distribuir as mensagens de forma que uma delas será entregue em ambas as filas, enquanto as outras serão entregues em apenas uma das filas.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
var exchangeName = "myExchangeName";
var routingKey1 = "quick.ecommerce.logs"; // Destination: myQueue1 and myQueue2
var routingKey2 = "quick.ecommerce.approved"; // Destination: myQueue1
var routingKey3 = "quick.financial.logs"; // Destination: myQueue2
var eventName = "EcommerceEvents";
var eventData = new
{
    Message = "Demonstration"
};

// Publishing with routingKey 1 for distribution to myQueue1 and myQueue2
Dp.Stream.PublishRabbitMQ(exchangeName, ExchangeType.Topic,
 routingKey1, eventName, eventData, null);

// Publishing with routingKey 2 for distribution to myQueue1
Dp.Stream.PublishRabbitMQ(exchangeName, ExchangeType.Topic,
 routingKey2, eventName, eventData, null);

// Publishing with routingKey 3 for distribution to myQueue2
Dp.Stream.PublishRabbitMQ(exchangeName, ExchangeType.Topic,
 routingKey3, eventName, eventData, null);

6. Publicação usando Exchange Headers com duas filas como destino
Para configurar esse cenário, é necessário associar a Exchange “myExchangeName” do tipo Headers a duas filas. O primeiro bind será para a fila “myQueueName1” utilizando os seguintes argumentos de chave/valor:

  • Arguments 1: { "format", "pdf" }
  • Arguments 2: { "type", "report" }
  • Arguments 3: { "x-match", "all" }

O segundo bind também será para a fila “myQueueName2”, com os seguintes argumentos:

  • Arguments 1: { "format", "zip" }
  • Arguments 2: { "x-match", "any" }

É importante observar que a Exchange do tipo Headers não utiliza a configuração de Routing Key durante o bind.

Neste exemplo, realizaremos três publicações, variando os parâmetros de Arguments. A exchange será responsável por distribuir as mensagens, e, ao final, um dos eventos será direcionado para “myQueueName1” e os outros dois para “myQueueName2”.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
var exchangeName = "myExchangeName";
var routingKey = ""; // This parameter is not used in the Headers Exchange
var eventName = "ComplianceApprove";
var eventData = new
{
    Message = "Demonstration"
};

// Publishing with Headers for distribution to myQueueName1
var properties1 = new CustomMessageProperties();
properties1.Arguments = new Dictionary<string, object> 
{ 
    { "format", "pdf" }, 
    { "type", "report" } 
};
Dp.Stream.PublishRabbitMQ(exchangeName, ExchangeType.Headers,
 routingKey, eventName, eventData, properties1);

// Publishing with Headers for distribution to myQueueName2
var properties2 = new CustomMessageProperties();
properties2.Arguments = new Dictionary<string, object> 
{ 
    { "format", "zip" } 
};
Dp.Stream.PublishRabbitMQ(exchangeName, ExchangeType.Headers,
 routingKey, eventName, eventData, properties2);

// Publishing with Headers for distribution to myQueueName2
var properties3 = new CustomMessageProperties();
properties3.Arguments = new Dictionary<string, object> 
{ 
    { "format", "zip" }, 
    { "type", "image" } 
};
Dp.Stream.PublishRabbitMQ(exchangeName, ExchangeType.Headers,
 routingKey, eventName, eventData, properties3);

Considerações

  • Certifique-se de que o exchangeName esteja corretamente configurado e exista no RabbitMQ antes de tentar publicar a mensagem.
  • Ajuste o exchangeType e as customProperties conforme necessário para sua estratégia de publicação e roteamento de mensagens.
  • As implementações nativas do RabbitMQ são específicas para essa plataforma e atendem a casos de uso particulares. Caso você migre para outra plataforma de streaming, será necessário ajustar seu código para usar os métodos padrão da Devprime ou os métodos específicos da nova plataforma.
Última modificação September 10, 2024 (6166aa4f)