SparkCore
1、RDD概述
1.1 什么是RDD
RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象。
代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合。
RDD类比工厂生产。
1.2 RDD五大特性
2、RDD编程
2.1 RDD的创建
- 在Spark中创建RDD的创建方式可以分为三种:从集合中创建RDD、从外部存储创建RDD、从其他RDD创建。
2.1.1 IDEA环境准备
在pom文件中添加spark-core的依赖。
1
2
3
4
5<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.3.1</version>
</dependency>如果不希望运行时打印大量日志,可以在resources文件夹中添加log4j2.properties文件,并添加日志配置信息。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50# Set everything to be logged to the console
rootLogger.level = ERROR
rootLogger.appenderRef.stdout.ref = console
# In the pattern layout configuration below, we specify an explicit `%ex` conversion
# pattern for logging Throwables. If this was omitted, then (by default) Log4J would
# implicitly add an `%xEx` conversion pattern which logs stacktraces with additional
# class packaging information. That extra information can sometimes add a substantial
# performance overhead, so we disable it in our default logging config.
# For more information, see SPARK-39361.
appender.console.type = Console
appender.console.name = console
appender.console.target = SYSTEM_ERR
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n%ex
# Set the default spark-shell/spark-sql log level to WARN. When running the
# spark-shell/spark-sql, the log level for these classes is used to overwrite
# the root logger's log level, so that the user can have different defaults
# for the shell and regular Spark apps.
logger.repl.name = org.apache.spark.repl.Main
logger.repl.level = warn
logger.thriftserver.name = org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver
logger.thriftserver.level = warn
# Settings to quiet third party logs that are too verbose
logger.jetty1.name = org.sparkproject.jetty
logger.jetty1.level = warn
logger.jetty2.name = org.sparkproject.jetty.util.component.AbstractLifeCycle
logger.jetty2.level = error
logger.replexprTyper.name = org.apache.spark.repl.SparkIMain$exprTyper
logger.replexprTyper.level = info
logger.replSparkILoopInterpreter.name = org.apache.spark.repl.SparkILoop$SparkILoopInterpreter
logger.replSparkILoopInterpreter.level = info
logger.parquet1.name = org.apache.parquet
logger.parquet1.level = error
logger.parquet2.name = parquet
logger.parquet2.level = error
# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support
logger.RetryingHMSHandler.name = org.apache.hadoop.hive.metastore.RetryingHMSHandler
logger.RetryingHMSHandler.level = fatal
logger.FunctionRegistry.name = org.apache.hadoop.hive.ql.exec.FunctionRegistry
logger.FunctionRegistry.level = error
# For deploying Spark ThriftServer
# SPARK-34128: Suppress undesirable TTransportException warnings involved in THRIFT-4805
appender.console.filter.1.type = RegexFilter
appender.console.filter.1.regex = .*Thrift error occurred during processing of message.*
appender.console.filter.1.onMatch = deny appender.console.filter.1.onMismatch = neutral
2.1.2 从集合中创建
从集合中创建RDD:parallelize。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22public class Test01_List {
public static void main(String[] args) {
// 1.创建配置对象
SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");
// 2. 创建sparkContext
JavaSparkContext sc = new JavaSparkContext(conf);
// 3. 编写代码
JavaRDD<String> stringRDD = sc.parallelize(Arrays.asList("hello", "spark"));
List<String> result = stringRDD.collect();
for (String s : result) {
System.out.println(s);
}
// 4. 关闭sc
sc.stop();
}
}
2.1.3 从外部存储系统的数据集创建
由外部存储系统的数据集创建RDD包括:本地的文件系统,还有所有Hadoop支持的数据集,比如HDFS、HBase等。
数据准备:在新建的SparkCore项目名称上右键=>新建input文件夹=>在input文件夹上右键=>分别新建1.txt和2.txt。每个文件里面准备一些word单词。
创建RDD:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20public class Test02_File {
public static void main(String[] args) {
// 1.创建配置对象
SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");
// 2. 创建sparkContext
JavaSparkContext sc = new JavaSparkContext(conf);
// 3. 编写代码
JavaRDD<String> lineRDD = sc.textFile("input");
List<String> result = lineRDD.collect();
for (String s : result) {
System.out.println(s);
}
// 4. 关闭sc
sc.stop();
}
}
2.1.4 从其他RDD创建
- 主要是通过一个RDD运算完后,再产生新的RDD。后面会提及。
2.2 分区规则
2.2.1 从集合创建RDD
1 | public class Test01_ListPartition { |
2.2.2 从文件创建RDD
1 | public class Test02_FilePartition { |
- 注意:getSplits文件返回的是切片规划,真正读取是在compute方法中创建LineRecordReader读取的,有两个关键变量: start = split.getStart() end = start + split.getLength。
- 分区数量的计算方式:
- totalSize = 10
- goalSize = 10 / 3 = 3(byte) 表示每个分区存储3字节的数据
- 分区数= totalSize/ goalSize = 10 /3 => 3,3,4
- 4子节大于3子节的1.1倍,符合hadoop切片1.1倍的策略,因此会多创建一个分区,即一共有4个分区3,3,3,1
- Spark读取文件,采用的是hadoop的方式读取,所以一行一行读取,跟字节数没有关系。
- 数据读取位置计算是以偏移量为单位来进行计算的。
- 分区数量的计算方式:
2.3 Transformation转换算子
2.3.1 Value类型
2.3.1.1 map()映射
参数f是一个函数可以写作匿名子类,它可以接收一个参数。当某个RDD执行map方法时,会遍历该RDD中的每一个数据项,并依次应用f函数,从而产生一个新的RDD。即,这个新RDD中的每一个元素都是原来RDD中每一个元素依次应用f函数而得到的。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35public class Test01_Map {
public static void main(String[] args) {
// 1.创建配置对象
SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");
// 2. 创建sparkContext
JavaSparkContext sc = new JavaSparkContext(conf);
// 3. 编写代码
JavaRDD<String> lineRDD = sc.textFile("input/1.txt");
// 需求:每行结尾拼接||
// 两种写法 lambda表达式写法(匿名函数)
JavaRDD<String> mapRDD = lineRDD.map(s -> s + "||");
// 匿名函数写法
JavaRDD<String> mapRDD1 = lineRDD.map(new Function<String, String>() {
public String call(String v1) throws Exception {
return v1 + "||";
}
});
for (String s : mapRDD.collect()) {
System.out.println(s);
}
// 输出数据的函数写法
mapRDD1.collect().forEach(a -> System.out.println(a));
mapRDD1.collect().forEach(System.out::println);
// 4. 关闭sc
sc.stop();
}
}
2.3.1.2 flatMap()扁平化
功能说明
- 与map操作类似,将RDD中的每一个元素通过应用f函数依次转换为新的元素,并封装到RDD中。
- 区别:在flatMap操作中,f函数的返回值是一个集合,并且会将每一个该集合中的元素拆分出来放到新的RDD中。
需求说明:创建一个集合,集合里面存储的还是子集合,把所有子集合中数据取出放入到一个大的集合中。
具体实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45public class Test02_FlatMap {
public static void main(String[] args) {
// 1.创建配置对象
SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");
// 2. 创建sparkContext
JavaSparkContext sc = new JavaSparkContext(conf);
// 3. 编写代码
ArrayList<List<String>> arrayLists = new ArrayList<>();
arrayLists.add(Arrays.asList("1","2","3"));
arrayLists.add(Arrays.asList("4","5","6"));
JavaRDD<List<String>> listJavaRDD = sc.parallelize(arrayLists,2);
// 对于集合嵌套的RDD 可以将元素打散
// 泛型为打散之后的元素类型
JavaRDD<String> stringJavaRDD = listJavaRDD.flatMap(new FlatMapFunction<List<String>, String>() {
public Iterator<String> call(List<String> strings) throws Exception {
return strings.iterator();
}
});
stringJavaRDD.collect().forEach(System.out::println);
// 通常情况下需要自己将元素转换为集合
JavaRDD<String> lineRDD = sc.textFile("input/2.txt");
JavaRDD<String> stringJavaRDD1 = lineRDD.flatMap(new FlatMapFunction<String, String>() {
public Iterator<String> call(String s) throws Exception {
String[] s1 = s.split(" ");
return Arrays.asList(s1).iterator();
}
});
stringJavaRDD1. collect().forEach(System.out::println);
// 4. 关闭sc
sc.stop();
}
}
2.3.1.3 groupBy()分组
功能说明:分组,按照传入函数的返回值进行分组。将相同的key对应的值放入一个迭代器。
需求说明:创建一个RDD,按照元素模以2的值进行分组。
具体实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39public class Test03_GroupBy {
public static void main(String[] args) throws InterruptedException {
// 1.创建配置对象
SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");
// 2. 创建sparkContext
JavaSparkContext sc = new JavaSparkContext(conf);
// 3. 编写代码
JavaRDD<Integer> integerJavaRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4), 2);
// 泛型为分组标记的类型
JavaPairRDD<Integer, Iterable<Integer>> groupByRDD = integerJavaRDD.groupBy(new Function<Integer, Integer>() {
public Integer call(Integer v1) throws Exception {
return v1 % 2;
}
});
groupByRDD.collect().forEach(System.out::println);
// 类型可以任意修改
JavaPairRDD<Boolean, Iterable<Integer>> groupByRDD1 = integerJavaRDD.groupBy(new Function<Integer, Boolean>() {
public Boolean call(Integer v1) throws Exception {
return v1 % 2 == 0;
}
});
groupByRDD1.collect().forEach(System.out::println);
Thread.sleep(600000);
// 4. 关闭sc
sc.stop();
}
}- groupBy会存在shuffle过程。
- shuffle:将不同的分区数据进行打乱重组的过程。
- shuffle一定会落盘。可以在local模式下执行程序,通过4040看效果。
2.3.1.4 filter()过滤
功能说明:接收一个返回值为布尔类型的函数作为参数。当某个RDD调用filter方法时,会对该RDD中每一个元素应用f函数,如果返回值类型为true,则该元素会被添加到新的RDD中。
需求说明:创建一个RDD,过滤出对2取余等于0的数据。
代码实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26public class Test04_Filter {
public static void main(String[] args) {
// 1.创建配置对象
SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");
// 2. 创建sparkContext
JavaSparkContext sc = new JavaSparkContext(conf);
// 3. 编写代码
JavaRDD<Integer> integerJavaRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4),2);
JavaRDD<Integer> filterRDD = integerJavaRDD.filter(new Function<Integer, Boolean>() {
public Boolean call(Integer v1) throws Exception {
return v1 % 2 == 0;
}
});
filterRDD. collect().forEach(System.out::println);
// 4. 关闭sc
sc.stop();
}
}
2.3.1.5 distinct()去重
功能说明:对内部的元素去重,并将去重后的元素放到新的RDD中。
代码实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21public class Test05_Distinct {
public static void main(String[] args) {
// 1.创建配置对象
SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");
// 2. 创建sparkContext
JavaSparkContext sc = new JavaSparkContext(conf);
// 3. 编写代码
JavaRDD<Integer> integerJavaRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6), 2);
// 底层使用分布式分组去重 所有速度比较慢,但是不会OOM
JavaRDD<Integer> distinct = integerJavaRDD.distinct();
distinct. collect().forEach(System.out::println);
// 4. 关闭sc
sc.stop();
}
}- 注意:distinct会存在shuffle过程。
2.3.1.6 sortBy()排序
功能说明:该操作用于排序数据。在排序之前,可以将数据通过f函数进行处理,之后按照f函数处理的结果进行排序,默认为正序排列。排序后新产生的RDD的分区数与原RDD的分区数一致。Spark的排序结果是全局有序。
需求说明:创建一个RDD,按照数字大小分别实现正序和倒序排序。
代码实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24public class Test6_SortBy {
public static void main(String[] args) {
// 1.创建配置对象
SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");
// 2. 创建sparkContext
JavaSparkContext sc = new JavaSparkContext(conf);
// 3. 编写代码
JavaRDD<Integer> integerJavaRDD = sc.parallelize(Arrays.asList(5, 8, 1, 11, 20), 2);
// (1)泛型为以谁作为标准排序 (2) true为正序 (3) 排序之后的分区个数
JavaRDD<Integer> sortByRDD = integerJavaRDD.sortBy(new Function<Integer, Integer>() {
public Integer call(Integer v1) throws Exception {
return v1;
}
}, true, 2);
sortByRDD.collect().forEach(System.out::println);
// 4. 关闭sc
sc.stop();
}
}
2.3.2 Key-Value类型
要想使用Key-Value类型的算子首先需要使用特定的方法转换为PairRDD。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26public class Test01_pairRDD {
public static void main(String[] args) {
// 1.创建配置对象
SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");
// 2. 创建sparkContext
JavaSparkContext sc = new JavaSparkContext(conf);
// 3. 编写代码
JavaRDD<Integer> integerJavaRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4), 2);
JavaPairRDD<Integer, Integer> pairRDD = integerJavaRDD.mapToPair(new PairFunction<Integer, Integer, Integer>() {
public Tuple2<Integer, Integer> call(Integer integer) throws Exception {
return new Tuple2<>(integer, integer);
}
});
pairRDD.collect().forEach(System.out::println);
// 4. 关闭sc
sc.stop();
}
}
2.3.2.1 mapValues()只对V进行操作
功能说明:针对于(K,V)形式的类型只对V进行操作。
需求说明:创建一个pairRDD,并将value添加字符串”|||”。
代码实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28public class Test02_MapValues {
public static void main(String[] args) {
// 1.创建配置对象
SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");
// 2. 创建sparkContext
JavaSparkContext sc = new JavaSparkContext(conf);
// 3. 编写代码
JavaPairRDD<String, String> javaPairRDD = sc.parallelizePairs(
Arrays.asList(new Tuple2<>("k", "v"), new Tuple2<>("k1", "v1"), new Tuple2<>("k2", "v2")));
// 只修改value 不修改key
JavaPairRDD<String, String> mapValuesRDD = javaPairRDD.mapValues(new Function<String, String>() {
public String call(String v1) throws Exception {
return v1 + "|||";
}
});
mapValuesRDD. collect().forEach(System.out::println);
// 4. 关闭sc
sc.stop();
}
}
2.3.2.2 groupByKey()按照K重新分组
功能说明:
- groupByKey对每个key进行操作,但只生成一个seq,并不进行聚合。
- 该操作可以指定分区器或者分区数(默认使用HashPartitioner)。
需求说明:统计单词出现次数。
代码实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41public class Test03_GroupByKey {
public static void main(String[] args) {
// 1.创建配置对象
SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");
// 2. 创建sparkContext
JavaSparkContext sc = new JavaSparkContext(conf);
// 3. 编写代码
JavaRDD<String> integerJavaRDD = sc.parallelize(Arrays.asList("hi", "hi", "hello", "spark"), 2);
// 统计单词出现次数
JavaPairRDD<String, Integer> pairRDD = integerJavaRDD.mapToPair(new PairFunction<String, String, Integer>() {
public Tuple2<String, Integer> call(String s) throws Exception {
return new Tuple2<>(s, 1);
}
});
// 聚合相同的key
JavaPairRDD<String, Iterable<Integer>> groupByKeyRDD = pairRDD.groupByKey();
// 合并值
JavaPairRDD<String, Integer> result = groupByKeyRDD.mapValues(new Function<Iterable<Integer>, Integer>() {
public Integer call(Iterable<Integer> v1) throws Exception {
Integer sum = 0;
for (Integer integer : v1) {
sum += integer;
}
return sum;
}
});
result.collect().forEach(System.out::println);
// 4. 关闭sc
sc.stop();
}
}
2.3.2.3 reduceByKey()按照K聚合V
功能说明:该操作可以将RDD[K,V]中的元素按照相同的K对V进行聚合。其存在多种重载形式,还可以设置新RDD的分区数。
需求说明:统计单词出现次数。
代码实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33public class Test04_ReduceByKey {
public static void main(String[] args) {
// 1.创建配置对象
SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");
// 2. 创建sparkContext
JavaSparkContext sc = new JavaSparkContext(conf);
// 3. 编写代码
JavaRDD<String> integerJavaRDD = sc.parallelize(Arrays.asList("hi","hi","hello","spark" ),2);
// 统计单词出现次数
JavaPairRDD<String, Integer> pairRDD = integerJavaRDD.mapToPair(new PairFunction<String, String, Integer>() {
public Tuple2<String, Integer> call(String s) throws Exception {
return new Tuple2<>(s, 1);
}
});
// 聚合相同的key
JavaPairRDD<String, Integer> result = pairRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}
});
result. collect().forEach(System.out::println);
// 4. 关闭sc
sc.stop();
}
}
2.3.2.4 reduceByKey和groupByKey区别
reduceByKey:按照key进行聚合,在shuffle之前有combine(预聚合)操作,返回结果是RDD[K,V]。
groupByKey:按照key进行分组,直接进行shuffle。
开发指导:在不影响业务逻辑的前提下,优先选用reduceByKey。求和操作不影响业务逻辑,求平均值影响业务逻辑。影响业务逻辑时建议先对数据类型进行转换再合并。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49public class Test06_ReduceByKeyAvg {
public static void main(String[] args) {
// 1.创建配置对象
SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");
// 2. 创建sparkContext
JavaSparkContext sc = new JavaSparkContext(conf);
// 3. 编写代码
JavaPairRDD<String, Integer> javaPairRDD = sc.parallelizePairs(
Arrays.asList(new Tuple2<>("hi", 96), new Tuple2<>("hi", 97), new Tuple2<>("hello", 95),
new Tuple2<>("hello", 195)));
// ("hi",(96,1))
JavaPairRDD<String, Tuple2<Integer, Integer>> tuple2JavaPairRDD = javaPairRDD.mapValues(
new Function<Integer, Tuple2<Integer, Integer>>() {
public Tuple2<Integer, Integer> call(Integer v1) throws Exception {
return new Tuple2<>(v1, 1);
}
});
// 聚合RDD
JavaPairRDD<String, Tuple2<Integer, Integer>> reduceRDD = tuple2JavaPairRDD.reduceByKey(
new Function2<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() {
public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> v1, Tuple2<Integer, Integer> v2)
throws Exception {
return new Tuple2<>(v1._1 + v2._1, v1._2 + v2._2);
}
});
// 相除
JavaPairRDD<String, Double> result = reduceRDD.mapValues(new Function<Tuple2<Integer, Integer>, Double>() {
public Double call(Tuple2<Integer, Integer> v1) throws Exception {
return (new Double(v1._1) / v1._2);
}
});
result. collect().forEach(System.out::println);
// 4. 关闭sc
sc.stop();
}
}
2.3.2.5 sortByKey()按照K进行排序
功能说明:在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD。
需求说明:创建一个pairRDD,按照key的正序和倒序进行排序。
代码实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22public class Test05_SortByKey {
public static void main(String[] args) {
// 1.创建配置对象
SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");
// 2. 创建sparkContext
JavaSparkContext sc = new JavaSparkContext(conf);
// 3. 编写代码
JavaPairRDD<Integer, String> javaPairRDD = sc.parallelizePairs(
Arrays.asList(new Tuple2<>(4, "a"), new Tuple2<>(3, "c"), new Tuple2<>(2, "d")));
// 填写布尔类型选择正序倒序
JavaPairRDD<Integer, String> pairRDD = javaPairRDD.sortByKey(false);
pairRDD. collect().forEach(System.out::println);
// 4. 关闭sc
sc.stop();
}
}
2.4 Action行动算子
- 行动算子是触发了整个作业的执行。因为转换算子都是懒加载,并不会立即执行。
2.4.1 collect()以数组的形式返回数据集
功能说明:在驱动程序中,以数组Array的形式返回数据集的所有元素。
- 注意:所有的数据都会被拉取到Driver端,慎用。
需求说明:创建一个RDD,并将RDD内容收集到Driver端打印。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22public class Test01_Collect {
public static void main(String[] args) {
// 1.创建配置对象
SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");
// 2. 创建sparkContext
JavaSparkContext sc = new JavaSparkContext(conf);
// 3. 编写代码
JavaRDD<Integer> integerJavaRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4),2);
List<Integer> collect = integerJavaRDD.collect();
for (Integer integer : collect) {
System.out.println(integer);
}
// 4. 关闭sc
sc.stop();
}
}
2.4.2 count()返回RDD中元素个数
功能说明:返回RDD中元素的个数。
需求说明:创建一个RDD,统计该RDD的条数。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20public class Test02_Count {
public static void main(String[] args) {
// 1.创建配置对象
SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");
// 2. 创建sparkContext
JavaSparkContext sc = new JavaSparkContext(conf);
// 3. 编写代码
JavaRDD<Integer> integerJavaRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4),2);
long count = integerJavaRDD.count();
System.out.println(count);
// 4. 关闭sc
sc.stop();
}
}
2.4.3 first()返回RDD中的第一个元素
功能说明:返回RDD中的第一个元素。
需求说明:创建一个RDD,返回该RDD中的第一个元素。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20public class Test03_First {
public static void main(String[] args) {
// 1.创建配置对象
SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");
// 2. 创建sparkContext
JavaSparkContext sc = new JavaSparkContext(conf);
// 3. 编写代码
JavaRDD<Integer> integerJavaRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4),2);
Integer first = integerJavaRDD.first();
System.out.println(first);
// 4. 关闭sc
sc.stop();
}
}
2.4.4 take()返回由RDD前n个元素组成的数组
功能说明:返回一个由RDD的前n个元素组成的数组。
需求说明:创建一个RDD,取出前两个元素。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20public class Test04_Take {
public static void main(String[] args) {
// 1.创建配置对象
SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");
// 2. 创建sparkContext
JavaSparkContext sc = new JavaSparkContext(conf);
// 3. 编写代码
JavaRDD<Integer> integerJavaRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4),2);
List<Integer> list = integerJavaRDD.take(3);
list.forEach(System.out::println);
// 4. 关闭sc
sc.stop();
}
}
2.4.5 countByKey()统计每种key的个数
功能说明:统计每种key的个数。
需求说明:创建一个PairRDD,统计每种key的个数。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22public class Test05_CountByKey {
public static void main(String[] args) {
// 1.创建配置对象
SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");
// 2. 创建sparkContext
JavaSparkContext sc = new JavaSparkContext(conf);
// 3. 编写代码
JavaPairRDD<String, Integer> pairRDD = sc.parallelizePairs(
Arrays.asList(new Tuple2<>("a", 8), new Tuple2<>("b", 8), new Tuple2<>("a", 8), new Tuple2<>("d", 8)));
Map<String, Long> map = pairRDD.countByKey();
System.out.println(map);
// 4. 关闭sc
sc.stop();
}
}
2.4.6 save相关算子
saveAsTextFile(path)保存成Text文件
- 功能说明:将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark将会调用toString方法,将它装换为文件中的文本。
saveAsObjectFile(path)序列化成对象保存到文件
功能说明:用于将RDD中的元素序列化成对象,存储到文件中。
代码实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21public class Test06_Save {
public static void main(String[] args) {
// 1.创建配置对象
SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");
// 2. 创建sparkContext
JavaSparkContext sc = new JavaSparkContext(conf);
// 3. 编写代码
JavaRDD<Integer> integerJavaRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4),2);
integerJavaRDD.saveAsTextFile("output");
integerJavaRDD.saveAsObjectFile("output1");
// 4. 关闭sc
sc.stop();
}
}
2.4.7 foreach()遍历RDD中每一个元素
功能说明:遍历RDD中的每一个元素,并依次应用f函数。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23public class Test07_Foreach {
public static void main(String[] args) {
// 1.创建配置对象
SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");
// 2. 创建sparkContext
JavaSparkContext sc = new JavaSparkContext(conf);
// 3. 编写代码
JavaRDD<Integer> integerJavaRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4),4);
integerJavaRDD.foreach(new VoidFunction<Integer>() {
public void call(Integer integer) throws Exception {
System.out.println(integer);
}
});
// 4. 关闭sc
sc.stop();
}
}
2.4.8 foreachPartition()遍历RDD中每一个分区
1 |
|
2.5 WordCount案例实操
导入项目依赖,下方的是scala语言打包插件 只要使用scala语法打包运行到linux上面 ,必须要有:
1
2
3
4
5
6
7<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.3.1</version>
</dependency>
</dependencies>
2.5.1 本地调试
本地Spark程序调试需要使用Local提交模式,即将本机当做运行环境,Master和Worker都为本机。运行时直接加断点调试即可。如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44public class WordCount {
public static void main(String[] args) {
// 1.创建配置对象
SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");
// 2. 创建sparkContext
JavaSparkContext sc = new JavaSparkContext(conf);
// 3. 编写代码
// 读取数据
JavaRDD<String> javaRDD = sc.textFile("input/2.txt");
// 长字符串切分为单个单词
JavaRDD<String> flatMapRDD = javaRDD.flatMap(new FlatMapFunction<String, String>() {
public Iterator<String> call(String s) throws Exception {
List<String> strings = Arrays.asList(s.split(" "));
return strings.iterator();
}
});
// 转换格式为 (单词,1)
JavaPairRDD<String, Integer> pairRDD = flatMapRDD.mapToPair(new PairFunction<String, String, Integer>() {
public Tuple2<String, Integer> call(String s) throws Exception {
return new Tuple2<>(s, 1);
}
});
// 合并相同单词
JavaPairRDD<String, Integer> javaPairRDD = pairRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}
});
javaPairRDD. collect().forEach(System.out::println);
// 4. 关闭sc
sc.stop();
}
}
2.5.2 集群运行
修改代码,修改运行模式,将输出的方法修改为落盘,同时设置可以自定义的传入传出路径:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45public class WordCount {
public static void main(String[] args) {
// 1.创建配置对象
SparkConf conf = new SparkConf().setMaster("yarn").setAppName("sparkCore");
// 2. 创建sparkContext
JavaSparkContext sc = new JavaSparkContext(conf);
// 3. 编写代码
// 读取数据
JavaRDD<String> javaRDD = sc.textFile(args[0]);
// 长字符串切分为单个单词
JavaRDD<String> flatMapRDD = javaRDD.flatMap(new FlatMapFunction<String, String>() {
public Iterator<String> call(String s) throws Exception {
List<String> strings = Arrays.asList(s.split(" "));
return strings.iterator();
}
});
// 转换格式为 (单词,1)
JavaPairRDD<String, Integer> pairRDD = flatMapRDD.mapToPair(new PairFunction<String, String, Integer>() {
public Tuple2<String, Integer> call(String s) throws Exception {
return new Tuple2<>(s, 1);
}
});
// 合并相同单词
JavaPairRDD<String, Integer> javaPairRDD = pairRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}
});
javaPairRDD.saveAsTextFile(args[1]);
// 4. 关闭sc
sc.stop();
}
}打包到集群测试。
点击package打包,然后,查看打完后的jar包。
将WordCount.jar上传到/opt/module/spark-yarn目录。
在HDFS上创建,存储输入文件的路径/input。
1
[spark-yarn]$ hadoop fs -mkdir /input
上传输入文件到/input路径。
1
[spark-yarn]$ hadoop fs -put /opt/software /input
执行任务。
1
2
3
4
5
6
7[spark-yarn]$ bin/spark-submit \
--class com.example.spark.WordCount \
--master yarn \
./WordCount.jar \
/input \
/output
# 注意:input和ouput都是HDFS上的集群路径。查询运行结果:
1
[spark-yarn]$ hadoop fs -cat /output/*
2.6 RDD序列化
- 在实际开发中我们往往需要自己定义一些对于RDD的操作,那么此时需要注意的是,初始化工作是在Driver端进行的,而实际运行程序是在Executor端进行的,这就涉及到了跨进程通信,是需要序列化的。下面我们看几个例子。
2.6.1 序列化异常
1 | public class User implements Serializable { |
1 | public class Test01_Ser { |
2.6.2 Kryo序列化框架
Java的序列化能够序列化任何的类。但是比较重,序列化后对象的体积也比较大。
Spark出于性能的考虑,Spark2.0开始支持另外一种Kryo序列化机制。Kryo速度是Serializable的10倍。当RDD在Shuffle数据的时候,简单数据类型、数组和字符串类型已经在Spark内部使用Kryo来序列化。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33public class Test02_Kryo {
public static void main(String[] args) throws ClassNotFoundException {
// 1.创建配置对象
SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore")
// 替换默认的序列化机制
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// 注册需要使用kryo序列化的自定义类
.registerKryoClasses(new Class[]{Class.forName("org.example.serializable.User")});
// 2. 创建sparkContext
JavaSparkContext sc = new JavaSparkContext(conf);
// 3. 编写代码
User zhangsan = new User("zhangsan", 13);
User lisi = new User("lisi", 13);
JavaRDD<User> userJavaRDD = sc.parallelize(Arrays.asList(zhangsan, lisi), 2);
JavaRDD<User> mapRDD = userJavaRDD.map(new Function<User, User>() {
public User call(User v1) throws Exception {
return new User(v1.getName(), v1.getAge() + 1);
}
});
mapRDD. collect().forEach(System.out::println);
// 4. 关闭sc
sc.stop();
}
}
2.7 RDD依赖关系
2.7.1 查看血缘关系
RDD只支持粗粒度转换,即在大量记录上执行的单个操作。将创建RDD的一系列Lineage(血统)记录下来,以便恢复丢失的分区。RDD的Lineage会记录RDD的元数据信息和转换行为,当该RDD的部分分区数据丢失时,它可以根据这些信息来重新运算和恢复丢失的数据分区。
案例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45public class Test01_Dep {
public static void main(String[] args) {
// 1.创建配置对象
SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");
// 2. 创建sparkContext
JavaSparkContext sc = new JavaSparkContext(conf);
// 3. 编写代码
JavaRDD<String> lineRDD = sc.textFile("input/2.txt");
System.out.println(lineRDD.toDebugString());
System.out.println("-------------------");
JavaRDD<String> wordRDD = lineRDD.flatMap(new FlatMapFunction<String, String>() {
public Iterator<String> call(String s) throws Exception {
List<String> stringList = Arrays.asList(s.split(" "));
return stringList.iterator();
}
});
System.out.println(wordRDD.toDebugString());
System.out.println("-------------------");
JavaPairRDD<String, Integer> tupleRDD = wordRDD.mapToPair(new PairFunction<String, String, Integer>() {
public Tuple2<String, Integer> call(String s) throws Exception {
return new Tuple2<>(s, 1);
}
});
System.out.println(tupleRDD.toDebugString());
System.out.println("-------------------");
JavaPairRDD<String, Integer> wordCountRDD = tupleRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}
});
System.out.println(wordCountRDD.toDebugString());
System.out.println("-------------------");
// 4. 关闭sc
sc.stop();
}
}- 注意:圆括号中的数字表示RDD的并行度,也就是有几个分区。
2.7.2 窄依赖
窄依赖表示每一个父RDD的Partition最多被子RDD的一个Partition使用(一对一or多对一),窄依赖我们形象的比喻为独生子女。
2.7.3 宽依赖
宽依赖表示同一个父RDD的Partition被多个子RDD的Partition依赖(只能是一对多),会引起Shuffle,总结:宽依赖我们形象的比喻为超生。
具有宽依赖的transformations包括:sort、reduceByKey、groupByKey、join和调用rePartition函数的任何操作。
宽依赖对Spark去评估一个transformations有更加重要的影响,比如对性能的影响。在不影响业务要求的情况下,要尽量避免使用有宽依赖的转换算子,因为有宽依赖,就一定会走shuffle,影响性能。
2.7.4 Stage任务划分
DAG有向无环图。
DAG(Directed Acyclic Graph)有向无环图是由点和线组成的拓扑图形,该图形具有方向,不会闭环。例如,DAG记录了RDD的转换过程和任务的阶段。
任务运行的整体流程。
RDD任务切分中间分为:Application、Job、Stage和Task。
- Application:初始化一个SparkContext即生成一个Application;
- Job:一个Action算子就会生成一个Job;
- Stage:Stage等于宽依赖的个数加1;
- Task:一个Stage阶段中,最后一个RDD的分区个数就是Task的个数。
代码实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55public class Test02_Dep {
public static void main(String[] args) throws InterruptedException {
// 1.创建配置对象
SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");
// 2. 创建sparkContext
JavaSparkContext sc = new JavaSparkContext(conf);
// 3. 编写代码
JavaRDD<String> lineRDD = sc.textFile("input/2.txt");
System.out.println(lineRDD.toDebugString());
System.out.println("-------------------");
JavaRDD<String> wordRDD = lineRDD.flatMap(new FlatMapFunction<String, String>() {
public Iterator<String> call(String s) throws Exception {
List<String> stringList = Arrays.asList(s.split(" "));
return stringList.iterator();
}
});
System.out.println(wordRDD);
System.out.println("-------------------");
JavaPairRDD<String, Integer> tupleRDD = wordRDD.mapToPair(new PairFunction<String, String, Integer>() {
public Tuple2<String, Integer> call(String s) throws Exception {
return new Tuple2<>(s, 1);
}
});
System.out.println(tupleRDD.toDebugString());
System.out.println("-------------------");
// 缩减分区
JavaPairRDD<String, Integer> coalesceRDD = tupleRDD.coalesce(1);
JavaPairRDD<String, Integer> wordCountRDD = coalesceRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}
},4);
System.out.println(wordCountRDD.toDebugString());
System.out.println("-------------------");
wordCountRDD. collect().forEach(System.out::println);
wordCountRDD. collect().forEach(System.out::println);
Thread.sleep(600000);
// 4. 关闭sc
sc.stop();
}
}查看Job个数。查看http://localhost:4040/jobs/,发现Job有两个。
查看Stage个数。
查看Job0的Stage。由于只有1个Shuffle阶段,所以Stage个数为2。
查看Job1的Stage。由于只有1个Shuffle阶段,所以Stage个数为2。
Task个数。
查看Job0的Stage0的Task个数,2个。
查看Job0的Stage1的Task个数,2个。
查看Job1的Stage2的Task个数,0个(2个跳过skipped)。
查看Job1的Stage3的Task个数,2个。
注意:如果存在shuffle过程,系统会自动进行缓存,UI界面显示skipped的部分。
2.8 RDD持久化
2.8.1 RDD Cache缓存
RDD通过Cache或者Persist方法将前面的计算结果缓存,默认情况下会把数据以序列化的形式缓存在JVM的堆内存中。但是并不是这两个方法被调用时立即缓存,而是触发后面的action算子时,该RDD将会被缓存在计算节点的内存中,并供后面重用。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57public class Test01_Cache {
public static void main(String[] args) throws InterruptedException {
// 1.创建配置对象
SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");
// 2. 创建sparkContext
JavaSparkContext sc = new JavaSparkContext(conf);
// 3. 编写代码
JavaRDD<String> lineRDD = sc.textFile("input/2.txt");
//3.1.业务逻辑
JavaRDD<String> wordRDD = lineRDD.flatMap(new FlatMapFunction<String, String>() {
public Iterator<String> call(String s) throws Exception {
List<String> stringList = Arrays.asList(s.split(" "));
return stringList.iterator();
}
});
JavaRDD<Tuple2<String, Integer>> tuple2JavaRDD = wordRDD.map(new Function<String, Tuple2<String, Integer>>() {
public Tuple2<String, Integer> call(String v1) throws Exception {
System.out.println("*****************");
return new Tuple2<>(v1, 1);
}
});
//3.5 cache缓存前打印血缘关系
System.out.println(tuple2JavaRDD.toDebugString());
//3.4 数据缓存。
//cache底层调用的就是persist方法,缓存级别默认用的是MEMORY_ONLY
tuple2JavaRDD.cache();
//3.6 persist方法可以更改存储级别
// wordToOneRdd.persist(StorageLevel.MEMORY_AND_DISK_2)
//3.2 触发执行逻辑
tuple2JavaRDD.collect().forEach(System.out::println);
//3.5 cache缓存后打印血缘关系
//cache操作会增加血缘关系,不改变原有的血缘关系
System.out.println(tuple2JavaRDD.toDebugString());
System.out.println("=====================");
//3.3 再次触发执行逻辑
tuple2JavaRDD.collect().forEach(System.out::println);
Thread.sleep(1000000);
// 4. 关闭sc
sc.stop();
}
}源码解析:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16mapRdd.cache()
def cache(): this.type = persist()
def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)
object StorageLevel {
val NONE = new StorageLevel(false, false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false, false)
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
val OFF_HEAP = new StorageLevel(true, true, true, false, 1)注意:默认的存储级别都是仅在内存存储一份。在存储级别的末尾加上“_2”表示持久化的数据存为两份。SER:表示序列化。
缓存有可能丢失,或者存储于内存的数据由于内存不足而被删除,RDD的缓存容错机制保证了即使缓存丢失也能保证计算的正确执行。通过基于RDD的一系列转换,丢失的数据会被重算,由于RDD的各个Partition是相对独立的,因此只需要计算丢失的部分即可,并不需要重算全部Partition。
自带缓存算子。
Spark会自动对一些Shuffle操作的中间数据做持久化操作(比如:reduceByKey)。这样做的目的是为了当一个节点Shuffle失败了避免重新计算整个输入。但是,在实际使用的时候,如果想重用数据,仍然建议调用persist或cache。
查看前面2.7.4依赖关系代码的DAG图。访问http://localhost:4040/jobs/页面,查看第一个和第二个job的DAG图。说明:增加缓存后血缘依赖关系仍然有,但是,第二个job取的数据是从缓存中取的。
2.8.2 RDD CheckPoint检查点
检查点:是通过将RDD中间结果写入磁盘。
为什么要做检查点?
- 由于血缘依赖过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果检查点之后有节点出现问题,可以从检查点开始重做血缘,减少了开销。
检查点存储路径:Checkpoint的数据通常是存储在HDFS等容错、高可用的文件系统。
检查点数据存储格式为:二进制的文件。
检查点切断血缘:在Checkpoint的过程中,该RDD的所有依赖于父RDD中的信息将全部被移除。
检查点触发时间:对RDD进行Checkpoint操作并不会马上被执行,必须执行Action操作才能触发。但是检查点为了数据安全,会从血缘关系的最开始执行一遍。
设置检查点步骤。
- 设置检查点数据存储路径:sc.setCheckpointDir(“./checkpoint1”)。
- 调用检查点方法:wordToOneRdd.checkpoint()。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42public class Test02_CheckPoint {
public static void main(String[] args) {
// 1.创建配置对象
SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");
// 2. 创建sparkContext
JavaSparkContext sc = new JavaSparkContext(conf);
sc.setCheckpointDir("ck");
// 3. 编写代码
JavaRDD<String> lineRDD = sc.textFile("input/2.txt");
JavaPairRDD<String, Long> tupleRDD = lineRDD.mapToPair(new PairFunction<String, String, Long>() {
public Tuple2<String, Long> call(String s) throws Exception {
return new Tuple2<String, Long>(s, System.currentTimeMillis());
}
});
// 查看血缘关系
System.out.println(tupleRDD.toDebugString());
// 增加检查点避免计算两次
tupleRDD.cache();
// 进行检查点
tupleRDD.checkpoint();
tupleRDD. collect().forEach(System.out::println);
System.out.println(tupleRDD.toDebugString());
// 第二次计算
tupleRDD.collect().forEach(System.out::println);
// 第三次计算
tupleRDD.collect().forEach(System.out::println);
// 4. 关闭sc
sc.stop();
}
}访问http://localhost:4040/jobs/页面,查看4个job的DAG图。其中第2个图是checkpoint的job运行DAG图。第3、4张图说明,检查点切断了血缘依赖关系。
- 只增加checkpoint,没有增加Cache缓存打印。
- 第1个job执行完,触发了checkpoint,第2个job运行checkpoint,并把数据存储在检查点上。第3、4个job,数据从检查点上直接读取。
- 增加checkpoint,也增加Cache缓存打印。
- 第1个job执行完,数据就保存到Cache里面了,第2个job运行checkpoint,直接读取Cache里面的数据,并把数据存储在检查点上。第3、4个job,数据从检查点上直接读取。
- 只增加checkpoint,没有增加Cache缓存打印。
2.8.3 缓存和检查点区别
- Cache缓存只是将数据保存起来,不切断血缘依赖。Checkpoint检查点切断血缘依赖。
- Cache缓存的数据通常存储在磁盘、内存等地方,可靠性低。Checkpoint的数据通常存储在HDFS等容错、高可用的文件系统,可靠性高。
- 建议对checkpoint()的RDD使用Cache缓存,这样checkpoint的job只需从Cache缓存中读取数据即可,否则需要再从头计算一次RDD。
- 如果使用完了缓存,可以通过unpersist()方法释放缓存。
2.8.4 检查点存储到HDFS集群
如果检查点数据存储到HDFS集群,要注意配置访问集群的用户名。否则会报访问权限异常。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47public class Test02_CheckPoint2 {
public static void main(String[] args) {
// 修改用户名称
System.setProperty("HADOOP_USER_NAME", "atguigu");
// 1.创建配置对象
SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");
// 2. 创建sparkContext
JavaSparkContext sc = new JavaSparkContext(conf);
// 需要设置路径.需要提前在HDFS集群上创建/checkpoint路径
sc.setCheckpointDir("hdfs://hadoop102:8020/checkpoint");
// 3. 编写代码
JavaRDD<String> lineRDD = sc.textFile("input/2.txt");
JavaPairRDD<String, Long> tupleRDD = lineRDD.mapToPair(new PairFunction<String, String, Long>() {
public Tuple2<String, Long> call(String s) throws Exception {
return new Tuple2<String, Long>(s, System.currentTimeMillis());
}
});
// 查看血缘关系
System.out.println(tupleRDD.toDebugString());
// 增加检查点避免计算两次
tupleRDD.cache();
// 进行检查点
tupleRDD.checkpoint();
tupleRDD.collect().forEach(System.out::println);
System.out.println(tupleRDD.toDebugString());
// 第二次计算
tupleRDD.collect().forEach(System.out::println);
// 第三次计算
tupleRDD.collect().forEach(System.out::println);
// 4. 关闭sc
sc.stop();
}
}
2.9 键值对RDD数据分区
Spark目前支持Hash分区、Range分区和用户自定义分区。Hash分区为当前的默认分区。分区器直接决定了RDD中分区的个数、RDD中每条数据经过Shuffle后进入哪个分区和Reduce的个数。
注意:
- 只有Key-Value类型的pairRDD才有分区器,非Key-Value类型的RDD分区的值是None。
- 每个RDD的分区ID范围:0~numPartitions-1,决定这个值是属于那个分区的。
获取RDD分区:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33public class Test01_Partitioner {
public static void main(String[] args) {
// 1.创建配置对象
SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");
// 2. 创建sparkContext
JavaSparkContext sc = new JavaSparkContext(conf);
// 3. 编写代码
JavaPairRDD<String, Integer> tupleRDD = sc.parallelizePairs(
Arrays.asList(new Tuple2<>("s", 1), new Tuple2<>("a", 3), new Tuple2<>("d", 2)));
// 获取分区器
Optional<Partitioner> partitioner = tupleRDD.partitioner();
System.out.println(partitioner);
JavaPairRDD<String, Integer> reduceByKeyRDD = tupleRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}
});
// 获取分区器
Optional<Partitioner> partitioner1 = reduceByKeyRDD.partitioner();
System.out.println(partitioner1);
// 4. 关闭sc
sc.stop();
}
}
2.9.1 Hash分区
2.9.2 Ranger分区
3、广播变量
广播变量:分布式共享只读变量。
广播变量用来高效分发较大的对象。向所有工作节点发送一个较大的只读值,以供一个或多个Spark Task操作使用。比如,如果你的应用需要向所有节点发送一个较大的只读查询表,广播变量用起来会很顺手。在多个Task并行操作中使用同一个变量,但是Spark会为每个Task任务分别发送。
使用广播变量步骤:
- 调用SparkContext.broadcast(广播变量)创建出一个广播对象,任何可序列化的类型都可以这么实现。
- 通过广播变量.value,访问该对象的值。
- 广播变量只会被发到各个节点一次,作为只读值处理(修改这个值不会影响到别的节点)。
原理说明:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43public class Test02_Broadcast {
public static void main(String[] args) {
// 1.创建配置对象
SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("sparkCore");
// 2. 创建sparkContext
JavaSparkContext sc = new JavaSparkContext(conf);
// 3. 编写代码
JavaRDD<Integer> intRDD = sc.parallelize(Arrays.asList(4, 56, 7, 8, 1, 2));
// 幸运数字
List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);
// 找出幸运数字
// 每一个task都会创建一个list浪费内存
/*
JavaRDD<Integer> result = intRDD.filter(new Function<Integer, Boolean>() {
@Override
public Boolean call(Integer v1) throws Exception {
return list.contains(v1);
}
});
*/
// 创建广播变量
// 只发送一份数据到每一个executor
Broadcast<List<Integer>> broadcast = sc.broadcast(list);
JavaRDD<Integer> result = intRDD.filter(new Function<Integer, Boolean>() {
public Boolean call(Integer v1) throws Exception {
return broadcast.value().contains(v1);
}
});
result. collect().forEach(System.out::println);
// 4. 关闭sc
sc.stop();
}
}