一、Flink概述
官网:http://flink.apache.org/
MapReduce->MaxCompute
HBase->部门
QuickBI
DataV
Hive->高德地图
Storm->JStorm
2019年1月 阿里正式开源Flink->Blink
Apache Flink是一个框架和分布式处理引擎,用于对无界和有界数据流进行有状态计算。Flink设计为在所有常见的集群环境中运行,以内存速度和任何规模执行计算。
大数据计算框架
1、简介
Flink核心是一个流式的数据流执行引擎,其针对数据流的分布式计算提供了数据分布、数据通信以及容错机制等功能。基于流执行引擎,Flink提供了诸多更高抽象层的API以便用户编写分布式任务:
DataSet API,对静态数据进行批处理操作,将静态数据抽象成分布式的数据集,用户可以方便地使用Flink提供的各种操作符对分布式数据集进行处理,支持Java、Scala和Python
DataStream API,对数据流进行流处理操作,将流式的数据抽象成分布式的数据流,用户可以方便地对分布式数据流进行各种操作,支持Java和Scala
Table API,对结构化数据进行查询操作,将结构化数据抽象成关系表,并通过类SQL的DSL对关系表进行各种查询操作,支持Java和Scala
此外,Flink还针对特定的应用领域提供了领域库,例如:
Flink ML,Flink的机器学习库,提供了机器学习Pipelines API并实现了多种机器学习算法
Gelly,Flink的图计算库,提供了图计算的相关API及多种图计算算法实现
2、统一的批处理与流处理系统
在大数据处理领域,批处理任务与流处理任务一般被认为是两种不同的任务,一个大数据项目一般会被设计为只能处理其中一种任务,例如Apache Storm、Apache Smaza只支持流处理任务,而Aapche MapReduce、Apache Tez、Apache Spark只支持批处理任务。Spark Streaming是Apache Spark之上支持流处理任务的子系统,看似一个特例,实则不然——Spark Streaming采用了一种micro-batch的架构,即把输入的数据流切分成细粒度的batch,并为每一个batch数据提交一个批处理的Spark任务,所以Spark Streaming本质上还是基于Spark批处理系统对流式数据进行处理,和Apache Storm、Apache Smaza等完全流式的数据处理方式完全不同。通过其灵活的执行引擎,Flink能够同时支持批处理任务与流处理任务
在执行引擎这一层,流处理系统与批处理系统最大不同在于节点间的数据传输方式
对于一个流处理系统,其节点间数据传输的标准模型是:当一条数据被处理完成后,序列化到缓存中,然后立刻通过网络传输到下一个节点,由下一个节点继续处理
而对于一个批处理系统,其节点间数据传输的标准模型是:当一条数据被处理完成后,序列化到缓存中,并不会立刻通过网络传输到下一个节点,当缓存写满,就持久化到本地硬盘上,当所有数据都被处理完成后,才开始将处理后的数据通过网络传输到下一个节点
这两种数据传输模式是两个极端,对应的是流处理系统对低延迟的要求和批处理系统对高吞吐量的要求。Flink的执行引擎采用了一种十分灵活的方式,同时支持了这两种数据传输模型。Flink以固定的缓存块为单位进行网络数据传输,用户可以通过缓存块超时值指定缓存块的传输时机。如果缓存块的超时值为0,则Flink的数据传输方式类似上文所提到流处理系统的标准模型,此时系统可以获得最低的处理延迟。如果缓存块的超时值为无限大,则Flink的数据传输方式类似上文所提到批处理系统的标准模型,此时系统可以获得最高的吞吐量
同时缓存块的超时值也可以设置为0到无限大之间的任意值
缓存块的超时阈值越小,则Flink流处理执行引擎的数据处理延迟越低,但吞吐量也会降低,
反之亦然。通过调整缓存块的超时阈值,用户可根据需求灵活地权衡系统延迟和吞吐量
3、架构
要了解一个系统,一般都是从架构开始。我们关心的问题是:系统部署成功后各个节点都启动了哪些服务,各个服务之间又是怎么交互和协调的。下方是 Flink 集群启动后架构图
当 Flink 集群启动后,首先会启动一个 JobManger 和一个或多个的 TaskManager。由 Client 提交任务给 JobManager,JobManager 再调度任务到各个 TaskManager 去执行,然后 TaskManager 将心跳和统计信息汇报给 JobManager。TaskManager 之间以流的形式进行数据的传输。上述三者均为独立的 JVM 进程
Client 为提交 Job 的客户端,可以是运行在任何机器上(与 JobManager 环境连通即可)。提交 Job 后,Client 可以结束进程(Streaming的任务),也可以不结束并等待结果返回
JobManager 主要负责调度 Job 并协调 Task 做 checkpoint,职责上很像 Storm 的 Nimbus。从 Client 处接收到 Job 和 JAR 包等资源后,会生成优化后的执行计划,并以 Task 的单元调度到各个 TaskManager 去执行
TaskManager 在启动的时候就设置好了槽位数(Slot),每个 slot 能启动一个 Task,Task 为线程。从 JobManager 处接收需要部署的 Task,部署启动后,与自己的上游建立 Netty 连接,接收数据并处理
可以看到 Flink 的任务调度是多线程模型,并且不同Job/Task混合在一个 TaskManager 进程中。虽然这种方式可以有效提高 CPU 利用率,但是个人不太喜欢这种设计,因为不仅缺乏资源隔离机制,同时也不方便调试。类似 Storm 的进程模型,一个JVM 中只跑该 Job 的 Tasks 实际应用中更为合理
Flink编程模型
二、Flink特点
1)mapreduce
2)storm
3)spark
适用于所有企业,不同企业有不同的业务场景。处理数据量,模型都不一样
Flink
1、随处部署应用
与其它组件集成!
flink是分布式系统,需要计算资源才可执行程序。flink可以与常见的集群资源管理器进行集成(H
adoop Yarn,Apache Mesos..)
可以单独作为独立集群运行
通过不同部署模式实现
这些模式允许flink以其惯有的方式进行交互
当我们部署flink应用程序时,Flink会根据应用程序配置的并行性自动识别所需资源。从资源管理
器中请求它们
如果发生故障,flink会请求新的资源来替换发生故障的容器
提交或控制程序都通过REST调用进行,简化Flink在许多环境的集成。孵化…
2、以任何比例应用程序(小集群、无限集群)
Flink旨在以任何规模运行有状态流应用程序。应用程序可以并行化在集群中分布和同时执行程
序
因此,我们的应用集群可以利用无限的cpu和磁盘与网络IO
Flink可以轻松的维护非常大的应用程序状态
用户可拓展性报告:
- 应用程序每天可以处理万亿个事件
- 应用程序每天可以维护多个TB的状态
- 应用程序可以在数千个内核运行
3、利用内存中的性能
有状态Flink应用程序针对于对本地状态访问进行了优化。任务状态始终的保留在内存中,或者如果
大小超过了可用内存,则保存在访问高效的磁盘数据结构中(SSD 机械/固态)
任务可以通过访问本地来执行所有计算。从来产生极小的延迟
Flink定期和异步检查本地状态持久存储来保持出现故障时一次状态的一致性
三、有界无界
1、无界
有开始,没有结束…
处理实时数据
2、有界
有开始,有结束…
处理批量数据
四、无界数据集应用场景(实时计算)
1)源源不断的日志数据
2)web应用,指标分析
3)移动设备终端(分析app状况)
4)应用在任何数据源不断产生的项目中
五、Flink运行模型
1)流计算
数据源源不断产生,我们的需求是源源不断的处理
程序需要一直保持在计算的状态
2)批处理
计算一段完整的数据集,计算成功后释放资源,那么此时工作结束
六、Flink的使用
1、处理结果准确
无论是有序数据还是延迟到达的数据
2、容错机制
有状态:保持每次的结果往下传递,实现累加。DAG(有向无环图)
3、有很强大的吞吐量和低延迟
计算速度快,吞吐量处理的量级大
4、精准的维护一次的应用状态
storm:会发生要么多计算一次,要么漏计算
5、支持大规模的计算
可以运行在数千台节点上
6、支持流处理和窗口化操作
7、版本化处理
8、检查点机制实现精准的一次性计算保证
checkpoint
9、支持yarn与mesos资源管理器
七、flink单节点安装部署
1)下载安装包
2)上传
3)解压
tar -zxvf .tar
4)启动
bin/start-cluster.sh
5)访问ui界面
http://192.168.116.201:8081
八、搭建Flink1.6.1分布式集群
1、Flink的下载
安装包下载地址:http://flink.apache.org/downloads.html ,选择对应Hadoop的Flink版本下载
[root@hsiehchou121 software]$ wget http://mirrors.tuna.tsinghua.edu.cn/apache/flink/flink-1.6.1/flink-1.6.1-bin-hadoop28-scala_2.11.tgz
[root@hsiehchou121 software]$ ll
-rw-rw-r– 1 root root 301867081 Sep 15 15:47 flink-1.6.1-bin-hadoop28-scala_2.11.tgz
Flink 有三种部署模式,分别是 Local、Standalone Cluster 和 Yarn Cluster
2、Local模式
对于 Local 模式来说,JobManager 和 TaskManager 会公用一个 JVM 来完成 Workload
如果要验证一个简单的应用,Local 模式是最方便的。实际应用中大多使用 Standalone 或者 Yarn Cluster,而local模式只是将安装包解压启动(./bin/start-cluster.sh)即可,在这里不在演示
3、Standalone 模式
快速入门教程地址:
https://ci.apache.org/projects/flink/flink-docs-release-1.6/quickstart/setup_quickstart.html
1) 软件要求
• Java 1.8.x或更高版本
• ssh(必须运行sshd才能使用管理远程组件的Flink脚本)
集群部署规划
节点名称 | master | worker | zookeeper |
---|---|---|---|
hsiehchou121 | master | zookeeper | |
hsiehchou122 | master | worker | zookeeper |
hsiehchou123 | worker | zookeeper |
2)解压
[root@hsiehchou121 software]$ tar -zxvf flink-1.6.1-bin-hadoop27-scala_2.11.tgz -C /opt/module/
[root@hsiehchou121 software]$ cd /opt/module/
[root@hsiehchou121 module]$ ll
drwxr-xr-x 8 root root 125 Sep 15 04:47 flink-1.6.1
3)修改配置文件
[root@hsiehchou121 conf]$ ls
flink-conf.yaml log4j-console.properties log4j-yarn-session.properties logback.xml masters sql-client-defaults.yaml
log4j-cli.properties log4j.properties logback-console.xml logback-yarn.xml slaves zoo.cfg
修改flink/conf/masters,slaves,flink-conf.yaml
[root@hsiehchou121 conf]$ sudo vi masters
hsiehchou121:8081
[root@hsiehchou121 conf]$ sudo vi slaves
hsiehchou122
hsiehchou123
[root@hsiehchou121 conf]$ sudo vi flink-conf.yaml
taskmanager.numberOfTaskSlots:2 //52行 和storm slot类似
jobmanager.rpc.address: hsiehchou121 //33行
可选配置:
• 每个JobManager(jobmanager.heap.mb)的可用内存量
• 每个TaskManager(taskmanager.heap.mb)的可用内存量
• 每台机器的可用CPU数量(taskmanager.numberOfTaskSlots)
• 集群中的CPU总数(parallelism.default)和
• 临时目录(taskmanager.tmp.dirs)
4)拷贝安装包到各节点
[root@hsiehchou121 module]$ scp -r flink-1.6.1/ root@hsiehchou122:pwd
[root@hsiehchou121 module]$ scp -r flink-1.6.1/ root@hsiehchou123:pwd
5) 配置环境变量
配置所有节点Flink的环境变量
[root@hsiehchou121 flink-1.6.1]$ sudo vi /etc/profile
export FLINK_HOME=/opt/module/flink-1.6.1
export PATH=$PATH:$
FLINK_HOME/bin
[root@hsiehchou121 flink-1.6.1]$ source /etc/profile
6)启动Flink
[root@hsiehchou121 flink-1.6.1]$ ./bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host hsiehchou121.
Starting taskexecutor daemon on host hsiehchou122.
Starting taskexecutor daemon on host hsiehchou123.
jps查看进程
hsiehchou121
2122 StandaloneSessionClusterEntrypoint
2172 Jps
hsiehchou122
1616 TaskManagerRunner
1658 Jps
hsiehchou123
1587 TaskManagerRunner
1627 Jps
7) WebUI查看
8)Flink的HA
首先,我们需要知道 Flink 有两种部署的模式,分别是 Standalone 以及 Yarn Cluster 模式。对于 Standalone 来说,Flink 必须依赖于 Zookeeper 来实现 JobManager 的 HA(Zookeeper 已经成为了大部分开源框架 HA 必不可少的模块)。在 Zookeeper 的帮助下,一个 Standalone 的 Flink 集群会同时有多个活着的 JobManager,其中只有一个处于工作状态,其他处于 Standby 状态。当工作中的 JobManager 失去连接后(如宕机或 Crash),Zookeeper 会从 Standby 中选举新的 JobManager 来接管 Flink 集群
对于 Yarn Cluaster 模式来说,Flink 就要依靠 Yarn 本身来对 JobManager 做 HA 了。其实这里完全是 Yarn 的机制。对于 Yarn Cluster 模式来说,JobManager 和 TaskManager 都是被 Yarn 启动在 Yarn 的 Container 中。此时的 JobManager,其实应该称之为 Flink Application Master。也就说它的故障恢复,就完全依靠着 Yarn 中的 ResourceManager(和 MapReduce 的 AppMaster 一样)。由于完全依赖了 Yarn,因此不同版本的 Yarn 可能会有细微的差异。这里不再做深究
(1) 修改配置文件
修改flink-conf.yaml,HA模式下,jobmanager不需要指定,在master file中配置,由zookeeper选出leader与standby。
jobmanager.rpc.address: hsiehchou121
high-availability:zookeeper //73行
指定高可用模式(必须) //88行
high-availability.zookeeper.quorum:hsiehchou121:2181,hsiehchou122:2181,hsiehchou123:2181
ZooKeeper仲裁是ZooKeeper服务器的复制组,它提供分布式协调服务(必须) //82行
high-availability.storageDir:hdfs:///flink/ha/
JobManager元数据保存在文件系统storageDir中,只有指向此状态的指针存储在ZooKeeper中(必须) //没有
high-availability.zookeeper.path.root:/flink
根ZooKeeper节点,在该节点下放置所有集群节点(推荐) //没有
high-availability.cluster-id:/flinkCluster
&&&&&自定义集群(推荐)
state.backend: filesystem
state.checkpoints.dir: hdfs:///flink/checkpoints
state.savepoints.dir: hdfs:///flink/checkpoints
修改conf/zoo.cfg
server.1=hsiehchou121:2888:3888
server.2=hsiehchou122:2888:3888
server.3=hsiehchou123:2888:3888
修改conf/masters
hsiehchou121:8081
hsiehchou122:8081
修改slaves
hsiehchou122
hsiehchou123
同步配置文件conf到各节点
(2) 启动HA
先启动zookeeper集群各节点(测试环境中也可以用Flink自带的start-zookeeper-quorum.sh),启动dfs ,再启动flink
[root@hsiehchou121 flink-1.6.1]$ start-cluster.sh
WebUI查看,这是会自动产生一个主Master,如下
(3) 验证HA
手动杀死hsiehchou122上的master,此时,hsiehchou121上的备用master转为主mater
(4)手动将JobManager / TaskManager实例添加到群集
您可以使用bin/jobmanager.sh和bin/taskmanager.sh脚本将JobManager和TaskManager实例添加到正在运行的集群中
添加JobManager
bin/jobmanager.sh ((start|start-foreground) [host] [webui-port]
)|stop|stop-all
添加TaskManager
bin/taskmanager.sh start|start-foreground|stop|stop-all
[root@hsiehchou122 flink-1.6.1]$ jobmanager.sh start hsiehchou122
新添加的为从master
9)运行测试任务
[root@hsiehchou121 flink-1.6.1]$ flink run -m hsiehchou121:8081 ./examples/batch/WordCount.jar –input /opt/wcinput/wc.txt –output /opt/wcoutput/
[root@hsiehchou121 flink-1.6.1]$ bin/flink run -m hsiehchou121:8081 ./examples/batch/WordCount.jar –input hdfs:///emp.csv –output hdfs:///user/root/output2
4、Yarn Cluster模式
1)引入
在一个企业中,为了最大化的利用集群资源,一般都会在一个集群中同时运行多种类型的 Workload。因此 Flink 也支持在 Yarn 上面运行。首先,让我们了解下 Yarn 和 Flink 的关系
在图中可以看出,Flink 与 Yarn 的关系与 MapReduce 和 Yarn 的关系是一样的。Flink 通过 Yarn 的接口实现了自己的 App Master。当在 Yarn 中部署了 Flink,Yarn 就会用自己的 Container 来启动 Flink 的 JobManager(也就是 App Master)和 TaskManager
启动新的Flink YARN会话时,客户端首先检查所请求的资源(容器和内存)是否可用。之后,它将包含Flink和配置的jar上传到HDFS(步骤1)
客户端的下一步是请求(步骤2)YARN容器以启动ApplicationMaster(步骤3)。由于客户端将配置和jar文件注册为容器的资源,因此在该特定机器上运行的YARN的NodeManager将负责准备容器(例如,下载文件)。完成后,将启动ApplicationMaster(AM)
该JobManager和AM在同一容器中运行。一旦它们成功启动,AM就知道JobManager(它自己的主机)的地址。它正在为TaskManagers生成一个新的Flink配置文件(以便它们可以连接到JobManager)。该文件也上传到HDFS。此外,AM容器还提供Flink的Web界面。YARN代码分配的所有端口都是临时端口。这允许用户并行执行多个Flink YARN会话
之后,AM开始为Flink的TaskManagers分配容器,这将从HDFS下载jar文件和修改后的配置。完成这些步骤后,即可建立Flink并准备接受作业
2)修改环境变量
export HADOOP_CONF_DIR= /opt/module/hadoop-2.8.4/etc/hadoop
3)部署启动
[root@hsiehchou121 flink-1.6.1]$ yarn-session.sh -d -s 1 -tm 800 -n 2
-n : TaskManager的数量,相当于executor的数量
-s : 每个JobManager的core的数量,executor-cores。建议将slot的数量设置每台机器的处理器数量
-tm : 每个TaskManager的内存大小,executor-memory
-jm : JobManager的内存大小,driver-memory
上面的命令的意思是,同时向Yarn申请3个container,其中 2 个 Container 启动 TaskManager(-n 2),每个 TaskManager 拥有两个 Task Slot(-s 2),并且向每个 TaskManager 的 Container 申请 800M 的内存,以及一个ApplicationMaster(Job Manager)
Flink部署到Yarn Cluster后,会显示Job Manager的连接细节信息
Flink on Yarn会覆盖下面几个参数,如果不希望改变配置文件中的参数,可以动态的通过-D选项指定,如
-Dfs.overwrite-files=true -Dtaskmanager.network.numberOfBuffers=16368
jobmanager.rpc.address:因为JobManager会经常分配到不同的机器上
taskmanager.tmp.dirs:使用Yarn提供的tmp目录
parallelism.default:如果有指定slot个数的情况下
yarn-session.sh会挂起进程,所以可以通过在终端使用CTRL+C或输入stop停止yarn-session
如果不希望Flink Yarn client长期运行,Flink提供了一种detached YARN session,启动时候加上参数-d或—detached
在上面的命令成功后,我们就可以在 Yarn Application 页面看到 Flink 的纪录
如果在虚拟机中测试,可能会遇到错误。这里需要注意内存的大小,Flink 向 Yarn 会申请多个 Container,但是 Yarn 的配置可能限制了 Container 所能申请的内存大小,甚至 Yarn 本身所管理的内存就很小。这样很可能无法正常启动 TaskManager,尤其当指定多个 TaskManager 的时候。因此,在启动 Flink 之后,需要去 Flink 的页面中检查下 Flink 的状态。这里可以从 RM 的页面中,直接跳转(点击 Tracking UI)
yarn-session.sh启动命令参数如下:
[root@hsiehchou121 flink-1.6.1]$ yarn-session.sh –help
Usage:
Required
-n,–container <arg>
Number of YARN container to allocate (=Number of Task Managers)
Optional
-D <property=value> use value for given property
-d,–detached If present, runs the job in detached mode
-h,–help Help for the Yarn session CLI.
-id,–applicationId <arg>
Attach to running YARN session
-j,–jar <arg>
Path to Flink jar file
-jm,–jobManagerMemory <arg>
Memory for JobManager Container with optional unit (default: MB)
-m,–jobmanager <arg>
Address of the JobManager (master) to which to connect. Use this flag to connect to a different JobManager than the one specified i
n the configuration. -n,–container <arg>
Number of YARN container to allocate (=Number of Task Managers)
-nl,–nodeLabel <arg>
Specify YARN node label for the YARN application
-nm,–name <arg>
Set a custom name for the application on YARN
-q,–query Display available YARN resources (memory, cores)
-qu,–queue <arg>
Specify YARN queue.
-s,–slots <arg>
Number of slots per TaskManager
-st,–streaming Start Flink in streaming mode
-t,–ship <arg>
Ship files in the specified directory (t for transfer)
-tm,–taskManagerMemory <arg>
Memory per TaskManager Container with optional unit (default: MB)
-yd,–yarndetached If present, runs the job in detached mode (deprecated; use non-YARN specific option instead)
-z,–zookeeperNamespace Namespace to create the Zookeeper sub-paths for high availability mode
4)提交任务
之后,我们可以通过这种方式提交我们的任务
[root@hsiehchou121 flink-1.6.1]$ ./bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar –input /opt/wcinput/wc.txt –output /opt/wcoutput/
bin/flink run -m yarn-cluster -yn2 examples/batch/WordCount.jar –input /input/ –output /XZ
以上命令在参数前加上y前缀,-yn表示TaskManager个数
在这个模式下,同样可以使用-m yarn-cluster提交一个”运行后即焚”的detached yarn(-yd)作业到yarn cluster
5)停止yarn cluster
yarn application -kill application_1539058959130_0001
6) Yarn模式的HA
应用最大尝试次数(yarn-site.xml),您必须配置为尝试应用的最大数量的设置yarn-site.xml,当前YARN版本的默认值为2(表示允许单个JobManager失败)<property>
<name>
yarn.resourcemanager.am.max-attempts</name>
<value>
4</value>
<description>
The maximum number of application master execution attempts</description>
</property>
申请尝试(flink-conf.yaml),您还必须配置最大尝试次数conf/flink-conf.yaml: yarn.application-attempts:10
示例:高度可用的YARN会话
配置HA模式和zookeeper法定人数在conf/flink-conf.yaml:
high-availability: zookeeper
high-availability.zookeeper.quorum: hsiehchou121:2181,hsiehchou122:2181,hsiehchou123:2181
high-availability.storageDir: hdfs:///flink/recovery
high-availability.zookeeper.path.root: /flink
yarn.application-attempts: 10
配置ZooKeeper的服务器中conf/zoo.cfg(目前它只是可以运行每台机器的单一的ZooKeeper服务器):
server.1=hsiehchou121:2888:3888
server.2=hsiehchou122:2888:3888
server.3=hsiehchou123:2888:3888
启动ZooKeeper仲裁:
$ bin / start-zookeeper-quorum.sh
启动HA群集:
$ bin / yarn-session.sh -n 2