Simple Queues 简单队列
生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| package com.mmr.myrabbitmq.util.simple;
import com.mmr.myrabbitmq.util.ConnectionUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection;
import java.io.IOException; import java.util.concurrent.TimeoutException;
public class Send { public static final String QUEUE_NAME = "test_simple_queue";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null); String msg = "Hello Simple"; channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); channel.close(); connection.close();
} }
|
消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
| package com.mmr.myrabbitmq.util.simple;
import com.mmr.myrabbitmq.util.ConnectionUtils; import com.rabbitmq.client.*;
import java.io.IOException; import java.io.UnsupportedEncodingException; import java.util.concurrent.TimeoutException;
public class Recv { public static final String QUEUE_NAME = "test_simple_queue";
public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws UnsupportedEncodingException { String msg = new String(body, "UTF-8"); System.out.println("收到了消息" + msg);
} }; channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
} }
|
总结
简单队列的不足 :耦合性高,生产者——对应消费者(如果想有多个消费者 消费 队列中的消息,不能实现)
队列名变更,消费者需要同时变更
Work Queues 工作队列
为什么会出现工作队列
Simple 队列 是一一对应的,而实际生产环境中,生产者发消息是后不费力的,而消费者一般要跟业务相关联,处理是比较耗费时间的
轮询模式
生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
| package com.mmr.myrabbitmq.util.work;
import com.mmr.myrabbitmq.util.ConnectionUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection;
import java.io.IOException; import java.util.concurrent.TimeoutException;
public class Send { public static final String QUEUE_NAME = "test_work_queue";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null); for (int i = 0; i < 50; i++) { System.out.println("send" + i + "start"); String msg = "message" + i; channel.basicPublish("", "test_work_queue", false, null, msg.getBytes()); Thread.sleep(i * 20); System.out.println("send" + i + "done");
} channel.close(); connection.close();
} }
|
消费者
总共有两个 消费者 Recv1
和 Recv2
将下列代码的 Recv1
改为 Recv2
,将休眠时间改为 1000
,可得到 Recv2
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
| package com.mmr.myrabbitmq.util.work;
import com.mmr.myrabbitmq.util.ConnectionUtils; import com.rabbitmq.client.*;
import java.io.IOException; import java.util.concurrent.TimeoutException;
public class Recv1 { public static final String QUEUE_NAME = "test_work_queue";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "UTF-8"); System.out.println("RECV1---->" + msg); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println("RECV1 DONE!"); } } }; boolean autoAck = true; channel.basicConsume(QUEUE_NAME, true, consumer);
} }
|
现象
先启动 生产者 再启动消费者
发现 两个消费者 一个接收到的消息 全是偶数,另一个全是奇数1
这种方式叫做轮询分发(round-robin
)结果就是不管谁忙,谁清闲,都不会多给一个消息,任务消息总是你一个,我一个。
公平模式
生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
| package com.mmr.myrabbitmq.util.workfair;
import com.mmr.myrabbitmq.util.ConnectionUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection;
import java.io.IOException; import java.util.concurrent.TimeoutException;
public class Send { public static final String QUEUE_NAME = "test_work_queue_work_fair";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
for (int i = 0; i < 50; i++) { System.out.println("send" + i + "start"); String msg = "message" + i; channel.basicPublish("", QUEUE_NAME, false, null, msg.getBytes()); Thread.sleep(i * 20); System.out.println("send" + i + "done");
} channel.close(); connection.close();
} }
|
消费者
总共有两个 消费者 Recv1
和 Recv2
将下列代码的 Recv1
改为 Recv2
,将休眠时间改为 1000
,可得到 Recv2
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
| package com.mmr.myrabbitmq.util.workfair;
import com.mmr.myrabbitmq.util.ConnectionUtils; import com.rabbitmq.client.*;
import java.io.IOException; import java.util.concurrent.TimeoutException;
public class Recv1 { public static final String QUEUE_NAME = "test_work_queue_work_fair";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.basicQos(1);
DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "UTF-8"); System.out.println("RECV1---->" + msg); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println("RECV1 DONE!"); channel.basicAck(envelope.getDeliveryTag(),false); } } }; boolean autoAck = false; channel.basicConsume(QUEUE_NAME, autoAck, consumer); } }
|
现象
消费者2处理的消息比消费者1多,能者多劳;
Public/Subscribe 发布订阅模式
解读
- 一个生产者,多个消费者
- 每一个消费者都有自己的队列
- 生产者没有直接把消息发送到队列,而是发送到了交换机
exchange
- 每个队列都要绑定到交换机上
- 生产者发送的消息,经过交换机到达队列,就能实现一个消息被多个消费者消费
生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| public class Send { private static final String EXCHANGE_NAME = "test_exchange_fanout";
public static void main(String[] args) throws IOException, TimeoutException { final Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, ExchangeTypes.FANOUT); String msg = "hello ps"; channel.basicPublish(EXCHANGE_NAME, "", false, null, msg.getBytes()); System.out.println("发送了" + msg); channel.close(); connection.close();
} }
|
消费者1
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| package com.mmr.myrabbitmq.ps;
import com.mmr.myrabbitmq.util.ConnectionUtils; import com.rabbitmq.client.*;
import java.io.IOException; import java.util.concurrent.TimeoutException;
public class Recv1 { private static final String EXCHANGE_NAME = "test_exchange_fanout"; public static final String QUEUE_NAME = "test_queue_fanout_email";
public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "UTF-8"); System.out.println("RECV1---->" + msg); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println("RECV1 DONE!"); channel.basicAck(envelope.getDeliveryTag(),false); } } }; boolean autoAck = false; channel.basicConsume(QUEUE_NAME, autoAck, consumer); } }
|
消费者2
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| public class Recv2 { private static final String EXCHANGE_NAME = "test_exchange_fanout"; public static final String QUEUE_NAME = "test_queue_fanout_sms";
public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "UTF-8"); System.out.println("RECV2---->" + msg); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println("RECV2 DONE!"); channel.basicAck(envelope.getDeliveryTag(), false); } } }; boolean autoAck = false; channel.basicConsume(QUEUE_NAME, autoAck, consumer);
} }
|
现象 邮件队列 和 短信队列 都能收到同一条消息
Routing 路由模式
配置了相同 routingKey
的消费者才能收到消息
生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| public class Send { private static final String EXCHANGE_NAME = "test_exchange_direct";
public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, ExchangeTypes.DIRECT); String msg = "hello direct!"; String routingKey = "info"; System.out.println("发送了" + msg); channel.basicPublish(EXCHANGE_NAME, routingKey, false, null, msg.getBytes()); channel.close(); connection.close(); } }
|
消费者1
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| public class Recv1 { private static final String EXCHANGE_NAME = "test_exchange_direct"; public static final String QUEUE_NAME = "test_queue_direct_1"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.basicQos(1); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "UTF-8"); System.out.println("RECV1---->" + msg); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println("RECV1 DONE!"); channel.basicAck(envelope.getDeliveryTag(), false); } } }; boolean autoAck = false; channel.basicConsume(QUEUE_NAME, autoAck, consumer); } }
|
消费者2
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| public class Recv2 { private static final String EXCHANGE_NAME = "test_exchange_direct"; public static final String QUEUE_NAME = "test_queue_direct_2";
public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.basicQos(1); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error"); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info"); channel.basicQos(1); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "UTF-8"); System.out.println("RECV2 ---->" + msg); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println("RECV2 DONE!"); channel.basicAck(envelope.getDeliveryTag(), false); } } }; boolean autoAck = false; channel.basicConsume(QUEUE_NAME, autoAck, consumer); } }
|
现象 当生产者 发送消息时 routingKey
= error
时,两个 消费者都能收到
当生产者 发送消息时 routingKey
= info
时,只有消费者 2 能收到消息
主题模式 topic
使用匹配符 *
匹配一项,#
匹配多项
生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| public class Send { private static final String EXCHANGE_NAME = "test_exchange_topic";
public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, ExchangeTypes.TOPIC); String msg = "商品。。。"; channel.basicPublish(EXCHANGE_NAME, "goods.add", null, msg.getBytes()); System.out.println("---send" + msg); channel.close(); connection.close(); } }
|
消费者1
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| public class Recv1 { private static final String EXCHANGE_NAME = "test_exchange_topic"; public static final String QUEUE_NAME = "test_queue_topic_1"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.basicQos(1); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "goods.#"); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "UTF-8"); System.out.println("RECV1 ---->" + msg); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println("RECV1 DONE!"); channel.basicAck(envelope.getDeliveryTag(), false); } } }; boolean autoAck = false; channel.basicConsume(QUEUE_NAME, autoAck, consumer);
} }
|
消费者2
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| public class Recv2 { private static final String EXCHANGE_NAME = "test_exchange_topic"; public static final String QUEUE_NAME = "test_queue_topic_2";
public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.basicQos(1); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "goods.add");
DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "UTF-8"); System.out.println("RECV2 ---->" + msg); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println("RECV2 DONE!"); channel.basicAck(envelope.getDeliveryTag(), false); } } }; boolean autoAck = false; channel.basicConsume(QUEUE_NAME, autoAck, consumer);
} }
|
现象 当生产者 发送消息时 routingKey
= goods.add
时,两个 消费者都能收到
当生产者 发送消息时 routingKey
= goods.insert
时,只有消费者 1 能收到消息, 因为good.#
匹配所有goods
开始的 routingKey
。
rabbitmq 的消息确认机制(事务+confirm)
在 rabbitmq
中,我们可以持久化数据,解决 rabbitmq
服务器异常的数据丢失问题。
问题:生产者将消息发送出去之后,消息有没有到达 rabbitmq
,服务器默认的情况是不知道可。
两种方式
- AMQP 实现了事务机制
- Confirm 模式
事务机制
txSelect txCommit txRollback;
txSelect : 用于将当前 channel 设置 transaction 模式
txCommit: 用于提交事务。
txRollback: 回滚事务
生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| package com.mmr.myrabbitmq.tx;
import com.mmr.myrabbitmq.util.ConnectionUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection;
import java.io.IOException; import java.util.concurrent.TimeoutException;
public class TxSend { public static final String QUEUE_NAME = "test_queue_tx";
public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); String msg = "hello tx"; try { channel.txSelect(); channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); channel.txCommit(); } catch (Exception e) { channel.txRollback(); System.out.println("sendMsgRollBack"); } finally { channel.close(); connection.close(); }
} }
|
消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| public class Recv { public static final String QUEUE_NAME = "test_queue_confirm1";
public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("recv tx" + new String(body, "utf-8")); } });
} }
|
Confirm 模式
生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
| public class Send3 { public static final String QUEUE_NAME = "test_queue_confirm3";
public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.confirmSelect();
final SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>()); channel.addConfirmListener(new ConfirmListener() { @Override public void handleAck(long deliveryTag, boolean multiple) throws IOException { if (multiple) { System.out.println("-----handleAck-------multiple"); confirmSet.headSet(deliveryTag + 1).clear(); } else { System.out.println("-----handleAck-------multiple false"); confirmSet.remove(deliveryTag);
} }
@Override public void handleNack(long deliveryTag, boolean multiple) throws IOException { if (multiple) { System.out.println("-----handleNAck-------multiple"); confirmSet.headSet(deliveryTag + 1).clear();
} else { System.out.println("-----handleNAck-------multiple false"); confirmSet.remove(deliveryTag);
} } }); String msg = "ssssssss"; while (true) { long seqNo = channel.getNextPublishSeqNo(); channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); confirmSet.add(seqNo); }
} }
|
消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| public class Recv3 { public static final String QUEUE_NAME = "test_queue_confirm3"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("recv confirm" + new String(body, "utf-8")); } }); } }
|
Return 消息机制
return listener
用户处理一些不可路由的消息
某些情况下,会出现不可达的消息,这时候就需要使用 Return Listener
mandatory
如果为 true,则监听器会收到路由不可达的消息,然后进行后续的处理,如果为 false
,则服务器会自动删除该消息。
消费端限流
防止消费端收到大量的消息,而使服务端挂掉
RabbitMQ
提供了一种 qos(服务质量保证) 功能,即在非自动确认消息的前提下,如果一定数目的消息(通过基于consume 或者 channel 设置qos 的值)未被确认前,不进行消费新的消息。
在consumer 端有如下的函数来设置参数
void basicQos
1
| channel.basicNack(envelope.getDeliveryTag(), false, true);
|
TTL time to live
- 在消息上加过期时间
- 在队列上加超时时间
DLX 死信队列(交换机)
消息变为私信有以下几种情况
- 消息被拒绝(basic.reject/basic.nack)并且 requeue=false
- 消息 TTL 过期
- 当队列长度达到队列长度
DLX 也是一个正常的Exchange,和一般的Exchange没有区别,他能在任何的队列上被指定,实际上就是设置某个队列的属性
当这个队列中有死信时,RabbitMq 就会自动地将这个消息重新发布到设置的Exchange中去,进而被路由到另外一个队列死信队列的 exchange 和 queue,,然后进行绑定
声明一个死信队列
Exchange: dlx.exchnage
Queue: dlx.queue
RoutingKey: #
然后正常声明交换机、队列、绑定,只不过需要在队列上加一个参数
1
| arguments.put("x-dead-letter-exchange","dlx.exchange")
|
Springboot Spring AMQP实战