添加链接
link之家
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接
首发于 Presto
Window函数与WindowOperator源码解析

Window函数与WindowOperator源码解析

在开窗函数出现之前存在着很多用 SQL 语句很难解决的问题,很多都要通过复杂的相关子查询或者存储过程来完成。为了解决这些问题,在 2003 年 ISO SQL 标准加入了开窗函数,开窗函数的使用使得这些经典的难题可以被轻松的解决。
由于最近业务上逐渐window函数应用的较多,初步阅读并记录一下相关实现点

开窗函数与聚合函数有一定相似,均是对行集组进行聚合计算,但是它不像普通聚合函数那样每组只返回一个值,开窗函数可以为每组返回多个值,因为开窗函数所执行聚合计算的行集组是窗口。

本文从Window语法,Window function以及Presto中WindowOperator的实现进行逐一介绍。

window的sql 语法

function over (partition by a order by b RANGE|ROWS BETWEEN AND)

由此可见,window语法由关键字over触发,且一个包含三个组成部分

  • 分区规则:partition by,类似agg的group by,用于指定数据用于分区的
  • 排序规则:order by , 用于指定窗口内数据的计算顺序
  • 窗口区间:指定计算的数据的窗口边界

其中分区规则和排序规则均根据字面意思比较好理解,那么主要介绍一下关于窗口区间这一块。

对于窗口区间,支持关键字RANGE/ROWS两种模式,其中RANGE表示按照计算列值的范围进行定义,而ROWS则是按照计算列的行数进行范围定义。且BETWEEN x AND y指定的边界可取值为

CURRENT ROW         ---- 当前行
N PRECEDING         ---- 前n行
UNBOUNDED PRECEDING ---- 直到第一行
N FOLLOWING         ---- 后n行
UNBOUNDED FOLLOWING ---- 直到最后一行

举例

select name,
       salary,
       sum(salary) over (order by salary rows between UNBOUNDED PRECEDING and CURRENT ROW)
from persions

以上表示计算每个人的薪水,并统计累计薪水总和。

对于数据

+------+------+--------+
| id   | name | salary |
+------+------+--------+
|    1 | a    |    100 |
|    2 | b    |    100 |
|    3 | c    |     50 |
|    4 | d    |    150 |
+------+------+--------+

对于rows

+------+--------+------+
| name | salary | sum  |
+------+--------+------+
| c    |     50 |   50 |
| a    |    100 |  150 |
| b    |    100 |  250 |
| d    |    150 |  400 |
+------+--------+------+

而对于range

+------+--------+------+
| name | salary | sum  |
+------+--------+------+
| c    |     50 |   50 |
| a    |    100 |  250 |
| b    |    100 |  250 |
| d    |    150 |  400 |
+------+--------+------+

可以看到对于a,b salary相同的情况,range的粒度是按照排序列的值进行输出。

WindowPartition

由上语法描述中可知,整个窗口的计算集合,首先由 partition by 限定按照partition by列切分成不同的分区,然后再依赖rows/range between and来限定每次计算的滑动窗口范围。其中对于range的情况,还要计算相同值的范围。

public final class WindowPartition {
    private final PagesIndex pagesIndex;  //数据
    private final int partitionStart;  // 根据分区计算出的起始地址
    private final int partitionEnd;    // 根据分区计算出的结束地址
    private final int[] outputChannels;
    private final List<FramedWindowFunction> windowFunctions;  // 开窗函数
    private final PagesHashStrategy peerGroupHashStrategy;   // 相同值分组的比较策略
    private int peerGroupStart;  // 相同值分组的起始地址
    private int peerGroupEnd;    // 相同值分组的结束地址
    private int currentPosition;  // 当前行位置
}

其中上面的数据结构仅仅划定了partition by 与 range情况的peerGroup区间,而对于开窗计算的bewteen and区间则根据具体的WindowFunctions对应的frame信息进行计算

private Range getFrameRange(FrameInfo frameInfo)
        int rowPosition = currentPosition - partitionStart;
        int endPosition = partitionEnd - partitionStart - 1;
        int frameStart;
        int frameEnd;
        // frame start
        if (frameInfo.getStartType() == UNBOUNDED_PRECEDING) {
            frameStart = 0;
        else if (frameInfo.getStartType() == PRECEDING) {
            frameStart = preceding(rowPosition, getStartValue(frameInfo));
        else if (frameInfo.getStartType() == FOLLOWING) {
            frameStart = following(rowPosition, endPosition, getStartValue(frameInfo));
        else if (frameInfo.getType() == RANGE) {
            frameStart = peerGroupStart - partitionStart;
        else {
            frameStart = rowPosition;
        // frame end
        if (frameInfo.getEndType() == UNBOUNDED_FOLLOWING) {
            frameEnd = endPosition;
        else if (frameInfo.getEndType() == PRECEDING) {
            frameEnd = preceding(rowPosition, getEndValue(frameInfo));
        else if (frameInfo.getEndType() == FOLLOWING) {
            frameEnd = following(rowPosition, endPosition, getEndValue(frameInfo));
        else if (frameInfo.getType() == RANGE) {
            frameEnd = peerGroupEnd - partitionStart - 1;
        else {
            frameEnd = rowPosition;
        // handle empty frame
        if (emptyFrame(frameInfo, rowPosition, endPosition)) {
            frameStart = -1;
            frameEnd = -1;
        return new Range(frameStart, frameEnd);
 }

这里面的frameStart和frameEnd是相对于当前partition窗口的一个相对地址,当windowFunction进行计算的时候,数据的位置关系需要基于partitionStart + frameStart计算实际的位置,此层逻辑主要封装在PageWindowIndex中

public class PagesWindowIndex
        implements WindowIndex
    private final PagesIndex pagesIndex;
    private final int start;             //partitionstart 位置
    private final int size;
    private int position(int position)
        checkElementIndex(position, size, "position");
        return position + start;
}

PagesWindowIndex作为传给WindowFunction的数据结构,封装了数据访问的基本接口。

WindowOperator

windowOperator负责进行Partition的分区以及数据的排序,关键点在于理解下面三个PagesHashStrategy

/**
* 对于输入数据如果预先做过分组,则通过preGroupedPartitionHashStrategy对数据进行切分
* 这样不用积攒全部数据,提前切分partition进行相对流式的计算。不过大部分场景下该策略为空
private final PagesHashStrategy preGroupedPartitionHashStrategy;
* 对于preGrouped的列可能是partition by列的子集,那么首先流式的数据积攒通过preGrouped
* 列切分后,再通过排除preGrouped列进行切分出正确的partition by区间进行window计算。