(十五)—— 管道和過濾器模式

2018-02-24 15:44 更新

云計(jì)算設(shè)計(jì)模式(十五)——管道和過濾器模式

分解,執(zhí)行復(fù)雜處理成一系列可重復(fù)使用分立元件的一個(gè)任務(wù)。這種模式可以允許執(zhí)行的處理進(jìn)行部署和獨(dú)立縮放任務(wù)元素提高性能,可擴(kuò)展性和可重用性。

背景和問題

一個(gè)應(yīng)用程序可能需要執(zhí)行各種關(guān)于它處理的信息不同復(fù)雜的任務(wù)。一個(gè)簡單,但不靈活的方式來實(shí)施這個(gè)應(yīng)用程序可以執(zhí)行此處理為單一模塊。然而,這種方法有可能減少用于重構(gòu)代碼,對(duì)其進(jìn)行優(yōu)化,或者重新使用它,如果是在應(yīng)用程序中其他地方所需要的相同的處理的部件的機(jī)會(huì)。

圖1通過使用單片式的方式示出了與處理數(shù)據(jù)的問題。一個(gè)應(yīng)用程序接收并處理來自兩個(gè)來源的數(shù)據(jù)進(jìn)行處理。從每個(gè)源數(shù)據(jù)是由執(zhí)行一系列任務(wù)來轉(zhuǎn)換該數(shù)據(jù),并傳遞結(jié)果給應(yīng)用程序的業(yè)務(wù)邏輯之前的獨(dú)立模塊進(jìn)行處理。

圖1 - 使用單一模塊實(shí)現(xiàn)的解決方案

部分的單片模塊執(zhí)行的任務(wù)在功能上是非常相似的,但在模塊已被分開設(shè)計(jì)的。實(shí)現(xiàn)該任務(wù)的代碼被緊密模塊內(nèi)耦合,并且此代碼已開發(fā)具有很少或沒有給定重新使用或可伸縮性的思想。

然而,由每個(gè)模塊或每個(gè)任務(wù)的部署要求執(zhí)行的處理任務(wù),可能會(huì)改變,因?yàn)闃I(yè)務(wù)需求進(jìn)行修改。有些任務(wù)可能是計(jì)算密集型的,并可能受益于強(qiáng)大的硬件上運(yùn)行,而其他人可能并不需要如此昂貴的資源。此外,額外的處理可能需要在將來,或順序,其中由所述處理執(zhí)行的任務(wù)可能會(huì)改變。一個(gè)解決方案是必需的,解決了這些問題,并且增加的可能性代碼重用。

解決方案

分解需要為每個(gè)數(shù)據(jù)流轉(zhuǎn)換為一組離散的元件(或過濾器)的處理,其中每一個(gè)執(zhí)行單任務(wù)。通過標(biāo)準(zhǔn)化每個(gè)組件接收和發(fā)射的數(shù)據(jù)的格式,這些過濾器可以組合在一起成為一個(gè)管道。這有助于避免重復(fù)代碼,并且可以很容易地移除,替換或集成額外的組件,如果處理要求改變。圖2顯示了這種結(jié)構(gòu)的一個(gè)例子。

圖2 - 通過使用管道和過濾器實(shí)現(xiàn)的解決方案

處理一個(gè)請(qǐng)求所花費(fèi)的時(shí)間取決于最慢的過濾器管道中的速度。這可能是一個(gè)或多個(gè)濾波器可能被證明是一個(gè)瓶頸,尤其是如果出現(xiàn)在從一個(gè)特定的數(shù)據(jù)源的數(shù)據(jù)流的大量請(qǐng)求。流水線結(jié)構(gòu)的一個(gè)關(guān)鍵優(yōu)點(diǎn)是它提供了機(jī)會(huì),運(yùn)行速度慢的過濾器的并聯(lián)情況下,使系統(tǒng)能夠分散負(fù)載并提高吞吐量。

可以獨(dú)立縮放組成一個(gè)管道可以在不同的機(jī)器上運(yùn)行過濾器,使他們和可以利用的彈性,許多云計(jì)算環(huán)境提供的優(yōu)勢(shì)。過濾器是計(jì)算密集型可以在高性能的硬件上運(yùn)行,而其他要求不高的過濾器可以對(duì)商品(便宜)的硬件來承載。過濾器甚至不需要是在同一數(shù)據(jù)中心或地理位置,它允許在一個(gè)管道中的每個(gè)元素的環(huán)境下接近它需要的資源來運(yùn)行。

圖3示出了從源 1 施加到管道中的數(shù)據(jù)的一個(gè)例子。

圖3 - 在一個(gè)管道負(fù)載平衡組件

如果一個(gè)濾波器的輸入和輸出被構(gòu)造為一個(gè)流,它可能是能夠進(jìn)行的處理并行的每個(gè)過濾器。在流水線的第一個(gè)過濾器可以開始工作,并開始發(fā)射其結(jié)果,它們會(huì)直接傳遞到序列中的下一個(gè)過濾器之前的第一過濾器已經(jīng)完成它的工作。

另一個(gè)好處是靈活性,這種模式可以提供。如果一個(gè)過濾器發(fā)生故障或者其上運(yùn)行的機(jī)器不再可用時(shí),管道可能能夠重新安排濾波器所執(zhí)行的工作,并指示此工作到組件的另一個(gè)實(shí)例。單個(gè)過濾器的故障不會(huì)必然導(dǎo)致整個(gè)管道的故障。

使用管道和過濾器與補(bǔ)償交易模式相結(jié)合的模式可以提供一種替代的方法來實(shí)現(xiàn)分布式事務(wù)。分布式事務(wù)可以被分解成單獨(dú)的賠的任務(wù),每個(gè)都可以通過使用一個(gè)過濾器,也實(shí)現(xiàn)了補(bǔ)償事務(wù)圖案來實(shí)現(xiàn)。在一個(gè)管道中的過濾器可以在運(yùn)行接近它們保持?jǐn)?shù)據(jù)被實(shí)現(xiàn)為單獨(dú)的托管工作。

問題和注意事項(xiàng)

在決定如何實(shí)現(xiàn)這個(gè)模式時(shí),您應(yīng)考慮以下幾點(diǎn):

  • 復(fù)雜性。增加的靈活性,這種模式提供了還可以引入復(fù)雜性,特別是如果被分布在不同的服務(wù)器上在管道的過濾器。
  • 可靠性。使用一個(gè)基礎(chǔ)結(jié)構(gòu),可以確保在管道中的過濾器之間流動(dòng)的數(shù)據(jù)也不會(huì)丟失。
  • 冪等性。如果在管道中的過濾失敗接收到消息后,任務(wù)被重新調(diào)度到過濾器的另一個(gè)實(shí)例,所述部分工作可能已經(jīng)完成。如果這個(gè)工作更新的全局狀態(tài)的某些方面(如存儲(chǔ)在數(shù)據(jù)庫中的信息),同樣更新可以重復(fù)。如果公布的結(jié)果,在管道中的下一個(gè)過濾器后,過濾器出現(xiàn)故障,但在此之前表示,該公司已經(jīng)成功地完成了它的工作可能會(huì)出現(xiàn)類似的問題。在這些情況下,相同的工作可以由過濾器的另一個(gè)實(shí)例被重復(fù),導(dǎo)致相同的結(jié)果要貼兩次。這可能導(dǎo)致在管道隨后過濾兩次處理相同的數(shù)據(jù)。因此,在一個(gè)管道的過濾器應(yīng)該被設(shè)計(jì)為冪等。欲了解更多信息,請(qǐng)參見喬納森·奧利弗的博客冪等模式。
  • 重復(fù)的消息。如果在管道中的過濾器可以發(fā)布一個(gè)消息給流水線的下一個(gè)階段之后發(fā)生故障時(shí),過濾器的另一個(gè)實(shí)例,可以執(zhí)行(由冪等考慮以上所描述的),并且將發(fā)布相同消息的拷貝到流水線。這可能導(dǎo)致同樣的信息的兩個(gè)實(shí)例被傳遞到下一個(gè)過濾器。為了避免這種情況,該管道應(yīng)檢測(cè)并消除重復(fù)的消息。

注意: 如果要實(shí)現(xiàn)管道使用消息隊(duì)列(如微軟的Azure服務(wù)總線隊(duì)列),消息隊(duì)列基礎(chǔ)設(shè)施可以提供自動(dòng)重復(fù)消息檢測(cè)和清除。

  • 上下文和狀態(tài)。在管道中,每個(gè)過濾器主要運(yùn)行在孤立和不應(yīng)該做這件事是如何被調(diào)用的任何假設(shè)。這意味著,每一個(gè)過濾器必須具有足夠的上下文與它能夠執(zhí)行它的工作提供。這種情況下可包含相當(dāng)數(shù)量的狀態(tài)信息。

何時(shí)使用這個(gè)模式

使用這種模式時(shí):

  • 由一個(gè)應(yīng)用程序所需的處理可以很容易地被分解成一組離散的,獨(dú)立的步驟。
  • 由應(yīng)用程序執(zhí)行的處理步驟具有不同的可擴(kuò)展性要求。

注意:它可能會(huì)向組過濾器應(yīng)擴(kuò)展一起在相同的過程。欲了解更多信息,請(qǐng)參閱計(jì)算資源整合模式。

  • 靈活性是必需的,以允許通過一個(gè)應(yīng)用程序,或能力進(jìn)行添加和刪除步驟中的處理步驟重新排序。
  • 該系統(tǒng)可以受益于分配處理跨不同服務(wù)器的步驟。
  • 一個(gè)可靠的解決方案是必需的,當(dāng)數(shù)據(jù)正在被處理的最小化在一個(gè)步驟失敗的影響。

這種模式可能不適合時(shí):

  • 通過應(yīng)用程序執(zhí)行的處理步驟并不是獨(dú)立的,或者他們必須共同作為同一事務(wù)的一部分來執(zhí)行。
  • 在一個(gè)步驟所需的上下文或狀態(tài)的信息量使得這種方法效率很低。它可能會(huì)持續(xù)狀態(tài)信息到數(shù)據(jù)庫代替,但不要使用此策略,如果在數(shù)據(jù)庫上的額外負(fù)載會(huì)導(dǎo)致過度競爭。

例子

可以使用消息隊(duì)列的一個(gè)序列,以提供執(zhí)行流水線所需的基礎(chǔ)設(shè)施。最初的消息隊(duì)列接收未處理的消息。實(shí)現(xiàn)為過濾器的任務(wù)偵聽此隊(duì)列的消息的組件,它執(zhí)行其工作,然后投遞轉(zhuǎn)化的消息序列中的下一個(gè)隊(duì)列。另一個(gè)過濾器的任務(wù)可以偵聽在這個(gè)隊(duì)列中的消息,對(duì)其進(jìn)行處理,后的結(jié)果到另一個(gè)隊(duì)列,依此類推,直到完全轉(zhuǎn)化的數(shù)據(jù)出現(xiàn)在隊(duì)列中的最后一個(gè)消息。

如果你正在構(gòu)建一個(gè)解決方案,在 Azure 上,你可以使用服務(wù)總線隊(duì)列提供了可靠的,可擴(kuò)展的排隊(duì)機(jī)制。下面所示的 ServiceBusPipeFilter 類提供了一個(gè)例子。它演示了如何實(shí)現(xiàn)接收從隊(duì)列中輸入消息,處理這些郵件的過濾器,并張貼結(jié)果到另一個(gè)隊(duì)列。

注意: 該 ServiceBusPipeFilter 類在 PipesAndFilters 解決方案 PipesAndFilters.Shared 項(xiàng)目定義。此示例代碼都可以可以下載本指導(dǎo)意見。

public class ServiceBusPipeFilter  
{  
  ...  
  private readonly string inQueuePath;  
  private readonly string outQueuePath;  
  ...  
  private QueueClient inQueue;  
  private QueueClient outQueue;  
  ...  
?
  public ServiceBusPipeFilter(..., string inQueuePath, string outQueuePath = null)  
  {  
     ...  
     this.inQueuePath = inQueuePath;  
     this.outQueuePath = outQueuePath;  
  }  
?
  public void Start()  
  {  
    ...  
    // Create the outbound filter queue if it does not exist.  
    ...  
    this.outQueue = QueueClient.CreateFromConnectionString(...);  
?
    ...  
    // Create the inbound and outbound queue clients.  
    this.inQueue = QueueClient.CreateFromConnectionString(...);  
  }  
?
  public void OnPipeFilterMessageAsync(  
    Func<BrokeredMessage, Task<BrokeredMessage>> asyncFilterTask, ...)   
  {  
    ...  
?
    this.inQueue.OnMessageAsync(  
      async (msg) =>  
    {  
      ...  
      // Process the filter and send the output to the   
      // next queue in the pipeline.  
      var outMessage = await asyncFilterTask(msg);  
?
      // Send the message from the filter processor   
      // to the next queue in the pipeline.  
      if (outQueue != null)  
      {  
        await outQueue.SendAsync(outMessage);  
      }  
?
      // Note: There is a chance that the same message could be sent twice   
      // or that a message may be processed by an upstream or downstream   
      // filter at the same time.  
      // This would happen in a situation where processing of a message was  
      // completed, it was sent to the next pipe/queue, and then failed   
      // to complete when using the PeekLock method.  
      // Idempotent message processing and concurrency should be considered   
      // in a real-world implementation.  
    },  
    options);  
  }  
?
  public async Task Close(TimeSpan timespan)  
  {  
    // Pause the processing threads.  
    this.pauseProcessingEvent.Reset();  
?
    // There is no clean approach for waiting for the threads to complete  
    // the processing. This example simply stops any new processing, waits  
    // for the existing thread to complete, then closes the message pump   
    // and finally returns.  
    Thread.Sleep(timespan);  
?
    this.inQueue.Close();  
    ...  
  }  
?
  ...  
}

在 ServiceBusPipeFilter 類 Start 方法連接到一對(duì)輸入和輸出隊(duì)列,以及關(guān)閉方法從輸入隊(duì)列斷開。該 OnPipeFilterMessageAsync 方法執(zhí)行消息的實(shí)際處理;該 asyncFilterTask 參數(shù)這種方法指定要執(zhí)行的處理。該 OnPipeFilterMessageAsync 方法等待輸入隊(duì)列中收到的消息,因?yàn)樗竭_(dá),并張貼結(jié)果到輸出隊(duì)列通過運(yùn)行在每個(gè)郵件的 asyncFilterTask 參數(shù)指定的代碼。隊(duì)列本身的構(gòu)造函數(shù)中指定。

樣品溶液的過濾器實(shí)現(xiàn)了在一組工作角色。每個(gè)工人的作用可獨(dú)立進(jìn)行調(diào)整,這取決于它執(zhí)行的業(yè)務(wù)處理的復(fù)雜性,或者它需要執(zhí)行此處理的資源。此外,各輔助角色的多個(gè)實(shí)例可以并行地運(yùn)行,以提高吞吐量。

下面的代碼顯示了一個(gè)名為 PipeFilterARoleEntry 的 Azure 工作者角色,這是在樣品溶液中 PipeFilterA 項(xiàng)目定義。

public class PipeFilterARoleEntry : RoleEntryPoint  
{  
  ...  
  private ServiceBusPipeFilter pipeFilterA;  
?
  public override bool OnStart()  
  {  
    ...  
    this.pipeFilterA = new ServiceBusPipeFilter(  
      ...,  
      Constants.QueueAPath,  
      Constants.QueueBPath);  
?
    this.pipeFilterA.Start();  
    ...  
  }  
?
  public override void Run()  
  {  
    this.pipeFilterA.OnPipeFilterMessageAsync(async (msg) =>  
    {  
      // Clone the message and update it.  
      // Properties set by the broker (Deliver count, enqueue time, ...)   
      // are not cloned and must be copied over if required.  
      var newMsg = msg.Clone();  
?
      await Task.Delay(500); // DOING WORK  
?
      Trace.TraceInformation("Filter A processed message:{0} at {1}",   
        msg.MessageId, DateTime.UtcNow);  
?
      newMsg.Properties.Add(Constants.FilterAMessageKey, "Complete");  
?
      return newMsg;  
    });  
?
    ...  
  }  
?
  ...  
}

這個(gè)角色包含 ServiceBusPipeFilter 對(duì)象。在角色 OnStart 方法連接到隊(duì)列接收輸入的信息并張貼輸出消息(隊(duì)列的名稱在常量類中定義)。 Run 方法調(diào)用 OnPipeFilterMessagesAsync 方法來對(duì)接收到的(在本例中,該處理通過等待較短的時(shí)間段模擬的)的每個(gè)消息執(zhí)行某些處理。何時(shí)處理完成時(shí),一個(gè)新的消息被構(gòu)造包含結(jié)果(在這種情況下,輸入消息被簡單地增加了一個(gè)自定義屬性),并將該消息發(fā)送到輸出隊(duì)列。

示例代碼中包含一個(gè)名為 PipeFilterBRoleEntry 在 PipeFilterB 項(xiàng)目的另一名工人的作用。這個(gè)角色類似于 PipeFilterARoleEntry 不同之處在于它的 Run 方法進(jìn)行不同的處理。在本例中的解決方案,這兩種作用結(jié)合起來,構(gòu)建一個(gè)管道;為 PipeFilterARoleEntry 角色輸出隊(duì)列是用于 PipeFilterBRoleEntry 角色的輸入隊(duì)列。

樣品溶液還提供了兩個(gè)名為 InitialSenderRoleEntry(在 InitialSender 項(xiàng)目)和 FinalReceiverRoleEntry(在 FinalReceiver 項(xiàng)目),進(jìn)一步的角色。該 InitialSenderRoleEntry 作用提供了在管道中的初始消息。OnStart 方法連接到單個(gè)隊(duì)列和運(yùn)行方法的帖子的方法來此隊(duì)列。這個(gè)隊(duì)列是所使用的 PipeFilterARoleEntry 作用,所以發(fā)布一條消息到這個(gè)隊(duì)列的輸入隊(duì)列導(dǎo)致由 PipeFilterARoleEntry 作用來接收和處理消息。經(jīng)處理的信息,然后通過 PipeFilterBRoleEntry 作用傳遞。

為 FinalReceiveRoleEntry 角色輸入隊(duì)列是用于 PipeFilterBRoleEntry 角色的輸出隊(duì)列。 Run 方法在 FinalReceiveRoleEntry 作用,如下圖所示,接收到該消息,并且執(zhí)行一些最后的處理。然后將其寫入了過濾器的管道跟蹤輸出添加自定義屬性的值。

public class FinalReceiverRoleEntry : RoleEntryPoint  
{  
  ...  
  // Final queue/pipe in the pipeline from which to process data.  
  private ServiceBusPipeFilter queueFinal;  
?
  public override bool OnStart()  
  {  
    ...  
    // Set up the queue.  
    this.queueFinal = new ServiceBusPipeFilter(...,Constants.QueueFinalPath);  
    this.queueFinal.Start();  
    ...  
  }  
?
  public override void Run()  
  {  
    this.queueFinal.OnPipeFilterMessageAsync(  
      async (msg) =>  
      {  
        await Task.Delay(500); // DOING WORK  
?
        // The pipeline message was received.  
        Trace.TraceInformation(  
          "Pipeline Message Complete - FilterA:{0} FilterB:{1}",  
          msg.Properties[Constants.FilterAMessageKey],  
          msg.Properties[Constants.FilterBMessageKey]);  
?
        return null;  
      });  
    ...  
  }  
?
  ...  
}
以上內(nèi)容是否對(duì)您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號(hào)
微信公眾號(hào)

編程獅公眾號(hào)