rabbitmq基础

Simple Queues 简单队列

  1. 生产者

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    package com.mmr.myrabbitmq.util.simple;

    import com.mmr.myrabbitmq.util.ConnectionUtils;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    /**
    * 一个生产者对应多个一个消费者
    */
    public class Send {
    public static final String QUEUE_NAME = "test_simple_queue";

    public static void main(String[] args) throws IOException, TimeoutException {

    // 获取一个连接
    Connection connection = ConnectionUtils.getConnection();

    // 从中获取一个通道
    Channel channel = connection.createChannel();

    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    String msg = "Hello Simple";
    // 发送消息
    channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
    // 关闭资源
    channel.close();
    connection.close();

    }
    }

  2. 消费者

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    package com.mmr.myrabbitmq.util.simple;

    import com.mmr.myrabbitmq.util.ConnectionUtils;
    import com.rabbitmq.client.*;

    import java.io.IOException;
    import java.io.UnsupportedEncodingException;
    import java.util.concurrent.TimeoutException;

    /**
    * @author Bruce Liu
    * @version V1.0.0
    * @ClassName Recv
    * @Description
    * @date 2019-11-30 16:44
    */
    public class Recv {
    public static final String QUEUE_NAME = "test_simple_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
    Connection connection = ConnectionUtils.getConnection();
    Channel channel = connection.createChannel();
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
    // 重写此方法 收到消息之后如何进行处理
    @Override
    public void handleDelivery(String consumerTag,
    Envelope envelope,
    AMQP.BasicProperties properties,
    byte[] body) throws UnsupportedEncodingException {
    String msg = new String(body, "UTF-8");
    System.out.println("收到了消息" + msg);

    }
    };
    // 进行监听
    channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
    /**
    * 早期的写法
    * QueueingConsume consume = new QueueingConsume(channel);
    * channel.basicConsume(QUEUE_NAME,true,consume);
    * while (true) {
    * Delivery delivery = consumer.nextDelivery();
    * String msg = new String(delivery.getBody());
    *
    * }
    */
    }
    }

  3. 总结

    简单队列的不足 :耦合性高,生产者——对应消费者(如果想有多个消费者 消费 队列中的消息,不能实现)

    队列名变更,消费者需要同时变更

Work Queues 工作队列

为什么会出现工作队列

Simple 队列 是一一对应的,而实际生产环境中,生产者发消息是后不费力的,而消费者一般要跟业务相关联,处理是比较耗费时间的

轮询模式

  1. 生产者

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    package com.mmr.myrabbitmq.util.work;

    import com.mmr.myrabbitmq.util.ConnectionUtils;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    /**
    * @author Bruce Liu
    * @version V1.0.0
    * @ClassName Send
    * @Description
    * @date 2019-11-30 17:19
    */
    public class Send {
    public static final String QUEUE_NAME = "test_work_queue";

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
    // 获取连接
    Connection connection = ConnectionUtils.getConnection();

    // 获取Channel
    Channel channel = connection.createChannel();

    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    for (int i = 0; i < 50; i++) {
    System.out.println("send" + i + "start");
    String msg = "message" + i;
    channel.basicPublish("",
    "test_work_queue",
    false,
    null, msg.getBytes());
    Thread.sleep(i * 20);
    System.out.println("send" + i + "done");

    }
    channel.close();
    connection.close();


    }
    }

  2. 消费者

    总共有两个 消费者 Recv1Recv2

    将下列代码的 Recv1 改为 Recv2,将休眠时间改为 1000,可得到 Recv2

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    package com.mmr.myrabbitmq.util.work;

    import com.mmr.myrabbitmq.util.ConnectionUtils;
    import com.rabbitmq.client.*;

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    /**
    * @author Bruce Liu
    * @version V1.0.0
    * @ClassName Send
    * @Description
    * @date 2019-11-30 17:19
    */
    public class Recv1 {
    public static final String QUEUE_NAME = "test_work_queue";

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
    // 获取连接
    Connection connection = ConnectionUtils.getConnection();

    // 获取Channel
    Channel channel = connection.createChannel();

    channel.queueDeclare(QUEUE_NAME, false, false, false, null);

    DefaultConsumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    String msg = new String(body, "UTF-8");
    System.out.println("RECV1---->" + msg);
    try {
    Thread.sleep(2000);
    } catch (InterruptedException e) {
    e.printStackTrace();
    } finally {
    System.out.println("RECV1 DONE!");
    }
    }
    };
    boolean autoAck = true;
    channel.basicConsume(QUEUE_NAME, true, consumer);


    }
    }

    现象

    先启动 生产者 再启动消费者

    发现 两个消费者 一个接收到的消息 全是偶数,另一个全是奇数1

    这种方式叫做轮询分发(round-robin)结果就是不管谁忙,谁清闲,都不会多给一个消息,任务消息总是你一个,我一个。

公平模式

  1. 生产者

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    package com.mmr.myrabbitmq.util.workfair;

    import com.mmr.myrabbitmq.util.ConnectionUtils;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    /**
    * @author Bruce Liu
    * @version V1.0.0
    * @ClassName Send
    * @Description
    * @date 2019-11-30 17:19
    */
    public class Send {
    public static final String QUEUE_NAME = "test_work_queue_work_fair";

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
    // 获取连接
    Connection connection = ConnectionUtils.getConnection();

    // 获取Channel
    Channel channel = connection.createChannel();

    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    /**
    * 每个消费者发送确认消息之前,消息队列不发送下一个消息到消费者,一次只能处理一个
    * 限制发送给同一消费者,不得超过一条数据
    */
    // channel.basicQos(1);
    for (int i = 0; i < 50; i++) {
    System.out.println("send" + i + "start");
    String msg = "message" + i;
    channel.basicPublish("",
    QUEUE_NAME,
    false,
    null, msg.getBytes());
    Thread.sleep(i * 20);
    System.out.println("send" + i + "done");

    }
    channel.close();
    connection.close();


    }
    }
  1. 消费者

    总共有两个 消费者 Recv1Recv2

    将下列代码的 Recv1 改为 Recv2,将休眠时间改为 1000,可得到 Recv2

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    package com.mmr.myrabbitmq.util.workfair;

    import com.mmr.myrabbitmq.util.ConnectionUtils;
    import com.rabbitmq.client.*;

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    /**
    * @author Bruce Liu
    * @version V1.0.0
    * @ClassName Send
    * @Description
    * @date 2019-11-30 17:19
    */
    public class Recv1 {
    public static final String QUEUE_NAME = "test_work_queue_work_fair";

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
    // 获取连接
    Connection connection = ConnectionUtils.getConnection();

    // 获取Channel
    Channel channel = connection.createChannel();

    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    // 保证一次只有一个消息
    channel.basicQos(1);

    DefaultConsumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    String msg = new String(body, "UTF-8");
    System.out.println("RECV1---->" + msg);
    try {
    Thread.sleep(2000);
    } catch (InterruptedException e) {
    e.printStackTrace();
    } finally {
    System.out.println("RECV1 DONE!");
    channel.basicAck(envelope.getDeliveryTag(),false);
    }
    }
    };
    boolean autoAck = false;
    channel.basicConsume(QUEUE_NAME, autoAck, consumer);
    }
    }
  2. 现象

    消费者2处理的消息比消费者1多,能者多劳;

Public/Subscribe 发布订阅模式

解读

  1. 一个生产者,多个消费者
  2. 每一个消费者都有自己的队列
  3. 生产者没有直接把消息发送到队列,而是发送到了交换机 exchange
  4. 每个队列都要绑定到交换机上
  5. 生产者发送的消息,经过交换机到达队列,就能实现一个消息被多个消费者消费

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class Send {
private static final String EXCHANGE_NAME = "test_exchange_fanout";

public static void main(String[] args) throws IOException, TimeoutException {
final Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
//声明交换机
// 这里的type 必须指定为 ExchangeTypes.FANOUT(fanout)
channel.exchangeDeclare(EXCHANGE_NAME, ExchangeTypes.FANOUT);
// 发送消息
String msg = "hello ps";
channel.basicPublish(EXCHANGE_NAME, "", false, null, msg.getBytes());
System.out.println("发送了" + msg);
channel.close();
connection.close();

}
}

消费者1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package com.mmr.myrabbitmq.ps;

import com.mmr.myrabbitmq.util.ConnectionUtils;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
* @author Bruce Liu
* @version V1.0.0
* @ClassName Recv1
* @Description
* @date 2019-11-30 21:18
*/
public class Recv1 {
private static final String EXCHANGE_NAME = "test_exchange_fanout";
public static final String QUEUE_NAME = "test_queue_fanout_email";

public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 绑定交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "UTF-8");
System.out.println("RECV1---->" + msg);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("RECV1 DONE!");
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
};
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}
}

消费者2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class Recv2 {
private static final String EXCHANGE_NAME = "test_exchange_fanout";
public static final String QUEUE_NAME = "test_queue_fanout_sms";

public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "UTF-8");
System.out.println("RECV2---->" + msg);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("RECV2 DONE!");
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, consumer);


}
}

现象 邮件队列 和 短信队列 都能收到同一条消息

Routing 路由模式

配置了相同 routingKey 的消费者才能收到消息

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class Send {
private static final String EXCHANGE_NAME = "test_exchange_direct";

public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
// 这里的 type 必须指定为 ExchangeTypes.DIRECT(direct)
channel.exchangeDeclare(EXCHANGE_NAME, ExchangeTypes.DIRECT);
String msg = "hello direct!";
// 在这里指定 routingKey
String routingKey = "info";
System.out.println("发送了" + msg);
channel.basicPublish(EXCHANGE_NAME, routingKey, false, null, msg.getBytes());
channel.close();
connection.close();
}
}

消费者1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class Recv1 {
private static final String EXCHANGE_NAME = "test_exchange_direct";
public static final String QUEUE_NAME = "test_queue_direct_1";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicQos(1);
// 这里指定 接收 routingKey 为 "error" 的消息
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");

DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "UTF-8");
System.out.println("RECV1---->" + msg);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("RECV1 DONE!");
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, consumer);

}
}

消费者2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class Recv2 {
private static final String EXCHANGE_NAME = "test_exchange_direct";
public static final String QUEUE_NAME = "test_queue_direct_2";


public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicQos(1);
// 这里指定 接收 routingKey 为 "error" 或者 "info" 的消息
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info");
channel.basicQos(1);
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "UTF-8");
System.out.println("RECV2 ---->" + msg);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("RECV2 DONE!");
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}
}

现象 当生产者 发送消息时 routingKey = error 时,两个 消费者都能收到

当生产者 发送消息时 routingKey = info 时,只有消费者 2 能收到消息

主题模式 topic

使用匹配符 * 匹配一项,# 匹配多项

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class Send {
private static final String EXCHANGE_NAME = "test_exchange_topic";
//public static final String QUEUE_NAME = "test_queue_fanout_email";

public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, ExchangeTypes.TOPIC);
String msg = "商品。。。";
// 匹配商品的新增业务
channel.basicPublish(EXCHANGE_NAME, "goods.add",
null, msg.getBytes());
System.out.println("---send" + msg);
channel.close();
connection.close();
}
}

消费者1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class Recv1 {
private static final String EXCHANGE_NAME = "test_exchange_topic";
public static final String QUEUE_NAME = "test_queue_topic_1";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicQos(1);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "goods.#");
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "UTF-8");
System.out.println("RECV1 ---->" + msg);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("RECV1 DONE!");
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, consumer);


}
}

消费者2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class Recv2 {
private static final String EXCHANGE_NAME = "test_exchange_topic";
public static final String QUEUE_NAME = "test_queue_topic_2";


public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicQos(1);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "goods.add");

DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "UTF-8");
System.out.println("RECV2 ---->" + msg);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("RECV2 DONE!");
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, consumer);


}
}

现象 当生产者 发送消息时 routingKey = goods.add 时,两个 消费者都能收到

当生产者 发送消息时 routingKey = goods.insert 时,只有消费者 1 能收到消息, 因为good.# 匹配所有goods开始的 routingKey

rabbitmq 的消息确认机制(事务+confirm)

rabbitmq 中,我们可以持久化数据,解决 rabbitmq 服务器异常的数据丢失问题。

问题:生产者将消息发送出去之后,消息有没有到达 rabbitmq ,服务器默认的情况是不知道可。

两种方式

  1. AMQP 实现了事务机制
  2. Confirm 模式

事务机制

txSelect txCommit txRollback;

txSelect : 用于将当前 channel 设置 transaction 模式

txCommit: 用于提交事务。

txRollback: 回滚事务

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package com.mmr.myrabbitmq.tx;

import com.mmr.myrabbitmq.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
* @author Bruce Liu
* @version V1.0.0
* @ClassName Send
* @Description
* @date 2019-12-01 14:13
*/
public class TxSend {
public static final String QUEUE_NAME = "test_queue_tx";


public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String msg = "hello tx";
try {
channel.txSelect();
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
// 当出错时 不会进行提交
// int xx = 1 / 0;
channel.txCommit();
} catch (Exception e) {
channel.txRollback();
System.out.println("sendMsgRollBack");
} finally {
channel.close();
connection.close();
}

}
}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class Recv {
public static final String QUEUE_NAME = "test_queue_confirm1";


public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("recv tx" + new String(body, "utf-8"));
}
});


}
}

Confirm 模式

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
public class Send3 {
public static final String QUEUE_NAME = "test_queue_confirm3";

public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);

channel.confirmSelect();

final SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>());

// 添加一个监听
channel.addConfirmListener(new ConfirmListener() {
// 正常收到 确认消息
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
if (multiple) {
System.out.println("-----handleAck-------multiple");
confirmSet.headSet(deliveryTag + 1).clear();
} else {
System.out.println("-----handleAck-------multiple false");
confirmSet.remove(deliveryTag);

}
}

@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
if (multiple) {
System.out.println("-----handleNAck-------multiple");
confirmSet.headSet(deliveryTag + 1).clear();

} else {
System.out.println("-----handleNAck-------multiple false");
confirmSet.remove(deliveryTag);

}
}
});
String msg = "ssssssss";
while (true) {
long seqNo = channel.getNextPublishSeqNo();
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
confirmSet.add(seqNo);
}

}
}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class Recv3 {
public static final String QUEUE_NAME = "test_queue_confirm3";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("recv confirm" + new String(body, "utf-8"));
}
});
}
}

Return 消息机制

return listener 用户处理一些不可路由的消息

某些情况下,会出现不可达的消息,这时候就需要使用 Return Listener

mandatory 如果为 true,则监听器会收到路由不可达的消息,然后进行后续的处理,如果为 false,则服务器会自动删除该消息。

消费端限流

防止消费端收到大量的消息,而使服务端挂掉

RabbitMQ 提供了一种 qos(服务质量保证) 功能,即在非自动确认消息的前提下,如果一定数目的消息(通过基于consume 或者 channel 设置qos 的值)未被确认前,不进行消费新的消息。

在consumer 端有如下的函数来设置参数

void basicQos

1
channel.basicNack(envelope.getDeliveryTag(), false, true);

TTL time to live

  1. 在消息上加过期时间
  2. 在队列上加超时时间

DLX 死信队列(交换机)

消息变为私信有以下几种情况

  1. 消息被拒绝(basic.reject/basic.nack)并且 requeue=false
  2. 消息 TTL 过期
  3. 当队列长度达到队列长度

DLX 也是一个正常的Exchange,和一般的Exchange没有区别,他能在任何的队列上被指定,实际上就是设置某个队列的属性

当这个队列中有死信时,RabbitMq 就会自动地将这个消息重新发布到设置的Exchange中去,进而被路由到另外一个队列死信队列的 exchange 和 queue,,然后进行绑定

声明一个死信队列

Exchange: dlx.exchnage

Queue: dlx.queue

RoutingKey: #

然后正常声明交换机、队列、绑定,只不过需要在队列上加一个参数

1
arguments.put("x-dead-letter-exchange","dlx.exchange")

Springboot Spring AMQP实战

作者

Bruce Liu

发布于

2019-11-30

更新于

2022-11-12

许可协议

You need to set install_url to use ShareThis. Please set it in _config.yml.
You forgot to set the business or currency_code for Paypal. Please set it in _config.yml.

评论

You forgot to set the shortname for Disqus. Please set it in _config.yml.