草庐IT

Springboot整合kafka

全部标签

java - Kafka 0.11 中 sendOffsetsToTransaction 的含义

新的Kafka版本(0.11)支持exactly-once语义。https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging我在Java中使用kafka事务代码设置了一个生产者,就像这样。producer.initTransactions();try{producer.beginTransaction();for(ProducerRecordrecord:payload){producer.send(record);}Mapgrou

java - 如何找到哪个消费者分配给kafka中主题的哪个分区?

我正在构建一个kafka管理器工具,我需要检查哪个主题分区分配给了消费者组中的哪个消费者。假设有消费者组group-A消费主题topic-A,n个分区,那么在group-A托管在不同的VM中。那么如何找到哪个分区分配给哪个消费者主机呢?在kafka0.9.1中可以吗?提前致谢。 最佳答案 您可以检查$KAFKA_HOME/bin/kafka-consumer-groups.sh的工作原理并将其实现集成到您的kafka管理器工具中,该工具将向您展示详细信息组所有者信息(例如,分区分配、滞后、IP)。小组主题分区CURRENT-OFFS

java - 如何使用 Java 中的结构化流从 Kafka 反序列化记录?

我使用Spark2.1。我正在尝试使用SparkStructuredStreaming从Kafka读取记录,反序列化它们并在之后应用聚合。我有以下代码:SparkSessionspark=SparkSession.builder().appName("Statistics").getOrCreate();Datasetdf=spark.readStream().format("kafka").option("kafka.bootstrap.servers",kafkaUri).option("subscribe","Statistics").option("startingOffset

java - 如何连接 Apache Kafka 和 Amazon S3?

我想使用KafkaConnect将来自Kafka的数据存储到存储桶s3中。我已经运行了一个Kafka的主题,并且创建了一个存储桶s3。我的主题有关于Protobuffer的数据,我尝试使用https://github.com/qubole/streamx我得到了下一个错误:[2018-10-0413:35:46,512]INFORevokingpreviouslyassignedpartitions[]forgroupconnect-s3-sink(org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:280)[

java - 在Java中创建之前检查kafka中是否存在主题

我正在尝试使用以下方法在kafka0.8.2中创建一个主题:AdminUtils.createTopic(zkClient,myTopic,2,1,properties);如果我在本地多次运行代码进行测试,则会失败,因为主题已经创建。有没有办法在创建主题之前检查主题是否存在?TopicCommandapi似乎没有为listTopics或describeTopic返回任何内容. 最佳答案 您可以使用kakfa-client版本0.11.0.0的AdminClient示例代码:Propertiesconfig=newProperties

java - SpringBoot 2.1.x中使用RestTemplateBuilder的requestFactory自定义RestTemplate不向下兼容1.5.x版本

在SpringBoot1.5.x中,我创建了一个自定义RestTemplate,如下所示:@BeanpublicRestTemplaterestTemplate(RestTemplateBuilderrestTemplateBuilder){PoolingHttpClientConnectionManagerpoolingConnectionManager=newPoolingHttpClientConnectionManager();poolingConnectionManager.setMaxTotal(restTemplateProps.getMaxConnectionsPerP

java - 我应该如何将 JpaRepository.findOne() 与 SpringBoot 一起使用?

我刚开始通过阅读SpringBootinAction这本书来学习SpringBoot,我正在学习这本书的示例,尝试自己运行它们,但我有一个使用JpaRepository.findOne()时出现问题。我已经遍历了整章来寻找我可能的不匹配之处。但是,它就是行不通。该项目应该是一个简单的阅读列表。代码如下:读者@Entity:packagecom.lixin.readinglist;importorg.springframework.data.annotation.Id;importorg.springframework.security.core.GrantedAuthority;imp

SpringBoot使用Gateway聚合Springdoc,Knife4j

SpringBoot使用Gateway聚合Springdoc,Knife4j前言同时支持springboot:3.0,springboot:2.0,使用gateway聚合springdoc,ui使用knife4j,解决由于nginx配置代理前缀导致的文档无法访问,不强依赖注册中心(nacos,zk,Eureka)有帮助的话记得点个赞哟!!!基础环境将所有依赖集成好作为一个本地包供其他项目使用jdk17maven3.6+springboot3.0+|springboot2.0+springcloud:2022.0.1springcloud-alibaba:1.8.1-2022.0.0-RC2sp

java - 在kafka中创建多少生产者?

在一个高容量的实时javaweb应用程序中,我正在向apachekafka发送消息.目前我正在向单个主题发送消息,但将来我可能需要向多个主题发送消息。在这种情况下,我不确定是否应该为每个主题创建一个制作人,还是应该为我的所有主题使用一个制作人?这是我的代码:props=newProperties();props.put("zk.connect",:,:,:);props.put("zk.connectiontimeout.ms","1000000");props.put("producer.type","async");Producerproducer=newkafka.javaapi

java - 如何在Springboot Restcontroller中使用PUT方法?

我正在使用Springboot开发应用程序。我尝试使用所有表示动词,如GET、POST、DELETE,它们都工作正常。通过使用PUT方法,它在springboot中不支持。我是否需要添加任何新配置。Put方法仅适用于没有任何参数的请求。如果我添加任何查询参数或表单数据,它就不起作用。请任何专家帮助我解决这个问题。@RequestMapping("/student/info")@RequestMapping(method=RequestMethod.PUT)public@ResponseBodyStringupdateStudent(@RequestParam(value="stdNam