各个组件的作用:
进入到plugins目录找到hdfsreader,将hadoop-0.19.2-core.jar删除,将本地库替换为$HAOOP_HOME2.x/lib/native/libhadoop.so。同时添加Hadoop2.x的依赖包,如下图:
另外,Datax需要hadoop1.x的hadoop-core.xml配置文件,但是hadoop2.x中不存在这个文件,这里有一个解决方法,就是将各个配置文件的配置项都集中写到一个新建的配置文件中,单独有datax使用,这个配置文件在datax的job xml文件由参数hadoop-conf配上。到现在为止,datax与hadoop2.x的兼容性修改已经完成了。 还要做其他环境的调整,确保java版本>=1.6,python的版本>=2.6(对于python的版本选择上,个人推荐2.6或者2.7,如果pytyon版本上到3.x的话会有错误,个人经验)。最后修改一下各个插件的rpm包的build路径:
下面以t_dp_datax_engine.spec为例子:
上面红色方框的地方是指build rpm 插件后新产生的文件夹位置,改为自己编辑的目录。下面以t_dp_datax_engine.spec为例子,看看怎么build rpm 插件:具体执行过程如下:1、请先check out一份DataX源码,并cd切换到DataX源码中的rpm目录2、编译打包DataX engine包,使用rpmbuild --ba t_dp_datax_engine.spec(请确保有root权限),打包生成的rpm后如下图所示
Rpm制作完成后,即可分发、安装,例如使用rpm -ivh t_dp_datax_engine.rpm即可安装DataX engine 包,需要注意的是engine的rpm地址源自于上图的截图中信息。如下图:
安装完成后,在/home/taobao/datax/目录下会存在如下文件:
接着选择export的目标源,这个我们选择0:
步骤2:根据自己的业务需求和HADOOP的相应环境配置产生的job xml,进入到$DATAX_HOME/jobs,编辑job配置文件,我的配置如下(里边的一些动态参数有下面我自己写的Shell中进行控制):<?xml version="1.0" encoding="UTF-8"?>
<jobs>
<job id="hdfsreader_to_mysqlwriter_job">
<reader>
<plugin>hdfsreader</plugin>
<!--
description:HDFS login account, e.g. 'username, groupname(groupname...),#password
mandatory:true
name:ugi
-->
<param key="hadoop.job.ugi" value="hadoop,supergroup#jpkjcluster"/>
<!--
description:hadoop-site.xml path
mandatory:false
name:hadoop_conf
-->
<param key="hadoop_conf" value="/data/hadoop/hadoop-2.2.0/etc/hadoop/datax_hadoop_conf.xml"/>
<!--
description:hdfs path, format like: hdfs://ip:port/path, or file:///home/taobao/
mandatory:true
name:dir
-->
<param key="dir" value="hdfs://172.16.8.1:8020/user/hive/warehouse/jl.db/${hdfs_table}/day=${export_day}"/>
<!--
default:\t
description:how to sperate a line
mandatory:false
name:fieldSplit
-->
<param key="field_split" value=","/>
<!--
default:UTF-8
range:UTF-8|GBK|GB2312
description:hdfs encode
mandatory:false
name:encoding
-->
<param key="encoding" value="UTF-8"/>
<!--
default:4096
range:[1024-4194304]
description:how large the buffer
mandatory:false
name:bufferSize
-->
<param key="buffer_size" value="4096"/>
<!--
default:\N
range:
description:replace the nullstring to null
mandatory:false
name:nullString
-->
<param key="nullstring" value="\N"/>
<!--
default:true
range:true|false
description:ingore key
mandatory:false
name:ignoreKey
-->
<param key="ignore_key" value="true"/>
<!--
default:
range:
description:how to filter column
mandatory:false
name:colFilter
<param key="col_filter" value="?"/>
-->
<!--
default:1
range:1-100
description:concurrency of the job
mandatory:false
name:concurrency
-->
<param key="concurrency" value="${reader_concurrency}"/>
</reader>
<writer>
<plugin>mysqlwriter</plugin>
<!--
description:Mysql database ip address
mandatory:true
name:ip
-->
<param key="ip" value="jl-master"/>
<!--
default:3306
description:Mysql database port
mandatory:true
name:port
-->
<param key="port" value="3306"/>
<!--
description:Mysql database name
mandatory:true
name:dbname
-->
<param key="dbname" value="newidigg_jilin"/>
<!--
description:Mysql database login username
mandatory:true
name:username
-->
<param key="username" value="hadoop"/>
<!--
description:Mysql database login password
mandatory:true
name:password
-->
<param key="password" value="jpkjcluster"/>
<!--
default:
range:
description:table to be dumped data into
mandatory:true
name:table
-->
<param key="table" value="${mysql_table}"/>
<!--
range:
description:order of columns
mandatory:false
name:colorder
<param key="colorder" value="?"/>
-->
<!--
default:UTF-8
range:UTF-8|GBK|GB2312
description:
mandatory:false
name:encoding
-->
<param key="encoding" value="UTF-8"/>
<!--
description:execute sql before dumping data
mandatory:false
name:pre
<param key="pre" value="${preSql}"/>
-->
<!--
description:execute sql after dumping data
mandatory:false
name:post
<param key="post" value="${postSql}"/>
-->
<!--
default:0
range:[0-65535]
description:error limit
mandatory:false
name:limit
-->
<param key="limit" value="0"/>
<!--
mandatory:false
name:set
<param key="set" value="?"/>
-->
<!--
default:false
range:[true/false]
mandatory:false
name:replace
-->
<param key="replace" value="false"/>
<!--
range:params1|params2|...
description:mysql driver params
mandatory:false
name:mysql.params
<param key="mysql.params" value="?"/>
-->
<!--
default:1
range:1-100
description:concurrency of the job
mandatory:false
-->
<param key="concurrency" value="${writer_concurrency}"/>
</writer>
</job>
</jobs>#!/bin/bash
#author:曾昭正
#create time:2014-08-14
workspace=`dirname $0`
dataxHome='/data/hadoop/datax'
export_day=$1
reader_concurrency=1
writer_concurrency=1
mysqlUser='hadoop'
mysqlPassword='jpkjcluster'
mysqlServerHost='jl-master'
currentDatabase='newidigg_jilin'
preSql=''
postSql=''
importTable=('tb_userview_domain_noMdn' 'tb_fact_app_v2' 'tb_fact_domain' 'tb_fact_tag' 'tb_fact_top5_www' 'tb_fact_upwww_time' \
'tb_fact_search' 'tb_userview_domain' 'tb_userview_kpi_order' 'tb_userview_search' 'tb_userview_time' 'tb_userview_tag');
#function which is used to DDL or DML msyql
function mysqlController(){
#这里注意一下:这里的$1不同于整个脚本的参数$1,这里是指函数的第一个参数
local sqlString=$*
echo `date +%Y-%m-%d" "%H:%M:%S` "执行:${sqlString}"
mysql -u ${mysqlUser} --password=${mysqlPassword} -h ${mysqlServerHost} -e "
use ${currentDatabase};
${sqlString};
"
}
#通用表导入模块
function commonImport(){
local current_table=$1
#create temporary table before importing data into mysql.
echo `date +%Y-%m-%d" "%H:%M:%S` "......进入处理${current_table}表入mysql库环节......"
echo `date +%Y-%m-%d" "%H:%M:%S` "入库前创建临时表"
preSql="drop table if exists ${current_table}_${export_day};create table ${current_table}_${export_day} like ${current_table}"
mysqlController ${preSql}
#import data from hdfs into msyql.
echo `date +%Y-%m-%d" "%H:%M:%S` "将hdfs的${current_table}表导入mysql...."
#调整mysql的导入线程数
writer_concurrency=2
#调用Datax将hdfs文件导入mysql
python ${dataxHome}/bin/datax.py ${dataxHome}/jobs/hdfsreader_to_mysqlwriter_1407525566122.xml -p"-Dhdfs_table=${current_table} -Dexport_day=${export_day} -Dreader_concurrency=${reader_concurrency} -Dwriter_concurrency=${writer_concurrency} -Dmysql_table=${current_table}_${export_day}"
#Updata Data Relationship after importing data into mysql.
if [ ${current_table} == "tb_userview_search" ]
then
postSql="drop table if exists ${current_table}; \
rename table ${current_table}_${export_day} to ${current_table}; \
CREATE INDEX mdn_index ON ${current_table}(mdn);
"
else
postSql="drop table if exists ${current_table}; \
rename table ${current_table}_${export_day} to ${current_table}; \
Alter table ${current_table} add primary key(mdn);
"
fi
mysqlController ${postSql}
echo `date +%Y-%m-%d" "%H:%M:%S` "......完成处理${current_table}表入mysql操作......"
}
for tableItem in ${importTable[*]}
do
if [ ${tableItem} == "tb_userview_domain" -o ${tableItem} == "tb_userview_kpi_order" -o ${tableItem} == "tb_userview_search" -o ${tableItem} == "tb_userview_time" -o ${tableItem} == "tb_userview_tag" ]
then
commonImport ${tableItem}
else
#delete dirty data
preSql="delete from ${tableItem} where day_id=${export_day};"
mysqlController ${preSql}
#调用Datax将hdfs文件导入mysql
python ${dataxHome}/bin/datax.py ${dataxHome}/jobs/hdfsreader_to_mysqlwriter_1407525566122.xml -p"-Dhdfs_table=${tableItem} -Dexport_day=${export_day} -Dreader_concurrency=${reader_concurrency} -Dwriter_concurrency=${writer_concurrency} -Dmysql_table=${tableItem}"
# >> ${workspace}/../logs/exportData.log
fi
done简单说说我这个shell脚本的用途,主要是对datax中的job配置文件的动态参数进行控制。另外,根据公司业务的不同需求,这十几个需要导入mysql的表其中有些表在导入之前和导入之后需要做不同的完善工作,这个通过这shell来控制。对于这个Shell脚本我是花了点时间进行重构的,功能点还是比较清晰、简洁的。步骤4: 执行脚本:nohup ./export_hdfs2mysql.sh 20140815 >> ./../idigg_task/logs/export.log & 大功告成。我在从html页面生成PDF时遇到问题。我正在使用PDFkit。在安装它的过程中,我注意到我需要wkhtmltopdf。所以我也安装了它。我做了PDFkit的文档所说的一切......现在我在尝试加载PDF时遇到了这个错误。这里是错误:commandfailed:"/usr/local/bin/wkhtmltopdf""--margin-right""0.75in""--page-size""Letter""--margin-top""0.75in""--margin-bottom""0.75in""--encoding""UTF-8""--margin-left""0.75in""-
给定这段代码defcreate@upgrades=User.update_all(["role=?","upgraded"],:id=>params[:upgrade])redirect_toadmin_upgrades_path,:notice=>"Successfullyupgradeduser."end我如何在该操作中实际验证它们是否已保存或未重定向到适当的页面和消息? 最佳答案 在Rails3中,update_all不返回任何有意义的信息,除了已更新的记录数(这可能取决于您的DBMS是否返回该信息)。http://ar.ru
我在我的项目目录中完成了compasscreate.和compassinitrails。几个问题:我已将我的.sass文件放在public/stylesheets中。这是放置它们的正确位置吗?当我运行compasswatch时,它不会自动编译这些.sass文件。我必须手动指定文件:compasswatchpublic/stylesheets/myfile.sass等。如何让它自动运行?文件ie.css、print.css和screen.css已放在stylesheets/compiled。如何在编译后不让它们重新出现的情况下删除它们?我自己编译的.sass文件编译成compiled/t
对于具有离线功能的智能手机应用程序,我正在为Xml文件创建单向文本同步。我希望我的服务器将增量/差异(例如GNU差异补丁)发送到目标设备。这是计划:Time=0Server:hasversion_1ofXmlfile(~800kiB)Client:hasversion_1ofXmlfile(~800kiB)Time=1Server:hasversion_1andversion_2ofXmlfile(each~800kiB)computesdeltaoftheseversions(=patch)(~10kiB)sendspatchtoClient(~10kiBtransferred)Cl
我构建了两个需要相互通信和发送文件的Rails应用程序。例如,一个Rails应用程序会发送请求以查看其他应用程序数据库中的表。然后另一个应用程序将呈现该表的json并将其发回。我还希望一个应用程序将存储在其公共(public)目录中的文本文件发送到另一个应用程序的公共(public)目录。我从来没有做过这样的事情,所以我什至不知道从哪里开始。任何帮助,将不胜感激。谢谢! 最佳答案 无论Rails是什么,几乎所有Web应用程序都有您的要求,大多数现代Web应用程序都需要相互通信。但是有一个小小的理解需要你坚持下去,网站不应直接访问彼此
我尝试运行2.x应用程序。我使用rvm并为此应用程序设置其他版本的ruby:$rvmuseree-1.8.7-head我尝试运行服务器,然后出现很多错误:$script/serverNOTE:Gem.source_indexisdeprecated,useSpecification.Itwillberemovedonorafter2011-11-01.Gem.source_indexcalledfrom/Users/serg/rails_projects_terminal/work_proj/spohelp/config/../vendor/rails/railties/lib/r
刚入门rails,开始慢慢理解。有人可以解释或给我一些关于在application_controller中编码的好处或时间和原因的想法吗?有哪些用例。您如何为Rails应用程序使用应用程序Controller?我不想在那里放太多代码,因为据我了解,每个请求都会调用此Controller。这是真的? 最佳答案 ApplicationController实际上是您应用程序中的每个其他Controller都将从中继承的类(尽管这不是强制性的)。我同意不要用太多代码弄乱它并保持干净整洁的态度,尽管在某些情况下ApplicationContr
我花了三天的时间用头撞墙,试图弄清楚为什么简单的“rake”不能通过我的规范文件。如果您遇到这种情况:任何文件夹路径中都不要有空格!。严重地。事实上,从现在开始,您命名的任何内容都没有空格。这是我的控制台输出:(在/Users/*****/Desktop/LearningRuby/learn_ruby)$rake/Users/*******/Desktop/LearningRuby/learn_ruby/00_hello/hello_spec.rb:116:in`require':cannotloadsuchfile--hello(LoadError) 最佳
Sinatra新手;我正在运行一些rspec测试,但在日志中收到了一堆不需要的噪音。如何消除日志中过多的噪音?我仔细检查了环境是否设置为:test,这意味着记录器级别应设置为WARN而不是DEBUG。spec_helper:require"./app"require"sinatra"require"rspec"require"rack/test"require"database_cleaner"require"factory_girl"set:environment,:testFactoryGirl.definition_file_paths=%w{./factories./test/
关闭。这个问题需要detailsorclarity.它目前不接受答案。想改进这个问题吗?通过editingthispost添加细节并澄清问题.关闭8年前。Improvethisquestion在首页我有:汽车:VolvoSaabMercedesAudistatic_pages_spec.rb中的测试代码:it"shouldhavetherightselect"dovisithome_pathit{shouldhave_select('cars',:options=>['volvo','saab','mercedes','audi'])}end响应是rspec./spec/request