此篇文章仅作为个人笔记,由于初学难免有理解错误的地方,请大佬指正~
由于我这个部分没有使用SpringBoot,使用的是RabbitMq java Client API 所以在Springboot上的有些功能用不了,只能手动实现,故在思维上走了一个坑。
测试代码:
public static void main(String[] args) { //连接工厂 ConnectionFactory factory = new ConnectionFactory(); // factory.useNio(); // factory.setNioParams(new NioParams().setNbIoThreads(4)); factory.setHost("xxx.xxx.xxx.xxx"); factory.setPort(port); factory.setUsername("username"); factory.setPassword("password"); factory.setVirtualHost("msgpush"); // f.setConnectionTimeout(5000); // f.setHandshakeTimeout(3000); ExecutorService service = Executors.newFixedThreadPool(30); factory.setSharedExecutor(service); // 设置自动恢复 factory.setAutomaticRecoveryEnabled(true); factory.setNetworkRecoveryInterval(2); factory.setTopologyRecoveryEnabled(true);// 设置不重新声明交换器,队列等信息。 System.out.println("检索请求的最大频道数:" + factory.getRequestedChannelMax()); // f.setRequestedChannelMax(10); try { // Channel channel = f.newConnection().createChannel(); //建立连接 Connection c = factory.newConnection("push Client"); c.addShutdownListener(new ShutdownListener() { @Override public void shutdownCompleted(ShutdownSignalException e) { System.out.println( "断线了......"); } }); //建立信道 Channel ch = c.createChannel(); //声明队列,如果该队列已经创建过,则不会重复创建 //ch.queueDeclare("QueueWX", false, false, false, null); System.out.println("等待接收数据"); // 单条消息的大小限制,一般设为0或不设置,不限制大小 int prefecthSize = 0; // 告诉RabbitMQ不要同时给消费端推送n条消息,一旦有n个消息还没ack,则该consumer将block掉,直到有ack;注意在自动应答下不生效 int prefecthCount = 5; // 表示是否应用于channel上,即是channel级别还是consumer级别 boolean global = true; ch.basicQos(3); //消费者取消时的回调对象1 CancelCallback cancelHandel = new CancelCallback() { @Override public void handle(String consumerTag) throws IOException { System.out.println(consumerTag + " 链接已断开"); } }; /*微信*/ DeliverImpl deliverWx = new DeliverImpl(ch, MQEnum.QUEUE_WX); ch.basicConsume(MQEnum.QUEUE_WX.getData(), false, deliverWx, cancelHandel); /*企业微信*/ DeliverImpl deliver = new DeliverImpl(ch, MQEnum.QUEUE_WORKWX); //第二个参数为消息回执,消息确认处理完成,为true为自动确认,只要消息发送到消费者即消息处理成功;为false为,手动发送确认回执,服务器才认为这个消息处理成功 ch.basicConsume(MQEnum.QUEUE_WORKWX.getData(), false, deliver, cancelHandel); /*钉钉*/ DeliverImpl deliverDing = new DeliverImpl(ch, MQEnum.QUEUE_DINGDING); ch.basicConsume(MQEnum.QUEUE_DINGDING.getData(), false, deliverDing, cancelHandel); /*tg*/ DeliverImpl deliverTg = new DeliverImpl(ch, MQEnum.QUEUE_TG); ch.basicConsume(MQEnum.QUEUE_TG.getData(), false, deliverTg, cancelHandel); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } }
Deliver实现
public class DeliverImpl implements DeliverCallback { private static final Gson gson; private Channel ch; private MQEnum mqEnum; public DeliverImpl(Channel ch, MQEnum mqEnum) { this.ch = ch; this.mqEnum = mqEnum; } static { gson = new Gson(); } @Override public void handle(String s, Delivery message) throws IOException { // try { String msg = new String(message.getBody(), "UTF-8"); // System.out.println("收到消息:" + msg); log.info("收到消息: {}", msg); // MqMessageDispatcher.doDispatch(msg, message, ch); } }
上面的代码是我看了其他文章和官方文档复制过来的,发现一个问题,无论服务端中有多少消息,也会单条的执行,虽然这就是队列的作用,但是这样效率就上不去了,研究了段时间后在官方文档中发现:
Channel thread-safety in Java Client API Guide:
Channel instances are safe for use by multiple threads. Requests into a Channel are serialized, with only one thread being able to run a command on the Channel at a time. Even so, applications should prefer using a Channel per thread instead of sharing the same Channel across multiple threads.
通道实例可安全用于多个线程。进入Channel的请求被序列化,一次只能有一个线程在Channel上运行命令。即使这样,应用程序应该更喜欢每个线程使用一个Channel,而不是在多个线程之间共享同一个Channel。
也就是说创建的Channel只会在一个一个线程里运行,这就说明以上代码中每次只会执行一次的问题,即使设置的 ch.basicQos(3); 所以可以看出
/*微信*/ DeliverImpl deliverWx = new DeliverImpl(ch, MQEnum.QUEUE_WX); ch.basicConsume(MQEnum.QUEUE_WX.getData(), false, deliverWx, cancelHandel); /*企业微信*/ DeliverImpl deliver = new DeliverImpl(ch, MQEnum.QUEUE_WORKWX); //第二个参数为消息回执,消息确认处理完成,为true为自动确认,只要消息发送到消费者即消息处理成功;为false为,手动发送确认回执,服务器才认为这个消息处理成功 ch.basicConsume(MQEnum.QUEUE_WORKWX.getData(), false, deliver, cancelHandel); /*钉钉*/ DeliverImpl deliverDing = new DeliverImpl(ch, MQEnum.QUEUE_DINGDING); ch.basicConsume(MQEnum.QUEUE_DINGDING.getData(), false, deliverDing, cancelHandel); /*tg*/ DeliverImpl deliverTg = new DeliverImpl(ch, MQEnum.QUEUE_TG); ch.basicConsume(MQEnum.QUEUE_TG.getData(), false, deliverTg, cancelHandel);
这里的代码,我还将多个Deliver绑定到一个Channel上,后果就是5个队列共享一个Channel,虽然每次拉取3条消息,但处理消息还是单线程(阻塞),造成消息堆积。
按照官方文档 通道和并发注意事项(线程安全) 这种写法会导致某些操作却不能并发调用,这将导致导线上的帧交错不正确,两次确认。每个线程使用一个Channel,而不是在多个线程之间共享同一Channel。
这里改为
//建立信道 Channel ch_wx = c.createChannel(); Channel ch_workwx = c.createChannel(); Channel ch_dingding = c.createChannel(); Channel ch_tg = c.createChannel(); Channel ch_email = c.createChannel(); /*微信*/ DeliverImpl deliverWx = new DeliverImpl(ch_wx, MQEnum.QUEUE_WX); ch_wx.basicConsume(MQEnum.QUEUE_WX.getData(), false, deliverWx, cancelHandel); /*企业微信*/ DeliverImpl deliver = new DeliverImpl(ch_workwx, MQEnum.QUEUE_WORKWX); //第二个参数为消息回执,消息确认处理完成,为true为自动确认,只要消息发送到消费者即消息处理成功;为false为,手动发送确认回执,服务器才认为这个消息处理成功 ch_workwx.basicConsume(MQEnum.QUEUE_WORKWX.getData(), false, deliver, cancelHandel); /*钉钉*/ DeliverImpl deliverDing = new DeliverImpl(ch_dingding, MQEnum.QUEUE_DINGDING); ch_dingding.basicConsume(MQEnum.QUEUE_DINGDING.getData(), false, deliverDing, cancelHandel); /*tg*/ DeliverImpl deliverTg = new DeliverImpl(ch_tg, MQEnum.QUEUE_TG); ch_tg.basicConsume(MQEnum.QUEUE_TG.getData(), false, deliverTg, cancelHandel);
每个Channel 对应一个链接。
说到这里 需要理解这几个注意点:
- Connection: Connection表示到消息代理的真实TCP连接,
- Channel:是其中的虚拟连接(AMQP连接)。这样,您可以在应用程序内部使用任意数量的(虚拟)连接,而不会使TCP连接使代理过载。
您可以Channel为所有内容使用一个。但是,如果您有多个线程,建议Channel对每个线程使用不同的线程。
每个Consumer
线程都在从使用者线程池分配的自己的线程中运行。如果多个使用者订阅了同一队列,则代理将使用循环机制在它们之间平均分配消息。请参阅教程二:“工作队列”。
使用线程池处理队列
这样可以将拉取到的消息并行处理,也就是所谓的并发。
// 将Deliver类中这行取消注释 MqMessageDispatcher.doDispatch(msg, message, ch);
任务线程池:
这里仅打印,使用延迟模拟业务耗时
public class MqMessageDispatcher { public static ExecutorService msgHandleService = Executors.newFixedThreadPool(30); static { Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { msgHandleService.shutdown(); } }); } public static void doDispatch(String s, Delivery message,Channel ch) { msgHandleService.execute(new MessageHandleTask(s, message, ch)); } private static class MessageHandleTask implements Runnable { String s; Delivery message; Channel ch; public MessageHandleTask(String s, Delivery message, Channel ch) { this.s = s; this.message = message; this.ch = ch; } @Override public void run() { try { long start = System.currentTimeMillis(); String tName = Thread.currentThread().getName(); System.out.println(tName + " [x] Received '" + s + "'"); try { int randNumber = new Random().nextInt(5000 - 1000 + 1) + 1000; Thread.sleep(randNumber); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } long end = System.currentTimeMillis(); System.out.println(tName + " cost " + (end - start)); ch.basicAck(message.getEnvelope().getDeliveryTag(), false); } catch (IOException e) { e.printStackTrace(); } } } }
未完:
参考文章: