`
myangle89
  • 浏览: 96092 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

ActiveMQ集群应用

    博客分类:
  • java
 
阅读更多

ActiveMQ集群

        ActiveMQ具有强大和灵活的集群功能,但在使用的过程中会发现很多的缺点,ActiveMQ的集群方式主要由两种:Master-Slave(ActiveMQ5.8版本已不可用)和Broker Cluster。

1、Master-Slave

        Master-Slave方式中,只能是Master提供服务,Slave是实时地备份Master的数据,以保证消息的可靠性。当Master失效时,Slave会自动升级为Master,客户端会自动连接到Slave上工作。Master-Slave模式分为三类:Pure Master Slave、Shared File System Master Slave和JDBC Master Slave。

(1)Pure Master Slave

    需要两个Broker,一个作为Master,另一个作为Slave,运行时,Slave通过网络实时从Master处复制数据,同时,如果Slave和Master失去连接,Slave就会自动升级为Master,继续为客户端提供消息服务,如图所示:

    实践时,我们使用两个ActiveMQ服务器,一个作为Master,Master不需要做特殊的配置;另一个作为Slave,配置${ACTIVEMQ_HOME}/conf/activemq.xml文件,在<broker>节点中添加连接到Master的URI和设置Master失效后不关闭Slave,如下:

 

Xml代码  收藏代码
  1. <broker xmlns="http://activemq.apache.org/schema/core" brokerName="pure_slave" masterConnectorURI="tcp://0.0.0.0:61616" shutdownOnMasterFailure="false" dataDirectory="${activemq.base}">  

 

 同时修改Slave的服务端口,如:

 

Xml代码  收藏代码
  1. <transportConnectors>  
  2.             <transportConnector name="openwire" uri="tcp://0.0.0.0:61617"/>  
  3. </transportConnectors>  

 

为了看到实践的效果,Master和Slave的消息持久化介质都是采用MySQL,并且Master和Slave分别连接不同的数据库。

    在消息生产者应用和消息消费者应用的Spring配置文件中添加以下红色内容:

 客户端连接消息中间服务器的url应修改为“failover:(tcp://localhost:61616,tcp://localhost:61617)?initialReconnectDelay=100

 

    配置完成后,我们可以通过以下步骤来进行测试:

A、启动Master和Slave,启动消息生产者应用,并分别发送一些Queue消息和Topic消息,如果此时订阅Topic消息的消费者设置了clientID,我们就可以在Master的数据库和Slave的数据库中看到尚未消费的消息,包括Queue和Topic的消息;

B、启动消费者应用,可以接收到消息;

C、关闭消费者,生产者继续发送一些消息A;

D、停止Master;

E、生产者继续发送消息B;

F、启动消费者应用,消费者可以接收到消息A和消息B,说明Slave接替了Master的工作并复制了Master的消息。

    这种方式只能两台机器做集群,可以起到很好的双机热备功能,但只能失效一次,只能停机恢复Master-Slave结构。

 

 

(2)Shared File System Master Slave

        Shared File System Master Slave就是利用共享文件系统做ActiveMQ集群,是基于ActiveMQ的默认数据库kahaDB完成的,kahaDB的底层是文件系统。这种方式的集群,Slave的个数没有限制,哪个ActiveMQ实例先获取共享文件的锁,那个实例就是Master,其它的ActiveMQ实例就是Slave,当当前的Master失效,其它的Slave就会去竞争共享文件锁,谁竞争到了谁就是Master。这种模式的好处就是当Master失效时不用手动去配置,只要有足够多的SlaveShared File System Master Slave模式如图所示:

    本例子是在一台机器上运行三个ActiveMQ实例,需要对ActiveMQ的配置文件做一些简单的配置,就是把持久化适配器的存储目录改为本地磁盘的一个固定目录,三个实例共享这个目录,如下:

 

Xml代码  收藏代码
  1. <persistenceAdapter>  
  2.     <kahaDB directory="${activemq.data}/shared_file/data/kahadb" />  
  3. </persistenceAdapter> 

然后修改ActiveMQ实例的服务端口和jetty的服务端口,防止端口占用异常。启动三个ActiveMQ实例,就可以进行测试了。

 

    以上配置只能在一台机器进行,如果各个ActiveMQ实例需要运行在不同的机器,就需要用到分布式文件系统了。

 

 

(3)JDBC Master Slave

        JDBC Master Slave模式和Shared File Sysytem Master Slave模式的原理是一样的,只是把共享文件系统换成了共享数据库。我们只需在所有的ActiveMQ的主配置文件中(${ACTIVEMQ_HOME}/conf/activemq.xml)添加数据源,所有的数据源都指向同一个数据库,如:

 

Xml代码  收藏代码
  1. <bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">  
  2.         <property name="driverClassName" value="com.mysql.jdbc.Driver"/>  
  3.         <property name="url" value="jdbc:mysql://localhost:3306/cluster_jdbc?relaxAutoCommit=true"/>  
  4.         <property name="username" value="root"/>  
  5.         <property name="password" value="root"/>  
  6.         <property name="maxActive" value="200"/>  
  7.         <property name="poolPreparedStatements" value="true"/>  
  8. </bean>  

 

 

然后修改持久化适配器,修改配置文件的内容为:

 

[plain] view plaincopy
 
  1. <persistenceAdapter>  
  2.             <!--<kahaDB directory="${activemq.data}/kahadb"/>-->  
  3.         <jdbcPersistenceAdapter dataDirectory="${activemq.data}" dataSource="#mysql-ds"/>  
  4. </persistenceAdapter>  



 

 

这种方式的集群相对Shared File System Master Slave更加简单,更加容易地进行分布式部署,但是如果数据库失效,那么所有的ActiveMQ实例都将失效。

 

    以上三种方式的集群都不支持负载均衡,但可以解决单点故障的问题,以保证消息服务的可靠性。

 

 

2、Broker Cluster

        Broker Cluster主要是通过network of Brokers在多个ActiveMQ实例之间进行消息的路由。Broker的集群分为Static DiscoveryDynamic Discovery两种。

(1)Static Discovery集群

Static Discovery集群就是通过硬编码的方式使用所有已知ActiveMQ实例节点的URI地址。如:消息生产者应用连接一个ActiveMQ实例,我们暂时称为MQ1,所有的消息都由该实例提供;两个消息消费者应用分别连接另外两个ActiveMQ实例,分别为MQ2MQ3,两个消息消费者需要消费MQ1上的消息,但它们连接的都不是MQ1,可以通过Static Discovery方式把MQ1上的消息路由到MQ2MQ3,为了保证消费者不因某个节点的失效而导致不能消费消息,在消费者应用中需要配置所有节点的URI

生产者ActiveMQ实例不需要特殊的配置,所有的消费者ActiveMQ实例需要添加networkConnectors节点,连接到生产者MQ实例,如:

 

[plain] view plaincopy
 
  1. <networkConnectors>  
  2.             <networkConnector uri="static:(tcp://localhost:61616)" duplex="true"/>  
  3. </networkConnectors>  

Static Discovery集群方式有些缺点,如不能解决单点故障问题,若某个Broker失效时,有可能造成数据的丢失,动态添加节点不够智能化。

 

 

(2)Dynamic Discovery集群

        Dynamic Discovery集群方式在配置ActiveMQ实例时,不需要知道所有其它实例的URI地址,只需在所有实例的${ACTIVEMQ_HOME}/conf/activemq.xml文件中添加以下内容:

[plain] view plaincopy
 
  1. <networkConnectors>  
  2.           <networkConnector uri="multicast://default"/>  
  3. </networkConnectors>  

同时在<transportConnectors>节点中添加以下红色部分内容:

 

[plain] view plaincopy
 
  1. <transportConnectors>  
  2.             <transportConnector name="openwire" uri="tcp://0.0.0.0:61616" <span style="color:#ff0000;">discoveryUri="multicast://default"</span> />  
  3.  </transportConnectors>  

 

 

这样就可以实现消息在所有ActiveMQ实例之间进行路由。Dynamic Discovery集群方式的缺点和Static Discovery一样。

    从以上的分析可以看出,Master-Slave模式不支持负载均衡,但可以通过消息的实时备份或共享保证消息服务的可靠性,Broker Cluster模式支持负载均衡,可以提高消息的消费能力,但不能保证消息的可靠性。所以为了支持负载均衡,同时又保证消息的可靠性,我们可以采用Msater-Slave+Broker Cluster的模式。

分享到:
评论
1 楼 yangjianzhouctgu 2015-06-28  
可否将项目源码作为附件?我这边slave进行如下配置:

 <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}"  masterConnectorURI="tcp://192.168.1.102:61616" shutdownOnMasterFailure="false" >


启动时报错如下:

g.xml.sax.SAXParseException; lineNumber: 40; columnNumber: 197; cvc-complex-type.3.2.2: Attribute 'masterConnectorURI' is not allowed to appear in element 'broker'.
org.springframework.beans.factory.xml.XmlBeanDefinitionStoreException: Line 40 in XML document from file [/usr/MQ/activeMQ/apache-activemq-5.11.1/conf/activemq.xml] is invalid; nested exception is org.xml.sax.SAXParseException; lineNumber: 40; columnNumber: 197; cvc-complex-type.3.2.2: Attribute 'masterConnectorURI' is not allowed to appear in element 'broker'.
	at org.springframework.beans.factory.xml.XmlBeanDefinitionReader.doLoadBeanDefinitions(XmlBeanDefinitionReader.java:396)
	at org.springframework.beans.factory.xml.XmlBeanDefinitionReader.loadBeanDefinitions(XmlBeanDefinitionReader.java:334)
	at org.springframework.beans.factory.xml.XmlBeanDefinitionReader.loadBeanDefinitions(XmlBeanDefinitionReader.java:302)
	at org.apache.xbean.spring.context.ResourceXmlApplicationContext.loadBeanDefinitions(ResourceXmlApplicationContext.java:111)
	at org.apache.xbean.spring.context.ResourceXmlApplicationContext.loadBeanDefinitions(ResourceXmlApplicationContext.java:104)
	at org.springframework.context.support.AbstractRefreshableApplicationContext.refreshBeanFactory(AbstractRefreshableApplicationContext.java:130)
	at org.springframework.context.support.AbstractApplicationContext.obtainFreshBeanFactory(AbstractApplicationContext.java:539)
	at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:451)
	at org.apache.xbean.spring.context.ResourceXmlApplicationContext.<init>(ResourceXmlApplicationContext.java:64)
	at org.apache.xbean.spring.context.ResourceXmlApplicationContext.<init>(ResourceXmlApplicationContext.java:52)
	at org.apache.activemq.xbean.XBeanBrokerFactory$1.<init>(XBeanBrokerFactory.java:104)
	at org.apache.activemq.xbean.XBeanBrokerFactory.createApplicationContext(XBeanBrokerFactory.java:104)
	at org.apache.activemq.xbean.XBeanBrokerFactory.createBroker(XBeanBrokerFactory.java:67)
	at org.apache.activemq.broker.BrokerFactory.createBroker(BrokerFactory.java:71)
	at org.apache.activemq.broker.BrokerFactory.createBroker(BrokerFactory.java:54)
	at org.apache.activemq.console.command.StartCommand.runTask(StartCommand.java:87)
	at org.apache.activemq.console.command.AbstractCommand.execute(AbstractCommand.java:57)
	at org.apache.activemq.console.command.ShellCommand.runTask(ShellCommand.java:150)
	at org.apache.activemq.console.command.AbstractCommand.execute(AbstractCommand.java:57)
	at org.apache.activemq.console.command.ShellCommand.main(ShellCommand.java:104)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:606)
	at org.apache.activemq.console.Main.runTaskClass(Main.java:262)
	at org.apache.activemq.console.Main.main(Main.java:115)
Caused by: org.xml.sax.SAXParseException; lineNumber: 40; columnNumber: 197; cvc-complex-type.3.2.2: Attribute 'masterConnectorURI' is not allowed to appear in element 'broker'.
	at com.sun.org.apache.xerces.internal.util.ErrorHandlerWrapper.createSAXParseException(ErrorHandlerWrapper.java:198)
	at com.sun.org.apache.xerces.internal.util.ErrorHandlerWrapper.error(ErrorHandlerWrapper.java:134)
	at com.sun.org.apache.xerces.internal.impl.XMLErrorReporter.reportError(XMLErrorReporter.java:437)
	at com.sun.org.apache.xerces.internal.impl.XMLErrorReporter.reportError(XMLErrorReporter.java:368)
	at com.sun.org.apache.xerces.internal.impl.XMLErrorReporter.reportError(XMLErrorReporter.java:325)
	at com.sun.org.apache.xerces.internal.impl.xs.XMLSchemaValidator$XSIErrorReporter.reportError(XMLSchemaValidator.java:458)
	at com.sun.org.apache.xerces.internal.impl.xs.XMLSchemaValidator.reportSchemaError(XMLSchemaValidator.java:3237)
	at com.sun.org.apache.xerces.internal.impl.xs.XMLSchemaValidator.processAttributes(XMLSchemaValidator.java:2714)
	at com.sun.org.apache.xerces.internal.impl.xs.XMLSchemaValidator.handleStartElement(XMLSchemaValidator.java:2056)
	at com.sun.org.apache.xerces.internal.impl.xs.XMLSchemaValidator.startElement(XMLSchemaValidator.java:746)
	at com.sun.org.apache.xerces.internal.impl.XMLNSDocumentScannerImpl.scanStartElement(XMLNSDocumentScannerImpl.java:379)
	at com.sun.org.apache.xerces.internal.impl.XMLDocumentFragmentScannerImpl$FragmentContentDriver.next(XMLDocumentFragmentScannerImpl.java:2786)
	at com.sun.org.apache.xerces.internal.impl.XMLDocumentScannerImpl.next(XMLDocumentScannerImpl.java:606)
	at com.sun.org.apache.xerces.internal.impl.XMLNSDocumentScannerImpl.next(XMLNSDocumentScannerImpl.java:117)
	at com.sun.org.apache.xerces.internal.impl.XMLDocumentFragmentScannerImpl.scanDocument(XMLDocumentFragmentScannerImpl.java:510)
	at com.sun.org.apache.xerces.internal.parsers.XML11Configuration.parse(XML11Configuration.java:848)
	at com.sun.org.apache.xerces.internal.parsers.XML11Configuration.parse(XML11Configuration.java:777)
	at com.sun.org.apache.xerces.internal.parsers.XMLParser.parse(XMLParser.java:141)
	at com.sun.org.apache.xerces.internal.parsers.DOMParser.parse(DOMParser.java:243)
	at com.sun.org.apache.xerces.internal.jaxp.DocumentBuilderImpl.parse(DocumentBuilderImpl.java:347)
	at org.springframework.beans.factory.xml.DefaultDocumentLoader.loadDocument(DefaultDocumentLoader.java:75)
	at org.springframework.beans.factory.xml.XmlBeanDefinitionReader.doLoadBeanDefinitions(XmlBeanDefinitionReader.java:388)
	... 25 more

相关推荐

    2019实战ActiveMQ集群与应用实战视频教程

    实战ActiveMQ集群与应用实战视频教程实战

    activeMQ集群的使用与配置[收集].pdf

    activeMQ集群的使用与配置[收集].pdf

    activeMQ集群的使用与配置[归类].pdf

    activeMQ集群的使用与配置[归类].pdf

    实战ActiveMQ集群与应用视频教程.zip

    网盘文件永久链接 1:ActiveMQ入门和消息中间件 2:JMS基本概念和模型 3:JMS的可靠性机制 4:JMS的API结构和开发步骤 5:Broker的启动方式 6:ActiveMQ结合Spring开发 ...10:多线程consumer访问集群 ..........

    ActiveMQ部署方案分析对比

    构建高可用的ActiveMQ系统在生产环境中是非常重要的,单点的ActiveMQ作为企业应用无法满足高可用和集群的需求,所以ActiveMQ提供 了master-slave、broker cluster等多种部署方式,但通过分析多种部署方式之后我认为...

    ActiveMq实战性视频-分布式

    从activemq入门到 实战-activemq集群与应用,分布式。货真价实

    ActiveMQ.txt

    【ActiveMQ】—01—实战ActiveMQ集群与应用.消息队列中间件ActiveMQ入门到精通视频教程及资料一头扎进JMS之ActiveMQ视频教程.

    MQ之ActiveMQ.mmap

    消息中间件已经成为互联网企业应用系统内部通信的核心手段,是目前企业内主流标配技术, 它具有解耦、异步、削峰、签收、事务、流量控制、最终一致性等一系列高性能架构所需功能。 当前使用较多的消息中间件有...

    apache-activemq-5.11.2

    ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位. ⒈ 多种语言和协议编写客户端。语言: Java,C,C++,C#,...

    ActiveMQ高级应用视频教程

    通过本套视频教程您可以掌握JMS消息中间件ActiveMQ的应用场景和精湛技术,学会消息中间件ActiveMQ与Spring的集成环境搭建,以及注册中心Zookeeper的集群和ActiveMQ高可用集群的搭建,与此同时您还可以掌握如何验证高...

    消息中间件之ActiveMQ视频课程

    本套视频以Apache的ActiveMQ作为切入点,分为基础/实战/面试上中下三大部分,带你从零基础入门到熟练掌握ActiveMQ,能够结合Spring/SpringBoot进行实际开发配置并能够进行MQ多节点集群的部署,可以学习到MQ的高级...

    ActiveMQ.rar

    n 四:用ActiveMQ构建应用 包括:多种启动Broker的方法、单独应用的开发、结合Spring的开发等 n 五:ActiveMQ的Transport 包括:多种传输协议的功能、配置和使用 六: ActiveMQ的消息存储 包括:队列和topic、KahaDB...

    activemq的windowns编译库、centos7编译库和mac编译库(含头文件和库文件)

    ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。 特点: 支持来自Java,C,C ++,C#,Ruby,Perl,...

    ActiveMQ消息服务器.rar

    ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。 ActiveMQ的特点: 支持来自Java,C,C ++,C#,Ruby,...

    消息队列学习(springboot+kafka+activemq)

    启动应用即可测试,可帮助快速了解kafka、activemq 两者在 Queue topic producer consumer 使用异同点,demo仅仅是最简化代码,演示通信和使用,无法对两者的集群模式进行测试,如果有对集群模式有兴趣,可自行扩展...

    ActiveMQ从入门到精通(一)

    本文来自于jianshu,本篇文章介绍了JMS、...andPub/Sub)、与Spring整合、ActiveMQ集群、监控与配置优化等。话不多说,我们来一起瞧一瞧!首先来说较早以前,也就是没有JMS的那个时候,很多应用系统存在一些缺陷

    ActiveMQ的技术详解(高级).doc

    ActiveMQ是一个开源的,实现了JMS1.1规范的面向消息(MOM)中间件,为应用程序提供高效的、可扩展的、稳定的和安全的企业级消息通信。ActiveMQ使用Apache提供的授权,任何人都可以对其实现代码进行修改。编写客户端...

    P2P网络借贷平台项目SSH+Redis+ActiveMQ+POI+Shiro+AngularJS+Nginx+Quartz等

    1、项目整体采用Maven分模块构建、SVN版本控制、PowerDesigner数据建模,基于约定标准的项目工程结构,同时... 11、项目部署采用tomcat+Nginx的集群部署方式,在部署过程中可以深刻体会到nginx在负载均衡中发挥的优势。

    spring-boot-artemis-clustered-topic:一个示例项目,以集群模式使用主题(发布-订阅)演示通过Apache ActiveMQ Artemis 2.4.0在两个spring boot应用程序的生产者和使用者之间的异步通信。

    Spring Boot Artemis集群主题一个示例项目,通过集群模式下的topic (发布-订阅),通过Apache ActiveMQ Artemis 2.4.0演示了两个Spring Boot应用程序生产者和使用者之间的异步通信。介绍Apache ActiveMQ Artemis是...

Global site tag (gtag.js) - Google Analytics