我的编程空间,编程开发者的网络收藏夹
学习永远不晚

如何用C#实现SAGA分布式事务

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

如何用C#实现SAGA分布式事务

背景

银行跨行转账业务是一个典型分布式事务场景,假设 A 需要跨行转账给 B,那么就涉及两个银行的数据,无法通过一个数据库的本地事务保证转账的 ACID ,只能够通过分布式事务来解决。

市面上使用比较多的分布式事务框架,支持 SAGA 的,大部分都是 JAVA 为主的,没有提供 C# 的对接方式,或者是对接难度大,一定程度上让人望而却步。

下面就基于这个框架来实践一下银行转账的例子。

前置工作

dotnet add package Dtmcli --version 0.3.0

成功的 SAGA

先来看一下一个成功完成的 SAGA 时序图。

上图的微服务1,对应我们示例的 OutApi,也就是转钱出去的那个服务。

微服务2,对应我们示例的 InApi,也就是转钱进来的那个服务。

下面是两个服务的正向操作和补偿操作的处理。

OutApi

app.MapPost("/api/TransOut", (string branch_id, string gid, string op, TransRequest req) => 
{
    // 进行 数据库操作
    Console.WriteLine($"用户【{req.UserId}】转出【{req.Amount}】正向操作,gid={gid}, branch_id={branch_id}, op={op}");

    return Results.Ok(TransResponse.BuildSucceedResponse());
});

app.MapPost("/api/TransOutCompensate", (string branch_id, string gid, string op, TransRequest req) =>
{
    // 进行 数据库操作
    Console.WriteLine($"用户【{req.UserId}】转出【{req.Amount}】补偿操作,gid={gid}, branch_id={branch_id}, op={op}");

    return Results.Ok(TransResponse.BuildSucceedResponse());
});

InApi

app.MapPost("/api/TransIn", (string branch_id, string gid, string op, TransRequest req) =>
{
    Console.WriteLine($"用户【{req.UserId}】转入【{req.Amount}】正向操作,gid={gid}, branch_id={branch_id}, op={op}");

    return Results.Ok(TransResponse.BuildSucceedResponse());
});

app.MapPost("/api/TransInCompensate", (string branch_id, string gid, string op, TransRequest req) =>
{
    Console.WriteLine($"用户【{req.UserId}】转入【{req.Amount}】补偿操作,gid={gid}, branch_id={branch_id}, op={op}");

    return Results.Ok(TransResponse.BuildSucceedResponse());
});

注:示例为了简单,没有进行实际的数据库操作。

到此各个子事务的处理已经 OK 了,然后是开启 SAGA 事务,进行分支调用

var userOutReq = new TransRequest() { UserId = "1", Amount = -30 };
var userInReq = new TransRequest() { UserId = "2", Amount = 30 };

var ct = new CancellationToken();
var gid = await dtmClient.GenGid(ct);
var saga = new Saga(dtmClient, gid)
    .Add(outApi + "/TransOut", outApi + "/TransOutCompensate", userOutReq)
    .Add(inApi + "/TransIn", inApi + "/TransInCompensate", userInReq)
    ;

var flag = await saga.Submit(ct);

Console.WriteLine($"case1, {gid} saga 提交结果 = {flag}");

到这里,一个完整的 SAGA 分布式事务就编写完成了。

搭建好 dtm 的环境后,运行上面的例子,会看到下面的输出。

当然,上面的情况太理想了,转出转入都是一次性就成功了。

但是实际上我们会遇到许许多多的问题,最常见的应该就是网络故障了。

下面来看一个异常的 SAGA 示例

异常的 SAGA

做一个假设,用户1的转出是正常的,但是用户2在转入的时候出现了问题。

由于事务已经提交给 dtm 了,按照 SAGA 事务的协议,dtm 会重试未完成的操作。

这个时候用户2 这边会出现什么样的情况呢?

转入其实成功了,但是 dtm 收到错误 (网络故障等)转入没有成功,直接告诉 dtm 失败了 (应用异常等)

无论是那一种,dtm 都会进行重试操作。这个时候会发生什么呢?我们继续往下看。

先看一下事务失败交互的时序图

再通过调整上面成功的例子,来比较直观的看看出现的情况。

在 InApi 加多一个转入失败的处理接口

app.MapPost("/api/TransInError", (string branch_id, string gid, string op, TransRequest req) =>
{
    Console.WriteLine($"用户【{req.UserId}】转入【{req.Amount}】正向操作--失败,gid={gid}, branch_id={branch_id}, op={op}");

    //return Results.BadRequest();
    return Results.Ok(TransResponse.BuildFailureResponse());
});

失败的返回有两种,一种是状态码大于 400,一种是状态码是 200 并且响应体包含 FAILURE,上面的例子是第二种

调整一下调用方,把转入正向操作替换成上面这个返回错误的接口。

var saga = new Saga(dtmClient, gid)
    .Add(outApi + "/TransOut", outApi + "/TransOutCompensate", userOutReq)
    .Add(inApi + "/TransInError", inApi + "/TransInCompensate", userInReq);

运行结果如下:

在这个例子中,只考虑补偿/重试成功的情况下。

用户1 转出的 30 块钱最终是回到了他的帐号上,他没有出现损失。

用户2 就有点苦逼了,转入没有成功,返回了失败,还触发了转入的补偿机制,结果就是把用户2 还没进帐的 30 块钱给多扣了,这个就是上面的情况2,常见的空补偿问题。

这个时候就要在进行转入补偿的时候做一系列的判断,转入有没有成功,转出有没有失败等等,把业务变的十分复杂。

如果出现了上述的情况1,会发生什么呢?

用户2 第一次已经成功转入 30 块钱,返回的也是成功,但是网络出了点问题,导致 dtm 认为失败了,它就会进行重试,相当于用户2 还会收到第二个转入 30 块钱的请求!也就是说这次转帐,用户2 会进账 60 块钱,翻倍了,也就是说这个请求不是幂等。

同样的,要处理这个问题,在进行转入的正向操作中也要进行一系列的判断,同样会把复杂度上升一个级别。

前面有提到 dtm 提供了子事务屏障的功能,保证了幂等、空补偿等常见问题。

再来看看这个子事务屏障的功能有没有帮我们简化上面异常处理。

子事务屏障

子事务屏障,需要根据 trans_type,gid,branch_id 和 op 四个内容进行创建。

这4个内容 dtm 在回调时会放在 querysting 上面。

客户端里面提供了 IBranchBarrierFactory 来供我们使用。

空补偿

针对上面的异常情况(用户2 凭空消失 30 块钱),对转入的补偿进行子事务屏障的改造。

app.MapPost("/api/BarrierTransInCompensate", async (string branch_id, string gid, string op, string trans_type, TransRequest req, IBranchBarrierFactory factory) =>
{
    var barrier = factory.CreateBranchBarrier(trans_type, gid, branch_id, op);

    using var db = Db.GeConn();
    await barrier.Call(db, async (tx) =>
    {
        // 转入失败的情况下,不应该输出下面这个
        Console.WriteLine($"用户【{req.UserId}】转入【{req.Amount}】补偿操作,gid={gid}, branch_id={branch_id}, op={op}");
        // tx 参数是事务,可和本地事务一起提交回滚
        await Task.CompletedTask;
    });

    Console.WriteLine($"子事务屏障-补偿操作,gid={gid}, branch_id={branch_id}, op={op}");
    return Results.Ok(TransResponse.BuildSucceedResponse());
});

Call 方法就是关键所在了,需要传入一个 DbConnection 和真正的业务操作,这里的业务操作就是在控制台输出补偿操作的信息。

同样的,我们再调整一下调用方,把转入补偿操作替换成上面带子事务屏障的接口。

var saga = new Saga(dtmClient, gid)
    .Add(outApi + "/TransOut", outApi + "/TransOutCompensate", userOutReq)
    .Add(inApi + "/TransInError", inApi + "/BarrierTransInCompensate", userInReq)
    ;

再来运行这个例子。

会发现转入的补偿操作并没执行,控制台没有输出补偿信息,而是输出了

Will not exec busiCall, isNullCompensation=True, isDuplicateOrPend=False

这个就表明了,这个请求是个空补偿,是不应该执行业务方法的,既空操作。

再来看一下,转入成功的,但是 dtm 收到了失败的信号,不断重试造成重复请求的情况。

幂等

针对用户2 转入两次 30 块钱的异常情况,对转入的正向操作进行子事务屏障的改造。

app.MapPost("/api/BarrierTransIn", async (string branch_id, string gid, string op, string trans_type, TransRequest req, IBranchBarrierFactory factory) =>
{
    Console.WriteLine($"用户【{req.UserId}】转入【{req.Amount}】请求来了!!! gid={gid}, branch_id={branch_id}, op={op}");

    var barrier = factory.CreateBranchBarrier(trans_type, gid, branch_id, op);

    using var db = Db.GeConn();
    await barrier.Call(db, async (tx) =>
    {
        var c = Interlocked.Increment(ref _errCount);

        // 模拟一个超时执行
        if (c > 0 && c < 2) await Task.Delay(10000);

        Console.WriteLine($"用户【{req.UserId}】转入【{req.Amount}】正向操作,gid={gid}, branch_id={branch_id}, op={op}");
        await Task.CompletedTask;
    });

    return Results.Ok(TransResponse.BuildSucceedResponse());
});

这里通过一个超时执行来让 dtm 进行转入正向操作的重试。

同样的,我们再调整一下调用方,把转入的正向操作也替换成上面带子事务屏障的接口。

var saga = new Saga(dtmClient, gid)
    .Add(outApi + "/TransOut", outApi + "/TransOutCompensate", userOutReq)
    .Add(inApi + "/BarrierTransIn", inApi + "/BarrierTransInCompensate", userInReq)
    ;

再来运行这个例子。

可以看到转入的正向操作确实是触发了多次,第一次实际上是成功,只是响应比较慢,导致 dtm 认为是失败了,触发了第二次请求,但是第二次请求并没有执行业务操作,而是输出了

Will not exec busiCall, isNullCompensation=False, isDuplicateOrPend=True

这个就表明了,这个请求是个重复请求,是不应该执行业务方法的,保证了幂等。

到这里,可以看出,子事务屏障确实解决了幂等和空补偿的问题,大大降低了业务判断的复杂度和出错的可能性。

写在最后

在这篇文章里,也通过几个例子,完整给出了编写一个 SAGA 事务的过程,涵盖了正常成功完成,异常情况,以及成功回滚的情况。希望对研究分布式事务的您有所帮助。

本文示例代码: DtmSagaSample

到此这篇关于如何用C#实现SAGA分布式事务的文章就介绍到这了,更多相关C#实现SAGA内容请搜索编程网以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程网!

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

如何用C#实现SAGA分布式事务

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

如何用C#实现SAGA分布式事务

大家好,本篇文章主要讲的是如何用C#实现SAGA分布式事务,感兴趣的同学赶快来看一看吧,对你有帮助的话记得收藏一下
2022-11-13

怎么用C#实现SAGA分布式事务

这篇文章将为大家详细讲解有关怎么用C#实现SAGA分布式事务,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。背景银行跨行转账业务是一个典型分布式事务场景,假设 A 需要跨行转账给 B,那么就涉
2023-06-28

Spring Boot + RabbitMQ如何实现分布式事务

小编给大家分享一下Spring Boot + RabbitMQ如何实现分布式事务,希望大家阅读完这篇文章之后都有所收获,下面让我们一起去探讨吧!一:分布式事务解决方案1.两阶段提交(2PC)第一阶段:事务协调器要求每个涉及到事务的数据库预提
2023-06-03

如何利用Redis实现分布式事务管理

如何利用Redis实现分布式事务管理引言:随着互联网的快速发展,分布式系统的使用越来越广泛。在分布式系统中,事务管理是一项重要的挑战。传统的事务管理方式在分布式系统中难以实现,并且效率低下。而利用Redis的特性,我们可以轻松地实现分布式事
如何利用Redis实现分布式事务管理
2023-11-07

如何在Redis中实现分布式事务

在Redis中实现分布式事务可以通过使用 Redis 的事务机制 MULTI/EXEC 和 WATCH 命令来实现。以下是实现分布式事务的步骤:使用 MULTI 命令开启一个事务块。在事务块中添加需要执行的 Redis 命令。通过 EX
如何在Redis中实现分布式事务
2024-04-09

springboot整合rocketmq如何实现分布式事务

这篇文章给大家分享的是有关springboot整合rocketmq如何实现分布式事务的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。1 执行流程(1) 发送方向 MQ 服务端发送消息。(2) MQ Server 将
2023-06-15

Redis如何实现分布式事务的可靠性

Redis是一种快速、可靠的内存数据库,广泛应用于分布式系统中。在分布式系统中,事务的处理是一项关键的挑战。本文将介绍Redis是如何实现分布式事务的可靠性,并提供一些具体的代码示例。Redis通过MULTI、EXEC、DISCARD和WA
Redis如何实现分布式事务的可靠性
2023-11-07

Redis如何实现分布式事务的一致性

Redis是一个高性能、分布式内存数据库,被广泛应用在分布式系统中。在分布式系统中,如何实现事务的一致性一直是一个难题,而Redis提供的事务机制可以帮助开发者解决这个问题。本文将介绍Redis如何实现分布式事务的一致性,并展示代码示例。一
Redis如何实现分布式事务的一致性
2023-11-07

C#开发中如何处理分布式事务和分布式缓存

C#开发中如何处理分布式事务和分布式缓存,需要具体代码示例摘要:在分布式系统中,事务处理和缓存管理是至关重要的两个方面。本文将介绍C#开发中如何处理分布式事务和分布式缓存,并给出具体的代码示例。引言随着软件系统的规模与复杂度增加,许多应用都
2023-10-22

Zookeeper怎么实现分布式事务

Zookeeper本身并不直接支持分布式事务,但可以作为分布式事务的基础设施来帮助实现分布式事务。以下是一种可能的实现方式:使用Zookeeper作为协调器:Zookeeper可以分布式协调器,用来协调多个分布式系统的各个节点之间的操作。在
2023-10-27

如何使用Redis和C#开发分布式事务功能

如何使用Redis和C#开发分布式事务功能引言分布式系统的开发中,事务处理是一项非常重要的功能。事务处理能够保证在分布式系统中的一系列操作要么全部成功,要么全部回滚。Redis是一种高性能的键值存储数据库,而C#是一种广泛应用于开发分布式系
2023-10-22

编程热搜

  • Python 学习之路 - Python
    一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
    Python 学习之路 - Python
  • chatgpt的中文全称是什么
    chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
    chatgpt的中文全称是什么
  • C/C++中extern函数使用详解
  • C/C++可变参数的使用
    可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
    C/C++可变参数的使用
  • css样式文件该放在哪里
  • php中数组下标必须是连续的吗
  • Python 3 教程
    Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
    Python 3 教程
  • Python pip包管理
    一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
    Python pip包管理
  • ubuntu如何重新编译内核
  • 改善Java代码之慎用java动态编译

目录