優(yōu)先發(fā)送到服務,以便具有較高優(yōu)先級的請求被接收和高于一個較低優(yōu)先級的更快速地處理請求。這種模式是在應用程序是有用的,它提供不同的服務級別保證或者針對獨立客戶。
應用程序可以委托給其他服務的具體任務;例如,為了執(zhí)行后臺處理或與其他應用程序或服務的整合。在云中,消息隊列通常用于將任務委派給后臺處理。在許多情況下,請求由服務接收的順序是不重要的。然而,在某些情況下,可能需要優(yōu)先考慮的具體要求。這些要求必須早于較低優(yōu)先級的其他可能先前已發(fā)送由應用程序進行處理。
隊列通常是先入先出(FIFO)結構,而消費者通常會收到他們發(fā)布到隊列中的順序相同的消息。然而,一些消息隊列支持優(yōu)先級的消息傳遞;應用程序發(fā)布一條消息可以分配優(yōu)先級的消息,并在隊列中的消息會自動重新排序,使得具有較高優(yōu)先級的消息將這些優(yōu)先級較低的前被接收。圖1示出了一個隊列,它提供優(yōu)先權的消息。
圖1 - 使用支持消息優(yōu)先級排隊機制
注意: 大多數消息隊列的實現(xiàn)支持多個消費者(以下的競爭消費者模式)和消費過程的數量可以按比例增加或減小的需求支配。
在不支持基于優(yōu)先級的消息隊列系統(tǒng)中,一種替代的解決方案是為每一個優(yōu)先級的獨立隊列。該應用程序負責將郵件投遞到適當的隊列。每個隊列可以有一個單獨的消費者池。高優(yōu)先級隊列可以有更快的硬件比低優(yōu)先級隊列中運行的消費者一個更大的泳池。圖2示出了這種方法。
圖2 - 使用不同的消息隊列為每個優(yōu)先級
這種策略的變化是有消費者認為檢查對高優(yōu)先級隊列中的消息,然后再才開始從低優(yōu)先級隊列中讀取消息,如果沒有更高優(yōu)先級的消息都在等待的一個池。還有,使用消費過程的一個池的溶液之間的一些語義差異(或者使用支持不同的優(yōu)先級或多個隊列,每個處理一個單一的優(yōu)先級消息的消息的單個隊列),以及使用多個隊列用溶液為每個隊列一個單獨的游泳池。
在單池的做法,高優(yōu)先級的消息總是會收到以前低優(yōu)先級的消息處理。在理論中,具有非常低的優(yōu)先級的消息可以被不斷地取代,并且可能永遠不會被處理。在多池的方法,較低優(yōu)先級的報文將總是被處理,只是不一樣迅速的那些更高的優(yōu)先級的(取決于池和它們具有可用資源的相對大小)。
使用優(yōu)先級排隊機制可提供以下優(yōu)點:
在決定如何實現(xiàn)這個模式時,請考慮以下幾點:
這種模式非常適合場景:
微軟 Azure 不提供經過整理的本地支持郵件自動優(yōu)先級排隊機制。然而,它確實提供了 Azure 的服務總線主題和訂閱,支持排隊機制,提供郵件過濾,具有多種靈活的功能,使其非常適合用在幾乎所有的優(yōu)先級隊列的實現(xiàn)在一起。
一個 Azure 的解決方案,可以實現(xiàn)服務總線話題,其中一個應用程序可以發(fā)布消息,以同樣的方式作為一個隊列。消息可以包含在應用程序定義的自定義屬性的形式的元數據。服務總線訂閱可以與主題相關聯(lián),并且這些訂閱可以篩選根據它們的屬性信息。當一個應用程序將消息發(fā)送到一個主題,該消息被定向到從那里它可以被消費者閱讀相應的訂閱。消費者的過程可以檢索使用相同的語義消息隊列(訂閱是一個邏輯隊列)從一個訂閱消息。
圖 3 示出了使用的 Azure 服務總線主題和訂閱的解決方案
圖3 - 實現(xiàn)與 Azure 的服務總線主題和訂閱優(yōu)先級隊列
在圖3中的應用程序創(chuàng)建多個消息和每個消息與價值分配被稱為優(yōu)先級的自定義屬性,無論是高還是低。該應用程序的帖子,這些消息的一個話題。這個主題有兩個相關的訂閱,這兩個濾波器的消息通過檢查優(yōu)先級屬性。一位接受認購,其中優(yōu)先級屬性設置為高的消息,而其他接受其中優(yōu)先級屬性設置為低的消息。消費者池讀取每個訂閱的消息。高優(yōu)先認購有較大的游泳池,而這些消費者可能會更強大(且昂貴)的計算機上運行有提供比消費者在低優(yōu)先級池的更多資源。
請注意,沒有什么特別的高,低優(yōu)先級消息在這個例子中指定。這些僅僅是指定為每個消息中的屬性的標簽,并用于引導消息發(fā)送到一個特定的訂閱。如果附加的優(yōu)先級是必需的,它是比較容易地創(chuàng)建進一步的訂閱和消費者進程池來處理這些優(yōu)先級。
在可用于此引導代碼時 Queue 解決方案包含這種方法的一個實現(xiàn)。該解決方案包含一個名為 PriorityQueue.High 和 PriorityQueue.Low 兩個工作角色的項目。這兩個輔助角色繼承的類被稱為 PriorityWorkerRole 它包含用于連接到一個指定的預訂中 OnStart 方法的功能。
該 PriorityQueue.High 和 PriorityQueue.Low 輔助角色連接到不同的預訂,他們的配置設置來定義。管理員可以配置每個角色的不同數量要運行;通常會有比 PriorityQueue.Low 工作者角色的 PriorityQueue.High 輔助角色更多的實例。
在 PriorityWorkerRole 類的 Run 方法安排虛擬 ProcessMessage 的方法(在 PriorityWorkerRole 類定義)的隊列中接收到的每個消息被執(zhí)行。下面的代碼顯示了運行和 ProcessMessage 的方法。在類的 QueueManager,在 PriorityQueue.Shared 項目定義,提供了輔助方法使用的 Azure 服務總線隊列。
public class PriorityWorkerRole : RoleEntryPoint
{
private QueueManager queueManager;
...
?
public override void Run()
{
// Start listening for messages on the subscription.
var subscriptionName = CloudConfigurationManager.GetSetting("SubscriptionName");
this.queueManager.ReceiveMessages(subscriptionName, this.ProcessMessage);
...;
}
...
?
protected virtual async Task ProcessMessage(BrokeredMessage message)
{
// Simulating processing.
await Task.Delay(TimeSpan.FromSeconds(2));
}
}
該 PriorityQueue.High 和 PriorityQueue.Low 輔助角色既覆蓋 ProcessMessage 的方法的默認功能。下面的代碼顯示了 ProcessMessage 的方法為 PriorityQueue.High 輔助角色。
Copy
?
?
protected override async Task ProcessMessage(BrokeredMessage message)
{
// Simulate message processing for High priority messages.
await base.ProcessMessage(message);
Trace.TraceInformation("High priority message processed by " +
RoleEnvironment.CurrentRoleInstance.Id + " MessageId: " + message.MessageId);
}
當一個應用程序將消息發(fā)布到與所使用的 PriorityQueue.High 和 PriorityQueue.Low 輔助角色的訂閱相關聯(lián)的主題,它指定了優(yōu)先使用優(yōu)先級的自定義屬性,如在下面的代碼示例。此代碼(這是在 PriorityQueue.Sender項目 WorkerRole 類實現(xiàn)),使用的 QueueManager 類的 SendBatchAsync 輔助方法發(fā)帖分批的話題。
// Send a low priority batch.
var lowMessages = new List<BrokeredMessage>();
?
for (int i = 0; i < 10; i++)
{
var message = new BrokeredMessage() { MessageId = Guid.NewGuid().ToString() };
message.Properties["Priority"] = Priority.Low;
lowMessages.Add(message);
}
?
this.queueManager.SendBatchAsync(lowMessages).Wait();
...
?
// Send a high priority batch.
var highMessages = new List<BrokeredMessage>();
?
for (int i = 0; i < 10; i++)
{
var message = new BrokeredMessage() { MessageId = Guid.NewGuid().ToString() };
message.Properties["Priority"] = Priority.High;
highMessages.Add(message);
}
?
this.queueManager.SendBatchAsync(highMessages).Wait();
更多建議: