Tubería

Devprime Pipeline es un orquestador de flujo de ejecución disponible en aplicaciones basadas en la plataforma Devprime. Aísla cada proceso dentro de un contexto y proporciona automáticamente control de excepciones, observabilidad (registro, seguimiento, métricas) y control transaccional.

Introducción a Devprime Pipeline Dp.Pipeline()

La tecnología Devprime Pipeline se utiliza en varios pasos del flujo de la aplicación, como se muestra en la imagen siguiente, desde la recepción de eventos en los adaptadores de control, la capa de aplicación, el dominio y los controladores y, a continuación, los adaptadores controlados. Es importante tener en cuenta que, debido a la ejecución paralela de los procesos, el flujo de Devprime Pipeline puede estar en diferentes etapas en diferentes momentos.

Canalización Devprime

Devprime Pipeline integrado en el servicio de aplicaciones
La integración de Devprime Pipeline se representa en el siguiente código a través del bloque Dp.Pipeline(). En el momento en que la aplicación recibe un HTTP POST a través del adaptador web, se reenvía al servicio de aplicaciones, que rellena la raíz agregada order. A continuación, se adjunta a la canalización de Devprime mediante el comando Dp.Attach() y el flujo se dirige al método Add de la raíz agregada Order.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
// Appplication Services
public void Add(Model.Order command)
    {
        Dp.Pipeline(Execute: () =>
        {
            var order = command.ToDomain();
            Dp.Attach(order);
            order.Add();
        });
    }

Devprime Pipeline integrado con Domain/Aggregate Root
Devprime Pipeline ofrece capacidades asombrosas en el dominio al tiempo que preserva el aislamiento de las reglas de negocio. Esto permite que los eventos de negocio se ejecuten externamente, como se demuestra en el ejemplo siguiente, donde Dp.Pipeline() se inicializa y el objeto de negocio “Items” se adjunta a la canalización de Devprime mediante el método Dp.Attach(). A continuación, se configuran los parámetros de inicialización de la nueva raíz agregada y se activa un evento de dominio denominado CreateOrder() con el método Dp.ProcessEvent<bool>(new CreateOrder()).

Este evento se procesará en un controlador de eventos externo al dominio, que puede contener una implementación para escribir en la base de datos mediante el adaptador de estado. A continuación, devuelve un resultado, que en este contexto se escribe automáticamente como “bool”.

Dependiendo del resultado, se puede desencadenar un nuevo evento de dominio OrderCreated() utilizando el método Dp.ProcessEvent(new OrderCreated()), que será manejado por un controlador externo al dominio para comunicarse con el adaptador de flujo y emitir un evento en RabbitMQ/Kafka y otros"

Código de negocio dentro de la raíz agregada

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
// Business code inside Aggregate Root
public virtual void Add()
    {
        Dp.Pipeline(Execute: () =>
        {
            Dp.Attach(Items);
            ValidFields();
            ID = Guid.NewGuid();
            IsNew = true;
            var success = Dp.ProcessEvent<bool>(new CreateOrder());
            if (success)
            {
                Dp.ProcessEvent(new OrderCreated());
            }
        });
    }

En el ejemplo siguiente, presentamos un resumen de la clase CreateOrderEventHandler(), donde, en la línea 4, dentro del controlador, puede recuperar el originador raíz agregada de este evento de dominio y, a continuación, reenviar la secuencia de persistencia al adaptador de estado.

Un punto valioso de este enfoque es que, para el desarrollador, el adaptador de estado es el único lugar que se relaciona con la base de datos.

1
2
3
4
5
6
// Event Handler CreateOrderEventHandler()
    public override dynamic Handle(CreateOrder createOrder)
    {
        var order = createOrder.Get<Domain.Aggregates.Order.Order>();
        return Dp.State.Order.Add(order);
    }

En el siguiente ejemplo presentamos un resumen de la clase OrderCreatedEventHandler() donde, en la línea 5, dentro del Handler, es posible recuperar el Aggregate Root que origina este evento de dominio. A continuación, rellena un sobre llamado OrderCreatedEventDTO() con los datos que se enviarán en el evento por Stream. En la línea 12, el adaptador de flujo se activa para realizar el envío.

Un punto valioso de este enfoque es que, para el desarrollador, la tecnología Stream utilizada para la comunicación es transparente.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
// Event Handler OrderCreatedEventHandler()
    public override dynamic Handle(OrderCreated orderCreated)
    {
        var success = false;
        var order = orderCreated.Get<Domain.Aggregates.Order.Order>();
        var destination = Dp.Settings.Default("stream.orderevents");
        var eventName = "OrderCreated";
        var eventData = new OrderCreatedEventDTO()
        {ID = order.ID, CustomerName = order.CustomerName,
        CustomerTaxID = order.CustomerTaxID,
        Total = order.Total};
        Dp.Stream.Send(destination, eventName, eventData);
        success = true;
        return success;
    }

Devprime Pipeline integrado en el repositorio
Devprime Pipeline también está presente en el uso dentro del repositorio en el Adaptador de Estado, que realiza una interacción directa con la base de datos, como se demuestra en este contexto, utilizando las clases MongoDB u otras tecnologías soportadas de forma nativa en la Plataforma Devprime o a través de Extensiones.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
// State Adapter
public bool Add(Domain.Aggregates.Order.Order order)
    {
        var result = Dp.Pipeline(ExecuteResult: (stateContext) =>
        {
            var state = new ConnectionMongo(stateContext, Dp);
            var _order = ToState(order);
            state.Order.InsertOne(_order);
            return true;
        });
        if (result is null)
            return false;
        return result;
    }

Explorando registros automáticos
Una característica importante en cualquier aplicación moderna es la observabilidad a través de registros, rastreo y métricas. A continuación, podemos ver un resumen de todos los procesos orquestados por Devprime Pipeline y su integración con el Observability Adapter, que se encarga del proceso de visualización.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# Receiving a POST
[INF][Web]["HTTP"][Order][POST /v1/order]
[Origin "https://localhost:5001/swagger/index.html"]
[RID 2227febe-74b7-4e7a-83c9-ba63bf6b6448]
[TID 2227febe-74b7-4e7a-83c9-ba63bf6b6448]

# Processing in Application Services
[INF][Application][OrderService][Add]
[RID 2227febe-74b7-4e7a-83c9-ba63bf6b6448]
[TID 2227febe-74b7-4e7a-83c9-ba63bf6b6448]

# Processing in Aggregate root
[INF][Domain][Order][Add]
[RID 2227febe-74b7-4e7a-83c9-ba63bf6b6448]
[TID 2227febe-74b7-4e7a-83c9-ba63bf6b6448]

# Emitting Domain Event
[INF][Domain][Order][ProcessEvent]["CreateOrder"]
[RID 2227febe-74b7-4e7a-83c9-ba63bf6b6448]
[TID 2227febe-74b7-4e7a-83c9-ba63bf6b6448]

# Processing in Handler
[INF][Application][EventHandler]
["CreateOrderEventHandler"][Event]["CreateOrder"]
[RID 2227febe-74b7-4e7a-83c9-ba63bf6b6448]
[TID 2227febe-74b7-4e7a-83c9-ba63bf6b6448]

# # Processing in Adapter State
[INF][State][Type "MongoDB"][Alias "State1"]
[Initialize][OrderRepository][Add]
[RID 2227febe-74b7-4e7a-83c9-ba63bf6b6448]
[TID 2227febe-74b7-4e7a-83c9-ba63bf6b6448]

[INF][State][Type "MongoDB"][Alias "State1"]
[Complete][OrderRepository][Add][Duration 36.8282ms]
[RID 2227febe-74b7-4e7a-83c9-ba63bf6b6448]
[TID 2227febe-74b7-4e7a-83c9-ba63bf6b6448]

# Emitting Domain Event (OrderCreated)
[INF][Domain][Order][ProcessEvent]["OrderCreated"]
[RID 2227febe-74b7-4e7a-83c9-ba63bf6b6448]
[TID 2227febe-74b7-4e7a-83c9-ba63bf6b6448]|

# Processing in Handler (OrderCreated)
[INF][Application][EventHandler]
["OrderCreatedEventHandler"][Event]["OrderCreated"]
[RID 2227febe-74b7-4e7a-83c9-ba63bf6b6448]
[TID 2227febe-74b7-4e7a-83c9-ba63bf6b6448]

# Processing in Adapter Stream
[INF][Stream][Type "RabbitMQ"][Alias "Stream1"]
[Out][Event]["OrderCreated"]["Delivered"]["orderevents"]
["{\"Id\":\"08c38a8f-8df6-41fb-9b58-2b5c4f3da14d\",
\"CorrelationId\":\"2227febe-74b7-4e7a-83c9-ba63bf6b6448\",
\"TraceId\":\"2227febe-74b7-4e7a-83c9-ba63bf6b6448\",
\"AppId\":\"49d66a15-943b-4847-bdd0-258e3fe23927\",
\"AppName\":\"ms-order\",\"TenantID\":\"\",
\"TenantUserID\":\"\",\"Version\":1,
\"Name\":\"OrderCreated\",\"CreationDate\":
\"2023-09-05T16:37:25.0761355-03:00\",\"Payload\":
{\"ID\":\"8fc530f0-8fc6-4c89-987b-33fc1ea9861c\",
\"CustomerName\":\"Ramon\",\"CustomerTaxID\":
\"string\",\"Total\":0}}"]
[RID 2227febe-74b7-4e7a-83c9-ba63bf6b6448]
[TID 2227febe-74b7-4e7a-83c9-ba63bf6b6448]

Explora otras tecnologías relacionadas

Última modificación April 16, 2024 (2b35fcc8)