我正在尝试使用 Spring Integration 和 ActiveMQ 消息代理配置 JMS。我的出站 channel 应该由 JDBC 消息存储支持,以防止数据丢失,例如经纪人或我的应用程序离线。
到目前为止,我的配置似乎有效,但 JDBC 消息存储的行为并不像我预期的那样。如果我断开代理,发送到出站 channel 的消息将按预期保留,但在重新连接后它们保留在数据库中并且不会发送到队列。但是,我在重新连接后发送的更多消息到达队列,如果我重新启动我的应用程序,持久消息也会最终发送......
application-context.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:jms="http://www.springframework.org/schema/jms"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-jms="http://www.springframework.org/schema/integration/jms"
xmlns:oxm="http://www.springframework.org/schema/oxm"
xmlns:j2ee="http://www.springframework.org/schema/jee"
xmlns:amq="http://activemq.apache.org/schema/core"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/oxm http://www.springframework.org/schema/oxm/spring-oxm.xsd
http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee.xsd
http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
<!-- ActiveMQ Configuration -->
<jms:annotation-driven/>
<int:annotation-config/>
<!-- Factory for ActiveMQ connections from JNDI -->
<j2ee:jndi-lookup jndi-name="java:comp/env/jms/connectionFactory"
id="jmsConnectionFactory"
expected-type="org.apache.activemq.ActiveMQConnectionFactory"
proxy-interface="javax.jms.ConnectionFactory"
lookup-on-startup="false"
resource-ref="true"
cache="true"/>
<bean id="connectionFactory" class="org.apache.activemq.jms.pool.PooledConnectionFactory" init-method="start" destroy-method="stop">
<property name="connectionFactory" ref="jmsConnectionFactory"/>
<property name="maxConnections" value="8"/>
<property name="reconnectOnException" value="true"/>
</bean>
<!-- JAXB-marshaller from spring-oxm used to handle XML-payload of messages -->
<oxm:jaxb2-marshaller id="jax2bMarshaller" context-path="de.br.ecomx"/>
<!-- message-converter -->
<bean id="messageConverter" class="org.springframework.jms.support.converter.MarshallingMessageConverter">
<property name="marshaller" ref="jax2bMarshaller"/>
<property name="unmarshaller" ref="jax2bMarshaller"/>
</bean>
<!-- Message Store implementation -->
<!-- Persists messages temporariliy in DB to prevent data loss on server-shutdown -->
<!-- Has to be injected specifically in Queue Channels -->
<bean id="store" class="org.springframework.integration.jdbc.store.JdbcChannelMessageStore">
<property name="dataSource" ref="dataSource"/>
<property name="channelMessageStoreQueryProvider">
<bean id="queryProvider" class="org.springframework.integration.jdbc.store.channel.MySqlChannelMessageStoreQueryProvider"/>
</property>
<property name="usingIdCache" value="true"/>
<property name="region" value="BWH"/>
<property name="tablePrefix" value="BWH_"/>
</bean>
<!-- default poller for (non-message-driven) channel-adapters -->
<int:poller id="defaultPoller" default="true" fixed-delay="500">
<int:transactional propagation="REQUIRED" isolation="DEFAULT" transaction-manager="transactionManager"/>
</int:poller>
<!-- LOGGING -->
<!--<int:logging-channel-adapter id="logger" level="DEBUG" log-full-message="true"/>-->
<!-- CHANNEL DEFINITIONS -->
<!-- intermediate channel for message enrichment -->
<int:channel id="output_enricher_channel"><int:queue/></int:channel>
<!-- general output-channel - specific channel is choosed by a router -->
<int:channel id="output_router"><int:queue/></int:channel>
<!-- channel definitions connected to consumer queues -->
<int:channel id="bwh_articleMasterData_input_channel"><int:queue/></int:channel>
<int:channel id="bwh_transferOrder_input_channel"><int:queue/></int:channel>
<!-- add your channel here... -->
<!-- channel definitions connected to producer queues -->
<int:channel id="ax_transferOrderPacked_input_channel"><int:queue message-store="store"/></int:channel>
<!-- add your channel here... -->
<!-- MESSAGE HANDLING -->
<!-- Enriches the 'messageId'-property of payload with 'id'-value from header and sends them to router -->
<int:enricher input-channel="output_enricher_channel" output-channel="output_router">
<int:property name="messageId" expression="headers.id"/>
</int:enricher>
<!-- routing of outgoing messages to their type-specific channel -->
<int:payload-type-router input-channel="output_router">
<int:mapping type="de.br.ecomx.TransferOrdersPacked" channel="ax_transferOrderPacked_input_channel"/>
</int:payload-type-router>
<!-- ENDPOINTS -->
<!-- endpoint configurations connecting queues with channels -->
<int-jms:message-driven-channel-adapter id="articleMasterDataConsumerChannelAdapter" connection-factory="connectionFactory"
destination-name="${queue.bwh.articlemasterdata.in}" channel="bwh_articleMasterData_input_channel"
message-converter="messageConverter" acknowledge="client" max-concurrent-consumers="10"
auto-startup="false"/>
<int-jms:message-driven-channel-adapter id="transferOrderConsumerChannelAdapter" connection-factory="connectionFactory"
destination-name="${queue.bwh.transferorder.in}" channel="bwh_transferOrder_input_channel"
message-converter="messageConverter" acknowledge="client" max-concurrent-consumers="5"
auto-startup="false"/>
<int-jms:outbound-channel-adapter id="transferOrderPackedProducerChannelAdapter" connection-factory="connectionFactory"
destination-name="${queue.ax.transferorderpacked.in}" channel="ax_transferOrderPacked_input_channel"
message-converter="messageConverter" delivery-persistent="true" session-transacted="true"
auto-startup="false"/>
<!-- add your endpoint here... -->
<!-- REMEMBER: If auto-startup is set to false (which is recommended for all channel adapters) -->
<!-- the channel adapter has to be registered in ecomxConsumerChannelAdapterContainer below!!! -->
<!-- gateway definitions usually injected by producer-services and used to send messages to an outbound-channel-adapter -->
<int:gateway default-request-channel="output_enricher_channel" service-interface="de.lo.bwh.jms.gateway.EcomxProducerGateway"/>
<!-- connect endpoints with services -->
<int:service-activator input-channel="bwh_articleMasterData_input_channel" ref="ecomxArticleMasterDataConsumer"/>
<int:service-activator input-channel="bwh_transferOrder_input_channel" ref="ecomxTransferOrderConsumer"/>
<!-- container for endpoints used for start / stop all registered consumers / producers at once -->
<bean id="ecomxConsumerChannelAdapterContainer" class="de.lo.bwh.jms.EcomxConsumerChannelAdapterContainer">
<property name="endpoints">
<list value-type="org.springframework.integration.jms.JmsMessageDrivenEndpoint">
<ref bean="articleMasterDataConsumerChannelAdapter"/>
<ref bean="transferOrderConsumerChannelAdapter"/>
</list>
</property>
</bean>
<bean id="ecomxProducerChannelAdapterContainer" class="de.lo.bwh.jms.EcomxProducerChannelAdapterContainer">
<property name="endpoints">
<list value-type="org.springframework.integration.endpoint.PollingConsumer">
<ref bean="transferOrderPackedProducerChannelAdapter"/>
</list>
</property>
</bean>
</beans>
context.xml
<Resource name="jms/connectionFactory"
auth="Container"
type="org.apache.activemq.ActiveMQConnectionFactory"
description="JMS Connection Factory"
factory="org.apache.activemq.jndi.JNDIReferenceFactory"
brokerURL="tcp://my.url.net:61616"
brokerName="myActiveMQBroker"/>
我正在使用 EcomxProducerGateway 发送消息。它将消息发送到 output_enricher_channel,后者将它们转发到 output_router,后者按负载类型将消息路由到正确的出站 channel 。
断开代理时,几秒钟后会记录几个异常。以下堆栈跟踪可能会有所帮助:
[WARN 08:37:07] SingleConnectionFactory.onException(322) | Encountered a JMSException - resetting the underlying JMS Connection
javax.jms.JMSException: Operation timed out
at org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:54)
at org.apache.activemq.ActiveMQConnection.onAsyncException(ActiveMQConnection.java:1997)
at org.apache.activemq.ActiveMQConnection.onException(ActiveMQConnection.java:2016)
at org.apache.activemq.transport.TransportFilter.onException(TransportFilter.java:101)
at org.apache.activemq.transport.ResponseCorrelator.onException(ResponseCorrelator.java:126)
at org.apache.activemq.transport.TransportFilter.onException(TransportFilter.java:101)
at org.apache.activemq.transport.TransportFilter.onException(TransportFilter.java:101)
at org.apache.activemq.transport.WireFormatNegotiator.onException(WireFormatNegotiator.java:160)
at org.apache.activemq.transport.AbstractInactivityMonitor.onException(AbstractInactivityMonitor.java:314)
at org.apache.activemq.transport.TransportSupport.onException(TransportSupport.java:96)
at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:200)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.SocketException: Operation timed out
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
at java.net.SocketInputStream.read(SocketInputStream.java:170)
at java.net.SocketInputStream.read(SocketInputStream.java:141)
at org.apache.activemq.transport.tcp.TcpBufferedInputStream.fill(TcpBufferedInputStream.java:50)
at org.apache.activemq.transport.tcp.TcpTransport$2.fill(TcpTransport.java:609)
at org.apache.activemq.transport.tcp.TcpBufferedInputStream.read(TcpBufferedInputStream.java:58)
at org.apache.activemq.transport.tcp.TcpTransport$2.read(TcpTransport.java:594)
at java.io.DataInputStream.readInt(DataInputStream.java:387)
at org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:258)
at org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:221)
at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:213)
at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:196)
... 1 more
[ERROR 08:37:07] LoggingHandler.handleMessageInternal(145) | org.springframework.jms.connection.SynchedLocalTransactionFailedException: Local JMS transaction failed to commit; nested exception is javax.jms.JMSException: Operation timed out
at org.springframework.jms.connection.ConnectionFactoryUtils$JmsResourceSynchronization.processResourceAfterCommit(ConnectionFactoryUtils.java:424)
at org.springframework.jms.connection.ConnectionFactoryUtils$JmsResourceSynchronization.processResourceAfterCommit(ConnectionFactoryUtils.java:404)
at org.springframework.transaction.support.ResourceHolderSynchronization.afterCommit(ResourceHolderSynchronization.java:85)
at org.springframework.transaction.support.TransactionSynchronizationUtils.invokeAfterCommit(TransactionSynchronizationUtils.java:133)
at org.springframework.transaction.support.TransactionSynchronizationUtils.triggerAfterCommit(TransactionSynchronizationUtils.java:121)
at org.springframework.transaction.support.AbstractPlatformTransactionManager.triggerAfterCommit(AbstractPlatformTransactionManager.java:954)
at org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:799)
at org.springframework.transaction.support.AbstractPlatformTransactionManager.commit(AbstractPlatformTransactionManager.java:726)
at org.springframework.transaction.interceptor.TransactionAspectSupport.commitTransactionAfterReturning(TransactionAspectSupport.java:515)
at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:291)
at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:96)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:207)
at com.sun.proxy.$Proxy484.call(Unknown Source)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller$1.run(AbstractPollingEndpoint.java:298)
at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:52)
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:49)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:292)
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: javax.jms.JMSException: Operation timed out
at org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:54)
at org.apache.activemq.ActiveMQConnection.syncSendPacket(ActiveMQConnection.java:1420)
at org.apache.activemq.TransactionContext.syncSendPacketWithInterruptionHandling(TransactionContext.java:761)
at org.apache.activemq.TransactionContext.commit(TransactionContext.java:327)
at org.apache.activemq.ActiveMQSession.commit(ActiveMQSession.java:574)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.springframework.jms.connection.CachingConnectionFactory$CachedSessionInvocationHandler.invoke(CachingConnectionFactory.java:384)
at com.sun.proxy.$Proxy602.commit(Unknown Source)
at org.springframework.jms.connection.JmsResourceHolder.commitAll(JmsResourceHolder.java:184)
at org.springframework.jms.connection.ConnectionFactoryUtils$JmsResourceSynchronization.processResourceAfterCommit(ConnectionFactoryUtils.java:421)
... 27 more
Caused by: java.net.SocketException: Operation timed out
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
at java.net.SocketInputStream.read(SocketInputStream.java:170)
at java.net.SocketInputStream.read(SocketInputStream.java:141)
at org.apache.activemq.transport.tcp.TcpBufferedInputStream.fill(TcpBufferedInputStream.java:50)
at org.apache.activemq.transport.tcp.TcpTransport$2.fill(TcpTransport.java:609)
at org.apache.activemq.transport.tcp.TcpBufferedInputStream.read(TcpBufferedInputStream.java:58)
at org.apache.activemq.transport.tcp.TcpTransport$2.read(TcpTransport.java:594)
at java.io.DataInputStream.readInt(DataInputStream.java:387)
at org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:258)
at org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:221)
at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:213)
at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:196)
... 1 more
[ERROR 08:37:08] LoggingHandler.handleMessageInternal(145) | org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.jms.JmsSendingMessageHandler#0]; nested exception is org.springframework.jms.UncategorizedJmsException: Uncategorized exception occured during JMS processing; nested exception is javax.jms.JMSException: Could not connect to broker URL: tcp://my.url.net:61616. Reason: java.net.UnknownHostException: my.url.net
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:84)
at org.springframework.integration.endpoint.PollingConsumer.handleMessage(PollingConsumer.java:74)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:219)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.access$000(AbstractPollingEndpoint.java:55)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:149)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:146)
at sun.reflect.GeneratedMethodAccessor464.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:317)
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:190)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157)
at org.springframework.transaction.interceptor.TransactionInterceptor$1.proceedWithInvocation(TransactionInterceptor.java:99)
at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:281)
at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:96)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:207)
at com.sun.proxy.$Proxy484.call(Unknown Source)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller$1.run(AbstractPollingEndpoint.java:298)
at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:52)
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:49)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:292)
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.springframework.jms.UncategorizedJmsException: Uncategorized exception occured during JMS processing; nested exception is javax.jms.JMSException: Could not connect to broker URL: tcp://my.url.net:61616. Reason: java.net.UnknownHostException: my.url.net
at org.springframework.jms.support.JmsUtils.convertJmsAccessException(JmsUtils.java:316)
at org.springframework.jms.support.JmsAccessor.convertJmsAccessException(JmsAccessor.java:169)
at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:496)
at org.springframework.jms.core.JmsTemplate.send(JmsTemplate.java:579)
at org.springframework.jms.core.JmsTemplate.convertAndSend(JmsTemplate.java:705)
at org.springframework.integration.jms.JmsSendingMessageHandler.send(JmsSendingMessageHandler.java:145)
at org.springframework.integration.jms.JmsSendingMessageHandler.handleMessageInternal(JmsSendingMessageHandler.java:115)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
... 31 more
Caused by: javax.jms.JMSException: Could not connect to broker URL: tcp://my.url.net:61616. Reason: java.net.UnknownHostException: my.url.net
at org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:36)
at org.apache.activemq.ActiveMQConnectionFactory.createActiveMQConnection(ActiveMQConnectionFactory.java:360)
at org.apache.activemq.ActiveMQConnectionFactory.createActiveMQConnection(ActiveMQConnectionFactory.java:305)
at org.apache.activemq.ActiveMQConnectionFactory.createConnection(ActiveMQConnectionFactory.java:245)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:317)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:201)
at com.sun.proxy.$Proxy156.createConnection(Unknown Source)
at org.springframework.jms.connection.SingleConnectionFactory.doCreateConnection(SingleConnectionFactory.java:365)
at org.springframework.jms.connection.SingleConnectionFactory.initConnection(SingleConnectionFactory.java:305)
at org.springframework.jms.connection.SingleConnectionFactory.getConnection(SingleConnectionFactory.java:283)
at org.springframework.jms.connection.SingleConnectionFactory.createConnection(SingleConnectionFactory.java:224)
at org.springframework.jms.support.JmsAccessor.createConnection(JmsAccessor.java:180)
at org.springframework.jms.core.JmsTemplate.access$600(JmsTemplate.java:90)
at org.springframework.jms.core.JmsTemplate$JmsTemplateResourceFactory.createConnection(JmsTemplate.java:1205)
at org.springframework.jms.connection.ConnectionFactoryUtils.doGetTransactionalSession(ConnectionFactoryUtils.java:312)
at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:480)
... 36 more
Caused by: java.net.UnknownHostException: my.url.net
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:184)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:589)
at org.apache.activemq.transport.tcp.TcpTransport.connect(TcpTransport.java:501)
at org.apache.activemq.transport.tcp.TcpTransport.doStart(TcpTransport.java:464)
at org.apache.activemq.util.ServiceSupport.start(ServiceSupport.java:55)
at org.apache.activemq.transport.AbstractInactivityMonitor.start(AbstractInactivityMonitor.java:138)
at org.apache.activemq.transport.TransportFilter.start(TransportFilter.java:58)
at org.apache.activemq.transport.WireFormatNegotiator.start(WireFormatNegotiator.java:72)
at org.apache.activemq.transport.TransportFilter.start(TransportFilter.java:58)
at org.apache.activemq.transport.TransportFilter.start(TransportFilter.java:58)
at org.apache.activemq.ActiveMQConnectionFactory.createActiveMQConnection(ActiveMQConnectionFactory.java:340)
... 54 more
最佳答案
嗯,由于 usingIdCache="true",这对回滚消息不起作用。消息返回到数据库,但它们不会被下一个 poll 拾取,因为它们的 id 在内存中 idCache 中。
我们提供此建议以处理已处理或回滚的消息:
<int:transaction-synchronization-factory id="syncFactory">
<int:after-commit expression="@store.removeFromIdCache(headers.id.toString())" />
<int:after-rollback expression="@store.removeFromIdCache(headers.id.toString())"/>
</int:transaction-synchronization-factory>
...
<int:poller id="defaultPoller" default="true" fixed-delay="500">
<int:transactional propagation="REQUIRED" isolation="DEFAULT"
synchronization-factory="syncFactory"
transaction-manager="transactionManager"/>
</int:poller>
独立于 TX 结果的消息 id 从 idCache 中删除。在您的情况下,它将准备好再接受一次轮询。
在 Reference Manual 中查看更多信息. JdbcChannelMessageStore#setUsingIdCache 上的 JavaDocs 显示相同。
关于java - Spring 与 JMS + ActiveMQ 集成 : Messages remain in JDBC Message Store after reconnect,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29913956/
我的瘦服务器配置了nginx,我的ROR应用程序正在它们上运行。在我发布代码更新时运行thinrestart会给我的应用程序带来一些停机时间。我试图弄清楚如何优雅地重启正在运行的Thin实例,但找不到好的解决方案。有没有人能做到这一点? 最佳答案 #Restartjustthethinserverdescribedbythatconfigsudothin-C/etc/thin/mysite.ymlrestartNginx将继续运行并代理请求。如果您将Nginx设置为使用多个上游服务器,例如server{listen80;server
我真的很习惯使用Ruby编写以下代码:my_hash={}my_hash['test']=1Java中对应的数据结构是什么? 最佳答案 HashMapmap=newHashMap();map.put("test",1);我假设? 关于java-等价于Java中的RubyHash,我们在StackOverflow上找到一个类似的问题: https://stackoverflow.com/questions/22737685/
我正在使用Ruby2.1.1和Rails4.1.0.rc1。当执行railsc时,它被锁定了。使用Ctrl-C停止,我得到以下错误日志:~/.rvm/gems/ruby-2.1.1/gems/spring-1.1.2/lib/spring/client/run.rb:47:in`gets':Interruptfrom~/.rvm/gems/ruby-2.1.1/gems/spring-1.1.2/lib/spring/client/run.rb:47:in`verify_server_version'from~/.rvm/gems/ruby-2.1.1/gems/spring-1.1.
我在app/helpers/sessions_helper.rb中有一个帮助程序文件,其中包含一个方法my_preference,它返回当前登录用户的首选项。我想在集成测试中访问该方法。例如,这样我就可以在测试中使用getuser_path(my_preference)。在其他帖子中,我读到这可以通过在测试文件中包含requiresessions_helper来实现,但我仍然收到错误NameError:undefinedlocalvariableormethod'my_preference'.我做错了什么?require'test_helper'require'sessions_hel
我一直很高兴地使用DelayedJob习惯用法:foo.send_later(:bar)这会调用DelayedJob进程中对象foo的方法bar。我一直在使用DaemonSpawn在我的服务器上启动DelayedJob进程。但是...如果foo抛出异常,Hoptoad不会捕获它。这是任何这些包中的错误...还是我需要更改某些配置...或者我是否需要在DS或DJ中插入一些异常处理来调用Hoptoad通知程序?回应下面的第一条评论。classDelayedJobWorker 最佳答案 尝试monkeypatchingDelayed::W
我正在尝试使用boilerpipe来自JRuby。我看过guide从JRuby调用Java,并成功地将它与另一个Java包一起使用,但无法弄清楚为什么同样的东西不能用于boilerpipe。我正在尝试基本上从JRuby中执行与此Java等效的操作:URLurl=newURL("http://www.example.com/some-location/index.html");Stringtext=ArticleExtractor.INSTANCE.getText(url);在JRuby中试过这个:require'java'url=java.net.URL.new("http://www
我只想对我一直在思考的这个问题有其他意见,例如我有classuser_controller和classuserclassUserattr_accessor:name,:usernameendclassUserController//dosomethingaboutanythingaboutusersend问题是我的User类中是否应该有逻辑user=User.newuser.do_something(user1)oritshouldbeuser_controller=UserController.newuser_controller.do_something(user1,user2)我
什么是ruby的rack或python的Java的wsgi?还有一个路由库。 最佳答案 来自Python标准PEP333:Bycontrast,althoughJavahasjustasmanywebapplicationframeworksavailable,Java's"servlet"APImakesitpossibleforapplicationswrittenwithanyJavawebapplicationframeworktoruninanywebserverthatsupportstheservletAPI.ht
如何将send与+=一起使用?a=20;a.send"+=",10undefinedmethod`+='for20:Fixnuma=20;a+=10=>30 最佳答案 恐怕你不能。+=不是方法,而是语法糖。参见http://www.ruby-doc.org/docs/ProgrammingRuby/html/tut_expressions.html它说Incommonwithmanyotherlanguages,Rubyhasasyntacticshortcut:a=a+2maybewrittenasa+=2.你能做的最好的事情是:
这篇文章是继上一篇文章“Observability:从零开始创建Java微服务并监控它(一)”的续篇。在上一篇文章中,我们讲述了如何创建一个Javaweb应用,并使用Filebeat来收集应用所生成的日志。在今天的文章中,我来详述如何收集应用的指标,使用APM来监控应用并监督web服务的在线情况。源码可以在地址 https://github.com/liu-xiao-guo/java_observability 进行下载。摄入指标指标被视为可以随时更改的时间点值。当前请求的数量可以改变任何毫秒。你可能有1000个请求的峰值,然后一切都回到一个请求。这也意味着这些指标可能不准确,你还想提取最小/