DataX是阿里巴巴开源的离线数据同步工具,实现了包括主流RDBMS数据库、NoSQL、大数据计算系统在内的多种异构数据源之间高效进行数据同步的功能。

为了解决异构数据源的同步问题,DataX将复杂的网状同步链路优化成了星型数据链路,由DataX作为中间传输载体来负责连接各种数据源,以此来降低整个异构数据源同步链路的复杂度。当需要新接入一个数据源的时候,只需要考虑将该新的数据源对接到DataX即可,就能跟已有的所有数据源无缝同步。

DataX由FrameWork+Plugin的形式构建,数据源的读取和写入分别Reader和Writer实现:

DataX的工作模式是单机多线程形式,不支持分布式的方式,这是它和其它数据同步工具的重要区别之一。
每一个数据同步作业,我们称之为Job,在DataX收到一个Job之后,就启动一个进程来完成整个作业的过程。如果有多个数据同步作业需要同时执行,那就要么排队等待,要么再启动一个DataX进程。Job模块是作业的中枢管理节点,承担了数据清理、子任务切分、TaskGroup管理等功能。
Job启动后,会根据不同的源端切分策略,将Job切分为多个小的任务Task,以便于多线程并发执行它们。Task便是DataX中的最小执行单元,每一个Task都负责一部分数据的同步工作。
切分好Task之后,Job会调用Scheduler模块,根据配置的并发任务数将Task重新组合,组装成TaskGroup,每一个TaskGroup负责以一定的并发度来运行分配好的所有Task,默认情况下的并发度为5。
每一个Task都有TaskGroup负责启动和控制,Task启动后,会按照上图中介绍的Reader-Channel-Writer来完成其对应的数据同步工作。
Job负责监控并等待多个TaskGroup任务完成后就退出,否则异常退出。
DataX还具有如下的核心优势:
和其它大数据ETL工具相比:
| 功能 | DataX | Sqoop |
|---|---|---|
| 运行模式 | 单进程多线程 | MR分布式 |
| MySQL读写 | 单机压力大,读写粒度容易控制 | MR模式重,出错后处理麻烦 |
| Hive读写 | 单机压力大 | 很好 |
| 文件格式 | orc支持 | orc不支持,可添加 |
| 分布式 | 不支持,可通过调度框架规避 | 支持 |
| 流控 | 有 | 无,需要定制 |
| 统计信息 | 有 | 无,分布式的数据搜集不方便 |
| 数据校验 | 有 | 无,分布式的数据搜集不方便 |
| 监控 | 有 | 无,需要定制 |
如果需要支持的数据源比较多,建议使用DataX,如果数据来源比较单一,且只是要导入到HDFS,流程很简单,可以考虑使用Sqoop。
进入到datax的bin目录下,运行自带的示例:
python datax.py /root/datax/datax/job/job.json
运行后控制台显示运行成功即表示DataX安装完成。
当你决定使用某个Reader和Writer之后,我们可以通过命令来获取一个模板Json:
# 指定需要的Reader和Writer来获取模板Json
python datax.py -r streamreader -w streamwriter
DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.
# 指定需要的Reader和Writer的文档给地址
Please refer to the streamreader document:
https://github.com/alibaba/DataX/blob/master/streamreader/doc/streamreader.md
Please refer to the streamwriter document:
https://github.com/alibaba/DataX/blob/master/streamwriter/doc/streamwriter.md
# 说明定制好的Json如何启动
Please save the following configuration as a json file and use
python {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json
to run the job.
{
"job": {
"content": [
{
"reader": {
"name": "streamreader",
"parameter": {
# 指定列信息
"column": [],
# 指定记录条数
"sliceRecordCount": ""
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"encoding": "",
# 是否将数据打印到控制台
"print": true
}
}
}
],
"setting": {
"speed": {
# 指定并发度,可以是channel、bytes、records三种类型
"channel": ""
}
}
}
}
大体分为四个部分:
我们复制如上json内容,新建一个stream.json放到/root/zx-test下面,内容设置如下:
{
"job": {
"content": [
{
"reader": {
"name": "streamreader",
"parameter": {
"column": [
{
"type":"string",
"value":"zx-test"
},
{
"type":"string",
"value":"999"
}
],
"sliceRecordCount": "10"
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"encoding": "UTF-8",
"print": true
}
}
}
],
"setting": {
"speed": {
"channel": "1"
}
}
}
}
然后输入命令执行:
python datax.py /root/zx-test/stream.json
...
zx-test 999
zx-test 999
zx-test 999
zx-test 999
zx-test 999
zx-test 999
zx-test 999
zx-test 999
zx-test 999
zx-test 999
2023-01-14 11:00:32.389 [taskGroup-0] INFO TaskGroupContainer - taskGroup[0] taskId[0] is successed, used[101]ms
2023-01-14 11:00:32.390 [taskGroup-0] INFO TaskGroupContainer - taskGroup[0] completed it's tasks.
2023-01-14 11:00:42.283 [job-0] INFO StandAloneJobContainerCommunicator - Total 10 records, 100 bytes | Speed 10B/s, 1 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.000s | All Task WaitReaderTime 0.000s | Percentage 100.00%
2023-01-14 11:00:42.284 [job-0] INFO AbstractScheduler - Scheduler accomplished all tasks.
2023-01-14 11:00:42.284 [job-0] INFO JobContainer - DataX Writer.Job [streamwriter] do post work.
2023-01-14 11:00:42.284 [job-0] INFO JobContainer - DataX Reader.Job [streamreader] do post work.
2023-01-14 11:00:42.284 [job-0] INFO JobContainer - DataX jobId [0] completed successfully.
...
前提条件:
首先,我们创建一张数据表,并创建一些数据:
CREATE TABLE `zx_user` (
`user_id` bigint(20) NOT NULL COMMENT '用户ID',
`user_name` varchar(30) DEFAULT NULL COMMENT '用户姓名',
`age` int(11) DEFAULT NULL COMMENT '用户年龄',
`user_email` varchar(50) DEFAULT NULL COMMENT '用户邮箱',
`create_by` varchar(100) DEFAULT NULL,
`create_date` datetime DEFAULT NULL,
`update_by` varchar(100) DEFAULT NULL,
`update_date` datetime DEFAULT NULL,
`deleted` int(11) DEFAULT '0' COMMENT '0-未删除;1-已删除',
PRIMARY KEY (`user_id`),
UNIQUE KEY `zx_user_un` (`user_id`),
UNIQUE KEY `zx_user_un2` (`user_email`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
INSERT INTO zx_user (user_id,user_name,age,user_email,create_by,create_date,update_by,update_date,deleted) VALUES
(1,'Jone',18,'test1@baomidou.com',NULL,NULL,NULL,NULL,0),
(2,'Jack',20,'test2@baomidou.com',NULL,NULL,NULL,NULL,0),
(5,'Anna',27,'test5@baomidou.com',NULL,NULL,NULL,NULL,0),
(7,'Anna',27,'test7@baomidou.com','System','2021-06-17 14:36:03','System','2021-06-17 14:54:51',0);
然后我们需要获取一个mysql-hdfs案例的json示例:python datax.py -r mysqlreader -w hdfswriter,并根据实际情况进行修改:
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": [
"user_id",
"user_name",
"age",
"user_email"
],
"connection": [
{
"jdbcUrl": [
"jdbc:mysql://x.x.x.x:3306/zhangxun"
],
"table": [
"zx_user"
]
}
],
"password": "******",
"username": "root",
"where": ""
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"column": [
{
"name":"user_id",
"type":"BIGINT"
},
{
"name":"user_name",
"type":"STRING"
},
{
"name":"age",
"type":"INT"
},
{
"name":"user_email",
"type":"STRING"
}
],
"compress": "NONE",
"defaultFS": "hdfs://x.x.x.x:9000",
"fieldDelimiter": ",",
"fileName": "zx_user",
"fileType": "text",
"path": "/zx/datax",
"writeMode": "append"
}
}
}
],
"setting": {
"speed": {
"channel": "1"
}
}
}
}
关于mysqlreader和hdfswriter的详细参数配置项可以参考官方对应插件的文档,上面写的都很详细。
然后我们执行命令开始Job:
python datax.py /root/zx-test/mysql2hdfs.json
...
2023-01-14 15:03:15.319 [job-0] INFO JobContainer - DataX jobId [0] completed successfully.
...
2023-01-14 15:03:15.422 [job-0] INFO JobContainer -
任务启动时刻 : 2023-01-14 15:03:04
任务结束时刻 : 2023-01-14 15:03:15
任务总计耗时 : 11s
任务平均流量 : 10B/s
记录写入速度 : 0rec/s
读出记录总数 : 4
读写失败总数 : 0
显示执行成功了,我们打开HDFS文件浏览器查看文件确实已经存在了:

我们查看其中的内容为:
[root@bigdata01 hadoop]# hdfs dfs -cat /zx/datax/zx_user__d81bd99e_0d6f_45a1_9a80_04ca475fc83d
1,Jone,18,test1@baomidou.com
2,Jack,20,test2@baomidou.com
5,Anna,27,test5@baomidou.com
7,Anna,27,test7@baomidou.com
在以上3.3案例的基础上,我们首先获取一个案例json,python datax.py -r hdfswriter -w mysqlreader,然后修改成需要的:
{
"job": {
"content": [
{
"reader": {
"name": "hdfsreader",
"parameter": {
"column": ["*"],
"defaultFS": "hdfs://x.x.x.x:9000",
"encoding": "UTF-8",
"fieldDelimiter": ",",
"fileType": "text",
"path": "/zx/datax/zx_user.txt"
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"column": [
"user_id",
"user_name",
"age",
"user_email"
],
"connection": [
{
"jdbcUrl": "jdbc:mysql://x.x.x.x:3306/zhangxun",
"table": [
"zx_user"
]
}
],
"password": "******",
"preSql": [],
"session": [],
"username": "root",
"writeMode": "insert"
}
}
}
],
"setting": {
"speed": {
"channel": "1"
}
}
}
}
然后,我们准备一个zx_user.txt,编辑内容如下:
11,slide,21,slide@baomidou.com
21,mify,20,mify@baomidou.com
51,kitty,27,kitty@baomidou.com
[root@bigdata01 hadoop]# hdfs dfs -put /root/zx-test/zx_user.txt /zx/datax
[root@bigdata01 hadoop]# hdfs dfs -cat /zx/datax/zx_user.txt
11,slide,21,slide@baomidou.com
21,mify,20,mify@baomidou.com
51,kitty,27,kitty@baomidou.com
如此将需要同步到MySQL数据库的数据文件准备好了,然后,我们执行同步命令:
python datax.py /root/zx-test/hdfs2mysql.json
...
2023-01-14 17:22:04.815 [job-0] INFO JobContainer - DataX Writer.Job [mysqlwriter] do post work.
2023-01-14 17:22:04.815 [job-0] INFO JobContainer - DataX Reader.Job [hdfsreader] do post work.
2023-01-14 17:22:04.816 [job-0] INFO JobContainer - DataX jobId [0] completed successfully.
...
任务启动时刻 : 2023-01-14 17:21:53
任务结束时刻 : 2023-01-14 17:22:04
任务总计耗时 : 11s
任务平均流量 : 7B/s
记录写入速度 : 0rec/s
读出记录总数 : 3
读写失败总数 : 0
Job显示执行成功了,我们查询下数据库表中的数据,发现数据为:
1 Jone 18 test1@baomidou.com 0
2 Jack 20 test2@baomidou.com 0
5 Anna 27 test5@baomidou.com 0
7 Anna 27 test7@baomidou.com System 2021-06-17 14:36:03 System 2021-06-17 14:54:51 0
11 slide 21 slide@baomidou.com 0
21 mify 20 mify@baomidou.com 0
51 kitty 27 kitty@baomidou.com 0
如此表示导入成功了。
DataX由于是阿里巴巴开源的,中文文档比较完善,各种插件的说明教程也很全,基本很容易上手,本文旨在帮助入门,一些高级主题还需要另外实验和学习。
参考文档:
alibaba/DataX: DataX是阿里云DataWorks数据集成的开源版本。 (github.com)
DataX/introduction.md at master · alibaba/DataX (github.com)
如何正确创建Rails迁移,以便将表更改为MySQL中的MyISAM?目前是InnoDB。运行原始执行语句会更改表,但它不会更新db/schema.rb,因此当在测试环境中重新创建表时,它会返回到InnoDB并且我的全文搜索失败。我如何着手更改/添加迁移,以便将现有表修改为MyISAM并更新schema.rb,以便我的数据库和相应的测试数据库得到相应更新? 最佳答案 我没有找到执行此操作的好方法。您可以像有人建议的那样更改您的schema.rb,然后运行:rakedb:schema:load,但是,这将覆盖您的数据。我的做法是(假设
我主要使用Ruby来执行此操作,但到目前为止我的攻击计划如下:使用gemsrdf、rdf-rdfa和rdf-microdata或mida来解析给定任何URI的数据。我认为最好映射到像schema.org这样的统一模式,例如使用这个yaml文件,它试图描述数据词汇表和opengraph到schema.org之间的转换:#SchemaXtoschema.orgconversion#data-vocabularyDV:name:namestreet-address:streetAddressregion:addressRegionlocality:addressLocalityphoto:i
使用带有Rails插件的vim,您可以创建一个迁移文件,然后一次性打开该文件吗?textmate也可以这样吗? 最佳答案 你可以使用rails.vim然后做类似的事情::Rgeneratemigratonadd_foo_to_bar插件将打开迁移生成的文件,这正是您想要的。我不能代表textmate。 关于ruby-使用VimRails,您可以创建一个新的迁移文件并一次性打开它吗?,我们在StackOverflow上找到一个类似的问题: https://sta
有时我需要处理键/值数据。我不喜欢使用数组,因为它们在大小上没有限制(很容易不小心添加超过2个项目,而且您最终需要稍后验证大小)。此外,0和1的索引变成了魔数(MagicNumber),并且在传达含义方面做得很差(“当我说0时,我的意思是head...”)。散列也不合适,因为可能会不小心添加额外的条目。我写了下面的类来解决这个问题:classPairattr_accessor:head,:taildefinitialize(h,t)@head,@tail=h,tendend它工作得很好并且解决了问题,但我很想知道:Ruby标准库是否已经带有这样一个类? 最佳
我正在尝试使用Curbgem执行以下POST以解析云curl-XPOST\-H"X-Parse-Application-Id:PARSE_APP_ID"\-H"X-Parse-REST-API-Key:PARSE_API_KEY"\-H"Content-Type:image/jpeg"\--data-binary'@myPicture.jpg'\https://api.parse.com/1/files/pic.jpg用这个:curl=Curl::Easy.new("https://api.parse.com/1/files/lion.jpg")curl.multipart_form_
无论您是想搭建桌面端、WEB端或者移动端APP应用,HOOPSPlatform组件都可以为您提供弹性的3D集成架构,同时,由工业领域3D技术专家组成的HOOPS技术团队也能为您提供技术支持服务。如果您的客户期望有一种在多个平台(桌面/WEB/APP,而且某些客户端是“瘦”客户端)快速、方便地将数据接入到3D应用系统的解决方案,并且当访问数据时,在各个平台上的性能和用户体验保持一致,HOOPSPlatform将帮助您完成。利用HOOPSPlatform,您可以开发在任何环境下的3D基础应用架构。HOOPSPlatform可以帮您打造3D创新型产品,HOOPSSDK包含的技术有:快速且准确的CAD
本教程将在Unity3D中混合Optitrack与数据手套的数据流,在人体运动的基础上,添加双手手指部分的运动。双手手背的角度仍由Optitrack提供,数据手套提供双手手指的角度。 01 客户端软件分别安装MotiveBody与MotionVenus并校准人体与数据手套。MotiveBodyMotionVenus数据手套使用、校准流程参照:https://gitee.com/foheart_1/foheart-h1-data-summary.git02 数据转发打开MotiveBody软件的Streaming,开始向Unity3D广播数据;MotionVenus中设置->选项选择Unit
文章目录一、概述简介原理模块二、配置Mysql使用版本环境要求1.操作系统2.mysql要求三、配置canal-server离线下载在线下载上传解压修改配置单机配置集群配置分库分表配置1.修改全局配置2.实例配置垂直分库水平分库3.修改group-instance.xml4.启动监听四、配置canal-adapter1修改启动配置2配置映射文件3启动ES数据同步查询所有订阅同步数据同步开关启动4.验证五、配置canal-admin一、概述简介canal是Alibaba旗下的一款开源项目,Java开发。基于数据库增量日志解析,提供增量数据订阅&消费。Git地址:https://github.co
C#实现简易绘图工具一.引言实验目的:通过制作窗体应用程序(C#画图软件),熟悉基本的窗体设计过程以及控件设计,事件处理等,熟悉使用C#的winform窗体进行绘图的基本步骤,对于面向对象编程有更加深刻的体会.Tutorial任务设计一个具有基本功能的画图软件**·包括简单的新建文件,保存,重新绘图等功能**·实现一些基本图形的绘制,包括铅笔和基本形状等,学习橡皮工具的创建**·设计一个合理舒适的UI界面**注明:你可能需要先了解一些关于winform窗体应用程序绘图的基本知识,以及关于GDI+类和结构的知识二.实验环境Windows系统下的visualstudio2017C#窗体应用程序三.
1.postman介绍Postman一款非常流行的API调试工具。其实,开发人员用的更多。因为测试人员做接口测试会有更多选择,例如Jmeter、soapUI等。不过,对于开发过程中去调试接口,Postman确实足够的简单方便,而且功能强大。2.下载安装官网地址:https://www.postman.com/下载完成后双击安装吧,安装过程极其简单,无需任何操作3.使用教程这里以百度为例,工具使用简单,填写URL地址即可发送请求,在下方查看响应结果和响应状态码常用方法都有支持请求方法:getpostputdeleteGet、Post、Put与Delete的作用get:请求方法一般是用于数据查询,