草庐IT

场景化、重实操,分享一个实时数仓实践案例

佳钰 2023-03-28 原文
​大部分数据团队在进行实时业务建设的初期,都会出现烟囱式开发、一个任务搞定全部数据加工环节等问题,缺乏实时数据的管理和实时数仓分层建设的规范意识。随着实时场景的进一步丰富,出现了实时数据复用、业务方自助进行实时取数等需求,因此要求数据团队要像管理离线数据一样对实时数据进行有规范的实时数仓管理。

本文将从一个实际业务场景和一个模拟数仓构建的案例来说明如何利用 EasyData 实时开发平台来建设实时数仓。

1、实际业务场景

1.1 背景介绍

业务方是某移动 APP 的运营团队,需求是要实时监控各类运营活动的 ABtest 的实验效果,以便业务方根据实验效果随时调整运营投放策略、投放目标用户和投放比例。

1.2 业务数据分层

通常业务方的实时数据流转链路包含实时数据采集、实时数据加工处理、实时数据落库查询等步骤,在此用户的ABtest 场景中,数据加工链路如下图所示:

第一步:用户的日志数据经过实时采集写入 ODS 层的 Kafka 中。ODS 层数据为原始未加工的业务数据,保存在 Kafka,7 天后自动清理。

第二步:ODS 层数据经过 Flink 任务处理写入 DWD 层 Kafka 中。DWD 层数据为经过清洗的数据。

第三步:DWD 层数据经过 Flink 任务处理写入 DWS 层 KUDU 数据库中落库。DWS 层数据为经过聚合、过滤等加工步骤,可以向业务方提供的数据。

第四步:业务方在需要时通过 Impala 查询 KUDU 数据库中的数据生成报表。

通过以上实时数据加工链路,业务方可实现实时报表展示,时效性较离线加工链路大大提高,可以满足业务方要求数据实时更新的需求。

1.3 业务痛点

在这个业务场景中 ODS 层、DWD 层的 Kafka 数据在其他加工链路中也需要被复用,但在其他链路加工过程中,同样的 Topic 需要不断重复在不同任务中进行 Flink Table 的定义,每次定义用户均需要使用 DDL 语句定义字段、表配置等,重复工作很多,同时在任务中进行表定义时,数据管理者无法感知哪些数据已被使用,也无法判断是否有可以优化的数据流转链路。

1.4 产品方案

EasyData 实时开发模块中为用户提供了实时流表登记和管理的功能,辅助用户进行实时数仓的建设。实时流表是 EasyData 实时开发模块中的特有概念。流表的内容为 Kafka,Rocketmq 等没有明确 schema 的消息中间件的元数据。在平台通过登记流表并在任务中直接引用流表的方式即可将这部分元数据进行复用。

同时在流表管理模块中,用户可以查看流表的定义。此外,按照业务方的数仓规范中的表命名规范登记流表后,可以根据流表的表名判断流表的分层归属。在接下来的规划中,数仓流表模块将支持数据血缘查看、数据预览、使用数据模型建表等功能,基于流表元数据进行更完整更易用的实时数仓管理。

在下方模拟案例介绍中,将为大家讲解如何定义和使用流表,以及如何通过登记流表进行数仓建设。

2、案例场景介绍

业务目标:统计某 APP 实时访问的 DAU,需要统计的值包括总 DAU,各设备类型 DAU(iPhone、华为、OPPO、其他)。

业务数据链路:

第一步:通过 CDC 任务采集用户访问数据数据实时变更至 Kafka(对应案例步骤第一步)

第二步:将 ODS 层 Kafka 数据通过 Flink 任务进行清洗和聚合,写入 MySQL 落库(对应案例步骤第二、三步)

第三步:将 MySQL 数据通过 BI 报表展示(对应案例最终结果)

3、案例操作步骤

3.1 准备阶段:准备模拟数据

3.1.1 数据源准备

需准备用于实践的 MySQL 数据源和 Kafka 数据源。

3.1.2 数据表准备

