RabbitMQ
The Devprime platform natively supports RabbitMQ, an open source messaging software that operates on the Advanced Message Queuing Protocol (AMQP) and Streaming Text Oriented Messaging Protocol (MQTT) protocols.
RabbitMQ can be used on premise, cloud, docker, kubernetes and managed platforms such as CloudAMQP and others.
Subscribe to a queue in RabbitMQ
The JSON below demonstrates the default configuration with the RabbitMQ credentials, the exchange used, the Retry, the Fallback, the Threads, and the ‘Subscribe’ option to receive events in the ‘orderevents’ queue as defined in the Queues item.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
"DevPrime_Stream": [
{
"Alias": "Stream1",
"Enable": "true",
"Default": "true",
"StreamType": "RabbitMQ",
"HostName": "Localhost",
"User": "guest",
"Password": "guest",
"Port": "5672",
"Exchange": "devprime",
"ExchangeType": "direct",
"Retry": "3",
"Fallback": "State1",
"Threads": "30",
"Buffer": "1",
"Subscribe": [
{
"Queues": "orderevents"
}
]
}
],
|
In the example below, we’re detailing the default Adapter settings and the RabbitMQ-specific settings.
General |
— |
Enable |
Enable the Stream adapter (True/False) |
Alias |
Defines a unique name to identify the service |
Default |
Sets as default to the centary of having more than one Stream (True/False) |
StreamType |
Sets the Stream type (RabbitMQ/Kafka) |
HostName |
Configure the url of RabbitMQ |
User |
Configures the user to sign in |
Password |
Configures the user’s password |
Port |
Configure the port |
Exchange |
Sets up the default RabbitMQ |
ExchangeType |
Configures the type of Exchange |
Retry |
Configure the number of retries |
Fallback |
Configures a State for automatic resitence in case of failure |
Threads |
Sets the number of concurrent connections processing events |
Buffer |
Defines the number of events obtained per connection |
Subscribe |
Configures RabbitMQ Queues for Read |
Example in the application log with this configuration:
1
2
|
[INF][ms-order][Stream][Type "RabbitMQ"][Alias "Stream1"]
["Enable"][Subscribe]["orderevents"][Parallel "30"][Buffer "1"]
|
YAML example with application settings:
To export the configurations as an environment variable and use in Kubernetes, run the command in the Devprime CLI:
dp export kubernetes
1
2
3
4
5
|
- name: devprime_stream1
value: "alias=Stream1|||enable=true|||default=true|||streamtype=RabbitMQ|||
hostname=rabbitmq.default.svc|||user=guest|||password=guest|||port=5672|||
exchange=devprime|||exchangetype=direct|||retry=3|||fallback=State1|||
threads=30|||buffer=1|||subscribe=[queues=orderevents]"
|
Signing a queue in RabbitMQ using a “Fanout” Exchange
In the JSON example below, the “Exchange” and “ExchangeType” parameters have been added, setting the Exchange type to “fanout” and using the item queues as “in-fanout-01”.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
"DevPrime_Stream": [
{
"Alias": "Stream1",
"Enable": "true",
"Default": "true",
"StreamType": "RabbitMQ",
"HostName": "Localhost",
"User": "guest",
"Password": "guest",
"Port": "5672",
"Exchange": "devprime",
"ExchangeType": "direct",
"Retry": "3",
"Fallback": "State1",
"Threads": "30",
"Buffer": "1",
"Subscribe": [
{
"Exchange": "dp-fanout",
"ExchangeType": "fanout",
"queues": "in-fanout-01"
}
]
}
],
|
YAML example with application settings:
1
2
3
4
5
6
|
- name: devprime_stream1
value: "alias=Stream1|||enable=true|||default=true|||
streamtype=RabbitMQ|||hostname=rabbitmq.default.svc|||
user=guest|||password=guest|||port=5672|||exchange=devprime|||
exchangetype=direct|||retry=3|||fallback=State1|||threads=30|||buffer=5|||
subscribe=[exchange=dp-fanout,exchangetype=fanout,queues=in-fanout-01]"
|
Setting Up Publish/Subscribe using an Exchange “Fanout”
In the example presented in the JSON below, you’ll notice that in addition to the “Subscribe” parameter in the “dp-fanout” Exchange, we’ve added a “Publish” parameter to allow the publication of events in the “dp-fanout-global” Exchange Fanout. In the example, you can also see the “Subscribe” in the “ms-order-in” queue, which is linked to the default Exchange “devprime”.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
|
"DevPrime_Stream": [
{
"Alias": "Stream1",
"Enable": "true",
"Default": "true",
"StreamType": "RabbitMQ",
"HostName": "Localhost",
"User": "guest",
"Password": "guest",
"Port": "5672",
"Exchange": "devprime",
"ExchangeType": "direct",
"Retry": "3",
"Fallback": "State1",
"Threads": "30",
"Buffer": "1",
"Publish": [
{
"Exchange": "dp-fanout-global",
"ExchangeType": "fanout"
}
],
"Subscribe": [
{
"Exchange": "dp-fanout",
"ExchangeType": "fanout",
"queues": "in-fanout-01"
},
{
"queues": "ms-order-in"
}
]
}
],
|
Example in the application log with this configuration:
1
2
3
|
[INF][ms-order][Stream][Type "RabbitMQ"][Alias "Stream1"]
["Enable"][Subscribe]["in-fanout-01,ms-order-in"]
[Parallel "30"][Buffer "1"]
|
YAML example with application settings:
1
2
3
4
5
6
7
8
|
- name: devprime_stream1
value: "alias=Stream1|||enable=true|||default=true|||streamtype=RabbitMQ|||
hostname=rabbitmq.default.svc|||user=guest|||password=guest|||
port=5672|||exchange=devprime|||exchangetype=direct|||retry=3|||
fallback=State1|||threads=30|||buffer=5|||
publish=[exchange=dp-fanout-global,exchangetype=fanout]|||
subscribe=[exchange=dp-fanout,exchangetype=fanout,queues=in-fanout-01]
[queues=ms-order-in]"
|
Last modified January 10, 2024 (967dcac3)