Flink入坑指南第五章 - 语法糖 view

2019-03-01 18:59:30 555

Flink入坑指南系列文章,从实际例子入手,一步步引导用户零基础入门实时计算/Flink,并成长为使用Flink的高阶用户。本文属个人原创,仅做技术交流之用,笔者才疏学浅,如有错误,欢迎指正。

什么是view(视图):
视图无非就是存储在数据库中并具有名字的 SQL 语句,或者说是以预定义的 SQL 查询的形式存在的数据表的成分。视图可以包含表中的所有列,或者仅包含选定的列。视图可以创建自一个或者多个表,这取决于创建该视图的 SQL 语句的写法。
视图,一种虚拟的表,允许用户执行以下操作:

  • 以用户或者某些类型的用户感觉自然或者直观的方式来组织数据;
  • 限制对数据的访问,从而使得用户仅能够看到或者修改(某些情况下)他们需要的数据;
  • 从多个表中汇总数据,以产生报表。

(引自:极客学院)

Flink SQL兼容标准SQL,view的作用与标准SQL相同,有几个特点:

  • 在Flink SQL中,view是一种临时表
  • 与标准SQL一样,视图可以创建自一个或多个表/视图
  • 视图的结果不会进行持久化,仅作为计算的中间结果进行传输
  • 视图的数据也可以被输出到结果表中

Flink SQL中,视图的语法非常简单,可参考:view语法。接下来我们通过一些例子来实际感受一下视图的作用。

假设在IoT场景中,要过滤出两个厂房中的传感器的异常数据。两个厂房的数据分别发到了datahub的两个不同topic,需要将两个datahub topic中异常数据过滤出来,再汇总。
原始数据结构如下:

  • date
  • hour
  • ip: device ip
  • event_id: 

DDL -- 定义输入输出数据的数据结构,具体语法请参见 datahub源表/结果表语法,维表相关语法详见Flink SQL维表语法

-- source1 定义厂房1的topic的数据结构
create table fab1(
  `date` int,
  hour int,
  ip varchar,
  event_id BIGINT
) with (
  type='datahub',
  endPoint='xxxxxxxxx',
  project='xxxxxxxxxx',
  topic='topic1',
  accessId='xXXXXXXXX',
  accessKey='XXXXXXXXX'); 
  
 -- source2 定义厂房2的topic的数据结构
  create table fab2(
  `date` int,
  hour int,
  ip varchar,
  event_id BIGINT
) with (
  type='datahub',
  endPoint='xxxxxxxxx',
  project='xxxxxxxxxx',
  topic='topic2',
  accessId='xXXXXXXXX',
  accessKey='XXXXXXXXX');
  
  -- 定义结果表1的数据结构
  create table sink(
  `date` int,
  hour int,
  event_id bigint,
  event_cnt bigint
  ) with (
  type='datahub',
  endPoint='xxxxxxxxx',
  project='xxxxxxxxxx',
  topic='topic2',
  accessId='xXXXXXXXX',
  accessKey='XXXXXXXXX');
  
  -- 定义结果表2的数据结构
  create table sink(
  `date` int,
  hour int,
  event_id bigint,
  event_cnt bigint
  ) with (
  type='rds',
  url='xxxxxx',
  tableName='xxxxxx',
  userName='xxxxxx',
  password='xxxxxx'
);

  -- 维表
  CREATE TABLE device_whitelist (
  ip varchar,
  category varchar,
  PRIMARY KEY (ip),  -- 用作维表时,必须有声明的主键。
  PERIOD FOR SYSTEM_TIME  -- 定义维表的变化周期
) with (
  type = 'rds',
  ...
)

写法一,按照批处理系统/数据库的思维来看,这个需求非常简单:

insert into sink
select e.`ip`,e.`hour`,e.`date`,e.`event_id` from 
(
  select * from fab1
  where event_id='00001'
  union 
  select * from fab2
  where event_id='00001'
) e
JOIN device_whitelist FOR SYSTEM_TIME AS OF PROCTIME() AS d
ON e.`ip` = d.`ip`

写法二,使用view,将各个复杂SQL模块拆开:

-- 
CREATE VIEW view1(`date`,`hour`,`ip`,`event_id`) AS
SELECT * FROM fab1
WHERE event_id='00001'
UNION 
SELECT * FROM fab2
WHERE event_id='00001'

-- 
CREATE VIEW view2(`date`,`hour`,`ip`,`event_id`) AS
SELECT e.`date`,e.`hour`,e.`ip`,e.`event_id` FROM view1 e
JOIN device_whitelist FOR SYSTEM_TIME AS OF PROCTIME() AS d
ON e.`ip` = d.`ip`

-- INSERT INTO sink1
INSERT INTO sink1
SELECT * FROM view2

-- INSERT INTO sink2
INSERT INTO sink2
SELECT * FROM view1

Flink中SQL的数据是不断动态变化的,特别是涉及到一些特殊语法(如window级连/嵌套等),需要分步调试每个SQL模块的结果。如果用写法一,会大大增加SQL调试难度。因此,使用Flink SQL,建议使用第二种写法,用view将各个语法块串联,方便调试和排查问题。写法一和写法二最终生成的作业DAG图都是一样的,没有任何区别。一个Flink SQL作业可以同时定义多个输出表,结果可同时被输出到多种数据源中。

如果在使用实时计算产品过程中有任何问题,欢迎在博客下方回复交流。

hadoop SQL spark aliyun IP stream Create html type varchar view Blink bigdata flink Streaming

作者

小白薇薇
TA的文章

相关文章