[SpringCloud-Seata Source Code Analysis 3]

2024/10/5 18:27:47  Source: https://blog.csdn.net/weixin_47068446/article/details/139905792

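Contents

  • Transaction commit
    • Client-side commit flow
    • Server-side commit flow
    • Client-side undo_log deletion
  • Transaction rollback
    • Client-side rollback
    • Server-side rollback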

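Transaction commit

The previous two articles covered the initialization of Seata's TC, TM, and RM, along with the prepare phase and the execution of the business SQL. This article walks through the source code for committing a transaction.

Client-side commit flow

The client-side flow starts in TransactionalTemplate#execute: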

    try {
        // Prepare phase: begin the global transaction
        this.beginTransaction(txInfo, tx);
        Object rs;
        Object ex;
        try {
            // Execute the business SQL
            rs = business.execute();
        } catch (Throwable var17) {
            ex = var17;
            this.completeTransactionAfterThrowing(txInfo, tx, var17);
            throw var17;
        }
        // Commit the global transaction
        this.commitTransaction(tx);
        ex = rs;
        return ex;
    } finally {
        this.resumeGlobalLockConfig(previousConfig);
        this.triggerAfterCompletion();
        this.cleanUp();
    }

    private void commitTransaction(GlobalTransaction tx) throws ExecutionException {
        try {
            // Run the before-commit hooks
            this.triggerBeforeCommit();
            // Commit
            tx.commit();
            // Run the after-commit hooks
            this.triggerAfterCommit();
        } catch (TransactionException var3) {
            throw new ExecutionException(tx, var3, Code.CommitFailure);
        }
    }

    public void commit() throws TransactionException {
        // Only the Launcher commits; a Participant just logs and returns
        if (this.role == GlobalTransactionRole.Participant) {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Ignore Commit(): just involved in global transaction [{}]", this.xid);
            }
        } else {
            // The xid must not be null
            this.assertXIDNotNull();
            // Retry count: 5 by default, configurable
            int retry = COMMIT_RETRY_COUNT <= 0 ? 5 : COMMIT_RETRY_COUNT;
            try {
                while (retry > 0) {
                    try {
                        --retry;
                        // Ask the TC to commit and record the returned status
                        this.status = this.transactionManager.commit(this.xid);
                        break;
                    } catch (Throwable var6) {
                        LOGGER.error("Failed to report global commit [{}],Retry Countdown: {}, reason: {}", new Object[]{this.getXid(), retry, var6.getMessage()});
                        if (retry == 0) {
                            throw new TransactionException("Failed to report global commit", var6);
                        }
                    }
                }
            } finally {
                if (this.xid.equals(RootContext.getXID())) {
                    this.suspend();
                }
            }
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("[{}] commit status: {}", this.xid, this.status);
            }
        }
    }

    public GlobalStatus commit(String xid) throws TransactionException {
        // Build the request
        GlobalCommitRequest globalCommit = new GlobalCommitRequest();
        globalCommit.setXid(xid);
        // Send it to the Seata server over Netty
        GlobalCommitResponse response = (GlobalCommitResponse)this.syncCall(globalCommit);
        return response.getGlobalStatus();
    }

    private AbstractTransactionResponse syncCall(AbstractTransactionRequest request) throws TransactionException {
        try {
            return (AbstractTransactionResponse)TmNettyRemotingClient.getInstance().sendSyncRequest(request);
        } catch (TimeoutException var3) {
            throw new TmTransactionException(TransactionExceptionCode.IO, "RPC timeout", var3);
        }
    }
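The retry loop in commit() is easier to see in isolation. The following is only a minimal sketch of the same idea, not Seata code: the class CommitRetrySketch and the method callWithRetry are hypothetical names, and a plain Supplier stands in for the call to the TC.

```java
import java.util.function.Supplier;

// Hypothetical helper, not part of Seata: a bounded retry around one remote call.
final class CommitRetrySketch {

    /** Calls report up to maxAttempts times and returns the first successful result. */
    static <T> T callWithRetry(Supplier<T> report, int maxAttempts) {
        // Same fallback as in commit(): a non-positive setting means 5 attempts.
        int retry = maxAttempts <= 0 ? 5 : maxAttempts;
        while (true) {
            try {
                --retry;
                return report.get();
            } catch (RuntimeException e) {
                // Give up only once the countdown reaches zero.
                if (retry == 0) {
                    throw new IllegalStateException("Failed to report global commit", e);
                }
            }
        }
    }
}
```

With maxAttempts set to 5, a call that fails twice and then succeeds returns normally on the third attempt; a call that always fails surfaces the last error after the fifth attempt.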

Server-side commit flow

The Seata server commits the transaction asynchronously. We start with the server-side entry point, DefaultCoordinator#doGlobalCommit:

    protected void doGlobalCommit(GlobalCommitRequest request, GlobalCommitResponse response, RpcContext rpcContext)
        throws TransactionException {
        // Put the xid into the MDC so it shows up in the log output
        MDC.put(RootContext.MDC_KEY_XID, request.getXid());
        // Core call
        response.setGlobalStatus(core.commit(request.getXid()));
    }

    public GlobalStatus commit(String xid) throws TransactionException {
        // key1: look up the global session; if it has already been cleaned up, report Finished
        GlobalSession globalSession = SessionHolder.findGlobalSession(xid);
        if (globalSession == null) {
            return GlobalStatus.Finished;
        }
        // Register the lifecycle listener
        globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
        // just lock changeStatus
        boolean shouldCommit = SessionHolder.lockAndExecute(globalSession, () -> {
            if (globalSession.getStatus() == GlobalStatus.Begin) {
                // Highlight: Firstly, close the session, then no more branch can be registered.
                // Close the session and release the global locks
                globalSession.closeAndClean();
                // key2: asynchronous commit
                // AT-mode sessions can be committed asynchronously
                if (globalSession.canBeCommittedAsync()) {
                    // Commit asynchronously
                    globalSession.asyncCommit();
                    MetricsPublisher.postSessionDoneEvent(globalSession, GlobalStatus.Committed, false, false);
                    return false;
                } else {
                    // Change the global status to Committing
                    globalSession.changeGlobalStatus(GlobalStatus.Committing);
                    return true;
                }
            }
            return false;
        });
        // If shouldCommit is true, commit synchronously
        if (shouldCommit) {
            boolean success = doGlobalCommit(globalSession, false);
            //If successful and all remaining branches can be committed asynchronously, do async commit.
            if (success && globalSession.hasBranch() && globalSession.canBeCommittedAsync()) {
                globalSession.asyncCommit();
                return GlobalStatus.Committed;
            } else {
                return globalSession.getStatus();
            }
        } else {
            return globalSession.getStatus() == GlobalStatus.AsyncCommitting ? GlobalStatus.Committed : globalSession.getStatus();
        }
    }

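In AT mode the global transaction status is changed to AsyncCommitting; a scheduled thread pool then picks the session up, polling once per second by default.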

    public void asyncCommit() throws TransactionException {
        this.addSessionLifecycleListener(SessionHolder.getAsyncCommittingSessionManager());
        // Mark the session as AsyncCommitting
        this.setStatus(GlobalStatus.AsyncCommitting);
        // Hand the session over to the async-committing session manager
        SessionHolder.getAsyncCommittingSessionManager().addGlobalSession(this);
    }

The asynchronous scheduled tasks are started in DefaultCoordinator#init:

    public void init() {
        retryRollbacking.scheduleAtFixedRate(
            () -> SessionHolder.distributedLockAndExecute(RETRY_ROLLBACKING, this::handleRetryRollbacking), 0,
            ROLLBACKING_RETRY_PERIOD, TimeUnit.MILLISECONDS);
        retryCommitting.scheduleAtFixedRate(
            () -> SessionHolder.distributedLockAndExecute(RETRY_COMMITTING, this::handleRetryCommitting), 0,
            COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);
        // Asynchronous commits are all executed by this task
        asyncCommitting.scheduleAtFixedRate(
            () -> SessionHolder.distributedLockAndExecute(ASYNC_COMMITTING, this::handleAsyncCommitting), 0,
            ASYNC_COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);
        timeoutCheck.scheduleAtFixedRate(
            () -> SessionHolder.distributedLockAndExecute(TX_TIMEOUT_CHECK, this::timeoutCheck), 0,
            TIMEOUT_RETRY_PERIOD, TimeUnit.MILLISECONDS);
        undoLogDelete.scheduleAtFixedRate(
            () -> SessionHolder.distributedLockAndExecute(UNDOLOG_DELETE, this::undoLogDelete),
            UNDO_LOG_DELAY_DELETE_PERIOD, UNDO_LOG_DELETE_PERIOD, TimeUnit.MILLISECONDS);
    }

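Before the task handles the sessions in the AsyncCommitting state, it first acquires a distributed lock for the task key; the lock row is read with an exclusive lock: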

    public static boolean distributedLockAndExecute(String key, NoArgsFunc func) {
        boolean lock = false;
        try {
            // Core step: try to acquire the lock
            if (lock = acquireDistributedLock(key)) {
                func.call();
            }
        } catch (Exception e) {
            LOGGER.info("Exception running function with key = {}", key, e);
        } finally {
            if (lock) {
                try {
                    SessionHolder.releaseDistributedLock(key);
                } catch (Exception ex) {
                    LOGGER.warn("release distribute lock failure, message = {}", ex.getMessage(), ex);
                }
            }
        }
        return lock;
    }

    public boolean acquireLock(DistributedLockDO distributedLockDO) {
        if (demotion) {
            return true;
        }
        Connection connection = null;
        boolean originalAutoCommit = false;
        try {
            connection = distributedLockDataSource.getConnection();
            originalAutoCommit = connection.getAutoCommit();
            connection.setAutoCommit(false);
            // Core step: read the lock row for this key; the query takes an exclusive lock
            DistributedLockDO distributedLockDOFromDB = getDistributedLockDO(connection, distributedLockDO.getLockKey());
            // No row was found
            if (null == distributedLockDOFromDB) {
                // Insert SQL: "INSERT INTO " + DISTRIBUTED_LOCK_TABLE_PLACE_HOLD + "(" + ALL_COLUMNS + ") VALUES (?, ?, ?)";
                boolean ret = insertDistribute(connection, distributedLockDO);
                // Commit
                connection.commit();
                return ret;
            }
            // The expire time is not before the current system time: the lock is still held
            if (distributedLockDOFromDB.getExpireTime() >= System.currentTimeMillis()) {
                LOGGER.debug("the distribute lock for key :{} is holding by :{}, acquire lock failure.",
                    distributedLockDO.getLockKey(), distributedLockDOFromDB.getLockValue());
                // Commit
                connection.commit();
                return false;
            }
            // Update SQL: "UPDATE " + "distributed_lock_table" + " SET "
            //     + ServerTableColumnsName.DISTRIBUTED_LOCK_VALUE + "=?, " + ServerTableColumnsName.DISTRIBUTED_LOCK_EXPIRE + "=?"
            //     + " WHERE " + ServerTableColumnsName.DISTRIBUTED_LOCK_KEY + "=?";
            boolean ret = updateDistributedLock(connection, distributedLockDO);
            // Commit
            connection.commit();
            return ret;
        } catch (SQLException ex) {
            LOGGER.error("execute acquire lock failure, key is: {}", distributedLockDO.getLockKey(), ex);
            try {
                if (connection != null) {
                    // Roll back
                    connection.rollback();
                }
            } catch (SQLException e) {
                LOGGER.warn("rollback fail because of {}", e.getMessage(), e);
            }
            return false;
        } finally {
            try {
                if (originalAutoCommit) {
                    connection.setAutoCommit(true);
                }
                IOUtil.close(connection);
            } catch (SQLException ignore) { }
        }
    }
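The three branches of acquireLock (no row, row still valid, row expired) amount to a lease-style lock. Below is a rough in-memory sketch of that decision logic only, under the assumption that a lock row is just an owner plus an expiry timestamp; LeaseLockSketch, acquire, and release are hypothetical names, and a HashMap stands in for the database table.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for the distributed lock table, not Seata code.
final class LeaseLockSketch {

    private static final class Lease {
        final String owner;
        final long expireAt;

        Lease(String owner, long expireAt) {
            this.owner = owner;
            this.expireAt = expireAt;
        }
    }

    private final Map<String, Lease> rows = new HashMap<>();

    /** Returns true if owner now holds the lock for key. */
    synchronized boolean acquire(String key, String owner, long nowMillis, long leaseMillis) {
        Lease current = rows.get(key);
        // Row exists and has not expired: somebody still holds the lock.
        if (current != null && current.expireAt >= nowMillis) {
            return false;
        }
        // No row (insert case) or expired row (update case): take the lock.
        rows.put(key, new Lease(owner, nowMillis + leaseMillis));
        return true;
    }

    /** Releases the lock only if owner is the current holder. */
    synchronized boolean release(String key, String owner) {
        Lease current = rows.get(key);
        if (current == null || !current.owner.equals(owner)) {
            return false;
        }
        rows.remove(key);
        return true;
    }
}
```

The expiry matters because a server that crashes while holding the lock never releases it; once the lease runs out, another node can take over the scheduled task.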

Once the distributed lock has been acquired, handleAsyncCommitting runs:

    protected void handleAsyncCommitting() {
        SessionCondition sessionCondition = new SessionCondition(GlobalStatus.AsyncCommitting);
        // key1: fetch every GlobalSession whose status is AsyncCommitting
        Collection<GlobalSession> asyncCommittingSessions =
            SessionHolder.getAsyncCommittingSessionManager().findGlobalSessions(sessionCondition);
        if (CollectionUtils.isEmpty(asyncCommittingSessions)) {
            return;
        }
        // key2: iterate over them
        SessionHelper.forEach(asyncCommittingSessions, asyncCommittingSession -> {
            try {
                asyncCommittingSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
                // Process the session
                core.doGlobalCommit(asyncCommittingSession, true);
            } catch (TransactionException ex) {
                LOGGER.error("Failed to async committing [{}] {} {}", asyncCommittingSession.getXid(), ex.getCode(), ex.getMessage(), ex);
            }
        });
    }
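Taken together, commit(), asyncCommit(), and handleAsyncCommitting() form a "reply now, finish later" pattern: the caller is told Committed as soon as the session is marked AsyncCommitting, and a later polling pass does the real cleanup. A toy model of that pattern, with the hypothetical names AsyncCommitSketch and pollOnce and a plain map in place of the session managers, might look like this:

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical toy model of the async-commit handoff, not Seata code.
final class AsyncCommitSketch {

    enum Status { BEGIN, ASYNC_COMMITTING, COMMITTED, FINISHED }

    private final Map<String, Status> sessions = new LinkedHashMap<>();

    void begin(String xid) {
        sessions.put(xid, Status.BEGIN);
    }

    Status statusOf(String xid) {
        return sessions.get(xid);
    }

    /** Phase-two commit request: mark the session and answer immediately. */
    Status commit(String xid) {
        Status current = sessions.get(xid);
        if (current == null) {
            return Status.FINISHED; // session already cleaned up
        }
        if (current == Status.BEGIN) {
            current = Status.ASYNC_COMMITTING;
            sessions.put(xid, current);
        }
        // The caller sees COMMITTED even though the cleanup has not run yet.
        return current == Status.ASYNC_COMMITTING ? Status.COMMITTED : current;
    }

    /** One pass of the scheduled task: finish every ASYNC_COMMITTING session. */
    int pollOnce() {
        int finished = 0;
        Iterator<Map.Entry<String, Status>> it = sessions.entrySet().iterator();
        while (it.hasNext()) {
            if (it.next().getValue() == Status.ASYNC_COMMITTING) {
                it.remove();
                finished++;
            }
        }
        return finished;
    }
}
```

Answering early is safe in AT mode because the branch data was already committed locally in phase one; what remains is cleanup.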

The core commit logic:

    @Override
    public boolean doGlobalCommit(GlobalSession globalSession, boolean retrying) throws TransactionException {
        boolean success = true;
        // start committing event
        MetricsPublisher.postSessionDoingEvent(globalSession, retrying);
        if (globalSession.isSaga()) {
            success = getCore(BranchType.SAGA).doGlobalCommit(globalSession, retrying);
        } else {
            // key1: iterate over the branch transactions
            Boolean result = SessionHelper.forEach(globalSession.getSortedBranches(), branchSession -> {
                // if not retrying, skip the canBeCommittedAsync branches
                if (!retrying && branchSession.canBeCommittedAsync()) {
                    return CONTINUE;
                }
                BranchStatus currentStatus = branchSession.getStatus();
                // key2: a branch in PhaseOne_Failed is removed and its global lock released
                if (currentStatus == BranchStatus.PhaseOne_Failed) {
                    SessionHelper.removeBranch(globalSession, branchSession, !retrying);
                    return CONTINUE;
                }
                try {
                    // key3: send the branch-commit RPC; the client then deletes its undo log
                    BranchStatus branchStatus = getCore(branchSession.getBranchType()).branchCommit(globalSession, branchSession);
                    if (isXaerNotaTimeout(globalSession,branchStatus)) {
                        LOGGER.info("Commit branch XAER_NOTA retry timeout, xid = {} branchId = {}", globalSession.getXid(), branchSession.getBranchId());
                        branchStatus = BranchStatus.PhaseTwo_Committed;
                    }
                    switch (branchStatus) {
                        case PhaseTwo_Committed:
                            // key4: remove the branch session and release the global lock
                            SessionHelper.removeBranch(globalSession, branchSession, !retrying);
                            return CONTINUE;
                        case PhaseTwo_CommitFailed_Unretryable:
                            // The remote delete failed; unless this is an async commit, set the status to CommitFailed
                            SessionHelper.endCommitFailed(globalSession, retrying);
                            LOGGER.error("Committing global transaction[{}] finally failed, caused by branch transaction[{}] commit failed.", globalSession.getXid(), branchSession.getBranchId());
                            return false;
                        default:
                            if (!retrying) {
                                globalSession.queueToRetryCommit();
                                return false;
                            }
                            if (globalSession.canBeCommittedAsync()) {
                                LOGGER.error("Committing branch transaction[{}], status:{} and will retry later",
                                    branchSession.getBranchId(), branchStatus);
                                return CONTINUE;
                            } else {
                                LOGGER.error("Committing global transaction[{}] failed, caused by branch transaction[{}] commit failed, will retry later.", globalSession.getXid(), branchSession.getBranchId());
                                return false;
                            }
                    }
                } catch (Exception ex) {
                    StackTraceLogger.error(LOGGER, ex, "Committing branch transaction exception: {}",
                        new String[] {branchSession.toString()});
                    if (!retrying) {
                        globalSession.queueToRetryCommit();
                        throw new TransactionException(ex);
                    }
                }
                return CONTINUE;
            });
            // Return if the result is not null
            if (result != null) {
                return result;
            }
            //If has branch and not all remaining branches can be committed asynchronously,
            //do print log and return false
            if (globalSession.hasBranch() && !globalSession.canBeCommittedAsync()) {
                LOGGER.info("Committing global transaction is NOT done, xid = {}.", globalSession.getXid());
                return false;
            }
            if (!retrying) {
                //contains not AT branch
                globalSession.setStatus(GlobalStatus.Committed);
            }
        }
        // if it succeeds and there is no branch, retrying=true is the asynchronous state when retrying. EndCommitted is
        // executed to improve concurrency performance, and the global transaction ends..
        if (success && globalSession.getBranchSessions().isEmpty()) {
            // key5: remove the global session
            SessionHelper.endCommitted(globalSession, retrying);
            LOGGER.info("Committing global transaction is successfully done, xid = {}.", globalSession.getXid());
        }
        return success;
    }
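The branch loop above relies on a three-valued handler result: returning CONTINUE moves on to the next branch, while returning true or false stops the loop and becomes its result. The sketch below illustrates that control flow only. It assumes CONTINUE is simply a null Boolean, which is consistent with the "Return if the result is not null" check above; BranchLoopSketch is a hypothetical class, not Seata's SessionHelper.

```java
import java.util.List;
import java.util.function.Function;

// Hypothetical illustration of a "null means keep going" loop, not Seata code.
final class BranchLoopSketch {

    // Assumption: CONTINUE is represented by a null Boolean.
    static final Boolean CONTINUE = null;

    /**
     * Applies handler to each item in order. The first non-null result stops
     * the loop and is returned; null is returned if every item said CONTINUE.
     */
    static <T> Boolean forEach(List<T> items, Function<T, Boolean> handler) {
        for (T item : items) {
            Boolean result = handler.apply(item);
            if (result != null) {
                return result;
            }
        }
        return null;
    }
}
```

This is why doGlobalCommit can bail out with false on the first branch that cannot be committed, yet fall through to the session-level checks when every branch was handled.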

Client-side undo_log deletion

On the client, we step into DataSourceManager#branchCommit:

    // This is the key piece: the AsyncWorker constructor starts a scheduled task
    private final AsyncWorker asyncWorker = new AsyncWorker(this);

    public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId, String applicationData) throws TransactionException {
        return this.asyncWorker.branchCommit(xid, branchId, resourceId);
    }

    public BranchStatus branchCommit(String xid, long branchId, String resourceId) {
        AsyncWorker.Phase2Context context = new AsyncWorker.Phase2Context(xid, branchId, resourceId);
        // Put the branch on the commit queue
        this.addToCommitQueue(context);
        return BranchStatus.PhaseTwo_Committed;
    }

The AsyncWorker constructor:

    public AsyncWorker(DataSourceManager dataSourceManager) {
        this.dataSourceManager = dataSourceManager;
        LOGGER.info("Async Commit Buffer Limit: {}", ASYNC_COMMIT_BUFFER_LIMIT);
        // Create the queue
        this.commitQueue = new LinkedBlockingQueue(ASYNC_COMMIT_BUFFER_LIMIT);
        ThreadFactory threadFactory = new NamedThreadFactory("AsyncWorker", 2, true);
        this.scheduledExecutor = new ScheduledThreadPoolExecutor(2, threadFactory);
        // Run doBranchCommitSafely once per second, after an initial 10 ms delay
        this.scheduledExecutor.scheduleAtFixedRate(this::doBranchCommitSafely, 10L, 1000L, TimeUnit.MILLISECONDS);
    }

    private void doBranchCommit() {
        // Nothing to do if the queue is empty
        if (!this.commitQueue.isEmpty()) {
            List<AsyncWorker.Phase2Context> allContexts = new LinkedList();
            // Move everything currently queued into a new list
            this.commitQueue.drainTo(allContexts);
            // Validate the contexts and group them into a map keyed by resourceId
            Map<String, List<AsyncWorker.Phase2Context>> groupedContexts = this.groupedByResourceId(allContexts);
            groupedContexts.forEach(this::dealWithGroupedContexts);
        }
    }

    private void dealWithGroupedContexts(String resourceId, List<AsyncWorker.Phase2Context> contexts) {
        // Skip entries with a blank resourceId
        if (StringUtils.isBlank(resourceId)) {
            LOGGER.warn("resourceId is empty and will skip.");
        } else {
            DataSourceProxy dataSourceProxy = this.dataSourceManager.get(resourceId);
            if (dataSourceProxy == null) {
                LOGGER.warn("failed to find resource for {} and requeue", resourceId);
                // Put the contexts back on the commit queue
                this.addAllToCommitQueue(contexts);
            } else {
                Connection conn = null;
                try {
                    conn = dataSourceProxy.getPlainConnection();
                    // Get the undo log manager for this database type
                    UndoLogManager undoLogManager = UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType());
                    List<List<AsyncWorker.Phase2Context>> splitByLimit = Lists.partition(contexts, 1000);
                    Iterator var7 = splitByLimit.iterator();
                    // Process each batch
                    while (var7.hasNext()) {
                        List<AsyncWorker.Phase2Context> partition = (List)var7.next();
                        // Delete the undo log records
                        this.deleteUndoLog(conn, undoLogManager, partition);
                    }
                } catch (SQLException var12) {
                    this.addAllToCommitQueue(contexts);
                    LOGGER.error("failed to get connection for async committing on {} and requeue", resourceId, var12);
                } finally {
                    IOUtil.close(conn);
                }
            }
        }
    }

    private void deleteUndoLog(final Connection conn, UndoLogManager undoLogManager, List<AsyncWorker.Phase2Context> contexts) {
        Set<String> xids = new LinkedHashSet(contexts.size());
        Set<Long> branchIds = new LinkedHashSet(contexts.size());
        contexts.forEach((context) -> {
            xids.add(context.xid);
            branchIds.add(context.branchId);
        });
        try {
            // Batch delete
            undoLogManager.batchDeleteUndoLog(xids, branchIds, conn);
            if (!conn.getAutoCommit()) {
                conn.commit();
            }
        } catch (SQLException var9) {
            LOGGER.error("Failed to batch delete undo log", var9);
            try {
                conn.rollback();
                this.addAllToCommitQueue(contexts);
            } catch (SQLException var8) {
                LOGGER.error("Failed to rollback JDBC resource after deleting undo log failed", var8);
            }
        }
    }
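The data flow inside AsyncWorker is: drain the queue, group the drained contexts by resourceId, then split each group into batches of at most 1000 before deleting. A self-contained sketch of just that flow follows; UndoLogCleanupSketch, Ctx, and drainIntoBatches are hypothetical names, and the actual delete is left out.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch of the drain / group / partition steps, not Seata code.
final class UndoLogCleanupSketch {

    static final class Ctx {
        final String xid;
        final long branchId;
        final String resourceId;

        Ctx(String xid, long branchId, String resourceId) {
            this.xid = xid;
            this.branchId = branchId;
            this.resourceId = resourceId;
        }
    }

    private final BlockingQueue<Ctx> queue = new LinkedBlockingQueue<>();

    boolean enqueue(Ctx ctx) {
        return queue.offer(ctx);
    }

    /** Drains the queue and returns, per resource id, batches of at most batchSize contexts. */
    Map<String, List<List<Ctx>>> drainIntoBatches(int batchSize) {
        List<Ctx> all = new ArrayList<>();
        queue.drainTo(all);

        // Group by resource id, keeping arrival order.
        Map<String, List<Ctx>> grouped = new LinkedHashMap<>();
        for (Ctx ctx : all) {
            grouped.computeIfAbsent(ctx.resourceId, k -> new ArrayList<>()).add(ctx);
        }

        // Split every group into fixed-size batches.
        Map<String, List<List<Ctx>>> result = new LinkedHashMap<>();
        for (Map.Entry<String, List<Ctx>> entry : grouped.entrySet()) {
            List<Ctx> list = entry.getValue();
            List<List<Ctx>> batches = new ArrayList<>();
            for (int i = 0; i < list.size(); i += batchSize) {
                batches.add(new ArrayList<>(list.subList(i, Math.min(i + batchSize, list.size()))));
            }
            result.put(entry.getKey(), batches);
        }
        return result;
    }
}
```

Grouping first means one connection per data source; batching keeps each delete statement bounded in size.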

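Transaction rollback

Client-side rollback

On the client, the rollback entry point is found among the implementations of AbstractTransactionRequest: the class to look for is BranchRollbackRequest.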

    public abstract class AbstractTransactionRequest extends AbstractMessage {
        public AbstractTransactionRequest() {
        }

        public abstract AbstractTransactionResponse handle(RpcContext rpcContext);
    }

This class is the core of the rollback:

    public class BranchRollbackRequest extends AbstractBranchEndRequest {
        public BranchRollbackRequest() {
        }

        public short getTypeCode() {
            return 5;
        }

        public AbstractTransactionResponse handle(RpcContext rpcContext) {
            return this.handler.handle(this);
        }
    }

    protected void doBranchRollback(BranchRollbackRequest request, BranchRollbackResponse response) throws TransactionException {
        // Read the request parameters
        String xid = request.getXid();
        long branchId = request.getBranchId();
        String resourceId = request.getResourceId();
        String applicationData = request.getApplicationData();
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Branch Rollbacking: " + xid + " " + branchId + " " + resourceId);
        }
        BranchStatus status = this.getResourceManager().branchRollback(request.getBranchType(), xid, branchId, resourceId, applicationData);
        // Fill in the response
        response.setXid(xid);
        response.setBranchId(branchId);
        response.setBranchStatus(status);
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Branch Rollbacked result: " + status);
        }
    }

    public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId, String applicationData) throws TransactionException {
        DataSourceProxy dataSourceProxy = this.get(resourceId);
        if (dataSourceProxy == null) {
            throw new ShouldNeverHappenException(String.format("resource: %s not found", resourceId));
        } else {
            try {
                // Core call: replay the undo log
                UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType()).undo(dataSourceProxy, xid, branchId);
            } catch (TransactionException var9) {
                StackTraceLogger.info(LOGGER, var9, "branchRollback failed. branchType:[{}], xid:[{}], branchId:[{}], resourceId:[{}], applicationData:[{}]. reason:[{}]", new Object[]{branchType, xid, branchId, resourceId, applicationData, var9.getMessage()});
                if (var9.getCode() == TransactionExceptionCode.BranchRollbackFailed_Unretriable) {
                    return BranchStatus.PhaseTwo_RollbackFailed_Unretryable;
                }
                return BranchStatus.PhaseTwo_RollbackFailed_Retryable;
            }
            // Report that the phase-two rollback succeeded
            return BranchStatus.PhaseTwo_Rollbacked;
        }
    }

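The undo call is where the recorded before-images are written back, and the undo records are replayed in reverse order. The following sketch shows why the order matters. It is a deliberately simplified model: UndoReplaySketch and UndoRecord are hypothetical, a Map stands in for a table, and each record only carries the value a key had before one statement ran (null meaning the row did not exist).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical simplification of undo-log replay, not Seata code.
final class UndoReplaySketch {

    static final class UndoRecord {
        final String key;
        final String beforeImage; // null: the row did not exist before the statement

        UndoRecord(String key, String beforeImage) {
            this.key = key;
            this.beforeImage = beforeImage;
        }
    }

    /** Restores table to its state before the logged statements ran. */
    static void undo(Map<String, String> table, List<UndoRecord> log) {
        // Replay newest-first so the oldest before-image is applied last.
        List<UndoRecord> reversed = new ArrayList<>(log);
        Collections.reverse(reversed);
        for (UndoRecord record : reversed) {
            if (record.beforeImage == null) {
                table.remove(record.key);
            } else {
                table.put(record.key, record.beforeImage);
            }
        }
    }
}
```

If the same row is updated twice in one branch, replaying in the original order would leave the row at the intermediate value; replaying in reverse ends on the value from before the first update.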
    public void undo(DataSourceProxy dataSourceProxy, String xid, long branchId) throws TransactionException {
        Connection conn = null;
        ResultSet rs = null;
        PreparedStatement selectPST = null;
        boolean originalAutoCommit = true;
        while (true) {
            try {
                conn = dataSourceProxy.getPlainConnection();
                if (originalAutoCommit = conn.getAutoCommit()) {
                    conn.setAutoCommit(false);
                }
                // Load the undo log of this branch:
                // SELECT_UNDO_LOG_SQL = "SELECT * FROM " + UNDO_LOG_TABLE_NAME + " WHERE " + "branch_id" + " = ? AND " + "xid" + " = ? FOR UPDATE";
                selectPST = conn.prepareStatement(SELECT_UNDO_LOG_SQL);
                selectPST.setLong(1, branchId);
                selectPST.setString(2, xid);
                rs = selectPST.executeQuery();
                boolean exists = false;
                while (rs.next()) {
                    exists = true;
                    int state = rs.getInt("log_status");
                    if (!canUndo(state)) {
                        if (LOGGER.isInfoEnabled()) {
                            LOGGER.info("xid {} branch {}, ignore {} undo_log", new Object[]{xid, branchId, state});
                        }
                        return;
                    }
                    String contextString = rs.getString("context");
                    Map<String, String> context = this.parseContext(contextString);
                    // Decode the undo log; its SQL undo records are then executed to roll back
                    byte[] rollbackInfo = this.getRollbackInfo(rs);
                    String serializer = context == null ? null : (String)context.get("serializer");
                    UndoLogParser parser = serializer == null ? UndoLogParserFactory.getInstance() : UndoLogParserFactory.getInstance(serializer);
                    BranchUndoLog branchUndoLog = parser.decode(rollbackInfo);
                    try {
                        setCurrentSerializer(parser.getName());
                        List<SQLUndoLog> sqlUndoLogs = branchUndoLog.getSqlUndoLogs();
                        if (sqlUndoLogs.size() > 1) {
                            Collections.reverse(sqlUndoLogs);
                        }
                        Iterator var18 = sqlUndoLogs.iterator();
                        while (var18.hasNext()) {
                            SQLUndoLog sqlUndoLog = (SQLUndoLog)var18.next();
                            TableMeta tableMeta = TableMetaCacheFactory.getTableMetaCache(dataSourceProxy.getDbType()).getTableMeta(conn, sqlUndoLog.getTableName(), dataSourceProxy.getResourceId());
                            sqlUndoLog.setTableMeta(tableMeta);
                            AbstractUndoExecutor undoExecutor = UndoExecutorFactory.getUndoExecutor(dataSourceProxy.getDbType(), sqlUndoLog);
                            // Execute the corresponding rollback statement
                            undoExecutor.executeOn(conn);
                        }
                    } finally {
                        removeCurrentSerializer();
                    }
                }
                if (exists) {
                    // Delete the undo log
                    this.deleteUndoLog(xid, branchId, conn);
                    conn.commit();
                    if (LOGGER.isInfoEnabled()) {
                        LOGGER.info("xid {} branch {}, undo_log deleted with {}", new Object[]{xid, branchId, AbstractUndoLogManager.State.GlobalFinished.name()});
                        break;
                    }
                } else {
                    this.insertUndoLogWithGlobalFinished(xid, branchId, UndoLogParserFactory.getInstance(), conn);
                    conn.commit();
                    if (LOGGER.isInfoEnabled()) {
                        LOGGER.info("xid {} branch {}, undo_log added with {}", new Object[]{xid, branchId, AbstractUndoLogManager.State.GlobalFinished.name()});
                        break;
                    }
                }
                return;
            } catch (SQLIntegrityConstraintViolationException var43) {
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("xid {} branch {}, undo_log inserted, retry rollback", xid, branchId);
                }
            } catch (Throwable var44) {
                if (conn != null) {
                    try {
                        conn.rollback();
                    } catch (SQLException var41) {
                        LOGGER.warn("Failed to close JDBC resource while undo ... ", var41);
                    }
                }
                throw new BranchTransactionException(TransactionExceptionCode.BranchRollbackFailed_Retriable, String.format("Branch session rollback failed and try again later xid = %s branchId = %s %s", xid, branchId, var44.getMessage()), var44);
            } finally {
                try {
                    if (rs != null) {
                        rs.close();
                    }
                    if (selectPST != null) {
                        selectPST.close();
                    }
                    if (conn != null) {
                        if (originalAutoCommit) {
                            conn.setAutoCommit(true);
                        }
                        conn.close();
                    }
                } catch (SQLException var40) {
                    LOGGER.warn("Failed to close JDBC resource while undo ... ", var40);
                }
            }
        }
    }

Server-side rollback

On the server, the rollback is entered through GlobalRollbackRequest:

    public class GlobalRollbackRequest extends AbstractGlobalEndRequest {
        @Override
        public short getTypeCode() {
            return MessageType.TYPE_GLOBAL_ROLLBACK;
        }

        @Override
        public AbstractTransactionResponse handle(RpcContext rpcContext) {
            // This is the core call
            return handler.handle(this, rpcContext);
        }
    }

    @Override
    public GlobalRollbackResponse handle(GlobalRollbackRequest request, final RpcContext rpcContext) {
        // Build the response
        GlobalRollbackResponse response = new GlobalRollbackResponse();
        response.setGlobalStatus(GlobalStatus.Rollbacking);
        exceptionHandleTemplate(new AbstractCallback<GlobalRollbackRequest, GlobalRollbackResponse>() {
            @Override
            public void execute(GlobalRollbackRequest request, GlobalRollbackResponse response)
                throws TransactionException {
                try {
                    // Core call
                    doGlobalRollback(request, response, rpcContext);
                } catch (StoreException e) {
                    throw new TransactionException(TransactionExceptionCode.FailedStore,
                        String.format("global rollback request failed. xid=%s, msg=%s", request.getXid(), e.getMessage()), e);
                }
            }

            @Override
            public void onTransactionException(GlobalRollbackRequest request, GlobalRollbackResponse response,
                TransactionException tex) {
                super.onTransactionException(request, response, tex);
                // may be appears StoreException outer layer method catch
                checkTransactionStatus(request, response);
            }

            @Override
            public void onException(GlobalRollbackRequest request, GlobalRollbackResponse response, Exception rex) {
                super.onException(request, response, rex);
                // may be appears StoreException outer layer method catch
                checkTransactionStatus(request, response);
            }
        }, request, response);
        return response;
    }

    public GlobalStatus rollback(String xid) throws TransactionException {
        // key1: to roll back, first find the global transaction and its branches by xid
        // 1. Look up the global transaction in global_table by its XID
        // 2. Use that xid to look up any branch transactions in branch_table, then wrap both in a GlobalSession
        // Both the global transaction and its branches are loaded here: the global transaction is a single row,
        // the branches may be several rows and come back as a list
        GlobalSession globalSession = SessionHolder.findGlobalSession(xid);
        if (globalSession == null) {
            return GlobalStatus.Finished;
        }
        globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
        // just lock changeStatus
        boolean shouldRollBack = SessionHolder.lockAndExecute(globalSession, () -> {
            // Mark the session as no longer active
            globalSession.close(); // Highlight: Firstly, close the session, then no more branch can be registered.
            // If the global status is still Begin, change it to Rollbacking
            // (this updates the status column in global_table)
            if (globalSession.getStatus() == GlobalStatus.Begin) {
                globalSession.changeGlobalStatus(GlobalStatus.Rollbacking);
                return true;
            }
            return false;
        });
        if (!shouldRollBack) {
            return globalSession.getStatus();
        }
        // key2: the actual rollback logic
        boolean rollbackSuccess = doGlobalRollback(globalSession, false);
        // Return the rollback result
        return rollbackSuccess ? GlobalStatus.Rollbacked : globalSession.getStatus();
    }

    @Override
    public boolean doGlobalRollback(GlobalSession globalSession, boolean retrying) throws TransactionException {
        boolean success = true;
        // start rollback event
        MetricsPublisher.postSessionDoingEvent(globalSession, retrying);
        if (globalSession.isSaga()) {
            success = getCore(BranchType.SAGA).doGlobalRollback(globalSession, retrying);
        } else {
            // globalSession.getReverseSortedBranches() returns all branches, in reverse order
            Boolean result = SessionHelper.forEach(globalSession.getReverseSortedBranches(), branchSession -> {
                BranchStatus currentBranchStatus = branchSession.getStatus();
                // A branch that failed in phase one is removed and its lock released
                if (currentBranchStatus == BranchStatus.PhaseOne_Failed) {
                    SessionHelper.removeBranch(globalSession, branchSession, !retrying);
                    return CONTINUE;
                }
                try {
                    // branchRollback rolls the branch back. This is the server side, so the rollback is a remote call;
                    // the remote side performs it using the undo log table
                    BranchStatus branchStatus = branchRollback(globalSession, branchSession);
                    if (isXaerNotaTimeout(globalSession, branchStatus)) {
                        LOGGER.info("Rollback branch XAER_NOTA retry timeout, xid = {} branchId = {}", globalSession.getXid(), branchSession.getBranchId());
                        branchStatus = BranchStatus.PhaseTwo_Rollbacked;
                    }
                    switch (branchStatus) {
                        // PhaseTwo_Rollbacked means the remote rollback succeeded
                        case PhaseTwo_Rollbacked:
                            // Phase-two rollback done: remove the branch information,
                            // that is, delete it from branch_table and release its lock
                            // 1. Release the lock: delete the row locks in lock_table
                            // 2. Delete the branch transaction from branch_table
                            SessionHelper.removeBranch(globalSession, branchSession, !retrying);
                            LOGGER.info("Rollback branch transaction successfully, xid = {} branchId = {}", globalSession.getXid(), branchSession.getBranchId());
                            return CONTINUE;
                        case PhaseTwo_RollbackFailed_Unretryable:
                            // The remote rollback failed: set the global status to RollbackFailed
                            SessionHelper.endRollbackFailed(globalSession, retrying);
                            LOGGER.info("Rollback branch transaction fail and stop retry, xid = {} branchId = {}", globalSession.getXid(), branchSession.getBranchId());
                            return false;
                        default:
                            LOGGER.info("Rollback branch transaction fail and will retry, xid = {} branchId = {}", globalSession.getXid(), branchSession.getBranchId());
                            if (!retrying) {
                                globalSession.queueToRetryRollback();
                            }
                            return false;
                    }
                } catch (Exception ex) {
                    StackTraceLogger.error(LOGGER, ex,
                        "Rollback branch transaction exception, xid = {} branchId = {} exception = {}",
                        new String[] {globalSession.getXid(), String.valueOf(branchSession.getBranchId()), ex.getMessage()});
                    if (!retrying) {
                        globalSession.queueToRetryRollback();
                    }
                    throw new TransactionException(ex);
                }
            });
            // Return if the result is not null
            if (result != null) {
                return result;
            }
        }
        // In db mode, lock and branch data residual problems may occur.
        // Therefore, execution needs to be delayed here and cannot be executed synchronously.
        if (success) {
            // Delete the global transaction data
            SessionHelper.endRollbacked(globalSession, retrying);
            LOGGER.info("Rollback global transaction successfully, xid = {}.", globalSession.getXid());
        }
        return success;
    }

A scheduled task, started in the same DefaultCoordinator#init method shown in the server-side commit flow, retries unfinished rollbacks and eventually removes the global transaction.

    public void init() {
        // Retries global transactions that are still rolling back
        retryRollbacking.scheduleAtFixedRate(
            () -> SessionHolder.distributedLockAndExecute(RETRY_ROLLBACKING, this::handleRetryRollbacking), 0,
            ROLLBACKING_RETRY_PERIOD, TimeUnit.MILLISECONDS);
        // The remaining tasks (retryCommitting, asyncCommitting, timeoutCheck, undoLogDelete)
        // are scheduled in this method exactly as shown in the server-side commit flow.
    }

    protected void handleRetryRollbacking() {
        SessionCondition sessionCondition = new SessionCondition(rollbackingStatuses);
        sessionCondition.setLazyLoadBranch(true);
        Collection<GlobalSession> rollbackingSessions =
            SessionHolder.getRetryRollbackingSessionManager().findGlobalSessions(sessionCondition);
        if (CollectionUtils.isEmpty(rollbackingSessions)) {
            return;
        }
        long now = System.currentTimeMillis();
        SessionHelper.forEach(rollbackingSessions, rollbackingSession -> {
            try {
                // prevent repeated rollback
                // !rollbackingSession.isDeadSession(): a Rollbacking session is only retried once it has timed out;
                // see isDeadSession for the exact threshold
                if (rollbackingSession.getStatus().equals(GlobalStatus.Rollbacking)
                    && !rollbackingSession.isDeadSession()) {
                    // The function of this 'return' is 'continue'.
                    return;
                }
                if (isRetryTimeout(now, MAX_ROLLBACK_RETRY_TIMEOUT.toMillis(), rollbackingSession.getBeginTime())) {
                    if (ROLLBACK_RETRY_TIMEOUT_UNLOCK_ENABLE) {
                        rollbackingSession.clean();
                    }
                    // Prevent thread safety issues
                    SessionHolder.getRetryRollbackingSessionManager().removeGlobalSession(rollbackingSession);
                    LOGGER.error("Global transaction rollback retry timeout and has removed [{}]", rollbackingSession.getXid());
                    SessionHelper.endRollbackFailed(rollbackingSession, true);
                    // rollback retry timeout event
                    MetricsPublisher.postSessionDoneEvent(rollbackingSession, GlobalStatus.RollbackRetryTimeout, true, false);
                    //The function of this 'return' is 'continue'.
                    return;
                }
                rollbackingSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
                core.doGlobalRollback(rollbackingSession, true);
            } catch (TransactionException ex) {
                LOGGER.info("Failed to retry rollbacking [{}] {} {}", rollbackingSession.getXid(), ex.getCode(), ex.getMessage());
            }
        });
    }
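The isRetryTimeout check decides when the server gives up retrying a rollback. Its body is not shown in this article, so the following is only a guess at its shape: it assumes that a negative timeout means "retry forever" and that the timeout is measured from the session's begin time. RetryTimeoutSketch and ALWAYS_RETRY_BOUNDARY are names chosen for this sketch.

```java
// Hypothetical sketch of a retry-timeout check; the real method body is not shown in the article.
final class RetryTimeoutSketch {

    // Assumption: timeouts below this boundary disable the check (retry forever).
    static final long ALWAYS_RETRY_BOUNDARY = 0L;

    /** True once more than timeoutMillis has elapsed since beginTimeMillis. */
    static boolean isRetryTimeout(long nowMillis, long timeoutMillis, long beginTimeMillis) {
        return timeoutMillis >= ALWAYS_RETRY_BOUNDARY
            && nowMillis - beginTimeMillis > timeoutMillis;
    }
}
```

Under these assumptions, a session that began at t = 1000 with a 500 ms limit is still retried at t = 1500 and abandoned from t = 1501 onwards, while a limit of -1 never triggers the timeout branch.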
