Enabling events on the Steam Adapter

Use the Devprime platform to subscribe to events with queues, topics using the Stream Adapter to receive notifications in the event hub and then distribute either to standard Devprime events and/or non-standard events.

Introduction to Event Subscription in the Stream Adapter

The first step in enabling event subscription for a queue or topic is to configure the Subscribe settings in the Stream Adapter Configuration. When you start the microservice, it will automatically connect to the defined active queues. However, if an event arrives and there is no Subscribe setting, the event is rejected by the Stream Adapter.

Queued subscription notifications using the RabbitMQ protocol

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
"DevPrime_Stream": [
    {
      "Alias": "Stream1",
      "Enable": "true",
      "Default": "true",
      "StreamType": "RabbitMQ",
      "HostName": "Localhost",
      "User": "guest",
      "Password": "guest",
      "Port": "5672",
      "Exchange": "devprime",
      "ExchangeType": "direct",
      "Retry": "3",
      "Fallback": "State1",
      "Threads": "30",
      "Buffer": "1",
      "Subscribe": [
        {
          "Queues": "ordernotifications"
        }
      ]
    }
  ],

If you try to run the microservice and the queue or topic doesn’t exist, you’ll encounter an initial error in the log.

[ERR][Stream][Type "RabbitMQ"][Alias "Stream1"][Subscribe]["ordernotifications not found"][RID c57842d3-555e-4755-9b95-1457c9127160][TID c57842d3-555e-4755-9b95-1457c9127160]

When an unimplemented event is received by the Stream Adapter, it will be automatically rejected and the initial error will be logged.

[INF][Stream][Type "RabbitMQ"][Alias "Stream1"][In][Event]["OrderUpdated"]["Rejected"]["Event subscribe missing"]["{\"AppId\":\"fef824a5-0eea-4b49-b08e-0bad25dbd55d\",\"AppName\":\"MyApp\",\"Version\":1,\"Name\":\"OrderUpdated\",\"CreationDate\":\"2022-06-07T11:26:21.7636769-03:00\",\"Payload\":{\"OrderId\":\"9375cde2-1770-4186-8ffb-ee6e81aa5d91\",\"CustomerName\":\"Name\",\"CustomerTaxId\":\"string\",\"Total\":0}}"][RID e3d1f4b2-cf41-494d-a3df-58096e5bb0c9][TID e3d1f4b2-cf41-494d-a3df-58096e5bb0c9]

Enabling event reception on the Stream Adapter

The Stream configuration is done in the EventStream.cs file, located in the Stream Adapter project, and can be opened directly in Visual Studio Code to implement the routing rules for incoming events. The Adapter is compatible with all technologies supported by Devprime and allows you to receive events from multiple streams simultaneously.

Navigate to the project folder and open it in Visual Studio Code to find where to apply the settings.

1
code src/adapters/Stream/EventStream.cs
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
using Application.Interfaces.Services;
using Application.Services.Order.Model;

namespace DevPrime.Stream;

