最近在做的视频转码服务采用了ActiveMQ消息服务分配转码任务。大致逻辑时每个视频会配置若干种处理模板,每个模板会创建一条任务,通过mq发送到各个消费者那里进行处理。消费者处理接受后,通过http接口回调告知任务结束。

视频转码是个非常耗时的计算任务。在实际测试时发现,多个消费者的情况下,消息似乎是在实际消费之前就已经被指定了消费者,所以出现了某个消费者已经干完活开始闲置的时候,队列中仍然有消息处于pending状态。 或是队列中存在很多消息的情况下,创建新的消费者,但其并不立刻工作,直到有新的消息进入队列。

于是开始查找资料,发现ActiveMQ官网上有一篇文章介绍慢消费者的处理: http://activemq.apache.org/slow-consumer-handling.html

在研究此文之后,知道了预取策略这个概念:prefetch policy。

这里需要了解消息队列的反馈机制:

原图出自:GenTuning-Consumer-Prefetch [https://access.redhat.com/documentation/en-US/Fuse_ESB_Enterprise/7.1/html/ActiveMQ_Tuning_Guide/files/GenTuning-Consumer-Prefetch.html]

Broker发送消息给消费者,消费者在处理结束后会发送ack反馈给broker。为了提高消息分发的效率,引入了预取策略。即,Broker在未得到消费者的ack反馈之前,会继续发送新消息给它,除非消费者的消息缓存区已满,或是未收到反馈的消息数达到了prefetch上限。

需要注意的是,消息被prefetch后,仍然会在ActiveMQ的控制台里处于Pending状态——直到它被实际消费,Broker收到了反馈,才会认为其Dequeued.

明白了这个道理,就知道了上面提到的“消息似乎被预先指定给各个消费者”的原因了——因为消息都被prefetch了。这个时候,即便有新的消费者加入,它也没办法处理别人已经prefetch的消息。

关于预取策略,ActiveMQ设置了一些默认参数:

Queue consumer:默认1000
Queue browser:默认500
Topic consumer:默认32766
Durable topic subscriber:默认100

预取策略的配置,需要考虑应用的实际需要。对于快消费的业务来说,给定较大的prefetch,能够提高消费效率。而对于像上面提到的视频转码这类慢消费业务,较大的prefetch势必会导致闲置消费者的大量存在。

预取的确能提高快消费类型业务的消息传送效率,但同时也会带来风险:如果某个消费者出错的话,会导致其预取的消息都不能得到有效处理。

通常,如果只有一个消费者,可直接使用prefetch的默认值,甚至更大的值。但如果是多个消费者,且消息的消费速度远大于传递消息的时耗,那就可以设置prefetch为1甚至为0。为1即预取1条消息,为0则完全不预取,消费者的ack响应被Broker收到后才会发送下一条消息。

前面提到的都是关于Queue的,如果是Topic-Subscriber的方式,增大prefetch能带来性能的提升。

如果是在程序中创建消费者,预取策略的配置可参考此链接: ActiveMQPrefetchPolicy Config [http://massapi.com/class/ac/ActiveMQPrefetchPolicy-4.html]

最后给出我利用spring来配置ActiveMQ预取策略的xml片断


<?xml version="1.0" encoding="UTF-8"?>
    <!-- ……省略若干内容…… -->
    <!-- 预取策略 -->
    <bean id="prefetchPolicy" class="org.apache.activemq.ActiveMQPrefetchPolicy">
        <property name="queuePrefetch" value="1" />
    </bean>
            
    <!-- 连接工厂 -->
    <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="${mq.brokerURL}" />
        <property name="userName" value="${mq.userName}" />
        <property name="password" value="${mq.password}" />
        <property name="prefetchPolicy" ref="prefetchPolicy" />
    </bean>
    
	<bean id="receiveQueueDestination" class="org.apache.activemq.command.ActiveMQQueue">
	    <constructor-arg index="0" value="${mq.queues}" />
	</bean>
	
	<bean id="messageQueueService" class="com.demo.controller.MainClass">
	</bean>
	
	<bean id="queueListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
	    <constructor-arg ref="messageQueueService"></constructor-arg>
	    <property name="defaultListenerMethod" value="receive" />
	</bean>
	
	<bean id="queueListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
	    <property name="connectionFactory" ref="connectionFactory" />
	    <property name="destination" ref="receiveQueueDestination" />
	    <property name="messageListener" ref="queueListener" />
	    <property name="sessionAcknowledgeModeName" value="CLIENT_ACKNOWLEDGE"/>
	</bean>
	
</beans>