草庐IT

【译】Apache Kafka 快速入门

calvinit 2023-03-28 原文

编译自官方文档

第 1 步:获取 Kafka

下载最新版本(当前为 v3.3.1)的 Kafka 并解压:

$ tar -xzf kafka_2.13-3.3.1.tgz
$ cd kafka_2.13-3.3.1

第 2 步:启动 Kafka 环境

注意:本地环境必须安装了 Java 8+。

Apache Kafka 可以配套使用 ZooKeeper 或者 KRaft 启动,请参考以下的 2.1 和 2.2 的其中任一小节步骤(二者选其一即可)配置以开始使用。

2.1 Kafka 使用 ZooKeeper 启动

运行以下命令以按正确顺序启动所有服务:

# 启动 ZooKeeper 服务
$ bin/zookeeper-server-start.sh config/zookeeper.properties

打开另一个终端会话并运行:

# 启动 Kafka Broker 服务
$ bin/kafka-server-start.sh config/server.properties

一旦所有服务都启动成功,你将拥有一个正在运行并可以使用的基本 Kafka 环境。

2.2 Kafka 使用 KRaft 启动

生成集群 UUID:

$ KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"

格式化日志目录:

$ bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties

启动 Kafka 服务:

$ bin/kafka-server-start.sh config/kraft/server.properties

Kafka 服务启动成功后,你将拥有一个正在运行并可以使用的基本 Kafka 环境。

第 3 步:创建一个主题(Topic)来存储事件(Event)

Kafka 是一个分布式事件流平台,可让你跨多台计算机读取、写入、存储和处理事件(在文档中也称为 记录(Record)消息(Message))。

示例事件包括支付交易、来自手机的地理位置更新、物流运输订单、来自物联网设备或医疗设备的传感器测量数据等等。这些事件统一被组织并存储在 主题 中。简单来说,主题就类似于文件系统中的文件夹,而事件则是该文件夹中的文件。

因此,在你编写第一个事件之前,你必须创建一个主题。打开另一个终端会话并运行:

$ bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092

Kafka 的所有命令行工具都有额外的选项:运行 kafka-topics.sh 不带任何参数的命令以显示使用信息。例如,它还可以向你显示新主题的分区计数等详细信息:

$ bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092
Topic: quickstart-events        TopicId: NPmZHyhbR9y00wMglMH2sg PartitionCount: 1       ReplicationFactor: 1	Configs:
    Topic: quickstart-events Partition: 0    Leader: 0   Replicas: 0 Isr: 0

第 4 步:将一些事件写入主题

Kafka 客户端通过网络与 Kafka Broker 通信以读写事件。一旦收到,Broker 将以持久和容错的方式存储事件并保留一段时间(只要你需要,甚至可以一直保留)。

运行控制台生产者客户端以将一些事件写入你的主题。默认情况下,你输入的每一行都会导致一个单独的事件被写入主题中:

$ bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
> This is my first event
> This is my second event
> 

你可以随时按下 Ctrl + C 以停止生产者客户端。

第 5 步:读取事件

打开另一个终端会话并运行控制台消费者客户端以读取你刚刚创建的事件:

$ bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
This is my first event
This is my second event

你可以随时按下 Ctrl + C 以停止消费者客户端。

然后你可以继续尝试:例如,切换回你的生产者终端(上一步)以编写其他事件,并查看这些事件如何立即显示在你的消费者终端中。

因为事件是持久存储在 Kafka 中的,所以它们可以被任意多次读取,并且可以被任意多的消费者读取。你可以通过打开另一个终端会话并再次重新运行之前的命令来轻松验证这一点。

第 6 步:使用 Kafka Connect 将数据导入或导出为事件流(Stream)

你可能在关系型数据库或传统的消息传递系统等现有系统中拥有了大量数据,以及许多已经在使用这些系统的应用程序。 Kafka Connect 允许你不断地将数据从外部系统提取到 Kafka 中,反之亦然。它是一个运行着 连接器(Connector) 的可扩展工具,连接器实现了与外部系统交互的自定义​​逻辑,因此很容易将现有系统与 Kafka 进行集成。为了使这个过程更容易,现有数百个这样的连接器随时可用。

在本篇文章中,我们将了解到如何使用简单的连接器运行 Kafka Connect,这些连接器将数据从文件导入到 Kafka 主题中,再将数据从 Kafka 主题中导出到另一文件。

首先,确保已将 connect-file-3.3.1.jar 添加到 Connect Worker 配置中的 plugin.path 属性下。出于快速入门演示的目的,我们将使用相对路径并将连接器的包视为 超级 Jar 包(uber-jar),当从安装目录运行命令时,它会起作用。但是,值得注意的是,对于生产环境的部署,选择使用绝对路径的建议始终是可取的。有关如何设置此配置的详细说明,请参阅 plugin.path

编辑 config/connect-standalone.properties 文件,添加或更改 plugin.path 与以下匹配的配置属性,然后保存文件:

> echo "plugin.path=libs/connect-file-3.3.1.jar"

然后,首先创建一些种子数据以方便进行测试:

> echo -e "foo\nbar" > test.txt

或者,在 Windows 系统上时:

> echo foo> test.txt 
> echo bar>> test.txt

接下来,我们将启动两个以 单机(Standalone) 模式运行的连接器,这意味着它们在单个本地专用进程中运行。我们提供三个配置文件作为参数。第一个始终是 Kafka Connect 进程的配置,包含常见配置,例如要连接的 Kafka Broker 和数据的序列化格式等。其余配置文件分别指定要创建的连接器,这些文件包括唯一的连接器名称、要实例化的连接器类以及连接器所需的任何其他配置:

> bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties

这些示例配置文件包含在 Kafka 中,使用你之前启动的默认本地集群配置并创建两个连接器:第一个是源连接器,它从输入文件中读取行并将每个行生成事件并导入到 Kafka 主题中,第二个是接收连接器,它从 Kafka 主题中读取消息并在输出文件中将每条消息生成为一行。

在启动过程中,你会看到许多日志消息,包括一些表明正在实例化连接器的消息。一旦 Kafka Connect 进程启动成功,源连接器应该开始从 test.txt 文件中读取每一行并将它们生成事件并导入到主题 connect-test 中,而接收连接器应该开始从主题 connect-test 中读取消息并将它们写入到文件 test.sink.txt 中。我们可以通过检查输出文件的内容来验证数据是否已通过整个管道进行了传送:

> more test.sink.txt
foo
bar

请注意,数据存储在 Kafka 主题 connect-test 中,因此我们还可以运行控制台消费者来查看主题中的数据(或使用自定义消费者代码来处理它):

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}
...

连接器继续处理数据,因此我们可以将数据添加到文件中并查看它在管道中的移动结果:

> echo Another line>> test.txt

你应该可以看到该行出现在控制台消费者输出和接收器文件中。

第 7 步:使用 Kafka Streams 处理事件

一旦你的数据作为事件存储在 Kafka 中,你就可以使用适用于 Java/Scala 的 Kafka Streams 客户端库处理数据。它允许你实施任务关键型实时应用程序和微服务,其中输入和/或输出数据存储在 Kafka 主题中。Kafka Streams 将在客户端编写和部署标准 Java 和 Scala 应用程序的简单性与 Kafka 的服务器端集群技术的优势相结合,使这些应用程序具有高度的可扩展、弹性、容错和分布式的特性。该库支持精确一次(exactly-once)的语义处理、有状态操作和聚合、窗口化、连接、基于事件时间的处理等等。

为了给你一个初步的体验,下面演示如何实现流行的 WordCount 算法:

KStream<String, String> textLines = builder.stream("quickstart-events");

KTable<String, Long> wordCounts = textLines
            .flatMapValues(line -> Arrays.asList(line.toLowerCase().split(" ")))
            .groupBy((keyIgnored, word) -> word)
            .count();

wordCounts.toStream().to("output-topic", Produced.with(Serdes.String(), Serdes.Long()));

Kafka Streams 示例应用开发教程演示了如何从头到尾编写和运行此类流应用程序。

第 8 步:终止 Kafka 环境

现在你已经完成了快速入门教程,可以随时删除 Kafka 环境,或者再继续尝试:

  1. 按下 Ctrl + C 以停止生产者和消费者客户端(如果你还没有这样做的话);
  2. 按下 Ctrl + C 以停止 Kafka Broker;
  3. 最后,如果第 2 步启动 Kafka 环境选择了 Kafka 使用 ZooKeeper 启动的,请按下 Ctrl + C 以停止 ZooKeeper 服务;

如果你还想删除本地 Kafka 环境的任何数据,包括你在此过程中创建的任何事件,请运行以下命令:

$ rm -rf /tmp/kafka-logs /tmp/zookeeper /tmp/kraft-combined-logs

恭喜!

你已成功完成了 Apache Kafka 的快速入门教程。

要了解更多信息,我们建议你可以执行以下后续步骤:

  • 阅读简单的介绍,了解 Kafka 的高级工作原理、主要概念以及与其他技术的比较。要更详细地了解 Kafka,请转到文档
  • 浏览用例,了解我们全球社区中的其他用户如何从 Kafka 中获得价值。
  • 加入当地的 Kafka 会议小组, 观看 Kafka 社区的主要会议 Kafka Summit 的演讲

有关【译】Apache Kafka 快速入门的更多相关文章

  1. LC滤波器设计学习笔记(一)滤波电路入门 - 2

    目录前言滤波电路科普主要分类实际情况单位的概念常用评价参数函数型滤波器简单分析滤波电路构成低通滤波器RC低通滤波器RL低通滤波器高通滤波器RC高通滤波器RL高通滤波器部分摘自《LC滤波器设计与制作》,侵权删。前言最近需要学习放大电路和滤波电路,但是由于只在之前做音乐频谱分析仪的时候简单了解过一点点运放,所以也是相当从零开始学习了。滤波电路科普主要分类滤波器:主要是从不同频率的成分中提取出特定频率的信号。有源滤波器:由RC元件与运算放大器组成的滤波器。可滤除某一次或多次谐波,最普通易于采用的无源滤波器结构是将电感与电容串联,可对主要次谐波(3、5、7)构成低阻抗旁路。无源滤波器:无源滤波器,又称

  2. 微信小程序开发入门与实战(Behaviors使用) - 2

    @作者:SYFStrive @博客首页:HomePage📜:微信小程序📌:个人社区(欢迎大佬们加入)👉:社区链接🔗📌:觉得文章不错可以点点关注👉:专栏连接🔗💃:感谢支持,学累了可以先看小段由小胖给大家带来的街舞👉微信小程序(🔥)目录自定义组件-behaviors    1、什么是behaviors    2、behaviors的工作方式    3、创建behavior    4、导入并使用behavior    5、behavior中所有可用的节点    6、同名字段的覆盖和组合规则总结最后自定义组件-behaviors    1、什么是behaviorsbehaviors是小程序中,用于实现

  3. 【Java入门】使用Java实现文件夹的遍历 - 2

    遍历文件夹我们通常是使用递归进行操作,这种方式比较简单,也比较容易理解。本文为大家介绍另一种不使用递归的方式,由于没有使用递归,只用到了循环和集合,所以效率更高一些!一、使用递归遍历文件夹整体思路1、使用File封装初始目录,2、打印这个目录3、获取这个目录下所有的子文件和子目录的数组。4、遍历这个数组,取出每个File对象4-1、如果File是否是一个文件,打印4-2、否则就是一个目录,递归调用代码实现publicclassSearchFile{publicstaticvoidmain(String[]args){//初始目录Filedir=newFile("d:/Dev");Datebeg

  4. ES基础入门 - 2

    ES一、简介1、ElasticStackES技术栈:ElasticSearch:存数据+搜索;QL;Kibana:Web可视化平台,分析。LogStash:日志收集,Log4j:产生日志;log.info(xxx)。。。。使用场景:metrics:指标监控…2、基本概念Index(索引)动词:保存(插入)名词:类似MySQL数据库,给数据Type(类型)已废弃,以前类似MySQL的表现在用索引对数据分类Document(文档)真正要保存的一个JSON数据{name:"tcx"}二、入门实战{"name":"DESKTOP-1TSVGKG","cluster_name":"elasticsear

  5. ruby - 如何以表格格式快速打印 Ruby 哈希值? - 2

    有没有办法快速将表格格式的ruby​​哈希打印到文件中?如:keyAkeyBkeyC...1232343451253474456...其中散列的值是不同大小的数组。还是使用双循环是唯一的方法?谢谢 最佳答案 试试我写的这个gem(在表中打印散列、ruby对象、ActiveRecord对象):http://github.com/arches/table_print 关于ruby-如何以表格格式快速打印Ruby哈希值?,我们在StackOverflow上找到一个类似的问题:

  6. 电脑启动后显示器黑屏怎么办?排查下面4个问题,快速解决 - 2

    电脑启动出现显示器黑屏是一个相当常见的问题。如果您遇到了这个问题,不要惊慌,因为它有很多可能的原因,可以采取一些简单的措施来解决它。在本文中,小编将介绍下面4种常见的电脑启动后显示器黑屏的原因,排查这些原因,快速解决! 演示机型:联想Ideapad700-15ISK-ISE系统版本:Windows10一、显示器问题如果出现电脑启动后显示器黑屏的情况。那么首先您需要检查一下显示器是否正常工作。您可以通过更换另一个显示器或将当前显示器连接到另一台计算机来检查显示器是否存在问题。如果问题仍然存在,那么您可以排除显示器故障的可能性。 二、显卡问题如果您的电脑配备了独立显卡,那么显卡故障也可能是导致电脑

  7. 区块链入门教程(6)--WeBASE-Front节点前置服务安装 - 2

    文章目录1.任务背景2.任务目标3.相关知识点4.任务实操4.1安装配置JDK4.2启动FISCOBCOS4.3下载解压WeBASE-Front4.4拷贝sdk证书文件4.5启动节点4.6访问节点4.7检查运行状态5.任务总结1.任务背景FISCOBCOS其实是有控制台管理工具,用来对区块链系统进行各种管理操作。但是对于初学者来说,还是可视化界面更友好,本节就来介绍WeBASE管理平台,这是一款微众银行开源的自研区块链中间件平台,可以降低区块链使用的门槛,大幅提高区块链应用的开发效率。微众银行是腾讯牵头设立的民营银行,在国内民营银行里还是比较出名的。微众银行参与FISCOBCOS生态建设,一定

  8. Tcl脚本入门笔记详解(一) - 2

    TCL脚本语言简介•TCL(ToolCommandLanguage)是一种解释执行的脚本语言(ScriptingLanguage),它提供了通用的编程能力:支持变量、过程和控制结构;同时TCL还拥有一个功能强大的固有的核心命令集。TCL经常被用于快速原型开发,脚本编程,GUI和测试等方面。•实际上包含了两个部分:一个语言和一个库。首先,Tcl是一种简单的脚本语言,主要使用于发布命令给一些互交程序如文本编辑器、调试器和shell。由于TCL的解释器是用C\C++语言的过程库实现的,因此在某种意义上我们又可以把TCL看作C库,这个库中有丰富的用于扩展TCL命令的C\C++过程和函数,所以,Tcl是

  9. Simulink方法总结和避坑指南(一)——Simulink入门与基本调试方法 - 2

    文章目录一、项目场景二、基本模块原理与调试方法分析——信源部分:三、信号处理部分和显示部分:四、基本的通信链路搭建:四、特殊模块:interpretedMATLABfunction:五、总结和坑点提醒一、项目场景  最近一个任务是使用simulink搭建一个MIMO串扰消除的链路,并用实际收到的数据进行测试,在搭建的过程中也遇到了不少的问题(当然这比vivado里面的debug好不知道多少倍)。准备趁着这个机会,先以一个很基本的通信链路对simulink基础和相关的debug方法进行总结。  在本篇中,主要记录simulink的基本原理和基本的SISO通信传输链路(QPSK方式),计划在下篇记

  10. ruby - 使单元测试快速失败以进行突变测试 - 2

    mutationtesting遇到一个问题是它很慢,因为默认情况下您会为每个生成的突变执行完整的测试运行(测试文件或一组测试文件)。加快突变测试的一种方法是,一旦遇到单一故障(但仅在突变测试期间),就停止对给定突变体的测试运行。更好的做法是让变异测试者记住杀死最后一个变异体的第一个测试是什么,并将其首先交给下一个变异体。ruby中是否有任何东西可以做这些事情,或者我最好的选择是开始猴子修补?(是的,我知道单元测试应该很快。显示所有失败的测试在突变测试之外很有用,因为它不仅可以帮助您识别出问题,还可以查明哪里出了问题)编辑:我目前正在对测试/单元使用heckle。如果测试/单元不可能记住

随机推荐