RabbitMQ环境搭建

RabbitMQ环境搭建

本文从RabbitMQ的安装开始,由浅入深地学习消息队列。

RabbitMQ Server 的安装

RabbitMQ是用erlang语言开发的,所以要跑RabbitMQ,需要机器上有erlang环境,我们可以用kerl工具来很方便的安装erlang环境。

kerl源码及使用文档: https://github.com/kerl/kerl

curl -O https://raw.githubusercontent.com/kerl/kerl/master/kerl
chmod a+x kerl
./kerl build 20.3 20.3
./kerl install 20.3 path/to/your/installation/dir
. /Users/sanmiguel/kerl/19.2/activate
# build的时候把20.3换成要安装的版本号,然后耐心等待,可能需要一段时间...
# activate命令会把erl加入PATH,此时直接执行erl命令就能找到对应的程序,也可以手动将安装目录下的bin目录加入PATH

安装好依赖环境,接下来就是安装RabbitMQ Server,这里可以直接从官网下载对应系统的包解压出来就可以直接使用:

wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.7.9/rabbitmq-server-generic-unix-3.7.9.tar.xz
tar zxf rabbitmq-server-generic-unix-3.7.9.tar.xz

进入解压出来的目录,sbin目录下的rabbitmq-server可以直接执行,便启动了RabbitMQ,这时候就可以用你的程序连接RabbitMQ发消息、处理消息了。

命令行工具

仔细的同学可能注意到sbin目录下除了rabbitmq-server还提供了一些别的工具,这里简单做下介绍:

rabbitmqclt

节点相关操作:停止节点、节点状态、健康检查等

虚热主机管理

用户权限管理

查看queue、exchange、connnections、channel、consumer等

......

rabbitmq-plugins

rabbitmq-plugins用来管理插件,查看、启用、关闭插件,支持线上模式(节点运行中)和线下模式(节点重启生效)

rabbitmqadmin

基于RabbitMQ HTTP API的工具,提供了management UI提供的常见操作,依赖Python2.7.9(或以上版本),需要单独下载

RabbitMQ的配置

RabbitMQ默认的配置就可以满足大多开发测试的环境,如果是线上环境也可以通过配置文件等多种方式来修改配置,详细的配置项可以参考官方文档:http://www.rabbitmq.com/configure.html#config-items

PHP客户端

以『世界上最好的语言』PHP(不服来辩)为例,RabbitMQ的PHP客户端有多种,如:

  • php-amqplib:纯PHP, 全特性的RabbitMQ客户端
  • PECL AMQP library:基本 RabbitMQ C 客户端的扩展

php-amqplib是纯php实现,直接把代码加入自己的项目中即可使用,第二种是通过C扩展实现,基于RabbitMQ C客户端,所以要先安装好 RabbitMQ C ,再安装PHP的amqp扩展:

wget http://pecl.php.net/get/amqp-1.9.3.tgz -O amqp-1.9.3.tgz
tar zxf amqp-1.9.3.tgz
cd amqp-1.9.3
./configure --with-php-config=/path/to/your/php-config --with-amqp --with-librabbitmq-dir=/path/to/your/rabbitmq-c
make && make install

示例代码

生产者:

$conn_args = array(
'host' => '127.0.0.1',
'port' => '5672',
'login' => 'guest',
'password' => 'guest',
'vhost'=>'/'
);
$e_name = 'myex'; //交换机名
//$q_name = 'myq'; //无需队列名
$k_route = 'myr'; //路由key
//创建连接和channel
$conn = new AMQPConnection($conn_args);

if (!$conn->connect()) {
die("Cannot connect to the broker!\n");
}
$channel = new AMQPChannel($conn);
//创建交换机对象
$ex = new AMQPExchange($channel);
$ex->setName($e_name);
//发送消息
//$channel->startTransaction(); //开始事务
for($i=0; $i<5; ++$i){
sleep(1);//休眠1秒
//消息内容
$message = "TEST MESSAGE!".date("h:i:sa");
echo "Send Message:".$ex->publish($message, $k_route)."\n";
}
//$channel->commitTransaction(); //提交事务
$conn->disconnect();

消费者:

$conn_args = array(
'host' => '127.0.0.1',
'port' => '5672',
'login' => 'guest',
'password' => 'guest',
'vhost'=>'/'
);
$e_name = 'myex'; //交换机名
$q_name = 'myq'; //队列名
$k_route = 'myr'; //路由key
//创建连接和channel
$conn = new AMQPConnection($conn_args);
if (!$conn->connect()) {
die("Cannot connect to the broker!\n");
}
$channel = new AMQPChannel($conn);
//创建交换机
$ex = new AMQPExchange($channel);
$ex->setName($e_name);
$ex->setType(AMQP_EX_TYPE_DIRECT); //direct类型
$ex->setFlags(AMQP_DURABLE); //持久化
echo "Exchange Status:".$ex->declare()."\n";

//创建队列
$q = new AMQPQueue($channel);
$q->setName($q_name);
$q->setFlags(AMQP_DURABLE); //持久化
echo "Message Total:".$q->declare()."\n";
//绑定交换机与队列,并指定路由键
echo 'Queue Bind: '.$q->bind($e_name, $k_route)."\n";
//阻塞模式接收消息
echo "Message:\n";
while(true){
$q->consume('processMessage');
//$q->consume('processMessage', AMQP_AUTOACK); //自动ACK应答
}
$conn->disconnect();
/**
* 消费回调函数
* 处理消息
*/
function processMessage($envelope, $queue) {
$msg = $envelope->getBody();
echo $msg."\n"; //处理消息
$queue->ack($envelope->getDeliveryTag()); //手动发送ACK应答
}

更多内容,请移步: https://www.daemoncoder.com/


分享到:


相關文章: