Pipeline

O Devprime Pipeline é um orquestrador do fluxo de execução disponível em aplicações baseadas na plataforma Devprime. Ele isola cada processo dentro de um contexto e oferece automaticamente tratamento de exceções, observabilidade (log, trace, metrics) e controle transacional.

Introdução ao Devprime Pipeline Dp.Pipeline()

A tecnologia do Devprime Pipeline é usada em várias etapas do fluxo da aplicação, conforme demonstrado na imagem abaixo desde a recepção de eventos nos Driving Adapters, na camada de Application, no Domain e nos Handlers e depois nos Driven Adapters. É importante observar que, devido à execução paralela de processos, o fluxo do Devprime Pipeline pode estar em estágios diferentes em momentos distintos.

Devprime Pipeline

Devprime Pipeline integrado ao serviço de aplicação
A integração do Devprime Pipeline está representada no código abaixo por meio do bloco Dp.Pipeline(). No momento que a aplicação recebe um POST HTTP é pelo Adapter Web, ele é encaminhado para o serviço de aplicação, que preenche o Aggregate Root order. Em seguida, ele é anexado ao Devprime Pipeline usando o comando Dp.Attach(), e o fluxo é direcionado para o método Add do Aggregate Root Order.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
// Appplication Services
public void Add(Model.Order command)
    {
        Dp.Pipeline(Execute: () =>
        {
            var order = command.ToDomain();
            Dp.Attach(order);
            order.Add();
        });
    }

Devprime Pipeline integrado ao Domain / Aggregate Root
O Devprime Pipeline oferece recursos incríveis no domínio, preservando o isolamento das regras de negócio. Isso permite que eventos de negócios sejam executados externamente, conforme demonstrado no exemplo abaixo onde o Dp.Pipeline() é inicializado e o objeto de negócio “Items” é anexado ao Devprime Pipeline usando o método Dp.Attach(). Em seguida, os parâmetros de inicialização do novo Aggregate Root são configurados e um evento de domínio chamado CreateOrder() é disparado com o método Dp.ProcessEvent<bool>(new CreateOrder()).

Este evento será processado em um Event Handler externo ao domínio, que pode contém uma implementação para gravar no banco de dados usando o Adapter de State. Em seguida, ele retorna um resultado, que neste contexto é automaticamente tipado como um “bool”.

Conforme o resultado, um novo evento de domínio OrderCreated() pode ser disparado usando o método Dp.ProcessEvent(new OrderCreated()), que será tratado por um handler externo ao domínio para comunicação com o Adapter de Stream e emissão de um evento no RabbitMQ/Kafka e outros"

Código de negócio dentro do Aggregate Root

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
// Business code inside Aggregate Root
public virtual void Add()
    {
        Dp.Pipeline(Execute: () =>
        {
            Dp.Attach(Items);
            ValidFields();
            ID = Guid.NewGuid();
            IsNew = true;
            var success = Dp.ProcessEvent<bool>(new CreateOrder());
            if (success)
            {
                Dp.ProcessEvent(new OrderCreated());
            }
        });
    }

No exemplo abaixo, apresentamos um resumo da classe CreateOrderEventHandler(), onde, na linha 4, dentro do Handler, é possível recuperar o Aggregate Root originador desse evento de domínio e, em seguida, encaminhar o fluxo de persistência para o Adapter de State.

Um ponto valioso nessa abordagem é que, para a pessoa desenvolvedora, o Adapter de State é o único local que se relaciona com o banco de dados.

1
2
3
4
5
6
// Event Handler CreateOrderEventHandler()
    public override dynamic Handle(CreateOrder createOrder)
    {
        var order = createOrder.Get<Domain.Aggregates.Order.Order>();
        return Dp.State.Order.Add(order);
    }

No exemplo abaixo apresentamos um resumo da classe OrderCreatedEventHandler() onde, na linha 5, dentro do Handler, é possível recuperar o Aggregate Root originador desse evento de domínio. Em seguida, preenche-se um envelope chamado OrderCreatedEventDTO() com os dados que serão enviados no evento pelo Stream. Na linha 12, aciona-se o Adapter de Stream para realizar o envio.

Um ponto valioso nessa abordagem é que, para a pessoa desenvolvedora, a tecnologia de Stream utilizada para a comunicação é transparente.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
// Event Handler OrderCreatedEventHandler()
    public override dynamic Handle(OrderCreated orderCreated)
    {
        var success = false;
        var order = orderCreated.Get<Domain.Aggregates.Order.Order>();
        var destination = Dp.Settings.Default("stream.orderevents");
        var eventName = "OrderCreated";
        var eventData = new OrderCreatedEventDTO()
        {ID = order.ID, CustomerName = order.CustomerName,
        CustomerTaxID = order.CustomerTaxID,
        Total = order.Total};
        Dp.Stream.Send(destination, eventName, eventData);
        success = true;
        return success;
    }

