系统研究Airbnb开源项目airflow

jopen 9年前

调研了一些几个调度系统, airflow 更满意一些. 花了些时间写了这个博文, 这应该是国内技术圈中最早系统性研究airflow的文章了.  转载请注明出处 http://www.cnblogs.com/harrychinese/ .

========================

airflow概况

========================

文档:

http://airflow.readthedocs.org/en/latest/

几个调度系统的比较, 可参考:

http://www.rigongyizu.com/about-workflow-schedule-system/

Principles:

动态性: Airflow pipeline是以python code形式做配置, 灵活性没得说了.

扩展性: 可以自定义operator(运算子), 有几个executor(执行器)可供选择.

优雅:pipeline的定义很简单明了, 基于jinja模板引擎很容易做到脚本命令参数化.

扩展性:模块化的结构, 加上使用了message queue来编排多个worker(启用CeleryExcutor), airflow可以做到无限扩展.

竞品:

airflow并不是data streaming方案, 所以不是Spark Streaming/Storm的竞品. 和airflow类似的有: Apache Oozie, Linkedin Azkaban.

比较优势:

linkedin的Azkaban很不错, UI尤其很赞, 使用java properties文件维护上下游关系, 任务资源文件需要打包成zip, 部署不是很方便.

Apache Oozie, 使用XML配置, Oozie任务的资源文件都必须存放在HDFS上. 配置不方便同时也只能用于Hadoop.

Spotify的 Luigi, UI 太烂.

airflow 总体都不错, 有实用的UI, 丰富的cli工具, Task上下游使用python编码, 能保证灵活性和适应性.

========================

概念:

========================

不用多说概念自然非常重要, 这是理解airflow的基础.

---------------

Operators:

---------------  

基本可以理解为一个抽象化的task, Operator加上必要的运行时上下文就是一个task. 有三类Operator:

1. Sensor(传感监控器), 监控一个事件的发生.

2. Trigger(或者叫做Remote Excution), 执行某个远端动作, (我在代码中没有找到这个类别)

3. Data transfer(数据转换器), 完成数据转换

---------------

Hooks:

---------------    

Hook是airflow与外部平台/数据库交互的方式, 一个Hook类就像是一个JDBC driver一样. airflow已经实现了jdbc/ftp/http/webhdfs很多hook. 要访问RDBMS数据库 有两类Hook可供选择, 基于原生Python DBAPI的Hook和基于JDBC的Hook, 以Oracle为例,

OracleHook, 是通过cx_Oracle 访问Oracle数据, 即原生Python binding, 有些原生的Hook支持Bulk load.

JdbcHook, 是通过jaydebeapi+Oracle JDBC访问Oracle数据         

Tasks: task代表DAG中的一个节点, 它其实是一个BaseOperator子类.

Task instances, 即task的运行态实例, 它包含了task的status(成功/失败/重试中/已启动)

Job: Airflow中Job很少提及, 但在数据库中有个job表, 需要说明的是Job和task并不是一回事, Job可以简单理解为Airflow的批次, 更准确的说法是同一批被调用task或dag的统一代号. 有三类Job, 分别SchedulerJob/BackfillJob/LocalTaskJob, 对于SchedulerJob和BackfillJob, job指的是指定dag这次被调用的运行时代号, LocalTaskJob是指定task的运行时代号.

---------------

Connections:

---------------  

我们的Task需要通过Hook访问其他资源, Hook仅仅是一种访问方式, 就像是JDBC driver一样, 要连接DB, 我们还需要DB的IP/Port/User/Pwd等信息. 这些信息不太适合hard code在每个task中, 可以把它们定义成Connection, airflow将这些connection信息存放在后台的connection表中. 我们可以在WebUI的Admin->Connections管理这些连接.

---------------

Variables:

---------------   

Variable 没有task_id/dag_id属性, 往往用来定义一些系统级的常量或变量,  我们可以在WebUI或代码中新建/更新/删除Variable. 也可以在WebUI上维护变量.

Variable 的另一个重要的用途是, 我们为Prod/Dev环境做不同的设置, 详见后面的开发小节.

---------------

XComs:

---------------  

