Spark Transfrom算子总结 (一)

map算子

基本的转换算子,对数据的map处理。分区内数据有序,不同分区数据无序,分区数据处理互不干扰。数据串行操作。性能比较低。

package com.it.spark.core.rdd

import org.apache.spark.{SparkConf, SparkContext}

object Transfrom_Action_RDD {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("RDD")
    val sc = new SparkContext(sparkConf)
    val rdd = sc.makeRDD(List(1, 2, 3, 4),2)
    rdd.map(_*2).collect().foreach(println)
  }
}

mapPartitions算子

对数据具有缓冲区功能,计算完每个分区后再进行下一个算子操作,原始的数据List不会被清空,但是
再对于数据量大,但是内存量小下的时候可能会发生bug。分区内数据批量操作,需要传入一个迭代器,对数据可能有增加或者减少的操作。性能比较高。但是可能会发生内存溢出(mapPartition算子会长时间占用内存)。

package com.it.spark.core.rdd

import org.apache.spark.{SparkConf, SparkContext}

object Transfrom_Action_RDD {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("RDD")
    val sc = new SparkContext(sparkConf)
    val rdd = sc.makeRDD(List(1, 2, 3, 4),2)
    rdd.mapPartitions{
      iter => {
        iter.map(
          x => x * 2
        )
      }
    }.collect().foreach(println)
  }

}

分区最大值

package com.it.spark.core.rdd

import org.apache.spark.{SparkConf, SparkContext}

object Transfrom_Action_RDD {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("RDD")
    val sc = new SparkContext(sparkConf)
    val rdd = sc.makeRDD(List(1, 2, 3, 4),2)
    rdd.mapPartitions{
      iter => {
        List(iter.max).iterator
      }
    }.collect().foreach(println)
  }

}

mapPartitionsWithIndex算子

可以获取到数据的分区。

package com.it.spark.core.rdd

import org.apache.spark.{SparkConf, SparkContext}

object Transfrom_Action_RDD {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("RDD")
    val sc = new SparkContext(sparkConf)
    val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)
    rdd.mapPartitionsWithIndex {
      (index, iter) => {
        if (index == 1) {
          iter
        } else {
          Nil.iterator
        }
      }
    }.collect().foreach(println)
  }

}

flatMap算子

数据的扁平化操作,贴个wordcount。

package com.it.spark.core.wc

import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("WprdCount")
    val sc = new SparkContext(sparkConf)
    val value = sc.textFile("datas")
    value.flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_+_).collect().foreach(println)
    sc.stop()
  }

}

glom算子

将通过分区的数据直接转换为一样类型的内存数组进行处理,分区不变。

package com.it.spark.core.rdd

import org.apache.spark.{SparkConf, SparkContext}

object Transfrom_Action_RDD {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("RDD")
    val sc = new SparkContext(sparkConf)
    val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)
    rdd.glom().collect().foreach(
      data => println(data.mkString(","))
    )
    sc.stop()
  }

}

groupBy算子

将数据根据指定的规则进行分组,分区默认不变,但是数据会被打乱重新组合,我们称之为shuffle,极限情况下,数据可能会被分在同一个分区。

package com.it.spark.core.rdd

import org.apache.spark.{SparkConf, SparkContext}

object Transfrom_Action_RDD {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("RDD")
    val sc = new SparkContext(sparkConf)
    val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)
    rdd.groupBy(x => x % 2).collect().foreach(println)
    sc.stop()
  }

}

filter算子

将数据根据指定的规则进行筛选过滤,符合规则的保留,不符合的过滤,分区不变,但是分区内数据可能不均衡,可能会出现数据倾斜。

package com.it.spark.core.rdd

import org.apache.spark.{SparkConf, SparkContext}

object Transfrom_Action_RDD {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("RDD")
    val sc = new SparkContext(sparkConf)
    val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)
    rdd.filter(x => x % 2 !=0 ).collect().foreach(println)
    sc.stop()
  }

}

sample算子

按照指定规则抽取数据
函数签名:

  /**
   * Return a sampled subset of this RDD.
   *
   * @param withReplacement can elements be sampled multiple times (replaced when sampled out)
   * @param fraction expected size of the sample as a fraction of this RDD s size
   *  without replacement: probability that each element is chosen; fraction must be [0, 1]
   *  with replacement: expected number of times each element is chosen; fraction must be greater
   *  than or equal to 0
   * @param seed seed for the random number generator
   *
   * @note This is NOT guaranteed to provide exactly the fraction of the count
   * of the given [[RDD]].
   */
  def sample(
      withReplacement: Boolean, //抽取数据后是否将数据返回 (true)返回,false(丢弃)
      fraction: Double,                  //抽取不放回每条数据可能被抽取的概率  有基准值   抽取放回:数据源中每条数据被抽取的可能次数
      seed: Long = Utils.random.nextLong): RDD[T] = {
    require(fraction >= 0,
      s"Fraction must be nonnegative, but got ${fraction}") //抽取数据是随机算法的种子 可以不写,默认是系统时间

    withScope {
      require(fraction >= 0.0, "Negative fraction value: " + fraction)
      if (withReplacement) {
        new PartitionwiseSampledRDD[T, T](this, new PoissonSampler[T](fraction), true, seed)
      } else {
        new PartitionwiseSampledRDD[T, T](this, new BernoulliSampler[T](fraction), true, seed)
      }
    }
  }

代码:

package com.it.spark.core.rdd

import org.apache.spark.{SparkConf, SparkContext}

object Transfrom_Action_RDD {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("RDD")
    val sc = new SparkContext(sparkConf)
    val rdd = sc.makeRDD(List(1, 2, 3, 4,5,6,7,8,9,10), 2)
    rdd.sample(
      true,
      0.4,
      2
    ).collect().foreach(println)
    sc.stop()
  }

}

distinct算子

将数据聚焦的数据去重
去重原理:

partitioner match {
      case Some(_) if numPartitions == partitions.length =>
        mapPartitions(removeDuplicatesInPartition, preservesPartitioning = true)
      case _ => map(x => (x, null)).reduceByKey((x, _) => x, numPartitions).map(_._1)
    }

代码实例:

package com.it.spark.core.rdd

import org.apache.spark.{SparkConf, SparkContext}

object Transfrom_Action_RDD {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("RDD")
    val sc = new SparkContext(sparkConf)
    val rdd = sc.makeRDD(List(1, 2, 3, 4,1,2,3,4), 2)
    rdd.distinct().collect().foreach(println)
    sc.stop()
  }

}

coalesce算子

将数据量缩减分区,用于大数据集过滤后,提高小数据集的执行效率。
在spark程序中,存在过多小任务的时候,可以通过coalesce方法,收缩合并分区,减少分区的个数,减少任务调度成本。
coalesce默认不会把原来的分区数据打乱重新组合,可能会出现数据倾斜。

package com.it.spark.core.rdd

import org.apache.spark.{SparkConf, SparkContext}

object Transfrom_Action_RDD {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("RDD")
    val sc = new SparkContext(sparkConf)
    val rdd = sc.makeRDD(List(1, 2, 3, 4), 4)
    rdd.coalesce(2).saveAsTextFile("output")
    sc.stop()
  }

}

源码如下:

def coalesce(numPartitions: Int, shuffle: Boolean = false,
               partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
              (implicit ord: Ordering[T] = null)
      : RDD[T] = withScope {

如果需要避免数据倾斜,则传入第二个参数为true。coalesce是可以扩大分区的,但是要进行shuffle操作,否则没有意义。

repartition算子

spark不推荐使用coalesce算子扩大分区,可以使用repartition算子。源码实现是coalesce。

  def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
    coalesce(numPartitions, shuffle = true)
  }

© 版权声明

相关文章

暂无评论

您必须登录才能参与评论!
立即登录
none
暂无评论...