草庐IT

producer-consumer

全部标签

java - LMAX Disruptor - 什么决定了批量大小?

我最近一直在学习LMAXDisruptor并进行了一些实验。令我困惑的一件事是EventHandler的onEvent处理程序方法的endOfBatch参数。考虑我的以下代码。首先,我调用Test1和Test1Worker的虚拟消息和消费者类:publicclassTest1{}publicclassTest1WorkerimplementsEventHandler{publicvoidonEvent(Test1event,longsequence,booleanendOfBatch){try{Thread.sleep(500);}catch(Exceptione){e.printSt

java - 亚马逊 SQS : The same message is consumed by two current consumers

我有四个当前消费者在AmazonAWS上收听同一个队列。从队列中拉取消息时,有时会出现同一条消息被两个不同的消费者消费的情况。请看下面的日志:18:01:46,515[jmsContainer-2]DEBUG-从队列中收到消息:ID:3698a927-930b-4d6a-aeca-f6692252879218:02:12,825[jmsContainer-3]DEBUG-从队列中收到消息:ID:3698a927-930b-4d6a-aeca-f66922528792我有一个包含4个并发使用者的JMS容器设置。我将可见性超时设置为30秒。既然container2收到了消息,怎么conta

生产者消费者的Java实现抛出java.lang.IllegalMonitorStateException

importjava.util.LinkedList;importjava.util.Queue;classProducerextendsPubSubimplementsRunnable{@Overridepublicvoidrun(){synchronized(queue){if(queue.size()==99){try{wait();}catch(InterruptedExceptione){e.printStackTrace();}}queue.add(2);try{Thread.sleep(1000);}catch(InterruptedExceptione){e.print

java - 是否有 Java-8 之前的功能接口(interface)可以替代 java.util.function.Consumer<T>?

为了迁移到Java8,我尝试以有利于使用lambda的方式编写我的代码。我需要一个功能接口(interface),该接口(interface)具有一个方法,该方法采用某种类型的一个参数T并返回void。这是java.util.function.Consumer的accept()方法的签名,但我当然还不能使用它。我可以使用标准Java7(最好是Java6)API中的另一个接口(interface)吗?我知道我可以创建自己的,但尤其是。在将此代码移植到Java8之前,如果我可以使用已经从标准Java6/7API中熟悉的标准接口(interface),那么可读性会更好。到目前为止我发现的最接

java - Consumer<T> 映射到 HashMap 中的 Class<T>

我想创建一个IdentityHashMap,Consumer>.基本上,我想用一个方法映射一个类型,说明如何处理这个类型。我想动态地能够用对象X说,执行Y。我能做到privateIdentityHashMap,Consumer>interceptor=newIdentityHashMap();但这很糟糕,因为我必须在使用它时将对象转换到lamba中。例子:interceptor.put(Train.class,train->{System.out.println(((Train)train).getSpeed());});我想做的是privateIdentityHashMap,Cons

java - 如何编写一个 Consumer 来计算它被调用了多少次?

我需要(主要是出于测试目的)编写一个消费者,其目的是记住它被调用了多少次。但是,我做不到inti=0;Consumerhandler=o->i++;因为i必须是final,我不能增加final变量。我想我需要类似MutableInteger类的东西。那么正确的计数方法是什么?仅仅为此编写我自己的新类或新方法不算是正确的方法。 最佳答案 使用AtomicInteger,它是使用CAS实现的.AtomicInteger有一个incrementAndGet()方法可以用于此目的。知道JDK中有更多的Atomic*变体也很有用,所以如果In

java - Kafka Consumer如何从多个assigned Partition中消费

tl;dr;我试图了解分配了多个分区的单个消费者如何处理到达分区的消费记录。例如:在移动到下一个之前完全处理单个分区。每次从每个分区处理一大块可用记录。从第一个可用分区处理一批N条记录以循环方式处理来自分区的一批N条记录我找到了Ranged或RoundRobin分配器的partition.assignment.strategy配置,但这只决定了消费者如何分配分区,而不是它如何分配从分配给它的分区中消耗。我开始深入研究KafkaConsumer源代码并#poll()带我去#pollForFetches()#pollForFetches()然后带我到fetcher#fetchedRecor

java - 生产者消费者——使用Executors.newFixedThreadPool

我对生产者-消费者模式的理解是,它可以使用生产者和消费者之间共享的队列来实现。生产者将工作提交到共享队列,消费者检索并处理它。也可以通过生产者直接提交给消费者来实现(Producer线程直接提交给Consumer的executor服务)。现在,我一直在研究提供线程池一些常见实现的Executors类。根据规范,newFixedThreadPool方法“重用固定数量的线程,这些线程在共享的无界队列中运行”。他们在这里谈论哪个队列?如果Producer直接提交任务给consumer,ExecutorService的内部队列是否包含Runnables列表?或者它是中间队列,以防生产者提交到共

java - Spring Kafka-用Producer Listener配置KafkaTemplate和用Listenable Future注册回调的区别

所以我在浏览Springkafka文档时遇到了ProducerListener。这是SpringKafka文档所说的-“可选地,您可以使用ProducerListener配置KafkaTemplate以获取包含发送结果(成功或失败)的异步回调,而不是等待Future完成。”他们还指定了接口(interface)-publicinterfaceProducerListener{voidonSuccess(Stringtopic,Integerpartition,Kkey,Vvalue,RecordMetadatarecordMetadata);voidonError(Stringtopi

java - 资源模型具有不明确的(子)资源方法,用于 HTTP 方法 GET 和由“@Consumes”和 "@Produces"注释定义的输入 mime 类型

当它们具有不同的URL时,以下内容如何产生此错误?@Path("/job/{empId}/empProfile")publicEmpProfileResourcedelegateToEventProfileResource(){EmpProfileResourceresource=newEmpProfileResource();locator.inject(resource);returnresource;}@Path("/job/{empId}/empTask")publicEmpTaskResourcegetClientLevelAttendees(@PathParam("clie