(1)准备 DS 层源端表:DAU_DS

此表用于记录用户访问数据。表结构与样例数据如下:

(2)准备 DWD 层结果表:DAU_FINAL

此表用于统计最终结果。表结构如下:

注意:由于模拟案例最终希望直接展示不同用户的计算结果,故需要向同一张已提前制作好对应 BI 报表的表内写数据,每人更新一行数据。正常业务场景下根据业务需求决定结果表结构和数量。

3.2 第一步:创建 CDC 任务

3.2.1 创建 CDC 任务

任务名称可自行命名,任务保存位置可选择根目录或创建以自己名字命名的目录。任务环境和任务类别为任务标签信息,选择测试和默认标签即可,不影响任务实际运行。

3.2.2 编辑 CDC 任务

源端配置:

表:DAU_DS

传输起始位点:

若只想消费新增数据,请选择最新数据,最终结果报表中将仅有体验当日的数据。

若想先消费历史存量数据,之后再消费最新数据,请选择全量初始化,最终结果报表中将有历史数据。

目标端配置:

类型:kafka

数据源:poc_kafka

Topic:自行命名,可通过目标 Topic 生成规则生成,也可在目标 Topic 中手动修改,建议修改目标 Topic 名称为自己的名称,方便下一步新建流表时使用。此处选择不存在的 Topic,在任务运行后对应 Topic 将被自动创建。

序列化方式:canal-json

3.2.3 保存并一键发布任务

点击页面上方的 保存 和 一键发布 按钮,填写任意提交描述,将任务发布至实时运维列表。

成功发布后可点击 运维 按钮前往任务运维页面。

3.2.4 启动任务

在运维页面找到对应任务后点击 启动 按钮启动任务。

任务成功启动,任务状态变为运行中时,创建 CDC 任务步骤操作完成。

3.3 第二步:创建 ODS 层流表

点击实时开发页面左侧目录第四项流表,打开流表管理页面。点击页面右上角创建表按钮,开始创建流表。

表名:自行命名

topic:填写上一步 CDC 任务的目标 Topic 名称

序列化方式:canal-json

填写完以上信息后可开始进行字段自动解析。

字段信息获取方式选择自动解析,之后点击获取数据,获取到数据样例后点击解析,即可解析出流表的字段信息。

Tip:若 CDC 任务正常运行但此处未获取到样例数据,可能是因为数据暂未写入,稍等一分钟后重新尝试。

字段信息确认无误后即可保存流表。

保存流表成功,即为此步骤操作完成。

3.4 第三步:创建 SQL 任务

3.4.1 创建 SQL 任务

引擎请选择 FLINK-1.14。其他类似创建 CDC 任务步骤。

3.4.2 编辑 SQL 代码

代码逻辑为:将同一天的用户方式数据按日期聚合,并统计当天的DAU总数以及各设备类型的DAU。

在代码中引用流表时,直接使用 [库].[表] 二元组的写法即可使用对应的流表。

注意以下内容在拷贝代码后需自行更改:

Kafka消费者组id配置,需要更改配置中的流表名称为自己的流表名称

Kafka流表名称,需要更改为上一步中自己登记的流表名称

插入结果表的submitter字段值,需要更改为自己的名字

具体要修改的内容请见代码中的标注。

