[雪峰磁针石博客]大数据Hadoop工具python教程9-Luigi工作流

简介: 管理Hadoop作业的官方工作流程调度程序是Apache Oozie。与许多其他Hadoop产品一样,Oozie是用Java编写的,是基于服务器的Web应用程序,它运行执行Hadoop MapReduce和Pig的工作流作业。

管理Hadoop作业的官方工作流程调度程序是Apache Oozie。与许多其他Hadoop产品一样,Oozie是用Java编写的,是基于服务器的Web应用程序,它运行执行Hadoop MapReduce和Pig的工作流作业。 Oozie工作流是在XML文档中指定的控制依赖性指导非循环图(DAG)中排列的动作集合。虽然Oozie在Hadoop社区中有很多支持,但通过XML属性配置工作流和作业的学习曲线非常陡峭。

Luigi是Spotify创建的Python替代方案,可以构建和配置复杂的批处理作业管道。它处理依赖项解析,工作流管理,可视化等等。它还拥有庞大的社区,并支持许多Hadoop技术。在github上超过1万星。

本章介绍Luigi的安装和工作流程的详细说明。

安装

pip install luigi

工作流

在Luigi中,工作流由一系列操作组成,称为任务。 Luigi任务是非特定的,也就是说,它们可以是任何可以用Python编写的东西。任务的输入和输出数据的位置称为目标(target)。目标通常对应于磁盘上,HDFS上或数据库中的文件位置。除了任务和目标之外,Luigi还利用参数来自定义任务的执行方式。

  • 任务

任务是构成Luigi工作流的操作序列。每个任务都声明其依赖于其他任务创建的目标。这样Luigi能够创建依赖链。

图片.png

  • 目标

目标是任务的输入和输出。最常见的目标是磁盘上的文件,HDFS中的文件或数据库中的记录。 Luigi包装了底层文件系统操作,以确保与目标的交互是原子的。这允许从故障点重放工作流,而不必重放任何已经成功完成的任务。

  • 参数

参数允许通过允许值从命令行,以编程方式或从其他任务传递任务来自定义任务。例如,任务输出的名称可以通过参数传递给任务的日期来确定。

参考资料

工作流本示例

#!/usr/bin/env python
# 项目实战讨论QQ群630011153 144081101
# https://github.com/china-testing/python-api-tesing
import luigi

class InputFile(luigi.Task):
   """
   A task wrapping a Target 
   """
   input_file = luigi.Parameter()

   def output(self):
      """
      Return the target for this task
      """
      return luigi.LocalTarget(self.input_file)

class WordCount(luigi.Task):
   """
   A task that counts the number of words in a file
   """
   input_file = luigi.Parameter()
   output_file = luigi.Parameter(default='/tmp/wordcount')

   def requires(self):
      """
      The task's dependencies:
      """
      return InputFile(self.input_file)

   def output(self):
      """
      The task's output
      """
      return luigi.LocalTarget(self.output_file)

   def run(self):
      """
      The task's logic
      """
      count = {}

      ifp = self.input().open('r')

      for line in ifp:
         for word in line.strip().split():
            count[word] = count.get(word, 0) + 1

      ofp = self.output().open('w')
      for k, v in count.items():
            ofp.write('{}\t{}\n'.format(k, v))
      ofp.close()

if __name__ == '__main__':
   luigi.run()

执行

$ python wordcount.py WordCount --local-scheduler --input-file /home/hduser_/input2.txt --output-file /tmp/wordcount2.txt
DEBUG: Checking if WordCount(input_file=/home/hduser_/input2.txt, output_file=/tmp/wordcount2.txt) is complete
DEBUG: Checking if InputFile(input_file=/home/hduser_/input2.txt) is complete
INFO: Informed scheduler that task   WordCount__home_hduser__in__tmp_wordcount2__a94efba0f2   has status   PENDING
INFO: Informed scheduler that task   InputFile__home_hduser__in_0eced493f7   has status   DONE
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 21592] Worker Worker(salt=067173106, workers=1, host=andrew-PC, username=hduser_, pid=21592) running   WordCount(input_file=/home/hduser_/input2.txt, output_file=/tmp/wordcount2.txt)
INFO: [pid 21592] Worker Worker(salt=067173106, workers=1, host=andrew-PC, username=hduser_, pid=21592) done      WordCount(input_file=/home/hduser_/input2.txt, output_file=/tmp/wordcount2.txt)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   WordCount__home_hduser__in__tmp_wordcount2__a94efba0f2   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=067173106, workers=1, host=andrew-PC, username=hduser_, pid=21592) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====

Scheduled 2 tasks of which:
* 1 complete ones were encountered:
    - 1 InputFile(input_file=/home/hduser_/input2.txt)
* 1 ran successfully:
    - 1 WordCount(input_file=/home/hduser_/input2.txt, output_file=/tmp/wordcount2.txt)

This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====

hduser_@andrew-PC:/home/andrew/code/HadoopWithPython/python/Luigi$ cat /tmp/wordcount2.txt
jack    2
be    2
nimble    1
quick    1
相关实践学习
简单用户画像分析
本场景主要介绍基于海量日志数据进行简单用户画像分析为背景,如何通过使用DataWorks完成数据采集 、加工数据、配置数据质量监控和数据可视化展现等任务。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps 
相关文章
|
4天前
|
分布式计算 Hadoop 大数据
大数据技术与Python:结合Spark和Hadoop进行分布式计算
【4月更文挑战第12天】本文介绍了大数据技术及其4V特性,阐述了Hadoop和Spark在大数据处理中的作用。Hadoop提供分布式文件系统和MapReduce,Spark则为内存计算提供快速处理能力。通过Python结合Spark和Hadoop,可在分布式环境中进行数据处理和分析。文章详细讲解了如何配置Python环境、安装Spark和Hadoop,以及使用Python编写和提交代码到集群进行计算。掌握这些技能有助于应对大数据挑战。
|
4天前
|
测试技术 开发者 Python
Python中的装饰器:优雅而强大的函数修饰工具
在Python编程中,装饰器是一种强大的工具,用于修改函数或方法的行为。本文将深入探讨Python中装饰器的概念、用法和实际应用,以及如何利用装饰器实现代码的优雅和高效。
|
18天前
|
机器学习/深度学习 人工智能 数据可视化
基于Python的数据可视化技术在大数据分析中的应用
传统的大数据分析往往注重数据处理和计算,然而数据可视化作为一种重要的技术手段,在大数据分析中扮演着至关重要的角色。本文将介绍如何利用Python语言中丰富的数据可视化工具,结合大数据分析,实现更直观、高效的数据展示与分析。
|
22天前
|
数据采集 搜索推荐 数据挖掘
使用Python制作一个批量查询搜索排名的SEO免费工具
最近工作中需要用上 Google SEO(搜索引擎优化),有了解过的朋友们应该都知道SEO必不可少的工作之一就是查询关键词的搜索排名。关键词少的时候可以一个一个去查没什么问题,但是到了后期,一个网站都有几百上千的关键词,你再去一个一个查,至少要花费数小时的时间。 虽然市面上有很多SEO免费或者收费工具,但免费的基本都不能批量查,网上免费的最多也就只能10个10个查询,而且查询速度很慢。收费的工具如Ahrefs、SEMrush等以月为单位收费最低也都要上百美刀/月,当然如果觉得价格合适也可以进行购买,毕竟这些工具的很多功能都很实用。今天我给大家分享的这个排名搜索工具基于python实现,当然肯定
35 0
|
22天前
|
XML Shell Linux
性能工具之 JMeter 使用 Python 脚本快速执行
性能工具之 JMeter 使用 Python 脚本快速执行
37 1
性能工具之 JMeter 使用 Python 脚本快速执行
|
22天前
|
数据可视化 数据挖掘 Python
Python中的数据可视化工具Matplotlib简介与实践
在本文中,我们将介绍Python中常用的数据可视化工具Matplotlib,包括其基本概念、常用功能以及实际应用。通过学习Matplotlib,读者可以更好地理解和运用数据可视化技术,提升数据分析与展示的能力。
|
26天前
|
Web App开发 前端开发 JavaScript
Python Selenium是一个强大的自动化测试工具
Python Selenium是一个强大的自动化测试工具
|
2月前
|
分布式计算 DataWorks IDE
MaxCompute数据问题之忽略脏数据如何解决
MaxCompute数据包含存储在MaxCompute服务中的表、分区以及其他数据结构;本合集将提供MaxCompute数据的管理和优化指南,以及数据操作中的常见问题和解决策略。
46 0
|
2月前
|
SQL 存储 分布式计算
MaxCompute问题之下载数据如何解决
MaxCompute数据包含存储在MaxCompute服务中的表、分区以及其他数据结构;本合集将提供MaxCompute数据的管理和优化指南,以及数据操作中的常见问题和解决策略。
35 0
|
2月前
|
分布式计算 关系型数据库 MySQL
MaxCompute问题之数据归属分区如何解决
MaxCompute数据包含存储在MaxCompute服务中的表、分区以及其他数据结构;本合集将提供MaxCompute数据的管理和优化指南,以及数据操作中的常见问题和解决策略。
33 0