Window函数与WindowOperator源码解析
在开窗函数出现之前存在着很多用 SQL 语句很难解决的问题,很多都要通过复杂的相关子查询或者存储过程来完成。为了解决这些问题,在 2003 年 ISO SQL 标准加入了开窗函数,开窗函数的使用使得这些经典的难题可以被轻松的解决。
由于最近业务上逐渐window函数应用的较多,初步阅读并记录一下相关实现点
开窗函数与聚合函数有一定相似,均是对行集组进行聚合计算,但是它不像普通聚合函数那样每组只返回一个值,开窗函数可以为每组返回多个值,因为开窗函数所执行聚合计算的行集组是窗口。
本文从Window语法,Window function以及Presto中WindowOperator的实现进行逐一介绍。
window的sql 语法
function over (partition by a order by b RANGE|ROWS BETWEEN AND)
由此可见,window语法由关键字over触发,且一个包含三个组成部分
- 分区规则:partition by,类似agg的group by,用于指定数据用于分区的
- 排序规则:order by , 用于指定窗口内数据的计算顺序
- 窗口区间:指定计算的数据的窗口边界
其中分区规则和排序规则均根据字面意思比较好理解,那么主要介绍一下关于窗口区间这一块。
对于窗口区间,支持关键字RANGE/ROWS两种模式,其中RANGE表示按照计算列值的范围进行定义,而ROWS则是按照计算列的行数进行范围定义。且BETWEEN x AND y指定的边界可取值为
CURRENT ROW ---- 当前行
N PRECEDING ---- 前n行
UNBOUNDED PRECEDING ---- 直到第一行
N FOLLOWING ---- 后n行
UNBOUNDED FOLLOWING ---- 直到最后一行
举例
select name,
salary,
sum(salary) over (order by salary rows between UNBOUNDED PRECEDING and CURRENT ROW)
from persions
以上表示计算每个人的薪水,并统计累计薪水总和。
对于数据
+------+------+--------+
| id | name | salary |
+------+------+--------+
| 1 | a | 100 |
| 2 | b | 100 |
| 3 | c | 50 |
| 4 | d | 150 |
+------+------+--------+
对于rows
+------+--------+------+
| name | salary | sum |
+------+--------+------+
| c | 50 | 50 |
| a | 100 | 150 |
| b | 100 | 250 |
| d | 150 | 400 |
+------+--------+------+
而对于range
+------+--------+------+
| name | salary | sum |
+------+--------+------+
| c | 50 | 50 |
| a | 100 | 250 |
| b | 100 | 250 |
| d | 150 | 400 |
+------+--------+------+
可以看到对于a,b salary相同的情况,range的粒度是按照排序列的值进行输出。
WindowPartition
由上语法描述中可知,整个窗口的计算集合,首先由 partition by 限定按照partition by列切分成不同的分区,然后再依赖rows/range between and来限定每次计算的滑动窗口范围。其中对于range的情况,还要计算相同值的范围。
public final class WindowPartition {
private final PagesIndex pagesIndex; //数据
private final int partitionStart; // 根据分区计算出的起始地址
private final int partitionEnd; // 根据分区计算出的结束地址
private final int[] outputChannels;
private final List<FramedWindowFunction> windowFunctions; // 开窗函数
private final PagesHashStrategy peerGroupHashStrategy; // 相同值分组的比较策略
private int peerGroupStart; // 相同值分组的起始地址
private int peerGroupEnd; // 相同值分组的结束地址
private int currentPosition; // 当前行位置
}
其中上面的数据结构仅仅划定了partition by 与 range情况的peerGroup区间,而对于开窗计算的bewteen and区间则根据具体的WindowFunctions对应的frame信息进行计算
private Range getFrameRange(FrameInfo frameInfo)
int rowPosition = currentPosition - partitionStart;
int endPosition = partitionEnd - partitionStart - 1;
int frameStart;
int frameEnd;
// frame start
if (frameInfo.getStartType() == UNBOUNDED_PRECEDING) {
frameStart = 0;
else if (frameInfo.getStartType() == PRECEDING) {
frameStart = preceding(rowPosition, getStartValue(frameInfo));
else if (frameInfo.getStartType() == FOLLOWING) {
frameStart = following(rowPosition, endPosition, getStartValue(frameInfo));
else if (frameInfo.getType() == RANGE) {
frameStart = peerGroupStart - partitionStart;
else {
frameStart = rowPosition;
// frame end
if (frameInfo.getEndType() == UNBOUNDED_FOLLOWING) {
frameEnd = endPosition;
else if (frameInfo.getEndType() == PRECEDING) {
frameEnd = preceding(rowPosition, getEndValue(frameInfo));
else if (frameInfo.getEndType() == FOLLOWING) {
frameEnd = following(rowPosition, endPosition, getEndValue(frameInfo));
else if (frameInfo.getType() == RANGE) {
frameEnd = peerGroupEnd - partitionStart - 1;
else {
frameEnd = rowPosition;
// handle empty frame
if (emptyFrame(frameInfo, rowPosition, endPosition)) {
frameStart = -1;
frameEnd = -1;
return new Range(frameStart, frameEnd);
}
这里面的frameStart和frameEnd是相对于当前partition窗口的一个相对地址,当windowFunction进行计算的时候,数据的位置关系需要基于partitionStart + frameStart计算实际的位置,此层逻辑主要封装在PageWindowIndex中
public class PagesWindowIndex
implements WindowIndex
private final PagesIndex pagesIndex;
private final int start; //partitionstart 位置
private final int size;
private int position(int position)
checkElementIndex(position, size, "position");
return position + start;
}
PagesWindowIndex作为传给WindowFunction的数据结构,封装了数据访问的基本接口。
WindowOperator
windowOperator负责进行Partition的分区以及数据的排序,关键点在于理解下面三个PagesHashStrategy
/**
* 对于输入数据如果预先做过分组,则通过preGroupedPartitionHashStrategy对数据进行切分
* 这样不用积攒全部数据,提前切分partition进行相对流式的计算。不过大部分场景下该策略为空
private final PagesHashStrategy preGroupedPartitionHashStrategy;
* 对于preGrouped的列可能是partition by列的子集,那么首先流式的数据积攒通过preGrouped
* 列切分后,再通过排除preGrouped列进行切分出正确的partition by区间进行window计算。