Spark MLlib和Spark GraphX


Spark MLlib

MLlib 是 Spark 可以扩展的机器学习库

MLlib is Apache Spark’s scalable machine learning library.

一、MLlib概述

MLlib 是 Spark 可以扩展的机器学习库

Spark在机器学习方面具有得天独厚的有事,有以下几个原因:

1、机器学习算法

一般都有多个步骤迭代计算,需要在多次迭代后,获得足够小的误差或者收敛才会停止

double wucha = 1.0
while(wucha>=0.00001){
    建模  wucha -= 某个值
}

模型计算完毕

当迭代使用Hadoop的MapReduce计算框架时,每次都要读写硬盘以及任务启动工作,导致很大的IO开销

而Spark基于内存的计算模型天生擅长迭代计算。只有在必要时,才会读写硬盘

所以Spark是机器学习比较理想的平台

2、通信

Hadoop的MapReduce计算框架,通过heartbeat方式来进行通信和传递数据,执行速度慢

spark 有高效的 Akka 和 Netty 的通信系统,通行效率高

Spark MLlib 是Spark 对常用的机器学习算法的实现库,同时包括相关测试和数据生成器

二、什么是机器学习

1、机器学习的定义

A computer program is said to learn from experience E with respect to some class of tasks T and performance measure P,
if its performance at tasks in T, as measured by P, improves with experience E

三个关键词:算法、经验、模型评价

在数据的基础上,通过算法构建出模型,并进行评价
如果达到要求,则用该模型测试其他数据
如果不达到要求,要调整算法来重新建立模型,再次进行评估
循环往复,知道获得满意的经验

应用:金融反欺诈、语音识别、自然语言处理、翻译、模式识别、智能控制等等

2、基于大数据的机器学习

传统的机器学习算法,由于技术和单机存储的现值,只能在少量数据上使用
即,依赖于数据抽样
问题:很难做好随机,导致学习的模型不准确

在大数据上进行机器学习,直接处理全量数据并进行大量迭代计算

Spark本身计算优势,适合机器学习

另外 spark-shell pyspark 都可以提供及时查询工具

3、MLlib

MLlib是Spark机器学习库,简化机器学习的工程实践工作,方便扩展到更大规模
集成了通用的学习算法:分类、回归、聚类、协同过滤、降维等等

另外,MLlib本身在Spark中,数据清洗、SQL、建模放在一起

sample_linear_regression_data.txt
1 1:1.9
2 1:3.1
3 1:4
3.5 1:4.45
4 1:5.02
9 1:9.97
-2 1:-0.98

package day7

import org.apache.spark.sql.SparkSession
import org.apache.spark.ml.regression.LinearRegression

/*
 * 1.3850645873427236 1:0.14476184437006356 2:-0.11280617018445871 3:-0.4385084538142101 4:-0.5961619435136434 5:0.419554626795412 6:-0.5047767472761191 7:0.457180284958592 8:-0.9129360314541999 9:-0.6320022059786656 10:-0.44989608519659363
 * 
 */
object Demo1 {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("Demo1").master("local").getOrCreate()

    val data_path = "H:\\sample_linear_regression_data.txt"

    //读取训练数据
    val trainning = spark.read.format("libsvm").load(data_path)

    //定义模型
    val lr = new LinearRegression().setMaxIter(10000)

    //训练模型
    val lrModel = lr.fit(trainning)

    //获取模型训练结果
    val trainningSummary = lrModel.summary

    //获取预测值
    trainningSummary.predictions.show()

    //获取误差
    print(trainningSummary.rootMeanSquaredError)

    spark.stop()
  }
}

Spark Graphx

一、Spark Graphx 是什么?

1、是Spark 的一个模块,主要用于进行以图为核心的计算,还有分布式图计算

2、Graphx 底层基于RDD计算,和RDD共用一种存储形态。在展示形态上,可以用数据集来表示,也可以用图来表示

二、Spark GraphX 有哪些抽象?

1、顶点

RDD[(VertexId,VD)]表示
VertexId 代表了顶点的ID,是Long类型
VD 是顶点的属性,可以是任何类型

2、边

RDD[Edge[ED]]表示
Edge表示一个边
包含一个ED类型参数来设定属性
另外,边还包含了源顶点ID和目标顶点ID

3、三元组

三元组结构用RDD[EdgeTriplet[VD,ED]]表示
三元组包含一个边、边的属性、源顶点ID、源顶点属性、目标顶点ID、目标顶点属性

4、图

Graph表示,通过顶点和边来构建

package day7

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.graphx.Edge
import org.apache.spark.graphx.Graph

object Demo2 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Demo2").setMaster("local")

    //创建Spark Context对象
    val sc = new SparkContext(conf)

    //定义点
    val users = sc.parallelize(Array((3L,("TIme","student")),(5L,("Andy","student")),
        (7L,("Mary","student")),(2L,("Lily","post"))))

    //定义边
    val relationship = sc.parallelize(Array(Edge(3L,7L,"col"),Edge(5L,3L,"ad"),Edge(2L,5L,"col"),Edge(5L,7L,"heh"))) 

    //构建图
    val graph = Graph(users, relationship)

    //图的操作
    val post_count = graph.vertices.filter{ case (id,(name,pos)) => pos=="post"}.count

    println("post count is " + post_count)

    val edges_count = graph.edges.filter(e => e.srcId > e.dstId).count()

    println("the value is " + edges_count)
  }
}

文章作者: 谢舟
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 谢舟 !
 上一篇
内存数据库专题(MemCached和Redis) 内存数据库专题(MemCached和Redis)
内存数据库专题为什么要把数据存入内存?快 常见的内存数据库:MemCached:看成Redis前身,严格来说,MemCached不能叫数据库,只能叫缓存不支持持久化。如果内存停电,数据丢失 Redis:内存数据库,支持持久化,支持HA Or
2019-04-18
下一篇 
Spark 调优 Spark 调优
Spark 调优 问题:只要会用就可以,为什么还要精通内核源码与调优?Spark 性能优化概览:Spark的计算本质是,分布式计算所以,Spark程序的性能可能因为集群中的任何因素出现瓶颈:CPU、网络带宽、或者内存 CPU、网络带
2019-04-07
  目录