流式计算专题
批量计算、实时计算、离线计算、流式计算
共同点:
数据源 –> 采集数据 –> task worker –> task worker –> sink 输出
批量计算和流式计算
区别:
处理数据粒度不一样
批量计算每次处理一定大小的数据块。流式计算,每次处理一条记录
流式计算可以提供类似批量计算的功能,为什么我们还要批量计算系统?
1、流式系统的吞吐量不如批量系统
2、流式系统无法提供精准的计算
- 任务类型不一样
- 流式计算会一直运行
- 数据源的区别
对于批量计算而言,数据是有限数据
而对于流式计算,是无限数据
一、Storm—-是最早流式计算框架
1、Storm概述
1)什么是Storm
网址:http://storm.apache.org/
Apache Storm是一个免费的开源分布式实时计算系统。Storm可以轻松可靠地处理无限数据流,实现Hadoop对批处理所做的实时处理。Storm非常简单,可以与任何编程语言一起使用,并且使用起来很有趣!
Storm为分布式实时计算提供了一组通用原语,可被用于“流处理”之中,实时处理消息并更新数据库。这是管理队列及工作者集群的另一种方式。 Storm也可被用于“连续计算”(continuous computation),对数据流做连续查询,在计算时就将结果以流的形式输出给用户。它还可被用于“分布式RPC”,以并行的方式运行昂贵的运算
Storm可以方便地在一个计算机集群中编写与扩展复杂的实时计算,Storm用于实时处理,就好比 Hadoop 用于批处理。Storm保证每个消息都会得到处理,而且它很快——在一个小集群中,每秒可以处理数以百万计的消息。更棒的是你可以使用任意编程语言来做开发
Storm有许多用例:实时分析,在线机器学习,连续计算,分布式RPC,ETL等。风暴很快:一个基准测试表示每个节点每秒处理超过一百万个元组。它具有可扩展性,容错性,可确保您的数据得到处理,并且易于设置和操作
Storm集成了您已经使用的排队和数据库技术。Storm拓扑消耗数据流并以任意复杂的方式处理这些流,然后在计算的每个阶段之间重新划分流
2、离线计算和流式计算
① 离线计算
- 离线计算:批量获取数据、批量传输数据、周期性批量计算数据、数据展示
- 代表技术:Sqoop批量导入数据、HDFS批量存储数据、MapReduce批量计算、Hive、Flume批量获取数据、Sqoop批量传输、HDFS/Hive/HBase批量存储、MR/Hive计算数据、BI
② 流式计算
- 流式计算:数据实时产生、数据实时传输、数据实时计算、实时展示
- 代表技术:Flume实时获取数据、Kafka/metaq实时数据存储、Storm/JStorm实时数据计算、Redis实时结果缓存、持久化存储(mysql)、阿里实时展示(DataV/QuickBI)
一句话总结:将源源不断产生的数据实时收集并实时计算,尽可能快的得到计算结果
③ Storm与Hadoop的区别
Storm用于实时计算 | Hadoop用于离线计算 |
---|---|
Storm处理的数据保存在内存中,源源不断中,一批一批 | Hadoop处理的数据保存在文件系统 |
Storm的数据通过网络传输进来 | Hadoop的数据保存在磁盘中 |
Storm与Hadoop的编程模型相似 |
Storm与Hadoop
角色
hadoop | storm |
---|---|
JobTracker | Nimbus |
TaskTracker | Supervisor |
Child | Worker |
应用名称
hadoop | storm |
---|---|
Job | Topology |
编程接口
hadoop | storm |
---|---|
Mapper/Reducer | Spout/Bolt |
3、Storm的体系结构
Nimbus:负责资源分配和任务调度
Supervisor:负责接受Nimbus分配的任务,启动和停止属于自己管理的worker进程。通过配置文件设置当前Supervisor上启动多少个Worker
Worker:运行具体处理组件逻辑的进程。Worker运行的任务类型只有两种,一种是Spout任务,一种是Bolt任务
Executor:Storm 0.8之后,Executor为Worker进程中的具体的物理线程,同一个Spout/Bolt的Task可能会共享一个物理线程,一个Executor中只能运行隶属于同一个Spout/Bolt的Task
Task:Worker中每一个Spout/Bolt的线程称为一个Task. 在Storm0.8之后,Task不再与物理线程对应,不同Spout/Bolt的Task可能会共享一个物理线程,该线程称为Executor
4、Storm编程模型
tuple:元组
是消息传输的基本单元
Spout:水龙头
Storm的核心抽象。拓扑的流的来源。Spout通常从外部数据源读取数据。转换为t敺内部的源数据。
主要方法:
nextTuple() -》 发出一个新的元组到拓扑
ack()
fail()
Bolt:转接头
Bolt是对流的处理节点。Bolt作用:过滤、业务、连接运算
Topology:拓扑
是一个实时的应用程序
永远运行除非被杀死
Spout到Bolt是一个连接流
5、Storm的运行机制
- 整个处理流程的组织协调不用用户去关心,用户只需要去定义每一个步骤中的具体业务处理逻辑
- 具体执行任务的角色是Worker,Worker执行任务时具体的行为则有我们定义的业务逻辑决定
6、Storm的集群安装配置
(1)Storm集群安装部署
1)准备工作
hsiehchou121 | hsiehchou122 | hsiehchou123 |
---|---|---|
storm01 | storm02 | storm03 |
2)下载安装包
http://storm.apache.org/downloads.html
3)上传
4)解压
tar -zxvf apache-storm-1.1.0.tar.gz
mv apache-storm-1.1.0 storm
5)设置环境变量
#STORM_HOME
export STORM_HOME=/root/hd/storm
export PATH=$STORM_HOME/bin:$PATH
source /etc/profile
6)修改配置文件
$ vi storm.yaml
#设置Zookeeper的主机名称
storm.zookeeper.servers:
- "hsiehchou121"
- "hsiehchou122"
- "hsiehchou123"
#设置主节点的主机名称
nimbus.seeds: ["hsiehchou121"]
#设置Storm的数据存储路径
storm.local.dir: "/root/hd/storm/data"
#设置Worker的端口号
supervisor.slots.ports:
- 6700
- 6701
- 6702
- 6703
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
注意:如果要搭建Storm的HA,只需要在nimbus.seeds中设置多个nimbus即可
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
mkdir data
7)分发到其他机器
[root@hsiehchou121 hd]# scp -r storm/ hsiehchou122:$PWD
[root@hsiehchou121 hd]# scp -r storm/ hsiehchou123:$PWD
7、启动和查看Storm
0)启动ZooKeeper
zkServer.sh start
1)在nimbus.host所属的机器上启动 nimbus服务和logviewer服务
启动nimbus
- storm nimbus &
*启动logviewer *
- storm logviewer &
2)在nimbus.host所属的机器上启动ui服务
启动ui界面
- storm ui &
3)在其它个节点上启动supervisor服务和logviewer服务
启动supervisor
- storm supervisor &
启动logviewer
- storm logviewer &
4)查看Storm集群:访问nimbus.host:/8080,即可看到storm的ui界面
http://hsiehchou121:8080/index.html
8、Storm的常用命令
有许多简单且有用的命令可以用来管理拓扑,它们可以提交、杀死、禁用、再平衡拓扑
1)查看命令帮助
storm help
2)查看版本
storm version
3)查看当前正在运行拓扑及其状态
storm list
4)提交任务命令格式
storm jar 【jar路径】 【拓扑包名.拓扑类名】 【拓扑名称】
storm jar [/路径/.jar][全类名]
[拓扑名称]
5)杀死任务命令格式
storm kill 【拓扑名称】 -w 10
(执行kill命令时可以通过-w [等待秒数]指定拓扑停用以后的等待时间)
storm kill topology-name -w 10
6)停用任务命令格式
storm deactivte 【拓扑名称】
storm deactivate topology-name
7)启用任务命令格式
storm activate【拓扑名称】
storm activate topology-name
8)重新部署任务命令格式
storm rebalance 【拓扑名称】
storm rebalance topology-name
再平衡使你重分配集群任务。这是个很强大的命令。比如,你向一个运行中的集群增加了节点。再平衡命令将会停用拓扑,然后在相应超时时间之后重分配工人,并重启拓扑
二、Storm编程案例
1、WordCount及流程分析
通过查看Storm UI上每个组件的events链接,可以查看Storm的每个组件(spout、blot)发送的消息。但Storm的event logger的功能默认是禁用的,需要在配置文件中设置:topology.eventlogger.executors: 1,具体说明如下:
- “topology.eventlogger.executors”: 0 默认,禁用
- “topology.eventlogger.executors”: 1 一个topology分配一个Event Logger
- “topology.eventlogger.executors”: nil 每个worker.分配一个Event Logger
WordCount的数据流程分析
2、Storm编程案例:WordCount
流式计算一般架构图:
- Flume用来获取数据
- Kafka用来临时保存数据
- Strom用来计算数据
- Redis是个内存数据库,用来保存数据
代码编写:
- 创建Spout(WordCountSpout)组件采集数据,作为整个Topology的数据源
WordCountSpout类
package com.hsiehchou.wc;
import java.util.Map;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
/**
* 需求:单词计数 Hello ni hao! Hello China!
*
* 实现接口:IRichSpout IRichBolt
* 继承抽象类:BaseRichSpout BaseRichBolt
* @author hsiehchou
*/
public class WordCountSpout extends BaseRichSpout {
//定义收集器
private SpoutOutputCollector collector;
//发送数据
@Override
public void nextTuple() {
//1.发送数据
collector.emit(new Values("I am a boy!"));
//2.设置延迟
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//创建收集器
@Override
public void open(Map arg0, TopologyContext arg1, SpoutOutputCollector collector) {
this.collector = collector;
}
//声明
@Override
public void declareOutputFields(OutputFieldsDeclarer declare) {
//起别名
declare.declare(new Fields("hsiehchou"));
}
}
- 创建Bolt(WordCountSplitBolt)组件进行分词操作
WordCountSplitBolt类
package com.hsiehchou.wc;
import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
public class WordCountSplitBolt extends BaseRichBolt {
//数据继续发送到下一个bolt
private OutputCollector collector;
//业务逻辑
@Override
public void execute(Tuple in) {
//1.获取数据
String line = in.getStringByField("hsiehchou");
//2.切分数据
String[] fields = line.split(" ");
//3.<单词,1>发送出去,下一个bolt(累加求和)
for(String w:fields) {
collector.emit(new Values(w,1));
}
}
//初始化
@Override
public void prepare(Map arg0, TopologyContext arg1, OutputCollector collector) {
this.collector = collector;
}
//声明描述
@Override
public void declareOutputFields(OutputFieldsDeclarer declare) {
declare.declare(new Fields("word","sum"));
}
}
- 创建Bolt(WordCountBoltCount)组件进行单词计数作
WordCountBoltCount类
package com.hsiehchou.wc;
import java.util.HashMap;
import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
public class WordCountBoltCount extends BaseRichBolt {
private Map<String, Integer> map = new HashMap<String, Integer>();
//累加求和
@Override
public void execute(Tuple in) {
//1.获取数据
String word = in.getStringByField("word");
Integer sum = in.getIntegerByField("sum");
//2.业务处理
if(map.containsKey(word)) {
//之前出现的次数
Integer count = map.get(word);
//已有的
map.put(word, count + sum);
}else {
map.put(word, sum);
}
//3.打印控制台
System.err.println(Thread.currentThread().getId() + "单位为:" + word + "\t 当前出现次数为:" + map.get(word));
}
@Override
public void prepare(Map arg0, TopologyContext arg1, OutputCollector arg2) {
}
@Override
public void declareOutputFields(OutputFieldsDeclarer arg0) {
}
}
- 也可以将主程序Topology(WordCountTopology)提交到Storm集群运行
WordCountTopology类
package com.hsiehchou.wc;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
public class WordCountTopology {
public static void main(String[] args) {
//1.hadoop --> Job Storm --> Topology 创建拓扑
TopologyBuilder builder = new TopologyBuilder();
//2.指定设置
builder.setSpout("WordCountSpout", new WordCountSpout(), 1);
builder.setBolt("WordCountSplitBolt", new WordCountSplitBolt(), 4).fieldsGrouping("WordCountSpout", new Fields("hsiehchou"));
builder.setBolt("WordCountBoltCount", new WordCountBolt(), 2).fieldsGrouping("WordCountSplitBolt", new Fields("word"));
//3.创建配置信息
Config conf = new Config();
//4.提交任务
LocalCluster localCluster = new LocalCluster();
localCluster.submitTopology("wordcounttopology", conf, builder.createTopology());
}
}
3、集群部署
对WordCountDriver类进行修改,把本地模式修改为集群模式
public class WordCountDriver {
//集群模式运行
try {
StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
} catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
e.printStackTrace();
}
//4.提交任务
//LocalCluster localCluster = new LocalCluster();
//localCluster.submitTopology("wordcounttopology", conf, builder.createTopology());
}
提交到集群
storm jar StormWordCount.jar com.hsiehchou.wc.WordCountDriver wordcount01
三、分组策略
1)fields Grouping
按照字段分组
相同字段发送到一个task中
fieldsGrouping
builder.setBolt(“WordCountSplitBolt”, new WordCountSplitBolt(), 4).fieldsGrouping(“WordCountSpout”, new Fields(“hsiehchou”));
builder.setBolt(“WordCountBolt”, new WordCountBolt(), 2).fieldsGrouping(“WordCountSplitBolt”, new Fields(“word”));
2)shuffle Grouping
随机分组
轮询。平均分配。随机分发tuple,保证每个bolt中的tuple数量相同
shuffleGrouping
builder.setBolt(“WordCountSplitBolt”, new WordCountSplitBolt(), 4).shuffleGrouping(“WordCountSpout”);
builder.setBolt(“WordCountBolt”, new WordCountBolt(), 2).shuffleGrouping(“WordCountSplitBolt”);
3)None Grouping
不分组
采用这种策略每个bolt中接收的单词不同
noneGrouping
builder.setBolt(“WordCountSplitBolt”, new WordCountSplitBolt(), 4).noneGrouping(“WordCountSpout”);
builder.setBolt(“WordCountBolt”, new WordCountBolt(), 2).noneGrouping(“WordCountSplitBolt”);
4)All Grouping
广播发送
tuple分发给每一个bolt
allGrouping
builder.setBolt(“WordCountSplitBolt”, new WordCountSplitBolt(), 4).allGrouping(“WordCountSpout”);
builder.setBolt(“WordCountBolt”, new WordCountBolt(), 2).allGrouping(“WordCountSplitBolt”);
5)Global Grouping
全局分组
分配给task id值最小的
根据线程id判断,只分配给线程id最小的
builder.setBolt(“WordCountSplitBolt”, new WordCountSplitBolt(), 4).globalGrouping(“WordCountSpout”);
builder.setBolt(“WordCountBolt”, new WordCountBolt(), 2).globalGrouping(“WordCountSplitBolt”);