SparkSQL
1、Spark SQL概述
1.1 什么是Spark SQL
- Spark SQL是用于结构化数据处理的Spark模块。与基本的Spark RDD API不同,Spark SQL提供的接口为Spark提供了有关数据结构和正在执行的计算的更多信息。在内部,Spark SQL使用这些额外的信息来执行额外的优化。与Spark SQL交互的方式有多种,包括SQL和Dataset API。计算结果时,使用相同的执行引擎,与您用于表达计算的API/语言无关。
1.2 为什么要有Spark SQL
1.3 SparkSQL的发展
- 发展历史
- RDD(Spark1.0)=》Dataframe(Spark1.3)=》Dataset(Spark1.6)
- 如果同样的数据都给到这三个数据结构,他们分别计算之后,都会给出相同的结果。不同的是他们的执行效率和执行方式。在现在的版本中,dataSet性能最好,已经成为了唯一使用的接口。其中Dataframe已经在底层被看做是特殊泛型的DataSet<Row>。
- 三者的共性
- RDD、DataFrame、DataSet全都是Spark平台下的分布式弹性数据集,为处理超大型数据提供便利。
- 三者都有惰性机制,在进行创建、转换,如map方法时,不会立即执行,只有在遇到Action行动算子如foreach时,三者才会开始遍历运算。
- 三者有许多共同的函数,如filter,排序等。
- 三者都会根据Spark的内存情况自动缓存运算。
- 三者都有分区的概念。
1.4 Spark SQL的特点
易整合。无缝的整合了SQL查询和Spark编程。
统一的数据访问方式。使用相同的方式连接不同的数据源。
兼容Hive。在已有的仓库上直接运行SQL或者HQL。
标准的数据连接。通过JDBC或者ODBC来连接。
2、Spark SQL编程
2.1 SparkSession新的起始点
在老的版本中,SparkSQL提供两种SQL查询起始点:
- 一个叫SQLContext,用于Spark自己提供的SQL查询;
- 一个叫HiveContext,用于连接Hive的查询。
SparkSession是Spark最新的SQL查询起始点,实质上是SQLContext和HiveContext的组合,所以在SQLContext和HiveContext上可用的API在SparkSession上同样是可以使用的。
SparkSession内部封装了SparkContext,所以计算实际上是由SparkContext完成的。当我们使用spark-shell的时候,Spark框架会自动的创建一个名称叫做Spark的SparkSession,就像我们以前可以自动获取到一个sc来表示SparkContext。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19[spark-local]$ bin/spark-shell
20/09/12 11:16:35 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://hadoop102:4040
Spark context available as 'sc' (master = local[*], app id = local-1599880621394).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 3.3.1
/_/
Using Scala version 2.12.10 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_212)
Type in expressions to have them evaluated.
Type :help for more information.
2.2 常用方式
2.2.1 方法调用
引入pom依赖:
1
2
3
4
5
6
7
8
9
10
11
12
13<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.3.1</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.22</version>
</dependency>
</dependencies>在项目目录下新建input/user.json:
1
2
3
4
5
6{"age":20,"name":"qiaofeng"}
{"age":19,"name":"xuzhu"}
{"age":18,"name":"duanyu"}
{"age":22,"name":"qiaofeng"}
{"age":11,"name":"xuzhu"}
{"age":12,"name":"duanyu"}代码实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62public class Test01_Method {
public static void main(String[] args) {
//1. 创建配置对象
SparkConf conf = new SparkConf().setAppName("sparksql").setMaster("local[*]");
//2. 获取sparkSession
SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
//3. 编写代码
// 按照行读取
Dataset<Row> lineDS = spark.read().json("input/user.json");
// 转换为类和对象
Dataset<User> userDS = lineDS.as(Encoders.bean(User.class));
// userDS.show();
// 使用方法操作
// 函数式的方法
Dataset<User> userDataset = lineDS.map(new MapFunction<Row, User>() {
public User call(Row value) throws Exception {
return new User(value.getLong(0), value.getString(1));
}
},
// 使用kryo在底层会有部分算子无法使用
Encoders.bean(User.class));
// 常规方法
Dataset<User> sortDS = userDataset.sort(new Column("age"));
sortDS.show();
// 区分
RelationalGroupedDataset groupByDS = userDataset.groupBy("name");
// 后续方法不同
Dataset<Row> count = groupByDS.count();
// 推荐使用函数式的方法 使用更灵活
KeyValueGroupedDataset<String, User> groupedDataset = userDataset.groupByKey(new MapFunction<User, String>() {
public String call(User value) throws Exception {
return value.name;
}
}, Encoders.STRING());
// 聚合算子都是从groupByKey开始
// 推荐使用reduceGroup
Dataset<Tuple2<String, User>> result = groupedDataset.reduceGroups(new ReduceFunction<User>() {
public User call(User v1, User v2) throws Exception {
// 取用户的大年龄
return new User(Math.max(v1.age, v2.age), v1.name);
}
});
result.show();
//4. 关闭sparkSession
spark.close();
}
}- 在sparkSql中DS直接支持的转换算子有:map(底层已经优化为mapPartition)、mapPartition、flatMap、groupByKey(聚合算子全部由groupByKey开始)、filter、distinct、coalesce、repartition、sort和orderBy(不是函数式的算子,不过不影响使用)。
2.2.2 SQL使用方式
1 | public class Test02_SQL { |
2.2.3 DSL特殊语法(扩展)
1 | public class Test03_DSL { |
2.3 SQL语法的用户自定义函数
2.3.1 UDF
UDF:一行进入,一行出。
代码实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37public class Test04_UDF {
public static void main(String[] args) {
//1. 创建配置对象
SparkConf conf = new SparkConf().setAppName("sparksql").setMaster("local[*]");
//2. 获取sparkSession
SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
//3. 编写代码
Dataset<Row> lineRDD = spark.read().json("input/user.json");
lineRDD.createOrReplaceTempView("user");
// 定义一个函数
// 需要首先导入依赖import static org.apache.spark.sql.functions.udf;
UserDefinedFunction addName = udf(new UDF1<String, String>() {
public String call(String s) throws Exception {
return s + " 大侠";
}
}, DataTypes.StringType);
spark.udf().register("addName", addName);
spark.sql("select addName(name) newName from user")
.show();
// lambda表达式写法
spark.udf().register("addName1", (UDF1<String, String>) name -> name + " 大侠", DataTypes.StringType);
//4. 关闭sparkSession
spark.close();
}
}
2.3.2 UDAF
UDAF:输入多行,返回一行。通常和groupBy一起使用,如果直接使用UDAF函数,默认将所有的数据合并在一起。
Spark3.x推荐使用extends Aggregator自定义UDAF,属于强类型的Dataset方式。
Spark2.x使用extends UserDefinedAggregateFunction,属于弱类型的DataFrame。
案例实操。需求:实现求平均年龄,自定义UDAF,MyAvg(age)。
自定义聚合函数实现-强类型。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91public class Test05_UDAF {
public static void main(String[] args) {
//1. 创建配置对象
SparkConf conf = new SparkConf().setAppName("sparksql").setMaster("local[*]");
//2. 获取sparkSession
SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
//3. 编写代码
spark.read().json("input/user.json").createOrReplaceTempView("user");
// 注册需要导入依赖 import static org.apache.spark.sql.functions.udaf;
spark.udf().register("avgAge", udaf(new MyAvg(), Encoders.LONG()));
spark.sql("select avgAge(age) newAge from user").show();
//4. 关闭sparkSession
spark.close();
}
public static class Buffer implements Serializable {
private Long sum;
private Long count;
public Buffer() {
}
public Buffer(Long sum, Long count) {
this.sum = sum;
this.count = count;
}
public Long getSum() {
return sum;
}
public void setSum(Long sum) {
this.sum = sum;
}
public Long getCount() {
return count;
}
public void setCount(Long count) {
this.count = count;
}
}
public static class MyAvg extends Aggregator<Long, Buffer, Double> {
public Buffer zero() {
return new Buffer(0L, 0L);
}
public Buffer reduce(Buffer b, Long a) {
b.setSum(b.getSum() + a);
b.setCount(b.getCount() + 1);
return b;
}
public Buffer merge(Buffer b1, Buffer b2) {
b1.setSum(b1.getSum() + b2.getSum());
b1.setCount(b1.getCount() + b2.getCount());
return b1;
}
public Double finish(Buffer reduction) {
return reduction.getSum().doubleValue() / reduction.getCount();
}
public Encoder<Buffer> bufferEncoder() {
// 可以用kryo进行优化
return Encoders.kryo(Buffer.class);
}
public Encoder<Double> outputEncoder() {
return Encoders.DOUBLE();
}
}
}
2.3.3 UDTF(没有)
- 输入一行,返回多行(Hive)。
- SparkSQL中没有UDTF,需要使用算子类型的flatMap先完成拆分。
3、SparkSQL数据的加载与保存
3.1 读取和保存文件
- SparkSQL读取和保存的文件一般为三种,JSON文件、CSV文件和列式存储的文件,同时可以通过添加参数,来识别不同的存储和压缩格式。
3.1.1 CSV文件
代码实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53public class Test06_CSV {
public static void main(String[] args) throws ClassNotFoundException {
//1. 创建配置对象
SparkConf conf = new SparkConf().setAppName("sparksql").setMaster("local[*]");
//2. 获取sparkSession
SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
//3. 编写代码
DataFrameReader reader = spark.read();
// 添加参数 读取csv
Dataset<Row> userDS = reader
.option("header", "true")//默认为false 不读取列名
.option("sep",",") // 默认为, 列的分割
// 不需要写压缩格式 自适应
.csv("input/user.csv");
userDS.show();
// 转换为user的ds
// 直接转换类型会报错 csv读取的数据都是string
// Dataset<User> userDS1 = userDS.as(Encoders.bean(User.class));
userDS.printSchema();
Dataset<User> userDS1 = userDS.map(new MapFunction<Row, User>() {
public User call(Row value) throws Exception {
return new User(Long.valueOf(value.getString(0)), value.getString(1));
}
}, Encoders.bean(User.class));
userDS1.show();
// 写出为csv文件
DataFrameWriter<User> writer = userDS1.write();
writer.option("seq",";")
.option("header","true")
// .option("compression","gzip")// 压缩格式
// 写出模式
// append 追加
// Ignore 忽略本次写出
// Overwrite 覆盖写
// ErrorIfExists 如果存在报错
.mode(SaveMode.Append)
.csv("output");
//4. 关闭sparkSession
spark.close();
}
}
3.1.2 JSON文件
1 | public class Test07_JSON { |
3.1.3 Parquet文件
列式存储的数据自带列分割。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25public class Test08_Parquet {
public static void main(String[] args) {
//1. 创建配置对象
SparkConf conf = new SparkConf().setAppName("sparksql").setMaster("local[*]");
//2. 获取sparkSession
SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
//3. 编写代码
Dataset<Row> json = spark.read().json("input/user.json");
// 写出默认使用snappy压缩
// json.write().parquet("output");
// 读取parquet 自带解析 能够识别列名
Dataset<Row> parquet = spark.read().parquet("output");
parquet.printSchema();
//4. 关闭sparkSession
spark.close();
}
}
3.2 与MySQL交互
导入依赖:
1
2
3
4
5
6
7
8
9
10
11
12<!-- 早期MySQL版本 -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.27</version>
</dependency>
<!-- 5.8(8.0)MySQL版本 -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.18</version>
</dependency>从MySQL读数据:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30public class Test09_Table {
public static void main(String[] args) {
//1. 创建配置对象
SparkConf conf = new SparkConf().setAppName("sparksql").setMaster("local[*]");
//2. 获取sparkSession
SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
//3. 编写代码
Dataset<Row> json = spark.read().json("input/user.json");
// 添加参数
Properties properties = new Properties();
properties.setProperty("user","root");
properties.setProperty("password","000000");
// json.write()
// // 写出模式针对于表格追加覆盖
// .mode(SaveMode.Append)
// .jdbc("jdbc:mysql://hadoop102:3306","gmall.testInfo",properties);
Dataset<Row> jdbc = spark.read().jdbc("jdbc:mysql://hadoop102:3306/gmall?useSSL=false&useUnicode=true&characterEncoding=UTF-8&allowPublicKeyRetrieval=true", "test_info", properties);
jdbc.show();
//4. 关闭sparkSession
spark.close();
}
}
3.3 与Hive交互
- SparkSQL可以采用内嵌Hive(spark开箱即用的hive),也可以采用外部Hive。企业开发中,通常采用外部Hive。
3.3.1 Linux中的交互
添加MySQL连接驱动到spark-yarn的jars目录。
1
[spark-yarn]$ cp /opt/software/mysql-connector-java-5.1.27-bin.jar /opt/module/spark-yarn/jars
添加hive-site.xml文件到spark-yarn的conf目录。
1
[spark-yarn]$ cp /opt/module/hive/conf/hive-site.xml /opt/module/spark-yarn/conf
启动spark-sql的客户端即可。
1
2[spark-yarn]$ bin/spark-sql --master yarn
spark-sql (default)> show tables;
3.3.2 IDEA中的交互
添加依赖。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.3.1</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.27</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.12</artifactId>
<version>3.3.1</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.22</version>
</dependency>
</dependencies>拷贝hive-site.xml到resources目录(如果需要操作Hadoop,需要拷贝hdfs-site.xml、core-site.xml、yarn-site.xml)。
代码实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25public class Test10_Hive {
public static void main(String[] args) {
System.setProperty("HADOOP_USER_NAME","atguigu");
//1. 创建配置对象
SparkConf conf = new SparkConf().setAppName("sparksql").setMaster("local[*]");
//2. 获取sparkSession
SparkSession spark = SparkSession.builder()
.enableHiveSupport()// 添加hive支持
.config(conf).getOrCreate();
//3. 编写代码
spark.sql("show tables").show();
spark.sql("create table user_info(name String,age bigint)");
spark.sql("insert into table user_info values('zhangsan',10)");
spark.sql("select * from user_info").show();
//4. 关闭sparkSession
spark.close();
}
}