RabbitMQ基础(二)——工作队列Work Queues

高新技术,RabbitMQ

2018-01-24

137

0

目录


上一篇,我们编写了从一个指定的队列发送和接收消息的程序。在本文中,我们将创建一个工作队列,用于将耗时的任务分配给多个工作人员。
 
Work Queues,称为工作队列(也称Task Queues,任务队列),其主旨在于避免立即执行资源密集型任务,并且必须等待它完成。相反,我们把任务安排在后来执行。对此,我们将任务封装为消息并将其发送到队列中。在后台运行的工作进程会获取任务并最终执行任务。当你运行许多工作进程时,任务将在他们之间共享。

轮询调度

使用任务队列的优点之一是能够轻松地并行工作。如果我们积累了大量的工作,我们就可以增加更多的任务消费者,这样就可以很容易地扩大规模。默认情况下,RabbitMQ将依次将每个消息发送给下一个使用者。平均每个消费者将得到相同数量的消息,这种分发消息的方式称为轮询。

消息确认

完成一项任务可能需要几秒钟。你可能会想,如果其中一个消费者开始了一项很长的任务,并且只完成了部分任务,会发生什么。使用我们当前的代码,一旦RabbitMQ向客户传递消息,它立即标记为删除。在这种情况下,如果你关闭一个工作进程,我们将失去它正在处理的信息。我们也将丢失所有发送给这个特定工作进程并且还未处理消息。
 
通常,我们希望一个工作进程挂掉后,将其任务交给其他工作进程完成。为了保证消息不丢失,RabbitMQ支持消息确认机制:当消费者处理完消息后,其反馈给RabbitMQ,表明消息已经被接收和处理,RabbitMQ可以自由删除该消息。
 
如果一个消费者挂掉(其通道关闭,连接关闭,或者TCP连接丢失),而没有发送ack,RabbitMQ将会知道一条消息没有被完全处理,并且将重新排队。如果同时有其他的消费者在线,那么它将很快把消息重新交给另一个消费者处理,这样就保证不会丢失任何信息。
 
消息不会超时:当消费者进程挂掉时,RabbitMQ将重新传递消息,即使处理消息需要很长时间。
 
RabbitMQ默认是开启手动消息确认的,我们可以通过autoAck=true标记明确地关闭,即采用系统自动确认消息。如果需要手动确认消息,那么将autoAck设置为false,一旦我们完成了任务,需要向工作进程发出确认消息,代码如下:
 
