草庐IT

数据传输工具 —— Kafka Connect

小胡_鸭 2023-10-05 原文

1、什么是 kafka connect?

  Kafka Connect 是一种用于在 kafka 和其他系统之间可扩展、可靠的流式传输数据的工具。它使得能够快速定义将大量数据集合移入和移出 kafka 的连接器变得简单。

  Kafka Connect 可以获取整个数据库或从应用程序服务器收集指标到 kafka 主题,使数据可用于低延迟的流处理。

  导出作业可以将数据从 kafka topic 传输到二次存储和查询系统,或者传递到批处理系统以进行离线分析。


2、功能

  • kafka connector 通用框架,提供统一的集成 API
  • 同时支持分布式模式和单机模式
  • 自动化的 offset 管理,开发人员不必担心错误处理的影响
  • rest 接口,用来查看和管理 kafka connectors


3、概念

Connectors:通过管理任务来处理数据流的高级抽象
Tasks:数据写入 kafka 和从 kafka 读出的实现
Workers:运行 connectors 和 tasks 的进程
Converters:kafka connect 和其他存储系统直接发送和接收数据之间转换数据

  Connector 决定了数据要从哪里复制过来以及数据应该写到哪里去,一个 connector 实例是一个需要负责在 kafka 和其他系统之间复制数据的逻辑作业,connector plugin 是 jar 文件,实现了 kafka 定义的一些接口来完成特定的任务。

  Task 是 kafka connect 数据模型的主角,每一个 connector 都会协调一系列的 task 去执行任务,connector 可以把一项工作分割成许多的 task,然后再把 task 分发到各个 worker 中去执行(分布式模式下),task 不自己保存自己的状态信息,而是交给特定的 kafka 主题去保存(config.storage.topicstatus.storage.topic)。在分布式模式下有一个概念叫做任务再平衡(Task Rebalancing),当一个 connector 第一次提交到集群时,所有的 worker 都会做一个 task rebalancing 从而保证每一个 worker 都运行了差不多数量的工作,而不是所有的工作压力都集中在某个 worker 进程中,而当每个进程挂了之后也会执行 task rebalance。

  Connectors 和 Tasks 都是逻辑工作单位,必须安排在进程中执行,而在 kafka connect 中,这些进程就是 workers,分别有两种 worker:standalone、distributed。生产中 distributed worker 表现很棒,因为它提供了可扩展性以及自动容错的功能,可以用一个 group.id 来启动很多 worker 进程,在有效的 worker 进程中它们会自动地去协调执行 connector 和 task,如果新加或者挂了一个 worker,其他的 worker 会检测到然后再重新分配 connector 和 task。

  Converter 会把 bytes 数据转换为 kafka connect 内部的格式,也可以把 kafka connect 内部存储格式的数据变成 bytes,converter 对 connector 来说是解耦的,所以其他的 connector 都可以重用。例如使用了 avro converter,那么 jdbc connector 可以写 avro 格式的数据到 kafka,同时 hfds connector 也可以从 kafka 中读出 avro 格式的数据。




4、实战

  启动 confluent

cd /app/confluent/bin
./confluent local start

  使用 standalone 模式启动

# 启动 kafka connect
$CONFLUENT_HOME/bin/connect-standalone \
$CONFLUENT_HOME/etc/kafka/connect-standalone.properties \
connector1.properties [connector2.properties]

  在 $CONFLUENT_HOME/etc/kafka 下存在很多配置文件

  其中 connect-standalone.properties 是启动 connect 服务组件自身的配置,内容如下:

# kafka 服务
bootstrap.servers=localhost:9092

# 转换器
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

# 是否启用转换器
key.converter.schemas.enable=true
value.converter.schemas.enable=true

# 偏移量存储文件名
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000

# 插件路径
plugin.path=/usr/share/java,/app/confluent/share/confluent-hub-components

# 默认端口为8083,需要修改端口时启动以下配置
# rest.port=8084


(1)标准 connect

启动一个带 FileSource 的 Connect

  connect-file-source.properties 是一个 source connect 的模板配置,启用该配置就能够从指定文件中复制数据到 kafka 中,其默认的配置如下:

# connect 的名字
name=local-file-source
# 将文件读取到数据流中
connector.class=FileStreamSource
# 工作线程是 1 个
tasks.max=1
# 读取的文件名为 test.txt
file=test.txt
# 复制到的主题为 connect-test
topic=connect-test

  启动 connect

$CONFLUENT_HOME/bin/connect-standalone \
  $CONFLUENT_HOME/etc/kafka/connect-standalone.properties \
  $CONFLUENT_HOME/etc/kafka/connect-file-source.properties

  结果报错 Java 内存不足

  关闭虚拟机,加大内存,重启服务器和 confluent,再次启动 connect,报错 8083 端口已被绑定

  修改 connect-standalone.properties 配置中的端口为 8084 再启动,新的报错:不存在 source 配置文件中的指定的文件,在启动路径下创建文件,日志恢复正常

echo -e "foo\nbar\n" > $CONFLUENT_HOME/test.txt

  可以通过 kafka tools 看到新增了主题 connect-test,写入了3条数据

  往文件中写入数据,会报告又成功提交一次偏移量

# 写数据
/app/confluent# echo -e "foo1\nbar1\n" >> test.txt

# 日志
INFO WorkerSourceTask{id=local-file-source-0} Finished commitOffsets successfully in 1 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:515)
...

  然后可以看到主题中多了3条数据


启动带 FileSource 和 FileSink 的 Connect

  connect-file-sink.properties 是一个 source connect 的模板配置,启用该配置就能够从指定文件中复制数据到 kafka 中,其默认的配置如下:

# connect 的名字
name=local-file-sink
# 从数据流中读取数据到文件中
connector.class=FileStreamSink
# 工作线程是 1 个
tasks.max=1
# 写入的文件是 test.sink.txt
file=test.sink.txt
# 读取数据的主题是 connect-test
topics=connect-test

  启动 connect

$CONFLUENT_HOME/bin/connect-standalone \
  $CONFLUENT_HOME/etc/kafka/connect-standalone.properties \
  $CONFLUENT_HOME/etc/kafka/connect-file-source.properties \
  $CONFLUENT_HOME/etc/kafka/connect-file-sink.properties

  可以看到自动创建了 test.sink.txt 文件

  同时可以看到 consumer 中多了一个 connect-local-file-sink ,偏移量为6(即已将6条数据都 sink 到了文件中)



(2)REST API

  使用 Rest API 必须启动分布式模式,通过 Rest API 可以管理集群中的 connect 服务,默认端口是 8083。

GET /connectors - 返回所有正在运行的connector名。
POST /connectors - 新建一个connector;请求体必须是json格式并且需要包含name字段和config字段,name是connectors的名字,config是json格式,必须包含connector的配置信息。
GET /connectors/{name} - 获取指定connector的信息。
GET /connectors/{name}/config - 获取指定connector的配置信息。

  在分布式模式下,有两种方式来配置 connector,第一种是类似 standalone 模式一样,写好配置文件,然后在启动时指定

$CONFLUENT_HOME/bin/connect-distributed \
  $CONFLUENT_HOME/etc/kafka/connect-distributed.properties \
  $CONFLUENT_HOME/etc/kafka/connect-file-source.properties \
  $CONFLUENT_HOME/etc/kafka/connect-file-sink.properties

  另外一种方式更加灵活,就是直接通过 Rest API 来对 connector 配置进行增删查。

  查看 connectors

  添加 connectors

  查看某个 connector

  这里指定的文件是相对路径,所以要在 $CONFLUENT_HOME/bin 路径下创建一个 test-distributed.txt 文件

cd $CONFLUENT_HOME/bin
echo -e "foo\nbar\n" > test-distributed.txt

  可以看到出现了 connect-distributed 主题

  添加 sink

  从服务器可以看到产生了 sink 文件

  删除 connector

  再次往 test-distributed.txt 文件中追加数据,可以看到 connect-distributed 主题中的数据增加了,source connector 依然在工作,但是 sink connector 已经停止了,所以 test-distributed.sink.txt 文件中数据不再从主题中复制。

【注意】

  如果要在脚本中处理,发起HTTP请求,可以使用 curl 工具,将请求的配置在 json 文件中,如:

curl -d @$CONFLUENT_HOME/connect-file-sink.json \
  -H "Content-Type: application/json" \
  -X POST http://localhost:8083/connectors


创建带有 Convert 的 connector

{
    "name": "local-file-source",
    "config": {
        "connector.class": "FileStreamSource",
        "tasks.max": 1,
        "file": "test-transformation.txt",
        "topic": "connect-transformation",
        "transforms": "MakeMap,InsertSource",
        "transforms.MakeMap.type": "org.apache.kafka.connect.transforms.HoistField$Value",
        "transforms.MakeMap.field": "line",
        "transforms.InsertSource.type": "org.apache.kafka.connect.transforms.InsertField$Value",
        "transforms.InsertSource.static.field": "data_source",
        "transforms.InsertSource.static.value": "test-file-source"
    }
}

  添加 connector(由于跟上述实验 name 一致,所以需要先删除或者换个 name)

  创建 test-transformation.txt 文件,可以看到自动创建了 connect-transformation 主题

  添加 sink

{
    "name": "local-file-sink",
    "config": {
        "connector.class": "FileStreamSink",
        "tasks.max": 1,
        "file": "test-transformation.sink.txt",
        "topics": "connect-transformation"
    }
}

  可以看到 sink 自动生成了 test-transformation.sink.txt 文件,并且内容不是 source 过来的原始数据,而是经过 convertor 处理后的带格式的数据


(3)MySQL Source、ESSink

  演示将数据从 MySQL 复制到 kafka 中,再通过 kafka 将数据下沉到 ElasticSearch。这里 MySQL 是数据源,所以需要支持 MySQL 的 source connector,ES 是目标数据系统,所以需要支持 ES 的 sink connectors,可以从 https://www.confluent.io/hub/ 下载。

MySQL

  MySQL 下载插件搜索关键字 "JDBC",可以看到提供了在线安装的脚本和离线安装的包下载。

  MySQL 环境准备

# 安装 MySQL
sudo apt-get install mysql-server

# 安装 Confluent 插件
confluent-hub install confluentinc/kafka-connect-jdbc:10.4.1

# 将 MySQL 驱动上传到 confluent 目录
# mv mysql.jar /app/confluent/share/confluent-hub-components/confluentinc-kafka-connect-jdbc/lib

【注意】下载下来的 jdbc connector 插件,在处理 mysql 时需要相应的驱动,而插件不带驱动,实际采集数据时会报错,这时需要将驱动 jar 包拷贝到插件库目录中。

  数据准备,创建用户并授权,用该用户创建数据库、表和插入数据

grant all on *.* to hyh@'localhost' identified by 'hyh';
create database studentsDB;
use studentsDB;
create table students (rollno int primary key auto_increment, name varchar(30), marks varchar(30));
insert into students (name, marks) values ('James', 35);

  创建 source 配置文件(connect-mysql-source.properties),内容如下:

name=test-source-mysql-jdbc-autoincrement
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://127.0.0.1:3306/studentsDB?user=hyh&password=hyh
mode=incrementing
# 表中的自增列字段
incrementing.column.name=rollno
# 表会被采集到的 topic 名前缀,比如表名叫 students,对应的 topic 就为 test-mysql-jdbc-students
topic.prefix=test-mysql-jdbc-

  启动 mysql source connector

$CONFLUENT_HOME/bin/connect-standalone \
  $CONFLUENT_HOME/etc/kafka/connect-standalone.properties \
  $CONFLUENT_HOME/etc/kafka/connect-mysql-source.properties

  可以看到启动之后,开启了 JDBC source task,然后执行了查询的 SQL,最后提交和刷新的偏移量

  与此同时,可以看到 kafka 中新增了一个 topic test-mysql-jdbc-students

  里面有一条数据,如果此时往表中再插入两条数据,可以看到数据变成了3条



ElasticSearch

  ES 下载插件搜索关键字 "ElasticSearch" ,可以看到有 ElasticSearch Sink Connector、ElasticSearch Source Connector,注意有些插件是支持 source、sink,有些是分开两个插件。

  ES 环境准备

tar -zxvf elasticsearch-7.6.0-linux-x86_64.tar.gz -C /app
mv /app/elasticsearch-7.6.0 /app/elasticsearch

# 配置环境变量
export ES_HOME=/app/elasticsearch
export PATH=${ES_HOME}/bin:$PATH

# 安装 Confluent 插件
confluent-hub install confluentinc/kafka-connect-elasticsearch:13.0.0

  启动 ES

cd /app/elasticsearch
.bin/elasticsearch

  报错不能以root用户启动

  创建用户用户组es,并修改 es 安装目录所属用户和组

chown -R es:es elasticsearch/

  再次启动看到以下日志即正常

  配置 sink 配置文件(connect-es-sink.properties),内容如下:

name=test-sink-elastic
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
connection.url=http://localhost:9200
topics=test-mysql-jdbc-students 
key.ignore=true
type.name=kafka-connect

  启动 ES sink connector

$CONFLUENT_HOME/bin/connect-standalone \
  $CONFLUENT_HOME/etc/kafka/connect-standalone.properties \
  $CONFLUENT_HOME/etc/kafka/connect-mysql-source.properties \
  $CONFLUENT_HOME/etc/kafka/connect-es-sink.properties

  访问 es 9092 端口查询数据,可以查到有三条数据

# 查询命令
curl -H "Content-Type: application/json" -X GET http://localhost:9200/test-mysql-jdbc-students/_search

# 查到的结果
{
    "took": 121,
    "timed_out": false,
    "_shards": {
        "total": 1,
        "successful": 1,
        "skipped": 0,
        "failed": 0
    },
    "hits": {
        "total": {
            "value": 3,
            "relation": "eq"
        },
        "max_score": 1,
        "hits": [
            {
                "_index": "test-mysql-jdbc-students",
                "_type": "_doc",
                "_id": "test-mysql-jdbc-students+0+0",
                "_score": 1,
                "_source": {
                    "rollno": 1,
                    "name": "James",
                    "marks": "35"
                }
            },
            {
                "_index": "test-mysql-jdbc-students",
                "_type": "_doc",
                "_id": "test-mysql-jdbc-students+0+1",
                "_score": 1,
                "_source": {
                    "rollno": 2,
                    "name": "James2",
                    "marks": "36"
                }
            },
            {
                "_index": "test-mysql-jdbc-students",
                "_type": "_doc",
                "_id": "test-mysql-jdbc-students+0+2",
                "_score": 1,
                "_source": {
                    "rollno": 3,
                    "name": "James3",
                    "marks": "37"
                }
            }
        ]
    }
}

  往数据库插入一条新的数据

insert into students (name, marks) values ('James4', 38);

  可以看到 es 侧接收到了这条数据

有关数据传输工具 —— Kafka Connect的更多相关文章

  1. ruby - 解析 RDFa、微数据等的最佳方式是什么,使用统一的模式/词汇(例如 schema.org)存储和显示信息 - 2

    我主要使用Ruby来执行此操作,但到目前为止我的攻击计划如下:使用gemsrdf、rdf-rdfa和rdf-microdata或mida来解析给定任何URI的数据。我认为最好映射到像schema.org这样的统一模式,例如使用这个yaml文件,它试图描述数据词汇表和opengraph到schema.org之间的转换:#SchemaXtoschema.orgconversion#data-vocabularyDV:name:namestreet-address:streetAddressregion:addressRegionlocality:addressLocalityphoto:i

  2. ruby - Ruby 有 `Pair` 数据类型吗? - 2

    有时我需要处理键/值数据。我不喜欢使用数组,因为它们在大小上没有限制(很容易不小心添加超过2个项目,而且您最终需要稍后验证大小)。此外,0和1的索引变成了魔数(MagicNumber),并且在传达含义方面做得很差(“当我说0时,我的意思是head...”)。散列也不合适,因为可能会不小心添加额外的条目。我写了下面的类来解决这个问题:classPairattr_accessor:head,:taildefinitialize(h,t)@head,@tail=h,tendend它工作得很好并且解决了问题,但我很想知道:Ruby标准库是否已经带有这样一个类? 最佳

  3. ruby - 我如何添加二进制数据来遏制 POST - 2

    我正在尝试使用Curbgem执行以下POST以解析云curl-XPOST\-H"X-Parse-Application-Id:PARSE_APP_ID"\-H"X-Parse-REST-API-Key:PARSE_API_KEY"\-H"Content-Type:image/jpeg"\--data-binary'@myPicture.jpg'\https://api.parse.com/1/files/pic.jpg用这个:curl=Curl::Easy.new("https://api.parse.com/1/files/lion.jpg")curl.multipart_form_

  4. 世界前沿3D开发引擎HOOPS全面讲解——集3D数据读取、3D图形渲染、3D数据发布于一体的全新3D应用开发工具 - 2

    无论您是想搭建桌面端、WEB端或者移动端APP应用,HOOPSPlatform组件都可以为您提供弹性的3D集成架构,同时,由工业领域3D技术专家组成的HOOPS技术团队也能为您提供技术支持服务。如果您的客户期望有一种在多个平台(桌面/WEB/APP,而且某些客户端是“瘦”客户端)快速、方便地将数据接入到3D应用系统的解决方案,并且当访问数据时,在各个平台上的性能和用户体验保持一致,HOOPSPlatform将帮助您完成。利用HOOPSPlatform,您可以开发在任何环境下的3D基础应用架构。HOOPSPlatform可以帮您打造3D创新型产品,HOOPSSDK包含的技术有:快速且准确的CAD

  5. FOHEART H1数据手套驱动Optitrack光学动捕双手运动(Unity3D) - 2

    本教程将在Unity3D中混合Optitrack与数据手套的数据流,在人体运动的基础上,添加双手手指部分的运动。双手手背的角度仍由Optitrack提供,数据手套提供双手手指的角度。 01  客户端软件分别安装MotiveBody与MotionVenus并校准人体与数据手套。MotiveBodyMotionVenus数据手套使用、校准流程参照:https://gitee.com/foheart_1/foheart-h1-data-summary.git02  数据转发打开MotiveBody软件的Streaming,开始向Unity3D广播数据;MotionVenus中设置->选项选择Unit

  6. 使用canal同步MySQL数据到ES - 2

    文章目录一、概述简介原理模块二、配置Mysql使用版本环境要求1.操作系统2.mysql要求三、配置canal-server离线下载在线下载上传解压修改配置单机配置集群配置分库分表配置1.修改全局配置2.实例配置垂直分库水平分库3.修改group-instance.xml4.启动监听四、配置canal-adapter1修改启动配置2配置映射文件3启动ES数据同步查询所有订阅同步数据同步开关启动4.验证五、配置canal-admin一、概述简介canal是Alibaba旗下的一款开源项目,Java开发。基于数据库增量日志解析,提供增量数据订阅&消费。Git地址:https://github.co

  7. 基于C#实现简易绘图工具【100010177】 - 2

    C#实现简易绘图工具一.引言实验目的:通过制作窗体应用程序(C#画图软件),熟悉基本的窗体设计过程以及控件设计,事件处理等,熟悉使用C#的winform窗体进行绘图的基本步骤,对于面向对象编程有更加深刻的体会.Tutorial任务设计一个具有基本功能的画图软件**·包括简单的新建文件,保存,重新绘图等功能**·实现一些基本图形的绘制,包括铅笔和基本形状等,学习橡皮工具的创建**·设计一个合理舒适的UI界面**注明:你可能需要先了解一些关于winform窗体应用程序绘图的基本知识,以及关于GDI+类和结构的知识二.实验环境Windows系统下的visualstudio2017C#窗体应用程序三.

  8. postman接口测试工具-基础使用教程 - 2

    1.postman介绍Postman一款非常流行的API调试工具。其实,开发人员用的更多。因为测试人员做接口测试会有更多选择,例如Jmeter、soapUI等。不过,对于开发过程中去调试接口,Postman确实足够的简单方便,而且功能强大。2.下载安装官网地址:https://www.postman.com/下载完成后双击安装吧,安装过程极其简单,无需任何操作3.使用教程这里以百度为例,工具使用简单,填写URL地址即可发送请求,在下方查看响应结果和响应状态码常用方法都有支持请求方法:getpostputdeleteGet、Post、Put与Delete的作用get:请求方法一般是用于数据查询,

  9. ruby-on-rails - 创建 ruby​​ 数据库时惰性符号绑定(bind)失败 - 2

    我正在尝试在Rails上安装ruby​​,到目前为止一切都已安装,但是当我尝试使用rakedb:create创建数据库时,我收到一个奇怪的错误:dyld:lazysymbolbindingfailed:Symbolnotfound:_mysql_get_client_infoReferencedfrom:/Library/Ruby/Gems/1.8/gems/mysql2-0.3.11/lib/mysql2/mysql2.bundleExpectedin:flatnamespacedyld:Symbolnotfound:_mysql_get_client_infoReferencedf

  10. STM32读取串口传感器数据(颗粒物传感器,主动上传) - 2

    文章目录1.开发板选择*用到的资源2.串口通信(个人理解)3.代码分析(注释比较详细)1.主函数2.串口1配置3.串口2配置以及中断函数4.注意问题5.源码链接1.开发板选择我用的是STM32F103RCT6的板子,不过代码大概在F103系列的板子上都可以运行,我试过在野火103的霸道板上也可以,主要看一下串口对应的引脚一不一样就行了,不一样的就更改一下。*用到的资源keil5软件这里用到了两个串口资源,采集数据一个,串口通信一个,板子对应引脚如下:串口1,TX:PA9,RX:PA10串口2,TX:PA2,RX:PA32.串口通信(个人理解)我就从串口采集传感器数据这个过程说一下我自己的理解,

随机推荐