Spark Core


Spark生态圈:
Spark Core : RDD(弹性分布式数据集)
Spark SQL
Spark Streaming
Spark MLLib :协同过滤,ALS,逻辑回归等等 –> 机器学习
Spark Graphx : 图计算

一、Spark Core

1、什么是Spark?特点

https://spark.apache.org/
Apache Spark™ is a unified analytics engine for large-scale data processing.
特点:快、易用、通用性、兼容性(完全兼容Hadoop)

快:快100倍(Hadoop 3 之前)
易用:支持多种语言开发
通用性:生态系统全
易用性:兼容Hadoop

二、安装和部署Spark、Spark 的 HA

1、Spark体系结构

Spark的运行方式

Yarn

Standalone:本机调试(Demo)

Worker:从节点。每个服务器上,资源和任务的管理者。只负责管理一个节点

执行过程:
一个Worker 有多个 Executor。 Executor是任务的执行者,按阶段(stage)划分任务。————> RDD

客户端:Driver Program 提交任务到集群中
1)spark-submit
2)spark-shell

2、Spark的搭建

1)准备工作:JDK 配置主机名 免密码登录

2)伪分布式模式
在一台虚拟机上模拟分布式环境(Master和Worker在一个节点上)
配置spark-env.sh
vi spark-env.sh

export JAVA_HOME=/root/hd/jdk1.8.0_192
export SPARK_MASTER_HOST=hsiehchou121
export SPARK_MASTER_PORT=7077

配置slaves
vi slaves
hsiehchou121

浏览器访问hsiehchou121:8080

在Spark中使用Scala语言

[root@hsiehchou121 bin]# ./spark-shell --master spark://hsiehchou121:7077

3)全分布式环境
修改slave文件 拷贝到其他三台服务器 启动

3、Spark的 HA

回顾HA(高可用)
)HDFS Yarn Hbase Spark 主从结构
)单点故障

(1)基于文件目录的单点恢复
主要用于开发或测试环境。当spark提供目录保存spark Application和worker的注册信息,并将他们的恢复状态写入该目录中,这时,一旦Master发生故障,就可以通过重新启动Master进程(sbin/start-master.sh),恢复已运行的spark Application和worker的注册信息

基于文件系统的单点恢复,主要是在spark-en.sh里对SPARK_DAEMON_JAVA_OPTS设置

配置参数 参考值
spark.deploy.recoveryMode 设置为FILESYSTEM开启单点恢复功能,默认值:NONE
spark.deploy.recoveryDirectory Spark 保存恢复状态的目录

参考
export SPARK_DAEMON_JAVA_OPTS=”-Dspark.deploy.recoveryMode=FILESYSTEM -Dspark.deploy.recoveryDirectory=/root/hd/spark-2.1.0-bin-hadoop2.7/recovery”

(*)本质:还是只有一个主节点Master,创建了一个恢复目录,保存集群状态和任务的信息
当Master挂掉,重新启动时,会从恢复目录下读取状态信息,恢复出来原来的状态

用途:这个只用于开发和测试,但是生产使用用ZooKeeper

export SPARK_DAEMON_JAVA_OPTS=”-Dspark.deploy.recoveryMode=FILESYSTEM -Dspark.deploy.recoveryDirectory=/root/hd/spark-2.1.0-bin-hadoop2.7/recovery”

(2)基于ZooKeeper :和Hadoop类似
ZooKeeper提供了一个Leader Election机制,利用这个机制可以保证虽然集群存在多个Master,但是只有一个是Active的,其他的都是Standby。当Active的Master出现故障时,另外的一个Standby Master会被选举出来。由于集群的信息,包括Worker, Driver和Application的信息都已经持久化到ZooKeeper,因此在切换的过程中只会影响新Job的提交,对于正在进行的Job没有任何的影响

配置参数 参考值
spark.deploy.recoveryMode 设置为ZOOKEEPER开启单点恢复功能,默认值:NONE
spark.deploy.zookeeper.url ZooKeeper集群的地址
spark.deploy.zookeeper.dir Spark信息在ZK中的保存目录,默认:/spark

参考
export SPARK_DAEMON_JAVA_OPTS=”-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=hsiehchou121:2181,hsiehchou122:2181,hsiehchou123:2181,hsiehchou124:2181 -Dspark.deploy.zookeeper.dir=/spark”

(*)复习一下zookeeper:
相当于一个数据库,把一些信息存放在zookeeper中,比如集群的信息
数据同步功能,选举功能,分布式锁功能

数据同步:给一个节点中写入数据,可以同步到其他节点

选举:Zookeeper中存在不同的角色,Leader Follower。如果Leader挂掉,重新选举Leader

分布式锁:秒杀。以目录节点的方式来保存数据

修改 spark-env.sh

export JAVA_HOME=/root/hd/jdk1.8.0_192
#export SPARK_MASTER_HOST=hsiehchou121
#export SPARK_MASTER_PORT=7077
#export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=FILESYSTEM -Dspark.deploy.recoveryDirectory=/root/hd/spark-2.1.0-bin-hadoop2.7/recovery"
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=hsiehchou121:2181,hsiehchou122:2181,hsiehchou123:2181,hsiehchou124:2181 -Dspark.deploy.zookeeper.dir=/spark"

同步到其他三台服务器
[root@hsiehchou121 spark-2.1.0-bin-hadoop2.7]# scp conf/spark-env.sh hsiehchou122:/root/hd/spark-2.1.0-bin-hadoop2.7/conf
[root@hsiehchou121 spark-2.1.0-bin-hadoop2.7]# scp conf/spark-env.sh hsiehchou123:/root/hd/spark-2.1.0-bin-hadoop2.7/conf
[root@hsiehchou121 spark-2.1.0-bin-hadoop2.7]# scp conf/spark-env.sh hsiehchou124:/root/hd/spark-2.1.0-bin-hadoop2.7/conf

在hsiehchou121 start-all hsiehchou121 master hsiehchou122 Worker hsiehchou123 Worker hsiehchou124 Worker
在hsiehchou121 start-master hsiehchou121 master hsiehchou122 master(standby) hsiehchou122 Worker hsiehchou123 Worker hsiehchou124 Worker

在hsiehchou121 上kill master
hsiehchou122 master(Active) hsiehchou122 Worker hsiehchou123 Worker hsiehchou124 Worker

在网页http://192.168.116.122:8080/ 可以看到相应信息

三、执行Spark的任务:两个工具

1、spark-submit:用于提交Spark的任务

任务:jar

举例:蒙特卡洛求PI(圆周率)

./spark-submit --master spark://hsiehchou121:7077 --class
--class指明主程序的名字

[root@hsiehchou121 /]#cd /root/hd/spark-2.1.0-bin-hadoop2.7/bin
[root@hsiehchou121 bin]# ./spark-submit --master spark://hsiehchou121:7077 --class org.apache.spark.examples.SparkPi /root/hd/spark-2.1.0-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.1.0.jar 100

其中100指定执行的次数

2、spark-shell 相当于REPL

spark-shell是Spark自带的交互式Shell程序,方便用户进行交互式编程,用户可以在该命令行下用scala编写spark程序
(*)启动Spark Shell:spark-shell
也可以使用以下参数:
参数说明:
--master spark://hsiehchou121:7077 指定Master的地址
--executor-memory 2g 指定每个worker可用内存为2G
--total-executor-cores 2 指定整个集群使用的cup核数为2个
例如:

spark-shell --master spark://hsiehchou121:7077 --executor-memory 2g --total-executor-cores 2
注意:
如果启动spark shell时没有指定master地址,但是也可以正常启动spark shell和执行spark shell中的程序,其实是启动了spark的local模式,该模式仅在本机启动一个进程,没有与集群建立联系

作为一个独立的Application运行
两种模式:
(1)本地模式
spark-shell 后面不接任何参数,代表本地模式
./spark-shell
Spark context available as ‘sc’ (master = local[], app id = local-1554372019995).
sc 是 SparkContext 对象名。 local[
] 代表本地模式,不提交到集群中运行

(2)集群模式
[root@hsiehchou121 bin]# ./spark-shell --master spark://hsiehchou121:7077
提交到集群运行
Spark context available as ‘sc’ (master = spark://hsiehchou121:7077, app id = app-20190404190030-0000).

master = spark://hsiehchou121:7077
Spark session available as ‘spark’
Spark Session 是 2.0 以后提供的,利用 SparkSession 可以访问spark所有组件

示例:
WordCount程序

程序如下:

sc.textFile("hdfs://192.168.116.121:9000/data.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).saveAsTextFile("hdfs://192.168.116.121:9000/output/wc")

说明:
sc是SparkContext对象,该对象时提交spark程序的入口
textFile(“hdfs://192.168.116.121:9000/data.txt”)是hdfs中读取数据
flatMap(.split(” “))先map在压平
map((
,1))将单词和1构成元组
reduceByKey(+)按照key进行reduce,并将value累加
saveAsTextFile(“hdfs://192.168.116.121:9000/output/wc”)将结果写入到hdfs中

(*)处理本地文件,把结果打印到屏幕上
vi /root/hd/tmp_files/test_WordCount.txt
I love China
I love Jiangsu
Jiangsu is a beautiful place in China

scala> sc.textFile("/root/hd/tmp_files/test_WordCount.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect
res0: Array[(String, Int)] = Array((is,1), (love,2), (China,2), (a,1), (Jiangsu,2), (I,2), (in,1), (place,1), (beautiful,1))

(*)处理HDFS文件,结果保存在HDFS上

[root@hsiehchou121 tmp_files]# hdfs dfs -mkdir /tmp_files
[root@hsiehchou121 tmp_files]# hdfs dfs -copyFromLocal ~/hd/tmp_files/test_WordCount.txt /tmp_files
scala> sc.textFile("hdfs://hsiehchou121:9000/tmp_files/test_WordCount.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).saveAsTextFile("hdfs://hsiehchou121:9000/out/0404/test_WordCount")

-rw-r–r– 3 root supergroup 0 2019-04-04 19:12 /out/0404/test_WordCount/_SUCCESS
-rw-r–r– 3 root supergroup 16 2019-04-04 19:12 /out/0404/test_WordCount/part-00000
-rw-r–r– 3 root supergroup 65 2019-04-04 19:12 /out/0404/test_WordCount/part-00001

_SUCCESS 代表程序执行成功

part-00000 part-00001 结果文件,分区。里面内容不重复

(*)单步运行WordCount —-> RDD
scala> val rdd1 = sc.textFile(“/root/hd/tmp_files/test_WordCount.txt”)
rdd1: org.apache.spark.rdd.RDD[String] = /root/hd/tmp_files/test_WordCount.txt MapPartitionsRDD[12] at textFile at <console>:24

scala> rdd1.collect
res5: Array[String] = Array(I love China, I love Jiangsu, Jiangsu is a beautiful place in China)

scala> val rdd2 = rdd1.flatMap(_.split(“ “))
rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[14] at flatMap at <console>:26

scala> rdd2.collect
res6: Array[String] = Array(I, love, China, I, love, Jiangsu, Jiangsu, is, a, beautiful, place, in, China)

scala> val rdd3 = rdd2.map((_,1))
rdd3: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[15] at map at <console>:28

scala> rdd3.collect
res7: Array[(String, Int)] = Array((I,1), (love,1), (China,1), (I,1), (love,1), (Jiangsu,1), (Jiangsu,1), (is,1), (a,1), (beautiful,1), (place,1), (in,1), (China,1))

scala> val rdd4 = rdd3.reduceByKey(+)
rdd4: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[16] at reduceByKey at <console>:30

scala> rdd4.collect
res8: Array[(String, Int)] = Array((is,1), (love,2), (China,2), (a,1), (Jiangsu,2), (I,2), (in,1), (place,1), (beautiful,1))

RDD 弹性分布式数据集

(1)依赖关系 : 宽依赖和窄依赖
(2)算子:
函数:
Transformation : 延时计算 map flatMap textFile
Action : 立即触发计算 collect

说明:

scala复习

(*)flatten:把嵌套的结果展开
scala> List(List(2,4,6,8,10),List(1,3,5,7,9)).flatten
res21: List[Int] = List(2, 4, 6, 8, 10, 1, 3, 5, 7, 9)

(*)flatmap : 相当于一个 map + flatten
scala> var myList = List(List(2,4,6,8,10),List(1,3,5,7,9))
myList: List[List[Int]] = List(List(2, 4, 6, 8, 10), List(1, 3, 5, 7, 9))

scala> myList.flatMap(x=>x.map(_*2))
res22: List[Int] = List(4, 8, 12, 16, 20, 2, 6, 10, 14, 18)

myList.flatMap(x=>x.map(_*2))

执行过程:
(1)将 List(2, 4, 6, 8, 10), List(1, 3, 5, 7, 9) 调用 map(_*2) 方法。x 代表一个List
(2)flatten
(3)在IDE中开发scala版本和Java版本的WorkCount

四、WordCount(Scala版本和Java版本)

1、Scala版本的WordCount

新建一个工程,把jar引入到工程中

package day1

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object WordCount {
  def main(args: Array[String]): Unit = {

    //创建一个Spark的配置文件
    val conf = new SparkConf().setAppName("My Scala WordCount 0404").setMaster("local")

    //创建SparkContext对象
    val sc = new SparkContext(conf)

    //1.从本地模式运行 .setMaster("local")
   //val result = sc.textFile("hdfs://hsiehchou121:9000/tmp_files/test_WordCount.txt")
      //.flatMap(_.split(" "))
      //.map((_,1))
      //.reduceByKey(_+_)

    //result.foreach(println)

    //2、在集群模式运行
    val result = sc.textFile(args(0))
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .saveAsTextFile(args(1))

    sc.stop()
  }
}

export Demo1.jar 点击下一步,把jar包上传到服务器上/root/hd/tmp_files/下

在spark里面的bin目录下输入

[root@hsiehchou121 bin]# ./spark-submit --master spark://hsiehchou121:7077 --class day1.WordCount /root/hd/tmp_files/Demo1.jar hdfs://hsiehchou121:9000/tmp_files/test_WordCount.txt hdfs://hsiehchou121:9000/out/0405/Demo1

2、Java版本的WordCount

package day1;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

public class JavaWordCount {
    public static void main(String[] args) {

        SparkConf conf = new SparkConf().setAppName("JavaWordCount").setMaster("local");

        //创建SparkContext对象
        JavaSparkContext sc = new JavaSparkContext(conf);

        //读入数据
        JavaRDD<String> lines = sc.textFile("hdfs://192.168.116.121:9000/tmp_files/test_WordCount.txt");

        //分词,第一个参数表示读进来的每一句话,第二个参数表示返回值
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String,String>(){

            @Override
            public Iterator<String> call(String input) throws Exception {
                return Arrays.asList(input.split(" ")).iterator();
            }
        });

        //每一个单词记一个数
        JavaPairRDD<String,Integer> ones = words.mapToPair(new PairFunction<String, String, Integer>() {

            @Override
            public Tuple2<String, Integer> call(String input) throws Exception {
                return new Tuple2<String, Integer>(input,1);
            }
        });

        //执行reduce操作
        JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {

            @Override
            public Integer call(Integer arg0, Integer arg1) throws Exception {
                return arg0 + arg1;
            }
        });

        List<Tuple2<String,Integer>> output = counts.collect();
        for(Tuple2<String, Integer> tuple:output) {
            System.out.println(tuple._1 + ":" + tuple._2);
        }

        sc.stop();
    }
}

[root@hsiehchou121 bin]# ./spark-submit --master spark://hsiehchou121:7077 --class day1.JavaWordCount /root/hd/tmp_files/Demo2.jar

五、分析Spark的任务流程

1、分析WordCount程序处理过程

WordCount程序分析

2、Spark调度任务的过程

提交到及群众运行任务时,spark执行任务调度
spark的调用任务过程

六、RDD和RDD特性、RDD的算子

1、RDD:弹性分布式数据集

)Spark中最基本的数据抽象
)RDD的特性

Internally, each RDD is characterized by five main properties:
*

A list of partitions
1)是一组分区
RDD由分区组成,每个分区运行在不同的Worker上,通过这种方式来实现分布式计算

RDD

A function for computing each split
在RDD中,提供算子处理每个分区中的数据

-A list of dependencies on other RDDs
RDD存在依赖关系:宽依赖和窄依赖

Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
可以自定义分区规则来创建RDD

Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)
优先选择离文件位置近的节点来执行

如何创建RDD
(1)通过SparkContext.parallelize方法来创建

scala> val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8),3)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[32] at parallelize at <console>:29

scala> rdd1.partitions.length
res35: Int = 3

scala> val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8),2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[33] at parallelize at <console>:29

scala> rdd1.partitions.length
res36: Int = 2

(2)通过外部数据源来创建
sc.textFile()

scala> val rdd2 = sc.textFile(“/root/hd/tmp_files/test_WordCount.txt”)
rdd2: org.apache.spark.rdd.RDD[String] = /usr/local/tmp_files/test_WordCount.txt MapPartitionsRDD[35] at textFile at <console>:29

2、 算子

1)Transformation
map(func):相当于for循环,返回一个新的RDD

filter(func):过滤
flatMap(func):flat+map 压平

mapPartitions(func):对RDD中的每个分区进行操作
mapPartitionsWithIndex(func):对RDD中的每个分区进行操作,可以取到分区号

sample(withReplacement, fraction, seed):采样

集合运算
union(otherDataset):对源RDD和参数RDD求并集后返回一个新的RDD
intersection(otherDataset):对源RDD和参数RDD求交集后返回一个新的RDD

distinct([numTasks])):去重

聚合操作group by
groupByKey([numTasks]) :在一个(K,V)的RDD上调用,返回一个(K, Iterator[V])的RDD
reduceByKey(func, [numTasks]):在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一起,与groupByKey类似,reduce任务的个数可以通过第二个可选的参数来设置
aggregateByKey(zeroValue)(seqOp,combOp,[numTasks]):按照key进行聚合

排序
sortByKey([ascending], [numTasks]):在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD
sortBy(func,[ascending], [numTasks]):与sortByKey类似,但是更灵活

join(otherDataset, [numTasks]):在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素对在一起的(K,(V,W))的RDD
cogroup(otherDataset, [numTasks]):在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable,Iterable))类型的RDD
cartesian(otherDataset)
pipe(command, [envVars])
coalesce(numPartitions)

重分区
repartition(numPartitions)
repartitionAndSortWithinPartitions(partitioner)

举例:
(1)创建一个RDD,每个元素乘以2,再排序
scala> val rdd1 = sc.parallelize(Array(3,4,5,100,79,81,6,8))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> val rdd2 = rdd1.map(_*2)
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at <console>:26

scala> rdd2.collect
res0: Array[Int] = Array(6, 8, 10, 200, 158, 162, 12, 16)

scala> rdd2.sortBy(x=>x,true).collect
res1: Array[Int] = Array(6, 8, 10, 12, 16, 158, 162, 200)

scala> rdd2.sortBy(x=>x,false).collect
res2: Array[Int] = Array(200, 162, 158, 16, 12, 10, 8, 6)

def sortBy[K](f: (T) ⇒ K, ascending: Boolean = true)
过滤出大于20的元素:

scala> val rdd3 = rdd2.filter(_>20)
rdd3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[53] at filter at <console>:33+

scala> rdd3.collect
res3: Array[Int] = Array(200, 158, 162)

(2)字符串(字符)类型的RDD
scala> val rdd4 = sc.parallelize(Array(“a b c”,”d e f”,”g h i”))
rdd4: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[28] at parallelize at <console>:24

scala> rdd4.flatMap(_.split(“ “)).collect
res4: Array[String] = Array(a, b, c, d, e, f, g, h, i)

3、RDD的集合运算

scala> val rdd5 = sc.parallelize(List(1,2,3,6,7,8,100))
rdd5: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at :24

scala> val rdd6 = sc.parallelize(List(1,2,3,4))
rdd6: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at :24

scala> val rdd7 = rdd5.union(rdd6)
rdd7: org.apache.spark.rdd.RDD[Int] = UnionRDD[2] at union at :28

scala> rdd7.collect
res5: Array[Int] = Array(1, 2, 3, 6, 7, 8, 100, 1, 2, 3, 4)

scala> rdd7.distinct.collect
res6: Array[Int] = Array(100, 4, 8, 1, 6, 2, 3, 7)

