RabbitMQ

A plataforma Devprime oferece suporte nativo ao RabbitMQ, um software de mensagens open source que opera com base nos protocolos Advanced Message Queuing Protocol (AMQP) e Streaming Text Oriented Messaging Protocol (MQTT).

O RabbitMQ pode ser utilizando on-premise, cloud, docker, kubernetes e plataformas gerenciadas como o CloudAMQP e outras.

Efetuando Subscribe em uma fila no RabbitMQ

O JSON abaixo demonstra a configuração padrão com as credenciais do RabbitMQ, a exchange utilizada, o Retry, o Fallback, as Threads e a opção ‘Subscribe" para receber eventos na fila ‘orderevents’ conforme a definição no item Queues.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
"DevPrime_Stream": [
    {
      "Alias": "Stream1",
      "Enable": "true",
      "Default": "true",
      "StreamType": "RabbitMQ",
      "HostName": "Localhost",
      "User": "guest",
      "Password": "guest",
      "Port": "5672",
      "Exchange": "devprime",
      "ExchangeType": "direct",
      "Retry": "3",
      "Fallback": "State1",
      "Threads": "30",
      "Buffer": "1",
      "Subscribe": [
        {
          "Queues": "orderevents"
        }
      ]
    }
  ],

No exemplo abaixo, estamos detalhando as configurações padrões do Adapter e as configurações específicas do RabbitMQ.

Geral
Enable Habilita o adapter de Stream (True/False)
Alias Define um nome único para identificar o serviço
Default Define como padrão para o centário de ter mais de um Stream (True/False)
StreamType Define o tipo de Stream (RabbitMQ/Kafka)
HostName Configura a url do RabbitMQ
User Configura o usuário para se conectar
Password Configura a senha do usuário
Port Configura a porta
Exchange Configura a Exchage padrão do RabbitMQ
ExchangeType Configura o tipo de Exchange
Retry Configura a quantidade de retentativas
Fallback Configura um State para persitência automática em caso de falha
Threads Define o número coneções simulteas processando eventos
Buffer Define a quantidade de eventos obitidos por conexão
Subscribe Configura filas do RabbitMQ para leitura

Exemplo no log da aplicação com essa configuração:

1
2
[INF][ms-order][Stream][Type "RabbitMQ"][Alias "Stream1"]
["Enable"][Subscribe]["orderevents"][Parallel "30"][Buffer "1"]

Exemplo YAML com as configurações da aplicação:
Para exportar as configurações como variável de ambiente e utilizar no Kubernetes execute o comando no Devprime CLI:
dp export kubernetes

1
2
3
4
5
- name: devprime_stream1
  value: "alias=Stream1|||enable=true|||default=true|||streamtype=RabbitMQ|||
  hostname=rabbitmq.default.svc|||user=guest|||password=guest|||port=5672|||
  exchange=devprime|||exchangetype=direct|||retry=3|||fallback=State1|||
  threads=30|||buffer=1|||subscribe=[queues=orderevents]"

Efetuando Subscribe em uma fila no RabbitMQ usando uma Exchange “Fanout”

No exemplo do JSON abaixo, foram adicionados os parâmetros “Exchange” e “ExchangeType”, definindo o tipo de Exchange como “fanout” e utilizando o item queues como “in-fanout-01”.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
 "DevPrime_Stream": [
    {
      "Alias": "Stream1",
      "Enable": "true",
      "Default": "true",
      "StreamType": "RabbitMQ",
      "HostName": "Localhost",
      "User": "guest",
      "Password": "guest",
      "Port": "5672",
      "Exchange": "devprime",
      "ExchangeType": "direct",
      "Retry": "3",
      "Fallback": "State1",
      "Threads": "30",
      "Buffer": "1",
      "Subscribe": [
        {
          "Exchange": "dp-fanout",
          "ExchangeType": "fanout",
          "queues": "in-fanout-01"
        }
      ]
    }
  ],

Exemplo YAML com as configurações da aplicação:

1
2
3
4
5
6
- name: devprime_stream1
    value: "alias=Stream1|||enable=true|||default=true|||
    streamtype=RabbitMQ|||hostname=rabbitmq.default.svc|||
    user=guest|||password=guest|||port=5672|||exchange=devprime|||
    exchangetype=direct|||retry=3|||fallback=State1|||threads=30|||buffer=5|||
    subscribe=[exchange=dp-fanout,exchangetype=fanout,queues=in-fanout-01]"

Configurando Publish / Subscribe usando uma Exchange “Fanout”

No exemplo apresentado no JSON abaixo, você vai observar que, além do parâmetro “Subscribe” na Exchange “dp-fanout”, nós adicionamos um parâmetro “Publish” para permitir a publicação de eventos na Exchange Fanout “dp-fanout-global”. No exemplo, também é possível observar o “Subscribe” na fila “ms-order-in”, vinculada à Exchange padrão “devprime”.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
 "DevPrime_Stream": [
    {
      "Alias": "Stream1",
      "Enable": "true",
      "Default": "true",
      "StreamType": "RabbitMQ",
      "HostName": "Localhost",
      "User": "guest",
      "Password": "guest",
      "Port": "5672",
      "Exchange": "devprime",
      "ExchangeType": "direct",
      "Retry": "3",
      "Fallback": "State1",
      "Threads": "30",
      "Buffer": "1",
      "Publish": [
        {
          "Exchange": "dp-fanout-global",
          "ExchangeType": "fanout"
        }
      ],
      "Subscribe": [
        {
          "Exchange": "dp-fanout",
          "ExchangeType": "fanout",
          "queues": "in-fanout-01"
        },
        {
          "queues": "ms-order-in"
        }
      ]
    }
  ],

Exemplo no log da aplicação com essa configuração:

1
2
3
[INF][ms-order][Stream][Type "RabbitMQ"][Alias "Stream1"]
["Enable"][Subscribe]["in-fanout-01,ms-order-in"]
[Parallel "30"][Buffer "1"]

Exemplo YAML com as configurações da aplicação:

1
2
3
4
5
6
7
8
- name: devprime_stream1
    value: "alias=Stream1|||enable=true|||default=true|||streamtype=RabbitMQ|||
    hostname=rabbitmq.default.svc|||user=guest|||password=guest|||
    port=5672|||exchange=devprime|||exchangetype=direct|||retry=3|||
    fallback=State1|||threads=30|||buffer=5|||
    publish=[exchange=dp-fanout-global,exchangetype=fanout]|||
    subscribe=[exchange=dp-fanout,exchangetype=fanout,queues=in-fanout-01]
    [queues=ms-order-in]"
Última modificação September 1, 2023 (ab965929)