MapReduce是一個(gè)框架,我們可以使用它來編寫應(yīng)用程序,以可靠的方式并行地處理大量商品硬件群集上的大量數(shù)據(jù)。
MapReduce是一種基于java的分布式計(jì)算的處理技術(shù)和程序模型。 MapReduce算法包含兩個(gè)重要任務(wù),即Map和Reduce。Map采用一組數(shù)據(jù)并將其轉(zhuǎn)換為另一組數(shù)據(jù),其中各個(gè)元素被分解為元組(鍵/值對)。其次,reduce任務(wù),它將map的輸出作為輸入,并將這些數(shù)據(jù)元組合并成一組較小的元組。作為MapReduce名稱的順序,reduce任務(wù)總是在map作業(yè)之后執(zhí)行。
MapReduce的主要優(yōu)點(diǎn)是易于在多個(gè)計(jì)算節(jié)點(diǎn)上擴(kuò)展數(shù)據(jù)處理。在MapReduce模型下,數(shù)據(jù)處理原語稱為映射器和縮減器。將數(shù)據(jù)處理應(yīng)用程序分解為映射器和簡化器有時(shí)并不重要。但是,一旦我們以MapReduce形式編寫應(yīng)用程序,擴(kuò)展應(yīng)用程序以在集群中運(yùn)行數(shù)百,數(shù)千甚至數(shù)萬臺機(jī)器只是一種配置更改。這種簡單的可擴(kuò)展性是吸引許多程序員使用MapReduce模型的原因。
通常MapReduce范例是基于將計(jì)算機(jī)發(fā)送到數(shù)據(jù)所在的位置!
MapReduce程序在三個(gè)階段執(zhí)行,即map階段,shuffle階段和reduce階段。
Map 階段 :映射或映射器的作業(yè)是處理輸入數(shù)據(jù)。一般來說,輸入數(shù)據(jù)是以文件或目錄的形式存儲在Hadoop文件系統(tǒng)(HDFS)中。輸入文件逐行傳遞到映射器函數(shù)。映射器處理數(shù)據(jù)并創(chuàng)建幾個(gè)小塊的數(shù)據(jù)。
Reduce 階段 :這個(gè)階段是Shuffle階段和Reduce階段的組合。 Reducer的工作是處理來自映射器的數(shù)據(jù)。處理后,它產(chǎn)生一組新的輸出,將存儲在HDFS中。
在MapReduce作業(yè)期間,Hadoop將Map和Reduce任務(wù)發(fā)送到集群中的相應(yīng)服務(wù)器。
該框架管理數(shù)據(jù)傳遞的所有細(xì)節(jié),例如發(fā)出任務(wù),驗(yàn)證任務(wù)完成,以及在節(jié)點(diǎn)之間復(fù)制集群周圍的數(shù)據(jù)。
大多數(shù)計(jì)算發(fā)生在節(jié)點(diǎn)上,本地磁盤上的數(shù)據(jù)減少了網(wǎng)絡(luò)流量。
完成給定任務(wù)后,集群收集并減少數(shù)據(jù)以形成適當(dāng)?shù)慕Y(jié)果,并將其發(fā)送回Hadoop服務(wù)器。
MapReduce框架對<key,value>對進(jìn)行操作,也就是說,框架將作業(yè)的輸入視為一組<key,value>對,并生成一組<key,value>對作為作業(yè)輸出,可能是不同類型。
鍵和值類應(yīng)該由框架以序列化的方式,因此,需要實(shí)現(xiàn)Writable接口。此外,鍵類必須實(shí)現(xiàn)Writable-Comparable接口,以方便框架進(jìn)行排序。MapReduce作業(yè)的輸入和輸出類型:(輸入)<k1,v1> - > map - > <k2,v2> - > reduce - > <k3,v3>(輸出)。
輸入 | 輸出 | |
---|---|---|
Map | <k1, v1> | list (<k2, v2>) |
Reduce | <k2, list(v2)> | list (<k3, v3>) |
PayLoad - 應(yīng)用程序?qū)崿F(xiàn)Map和Reduce功能,并形成作業(yè)的核心。
Mapper- 映射器將輸入鍵/值對映射到一組中間鍵/值對。
NamedNode - 管理Hadoop分布式文件系統(tǒng)(HDFS)的節(jié)點(diǎn)。
DataNode - 在任何處理發(fā)生之前提前呈現(xiàn)數(shù)據(jù)的節(jié)點(diǎn)。
MasterNode - JobTracker運(yùn)行并接受來自客戶端的作業(yè)請求的節(jié)??點(diǎn)。
SlaveNode - Map和Reduce程序運(yùn)行的節(jié)點(diǎn)。
JobTracker - 計(jì)劃作業(yè)并跟蹤將作業(yè)分配給任務(wù)跟蹤器。
Task Tracker - 跟蹤任務(wù)并向JobTracker報(bào)告狀態(tài)。
Job - 程序是跨數(shù)據(jù)集的Mapper和Reducer的執(zhí)行。
Task - 在一個(gè)數(shù)據(jù)片段上執(zhí)行Mapper或Reducer。
Task Attempt - 嘗試在SlaveNode上執(zhí)行任務(wù)的特定實(shí)例。
下面給出了關(guān)于組織的電力消耗的數(shù)據(jù)。它包含每月的電力消耗和各年的年平均值。
一月 | 二月 | 三月 | 四月 | 五月 | 六月 | 七月 | 八月 | 九月 | 十月 | 十一月 | 十二月 | 平均 | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|
1979年 | 23 | 23 | 2 | 43 | 24 | 25 | 26 | 26 | 26 | 26 | 25 | 26 | 25 |
1980年 | 26 | 27 | 28 | 28 | 28 | 30 | 31 | 31 | 31 | 30 | 30 | 30 | 29 |
1981年 | 31 | 32 | 32 | 32 | 33 | 34 | 35 | 36 | 36 | 34 | 34 | 34 | 34 |
1984年 | 39 | 38 | 39 | 39 | 39 | 41 | 42 | 43 | 40 | 39 | 38 | 38 | 40 |
1985年 | 38 | 39 | 39 | 39 | 39 | 41 | 41 | 41 | 00 | 40 | 39 | 39 | 45 |
如果上述數(shù)據(jù)作為輸入,我們必須編寫應(yīng)用程序來處理它,并產(chǎn)生結(jié)果,如找到最大使用年份,最小使用年份等。這是一個(gè)對于有限數(shù)量的記錄的程序員的walkover。它們將簡單地寫入邏輯以產(chǎn)生所需的輸出,并將數(shù)據(jù)傳遞給所寫的應(yīng)用程序。
但是,考慮一個(gè)特定國家的所有大型產(chǎn)業(yè)的電力消耗的數(shù)據(jù),因?yàn)樗男纬伞?/span>
當(dāng)我們編寫應(yīng)用程序來處理這樣的批量數(shù)據(jù)時(shí),
為了解決這些問題,我們有MapReduce框架。
上述數(shù)據(jù)保存為sample.txt并作為輸入。輸入文件如下所示。
1979 23 23 2 43 24 25 26 26 26 26 25 26 25 1980 26 27 28 28 28 30 31 31 31 30 30 30 29 1981 31 32 32 32 33 34 35 36 36 34 34 34 34 1984 39 38 39 39 39 41 42 43 40 39 38 38 40 1985 38 39 39 39 39 41 41 41 00 40 39 39 45
下面給出了程序?qū)κ褂肕apReduce框架的示例數(shù)據(jù)。
package hadoop; import java.util.*; import java.io.IOException; import java.io.IOException; import org.apache.hadoop.fs.Path; import org.apache.hadoop.conf.*; import org.apache.hadoop.io.*; import org.apache.hadoop.mapred.*; import org.apache.hadoop.util.*; public class ProcessUnits { //Mapper class public static class E_EMapper extends MapReduceBase implements Mapper<LongWritable ,/*Input key Type */ Text, /*Input value Type*/ Text, /*Output key Type*/ IntWritable> /*Output value Type*/ { //Map function public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { String line = value.toString(); String lasttoken = null; StringTokenizer s = new StringTokenizer(line," "); String year = s.nextToken(); while(s.hasMoreTokens()) { lasttoken=s.nextToken(); } int avgprice = Integer.parseInt(lasttoken); output.collect(new Text(year), new IntWritable(avgprice)); } } //Reducer class public static class E_EReduce extends MapReduceBase implements Reducer< Text, IntWritable, Text, IntWritable > { //Reduce function public void reduce( Text key, Iterator <IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { int maxavg=30; int val=Integer.MIN_VALUE; while (values.hasNext()) { if((val=values.next().get())>maxavg) { output.collect(key, new IntWritable(val)); } } } } //Main function public static void main(String args[])throws Exception { JobConf conf = new JobConf(ProcessUnits.class); conf.setJobName("max_eletricityunits"); conf.setOutputKeyClass(Text.class); conf.setOutputValueClass(IntWritable.class); conf.setMapperClass(E_EMapper.class); conf.setCombinerClass(E_EReduce.class); conf.setReducerClass(E_EReduce.class); conf.setInputFormat(TextInputFormat.class); conf.setOutputFormat(TextOutputFormat.class); FileInputFormat.setInputPaths(conf, new Path(args[0])); FileOutputFormat.setOutputPath(conf, new Path(args[1])); JobClient.runJob(conf); } }
將上述程序保存為ProcessUnits.java。程序的編譯和執(zhí)行說明如下。
讓我們假設(shè)在Hadoop用戶的主目錄(例如/home/hadoop)。。
按照以下步驟編譯并執(zhí)行上述程序。
以下命令是創(chuàng)建一個(gè)目錄來存儲編譯的java類。
$ mkdir units
下載Hadoop-core-1.2.1.jar,用于編譯和執(zhí)行MapReduce程序。訪問以下鏈接http://mvnrepository.com/artifact/org.apache.hadoop/hadoop-core/1.2.1下載jar。讓我們假設(shè)下載的文件夾是/ home / hadoop /。
以下命令用于編譯ProcessUnits.java程序并為該程序創(chuàng)建一個(gè)jar。
$ javac -classpath hadoop-core-1.2.1.jar -d units ProcessUnits.java $ jar -cvf units.jar -C units/ .
以下命令用于在HDFS中創(chuàng)建輸入目錄。
$HADOOP_HOME/bin/hadoop fs -mkdir input_dir
下命令用于復(fù)制名為sample.txt的輸入文件,在HDFS的輸入目錄中。
$HADOOP_HOME/bin/hadoop fs -put /home/hadoop/sample.txt input_dir
以下命令用于驗(yàn)證輸入目錄中的文件。
$HADOOP_HOME/bin/hadoop fs -ls input_dir/
以下命令用于通過從輸入目錄獲取輸入文件來運(yùn)行Eleunit_max應(yīng)用程序。
$HADOOP_HOME/bin/hadoop jar units.jar hadoop.ProcessUnits input_dir output_dir
等待一段時(shí)間,直到文件被執(zhí)行。執(zhí)行后,如下所示,輸出將包含輸入拆分的數(shù)量,Map任務(wù)的數(shù)量,reducer任務(wù)的數(shù)量等。
INFO mapreduce.Job: Job job_1414748220717_0002 completed successfully 14/10/31 06:02:52 INFO mapreduce.Job: Counters: 49 File System Counters FILE: Number of bytes read=61 FILE: Number of bytes written=279400 FILE: Number of read operations=0 FILE: Number of large read operations=0 FILE: Number of write operations=0 HDFS: Number of bytes read=546 HDFS: Number of bytes written=40 HDFS: Number of read operations=9 HDFS: Number of large read operations=0 HDFS: Number of write operations=2 Job Counters Launched map tasks=2 Launched reduce tasks=1 Data-local map tasks=2 Total time spent by all maps in occupied slots (ms)=146137 Total time spent by all reduces in occupied slots (ms)=441 Total time spent by all map tasks (ms)=14613 Total time spent by all reduce tasks (ms)=44120 Total vcore-seconds taken by all map tasks=146137 Total vcore-seconds taken by all reduce tasks=44120 Total megabyte-seconds taken by all map tasks=149644288 Total megabyte-seconds taken by all reduce tasks=45178880 Map-Reduce Framework Map input records=5 Map output records=5 Map output bytes=45 Map output materialized bytes=67 Input split bytes=208 Combine input records=5 Combine output records=5 Reduce input groups=5 Reduce shuffle bytes=6 Reduce input records=5 Reduce output records=5 Spilled Records=10 Shuffled Maps =2 Failed Shuffles=0 Merged Map outputs=2 GC time elapsed (ms)=948 CPU time spent (ms)=5160 Physical memory (bytes) snapshot=47749120 Virtual memory (bytes) snapshot=2899349504 Total committed heap usage (bytes)=277684224 File Output Format Counters Bytes Written=40
以下命令用于驗(yàn)證輸出文件夾中的結(jié)果文件。
$HADOOP_HOME/bin/hadoop fs -ls output_dir/
以下命令用于查看Part-00000文件中的輸出。此文件由HDFS生成。
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000
下面是MapReduce程序生成的輸出。
1981 34 1984 40 1985 45
以下命令用于將輸出文件夾從HDFS復(fù)制到本地文件系統(tǒng)進(jìn)行分析。
$HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000/bin/hadoop dfs get output_dir /home/hadoop
所有Hadoop命令都由$ HADOOP_HOME / bin / hadoop命令調(diào)用。運(yùn)行不帶任何參數(shù)的Hadoop腳本會打印所有命令的描述。
用法 :hadoop [--config confdir] COMMAND
下表列出了可用的選項(xiàng)及其說明。
選項(xiàng) | 描述 |
---|---|
namenode -format | 格式化DFS文件系統(tǒng)。 |
secondarynamenode | 運(yùn)行DFS二次名稱節(jié)點(diǎn)。 |
namenode | 運(yùn)行DFS名稱節(jié)點(diǎn)。 |
datanode | 運(yùn)行DFS數(shù)據(jù)節(jié)點(diǎn)。 |
dfsadmin | 運(yùn)行DFS管理客戶端。 |
mradmin | 運(yùn)行Map-Reduce管理客戶端。 |
fsck | 運(yùn)行DFS文件系統(tǒng)檢查實(shí)用程序。 |
fs | 運(yùn)行通用文件系統(tǒng)用戶客戶端。 |
balancer | 運(yùn)行集群平衡實(shí)用程序。 |
oiv | 將離線fsimage查看器應(yīng)用于fsimage。 |
fetchdt | 從NameNode獲取委派令牌。 |
jobtracker | 運(yùn)行MapReduce作業(yè)跟蹤節(jié)點(diǎn)。 |
pipes | 運(yùn)行管道作業(yè)。 |
tasktracker | 運(yùn)行MapReduce任務(wù)跟蹤節(jié)點(diǎn)。 |
historyserver | 作為獨(dú)立的守護(hù)程序運(yùn)行作業(yè)歷史記錄服務(wù)器。 |
job | 操作MapReduce作業(yè)。 |
queue | 獲取有關(guān)JobQueues的信息。 |
version | 打印版本。 |
jar <jar> | 運(yùn)行jar文件。 |
distcp <srcurl> <desturl> | 遞歸復(fù)制文件或目錄。 |
distcp2 <srcurl> <desturl> | DistCp版本2。 |
archive -archiveName NAME -p | 創(chuàng)建hadoop歸檔。 |
<parent path> <src>* <dest> | |
classpath | 打印獲取Hadoop jar所需的類路徑和所需的庫。 |
daemonlog | 獲取/設(shè)置每個(gè)守護(hù)程序的日志級別 |
用法:Hadoop的工作[GENERIC_OPTIONS]
以下是在Hadoop作業(yè)的可用通用的選項(xiàng)。
通用選項(xiàng) | 描述 |
---|---|
-submit <job-file> | 提交作業(yè)。 |
-status <job-id> | 打印映射并減少完成百分比和所有作業(yè)計(jì)數(shù)器。 |
-counter <job-id> <group-name> <countername> | 打印計(jì)數(shù)器值。 |
-kill <job-id> | 終止作業(yè) |
-events <job-id> <fromevent-#> <#-of-events> | 打印jobtracker為給定范圍接收的事件詳細(xì)信息。 |
-history [all] <jobOutputDir> - history < jobOutputDir> | 打印作業(yè)詳細(xì)信息,失敗并停用提示詳細(xì)信息。可以通過指定[all]選項(xiàng)查看有關(guān)作業(yè)的更多詳細(xì)信息,如每個(gè)任務(wù)的成功任務(wù)和任務(wù)嘗試。 |
-list[all] | 顯示所有作業(yè)。 -list僅顯示尚未完成的作業(yè)。 |
-kill-task <task-id> | 終止任務(wù)。已終止的任務(wù)不會計(jì)入失敗的嘗試次數(shù)。 |
-fail-task <task-id> | 失敗的任務(wù)。失敗的任務(wù)將根據(jù)失敗的嘗試進(jìn)行計(jì)數(shù)。 |
-set-priority <job-id> <priority> | 更改作業(yè)的優(yōu)先級。允許的優(yōu)先級值為VERY_HIGH,HIGH,NORMAL,LOW,VERY_LOW |
$ $HADOOP_HOME/bin/hadoop job -status <JOB-ID> e.g. $ $HADOOP_HOME/bin/hadoop job -status job_201310191043_0004
$ $HADOOP_HOME/bin/hadoop job -history <DIR-NAME> e.g. $ $HADOOP_HOME/bin/hadoop job -history /user/expert/output
$ $HADOOP_HOME/bin/hadoop job -kill <JOB-ID> e.g. $ $HADOOP_HOME/bin/hadoop job -kill job_201310191043_0004
更多建議: