原文:Transactional Outbox in .NET Cloud Native Development via Aspire
作者:Oleksii Nikiforov
总览
这篇文章提供了使用 Aspire、DotNetCore.CAP、Azure Service Bus、Azure SQL、Bicep 和 azd 实现 Outbox 模式的示例。
源代码: https://github.com/NikiforovAll/cap-aspire
发件箱模式简介
发件箱模式是分布式系统领域中的一个重要组件。随着现代软件开发朝着更加分布式和解耦的架构发展,确保可靠的消息传递变得越来越重要。
在分布式系统中,不同的组件需要相互通信,通常通过异步消息传递。发件箱模式提供了一种可靠的方法来处理这些消息。它确保即使系统在执行本地事务之后但在发送消息之前发生故障,消息也不会丢失。相反,它会暂时存储在“发件箱”中,并在系统恢复时检索和发送。
通过使用发件箱模式,我们可以确保系统的所有组件以可靠的方式接收必要的消息,从而确保整个系统的完整性和一致性。
在没有发件箱模式的分布式系统中,有几种场景可能会出错,从而导致数据不一致或消息丢失。以下是几个示例:
- 事务提交和消息发送不是原子的:在通常情况下,Service 可能首先将事务提交到其数据库,然后向消息代理(Broker)发送消息。如果服务在事务提交之后但在消息发送之前崩溃,则消息将丢失。其他服务将不知道已提交到数据库的更改。
- 消息发送失败:即使服务没有崩溃,由于网络问题或消息代理问题,消息发送也可能失败。如果不重试消息发送操作,消息将丢失。
- 重复消息:如果服务在失败后重试消息发送操作,如果第一次发送实际上成功但确认丢失,则最终可能会多次发送同一条消息。如果消息使用者不是幂等的,这可能会导致重复处理。
- 顺序问题:如果单个事务发送了多条消息,并且这些发送不是原子性的,则这些消息可能会无序接收。如果消息的顺序很重要,这可能会导致不正确的处理。
发件箱模式通过确保事务提交和消息发送操作是原子的,并提供即使在出现故障时也能可靠地发送消息的机制来解决这些问题。
下面是一个序列图,说明了没有发件箱模式的系统所存在的的问题:
幂等消费者在发件箱模式中扮演着重要的角色。在分布式系统的背景下,幂等性是指系统无论执行特定操作多少次都能产生相同结果的能力。这对于确保分布式环境中的数据一致性和可靠性至关重要。
然而,这可能会导致同一条消息被多次发送,尤其是在系统在发送消息后但在发件箱中将消息标记为已发送之前发生故障的情况下。这就是幂等消费者发挥作用的地方。
幂等消费者旨在妥善处理重复消息。它们确保消除多次接收同一条消息的副作用。这通常通过跟踪所有已处理消息的 ID 来实现。当收到一条消息时,消费者会检查它是否已经处理了一条具有相同 ID 的消息。如果已经处理过,它就会忽略该消息。
下面是一个序列图,说明了发件箱模式如何解决问题:
发件箱模式的实现
现在您已经了解了发件箱模式的重要性和好处,让我们深入研究一下实现它需要什么:
发件箱模式的实现涉及以下步骤:
- 创建发件箱表:第一步是在数据库中创建发件箱表。此表将存储所有需要发送的消息。每条消息都应具有唯一的 ID 和一个指示消息是否已发送的状态字段。
- 修改应用程序代码:下一步是修改应用程序代码。每当您的应用程序需要将消息作为事务的一部分发送时,它都应将该消息作为同一事务的一部分添加到发件箱表中。
- 实现发件箱发布器:发件箱发布器是一个单独的组件,它会轮询发件箱表中未发送的消息。当它发现未发送的消息时,它会发送该消息并将发件箱表中该消息的状态更新为“已发送”。
DotNetCore.CAP 简介
幸运的是,有一个名为 DotNetCore.CAP 的 .NET 库可以为我们简化 Outbox 模式的实现。
DotNetCore.CAP 是一个开源库,它提供了一组 API,允许开发人员轻松地将消息作为数据库事务的一部分发送,将其存储在发件箱中,并确保即使在出现故障的情况下也能可靠地将它们传递给所有感兴趣的消费者。
该库还支持幂等消费者,这对于确保分布式环境中的数据一致性和可靠性至关重要。这意味着即使同一条消息被传递多次,也可以消除接收同一条消息的副作用。
通过使用DotNetCore.CAP,开发人员可以专注于其应用程序的业务逻辑,而库则负责确保分布式系统中可靠消息传递的复杂性。
例子
此代码演示了如何在 ASP.NET Core 应用程序中使用 CAP 库进行事件发布和处理。
在生产者中:
- 定义了“/send”端点的路由处理程序。
- 它启动一个事务,执行一个 SQL 命令来获取当前服务器时间,并将该时间的消息发布到“test.show.time”主题。
- 消息发布延迟500毫秒。
- 如果所有操作都成功,则事务被提交并返回响应。
// Producer/Program.cs
app.MapGet("/send", async ( SqlConnection connection, ICapPublisher capPublisher, TimeProvider timeProvider) =>
{ var startTime = timeProvider.GetTimestamp(); using var transaction = connection .BeginTransaction(capPublisher, autoCommit: false);var command = connection.CreateCommand();command.Transaction = (SqlTransaction)transaction;command.CommandText = "SELECT GETDATE()";var serverTime = await command.ExecuteScalarAsync();await capPublisher.PublishDelayAsync(TimeSpan.FromMilliseconds(500),"test.show.time",new MyMessage(serverTime?.ToString()!));transaction.Commit();return TypedResults.Ok(new{Status = "Published",Duration = timeProvider.GetElapsedTime(startTime)});
});
💡注意,BeginTransaction
扩展方法是定义在 DotNetCore.CAP.SqlServer
中的,负责发件箱表的管理。
public static IDbTransaction BeginTransaction( this IDbConnection dbConnection, ICapPublisher publisher, bool autoCommit = false)
在消费者方面:
- 定义一个实现
ICapSubscribe
的类SampleSubscriber
。 - 它有一个
HandleAsync
的方法,将CapSubscribe
指定为“test.show.time”作为主题的处理程序。 - 当收到有关此主题的消息时,它会在 300 毫秒的延迟后记录消息内容。
// Consumer/Program.cs
public class SampleSubscriber( TimeProvider timeProvider, ILogger <samplesubscriber>logger) : ICapSubscribe
{ public record MyMessage(string CreatedAt);[CapSubscribe("test.show.time")]public async Task HandleAsync(MyMessage message){await Task.Delay(TimeSpan.FromMilliseconds(300), timeProvider);logger.LogInformation("Message received: {CreatedAt}", message.CreatedAt);}
}
添加Aspire
为了完整地演示该演示,我们需要设置一些真实的基础设施组件 - 消息代理和数据库。
.NET Aspire 提供了一套精选的 NuGet 包(组件),专门用于促进云原生应用程序的集成。每个组件都通过自动配置或标准化配置模式提供必要的云原生功能。
添加消息代理:
.NET Aspire 服务总线组件处理以下问题以将您的应用连接到 Azure 服务总线。它添加 ServiceBusClient
到 DI 容器以连接到 Azure 服务总线。
dotnet add package Aspire.Azure.Messaging.ServiceBus --prerelease
添加数据库:
.NET Aspire 提供了两个内置配置选项来简化 Azure 上的 SQL Server 部署:
- 使用 Azure 容器应用预配容器化 SQL Server 数据库
- 提供一个 Azure SQL 数据库实例(我们将使用这个)
dotnet add package Aspire.Microsoft.Data.SqlClient --prerelease
以下是我们如何根据已安装的组件设置 Aspire Host:
// CapExample.AppHost/Program.cs
var builder = DistributedApplication.CreateBuilder(args);var sqlServer = builder.ExecutionContext.IsPublishMode ? builder.AddSqlServer("sqlserver").PublishAsAzureSqlDatabase().AddDatabase("sqldb") : builder.AddConnectionString("sqldb");var serviceBus = builder.ExecutionContext.IsPublishMode ? builder.AddAzureServiceBus("serviceBus") : builder.AddConnectionString("serviceBus");builder.AddProject<Projects.Consumer>("consumer") .WithReference(sqlServer) .WithReference(serviceBus);builder.AddProject<Projects.Producer>("producer") .WithReference(sqlServer) .WithReference(serviceBus);builder.Build().Run();
这个思路是在开发时使用连接字符串,并在发布时配置 Azure 资源。
Aspire 提供灵活的配置系统,让我们无需更改源代码即可在本地开发并部署到云中。我们可以使用由 Aspire Components 管理的连接字符串,这些连接字符串可以在本地开发和云部署环境之间轻松切换。这使我们能够在不同的部署场景之间无缝过渡,而无需修改源代码。
下面您可以找到如何基于 Aspire 组件进行配置 DotNetCore.CAP
:
// Consumer/Program.cs
// Producer/Program.cs
var builder = WebApplication.CreateBuilder(args);builder.AddServiceDefaults();
builder.AddAzureServiceBus("serviceBus");
builder.AddSqlServerClient("sqldb");builder.Services.AddCap(x =>
{ var dbConnectionString = builder.Configuration.GetConnectionString("sqldb")!; var serviceBusConnection = builder.Configuration.GetConnectionString("serviceBus")!;x.UseAzureServiceBus(serviceBusConnection);x.UseSqlServer(x => x.ConnectionString = dbConnectionString);
});
提供基础设施
Azure 开发人员 CLI ( azd
) 已得到增强,可支持 .NET Aspire 应用程序的部署。azd init
工作流为 .NET Aspire 项目提供定制支持。我在开发此应用程序时使用了这种方法,事实证明它非常好。它提高了生产力。
当azd
目标是 .NET Aspire 应用程序时,它会使用指定的命令启动( AppHost dotnet run --project AppHost.csproj -- --publisher manifest
),从而生成 Aspire 清单文件。
azd provision
命令逻辑会询问清单文件,以仅在内存中生成 Bicep 文件(默认情况下)。
更多信息请参阅 https://learn.microsoft.com/en-us/dotnet/aspire/deployment/azure/aca-deployment-azd-in-depth?tabs=linux#how-azure-developer-cli-integration-works 。
就我个人而言,我发现显式生成 Bicep 文件更方便。这可以通过执行以下命令来实现:
❯ azd infra synth
以下是将要配置的内容的可视化:
为了配置资源,我们需要运行下一个命令:
❯ azd provision
这里创建的资源组- rg-cap-dev
:
配置本地开发
要检索 Azure 服务总线的连接字符串:
#!/bin/bash
resourceGroup="rg-cap-dev"
namespace=$(az servicebus namespace list \--resource-group $resourceGroup \--output tsv \--query '[0].name'
)
azbConnectionString=$(az servicebus namespace authorization-rule keys list \--namespace-name "$namespace" \--name RootManageSharedAccessKey \--resource-group $resourceGroup \--output tsv \--query 'primaryConnectionString'
)dotnet user-secrets --project ./src/CapExample.AppHost \set ConnectionStrings:serviceBus $azbConnectionString
要检索 Azure SQL 数据库的连接字符串:
#!/bin/bash# read server address
if [ -f .azure/cap-dev/.env ]
thenexport $(cat .azure/cap-dev/.env | sed 's/#.*//g' | xargs)
fi# read server password
db_password=$(jq -r '.inputs.sql.password' .azure/cap-dev/config.json)dotnet user-secrets --project ./src/CapExample.AppHost set ConnectionStrings:sqldb \"Server=$SQLSERVER_SQLSERVERFQDN;Initial Catalog=sqldb;Persist Security Info=False;User ID=CloudSA;Password=$db_password;MultipleActiveResultSets=False;Encrypt=True;TrustServerCertificate=False;Connection Timeout=30;"
要检查机密是否存储成功:
❯ dotnet user-secrets --project ./src/CapExample.AppHost list# ConnectionStrings:sqldb = Server=sqlserver-gopucer6dsl5q.database.windows.net;Initial Catalog=sqldb;Persist Security Info=False;User ID=CloudSA;Password=<your-password>;MultipleActiveResultSets=False;Encrypt=True;TrustServerCertificate=False;Connection Timeout=30;
# ConnectionStrings:serviceBus = Endpoint=sb://servicebus-gopucer6dsl5q.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=<your-key>
演示
您可能知道,Aspire 具有开箱即用的 OpenTelemetry 支持,您可以在 ServiceDefaults/Extensions.cs 中对其进行配置。
DotNetCore.CAP 有一个支持 OpenTelemetry 功能的包。
OpenTelemetry 中的分布式跨度通过提供一种跨系统不同组件跟踪消息流的方法,帮助您跟踪消息代理上的操作。
当使用像 DotNetCore.CAP 这样的消息代理时,消息通常通过中间代理从生产者发送到消费者。此过程中的每个步骤都可以被视为一个跨度,它代表一个工作单元或一个操作。
通过使用 OpenTelemetry 检测代码,您可以创建捕获有关消息流每个步骤的信息的跨度。这些跨度可以包括详细信息,例如所花费的时间、遇到的任何错误以及其他上下文信息。
借助分布式跨度,您可以直观地看到消息在系统中移动的整个过程,从生产者到代理,最后到消费者。这让您能够深入了解消息处理管道的性能和行为。
通过分析分布式跨度,您可以识别消息处理流程中的瓶颈、延迟问题和潜在错误。这些信息对于排除分布式系统故障和优化其性能非常有价值。
以下是DotNetCore.CAP 安装和配置 OpenTelemetry 所需要做的事情 :
安装 NuGet 包:
❯ dotnet add ./src/CapExample.ServiceDefaults/ package DotNetCore.Cap.OpenTelemetry
调整默认配置:
builder.Services.AddOpenTelemetry().WithMetrics(metrics =>{metrics.AddAspNetCoreInstrumentation().AddHttpClientInstrumentation().AddProcessInstrumentation().AddRuntimeInstrumentation();}).WithTracing(tracing =>{if (builder.Environment.IsDevelopment()){tracing.SetSampler(new AlwaysOnSampler());}tracing.AddAspNetCoreInstrumentation().AddGrpcClientInstrumentation().AddHttpClientInstrumentation().AddCapInstrumentation(); // <-- add this code});
本地
现在,让我们运行演示:
❯ dotnet run --project ./src/CapExample.AppHost
# Building...
# Restore complete (0.6s)
# CapExample.ServiceDefaults succeeded (0.2s) → src\CapExample.ServiceDefaults\bin\Debug\net9.0\CapExample.ServiceDefaults.dll
# Consumer succeeded (0.3s) → src\Consumer\bin\Debug\net9.0\Consumer.dll
# Producer succeeded (0.5s) → src\Producer\bin\Debug\net9.0\Producer.dll
# CapExample.AppHost succeeded (0.2s) → src\CapExample.AppHost\bin\Debug\net9.0\CapExample.AppHost.dll# Build succeeded in 2.6s
# info: Aspire.Hosting.DistributedApplication[0]
# Aspire version: 9.0.0-preview.2.24162.2+eaca163f7737934020f1102a9d11fdf790cccdc0
# info: Aspire.Hosting.DistributedApplication[0]
# Distributed application starting.
# info: Aspire.Hosting.DistributedApplication[0]
# Application host directory is: C:\Users\Oleksii_Nikiforov\dev\cap-aspire\src\CapExample.AppHost
# info: Aspire.Hosting.DistributedApplication[0]
# Now listening on: http://localhost:15118
# info: Aspire.Hosting.DistributedApplication[0]
# Distributed application started. Press Ctrl+C to shut down.
让我们生成一些负载:
❯ curl -s http://localhost:5288/send | jq
# {
# "status": "Published",
# "duration": "00:00:00.5255861"
# }
导航到 Aspire Dashboard 查看跟踪:
这是第一个Request,如您所见,我们需要一些时间来与 Azure 服务总线建立初始连接:
后续请求花费的时间更少:
💡我建议您深入研究源代码和跟踪示例,以增强您对Outbox Pattern工作原理的理解。
Azure
让我们通过运行azd deploy
部署到 Azure 容器应用
在初始配置(azd init
)期间,我指定了生产者的公共地址。我们可以利用它进行开发测试:
让我们生成一些负载并查看 Azure 服务总线的指标。
❯ curl -s https://producer.mangoforest-17799c26.eastus.azurecontainerapps.io/send | jq
# {
# "status": "Published",
# "duration": "00:00:00.0128251"
# }
发件箱表
在后台 DotNetCore.CAP 创建两个表来管理发件箱。
在发件箱模式中,Published 和 Received 表用于管理需要发布或已被消息系统接收的消息。让我们仔细看看每个表的用途:
Published 表:负责存储需要发布到外部消息系统的消息。当应用程序生成需要发送的消息时,该消息将存储在已发布表中。此表充当缓冲区或队列,确保如果消息系统暂时不可用或在发布过程中出现任何故障,消息不会丢失。通过使用已发布表,应用程序可以继续生成消息,而不会受到消息系统可用性或性能的阻碍。Published 表中的消息可以由单独的组件或后台进程异步处理,这些组件或后台进程负责将它们发布到外部消息系统。一旦消息成功发布,就可以将其从Published 表中删除。
Received 表:用于跟踪应用程序从外部消息传递系统接收到的消息。当应用程序收到消息时,它会将有关该消息的必要信息存储在Received 表中。此信息可以包括消息内容、元数据和任何其他相关详细信息。Received 表允许应用程序保留已处理的消息记录,从而使其能够处理重复消息。
清理
完成开发后,您可以删除资源组 rg-cap-dev:
❯ azd down
# Deleting all resources and deployed code on Azure (azd down)
# Local application code is not deleted when running 'azd down'.
# Resource group(s) to be deleted:
# • rg-cap-dev: https://portal.azure.com/#@/resource/subscriptions/0b252e02-9c7a-4d73-8fbc-633c5d111ebc/resourceGroups/rg-cap-dev/overview
# ? Total resources to delete: 10, are you sure you want to continue? Yes
# Deleting your resources can take some time.
# (✓) Done: Deleting resource group: rg-cap-dev# SUCCESS: Your application was removed from Azure in 10 minutes 53 seconds.
总结
在本文中,我们探讨了发件箱模式,这是分布式系统领域中的关键组件。我们已经了解了它如何确保分布式系统中可靠的消息传递,从而维护整个系统的完整性和一致性。
我们还研究了 .NET 库 DotNetCore.CAP 如何简化Outbox Pattern的实现,使开发人员能够专注于其应用程序的业务逻辑,而库则负责确保可靠消息传递的复杂性。
最后,我们看到了 .NET Aspire 如何提供精选的 NuGet 包套件,以促进云原生应用程序的集成,并通过自动配置或标准化配置模式提供必要的云原生功能。
总之,Outbox Pattern、DotNetCore.CAP 和 .NET Aspire 的组合提供了一套强大的工具集,用于在 .NET 中构建可靠的云原生应用程序。通过理解和利用这些工具,开发人员可以构建强大、可扩展且可应对现代分布式世界挑战的应用程序。
参考
https://learn.microsoft.com/en-us/azure/developer/azure-developer-cli/
https://cap.dotnetcore.xyz/
https://learn.microsoft.com/en-us/dotnet/aspire/deployment/azure/aca-deployment-azd-in-deep