RabbitMQ

The Devprime platform natively supports RabbitMQ, open source messaging software that implements the Advanced Message Queuing Protocol (AMQP) and also supports protocols such as the Streaming Text Oriented Messaging Protocol (STOMP) and MQTT.

RabbitMQ can run on premises, in the cloud, on Docker, on Kubernetes, and on managed platforms such as CloudAMQP.

Subscribe to a queue in RabbitMQ

The JSON below demonstrates the default configuration with the RabbitMQ credentials, the exchange used, the Retry, the Fallback, the Threads, and the ‘Subscribe’ option to receive events in the ‘orderevents’ queue as defined in the Queues item.

"DevPrime_Stream": [
    {
      "Alias": "Stream1",
      "Enable": "true",
      "Default": "true",
      "StreamType": "RabbitMQ",
      "HostName": "Localhost",
      "User": "guest",
      "Password": "guest",
      "Port": "5672",
      "Exchange": "devprime",
      "ExchangeType": "direct",
      "Retry": "3",
      "Fallback": "State1",
      "Threads": "30",
      "Buffer": "1",
      "Subscribe": [
        {
          "Queues": "orderevents"
        }
      ]
    }
  ],

The list below describes the general Adapter settings and the RabbitMQ-specific settings.

General
Enable: Enables the Stream adapter (True/False)
Alias: Defines a unique name to identify the service
Default: Marks this Stream as the default when more than one Stream is configured (True/False)
StreamType: Sets the Stream type (RabbitMQ/Kafka)
HostName: Sets the RabbitMQ host name or URL
User: Sets the user name used to sign in
Password: Sets the user’s password
Port: Sets the port
Exchange: Sets the default RabbitMQ Exchange
ExchangeType: Sets the type of the Exchange
Retry: Sets the number of retries
Fallback: Sets a State used for automatic resilience in case of failure
Threads: Sets the number of concurrent connections processing events
Buffer: Defines the number of events obtained per connection
Subscribe: Configures the RabbitMQ Queues to read from

Example in the application log with this configuration:

[INF][ms-order][Stream][Type "RabbitMQ"][Alias "Stream1"]
["Enable"][Subscribe]["orderevents"][Parallel "30"][Buffer "1"]

To export the settings as environment variables for use in Kubernetes, run this command in the Devprime CLI:
dp export kubernetes

YAML example with application settings:

- name: devprime_stream1
  value: "alias=Stream1|||enable=true|||default=true|||streamtype=RabbitMQ|||
  hostname=rabbitmq.default.svc|||user=guest|||password=guest|||port=5672|||
  exchange=devprime|||exchangetype=direct|||retry=3|||fallback=State1|||
  threads=30|||buffer=1|||subscribe=[queues=orderevents]"
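
The relationship between the JSON settings and the flat environment variable value can be sketched in a few lines of Python. This is only an illustration inferred from the two examples above (lowercase keys, `|||` between pairs, bracketed groups for list items); it is not the implementation behind `dp export kubernetes`, and the function name `to_env_value` is hypothetical.

```python
def to_env_value(settings: dict) -> str:
    """Flatten one stream entry into 'key=value|||key=value' form.

    Assumption: the format is inferred from the documented examples,
    not taken from the Devprime source code.
    """
    parts = []
    for key, value in settings.items():
        if isinstance(value, list):
            # Each list item becomes one bracketed group: [k=v,k=v]
            groups = "".join(
                "[" + ",".join(f"{k.lower()}={v}" for k, v in item.items()) + "]"
                for item in value
            )
            parts.append(f"{key.lower()}={groups}")
        else:
            parts.append(f"{key.lower()}={value}")
    return "|||".join(parts)


# Same values as the JSON example, with the Kubernetes host name.
stream1 = {
    "Alias": "Stream1",
    "Enable": "true",
    "Default": "true",
    "StreamType": "RabbitMQ",
    "HostName": "rabbitmq.default.svc",
    "User": "guest",
    "Password": "guest",
    "Port": "5672",
    "Exchange": "devprime",
    "ExchangeType": "direct",
    "Retry": "3",
    "Fallback": "State1",
    "Threads": "30",
    "Buffer": "1",
    "Subscribe": [{"Queues": "orderevents"}],
}

print(to_env_value(stream1))
```

The printed string matches the `value` shown in the YAML above once the line wrapping used for display is removed.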

Subscribing to a queue in RabbitMQ using a “Fanout” Exchange

In the JSON example below, the “Exchange” and “ExchangeType” parameters have been added to the “Subscribe” item, setting the Exchange type to “fanout” and the queue to “in-fanout-01”.

 "DevPrime_Stream": [
    {
      "Alias": "Stream1",
      "Enable": "true",
      "Default": "true",
      "StreamType": "RabbitMQ",
      "HostName": "Localhost",
      "User": "guest",
      "Password": "guest",
      "Port": "5672",
      "Exchange": "devprime",
      "ExchangeType": "direct",
      "Retry": "3",
      "Fallback": "State1",
      "Threads": "30",
      "Buffer": "1",
      "Subscribe": [
        {
          "Exchange": "dp-fanout",
          "ExchangeType": "fanout",
          "queues": "in-fanout-01"
        }
      ]
    }
  ],

YAML example with application settings:

- name: devprime_stream1
  value: "alias=Stream1|||enable=true|||default=true|||
  streamtype=RabbitMQ|||hostname=rabbitmq.default.svc|||
  user=guest|||password=guest|||port=5672|||exchange=devprime|||
  exchangetype=direct|||retry=3|||fallback=State1|||threads=30|||buffer=5|||
  subscribe=[exchange=dp-fanout,exchangetype=fanout,queues=in-fanout-01]"
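
To make the difference between the “direct” and “fanout” Exchange types concrete, here is a small in-memory model of how each type selects queues. It is a toy sketch of the general AMQP routing rules, not a RabbitMQ client and not Devprime code. The `Exchange` class, the queue “in-fanout-02”, and the choice of using the queue name as the binding key on the direct Exchange are assumptions made for illustration.

```python
class Exchange:
    """Toy model of an exchange: holds bindings and selects target queues."""

    def __init__(self, name: str, exchange_type: str):
        self.name = name
        self.exchange_type = exchange_type
        self.bindings = []  # list of (queue_name, binding_key)

    def bind(self, queue_name: str, binding_key: str = "") -> None:
        self.bindings.append((queue_name, binding_key))

    def route(self, routing_key: str) -> list:
        """Return the names of the queues that would receive a message."""
        if self.exchange_type == "fanout":
            # A fanout exchange ignores the routing key: every bound queue gets a copy.
            return [queue for queue, _ in self.bindings]
        if self.exchange_type == "direct":
            # A direct exchange delivers only where binding key == routing key.
            return [queue for queue, key in self.bindings if key == routing_key]
        raise ValueError(f"unsupported exchange type: {self.exchange_type}")


fanout = Exchange("dp-fanout", "fanout")
fanout.bind("in-fanout-01")
fanout.bind("in-fanout-02")  # hypothetical second subscriber

direct = Exchange("devprime", "direct")
direct.bind("orderevents", binding_key="orderevents")  # assumed binding key

print(fanout.route("any-key"))      # ['in-fanout-01', 'in-fanout-02']
print(direct.route("orderevents"))  # ['orderevents']
print(direct.route("other-key"))    # []
```

With a fanout Exchange, each subscribing service that binds its own queue receives its own copy of every event, which is why the queue name rather than a routing key is the relevant setting in the “Subscribe” item.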

Setting up Publish/Subscribe using a “Fanout” Exchange

In the JSON example below, in addition to the “Subscribe” item on the “dp-fanout” Exchange, a “Publish” item has been added to allow events to be published to the “dp-fanout-global” Fanout Exchange. The example also includes a “Subscribe” entry for the “ms-order-in” queue, which is bound to the default Exchange “devprime”.

 "DevPrime_Stream": [
    {
      "Alias": "Stream1",
      "Enable": "true",
      "Default": "true",
      "StreamType": "RabbitMQ",
      "HostName": "Localhost",
      "User": "guest",
      "Password": "guest",
      "Port": "5672",
      "Exchange": "devprime",
      "ExchangeType": "direct",
      "Retry": "3",
      "Fallback": "State1",
      "Threads": "30",
      "Buffer": "1",
      "Publish": [
        {
          "Exchange": "dp-fanout-global",
          "ExchangeType": "fanout"
        }
      ],
      "Subscribe": [
        {
          "Exchange": "dp-fanout",
          "ExchangeType": "fanout",
          "queues": "in-fanout-01"
        },
        {
          "queues": "ms-order-in"
        }
      ]
    }
  ],

Example in the application log with this configuration:

[INF][ms-order][Stream][Type "RabbitMQ"][Alias "Stream1"]
["Enable"][Subscribe]["in-fanout-01,ms-order-in"]
[Parallel "30"][Buffer "1"]

YAML example with application settings:

- name: devprime_stream1
  value: "alias=Stream1|||enable=true|||default=true|||streamtype=RabbitMQ|||
  hostname=rabbitmq.default.svc|||user=guest|||password=guest|||
  port=5672|||exchange=devprime|||exchangetype=direct|||retry=3|||
  fallback=State1|||threads=30|||buffer=5|||
  publish=[exchange=dp-fanout-global,exchangetype=fanout]|||
  subscribe=[exchange=dp-fanout,exchangetype=fanout,queues=in-fanout-01]
  [queues=ms-order-in]"
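
When reviewing a deployment, it can help to read a flat value like the one above back into a structured form. The following Python sketch does that under stated assumptions: the format is inferred from the examples in this page, the whitespace introduced by line wrapping is not significant, and no value contains spaces, commas, or brackets. The function name `parse_env_value` is hypothetical and this is not how Devprime itself reads the variable.

```python
import re


def parse_env_value(raw: str) -> dict:
    """Parse 'key=value|||key=[k=v,k=v][k=v]' into a dict.

    Assumption: format inferred from the documented examples only.
    """
    # The examples wrap the value across lines; drop all whitespace.
    compact = "".join(raw.split())
    result = {}
    for pair in compact.split("|||"):
        key, _, value = pair.partition("=")
        if value.startswith("["):
            # Each [...] group is one list item made of comma-separated k=v pairs.
            groups = re.findall(r"\[([^\]]*)\]", value)
            result[key] = [
                dict(item.split("=", 1) for item in group.split(","))
                for group in groups
            ]
        else:
            result[key] = value
    return result


raw_value = """alias=Stream1|||enable=true|||default=true|||streamtype=RabbitMQ|||
hostname=rabbitmq.default.svc|||user=guest|||password=guest|||
port=5672|||exchange=devprime|||exchangetype=direct|||retry=3|||
fallback=State1|||threads=30|||buffer=5|||
publish=[exchange=dp-fanout-global,exchangetype=fanout]|||
subscribe=[exchange=dp-fanout,exchangetype=fanout,queues=in-fanout-01]
[queues=ms-order-in]"""

settings = parse_env_value(raw_value)
print(settings["publish"])
print([item["queues"] for item in settings["subscribe"]])
```

The second line printed lists the two subscribed queues, “in-fanout-01” and “ms-order-in”, which are the same names that appear in the application log for this configuration.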
Last modified January 10, 2024 (967dcac3)