rabbitmq基础

Simple Queues 简单队列

  1. 生产者

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    package com.mmr.myrabbitmq.util.simple;

    import com.mmr.myrabbitmq.util.ConnectionUtils;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    /**
    * 一个生产者对应多个一个消费者
    */
    public class Send {
    public static final String QUEUE_NAME = "test_simple_queue";

    public static void main(String[] args) throws IOException, TimeoutException {

    // 获取一个连接
    Connection connection = ConnectionUtils.getConnection();

    // 从中获取一个通道
    Channel channel = connection.createChannel();

    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    String msg = "Hello Simple";
    // 发送消息
    channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
    // 关闭资源
    channel.close();
    connection.close();

    }
    }

  2. 消费者

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    package com.mmr.myrabbitmq.util.simple;

    import com.mmr.myrabbitmq.util.ConnectionUtils;
    import com.rabbitmq.client.*;

    import java.io.IOException;
    import java.io.UnsupportedEncodingException;
    import java.util.concurrent.TimeoutException;

    /**
    * @author Bruce Liu
    * @version V1.0.0
    * @ClassName Recv
    * @Description
    * @date 2019-11-30 16:44
    */
    public class Recv {
    public static final String QUEUE_NAME = "test_simple_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
    Connection connection = ConnectionUtils.getConnection();
    Channel channel = connection.createChannel();
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
    // 重写此方法 收到消息之后如何进行处理
    @Override
    public void handleDelivery(String consumerTag,
    Envelope envelope,
    AMQP.BasicProperties properties,
    byte[] body) throws UnsupportedEncodingException {
    String msg = new String(body, "UTF-8");
    System.out.println("收到了消息" + msg);

    }
    };
    // 进行监听
    channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
    /**
    * 早期的写法
    * QueueingConsume consume = new QueueingConsume(channel);
    * channel.basicConsume(QUEUE_NAME,true,consume);
    * while (true) {
    * Delivery delivery = consumer.nextDelivery();
    * String msg = new String(delivery.getBody());
    *
    * }
    */
    }
    }

  3. 总结

    简单队列的不足 :耦合性高,生产者——对应消费者(如果想有多个消费者 消费 队列中的消息,不能实现)

    队列名变更,消费者需要同时变更

Work Queues 工作队列

为什么会出现工作队列

Simple 队列 是一一对应的,而实际生产环境中,生产者发消息是后不费力的,而消费者一般要跟业务相关联,处理是比较耗费时间的

轮询模式

  1. 生产者

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    package com.mmr.myrabbitmq.util.work;

    import com.mmr.myrabbitmq.util.ConnectionUtils;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    /**
    * @author Bruce Liu
    * @version V1.0.0
    * @ClassName Send
    * @Description
    * @date 2019-11-30 17:19
    */
    public class Send {
    public static final String QUEUE_NAME = "test_work_queue";

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
    // 获取连接
    Connection connection = ConnectionUtils.getConnection();

    // 获取Channel
    Channel channel = connection.createChannel();

    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    for (int i = 0; i < 50; i++) {
    System.out.println("send" + i + "start");
    String msg = "message" + i;
    channel.basicPublish("",
    "test_work_queue",
    false,
    null, msg.getBytes());
    Thread.sleep(i * 20);
    System.out.println("send" + i + "done");

    }
    channel.close();
    connection.close();


    }
    }

  2. 消费者

    总共有两个 消费者 Recv1Recv2

    将下列代码的 Recv1 改为 Recv2,将休眠时间改为 1000,可得到 Recv2

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    package com.mmr.myrabbitmq.util.work;

    import com.mmr.myrabbitmq.util.ConnectionUtils;
    import com.rabbitmq.client.*;

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    /**
    * @author Bruce Liu
    * @version V1.0.0
    * @ClassName Send
    * @Description
    * @date 2019-11-30 17:19
    */
    public class Recv1 {
    public static final String QUEUE_NAME = "test_work_queue";

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
    // 获取连接
    Connection connection = ConnectionUtils.getConnection();

    // 获取Channel
    Channel channel = connection.createChannel();

    channel.queueDeclare(QUEUE_NAME, false, false, false, null);

    DefaultConsumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    String msg = new String(body, "UTF-8");
    System.out.println("RECV1---->" + msg);
    try {
    Thread.sleep(2000);
    } catch (InterruptedException e) {
    e.printStackTrace();
    } finally {
    System.out.println("RECV1 DONE!");
    }
    }
    };
    boolean autoAck = true;
    channel.basicConsume(QUEUE_NAME, true, consumer);


    }
    }

    现象

    先启动 生产者 再启动消费者

    发现 两个消费者 一个接收到的消息 全是偶数,另一个全是奇数1

    这种方式叫做轮询分发(round-robin)结果就是不管谁忙,谁清闲,都不会多给一个消息,任务消息总是你一个,我一个。

公平模式

  1. 生产者

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    package com.mmr.myrabbitmq.util.workfair;

    import com.mmr.myrabbitmq.util.ConnectionUtils;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    /**
    * @author Bruce Liu
    * @version V1.0.0
    * @ClassName Send
    * @Description
    * @date 2019-11-30 17:19
    */
    public class Send {
    public static final String QUEUE_NAME = "test_work_queue_work_fair";

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
    // 获取连接
    Connection connection = ConnectionUtils.getConnection();

    // 获取Channel
    Channel channel = connection.createChannel();

    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    /**
    * 每个消费者发送确认消息之前,消息队列不发送下一个消息到消费者,一次只能处理一个
    * 限制发送给同一消费者,不得超过一条数据
    */
    // channel.basicQos(1);
    for (int i = 0; i < 50; i++) {
    System.out.println("send" + i + "start");
    String msg = "message" + i;
    channel.basicPublish("",
    QUEUE_NAME,
    false,
    null, msg.getBytes());
    Thread.sleep(i * 20);
    System.out.println("send" + i + "done");

    }
    channel.close();
    connection.close();


    }
    }
  1. 消费者

    总共有两个 消费者 Recv1Recv2

    将下列代码的 Recv1 改为 Recv2,将休眠时间改为 1000,可得到 Recv2

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    package com.mmr.myrabbitmq.util.workfair;

    import com.mmr.myrabbitmq.util.ConnectionUtils;
    import com.rabbitmq.client.*;

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    /**
    * @author Bruce Liu
    * @version V1.0.0
    * @ClassName Send
    * @Description
    * @date 2019-11-30 17:19
    */
    public class Recv1 {
    public static final String QUEUE_NAME = "test_work_queue_work_fair";

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
    // 获取连接
    Connection connection = ConnectionUtils.getConnection();

    // 获取Channel
    Channel channel = connection.createChannel();

    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    // 保证一次只有一个消息
    channel.basicQos(1);

    DefaultConsumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    String msg = new String(body, "UTF-8");
    System.out.println("RECV1---->" + msg);
    try {
    Thread.sleep(2000);
    } catch (InterruptedException e) {
    e.printStackTrace();
    } finally {
    System.out.println("RECV1 DONE!");
    channel.basicAck(envelope.getDeliveryTag(),false);
    }
    }
    };
    boolean autoAck = false;
    channel.basicConsume(QUEUE_NAME, autoAck, consumer);
    }
    }
  2. 现象

    消费者2处理的消息比消费者1多,能者多劳;

Public/Subscribe 发布订阅模式

解读

  1. 一个生产者,多个消费者
  2. 每一个消费者都有自己的队列
  3. 生产者没有直接把消息发送到队列,而是发送到了交换机 exchange
  4. 每个队列都要绑定到交换机上
  5. 生产者发送的消息,经过交换机到达队列,就能实现一个消息被多个消费者消费

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class Send {
private static final String EXCHANGE_NAME = "test_exchange_fanout";

public static void main(String[] args) throws IOException, TimeoutException {
final Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
//声明交换机
// 这里的type 必须指定为 ExchangeTypes.FANOUT(fanout)
channel.exchangeDeclare(EXCHANGE_NAME, ExchangeTypes.FANOUT);
// 发送消息
String msg = "hello ps";
channel.basicPublish(EXCHANGE_NAME, "", false, null, msg.getBytes());
System.out.println("发送了" + msg);
channel.close();
connection.close();

}
}

消费者1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package com.mmr.myrabbitmq.ps;

import com.mmr.myrabbitmq.util.ConnectionUtils;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
* @author Bruce Liu
* @version V1.0.0
* @ClassName Recv1
* @Description
* @date 2019-11-30 21:18
*/
public class Recv1 {
private static final String EXCHANGE_NAME = "test_exchange_fanout";
public static final String QUEUE_NAME = "test_queue_fanout_email";

public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 绑定交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "UTF-8");
System.out.println("RECV1---->" + msg);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("RECV1 DONE!");
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
};
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}
}

消费者2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class Recv2 {
private static final String EXCHANGE_NAME = "test_exchange_fanout";
public static final String QUEUE_NAME = "test_queue_fanout_sms";

public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "UTF-8");
System.out.println("RECV2---->" + msg);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("RECV2 DONE!");
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, consumer);


}
}

现象 邮件队列 和 短信队列 都能收到同一条消息

Routing 路由模式

配置了相同 routingKey 的消费者才能收到消息

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class Send {
private static final String EXCHANGE_NAME = "test_exchange_direct";

public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
// 这里的 type 必须指定为 ExchangeTypes.DIRECT(direct)
channel.exchangeDeclare(EXCHANGE_NAME, ExchangeTypes.DIRECT);
String msg = "hello direct!";
// 在这里指定 routingKey
String routingKey = "info";
System.out.println("发送了" + msg);
channel.basicPublish(EXCHANGE_NAME, routingKey, false, null, msg.getBytes());
channel.close();
connection.close();
}
}

消费者1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class Recv1 {
private static final String EXCHANGE_NAME = "test_exchange_direct";
public static final String QUEUE_NAME = "test_queue_direct_1";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicQos(1);
// 这里指定 接收 routingKey 为 "error" 的消息
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");

DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "UTF-8");
System.out.println("RECV1---->" + msg);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("RECV1 DONE!");
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, consumer);

}
}

消费者2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class Recv2 {
private static final String EXCHANGE_NAME = "test_exchange_direct";
public static final String QUEUE_NAME = "test_queue_direct_2";


public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicQos(1);
// 这里指定 接收 routingKey 为 "error" 或者 "info" 的消息
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info");
channel.basicQos(1);
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "UTF-8");
System.out.println("RECV2 ---->" + msg);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("RECV2 DONE!");
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}
}

现象 当生产者 发送消息时 routingKey = error 时,两个 消费者都能收到

当生产者 发送消息时 routingKey = info 时,只有消费者 2 能收到消息

主题模式 topic

使用匹配符 * 匹配一项,# 匹配多项

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class Send {
private static final String EXCHANGE_NAME = "test_exchange_topic";
//public static final String QUEUE_NAME = "test_queue_fanout_email";

public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, ExchangeTypes.TOPIC);
String msg = "商品。。。";
// 匹配商品的新增业务
channel.basicPublish(EXCHANGE_NAME, "goods.add",
null, msg.getBytes());
System.out.println("---send" + msg);
channel.close();
connection.close();
}
}

消费者1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class Recv1 {
private static final String EXCHANGE_NAME = "test_exchange_topic";
public static final String QUEUE_NAME = "test_queue_topic_1";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicQos(1);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "goods.#");
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "UTF-8");
System.out.println("RECV1 ---->" + msg);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("RECV1 DONE!");
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, consumer);


}
}

消费者2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class Recv2 {
private static final String EXCHANGE_NAME = "test_exchange_topic";
public static final String QUEUE_NAME = "test_queue_topic_2";


public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicQos(1);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "goods.add");

DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "UTF-8");
System.out.println("RECV2 ---->" + msg);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("RECV2 DONE!");
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, consumer);


}
}

现象 当生产者 发送消息时 routingKey = goods.add 时,两个 消费者都能收到

当生产者 发送消息时 routingKey = goods.insert 时,只有消费者 1 能收到消息, 因为good.# 匹配所有goods开始的 routingKey

rabbitmq 的消息确认机制(事务+confirm)

rabbitmq 中,我们可以持久化数据,解决 rabbitmq 服务器异常的数据丢失问题。

问题:生产者将消息发送出去之后,消息有没有到达 rabbitmq ,服务器默认的情况是不知道可。

两种方式

  1. AMQP 实现了事务机制
  2. Confirm 模式

事务机制

txSelect txCommit txRollback;

txSelect : 用于将当前 channel 设置 transaction 模式

txCommit: 用于提交事务。

txRollback: 回滚事务

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package com.mmr.myrabbitmq.tx;

import com.mmr.myrabbitmq.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
* @author Bruce Liu
* @version V1.0.0
* @ClassName Send
* @Description
* @date 2019-12-01 14:13
*/
public class TxSend {
public static final String QUEUE_NAME = "test_queue_tx";


public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String msg = "hello tx";
try {
channel.txSelect();
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
// 当出错时 不会进行提交
// int xx = 1 / 0;
channel.txCommit();
} catch (Exception e) {
channel.txRollback();
System.out.println("sendMsgRollBack");
} finally {
channel.close();
connection.close();
}

}
}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class Recv {
public static final String QUEUE_NAME = "test_queue_confirm1";


public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("recv tx" + new String(body, "utf-8"));
}
});


}
}

Confirm 模式

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
public class Send3 {
public static final String QUEUE_NAME = "test_queue_confirm3";

public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);

channel.confirmSelect();

final SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>());

// 添加一个监听
channel.addConfirmListener(new ConfirmListener() {
// 正常收到 确认消息
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
if (multiple) {
System.out.println("-----handleAck-------multiple");
confirmSet.headSet(deliveryTag + 1).clear();
} else {
System.out.println("-----handleAck-------multiple false");
confirmSet.remove(deliveryTag);

}
}

@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
if (multiple) {
System.out.println("-----handleNAck-------multiple");
confirmSet.headSet(deliveryTag + 1).clear();

} else {
System.out.println("-----handleNAck-------multiple false");
confirmSet.remove(deliveryTag);

}
}
});
String msg = "ssssssss";
while (true) {
long seqNo = channel.getNextPublishSeqNo();
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
confirmSet.add(seqNo);
}

}
}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class Recv3 {
public static final String QUEUE_NAME = "test_queue_confirm3";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("recv confirm" + new String(body, "utf-8"));
}
});
}
}

Return 消息机制

return listener 用户处理一些不可路由的消息

某些情况下,会出现不可达的消息,这时候就需要使用 Return Listener

mandatory 如果为 true,则监听器会收到路由不可达的消息,然后进行后续的处理,如果为 false,则服务器会自动删除该消息。

消费端限流

防止消费端收到大量的消息,而使服务端挂掉

RabbitMQ 提供了一种 qos(服务质量保证) 功能,即在非自动确认消息的前提下,如果一定数目的消息(通过基于consume 或者 channel 设置qos 的值)未被确认前,不进行消费新的消息。

在consumer 端有如下的函数来设置参数

void basicQos

1
channel.basicNack(envelope.getDeliveryTag(), false, true);

TTL time to live

  1. 在消息上加过期时间
  2. 在队列上加超时时间

DLX 死信队列(交换机)

消息变为私信有以下几种情况

  1. 消息被拒绝(basic.reject/basic.nack)并且 requeue=false
  2. 消息 TTL 过期
  3. 当队列长度达到队列长度

DLX 也是一个正常的Exchange,和一般的Exchange没有区别,他能在任何的队列上被指定,实际上就是设置某个队列的属性

