草庐IT

DataX(MySQL同步数据到Doris)

大数据左右手 2024-01-26 原文

前言

  1. 编译DataX doriswriter plugin
  2. DataX mysqlreader 写入数据到Doris
  3. 性能测试
  4. Bug 记录

1. 编译 doriswriter

doriswriter 插件

https://github.com/apache/incubator-doris/tree/master/extension/DataX

步骤(按需修改源代码)

  1. 从github上拉取源码(或者直接在上面地址下载包)

git clone https://github.com/apache/incubator-doris.git

不过执行 init 即可
2. 运行 init-env.sh

主要做了下面几件事,减少了繁杂的操作。

(1)将 DataX 代码库 clone 到本地。

(2)将 doriswriter/ 目录软链到 DataX/doriswriter 目录。

(3)在 DataX/pom.xml 文件中添加 doriswriter 模块。

(4)将 DataX/core/pom.xml 文件中的 httpclient 版本从 4.5 改为 4.5.13(因为有bug)

  1. 编译 doriswriter

(1)命令

mvn clean install -pl plugin-rdbms-util,doriswriter -DskipTests

(2)编译完地址

../target/datax/datax/

  1. 把编译完的包放到plugin下即可

../datax/plugin/

  1. 具体可看Doris官网

http://doris.incubator.apache.org/zh-CN/extending-doris/datax.html

2. MySQL同步数据到Doris

创建Doris表

CREATE TABLE `mars_micro_user_events` (
  `id` bigint,
  `user_id` bigint DEFAULT NULL ,
  `group_type` int DEFAULT NULL,
  `group_id` int DEFAULT NULL,
  `event_type` varchar(45) DEFAULT NULL,
  `event_name` varchar(45) DEFAULT NULL,
  `event_count` int DEFAULT NULL,
  `event_time` bigint DEFAULT NULL,
  `created_time` bigint DEFAULT NULL,
  `updated_time` bigint DEFAULT NULL
) ENGINE=OLAP
DUPLICATE KEY(id)
COMMENT "OLAP"
DISTRIBUTED BY HASH(id) BUCKETS 1
PROPERTIES (
"replication_num" = "3"
);

DataX 配置JSON文件 (mysqlToDoris.json)

{
  "core":{
    "transport": {
      "channel": {
        "speed": {
          "byte": 104857600,
          "record": 200000
        }
      }
    }
  },
    "job": {
        "setting": {
            "speed": {
                "channel": 1
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "dev",
                        "password": "123456",
                        "connection": [
                            {
                            
                                "jdbcUrl": [
                                     "jdbc:mysql://mysql.xxx.cn:3306/event_db"
                                ],
                                "querySql": [
                                    "select id,user_id,group_type,group_id,event_type,event_name,event_time,created_time,updated_time from eventc_db.user_events where event_time>=${StartTime} and event_time<${EndTime};"
                                ]
                            }
                        ]
                    }
                },
               "writer": {
                    "name": "doriswriter",
                    "parameter": {
                        "username": "root",
                        "password": "",
                        "database": "event_db",
                        "table": "events",
                        "column": [ "id",  "group_type","group_id","event_type","event_name","event_time","created_time","updated_time"],
                        "preSql": [],
                        "postSql": [], 
                        "jdbcUrl": "jdbc:mysql://cdh3:9030/",
                        "feLoadUrl": ["cdh3:8030"],
                        "beLoadUrl": ["cdh1:8044", "cdh2:8044", "cdh3:8044"],
                        "loadProps": {
                        },
                        "maxBatchRows" : 200000,
                        "maxBatchByteSize" : 104857600,
                        "lineDelimiter": "\n"
                    }
                }
            }
        ]
    }
}

core与job参数介绍

配置

说明

core.transport.channel.speed.byte

单个channel容纳最多的字节数

core.transport.channel.speed.record

单个channel容纳最多的record数

job.setting.speed.channel

该job所需要的channel的个数

job.setting.speed.byte

该job最大的流量

job.setting.speed.record

该job最大的record流量

doriswriter 参数介绍

配置

说明

username

数据库的用户名

password

数据库的密码

database

表的数据库名称

table

表的表名称

jdbcUrl

数据库的 JDBC 连接信息

loadUrl

FE的地址用于Stream Load

feLoadUrl

FE的地址

beLoadUrl

BE的地址

column

需要写入数据的字段

preSql

写入数据到目的表前,会先执行这里的标准语句

postSql

写入数据到目的表后,会执行这里的标准语句

loadProps

Stream Load 的请求参数

maxBatchRows

每批次导入数据的最大行数

maxBatchByteSize

每批次导入数据的最大数据量

labelPrefix

每批次导入任务的 label 前缀

lineDelimiter

每批次数据包含多行,,每行的的分隔符

connectTimeout

Stream Load单次请求的超时时间, 单位毫秒

执行任务

python /opt/app/datax/bin/datax.py  /opt/app/datax/bin/mysqlToDoris.json -p "-DStartTime=1546272000000 -DEndTime=1650791448000"

3. 性能测试

mysql events表数据量有 8310077条

Doris FE一台机器,BE三台

CPU 8核,内存64G

测试A

配置

core.transport.channel.speed.byte

1048576

core.transport.channel.speed.record

10000

doriswriter maxBatchByteSize

104857600

doriswriter maxBatchRows

10000

测试B

配置

core.transport.channel.speed.byte

1048576

core.transport.channel.speed.record

100000

doriswriter maxBatchByteSize

104857600

doriswriter maxBatchRows

100000

测试C

配置

core.transport.channel.speed.byte

104857600

core.transport.channel.speed.record

100000

doriswriter maxBatchByteSize

104857600

doriswriter maxBatchRows

100000

测试D

配置

core.transport.channel.speed.byte

104857600

core.transport.channel.speed.record

500000

doriswriter maxBatchByteSize

104857600

doriswriter maxBatchRows

500000

挂了

测试E

配置

说明

core.transport.channel.speed.byte

104857600

core.transport.channel.speed.record

200000

doriswriter maxBatchByteSize

104857600

doriswriter maxBatchRows

200000

对于我用的测试机器,基本上维持高值3M左右

4。 Bug 记录

1. 配置beLoadUrl地址

注意FE,BE对应版本的地址与端口号

2. 建表语句

建Doris表的时候没有报错问题,但是执行插入的时候数据进不去。

原来的建表语句

CREATE TABLE events
(
  `id` bigint,
  `user_id` bigint ,
  `group_type` int,
  `group_id` int,
  `event_name` varchar(45),
  `event_count` int,
  `event_time` bigint,
  `created_time` bgint,
  `updated_time` bigint
)
DUPLICATE KEY(id)
DISTRIBUTED BY HASH(id, event_name,group_id) BUCKETS 6;

修改后的

CREATE TABLE mars_micro_user_events
(
  `id` bigint,
  `user_id` bigint ,
  `group_type` int,
  `group_id` int,
  `event_name` varchar(45),
  `event_count` int,
  `event_time` bigint,
  `created_time` bigint,
  `updated_time` bigint
)
DUPLICATE KEY(id)
DISTRIBUTED BY HASH(id) BUCKETS 6

首先了解DUPLICATE KEY 数据完全按照导入文件中的数据进行存储,不会有任何聚合。即使两行数据完全相同,也都会保留。 而在建表语句中指定的 DUPLICATE KEY,只是用来指明底层数据按照那些列进行排序。

分桶列可以是多列,但必须为Key 列。在此建表语句(DUPLICATE KEY)模型下,建表是不会报错的,如果选择的是其他聚合的模型则会直接报错。