// 处理完成,手动接收消息时,需要在处理成功后进行反馈,保证消息不丢失
channel.basicAck(envelope.getDeliveryTag(), false);
示例的部分关键代码如下:
NewTask.java
    /**
     * 发布5条消息,每条消息的一个“.”表示消息执行时间需要1秒。
     *
     * @param args
     * @throws IOException
     * @throws TimeoutException
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        String[] msgs = new String[]{
                "First Message.", // 1s
                "Second Message..", // 2s
                "Third Message...", // 3s
                "Fourth Message....", // 4s
                "Fifth Message....." // 5s
        };

        Channel channel = connection.createChannel();
        channel.queueDeclare(TASK_QUEUE_NAME, false, false, false, null);

        for (String msg : msgs) {
            // 发布消息
            channel.basicPublish("", TASK_QUEUE_NAME, null, msg.getBytes("utf-8"));
            System.out.println("[x] Sent '" + msg + "'");
        }
        channel.close();
        connection.close();
    }
Woker.java
    /**
     * 启动三个Worker进程。
     * <p>
     * 任务发布者每发送一个"."号,线程睡眠1秒,模拟工作时间。
     * <p>
     * 可以看到各个Worker轮询执行任务。
     *
     * @param args 参数
     * @throws IOException
     * @throws TimeoutException
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare(NewTask.TASK_QUEUE_NAME, false, false, false, null);
        // 创建消费者,阻塞接收消息
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "utf-8");
                System.out.println("Received \"" + msg + "\".");
                try {
                    doWork(msg);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    System.out.println(" [x] Done");
                    // 处理完成,手动接收消息时,需要在处理成功后进行反馈,保证消息不丢失
                    // 如果消费者工作进程挂掉,自动分给其他消费者
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };
        // 自动接收消息并处理,接收后,立即将消息标记为删除,如果工作进程挂掉,者其未处理和处理中的消息都将丢失
        // boolean autoAck = true; // acknowledgment is covered below
        // 手动接收消息,处理完成后调用channel.basicAck()反馈给RabbitMQ
        boolean autoAck = false;
        channel.basicConsume(NewTask.TASK_QUEUE_NAME, autoAck, consumer);
    }

    private static void doWork(String task) throws InterruptedException {
        System.out.println("Doing some works.");
        for (char ch : task.toCharArray()) {
            // 模拟工作时间消耗
            if (ch == '.') {
                Thread.sleep(1000);
            }
        }
    }

 

NewTask为任务发布者,即生产者,Woker为工作者,即消费者。

启动两个Woker,然后运行NewTask,发送5条消息,结果可以看到消息进行了轮询分发:
 
worker1:First Message、Third Message、Fifth Message
worker2:Second Message、Fourth Message
然后关闭各个进程,重新启动三个Woker,然后运行NewTask,发送5条消息,然后快速关闭woker1,结果可以看到5条消息都被成功处理,原本该work1处理的消息被转发到了其他两个woker上,说明消息确认机制保证了消息不丢失。

忘记确认消息

开发者经常会忘记设置basicAck,这样一个简单的错误,带来的后果确非常严重。如果客户端退出,RabbitMQ仍然会重新分发消息(看起来像随机分发),因为他没有收到处理成功反馈。由于RabbitMQ不能释放这些未收到反馈信息的消息,它会消耗越来越多的内存,导致性能问题设置RabbitMQ挂掉。
 
该问题的解决办法是,使用rabbitmqctl工具打印messages_unacknowledged字段内容,来进行调试:
linux上命令为:
sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
windows上为:
rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged
接着前一节的Woker和NewTask例子,注释掉channel.basicAck(envelope.getDeliveryTag(), false);代码后,看看程序运行情况:
开三个工作客户端Woker,然后多次执行NewTask分发5条消息,可以看到,由于处理成功后客户端没有进行反馈,客户端会收到重复的消息,通过rabbitmqctl查看未确认的消息数量,结果如下:
D:\soft\RabbitMQ Server\rabbitmq_server-3.7.2\sbin>rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
helloworld 2 0
hello 0 13
可以看到当前队列hello里边还有13条消息,随着NewTask重复执行,消息会越来越多。这表明,如果没有收到Woker的消息处理成功确认,RabbitMQ队列中会一直保留消息(因为不知道消息是否成功处理),造成内存资源浪费。
 
当我们开启消息反馈后(取消注释channel.basicAck()),然后在运行一个客户端,可以看到它将会接收所有剩余的消息,并且队列中的消息数量会递减为0,消息最终被该客户端一一成功处理。

消息和队列持久化

我们已经知道,即使消费者挂掉,如何保证消息不丢失。但是,如果RabbitMQ挂掉了,我们的消息仍然会丢失。此时应该这么处理?
 
当RabbitMQ退出或者挂掉,所有的队列和消息数据将会丢失,除非我们进行持久化设置。我们需要做两件事情:
首先,在创建队列的时候将其设置为持久化:
boolean durable = true;
channel.queueDeclare("durable_queue", durable, false, false, null);
前边我们已经创建过hello队列了,对于已经存在的队列,RabbitMQ不允许重新设置,否则会抛出ShutdownSignalException异常,告诉你设置的参数与已有的参数不一致,可以给队列命名新的名称来解决。
 
boolean durable = true;
channel.queueDeclare("durable_queue", durable, false, false, null);
第二,在发布消息时,需要设置消息持久化参数:
// 设置消息持久化参数
AMQP.BasicProperties props = MessageProperties.PERSISTENT_TEXT_PLAIN;
// 发布消息
channel.basicPublish("", TASK_QUEUE_NAME, props, msg.getBytes("utf-8"));
System.out.println("[x] Sent '" + msg + "'");
模拟是否成功持久化的过程很简单,不启动消费者,直接启动生产者发布消息,然后重启RabbitMQ,然后再通过命令行工具rabbitmqctl查询队列和消息数量,就可以验证队列和未被成功处理消息是否已经持久化,这里不再赘述。

注意

将消息标记为持久化并不能完全保证消息不丢失,尽管RabbitMQ会保存消息到磁盘上,但是仍然有可能RabbitMQ接收了消息但是还没来得及保存。并且,RabbitMQ并不会对每一条消息进行fsync,它可能仅仅保存消息到缓存中而不是写磁盘。尽管持久化担保并不强大,但是对于简单队列已经足够了,如果还需要更强大的保障机制,可以使用 publisher confirms

消息公平分发

有时,消息分发可能并未按照我们所想,例如,有两个工作进程,当奇数任务耗时较长,而偶数任务耗时较短,那么一个工作进程将非常繁忙而另一个则会很闲。其实,RabbitMQ并不知道这些情况,仍然平衡的分发消息。这是因为RabbitMQ在消息进入队列时才会发送消息,它只是盲目地将第几条消息发送给第几个用户,而不会考虑消费者未确认的消息的数量,如图所示:
其实,我们可以调用basicQos方法来设置prefetchCount参数。这意味着RabbitMQ一次不会给一个消费者一条以上的信息。换言之,RabbitMQ不会向消费者发送新的消息,直到它处理并确认了之前的消息。相反,它会把消息发送给下一个不太忙的消费者。代码如下:
int prefetchCount = 1;
channel.basicQos(prefetchCount);
同样用上边的Woker和NewTask的例子,启动两个Worker,启动NewTask发送5条消息。如果不设置prefetchCount,那么应该的消息执行顺序为:
worker1:First Message、Third Message、Fifth Message
worker2:Second Message、Fourth Message
为了验证分发的公平性,这里我修改了NewTask的模拟的消息执行时间(通过修改“.”的数量,每一个“.”表示执行1秒)如下:
String[] msgs = new String[]{
                "First Message.", // 1s
                "Second Message.", // 1s
                "Third Message...", // 3s
                "Fourth Message.", // 1s
                "Fifth Message." // 1s
        };

再次执行后可以看到,执行结果并不是按照轮询的机制,而是RabbitMQ根据任务执行时间进行了公平的分发。

总结

  • RabbitMQ默认采用轮询的方式进行消息分发
  • 如果消费者设置autoAck=true,则消费者自动接收并处理消息,并将消息标记为删除,可能造成未处理和处理中的消息丢失
  • 如果消费者设置autoAck=false(默认),必须在消息处理成功后手动反馈,即设置channel.basicAck
  • 通过控制台工具rabbitmqctl可以查询队列和队列中的消息
  • 默认情况消息未开启持久化,要开启持久化,必须在创建队列时设置持久化,并且生产者发布消息时设置持久化类型
  • 消息标记为持久化并不能完全保证消息不丢失
  • 轮询的消息分发机制并不公平,可以通过设置channel.basicQos来保证消息未处理完成前不在分发新的消息给当前消费者

示例代码见github.


前一篇:RabbitMQ基础(一)——基本概念和HelloWorld
后一篇:MySQL中批量插入和批量更新

belonk

轻轻地我走了,正如我轻轻地来,我挥一挥衣袖,不带走一片云彩