草庐IT

消费者Consumer

全部标签

java - Kafka Java 消费者被标记为组死亡

我正在使用Java消费者来消费来自主题(kafka版本0.10.0.1)的消息,如果我在docker容器之外运行它们,它会正常工作。但是,当我在docker容器中执行它们时,这些组将被标记为已死亡并显示消息Markingthecoordinatorlocal.kafka.com:9092(id:2147483647rack:null)deadforgroupmy-group我的消费者配置如下:-metadata.max.age.ms=300000partition.assignment.strategy=[org.apache.kafka.clients.consumer.RangeA

java - session 事务消费者或生产者中的消息代理异常处理

我想使用SAGA我的SpringBoot微服务中的模式。例如,在客户订单中,当订单创建时,会产生一个类似OrderCreatedEvent的事件,然后在客户微服务中OrderCreatedEvent上的监听器更新客户信用并产生CreditUpdateEvent和...。我使用session事务处理JmsTemplate来生成事件。在JmsTemplate的javadoc中表示JMS事务在主事务之后提交:ThishastheeffectofalocalJMStransactionbeingmanagedalongsidethemaintransaction(whichmightbeana

java - RabbitMQ 暂停队列消费

保留持久队列及其绑定(bind)但暂停其消费者的最佳方法是什么?用例是:如果我们不断收到一堆我们无法处理的消息(例如数据库已关闭或模式问题),我想“让它崩溃”并停止处理消息,但我想继续聚合到队列中。即允许发布但暂停消费。我可以想到三种解决方案:我可以让绑定(bind)到队列的所有消费者不断拒绝消息并重新排队,但这是一种资源浪费,更不用说我已经以编程方式执行上述逻辑。我可以对所有消费者调用basic.cancelConsumer(见下文)或以spring-amqp表示我想我可以在所有SimpleMessageListenerContainers上调用shutdown绑定(bind)到队列

java - 消费本地EJB,同一个Container,不同耳

我正在尝试在同一个Glassfish中使用本地EJB,但耳朵不同。但是Glassfish找不到本地EJB或者不能消费我读了这个:根据JavaEE教程,@Localbean的客户端“必须在与其访问的企业bean相同的JVM中运行。”第一耳,我在jar中有本地接口(interface)@LocalpublicinterfaceMyLocalBean{intgetNumber(intnum3);}在另一个jar里,我有实现@Stateless@LocalBeanpublicclassMyLocalBeanImplimplementsMyLocalBean,Serializable{publi

java - 具有独立消费者的单个 InputStream 的并发处理

我需要生成N个消费者线程,它们同时处理相同的InputStream,例如-以某种方式对其进行转换,计算校验和或数字签名等。这些消费者彼此不依赖,并且都在使用第三方库,这接受InputStream作为数据源。所以我能做的是-创建一些InputStream的实现,这将从“父”流中读取数据block解锁消费者等到每个消费者都读完整个block阅读下一段虽然看起来很简单,但当某些消费者死亡时,可能会引发各种问题,例如活锁,实现所有InputStream方法,使用屏障/锁存器控制消费者自己的fork/join等。一个friend告诉我,实现需要半个小时,这让我度过了一个晚上。我宁愿使用足够成熟的

java - Camel 和 JMS 以正确的顺序从高级队列中消费消息

我在将ApacheCamel与OracleAdvancedQueues和JMS结合使用时遇到问题。这是关于分发消息的应用程序。在Camel的帮助下,消息在OracleAdvancedQueues中接收和排队。然后它们被Camel消耗并转发到目标系统。对于消息传递失败的情况,在AdvancedQueue中定义了一个重试次数,使消息重新传递。如果Camel现在将消息出列并将其发送到不可用的目标系统,则会抛出HttpOperationFailedException或NoSuchEndpointException。这些被捕获并执行回滚。此时,期望按照重试计数中定义的频率重试消息传递,然后将其移

java - Kafka Consumer如何从多个assigned Partition中消费

tl;dr;我试图了解分配了多个分区的单个消费者如何处理到达分区的消费记录。例如:在移动到下一个之前完全处理单个分区。每次从每个分区处理一大块可用记录。从第一个可用分区处理一批N条记录以循环方式处理来自分区的一批N条记录我找到了Ranged或RoundRobin分配器的partition.assignment.strategy配置,但这只决定了消费者如何分配分区,而不是它如何分配从分配给它的分区中消耗。我开始深入研究KafkaConsumer源代码并#poll()带我去#pollForFetches()#pollForFetches()然后带我到fetcher#fetchedRecor

java - gwt 的 openid 消费者

有人知道或知道gwt/gae的openid依赖方(消费者)的java实现吗?openid4java和joid为我的需要带来了太多的负担。 最佳答案 自版本1.3.4起,GoogleAppEngine支持通过开箱即用的OpenID(除了OAuth)进行身份验证。查看申请注册页面。OpenID被引用为es“联合登录”,目前仍标记为[Experimental]...但它正在工作...而且API看起来几乎太简单了,不可能是真实的;-)在http://super-easy.appspot.com上运行着一个GWT演示应用程序

java - 生产者消费者——使用Executors.newFixedThreadPool

我对生产者-消费者模式的理解是,它可以使用生产者和消费者之间共享的队列来实现。生产者将工作提交到共享队列,消费者检索并处理它。也可以通过生产者直接提交给消费者来实现(Producer线程直接提交给Consumer的executor服务)。现在,我一直在研究提供线程池一些常见实现的Executors类。根据规范,newFixedThreadPool方法“重用固定数量的线程,这些线程在共享的无界队列中运行”。他们在这里谈论哪个队列?如果Producer直接提交任务给consumer,ExecutorService的内部队列是否包含Runnables列表?或者它是中间队列,以防生产者提交到共

Kafka篇——SpringBoot中使用Kafka,详细的集成和简单生产消费流程流程,常见消息配置,黄金文档!

集成和简单生产消费流程一、引入依赖二、配置文件中配置Kafka将来我们的项目大概率不会是会都扮演生产者和消费者两个角色,所以在集成Kafka的时候,生产者的项目中只配置生产者相关的配置即可,消费者项目配置消费者的相关的配置即可三、编写生产者代码为了简化演示,直接将业务层代码写到了控制层,见谅哈!四、编写消费者注意:如果不调用手动提交offset这个方法,那么会产生消息重复消费的问题五、调用生产者的接口,观察消费者是否正常消费到消息1、调用生产者接口2、观察控制台消费者可以看到生产者发送了消息,消费者立刻就拿到了消息!消费消息细节配置一、指定Broker的主题和分区,控制消费者数量和消费偏移量二