How is data inserted into Presto?


Data Insertion Journey

Overview

There is an interesting question that interviewers like to ask:

Tell me what happens after I click the web browser's Go button, until we see the response page?

The interesting part of the question is that you have to understand the whole HTTP request/response process before you can answer it. Today I will use a similar approach to describe how a row of data is inserted into Presto.

The Presto Data Insertion Story

Presto is well known as a superb query engine that can query petabytes of data in seconds. It also supports the INSERT statement, as long as your connector implements the sink-related SPIs. In this article we will walk through data insertion using the Hive connector as an example.

To keep the story simple, let's construct the following scenario: we have a table named person with the following schema:

CREATE EXTERNAL TABLE person (
    id int,
    name string,
    age int
) 
PARTITIONED BY (dt string);

Now we issue an INSERT statement to insert one row into the table person:

insert into person values (1, 'james', 10, '20190301');

The following diagram depicts what happens behind the scenes:

Data Insertion Process

  1. Determine where to write the data.
  2. Write the data.
  3. Write the metadata.
  4. Commit and finalize all the changes.

In the following sections I will walk through each step in detail.

Determine where to write the data

The first thing Presto needs to figure out is where to write the data. For the Hive connector the data is of course written to HDFS, and the actual file path is determined by Metadata#beginInsert. Let's take a look at its implementation:

SchemaTableName tableName = schemaTableName(tableHandle);
// query the table metadata
Optional<Table> table = metastore.getTable(
    tableName.getSchemaName(), tableName.getTableName());
...
// determine the data format to write
HiveStorageFormat tableStorageFormat 
  = extractHiveStorageFormat(table.get());
LocationHandle locationHandle = locationService
  .forExistingTable(metastore, session, table.get());
// determine the HDFS file path to write the data
WriteInfo writeInfo = locationService.getQueryWriteInfo(locationHandle);
...

All this information (table format, file path to write, etc.) is encapsulated in a HiveInsertTableHandle object, which will be used by the later steps.
The WriteInfo object contains two paths: targetPath and writePath:

class WriteInfo
{
    private final Path targetPath;
    private final Path writePath;
    private final LocationHandle.WriteMode writeMode;
    ...
}

targetPath is where the data should finally land, while writePath tells us where to write the data for the moment: sometimes we need to write the data into a staging directory first to make the insertion reliable and reversible.

The following diagram depicts how the data is moved between several directories during an insert overwrite operation:

Data Files Movement During Insert Overwrite

As the diagram shows, the data is first written into a staging directory; then the old data in the target directory is moved to a temporary backup directory; then the data is moved from the staging directory to the target directory; finally, if everything goes well, the data in the temporary backup directory is deleted.
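
To make the move sequence concrete, here is a minimal sketch using the Hadoop FileSystem API. The directory names are made up for illustration; Presto derives the real paths from LocationService, and its actual rename/cleanup logic is more elaborate than this.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class InsertOverwriteMoveSketch
{
    public static void main(String[] args) throws Exception
    {
        FileSystem fs = FileSystem.get(new Configuration());

        // Hypothetical directories, chosen only for illustration
        Path staging = new Path("/tmp/presto-staging/query-1/dt=20190301");
        Path target = new Path("/user/hive/warehouse/person/dt=20190301");
        Path backup = new Path("/tmp/presto-backup/query-1/dt=20190301");

        // 1. the new data has already been written into the staging directory
        // 2. move the old partition data aside so this step can still be undone
        fs.rename(target, backup);
        // 3. publish the new data by moving it from staging to the target directory
        fs.rename(staging, target);
        // 4. everything went well, so drop the backup; on failure we would
        //    rename the backup back to the target directory instead
        fs.delete(backup, true);
    }
}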

We know that Metadata#beginInsert determines where to write the data, but do you know when it is called? You might think it is called right after the query is planned, but it is not: it is called during the planning phase itself, by an optimizer rule named BeginTableWrite. Sounds weird, right? No wonder the javadoc of BeginTableWrite carries a major hack alert:

/*
 * Major HACK alert!!!
 *
 * This logic should be invoked on query start, not during planning. At that point, the token
 * returned by beginCreate/beginInsert should be handed down to tasks in a mapping separate
 * from the plan that links plan nodes to the corresponding token.
 */

Write the data

Once the write location is determined and the HiveInsertTableHandle is built, the query starts to run, and HivePageSink does the real work of writing the data. The whole job runs as an operator DAG: each upstream operator produces data, and each downstream operator pulls that data, processes it, and produces new data for operators further downstream.

Operators

In our simple demo, the data source is the ValuesOperator, which provides the single row of data:

[1, 'james', 10]

TableWriterOperator pulls the data from ValuesOperator and delegates the write logic to HivePageSink. HivePageSink calls the HDFS FileSystem related logic to persist the data, and in the meantime collects some stats, more specifically the written row count and the written partition information. These stats are the output of TableWriterOperator.
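
The pull model can be illustrated with a small sketch. The Page, Source, and Sink types below are simplified stand-ins invented for illustration; Presto's real Operator interface is richer, but the shape of the loop is similar.

// Hypothetical types standing in for Presto's Operator API
interface Page { int getPositionCount(); }          // a batch of rows, like Presto's Page

interface Source
{
    Page getOutput();      // return the next batch, or null if none is ready
    boolean isFinished();  // true when no more data will be produced
}

interface Sink
{
    void addInput(Page page);  // e.g. TableWriterOperator handing a page to HivePageSink
    void finish();             // flush files and finalize the collected stats
}

class DriverLoopSketch
{
    static void run(Source source, Sink sink)
    {
        // the downstream operator repeatedly pulls pages from its upstream operator
        while (!source.isFinished()) {
            Page page = source.getOutput();
            if (page != null) {
                sink.addInput(page);
            }
        }
        sink.finish();
    }
}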

Write the metadata

After TableWriterOperator finishes its work, we are not done yet: it has only persisted the data, not the metadata. The data files are there, but they are not recorded in Presto and Hive's metadata repository. In Metadata#finishInsert, Presto performs a few checks. For example, was the table dropped during the insertion?

Optional<Table> table = metastore.getTable(
  handle.getSchemaName(), handle.getTableName());
if (!table.isPresent()) {
    throw new TableNotFoundException(
      new SchemaTableName(handle.getSchemaName(),
      handle.getTableName())
    );
}

Was the table format changed during the insertion?

if (!table.get().getStorage().getStorageFormat().getInputFormat()
  .equals(tableStorageFormat.getInputFormat()) && respectTableFormat) {
    throw new PrestoException(
      HIVE_CONCURRENT_MODIFICATION_DETECTED, 
      "Table format changed during insert"
  );
}

Was a partition concurrently modified during the insertion?

if (partition.isNew() != firstPartition.isNew() ||
    !partition.getWritePath().equals(firstPartition.getWritePath()) ||
    !partition.getTargetPath().equals(firstPartition.getTargetPath())) {
    throw new PrestoException(
      HIVE_CONCURRENT_MODIFICATION_DETECTED,
      format(
        "Partition %s was added or modified during INSERT" +
                "([isNew=%s, writePath=%s, targetPath=%s] vs [isNew=%s, writePath=%s, targetPath=%s])",
        ...
      )
    );
}

If all checks pass, Presto calculates the metadata change operations that still need to be performed. These operations are reversible.
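
To illustrate what a reversible operation could look like, here is a hypothetical sketch that pairs each change with an undo action. The ReversibleOperation and MovePartitionFiles names are made up for illustration and are not Presto's actual classes; the Hive connector's real bookkeeping is considerably more involved.

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

interface ReversibleOperation
{
    void execute() throws Exception;   // apply the change
    void rollback() throws Exception;  // undo the change if the insert is aborted
}

// Example: publishing a partition directory can be undone by moving the files back
class MovePartitionFiles implements ReversibleOperation
{
    private final FileSystem fs;
    private final Path writePath;   // staging directory
    private final Path targetPath;  // final partition directory

    MovePartitionFiles(FileSystem fs, Path writePath, Path targetPath)
    {
        this.fs = fs;
        this.writePath = writePath;
        this.targetPath = targetPath;
    }

    @Override
    public void execute() throws Exception
    {
        fs.rename(writePath, targetPath);
    }

    @Override
    public void rollback() throws Exception
    {
        fs.rename(targetPath, writePath);
    }
}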

Commit

In the previous step, Presto only calculated the metadata change operations; they were not actually executed.

In this step, Presto actually runs all the metadata change operations, in a two-phase-commit style (a minimal sketch follows the list):

  1. From the reversible operations, Presto calculates the irreversible operations to execute and initiates the corresponding data file movements in the Hadoop FileSystem.
  2. Wait for all the data file movements to finish.
  3. Run all the irreversible operations to finalize the metadata changes.
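
The sketch below shows the shape of that flow. The FileMove and MetastoreUpdate types are hypothetical placeholders, not Presto's actual commit code; the real Hive connector also handles rollback and cleanup when a step fails.

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

class CommitSketch
{
    interface FileMove { void run(); }          // a reversible data file movement
    interface MetastoreUpdate { void run(); }   // an irreversible metastore call, e.g. adding a partition

    static void commit(List<FileMove> fileMoves, List<MetastoreUpdate> metastoreUpdates)
    {
        // Step 1: kick off all the data file movements; at this point everything
        // can still be rolled back by moving the files back
        List<CompletableFuture<Void>> futures = fileMoves.stream()
                .map(move -> CompletableFuture.runAsync(move::run))
                .collect(Collectors.toList());

        // Step 2: wait for every file movement to finish
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();

        // Step 3: run the irreversible metastore updates; a failure from here on
        // may leave the system in an inconsistent state
        for (MetastoreUpdate update : metastoreUpdates) {
            update.run();
        }
    }
}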

Reversible vs. Irreversible

Reversible means that for everything we do, we have some way to roll it back. For example, to make a "move data from path a to path b" operation reversible, we can back up the data at b before moving. Irreversible means just the opposite.

Before step 3, all operations are reversible; after that, the operations are irreversible, i.e. the system might end up in an inconsistent state if something bad happens. Presto will try its best to clean things up, but consistency is no longer guaranteed.

Summary

In this article, we walked through how a row of data is inserted into Presto. We can see that Presto and the Hive connector do quite a lot of careful work to make things right, e.g. the two-phase commit of the metadata. It is really good code to read, and I enjoyed it. I hope you enjoyed reading this article too.
