博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
ActiveMQ学习笔记06 - 消费者负载均衡与高可用
阅读量:6883 次
发布时间:2019-06-27

本文共 3857 字,大约阅读时间需要 12 分钟。

hot3.png

ActiveMQ Broker提供基于LevelDB复制的方式提供高可用服务,但是对负载均衡做的很弱,只支持Static的服务器之间转发。目前比较流行的消息分片竟然不支持。但是消费者的负载均衡和高可用还是比较完善的。另外说一下,生产者的高可用和负载均衡,一般是靠外围程序控制。比如,基于Tomcat的web程序作为生产者,那么这个web程序的高可用,需要靠tomcat等外围程序。所以一般所说的高可用,主要指Broker和Consumer。

下面介绍一下几个常用的消费者策略。

Exclusive Consumer:

用于处理Queue的高可用。如果同时使用多个消费者从同一个Queue消费消息,那么消息的顺序性将得不到保证。这时候可以使用Exclusive Consumer。使用Exclusive Consumer可以保证只有一个消费者在消费这个Queue,其他的消费者处于等待状态。一旦处理消费状态的消费者不可用,系统会自动使用失效转移机制,选择到一个新的消费者继续消费。

使用如下:

queue = new ActiveMQQueue("TEST.QUEUE?consumer.exclusive=true");

只需要在Destination上增加consumer.exclusive=true参数即可。

还可以给消费者设置优先级,用于针对网络和服务器资源不同的情况。

queue = new ActiveMQQueue("TEST.QUEUE?consumer.exclusive=true&consumer.priority=10");

Message Groups:

ActiveMQ 5.3版本新增加了这个功能。Message Groups可以看做Exclusive Consumer的升级版,是一个可以并行的Exclusive Consumer。原理是通过使用JMSXGroupID来定义消息组。拥有相同的JMSXGroupID的消息将发送到同一个Queue,这个技术类似于Sesssion Sticky技术。这样既可以保证消费者的高可用(因为可以有多个消费者消费同一队列),又可以保证消息按顺序消费,还不会像Exclusive Consumer一样浪费资源(需要额外的处于等待状态的消费者待命)。如果消费者被关闭或者消息组被关闭,这个拥有JMSXGroupID的消息会自动被发送到其他消费者。

设置JMSXGroupID的例子如下:

Mesasge message = session.createTextMessage("foo");   message.setStringProperty("JMSXGroupID", "your business key");   ...producer.send(message);

关闭消息组,通过设置JMSXGroupSeq的值为-1,例子如下:

Mesasge message = session.createTextMessage("foo");message.setStringProperty("JMSXGroupID", "your business key");message.setIntProperty("JMSXGroupSeq", -1);// ...producer.send(message);

可以使用JMSXGroupFirstForConsumer来判断这个消费者是否是第一次消费这个JMSXGroupID的消息:

if (message.getBooleanProperty("JMSXGroupFirstForConsumer")) {   // flush cache for groupId}

如果Broker中已经有消息了,这时由于启动消费者的速度不一致,可能会导致某些消费者先启动并率先消费消息,导致A消费者的负载不均匀。可以在Broker中配置timeBeforeDispatchStarts,让消费者延迟一段时间再开始负载消费。或者配置consumersBeforeDispatchStarts,让消费者达到一定数量再开始负载消费。

具体配置如下,修改${ACTIVEMQ_HOME}\conf\activemq.xml:

  
    
      
" consumersBeforeDispatchStarts="2" timeBeforeDispatchStarts="2000"/>    
  

设置有两个消费者都启动好或者2秒之后,在开始负载消费消息。

Virtual Topics:

Message Groups只能用于Queue,那么Topic对应的版本,就是这个Virtual Topics。他实现了和Message Groups类似的功能,负载均衡和失效转移。

对于消息的生产者来说,Virtual Topics只是一个普通的Topic,但是必须以VirtualTopic.(可配置)开头,如VirtualTopic.Foo。

而消息的消费者连接是一个队列。这个队列需要遵从以下规则,即Consumer.ClientID.VirtualTopicName。例如:Consumer.CilentA.VirtualTopic.Foo,表示消费者的Client id是ClientA,消费的是VirtualTopic.Foo这个Topic。

消息生产者代码示例:

MessageProducer producer = session.createProducer(new ActiveMQTopic("VirtualTopic.FOO"));TextMessage message = session.createTextMessage("foo");producer.send(message);

消息消费者代码示例:

MessageConsumer consumerA = session.createConsumer(new ActiveMQQueue("Consumer.ClientA.VirtualTopic.FOO"));  consumerA.setMessageListener(new MessageListener() {    public void onMessage(Message message) {        // do something ...    }});

可以看到,通过Virtual Topics注册的queue订阅关系如下:

163436_7kIU_719192.png

这里使用了2个client,A和B,分别订阅了两个主题pojo_topic和string_topic。

而消息的存储格式如下:

164349_fv3I_719192.png

通过上图可以看到,消息是以Queue的形式存储,并非Topic,但是Queue被直接根据所订阅的ClientID生成多份消息。比如,VirtualTopic.string_topic有两个订阅者A和B,那消息就被分成Consumer.A.VirtualTopic.string_topic和Consumer.B.VirtualTopic.string_topic。这就是将Topic通过Virtual Topics转换成了Queue。

只要转换成了Queue,就可以结合使用刚才介绍的Exclusive Consumer或者Message Groups对这个VirtualTopic的队列进行高可用和负载均衡的配置,从而实现Topic的高可用和负载均衡。

可以配置Broker,改变虚拟主题的默认前缀,如下面的配置,则表示虚拟主题的前缀是VirtualTopicConsumers。

    
      
        
          
" prefix="VirtualTopicConsumers.*." selectorAware="true"/>        
      
    

从ActiveMQ 5.4版本开始,可以配置selectorAware属性控制只有符合订阅者规则的消息才被分发给相应的虚拟队列,用于防止分发不匹配的消息,提升效率。

说了优点,当然也要说缺点。优点是可以实现高可用和负载均衡;缺点是,如果订阅者很多,每个订阅者都需要复制一份消息,这样会占用过多的磁盘空间,造成消息爆炸。

Composite Destinations:

可以将消息发送给多个Destination。可以混合发送Queue和Topic。例如:

Queue queue = new ActiveMQQueue("FOO.A,topic://NOTIFY.FOO.A");

个人感觉这个功能比较鸡肋,属于客户端控制服务器端的消息高可用。如果是纯粹的消息可用性复制,可以直接使用基于LevelDb的消息复制机制或基于JDBC的主从同步机制。

转载于:https://my.oschina.net/u/719192/blog/293749

你可能感兴趣的文章
IBM在云计算中推动了Swift并使用了Swift的运行环境、包目录和其更多属性
查看>>
MongoDB 是如何鼓励和激励开发者社区的
查看>>
Apple开源新的压缩算法LZFSE
查看>>
.NET Core运行时和基础类库性能提升
查看>>
当编程语言掌握在企业手中,是生机还是危机?
查看>>
HTML5 Canvas玩转酷炫大波浪进度图
查看>>
RethinkDB已经将其数据库移植到Windows
查看>>
可观测性对测试的影响:QCon伦敦大会上对Amy Phillips的访谈
查看>>
Sharding-Sphere成长记——写在分布式数据库代理端里程碑版本3.0.0发布之际
查看>>
kafka 备忘
查看>>
微软为 Chrome 带来更流畅的页面滚动效果,来自 Edge
查看>>
C++与Java语法上的不同
查看>>
微软262亿美元收购LinkedIn
查看>>
c/c++(hiredis)异步调用redis【转】
查看>>
Ceph集群块设备使用-创建和使用OSD
查看>>
大数据||hadoop分布式集群安装
查看>>
华为设备默认console密码
查看>>
wxWidgets第四课 EVT_LEFT_UP关联鼠标弹起事件不生效
查看>>
【故障解决】ORA-06502错误解决
查看>>
升级Windows 10周年更新部分用户遭遇卡死BUG
查看>>