XCom和Variable类似, 用于Task之间共享一些信息. XCom 包含task_id/dag_id属性, 适合于Task之间传递数据, XCom使用方法比Variables复杂些. 比如有一个dag, 两个task组成(T1->T2), 可以在T1中使用xcom_push()来推送一个kv, 在T2中使用xcom_pull()来获取这个kv.

---------------

Trigger Rules:

---------------  

可以为dag中的每个task都指定它的触发条件, 这里的触发条件有两个维度, 以T1&T2->T3 这样的dag为例:

一个维度是: 要根据dag上次运行T3的状态确定本次T3是否被调用, 由

DAG的default_args.depends_on_past参数控制, 为True时, 只有上次T3运行成功, 这次T3才会被触发

另一个维度是: 要根据前置T1和T2的状态确定本次T3是否被调用, 由T3.trigger_rule参数控制, 有下面6种情形, 缺省是all_success.  

all_success: (default) all parents have succeeded

all_failed: all parents are in a failed or upstream_failed state

all_done: all parents are done with their execution

one_failed: fires as soon as at least one parent has failed, it does not wait for all parents to be done

one_success: fires as soon as at least one parent succeeds, it does not wait for all parents to be done

dummy: dependencies are just for show, trigger at will

---------------

分支的支持:

---------------      

airflow有两个基于PythonOperator的Operator来支持dag分支功能.

ShortCircuitOperator, 用来实现流程的判断. Task需要基于ShortCircuitOperator, 如果本Task返回为False的话, 其下游Task将被skip; 如果为True的话, 其下游Task将会被正常执行.  尤其适合用在其下游都是单线节点的场景.  

BranchPythonOperator, 用来实现Case分支. Task需要基于BranchPythonOperator, airflow会根据本task的返回值(返回值是某个下游task的id),来确定哪个下游Task将被执行, 其他下游Task将被skip.

---------------

airflow系统表:

---------------

connection 表:

我们的Task往往需要通过jdbc/ftp/http/webhdfs方式访问其他资源, 一般地访问资源时候都需要一些签证, airflow允许我们将这些connection以及鉴证存放在connection表中.  可以现在WebUI的Admin->Connections管理这些连接, 在代码中使用这些连接.

需要说明的是, connection表有2个id栏位, 一个是id, 一个是conn_id, id栏位是该表的PK, conn_id栏位是connection的名义id, 也就是说我们可以定义多个同名的conn_id, 当使用使用时airflow将会从同名的conn_id的列表中随机选一个, 有点基本的load balance的意思.

user 表 :

包含user的username和email信息.  我们可以在WebUI的Admin->Users管理.

variable 表 :

包含定义variable

xcom 表:

包含xcom的数据

dag 表:

包含dag的定义信息, dag_id是PK(字符型)

dag_run 表:

包含dag的运行历史记录, 该表也有两个id栏位, 一个是id, 一个是run_id, id栏位是该表的PK, run_id栏位是这次运行的一个名字(字符型), 同一个dag, 它的run_id 不能重复.

物理PK: 即id栏位

逻辑PK: dag_id + execution_date 组合

execution_date 栏位, 表示触发dag的准确时间

注意: 没有 task 表:

airflow的task定义在python源码中, 不在DB中存放注册信息.

task_instance 表:

物理PK: 该表没有物理PK

逻辑PK: dag_id + task_id + execution_date 组合.

execution_date 栏位, 表示触发dag的准确时间,是datetime类型字段

start_date/end_date 栏位,表示执行task的起始/终止时间, 也是datetime类型字段

job 表:

包含job(这里可以理解为批次)的运行状态信息

========================

操作

========================

---------------

安装和配置

---------------

1. 操作系统:

Airflow不能在Windows上部署, 原因是使用了 gunicorn 作为其web server(目前gunicorn还不支持Windows), 另外我也在代码中看到一些hard code了一些bash命令.

2. shell 解释器:

操作系统应该安装bash shell, 运行airflow服务的账号, 最好默认使用bash shell.

3. Python版本:

目前Airflow只是实验性地支持Python3, 推荐使用Python2.7

4. Backend 数据库:

SqlAlchemy 支持的数据库都可以作为Backend, 甚至Sqlite(非常适合做Demo或临时体验一下), 官方推荐采用 MySQL 或 Postgres. 我试了Oracle, 但最终还是以失败告终. MySQL 应该使用 mysqlclient 包, 我简单试了mysql-connector-python 有报错.

5. Executor的选择:

有三个 Executor 可供选择, 分别是: SequentialExecutor 和 LocalExecutor 和 CeleryExecutor, SequentialExecutor仅仅适合做Demo(搭配Sqlite backend), LocalExecutor 和 CeleryExecutor 都可用于生产环境, CeleryExecutor 将使用 Celery 作为Task执行的引擎, 扩展性很好, 当然配置也更复杂, 需要先setup Celery的backend(包括RabbitMQ, Redis)等. 其实真正要求扩展性的场景并不多, 所以LocalExecutor 是一个很不错的选择了.

---------------

初始化配置:

---------------

1. 配置OS环境变量 AIRFLOW_HOME, AIRFLOW_HOME缺省为 ~/airflow

2. 运行下面命令初始化一个Sqlite backend DB, 并生成airflow.cfg文件

your_python ${AIRFLOW_HOME}\bin\airflow initdb

3. 如果需要修改backend DB类型, 修改$AIRFLOW_HOME/airflow.cfg文件 sql_alchemy_conn后, 然后重新运行 airflow initdb .

官方推荐使用MySQL/PostgreSQL做DB Server.

MySQL 应该使用 mysqlclient 驱动, 我试验了mysql-connector-python 驱动, 结果airflow 网页端报错

我试着用Oracle 做DB, 解决了很多问题, 但终究还是不能完全运行起来.

4. 修改$AIRFLOW_HOME/airflow.cfg文件

重新设置Backend/Executor, 以及webserver端口, 设置dags_folder目录和base_log_folder目录. 有下面3个参数用于控制Task的并发度,

parallelism, 一个Executor同时运行task实例的个数

dag_concurrency, 一个dag中某个task同时运行的实例个数

max_active_runs_per_dag: 一个dag同时启动的实例个数

---------------

了解几种作业运行模式

---------------

test 作业运行模式:

该task是在本地运行, 不会发送到远端celery worker, 也不检查依赖状态, 也不将结果记录到airflow DB中, log也仅仅会在屏幕输出, 不记录到log文件.

使用场景: 多用于测试单个作业的code的逻辑.  可以通过test 命令进入test 模式.

mark_success 作业运行模式:

仅仅将作业在DB中Mark为success, 但并不真正执行作业

使用场景: 多用于测试整个dag流程控制, 或者为某个task在DB中补一些状态. 可以在backfill命令和 run命令中启用.

dry_run 作业运行模式:

airflow不检查作业的上下游依赖, 也不会将运行结果记录不到airflow DB中. 具体作业的运行内容分情况:

如果你的Operator没有重载 dry_run()方法的话,  运行作业也仅打印一点作业执行log

如果重载BaseOperator的dry_run()方法的话,  运行作业即是执行你的dry_run()

使用场景: 个人觉得 dry_run 模式意义并不大, 可以在backfill命令和 test 命令中启用   

---------------

命令行工具

---------------

airflow 包安装后, 会往我们your_python\bin目录复制一个名为 airflow 的文件, 可以直接运行.

下面是该命令行工具支持的命令

1. 初始化airflow meta db  

airflow initdb [-h]

2. 升级airflow meta db

airflow upgradedb [-h]

3. 开启web server

airflow webserver  --debug=False

开启airflow webserver, 但不进入flask的debug模式

4. 显示task清单

airflow list_tasks --tree=True -sd=/home/docs/airflow/dags

以Tree形式, 显示/home/docs/airflow/dags下的task 清单

5. 检查Task状态

airflow task_state  -sd=/home/docs/airflow/dags dag_id task_id execution_date

这里的 execution_date 是触发dag的准确时间, 是DB的datetime类型, 而不是Date类型

6. 开启一个dag调度器

airflow scheduler [-d DAG_ID] -sd=/home/docs/airflow/dags  [-n NUM_RUNS]

启动dag调度器, 注意启动调度器, 并不意味着dag会被马上触发, dag触发需要符合它自己的schedule规则.