4、分组操作:reduceByKey

scala> val rdd1 = sc.parallelize(List(("Time",1800),("Dadi",2400),("Giu",1600))) rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[6] at parallelize at :24

scala> val rdd2 = sc.parallelize(List((“Dadi”,1300),(“Time”,2900),(“Mi”,600)))
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[7] at parallelize at :24

scala> val rdd3 = rdd1 union rdd2
rdd3: org.apache.spark.rdd.RDD[(String, Int)] = UnionRDD[8] at union at :28

scala> rdd3.collect
res3: Array[(String, Int)] = Array((Time,1800), (Dadi,2400), (Giu,1600), (Dadi,1300), (Time,2900), (Mi,600))

scala> val rdd4 = rdd3.groupByKey
rdd4: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[9] at groupByKey at :30

scala> rdd4.collect
res4: Array[(String, Iterable[Int])] = Array((Mi,CompactBuffer(600)),
(Time,CompactBuffer(1800, 2900)),
(Dadi,CompactBuffer(2400, 1300)),
(Giu,CompactBuffer(1600)))

scala> rdd3.reduceByKey(+).collect
res5: Array[(String, Int)] = Array((Mi,600), (Time,4700), (Dadi,3700), (Giu,1600))
reduceByKey will provide much better performance.
官方不推荐使用 groupByKey 推荐使用 reduceByKey

5、cogroup

在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable,Iterable))类型的RDD

对两个RDD中的KV元素,每个RDD中相同key中的元素分别聚合成一个集合。与reduceByKey不同的是针对两个RDD中相同的key的元素进行合并,与groupByKey返回值上与区别

