public final class ActiveMQMessageConsumer implements QueueReceiver, TopicSubscriber { //...... @Override public Message receive() throws JMSException { return getMessage(0, false); } @Override public Message receive(final long timeout) throws JMSException { return getMessage(timeout, false); } @Override public Message receiveNoWait() throws JMSException { return getMessage(0, true); } private ActiveMQMessage getMessage(final long timeout, final boolean noWait) throws JMSException { try { ClientMessage coreMessage; if (noWait) { coreMessage = consumer.receiveImmediate(); } else { coreMessage = consumer.receive(timeout); } ActiveMQMessage jmsMsg = null; if (coreMessage != null) { ClientSession coreSession = session.getCoreSession(); boolean needSession = ackMode == Session.CLIENT_ACKNOWLEDGE || ackMode == ActiveMQJMSConstants.INDIVIDUAL_ACKNOWLEDGE || coreMessage.getType() == ActiveMQObjectMessage.TYPE; if (coreMessage.getRoutingType() == null) { coreMessage.setRoutingType(destination.isQueue() ? RoutingType.ANYCAST : RoutingType.MULTICAST); } if (session.isEnable1xPrefixes()) { jmsMsg = ActiveMQCompatibleMessage.createMessage(coreMessage, needSession ? coreSession : null, options); } else { jmsMsg = ActiveMQMessage.createMessage(coreMessage, needSession ? coreSession : null, options); } try { jmsMsg.doBeforeReceive(); } catch (IndexOutOfBoundsException ioob) { ((ClientSessionInternal) session.getCoreSession()).markRollbackOnly(); // In case this exception happen you will need to know where it happened. // it has been a bug here in the past, and this was used to debug it. // nothing better than keep it for future investigations in case it happened again IndexOutOfBoundsException newIOOB = new IndexOutOfBoundsException(ioob.getMessage() + "@" + jmsMsg.getCoreMessage()); newIOOB.initCause(ioob); ActiveMQClientLogger.LOGGER.unableToGetMessage(newIOOB); throw ioob; } // We Do the ack after doBeforeReceive, as in the case of large messages, this may fail so we don't want messages redelivered // https://issues.jboss.org/browse/JBPAPP-6110 if (session.getAcknowledgeMode() == ActiveMQJMSConstants.INDIVIDUAL_ACKNOWLEDGE) { jmsMsg.setIndividualAcknowledge(); } else if (session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE) { jmsMsg.setClientAcknowledge(); coreMessage.acknowledge(); } else { coreMessage.acknowledge(); } } return jmsMsg; } catch (ActiveMQException e) { ((ClientSessionInternal) session.getCoreSession()).markRollbackOnly(); throw JMSExceptionHelper.convertFromActiveMQException(e); } catch (ActiveMQInterruptedException e) { ((ClientSessionInternal) session.getCoreSession()).markRollbackOnly(); throw JMSExceptionHelper.convertFromActiveMQException(e); } } //......}
ActiveMQMessageConsumer的receive方法最后调用的是getMessage方法,它对于session.getAcknowledgeMode()为ActiveMQJMSConstants.INDIVIDUAL_ACKNOWLEDGE执行jmsMsg.setIndividualAcknowledge(),其余的都执行coreMessage.acknowledge()acknowledgeactivemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageImpl.javapublic class ClientMessageImpl extends CoreMessage implements ClientMessageInternal { //...... public ClientMessageImpl acknowledge() throws ActiveMQException { if (consumer != null) { consumer.acknowledge(this); } return this; } //......}
ClientMessageImpl的acknowledge方法执行的是consumer.acknowledge(this)推模式handleMessageactivemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.javapublic final class ClientConsumerImpl implements ClientConsumerInternal { //...... public synchronized void handleMessage(final ClientMessageInternal message) throws Exception { if (closing) { // This is ok - we just ignore the message return; } if (message.getBooleanProperty(Message.HDR_LARGE_COMPRESSED)) { handleCompressedMessage(message); } else { handleRegularMessage(message); } } private void handleRegularMessage(ClientMessageInternal message) { if (message.getAddress() == null) { message.setAddress(queueInfo.getAddress()); } message.onReceipt(this); if (!ackIndividually && message.getPriority() != 4 && !message.containsProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE)) { // We have messages of different priorities so we need to ack them individually since the order // of them in the ServerConsumerImpl delivery list might not be the same as the order they are // consumed in, which means that acking all up to won't work ackIndividually = true; } // Add it to the buffer buffer.addTail(message, message.getPriority()); if (handler != null) { // Execute using executor if (!stopped) { queueExecutor(); } } else { notify(); } } private void queueExecutor() { if (logger.isTraceEnabled()) { logger.trace(this + "::Adding Runner on Executor for delivery"); } sessionExecutor.execute(runner); } //......}
ClientConsumerImpl的handleRegularMessage会执行buffer.addTail(message, message.getPriority())以及queueExecutor(),queueExecutor通过sessionExecutor执行RunnercallOnMessageactivemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.javapublic final class ClientConsumerImpl implements ClientConsumerInternal { //...... private final PriorityLinkedList<ClientMessageInternal> buffer = new PriorityLinkedListImpl<>(ClientConsumerImpl.NUM_PRIORITIES); //...... private class Runner implements Runnable { @Override public void run() { try { callOnMessage(); } catch (Exception e) { ActiveMQClientLogger.LOGGER.onMessageError(e); lastException = e; } } } private void callOnMessage() throws Exception { if (closing || stopped) { return; } session.workDone(); // We pull the message from the buffer from inside the Runnable so we can ensure priority // ordering. If we just added a Runnable with the message to the executor immediately as we get it // we could not do that ClientMessageInternal message; // Must store handler in local variable since might get set to null // otherwise while this is executing and give NPE when calling onMessage MessageHandler theHandler = handler; if (theHandler != null) { if (rateLimiter != null) { rateLimiter.limit(); } failedOver = false; synchronized (this) { message = buffer.poll(); } if (message != null) { if (message.containsProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE)) { //Ignore, this could be a relic from a previous receiveImmediate(); return; } boolean expired = message.isExpired(); flowControlBeforeConsumption(message); if (!expired) { if (logger.isTraceEnabled()) { logger.trace(this + "::Calling handler.onMessage"); } final ClassLoader originalLoader = AccessController.doPrivileged(new PrivilegedAction<ClassLoader>() { @Override public ClassLoader run() { ClassLoader originalLoader = Thread.currentThread().getContextClassLoader(); Thread.currentThread().setContextClassLoader(contextClassLoader); return originalLoader; } }); onMessageThread = Thread.currentThread(); try { theHandler.onMessage(message); } finally { try { AccessController.doPrivileged(new PrivilegedAction<Object>() { @Override public Object run() { Thread.currentThread().setContextClassLoader(originalLoader); return null; } }); } catch (Exception e) { ActiveMQClientLogger.LOGGER.failedPerformPostActionsOnMessage(e); } onMessageThread = null; } if (logger.isTraceEnabled()) { logger.trace(this + "::Handler.onMessage done"); } if (message.isLargeMessage()) { message.discardBody(); } } else { session.expire(this, message); } // If slow consumer, we need to send 1 credit to make sure we get another message if (clientWindowSize == 0) { startSlowConsumer(); } } } } //......}
Runner的run方法会执行callOnMessage方法,它会从buffer.poll()消息,如果不为null且非expired则执行theHandler.onMessage(message)onMessageactivemq-artemis-2.11.0/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/JMSMessageListenerWrapper.javapublic class JMSMessageListenerWrapper implements MessageHandler { private final ConnectionFactoryOptions options; private final ActiveMQConnection connection; private final ActiveMQSession session; private final MessageListener listener; private final ClientConsumer consumer; private final boolean transactedOrClientAck; private final boolean individualACK; private final boolean clientACK; protected JMSMessageListenerWrapper(final ConnectionFactoryOptions options, final ActiveMQConnection connection, final ActiveMQSession session, final ClientConsumer consumer, final MessageListener listener, final int ackMode) { this.options = options; this.connection = connection; this.session = session; this.consumer = consumer; this.listener = listener; transactedOrClientAck = (ackMode == Session.SESSION_TRANSACTED || ackMode == Session.CLIENT_ACKNOWLEDGE) || session.isXA(); individualACK = (ackMode == ActiveMQJMSConstants.INDIVIDUAL_ACKNOWLEDGE); clientACK = (ackMode == Session.CLIENT_ACKNOWLEDGE); } / In this method we apply the JMS acknowledgement and redelivery semantics as per JMS spec / @Override public void onMessage(final ClientMessage message) { ActiveMQMessage msg; if (session.isEnable1xPrefixes()) { msg = ActiveMQCompatibleMessage.createMessage(message, session.getCoreSession(), options); } else { msg = ActiveMQMessage.createMessage(message, session.getCoreSession(), options); } if (individualACK) { msg.setIndividualAcknowledge(); } if (clientACK) { msg.setClientAcknowledge(); } try { msg.doBeforeReceive(); } catch (Exception e) { ActiveMQJMSClientLogger.LOGGER.errorPreparingMessageForReceipt(msg.getCoreMessage().toString(), e); return; } if (transactedOrClientAck) { try { message.acknowledge(); } catch (ActiveMQException e) { ((ClientSessionInternal) session.getCoreSession()).markRollbackOnly(); ActiveMQJMSClientLogger.LOGGER.errorProcessingMessage(e); } } try { connection.getThreadAwareContext().setCurrentThread(false); listener.onMessage(msg); } catch (RuntimeException e) { // See JMS 1.1 spec, section 4.5.2 ActiveMQJMSClientLogger.LOGGER.onMessageError(e); if (!transactedOrClientAck) { try { if (individualACK) { message.individualAcknowledge(); } session.getCoreSession().rollback(true); session.setRecoverCalled(true); } catch (Exception e2) { ActiveMQJMSClientLogger.LOGGER.errorRecoveringSession(e2); } } } finally { connection.getThreadAwareContext().clearCurrentThread(false); } if (!session.isRecoverCalled() && !individualACK) { try { // We don't want to call this if the consumer was closed from inside onMessage if (!consumer.isClosed() && !transactedOrClientAck) { message.acknowledge(); } } catch (ActiveMQException e) { ((ClientSessionInternal) session.getCoreSession()).markRollbackOnly(); ActiveMQJMSClientLogger.LOGGER.errorProcessingMessage(e); } } session.setRecoverCalled(false); }}
onMessage方法在transactedOrClientAck为true时会执行message.acknowledge();在触发listener.onMessage(msg)之后会在非session.isRecoverCalled()且非individualACK且非consumer.isClosed()且非transactedOrClientAck时执行message.acknowledge()acknowledgeactivemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageImpl.javapublic class ClientMessageImpl extends CoreMessage implements ClientMessageInternal { //...... public ClientMessageImpl acknowledge() throws ActiveMQException { if (consumer != null) { consumer.acknowledge(this); } return this; } //......}
acknowledge方法执行的是consumer.acknowledge(this)方法ClientConsumerImpl.acknowledgeactivemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.javapublic final class ClientConsumerImpl implements ClientConsumerInternal { //...... public void acknowledge(final ClientMessage message) throws ActiveMQException { ClientMessageInternal cmi = (ClientMessageInternal) message; if (ackIndividually) { individualAcknowledge(message); } else { ackBytes += message.getEncodeSize(); if (logger.isTraceEnabled()) { logger.trace(this + "::acknowledge ackBytes=" + ackBytes + " and ackBatchSize=" + ackBatchSize + ", encodeSize=" + message.getEncodeSize()); } if (ackBytes >= ackBatchSize) { if (logger.isTraceEnabled()) { logger.trace(this + ":: acknowledge acking " + cmi); } doAck(cmi); } else { if (logger.isTraceEnabled()) { logger.trace(this + ":: acknowledge setting lastAckedMessage = " + cmi); } lastAckedMessage = cmi; } } } private void doAck(final ClientMessageInternal message) throws ActiveMQException { ackBytes = 0; lastAckedMessage = null; if (logger.isTraceEnabled()) { logger.trace(this + "::Acking message " + message); } session.acknowledge(this, message); } //......}
ClientConsumerImpl的acknowledge方法执行的是doAck方法,而doAck方法执行的是session.acknowledge(this, message)ClientSessionImpl.acknowledgeactivemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.javapublic final class ClientSessionImpl implements ClientSessionInternal, FailureListener { //...... public void acknowledge(final ClientConsumer consumer, final Message message) throws ActiveMQException { // if we're pre-acknowledging then we don't need to do anything if (preAcknowledge) { return; } checkClosed(); if (logger.isDebugEnabled()) { logger.debug("client ack messageID = " + message.getMessageID()); } startCall(); try { sessionContext.sendACK(false, blockOnAcknowledge, consumer, message); } finally { endCall(); } } //......}
ClientSessionImpl的acknowledge方法通过sessionContext.sendACK来发送ack小结ActiveMQMessageConsumer的receive采用的是拉模式,它对于session.getAcknowledgeMode()为ActiveMQJMSConstants.INDIVIDUAL_ACKNOWLEDGE执行jmsMsg.setIndividualAcknowledge(),其余的都执行coreMessage.acknowledge();ClientMessageImpl的acknowledge方法执行的是consumer.acknowledge(this)ClientConsumerImpl的handleMessage采用的是推模式,它会执行buffer.addTail(message, message.getPriority())以及queueExecutor(),queueExecutor通过sessionExecutor执行Runner;Runner的run方法会执行callOnMessage方法,它会从buffer.poll()消息,如果不为null且非expired则执行theHandler.onMessage(message);最后触发的是执行的是consumer.acknowledge(this)方法ClientConsumerImpl的acknowledge方法执行的是doAck方法,而doAck方法执行的是session.acknowledge(this, message);ClientSessionImpl的acknowledge方法通过sessionContext.sendACK来发送ackdocActiveMQMessageConsumerClientConsumerImplClientSessionImpl(图片来源网络,侵删)
0 评论