聊聊artemis消息的推拉模式

序本文主要研究一下artemis消息的推拉模式拉模式receiveactivemq-artemis-2.11.0/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageConsumer.javapublic final class ActiveMQMessageConsumer implements QueueReceiver, TopicSubscriber {​   //......​   @Override   public Message receive() throws JMSException {     return getMessage(0, false);   }​   @Override   public Message receive(final long timeout) throws JMSException {     return getMessage(timeout, false);   }​   @Override   public Message receiveNoWait() throws JMSException {     return getMessage(0, true);   }​   private ActiveMQMessage getMessage(final long timeout, final boolean noWait) throws JMSException {     try {         ClientMessage coreMessage;​         if (noWait) {           coreMessage = consumer.receiveImmediate();         } else {           coreMessage = consumer.receive(timeout);         }​         ActiveMQMessage jmsMsg = null;​         if (coreMessage != null) {           ClientSession coreSession = session.getCoreSession();           boolean needSession = ackMode == Session.CLIENT_ACKNOWLEDGE ||               ackMode == ActiveMQJMSConstants.INDIVIDUAL_ACKNOWLEDGE ||               coreMessage.getType() == ActiveMQObjectMessage.TYPE;​           if (coreMessage.getRoutingType() == null) {               coreMessage.setRoutingType(destination.isQueue() ? RoutingType.ANYCAST : RoutingType.MULTICAST);           }           if (session.isEnable1xPrefixes()) {               jmsMsg = ActiveMQCompatibleMessage.createMessage(coreMessage, needSession ? coreSession : null, options);           } else {               jmsMsg = ActiveMQMessage.createMessage(coreMessage, needSession ? 
coreSession : null, options);           }​           try {               jmsMsg.doBeforeReceive();           } catch (IndexOutOfBoundsException ioob) {               ((ClientSessionInternal) session.getCoreSession()).markRollbackOnly();               // In case this exception happen you will need to know where it happened.               // it has been a bug here in the past, and this was used to debug it.               // nothing better than keep it for future investigations in case it happened again               IndexOutOfBoundsException newIOOB = new IndexOutOfBoundsException(ioob.getMessage() + "@" + jmsMsg.getCoreMessage());               newIOOB.initCause(ioob);               ActiveMQClientLogger.LOGGER.unableToGetMessage(newIOOB);               throw ioob;           }​           // We Do the ack after doBeforeReceive, as in the case of large messages, this may fail so we don't want messages redelivered           // https://issues.jboss.org/browse/JBPAPP-6110           if (session.getAcknowledgeMode() == ActiveMQJMSConstants.INDIVIDUAL_ACKNOWLEDGE) {               jmsMsg.setIndividualAcknowledge();           } else if (session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE) {               jmsMsg.setClientAcknowledge();               coreMessage.acknowledge();           } else {               coreMessage.acknowledge();           }         }​         return jmsMsg;     } catch (ActiveMQException e) {         ((ClientSessionInternal) session.getCoreSession()).markRollbackOnly();         throw JMSExceptionHelper.convertFromActiveMQException(e);     } catch (ActiveMQInterruptedException e) {         ((ClientSessionInternal) session.getCoreSession()).markRollbackOnly();         throw JMSExceptionHelper.convertFromActiveMQException(e);     }   }​   
//......}ActiveMQMessageConsumer的receive方法最后调用的是getMessage方法,它对于session.getAcknowledgeMode()为ActiveMQJMSConstants.INDIVIDUAL_ACKNOWLEDGE执行jmsMsg.setIndividualAcknowledge(),其余的都执行coreMessage.acknowledge()acknowledgeactivemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageImpl.javapublic class ClientMessageImpl extends CoreMessage implements ClientMessageInternal {​   //......​   public ClientMessageImpl acknowledge() throws ActiveMQException {     if (consumer != null) {         consumer.acknowledge(this);     }​     return this;   }​   //......}ClientMessageImpl的acknowledge方法执行的是consumer.acknowledge(this)推模式handleMessageactivemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.javapublic final class ClientConsumerImpl implements ClientConsumerInternal {​   //......​   public synchronized void handleMessage(final ClientMessageInternal message) throws Exception {     if (closing) {         // This is ok - we just ignore the message         return;     }​     if (message.getBooleanProperty(Message.HDR_LARGE_COMPRESSED)) {         handleCompressedMessage(message);     } else {         handleRegularMessage(message);     }   }​   private void handleRegularMessage(ClientMessageInternal message) {     if (message.getAddress() == null) {         message.setAddress(queueInfo.getAddress());     }​     message.onReceipt(this);​     if (!ackIndividually && message.getPriority() != 4 && !message.containsProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE)) {         // We have messages of different priorities so we need to ack them individually since the order         // of them in the ServerConsumerImpl delivery list might not be the same as the order they are         // consumed in, which means that acking all up to won't work         ackIndividually = true;     }​     // Add it to the buffer     buffer.addTail(message, message.getPriority());​     if 
(handler != null) {         // Execute using executor         if (!stopped) {           queueExecutor();         }     } else {         notify();     }   }​   private void queueExecutor() {     if (logger.isTraceEnabled()) {         logger.trace(this + "::Adding Runner on Executor for delivery");     }​     sessionExecutor.execute(runner);   }​   //......​}ClientConsumerImpl的handleRegularMessage会执行buffer.addTail(message, message.getPriority())以及queueExecutor(),queueExecutor通过sessionExecutor执行RunnercallOnMessageactivemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.javapublic final class ClientConsumerImpl implements ClientConsumerInternal {​   //......​   private final PriorityLinkedList<ClientMessageInternal> buffer = new PriorityLinkedListImpl<>(ClientConsumerImpl.NUM_PRIORITIES);​   //......   ​   private class Runner implements Runnable {​     @Override     public void run() {         try {           callOnMessage();         } catch (Exception e) {           ActiveMQClientLogger.LOGGER.onMessageError(e);​           lastException = e;         }     }   }​   private void callOnMessage() throws Exception {     if (closing || stopped) {         return;     }​     session.workDone();​     // We pull the message from the buffer from inside the Runnable so we can ensure priority     // ordering. 
If we just added a Runnable with the message to the executor immediately as we get it     // we could not do that​     ClientMessageInternal message;​     // Must store handler in local variable since might get set to null     // otherwise while this is executing and give NPE when calling onMessage     MessageHandler theHandler = handler;​     if (theHandler != null) {         if (rateLimiter != null) {           rateLimiter.limit();         }​         failedOver = false;​         synchronized (this) {           message = buffer.poll();         }​         if (message != null) {           if (message.containsProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE)) {               //Ignore, this could be a relic from a previous receiveImmediate();               return;           }​           boolean expired = message.isExpired();​           flowControlBeforeConsumption(message);​           if (!expired) {               if (logger.isTraceEnabled()) {                 logger.trace(this + "::Calling handler.onMessage");               }               final ClassLoader originalLoader = AccessController.doPrivileged(new PrivilegedAction<ClassLoader>() {                 @Override                 public ClassLoader run() {                     ClassLoader originalLoader = Thread.currentThread().getContextClassLoader();​                     Thread.currentThread().setContextClassLoader(contextClassLoader);​                     return originalLoader;                 }               });​               onMessageThread = Thread.currentThread();               try {                 theHandler.onMessage(message);               } finally {                 try {                     AccessController.doPrivileged(new PrivilegedAction<Object>() {                       @Override                       public Object run() {                           Thread.currentThread().setContextClassLoader(originalLoader);                           return null;                       }                     });  
               } catch (Exception e) {                     ActiveMQClientLogger.LOGGER.failedPerformPostActionsOnMessage(e);                 }​                 onMessageThread = null;               }​               if (logger.isTraceEnabled()) {                 logger.trace(this + "::Handler.onMessage done");               }​               if (message.isLargeMessage()) {                 message.discardBody();               }           } else {               session.expire(this, message);           }​           // If slow consumer, we need to send 1 credit to make sure we get another message           if (clientWindowSize == 0) {               startSlowConsumer();           }         }     }   }​   //......}   Runner的run方法会执行callOnMessage方法,它会从buffer.poll()消息,如果不为null且非expired则执行theHandler.onMessage(message)onMessageactivemq-artemis-2.11.0/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/JMSMessageListenerWrapper.javapublic class JMSMessageListenerWrapper implements MessageHandler {​   private final ConnectionFactoryOptions options;   private final ActiveMQConnection connection;​   private final ActiveMQSession session;​   private final MessageListener listener;​   private final ClientConsumer consumer;​   private final boolean transactedOrClientAck;​   private final boolean individualACK;​   private final boolean clientACK;​   protected JMSMessageListenerWrapper(final ConnectionFactoryOptions options,                                       final ActiveMQConnection connection,                                       final ActiveMQSession session,                                       final ClientConsumer consumer,                                       final MessageListener listener,                                       final int ackMode) {     this.options = options;​     this.connection = connection;​     this.session = session;​     this.consumer = consumer;​     this.listener = listener;​     transactedOrClientAck = (ackMode == 
