Storm练习


Storm练习

一、需求

需求:统计网站访问量(实时统计)

技术选型:特点(数据量大、做计算、实时)

实时计算框架:storm
1)spout
数据源,接入数据
本地文件

2)bolt
业务逻辑处理
切分数据
查找网址

3)bolt
累加次数求和

二、代码编写

  1. PvCountSpout.java
package com.hsiehchou.pvcount;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.Map;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

/**
 * Spout that reads a local web-access log and emits one tuple per line
 * on the single output field "logs".
 */
public class PvCountSpout implements IRichSpout {

    private SpoutOutputCollector collector;
    private BufferedReader br;
    private String line;

    @Override
    public void nextTuple() {
        // Emit at most ONE line per invocation. The original looped over the
        // entire file here with a sleep inside the loop, which blocks the
        // spout thread for the whole file and starves ack/fail processing;
        // Storm calls nextTuple() repeatedly, so one emit per call suffices.
        try {
            if ((line = br.readLine()) != null) {
                // Send the raw line downstream to the split bolt.
                collector.emit(new Values(line));
                // Throttle emission so the demo output stays readable.
                Thread.sleep(500);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of swallowing it.
            Thread.currentThread().interrupt();
        }

    }

    @Override
    public void open(Map arg0, TopologyContext arg1, SpoutOutputCollector collector) {
        this.collector = collector;

        // Open the log file once, with an explicit charset (the original
        // relied on the platform default encoding).
        try {
            br = new BufferedReader(new InputStreamReader(
                    new FileInputStream("e:/weblog.log"), StandardCharsets.UTF_8));
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        }

    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Declare the single output field consumed by PvCountSplitBolt.
        declarer.declare(new Fields("logs"));
    }

    // Callback invoked when a tuple is fully processed.
    @Override
    public void ack(Object arg0) {
    }

    // Called to reactivate the spout after a deactivated period.
    @Override
    public void activate() {
    }

    // Called before shutdown; NOT guaranteed to run (e.g. kill -9).
    // Close the reader here so the file handle is not leaked on a
    // graceful shutdown (the original left it open forever).
    @Override
    public void close() {
        try {
            if (br != null) {
                br.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    // While deactivated, nextTuple() is not called.
    @Override
    public void deactivate() {
    }

    // Callback invoked when a tuple fails downstream.
    @Override
    public void fail(Object arg0) {
    }

    // Component-specific configuration; none needed.
    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
  2. PvCountSplitBolt.java
package com.hsiehchou.pvcount;

import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

/**
 * Bolt that splits each log line, counts valid records per task, and
 * emits (threadid, running partial count) to the global sum bolt.
 */
public class PvCountSplitBolt implements IRichBolt {

    private OutputCollector collector;
    // Per-task running PV count; each parallel task keeps its own partial sum.
    private int pvnum = 0;

    // Called when the bolt is about to shut down; not guaranteed to run.
    @Override
    public void cleanup() {

    }

    // Business logic: runs once per incoming tuple, on this task's thread.
    @Override
    public void execute(Tuple input) {
        // 1. Get the raw log line emitted by PvCountSpout.
        String line = input.getStringByField("logs");
        // 2. Split on tabs; field 1 is expected to hold the session id.
        String[] fields = line.split("\t");

        // 3. Local accumulation. Guard the array length: the original
        //    indexed fields[1] unconditionally, so any malformed line with
        //    fewer than two columns threw ArrayIndexOutOfBoundsException
        //    and crashed the worker. Malformed lines are now skipped.
        //    (String.split never yields null elements, but the null check
        //    is kept for defensive parity with the original.)
        if (fields.length > 1 && fields[1] != null) {
            pvnum++;
            // Emit this task's thread id plus its current partial count.
            collector.emit(new Values(Thread.currentThread().getId(), pvnum));
        }
    }

    // Called once at task initialization.
    @Override
    public void prepare(Map arg0, TopologyContext arg1, OutputCollector collector) {
        this.collector = collector;
    }

    // Declare the two output fields consumed by PvCountBolt.
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("threadid","pvnum"));
    }

    // Component-specific configuration; none needed.
    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
  3. PvCountBolt.java
package com.hsiehchou.pvcount;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Tuple;

public class PvCountBolt implements IRichBolt {


    private HashMap<Long, Integer> hashmap = new HashMap<>();

    @Override
    public void cleanup() {
    }

    //全局累加求和 业务逻辑
    @Override
    public void execute(Tuple input) {
        //1.获取数据
        Long threadid = input.getLongByField("threadid");
        Integer pvnum = input.getIntegerByField("pvnum");

        //2.创建集合 存储(threadid,pvnum)
        hashmap.put(threadid,pvnum);

        //3.累加求和
        Iterator<Integer> iterator = hashmap.values().iterator();

        //4.清空之前的数据
        int sumnum = 0;
        while(iterator.hasNext()) {
            sumnum += iterator.next();
        }

        System.out.println(Thread.currentThread().getName() + "总访问量为:" + sumnum);
    }

    @Override
    public void prepare(Map arg0, TopologyContext arg1, OutputCollector arg2) {
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer arg0) {
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
  4. PvCountDriver.java
package com.hsiehchou.pvcount;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

/**
 * Entry point: wires spout -> split bolt -> sum bolt into a topology and
 * runs it on an in-process LocalCluster for testing.
 */
public class PvCountDriver {

    public static void main(String[] args) {

        // A Storm Topology plays the role a Job plays in Hadoop.
        TopologyBuilder topology = new TopologyBuilder();

        // Single spout task reading the local log file.
        topology.setSpout("PvCountSpout", new PvCountSpout(), 1);

        // Six executors sharing four tasks; fields-grouping on "logs"
        // routes identical lines to the same task.
        topology.setBolt("PvCountSplitBolt", new PvCountSplitBolt(), 6)
            .setNumTasks(4)
            .fieldsGrouping("PvCountSpout", new Fields("logs"));

        // Single aggregator task producing the global total.
        topology.setBolt("PvCountSumBolt", new PvCountBolt(), 1)
            .fieldsGrouping("PvCountSplitBolt", new Fields("pvnum"));

        Config config = new Config();
        config.setNumWorkers(1);

        // Submit to an in-process cluster (local testing only).
        new LocalCluster().submitTopology("pvcountsum", config, topology.createTopology());
    }
}

文章作者: 谢舟
版权声明: 本博客所有文章除特别声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 谢舟 !
 上一篇
Flink基础 Flink基础
一、Flink概述官网:http://flink.apache.org/MapReduce->MaxComputeHBase->部门QuickBIDataVHive->高德地图Storm->JStorm 2019年1
2019-05-16
下一篇 
Storm集群和集成 Storm集群和集成
一、Storm集群任务提交流程 二、Storm内部通信机制 三、集成Storm1、与JDBC集成 将Storm Bolt处理的结果插入MySQL数据库中 需要依赖的jar包 $STORM_HOME\external\sql\st
2019-05-12
  目录