Spark之RDD

Aug 20, 2016   #spark  #rdd 

RDD

RDD,全称为Resilient Distributed Datasets(弹性分布式数据集)。简单接触了spark后,萌发了想要深入且透彻的了解RDD到底是什么。

从英文解释来看,它应该是Dataset的集合,且是一个提供了许多操作接口的数据集合。和普通数据集的区别是:实际数据分布存储在一批机器的内存或硬盘中。当然RDD还有容错和并行的功能。

下面摘录了一段对RDD的解释:

它是一个容错并行的数据结构,可以让用户显式地将数据存储到磁盘和内存中,并能控制数据的分区。它只能基于在稳定物理存储中的数据集和其他已有的RDD上执行确定性操作(transformation)来创建。

所以RDD的两个关键点为:带操作接口的数据集,分布式。

举个例子:

//读取文件创建RDD
val rdd = sc.textFile("README.md").cache()
val numAs = rdd.filter(line => line.contains("a")).count()//包含a的行数
val numBs = rdd.filter(line => line.contains("b")).count()//包含b的行数
println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))

由此可见,RDD本质是一个只读的分区记录集合。一个RDD包含多个分区,每个分区是一个dataset片段。

前面提到了带操作接口的数据集,操作接口主要是RDD提供了对数据进行映射,归一等操作的API接口。这无疑方便了对分布式环境中的数据整合操作。操作分为两类:转换(transformations)和行动(actions)。前者根据原有的RDD创建一个新的RDD,后者则操作结果返回给driver。

以前总是不理解操作分成两类的原因,直接每次操作后都返回结果不可以么?解释是这样的,首先考虑到有些操作(如filter)过后,返回结果的数据量较大,而有些操作(如count)过后,返回的结果却很简洁。spark设计时,将转换操作设置为lazy的,即不会立即计算结果,只有遇到action,才会处理并返回结果。这样会使spark更高效。

转换类的操作有:map, filter, flatMap, mapPartitions, mapPartitionsWithIndex, sample, union, intersection, distinct, groupByKey, reduceByKey, aggregateByKey, sortByKey, join, cogroup, cartesian, pipe, coalesce, repartition。

行动类的操作有:reduce, collect, count, first, take, takeSample, takeOrdered, saveAsTextFile, saveAsSequenceFile, saveAsObjectFile

相关操作函数如下表:

val rdd1 = sc.parallelize(1 to 9, 3)
val rdd2 = sc.parallelize(list("dog","tiger","lion","cat","parther","eagle"),2)
val rdd3 = sc.parallelize(List(1,2),(3,4),(3,6)))
函数名 描述 输入与输出 Code
map 对RDD中的每个元素执行一个指定的函数来产生一个新的RDD,映射函数的参数为RDD中的每一个元素 一对一 val a = rdd1.map(x => x*2)
mapValues 对象是元素为kv对的RDD,key保持不变,对value进行映射 一对一 val b = rdd2.map(x => (x.length, x)) b.mapValues(“x” + _ + “x”).collect
flatMap 扁平化map后的结果,多个集合合并为一个集合 一对一
mapPartitions 映射函数的参数为RDD中每一个分区的迭代器 一对一 可用于为RDD中数据按分区创建连接
reduce 将RDD中元素两两传递给输入函数,产生一个新值,新产生的值与RDD中的下一个元素再被传递给输入函数,直到最后只有一个值为止 多对一 rdd1.reduce((x,y) => x+y) //结果为45
reduceByKey 将key相同的元素的value进行reduce,组成一个新的KV对 多对一 rdd3.reduceByKey((x,y) => x+y).collect //结果为Array((1,2),(3,10))
filter 对每个元素应用f函数,只保留返回值为true的元素 子集型 val d = rdd3.filter{ case(x,y) => equal(x,y) }.count() //结果为0
sample 对元素采样,获取子集 子集型
cache 将元素从磁盘缓存到内存 cache型 rdd1.cache()
persist 对RDD缓存,位置可指定 cache型

([1-9]\d+|[2-9]) 通常来讲,针对数据处理有几种常见模型,包括:Iterative Algorithms,Relational Queries,MapReduce,Stream Processing。例如Hadoop MapReduce采用了MapReduces模型,Storm则采用了Stream Processing模型。RDD混合了这四种模型,使得Spark可以应用于各种大数据处理场景。

Spark将依赖分为narrow与wide。如果RDD的每个分区最多只能被一个Child RDD的一个分区使用,则称之为narrow dependency;若多个Child RDD分区都可以依赖,则称之为wide dependency。不同的操作依据其特性,可能会产生不同的依赖。例如map操作会产生narrow dependency,而join操作则产生wide dependency。

上图说明了narrow和wide之间的区别。 narrow dependencies可以支持在同一个cluster node上以管道形式执行多条命令,而且它的失败恢复更有效,只需要重新计算丢失的parent partition即可;wide dependencies需要所有的父分区都是可用的,牵涉到RDD各级的多个Parent Partitions。

共享变量

broadcast变量:只读的共享变量 每个节点上都有一个拷贝。

val broadcastVar = sc.broadcast("string test") //broadcast variable is readonly
val v = broadcastVar.value
println(v)

accumulator变量:做累加器用

val accum = sc.accumulator(0, "My Accumulator") //value and name
sc.parallelize(1 to 1000000).foreach(x => accum+= 1)
println(accum.name + ":" + accum.value)

SparkContext