有关DataX(MySQL同步数据到Doris)的更多相关文章

  1. ruby - 解析 RDFa、微数据等的最佳方式是什么,使用统一的模式/词汇(例如 schema.org)存储和显示信息 - 2

    我主要使用Ruby来执行此操作,但到目前为止我的攻击计划如下:使用gemsrdf、rdf-rdfa和rdf-microdata或mida来解析给定任何URI的数据。我认为最好映射到像schema.org这样的统一模式,例如使用这个yaml文件,它试图描述数据词汇表和opengraph到schema.org之间的转换:#SchemaXtoschema.orgconversion#data-vocabularyDV:name:namestreet-address:streetAddressregion:addressRegionlocality:addressLocalityphoto:i

  2. ruby - Ruby 有 `Pair` 数据类型吗? - 2

    有时我需要处理键/值数据。我不喜欢使用数组,因为它们在大小上没有限制(很容易不小心添加超过2个项目,而且您最终需要稍后验证大小)。此外,0和1的索引变成了魔数(MagicNumber),并且在传达含义方面做得很差(“当我说0时,我的意思是head...”)。散列也不合适,因为可能会不小心添加额外的条目。我写了下面的类来解决这个问题:classPairattr_accessor:head,:taildefinitialize(h,t)@head,@tail=h,tendend它工作得很好并且解决了问题,但我很想知道:Ruby标准库是否已经带有这样一个类? 最佳

  3. ruby - 我如何添加二进制数据来遏制 POST - 2

    我正在尝试使用Curbgem执行以下POST以解析云curl-XPOST\-H"X-Parse-Application-Id:PARSE_APP_ID"\-H"X-Parse-REST-API-Key:PARSE_API_KEY"\-H"Content-Type:image/jpeg"\--data-binary'@myPicture.jpg'\https://api.parse.com/1/files/pic.jpg用这个:curl=Curl::Easy.new("https://api.parse.com/1/files/lion.jpg")curl.multipart_form_

  4. 世界前沿3D开发引擎HOOPS全面讲解——集3D数据读取、3D图形渲染、3D数据发布于一体的全新3D应用开发工具 - 2

    无论您是想搭建桌面端、WEB端或者移动端APP应用,HOOPSPlatform组件都可以为您提供弹性的3D集成架构,同时,由工业领域3D技术专家组成的HOOPS技术团队也能为您提供技术支持服务。如果您的客户期望有一种在多个平台(桌面/WEB/APP,而且某些客户端是“瘦”客户端)快速、方便地将数据接入到3D应用系统的解决方案,并且当访问数据时,在各个平台上的性能和用户体验保持一致,HOOPSPlatform将帮助您完成。利用HOOPSPlatform,您可以开发在任何环境下的3D基础应用架构。HOOPSPlatform可以帮您打造3D创新型产品,HOOPSSDK包含的技术有:快速且准确的CAD

  5. 叮咚买菜基于 Apache Doris 统一 OLAP 引擎的应用实践 - 2

    导读:随着叮咚买菜业务的发展,不同的业务场景对数据分析提出了不同的需求,他们希望引入一款实时OLAP数据库,构建一个灵活的多维实时查询和分析的平台,统一数据的接入和查询方案,解决各业务线对数据高效实时查询和精细化运营的需求。经过调研选型,最终引入ApacheDoris作为最终的OLAP分析引擎,Doris作为核心的OLAP引擎支持复杂地分析操作、提供多维的数据视图,在叮咚买菜数十个业务场景中广泛应用。作者|叮咚买菜资深数据工程师韩青叮咚买菜创立于2017年5月,是一家专注美好食物的创业公司。叮咚买菜专注吃的事业,为满足更多人“想吃什么”而努力,通过美好食材的供应、美好滋味的开发以及美食品牌的孵

  6. FOHEART H1数据手套驱动Optitrack光学动捕双手运动(Unity3D) - 2

    本教程将在Unity3D中混合Optitrack与数据手套的数据流,在人体运动的基础上,添加双手手指部分的运动。双手手背的角度仍由Optitrack提供,数据手套提供双手手指的角度。 01  客户端软件分别安装MotiveBody与MotionVenus并校准人体与数据手套。MotiveBodyMotionVenus数据手套使用、校准流程参照:https://gitee.com/foheart_1/foheart-h1-data-summary.git02  数据转发打开MotiveBody软件的Streaming,开始向Unity3D广播数据;MotionVenus中设置->选项选择Unit

  7. 使用canal同步MySQL数据到ES - 2

    文章目录一、概述简介原理模块二、配置Mysql使用版本环境要求1.操作系统2.mysql要求三、配置canal-server离线下载在线下载上传解压修改配置单机配置集群配置分库分表配置1.修改全局配置2.实例配置垂直分库水平分库3.修改group-instance.xml4.启动监听四、配置canal-adapter1修改启动配置2配置映射文件3启动ES数据同步查询所有订阅同步数据同步开关启动4.验证五、配置canal-admin一、概述简介canal是Alibaba旗下的一款开源项目,Java开发。基于数据库增量日志解析,提供增量数据订阅&消费。Git地址:https://github.co

  8. ruby-on-rails - 创建 ruby​​ 数据库时惰性符号绑定(bind)失败 - 2

    我正在尝试在Rails上安装ruby​​,到目前为止一切都已安装,但是当我尝试使用rakedb:create创建数据库时,我收到一个奇怪的错误:dyld:lazysymbolbindingfailed:Symbolnotfound:_mysql_get_client_infoReferencedfrom:/Library/Ruby/Gems/1.8/gems/mysql2-0.3.11/lib/mysql2/mysql2.bundleExpectedin:flatnamespacedyld:Symbolnotfound:_mysql_get_client_infoReferencedf

  9. STM32读取串口传感器数据(颗粒物传感器,主动上传) - 2

    文章目录1.开发板选择*用到的资源2.串口通信(个人理解)3.代码分析(注释比较详细)1.主函数2.串口1配置3.串口2配置以及中断函数4.注意问题5.源码链接1.开发板选择我用的是STM32F103RCT6的板子,不过代码大概在F103系列的板子上都可以运行,我试过在野火103的霸道板上也可以,主要看一下串口对应的引脚一不一样就行了,不一样的就更改一下。*用到的资源keil5软件这里用到了两个串口资源,采集数据一个,串口通信一个,板子对应引脚如下:串口1,TX:PA9,RX:PA10串口2,TX:PA2,RX:PA32.串口通信(个人理解)我就从串口采集传感器数据这个过程说一下我自己的理解,

  10. SPI接收数据异常问题总结 - 2

    SPI接收数据左移一位问题目录SPI接收数据左移一位问题一、问题描述二、问题分析三、探究原理四、经验总结最近在工作在学习调试SPI的过程中遇到一个问题——接收数据整体向左移了一位(1bit)。SPI数据收发是数据交换,因此接收数据时从第二个字节开始才是有效数据,也就是数据整体向右移一个字节(1byte)。请教前辈之后也没有得到解决,通过在网上查阅前人经验终于解决问题,所以写一个避坑经验总结。实际背景:MCU与一款芯片使用spi通信,MCU作为主机,芯片作为从机。这款芯片采用的是它规定的六线SPI,多了两根线:RDY和INT,这样从机就可以主动请求主机给主机发送数据了。一、问题描述根据从机芯片手

随机推荐