RDD
RDD,全称为Resilient Distributed Datasets(弹性分布式数据集)。简单接触了spark后,萌发了想要深入且透彻的了解RDD到底是什么。
从英文解释来看,它应该是Dataset的集合,且是一个提供了许多操作接口的数据集合。和普通数据集的区别是:
下面摘录了一段对RDD的解释:
所以RDD的两个关键点为:带操作接口的数据集,分布式。
举个例子:
//读取文件创建RDD
val rdd = sc.textFile("README.md").cache()
val numAs = rdd.filter(line => line.contains("a")).count()//包含a的行数
val numBs = rdd.filter(line => line.contains("b")).count()//包含b的行数
println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
由此可见,RDD
前面提到了带操作接口的数据集,操作接口主要是RDD提供了对数据进行映射,归一等操作的API接口。这无疑方便了对分布式环境中的数据整合操作。操作分为两类:转换(transformations)和行动(actions)。前者根据原有的RDD创建一个新的RDD,后者则操作结果返回给driver。
以前总是不理解操作分成两类的原因,直接每次操作后都返回结果不可以么?解释是这样的,首先考虑到有些操作(如filter)过后,返回结果的数据量较大,而有些操作(如count)过后,返回的结果却很简洁。spark设计时,将转换操作设置为lazy的,即不会立即计算结果,只有遇到action,才会处理并返回结果。这样会使spark更高效。
转换类的操作有:map, filter, flatMap, mapPartitions, mapPartitionsWithIndex, sample, union, intersection, distinct, groupByKey, reduceByKey, aggregateByKey, sortByKey, join, cogroup, cartesian, pipe, coalesce, repartition。
行动类的操作有:reduce, collect, count, first, take, takeSample, takeOrdered, saveAsTextFile, saveAsSequenceFile, saveAsObjectFile
相关操作函数如下表:
val rdd1 = sc.parallelize(1 to 9, 3)
val rdd2 = sc.parallelize(list("dog","tiger","lion","cat","parther","eagle"),2)
val rdd3 = sc.parallelize(List(1,2),(3,4),(3,6)))
函数名 | 描述 | 输入与输出 | Code |
---|---|---|---|
map | 对RDD中的每个元素执行一个指定的函数来产生一个新的RDD,映射函数的参数为RDD中的每一个元素 | 一对一 | val a = rdd1.map(x => x*2) |
mapValues | 对象是元素为kv对的RDD,key保持不变,对value进行映射 | 一对一 | val b = rdd2.map(x => (x.length, x)) b.mapValues(“x” + _ + “x”).collect |
flatMap | 扁平化map后的结果,多个集合合并为一个集合 | 一对一 | |
mapPartitions | 映射函数的参数为RDD中每一个分区的迭代器 | 一对一 | 可用于为RDD中数据按分区创建连接 |
reduce | 将RDD中元素两两传递给输入函数,产生一个新值,新产生的值与RDD中的下一个元素再被传递给输入函数,直到最后只有一个值为止。 | 多对一 | rdd1.reduce((x,y) => x+y) //结果为45 |
reduceByKey | 将key相同的元素的value进行reduce,组成一个新的KV对 | 多对一 | rdd3.reduceByKey((x,y) => x+y).collect //结果为Array((1,2),(3,10)) |
filter | 对每个元素应用f函数,只保留返回值为true的元素 | 子集型 | val d = rdd3.filter{ case(x,y) => equal(x,y) }.count() //结果为0 |
sample | 对元素采样,获取子集 | 子集型 | |
cache | 将元素从磁盘缓存到内存 | cache型 | rdd1.cache() |
persist | 对RDD缓存,位置可指定 | cache型 |
([1-9]\d+|[2-9])
通常来讲,针对数据处理有几种常见模型,包括:
Spark将依赖分为narrow与wide。如果RDD的每个分区最多只能被一个Child RDD的一个分区使用,则称之为narrow dependency;若多个Child RDD分区都可以依赖,则称之为wide dependency。不同的操作依据其特性,可能会产生不同的依赖。例如map操作会产生narrow dependency,而join操作则产生wide dependency。
上图说明了narrow和wide之间的区别。 narrow dependencies可以支持在同一个cluster node上以管道形式执行多条命令,而且它的失败恢复更有效,只需要重新计算丢失的parent partition即可;wide dependencies需要所有的父分区都是可用的,牵涉到RDD各级的多个Parent Partitions。
共享变量
val broadcastVar = sc.broadcast("string test") //broadcast variable is readonly
val v = broadcastVar.value
println(v)
val accum = sc.accumulator(0, "My Accumulator") //value and name
sc.parallelize(1 to 1000000).foreach(x => accum+= 1)
println(accum.name + ":" + accum.value)