Session.SESSION_TRANSACTED || ackMode == Session.CLIENT_ACKNOWLEDGE) || session.isXA();​     individualACK = (ackMode == ActiveMQJMSConstants.INDIVIDUAL_ACKNOWLEDGE);​     clientACK = (ackMode == Session.CLIENT_ACKNOWLEDGE);   }​   /   In this method we apply the JMS acknowledgement and redelivery semantics   as per JMS spec   /   @Override   public void onMessage(final ClientMessage message) {     ActiveMQMessage msg;​     if (session.isEnable1xPrefixes()) {         msg = ActiveMQCompatibleMessage.createMessage(message, session.getCoreSession(), options);     } else {         msg = ActiveMQMessage.createMessage(message, session.getCoreSession(), options);     }​     if (individualACK) {         msg.setIndividualAcknowledge();     }​     if (clientACK) {         msg.setClientAcknowledge();     }​     try {         msg.doBeforeReceive();     } catch (Exception e) {         ActiveMQJMSClientLogger.LOGGER.errorPreparingMessageForReceipt(msg.getCoreMessage().toString(), e);         return;     }​     if (transactedOrClientAck) {         try {           message.acknowledge();         } catch (ActiveMQException e) {           ((ClientSessionInternal) session.getCoreSession()).markRollbackOnly();           ActiveMQJMSClientLogger.LOGGER.errorProcessingMessage(e);         }     }​     try {         connection.getThreadAwareContext().setCurrentThread(false);         listener.onMessage(msg);     } catch (RuntimeException e) {         // See JMS 1.1 spec, section 4.5.2​         ActiveMQJMSClientLogger.LOGGER.onMessageError(e);​         if (!transactedOrClientAck) {           try {               if (individualACK) {                 message.individualAcknowledge();               }​               session.getCoreSession().rollback(true);​               session.setRecoverCalled(true);           } catch (Exception e2) {               ActiveMQJMSClientLogger.LOGGER.errorRecoveringSession(e2);           }         }     } finally {         
connection.getThreadAwareContext().clearCurrentThread(false);     }     if (!session.isRecoverCalled() && !individualACK) {         try {           // We don't want to call this if the consumer was closed from inside onMessage           if (!consumer.isClosed() && !transactedOrClientAck) {               message.acknowledge();           }         } catch (ActiveMQException e) {           ((ClientSessionInternal) session.getCoreSession()).markRollbackOnly();           ActiveMQJMSClientLogger.LOGGER.errorProcessingMessage(e);         }     }​     session.setRecoverCalled(false);   }}onMessage方法在transactedOrClientAck为true时会执行message.acknowledge();在触发listener.onMessage(msg)之后会在非session.isRecoverCalled()且非individualACK且非consumer.isClosed()且非transactedOrClientAck时执行message.acknowledge()acknowledgeactivemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientMessageImpl.javapublic class ClientMessageImpl extends CoreMessage implements ClientMessageInternal { //......​   public ClientMessageImpl acknowledge() throws ActiveMQException {     if (consumer != null) {         consumer.acknowledge(this);     }​     return this;   }​ //......​}acknowledge方法执行的是consumer.acknowledge(this)方法ClientConsumerImpl.acknowledgeactivemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.javapublic final class ClientConsumerImpl implements ClientConsumerInternal {​   //......​   public void acknowledge(final ClientMessage message) throws ActiveMQException {     ClientMessageInternal cmi = (ClientMessageInternal) message;​     if (ackIndividually) {         individualAcknowledge(message);     } else {​         ackBytes += message.getEncodeSize();​         if (logger.isTraceEnabled()) {           logger.trace(this + "::acknowledge ackBytes=" + ackBytes + " and ackBatchSize=" + ackBatchSize + ", encodeSize=" + message.getEncodeSize());         }​         if (ackBytes >= ackBatchSize) {    
       if (logger.isTraceEnabled()) {               logger.trace(this + ":: acknowledge acking " + cmi);           }           doAck(cmi);         } else {           if (logger.isTraceEnabled()) {               logger.trace(this + ":: acknowledge setting lastAckedMessage = " + cmi);           }           lastAckedMessage = cmi;         }     }   }​   private void doAck(final ClientMessageInternal message) throws ActiveMQException {     ackBytes = 0;​     lastAckedMessage = null;​     if (logger.isTraceEnabled()) {         logger.trace(this + "::Acking message " + message);     }​     session.acknowledge(this, message);   }​   //......}ClientConsumerImpl的acknowledge方法执行的是doAck方法,而doAck方法执行的是session.acknowledge(this, message)ClientSessionImpl.acknowledgeactivemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.javapublic final class ClientSessionImpl implements ClientSessionInternal, FailureListener {​   //......​   public void acknowledge(final ClientConsumer consumer, final Message message) throws ActiveMQException {     // if we're pre-acknowledging then we don't need to do anything     if (preAcknowledge) {         return;     }​     checkClosed();     if (logger.isDebugEnabled()) {         logger.debug("client ack messageID = " + message.getMessageID());     }​     startCall();     try {         sessionContext.sendACK(false, blockOnAcknowledge, consumer, message);     } finally {         endCall();     }   }​   //......}ClientSessionImpl的acknowledge方法通过sessionContext.sendACK来发送ack小结ActiveMQMessageConsumer的receive采用的是拉模式,它对于session.getAcknowledgeMode()为ActiveMQJMSConstants.INDIVIDUAL_ACKNOWLEDGE执行jmsMsg.setIndividualAcknowledge(),其余的都执行coreMessage.acknowledge();ClientMessageImpl的acknowledge方法执行的是consumer.acknowledge(this)ClientConsumerImpl的handleMessage采用的是推模式,它会执行buffer.addTail(message, 
message.getPriority())以及queueExecutor(),queueExecutor通过sessionExecutor执行Runner;Runner的run方法会执行callOnMessage方法,它会通过buffer.poll()取出消息,如果不为null且非expired则执行theHandler.onMessage(message);最后触发执行的是consumer.acknowledge(this)方法。
ClientConsumerImpl的acknowledge方法在ackIndividually为true时执行individualAcknowledge,否则累加ackBytes,并在ackBytes >= ackBatchSize时执行doAck方法(否则只记录lastAckedMessage);doAck方法执行的是session.acknowledge(this, message);ClientSessionImpl的acknowledge方法在preAcknowledge为true时直接返回,否则通过sessionContext.sendACK来发送ack。

doc
- ActiveMQMessageConsumer
- ClientConsumerImpl
- ClientSessionImpl
聊聊artemis消息的推拉模式
(图片来源网络,侵删)

联系我们

在线咨询:点击这里给我发消息