RabbitMQ基础(一)——基本概念和HelloWorld

高新技术,RabbitMQ

2018-01-18

171

0

目录


基本概念

RabbitMQ是一个消息代理,是一个erlang开发的AMQP(Advanced Message Queue )的开源实现。
 
RabbitMQ是轻量级的,易于部署在premises和云中。它支持多种消息传递协议。RabbitMQ可以部署在分布式和联合配置中,以满足高级别、高可用性需求。
 
其主要思想非常简单:它接受并转发消息。你可以把它想象成邮局:当你把邮件寄到邮箱时,你很确定邮差先生最终会把邮件寄给你的收件人。使用这个比喻,RabbitMQ是一个邮筒,一个邮局和一个邮差。
RabbitMQ与邮局的主要区别在于,它不处理纸张,而是接受、存储和转发二进制数据。
 
官网地址: http://www.rabbitmq.com

AMQP

AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。Erlang中的实现有 RabbitMQ等。

目标是实现一种在全行业广泛使用的标准消息中间件技术,以便降低企业和系统集成的开销,并且向大众提供工业级的集成服务。通过AMQP,让消息中间件的能力最终被网络本身所具有,并且通过消息中间件的广泛使用发展出一系列有用的应用程序。

  • Broker: 中间件。接收和分发消息的应用,RabbitMQ Server就是Message Broker。
  • Virtual host: 虚拟主机。出于多租户和安全因素设计的,把AMQP的基本组件划分到一个虚拟的分组中,类似于网络中的namespace概念。当多个不同的用户使用同一个RabbitMQ server提供的服务时,可以划分出多个vhost,每个用户在自己的vhost创建exchange/queue等。
  • Connection: 连接。publisher/consumer和broker之间的TCP连接。断开连接的操作只会在client端进行,Broker不会断开连接,除非出现网络故障或broker服务出现问题。
  • Channel: 渠道。如果每一次访问RabbitMQ都建立一个Connection,在消息量大的时候建立TCP Connection的开销将是巨大的,效率也较低。Channel是在connection内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的channel进行通讯,AMQP method包含了channel id帮助客户端和message broker识别channel,所以channel之间是完全隔离的。Channel作为轻量级的Connection极大减少了操作系统建立TCP connection的开销。
  • Exchange: 路由。message到达broker的第一站,根据分发规则,匹配查询表中的routing key,分发消息到queue中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)。
  • Queue: 队列。消息最终被送到这里等待consumer取走。一个message可以被同时拷贝到多个queue中。
  • Binding: 绑定。exchange和queue之间的虚拟连接,binding中可以包含routing key。Binding信息被保存到exchange中的查询表中,用于message的分发依据。

RabbitMQ术语

Producter

即生产者。Producing就是发送,发送消息的程序是生产者(Producter)。用P表示,如下图:

Exchange

交换器,RabbitMQ中,其实消息不会直接相队列发送,而是发送给交换器,然后交换器在按照一定的规则转发给不同的队列。交换器做的事情非常简单:一方面,它接收来自生产者的消息,另一边则将消息推送到队列中。交换必须知道如何处理它接收到的消息。是否应该附加到特定的队列?是否应该附加到许多队列?或者应该被抛弃。这些规则由交换类型(exchange type)定义。

Exchange Type

交换器类型,在创建交换器时指定,用于区分交换器的不同作用,实现不同的功能。RabbitMQ定义了四种交换器类型:direct、topic、headers、fanout,每种类型都有特定的应用场景(见后续文章的详细介绍)。

direct:bindingKey和routingKey进行精确匹配,适用于精确将消息发送给指定队列;

topic:bindingKey和routingKey可以进行模糊匹配,通过使用通配符"*"和"#"分别来模糊匹配一个单词和多个单词;适用于将消息按照一定的规则发送到匹配的一个或多个队列;

fanout:广播,这种交换器可以将消息广播给所有订阅的交换器;

header:不常用,有兴趣的话可以自行了解。

Queue

即队列,队列是邮箱的名称,它处于RabbitMQ内部。尽管消息流通过RabbitMQ和您的应用程序,但它们只能存储在队列中。队列不受任何限制,它可以根据你的需要存储尽可能多的消息——它本质上是一个无限的缓冲区。许多生产者都可以发送消息到一个队列,许多消费者可以尝试从一个队列接收数据。队列上有它的名称,如下图表示:

Consumer

即消费者,Consuming跟receiving的含义类似。Consumer通常为等待接收消息的应用程序 。注意,生产者、消费者和消息代理不需要处于同一台主机上,事实上,在大多数应用场景都是如此。

下载和安装

官方地址:http://www.rabbitmq.com/download.html

这里我下载windows的安装版本(http://www.rabbitmq.com/install-windows.html),在这之前,首先需要下载并按照erlang语言安装包。

由于我下载的是3.7.2版本的RabbitMQ Server,所以erlang语言需要19.3到20.2.x之间的版本。下载地址:http://www.erlang.org/downloads,按照步骤一步步安装即可。

erlang安装完成后,可以继续按照RabbitMQ server了,同样是一步步安装,较为简单,不在赘述。

按照完成后,开始菜单多了几个选项:

除了命令行工具外,服务的启动、停止、重新安装、移除都提供了响应的工具。在windows服务中,同样多了RabbitMQ的服务,同样通过服务进行启停。

Java客户端的Hello World

接下来,我们来用Java实现两个程序,一个发送简单消息,另一个负责接收消息并将它们打印到控制台上。这里我们暂时不深究Java API的细枝末节,仅仅简单了解RabbitMQ是如何工作的。
 
在下图中,P是我们的生产者,C是我们的消费者。中间的框是一个队列——RabbitMQ代表消费者保存的消息缓冲区。
首先,我们需要引入RabbitMQ的java客户端程序,为了方便,我使用Maven构建,只需引入如下依赖:

groupId:com.rabbitmq

artifactId:amqp-client

version:4.0.3

 
接下来,我们创建两个类,一个为Sender.java,表示消息发送者,另一个为Receiver.java,表示消息接收者。
Sender.java:
package com.belonk.rmq.helloworld;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Created by sun on 2018/1/17.
 *
 * @author sunfuchang03@126.com
 * @version 1.0
 * @since 1.0
 */
public class Sender {
    //~ Static fields/initializers =====================================================================================

    /**
     * 队列名称
     */
    public static final String QUEUE_NAME = "helloworld";
    //~ Instance fields ================================================================================================


    //~ Constructors ===================================================================================================


    //~ Methods ========================================================================================================
    public static void main(String[] args) throws IOException, TimeoutException {
        // 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 连接到本地server
        connectionFactory.setHost("localhost");
        // 创建连接
        Connection connection = connectionFactory.newConnection();
        // 创建通道,API通过通道完成相关任务
        Channel channel = connection.createChannel();
        // 创建队列,该队列非持久(服务器重启后依然存在)、非独占(非仅用于此链接)、非自动删除(服务器将不再使用的队列删除)
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        String msg = "hello, rabbit mq. 你好,rabbit mq.";
        // 发布消息
        channel.basicPublish("", QUEUE_NAME, null, msg.getBytes("utf-8"));
        System.out.println("Sent \"" + msg + "\".");;
        channel.close();
        connection.close();
    }
}

Receiver.java:

package com.belonk.rmq.helloworld;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Created by sun on 2018/1/17.
 *
 * @author sunfuchang03@126.com
 * @version 1.0
 * @since 1.0
 */
public class Receiver {
    //~ Static fields/initializers =====================================================================================


    //~ Instance fields ================================================================================================


    //~ Constructors ===================================================================================================


    //~ Methods ========================================================================================================

    public static void main(String[] args) throws IOException, TimeoutException {
        // 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 连接到本地server
        connectionFactory.setHost("localhost");
        // 创建连接
        Connection connection = connectionFactory.newConnection();
        // 创建通道,API通过通道完成相关任务
        Channel channel = connection.createChannel();
        // 创建队列
        channel.queueDeclare(Sender.QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        // 创建消费者,阻塞接收消息
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("consumerTag : " + consumerTag);
                System.out.println("exchange : " + envelope.getExchange());
                System.out.println("routing key : " + envelope.getRoutingKey());
                String msg = new String(body, "utf-8");
                System.out.println("Received \"" + msg + "\".");
            }
        };
        channel.basicConsume(Sender.QUEUE_NAME, consumer);
//        channel.close();
//        connection.close();
    }
}

 首先运行Receiver.java,可以看到程序阻塞等待接收消息;然后运行Sender.java,此时发送者发送出一条消息,Receiver控制已经成功接收消息并且已经打印出来。

注意:由于RabbitMQ消息为byte[]数组,在消息传递时注意中文乱码问题,这里将发送消息和接收消息都转为UTF-8编码。

查询消息队列和数量

这里间简单使用命令行工具,使用rabbitmqctl.bat list_queues命令,如下图:

命令行工具路径如图所示。

总结

本文仅按照官方的java指南,做了初步的消息发送和接收程序,对RabbitMQ的工作机制有了初步的认识。不必对具体细节太过纠结,接下来我们来一步步深入学习。


前一篇:spring-test单元测试异常No qualifying bean of type javax.servlet.http.HttpServletRequest
后一篇:RabbitMQ基础(二)——工作队列Work Queues

belonk

轻轻地我走了,正如我轻轻地来,我挥一挥衣袖,不带走一片云彩