一、问题描述
在编写mq队列消费的时候出现这样一个问题,多个消费者在对同一个队列消费的时候出现卡进程的情况,当一个消费者运行时候,另外一个消费者无法运行只能等待,当关闭一个消费者,另外的消费者立马能运行。
消费函数编写源码:
pub async fn consume_mq2subfinder() -> Result<(), Box<dyn Error>> {
let mqaddress = mqaddress()?;
let conn = Connection::connect(&mqaddress, ConnectionProperties::default()).await?;
let channel = conn.create_channel().await?;
let consumer_tag = format!("monitor@q-1_consumer_{}", Uuid::new_v4());
let queue_name = "monitor@q-1";
let consumer = channel.basic_consume(
queue_name,
&consumer_tag, // 给消费者一个唯一的标识
BasicConsumeOptions::default(),
FieldTable::default()
).await?;
// 使用 for_each_concurrent 并发处理消息
consumer.for_each_concurrent(5, |delivery| { // None 表示不限制并发数
let tag = consumer_tag.clone();
let channel = channel.clone();
async move {
match delivery {
Ok(delivery) => {
if let Ok(message) = std::str::from_utf8(&delivery.data) {
if let Ok(company) = serde_json::from_str::<Company>(message) {
info!("consume_mq2subfinder fn 日志 {:?} => {:?}",tag,company);
// 假设 subfinder_projectdiscovery 是异步的
if let Err(e) = subfinder_projectdiscovery(company).await {
info!("处理消息时发生错误: {:?}", e);
// 根据需要处理错误,例如重试或记录错误
}
}
}
// 确认消息
let _ = channel.basic_ack(delivery.delivery_tag, Default::default()).await;
},
Err(error) => info!("消费消息时发生错误: {:?}", error),
}
}
}).await;
Ok(())
}
二、排查问题
经过一系列的排查,排除了我的函数并发问题,函数运行出现错误问题,根据调试发现问题集中在consumer.for_each_concurrent(),因为函数连接mq服务器是正常的。
查阅官方文档可以得知,消息队列的消费者竞争
当多个消费者连接到同一个消息队列并监听同一个队列时,消息队列服务通常会按照某种策略(如轮询)将消息分发给这些消费者。这意味着,如果一个消息被一个消费者接收处理,那么这个消息就不会被其他消费者接收。这种设计是为了保证每条消息只被处理一次,避免重复处理。
在你的场景中,两个进程都尝试消费同一个队列(monitor@q-1)中的消息。当一个进程成功连接到队列并开始消费消息时,它会接收到队列中的消息并开始处理。这时,另一个进程虽然也连接到了同一个队列,但由于消息已经被第一个进程接收,它就没有消息可以处理,看起来就像是没有运行。
当停止了第一个进程,它与消息队列的连接会被关闭,消息队列服务会将未处理的消息以及新到达的消息分发给其他活跃的消费者,即你的第二个进程。这就是为什么当停止一个进程后,另一个进程会立即开始运行的原因。
三、解决方案
使用多个队列:为不同的任务或进程创建不同的队列。(这里不适用于我的程序场景)
调整消息队列的配置:某些消息队列服务允许更细粒度的控制消息的分发策略,你可以查看你所使用的消息队列服务的文档,看是否有相关配置可以调整。
对于我的函数来说,最佳的解决方案是通过调整“预取计数”(prefetch count)来控制,预取计数决定了RabbitMQ会向每个消费者发送多少条消息,直到消费者发送回确认(acknowledgment)。
源码对消费者进行配置,增加预取计数
channel.basic_qos(5, BasicQosOptions::default()).await?;
完整源码
use lapin::{BasicProperties, Connection, ConnectionProperties};
use lapin::options::{QueueDeclareOptions,BasicPublishOptions,BasicConsumeOptions,BasicQosOptions};
use lapin::types::FieldTable;
pub async fn consume_mq2subfinder() -> Result<(), Box<dyn Error>> {
let mqaddress = mqaddress()?;
let conn = Connection::connect(&mqaddress, ConnectionProperties::default()).await?;
let channel = conn.create_channel().await?;
let consumer_tag = format!("monitor@q-1_consumer_{}", Uuid::new_v4());
let queue_name = "monitor@q-1";
channel.basic_qos(5, BasicQosOptions::default()).await?;
let consumer = channel.basic_consume(
queue_name,
&consumer_tag, // 给消费者一个唯一的标识
BasicConsumeOptions::default(),
FieldTable::default()
).await?;
// 使用 for_each_concurrent 并发处理消息 https://www.exp-9.com
consumer.for_each_concurrent(5, |delivery| { // None 表示不限制并发数
let tag = consumer_tag.clone();
let channel = channel.clone();
async move {
match delivery {
Ok(delivery) => {
if let Ok(message) = std::str::from_utf8(&delivery.data) {
if let Ok(company) = serde_json::from_str::<Company>(message) {
info!("consume_mq2subfinder fn 日志 {:?} => {:?}",tag,company);
// 假设 subfinder_projectdiscovery 是异步的
if let Err(e) = subfinder_projectdiscovery(company).await {
info!("处理消息时发生错误: {:?}", e);
// 根据需要处理错误,例如重试或记录错误 https://www.exp-9.com
}
}
}
// 确认消息
let _ = channel.basic_ack(delivery.delivery_tag, Default::default()).await;
},
Err(error) => info!("消费消息时发生错误: {:?}", error),
}
}
}).await;
Ok(())
}