如何在Hadoop中使用Streaming编写MapReduce
1个回答
推荐于2016-02-20 · 知道合伙人数码行家
huanglenzhi
知道合伙人数码行家
向TA提问 私信TA
知道合伙人数码行家
采纳数:117538
获赞数:517198
长期从事计算机组装,维护,网络组建及管理。对计算机硬件、操作系统安装、典型网络设备具有详细认知。
向TA提问 私信TA
关注
展开全部
Michael G. Noll在他的Blog中提到如何在Hadoop中用Python编写MapReduce程序,韩国的gogamza在其Bolg中也提到如何用C编写MapReduce程序(我稍微修改了一下原程序,因为他的Map对单词切分使用tab键)。我合并他们两人的文章,也让国内的Hadoop用户能够使用别的语言来编写MapReduce程序。
首先您得配好您的Hadoop集群,这方面的介绍网上比较多,这儿给个链接(Hadoop学习笔记二 安装部署)。Hadoop Streaming帮 助我们用非Java的编程语言使用MapReduce,Streaming用STDIN (标准输入)和STDOUT (标准输出)来和我们编写的Map和Reduce进行数据的交换数据。任何能够使用STDIN和STDOUT都可以用来编写MapReduce程序,比如 我们用Python的sys.stdin和sys.stdout,或者是C中的stdin和stdout。
我们还是使用Hadoop的例子WordCount来做示范如何编写MapReduce,在WordCount的例子中我们要解决计算在一批文档中每一个单词的出现频率。首先我们在Map程序中会接受到这批文档每一行的数据,然后我们编写的Map程序把这一行按空格切开成一个数组。并对这个数组遍历按" 1"用标准的输出输出来,代表这个单词出现了一次。在Reduce中我们来统计单词的出现频率。
Python Code
Map: mapper.py
#!/usr/bin/env python
import sys
# maps words to their counts
word2count = {}
# input comes from STDIN (standard input)
for line in sys.stdin:
# remove leading and trailing whitespace
line = line.strip()
# split the line into words while removing any empty strings
words = filter(lambda word: word, line.split())
# increase counters
for word in words:
# write the results to STDOUT (standard output);
# what we output here will be the input for the
# Reduce step, i.e. the input for reducer.py
#
# tab-delimited; the trivial word count is 1
print '%s\t%s' % (word, 1)
Reduce: reducer.py
#!/usr/bin/env python
from operator import itemgetter
import sys
# maps words to their counts
word2count = {}
# input comes from STDIN
for line in sys.stdin:
# remove leading and trailing whitespace
line = line.strip()
# parse the input we got from mapper.py
word, count = line.split()
# convert count (currently a string) to int
try:
count = int(count)
word2count[word] = word2count.get(word, 0) + count
except ValueError:
# count was not a number, so silently
# ignore/discard this line
pass
# sort the words lexigraphically;
#
# this step is NOT required, we just do it so that our
# final output will look more like the official Hadoop
# word count examples
sorted_word2count = sorted(word2count.items(), key=itemgetter(0))
# write the results to STDOUT (standard output)
for word, count in sorted_word2count:
print '%s\t%s'% (word, count)
C Code
Map: Mapper.c
#include
#include
#include
#include
#define BUF_SIZE 2048
#define DELIM "\n"
int main(int argc, char *argv[]){
char buffer[BUF_SIZE];
while(fgets(buffer, BUF_SIZE - 1, stdin)){
int len = strlen(buffer);
if(buffer[len-1] == '\n')
buffer[len-1] = 0;
char *querys = index(buffer, ' ');
char *query = NULL;
if(querys == NULL) continue;
querys += 1; /* not to include '\t' */
query = strtok(buffer, " ");
while(query){
printf("%s\t1\n", query);
query = strtok(NULL, " ");
}
}
return 0;
}
h>h>h>h>
Reduce: Reducer.c
#include
#include
#include
#include
#define BUFFER_SIZE 1024
#define DELIM "\t"
int main(int argc, char *argv[]){
char strLastKey[BUFFER_SIZE];
char strLine[BUFFER_SIZE];
int count = 0;
*strLastKey = '\0';
*strLine = '\0';
while( fgets(strLine, BUFFER_SIZE - 1, stdin) ){
char *strCurrKey = NULL;
char *strCurrNum = NULL;
strCurrKey = strtok(strLine, DELIM);
strCurrNum = strtok(NULL, DELIM); /* necessary to check error but.... */
if( strLastKey[0] == '\0'){
strcpy(strLastKey, strCurrKey);
}
if(strcmp(strCurrKey, strLastKey)){
printf("%s\t%d\n", strLastKey, count);
count = atoi(strCurrNum);
}else{
count += atoi(strCurrNum);
}
strcpy(strLastKey, strCurrKey);
}
printf("%s\t%d\n", strLastKey, count); /* flush the count */
return 0;
}
h>h>h>h>
首先我们调试一下源码:
chmod +x mapper.py
chmod +x reducer.py
echo "foo foo quux labs foo bar quux" | ./mapper.py | ./reducer.py
bar 1
foo 3
labs 1
quux 2
g++ Mapper.c -o Mapper
g++ Reducer.c -o Reducer
chmod +x Mapper
chmod +x Reducer
echo "foo foo quux labs foo bar quux" | ./Mapper | ./Reducer
bar 1
foo 2
labs 1
quux 1
foo 1
quux 1
你可能看到C的输出和Python的不一样,因为Python是把他放在词典里了.我们在Hadoop时,会对这进行排序,然后相同的单词会连续在标准输出中输出.
在Hadoop中运行程序
首先我们要下载我们的测试文档wget http://www.gutenberg.org/dirs/etext04/7ldvc10.txt.我们把文档存放在/tmp/doc这个目录下.拷贝测试文档到HDFS中.
bin/hadoop dfs -copyFromLocal /tmp/doc doc
运行 bin/hadoop dfs -ls doc 看看拷贝是否成功.接下来我们运行我们的MapReduce的Job.
bin/hadoop jar /home/hadoop/contrib/hadoop-0.15.1-streaming.jar -mapper /home/hadoop/Mapper\
-reducer /home/hadoop/Reducer -input doc/* -output c-output -jobconf mapred.reduce.tasks=1
additionalConfSpec_:null
null=@@@userJobConfProps_.get(stream.shipped.hadoopstreaming
packageJobJar: [] [/home/msh/hadoop-0.15.1/contrib/hadoop-0.15.1-streaming.jar] /tmp/streamjob60816.jar tmpDir=null
08/03/04 19:03:13 INFO mapred.FileInputFormat: Total input paths to process : 1
08/03/04 19:03:13 INFO streaming.StreamJob: getLocalDirs(): [/home/msh/data/filesystem/mapred/local]
08/03/04 19:03:13 INFO streaming.StreamJob: Running job: job_200803031752_0003
08/03/04 19:03:13 INFO streaming.StreamJob: To kill this job, run:
08/03/04 19:03:13 INFO streaming.StreamJob: /home/msh/hadoop/bin/../bin/hadoop job -Dmapred.job.tracker=192.168.2.92:9001 -kill job_200803031752_0003
08/03/04 19:03:13 INFO streaming.StreamJob: Tracking URL: http://hadoop-master:50030/jobdetails.jsp?jobid=job_200803031752_0003
08/03/04 19:03:14 INFO streaming.StreamJob: map 0% reduce 0%
08/03/04 19:03:15 INFO streaming.StreamJob: map 33% reduce 0%
08/03/04 19:03:16 INFO streaming.StreamJob: map 100% reduce 0%
08/03/04 19:03:19 INFO streaming.StreamJob: map 100% reduce 100%
08/03/04 19:03:19 INFO streaming.StreamJob: Job complete: job_200803031752_0003
08/03/04 19:03:19 INFO streaming.StreamJob: Output: c-output
bin/hadoop jar /home/hadoop/contrib/hadoop-0.15.1-streaming.jar -mapper /home/hadoop/mapper.py\
-reducer /home/hadoop/reducer.py -input doc/* -output python-output -jobconf mapred.reduce.tasks=1
additionalConfSpec_:null
null=@@@userJobConfProps_.get(stream.shipped.hadoopstreaming
packageJobJar: [] [/home/hadoop/hadoop-0.15.1/contrib/hadoop-0.15.1-streaming.jar] /tmp/streamjob26099.jar tmpDir=null
08/03/04 19:05:40 INFO mapred.FileInputFormat: Total input paths to process : 1
08/03/04 19:05:41 INFO streaming.StreamJob: getLocalDirs(): [/home/msh/data/filesystem/mapred/local]
08/03/04 19:05:41 INFO streaming.StreamJob: Running job: job_200803031752_0004
08/03/04 19:05:41 INFO streaming.StreamJob: To kill this job, run:
08/03/04 19:05:41 INFO streaming.StreamJob: /home/msh/hadoop/bin/../bin/hadoop job -Dmapred.job.tracker=192.168.2.92:9001 -kill job_200803031752_0004
08/03/04 19:05:41 INFO streaming.StreamJob: Tracking URL: http://hadoop-master:50030/jobdetails.jsp?jobid=job_200803031752_0004
08/03/04 19:05:42 INFO streaming.StreamJob: map 0% reduce 0%
08/03/04 19:05:48 INFO streaming.StreamJob: map 33% reduce 0%
08/03/04 19:05:49 INFO streaming.StreamJob: map 100% reduce 0%
08/03/04 19:05:52 INFO streaming.StreamJob: map 100% reduce 100%
08/03/04 19:05:52 INFO streaming.StreamJob: Job complete: job_200803031752_0004
08/03/04 19:05:52 INFO streaming.StreamJob: Output: python-output
当Job提交后我们还能够在web的界面http://localhost:50030/看到每一个工作的运行情况。
当Job工作完成后我们能够在c-output和python-output看到一些文件
bin/hadoop dfs -ls c-output
输入下面的命令我们能够看到我们运行完MapReduce的结果
bin/hadoop dfs -cat c-output/part-00000
用Hadoop Streaming运行MapReduce会比较用Java的代码要慢,因为有两方面的原因:
使用 Java API >> C Streaming >> Perl Streaming 这样的一个流程运行会阻塞IO.
不像Java在运行Map后输出结果有一定数量的结果集就启动Reduce的程序,用Streaming要等到所有的Map都运行完毕后才启动Reduce
如果用Python编写MapReduce的话,另一个可选的是使用Jython来转编译Pyhton为Java的原生码.另外对于C程序员更好的选择是使用Hadoop新的C++ MapReduce API Pipes来编写.不管怎样,毕竟Hadoop提供了一种不使用Java来进行分布式运算的方法.
下面是从http://www.lunchpauze.com/2007/10/writing-hadoop-mapreduce-program-in-php.html页面中摘下的用php编写的MapReduce程序,供php程序员参考:
Map: mapper.php
#!/usr/bin/php
$word2count = array();
// input comes from STDIN (standard input)
while (($line = fgets(STDIN)) !== false) {
// remove leading and trailing whitespace and lowercase
$line = strtolower(trim($line));
// split the line into words while removing any empty string
$words = preg_split('/\W/', $line, 0, PREG_SPLIT_NO_EMPTY);
// increase counters
foreach ($words as $word) {
$word2count[$word] += 1;
}
}
// write the results to STDOUT (standard output)
// what we output here will be the input for the
// Reduce step, i.e. the input for reducer.py
foreach ($word2count as $word => $count) {
// tab-delimited
echo $word, chr(9), $count, PHP_EOL;
}
?>
Reduce: mapper.php
#!/usr/bin/php
$word2count = array();
// input comes from STDIN
while (($line = fgets(STDIN)) !== false) {
// remove leading and trailing whitespace
$line = trim($line);
// parse the input we got from mapper.php
list($word, $count) = explode(chr(9), $line);
// convert count (currently a string) to int
$count = intval($count);
// sum counts
if ($count > 0) $word2count[$word] += $count;
}
// sort the words lexigraphically
//
// this set is NOT required, we just do it so that our
// final output will look more like the official Hadoop
// word count examples
ksort($word2count);
// write the results to STDOUT (standard output)
foreach ($word2count as $word => $count) {
echo $word, chr(9), $count, PHP_EOL;
}
?>
作者:马士华 发表于:2008-03-05
首先您得配好您的Hadoop集群,这方面的介绍网上比较多,这儿给个链接(Hadoop学习笔记二 安装部署)。Hadoop Streaming帮 助我们用非Java的编程语言使用MapReduce,Streaming用STDIN (标准输入)和STDOUT (标准输出)来和我们编写的Map和Reduce进行数据的交换数据。任何能够使用STDIN和STDOUT都可以用来编写MapReduce程序,比如 我们用Python的sys.stdin和sys.stdout,或者是C中的stdin和stdout。
我们还是使用Hadoop的例子WordCount来做示范如何编写MapReduce,在WordCount的例子中我们要解决计算在一批文档中每一个单词的出现频率。首先我们在Map程序中会接受到这批文档每一行的数据,然后我们编写的Map程序把这一行按空格切开成一个数组。并对这个数组遍历按" 1"用标准的输出输出来,代表这个单词出现了一次。在Reduce中我们来统计单词的出现频率。
Python Code
Map: mapper.py
#!/usr/bin/env python
import sys
# maps words to their counts
word2count = {}
# input comes from STDIN (standard input)
for line in sys.stdin:
# remove leading and trailing whitespace
line = line.strip()
# split the line into words while removing any empty strings
words = filter(lambda word: word, line.split())
# increase counters
for word in words:
# write the results to STDOUT (standard output);
# what we output here will be the input for the
# Reduce step, i.e. the input for reducer.py
#
# tab-delimited; the trivial word count is 1
print '%s\t%s' % (word, 1)
Reduce: reducer.py
#!/usr/bin/env python
from operator import itemgetter
import sys
# maps words to their counts
word2count = {}
# input comes from STDIN
for line in sys.stdin:
# remove leading and trailing whitespace
line = line.strip()
# parse the input we got from mapper.py
word, count = line.split()
# convert count (currently a string) to int
try:
count = int(count)
word2count[word] = word2count.get(word, 0) + count
except ValueError:
# count was not a number, so silently
# ignore/discard this line
pass
# sort the words lexigraphically;
#
# this step is NOT required, we just do it so that our
# final output will look more like the official Hadoop
# word count examples
sorted_word2count = sorted(word2count.items(), key=itemgetter(0))
# write the results to STDOUT (standard output)
for word, count in sorted_word2count:
print '%s\t%s'% (word, count)
C Code
Map: Mapper.c
#include
#include
#include
#include
#define BUF_SIZE 2048
#define DELIM "\n"
int main(int argc, char *argv[]){
char buffer[BUF_SIZE];
while(fgets(buffer, BUF_SIZE - 1, stdin)){
int len = strlen(buffer);
if(buffer[len-1] == '\n')
buffer[len-1] = 0;
char *querys = index(buffer, ' ');
char *query = NULL;
if(querys == NULL) continue;
querys += 1; /* not to include '\t' */
query = strtok(buffer, " ");
while(query){
printf("%s\t1\n", query);
query = strtok(NULL, " ");
}
}
return 0;
}
h>h>h>h>
Reduce: Reducer.c
#include
#include
#include
#include
#define BUFFER_SIZE 1024
#define DELIM "\t"
int main(int argc, char *argv[]){
char strLastKey[BUFFER_SIZE];
char strLine[BUFFER_SIZE];
int count = 0;
*strLastKey = '\0';
*strLine = '\0';
while( fgets(strLine, BUFFER_SIZE - 1, stdin) ){
char *strCurrKey = NULL;
char *strCurrNum = NULL;
strCurrKey = strtok(strLine, DELIM);
strCurrNum = strtok(NULL, DELIM); /* necessary to check error but.... */
if( strLastKey[0] == '\0'){
strcpy(strLastKey, strCurrKey);
}
if(strcmp(strCurrKey, strLastKey)){
printf("%s\t%d\n", strLastKey, count);
count = atoi(strCurrNum);
}else{
count += atoi(strCurrNum);
}
strcpy(strLastKey, strCurrKey);
}
printf("%s\t%d\n", strLastKey, count); /* flush the count */
return 0;
}
h>h>h>h>
首先我们调试一下源码:
chmod +x mapper.py
chmod +x reducer.py
echo "foo foo quux labs foo bar quux" | ./mapper.py | ./reducer.py
bar 1
foo 3
labs 1
quux 2
g++ Mapper.c -o Mapper
g++ Reducer.c -o Reducer
chmod +x Mapper
chmod +x Reducer
echo "foo foo quux labs foo bar quux" | ./Mapper | ./Reducer
bar 1
foo 2
labs 1
quux 1
foo 1
quux 1
你可能看到C的输出和Python的不一样,因为Python是把他放在词典里了.我们在Hadoop时,会对这进行排序,然后相同的单词会连续在标准输出中输出.
在Hadoop中运行程序
首先我们要下载我们的测试文档wget http://www.gutenberg.org/dirs/etext04/7ldvc10.txt.我们把文档存放在/tmp/doc这个目录下.拷贝测试文档到HDFS中.
bin/hadoop dfs -copyFromLocal /tmp/doc doc
运行 bin/hadoop dfs -ls doc 看看拷贝是否成功.接下来我们运行我们的MapReduce的Job.
bin/hadoop jar /home/hadoop/contrib/hadoop-0.15.1-streaming.jar -mapper /home/hadoop/Mapper\
-reducer /home/hadoop/Reducer -input doc/* -output c-output -jobconf mapred.reduce.tasks=1
additionalConfSpec_:null
null=@@@userJobConfProps_.get(stream.shipped.hadoopstreaming
packageJobJar: [] [/home/msh/hadoop-0.15.1/contrib/hadoop-0.15.1-streaming.jar] /tmp/streamjob60816.jar tmpDir=null
08/03/04 19:03:13 INFO mapred.FileInputFormat: Total input paths to process : 1
08/03/04 19:03:13 INFO streaming.StreamJob: getLocalDirs(): [/home/msh/data/filesystem/mapred/local]
08/03/04 19:03:13 INFO streaming.StreamJob: Running job: job_200803031752_0003
08/03/04 19:03:13 INFO streaming.StreamJob: To kill this job, run:
08/03/04 19:03:13 INFO streaming.StreamJob: /home/msh/hadoop/bin/../bin/hadoop job -Dmapred.job.tracker=192.168.2.92:9001 -kill job_200803031752_0003
08/03/04 19:03:13 INFO streaming.StreamJob: Tracking URL: http://hadoop-master:50030/jobdetails.jsp?jobid=job_200803031752_0003
08/03/04 19:03:14 INFO streaming.StreamJob: map 0% reduce 0%
08/03/04 19:03:15 INFO streaming.StreamJob: map 33% reduce 0%
08/03/04 19:03:16 INFO streaming.StreamJob: map 100% reduce 0%
08/03/04 19:03:19 INFO streaming.StreamJob: map 100% reduce 100%
08/03/04 19:03:19 INFO streaming.StreamJob: Job complete: job_200803031752_0003
08/03/04 19:03:19 INFO streaming.StreamJob: Output: c-output
bin/hadoop jar /home/hadoop/contrib/hadoop-0.15.1-streaming.jar -mapper /home/hadoop/mapper.py\
-reducer /home/hadoop/reducer.py -input doc/* -output python-output -jobconf mapred.reduce.tasks=1
additionalConfSpec_:null
null=@@@userJobConfProps_.get(stream.shipped.hadoopstreaming
packageJobJar: [] [/home/hadoop/hadoop-0.15.1/contrib/hadoop-0.15.1-streaming.jar] /tmp/streamjob26099.jar tmpDir=null
08/03/04 19:05:40 INFO mapred.FileInputFormat: Total input paths to process : 1
08/03/04 19:05:41 INFO streaming.StreamJob: getLocalDirs(): [/home/msh/data/filesystem/mapred/local]
08/03/04 19:05:41 INFO streaming.StreamJob: Running job: job_200803031752_0004
08/03/04 19:05:41 INFO streaming.StreamJob: To kill this job, run:
08/03/04 19:05:41 INFO streaming.StreamJob: /home/msh/hadoop/bin/../bin/hadoop job -Dmapred.job.tracker=192.168.2.92:9001 -kill job_200803031752_0004
08/03/04 19:05:41 INFO streaming.StreamJob: Tracking URL: http://hadoop-master:50030/jobdetails.jsp?jobid=job_200803031752_0004
08/03/04 19:05:42 INFO streaming.StreamJob: map 0% reduce 0%
08/03/04 19:05:48 INFO streaming.StreamJob: map 33% reduce 0%
08/03/04 19:05:49 INFO streaming.StreamJob: map 100% reduce 0%
08/03/04 19:05:52 INFO streaming.StreamJob: map 100% reduce 100%
08/03/04 19:05:52 INFO streaming.StreamJob: Job complete: job_200803031752_0004
08/03/04 19:05:52 INFO streaming.StreamJob: Output: python-output
当Job提交后我们还能够在web的界面http://localhost:50030/看到每一个工作的运行情况。
当Job工作完成后我们能够在c-output和python-output看到一些文件
bin/hadoop dfs -ls c-output
输入下面的命令我们能够看到我们运行完MapReduce的结果
bin/hadoop dfs -cat c-output/part-00000
用Hadoop Streaming运行MapReduce会比较用Java的代码要慢,因为有两方面的原因:
使用 Java API >> C Streaming >> Perl Streaming 这样的一个流程运行会阻塞IO.
不像Java在运行Map后输出结果有一定数量的结果集就启动Reduce的程序,用Streaming要等到所有的Map都运行完毕后才启动Reduce
如果用Python编写MapReduce的话,另一个可选的是使用Jython来转编译Pyhton为Java的原生码.另外对于C程序员更好的选择是使用Hadoop新的C++ MapReduce API Pipes来编写.不管怎样,毕竟Hadoop提供了一种不使用Java来进行分布式运算的方法.
下面是从http://www.lunchpauze.com/2007/10/writing-hadoop-mapreduce-program-in-php.html页面中摘下的用php编写的MapReduce程序,供php程序员参考:
Map: mapper.php
#!/usr/bin/php
$word2count = array();
// input comes from STDIN (standard input)
while (($line = fgets(STDIN)) !== false) {
// remove leading and trailing whitespace and lowercase
$line = strtolower(trim($line));
// split the line into words while removing any empty string
$words = preg_split('/\W/', $line, 0, PREG_SPLIT_NO_EMPTY);
// increase counters
foreach ($words as $word) {
$word2count[$word] += 1;
}
}
// write the results to STDOUT (standard output)
// what we output here will be the input for the
// Reduce step, i.e. the input for reducer.py
foreach ($word2count as $word => $count) {
// tab-delimited
echo $word, chr(9), $count, PHP_EOL;
}
?>
Reduce: mapper.php
#!/usr/bin/php
$word2count = array();
// input comes from STDIN
while (($line = fgets(STDIN)) !== false) {
// remove leading and trailing whitespace
$line = trim($line);
// parse the input we got from mapper.php
list($word, $count) = explode(chr(9), $line);
// convert count (currently a string) to int
$count = intval($count);
// sum counts
if ($count > 0) $word2count[$word] += $count;
}
// sort the words lexigraphically
//
// this set is NOT required, we just do it so that our
// final output will look more like the official Hadoop
// word count examples
ksort($word2count);
// write the results to STDOUT (standard output)
foreach ($word2count as $word => $count) {
echo $word, chr(9), $count, PHP_EOL;
}
?>
作者:马士华 发表于:2008-03-05
推荐律师服务:
若未解决您的问题,请您详细描述您的问题,通过百度律临进行免费专业咨询