Spark Transfrom算子总结 (一)
map算子
基本的转换算子,对数据的map处理。分区内数据有序,不同分区数据无序,分区数据处理互不干扰。数据串行操作。性能比较低。
package com.it.spark.core.rdd
import org.apache.spark.{SparkConf, SparkContext}
object Transfrom_Action_RDD {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[2]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1, 2, 3, 4),2)
rdd.map(_*2).collect().foreach(println)
}
}
mapPartitions算子
对数据具有缓冲区功能,计算完每个分区后再进行下一个算子操作,原始的数据List不会被清空,但是
再对于数据量大,但是内存量小下的时候可能会发生bug。分区内数据批量操作,需要传入一个迭代器,对数据可能有增加或者减少的操作。性能比较高。但是可能会发生内存溢出(mapPartition算子会长时间占用内存)。
package com.it.spark.core.rdd
import org.apache.spark.{SparkConf, SparkContext}
object Transfrom_Action_RDD {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[2]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1, 2, 3, 4),2)
rdd.mapPartitions{
iter => {
iter.map(
x => x * 2
)
}
}.collect().foreach(println)
}
}
分区最大值
package com.it.spark.core.rdd
import org.apache.spark.{SparkConf, SparkContext}
object Transfrom_Action_RDD {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[2]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1, 2, 3, 4),2)
rdd.mapPartitions{
iter => {
List(iter.max).iterator
}
}.collect().foreach(println)
}
}
mapPartitionsWithIndex算子
可以获取到数据的分区。
package com.it.spark.core.rdd
import org.apache.spark.{SparkConf, SparkContext}
object Transfrom_Action_RDD {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[2]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)
rdd.mapPartitionsWithIndex {
(index, iter) => {
if (index == 1) {
iter
} else {
Nil.iterator
}
}
}.collect().foreach(println)
}
}
flatMap算子
数据的扁平化操作,贴个wordcount。
package com.it.spark.core.wc
import org.apache.spark.{SparkConf, SparkContext}
object WordCount {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[2]").setAppName("WprdCount")
val sc = new SparkContext(sparkConf)
val value = sc.textFile("datas")
value.flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_+_).collect().foreach(println)
sc.stop()
}
}
glom算子
将通过分区的数据直接转换为一样类型的内存数组进行处理,分区不变。
package com.it.spark.core.rdd
import org.apache.spark.{SparkConf, SparkContext}
object Transfrom_Action_RDD {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[2]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)
rdd.glom().collect().foreach(
data => println(data.mkString(","))
)
sc.stop()
}
}
groupBy算子
将数据根据指定的规则进行分组,分区默认不变,但是数据会被打乱重新组合,我们称之为shuffle,极限情况下,数据可能会被分在同一个分区。
package com.it.spark.core.rdd
import org.apache.spark.{SparkConf, SparkContext}
object Transfrom_Action_RDD {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[2]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)
rdd.groupBy(x => x % 2).collect().foreach(println)
sc.stop()
}
}
filter算子
将数据根据指定的规则进行筛选过滤,符合规则的保留,不符合的过滤,分区不变,但是分区内数据可能不均衡,可能会出现数据倾斜。
package com.it.spark.core.rdd
import org.apache.spark.{SparkConf, SparkContext}
object Transfrom_Action_RDD {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[2]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)
rdd.filter(x => x % 2 !=0 ).collect().foreach(println)
sc.stop()
}
}
sample算子
按照指定规则抽取数据
函数签名:
/**
* Return a sampled subset of this RDD.
*
* @param withReplacement can elements be sampled multiple times (replaced when sampled out)
* @param fraction expected size of the sample as a fraction of this RDD s size
* without replacement: probability that each element is chosen; fraction must be [0, 1]
* with replacement: expected number of times each element is chosen; fraction must be greater
* than or equal to 0
* @param seed seed for the random number generator
*
* @note This is NOT guaranteed to provide exactly the fraction of the count
* of the given [[RDD]].
*/
def sample(
withReplacement: Boolean, //抽取数据后是否将数据返回 (true)返回,false(丢弃)
fraction: Double, //抽取不放回每条数据可能被抽取的概率 有基准值 抽取放回:数据源中每条数据被抽取的可能次数
seed: Long = Utils.random.nextLong): RDD[T] = {
require(fraction >= 0,
s"Fraction must be nonnegative, but got ${fraction}") //抽取数据是随机算法的种子 可以不写,默认是系统时间
withScope {
require(fraction >= 0.0, "Negative fraction value: " + fraction)
if (withReplacement) {
new PartitionwiseSampledRDD[T, T](this, new PoissonSampler[T](fraction), true, seed)
} else {
new PartitionwiseSampledRDD[T, T](this, new BernoulliSampler[T](fraction), true, seed)
}
}
}
代码:
package com.it.spark.core.rdd
import org.apache.spark.{SparkConf, SparkContext}
object Transfrom_Action_RDD {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[2]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1, 2, 3, 4,5,6,7,8,9,10), 2)
rdd.sample(
true,
0.4,
2
).collect().foreach(println)
sc.stop()
}
}
distinct算子
将数据聚焦的数据去重
去重原理:
partitioner match {
case Some(_) if numPartitions == partitions.length =>
mapPartitions(removeDuplicatesInPartition, preservesPartitioning = true)
case _ => map(x => (x, null)).reduceByKey((x, _) => x, numPartitions).map(_._1)
}
代码实例:
package com.it.spark.core.rdd
import org.apache.spark.{SparkConf, SparkContext}
object Transfrom_Action_RDD {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[2]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1, 2, 3, 4,1,2,3,4), 2)
rdd.distinct().collect().foreach(println)
sc.stop()
}
}
coalesce算子
将数据量缩减分区,用于大数据集过滤后,提高小数据集的执行效率。
在spark程序中,存在过多小任务的时候,可以通过coalesce方法,收缩合并分区,减少分区的个数,减少任务调度成本。
coalesce默认不会把原来的分区数据打乱重新组合,可能会出现数据倾斜。
package com.it.spark.core.rdd
import org.apache.spark.{SparkConf, SparkContext}
object Transfrom_Action_RDD {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[2]").setAppName("RDD")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1, 2, 3, 4), 4)
rdd.coalesce(2).saveAsTextFile("output")
sc.stop()
}
}
源码如下:
def coalesce(numPartitions: Int, shuffle: Boolean = false,
partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
(implicit ord: Ordering[T] = null)
: RDD[T] = withScope {
如果需要避免数据倾斜,则传入第二个参数为true。coalesce是可以扩大分区的,但是要进行shuffle操作,否则没有意义。
repartition算子
spark不推荐使用coalesce算子扩大分区,可以使用repartition算子。源码实现是coalesce。
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
coalesce(numPartitions, shuffle = true)
}
© 版权声明
文章版权归作者所有,未经允许请勿转载。
相关文章
暂无评论...