Spark核心技术,运行流以及SparkRDD基本

1. 什么是Spark

Spark 以前是一个单一的内存计算机框架,发展至今成为一个独立的生态系统

现在基本每年都会在北京,深圳开设一些关于Spark的相关圆桌会议

相对于Spark来说,有一个相似的计算机框架,Storm。Storm是一个纯粹的实时计算框架

Spark支持离线计算,通过SQL的方式进行数据处理、分析

此外也具备Streaming的准实时计算机方式。

Spark能替代Hadoop吗?

不能, Spark不能代替Hadoop的主要原因,不是在于技术框架, 而是在于业务场景。

Spark很快?(不算Streaming)

相对于Hadoop, Hadoop跑1时间,Spark10分钟、 Hadoop跑1天,Spark跑1小时

1.1 Spark集群概述

img

角色介绍

  1. Dirver Program
    • 编写的Spark程序
  2. SparkContext
    • 在每个Driver Program中都会存在一个SparkContext
    • SparkContext主要用于跟Spark集群进行交互(申请计算资源,配置我们任务调度机制…)
  3. Cluster Manager
    • 管理Spark集群以及与Driver Program中的SparkContext进行交互
  4. Worker Node
    • 就是一台Spark集群中的计算机节点,通常与DataNode是同一台(数据本地化)
    • 每台Worker都具有一定的Cpu Cores和Memory
  5. Executor
    • Executor是一个进程,是Spark集群中,资源分配的最小单元
    • 一个Driver Program 会拥有多个Executor
    • 一个Worker Node下会产生多个Executor
    • 但Executor本身只做任务的管理与调度
  6. Task
    • Task是一个线程, Driver Program中的每一个RDD操作都会转换成一个Task任务
    • Driver Program会将一批Task发送到Excutor中
    • 由Excutor来管理执行Task,将结果反馈给SparkContext中

2 什么是RDD

2.1 RDD(弹性分布式数据集)

  1. 不可变
  2. 只读
  3. 具有自动容错恢复机制(DAG,有向无环图)
  4. 具有优先选择数据处理节点位置的机制(计算本地化)
  5. 本身并不是数据集,而一个并行处理数据集的工具

2.2 内部每个RDD有以下主要的五个属性

2.3 RDD两种数据集创建方式

  1. 并行化集合
    • 参考主机的cpu core的数量
    • 划分的分区数一般是 cpu core的两倍
  2. 外部数据集,例如HDFSAWS , S3
    • 默认情况下,HDFS上文件对应的BLOCK数量有多少,Spark就有多少Partition
    • 可以大于当前BLOCK数量 的partition,但最低不能低于HDFS上的block数量。

2.4 RDD动作

  1. Transformation
    • 主要是多个RDD之间数据的转换 (Lazy)
  2. Action
    • 主要的用于最终的数据聚合,数据转换后的计算结果
    • 每次调用一个Action操作时, 每一个RDD都会重新被计算

只有在执行Action时,才会触发Spark的任务调度与计算, Transformation是一个Lazy操作。

2.5 RDD 缓存、持久化(persist)

Spark非常快速的原因之一,就是在不同的操作中在内存中持久化一个数据集。

当持久化一个数据集后,每个节点都将把计算的分片结果,保存在内存中,并在此数据集后面进行的(action)操作,会对其进行重用。这使用后续动作变得更加迅速(通常快10倍)。

假设首先进行RDD0 -> RDD1 -> RDD2的计算作业,那么计算结束时,RDD1就已经缓存在系统中。在进行RDD0 -> RDD1 -> RDD3的作用时,由于RDD1已经缓存在系统中,只需进行RDD1 -> RDD3,因此计算速度可以得到很大提升。

img

2.5.1 两种持久化方法
2.5.2 Spark持久化级别
持久化级别 含义解释
MEMORY_ONLY 默认级别,如果cache中的元素无法全部缓存到内存中,则丢失一部分。 有一部分数据还是会重新计算(数据无法完全存储到内存中)。数据没有经过序列化
MEMORY_AND_DISK 优先将数据存储到内存中。如果内存不够,就将数据存储到磁盘。
MEMORY_ONLY_SER 为了节省更多空间,我们可以先数据序列化,然后在存储在内存中。推荐使用Kryo serialization进行序列化。
MEMORY_AND_DISK_SER 基本含义与MEMORY_AND_DISK相同。唯一的区别是, 会将RDD的数据进行序列化。
DISK_ONLY 使用未序列化Java对象格式,将数据存储到磁盘文件中
MEMORY_ONLY_2 cache的数据会具有多少个副本
2.5.3 这么多持久化,哪家强(应该使用哪种)?

Spark的优势在于内存,如果还向磁盘上写入数据,那么会成为Spark版的Hadoop。

不要往磁盘上进行Cache上,磁盘的读写速度还不如在内存中重新做计算。

2.6 缓存的使用场景?

  1. 获取大量数据之后,比如来自外部数据源
  2. 进行一个非常长的计算链条
  3. 某个计算非常耗时
  4. 进行checkpoint之前
2.6.1 如何选择持久化策略?
  1. 优先使用MEMORY_ONLY

  2. 内存放不下就使用MEMORY_ONLY_SER,但会额外消耗CPU资源

  3. 容错恢复使用MEMORY_ONLY_2
  4. 尽量不适用DISK相关策略

2.7 RDD Checkpoint(检查点)

Checkpoint 的引入: 如果缓存丢失了,则需要重新计算。但有的时候,计算特别复杂或计算特别耗时,那么缓存丢失对于整个Job是影响是非常大的。

Checkpoint机制:计算完成后,重新建立一个Job来计算。为了避免重复计算,推荐先将RDD进行缓存。

2.8 Shared Variables

  1. Broadcast Variables(广播变量)
    • 广播变量就是一个全局只读的变量或集合。
    • 因为我们没有必要把每一个相关变量都往我们的Executor进行拷贝
  2. Accumulators(累加器)
    • 定义个全局的累加变量(只能做累加),用于所有的Executor执行全局累加操作

3 RDD 的转换和DAG生成

3.1 缓存的处理

如果存储级别不是NONE,那么先检查是否在缓存,没有则进行计算。

缓存的级别是什么?

RDD的第个Partition对应Storage模块的一个Block,Block是Partition处理后的数据。

3.1.1 WordCount 流程
val conf = new SparkConf().setAppName("WordCount").setMaster("local")
val sc = new SparkContext(conf)
val textFile = sc.textFile("C:\\Users\\13194\\Desktop\\test.txt")
val wordsRdd = textFile.flatMap(_.split(" "))
val wordsCountRdd = wordsRdd.map((_, 1)).reduceByKey(_+_)
wordsCountRdd.collect().foreach(println)

img

3.2 RDD 依赖关系

img

所有的依赖都要实现trait Dependency[T]:

abstract class Dependency[T] extends Serializable {
  def rdd: RDD[T]
}

窄依赖的具体实现:

abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
  // 返回子RDD的partitionID依赖的所有Parent RDD的Partition(s)
  def getParents(partitionId: Int): Seq[Int]
  override def rdd: RDD[T] = _rdd
}

两种窄依赖的具体实现

  1. OneToOneDependency :一对一依赖
// RDD仅仅依赖于parentRDD相同ID的partition
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
  override def getParents(partitionId: Int) = List(partitionId)
}
  1. RangeDependency :范围依赖,它仅仅被org.apache.spark.rdd.UnionRDD使用
// Union仅仅是将多个RDD合并成一个RDD,每个Parent RDD中的Partition相对顺序不会变
// 只不过每个Parent RDD在Union RDD中的partion的起始位置不同
  override def getParents(partitionId: Int) = {
    // inStart parentRDD的Partion中的起始位置
    // outStart 是UnionRDD中的起始位置
    // length parentRDD中的Partition的数量
    if (partitionId >= outStart && partitionId < outStart + length) {
      List(partitionId - outStart + inStart)
    } else {
      Nil
    }
  }

宽依赖的实现: ShuffleDependency

class ShuffleDependency[K, V, C](
    @transient _rdd: RDD[_ <: Product2[K, V]],
    val partitioner: Partitioner,
    val serializer: Option[Serializer] = None,
    val keyOrdering: Option[Ordering[K]] = None,
    val aggregator: Option[Aggregator[K, V, C]] = None,
    val mapSideCombine: Boolean = false)
  extends Dependency[Product2[K, V]] {

  override def rdd = _rdd.asInstanceOf[RDD[Product2[K, V]]]

  val shuffleId: Int = _rdd.context.newShuffleId()

  val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
    shuffleId, _rdd.partitions.size, this)

  _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
}

宽依赖支持两种Shuffle Manager

3.2.1 RDD Lineages(生命线,血统)
3.3.2 Transfermations 常见的窄依赖

它们只是将Parition的数据根据规则进行转化,并不做其它处理

只是将数据从一个形式转化成另一种形式

3.2.3 Transfermations 常见的宽依赖

对于groupByKey 子RDD所有Partition会依赖于 parent RDD的所有Partition

子RDD的Partition是parent RDD的所有Partition Shuffle的结果

3.2.4 怎么分辨 Narrow Or Wide Dependency

查看RDD依赖关系 option.dependenices

查看RDD生命线 option.toDebugString

3.2.5 DAG的生成

原始的RDD(S)通过一系列转换就形成了DAG

RDD之间的依赖关系包含RDD由哪些 parent RDD(S)转换而来和它依赖parent RDD(S)的哪些partitions

Spark是如何根据DAG来生成计算任务呢?

3.2.6 RDD 的容错机制

img

如果某个RDD计算错误,子RDD则找其父RDD,重新计算该RDD,如果此时是宽依赖,则会对集群性能造成很大影响。

此时解决的办法,是将数据持久化。

持久化两种机制:

  1. 内存持久(cache)
  2. 检查点(Checkpoint, 磁盘持久化)

4 RDD的计算

4.1 Task简介

Spark的Task分两种:

  1. org.apache.spark.scheduler.ShuffleMapTask
  2. org.apache.spark.scheduler.ResultTask

DAG的最后一个阶段会为每个结果和Partition生成一个ResultTask,其余所有阶段都会生成ShuffleMapTask。生成的Task会被发送到已经启动的Executor上,由Executor来完成计算任务。

4.2 Task执行起点

org.apache.spark.scheduler.Task#run 会调用ShuffleMapTask或者ResultTaskrunTaskrunTask则会调用RDD的org.apache.spark.rdd.RDD#iterator

package org.apache.spark.rdd.RDD
final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
    if (storageLevel != StorageLevel.NONE) {
      //如果存储级别不是None,那么先检查是否有缓存;没有则开始计算
      SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
    } else {
      computeOrReadCheckpoint(split, context)
    }
  }

在运行时节点所需要的环境信息SparkEnv中, cache-Manager是org.apache.spark.CacheManager, 它负责调用CacheManager来管理RDD缓存,缓存过的RDD接下的运算都可以通过BlockManager来直接读取缓存后返回。

4.3 缓存的处理

  1. cacheManager会通过RDD的ID和当前计算Partition的ID向Storages模块的BlockManager发送请求,
  2. 如果获取到Block信息则直接返回。
  3. 否则,代表该RDD是需要计算的。这个RDD可能是被计算过并被存储到内存中,但是后来内存紧张,这部分被清理了。
  4. 计算结束后,会根据用户定义存储级别,写入BlockManager中。下次计算就可以不经过计算直接读取RDD,提高性能。
package org.apache.spark.CacheManager#getOrCompute
def getOrCompute[T](
      rdd: RDD[T],
      partition: Partition,
      context: TaskContext,
      storageLevel: StorageLevel): Iterator[T] = {

    val key = RDDBlockId(rdd.id, partition.index)
    logDebug(s"Looking for partition $key")
    blockManager.get(key) match { //向blockManager块中是否有缓存
      case Some(blockResult) =>  //缓存命中
        // 更新统计信息, 将缓存作为结果返回
        val inputMetrics = blockResult.inputMetrics
        val existingMetrics = context.taskMetrics
          .getInputMetricsForReadMethod(inputMetrics.readMethod)
        existingMetrics.incBytesRead(inputMetrics.bytesRead)
			
        val iter = blockResult.data.asInstanceOf[Iterator[T]]
        new InterruptibleIterator[T](context, iter) {
          override def next(): T = {
            existingMetrics.incRecordsRead(1)
            delegate.next()
          }
        }
      case None => // 没有缓存命中,则需要计算
        // 加载分区需要获得锁,如果在其它的线程,正持有该锁,则等待它结束,释放该锁。
  		// 判断当前是否有线程正在处理当前的Partition, 如果有则等待它结束后,直接从BlockManage
        // 中读取处理结果,如果没有,则storedValues为None。
        val storedValues = acquireLockForPartition[T](key)
        if (storedValues.isDefined) { //已经被其它线程处理
          return new InterruptibleIterator[T](context, storedValues.get)
        }
        // 需要加载自己的分区,即需要计算
        try {
        // 如果在checkpoint中存在数据,则直接读取。否则调用rdd的compute()开始计算
           val computedValues = rdd.computeOrReadCheckpoint(partition, context)
	     // 如果数据在本地(Driver端),则不要缓存结果。这个主要是为了first, take
         // 这类任务没有shuffle阶段,直接运行在Driver端可能更省时间。 
          if (context.isRunningLocally) {
            return computedValues
          }
          // 将计算机结果写入到BlockManager
          val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
          val cachedValues = putInBlockManager(key, computedValues, storageLevel, updatedBlocks)
          // 更新任务的统计信息
          val metrics = context.taskMetrics
          val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]())
          metrics.updatedBlocks = Some(lastUpdatedBlocks ++ updatedBlocks.toSeq)
          new InterruptibleIterator(context, cachedValues)

        } finally {
          loading.synchronized {
            loading.remove(key)
          	// 如果有其它任务正在等待该Partition的处理结果,那么通知它们
            // 结果已经存在BlockManager中(释放锁)
            // (注意:前面的类不会写入到BlockManager的本地任务)
            loading.notifyAll()
          }
        }
    }
  }

4.4 checkpoint的处理

如果缓存没有命中,首先会判断是否保存了RDD的Checkpoint,如果有则读取Checkpoint。

那么checkpoint的数据是如何写入的呢?

  1. 在job结束后,会判断是否需要checkpoint。
  2. 如果需要就调用org.apache.spark.rdd.RDDCheckpointData#doCheckpoint
  3. doCheckpoint首先会创建一个文件目录
  4. 然后,启动一个job来计算,并将计算结果写入新建的目录
  5. 接着创建一个org.apache.spark.rdd.CheckpointRDD
  6. 原始RDD的所有依赖被清除。RDD转换计算链(compute chain)等信息被全部清除。

核心逻辑

package org.apache.spark.rdd.RDDCheckpointData
 // 创建一个保存checkpoint数据的目录
val path = new Path(rdd.context.checkpointDir.get, "rdd-" + rdd.id)
val fs = path.getFileSystem(rdd.context.hadoopConfiguration)
if (!fs.mkdirs(path)) {
    throw new SparkException("Failed to create checkpoint path " + path)
}
// 创建广播变量
val broadcastedConf = rdd.context.broadcast(
    new SerializableWritable(rdd.context.hadoopConfiguration))
// 开始一个新的job进行计算,计算结果存入path路径
rdd.context.runJob(rdd, CheckpointRDD.writeToFile[T](path.toString, broadcastedConf) _)
val newRDD = new CheckpointRDD[T](rdd.context, path.toString)

// 保存结果,清除原始RDD的依赖、Partition信息等 
RDDCheckpointData.synchronized {
    cpFile = Some(path.toString)
    cpRDD = Some(newRDD) //RDDCheckpointData对应的ChecpointRDD
    // 底层调用的clearDependencies()
    rdd.markCheckpointed(newRDD)   // 清除原始的依赖、parition
    cpState = Checkpointed //mark checkpoint的状态为完成
}

清除了RDD的依赖后,并没有和CheckpointRDD建立联系。Spark是如何确定一个RDD是否被Checkpoint了,d而且正确读取checkpoint的数据呢?

/** An Option holding our checkpoint RDD, if we are checkpointed */
private def checkpointRDD: Option[RDD[T]] = checkpointData.flatMap(_.checkpointRDD)
final def dependencies: Seq[Dependency[_]] = {
    checkpointRDD.map(r => List(new OneToOneDependency(r))).getOrElse {
        if (dependencies_ == null) { // 没有checkpoint
            dependencies_ = getDependencies
        }
        dependencies_
    }
}

在上述中,有两个地方调用了computeOrReadCheckpoint

  1. org.apache.spark.rdd.RDD#iterator
  2. org.apache.spark.CacheManager#getOrCompute

computeOrReadCheckpoint主要是检查当前RDD是否被checkpoint是否过,有则直接读取Checkpoint中的数据否则开始计算

 private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] =
  {
    if (isCheckpointed) firstParent[T].iterator(split, context) else compute(split, context)
  }

firstParent[T].iterator(split, context) 最终都会调用它的compute

// org.apache.spark.rdd.CheckpointRDD#compute
override def compute(split: Partition, context: TaskContext): Iterator[T] = {
  val file = new Path(checkpointPath, CheckpointRDD.splitIdToFile(split.index))
  CheckpointRDD.readFromFile(file, broadcastedConf, context) //读取checkpoint的数据
}

4.5 RDD计算逻辑

RDD的计算逻辑在org.apache.spark.rdd.RDD#compute中实现。每个特定的RDD都会实现compute

package org.apache.spark.rdd.RDD#compute
def compute(split: Partition, context: TaskContext): Iterator[T]

5 Understanding closures (理解闭包函数)

var counter = 0
val data = Array(1, 2, 3, 4, 5, 6)
var rdd = sc.parallelize(data)
// Wrong: Don't do this!!
rdd.foreach(x => counter += x)
// 此时输出的结果为0
println("Counter value: " + counter)

img

此时,可以用全局变量解决该问题:

val accum = sc.accumulator(0, "My Accumulator")
sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
accum.value

5 Shuffle operations

不同节点之间的数据传输

在Spark中的某些RDD操作,会触发Shuffle事件。

Shuffle事件一旦产生,就把我们所操作的数据集进行重新分布。

一旦涉及到重新分布,将会耗费大量的IO与网路资源。

ReduceByKey 生产中会遇到的问题

第一列:代表游客的编号 第二列:代表游客旅行地 第三列:代表游客旅行的消费 //统计出每一个用户本月旅行次数以及总消费 (100,2,300)

 val arr2 = Array(
      (100,"Geneva",22.25),
      (100,"Zurich",42.10),
      (200,"Fribourg",12.40),
      (200,"St.Gallen",8.20),
      (300,"Lucerne",31.60),
      (300,"Basel",16.20)
    )
val rdd = sc.parallelize(arr2)
println("次数:" + groupByKeyRDD.count)
groupByKeyRDD.foreach(x => {
    println("ID: " + x._1)
    println("总额:" + x._2.sum)
})

此时就会产生Shuffer:

在进行Shuffle操作前,尽可能的减少每个分区即将进行Shuffle的数据量。(优化技巧)

我现在有上亿条的数据需要进行分组求和、求平均,该如何操作?为什么?

解决方案就是,在执行Shuffle前,尽可能的减少需要交换的数据。

减少Shuffle数据的两种方案?

  1. 选择具有Combiner函数的RDD操作(reduceByKey)

  2. 对业务数据进行数据变换.

    (100,"xxxx",2500)
    (100,(1,2500))
    (2500,(1,100))
    

Shuffle 优化主要目的:

  1. 数据处理(例如:reduceByKey 本地shuffle提速)
  2. RDD容错计算(提速)

6 Spark 经验之谈

如果面对比较复杂数据分析时,通常会将数据转成Tuple

Tuple操作的好处: 列式操作

("class1", 85)
("class1", 100)
("class2", 85)
("class2", 100)