Flink CDC version: 3.2.1
Flink CDC 3.0 dynamic table schema changes: source code analysis


Using a Doris sink as the example:



/** This method is guaranteed to not be called concurrently with other methods of the operator. */
public void processElement(StreamRecord<Event> streamRecord)
        throws InterruptedException, TimeoutException, ExecutionException {
    Event event = streamRecord.getValue();
    if (event instanceof SchemaChangeEvent) { // (0)
        processSchemaChangeEvents((SchemaChangeEvent) event);
    } else if (event instanceof DataChangeEvent) { // (13)
        processDataChangeEvents(streamRecord, (DataChangeEvent) event);
    } else {
        throw new RuntimeException("Unknown event type in Stream record: " + event);
    }
}


private void processSchemaChangeEvents(SchemaChangeEvent event)
        throws InterruptedException, TimeoutException, ExecutionException {
    TableId tableId = event.tableId();
    LOG.info(
            "{}> Table {} received SchemaChangeEvent {} and start to be blocked.",
            subTaskId,
            tableId,
            event);
    handleSchemaChangeEvent(tableId, event);
    // Update caches
    originalSchema.put(tableId, getLatestOriginalSchema(tableId));
    schemaDivergesMap.put(tableId, checkSchemaDiverges(tableId));
    List<TableId> optionalRoutedTable = getRoutedTables(tableId);
    if (!optionalRoutedTable.isEmpty()) {
        tableIdMappingCache
                .get(tableId)
                .forEach(routed -> evolvedSchema.put(routed, getLatestEvolvedSchema(routed)));
    } else {
        evolvedSchema.put(tableId, getLatestEvolvedSchema(tableId));
    }
}


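response.isAccepted() means that the schema registry has accepted this change request. Inside that if branch comes the key step: output.collect(new StreamRecord<>(new FlushEvent(tableId)));. Note that this emits a new FlushEvent(tableId). The event is consumed by the SinkWriter: it tells the SinkWriter to flush, that is, to write its buffered data to the sink-side database, much like a JDBC commit.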

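  • An {@link Event} from {@code SchemaOperator} to notify {@code DataSinkWriterOperator} that it start flushing.
    Order in the stream (the rightmost item reaches the sink first): data after the schema change --> FlushEvent (newly inserted) --> data before the schema change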
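
The role of the FlushEvent as a barrier between old-schema and new-schema data can be illustrated with a small standalone sketch. Everything in it is hypothetical: FlushOrderingSketch, BufferingSink, and the string events are illustration names, not Flink CDC classes. The only assumption is that a sink buffers records and writes them out when a flush marker arrives.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch, not Flink CDC code: a sink that buffers records and
// writes them out when it sees a flush marker in the stream.
class FlushOrderingSketch {
    static final String FLUSH_MARKER = "FLUSH";

    static class BufferingSink {
        final List<String> buffer = new ArrayList<>();
        final List<String> written = new ArrayList<>();

        void process(String event) {
            if (FLUSH_MARKER.equals(event)) {
                // Everything received before the marker is written out first.
                written.addAll(buffer);
                buffer.clear();
            } else {
                buffer.add(event);
            }
        }
    }

    // Feeds the events to a fresh sink in stream order and returns the sink.
    static BufferingSink run(List<String> stream) {
        BufferingSink sink = new BufferingSink();
        for (String event : stream) {
            sink.process(event);
        }
        return sink;
    }

    public static void main(String[] args) {
        BufferingSink sink = run(Arrays.asList("old-1", "old-2", FLUSH_MARKER, "new-1"));
        // Prints: written=[old-1, old-2], buffered=[new-1]
        System.out.println("written=" + sink.written + ", buffered=" + sink.buffer);
    }
}
```

Because the marker travels in the same stream as the data, every old-schema record is written before any new-schema record is touched, which is what lets the external table be altered safely in between.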


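Finally comes finishedSchemaChangeEvents.forEach(e -> output.collect(new StreamRecord<>(e)));. In short, these are the events produced inside the handler by SchemaRegistryRequestHandler#applySchemaChange: the original SchemaChangeEvent is converted into new events according to Flink CDC's schema.change.behavior setting, which can be exception, evolve, try_evolve, lenient or ignore.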


private void handleSchemaChangeEvent(TableId tableId, SchemaChangeEvent schemaChangeEvent)
        throws InterruptedException, TimeoutException {
    if (schemaChangeBehavior == SchemaChangeBehavior.EXCEPTION
            && schemaChangeEvent.getType() != SchemaChangeEventType.CREATE_TABLE) {
        // CreateTableEvent should be applied even in EXCEPTION mode
        throw new RuntimeException(
                String.format(
                        "Refused to apply schema change event %s in EXCEPTION mode.",
                        schemaChangeEvent));
    }
    // The request will block if another schema change event is being handled
    SchemaChangeResponse response = requestSchemaChange(tableId, schemaChangeEvent); // (1)
    if (response.isAccepted()) { // (3)
        LOG.info("{}> Sending the FlushEvent for table {}.", subTaskId, tableId);
        output.collect(new StreamRecord<>(new FlushEvent(tableId))); // (4)
        List<SchemaChangeEvent> expectedSchemaChangeEvents = response.getSchemaChangeEvents();
        schemaOperatorMetrics.increaseSchemaChangeEvents(expectedSchemaChangeEvents.size());
        // The request will block until flushing finished in each sink writer
        SchemaChangeResultResponse schemaEvolveResponse = requestSchemaChangeResult(); // (5)
        List<SchemaChangeEvent> finishedSchemaChangeEvents =
                schemaEvolveResponse.getFinishedSchemaChangeEvents();
        // Update evolved schema changes based on apply results
        finishedSchemaChangeEvents.forEach(e -> output.collect(new StreamRecord<>(e)));
    } else if (response.isDuplicate()) {
        LOG.info(
                "{}> Schema change event {} has been handled in another subTask already.",
                subTaskId,
                schemaChangeEvent);
    } else if (response.isIgnored()) {
        LOG.info(
                "{}> Schema change event {} has been ignored. No schema evolution needed.",
                subTaskId,
                schemaChangeEvent);
    } else {
        throw new IllegalStateException("Unexpected response status " + response);
    }
}

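requestSchemaChange is a blocking method (while (true)): it keeps sending a SchemaChangeRequest until the returned response is no longer busy, or until the RPC timeout expires. The SchemaChangeRequest it sends is constructed inside the loop: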

private SchemaChangeResponse requestSchemaChange(
        TableId tableId, SchemaChangeEvent schemaChangeEvent)
        throws InterruptedException, TimeoutException {
    long schemaEvolveTimeOutMillis = System.currentTimeMillis() + rpcTimeOutInMillis;
    while (true) {
        SchemaChangeResponse response =
                sendRequestToCoordinator(
                        new SchemaChangeRequest(tableId, schemaChangeEvent, subTaskId));
        if (response.isRegistryBusy()) { // (2)
            if (System.currentTimeMillis() < schemaEvolveTimeOutMillis) {
                LOG.info(
                        "{}> Schema Registry is busy now, waiting for next request...",
                        subTaskId);
                Thread.sleep(1000);
            } else {
                throw new TimeoutException("TimeOut when requesting schema change");
            }
        } else {
            return response;
        }
    }
}
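
The retry pattern in requestSchemaChange (ask, sleep while the registry is busy, give up at a deadline) can be reduced to a small standalone sketch. The names BusyRetrySketch, Response, and requestUntilNotBusy are hypothetical, and the coordinator is stood in for by a plain Supplier; this is an illustration of the loop shape, not the Flink CDC implementation.

```java
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

// Hypothetical sketch of the "retry while busy, give up after a deadline" loop.
class BusyRetrySketch {
    enum Response { BUSY, ACCEPTED, DUPLICATE, IGNORED }

    static Response requestUntilNotBusy(
            Supplier<Response> coordinator, long timeoutMillis, long sleepMillis)
            throws InterruptedException, TimeoutException {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (true) {
            Response response = coordinator.get();
            if (response != Response.BUSY) {
                // Accepted, duplicate, or ignored: the caller decides what to do next.
                return response;
            }
            if (System.currentTimeMillis() >= deadline) {
                throw new TimeoutException("Timed out while the coordinator stayed busy");
            }
            Thread.sleep(sleepMillis);
        }
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // The stand-in coordinator is busy for two requests and accepts the third.
        Response r = requestUntilNotBusy(
                () -> ++calls[0] < 3 ? Response.BUSY : Response.ACCEPTED, 5_000, 10);
        // Prints: ACCEPTED after 3 requests
        System.out.println(r + " after " + calls[0] + " requests");
    }
}
```

The loop blocks the calling thread on purpose: while a schema change is pending, the operator must not forward any further records for that stream.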

The request is actually delivered to SchemaRegistry#handleCoordinationRequest.

private <REQUEST extends CoordinationRequest, RESPONSE extends CoordinationResponse>
        RESPONSE sendRequestToCoordinator(REQUEST request) {
    try {
        CompletableFuture<CoordinationResponse> responseFuture =
                toCoordinator.sendRequestToCoordinator(
                        getOperatorID(), new SerializedValue<>(request));
        return CoordinationResponseUtils.unwrap(responseFuture.get());
    } catch (Exception e) {
        throw new IllegalStateException(
                "Failed to send request to coordinator: " + request.toString(), e);
    }
}


private SchemaChangeResultResponse requestSchemaChangeResult()
        throws InterruptedException, TimeoutException {
    CoordinationResponse coordinationResponse =
            sendRequestToCoordinator(new SchemaChangeResultRequest());
    long nextRpcTimeOutMillis = System.currentTimeMillis() + rpcTimeOutInMillis;
    while (coordinationResponse instanceof SchemaChangeProcessingResponse) { // (6) (7)
        if (System.currentTimeMillis() < nextRpcTimeOutMillis) {
            Thread.sleep(1000);
            coordinationResponse = sendRequestToCoordinator(new SchemaChangeResultRequest());
        } else {
            throw new TimeoutException("TimeOut when requesting release upstream");
        }
    }
    return ((SchemaChangeResultResponse) coordinationResponse);
}

This request is likewise received by org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry#handleCoordinationRequest, which handles it in the if (request instanceof SchemaChangeResultRequest) branch.




Calls to toCoordinator.sendRequestToCoordinator are received by handleCoordinationRequest. A SchemaChangeRequest enters the request instanceof SchemaChangeRequest branch, which calls handleSchemaChangeRequest.

public CompletableFuture<CoordinationResponse> handleCoordinationRequest(
        CoordinationRequest request) {
    CompletableFuture<CoordinationResponse> responseFuture = new CompletableFuture<>();
    runInEventLoop(
            () -> {
                try {
                    if (request instanceof SchemaChangeRequest) {
                        SchemaChangeRequest schemaChangeRequest = (SchemaChangeRequest) request;
                        requestHandler.handleSchemaChangeRequest(
                                schemaChangeRequest, responseFuture);
                    } else if (request instanceof SchemaChangeResultRequest) {
                        requestHandler.getSchemaChangeResult(responseFuture);
                    } else if (request instanceof GetEvolvedSchemaRequest) {
                        handleGetEvolvedSchemaRequest(
                                ((GetEvolvedSchemaRequest) request), responseFuture);
                    } else if (request instanceof GetOriginalSchemaRequest) {
                        handleGetOriginalSchemaRequest(
                                (GetOriginalSchemaRequest) request, responseFuture);
                    } else {
                        throw new IllegalArgumentException(
                                "Unrecognized CoordinationRequest type: " + request);
                    }
                } catch (Throwable t) {
                    context.failJob(t);
                    throw t;
                }
            },
            "handling coordination request %s",
            request);
    return responseFuture;
}


public void handleEventFromOperator(int subtask, int attemptNumber, OperatorEvent event) {
    runInEventLoop(
            () -> {
                try {
                    if (event instanceof FlushSuccessEvent) {
                        FlushSuccessEvent flushSuccessEvent = (FlushSuccessEvent) event;
                        LOG.info(
                                "Sink subtask {} succeed flushing for table {}.",
                                flushSuccessEvent.getSubtask(),
                                flushSuccessEvent.getTableId().toString());
                        requestHandler.flushSuccess(
                                flushSuccessEvent.getTableId(),
                                flushSuccessEvent.getSubtask(),
                                currentParallelism);
                    } else if (event instanceof SinkWriterRegisterEvent) {
                        requestHandler.registerSinkWriter(
                                ((SinkWriterRegisterEvent) event).getSubtask());
                    } else {
                        throw new FlinkException("Unrecognized Operator Event: " + event);
                    }
                } catch (Throwable t) {
                    context.failJob(t);
                    throw t;
                }
            },
            "handling event %s from subTask %d",
            event,
            subtask);
}


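(1) pendingSubTaskIds is empty -> continue processing.
The calculateDerivedSchemaChangeEvents method transforms the event according to Flink CDC's schema evolution policy; for example, it can cause an event to be ignored by returning an empty list.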

`schema.change.behavior` is of enum type, and could be set to `exception`, `evolve`, `try_evolve`, `lenient` or `ignore`.



/**
 * Handle the {@link SchemaChangeRequest} and wait for all sink subtasks flushing.
 *
 * @param request the received SchemaChangeRequest
 */
public void handleSchemaChangeRequest(
        SchemaChangeRequest request, CompletableFuture<CoordinationResponse> response) {
    // We use requester subTask ID as the pending ticket, because there will be at most 1 schema
    // change requests simultaneously from each subTask
    int requestSubTaskId = request.getSubTaskId();
    synchronized (schemaChangeRequestLock) {
        // Make sure we handle the first request in the pending list to avoid out-of-order
        // waiting and blocks checkpointing mechanism.
        if (schemaChangeStatus == RequestStatus.IDLE) {
            if (pendingSubTaskIds.isEmpty()) {
                LOG.info(
                        "Received schema change event request {} from table {} from subTask {}. Pending list is empty, handling this.",
                        request.getSchemaChangeEvent(),
                        request.getTableId().toString(),
                        requestSubTaskId);
            } else if (pendingSubTaskIds.get(0) == requestSubTaskId) {
                LOG.info(
                        "Received schema change event request {} from table {} from subTask {}. It is on the first of the pending list, handling this.",
                        request.getSchemaChangeEvent(),
                        request.getTableId().toString(),
                        requestSubTaskId);
                pendingSubTaskIds.remove(0);
            } else {
                LOG.info(
                        "Received schema change event request {} from table {} from subTask {}. It is not the first of the pending list ({}).",
                        request.getSchemaChangeEvent(),
                        request.getTableId().toString(),
                        requestSubTaskId,
                        pendingSubTaskIds);
                if (!pendingSubTaskIds.contains(requestSubTaskId)) {
                    pendingSubTaskIds.add(requestSubTaskId);
                }
                response.complete(wrap(SchemaChangeResponse.busy())); // (2)
                return;
            }
            SchemaChangeEvent event = request.getSchemaChangeEvent();
            // If this schema change event has been requested by another subTask, ignore it.
            if (schemaManager.isOriginalSchemaChangeEventRedundant(event)) {
                LOG.info("Event {} has been addressed before, ignoring it.", event);
                clearCurrentSchemaChangeRequest();
                LOG.info(
                        "SchemaChangeStatus switched from WAITING_FOR_FLUSH to IDLE for request {} due to duplicated request.",
                        request);
                response.complete(wrap(SchemaChangeResponse.duplicate()));
                return;
            }
            schemaManager.applyOriginalSchemaChange(event);
            List<SchemaChangeEvent> derivedSchemaChangeEvents =
                    calculateDerivedSchemaChangeEvents(request.getSchemaChangeEvent()); // (14)
            // If this schema change event is filtered out by LENIENT mode or merging table
            // route strategies, ignore it.
            if (derivedSchemaChangeEvents.isEmpty()) {
                LOG.info("Event {} is omitted from sending to downstream, ignoring it.", event);
                clearCurrentSchemaChangeRequest();
                LOG.info(
                        "SchemaChangeStatus switched from WAITING_FOR_FLUSH to IDLE for request {} due to ignored request.",
                        request);
                response.complete(wrap(SchemaChangeResponse.ignored()));
                return;
            }
            LOG.info(
                    "SchemaChangeStatus switched from IDLE to WAITING_FOR_FLUSH, other requests will be blocked.");
            // This request has been accepted.
            schemaChangeStatus = RequestStatus.WAITING_FOR_FLUSH; // (3)
            currentDerivedSchemaChangeEvents = new ArrayList<>(derivedSchemaChangeEvents);
            response.complete(wrap(SchemaChangeResponse.accepted(derivedSchemaChangeEvents))); // (3)
        } else {
            LOG.info(
                    "Schema Registry is busy processing a schema change request, could not handle request {} for now. Added {} to pending list ({}).",
                    request,
                    requestSubTaskId,
                    pendingSubTaskIds);
            if (!pendingSubTaskIds.contains(requestSubTaskId)) {
                pendingSubTaskIds.add(requestSubTaskId);
            }
            response.complete(wrap(SchemaChangeResponse.busy())); // (2)
        }
    }
}
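
The pending-list logic in handleSchemaChangeRequest works like a ticket queue: a subtask is served only when the handler is idle and that subtask is either alone or at the head of the list. The sketch below isolates just that rule. PendingTicketSketch, Reply, request, and finish are hypothetical names, and the duplicate/ignored branches of the real method are deliberately left out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the "pending ticket" ordering; not Flink CDC code.
class PendingTicketSketch {
    enum Reply { ACCEPTED, BUSY }

    private final List<Integer> pendingSubTaskIds = new ArrayList<>();
    private boolean idle = true;

    synchronized Reply request(int subTaskId) {
        if (idle) {
            if (pendingSubTaskIds.isEmpty()) {
                // Nobody is waiting: serve this subtask right away.
            } else if (pendingSubTaskIds.get(0) == subTaskId) {
                // This subtask holds the first ticket: serve it and drop the ticket.
                pendingSubTaskIds.remove(0);
            } else {
                // Someone else is ahead in the queue.
                enqueue(subTaskId);
                return Reply.BUSY;
            }
            idle = false;
            return Reply.ACCEPTED;
        }
        // A request is already in flight.
        enqueue(subTaskId);
        return Reply.BUSY;
    }

    // Marks the in-flight request as done so the next ticket can be served.
    synchronized void finish() {
        idle = true;
    }

    private void enqueue(int subTaskId) {
        if (!pendingSubTaskIds.contains(subTaskId)) {
            pendingSubTaskIds.add(subTaskId);
        }
    }

    public static void main(String[] args) {
        PendingTicketSketch handler = new PendingTicketSketch();
        System.out.println(handler.request(0)); // ACCEPTED
        System.out.println(handler.request(1)); // BUSY, subtask 1 gets the first ticket
        System.out.println(handler.request(2)); // BUSY, subtask 2 gets the second ticket
        handler.finish();
        System.out.println(handler.request(2)); // BUSY, subtask 1 is still ahead
        System.out.println(handler.request(1)); // ACCEPTED
    }
}
```

Serving requests in ticket order keeps a late retry from one subtask from overtaking a subtask that has been waiting longer.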


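  • FINISHED -> the handler resets its own state to IDLE and returns the finished result.
  • Not FINISHED -> the handler returns a SchemaChangeProcessingResponse; on receiving it, SchemaOperator#requestSchemaChangeResult stays blocked in its while loop and keeps polling.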
public void getSchemaChangeResult(CompletableFuture<CoordinationResponse> response) {
    Preconditions.checkState(
            schemaChangeStatus != RequestStatus.IDLE,
            "Illegal schemaChangeStatus: should not be IDLE before getting schema change request results.");
    if (schemaChangeStatus == RequestStatus.FINISHED) { // (12)
        schemaChangeStatus = RequestStatus.IDLE;
        LOG.info(
                "SchemaChangeStatus switched from FINISHED to IDLE for request {}",
                currentDerivedSchemaChangeEvents);
        // This request has been finished, return it and prepare for the next request
        List<SchemaChangeEvent> finishedEvents = clearCurrentSchemaChangeRequest();
        SchemaChangeResultResponse resultResponse =
                new SchemaChangeResultResponse(finishedEvents);
        response.complete(wrap(resultResponse));
    } else {
        // Still working on schema change request, waiting it
        response.complete(wrap(new SchemaChangeProcessingResponse()));
    }
}

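(1) The check if (activeSinkWriters.size() < parallelism) returns early while not all sink writers have registered yet.
(2) if (flushedSinkWriters.equals(activeSinkWriters)) means that every writer has finished flushing. The handler then sets its state to RequestStatus.APPLYING, meaning it is applying the schema change, and runs the applySchemaChange method.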

/**
 * Record flushed sink subtasks after receiving FlushSuccessEvent.
 *
 * @param tableId the subtask in SchemaOperator and table that the FlushEvent is about
 * @param sinkSubtask the sink subtask succeed flushing
 */
public void flushSuccess(TableId tableId, int sinkSubtask, int parallelism) {
    flushedSinkWriters.add(sinkSubtask);
    if (activeSinkWriters.size() < parallelism) {
        LOG.info(
                "Not all active sink writers have been registered. Current {}, expected {}.",
                activeSinkWriters.size(),
                parallelism);
        return;
    }
    if (flushedSinkWriters.equals(activeSinkWriters)) {
        Preconditions.checkState(
                schemaChangeStatus == RequestStatus.WAITING_FOR_FLUSH,
                "Illegal schemaChangeStatus state: should be WAITING_FOR_FLUSH before collecting enough FlushEvents, not "
                        + schemaChangeStatus);
        schemaChangeStatus = RequestStatus.APPLYING; // (9)
        LOG.info(
                "All sink subtask have flushed for table {}. Start to apply schema change.",
                tableId.toString());
        schemaChangeThreadPool.submit(
                () -> applySchemaChange(tableId, currentDerivedSchemaChangeEvents));
    }
}
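
The two checks in flushSuccess amount to comparing two sets: the writers that have registered and the writers that have reported a flush. The following sketch shows only that bookkeeping. FlushTrackerSketch and its methods are hypothetical names, and the state switch and the thread-pool submission of the real method are omitted.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of the flush bookkeeping; not Flink CDC code.
class FlushTrackerSketch {
    private final Set<Integer> activeSinkWriters = new HashSet<>();
    private final Set<Integer> flushedSinkWriters = new HashSet<>();

    void registerSinkWriter(int subtask) {
        activeSinkWriters.add(subtask);
    }

    /** Returns true once every registered writer has reported a successful flush. */
    boolean flushSuccess(int subtask, int parallelism) {
        flushedSinkWriters.add(subtask);
        if (activeSinkWriters.size() < parallelism) {
            // Some writers have not registered yet, so the picture is incomplete.
            return false;
        }
        return flushedSinkWriters.equals(activeSinkWriters);
    }

    public static void main(String[] args) {
        FlushTrackerSketch tracker = new FlushTrackerSketch();
        tracker.registerSinkWriter(0);
        tracker.registerSinkWriter(1);
        System.out.println(tracker.flushSuccess(0, 2)); // false, writer 1 is still pending
        System.out.println(tracker.flushSuccess(1, 2)); // true, all writers have flushed
    }
}
```

Comparing sets rather than counting events makes a repeated FlushSuccessEvent from the same subtask harmless.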


  • {@code MetadataApplier} is used to apply metadata changes to external systems.


/**
 * Apply the schema change to the external system.
 *
 * @param tableId the table need to change schema
 * @param derivedSchemaChangeEvents list of the schema changes
 */
private void applySchemaChange(
        TableId tableId, List<SchemaChangeEvent> derivedSchemaChangeEvents) {
    for (SchemaChangeEvent changeEvent : derivedSchemaChangeEvents) {
        if (changeEvent.getType() != SchemaChangeEventType.CREATE_TABLE) {
            if (schemaChangeBehavior == SchemaChangeBehavior.IGNORE) {
                currentIgnoredSchemaChanges.add(changeEvent);
                continue;
            }
        }
        if (!metadataApplier.acceptsSchemaEvolutionType(changeEvent.getType())) {
            LOG.info("Ignored schema change {} to table {}.", changeEvent, tableId);
            currentIgnoredSchemaChanges.add(changeEvent);
        } else {
            try {
                metadataApplier.applySchemaChange(changeEvent);
                LOG.info("Applied schema change {} to table {}.", changeEvent, tableId);
                schemaManager.applyEvolvedSchemaChange(changeEvent);
                currentFinishedSchemaChanges.add(changeEvent);
            } catch (Throwable t) {
                LOG.error(
                        "Failed to apply schema change {} to table {}. Caused by: {}",
                        changeEvent,
                        tableId,
                        t);
                if (!shouldIgnoreException(t)) {
                    currentChangeException = t;
                    break;
                } else {
                    LOG.warn(
                            "Failed to apply event {}, but keeps running in tolerant mode. Caused by: {}",
                            changeEvent,
                            t);
                }
            }
        }
    }
    Preconditions.checkState(
            schemaChangeStatus == RequestStatus.APPLYING,
            "Illegal schemaChangeStatus state: should be APPLYING before applySchemaChange finishes, not "
                    + schemaChangeStatus);
    schemaChangeStatus = RequestStatus.FINISHED;
    LOG.info(
            "SchemaChangeStatus switched from APPLYING to FINISHED for request {}.",
            currentDerivedSchemaChangeEvents);
}


// Schema change event state could transfer in the following way:  
//      -------- B --------  
//      |                 |  
//      v                 |  
//  --------           ---------------------  
//  | IDLE | --- A --> | WAITING_FOR_FLUSH |  
//  --------           ---------------------  
//     ^                        |  
//      E                       C  
//       \                      v  
//  ------------          ------------  
//  | FINISHED | <-- D -- | APPLYING |  
//  ------------          ------------  
//  A: When a request came to an idling request handler.  
//  B: When current request is duplicate or ignored by LENIENT / routed table merging  
// strategies.  
//  C: When schema registry collected enough flush success events, and actually started to apply  
// schema changes.  
//  D: When schema change application finishes (successfully or with exceptions)  
//  E: When current schema change request result has been retrieved by SchemaOperator, and ready  
// for the next request.  
private enum RequestStatus {
    IDLE,
    WAITING_FOR_FLUSH,
    APPLYING,
    FINISHED
}
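
The transition diagram in the comment above (edges A to E) can be written down as an explicit table of allowed moves. The sketch below does that with a guard that rejects any other move. RequestStatusSketch, ALLOWED, and transitionTo are hypothetical names; the real handler assigns schemaChangeStatus directly and guards individual steps with Preconditions.checkState instead.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the A-E transitions as a guarded state machine.
class RequestStatusSketch {
    enum Status { IDLE, WAITING_FOR_FLUSH, APPLYING, FINISHED }

    private static final Map<Status, Set<Status>> ALLOWED = new EnumMap<>(Status.class);

    static {
        // A: a request arrives at an idle handler.
        ALLOWED.put(Status.IDLE, EnumSet.of(Status.WAITING_FOR_FLUSH));
        // B: request was duplicate or ignored; C: all flush successes collected.
        ALLOWED.put(Status.WAITING_FOR_FLUSH, EnumSet.of(Status.IDLE, Status.APPLYING));
        // D: applying finished, successfully or with exceptions.
        ALLOWED.put(Status.APPLYING, EnumSet.of(Status.FINISHED));
        // E: the result has been retrieved by the operator.
        ALLOWED.put(Status.FINISHED, EnumSet.of(Status.IDLE));
    }

    private Status current = Status.IDLE;

    Status current() {
        return current;
    }

    void transitionTo(Status next) {
        if (!ALLOWED.get(current).contains(next)) {
            throw new IllegalStateException("Illegal transition " + current + " -> " + next);
        }
        current = next;
    }

    public static void main(String[] args) {
        RequestStatusSketch machine = new RequestStatusSketch();
        machine.transitionTo(Status.WAITING_FOR_FLUSH); // A
        machine.transitionTo(Status.APPLYING);          // C
        machine.transitionTo(Status.FINISHED);          // D
        machine.transitionTo(Status.IDLE);              // E
        System.out.println(machine.current());          // IDLE
    }
}
```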






public void processElement(StreamRecord<Event> element) throws Exception {
    Event event = element.getValue();
    // FlushEvent triggers flush
    if (event instanceof FlushEvent) {
        handleFlushEvent(((FlushEvent) event));
        return;
    }
    // CreateTableEvent marks the table as processed directly
    if (event instanceof CreateTableEvent) {
        processedTableIds.add(((CreateTableEvent) event).tableId());
        this.<OneInputStreamOperator<Event, CommittableMessage<CommT>>>getFlinkWriterOperator()
                .processElement(element);
        return;
    }
    // Check if the table is processed before emitting all other events, because we have to make
    // sure that sink have a view of the full schema before processing any change events,
    // including schema changes.
    ChangeEvent changeEvent = (ChangeEvent) event;
    if (!processedTableIds.contains(changeEvent.tableId())) {
        emitLatestSchema(changeEvent.tableId());
        processedTableIds.add(changeEvent.tableId());
    }
    processedTableIds.add(changeEvent.tableId());
    this.<OneInputStreamOperator<Event, CommittableMessage<CommT>>>getFlinkWriterOperator()
            .processElement(element);
}


  • flush: write all data received so far to the target database (comparable to a JDBC commit).
  • Send event: send a FlushSuccessEvent. For notifyFlushSuccess, see the class SchemaEvolutionClient.
private void handleFlushEvent(FlushEvent event) throws Exception {
    copySinkWriter.flush(false); // (8)
    schemaEvolutionClient.notifyFlushSuccess(
            getRuntimeContext().getIndexOfThisSubtask(), event.getTableId()); // (9)
}




public void notifyFlushSuccess(int subtask, TableId tableId) throws IOException {
    toCoordinator.sendOperatorEventToCoordinator(
            schemaOperatorID, new SerializedValue<>(new FlushSuccessEvent(subtask, tableId)));
}



Gateway to send an OperatorEvent or CoordinationRequest from a Task to the OperatorCoordinator JobManager side.
This is the first step in the chain of sending Operator Events and Requests from Operator to Coordinator. Each layer adds further context, so that the inner layers do not need to know about the complete context, which keeps dependencies small and makes testing easier.
  • OperatorEventGateway takes the event, enriches the event with the OperatorID, and forwards it to:
  • TaskOperatorEventGateway enriches the event with the ExecutionAttemptID and forwards it to the:
  • JobMasterOperatorEventGateway which is RPC interface from the TaskManager to the JobManager.
public interface TaskOperatorEventGateway {
    /**
     * Sends an event from the operator (identified by the given operator ID) to the operator
     * coordinator (identified by the same ID).
     */
    void sendOperatorEventToCoordinator(OperatorID operator, SerializedValue<OperatorEvent> event);

    /**
     * Sends a request from current operator to a specified operator coordinator which is identified
     * by the given operator ID and return the response.
     */
    CompletableFuture<CoordinationResponse> sendRequestToCoordinator(
            OperatorID operator, SerializedValue<CoordinationRequest> request);
}



/** {@code MetadataApplier} is used to apply metadata changes to external systems. */  
public interface MetadataApplier extends Serializable {
    /** Apply the given {@link SchemaChangeEvent} to external systems. */
    void applySchemaChange(SchemaChangeEvent schemaChangeEvent) throws SchemaEvolveException; // (10)

    /** Sets enabled schema evolution event types of current metadata applier. */
    default MetadataApplier setAcceptedSchemaEvolutionTypes(
            Set<SchemaChangeEventType> schemaEvolutionTypes) {
        return this;
    }

    /** Checks if this metadata applier should this event type. */
    default boolean acceptsSchemaEvolutionType(SchemaChangeEventType schemaChangeEventType) {
        return true;
    }

    /** Checks what kind of schema change events downstream can handle. */
    default Set<SchemaChangeEventType> getSupportedSchemaEvolutionTypes() {
        return Arrays.stream(SchemaChangeEventTypeFamily.ALL).collect(Collectors.toSet());
    }
}


org.apache.flink.cdc.connectors.doris.sink.DorisMetadataApplier implements MetadataApplier:


// (10)
public void applySchemaChange(SchemaChangeEvent event) throws SchemaEvolveException {
    try {
        // send schema change op to doris
        if (event instanceof CreateTableEvent) {
            applyCreateTableEvent((CreateTableEvent) event);
        } else if (event instanceof AddColumnEvent) {
            applyAddColumnEvent((AddColumnEvent) event);
        } else if (event instanceof DropColumnEvent) {
            applyDropColumnEvent((DropColumnEvent) event);
        } else if (event instanceof RenameColumnEvent) {
            applyRenameColumnEvent((RenameColumnEvent) event);
        } else if (event instanceof AlterColumnTypeEvent) {
            applyAlterColumnTypeEvent((AlterColumnTypeEvent) event);
        } else {
            throw new UnsupportedSchemaChangeEventException(event);
        }
    } catch (Exception ex) {
        throw new SchemaEvolveException(event, ex.getMessage(), null);
    }
}


private void applyAddColumnEvent(AddColumnEvent event)
        throws IOException, IllegalArgumentException {
    TableId tableId = event.tableId();
    List<AddColumnEvent.ColumnWithPosition> addedColumns = event.getAddedColumns();
    for (AddColumnEvent.ColumnWithPosition col : addedColumns) {
        Column column = col.getAddColumn();
        FieldSchema addFieldSchema =
                new FieldSchema(
                        column.getName(),
                        buildTypeString(column.getType()),
                        column.getDefaultValueExpression(),
                        column.getComment());
        schemaChangeManager.addColumn(
                tableId.getSchemaName(), tableId.getTableName(), addFieldSchema);
    }
}




public boolean addColumn(String database, String table, FieldSchema field)
        throws IOException, IllegalArgumentException {
    if (checkColumnExists(database, table, field.getName())) {
        LOG.warn(
                "The column {} already exists in table {}, no need to add it again",
                field.getName(),
                table);
        return true;
    }
    String tableIdentifier = getTableIdentifier(database, table);
    String addColumnDDL = SchemaChangeHelper.buildAddColumnDDL(tableIdentifier, field);
    return schemaChange(
            database, table, buildRequestParam(false, field.getName()), addColumnDDL);
}




// (11)
private static final String ADD_DDL = "ALTER TABLE %s ADD COLUMN %s %s";
public static String buildAddColumnDDL(String tableIdentifier, FieldSchema fieldSchema) {
    String name = fieldSchema.getName();
    String type = fieldSchema.getTypeString();
    String defaultValue = fieldSchema.getDefaultValue();
    String comment = fieldSchema.getComment();
    StringBuilder addDDL =
            new StringBuilder(
                    String.format(
                            ADD_DDL,
                            DorisSchemaFactory.quoteTableIdentifier(tableIdentifier),
                            DorisSchemaFactory.identifier(name),
                            type));
    if (defaultValue != null) {
        addDDL.append(" DEFAULT ").append(DorisSchemaFactory.quoteDefaultValue(defaultValue));
    }
    commentColumn(addDDL, comment);
    return addDDL.toString();
}
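
To make the shape of the generated statement concrete, here is a standalone sketch that assembles an ADD COLUMN statement from the same format string. AddColumnDdlSketch, quoteIdentifier, and quoteLiteral are hypothetical stand-ins with simplified quoting rules (backticks for identifiers, single quotes for literals); the real DorisSchemaFactory helpers and commentColumn may quote or escape differently.

```java
// Hypothetical sketch of ADD COLUMN DDL assembly; quoting rules are simplified assumptions.
class AddColumnDdlSketch {
    private static final String ADD_DDL = "ALTER TABLE %s ADD COLUMN %s %s";

    // Assumed identifier quoting: wrap in backticks, doubling any embedded backtick.
    static String quoteIdentifier(String name) {
        return "`" + name.replace("`", "``") + "`";
    }

    // Assumed literal quoting: wrap in single quotes, doubling any embedded quote.
    static String quoteLiteral(String value) {
        return "'" + value.replace("'", "''") + "'";
    }

    static String buildAddColumnDdl(
            String database,
            String table,
            String column,
            String type,
            String defaultValue,
            String comment) {
        String tableIdentifier = quoteIdentifier(database) + "." + quoteIdentifier(table);
        StringBuilder ddl =
                new StringBuilder(
                        String.format(ADD_DDL, tableIdentifier, quoteIdentifier(column), type));
        if (defaultValue != null) {
            ddl.append(" DEFAULT ").append(quoteLiteral(defaultValue));
        }
        if (comment != null && !comment.isEmpty()) {
            ddl.append(" COMMENT ").append(quoteLiteral(comment));
        }
        return ddl.toString();
    }

    public static void main(String[] args) {
        // Prints: ALTER TABLE `app_db`.`orders` ADD COLUMN `note` VARCHAR(64) DEFAULT 'n/a' COMMENT 'free text'
        System.out.println(
                buildAddColumnDdl("app_db", "orders", "note", "VARCHAR(64)", "n/a", "free text"));
    }
}
```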


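  • When SchemaOperator receives a SchemaChangeEvent, it sends a SchemaChangeRequest to SchemaRegistry.
  • SchemaRegistry delegates the work to SchemaRegistryRequestHandler (the "handler" below). The handler keeps a state field, schemaChangeStatus, which it uses to decide whether an earlier request is still being processed. If so, it returns a busy response. If not, it returns an accepted response and changes its state from RequestStatus.IDLE to RequestStatus.WAITING_FOR_FLUSH.
  • If SchemaOperator receives a busy response, it sleeps and sends the request again, blocking until it receives an accepted response. It then emits a FlushEvent downstream and sends a SchemaChangeResultRequest to SchemaRegistry. If the reply is a SchemaChangeProcessingResponse, the schema change has not finished yet, so it sleeps and asks again, blocking until it receives something other than a SchemaChangeProcessingResponse. While it is blocked, no data with the new table schema is sent downstream.
  • When SchemaRegistry receives a SchemaChangeResultRequest, the handler checks schemaChangeStatus. If it is not RequestStatus.FINISHED, the handler returns a SchemaChangeProcessingResponse.
  • DataSinkWriterOperator receives the FlushEvent and flushes, writing all data already received under the old table schema to the database. It then sends a FlushSuccessEvent to SchemaRegistry.
  • The handler collects the FlushSuccessEvents. Once it has received one from every subtask, it changes its state to RequestStatus.APPLYING and uses MetadataApplier to alter the table schema in the sink-side (external) database. When that is done, it changes its state to RequestStatus.FINISHED.
  • When SchemaOperator sends SchemaChangeResultRequest again and the handler's state is RequestStatus.FINISHED, SchemaRegistry returns a result that is not a SchemaChangeProcessingResponse. SchemaOperator stops blocking and resumes sending data with the new table schema downstream.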





