我們已經(jīng)經(jīng)歷了Apache Storm的核心技術(shù)細(xì)節(jié),現(xiàn)在是時(shí)候編寫一些簡(jiǎn)單的場(chǎng)景。
移動(dòng)呼叫及其持續(xù)時(shí)間將作為對(duì)Apache Storm的輸入,Storm將處理和分組在相同呼叫者和接收者之間的呼叫及其呼叫總數(shù)。
Spout是用于數(shù)據(jù)生成的組件。基本上,一個(gè)spout將實(shí)現(xiàn)一個(gè)IRichSpout接口。 “IRichSpout”接口有以下重要方法 -
open -為Spout提供執(zhí)行環(huán)境。執(zhí)行器將運(yùn)行此方法來初始化噴頭。
nextTuple -通過收集器發(fā)出生成的數(shù)據(jù)。
close -當(dāng)spout將要關(guān)閉時(shí)調(diào)用此方法。
declareOutputFields -聲明元組的輸出模式。
ack -確認(rèn)處理了特定元組。
fail -指定不處理和不重新處理特定元組。
open方法的簽名如下 -
open(Map conf, TopologyContext context, SpoutOutputCollector collector)
conf - 為此spout提供storm配置。
context - 提供有關(guān)拓?fù)渲械?span>spout位置,其任務(wù)ID,輸入和輸出信息的完整信息。
collector - 使我們能夠發(fā)出將由bolts處理的元組。
nextTuple方法的簽名如下 -
nextTuple()
nextTuple()從與ack()和fail()方法相同的循環(huán)中定期調(diào)用。它必須釋放線程的控制,當(dāng)沒有工作要做,以便其他方法有機(jī)會(huì)被調(diào)用。因此,nextTuple的第一行檢查處理是否已完成。如果是這樣,它應(yīng)該休眠至少一毫秒,以減少處理器在返回之前的負(fù)載。
close方法的簽名如下-
close()
declareOutputFields方法的簽名如下-
declareOutputFields(OutputFieldsDeclarer declarer)
declarer -它用于聲明輸出流id,輸出字段等
此方法用于指定元組的輸出模式。
ack方法的簽名如下 -
ack(Object msgId)
該方法確認(rèn)已經(jīng)處理了特定元組。
nextTuple方法的簽名如下-
ack(Object msgId)
此方法通知特定元組尚未完全處理。 Storm將重新處理特定的元組。
在我們的場(chǎng)景中,我們需要收集呼叫日志詳細(xì)信息。呼叫日志的信息包含。
由于我們沒有呼叫日志的實(shí)時(shí)信息,我們將生成假呼叫日志。假信息將使用Random類創(chuàng)建。完整的程序代碼如下。
import java.util.*; //import storm tuple packages import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; //import Spout interface packages import backtype.storm.topology.IRichSpout; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; //Create a class FakeLogReaderSpout which implement IRichSpout interface to access functionalities public class FakeCallLogReaderSpout implements IRichSpout { //Create instance for SpoutOutputCollector which passes tuples to bolt. private SpoutOutputCollector collector; private boolean completed = false; //Create instance for TopologyContext which contains topology data. private TopologyContext context; //Create instance for Random class. private Random randomGenerator = new Random(); private Integer idx = 0; @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.context = context; this.collector = collector; } @Override public void nextTuple() { if(this.idx <= 1000) { List<String> mobileNumbers = new ArrayList<String>(); mobileNumbers.add("1234123401"); mobileNumbers.add("1234123402"); mobileNumbers.add("1234123403"); mobileNumbers.add("1234123404"); Integer localIdx = 0; while(localIdx++ < 100 && this.idx++ < 1000) { String fromMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4)); String toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4)); while(fromMobileNumber == toMobileNumber) { toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4)); } Integer duration = randomGenerator.nextInt(60); this.collector.emit(new Values(fromMobileNumber, toMobileNumber, duration)); } } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("from", "to", "duration")); } //Override all the interface methods @Override public void close() {} public boolean isDistributed() { return false; } @Override public void activate() {} @Override public void deactivate() {} @Override public void ack(Object msgId) {} @Override public void fail(Object msgId) {} @Override public Map<String, Object> getComponentConfiguration() { return null; } }
Bolt是一個(gè)使用元組作為輸入,處理元組,并產(chǎn)生新的元組作為輸出的組件。Bolts將實(shí)現(xiàn)IRichBolt接口。在此程序中,使用兩個(gè)Bolts
類CallLogCreatorBolt和CallLogCounterBolt來執(zhí)行操作。
IRichBolt接口有以下方法 -
prepare -為bolt提供要執(zhí)行的環(huán)境。執(zhí)行器將運(yùn)行此方法來初始化spout。
execute -處理單個(gè)元組的輸入
cleanup -當(dāng)spout要關(guān)閉時(shí)調(diào)用。
declareOutputFields -聲明元組的輸出模式。
prepare(Map conf, TopologyContext context, OutputCollector collector)
conf -為此bolt提供Storm配置。
context -提供有關(guān)拓?fù)渲械?span>bolt位置,其任務(wù)ID,輸入和輸出信息等的完整信息。
collector -使我們能夠發(fā)出處理的元組。
execute方法的簽名如下-
execute(Tuple tuple)
這里的元組是要處理的輸入元組。
execute方法一次處理單個(gè)元組。元組數(shù)據(jù)可以通過Tuple類的getValue方法訪問。不必立即處理輸入元組。多元組可以被處理和輸出為單個(gè)輸出元組。處理的元組可以通過使用OutputCollector類發(fā)出。
cleanup方法的簽名如下 -
cleanup()
declareOutputFields方法的簽名如下-
declareOutputFields(OutputFieldsDeclarer declarer)
這里的參數(shù)declarer用于聲明輸出流id,輸出字段等。
此方法用于指定元組的輸出模式。
呼叫日志創(chuàng)建者bolt接收呼叫日志元組。呼叫日志元組具有主叫方號(hào)碼,接收方號(hào)碼和呼叫持續(xù)時(shí)間。此bolt通過組合主叫方號(hào)碼和接收方號(hào)碼簡(jiǎn)單地創(chuàng)建一個(gè)新值。新值的格式為“來電號(hào)碼 - 接收方號(hào)碼”,并將其命名為新字段“呼叫”。完整的代碼如下。
//import util packages import java.util.HashMap; import java.util.Map; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; //import Storm IRichBolt package import backtype.storm.topology.IRichBolt; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Tuple; //Create a class CallLogCreatorBolt which implement IRichBolt interface public class CallLogCreatorBolt implements IRichBolt { //Create instance for OutputCollector which collects and emits tuples to produce output private OutputCollector collector; @Override public void prepare(Map conf, TopologyContext context, OutputCollector collector) { this.collector = collector; } @Override public void execute(Tuple tuple) { String from = tuple.getString(0); String to = tuple.getString(1); Integer duration = tuple.getInteger(2); collector.emit(new Values(from + " - " + to, duration)); } @Override public void cleanup() {} @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("call", "duration")); } @Override public Map<String, Object> getComponentConfiguration() { return null; } }
呼叫日志創(chuàng)建者bolt接收呼叫日志元組。呼叫日志元組具有主叫方號(hào)碼,接收方號(hào)碼和呼叫持續(xù)時(shí)間。此bolt通過組合主叫方號(hào)碼和接收方號(hào)碼簡(jiǎn)單地創(chuàng)建一個(gè)新值。新值的格式為“來電號(hào)碼 - 接收方號(hào)碼”,并將其命名為新字段“呼叫”。完整的代碼如下。
import java.util.HashMap; import java.util.Map; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.IRichBolt; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Tuple; public class CallLogCounterBolt implements IRichBolt { Map<String, Integer> counterMap; private OutputCollector collector; @Override public void prepare(Map conf, TopologyContext context, OutputCollector collector) { this.counterMap = new HashMap<String, Integer>(); this.collector = collector; } @Override public void execute(Tuple tuple) { String call = tuple.getString(0); Integer duration = tuple.getInteger(1); if(!counterMap.containsKey(call)){ counterMap.put(call, 1); }else{ Integer c = counterMap.get(call) + 1; counterMap.put(call, c); } collector.ack(tuple); } @Override public void cleanup() { for(Map.Entry<String, Integer> entry:counterMap.entrySet()){ System.out.println(entry.getKey()+" : " + entry.getValue()); } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("call")); } @Override public Map<String, Object> getComponentConfiguration() { return null; } }
Storm拓?fù)浠旧鲜且粋€(gè)Thrift結(jié)構(gòu)。 TopologyBuilder類提供了簡(jiǎn)單而容易的方法來創(chuàng)建復(fù)雜的拓?fù)洹?span>TopologyBuilder類具有設(shè)置spout(setSpout)和設(shè)置bolt(setBolt)的方法。最后,TopologyBuilder有createTopology來創(chuàng)建拓?fù)洹J褂靡韵麓a片段創(chuàng)建拓?fù)?-
TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout()); builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt()) .shuffleGrouping("call-log-reader-spout"); builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt()) .fieldsGrouping("call-log-creator-bolt", new Fields("call"));
shuffleGrouping和fieldsGrouping方法有助于為spout和bolts設(shè)置流分組。
為了開發(fā)目的,我們可以使用“LocalCluster”對(duì)象創(chuàng)建本地集群,然后使用“LocalCluster”類的“submitTopology”方法提交拓?fù)洹?“submitTopology”的參數(shù)之一是“Config”類的實(shí)例。“Config”類用于在提交拓?fù)渲霸O(shè)置配置選項(xiàng)。此配置選項(xiàng)將在運(yùn)行時(shí)與集群配置合并,并使用prepare方法發(fā)送到所有任務(wù)(spout和bolt)。一旦拓?fù)涮峤坏郊?,我們將等?0秒鐘,集群計(jì)算提交的拓?fù)?,然后使用“LocalCluster”的“shutdown”方法關(guān)閉集群。完整的程序代碼如下 -
import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; //import storm configuration packages import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.topology.TopologyBuilder; //Create main class LogAnalyserStorm submit topology. public class LogAnalyserStorm { public static void main(String[] args) throws Exception{ //Create Config instance for cluster configuration Config config = new Config(); config.setDebug(true); // TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout()); builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt()) .shuffleGrouping("call-log-reader-spout"); builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt()) .fieldsGrouping("call-log-creator-bolt", new Fields("call")); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("LogAnalyserStorm", config, builder.createTopology()); Thread.sleep(10000); //Stop the topology cluster.shutdown(); } }
完整的應(yīng)用程序有四個(gè)Java代碼。它們是 -
應(yīng)用程序可以使用以下命令構(gòu)建 -
javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*” *.java
應(yīng)用程序可以使用以下命令運(yùn)行 -
java -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:. LogAnalyserStorm
一旦應(yīng)用程序啟動(dòng),它將輸出有關(guān)集群?jiǎn)?dòng)過程,spout和螺栓處理的完整詳細(xì)信息,最后是集群關(guān)閉過程。在“CallLogCounterBolt”中,我們打印了呼叫及其計(jì)數(shù)詳細(xì)信息。此信息將顯示在控制臺(tái)上如下 -
1234123402 - 1234123401 : 78 1234123402 - 1234123404 : 88 1234123402 - 1234123403 : 105 1234123401 - 1234123404 : 74 1234123401 - 1234123403 : 81 1234123401 - 1234123402 : 81 1234123403 - 1234123404 : 86 1234123404 - 1234123401 : 63 1234123404 - 1234123402 : 82 1234123403 - 1234123402 : 83 1234123404 - 1234123403 : 86 1234123403 - 1234123401 : 93
Storm拓?fù)渫ㄟ^Thrift接口實(shí)現(xiàn),這使得輕松地提交任何語言的拓?fù)?。Storm支持Ruby,Python和許多其他語言。讓我們來看看python綁定。
Python是一種通用的解釋,交互,面向?qū)ο蠛透呒?jí)編程語言。Storm支持Python實(shí)現(xiàn)其拓?fù)?。Python支持發(fā)射,錨定,acking和日志操作。
如你所知,bolt可以用任何語言定義。用另一種語言編寫的bolt作為子進(jìn)程執(zhí)行,Storm通過stdin / stdout與JSON消息進(jìn)行通信。首先拿一個(gè)支持python綁定的樣例bolt WordCount。
public static class WordCount implements IRichBolt { public WordSplit() { super("python", "splitword.py"); } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); } }
這里的類WordCount實(shí)現(xiàn)IRichBolt接口和運(yùn)行與python實(shí)現(xiàn)指定超級(jí)方法參數(shù)“splitword.py”?,F(xiàn)在創(chuàng)建一個(gè)名為“splitword.py”的python實(shí)現(xiàn)。
import storm class WordCountBolt(storm.BasicBolt): def process(self, tup): words = tup.values[0].split(" ") for word in words: storm.emit([word]) WordCountBolt().run()
這是Python的示例實(shí)現(xiàn),它計(jì)算給定句子中的單詞。同樣,您也可以與其他支持語言綁定。
更多建議: