文章目录
CDC:全称是 Change Data Capture,即数据变更捕获技术,具体的含义是 通过识别和捕获对数据库中的数据所做的更改(包括数据或数据表的插入、更新、删除;数据库结构的变更调整等),然后将这些更改按发生的顺序完整记录下来,并实时通过中间技术桥梁(消息中间件、TCP等等)将变更顺序消息传送到下游流程或系统的过程。
CDC Connectors for Apache Flink ®是一组用于Apache Flink ®的源连接器,使用变更数据捕获 (CDC) 从不同数据库获取变更。用于 Apache Flink ®的 CDC 连接器将 Debezium 集成为捕获数据更改的引擎。所以它可以充分发挥 Debezium 的能力。
白话的意思是,Flink-CDC 一个成型的cdc技术实现(Debezium)的包装,我前面也使用过Debezium,并编写了一个简略的博客,感兴趣的可以戳下方连接去看一下
springboot+debezium捕获数据库变更(mysql、sql-server、mongodb、oracle…)
支持读取数据库快照,即使发生故障也能继续读取binlog,一次处理。
DataStream API 的 CDC 连接器,用户可以在单个作业中使用多个数据库和表的更改,而无需部署 Debezium 和 Kafka。
Table/SQL API 的 CDC 连接器,用户可以使用 SQL DDL 创建 CDC 源来监控单个表的更改。
下表显示了 Flink® CDC 连接器和 Flink® 之间的版本映射:
| Flink ® CDC 版本 | Flink®版本_ |
|---|---|
| 1.0.0 | 1.11.* |
| 1.1.0 | 1.11.* |
| 1.2.0 | 1.12.* |
| 1.3.0 | 1.12.* |
| 1.4.0 | 1.13.* |
| 2.0.* | 1.13.* |
| 2.1.* | 1.13.* |
| 2.2.* | 1.13.* , 1.14.* |
按常理来说,一个正常的flink-job 最终我们并不会集成到springboot项目中,我们会直接编写一个maven项目,在发布时使用flink程序来启动任务
比如官网示例:

本文即要使用flink-cdc进行数据变更捕获 (可以视作为一个flink-job),但又要契合我们的springboot项目,使用spring的特性,因此,我们需要转换一下思路,转换成什么样子呢?就是不要将这个flink-cdc作为一个job 使用flink程序进行发布提交,我们就当它在我们开发时一样,作为一个本地项目,main方法启动
flink客户端版本使用 1.13.6 cdc 版本使用 2.0.0
<properties>
<encoding>UTF-8</encoding>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<java.version>1.8</java.version>
<scala.version>2.12</scala.version>
<flink.version>1.13.6</flink.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.83</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<!--mysql -cdc-->
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.18</version>
</dependency>
</dependencies>
无法简单的使用main方法来启动cdc 作业,因为如果这样的话,我们就无法与spring完美的契合
因此我们可以利用springboot的特性, 实现 ApplicationRunner 将flink-cdc 作为一个项目启动时需要运行的分支子任务即可
package com.leilei.mysql;
import com.ververica.cdc.connectors.mysql.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
/**
* @author lei
* @create 2022-08-25 13:42
* @desc mysql变更监听
**/
@Component
public class MysqlEventListener implements ApplicationRunner {
private final DataChangeSink dataChangeSink;
public MysqlEventListener(DataChangeSink dataChangeSink) {
this.dataChangeSink = dataChangeSink;
}
@Override
public void run(ApplicationArguments args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DebeziumSourceFunction<DataChangeInfo> dataChangeInfoMySqlSource = buildDataChangeSource();
DataStream<DataChangeInfo> streamSource = env
.addSource(dataChangeInfoMySqlSource, "mysql-source")
.setParallelism(1);
streamSource.addSink(dataChangeSink);
env.execute("mysql-stream-cdc");
}
/**
* 构造变更数据源
*
* @param
* @return DebeziumSourceFunction<DataChangeInfo>
* @author lei
* @date 2022-08-25 15:29:38
*/
private DebeziumSourceFunction<DataChangeInfo> buildDataChangeSource() {
return MySqlSource.<DataChangeInfo>builder()
.hostname("10.50.40.145")
.port(3306)
.databaseList("paas_common_db")
.tableList("paas_common_db.base_business_driver_score_*")
.username("root")
.password("cdwk-3g-145")
/**initial初始化快照,即全量导入后增量导入(检测更新数据写入)
* latest:只进行增量导入(不读取历史变化)
* timestamp:指定时间戳进行数据导入(大于等于指定时间错读取数据)
*/
.startupOptions(StartupOptions.latest())
.deserializer(new MysqlDeserialization())
.serverTimeZone("GMT+8")
.build();
}
}
我这里解析为一个数据变更对象
package com.leilei.mysql;
import com.alibaba.fastjson.JSON;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import com.alibaba.fastjson.JSONObject;
import java.util.List;
import java.util.Optional;
/**
* @author lei
* @create 2022-08-25 13:43
* @desc mysql消息读取自定义序列化
**/
public class MysqlDeserialization implements DebeziumDeserializationSchema<DataChangeInfo> {
public static final String TS_MS = "ts_ms";
public static final String BIN_FILE = "file";
public static final String POS = "pos";
public static final String CREATE = "CREATE";
public static final String BEFORE = "before";
public static final String AFTER = "after";
public static final String SOURCE = "source";
public static final String UPDATE = "UPDATE";
/**
*
* 反序列化数据,转为变更JSON对象
* @param sourceRecord
* @param collector
* @return void
* @author lei
* @date 2022-08-25 14:44:31
*/
@Override
public void deserialize(SourceRecord sourceRecord, Collector<DataChangeInfo> collector) {
String topic = sourceRecord.topic();
String[] fields = topic.split("\\.");
String database = fields[1];
String tableName = fields[2];
Struct struct = (Struct) sourceRecord.value();
final Struct source = struct.getStruct(SOURCE);
DataChangeInfo dataChangeInfo = new DataChangeInfo();
dataChangeInfo.setBeforeData( getJsonObject(struct, BEFORE).toJSONString());
dataChangeInfo.setAfterData(getJsonObject(struct, AFTER).toJSONString());
//5.获取操作类型 CREATE UPDATE DELETE
Envelope.Operation operation = Envelope.operationFor(sourceRecord);
String type = operation.toString().toUpperCase();
int eventType = type.equals(CREATE) ? 1 : UPDATE.equals(type) ? 2 : 3;
dataChangeInfo.setEventType(eventType);
dataChangeInfo.setFileName(Optional.ofNullable(source.get(BIN_FILE)).map(Object::toString).orElse(""));
dataChangeInfo.setFilePos(Optional.ofNullable(source.get(POS)).map(x->Integer.parseInt(x.toString())).orElse(0));
dataChangeInfo.setDatabase(database);
dataChangeInfo.setTableName(tableName);
dataChangeInfo.setChangeTime(Optional.ofNullable(struct.get(TS_MS)).map(x -> Long.parseLong(x.toString())).orElseGet(System::currentTimeMillis));
//7.输出数据
collector.collect(dataChangeInfo);
}
/**
*
* 从袁术数据获取出变更之前或之后的数据
* @param value
* @param fieldElement
* @return JSONObject
* @author lei
* @date 2022-08-25 14:48:13
*/
private JSONObject getJsonObject(Struct value, String fieldElement) {
Struct element = value.getStruct(fieldElement);
JSONObject jsonObject = new JSONObject();
if (element != null) {
Schema afterSchema = element.schema();
List<Field> fieldList = afterSchema.fields();
for (Field field : fieldList) {
Object afterValue = element.get(field);
jsonObject.put(field.name(), afterValue);
}
}
return jsonObject;
}
@Override
public TypeInformation<DataChangeInfo> getProducedType() {
return TypeInformation.of(DataChangeInfo.class);
}
}
import lombok.Data;
/**
* @author lei
* @create 2022-08-25 14:33
* @desc 数据变更对象
**/
@Data
public class DataChangeInfo {
/**
* 变更前数据
*/
private String beforeData;
/**
* 变更后数据
*/
private String afterData;
/**
* 变更类型 1新增 2修改 3删除
*/
private Integer eventType;
/**
* binlog文件名
*/
private String fileName;
/**
* binlog当前读取点位
*/
private Integer filePos;
/**
* 数据库名
*/
private String database;
/**
* 表名
*/
private String tableName;
/**
* 变更时间
*/
private Long changeTime;
}
package com.leilei.mysql;
import lombok.extern.log4j.Log4j2;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.springframework.stereotype.Component;
/**
* @author lei
* @create 2022-08-25 14:01
* @desc
**/
@Component
@Log4j2
public class DataChangeSink implements SinkFunction<DataChangeInfo> {
@Override
public void invoke(DataChangeInfo value, Context context) {
log.info("收到变更原始数据:{}", value);
// todo 数据处理;因为此sink也是交由了spring管理,您想进行任何操作都非常简单
}
}
当然,以上仅仅只是整合思路,如果你想使用flink-cdc 进行数据同步或日志记录等,结合您自身的需求进行调整接口,以上内容,大的架子是没问题的
如果遇到问题,可以先从官网QA寻找:https://github.com/ververica/flink-cdc-connectors/wiki/FAQ(ZH)
项目源码:springboot-flink-cdc
我在app/helpers/sessions_helper.rb中有一个帮助程序文件,其中包含一个方法my_preference,它返回当前登录用户的首选项。我想在集成测试中访问该方法。例如,这样我就可以在测试中使用getuser_path(my_preference)。在其他帖子中,我读到这可以通过在测试文件中包含requiresessions_helper来实现,但我仍然收到错误NameError:undefinedlocalvariableormethod'my_preference'.我做错了什么?require'test_helper'require'sessions_hel
我一直很高兴地使用DelayedJob习惯用法:foo.send_later(:bar)这会调用DelayedJob进程中对象foo的方法bar。我一直在使用DaemonSpawn在我的服务器上启动DelayedJob进程。但是...如果foo抛出异常,Hoptoad不会捕获它。这是任何这些包中的错误...还是我需要更改某些配置...或者我是否需要在DS或DJ中插入一些异常处理来调用Hoptoad通知程序?回应下面的第一条评论。classDelayedJobWorker 最佳答案 尝试monkeypatchingDelayed::W
前置步骤我们都操作完了,这篇开始介绍jenkins的集成。话不多说,看操作1、登录进入jenkins后会让你选择安装插件,选择第一个默认的就行。安装完成后设置账号密码,重新登录。2、配置JDK和Git都需要执行路径,所以需要先把执行路径找到,先进入服务器的docker容器,2.1JDK的路径root@69eef9ee86cf:/usr/bin#echo$JAVA_HOME/usr/local/openjdk-82.2Git的路径root@69eef9ee86cf:/#whichgit/usr/bin/git3、先配置JDK和Git。点击:ManageJenkins>>GlobalToolCon
三分钟集成Tap防沉迷SDK(Unity版)一、SDK介绍基于国家对上线所有游戏必须增加防沉迷功能的政策下,TapTap推出防沉迷SDK,供游戏开发者进行接入;允许未成年用户在周五、六、日以及法定节假日晚上8:00-9:00进行游戏,防沉谜时间段进入游戏会弹窗进行提示!开发环境要求:Unity2019.4或更高版本iOS10或更高版本Android5.0(APIlevel21)或更高版本🔗Unity集成Demo参考链接🔗UnityTapSDK功能体验APK下载链接二、集成前准备1.创建应用进入开发者后台,按照提示开始创建应用;2.开通服务在使用TDS实名认证和防沉迷服务之前,需要在上面创建的应
我被这个难住了。到目前为止教程中的一切都进行得很顺利,但是当我将这段代码添加到我的/spec/requests/users_spec.rb文件中时,事情开始变得糟糕:describe"success"doit"shouldmakeanewuser"dolambdadovisitsignup_pathfill_in"Name",:with=>"ExampleUser"fill_in"Email",:with=>"ryan@example.com"fill_in"Password",:with=>"foobar"fill_in"Confirmation",:with=>"foobar"cl
我需要一些指导来了解如何将Angular整合到rails中。选择Rails的原因:我喜欢他们偏执的做事方式。还有迁移,gem真的很酷。使用angular的原因:我正在研究和寻找最适合SPA的框架。Backbone似乎太抽象了。我不得不在Angular和Ember之间做出选择。我首先开始阅读Angular,它对我来说很有意义。所以我从来没有去读过关于ember的文章。使用Angular和Rails的原因:我研究并尝试使用小型框架,例如grape、slim(是的,我也使用php)。但我觉得需要坚持项目的长期范围。我个人喜欢用Rails的方式做事。这就是我需要帮助的地方,我在Rails4中有
有没有人有在Maven中运行用Ruby编写的单元测试的经验。任何输入,如要使用的库/maven插件,将不胜感激!我们已经在使用Maven+hudson+Junit。但是我们正在引入Ruby单元测试,找不到任何同样好的组合。 最佳答案 我建议让Maven使用ExecMavenPlugin启动rake测试(exec:exec目标)并使用ci_reportergem生成单元测试结果的XML文件,Hudson、Bamboo等可以读取该文件,以与JUnit测试相同的格式显示测试结果。如果您不需要使用mvntest运行Ruby测试,您也可以只使
目前我有一小套针对我的网络服务器运行的集成测试,它发出请求并断言一些关于响应应该是什么的假设。这些是用Ruby编写的,生成http请求。我一直在看Gatling作为压力测试工具,但我想知道它是否也可以用于集成测试。这样,所有端点请求都可以在压力测试和集成测试中重复使用。我可能在这里失去了一些东西,因为没有RSpec的BDD,但不必两次创建相同的测试。有没有人有这样使用gatling的经验? 最佳答案 您可以使用AssertionAPI并设置验收标准。但是,Gatling不是浏览器,不会运行/测试您的Javascript,因此这种方法
文章目录前言一、Elasticsearch版本介绍二、客户端种类三、客户端与版本兼容性四、引入Elasticsearch依赖包五、客户端配置六、Elasticsearch使用前言ElasticSearch是Elastic公司出品的一款功能强大的搜索引擎,被广泛的应用于各大IT公司,它的代码位于https://github.com/elastic/elasticsearch,目前是一个开源项目。ElasticSearch公司的另外两个开源产品Logstash、Kibana与ElasticSearch构成了著名的ELK技术栈。。他们三个共同形成了一个强大的生态圈。简单地说,Logstash负责数据
如果您希望在Spring中启用定时任务功能,则需要在主类上添加 @EnableScheduling 注解。这样Spring才会扫描 @Scheduled 注解并执行定时任务。在大多数情况下,只需要在主类上添加 @EnableScheduling 注解即可,不需要在Service层或其他类中再次添加。以下是一个示例,演示如何在SpringBoot中启用定时任务功能:@SpringBootApplication@EnableSchedulingpublicclassApplication{publicstaticvoidmain(String[]args){SpringApplication.ru