我的编程空间,编程开发者的网络收藏夹
学习永远不晚

Saga模式源码方法教程

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

Saga模式源码方法教程

本篇内容主要讲解“Saga模式源码方法教程”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“Saga模式源码方法教程”吧!

状态机定义

以一个典型的电商购物流程为例,我们定义3个服务,订单服务(OrderServer),账户服务(AccountService)和库存服务(StorageService),这里我们把订单服务当做聚合服务,也就是TM。

当外部下单时,订单服务首先会创建一个订单,然后调用账户服务扣减金额,最后调用库存服务扣减库存。这个流程入下图:


Saga模式源码方法教程

seata的saga模式是基于状态机来实现了,状态机对状态的控制需要一个JSON文件,这个JSON文件定义如下:

{     "Name": "buyGoodsOnline",     "Comment": "buy a goods on line, add order, deduct account, deduct storage ",     "StartState": "SaveOrder",     "Version": "0.0.1",     "States": {         "SaveOrder": {             "Type": "ServiceTask",             "ServiceName": "orderSave",             "ServiceMethod": "saveOrder",             "CompensateState": "DeleteOrder",             "Next": "ChoiceAccountState",             "Input": [                 "$.[businessKey]",                 "$.[order]"             ],             "Output": {                 "SaveOrderResult": "$.#root"             },             "Status": {                 "#root == true": "SU",                 "#root == false": "FA",                 "$Exception{java.lang.Throwable}": "UN"             }         },         "ChoiceAccountState":{             "Type": "Choice",             "Choices":[                 {                     "Expression":"[SaveOrderResult] == true",                     "Next":"ReduceAccount"                 }             ],             "Default":"Fail"         },         "ReduceAccount": {             "Type": "ServiceTask",             "ServiceName": "accountService",             "ServiceMethod": "decrease",             "CompensateState": "CompensateReduceAccount",             "Next": "ChoiceStorageState",             "Input": [                 "$.[businessKey]",                 "$.[userId]",                 "$.[money]",                 {                     "throwException" : "$.[mockReduceAccountFail]"                 }             ],             "Output": {                 "ReduceAccountResult": "$.#root"             },             "Status": {                 "#root == true": "SU",                 "#root == false": "FA",                 "$Exception{java.lang.Throwable}": "UN"             },             "Catch": [                 {                     "Exceptions": [                         "java.lang.Throwable"                     ],                     "Next": "CompensationTrigger"                 }             ]         },         "ChoiceStorageState":{             "Type": "Choice",             "Choices":[                 {                     "Expression":"[ReduceAccountResult] == true",                     "Next":"ReduceStorage"                 }             ],             "Default":"Fail"         },         "ReduceStorage": {             "Type": "ServiceTask",             "ServiceName": "storageService",             "ServiceMethod": "decrease",             "CompensateState": "CompensateReduceStorage",             "Input": [                 "$.[businessKey]",                 "$.[productId]",                 "$.[count]",                 {                     "throwException" : "$.[mockReduceStorageFail]"                 }             ],             "Output": {                 "ReduceStorageResult": "$.#root"             },             "Status": {                 "#root == true": "SU",                 "#root == false": "FA",                 "$Exception{java.lang.Throwable}": "UN"             },             "Catch": [                 {                     "Exceptions": [                         "java.lang.Throwable"                     ],                     "Next": "CompensationTrigger"                 }             ],             "Next": "Succeed"         },         "DeleteOrder": {             "Type": "ServiceTask",             "ServiceName": "orderSave",             "ServiceMethod": "deleteOrder",             "Input": [                 "$.[businessKey]",                 "$.[order]"             ]         },         "CompensateReduceAccount": {             "Type": "ServiceTask",             "ServiceName": "accountService",             "ServiceMethod": "compensateDecrease",             "Input": [                 "$.[businessKey]",                 "$.[userId]",                 "$.[money]"             ]         },         "CompensateReduceStorage": {             "Type": "ServiceTask",             "ServiceName": "storageService",             "ServiceMethod": "compensateDecrease",             "Input": [                 "$.[businessKey]",                 "$.[productId]",                 "$.[count]"             ]         },         "CompensationTrigger": {             "Type": "CompensationTrigger",             "Next": "Fail"         },         "Succeed": {             "Type":"Succeed"         },         "Fail": {             "Type":"Fail",             "ErrorCode": "PURCHASE_FAILED",             "Message": "purchase failed"         }     } }

状态机是运行在TM中的,也就是我们上面定义的订单服务。订单服务创建订单时需要开启一个全局事务,这时就需要启动状态机,代码如下:

StateMachineEngine stateMachineEngine = (StateMachineEngine) ApplicationContextUtils.getApplicationContext().getBean("stateMachineEngine");  Map<String, Object> startParams = new HashMap<>(3); String businessKey = String.valueOf(System.currentTimeMillis()); startParams.put("businessKey", businessKey); startParams.put("order", order); startParams.put("mockReduceAccountFail", "true"); startParams.put("userId", order.getUserId()); startParams.put("money", order.getPayAmount()); startParams.put("productId", order.getProductId()); startParams.put("count", order.getCount());  //sync test StateMachineInstance inst = stateMachineEngine.startWithBusinessKey("buyGoodsOnline", null, businessKey, startParams);

可以看到,上面代码定义的buyGoodsOnline,正是JSON文件中name的属性值。

状态机初始化

那上面创建订单代码中的stateMachineEngine这个bean是在哪里定义的呢?订单服务的demo中有一个类StateMachineConfiguration来进行定义,代码如下:

public class StateMachineConfiguration {      @Bean     public ThreadPoolExecutorFactoryBean threadExecutor(){         ThreadPoolExecutorFactoryBean threadExecutor = new ThreadPoolExecutorFactoryBean();         threadExecutor.setThreadNamePrefix("SAGA_ASYNC_EXE_");         threadExecutor.setCorePoolSize(1);         threadExecutor.setMaxPoolSize(20);         return threadExecutor;     }      @Bean     public DbStateMachineConfig dbStateMachineConfig(ThreadPoolExecutorFactoryBean threadExecutor, DataSource hikariDataSource) throws IOException {         DbStateMachineConfig dbStateMachineConfig = new DbStateMachineConfig();         dbStateMachineConfig.setDataSource(hikariDataSource);         dbStateMachineConfig.setThreadPoolExecutor((ThreadPoolExecutor) threadExecutor.getObject());              dbStateMachineConfig.setResources(new PathMatchingResourcePatternResolver().getResources("classpath*:statelang     ProcessContextBuilder contextBuilder = ProcessContextBuilder.create().withProcessType(ProcessType.STATE_LANG)         .withOperationName(DomainConstants.OPERATION_NAME_START).withAsyncCallback(callback).withInstruction(             new StateInstruction(stateMachineName, tenantId)).withStateMachineInstance(instance)         .withStateMachineConfig(getStateMachineConfig()).withStateMachineEngine(this);      Map<String, Object> contextVariables;     if (startParams != null) {         contextVariables = new ConcurrentHashMap<>(startParams.size());         nullSafeCopy(startParams, contextVariables);     } else {         contextVariables = new ConcurrentHashMap<>();     }     instance.setContext(contextVariables);//把启动参数赋值给状态机实例的context     //给ProcessContextImpl的variables加参数     contextBuilder.withStateMachineContextVariables(contextVariables);      contextBuilder.withIsAsyncExecution(async);      //上面定义的建造者创建一个ProcessContextImpl     ProcessContext processContext = contextBuilder.build();      //这个条件是true     if (instance.getStateMachine().isPersist() && stateMachineConfig.getStateLogStore() != null) {       //记录状态机开始状态         stateMachineConfig.getStateLogStore().recordStateMachineStarted(instance, processContext);     }     if (StringUtils.isEmpty(instance.getId())) {         instance.setId(             stateMachineConfig.getSeqGenerator().generate(DomainConstants.SEQ_ENTITY_STATE_MACHINE_INST));     }      if (async) {         stateMachineConfig.getAsyncProcessCtrlEventPublisher().publish(processContext);     } else {       //发送消息到EventBus,这里的消费者是ProcessCtrlEventConsumer,在DefaultStateMachineConfig初始化时设置         stateMachineConfig.getProcessCtrlEventPublisher().publish(processContext);     }      return instance; }

上面的代码中我们可以看出,启动状态记得时候主要做了2件事情,一个是记录状态机开始的状态,一个是发送消息到EventBus,下面我们详细看一下这2个过程。

开启全局事务

上面的代码分析中,有一个记录状态机开始状态的代码,如下:

stateMachineConfig.getStateLogStore().recordStateMachineStarted(instance, processContext);

这里调用了类DbAndReportTcStateLogStore的recordStateMachineStarted方法,我们来看一下,代码如下:

public void recordStateMachineStarted(StateMachineInstance machineInstance, ProcessContext context) {      if (machineInstance != null) {         //if parentId is not null, machineInstance is a SubStateMachine, do not start a new global transaction,         //use parent transaction instead.         String parentId = machineInstance.getParentId();         if (StringUtils.hasLength(parentId)) {             if (StringUtils.isEmpty(machineInstance.getId())) {                 machineInstance.setId(parentId);             }         } else {         //走这个分支,因为没有配置子状态机                      beginTransaction(machineInstance, context);         }           if (StringUtils.isEmpty(machineInstance.getId()) && seqGenerator != null) {             machineInstance.setId(seqGenerator.generate(DomainConstants.SEQ_ENTITY_STATE_MACHINE_INST));         }          // save to db     //dbType = "MySQL"         machineInstance.setSerializedStartParams(paramsSerializer.serialize(machineInstance.getStartParams()));         executeUpdate(stateLogStoreSqls.getRecordStateMachineStartedSql(dbType),                 STATE_MACHINE_INSTANCE_TO_STATEMENT_FOR_INSERT, machineInstance);     } }

上面executeUpdate方法在子类AbstractStore,debug一下executeUpdate这个方法可以看到,这里执行的sql如下:

INSERT INTO seata_state_machine_inst (id, machine_id, tenant_id, parent_id, gmt_started, business_key, start_params, is_running, status, gmt_updated) VALUES ('192.168.59.146:8091:65853497147990016', '06a098cab53241ca7ed09433342e9f07', '000001', null, '2020-10-31 17:18:24.773',  '1604135904773', '{"@type":"java.util.HashMap","money":50.,"productId":1L,"_business_key_":"1604135904773","businessKey":"1604135904773", "count":1,"mockReduceAccountFail":"true","userId":1L,"order":{"@type":"io.seata.sample.entity.Order","count":1,"payAmount":50, "productId":1,"userId":1}}', 1, 'RU', '2020-10-31 17:18:24.773')

可以看到,这个全局事务记录在了表seata_state_machine_inst,记录的是我们启动状态机的参数,status记录的状态是"RU"也就是RUNNING。

分支事务处理

上一节我们提到,启动状态机后,向EventBus发了一条消息,这个消息的消费者是ProcessCtrlEventConsumer,我们看一下这个类的代码:

public class ProcessCtrlEventConsumer implements EventConsumer<ProcessContext> {      private ProcessController processController;      @Override     public void process(ProcessContext event) throws FrameworkException {         //这里的processController是ProcessControllerImpl         processController.process(event);     }      @Override     public boolean accept(Class<ProcessContext> clazz) {         return ProcessContext.class.isAssignableFrom(clazz);     }      public void setProcessController(ProcessController processController) {         this.processController = processController;     } }

ProcessControllerImpl类的process方法有2个处理逻辑,process和route,代码如下:

public void process(ProcessContext context) throws FrameworkException {      try {         //这里的businessProcessor是CustomizeBusinessProcessor         businessProcessor.process(context);          businessProcessor.route(context);      } catch (FrameworkException fex) {         throw fex;     } catch (Exception ex) {         LOGGER.error("Unknown exception occurred, context = {}", context, ex);         throw new FrameworkException(ex, "Unknown exception occurred", FrameworkErrorCode.UnknownAppError);     } }

这里的处理逻辑有些复杂,先上一张UML类图,跟着这张图,可以捋清楚代码的调用逻辑:


Saga模式源码方法教程

我们先来看一下CustomizeBusinessProcessor中的process方法:

public void process(ProcessContext context) throws FrameworkException {           ProcessType processType = matchProcessType(context);     if (processType == null) {         if (LOGGER.isWarnEnabled()) {             LOGGER.warn("Process type not found, context= {}", context);         }         throw new FrameworkException(FrameworkErrorCode.ProcessTypeNotFound);     }      ProcessHandler processor = processHandlers.get(processType.getCode());     if (processor == null) {         LOGGER.error("Cannot find process handler by type {}, context= {}", processType.getCode(), context);         throw new FrameworkException(FrameworkErrorCode.ProcessHandlerNotFound);     }     //这里的是StateMachineProcessHandler     processor.process(context); }

这里的代码不好理解,我们分四步来研究。

第一步,我们看一下StateMachineProcessHandler类中process方法,这个方法代理了ServiceTaskStateHandler的process方法,代码如下:

public void process(ProcessContext context) throws FrameworkException {          StateInstruction instruction = context.getInstruction(StateInstruction.class);   //这里的state实现类是ServiceTaskStateImpl     State state = instruction.getState(context);     String stateType = state.getType();   //这里stateHandler实现类是ServiceTaskStateHandler     StateHandler stateHandler = stateHandlers.get(stateType);      List<StateHandlerInterceptor> interceptors = null;     if (stateHandler instanceof InterceptableStateHandler) {       //list上有1个元素ServiceTaskHandlerInterceptor         interceptors = ((InterceptableStateHandler)stateHandler).getInterceptors();     }      List<StateHandlerInterceptor> executedInterceptors = null;     Exception exception = null;     try {         if (interceptors != null && interceptors.size() > 0) {             executedInterceptors = new ArrayList<>(interceptors.size());             for (StateHandlerInterceptor interceptor : interceptors) {                 executedInterceptors.add(interceptor);                 interceptor.preProcess(context);             }         }          stateHandler.process(context);      } catch (Exception e) {         exception = e;         throw e;     } finally {          if (executedInterceptors != null && executedInterceptors.size() > 0) {             for (int i = executedInterceptors.size() - 1; i >= 0; i--) {                 StateHandlerInterceptor interceptor = executedInterceptors.get(i);                 interceptor.postProcess(context, exception);             }         }     } }

从这个方法我们看到,代理对stateHandler.process加入了前置和后置增强,增强类是ServiceTaskHandlerInterceptor,前置后置增强分别调用了interceptor的preProcess和postProcess。

第二步,我们来看一下增强逻辑。ServiceTaskHandlerInterceptor的preProcess和postProcess方法,代码如下:

public class ServiceTaskHandlerInterceptor implements StateHandlerInterceptor {     //省略部分代码     @Override     public void preProcess(ProcessContext context) throws EngineExecutionException {          StateInstruction instruction = context.getInstruction(StateInstruction.class);          StateMachineInstance stateMachineInstance = (StateMachineInstance)context.getVariable(             DomainConstants.VAR_NAME_STATEMACHINE_INST);         StateMachineConfig stateMachineConfig = (StateMachineConfig)context.getVariable(             DomainConstants.VAR_NAME_STATEMACHINE_CONFIG);          //如果超时,修改状态机状态为FA         if (EngineUtils.isTimeout(stateMachineInstance.getGmtUpdated(), stateMachineConfig.getTransOperationTimeout())) {             String message = "Saga Transaction [stateMachineInstanceId:" + stateMachineInstance.getId()                     + "] has timed out, stop execution now.";             EngineUtils.failStateMachine(context, exception);             throw exception;         }          StateInstanceImpl stateInstance = new StateInstanceImpl();          Map<String, Object> contextVariables = (Map<String, Object>)context.getVariable(             DomainConstants.VAR_NAME_STATEMACHINE_CONTEXT);         ServiceTaskStateImpl state = (ServiceTaskStateImpl)instruction.getState(context);         List<Object> serviceInputParams = null;          Object isForCompensation = state.isForCompensation();         if (isForCompensation != null && (Boolean)isForCompensation) {             CompensationHolder compensationHolder = CompensationHolder.getCurrent(context, true);             StateInstance stateToBeCompensated = compensationHolder.getStatesNeedCompensation().get(state.getName());             if (stateToBeCompensated != null) {                  stateToBeCompensated.setCompensationState(stateInstance);                 stateInstance.setStateIdCompensatedFor(stateToBeCompensated.getId());             } else {                 LOGGER.error("Compensation State[{}] has no state to compensate, maybe this is a bug.",                     state.getName());             }       //加入补偿集合             CompensationHolder.getCurrent(context, true).addForCompensationState(stateInstance.getName(),                 stateInstance);         }         //省略部分代码         stateInstance.setInputParams(serviceInputParams);          if (stateMachineInstance.getStateMachine().isPersist() && state.isPersist()             && stateMachineConfig.getStateLogStore() != null) {              try {           //记录一个分支事务的状态RU到数据库                          stateMachineConfig.getStateLogStore().recordStateStarted(stateInstance, context);             }         }         //省略部分代码         stateMachineInstance.putStateInstance(stateInstance.getId(), stateInstance);//放入StateMachineInstanceImpl的stateMap用于重试或交易补偿         ((HierarchicalProcessContext)context).setVariableLocally(DomainConstants.VAR_NAME_STATE_INST, stateInstance);//记录状态后面传给TaskStateRouter判断全局事务结束     }      @Override     public void postProcess(ProcessContext context, Exception exp) throws EngineExecutionException {          StateInstruction instruction = context.getInstruction(StateInstruction.class);         ServiceTaskStateImpl state = (ServiceTaskStateImpl)instruction.getState(context);          StateMachineInstance stateMachineInstance = (StateMachineInstance)context.getVariable(             DomainConstants.VAR_NAME_STATEMACHINE_INST);         StateInstance stateInstance = (StateInstance)context.getVariable(DomainConstants.VAR_NAME_STATE_INST);         if (stateInstance == null || !stateMachineInstance.isRunning()) {             LOGGER.warn("StateMachineInstance[id:" + stateMachineInstance.getId() + "] is end. stop running");             return;         }          StateMachineConfig stateMachineConfig = (StateMachineConfig)context.getVariable(             DomainConstants.VAR_NAME_STATEMACHINE_CONFIG);          if (exp == null) {             exp = (Exception)context.getVariable(DomainConstants.VAR_NAME_CURRENT_EXCEPTION);         }         stateInstance.setException(exp);          //设置事务状态         decideExecutionStatus(context, stateInstance, state, exp);         //省略部分代码          Map<String, Object> contextVariables = (Map<String, Object>)context.getVariable(             DomainConstants.VAR_NAME_STATEMACHINE_CONTEXT);         //省略部分代码          context.removeVariable(DomainConstants.VAR_NAME_OUTPUT_PARAMS);         context.removeVariable(DomainConstants.VAR_NAME_INPUT_PARAMS);          stateInstance.setGmtEnd(new Date());          if (stateMachineInstance.getStateMachine().isPersist() && state.isPersist()             && stateMachineConfig.getStateLogStore() != null) {       //更新分支事务的状态为成功                    stateMachineConfig.getStateLogStore().recordStateFinished(stateInstance, context);         }         //省略部分代码     } }

从这个代码我们能看到,分支事务执行前,封装了一个StateInstanceImpl赋值给了ProcessContext,分支事务执行后,对这个StateInstanceImpl进行了修改,这个StateInstanceImpl有3个作用:

传入StateMachineInstanceImpl的stateMap用于重试或交易补偿

记录了分支事务的执行情况,同时支持持久化到seata_state_inst表

传入TaskStateRouter用作判断全局事务结束

第三步,我们看一下被代理的方法stateHandler.process(context),正常执行逻辑中stateHandler的实现类是ServiceTaskStateHandler,代码如下:

public void process(ProcessContext context) throws EngineExecutionException {      StateInstruction instruction = context.getInstruction(StateInstruction.class);     ServiceTaskStateImpl state = (ServiceTaskStateImpl) instruction.getState(context);     StateInstance stateInstance = (StateInstance) context.getVariable(DomainConstants.VAR_NAME_STATE_INST);      Object result;     try {                  List<Object> input = (List<Object>) context.getVariable(DomainConstants.VAR_NAME_INPUT_PARAMS);          //Set the current task execution status to RU (Running)         stateInstance.setStatus(ExecutionStatus.RU);//设置状态          if (state instanceof CompensateSubStateMachineState) {             //省略子状态机的研究         } else {             StateMachineConfig stateMachineConfig = (StateMachineConfig) context.getVariable(                     DomainConstants.VAR_NAME_STATEMACHINE_CONFIG);             //这里的state.getServiceType是springBean             ServiceInvoker serviceInvoker = stateMachineConfig.getServiceInvokerManager().getServiceInvoker(                     state.getServiceType());             if (serviceInvoker == null) {                 throw new EngineExecutionException("No such ServiceInvoker[" + state.getServiceType() + "]",                         FrameworkErrorCode.ObjectNotExists);             }             if (serviceInvoker instanceof ApplicationContextAware) {                 ((ApplicationContextAware) serviceInvoker).setApplicationContext(                         stateMachineConfig.getApplicationContext());             }             //这里触发了我们在JSON中定义ServiceTask中方法,比如orderSave中的saveOrder方法             result = serviceInvoker.invoke(state, input.toArray());         }          if (LOGGER.isDebugEnabled()) {             LOGGER.debug("<<<<<<<<<<<<<<<<<<<<<< State[{}], ServiceName[{}], Method[{}] Execute finish. result: {}",                     state.getName(), serviceName, methodName, result);         }     //省略部分代码      }    //省略异常处理代码 }

可以看到,process这个方法是一个核心的业务处理,它用发射触发了JSON中定义ServiceTask的方法,并且根据状态触发了Next对象,即流程中的下一个ServiceTask。

第四步,我们再看一下CustomizeBusinessProcessor的route方法,代码如下:

public void route(ProcessContext context) throws FrameworkException {      //code = "STATE_LANG"     //message = "SEATA State Language"     //name = "STATE_LANG"     //ordinal = 0     ProcessType processType = matchProcessType(context);      RouterHandler router = routerHandlers.get(processType.getCode());     //DefaultRouterHandler的route方法     router.route(context); }

我们看一下DefaultRouterHandler的route方法,代码如下:

public void route(ProcessContext context) throws FrameworkException {      try {         ProcessType processType = matchProcessType(context);         //这里的processRouter是StateMachineProcessRouter         ProcessRouter processRouter = processRouters.get(processType.getCode());         Instruction instruction = processRouter.route(context);         if (instruction == null) {             LOGGER.info("route instruction is null, process end");         } else {             context.setInstruction(instruction);              eventPublisher.publish(context);         }     } catch (FrameworkException e) {         throw e;     } catch (Exception ex) {         throw new FrameworkException(ex, ex.getMessage(), FrameworkErrorCode.UnknownAppError);     } }

看一下StateMachineProcessRouter的route方法,这里也是用了代理模式,代码如下:

public Instruction route(ProcessContext context) throws FrameworkException {      StateInstruction stateInstruction = context.getInstruction(StateInstruction.class);      State state;     if (stateInstruction.getTemporaryState() != null) {         state = stateInstruction.getTemporaryState();         stateInstruction.setTemporaryState(null);     } else {       //走这个分支         StateMachineConfig stateMachineConfig = (StateMachineConfig)context.getVariable(             DomainConstants.VAR_NAME_STATEMACHINE_CONFIG);         StateMachine stateMachine = stateMachineConfig.getStateMachineRepository().getStateMachine(             stateInstruction.getStateMachineName(), stateInstruction.getTenantId());         state = stateMachine.getStates().get(stateInstruction.getStateName());     }      String stateType = state.getType();      StateRouter router = stateRouters.get(stateType);      Instruction instruction = null;      List<StateRouterInterceptor> interceptors = null;     if (router instanceof InterceptableStateRouter) {       //这里只有EndStateRouter         interceptors = ((InterceptableStateRouter)router).getInterceptors();//EndStateRouterInterceptor     }      List<StateRouterInterceptor> executedInterceptors = null;     Exception exception = null;     try {         //前置增量实现方法是空,这里省略代码         instruction = router.route(context, state);      } catch (Exception e) {         exception = e;         throw e;     } finally {          if (executedInterceptors != null && executedInterceptors.size() > 0) {             for (int i = executedInterceptors.size() - 1; i >= 0; i--) {                 StateRouterInterceptor interceptor = executedInterceptors.get(i);                 interceptor.postRoute(context, state, instruction, exception);//结束状态机             }         }          //if 'Succeed' or 'Fail' State did not configured, we must end the state machine         if (instruction == null && !stateInstruction.isEnd()) {             EngineUtils.endStateMachine(context);         }     }      return instruction; }

这里的代理只实现了一个后置增强,做的事情就是结束状态机。

下面我们来看一下StateRouter,UML类图如下:

Saga模式源码方法教程

从UML类图我们看到,除了EndStateRouter,只有一个TaskStateRouter了。而EndStateRouter并没有做什么事情,因为关闭状态机的逻辑已经由代理做了。这里我们看一下TaskStateRouter,代码如下:

public Instruction route(ProcessContext context, State state) throws EngineExecutionException {      StateInstruction stateInstruction = context.getInstruction(StateInstruction.class);     if (stateInstruction.isEnd()) {       //如果已经结束,直接返回         //省略代码     }      //The current CompensationTriggerState can mark the compensation process is started and perform compensation     // route processing.     State compensationTriggerState = (State)context.getVariable(         DomainConstants.VAR_NAME_CURRENT_COMPEN_TRIGGER_STATE);     if (compensationTriggerState != null) {       //加入补偿集合进行补偿并返回         return compensateRoute(context, compensationTriggerState);     }      //There is an exception route, indicating that an exception is thrown, and the exception route is prioritized.     String next = (String)context.getVariable(DomainConstants.VAR_NAME_CURRENT_EXCEPTION_ROUTE);      if (StringUtils.hasLength(next)) {         context.removeVariable(DomainConstants.VAR_NAME_CURRENT_EXCEPTION_ROUTE);     } else {         next = state.getNext();     }      //If next is empty, the state selected by the Choice state was taken.     if (!StringUtils.hasLength(next) && context.hasVariable(DomainConstants.VAR_NAME_CURRENT_CHOICE)) {         next = (String)context.getVariable(DomainConstants.VAR_NAME_CURRENT_CHOICE);         context.removeVariable(DomainConstants.VAR_NAME_CURRENT_CHOICE);     }     //从当前context中取不出下一个节点了,直接返回     if (!StringUtils.hasLength(next)) {         return null;     }      StateMachine stateMachine = state.getStateMachine();      State nextState = stateMachine.getState(next);     if (nextState == null) {         throw new EngineExecutionException("Next state[" + next + "] is not exits",             FrameworkErrorCode.ObjectNotExists);     }     //获取到下一个要流转的状态并且赋值给stateInstruction     stateInstruction.setStateName(next);      return stateInstruction; }

可以看到,route的作用是帮状态机确定下一个流程节点,然后放入到当前的context中的stateInstruction。

到这里,我们就分析完成了状态机的原理,ProcessControllerImpl类中。

需要注意的是,这里获取到下一个节点后,并没有直接处理,而是使用观察者模式,先发送到EventBus,等待观察者来处理,循环往复,直到EndStateRouter结束状态机。

这里观察者模式的Event是ProcessContext,里面包含了Instruction,而Instruction里面包含了State,这个State里面就决定了下一个处理的节点直到结束。UML类图如下:

Saga模式源码方法教程

总结

seata中间件中的saga模式使用比较广泛,但是代码还是比较复杂的。我从下面几个方面进行了梳理:

  • 我们定义的json文件加载到了类StateMachineImpl中。

  • 启动状态机,我们也就启动了全局事务,这个普通模式启动全局事务是一样的,都会向TC发送消息。

  • 处理状态机状态和控制状态流转的入口类在ProcessControllerImpl,从process方法可以跟代码。

  • ProcessControllerImpl调用CustomizeBusinessProcessor的process处理当前状态,然后调用route方法获取到下一个节点并发送到EventBus。

saga模式额外引入了3张表,我们也可以根据跟全局事务和分支事务相关的2张表来跟踪代码,我之前给出的demo,如果事务成功,这2张表的写sql按照状态机执行顺序给出一个成功sql,代码如下:

INSERT INTO seata_state_machine_inst (id, machine_id, tenant_id, parent_id, gmt_started, business_key, start_params, is_running, status, gmt_updated) VALUES ('192.168.59.146:8091:65853497147990016', '06a098cab53241ca7ed09433342e9f07', '000001', null, '2020-10-31 17:18:24.773', '1604135904773', '{"@type":"java.util.HashMap","money":50.,"productId":1L,"_business_key_":"1604135904773","businessKey":"1604135904773",\"count\":1,\"mockreduceaccountfail\":\"true\","userId":1L,"order":{"@type":"io.seata.sample.entity.Order","count":1,"payAmount":50,"productId":1,"userId":1}}', 1, 'RU', '2020-10-31 17:18:24.773')  INSERT INTO seata_state_inst (id, machine_inst_id, name, type, gmt_started, service_name, service_method, service_type, is_for_update, input_params, status, business_key, state_id_compensated_for, state_id_retried_for) VALUES ('4fe5f602452c84ba5e88fd2ee9c13b35', '192.168.59.146:8091:65853497147990016', 'SaveOrder', 'ServiceTask', '2020-10-31 17:18:40.84', 'orderSave', 'saveOrder', null, 1, '["1604135904773",{"@type":"io.seata.sample.entity.Order","count":1,"payAmount":50,"productId":1,"userId":1}]', 'RU', null, null, null)  UPDATE seata_state_inst SET gmt_end = '2020-10-31 17:18:49.919', excep = null, status = 'SU', output_params = 'true' WHERE id = '4fe5f602452c84ba5e88fd2ee9c13b35' AND machine_inst_id = '192.168.59.146:8091:65853497147990016'  INSERT INTO seata_state_inst (id, machine_inst_id, name, type, gmt_started, service_name, service_method, service_type, is_for_update, input_params, status, business_key, state_id_compensated_for, state_id_retried_for) VALUES ('8371235cb2c66c8626e148f66123d3b4', '192.168.59.146:8091:65853497147990016', 'ReduceAccount', 'ServiceTask', '2020-10-31 17:19:00.441', 'accountService', 'decrease', null, 1, '["1604135904773",1L,50.,{"@type":"java.util.LinkedHashMap","throwException":"true"}]', 'RU', null, null, null)  UPDATE seata_state_inst SET gmt_end = '2020-10-31 17:19:09.593', excep = null, status = 'SU', output_params = 'true' WHERE id = '8371235cb2c66c8626e148f66123d3b4' AND machine_inst_id = '192.168.59.146:8091:65853497147990016'  INSERT INTO seata_state_inst (id, machine_inst_id, name, type, gmt_started, service_name, service_method, service_type, is_for_update, input_params, status, business_key, state_id_compensated_for, state_id_retried_for) VALUES ('e70a49f1eac72f929085f4e82c2b4de2', '192.168.59.146:8091:65853497147990016', 'ReduceStorage', 'ServiceTask', '2020-10-31 17:19:18.494', 'storageService', 'decrease', null, 1, '["1604135904773",1L,1,{"@type":"java.util.LinkedHashMap"}]', 'RU', null, null, null)  UPDATE seata_state_inst SET gmt_end = '2020-10-31 17:19:26.613', excep = null, status = 'SU', output_params = 'true' WHERE id = 'e70a49f1eac72f929085f4e82c2b4de2' AND machine_inst_id = '192.168.59.146:8091:65853497147990016'  UPDATE seata_state_machine_inst SET gmt_end = '2020-10-31 17:19:33.581', excep = null, end_params = '{"@type":"java.util.HashMap","productId":1L,"count":1,"ReduceAccountResult":true,"mockReduceAccountFail":"true","userId":1L,"money":50.,"SaveOrderResult":true,"_business_key_":"1604135904773","businessKey":"1604135904773","ReduceStorageResult":true,"order":{"@type":"io.seata.sample.entity.Order","count":1,"id":60,"payAmount":50,"productId":1,"userId":1}}',status = 'SU', compensation_status = null, is_running = 0, gmt_updated = '2020-10-31 17:19:33.582' WHERE id = '192.168.59.146:8091:65853497147990016' and gmt_updated = '2020-10-31 17:18:24.773'

到此,相信大家对“Saga模式源码方法教程”有了更深的了解,不妨来实际操作一番吧!这里是编程网网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

Saga模式源码方法教程

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

Ubuntu 16.04 LTS中源码安装Python 3.6.0的方法教程

前提 官网上提供了 Mac 和 Windows 上的安装包和 Linux 上安装需要的源码。 下载地址如下: https://www.python.org/downloads/release/python-360/ 安装wget https
2022-06-04

Win8.1免打扰模式开启方法图文教程

在系统世界里难免会出现碰瓷,当你专心致志工作或者安装完软件,总会出现一个讨厌的家伙,那家伙叫做“窗口”,总是提醒着你是否要重启电脑。正在兴头上的工作怎可被这个家伙碰瓷了呢?我们可以利用Win8.1的免打扰模式来祛除这
2022-06-04

Win11电源高性能模式怎么开?Win11电源高性能模式设置方法

python 相信很多小伙伴都知道如果电脑以最高性能状态运行,就会大大提高计算机处理文件的速度,不过很多朋友在更新升级win11系统后,找不到这个选项在哪里了,其实还是很好找到的,下面就和小编一起来看看电源高性能的位置吧。Win11电源高性
2023-05-19

Android源码学习之工厂方法模式应用及优势介绍

工厂方法模式定义: Define an interface for creating an object, but let subclasses decide which class to instantiate. Factory Meth
2022-06-06

简介Python设计模式中的代理模式与模板方法模式编程

代理模式 Proxy模式是一种常用的设计模式,它主要用来通过一个对象(比如B)给一个对象(比如A) 提供'代理'的方式方式访问。比如一个对象不方便直接引用,代理就在这个对象和访问者之间做了中介 python的例子 你先设想:一个对象提供rg
2022-06-04

Win7设置电源模式为高性能计划方法

一般来说,咱们的wiYbiaRcknkn7系统电脑中电源计划都有平衡、节能、高性能,可能很多朋友都不知道,这三个性能并非只是摆设编程客栈,而是有一定的存在必要的,因为对于用户来说,完全可以根据自己的用途来选择对应的电源计划,这样才能让咱们的
2023-06-13

云服务器怎么调用摄像头模式设置方法教程

首先,需要下载摄像头模式设置的APK文件,可以在官网上找到。然后,打开摄像头模式设置的程序。在程序的开发者界面中,你可以通过选择“开发者模式”、“网络请求”、“本地存储”等来调整摄像头模式。在“开发者模式”下,你需要选择“启用云备份”来备份自己的云存储,然后选择“启用摄像头模式”来启用云备份功能。在“网络请求”中,你需要根据需求选择合适的网
2023-10-26

单独编译Android 源代码中的模块实现方法

第一次下载好Android源代码工程后,我们通常是在Android源代码工程目录下执行make命令,经过漫长的等待之后,就可以得到Android系统镜像system.img了。以后如果我们修改了Android源代码中的某个模块或
2022-06-06

【Java多数据源实现教程】实现动态数据源、多数据源切换方式

前言 本文为 【Java多数据源实现教程】 相关知识,由于自己最近在做导师的项目的时候需要使用这种技术,于是自学了相关技术原理与实现,并将其整理如下,具体包含:多数据源的典型使用场景(包含业务复杂场景、读写分离场景),多数据源实现原理及实
2023-08-16

小程序优惠券源码开发的方法

这篇“小程序优惠券源码开发的方法”文章的知识点大部分人都不太理解,所以小编给大家总结了以下内容,内容详细,步骤清晰,具有一定的借鉴价值,希望大家阅读完这篇文章能有所收获,下面我们一起来看看这篇“小程序优惠券源码开发的方法”文章吧。    如
2023-06-26

编程热搜

  • Python 学习之路 - Python
    一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
    Python 学习之路 - Python
  • chatgpt的中文全称是什么
    chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
    chatgpt的中文全称是什么
  • C/C++中extern函数使用详解
  • C/C++可变参数的使用
    可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
    C/C++可变参数的使用
  • css样式文件该放在哪里
  • php中数组下标必须是连续的吗
  • Python 3 教程
    Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
    Python 3 教程
  • Python pip包管理
    一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
    Python pip包管理
  • ubuntu如何重新编译内核
  • 改善Java代码之慎用java动态编译

目录