Pipeline
O Devprime Pipeline é um orquestrador do fluxo de execução disponível em aplicações baseadas na plataforma Devprime. Ele isola cada processo dentro de um contexto e oferece automaticamente tratamento de exceções, observabilidade (log, trace, metrics) e controle transacional.
Introdução ao Devprime Pipeline Dp.Pipeline()
A tecnologia do Devprime Pipeline é usada em várias etapas do fluxo da aplicação, conforme demonstrado na imagem abaixo desde a recepção de eventos nos Driving Adapters, na camada de Application, no Domain e nos Handlers e depois nos Driven Adapters. É importante observar que, devido à execução paralela de processos, o fluxo do Devprime Pipeline pode estar em estágios diferentes em momentos distintos.
Devprime Pipeline integrado ao serviço de aplicação
A integração do Devprime Pipeline está representada no código abaixo por meio do bloco Dp.Pipeline()
. No momento que a aplicação recebe um POST HTTP é pelo Adapter Web, ele é encaminhado para o serviço de aplicação, que preenche o Aggregate Root order
. Em seguida, ele é anexado ao Devprime Pipeline usando o comando Dp.Attach()
, e o fluxo é direcionado para o método Add do Aggregate Root Order
.
1
2
3
4
5
6
7
8
9
10
|
// Appplication Services
public void Add(Model.Order command)
{
Dp.Pipeline(Execute: () =>
{
var order = command.ToDomain();
Dp.Attach(order);
order.Add();
});
}
|
Devprime Pipeline integrado ao Domain / Aggregate Root
O Devprime Pipeline oferece recursos incríveis no domínio, preservando o isolamento das regras de negócio. Isso permite que eventos de negócios sejam executados externamente, conforme demonstrado no exemplo abaixo onde o Dp.Pipeline()
é inicializado e o objeto de negócio “Items” é anexado ao Devprime Pipeline usando o método Dp.Attach()
. Em seguida, os parâmetros de inicialização do novo Aggregate Root são configurados e um evento de domínio chamado CreateOrder()
é disparado com o método Dp.ProcessEvent<bool>(new CreateOrder())
.
Este evento será processado em um Event Handler externo ao domínio, que pode contém uma implementação para gravar no banco de dados usando o Adapter de State. Em seguida, ele retorna um resultado, que neste contexto é automaticamente tipado como um “bool”.
Conforme o resultado, um novo evento de domínio OrderCreated()
pode ser disparado usando o método Dp.ProcessEvent(new OrderCreated())
, que será tratado por um handler externo ao domínio para comunicação com o Adapter de Stream e emissão de um evento no RabbitMQ/Kafka e outros"
Código de negócio dentro do Aggregate Root
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
// Business code inside Aggregate Root
public virtual void Add()
{
Dp.Pipeline(Execute: () =>
{
Dp.Attach(Items);
ValidFields();
ID = Guid.NewGuid();
IsNew = true;
var success = Dp.ProcessEvent<bool>(new CreateOrder());
if (success)
{
Dp.ProcessEvent(new OrderCreated());
}
});
}
|
No exemplo abaixo, apresentamos um resumo da classe CreateOrderEventHandler()
, onde, na linha 4, dentro do Handler, é possível recuperar o Aggregate Root originador desse evento de domínio e, em seguida, encaminhar o fluxo de persistência para o Adapter de State.
Um ponto valioso nessa abordagem é que, para a pessoa desenvolvedora, o Adapter de State é o único local que se relaciona com o banco de dados.
1
2
3
4
5
6
|
// Event Handler CreateOrderEventHandler()
public override dynamic Handle(CreateOrder createOrder)
{
var order = createOrder.Get<Domain.Aggregates.Order.Order>();
return Dp.State.Order.Add(order);
}
|
No exemplo abaixo apresentamos um resumo da classe OrderCreatedEventHandler()
onde, na linha 5, dentro do Handler, é possível recuperar o Aggregate Root originador desse evento de domínio. Em seguida, preenche-se um envelope chamado OrderCreatedEventDTO()
com os dados que serão enviados no evento pelo Stream. Na linha 12, aciona-se o Adapter de Stream para realizar o envio.
Um ponto valioso nessa abordagem é que, para a pessoa desenvolvedora, a tecnologia de Stream utilizada para a comunicação é transparente.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
// Event Handler OrderCreatedEventHandler()
public override dynamic Handle(OrderCreated orderCreated)
{
var success = false;
var order = orderCreated.Get<Domain.Aggregates.Order.Order>();
var destination = Dp.Settings.Default("stream.orderevents");
var eventName = "OrderCreated";
var eventData = new OrderCreatedEventDTO()
{ID = order.ID, CustomerName = order.CustomerName,
CustomerTaxID = order.CustomerTaxID,
Total = order.Total};
Dp.Stream.Send(destination, eventName, eventData);
success = true;
return success;
}
|
Devprime Pipeline integrado no repositório
Devprime Pipeline também está presente na utilização dentro do repositório no Adapter de State, que executa a interação direta com o banco de dados, conforme demonstrado neste contexto, utilizando as classes do MongoDB ou outras tecnologias suportadas nativamente na Plataforma Devprime ou por meio de Extensões.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
// State Adapter
public bool Add(Domain.Aggregates.Order.Order order)
{
var result = Dp.Pipeline(ExecuteResult: (stateContext) =>
{
var state = new ConnectionMongo(stateContext, Dp);
var _order = ToState(order);
state.Order.InsertOne(_order);
return true;
});
if (result is null)
return false;
return result;
}
|
Explorando os Logs automáticos
Uma característica importante em qualquer aplicação moderna é a observabilidade por meio de Logs, Trace e Metrics. Abaixo, podemos acompanhar um resumo de todos os processos orquestrados pelo Devprime Pipeline e a sua integração com o Adapter de Observability, que cuida do processo de exibição.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
|
# Receiving a POST
[INF][Web]["HTTP"][Order][POST /v1/order]
[Origin "https://localhost:5001/swagger/index.html"]
[RID 2227febe-74b7-4e7a-83c9-ba63bf6b6448]
[TID 2227febe-74b7-4e7a-83c9-ba63bf6b6448]
# Processing in Application Services
[INF][Application][OrderService][Add]
[RID 2227febe-74b7-4e7a-83c9-ba63bf6b6448]
[TID 2227febe-74b7-4e7a-83c9-ba63bf6b6448]
# Processing in Aggregate root
[INF][Domain][Order][Add]
[RID 2227febe-74b7-4e7a-83c9-ba63bf6b6448]
[TID 2227febe-74b7-4e7a-83c9-ba63bf6b6448]
# Emitting Domain Event
[INF][Domain][Order][ProcessEvent]["CreateOrder"]
[RID 2227febe-74b7-4e7a-83c9-ba63bf6b6448]
[TID 2227febe-74b7-4e7a-83c9-ba63bf6b6448]
# Processing in Handler
[INF][Application][EventHandler]
["CreateOrderEventHandler"][Event]["CreateOrder"]
[RID 2227febe-74b7-4e7a-83c9-ba63bf6b6448]
[TID 2227febe-74b7-4e7a-83c9-ba63bf6b6448]
# # Processing in Adapter State
[INF][State][Type "MongoDB"][Alias "State1"]
[Initialize][OrderRepository][Add]
[RID 2227febe-74b7-4e7a-83c9-ba63bf6b6448]
[TID 2227febe-74b7-4e7a-83c9-ba63bf6b6448]
[INF][State][Type "MongoDB"][Alias "State1"]
[Complete][OrderRepository][Add][Duration 36.8282ms]
[RID 2227febe-74b7-4e7a-83c9-ba63bf6b6448]
[TID 2227febe-74b7-4e7a-83c9-ba63bf6b6448]
# Emitting Domain Event (OrderCreated)
[INF][Domain][Order][ProcessEvent]["OrderCreated"]
[RID 2227febe-74b7-4e7a-83c9-ba63bf6b6448]
[TID 2227febe-74b7-4e7a-83c9-ba63bf6b6448]|
# Processing in Handler (OrderCreated)
[INF][Application][EventHandler]
["OrderCreatedEventHandler"][Event]["OrderCreated"]
[RID 2227febe-74b7-4e7a-83c9-ba63bf6b6448]
[TID 2227febe-74b7-4e7a-83c9-ba63bf6b6448]
# Processing in Adapter Stream
[INF][Stream][Type "RabbitMQ"][Alias "Stream1"]
[Out][Event]["OrderCreated"]["Delivered"]["orderevents"]
["{\"Id\":\"08c38a8f-8df6-41fb-9b58-2b5c4f3da14d\",
\"CorrelationId\":\"2227febe-74b7-4e7a-83c9-ba63bf6b6448\",
\"TraceId\":\"2227febe-74b7-4e7a-83c9-ba63bf6b6448\",
\"AppId\":\"49d66a15-943b-4847-bdd0-258e3fe23927\",
\"AppName\":\"ms-order\",\"TenantID\":\"\",
\"TenantUserID\":\"\",\"Version\":1,
\"Name\":\"OrderCreated\",\"CreationDate\":
\"2023-09-05T16:37:25.0761355-03:00\",\"Payload\":
{\"ID\":\"8fc530f0-8fc6-4c89-987b-33fc1ea9861c\",
\"CustomerName\":\"Ramon\",\"CustomerTaxID\":
\"string\",\"Total\":0}}"]
[RID 2227febe-74b7-4e7a-83c9-ba63bf6b6448]
[TID 2227febe-74b7-4e7a-83c9-ba63bf6b6448]
|
Explore outras tecnologias relacionadas
Última modificação October 26, 2023 (795e8af6)