Trident是Storm的延伸。像Storm,Trident也是由Twitter開發(fā)的。開發(fā)Trident的主要原因是在Storm上提供高級抽象,以及狀態(tài)流處理和低延遲分布式查詢。
Trident使用spout和bolt,但是這些低級組件在執(zhí)行之前由Trident自動生成。 Trident具有函數(shù),過濾器,聯(lián)接,分組和聚合。
Trident將流處理為一系列批次,稱為事務。通常,這些小批量的大小將是大約數(shù)千或數(shù)百萬個元組,這取決于輸入流。這樣,Trident不同于Storm,它執(zhí)行元組一元組處理。
批處理概念非常類似于數(shù)據(jù)庫事務。每個事務都分配了一個事務ID。該事務被認為是成功的,一旦其所有的處理完成。然而,處理事務的元組中的一個的失敗將導致整個事務被重傳。對于每個批次,Trident將在事務開始時調用beginCommit,并在結束時提交。
Trident API公開了一個簡單的選項,使用“TridentTopology”類創(chuàng)建Trident拓撲。基本上,Trident拓撲從流出接收輸入流,并對流上執(zhí)行有序的操作序列(濾波,聚合,分組等)。Storm元組被替換為Trident元組,bolt被操作替換。一個簡單的Trident拓撲可以創(chuàng)建如下 -
TridentTopology topology = new TridentTopology();
Trident Tuples是一個命名的值列表。TridentTuple接口是Trident拓撲的數(shù)據(jù)模型。TridentTuple接口是可由Trident拓撲處理的數(shù)據(jù)的基本單位。
Trident spout與類似于Storm spout,附加選項使用Trident的功能。實際上,我們仍然可以使用IRichSpout,我們在Storm拓撲中使用它,但它本質上是非事務性的,我們將無法使用Trident提供的優(yōu)點。
具有使用Trident的特征的所有功能的基本spout是“ITridentSpout”。它支持事務和不透明的事務語義。其他的spouts是IBatchSpout,IPartitionedTridentSpout和IOpaquePartitionedTridentSpout。
除了這些通用spouts,Trident有許多樣品實施trident spout。其中之一是FeederBatchSpout輸出,我們可以使用它發(fā)送trident tuples的命名列表,而不必擔心批處理,并行性等。
FeederBatchSpout創(chuàng)建和數(shù)據(jù)饋送可以如下所示完成 -
TridentTopology topology = new TridentTopology(); FeederBatchSpout testSpout = new FeederBatchSpout( ImmutableList.of("fromMobileNumber", "toMobileNumber", “duration”)); topology.newStream("fixed-batch-spout", testSpout) testSpout.feed(ImmutableList.of(new Values("1234123401", "1234123402", 20)));
Trident依靠“Trident操作”來處理trident tuples的輸入流。Trident API具有多個內置操作來處理簡單到復雜的流處理。這些操作的范圍從簡單驗證到復雜的trident tuples分組和聚合。讓我們經(jīng)歷最重要和經(jīng)常使用的操作。
過濾器是用于執(zhí)行輸入驗證任務的對象。Trident過濾器獲取trident tuples字段的子集作為輸入,并根據(jù)是否滿足某些條件返回真或假。如果返回true,則該元組保存在輸出流中;否則,從流中移除元組。過濾器將基本上繼承自BaseFilter類并實現(xiàn)isKeep方法。這里是一個濾波器操作的示例實現(xiàn) -
public class MyFilter extends BaseFilter { public boolean isKeep(TridentTuple tuple) { return tuple.getInteger(1) % 2 == 0; } } input [1, 2] [1, 3] [1, 4] output [1, 2] [1, 4]
可以使用“each”方法在拓撲中調用過濾器功能。“Fields”類可以用于指定輸入(trident tuple的子集)。示例代碼如下 -
TridentTopology topology = new TridentTopology(); topology.newStream("spout", spout) .each(new Fields("a", "b"), new MyFilter())
函數(shù)是用于對單個trident tuple執(zhí)行簡單操作的對象。它需要一個trident tuple字段的子集,并發(fā)出零個或多個新的trident tuple字段。
函數(shù)基本上從BaseFunction類繼承并實現(xiàn)execute方法。下面給出了一個示例實現(xiàn):
public class MyFunction extends BaseFunction { public void execute(TridentTuple tuple, TridentCollector collector) { int a = tuple.getInteger(0); int b = tuple.getInteger(1); collector.emit(new Values(a + b)); } } input [1, 2] [1, 3] [1, 4] output [1, 2, 3] [1, 3, 4] [1, 4, 5]
與過濾操作類似,可以使用每個方法在拓撲中調用函數(shù)操作。示例代碼如下 -
TridentTopology topology = new TridentTopology(); topology.newStream("spout", spout) .each(new Fields(“a, b"), new MyFunction(), new Fields(“d")));
聚合是用于對輸入批處理或分區(qū)或流執(zhí)行聚合操作的對象。Trident有三種類型的聚合。他們如下 -
aggregate -單獨聚合每批trident tuple。在聚合過程期間,首先使用全局分組將元組重新分區(qū),以將同一批次的所有分區(qū)組合到單個分區(qū)中。
partitionAggregate -聚合每個分區(qū),而不是整個trident tuple。分區(qū)集合的輸出完全替換輸入元組。分區(qū)集合的輸出包含單個字段元組。
persistentaggregate -聚合所有批次中的所有trident tuple,并將結果存儲在內存或數(shù)據(jù)庫中。
TridentTopology topology = new TridentTopology(); // aggregate operation topology.newStream("spout", spout) .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”)) .aggregate(new Count(), new Fields(“count”)) // partitionAggregate operation topology.newStream("spout", spout) .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”)) .partitionAggregate(new Count(), new Fields(“count")) // persistentAggregate - saving the count to memory topology.newStream("spout", spout) .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”)) .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));
可以使用CombinerAggregator,ReducerAggregator或通用Aggregator接口創(chuàng)建聚合操作。上面例子中使用的“計數(shù)”聚合器是內置聚合器之一,它使用“CombinerAggregator”實現(xiàn),實現(xiàn)如下 -
public class Count implements CombinerAggregator<Long> { @Override public Long init(TridentTuple tuple) { return 1L; } @Override public Long combine(Long val1, Long val2) { return val1 + val2; } @Override public Long zero() { return 0L; } }
分組操作是一個內置操作,可以由groupBy方法調用。groupBy方法通過在指定字段上執(zhí)行partitionBy來重新分區(qū)流,然后在每個分區(qū)中,它將組字段相等的元組組合在一起。通常,我們使用“groupBy”以及“persistentAggregate”來獲得分組聚合。示例代碼如下 -
TridentTopology topology = new TridentTopology(); // persistentAggregate - saving the count to memory topology.newStream("spout", spout) .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”)) .groupBy(new Fields(“d”) .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));
合并和連接可以分別通過使用“合并”和“連接”方法來完成。合并組合一個或多個流。加入類似于合并,除了加入使用來自兩邊的trident tuple字段來檢查和連接兩個流的事實。此外,加入將只在批量級別工作。示例代碼如下 -
TridentTopology topology = new TridentTopology(); topology.merge(stream1, stream2, stream3); topology.join(stream1, new Fields("key"), stream2, new Fields("x"), new Fields("key", "a", "b", "c"));
Trident提供了狀態(tài)維護的機制。狀態(tài)信息可以存儲在拓撲本身中,否則也可以將其存儲在單獨的數(shù)據(jù)庫中。原因是維護一個狀態(tài),如果任何元組在處理過程中失敗,則重試失敗的元組。這會在更新狀態(tài)時產(chǎn)生問題,因為您不確定此元組的狀態(tài)是否已在之前更新過。如果在更新狀態(tài)之前元組已經(jīng)失敗,則重試該元組將使狀態(tài)穩(wěn)定。然而,如果元組在更新狀態(tài)后失敗,則重試相同的元組將再次增加數(shù)據(jù)庫中的計數(shù)并使狀態(tài)不穩(wěn)定。需要執(zhí)行以下步驟以確保消息僅處理一次 -
小批量處理元組。
為每個批次分配唯一的ID。如果重試批次,則給予相同的唯一ID。
狀態(tài)更新在批次之間排序。例如,第二批次的狀態(tài)更新將不可能,直到第一批次的狀態(tài)更新完成為止。
分布式RPC用于查詢和檢索Trident拓撲結果。 Storm有一個內置的分布式RPC服務器。分布式RPC服務器從客戶端接收RPC請求并將其傳遞到拓撲。拓撲處理請求并將結果發(fā)送到分布式RPC服務器,分布式RPC服務器將其重定向到客戶端。Trident的分布式RPC查詢像正常的RPC查詢一樣執(zhí)行,除了這些查詢并行運行的事實。
在許多使用情況下,如果要求是只處理一次查詢,我們可以通過在Trident中編寫拓撲來實現(xiàn)。另一方面,在Storm的情況下將難以實現(xiàn)精確的一次處理。因此,Trident將對那些需要一次處理的用例有用。Trident不適用于所有用例,特別是高性能用例,因為它增加了Storm的復雜性并管理狀態(tài)。
我們將把上一節(jié)中制定的呼叫日志分析器應用程序轉換為Trident框架。由于其高級API,Trident應用程序將比普通風暴更容易。Storm基本上需要執(zhí)行Trident中的Function,F(xiàn)ilter,Aggregate,GroupBy,Join和Merge操作中的任何一個。最后,我們將使用LocalDRPC類啟動DRPC服務器,并使用LocalDRPC類的execute方法搜索一些關鍵字。
FormatCall類的目的是格式化包括“呼叫者號碼”和“接收者號碼”的呼叫信息。完整的程序代碼如下 -
import backtype.storm.tuple.Values; import storm.trident.operation.BaseFunction; import storm.trident.operation.TridentCollector; import storm.trident.tuple.TridentTuple; public class FormatCall extends BaseFunction { @Override public void execute(TridentTuple tuple, TridentCollector collector) { String fromMobileNumber = tuple.getString(0); String toMobileNumber = tuple.getString(1); collector.emit(new Values(fromMobileNumber + " - " + toMobileNumber)); } }
CSVSplit類的目的是基于“comma(,)”拆分輸入字符串,并發(fā)出字符串中的每個字。此函數(shù)用于解析分布式查詢的輸入?yún)?shù)。完整的代碼如下 -
import backtype.storm.tuple.Values; import storm.trident.operation.BaseFunction; import storm.trident.operation.TridentCollector; import storm.trident.tuple.TridentTuple; public class CSVSplit extends BaseFunction { @Override public void execute(TridentTuple tuple, TridentCollector collector) { for(String word: tuple.getString(0).split(",")) { if(word.length() > 0) { collector.emit(new Values(word)); } } } }
這是主要的應用程序。最初,應用程序將使用FeederBatchSpout初始化TridentTopology并提供調用者信息。Trident拓撲流可以使用TridentTopology類的newStream方法創(chuàng)建。類似地,Trident拓撲DRPC流可以使用TridentTopology類的newDRCPStream方法創(chuàng)建??梢允褂肔ocalDRPC類創(chuàng)建一個簡單的DRCP服務器。LocalDRPC有execute方法來搜索一些關鍵字。完整的代碼如下。
import java.util.*; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.LocalDRPC; import backtype.storm.utils.DRPCClient; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; import storm.trident.TridentState; import storm.trident.TridentTopology; import storm.trident.tuple.TridentTuple; import storm.trident.operation.builtin.FilterNull; import storm.trident.operation.builtin.Count; import storm.trident.operation.builtin.Sum; import storm.trident.operation.builtin.MapGet; import storm.trident.operation.builtin.Debug; import storm.trident.operation.BaseFilter; import storm.trident.testing.FixedBatchSpout; import storm.trident.testing.FeederBatchSpout; import storm.trident.testing.Split; import storm.trident.testing.MemoryMapState; import com.google.common.collect.ImmutableList; public class LogAnalyserTrident { public static void main(String[] args) throws Exception { System.out.println("Log Analyser Trident"); TridentTopology topology = new TridentTopology(); FeederBatchSpout testSpout = new FeederBatchSpout(ImmutableList.of("fromMobileNumber", "toMobileNumber", "duration")); TridentState callCounts = topology .newStream("fixed-batch-spout", testSpout) .each(new Fields("fromMobileNumber", "toMobileNumber"), new FormatCall(), new Fields("call")) .groupBy(new Fields("call")) .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count")); LocalDRPC drpc = new LocalDRPC(); topology.newDRPCStream("call_count", drpc) .stateQuery(callCounts, new Fields("args"), new MapGet(), new Fields("count")); topology.newDRPCStream("multiple_call_count", drpc) .each(new Fields("args"), new CSVSplit(), new Fields("call")) .groupBy(new Fields("call")) .stateQuery(callCounts, new Fields("call"), new MapGet(), new Fields("count")) .each(new Fields("call", "count"), new Debug()) .each(new Fields("count"), new FilterNull()) .aggregate(new Fields("count"), new Sum(), new Fields("sum")); Config conf = new Config(); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("trident", conf, topology.build()); Random randomGenerator = new Random(); int idx = 0; while(idx < 10) { testSpout.feed(ImmutableList.of(new Values("1234123401", "1234123402", randomGenerator.nextInt(60)))); testSpout.feed(ImmutableList.of(new Values("1234123401", "1234123403", randomGenerator.nextInt(60)))); testSpout.feed(ImmutableList.of(new Values("1234123401", "1234123404", randomGenerator.nextInt(60)))); testSpout.feed(ImmutableList.of(new Values("1234123402", "1234123403", randomGenerator.nextInt(60)))); idx = idx + 1; } System.out.println("DRPC : Query starts"); System.out.println(drpc.execute("call_count","1234123401 - 1234123402")); System.out.println(drpc.execute("multiple_call_count", "1234123401 - 1234123402,1234123401 - 1234123403")); System.out.println("DRPC : Query ends"); cluster.shutdown(); drpc.shutdown(); // DRPCClient client = new DRPCClient("drpc.server.location", 3772); } }
完整的應用程序有三個Java代碼。他們如下 -
可以使用以下命令構建應用程序 -
javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*” *.java
可以使用以下命令運行應用程序 -
java -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:. LogAnalyserTrident
一旦應用程序啟動,應用程序將輸出有關集群啟動過程,操作處理,DRPC服務器和客戶端信息的完整詳細信息,以及最后的集群關閉過程。此輸出將顯示在控制臺上,如下所示。
DRPC : Query starts [["1234123401 - 1234123402",10]] DEBUG: [1234123401 - 1234123402, 10] DEBUG: [1234123401 - 1234123403, 10] [[20]] DRPC : Query ends
更多建議: