Hadoop流是Hadoop發(fā)行版附帶的一個(gè)實(shí)用程序。此實(shí)用程序允許您使用任何可執(zhí)行文件或腳本作為映射程序和/或reducer創(chuàng)建和運(yùn)行Map / Reduce作業(yè)。
對(duì)于Hadoop流,我們正在考慮字?jǐn)?shù)問題。 Hadoop中的任何作業(yè)必須有兩個(gè)階段:mapper和reducer。我們已經(jīng)為python腳本中的mapper和reducer編寫了代碼,以便在Hadoop下運(yùn)行它。也可以在Perl和Ruby中寫同樣的內(nèi)容。
!/usr/bin/python import sys # Input takes from standard input for myline in sys.stdin: # Remove whitespace either side myline = myline.strip() # Break the line into words words = myline.split() # Iterate the words list for myword in words: # Write the results to standard output print '%s %s' % (myword, 1)
確保此文件具有執(zhí)行權(quán)限(chmod + x /home/expert /hadoop-1.2.1 / mapper.py)。
#!/usr/bin/python from operator import itemgetter import sys current_word = "" current_count = 0 word = "" # Input takes from standard input for myline in sys.stdin: # Remove whitespace either side myline = myline.strip() # Split the input we got from mapper.py word, count = myline.split(' ', 1) # Convert count variable to integer try: count = int(count) except ValueError: # Count was not a number, so silently ignore this line continue if current_word == word: current_count += count else: if current_word: # Write result to standard output print '%s %s' % (current_word, current_count) current_count = count current_word = word # Do not forget to output the last word if needed! if current_word == word: print '%s %s' % (current_word, current_count)
將mapper和reducer代碼保存在Hadoop主目錄中的mapper.py和reducer.py中。確保這些文件具有執(zhí)行權(quán)限(chmod + x mapper.py和chmod + x reducer.py)。因?yàn)閜ython是縮進(jìn)敏感所以相同的代碼可以從下面的鏈接下載。
$ $HADOOP_HOME/bin/hadoop jar contrib/streaming/hadoop-streaming-1. 2.1.jar -input input_dirs -output output_dir -mapper <path/mapper.py -reducer <path/reducer.py
其中“\”用于行連續(xù)以便清楚可讀性。
./bin/hadoop jar contrib/streaming/hadoop-streaming-1.2.1.jar -input myinput -output myoutput -mapper /home/expert/hadoop-1.2.1/mapper.py -reducer /home/expert/hadoop-1.2.1/reducer.py
在上面的示例中,mapper和reducer都是從標(biāo)準(zhǔn)輸入讀取輸入并將輸出發(fā)送到標(biāo)準(zhǔn)輸出的python腳本。該實(shí)用程序?qū)?chuàng)建一個(gè)Map / Reduce作業(yè),將作業(yè)提交到適當(dāng)?shù)娜杭⒈O(jiān)視作業(yè)的進(jìn)度,直到作業(yè)完成。
當(dāng)為映射器指定腳本時(shí),每個(gè)映射器任務(wù)將在映射器初始化時(shí)作為單獨(dú)的進(jìn)程啟動(dòng)腳本。當(dāng)映射器任務(wù)運(yùn)行時(shí),它將其輸入轉(zhuǎn)換為行,并將這些行饋送到進(jìn)程的標(biāo)準(zhǔn)輸入(STDIN)。同時(shí),映射器從進(jìn)程的標(biāo)準(zhǔn)輸出(STDOUT)收集面向行的輸出,并將每行轉(zhuǎn)換為鍵/值對(duì),作為映射器的輸出收集。默認(rèn)情況下,直到第一個(gè)制表符字符的行的前綴是鍵,行的其余部分(不包括制表符字符)將是值。如果行中沒有制表符,則整個(gè)行被視為鍵,值為null。但是,這可以根據(jù)一個(gè)需要定制。
當(dāng)為reducer指定腳本時(shí),每個(gè)reducer任務(wù)將作為單獨(dú)的進(jìn)程啟動(dòng)腳本,然后初始化reducer。當(dāng)reducer任務(wù)運(yùn)行時(shí),它將其輸入鍵/值對(duì)轉(zhuǎn)換為行,并將行饋送到進(jìn)程的標(biāo)準(zhǔn)輸入(STDIN)。同時(shí),reducer從進(jìn)程的標(biāo)準(zhǔn)輸出(STDOUT)收集面向行的輸出,將每行轉(zhuǎn)換為鍵/值對(duì),將其作為reducer的輸出進(jìn)行收集。默認(rèn)情況下,直到第一個(gè)制表符字符的行的前綴是鍵,行的其余部分(不包括制表符字符)是值。但是,這可以根據(jù)特定要求進(jìn)行定制。
參數(shù) | 描述 |
---|---|
-input directory/file-name | 輸入mapper的位置。(需要) |
-output directory-name | 減速器的輸出位置。(需要) |
-mapper executable or script or JavaClassName | Mapper可執(zhí)行文件。(需要) |
-reducer executable or script or JavaClassName | Reducer可執(zhí)行文件。(需要) |
-file file-name | 使mapper,reducer或combiner可執(zhí)行文件在計(jì)算節(jié)點(diǎn)本地可用。 |
-inputformat JavaClassName | 你提供的類應(yīng)該返回Text類的鍵/值對(duì)。如果未指定,則使用TextInputFormat作為默認(rèn)值。 |
-outputformat JavaClassName | 您提供的類應(yīng)該采用Text類的鍵/值對(duì)。如果未指定,則使用TextOutputformat作為默認(rèn)值。 |
-partitioner JavaClassName | 確定將鍵發(fā)送到哪個(gè)reduce的類。 |
-combiner streamingCommand or JavaClassName | 組合器可執(zhí)行映射輸出。 |
-cmdenv name=value | 將環(huán)境變量傳遞到流式命令。 |
-inputreader | 對(duì)于向后兼容性:指定記錄讀取器類(而不是輸入格式類)。 |
-verbose | 詳細(xì)輸出。 |
-lazyOutput | 創(chuàng)建輸出延遲。例如,如果輸出格式基于FileOutputFormat,則輸出文件僅在首次調(diào)用output.collect(或Context.write)時(shí)創(chuàng)建。 |
-numReduceTasks | 指定Reducer的數(shù)量。 |
-mapdebug | 映射任務(wù)失敗時(shí)調(diào)用的腳本。 |
-reducedebug | 當(dāng)reduce任務(wù)失敗時(shí)調(diào)用的腳本。 |
更多建議: