数据作业自动化调度 AirFlow 创建Dags任务

Airflow 配置及使用 - 任务创建及修改

脚本基于Python 3.6

1. 任务创建及修改流程

Airflow创建任务:

编写调度任务脚本 - > 测试调度任务 - > 开启调度任务 - > 监控调度任务

Airflow修改调度任务:

停止调度任务 - > 修改调入任务脚本 - >测试调度任务 - > 开启调度任务 - > 监控调度任务

2.调度脚本

创建好的任务需要放在 /root/airflow/dags 这个目录下(建立Dags任务 -> 建立子任务节点 - > 建立子任务顺序):

#!/usr/bin/env python3 

# -*- coding: utf-8 -*-
"""
Created on Tue Oct 30 09:33:21 2018
@author: Clamdown
"""
# - 加载所需要的包
from airflow.operators.bash_operator import BashOperator
import airflow
from airflow.models import DAG
from airflow import operators
from airflow.contrib.hooks import SSHHook
from airflow.models import BaseOperator
from airflow.contrib.operators import SSHExecuteOperator
from airflow.operators.latest_only_operator import LatestOnlyOperator
import os
import sys
from airflow.operators.python_operator import PythonOperator
from datetime import timedelta,date,datetime
#import pendulum
from airflow.utils.trigger_rule import TriggerRule
import datetime,time
from sqlalchemy import create_engine
# ------------------------ 创建Dags任务------------------------------- #
# - 设置Dag任务参数
default_args = {
'owner': 'qhuanyu', # 任务拥有者
'depends_on_past': False, # 是否依赖上游任务,即上一个调度任务执行失败时,该任务是否执行。可选项包括True和False,False表示当前执行脚本不依赖上游执行任务是否成功;
'start_date': datetime.datetime(2018, 10, 30,11,35,0), # 任务开始时间
'email': ['[email protected]'], # 任务失败时发送的邮箱
'email_on_failure': True , # 任务失败是否发送邮箱
'email_on_retry': True, # 任务调用是否发送邮箱
'retries': 1, # 任务失败时是否重新调起
'retry_delay': timedelta(minutes=1), # 重新调起的时间间隔
}
# - 创建任务
dag = DAG(

'test_append_by10mins2', # 任务名称
default_args=default_args, # 任务参数
description='test ', # 任务描述
schedule_interval='*/3 * * * *' # 调用时间规则可查阅 https://blog.csdn.net/kisscatforever/article/details/78743520
)
# ------------------------ 创建任务函数任务------------------------------- #
# - 调用的函数
def start_task():
return 'start_task!'
def rell():
sql_sbd = get_data('select * from t_xs_status_flow_form_sbd').head(1)
sql_sbd['time'] = time.localtime().tm_min
write2Sql(sql_sbd,'airflow_test',if_exists='append')
# ------------------------ 创建调度子任务 ------------------------------- #
# - 建立Python调度子任务
hello_operator = PythonOperator(
task_id='start_task', # 任务名
python_callable=start_task, # 调用任务
depends_on_past=False, # 是否依赖上一个任务执行完成
dag=dag, # 赋值Dag任务参数
trigger_rule=TriggerRule.ALL_DONE) # 参数为该task任务执行的触发条件,官方文档里面该触发条件有5种状态,一般常用的包括“ALL_DONE”和”ALL_SUCCESS”两种。其中“ALL_DONE”为当上一个task执行完成,该task即可执行,而”ALL_SUCCESS”为只当上一个task执行成功时,该task才能调起执行,执行失败时,本task不执行任务。
# - 建立Python调度子任务
writesql_operator = PythonOperator(
task_id='write_sql',
python_callable=rell,
dag=dag,
trigger_rule=TriggerRule.ALL_DONE)
# ------------------------ 设置任务顺序 ------------------------------- #
# - 设置节点顺序,a.set_upstream(b) 为设置b在a前面
hello_operator.set_upstream(writesql_operator)

3.测试脚本

使用脚本测试语句在命令行测试(红颜色为需要修改内容):

airflow list_task Dags任务名 : 查看任务下有多少子任务
airflow test Dags(任务名) task(子任务名) 20180701(时间) : 测试某个时间点子任务执行情况
airflow backfill -s 2018-07-01(开始时间) -e 2018-07-02(结束时间) Dags(任务名) : 测试从开始时间到结束时间,某调度任务测试

4.调度上线

打开调用调度服务:

airflow scheduler

在浏览器打开DAG管理页面:

http://localhost:8080/admin/

启动该任务:

点击Dags任务左侧OFF键

立即调用Dags任务:

点击右侧Links的第一个按钮

如有问题可留言,下一节会补充一个实例操作。

数据作业自动化调度 AirFlow 创建Dags任务


分享到:


相關文章: