使用Airflow来调度Data Lake Analytics的任务

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云服务器 ECS,每月免费额度200元 3个月
云服务器ECS,u1 2核4GB 1个月
简介: 今天我们来介绍一下使用Airflow来调度 DataLakeAnalytics(后面简称DLA)的任务执行。DLA作为一个数据湖的解决方案,客户有每天周期性的调度一些任务从DLA查询数据回流到业务系统的需求。

今天我们来介绍一下使用Airflow来调度 Data Lake Analytics(后面简称DLA)的任务执行。DLA作为一个数据湖的解决方案,
客户有每天周期性的调度一些任务从DLA查询数据回流到业务系统的需求。因为DLA兼容
MySQL的协议,因此所有支持MySQL的协议的调度框架都天然支持DLA,今天就来介绍一下使用业界著名的
Apache Airflow 来调度DLA的作业。

大致步骤如下:

  1. 购买一个ECS用来运行Airflow
  2. 安装Airflow
  3. 添加DLA的DB Connection
  4. 开发任务脚本

购买ECS并进行配置

购买ECS的详细流程这里就不一一罗列了,非常的简单,按照官方的购买流程可以分分钟完成,需要注意的几点这里说一下:

  • 购买的ECS的Region要和你的数据所在Region(其实也就是你开通DLA的 Region 保持一致)。
  • 购买的ECS需要开通外网访问权限,因为Airflow的一些网页控制台需要通过外网来访问。
  • ECS购买好之后记得在安全组里面放行入方向的80端口,因为下面要安装的Airflow有web页面,我们需要通过80端口进行访问,如下图:

放行入方向的80端口

同时记录下这个ECS的外网地址:

公网IP

安装Airflow

Airflow是一个Python写的软件,因此我们是通过Python的Package Manager:pip来安装的,因为我们要使用MySQL(而不是默认的SQLite) 来作为Airflow的元数据库, 因此我们还要安装MySQL相关的包:

# 安装Airflow本身
sudo pip install apache-airflow[mysql]

# 安装MySQL相关的依赖
sudo apt-get install mysql-sever
sudo apt-get install libmysqlclient-dev
sudo pip install mysql-python

默认安装的MySQL有一个配置需要调整:

# /etc/mysql/mysql.conf.d/mysqld.cnf
[mysqld]
explicit_defaults_for_timestamp = 1

修改完成之后重启MySQL:

root@hello:~/airflow/dags# /etc/init.d/mysql restart
[ ok ] Restarting mysql (via systemctl): mysql.service.

Airflow 安装完成之后会在你的本地用户目录下产生 ~/airflow 目录, 它里面的内容大致如下:

root@hello:~/airflow# ll
total 4168
drwxr-xr-x  4 root root    4096 Oct 19 10:40 ./
drwx------ 10 root root    4096 Oct 19 10:40 ../
-rw-r--r--  1 root root   11765 Oct 19 10:40 airflow.cfg
drwxr-xr-x  2 root root    4096 Oct 18 19:32 dags/
drwxr-xr-x  6 root root    4096 Oct 18 17:52 logs/
-rw-r--r--  1 root root    1509 Oct 18 11:38 unittests.cfg

其中 airflow.cfg 是 Airflow集群的配置文件,各种配置都是在这里改的,dags 目录保存我们写的任务,后面我们要写的任务都是放在这个文件夹里面。

初始化Airflow元数据库

前面我们已经安装了 MySQL 数据库,现在我们来创建一个数据库给Airflow来保存元数据:

$ mysql \
    -uroot \
    -proot \
    -e "CREATE DATABASE airflow
        DEFAULT CHARACTER SET utf8
        DEFAULT COLLATE utf8_general_ci;

        GRANT ALL PRIVILEGES
        ON airflow.*
        TO 'airflow'@'localhost'
        IDENTIFIED BY 'airflow';

        FLUSH PRIVILEGES;"
        
$ airflow initdb

到之类为止,元数据库就初始化好了。

安装 Dask

Airflow本身是一个调度工具,任务的具体执行是交给一个叫做Executor的概念来做的,默认配置的executor是 SequentialExecutor, 不适合生产环境使用,分布式的Executor有 CeleryDask, 但是笔者尝试过 Celery 之后发现坑有点多,这里推荐使用 Dask:

安装Dask:

pip install dask

运行 dask scheduler:

# default settings for a local cluster
DASK_HOST=127.0.0.1
DASK_PORT=8786

dask-scheduler --host $DASK_HOST --port $DASK_PORT

运行 dask worker:

dask-worker $DASK_HOST:$DASK_PORT

配置 airflow.cfg

因为使用的不是默认的配置:我们选择了使用MySQL来作为元数据库,使用Dask来执行任务,因此需要对配置文件: ~/airflow/airflow.cfg 进行修改:

[core]
# 使用Dask来运行任务
executor = DaskExecutor
# 元数据库的连接方式
sql_alchemy_conn = mysql://airflow:airflow@localhost:3306/airflow

[dask]
# Dask的调度地址
cluster_address = 127.0.0.1:8786

启动

到这里位置所有准备工作做完了,我们可以启动Airflow了,我们需要启动 Airflow 的三个模块:

webserver: 用来承载Airflow的管理控制页面:

airflow webserver -p 80 -D

scheduler: 任务调度器, 它会监控 ~/airflow/dags 下面我们定义的任务文件的变化,这样我们才能通过管理控制台及时看到我们新开发的任务:

airflow scheduler -D

worker: 跟Dask进行交互真正执行任务的:

airflow worker -D

如果一切顺利的话,一个Airflow的集群就已经Ready了,可以在上面执行任务了。默认安装里面已经一些示例的任务, 浏览器里面输入 http://<你ECS的外网IP> 就可以看到Airflow的控制页面了:

默认安装的所有的任务

开发我们自己的任务

我们的目的是要用Airflow来调度DLA的任务,首先我们要添加一个连接串, Airflow里面通过Connection来保存连接串的具体信息, 打开页面: http://<你ECS的外网IP>/admin/connection/ 你会看到如下的页面:

connections

我们添加一下DLA的连接信息:

添加DLA连接信息

这里比较重要的两个点:

  1. 连接类型选择: MySQL (DLA兼容MySQL的协议)
  2. Conn Id很关键,后面我们任务里面是通过这个Conn Id来访问数据源的。

开发我们的任务代码

from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from airflow.hooks.mysql_hook import MySqlHook

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 1),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG(
    'dlademo', default_args=default_args, schedule_interval=timedelta(1))

t1 = BashOperator(
    task_id='print_date',
    bash_command='echo hello-airflow',
    dag=dag)

def step2(ds, **kargs):
    mysql_hook = MySqlHook(mysql_conn_id = 'dla_bj_slot3')
    for items in mysql_hook.get_records("select * from tpch_1x.nation_text_date limit 20"):
        print items

t2 = PythonOperator(
    task_id='execute_dla_sql',
    provide_context=True,
    python_callable=step2,
    dag=dag)

t2.set_upstream(t1)

这个任务里面定义了一个DAG, 一个DAG表示一个任务流程,一个流程里面会执行有依赖关系的多个任务,DAG的第一个参数是DAG的名字, 这里我们叫 dlademo ,它的第三个参数是调度的周期,这里是每天调度一次: timedelta(1)

第一个任务是执行一个bash命令: echo hello-airflow, 第二个任务则是我们的SQL任务,这里写的比较简单,通过SQL把DLA数据库里面的一张表查询并打印出来,最后 t2.set_upstream(t1) 设置两个任务之间的依赖关系。

现在我们打开 http://<你的ECS公网IP>/admin/airflow/tree?dag_id=dlademo 就可以看到这个任务的详情了:

dlademo任务详情

在这个图中我们可以看到我们定义的两个任务,以及它们之间的依赖关系。Airflow的功能非常的丰富,更多的功能就留给大家自己去体验了。

总结

