Spark SQL 类似于Hive
一、Spark SQL 基础
1、什么是Spark SQL
Spark SQL is Apache Spark’s module for working with structured data.
Spark SQL 是spark 的一个模块。来处理 结构化 的数据
不能处理非结构化的数据
特点:
1)容易集成
不需要单独安装
2)统一的数据访问方式
结构化数据的类型:JDBC JSon Hive parquer文件 都可以作为Spark SQL 的数据源
对接多种数据源,且使用方式类似
3)完全兼容hive
把Hive中的数据,读取到Spark SQL中运行
4)支持标准的数据连接
JDBC
2、为什么学习Spark SQL
执行效率比Hive高
hive 2.x 执行引擎可以使用 Spark
3、核心概念:表(DataFrame DataSet)
mysql中的表:表结构、数据
DataFrame:Schema、RDD(数据)
DataSet 在spark1.6以后,对DataFrame做了一个封装
4、创建DataFrame
()测试数据:员工表、部门表
第一种方式:使用case class
*1)定义Schema**
样本类来定义Schema
case class 特点:
可以支持模式匹配,使用case class建立表结构
7521, WARD, SALESMAN,7698, 1981/2/22, 1250, 500, 30
case class Emp(empno:Int,ename:String,job:String,mgr:Int,hiredate:String,sal:Int,comm:Int,deptno:Int)
2)读取文件
val lines = sc.textFile(“/root/hd/tmp_files/emp.csv”).map(_.split(“,”))
3)把每行数据,映射到Emp上
val allEmp = lines.map(x => Emp(x(0).toInt,x(1),x(2),x(3).toInt,x(4),x(5).toInt,x(6).toInt,x(7).toInt))
4)生成DataFrame
val df1 = allEmp.toDF
df1.show
第二种方式 使用Spark Session
(1)什么是Spark Session
Spark session available as ‘spark’.
2.0以后引入的统一访问方式。可以访问所有的Spark组件
def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame
(2)使用StructType来创建Schema
val struct =
StructType(
StructField(“a”, IntegerType, true) ::
StructField(“b”, LongType, false) ::
StructField(“c”, BooleanType, false) :: Nil)
case class Emp(
empno:Int,
ename:String,
job:String,
mgr:Int,
hiredate:String,
sal:Int,
comm:Int,
deptno:Int)
—————–分割———————-
import org.apache.spark.sql.types._
val myschema = StructType(
List(
StructField(“empno”,DataTypes.IntegerType),
StructField(“ename”,DataTypes.StringType),
StructField(“job”,DataTypes.StringType),
StructField(“mgr”,DataTypes.IntegerType),
StructField(“hiredate”,DataTypes.StringType),
StructField(“sal”,DataTypes.IntegerType),
StructField(“comm”,DataTypes.IntegerType),
StructField(“deptno”,DataTypes.IntegerType)
))
准备数据 RDD[Row]
import org.apache.spark.sql.Row
val allEmp = lines.map(x => Row(x(0).toInt,x(1),x(2),x(3).toInt,x(4),x(5).toInt,x(6).toInt,x(7).toInt))
val df2 = spark.createDataFrame(allEmp,myschema)
df2.show
第三种方式
直接读取一个带格式的文件
在/root/hd/spark-2.1.0-bin-hadoop2.7/examples/src/main/resources有现成的json代码
val df3 = spark.read 读文件,默认是Parquet文件
val df3 = spark.read.json(“/uroot/hd/tmp_files/people.json”)
df3.show
val df4 = spark.read.format(“json”).load(“/root/hd/tmp_files/people.json”)
df4.show
5、操作DataFrame
1)DSL语句
mybatis Hibernate
df1.printSchema
df1.select(“ename”,”sal”).show
df1.select($”ename”,$”sal”,$”sal”+100).show
$”sal” 可以看做是一个变量
查询薪水大于2000的员工
df1.filter($”sal” > 2000).show
求每个部门的员工人数
df1.groupBy($”deptno”).count.show
相当于select deptno,count(1) from emp group by deptno
2)SQL语句
注意:不能直接执行SQL,需要生成一个视图,再执行sql
scala> df1.create
createGlobalTempView createOrReplaceTempView createTempView
一般用到 createOrReplaceTempView createTempView
视图:类似于表,但不保存数据
df1.createOrReplaceTempView(“emp”)
操作:
spark.sql(“select * from emp”).show
查询薪水大于2000的员工
spark.sql(“select * from emp where sal > 2000”).show
求每个部门的员工人数
spark.sql(“select deptno,count(1) from emp group by deptno”).show
3)多表查询
10,ACCOUNTING,NEW YORK
case class Dept(deptno:Int,dname:String,loc:String)
val lines = sc.textFile(“/root/hd/tmp_files/dept.csv”).map(_.split(“,”))
val allDept = lines.map(x=>Dept(x(0).toInt,x(1),x(2)))
df5.createOrReplaceTempView(“dept”)
spark.sql(“select dname,ename from emp,dept where emp.deptno=dept.deptno”).show
6、操作DataSet
Dataset是一个分布式的数据收集器。这是在Spark1.6之后新加的一个接口,兼顾了RDD的优点(强类型,可以使用功能强大的lambda)以及Spark SQL的执行器高效性的优点。所以可以把DataFrames看成是一种特殊的Datasets,即:Dataset(Row)
Dataset跟DataFrame类似,是一套新的接口,是高级的Dataframe
举例:
1)创建DataSet
(1)使用序列来创建DataSet
定义一个case class
case class MyData(a:Int,b:String)
生成序列,并创建DataSet
val ds = Seq(MyData(1,”Tom”),MyData(2,”Merry”)).toDS
.toDS 生成DataSet
ds.show
(2)使用JSON数据来创建DataSet
定义case class
case class Person(name:String,age:BigInt)
通过Json数据来生成DataFrame
val df = spark.read.format(“json”).load(“/root/hd/tmp_files/people.json”)
将DataFrame转换成DataSet
df.as[Person].show
df.as[Person] 就是一个DataSet
(3)使用其他数据
RDD操作和DataFrame操作相结合 —> DataSet
读取数据,创建DataSet
val linesDS = spark.read.text(“/root/hd/tmp_files/test_WordCount.txt”).as[String]
对DataSet进行操作:
val words = linesDS.flatMap(.split(” “)).filter(.length > 3)
words.show
words.collect
执行一个WordCount程序
val result = linesDS.flatMap(.split(” “)).map((,1)).groupByKey( x => x._1).count
result.show
排序:
result.orderBy($"value").show
result.orderBy($"count(1)").show
2)DataSet操作案例
使用emp.json 生成一个DataFrame
val empDF = spark.read.json(“/root/hd/tmp_files/emp.json”)
查询工资大于3000的员工
empDF.where($”sal” >= 3000).show
创建case class
case class Emp(empno:BigInt,ename:String,job:String,mgr:String,hiredate:String,sal:BigInt,comm:String,deptno:BigInt)
生成DataSet
val empDS = empDF.as[Emp]
查询工资大于3000的员工
empDS.filter(_.sal > 3000).show
查询10号部门的员工
empDS.filter(_.deptno == 10).show
3)多表查询
(1)创建部门表
val deptRDD = sc.textFile(“/root/hd/tmp_files/dept.csv”).map(_.split(“,”))
case class Dept(deptno:Int,dname:String,loc:String)
val deptDS = deptRDD.map( x=> Dept(x(0).toInt,x(1),x(2))).toDS
(2)创建员工表
case class Emp(empno:Int,ename:String,job:String,mgr:Int,hiredate:String,sal:Int,comm:Int,deptno:Int)
val empRDD = sc.textFile(“/root/hd/tmp_files/emp.csv”).map(_.split(“,”))
7369,SMITH,CLERK,7902,1980/12/17,800,0,20
val empDS = empRDD.map(x=> Emp(x(0).toInt,x(1),x(2),x(3).toInt,x(4),x(5).toInt,x(6).toInt,x(7).toInt)).toDS
(3)执行多表查询:等值连接
val result = deptDS.join(empDS,”deptno”)
result.show
result.printSchema
val result1 = deptDS.joinWith(empDS, deptDS(“deptno”) === empDS(“deptno”) )
result1.show
result1.printSchema
join 和 joinWith 区别:连接后schema不同
join :将两张表展开成一张更大的表
joinWith :把两张表的数据分别做成一列,然后直接拼在一起
4)多表连接后再筛选
deptDS.join(empDS,”deptno”).where(“deptno == 10”).show
result.explain:执行计划
7、Spark SQL 中的视图
视图是一个虚表,不存储数据
两种类型:
1)普通视图(本地视图)
只在当前Session中有效createOrReplaceTempView createTempView
2)全局视图
createGlobalTempView
在不同的Session中都有用,把全局视图创建在命名空间中:global_temp中。类似于一个库
scala> df1.create
createGlobalTempView createOrReplaceTempView createTempView
举例:
创建一个新session,读取不到emp视图,报错
df1.createOrReplaceTempView(“emp”)
spark.sql(“select * from emp”).show
spark.newSession.sql(“select * from emp”)
以下两种方式均可读到全局视图中的数据
df1.createGlobalTempView(“emp1”)
spark.newSession.sql(“select * from global_temp.emp1”).show
spark.sql(“select * from global_temp.emp1”).show
二、使用数据源
在Spark SQL中,可以使用各种各样的数据源来操作。 结构化
1、使用load函数、save函数
load函数是加载数据,save是存储数据
注意:使用load 或 save时,默认是Parquet文件。列式存储文件
举例:
读取 users.parquet 文件
val userDF = spark.read.load(“/root/hd/tmp_files/users.parquet”)
userDF.printSchema
userDF.show
val userDF = spark.read.load(“/root/hd/tmp_files/emp.json”)
保存parquet文件
userDF.select($"name",$"favorite_color").write.save("/root/hd/tmp_files/parquet")
读取刚刚写入的文件:
val userDF1 = spark.read.load(“/root/hd/tmp_files/parquet/part-00000-f9a3d6bb-d481-4fc9-abf6-5f20139f97c5.snappy.parquet”)—> 不推荐
生产中直接读取存放的目录即可:
val userDF2 = spark.read.load(“/root/hd/tmp_files/parquet”)
读json文件 必须format
val userDF = spark.read.format(“json”).load(“/root/hd/tmp_files/emp.json”)
val userDF3 = spark.read.json(“/root/hd/tmp_files/emp.json”)
关于save函数:
调用save函数的时候,可以指定存储模式,追加、覆盖等等
userDF.write.save(“/root/hd/tmp_files/parquet”)
userDF.write.save(“/root/hd/tmp_files/parquet”)
org.apache.spark.sql.AnalysisException: path file:/root/hd/tmp_files/parquet already exists.;
save的时候覆盖
userDF.write.mode(“overwrite”).save(“/root/hd/tmp_files/parquet”)
将结果保存成表
userDF.select($”name”).write.saveAsTable(“table1”)
scala> userDF.select($”name”).write.saveAsTable(“table1”)
scala> spark.sql(“select * from table1”).show
+——+
| name|
+——+
|Alyssa|
| Ben|
+——+
2、Parquet文件
列式存储文件,是Spark SQL 默认的数据源
就是一个普通的文件
举例:
1)把其他文件,转换成Parquet文件
调用save函数
把数据读进来,再写出去,就是Parquet文件
val empDF = spark.read.json(“/root/hd/tmp_files/emp.json”)
empDF.write.mode(“overwrite”).save(“/root/hd/tmp_files/parquet”)
empDF.write.mode(“overwrite”).parquet(“/root/hd/tmp_files/parquet”)
val emp1 = spark.read.parquet(“/root/hd/tmp_files/parquet”)
emp1.createOrReplaceTempView(“emp1”)
spark.sql(“select * from emp1”)
2)支持Schema的合并
项目开始 表结构简单 schema简单
项目越来越大 schema越来越复杂
举例:
通过RDD来创建DataFrame
val df1 = sc.makeRDD(1 to 5).map( i => (i,i*2)).toDF(“single”,”double”)
“single”,”double” 是表结构
df1.show
df1.write.mode(“overwrite”).save(“/root/hd/tmp_files/test_table/key=1”)
val df2 = sc.makeRDD(6 to 10).map( i => (i,i*3)).toDF(“single”,”triple”)
df2.show
df2.write.mode(“overwrite”).save(“/root/hd/tmp_files/test_table/key=2”)
合并两个部分
val df3 = spark.read.parquet(“/root/hd/tmp_files/test_table”)
val df3 = spark.read.option(“mergeSchema”,true).parquet(“/root/hd/tmp_files/test_table”)
key是可以随意取名字的,两个key需要一致,不然合并会报错
通过RDD来创建DataFrame
val df1 = sc.makeRDD(1 to 5).map( i => (i,i*2)).toDF(“single”,”double”)
“single”,”double” 是表结构
df1.show
df1.write.mode(“overwrite”).save(“/root/hd/tmp_files/test_table/kt=1”)
val df2 = sc.makeRDD(6 to 10).map( i => (i,i*3)).toDF(“single”,”triple”)
df2.show
df2.write.mode(“overwrite”).save(“/root/hd/tmp_files/test_table/kt=2”)
合并两个部分
val df3 = spark.read.parquet(“/root/hd/tmp_files/test_table”)
val df3 = spark.read.option(“mergeSchema”,true).parquet(“/root/hd/tmp_files/test_table”)
3、json文件
读取Json文件,生成DataFrame
val peopleDF = spark.read.json(“/root/hd/tmp_files/people.json”)
peopleDF.printSchema
peopleDF.createOrReplaceTempView(“peopleView”)
spark.sql(“select * from peopleView”).show
Spark SQL 支持统一的访问接口。对于不同的数据源,读取进来,生成DataFrame后,操作完全一样
4、JDBC
使用JDBC操作关系型数据库,加载到Spark中进行分析和处理
方式一:
./spark-shell --master spark://hsiehchou121:7077 --jars /root/hd/tmp_files/mysql-connector-java-8.0.12.jar --driver-class-path /root/hd/tmp_files/mysql-connector-java-8.0.12.jar
val mysqlDF = spark.read.format("jdbc")
.option("url","jdbc:mysql://192.168.116.1/company?serverTimezone=UTC&characterEncoding=utf-8")
.option("driver","com.mysql.cj.jdbc.Driver")
.option("user","root")
.option("password","123456")
.option("dbtable","emp").load
val mysqlDF = spark.read.format("jdbc").option("url","jdbc:mysql://192.168.116.1/company?serverTimezone=UTC&characterEncoding=utf-8").option("driver","com.mysql.cj.jdbc.Driver").option("user","root").option("password","123456").option("dbtable","emp").load
mysqlDF.show
问题解决
如果遇到下面问题,就是你本机的mysql数据库没有权限给你虚拟机访问
java.sql.SQLException: null, message from server: “Host ‘hsiehchou121’ is not allowed to connect to this MySQL server”
解决方案
1)进入你本机的数据库
mysql -u root -p
2)use mysql;
3)修改root用户前面的Host,改为%,意思是全部IP都能访问
4)flush privileges;
方式二
定义一个Properties类
import java.util.Properties
val mysqlProps = new Properties()
mysqlProps.setProperty(“driver”,”com.mysql.cj.jdbc.Driver”)
mysqlProps.setProperty(“user”,”root”)
mysqlProps.setProperty(“password”,”123456”)
val mysqlDF1 = spark.read.jdbc(“jdbc:mysql://192.168.116.1:3306/company?serverTimezone=UTC&characterEncoding=utf-8”,”emp”,mysqlProps)
mysqlDF1.show
5、使用Hive
比较常见
()spark SQL 完全兼容hive
()需要进行配置
拷贝一下文件到spark/conf目录下:
Hive 配置文件: hive-site.xml
Hadoop 配置文件:core-site.xml hdfs-site.xml
配置好后,重启spark
在hive的lib下和spark的jars下面增加mysql-connector-java-8.0.12.jar这边连接数据库的jar包
启动Hadoop :start-all.sh
启动 hive:
hsiehchou121
cd hive/bin/
./hive --service metastore
hsiehchou122
cd hive/bin
./hive
hsiehchou121启动问题
java.sql.SQLSyntaxErrorException: Table ‘hive.version’ doesn’t exist
解决:去mysql数据库中的hive库下面创建version表
这里需要给本地的hive库创建下hive所必须用的表
我们去/root/hd/hive/scripts/metastore/upgrade/mysql这里面找到hive-schema-1.2.0.mysql.sql,将里面的sql语句在hive库中执行
hive-txn-schema-0.14.0.mysql.sql,这个也做好执行下,用于事务管理
显示当前所在库名字
set hive.cli.print.current.db=true;
j将emp.csv上传到hdfs中的/tmp_files/下面
hdfs dfs -put emp.csv /tmp_files
在hive中创建emp_default表
hive (default)> create table emp(empno int,ename string,job string,mgr int,hiredate string,sal int,comm int,deptno int)
> row format
> delimited fields
> terminated by ",";
hive (default)> load data inpath '/tmp_files/emp.csv' into table emp;
Time taken: 1.894 seconds
hive (default)> show tables;
hive (default)> select * from emp;
hdfs dfs -put /root/hd/tmp_files/emp.csv /tmp_files
[root@hsiehchou121 bin]# ./spark-shell --master spark://hsiehchou121:7077
启动Spatk时,如果出现如下错误
java.sql.SQLSyntaxErrorException: Table ‘hive.partitions’ doesn’t exist
在MySQL数据库里面创建partitions表
scala> spark.sql(“select * from emp_default”).show
scala> spark.sql(“select * from default.emp_default”).show
spark.sql(“create table company.emp_4(empno Int,ename String,job String,mgr String,hiredate String,sal Int,comm String,deptno Int)row format delimited fields terminated by ‘,’”)
spark.sql(“load data local inpath ‘/root/hd/tmp_files/emp.csv’ overwrite into table company.emp_4”)
三、在IDE中开发Spark SQL
1、创建DataFrame StructType方式
package day4
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.Row
import org.apache.log4j.Logger
import org.apache.log4j.Level
/**
* 创建DataFrame StructType方式
*/
object Demo1 {
def main(args: Array[String]): Unit = {
//减少Info日志的打印
Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
//创建Spark Session对象
val spark = SparkSession.builder().master("local").appName("Demo1").getOrCreate()
//从指定的地址创建RDD对象
val personRDD = spark.sparkContext.textFile("H:\\other\\students.txt").map(_.split("\t"))
//通过StructType方式指定Schema
val schema = StructType(
List(
StructField("id", IntegerType),
StructField("name", StringType),
StructField("age", IntegerType)))
//将RDD映射到rowRDD上,映射到Schema上
val rowRDD = personRDD.map(p => Row(p(0).toInt,p(1),p(2).toInt))
val personDataFrame = spark.createDataFrame(rowRDD, schema)
//注册视图
personDataFrame.createOrReplaceTempView("t_person")
//执行SQL语句 desc降序 asc 升序
val df = spark.sql("select * from t_person order by age desc")
df.show
spark.stop()
}
}
2、使用case class来创建DataFrame
package day4
import org.apache.log4j.Logger
import org.apache.log4j.Level
import org.apache.spark.sql.SparkSession
/**
* 使用case class来创建DataFrame
*/
object Demo2 {
def main(args: Array[String]): Unit = {
//减少Info日志的打印
Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
//创建Spark Session对象
val spark = SparkSession.builder().master("local").appName("Demo1").getOrCreate()
//从指定的地址创建RDD对象
val lineRDD = spark.sparkContext.textFile("H:\\other\\students.txt").map(_.split("\t"))
//把数据与case class做匹配
val studentRDD = lineRDD.map(x => Student(x(0).toInt,x(1),x(2).toInt))
//生成DataFrame
import spark.sqlContext.implicits._
val studentDF = studentRDD.toDF()
//注册视图,执行SQL
studentDF.createOrReplaceTempView("student")
spark.sql("select * from student").show
spark.stop()
}
}
//定义case class
case class Student(stuId:Int, stuName:String, stuAge:Int)
3、写入MySQL
package day4
import org.apache.log4j.Logger
import org.apache.log4j.Level
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import java.util.Properties
/**
* 写入mysql
*/
object Demo3 {
def main(args: Array[String]): Unit = {
//减少Info日志的打印
Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
//创建Spark Session对象
val spark = SparkSession.builder().master("local").appName("Demo1").getOrCreate()
//从指定的地址创建RDD对象
val lineRDD = spark.sparkContext.textFile("H:\\other\\students.txt").map(_.split("\t"))
//通过StructType方式指定Schema
val schema = StructType(
List(
StructField("personID", IntegerType),
StructField("personName", StringType),
StructField("personAge", IntegerType)))
//将RDD映射到rowRDD上,映射到Schema上
val rowRDD = lineRDD.map(p => Row(p(0).toInt,p(1),p(2).toInt))
val personDataFrame = spark.createDataFrame(rowRDD, schema)
personDataFrame.createOrReplaceTempView("myperson")
val result = spark.sql("select * from myperson")
result.show
//把结果存入mysql中
val props = new Properties()
props.setProperty("user", "root")
props.setProperty("password", "123456")
props.setProperty("driver", "com.mysql.cj.jdbc.Driver")
result.write.mode("append").jdbc("jdbc:mysql://localhost:3306/company?serverTimezone=UTC&characterEncoding=utf-8", "student", props)
spark.stop()
}
}
4、使用Spark SQL 读取Hive中的数据,将计算结果存入MySQL
package day4
import org.apache.spark.sql.SparkSession
import java.util.Properties
/**
* 使用Spark SQL 读取Hive中的数据,将计算结果存入mysql
*/
object Demo4 {
def main(args: Array[String]): Unit = {
//创建SparkSession
val spark = SparkSession.builder().appName("Demo4").enableHiveSupport().getOrCreate()
//执行SQL
val result = spark.sql("select deptno,count(1) from company.emp group by deptno")
//将结果保存到mysql中
val props = new Properties()
props.setProperty("user", "root")
props.setProperty("password", "123456")
props.setProperty("driver", "com.mysql.cj.jdbc.Driver")
result.write.jdbc("jdbc:mysql://192.168.116.1:3306/company?serverTimezone=UTC&characterEncoding=utf-8", "emp_stat", props)
spark.stop()
}
}
提交任务
[root@hsiehchou121 bin]# ./spark-submit --master spark://hsiehchou121:7077 --jars /root/hd/tmp_files/mysql-connector-java-8.0.12.jar --driver-class-path /root/hd/tmp_files/mysql-connector-java-8.0.12.jar --class day4.Demo4 /root/hd/tmp_files/Demo4.jar
四、性能优化
与RDD类似
1、把内存中缓存表的数据
直接读取内存的值,来提高性能
RDD中如何缓存:
rdd.cache 或者 rdd.persist
在Spark SQL中,使用SparkSession.sqlContext.cacheTable
spark中所有context对象
1)sparkContext : SparkCore
2)sql Context : SparkSQL
3)Streaming Context :SparkStreaming
统一起来:SparkSession
操作mysql,启动spark shell 时,需要:
./spark-shell --master
spark://hsiehchou121:7077 --jars
/root/hd/tmp_files/mysql-connector-java-8.0.12.jar --driver-class-path
/root/hd/tmp_files/mysql-connector-java-8.0.12.jar
val mysqlDF = spark.read.format(“jdbc”).option(“driver”,”com.mysql.cj.jdbc.Driver”).option(“url”,”jdbc:mysql://192.168.116.1:3306/company?serverTimezone=UTC&characterEncoding=utf-8”).option(“user”,”root”).option(“password”,”123456”).option(“dbtable”,”emp”).load
mysqlDF.show
mysqlDF.createOrReplaceTempView(“emp”)
spark.sqlContext.cacheTable(“emp”) —-> 标识这张表可以被缓存,数据还没有真正被缓存
spark.sql(“select * from emp”).show —-> 依然读取mysql
spark.sql(“select * from emp”).show —-> 从缓存中读取数据
spark.sqlContext.clearCache
清空缓存后,执行查询,会触发查询mysql数据库
2、了解性能优化的相关参数
将数据缓存到内存中的相关优化参数
spark.sql.inMemoryColumnarStorage.compressed
默认为 true
Spark SQL 将会基于统计信息自动地为每一列选择一种压缩编码方式
spark.sql.inMemoryColumnarStorage.batchSize
默认值:10000
缓存批处理大小。缓存数据时, 较大的批处理大小可以提高内存利用率和压缩率,但同时也会带来 OOM(Out Of Memory)的风险
其他性能相关的配置选项(不过不推荐手动修改,可能在后续版本自动的自适应修改)
spark.sql.files.maxPartitionBytes
默认值:128 MB
读取文件时单个分区可容纳的最大字节数
spark.sql.files.openCostInBytes
默认值:4M
打开文件的估算成本, 按照同一时间能够扫描的字节数来测量。当往一个分区写入多个文件的时候会使用。高估更好, 这样的话小文件分区将比大文件分区更快 (先被调度)
spark.sql.autoBroadcastJoinThreshold
默认值:10M
用于配置一个表在执行 join 操作时能够广播给所有 worker 节点的最大字节大小。通过将这个值设置为 -1 可以禁用广播。注意,当前数据统计仅支持已经运行了 ANALYZE TABLE COMPUTE STATISTICS noscan 命令的 Hive Metastore 表
spark.sql.shuffle.partitions
默认值:200
用于配置 join 或聚合操作混洗(shuffle)数据时使用的分区数