草庐IT

kafka常用指令

failymao 2023-03-28 原文

1. 前言

kafka作为数据中间键在数据服务解耦,大数据,日志方面堪称一把瑞士军刀,备受青睐,作为提供了一套完整命令行的程序,日常进行调试过程中难免会用到一些指令,现总结18条最常用指令。

2. 常用指令

  1. 启动消费者

    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 -topic quickstart-events --from-beginning
    
  2. 启动生产者

    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic quickstart-events
    
  3. 启动zookeeper

    /mnt/d/wsl/kafka_2.12-2.8.0# bin/zookeeper-server-start.sh config/zookeeper.properties
    
  4. 启动kafka服务端进程

    /mnt/d/wsl/kafka_2.12-2.8.0# bin/kafka-server-start.sh config/server.properties
    
  5. 创建topic

    bin/kafka-topics.sh --create --topic topicname --replication-factor 1 --partitions 1 --zookeeper localhost:2181
    
  6. 查看topic

    bin/kafka-topics.sh --list --zookeeper localhost:2181
    
  7. 查看所有topic

    ./bin/kafka-topics.sh --zookeeper localhost:2181 --list 
    
  8. 删除某个topic

    bin/kafka-topics.sh --delete --topic synch.postgres_db  --zookeeper localhost:2181
    
  9. 查看topic详情

    bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --topic soar_alarm --describe
    
  10. 查看topic消费进度,必须参数为--group, 不指定--topic,默认为所有topic

    bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --group merge_data_service
    
  11. 列出所有主题中的所有用户组

    bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --group merge_data_service --members
    
    # 输出
    CONSUMER-ID                                  HOST            CLIENT-ID       #PARTITIONS
    rdkafka-22797a51-c161-4377-a4cb-e60423784779 /10.249.151.24  rdkafka         1
    
  12. 删除消费者组

    bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --delete --group merge_data_service
    
  13. 通过 state 参数来查看消费组当前的状态

    bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --group merge_data_service --state
    
    # 输出
    COORDINATOR (ID)          ASSIGNMENT-STRATEGY       STATE                #MEMBERS
    linkage:9094 (1001)       range                     Stable               1
    
  14. 通过 verbose 参数,那么还会罗列出每个消费者成员的分配情况

    bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --group merge_data_service --members --verbose
    
    # 输出
    CONSUMER-ID                                  HOST            CLIENT-ID       #PARTITIONS     ASSIGNMENT
    rdkafka-22797a51-c161-4377-a4cb-e60423784779 /10.249.151.24  rdkafka         1               soar_alarm_merge(0)
    
    
  15. 查看所有消费者组

    ./bin/kafka-consumer-groups.sh --bootstrap-server 10.48.93.9:9092 --list
    # 输出
    sabre.monitor
    merge_data_service
    rule_data_topic
    test-consumer-gr
    filebeat-soar
    salmon_groupid_002
    
  16. 查看 集群版 topic详情

    ./bin/kafka-topics.sh --zookeeper zk1.node:12181,zk2.node:12181,zk3.node:12181  --describe --topic test05
    
  17. 获取某个topic每个分区的文件条目

    ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic SOAR_OCNN57
    
  18. 彻底删除topic

    # 进入zookeeper客户端
    zookeeper-client
    rmr /admin/delete_topics/soar_alarm_merge
    rmr /brokers/topics/soar_alarm_merge
    

有关kafka常用指令的更多相关文章

  1. ruby-on-rails - Rails 常用字符串(用于通知和错误信息等) - 2

    大约一年前,我决定确保每个包含非唯一文本的Flash通知都将从模块中的方法中获取文本。我这样做的最初原因是为了避免一遍又一遍地输入相同的字符串。如果我想更改措辞,我可以在一个地方轻松完成,而且一遍又一遍地重复同一件事而出现拼写错误的可能性也会降低。我最终得到的是这样的:moduleMessagesdefformat_error_messages(errors)errors.map{|attribute,message|"Error:#{attribute.to_s.titleize}#{message}."}enddeferror_message_could_not_find(obje

  2. 电脑怎么截图?进来看(8种常用截图方法) - 2

    电脑上可以截取图片吗?如果可以,该如何操作呢?相信很多小伙伴都只知道一两种截图的方式,知道的并不全面。其实,电脑上有多种方式截图的,而且非常方便。电脑怎么截图?今天我们就来教大家如何使用电脑截取图片的8种常用方式!操作环境:演示机型:Delloptiplex7050系统版本:Windows10方法一:系统自带截图具体操作:同时按下电脑的自带截图键【Windows+shift+S】,可以选择其中一种方式来截取图片:截屏有矩形截屏、任意形状截屏、窗口截屏和全屏截图。 方法二:QQ截图具体操作:在电脑登录QQ,然后同时按下【Ctrl+Alt+A】,可以任意截图你需要的界面,可以把截图的页面直接下载,

  3. Unity常用文件夹 - 2

    1.Scenes游戏场景文件夹用于放置unity的场景文件 2.Plugins插件文件夹用于放置unity的依赖文件,例如dll 3.Scripts脚本文件夹用于放置unity的c#脚本文件 4.Resources游戏资源文件夹用于放置unity的各种游戏资源,比如images,prefabs,同时只有放到Resources文件夹的游戏资源才能使用Resource.load(资源路径不加后缀)加载到游戏内存中进行使用 5.EditorUnity编辑器扩展脚本文件夹usingUnityEditor;这个名称空间就是Unity编辑器的名称空间这个名称空间提供了扩展Unity编辑器的各种类 【你所有

  4. 【Linux】初识Linux --指令Ⅰ - 2

    Halo,这里是Ppeua。平时主要更新C语言,C++,数据结构算法,Linux…感兴趣就关注我吧!你定不会失望。目录1.ls显示当前目录下的文件内内容2.pwd-显示用户当前所在的目录3.cd-改变工作目录。将当前工作目录改变到指定的目录下1.cd-回到上一次待的工作空间2.cd..返回上一层目录1.相对路径:cd../aurora2.绝对路径:cd/home/aurora/lesson1/aurora3.cd~进入用户家目录4.cd/进入root目录4.mkdir-新建目录5.rmdir/rm-删除1.rmdir删除空文件夹2.rm删除1.rm-f2.rm-i3.rm-r1.ls显示当前目

  5. iOS快捷指令:执行Python脚本(利用iSH Shell) - 2

    文章目录前言核心逻辑配置iSH安装Python创建Python脚本配置启动文件测试效果快捷指令前言iOS快捷指令所能做的操作极为有限。假如快捷指令能运行Python程序,那么可操作空间就瞬间变大了。iSH是一款免费的iOS软件,它模拟了一个类似Linux的命令行解释器。我们将在iSH中运行Python程序,然后在快捷指令中获取Python程序的输出。核心逻辑我们用一个“获取当前日期”的Python程序作为演示(其实快捷指令中本身存在“获取当前日期”的操作,因而此需求可以不用Python,这里仅仅为了演示方便),核心代码如下。>>>importtime>>>time.strftime('%Y-%

  6. kafka如何动态消费新增topic主题 - 2

    一、解决痛点使用spring-kafka客户端,每次新增topic主题,都需要硬编码客户端并重新发布服务,操作麻烦耗时长。kafkaListener虽可以支持通配符消费topic,缺点是并发数需要手动改并且重启服务。对于业务逻辑相似场景,创建新主题动态监听可以用kafka-batch-starter组件二、组件能力1、新增topic名称为:auto.topic1(由于配置spring.kafka.consumer.prefix为auto,因此只有auto前缀的topic,才会被组件动态监听。)2、应用输出日志,监听到新增auto.topic1,并初始化客户端(主题刷新间隔为10s)3、发新的消

  7. Spark的常用SQL日期函数 - 2

    一、获取当前时间1、current_date当前日期(年月日)Examples:SELECTcurrent_date;2、current_timestamp/now()当前日期(时间戳)Examples:SELECTcurrent_timestamp;二、从日期字段中提取时间1、year,month,day/dayofmonth,hour,minute,secondExamples:SELECTyear(now());其他的日期函数以此类推month:1day:12(当月的第几天)dayofmonth:12hour,minute,second:分别对应时分秒2、dayofweek、dayofm

  8. ruby-on-rails - 设计 skip_confirmation!未能避免发送确认指令 - 2

    我的应用程序设置为如果用户使用Oauth或Openid登录,他们不必确认他们的电子邮件地址。但是,Devise仍在发送电子邮件确认。当我调用User.skip_confirmation时!我得到一个未定义的方法错误。我的模型:classUserfalsedefself.find_for_facebook_oauth(access_token,signed_in_resource=nil)data=access_token.extra.raw_infoifuser=User.where(:email=>data.email).firstuserelse#User.skip_confirm

  9. ruby - 将异常处理作为 Ruby 中的常用方法 - 2

    有人能告诉我有没有办法将异常处理作为一种通用方法并在方法中使用它?让我进一步解释一下。例如我有以下方法defadd(num1,num2)beginnum1+num2rescueException=>eraiseeendenddefdivide(num1,num2)beginnum1/num2rescueException=>eraiseeendend如您所见,尽管我的方法只需要一行,但由于异常处理代码,该方法变得更大了。我正在寻找的是一个更slim的解决方案,比如(只是一个想法)defadd(num1,num2)num1+num2unlessraise_exceptionenddefd

  10. ruby-on-rails - Rails 路由中的 "mount"指令是什么意思? - 2

    我在Railsroutingsystem中找不到关键字“mount”的含义.我已经设置了Mercury在我的Rails应用程序中使用。它将这一行添加到我的routes.rb配置文件中:Appname::Application.routes.drawdomountMercury::Engine=>'/'mount关键字是什么意思? 最佳答案 Mount在Rails路由中相当于Unixmount。它实际上告诉应用程序该位置存在另一个应用程序(通常是Rack应用程序)。它主要用于Rails引擎。

随机推荐