-- 设置Kafka消费者组id,需要更改自己创建的流表名称, 配置方式:'{流表名称}.connections.group.id' = '{groupid-name}'set '这里填流表名称.connections.group.id' = 'example001'; -- 设置读取消息队列的位置, 配置方式:'{流表名称}.scan.startup.mode' = '{earliest-offset / latest-offset}'set '这里填流表名称.scan.startup.mode' = 'earliest-offset'; -- 设置流表主键,配置方式:'{流表名称}.primary.keys' = '{primary key name}'set '这里填流表名称.primary.keys' = 'id';--设置源端表表名CREATE view v1 ASSELECT SUBSTRING(visitTime, 0, 10) AS `date`, equipment from 这里填流表库名称.这里填流表名称;CREATE view v2 AS SELECT `date`, equipment, COUNT(equipment) AS eq_dauFROM v1GROUP BY `date`, equipment;--将你的名字填入 submitter 字段CREATE view v3 ASSELECT '这里填你的名字' AS submitter, `date`, SUM(CASE equipment WHEN 'iPhone' THEN eq_dau ELSE 0 END) AS iPhoneDAU, SUM(CASE equipment WHEN 'Huawei' THEN eq_dau ELSE 0 END) AS HuaweiDAU, SUM(CASE equipment WHEN 'OPPO' THEN eq_dau ELSE 0 END) AS oppoDAU, SUM(CASE equipment WHEN 'other' THEN eq_dau ELSE 0 END) AS otherDAUFROM v2 GROUP BY `date`;CREATE VIEW v4 ASSELECT (v3.iPhoneDAU + v3.HuaweiDAU + v3.oppoDAU + v3.otherDAU) AS totalDAU, submitter, `date`, iPhoneDAU, HuaweiDAU, oppoDAU, otherDAUFROM v3;set 'DAU_FINAL.primary.keys' = 'submitter,date';INSERT INTO MySQL数据源标识.MySQL数据库名称.DAU_FINALSELECT * FROM v4;
3.4.3 发布 SQL 任务并启动任务

保存并发布 SQL 任务,并启动任务。操作方法与 CDC 任务的发布和启动相同。

SQL 任务成功启动且状态变为运行中,则此步骤操作完成。

3.4.4 创建 BI 报表并展示数据结果

在有数 BI 中创建对应报表,查看最终的统计结果即可。刷新报表数据即可看到报表数据实时更新后的结果。预期效果如下:

(1)折线图:

(2)报表