public class EventStream : EventStreamBase, IEventStream
{
    // Override the method to handle streaming events
    public override void StreamEvents()
    {
        // Mapping and receiving the "OrderUpdated" event from Stream1
        Subscribe<IOrderService, OrderCreatedEventDTO>("Stream1",
         "OrderUpdated", (dto, orderService, Dp) =>
        {
            // Log received data from the "OrderUpdated" event
            Dp.Observability.Log($"[Stream] Received data: Customer:
            {dto.CustomerName}; TaxId:{dto.CustomerTaxID};
            Total:{dto.Total}");    
        });
    }
}

In the log below, you can track the details of the data received.

[INF][Stream][Type "RabbitMQ"][Alias "Stream1"][In][Event]["OrderUpdated"]["ordernotifications"]["{\"Id\":\"43669773-229c-45b2-8e5c-99a248e0fe34\",\"CorrelationId\":\"43669773-229c-45b2-8e5c-99a248e0fe34\",\"TraceId\":\"43669773-229c-45b2-8e5c-99a248e0fe34\",\"AppId\":\"48d06d35-c7b1-4ed7-b393-1b249d32eb9b\",\"AppName\":\"order\",\"Version\":1,\"Name\":\"OrderUpdated\",\"CreationDate\":\"2022-06-07T11:26:21.7636769-03:00\",\"Payload\":{\"OrderId\":\"9375cde2-1770-4186-8ffb-ee6e81aa5d91\",\"CustomerName\":\"Joe Demo\",\"CustomerTaxID\":\"XYZ9909\",\"Total\":12899}}"][RID 43669773-229c-45b2-8e5c-99a248e0fe34][TID 43669773-229c-45b2-8e5c-99a248e0fe34]
[INF][Custom][[Stream] Received data: Customer:Joe Demo; TaxId:XYZ9909; Total:12899][RID 43669773-229c-45b2-8e5c-99a248e0fe34][TID 43669773-229c-45b2-8e5c-99a248e0fe34]

Receiving different events in the Stream Adapter

In this scenario, we will be dealing with “OrderUpdated” and “OrderCanceled”. Other even valid events will be dropped by the event hub on the Stream Adapter.

Navigate to the project folder and open it in Visual Studio Code to find where to apply the settings.

1
code src/adapters/Stream/EventStream.cs
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
public override void StreamEvents()
{
    // Mapping and receiving the "OrderUpdated" event from Stream1
    Subscribe<IOrderService, OrderCreatedEventDTO>("Stream1",
     "OrderUpdated", (dto, orderService, Dp) =>
    {
        // Log received data from the "OrderUpdated" event
        Dp.Observability.Log($"[Stream] Received data: Customer:
        {dto.CustomerName}; TaxId:{dto.CustomerTaxID};
         Total:{dto.Total}");    
    });

    // Mapping and receiving the "OrderCanceled" event from Stream1
    Subscribe<IOrderService, OrderCreatedEventDTO>("Stream1",
     "OrderCanceled", (dto, orderService, Dp) =>
    {
        // Log received data from the "OrderCanceled" event
        Dp.Observability.Log($"[Stream] Received data: Customer:
        {dto.CustomerName}; TaxId:{dto.CustomerTaxID}; Total:{dto.Total}");    
    });

}

Receiving events from legacy applications in the Stream Adapter

Devprime provides a standard protocol for asynchronous communication between microservices using event-driven architecture. However, to integrate with legacy applications, you may need to handle generic events. To do this, enable the functionality in the Stream Adapter event hub to intercept non-standard payloads.

Example payload for queued submission and use as a generic example:

1
2
3
4
5
6
{
  "IntegrationId": "9b3a4c56-d0d4-4a1a-b0b7-3c1e4d5a6f77",
  "CustomerId": "1a2b3c4d-5e6f-7g8h-9i0j-k1l2m3n4o5p6",
  "ComplianceApprovedId": "e7f8g9h0-i1j2-k3l4-m5n6-o7p8q9r0s1t2",
  "Balance": 12345.67
}

The default procedure will be auto-rejection, as the event will not be identified as a standard Devprime event. This will be the default flow:

[INF][Stream][Type "RabbitMQ"][Alias "Stream1"][In][Event]["Generic"]["Rejected"]["Subscribe missing"]["{\"IntegrationId\":\"9b3a4c56-d0d4-4a1a-b0b7-3c1e4d5a6f77\",\"CustomerId\":\"1a2b3c4d-5e6f-7g8h-9i0j-k1l2m3n4o5p6\",\"ComplianceApprovedId\":\"e7f8g9h0-i1j2-k3l4-m5n6-o7p8q9r0s1t2\",\"Balance\":12345.67} RabbitMQ event receive error [Exception of type 'System.Text.Json.JsonException' was thrown.]"]

Enabling Receiving Legacy Events on the Stream Adapter
Open the project in Visual Studio Code, apply the necessary settings, and add a generic processing flow using “Subscribe<IOrderService>((dto, orderService, Dp))” to accept events from legacy systems with protocols other than the Devprime standard. In this model, do not use a DTO OrderCreatedEventDTO for automatic conversion or identify the incoming event with filters such as “OrderUpdated” or “OrderCanceled”.

1
code src/adapters/Stream/EventStream.cs
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public override void StreamEvents()
{
    // Mapping and receiving the "OrderUpdated" event from Stream1
    Subscribe<IOrderService, OrderCreatedEventDTO>("Stream1",
     "OrderUpdated", (dto, orderService, Dp) =>
    {
        // Log received data from the "OrderUpdated" event
        Dp.Observability.Log($"[Stream] Received data: 
        Customer:{dto.CustomerName}; 
        TaxId:{dto.CustomerTaxID};
        Total:{dto.Total}");    
    });

    // Mapping and receiving the "OrderCanceled" event from Stream1
    Subscribe<IOrderService, OrderCreatedEventDTO>("Stream1",
     "OrderCanceled", (dto, orderService, Dp) =>
    {
        // Log received data from the "OrderCanceled" event
        Dp.Observability.Log($"[Stream] Received data:
         Customer:{dto.CustomerName}; 
         TaxId:{dto.CustomerTaxID}; 
         Total:{dto.Total}");    
    });

    // Receiving generic event from Legacy Systems
    Subscribe<IOrderService>((dto, orderService, Dp) =>
    { 
      // Log received data from the Generic event
        Dp.Observability.Log(dto); 
    });  

}

When you run the microservice with the new configuration that supports legacy application events, you can track the successful receipt in the log below, followed by the processing breakdown. In this model, you must follow the procedure of structuring the manual conversion of the received data, passing it through the model to the Application Services layer, which will direct the decision in the business rule in the Domain layer.

[INF][Stream][Type "RabbitMQ"][Alias "Stream1"][In][Event]["Generic"]["ordernotifications"]["{\"Id\":\"a67a03b3-19ad-4239-a394-2e8a152ac125\",\"CorrelationId\":\"a67a03b3-19ad-4239-a394-2e8a152ac125\",\"TraceId\":\"3a9a0185-c345-4aa7-a2c5-ee54880b9d72\",\"AppId\":\"f6e8b14e-94d2-4b6c-9665-1657ae628237\",\"AppName\":\"order\",\"Name\":\"Generic\",\"Payload\":\"{\"IntegrationId\":\"9b3a4c56-d0d4-4a1a-b0b7-3c1e4d5a6f77\",\"CustomerId\":\"1a2b3c4d-5e6f-7g8h-9i0j-k1l2m3n4o5p6\",\"ComplianceApprovedId\":\"e7f8g9h0-i1j2-k3l4-m5n6-o7p8q9r0s1t2\",\"Balance\":12345.67}\"}"]

[INF][Custom][{"IntegrationId":"9b3a4c56-d0d4-4a1a-b0b7-3c1e4d5a6f77","CustomerId":"1a2b3c4d-5e6f-7g8h-9i0j-k1l2m3n4o5p6","ComplianceApprovedId":"e7f8g9h0-i1j2-k3l4-m5n6-o7p8q9r0s1t2","Balance":12345.67}]

To learn more:

Last modified November 20, 2024 (61099f59)