• 本站域名更换为 qyi.io

RabbitMQ Java Client 并发问题思考(一)

RabbitMQ admin 1个月前 (03-09) 127次浏览 0个评论
文章目录[隐藏]

此篇文章仅作为个人笔记,由于初学难免有理解错误的地方,请大佬指正~

由于我这个部分没有使用SpringBoot,使用的是RabbitMq java Client API 所以在Springboot上的有些功能用不了,只能手动实现,故在思维上走了一个坑。

测试代码:

    public static void main(String[] args) {
        //连接工厂
        ConnectionFactory factory = new ConnectionFactory();
//        factory.useNio();
//        factory.setNioParams(new NioParams().setNbIoThreads(4));
        factory.setHost("xxx.xxx.xxx.xxx");
        factory.setPort(port);
        factory.setUsername("username");
        factory.setPassword("password");
        factory.setVirtualHost("msgpush");
//        f.setConnectionTimeout(5000);
//        f.setHandshakeTimeout(3000);
        ExecutorService service = Executors.newFixedThreadPool(30);
        factory.setSharedExecutor(service);
        // 设置自动恢复
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(2);
        factory.setTopologyRecoveryEnabled(true);// 设置不重新声明交换器,队列等信息。
        System.out.println("检索请求的最大频道数:" + factory.getRequestedChannelMax());
//        f.setRequestedChannelMax(10);
        try {
//            Channel channel = f.newConnection().createChannel();
            //建立连接
            Connection c = factory.newConnection("push Client");
            c.addShutdownListener(new ShutdownListener() {
                @Override
                public void shutdownCompleted(ShutdownSignalException e) {
                    System.out.println( "断线了......");
                }
            });
            //建立信道
            Channel ch = c.createChannel();
            //声明队列,如果该队列已经创建过,则不会重复创建
            //ch.queueDeclare("QueueWX", false, false, false, null);
            System.out.println("等待接收数据");
            // 单条消息的大小限制,一般设为0或不设置,不限制大小
            int prefecthSize = 0;
            // 告诉RabbitMQ不要同时给消费端推送n条消息,一旦有n个消息还没ack,则该consumer将block掉,直到有ack;注意在自动应答下不生效
            int prefecthCount = 5;
            // 表示是否应用于channel上,即是channel级别还是consumer级别
            boolean global = true;
            ch.basicQos(3);
            //消费者取消时的回调对象1
            CancelCallback cancelHandel = new CancelCallback() {
                @Override
                public void handle(String consumerTag) throws IOException {
                    System.out.println(consumerTag + "  链接已断开");
                }
            };
            /*微信*/
            DeliverImpl deliverWx = new DeliverImpl(ch, MQEnum.QUEUE_WX);
            ch.basicConsume(MQEnum.QUEUE_WX.getData(), false, deliverWx, cancelHandel);
            /*企业微信*/
            DeliverImpl deliver = new DeliverImpl(ch, MQEnum.QUEUE_WORKWX);
            //第二个参数为消息回执,消息确认处理完成,为true为自动确认,只要消息发送到消费者即消息处理成功;为false为,手动发送确认回执,服务器才认为这个消息处理成功
            ch.basicConsume(MQEnum.QUEUE_WORKWX.getData(), false, deliver, cancelHandel);
            /*钉钉*/
            DeliverImpl deliverDing = new DeliverImpl(ch, MQEnum.QUEUE_DINGDING);
            ch.basicConsume(MQEnum.QUEUE_DINGDING.getData(), false, deliverDing, cancelHandel);
            /*tg*/
            DeliverImpl deliverTg = new DeliverImpl(ch, MQEnum.QUEUE_TG);
            ch.basicConsume(MQEnum.QUEUE_TG.getData(), false, deliverTg, cancelHandel);

        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }

 

Deliver实现

public class DeliverImpl implements DeliverCallback {
    private static final Gson gson;
    private Channel ch;
    private MQEnum mqEnum;


    public DeliverImpl(Channel ch, MQEnum mqEnum) {
        this.ch = ch;
        this.mqEnum = mqEnum;
    }

    static {
        gson = new Gson();
    }


    @Override
    public void handle(String s, Delivery message) throws IOException {
//        try {
        String msg = new String(message.getBody(), "UTF-8");
//        System.out.println("收到消息:" + msg);
        log.info("收到消息: {}", msg);
//        MqMessageDispatcher.doDispatch(msg, message, ch);
    }
}

上面的代码是我看了其他文章和官方文档复制过来的,发现一个问题,无论服务端中有多少消息,也会单条的执行,虽然这就是队列的作用,但是这样效率就上不去了,研究了段时间后在官方文档中发现:

Channel thread-safety in Java Client API Guide:

Channel instances are safe for use by multiple threads. Requests into a Channel are serialized, with only one thread being able to run a command on the Channel at a time. Even so, applications should prefer using a Channel per thread instead of sharing the same Channel across multiple threads.

 

通道实例可安全用于多个线程。进入Channel的请求被序列化,一次只能有一个线程在Channel上运行命令。即使这样,应用程序应该更喜欢每个线程使用一个Channel,而不是在多个线程之间共享同一个Channel。

也就是说创建的Channel只会在一个一个线程里运行,这就说明以上代码中每次只会执行一次的问题,即使设置的 ch.basicQos(3); 所以可以看出

 /*微信*/
DeliverImpl deliverWx = new DeliverImpl(ch, MQEnum.QUEUE_WX);
ch.basicConsume(MQEnum.QUEUE_WX.getData(), false, deliverWx, cancelHandel);
/*企业微信*/
DeliverImpl deliver = new DeliverImpl(ch, MQEnum.QUEUE_WORKWX);
//第二个参数为消息回执,消息确认处理完成,为true为自动确认,只要消息发送到消费者即消息处理成功;为false为,手动发送确认回执,服务器才认为这个消息处理成功
ch.basicConsume(MQEnum.QUEUE_WORKWX.getData(), false, deliver, cancelHandel);
/*钉钉*/
DeliverImpl deliverDing = new DeliverImpl(ch, MQEnum.QUEUE_DINGDING);
ch.basicConsume(MQEnum.QUEUE_DINGDING.getData(), false, deliverDing, cancelHandel);
/*tg*/
DeliverImpl deliverTg = new DeliverImpl(ch, MQEnum.QUEUE_TG);
ch.basicConsume(MQEnum.QUEUE_TG.getData(), false, deliverTg, cancelHandel);

这里的代码,我还将多个Deliver绑定到一个Channel上,后果就是5个队列共享一个Channel,虽然每次拉取3条消息,但处理消息还是单线程(阻塞),造成消息堆积。

按照官方文档 通道和并发注意事项(线程安全) 这种写法会导致某些操作却不能并发调用,这将导致导线上的帧交错不正确,两次确认。每个线程使用一个Channel,而不是在多个线程之间共享同一Channel。

这里改为

 //建立信道
Channel ch_wx = c.createChannel();
Channel ch_workwx = c.createChannel();
Channel ch_dingding = c.createChannel();
Channel ch_tg = c.createChannel();
Channel ch_email = c.createChannel();

/*微信*/
DeliverImpl deliverWx = new DeliverImpl(ch_wx, MQEnum.QUEUE_WX);
ch_wx.basicConsume(MQEnum.QUEUE_WX.getData(), false, deliverWx, cancelHandel);
/*企业微信*/
DeliverImpl deliver = new DeliverImpl(ch_workwx, MQEnum.QUEUE_WORKWX);
//第二个参数为消息回执,消息确认处理完成,为true为自动确认,只要消息发送到消费者即消息处理成功;为false为,手动发送确认回执,服务器才认为这个消息处理成功
ch_workwx.basicConsume(MQEnum.QUEUE_WORKWX.getData(), false, deliver, cancelHandel);
/*钉钉*/
DeliverImpl deliverDing = new DeliverImpl(ch_dingding, MQEnum.QUEUE_DINGDING);
ch_dingding.basicConsume(MQEnum.QUEUE_DINGDING.getData(), false, deliverDing, cancelHandel);
/*tg*/
DeliverImpl deliverTg = new DeliverImpl(ch_tg, MQEnum.QUEUE_TG);
ch_tg.basicConsume(MQEnum.QUEUE_TG.getData(), false, deliverTg, cancelHandel);

每个Channel 对应一个链接。

 

说到这里 需要理解这几个注意点:

  • Connection: Connection表示到消息代理的真实TCP连接,
  • Channel:是其中的虚拟连接(AMQP连接)。这样,您可以在应用程序内部使用任意数量的(虚拟)连接,而不会使TCP连接使代理过载。
    您可以Channel为所有内容使用一个。但是,如果您有多个线程,建议Channel对每个线程使用不同的线程。

每个Consumer线程都在从使用者线程池分配的自己的线程中运行。如果多个使用者订阅了同一队列,则代理将使用循环机制在它们之间平均分配消息。请参阅教程二:“工作队列”

 

使用线程池处理队列

这样可以将拉取到的消息并行处理,也就是所谓的并发。

// 将Deliver类中这行取消注释
MqMessageDispatcher.doDispatch(msg, message, ch);

任务线程池:

这里仅打印,使用延迟模拟业务耗时

public class MqMessageDispatcher {
    public static ExecutorService msgHandleService = Executors.newFixedThreadPool(30);

    static {
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                msgHandleService.shutdown();
            }
        });
    }

    public static void doDispatch(String s, Delivery message,Channel ch) {
        msgHandleService.execute(new MessageHandleTask(s, message, ch));
    }

    private static class MessageHandleTask implements Runnable {

        String s;
        Delivery message;
        Channel ch;

        public MessageHandleTask(String s, Delivery message, Channel ch) {
            this.s = s;
            this.message = message;
            this.ch = ch;
        }

        @Override
        public void run() {
            try {
                long start = System.currentTimeMillis();
                String tName = Thread.currentThread().getName();
                System.out.println(tName + " [x] Received '" + s + "'");
                try {
                    int randNumber = new Random().nextInt(5000 - 1000 + 1) + 1000;
                    Thread.sleep(randNumber);
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
                long end = System.currentTimeMillis();
                System.out.println(tName + " cost " + (end - start));

                ch.basicAck(message.getEnvelope().getDeliveryTag(), false);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

 

未完:

参考文章:

RabbitMQ以及通道和连接之间的关系
通道和并发注意事项(线程安全)

RabbitMQ多线程消费消息


版权所有丨如未注明 , 均为原创丨本网站采用BY-NC-SA协议进行授权
转载请注明原文链接:RabbitMQ Java Client 并发问题思考(一)
喜欢 (0)
发表我的评论
取消评论
表情 贴图 加粗 删除线 居中 斜体 签到

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址