Habilitando eventos no Steam Adapter

Utilize a plataforma Devprime para realizar subscrição em eventos com filas, tópicos utilizando o Stream Adapter para receber as notificações no hub de eventos e depois a distribuição seja para eventos padrão da Devprime e/ou eventos não padrão.

Introdução a Subscrição de Eventos no Stream Adapter

O primeiro passo para habilitar a subscrição de eventos de uma fila ou tópico é configurar as definições de Subscribe na configuração do Stream Adapter. Ao iniciar o microsserviço, ele se conectará automaticamente às filas ativas definidas. No entanto, se um evento chegar e não houver uma configuração Subscribe, o evento será rejeitado pelo Stream Adapter.

Subscrição na fila notifications usando o protocolo RabbitMQ

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
"DevPrime_Stream": [
    {
      "Alias": "Stream1",
      "Enable": "true",
      "Default": "true",
      "StreamType": "RabbitMQ",
      "HostName": "Localhost",
      "User": "guest",
      "Password": "guest",
      "Port": "5672",
      "Exchange": "devprime",
      "ExchangeType": "direct",
      "Retry": "3",
      "Fallback": "State1",
      "Threads": "30",
      "Buffer": "1",
      "Subscribe": [
        {
          "Queues": "ordernotifications"
        }
      ]
    }
  ],

Se você tentar executar o microsserviço e a fila ou tópico não existir, você encontrará um erro inicial no log.

[ERR][Stream][Type "RabbitMQ"][Alias "Stream1"][Subscribe]["ordernotifications not found"][RID c57842d3-555e-4755-9b95-1457c9127160][TID c57842d3-555e-4755-9b95-1457c9127160]

Quando um evento não implementado é recebido pelo Stream Adapter, ele será automaticamente rejeitado e o erro inicial será registrado.

[INF][Stream][Type "RabbitMQ"][Alias "Stream1"][In][Event]["OrderUpdated"]["Rejected"]["Event subscribe missing"]["{\"AppId\":\"fef824a5-0eea-4b49-b08e-0bad25dbd55d\",\"AppName\":\"MyApp\",\"Version\":1,\"Name\":\"OrderUpdated\",\"CreationDate\":\"2022-06-07T11:26:21.7636769-03:00\",\"Payload\":{\"OrderId\":\"9375cde2-1770-4186-8ffb-ee6e81aa5d91\",\"CustomerName\":\"Name\",\"CustomerTaxId\":\"string\",\"Total\":0}}"][RID e3d1f4b2-cf41-494d-a3df-58096e5bb0c9][TID e3d1f4b2-cf41-494d-a3df-58096e5bb0c9]

Habilitando a recepção de eventos no Stream Adapter

A configuração do Stream é feita no arquivo EventStream.cs, localizado no projeto Stream Adapter, e pode ser aberta diretamente no Visual Studio Code para implementar as regras de roteamento para eventos recebidos. O Adapter é compatível com todas as tecnologias suportadas pelo Devprime e permite receber eventos de múltiplos streams simultaneamente.

Navegue até a pasta do projeto e abra-a no Visual Studio Code para localizar onde aplicar as configurações.

1
code src/adapters/Stream/EventStream.cs
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
using Application.Interfaces.Services;
using Application.Services.Order.Model;

namespace DevPrime.Stream;

public class EventStream : EventStreamBase, IEventStream
{
    // Override the method to handle streaming events
    public override void StreamEvents()
    {
        // Mapping and receiving the "OrderUpdated" event from Stream1
        Subscribe<IOrderService, OrderCreatedEventDTO>("Stream1",
         "OrderUpdated", (dto, orderService, Dp) =>
        {
            // Log received data from the "OrderUpdated" event
            Dp.Observability.Log($"[Stream] Received data: Customer:
            {dto.CustomerName}; TaxId:{dto.CustomerTaxID};
            Total:{dto.Total}");    
        });
    }
}

No log abaixo, você pode rastrear os detalhes dos dados recebidos.

[INF][Stream][Type "RabbitMQ"][Alias "Stream1"][In][Event]["OrderUpdated"]["ordernotifications"]["{\"Id\":\"43669773-229c-45b2-8e5c-99a248e0fe34\",\"CorrelationId\":\"43669773-229c-45b2-8e5c-99a248e0fe34\",\"TraceId\":\"43669773-229c-45b2-8e5c-99a248e0fe34\",\"AppId\":\"48d06d35-c7b1-4ed7-b393-1b249d32eb9b\",\"AppName\":\"order\",\"Version\":1,\"Name\":\"OrderUpdated\",\"CreationDate\":\"2022-06-07T11:26:21.7636769-03:00\",\"Payload\":{\"OrderId\":\"9375cde2-1770-4186-8ffb-ee6e81aa5d91\",\"CustomerName\":\"Joe Demo\",\"CustomerTaxID\":\"XYZ9909\",\"Total\":12899}}"][RID 43669773-229c-45b2-8e5c-99a248e0fe34][TID 43669773-229c-45b2-8e5c-99a248e0fe34]
[INF][Custom][[Stream] Received data: Customer:Joe Demo; TaxId:XYZ9909; Total:12899][RID 43669773-229c-45b2-8e5c-99a248e0fe34][TID 43669773-229c-45b2-8e5c-99a248e0fe34]

Recebendo eventos diferentes no Stream Adapter

Nesse cenário estaremos tratando o “OrderUpdated” e o “OrderCanceled”. Outros eventos mesmo válidos serão descartados pelo hub de eventos no Stream Adapter.

Navegue até a pasta do projeto e abra-a no Visual Studio Code para localizar onde aplicar as configurações.

1
code src/adapters/Stream/EventStream.cs
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
public override void StreamEvents()
{
    // Mapping and receiving the "OrderUpdated" event from Stream1
    Subscribe<IOrderService, OrderCreatedEventDTO>("Stream1",
     "OrderUpdated", (dto, orderService, Dp) =>
    {
        // Log received data from the "OrderUpdated" event
        Dp.Observability.Log($"[Stream] Received data: Customer:
        {dto.CustomerName}; TaxId:{dto.CustomerTaxID};
         Total:{dto.Total}");    
    });

    // Mapping and receiving the "OrderCanceled" event from Stream1
    Subscribe<IOrderService, OrderCreatedEventDTO>("Stream1",
     "OrderCanceled", (dto, orderService, Dp) =>
    {
        // Log received data from the "OrderCanceled" event
        Dp.Observability.Log($"[Stream] Received data: Customer:
        {dto.CustomerName}; TaxId:{dto.CustomerTaxID}; Total:{dto.Total}");    
    });

}

Recebendo eventos de aplicações legadas no Stream Adapter

A Devprime oferece um protocolo padrão para comunicação assíncrona entre microsserviços usando arquitetura orientada a eventos. No entanto, para integrar com aplicações legadas, pode ser necessário processar eventos genéricos. Para isso, habilite a funcionalidade no hub de eventos do Stream Adapter para interceptar payloads fora do padrão.

Exemplo de payload para envio na fila e uso como exemplo genérico:

1
2
3
4
5
6
{
  "IntegrationId": "9b3a4c56-d0d4-4a1a-b0b7-3c1e4d5a6f77",
  "CustomerId": "1a2b3c4d-5e6f-7g8h-9i0j-k1l2m3n4o5p6",
  "ComplianceApprovedId": "e7f8g9h0-i1j2-k3l4-m5n6-o7p8q9r0s1t2",
  "Balance": 12345.67
}

O procedimento padrão será a rejeição automática, pois o evento não será identificado como um evento padrão da Devprime. Esse será o fluxo padrão:

[INF][Stream][Type "RabbitMQ"][Alias "Stream1"][In][Event]["Generic"]["Rejected"]["Subscribe missing"]["{\"IntegrationId\":\"9b3a4c56-d0d4-4a1a-b0b7-3c1e4d5a6f77\",\"CustomerId\":\"1a2b3c4d-5e6f-7g8h-9i0j-k1l2m3n4o5p6\",\"ComplianceApprovedId\":\"e7f8g9h0-i1j2-k3l4-m5n6-o7p8q9r0s1t2\",\"Balance\":12345.67} RabbitMQ event receive error [Exception of type 'System.Text.Json.JsonException' was thrown.]"]

Habilitando o recebimento de eventos legados no Stream Adapter
Abra o projeto no Visual Studio Code, aplique as configurações necessárias e adicione um fluxo de processamento genérico usando “Subscribe<IOrderService>((dto, orderService, Dp))” para aceitar eventos de sistemas legados com protocolos diferentes do padrão Devprime. Nesse modelo, não use um DTO OrderCreatedEventDTO para conversão automática nem identifique o evento recebido com filtros como “OrderUpdated” ou “OrderCanceled”.

1
code src/adapters/Stream/EventStream.cs
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public override void StreamEvents()
{
    // Mapping and receiving the "OrderUpdated" event from Stream1
    Subscribe<IOrderService, OrderCreatedEventDTO>("Stream1",
     "OrderUpdated", (dto, orderService, Dp) =>
    {
        // Log received data from the "OrderUpdated" event
        Dp.Observability.Log($"[Stream] Received data: 
        Customer:{dto.CustomerName}; 
        TaxId:{dto.CustomerTaxID};
        Total:{dto.Total}");    
    });

    // Mapping and receiving the "OrderCanceled" event from Stream1
    Subscribe<IOrderService, OrderCreatedEventDTO>("Stream1",
     "OrderCanceled", (dto, orderService, Dp) =>
    {
        // Log received data from the "OrderCanceled" event
        Dp.Observability.Log($"[Stream] Received data:
         Customer:{dto.CustomerName}; 
         TaxId:{dto.CustomerTaxID}; 
         Total:{dto.Total}");    
    });

    // Receiving generic event from Legacy Systems
    Subscribe<IOrderService>((dto, orderService, Dp) =>
    { 
      // Log received data from the Generic event
        Dp.Observability.Log(dto); 
    });  

}

Ao executar o microsserviço com a nova configuração que suporta eventos de aplicações legadas, você poderá acompanhar o recebimento bem-sucedido no log abaixo, seguido pelo detalhamento do processamento. Nesse modelo, você deve seguir o procedimento de estruturar a conversão manual dos dados recebidos, repassá-los pela model para a camda de Application Services, que direcionará a decisão na regra de negócio na camada de Domain.

[INF][Stream][Type "RabbitMQ"][Alias "Stream1"][In][Event]["Generic"]["ordernotifications"]["{\"Id\":\"a67a03b3-19ad-4239-a394-2e8a152ac125\",\"CorrelationId\":\"a67a03b3-19ad-4239-a394-2e8a152ac125\",\"TraceId\":\"3a9a0185-c345-4aa7-a2c5-ee54880b9d72\",\"AppId\":\"f6e8b14e-94d2-4b6c-9665-1657ae628237\",\"AppName\":\"order\",\"Name\":\"Generic\",\"Payload\":\"{\"IntegrationId\":\"9b3a4c56-d0d4-4a1a-b0b7-3c1e4d5a6f77\",\"CustomerId\":\"1a2b3c4d-5e6f-7g8h-9i0j-k1l2m3n4o5p6\",\"ComplianceApprovedId\":\"e7f8g9h0-i1j2-k3l4-m5n6-o7p8q9r0s1t2\",\"Balance\":12345.67}\"}"]

[INF][Custom][{"IntegrationId":"9b3a4c56-d0d4-4a1a-b0b7-3c1e4d5a6f77","CustomerId":"1a2b3c4d-5e6f-7g8h-9i0j-k1l2m3n4o5p6","ComplianceApprovedId":"e7f8g9h0-i1j2-k3l4-m5n6-o7p8q9r0s1t2","Balance":12345.67}]

Para saber mais:

Última modificação November 5, 2024 (1c11d03d)