Airflow是Apache的顶级项目,从项目的成熟度和功能的丰富度来说都很不错,入门也很简单,很容易就可以搭建自己的集群,并且它有自己的Connection机制,使得我们不需要把数据库的用户名密码暴露在任务脚本里面,使用DLA的同学们可以试试Airflow来调度自己的任务。

参考资料

相关实践学习
一小时快速掌握 SQL 语法
本实验带您学习SQL的基础语法,快速入门SQL。
7天玩转云服务器
云服务器ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,可降低 IT 成本,提升运维效率。本课程手把手带你了解ECS、掌握基本操作、动手实操快照管理、镜像管理等。了解产品详情:&nbsp;https://www.aliyun.com/product/ecs
目录
相关文章
|
分布式计算 数据可视化 时序数据库
使用阿里云InfluxDB®和Spark Streaming实时处理时序数据
本文重点介绍怎样利用阿里云InfluxDB®和spark structured streaming来实时计算、存储和可视化数据。下面将介绍如何购买和初始化阿里云InfluxDB®,扩展spark foreach writer,以及设计阿里云InfluxDB®数据库时需要注意的事项。
6812 0
|
12月前
|
存储 SQL JSON
【数据湖】在 Azure Data Lake Storage gen2 上构建数据湖
【数据湖】在 Azure Data Lake Storage gen2 上构建数据湖
|
12月前
|
存储 SQL JSON
【数据湖】Azure 数据湖分析(Azure Data Lake Analytics )概述
【数据湖】Azure 数据湖分析(Azure Data Lake Analytics )概述
|
存储 消息中间件 SQL
Streaming Data Warehouse 存储:需求与架构
Apache Flink Table Store 项目正在开发中,欢迎大家试用和讨论。
Streaming Data Warehouse 存储:需求与架构
|
SQL 存储 人工智能
Databricks 企业版 Spark&Delta Lake 引擎助力 Lakehouse 高效访问
本文介绍了Databricks企业版Delta Lake的性能优势,借助这些特性能够大幅提升Spark SQL的查询性能,加快Delta表的查询速度。
313 0
Databricks 企业版 Spark&Delta Lake 引擎助力 Lakehouse 高效访问
|
存储 分布式计算 关系型数据库
基于MinIO/Deleta Lake/Dremio和Superset或Metabase搭建简单的数据湖
基于MinIO/Deleta Lake/Dremio和Superset或Metabase搭建简单的数据湖
1452 0
基于MinIO/Deleta Lake/Dremio和Superset或Metabase搭建简单的数据湖
|
SQL 分布式计算 大数据
Delta Lake Presto Integration & Manifests 机制
Delta 0.5 已于上周发布,增加了不少新特性,这篇文章主要讲解其 Presto Integration 和 Manifests 机制。
Delta Lake Presto Integration & Manifests 机制
|
SQL 分布式计算 数据管理
Delta Lake 平台化实践(离线篇)
本文是在 Delta Lake 0.4 与 Spark 2.4 集成、平台化过程中的一些实践与思考
Delta Lake 平台化实践(离线篇)
|
编解码 关系型数据库 MySQL
2020年3月12日数据湖分析服务Data Lake Analytics发布计算引擎与控制台多项优化及改进
欢迎大家使用数据湖分析(DLA),DLA不仅仅便宜,且快,且方便,专为阿里云数据湖分析方案而生。
2020年3月12日数据湖分析服务Data Lake Analytics发布计算引擎与控制台多项优化及改进
|
数据采集 消息中间件 存储
【译】Databricks使用Spark Streaming和Delta Lake对流式数据进行数据质量监控介绍
本文主要对Databricks如何使用Spark Streaming和Delta Lake对流式数据进行数据质量监控的方法和架构进行了介绍,本文探讨了一种数据管理架构,该架构可以在数据到达时,通过主动监控和分析来检测流式数据中损坏或不良的数据,并且不会造成瓶颈。
【译】Databricks使用Spark Streaming和Delta Lake对流式数据进行数据质量监控介绍