RabbitMQ在CentOS7安装


安装

下载rpm安装:

1
2
#rpm -ihv erlang-18.1-1.el6.x86_64.rpm
#rpm -ihv rabbitmq-server-3.5.6-1.noarch.rpm

默认配置启动:
#systemctl start rabbitmq-server

如果超时,或无响应:
那么只需要修改一下hosts文件,增加你的主机名,注意,比如你的主机叫 demo.woplus,在hosts中配了 127.0.0.1 demo.woplus 也不行,你需要在hosts中再加一个 127.0.0.1 demo。

配置

第一件事要创建用户,因为缺省的guest/guest用户只能在本地登录,所以先用命令行创建一个admin/admin123,并让他成为管理员。

1
2
3
4
[root@master ~]# rabbitmqctl add_user admin admin123
Creating user "admin" ...
[root@master ~]# rabbitmqctl set_user_tags admin administrator
Setting tags for user "admin" to [administrator] ...

启动

启用WEB管理:

1
2
3
4
5
6
7
8
9
10
[root@master ~]# rabbitmq-plugins enable rabbitmq_management
The following plugins have been enabled:
mochiweb
webmachine
rabbitmq_web_dispatch
amqp_client
rabbitmq_management_agent
rabbitmq_management

Applying plugin configuration to rabbit@master... started 6 plugins.

浏览器访问:
http://IP:15672/
如果不能访问,检查是否被代理了。一切正常登录后:

配置权限,添加用户kaven:


查看已启动的插件服务:
rabbitmq-plugins list
带*的表示正在运行,可以看到STOMP还没启动。

启动stomp:
rabbitmq-plugins enable rabbitmq_web_stomp rabbitmq_stomp

查看端口启动情况:

1
2
3
4
5
6
7
8
9
[root@master ~]# netstat -na
Active Internet connections (servers and established)
Proto Recv-Q Send-Q Local Address Foreign Address State
tcp 0 0 0.0.0.0:4369 0.0.0.0:* LISTEN
tcp 0 0 0.0.0.0:22 0.0.0.0:* LISTEN
tcp 0 0 0.0.0.0:15672 0.0.0.0:* LISTEN
tcp 0 0 127.0.0.1:25 0.0.0.0:* LISTEN
tcp 0 0 0.0.0.0:15674 0.0.0.0:* LISTEN
tcp 0 0 0.0.0.0:25672 0.0.0.0:* LISTEN

我们可以在15670端口访问web-stomp-examples,其中RabbitMQ运行在15672端口,stomp服务运行在15674端口。

发送消息

JavaClient实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
package stomp;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
* Productor.java
* 功能:消息发送者
* @author ranying
* 2017年12月12日
*/
public class Productor {

private final static String QUEUE_NAME = "rabbitmq_queue";
private final static String EXCHANGE_NAME = "amq.fanout";

public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("master");
factory.setUsername("kaven");
factory.setPassword("kaven123");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

//广播一个广播
channel.exchangeDeclare(EXCHANGE_NAME, "fanout",true);
String routingKey = "rabbitmq_routingkey";
String message = "";
for (int i = 1; i < 10; i++) {
message = "{\"id\":" + i + ",\"name\":\"Welcome to RabbitMQ message push!\"}";
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
System.out.println("[x] Sent Message:" + message);
Thread.sleep(500);
}


// 声明一个队列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//发送消息到队列中
for (int i = 1; i < 10; i++) {
message = "{\"id\":" + i + ",\"name\":\"RabbitMQ message queue push!\"}";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
System.out.println("[x] Sent Message:" + message);
Thread.sleep(500);
}

channel.close();
connection.close();
}


}

接收广播消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package stomp;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.ShutdownSignalException;
/**
*
* Customer.java
* 功能:广播消息以接收者
* @author ranying
* 2017年12月12日
*/
public class Customer {
private final static String EXCHANGE_NAME = "amq.fanout";

public static void main(String[] args) throws IOException, TimeoutException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("master");
factory.setUsername("kaven");
factory.setPassword("kaven123");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

//订阅一个广播
channel.exchangeDeclare(EXCHANGE_NAME, "fanout", true);
String routingKey = "rabbitmq_routingkey";
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, routingKey);

QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, consumer);

System.out.println("Customer Waiting Received publish messages");
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received '" + message + "'");
}


}
}

接收队列消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
package stomp;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

/**
* CustomerQueue.java
* 功能:队列消息接收者
*
* @author ranying 2017年12月12日
*/
public class CustomerQueue {
private final static String QUEUE_NAME = "rabbitmq_queue";

/**
* @param args
* @throws TimeoutException
* @throws IOException
*/
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("master");
factory.setUsername("kaven");
factory.setPassword("kaven123");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//声明要关注的队列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
System.out.println("Customer Waiting Received queue messages");

//DefaultConsumer类实现了Consumer接口,通过传入一个频道,
// 告诉服务器我们需要那个频道的消息,如果频道中有消息,就会执行回调函数handleDelivery
Consumer consumer = new DefaultConsumer(channel) {
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("Customer Received '" + message + "'");
}
};

//自动回复队列应答 -- RabbitMQ中的消息确认机制
channel.basicConsume(QUEUE_NAME, true, consumer);

}

}


文章作者: KavenRan
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 KavenRan !
 上一篇
基于RabbitMQ实现异步消息通知处理 基于RabbitMQ实现异步消息通知处理
业务场景一个简单的业务场景如:某web页面存在多个相互关联的异步获取数据展示的区域,如何优雅的实现一个区域的数据更新时异步通知其它区域进行数据刷新? 图1 业务场景 当列表数据操作状态变更时,让上面统计区域自动更新。常见实现方式是,在“
2017-12-12
下一篇 
基于Postman进行高效API开发及自动化测试 基于Postman进行高效API开发及自动化测试
POSTMAN A powerful GUI platform to make your API development faster & easier, from building API requests through t
2017-08-10
  目录