草庐IT

同步到Kafka

全部标签

java - 当我以静态方式同步块(synchronized block)调用 wait() 时,为什么 Java 抛出 java.lang.IllegalMonitorStateException?

我不明白为什么Java会在这段代码中从主题中抛出异常。有人能给我解释一下吗?classWaitimplementsRunnable{publicvoidrun(){synchronized(Object.class){try{while(true){System.out.println("Beforewait()");wait();System.out.println("Afterwait()");}}catch(InterruptedExceptione){e.printStackTrace();}}}}publicclassObjectMethodInConcurency{publ

Spring-Kafka 3.0 消费者消费失败处理方案

一、背景我们作为Kafka在使用Kafka是,必然考虑消息消费失败的重试次数,重试后仍然失败如何处理,要么阻塞,要么丢弃,或者保存二、设置消费失败重试次数1默认重试次数在哪里看Kafka3.0版本默认失败重试次数为10次,准确讲应该是1次正常调用+9次重试,这个在这个类可以看到org.springframework.kafka.listener.SeekUtils2如何修改重试次数据我的实验,spring-kafka3.0版本通过application.yml配置是行不通的,也没有找到任何一项配置可以改重试次数的(网上很多说的通过配置spring.kafka.consumer.retries可

java - 构造函数中的同步块(synchronized block)有什么用?

我们不能使构造函数synchronized,但可以在构造函数中编写synchronized。什么情况下会出现这样的要求?我被逗乐了。packagecom.simple;publicclassTest{publicTest(){synchronized(this){System.out.println("Iamcalled...");}}publicstaticvoidmain(String[]args){Testtest=newTest();System.out.println(""+test);}@OverridepublicStringtoString(){return"Test[

java - 具有解码器问题的 Kafka Avro 消费者

当我尝试运行KafkaConsumerwithAvro时在我各自模式的数据上,它返回错误“AvroRuntimeException:Malformeddata.Lengthisnegative:-40”。我看到其他人也有类似的问题convertingbytearraytojson,Avrowriteandread,和KafkaAvroBinary*coder.我也引用了这个ConsumerGroupExample,这些都有帮助,但到目前为止对这个错误没有帮助。它一直工作到这部分代码(第73行)解码器decoder=DecoderFactory.get().binaryDecoder(b

springboot、spring-kafka、kafka-client的版本对应关系

在使用springboot集成kafka的时候需要注意springboot版本、引用的依赖spring-kafka版本和kafka中间件版本的对应关系,否则可能会因为版本不兼容导致出现错误。1、含义说明(摘自官网)SpringBoot:是springboot的版本。SpringforApacheKafkaVersion:是springboot集成的spring-kafka的版本,如org.springframework.kafkaspring-kafka2.6.8springIntegrationforApacheKafkaVersion:是springboot集成的spring-integr

Kafka-消费者-KafkaConsumer分析-offset操作

提交offset在进行消费者正常消费过程中以及Rebalance操作开始之前,都会提交一次offset记录Consumer当前的消费位置。提交offset的功能也是由ConsumerCoordinator实现的。先来了解OffsetCommitRequest和OffsetCommitResponse的消息体格式,如图所示。OffsetCommitRequest中各个字段的含义如表所示。OffsetCommitResponse中各个字段的含义如表所示。图展示了ConsumerCoordinator中与提交offset相关的四个方法以及它们之间的调用关系。在SubscriptionState中使用

java - 如何在 setter 在 Java 中工作时同步 getter

我有一个多线程应用程序,它使用一个提供列表的静态类。我希望静态类的getter可以自由工作(不相互同步)但是当setter工作时我希望所有getter都被锁定并等待setter的工作完成。我不想在一起调用setter/getter时锁定setter/getter,因为这会大大降低性能。Getter每天被调用1,000,000次,而setter每天只应该工作一次。 最佳答案 考虑使用java.util.concurrent.locks.ReadWriteLock实现,例如ReentrantReadWriteLock(参见javadoc

java - Kafka 0.9 如何在使用 KafkaConsumer 手动提交偏移量时重新使用消息

我正在编写一个消费者,一旦将一系列记录提交给Mongo,它就会手动提交偏移量。在出现Mongo错误或任何其他错误的情况下,会尝试将记录保存到错误处理集合中以便日后重播。如果Mongo宕机,那么我希望消费者在尝试从Kakfa的未提交偏移量中读取记录之前停止处理一段时间。下面的示例有效,但我想知道这种情况的最佳做法是什么?while(true){booleancommit=false;try{ConsumerRecordsrecords=consumer.poll(consumerTimeout);kafkaMessageProcessor.processRecords(records);

java - 如何在kafka 0.9.0中使用多线程消费者?

kafka的文档给出了一个关于以下描述的方法:OneConsumerPerThread:Asimpleoptionistogiveeachthreaditsownconsumer>instance.我的代码:publicclassKafkaConsumerRunnerimplementsRunnable{privatefinalAtomicBooleanclosed=newAtomicBoolean(false);privatefinalCloudKafkaConsumerconsumer;privatefinalStringtopicName;publicKafkaConsumerR

Synchronized详解、同步互斥自旋锁分析及MonitorJVM底层实现原理

状态对象如果一个对象有被修改的成员变量被称为有状态的对象相反如果没有可被修改的成员变量称为无状态的对象。示例:publicclassMyThreadTest{publicstaticvoidmain(String[]args){Runnabler=newMyThread();Threadt1=newThread(r);Threadt2=newThread(r);t1.start();t2.start();}}classMyThreadimplementsRunnable{/***如果一个对象有被修改的成员变量被称为有状态的对象*相反如果没有可被修改的成员变量称为无状态的对象**由于两个线程同时