༺歲月蹉跎༻

只要路是对的,就不怕路远!

0%

Spark调优

1、Explain查看执行计划

  • Spark3.0大版本发布,Spark SQL的优化占比将近50%。SparkSQL取代SparkCore,成为新一代的引擎内核,所有其他子框架如Mllib、Streaming和Graph,都可以共享Spark SQL的性能优化,都能从Spark社区对于SparkSQL的投入中受益。
  • 要优化Spark SQL应用时,一定是要了解SparkSQL执行计划的。发现SQL执行慢的根本原因,才能知道应该在哪儿进行优化,是调整SQL的编写方式、还是用Hint、还是调参,而不是把优化方案拿来试一遍。

1.1 基本语法

1
2
3
4
5
6
7
8
9
10
11
.explain(mode="xxx")

// 案例
sqlway=spark.sql("""
select student.s_id,count(1)
from student
left join score
on student.s_id=score.s_id
group by student.s_id
""")
sqlway.explain(mode="extended")
  • 从3.0开始,explain方法有一个新的参数mode,该参数可以指定执行计划展示格式:
    • explain(mode=”simple”):只展示物理执行计划。
    • explain(mode=”extended”):展示物理执行计划和逻辑执行计划。
    • explain(mode=”codegen”):展示要Codegen生成的可执行Java代码。
    • explain(mode=”cost”):展示优化后的逻辑执行计划以及相关的统计。
    • explain(mode=”formatted”):以分隔的方式输出,它会输出更易读的物理执行计划,并展示每个节点的详细信息。

1.2 执行计划处理流程

image-20240623132301787

  • 核心的执行过程一共有 5 个步骤(这些操作和计划都是Spark SQL自动处理的,会生成以下计划):

    image-20240623132317576

    • Unresolved 逻辑执行计划(Parsed Logical Plan):Parser组件检查SQL语法上是否有问题,然后生成Unresolved(未决断)的逻辑计划,不检查表名、不检查列名。
    • 优化后的逻辑执行计划(Optimized Logical Plan):Catalyst优化器根据各种规则进行优化。
    • 物理执行计划(Physical Plan):
      • HashAggregate运算符表示数据聚合,一般HashAggregate是成对出现,第一个HashAggregate是将执行节点本地的数据进行局部聚合,另一个HashAggregate是将各个分区的数据进一步进行聚合计算。
      • Exchange运算符其实就是shuffle,表示需要在集群上移动数据。很多时候HashAggregate会以Exchange分隔开来。
      • Project运算符是SQL中的投影操作,就是选择列(例如:select name, age…)。
      • BroadcastHashJoin运算符表示通过基于广播方式进行HashJoin。
      • LocalTableScan 运算符就是全表扫描本地的表。

2、资源调优

2.1 资源规划

2.1.1 资源设定考虑

  • 总体原则
    • 以单台服务器128G内存,32线程为例。
    • 先设定单个Executor核数,根据Yarn配置得出每个节点最多的Executor数量,每个节点的yarn 内存/每个节点数量=单个节点的数量。
  • 具体提交参数
    • executor-cores:每个executor的最大核数。根据经验实践,设定在3~6之间比较合理。
    • num-executors:
      • 该参数值 = 每个节点的executor数 * work节点数。
      • 每个node的executor数 = 单节点yarn总核数 / 每个executor的最大cpu核数。
      • 考虑到系统基础服务和HDFS等组件的余量,yarn.nodemanager.resource.cpu-vcores配置为:28,参数executor-cores的值为:4,那么每个node的executor数 = 28/4 = 7 ,假设集群节点为10,那么num-executors = 7 * 10 = 70。
    • executor-memory:
      • 该参数值 = yarn-nodemanager.resource.memory-mb / 每个节点的executor数量。
      • 果yarn的参数配置为100G,那么每个Executor大概就是 100G/7≈14G,同时要注意yarn 配置中每个容器允许的最大内存是否匹配。

2.1.2 内存估算

image-20240623133218253

  • 估算Other内存 = 自定义数据结构 * 每个Executor核数。
  • 估算Storage内存 = 广播变量 + cache/Executor数量。
  • 估算Executor内存 = 每个Executor核数 * (数据集大小/并行度)。

2.1.3 调整内存配置项

  • 一般情况下,各个区域的内存比例保持默认值即可。如果需要更加精确的控制内存分 配,可以按照如下思路:
    • spark.memory.fraction = (估算storage 内存 + 估算Execution内存) / (估算storage内存 + 估算Execution内存 + 估算Other内存)得到spark.memory.storageFraction = (估算storage内存) / (估算storage内存 + 估算Execution内存)
    • 代入公式计算:
      • Storage堆内内存 = (spark.executor.memory–300MB) * spark.memory.fraction * spark.memory.storageFractionExecution。
      • 堆内内存 = (spark.executor.memory–300MB) * spark.memory.fraction * (1-spark.memory.storageFraction)

2.1 持久化和序列化

2.1.1 RDD

  • cache

    image-20240623133822632

    • 打成 jar,提交yarn任务,并在yarn界面查看spark ui。

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      object RddCacheDemo {

      def main( args: Array[String] ): Unit = {
      val sparkConf = new SparkConf().setAppName("RddCacheDemo")
      val sparkSession: SparkSession = InitUtil.initSparkSession(sparkConf)

      val result = sparkSession.sql("select * from sparktuning.course_pay").rdd
      result.cache()
      result.foreachPartition(( p: Iterator[Row] ) => p.foreach(item => println(item.get(0))))
      while (true) {
      //因为历史服务器上看不到,storage内存占用,所以这里加个死循环 不让sparkcontext立马结束
      }
      }

      }
      1
      2
      3
      4
      spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-
      executors 3 --executor-cores 2 --executor-memory 6g --class
      com.atguigu.sparktuning.cache.RddCacheDemo spark-tuning-1.0-SNAPSHOT-jar-
      with-dependencies.jar

      image-20240623134253321

      通过spark ui看到,rdd使用默认cache缓存级别,占用内存2.5GB,并且storage内存还不够,只缓存了29%。

  • kryo+序列化缓存

    • 使用kryo序列化并且使用rdd序列化缓存级别。使用kryo序列化需要修改spark的序列化模式,并且需要进程注册类操作。打成jar包在yarn上运行。

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      object RddCacheKryoDemo {

      def main( args: Array[String] ): Unit = {
      val sparkConf = new SparkConf()
      .setAppName("RddCacheKryoDemo")
      // .setMaster("local[*]")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .registerKryoClasses(Array(classOf[CoursePay]))


      val sparkSession: SparkSession = InitUtil.initSparkSession(sparkConf)

      import sparkSession.implicits._
      val result = sparkSession.sql("select * from sparktuning.course_pay").as[CoursePay].rdd
      result.persist(StorageLevel.MEMORY_ONLY_SER)
      result.foreachPartition(( p: Iterator[CoursePay] ) => p.foreach(item => println(item.orderid)))

      while (true) {
      //因为历史服务器上看不到,storage内存占用,所以这里加个死循环 不让sparkcontext立马结束
      }
      }

      }
      1
      2
      3
      4
      spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-
      executors 3 --executor-cores 2 --executor-memory 6g --class
      com.atguigu.sparktuning.cache.RddCacheKryoDemo spark-tuning-1.0-SNAPSHOT-
      jar-with-dependencies.jar

      查看storage所占内存,内存占用减少了1083.6mb并且缓存了100%。使用序列化缓存配合kryo序列化,可以优化存储内存占用。

      image-20240623134712772

      image-20240623134736120

      根据官网的描述,那么可以推断出,如果yarn内存资源充足情况下,使用默认级别MEMORY_ONLY是对CPU的支持最好的。但是序列化缓存可以让体积更小,那么当yarn内存资源不充足情况下可以考虑使用MEMORY_ONLY_SER配合kryo使用序列化缓存。

2.1.2 DataFrame、DataSet

  • cache

    • 提交任务,在yarn上查看spark ui,查看storage内存占用。内存使用612.3mb。

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      object DatasetCacheDemo {

      def main( args: Array[String] ): Unit = {
      val sparkConf = new SparkConf().setAppName("DataSetCacheDemo")
      // .setMaster("local[*]")
      val sparkSession: SparkSession = InitUtil.initSparkSession(sparkConf)


      import sparkSession.implicits._
      val result = sparkSession.sql("select * from sparktuning.course_pay").as[CoursePay]
      result.cache()
      result.foreachPartition(( p: Iterator[CoursePay] ) => p.foreach(item => println(item.orderid)))
      while (true) {
      }

      }

      }
      1
      2
      3
      4
      spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-
      executors 3 --executor-cores 2 --executor-memory 6g --class
      com.atguigu.sparktuning.cache.DatasetCacheDemo spark-tuning-1.0-SNAPSHOT-
      jar-with-dependencies.jar

      image-20240623135025753

      DataSet的cache默认缓存级别与RDD不一样,是MEMORY_AND_DISK。源码:Dataset.cache() -> Dataset.persist() -> CacheManager.cacheQuery()

      image-20240623135254472

  • 序列化缓存

    • DataSet类似RDD,但是并不使用JAVA序列化也不使用Kryo序列化,而是使用一种特有的编码器进行序列化对象。

      image-20240623135352804

      打成jar包,提交yarn。查看spark ui,storage占用内存646.2mb。

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      object DatasetCacheSerDemo {

      def main( args: Array[String] ): Unit = {
      val sparkConf = new SparkConf()
      .setAppName("DatasetCacheSerDemo")
      // .setMaster("local[*]")
      val sparkSession: SparkSession = InitUtil.initSparkSession(sparkConf)

      import sparkSession.implicits._
      val result = sparkSession.sql("select * from sparktuning.course_pay").as[CoursePay]
      result.persist(StorageLevel.MEMORY_AND_DISK_SER)
      result.foreachPartition(( p: Iterator[CoursePay] ) => p.foreach(item => println(item.orderid)))

      while (true) {
      //因为历史服务器上看不到,storage内存占用,所以这里加个死循环 不让sparkcontext立马结束
      }
      }

      }
      1
      2
      3
      4
      spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-
      executors 3 --executor-cores 2 --executor-memory 6g --class
      com.atguigu.sparktuning.cache.DatasetCacheSerDemo spark-tuning-1.0-
      SNAPSHOT-jar-with-dependencies.jar

      image-20240623135523920
      和默认cache缓存级别差别不大。所以DataSet可以直接使用cache。从性能上来讲,DataSet、DataFrame大于RDD,建议开发中使用DataSet、DataFrame。

2.2 CPU优化

2.2.1 CPU低效原因

  • 概念理解

    • 并行度

      • spark.default.parallelism

        image-20240623135956463

        设置RDD的默认并行度,没有设置时,由join、reduceByKey和parallelize等转换决定。

      • spark.sql.shuffle.partitions

        image-20240623140018580

        适用SparkSQL时,Shuffle Reduce阶段默认的并行度,默认200。此参数只能控制Spark sql、DataFrame、DataSet分区个数。不能控制RDD分区个数。

    • 并发度:同时执行的task数。

  • CPU低效原因

    • 并行度较低、数据分片较大容易导致CPU线程挂起。
    • 并行度过高、数据过于分散会让调度开销更多。Executor 接收到TaskDescription之后,首先需要对TaskDescription反序列化才能读取任务信息,然后将任务代码再反序列化得到可执行代码,最后再结合其他任务信息创建TaskRunner。当数据过于分散,分布式任务数量会大幅增加,但每个任务需要处理的数据量却少之又少,就CPU消耗来说,相比花在数据处理上的比例,任务调度上的开销几乎与之分庭抗礼。显然,在这种情况下,CPU的有效利用率也是极低的。

2.2.2 合理利用CPU资源

  • 每个并行度的数据量(总数据量 / 并行度)在(Executor内存 / core数 / 2,Executor内存 / core 数)区间。提交执行:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    object PartitionDemo {
    def main( args: Array[String] ): Unit = {
    val sparkConf = new SparkConf().setAppName("PartitionDemo")
    .set("spark.sql.autoBroadcastJoinThreshold", "-1")//为了演示效果,先禁用了广播join
    // .setMaster("local[*]")
    val sparkSession: SparkSession = InitUtil.initSparkSession(sparkConf)

    //查询出三张表 并进行join 插入到最终表中
    val saleCourse = sparkSession.sql("select * from sparktuning.sale_course")
    val coursePay = sparkSession.sql("select * from sparktuning.course_pay")
    .withColumnRenamed("discount", "pay_discount")
    .withColumnRenamed("createtime", "pay_createtime")
    val courseShoppingCart = sparkSession.sql("select * from sparktuning.course_shopping_cart")
    .drop("coursename")
    .withColumnRenamed("discount", "cart_discount")
    .withColumnRenamed("createtime", "cart_createtime")

    saleCourse
    .join(courseShoppingCart, Seq("courseid", "dt", "dn"), "right")
    .join(coursePay, Seq("orderid", "dt", "dn"), "left")
    // .coalesce(6)
    // .repartition(6)
    .select("courseid", "coursename", "status", "pointlistid", "majorid", "chapterid", "chaptername", "edusubjectid"
    , "edusubjectname", "teacherid", "teachername", "coursemanager", "money", "orderid", "cart_discount", "sellmoney",
    "cart_createtime", "pay_discount", "paymoney", "pay_createtime", "dt", "dn")
    .write.mode(SaveMode.Overwrite).saveAsTable("sparktuning.salecourse_detail")
    }

    }
    1
    2
    3
    4
    spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-
    executors 3 --executor-cores 4 --executor-memory 6g --class
    com.atguigu.sparktuning.partition.PartitionDemo spark-tuning-1.0-
    SNAPSHOT-jar-with-dependencies.jar
    • 去向yarn申请的executor vcore资源个数为12个(num-executors * executor-cores),如果不修改spark sql分区个数,那么就会像上图所展示存在cpu空转的情况。这个时候需要合理控制shuffle分区个数。如果想要让任务运行的最快当然是一个task对应一个vcore,但是一般不会这样设置,为了合理利用资源,一般会将并行度(task数)设置成并发度(vcore数)的2倍到3倍。

    • 修改参数spark.sql.shuffle.partitions(默认200),根据我们当前任务的提交参数有12个vcore,将此参数设置为24或36为最优效果:

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      object PartitionTuning {
      def main( args: Array[String] ): Unit = {
      val sparkConf = new SparkConf().setAppName("PartitionTuning")
      .set("spark.sql.autoBroadcastJoinThreshold", "-1")//为了演示效果,先禁用了广播join
      // .setMaster("local[*]")
      .set("spark.sql.shuffle.partitions", "36")
      val sparkSession: SparkSession = InitUtil.initSparkSession(sparkConf)

      //查询出三张表 并进行join 插入到最终表中
      val saleCourse = sparkSession.sql("select * from sparktuning.sale_course")
      val coursePay = sparkSession.sql("select * from sparktuning.course_pay")
      .withColumnRenamed("discount", "pay_discount")
      .withColumnRenamed("createtime", "pay_createtime")
      val courseShoppingCart = sparkSession.sql("select * from sparktuning.course_shopping_cart")
      .drop("coursename")
      .withColumnRenamed("discount", "cart_discount")
      .withColumnRenamed("createtime", "cart_createtime")


      saleCourse
      .join(courseShoppingCart, Seq("courseid", "dt", "dn"), "right")
      .join(coursePay, Seq("orderid", "dt", "dn"), "left")
      .select("courseid", "coursename", "status", "pointlistid", "majorid", "chapterid", "chaptername", "edusubjectid"
      , "edusubjectname", "teacherid", "teachername", "coursemanager", "money", "orderid", "cart_discount", "sellmoney",
      "cart_createtime", "pay_discount", "paymoney", "pay_createtime", "dt", "dn")
      .write.mode(SaveMode.Overwrite).saveAsTable("sparktuning.salecourse_detail")
      }

      }
      1
      2
      3
      4
      spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-
      executors 3 --executor-cores 4 --executor-memory 6g --class
      com.atguigu.sparktuning.partition.PartitionTuning spark-tuning-1.0-
      SNAPSHOT-jar-with-dependencies.jar

3、SparkSQL语法优化

  • SparkSQL在整个执行计划处理的过程中,使用了Catalyst优化器。

3.1 基于RBO的优化

  • 在Spark3.0版本中,Catalyst总共有81条优化规则(Rules),分成27组(Batches),其中有些规则会被归类到多个分组里。因此,如果不考虑规则的重复性,27组算下来总共会有129个优化规则。
  • 如果从优化效果的角度出发,这些规则可以归纳到以下3个范畴。

3.1.1 谓词下推(Predicate Pushdown)

  • 将过滤条件的谓词逻辑都尽可能提前执行,减少下游处理的数据量。对应PushDownPredicte优化规则,对于Parquet、ORC这类存储格式,结合文件注脚(Footer)中的统计信息,下推的谓词能够大幅减少数据扫描量,降低磁盘I/O开销。

    1
    2
    3
    4
    spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-
    executors 3 --executor-cores 4 --executor-memory 6g --class
    com.atguigu.sparktuning.PredicateTuning spark-tuning-1.0-SNAPSHOT-jar-
    with-dependencies.jar

3.1.1.1 inner join的谓词下堆

  • Inner join on 左表条件

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    object PredicateTuning {
    def main( args: Array[String] ): Unit = {
    val sparkConf = new SparkConf().setAppName("PredicateTunning")
    .setMaster("local[*]") //TODO 要打包提交集群执行,注释掉
    val sparkSession: SparkSession = InitUtil.initSparkSession(sparkConf)

    sparkSession.sql("use sparktuning;")

    println("=======================================Inner on 左表=======================================")
    val innerStr1 =
    """
    |select
    | l.courseid,
    | l.coursename,
    | r.courseid,
    | r.coursename
    |from sale_course l join course_shopping_cart r
    | on l.courseid=r.courseid and l.dt=r.dt and l.dn=r.dn
    | and l.courseid<2
    """.stripMargin
    sparkSession.sql(innerStr1).show()
    sparkSession.sql(innerStr1).explain(mode = "extended")
    }

    }

    image-20240623151031966

    image-20240623151405002

  • Inner join where 左表条件

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    object PredicateTuning {
    def main( args: Array[String] ): Unit = {
    val sparkConf = new SparkConf().setAppName("PredicateTunning")
    .setMaster("local[*]") //TODO 要打包提交集群执行,注释掉
    val sparkSession: SparkSession = InitUtil.initSparkSession(sparkConf)

    sparkSession.sql("use sparktuning;")

    println("=======================================Inner where 左表=======================================")
    val innerStr2 =
    """
    |select
    | l.courseid,
    | l.coursename,
    | r.courseid,
    | r.coursename
    |from sale_course l join course_shopping_cart r
    | on l.courseid=r.courseid and l.dt=r.dt and l.dn=r.dn
    |where l.courseid<2
    """.stripMargin
    sparkSession.sql(innerStr2).show()
    sparkSession.sql(innerStr2).explain(mode = "extended")
    }

    }

    image-20240623153423443

    image-20240623153553370

可以发现对于内连接的过滤条件不管是写在on还是where后面谓词下推优化都会同时对两张表进行过滤,即使过滤条件只有一张表。

3.1.1.2 外关联的谓词下堆

  • right join的谓词下推规则和left join相反,这里以left join为例。

    image-20240623163048945

    • left join on 左表条件

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      object PredicateTuning {
      def main( args: Array[String] ): Unit = {
      val sparkConf = new SparkConf().setAppName("PredicateTunning")
      .setMaster("local[*]") //TODO 要打包提交集群执行,注释掉
      val sparkSession: SparkSession = InitUtil.initSparkSession(sparkConf)

      sparkSession.sql("use sparktuning;")

      println("=======================================left on 左表=======================================")
      val leftStr1 =
      """
      |select
      | l.courseid,
      | l.coursename,
      | r.courseid,
      | r.coursename
      |from sale_course l left join course_shopping_cart r
      | on l.courseid=r.courseid and l.dt=r.dt and l.dn=r.dn
      | and l.courseid<2
      """.stripMargin
      sparkSession.sql(leftStr1).show()
      sparkSession.sql(leftStr1).explain(mode = "extended")
      }

      }

      image-20240623154412808

      image-20240623154647727

      可以发现谓词下堆只会过滤右表。

    • left join where 左表条件

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      object PredicateTuning {
      def main( args: Array[String] ): Unit = {
      val sparkConf = new SparkConf().setAppName("PredicateTunning")
      .setMaster("local[*]") //TODO 要打包提交集群执行,注释掉
      val sparkSession: SparkSession = InitUtil.initSparkSession(sparkConf)

      sparkSession.sql("use sparktuning;")

      println("=======================================left where 左表=======================================")
      val leftStr2 =
      """
      |select
      | l.courseid,
      | l.coursename,
      | r.courseid,
      | r.coursename
      |from sale_course l left join course_shopping_cart r
      | on l.courseid=r.courseid and l.dt=r.dt and l.dn=r.dn
      |where l.courseid<2
      """.stripMargin
      sparkSession.sql(leftStr2).show()
      sparkSession.sql(leftStr2).explain(mode = "extended")
      }

      }

      image-20240623155051136

![image-20240623155508763](Spark调优/image-20240623155508763.png)

> **可以发现谓词下堆对两张表都会过滤。**
  • left join on 右表条件

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    object PredicateTuning {
    def main( args: Array[String] ): Unit = {
    val sparkConf = new SparkConf().setAppName("PredicateTunning")
    .setMaster("local[*]") //TODO 要打包提交集群执行,注释掉
    val sparkSession: SparkSession = InitUtil.initSparkSession(sparkConf)

    sparkSession.sql("use sparktuning;")

    println("=======================================left on 右表=======================================")
    val leftStr3 =
    """
    |select
    | l.courseid,
    | l.coursename,
    | r.courseid,
    | r.coursename
    |from sale_course l left join course_shopping_cart r
    | on l.courseid=r.courseid and l.dt=r.dt and l.dn=r.dn
    | and r.courseid<2
    """.stripMargin
    sparkSession.sql(leftStr3).show()
    sparkSession.sql(leftStr3).explain(mode = "extended")
    }

    }

    image-20240623160420300

    image-20240623160511442

    可以发现谓词下堆只会过滤右表。

  • left join where 右表条件

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    object PredicateTuning {
    def main( args: Array[String] ): Unit = {
    val sparkConf = new SparkConf().setAppName("PredicateTunning")
    .setMaster("local[*]") //TODO 要打包提交集群执行,注释掉
    val sparkSession: SparkSession = InitUtil.initSparkSession(sparkConf)

    sparkSession.sql("use sparktuning;")

    println("=======================================left where 右表=======================================")
    val leftStr4 =
    """
    |select
    | l.courseid,
    | l.coursename,
    | r.courseid,
    | r.coursename
    |from sale_course l left join course_shopping_cart r
    | on l.courseid=r.courseid and l.dt=r.dt and l.dn=r.dn
    |where r.courseid<2 + 3
    """.stripMargin
    sparkSession.sql(leftStr4).show()
    sparkSession.sql(leftStr4).explain(mode = "extended")
    }

    }

    image-20240623161202892

    image-20240623161323534

    可以发现谓词下堆对两张表都会过滤。

注意:外关联时,过滤条件写在onwhere,结果是不一样的!

3.1.2 列剪裁(Column Pruning)

  • 列剪裁就是扫描数据源的时候,只读取那些与查询相关的字段。

3.1.3 常量替换(Constant Folding)

  • 假设我们在年龄上加的过滤条件是 “age < 12 + 18”,Catalyst会使用ConstantFolding规则,自动帮我们把条件变成 “age < 30”。再比如,我们在 select 语句中,掺杂了一些常量表达式,Catalyst也会自动地用表达式的结果进行替换。

3.2 基于CBO的优化

  • CBO优化主要在物理计划层面,原理是计算所有可能的物理计划的代价,并挑选出代价最小的物理执行计划。充分考虑了数据本身的特点(如大小、分布)以及操作算子的特 点(中间结果集的分布及大小)及代价,从而更好的选择执行代价最小的物理执行计划。
  • 而每个执行节点的代价,分为两个部分:
    • 该执行节点对数据集的影响,即该节点输出数据集的大小与分布。
    • 该执行节点操作算子的代价。
  • 每个操作算子的代价相对固定,可用规则来描述。而执行节点输出数据集的大小与分布,分为两个部分:
    • 初始数据集,也即原始表,其数据集的大小与分布可直接通过统计得到;
    • 中间节点输出数据集的大小与分布可由其输入数据集的信息与操作本身的特点推算。

3.2.1 Statistics收集

  • 需要先执行特定的SQL语句来收集所需的表和列的统计信息。

    • 生成表级别统计信息(扫表):

      1
      2
      ANALYZE TABLE 表名 COMPUTE STATISTICS
      -- 生成 sizeInBytes 和 rowCount。

      使用ANALYZE语句收集统计信息时,无法计算非HDFS数据源的表的文件大小。

      image-20240630141917712

    • 生成表级别统计信息(不扫表):

      1
      2
      ANALYZE TABLE src COMPUTE STATISTICS NOSCAN
      -- 只生成 sizeInBytes,如果原来已经生成过 sizeInBytes 和 rowCount,而本次生成的sizeInBytes 和原来的大小一样,则保留 rowCount(若存在),否则清除 rowCount。
    • 生成列级别统计信息:

      1
      2
      ANALYZE TABLE 表名 COMPUTE STATISTICS FOR COLUMNS1,列 2,列 3
      -- 生成列统计信息,为保证一致性,会同步更新表统计信息。目前不支持复杂数据类型(如 Seq, Map 等)和 HiveStringType 的统计信息生成。

      image-20240630142118792

    • 显示统计信息:

      1
      2
      DESC FORMATTED 表名
      -- 在 Statistics 中会显示“xxx bytes, xxx rows”分别表示表级别的统计信息。

      image-20240630142157538

      也可以通过如下命令显示列统计信息:

      1
      DESC FORMATTED 表名 列名

      执行:

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      object StaticsCollect {


      def main( args: Array[String] ): Unit = {
      val sparkConf = new SparkConf().setAppName("CBOTunning")
      .setMaster("local[*]") //TODO 要打包提交集群执行,注释掉
      val sparkSession: SparkSession = InitUtil.initSparkSession(sparkConf)

      AnalyzeTableAndColumn(sparkSession, "sparktuning.sale_course", "courseid,dt,dn")
      AnalyzeTableAndColumn(sparkSession, "sparktuning.course_shopping_cart", "courseid,orderid,dt,dn")
      AnalyzeTableAndColumn(sparkSession, "sparktuning.course_pay", "orderid,dt,dn")

      }

      def AnalyzeTableAndColumn( sparkSession: SparkSession, tableName: String, columnListStr: String ): Unit = {
      //TODO 查看 表级别 信息
      println("=========================================查看" + tableName + "表级别 信息========================================")
      sparkSession.sql("DESC FORMATTED " + tableName).show(100)
      //TODO 统计 表级别 信息
      println("=========================================统计 " + tableName + "表级别 信息========================================")
      sparkSession.sql("ANALYZE TABLE " + tableName + " COMPUTE STATISTICS").show()
      //TODO 再查看 表级别 信息
      println("======================================查看统计后 " + tableName + "表级别 信息======================================")
      sparkSession.sql("DESC FORMATTED " + tableName).show(100)


      //TODO 查看 列级别 信息
      println("=========================================查看 " + tableName + "表的" + columnListStr + "列级别 信息========================================")
      val columns: Array[String] = columnListStr.split(",")
      for (column <- columns) {
      sparkSession.sql("DESC FORMATTED " + tableName + " " + column).show()
      }
      //TODO 统计 列级别 信息
      println("=========================================统计 " + tableName + "表的" + columnListStr + "列级别 信息========================================")
      sparkSession.sql(
      s"""
      |ANALYZE TABLE ${tableName}
      |COMPUTE STATISTICS
      |FOR COLUMNS $columnListStr
      """.stripMargin).show()
      //TODO 再查看 列级别 信息
      println("======================================查看统计后 " + tableName + "表的" + columnListStr + "列级别 信息======================================")
      for (column <- columns) {
      sparkSession.sql("DESC FORMATTED " + tableName + " " + column).show()
      }
      }
      }
      1
      2
      3
      4
      spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-
      executors 3 --executor-cores 4 --executor-memory 6g --class
      com.atguigu.sparktuning.cbo.StaticsCollect spark-tuning-1.0-SNAPSHOT-jar-
      with-dependencies.jar

3.2.2 使用CBO

  • 通过”spark.sql.cbo.enabled”来开启,默认是false。配置开启CBO后,CBO优化器可以基于表和列的统计信息,进行一系列的估算,最终选择出最优的查询计划。比如:Build侧选择、优化Join类型、优化多表Join顺序等。

    image-20240630142518483

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    object CBOTuning {


    def main( args: Array[String] ): Unit = {
    val sparkConf = new SparkConf().setAppName("CBOTuning")
    .set("spark.sql.cbo.enabled", "true")
    // .set("spark.sql.cbo.joinReorder.enabled", "true")
    // .setMaster("local[*]") //TODO 要打包提交集群执行,注释掉
    val sparkSession: SparkSession = InitUtil.initSparkSession(sparkConf)


    val sqlstr =
    """
    |select
    | csc.courseid,
    | sum(cp.paymoney) as coursepay
    |from course_shopping_cart csc,course_pay cp
    |where csc.orderid=cp.orderid
    |and cp.orderid ='odid-0'
    |group by csc.courseid
    """.stripMargin

    sparkSession.sql("use sparktuning;")
    sparkSession.sql(sqlstr).show()

    while (true){}

    }
    }
    1
    2
    3
    4
    spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-
    executors 3 --executor-cores 4 --executor-memory 4g --class
    com.atguigu.sparktuning.cbo.CBOTuning spark-tuning-1.0-SNAPSHOT-jar-with-
    dependencies.jar

3.3 广播Join

  • Spark join策略中,如果当一张小表足够小并且可以先缓存到内存中,那么可以使用Broadcast Hash Join,其原理就是先将小表聚合到driver端,再广播到各个大表分区中,那么再次进行join的时候,就相当于大表的各自分区的数据与小表进行本地join,从而规避了shuffle。

    • 通过参数指定自动广播。广播join默认值为10MB,由spark.sql.autoBroadcastJoinThreshold参数控制。

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      object AutoBroadcastJoinTuning {


      def main( args: Array[String] ): Unit = {
      val sparkConf = new SparkConf().setAppName("BroadcastJoinTuning")
      // .set("spark.sql.autoBroadcastJoinThreshold","10m")
      .setMaster("local[*]") //TODO 要打包提交集群执行,注释掉
      val sparkSession: SparkSession = InitUtil.initSparkSession(sparkConf)


      val sqlstr =
      """
      |select
      | sc.courseid,
      | csc.courseid
      |from sale_course sc join course_shopping_cart csc
      |on sc.courseid=csc.courseid
      """.stripMargin

      sparkSession.sql("use sparktuning;")
      sparkSession.sql(sqlstr).show()

      while(true){}

      }
      }
      1
      2
      3
      4
      spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-
      executors 3 --executor-cores 2 --executor-memory 4g --class
      com.atguigu.sparktuning.join.AutoBroadcastJoinTuning spark-tuning-1.0-
      SNAPSHOT-jar-with-dependencies.jar
    • 强行广播。

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      54
      55
      56
      57
      58
      59
      object ForceBroadcastJoinTuning {


      def main( args: Array[String] ): Unit = {
      val sparkConf = new SparkConf().setAppName("ForceBroadcastJoinTuning")
      .set("spark.sql.autoBroadcastJoinThreshold","-1") // 关闭自动广播
      .setMaster("local[*]") //TODO 要打包提交集群执行,注释掉
      val sparkSession: SparkSession = InitUtil.initSparkSession(sparkConf)


      //TODO SQL Hint方式
      val sqlstr1 =
      """
      |select /*+ BROADCASTJOIN(sc) */
      | sc.courseid,
      | csc.courseid
      |from sale_course sc join course_shopping_cart csc
      |on sc.courseid=csc.courseid
      """.stripMargin

      val sqlstr2 =
      """
      |select /*+ BROADCAST(sc) */
      | sc.courseid,
      | csc.courseid
      |from sale_course sc join course_shopping_cart csc
      |on sc.courseid=csc.courseid
      """.stripMargin

      val sqlstr3 =
      """
      |select /*+ MAPJOIN(sc) */
      | sc.courseid,
      | csc.courseid
      |from sale_course sc join course_shopping_cart csc
      |on sc.courseid=csc.courseid
      """.stripMargin



      sparkSession.sql("use sparktuning;")
      println("=======================BROADCASTJOIN Hint=============================")
      sparkSession.sql(sqlstr1).explain()
      println("=======================BROADCAST Hint=============================")
      sparkSession.sql(sqlstr2).explain()
      println("=======================MAPJOIN Hint=============================")
      sparkSession.sql(sqlstr3).explain()

      // TODO API的方式
      val sc: DataFrame = sparkSession.sql("select * from sale_course").toDF()
      val csc: DataFrame = sparkSession.sql("select * from course_shopping_cart").toDF()
      println("=======================DF API=============================")
      import org.apache.spark.sql.functions._
      broadcast(sc)
      .join(csc,Seq("courseid"))
      .select("courseid")
      .explain()
      }
      }
      1
      2
      3
      4
      spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-
      executors 3 --executor-cores 2 --executor-memory 4g --class
      com.atguigu.sparktuning.join.ForceBroadcastJoinTuning spark-tuning-1.0-
      SNAPSHOT-jar-with-dependencies.jar

3.4 SMB Join

  • SMB JOIN是sort merge bucket操作,需要进行分桶,首先会进行排序,然后根据key值合并,把相同key的数据放到同一个bucket中(按照key进行hash)。分桶的目的其实就是把大表化成小表。相同key的数据都在同一个桶中之后,再进行join操作,那么在联合的时候就会大幅度的减小无关项的扫描。使用条件:

    • 两表进行分桶,桶的个数必须相等。
    • 两边进行join时,join列=排序列=分桶列。
  • 不使用SMB Join:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    object BigJoinDemo {

    def main( args: Array[String] ): Unit = {

    val sparkConf = new SparkConf().setAppName("BigJoinDemo")
    .set("spark.sql.shuffle.partitions", "36")
    val sparkSession: SparkSession = InitUtil.initSparkSession(sparkConf)
    useJoin(sparkSession)

    }


    def useJoin( sparkSession: SparkSession ) = {
    //查询出三张表 并进行join 插入到最终表中
    val saleCourse = sparkSession.sql("select *from sparktuning.sale_course")
    val coursePay = sparkSession.sql("select * from sparktuning.course_pay")
    .withColumnRenamed("discount", "pay_discount")
    .withColumnRenamed("createtime", "pay_createtime")
    val courseShoppingCart = sparkSession.sql("select *from sparktuning.course_shopping_cart")
    .drop("coursename")
    .withColumnRenamed("discount", "cart_discount")
    .withColumnRenamed("createtime", "cart_createtime")

    courseShoppingCart
    .join(coursePay, Seq("orderid"), "left")
    .join(saleCourse, Seq("courseid"), "right")
    .select("courseid", "coursename", "status", "pointlistid", "majorid", "chapterid", "chaptername", "edusubjectid"
    , "edusubjectname", "teacherid", "teachername", "coursemanager", "money", "orderid", "cart_discount", "sellmoney",
    "cart_createtime", "pay_discount", "paymoney", "pay_createtime", "sparktuning.sale_course.dt", "sparktuning.sale_course.dn")
    .write.mode(SaveMode.Overwrite).saveAsTable("sparktuning.salecourse_detail_1")

    }

    }
    1
    2
    3
    4
    spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-
    executors 3 --executor-cores 2 --executor-memory 6g --class
    com.atguigu.sparktuning.join.BigJoinDemo spark-tuning-1.0-SNAPSHOT-jar-
    with-dependencies.jar
  • 使用SMB Join:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    object SMBJoinTuning {

    def main( args: Array[String] ): Unit = {

    val sparkConf = new SparkConf().setAppName("SMBJoinTuning")
    .set("spark.sql.shuffle.partitions", "36")
    val sparkSession: SparkSession = InitUtil.initSparkSession(sparkConf)
    useSMBJoin(sparkSession)

    }

    def useSMBJoin( sparkSession: SparkSession ) = {
    //查询出三张表 并进行join 插入到最终表中
    val saleCourse = sparkSession.sql("select *from sparktuning.sale_course")
    val coursePay = sparkSession.sql("select * from sparktuning.course_pay_cluster")
    .withColumnRenamed("discount", "pay_discount")
    .withColumnRenamed("createtime", "pay_createtime")
    val courseShoppingCart = sparkSession.sql("select *from sparktuning.course_shopping_cart_cluster")
    .drop("coursename")
    .withColumnRenamed("discount", "cart_discount")
    .withColumnRenamed("createtime", "cart_createtime")

    val tmpdata = courseShoppingCart.join(coursePay, Seq("orderid"), "left")
    val result = broadcast(saleCourse).join(tmpdata, Seq("courseid"), "right")
    result
    .select("courseid", "coursename", "status", "pointlistid", "majorid", "chapterid", "chaptername", "edusubjectid"
    , "edusubjectname", "teacherid", "teachername", "coursemanager", "money", "orderid", "cart_discount", "sellmoney",
    "cart_createtime", "pay_discount", "paymoney", "pay_createtime", "sparktuning.sale_course.dt", "sparktuning.sale_course.dn")
    .write
    .mode(SaveMode.Overwrite)
    .saveAsTable("sparktuning.salecourse_detail_2")

    }
    }
    1
    2
    3
    4
    spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-
    executors 3 --executor-cores 2 --executor-memory 6g --class
    com.atguigu.sparktuning.join.SMBJoinTuning spark-tuning-1.0-SNAPSHOT-jar-
    with-dependencies.jar

4、数据倾斜

4.1 数据倾斜现象

  • 现象:绝大多数task任务运行速度很快,但是就是有那么几个task任务运行极其缓慢,慢慢的可能就接着报内存溢出的问题。

    image-20240630143745946

  • 原因:数据倾斜一般是发生在shuffle类的算子,比如distinct、groupByKey、reduceByKey、 aggregateByKey、join、cogroup 等,涉及到数据重分区,如果其中某一个key数量特别大,就发生了数据倾斜。

4.2 数据倾斜大key定位

  • 从所有key中,把其中每一个key随机取出来一部分,然后进行一个百分比的推算,这是用局部取推算整体,虽然有点不准确,但是在整体概率上来说,我们只需要大概就可以定位那个最多的key了。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    object SampleKeyDemo {

    def main( args: Array[String] ): Unit = {

    val sparkConf = new SparkConf().setAppName("BigJoinDemo")
    .set("spark.sql.shuffle.partitions", "36")
    .setMaster("local[*]")
    val sparkSession: SparkSession = InitUtil.initSparkSession(sparkConf)

    println("=============================================csc courseid sample=============================================")
    val cscTopKey: Array[(Int, Row)] = sampleTopKey(sparkSession,"sparktuning.course_shopping_cart","courseid")
    println(cscTopKey.mkString("\n"))

    println("=============================================sc courseid sample=============================================")
    val scTopKey: Array[(Int, Row)] = sampleTopKey(sparkSession,"sparktuning.sale_course","courseid")
    println(scTopKey.mkString("\n"))

    println("=============================================cp orderid sample=============================================")
    val cpTopKey: Array[(Int, Row)] = sampleTopKey(sparkSession,"sparktuning.course_pay","orderid")
    println(cpTopKey.mkString("\n"))

    println("=============================================csc orderid sample=============================================")
    val cscTopOrderKey: Array[(Int, Row)] = sampleTopKey(sparkSession,"sparktuning.course_shopping_cart","orderid")
    println(cscTopOrderKey.mkString("\n"))
    }


    def sampleTopKey( sparkSession: SparkSession, tableName: String, keyColumn: String ): Array[(Int, Row)] = {
    val df: DataFrame = sparkSession.sql("select " + keyColumn + " from " + tableName)
    val top10Key = df
    .select(keyColumn).sample(false, 0.1).rdd // 对key不放回采样
    .map(k => (k, 1)).reduceByKey(_ + _) // 统计不同key出现的次数
    .map(k => (k._2, k._1)).sortByKey(false) // 统计的key进行排序
    .take(10)
    top10Key
    }

    }
    1
    2
    3
    4
    spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-
    executors 3 --executor-cores 2 --executor-memory 6g --class
    com.atguigu.sparktuning.join.SampleKeyDemo spark-tuning-1.0-SNAPSHOT-jar-
    with-dependencies.jar

4.3 单表数据倾斜优化

  • 为了减少shuffle数据量以及reduce端的压力,通常Spark SQL在map端会做一个partialaggregate(通常叫做预聚合或者偏聚合),即在shuffle前将同一分区内所属同key的记录先进行一个预结算,再将结果进行shuffle,发送到reduce端做一个汇总,类似MR的提前Combiner,所以执行计划中HashAggregate通常成对出现。

  • 适用场景:聚合类的shuffle操作,部分key数据量较大,且大key的数据分布在很多不同的切片。

  • 解决逻辑:两阶段聚合(加盐局部聚合+去盐全局聚合)。

  • 案例演示:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    object SkewAggregationTuning {
    def main( args: Array[String] ): Unit = {

    val sparkConf = new SparkConf().setAppName("SkewAggregationTuning")
    .set("spark.sql.shuffle.partitions", "36")
    // .setMaster("local[*]")
    val sparkSession: SparkSession = InitUtil.initSparkSession(sparkConf)

    sparkSession.udf.register("random_prefix", ( value: Int, num: Int ) => randomPrefixUDF(value, num))
    sparkSession.udf.register("remove_random_prefix", ( value: String ) => removeRandomPrefixUDF(value))


    val sql1 =
    """
    |select
    | courseid,
    | sum(course_sell) totalSell
    |from
    | (
    | select
    | remove_random_prefix(random_courseid) courseid,
    | course_sell
    | from
    | (
    | select
    | random_courseid,
    | sum(sellmoney) course_sell
    | from
    | (
    | select
    | random_prefix(courseid, 6) random_courseid,
    | sellmoney
    | from
    | sparktuning.course_shopping_cart
    | ) t1
    | group by random_courseid
    | ) t2
    | ) t3
    |group by
    | courseid
    """.stripMargin


    val sql2=
    """
    |select
    | courseid,
    | sum(sellmoney)
    |from sparktuning.course_shopping_cart
    |group by courseid
    """.stripMargin

    sparkSession.sql(sql1).show(10000)


    // while(true){}
    }


    def randomPrefixUDF( value: Int, num: Int ): String = {
    new Random().nextInt(num).toString + "_" + value
    }

    def removeRandomPrefixUDF( value: String ): String = {
    value.toString.split("_")(1)
    }
    }
    1
    2
    3
    4
    spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-
    executors 3 --executor-cores 2 --executor-memory 6g --class
    com.atguigu.sparktuning.skew.SkewAggregationTuning spark-tuning-1.0-
    SNAPSHOT-jar-with-dependencies.jar

4.4 Join数据倾斜优化

4.4.1 广播Join

  • 适用场景:适用于小表join大表。小表足够小,可被加载进Driver并通过Broadcast方法广播到各个Executor中。

  • 解决逻辑:在小表join大表时如果产生数据倾斜,那么广播join可以直接规避掉此shuffle阶段。直接优化掉stage。并且广播join也是SparkSql中最常用的优化方案。

  • 案例演示:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    object SkewMapJoinTuning {
    def main( args: Array[String] ): Unit = {
    val sparkConf = new SparkConf().setAppName("SkewMapJoinTuning")
    .set("spark.sql.autoBroadcastJoinThreshold", "10m")
    // .setMaster("local[*]")
    .set("spark.sql.shuffle.partitions", "36")
    val sparkSession: SparkSession = InitUtil.initSparkSession(sparkConf)

    //查询出三张表 并进行join 插入到最终表中
    val saleCourse = sparkSession.sql("select * from sparktuning.sale_course")
    val coursePay = sparkSession.sql("select * from sparktuning.course_pay")
    .withColumnRenamed("discount", "pay_discount")
    .withColumnRenamed("createtime", "pay_createtime")
    val courseShoppingCart = sparkSession.sql("select * from sparktuning.course_shopping_cart")
    .drop("coursename")
    .withColumnRenamed("discount", "cart_discount")
    .withColumnRenamed("createtime", "cart_createtime")


    saleCourse
    .join(courseShoppingCart, Seq("courseid", "dt", "dn"), "right")
    .join(coursePay, Seq("orderid", "dt", "dn"), "left")
    .select("courseid", "coursename", "status", "pointlistid", "majorid", "chapterid", "chaptername", "edusubjectid"
    , "edusubjectname", "teacherid", "teachername", "coursemanager", "money", "orderid", "cart_discount", "sellmoney",
    "cart_createtime", "pay_discount", "paymoney", "pay_createtime", "dt", "dn")
    .write.mode(SaveMode.Overwrite).saveAsTable("sparktuning.salecourse_detail")
    }

    }
    1
    2
    3
    spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-
    executors 3 --executor-cores 2 --executor-memory 6g --class
    com.atguigu.sparktuning.skew.SkewMapJoinTuning spark-tuning-1.0-SNAPSHOT-jar-with-dependencies.jar

4.4.2 拆分大key 打散大表 扩容小表

  • 适用场景:适用于join时出现数据倾斜。

  • 解决逻辑:

    • 将存在倾斜的表,根据抽样结果,拆分为倾斜key(skew)和没有倾斜key(common)的两个数据集。
    • 将skew表的key全部加上随机前缀,然后对另外一个不存在严重数据倾斜的数据集(old表)整体与随机前缀集作笛卡尔乘积(即将数据量扩大N倍,得到new表)。
    • 打散的skew表 join 扩容的new表 union common表 join old表
  • 以下为打散大key和扩容小表的实现思路:

    • 打散大表:实际就是数据一进一出进行处理,对大key前拼上随机前缀实现打散。
    • 扩容小表:实际就是将DataFrame中每一条数据,转成一个集合,并往这个集合里循环添加10条数据,最后使用flatmap压平此集合,达到扩容的效果。
  • 案例演示:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    object SkewJoinTuning {
    def main( args: Array[String] ): Unit = {
    val sparkConf = new SparkConf().setAppName("SkewJoinTuning")
    .set("spark.sql.autoBroadcastJoinThreshold", "-1")
    .set("spark.sql.shuffle.partitions", "36")
    .setMaster("local[*]")
    val sparkSession: SparkSession = InitUtil.initSparkSession(sparkConf)

    scatterBigAndExpansionSmall(sparkSession)

    // while(true){}
    }


    /**
    * 打散大表 扩容小表 解决数据倾斜
    *
    * @param sparkSession
    */
    def scatterBigAndExpansionSmall( sparkSession: SparkSession ): Unit = {
    import sparkSession.implicits._
    val saleCourse = sparkSession.sql("select *from sparktuning.sale_course")
    val coursePay = sparkSession.sql("select * from sparktuning.course_pay")
    .withColumnRenamed("discount", "pay_discount")
    .withColumnRenamed("createtime", "pay_createtime")
    val courseShoppingCart = sparkSession.sql("select * from sparktuning.course_shopping_cart")
    .withColumnRenamed("discount", "cart_discount")
    .withColumnRenamed("createtime", "cart_createtime")

    // TODO 1、拆分 倾斜的key
    val commonCourseShoppingCart: Dataset[Row] = courseShoppingCart.filter(item => item.getAs[Long]("courseid") != 101 && item.getAs[Long]("courseid") != 103)
    val skewCourseShoppingCart: Dataset[Row] = courseShoppingCart.filter(item => item.getAs[Long]("courseid") == 101 || item.getAs[Long]("courseid") == 103)

    //TODO 2、将倾斜的key打散 打散36份
    val newCourseShoppingCart = skewCourseShoppingCart.mapPartitions(( partitions: Iterator[Row] ) => {
    partitions.map(item => {
    val courseid = item.getAs[Long]("courseid")
    val randInt = Random.nextInt(36)
    CourseShoppingCart(courseid, item.getAs[String]("orderid"),
    item.getAs[String]("coursename"), item.getAs[String]("cart_discount"),
    item.getAs[String]("sellmoney"), item.getAs[String]("cart_createtime"),
    item.getAs[String]("dt"), item.getAs[String]("dn"), randInt + "_" + courseid)
    })
    })
    //TODO 3、小表进行扩容 扩大36倍
    val newSaleCourse = saleCourse.flatMap(item => {
    val list = new ArrayBuffer[SaleCourse]()
    val courseid = item.getAs[Long]("courseid")
    val coursename = item.getAs[String]("coursename")
    val status = item.getAs[String]("status")
    val pointlistid = item.getAs[Long]("pointlistid")
    val majorid = item.getAs[Long]("majorid")
    val chapterid = item.getAs[Long]("chapterid")
    val chaptername = item.getAs[String]("chaptername")
    val edusubjectid = item.getAs[Long]("edusubjectid")
    val edusubjectname = item.getAs[String]("edusubjectname")
    val teacherid = item.getAs[Long]("teacherid")
    val teachername = item.getAs[String]("teachername")
    val coursemanager = item.getAs[String]("coursemanager")
    val money = item.getAs[String]("money")
    val dt = item.getAs[String]("dt")
    val dn = item.getAs[String]("dn")
    for (i <- 0 until 36) {
    list.append(SaleCourse(courseid, coursename, status, pointlistid, majorid, chapterid, chaptername, edusubjectid,
    edusubjectname, teacherid, teachername, coursemanager, money, dt, dn, i + "_" + courseid))
    }
    list
    })

    // TODO 4、倾斜的大key 与 扩容后的表 进行join
    val df1: DataFrame = newSaleCourse
    .join(newCourseShoppingCart.drop("courseid").drop("coursename"), Seq("rand_courseid", "dt", "dn"), "right")
    .join(coursePay, Seq("orderid", "dt", "dn"), "left")
    .select("courseid", "coursename", "status", "pointlistid", "majorid", "chapterid", "chaptername", "edusubjectid"
    , "edusubjectname", "teacherid", "teachername", "coursemanager", "money", "orderid", "cart_discount", "sellmoney",
    "cart_createtime", "pay_discount", "paymoney", "pay_createtime", "dt", "dn")


    // TODO 5、没有倾斜大key的部分 与 原来的表 进行join
    val df2: DataFrame = saleCourse
    .join(commonCourseShoppingCart.drop("coursename"), Seq("courseid", "dt", "dn"), "right")
    .join(coursePay, Seq("orderid", "dt", "dn"), "left")
    .select("courseid", "coursename", "status", "pointlistid", "majorid", "chapterid", "chaptername", "edusubjectid"
    , "edusubjectname", "teacherid", "teachername", "coursemanager", "money", "orderid", "cart_discount", "sellmoney",
    "cart_createtime", "pay_discount", "paymoney", "pay_createtime", "dt", "dn")

    // TODO 6、将 倾斜key join后的结果 与 普通key join后的结果,uinon起来
    df1
    .union(df2)
    .write.mode(SaveMode.Overwrite).insertInto("sparktuning.salecourse_detail")
    }


    }
    1
    2
    3
    4
    spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-
    executors 3 --executor-cores 2 --executor-memory 6g --class
    com.atguigu.sparktuning.skew.SkewJoinTuning spark-tuning-1.0-SNAPSHOT-
    jar-with-dependencies.jar

5、Job优化

  • SortShuffle流程:

    image-20240630153405181

5.1 Map端优化

5.1.1 Map端聚合

  • map-side预聚合,就是在每个节点本地对相同的key进行一次聚合操作,类似于MapReduce中的本地combiner。map-side预聚合之后,每个节点本地就只会有一条相同的key,因为多条相同的key都被聚合起来了。其他节点在拉取所有节点上的相同key时,就会大大减少需要拉取的数据数量,从而也就减少了磁盘IO以及网络传输开销。
  • RDD的话建议使用reduceByKey或者aggregateByKey算子来替代掉groupByKey算子。因为reduceByKey和aggregateByKey算子都会使用用户自定义的函数对每个节点本地的相同key进行预聚合。而groupByKey算子是不会进行预聚合的,全量的数据会在集群的各个节点之间分发和传输,性能相对来说比较差。
  • SparkSQL本身的HashAggregte就会实现本地预聚合+全局聚合。

5.1.2 读取小文件优化

  • 读取的数据源有很多小文件,会造成查询性能的损耗,大量的数据分片信息以及对应产生的Task元信息也会给Spark Driver的内存造成压力,带来单点问题。

  • 设置参数:

    1
    2
    spark.sql.files.maxPartitionBytes=128MB # 默认128m, 一个分区最大字节数。
    spark.files.openCostInBytes=4194304 # 默认4m,打开一个文件的开销。
  • 案例:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    object MapSmallFileTuning {


    def main( args: Array[String] ): Unit = {
    val sparkConf = new SparkConf().setAppName("MapSmallFileTuning")
    .set("spark.files.openCostInBytes", "7194304") //默认4m
    .set("spark.sql.files.maxPartitionBytes", "128MB") //默认128M
    // .setMaster("local[1]") //TODO 要打包提交集群执行,注释掉
    val sparkSession: SparkSession = InitUtil.initSparkSession(sparkConf)


    sparkSession.sql("select * from sparktuning.course_shopping_cart")
    .write
    .mode(SaveMode.Overwrite)
    .saveAsTable("sparktuning.test")


    // while (true) {}
    }
    }
    1
    2
    3
    4
    spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-
    executors 3 --executor-cores 2 --executor-memory 6g --class
    com.atguigu.sparktuning.map.MapSmallFileTuning spark-tuning-1.0-SNAPSHOT-
    jar-with-dependencies.jar
  • 源码理解:

    • DataSourceScanExec.createNonBucketedReadRDD()

      image-20240630154832433

      • 切片大小= Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore)),defaultParallelism就是RDD的并行度。
    • FilePartition. getFilePartitions()

      image-20240630155159413

      • 当(文件1大小+ openCostInBytes)+(文件2大小+ openCostInBytes)+…+(文件n-1大小+ openCostInBytes)+文件n <= maxPartitionBytes时,n个文件可以读入同一个分区,即满足:N个小文件总大小+(N-1)*openCostInBytes<= maxPartitionBytes的话。

5.1.3 增大map溢写时输出流buffer

  • map端Shuffle Write有一个缓冲区,初始阈值5m,超过会尝试增加到2*当前使用内存。如果申请不到内存,则进行溢写。这个参数是internal,指定无效(见下方源码)。也就是说资源足够会自动扩容,所以不需要我们去设置。

  • 溢写时使用输出流缓冲区默认32k,这些缓冲区减少了磁盘搜索和系统调用次数,适当提高可以提升溢写效率。

  • Shuffle文件涉及到序列化,是采取批的方式读写,默认按照每批次1万条去读写。设置得太低会导致在序列化时过度复制,因为一些序列化器通过增长和复制的方式来翻倍内部数据结构。这个参数是internal,指定无效(见下方源码)。

  • 综合以上分析,我们可以调整的就是输出缓冲区的大小。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    object MapFileBufferTuning {

    def main( args: Array[String] ): Unit = {
    val sparkConf = new SparkConf().setAppName("MapFileBufferTuning")
    .set("spark.sql.shuffle.partitions", "36")
    .set("spark.shuffle.file.buffer", "64")//对比 shuffle write 的stage 耗时
    // .set("spark.shuffle.spill.batchSize", "20000")// 不可修改
    // .set("spark.shuffle.spill.initialMemoryThreshold", "104857600")//不可修改
    // .setMaster("local[1]") //TODO 要打包提交集群执行,注释掉
    val sparkSession: SparkSession = InitUtil.initSparkSession(sparkConf)


    //查询出三张表 并进行join 插入到最终表中
    val saleCourse = sparkSession.sql("select * from sparktuning.sale_course")
    val coursePay = sparkSession.sql("select * from sparktuning.course_pay")
    .withColumnRenamed("discount", "pay_discount")
    .withColumnRenamed("createtime", "pay_createtime")
    val courseShoppingCart = sparkSession.sql("select * from sparktuning.course_shopping_cart")
    .drop("coursename")
    .withColumnRenamed("discount", "cart_discount")
    .withColumnRenamed("createtime", "cart_createtime")

    saleCourse
    .join(courseShoppingCart, Seq("courseid", "dt", "dn"), "right")
    .join(coursePay, Seq("orderid", "dt", "dn"), "left")
    .select("courseid", "coursename", "status", "pointlistid", "majorid", "chapterid", "chaptername", "edusubjectid"
    , "edusubjectname", "teacherid", "teachername", "coursemanager", "money", "orderid", "cart_discount", "sellmoney",
    "cart_createtime", "pay_discount", "paymoney", "pay_createtime", "dt", "dn")
    .write.mode(SaveMode.Overwrite).saveAsTable("sparktuning.salecourse_detail")


    // while (true) {}
    }
    }
    1
    2
    3
    4
    spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-
    executors 3 --executor-cores 2 --executor-memory 6g --class
    com.atguigu.sparktuning.map.MapFileBufferTuning spark-tuning-1.0-
    SNAPSHOT-jar-with-dependencies.jar
  • 源码理解:

    image-20240630160548324

    image-20240630160602371

    image-20240630160611537

5.2 Reduce端优化

5.2.1 合理设置Reduce数

  • 过多的cpu资源出现空转浪费,过少影响任务性能。关于并行度、并发度的相关参数介绍,在2.2.1中已经介绍过。

5.2.2 输出产生小文件优化

  • Join后的结果插入新表

    • join结果插入新表,生成的文件数等于shuffle并行度,默认就是200份文件插入到hdfs上。
    • 解决方式:
      • 可以在插入表数据前进行缩小分区操作来解决小文件过多问题,如coalesce、 repartition算子。
      • 调整shuffle并行度。根据2.2.2的原则来设置。
  • 动态分区插入数据

    • 没有Shuffle的情况下。最差的情况下,每个Task中都有表各个分区的记录,那文件数最终文件数将达到Task数量 * 表分区数。这种情况下是极易产生小文件的。

      1
      2
      INSERT overwrite table A partition ( aa )
      SELECT * FROM B;
    • 有Shuffle的情况下,上面的Task数量就变成了spark.sql.shuffle.partitions(默认值200)。那么最差情况就会有spark.sql.shuffle.partitions * 表分区数。当spark.sql.shuffle.partitions设置过大时,小文件问题就产生了;当 spark.sql.shuffle.partitions设置过小时,任务的并行度就下降了,性能随之受到影响。

    • 最理想的情况是根据分区字段进行shuffle,在上面的sql中加上distribute by aa。把同一分区的记录都哈希到同一个分区中去,由一个Spark的Task进行写入,这样的话只会产 生N个文件,但是这种情况下也容易出现数据倾斜的问题。

    • 解决思路:

      • 结合第4章解决倾斜的思路,在确定哪个分区键倾斜的情况下,将倾斜的分区键单独拎出来。

      • 将入库的SQL拆成(where 分区 != 倾斜分区键)和 (where 分区 = 倾斜分区键)几个部分,非倾斜分区键的部分正常distribute by分区字段,倾斜分区键的部分distribute by随机数,sql如下:

        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        //1.非倾斜键部分
        INSERT overwrite table A partition ( aa )
        SELECT *
        FROM B where aa != 大key
        distribute by aa;

        //2.倾斜键部分
        INSERT overwrite table A partition ( aa )
        SELECT *
        FROM B where aa = 大key
        distribute by cast(rand() * 5 as int);
      • 案例实操:

        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        19
        20
        21
        22
        23
        24
        25
        26
        27
        28
        29
        30
        31
        32
        33
        34
        35
        36
        37
        38
        39
        40
        41
        42
        43
        44
        45
        46
        object DynamicPartitionSmallFileTuning {


        def main( args: Array[String] ): Unit = {
        val sparkConf = new SparkConf().setAppName("DynamicPartitionSmallFileTuning")
        .set("spark.sql.shuffle.partitions", "36")
        // .setMaster("local[*]") //TODO 要打包提交集群执行,注释掉
        val sparkSession: SparkSession = InitUtil.initSparkSession(sparkConf)

        // sparkSession.sql(
        // """
        // |CREATE TABLE if not exists `sparktuning`.`dynamic_csc` (
        // | `courseid` BIGINT,
        // | `coursename` STRING,
        // | `createtime` STRING,
        // | `discount` STRING,
        // | `orderid` STRING,
        // | `sellmoney` STRING,
        // | `dt` STRING,
        // | `dn` STRING)
        // |USING parquet
        // |PARTITIONED BY (dt, dn)
        // """.stripMargin)

        // TODO 非倾斜分区写入
        sparkSession.sql(
        """
        |insert overwrite sparktuning.dynamic_csc partition(dt,dn)
        |select * from sparktuning.course_shopping_cart
        |where dt!='20190722' and dn!='webA'
        |distribute by dt,dn
        """.stripMargin)

        // TODO 倾斜分区打散写入
        sparkSession.sql(
        """
        |insert overwrite sparktuning.dynamic_csc partition(dt,dn)
        |select * from sparktuning.course_shopping_cart
        |where dt='20190722' and dn='webA'
        |distribute by cast(rand() * 5 as int)
        """.stripMargin)


        // while (true) {}
        }
        }
        1
        2
        3
        4
        spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-
        executors 3 --executor-cores 2 --executor-memory 6g --class
        com.atguigu.sparktuning.reduce.DynamicPartitionSmallFileTuning spark-
        tuning-1.0-SNAPSHOT-jar-with-dependencies.jar

5.2.3 增大reduce缓冲区,减少拉取次数

  • Spark Shuffle过程中,shuffle reduce task的buffer缓冲区大小决定了reduce task每次能够缓冲的数据量,也就是每次能够拉取的数据量,如果内存资源较为充足,适当增加拉取数据缓冲区的大小,可以减少拉取数据的次数,也就可以减少网络传输的次数,进而提 升性能。

  • reduce端数据拉取缓冲区的大小可以通过spark.reducer.maxSizeInFlight参数进行设置, 默认为 48MB。

  • 源码:BlockStoreShuffleReader.read()

    image-20240706144545925

5.2.4 调节reduce端拉取数据重试次数

  • Spark Shuffle过程中,reduce task拉取属于自己的数据时,如果因为网络异常等原因导 致失败会自动进行重试。对于那些包含了特别耗时的 shuffle 操作的作业,建议增加重试最 大次数(比如60次),以避免由于JVM的full gc或者网络不稳定等因素导致的数据拉取失 败。在实践中发现,对于针对超大数据量(数十亿~上百亿)的 shuffle 过程,调节该参数可以大幅度提升稳定性。
  • reduce端拉取数据重试次数可以通过spark.shuffle.io.maxRetries参数进行设置,该参数就代表了可以重试的最大次数。如果在指定次数之内拉取还是没有成功,就可能会导致作业执行失败,默认为3。

5.2.5 调节reduce端拉取数据等待间隔

  • Spark Shuffle过程中,reduce task拉取属于自己的数据时,如果因为网络异常等原因导致失败会自动进行重试,在一次失败后,会等待一定的时间间隔再进行重试,可以通过加大间隔时长(比如 60s),以增加shuffle操作的稳定性。

  • reduce端拉取数据等待间隔可以通过spark.shuffle.io.retryWait参数进行设置,默认值为5s。

  • 综合5.2.3、5.2.4、5.2.5,案例实操:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    object ReduceShuffleTuning {


    def main( args: Array[String] ): Unit = {
    val sparkConf = new SparkConf().setAppName("ReduceShuffleTuning")
    .set("spark.sql.autoBroadcastJoinThreshold", "-1")//为了演示效果,先禁用了广播join
    .set("spark.sql.shuffle.partitions", "36")
    .set("spark.reducer.maxSizeInFlight", "96m") // reduce缓冲区,默认48m
    .set("spark.shuffle.io.maxRetries", "6") // 重试次数,默认3次
    .set("spark.shuffle.io.retryWait", "60s") // 重试的间隔,默认5s
    // .setMaster("local[*]") //TODO 要打包提交集群执行,注释掉
    val sparkSession: SparkSession = InitUtil.initSparkSession(sparkConf)


    //查询出三张表 并进行join 插入到最终表中
    val saleCourse = sparkSession.sql("select * from sparktuning.sale_course")
    val coursePay = sparkSession.sql("select * from sparktuning.course_pay")
    .withColumnRenamed("discount", "pay_discount")
    .withColumnRenamed("createtime", "pay_createtime")
    val courseShoppingCart = sparkSession.sql("select * from sparktuning.course_shopping_cart")
    .drop("coursename")
    .withColumnRenamed("discount", "cart_discount")
    .withColumnRenamed("createtime", "cart_createtime")

    saleCourse
    .join(courseShoppingCart, Seq("courseid", "dt", "dn"), "right")
    .join(coursePay, Seq("orderid", "dt", "dn"), "left")
    .select("courseid", "coursename", "status", "pointlistid", "majorid", "chapterid", "chaptername", "edusubjectid"
    , "edusubjectname", "teacherid", "teachername", "coursemanager", "money", "orderid", "cart_discount", "sellmoney",
    "cart_createtime", "pay_discount", "paymoney", "pay_createtime", "dt", "dn")
    .write.mode(SaveMode.Overwrite).saveAsTable("sparktuning.salecourse_detail")


    // while (true) {}
    }
    }
    1
    2
    3
    4
    spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-
    executors 3 --executor-cores 2 --executor-memory 6g --class
    com.atguigu.sparktuning.reduce.ReduceShuffleTuning spark-tuning-1.0-
    SNAPSHOT-jar-with-dependencies.jar

5.2.6 合理利用bypass

  • 当ShuffleManager为SortShuffleManager时,如果shuffle read task的数量小于这个阈值(默认是200)且不需要map端进行合并操作,则shuffle write过程中不会进行排序操作,使用BypassMergeSortShuffleWriter去写数据,但是最后会将每个task产生的所有临时磁盘文件都合并成一个文件,并会创建单独的索引文件。

  • 当你使用SortShuffleManager时,如果确实不需要排序操作,那么建议将这个参数调大一些,大于shuffle read task的数量。那么此时就会自动启用bypass机制,map-side就不会进行排序了,减少了排序的性能开销。但是这种方式下,依然会产生大量的磁盘文件, 因此shuffle write性能有待提高。

  • 源码分析:

    SortShuffleManager.registerShuffle()

    image-20240706145418137

    image-20240706145426560

    SortShuffleManager.getWriter()

    image-20240706145454012

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    object BypassTuning {


    def main( args: Array[String] ): Unit = {
    val sparkConf = new SparkConf().setAppName("BypassTuning")
    .set("spark.sql.shuffle.partitions", "36")
    // .set("spark.shuffle.sort.bypassMergeThreshold", "30") //bypass阈值,默认200,改成30对比效果
    // .setMaster("local[*]") //TODO 要打包提交集群执行,注释掉
    val sparkSession: SparkSession = InitUtil.initSparkSession(sparkConf)


    //查询出三张表 并进行join 插入到最终表中
    val saleCourse = sparkSession.sql("select * from sparktuning.sale_course")
    val coursePay = sparkSession.sql("select * from sparktuning.course_pay")
    .withColumnRenamed("discount", "pay_discount")
    .withColumnRenamed("createtime", "pay_createtime")
    val courseShoppingCart = sparkSession.sql("select * from sparktuning.course_shopping_cart")
    .drop("coursename")
    .withColumnRenamed("discount", "cart_discount")
    .withColumnRenamed("createtime", "cart_createtime")

    saleCourse
    .join(courseShoppingCart, Seq("courseid", "dt", "dn"), "right")
    .join(coursePay, Seq("orderid", "dt", "dn"), "left")
    .select("courseid", "coursename", "status", "pointlistid", "majorid", "chapterid", "chaptername", "edusubjectid"
    , "edusubjectname", "teacherid", "teachername", "coursemanager", "money", "orderid", "cart_discount", "sellmoney",
    "cart_createtime", "pay_discount", "paymoney", "pay_createtime", "dt", "dn")
    .write.mode(SaveMode.Overwrite).saveAsTable("sparktuning.salecourse_detail")


    // while (true) {}
    }
    }
    1
    2
    3
    4
    spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-
    executors 3 --executor-cores 2 --executor-memory 6g --class
    com.atguigu.sparktuning.reduce.BypassTuning spark-tuning-1.0-SNAPSHOT-
    jar-with-dependencies.jar

5.3 整体优化

5.3.1 调节数据本地化等待时长

  • 在Spark项目开发阶段,可以使用client模式对程序进行测试,此时,可以在本地看到比较全的日志信息,日志信息中有明确的Task数据本地化的级别,如果大部分都是PROCESS_LOCAL、NODE_LOCAL,那么就无需进行调节,但是如果发现很多的级别都是RACK_LOCAL、ANY,那么需要对本地化的等待时长进行调节,应该是反复调节,每次调节完以后,再来运行观察日志,看看大部分的task的本地化级别有没有提升;看看,整个spark作业的运行时间有没有缩短。

  • 注意过犹不及,不要将本地化等待时长延长地过长,导致因为大量的等待时长,使得Spark作业的运行时间反而增加了。

  • 下面几个参数,默认都是3s,可以改成如下:

    • spark.locality.wait //建议 6s、10s
    • spark.locality.wait.process //建议 60s
    • spark.locality.wait.node //建议 30s
    • spark.locality.wait.rack //建议 20s
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    object LocalityWaitTuning {


    def main( args: Array[String] ): Unit = {
    val sparkConf = new SparkConf().setAppName("LocalityWaitTuning")
    // 分别打包测试
    // .set("spark.locality.wait", "1")
    // .set("spark.locality.wait.process", "1")
    // .set("spark.locality.wait.node", "1")
    // .set("spark.locality.wait.rack", "1")
    // 分别打包测试
    .set("spark.locality.wait", "6s")
    .set("spark.locality.wait.process", "60s")
    .set("spark.locality.wait.node", "30s")
    .set("spark.locality.wait.rack", "20s")
    val sparkSession: SparkSession = InitUtil.initSparkSession(sparkConf)

    import sparkSession.implicits._
    val ds: Dataset[CoursePay] = sparkSession
    .read.json("/sparkdata/coursepay.log").as[CoursePay]
    ds.cache()
    ds.foreachPartition(( p: Iterator[CoursePay] ) => p.foreach(item => println(item.orderid)))
    }
    }
    1
    2
    3
    4
    spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-
    executors 3 --executor-cores 2 --executor-memory 6g --class
    com.atguigu.sparktuning.job.LocalityWaitTuning spark-tuning-1.0-SNAPSHOT-
    jar-with-dependencies.jar

5.3.2 使用堆外内存

  • 堆外内存参数

    • 讲到堆外内存,就必须去提一个东西,那就是去yarn申请资源的单位,容器。Spark on yarn模式,一个容器到底申请多少内存资源。

    • 一个容器最多可以申请多大资源,是由yarn参数yarn.scheduler.maximum-allocation-mb决定, 需要满足:spark.executor.memoryOverhead + spark.executor.memory + spark.memory.offHeap.size ≤ yarn.scheduler.maximum-allocation-mb

    • 参数解释:

      • spark.executor.memory:提交任务时指定的堆内内存。
      • spark.executor.memoryOverhead:堆外内存参数,内存额外开销。默认开启,默认值为spark.executor.memory*0.1并且会与最小值384mb做对比,取最大值。所以spark on yarn任务堆内内存申请1个g,而实际去yarn申请的内存大于1个g的原因。
      • spark.memory.offHeap.size:堆外内存参数,spark中默认关闭,需要将spark.memory.enable.offheap.enable参数设置为true。
    • 注意:很多网上资料说spark.executor.memoryOverhead包含spark.memory.offHeap.size,这是由版本区别的,仅限于 spark3.0之前的版本。3.0之后就发生改变,实际去yarn申请的内存资源由三个参数相加。

      image-20240706151328481

      image-20240706151338072

    • 测试申请容器上限:

      • yarn.scheduler.maximum-allocation-mb修改为7G,将三个参数设为如下,大于7G,会报错:

        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        19
        20
        21
        22
        23
        24
        25
        26
        27
        28
        29
        30
        31
        32
        33
        34
        object SMBJoinTuning {

        def main( args: Array[String] ): Unit = {

        val sparkConf = new SparkConf().setAppName("SMBJoinTuning")
        .set("spark.sql.shuffle.partitions", "36")
        val sparkSession: SparkSession = InitUtil.initSparkSession(sparkConf)
        useSMBJoin(sparkSession)

        }

        def useSMBJoin( sparkSession: SparkSession ) = {
        //查询出三张表 并进行join 插入到最终表中
        val saleCourse = sparkSession.sql("select *from sparktuning.sale_course")
        val coursePay = sparkSession.sql("select * from sparktuning.course_pay_cluster")
        .withColumnRenamed("discount", "pay_discount")
        .withColumnRenamed("createtime", "pay_createtime")
        val courseShoppingCart = sparkSession.sql("select *from sparktuning.course_shopping_cart_cluster")
        .drop("coursename")
        .withColumnRenamed("discount", "cart_discount")
        .withColumnRenamed("createtime", "cart_createtime")

        val tmpdata = courseShoppingCart.join(coursePay, Seq("orderid"), "left")
        val result = broadcast(saleCourse).join(tmpdata, Seq("courseid"), "right")
        result
        .select("courseid", "coursename", "status", "pointlistid", "majorid", "chapterid", "chaptername", "edusubjectid"
        , "edusubjectname", "teacherid", "teachername", "coursemanager", "money", "orderid", "cart_discount", "sellmoney",
        "cart_createtime", "pay_discount", "paymoney", "pay_createtime", "sparktuning.sale_course.dt", "sparktuning.sale_course.dn")
        .write
        .mode(SaveMode.Overwrite)
        .saveAsTable("sparktuning.salecourse_detail_2")

        }
        }
        1
        2
        3
        4
        5
        spark-submit --master yarn --deploy-mode client --driver-memory 1g  --
        num-executors 3 --executor-cores 4 --conf
        spark.memory.offHeap.enabled=true --conf spark.memory.offHeap.size=2g --
        executor-memory 5g --class com.atguigu.sparktuning.join.SMBJoinTuning
        spark-tuning-1.0-SNAPSHOT-jar-with-dependencies.jar
      • 将spark.memory.offHeap.size修改为1g后再次提交:

        1
        2
        3
        4
        5
        spark-submit --master yarn --deploy-mode client --driver-memory 1g  --
        num-executors 3 --executor-cores 4 --conf
        spark.memory.offHeap.enabled=true --conf spark.memory.offHeap.size=1g --
        executor-memory 5g --class com.atguigu.sparktuning.join.SMBJoinTuning
        spark-tuning-1.0-SNAPSHOT-jar-with-dependencies.jar
  • 使用堆外缓存

    • 使用堆外内存可以减轻垃圾回收的工作,也加快了复制的速度。

    • 当需要缓存非常大的数据量时,虚拟机将承受非常大的GC压力,因为虚拟机必须检查每个对象是否可以收集并必须访问所有内存页。本地缓存是最快的,但会给虚拟机带来GC压力,所以,当你需要处理非常多 GB 的数据量时可以考虑使用堆外内存来进行优化, 因为这不会给Java垃圾收集器带来任何压力。让 JAVA GC为应用程序完成工作,缓存操作交给堆外。

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      object OFFHeapCache {

      def main( args: Array[String] ): Unit = {
      val sparkConf = new SparkConf().setAppName("OFFHeapCache")
      // .setMaster("local[*]")
      val sparkSession: SparkSession = InitUtil.initSparkSession(sparkConf)
      useOFFHeapMemory(sparkSession)
      }

      def useOFFHeapMemory( sparkSession: SparkSession ): Unit = {
      import sparkSession.implicits._
      val result = sparkSession.sql("select * from sparktuning.course_pay").as[CoursePay]
      // TODO 指定持久化到 堆外内存
      result.persist(StorageLevel.OFF_HEAP)
      result.foreachPartition(( p: Iterator[CoursePay] ) => p.foreach(item => println(item.orderid)))

      // while (true) {}
      }
      }
      1
      2
      3
      4
      5
      spark-submit --master yarn --deploy-mode client --driver-memory 1g  --
      num-executors 3 --executor-cores 4 --conf
      spark.memory.offHeap.enabled=true --conf spark.memory.offHeap.size=1g --
      executor-memory 5g --class com.atguigu.sparktuning.job.OFFHeapCache
      spark-tuning-1.0-SNAPSHOT-jar-with-dependencies.jar

5.3.3 调节连接等待时长

  • 在Spark作业运行过程中,Executor优先从自己本地关联的BlockManager中获取某份数据,如果本地BlockManager没有的话,会通过TransferService远程连接其他节点上Executor的BlockManager来获取数据。

  • 如果task在运行过程中创建大量对象或者创建的对象较大,会占用大量的内存,这回导致频繁的垃圾回收,但是垃圾回收会导致工作现场全部停止,也就是说,垃圾回收一旦执行,Spark的Executor进程就会停止工作,无法提供相应,此时,由于没有响应,无法建立网络连接,会导致网络连接超时。

  • 在生产环境下,有时会遇到file not found、file lost这类错误,在这种情况下,很有可能是Executor的BlockManager在拉取数据的时候,无法建立连接,然后超过默认的连接等待时长120s后,宣告数据拉取失败,如果反复尝试都拉取不到数据,可能会导致Spark作业的崩溃。这种情况也可能会导致DAGScheduler反复提交几次stage,TaskScheduler反复提交几次task,大大延长了我们的Spark作业的运行时间。

  • 为了避免长时间暂停(如GC)导致的超时,可以考虑调节连接的超时时长,连接等待时长需要在spark-submit脚本中进行设置,设置方式可以在提交时指定:

    1
    --conf spark.core.connection.ack.wait.timeout=300s
  • 调节连接等待时长后,通常可以避免部分的XX文件拉取失败、XX文件lost等报错。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    object AckWaitTuning {

    def main( args: Array[String] ): Unit = {
    val sparkConf = new SparkConf().setAppName("AckWaitTuning")
    // .set("spark.core.connection.ack.wait.timeout", "2s") // 连接超时时间,默认等于spark.network.timeout的值,默认120s
    // .setMaster("local[*]")
    val sparkSession: SparkSession = InitUtil.initSparkSession(sparkConf)
    useOFFHeapMemory(sparkSession)
    }

    def useOFFHeapMemory( sparkSession: SparkSession ): Unit = {
    import sparkSession.implicits._
    val result = sparkSession.sql("select * from sparktuning.course_pay").as[CoursePay]
    result.cache()
    result.foreachPartition(( p: Iterator[CoursePay] ) => p.foreach(item => println(item.orderid)))

    // while (true) {}
    }
    }
    1
    2
    3
    4
    5
    spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-
    executors 3 --executor-cores 2 --executor-memory 1g --conf
    spark.core.connection.ack.wait.timeout=300s --class
    com.atguigu.sparktuning.job.AckWaitTuning spark-tuning-1.0-SNAPSHOT-jar-
    with-dependencies.jar

6、Spark3.0 AQE

  • Spark在3.0版本推出了AQE(Adaptive Query Execution),即自适应查询执行。AQE是Spark SQL的一种动态优化机制,在运行时,每当Shuffle Map阶段执行完毕,AQE都会结合这个阶段的统计信息,基于既定的规则动态地调整、修正尚未执行的逻辑计划和物理计划,来完成对原始查询语句的运行时优化。

6.1 动态合并分区

  • 在Spark中运行查询处理非常大的数据时,shuffle通常会对查询性能产生非常重要的影响。shuffle是非常昂贵的操作,因为它需要进行网络传输移动数据,以便下游进行计算。

  • 最好的分区取决于数据,但是每个查询的阶段之间的数据大小可能相差很大,这使得该数字难以调整:

    • 如果分区太少,则每个分区的数据量可能会很大,处理这些数据量非常大的分区, 可能需要将数据溢写到磁盘(例如,排序和聚合),降低了查询。
    • 如果分区太多,则每个分区的数据量大小可能很小,读取大量小的网络数据块, 这也会导致I/O效率低而降低了查询速度。拥有大量的 task(一个分区一个task)也会给Spark任务计划程序带来更多负担。
  • 为了解决这个问题,我们可以在任务开始时先设置较多的shuffle分区个数,然后在运行时通过查看shuffle文件统计信息将相邻的小分区合并成更大的分区。

  • 例如,假设正在运行select max(i) from tbl group by j。输入tbl很小,在分组前只有2个分区。那么任务刚初始化时,我们将分区数设置为5,如果没有AQE,Spark将启动五个任务来进行最终聚合,但是其中会有三个非常小的分区,为每个分区启动单独的任务这样 就很浪费。

    image-20240706154052810

  • 取而代之的是,AQE将这三个小分区合并为一个,因此最终聚只需三个task而不是五个:

    image-20240706154126853

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    object AQEPartitionTunning {
    def main( args: Array[String] ): Unit = {
    val sparkConf = new SparkConf().setAppName("AQEPartitionTunning")
    .set("spark.sql.autoBroadcastJoinThreshold", "-1") //为了演示效果,禁用广播join
    .set("spark.sql.adaptive.enabled", "true")
    .set("spark.sql.adaptive.coalescePartitions.enabled", "true") // 合并分区的开关
    .set("spark.sql.adaptive.coalescePartitions.initialPartitionNum","1000") // 初始的并行度
    .set("spark.sql.adaptive.coalescePartitions.minPartitionNum","10") // 合并后的最小分区数
    .set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "20mb") // 合并后的分区,期望有多大
    val sparkSession: SparkSession = InitUtil.initSparkSession(sparkConf)
    useJoin(sparkSession)
    }

    def useJoin( sparkSession: SparkSession ) = {
    val saleCourse = sparkSession.sql("select *from sparktuning.sale_course")
    val coursePay = sparkSession.sql("select * from sparktuning.course_pay")
    .withColumnRenamed("discount", "pay_discount")
    .withColumnRenamed("createtime", "pay_createtime")
    val courseShoppingCart = sparkSession.sql("select *from sparktuning.course_shopping_cart")
    .drop("coursename")
    .withColumnRenamed("discount", "cart_discount")
    .withColumnRenamed("createtime", "cart_createtime")

    saleCourse.join(courseShoppingCart, Seq("courseid", "dt", "dn"), "right")
    .join(coursePay, Seq("orderid", "dt", "dn"), "left")
    .select("courseid", "coursename", "status", "pointlistid", "majorid", "chapterid", "chaptername", "edusubjectid"
    , "edusubjectname", "teacherid", "teachername", "coursemanager", "money", "orderid", "cart_discount", "sellmoney",
    "cart_createtime", "pay_discount", "paymoney", "pay_createtime", "dt", "dn")
    .write.mode(SaveMode.Overwrite).insertInto("sparktuning.salecourse_detail_1")
    }

    }
    1
    2
    3
    4
    spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-
    executors 3 --executor-cores 2 --executor-memory 2g --class
    com.atguigu.sparktuning.aqe.AQEPartitionTunning spark-tuning-1.0-
    SNAPSHOT-jar-with-dependencies.jar

    结合动态申请资源:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    object DynamicAllocationTunning {
    def main( args: Array[String] ): Unit = {
    val sparkConf = new SparkConf().setAppName("DynamicAllocationTunning")
    .set("spark.sql.autoBroadcastJoinThreshold", "-1")
    .set("spark.sql.adaptive.enabled", "true")
    .set("spark.sql.adaptive.coalescePartitions.enabled", "true")
    .set("spark.sql.adaptive.coalescePartitions.initialPartitionNum","1000")
    .set("spark.sql.adaptive.coalescePartitions.minPartitionNum","10")
    .set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "20mb")
    .set("spark.dynamicAllocation.enabled","true") // 动态申请资源
    .set("spark.dynamicAllocation.shuffleTracking.enabled","true") // shuffle动态跟踪
    val sparkSession: SparkSession = InitUtil.initSparkSession(sparkConf)
    useJoin(sparkSession)
    }

    def useJoin( sparkSession: SparkSession ) = {
    val saleCourse = sparkSession.sql("select *from sparktuning.sale_course")
    val coursePay = sparkSession.sql("select * from sparktuning.course_pay")
    .withColumnRenamed("discount", "pay_discount")
    .withColumnRenamed("createtime", "pay_createtime")
    val courseShoppingCart = sparkSession.sql("select *from sparktuning.course_shopping_cart")
    .drop("coursename")
    .withColumnRenamed("discount", "cart_discount")
    .withColumnRenamed("createtime", "cart_createtime")
    saleCourse.join(courseShoppingCart, Seq("courseid", "dt", "dn"), "right")
    .join(coursePay, Seq("orderid", "dt", "dn"), "left")
    .select("courseid", "coursename", "status", "pointlistid", "majorid", "chapterid", "chaptername", "edusubjectid"
    , "edusubjectname", "teacherid", "teachername", "coursemanager", "money", "orderid", "cart_discount", "sellmoney",
    "cart_createtime", "pay_discount", "paymoney", "pay_createtime", "dt", "dn")
    .write.mode(SaveMode.Overwrite).insertInto("sparktuning.salecourse_detail_1")
    }

    }
    1
    2
    3
    4
    spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-
    executors 3 --executor-cores 2 --executor-memory 2g --class
    com.atguigu.sparktuning.aqe.DynamicAllocationTunning spark-tuning-1.0-
    SNAPSHOT-jar-with-dependencies.jar

6.2 动态切换Join策略

  • Spark支持多种join策略,其中如果join的一张表可以很好的插入内存,那么broadcast hash join通常性能最高。因此,spark join中,如果小表小于广播大小阀值(默认10mb),Spark将计划进行broadcast hash join。但是,很多事情都会使这种大小估计出错(例如,存在选择性很高的过滤器),或者join关系是一系列的运算符而不是简单的扫描表操作。

  • 为了解决此问题,AQE现在根据最准确的join大小运行时重新计划join策略。从下图实例中可以看出,发现连接的右侧表比左侧表小的多,并且足够小可以进行广播,那么AQE会重新优化,将sort merge join转换成为broadcast hash join。

    image-20240706160519973

  • 对于运行是的broadcast hash join,可以将shuffle优化成本地shuffle,优化掉stage减少网络传输。Broadcast hash join可以规避shuffle阶段,相当于本地join。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    object AqeDynamicSwitchJoin {
    def main( args: Array[String] ): Unit = {
    val sparkConf = new SparkConf().setAppName("AqeDynamicSwitchJoin")
    .set("spark.sql.adaptive.enabled", "true")
    .set("spark.sql.adaptive.localShuffleReader.enabled", "true") //在不需要进行shuffle重分区时,尝试使用本地shuffle读取器。将sort-meger join 转换为广播join
    val sparkSession: SparkSession = InitUtil.initSparkSession(sparkConf)
    switchJoinStartegies(sparkSession)
    }


    def switchJoinStartegies( sparkSession: SparkSession ) = {
    val coursePay = sparkSession.sql("select * from sparktuning.course_pay")
    .withColumnRenamed("discount", "pay_discount")
    .withColumnRenamed("createtime", "pay_createtime")
    .where("orderid between 'odid-9999000' and 'odid-9999999'")
    val courseShoppingCart = sparkSession.sql("select *from sparktuning.course_shopping_cart")
    .drop("coursename")
    .withColumnRenamed("discount", "cart_discount")
    .withColumnRenamed("createtime", "cart_createtime")
    val tmpdata = coursePay.join(courseShoppingCart, Seq("orderid"), "right")
    tmpdata.show()
    }
    }
    1
    2
    3
    spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-
    executors 3 --executor-cores 4 --executor-memory 2g --class com.atguigu.sparktuning.aqe.AqeDynamicSwitchJoin spark-tuning-1.0-
    SNAPSHOT-jar-with-dependencies.jar

6.3 动态优化Join倾斜

  • 当数据在群集中的分区之间分布不均匀时,就会发生数据倾斜。严重的倾斜会大大降低查询性能,尤其对于join。AQEskewjoin优化会从随机shuffle文件统计信息自动检测到这种倾斜。然后它将倾斜分区拆分成较小的子分区。

  • 例如,下图A join B,A表中分区A0明显大于其他分区。

    image-20240706162512510
    因此,skew join会将A0分区拆分成两个子分区,并且对应连接B0分区。

    image-20240706162710409
    没有这种优化,会导致其中一个分区特别耗时拖慢整个stage,有了这个优化之后每个task耗时都会大致相同,从而总体上获得更好的性能。

  • 可以采取第4章提到的解决方式,3.0有了AQE机制就可以交给Spark自行解决。 Spark3.0增加了以下参数。

    • spark.sql.adaptive.skewJoin.enabled:是否开启倾斜join检测,如果开启了,那么会将倾斜的分区数据拆成多个分区,默认是开启的,但是得打开aqe。
    • spark.sql.adaptive.skewJoin.skewedPartitionFactor:默认值5,此参数用来判断分区数据量是否数据倾斜,当任务中最大数据量分区对应的数据量大于的分区中位数乘以此参数, 并且也大于spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes 参数,那么此任务是数据倾斜。
    • spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes:默认值256mb,用于判断是否数据倾斜。
    • spark.sql.adaptive.advisoryPartitionSizeInBytes:此参数用来告诉spark进行拆分后推荐分区大小是多少。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    object AqeOptimizingSkewJoin {
    def main( args: Array[String] ): Unit = {
    val sparkConf = new SparkConf().setAppName("AqeOptimizingSkewJoin")
    .set("spark.sql.autoBroadcastJoinThreshold", "-1") //为了演示效果,禁用广播join
    .set("spark.sql.adaptive.coalescePartitions.enabled", "true") // 为了演示效果,关闭自动缩小分区
    .set("spark.sql.adaptive.enabled", "true")
    .set("spark.sql.adaptive.skewJoin.enable","true")
    .set("spark.sql.adaptive.skewJoin.skewedPartitionFactor","2")
    .set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes","20mb")
    .set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "8mb")
    val sparkSession: SparkSession = InitUtil.initSparkSession(sparkConf)
    useJoin(sparkSession)
    }

    def useJoin( sparkSession: SparkSession ) = {
    val saleCourse = sparkSession.sql("select *from sparktuning.sale_course")
    val coursePay = sparkSession.sql("select * from sparktuning.course_pay")
    .withColumnRenamed("discount", "pay_discount")
    .withColumnRenamed("createtime", "pay_createtime")
    val courseShoppingCart = sparkSession.sql("select *from sparktuning.course_shopping_cart")
    .drop("coursename")
    .withColumnRenamed("discount", "cart_discount")
    .withColumnRenamed("createtime", "cart_createtime")
    saleCourse.join(courseShoppingCart, Seq("courseid", "dt", "dn"), "right")
    .join(coursePay, Seq("orderid", "dt", "dn"), "left")
    .select("courseid", "coursename", "status", "pointlistid", "majorid", "chapterid", "chaptername", "edusubjectid"
    , "edusubjectname", "teacherid", "teachername", "coursemanager", "money", "orderid", "cart_discount", "sellmoney",
    "cart_createtime", "pay_discount", "paymoney", "pay_createtime", "dt", "dn")
    .write.mode(SaveMode.Overwrite).insertInto("sparktuning.salecourse_detail_1")
    }
    }
    1
    2
    3
    4
    spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-
    executors 3 --executor-cores 4 --executor-memory 2g --class
    com.atguigu.sparktuning.aqe.AqeOptimizingSkewJoin spark-tuning-1.0-
    SNAPSHOT-jar-with-dependencies.jar

    如果同时开启了spark.sql.adaptive.coalescePartitions.enabled动态合并分区功能,那么会先合并分区,再去判断倾斜,将动态合并分区打开后,重新执行:

    1
    2
    3
    4
    spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-
    executors 3 --executor-cores 4 --executor-memory 2g --class
    com.atguigu.sparktuning.aqe.AqeOptimizingSkewJoin spark-tuning-1.0-
    SNAPSHOT-jar-with-dependencies.jar

    修改中位数的倍数为2,重新执行:

    1
    2
    3
    4
    spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-
    executors 3 --executor-cores 4 --executor-memory 2g --class
    com.atguigu.sparktuning.aqe.AqeOptimizingSkewJoin spark-tuning-1.0-
    SNAPSHOT-jar-with-dependencies.jar

7、Spark3.0 DPP

  • Spark3.0支持动态分区裁剪Dynamic Partition Pruning,简称DPP,核心思路就是先将join一侧作为子查询计算出来,再将其所有分区用到join另一侧作为表过滤条件,从而实现对分区的动态修剪。如下图所示:

    image-20240706164331070

    • select t1.id,t2.pkey from t1 join t2 on t1.pkey =t2.pkey and t2.id<2优化成了select t1.id,t2.pkey from t1 join t2 on t1.pkey=t2.pkey and t1.pkey in(select t2.pkey from t2 where t2.id<2
  • 触发条件:

    • 待裁剪的表join的时候,join条件里必须有分区字段。
    • 如果是需要修剪左表,那么join必须是inner join ,left semi join或right join,反之亦然。但如果是left out join,无论右边有没有这个分区,左边的值都存在,就不需要被裁剪。
    • 另一张表需要存在至少一个过滤条件,比如a join b on a.key=b.key and a.id<2
  • 参数spark.sql.optimizer.dynamicPartitionPruning.enabled默认开启。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    object DPPTest {


    def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("DPPTest")
    .set("spark.sql.optimizer.dynamicPartitionPruning.enabled", "true")
    // .setMaster("local[*]")
    val sparkSession: SparkSession = InitUtil.initSparkSession(sparkConf)

    val result=sparkSession.sql(
    """
    |select a.id,a.name,a.age,b.name
    |from sparktuning.test_student a
    |inner join sparktuning.test_school b
    |on a.partition=b.partition and b.id<1000
    """.stripMargin)
    // .explain(mode="extended")

    result.foreach(item=>println(item.get(1)))
    }
    }
    1
    2
    3
    4
    spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-
    executors 3 --executor-cores 4 --executor-memory 2g --class
    com.atguigu.sparktuning.dpp.DPPTest spark-tuning-1.0-SNAPSHOT-jar-with-
    dependencies.jar

8、Spark3.0 Hint增强

  • 在spark2.4的时候就有了hint功能,不过只有broadcasthash join的hint,这次3.0又增加了sort merge join、shuffle_hash join、shuffle_replicate nested loop join。
  • Spark的5种Join策略:https://www.cnblogs.com/jmx-bigdata/p/14021183.html

8.1 broadcasthast join

1
2
3
4
5
6
7
sparkSession.sql("select /*+ BROADCAST(school) */ *  from test_student
student left join test_school school on student.id=school.id").show()
sparkSession.sql("select /*+ BROADCASTJOIN(school) */ * from
test_student student left join test_school school on
student.id=school.id").show()
sparkSession.sql("select /*+ MAPJOIN(school) */ * from test_student
student left join test_school school on student.id=school.id").show()

8.2 sort merge join

1
2
3
4
5
6
7
sparkSession.sql("select /*+ SHUFFLE_MERGE(school) */ *  from
test_student student left join test_school school on
student.id=school.id").show()
sparkSession.sql("select /*+ MERGEJOIN(school) */ * from test_student
student left join test_school school on student.id=school.id").show()
sparkSession.sql("select /*+ MERGE(school) */ * from test_student
student left join test_school school on student.id=school.id").show()

8.3 shuffle_hash join

1
2
sparkSession.sql("select /*+ SHUFFLE_HASH(school) */ *  from test_student
student left join test_school school on student.id=school.id").show()

8.4 shuffle_replicate_nl join

  • 使用条件非常苛刻,驱动表(school表)必须小,且很容易被spark执行成sort merge join。

    1
    2
    3
    sparkSession.sql("select /*+ SHUFFLE_REPLICATE_NL(school) */ *  from
    test_student student inner join test_school school on
    student.id=school.id").show()

9、故障排除

9.1 故障排除一:控制reduce端缓冲大小以避免OOM

  • 在Shuffle过程,reduce端task并不是等到map端task将其数据全部写入磁盘后再去拉取,而是map端写一点数据,reduce端task就会拉取一小部分数据,然后立即进行后面的聚合、算子函数的使用等操作。
  • reduce端task能够拉取多少数据,由reduce拉取数据的缓冲区buffer来决定,因为拉取过来的数据都是先放在buffer中,然后再进行后续的处理,buffer的默认大小为48MB。
  • reduce端task会一边拉取一边计算,不一定每次都会拉满48MB的数据,可能大多数时候拉取一部分数据就处理掉了。
  • 虽然说增大reduce端缓冲区大小可以减少拉取次数,提升Shuffle性能,但是有时map端的数据量非常大,写出的速度非常快,此时reduce端的所有task在拉取的时候,有可能全部达到自己缓冲的最大极限值,即48MB,此时,再加上reduce端执行的聚合函数的代码,可能会创建大量的对象,这可难会导致内存溢出,即OOM。
  • 如果一旦出现reduce端内存溢出的问题,我们可以考虑减小reduce端拉取数据缓冲区的大小,例如减少为12MB。
  • 在实际生产环境中是出现过这种问题的,这是典型的以性能换执行的原理。reduce端拉取数据的缓冲区减小,不容易导致OOM,但是相应的,reudce端的拉取次数增加,造成更多的网络传输开销,造成性能的下降。
  • 注意,要保证任务能够运行,再考虑性能的优化。

9.2 故障排除二:JVM GC导致的shuffle文件拉取失败

  • 在Spark作业中,有时会出现shuffle file not found的错误,这是非常常见的一个报错, 有时出现这种错误以后,选择重新执行一遍,就不再报出这种错误。

  • 出现上述问题可能的原因是Shuffle操作中,后面stage的task想要去上一个stage的task所在的Executor拉取数据,结果对方正在执行GC,执行GC会导致Executor内所有的工作现场全部停止,比如BlockManager、基于netty的网络通信等,这就会导致后面的task拉取数据拉取了半天都没有拉取到,就会报出shuffle file not found的错误,而第二次再次执行就不会再出现这种错误。

  • 可以通过调整reduce端拉取数据重试次数和reduce端拉取数据时间间隔这两个参数来对Shuffle性能进行调整,增大参数值,使得reduce端拉取数据的重试次数增加,并且每次失败后等待的时间间隔加长。

    1
    2
    3
    val conf = new SparkConf()
    .set("spark.shuffle.io.maxRetries", "60")
    .set("spark.shuffle.io.retryWait", "60s")

9.3 故障排除三:解决各种序列化导致的报错

  • 当Spark作业在运行过程中报错,而且报错信息中含有Serializable等类似词汇,那么可能是序列化问题导致的报错。序列化问题要注意以下三点:
    • 作为RDD的元素类型的自定义类,必须是可以序列化的;
    • 算子函数里可以使用的外部的自定义变量,必须是可以序列化的;
    • 不可以在RDD的元素类型、算子函数里使用第三方的不支持序列化的类型,例如Connection。

9.4 故障排除四:解决算子函数返回NULL导致的问题

  • 在一些算子函数里,需要我们有一个返回值,但是在一些情况下我们不希望有返回值,此时我们如果直接返回NULL,会报错,例如Scala.Math(NULL)异常。如果你遇到某些情况,不希望有返回值,那么可以通过下述方式解决:
    • 返回特殊值,不返回NULL,例如“-1”;
    • 在通过算子获取到了一个RDD之后,可以对这个RDD执行filter操作,进行数据过滤,将数值为-1的数据给过滤掉;
    • 在使用完filter算子后,继续调用coalesce算子进行优化。

9.5 故障排除五:解决YARN-CLIENT模式导致的网卡流量激增问题

  • YARN-client模式的运行原理如下图所示:

    image-20240707140933563

  • 在YARN-client模式下,Driver启动在本地机器上,而Driver负责所有的任务调度,需要与YARN集群上的多个Executor进行频繁的通信。

  • 假设有100个Executor,1000个task,那么每个Executor分配到10个task,之后,Driver要频繁地跟Executor上运行的1000个task进行通信,通信数据非常多,并且通信品类特别高。这就导致有可能在Spark任务运行过程中,由于频繁大量的网络通讯,本地机器的网卡流量会激增。

  • 注意,YARN-client模式只会在测试环境中使用,而之所以使用YARN-client模式,是由于可以看到详细全面的log信息,通过查看log,可以锁定程序中存在的问题,避免在生产环境下发生故障。

  • 在生产环境下,使用的一定是YARN-cluster模式。在YARN-cluster模式下,就不会造成本地机器网卡流量激增问题,如果YARN-cluster模式下存在网络通信的问题,需要运维团队进行解决。

9.6 故障排除六:解决YARN-CLUSTER模式的JVM栈内存溢出无法执行问题

  • YARN-cluster 模式的运行原理如下图所示:

    image-20240707141110429

  • 当Spark作业中包含SparkSQL的内容时,可能会碰到YARN-client模式下可以运行,但是YARN-cluster模式下无法提交运行(报出OOM错误)的情况。

  • YARN-client模式下,Driver是运行在本地机器上的,Spark使用的JVM的PermGen的配置,是本地机器上的spark-class文件,JVM永久代的大小是128MB,这个是没有问题的,但是在YARN-cluster模式下,Driver运行在YARN集群的某个节点上,使用的是没有经过配置的默认设置,PermGen永久代大小为82MB。

  • SparkSQL的内部要进行很复杂的SQL的语义解析、语法树转换等等,非常复杂,如果sql语句本身就非常复杂,那么很有可能会导致性能的损耗和内存的占用,特别是对PermGen的占用会比较大。

  • 所以,此时如果PermGen的占用好过了82MB,但是又小于128MB,就会出现YARN-client模式下可以运行,YARN-cluster模式下无法运行的情况。

  • 解决上述问题的方法时增加PermGen的容量,需要在spark-submit脚本中对相关参数进行设置,设置方法如代码清单所示。

    1
    --conf spark.driver.extraJavaOptions="-XX:PermSize=128M -XX:MaxPermSize=256M"
    • 通过上述方法就设置了Driver永久代的大小,默认为128MB,最大256MB,这样就可以避免上面所说的问题。

9.7 故障排除七:解决SparkSQL导致的JVM栈内存溢出

  • 当SparkSQL的sql语句有成百上千的or关键字时,就可能会出现Driver端的JVM栈内存溢出。
  • JVM栈内存溢出基本上就是由于调用的方法层级过多,产生了大量的,非常深的,超出了JVM栈深度限制的递归。(我们猜测SparkSQL有大量or语句的时候,在解析SQL时,例如转换为语法树或者进行执行计划的生成的时候,对于or的处理是递归,or非常多时,会发生大量的递归)。
  • 此时,建议将一条sql语句拆分为多条sql语句来执行,每条sql语句尽量保证100个以内的子句。根据实际的生产环境试验,一条sql语句的or关键字控制在100个以内,通常不会导致JVM栈内存溢出。

9.8 故障排除八:持久化与checkpoint的使用

  • Spark持久化在大部分情况下是没有问题的,但是有时数据可能会丢失,如果数据一旦丢失,就需要对丢失的数据重新进行计算,计算完后再缓存和使用,为了避免数据的丢失,可以选择对这个RDD进行checkpoint,也就是将数据持久化一份到容错的文件系统上(比如HDFS)。
  • 一个RDD缓存并checkpoint后,如果一旦发现缓存丢失,就会优先查看checkpoint数据存不存在,如果有,就会使用checkpoint数据,而不用重新计算。也即是说,checkpoint可以视为cache的保障机制,如果cache失败,就使用checkpoint的数据。
  • 使用checkpoint的优点在于提高了Spark作业的可靠性,一旦缓存出现问题,不需要重新计算数据,缺点在于,checkpoint时需要将数据写入HDFS等文件系统,对性能的消耗较大。

9.9 故障排除九:内存泄漏排查

  • 内存泄露是指程序中已动态分配的堆内存由于某种原因程序未释放或无法释放,造成系统内存的浪费,导致程序运行速度减慢,甚至系统崩溃等严重后果。
  • 在Spark Streaming中往往会因为开发者代码未正确编写导致无法回收或释放对象,造成Spark Streaming内存泄露越跑越慢甚至崩溃的结果。那么排查内存泄露需要一些第三方的工具。