Simple Queues 简单队列
生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 package com.mmr.myrabbitmq.util.simple;import com.mmr.myrabbitmq.util.ConnectionUtils;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import java.io.IOException;import java.util.concurrent.TimeoutException;public class Send { public static final String QUEUE_NAME = "test_simple_queue" ; public static void main (String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false , false , false , null ); String msg = "Hello Simple" ; channel.basicPublish("" , QUEUE_NAME, null , msg.getBytes()); channel.close(); connection.close(); } }
消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 package com.mmr.myrabbitmq.util.simple;import com.mmr.myrabbitmq.util.ConnectionUtils;import com.rabbitmq.client.*;import java.io.IOException;import java.io.UnsupportedEncodingException;import java.util.concurrent.TimeoutException;public class Recv { public static final String QUEUE_NAME = "test_simple_queue" ; public static void main (String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false , false , false , null ); DefaultConsumer defaultConsumer = new DefaultConsumer (channel) { @Override public void handleDelivery (String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte [] body) throws UnsupportedEncodingException { String msg = new String (body, "UTF-8" ); System.out.println("收到了消息" + msg); } }; channel.basicConsume(QUEUE_NAME, true , defaultConsumer); } }
总结
简单队列的不足 :耦合性高,生产者一一对应消费者(如果想有多个消费者 消费 队列中的消息,不能实现)
队列名变更,消费者需要同时变更
Work Queues 工作队列
为什么会出现工作队列
Simple 队列 是一一对应的,而实际生产环境中,生产者发消息是毫不费力的,而消费者一般要跟业务相关联,处理是比较耗费时间的
轮询模式
生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 package com.mmr.myrabbitmq.util.work;import com.mmr.myrabbitmq.util.ConnectionUtils;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import java.io.IOException;import java.util.concurrent.TimeoutException;public class Send { public static final String QUEUE_NAME = "test_work_queue" ; public static void main (String[] args) throws IOException, TimeoutException, InterruptedException { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false , false , false , null ); for (int i = 0 ; i < 50 ; i++) { System.out.println("send" + i + "start" ); String msg = "message" + i; channel.basicPublish("" , "test_work_queue" , false , null , msg.getBytes()); Thread.sleep(i * 20 ); System.out.println("send" + i + "done" ); } channel.close(); connection.close(); } }
消费者
总共有两个 消费者 Recv1
和 Recv2
将下列代码的 Recv1
改为 Recv2
,将休眠时间改为 1000
,可得到 Recv2
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 package com.mmr.myrabbitmq.util.work;import com.mmr.myrabbitmq.util.ConnectionUtils;import com.rabbitmq.client.*;import java.io.IOException;import java.util.concurrent.TimeoutException;public class Recv1 { public static final String QUEUE_NAME = "test_work_queue" ; public static void main (String[] args) throws IOException, TimeoutException, InterruptedException { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false , false , false , null ); DefaultConsumer consumer = new DefaultConsumer (channel) { @Override public void handleDelivery (String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte [] body) throws IOException { String msg = new String (body, "UTF-8" ); System.out.println("RECV1---->" + msg); try { Thread.sleep(2000 ); } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println("RECV1 DONE!" ); } } }; boolean autoAck = true ; channel.basicConsume(QUEUE_NAME, true , consumer); } }
现象 先启动 生产者 再启动消费者
发现 两个消费者 一个接收到的消息 全是偶数,另一个全是奇数
这种方式叫做轮询分发(round-robin
)结果就是不管谁忙,谁清闲,都不会多给一个消息,任务消息总是你一个,我一个。
公平模式
生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 package com.mmr.myrabbitmq.util.workfair;import com.mmr.myrabbitmq.util.ConnectionUtils;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import java.io.IOException;import java.util.concurrent.TimeoutException;public class Send { public static final String QUEUE_NAME = "test_work_queue_work_fair" ; public static void main (String[] args) throws IOException, TimeoutException, InterruptedException { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false , false , false , null ); for (int i = 0 ; i < 50 ; i++) { System.out.println("send" + i + "start" ); String msg = "message" + i; channel.basicPublish("" , QUEUE_NAME, false , null , msg.getBytes()); Thread.sleep(i * 20 ); System.out.println("send" + i + "done" ); } channel.close(); connection.close(); } }
消费者
总共有两个 消费者 Recv1
和 Recv2
将下列代码的 Recv1
改为 Recv2
,将休眠时间改为 1000
,可得到 Recv2
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 package com.mmr.myrabbitmq.util.workfair;import com.mmr.myrabbitmq.util.ConnectionUtils;import com.rabbitmq.client.*;import java.io.IOException;import java.util.concurrent.TimeoutException;public class Recv1 { public static final String QUEUE_NAME = "test_work_queue_work_fair" ; public static void main (String[] args) throws IOException, TimeoutException, InterruptedException { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false , false , false , null ); channel.basicQos(1 ); DefaultConsumer consumer = new DefaultConsumer (channel) { @Override public void handleDelivery (String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte [] body) throws IOException { String msg = new String (body, "UTF-8" ); System.out.println("RECV1---->" + msg); try { Thread.sleep(2000 ); } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println("RECV1 DONE!" ); channel.basicAck(envelope.getDeliveryTag(),false ); } } }; boolean autoAck = false ; channel.basicConsume(QUEUE_NAME, autoAck, consumer); } }
现象
消费者2处理的消息比消费者1多,能者多劳;
Publish/Subscribe 发布订阅模式
解读
一个生产者,多个消费者
每一个消费者都有自己的队列
生产者没有直接把消息发送到队列,而是发送到了交换机 exchange
每个队列都要绑定到交换机上
生产者发送的消息,经过交换机到达队列,就能实现一个消息被多个消费者消费
生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public class Send { private static final String EXCHANGE_NAME = "test_exchange_fanout" ; public static void main (String[] args) throws IOException, TimeoutException { final Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, ExchangeTypes.FANOUT); String msg = "hello ps" ; channel.basicPublish(EXCHANGE_NAME, "" , false , null , msg.getBytes()); System.out.println("发送了" + msg); channel.close(); connection.close(); } }
消费者1
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 package com.mmr.myrabbitmq.ps;import com.mmr.myrabbitmq.util.ConnectionUtils;import com.rabbitmq.client.*;import java.io.IOException;import java.util.concurrent.TimeoutException;public class Recv1 { private static final String EXCHANGE_NAME = "test_exchange_fanout" ; public static final String QUEUE_NAME = "test_queue_fanout_email" ; public static void main (String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false , false , false , null ); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "" ); DefaultConsumer consumer = new DefaultConsumer (channel) { @Override public void handleDelivery (String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte [] body) throws IOException { String msg = new String (body, "UTF-8" ); System.out.println("RECV1---->" + msg); try { Thread.sleep(2000 ); } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println("RECV1 DONE!" ); channel.basicAck(envelope.getDeliveryTag(),false ); } } }; boolean autoAck = false ; channel.basicConsume(QUEUE_NAME, autoAck, consumer); } }
消费者2
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 public class Recv2 { private static final String EXCHANGE_NAME = "test_exchange_fanout" ; public static final String QUEUE_NAME = "test_queue_fanout_sms" ; public static void main (String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false , false , false , null ); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "" ); DefaultConsumer consumer = new DefaultConsumer (channel) { @Override public void handleDelivery (String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte [] body) throws IOException { String msg = new String (body, "UTF-8" ); System.out.println("RECV2---->" + msg); try { Thread.sleep(1000 ); } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println("RECV2 DONE!" ); channel.basicAck(envelope.getDeliveryTag(), false ); } } }; boolean autoAck = false ; channel.basicConsume(QUEUE_NAME, autoAck, consumer); } }
现象 邮件队列 和 短信队列 都能收到同一条消息
Routing 路由模式
配置了相同 routingKey
的消费者才能收到消息
生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public class Send { private static final String EXCHANGE_NAME = "test_exchange_direct" ; public static void main (String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, ExchangeTypes.DIRECT); String msg = "hello direct!" ; String routingKey = "info" ; System.out.println("发送了" + msg); channel.basicPublish(EXCHANGE_NAME, routingKey, false , null , msg.getBytes()); channel.close(); connection.close(); } }
消费者1
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 public class Recv1 { private static final String EXCHANGE_NAME = "test_exchange_direct" ; public static final String QUEUE_NAME = "test_queue_direct_1" ; public static void main (String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false , false , false , null ); channel.basicQos(1 ); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error" ); DefaultConsumer consumer = new DefaultConsumer (channel) { @Override public void handleDelivery (String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte [] body) throws IOException { String msg = new String (body, "UTF-8" ); System.out.println("RECV1---->" + msg); try { Thread.sleep(2000 ); } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println("RECV1 DONE!" ); channel.basicAck(envelope.getDeliveryTag(), false ); } } }; boolean autoAck = false ; channel.basicConsume(QUEUE_NAME, autoAck, consumer); } }
消费者2
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 public class Recv2 { private static final String EXCHANGE_NAME = "test_exchange_direct" ; public static final String QUEUE_NAME = "test_queue_direct_2" ; public static void main (String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false , false , false , null ); channel.basicQos(1 ); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error" ); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info" ); channel.basicQos(1 ); DefaultConsumer consumer = new DefaultConsumer (channel) { @Override public void handleDelivery (String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte [] body) throws IOException { String msg = new String (body, "UTF-8" ); System.out.println("RECV2 ---->" + msg); try { Thread.sleep(2000 ); } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println("RECV2 DONE!" ); channel.basicAck(envelope.getDeliveryTag(), false ); } } }; boolean autoAck = false ; channel.basicConsume(QUEUE_NAME, autoAck, consumer); } }
现象 当生产者 发送消息时 routingKey
= error
时,两个 消费者都能收到
当生产者 发送消息时 routingKey
= info
时,只有消费者 2 能收到消息
主题模式 topic
使用匹配符 *
匹配一项,#
匹配多项
生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public class Send { private static final String EXCHANGE_NAME = "test_exchange_topic" ; public static void main (String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, ExchangeTypes.TOPIC); String msg = "商品。。。" ; channel.basicPublish(EXCHANGE_NAME, "goods.add" , null , msg.getBytes()); System.out.println("---send" + msg); channel.close(); connection.close(); } }
消费者1
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 public class Recv1 { private static final String EXCHANGE_NAME = "test_exchange_topic" ; public static final String QUEUE_NAME = "test_queue_topic_1" ; public static void main (String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false , false , false , null ); channel.basicQos(1 ); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "goods.#" ); DefaultConsumer consumer = new DefaultConsumer (channel) { @Override public void handleDelivery (String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte [] body) throws IOException { String msg = new String (body, "UTF-8" ); System.out.println("RECV1 ---->" + msg); try { Thread.sleep(2000 ); } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println("RECV1 DONE!" ); channel.basicAck(envelope.getDeliveryTag(), false ); } } }; boolean autoAck = false ; channel.basicConsume(QUEUE_NAME, autoAck, consumer); } }
消费者2
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 public class Recv2 { private static final String EXCHANGE_NAME = "test_exchange_topic" ; public static final String QUEUE_NAME = "test_queue_topic_2" ; public static void main (String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false , false , false , null ); channel.basicQos(1 ); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "goods.add" ); DefaultConsumer consumer = new DefaultConsumer (channel) { @Override public void handleDelivery (String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte [] body) throws IOException { String msg = new String (body, "UTF-8" ); System.out.println("RECV2 ---->" + msg); try { Thread.sleep(1000 ); } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println("RECV2 DONE!" ); channel.basicAck(envelope.getDeliveryTag(), false ); } } }; boolean autoAck = false ; channel.basicConsume(QUEUE_NAME, autoAck, consumer); } }
现象 当生产者 发送消息时 routingKey
= goods.add
时,两个 消费者都能收到
当生产者 发送消息时 routingKey
= goods.insert
时,只有消费者 1 能收到消息, 因为goods.#
匹配所有goods
开始的 routingKey
。
rabbitmq 的消息确认机制(事务+confirm) 在 rabbitmq
中,我们可以持久化数据,解决 rabbitmq
服务器异常的数据丢失问题。
问题:生产者将消息发送出去之后,消息有没有到达 rabbitmq
,默认情况下生产者是不知道的。
两种方式
AMQP 实现了事务机制
Confirm 模式
事务机制 txSelect txCommit txRollback;
txSelect : 用于将当前 channel 设置 transaction 模式
txCommit: 用于提交事务。
txRollback: 回滚事务
生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 package com.mmr.myrabbitmq.tx;import com.mmr.myrabbitmq.util.ConnectionUtils;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import java.io.IOException;import java.util.concurrent.TimeoutException;public class TxSend { public static final String QUEUE_NAME = "test_queue_tx" ; public static void main (String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false , false , false , null ); String msg = "hello tx" ; try { channel.txSelect(); channel.basicPublish("" , QUEUE_NAME, null , msg.getBytes()); channel.txCommit(); } catch (Exception e) { channel.txRollback(); System.out.println("sendMsgRollBack" ); } finally { channel.close(); connection.close(); } } }
消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public class Recv { public static final String QUEUE_NAME = "test_queue_confirm1" ; public static void main (String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false , false , false , null ); channel.basicConsume(QUEUE_NAME, true , new DefaultConsumer (channel) { @Override public void handleDelivery (String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte [] body) throws IOException { System.out.println("recv tx" + new String (body, "utf-8" )); } }); } }
Confirm 模式 生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 public class Send3 { public static final String QUEUE_NAME = "test_queue_confirm3" ; public static void main (String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false , false , false , null ); channel.confirmSelect(); final SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet <Long>()); channel.addConfirmListener(new ConfirmListener () { @Override public void handleAck (long deliveryTag, boolean multiple) throws IOException { if (multiple) { System.out.println("-----handleAck-------multiple" ); confirmSet.headSet(deliveryTag + 1 ).clear(); } else { System.out.println("-----handleAck-------multiple false" ); confirmSet.remove(deliveryTag); } } @Override public void handleNack (long deliveryTag, boolean multiple) throws IOException { if (multiple) { System.out.println("-----handleNAck-------multiple" ); confirmSet.headSet(deliveryTag + 1 ).clear(); } else { System.out.println("-----handleNAck-------multiple false" ); confirmSet.remove(deliveryTag); } } }); String msg = "ssssssss" ; while (true ) { long seqNo = channel.getNextPublishSeqNo(); channel.basicPublish("" , QUEUE_NAME, null , msg.getBytes()); confirmSet.add(seqNo); } } }
消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public class Recv3 { public static final String QUEUE_NAME = "test_queue_confirm3" ; public static void main (String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false , false , false , null ); channel.basicConsume(QUEUE_NAME, true , new DefaultConsumer (channel) { @Override public void handleDelivery (String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte [] body) throws IOException { System.out.println("recv confirm" + new String (body, "utf-8" )); } }); } }
Return 消息机制 return listener
用于处理一些不可路由的消息
某些情况下,会出现不可达的消息,这时候就需要使用 Return Listener
mandatory
如果为 true,则监听器会收到路由不可达的消息,然后进行后续的处理,如果为 false
,则服务器会自动删除该消息。
消费端限流 防止消费端收到大量的消息,而使服务端挂掉
RabbitMQ
提供了一种 qos(服务质量保证) 功能,即在非自动确认消息的前提下,如果一定数目的消息(通过基于consume 或者 channel 设置qos 的值)未被确认前,不进行消费新的消息。
在consumer 端有如下的函数来设置参数
void basicQos
1 channel.basicNack(envelope.getDeliveryTag(), false , true );
TTL time to live
在消息上加过期时间
在队列上加超时时间
DLX 死信队列(交换机) 消息变为死信有以下几种情况
消息被拒绝(basic.reject/basic.nack)并且 requeue=false
消息 TTL 过期
当队列长度达到最大长度
DLX 也是一个正常的Exchange,和一般的Exchange没有区别,他能在任何的队列上被指定,实际上就是设置某个队列的属性
当这个队列中有死信时,RabbitMQ 就会自动地将这个消息重新发布到设置的 Exchange 中去,进而被路由到另外一个队列。使用时需要先声明死信队列的 exchange 和 queue,然后进行绑定
声明一个死信队列 Exchange: dlx.exchange
Queue: dlx.queue
RoutingKey: #
然后正常声明交换机、队列、绑定,只不过需要在队列上加一个参数
1 arguments.put("x-dead-letter-exchange" ,"dlx.exchange" )
Springboot Spring AMQP实战