SparkSQL

1、Spark SQL概述

1.1 什么是Spark SQL

  • Spark SQL是用于结构化数据处理的Spark模块。与基本的Spark RDD API不同,Spark SQL提供的接口为Spark提供了有关数据结构和正在执行的计算的更多信息。在内部,Spark SQL使用这些额外的信息来执行额外的优化。与Spark SQL交互的方式有多种,包括SQL和Dataset API。计算结果时,使用相同的执行引擎,与您用于表达计算的API/语言无关。

1.2 为什么要有Spark SQL

1.3 SparkSQL的发展

  • 发展历史
    • RDD(Spark1.0)=》Dataframe(Spark1.3)=》Dataset(Spark1.6)
    • 如果同样的数据都给到这三个数据结构,他们分别计算之后,都会给出相同的结果。不同的是他们的执行效率和执行方式。在现在的版本中,dataSet性能最好,已经成为了唯一使用的接口。其中Dataframe已经在底层被看做是特殊泛型的DataSet<Row>。
  • 三者的共性
    • RDD、DataFrame、DataSet全都是Spark平台下的分布式弹性数据集,为处理超大型数据提供便利。
    • 三者都有惰性机制,在进行创建、转换,如map方法时,不会立即执行,只有在遇到Action行动算子如foreach时,三者才会开始遍历运算
    • 三者有许多共同的函数,如filter,排序等。
    • 三者都会根据Spark的内存情况自动缓存运算
    • 三者都有分区的概念

1.4 Spark SQL的特点

  • 易整合。无缝的整合了SQL查询和Spark编程。

  • 统一的数据访问方式。使用相同的方式连接不同的数据源。

  • 兼容Hive。在已有的仓库上直接运行SQL或者HQL。

  • 标准的数据连接。通过JDBC或者ODBC来连接。

2、Spark SQL编程

2.1 SparkSession新的起始点

  • 在老的版本中,SparkSQL提供两种SQL查询起始点:

    • 一个叫SQLContext,用于Spark自己提供的SQL查询;
    • 一个叫HiveContext,用于连接Hive的查询。
  • SparkSession是Spark最新的SQL查询起始点,实质上是SQLContext和HiveContext的组合,所以在SQLContext和HiveContext上可用的API在SparkSession上同样是可以使用的。

  • SparkSession内部封装了SparkContext,所以计算实际上是由SparkContext完成的。当我们使用spark-shell的时候,Spark框架会自动的创建一个名称叫做Spark的SparkSession,就像我们以前可以自动获取到一个sc来表示SparkContext。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    [spark-local]$ bin/spark-shell

    20/09/12 11:16:35 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
    Setting default log level to "WARN".
    To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
    Spark context Web UI available at http://hadoop102:4040
    Spark context available as 'sc' (master = local[*], app id = local-1599880621394).
    Spark session available as 'spark'.
    Welcome to
    ____ __
    / __/__ ___ _____/ /__
    _\ \/ _ \/ _ `/ __/ '_/
    /___/ .__/\_,_/_/ /_/\_\ version 3.3.1
    /_/

    Using Scala version 2.12.10 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_212)
    Type in expressions to have them evaluated.
    Type :help for more information.

2.2 常用方式

2.2.1 方法调用

  • 引入pom依赖:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    <dependencies>
    <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.12</artifactId>
    <version>3.3.1</version>
    </dependency>

    <dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.22</version>
    </dependency>
    </dependencies>
  • 在项目目录下新建input/user.json:

    1
    2
    3
    4
    5
    6
    {"age":20,"name":"qiaofeng"}
    {"age":19,"name":"xuzhu"}
    {"age":18,"name":"duanyu"}
    {"age":22,"name":"qiaofeng"}
    {"age":11,"name":"xuzhu"}
    {"age":12,"name":"duanyu"}
  • 代码实现:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    public class Test01_Method {
    public static void main(String[] args) {
    //1. 创建配置对象
    SparkConf conf = new SparkConf().setAppName("sparksql").setMaster("local[*]");

    //2. 获取sparkSession
    SparkSession spark = SparkSession.builder().config(conf).getOrCreate();

    //3. 编写代码
    // 按照行读取
    Dataset<Row> lineDS = spark.read().json("input/user.json");

    // 转换为类和对象
    Dataset<User> userDS = lineDS.as(Encoders.bean(User.class));

    // userDS.show();

    // 使用方法操作
    // 函数式的方法
    Dataset<User> userDataset = lineDS.map(new MapFunction<Row, User>() {
    @Override
    public User call(Row value) throws Exception {
    return new User(value.getLong(0), value.getString(1));
    }
    },
    // 使用kryo在底层会有部分算子无法使用
    Encoders.bean(User.class));

    // 常规方法
    Dataset<User> sortDS = userDataset.sort(new Column("age"));
    sortDS.show();

    // 区分
    RelationalGroupedDataset groupByDS = userDataset.groupBy("name");

    // 后续方法不同
    Dataset<Row> count = groupByDS.count();

    // 推荐使用函数式的方法 使用更灵活
    KeyValueGroupedDataset<String, User> groupedDataset = userDataset.groupByKey(new MapFunction<User, String>() {
    @Override
    public String call(User value) throws Exception {
    return value.name;
    }
    }, Encoders.STRING());

    // 聚合算子都是从groupByKey开始
    // 推荐使用reduceGroup
    Dataset<Tuple2<String, User>> result = groupedDataset.reduceGroups(new ReduceFunction<User>() {
    @Override
    public User call(User v1, User v2) throws Exception {
    // 取用户的大年龄
    return new User(Math.max(v1.age, v2.age), v1.name);
    }
    });

    result.show();

    //4. 关闭sparkSession
    spark.close();
    }
    }
    • 在sparkSql中DS直接支持的转换算子有:map(底层已经优化为mapPartition)、mapPartition、flatMap、groupByKey(聚合算子全部由groupByKey开始)、filter、distinct、coalesce、repartition、sort和orderBy(不是函数式的算子,不过不影响使用)。

2.2.2 SQL使用方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class Test02_SQL {

public static void main(String[] args) {

//1. 创建配置对象
SparkConf conf = new SparkConf().setAppName("sparksql").setMaster("local[*]");

//2. 获取sparkSession
SparkSession spark = SparkSession.builder().config(conf).getOrCreate();

//3. 编写代码
Dataset<Row> lineDS = spark.read().json("input/user.json");

// 创建视图 => 转换为表格 填写表名
// 临时视图的生命周期和当前的sparkSession绑定
// orReplace表示覆盖之前相同名称的视图
lineDS.createOrReplaceTempView("t1");

// 支持所有的hive sql语法,并且会使用spark的又花钱
Dataset<Row> result = spark.sql("select * from t1 where age > 18");

result.show();

//4. 关闭sparkSession
spark.close();
}

}

2.2.3 DSL特殊语法(扩展)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class Test03_DSL {
public static void main(String[] args) {
//1. 创建配置对象
SparkConf conf = new SparkConf().setAppName("sparksql").setMaster("local[*]");

//2. 获取sparkSession
SparkSession spark = SparkSession.builder().config(conf).getOrCreate();

//3. 编写代码
// 导入特殊的依赖 import static org.apache.spark.sql.functions.col;
Dataset<Row> lineRDD = spark.read().json("input/user.json");

Dataset<Row> result = lineRDD.select(col("name").as("newName"),col("age").plus(1).as("newAge"))
.filter(col("age").gt(18));

result.show();

//4. 关闭sparkSession
spark.close();
}
}

2.3 SQL语法的用户自定义函数

2.3.1 UDF

  • UDF:一行进入,一行出。

  • 代码实现:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    public class Test04_UDF {

    public static void main(String[] args) {

    //1. 创建配置对象
    SparkConf conf = new SparkConf().setAppName("sparksql").setMaster("local[*]");

    //2. 获取sparkSession
    SparkSession spark = SparkSession.builder().config(conf).getOrCreate();

    //3. 编写代码
    Dataset<Row> lineRDD = spark.read().json("input/user.json");

    lineRDD.createOrReplaceTempView("user");

    // 定义一个函数
    // 需要首先导入依赖import static org.apache.spark.sql.functions.udf;
    UserDefinedFunction addName = udf(new UDF1<String, String>() {
    @Override
    public String call(String s) throws Exception {
    return s + " 大侠";
    }
    }, DataTypes.StringType);

    spark.udf().register("addName", addName);

    spark.sql("select addName(name) newName from user")
    .show();

    // lambda表达式写法
    spark.udf().register("addName1", (UDF1<String, String>) name -> name + " 大侠", DataTypes.StringType);

    //4. 关闭sparkSession
    spark.close();
    }

    }

2.3.2 UDAF

  • UDAF:输入多行,返回一行。通常和groupBy一起使用,如果直接使用UDAF函数,默认将所有的数据合并在一起。

  • Spark3.x推荐使用extends Aggregator自定义UDAF,属于强类型的Dataset方式。

  • Spark2.x使用extends UserDefinedAggregateFunction,属于弱类型的DataFrame。

  • 案例实操。需求:实现求平均年龄,自定义UDAF,MyAvg(age)。

    • 自定义聚合函数实现-强类型。

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      54
      55
      56
      57
      58
      59
      60
      61
      62
      63
      64
      65
      66
      67
      68
      69
      70
      71
      72
      73
      74
      75
      76
      77
      78
      79
      80
      81
      82
      83
      84
      85
      86
      87
      88
      89
      90
      91
      public class Test05_UDAF {

      public static void main(String[] args) {

      //1. 创建配置对象
      SparkConf conf = new SparkConf().setAppName("sparksql").setMaster("local[*]");

      //2. 获取sparkSession
      SparkSession spark = SparkSession.builder().config(conf).getOrCreate();

      //3. 编写代码
      spark.read().json("input/user.json").createOrReplaceTempView("user");

      // 注册需要导入依赖 import static org.apache.spark.sql.functions.udaf;
      spark.udf().register("avgAge", udaf(new MyAvg(), Encoders.LONG()));

      spark.sql("select avgAge(age) newAge from user").show();

      //4. 关闭sparkSession
      spark.close();
      }

      public static class Buffer implements Serializable {

      private Long sum;
      private Long count;

      public Buffer() {
      }

      public Buffer(Long sum, Long count) {
      this.sum = sum;
      this.count = count;
      }

      public Long getSum() {
      return sum;
      }

      public void setSum(Long sum) {
      this.sum = sum;
      }

      public Long getCount() {
      return count;
      }

      public void setCount(Long count) {
      this.count = count;
      }
      }

      public static class MyAvg extends Aggregator<Long, Buffer, Double> {

      @Override
      public Buffer zero() {
      return new Buffer(0L, 0L);
      }

      @Override
      public Buffer reduce(Buffer b, Long a) {
      b.setSum(b.getSum() + a);
      b.setCount(b.getCount() + 1);
      return b;
      }

      @Override
      public Buffer merge(Buffer b1, Buffer b2) {
      b1.setSum(b1.getSum() + b2.getSum());
      b1.setCount(b1.getCount() + b2.getCount());
      return b1;
      }

      @Override
      public Double finish(Buffer reduction) {
      return reduction.getSum().doubleValue() / reduction.getCount();
      }

      @Override
      public Encoder<Buffer> bufferEncoder() {
      // 可以用kryo进行优化
      return Encoders.kryo(Buffer.class);
      }

      @Override
      public Encoder<Double> outputEncoder() {
      return Encoders.DOUBLE();
      }
      }

      }

2.3.3 UDTF(没有)

  • 输入一行,返回多行(Hive)。
  • SparkSQL中没有UDTF,需要使用算子类型的flatMap先完成拆分。

3、SparkSQL数据的加载与保存

3.1 读取和保存文件

  • SparkSQL读取和保存的文件一般为三种,JSON文件、CSV文件和列式存储的文件,同时可以通过添加参数,来识别不同的存储和压缩格式。

3.1.1 CSV文件

  • 代码实现:

    image-20240601173342221
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    public class Test06_CSV {
    public static void main(String[] args) throws ClassNotFoundException {

    //1. 创建配置对象
    SparkConf conf = new SparkConf().setAppName("sparksql").setMaster("local[*]");

    //2. 获取sparkSession
    SparkSession spark = SparkSession.builder().config(conf).getOrCreate();

    //3. 编写代码
    DataFrameReader reader = spark.read();

    // 添加参数 读取csv
    Dataset<Row> userDS = reader
    .option("header", "true")//默认为false 不读取列名
    .option("sep",",") // 默认为, 列的分割
    // 不需要写压缩格式 自适应
    .csv("input/user.csv");

    userDS.show();

    // 转换为user的ds
    // 直接转换类型会报错 csv读取的数据都是string
    // Dataset<User> userDS1 = userDS.as(Encoders.bean(User.class));
    userDS.printSchema();

    Dataset<User> userDS1 = userDS.map(new MapFunction<Row, User>() {
    @Override
    public User call(Row value) throws Exception {
    return new User(Long.valueOf(value.getString(0)), value.getString(1));
    }
    }, Encoders.bean(User.class));
    userDS1.show();

    // 写出为csv文件
    DataFrameWriter<User> writer = userDS1.write();

    writer.option("seq",";")
    .option("header","true")
    // .option("compression","gzip")// 压缩格式
    // 写出模式
    // append 追加
    // Ignore 忽略本次写出
    // Overwrite 覆盖写
    // ErrorIfExists 如果存在报错
    .mode(SaveMode.Append)
    .csv("output");

    //4. 关闭sparkSession
    spark.close();
    }

    }

3.1.2 JSON文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class Test07_JSON {
public static void main(String[] args) {
//1. 创建配置对象
SparkConf conf = new SparkConf().setAppName("sparksql").setMaster("local[*]");

//2. 获取sparkSession
SparkSession spark = SparkSession.builder().config(conf).getOrCreate();

//3. 编写代码
Dataset<Row> json = spark.read().json("input/user.json");

// json数据可以读取数据的数据类型
Dataset<User> userDS = json.as(Encoders.bean(User.class));

userDS.show();

// 读取别的类型的数据也能写出为json
DataFrameWriter<User> writer = userDS.write();

writer.json("output1");

//4. 关闭sparkSession
spark.close();
}

}

3.1.3 Parquet文件

  • 列式存储的数据自带列分割。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    public class Test08_Parquet {
    public static void main(String[] args) {

    //1. 创建配置对象
    SparkConf conf = new SparkConf().setAppName("sparksql").setMaster("local[*]");

    //2. 获取sparkSession
    SparkSession spark = SparkSession.builder().config(conf).getOrCreate();

    //3. 编写代码
    Dataset<Row> json = spark.read().json("input/user.json");

    // 写出默认使用snappy压缩
    // json.write().parquet("output");

    // 读取parquet 自带解析 能够识别列名
    Dataset<Row> parquet = spark.read().parquet("output");

    parquet.printSchema();

    //4. 关闭sparkSession
    spark.close();
    }

    }

3.2 与MySQL交互

  • 导入依赖:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    <!-- 早期MySQL版本 -->
    <dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.27</version>
    </dependency>
    <!-- 5.8(8.0)MySQL版本 -->
    <dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.18</version>
    </dependency>
  • 从MySQL读数据:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    public class Test09_Table {
    public static void main(String[] args) {

    //1. 创建配置对象
    SparkConf conf = new SparkConf().setAppName("sparksql").setMaster("local[*]");

    //2. 获取sparkSession
    SparkSession spark = SparkSession.builder().config(conf).getOrCreate();

    //3. 编写代码
    Dataset<Row> json = spark.read().json("input/user.json");

    // 添加参数
    Properties properties = new Properties();
    properties.setProperty("user","root");
    properties.setProperty("password","000000");

    // json.write()
    // // 写出模式针对于表格追加覆盖
    // .mode(SaveMode.Append)
    // .jdbc("jdbc:mysql://hadoop102:3306","gmall.testInfo",properties);

    Dataset<Row> jdbc = spark.read().jdbc("jdbc:mysql://hadoop102:3306/gmall?useSSL=false&useUnicode=true&characterEncoding=UTF-8&allowPublicKeyRetrieval=true", "test_info", properties);

    jdbc.show();

    //4. 关闭sparkSession
    spark.close();
    }
    }

3.3 与Hive交互

  • SparkSQL可以采用内嵌Hive(spark开箱即用的hive),也可以采用外部Hive。企业开发中,通常采用外部Hive。

3.3.1 Linux中的交互

  • 添加MySQL连接驱动到spark-yarn的jars目录。

    1
    [spark-yarn]$ cp /opt/software/mysql-connector-java-5.1.27-bin.jar /opt/module/spark-yarn/jars
  • 添加hive-site.xml文件到spark-yarn的conf目录。

    1
    [spark-yarn]$ cp /opt/module/hive/conf/hive-site.xml /opt/module/spark-yarn/conf
  • 启动spark-sql的客户端即可。

    1
    2
    [spark-yarn]$  bin/spark-sql --master yarn
    spark-sql (default)> show tables;

3.3.2 IDEA中的交互

  • 添加依赖。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    <dependencies>
    <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.12</artifactId>
    <version>3.3.1</version>
    </dependency>

    <dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.27</version>
    </dependency>

    <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive_2.12</artifactId>
    <version>3.3.1</version>
    </dependency>

    <dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.22</version>
    </dependency>
    </dependencies>
  • 拷贝hive-site.xml到resources目录(如果需要操作Hadoop,需要拷贝hdfs-site.xml、core-site.xml、yarn-site.xml)。

  • 代码实现:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    public class Test10_Hive {
    public static void main(String[] args) {

    System.setProperty("HADOOP_USER_NAME","atguigu");

    //1. 创建配置对象
    SparkConf conf = new SparkConf().setAppName("sparksql").setMaster("local[*]");

    //2. 获取sparkSession
    SparkSession spark = SparkSession.builder()
    .enableHiveSupport()// 添加hive支持
    .config(conf).getOrCreate();

    //3. 编写代码
    spark.sql("show tables").show();

    spark.sql("create table user_info(name String,age bigint)");
    spark.sql("insert into table user_info values('zhangsan',10)");
    spark.sql("select * from user_info").show();

    //4. 关闭sparkSession
    spark.close();
    }

    }