侧边栏壁纸
博主头像
晓果冻博主等级

一个热爱生活的95后精神小伙

  • 累计撰写 131 篇文章
  • 累计创建 15 个标签
  • 累计收到 67 条评论

目 录CONTENT

文章目录

RabbitMQ学习

晓果冻
2022-05-21 / 0 评论 / 3 点赞 / 442 阅读 / 1,184 字
温馨提示:
本文最后更新于 2022-05-22,若内容或图片失效,请留言反馈。部分素材来自网络,若不小心影响到您的利益,请联系我们删除。

RabbitMQ学习

传统http请求的缺点
Http请求基于请求与响应的模型,在高并发的情况下,客户端发送大量的请求达到
服务器端有可能会导致我们服务器端处理请求堆积。
Tomcat服务器处理每个请求都有自己独立的线程,如果超过最大线程数会将该请求缓存到队列中,如果请求堆积过多的情况下,有可能会导致tomcat服务器崩溃的问题。
所以一般都会在nginx入口实现限流,整合服务保护框架。
http请求处理业务逻辑如果比较耗时的情况下,容易造成客户端一直等待,阻塞等待
过程中会导致客户端超时发生重试策略,有可能会引发幂等性问题。

接口是为http协议的情况下,最好不要处理比较耗时的业务逻辑,耗时的业务逻辑应该单独交给多线程或者是mq处理。

应用场景
  1. 异步发送消息
  2. 处理一些比较耗时的操作
作用
支撑高并发、异步解耦、流量削峰、降低耦合度

多线程也能实现,但是消耗cpu资源,没有实现解耦。

中间件名词
  • Producer 生产者:投递消息到MQ服务器端;

  • Consumer 消费者:从MQ服务器端获取消息处理业务逻辑;

  • Broker MQ服务器端

  • Topic 主题:分类业务逻辑发送短信主题、发送优惠券主题

  • Queue 存放消息模型 队列 先进先出 后进后出原则 数组/链表

  • Message 生产者投递消息报文:json

主流MQ区别对比

image-20220521095630540

RabbitMQ如何保证消息不丢失
  • 针对生产者

    确保生产者投递消息到MQ服务器端成功。
    Ack 消息确认机制
    同步或者异步的形式
    方式1:Confirms
    方式2:事务消息
    
  • 针对消费者

    在rabbitmq情况下:
       必须要将消息消费成功之后,才会将该消息从mq服务器端中移除。
    在kafka中的情况下:
       不管是消费成功还是消费失败,该消息都不会立即从mq服务器端移除。
    
  • 针对MQ服务器端

    在默认的情况下 都会对队列中的消息实现持久化
    持久化硬盘。
    

    image-20220521101126028

    image-20220521101523897

相关核心代码
  • 生产者

    public class Producer {
        private static final String QUEUE_NAME = "mayikt-queue";
    
        public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
            //1.创建一个新连接
            Connection connection = RabbitMQConnection.getConnection();
            //2.设置channel
            Channel channel = connection.createChannel();
            //3.发送消息
            String msg = "每特教育6666";
    channel.confirmSelect();
    
            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
            boolean result = channel.waitForConfirms();
            if (result) {
                System.out.println("消息投递成功");
            } else {
                System.out.println("消息投递失败");
            }
            channel.close();
            connection.close();
        }
    }
    
  • 消费者

    public class Consumer {
        private static final String QUEUE_NAME = "mayikt-queue";
    
        public static void main(String[] args) throws IOException, TimeoutException, IOException, TimeoutException {
            // 1.创建连接
            Connection connection = RabbitMQConnection.getConnection();
            // 2.设置通道
            Channel channel = connection.createChannel();
            DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String msg = new String(body, "UTF-8");
                    System.out.println("消费者获取消息:" + msg);
                    // 消费者完成 消费该消息
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            };
            // 3.监听队列
            channel.basicConsume(QUEUE_NAME, false, defaultConsumer);
    
        }
    }
    
交换机类型
  • Direct exchange(直连交换机):根据队列绑定的路由键转发到具体的队列中存放消息

  • Fanout exchange(扇型交换机):发布订阅

  • Topic exchange(主题交换机):根据队列绑定的路由建模糊转发到具体的队列中存放

  • Headers exchange(头交换机)

----队列 存放消息
----交换机 路由消息存放在那个队列中 类似于nginx
---路由key 分发规则
RabbitMQ死信队列
产生背景
RabbitMQ死信队列俗称,备胎队列;消息中间件因为某种原因拒收该消息后,可以转移到死信队列中存放,死信队列也可以有交换机和路由key等。
产生的原因
  1. 消息投递到MQ中存放 消息已经过期 消费者没有及时的获取到我们消息,消息如果存放到mq服务器中过期之后,会转移到备胎死信队列存放。
  2. 队列达到最大的长度 (队列容器已经满了)
  3. 消费者消费多次消息失败,就会转移存放到死信队列中

以上内容来自蚂蚁课堂

原文档地址:http://file.chenmx.net/s/YmUV

3

评论区