传统大数据平台的组织架构是针对离线数据处理需求设计的,常用的数据导入方式为采用sqoop定时作业批量导入。随着数据分析对实时性要求不断提高,按小时、甚至分钟级的数据同步越来越普遍。由此展开了基于spark/flink流处理机制的(准)实时同步系统的开发。
然而实时同步从一开始就面临如下几个挑战:
Hudi就是针对以上问题的解决方案之一。使用Hudi自带的DeltaStreamer工具写数据到Hudi,开启–enable-hive-sync 即可同步数据到hive表。
HoodieDeltaStreamer实用工具 (hudi-utilities-bundle中的一部分) 提供了从DFS或Kafka等不同来源进行摄取的方式,并具有以下功能。

生产库MySQL原始数据:

对接步骤具体参考:https://fusioninsight.github.io/ecosystem/zh-hans/Data_Integration/DEBEZIUM/
完成对接后,针对MySQL生产库分别做增、改、删除操作对应的kafka消息
增加操作: insert into hudi.hudisource3 values (11,“蒋语堂”,“38”,“女”,“图”,“播放器”,“28732”);
对应kafka消息体:

更改操作: UPDATE hudi.hudisource3 SET uname=‘Anne Marie333’ WHERE uid=11;
对应kafka消息体:

删除操作: delete from hudi.hudisource3 where uid=11;
对应kafka消息体:

华为云MRS Hudi样例工程获取
根据实际MRS版本登录github获取样例代码: https://github.com/huaweicloud/huaweicloud-mrs-example/tree/mrs-3.1.0
打开工程SparkOnHudiJavaExample

样例代码修改及介绍

1.debeziumJsonParser
说明:对debezium的消息体进行解析,获取到op字段。
源码如下:
package com.huawei.bigdata.hudi.examples;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
public class debeziumJsonParser {
public static String getOP(String message){
JSONObject json_obj = JSON.parseObject(message);
String op = json_obj.getJSONObject("payload").get("op").toString();
return op;
}
}
2.MyJsonKafkaSource
说明:DeltaStreamer默认使用org.apache.hudi.utilities.sources.JsonKafkaSource消费kafka指定topic的数据,如果消费阶段涉及数据的解析操作,则需要重写MyJsonKafkaSource进行处理。
以下是源码,增加注释
package com.huawei.bigdata.hudi.examples;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.parser.Feature;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.InputBatch;
import org.apache.hudi.utilities.sources.JsonSource;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.CheckpointUtils;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.apache.spark.streaming.kafka010.OffsetRange;
import java.util.Map;
/**
* Read json kafka data.
*/
public class MyJsonKafkaSource extends JsonSource {
private static final Logger LOG = LogManager.getLogger(MyJsonKafkaSource.class);
private final KafkaOffsetGen offsetGen;
private final HoodieDeltaStreamerMetrics metrics;
public MyJsonKafkaSource(TypedProperties properties, JavaSparkContext sparkContext, SparkSession sparkSession,
SchemaProvider schemaProvider) {
super(properties, sparkContext, sparkSession, schemaProvider);
HoodieWriteConfig.Builder builder = HoodieWriteConfig.newBuilder();
this.metrics = new HoodieDeltaStreamerMetrics(builder.withProperties(properties).build());
properties.put("key.deserializer", StringDeserializer.class);
properties.put("value.deserializer", StringDeserializer.class);
offsetGen = new KafkaOffsetGen(properties);
}
@Override
protected InputBatch<JavaRDD<String>> fetchNewData(Option<String> lastCheckpointStr, long sourceLimit) {
OffsetRange[] offsetRanges = offsetGen.getNextOffsetRanges(lastCheckpointStr, sourceLimit, metrics);
long totalNewMsgs = CheckpointUtils.totalNewMessages(offsetRanges);
LOG.info("About to read " + totalNewMsgs + " from Kafka for topic :" + offsetGen.getTopicName());
if (totalNewMsgs <= 0) {
return new InputBatch<>(Option.empty(), CheckpointUtils.offsetsToStr(offsetRanges));
}
JavaRDD<String> newDataRDD = toRDD(offsetRanges);
return new InputBatch<>(Option.of(newDataRDD), CheckpointUtils.offsetsToStr(offsetRanges));
}
private JavaRDD<String> toRDD(OffsetRange[] offsetRanges) {
return KafkaUtils.createRDD(this.sparkContext, this.offsetGen.getKafkaParams(), offsetRanges, LocationStrategies.PreferConsistent()).filter((x)->{
//过滤空行和脏数据
String msg = (String)x.value();
if (msg == null) {
return false;
}
try{
String op = debeziumJsonParser.getOP(msg);
}catch (Exception e){
return false;
}
return true;
}).map((x) -> {
//将debezium接进来的数据解析写进map,在返回map的tostring, 这样结构改动最小
String msg = (String)x.value();
String op = debeziumJsonParser.getOP(msg);
JSONObject json_obj = JSON.parseObject(msg, Feature.OrderedField);
Boolean is_delete = false;
String out_str = "";
Object out_obj = new Object();
if(op.equals("c")){
out_obj = json_obj.getJSONObject("payload").get("after");
}
else if(op.equals("u")){
out_obj = json_obj.getJSONObject("payload").get("after");
}
else {
is_delete = true;
out_obj = json_obj.getJSONObject("payload").get("before");
}
Map out_map = (Map)out_obj;
out_map.put("_hoodie_is_deleted",is_delete);
out_map.put("op",op);
return out_map.toString();
});
}
}
3.TransformerExample
说明: 入湖hudi表或者hive表时候需要指定的字段
以下是源码,增加注释
package com.huawei.bigdata.hudi.examples;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.utilities.transform.Transformer;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
/**
* 功能描述
* 对获取的数据进行format
*/
public class TransformerExample implements Transformer, Serializable {
/**
* format data
*
* @param JavaSparkContext jsc
* @param SparkSession sparkSession
* @param Dataset<Row> rowDataset
* @param TypedProperties properties
* @return Dataset<Row>
*/
@Override
public Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession, Dataset<Row> rowDataset,
TypedProperties properties) {
JavaRDD<Row> rowJavaRdd = rowDataset.toJavaRDD();
List<Row> rowList = new ArrayList<>();
for (Row row : rowJavaRdd.collect()) {
Row one_row = buildRow(row);
rowList.add(one_row);
}
JavaRDD<Row> stringJavaRdd = jsc.parallelize(rowList);
List<StructField> fields = new ArrayList<>();
builFields(fields);
StructType schema = DataTypes.createStructType(fields);
Dataset<Row> dataFrame = sparkSession.createDataFrame(stringJavaRdd, schema);
return dataFrame;
}
private void builFields(List<StructField> fields) {
fields.add(DataTypes.createStructField("uid", DataTypes.IntegerType, true));
fields.add(DataTypes.createStructField("uname", DataTypes.StringType, true));
fields.add(DataTypes.createStructField("age", DataTypes.StringType, true));
fields.add(DataTypes.createStructField("sex", DataTypes.StringType, true));
fields.add(DataTypes.createStructField("mostlike", DataTypes.StringType, true));
fields.add(DataTypes.createStructField("lastview", DataTypes.StringType, true));
fields.add(DataTypes.createStructField("totalcost", DataTypes.StringType, true));
fields.add(DataTypes.createStructField("_hoodie_is_deleted", DataTypes.BooleanType, true));
fields.add(DataTypes.createStructField("op", DataTypes.StringType, true));
}
private Row buildRow(Row row) {
Integer uid = row.getInt(0);
String uname = row.getString(1);
String age = row.getString(2);
String sex = row.getString(3);
String mostlike = row.getString(4);
String lastview = row.getString(5);
String totalcost = row.getString(6);
Boolean _hoodie_is_deleted = row.getBoolean(7);
String op = row.getString(8);
Row returnRow = RowFactory.create(uid, uname, age, sex, mostlike, lastview, totalcost, _hoodie_is_deleted, op);
return returnRow;
}
}
4.DataSchemaProviderExample
说明: 分别指定MyJsonKafkaSource返回的数据格式为source schema,TransformerExample写入的数据格式为target schema
以下是源码
package com.huawei.bigdata.hudi.examples;
import org.apache.avro.Schema;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.spark.api.java.JavaSparkContext;
/**
* 功能描述
* 提供sorce和target的schema
*/
public class DataSchemaProviderExample extends SchemaProvider {
public DataSchemaProviderExample(TypedProperties props, JavaSparkContext jssc) {
super(props, jssc);
}
/**
* source schema
*
* @return Schema
*/
@Override
public Schema getSourceSchema() {
Schema avroSchema = new Schema.Parser().parse(
"{\"type\":\"record\",\"name\":\"hoodie_source\",\"fields\":[{\"name\":\"uid\",\"type\":\"int\"},{\"name\":\"uname\",\"type\":\"string\"},{\"name\":\"age\",\"type\":\"string\"},{\"name\":\"sex\",\"type\":\"string\"},{\"name\":\"mostlike\",\"type\":\"string\"},{\"name\":\"lastview\",\"type\":\"string\"},{\"name\":\"totalcost\",\"type\":\"string\"},{\"name\":\"_hoodie_is_deleted\",\"type\":\"boolean\"},{\"name\":\"op\",\"type\":\"string\"}]}");
return avroSchema;
}
/**
* target schema
*
* @return Schema
*/
@Override
public Schema getTargetSchema() {
Schema avroSchema = new Schema.Parser().parse(
"{\"type\":\"record\",\"name\":\"mytest_record\",\"namespace\":\"hoodie.mytest\",\"fields\":[{\"name\":\"uid\",\"type\":\"int\"},{\"name\":\"uname\",\"type\":\"string\"},{\"name\":\"age\",\"type\":\"string\"},{\"name\":\"sex\",\"type\":\"string\"},{\"name\":\"mostlike\",\"type\":\"string\"},{\"name\":\"lastview\",\"type\":\"string\"},{\"name\":\"totalcost\",\"type\":\"string\"},{\"name\":\"_hoodie_is_deleted\",\"type\":\"boolean\"},{\"name\":\"op\",\"type\":\"string\"}]}");
return avroSchema;
}
}
将工程打包(hudi-security-examples-0.7.0.jar)以及json解析包(fastjson-1.2.4.jar)上传至MRS客户端
DeltaStreamer启动命令
登录客户端执行一下命令获取环境变量以及认证
source /opt/hadoopclient/bigdata_env
kinit developuser
source /opt/hadoopclient/Hudi/component_env
DeltaStreamer启动命令如下:
spark-submit --master yarn-client \
--jars /opt/hudi-demo2/fastjson-1.2.4.jar,/opt/hudi-demo2/hudi-security-examples-0.7.0.jar \
--driver-class-path /opt/hadoopclient/Hudi/hudi/conf:/opt/hadoopclient/Hudi/hudi/lib/*:/opt/hadoopclient/Spark2x/spark/jars/*:/opt/hudi-demo2/hudi-security-examples-0.7.0.jar \
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
spark-internal --props file:///opt/hudi-demo2/kafka-source.properties \
--target-base-path /tmp/huditest/delta_demo2 \
--table-type COPY_ON_WRITE \
--target-table delta_demo2 \
--source-ordering-field uid \
--source-class com.huawei.bigdata.hudi.examples.MyJsonKafkaSource \
--schemaprovider-class com.huawei.bigdata.hudi.examples.DataSchemaProviderExample \
--transformer-class com.huawei.bigdata.hudi.examples.TransformerExample \
--enable-hive-sync --continuous
kafka.properties配置
// hudi配置
hoodie.datasource.write.recordkey.field=uid
hoodie.datasource.write.partitionpath.field=
hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.NonpartitionedKeyGenerator
hoodie.datasource.write.hive_style_partitioning=true
hoodie.delete.shuffle.parallelism=10
hoodie.upsert.shuffle.parallelism=10
hoodie.bulkinsert.shuffle.parallelism=10
hoodie.insert.shuffle.parallelism=10
hoodie.finalize.write.parallelism=10
hoodie.cleaner.parallelism=10
hoodie.datasource.write.precombine.field=uid
hoodie.base.path = /tmp/huditest/delta_demo2
hoodie.timeline.layout.version = 1
// hive config
hoodie.datasource.hive_sync.table=delta_demo2
hoodie.datasource.hive_sync.partition_fields=
hoodie.datasource.hive_sync.assume_date_partitioning=false
hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.NonPartitionedExtractor
hoodie.datasource.hive_sync.use_jdbc=false
// Kafka Source topic
hoodie.deltastreamer.source.kafka.topic=hudisource
// checkpoint
hoodie.deltastreamer.checkpoint.provider.path=hdfs://hacluster/tmp/delta_demo2/checkpoint/
// Kafka props
bootstrap.servers=172.16.9.117:21005
auto.offset.reset=earliest
group.id=a5
offset.rang.limit=10000
注意:kafka服务端配置 allow.everyone.if.no.acl.found 为true
使用Spark查询
spark-shell --master yarn
val roViewDF = spark.read.format("org.apache.hudi").load("/tmp/huditest/delta_demo2/*")
roViewDF.createOrReplaceTempView("hudi_ro_table")
spark.sql("select * from hudi_ro_table").show()
Mysql增加操作对应spark中hudi表查询结果:

Mysql更新操作对应spark中hudi表查询结果:

删除操作:

使用Hive查询
beeline
select * from delta_demo2;
Mysql增加操作对应hive表中查询结果:

Mysql更新操作对应hive表中查询结果:

Mysql删除操作对应hive表中查询结果:

本文由华为云发布。
很好奇,就使用rubyonrails自动化单元测试而言,你们正在做什么?您是否创建了一个脚本来在cron中运行rake作业并将结果邮寄给您?git中的预提交Hook?只是手动调用?我完全理解测试,但想知道在错误发生之前捕获错误的最佳实践是什么。让我们理所当然地认为测试本身是完美无缺的,并且可以正常工作。下一步是什么以确保他们在正确的时间将可能有害的结果传达给您? 最佳答案 不确定您到底想听什么,但是有几个级别的自动代码库控制:在处理某项功能时,您可以使用类似autotest的内容获得关于哪些有效,哪些无效的即时反馈。要确保您的提
无论您是想搭建桌面端、WEB端或者移动端APP应用,HOOPSPlatform组件都可以为您提供弹性的3D集成架构,同时,由工业领域3D技术专家组成的HOOPS技术团队也能为您提供技术支持服务。如果您的客户期望有一种在多个平台(桌面/WEB/APP,而且某些客户端是“瘦”客户端)快速、方便地将数据接入到3D应用系统的解决方案,并且当访问数据时,在各个平台上的性能和用户体验保持一致,HOOPSPlatform将帮助您完成。利用HOOPSPlatform,您可以开发在任何环境下的3D基础应用架构。HOOPSPlatform可以帮您打造3D创新型产品,HOOPSSDK包含的技术有:快速且准确的CAD
导读:随着叮咚买菜业务的发展,不同的业务场景对数据分析提出了不同的需求,他们希望引入一款实时OLAP数据库,构建一个灵活的多维实时查询和分析的平台,统一数据的接入和查询方案,解决各业务线对数据高效实时查询和精细化运营的需求。经过调研选型,最终引入ApacheDoris作为最终的OLAP分析引擎,Doris作为核心的OLAP引擎支持复杂地分析操作、提供多维的数据视图,在叮咚买菜数十个业务场景中广泛应用。作者|叮咚买菜资深数据工程师韩青叮咚买菜创立于2017年5月,是一家专注美好食物的创业公司。叮咚买菜专注吃的事业,为满足更多人“想吃什么”而努力,通过美好食材的供应、美好滋味的开发以及美食品牌的孵
华为OD机试题本篇题目:明明的随机数题目输入描述输出描述:示例1输入输出说明代码编写思路最近更新的博客华为od2023|什么是华为od,od薪资待遇,od机试题清单华为OD机试真题大全,用Python解华为机试题|机试宝典【华为OD机试】全流程解析+经验分享,题型分享,防作弊指南华为o
system-view进入系统视图quit退到系统视图sysname交换机命名vlan20创建vlan(进入vlan20)displayvlan显示vlanundovlan20删除vlan20displayvlan20显示vlan里的端口20Interfacee1/0/24进入端口24portlink-typeaccessvlan20把当前端口放入vlan20undoporte1/0/10删除当前VLAN端口10displaycurrent-configuration显示当前配置02配置交换机支持TELNETinterfacevlan1进入VLAN1ipaddress192.168.3.100
C#实现简易绘图工具一.引言实验目的:通过制作窗体应用程序(C#画图软件),熟悉基本的窗体设计过程以及控件设计,事件处理等,熟悉使用C#的winform窗体进行绘图的基本步骤,对于面向对象编程有更加深刻的体会.Tutorial任务设计一个具有基本功能的画图软件**·包括简单的新建文件,保存,重新绘图等功能**·实现一些基本图形的绘制,包括铅笔和基本形状等,学习橡皮工具的创建**·设计一个合理舒适的UI界面**注明:你可能需要先了解一些关于winform窗体应用程序绘图的基本知识,以及关于GDI+类和结构的知识二.实验环境Windows系统下的visualstudio2017C#窗体应用程序三.
1.postman介绍Postman一款非常流行的API调试工具。其实,开发人员用的更多。因为测试人员做接口测试会有更多选择,例如Jmeter、soapUI等。不过,对于开发过程中去调试接口,Postman确实足够的简单方便,而且功能强大。2.下载安装官网地址:https://www.postman.com/下载完成后双击安装吧,安装过程极其简单,无需任何操作3.使用教程这里以百度为例,工具使用简单,填写URL地址即可发送请求,在下方查看响应结果和响应状态码常用方法都有支持请求方法:getpostputdeleteGet、Post、Put与Delete的作用get:请求方法一般是用于数据查询,
@作者:SYFStrive @博客首页:HomePage📜:微信小程序📌:个人社区(欢迎大佬们加入)👉:社区链接🔗📌:觉得文章不错可以点点关注👉:专栏连接🔗💃:感谢支持,学累了可以先看小段由小胖给大家带来的街舞👉微信小程序(🔥)目录自定义组件-behaviors 1、什么是behaviors 2、behaviors的工作方式 3、创建behavior 4、导入并使用behavior 5、behavior中所有可用的节点 6、同名字段的覆盖和组合规则总结最后自定义组件-behaviors 1、什么是behaviorsbehaviors是小程序中,用于实现
我认为我的问题最好用一个例子来描述。假设我有一个名为“Thing”的简单模型,它有一些简单数据类型的属性。像...Thing-foo:string-goo:string-bar:int这并不难。数据库表将包含具有这三个属性的三列,我可以使用@thing.foo或@thing.bar之类的东西访问它们。但我要解决的问题是当“foo”或“goo”不再包含在简单数据类型中时会发生什么?假设foo和goo代表相同类型的对象。也就是说,它们都是“Whazit”的实例,只是数据不同。所以现在事情可能看起来像这样......Thing-bar:int但是现在有一个新的模型叫做“Whazit”,看起来
我最喜欢的Google文档功能之一是它会在我工作时不断自动保存我的文档版本。这意味着即使我在进行关键更改之前忘记在某个点进行保存,也很有可能会自动创建一个保存点。至少,我可以将文档恢复到错误更改之前的状态,并从该点继续工作。对于在MacOS(或UNIX)上运行的Ruby编码器,是否有具有等效功能的工具?例如,一个工具会每隔几分钟自动将Gitcheckin我的本地存储库以获取我正在处理的文件。也许我有点偏执,但这点小保险可以让我在日常工作中安心。 最佳答案 虚拟机有些人可能讨厌我对此的回应,但我在编码时经常使用VIM,它具有自动保存功