草庐IT

开源共建 | Dinky 扩展批流统一数据集成框架 ChunJun 的实践分享

数栈DTinsight 2023-03-28 原文

一、前言

ChunJun(原FlinkX)是一个基于 Flink 提供易用、稳定、高效的批流统一的数据集成工具,既可以采集静态的数据,比如 MySQL,HDFS 等,也可以采集实时变化的数据,比如 binlog,Kafka等。同时 ChunJun 也是一个支持原生 FlinkSql所有语法和特性的计算框架。

ChunJun 具有丰富的插件种类,多达40种,如常见的 mysql、binlog、logminer 等,大部分插件都支持 source/reader、sink/writer 及维表功能。目前很多用户在思考能否在 Dinky 上使用 ChunJun 的插件以提供更全面的能力。那本文将带来如何在 Dinky 上集成 ChunJun 丰富的插件,其实简单,那我们开始吧。

二、部署 Flink+ChunJun

编译

注意,如果需要集成 Dinky,需要将 ChunJun项目下的 chunjun-core 的pom 文件中的 logback-classic 和 logback-core 注释掉,否则容易在 Dinky 执行 sql 任务的时候报错。


然后执行:

部署

使用 ChunJun 需要先部署 Flink 集群,其部署本文不再做指导。

值得注意的是,如果你需要调用 Flinkx 的 connect jar 的话,则需要将 classloader.resolve-order 改成 parent-first。修改完成配置以后,把 Flinkx 的 jar 包复制过来,主要是 chunjun-clients-master.jar(Flinkx 现在改名 ChunJun )以及 chunjun 的其它 connector 放到 flink/lib 目录下,如图所示。

异常处理

如果启动集群时出现异常,即 Flink standalone 集群加载 flinkx-dist 里 jar 包之后,集群无法启动,日志报错:Exception in thread "main" java.lang.NoSuchFieldError: EMPTY_BYTE_ARRAY.

Exception in thread"main"java.lang.NoSuchFieldError:EMPTY_BYTE_ARRAY
at org.apache.logging.log4j.core.config.ConfigurationSource.(ConfigurationSource.java:56)
at org.apache.logging.log4j.core.config.NullConfiguration.< init>(NullConfiguration.java:32)
at org.apache.logging.log4j.core.LoggerContext.< clinit>(LoggerContext.java:85)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:264)
at org.apache.log4j.LogManager.< clinit>(LogManager.java:72)
at org.slf4j.impl.Log4jLoggerFactory.getLogger(Log4jLoggerFactory.java:73)
at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:285)
at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:305)
at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.< clinit>(ClusterEntrypoint.java:107)

原因:这个报错是因为 log4j 版本不统一导致的,因为 flinkx-dist 中部分插件引用的还是旧版本的 log4j 依赖,导致集群启动过程中,出现了类冲突问题;

方案:临时方案是将 flink lib 中 log4j 相关的jar包名字前加上字符 ‘a‘,使得flink standalone jvm 优先加载。

三、部署 Dinky

编译


编译完成后的压缩包在 Dinky 根目录下的 build 文件夹下。

部署

1、上传dlink压缩包到部署服务器

2、解压


3、数据库初始化

4、把 flink 的 jar 放到 dlink 目录下

因为目前 flinkx 的稳定版本是 1.12.7,所以我们把 dlink 默认的 client 版本修改为 1.12


lib下的目录如图:


注意:因为我没有用上 dlink-connector-jdbc 的 jar 包,所以图中的 dlink-connector-jdbc-1.13-0.6.4-SNAPSHOT.jar 没有换成1.12版本的,可以去掉。

启动

启动命令

注册集群实例

在集群实例中注册已经启动的 Flink 集群。

四、示例分享

添加依赖

这里演示 mysql->mysql 的同步作业,所以需要 Flinkx 的 mysql-connector.jar 以及核心 jar。

编写作业

Mysql DDL:

CREATE TABLE datasource_classify (
id int unsigned NOT NULL AUTO_INCREMENT COMMENT '自增id',
classify_code varchar(64) NOT NULL COMMENT '类型栏唯一编码',
sorted int NOT NULL DEFAULT '0' COMMENT '类型栏排序字段 默认从0开始',
classify_name varchar(64) NOT NULL COMMENT '类型名称 包含全部和常用栏',
is_deleted tinyint NOT NULL DEFAULT '0' COMMENT '是否删除,1删除,0未删除',
gmt_create datetime DEFAULT CURRENT_TIMESTAMP,
gmt_modified datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (id),
UNIQUE KEY classify_code (classify_code)
) ENGINE=InnoDB AUTO_INCREMENT=12 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='数据源分类表';

CREATE TABLE source
(
id bigint,
classify_code STRING,
sorted int,
classify_name STRING,
is_deleted int,
gmt_create timestamp(9),
gmt_modified timestamp(9),
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'mysql-x',
'url' = 'jdbc:mysql://192.168.31.101:3306/datasource?useSSL=false',
'table-name' = 'datasource_classify',
'username' = 'root',
'password' = 'root'
,'scan.fetch-size' = '2'
,'scan.query-timeout' = '10'
);

CREATE TABLE sink
(
id bigint,
classify_code STRING,
sorted int,
classify_name STRING,
is_deleted int,
gmt_create timestamp(9),
gmt_modified timestamp(9),
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'mysql-x',
'url' = 'jdbc:mysql://192.168.31.106:3306/test?useSSL=false',
'table-name' = 'datasource_classify',
'username' = 'root',
'password' = 'root'
,'scan.fetch-size' = '2'
,'scan.query-timeout' = '10'
);

insert into sink
select *
from source u;

执行任务


选中 Yarn Session 模式提交作业。

提交后可从执行历史查看作业提交状况。

进程中可以看的 Flink 集群上批作业执行完成。

对比数据

源库:

目标库:

同步成功,很丝滑。

五、总结

在集成 ChunJun 的时候遇到的问题大部分都是缺包以及包冲突,所以只需要注意一下这个问题就能比较好的进行集成。

在集成服务的时候建议是,先把 Flink 和 ChunJun 进行集成,确保服务能够正常启用以后再进行 Dinky 的集成,这样有利于快速定位查找问题,如果遇到文章之外的问题,也可以查看 Dinky 官网FAQ | Dinky (dlink.top) chunjun的官网QuickStart | ChunJun 纯钧 (dtstack.github.io/chunjun/),看看是否有类似问题的解决办法作为参考。

六、用户体验

因为本人目前还是处于学习使用的过程中,所以很多功能没有好好使用,待自己研究更加透彻后希望写一篇文章,优化官网的用户手册。以下的优缺点以及建议都是目前我在使用学习的过程中遇到的问题。

优点

Dinky 最吸引我的地方应该就是 sql 编辑模版了,直接快捷键生成 sql 模版,在开发测试中屡试不爽。在集成了 ChunJun(Flinkx) 以后,能够做到多源数据的离线跑批任务及日常小批量实时任务的同步。支持各种类型的任务执行方式。

缺点

ui 上适配还有点小问题,例如:打开 F12 调整宽度后,再关闭,页面 ui 不会自适应,需要刷新。

期待改进点

1、更多的自定义异常、业务异常

2、增加新的向导模式,结合数据源,通过 webUI 可以一键引入字段或者勾选需要的字段,生成 Flink Sql 的一大部分配置

CREATE TABLE 表名
(
-- 页面勾选字段,字段从元数据直接拉取
id bigint,
classify_code STRING,
sorted int,
classify_name STRING,
is_deleted int,
gmt_create timestamp(9),
gmt_modified timestamp(9),
PRIMARY KEY (id) NOT ENFORCED
) WITH (
-- 从选择的数据中获取
'connector' = 'mysql-x',
'url' = 'jdbc:mysql://192.168.31.106:3306/test?useSSL=false',
'table-name' = 'datasource_classify',
'username' = 'root',
'password' = 'root'
,
-- 其它非主要配置有用户自己填写
);
3、sql 历史版本管理,目前我已经提交 Feature 并被合并到 0.6.5 版本中。

想了解或咨询更多有关袋鼠云大数据产品、行业解决方案、客户案例的朋友,浏览袋鼠云官网:https://www.dtstack.com/?src=szbky

同时,欢迎对大数据开源项目有兴趣的同学加入「袋鼠云开源框架钉钉技术qun」,交流最新开源技术信息,qun号码:30537511,项目地址:https://github.com/DTStack/Taier

有关开源共建 | Dinky 扩展批流统一数据集成框架 ChunJun 的实践分享的更多相关文章

  1. ruby-on-rails - 使用 Ruby on Rails 进行自动化测试 - 最佳实践 - 2

    很好奇,就使用ruby​​onrails自动化单元测试而言,你们正在做什么?您是否创建了一个脚本来在cron中运行rake作业并将结果邮寄给您?git中的预提交Hook?只是手动调用?我完全理解测试,但想知道在错误发生之前捕获错误的最佳实践是什么。让我们理所当然地认为测试本身是完美无缺的,并且可以正常工作。下一步是什么以确保他们在正确的时间将可能有害的结果传达给您? 最佳答案 不确定您到底想听什么,但是有几个级别的自动代码库控制:在处理某项功能时,您可以使用类似autotest的内容获得关于哪些有效,哪些无效的即时反馈。要确保您的提

  2. ruby - 解析 RDFa、微数据等的最佳方式是什么,使用统一的模式/词汇(例如 schema.org)存储和显示信息 - 2

    我主要使用Ruby来执行此操作,但到目前为止我的攻击计划如下:使用gemsrdf、rdf-rdfa和rdf-microdata或mida来解析给定任何URI的数据。我认为最好映射到像schema.org这样的统一模式,例如使用这个yaml文件,它试图描述数据词汇表和opengraph到schema.org之间的转换:#SchemaXtoschema.orgconversion#data-vocabularyDV:name:namestreet-address:streetAddressregion:addressRegionlocality:addressLocalityphoto:i

  3. ruby - 使用 C 扩展开发 ruby​​gem 时,如何使用 Rspec 在本地进行测试? - 2

    我正在编写一个包含C扩展的gem。通常当我写一个gem时,我会遵循TDD的过程,我会写一个失败的规范,然后处理代码直到它通过,等等......在“ext/mygem/mygem.c”中我的C扩展和在gemspec的“扩展”中配置的有效extconf.rb,如何运行我的规范并仍然加载我的C扩展?当我更改C代码时,我需要采取哪些步骤来重新编译代码?这可能是个愚蠢的问题,但是从我的gem的开发源代码树中输入“bundleinstall”不会构建任何native扩展。当我手动运行rubyext/mygem/extconf.rb时,我确实得到了一个Makefile(在整个项目的根目录中),然后当

  4. ruby - Ruby 有 `Pair` 数据类型吗? - 2

    有时我需要处理键/值数据。我不喜欢使用数组,因为它们在大小上没有限制(很容易不小心添加超过2个项目,而且您最终需要稍后验证大小)。此外,0和1的索引变成了魔数(MagicNumber),并且在传达含义方面做得很差(“当我说0时,我的意思是head...”)。散列也不合适,因为可能会不小心添加额外的条目。我写了下面的类来解决这个问题:classPairattr_accessor:head,:taildefinitialize(h,t)@head,@tail=h,tendend它工作得很好并且解决了问题,但我很想知道:Ruby标准库是否已经带有这样一个类? 最佳

  5. c - mkmf 在编译 C 扩展时忽略子文件夹中的文件 - 2

    我想这样组织C源代码:+/||___+ext||||___+native_extension||||___+lib||||||___(Sourcefilesarekeptinhere-maycontainsub-folders)||||___native_extension.c||___native_extension.h||___extconf.rb||___+lib||||___(Rubysourcecode)||___Rakefile我无法使此设置与mkmf一起正常工作。native_extension/lib中的文件(包含在native_extension.c中)将被完全忽略。

  6. ruby - 我如何添加二进制数据来遏制 POST - 2

    我正在尝试使用Curbgem执行以下POST以解析云curl-XPOST\-H"X-Parse-Application-Id:PARSE_APP_ID"\-H"X-Parse-REST-API-Key:PARSE_API_KEY"\-H"Content-Type:image/jpeg"\--data-binary'@myPicture.jpg'\https://api.parse.com/1/files/pic.jpg用这个:curl=Curl::Easy.new("https://api.parse.com/1/files/lion.jpg")curl.multipart_form_

  7. 世界前沿3D开发引擎HOOPS全面讲解——集3D数据读取、3D图形渲染、3D数据发布于一体的全新3D应用开发工具 - 2

    无论您是想搭建桌面端、WEB端或者移动端APP应用,HOOPSPlatform组件都可以为您提供弹性的3D集成架构,同时,由工业领域3D技术专家组成的HOOPS技术团队也能为您提供技术支持服务。如果您的客户期望有一种在多个平台(桌面/WEB/APP,而且某些客户端是“瘦”客户端)快速、方便地将数据接入到3D应用系统的解决方案,并且当访问数据时,在各个平台上的性能和用户体验保持一致,HOOPSPlatform将帮助您完成。利用HOOPSPlatform,您可以开发在任何环境下的3D基础应用架构。HOOPSPlatform可以帮您打造3D创新型产品,HOOPSSDK包含的技术有:快速且准确的CAD

  8. 叮咚买菜基于 Apache Doris 统一 OLAP 引擎的应用实践 - 2

    导读:随着叮咚买菜业务的发展,不同的业务场景对数据分析提出了不同的需求,他们希望引入一款实时OLAP数据库,构建一个灵活的多维实时查询和分析的平台,统一数据的接入和查询方案,解决各业务线对数据高效实时查询和精细化运营的需求。经过调研选型,最终引入ApacheDoris作为最终的OLAP分析引擎,Doris作为核心的OLAP引擎支持复杂地分析操作、提供多维的数据视图,在叮咚买菜数十个业务场景中广泛应用。作者|叮咚买菜资深数据工程师韩青叮咚买菜创立于2017年5月,是一家专注美好食物的创业公司。叮咚买菜专注吃的事业,为满足更多人“想吃什么”而努力,通过美好食材的供应、美好滋味的开发以及美食品牌的孵

  9. FOHEART H1数据手套驱动Optitrack光学动捕双手运动(Unity3D) - 2

    本教程将在Unity3D中混合Optitrack与数据手套的数据流,在人体运动的基础上,添加双手手指部分的运动。双手手背的角度仍由Optitrack提供,数据手套提供双手手指的角度。 01  客户端软件分别安装MotiveBody与MotionVenus并校准人体与数据手套。MotiveBodyMotionVenus数据手套使用、校准流程参照:https://gitee.com/foheart_1/foheart-h1-data-summary.git02  数据转发打开MotiveBody软件的Streaming,开始向Unity3D广播数据;MotionVenus中设置->选项选择Unit

  10. 使用canal同步MySQL数据到ES - 2

    文章目录一、概述简介原理模块二、配置Mysql使用版本环境要求1.操作系统2.mysql要求三、配置canal-server离线下载在线下载上传解压修改配置单机配置集群配置分库分表配置1.修改全局配置2.实例配置垂直分库水平分库3.修改group-instance.xml4.启动监听四、配置canal-adapter1修改启动配置2配置映射文件3启动ES数据同步查询所有订阅同步数据同步开关启动4.验证五、配置canal-admin一、概述简介canal是Alibaba旗下的一款开源项目,Java开发。基于数据库增量日志解析,提供增量数据订阅&消费。Git地址:https://github.co

随机推荐