Devprime Pipeline integrado no repositório
Devprime Pipeline também está presente na utilização dentro do repositório no Adapter de State, que executa a interação direta com o banco de dados, conforme demonstrado neste contexto, utilizando as classes do MongoDB ou outras tecnologias suportadas nativamente na Plataforma Devprime ou por meio de Extensões.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
// State Adapter
public bool Add(Domain.Aggregates.Order.Order order)
    {
        var result = Dp.Pipeline(ExecuteResult: (stateContext) =>
        {
            var state = new ConnectionMongo(stateContext, Dp);
            var _order = ToState(order);
            state.Order.InsertOne(_order);
            return true;
        });
        if (result is null)
            return false;
        return result;
    }

Explorando os Logs automáticos
Uma característica importante em qualquer aplicação moderna é a observabilidade por meio de Logs, Trace e Metrics. Abaixo, podemos acompanhar um resumo de todos os processos orquestrados pelo Devprime Pipeline e a sua integração com o Adapter de Observability, que cuida do processo de exibição.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# Receiving a POST
[INF][Web]["HTTP"][Order][POST /v1/order]
[Origin "https://localhost:5001/swagger/index.html"]
[RID 2227febe-74b7-4e7a-83c9-ba63bf6b6448]
[TID 2227febe-74b7-4e7a-83c9-ba63bf6b6448]

# Processing in Application Services
[INF][Application][OrderService][Add]
[RID 2227febe-74b7-4e7a-83c9-ba63bf6b6448]
[TID 2227febe-74b7-4e7a-83c9-ba63bf6b6448]

# Processing in Aggregate root
[INF][Domain][Order][Add]
[RID 2227febe-74b7-4e7a-83c9-ba63bf6b6448]
[TID 2227febe-74b7-4e7a-83c9-ba63bf6b6448]

# Emitting Domain Event
[INF][Domain][Order][ProcessEvent]["CreateOrder"]
[RID 2227febe-74b7-4e7a-83c9-ba63bf6b6448]
[TID 2227febe-74b7-4e7a-83c9-ba63bf6b6448]

# Processing in Handler
[INF][Application][EventHandler]
["CreateOrderEventHandler"][Event]["CreateOrder"]
[RID 2227febe-74b7-4e7a-83c9-ba63bf6b6448]
[TID 2227febe-74b7-4e7a-83c9-ba63bf6b6448]

# # Processing in Adapter State
[INF][State][Type "MongoDB"][Alias "State1"]
[Initialize][OrderRepository][Add]
[RID 2227febe-74b7-4e7a-83c9-ba63bf6b6448]
[TID 2227febe-74b7-4e7a-83c9-ba63bf6b6448]

[INF][State][Type "MongoDB"][Alias "State1"]
[Complete][OrderRepository][Add][Duration 36.8282ms]
[RID 2227febe-74b7-4e7a-83c9-ba63bf6b6448]
[TID 2227febe-74b7-4e7a-83c9-ba63bf6b6448]

# Emitting Domain Event (OrderCreated)
[INF][Domain][Order][ProcessEvent]["OrderCreated"]
[RID 2227febe-74b7-4e7a-83c9-ba63bf6b6448]
[TID 2227febe-74b7-4e7a-83c9-ba63bf6b6448]|

# Processing in Handler (OrderCreated)
[INF][Application][EventHandler]
["OrderCreatedEventHandler"][Event]["OrderCreated"]
[RID 2227febe-74b7-4e7a-83c9-ba63bf6b6448]
[TID 2227febe-74b7-4e7a-83c9-ba63bf6b6448]

# Processing in Adapter Stream
[INF][Stream][Type "RabbitMQ"][Alias "Stream1"]
[Out][Event]["OrderCreated"]["Delivered"]["orderevents"]
["{\"Id\":\"08c38a8f-8df6-41fb-9b58-2b5c4f3da14d\",
\"CorrelationId\":\"2227febe-74b7-4e7a-83c9-ba63bf6b6448\",
\"TraceId\":\"2227febe-74b7-4e7a-83c9-ba63bf6b6448\",
\"AppId\":\"49d66a15-943b-4847-bdd0-258e3fe23927\",
\"AppName\":\"ms-order\",\"TenantID\":\"\",
\"TenantUserID\":\"\",\"Version\":1,
\"Name\":\"OrderCreated\",\"CreationDate\":
\"2023-09-05T16:37:25.0761355-03:00\",\"Payload\":
{\"ID\":\"8fc530f0-8fc6-4c89-987b-33fc1ea9861c\",
\"CustomerName\":\"Ramon\",\"CustomerTaxID\":
\"string\",\"Total\":0}}"]
[RID 2227febe-74b7-4e7a-83c9-ba63bf6b6448]
[TID 2227febe-74b7-4e7a-83c9-ba63bf6b6448]

Explore outras tecnologias relacionadas

Última modificação October 26, 2023 (795e8af6)