Pipeline

Devprime Pipeline is an execution flow orchestrator available in applications based on the Devprime platform. It isolates each process within a context and automatically provides exception handling, observability (log, trace, metrics), and transactional control.

Introduction to Devprime Pipeline Dp.Pipeline()

Devprime Pipeline takes part in several steps of the application flow, as shown in the image below: receiving events in the Driving Adapters, then the Application layer, the Domain and its Handlers, and finally the Driven Adapters. Because processes run in parallel, different executions of the pipeline may be at different stages at the same moment.

Figure: Devprime Pipeline

Devprime Pipeline integrated into the application service
The code below shows this integration through the Dp.Pipeline() block. When the application receives an HTTP POST through the Web Adapter, the request is forwarded to the application service, which converts the command into the Order Aggregate Root. The aggregate is then attached to the Devprime Pipeline with Dp.Attach(), and the flow continues in the Add method of the Order Aggregate Root.

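// Application Services
public void Add(Model.Order command)
{
    Dp.Pipeline(Execute: () =>
    {
        // Convert the incoming command into the Order Aggregate Root
        var order = command.ToDomain();
        // Attach the aggregate to the pipeline before invoking business code
        Dp.Attach(order);
        order.Add();
    });
}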
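
To make the idea of wrapping a step concrete, the following is a minimal sketch of what an execution wrapper of this kind could do: run a delegate inside a named context, log its start and completion, measure its duration, and turn an exception into a log entry. MiniPipeline, its Run method, its Log list, and the [ERR] level are hypothetical names invented for this illustration. They are not the Devprime API and say nothing about how Dp.Pipeline() is implemented internally.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;

// Hypothetical stand-in for illustration only; not the Devprime implementation.
public static class MiniPipeline
{
    // Collected log lines, so the example can be inspected without a logging framework.
    public static readonly List<string> Log = new List<string>();

    // Runs one step inside a named context: logs start and completion, measures
    // the duration, and converts an exception into a log entry and a false result.
    public static bool Run(string context, Action execute)
    {
        var watch = Stopwatch.StartNew();
        Log.Add($"[INF][{context}][Initialize]");
        try
        {
            execute();
            Log.Add($"[INF][{context}][Complete][Duration {watch.Elapsed.TotalMilliseconds}ms]");
            return true;
        }
        catch (Exception ex)
        {
            // The [ERR] level is an assumption; the article only shows [INF] entries.
            Log.Add($"[ERR][{context}][{ex.GetType().Name}][{ex.Message}]");
            return false;
        }
    }
}
```

With this sketch, a call such as MiniPipeline.Run("Application", () => { /* business code */ }) adds an Initialize entry followed by either a Complete entry or an error entry, so the calling code needs no try/catch or logging of its own.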

Devprime Pipeline integrated with Domain/Aggregate Root
Devprime Pipeline is also available inside the domain while preserving the isolation of business rules: business events raised in the domain are processed outside it. In the example below, Dp.Pipeline() is initialized and the “Items” business object is attached to the Devprime Pipeline with Dp.Attach(). Next, the fields are validated, the initialization parameters of the new Aggregate Root are set, and the CreateOrder domain event is fired with Dp.ProcessEvent<bool>(new CreateOrder()).

This event is processed by an Event Handler outside the domain, which can, for example, write to the database through the State Adapter. The handler returns a result, which in this context is typed as bool by the generic argument of Dp.ProcessEvent<bool>.

Depending on the result, a second domain event, OrderCreated, can be triggered with Dp.ProcessEvent(new OrderCreated()). It is also handled outside the domain, by a handler that uses the Stream Adapter to emit an event to RabbitMQ, Kafka, or another stream technology.

Business code within the Aggregate Root

// Business code inside Aggregate Root
public virtual void Add()
{
    Dp.Pipeline(Execute: () =>
    {
        Dp.Attach(Items);
        ValidFields();
        ID = Guid.NewGuid();
        IsNew = true;
        // Handled outside the domain; the handler's result is returned as bool
        var success = Dp.ProcessEvent<bool>(new CreateOrder());
        if (success)
        {
            // Raised only after CreateOrder succeeded
            Dp.ProcessEvent(new OrderCreated());
        }
    });
}
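
The separation between raising an event in the domain and handling it elsewhere can be sketched with a small dispatcher. The code below is an illustration under stated assumptions: MiniEventBus, Register, and the empty CreateOrder and OrderCreated classes are hypothetical stand-ins that only borrow the names used above, and the dictionary-based lookup is an assumed mechanism, not a description of how Dp.ProcessEvent is implemented.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical event types for illustration; they only borrow the article's names.
public sealed class CreateOrder { }
public sealed class OrderCreated { }

// Hypothetical dispatcher; the lookup mechanism is an assumption, not Devprime's.
public sealed class MiniEventBus
{
    private readonly Dictionary<Type, Func<object, object>> handlers =
        new Dictionary<Type, Func<object, object>>();

    // Registers the handler that lives outside the domain for one event type.
    public void Register<TEvent>(Func<TEvent, object> handler)
    {
        handlers[typeof(TEvent)] = evt => handler((TEvent)evt);
    }

    // Dispatches the event and returns the handler's result as TResult.
    // Without a handler, or with a result of another type, default(TResult) is returned.
    public TResult ProcessEvent<TResult>(object domainEvent)
    {
        if (handlers.TryGetValue(domainEvent.GetType(), out var handler)
            && handler(domainEvent) is TResult typed)
        {
            return typed;
        }
        return default(TResult);
    }
}
```

In this sketch the domain code only knows the event types and the requested result type. Which handler runs, and what it talks to, is decided by whoever calls Register, which mirrors the isolation of business rules described above.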

The example below summarizes the CreateOrderEventHandler class. In line 4 of the listing (the createOrder.Get<...>() call), the handler retrieves the Aggregate Root that originated the domain event and then forwards it to the State Adapter for persistence.

A valuable point in this approach is that, for the developer, the State Adapter is the only place that interacts with the database.

// Event Handler CreateOrderEventHandler()
public override dynamic Handle(CreateOrder createOrder)
{
    var order = createOrder.Get<Domain.Aggregates.Order.Order>();
    return Dp.State.Order.Add(order);
}

The example below summarizes the OrderCreatedEventHandler class. In line 5 of the listing (the orderCreated.Get<...>() call), the handler retrieves the Aggregate Root that originated the domain event. It then fills an envelope, OrderCreatedEventDTO, with the data to be sent in the stream event. On line 12, Dp.Stream.Send() activates the Stream Adapter to perform the send.

A valuable point in this approach is that, for the developer, the Stream technology used for communication is transparent.

// Event Handler OrderCreatedEventHandler()
public override dynamic Handle(OrderCreated orderCreated)
{
    var success = false;
    var order = orderCreated.Get<Domain.Aggregates.Order.Order>();
    var destination = Dp.Settings.Default("stream.orderevents");
    var eventName = "OrderCreated";
    var eventData = new OrderCreatedEventDTO()
    {ID = order.ID, CustomerName = order.CustomerName,
    CustomerTaxID = order.CustomerTaxID,
    Total = order.Total};
    Dp.Stream.Send(destination, eventName, eventData);
    success = true;
    return success;
}

Devprime Pipeline integrated into the repository
Devprime Pipeline is also used inside the repository in the State Adapter, which interacts directly with the database. The example below uses the MongoDB classes; other technologies are supported natively by the Devprime platform or through Extensions.

// State Adapter
public bool Add(Domain.Aggregates.Order.Order order)
{
    // The pipeline passes the state context to the delegate and returns its result
    var result = Dp.Pipeline(ExecuteResult: (stateContext) =>
    {
        var state = new ConnectionMongo(stateContext, Dp);
        var _order = ToState(order);
        state.Order.InsertOne(_order);
        return true;
    });
    // No result from the pipeline is reported as a failed insert
    if (result is null)
        return false;
    return result;
}
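
The null check in the listing above implies that the pipeline may hand back no value. This page does not say when that happens. The sketch below assumes one plausible cause, an exception thrown inside the delegate, and shows how a result-returning wrapper could map it to null so that the caller can fall back to false. MiniResultPipeline, Run, and OrFalse are hypothetical names for this illustration only and do not describe the Devprime API.

```csharp
using System;

// Hypothetical stand-in for illustration only; not the Devprime implementation.
public static class MiniResultPipeline
{
    // Passes a context object to the delegate and returns its result.
    // Assumption: a failure inside the delegate is reported as null instead of an exception.
    public static T? Run<TContext, T>(TContext context, Func<TContext, T> executeResult)
        where T : struct
    {
        try
        {
            return executeResult(context);
        }
        catch (Exception)
        {
            return null;
        }
    }

    // Mirrors the "null means failure" convention used by the repository method.
    public static bool OrFalse(bool? result)
    {
        return result ?? false;
    }
}
```

Under this assumption the repository method never has to catch database exceptions itself: it receives either the delegate's value or null and turns the latter into a plain false for its caller.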

Exploring Automatic Logs
Observability through logs, traces, and metrics is an important feature of any modern application. The log below summarizes the processes orchestrated by Devprime Pipeline for the order flow and its integration with the Observability Adapter, which is responsible for displaying them. Every entry carries the same RID and TID values, which makes it possible to follow a single request across all layers.

# Receiving a POST
[INF][Web]["HTTP"][Order][POST /v1/order]
[Origin "https://localhost:5001/swagger/index.html"]
[RID 2227febe-74b7-4e7a-83c9-ba63bf6b6448]
[TID 2227febe-74b7-4e7a-83c9-ba63bf6b6448]

# Processing in Application Services
[INF][Application][OrderService][Add]
[RID 2227febe-74b7-4e7a-83c9-ba63bf6b6448]
[TID 2227febe-74b7-4e7a-83c9-ba63bf6b6448]

# Processing in Aggregate Root
[INF][Domain][Order][Add]
[RID 2227febe-74b7-4e7a-83c9-ba63bf6b6448]
[TID 2227febe-74b7-4e7a-83c9-ba63bf6b6448]

# Emitting Domain Event
[INF][Domain][Order][ProcessEvent]["CreateOrder"]
[RID 2227febe-74b7-4e7a-83c9-ba63bf6b6448]
[TID 2227febe-74b7-4e7a-83c9-ba63bf6b6448]

# Processing in Handler
[INF][Application][EventHandler]
["CreateOrderEventHandler"][Event]["CreateOrder"]
[RID 2227febe-74b7-4e7a-83c9-ba63bf6b6448]
[TID 2227febe-74b7-4e7a-83c9-ba63bf6b6448]

# Processing in State Adapter
[INF][State][Type "MongoDB"][Alias "State1"]
[Initialize][OrderRepository][Add]
[RID 2227febe-74b7-4e7a-83c9-ba63bf6b6448]
[TID 2227febe-74b7-4e7a-83c9-ba63bf6b6448]

[INF][State][Type "MongoDB"][Alias "State1"]
[Complete][OrderRepository][Add][Duration 36.8282ms]
[RID 2227febe-74b7-4e7a-83c9-ba63bf6b6448]
[TID 2227febe-74b7-4e7a-83c9-ba63bf6b6448]

# Emitting Domain Event (OrderCreated)
[INF][Domain][Order][ProcessEvent]["OrderCreated"]
[RID 2227febe-74b7-4e7a-83c9-ba63bf6b6448]
[TID 2227febe-74b7-4e7a-83c9-ba63bf6b6448]

# Processing in Handler (OrderCreated)
[INF][Application][EventHandler]
["OrderCreatedEventHandler"][Event]["OrderCreated"]
[RID 2227febe-74b7-4e7a-83c9-ba63bf6b6448]
[TID 2227febe-74b7-4e7a-83c9-ba63bf6b6448]

# Processing in Stream Adapter
[INF][Stream][Type "RabbitMQ"][Alias "Stream1"]
[Out][Event]["OrderCreated"]["Delivered"]["orderevents"]
["{\"Id\":\"08c38a8f-8df6-41fb-9b58-2b5c4f3da14d\",
\"CorrelationId\":\"2227febe-74b7-4e7a-83c9-ba63bf6b6448\",
\"TraceId\":\"2227febe-74b7-4e7a-83c9-ba63bf6b6448\",
\"AppId\":\"49d66a15-943b-4847-bdd0-258e3fe23927\",
\"AppName\":\"ms-order\",\"TenantID\":\"\",
\"TenantUserID\":\"\",\"Version\":1,
\"Name\":\"OrderCreated\",\"CreationDate\":
\"2023-09-05T16:37:25.0761355-03:00\",\"Payload\":
{\"ID\":\"8fc530f0-8fc6-4c89-987b-33fc1ea9861c\",
\"CustomerName\":\"Ramon\",\"CustomerTaxID\":
\"string\",\"Total\":0}}"]
[RID 2227febe-74b7-4e7a-83c9-ba63bf6b6448]
[TID 2227febe-74b7-4e7a-83c9-ba63bf6b6448]
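
Because every entry above ends with the same [RID ...] and [TID ...] fields, entries that belong to one request can be grouped by extracting those values. The following is a small sketch of that extraction using .NET regular expressions. LogFields and ExtractIds are hypothetical helper names, and the pattern is derived only from the sample log on this page, so it is an assumption that other entries follow the same bracketed layout.

```csharp
using System;
using System.Collections.Generic;
using System.Text.RegularExpressions;

// Hypothetical helper for illustration; not part of the Devprime platform.
public static class LogFields
{
    // Matches bracketed fields such as [RID 2227febe-...] or [TID 2227febe-...],
    // where the value is a 36-character GUID in hyphenated form.
    private static readonly Regex IdField =
        new Regex(@"\[(RID|TID) ([0-9a-fA-F-]{36})\]", RegexOptions.Compiled);

    // Returns the RID and TID values found in one log entry, keyed by field name.
    public static Dictionary<string, string> ExtractIds(string entry)
    {
        var ids = new Dictionary<string, string>();
        foreach (Match match in IdField.Matches(entry))
        {
            ids[match.Groups[1].Value] = match.Groups[2].Value;
        }
        return ids;
    }
}
```

Grouping entries by the extracted RID would collect all the steps shown above, from the HTTP POST to the stream delivery, under one key. In the sample, the CorrelationId and TraceId inside the emitted event payload carry the same value as well.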

Last modified April 16, 2024 (2b35fcc8)