前言关于redis我们前面已经讨论过了缓存、分布式锁、分布式唯一标识、LBS服务的用法,这里我们来谈谈利用redis来实现一个消息服务。典型的消息服务是一个生产者和消费者模式的服务。一般是有生产者产生消息,将消息发送到队列中。而消息的消费者则监听消息,对消息进行处理。有很多非常优秀的消息队列服务的产品。例如RabbitMQ、RocketMQ、Kafka等。这些产品都具备非常高级的功能。可靠性、扩展性都非常的好。但是redis自身也能够很简单的实现消息队列的生产者和消费者模式。本文简单介绍一下在Java下是如何实现的。相关命令介绍参考redis官网redis和pubsub模式相关的命令如下。PU
我有一个对对象集合进行操作的任务队列(为了举例,假设这些对象是地址簿中的条目)。一个示例任务可能是“将Joe的电话号码更新为888-555-1212”。队列中可能同时有多个“更新Joe的电话号码...”任务,但电话号码不同。在这种情况下,必须应用更新以确保最后的状态是正确的(不,为了争论,不可能在任务上加上时间戳,在地址簿条目上加上时间戳,然后扔掉过时的任务)。将Jane的更新与Joe的更新乱序应用是安全的。我想对队列进行多线程处理,但需要按人同步访问。有没有适合这种东西的方便的库?还是我只能使用Executor并在Runnable的run()方法中对“name”进行自己的同步?
我创建了一个使用spring4的websockets机制的简单应用程序。我在我的应用程序中使用了一个activemq代理。在我的简单测试中,我为名为“Alejando”的用户创建了10条消息(user/alejandro/queue/greetings)当我使用“Alejando”登录并订阅该队列时:stompClient.subscribe('/user/alejandro/queue/greetings',function(greeting){showGreeting(JSON.parse(greeting.body).content);});我确实收到了为alejandro查询的
我有一种方法可以在while循环中监听UDP数据包。我想在数据包到达时使用不同类中的另一种方法解析数据包,并在应用程序的另一部分对每个数据包进行许多不同的解析和分析。我认为让PacketParser方法在循环外处理Queue会更好。是否可以在数据包进入时将数据包添加到Queue中,然后让应用程序的另一部分在项目进入队列时监听项目并执行其他操作,因为原始while循环保持监听数据包并将它们添加到队列中?我想让另一个函数监视队列并处理数据包,Java中是否有一些东西可以监视Queue或Stack?有更好的方法吗?publicvoidread(StringmulticastIpAddress
如果多个消费者从同一个队列中删除元素,是否有任何阻塞队列的实现可以保证公平的take()操作。我检查了LinkedBlockingQueue、LinkedTransferQueue,看起来它们都是不公平的。ArrayBlockingQueue提供了公平的操作,但它是有界的。 最佳答案 我们可以使用无界队列(如ConcurrentLinked队列)和公平信号量来实现无界公平阻塞队列。下面的类并没有实现BlockingQueue接口(interface)中的所有方法,只是实现了其中的一些用于演示目的。main()方法仅作为测试编写。pu
是否可以通过编程方式查询任务队列API以查看当前有多少任务正在执行/待处理?我在API中看不到任何执行此操作的方法,因此我求助于在数据存储区中创建对象来表示排队的任务。运行时,任务会从数据存储中删除相应的条目。如您所想,这很容易不同步。实际上,如果能够简单地计算给定队列名称的队列中的任务数量,我会非常高兴。 最佳答案 遗憾的是,没有可用于获取有关任务队列的信息的API。但是,我相信团队在未来会考虑到这一点(一个用于获取我们当前在仪表板上看到的统计信息的编程接口(interface),例如任务计数)。
我已经尝试在horntQ中使用PersistentQueue。我做了两个单独的例子(生产者,消费者)。我的消费者运行良好,但生产者花费太多时间来完成发送消息。我分别跑过和一起跑过。可能是什么问题呢?我的代码是:publicclassHornetProducerimplementsRunnable{Contextic=null;ConnectionFactorycf=null;Connectionconnection=null;Queuequeue=null;Sessionsession=null;MessageProducerpublisher=null;TextMessagemess
我创建了一个HashMap对象,它存储一个String作为键,相应的值作为int。现在我想要一个优先级队列,它具有HashMap对象中存在的所有字符串,其值作为分配优先级的引用。我写了下面的代码publicclassURIQueue{privateHashMapCopyQURI;privatePriorityQueueQURI;publicclassTComparator{publicintcompareTo(Strings1,Strings2){if(CopyQURI.get(s2)-CopyQURI.get(s1)>=0){return1;}else{return0;}}}publ
我目前正在将JMS支持添加到类似应用程序服务器的框架中。JMS将由HornetQ(独立代理,服务器类路径上的hornetqjar)实现,但既没有JBoss,也没有spring,也没有其他任何可以提供MDB的东西。下一步是将消息监听器添加到xa队列,以允许并行处理传入消息。有些消息会启动长时间运行的任务,因此基本思想是从onMessage方法生成工作线程。在漫长的互联网旅程中,我遇到了thisdiscussion,其中一位参与者提到,他不会这样做,而是使用一个额外的内部队列来完成任务:然后(单线程)消息监听器将简单地从入站队列中获取消息并为内部队列创建新消息,其中在该内部队列的另一端,一
我需要能够更改ThreadPoolExecutor任务队列的大小.当然,BlockingQueue不支持改变大小,ThreadPoolExecutor也不支持改变队列。所以,我想出的方法是使用ThreadPoolExecutor.shutdownNow(),这会返回一个尚未执行的Runnable列表。然后我可以创建一个具有所需队列大小的新执行程序并重新提交所有任务。问题在于调用shutdownNow()时正在进行的任务。据我从javadoc中得知,执行程序将在当前执行任务的所有线程上调用Thread.interrupt()。我不希望我的任务被杀死。这个问题可能是询问如何编写我的任务以使