scala> val rdd1 = sc.parallelize(List((“Tim”,1),(“Tim”,2),(“Jert”,3),(“kiy”,2)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[11] at parallelize at :24

scala> val rdd1 = sc.parallelize(List((“Tim”,1),(“Tim”,2),(“Jert”,3),(“Kiy”,2)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[12] at parallelize at :24

scala> val rdd2 = sc.parallelize(List((“Jert”,2),(“Tim”,1),(“Sun”,2)))
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[13] at parallelize at :24

scala> val rdd3 = rdd1.cogroup(rdd2)
rdd3: org.apache.spark.rdd.RDD[(String, (Iterable[Int], Iterable[Int]))] = MapPartitionsRDD[15] at cogroup at :28

scala> rdd3.collect
res6: Array[(String, (Iterable[Int], Iterable[Int]))] = Array(
(Tim,(CompactBuffer(1, 2),CompactBuffer(1))),
(Sun,(CompactBuffer(),CompactBuffer(2))),
(Kiy,(CompactBuffer(2),CompactBuffer())),
(Jert,(CompactBuffer(3),CompactBuffer(2))))

6、reduce操作(Action)

聚合操作

scala> val rdd1 = sc.parallelize(List(1,2,3,4,5))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[16] at parallelize at :24

scala> rdd1.reduce(+)
res7: Int = 15

7、需求:按照value排序

做法:
1)交换,把key 和 value交换,然后调用sortByKey方法
2)再次交换

scala> val rdd1 = sc.parallelize(List((“tim”,1),(“jery”,3),(“kef”,2),(“sun”,2)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[17] at parallelize at :24

scala> val rdd2 = sc.parallelize(List((“jery”,1),(“tim”,3),(“sun”,5),(“kef”,1)))
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[18] at parallelize at :24

scala> val rdd3 = rdd1.union(rdd2)
rdd3: org.apache.spark.rdd.RDD[(String, Int)] = UnionRDD[19] at union at :28

scala> val rdd4 = rdd3.reduceByKey(+)
rdd4: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[20] at reduceByKey at :30

scala> rdd4.collect
res8: Array[(String, Int)] = Array((tim,4), (kef,3), (sun,7), (jery,4))

scala> val rdd5 = rdd4.map(t=>(t._2,t._1)).sortByKey(false).map(t=>(t._2,t._1))
rdd5: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[25] at map at :32

scala> rdd5.collect
res10: Array[(String, Int)] = Array((sun,7), (tim,4), (jery,4), (kef,3))

(2)Action
reduce(func):通过func函数聚集RDD中的所有元素,这个功能必须是课交换且可并联的

collect():在驱动程序中,以数组的形式返回数据集的所有元素
count():返回RDD的元素个数
first():返回RDD的第一个元素(类似于take(1))
take(n):返回一个由数据集的前n个元素组成的数组

takeSample(withReplacement,num, [seed]):返回一个数组,该数组由从数据集中随机采样的num个元素组成,可以选择是否用随机数替换不足的部分,seed用于指定随机数生成器种子

takeOrdered(n, [ordering]):takeOrdered和top类似,只不过以和top相反的顺序返回元素

saveAsTextFile(path):将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark将会调用toString方法,将它装换为文件中的文本

saveAsSequenceFile(path) :将数据集中的元素以Hadoop sequencefile的格式保存到指定的目录下,可以使HDFS或者其他Hadoop支持的文件系统

saveAsObjectFile(path) :saveAsObjectFile用于将RDD中的元素序列化成对象,存储到文件中

countByKey():针对(K,V)类型的RDD,返回一个(K,Int)的map,表示每一个key对应的元素个数

foreach(func):在数据集的每一个元素上,运行函数func进行更新。
与map类似,没有返回值

3)特性
(1)RDD的缓存机制
RDD通过persist方法或cache方法可以将前面的计算结果缓存,但是并不是这两个方法被调用时立即缓存,而是触发后面的action时,该RDD将会被缓存在计算节点的内存中,并供后面重用

通过查看源码发现cache最终也是调用了persist方法,默认的存储级别都是仅在内存存储一份,Spark的存储级别还有好多种,存储级别在object StorageLevel中定义的

缓存有可能丢失,或者存储存储于内存的数据由于内存不足而被删除,RDD的缓存容错机制保证了即使缓存丢失也能保证计算的正确执行。通过基于RDD的一系列转换,丢失的数据会被重算,由于RDD的各个Partition是相对独立的,因此只需要计算丢失的部分即可,并不需要重算全部Partition

)作用:提高性能
)使用:标识RDD可以被缓存 persist cache
(*)可以缓存的位置:

val NONE = new StorageLevel(false, false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false, false)
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
val OFF_HEAP = new StorageLevel(true, true, true, false, 1)

/**

  • Persist this RDD with the default storage level (MEMORY_ONLY).
  • /
    def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)

/**

  • Persist this RDD with the default storage level (MEMORY_ONLY).
  • /
    def cache(): this.type = persist()
    举例:测试数据,92万条
    进入spark-shell命令

./spark-shell --master spark://hsiehchou121:7077

scala> val rdd1 = sc.textFile(“hdfs://192.168.116.121:9000/tmp_files/test_Cache.txt”)
rdd1: org.apache.spark.rdd.RDD[String] = hdfs://192.168.116.121:9000/tmp_files/test_Cache.txt MapPartitionsRDD[3] at textFile at <console>:24

scala> rdd1.count –> 直接出发计算
res0: Long = 921911

scala> rdd1.cache –> 标识RDD可以被缓存,不会触发计算
res1: rdd1.type = hdfs://192.168.116.121:9000/tmp_files/test_Cache.txt MapPartitionsRDD[3] at textFile at <console>:24

scala> rdd1.count –> 和第一步一样,触发计算,但是,把结果进行缓存
res2: Long = 921911

scala> rdd1.count –> 从缓存中直接读出结果
res3: Long = 921911

(2)RDD的容错机制:通过检查点来实现
检查点(本质是通过将RDD写入Disk做检查点)是为了通过lineage(血统)做容错的辅助,lineage过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果之后有节点出现问题而丢失分区,从做检查点的RDD开始重做Lineage,就会减少开销

设置checkpoint的目录,可以是本地的文件夹、也可以是HDFS。一般是在具有容错能力,高可靠的文件系统上(比如HDFS, S3等)设置一个检查点路径,用于保存检查点数据

/**

Mark this RDD for checkpointing. It will be saved to a file inside the checkpoint

directory set with SparkContext#setCheckpointDir and all references to its parent

RDDs will be removed. This function must be called before any job has been

executed on this RDD. It is strongly recommended that this RDD is persisted in

memory, otherwise saving it on a file will require recomputation.
*/

(*)复习检查点:
HDFS中的检查点:有SecondaryNamenode来实现日志的合并

(*)RDD的检查点:容错
概念:血统 Lineage
理解:表示任务执行的生命周期
WordCount textFile —> redceByKey

如果血统越长,越容易出错

假如有检查点,可以从最近的一个检查点开始,往后面计算。不用重头计算

(*)RDD检查点的类型
(1)基于本地目录:需要将Spark shell 或者任务运行在本地模式上(setMaster(“local”))
开发和测试

(2)HDFS目录:用于生产
sc.setCheckPointDir(目录)

举例:设置检查点
scala> var rdd1 = sc.textFile(“hdfs://192.168.116.121:9000/tmp_files/test_Cache.txt”)
rdd1: org.apache.spark.rdd.RDD[String] = hdfs://192.168.116.121:9000/tmp_files/test_Cache.txt MapPartitionsRDD[1] at textFile at <console>:24

设置检查点目录:
scala> sc.setCheckpointDir(“hdfs://192.168.116.121:9000/sparkchkpt”)

标识rdd1可以执行检查点操作
scala> rdd1.checkpoint

scala> rdd1.count
res2: Long = 921911

(3)*依赖关系:宽依赖,窄依赖 *

Stage是每一个job处理过程要分为的几个阶段

Stage划分

划分任务执行的stage
RDD和它依赖的父RDD(s)的关系有两种不同的类型,即窄依赖(narrow dependency)和宽依赖(wide dependency)

窄依赖指的是每一个父RDD的Partition最多被子RDD的一个Partition使用(一(父)对一(子))
总结:窄依赖我们形象的比喻为独生子女

宽依赖指的是多个子RDD的Partition会依赖同一个父RDD的Partition(一(父)对多(子))
总结:宽依赖我们形象的比喻为超生

DAG(Directed Acyclic Graph)叫做有向无环图,原始的RDD通过一系列的转换就就形成了DAG,根据RDD之间的依赖关系的不同将DAG划分成不同的Stage,对于窄依赖,partition的转换处理在Stage中完成计算。对于宽依赖,由于有Shuffle的存在,只能在parent RDD处理完成后,才能开始接下来的计算,因此宽依赖是划分Stage的依据

七、RDD的高级算子

1、mapPartitionsWithIndex

对RDD中的每个分区(带有下标)进行操作,下标用index表示
通过这个算子,我们可以获取分区号

def mapPartitionsWithIndex<a href=”
f: %28Int, Iterator%5bT%5d%29 ⇒ Iterator%5bU%5d,
preservesPartitioning: Boolean = false”>U(implicit arg0: ClassTag[U]): RDD[U]

通过将函数应用于此RDD的每个分区来返回新的RDD,同时跟踪原始分区的索引

preservesPartitioning指输入函数是否保留分区器,除非是一对RDD并且输入函数不修改keys,否则应该是false

参数:f是个函数参数 f 中第一个参数是Int,代表分区号,第二个Iterator[T]代表分区中的元素

举例:把分区中的元素,包括分区号,都打印出来

scala> val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8),3)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:24

scala> def fun1(index:Int, iter:Iterator[Int]) : Iterator[String] = {
| iter.toList.map(x => “[partId: “+ index +” , value = “ + x + “ ]”).iterator
| }
fun1: (index: Int, iter: Iterator[Int])Iterator[String]

scala> rdd1.mapPartitionsWithIndex(fun1).collect
res3: Array[String] = Array(
[partId: 0 , value = 1 ], [partId: 0 , value = 2 ],
[partId: 1 , value = 3 ], [partId: 1 , value = 4 ], [partId: 1 , value = 5 ],
[partId: 2 , value = 6 ], [partId: 2 , value = 7 ], [partId: 2 , value = 8 ])

2、aggregate

聚合操作。类似于分组
(*)先对局部进行聚合操作,再对全局进行聚合操作

调用聚合操作
scala> val rdd2 = sc.parallelize(List(1,2,3,4,5),2)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at parallelize at <console>:24

scala> rdd2.mapPartitionsWithIndex(fun1).collect
res4: Array[String] = Array(
[partId : 0 , value = 1 ], [partId : 0 , value = 2 ],
[partId : 1 , value = 3 ], [partId : 1 , value = 4 ], [partId : 1 , value = 5 ])

scala> import scala.math._
import scala.math._

scala> rdd2.aggregate(0)(max(_,_),_+_)
res6: Int = 7

说明:aggregate

(0) 初始值是 0 
(max(_,_) 局部操作的函数
,   _+_   全局操作的函数
)
scala> rdd2.aggregate(100)(max(_,_),_+_)
res8: Int = 300

分析结果:初始值是100,代表每个分区多了一个100
全局操作,也多了一个100
100+100+100 = 300

对RDD中的元素进行求和
RDD.map

聚合操作(效率大于map)

scala> rdd2.aggregate(0)(_+_,_+_)
res9: Int = 15

相当于MapReduce 的 Combiner

scala> rdd2.aggregate(10)(_+_,_+_)
res10: Int = 45

(*)对字符串操作

scala> val rdd2 = sc.parallelize(List(“a”,”b”,”c”,”d”,”e”,”f”),2)
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[7] at parallelize at <console>:27

scala> rdd2.aggregate("")(_+_,_+_)
res11: String = abcdef

scala> rdd2.aggregate("*")(_+_,_+_)
res12: String = **def*abc

结果分析:
*abc *def

*defabc

(*)复杂的例子
1)
scala> val rdd3 = sc.parallelize(List(“12”,”23”,”345”,”4567”),2)
rdd3: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[8] at parallelize at <console>:27

scala> def fun1(index:Int, iter:Iterator[String]) : Iterator[String] = {
| iter.toList.map(x => “[partId : “ + index + “ , value = “ + x + “ ]”).iterator
| }

scala> rdd3.mapPartitionsWithIndex(fun1).collect
res17: Array[String] = Array(
[partId : 0 , value = 12 ], [partId : 0 , value = 23 ],
[partId : 1 , value = 345 ], [partId : 1 , value = 4567 ])

scala> rdd3.aggregate(“”)((x,y)=> math.max(x.length,y.length).toString,(x,y)=>x+y)
res13: String = 42
执行过程:
第一个分区:
第一次比较: “” “12” 长度最大值 2 2–>”2”
第二次比较: “2” “23” 长度最大值 2 2–>”2”

第二个分区:
第一次比较: “” “345” 长度最大值 3 3–>”3”
第二次比较: “3” “4567” 长度最大值 4 4–>”4”
结果:24 或者42

2)
scala> rdd3.aggregate(“”)((x,y)=> math.min(x.length,y.length).toString,(x,y)=>x+y)
res18: String = 11
执行过程:
第一个分区:
第一次比较: “” “12” 长度最小值 0 0–>”0”
第二次比较: “0” “23” 长度最小值 1 1–>”1”

第二个分区:
第一次比较: “” “345” 长度最小值 0 0–>”0”
第二次比较: “0” “4567” 长度最小值 1 1–>”1”

val rdd3 = sc.parallelize(List(“12”,”23”,”345”,””),2)
rdd3.aggregate(“”)((x,y)=> math.min(x.length,y.length).toString,(x,y)=>x+y)

scala> val rdd3 = sc.parallelize(List(“12”,”23”,”345”,””),2)
rdd3: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[10] at parallelize at :27

scala> rdd3.aggregate(“”)((x,y)=> math.min(x.length,y.length).toString,(x,y)=>x+y)
res19: String = 10

scala> rdd3.aggregate(“”)((x,y)=> math.min(x.length,y.length).toString,(x,y)=>x+y)
res20: String = 01
3)aggregateByKey:类似于aggregate,区别:操作的是 key value 的数据类型

scala> val pairRDD = sc.parallelize(List((“cat”,2),(“cat”,5),(“mouse”,4),(“cat”,12),(“dog”,12),(“mouse”,2)),2)
pairRDD: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> def fun3(index:Int, iter:Iterator[(String,Int)]) : Iterator[String] = {
| iter.toList.map(x=>”partId : “ + index + “ , value = “ + x + “ ]”).iterator
| }
fun3: (index: Int, iter: Iterator[(String, Int)])Iterator[String]

scala> pairRDD.mapPartitionsWithIndex(fun3).collect
res0: Array[String] = Array(
partId : 0 , value = (cat,2) ], partId : 0 , value = (cat,5) ], partId : 0 , value = (mouse,4) ],
partId : 1 , value = (cat,12) ], partId : 1 , value = (dog,12) ], partId : 1 , value = (mouse,2) ])

1.将每个动物园(分区)中,动物数最多的动物,进行求和
动物园0
[partId : 0 , value = (cat,2) ], [partId : 0 , value = (cat,5) ], [partId : 0 , value = (mouse,4) ],

动物园1
[partId : 1 , value = (cat,12) ], [partId : 1 , value = (dog,12) ], [partId : 1 , value = (mouse,2) ])

pairRDD.aggregateByKey(0)(math.max(_,_),_+_)

scala> pairRDD.aggregateByKey(0)(math.max(_,_),_+_).collect
res1: Array[(String, Int)] = Array((dog,12), (cat,17), (mouse,6))   

2.将所有动物求和

pairRDD.aggregateByKey(0)(_+_,_+_).collect

scala> pairRDD.reduceByKey(_+_).collect
res27: Array[(String, Int)] = Array((dog,12), (cat,19), (mouse,6))

aggregateByKey效率更高

4)coalesce与repartition
与分区有关
都是对RDD进行重分区

区别:
coalesce 默认不会进行Shuffle 默认 false 如需修改分区,需置为true

repartition 会进行Shuffle

scala> val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9),2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at parallelize at <console>

scala> val rdd2 = rdd1.repartition(3)
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[9] at repartition at <console>:26

scala> rdd2.partitions.length
res4: Int = 3

scala> val rdd3 = rdd1.coalesce(3,true)
rdd3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[13] at coalesce at <console>:26

scala> rdd3.partitions.length
res5: Int = 3

scala> val rdd4 = rdd1.coalesce(4)
rdd4: org.apache.spark.rdd.RDD[Int] = CoalescedRDD[14] at coalesce at <console>:26

scala> rdd4.partitions.length
res6: Int = 2

5)其他高级算子
比较好的高级算子的博客(推荐)
http://homepage.cs.latrobe.edu.au/zhe/ZhenHeSparkRDDAPIExamples.html

八、编程案例

1、分析日志

需求:找到访问量最高的两个网页
)第一步:对网页的访问量求和
)第二步:排序,降序

日志数据
192.168.88.1 - - [30/Jul/2017:12:53:43 +0800] “GET /MyDemoWeb/ HTTP/1.1” 200 259
192.168.88.1 - - [30/Jul/2017:12:53:43 +0800] “GET /MyDemoWeb/head.jsp HTTP/1.1” 200 713
192.168.88.1 - - [30/Jul/2017:12:53:43 +0800] “GET /MyDemoWeb/body.jsp HTTP/1.1” 200 240
192.168.88.1 - - [30/Jul/2017:12:54:37 +0800] “GET /MyDemoWeb/oracle.jsp HTTP/1.1” 200 242
192.168.88.1 - - [30/Jul/2017:12:54:38 +0800] “GET /MyDemoWeb/hadoop.jsp HTTP/1.1” 200 242
192.168.88.1 - - [30/Jul/2017:12:54:38 +0800] “GET /MyDemoWeb/java.jsp HTTP/1.1” 200 240
192.168.88.1 - - [30/Jul/2017:12:54:40 +0800] “GET /MyDemoWeb/oracle.jsp HTTP/1.1” 200 242
192.168.88.1 - - [30/Jul/2017:12:54:40 +0800] “GET /MyDemoWeb/hadoop.jsp HTTP/1.1” 200 242
192.168.88.1 - - [30/Jul/2017:12:54:41 +0800] “GET /MyDemoWeb/mysql.jsp HTTP/1.1” 200 241
192.168.88.1 - - [30/Jul/2017:12:54:41 +0800] “GET /MyDemoWeb/hadoop.jsp HTTP/1.1” 200 242
192.168.88.1 - - [30/Jul/2017:12:54:42 +0800] “GET /MyDemoWeb/web.jsp HTTP/1.1” 200 239
192.168.88.1 - - [30/Jul/2017:12:54:42 +0800] “GET /MyDemoWeb/oracle.jsp HTTP/1.1” 200 242
192.168.88.1 - - [30/Jul/2017:12:54:52 +0800] “GET /MyDemoWeb/oracle.jsp HTTP/1.1” 200 242
192.168.88.1 - - [30/Jul/2017:12:54:52 +0800] “GET /MyDemoWeb/hadoop.jsp HTTP/1.1” 200 242
192.168.88.1 - - [30/Jul/2017:12:54:53 +0800] “GET /MyDemoWeb/oracle.jsp HTTP/1.1” 200 242
192.168.88.1 - - [30/Jul/2017:12:54:54 +0800] “GET /MyDemoWeb/mysql.jsp HTTP/1.1” 200 241
192.168.88.1 - - [30/Jul/2017:12:54:54 +0800] “GET /MyDemoWeb/hadoop.jsp HTTP/1.1” 200 242
192.168.88.1 - - [30/Jul/2017:12:54:54 +0800] “GET /MyDemoWeb/hadoop.jsp HTTP/1.1” 200 242
192.168.88.1 - - [30/Jul/2017:12:54:56 +0800] “GET /MyDemoWeb/web.jsp HTTP/1.1” 200 239
192.168.88.1 - - [30/Jul/2017:12:54:56 +0800] “GET /MyDemoWeb/java.jsp HTTP/1.1” 200 240
192.168.88.1 - - [30/Jul/2017:12:54:57 +0800] “GET /MyDemoWeb/oracle.jsp HTTP/1.1” 200 242
192.168.88.1 - - [30/Jul/2017:12:54:57 +0800] “GET /MyDemoWeb/java.jsp HTTP/1.1” 200 240
192.168.88.1 - - [30/Jul/2017:12:54:58 +0800] “GET /MyDemoWeb/oracle.jsp HTTP/1.1” 200 242
192.168.88.1 - - [30/Jul/2017:12:54:58 +0800] “GET /MyDemoWeb/hadoop.jsp HTTP/1.1” 200 242
192.168.88.1 - - [30/Jul/2017:12:54:59 +0800] “GET /MyDemoWeb/oracle.jsp HTTP/1.1” 200 242
192.168.88.1 - - [30/Jul/2017:12:54:59 +0800] “GET /MyDemoWeb/hadoop.jsp HTTP/1.1” 200 242
192.168.88.1 - - [30/Jul/2017:12:55:43 +0800] “GET /MyDemoWeb/mysql.jsp HTTP/1.1” 200 241
192.168.88.1 - - [30/Jul/2017:12:55:43 +0800] “GET /MyDemoWeb/oracle.jsp HTTP/1.1” 200 242
192.168.88.1 - - [30/Jul/2017:12:55:43 +0800] “GET /MyDemoWeb/web.jsp HTTP/1.1” 200 239
192.168.88.1 - - [30/Jul/2017:12:55:43 +0800] “GET /MyDemoWeb/hadoop.jsp HTTP/1.1” 200 242

package day2

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object MyTomcatLogCount {
  def main(args: Array[String]): Unit = {
    val conf  = new SparkConf().setMaster("local").setAppName("MyTomcatLogCount")

    val sc = new SparkContext(conf)

    /**
     * 读入日志解析
     * 
     * 192.168.88.1 - - [30/Jul/2017:12:54:52 +0800] "GET /MyDemoWeb/oracle.jsp HTTP/1.1" 200 242
     * 
     */
    val rdd1 = sc.textFile("H:\\other\\localhost_access_log.txt")
      .map(
       line => {
         //解析字符串, 得到jsp的名字
         //1.解析两个引号之间的字符串
         val index1 = line.indexOf("\"")
         val index2 = line.lastIndexOf("\"")
         val line1 = line.substring(index1+1,index2)//GET /MyDemoWeb/oracle.jsp HTTP/1.1

         //得到两个空格的位置
         val index3 = line1.indexOf(" ")
         val index4 = line1.lastIndexOf(" ")
         val line2 = line1.substring(index3+1,index4)///MyDemoWeb/oracle.jsp

         //得到jsp的名字
         val jspName = line2.substring(line2.lastIndexOf("/"))//oracle.jsp

         (jspName,1)
       }
      )
    //统计出每个jsp的次数           
    val rdd2 = rdd1.reduceByKey(_+_)                           

    //使用value排序
    val rdd3 = rdd2.sortBy(_._2, false)

    rdd3.take(2).foreach(println)

    sc.stop()
  }
}

结果:
(/hadoop.jsp,9)
(/oracle.jsp,9)

2、创建自定义分区

根据jsp文件的名字,将各自的访问日志放入到不同的分区文件中

package day2

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.Partitioner
import scala.collection.mutable.HashMap

object MyTomcatLogPartitioner {
  def main(args: Array[String]): Unit = {

    System.setProperty("hadoop.home.dir", "E:\\hadoop-2.7.3")

    val conf = new SparkConf().setMaster("local").setAppName("MyTomcatLogPartitioner")
    val sc = new SparkContext(conf)

     /**
     * 读入日志解析
     * 
     * 192.168.88.1 - - [30/Jul/2017:12:54:52 +0800] "GET /MyDemoWeb/oracle.jsp HTTP/1.1" 200 242
     * 
     */
    val rdd1 = sc.textFile("H:\\other\\localhost_access_log.txt")
      .map(
       line => {
         //解析字符串, 得到jsp的名字
         //1.解析两个引号之间的字符串
         val index1 = line.indexOf("\"")
         val index2 = line.lastIndexOf("\"")
         val line1 = line.substring(index1+1,index2)//GET /MyDemoWeb/oracle.jsp HTTP/1.1

         //得到两个空格的位置
         val index3 = line1.indexOf(" ")
         val index4 = line1.lastIndexOf(" ")
         val line2 = line1.substring(index3+1,index4)///MyDemoWeb/oracle.jsp

         //得到jsp的名字
         val jspName = line2.substring(line2.lastIndexOf("/"))//oracle.jsp

         (jspName,line)
       }
      )                          
    //定义分区规则
    //得到不重复的jsp的名字
    val rdd2 = rdd1.map(_._1).distinct().collect()

    //创建分区规则
    val myPartitioner = new MyWebPartitioner(rdd2)
    val rdd3 = rdd1.partitionBy(myPartitioner)

    //将rdd3 输出
    rdd3.saveAsTextFile("H:\\other\\test_partition")  

    sc.stop()
  }
}

class MyWebPartitioner(jspList : Array[String]) extends Partitioner{

  //定义一个集合来保存分区条件, String 代表jsp的名字, Int 代表序号
  val partitionMap = new HashMap[String,Int]()

  var partID = 0 //初始分区号

  for (jsp <- jspList){
    partitionMap.put(jsp, partID)
    partID += 1
  }

  //定义有多少个分区
  def numPartitions : Int = partitionMap.size

  //根据jsp,返回对应的分区
  def getPartition(key : Any) : Int = partitionMap.getOrElse(key.toString(),0)
}

3、使用JDBCRDD 操作数据库

将RDD的数据保存到mysql数据库中

package day2

import java.sql.DriverManager
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.JdbcRDD

/**
 * 需求找出工资小于等于2000大于900的员工
 * select * from emp where sal > ? and sal <= ?
 */
object MyMysqlDemo {

  val connection = () => {
    Class.forName("com.mysql.cj.jdbc.Driver").newInstance()
    DriverManager.getConnection("jdbc:mysql://localhost:3306/company?serverTimezone=UTC&characterEncoding=utf-8","root","123456")
  }

  def main(args: Array[String]): Unit = {

    System.setProperty("hadoop.home.dir", "E:\\hadoop-2.7.3")
    val conf = new SparkConf().setMaster("local").setAppName("MyMysqlDemo")
    val sc = new SparkContext(conf)

    val mysqlRDD = new JdbcRDD(sc, connection, "select * from emp where sal > ? and sal <= ?", 900, 2000, 2, r => {
      val ename = r.getString(2)
      val sal = r.getInt(4)
      (ename, sal)
    })

    val result = mysqlRDD.collect()
    println(result.toBuffer)

    sc.stop()    
  }
}

mysql的company的emp数据
1 Tom 10 2400
2 Alis 11 1900
3 Kei 12 1500
4 Mi 11 900
结果
ArrayBuffer((Alis,1900), (Kei,1500))

JdbcRDD参数说明

参数名称 类型 说明
sc org.apache.spark.SparkContext Spark Context对象
getConnection scala.Function0[java.sql.Connection] 得到一个数据库Connection
sql scala.Predef.String 执行的SQL语句
lowerBound scala.Long 下边界值,即:SQL的第一个参数
upperBound scala.Long 上边界值,即:SQL的第二个参数
numPartitions scala.Int 分区的个数,即:启动多少个Executor
mapRow scala.Function1[java.sql.ResultSet, T] 得到的结果集

JdbcRDD的缺点:从上面的参数说明可以看出,JdbcRDD有以下两个缺点:
(1)执行的SQL必须有两个参数,并类型都是Long
(2)得到的结果是ResultSet,即:只支持select操作

4、操作数据库:把结果存放到数据库中

package day3

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import java.sql.Connection
import java.sql.DriverManager
import java.sql.PreparedStatement

/**
 * 把Spark结果存放到mysql数据库中
 */
object MyTomcatLogCountToMysql {


  def main(args: Array[String]): Unit = {
    //创建SparkContext
    val conf = new SparkConf().setMaster("local").setAppName("MyTomcatLogCountToMysql")
    val sc = new SparkContext(conf)

     /**
     * 读入日志解析
     * 
     * 192.168.88.1 - - [30/Jul/2017:12:54:52 +0800] "GET /MyDemoWeb/oracle.jsp HTTP/1.1" 200 242
     * 
     */
    val rdd1 = sc.textFile("H:\\other\\localhost_access_log.txt")
      .map(
       line => {
         //解析字符串, 得到jsp的名字
         //1.解析两个引号之间的字符串
         val index1 = line.indexOf("\"")
         val index2 = line.lastIndexOf("\"")
         val line1 = line.substring(index1+1,index2)//GET /MyDemoWeb/oracle.jsp HTTP/1.1

         //得到两个空格的位置
         val index3 = line1.indexOf(" ")
         val index4 = line1.lastIndexOf(" ")
         val line2 = line1.substring(index3+1,index4)///MyDemoWeb/oracle.jsp

         //得到jsp的名字
         val jspName = line2.substring(line2.lastIndexOf("/"))//oracle.jsp

         (jspName,1)
     }
    )  

    //存入数据库
//    var conn : Connection = null
//    var pst : PreparedStatement = null
//    
//    try{
//        /**
//         * create table mydata(jspname varchar(50), countNumber Int);
//         * 
//         * foreach 没有返回值 , 在本需求中,只需要写数据库,不需要返回新的RDD,所以用foreach即可
//         * 
//         * 运行Task not serializable
//         */
//        conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/company?serverTimezone=UTC&characterEncoding=utf-8","root","123456") 
//        pst = conn.prepareStatement("insert into mydata values (?,?)")
//    
//        rdd1.foreach(f => {
//          pst.setString(1, f._1)
//          pst.setInt(2, f._2)
//          
//          pst.executeUpdate()
//        })
//    }catch{
//      case t : Throwable => t.printStackTrace()
//    }finally{
//      if(pst != null) pst.close()
//      if(conn != null)  conn.close()
//    }
//    sc.stop()

    //第一种修改方式
    //存入数据库
//    var conn : Connection = null
//    var pst : PreparedStatement = null
//    
//    try{
//      rdd1.foreach(f => {
//        conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/company?serverTimezone=UTC&characterEncoding=utf-8","root","123456") 
//        pst = conn.prepareStatement("insert into mydata values (?,?)")
//  
//        pst.setString(1, f._1)
//        pst.setInt(2, f._2)
//        
//        pst.executeUpdate()
//      })
//    }catch{
//      case t : Throwable => t.printStackTrace()
//    }finally{
//      if(pst != null) pst.close()
//      if(conn != null)  conn.close()
//    }
//    sc.stop()  

    /*
     * 第一种修改方式功能上可以实现,但每条数据都会创建连接,对数据库造成很大压力
     * 
     * 针对分区来操作:一个分区建立一个连接即可
     */
     rdd1.foreachPartition(saveToMysql) 
     sc.stop()
  }

  def saveToMysql(it : Iterator[(String, Int)]) = {
      var conn : Connection = null
      var pst : PreparedStatement = null

      try{
        conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/company?serverTimezone=UTC&characterEncoding=utf-8","root","123456") 
        pst = conn.prepareStatement("insert into mydata values (?,?)")

        it.foreach(f => {
          pst.setString(1, f._1)
          pst.setInt(2, f._2)

          pst.executeUpdate()
        })
      }catch{
        case t : Throwable => t.printStackTrace()
      }finally{
        if(pst != null) pst.close()
        if(conn != null)  conn.close()
      }
  }
}

文章作者: 谢舟
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 谢舟 !
 上一篇
Spark SQL Spark SQL
Spark SQL 类似于Hive 一、Spark SQL 基础1、什么是Spark SQLSpark SQL is Apache Spark’s module for working with structured data.Spark
2019-03-31
下一篇 
Akka练习 Akka练习
Actor并发模型Java中的并发开发Java的并发编程是基于 共享数据 和 加锁 的一种机制。锁的是共享数据synchronized Scala中的并发开发不共享数据。依赖于 消息传递 的一种并发编程模式 如果 Actor A 和 Act
2019-03-27
  目录