Samza 功能預(yù)覽

2018-08-18 18:13 更新

本節(jié)內(nèi)容

  1. 概觀
  2. 試試看
  3. 建筑
  4. 高級(jí)API
  5. 靈活部署模式

概觀

Samza 0.13.0 引入了新的編程模型和新的部署模型,它們作為預(yù)覽被發(fā)布,因?yàn)樗鼈兇砹碎_發(fā)人員如何與Samza合作的重大改進(jìn),因此對(duì)早期采用者和Samza開發(fā)社區(qū)來說,有助于實(shí)驗(yàn)該版本并提供反饋。以下內(nèi)容介紹新功能和鏈接到教程,演示如何使用它們。請(qǐng)嘗試并發(fā)送反饋給開發(fā)者郵件列表。

試試看

想跳過所有細(xì)節(jié)并獲得一些實(shí)際經(jīng)驗(yàn)嗎?有三個(gè)教程可以幫助您了解在 YARN 和嵌入式模式下運(yùn)行 Samza 應(yīng)用程序以及使用高級(jí) API 進(jìn)行編程:

  • YARN部署 - 在 YARN 上運(yùn)行預(yù)先存在的維基百科應(yīng)用程序并觀察輸出。
  • 高級(jí)API代碼演練 - 逐步構(gòu)建維基百科應(yīng)用程序。
  • ZooKeeper部署 - 使用 ZooKeeper 協(xié)調(diào)運(yùn)行預(yù)先存在的維基百科應(yīng)用程序并觀察輸出。

建筑

介紹

Samza 高級(jí) API 提供統(tǒng)一的方式來處理流和批量數(shù)據(jù)。您可以在單個(gè)程序中使用地圖,過濾器,窗口和連接等操作員來描述端到端應(yīng)用程序邏輯,以完成以前所需的多個(gè)作業(yè)。API 旨在便攜式。相同的應(yīng)用程序代碼可以批處理或流式傳輸模式,嵌入式或集群管理器環(huán)境部署,并可以通過簡單的配置更改在Kafka,Kinesis,HDFS或其他系統(tǒng)之間切換。這種可移植性是由以下部分所述的新架構(gòu)啟用的。

概念

Samza 的架構(gòu)已經(jīng)被大修,具有不同的層次,可以處理應(yīng)用程序開發(fā)的每一個(gè)階段。下圖顯示了 Apache Samza 架構(gòu)與高級(jí) API 的概述。

layered-arch

建筑中有四層,以下部分描述每個(gè)圖層。

I.高級(jí)API

高級(jí) API 提供庫來定義應(yīng)用程序邏輯。該StreamApplication是您的應(yīng)用程序必須貫徹執(zhí)行中央的抽象。您首先將輸入聲明為MessageStream的實(shí)例。然后,您可以在每個(gè) MessageStream 上應(yīng)用運(yùn)算符,如地圖,過濾器,窗口和連接,以在單個(gè)程序中定義整個(gè)端到端數(shù)據(jù)處理。

要深入了解高級(jí) API,請(qǐng)參閱下面的高級(jí)API部分。

II.ApplicationRunner

Samza 使用ApplicationRunner來運(yùn)行流應(yīng)用程序。ApplicationRunner 生成配置(如輸入/輸出流),創(chuàng)建中間流,并開始執(zhí)行。ApplicationRunner 有兩種類型:

第一種:RemoteApplicationRunner - 將應(yīng)用程序提交到遠(yuǎn)程集群。該運(yùn)行程序通過 run-app.sh 腳本調(diào)用。

要使用 RemoteApplicationRunner,請(qǐng)?jiān)O(shè)置以下配置:

# The StreamApplication class to run
app.class=com.company.job.YourStreamApplication
job.factory.class=org.apache.samza.job.yarn.YarnJobFactory

然后使用 run-app.sh 在遠(yuǎn)程集群中運(yùn)行應(yīng)用程序。該腳本將調(diào)用 RemoteApplicationRunner,它將使用 job.factory.class 指定的工廠啟動(dòng)一個(gè)或多個(gè)作業(yè)。

第二種:LocalApplicationRunner - 在JVM進(jìn)程中運(yùn)行該應(yīng)用程序。例如,要使用 ZooKeeper 在多臺(tái)機(jī)器上啟動(dòng)應(yīng)用程序進(jìn)行協(xié)調(diào),可以在各種機(jī)器上運(yùn)行多個(gè) LocalApplicationRunner實(shí)例。應(yīng)用程序加載后,他們將通過 ZooKeeper 啟動(dòng)它們的操作。以下是使用 LocalApplicationRunner 在程序中運(yùn)行 StreamApplication 的示例:

public static void main(String[] args) throws Exception {
CommandLine cmdLine = new CommandLine();
Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
LocalApplicationRunner localRunner = new LocalApplicationRunner(config);
StreamApplication app = new YourStreamApplication();
localRunner.run(app);

// Wait for the application to finish
localRunner.waitForFinish();
System.out.println("Application completed with status " + localRunner.status(app));
}

按照ZooKeeper部署教程進(jìn)行嘗試。

執(zhí)行計(jì)劃

ApplicationRunner 在開始執(zhí)行處理邏輯之前生成一個(gè)物理執(zhí)行計(jì)劃。該計(jì)劃表示應(yīng)用程序的運(yùn)行時(shí)結(jié)構(gòu)。特別地,它提供對(duì)生成的中間流的可見性。一旦部署了工作,該計(jì)劃可以被看作如下:

  • 對(duì)于使用 run-app.sh 啟動(dòng)的應(yīng)用程序,Samza 將在您的應(yīng)用程序部署目錄下創(chuàng)建一個(gè)計(jì)劃目錄,并在其中編寫 plan.json 文件。
  • 對(duì)于使用自己的腳本(例如 LocalApplicationRunner)啟動(dòng)的應(yīng)用程序,請(qǐng)?jiān)谂c bin 相同的級(jí)別創(chuàng)建一個(gè)計(jì)劃目錄,并指出 EXECUTION_PLAN_DIR 環(huán)境變量的位置。

要查看計(jì)劃,請(qǐng)?jiān)跒g覽器中打開 bin / plan.html 文件。這是一個(gè)示例計(jì)劃可視化:

execution-plan

III.執(zhí)行模型

Samza 支持兩種類型的執(zhí)行模型:基于群集的執(zhí)行和嵌入式執(zhí)行。

在基于群集的執(zhí)行中,Samza 將在多租戶群集上運(yùn)行和管理您的應(yīng)用程序。薩姆支持 YARN。您可以實(shí)現(xiàn)自己的 StreamJob 和相應(yīng)的 ResourceManagerFactory 來添加對(duì)另一個(gè)集群管理器的支持。

在嵌入式執(zhí)行模型中,您可以在應(yīng)用程序中使用 Sa    mza 作為輕量級(jí)庫。您可以旋轉(zhuǎn)應(yīng)用程序的多個(gè)實(shí)例,它們之間將分配和協(xié)調(diào)處理。此模式提供了在任意托管環(huán)境中運(yùn)行應(yīng)用程序的靈活性:它還支持可插拔協(xié)調(diào)邏輯,支持兩種類型的協(xié)調(diào)開箱即用:

  • 基于 ZooKeeper 的協(xié)調(diào) - Samza 可以配置為使用 ZooKeeper 在應(yīng)用程序的實(shí)例之間管理組成員資格和分區(qū)分配。這允許您通過關(guān)閉更多實(shí)例或縮小某些值來動(dòng)態(tài)縮放應(yīng)用程序。
  • 外部協(xié)調(diào) - Samza 可以在單個(gè)JVM本地運(yùn)行您的應(yīng)用程序,而無需協(xié)調(diào),或者具有靜態(tài)分區(qū)分配的多個(gè)JVM。當(dāng)在諸如 Kubernetes 或 Amazon ECS 這樣的集裝箱化環(huán)境中運(yùn)行時(shí),這是非常有用的。

有關(guān)在嵌入式模式下運(yùn)行 Samza 的更多詳細(xì)信息,請(qǐng)參閱下面的靈活部署模型部分。

IV.處理器

Samza 應(yīng)用程序的最低執(zhí)行單元是處理器。它讀取從ApplicationRunner生成的配置并處理 JobCoordinator 分配的輸入流分區(qū)。它可以使用KeyValueStore實(shí)現(xiàn)(例如 RocksDB 或內(nèi)存)和使用多線程的遠(yuǎn)程狀態(tài)(例如 REST 服務(wù))訪問本地狀態(tài)。

高級(jí)API

自從0.13.0版本以來,Samza 提供了一個(gè)新的高級(jí) API,可以簡化應(yīng)用程序。此 API 支持重新分區(qū),加窗和加入流等操作。您現(xiàn)在可以在幾行代碼中簡潔地表達(dá)您的應(yīng)用程序邏輯,并完成以前需要的多個(gè)作業(yè)。

代碼示例

查看一些示例來查看高級(jí) API 的操作。

  1. PageView AdClick Joiner演示了一系列AdViews加入了一系列 PageViews,例如分析哪些網(wǎng)頁獲得最多的廣告點(diǎn)擊。
  2. Pageview Repartitioner說明了重新分割引入的 PageViews 的流。
  3. PageView Sessionizer根據(jù)用戶活動(dòng)將進(jìn)入的事件流分組到會(huì)話中。
  4. 按區(qū)域?yàn)g覽量計(jì)算每個(gè)區(qū)域在翻滾時(shí)間間隔內(nèi)的視圖數(shù)量。

關(guān)鍵概念

StreamApplication

在使用 Samza 高級(jí) API 編寫流處理應(yīng)用程序時(shí),應(yīng)實(shí)現(xiàn)StreamApplication并在 init 方法中定義處理邏輯。

public void init(StreamGraph graph, Config config) { … }

例如,這里是一個(gè) StreamApplication,它使用查看者的個(gè)人資料信息來驗(yàn)證和裝飾頁面瀏覽。

public class BadPageViewFilter implements StreamApplication {
  @Override
public void init(StreamGraph graph, Config config) {
    MessageStream<PageView> pageViews = graph.getInputStream(“page-views”..);

    pageViews.filter(this::isValidPageView)
                      .map(this::addProfileInformation)
                      .sendTo(graph.getOutputStream(“decorated-page-views”..))
 }
}

MessageStream

顧名思義,MessageStream 表示消息流。StreamApplication 被描述為 MessageStreams 上的一系列轉(zhuǎn)換。

您可以通過兩種方式獲取 MessageStream:

  1. 使用 StreamGraph.getInputStream 獲取給定輸入流的 MessageStream(例如,Kafka主題)。
  2. 通過使用地圖,過濾器,窗口,連接等操作轉(zhuǎn)換現(xiàn)有的 MessageStream。

典型Samza StreamApplication的解剖學(xué)

使用 Samza 高級(jí) API 編寫流處理應(yīng)用程序有3個(gè)簡單的步驟。

步驟1:獲取輸入流:

您可以使用 StreamGraph.getInputStream 獲取輸入流 ID(“page-views”)的 MessageStream。

MessageStream<PageView> pageViewInput = graph.getInputStream(“page-views”, (k,v) -> v);

第一個(gè)參數(shù) page-views 是邏輯流 ID。每個(gè)流 ID 與物理名稱和系統(tǒng)相關(guān)聯(lián)。默認(rèn)情況下,Samza 使用流 ID 作為物理流名稱,并訪問使用屬性 “job.default.system” 指定的默認(rèn)系統(tǒng)上的流。但是,物理名稱和系統(tǒng)屬性可以在配置中被覆蓋。例如,以下配置將流 ID“page-views” 定義為本地 Kafka 集群中 PageViewEvent 主題的別名。

streams.page-views.samza.system=kafka
systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
systems.kafka.consumer.zookeeper.connect=localhost:2181
systems.kafka.producer.bootstrap.servers=localhost:9092
streams.page-views.samza.physical.name=PageViewEvent

第二個(gè)參數(shù) (k,v) -> v 是 MessageBuilder 函數(shù),用于從傳入的鍵和值構(gòu)造消息。

步驟2:定義轉(zhuǎn)換邏輯:

您現(xiàn)在可以將 StreamApplication 邏輯定義為 MessageStream 上的一系列轉(zhuǎn)換。

MessageStream<DecoratedPageViews> decoratedPageViews
                                   = pageViewInput.filter(this::isValidPageView)
                                                  .map(this::addProfileInformation);

步驟3:將輸出寫入輸出流:

最后,您可以使用 StreamGraph.getOutputStream 創(chuàng)建一個(gè) OutputStream,并通過它發(fā)送已轉(zhuǎn)換的消息。

// Send messages with userId as the key to “decorated-page-views”.
decoratedPageViews.sendTo(
                          graph.getOutputStream(“decorated-page-views”,
                                                dpv -> dpv.getUserId(),
                                                dpv -> dpv));

第一個(gè)參數(shù) decorated-page-views 是邏輯流 ID。該流 ID 的屬性可以像輸入流的流 ID 一樣被覆蓋。例如:

streams.decorated-page-views.samza.system=kafka
streams.decorated-page-views.samza.physical.name=DecoratedPageViewEvent

第二和第三個(gè)參數(shù)定義提取器,將上游數(shù)據(jù)類型分別分成一個(gè)單獨(dú)的鍵和值。

運(yùn)營商

高級(jí) API 支持常用的運(yùn)算符,如地圖,平面圖,過濾器,合并,連接和窗口。大多數(shù)這些操作符接受相應(yīng)的函數(shù),這些函數(shù)是 Initable

地圖

將提供的1:1 MapFunction 應(yīng)用于 MessageStream 中的每個(gè)元素,并返回已轉(zhuǎn)換的 MessageStream。MapFunction 接收單個(gè)消息并返回單個(gè)消息(可能具有不同類型的消息)。

MessageStream<Integer> numbers = ...
MessageStream<Integer> tripled= numbers.map(m -> m * 3)
MessageStream<String> stringified = numbers.map(m -> String.valueOf(m))

Flatmap

將提供的 1:n FlatMapFunction應(yīng)用于 MessageStream 中的每個(gè)元素,并返回已轉(zhuǎn)換的 MessageStream。FlatMapFunction 接收單個(gè)消息并返回零個(gè)或多個(gè)消息。

MessageStream<String> sentence = ...
// Parse the sentence into its individual words splitting by space
MessageStream<String> words = sentence.flatMap(sentence ->
                                                          Arrays.asList(sentence.split(“ ”))

過濾

將提供的FilterFunction應(yīng)用于 MessageStream 并返回已過濾的 MessageStream。FilterFunction 是一個(gè)謂詞,用于指定是否應(yīng)將消息保留在過濾的流中。FilterFunction 返回 false 的消息將被過濾掉。

MessageStream<String> words = ...
// Extract only the long words
MessageStream<String> longWords = words.filter(word -> word.size() > 15);
// Extract only the short words
MessageStream<String> shortWords = words.filter(word -> word.size() < 3);

PartitionBy

使用提供的 keyExtractor 返回的鍵重新分區(qū)該 MessageStream,并返回已轉(zhuǎn)換的 MessageStream。在重新分區(qū)期間通過中間流發(fā)送消息。

// Repartition pageView by userId
MessageStream<PageView> pageViews = ...
MessageStream<PageView> partitionedPageViews =
                                        pageViews.partitionBy(pageView -> pageView.getUserId())

合并

將 MessageStream 與所有提供的 MessageStream 合并,并返回合并的流。

MessageStream<ServiceCall> serviceCall1 = ...
MessageStream<ServiceCall> serviceCall2 = ...
// Merge individual “ServiceCall” streams and create a new merged MessageStream
MessageStream<ServiceCall> serviceCallMerged = serviceCall1.merge(serviceCall2)

合并變換保留每個(gè) MessageStream 的順序,因此如果消息 m1 出現(xiàn) m2 在任何提供的流之前,則 m1 也會(huì)出現(xiàn) m2 在合并流之前。

作為 merge 實(shí)例方法的替代方法,您還可以使用MessageStream#mergeAll靜態(tài)方法來合并 MessageStream 而不在初始流上操作。

發(fā)給

將此消息流中的所有消息發(fā)送到提供的 OutputStream。您可以指定要用于傳出消息的密鑰和值。

// Output a new message with userId as the key and region as the value to the “user-region” stream.
MessageStream<PageView> pageViews = ...
OutputStream<String, String, PageView> userRegions
                           = graph.getOutputStream(“user-region”,
                                                   pageView -> pageView.getUserId(),
                                                   pageView -> pageView.getRegion())
pageView.sendTo(userRegions);

水槽

允許使用提供的SinkFunction從此 MessageStream 發(fā)送消息到輸出系統(tǒng)。

這提供了比 sendTo 更多的控制,因?yàn)?SinkFunction 可以訪問 MessageCollector 和 TaskCoordinator。例如,您可以選擇手動(dòng)提交偏移量,或使用 TaskCoordinator API 關(guān)閉作業(yè)。該運(yùn)營商也可用于向非薩姆薩系統(tǒng)發(fā)送消息(如遠(yuǎn)程數(shù)據(jù)庫,REST 服務(wù)等)

// Repartition pageView by userId.
MessageStream<PageView> pageViews = ...
pageViews.sink( (msg, collector, coordinator) -> {
// Construct a new outgoing message, and send it to a kafka topic named TransformedPageViewEvent.
 collector.send(new OutgoingMessageEnvelope(new SystemStream(“kafka”,
                         “TransformedPageViewEvent”), msg));
} )

加入

Join 運(yùn)算符使用提供的成對(duì)JoinFunction從兩個(gè) MessageStream 中加入消息。當(dāng)從第一流的消息提取的密鑰匹配從第二個(gè)流中的消息提取的密鑰時(shí),消息被連接。每個(gè)流中的消息將保留提供的 ttl 持續(xù)時(shí)間,并且連接結(jié)果將在匹配發(fā)現(xiàn)時(shí)發(fā)出。

// Joins a stream of OrderRecord with a stream of ShipmentRecord by orderId with a TTL of 20 minutes.
// Results are produced to a new stream of FulfilledOrderRecord.
MessageStream<OrderRecord> orders = …
MessageStream<ShipmentRecord> shipments = …

MessageStream<FulfilledOrderRecord> shippedOrders = orders.join(shipments, new OrderShipmentJoiner(), Duration.ofMinutes(20) )

// Constructs a new FulfilledOrderRecord by extracting the order timestamp from the OrderRecord and the shipment timestamp from the ShipmentRecord.
 class OrderShipmentJoiner implements JoinFunction<String, OrderRecord, ShipmentRecord, FulfilledOrderRecord> {
   @Override
   public FulfilledOrderRecord apply(OrderRecord message, ShipmentRecord otherMessage) {
     return new FulfilledOrderRecord(message.orderId, message.orderTimestamp, otherMessage.shipTimestamp);
   }

   @Override
   public String getFirstKey(OrderRecord message) {
     return message.orderId;
   }

   @Override
   public String getSecondKey(ShipmentRecord message) {
     return message.orderId;
   }
 }

窗口

窗口概念

Windows,觸發(fā)器和 WindowPanes:窗口操作符將 MessageStream 中的傳入消息分組為有限的窗口。每個(gè)發(fā)出的結(jié)果在窗口中包含一個(gè)或多個(gè)消息,稱為 WindowPane。

窗口可以具有一個(gè)或多個(gè)關(guān)聯(lián)的觸發(fā)器,以確定何時(shí)發(fā)出窗口的結(jié)果。觸發(fā)器可以是早期觸發(fā)器,允許在窗口的所有數(shù)據(jù)到達(dá)之前推測出結(jié)果,或者延遲觸發(fā)器允許處理窗口的延遲消息。

聚合器功能:默認(rèn)情況下,發(fā)出的 WindowPane 將包含窗口的所有消息。您通常不會(huì)保留所有消息,而是為 WindowPane 定義一個(gè)更緊湊的數(shù)據(jù)結(jié)構(gòu),并在新消息到達(dá)時(shí)逐漸更新,例如在窗口中保留消息計(jì)數(shù)。為此,您可以提供一個(gè)聚合的FoldLeftFunction,它為每個(gè)添加到窗口的傳入消息調(diào)用,并定義如何更新該消息的 WindowPane。

累積模式:窗口的累加模式確定從窗口發(fā)出的結(jié)果與同一窗口的先前發(fā)射結(jié)果相關(guān)。當(dāng)窗口配置有早期或晚期觸發(fā)器時(shí),這特別有用。累積模式可以是丟棄或累積。

一個(gè)丟棄窗口清除在每一個(gè)發(fā)射窗口的所有狀態(tài)。每次發(fā)射只會(huì)對(duì)應(yīng)于從前一次發(fā)射窗口到達(dá)的新消息。

一個(gè)累加窗口保留從以前的排放窗口的結(jié)果。每個(gè)排放將包含從窗口開始到達(dá)的所有郵件。

窗口類型:

Samza 高級(jí) API 目前支持翻滾和會(huì)話窗口。

翻滾窗口:一個(gè)翻滾窗口定義了一系列的流中連續(xù)的,固定大小的時(shí)間間隔。

例子:

// Group the pageView stream into 3 second tumbling windows keyed by the userId.
MessageStream<PageView> pageViews = ...
MessageStream<WindowPane<String, Collection<PageView>>> =
                     pageViews.window(
                         Windows.keyedTumblingWindow(pageView -> pageView.getUserId(),
                           Duration.ofSeconds(30)))


// Compute the maximum value over tumbling windows of 3 seconds.
MessageStream<Integer> integers = …
Supplier<Integer> initialValue = () -> Integer.MIN_VALUE
FoldLeftFunction<Integer, Integer> aggregateFunction = (msg, oldValue) -> Math.max(msg, oldValue)
MessageStream<WindowPane<Void, Integer>> windowedStream =
         integers.window(Windows.tumblingWindow(Duration.ofSeconds(30), initialValue, aggregateFunction))

會(huì)話窗口:會(huì)話窗口將 MessageStream 組合成會(huì)話。會(huì)話通過 MessageStream 捕獲一段活動(dòng),并由間隙定義。關(guān)閉會(huì)話,如果沒有新消息到達(dá)窗口以獲得間隙時(shí)間,則會(huì)發(fā)出結(jié)果。

例子:

// Sessionize a stream of page views, and count the number of page-views in a session for every user.
MessageStream<PageView> pageViews = …
Supplier<Integer> initialValue = () -> 0
FoldLeftFunction<PageView, Integer> countAggregator = (pageView, oldCount) -> oldCount + 1;
Duration sessionGap = Duration.ofMinutes(3);
MessageStream<WindowPane<String, Integer> sessionCounts = pageViews.window(Windows.keyedSessionWindow(
    pageView -> pageView.getUserId(), sessionGap, initialValue, countAggregator));

// Compute the maximum value over tumbling windows of 3 seconds.
MessageStream<Integer> integers = …
Supplier<Integer> initialValue = () -> Integer.MAX_INT

FoldLeftFunction<Integer, Integer> aggregateFunction = (msg, oldValue) -> Math.max(msg, oldValue)
MessageStream<WindowPane<Void, Integer>> windowedStream =
     integers.window(Windows.tumblingWindow(Duration.ofSeconds(3), initialValue, aggregateFunction))

已知的問題

目前,窗口和連接操作符緩沖內(nèi)存中的消息。因此,消息可能在故障和重新啟動(dòng)時(shí)丟失。

靈活部署模式

介紹

在 Samza 0.13.0 之前,Samza僅支持使用 YARN進(jìn)行 集群管理的部署。

使用 Samza 0.13.0,部署模式已被簡化并與 YARN 分離。如果您喜歡集群管理,您仍然可以使用 YARN,或者您可以實(shí)現(xiàn)自己的擴(kuò)展,以在其他集群管理系統(tǒng)上部署 Samza。但是如果你想避免集群管理系統(tǒng)呢?

Samza 現(xiàn)在可以將應(yīng)用程序部署為具有可插拔協(xié)調(diào)的簡單嵌入式庫。使用嵌入式模式,您可以直接在應(yīng)用程序中使用 Samza 處理器,并以任何您喜歡的方式進(jìn)行部署。Samza 有一個(gè)可插拔的工作協(xié)調(diào)器層,用于執(zhí)行領(lǐng)導(dǎo)選舉,并為處理器分配工作。

本節(jié)將重點(diǎn)介紹新的嵌入式部署功能。

概念

我們來仔細(xì)看看嵌入式部署的工作原理。

上面的概念部分提供了能夠靈活部署模型的層的概述。新的嵌入式模式進(jìn)入了部署層。部署層包括向可用處理器分配輸入分區(qū)。

有兩種類型的分區(qū)分配模型可以通過配置中的 job.coordinator.factory 進(jìn)行控制:

外部分區(qū)管理

通過外部分區(qū)管理,Samza 本身不管理分區(qū)。相反,它使用一個(gè) PassthroughJobCoordinator 供奉由SystemStreamPartitionGrouper提供的任何分區(qū)映射。外部分區(qū)管理有兩種常見的模式:

  • 使用高級(jí)卡夫卡消費(fèi)者 - 分區(qū)分配由高級(jí)卡夫卡消費(fèi)者本身完成。要使用此模型,您需要實(shí)現(xiàn)和配置一個(gè) SystemFactory,它提供了 yarn 高級(jí)消費(fèi)者。然后,您需要將job.systemstreampartition.grouper.factory 配置為 org.apache.samza.container.grouper.stream.AllSspToSingleTaskGrouper,以使 yarn 的分區(qū)分配全都轉(zhuǎn)到一個(gè)任務(wù)。
  • 自定義分區(qū) - 使用自定義分割器完成分區(qū)分配。石斑魚的邏輯完全取決于你。此模型的一個(gè)實(shí)際示例是實(shí)現(xiàn)從配置讀取靜態(tài)分區(qū)分配的自定義分組器。

Samza 配備了 PassthroughJobCoordinatorFactory 一種便于這種類型的分區(qū)管理。

動(dòng)態(tài)分區(qū)管理

使用動(dòng)態(tài)分區(qū),分區(qū)在運(yùn)行時(shí)分布在可用處理器之間。如果可用處理器的數(shù)量發(fā)生變化(例如,如果有些處理器被關(guān)閉或添加),映射將被重新生成并重新分配給所有處理器。有關(guān)當(dāng)前映射的信息包含在稱為 JobModel 的特殊結(jié)構(gòu)中。有一個(gè)領(lǐng)先的處理器生成 JobModel 并將其分發(fā)到其他處理器。領(lǐng)導(dǎo)人由“領(lǐng)導(dǎo)人選舉”進(jìn)程決定。

我們來仔細(xì)看看動(dòng)態(tài)協(xié)調(diào)的工作原理。

協(xié)調(diào)服務(wù)

處理器的動(dòng)態(tài)協(xié)調(diào)假定存在協(xié)調(diào)服務(wù)。該服務(wù)的主要職責(zé)是:

  • 領(lǐng)導(dǎo)選舉 - 選擇一個(gè)單一的處理器,負(fù)責(zé) JobModel 的計(jì)算和分發(fā)或創(chuàng)建中間流。
  • 中央屏障和鎖存器 - 處理器使用的協(xié)調(diào)原語。
  • JobModel 通知 - 通知處理器有關(guān)新 JobModel 的可用性。
  • JobModel 存儲(chǔ)協(xié)調(diào)服務(wù)指示 JobModel 持久化的位置。

協(xié)調(diào)服務(wù)目前來自工作協(xié)調(diào)員工廠。Samza ZkJobCoordinatorFactory 有一個(gè)相應(yīng)的實(shí)現(xiàn) ZkCoordinationServiceFactory。

我們來看看基于 ZooKeeper 的嵌入式應(yīng)用程序的協(xié)調(diào)順序:

  • 每個(gè)處理器(參與者)將向可插拔協(xié)調(diào)服務(wù)注冊。在注冊期間,它將提供自己的參與者 ID。
  • 其中一名參賽者將當(dāng)選為領(lǐng)導(dǎo)人。
  • 領(lǐng)導(dǎo)者監(jiān)督所有活躍參與者的名單。
  • 每當(dāng)參與者名單發(fā)生變化時(shí),領(lǐng)導(dǎo)者將為當(dāng)前參與者生成一個(gè)新的 JobModel。
  • 新的 JobModel 將被推送到一個(gè)常見的存儲(chǔ)。默認(rèn)實(shí)現(xiàn)為此目的使用 ZooKeeper。
  • 通知參與者新的 JobModel 可用。通過協(xié)調(diào)服務(wù)完成通知,例如 ZooKeeper。
  • 參與者將停止處理,應(yīng)用新的 JobModel,然后恢復(fù)處理。

下圖顯示了 ZooKeeper 協(xié)調(diào)服務(wù)實(shí)現(xiàn)中協(xié)調(diào)器的關(guān)系。


以下是協(xié)調(diào)服務(wù)的幾個(gè)重要細(xì)節(jié):

  • 為了確保兩個(gè)分區(qū)不被不同處理器處理兩次,處理暫停,處理器在屏障上同步旦所有處理器都被暫停,就會(huì)應(yīng)用新的 JobModel 并繼續(xù)處理。使用協(xié)調(diào)服務(wù)實(shí)現(xiàn)屏障。
  • 在啟動(dòng)和關(guān)機(jī)期間,處理器將陸續(xù)加入/離開。為了避免冗余的 JobModel 重新計(jì)算,有一個(gè)去抖動(dòng)定時(shí)器等待一些短時(shí)間(默認(rèn)為2秒,在將來的版本中可配置),以便更多的處理器加入或離開。每次處理器加入或離開時(shí),定時(shí)器被復(fù)位。當(dāng)定時(shí)器到期時(shí),最終重新計(jì)算 JobModel。
  • 如果處理器需要相鄰或臨時(shí)數(shù)據(jù)的本地存儲(chǔ),我們希望在重新啟動(dòng)時(shí)保持映射。為此,我們使用一些關(guān)于每個(gè)處理器的額外信息,它們唯一標(biāo)識(shí)它及其位置。如果相同的處理器在相同的位置重新啟動(dòng),我們將嘗試為其分配相同的分區(qū)。這個(gè)地方信息應(yīng)該能夠在重新啟動(dòng)之后繼續(xù)存在,所以它被存儲(chǔ)在一個(gè)公共的存儲(chǔ)器上(目前使用的是 ZooKeeper)。

