大数据知识体系
首页
数据结构与算法
  • JVM
  • Java
  • Scala
  • Python
设计模式
  • MySQL
  • Redis
  • HDFS
  • HBase
  • ClickHouse
  • ElasticSearch
  • Iceberg
  • Hudi
  • Spark
  • Flink
  • Hive
  • Yarn
  • Zookeeper
  • Maven
  • Git
  • 数据仓库
  • 用户画像
  • 指标体系
数据治理
关于
首页
数据结构与算法
  • JVM
  • Java
  • Scala
  • Python
设计模式
  • MySQL
  • Redis
  • HDFS
  • HBase
  • ClickHouse
  • ElasticSearch
  • Iceberg
  • Hudi
  • Spark
  • Flink
  • Hive
  • Yarn
  • Zookeeper
  • Maven
  • Git
  • 数据仓库
  • 用户画像
  • 指标体系
数据治理
关于
  • Spark

    • Spark 基础
    • Spark Core
      • 一、RDD 基础
        • 1.1 RDD 概念
        • 1.2 RDD 特征
        • 1.3 创建 RDD
        • 1.3.1 并行化集合创建
        • 1.3.2 外部存储创建
      • 二、RDD 算子
        • 2.1 Transformation
        • 2.2 Action
      • 三、RDD 依赖
        • 3.1 窄依赖
        • 3.2 宽依赖
        • 3.3 DAG
      • 四、RDD 持久化
        • 4.1 缓存
        • 4.1.1 缓存级别
        • 4.1.2 缓存方法
        • 4.1.3 释放缓存
        • 4.2 CheckPoint
      • 五、共享变量
        • 5.1 广播变量
        • 5.2 累加器
        • 5.2.1 内置累加器
        • 5.2.2 自定义累加器
      • 六、分区器
        • 6.1 HashPartitoner
        • 6.2 RangePartitioner
        • 6.3 自定义分区器
    • Spark 存储体系源码分析
    • Spark RPC 通信源码分析
    • Spark RDD 源码分析
    • Spark Task 源码分析
    • Spark Stage 源码分析
    • Spark DAGScheduler 源码分析
    • Spark TaskScheduler 源码分析
    • Spark Shuffle
    • Spark AppendOnlyMap
  • Flink

    • Flink 概述
    • Flink 架构
    • Flink 快速入门
    • Flink 安装
    • Flink API
    • Flink 状态管理
    • Flink 架构介绍
    • Flink Window
    • Flink Time WaterMark
    • Flink Table Api
    • Flink Sql
    • Flink CEP
    • Flink 面试题
  • Hive

    • Hive 概述
    • Hive 安装及配置参数
    • Hive 客户端的使用
    • Hive 数据类型
    • Hive DDL
    • Hive 表类型
    • Hive DML
    • Hive DQL
    • Hive 内置函数
    • Hive UDF
    • Hive 视图
    • Hive 索引
    • Hive 事务
    • Hive 文件存储
    • Hive HQL 执行原理
    • Hive 数据倾斜
    • Hive 执行计划
    • Hive 调优
    • Hive 面试题
  • 数据处理
  • Spark
Will
2022-04-03
目录

Spark Core

# 一、RDD 基础

# 1.1 RDD 概念

RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是 Spark 中最基本的数据抽象,代表一个不可变、可分区、里面的元素可并行计算的集合,所有的运算以及操作都建立在 RDD 数据结构的基础之上。

RDD 提供了一个抽象的数据模型,不必担心底层数据的分布式特性,只需将具体的应用逻辑表达为一系列转换操作(函数),不同 RDD 之间的转换操作之间还可以形成依赖关系,进而实现管道化,从而避免了中间结果的存储,大大降低了数据复制、磁盘 IO 和序列化开销,并且还提供了更多的 API(map/reduec/filter/groupBy 等)。

# 1.2 RDD 特征

  • 一组 Partition。Partition 是 RDD 的基本组成单位,每个 Partition 都会被一个 Task 处理,Partition 数量决定任务并行度。用户可以在创建 RDD 的时候指定分区数,如果没有指定则会采用默认值。
  • 一个计算每个分区的函数。RDD 的计算是以 Partition 为单位的,计算函数会作用到每个分区上。
  • RDD 之间的依赖关系。RDD 的每次转换都会生成一个新的 RDD,所以 RDD 之间会形成一个前后依赖关系。在部分分区数据丢失或 Task 执行失败以后可以通过依赖关系重新计算该分区的数据,而不是重新计算所有分区。
  • 一个针对 Key-Value 型 RDD 的分区器(可选的)。对于 Key-Value 型的 RDD 会有一个分区器,非 Key-Value 型的 RDD 分区器是 None。Spark 中内置的有 HashPartitioner 和 RangePartitioner。
  • 一个列表,存储存取每个 Partition 的优先位置(可选的)。对于一个 HDFS 文件来说,这个列表保存的就是每个 Partition 所在的块的位置。按照"移动数据不如移动计算"的理念,Spark 在进行任务调度的时候,会尽可能选择那些存有数据的 worker 节点来进行任务计算。

# 1.3 创建 RDD

在 Spark 中创建 RDD 的创建方式可以分为两种:

  • 从并行化本地集合创建;
  • 从外部存储创建 RDD。

# 1.3.1 并行化集合创建

示例:

val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8))

val rdd2 = sc.makeRDD(Array(1,2,3,4,5,6,7,8))
1
2
3

# 1.3.2 外部存储创建

示例:

val rdd = sc.textFile("data.txt")
1

# 二、RDD 算子

Spark 中的算子分为 Transformation 和 Action 两种。Transformation 会返回一个新的 RDD,但是 Transformation 不会立即触发计算,只是记录 RDD 的转换过程,只有调用 Action 算子才会真正触发计算任务。

提示

RDD 中的所有转换都是 lazy 执行的,也就是说并不会直接计算。只有当发生一个要求返回结果给 Driver 的 Action 动作时,这些转换才会真正运行。之所以使用惰性求值/延迟执行,是因为这样可以在 Action 时对 RDD 操作形成 DAG 有向无环图进行 Stage 的划分和并行优化,这种设计让 Spark 更加有效率地运行。

# 2.1 Transformation

  • map(func):返回一个新的 RDD,该 RDD 由每一个输入元素经过 func 函数转换后组成。
  • mapPartitions(func):类似于 map,但独立地在 RDD 的每一个分片上运行,因此在类型为 T 的 RDD 上运行时,func 的函数类型必须是 Iterator[T] => Iterator[U]。假设有 N 个元素,有 M 个分区,那么 map 的函数的将被调用 N 次,而 mapPartitions 被调用 M 次,一个函数一次处理所有分区。

map 和 mapPartition 的区别:

map 每次处理一条数据,mapPartition 每次处理一个分区的数据,这个分区的数据处理完后,原 RDD 中分区的数据才能释放,可能导致 OOM。如果内存足够大可以使用 mapPartition 提高处理效率。

  • mapPartitionsWithIndex(func):类似于 mapPartitions,但 func 带有一个整数参数表示分片的索引值,因此在类型为 T 的 RDD 上运行时,func 的函数类型必须是(Int, Interator[T]) => Iterator[U]。
  • flatMap(func):类似于 map,但是每一个输入元素可以被映射为 0 或多个输出元素(所以 func 应该返回一个序列,而不是单一元素)。
  • glom:将每一个分区形成一个数组,形成新的 RDD 类型时 RDD[Array[T]]。
  • groupBy(func):分组,按照传入函数的返回值进行分组。将相同的 key 对应的值放入一个迭代器。
  • filter(func):过滤。返回一个新的 RDD,该 RDD 由经过 func 函数计算后返回值为 true 的输入元素组成。
  • sample(withReplacement, fraction, seed) :以指定的随机种子随机抽样出数量为 fraction 的数据,withReplacement 表示是抽出的数据是否放回,true 为有放回的抽样,false 为无放回的抽样,seed 用于指定随机数生成器种子。
  • distinct([numTasks])):对源 RDD 进行去重后返回一个新的 RDD。默认情况下,只有 8 个并行任务来操作,但是可以传入一个可选的 numTasks 参数改变它。
  • coalesce(numPartitions):缩减分区数,用于大数据集过滤后,提高小数据集的执行效率。
  • repartition(numPartitions):根据分区数,重新通过网络随机洗牌所有数据。

coalesce 和 repartition 的区别:

  1. coalesce 重新分区,可以选择是否进行 shuffle 过程。由参数 shuffle: Boolean = false/true 决定。
  2. repartition 实际上是调用了 coalesce,一定会产生 shuffle。源码如下:
  def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
    coalesce(numPartitions, shuffle = true)
  }
1
2
3
  • sortBy(func,[ascending], [numTasks]):使用 func 先对数据进行处理,按照处理后的数据比较结果排序,默认为正序。
  • pipe(command, [envVars]):管道,针对每个分区,都执行一个 shell 脚本,返回输出的 RDD。
  • union(otherDataset):对源 RDD 和参数 RDD 求并集后返回一个新的 RDD。
  • subtract (otherDataset):计算差的一种函数,去除两个 RDD 中相同的元素,不同的 RDD 将保留下来。
  • intersection(otherDataset):对源 RDD 和参数 RDD 求交集后返回一个新的 RDD。
  • cartesian(otherDataset):计算笛卡尔积。
  • zip(otherDataset):将两个 RDD 组合成 Key/Value 形式的 RDD,这里默认两个 RDD 的 partition 数量以及元素数量都相同,否则会抛出异常。
  • partitionBy:对 pairRDD 进行分区操作,如果原有的 partionRDD 和现有的 partionRDD 是一致的话就不进行分区,  否则会生成 ShuffleRDD,即会产生 shuffle 过程。
  • groupByKey:groupByKey 也是对每个 key 进行操作,但只生成一个 sequence。
  • reduceByKey(func, [numTasks]):在一个(K,V)的 RDD 上调用,返回一个(K,V)的 RDD,使用指定的 reduce 函数,将相同 key 的值聚合到一起,reduce 任务的个数可以通过第二个可选的参数来设置。

reduceByKey 和 groupByKey 的区别:

  1. reduceByKey 在 shuffle 之前有预聚合操作;
  2. groupByKey 直接进行 shuffle。
  • aggregateByKey:在 kv 对的 RDD 中,,按 key 将 value 进行分组合并,合并时,将每个 value 和初始值作为 seq 函数的参数,进行计算,返回的结果作为一个新的 kv 对,然后再将结果按照 key 进行合并,最后将每个分组的 value 传递给 combine 函数进行计算(先将前两个 value 进行计算,将返回结果和下一个 value 传给 combine 函数,以此类推),将 key 与计算结果作为一个新的 kv 对输出。
  • foldByKey:aggregateByKey 的简化操作,参数 seqop 和 combop 相同
  • combineByKey:对相同 K,把 V 合并成一个集合。
  • sortByKey([ascending], [numTasks]):在一个(K,V)的 RDD 上调用,K 必须实现 Ordered 接口,返回一个按照 key 进行排序的(K,V)的 RDD。
  • mapValues:针对于(K,V)形式的类型只对 V 进行操作。
  • join(otherDataset, [numTasks]):在类型为(K,V)和(K,W)的 RDD 上调用,返回一个相同 key 对应的所有元素对在一起的(K,(V,W))的 RDD。
  • cogroup(otherDataset, [numTasks]):在类型为(K,V)和(K,W)的 RDD 上调用,返回一个(K,(Iterable< V>,Iterable< W>))类型的 RDD。

# 2.2 Action

  • reduce(func):通过 func 函数聚集 RDD 中的所有元素,先聚合分区内数据,再聚合分区间数据。
  • collect():在驱动程序中,以数组的形式返回数据集的所有元素。
  • count():返回 RDD 中元素的个数。
  • first():返回 RDD 中的第一个元素。
  • take(n):返回一个由 RDD 的前 n 个元素组成的数组。
  • takeOrdered(n):返回该 RDD 排序后的前 n 个元素组成的数组。
  • aggregate:将每个分区里面的元素通过 seqOp 和初始值进行聚合,然后用 combine 函数将每个分区的结果和初始值(zeroValue)进行 combine 操作。这个函数最终返回的类型不需要和 RDD 中元素类型一致。
  • fold:aggregate 的简化操作,参数 seqop 和 combop 一样。
  • saveAsTextFile(path):将数据集的元素以 textfile 的形式保存到 HDFS 文件系统或者其他支持的文件系统,对于每个元素,Spark 将会调用 toString 方法,将它装换为文件中的文本。
  • saveAsSequenceFile(path):将数据集中的元素以 Hadoop sequencefile 的格式保存到指定的目录下,可以使 HDFS 或者其他 Hadoop 支持的文件系统。
  • saveAsObjectFile(path):用于将 RDD 中的元素序列化成对象,存储到文件中。
  • countByKey():针对(K,V)类型的 RDD,返回一个(K,Int)的 map,表示每一个 key 对应的元素个数。
  • foreach(func):在数据集的每一个元素上,运行函数 func 进行更新。

# 三、RDD 依赖

RDD 的容错机制是通过将 RDD 间转移操作构建成有向无环图来实现的。从抽象的角度看,RDD 间存在着血统继承关系,其本质上是 RDD 之间的依赖(Dependency)关系。RDD 和它依赖的父 RDD(s)的关系有两种不同的类型,即窄依赖(narrow dependency)和宽依赖(shuffle dependency)。

# 3.1 窄依赖

窄依赖中每个父 RDD 的 Partition 最多被一个子 RDD 的一个 Partition 使用。

# 3.2 宽依赖

宽依赖中多个子 RDD 的 Partition 会依赖同一个父 RDD 对的 Partition,会引起 Shuffle。

# 3.3 DAG

由于计算过程很多时候会有先后顺序,受制于某些任务必须比另一些任务较早执行的限制,必须对任务进行排队,形成一个队列的任务集合,这个队列的任务集合就是 DAG(有向无环图)。

根据 RDD 之间的宽窄依赖关系可以将 DAG 划分成不同的 Stage,对于窄依赖,partition 的转换处理在 Stage 中完成计算。对于宽依赖,由于有 Shuffle 的存在,只能在父 RDD 处理完成后,才能开始接下来的计算,因此宽依赖是划分 Stage 的依据。Stage 是由一组并行的 Task 组成(由于同一个 Stage 内部的 RDD 之间都是窄依赖,窄依赖的 RDD 分区之间是一对一的关系,所以是可以并行的)。

# 四、RDD 持久化

Spark RDD 持久化可以分为 RDD 缓存和 RDD CheckPoint 两种。

# 4.1 缓存

如果在应用程序中多次使用同一个 RDD,可以将该 RDD 缓存起来,该 RDD 只有在第一次计算的时候会根据血缘关系得到分区的数据,在后续其它地方用到该 RDD 的时候,会直接从缓存处取而不用再根据血缘关系计算,这样就加速后期的重用。

# 4.1.1 缓存级别

Spark 中关于缓存级别的定义如下:

  val NONE = new StorageLevel(false, false, false, false)
  val DISK_ONLY = new StorageLevel(true, false, false, false)
  val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
  val MEMORY_ONLY = new StorageLevel(false, true, false, true)
  val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
  val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
  val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
  val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
  val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
  val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
  val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
  val OFF_HEAP = new StorageLevel(true, true, true, false, 1)
1
2
3
4
5
6
7
8
9
10
11
12

NONE表示不缓存;DISK_ONLY表示缓存到磁盘中;MEMORY_ONLY表示缓存到 Executor 内存中;MEMORY_AND_DISK表示缓存到 Executor 内存中,如果内存不够则缓存到磁盘;OFF_HEAP表示缓存到系统内存中。

其中_2后缀表示缓存 2 份副本,_SER后缀表示是序列化存储。

# 4.1.2 缓存方法

RDD 通过 persist() 方法或 cache() 方法可以将前面的计算结果缓存。但是并不是这两个方法被调用时立即缓存,而是触发后面的 action 时,该 RDD 将会被缓存在计算节点的内存中,并供后面重用。

  /**
   * Persist this RDD with the default storage level (`MEMORY_ONLY`).
   */
  def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)

  /**
   * Persist this RDD with the default storage level (`MEMORY_ONLY`).
   */
  def cache(): this.type = persist()
1
2
3
4
5
6
7
8
9

从源码中可以看到,cache()调用的是persist()方法,缓存级别为MEMORY_ONLY。

注意

如果数据类型是 RDD,cache()缓存级别为MEMORY_ONLY,但如果数据类型是 DataFrame 或 DataSet,则cache()缓存级别为MEMORY_AND_DISK。源码如下:

  /**
   * Persist this Dataset with the default storage level (`MEMORY_AND_DISK`).
   *
   * @group basic
   * @since 1.6.0
   */
  def persist(): this.type = {
    sparkSession.sharedState.cacheManager.cacheQuery(this)
    this
  }

  /**
   * Persist this Dataset with the default storage level (`MEMORY_AND_DISK`).
   *
   * @group basic
   * @since 1.6.0
   */
  def cache(): this.type = persist()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

# 4.1.3 释放缓存

调用unpersist()方法释放缓存。

# 4.2 CheckPoint

RDD 数据可以持久化,但是持久化/缓存可以把数据放在内存中,虽然是快速的,但是也是最不可靠的;也可以把数据放在磁盘上,也不是完全可靠的!例如磁盘会损坏等。

Checkpoint 的产生就是为了更加可靠的数据持久化,在 Checkpoint 的时候一般把数据放在在 HDFS 上,这就天然的借助了 HDFS 天生的高容错、高可靠来实现数据最大程度上的安全,实现了 RDD 的容错和高可用。

为当前 RDD 设置 Checkpoint。该函数将会创建一个二进制的文件,并存储到 Checkpoint 目录中,该目录是用 SparkContext.setCheckpointDir()设置的。在 Checkpoint 的过程中,该 RDD 的所有依赖于父 RDD 中的信息将全部被移除。对 RDD 进行 Checkpoint 操作并不会马上被执行,必须执行 Action 操作才能触发。

在 Checkpoint 的时候强烈建议先进行 Cache,并且当你 Checkpoint 执行成功了,那么前面所有的 RDD 依赖都会被销毁:

/**
   * Mark this RDD for checkpointing. It will be saved to a file inside the checkpoint
   * directory set with `SparkContext#setCheckpointDir` and all references to its parent
   * RDDs will be removed. This function must be called before any job has been
   * executed on this RDD. It is strongly recommended that this RDD is persisted in
   * memory, otherwise saving it on a file will require recomputation.
   */
  def checkpoint(): Unit = RDDCheckpointData.synchronized {
    // NOTE: we use a global lock here due to complexities downstream with ensuring
    // children RDD partitions point to the correct parent partitions. In the future
    // we should revisit this consideration.
    if (context.checkpointDir.isEmpty) {
      throw new SparkException("Checkpoint directory has not been set in the SparkContext")
    } else if (checkpointData.isEmpty) {
      checkpointData = Some(new ReliableRDDCheckpointData(this))
    }
  }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

使用示例:

sc.setCheckpointDir("hdfs://localhost:9000/test")

rdd.checkpoint()
1
2
3

缓存和 Checkpoint 的区别:

  1. 存储位置不同
  • 缓存只能保存在本地的磁盘和内存中(或者堆外内存);
  • Checkpoint 可以保存数据到 HDFS 这类可靠的存储上。
  1. 生命周期不同
  • 缓存的 RDD 会在程序结束后会被清除或者手动调用 unpersist 方法;
  • Checkpoint 的 RDD 在程序结束后依然存在,不会被删除。
  1. 血缘管理不同
  • 缓存不会丢失 RDD 的血缘关系,因为内存和磁盘都不可靠;
  • Checkpoint 会斩断依赖链,因为 Checkpoint 会把结果保存在 HDFS 这类存储中,更加的安全可靠,一般不需要回溯依赖链。

# 五、共享变量

在默认情况下,当 Spark 在集群的多个不同节点的多个任务上并行运行一个函数时,它会把函数中涉及到的每个变量,在每个任务上都生成一个副本。但是,有时候需要在多个任务之间共享变量,或者在任务(Task)和任务控制节点(Driver Program)之间共享变量。

为了满足这种需求,Spark 提供了两种类型的变量:广播变量(Broadcast Variables)和累加器(Accumulators)。

# 5.1 广播变量

广播变量允许开发人员在每个节点缓存只读变量,而不是在 Task 之间传递这些变量。使用广播变量能够高效地在集群每个节点创建大数据集的副本。同时 Spark 还使用高效的广播算法分发这些变量,从而减少通信的开销。

使用示例:

scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)

scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)
1
2
3
4
5

# 5.2 累加器

Spark 提供的 Accumulator,主要用于多个节点对一个变量进行共享性的操作。Accumulator 只提供了累加的功能,即提供了多个 Task 对一个变量并行操作的功能。但是 Task 只能对 Accumulator 进行累加操作,不能读取 Accumulator 的值,只有 Driver 程序可以读取 Accumulator 的值。创建的 Accumulator 变量的值能够在 Spark Web UI 中查看,在创建时应该尽量为其命名。

Spark 内置了三种类型的 Accumulator,分别是:

  • LongAccumulator:累加整数
  • DoubleAccumulator:累加浮点数
  • CollectionAccumulator:累加集合元素

当内置的 Accumulator 无法满足要求时,可以继承org.apache.spark.util.AccumulatorV2实现自定义的累加器。

# 5.2.1 内置累加器

内置累加器使用示例:

scala> val accum = sc.longAccumulator("My Accumulator")
accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(My Accumulator), value: 0)

scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

scala> accum.value
res2: Long = 10
1
2
3
4
5
6
7
8
9

# 5.2.2 自定义累加器

步骤:

  1. 继承org.apache.spark.util.AccumulatorV2;
  2. 实现reset()和add()方法;
  3. 在 SparkContext 注册自定义累加器。

官方示例:

class VectorAccumulatorV2 extends AccumulatorV2[MyVector, MyVector] {

  private val myVector: MyVector = MyVector.createZeroVector

  def reset(): Unit = {
    myVector.reset()
  }

  def add(v: MyVector): Unit = {
    myVector.add(v)
  }
  ...
}


val myVectorAcc = new VectorAccumulatorV2

sc.register(myVectorAcc, "MyVectorAcc1")
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

# 六、分区器

只有 Key-Value 类型的 RDD 才有分区器,非 Key-Value 类型的 RDD 分区器为 None。

可以通过 RDD 的partitioner属性获取分区器,Spark 内置的有两种:HashPartitioner和RangePartitioner。如果内置的分区器不能满足需求,还可以继承org.apache.spark.Partitioner自定义分区器。

# 6.1 HashPartitoner

HashPartitioner 分区的原理:对于给定的 Key,计算其 hashCode,并除以分区的个数取余,如果余数小于 0,则用余数+分区的个数(否则加 0),最后返回的值就是这个 key 所属的分区 ID。

# 6.2 RangePartitioner

HashPartitioner 在极端情况下可能导致数据分区不均匀。RangePartitioner 刚好可以解决这个问题,它的原理是将一定范围内的数映射到某一个分区内,尽量保证每个分区中数据量的均匀,而且分区与分区之间是有序的,一个分区中的元素肯定都是比另一个分区内的元素小或者大,但是分区内的元素是不能保证顺序的。简单的说就是将一定范围内的数映射到某一个分区内。

# 6.3 自定义分区器

要实现自定义的分区器,需要继承org.apache.spark.Partitioner类并实现下面三个方法:

  • numPartitions: Int,返回创建出来的分区数。
  • getPartition(key: Any): Int,返回给定键的分区编号(0 到 numPartitions-1)。
  • equals():判断相等性的标准方法。Spark 需要用这个方法来检查你的分区器对象是否和其他分区器实例相同,这样 Spark 才可以判断两个 RDD 的分区方式是否相同。
上次更新: 2023/11/01, 03:11:44

← Spark 基础 Spark 存储体系源码分析→

Theme by Vdoing | Copyright © 2022-2023 Will 蜀ICP备2022002285号-1
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式