有关场景化、重实操,分享一个实时数仓实践案例的更多相关文章

  1. ruby-on-rails - 使用 Ruby on Rails 进行自动化测试 - 最佳实践 - 2

    很好奇,就使用ruby​​onrails自动化单元测试而言,你们正在做什么?您是否创建了一个脚本来在cron中运行rake作业并将结果邮寄给您?git中的预提交Hook?只是手动调用?我完全理解测试,但想知道在错误发生之前捕获错误的最佳实践是什么。让我们理所当然地认为测试本身是完美无缺的,并且可以正常工作。下一步是什么以确保他们在正确的时间将可能有害的结果传达给您? 最佳答案 不确定您到底想听什么,但是有几个级别的自动代码库控制:在处理某项功能时,您可以使用类似autotest的内容获得关于哪些有效,哪些无效的即时反馈。要确保您的提

  2. ruby - 使用 Vim Rails,您可以创建一个新的迁移文件并一次性打开它吗? - 2

    使用带有Rails插件的vim,您可以创建一个迁移文件,然后一次性打开该文件吗?textmate也可以这样吗? 最佳答案 你可以使用rails.vim然后做类似的事情::Rgeneratemigratonadd_foo_to_bar插件将打开迁移生成的文件,这正是您想要的。我不能代表textmate。 关于ruby-使用VimRails,您可以创建一个新的迁移文件并一次性打开它吗?,我们在StackOverflow上找到一个类似的问题: https://sta

  3. ruby-on-rails - Rails - 一个 View 中的多个模型 - 2

    我需要从一个View访问多个模型。以前,我的links_controller仅用于提供以不同方式排序的链接资源。现在我想包括一个部分(我假设)显示按分数排序的顶级用户(@users=User.all.sort_by(&:score))我知道我可以将此代码插入每个链接操作并从View访问它,但这似乎不是“ruby方式”,我将需要在不久的将来访问更多模型。这可能会变得很脏,是否有针对这种情况的任何技术?注意事项:我认为我的应用程序正朝着单一格式和动态页面内容的方向发展,本质上是一个典型的网络应用程序。我知道before_filter但考虑到我希望应用程序进入的方向,这似乎很麻烦。最终从任何

  4. ruby-on-rails - 渲染另一个 Controller 的 View - 2

    我想要做的是有2个不同的Controller,client和test_client。客户端Controller已经构建,我想创建一个test_clientController,我可以使用它来玩弄客户端的UI并根据需要进行调整。我主要是想绕过我在客户端中内置的验证及其对加载数据的管理Controller的依赖。所以我希望test_clientController加载示例数据集,然后呈现客户端Controller的索引View,以便我可以调整客户端UI。就是这样。我在test_clients索引方法中试过这个:classTestClientdefindexrender:template=>

  5. ruby-on-rails - 如果 Object::try 被发送到一个 nil 对象,为什么它会起作用? - 2

    如果您尝试在Ruby中的nil对象上调用方法,则会出现NoMethodError异常并显示消息:"undefinedmethod‘...’fornil:NilClass"然而,有一个tryRails中的方法,如果它被发送到一个nil对象,它只返回nil:require'rubygems'require'active_support/all'nil.try(:nonexisting_method)#noNoMethodErrorexceptionanymore那么try如何在内部工作以防止该异常? 最佳答案 像Ruby中的所有其他对象

  6. ruby - 为什么 SecureRandom.uuid 创建一个唯一的字符串? - 2

    关闭。这个问题需要detailsorclarity.它目前不接受答案。想改进这个问题吗?通过editingthispost添加细节并澄清问题.关闭8年前。Improvethisquestion为什么SecureRandom.uuid创建一个唯一的字符串?SecureRandom.uuid#=>"35cb4e30-54e1-49f9-b5ce-4134799eb2c0"SecureRandom.uuid方法创建的字符串从不重复?

  7. ruby-on-rails - Rails - 从另一个模型中创建一个模型的实例 - 2

    我有一个正在构建的应用程序,我需要一个模型来创建另一个模型的实例。我希望每辆车都有4个轮胎。汽车模型classCar轮胎模型classTire但是,在make_tires内部有一个错误,如果我为Tire尝试它,则没有用于创建或新建的activerecord方法。当我检查轮胎时,它没有这些方法。我该如何补救?错误是这样的:未定义的方法'create'forActiveRecord::AttributeMethods::Serialization::Tire::Module我测试了两个环境:测试和开发,它们都因相同的错误而失败。 最佳答案

  8. ruby - 用 Ruby 编写一个简单的网络服务器 - 2

    我想在Ruby中创建一个用于开发目的的极其简单的Web服务器(不,不想使用现成的解决方案)。代码如下:#!/usr/bin/rubyrequire'socket'server=TCPServer.new('127.0.0.1',8080)whileconnection=server.acceptheaders=[]length=0whileline=connection.getsheaders想法是从命令行运行这个脚本,提供另一个脚本,它将在其标准输入上获取请求,并在其标准输出上返回完整的响应。到目前为止一切顺利,但事实证明这真的很脆弱,因为它在第二个请求上中断并出现错误:/usr/b

  9. ruby - 一个 YAML 对象可以引用另一个吗? - 2

    我想让一个yaml对象引用另一个,如下所示:intro:"Hello,dearuser."registration:$introThanksforregistering!new_message:$introYouhaveanewmessage!上面的语法只是它如何工作的一个例子(这也是它在thiscpanmodule中的工作方式。)我正在使用标准的ruby​​yaml解析器。这可能吗? 最佳答案 一些yaml对象确实引用了其他对象:irb>require'yaml'#=>trueirb>str="hello"#=>"hello"ir

  10. ruby - Rails 关联 - 同一个类的多个 has_one 关系 - 2

    我的问题的一个例子是体育游戏。一场体育比赛有两支球队,一支主队和一支客队。我的事件记录模型如下:classTeam"Team"has_one:away_team,:class_name=>"Team"end我希望能够通过游戏访问一个团队,例如:Game.find(1).home_team但我收到一个单元化常量错误:Game::team。谁能告诉我我做错了什么?谢谢, 最佳答案 如果Gamehas_one:team那么Rails假设您的teams表有一个game_id列。不过,您想要的是games表有一个team_id列,在这种情况下

随机推荐