MASA Framework 事件总线 – 跨进程事件总线

MASA Framework 事件总线 – 跨进程事件总线

概述

跨进程事情总线允许发布和订阅跨服务传输的音讯, 服务的发布与订阅不在同一个进程中

在Masa Framework中, 跨进程总线事情供给了一个能够被开箱即用的程序

  • IntegrationEvents: 供给了发件箱形式
    • IntegrationEvents.Dapr: 凭借Dapr完成了音讯的发布
    • EventLogs.EFCore: 根据EFCore完成的集成事情日志的供给者, 供给音讯的记载与状态更新、失利日志重试、删去过期的日志记载等

入门

跨进程事情与Dapr并不是强绑定的, Masa Framework运用了Dapr供给的pub/sub的才能, 假如你不想运用它, 你也能够替换为其它完成, 但现在Masa Framwork中仅供给了Dapr的完成

  • 装置 .NET 6.0
  • 装置 Dapr
  1. 新建ASP.NET Core 空项目Assignment.IntegrationEventBus,并装置Masa.Contrib.Dispatcher.IntegrationEvents.DaprMasa.Contrib.Dispatcher.IntegrationEvents.EventLogs.EFCoreMasa.Contrib.Data.EFCore.SqliteMasa.Contrib.Data.UoW.EFCoreMasa.Contrib.Development.DaprStarter.AspNetCoreMicrosoft.EntityFrameworkCore.Design
dotnet new web -o Assignment.IntegrationEventBus
cd Assignment.IntegrationEventBus
dotnet add package Masa.Contrib.Dispatcher.IntegrationEvents.Dapr --version 0.7.0-preview.8 // 运用dapr供给的pubsub才能
dotnet add package Masa.Contrib.Dispatcher.IntegrationEvents.EventLogs.EFCore --version 0.7.0-preview.8 //本地音讯表
dotnet add package Masa.Contrib.Data.EFCore.Sqlite --version 0.7.0-preview.8 //运用EfCore.Sqlite
dotnet add package Masa.Contrib.Data.UoW.EFCore --version 0.7.0-preview.8 //运用工作单元
dotnet add package Masa.Contrib.Development.DaprStarter.AspNetCore --version 0.7.0-preview.8 //开发环境运用DaprStarter帮忙办理Dapr Sidecar
dotnet add package Microsoft.EntityFrameworkCore.Design --version 6.0.6 //方便后续经过CodeFirst迁移数据库
  1. 新建用户上下文UserDbContext,并继承MasaDbContext
public class UserDbContext : MasaDbContext
{
    public UserDbContext(MasaDbContextOptions<UserDbContext> options) : base(options)
    {
    }
}
  1. 注册DaprStarter, 帮忙办理Dapr Sidecar, 修正Program.cs
if (builder.Environment.IsDevelopment())
{
    builder.Services.AddDaprStarter();
}

经过Dapr发布集成事情需求运转Dapr, 线上环境可经过Kubernetes来运转, 开发环境可凭借Dapr Starter运转Dapr, 因而仅需求在开发环境运用它

  1. 注册跨进程事情总线,修正类Program
builder.Services.AddIntegrationEventBus(option =>
{
    option.UseDapr()
        .UseEventLog<UserDbContext>()
        .UseUoW<UserDbContext>(optionBuilder => optionBuilder.UseSqlite($"Data Source=./Db/{Guid.NewGuid():N}.db;"));
});
var app = builder.Build();
#region dapr 订阅集成事情运用
app.UseRouting();
app.UseCloudEvents();
app.UseEndpoints(endpoints =>
{
    endpoints.MapSubscribeHandler();
});
#endregion
  1. 新增用户注册事情的集成事情 RegisterUserEvent
public record RegisterUserEvent : IntegrationEvent
{
    public override string Topic { get; set; } = nameof(RegisterUserEvent);
    public string Account { get; set; }
    public string Mobile { get; set; }
}
  1. 打开Assignment.IntegrationEventBus地点文件夹,打开cmd或Powershell履行
dotnet ef migrations add init //创建迁移
dotnet ef database update //更新数据库
  1. 发送跨进程事情,修正Program
app.MapPost("/register", async (IIntegrationEventBus eventBus) =>
{
    //todo: 模仿注册用户并发布注册用户事情
    await eventBus.PublishAsync(new RegisterUserEvent()
    {
        Account = "Tom",
        Mobile = "19999999999"
    });
});
  1. 订阅事情,修正Program
app.MapPost("/IntegrationEvent/RegisterUser", [Topic("pubsub", nameof(RegisterUserEvent))](RegisterUserEvent @event) =>
{
    Console.WriteLine($"注册用户成功: {@event.Account}");
});

订阅事情暂时未笼统,现在运用的是Dapr原生的订阅方法,后续咱们会支撑Bind,到时不会因为替换pubsub的完成而导致订阅方法的改动

虽然跨进程事情现在仅支撑了Dapr,但这不代表你与RabbitMqKafka等无缘,发布/订阅是Dapr笼统出的才能,完成发布订阅的组件有很多种,RabbitMqKafka是其间一种完成,假如你想深化了解他们之间的关系,能够参考:

  1. 手把手教你学Dapr
  2. PubSub署理

源码解读

首先咱们先要知道的基础知识点:

  • IIntegrationEvent: 集成事情接口, 继承 IEvent (本地事情接口)、ITopic (订阅接口, 发布订阅的主题)、ITransaction (业务接口)
  • IIntegrationEventBus: 集成事情总线接口、用于供给发送集成事情的功用
  • IIntegrationEventLogService: 集成事情日志服务的接口 (供给保存本地日志、修正状态为进行中、成功、失利、删去过期日志、获取等待重试日志列表的功用)
  • IntegrationEventLog: 集成事情日志, 供给本地音讯表的模型
  • IHasConcurrencyStamp: 并发标记接口 (完成此接口的类会主动为RowVersion赋值)

MASA Framework 事件总线 - 跨进程事件总线

Masa.Contrib.Dispatcher.IntegrationEvents

供给了集成事情接口的完成类, 并支撑了发件箱形式, 其间:

  • IPublisher: 集成事情的发送者
  • IProcessingServer: 后台服务接口
  • IProcessor: 处理程序接口 (后台处理程序中会获取一切的程序程序)
    • DeleteLocalQueueExpiresProcessor: 删去过期程序 (从本地行列删去)
    • DeletePublishedExpireEventProcessor: 删去已过期的发布成功的本地音讯程序 (从Db删去)
    • RetryByLocalQueueProcessor: 重试本地音讯记载 (从本地行列中获取, 条件: 发送状态为失利或进行中且重试次数小于最大重试次数且重试距离大于最小重试距离)
    • RetryByDataProcessor: 重试本地音讯记载 (从Db获取, 条件: 发送状态为失利或进行中且重试次数小于最大重试次数且重试距离大于最小重试距离, 且不在本地重试行列中)
  • IntegrationEventBus: IIntegrationEvent的完成

Masa.Contrib.Dispatcher.IntegrationEvents中仅供给了发件箱的功用, 但集成事情的发布是由 IPublisher的完成类来供给, 由Db获取本地音讯表的功用是由IIntegrationEventLogService的完成类来供给, 它们分别属于Masa.Contrib.Dispatcher.IntegrationEvents.DaprMasa.Contrib.Dispatcher.IntegrationEvents.EventLogs.EFCore的功用, 这也是为什么运用集成事情需求引证包

  • Masa.Contrib.Dispatcher.IntegrationEvents
  • Masa.Contrib.Dispatcher.IntegrationEvents.Dapr
  • Masa.Contrib.Dispatcher.IntegrationEvents.EventLogs.EFCore

怎么快速接入其它完成

那会有小伙伴问了, 我现在没有运用Dapr, 未来一段时间暂时也还不期望接入Dapr, 我想自己接入, 以完成集成事情的发布能够吗?

当然是能够的, 假如你期望自行完成集成事情, 那么这个时候你会遇到两种情况

接入方支撑发件箱形式

以社区用的较多的库CAP为例, 因为它本身现已完成了发件箱形式, 咱们不需求再处理本地音讯表, 也无需考虑本地音讯记载的办理, 那咱们能够这样做

  1. 新建类库Masa.Contrib.Dispatcher.IntegrationEvents.Cap, 增加Masa.BuildingBlocks.Dispatcher.IntegrationEvents的引证, 并装置DotNetCore.CAP
dotnet add package DotNetCore.CAP
  1. 新增类IntegrationEventBus, 并完成IIntegrationEventBus
public class IntegrationEventBus : IIntegrationEventBus
{
    private readonly ICapPublisher _publisher;
    private readonly ICapTransaction _capTransaction;
    private readonly IUnitOfWork? _unitOfWork;
    public IntegrationEventBus(ICapPublisher publisher, ICapTransaction capTransaction, IUnitOfWork? unitOfWork = null)
    {
        _publisher = publisher;
        _capTransaction = capTransaction;
        _unitOfWork = unitOfWork;
    }
    public Task PublishAsync<TEvent>(TEvent @event) where TEvent : IEvent
    {
        // 假如运用业务
        // _publisher.Transaction.Value.DbTransaction = unitOfWork.Transaction;
        // _publisher.Publish(@event.Topic, @event);
        throw new NotImplementedException();
    }
    public IEnumerable<Type> GetAllEventTypes()
    {
        throw new NotImplementedException();
    }
    public Task CommitAsync(CancellationToken cancellationToken = default)
    {
        throw new NotImplementedException();
    }
}

CAP已支撑本地业务, 运用当前IUnitOfWork供给的业务, 确保数据的原子性

  1. 新建类ServiceCollectionExtensions, 将自界说Publisher注册到服务调集
public static class ServiceCollectionExtensions
{
    public static DispatcherOptions UseRabbitMq(this IServiceCollection services)
    {
         //todo: 注册RabbitMq信息
         services.TryAddScoped<IIntegrationEventBus, IntegrationEventBus>();
         return dispatcherOptions;
    }
}

现已完成发件箱形式的能够直接运用, 而不需求引证

  • Masa.Contrib.Dispatcher.IntegrationEvents
  • Masa.Contrib.Dispatcher.IntegrationEvents.Dapr
  • Masa.Contrib.Dispatcher.IntegrationEvents.EventLogs.EFCore

以上未经过实际验证, 感兴趣的能够测验下, 欢迎随时提pr

接入方不支撑发件箱形式

我期望直接接入RabbitMq, 但我自己没有做发件箱形式, 那我能够怎么做呢?

因为Masa.Contrib.Dispatcher.IntegrationEvents已供给发件箱形式, 假如只是期望替换一个发布事情的完成者, 那咱们仅需求完成IPublisher即可

  1. 新建类库Masa.Contrib.Dispatcher.IntegrationEvents.RabbitMq, 增加Masa.Contrib.Dispatcher.IntegrationEvents项目引证, 并装置RabbitMQ.Client
dotnet add package RabbitMQ.Client //运用RabbitMq
  1. 新增类Publisher,并完成IPublisher
public class Publisher : IPublisher
{
    public async Task PublishAsync<T>(string topicName, T @event, CancellationToken stoppingToken = default) where T : IIntegrationEvent
    {
        //todo: 经过 RabbitMQ.Client 发送音讯到RabbitMq
        throw new NotImplementedException();
    }
}
  1. 新建类DispatcherOptionsExtensions, 将自界说Publisher注册到服务调集
public static class DispatcherOptionsExtensions
{
    public static DispatcherOptions UseRabbitMq(this Masa.Contrib.Dispatcher.IntegrationEvents.Options.DispatcherOptions options)
    {
         //todo: 注册RabbitMq信息
         dispatcherOptions.Services.TryAddSingleton<IPublisher, Publisher>();
         return dispatcherOptions;
    }
}
  1. 怎么运用自界说完成RabbitMq
builder.Services.AddIntegrationEventBus(option =>
{
    option.UseRabbitMq();//修正为运用RabbitMq
    option.UseUoW<UserDbContext>(optionBuilder => optionBuilder.UseSqlite($"Data Source=./Db/{Guid.NewGuid():N}.db;"));
    option.UseEventLog<UserDbContext>();
});

本章源码

Assignment12

github.com/zhenlei520/…

开源地址

MASA.Framework:github.com/masastack/M…

假如你对咱们的 MASA Framework 感兴趣,无论是代码奉献、运用、提 Issue,欢迎联系咱们

  • WeChat:MasaStackTechOps
  • QQ:7424099