×
新网 > 虚机资讯 > 正文

JAVA多线程测试MQ性能步骤以及代码

1.Windows下安装RabbitMQ需要以下几个步骤    (1):下载erlang,原因在于RabbitMQ服务端代码是使用并发式语言erlang编写的,双击.exe文件进行安装就好,安装完成之后创建一个名为ERLANG_HOME的环境变量,其值指向erlang的安装目录,同时将%ERLANG_HOME%bin加入到Path中,最后打开命令行,输入erl....

1.Windows下安装RabbitMQ需要以下几个步骤

 

002UASMrzy7605pjKJv15&690.jpg

(1):下载erlang,原因在于RabbitMQ服务端代码是使用并发式语言erlang编写的,双击.exe文件进行安装就好,安装完成之后创建一个名为ERLANG_HOME的环境变量,其值指向erlang的安装目录,同时将%ERLANG_HOME%bin加入到Path中,最后打开命令行,输入erl,如果出现erlang的版本信息就表示erlang语言环境安装成功;

(2):下载RabbitMQ,下载地址:http://www.rabbitmq.com/,同样双击.exe进行安装就好(这里需要注意一点,默认的安装目录是C:/ProgramFiles/....,这个目录中是存在空格符的,我们需要改变安装目录,貌似RabbitMQ安装目录中是不允许有空格的,我之前踩过这个大坑);

(3):安装RabbitMQ-Plugins,这个相当于是一个管理界面,方便我们在浏览器界面查看RabbitMQ各个消息队列以及exchange的工作情况,安装方法是:打开命令行cd进入rabbitmq的sbin目录(我的目录是:E:softwarerabbitmqrabbitmq_server-3.6.5sbin),输入:rabbitmq-pluginsenablerabbitmq_management命令,稍等会会发现出现plugins安装成功的提示,默认是安装6个插件,如果你在安装插件的过程中出现了下面的错误:

 

解决方法是:首先在命令行输入:rabbitmq-servicestop,接着输入rabbitmq-serviceremove,再接着输入rabbitmq-serviceinstall,接着输入rabbitmq-servicestart,最后重新输入rabbitmq-pluginsenablerabbitmq_management试试,我是这样解决的;

(4):插件安装完之后,看到下面界面,输入用户名:guest,密码:guest你就可以进入管理界面,当然用户名密码你都可以变的;

 

2.安装完RabbitMQ之后,我们先来简单了解下RabbitMQ中涉及到的几个概念

producer:消息生产者

consumer:消息消费者

virtualhost:虚拟主机,在RabbitMQ中,用户只能在虚拟主机的层面上进行一些权限设置,比如我可以访问哪些队列,我可以处理哪些请求等等;

broker:消息转发者,也就是我们RabbitMQ服务端充当的功能了,那么消息是按照什么规则进行转发的呢?需要用到下面几个概念;

exchange:交换机,他是和producer直接进行打交道的,有点类似于路由器的功能,主要就是进行转发操作的呗,那么producer到底用哪个exchange进行路由呢?这个取决于routingkey(路由键),每个消息都有这个键,我们也可以自己设定,其实就是一字符串;

queue:消息队列,用于存放消息,他接收exchange路由过来的消息,我们可以对队列内容进行持久化操作,那么queue到底接收那个exchange路由的消息呢?这个时候就要用到bindingkey(绑定键)了,绑定键会将队列和exchange进行绑定,至于绑定方式,RabbitMQ提供了多种方式,大家可以看看鸿洋大神的RabbitMQ博客系列(点击查看);

以上就是RabbitMQ涉及到的一些概念了,用一张图表示这些概念之间的关系就是:

 

3.RabbitMQ简单使用

producer(生产者)端步骤:

(1):创建ConnectionFactory,并且设置一些参数,比如hostname,portNumber等等

(2):利用ConnectionFactory创建一个Connection连接

(3):利用Connection创建一个Channel通道

(4):创建queue并且和Channel进行绑定

(5):创建消息,并且发送到队列中

注意,在我们当前的例子中,并没有用到exchange交换机,RabbitMQ默认情况下是会创建一个空字符串名字的exchange的,如果我们没有创建自己的exchange的话,默认就是使用的这个exchange;

producer端代码:

 

 

packagecom.mq;
importjava.io.IOException;

importcom.rabbitmq.client.Channel;
importcom.rabbitmq.client.Connection;
importcom.rabbitmq.client.ConnectionFactory;
importcom.rabbitmq.client.MessageProperties;

publicclassSender{

privatestaticStringqueueName="rabbit";
privatestaticintk=0;
privatestaticdoublestartTime=0.0;
staticConnectionFactoryfactory=newConnectionFactory();
staticChannelchannel=null;
staticConnectionconnection=null;


static{
factory.setHost("");
factory.setVirtualHost("");
factory.setUsername("");
factory.setPassword("");
factory.setPort(5672);

try{
connection=factory.newConnection();
channel=connection.createChannel();
channel.queueDeclare(queueName,true,false,false,null);
}catch(IOExceptione){
//TODOAuto-generatedcatchblock
e.printStackTrace();
}


}
publicstaticvoidtt()throwsException{


for(inti=0;i<100;i++){
newThread(newRunnable(){

publicvoidrun(){
//TODOAuto-generatedmethodstub
for(intj=0;j<1000;j++){
//发送的消息
Stringmessage="{*********}"+++k;
//往队列中发出一条消息
try{
if(channel!=null){
channel.basicPublish("",queueName,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
}

}catch(IOExceptione){
//TODOAuto-generatedcatchblock
e.printStackTrace();
}



doublel=(double)((System.currentTimeMillis()-startTime)/1000);
intt=++k;


System.out.println("tps:"+t/l);

}


}
}).start();

}
}

publicstaticvoidmain(String[]args)throwsException{
startTime=System.currentTimeMillis();

Sender.tt();







}

}

consumer(消费者)端步骤:

(1):创建ConnectionFactory,并且设置一些参数,比如hostname,portNumber等等

(2):利用ConnectionFactory创建一个Connection连接

(3):利用Connection创建一个Channel通道

(4):将queue和Channel进行绑定,注意这里的queue名字要和前面producer创建的queue一致

(5):创建消费者Consumer来接收消息,同时将消费者和queue进行绑定

consumer端代码:

packagecom.mq;

importcom.rabbitmq.client.Channel;
importcom.rabbitmq.client.Connection;
importcom.rabbitmq.client.ConnectionFactory;
importcom.rabbitmq.client.QueueingConsumer;

publicclassConsumer{
privatestaticStringqueueName="rabbit";
staticinti=0;

publicstaticvoidmain(String[]args)throwsException{

ConnectionFactoryfactory=newConnectionFactory();
factory.setHost("*.*.*.*");
factory.setVirtualHost("");
factory.setUsername("");
factory.setPassword("");
factory.setPort(5672);
Connectionconnection=factory.newConnection();
Channelchannel=connection.createChannel();

//声明队列,主要为了防止消息接收者先运行此程序,队列还不存在时创建队列。
channel.queueDeclare(queueName,true,false,false,null);
System.out.println(Consumer.class.hashCode()
+"[*]Waitingformessages.ToexitpressCTRL+C");

//创建队列消费者
QueueingConsumerconsumer=newQueueingConsumer(channel);

//设置最大服务消息接收数量
intprefetchCount=1;
channel.basicQos(prefetchCount);

booleanack=false;//是否自动确认消息被成功消费
channel.basicConsume(queueName,ack,consumer);//指定消费队列
longstartTime=System.currentTimeMillis();

while(true){
//nextDelivery是一个阻塞方法(内部实现其实是阻塞队列的take方法)
QueueingConsumer.Deliverydelivery=consumer.nextDelivery();
Stringmessage=newString(delivery.getBody());


System.out.println("[x]Received\'"+message+"\'");
intk=++i;
System.out.println(k);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
floatl=(float)(System.currentTimeMillis()-startTime)/1000;

System.out.println("tps:"+k/l);



}



}

}

 

 

 

 

  • 相关专题

免责声明:本文内容由互联网用户自发贡献自行上传,本网站不拥有所有权,也不承认相关法律责任。如果您发现本社区中有涉嫌抄袭的内容,请发送邮件至:operations@xinnet.com进行举报,并提供相关证据,一经查实,本站将立刻删除涉嫌侵权内容。

免费咨询获取折扣

Loading