此篇文章仅作为个人笔记,由于初学难免有理解错误的地方,请大佬指正~
由于我这个部分没有使用SpringBoot,使用的是RabbitMq java Client API 所以在Springboot上的有些功能用不了,只能手动实现,故在思维上走了一个坑。
测试代码:
public static void main(String[] args) {
//连接工厂
ConnectionFactory factory = new ConnectionFactory();
// factory.useNio();
// factory.setNioParams(new NioParams().setNbIoThreads(4));
factory.setHost("xxx.xxx.xxx.xxx");
factory.setPort(port);
factory.setUsername("username");
factory.setPassword("password");
factory.setVirtualHost("msgpush");
// f.setConnectionTimeout(5000);
// f.setHandshakeTimeout(3000);
ExecutorService service = Executors.newFixedThreadPool(30);
factory.setSharedExecutor(service);
// 设置自动恢复
factory.setAutomaticRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(2);
factory.setTopologyRecoveryEnabled(true);// 设置不重新声明交换器,队列等信息。
System.out.println("检索请求的最大频道数:" + factory.getRequestedChannelMax());
// f.setRequestedChannelMax(10);
try {
// Channel channel = f.newConnection().createChannel();
//建立连接
Connection c = factory.newConnection("push Client");
c.addShutdownListener(new ShutdownListener() {
@Override
public void shutdownCompleted(ShutdownSignalException e) {
System.out.println( "断线了......");
}
});
//建立信道
Channel ch = c.createChannel();
//声明队列,如果该队列已经创建过,则不会重复创建
//ch.queueDeclare("QueueWX", false, false, false, null);
System.out.println("等待接收数据");
// 单条消息的大小限制,一般设为0或不设置,不限制大小
int prefecthSize = 0;
// 告诉RabbitMQ不要同时给消费端推送n条消息,一旦有n个消息还没ack,则该consumer将block掉,直到有ack;注意在自动应答下不生效
int prefecthCount = 5;
// 表示是否应用于channel上,即是channel级别还是consumer级别
boolean global = true;
ch.basicQos(3);
//消费者取消时的回调对象1
CancelCallback cancelHandel = new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {
System.out.println(consumerTag + " 链接已断开");
}
};
/*微信*/
DeliverImpl deliverWx = new DeliverImpl(ch, MQEnum.QUEUE_WX);
ch.basicConsume(MQEnum.QUEUE_WX.getData(), false, deliverWx, cancelHandel);
/*企业微信*/
DeliverImpl deliver = new DeliverImpl(ch, MQEnum.QUEUE_WORKWX);
//第二个参数为消息回执,消息确认处理完成,为true为自动确认,只要消息发送到消费者即消息处理成功;为false为,手动发送确认回执,服务器才认为这个消息处理成功
ch.basicConsume(MQEnum.QUEUE_WORKWX.getData(), false, deliver, cancelHandel);
/*钉钉*/
DeliverImpl deliverDing = new DeliverImpl(ch, MQEnum.QUEUE_DINGDING);
ch.basicConsume(MQEnum.QUEUE_DINGDING.getData(), false, deliverDing, cancelHandel);
/*tg*/
DeliverImpl deliverTg = new DeliverImpl(ch, MQEnum.QUEUE_TG);
ch.basicConsume(MQEnum.QUEUE_TG.getData(), false, deliverTg, cancelHandel);
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
Deliver实现
public class DeliverImpl implements DeliverCallback {
private static final Gson gson;
private Channel ch;
private MQEnum mqEnum;
public DeliverImpl(Channel ch, MQEnum mqEnum) {
this.ch = ch;
this.mqEnum = mqEnum;
}
static {
gson = new Gson();
}
@Override
public void handle(String s, Delivery message) throws IOException {
// try {
String msg = new String(message.getBody(), "UTF-8");
// System.out.println("收到消息:" + msg);
log.info("收到消息: {}", msg);
// MqMessageDispatcher.doDispatch(msg, message, ch);
}
}
上面的代码是我看了其他文章和官方文档复制过来的,发现一个问题,无论服务端中有多少消息,也会单条的执行,虽然这就是队列的作用,但是这样效率就上不去了,研究了段时间后在官方文档中发现:
Channel thread-safety in Java Client API Guide:
Channel instances are safe for use by multiple threads. Requests into a Channel are serialized, with only one thread being able to run a command on the Channel at a time. Even so, applications should prefer using a Channel per thread instead of sharing the same Channel across multiple threads.
通道实例可安全用于多个线程。进入Channel的请求被序列化,一次只能有一个线程在Channel上运行命令。即使这样,应用程序应该更喜欢每个线程使用一个Channel,而不是在多个线程之间共享同一个Channel。
也就是说创建的Channel只会在一个一个线程里运行,这就说明以上代码中每次只会执行一次的问题,即使设置的 ch.basicQos(3); 所以可以看出
/*微信*/ DeliverImpl deliverWx = new DeliverImpl(ch, MQEnum.QUEUE_WX); ch.basicConsume(MQEnum.QUEUE_WX.getData(), false, deliverWx, cancelHandel); /*企业微信*/ DeliverImpl deliver = new DeliverImpl(ch, MQEnum.QUEUE_WORKWX); //第二个参数为消息回执,消息确认处理完成,为true为自动确认,只要消息发送到消费者即消息处理成功;为false为,手动发送确认回执,服务器才认为这个消息处理成功 ch.basicConsume(MQEnum.QUEUE_WORKWX.getData(), false, deliver, cancelHandel); /*钉钉*/ DeliverImpl deliverDing = new DeliverImpl(ch, MQEnum.QUEUE_DINGDING); ch.basicConsume(MQEnum.QUEUE_DINGDING.getData(), false, deliverDing, cancelHandel); /*tg*/ DeliverImpl deliverTg = new DeliverImpl(ch, MQEnum.QUEUE_TG); ch.basicConsume(MQEnum.QUEUE_TG.getData(), false, deliverTg, cancelHandel);
这里的代码,我还将多个Deliver绑定到一个Channel上,后果就是5个队列共享一个Channel,虽然每次拉取3条消息,但处理消息还是单线程(阻塞),造成消息堆积。
按照官方文档 通道和并发注意事项(线程安全) 这种写法会导致某些操作却不能并发调用,这将导致导线上的帧交错不正确,两次确认。每个线程使用一个Channel,而不是在多个线程之间共享同一Channel。
这里改为
//建立信道 Channel ch_wx = c.createChannel(); Channel ch_workwx = c.createChannel(); Channel ch_dingding = c.createChannel(); Channel ch_tg = c.createChannel(); Channel ch_email = c.createChannel(); /*微信*/ DeliverImpl deliverWx = new DeliverImpl(ch_wx, MQEnum.QUEUE_WX); ch_wx.basicConsume(MQEnum.QUEUE_WX.getData(), false, deliverWx, cancelHandel); /*企业微信*/ DeliverImpl deliver = new DeliverImpl(ch_workwx, MQEnum.QUEUE_WORKWX); //第二个参数为消息回执,消息确认处理完成,为true为自动确认,只要消息发送到消费者即消息处理成功;为false为,手动发送确认回执,服务器才认为这个消息处理成功 ch_workwx.basicConsume(MQEnum.QUEUE_WORKWX.getData(), false, deliver, cancelHandel); /*钉钉*/ DeliverImpl deliverDing = new DeliverImpl(ch_dingding, MQEnum.QUEUE_DINGDING); ch_dingding.basicConsume(MQEnum.QUEUE_DINGDING.getData(), false, deliverDing, cancelHandel); /*tg*/ DeliverImpl deliverTg = new DeliverImpl(ch_tg, MQEnum.QUEUE_TG); ch_tg.basicConsume(MQEnum.QUEUE_TG.getData(), false, deliverTg, cancelHandel);
每个Channel 对应一个链接。
说到这里 需要理解这几个注意点:
- Connection: Connection表示到消息代理的真实TCP连接,
- Channel:是其中的虚拟连接(AMQP连接)。这样,您可以在应用程序内部使用任意数量的(虚拟)连接,而不会使TCP连接使代理过载。
您可以Channel为所有内容使用一个。但是,如果您有多个线程,建议Channel对每个线程使用不同的线程。
每个Consumer线程都在从使用者线程池分配的自己的线程中运行。如果多个使用者订阅了同一队列,则代理将使用循环机制在它们之间平均分配消息。请参阅教程二:“工作队列”。
使用线程池处理队列
这样可以将拉取到的消息并行处理,也就是所谓的并发。
// 将Deliver类中这行取消注释 MqMessageDispatcher.doDispatch(msg, message, ch);
任务线程池:
这里仅打印,使用延迟模拟业务耗时
public class MqMessageDispatcher {
public static ExecutorService msgHandleService = Executors.newFixedThreadPool(30);
static {
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
msgHandleService.shutdown();
}
});
}
public static void doDispatch(String s, Delivery message,Channel ch) {
msgHandleService.execute(new MessageHandleTask(s, message, ch));
}
private static class MessageHandleTask implements Runnable {
String s;
Delivery message;
Channel ch;
public MessageHandleTask(String s, Delivery message, Channel ch) {
this.s = s;
this.message = message;
this.ch = ch;
}
@Override
public void run() {
try {
long start = System.currentTimeMillis();
String tName = Thread.currentThread().getName();
System.out.println(tName + " [x] Received '" + s + "'");
try {
int randNumber = new Random().nextInt(5000 - 1000 + 1) + 1000;
Thread.sleep(randNumber);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
long end = System.currentTimeMillis();
System.out.println(tName + " cost " + (end - start));
ch.basicAck(message.getEnvelope().getDeliveryTag(), false);
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
未完:
参考文章: