事务发件箱模式在 .NET 云原生开发中的应用(基于Aspire)

原文:Transactional Outbox in .NET Cloud Native Development via Aspire
作者:Oleksii Nikiforov

总览

这篇文章提供了使用 AspireDotNetCore.CAPAzure Service BusAzure SQLBicepazd 实现 Outbox 模式的示例。

源代码: https://github.com/NikiforovAll/cap-aspire

发件箱模式简介

发件箱模式是分布式系统领域中的一个重要组件。随着现代软件开发朝着更加分布式和解耦的架构发展,确保可靠的消息传递变得越来越重要。

在分布式系统中,不同的组件需要相互通信,通常通过异步消息传递。发件箱模式提供了一种可靠的方法来处理这些消息。它确保即使系统在执行本地事务之后但在发送消息之前发生故障,消息也不会丢失。相反,它会暂时存储在“发件箱”中,并在系统恢复时检索和发送。

通过使用发件箱模式,我们可以确保系统的所有组件以可靠的方式接收必要的消息,从而确保整个系统的完整性和一致性。

在没有发件箱模式的分布式系统中,有几种场景可能会出错,从而导致数据不一致或消息丢失。以下是几个示例:

  1. 事务提交和消息发送不是原子的:在通常情况下,Service 可能首先将事务提交到其数据库,然后向消息代理(Broker)发送消息。如果服务在事务提交之后但在消息发送之前崩溃,则消息将丢失。其他服务将不知道已提交到数据库的更改。
  2. 消息发送失败:即使服务没有崩溃,由于网络问题或消息代理问题,消息发送也可能失败。如果不重试消息发送操作,消息将丢失。
  3. 重复消息:如果服务在失败后重试消息发送操作,如果第一次发送实际上成功但确认丢失,则最终可能会多次发送同一条消息。如果消息使用者不是幂等的,这可能会导致重复处理。
  4. 顺序问题:如果单个事务发送了多条消息,并且这些发送不是原子性的,则这些消息可能会无序接收。如果消息的顺序很重要,这可能会导致不正确的处理。

发件箱模式通过确保事务提交和消息发送操作是原子的,并提供即使在出现故障时也能可靠地发送消息的机制来解决这些问题。

下面是一个序列图,说明了没有发件箱模式的系统所存在的的问题:

幂等消费者在发件箱模式中扮演着重要的角色。在分布式系统的背景下,幂等性是指系统无论执行特定操作多少次都能产生相同结果的能力。这对于确保分布式环境中的数据一致性和可靠性至关重要。

然而,这可能会导致同一条消息被多次发送,尤其是在系统在发送消息后但在发件箱中将消息标记为已发送之前发生故障的情况下。这就是幂等消费者发挥作用的地方。

幂等消费者旨在妥善处理重复消息。它们确保消除多次接收同一条消息的副作用。这通常通过跟踪所有已处理消息的 ID 来实现。当收到一条消息时,消费者会检查它是否已经处理了一条具有相同 ID 的消息。如果已经处理过,它就会忽略该消息。

下面是一个序列图,说明了发件箱模式如何解决问题:

发件箱模式的实现

现在您已经了解了发件箱模式的重要性和好处,让我们深入研究一下实现它需要什么:

发件箱模式的实现涉及以下步骤:

  1. 创建发件箱表:第一步是在数据库中创建发件箱表。此表将存储所有需要发送的消息。每条消息都应具有唯一的 ID 和一个指示消息是否已发送的状态字段。
  2. 修改应用程序代码:下一步是修改应用程序代码。每当您的应用程序需要将消息作为事务的一部分发送时,它都应将该消息作为同一事务的一部分添加到发件箱表中。
  3. 实现发件箱发布器:发件箱发布器是一个单独的组件,它会轮询发件箱表中未发送的消息。当它发现未发送的消息时,它会发送该消息并将发件箱表中该消息的状态更新为“已发送”。

DotNetCore.CAP 简介

幸运的是,有一个名为 DotNetCore.CAP 的 .NET 库可以为我们简化 Outbox 模式的实现。

