Apache Airflow: An Introduction

北斗云 2019-09-05


1. Overview

Airflow is a platform for authoring, executing, and monitoring workflows. Workflows are modeled as directed acyclic graphs (DAGs): Airflow lets you define a set of interdependent tasks and runs them in dependency order. It provides a rich command-line interface for administering the system, and its web UI makes it equally convenient to manage scheduled jobs and monitor task state in real time, simplifying day-to-day operations and maintenance.
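To make the DAG model concrete, here is a minimal sketch of a DAG definition file as it would be placed in the dags folder; the DAG id, task names, and schedule are illustrative placeholders, while DAG, BashOperator, and the >> dependency operator are standard Airflow 1.10 APIs.

from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

# A minimal two-task DAG; the id and schedule are placeholders.
dag = DAG(
    dag_id="example_hello",
    start_date=datetime(2019, 9, 1),
    schedule_interval="@daily",
)

extract = BashOperator(task_id="extract", bash_command="echo extract", dag=dag)
load = BashOperator(task_id="load", bash_command="echo load", dag=dag)

# "load" is scheduled only after "extract" succeeds.
extract >> load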

2. How It Works

// TODO

3. Common Commands

a). Run the webserver as a daemon

Command: airflow webserver -p <port> -D (the sample session below runs in the foreground, without -D)

(airflow) [bigdata@carbondata airflow]$ airflow webserver -p 8383
[2019-09-05 23:17:30,787] {__init__.py:51} INFO - Using executor SequentialExecutor
DEPRECATION: Python 2.7 will reach the end of its life on January 1st, 2020. Airflow 1.10 will be the last release series to support Python 2
  ____________       _____________
____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
_/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
[2019-09-05 23:17:31,379] {dagbag.py:90} INFO - Filling up the DagBag from /home/bigdata/airflow/dags
Running the Gunicorn Server with:
Workers: 4 sync
Host: 0.0.0.0:8383
Timeout: 120
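
When started with -D, the webserver detaches from the terminal; assuming the default configuration, its pid is written under $AIRFLOW_HOME (airflow-webserver.pid), so the daemon can be stopped with something like:

Command: kill $(cat $AIRFLOW_HOME/airflow-webserver.pid)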

b). Run the scheduler as a daemon

Command: airflow scheduler or airflow scheduler -D (the sample session below backgrounds it with & instead)

(airflow) [bigdata@carbondata airflow]$ airflow scheduler &
[2] 66557
(airflow) [bigdata@carbondata airflow]$ [2019-09-05 23:19:27,397] {__init__.py:51} INFO - Using executor SequentialExecutor
DEPRECATION: Python 2.7 will reach the end of its life on January 1st, 2020. Airflow 1.10 will be the last release series to support Python 2
  ____________       _____________
____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
_/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
[2019-09-05 23:19:42,748] {scheduler_job.py:1315} INFO - Starting the scheduler
[2019-09-05 23:19:42,748] {scheduler_job.py:1323} INFO - Running execute loop for -1 seconds
[2019-09-05 23:19:42,748] {scheduler_job.py:1324} INFO - Processing each file at most -1 times
[2019-09-05 23:19:42,748] {scheduler_job.py:1327} INFO - Searching for files in /home/bigdata/airflow/dags
[2019-09-05 23:19:42,753] {scheduler_job.py:1329} INFO - There are 20 files in /home/bigdata/airflow/dags
[2019-09-05 23:19:42,753] {scheduler_job.py:1376} INFO - Resetting orphaned tasks for active dag runs
[2019-09-05 23:19:42,796] {dag_processing.py:545} INFO - Launched DagFileProcessorManager with pid: 66585
[2019-09-05 23:19:42,809] {settings.py:54} INFO - Configured default timezone <Timezone [UTC]>

c). Run a worker as a daemon

Command: airflow worker -D

d). Run a Celery worker as a daemon with task concurrency set to 1

Command: airflow worker -c 1 -D
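
Note that airflow worker only applies under the CeleryExecutor; the sessions above show SequentialExecutor, which executes tasks one at a time inside the scheduler process. Switching, roughly, means setting executor = CeleryExecutor under [core] in airflow.cfg and pointing broker_url and result_backend under [celery] at a message broker (e.g. Redis or RabbitMQ) and a result store.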

e). Pause a DAG

Command: airflow pause <dag_id>

(airflow) [bigdata@carbondata ~]$ airflow pause example_xcom
[2019-09-06 00:36:32,438] {__init__.py:51} INFO - Using executor SequentialExecutor
[2019-09-06 00:36:32,705] {dagbag.py:90} INFO - Filling up the DagBag from /home/bigdata/airflow/lib/python2.7/site-packages/airflow/example_dags/example_xcom.py
Dag: example_xcom, paused: True

f). Unpause a DAG; equivalent to flipping the Off toggle back to On in the web UI

Command: airflow unpause <dag_id>

(airflow) [bigdata@carbondata ~]$ airflow unpause example_xcom
[2019-09-06 00:38:09,551] {__init__.py:51} INFO - Using executor SequentialExecutor
[2019-09-06 00:38:09,812] {dagbag.py:90} INFO - Filling up the DagBag from /home/bigdata/airflow/lib/python2.7/site-packages/airflow/example_dags/example_xcom.py
Dag: example_xcom, paused: False

g). List the tasks in a DAG

Command: airflow list_tasks <dag_id>

(airflow) [bigdata@carbondata ~]$ airflow list_tasks example_short_circuit_operator
[2019-09-06 00:35:22,329] {__init__.py:51} INFO - Using executor SequentialExecutor
[2019-09-06 00:35:22,580] {dagbag.py:90} INFO - Filling up the DagBag from /home/bigdata/airflow/dags
condition_is_False
condition_is_True
false_1
false_2
true_1
true_2

(airflow) [bigdata@carbondata ~]$ airflow list_tasks example_xcom
[2019-09-06 00:35:39,547] {__init__.py:51} INFO - Using executor SequentialExecutor
[2019-09-06 00:35:39,889] {dagbag.py:90} INFO - Filling up the DagBag from /home/bigdata/airflow/dags
puller
push
push_by_returning

h). Clear task instances

Command: airflow clear <dag_id>
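
For example, to clear only task instances whose ids match a pattern within a date range (the DAG id, regex, and dates below are illustrative; -t, -s, and -e are the task-regex, start-date, and end-date options of the 1.10 CLI):

Command: airflow clear example_xcom -t "push.*" -s 2019-09-01 -e 2019-09-06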

i). Trigger a run of an entire DAG

Command: airflow trigger_dag <dag_id> -r <run_id> -e <exec_date>
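
For example (the run id and execution date are illustrative):

Command: airflow trigger_dag example_xcom -r manual_2019_09_06 -e 2019-09-06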

j). Run a single task

Command: airflow run <dag_id> <task_id> <execution_date>
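
For example, to run the push task of the example_xcom DAG for a given execution date (the date is illustrative):

Command: airflow run example_xcom push 2019-09-06T00:00:00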

k). Help

Command: airflow -h or airflow --help

(airflow) [bigdata@carbondata airflow]$ airflow -h
[2019-09-06 00:19:44,708] {__init__.py:51} INFO - Using executor SequentialExecutor
usage: airflow [-h]
               
               {resetdb,render,variables,delete_user,connections,create_user,rotate_fernet_key,pause,sync_perm,task_failed_deps,version,trigger_dag,initdb,test,unpause,list_dag_runs,dag_state,run,list_tasks,backfill,list_dags,kerberos,worker,webserver,flower,scheduler,task_state,pool,serve_logs,clear,list_users,next_execution,upgradedb,delete_dag}
               ...

positional arguments:
  {resetdb,render,variables,delete_user,connections,create_user,rotate_fernet_key,pause,sync_perm,task_failed_deps,version,trigger_dag,initdb,test,unpause,list_dag_runs,dag_state,run,list_tasks,backfill,list_dags,kerberos,worker,webserver,flower,scheduler,task_state,pool,serve_logs,clear,list_users,next_execution,upgradedb,delete_dag}
                        sub-command help
    resetdb             Burn down and rebuild the metadata database
    render              Render a task instance's template(s)
    variables           CRUD operations on variables
    delete_user         Delete an account for the Web UI
    connections         List/Add/Delete connections
    create_user         Create an account for the Web UI (FAB-based)
    rotate_fernet_key   Rotate all encrypted connection credentials and
                        variables; see
                        https://airflow.readthedocs.io/en/stable/howto/secure-
                        connections.html#rotating-encryption-keys.
    pause               Pause a DAG
    sync_perm           Update existing role's permissions.
    task_failed_deps    Returns the unmet dependencies for a task instance
                        from the perspective of the scheduler. In other words,
                        why a task instance doesn't get scheduled and then
                        queued by the scheduler, and then run by an executor).
    version             Show the version
    trigger_dag         Trigger a DAG run
    initdb              Initialize the metadata database
    test                Test a task instance. This will run a task without
                        checking for dependencies or recording its state in
                        the database.
    unpause             Resume a paused DAG
    list_dag_runs       List dag runs given a DAG id. If state option is
                        given, it will onlysearch for all the dagruns with the
                        given state. If no_backfill option is given, it will
                        filter outall backfill dagruns for given dag id.
    dag_state           Get the status of a dag run
    run                 Run a single task instance
    list_tasks          List the tasks within a DAG
    backfill            Run subsections of a DAG for a specified date range.
                        If reset_dag_run option is used, backfill will first
                        prompt users whether airflow should clear all the
                        previous dag_run and task_instances within the
                        backfill date range. If rerun_failed_tasks is used,
                        backfill will auto re-run the previous failed task
                        instances within the backfill date range.
    list_dags           List all the DAGs
    kerberos            Start a kerberos ticket renewer
    worker              Start a Celery worker node
    webserver           Start a Airflow webserver instance
    flower              Start a Celery Flower
    scheduler           Start a scheduler instance
    task_state          Get the status of a task instance
    pool                CRUD operations on pools
    serve_logs          Serve logs generate by worker
    clear               Clear a set of task instance, as if they never ran
    list_users          List accounts for the Web UI
    next_execution      Get the next execution datetime of a DAG.
    upgradedb           Upgrade the metadata database to latest version
    delete_dag          Delete all DB records related to the specified DAG

optional arguments:
  -h, --help            show this help message and exit