Storm基础


流式计算专题
批量计算、实时计算、离线计算、流式计算

共同点:
数据源 –> 采集数据 –> task worker –> task worker –> sink 输出

批量计算和流式计算
区别:
处理数据粒度不一样

批量计算每次处理一定大小的数据块。流式计算,每次处理一条记录

流式计算可以提供类似批量计算的功能,为什么我们还要批量计算系统?

1、流式系统的吞吐量不如批量系统

2、流式系统无法提供精准的计算

  1. 任务类型不一样
  2. 流式计算会一直运行
  3. 数据源的区别
    对于批量计算而言,数据是有限数据
    而对于流式计算,是无限数据

一、Storm—-是最早流式计算框架

1、Storm概述

1)什么是Storm
网址:http://storm.apache.org/
Apache Storm是一个免费的开源分布式实时计算系统。Storm可以轻松可靠地处理无限数据流实现Hadoop对批处理所做的实时处理。Storm非常简单,可以与任何编程语言一起使用,并且使用起来很有趣!

Storm为分布式实时计算提供了一组通用原语,可被用于“流处理”之中,实时处理消息并更新数据库。这是管理队列及工作者集群的另一种方式。 Storm也可被用于“连续计算”(continuous computation),对数据流做连续查询,在计算时就将结果以流的形式输出给用户。它还可被用于“分布式RPC”,以并行的方式运行昂贵的运算

Storm可以方便地在一个计算机集群中编写与扩展复杂实时计算,Storm用于实时处理,就好比 Hadoop 用于批处理。Storm保证每个消息都会得到处理,而且它很快——在一个小集群中每秒可以处理数以百万计的消息。更棒的是你可以使用任意编程语言来开发

Storm有许多用例:实时分析在线机器学习连续计算分布式RPCETL等。风暴很快:一个基准测试表示每个节点每秒处理超过一百万个元组。它具有可扩展性容错性,可确保您的数据得到处理,并且易于设置和操作

Storm集成了您已经使用的排队和数据库技术Storm拓扑消耗数据流以任意复杂的方式处理这些流,然后在计算的每个阶段之间重新划分流

2、离线计算和流式计算

① 离线计算

  • 离线计算:批量获取数据、批量传输数据、周期性批量计算数据、数据展示
  • 代表技术:Sqoop批量导入数据、HDFS批量存储数据、MapReduce批量计算、Hive、Flume批量获取数据、Sqoop批量传输、HDFS/Hive/HBase批量存储、MR/Hive计算数据、BI

② 流式计算

  • 流式计算:数据实时产生、数据实时传输、数据实时计算、实时展示
  • 代表技术:Flume实时获取数据、Kafka/metaq实时数据存储、Storm/JStorm实时数据计算、Redis实时结果缓存、持久化存储(mysql)、阿里实时展示(DataV/QuickBI)

一句话总结:将源源不断产生的数据实时收集并实时计算,尽可能快的得到计算结果

③ Storm与Hadoop的区别

Storm用于实时计算 Hadoop用于离线计算
Storm处理的数据保存在内存中,源源不断中,一批一批 Hadoop处理的数据保存在文件系统
Storm的数据通过网络传输进来 Hadoop的数据保存在磁盘中
Storm与Hadoop的编程模型相似

Storm与Hadoop
角色

hadoop storm
JobTracker Nimbus
TaskTracker Supervisor
Child Worker

应用名称

hadoop storm
Job Topology

编程接口

hadoop storm
Mapper/Reducer Spout/Bolt

3、Storm的体系结构

nimbus

topology

  • Nimbus:负责资源分配和任务调度

  • Supervisor:负责接受Nimbus分配的任务,启动和停止属于自己管理的worker进程。通过配置文件设置当前Supervisor上启动多少个Worker

  • Worker:运行具体处理组件逻辑的进程。Worker运行的任务类型只有两种,一种是Spout任务,一种是Bolt任务

  • Executor:Storm 0.8之后,Executor为Worker进程中的具体的物理线程,同一个Spout/Bolt的Task可能会共享一个物理线程,一个Executor中只能运行隶属于同一个Spout/Bolt的Task

  • Task:Worker中每一个Spout/Bolt的线程称为一个Task. 在Storm0.8之后,Task不再与物理线程对应,不同Spout/Bolt的Task可能会共享一个物理线程,该线程称为Executor

Worker进程

4、Storm编程模型

Storm编程模型

tuple:元组
是消息传输的基本单元

Spout:水龙头
Storm的核心抽象。拓扑的流的来源。Spout通常从外部数据源读取数据。转换为t敺内部的源数据。
主要方法:
nextTuple() -》 发出一个新的元组到拓扑
ack()
fail()

Bolt:转接头
Bolt是对流的处理节点。Bolt作用:过滤、业务、连接运算

Topology:拓扑
是一个实时的应用程序
永远运行除非被杀死
Spout到Bolt是一个连接流

5、Storm的运行机制

Nimbus-Supervisor

  • 整个处理流程的组织协调不用用户去关心,用户只需要去定义每一个步骤中的具体业务处理逻辑
  • 具体执行任务的角色是Worker,Worker执行任务时具体的行为则有我们定义的业务逻辑决定

Storm物理集群结构

6、Storm的集群安装配置

(1)Storm集群安装部署
1)准备工作

hsiehchou121 hsiehchou122 hsiehchou123
storm01 storm02 storm03

2)下载安装包
http://storm.apache.org/downloads.html

3)上传

4)解压
tar -zxvf apache-storm-1.1.0.tar.gz
mv apache-storm-1.1.0 storm

5)设置环境变量
#STORM_HOME
export STORM_HOME=/root/hd/storm
export PATH=$STORM_HOME/bin:$PATH
source /etc/profile

6)修改配置文件
$ vi storm.yaml

#设置Zookeeper的主机名称
storm.zookeeper.servers:
- "hsiehchou121"
- "hsiehchou122"
- "hsiehchou123"

#设置主节点的主机名称
nimbus.seeds: ["hsiehchou121"]

#设置Storm的数据存储路径
storm.local.dir: "/root/hd/storm/data"

#设置Worker的端口号
supervisor.slots.ports:
- 6700
- 6701
- 6702
- 6703

++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
注意:如果要搭建Storm的HA,只需要在nimbus.seeds设置多个nimbus即可
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
mkdir data

7)分发到其他机器
[root@hsiehchou121 hd]# scp -r storm/ hsiehchou122:$PWD
[root@hsiehchou121 hd]# scp -r storm/ hsiehchou123:$PWD

7、启动和查看Storm

0)启动ZooKeeper
zkServer.sh start

1)在nimbus.host所属的机器上启动 nimbus服务和logviewer服务

启动nimbus

  • storm nimbus &

*启动logviewer *

  • storm logviewer &

2)在nimbus.host所属的机器上启动ui服务
启动ui界面

  • storm ui &

3)在其它个节点上启动supervisor服务和logviewer服务
启动supervisor

  • storm supervisor &

启动logviewer

  • storm logviewer &

4)查看Storm集群:访问nimbus.host:/8080,即可看到storm的ui界面
http://hsiehchou121:8080/index.html

8、Storm的常用命令

有许多简单且有用的命令可以用来管理拓扑,它们可以提交、杀死、禁用、再平衡拓扑

1)查看命令帮助
storm help

2)查看版本
storm version

3)查看当前正在运行拓扑及其状态
storm list

4)提交任务命令格式
storm jar 【jar路径】 【拓扑包名.拓扑类名】 【拓扑名称】
storm jar [/路径/.jar][全类名][拓扑名称]

5)杀死任务命令格式
storm kill 【拓扑名称】 -w 10
(执行kill命令时可以通过-w [等待秒数]指定拓扑停用以后的等待时间)
storm kill topology-name -w 10

6)停用任务命令格式
storm deactivte 【拓扑名称】
storm deactivate topology-name

7)启用任务命令格式
storm activate【拓扑名称】
storm activate topology-name

8)重新部署任务命令格式
storm rebalance 【拓扑名称】
storm rebalance topology-name
再平衡使你重分配集群任务。这是个很强大的命令。比如,你向一个运行中的集群增加了节点。再平衡命令将会停用拓扑,然后在相应超时时间之后重分配工人,并重启拓扑

二、Storm编程案例

1、WordCount及流程分析

通过查看Storm UI上每个组件的events链接,可以查看Storm的每个组件(spout、blot)发送的消息。但Storm的event logger的功能默认是禁用的,需要在配置文件中设置:topology.eventlogger.executors: 1,具体说明如下:

  • “topology.eventlogger.executors”: 0 默认,禁用
  • “topology.eventlogger.executors”: 1 一个topology分配一个Event Logger
  • “topology.eventlogger.executors”: nil 每个worker.分配一个Event Logger

WordCount的数据流程分析

WordCount的数据流程分析

2、Storm编程案例:WordCount

流式计算一般架构图:

流式计算一般架构图

  • Flume用来获取数据
  • Kafka用来临时保存数据
  • Strom用来计算数据
  • Redis是个内存数据库,用来保存数据

代码编写:

  1. 创建Spout(WordCountSpout)组件采集数据,作为整个Topology的数据源

WordCountSpout类

package com.hsiehchou.wc;

import java.util.Map;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

/**
 * 需求:单词计数  Hello ni hao! Hello China!
 * 
 * 实现接口:IRichSpout  IRichBolt
 * 继承抽象类:BaseRichSpout  BaseRichBolt
 * @author hsiehchou
 */
public class WordCountSpout extends BaseRichSpout {

    //定义收集器
    private SpoutOutputCollector collector;

    //发送数据
    @Override
    public void nextTuple() {
        //1.发送数据
        collector.emit(new Values("I am a boy!"));
        //2.设置延迟
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    //创建收集器
    @Override
    public void open(Map arg0, TopologyContext arg1, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    //声明
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declare) {
        //起别名
        declare.declare(new Fields("hsiehchou"));
    }
}
  1. 创建Bolt(WordCountSplitBolt)组件进行分词操作

WordCountSplitBolt类

package com.hsiehchou.wc;

import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class WordCountSplitBolt extends BaseRichBolt {

    //数据继续发送到下一个bolt
    private OutputCollector collector;

    //业务逻辑
    @Override
    public void execute(Tuple in) {
        //1.获取数据
        String line = in.getStringByField("hsiehchou");

        //2.切分数据
        String[] fields = line.split(" ");

        //3.<单词,1>发送出去,下一个bolt(累加求和)
        for(String w:fields) {
            collector.emit(new Values(w,1));
        }
    }

    //初始化
    @Override
    public void prepare(Map arg0, TopologyContext arg1, OutputCollector collector) {
        this.collector = collector;
    }

    //声明描述
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declare) {
        declare.declare(new Fields("word","sum"));
    }
}
  1. 创建Bolt(WordCountBoltCount)组件进行单词计数作

WordCountBoltCount类

package com.hsiehchou.wc;

import java.util.HashMap;
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

public class WordCountBoltCount extends BaseRichBolt {

    private Map<String, Integer> map = new HashMap<String, Integer>();

    //累加求和
    @Override
    public void execute(Tuple in) {
        //1.获取数据
        String word = in.getStringByField("word");
        Integer sum = in.getIntegerByField("sum");

        //2.业务处理
        if(map.containsKey(word)) {
            //之前出现的次数
            Integer count = map.get(word);
            //已有的
            map.put(word, count + sum);
        }else {
            map.put(word, sum);
        }

        //3.打印控制台
        System.err.println(Thread.currentThread().getId() + "单位为:" + word + "\t 当前出现次数为:" + map.get(word));    
    }

    @Override
    public void prepare(Map arg0, TopologyContext arg1, OutputCollector arg2) {

    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer arg0) {

    }
}
  1. 也可以将主程序Topology(WordCountTopology)提交到Storm集群运行

WordCountTopology类

package com.hsiehchou.wc;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

public class WordCountTopology {

    public static void main(String[] args) {
        //1.hadoop --> Job Storm --> Topology 创建拓扑
        TopologyBuilder builder = new TopologyBuilder();

        //2.指定设置
        builder.setSpout("WordCountSpout", new WordCountSpout(), 1);
        builder.setBolt("WordCountSplitBolt", new WordCountSplitBolt(), 4).fieldsGrouping("WordCountSpout", new Fields("hsiehchou"));
        builder.setBolt("WordCountBoltCount", new WordCountBolt(), 2).fieldsGrouping("WordCountSplitBolt", new Fields("word"));

        //3.创建配置信息
        Config conf = new Config();

        //4.提交任务
        LocalCluster localCluster = new LocalCluster();
        localCluster.submitTopology("wordcounttopology", conf, builder.createTopology());
    }
}

3、集群部署

对WordCountDriver类进行修改,把本地模式修改为集群模式

public class WordCountDriver {
        //集群模式运行
        try {
            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
        } catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
            e.printStackTrace();
        }

        //4.提交任务
        //LocalCluster localCluster = new LocalCluster();
        //localCluster.submitTopology("wordcounttopology", conf, builder.createTopology());
}

提交到集群

storm jar StormWordCount.jar com.hsiehchou.wc.WordCountDriver wordcount01

三、分组策略

1)fields Grouping

按照字段分组
相同字段发送到一个task中
fieldsGrouping
builder.setBolt(“WordCountSplitBolt”, new WordCountSplitBolt(), 4).fieldsGrouping(“WordCountSpout”, new Fields(“hsiehchou”));

builder.setBolt(“WordCountBolt”, new WordCountBolt(), 2).fieldsGrouping(“WordCountSplitBolt”, new Fields(“word”));

2)shuffle Grouping

随机分组
轮询。平均分配。随机分发tuple,保证每个bolt中的tuple数量相同
shuffleGrouping
builder.setBolt(“WordCountSplitBolt”, new WordCountSplitBolt(), 4).shuffleGrouping(“WordCountSpout”);

builder.setBolt(“WordCountBolt”, new WordCountBolt(), 2).shuffleGrouping(“WordCountSplitBolt”);

3)None Grouping

不分组
采用这种策略每个bolt中接收的单词不同
noneGrouping
builder.setBolt(“WordCountSplitBolt”, new WordCountSplitBolt(), 4).noneGrouping(“WordCountSpout”);

builder.setBolt(“WordCountBolt”, new WordCountBolt(), 2).noneGrouping(“WordCountSplitBolt”);

4)All Grouping

广播发送
tuple分发给每一个bolt
allGrouping
builder.setBolt(“WordCountSplitBolt”, new WordCountSplitBolt(), 4).allGrouping(“WordCountSpout”);

builder.setBolt(“WordCountBolt”, new WordCountBolt(), 2).allGrouping(“WordCountSplitBolt”);

5)Global Grouping

全局分组
分配给task id值最小的
根据线程id判断,只分配给线程id最小的
builder.setBolt(“WordCountSplitBolt”, new WordCountSplitBolt(), 4).globalGrouping(“WordCountSpout”);

builder.setBolt(“WordCountBolt”, new WordCountBolt(), 2).globalGrouping(“WordCountSplitBolt”);


文章作者: 谢舟
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 谢舟 !
 上一篇
大数据图片汇总 大数据图片汇总
1、大数据课程概述与大数据背景知识(1)数据仓库与大数据 (2)PageRank (3)MR基本原理 (4)HDFS原理 (5)bigtable与Habase 2、搭建Hadoop的环境(1)1.PNG (2)2.PNG (3)3.PNG
2019-05-06
下一篇 
Hadoop的HA高可用 Hadoop的HA高可用
Hadoop的HA高可用(可行) 一、集群的规划ZooKeeper集群 192.168.116.121 192.168.116.122 192.168.116.123 hsiehchou121 hsiehchou122 hsie
2019-04-29
  目录