目录
本文不讲解SpringBoot整合kafka,只列举SpringBoot注解消费kafka消息的多种形式。
/**
* 指定一个消费者组,一个主题主题。
* @param record
*/
@KafkaListener(topics = IPHONE_TOPIC,groupId = APPLE_GROUP)
public void simpleConsumer(ConsumerRecord<String, String> record) {
System.out.println("进入simpleConsumer方法");
System.out.printf(
"分区 = %d, 偏移量 = %d, key = %s, 内容 = %s,创建消息的时间戳 =%d%n",
record.partition(),
record.offset(),
record.key(),
record.value(),
record.timestamp()
);
}
/**
* 指定多个主题。
*
* @param record
*/
@KafkaListener(topics = {IPHONE_TOPIC,IPAD_TOPIC},groupId = APPLE_GROUP)
public void topics(ConsumerRecord<String, String> record) {
System.out.println("进入topics方法");
System.out.printf(
"主题 = %s,分区 = %d, 偏移量 = %d, key = %s, 内容 = %s,创建消息的时间戳 =%d%n",
record.topic(),
record.partition(),
record.offset(),
record.key(),
record.value(),
record.timestamp()
);
}
/**
* 监听一个主题,且指定消费主题的哪些分区。
* 参数详解:消费者组=apple_group;监听主题=iphoneTopic;只消费的分区=1,2;消费者数量=2
* @param record
*/
@KafkaListener(
groupId = APPLE_GROUP,
topicPartitions = {
@TopicPartition(topic = IPHONE_TOPIC, partitions = {"1", "2"})
},
concurrency = "2"
)
public void consumeByPattern(ConsumerRecord<String, String> record) {
System.out.println("consumeByPattern");
System.out.printf(
"主题 = %s,分区 = %d, 偏移量 = %d, key = %s, 内容 = %s,创建消息的时间戳 =%d%n",
record.topic(),
record.partition(),
record.offset(),
record.key(),
record.value(),
record.timestamp()
);
}
/**
* 指定多个分区从哪个偏移量开始消费。
*/
@KafkaListener(
groupId = APPLE_GROUP,
topicPartitions = {
@TopicPartition(
topic = IPAD_TOPIC,
partitions = {"0","1"},
partitionOffsets = {
@PartitionOffset(partition = "2", initialOffset = "10"),
@PartitionOffset(partition = "3", initialOffset = "0"),
}
)
},
concurrency = "10"
)
public void consumeByPartitionOffsets(ConsumerRecord<String, String> record) {
System.out.println("consumeByPartitionOffsets");
System.out.printf(
"主题 = %s,分区 = %d, 偏移量 = %d, key = %s, 内容 = %s,创建消息的时间戳 =%d%n",
record.topic(),
record.partition(),
record.offset(),
record.key(),
record.value(),
record.timestamp()
);
}
/**
* 指定多个主题。参数详解如上面的方法。
* @param record
*/
@KafkaListener(
groupId = APPLE_GROUP,
topicPartitions = {
@TopicPartition(topic = IPHONE_TOPIC, partitions = {"1", "2"}),
@TopicPartition(topic = IPAD_TOPIC, partitions = "1",
partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "5"))
},
concurrency = "4"
)
public void topics2(ConsumerRecord<String, String> record) {
System.out.println("topics2");
System.out.printf(
"主题 = %s,分区 = %d, 偏移量 = %d, key = %s, 内容 = %s,创建消息的时间戳 =%d%n",
record.topic(),
record.partition(),
record.offset(),
record.key(),
record.value(),
record.timestamp()
);
}
/**
* 指定多个消费者组。参数详解如上面的方法。
*
* @param record
*/
@KafkaListeners({
@KafkaListener(
groupId = APPLE_GROUP,
topicPartitions = {
@TopicPartition(topic = IPHONE_TOPIC, partitions = {"1", "2"}),
@TopicPartition(topic = IPAD_TOPIC, partitions = "1",
partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "5"))
},
concurrency = "3"
),
@KafkaListener(
groupId = XM_GROUP,
topicPartitions = {
@TopicPartition(topic = XMPHONE_TOPIC, partitions = {"1", "2"}),
@TopicPartition(topic = XMPAD_TOPIC, partitions = "1",
partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "5"))
},
concurrency = "3"
)
}
)
public void groupIds(ConsumerRecord<String, String> record) {
System.out.println("groupIds");
System.out.println("内容:" + record.value());
System.out.println("分区:" + record.partition());
System.out.println("偏移量:" + record.offset());
System.out.println("创建消息的时间戳:" + record.timestamp());
}
/**
* 设置手动提交偏移量
*
* @param record
*/
@KafkaListener(
topics = IPHONE_TOPIC,
groupId = APPLE_GROUP,
//3个消费者
concurrency = "3"
)
public void setCommitType(ConsumerRecord<String, String> record, Acknowledgment ack) {
System.out.println("setCommitType");
System.out.println("内容:" + record.value());
System.out.println("分区:" + record.partition());
System.out.println("偏移量:" + record.offset());
System.out.println("创建消息的时间戳:" + record.timestamp());
ack.acknowledge();
}
一、什么是MQTT协议MessageQueuingTelemetryTransport:消息队列遥测传输协议。是一种基于客户端-服务端的发布/订阅模式。与HTTP一样,基于TCP/IP协议之上的通讯协议,提供有序、无损、双向连接,由IBM(蓝色巨人)发布。原理:(1)MQTT协议身份和消息格式有三种身份:发布者(Publish)、代理(Broker)(服务器)、订阅者(Subscribe)。其中,消息的发布者和订阅者都是客户端,消息代理是服务器,消息发布者可以同时是订阅者。MQTT传输的消息分为:主题(Topic)和负载(payload)两部分Topic,可以理解为消息的类型,订阅者订阅(Su
TCL脚本语言简介•TCL(ToolCommandLanguage)是一种解释执行的脚本语言(ScriptingLanguage),它提供了通用的编程能力:支持变量、过程和控制结构;同时TCL还拥有一个功能强大的固有的核心命令集。TCL经常被用于快速原型开发,脚本编程,GUI和测试等方面。•实际上包含了两个部分:一个语言和一个库。首先,Tcl是一种简单的脚本语言,主要使用于发布命令给一些互交程序如文本编辑器、调试器和shell。由于TCL的解释器是用C\C++语言的过程库实现的,因此在某种意义上我们又可以把TCL看作C库,这个库中有丰富的用于扩展TCL命令的C\C++过程和函数,所以,Tcl是
假设我有200个昂贵的方法调用(每个都有不同的参数)。出于某种原因,我可以并行执行其中的5个调用,但不能更多。我可以一次执行一个,但一次执行5个要快5倍。我想一直执行五件事。不想排五个,等五个都排完了,再排五个。如果我排队A、B、C、D、E并且C先完成,我想立即用F替换它,即使A和B还没有完成。我一直在研究这个问题,因为我可以想象它会定期发生。解决方案似乎是生产者-消费者模式,Ruby在其标准库中内置了一些用于该模式的结构(Queue和SizedQueue)。我玩过代码示例,阅读了一些文档,我想我对它有一个粗略的了解。但是我有一些问题我对我的解决方案没有信心,而且多线程的整个领域对我来
开门见山|拉取镜像dockerpullelasticsearch:7.16.1|配置存放的目录#存放配置文件的文件夹mkdir-p/opt/docker/elasticsearch/node-1/config#存放数据的文件夹mkdir-p/opt/docker/elasticsearch/node-1/data#存放运行日志的文件夹mkdir-p/opt/docker/elasticsearch/node-1/log#存放IK分词插件的文件夹mkdir-p/opt/docker/elasticsearch/node-1/plugins若你使用了moba,直接右键新建即可如上图所示依次类推创建
文章目录概念索引相关操作创建索引更新副本查看索引删除索引索引的打开与关闭收缩索引索引别名查询索引别名文档相关操作新建文档查询文档更新文档删除文档映射相关操作查询文档映射创建静态映射创建索引并添加映射概念es中有三个概念要清楚,分别为索引、映射和文档(不用死记硬背,大概有个印象就可以)索引可理解为MySQL数据库;映射可理解为MySQL的表结构;文档可理解为MySQL表中的每行数据静态映射和动态映射上面已经介绍了,映射可理解为MySQL的表结构,在MySQL中,向表中插入数据是需要先创建表结构的;但在es中不必这样,可以直接插入文档,es可以根据插入的文档(数据),动态的创建映射(表结构),这就
如果您希望在Spring中启用定时任务功能,则需要在主类上添加 @EnableScheduling 注解。这样Spring才会扫描 @Scheduled 注解并执行定时任务。在大多数情况下,只需要在主类上添加 @EnableScheduling 注解即可,不需要在Service层或其他类中再次添加。以下是一个示例,演示如何在SpringBoot中启用定时任务功能:@SpringBootApplication@EnableSchedulingpublicclassApplication{publicstaticvoidmain(String[]args){SpringApplication.ru
软件特点部署后能通过浏览器查看线上日志。支持Linux、Windows服务器。采用随机读取的方式,支持大文件的读取。支持实时打印新增的日志(类终端)。支持日志搜索。使用手册基本页面配置路径配置日志所在的目录,配置后按回车键生效,下拉框选择日志名称。选择日志后点击生效,即可加载日志。windows路径E:\java\project\log-view\logslinux路径/usr/local/XX历史模式历史模式下,不会读取新增的日志。针对历史文件可以分页读取,配置分页大小、跳转。历史模式下,支持根据关键词搜索。目前搜索引擎使用的是jdk自带类库,搜索速度相对较低,优点是比较简单。2G日志全文搜
HTTP缓存是指浏览器或者代理服务器将已经请求过的资源保存到本地,以便下次请求时能够直接从缓存中获取资源,从而减少网络请求次数,提高网页的加载速度和用户体验。缓存分为强缓存和协商缓存两种模式。一.强缓存强缓存是指浏览器直接从本地缓存中获取资源,而不需要向web服务器发出网络请求。这是因为浏览器在第一次请求资源时,服务器会在响应头中添加相关缓存的响应头,以表明该资源的缓存策略。常见的强缓存响应头如下所述:Cache-ControlCache-Control响应头是用于控制强制缓存和协商缓存的缓存策略。该响应头中的指令如下:max-age:指定该资源在本地缓存的最长有效时间,以秒为单位。例如:Ca
1.依赖导入org.springframework.bootspring-boot-starter-weborg.springframework.bootspring-boot-starter-validation2.validation常用注解@Null被注释的元素必须为null@NotNull被注释的元素不能为null,可以为空字符串@AssertTrue被注释的元素必须为true@AssertFalse被注释的元素必须为false@Min(value)被注释的元素必须是一个数字,其值必须大于等于指定的最小值@Max(value)被注释的元素必须是一个数字,其值必须小于等于指定的最大值@D
如何用IDEA2022创建并初始化一个SpringBoot项目?目录如何用IDEA2022创建并初始化一个SpringBoot项目?0. 环境说明1. 创建SpringBoot项目 2.编写初始化代码0. 环境说明IDEA2022.3.1JDK1.8SpringBoot1. 创建SpringBoot项目 打开IDEA,选择NewProject创建项目。 填写项目名称、项目构建方式、jdk版本,按需要修改项目文件路径等信息。 选择springboot版本以及需要的包,此处只选择了springweb。 此处需特别注意,若你使用的是jdk1