如何将MapReduce转换成Spark

 我来答
hs...o@163.com
2017-06-14
知道答主
回答量:47
采纳率:0%
帮助的人:9.1万
展开全部
MapReduce 转换到 Spark
Spark 是类似于 MapReduce 的计算引擎,它提出的内存方式解决了 MapReduce 存在的读取磁盘速度较慢的困难,此外,它基于 Scala 的函数式编程风格和 API,进行并行计算时效率很高。

于 Spark 采用的是 RDD(弹性分布式结果集) 方式对数据进行计算,这种方式与 MapReduce 的 Map()、Reduce()
方式差距较大,所以很难直接使用 Mapper、Reducer 的 API,这也是阻碍 MapReduce 转为 Spark 的绊脚石。
Scala 或者 Spark 里面的 map() 和 reduce() 方法与 Hadoop MapReduce 里面的 map()、reduce() 方法相比,Hadoop
MapReduce 的 API 更加灵活和复杂,下面列出了 Hadoop MapReduce 的一些特性:
Mappers 和 Reducers 通常使用 key-value 键值对作为输入和输出;
一个 key 对应一个 Reducer 的 reduce;
每一个 Mapper 或者 Reducer 可能发出类似于 0,1 这样的键值对作为每一次输出;
Mappers 和 Reducers 可能发出任意的 key 或者 value,而不是标准数据集方式;
Mapper 和 Reducer 对象对每一次 map() 和 reduce() 的调用都存在生命周期。它们支持一个 setup() 方法和 cleanup() 方法,这些方法可以被用来在处理批量数据之前的操作。
试想这么一个场景,我们需要计算一个文本文件里每一行的字符数量。在 Hadoop
MapReduce 里,我们需要为 Mapper 方法准备一个键值对,key 用作行的行数,value 的值是这一行的字符数量。
清单 9.
MapReduce 方式 Map 函数
public class LineLengthCountMapper
extends Mapper<LongWritable,Text,IntWritable,IntWritable> {
@Override
protected void map(LongWritable lineNumber, Text line, Context context)
throws IOException, InterruptedException {
context.write(new IntWritable(line.getLength()), new IntWritable(1));
}
}

清单 9 所示代码,由于 Mappers 和 Reducers 只处理键值对,所以对于类
LineLengthCountMapper 而言,输入是 TextInputFormat 对象,它的 key 由行数提供,value
就是该行所有字符。换成 Spark 之后的代码如清单 10 所示。
清单 10. Spark 方式 Map 函数
lines.map(line => (line.length, 1))

在 Spark 里,输入是弹性分布式数据集 (Resilient Distributed
Dataset),Spark 不需要 key-value 键值对,代之的是 Scala 元祖 (tuple),它是通过 (line.length,
1) 这样的 (a,b) 语法创建的。以上代码中 map() 操作是一个 RDD,(line.length,
1) 元祖。当一个 RDD 包含元祖时,它依赖于其他方法,例如 reduceByKey(),该方法对于重新生成 MapReduce 特性具有重要意义。
清单 11 所示代码是 Hadoop MapReduce 统计每一行的字符数,然后以 Reduce 方式输出。
清单 11.
MapReduce 方式 Reduce 函数
public class LineLengthReducer
extends Reducer<IntWritable,IntWritable,IntWritable,IntWritable> {
@Override
protected void reduce(IntWritable length, Iterable<IntWritable> counts, Context context)
throws IOException, InterruptedException {
int sum = 0;
for (IntWritable count : counts) {
sum += count.get();
}
context.write(length, new IntWritable(sum));
}
}

Spark 里面的对应代码如清单 12 所示。
清单 12.
Spark 方式 Reduce 函数
val lengthCounts = lines.map(line => (line.length, 1)).reduceByKey(_ + _)

Spark 的 RDD API 有一个 reduce() 方法,它会 reduce 所有的 key-value 键值对到一个独立的 value。
我们现在需要统计大写字母开头的单词数量,对于文本的每一行而言,一个 Mapper 可能需要统计很多个键值对,代码如清单 13 所示。
清单 13.
MapReduce 方式计算字符数量
public class CountUppercaseMapper
extends Mapper<LongWritable,Text,Text,IntWritable> {
@Override
protected void map(LongWritable lineNumber, Text line, Context context)
throws IOException, InterruptedException {
for (String word : line.toString().split(" ")) {
if (Character.isUpperCase(word.charAt(0))) {
context.write(new Text(word), new IntWritable(1));
}
}
}
}

在 Spark 里面,对应的代码如清单 14 所示。
清单 14. Spark 方式计算字符数量
lines.flatMap(
_.split(" ").filter(word => Character.isUpperCase(word(0))).map(word => (word,1))
)

MapReduce 依赖的 Map
方法这里并不适用,因为每一个输入必须对应一个输出,这样的话,每一行可能占用到很多的输出。相反的,Spark 里面的 Map
方法比较简单。Spark
里面的方法是,首先对每一行数据进行汇总后存入一个输出结果物数组,这个数组可能是空的,也可能包含了很多的值,最终这个数组会作为一个 RDD
作为输出物。这就是 flatMap() 方法的功能,它对每一行文本里的单词转换成函数内部的元组后进行了过滤。
在 Spark 里面,reduceByKey() 方法可以被用来统计每篇文章里面出现的字母数量。如果我们想统计每一篇文章里面出现的大写字母数量,在 MapReduce 里程序可以如清单 15 所示。
清单 15. MapReduce 方式
public class CountUppercaseReducer
extends Reducer<Text,IntWritable,Text,IntWritable> {
@Override
protected void reduce(Text word, Iterable<IntWritable> counts, Context context)
throws IOException, InterruptedException {
int sum = 0;
for (IntWritable count : counts) {
sum += count.get();
}
context.write(new Text(word.toString().toUpperCase()), new IntWritable(sum));
}
}

在 Spark 里,代码如清单 16 所示。
清单 16. Spark 方式
groupByKey().map { case (word,ones) => (word.toUpperCase, ones.sum) }

groupByKey() 方法负责收集一个 key 的所有值,不应用于一个 reduce 方法。本例中,key 被转换成大写字母,值被直接相加算出总和。但这里需要注意,如果一个 key 与很多个 value 相关联,可能会出现 Out
Of Memory 错误。
Spark 提供了一个简单的方法可以转换 key 对应的值,这个方法把 reduce 方法过程移交给了 Spark,可以避免出现 OOM 异常。
reduceByKey(_ + _).map { case (word,total) => (word.toUpperCase,total)
}
setup() 方法在 MapReduce 里面主要的作用是在 map 方法开始前对输入进行处理,常用的场景是连接数据库,可以在 cleanup() 方法中释放在 setup() 方法里面占用的资源。
清单 17. MapReduce 方式
public class SetupCleanupMapper extends Mapper<LongWritable,Text,Text,IntWritable> {
private Connection dbConnection;
@Override
protected void setup(Context context) {
dbConnection = ...;
}
...
@Override
protected void cleanup(Context context) {
dbConnection.close();
}
}
推荐律师服务: 若未解决您的问题,请您详细描述您的问题,通过百度律临进行免费专业咨询

为你推荐:

下载百度知道APP,抢鲜体验
使用百度知道APP,立即抢鲜体验。你的手机镜头里或许有别人想知道的答案。
扫描二维码下载
×

类别

我们会通过消息、邮箱等方式尽快将举报结果通知您。

说明

0/200

提交
取消

辅 助

模 式