×
新网 > 虚机资讯 > 正文

Java消息中间件之ActiveMQ

摘要: 大型系统的演变必然的发展方向是分布式,而在分布式系统中应用与应用之间互相连接越来越紧密,在应用之间的消息传递就很普遍了。使用Java消息中间件处理异步消息成为了分布式系统中的必修课,本博客就如何在Java中使用消息中间件进行详细说明。

一、消息中间件

1.消息中间件概述

中间件:非底层操作系统软件,非业务应用软件,不是直接给最终用户使用的,不能直接给客户带来价值的软件统称为中间件。 消息中间件:关注与数据的发送和接受,利用高效可靠的异步消息传递机制集成分布式系统。 JMS:Java消息服务(Java Message Service)即JMS,是一个Java平台中关于面向消息中间件的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。 AMQP:AMQP(advanced message queuing protocol)是一个提供统一消息服务的应用层标准协议,基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同开发语言等条件的限制。 JMS和AMQP对比

1802261403451218620997 (1).jpg

2.消息中间件图示

 

3.常见消息中间件对比

3.1.ActiveMQ

(1)概述:ActiveMQ是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ是一个完全支持JMS 1.1和J2EE 1.4规范的,JMS Provider实现,尽管JMS规范出台已经很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。

(2)特性:

多种语言和协议编写客户端。语言:Java、C、C++、C#、Ruby、Perl、Python、PHP。应用协议:OpenWire、Stomp REST、WS、Notification、XMPP、AMQP 完全支持JMS 1.1和J2EE规范(持久化,XA消息,事务) 虚拟主题,组合目的,镜像队列

3.2.RabbitMQ

(1)概述:RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写。用于分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

(2)特性:

支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript等 AMQP的完整实现(vhost虚拟主机、Exchange交换器、Binding绑定、Routing Key路由器等) 事务支持/发布确认 消息持久化

3.3.Kafka

(1)概述:Kafka是一种高吞吐量的分布式发布订阅消息系统,是一个分布式的、分区的、可靠的分布式日志存储服务。它通过一种独一无二的设计提供了一个消息系统的功能。

(2)特性:

通过O(1)的算法复杂度的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能 高吞吐量:即使是非常普通的硬件Kafka也可以支持每秒钟数百万的消息 Partition、Consumer Group

3.4.综合评价

二、JMS

1.JMS规范

(1)JMS相关概念

提供者:实现JMS规范的消息中间件服务器 客户端:发送或接收消息的应用程序 生产者/发布者:创建并发送消息的客户端 消费者/订阅者:接收并处理消息的客户端 消息:应用程序之间传递的数据内容 消息模式:在客户端之间传递消息的方式,JMS中定义了主题和队列两种模式

(2)JMS消息模式

A.队列模型

客户端包括生产者和消费者 队列中的消息只能被一个消费者消费 消费者可以随时消费队列中的消息

队列模型消息示意图:

 

B.主题模型

客户端包括发布者和订阅者 主题中的消息被所有订阅者消费 消费者不能消费订阅之前就发送到主题中的消息,即只能先订阅才能消费

主题模型消息示意图:

(3)JMS编码接口

ConnectionFactory 用于创建连接到消息中间件的链接工厂 Connection 代表了应用程序和消息服务器之间的通信链路 Destination 指消息发布和接收的地点,包括队列或主题 Session 表示一个单线程的上下文,用于发送和接收消息 MessageConsumer 由会话创建,用于接收发送到目标的消息 MessageProducer 由会话创建,用于发送消息到目标 Message 是在消费者和生产者之间传送的对象,消息头,一组消息属性,一个消息体

5.JMS编程接口之间的关系

2.ActiveMQ的安装与启动

(1)在Windows平台安装ActiveMQ

下载安装包 http://activemq.apache.org/ 直接启动,以管理员身份运行启动你电脑对应版本的activemq.bat,即可成功启动,比如,我的是64位的,即选择如下图中的win64文件夹下的activemq.bat,或者安装成Windows服务,以服务方式启动,记住使用以管理员身份运行。
使用服务启动,我习惯设置为手动启动,你也可以设置为自动,即在开机时就启动

 

无论你是如何启动,启动成功后,使用浏览器访问127.0.0.1:8161就可以访问ActiveMQ的管理页面,点击Manage ActiveMQ broker,出现弹框输入用户名密码,都默认为admin,然后就可以进入管理主页了!

 

(2)在Linux平台安装ActiveMQ

下载安装包 wget http://mirror.bit.edu.cn/apache//activemq/5.15.2/apache-activemq-5.15.2-bin.tar.gz 解压安装包 tar -zxvf apache-activemq-5.15.2-bin.tar.gz 启动 解压完成后,进入解压后产生的目录,使用 ./activemq start 启动,可以访问linux服务器的ip地址加端口8161,能进入管理页面即成功安装并启动,使用./activemq stop 关闭。

3.JMS的代码实现

3.1.使用JMS接口规范连接ActiveMQ

创建生产者 创建发布者 创建消费者 创建订阅者

具体代码实现可参考本人码云项目:https://gitee.com/kevinshaw/jms-demo.git

3.2.使用Spring集成JMS连接ActiveMQ

(1)ConnectionFactory 用于管理连接的连接工厂

一个Spring为我们提供的连接池 JmsTemplate每次发消息都会重新创建连接,会话和productor Spring中提供SingleConnectionFactory和CachingConnectionFactory,SingleConnectionFactory对于建立连接请求只会返回同一个Connection,并且使用同一个close方法;CachingConnectionFactory继承自SingleConnectionFactory,所有拥有SingleConnectionFactory的所有功能,而且新增缓存功能,可以缓存会话,Producer,Consumer

(2)JmsTemplate 用于发送和接收消息的模板类

是Spring提供的,只需要向Spring容器内注册这个类就可以使用JmsTemplate方便的操作jms JmsTemplate类是线程安全的,可以在整个应用范围使用

(3)MessageListerner 消息监听器

实现一个onMessage方法,该方法只接收一个Message参数

具体代码实现可参考本人码云项目:https://gitee.com/kevinshaw/jms-demo.git

三、ActiveMQ集群配置

1.对消息中间件群集的原因:

实现高可用,以排除单点故障引起的服务中断 实现负载均衡,以提升效率为更多客户提供服务

2.ActiveMQ集群基础知识

(1)集群方式

客户端集群:让多个消费者消费同一队列,在队列模式下已经支持这种情况了,但是在主题模式下,多个消费者是消费了完整的消息,造成消息重复的可能; Broker clusters:多个Broker之间同步消息; Master Slave:实现高可用的一种方式,当主消息服务器宕机时,备服务器可以立即补充,以保证服务的继续。

(2)客户端集群配置

ActiveMQ失效转移(failover)

介绍:允许当其中一台服务器宕机时,客户端在传输层上重新连接到其他消息服务器。 语法:failover:(uri1,...,uriN)?transportOptions

transportOptions参数说明:

randomize 默认为true,表示在URI列表中选择URI连接时是否采用随机策略 initialReconnectDelay 默认10,单位毫秒,表示第一次尝试重连之间的等待的时间

(3)Broker Cluster集群配置

原理:当有两个节点A,B,节点A可以把消息同步到节点B,节点B也可以把消息同步到节点A,通过消息同步之后,节点A接收到的消息可以给节点B消费掉,同理,节点B接收到的消息可以给节点A消费掉。它的实现方式是采用的网络连接器的方式。

 

 

NetwrokConncetor(网络连接器):网络连接器主要用于配置ActiveMQ服务器与服务器之间的网络通讯方式,用于服务器透传消息,网络连接器分为静态连接器和动态连接器。

静态连接器:即在服务器ip地址上具体指定ip地址,通过以下配置,当服务器比较多的时候,使用静态连接就比较麻烦,这个时候就需要用到动态连接了

<networkConnectors>

<networkConnectors uri="static:(tcp://127.0.0.1:61617,tcp://127.0.0.1:61618)"/>

</networkConnectors>

动态连接器:使用多播的方式通知其他服务器,配置如下,我们首先需要定义网络连接器和传输连接器,传输连接器会告知我们一个发现的uri地址,这个就是我们的主播地址,这样我们就可以达到动态扩展服务器的效果了。

<networkConnectors>

<networkConnectors uri="multicast://default"/>

</networkConnectors>

<transportConnectors>

<transportConnectors uri="tcp://localhost:0" discoveryUri="multicast://default"/>

</transportConnectors>

(4)Master/Slave集群配置

ActiveMQ Master Slave 集群方案:

Share nothing storage master/slave (已过时,5.8+后移除) Shared storage master/slave 共享存储 Replicated LevelDB Store 基于复制的LevelDB Store

共享存储集群的原理:我们把节点A,B的持久化配置到同一个地方,先启动节点A,此时节点A获取到排它锁,节点A独占资源成为Master,而节点B无法获得锁资源,节点B就成了Slave;节点A就获得了对外开放服务的能力,可以通过外部的客户端提交信息到节点A,但是不能发送消息到节点B;如果此时节点A挂了,那么节点B立即获得了持久化资源的排它锁,成为新的Master,接收到外部客户端,而客户端使用失效转移之后,将消息发送到节点B,这个时候完成整个请求的不间断性,完成了高可用

 

基于复制的LevelDB Store的原理:因为LevelDB是基于ZK的,所以它的服务器至少需要三台,假设我们有三个服务器节点A,B,C,每个节点都有自己的储存方式,它们都配置同一个ZooKeeper节点,通过ZK来选举一台服务器作为Master,比如选举节点A作为Master,这时节点A就具有对外部提供服务的能力,而节点B,C是不具备的,节点A获取了服务器的外部消息资源后,它首先在本地储存,然后通过ZK将消息同步给B,C,然后B,C分别在自己的服务器里储存,这就是基于复制LevelDB的方式,如果节点A出现故障,那么ZK会立即选举一个新的Master出来。

(5)两种集群方式对比

 

Master/Slave:它可以做到高可用,当一台服务器挂了,其他服务器可以立即补充上去,并且保证了消息不会丢失,但做不了负载均衡,因为Slave不具备外部服务器提供服务的能力;

Broker Cluster:它不具备高可用的能力,因为它自己的消息并没有在一个地方储存,也就是说当一台服务器挂了的时候,它正在处理的消息可能会同步丢失,但是它可以做到负载均衡,即节点A,B上的消息可以互相消费。

(6)三台服务器的完美集群方案(既实现高可用,又实现负载均衡)

因为基于复制LevelDB的方式至少需要三台服务器,所以这里我们使用共享持久化资源方式;我们首先将节点A,B组成消息同步,然后将节点A,C也组成消息同步;节点B,C组成Master/Slave,然后按顺序启动A,B,C。这个方案能够实现节点A,B,C任意一台服务器宕机时,对整个集群不受影响,仍然可以正常工作,但是需要尽快恢复宕机服务器的问题,如果A,B同时宕机,整个集群就会崩溃了。所以如果要提高稳定性,可以增加服务器数量。

 

四、使用其他消息中间件

前面主要介绍了ActiveMQ,其最大优点是支持JMS,JMS让Java的开发变得简单,但其在各方面表现都比较中庸,也存在自己的问题,比如吞吐量没有Kafka高,稳定性没有RabbitMQ强。

1.企业开发需要解决的问题

不同业务系统分别处理同一消息,同一业务系统负载处理同类消息 解决消息发送时的一致性问题 解决消息处理时的幂等性问题 基于消息机制建立事件总线

2.使用其他消息中间件时,分析需要做的事

解决各业务系统集群处理同一条消息 实现自己的消息提供者

3.RabbitMQ:使用交换器绑定到队列

 

RabbitMQ消息提供者源码解析:

 

3.集成Kafka

Kafka使用group.id分组消费者

配置消费者参数group.id相同时对消息进行负载处理 配置服务器partitions参数,控制同一个group.id下的consumer数量小于partitions Kafka只保证同一个partition下的消息是有序的

Kafka消息提供者源码解析:


 

 

  • 相关专题

免责声明:本文内容由互联网用户自发贡献自行上传,本网站不拥有所有权,也不承认相关法律责任。如果您发现本社区中有涉嫌抄袭的内容,请发送邮件至:operations@xinnet.com进行举报,并提供相关证据,一经查实,本站将立刻删除涉嫌侵权内容。

免费咨询获取折扣

Loading