DotNetCore.CAP 是一个开源库,它提供了一组 API,允许开发人员轻松地将消息作为数据库事务的一部分发送,将其存储在发件箱中,并确保即使在出现故障的情况下也能可靠地将它们传递给所有感兴趣的消费者。

该库还支持幂等消费者,这对于确保分布式环境中的数据一致性和可靠性至关重要。这意味着即使同一条消息被传递多次,也可以消除接收同一条消息的副作用。

通过使用DotNetCore.CAP,开发人员可以专注于其应用程序的业务逻辑,而库则负责确保分布式系统中可靠消息传递的复杂性。

例子

此代码演示了如何在 ASP.NET Core 应用程序中使用 CAP 库进行事件发布和处理。

在生产者中:

  • 定义了“/send”端点的路由处理程序。
  • 它启动一个事务,执行一个 SQL 命令来获取当前服务器时间,并将该时间的消息发布到“test.show.time”主题。
  • 消息发布延迟500毫秒。
  • 如果所有操作都成功,则事务被提交并返回响应。
// Producer/Program.cs  
app.MapGet("/send", async (  SqlConnection connection,  ICapPublisher capPublisher,  TimeProvider timeProvider) =>  
{  var startTime = timeProvider.GetTimestamp();  using var transaction = connection  .BeginTransaction(capPublisher, autoCommit: false);var command = connection.CreateCommand();command.Transaction = (SqlTransaction)transaction;command.CommandText = "SELECT GETDATE()";var serverTime = await command.ExecuteScalarAsync();await capPublisher.PublishDelayAsync(TimeSpan.FromMilliseconds(500),"test.show.time",new MyMessage(serverTime?.ToString()!));transaction.Commit();return TypedResults.Ok(new{Status = "Published",Duration = timeProvider.GetElapsedTime(startTime)});
});  

💡注意,BeginTransaction 扩展方法是定义在 DotNetCore.CAP.SqlServer中的,负责发件箱表的管理。

public static IDbTransaction BeginTransaction(  this IDbConnection dbConnection,  ICapPublisher publisher,  bool autoCommit = false)  

在消费者方面:

  • 定义一个实现ICapSubscribe的类SampleSubscriber
  • 它有一个HandleAsync的方法,将 CapSubscribe 指定为“test.show.time”作为主题的处理程序。
  • 当收到有关此主题的消息时,它会在 300 毫秒的延迟后记录消息内容。
// Consumer/Program.cs  
public class SampleSubscriber(  TimeProvider timeProvider,  ILogger <samplesubscriber>logger) : ICapSubscribe  
{  public record MyMessage(string CreatedAt);[CapSubscribe("test.show.time")]public async Task HandleAsync(MyMessage message){await Task.Delay(TimeSpan.FromMilliseconds(300), timeProvider);logger.LogInformation("Message received: {CreatedAt}", message.CreatedAt);}
}  

添加Aspire

为了完整地演示该演示,我们需要设置一些真实的基础设施组件 - 消息代理和数据库。

.NET Aspire 提供了一套精选的 NuGet 包(组件),专门用于促进云原生应用程序的集成。每个组件都通过自动配置或标准化配置模式提供必要的云原生功能。

添加消息代理:

.NET Aspire 服务总线组件处理以下问题以将您的应用连接到 Azure 服务总线。它添加 ServiceBusClient 到 DI 容器以连接到 Azure 服务总线。

dotnet add package Aspire.Azure.Messaging.ServiceBus --prerelease  

添加数据库:

.NET Aspire 提供了两个内置配置选项来简化 Azure 上的 SQL Server 部署:

  1. 使用 Azure 容器应用预配容器化 SQL Server 数据库
  2. 提供一个 Azure SQL 数据库实例(我们将使用这个)
dotnet add package Aspire.Microsoft.Data.SqlClient --prerelease  

以下是我们如何根据已安装的组件设置 Aspire Host:

// CapExample.AppHost/Program.cs  
var builder = DistributedApplication.CreateBuilder(args);var sqlServer = builder.ExecutionContext.IsPublishMode  ? builder.AddSqlServer("sqlserver").PublishAsAzureSqlDatabase().AddDatabase("sqldb")  : builder.AddConnectionString("sqldb");var serviceBus = builder.ExecutionContext.IsPublishMode  ? builder.AddAzureServiceBus("serviceBus")  : builder.AddConnectionString("serviceBus");builder.AddProject&lt;Projects.Consumer&gt;("consumer")  .WithReference(sqlServer)  .WithReference(serviceBus);builder.AddProject&lt;Projects.Producer&gt;("producer")  .WithReference(sqlServer)  .WithReference(serviceBus);builder.Build().Run();  

这个思路是在开发时使用连接字符串,并在发布时配置 Azure 资源。

Aspire 提供灵活的配置系统,让我们无需更改源代码即可在本地开发并部署到云中。我们可以使用由 Aspire Components 管理的连接字符串,这些连接字符串可以在本地开发和云部署环境之间轻松切换。这使我们能够在不同的部署场景之间无缝过渡,而无需修改源代码。

下面您可以找到如何基于 Aspire 组件进行配置 DotNetCore.CAP

// Consumer/Program.cs  
// Producer/Program.cs  
var builder = WebApplication.CreateBuilder(args);builder.AddServiceDefaults();  
builder.AddAzureServiceBus("serviceBus");  
builder.AddSqlServerClient("sqldb");builder.Services.AddCap(x =>  
{  var dbConnectionString = builder.Configuration.GetConnectionString("sqldb")!;  var serviceBusConnection = builder.Configuration.GetConnectionString("serviceBus")!;x.UseAzureServiceBus(serviceBusConnection);x.UseSqlServer(x => x.ConnectionString = dbConnectionString);
});  

提供基础设施

Azure 开发人员 CLI ( azd) 已得到增强,可支持 .NET Aspire 应用程序的部署。azd init工作流为 .NET Aspire 项目提供定制支持。我在开发此应用程序时使用了这种方法,事实证明它非常好。它提高了生产力。

azd目标是 .NET Aspire 应用程序时,它会使用指定的命令启动( AppHost dotnet run --project AppHost.csproj -- --publisher manifest),从而生成 Aspire 清单文件。

azd provision命令逻辑会询问清单文件,以仅在内存中生成 Bicep 文件(默认情况下)。

更多信息请参阅 https://learn.microsoft.com/en-us/dotnet/aspire/deployment/azure/aca-deployment-azd-in-depth?tabs=linux#how-azure-developer-cli-integration-works 。

就我个人而言,我发现显式生成 Bicep 文件更方便。这可以通过执行以下命令来实现:

❯ azd infra synth  

以下是将要配置的内容的可视化:

为了配置资源,我们需要运行下一个命令:

❯ azd provision  

这里创建的资源组- rg-cap-dev

配置本地开发

要检索 Azure 服务总线的连接字符串:

#!/bin/bash
resourceGroup="rg-cap-dev"
namespace=$(az servicebus namespace list \--resource-group $resourceGroup \--output tsv \--query '[0].name'
)
azbConnectionString=$(az servicebus namespace authorization-rule keys list \--namespace-name "$namespace" \--name RootManageSharedAccessKey \--resource-group $resourceGroup \--output tsv \--query 'primaryConnectionString'
)dotnet user-secrets --project ./src/CapExample.AppHost \set ConnectionStrings:serviceBus $azbConnectionString

要检索 Azure SQL 数据库的连接字符串:

#!/bin/bash# read server address
if [ -f .azure/cap-dev/.env ]
thenexport $(cat .azure/cap-dev/.env | sed 's/#.*//g' | xargs)
fi# read server password
db_password=$(jq -r '.inputs.sql.password' .azure/cap-dev/config.json)dotnet user-secrets --project ./src/CapExample.AppHost set ConnectionStrings:sqldb \"Server=$SQLSERVER_SQLSERVERFQDN;Initial Catalog=sqldb;Persist Security Info=False;User ID=CloudSA;Password=$db_password;MultipleActiveResultSets=False;Encrypt=True;TrustServerCertificate=False;Connection Timeout=30;"

要检查机密是否存储成功:

❯ dotnet user-secrets --project ./src/CapExample.AppHost list# ConnectionStrings:sqldb = Server=sqlserver-gopucer6dsl5q.database.windows.net;Initial Catalog=sqldb;Persist Security Info=False;User ID=CloudSA;Password=<your-password>;MultipleActiveResultSets=False;Encrypt=True;TrustServerCertificate=False;Connection Timeout=30;
# ConnectionStrings:serviceBus = Endpoint=sb://servicebus-gopucer6dsl5q.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=<your-key>

演示

您可能知道,Aspire 具有开箱即用的 OpenTelemetry 支持,您可以在 ServiceDefaults/Extensions.cs 中对其进行配置。

DotNetCore.CAP 有一个支持 OpenTelemetry 功能的包。

OpenTelemetry 中的分布式跨度通过提供一种跨系统不同组件跟踪消息流的方法,帮助您跟踪消息代理上的操作。

当使用像 DotNetCore.CAP 这样的消息代理时,消息通常通过中间代理从生产者发送到消费者。此过程中的每个步骤都可以被视为一个跨度,它代表一个工作单元或一个操作。

通过使用 OpenTelemetry 检测代码,您可以创建捕获有关消息流每个步骤的信息的跨度。这些跨度可以包括详细信息,例如所花费的时间、遇到的任何错误以及其他上下文信息。

借助分布式跨度,您可以直观地看到消息在系统中移动的整个过程,从生产者到代理,最后到消费者。这让您能够深入了解消息处理管道的性能和行为。

通过分析分布式跨度,您可以识别消息处理流程中的瓶颈、延迟问题和潜在错误。这些信息对于排除分布式系统故障和优化其性能非常有价值。

以下是DotNetCore.CAP 安装和配置 OpenTelemetry 所需要做的事情 :

安装 NuGet 包:

❯ dotnet add ./src/CapExample.ServiceDefaults/ package DotNetCore.Cap.OpenTelemetry  

调整默认配置:

builder.Services.AddOpenTelemetry().WithMetrics(metrics =>{metrics.AddAspNetCoreInstrumentation().AddHttpClientInstrumentation().AddProcessInstrumentation().AddRuntimeInstrumentation();}).WithTracing(tracing =>{if (builder.Environment.IsDevelopment()){tracing.SetSampler(new AlwaysOnSampler());}tracing.AddAspNetCoreInstrumentation().AddGrpcClientInstrumentation().AddHttpClientInstrumentation().AddCapInstrumentation(); // <-- add this code});

本地

现在,让我们运行演示:

❯ dotnet run --project ./src/CapExample.AppHost
# Building...
# Restore complete (0.6s)
#   CapExample.ServiceDefaults succeeded (0.2s) → src\CapExample.ServiceDefaults\bin\Debug\net9.0\CapExample.ServiceDefaults.dll
#   Consumer succeeded (0.3s) → src\Consumer\bin\Debug\net9.0\Consumer.dll
#   Producer succeeded (0.5s) → src\Producer\bin\Debug\net9.0\Producer.dll
#   CapExample.AppHost succeeded (0.2s) → src\CapExample.AppHost\bin\Debug\net9.0\CapExample.AppHost.dll# Build succeeded in 2.6s
# info: Aspire.Hosting.DistributedApplication[0]
#       Aspire version: 9.0.0-preview.2.24162.2+eaca163f7737934020f1102a9d11fdf790cccdc0
# info: Aspire.Hosting.DistributedApplication[0]
#       Distributed application starting.
# info: Aspire.Hosting.DistributedApplication[0]
#       Application host directory is: C:\Users\Oleksii_Nikiforov\dev\cap-aspire\src\CapExample.AppHost
# info: Aspire.Hosting.DistributedApplication[0]
#       Now listening on: http://localhost:15118
# info: Aspire.Hosting.DistributedApplication[0]
#       Distributed application started. Press Ctrl+C to shut down.

让我们生成一些负载:

❯ curl -s http://localhost:5288/send | jq
# {
#   "status": "Published",
#   "duration": "00:00:00.5255861"
# }

导航到 Aspire Dashboard 查看跟踪:

这是第一个Request,如您所见,我们需要一些时间来与 Azure 服务总线建立初始连接:

后续请求花费的时间更少:

💡我建议您深入研究源代码和跟踪示例,以增强您对Outbox Pattern工作原理的理解。

Azure

让我们通过运行azd deploy部署到 Azure 容器应用

在初始配置(azd init)期间,我指定了生产者的公共地址。我们可以利用它进行开发测试:

让我们生成一些负载并查看 Azure 服务总线的指标。

❯ curl -s https://producer.mangoforest-17799c26.eastus.azurecontainerapps.io/send | jq
# {
#   "status": "Published",
#   "duration": "00:00:00.0128251"
# }

发件箱表

在后台 DotNetCore.CAP 创建两个表来管理发件箱。

发件箱模式中,Published 和 Received 表用于管理需要发布或已被消息系统接收的消息。让我们仔细看看每个表的用途:

Published 表:负责存储需要发布到外部消息系统的消息。当应用程序生成需要发送的消息时,该消息将存储在已发布表中。此表充当缓冲区或队列,确保如果消息系统暂时不可用或在发布过程中出现任何故障,消息不会丢失。通过使用已发布表,应用程序可以继续生成消息,而不会受到消息系统可用性或性能的阻碍。Published 表中的消息可以由单独的组件或后台进程异步处理,这些组件或后台进程负责将它们发布到外部消息系统。一旦消息成功发布,就可以将其从Published 表中删除。

Received 表:用于跟踪应用程序从外部消息传递系统接收到的消息。当应用程序收到消息时,它会将有关该消息的必要信息存储在Received 表中。此信息可以包括消息内容、元数据和任何其他相关详细信息。Received 表允许应用程序保留已处理的消息记录,从而使其能够处理重复消息。

清理

完成开发后,您可以删除资源组 rg-cap-dev

❯ azd down
# Deleting all resources and deployed code on Azure (azd down)
# Local application code is not deleted when running 'azd down'.
#   Resource group(s) to be deleted:
#     • rg-cap-dev: https://portal.azure.com/#@/resource/subscriptions/0b252e02-9c7a-4d73-8fbc-633c5d111ebc/resourceGroups/rg-cap-dev/overview
# ? Total resources to delete: 10, are you sure you want to continue? Yes
# Deleting your resources can take some time.
#   (✓) Done: Deleting resource group: rg-cap-dev# SUCCESS: Your application was removed from Azure in 10 minutes 53 seconds.

总结

在本文中,我们探讨了发件箱模式,这是分布式系统领域中的关键组件。我们已经了解了它如何确保分布式系统中可靠的消息传递,从而维护整个系统的完整性和一致性。

我们还研究了 .NET 库 DotNetCore.CAP 如何简化Outbox Pattern的实现,使开发人员能够专注于其应用程序的业务逻辑,而库则负责确保可靠消息传递的复杂性。

最后,我们看到了 .NET Aspire 如何提供精选的 NuGet 包套件,以促进云原生应用程序的集成,并通过自动配置或标准化配置模式提供必要的云原生功能。

总之,Outbox Pattern、DotNetCore.CAP 和 .NET Aspire 的组合提供了一套强大的工具集,用于在 .NET 中构建可靠的云原生应用程序。通过理解和利用这些工具,开发人员可以构建强大、可扩展且可应对现代分布式世界挑战的应用程序。

参考

https://learn.microsoft.com/en-us/azure/developer/azure-developer-cli/
https://cap.dotnetcore.xyz/
https://learn.microsoft.com/en-us/dotnet/aspire/deployment/azure/aca-deployment-azd-in-deep

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/793622.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Css 斜线生成案例_Css 斜线/对角线整理

一、Css 斜线,块斜线,对角线 块的宽度高度任意支持<!DOCTYPE html> <html lang="en"><head><meta charset="UTF-8"><meta name="viewport" content="width=device-width, initial-scale=1.0"><tit…

mac升级node到指定版本

node版本升级到稳定版18.16.1 (1)node -v(2)npm cache clean -f 在使用npm cache clear --force清除缓存的时候,报npm WARN using --force Recommended protections disabled的错误,有可能是镜相源过期的问题换为npm cache verify(3) sudo n stable // 把当前系统的 Node 更…

HashMap深入讲解

HashMap是Java中最常用的集合类框架,也是Java语言中非常典型的数据结构, 而HashSet和HashMap者在Java里有着相同的实现,前者仅仅是对后者做了一层包装,也就是说HashSet里面有一个HashMap(适配器模式)。因此了解HashMap源码也就了解HashSet了 介绍Key的存储方式是基于哈希表…

如何实现mysql高可用

1.机器资源耗尽 2.单点故障 3.认为操作 4.网络单点故障解决方案? 1.搭建mysql主从集群(双主,一主多从,多主多从) 2. 可以用MyCat, ShardingJdbc实现A节点同步到B节点流程?1. 从库通过IO线程, 连接到主库,并且向主库要对应的bin log文件 2. 主库通过dump线程获取binlog文…

基于基尼指数构建分类决策树[算法+示例]

0 前言本文主要讲述使用基尼指数构建二叉决策树的算法,并给出例题一步步解析,帮助读者理解。 本文所使用的数据集:贷款.CSV。 读者需要具备的知识:基尼指数计算。1 基于基尼指数的分类树构建算法选择最优特征进行分裂: 对于决策树的每个节点,遍历数据集中的所有特征。对于…

Node.js版本管理工具之NVM

目录一、NVM介绍二、NVM的下载安装1、NVM下载2、卸载旧版Node.js3、安装三、NVM配置及使用1、设置nvm镜像源2、安装Node.js3、卸载Node.js4、使用或切换Node.js版本5、设置全局安装路径和缓存路径四、常用命令一、NVM介绍 在工作中,不同的项目可能需要不同NodeJS版本,所以维护…

Javaweb-DQL-分组查询

select sex,avg(math) from stu group by sex;-- 1 select sex,avg(math) as 数学平均分,count() as 人数 from stu group by sex;-- 2 select sex,avg(math) as 数学平均分,count() as 人数 from stu where math>=70 group by sex;-- 3 select sex,avg(math) as 数学平均分…

AP3215 8-150V 外围简单 宽输入 电压降压BUCK 恒压恒流驱动器 POE、电动车、扭扭车、电瓶车、车充方案

产品描述 AP3215是 一系列外围电路简洁的宽输入电压降压BUCK 恒压恒流驱动器 ,适用于8- 150V 输入电压范围 的DC-DC 降压应用。 AP3215输出电压通过 FB 管脚设置 ,输出电流通过 CS 电阻设置 ,外围简洁 , 具备高效率 ,低功耗 ,低纹波 , 优异 的线性调整率和负载调整率等优…

HTML 转 PDF API 接口

HTML 转 PDF API 接口 网络工具 / 文件处理 支持网页转 PDF 高效生成 PDF / 提供永久链接。1. 产品功能超高性能转换效率; 支持将传递的 HTML 转换为 PDF,支持转换 HTML 中的 CSS 格式; 支持传递网站 URL,直接转换页面成对应的 PDF 文件; 转换后的 PDF 提供永久存储文件地…

[C++ Daily] 递归锁解决标准锁的典型应用

递归锁解决标准锁的典型应用 先看源码:结果(在A种尝试锁住mutex_时失败,进程等待,死锁无法退出:将std::mutex 用 std::recursive_mutex替换:结果:解析: std::recursive_mutex允许同一个线程对同一个锁对象进行多次上锁,获得多层所有权.

使用 nuxi prepare 命令准备 Nuxt 项目

title: 使用 nuxi prepare 命令准备 Nuxt 项目 date: 2024/9/7 updated: 2024/9/7 author: cmdragon excerpt: 摘要:本文介绍nuxi prepare命令在Nuxt.js项目中的使用,该命令用于创建.nuxt目录并生成类型信息,以便于构建和部署。文章涵盖了命令的基本用法、指定根目录、设置…

sqlserver下利用sqlps.exe白名单绕杀软

sqlserver下利用sqlps.exe白名单绕杀软 前言: 在一次攻防里通过sqlserver盲注拿到一个执行命令权限,但是由于是盲注回显很有问题以及有杀软,所以利用起来非常难受而且拿不到webshell或者上线c2,所以才找到这个方法。 介绍: sqlps.exe是SQL Server附带的一个具有Microsoft签…