用戶指南

嵌入式部署旨在幫助希望更好地控制其應(yīng)用程序部署的用戶。因此,用戶有責(zé)任配置和部署處理器。在 ZooKeeper 協(xié)調(diào)的情況下,您還需要配置 ZooKeeper 實(shí)例的 URL。

此外,每個(gè)處理器需要與協(xié)調(diào)服務(wù)一起使用的唯一 ID。如果位置關(guān)聯(lián)性很重要,則該 ID 對(duì)于特定主機(jī)名上的每個(gè)處理器(假設(shè)本地存儲(chǔ)服務(wù))應(yīng)該是唯一的。為了滿足這一要求,Samza 使用ProcessorIdGenerator為每個(gè)處理器提供 ID。如果沒有顯式配置生成器,默認(rèn)情況下將為每個(gè)處理器創(chuàng)建一個(gè) UUID。

組態(tài)

要運(yùn)行嵌入式 Samza 處理器,您需要使用 job.coordinator.factory 屬性配置協(xié)調(diào)器服務(wù)。此外,目前有一個(gè)支持嵌入式模式的 taskname 分組,因此您必須明確配置。

我們來看看如何配置 Samza 附帶的兩個(gè)協(xié)調(diào)服務(wù)實(shí)現(xiàn)。

要使用基于 ZooKeeper 的協(xié)調(diào),需要以下配置:

job.coordinator.factory=org.apache.samza.zk.ZkJobCoordinatorFactory
job.coordinator.zk.connect=yourzkconnection
task.name.grouper.factory=org.apache.samza.container.grouper.task.GroupByContainerIdsFactory

要使用外部協(xié)調(diào),需要以下配置:

job.coordinator.factory=org.apache.samza.standalone.PassthroughJobCoordinatorFactory
task.name.grouper.factory=org.apache.samza.container.grouper.task.GroupByContainerIdsFactory

API

如上面的概述部分所述,您可以使用 LocalApplicationRunner 從應(yīng)用程序代碼啟動(dòng)處理器,如下所示:

public class WikipediaZkLocalApplication {

 public static void main(String[] args) {
   CommandLine cmdLine = new CommandLine();
   OptionSet options = cmdLine.parser().parse(args);
   Config config = cmdLine.loadConfig(options);

   LocalApplicationRunner runner = new LocalApplicationRunner(config);
   WikipediaApplication app = new WikipediaApplication();

   runner.run(app);
   runner.waitForFinish();
 }
}

在上面的代碼中,WikipediaApplication 是用高級(jí)API編寫的應(yīng)用程序。

請(qǐng)查看本教程,以便現(xiàn)在在機(jī)器上使用 ZooKeeper 協(xié)調(diào)運(yùn)行此應(yīng)用程序。

部署和縮放

您可以以任何您喜歡的方式部署應(yīng)用程序?qū)嵗?。如果使用協(xié)調(diào)服務(wù),您可以隨時(shí)添加或刪除實(shí)例,并且領(lǐng)導(dǎo)者的工作協(xié)調(diào)員(通過協(xié)調(diào)服務(wù)選舉)將在去抖動(dòng)時(shí)間后自動(dòng)重新計(jì)算 JobModel,并將其應(yīng)用于可用的處理器。所以,為了擴(kuò)展你的應(yīng)用程序,你只需要啟動(dòng)更多的處理器。

已知的問題

請(qǐng)注意0??.13.0版本的嵌入式部署功能的以下問題。他們將在隨后的版本中修復(fù)。

  • 不支持 GroupByContainerCount 默認(rèn)任務(wù)名稱分組。
  • 主機(jī)關(guān)聯(lián)性未啟用。
  • 尚未提供 ZkJobCoordinator 指標(biāo)。指標(biāo)即將加入重新計(jì)算 JobModel 的數(shù)量讀/寫數(shù)領(lǐng)導(dǎo)人重選更多..
  • LocalApplicationRunner 還不支持低級(jí)API。這意味著您不能使用 StreamTask 與 LocalApplicationRunner。
  • 目前,對(duì)于使用此zk集群的所有應(yīng)用程序,“app.id” 配置必須是唯一的。


以上內(nèi)容是否對(duì)您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號(hào)
微信公眾號(hào)

編程獅公眾號(hào)