kafka安装目录下的bin目录包含了很多运维可操作的shell脚本,列举如下:
| 脚本名称 | 用途描述 |
|---|---|
| connect-distributed.sh | 连接kafka集群模式 |
| connect-standalone.sh | 连接kafka单机模式 |
| kafka-acls.sh | 设置Kafka权限 |
| kafka-broker-api-versions.sh | 检索代理版本信息 |
| kafka-configs.sh | 配置管理脚本 |
| kafka-console-consumer.sh | kafka消费者控制台 |
| kafka-console-producer.sh | kafka生产者控制台 |
| kafka-consumer-groups.sh | kafka消费者组相关信息 |
| kafka-consumer-perf-test.sh | kafka消费者性能测试脚本 |
| kafka-delegation-tokens.sh | 管理Delegation Token |
| kafka-delete-records.sh | 将给定分区的日志向下删除到指定的偏移量 |
| kafka-dump-log.sh | 用来查看Topic的的文件内容 |
| kafka-log-dirs.sh | 查询各个Broker上的各个日志路径的磁盘占用情况 |
| kafka-mirror-maker.sh | 在Kafka集群间实现数据镜像 |
| kafka-preferred-replica-election.sh | 触发preferred replica选举 |
| kafka-producer-perf-test.sh | kafka生产者性能测试脚本 |
| kafka-reassign-partitions.sh | 分区重分配脚本 |
| kafka-replica-verification.sh | 复制进度验证脚本 |
| kafka-run-class.sh | 执行任何带main方法的Kafka类 |
| kafka-server-start.sh | 启动kafka服务 |
| kafka-server-stop.sh | 停止kafka服务 |
| kafka-simple-consumer-shell.sh | deprecated,推荐使用kafka-console-consumer.sh |
| kafka-streams-application-reset.sh | 给Kafka Streams应用程序重设位移,以便重新消费数据 |
| kafka-topics.sh | topic管理脚本 |
| kafka-verifiable-consumer.sh | 可检验的kafka消费者 |
| kafka-verifiable-producer.sh | 可检验的kafka生产者 |
| trogdor.sh | Kafka的测试框架,用于执行各种基准测试和负载测试 |
| zookeeper-server-start.sh | 启动zookeeper服务 |
| zookeeper-server-stop.sh | 停止zookeeper服务 |
| zookeeper-shell.sh | 连接操作zookeeper的脚本,可以查看kafka在zk上的节点信息 |
接下来会详细说明常用脚本的使用方法。
对于kafka的topic操作,我们需要用到的是bin/kafka-topics.sh这个脚本文件。
bin/kafka-topics.sh --bootstrap-server 192.168.8.128:9092 --create --topic my-topic
注意:Topic 名称中一定不要同时出现下划线 (’_’) 和小数点 (’.’)。
在创建topic时选项说明:
--topic test :定义topic名称
--partitions 3:指定当前topic的分区数,若不指定则根据配置文件的默认分区数进行创建
--replication-factor 1: 定义副本数为1,副本数不能超过当前集群broker数,否则会抛出InvalidReplicationFactorException异常

bin/kafka-topics.sh --bootstrap-server 192.168.8.128:9092 --list

bin/kafka-topics.sh --bootstrap-server 192.168.8.128:9092 --alter --topic test --partitions 40

2. 增加配置
bin/kafka-topics.sh --alter --bootstrap-server 192.168.8.128:9092 --topic test --config cleanup.policy=compact
bin/kafka-topics.sh --alter --bootstrap-server 192.168.8.128:9092 --topic test --delete-config flush.messages
当如下所示的属性配置到 Topic 上时,将会覆盖 server.properties 上对应的属性。
| Topic级别配置属性 | 类型 | 有效值 | 描述 |
|---|---|---|---|
| cleanup.policy | list | delete(默认) compact | 过期或达到上限日志的清理策略。 delete:删除 compact:压缩 |
| compression.type | string | uncompressed snappy lz4 gzip producer(默认) | 指定给该topic最终的压缩类型 |
| delete.retention.ms | long | 86400000(默认) | 压缩的日志保留的最长时间,也是客户端消费消息的最长时间。 与 log.retention.minutes 的区别在于:一个控制未压缩的数据,一个控制压缩后的数据。 |
| file.delete.delay.ms | long | 60000 | 从文件系统中删除前所等待的时间 |
| flush.messages | long | 9223372036854775807 | 在消息刷到磁盘之前,日志分区收集的消息数 |
| flush.ms | long | 9223372036854775807 | 消息在刷到磁盘之前,保存在内存中的最长时间,单位是ms |
| index.interval.bytes | int | 4096 | 执行 fetch 操作后,扫描最近的 offset 运行空间的大小。 设置越大,代表扫描速度越快,但是也更耗内存。 (一般情况下不需要设置此参数) |
| message.max.bytes | int | 1000012 | log中能够容纳消息的最大字节数 |
| min.cleanable.dirty.ratio | double | 0.5 | 日志清理的频率控制,占该log的百分比。 越大意味着更高效的清理,同时会存在空间浪费问题 |
| retention.bytes | long | -1(默认) | topic每个分区的最大文件大小。 一个 topic 的大小限制 = 分区数 * log.retention.bytes。 -1 表示没有大小限制。 |
| retention.ms | int | 604800000(默认) | 日志文件保留的分钟数。 数据存储的最大时间超过这个时间会根据 log.cleanup.policy 设置的策略处理数据 |
| segment.bytes | int | 1073741824(默认) | 每个 segment 的大小 (默认为1G) |
| segment.index.bytes | int | 10485760(默认) | 对于segment日志的索引文件大小限制(默认为10M) |
注意:
partition数量只能增加,不能减少
此脚本不能用来修改副本个数。(使用
kafka-reassign-partitions.sh脚本修改副本数)
首先根据需要创建topic文件配置
partitions-topic.json,配置内容如下:{ "partitions": [ { "topic": "test", "partition": 0, "replicas": [1,2] }, { "topic": "test", "partition": 1, "replicas": [1,3] }, { "topic": "test", "partition": 2, "replicas": [2,3] } ], "version":1 }
- 执行副本搬迁
bin/kafka-reassign-partitions.sh --bootstrap-server 192.168.8.128:9092 --reassignment-json-file partitions-topic.json --execute查看迁移情况:
bin/kafka-reassign-partitions.sh --bootstrap-server 192.168.8.128:9092 --reassignment-json-file partitions-topic.json --verify
bin/kafka-topics.sh --bootstrap-server 192.168.8.128:9092 --delete --topic my_topic_name
bin/kafka-topics.sh --describe --topic my-topic --bootstrap-server 196.168.8.128:9092

输出信息中,第一行包含该topic的配置信息(名称,id,分区数,副本数和配置),后面每一行说明一个分区的信息。
其中,Leader,Replicas和Isr后面的数字都是 broker 的 id,Leader 表示该节点负责该分区的所有读写操作,Replicas 表示备份的节点,Isr表示当前 kakfa 集群中可用的 breaker.id 列表。因为目前是单节点,所以所有信息均为0。
消息是kafka中最基本的数据单元,在kafka中,一条消息由key、value两部分构成,在发送一条消息时,我们可以指定这个key,producer的分发机制会根据key来判断当前这条消息应该发送并存储到哪个partition中。默认情况下,kafka采用的是hash取模的分区算法,如果Key为null,则会随机分配一个分区。
生产消息需要使用bin/kafka-console-producer.sh脚本。直接输入消息值(value)即可,输入的每一行表示一条消息,都会导致将单独的事件写入主题。每次回车表示触发“发送”操作,回车后可直接使用“Ctrl + c”退出生产者控制台。
bin/kafka-console-producer.sh --topic test --bootstrap-server 192.168.8.128:9092

如需指定消息的key值可以通过--property parse.key=true配置。输入消息时,默认消息的key和value之间使用Table键进行分隔(请勿使用转义字符\t)。
bin/kafka-console-producer.sh --bootstrap-server 192.168.8.128:9092 --topic test --property parse.key=true

输入如上信息表示所生产的消息“Key1”和"Key2"为消息键,“Value1”和“Value2”为消息值。
下表列举了3.0版本支持的所有参数用法:
| 参数 | 值类型 | 说明 | 有效值 |
|---|---|---|---|
| –bootstrap-server | String | 要连接的服务器 必需(除非指定–broker-list, 但broker-list在新版本中已过时) | 形如:host1:prot1,host2:prot2 |
| –topic | String | (必需)接收消息的主题名称 | |
| –broker-list | String | 已过时要连接的服务器 | 形如:host1:prot1,host2:prot2 |
| –batch-size | Integer | 单个批处理中发送的消息数 | 200(默认值) |
| –compression-codec | String | 压缩编解码器 | none、gzip(默认值) snappy、lz4、zstd |
| –line-reader | String | 默认情况下,每一行都被读取为单独的消息 | kfka.tools. ConsoleProducer$LineMessageReader |
| –max-block-ms | Long | 在发送请求期间, 生产者将阻塞的最长时间 | 60000(默认值) |
| –max-memory-bytes | Long | 生产者用来缓冲等待发送到服务器的总内存 | 33554432(默认值) |
| –max-partition-memory-bytes | Long | 为分区分配的缓冲区大小 | 16384(默认值) |
| –message-send-max-retries | Integer | 最大的重试发送次数 | 3(默认值) |
| –metadata-expiry-ms | Long | 强制更新元数据的时间阈值(ms) | 300000(默认值) |
| –producer-property | String | 将自定义属性传递给生产者的机制 | 形如:key=value |
| –producer.config | String | 生产者配置属性文件 ,注意[–producer-property]优先于此配置 | 配置文件完整路径 |
| –property | String | 自定义消息读取器 | parse.key=true|false key.separator=<key.separator> ignore.error=true|false |
| –request-required-acks | String | 生产者请求的确认方式(具体讲解在Producer API) | 0、1(默认值)、all |
| –request-timeout-ms | Integer | 生产者请求的确认超时时间 | 1500(默认值) |
| –retry-backoff-ms | Integer | 生产者重试前,刷新元数据的等待时间阈值 | 100(默认值) |
| –socket-buffer-size | Integer | TCP接收缓冲大小 | 102400(默认值) |
| –timeout | Integer | 消息排队异步等待处理的时间阈值 | 1000(默认值) |
| –sync | 同步发送消息 | ||
| –version | 显示 Kafka 版本 不配合其他参数时,显示为本地Kafka版本 | ||
| –help | 打印帮助信息 |
消费消息需要使用bin/kafka-console-consumer.sh 脚本。该 shell 脚本的功能通过调用 kafka.tools 包下的 ConsoleConsumer 类,并将提供的命令行参数全部传给该类实现。
打开另一个终端会话并运行控制台使用者客户端以读取刚刚创建的事件:
bin/kafka-console-consumer.sh --topic test --from-beginning --bootstrap-server 192.168.8.128:9092

默认情况消费出来现实的信息是只有消息的Value值,如果要展示消息的Key,时间戳或其他信息需要通过选项–property进行配置。
bin/kafka-console-consumer.sh --topic my-topic --bootstrap-server 192.168.8.128:9092 --property print.partition=true --property print.key=true --property print.timestamp=true --property print.offset=true

若不适用–from-beginning选项,表示从最新处消费该topic的所有分区的的消息,即仅消费正在写入的消息。加上–from-beginning选项表示从该topic存在的所有消息中从头开始消费。
注意:
consumer默认将offset保存在Kafka一个内置的topic中,该topic名 为
__consumer_offsets
该进程会一直运行,当有新消息进来,这里会直接读取出来消息。当有Leader节点出现错误时,会在剩余的follower中推举出一个leader,而且这些数据还没有丢失,因为follower是leader的备份节点。
| 参数 | 值类型 | 说明 | 有效值 |
|---|---|---|---|
| –topic | string | 被消费的topic | |
| –include | string | 正则表达式,指定要包含以供使用的主题的白名单 | |
| –partition | integer | 指定消费的分区,默认从该分区的末尾开始消费,除非指定了offset | |
| –offset | string | 执行消费的起始偏移量位置 默认值:latest | latest:从最新处开始消费 earliest :从最早处开始消费 offset:从指定偏移量开始消费 |
| –consumer-property | string | 将用户定义的属性以key=value的形式传递给使用者 | |
| –consumer.config | string | 消费者配置属性文件 [consumer-property]配置优先级高于此配置 | |
| –formatter | string | 用于格式化kafka消息以供显示的类的名称 ,kafka.tools下的 | DefaultMessageFormatter LoggingMessageFormatter NoOpMessageFormatter ChecksumMessageFormatter |
| –property | string | 初始化消息格式化程序的属性 | print.timestamp=true|false print.key=true|false print.value=true|false key.separator=<key.separator> line.separator=<line.separator> key.deserializer=<key.deserializer> value.deserializer=<value.deserializer> |
| –from-beginning | 从存在的最早消息开始,而不是从最新消息开始 | ||
| –max-messages | integer | 设置要消费的最大消息数。如果未设置,则连续消耗 | |
| –timeout-ms | integer | 如果没有消息可消费,将在指定时间后终止消费者进程 | |
| –skip-message-on-error | 如果处理消息时出错,请跳过它而不是暂停 | ||
| –bootstrap-server | string | 必需,要连接的服务器 | |
| –key-deserializer | string | 消息的key序列化方式 | |
| –value-deserializer | string | 消息的value序列化方式 | |
| –enable-systest-events | 除记录消费的消息外,还记录消费者的生命周期 (用于系统测试) | ||
| –isolation-level | string | 设置为read_committed以过滤掉未提交的事务性消息 设置为read_uncommitted以读取所有消息 | read_uncommitted, read_uncommitted(默认值) |
| –group | string | 指定消费者所属组的ID |
kafka-configs.sh配置管理脚本,这个脚本主要分两类用法:describe和alter。
describe相关用法
bin/kafka-configs.sh --bootstrap-server 192.168.8.128:9092 --describe --entity-type topics
bin/kafka-configs.sh --bootstrap-server 192.168.8.128:9092 --describe --entity-type brokers --entity-name 0
说明:0是broker.id,因为entity-type为brokers,所以entity-name表示broker.id。
alter相关用法
bin/kafka-configs.sh --bootstrap-server 192.168.8.128:9092 --alter --entity-type topics --entity-name TOPIC-TEST-AFEI --add-config retention.ms=600000
bin/kafka-configs.sh --bootstrap-server 192.168.8.128:9092 --alter --entity-type topics --entity-name TOPIC-TEST-AFEI --delete-config max.message.bytes
使用过程中,如果遇到问题,可能需要查看元数据日志。在KRaft中,有两个命令行工具需要特别关注下。kafka-dump-log.sh和kakfa-metadata-shell.log。(元数据不是真正的数据,规范、定义真实数据的数据)
kafka-dump-log.sh
Kafka-dump-log.sh是一个之前就有的工具,用来查看Topic的的文件内容。
查询Log文件
bin/kafka-dump-log.sh --files kafka-logs/my-topic-0/00000000000000000000.log

bin/kafka-dump-log.sh --files kafka-logs/my-topic-2/00000000000000000000.log --print-data-log

bin/kafka-dump-log.sh --files kafka-logs/my-topic-2/00000000000000000000.index

bin/kafka-dump-log.sh --files kafka-logs/my-topic-2/00000000000000000000.timeindex

3.0版本中这个工具加了一个参数–cluster-metadata-decoder用来,查看元数据日志,如下所示:
./bin/kafka-dump-log.sh --cluster-metadata-decoder --skip-record-metadata --files /opt/kraft/kraft-combined-logs/__cluster_metadata-0/*.log

./bin/kafka-dump-log.sh --cluster-metadata-decoder --skip-record-metadata --files /opt/kraft/kraft-combined-logs/__cluster_metadata-0/*.log
Dumping /tmp/kraft-combined-logs/__cluster_metadata-0/00000000000000000000.log
Starting offset: 0
baseOffset: 0 lastOffset: 0 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 1 isTransactional: false isControl: true position: 0 CreateTime: 1614382631640 size: 89 magic: 2 compresscodec: NONE crc: 1438115474 isvalid: true
baseOffset: 1 lastOffset: 1 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 1 isTransactional: false isControl: false position: 89 CreateTime: 1614382632329 size: 137 magic: 2 compresscodec: NONE crc: 1095855865 isvalid: true
payload: {"type":"REGISTER_BROKER_RECORD","version":0,"data":{"brokerId":1,"incarnationId":"P3UFsWoNR-erL9PK98YLsA","brokerEpoch":0,"endPoints":[{"name":"PLAINTEXT","host":"localhost","port":9092,"securityProtocol":0}],"features":[],"rack":null}}
baseOffset: 2 lastOffset: 2 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 1 isTransactional: false isControl: false position: 226 CreateTime: 1614382632453 size: 83 magic: 2 compresscodec: NONE crc: 455187130 isvalid: true
payload: {"type":"UNFENCE_BROKER_RECORD","version":0,"data":{"id":1,"epoch":0}}
baseOffset: 3 lastOffset: 3 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 1 isTransactional: false isControl: false position: 309 CreateTime: 1614382634484 size: 83 magic: 2 compresscodec: NONE crc: 4055692847 isvalid: true
payload: {"type":"FENCE_BROKER_RECORD","version":0,"data":{"id":1,"epoch":0}}
baseOffset: 4 lastOffset: 4 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 2 isTransactional: false isControl: true position: 392 CreateTime: 1614382671857 size: 89 magic: 2 compresscodec: NONE crc: 1318571838 isvalid: true
baseOffset: 5 lastOffset: 5 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 2 isTransactional: false isControl: false position: 481 CreateTime: 1614382672440 size: 137 magic: 2 compresscodec: NONE crc: 841144615 isvalid: true
payload: {"type":"REGISTER_BROKER_RECORD","version":0,"data":{"brokerId":1,"incarnationId":"RXRJu7cnScKRZOnWQGs86g","brokerEpoch":4,"endPoints":[{"name":"PLAINTEXT","host":"localhost","port":9092,"securityProtocol":0}],"features":[],"rack":null}}
baseOffset: 6 lastOffset: 6 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 2 isTransactional: false isControl: false position: 618 CreateTime: 1614382672544 size: 83 magic: 2 compresscodec: NONE crc: 4155905922 isvalid: true
payload: {"type":"UNFENCE_BROKER_RECORD","version":0,"data":{"id":1,"epoch":4}}
baseOffset: 7 lastOffset: 8 count: 2 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 2 isTransactional: false isControl: false position: 701 CreateTime: 1614382712158 size: 159 magic: 2 compresscodec: NONE crc: 3726758683 isvalid: true
payload: {"type":"TOPIC_RECORD","version":0,"data":{"name":"foo","topicId":"5zoAlv-xEh9xRANKXt1Lbg"}}
payload: {"type":"PARTITION_RECORD","version":0,"data":{"partitionId":0,"topicId":"5zoAlv-xEh9xRANKXt1Lbg","replicas":[1],"isr":[1],"removingReplicas":null,"addingReplicas":null,"leader":1,"leaderEpoch":0,"partitionEpoch":0}}
2.kafka-metadata-shell.sh
kafka还提供了一个叫做kafka-metadata-shell.sh的工具,能够看到topic和partion的分布,这些信息原来是可以通过zk获取的,现在可以使用这个命令行获取。
[root@192 kraft]# ./bin/kafka-metadata-shell.sh --snapshot /opt/kraft/kraft-combined-logs/__cluster_metadata-0/00000000000000000000.log
Loading...
Starting...
[ Kafka Metadata Shell ]
>> ls /
brokers configs local metadataQuorum topicIds topics
>> ls /topics
__consumer_offsets test topic-test-kraft
>> ls /topicIds
wSIr0K8RQr6FRCvVbmW8Kg xYERkvINT1aYpmrUkRqhnA zmpBmMGgS66uco28RoAdSQ
>> ls /brokers
1
>> ls /configs
topic
>> cat /topics/test/0/data
{
"partitionId" : 0,
"topicId" : "xYERkvINT1aYpmrUkRqhnA",
"replicas" : [ 1 ],
"isr" : [ 1 ],
"removingReplicas" : [ ],
"addingReplicas" : [ ],
"leader" : 1,
"leaderEpoch" : 6,
"partitionEpoch" : 6
}
>> exit

对消费组的操作用到的是 bin/kafka-consumer-groups.sh脚本,具体使用方法如下:
bin/kafka-consumer-groups.sh --bootstrap-server 192.168.8.128:9092 --list

bin/kafka-consumer-groups.sh --bootstrap-server 192.168.8.128:9092 --describe --group my-group

各字段含义如下:
| TOPIC | PARTITION | CURRENT-OFFSET | LOG-END-OFFSET | LAG | CONSUMER-ID | HOST | CLIENT-ID |
|---|---|---|---|---|---|---|---|
| topic名字 | 分区id | 当前已消费的条数 | 总条数 | 未消费的条数 | 消费id | 主机ip | 客户端id |
bin/kafka-consumer-groups.sh --bootstrap-server 192.168.8.128:9092 --describe --group test --members

bin/kafka-consumer-groups.sh --bootstrap-server 192.168.8.128:9092 --describe --group my-group --members --verbose

此外,–offsets这是默认的描述选项,与–describe选项输出相同。–state提供有用的组级信息。

> bin/kafka-consumer-groups.sh --bootstrap-server 192.168.8.128:9092 --delete --group my-group --group my-other-group
Deletion of requested consumer groups ('my-group', 'my-other-group') was successful.
重置消费者的偏移量
选项说明:
--to-latest:重置偏移量为最新处
--to-earliest:重置偏移量为最早处
--to-offset <Long: offset>:重置偏移量为指定值
bin/kafka-consumer-groups.sh --bootstrap-server 192.168.8.128:9092 --reset-offsets --group consumergroup1 --topic topic1 --to-latest
kafka connect 是一个可扩展的、可靠的在kafka和其他系统之间流传输的数据工具。简而言之就是他可以通过Connector(连接器)简单、快速的将大集合数据导入和导出kafka。可以接收整个数据库或收集来自所有的应用程序的消息到kafka的topic中,主要用来与其他中间件建立流式通道。

Kafka connect的核心组件:


注意:读取后的数据的Schema是固定的,包含的列如下:
| Column | Type | 说明 |
|---|---|---|
| key | binary | 消息的key |
| value | binary | 消息的value |
| topic | string | 主题 |
| partition | int | 分区 |
| offset | long | 偏移量 |
| timestamp | long | 时间戳 |
Kafka connect的两种工作模式:
standalone:在standalone模式中,用单一进程负责执行所有连接操作,使用connect-standalone.sh脚本。distributed:distributed模式具有高扩展性,以及提供自动容错机制。可以使用一个group.ip来启动很多worker进程,在有效的worker进程中它们会自动的去协调执行任务,使用connect-distributed.sh脚本启动。Kafka connect客户端操作:
1.创建一些文本信息

2.开启两个连接器运行在独立模式
独立模式意味着运行一个单一的,本地的,专用的进程。使用的是bin/connect-standalone.sh这个脚本。
bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
执行这个脚本的时候会带三个配置文件(若执行的是connect-distributed.sh脚本,第一个参数应选择相应的connect-distributed.properties):connect-standalone.properties是Kafka Connect处理的配置,connect-standalone.properties配置文件如下:
#kafka服务地址
bootstrap.servers=192.168.8.128:9092
#把数据导入到kafka的某个topic时,topic中数据的key按照某种converter转化,默认是json格式
key.converter=org.apache.kafka.connect.json.JsonConverter
#把数据导入到kafka的某个topic时,topic中数据的value按照某种converter转化,默认是json格式
value.converter=org.apache.kafka.connect.json.JsonConverter
#指定topic中数据的key和value是否包含schema信息,消息由playload和schema组成
key.converter.schemas.enable=true
value.converter.schemas.enable=true
#保存偏移量的路径
offset.storage.file.filename=/tmp/connect.offsets
#保存connector运行中offset到topic的频率
offset.flush.interval.ms=10000
connect-distributed.properties配置文件如下:
#kafka服务地址
bootstrap.servers=192.168.8.128:9092
#集群的id,要注意这个id不能和consumer group的id冲突
group.id=connect-cluster
#把数据导入到kafka的某个topic时,topic中数据的key按照某种converter转化,默认是json格式
key.converter=org.apache.kafka.connect.json.JsonConverter
#把数据导入到kafka的某个topic时,topic中数据的value按照某种converter转化,默认是json格式
value.converter=org.apache.kafka.connect.json.JsonConverter
#指定topic中数据的key和value是否包含schema信息
key.converter.schemas.enable=true
value.converter.schemas.enable=true
#用于保存connector运行中offset的topic,当connector宕机时可以继续从某个offset开始运行
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
#用于保存connector配置信息的topic(注意:此topic只能由一个partition)
config.storage.topic=connect-configs
config.storage.replication.factor=1
#用于保存connector和task状态的topic
status.storage.topic=connect-status
status.storage.replication.factor=1
#保存connector运行中offset到topic的频率
offset.flush.interval.ms=10000
connect-file-source.properties配置文件如下:
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
# 指定读取的文件路径
file=/home/file/text.txt
# 指定将数据写入的topic
topic=connect-test
connect-file-sink.properties配置文件如下:
name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
# 指定消息输出文件路径
file=/home/file/test.sink.txt
# 指定从该topic读取数据
topics=connect-test
Kafka connect核心组件有source和sink。source负责将外部数据写入到Kafka的topic中,sink负责从Kafka的topic读取数据并写入到指定地方。
执行运行的shell命令,一旦进程开始,导入连接器读取text.txt文件内容写入到connect-test主题,导出连接器从主题connect-test读取消息写入到文件test.sink.txt,而且可以看到connect-test的topic已经创建了。


如果在text.txt追加内容,输出文件test.sink.txt也会从kafka的主题中消费消息。
我有一个使用SeleniumWebdriver和Nokogiri的Ruby应用程序。我想选择一个类,然后对于那个类对应的每个div,我想根据div的内容执行一个Action。例如,我正在解析以下页面:https://www.google.com/webhp?sourceid=chrome-instant&ion=1&espv=2&ie=UTF-8#q=puppies这是一个搜索结果页面,我正在寻找描述中包含“Adoption”一词的第一个结果。因此机器人应该寻找带有className:"result"的div,对于每个检查它的.descriptiondiv是否包含单词“adoption
我正在我的Rails项目中安装Grape以构建RESTfulAPI。现在一些端点的操作需要身份验证,而另一些则不需要身份验证。例如,我有users端点,看起来像这样:moduleBackendmoduleV1classUsers现在如您所见,除了password/forget之外的所有操作都需要用户登录/验证。创建一个新的端点也没有意义,比如passwords并且只是删除password/forget从逻辑上讲,这个端点应该与用户资源。问题是Grapebefore过滤器没有像except,only这样的选项,我可以在其中说对某些操作应用过滤器。您通常如何干净利落地处理这种情况?
在我做的一些网络开发中,我有多个操作开始,比如对外部API的GET请求,我希望它们同时开始,因为一个不依赖另一个的结果。我希望事情能够在后台运行。我找到了concurrent-rubylibrary这似乎运作良好。通过将其混合到您创建的类中,该类的方法具有在后台线程上运行的异步版本。这导致我编写如下代码,其中FirstAsyncWorker和SecondAsyncWorker是我编写的类,我在其中混合了Concurrent::Async模块,并编写了一个名为“work”的方法来发送HTTP请求:defindexop1_result=FirstAsyncWorker.new.async.
a=[3,4,7,8,3]b=[5,3,6,8,3]假设数组长度相同,是否有办法使用each或其他一些惯用方法从两个数组的每个元素中获取结果?不使用计数器?例如获取每个元素的乘积:[15,12,42,64,9](0..a.count-1).eachdo|i|太丑了...ruby1.9.3 最佳答案 使用Array.zip怎么样?:>>a=[3,4,7,8,3]=>[3,4,7,8,3]>>b=[5,3,6,8,3]=>[5,3,6,8,3]>>c=[]=>[]>>a.zip(b)do|i,j|c[[3,5],[4,3],[7,6],
我有一个非常简单的Controller来管理我的Rails应用程序中的静态页面:classPagesController我怎样才能让View模板返回它自己的名字,这样我就可以做这样的事情:#pricing.html.erb#-->"Pricing"感谢您的帮助。 最佳答案 4.3RoutingParametersTheparamshashwillalwayscontainthe:controllerand:actionkeys,butyoushouldusethemethodscontroller_nameandaction_nam
我想在Ruby的TCPServer中获取客户端的IP地址。以及(如果可能的话)MAC地址。例如,Ruby中的时间服务器,请参阅评论。tcpserver=TCPServer.new("",80)iftcpserverputs"Listening"loopdosocket=tcpserver.acceptifsocketThread.newdoputs"Connectedfrom"+#HERE!HowcanigettheIPAddressfromtheclient?socket.write(Time.now.to_s)socket.closeendendendend非常感谢!
一、什么是MQTT协议MessageQueuingTelemetryTransport:消息队列遥测传输协议。是一种基于客户端-服务端的发布/订阅模式。与HTTP一样,基于TCP/IP协议之上的通讯协议,提供有序、无损、双向连接,由IBM(蓝色巨人)发布。原理:(1)MQTT协议身份和消息格式有三种身份:发布者(Publish)、代理(Broker)(服务器)、订阅者(Subscribe)。其中,消息的发布者和订阅者都是客户端,消息代理是服务器,消息发布者可以同时是订阅者。MQTT传输的消息分为:主题(Topic)和负载(payload)两部分Topic,可以理解为消息的类型,订阅者订阅(Su
1、接口请求基本操作1.1例子tips在view的选项可以zoomin调整窗口字帖大小。1、创建一个测试的workspace,并命名为test2、test后面新增一个addrequest3、选择发送GET,URL为一个开源的https://api.apiopen.top/api/sentences获取每日一句4、点击send查看内容Tips:如果提示出现Error:tunnelingsocketcouldnotbeestablished,statusCode=407错误,参照以下解决办法)关于tunnelingsocketcouldnotbeestablished,cause=getaddri
TCL脚本语言简介•TCL(ToolCommandLanguage)是一种解释执行的脚本语言(ScriptingLanguage),它提供了通用的编程能力:支持变量、过程和控制结构;同时TCL还拥有一个功能强大的固有的核心命令集。TCL经常被用于快速原型开发,脚本编程,GUI和测试等方面。•实际上包含了两个部分:一个语言和一个库。首先,Tcl是一种简单的脚本语言,主要使用于发布命令给一些互交程序如文本编辑器、调试器和shell。由于TCL的解释器是用C\C++语言的过程库实现的,因此在某种意义上我们又可以把TCL看作C库,这个库中有丰富的用于扩展TCL命令的C\C++过程和函数,所以,Tcl是
Linux操作系统——网络配置与SSH远程安装完VMware与系统后,需要进行网络配置。第一个目标为进行SSH连接,可以从本机到VMware进行文件传送,首先需要进行网络配置。1.下载远程软件首先需要先下载安装一款远程软件:FinalShell或者xhell7FinalShellxhell7FinalShell下载:Windows下载http://www.hostbuf.com/downloads/finalshell_install.exemacOS下载http://www.hostbuf.com/downloads/finalshell_install.pkg2.配置CentOS网络安装好