参数NUM_RUNS, 如果指定的话, dag将在运行NUM_RUNS次后退出. 没有指定时, scheduler将一直运行.

参数DAG_ID可以设定, 也可以缺省, 含义分别是:

如果设定了DAG_ID, 则为该DAG_ID专门启动一个scheduler;

如果缺省DAG_ID, airflow会为每个dag(subdag除外)都启动一个scheduler.

7. 立即触发一个dag, 可以为dag指定一个run id, 即dag的运行实例id.

airflow trigger_dag [-h] [-r RUN_ID] dag_id

立即触发运行一个dag, 如果该dag的scheduler没有运行的话, 将在scheduler启动后立即执行dag

8. 批量回溯触发一个dag

airflow backfill [-s START_DATE] [-e END_DATE]  [-sd SUBDIR]  --mark_success=False --dry_run=False dag_id

有时候我们需要**立即**批量补跑一批dag, 比如为demo准备点执行历史, 比如补跑错过的运行机会. DB中dag execute_date记录不是当下时间, 而是按照 START_DATE 和 scheduler_interval 推算出的时间.  

如果缺省了END_DATE参数, END_DATE等同于START_DATE.

9. 手工调用一个Task

airflow run [-sd SUBDIR] [-s TASK_START_DATE] --mark_success=False  dag_id task_id execution_date

该命令参数很多, 如果仅仅是测试运行, 建议使用test命令代替.

10. 测试一个Task

airflow test -sd=/home/docs/airflow/dags --dry_run=False dag_id task_id execution_date

airflow test -sd=/home/docs/airflow/dags --dry_run=False dag_id task_id 2015-12-31

以 test 或 dry_run 模式 运行作业.

11. 清空dag下的Task运行实例

airflow clear [-s START_DATE] [-e END_DATE]  [-sd SUBDIR]  dag_id

12. 显示airflow的版本号

airflow version  

========================

Airflow 开发

========================

---------------

dag脚本开发

---------------

dag脚本可参考example_dags目录中的sample, 然后将脚本存放到airflow.cfg指定的dags_folder下.

airflow 已经包含实现很多常用的 operator, 包括 BashOperator/EmailOperator/JdbcOperator/PythonOperator/ShortCircuitOperator/BranchPythonOperator/TriggerDagRunOperator等, 基本上够用了, 如果要实现自己的Operator, 继承BaseOperator, 一般只需要实现execute()方法即可.

pre_execute()/post_execute()用处不大, 不用特别关注, 另外post_execute()是在on_failure_callback/on_success_callback回调函数之前执行的, 所以, 也不适合回写作业状态.

作业流程串接的几个小贴士:

使用 DummyOperator 来汇聚分支

使用 ShortCircuitOperator/BranchPythonOperator 做分支

使用 SubDagOperator 嵌入一个子dag

使用 TriggerDagRunOperator  直接trigger 另一个dag

T_B.set_upstream(T_A), T_A->T_B, 通过task对象设置它的上游

T_1.set_downstream(T_2), T_1->T_2 , 通过task对象设置它的下游

airflow.utils.chain(T_1, T_2, T_3), 通过task对象设置依赖关系, 这个方法就能一次设置长的执行流程, T_1->T_2->T_3

dag.set_dependency('T_1_id', 'T_2_id'), 通过id设置依赖关系

---------------  

扩展airflow界面

---------------

扩展airflow,  比如WebUI上增加一个菜单项, 可以按照plugin形式实现.

---------------

自己的表如何关联airflow的表

---------------

很多时候airflow DB的各个表不够用, 我们需要增加自己的表. 比如增加一个my_batch_instance表, 一个my_task_instance表, my_batch_instance需要关联airflow dag_run表, my_task_instance需要关联airflow task_instance表.

my_batch_instance表中, 增加airflow_dag_id和airflow_execute_date, 来对应airflow dag_run表的逻辑PK; my_task_instance表增加airflow_dag_id和airflow_task_id和airflow_execute_date, 对应airflow task_instance表的逻辑PK.  

接下来的问题是, 如何在task的python代码中, 获取这些逻辑PK值? 其实也很简答, 我们的task都继承于BaseOperation类, BaseOperation.execute(self, context)方法, 有一个context参数, 它包含很丰富的信息, 有:

dag定义对象, dag.dag_id 即是 dag_id 值

task定义对象,task.task_id 即是 task_id 值

execution_date相关的几个属性(包括datetime类型的execution_date, 字符类型的ds, 更短字符型的ds_nodash)  

context是一个dict,完整的内容是

{

'dag': task.dag,

'ds': ds,

'yesterday_ds': yesterday_ds,

'tomorrow_ds': tomorrow_ds,

'END_DATE': ds,

'ds_nodash': ds_nodash,

'end_date': ds,

'dag_run': dag_run,

'run_id': run_id,

'execution_date': task_instance.execution_date,

'latest_date': ds,

'macros': macros,

'params': params,

'tables': tables,

'task': task,

'task_instance': task_instance,

'ti': task_instance,

'task_instance_key_str': task_instance_key_str,

'conf': configuration,

}

execution_date相关的几个属性具体取值是:

ds=execution_date.isoformat()[:10]       

ds_nodash = ds.replace('-', '')        

ti_key_str_fmt = "{task.dag_id}__{task.task_id}__{ds_nodash}"

task_instance_key_str = ti_key_str_fmt.format(task,ds_nodash)

task_instance_key_str 值可以看做是Task instance表的单一的逻辑PK, 很可惜的是Task instance没有这个字段.

---------------

如何及时拿到airflow task的状态

---------------

举例说明, 比如我的task是执行一个bash shell, 为了能将task的信息及时更新到自己的表中, 需要基于BashOperator的实现一个子类MyBashOperator, 在execute(context)方法中, 将running状态记录到自己的表中.

另外, 在创建MyBashOperator的实例时候, 为on_failure_callback和on_success_callback参数设置两个回调函数, 我们在回调函数中, 将success或failed状态记录到自己的表中.

on_failure_callback/on_success_callback回调函数签名同execute(), 都有一个context参数.

---------------

为生产环境和测试环境提供不同的设置

---------------

系统级的设置, 见airflow.cfg文档

DAG级别的设置, 我们可为Prod/Dev环境准备不同的default_args,  

default_args = {

'owner': 'airflow',

'depends_on_past': False,

'start_date': datetime(2015, 6, 1),

'email': ['airflow@airflow.com'],

'email_on_failure': False,

'email_on_retry': False,

'retries': 1,

'retry_delay': timedelta(minutes=5),

# 'queue': 'bash_queue',

# 'pool': 'backfill',

# 'priority_weight': 10,

# 'end_date': datetime(2016, 1, 1),

}

dag = DAG('tutorial', default_args=default_args)

通过Variable, 加载不同环境的配置. 详细思路如下:

比如我们有一个My_Cfg参数, 在Prod和Dev取值有可能不同.

首先设置一个 Environment_Flag variable, 其取值是Prod或Dev.

然后, 定义为My_Cfg参数设定两个变量, My_Cfg_For_Prod 和 My_Cfg_For_Dev, 并赋值, 分别对应Prod/Dev环境下My_Cfg的取值.  

在代码中, 我们就可以通过Environment_Flag的取值, 就知道是该访问 My_Cfg_For_Prod 变量还是 My_Cfg_For_Dev 变量, 进而得到My_Cfg的取值.

---------------

Regular的External trigger触发dag的推荐用法

---------------

外部触发需要trigger_dag命令行, 命令行最好要加上run_id参数;

同时DAG的schedule_interval参数最好设置成None, 表明这个DAG始终是由外部触发

---------------

测试运行步骤:

---------------

1. 先测试pytnon代码正确性

python ~/airflow/dags/tutorial.py

2. 通过命令行验证DAG/task设置

# print the list of active DAGs

airflow list_dags

# prints the list of tasks the "tutorial" dag_id

airflow list_tasks tutorial

# prints the hierarchy of tasks in the tutorial DAG

airflow list_tasks tutorial --tree

3. 通过test命令行试跑一下, 测试一下code逻辑

airflow test tutorial my_task_id 2015-06-01   

4. 通过 backfill --mark_success=True   

airflow backfill tutorial -s 2015-06-01 -e 2015-06-07

来自: http://www.cnblogs.com/harrychinese/p/airflow.html