您的位置:首页 > 文旅 > 美景 > 小程序开发步骤大全_唐山软件开发公司排名_搜索指数的数据来源是什么_seo建站

小程序开发步骤大全_唐山软件开发公司排名_搜索指数的数据来源是什么_seo建站

2024/12/23 10:46:12 来源:https://blog.csdn.net/qq_43899283/article/details/142859193  浏览:    关键词:小程序开发步骤大全_唐山软件开发公司排名_搜索指数的数据来源是什么_seo建站
小程序开发步骤大全_唐山软件开发公司排名_搜索指数的数据来源是什么_seo建站

VecSortAgg

  • 概述
  • SortAggRunner::SortAggRunner 函数
  • SortAggRunner::init_phase 函数
  • SortAggRunner::init_indexForApFun 函数
  • SortAggRunner::set_key 函数
  • BaseAggRunner::initialize_sortstate 函数
  • SortAggRunner::BindingFp 函数
  • SortAggRunner::buildSortAgg 函数
  • SortAggRunner::set_group_num 函数
  • SortAggRunner::BatchMatchAndAgg 函数
  • SortAggRunner::Run 函数
  • SortAggRunner::GetSortSource 函数
  • SortAggRunner::GetSortSource 函数
  • SortAggRunner::ReturnData 函数
  • SortAggRunner::FreeSortGrpMem 函数
  • SortAggRunner::ReturnLastData 函数
  • SortAggRunner::ReturnNullData 函数
  • SortAggRunner::BuildNullScanBatch 函数
  • SortAggRunner::switch_phase 函数
  • SortAggRunner::ResetNecessary 函数
  • SortAggRunner::endSortAgg 函数

声明:本文的部分内容参考了他人的文章。在编写过程中,我们尊重他人的知识产权和学术成果,力求遵循合理使用原则,并在适用的情况下注明引用来源。
本文主要参考了 OpenGauss5.1.0 的开源代码和《OpenGauss数据库源码解析》一书

概述

  在 openGauss (OG) 中,VecSortAgg 是一种基于矢量化的排序聚合操作,它用于在执行 SQL 查询时高效地对数据进行分组和聚合。与传统的逐行处理不同,VecSortAgg 通过批量处理数据来提高计算效率,特别适用于大规模数据集的聚合操作。VecSortAgg 支持诸如 GROUP BYROLLUP复杂的分组和多阶段聚合操作。其实现中包含了排序、去重、聚合函数应用等步骤,且通过内存管理机制(如批处理缓存和排序器)优化了内存使用,确保计算的高效性和稳定性。

SortAggRunner::SortAggRunner 函数

  SortAggRunner::SortAggRunnerSortAggRunner 类的构造函数用于初始化排序聚合Sort Aggregation)操作的相关成员变量和资源SortAggRunner 继承自 BaseAggRunner,用于处理向量化的排序聚合逻辑,特别是在处理复杂聚合查询(如涉及 GROUP BYOLAP 函数)时。

/** @Description: Sort agg constructed function.* @描述: 构造SortAggRunner函数,用于处理聚合操作的排序。*/
SortAggRunner::SortAggRunner(VecAggState* runtime): BaseAggRunner(runtime, false),  // 调用基类BaseAggRunner的构造函数,传入runtime和false(表示不使用streaming aggregation)。m_FreeMem(false),  // 初始化m_FreeMem为false,表示是否释放内存。m_ApFun(false),  // 初始化m_ApFun为false,表示是否使用OLAP函数。m_noData(true),  // 初始化m_noData为true,表示当前没有数据。m_batchSortIn(NULL),  // 初始化m_batchSortIn为NULL,表示输入批处理为空。m_batchSortOut(NULL),  // 初始化m_batchSortOut为NULL,表示输出批处理为空。m_SortBatch(NULL),  // 初始化m_SortBatch为NULL,表示排序批处理对象为空。m_sortSource(NULL)  // 初始化m_sortSource为NULL,表示排序源为空。
{VecAgg* node = NULL;  // 声明一个VecAgg指针node并初始化为NULL。errno_t rc;node = (VecAgg*)(runtime->ss.ps.plan);  // 从runtime的计划状态中获取VecAgg节点。init_phase();  // 调用init_phase()函数初始化聚合的阶段信息。/* OLAP function */if (node->groupingSets) {  // 如果节点包含分组集(grouping sets),说明需要特殊处理。m_ApFun = true;  // 将m_ApFun标记为true,表示当前使用的是OLAP函数。init_indexForApFun();  // 初始化与OLAP函数相关的索引。if (m_runtime->numphases > 1) {  // 如果存在多个阶段(numphases > 1),需要保存每个阶段的返回结果。/* Store return results of phase sort. */m_SortBatch = New(CurrentMemoryContext)VectorBatch(CurrentMemoryContext, m_runtime->ss.ss_ScanTupleSlot->tts_tupleDescriptor);  // 分配内存并初始化m_SortBatch,用于保存阶段排序的结果批处理。}build_batch();  // 构建批处理,准备进行数据聚合操作。m_cellSize = offsetof(hashCell, m_val) + m_cols * sizeof(hashVal);  // 计算哈希单元的大小,用于存储聚合键值对。}m_groupState = GET_MATCH_KEY;  // 设置分组状态为GET_MATCH_KEY,表示正在寻找匹配的键值。/* Init sort state for distinct operate. */int64 workmem = SET_NODEMEM(node->plan.operatorMemKB[0], node->plan.dop);  // 计算用于排序操作的工作内存。int64 maxmem = (node->plan.operatorMaxMem > 0) ? SET_NODEMEM(node->plan.operatorMaxMem, node->plan.dop) : 0;  // 如果有最大内存限制,则设置最大内存,否则为0。initialize_sortstate(workmem, maxmem, node->plan.plan_node_id, SET_DOP(node->plan.dop));  // 初始化排序状态,传入工作内存、最大内存、节点ID和并行度(dop)。int numset = Max(m_runtime->maxsets, 1);  // 计算最大集合数,至少为1。m_sortGrps = (GroupintAtomContainer*)palloc0(numset * sizeof(GroupintAtomContainer));  // 分配并初始化用于存储分组信息的结构体数组。for (int i = 0; i < numset; i++) {  // 遍历每个集合进行初始化。rc = memset_s(m_sortGrps[i].sortGrp, 2 * BatchMaxSize * sizeof(hashCell*), 0, 2 * BatchMaxSize * sizeof(hashCell*));  // 将排序分组数组清零,确保没有旧数据残留。securec_check(rc, "\0", "\0");  // 检查memset操作是否成功。m_sortGrps[i].sortGrpIdx = 0;  // 初始化分组索引为0。m_sortGrps[i].sortCurrentBuf = New(CurrentMemoryContext) VarBuf(CurrentMemoryContext);  // 为当前排序分组分配新的缓冲区。m_sortGrps[i].sortCurrentBuf->Init();  // 初始化当前分组的缓冲区。m_sortGrps[i].sortBckBuf = New(CurrentMemoryContext) VarBuf(CurrentMemoryContext);  // 为备份缓冲区分配新的内存。m_sortGrps[i].sortBckBuf->Init();  // 初始化备份缓冲区。}m_runState = AGG_FETCH;  // 设置运行状态为AGG_FETCH,表示当前阶段为获取数据。m_prepareState = GET_SOURCE;  // 设置准备状态为GET_SOURCE,表示准备从数据源获取数据。BindingFp();  // 绑定所需的函数指针或回调函数。
}

示例

  假设我们有一个SQL查询,涉及到对大数据集的分组聚合,并且需要对聚合结果进行排序。这个查询可能涉及某种 OLAP (Online Analytical Processing) 函数,如GROUPING SETS,或对某些列执行DISTINCT操作。这时,SortAggRunner类的构造函数会负责初始化排序聚合的执行环境。

示例SQL查询:

SELECT department, COUNT(employee_id)
FROM employees
GROUP BY department
GROUPING SETS ((department), ())
ORDER BY department;

  在上述查询中,要求根据department进行聚合计算员工数量,并对结果进行排序。为了实现这个功能,数据库引擎需要在聚合阶段(GROUP BYGROUPING SETS)中进行排序。这时候,SortAggRunner类的构造函数将会被调用,确保为后续的排序聚合以及分组操作分配和初始化内存。

  1. 首先,构造函数会确定是否使用OLAP函数(通过GROUPING SETS)。
  2. 接着,会初始化用于存储中间结果的批处理(m_SortBatch),并分配内存。
  3. 它还会初始化不同的状态,如当前处理阶段的状态m_runState)以及排序状态initialize_sortstate())。
  4. 分配与分组和排序相关的内存结构m_sortGrps),并为每个集合(numset)准备独立的缓冲区,用来存储中间分组结果

SortAggRunner::init_phase 函数

  SortAggRunner::init_phase 函数的主要功能是初始化排序和阶段信息,为下一阶段的分组聚合操作做好准备。具体来说,如果当前阶段不是最后一个阶段,那么它将为下一阶段进行必要的排序初始化。这个函数确保在多阶段聚合操作中,数据可以按照正确的顺序和内存使用策略进行处理。它会设置排序所需的内存,并根据外部计划节点的元组描述符配置排序参数。

  1. 阶段切换: 如果当前阶段不是最后阶段,会为下一阶段的排序操作配置排序节点。
  2. 排序初始化: 根据指定的列、操作符等参数,使用堆排序算法创建排序器。
  3. 内存管理: 设置排序的工作内存和最大内存限制,确保后续操作在内存限制内执行。
/** @Description: 初始化排序和阶段信息,用于下一阶段的分组操作*/
void SortAggRunner::init_phase()
{/** 如果这不是最后一个阶段,我们需要为下一个阶段按适当顺序进行排序。* 检查当前阶段是否小于总阶段数的最后一个阶段。*/if (m_runtime->current_phase < m_runtime->numphases - 1) {// 获取下一阶段的排序节点Sort* sort_node = m_runtime->phases[m_runtime->current_phase + 1].sortnode;// 获取外部节点的计划状态PlanState* outer_node = outerPlanState(m_runtime);// 获取外部节点的元组描述符,描述数据结构TupleDesc tup_desc = ExecGetResultType(outer_node);// 设置工作内存的大小,基于排序节点计划中的内存设置和并行度int64 work_mem = SET_NODEMEM(sort_node->plan.operatorMemKB[0], sort_node->plan.dop);// 设置最大内存的限制,基于计划中的最大内存值int64 max_mem = (sort_node->plan.operatorMaxMem > 0) ? SET_NODEMEM(sort_node->plan.operatorMaxMem, sort_node->plan.dop) : 0;// 初始化批次排序对象,采用堆排序的方式,配置相关的排序信息m_batchSortOut = batchsort_begin_heap(tup_desc,                     // 外部节点的元组描述符sort_node->numCols,            // 排序的列数sort_node->sortColIdx,         // 排序列的索引sort_node->sortOperators,      // 排序操作符sort_node->collations,         // 排序的语言区域设置sort_node->nullsFirst,         // 空值是否排在前面work_mem,                      // 分配的工作内存false,                         // 是否进行唯一性检查(此处为否)max_mem                        // 最大内存限制);}// 确认当前阶段小于或等于阶段数Assert(m_runtime->current_phase <= m_runtime->numphases);// 如果当前阶段小于总阶段数,指向当前的阶段信息if (m_runtime->current_phase < m_runtime->numphases) {// 将阶段指针指向当前阶段m_runtime->phase = &m_runtime->phases[m_runtime->current_phase];}
}

示例

  假设我们有一个多阶段的聚合查询,如下所示:

SELECT region, product, SUM(sales) 
FROM sales_data 
GROUP BY GROUPING SETS ((region), (product));

  在这个查询中,GROUPING SETS 表示多阶段的分组操作SortAggRunner::init_phase() 函数会在每个阶段切换时执行,用于为下一阶段(例如按 product 分组)准备排序操作:

  1. 当前阶段处理完毕:当按 region 分组的阶段完成后,该函数会初始化下一阶段的排序逻辑。
  2. 排序初始化:为下一阶段按 product 分组准备排序操作,分配所需的工作内存和最大内存,并确保批次数据按照正确的顺序传递到下一阶段。

SortAggRunner::init_indexForApFun 函数

  函数 init_indexForApFun() 的主要作用是OLAP 聚合操作(特别是 GROUPING SETS 等高级聚合功能)初始化索引和必要的列映射。具体步骤包括:

  1. 列集合的初始化: 首先从运行时上下文 m_runtime 中获取所有参与分组的列和其他需要的列,将它们放入一个 Bitmapset 集合中。
  2. 列位置映射: 为每一列在批处理中的位置分配索引,生成 m_cellBatchMap,它将批处理中的列位置映射到 cell 中。
  3. 分配分组键的内存: 通过计算最大分组列的长度,为保存当前分组键的索引和 cell 中的列位置分配内存,并调用 set_key() 设置这些键。
  4. 初始化聚合信息: 最终,调用 init_aggInfo()初始化聚合操作的信息。

  该函数在处理复杂的 OLAP 查询时,确保正确地将分组列和其他必要的列映射到批处理的正确位置,并为后续的聚合执行准备所需的索引和内存。

/** @Description: 初始化Ap函数的索引,用于OLAP(Online Analytical Processing)聚合操作的索引设定。*/
void SortAggRunner::init_indexForApFun()
{int i = 0;int index = 0;ListCell* lc = NULL;Bitmapset* all_need_cols = NULL;/* 获取所有参与分组的列 */foreach (lc, m_runtime->all_grouped_cols) {int var_number = lfirst_int(lc) - 1;  // 从链表中获取列号,列号减1以匹配内部存储格式all_need_cols = bms_add_member(all_need_cols, var_number);  // 将该列添加到需要的列的集合中}/* 获取其他未参与分组但需要的列 */foreach (lc, m_runtime->hash_needed) {int var_number = lfirst_int(lc) - 1;  // 同样减1调整列号all_need_cols = bms_add_member(all_need_cols, var_number);  // 添加未参与分组但需要的列}m_cellVarLen = bms_num_members(all_need_cols);  // 计算需要的列的总数m_cellBatchMap = (int*)palloc(sizeof(int) * m_cellVarLen);  // 分配内存,保存列在批处理中位置的映射/** m_cellBatchMap 映射outbatch中的列位置到cell中。* 例如,m_cellBatchMap[0] = 3 表示:批处理的第3列存储在cell的第0列中*/while ((index = bms_first_member(all_need_cols)) >= 0) {  // 获取集合中每个列的位置m_cellBatchMap[i] = index;  // 将位置映射到m_cellBatchMap数组i++;}int max_group_col_len = 0;for (i = 0; i < m_runtime->numphases; i++) {  // 遍历所有聚合阶段,找到最大分组列数int group_col = m_runtime->phases[i].aggnode->numCols;  // 获取当前阶段的分组列数if (group_col > max_group_col_len) {max_group_col_len = group_col;  // 更新最大分组列数}}/* 保存当前需要处理的分组列,它们的值将在执行过程中设置 */m_keyIdx = (int*)palloc(sizeof(int) * max_group_col_len);  // 分配内存,保存分组列的索引m_keyIdxInCell = (int*)palloc(sizeof(int) * max_group_col_len);  // 分配内存,保存分组列在cell中的位置set_key();  // 设置分组键init_aggInfo(m_runtime->numaggs, m_runtime->aggInfo);  // 初始化聚合信息
}

示例

  假设我们有一个 SQL 查询,涉及复杂的分组集合操作,比如 GROUPING SETS

SELECT department, team, COUNT(employee_id)
FROM employees
GROUP BY GROUPING SETS ((department, team), (department), ());

  这个查询要求数据库首先根据 departmentteam 进行分组计算,然后根据 department 单独进行分组,最后计算所有员工的总数。为了支持这种复杂的分组逻辑,数据库引擎需要对不同的分组列建立映射,并将这些列的位置与批处理中的列进行关联。
  init_indexForApFun() 函数在此场景下会负责为 departmentteam 这些分组列初始化索引和列位置映射。具体来说,它会:

  1. departmentteam 列添加到需要处理的列集合中。
  2. 生成列在批处理中的位置映射,确保每次处理 GROUPING SETS 时能够正确地定位列。
  3. 分组键聚合操作分配内存并初始化,以便后续的聚合过程能够顺利执行。

SortAggRunner::set_key 函数

  此函数的主要作用是设置分组键(key),为当前聚合阶段准备好需要的分组列及其在批次中的映射。在执行分组聚合时,需要知道如何对元组进行分组,因此该函数从 m_runtime 中提取分组信息,包括列索引比较函数,将它们映射到内部的结构中,供后续的聚合处理使用。
  具体功能分为以下几部分:

  1. 初始化分组列索引:从当前聚合阶段 (phase) 中提取需要分组的列索引 (grpColIdx),这些列是分组操作的基础。由于索引从1开始,代码减去1以适应内部的存储结构。
  2. 映射分组列:通过 m_cellBatchMap 将这些分组列的索引映射到输出批次(outbatch)中,确保在执行时可以找到对应的列。
  3. 设置比较函数:最后,设置分组键的比较函数(eqfunctions),用于后续对这些键的相等性判断。
void SortAggRunner::set_key()
{int i = 0;// 获取当前阶段的分组列数量(gset_lengths[0])作为m_key的值m_key = m_runtime->phase->gset_lengths[0]; // 初始化 m_keyIdx:获取分组列的索引,将这些索引保存到 m_keyIdx 数组中,索引从1开始,因此需要减1for (i = 0; i < m_key; i++) {m_keyIdx[i] = m_runtime->phase->aggnode->grpColIdx[i] - 1;}// 查找这些分组列在输出批次(outbatch)中对应的位置for (i = 0; i < m_key; i++) {bool is_found = false;  // 用于标记当前列在 m_cellBatchMap 中是否找到匹配的列for (int k = 0; k < m_cellVarLen; k++) {// 如果分组列的索引在 m_cellBatchMap 中找到匹配,则记录该列的索引if (m_keyIdx[i] == m_cellBatchMap[k]) {is_found = true;m_keyIdxInCell[i] = k;  // 记录该分组列在 cell 中的索引位置}}// 确保分组列在 m_cellBatchMap 中找到对应的列Assert(is_found);}// 设置当前阶段的相等比较函数,m_eqfunctions 用于分组列的比较m_eqfunctions = m_runtime->phase->eqfunctions;
}

示例

  假设我们有一个名为 sales 的表,其结构如下:

CREATE TABLE sales (region VARCHAR,product VARCHAR,revenue INT,year INT
) with (orientation = column);

  现在,我们希望通过以下 SQL 查询对 sales 表的数据按 regionyear 进行分组,统计每组的总收入 (revenue):

SELECT region, year, SUM(revenue) 
FROM sales 
GROUP BY region, year;

  在执行计划中,这个查询会生成一个聚合节点(Agg),其分组列为 regionyear,并且会使用聚合函数 SUM(revenue) 计算每个分组的总收入。
  在执行这个查询时,set_key() 函数负责对聚合的分组列(regionyear)进行处理,确保它们能够在后续的聚合操作中被正确使用。下面是该函数如何处理这个 SQL 查询中的分组列的具体步骤:

  1. 确定分组列数量:

m_key = m_runtime->phase->gset_lengths[0]
对于该查询,分组列regionyear,所以 gset_lengths[0] 返回 2,表示有两列需要进行分组,即 regionyear。此时,m_key 会被设置为 2

  1. 设置分组列索引:

m_keyIdx[i] = m_runtime->phase->aggnode->grpColIdx[i] - 1
假设 grpColIdx 包含列的索引(region 的索引为 1year 的索引为 4),通过减1操作后,m_keyIdx 将设置为 [0, 3],表示我们将对第1列和第4列进行分组

  1. 映射分组列到批次中的位置:

m_keyIdxInCell[i] = k
假设 m_cellBatchMap 保存了批次中列的映射,例如批次中的列是 [0, 1, 2, 3](代表 region, product, revenue, year),m_keyIdxInCell 将被设置为 [0, 3],表示 region 对应于批次中的第0列,year 对应于批次中的第3列。

  1. 设置相等比较函数:

m_eqfunctions = m_runtime->phase->eqfunctions
这一步会为每个分组列设置相应的相等比较函数,例如对 regionyear 使用字符串和整数的相等比较函数,确保在后续的分组过程中可以正确判断分组键是否相等。

BaseAggRunner::initialize_sortstate 函数

  该函数的主要作用是在执行包含 DISTINCT 操作的聚合查询时,为每个聚合操作初始化排序状态。如果一个聚合操作需要对某些列进行去重(即 DISTINCT 操作),该函数会为每个阶段分配相关的批处理和排序状态,以便后续在执行聚合时能够正确地对结果进行去重和排序。
  函数首先遍历所有的聚合操作,检查是否有需要 DISTINCT 操作的列。如果有,则为每个聚合操作分配 SortDistinct 结构,其中包含去重所需的批处理和排序状态。

void BaseAggRunner::initialize_sortstate(int work_mem, int max_mem, int plan_id, int dop)
{int agg_no;VecAggStatePerAgg per_agg = m_runtime->pervecagg;  // 获取聚合操作的状态信息,存储了每个聚合的相关属性VecAggStatePerAgg per_agg_state = NULL;  // 初始化单个聚合状态指针为 NULL// 循环遍历每个聚合操作,检查是否有 distinct 操作for (agg_no = 0; agg_no < m_aggNum; agg_no++) {per_agg_state = &per_agg[agg_no];if (per_agg_state->numDistinctCols > 0) {m_hasDistinct = true;  // 如果当前聚合操作包含 distinct 列,则标记为 truebreak;  // 有 distinct 列,直接跳出循环}}// 如果有 distinct 操作,开始初始化 distinct 相关的排序状态if (m_hasDistinct) {/** 我们需要为最多的阶段数分配空间,而不是仅为第一个解析阶段分配空间,* 否则可能导致非法内存访问。*/int numset = Max(m_runtime->maxsets, 1);  // 计算最多的阶段数,确保至少有一个阶段m_sortDistinct = (SortDistinct*)palloc0(numset * sizeof(SortDistinct));  // 为 SortDistinct 结构分配内存// 遍历每个阶段,初始化 distinct 相关的批处理和排序状态for (int i = 0; i < numset; i++) {// 为每个聚合操作分配批处理和排序状态数组m_sortDistinct[i].aggDistinctBatch = (VectorBatch**)palloc(m_aggNum * sizeof(VectorBatch*));m_sortDistinct[i].batchsortstate = (Batchsortstate**)palloc0(m_aggNum * sizeof(Batchsortstate*));// 分配存储 distinct 最后值的数组m_sortDistinct[i].lastVal = (ScalarValue*)palloc0(m_aggNum * sizeof(ScalarValue));m_sortDistinct[i].lastValLen = (int*)palloc0(m_aggNum * sizeof(int));// 遍历每个聚合操作,初始化 distinct 相关的批处理和排序for (agg_no = 0; agg_no < m_aggNum; agg_no++) {per_agg_state = &per_agg[agg_no];// 如果当前聚合操作包含排序列,初始化相应的批处理和排序状态if (per_agg_state->numSortCols > 0) {// 初始化用于 distinct 聚合的批处理对象m_sortDistinct[i].aggDistinctBatch[agg_no] =New(CurrentMemoryContext) VectorBatch(CurrentMemoryContext, per_agg_state->evaldesc);// 初始化 distinct 的排序状态,使用批处理排序函数 `batchsort_begin_heap`m_sortDistinct[i].batchsortstate[agg_no] = batchsort_begin_heap(per_agg_state->evaldesc,per_agg_state->numSortCols,  // 排序列数per_agg_state->sortColIdx,  // 排序列的索引per_agg_state->sortOperators,  // 排序操作符per_agg_state->sortCollations,  // 排序的排序规则per_agg_state->sortNullsFirst,  // 排序时 NULL 是否排在最前work_mem,  // 分配的工作内存false,  // 不启用索引支持max_mem,  // 分配的最大内存plan_id,  // 当前执行计划的节点 IDdop);  // 并行度// 如果列是压缩类型,则为 lastVal 分配空间if (COL_IS_ENCODE(m_sortDistinct[i].aggDistinctBatch[agg_no]->m_arr->m_desc.typeId)) {m_sortDistinct[i].lastVal[agg_no] = PointerGetDatum((char*)palloc(100));  // 分配100字节存储 lastValm_sortDistinct[i].lastValLen[agg_no] = 100;  // 设置 lastVal 的长度为100字节}} else {// 如果当前聚合操作不包含排序列,则将排序状态和批处理对象设为 NULLm_sortDistinct[i].batchsortstate[agg_no] = NULL;m_sortDistinct[i].aggDistinctBatch[agg_no] = NULL;}}}}
}

示例

  假设有如下表格 sales

CREATE TABLE sales (region VARCHAR,product VARCHAR,revenue INT,year INT
) with (orientation = column);

  执行一个查询,对 regionproduct 进行去重,同时聚合 revenue

SELECT region, product, SUM(DISTINCT revenue) 
FROM sales 
GROUP BY region, product;

  这个查询的作用是按 regionproductsales 表进行分组,并对每组中的 revenue 进行去重后求和。函数 initialize_sortstate 处理了该查询中 SUM(DISTINCT revenue) 部分的逻辑,特别是对 DISTINCT revenue 列的去重排序

函数逻辑与查询对应关系

  1. 判断是否存在 DISTINCT 操作:

在函数开始时,遍历所有聚合操作 (m_aggNum),并通过 numDistinctCols 判断是否存在需要去重的列。这与查询中的 SUM(DISTINCT revenue) 直接对应,系统检测到 revenue 列需要去重。。

  1. 分配排序和去重的状态内存:

如果发现 DISTINCT 操作,函数为每个阶段分配内存 (palloc0),为 distinct 操作生成必要的 SortDistinct 数据结构。查询中的 SUM(DISTINCT revenue) 需要对revenue列进行排序并去重,因此会为 revenue 列分配批处理 (aggDistinctBatch) 和排序状态 (batchsortstate)。

  1. 去重操作的排序:

排序状态是通过 batchsort_begin_heap 初始化的。该排序器会按照传入的 sortColIdxsortOperators 对列进行排序。针对 SUM(DISTINCT revenue),排序器会确保在同一组 (regionproduct) 内对 revenue 列进行排序,然后去掉重复值

SortAggRunner::BindingFp 函数

  这段代码的作用是SortAggRunner 初始化相应的函数指针,根据不同的聚合查询情况(是否有 DISTINCT、是否使用简单 key)动态绑定合适的构建排序和聚合的函数。通过这种动态函数指针绑定机制,确保 SortAggRunner 在执行过程中选择正确的排序和聚合函数

  • 如果查询中包含 DISTINCT 聚合列,系统会选择带有 DISTINCT 处理逻辑的聚合函数。
  • 如果没有 DISTINCT 列,系统会选择标准的聚合函数
  • 如果有复杂的 key,例如多列分组,则会使用处理复杂 key 的函数。
  • 最后,通过判断是否有最终聚合操作,系统会选择合适的扫描批处理函数用于处理聚合后的结果。
void SortAggRunner::BindingFp()
{// 如果 key 是简单的(即没有复杂的 key 运算),选择不同的构建函数if (m_keySimple) {// 如果有 DISTINCT 操作,选择带有 DISTINCT 的构建函数if (m_hasDistinct) {m_buildSortFun = &SortAggRunner::buildSortAgg<true, true>; // 构建有简单 key 且有 DISTINCT 的排序聚合函数} else {m_buildSortFun = &SortAggRunner::buildSortAgg<true, false>; // 构建有简单 key 但没有 DISTINCT 的排序聚合函数}} else {// 如果 key 不是简单的,选择没有简单 key 的不同构建函数if (m_hasDistinct) {m_buildSortFun = &SortAggRunner::buildSortAgg<false, true>; // 构建无简单 key 但有 DISTINCT 的排序聚合函数} else {m_buildSortFun = &SortAggRunner::buildSortAgg<false, false>; // 构建无简单 key 且没有 DISTINCT 的排序聚合函数}}// 初始化构建扫描批处理的函数// 如果有最终聚合操作,则选择最终聚合的扫描批处理构建函数if (m_finalAggNum > 0)m_buildScanBatch = &BaseAggRunner::BuildScanBatchFinal; // 选择处理最终聚合结果的批处理函数elsem_buildScanBatch = &BaseAggRunner::BuildScanBatchSimple; // 否则选择简单批处理函数
}

SortAggRunner::buildSortAgg 函数

  SortAggRunner::buildSortAgg 模板函数用于处理向量批次中的分组和聚合操作。该函数根据 m_groupState 的状态来执行相应的操作,主要分为以下两种状态:

  1. GET_MATCH_KEY:负责获取当前分组号,如果所有分组都已处理完毕则返回退出。对于包含 Ap 函数(高级聚合函数)的查询,可能会需要为一个批次处理多个分组
  2. SORT_MATCH:负责对当前分组进行批次匹配和聚合操作,调用 BatchMatchAndAgg<simple, hashDistinct> 函数来进行具体的操作。处理完一个分组后,递增分组索引并返回到 GET_MATCH_KEY 状态继续处理。

  该函数可以处理多种场景,如:

  1. simple 参数用于指示是否为简单分组键
  2. hashDistinct 参数用于指示是否包含 DISTINCT 聚合操作。
template <bool simple, bool hashDistinct>
void SortAggRunner::buildSortAgg(VectorBatch* batch)
{int current_grp = 0;  // 初始化当前分组索引while (true) {  // 无限循环,直到手动返回switch (m_groupState) {  // 根据当前的分组状态执行不同的操作/** 获取分组列的数量。* 对于包含 Ap 函数的查询,可能需要为此批次处理多个分组。*/case GET_MATCH_KEY:// 如果无法设置当前的分组号(例如分组已经处理完毕),则退出if (!set_group_num(current_grp)) {return;  // 无法设置当前分组时,直接返回退出循环}m_groupState = SORT_MATCH;  // 设置成功后,转移到 SORT_MATCH 状态进行排序匹配break;case SORT_MATCH:// 执行批次的匹配和聚合操作BatchMatchAndAgg<simple, hashDistinct>(current_grp, batch);  // 根据当前分组,处理聚合和匹配current_grp++;  // 处理下一个分组m_groupState = GET_MATCH_KEY;  // 完成后,返回 GET_MATCH_KEY 状态准备处理下一分组break;default:// 如果进入了不支持的状态,则抛出错误ereport(ERROR, (errcode(ERRCODE_CASE_NOT_FOUND), errmsg("Unsupported state in vec sort agg")));break;}}
}

SortAggRunner::set_group_num 函数

  该函数的作用是根据当前的分组阶段分组次数,确定是否继续进行分组处理,并设置当前分组所使用的列数。函数会根据 m_ApFun 标志判断是否涉及复杂的多分组集情况。如果 m_ApFunfalse,表示简单分组,函数只处理一个分组集;如果 m_ApFuntrue,则根据当前阶段 phase 的设置逐个处理多个分组集
详细功能描述

  • 对于简单的 GROUP BY,只需要处理一次分组操作,函数会检查 current_grp 是否超过0,如果超过0则不再继续分组,返回 false停止分组操作
  • 对于复杂的 GROUP BY多分组集操作,函数会根据 current_grp 值来逐个处理不同的分组集。每个分组集的列数由 m_runtime->phase->gset_lengths[current_grp] 确定,并将这个列数赋给 m_key,以便后续操作使用。
/** @Description: 设置分组列数量,当在同一阶段切换分组时调用该函数* @in current_grp - 当前阶段的分组次数*/
bool SortAggRunner::set_group_num(int current_grp)
{// 简单的分组聚合,不涉及复杂的分组集(ApFun为false表示不涉及OLAP窗口函数或复杂分组)if (m_ApFun == false) {// 如果当前是第一个分组以外的分组,则不再需要处理,返回false表示结束分组处理if (current_grp > 0) {return false;}} else {// 对于复杂分组(ApFun为true),如果当前分组数达到了阶段的总分组数,则不再处理if (current_grp == m_runtime->phase->numsets) {return false;} else {// 设置当前分组列的长度,必须小于或等于当前阶段的分组集长度// 这里的gset_lengths保存了每个分组集的列数,current_grp指定了当前处理的是第几个分组集m_key = m_runtime->phase->gset_lengths[current_grp];return true; // 成功设置当前分组列的数量}}// 如果是简单分组,返回true表示还可以继续处理当前分组return true;
}

SortAggRunner::BatchMatchAndAgg 函数

  该函数用于在批处理聚合操作时,逐行处理数据,进行分组键的匹配以及聚合操作。根据是否启用了去重hashDistinct)和是否有简单分组simple),决定如何初始化和分配哈希单元,并对分组进行聚合计算。主要的操作包括:

  • 处理没有分组键的情况:如果没有分组键(m_key == 0),则直接对整个批次的数据进行聚合计算不做分组区分
  • 处理有分组键的情况:逐行检查当前行是否匹配已有分组键,如果匹配则将该行加入已有分组,如果不匹配切换到新的分组,并为该分组分配哈希单元。
  • 去重操作:如果启用了去重,则在分组键不匹配时,执行去重逻辑并对数据进行排序聚合
/** @Description: 匹配并执行聚合计算* @in current_grp - 当前阶段的分组次数* @in batch - 当前处理的数据批次*/
template <bool simple, bool hashDistinct>
void SortAggRunner::BatchMatchAndAgg(int current_grp, VectorBatch* batch)
{hashCell* cell = NULL;                     // 用于存储当前处理的哈希单元(存放分组结果)hashCell** sort_grp;                       // 当前分组的哈希单元数组(用于分组排序)int current_sortGrpIdx;                    // 当前分组排序索引int diff_group_row_num = 0;                // 用于记录分组切换时的行号int nrows = batch->m_rows;                 // 当前批次的总行数current_sortGrpIdx = m_sortGrps[current_grp].sortGrpIdx;  // 获取当前分组的排序索引m_sortCurrentBuf = m_sortGrps[current_grp].sortCurrentBuf; // 获取当前使用的缓冲区sort_grp = m_sortGrps[current_grp].sortGrp;               // 获取当前分组的哈希单元数组// 如果没有分组键(即m_key为0),处理整个批次数据,不需要分组if (m_key == 0) {for (int i = 0; i < nrows; i++) {cell = sort_grp[current_sortGrpIdx];  // 获取当前哈希单元// 如果当前哈希单元为空,则分配一个新的哈希单元if (cell == NULL) {cell = (hashCell*)m_sortCurrentBuf->Allocate(m_cellSize); // 分配内存sort_grp[current_sortGrpIdx] = cell;                      // 存储新哈希单元initCellValue<simple, true>(batch, cell, i);              // 初始化哈希单元的值}m_Loc[i] = cell;  // 将当前行的哈希单元存储到结果位置}} else {  // 如果有分组键for (int i = 0; i < nrows; i++) {cell = sort_grp[current_sortGrpIdx];  // 获取当前哈希单元bool matched_keys = false;  // 用于判断是否匹配当前分组键if (cell != NULL) {// 如果启用了JIT编译,则调用已生成的代码进行分组键匹配if (m_runtime->jitted_SortAggMatchKey) {typedef bool (*match_key_func)(SortAggRunner* tbl, VectorBatch* batch, int batchIdx, hashCell* cell);matched_keys = ((match_key_func)(m_runtime->jitted_SortAggMatchKey))(this, batch, i, cell);} else {// 否则使用默认的分组键匹配逻辑matched_keys = match_key<simple>(batch, i, cell);}}// 如果当前哈希单元为空或分组键匹配失败if (cell == NULL || matched_keys == false) {if (cell != NULL) {current_sortGrpIdx++;  // 切换到下一个分组// 如果分组索引达到批次最大大小,切换缓冲区if (current_sortGrpIdx == BatchMaxSize) {VarBuf* tmp = NULL;tmp = m_sortGrps[current_grp].sortCurrentBuf;m_sortGrps[current_grp].sortCurrentBuf = m_sortGrps[current_grp].sortBckBuf;m_sortGrps[current_grp].sortBckBuf = tmp;m_sortCurrentBuf = m_sortGrps[current_grp].sortCurrentBuf;}// 如果启用了去重if (hashDistinct) {m_sortDistinct[current_grp].m_distinctLoc = cell;  // 保存当前去重位置AppendBatchForSortAgg(batch, diff_group_row_num, i, current_grp);  // 追加批次数据BatchSortAggregation(current_grp, u_sess->attr.attr_memory.work_mem, false, m_runtime->ss.ps.plan->plan_node_id, SET_DOP(m_runtime->ss.ps.plan->dop));  // 执行批次排序聚合diff_group_row_num = i;  // 更新分组切换行号}}// 为新分组分配哈希单元并初始化cell = (hashCell*)m_sortCurrentBuf->Allocate(m_cellSize);sort_grp[current_sortGrpIdx] = cell;initCellValue<simple, true>(batch, cell, i);}Assert(cell != NULL);  // 确保哈希单元已分配m_Loc[i] = cell;  // 将当前行的哈希单元存储到结果位置}}// 更新当前分组的排序索引m_sortGrps[current_grp].sortGrpIdx = current_sortGrpIdx;// 如果启用了去重,执行追加和排序聚合if (hashDistinct) {m_sortDistinct[current_grp].m_distinctLoc = cell;AppendBatchForSortAgg(batch, diff_group_row_num, nrows, current_grp);BatchNoSortAgg(batch);} else {BatchAggregation(batch);  // 否则执行常规的批次聚合}
}

示例

  假设有如下 SQL 查询:

SELECT region, product, SUM(sales)
FROM sales_data
GROUP BY region, product;

  在此查询中,regionproduct分组键SUM(sales)聚合操作BatchMatchAndAgg 函数将处理来自 sales_data 表的批次数据,按 regionproduct 列对数据进行分组:

  1. 没有分组键的情况:如果查询不使用 GROUP BY(例如:SELECT SUM(sales) FROM sales_data)m_key == 0,函数将直接对整个批次数据进行求和
  2. 有分组键的情况:函数会遍历批次中的每一行,检查该行是否属于当前分组regionproduct 相同),如果是则将其加入该分组的哈希单元中,并执行聚合计算(累加 sales 值)。如果分组键不匹配,则开始处理新分组,并将该行数据作为新分组的起始数据。
  3. 去重:如果查询中使用了 DISTINCT,如 SELECT region, product, SUM(DISTINCT sales)函数将会在匹配分组键时执行去重操作

SortAggRunner::Run 函数

  函数 SortAggRunner::Run() 是整个分组和聚合操作的入口函数负责控制执行流程。它的主要功能是从数据源中获取数据,执行分组和聚合操作,并返回处理结果。函数通过一个状态机实现,按顺序进行获取数据源执行排序切换阶段等操作,直到所有数据处理完毕。

  1. 获取数据源:首先尝试从数据源(可能是左子树或外部数据)中获取批次数据。如果没有更多数据,则标记为完成并返回 NULL
  2. 执行排序和聚合:如果成功获取到数据源执行排序和聚合操作,并返回处理后的批次数据
  3. 阶段切换:如果存在多个阶段(如多阶段分组聚合),则在处理完当前阶段后切换到下一阶段,重新获取数据并继续处理
/** @Description: 入口函数。从这里开始执行聚合操作。* @return - 返回分组和聚合的结果。*/
VectorBatch* SortAggRunner::Run()
{// 无限循环,直到返回结果或结束操作for (;;) {switch (m_prepareState) {/* 获取数据源,可能是左子树或者排序状态 */case GET_SOURCE: {// 获取排序数据源(可能来自上游节点或外部源)m_sortSource = GetSortSource();// 如果数据源为空,表示没有更多数据,标记结束if (m_sortSource == NULL) {m_finish = true;return NULL; // 返回NULL表示没有更多数据}// 如果获取到数据源,进入排序阶段m_prepareState = RUN_SORT;break;}/* 执行分组和聚合操作 */case RUN_SORT: {// 执行排序操作,获取批次数据VectorBatch* batch = RunSort();// 如果批次数据为空if (BatchIsNull(batch)) {// 如果所有数据处理完成,返回NULLif (m_finish) {return NULL;}// 如果还没有结束,切换到下一阶段的解析m_prepareState = SWITCH_PARSE;} else {// 如果批次数据不为空,返回当前批次数据return batch;}break;}/* 如果存在多个阶段,切换到下一个阶段 */case SWITCH_PARSE: {// 切换到下一个阶段switch_phase();// 重新获取数据源,准备进入下一阶段m_prepareState = GET_SOURCE;break;}default:// 默认分支,处理不支持的状态break;}}
}

示例

  假设有如下 SQL 查询:

SELECT category, COUNT(*), SUM(sales) 
FROM sales_data 
GROUP BY category;

  在这个查询中,SortAggRunner::Run() 函数会按如下方式工作:

  1. 获取数据源:从 sales_data 中获取批次数据。
  2. 执行分组和聚合:对数据按照 category 列进行分组,并计算 COUNT(*)SUM(sales)
  3. 返回结果:将每个批次的聚合结果返回给调用者,直到所有数据处理完毕。

  如果查询涉及多阶段的分组,如 GROUPING SETS,该函数还会处理不同阶段的数据切换和排序操作。

SortAggRunner::GetSortSource 函数

  SortAggRunner::GetSortSource() 函数用于获取分组和聚合操作的数据源。数据源有两种可能:

  1. 来自已排序的批次数据:如果聚合操作已进行排序,那么函数会从内部排序结果中获取数据
  2. 来自外部子计划(左子树):如果没有内部排序结果,则函数从上游节点(通常是左子树)获取数据。

  该函数首先检查当前阶段是否已完成,如果已完成则返回 NULL,表示没有数据源可以获取。否则,它根据当前是否有内部排序结果来决定获取数据的方式,并返回相应的 hashSource 对象。

/** @Description: 获取数据源。数据可能来自左子树,或者是Ap函数中聚合自身排序的结果。*/
hashSource* SortAggRunner::GetSortSource()
{hashSource* ps = NULL;// 确保当前阶段索引不超过总阶段数Assert(m_runtime->current_phase <= m_runtime->numphases);// 如果当前阶段等于总阶段数,说明已经没有数据源可获取,返回NULLif (m_runtime->current_phase == m_runtime->numphases) {return NULL;}// 如果有排序输入数据(即m_batchSortIn不为空)else if (m_batchSortIn) {// 从已排序的批次数据创建一个新的排序源ps = New(CurrentMemoryContext) hashSortSource(m_batchSortIn, m_SortBatch);}// 否则,从外部计划状态(通常是左子树)获取数据源else {ps = New(CurrentMemoryContext) hashOpSource(outerPlanState(m_runtime));}// 返回数据源对象return ps;
}

SortAggRunner::GetSortSource 函数

  SortAggRunner::RunSort分组聚合操作的核心执行函数。它从数据源获取批次数据,执行分组和聚合计算,并根据不同的状态返回结果。函数的执行逻辑大致分为以下几个步骤:

AGG_FETCH: 从数据源获取批次数据并执行分组聚合。如果没有数据,它会进入处理结束批次或返回空批次的状态。
AGG_RETURN: 返回当前批次的聚合结果。如果有数据则返回,否则继续获取下一批数据。
AGG_RETURN_LAST: 返回最后一批数据,当所有批次处理完毕时进入此状态。
AGG_RETURN_NULL: 当数据源为空且分组列为空时,返回一个空批次。

  函数通过循环和状态切换来处理批次数据,直到所有数据处理完毕。

/** @Description: 从当前数据源中获取数据,执行分组和聚合操作,并返回计算结果。* @return - 分组和聚合的结果。*/
VectorBatch* SortAggRunner::RunSort()
{VectorBatch* outer_batch = NULL;  // 用于存储从外部数据源获取的批次数据VectorBatch* result_batch = NULL;  // 用于存储聚合后的结果int numset = Max(m_runtime->phase->numsets, 1);  // 当前阶段中的组集合数,默认为1Plan* plan = m_runtime->ss.ps.plan;  // 获取计划节点int64 workmem = SET_NODEMEM(plan->operatorMemKB[0], plan->dop);  // 分配运算内存,基于计划中的运算器内存和并行度int64 maxmem = (plan->operatorMaxMem > 0) ? SET_NODEMEM(plan->operatorMaxMem, plan->dop) : 0;  // 如果设置了最大内存,则获取最大内存,否则为0if (m_finish)return NULL;  // 如果聚合操作已完成,直接返回NULLResetExprContext(m_runtime->ss.ps.ps_ExprContext);  // 重置表达式上下文,准备执行新一轮计算for (;;) {switch (m_runState) {/* 获取数据,执行分组和聚合操作 */case AGG_FETCH:outer_batch = m_sortSource->getBatch();  // 从数据源获取当前批次数据if (unlikely(BatchIsNull(outer_batch))) {  // 如果没有数据,进入不同的状态处理if (m_noData) {  // 如果没有数据标志为真,直接返回空结果m_runState = AGG_RETURN_NULL;m_projected_set = 0;break;}if (m_hasDistinct) {  // 如果存在DISTINCT操作,执行最后一批数据的聚合操作for (int i = 0; i < numset; i++) {BatchSortAggregation(i, workmem, maxmem, plan->plan_node_id, SET_DOP(plan->dop));}}/* 返回最后一批数据 */m_projected_set = -1;m_runState = AGG_RETURN_LAST;break;}m_noData = false;  // 重置无数据标志if (m_batchSortOut) {  // 如果有批次排序输出,将批次数据放入排序缓冲区m_batchSortOut->sort_putbatch(m_batchSortOut, outer_batch, 0, outer_batch->m_rows);}InvokeFp(m_buildSortFun)(outer_batch);  // 调用构建排序聚合函数处理批次数据m_projected_set = -1;m_FreeMem = false;m_runState = AGG_RETURN;  // 准备返回数据break;/* 当返回行数超过1000时返回数据 */case AGG_RETURN:result_batch = ReturnData();  // 获取返回的批次数据if (result_batch != NULL) {return result_batch;  // 如果有结果批次,返回该批次} else {Assert(m_projected_set == numset);m_runState = AGG_FETCH;  // 否则继续获取数据}break;/* 返回最后一批数据,当 outer_batch 为 NULL 时 */case AGG_RETURN_LAST:result_batch = ReturnLastData();  // 返回最后一批数据if (result_batch != NULL) {return result_batch;} else {Assert(m_projected_set == numset);m_runState = AGG_FETCH;  // 否则继续获取数据return NULL;}break;/* 返回在分组列为空且左子树无数据时的结果 */case AGG_RETURN_NULL:result_batch = ReturnNullData();  // 返回空数据的批次if (result_batch != NULL) {return result_batch;} else {m_finish = true;  // 完成聚合操作return NULL;}default:break;}}
}

SortAggRunner::ReturnData 函数

  SortAggRunner::ReturnData 函数 ReturnData() 的主要功能是:

  • 遍历每个聚合组返回满足行数要求(如大于1000行)的聚合结果
  • 在处理每个聚合组时,检查该组是否有足够的数据。如果有,则生成一个结果批次并返回。
  • 如果没有足够的数据,函数会继续检查下一个聚合组
  • 如果所有聚合组都没有符合条件的数据,则返回 NULL,表示没有可以返回的数据。

  该函数的逻辑主要是为了应对大数据集的批量处理,当聚合组中的数据量达到指定大小(如1000行)时,生成一个批次返回,避免一次性处理过多数据导致内存溢出

/** @Description: 当行数大于1000时返回数据。* @return - 如果行数大于1000,返回分组和聚合的结果,否则返回null。*/
VectorBatch* SortAggRunner::ReturnData()
{VectorBatch* res_batch = NULL;  // 用于存储将要返回的结果批次。int numset = Max(m_runtime->phase->numsets, 1);  // 获取当前阶段的聚合组数,确保至少为1。while (m_projected_set < numset) {  // 循环处理每个聚合组。if (m_projected_set != -1 && m_FreeMem) {  // 如果当前组已经处理过,并且需要释放内存。Assert(m_projected_set >= 0);  // 确保当前聚合组的索引有效。/* 释放之前返回的数据所占用的内存 */FreeSortGrpMem<false>(BatchMaxSize);  // 释放之前批次的内存,防止内存泄漏。}m_projected_set++;  // 递增当前要处理的聚合组。m_runtime->projected_set = m_projected_set;  // 更新运行时信息中的聚合组编号。if (m_projected_set < numset) {  // 检查是否还有未处理的聚合组。int total_num = m_sortGrps[m_projected_set].sortGrpIdx;  // 获取当前组的排序组索引。m_FreeMem = false;  // 先设置不需要释放内存。if (total_num >= BatchMaxSize) {  // 如果当前组的元素数大于等于BatchMaxSize(1000行)。hashCell** sort = m_sortGrps[m_projected_set].sortGrp;  // 获取当前聚合组中的排序单元。for (int j = 0; j < BatchMaxSize; j++)  // 遍历当前聚合组的所有单元。InvokeFp(m_buildScanBatch)(sort[j]);  // 调用函数处理每个排序单元,生成批次数据。res_batch = ProducerBatch();  // 生成最终的批次数据。if (unlikely(BatchIsNull(res_batch))) {  // 检查是否生成了空的批次数据。m_FreeMem = true;  // 如果批次为空,标记需要释放内存。continue;  // 跳过当前组,处理下一个聚合组。} else {m_FreeMem = true;  // 如果生成了有效的批次数据,也标记释放内存。return res_batch;  // 返回生成的批次数据。}}}}return NULL;  // 如果没有符合条件的批次数据,返回NULL。
}

示例说明

  假设有一张包含销售数据的表 sales,你需要按月份对销售数据进行聚合,并返回每个月的销售总额。如果某个月的数据行数超过1000行,那么数据库系统会在达到1000行时生成一个批次并返回结果。此时,ReturnData() 函数会检查每个聚合组(例如,某个月的数据),如果数据行数达到1000行,就生成结果并返回。否则,继续检查下一个月份的数据。

SortAggRunner::FreeSortGrpMem 函数

  函数 FreeSortGrpMem 主要用于在聚合操作中释放内存。它通过两种方式来释放内存:

  1. 如果当前批次没有到达最终状态,函数会释放当前批次中部分排序组的内存并移动剩余的排序组
  2. 如果是最后一次调用,则清除整个排序组

  此操作主要是为了在处理大批量数据时,及时释放不再需要的数据,减少内存消耗,防止系统崩溃。

/** @Description: 释放返回批次占用的内存。*/
template <bool lastCall>
void SortAggRunner::FreeSortGrpMem(int num)
{int i, j, idx;  // 定义循环计数器和索引变量。Oid type_id;  // 用于存储当前列的数据类型ID。hashVal temp_val;  // 用于存储临时哈希值。FmgrInfo* final_flinfo = NULL;  // 用于存储最终聚合函数信息。FmgrInfo* agg_flinfo = NULL;  // 用于存储聚合函数信息。void* value_loc = NULL;  // 用于存储待释放的值指针。errno_t rc;  // 存储错误返回码。/* 找到聚合列并释放 m_sortGrp 中的值 */for (i = 0; i < m_aggNum; i++) {  // 遍历所有聚合列。/** 如果列是向量化的 AVG 类型,并且数据类型是 int1/int2/int4/float4/float8,* 则无需释放内存,因为这些内存在 ecxt_per_tuple_memory 中,它们会被自动重置。*/final_flinfo = m_runtime->aggInfo[i].vec_final_function.flinfo;  // 获取列的最终聚合函数信息。agg_flinfo = m_runtime->aggInfo[i].vec_agg_function.flinfo;  // 获取列的聚合函数信息。type_id = agg_flinfo->fn_rettype;  // 获取聚合函数返回值的数据类型。// 如果类型是 INT8ARRAYOID 或 FLOAT8ARRAYOID,则跳过该列,继续下一个列的处理。if (final_flinfo != NULL && ((type_id == INT8ARRAYOID) || (type_id == FLOAT8ARRAYOID))) {continue;}// 如果列是编码类型,则需要释放内存。if (COL_IS_ENCODE(type_id)) {idx = m_aggIdx[i];  // 获取聚合列在索引数组中的位置。for (j = 0; j < num; j++) {  // 遍历批次中的每一行。temp_val = ((hashCell*)(m_sortGrps[m_projected_set].sortGrp[j]))->m_val[idx];  // 获取当前行的值。if (NOT_NULL(temp_val.flag)) {  // 如果值非空,则释放其占用的内存。value_loc = DatumGetPointer(temp_val.val);  // 获取值的位置指针。if (value_loc != NULL)pfree_ext(value_loc);  // 释放值指向的内存。}}}}// 如果不是最后一次调用,则重排剩余的排序组数据。if (!lastCall) {int remain_grp = m_sortGrps[m_projected_set].sortGrpIdx - BatchMaxSize + 1;  // 计算剩余的分组数量。hashCell** sort = m_sortGrps[m_projected_set].sortGrp;  // 获取当前分组的排序单元。// 将剩余的分组元素移动到前面。for (i = 0; i < remain_grp; i++)sort[i] = sort[BatchMaxSize + i];const int free_len = 2 * BatchMaxSize - remain_grp;  // 计算需要清零的元素个数。// 将多余的排序组位置清零。rc = memset_s(&sort[remain_grp], free_len * sizeof(hashCell*), 0, free_len * sizeof(hashCell*));securec_check(rc, "\0", "\0");  // 检查内存设置操作的结果。/* 重置排序组的索引 */m_sortGrps[m_projected_set].sortGrpIdx = remain_grp - 1;// 重置排序组的备份缓冲区。m_sortGrps[m_projected_set].sortBckBuf->Reset();// 重置扫描批次。m_scanBatch->Reset();} else {// 如果是最后一次调用,清空整个排序组。rc = memset_s(m_sortGrps[m_projected_set].sortGrp,2 * BatchMaxSize * sizeof(hashCell*),0,2 * BatchMaxSize * sizeof(hashCell*));securec_check(rc, "\0", "\0");  // 检查内存设置操作的结果。// 重置排序组的索引。m_sortGrps[m_projected_set].sortGrpIdx = 0;/* 释放排序组的备份缓冲区 */m_sortGrps[m_projected_set].sortBckBuf->Reset();/* 重置扫描批次 */m_scanBatch->Reset();}
}

示例

  假设你在一个数据库中执行如下 SQL 查询:

SELECT city, SUM(sales)
FROM sales_data
GROUP BY city
ORDER BY city;

  这个查询的目的是sales_data 表中的销售数据按城市进行分组,并计算每个城市的销售总额。然后按城市名称进行排序
  当我们在执行这样的查询时,数据库会按城市对数据进行分组并逐行累加城市的销售额。同时,为了确保查询的顺利执行,数据库在处理大数据集时需要进行内存管理,以防止内存溢出。这时就需要 FreeSortGrpMem() 函数来释放已处理完成的数据占用的内存
  步骤示例:

  1. 数据库从 sales_data 表中获取每个城市的销售记录,并按城市进行分组。
  2. 在处理每个批次数据时,城市的销售额被逐步累加。
  3. 当某个批次的数据处理完后(比如某一批次中包含1000个城市的销售数据), FreeSortGrpMem() 函数会被调用,释放已经处理完的批次数据的内存。

SortAggRunner::ReturnLastData 函数

  函数 ReturnLastData() 的作用是在处理 SQL 查询中的最后一批数据时,返回最后一批的聚合和分组结果。它通过逐批处理分组数据,并在每个批次处理完后释放内存,确保内存使用不会过高。在没有数据的批次中,会跳过不处理,直到找到并返回非空的批次结果。

/** @Description: 返回最后一个批次的数据。* @return - 返回最后一个批次的数据。*/
VectorBatch* SortAggRunner::ReturnLastData()
{VectorBatch* res_batch = NULL; // 初始化返回的批次数据为NULLint numset = Max(m_runtime->phase->numsets, 1); // 获取聚合操作中要处理的批次数,默认为1// 循环处理所有的批次数据while (m_projected_set < numset) {// 如果 m_projected_set 不为-1,表示已经处理过一部分批次数据if (m_projected_set != -1) {Assert(m_projected_set >= 0); // 确保 m_projected_set 有效// 释放已经返回数据时使用的内存FreeSortGrpMem<true>(m_sortGrps[m_projected_set].sortGrpIdx);}m_projected_set++; // 处理下一个批次m_runtime->projected_set = m_projected_set; // 更新当前运行时状态中的 projected_set// 如果当前批次编号还在范围内,继续处理if (m_projected_set < numset) {int total_num = m_sortGrps[m_projected_set].sortGrpIdx; // 获取当前批次的分组索引// 如果当前分组为空,跳过继续处理if (total_num == 0 && m_sortGrps[m_projected_set].sortGrp[0] == NULL) {continue;} else {hashCell** sort = m_sortGrps[m_projected_set].sortGrp; // 获取当前批次的分组数据// 遍历当前批次的所有分组for (int i = 0; i <= total_num; i++) {InvokeFp(m_buildScanBatch)(sort[i]); // 调用函数构建扫描批次}res_batch = ProducerBatch(); // 生成批次结果if (BatchIsNull(res_batch)) { // 如果批次为空,继续处理continue;} else {return res_batch; // 返回生成的批次数据}}}}return NULL; // 如果所有批次处理完毕,返回NULL
}

示例

  假设有一个复杂的 SQL 查询,带有聚合排序操作:

SELECT region, SUM(sales)
FROM sales_data
GROUP BY region
ORDER BY region;

  在这个 SQL 中,GROUP BYORDER BY 操作可能会涉及多次批次处理。在查询执行过程中,数据库会分批次地进行聚合(计算每个区域的销售总和)和排序,可能处理上千行数据。当所有批次的主要数据处理完后,ReturnLastData() 会负责处理和返回最后一批的数据结果

SortAggRunner::ReturnNullData 函数

  ReturnNullData 函数用于在一种特定情况下返回一行数据:==当“group by”列为空且从左子树(lefttree)获取不到数据时。==它通过检查 gset_lengths 来判断当前分组是否包含数据,如果分组长度为0,则需要返回一个批次数据,该批次只包含一行空值,确保即使没有数据也能有一个默认的返回值。

/** @Description: 当“group by”列为空且左子树返回的数据为空时,我们需要返回一行数据。* 该函数主要用于在左子树没有数据的情况下处理聚合操作,确保在这种情况下也能返回一行结果。* @return - 返回处理结果。如果没有数据,返回NULL。*/
VectorBatch* SortAggRunner::ReturnNullData()
{VectorBatch* result_batch = NULL; // 初始化返回批次为空int numset = Max(m_runtime->phase->numsets, 1); // 获取当前阶段的批次数,默认为1// 当 gset_lengths 数组存在时,遍历当前批次,直到所有分组数据处理完毕while (m_runtime->phase->gset_lengths && m_projected_set < numset) {// 如果当前分组长度为0,则该分组不含数据if (0 == m_runtime->phase->gset_lengths[m_projected_set]) {// 如果 m_scanBatch 没有数据,则需要构建一个包含空值的批次if (m_scanBatch->m_rows == 0) {BuildNullScanBatch(); // 构建一个包含空值的批次数据}// 更新当前批次编号m_runtime->projected_set = m_projected_set;result_batch = ProducerBatch(); // 生成当前批次的结果m_projected_set++; // 处理下一个批次// 如果生成的批次非空,则返回该批次if (!BatchIsNull(result_batch)) {return result_batch;} else {// 如果结果为空,则返回 NULLreturn NULL;}}// 处理下一个批次m_projected_set++;}return NULL; // 如果所有批次处理完毕,返回NULL
}

示例

  假设我们有一个如下的 SQL 查询:

SELECT SUM(salary), department 
FROM employees 
GROUP BY department;

  在某些情况下,如果某个 department 列为空且没有数据从表中返回,那么根据 ReturnNullData 的逻辑,它将返回一行默认的空值作为聚合结果

SortAggRunner::BuildNullScanBatch 函数

  BuildNullScanBatch 函数用于生成一个包含一行空值数据的批次,并将所有列的值设置为 NULL。此外,它还会对批次中的聚合列进行初始化,确保聚合函数可以处理没有有效数据的情况。通常用于聚合查询中,当“group by”列为空且左子树没有返回任何数据时,保证查询能够返回一行结果。

/** @Description: 设置一行数据,该行的数据值全为 NULL,同时初始化聚合函数的值。* 该函数用于在特定情况下生成一个仅包含空值的批次数据,用于应对没有有效数据的情况。* @return - 无返回值,直接操作批次数据。*/
void SortAggRunner::BuildNullScanBatch()
{int i;int nrows = m_scanBatch->m_rows;  // 获取当前批次的行数int scan_batch_cols = m_cellVarLen + m_aggNum;  // 批次列数,等于聚合列数和非聚合列数之和int col_idx = 0;  // 当前列的索引ScalarVector* vector = NULL;  // 用于操作批次中的数据列// 遍历每一列,设置当前批次数据的值为 NULLfor (i = 0; i < scan_batch_cols; i++) {vector = &m_scanBatch->m_arr[i];  // 获取列数据向量SET_NULL(vector->m_flag[nrows]);  // 将该行设置为 NULL 值vector->m_rows++;  // 增加列的行数}// 遍历每一个聚合列,初始化聚合值for (i = 0; i < m_aggNum; i++) {if (m_aggCount[i]) {  // 如果该聚合函数存在vector = &m_scanBatch->m_arr[col_idx + m_cellVarLen];  // 获取当前聚合列的向量vector->m_vals[nrows] = 0;  // 初始化聚合值为 0SET_NOTNULL(vector->m_flag[nrows]);  // 设置该行的聚合值为非 NULL}col_idx++;  // 移动到下一个列}m_scanBatch->m_rows++;  // 增加批次的总行数
}

SortAggRunner::switch_phase 函数

  switch_phase 函数的主要作用是在多阶段聚合操作中进行阶段切换。每个阶段可能涉及不同的分组列排序操作聚合函数计算。这个函数负责结束当前阶段的排序和聚合操作,并为下一阶段的操作做好准备。它确保在不同聚合阶段之间平滑过渡,同时释放不再需要的资源。

/** @Description: 切换到下一个阶段。* 这个函数负责在多个聚合阶段之间进行切换,每个阶段对应不同的分组列和聚合操作。*/
void SortAggRunner::switch_phase()
{/* 如果当前有正在进行的批次排序输入,结束当前排序并释放资源 */if (m_batchSortIn) {batchsort_end(m_batchSortIn);  // 结束当前的批次排序,释放相关资源m_batchSortIn = NULL;  // 将指针置为空,表示当前阶段完成}/* 如果有批次排序输出,启动新的排序阶段 */if (m_batchSortOut) {m_batchSortIn = m_batchSortOut;  // 将输出赋值给输入,准备下一阶段的排序操作batchsort_performsort(m_batchSortIn);  // 执行排序操作m_batchSortOut = NULL;  // 将输出置为空,表示当前排序已完成}m_runtime->current_phase++;  // 增加当前的聚合阶段,切换到下一阶段init_phase();  // 初始化新阶段的环境/* 如果还有更多阶段,设置下一个阶段的分组列 */if (m_runtime->current_phase < m_runtime->numphases) {set_key();  // 设置新的分组列,用于下一阶段的聚合计算}
}

SortAggRunner::ResetNecessary 函数

  该函数的主要作用是重置 SortAggRunner 中的相关状态和内存,以便准备新的聚合操作。它通过清理先前的排序器和排序组,将状态标记为未完成,并重新初始化运行阶段。在执行聚合操作之前,需要通过此函数重置所有状态和内存,以确保在新一轮聚合操作中数据的一致性和正确性。
  具体功能细节:

  1. 状态重置函数将 m_finish 和其他相关的状态变量重置为初始状态
  2. 内存释放释放并结束批处理排序器的资源,避免内存泄漏。
  3. 重新初始化current_phase(当前运行阶段)设为初始值,并通过 init_phaseset_key 函数重新配置运行环境
  4. 清空排序组通过 memset_s 函数清空排序组的内存重置索引和缓冲区,确保每个聚合阶段开始时有干净的环境。
  5. 返回状态函数最终返回 true,表示重置成功,可以继续进行新的聚合操作。
bool SortAggRunner::ResetNecessary(VecAggState* node)
{// 将聚合执行状态标记为未完成m_finish = false;// 设置准备状态为获取数据源m_prepareState = GET_SOURCE;// 将运行状态设置为聚合获取数据m_runState = AGG_FETCH;// 标记为无数据状态m_noData = true;// 初始化内存释放标志为falsem_FreeMem = false;// 如果输入批处理排序器存在,则释放并结束排序if (m_batchSortIn) {batchsort_end(m_batchSortIn);m_batchSortIn = NULL;}// 如果输出批处理排序器存在,则释放并结束排序if (m_batchSortOut) {batchsort_end(m_batchSortOut);m_batchSortOut = NULL;}// 重置当前的运行阶段为初始阶段m_runtime->current_phase = 0;// 初始化当前阶段init_phase();// 如果聚合函数存在,设置键值用于排序if (m_ApFun) {set_key();}// 获取最大聚合集的数量,至少为1int numset = Max(m_runtime->maxsets, 1);// 遍历每个聚合集,重置相关的排序组for (int i = 0; i < numset; i++) {// 将当前排序组的内存清空为0errno_t rc = memset_s(m_sortGrps[i].sortGrp, 2 * BatchMaxSize * sizeof(hashCell*), 0, 2 * BatchMaxSize * sizeof(hashCell*));// 检查内存操作是否成功securec_check(rc, "\0", "\0");// 重置排序组索引为0m_sortGrps[i].sortGrpIdx = 0;// 重置排序组的后备缓冲区m_sortGrps[i].sortBckBuf->Reset();// 重置扫描批处理m_scanBatch->Reset();}// 返回true,表示重置成功return true;
}

SortAggRunner::endSortAgg 函数

  该函数的主要作用是结束排序聚合操作并释放所有占用的资源。它首先结束任何与去重操作相关的批处理排序器,然后释放所有分组集的缓冲区和扫描批处理,以确保内存资源被释放,避免内存泄漏。

void SortAggRunner::endSortAgg()
{int setno;// 计算分组集的数量,至少为1int num_grouping_sets = Max(m_runtime->maxsets, 1);// 如果存在用于去重的排序器if (m_sortDistinct) {// 遍历每个分组集for (setno = 0; setno < num_grouping_sets; setno++) {// 遍历每个聚合操作for (int i = 0; i < m_aggNum; i++) {// 如果当前分组集对应的批处理排序器存在if (m_sortDistinct[setno].batchsortstate[i]) {// 结束当前的批处理排序操作,释放资源batchsort_end(m_sortDistinct[setno].batchsortstate[i]);// 将当前分组集的排序状态置为空m_sortDistinct[setno].batchsortstate[i] = NULL;}}}}// 释放缓存内存int numset = Max(m_runtime->maxsets, 1);// 遍历每个分组集for (int i = 0; i < numset; i++) {// 释放每个分组集的后备缓冲区m_sortGrps[i].sortBckBuf->DeInit();// 重置扫描批处理m_scanBatch->Reset();}
}

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com