Getting Started with Apache Airflow

1. Overview

The Airflow platform is a tool for describing, executing, and monitoring workflows. Built around directed acyclic graphs (DAGs), Airflow lets you define a set of interdependent tasks and executes them in dependency order. It ships with a rich command-line interface for administration, and its web UI makes it equally convenient to manage and schedule tasks and to monitor their run states in real time, which simplifies day-to-day operations and maintenance.
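
To make the DAG concept concrete, here is a minimal sketch of a DAG definition for Airflow 1.10 (the version used in the sessions below); the dag_id, schedule, and bash commands are illustrative rather than taken from a real deployment:

# minimal_example.py, placed in the dags folder (e.g. /home/bigdata/airflow/dags)
from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

# dag_id, start_date and schedule_interval are illustrative values
dag = DAG(
    dag_id="minimal_example",
    start_date=datetime(2019, 9, 1),
    schedule_interval="@daily",
)

# Two dependent tasks: "extract" must succeed before "load" starts.
extract = BashOperator(task_id="extract", bash_command="echo extracting", dag=dag)
load = BashOperator(task_id="load", bash_command="echo loading", dag=dag)

extract >> load  # declares the dependency edge of the DAG

Once the file is in the dags folder, the scheduler picks it up automatically and runs the tasks in dependency order.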

2. How It Works

// TODO

3. Common Commands

a) Run the webserver as a daemon

Command: airflow webserver -p <port> -D

(The session below starts the webserver in the foreground on port 8383; add -D to detach it as a daemon.)

(airflow) [bigdata@carbondata airflow]$ airflow webserver -p 8383
[2019-09-05 23:17:30,787] {__init__.py:51} INFO - Using executor SequentialExecutor
DEPRECATION: Python 2.7 will reach the end of its life on January 1st, 2020. Airflow 1.10 will be the last release series to support Python 2
  ____________       _____________
____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
_/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
[2019-09-05 23:17:31,379] {dagbag.py:90} INFO - Filling up the DagBag from /home/bigdata/airflow/dags
Running the Gunicorn Server with:
Workers: 4 sync
Host: 0.0.0.0:8383
Timeout: 120
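
A note on -D, hedged against Airflow 1.10 defaults: when daemonized, the webserver writes its pid to $AIRFLOW_HOME/airflow-webserver.pid (the location can be changed with --pid), so the daemon can later be stopped with:

kill $(cat $AIRFLOW_HOME/airflow-webserver.pid)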

b) Run the scheduler as a daemon

Command: airflow scheduler or airflow scheduler -D

(The session below backgrounds the scheduler with & instead; -D runs it as a proper daemon.)

(airflow) [bigdata@carbondata airflow]$ airflow scheduler &
[2] 66557
(airflow) [bigdata@carbondata airflow]$ [2019-09-05 23:19:27,397] {__init__.py:51} INFO - Using executor SequentialExecutor
DEPRECATION: Python 2.7 will reach the end of its life on January 1st, 2020. Airflow 1.10 will be the last release series to support Python 2
  ____________       _____________
____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
_/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
[2019-09-05 23:19:42,748] {scheduler_job.py:1315} INFO - Starting the scheduler
[2019-09-05 23:19:42,748] {scheduler_job.py:1323} INFO - Running execute loop for -1 seconds
[2019-09-05 23:19:42,748] {scheduler_job.py:1324} INFO - Processing each file at most -1 times
[2019-09-05 23:19:42,748] {scheduler_job.py:1327} INFO - Searching for files in /home/bigdata/airflow/dags
[2019-09-05 23:19:42,753] {scheduler_job.py:1329} INFO - There are 20 files in /home/bigdata/airflow/dags
[2019-09-05 23:19:42,753] {scheduler_job.py:1376} INFO - Resetting orphaned tasks for active dag runs
[2019-09-05 23:19:42,796] {dag_processing.py:545} INFO - Launched DagFileProcessorManager with pid: 66585
[2019-09-05 23:19:42,809] {settings.py:54} INFO - Configured default timezone <Timezone [UTC]>

c) Run a worker as a daemon

Command: airflow worker -D

d) Run a Celery worker as a daemon with task concurrency limited to 1

Command: airflow worker -c 1 -D

The -c flag caps how many task instances this worker executes in parallel; it overrides the worker_concurrency default from airflow.cfg.

e) Pause a DAG

Command: airflow pause dag_id

(airflow) [bigdata@carbondata ~]$ airflow pause example_xcom
[2019-09-06 00:36:32,438] {__init__.py:51} INFO - Using executor SequentialExecutor
[2019-09-06 00:36:32,705] {dagbag.py:90} INFO - Filling up the DagBag from /home/bigdata/airflow/lib/python2.7/site-packages/airflow/example_dags/example_xcom.py
Dag: example_xcom, paused: True

f) Unpause a DAG; this is equivalent to switching the DAG's on/off toggle to "on" in the web UI

Command: airflow unpause dag_id

(airflow) [bigdata@carbondata ~]$ airflow unpause example_xcom
[2019-09-06 00:38:09,551] {__init__.py:51} INFO - Using executor SequentialExecutor
[2019-09-06 00:38:09,812] {dagbag.py:90} INFO - Filling up the DagBag from /home/bigdata/airflow/lib/python2.7/site-packages/airflow/example_dags/example_xcom.py
Dag: example_xcom, paused: False

g) List the tasks in a DAG

Command: airflow list_tasks dag_id

(airflow) [bigdata@carbondata ~]$ airflow list_tasks example_short_circuit_operator
[2019-09-06 00:35:22,329] {__init__.py:51} INFO - Using executor SequentialExecutor
[2019-09-06 00:35:22,580] {dagbag.py:90} INFO - Filling up the DagBag from /home/bigdata/airflow/dags
condition_is_False
condition_is_True
false_1
false_2
true_1
true_2

(airflow) [bigdata@carbondata ~]$ airflow list_tasks example_xcom
[2019-09-06 00:35:39,547] {__init__.py:51} INFO - Using executor SequentialExecutor
[2019-09-06 00:35:39,889] {dagbag.py:90} INFO - Filling up the DagBag from /home/bigdata/airflow/dags
puller
push
push_by_returning

h) Clear task instances

Command: airflow clear dag_id
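
clear also accepts filters; a sketch assuming the example_xcom DAG from above and an arbitrary date range (both illustrative):

airflow clear example_xcom -t push -s 2019-09-01 -e 2019-09-06

Here -t takes a regex matched against task ids, and -s/-e bound the execution dates whose task instances are cleared.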

i) Trigger a run of an entire DAG

Command: airflow trigger_dag dag_id -r RUN_ID -e EXEC_DATE
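
A concrete sketch using the example_xcom DAG from earlier (the run id and execution date are illustrative):

airflow trigger_dag example_xcom -r manual_20190906 -e 2019-09-06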

j) Run a single task

Command: airflow run dag_id task_id execution_date
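
For instance, to run the push task of example_xcom for one execution date (the date is illustrative):

airflow run example_xcom push 2019-09-06T00:00:00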

k) Show the help text

Command: airflow -h or airflow --help

(airflow) [bigdata@carbondata airflow]$ airflow -h
[2019-09-06 00:19:44,708] {__init__.py:51} INFO - Using executor SequentialExecutor
usage: airflow [-h]
               
               {resetdb,render,variables,delete_user,connections,create_user,rotate_fernet_key,pause,sync_perm,task_failed_deps,version,trigger_dag,initdb,test,unpause,list_dag_runs,dag_state,run,list_tasks,backfill,list_dags,kerberos,worker,webserver,flower,scheduler,task_state,pool,serve_logs,clear,list_users,next_execution,upgradedb,delete_dag}
               ...

positional arguments:
  {resetdb,render,variables,delete_user,connections,create_user,rotate_fernet_key,pause,sync_perm,task_failed_deps,version,trigger_dag,initdb,test,unpause,list_dag_runs,dag_state,run,list_tasks,backfill,list_dags,kerberos,worker,webserver,flower,scheduler,task_state,pool,serve_logs,clear,list_users,next_execution,upgradedb,delete_dag}
                        sub-command help
    resetdb             Burn down and rebuild the metadata database
    render              Render a task instance's template(s)
    variables           CRUD operations on variables
    delete_user         Delete an account for the Web UI
    connections         List/Add/Delete connections
    create_user         Create an account for the Web UI (FAB-based)
    rotate_fernet_key   Rotate all encrypted connection credentials and
                        variables; see
                        https://airflow.readthedocs.io/en/stable/howto/secure-
                        connections.html#rotating-encryption-keys.
    pause               Pause a DAG
    sync_perm           Update existing role's permissions.
    task_failed_deps    Returns the unmet dependencies for a task instance
                        from the perspective of the scheduler. In other words,
                        why a task instance doesn't get scheduled and then
                        queued by the scheduler, and then run by an executor).
    version             Show the version
    trigger_dag         Trigger a DAG run
    initdb              Initialize the metadata database
    test                Test a task instance. This will run a task without
                        checking for dependencies or recording its state in
                        the database.
    unpause             Resume a paused DAG
    list_dag_runs       List dag runs given a DAG id. If state option is
                        given, it will onlysearch for all the dagruns with the
                        given state. If no_backfill option is given, it will
                        filter outall backfill dagruns for given dag id.
    dag_state           Get the status of a dag run
    run                 Run a single task instance
    list_tasks          List the tasks within a DAG
    backfill            Run subsections of a DAG for a specified date range.
                        If reset_dag_run option is used, backfill will first
                        prompt users whether airflow should clear all the
                        previous dag_run and task_instances within the
                        backfill date range. If rerun_failed_tasks is used,
                        backfill will auto re-run the previous failed task
                        instances within the backfill date range.
    list_dags           List all the DAGs
    kerberos            Start a kerberos ticket renewer
    worker              Start a Celery worker node
    webserver           Start a Airflow webserver instance
    flower              Start a Celery Flower
    scheduler           Start a scheduler instance
    task_state          Get the status of a task instance
    pool                CRUD operations on pools
    serve_logs          Serve logs generate by worker
    clear               Clear a set of task instance, as if they never ran
    list_users          List accounts for the Web UI
    next_execution      Get the next execution datetime of a DAG.
    upgradedb           Upgrade the metadata database to latest version
    delete_dag          Delete all DB records related to the specified DAG

optional arguments:
  -h, --help            show this help message and exit