RabbitMQ queue

佇列的功能

當系統已經繁忙逐漸無法及時回應需求,或是需求本身需要一段時間處理,以及複雜的HTTP請求這些狀況都很適合搭配訊息佇列來做處理。
這邊用sleep來模擬處理很久的訊息。

Publisher (範例命名為Sender)

基本上與範例一的內容一樣,就不多著墨。

Consumer (範例命名為Receiver)

為了模擬處理很久的訊息,在收到訊息後將內容逐一字元顯示。

1
2
3
4
5
foreach (var i in message.ToCharArray())
{
Thread.Sleep(150);
Console.Write("{0}", i);
}

其他內容也與範例一是相同的,如果有不理解的可以回頭看第一篇文章。

為了更幫助理解,建議可以執行兩個 receiver.exe,再分別多次執行 sender.exe 這樣看以看多個訊息進入佇列後,每個 receiver 在繁忙中如何分配處理。

訊息處理確認機制(Acknowledgement)

RabbitMQ提供每個訊息的回覆機制,啟用這個機制的話,會要求每個Consumer處理完訊息後要告知佇列,若沒有的話則佇列會視為訊息未正確被處理,並且立即由下一個Consumer來處理,直到正確收到回覆為止。

如何啟用?

其實在範例中已經是啟用自動回覆機制,若要關閉自動回覆,則在這個地方將 autoack 參數設定為 false

1
2
3
channel.BasicConsume(queue: "hello",
autoAck: false ,
consumer: consumer);

這邊一樣可以試試看開啟兩個 consumer 程式,然後僅執行一次 sender.exe。當第一個Consumer收到訊息且處理完畢後,將它關閉,接著可以看到第二個Consumer馬上收到相同訊息且開始處理。這就是設定了不自動回覆,導致佇列認為Consumer沒有正確處理訊息。
所以這邊要自己撰寫回覆佇列的程式。

1
2
Console.WriteLine("Msg handle finished, send acknowledgement now");
channel.BasicAck(ea.DeliveryTag, false);

如此一來同樣訊息就不會一直被視為未正確處理。

程式範例檔

完整的範例檔放在這 範例,有興趣的可以自行下載。

  • 作者: MingYi Chou
  • 版權聲明: 轉載不用問,但請註明出處!本網誌均採用 BY-NC-SA 許可協議。