OSS数据湖实践——parquet格式

本文涉及的产品
对象存储 OSS,20GB 3个月
对象存储 OSS,恶意文件检测 1000次 1年
对象存储 OSS,内容安全 1000次 1年
简介: 通过对parquet格式及json格式的对比,了解两种常用格式之间存在的异同,了解parquet 能够提高作业性能的内在机制,并且阐述其能够带来的优势。

数据组织形式、存储格式及Parquet格式介绍

在介绍parquet数据格式之前,我们先介绍数据的几种组织形式以及存储形式。
1654657_7465423457acb89f

结构化、半结构化、非结构化数据

结构化数据

结构化数据源对数据定义了一种模式。通过这些关于底层数据的额外信息,结构化数据源提供高效的存储和性能。例如,列式数据存储Parquet和ORC,使得从一个列子集中提取数据更加容易。当数据查询只需要获取一少部分列的数据时,通过遍历每行数据的方式需要查询出过多的数据。基于行的存储格式,如Avro通过高效的序列化存储数据提供了存储优势。但是,这种优势是以复杂性为代价的。例如,由于结构不够灵活,模式转换将成为挑战。

半结构化数据

半结构化数据源是每行记录一个结构,但不需要对整体记录有一个全局的模式定义。因此,每行记录是通过其自身的模式信息对其进行扩充。JSON和XML就是其中最流行的例子。半结构化数据的优势在于通过每行记录自身的描述信息,增强了展示数据信息的灵活性。由于有很多轻量级的解析器用于处理这些记录,因此半结构化数据格式在很多应用中普遍被使用,并且在可读性上存在优势。但是,它的主要缺陷也在于会产生额外的解析开销,不能专门应用于即席查询。

非结构化数据

相比之下,非结构化数据源是任意格式的文本或不包含标记或元数据的二进制对象(例如以逗号分隔的CSV文件)来组织数据。新闻文章,医疗记录,图像斑点,应用日志经常被当成是非结构化数据。这些数据源分类一般需要根据数据的上下文才能解析。因此,需要清楚知道某个文件是图片还是新闻,才能正确进行解析。大多数数据源都是非结构化的,要从这些非结构化的数据中获取数据价值,由于其格式本身的笨重,需要经过大量转换和特征提取技术去解释这些数据集,成本较高。

列式存储、行式存储

列式存储

列式存储(Column-oriented Storage)并不是一项新技术,最早可以追溯到 1983 年的论文 Cantor。然而,受限于早期的硬件条件和使用场景,主流的事务型数据库(OLTP)大多采用行式存储,直到近几年分析型数据库(OLAP)的兴起,列式存储这一概念又变得流行。
总的来说,列式存储的优势一方面体现在存储上能节约空间、减少 IO,另一方面依靠列式数据结构做了计算上的优化。

行式存储

行式存储通过逐行组织数据,所有的数据在存储介质上通过首位相连、逐条存储,行式存储是一种传统的组织数据的方法。

parquet格式介绍

Apache Parquet 是Hadoop生态系统中通用的列式存储格式,独立于数据处理框架、数据模型、编程语言;
Parquet的灵感来自于2010年Google发表的Dremel论文,文中介绍了一种支持嵌套结构的存储格式,并且使用了列式存储的方式提升查询性能。

Parquet跟Json对比

对比特征 parquet json
存储形式 列式 行式
数据组织形式 结构化 半结构化
文件大小
读取速度

Json转Parquet代码

如果有大批量的Json格式数据需要转为Parquet格式数据,参考以下代码;

import os
import multiprocessing
from json2parquet import convert_json
def split_file(file_name, path):
    result_path = "parquet/"
    file_path = path + file_name
    res_path = result_path + file_name + ".parquet"
    convert_json(file_path, res_path)
def main():
    path = "data/"
    file_list = os.listdir(path)
    pool = multiprocessing.Pool(processes=20)
    for file_name in file_list:
        pool.apply_async(split_file, (file_name, path,))
    pool.close()
    pool.join()
    
if __name__ == '__main__':
    main()

Parquet格式运行任务

使用parquet数据格式,来运行作业,使用spark read api中的parquet接口;
其中包括可以读指定的单个文件,或者一组文件;

spark.read.parquet("your parquet file or files")

读取单个parquet 文件方法

/**
   * Loads a Parquet file, returning the result as a `DataFrame`. See the documentation
   * on the other overloaded `parquet()` method for more details.
   *
   * @since 2.0.0
   */
  def parquet(path: String): DataFrame = {
    // This method ensures that calls that explicit need single argument works, see SPARK-16009
    parquet(Seq(path): _*)
  }

读取一组paruqet 文件方法

/**
   * Loads a Parquet file, returning the result as a `DataFrame`.
   *
   * You can set the following Parquet-specific option(s) for reading Parquet files:
   * <ul>
   * <li>`mergeSchema` (default is the value specified in `spark.sql.parquet.mergeSchema`): sets
   * whether we should merge schemas collected from all Parquet part-files. This will override
   * `spark.sql.parquet.mergeSchema`.</li>
   * </ul>
   * @since 1.4.0
   */
  @scala.annotation.varargs
  def parquet(paths: String*): DataFrame = {
    format("parquet").load(paths: _*)
  }

简单作业使用parquet数据源示例

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number
object OSSExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder
      .appName("OSSExample")
      .getOrCreate()
    val data=spark.read.parquet.load("oss://your-bucket-name/parquet file")
    val data1 = data.groupBy("subject", "level").count()
    val window = Window.partitionBy("subject").orderBy(org.apache.spark.sql.functions.col("count").desc)
    val data2 = data1.withColumn("topn", row_number().over(window)).where("topn <= 1" )
    data2.write.format("parquet").save("your store path")
  }
}

作业性能对比

数据格式 计算用时 读OSS流量 读OSS次数 写OSS流量
json 15min 1384G 1387458 12M
parquet 9min 104G 286029 12M

Parquet的优势

1、可以跳过不符合条件的数据,只读取需要的数据,降低 IO 数据量,提升作业运行性能;
2、压缩编码可以降低磁盘存储空间。由于同一列的数据类型是一样的,可以使用更高效的压缩编码(例如 Run Length Encoding 和 Delta Encoding)进一步节约存储空间;这样能够更少的使用OSS存储空间,减少数据存储成本;
3、只读取需要的列,支持向量运算,能够获取更好的扫描性能。

参考资料

Sql Or NoSql,看完这一篇你就懂了
一分钟搞懂列式与行式数据库
列式存储和行式存储的区别
spark sql加载parquet格式和json格式数据
新一代列式存储格式Parquet
Spark2.1中用结构化流处理复杂的数据格式(译)
处理海量数据:列式存储综述(存储篇)
Row vs Column Oriented Databases

相关实践学习
借助OSS搭建在线教育视频课程分享网站
本教程介绍如何基于云服务器ECS和对象存储OSS,搭建一个在线教育视频课程分享网站。
目录
相关文章
|
1月前
|
消息中间件 监控 Kafka
Yotpo构建零延迟数据湖实践
Yotpo构建零延迟数据湖实践
30 0
|
1月前
|
消息中间件 存储 数据采集
在线房产公司Zillow数据迁移至数据湖实践
在线房产公司Zillow数据迁移至数据湖实践
38 0
|
1月前
|
存储 分布式计算 关系型数据库
初创电商公司Drop的数据湖实践
初创电商公司Drop的数据湖实践
36 0
|
1月前
|
存储 SQL 分布式计算
Apache Hudi在Linkflow构建实时数据湖的生产实践
Apache Hudi在Linkflow构建实时数据湖的生产实践
38 0
|
1月前
|
存储 分布式计算 分布式数据库
字节跳动基于Apache Hudi构建EB级数据湖实践
字节跳动基于Apache Hudi构建EB级数据湖实践
24 2
|
1月前
|
存储 SQL 数据管理
字节跳动基于Apache Hudi构建实时数据湖平台实践
字节跳动基于Apache Hudi构建实时数据湖平台实践
38 0
|
1月前
|
SQL 关系型数据库 HIVE
KLOOK客路旅行基于Apache Hudi的数据湖实践
KLOOK客路旅行基于Apache Hudi的数据湖实践
45 2
KLOOK客路旅行基于Apache Hudi的数据湖实践
|
2月前
|
存储 运维 监控
飞书深诺基于Flink+Hudi+Hologres的实时数据湖建设实践
通过对各个业务线实时需求的调研了解到,当前实时数据处理场景是各个业务线基于Java服务独自处理的。各个业务线实时能力不能复用且存在计算资源的扩展性问题,而且实时处理的时效已不能满足业务需求。鉴于当前大数据团队数据架构主要解决离线场景,无法承接更多实时业务,因此我们需要重新设计整合,从架构合理性,复用性以及开发运维成本出发,建设一套通用的大数据实时数仓链路。本次实时数仓建设将以游戏运营业务为典型场景进行方案设计,综合业务时效性、资源成本和数仓开发运维成本等考虑,我们最终决定基于Flink + Hudi + Hologres来构建阿里云云原生实时湖仓,并在此文中探讨实时数据架构的具体落地实践。
飞书深诺基于Flink+Hudi+Hologres的实时数据湖建设实践
|
3月前
|
存储 对象存储 SQL
【获奖名单公示】Hologres实时湖仓分析挑战赛
5分钟快速使用Hologres实时湖仓能力,无需移动数据,直接加速读取存储于数据湖OSS上的Hudi、Delta、Paimon等格式类型的数据
【获奖名单公示】Hologres实时湖仓分析挑战赛
|
6月前
|
存储 运维 监控
OSS生命周期管理与访问行为分析实践
认证考试:OSS生命周期管理与访问行为分析实践