添加链接
link之家
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接
开发ODPS Script任务

开发ODPS Script任务

DataWorks 为您提供 ODPS Script 节点,其 SQL 开发模式是 MaxCompute 基于 2.0 SQL 引擎提供的脚本开发模式。本文为您介绍 ODPS Script 节点的使用。

前提条件

已创建 ODPS Script 节点,详情请参见 创建并管理 MaxCompute 节点

背景信息

MaxCompute 当前的 SQL 引擎支持脚本模式 SQL(Script Mode SQL),在脚本模式下编译脚本时,一个多语句的 SQL 脚本文件将被作为一个整体进行编译,无需对单个语句进行编译,适合用于改写需要层层嵌套子查询的单个语句,或因为脚本复杂性而不得不拆成多个语句的脚本。在提交运行时,SQL 脚本文件会被整体提交,并生成一个执行计划,保证只需排队一次、执行一次,让您能充分利用 MaxCompute 的资源,详情请参见 SQL 脚本模式 。在 DataWorks 中,您可通过 ODPS Script 节点实现 MaxCompute SQL 脚本模式开发任务代码,并调度其他作业的集成操作。

适用场景

脚本模式的适用场景如下:

  • 脚本模式适合用来改写需要层层嵌套子查询的单个语句,或者因为脚本复杂性而不得不拆成多个语句的脚本。

  • 如果多个输入的数据源数据准备完成的时间间隔很长(例如一个 01:00 可以准备好,一个 07:00 可以准备好),则不适合通过 table variable 衔接拼装为一个大的脚本模式 SQL。

  • 脚本模式下,您可以对一个变量赋常量值,然后执行 SELECT * FROM 变量 语句转化为标量与其它列进行计算。常量值也可以存放在一个单行的表中,命令示例如下。转化语法请参见 子查询(SUBQUERY)

    @a := SELECT 10; --对@a赋值常量10,或者赋值存在一个单行表t1中,SELECT col1 FROM t1。
    @b := SELECT key,value+(SELECT * FROM @a) FROM t2 WHERE key >10000; --t2表中value值与@a中的值进行计算。
    SELECT * FROM @b;

语法结构

一个 ODPS Script 脚本的完整形式为 SET 语句>DDL 语句>DML 语句 ,每种类型语句都可以有 0 到多个语句,但不同类型的语句不能混合。多个语句以@开始,表示变量连接。语法如下:

--SET语句
set odps.sql.type.system.odps2=true;
[set odps.stage.reducer.num=***;]
[...]
--DDL语句
create table table1 xxx;
[create table table2 xxx;]
[...]
--DML语句
@var1 := SELECT [ALL | DISTINCT] select_expr, select_expr, ...
    FROM table3
    [WHERE where_condition];
@var2 := SELECT [ALL | DISTINCT] select_expr, select_expr, ...
    FROM table4
    [WHERE where_condition];
@var3 := SELECT [ALL | DISTINCT] var1.select_expr, var2.select_expr, ...
    FROM @var1 join @var2 on ...;
INSERT OVERWRITE|INTO TABLE [PARTITION (partcol1=val1, partcol2=val2 ...)]
    SELECT [ALL | DISTINCT] select_expr, select_expr, ...
    FROM @var3;
[@var4 := SELECT [ALL | DISTINCT] var1.select_expr, var.select_expr, ... FROM @var1
    UNION ALL | UNION
    SELECT [ALL | DISTINCT] var1.select_expr, var.select_expr, ... FROM @var2;
CREATE [EXTERNAL] TABLE [IF NOT EXISTS] table_name
    SELECT [ALL | DISTINCT] select_expr, select_expr, ...
    FROM var4;]

使用限制

ODPS Script 节点的使用限制如下:

  • 脚本模式支持 SET 语句、部分 DDL 语句(结果是屏显类型的语句除外,例如 desc show )和 DML 语句。

  • 一个脚本,目前最多支持一个屏幕显示结果的语句(例如单独的 Select 语句),否则会报错。不建议您在脚本中执行屏幕显示的 Select 语句。

  • 一个脚本,目前最多支持一个 Create table as 语句,并且必须是最后一句。建议将建表语句和 Insert 语句分开写。

  • 脚本模式下,如果有一个语句失败,整个脚本的语句都不会执行成功。

  • 脚本模式下,只有所有输入的数据都准备完成,才会生成一个作业进行数据处理。

  • 脚本模式下,如果一个表被写入后又被读取,会报错。

    insert overwrite table src2 select * from src where key > 0;
    @a := select * from src2;
    select * from @a;

    为避免先写后读,可以进行如下修改。

    @a := select * from src where key > 0;
    insert overwrite table src2 select * from @a;
    select * from @a;

编辑代码:简单示例

MaxCompute Script Mode SQL 编译较为简单,您只需要按照业务逻辑,用类似于普通编程语言的方式进行编译,无需考虑如何组织语句。以下以一个简单示例为您介绍 ODPS Script 节点的使用。

create table if not exists dest(key string , value bigint) ;
create table if not exists dest2(key string,value bigint ) ;
@a := select * from src where value >0;
@b := select * from src2 where key is not null;
@c := select * from src3 where value is not null;
@d := select a.key,b.value from @a left outer join @b on a.key=b.key and b.value>0;
@e := select a.key,c.value from @a inner join @c on a.key=c.key;
@f := select * from @d union select * from @e union select * from @a;
insert overwrite table dest  select * from @f;
@g := select e.key,c.value  from @e join @c on e.key=c.key;
insert overwrite table dest2 SELECT * from @g;
说明

本示例中需要用到的三张表 src、src1、src2,建表语句如下:

--创建表
create table if not EXISTS  src(key string ,value BIGINT);
insert into src values ('1',11) ;
insert into src values ('1',11) ;
create table if not EXISTS  src2(key string ,value BIGINT);
insert into src2  values ('1',22);
insert into src2  values ('2',22);
create table if not EXISTS  src3(key string ,value BIGINT);
insert into src3 values ('1',33);
insert into src3 values ('3',33);

编辑代码:进阶示例

MaxCompute 脚本模式支持 IF 语句,IF 语句可以使程序根据条件,自动选择执行逻辑。MaxCompute IF 语法根据 Condition 类型分为 BOOLEAN 类型,以及 BOOLEAN Scalar SubQuery。

  • IF 条件为 BOOLEAN 类型表达式

    这种类型的 IF ELSE 语句可以在编译阶段决定执行哪个分支,示例如下:

    --数据处理
    set odps.sql.allow.fullscan=true;
    set odps.optimizer.cbo.rule.filter.black=LM; 
    @date := '${var}';
    @row  TABLE(key int,value bigint); --声明变量row,其类型为Table,schemastring. 
    IF ( cast(@date  as bigint) % 2 == 0 ) BEGIN 
    @row  := SELECT key,value from src1; 
    END ELSE BEGIN
    @row  := SELECT key,value from src2; 
    INSERT OVERWRITE TABLE dest1 partition(p='${var}')  SELECT key,value FROM @row; 
    说明

    代码中定义了名为 var 的变量,您需要在 调度参数配置 处为 var 变量进行赋值。

  • IF 条件 BOOLEAN Scalar SubQuery

    这种类型的 IF ELSE 语句在编译阶段无法决定执行哪个分支,在运行时才能决定。因此,需要提交多个作业,示例如下:

    @i bigint;
    @t table(id bigint, value bigint);
    IF ((SELECT count(*) FROM src WHERE a = '5') > 1) BEGIN
    @i := 1;
    @t := select @i, @i*2;
    END ELSE
    BEGIN
    @i := 2;
    @t := select @i, @i*2;
    select id, value from @t; 
  • 嵌入式 UDF 开发

    此外,您也可以通过将 J ava Python 代码嵌入 SQL 脚本的方式,使用 MaxCompute 脚本模式实现代码嵌入式 UDF(Embedded UDF)开发,详情 请参见 UDF(嵌入式)

后续步骤

当您完成当前节点的任务开发后,通常您可进行以下操作。

  • 调度配置:配置节点的周期性调度属性。任务需要周期性调度运行时,您需要设置节点后续实际运行过程中的重跑属性、调度依赖关系等,操作详情请参见 任务调度属性配置概述

  • 任务调试:对当前节点的代码进行测试运行,确认代码逻辑符合预期,操作详情请参见 任务调试流程

  • 任务发布:完成所有开发相关操作后,您需要将所有任务节点进行发布,发布后节点即会根据调度配置结果进行周期性运行,操作详情请参见 发布任务