1. Introduction
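As explained in "In-Depth Analysis of RocketMQ Source Code: High-Availability Storage Component (Part 1), the Raft Protocol Explained" (CSDN blog), log replication in Raft happens in two places. First, the leader replicates data to its followers. Second, when a new leader is elected, it matches its log against the existing followers' logs as a consistency check. In DLedger, the component that performs this log replication and checking is DLedgerEntryPusher. The rest of this article walks through its source code.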
2. Structure of DLedgerEntryPusher
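DLedgerEntryPusher is the data replication component. After the leader writes data, it replicates the entries to the followers; after a consistency check, it keeps the followers' log indexes aligned with the leader's.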
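// Data replication component: replicates the leader's entries to followers and, after a consistency check, keeps their indexes aligned with the leader's
public class DLedgerEntryPusher {
    private static Logger logger = LoggerFactory.getLogger(DLedgerEntryPusher.class);
    // DLedger configuration
    private DLedgerConfig dLedgerConfig;
    // The storage component this pusher belongs to
    private DLedgerStore dLedgerStore;
    // Metadata of the current node
    private final MemberState memberState;
    // Component responsible for network requests
    private DLedgerRpcService dLedgerRpcService;
    // key: term -> (key: peerId, value: latest log index acknowledged by that peer in this term)
    private Map<Long, ConcurrentMap<String, Long>> peerWaterMarksByTerm = new ConcurrentHashMap<>();
    // key: term -> (key: log index, value: future holding the append result)
    private Map<Long, ConcurrentMap<Long, TimeoutFuture<AppendEntryResponse>>> pendingAppendResponsesByTerm = new ConcurrentHashMap<>();
    // Used by followers: receives the leader's replication requests and writes them to the log file in order
    private EntryHandler entryHandler;
    // Raft writes proceed in two phases (append, then commit): once a majority of the cluster has appended
    // an entry, the write is considered successful. This component checks whether that majority has been reached.
    private QuorumAckChecker quorumAckChecker;
    // Used by the leader: one dispatcher per follower, pushing the different request types to it
    private Map<String, EntryDispatcher> dispatcherMap = new HashMap<>();
    // Component that applies committed entries to the state machine
    private Optional<StateMachineCaller> fsmCaller;
}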
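The nested peerWaterMarksByTerm map is easier to reason about with a small model. The following is a minimal sketch, assuming that a peer's watermark only ever moves forward within a term and that watermarks of older terms are simply discarded on a term change. The class TermWatermarks and its methods are hypothetical, not DLedger's API; DLedger's own updatePeerWaterMark may differ in detail.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical model of peerWaterMarksByTerm: term -> (peerId -> highest acknowledged index)
final class TermWatermarks {
    private final Map<Long, ConcurrentMap<String, Long>> byTerm = new ConcurrentHashMap<>();

    // Record that peerId acknowledged index in term; assumption: a watermark never moves backwards
    void update(long term, String peerId, long index) {
        ConcurrentMap<String, Long> peers = byTerm.computeIfAbsent(term, t -> new ConcurrentHashMap<>());
        peers.merge(peerId, index, Math::max);
    }

    // Highest acknowledged index of peerId in term, or -1 if nothing is known
    long get(long term, String peerId) {
        ConcurrentMap<String, Long> peers = byTerm.get(term);
        if (peers == null) {
            return -1L;
        }
        return peers.getOrDefault(peerId, -1L);
    }

    // Drop the watermarks of every term except currTerm, as QuorumAckChecker does on a term change
    void retainOnly(long currTerm) {
        byTerm.keySet().removeIf(term -> term != currTerm);
    }
}
```

Keeping the term as the outer key means a term change can discard stale progress in one step instead of resetting every peer individually.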
3. Structure of a DLedger entry: DLedgerEntry
Every entry carries two key pieces of information: its term and its index.
public class DLedgerEntry {
    public final static int POS_OFFSET = 4 + 4 + 8 + 8;
    public final static int HEADER_SIZE = POS_OFFSET + 8 + 4 + 4 + 4;
    public final static int BODY_OFFSET = HEADER_SIZE + 4;
    // Magic number
    private int magic;
    // Entry size
    private int size;
    // Log index
    private long index;
    // Term
    private long term;
    // Physical position
    private long pos; //used to validate data
    private int channel; //reserved
    private int chainCrc; //like the block chain, this crc indicates any modification before this entry.
    private int bodyCrc; //the crc of the body
    // The actual message body
    private byte[] body;
}
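The offset constants imply a fixed binary layout: magic(4) + size(4) + index(8) + term(8) gives POS_OFFSET = 24; adding pos(8) + channel(4) + chainCrc(4) + bodyCrc(4) gives HEADER_SIZE = 48; and four more bytes give BODY_OFFSET = 52. The sketch below encodes an entry in that order with java.nio.ByteBuffer. It assumes that the fields are serialized in declaration order, that the extra 4 bytes before the body hold the body length, and that size is the total encoded length; EntryLayout is a hypothetical class, not DLedger's actual encoder.

```java
import java.nio.ByteBuffer;

// Hypothetical encoder following the offsets implied by DLedgerEntry's constants
final class EntryLayout {
    static final int POS_OFFSET = 4 + 4 + 8 + 8;               // magic, size, index, term -> 24
    static final int HEADER_SIZE = POS_OFFSET + 8 + 4 + 4 + 4; // + pos, channel, chainCrc, bodyCrc -> 48
    static final int BODY_OFFSET = HEADER_SIZE + 4;            // + assumed body-length field -> 52

    // Assumption: 'size' stores the total encoded length (header + body-length field + body)
    static ByteBuffer encode(int magic, long index, long term, long pos,
                             int channel, int chainCrc, int bodyCrc, byte[] body) {
        int size = BODY_OFFSET + body.length;
        ByteBuffer buf = ByteBuffer.allocate(size);
        buf.putInt(magic);
        buf.putInt(size);
        buf.putLong(index);
        buf.putLong(term);
        buf.putLong(pos);          // lands exactly at POS_OFFSET
        buf.putInt(channel);
        buf.putInt(chainCrc);
        buf.putInt(bodyCrc);
        buf.putInt(body.length);   // lands exactly at HEADER_SIZE
        buf.put(body);             // starts exactly at BODY_OFFSET
        buf.flip();
        return buf;
    }
}
```

With a fixed header, a reader can fetch index and term with absolute reads (for example getLong(8) and getLong(16)) without decoding the body, which is what makes cheap index and term comparisons possible.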
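4. Steps of entry replication
4.1 The leader sends replication requests: EntryDispatcher
The leader creates one EntryDispatcher per follower, dedicated to replicating data to that follower.
4.1.1 Components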
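EntryDispatcher pushes the leader's data to one follower. Each push request has one of four types:
1. APPEND: replicate entries into the follower's commitLog;
2. COMPARE: when the leader changes, the new leader runs a consistency check against every follower;
3. TRUNCATE: if the consistency check shows that the follower holds entries beyond the matching point (for example, its last index is larger than the leader's), those dirty entries must be deleted;
4. COMMIT: once more than half of the nodes have appended an entry successfully, the leader commits it and sends a request notifying all followers to commit.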
// Pushes the leader's log to one follower; request types: APPEND, COMPARE, TRUNCATE, COMMIT
private class EntryDispatcher extends ShutdownAbleThread {
    // Current push type; a dispatcher starts in COMPARE
    private AtomicReference<PushEntryRequest.Type> type = new AtomicReference<>(PushEntryRequest.Type.COMPARE);
    // Time of the last push (also used to throttle COMMIT requests)
    private long lastPushCommitTimeMs = -1;
    // Id of the follower this dispatcher serves
    private String peerId;
    // Index for which a COMPARE request is being sent
    private long compareIndex = -1;
    // Progress of the APPEND requests sent so far
    private long writeIndex = -1;
    // Maximum number of entries that may be pushed but not yet acknowledged
    private int maxPendingSize = 1000;
    // Term
    private long term = -1;
    // Leader id
    private String leaderId = null;
    // Time of the last check
    private long lastCheckLeakTimeMs = System.currentTimeMillis();
    // After an index is pushed, the dispatcher waits here for its result
    // ...
}
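The changeState(...) calls in this article's code move a dispatcher between push types: a COMPARE that matches switches to APPEND, a COMPARE that finds dirty entries switches to TRUNCATE, a finished TRUNCATE switches to APPEND, and an INCONSISTENT_STATE reply or an exception falls back to COMPARE. COMMIT is not a dispatcher state; it is sent alongside appending. The sketch below captures only these transitions; the enums PushType and PushEvent and the class DispatcherStates are hypothetical names, not DLedger types.

```java
// Hypothetical push types a dispatcher can be in (COMMIT is a request, not a state)
enum PushType { COMPARE, TRUNCATE, APPEND }

// Hypothetical events corresponding to the changeState(...) call sites
enum PushEvent { COMPARE_MATCHED, COMPARE_FOUND_DIRTY, TRUNCATE_DONE, INCONSISTENT_OR_ERROR }

final class DispatcherStates {
    // Returns the next push type; an unexpected (state, event) pair leaves the state unchanged
    static PushType next(PushType current, PushEvent event) {
        switch (event) {
            case INCONSISTENT_OR_ERROR:
                return PushType.COMPARE; // any failure restarts the consistency check
            case COMPARE_MATCHED:
                return current == PushType.COMPARE ? PushType.APPEND : current;
            case COMPARE_FOUND_DIRTY:
                return current == PushType.COMPARE ? PushType.TRUNCATE : current;
            case TRUNCATE_DONE:
                return current == PushType.TRUNCATE ? PushType.APPEND : current;
            default:
                return current;
        }
    }
}
```

Falling back to COMPARE on every failure is the conservative choice: the leader never keeps appending on top of a log it has not verified.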
4.1.2 Main logic
The dispatcher is essentially an infinite loop that keeps calling doWork():
public void doWork() {
    try {
        // Check that this node is still the leader; if not, wait and retry
        if (!checkAndFreshState()) {
            waitForRunning(1);
            return;
        }
        if (type.get() == PushEntryRequest.Type.APPEND) {
            if (dLedgerConfig.isEnableBatchPush()) {
                doBatchAppend();
            } else {
                // APPEND state: push entries to the follower
                doAppend();
            }
        } else {
            // COMPARE (or TRUNCATE) state: run the consistency check
            doCompare();
        }
        waitForRunning(1);
    } catch (Throwable t) {
        DLedgerEntryPusher.logger.error("[Push-{}]Error in {} writeIndex={} compareIndex={}", peerId, getName(), writeIndex, compareIndex, t);
        changeState(-1, PushEntryRequest.Type.COMPARE);
        DLedgerUtils.sleep(500);
    }
}
4.1.3 Log replication steps
1. Send the append (pre-commit) request
// Push the entry at the given index
private void doAppendInner(long index) throws Exception {
    // Fetch the entry from the commitLog: first look up its physical offset in the index file,
    // then read the data at that offset
    DLedgerEntry entry = getDLedgerEntryForAppend(index);
    if (null == entry) {
        return;
    }
    // Flow control
    checkQuotaAndWait(entry);
    // Build the APPEND request
    PushEntryRequest request = buildPushRequest(entry, PushEntryRequest.Type.APPEND);
    // Send it over the network
    CompletableFuture<PushEntryResponse> responseFuture = dLedgerRpcService.push(request);
    // Record in pendingMap that the entry at this index is in flight
    pendingMap.put(index, System.currentTimeMillis());
    responseFuture.whenComplete((x, ex) -> {
        try {
            PreConditions.check(ex == null, DLedgerResponseCode.UNKNOWN);
            DLedgerResponseCode responseCode = DLedgerResponseCode.valueOf(x.getCode());
            switch (responseCode) {
                case SUCCESS:
                    pendingMap.remove(x.getIndex());
                    // On success, advance this peer's watermark to the latest acknowledged index
                    updatePeerWaterMark(x.getTerm(), peerId, x.getIndex());
                    // Wake QuorumAckChecker so it can recompute the quorum index
                    quorumAckChecker.wakeup();
                    break;
                case INCONSISTENT_STATE:
                    logger.info("[Push-{}]Get INCONSISTENT_STATE when push index={} term={}", peerId, x.getIndex(), x.getTerm());
                    changeState(-1, PushEntryRequest.Type.COMPARE);
                    break;
                default:
                    logger.warn("[Push-{}]Get error response code {} {}", peerId, responseCode, x.baseInfo());
                    break;
            }
        } catch (Throwable t) {
            logger.error("", t);
        }
    });
    // Update the time of the last push
    lastPushCommitTimeMs = System.currentTimeMillis();
}
// Build a push request
private PushEntryRequest buildPushRequest(DLedgerEntry entry, PushEntryRequest.Type target) {
    PushEntryRequest request = new PushEntryRequest();
    // Raft group
    request.setGroup(memberState.getGroup());
    // Id of the target node
    request.setRemoteId(peerId);
    // Current leader id
    request.setLeaderId(leaderId);
    // This node's own id
    request.setLocalId(memberState.getSelfId());
    // Current term
    request.setTerm(term);
    // Log entry
    request.setEntry(entry);
    // Request type
    request.setType(target);
    // The leader's latest committed index; piggybacking it lets the follower commit every entry up to this index
    request.setCommitIndex(dLedgerStore.getCommittedIndex());
    return request;
}
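The asynchronous bookkeeping in doAppendInner boils down to three steps: mark the index as in flight, send the request, and on a successful reply remove it from the pending map and raise the peer's watermark (an INCONSISTENT_STATE reply instead sends the dispatcher back to COMPARE). The sketch below models this with CompletableFuture. The simulated RPC (a LongFunction returning a future of a boolean) and the class AppendTracker are hypothetical stand-ins for dLedgerRpcService.push and PushEntryResponse.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongFunction;

// Hypothetical model of the leader-side append bookkeeping for one follower
final class AppendTracker {
    // index -> time the append request was sent (mirrors pendingMap)
    final ConcurrentMap<Long, Long> pending = new ConcurrentHashMap<>();
    // highest index this follower has acknowledged (mirrors the peer's watermark)
    final AtomicLong watermark = new AtomicLong(-1);
    // Set when the follower reports an inconsistency: the dispatcher should go back to COMPARE
    volatile boolean needCompare = false;
    // Stand-in for the RPC: completes with true for SUCCESS, false for INCONSISTENT_STATE
    private final LongFunction<CompletableFuture<Boolean>> rpc;

    AppendTracker(LongFunction<CompletableFuture<Boolean>> rpc) {
        this.rpc = rpc;
    }

    CompletableFuture<Void> append(long index) {
        pending.put(index, System.currentTimeMillis());
        return rpc.apply(index).thenAccept(ok -> {
            if (ok) {
                pending.remove(index);
                watermark.accumulateAndGet(index, Math::max); // watermark only moves forward
            } else {
                needCompare = true; // entry stays pending; a new COMPARE round will follow
            }
        });
    }
}
```

Because replies can arrive out of order, the watermark update takes the maximum instead of overwriting, so a late reply for an older index cannot move it backwards.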
2. Send the commit request
private void doCommit() throws Exception {
    // Only send a COMMIT request if more than 1000 ms have passed since the last push, which batches commits
    if (DLedgerUtils.elapsed(lastPushCommitTimeMs) > 1000) {
        // Send the COMMIT request
        PushEntryRequest request = buildPushRequest(null, PushEntryRequest.Type.COMMIT);
        // Ignore the results
        dLedgerRpcService.push(request);
        lastPushCommitTimeMs = System.currentTimeMillis();
    }
}
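A standalone COMMIT request is only needed when the follower has not heard from the leader for a while, because every request built by buildPushRequest already carries the leader's committed index, and doAppendInner resets lastPushCommitTimeMs on each push. A minimal sketch of that throttle with an injectable clock, so the timing can be tested; CommitThrottle and its methods are hypothetical names.

```java
import java.util.function.LongSupplier;

// Hypothetical throttle mirroring doCommit's "more than 1000 ms since the last push" rule
final class CommitThrottle {
    private final LongSupplier clockMs;
    private final long intervalMs;
    private long lastPushMs = -1; // -1 means "never pushed", so the first check always fires

    CommitThrottle(LongSupplier clockMs, long intervalMs) {
        this.clockMs = clockMs;
        this.intervalMs = intervalMs;
    }

    // Call whenever any request (APPEND or COMMIT) is pushed; each one carries the commit index
    void onPush() {
        lastPushMs = clockMs.getAsLong();
    }

    // True if a standalone COMMIT request should be sent now
    boolean shouldSendCommit() {
        return clockMs.getAsLong() - lastPushMs > intervalMs;
    }
}
```

Under steady write traffic the throttle never fires, and the commit index still reaches followers through APPEND requests.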
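4.1.4 Leader-side consistency check
The leader checks whether the follower's log is consistent with its own. Starting from its own last index, it sends a COMPARE request; if the entries differ, it decrements the compare index and sends it to the follower again, repeating until a match is found. If the follower holds dirty data, the leader builds a TRUNCATE request to delete it.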
private void doCompare() throws Exception {
    while (true) {
        if (!checkAndFreshState()) {
            break;
        }
        if (type.get() != PushEntryRequest.Type.COMPARE
            && type.get() != PushEntryRequest.Type.TRUNCATE) {
            break;
        }
        if (compareIndex == -1 && dLedgerStore.getLedgerEndIndex() == -1) {
            break;
        }
        //revise the compareIndex
        // Start comparing from the leader's last written index
        if (compareIndex == -1) {
            compareIndex = dLedgerStore.getLedgerEndIndex();
            logger.info("[Push-{}][DoCompare] compareIndex=-1 means start to compare", peerId);
        } else if (compareIndex > dLedgerStore.getLedgerEndIndex() || compareIndex < dLedgerStore.getLedgerBeginIndex()) {
            logger.info("[Push-{}][DoCompare] compareIndex={} out of range {}-{}", peerId, compareIndex, dLedgerStore.getLedgerBeginIndex(), dLedgerStore.getLedgerEndIndex());
            compareIndex = dLedgerStore.getLedgerEndIndex();
        }
        // Load the leader's entry at compareIndex
        DLedgerEntry entry = dLedgerStore.get(compareIndex);
        PreConditions.check(entry != null, DLedgerResponseCode.INTERNAL_ERROR, "compareIndex=%d", compareIndex);
        // Build and send the COMPARE request
        PushEntryRequest request = buildPushRequest(entry, PushEntryRequest.Type.COMPARE);
        CompletableFuture<PushEntryResponse> responseFuture = dLedgerRpcService.push(request);
        PushEntryResponse response = responseFuture.get(3, TimeUnit.SECONDS);
        PreConditions.check(response != null, DLedgerResponseCode.INTERNAL_ERROR, "compareIndex=%d", compareIndex);
        PreConditions.check(response.getCode() == DLedgerResponseCode.INCONSISTENT_STATE.getCode() || response.getCode() == DLedgerResponseCode.SUCCESS.getCode(),
            DLedgerResponseCode.valueOf(response.getCode()), "compareIndex=%d", compareIndex);
        long truncateIndex = -1;
        if (response.getCode() == DLedgerResponseCode.SUCCESS.getCode()) {
            // Comparison succeeded: leader and follower agree at compareIndex. If the follower's last index
            // is beyond compareIndex, the follower holds dirty entries that must be deleted.
            /*
             * The comparison is successful:
             * 1.Just change to append state, if the follower's end index is equal the compared index.
             * 2.Truncate the follower, if the follower has some dirty entries.
             */
            if (compareIndex == response.getEndIndex()) {
                changeState(compareIndex, PushEntryRequest.Type.APPEND);
                break;
            } else {
                truncateIndex = compareIndex;
            }
        } else if (response.getEndIndex() < dLedgerStore.getLedgerBeginIndex()
            || response.getBeginIndex() > dLedgerStore.getLedgerEndIndex()) {
            // The follower's range does not overlap the leader's range: all of it is dirty and must be deleted
            /*
             The follower's entries does not intersect with the leader.
             This usually happened when the follower has crashed for a long time while the leader has deleted the expired entries.
             Just truncate the follower.
             */
            truncateIndex = dLedgerStore.getLedgerBeginIndex();
        } else if (compareIndex < response.getBeginIndex()) {
            /*
             The compared index is smaller than the follower's begin index.
             This happened rarely, usually means some disk damage.
             Just truncate the follower.
             */
            truncateIndex = dLedgerStore.getLedgerBeginIndex();
        } else if (compareIndex > response.getEndIndex()) {
            /*
             The compared index is bigger than the follower's end index.
             This happened frequently. For the compared index is usually starting from the end index of the leader.
             */
            compareIndex = response.getEndIndex();
        } else {
            // Comparison failed inside the follower's range: decrement compareIndex and compare again
            /*
             Compare failed and the compared index is in the range of follower's entries.
             */
            compareIndex--;
        }
        /*
         The compared index is smaller than the leader's begin index, truncate the follower.
         */
        if (compareIndex < dLedgerStore.getLedgerBeginIndex()) {
            truncateIndex = dLedgerStore.getLedgerBeginIndex();
        }
        /*
         If get value for truncateIndex, do it right now.
         */
        if (truncateIndex != -1) {
            changeState(truncateIndex, PushEntryRequest.Type.TRUNCATE);
            doTruncate(truncateIndex);
            break;
        }
    }
}
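The branch structure of doCompare can be isolated as a pure function of one round's result: the compare index, whether the follower matched, and both logs' index ranges. The sketch below restates those branches and returns the next action (switch to APPEND at the matched index, TRUNCATE at some index, or RETRY with a new compare index). It assumes nothing else changes between rounds; CompareDecision and its fields are hypothetical names, not DLedger code.

```java
// Hypothetical restatement of doCompare's decision logic for one round
final class CompareDecision {
    enum Kind { APPEND, TRUNCATE, RETRY }

    final Kind kind;
    final long index; // APPEND: matched index; TRUNCATE: truncateIndex; RETRY: next compareIndex

    private CompareDecision(Kind kind, long index) {
        this.kind = kind;
        this.index = index;
    }

    static CompareDecision decide(long compareIndex, boolean matched,
                                  long followerBegin, long followerEnd,
                                  long leaderBegin, long leaderEnd) {
        long truncateIndex = -1;
        if (matched) {
            if (compareIndex == followerEnd) {
                return new CompareDecision(Kind.APPEND, compareIndex); // logs agree, nothing extra on the follower
            }
            truncateIndex = compareIndex;       // follower has dirty entries after compareIndex
        } else if (followerEnd < leaderBegin || followerBegin > leaderEnd) {
            truncateIndex = leaderBegin;        // the two ranges do not intersect
        } else if (compareIndex < followerBegin) {
            truncateIndex = leaderBegin;        // rare, e.g. disk damage
        } else if (compareIndex > followerEnd) {
            compareIndex = followerEnd;         // jump straight to the follower's last index
        } else {
            compareIndex--;                     // mismatch inside the follower's range: step back one entry
        }
        if (compareIndex < leaderBegin) {
            truncateIndex = leaderBegin;
        }
        if (truncateIndex != -1) {
            return new CompareDecision(Kind.TRUNCATE, truncateIndex);
        }
        return new CompareDecision(Kind.RETRY, compareIndex);
    }
}
```

The jump to the follower's end index is what keeps the common case cheap: a follower that is merely behind is found in one extra round instead of one round per missing entry.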
4.2 The leader checks whether a majority has written the entry: QuorumAckChecker
private class QuorumAckChecker extends ShutdownAbleThread {
    private long lastPrintWatermarkTimeMs = System.currentTimeMillis();
    private long lastCheckLeakTimeMs = System.currentTimeMillis();
    private long lastQuorumIndex = -1;

    public QuorumAckChecker(Logger logger) {
        super("QuorumAckChecker-" + memberState.getSelfId(), logger);
    }

    @Override
    public void doWork() {
        try {
            if (DLedgerUtils.elapsed(lastPrintWatermarkTimeMs) > 3000) {
                if (DLedgerEntryPusher.this.fsmCaller.isPresent()) {
                    // Index last applied to the state machine
                    final long lastAppliedIndex = DLedgerEntryPusher.this.fsmCaller.get().getLastAppliedIndex();
                    logger.info("[{}][{}] term={} ledgerBegin={} ledgerEnd={} committed={} watermarks={} appliedIndex={}",
                        memberState.getSelfId(), memberState.getRole(), memberState.currTerm(), dLedgerStore.getLedgerBeginIndex(), dLedgerStore.getLedgerEndIndex(), dLedgerStore.getCommittedIndex(), JSON.toJSONString(peerWaterMarksByTerm), lastAppliedIndex);
                } else {
                    logger.info("[{}][{}] term={} ledgerBegin={} ledgerEnd={} committed={} watermarks={}",
                        memberState.getSelfId(), memberState.getRole(), memberState.currTerm(), dLedgerStore.getLedgerBeginIndex(), dLedgerStore.getLedgerEndIndex(), dLedgerStore.getCommittedIndex(), JSON.toJSONString(peerWaterMarksByTerm));
                }
                lastPrintWatermarkTimeMs = System.currentTimeMillis();
            }
            if (!memberState.isLeader()) {
                waitForRunning(1);
                return;
            }
            long currTerm = memberState.currTerm();
            checkTermForPendingMap(currTerm, "QuorumAckChecker");
            checkTermForWaterMark(currTerm, "QuorumAckChecker");
            if (pendingAppendResponsesByTerm.size() > 1) {
                // Per Raft, a leader only commits (by counting replicas) entries of its own term. Responses still
                // pending for any other term are completed with TERM_CHANGED and removed.
                for (Long term : pendingAppendResponsesByTerm.keySet()) {
                    if (term == currTerm) {
                        continue;
                    }
                    for (Map.Entry<Long, TimeoutFuture<AppendEntryResponse>> futureEntry : pendingAppendResponsesByTerm.get(term).entrySet()) {
                        AppendEntryResponse response = new AppendEntryResponse();
                        response.setGroup(memberState.getGroup());
                        response.setIndex(futureEntry.getKey());
                        response.setCode(DLedgerResponseCode.TERM_CHANGED.getCode());
                        response.setLeaderId(memberState.getLeaderId());
                        logger.info("[TermChange] Will clear the pending response index={} for term changed from {} to {}", futureEntry.getKey(), term, currTerm);
                        futureEntry.getValue().complete(response);
                    }
                    pendingAppendResponsesByTerm.remove(term);
                }
            }
            if (peerWaterMarksByTerm.size() > 1) {
                for (Long term : peerWaterMarksByTerm.keySet()) {
                    if (term == currTerm) {
                        continue;
                    }
                    logger.info("[TermChange] Will clear the watermarks for term changed from {} to {}", term, currTerm);
                    peerWaterMarksByTerm.remove(term);
                }
            }
            // Latest successfully acknowledged index of every node in the current term
            Map<String, Long> peerWaterMarks = peerWaterMarksByTerm.get(currTerm);
            // Sort the indexes in descending order
            List<Long> sortedWaterMarks = peerWaterMarks.values().stream().sorted(Comparator.reverseOrder()).collect(Collectors.toList());
            // Take the element at position size/2. Example: nodes A, B, C, D (the leader's own watermark is included)
            // have acknowledged [5, 7, 4, 9]; sorted descending this is [9, 7, 5, 4] and the element at position 2 is 5.
            // At least 3 of the 4 nodes, a majority, have stored index 5 or higher, so the leader can commit up to 5.
            long quorumIndex = sortedWaterMarks.get(sortedWaterMarks.size() / 2);
            final Optional<StateMachineCaller> fsmCaller = DLedgerEntryPusher.this.fsmCaller;
            if (fsmCaller.isPresent()) {
                // If there exist statemachine
                // With a state machine, the leader updates its own committed index and applies it to the state machine
                DLedgerEntryPusher.this.dLedgerStore.updateCommittedIndex(currTerm, quorumIndex);
                final StateMachineCaller caller = fsmCaller.get();
                caller.onCommitted(quorumIndex);

                // Check elapsed
                if (DLedgerUtils.elapsed(lastCheckLeakTimeMs) > 1000) {
                    updatePeerWaterMark(currTerm, memberState.getSelfId(), dLedgerStore.getLedgerEndIndex());
                    checkResponseFuturesElapsed(caller.getLastAppliedIndex());
                    lastCheckLeakTimeMs = System.currentTimeMillis();
                }

                if (quorumIndex == this.lastQuorumIndex) {
                    waitForRunning(1);
                }
            } else {
                dLedgerStore.updateCommittedIndex(currTerm, quorumIndex);
                ConcurrentMap<Long, TimeoutFuture<AppendEntryResponse>> responses = pendingAppendResponsesByTerm.get(currTerm);
                boolean needCheck = false;
                int ackNum = 0;
                // Complete the pending client futures for every index newly covered by the quorum
                for (Long i = quorumIndex; i > lastQuorumIndex; i--) {
                    try {
                        CompletableFuture<AppendEntryResponse> future = responses.remove(i);
                        if (future == null) {
                            needCheck = true;
                            break;
                        } else if (!future.isDone()) {
                            AppendEntryResponse response = new AppendEntryResponse();
                            response.setGroup(memberState.getGroup());
                            response.setTerm(currTerm);
                            response.setIndex(i);
                            response.setLeaderId(memberState.getSelfId());
                            response.setPos(((AppendFuture) future).getPos());
                            future.complete(response);
                        }
                        ackNum++;
                    } catch (Throwable t) {
                        logger.error("Error in ack to index={} term={}", i, currTerm, t);
                    }
                }

                if (ackNum == 0) {
                    checkResponseFuturesTimeout(quorumIndex + 1);
                    waitForRunning(1);
                }

                if (DLedgerUtils.elapsed(lastCheckLeakTimeMs) > 1000 || needCheck) {
                    updatePeerWaterMark(currTerm, memberState.getSelfId(), dLedgerStore.getLedgerEndIndex());
                    checkResponseFuturesElapsed(quorumIndex);
                    lastCheckLeakTimeMs = System.currentTimeMillis();
                }
            }
            lastQuorumIndex = quorumIndex;
        } catch (Throwable t) {
            DLedgerEntryPusher.logger.error("Error in {}", getName(), t);
            DLedgerUtils.sleep(100);
        }
    }
}
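The quorum index is the element at position n/2 (integer division) of the n watermarks sorted in descending order. Exactly n/2 + 1 positions (0 through n/2) hold a value at least that large, which is always more than half of n, so a majority has stored the entry at that index. A minimal sketch that reproduces the example from the comment above; as in the code above, the leader's own watermark is assumed to be one of the values, and the class name Quorum is hypothetical.

```java
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

// Sketch of QuorumAckChecker's quorum-index calculation
final class Quorum {
    // watermarks: highest acknowledged index of every node, including the leader itself
    static long quorumIndex(Collection<Long> watermarks) {
        List<Long> sorted = watermarks.stream()
                .sorted(Comparator.reverseOrder())
                .collect(Collectors.toList());
        // n/2 + 1 nodes hold an index >= sorted.get(n/2), which is a strict majority
        return sorted.get(sorted.size() / 2);
    }
}
```

For [5, 7, 4, 9] the sorted list is [9, 7, 5, 4] and the result is 5; for five nodes [8, 2, 6, 4, 10] it is 6, stored on the three nodes with 10, 8 and 6.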
4.3 Follower-side handling of replication requests
The follower builds its response as follows:
private PushEntryResponse buildResponse(PushEntryRequest request, int code) {
    // Build the PushEntry response
    PushEntryResponse response = new PushEntryResponse();
    response.setGroup(request.getGroup());
    // Response code
    response.setCode(code);
    // Current term
    response.setTerm(request.getTerm());
    if (request.getType() != PushEntryRequest.Type.COMMIT) {
        response.setIndex(request.getFirstEntryIndex());
        response.setCount(request.getCount());
    }
    // The follower's begin and end index, which the leader's doCompare relies on
    response.setBeginIndex(dLedgerStore.getLedgerBeginIndex());
    response.setEndIndex(dLedgerStore.getLedgerEndIndex());
    return response;
}
4.3.1 Truncate requests (the leader-side doTruncate)
private void doTruncate(long truncateIndex) throws Exception {
    PreConditions.check(type.get() == PushEntryRequest.Type.TRUNCATE, DLedgerResponseCode.UNKNOWN);
    // Load the leader's entry at truncateIndex
    DLedgerEntry truncateEntry = dLedgerStore.get(truncateIndex);
    PreConditions.check(truncateEntry != null, DLedgerResponseCode.UNKNOWN);
    logger.info("[Push-{}]Will push data to truncate truncateIndex={} pos={}", peerId, truncateIndex, truncateEntry.getPos());
    // Build and send the TRUNCATE request, waiting up to 3 seconds for the reply
    PushEntryRequest truncateRequest = buildPushRequest(truncateEntry, PushEntryRequest.Type.TRUNCATE);
    PushEntryResponse truncateResponse = dLedgerRpcService.push(truncateRequest).get(3, TimeUnit.SECONDS);
    PreConditions.check(truncateResponse != null, DLedgerResponseCode.UNKNOWN, "truncateIndex=%d", truncateIndex);
    PreConditions.check(truncateResponse.getCode() == DLedgerResponseCode.SUCCESS.getCode(), DLedgerResponseCode.valueOf(truncateResponse.getCode()), "truncateIndex=%d", truncateIndex);
    lastPushCommitTimeMs = System.currentTimeMillis();
    // Once the follower has truncated, switch back to APPEND
    changeState(truncateIndex, PushEntryRequest.Type.APPEND);
}
4.3.2 The follower handles append requests
// Handle an APPEND request
private void handleDoAppend(long writeIndex, PushEntryRequest request,
                            CompletableFuture<PushEntryResponse> future) {
    try {
        PreConditions.check(writeIndex == request.getEntry().getIndex(), DLedgerResponseCode.INCONSISTENT_STATE);
        // appendAsFollower writes the entry directly into the commitLog
        DLedgerEntry entry = dLedgerStore.appendAsFollower(request.getEntry(), request.getTerm(), request.getLeaderId());
        PreConditions.check(entry.getIndex() == writeIndex, DLedgerResponseCode.INCONSISTENT_STATE);
        // Build the response
        future.complete(buildResponse(request, DLedgerResponseCode.SUCCESS.getCode()));
        // Update the committed index; the leader sends its committed index for the current term with the request
        updateCommittedIndex(request.getTerm(), request.getCommitIndex());
    } catch (Throwable t) {
        logger.error("[HandleDoAppend] writeIndex={}", writeIndex, t);
        future.complete(buildResponse(request, DLedgerResponseCode.INCONSISTENT_STATE.getCode()));
    }
}
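On the follower, handleDoAppend only accepts an entry whose index equals the expected writeIndex, and then advances its committed index using the commitIndex the leader piggybacked on the request. The sketch below models this with an in-memory list. Capping the committed index at the follower's own last index follows the standard Raft rule (commitIndex = min(leaderCommit, index of last new entry)); treating that as what updateCommittedIndex does is an assumption, and FollowerLog is a hypothetical class, not DLedger code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory follower log; entry i is stored at list position i (indexes start at 0)
final class FollowerLog {
    final List<Long> terms = new ArrayList<>(); // term of each stored entry
    long committedIndex = -1;

    long endIndex() {
        return terms.size() - 1;
    }

    // Returns true for SUCCESS, false for INCONSISTENT_STATE
    boolean handleAppend(long entryIndex, long entryTerm, long leaderCommitIndex) {
        long writeIndex = endIndex() + 1; // the only index this follower can accept next
        if (entryIndex != writeIndex) {
            return false;
        }
        terms.add(entryTerm);
        // Assumption (standard Raft): never commit past what is stored locally, never move backwards
        long candidate = Math.min(leaderCommitIndex, endIndex());
        committedIndex = Math.max(committedIndex, candidate);
        return true;
    }
}
```

Rejecting out-of-order indexes with INCONSISTENT_STATE is what drives the leader's dispatcher back into COMPARE, so gaps are repaired by the consistency check rather than papered over.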
4.3.3 The follower handles consistency-check (COMPARE) requests
private CompletableFuture<PushEntryResponse> handleDoCompare(long compareIndex, PushEntryRequest request,
                                                             CompletableFuture<PushEntryResponse> future) {
    try {
        PreConditions.check(compareIndex == request.getEntry().getIndex(), DLedgerResponseCode.UNKNOWN);
        PreConditions.check(request.getType() == PushEntryRequest.Type.COMPARE, DLedgerResponseCode.UNKNOWN);
        // Load the local entry at compareIndex and compare it with the leader's; the result is returned to the leader
        DLedgerEntry local = dLedgerStore.get(compareIndex);
        PreConditions.check(request.getEntry().equals(local), DLedgerResponseCode.INCONSISTENT_STATE);
        future.complete(buildResponse(request, DLedgerResponseCode.SUCCESS.getCode()));
    } catch (Throwable t) {
        logger.error("[HandleDoCompare] compareIndex={}", compareIndex, t);
        future.complete(buildResponse(request, DLedgerResponseCode.INCONSISTENT_STATE.getCode()));
    }
    return future;
}
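handleDoCompare answers SUCCESS only when the follower's local entry at compareIndex equals the leader's, and every response carries the follower's begin and end index, which is exactly what the leader's doCompare branches on. The sketch below models the follower side under a simplifying assumption: two entries match when they have the same term at the same index (Raft's log matching property), whereas the original compares whole DLedgerEntry objects with equals. CompareResponder and its nested Response are hypothetical names.

```java
// Hypothetical follower-side compare; the term of entry i is stored at terms[i - beginIndex]
final class CompareResponder {
    static final class Response {
        final boolean success;
        final long beginIndex;
        final long endIndex;

        Response(boolean success, long beginIndex, long endIndex) {
            this.success = success;
            this.beginIndex = beginIndex;
            this.endIndex = endIndex;
        }
    }

    private final long beginIndex;
    private final long[] terms;

    CompareResponder(long beginIndex, long[] terms) {
        this.beginIndex = beginIndex;
        this.terms = terms;
    }

    long endIndex() {
        return beginIndex + terms.length - 1;
    }

    // Assumption: entries match when they carry the same term at the same index
    Response handleCompare(long compareIndex, long entryTerm) {
        boolean inRange = compareIndex >= beginIndex && compareIndex <= endIndex();
        boolean success = inRange && terms[(int) (compareIndex - beginIndex)] == entryTerm;
        // Begin and end index are always returned, even on a mismatch
        return new Response(success, beginIndex, endIndex());
    }
}
```

Returning the range on a mismatch lets the leader skip ahead, for example jumping from its own last index straight to the follower's end index, instead of probing one index at a time.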