当这个队列中有死信时,RabbitMq 就会自动地将这个消息重新发布到设置的Exchange中去,进而被路由到另外一个队列死信队列的 exchange 和 queue,,然后进行绑定

声明一个死信队列

Exchange: dlx.exchnage

Queue: dlx.queue

RoutingKey: #

然后正常声明交换机、队列、绑定,只不过需要在队列上加一个参数

1
arguments.put("x-dead-letter-exchange","dlx.exchange")

Springboot Spring AMQP实战

Docker安装Elastic-Search

启动命令

1
docker run -p 9200:9200 -p 9300:9300   -e "ES_JAVA_OPTS=-Xms256m -Xmx512m" -e "discovery.type=single-node" -d  docker.io/elasticsearch:7.4.2 

docker安装rabbitmq

  1. 拉取 docker 镜像

    这里使用的 网易云的镜像

    1
    docker pull hub.c.163.com/library/rabbitmq:latest
  2. 启动容器

    5762 是服务的端口,15762 是管理控制台的端口

    1
    docker run -d -p 5672:5672 -p 15672:15672 --name rabbitmq hub.c.163.com/library/rabbitmq:latest 

    启动好后,访问 rabbitmq 的控制台,发现无法访问,需要开启管理插件

    1
    http://ip:15762/
  3. 进入 rabbitmq 的命令行

    1
    docker exec -it rabbitmq bash
  4. 执行以下命令

    1
    rabbitmq-plugins enable rabbitmq_management

再次访问 ip:15762 即可以正常访问控制台

初始用户名和密码是 guest guest

fastdfs安装遇到的问题

在安装 fastdfs 启动 nginx 遇到如下两个问题

1
2
3
4
5
[2019-10-18 09:44:00] ERROR - file: shared_func.c, line: 968, /etc/fdfs/mod_fastdfs.conf is not a regular file
[2019-10-18 09:44:00] ERROR - file: /tmp/fastdfs-nginx-module/src/common.c, line: 155, load conf file "/etc/fdfs/mod_fastdfs.conf" fail, ret code: 22
2019/10/18 09:44:00 [alert] 29644#0: worker process 29991 exited with fatal code 2 and cannot be respawned
2019/10/18 09:51:35 [notice] 30120#0: signal process started

解决方法:

1
chmod 755 /etc/fdfs/

在执行上面的命令后 还会遇到另一个问题

1
2
3
4
[2019-10-18 09:57:03] ERROR - file: shared_func.c, line: 979, open file /etc/fdfs/http.conf fail, errno: 13, error info: Permission denied
[2019-10-18 09:57:03] ERROR - file: /tmp/fastdfs-nginx-module/src/common.c, line: 155, load conf file "/etc/fdfs/mod_fastdfs.conf" fail, ret code: 13
2019/10/18 09:57:03 [alert] 29644#0: worker process 30215 exited with fatal code 2 and cannot be respawned

解决方法

打开 nginx 的配置文件

1
#user  nobody;

改为

1
user  root;

docker尚硅谷

Docker 解决了什么问题

  1. 运行环境不一致所带来的问题
  2. 隔离运行环境
  3. docker 的标准化,让弹性伸缩变为可能

Linux下安装MySql

软件准备

一、软件准备

Linux 版本:CentOS 7

MySql 版本 MySql 5.7

MySql 下载地址:

1
https://dev.mysql.com/downloads/mysql/5.7.html#downloads

选择如下版本

MySql版本

二、安装过程

1. 将下载好的mysql 安装文件上传,使用如下命令解压

1
tar -xvf mysql-5.7.26-1.el7.x86_64.rpm-bundle.tar

解压完成后的文件列表如下

文件列表

2. 安装mysql

1
yum install mysql-*.rpm

安装过程中一路输入y。

3. 启动和关闭mysql

1
2
3
4
## 启动
service mysqld start
## 关闭
service mysqld stop

###4. 设置密码

mysql 安装好之后,系统会自动生成一个临时的密码,使用如下命令来查看

1
2
3
4
5
6
grep 'password' /var/log/mysqld.log |head -n 1
## 显示的临时密码如下
A temporary password is generated for root@localhost: RqWlxls3p.+c
## 用这个临时密码登录,输入如下命令
mysql -u root -p
## 输入上面的命令和密码即可登录

登录完成后进行数据库操作

1
2
mysql> show databases;
2 ERROR 1820 (HY000): You must reset your password using ALTER USER statement before executing this statement.

上面的命令提示必须首先修改密码,使用如下的命令来修改

1
alter user user() identified by "Ds343242+4@";

注意:密码必须使用 大写字母+数字+字母+特殊字符的组合,不让不能通过检验。也可以使用如下命令来降低安全策略的限制

1
2
set global validate_password_policy=0;
set global validate_password_length=1;

下次使用命令行登录 mySql 时使用如下命令

1
2
3
4
[root@instance-oje799dw local]# mysql -u root -p
Enter password:
Welcome to the MySQL monitor. Commands end with ; or \g.
....

三、远程登录配置

mySql 安装好之后,如果使用其他机器连接可能会出现如下报错:

1
Host * is not allowed to connect to this MySQL server

这是因为 mySql 安全性的考虑。

使用如下方法解决

1
2
3
4
5
6
7
## 先用命令行登录 MySql
mysql -u root -p
## 设置权限
use mysql
update user set host = '%' where user = 'root';
## 刷新权限,这一步很关键,不让不会起效
FLUSH PRIVILEGES;

执行上面的命令之后,就可以远程链接MySQL了。

spring-security基础

一、简介

构建系统时,对于请求需要考虑安全性的问题,可能需要进行身份验证、权限管理,这样可以有效保证自己网站的安全。

二、使用

1. 安装

引入 Spring Security 的依赖

1
2
3
4
5
6

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-security</artifactId>
</dependency>

java8学习笔记

Lambda 表达式

三、Lambda 表达式

5. 类型检查、类型推断以及限制

使用局部变量

Lambda 可以没有限制地捕获(也就是在其主体中引用)实例变量和静态变量。但是局部变量必须显式申明为 final, 或者事实上是final

Variable used in lambda expression should be final or effectively final

例如下面的 代码是可以编译的

1
2
int portNumber = 1337;
Runnable r = () -> System.out.println(portNumber);

上面的代码在 Lambda 中引用了局部变量 portNumber,虽然这个变量没有声明为 final ,但是在其声明赋初值之后,并没有试图去改变这个变量的值,所以这个变量事实上是 final 状态。

下面的代码不能通过编译

1
2
3
int portNumber = 1337;
Runnable r = () -> System.out.println(portNumber);
portNumber = 31337;

上面的代码在第三行试图修改 portNumber 的值,所以这个变量并不是事实上 final

springboot2.0深度实践

核心特性

Spring Boot 三大特性

  1. 组件自动装配:Web Mvc、Web Flux、JDBC
  2. 嵌入式WEB容器:Tomcat、Jetty 以及 Undertow
  3. 生产准备特性:指标、健康检查、外部化配置

Spring Boot Bean 源码解读

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class SpringApplicationBootstrap {
public static void main(String[] args) {
// 在 run 方法中可以填入带有 @SpringBootApplication 的注解的类
// SpringApplication.run(ApplicationTest.class, args);
// 引导类的配置
Set<String> sources = new HashSet<>();
sources.add(ApplicationTest.class.getName());
SpringApplication springApplication = new SpringApplication();
springApplication.setSources(sources);
ConfigurableApplicationContext context = springApplication.run(args);
System.out.println("Bean "+context.getBean(ApplicationTest.class));
}
@SpringBootApplication
public static class ApplicationTest{

}
}

Spring 可以通断出当前应用的类型,主要是在 SpringApplication 中的 org.springframework.boot.SpringApplication#setApplicationContextClass 方法中