背景
最近偶然间看到了一篇文章一文掌握 Velox orderby 算子的排序算法,里面主要说到了Velox PrefixSort怎么用排序算饭加速大数据的排序的,其中有说到:
排序的过程,主要考虑霎三件事情:
1. 选择比较函数
2. 选择排序算法
3. 排序过程中的数据移动,移动数据或者移动指针?
最让我深有感触的是 这里面涉及到的比较函数,这里的主要思想就是:
把所有的类型的比较(无论是字符串还是整数等),都转换为二进制字符串的比较,那么这在比较的速度上就会能够充分利用硬件资源,使得加速。其中就会涉及到各个字段类型的规范化: 按照order by的顺序依次进行规范,如果遇到不能规范的字段类型,则后续的规范直接中断。
当然具体的Velox的代码我是没有去看,但是我们可以解析一下Spark中的Sort是怎么实现的,作为大数据的标杆组件,我们可以看一下,本文基于Spark 3.5
分析
直接切入到SortExec
类,其中有个createSorter
方法,这里会构建排序函数,我们这里的重点不是排序函数,而是比较函数的实现:
val ordering = RowOrdering.create(sortOrder, output)// The comparator for comparing prefixval boundSortExpression = BindReferences.bindReference(sortOrder.head, output)val prefixComparator = SortPrefixUtils.getPrefixComparator(boundSortExpression)val canUseRadixSort = enableRadixSort && sortOrder.length == 1 &&SortPrefixUtils.canSortFullyWithPrefix(boundSortExpression)// The generator for prefixval prefixExpr = SortPrefix(boundSortExpression)val prefixProjection = UnsafeProjection.create(Seq(prefixExpr))val prefixComputer = new UnsafeExternalRowSorter.PrefixComputer {private val result = new UnsafeExternalRowSorter.PrefixComputer.Prefixoverride def computePrefix(row: InternalRow):UnsafeExternalRowSorter.PrefixComputer.Prefix = {val prefix = prefixProjection.apply(row)result.isNull = prefix.isNullAt(0)result.value = if (result.isNull) prefixExpr.nullValue else prefix.getLong(0)result}}val pageSize = SparkEnv.get.memoryManager.pageSizeBytesrowSorter = UnsafeExternalRowSorter.create(schema, ordering, prefixComparator, prefixComputer, pageSize, canUseRadixSort)
先说 UnsafeExternalRowSorter
中 内存排序(UnsafeInMemorySorter)最基本的思想:先根据前缀比较算法进行比较,如果相等的话,则再遍历实际数据的指针去获取真正的数据进行比较,这种可以规避随机内存读取从而提交缓存的命中率,进而提高比较的速度。
再说 这里自定义的前缀比较:
-
BindReferences.bindReference(sortOrder.head, output) 这里指选择第一个排序的字段作为前缀比较的类型
-
val prefixComparator = SortPrefixUtils.getPrefixComparator(boundSortExpression)
这里会根据排序的字段类型选择出对应的排序方法:sortOrder.dataType match {case StringType => stringPrefixComparator(sortOrder)case BinaryType => binaryPrefixComparator(sortOrder)case BooleanType | ByteType | ShortType | IntegerType | LongType | DateType | TimestampType |TimestampNTZType | _: AnsiIntervalType =>longPrefixComparator(sortOrder)case dt: DecimalType if dt.precision - dt.scale <= Decimal.MAX_LONG_DIGITS =>longPrefixComparator(sortOrder)case FloatType | DoubleType => doublePrefixComparator(sortOrder)case dt: DecimalType => doublePrefixComparator(sortOrder)case _ => NoOpPrefixComparator}
最后 就只有两种前缀比较器 UnsignedPrefixComparator SignedPrefixComparator NoOpPrefixComparator
对于 String 以及Binary double float 这种会选择无符号的前缀比较
对于 double等基本数据类型会选择 有符号的前缀比较
这里为什么会这么选择,其实是跟内部的类型存储有关以及prefixExpr 和 prefixComputer
选择的Prefix
有关 -
计算前缀
主要涉及以下val prefixExpr = SortPrefix(boundSortExpression)val prefixProjection = UnsafeProjection.create(Seq(prefixExpr))val prefixComputer = new UnsafeExternalRowSorter.PrefixComputer
这里主要利用代码生成的方式,通过prefixProjection.apply(row) 这只拿了第一个sortOrder的表达式,所以是以第一个sort表达式来获取前缀比较的。
其中SortPrefix
中的方法calcPrefix
会根据Spark的内部类型,获取Long类型的可以用于比较的值,所以我们可以看到在prefixComputer
的computePrefix
方法中可以通过getLong(0)
来获取对应的值。这样在后续内存排序(UnsafeInMemorySorter)中就可以用该long值进行排序。
其他
这里特别说一下:两种类型的BinaryType(对应内部的类型为Array[Byte]) 和 StringType(对应的内部的类型为UTF8String) 获取prefix的.
注意UTF8String 内部也是以 Array[Byte]
存储的.
这两个都是通过ByteArray.getPrefix
方法来获取对应的值的。
其中 Platform.BYTE_ARRAY_OFFSET
调用UNSAFE.arrayBaseOffset(byte[].class)
获取数组第一个元素相对于数组起始地址的偏移量.
public static long getPrefix(byte[] bytes) {if (bytes == null) {return 0L;}return getPrefix(bytes, Platform.BYTE_ARRAY_OFFSET, bytes.length);}static long getPrefix(Object base, long offset, int numBytes) {
getPrefix
这个方法从字节数组取numBytes 个字节数之后组成Long类型返回。
其实byte的内部的底层也是按照数字存储的,取值范围是[-128,127],所以在底层转换为long也是可以的。