您的位置:首页 > 教育 > 锐评 > Merge-On-Read

Merge-On-Read

2025/4/4 14:29:49 来源:https://blog.csdn.net/blackjjcat/article/details/136855273  浏览:    关键词:Merge-On-Read

基本介绍

Iceberg的Merge-On-Read

Merge-On-Read,顾名思义,就是在读取的时候进行合并,是与Copy-On-Write相反的一种模式

在Iceberg中,Merge-On-Read同样用于行级更新,整体过程如下

当更新数据时,Iceberg不再Copy一份旧数据,而是直接将更新数据写入独立的一个文件用来标识需要删除的数据

这种模式减少了写入时的合并操作,但是加重了读取数据时的合并操作,因此适合写多读少的场景

Merge-On-Read类型

Iceberg的Merge-On-Read有两种类型,对应使用两种不同的方法来定位需要删除的数据,分别是:EqualityDelete和PositionDelete

EqualityDelete:等值删除。这种模式下,delete文件记录的内容跟数据文件一样,是行级的数据。进行读取合并时,使用指定的列做等值判断,进行数据的删除合并

PositionDelete:位置删除。这种模式下,delete文件记录的内容是需要删除的数据位置,即数据文件地址和数据行号。进行读取合并时,基于数据地址进行删除合并

目前Iceberg没有进行EqualityDelete和PositionDelete选择的配置,使用哪种模式就看API实现了哪个接口。比如配置Spark使用Merge-On-Read时,使用的就是PositionDelete

case MERGE_ON_READ:return new SparkPositionDeltaOperation(spark, table, branch, info, isolationLevel);

测试表现

执行如下测试

CREATE TABLE icedb.morTable (id bigint COMMENT 'unique id',data string)USING iceberg TBLPROPERTIES ('format-version'='2', 'write.update.mode' = 'merge-on-read');insert into icedb.morTable values (1, 'name1'),(99, 'name99');insert into icedb.morTable values (2, 'name1'),(88, 'name88');update icedb.morTable set data = 'update' WHERE id =1;
  1. snapshot:与Copy-On-Write表现一致
  2. manifest-list:这里存在差异,产生了一条content=1的数据,也就是被标记为deletes,其他三条数据都是content=0
  3. manifest:这里总共四个manifest(2条insert分别产生一个,一个update产生两个),这里四个manifest的status都为1(0=existing、1=added、2=deleted);注意的是data_file中的content字段,指向deletes文件的是1,其他为0(0=data, 1=position deletes, 2=equality deletes)
  4. datafile:主要关注update产生的两个文件,一个是正常的数据文件,存的是update后的数据(1, 'update' );另一个是position delete文件,存的是需要删除的文件地址和行号{"file_path": "hdfs://nameservice/spark/icedb.db/morTable/data/00000-0-52cbb7ca-6a2b-4f87-a2ba-fefa2470d5b8-00001.parquet", "pos": 0}

Spark Update更新流程

整体流程与Copy-On-Write是一样的,两者都是Iceberg用来做行级更新的,只是具体的实现过程不一样

扫描过程

第一步同样是做旧数据的扫描,根据update传入的where条件扫描数据

不同的是,构建BatchScan扫描器的时候,Merge-On-Read比Copy-On-Write少了一项设置:ignoreResiduals()

ignoreResiduals()的作用在Copy-On-Write中已经介绍过,是设置扫描过滤条件只应用到文件。也就是说,设置了此项的Copy-On-Write扫描的时候返回的是文件粒度的数据,会把不需要更新的数据也读出来重写一遍;而没有设置此项的Merge-On-Read只读取完全匹配的数据,并不会读取不需要更新的数据行来进行重写

这个的具体应用是在文件扫描任务BaseFileScanTask当中,构建的时候会设置这个参数

BaseFileScanTask baseFileScanTask =new BaseFileScanTask(dataFile, deleteFiles, ctx.schemaAsString(), ctx.specAsString(), ctx.residuals());

RewriteUpdateTable

Merge-On-Read走buildWriteDeltaPlan分支,生成的计划树如下

WriteIcebergDelta RelationV2[id#0L, data#1] spark_catalog.icedb.morTable spark_catalog.icedb.morTable+- UpdateRows[__row_operation#8, id#9L, data#10, _file#11, _pos#12L, _spec_id#13, _partition#14]+- Filter (id#0L = cast(1 as bigint))+- RelationV2[id#0L, data#1, _file#4, _pos#5L, _spec_id#2, _partition#3] spark_catalog.icedb.morTable spark_catalog.icedb.morTable

RowLevelCommandScanRelationPushDown

这步走第一个分支,rewritePlan为WriteIcebergDelta的分支,最终的计划树为

WriteIcebergDelta RelationV2[id#105L, data#106] spark_catalog.icedb.morTable spark_catalog.icedb.morTable+- UpdateRows[__row_operation#113, id#114L, data#115, _file#116, _pos#117L, _spec_id#118, _partition#119]+- Project [id#105L, data#106, _file#109, _pos#110L, _spec_id#107, _partition#108]+- Filter (isnotnull(id#105L) AND (id#105L = 1))+- RelationV2[id#105L, data#106, _file#109, _pos#110L, _spec_id#107, _partition#108] spark_catalog.icedb.morTable

ExtendedV2Writes

这一步走的是WriteIcebergDelta分支,最终的计划树为

UpdateIcebergTable [assignment(id#0L, id#0L), assignment(data#1, update2)], (id#0L = 1):- RelationV2[id#0L, data#1] spark_catalog.icedb.morTable spark_catalog.icedb.morTable+- WriteIcebergDelta+- UpdateRows[__row_operation#8, id#9L, data#10, _file#11, _pos#12L, _spec_id#13, _partition#14]+- Project [id#0L, data#1, _file#4, _pos#5L, _spec_id#2, _partition#3]+- Filter (isnotnull(id#0L) AND (id#0L = 1))+- RelationV2[id#0L, data#1, _file#4, _pos#5L, _spec_id#2, _partition#3] spark_catalog.icedb.morTab

WriteDeltaExec

最后同样进入ExtendedDataSourceV2Strategy,此处走WriteIcebergDelta分支,生成WriteDeltaExec(3.4这个类Spark集成了,所以不在Iceberg里了

WriteDeltaExec与ReplaceDataExec一样是V2ExistingTableWriteExec的子类,不同的是WriteDeltaExec里对writingTask进行了特殊的实现,分别是DeltaWritingSparkTask和DeltaWithMetadataWritingSparkTask

两个writingTask的实现类基本相同,核心逻辑是根据不同的操作类型(insert、update、delete)调用DeltaWriter的对应接口

这里的DeltaWriter就是前面SparkPositionDeltaOperation创建的SparkPositionDeltaWrite,目前测试看update语句走的是insert+delete的流程

insert就是正常的流程,SparkPositionDeltaWrite的delete接口接收数据,这个数据是包含两个字段filePath和position,最终把这个字段写入delete类型的文件当中,写delete文件的时候在result的字段会记录delete列表,最终commit提交的时候就有对应关系

看前面的计划树,返回的数据schema当中包含了(_file#4, _pos#5L),这个就是delete这边数据的来源

定位信息

前面说过,最终写入delete文件的是file、pos两个字段,这两个字段在构建计划树的时候已经放在了schema上,来源是RewriteUpdateTable.buildWriteDeltaPlan,也就是RewriteUpdateTable的SupportsDelta分支

分支会重写计划,重写计划的时候有一步resolveRowIdAttrs,这一步会提取相应字段,让最终的数据源返回上加上file和position信息

读取数据

首先在创建ManifestGroup的时候,会去构建deleteIndexBuilder,deleteIndexBuilder用于查询标记为delete的文件,其输入是deleteManifests。

Iceberg元数据可以单独获取对应的类型的Manifest,具体操作在build的时候,build当中会单独对EqualityDelete和PositionDelete做操作,操作相同,只是找的文件类型不同

最后在createFileScanTasks构建扫描任务的时候,会把DeleteFile信息放入扫描任务

最终生效在读取文件的时候。以Spark为例,在BatchDataReader当中,基于上诉生成的扫描任务,首先获取要读取的文件,然后获取对应的delete文件封装SparkDeleteFilter,最终传入reader函数

真正生效的地方在ColumnarBatchReader当中,调用读取的时候会略过传入的delete对应的点,这样就只读取有效函数,忽略delete数据

具体的逻辑是在spark里,首先传入delete文件当中的position单独读取要删除的数据;然后在spark的逻辑里,从数据集当中把读到的这个数据给删除掉,在BufferedRowIterator的next当中,有删除逻辑

public InternalRow next() {return currentRows.remove();}

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com