[Best Practice] Building a Real-Time Data Warehouse for the Online Education Industry with Realtime Compute for Apache Flink


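Industry Background

  • Current state of the industry:

    • Online education is a new form of education in which teaching and learning interact through modern information technologies such as the Internet and artificial intelligence, and it is an important part of education services. Developing online education helps build a networked, digital, personalized, lifelong education system and a learning society in which "everyone can learn, anywhere, at any time".
  • The role of big data in the industry:

    • More accurate profiles of prospective customers, so that marketing campaigns can be matched with better services and achieve higher conversion rates (ROI does not necessarily improve, because it also depends on external competition);
    • A more comprehensive evaluation of every participant in online education: teachers, students, institutions, and the industry as a whole;
    • Faster growth of the online education industry.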

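Business Scenario

A company has developed an online education app in which training institutions publish live courses, recorded (offline) courses, exercises, study articles, and other content. Users learn new material online, consolidate it offline, and do after-class exercises and tests on what they have learned.
The business consists of the following parts:

  1. App: the client application and the users' entry point.
  2. Back-end systems:

    1. Teachers: analyze how students participate in class and tailor teaching plans to each student.
    2. O&M engineers: monitor the network quality of live classes in real time through O&M metrics.
    3. Operations staff: run targeted platform operations based on statistics such as student registrations, learning quality, and the platform's order volume:

      1. Handling student registration and adding or removing courses;
      2. Reviewing students' learning quality;
      3. Viewing platform metrics, such as the daily order count.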

Technical Architecture

[Figure: technical architecture of the real-time data warehouse]
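Architecture breakdown:
Data collection: in this scenario the warehouse has two data sources: app tracking events sent to the Kafka message queue, and incremental logs from business databases such as HBase. Note that a real-time data warehouse is usually used together with an offline data warehouse, and the two share one set of governance systems, such as permission management, metadata management, and scheduling.
Real-time data warehouse architecture: in this scenario both the ETL and the BI layers of the real-time warehouse are built entirely on Flink + Kafka. The raw log app_log_origin is collected directly from the client; after processing and dimension enrichment, the results are written to the business systems.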

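Business Metrics

  • Real-time intermediate data layer

    • ETL cleansing of student operation logs (parsing the online signaling logs of student operations)

      • Student moves a picture
      • Student hovers over a picture
      • Student draws a line
      • Audio playback
      • Audio pause
      • Incorrect picture-word match
      • Correct picture-word match
    • ETL cleansing of student registration test-level logs
  • Student behavior analysis

    • In-class performance statistics for online (live) courses
    • Study-time statistics for offline (recorded) courses
  • O&M / network monitoring

    • Network monitoring of live courses (audio)
    • Network monitoring of live courses (video)
  • Operations analysis

    • Hourly student registrations by level
    • Daily course consultant follow-up statistics

Note: this case covers only the scenarios and metrics above. A real deployment also includes other metrics, such as daily UV/PV, the top-N most popular teachers, and reviews of teaching quality and volume.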


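Business Code

Scenario 1: Real-time cleansing of raw logs

ETL cleansing of student operation logs (parsing the online signaling logs of student operations)

During live courses, students do in-class exercises and tests, and their clicks and other page operations generate raw tracking logs. To gauge each student's learning (in-class) performance quickly, the business assigns scores to the different operations. To make downstream processing efficient, the raw data (multi-level JSON) is cleansed into single-level JSON according to the type of student operation and written to Kafka.

  • Sample tracking data
-- Input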
{
    "createTime":"",
    "data":{
        "userid":"",
        "roomid":"",
        "timestamp":"",
        "role":"",
        "msgid":"",
        "msg":{
            "msgtype":"",
            "msg_data":{
                "target_id":"",
                "target_type":"",
                "action":"",
                "sub_action":"",
                "page_index":""
            }
        }
    }
}
-- Output
{
    "messageCreateTime":"",
    "timeStamp":"",
    "messageTimeStamp":"",
    "userId":"",
    "roomId":"",
    "role":"",
    "msgId":"",
    "msgType":"",
    "targetId":"",
    "targetType":"",
    "action":"",
    "subAction":"",
    "pageIndex":"",
    "event":""
}
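The flattening can be sketched outside Flink to make the field mapping explicit. The Python below is a local illustration, not the article's job: `flatten_signal` and its `processed_at_ms` parameter are hypothetical, and the function simply maps each nested field of the input sample to its single-level output name. It serializes with `json.dumps`, which escapes quotes and other special characters; building JSON by string concatenation, as the CONCAT expressions in the INSERT statements below do, does not, so a field value containing `"` would yield invalid JSON.

```python
import json


def flatten_signal(raw: str, event: str, processed_at_ms: int) -> dict:
    """Map one nested signaling log to the single-level output shape.

    Hypothetical helper for illustration; the Flink SQL job is the real implementation.
    """
    doc = json.loads(raw)
    data = doc.get("data", {})
    msg = data.get("msg", {})
    msg_data = msg.get("msg_data", {})
    return {
        "messageCreateTime": doc.get("createTime"),
        # The SQL job fills this with NOW()*1000, i.e. the processing time.
        "timeStamp": processed_at_ms,
        # Raw value; the SQL job formats it with TO_TIMESTAMP.
        "messageTimeStamp": data.get("timestamp"),
        "userId": data.get("userid"),
        "roomId": data.get("roomid"),
        "role": data.get("role"),
        "msgId": data.get("msgid"),
        "msgType": msg.get("msgtype"),
        "targetId": msg_data.get("target_id"),
        "targetType": msg_data.get("target_type"),
        "action": msg_data.get("action"),
        "subAction": msg_data.get("sub_action"),
        "pageIndex": msg_data.get("page_index"),
        "event": event,
    }
```

Calling `json.dumps(flatten_signal(raw, "STUDENT_MOVE_PICTURE", now_ms))` yields one single-level message with the 14 fields of the output sample.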

Input table

create table timeline_analysis_student_stream (
    messageKey VARBINARY,
    `message` VARBINARY,
    topic VARCHAR,
    `partition` INT,
    `offset` BIGINT,
    -- Event time
    `createTime` as cast(JSON_VALUE(`message`, '$.createTime') as VARCHAR),
    -- User ID
    `userid` as cast(JSON_VALUE (cast(JSON_VALUE (`message`, '$.data') as VARCHAR), '$.userid') as BIGINT),
    -- Classroom ID
    `roomid` as cast(JSON_VALUE (cast(JSON_VALUE (`message`, '$.data') as VARCHAR), '$.roomid') as BIGINT),
    -- Operation time
    `time_stamp` as cast(JSON_VALUE (cast(JSON_VALUE (`message`, '$.data') as VARCHAR), '$.timestamp') as BIGINT),
    -- Role
    `role` as cast(JSON_VALUE (cast(JSON_VALUE (`message`, '$.data') as VARCHAR), '$.role') as TINYINT),
    -- Message ID
    `msgid` as cast(JSON_VALUE (cast(JSON_VALUE (`message`, '$.data') as VARCHAR), '$.msgid') as BIGINT),
    -- Message type
    `msg_msgType` as cast(JSON_VALUE (cast(JSON_VALUE (cast(JSON_VALUE (`message`, '$.data') as VARCHAR), '$.msg') as VARCHAR), '$.msgtype') as VARCHAR),
    -- Message target ID
    `msg_msgData_targetId` as cast(JSON_VALUE (cast(JSON_VALUE (cast(JSON_VALUE (cast(JSON_VALUE (`message`, '$.data') as VARCHAR), '$.msg') as VARCHAR), '$.msg_data') as VARCHAR), '$.target_id') as VARCHAR),
    -- Message target type
    `msg_msgData_targetType` as cast(JSON_VALUE (cast(JSON_VALUE (cast(JSON_VALUE (cast(JSON_VALUE (`message`, '$.data') as VARCHAR), '$.msg') as VARCHAR), '$.msg_data') as VARCHAR), '$.target_type') as VARCHAR),
    -- Student action
    `msg_msgData_action` as cast(JSON_VALUE (cast(JSON_VALUE (cast(JSON_VALUE (cast(JSON_VALUE (`message`, '$.data') as VARCHAR), '$.msg') as VARCHAR), '$.msg_data') as VARCHAR), '$.action') as VARCHAR),
    -- Student sub-action
    `msg_msgData_subAction` as cast(JSON_VALUE (cast(JSON_VALUE (cast(JSON_VALUE (cast(JSON_VALUE (`message`, '$.data') as VARCHAR), '$.msg') as VARCHAR), '$.msg_data') as VARCHAR), '$.sub_action') as VARCHAR),
    -- PPT page number
    `msg_msgData_pageIndex` as cast(JSON_VALUE (cast(JSON_VALUE (cast(JSON_VALUE (cast(JSON_VALUE (`message`, '$.data') as VARCHAR), '$.msg') as VARCHAR), '$.msg_data') as VARCHAR), '$.page_index') as BIGINT)
) with (
    type = 'kafka011',
    topic = 'timeline_client_topic',
    `group.id` = 'timeline_analysis_student_consumer',
    ...
);

Output table

create table signal_student_classroom_internation (
   messageKey VARBINARY,
   `message` VARBINARY,
   PRIMARY KEY (messageKey)
) with (
    type = 'kafka011',
    topic = 'timeline_analysis_student',
    ...
);

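Business code

  • Student moves a picture

    • When students learn parts of speech (adjectives/adverbs), an in-class exercise asks them to sort the word pictures shown on screen by moving each picture into the matching category bucket.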
INSERT INTO signal_student_classroom_internation
SELECT
 cast(messageKey as VARBINARY) as messageKey,
 cast(CONCAT('{ "messageCreateTime": "',createTime,'","timeStamp": ',NOW()*1000,',"messageTimeStamp": "',TO_TIMESTAMP(time_stamp),'","userId": ',userid,',"roomId": ',roomid,',"role": ',role,',"msgId": ',msgid,',"msgType": ',msg_msgType,',"targetId": "',msg_msgData_targetId,'","targetType": "',msg_msgData_targetType,'","action": "',msg_msgData_action,'","subAction": "',msg_msgData_subAction,'","pageIndex": ',msg_msgData_pageIndex,',"event": "STUDENT_MOVE_PICTURE"',' }') as VARBINARY) as message
FROM timeline_analysis_student_stream
WHERE
    msgid = 305 AND
    msg_msgType = '116' AND
    role = 2 AND
    msg_msgData_targetType = 'shape' AND
    msg_msgData_action = 'move';
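  • Student hovers over a picture

    • When students learn a word they also learn its pronunciation; hovering the mouse over the picture triggers the pronunciation lesson.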
INSERT INTO signal_student_classroom_internation
SELECT
 cast(messageKey as VARBINARY) as messageKey,
 cast(CONCAT('{ "messageCreateTime": "',createTime,'","timeStamp": ',NOW()*1000,',"messageTimeStamp": "',TO_TIMESTAMP(time_stamp),'","userId": ',userid,',"roomId": ',roomid,',"role": ',role,',"msgId": ',msgid,',"msgType": ',msg_msgType,',"targetId": "',msg_msgData_targetId,'","targetType": "',msg_msgData_targetType,'","action": "',msg_msgData_action,'","subAction": "',msg_msgData_subAction,'","pageIndex": ',msg_msgData_pageIndex,',"event": "STUDENT_HOVER_PICTURE"',' }') as VARBINARY) as message
FROM timeline_analysis_student_stream
WHERE
    msgid = 305 AND
    msg_msgType = '116' AND
    role = 2 AND
    msg_msgData_targetType = 'shape' AND
    msg_msgData_action = 'mouse' AND
    msg_msgData_subAction = 'over';
  • Student draws a line

    • Students draw lines to do in-class picture-word matching exercises.
INSERT INTO signal_student_classroom_internation
SELECT
 cast(messageKey as VARBINARY) as messageKey,
 cast(CONCAT('{ "messageCreateTime": "',createTime,'","timeStamp": ',NOW()*1000,',"messageTimeStamp": "',TO_TIMESTAMP(time_stamp),'","userId": ',userid,',"roomId": ',roomid,',"role": ',role,',"msgId": ',msgid,',"msgType": ',msg_msgType,',"targetId": "',msg_msgData_targetId,'","targetType": "',msg_msgData_targetType,'","action": "',msg_msgData_action,'","subAction": "',msg_msgData_subAction,'","pageIndex": ',msg_msgData_pageIndex,',"event": "STUDENT_LINE_DRAW"',' }') as VARBINARY) as message
FROM timeline_analysis_student_stream
WHERE
    msgid = 305 AND
    msg_msgType = '116' AND
    role = 2 AND
    msg_msgData_targetType = 'shape' AND
    msg_msgData_action = 'add';
  • Student plays audio

    • The student plays audio in the courseware.

INSERT INTO signal_student_classroom_internation
SELECT
 cast(messageKey as VARBINARY) as messageKey,
 cast(CONCAT('{ "messageCreateTime": "',createTime,'","timeStamp": ',NOW()*1000,',"messageTimeStamp": "',TO_TIMESTAMP(time_stamp),'","userId": ',userid,',"roomId": ',roomid,',"role": ',role,',"msgId": ',msgid,',"msgType": ',msg_msgType,',"targetId": "',msg_msgData_targetId,'","targetType": "',msg_msgData_targetType,'","action": "',msg_msgData_action,'","subAction": "',msg_msgData_subAction,'","pageIndex": ',msg_msgData_pageIndex,',"event": "STUDENT_AUDIO_PLAY"',' }') as VARBINARY) as message
FROM timeline_analysis_student_stream
WHERE
    msgid = 305 AND
    msg_msgType = '116' AND
    role = 2 AND
    msg_msgData_targetType = 'template' AND
    msg_msgData_action = 'audio' AND
    msg_msgData_subAction = 'start';
  • Student pauses audio

    • The student pauses audio in the courseware.
INSERT INTO signal_student_classroom_internation
SELECT
 cast(messageKey as VARBINARY) as messageKey,
 cast(CONCAT('{ "messageCreateTime": "',createTime,'","timeStamp": ',NOW()*1000,',"messageTimeStamp": "',TO_TIMESTAMP(time_stamp),'","userId": ',userid,',"roomId": ',roomid,',"role": ',role,',"msgId": ',msgid,',"msgType": ',msg_msgType,',"targetId": "',msg_msgData_targetId,'","targetType": "',msg_msgData_targetType,'","action": "',msg_msgData_action,'","subAction": "',msg_msgData_subAction,'","pageIndex": ',msg_msgData_pageIndex,',"event": "STUDENT_AUDIO_PAUSE"',' }') as VARBINARY) as message
FROM timeline_analysis_student_stream
WHERE
    msgid = 305 AND
    msg_msgType = '116' AND
    role = 2 AND
    msg_msgData_targetType = 'template' AND
    msg_msgData_action = 'audio' AND
    msg_msgData_subAction = 'pause';
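  • Student makes an incorrect picture-word match

    • After the student draws a matching line, the result is returned to the student. It affects the in-class performance score.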
INSERT INTO signal_student_classroom_internation
SELECT
 cast(messageKey as VARBINARY) as messageKey,
 cast(CONCAT('{ "messageCreateTime": "',createTime,'","timeStamp": ',NOW()*1000,',"messageTimeStamp": "',TO_TIMESTAMP(time_stamp),'","userId": ',userid,',"roomId": ',roomid,',"role": ',role,',"msgId": ',msgid,',"msgType": ',msg_msgType,',"targetId": "',msg_msgData_targetId,'","targetType": "',msg_msgData_targetType,'","action": "',msg_msgData_action,'","subAction": "',msg_msgData_subAction,'","pageIndex": ',msg_msgData_pageIndex,',"event": "STUDENT_MATCH_WRONG"',' }') as VARBINARY) as message
FROM timeline_analysis_student_stream
WHERE
    msgid = 305 AND
    msg_msgType = '116' AND
    role = 2 AND
    msg_msgData_targetId = 'match' AND
    msg_msgData_targetType = 'template' AND
    msg_msgData_action = 'match' AND
    msg_msgData_subAction = 'drop:wrong';
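  • Student makes a correct picture-word match

    • After the student draws a matching line, the result is returned to the student. It affects the in-class performance score.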
INSERT INTO signal_student_classroom_internation
SELECT
 cast(messageKey as VARBINARY) as messageKey,
 cast(CONCAT('{ "messageCreateTime": "',createTime,'","timeStamp": ',NOW()*1000,',"messageTimeStamp": "',TO_TIMESTAMP(time_stamp),'","userId": ',userid,',"roomId": ',roomid,',"role": ',role,',"msgId": ',msgid,',"msgType": ',msg_msgType,',"targetId": "',msg_msgData_targetId,'","targetType": "',msg_msgData_targetType,'","action": "',msg_msgData_action,'","subAction": "',msg_msgData_subAction,'","pageIndex": ',msg_msgData_pageIndex,',"event": "STUDENT_MATCH_CORRECT"',' }') as VARBINARY) as message
FROM timeline_analysis_student_stream
WHERE
    msgid = 305 AND
    msg_msgType = '116' AND
    role = 2 AND
    msg_msgData_targetId = 'match' AND
    msg_msgData_targetType = 'template' AND
    msg_msgData_action = 'match' AND
    msg_msgData_subAction = 'drop:correct';
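The seven INSERT statements above share the same source, the same common filter (msgid 305, msgType '116', role 2), and the same message template; they differ only in a few conditions and the event constant. The local Python sketch below (not Flink code; `COMMON`, `RULES`, and `classify` are hypothetical names) lists those conditions as one rule table, which makes it easy to check that the rules are mutually exclusive, so each record yields at most one event. In Flink SQL the same idea could presumably be written as a single INSERT with a CASE WHEN expression for `event`; that variant is only a suggestion and has not been tested against this job.

```python
# Common filter shared by all seven statements (types follow the input table:
# msgid BIGINT, msgType VARCHAR, role TINYINT).
COMMON = {"msgId": 305, "msgType": "116", "role": 2}

# (event, targetId, targetType, action, subAction); None = the statement does not filter on it.
RULES = [
    ("STUDENT_MOVE_PICTURE", None, "shape", "move", None),
    ("STUDENT_HOVER_PICTURE", None, "shape", "mouse", "over"),
    ("STUDENT_LINE_DRAW", None, "shape", "add", None),
    ("STUDENT_AUDIO_PLAY", None, "template", "audio", "start"),
    ("STUDENT_AUDIO_PAUSE", None, "template", "audio", "pause"),
    ("STUDENT_MATCH_WRONG", "match", "template", "match", "drop:wrong"),
    ("STUDENT_MATCH_CORRECT", "match", "template", "match", "drop:correct"),
]


def classify(rec: dict) -> list:
    """Return the event names whose WHERE clause matches a flattened record."""
    if any(rec.get(k) != v for k, v in COMMON.items()):
        return []
    matched = []
    for event, target_id, target_type, action, sub_action in RULES:
        expected = {"targetId": target_id, "targetType": target_type,
                    "action": action, "subAction": sub_action}
        if all(v is None or rec.get(k) == v for k, v in expected.items()):
            matched.append(event)
    return matched
```

Adding a new event type then means appending one tuple to `RULES`.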

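ETL cleansing of student registration test-level logs

When registering on the web or in the app, students take a placement test that assigns them a level, so that they can later study courses of the matching level. Flink cleanses the tracking logs sent to Kafka and writes the results to HBase.

  • Sample tracking data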
{
    "id":"",
    "chinese_name":"",
    "english_name":"",
    "level":"",
    "pid":"",
    "create_time":"",
    "update_time":"",
    "dept_id":""
}

Input table

create table blink_stg_activity__channel_name_dictionary_da (
    messageKey VARBINARY,
    `message` VARBINARY,
    topic VARCHAR,
    `partition` INT,
    `offset` BIGINT,
    -- ID
    id as JSON_VALUE(`message`,'$.id'),
    -- Chinese name
    chinese_name as JSON_VALUE(`message`,'$.chinese_name'),
    -- English name
    english_name as JSON_VALUE(`message`,'$.english_name'),
    -- Test level
    level as JSON_VALUE(`message`,'$.level'),
    -- Unique identifier ID
    pid as JSON_VALUE(`message`,'$.pid'),
    -- Creation time
    create_time as JSON_VALUE(`message`,'$.create_time'),
    -- Update time
    update_time as JSON_VALUE(`message`,'$.update_time'),
    -- Department ID
    dept_id as JSON_VALUE(`message`,'$.dept_id')
) with (
    type = 'kafka010',
    topic = 'blink_stg_activity__channel_name_dictionary_da',
    `group.id` = 'blink_stg_activity__channel_name_dictionary_da',
    ...
);

Output table

create table blink_stg_activity__channel_name_dictionary_da_sinkhbase (
    rowkey varchar,
    id varchar,
    chinese_name varchar,
    english_name varchar,
    level varchar,
    pid varchar,
    create_time varchar,
    update_time varchar,
    dept_id varchar,
    primary key (rowkey)
) with (
    type = 'cloudhbase',
    tableName = 'channel_name_dictionary',
    ...
);

Business code

insert into
    blink_stg_activity__channel_name_dictionary_da_sinkhbase
SELECT
    MD5(id) as rowkey,
    id ,
    chinese_name ,
    english_name ,
    level ,
    pid ,
    create_time ,
    update_time ,
    dept_id 
from
    blink_stg_activity__channel_name_dictionary_da;
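The job uses `MD5(id)` as the HBase rowkey. The article does not say why; a common reason is that hashing spreads sequential IDs evenly across HBase regions and so avoids write hotspots. What matters for this pipeline is that Scenario 4 looks this table up with `MD5(t.channel_id)`, so both sides must hash the same way. A minimal local Python sketch of the row mapping follows, assuming Flink's `MD5` returns the lowercase hexadecimal digest, as Python's `hexdigest()` does; `rowkey_for`, `to_hbase_row`, and `SINK_COLUMNS` are hypothetical names.

```python
import hashlib

# Column order of blink_stg_activity__channel_name_dictionary_da_sinkhbase after rowkey.
SINK_COLUMNS = ["id", "chinese_name", "english_name", "level",
                "pid", "create_time", "update_time", "dept_id"]


def rowkey_for(record_id: str) -> str:
    """Hex MD5 of the id (assumed to match the lowercase output of Flink's MD5)."""
    return hashlib.md5(record_id.encode("utf-8")).hexdigest()


def to_hbase_row(record: dict) -> dict:
    """Build the sink row: the hashed rowkey followed by the columns in sink order."""
    row = {"rowkey": rowkey_for(record["id"])}
    for column in SINK_COLUMNS:
        row[column] = record.get(column)  # a missing JSON field becomes None (NULL)
    return row
```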

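Scenario 2: Student behavior analysis

In-class performance statistics for online (live) courses

Scenario 1 cleansed the student operation logs; this scenario consumes the cleansed data. It groups the records by user ID, web server ID, role, and operation event, opens a 1-minute tumbling window, and scores each group with a count(event) aggregation, yielding each student's per-minute in-class performance in online (live) courses.

  • The upstream data for this metric is the output of the student operation log ETL cleansing: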
{
    "userId":"",
    "roomId":"",
    "role":"",
    "event":"",
    "timeStamp":""
}

Input table

create table timeline_analysis_student_mashup_stream (
    messageKey VARBINARY,
    `message` VARBINARY,
    topic VARCHAR,
    `partition` INT,
    `offset` BIGINT,
    -- User ID
    `userId` as cast(JSON_VALUE (`message`, '$.userId') as BIGINT),
    -- Web server ID (taken from the roomId field)
    `webserverId` as cast(JSON_VALUE (`message`, '$.roomId') as BIGINT),
    -- Role
    `role` as cast(JSON_VALUE (`message`, '$.role') as TINYINT),
    -- Operation event
    `event` as cast(JSON_VALUE (`message`, '$.event') as VARCHAR),
    -- Event time
    time_stamp as TO_TIMESTAMP(cast(JSON_VALUE (`message`, '$.timeStamp') as BIGINT)),
    -- Define the watermark on the rowtime column
    WATERMARK wk FOR time_stamp AS WITHOFFSET (time_stamp, 0)
) with (
    type = 'kafka011',
    topic = 'timeline_analysis_student',
    `group.id` = 'timeline-analysis-student-mashup-consumer',
    ...
);

Output table

create table timeline_signal_analysis_mysql (
    start_time TIMESTAMP,
    end_time TIMESTAMP,
    webserver_id BIGINT,
    user_id BIGINT,
    role TINYINT,
    event VARCHAR,
    event_count BIGINT,
    create_time TIMESTAMP
) with (
    type='RDS',
    tableName='timeline_signal_analysis',
    ...
);

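Business code

  • Parsing in-class performance

    • Student behaviors in class, such as raising a hand to answer a question, earn points that measure the student's in-class performance.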
insert into timeline_signal_analysis_mysql
select 
    TUMBLE_START(time_stamp,INTERVAL '1' MINUTE) as start_time,
    TUMBLE_END(time_stamp,INTERVAL '1' MINUTE) as end_time,
    webserverId as webserver_id,
    userId as user_id,
    role as role,
    event as event,
    COUNT(event) as event_count,
    CURRENT_TIMESTAMP as create_time
FROM timeline_analysis_student_mashup_stream
GROUP BY TUMBLE (time_stamp,INTERVAL '1' MINUTE),
    userId,
    webserverId,
    role,
    event;
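A tumbling window assigns each record to exactly one fixed, non-overlapping interval. The local Python sketch below (not Flink code; `tumble_counts` and the `ts` field holding epoch milliseconds are assumptions) reproduces what the query computes: the window start is the timestamp rounded down to a multiple of 60 000 ms, and records are counted per window and per (webserverId, userId, role, event). It ignores lateness; with `WITHOFFSET (time_stamp, 0)` Flink tolerates no out-of-order data, so by default a record whose window has already been emitted is dropped. Note also that the `timeStamp` field used as event time here is filled with NOW()*1000 by the Scenario 1 job, so it reflects cleansing time rather than the moment of the student's click.

```python
from collections import Counter

WINDOW_MS = 60_000  # 1-minute tumbling window


def tumble_counts(records, window_ms=WINDOW_MS):
    """Count records per (window_start, window_end, webserverId, userId, role, event)."""
    counts = Counter()
    for r in records:
        start = r["ts"] - r["ts"] % window_ms   # like TUMBLE_START
        key = (start, start + window_ms,        # start + size is TUMBLE_END
               r["webserverId"], r["userId"], r["role"], r["event"])
        counts[key] += 1                        # like COUNT(event)
    return counts
```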

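Study-time statistics for offline (recorded) courses

Events with subEvent = 'PPT_SUCCESS' mark that a course has been completed. A view selects these events, and a self-join widens each of them with the source table on user ID and classroom ID. For every earlier event of the same user in the same classroom, the job computes the time between that event and the 'PPT_SUCCESS' moment; the largest of these differences corresponds to the time since the student's first event in the classroom, that is, since the PPT started playing.

  • Sample tracking data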
{
    "classroom_id":"",
    "user_type":"",
    "user_id":"",
    "event_time":"",
    "sub_event":"",
    "extra":{
        "data_time":"",
        "msg":{
            "pptIndex":""
        }
    }
}

Input table

create table qos_log_kafka (
    messageKey VARBINARY,
    `message` VARBINARY,
    topic VARCHAR,
    `partition` INT,
    `offset` BIGINT,
    -- Classroom ID (recorded course)
    `classroomId` as cast(JSON_VALUE(`message`, '$.classroom_id') as VARCHAR),
    -- User type
    `userType` as cast(JSON_VALUE(`message`, '$.user_type') as VARCHAR),
    -- User ID
    `userId` as cast(JSON_VALUE(`message`, '$.user_id') as BIGINT),
    -- Event time
    `eventTime` as cast(JSON_VALUE(`message`, '$.event_time') as BIGINT),
    -- Sub-event
    `subEvent` as cast(JSON_VALUE(`message`, '$.sub_event') as VARCHAR),
    -- Data time
    `extraDataTime` as cast(JSON_VALUE(cast(JSON_VALUE(`message`, '$.extra') as VARCHAR), '$.data_time') as BIGINT),
    -- PPT page number
    `extraMsgIndex` as cast(JSON_VALUE(cast(JSON_VALUE(cast(JSON_VALUE(`message`, '$.extra') as VARCHAR), '$.msg') as VARCHAR), '$.pptIndex') as BIGINT)
) with (
    type = 'kafka011',
    topic = 'qos_log',
      ...
);

Output table

create table user_enter_classroom_take_time_mysql (
    user_id BIGINT,
    classroom_id VARCHAR,
    user_type VARCHAR,
    spend_time BIGINT,
    event_time TIMESTAMP,
    create_time TIMESTAMP
) with (
    type='rds',
    tableName='user_enter_classroom_take_time',
    ...
);

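Business code

  • Time spent in the classroom

    • For offline recorded courses, the time a student spends in the classroom is calculated from the PPT playback time.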
CREATE VIEW qos_log_kafka_view AS
SELECT 
    `userId`,
    `classroomId`,
    `userType`,
    `eventTime`,
     subEvent,
    `extraDataTime`
FROM qos_log_kafka
WHERE subEvent = 'PPT_SUCCESS';

insert into user_enter_classroom_take_time_mysql
SELECT
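  a.userId as user_id,
  a.classroomId as classroom_id,
  a.userType as user_type,
  -- Difference in milliseconds
  b.extraDataTime - a.extraDataTime as spend_time,
  TO_TIMESTAMP(a.eventTime) as event_time,
  CURRENT_TIMESTAMP as create_time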
FROM qos_log_kafka a
JOIN qos_log_kafka_view b ON a.userId=b.userId AND a.classroomId=b.classroomId 
WHERE a.extraDataTime<b.extraDataTime;
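The SQL emits one row for every earlier event of the same user in the same classroom, not one row per completed course. The local Python sketch below (hypothetical `self_join_durations` and `longest_per_classroom`) mirrors the join and then keeps the largest difference per user and classroom, which corresponds to the time since the student's first recorded event. Keep in mind that a regular stream join like this one keeps both inputs in state indefinitely, so in production it usually needs a state retention time (TTL) or a time-bounded join condition.

```python
def self_join_durations(events):
    """Mimic the self-join: pair each PPT_SUCCESS event (b) with every event (a)
    of the same user and classroom where a.extraDataTime < b.extraDataTime,
    and emit (userId, classroomId, b - a in milliseconds)."""
    successes = [e for e in events if e["subEvent"] == "PPT_SUCCESS"]  # the view
    rows = []
    for b in successes:
        for a in events:
            same_key = (a["userId"], a["classroomId"]) == (b["userId"], b["classroomId"])
            if same_key and a["extraDataTime"] < b["extraDataTime"]:
                rows.append((a["userId"], a["classroomId"],
                             b["extraDataTime"] - a["extraDataTime"]))
    return rows


def longest_per_classroom(rows):
    """Keep the largest difference per (user, classroom): time since the first event."""
    longest = {}
    for user_id, classroom_id, spend in rows:
        key = (user_id, classroom_id)
        longest[key] = max(longest.get(key, 0), spend)
    return longest
```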

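Scenario 3: O&M / network monitoring

Using the video/audio O&M tracking data of live courses, the job groups by userId, agoraChannelId, classroomId, userType, event, and agoraAudioStateUid/agoraVideoStateUid, opens a 30-second tumbling window, and computes the video/audio quality of live classes over the last 30 seconds (sum and average of the packet-loss/anomaly values and the number of reports). Downstream O&M engineers monitor these results and adjust audio/video quality in real time to give users the best learning experience.

  • Sample tracking data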
{
    "classroom_id":"",
    "user_type":"",
    "user_id":"",
    "agora_channel_id":"",
    "event":"",
    "agora_videoState":{
        "fr":"",
        "uid":""
    },
    "agora_audioState":{
        "lost":"",
        "uid":""
    },
    "messageCreateTime":""
}

Input table

create table qos_agora_record_kafka (
    messageKey VARBINARY,
    `message` VARBINARY,
    topic VARCHAR,
    `partition` INT,
    `offset` BIGINT,
    -- Live classroom ID
    `classroomId` as cast(JSON_VALUE(`message`, '$.classroom_id') as VARCHAR),
    -- User type
    `userType` as cast(JSON_VALUE(`message`, '$.user_type') as VARCHAR),
    -- User ID
    `userId` as cast(JSON_VALUE(`message`, '$.user_id') as BIGINT),
    -- Channel ID
    `agoraChannelId` as cast(JSON_VALUE(`message`, '$.agora_channel_id') as BIGINT),
    -- Event
    `event` as cast(JSON_VALUE(`message`, '$.event') as VARCHAR),
    -- Video fault record
    `agoraVideoStateFr` as cast(JSON_VALUE(cast(JSON_VALUE(`message`, '$.agora_videoState') as VARCHAR), '$.fr') as BIGINT),
    -- Unique ID of the video fault record
    `agoraVideoStateUid` as cast(JSON_VALUE(cast(JSON_VALUE(`message`, '$.agora_videoState') as VARCHAR), '$.uid') as BIGINT),
    -- Audio loss record
    `agoraAudioStateLost` as cast(JSON_VALUE(cast(JSON_VALUE(`message`, '$.agora_audioState') as VARCHAR), '$.lost') as BIGINT),
    -- Unique ID of the audio loss record
    `agoraAudioStateUid` as cast(JSON_VALUE(cast(JSON_VALUE(`message`, '$.agora_audioState') as VARCHAR), '$.uid') as BIGINT),
    -- Event time: the raw value (assumed to be epoch milliseconds) is converted with TO_TIMESTAMP,
    -- as in the Scenario 2 source table, because the watermark column must be a TIMESTAMP
    `messageCreateTime` as TO_TIMESTAMP(cast(JSON_VALUE(`message`, '$.messageCreateTime') as BIGINT)),
    -- Define the watermark on the rowtime column, tolerating 60 s (60000 ms) of out-of-order data
    WATERMARK wk FOR messageCreateTime AS WITHOFFSET (messageCreateTime, 60000)
) with (
    type = 'kafka011',
    topic = 'agora_record',
    ...
);
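`WITHOFFSET (messageCreateTime, 60000)` lets records arrive up to about 60 seconds out of order, at the cost of emitting each 30-second window's result roughly 60 seconds after the window ends. The local Python sketch below replays arrival-ordered timestamps to show that trade-off. It is not Flink code: `simulate_watermark` is hypothetical, and it assumes the watermark equals the largest timestamp seen minus the offset and that a window [start, start + 30000) fires once the watermark reaches start + 29999, which is the convention of open-source Flink; the exact boundary handling in this Blink version may differ by a millisecond.

```python
def simulate_watermark(timestamps, window_ms=30_000, offset_ms=60_000):
    """Replay event timestamps (ms, in arrival order) through a tumbling window.

    Assumes watermark = max timestamp seen - offset_ms, and that a window
    [start, start + window_ms) fires once the watermark reaches its last
    millisecond. Returns (fired windows as (start, count), dropped late timestamps).
    """
    max_ts = None
    open_windows = {}  # window start -> number of records
    fired, dropped = [], []
    for ts in timestamps:
        start = ts - ts % window_ms
        if max_ts is not None and start + window_ms - 1 <= max_ts - offset_ms:
            dropped.append(ts)  # its window has already closed: late record
            continue
        open_windows[start] = open_windows.get(start, 0) + 1
        max_ts = ts if max_ts is None else max(max_ts, ts)
        watermark = max_ts - offset_ms
        for s in sorted(open_windows):  # sorted() copies the keys, so popping is safe
            if s + window_ms - 1 <= watermark:
                fired.append((s, open_windows.pop(s)))
    return fired, dropped
```

For example, replaying 0, 10000, 35000, 95000, 5000, and 100000 (ms) emits the window starting at 0 with two records only when 95000 arrives, and drops 5000 as late.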

Output table

create table user_av_mysql (
    -- Window start time
    start_time TIMESTAMP,
    -- Window end time
    end_time TIMESTAMP,
    -- User ID
    user_id BIGINT,
    web_server_id BIGINT,
    -- Live classroom ID
    classroom_id VARCHAR,
    -- User type
    user_type VARCHAR,
    extra_uid BIGINT,
    event VARCHAR,
    -- Sum of anomaly values
    event_sum BIGINT,
    -- Average anomaly value
    event_avg DOUBLE,
    -- Number of anomaly reports
    event_count BIGINT,
    create_time TIMESTAMP
) with (
    type='rds',
    tableName='user_av_record',
    ...
);

Network monitoring of live courses (audio)

Business code

insert into user_av_mysql
select 
    TUMBLE_START(messageCreateTime, INTERVAL '30' SECOND) as start_time,
    TUMBLE_END(messageCreateTime, INTERVAL '30' SECOND) as end_time,
    CASE WHEN `userId` is NULL THEN -1 else userId END as user_id,
    CASE WHEN `agoraChannelId` is NULL THEN -1 else agoraChannelId END as web_server_id,
    CASE WHEN `classroomId` is NULL THEN '-1' else classroomId END as classroom_id,
    userType as user_type,
    agoraAudioStateUid as extra_uid,
    CONCAT(event,'_AUDIO_STATE') as event,
    SUM(agoraAudioStateLost) as event_sum,
    AVG(CAST(agoraAudioStateLost AS DOUBLE)) as event_avg,
    COUNT(event) as event_count,
    CURRENT_TIMESTAMP as create_time
FROM qos_agora_record_kafka
WHERE agoraAudioStateLost >= 0 AND userType = 'student'
GROUP BY TUMBLE (messageCreateTime, INTERVAL '30' SECOND),
    userId,
    agoraChannelId,
    classroomId,
    userType,
    event,
    agoraAudioStateUid;

Network monitoring of live courses (video)

Business code

insert into user_av_mysql
select 
    TUMBLE_START(messageCreateTime,INTERVAL '30' SECOND) as start_time,
    TUMBLE_END(messageCreateTime,INTERVAL '30' SECOND) as end_time,
    CASE WHEN `userId` is NULL THEN -1 else userId END as user_id,
    CASE WHEN `agoraChannelId` is NULL THEN -1 else agoraChannelId END as web_server_id,
    CASE WHEN `classroomId` is NULL THEN '-1' else classroomId END as classroom_id,
    userType as user_type,
    agoraVideoStateUid as extra_uid,
    CONCAT(event,'_VIDEO_STATE') as event,
    SUM(agoraVideoStateFr) as event_sum,
    AVG(CAST(agoraVideoStateFr AS DOUBLE)) as event_avg,
    COUNT(event) as event_count,
    CURRENT_TIMESTAMP as create_time
FROM qos_agora_record_kafka
WHERE agoraVideoStateFr >= 0 AND userType = 'student'
GROUP BY TUMBLE (messageCreateTime, INTERVAL '30' SECOND),
    userId,
    agoraChannelId,
    classroomId,
    userType,
    event,
    agoraVideoStateUid;
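Both monitoring jobs compute the same per-window statistics and differ only in the metric (`agoraAudioStateLost` or `agoraVideoStateFr`) and the UID field. The local Python sketch below (hypothetical `av_quality`; records carry an assumed `ts` field in epoch milliseconds) shows what one 30-second window produces for each group; userType is not part of the key because only student records pass the filter. The sketch computes a floating-point average. Open-source Flink SQL returns AVG over a BIGINT column as BIGINT, truncating the fraction, so casting the argument to DOUBLE before averaging is a safe way to fill the DOUBLE `event_avg` column.

```python
from collections import defaultdict

WINDOW_MS = 30_000  # 30-second tumbling window


def av_quality(records, metric, uid_field, window_ms=WINDOW_MS):
    """Per-window SUM/AVG/COUNT of one quality metric for student records.

    metric: e.g. 'agoraAudioStateLost' or 'agoraVideoStateFr'.
    uid_field: e.g. 'agoraAudioStateUid' or 'agoraVideoStateUid'.
    """
    groups = defaultdict(list)
    for r in records:
        value = r.get(metric)
        # WHERE <metric> >= 0 AND userType = 'student' (NULL values fail the comparison)
        if value is None or value < 0 or r.get("userType") != "student":
            continue
        start = r["ts"] - r["ts"] % window_ms
        key = (start, r.get("userId"), r.get("agoraChannelId"),
               r.get("classroomId"), r.get("event"), r.get(uid_field))
        groups[key].append(value)
    return {
        key: {"event_sum": sum(vals),
              "event_avg": sum(vals) / len(vals),  # floating-point average
              "event_count": len(vals)}
        for key, vals in groups.items()
    }
```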

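Scenario 4: Operations analysis

Hourly student registrations by level

Students register through different channels (web ads, app ads, and so on). This scenario reads the registration logs and joins them with the test-level table produced at registration (levels A/B/C/D), showing operations staff the number of student registrations per hour for each level and channel so that they can adjust promotion strategies in real time.

  • Sample tracking data
-- Student table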
{
    "id":"",
    "channel_id":"",
    "update_time":""
}
-- User registration data
{
    "id":"",
    "name":"",
    "register_date_time":"",
    "status":""
}

-- Student test-level table: the result table of the Scenario 1 job "ETL cleansing of student registration test-level logs"

Input table

create table student_da_src (
    messageKey VARBINARY,
    `message` VARBINARY,
    topic VARCHAR,
    `partition` INT,
    `offset` BIGINT,
    `id` as JSON_VALUE (`message`, '$.id'), -- User ID
    `channel_id` as JSON_VALUE (`message`, '$.channel_id'), -- Channel ID
    `update_time` as JSON_VALUE (`message`, '$.update_time') -- Update time
) with (
    type = 'kafka010',
    topic = 'uc_account-student',
    ...
);
create table user_da_in (
    messageKey VARBINARY,
    `message` VARBINARY,
    topic VARCHAR,
    `partition` INT,
    `offset` BIGINT,
    `id` as JSON_VALUE (`message`, '$.id'), -- User ID
    `name` as JSON_VALUE (`message`, '$.name'), -- User name
    `register_date_time` as JSON_VALUE (`message`, '$.register_date_time'), -- Registration time
    `status` as JSON_VALUE (`message`, '$.status') -- Status
) with (
    type = 'kafka010',
    topic = 'uc_account-user',
    `group.id` = 'uc_account-user',
    ...
);
create table channel_da (
    rowkey varchar,
    id VARCHAR,
    `level`  VARCHAR,
    primary key (rowkey),
    PERIOD FOR SYSTEM_TIME
) with (
    type = 'cloudhbase',
    tableName = 'databus:activity.channel',
    ...
);

Output table

create table sink_table (
    uk varchar,
    reg_date bigint,
    level varchar,
    leads bigint,
    primary key (uk)
) with (
    type = 'elasticsearch',
    index = 'vk_app_es_sign_csh',
    typeName = 'vk_app_es_sign_csh',
    ...
);

Business code

create view student_da_src_view as 
SELECT
   last_value(id) as id,
   last_value(update_time) as update_time,
   last_value(channel_id) as channel_id
from student_da_src
group by id;

create view user_da_in_view as 
SELECT
   last_value(id) as id,
   last_value(name) as name,
   last_value(register_date_time) as register_date_time,
   last_value(status) as status
from user_da_in
group by id;
 
insert into 
    sink_table 
SELECT
      concat(date_format(register_date_time,'yyyyMMddHH'),case when level in ('A','B','C','D') then level else 'other' end) as uk
     ,cast(date_format(register_date_time,'yyyyMMddHH') as bigint) as reg_date
     ,case when level in ('A','B','C','D') then level else 'other' end as levels
     ,COUNT(distinct t.id) AS leads
FROM 
     student_da_src_view t 
LEFT JOIN user_da_in_view  u ON u.id = t.id
LEFT JOIN channel_da FOR SYSTEM_TIME AS OF PROCTIME() ch ON ch.rowkey = MD5(t.channel_id)
where u.name not LIKE '%测试%' 
and u.name not LIKE 'DM\\_%' 
and u.name not LIKE '%test%' 
and u.status='NORMAL'
group by date_format(register_date_time,'yyyyMMddHH')
        ,case when level in ('A','B','C','D') then level else 'other' end
        ,concat(date_format(register_date_time,'yyyyMMddHH'),case when level in ('A','B','C','D') then level else 'other' end)
;
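The job combines two deduplicating views, a left join, a dimension-table lookup, and a filter before counting distinct IDs per hour and level. The local Python sketch below replays that pipeline on in-memory lists; all function names are hypothetical. It makes three assumptions: LAST_VALUE per ID is simplified to "the latest message replaces the whole row", `register_date_time` uses the `yyyy-MM-dd HH:mm:ss` format, and the `'DM\\_%'` pattern is meant to exclude names starting with `DM_`. It also reflects a SQL subtlety: the WHERE conditions on `u.*` discard students without a matching user row, so the LEFT JOIN with `user_da_in_view` effectively behaves like an inner join.

```python
import hashlib
from collections import defaultdict
from datetime import datetime

LEVELS = {"A", "B", "C", "D"}


def md5_hex(text):
    return hashlib.md5(text.encode("utf-8")).hexdigest()


def latest_by_id(rows):
    # Simplified LAST_VALUE ... GROUP BY id: a later message replaces the earlier row.
    latest = {}
    for row in rows:
        latest[row["id"]] = row
    return latest


def is_real_user(user):
    name = user.get("name")
    if name is None:  # NULL NOT LIKE ... is not true, so SQL drops the row
        return False
    return ("测试" not in name and not name.startswith("DM_")
            and "test" not in name and user.get("status") == "NORMAL")


def hourly_leads(students, users, level_by_rowkey):
    """Return {uk: (reg_date, level, leads)} with uk = hour + level."""
    users_latest = latest_by_id(users)
    ids_per_bucket = defaultdict(set)
    for sid, student in latest_by_id(students).items():
        user = users_latest.get(sid)
        if user is None or not is_real_user(user):
            continue
        level = level_by_rowkey.get(md5_hex(student["channel_id"]))  # HBase lookup
        level = level if level in LEVELS else "other"
        hour = datetime.strptime(user["register_date_time"],
                                 "%Y-%m-%d %H:%M:%S").strftime("%Y%m%d%H")
        ids_per_bucket[(hour, level)].add(sid)  # COUNT(DISTINCT t.id)
    return {hour + level: (int(hour), level, len(ids))
            for (hour, level), ids in ids_per_bucket.items()}
```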

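Daily course consultant follow-up statistics

First, the job groups by ID and takes the latest message for each ID, which deduplicates the data. On top of these latest messages, a global GROUP BY aggregation by event time (day) and course consultant ID counts how many times each course consultant contacts students each day to confirm their "study progress / class bookings".

  • Sample tracking data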
{
    "id":"",
    "leads_flow_event_id":"",
    "group_id":"",
    "cc_id":"",
    "student_id":"",
    "order_id":"",
    "leads_id":"",
    "confirm_date_time":"",
    "create_time":"",
    "update_time":"",
    "order_create_time":"",
    "canceled_date_time":"",
    "apply_refund_date":"",
    "status":""
}

Input table

create table cc_data_pack_order_info_src (
    `messageKey` VARBINARY,
    `message` VARBINARY,
    `topic` VARCHAR,
    `partition` INT,
    `offset` BIGINT,
    -- ID
    `id` as JSON_VALUE (`message`, '$.id'),
    -- Course consultant (CC) ID
    `cc_id` as JSON_VALUE (`message`, '$.cc_id'),
    -- Student ID
    `student_id` as JSON_VALUE (`message`, '$.student_id'),
    -- Confirmation time
    `confirm_date_time` as JSON_VALUE (`message`, '$.confirm_date_time'),
    -- Creation time
    `create_time` as JSON_VALUE (`message`, '$.create_time'),
    -- Update time
    `update_time` as JSON_VALUE (`message`, '$.update_time'),
    -- Order creation time
    `order_create_time` as JSON_VALUE (`message`, '$.order_create_time'),
    -- Order cancellation time
    `canceled_date_time` as JSON_VALUE (`message`, '$.canceled_date_time'),
    -- Refund application date
    `apply_refund_date` as JSON_VALUE (`message`, '$.apply_refund_date'),
    -- Status
    `status` as JSON_VALUE (`message`, '$.status')
) with (
    type = 'kafka010',
    topic = 'data_pack_order_info',
    `group.id` = 'data_pack_order_info',
    ...
);

Output table

CREATE TABLE index_sink (
  `cc_id` BIGINT NOT NULL,
  `cc_index` BIGINT NOT NULL,
  `type` INT NOT NULL,
  `attribution_time` VARCHAR NOT NULL,
  `update_time` TIMESTAMP NOT NULL,
  PRIMARY KEY (`cc_id`, `type`, `attribution_time`)
) WITH (
    type='rds',
    tableName='staff_index',
    ...
);

Business code


CREATE VIEW cc_data_pack_order_info_view as
select
    last_value (cc_id) as cc_id,
    last_value (confirm_date_time) as confirm_date_time,
    last_value (`status`) as `status`
from
    cc_data_pack_order_info_src
group by
    id;
    
    
insert into index_sink
select 
    cast(cc_id as bigint) as cc_id,
    count(*) as cc_index,
    cast(1 as int) as type,
    date_format(confirm_date_time,'yyyy-MM-dd') as attribution_time,
    current_timestamp as update_time
from
    cc_data_pack_order_info_view 
where 
    confirm_date_time is not null
    and `status` is not null
    and `status` = '3'
group by 
    date_format(confirm_date_time,'yyyy-MM-dd'), cc_id;
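Because the view keeps only the latest message per order ID, an update to an order (for example a status change) must first remove the order's old contribution from its (day, consultant) count and then add the new one; Flink's global GROUP BY does this with retraction messages. The local Python sketch below (hypothetical `DailyConfirmCounter`) imitates that behavior. It assumes `confirm_date_time` uses the `yyyy-MM-dd HH:mm:ss` format, so its first ten characters are the day, and it compares `status` with the string '3' because JSON_VALUE returns strings.

```python
from collections import Counter


class DailyConfirmCounter:
    """Latest row per order id plus a retractable count per (day, cc_id)."""

    def __init__(self):
        self.latest = {}         # order id -> latest row (the LAST_VALUE view)
        self.counts = Counter()  # (day, cc_id) -> cc_index

    @staticmethod
    def _bucket(row):
        # Mirrors the WHERE clause: a confirmation time exists and status is '3'.
        if row.get("confirm_date_time") is None or row.get("status") != "3":
            return None
        return (row["confirm_date_time"][:10], row["cc_id"])

    def upsert(self, row):
        old = self.latest.get(row["id"])
        if old is not None:
            old_bucket = self._bucket(old)
            if old_bucket is not None:
                self.counts[old_bucket] -= 1  # retract the previous contribution
                if self.counts[old_bucket] == 0:
                    del self.counts[old_bucket]
        self.latest[row["id"]] = row
        new_bucket = self._bucket(row)
        if new_bucket is not None:
            self.counts[new_bucket] += 1  # accumulate the new contribution
```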


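Alibaba Cloud Realtime Compute for Apache Flink - Solutions:
https://developer.aliyun.com/article/765097
Alibaba Cloud Realtime Compute for Apache Flink - Use cases:
https://ververica.cn/corporate-practice
Alibaba Cloud Realtime Compute for Apache Flink - Product page:
https://www.aliyun.com/product/bigdata/product/sc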
