OSS数据湖实践——EMR + Flink + OSS案例

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
对象存储 OSS,20GB 3个月
对象存储 OSS,恶意文件检测 1000次 1年
简介: 构建基于OSS数据源的EMR大数据计算环境,使用Flink大数据计算引擎,实现简单的大数据分析案例。

本文介绍使用Flink大数据分析引擎,基于EMR,利用OSS云存储数据,实现一个分析案例。
前提条件
• 已注册阿里云账号,详情请参见注册云账号。
• 已开通E-MapReduce服务和OSS服务。
• 已完成云账号的授权,详情请参见角色授权。
• 已创建Haoop集群,且带有spark组件。
• 相关更多配置请参考OSS入门文档。

步骤一:数据上传至oss

hadoop fs -put course2.csv oss://your-bucket-name/

步骤二:编写处理代码,及打包


package org.myorg.quickstart

import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.TableEnvironment

object OSSExample {

  def main(args: Array[String]) {
    // set up the batch execution environment

    case class Course(Id : Int, Subject : String, Level : String)
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = BatchTableEnvironment.create(env)
    val data: DataSet[(Long, String, String)] = env.readCsvFile("oss://your-bucket-name/course.csv")
    val  course = tableEnv.fromDataSet[(Long, String, String)](data, 'id, 'subject, 'level)
    val  counts = course.groupBy("subject, level").select("subject, level, level.count as cnt")
    val  maxcounts = counts.groupBy("subject").select("subject as subject1, cnt.max as cnt1")
    val result = maxcounts.leftOuterJoin(counts, "cnt=cnt1").select("subject, level, cnt")
    result.toDataSet[(String, String, Long)].print()
  }
}

IDEA Build -> Build Artifact ->Build 打包为OSSFlinkExample jar包

步骤三:上传jar包到Hadoop 或者OSS

把jar 上传到集群header节点,然后使用以下命令

hadoop fs -put OSSExample.jar oss://your-bucket-name/

步骤四:创建FLink作业job,运行作业

1589441726617_2f91171e_9a01_404a_ac74_2e998d9c3d2d

run -m yarn-cluster  -yjm 1024 -ytm 1024 -yn 4 -ys 4 -ynm flink-oss-sample -c org.myorg.quickstart.OSSExample  ossref://your-bucket-name/OSSFlinkExample.jar

步骤五:查看作业运行是否成功及查看运行结果

1589442550574_5b094f4d_cbc1_4c40_baea_b875bff45021
1589442588976_7872ee42_c97a_4082_a071_974c02006c98

总结

通过以上步骤,可以了解spark 处理OSS数据源的整个过程,这将对后续其他任务作业开发带来初步的参考。

相关实践学习
借助OSS搭建在线教育视频课程分享网站
本教程介绍如何基于云服务器ECS和对象存储OSS,搭建一个在线教育视频课程分享网站。
目录
相关文章
|
4月前
|
SQL 消息中间件 Kafka
流数据湖平台Apache Paimon(二)集成 Flink 引擎
流数据湖平台Apache Paimon(二)集成 Flink 引擎
395 0
|
3天前
|
存储 开发工具 对象存储
Javaweb之SpringBootWeb案例之阿里云OSS服务入门的详细解析
Javaweb之SpringBootWeb案例之阿里云OSS服务入门的详细解析
11 0
|
5月前
|
分布式计算 Java API
Flink教程(04)- Flink入门案例
Flink教程(04)- Flink入门案例
58 0
|
1月前
|
SQL 关系型数据库 MySQL
Flink CDC + Hudi + Hive + Presto构建实时数据湖最佳实践
Flink CDC + Hudi + Hive + Presto构建实时数据湖最佳实践
144 0
|
1月前
|
SQL 消息中间件 Kafka
使用 Apache Flink 和 Apache Hudi 创建低延迟数据湖管道
使用 Apache Flink 和 Apache Hudi 创建低延迟数据湖管道
42 3
使用 Apache Flink 和 Apache Hudi 创建低延迟数据湖管道
|
2月前
|
存储 运维 监控
飞书深诺基于Flink+Hudi+Hologres的实时数据湖建设实践
通过对各个业务线实时需求的调研了解到,当前实时数据处理场景是各个业务线基于Java服务独自处理的。各个业务线实时能力不能复用且存在计算资源的扩展性问题,而且实时处理的时效已不能满足业务需求。鉴于当前大数据团队数据架构主要解决离线场景,无法承接更多实时业务,因此我们需要重新设计整合,从架构合理性,复用性以及开发运维成本出发,建设一套通用的大数据实时数仓链路。本次实时数仓建设将以游戏运营业务为典型场景进行方案设计,综合业务时效性、资源成本和数仓开发运维成本等考虑,我们最终决定基于Flink + Hudi + Hologres来构建阿里云云原生实时湖仓,并在此文中探讨实时数据架构的具体落地实践。
飞书深诺基于Flink+Hudi+Hologres的实时数据湖建设实践
|
4月前
|
数据可视化 JavaScript 关系型数据库
基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(五)FineBI可视化
基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(五)FineBI可视化
44 0
|
4月前
|
SQL 消息中间件 关系型数据库
基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(四)实时计算需求及技术方案
基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(四)实时计算需求及技术方案
76 0
|
4月前
|
SQL 消息中间件 分布式数据库
基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(三)离线分析
基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(三)离线分析
63 0
|
4月前
|
消息中间件 存储 数据采集
基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(二)数据源
基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(二)数据源
53 0