Spark SQL


Spark SQL 类似于Hive

一、Spark SQL 基础

1、什么是Spark SQL

Spark SQL is Apache Spark’s module for working with structured data.
Spark SQL 是spark 的一个模块。来处理 结构化 的数据
不能处理非结构化的数据

特点:

1)容易集成

不需要单独安装

2)统一的数据访问方式

结构化数据的类型:JDBC JSon Hive parquer文件 都可以作为Spark SQL 的数据源
对接多种数据源,且使用方式类似

3)完全兼容hive

把Hive中的数据,读取到Spark SQL中运行

4)支持标准的数据连接

JDBC

2、为什么学习Spark SQL

执行效率比Hive高

hive 2.x 执行引擎可以使用 Spark

3、核心概念:表(DataFrame DataSet)

mysql中的表:表结构、数据
DataFrame:Schema、RDD(数据)

DataSet 在spark1.6以后,对DataFrame做了一个封装

4、创建DataFrame

)测试数据:员工表、部门表
第一种方式:使用case class
*
1)定义Schema**
样本类来定义Schema

case class 特点:
可以支持模式匹配,使用case class建立表结构

7521, WARD, SALESMAN,7698, 1981/2/22, 1250, 500, 30

case class Emp(empno:Int,ename:String,job:String,mgr:Int,hiredate:String,sal:Int,comm:Int,deptno:Int)

2)读取文件
val lines = sc.textFile(“/root/hd/tmp_files/emp.csv”).map(_.split(“,”))

3)把每行数据,映射到Emp上
val allEmp = lines.map(x => Emp(x(0).toInt,x(1),x(2),x(3).toInt,x(4),x(5).toInt,x(6).toInt,x(7).toInt))

4)生成DataFrame
val df1 = allEmp.toDF

df1.show

第二种方式 使用Spark Session
(1)什么是Spark Session
Spark session available as ‘spark’.
2.0以后引入的统一访问方式。可以访问所有的Spark组件

def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame

(2)使用StructType来创建Schema

val struct =
StructType(
StructField(“a”, IntegerType, true) ::
StructField(“b”, LongType, false) ::
StructField(“c”, BooleanType, false) :: Nil)

case class Emp(
empno:Int,
ename:String,
job:String,
mgr:Int,
hiredate:String,
sal:Int,
comm:Int,
deptno:Int)

—————–分割———————-
import org.apache.spark.sql.types._

val myschema = StructType(
List(
StructField(“empno”,DataTypes.IntegerType),
StructField(“ename”,DataTypes.StringType),
StructField(“job”,DataTypes.StringType),
StructField(“mgr”,DataTypes.IntegerType),
StructField(“hiredate”,DataTypes.StringType),
StructField(“sal”,DataTypes.IntegerType),
StructField(“comm”,DataTypes.IntegerType),
StructField(“deptno”,DataTypes.IntegerType)
))

准备数据 RDD[Row]
import org.apache.spark.sql.Row

val allEmp = lines.map(x => Row(x(0).toInt,x(1),x(2),x(3).toInt,x(4),x(5).toInt,x(6).toInt,x(7).toInt))

val df2 = spark.createDataFrame(allEmp,myschema)

df2.show

第三种方式

直接读取一个带格式的文件
在/root/hd/spark-2.1.0-bin-hadoop2.7/examples/src/main/resources有现成的json代码

val df3 = spark.read 读文件,默认是Parquet文件
val df3 = spark.read.json(“/uroot/hd/tmp_files/people.json”)

df3.show

val df4 = spark.read.format(“json”).load(“/root/hd/tmp_files/people.json”)

df4.show

5、操作DataFrame

1)DSL语句
mybatis Hibernate

df1.printSchema

df1.select(“ename”,”sal”).show

df1.select($”ename”,$”sal”,$”sal”+100).show
$”sal” 可以看做是一个变量

查询薪水大于2000的员工
df1.filter($”sal” > 2000).show

求每个部门的员工人数
df1.groupBy($”deptno”).count.show

相当于select deptno,count(1) from emp group by deptno

2)SQL语句

注意:不能直接执行SQL,需要生成一个视图,再执行sql

scala> df1.create
createGlobalTempView createOrReplaceTempView createTempView

一般用到 createOrReplaceTempView createTempView
视图:类似于表,但不保存数据

df1.createOrReplaceTempView(“emp”)

操作:
spark.sql(“select * from emp”).show

查询薪水大于2000的员工
spark.sql(“select * from emp where sal > 2000”).show

求每个部门的员工人数
spark.sql(“select deptno,count(1) from emp group by deptno”).show

3)多表查询

10,ACCOUNTING,NEW YORK

case class Dept(deptno:Int,dname:String,loc:String)
val lines = sc.textFile(“/root/hd/tmp_files/dept.csv”).map(_.split(“,”))
val allDept = lines.map(x=>Dept(x(0).toInt,x(1),x(2)))

df5.createOrReplaceTempView(“dept”)

spark.sql(“select dname,ename from emp,dept where emp.deptno=dept.deptno”).show

6、操作DataSet

Dataset是一个分布式的数据收集器。这是在Spark1.6之后新加的一个接口,兼顾了RDD的优点(强类型,可以使用功能强大的lambda)以及Spark SQL的执行器高效性的优点。所以可以把DataFrames看成是一种特殊的Datasets,即:Dataset(Row)

Dataset跟DataFrame类似,是一套新的接口,是高级的Dataframe

举例:

1)创建DataSet

(1)使用序列来创建DataSet
定义一个case class
case class MyData(a:Int,b:String)

生成序列,并创建DataSet
val ds = Seq(MyData(1,”Tom”),MyData(2,”Merry”)).toDS

.toDS 生成DataSet

ds.show

(2)使用JSON数据来创建DataSet

定义case class
case class Person(name:String,age:BigInt)

通过Json数据来生成DataFrame
val df = spark.read.format(“json”).load(“/root/hd/tmp_files/people.json”)

将DataFrame转换成DataSet
df.as[Person].show

df.as[Person] 就是一个DataSet

(3)使用其他数据
RDD操作和DataFrame操作相结合 —> DataSet

读取数据,创建DataSet
val linesDS = spark.read.text(“/root/hd/tmp_files/test_WordCount.txt”).as[String]

对DataSet进行操作:
val words = linesDS.flatMap(.split(” “)).filter(.length > 3)

words.show
words.collect

执行一个WordCount程序
val result = linesDS.flatMap(.split(” “)).map((,1)).groupByKey( x => x._1).count
result.show

排序:

result.orderBy($"value").show
result.orderBy($"count(1)").show

2)DataSet操作案例

使用emp.json 生成一个DataFrame
val empDF = spark.read.json(“/root/hd/tmp_files/emp.json”)

查询工资大于3000的员工
empDF.where($”sal” >= 3000).show

创建case class

case class Emp(empno:BigInt,ename:String,job:String,mgr:String,hiredate:String,sal:BigInt,comm:String,deptno:BigInt)

生成DataSet
val empDS = empDF.as[Emp]

查询工资大于3000的员工
empDS.filter(_.sal > 3000).show

查询10号部门的员工
empDS.filter(_.deptno == 10).show

3)多表查询

(1)创建部门表
val deptRDD = sc.textFile(“/root/hd/tmp_files/dept.csv”).map(_.split(“,”))
case class Dept(deptno:Int,dname:String,loc:String)

val deptDS = deptRDD.map( x=> Dept(x(0).toInt,x(1),x(2))).toDS

(2)创建员工表
case class Emp(empno:Int,ename:String,job:String,mgr:Int,hiredate:String,sal:Int,comm:Int,deptno:Int)
val empRDD = sc.textFile(“/root/hd/tmp_files/emp.csv”).map(_.split(“,”))

7369,SMITH,CLERK,7902,1980/12/17,800,0,20
val empDS = empRDD.map(x=> Emp(x(0).toInt,x(1),x(2),x(3).toInt,x(4),x(5).toInt,x(6).toInt,x(7).toInt)).toDS

(3)执行多表查询:等值连接
val result = deptDS.join(empDS,”deptno”)
result.show
result.printSchema

val result1 = deptDS.joinWith(empDS, deptDS(“deptno”) === empDS(“deptno”) )
result1.show
result1.printSchema

join 和 joinWith 区别:连接后schema不同

join :将两张表展开成一张更大的表
joinWith :把两张表的数据分别做成一列,然后直接拼在一起

4)多表连接后再筛选

deptDS.join(empDS,”deptno”).where(“deptno == 10”).show

result.explain:执行计划

7、Spark SQL 中的视图

视图是一个虚表,不存储数据
两种类型:

1)普通视图(本地视图)

只在当前Session中有效createOrReplaceTempView createTempView

2)全局视图

createGlobalTempView
在不同的Session中都有用,把全局视图创建在命名空间中:global_temp中。类似于一个库

scala> df1.create
createGlobalTempView createOrReplaceTempView createTempView

举例:
创建一个新session,读取不到emp视图,报错
df1.createOrReplaceTempView(“emp”)
spark.sql(“select * from emp”).show
spark.newSession.sql(“select * from emp”)

以下两种方式均可读到全局视图中的数据
df1.createGlobalTempView(“emp1”)

spark.newSession.sql(“select * from global_temp.emp1”).show

spark.sql(“select * from global_temp.emp1”).show

二、使用数据源

在Spark SQL中,可以使用各种各样的数据源来操作。 结构化

1、使用load函数、save函数

load函数是加载数据,save是存储数据

注意:使用load 或 save时,默认是Parquet文件。列式存储文件

举例:
读取 users.parquet 文件
val userDF = spark.read.load(“/root/hd/tmp_files/users.parquet”)

userDF.printSchema
userDF.show

val userDF = spark.read.load(“/root/hd/tmp_files/emp.json”)

保存parquet文件

userDF.select($"name",$"favorite_color").write.save("/root/hd/tmp_files/parquet")

读取刚刚写入的文件:
val userDF1 = spark.read.load(“/root/hd/tmp_files/parquet/part-00000-f9a3d6bb-d481-4fc9-abf6-5f20139f97c5.snappy.parquet”)—> 不推荐

生产中直接读取存放的目录即可:
val userDF2 = spark.read.load(“/root/hd/tmp_files/parquet”)

读json文件 必须format
val userDF = spark.read.format(“json”).load(“/root/hd/tmp_files/emp.json”)
val userDF3 = spark.read.json(“/root/hd/tmp_files/emp.json”)

关于save函数

调用save函数的时候,可以指定存储模式,追加、覆盖等等
userDF.write.save(“/root/hd/tmp_files/parquet”)

userDF.write.save(“/root/hd/tmp_files/parquet”)
org.apache.spark.sql.AnalysisException: path file:/root/hd/tmp_files/parquet already exists.;

save的时候覆盖
userDF.write.mode(“overwrite”).save(“/root/hd/tmp_files/parquet”)

将结果保存成表
userDF.select($”name”).write.saveAsTable(“table1”)

scala> userDF.select($”name”).write.saveAsTable(“table1”)

scala> spark.sql(“select * from table1”).show
+——+
| name|
+——+
|Alyssa|
| Ben|
+——+

2、Parquet文件

列式存储文件,是Spark SQL 默认的数据源
就是一个普通的文件

举例:
1)把其他文件,转换成Parquet文件
调用save函数
把数据读进来,再写出去,就是Parquet文件

val empDF = spark.read.json(“/root/hd/tmp_files/emp.json”)
empDF.write.mode(“overwrite”).save(“/root/hd/tmp_files/parquet”)
empDF.write.mode(“overwrite”).parquet(“/root/hd/tmp_files/parquet”)

val emp1 = spark.read.parquet(“/root/hd/tmp_files/parquet”)
emp1.createOrReplaceTempView(“emp1”)
spark.sql(“select * from emp1”)

2)支持Schema的合并
项目开始 表结构简单 schema简单
项目越来越大 schema越来越复杂

举例:
通过RDD来创建DataFrame
val df1 = sc.makeRDD(1 to 5).map( i => (i,i*2)).toDF(“single”,”double”)
“single”,”double” 是表结构
df1.show

df1.write.mode(“overwrite”).save(“/root/hd/tmp_files/test_table/key=1”)

val df2 = sc.makeRDD(6 to 10).map( i => (i,i*3)).toDF(“single”,”triple”)
df2.show
df2.write.mode(“overwrite”).save(“/root/hd/tmp_files/test_table/key=2”)

合并两个部分
val df3 = spark.read.parquet(“/root/hd/tmp_files/test_table”)

val df3 = spark.read.option(“mergeSchema”,true).parquet(“/root/hd/tmp_files/test_table”)

key是可以随意取名字的,两个key需要一致,不然合并会报错

通过RDD来创建DataFrame
val df1 = sc.makeRDD(1 to 5).map( i => (i,i*2)).toDF(“single”,”double”)
“single”,”double” 是表结构
df1.show

df1.write.mode(“overwrite”).save(“/root/hd/tmp_files/test_table/kt=1”)

val df2 = sc.makeRDD(6 to 10).map( i => (i,i*3)).toDF(“single”,”triple”)
df2.show
df2.write.mode(“overwrite”).save(“/root/hd/tmp_files/test_table/kt=2”)

合并两个部分
val df3 = spark.read.parquet(“/root/hd/tmp_files/test_table”)

val df3 = spark.read.option(“mergeSchema”,true).parquet(“/root/hd/tmp_files/test_table”)

3、json文件

读取Json文件,生成DataFrame
val peopleDF = spark.read.json(“/root/hd/tmp_files/people.json”)

peopleDF.printSchema

peopleDF.createOrReplaceTempView(“peopleView”)

spark.sql(“select * from peopleView”).show

Spark SQL 支持统一的访问接口。对于不同的数据源,读取进来,生成DataFrame后,操作完全一样

4、JDBC

使用JDBC操作关系型数据库,加载到Spark中进行分析和处理

方式一:

./spark-shell --master spark://hsiehchou121:7077 --jars /root/hd/tmp_files/mysql-connector-java-8.0.12.jar --driver-class-path /root/hd/tmp_files/mysql-connector-java-8.0.12.jar 
val mysqlDF = spark.read.format("jdbc")
.option("url","jdbc:mysql://192.168.116.1/company?serverTimezone=UTC&characterEncoding=utf-8")
.option("driver","com.mysql.cj.jdbc.Driver")
.option("user","root")
.option("password","123456")
.option("dbtable","emp").load
val mysqlDF = spark.read.format("jdbc").option("url","jdbc:mysql://192.168.116.1/company?serverTimezone=UTC&characterEncoding=utf-8").option("driver","com.mysql.cj.jdbc.Driver").option("user","root").option("password","123456").option("dbtable","emp").load
mysqlDF.show

问题解决

如果遇到下面问题,就是你本机的mysql数据库没有权限给你虚拟机访问
java.sql.SQLException: null, message from server: “Host ‘hsiehchou121’ is not allowed to connect to this MySQL server”

解决方案

1)进入你本机的数据库
mysql -u root -p
2)use mysql;
3)修改root用户前面的Host,改为%,意思是全部IP都能访问
4)flush privileges;

方式二
定义一个Properties类
import java.util.Properties
val mysqlProps = new Properties()
mysqlProps.setProperty(“driver”,”com.mysql.cj.jdbc.Driver”)
mysqlProps.setProperty(“user”,”root”)
mysqlProps.setProperty(“password”,”123456”)

val mysqlDF1 = spark.read.jdbc(“jdbc:mysql://192.168.116.1:3306/company?serverTimezone=UTC&characterEncoding=utf-8”,”emp”,mysqlProps)

mysqlDF1.show

5、使用Hive

比较常见
)spark SQL 完全兼容hive
)需要进行配置
拷贝一下文件到spark/conf目录下:
Hive 配置文件: hive-site.xml
Hadoop 配置文件:core-site.xml hdfs-site.xml

配置好后,重启spark

在hive的lib下和spark的jars下面增加mysql-connector-java-8.0.12.jar这边连接数据库的jar包

启动Hadoop :start-all.sh
启动 hive:

hsiehchou121
cd hive/bin/
./hive --service metastore

hsiehchou122
cd hive/bin
./hive

hsiehchou121启动问题

java.sql.SQLSyntaxErrorException: Table ‘hive.version’ doesn’t exist
解决:去mysql数据库中的hive库下面创建version表
这里需要给本地的hive库创建下hive所必须用的表

我们去/root/hd/hive/scripts/metastore/upgrade/mysql这里面找到hive-schema-1.2.0.mysql.sql,将里面的sql语句在hive库中执行

hive-txn-schema-0.14.0.mysql.sql,这个也做好执行下,用于事务管理

显示当前所在库名字

set hive.cli.print.current.db=true;

j将emp.csv上传到hdfs中的/tmp_files/下面
hdfs dfs -put emp.csv /tmp_files

在hive中创建emp_default表

hive (default)> create table emp(empno int,ename string,job string,mgr int,hiredate string,sal int,comm int,deptno int)
              > row format
              > delimited fields
              > terminated by ",";
hive (default)> load data inpath '/tmp_files/emp.csv' into table emp;
Time taken: 1.894 seconds
hive (default)> show tables;
hive (default)> select * from emp;

hdfs dfs -put /root/hd/tmp_files/emp.csv /tmp_files

[root@hsiehchou121 bin]# ./spark-shell --master spark://hsiehchou121:7077

启动Spatk时,如果出现如下错误
java.sql.SQLSyntaxErrorException: Table ‘hive.partitions’ doesn’t exist
在MySQL数据库里面创建partitions表

scala> spark.sql(“select * from emp_default”).show
scala> spark.sql(“select * from default.emp_default”).show

spark.sql(“create table company.emp_4(empno Int,ename String,job String,mgr String,hiredate String,sal Int,comm String,deptno Int)row format delimited fields terminated by ‘,’”)
spark.sql(“load data local inpath ‘/root/hd/tmp_files/emp.csv’ overwrite into table company.emp_4”)

三、在IDE中开发Spark SQL

1、创建DataFrame StructType方式

package day4

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.Row
import org.apache.log4j.Logger
import org.apache.log4j.Level

/**
 * 创建DataFrame StructType方式
 */
object Demo1 {
  def main(args: Array[String]): Unit = {

    //减少Info日志的打印
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

    //创建Spark Session对象
    val spark = SparkSession.builder().master("local").appName("Demo1").getOrCreate()

    //从指定的地址创建RDD对象
    val personRDD = spark.sparkContext.textFile("H:\\other\\students.txt").map(_.split("\t"))

    //通过StructType方式指定Schema
    val schema = StructType(
      List(
        StructField("id", IntegerType),
        StructField("name", StringType),
        StructField("age", IntegerType)))

    //将RDD映射到rowRDD上,映射到Schema上
    val rowRDD = personRDD.map(p => Row(p(0).toInt,p(1),p(2).toInt))
    val personDataFrame = spark.createDataFrame(rowRDD, schema)

    //注册视图
    personDataFrame.createOrReplaceTempView("t_person")

    //执行SQL语句  desc降序   asc 升序
    val df = spark.sql("select * from t_person order by age desc")

    df.show

    spark.stop()
  }
}

2、使用case class来创建DataFrame

package day4

import org.apache.log4j.Logger
import org.apache.log4j.Level
import org.apache.spark.sql.SparkSession

/**
 * 使用case class来创建DataFrame
 */
object Demo2 {
  def main(args: Array[String]): Unit = {

    //减少Info日志的打印
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

    //创建Spark Session对象
    val spark = SparkSession.builder().master("local").appName("Demo1").getOrCreate()

    //从指定的地址创建RDD对象
    val lineRDD = spark.sparkContext.textFile("H:\\other\\students.txt").map(_.split("\t"))

    //把数据与case class做匹配
    val studentRDD = lineRDD.map(x => Student(x(0).toInt,x(1),x(2).toInt))

    //生成DataFrame
    import spark.sqlContext.implicits._
    val studentDF = studentRDD.toDF()

    //注册视图,执行SQL
    studentDF.createOrReplaceTempView("student")

    spark.sql("select * from student").show

    spark.stop()
  }
}

//定义case class
case class Student(stuId:Int, stuName:String, stuAge:Int)

3、写入MySQL

package day4

import org.apache.log4j.Logger
import org.apache.log4j.Level
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import java.util.Properties

/**
 * 写入mysql
 */
object Demo3 {
  def main(args: Array[String]): Unit = {

    //减少Info日志的打印
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

    //创建Spark Session对象
    val spark = SparkSession.builder().master("local").appName("Demo1").getOrCreate()

    //从指定的地址创建RDD对象
    val lineRDD = spark.sparkContext.textFile("H:\\other\\students.txt").map(_.split("\t"))

    //通过StructType方式指定Schema
    val schema = StructType(
      List(
        StructField("personID", IntegerType),
        StructField("personName", StringType),
        StructField("personAge", IntegerType)))

    //将RDD映射到rowRDD上,映射到Schema上
    val rowRDD = lineRDD.map(p => Row(p(0).toInt,p(1),p(2).toInt))
    val personDataFrame = spark.createDataFrame(rowRDD, schema)

    personDataFrame.createOrReplaceTempView("myperson")

    val result = spark.sql("select * from myperson")

    result.show

    //把结果存入mysql中
    val props = new Properties()
    props.setProperty("user", "root")
    props.setProperty("password", "123456")
    props.setProperty("driver", "com.mysql.cj.jdbc.Driver")

    result.write.mode("append").jdbc("jdbc:mysql://localhost:3306/company?serverTimezone=UTC&characterEncoding=utf-8", "student", props)

    spark.stop()
  }
}

4、使用Spark SQL 读取Hive中的数据,将计算结果存入MySQL

package day4

import org.apache.spark.sql.SparkSession
import java.util.Properties

/**
 * 使用Spark SQL 读取Hive中的数据,将计算结果存入mysql
 */
object Demo4 {
  def main(args: Array[String]): Unit = {

    //创建SparkSession
    val spark = SparkSession.builder().appName("Demo4").enableHiveSupport().getOrCreate()

    //执行SQL
    val result = spark.sql("select deptno,count(1) from company.emp group by deptno")

    //将结果保存到mysql中
     val props = new Properties()
    props.setProperty("user", "root")
    props.setProperty("password", "123456")
    props.setProperty("driver", "com.mysql.cj.jdbc.Driver")

    result.write.jdbc("jdbc:mysql://192.168.116.1:3306/company?serverTimezone=UTC&characterEncoding=utf-8", "emp_stat", props)

    spark.stop()
  } 
}

提交任务

[root@hsiehchou121 bin]# ./spark-submit --master spark://hsiehchou121:7077 --jars /root/hd/tmp_files/mysql-connector-java-8.0.12.jar --driver-class-path /root/hd/tmp_files/mysql-connector-java-8.0.12.jar --class day4.Demo4 /root/hd/tmp_files/Demo4.jar

四、性能优化

与RDD类似

1、把内存中缓存表的数据

直接读取内存的值,来提高性能

RDD中如何缓存:
rdd.cache 或者 rdd.persist

在Spark SQL中,使用SparkSession.sqlContext.cacheTable

spark中所有context对象
1)sparkContext : SparkCore
2)sql Context : SparkSQL
3)Streaming Context :SparkStreaming

统一起来:SparkSession

操作mysql,启动spark shell 时,需要:
./spark-shell --master spark://hsiehchou121:7077 --jars /root/hd/tmp_files/mysql-connector-java-8.0.12.jar --driver-class-path /root/hd/tmp_files/mysql-connector-java-8.0.12.jar

val mysqlDF = spark.read.format(“jdbc”).option(“driver”,”com.mysql.cj.jdbc.Driver”).option(“url”,”jdbc:mysql://192.168.116.1:3306/company?serverTimezone=UTC&characterEncoding=utf-8”).option(“user”,”root”).option(“password”,”123456”).option(“dbtable”,”emp”).load

mysqlDF.show
mysqlDF.createOrReplaceTempView(“emp”)

spark.sqlContext.cacheTable(“emp”) —-> 标识这张表可以被缓存,数据还没有真正被缓存
spark.sql(“select * from emp”).show —-> 依然读取mysql
spark.sql(“select * from emp”).show —-> 从缓存中读取数据

spark.sqlContext.clearCache

清空缓存后,执行查询,会触发查询mysql数据库

2、了解性能优化的相关参数

将数据缓存到内存中的相关优化参数
spark.sql.inMemoryColumnarStorage.compressed
默认为 true
Spark SQL 将会基于统计信息自动地为每一列选择一种压缩编码方式

spark.sql.inMemoryColumnarStorage.batchSize
默认值:10000
缓存批处理大小。缓存数据时, 较大的批处理大小可以提高内存利用率和压缩率,但同时也会带来 OOM(Out Of Memory)的风险

其他性能相关的配置选项(不过不推荐手动修改,可能在后续版本自动的自适应修改)
spark.sql.files.maxPartitionBytes
默认值:128 MB
读取文件时单个分区可容纳的最大字节数

spark.sql.files.openCostInBytes
默认值:4M
打开文件的估算成本, 按照同一时间能够扫描的字节数来测量。当往一个分区写入多个文件的时候会使用。高估更好, 这样的话小文件分区将比大文件分区更快 (先被调度)

spark.sql.autoBroadcastJoinThreshold
默认值:10M
用于配置一个表在执行 join 操作时能够广播给所有 worker 节点的最大字节大小。通过将这个值设置为 -1 可以禁用广播。注意,当前数据统计仅支持已经运行了 ANALYZE TABLE COMPUTE STATISTICS noscan 命令的 Hive Metastore 表

spark.sql.shuffle.partitions
默认值:200
用于配置 join 或聚合操作混洗(shuffle)数据时使用的分区数


文章作者: 谢舟
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 谢舟 !
 上一篇
Spark Streaming Spark Streaming
Spark Streaming流式计算框架,类似于Storm 常用的实时计算引擎(流式计算)1、Apache Storm:真正的流式计算 2、Spark Streaming :严格上来说,不是真正的流式计算(实时计算)把连续的流式数据,当成
2019-04-03
下一篇 
Spark Core Spark Core
Spark生态圈:Spark Core : RDD(弹性分布式数据集)Spark SQLSpark StreamingSpark MLLib :协同过滤,ALS,逻辑回归等等 –> 机器学习Spark Graphx : 图计算 一、S
2019-03-29
  目录