Storm练习
一、需求
需求:统计网站访问量(实时统计)
技术选型依据:数据量大、需要计算、要求实时
实时计算框架:storm
1)spout
数据源,负责接入数据
逐行读取本地日志文件(e:/weblog.log)
2)bolt
业务逻辑处理
切分数据(每行日志按制表符分隔)
取出session_id字段,局部累加访问次数
3)bolt
汇总各切分任务的局部次数,全局累加求和
二、代码编写
- PvCountSpout.java
package com.hsiehchou.pvcount;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Map;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
/**
 * Spout that replays a local web-log file, one line per tuple, on the {@code "logs"} field.
 *
 * <p>Each {@link #nextTuple()} call emits at most one line and then pauses {@value #EMIT_DELAY_MS}
 * ms to throttle the demo. Returning after a single line keeps the spout thread free for Storm's
 * own bookkeeping, unlike looping over the whole file inside one call. Tuples are emitted
 * without a message id, so Storm does not track them and {@link #ack(Object)} /
 * {@link #fail(Object)} are not invoked for them.
 */
public class PvCountSpout implements IRichSpout {
    /** Log file read by the no-arg constructor. */
    private static final String DEFAULT_LOG_PATH = "e:/weblog.log";
    /** Pause after each emitted line, in milliseconds. */
    private static final long EMIT_DELAY_MS = 500;

    /** Path of the log file opened in {@link #open}. */
    private final String logPath;
    private SpoutOutputCollector collector;
    private BufferedReader br;

    /** Creates a spout that reads {@code e:/weblog.log}. */
    public PvCountSpout() {
        this(DEFAULT_LOG_PATH);
    }

    /**
     * Creates a spout that reads the given log file.
     *
     * @param logPath path of the log file to replay
     */
    public PvCountSpout(String logPath) {
        this.logPath = logPath;
    }

    /** Emits the next line of the log file, if any, then pauses briefly. */
    @Override
    public void nextTuple() {
        String line;
        try {
            line = br.readLine();
        } catch (IOException e) {
            // Surface the failure through Storm instead of printing to stderr.
            collector.reportError(e);
            return;
        }
        if (line == null) {
            // End of file: emit nothing and return; Storm will call nextTuple() again later.
            return;
        }
        // Send the raw line to the split bolt.
        collector.emit(new Values(line));
        try {
            Thread.sleep(EMIT_DELAY_MS);
        } catch (InterruptedException e) {
            // Preserve the interrupt so the executor can shut down.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Stores the collector and opens the log file.
     *
     * @throws IllegalStateException if the log file cannot be opened; without a reader every
     *     later {@link #nextTuple()} call would fail with a NullPointerException
     */
    @Override
    public void open(Map arg0, TopologyContext arg1, SpoutOutputCollector collector) {
        this.collector = collector;
        try {
            br = new BufferedReader(new InputStreamReader(new FileInputStream(logPath)));
        } catch (FileNotFoundException e) {
            throw new IllegalStateException("Cannot open log file: " + logPath, e);
        }
    }

    /** Declares the single output field {@code "logs"} carrying one raw log line. */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("logs"));
    }

    /** Callback for a successfully processed tuple; unused because emits carry no message id. */
    @Override
    public void ack(Object arg0) {
    }

    /** Called when the spout is switched back from deactivated mode to active mode. */
    @Override
    public void activate() {
    }

    /**
     * Closes the log file. Storm does not guarantee this runs (for example it is skipped when the
     * worker process is killed with {@code kill -9}), so it is best effort only.
     */
    @Override
    public void close() {
        if (br != null) {
            try {
                br.close();
            } catch (IOException ignored) {
                // Best effort during shutdown; there is nothing useful to do with this failure.
            }
            br = null;
        }
    }

    /** Called when the spout is deactivated; nextTuple is not called while deactivated. */
    @Override
    public void deactivate() {
    }

    /** Callback for a failed tuple; unused because emits carry no message id. */
    @Override
    public void fail(Object arg0) {
    }

    /** No component-specific configuration. */
    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
- PvCountSplitBolt.java
package com.hsiehchou.pvcount;
import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
/**
 * Bolt that parses tab-separated log lines and keeps a running page-view count for this task.
 *
 * <p>Input field {@code "logs"}: one log line. A line is counted when it has at least two
 * tab-separated columns; the second column is treated as the session id. After each counted line
 * the bolt emits {@code ("threadid", "pvnum")} = (this task's id, this task's running total).
 *
 * <p>The key is the Storm task id rather than the executor thread id. Thread ids are only unique
 * inside one JVM, and several tasks can share one executor thread, so thread ids could make
 * different counters collide downstream. The field keeps its original name {@code "threadid"}
 * (and a {@code Long} value) so consumers reading it do not change.
 */
public class PvCountSplitBolt implements IRichBolt {
    private OutputCollector collector;
    /** Storm task id of this bolt instance; unique across the topology. */
    private long taskId;
    /** Running page-view count seen by this task only. */
    private int pvnum = 0;

    /** Called when the bolt is about to shut down; not guaranteed to be called. */
    @Override
    public void cleanup() {
    }

    /**
     * Parses one log line, bumps the local count if the line has a session-id column, emits the
     * updated partial count, and acks the input tuple.
     */
    @Override
    public void execute(Tuple input) {
        // 1. Read the raw line.
        String line = input.getStringByField("logs");
        // 2. Split into tab-separated columns.
        String[] fields = line.split("\t");
        // 3. Count locally. The bounds check replaces the old null check, which was always true
        //    (split never yields null) and let short lines throw ArrayIndexOutOfBoundsException.
        if (fields.length > 1) {
            pvnum++;
            collector.emit(new Values(taskId, pvnum));
        }
        // IRichBolt requires explicit acking of every processed input.
        collector.ack(input);
    }

    /** Stores the collector and records this task's id as the partial-count key. */
    @Override
    public void prepare(Map arg0, TopologyContext arg1, OutputCollector collector) {
        this.collector = collector;
        this.taskId = arg1.getThisTaskId();
    }

    /** Declares the output fields {@code "threadid"} (partial-count key) and {@code "pvnum"}. */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("threadid", "pvnum"));
    }

    /** No component-specific configuration. */
    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
- PvCountBolt.java
package com.hsiehchou.pvcount;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Tuple;
/**
 * Terminal bolt that combines running partial counts into a global page-view total.
 *
 * <p>Each input tuple carries {@code "threadid"} (key of an upstream counter) and {@code "pvnum"}
 * (that counter's cumulative value). The bolt keeps the latest value per key, sums them, and
 * prints the total after every update. It declares no output fields.
 */
public class PvCountBolt implements IRichBolt {
    /** Latest cumulative count reported per upstream key. */
    private final Map<Long, Integer> latestCounts = new HashMap<>();
    private OutputCollector collector;

    /** Called when the bolt is about to shut down; nothing to release. */
    @Override
    public void cleanup() {
    }

    /** Records the incoming partial count, prints the new global total, and acks the input. */
    @Override
    public void execute(Tuple input) {
        // 1. Read the partial count.
        Long threadid = input.getLongByField("threadid");
        Integer pvnum = input.getIntegerByField("pvnum");
        // 2. Replace, not add: pvnum is already that counter's cumulative total.
        latestCounts.put(threadid, pvnum);
        // 3. Sum the latest value of every counter (the total restarts from 0 on each tuple).
        int sumnum = 0;
        for (int count : latestCounts.values()) {
            sumnum += count;
        }
        System.out.println(Thread.currentThread().getName() + "总访问量为:" + sumnum);
        // IRichBolt requires explicit acking of every processed input.
        collector.ack(input);
    }

    /** Stores the collector so inputs can be acked. */
    @Override
    public void prepare(Map arg0, TopologyContext arg1, OutputCollector arg2) {
        this.collector = arg2;
    }

    /** Terminal bolt: declares no output fields. */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer arg0) {
    }

    /** No component-specific configuration. */
    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
- PvCountDriver.java
package com.hsiehchou.pvcount;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
/** Entry point that wires the page-view topology and runs it on an in-process local cluster. */
public class PvCountDriver {
    /**
     * Builds spout -> split bolt -> sum bolt and submits it to a {@link LocalCluster} under the
     * name {@code "pvcountsum"}.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // A Storm Topology plays the role a Job plays in Hadoop.
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("PvCountSpout", new PvCountSpout(), 1);
        // Lines are partitioned by their full text, so identical lines go to the same task.
        // shuffleGrouping would also be correct here, since counting is independent of routing.
        // NOTE(review): parallelism hint 6 exceeds setNumTasks(4); Storm cannot run more
        // executors than tasks, so at most 4 executors are used — confirm this is intended.
        builder.setBolt("PvCountSplitBolt", new PvCountSplitBolt(), 6).setNumTasks(4)
                .fieldsGrouping("PvCountSpout", new Fields("logs"));
        // Every partial count must reach the same summing task, otherwise raising this bolt's
        // parallelism would split the global total; globalGrouping guarantees that.
        builder.setBolt("PvCountSumBolt", new PvCountBolt(), 1).globalGrouping("PvCountSplitBolt");

        Config conf = new Config();
        conf.setNumWorkers(1);

        LocalCluster localCluster = new LocalCluster();
        localCluster.submitTopology("pvcountsum", conf, builder.createTopology());
    }
}