1、自定义分区
需求:统计结果进行分区,根据手机号前三位来进行分区
总结:
1)自定义类继承partitioner<key,value>
2)重写方法getPartition()
3)业务逻辑
4)在driver类中加入
setPartitionerClass
5)注意:需要指定setNumReduceTasks(个数=分区总数;本例为4个号段分区+1个默认分区=5)
新增PhonenumPartitioner类
package com.hsiehchou.logs1;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
/**
 * Custom partitioner that routes each record by the first three characters of
 * its key (the phone number) instead of the default hash partitioning.
 *
 * <p>Prefixes 135, 137, 138 and 139 map to partitions 0 to 3; every other key
 * goes to partition 4. Five distinct indices can be returned, so the driver is
 * expected to configure five reduce tasks.
 */
public class PhonenumPartitioner extends Partitioner<Text, FlowBean> {

    /** Partition used for keys that match none of the known prefixes. */
    private static final int DEFAULT_PARTITION = 4;

    /**
     * Chooses the partition for one record.
     *
     * @param key           the phone number
     * @param value         the traffic record; not consulted
     * @param numPartitions number of partitions of the job; not consulted
     * @return 0 to 3 for the prefixes 135, 137, 138, 139, otherwise 4
     */
    @Override
    public int getPartition(Text key, FlowBean value, int numPartitions) {
        // startsWith instead of substring(0, 3): a key shorter than three
        // characters falls through to the default partition rather than
        // throwing StringIndexOutOfBoundsException.
        String phoneNum = key.toString();
        if (phoneNum.startsWith("135")) {
            return 0;
        } else if (phoneNum.startsWith("137")) {
            return 1;
        } else if (phoneNum.startsWith("138")) {
            return 2;
        } else if (phoneNum.startsWith("139")) {
            return 3;
        }
        return DEFAULT_PARTITION;
    }
}
FlowCountDriver类中增加
//Register the custom partitioner
job.setPartitionerClass(PhonenumPartitioner.class);
//Note: how many result files? Five reduce tasks are requested here
//(presumably one output file per reduce task - confirm)
job.setNumReduceTasks(5);
//7. Set the input path of the data
FileInputFormat.setInputPaths(job, new Path("E:/test/flow/in"));
//8. Set the output path of the data
FileOutputFormat.setOutputPath(job, new Path("E:/test/flow/out2"));
2、排序
需求:每个分区内进行排序?
总结:
1)实现WritableComparable接口
2)重写compareTo方法
CombineTextInputFormat可设置切片的大小,从而影响MapTask的数量
实现
FlowBean类
package com.hsiehchou.logs2;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
 * Serializable, sortable record holding upstream traffic, downstream traffic
 * and their total.
 */
public class FlowBean implements WritableComparable<FlowBean> {

    /** Upstream traffic. */
    private long upFlow;

    /** Downstream traffic. */
    private long dfFlow;

    /** Total traffic. */
    private long flowsum;

    /** Creates a bean whose three counters are all zero. */
    public FlowBean() {
    }

    /**
     * Creates a bean from the two traffic figures; the total is their sum.
     *
     * @param upFlow upstream traffic
     * @param dfFlow downstream traffic
     */
    public FlowBean(long upFlow, long dfFlow) {
        this.upFlow = upFlow;
        this.dfFlow = dfFlow;
        this.flowsum = this.upFlow + this.dfFlow;
    }

    /** @return the upstream traffic */
    public long getUpFlow() {
        return this.upFlow;
    }

    /** @param upFlow new upstream traffic; the total is not recomputed */
    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    /** @return the downstream traffic */
    public long getDfFlow() {
        return this.dfFlow;
    }

    /** @param dfFlow new downstream traffic; the total is not recomputed */
    public void setDfFlow(long dfFlow) {
        this.dfFlow = dfFlow;
    }

    /** @return the total traffic */
    public long getFlowsum() {
        return this.flowsum;
    }

    /** @param flowsum new total traffic */
    public void setFlowsum(long flowsum) {
        this.flowsum = flowsum;
    }

    /**
     * Serializes the three counters as longs: upstream, downstream, total.
     * The order must match {@link #readFields(DataInput)}.
     */
    public void write(DataOutput out) throws IOException {
        out.writeLong(this.upFlow);
        out.writeLong(this.dfFlow);
        out.writeLong(this.flowsum);
    }

    /**
     * Deserializes the three counters in the order they were written:
     * upstream, downstream, total.
     */
    public void readFields(DataInput in) throws IOException {
        this.upFlow = in.readLong();
        this.dfFlow = in.readLong();
        this.flowsum = in.readLong();
    }

    /** @return upstream, downstream and total traffic separated by tabs */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder();
        text.append(this.upFlow).append("\t");
        text.append(this.dfFlow).append("\t");
        text.append(this.flowsum);
        return text.toString();
    }

    /**
     * Orders beans by total traffic, largest first.
     *
     * <p>NOTE(review): this never returns 0, so two beans with the same total
     * are never reported as equal, which departs from the Comparable contract.
     * Presumably deliberate so that records with equal totals are not grouped
     * under one key - confirm before changing.
     *
     * @param o the bean to compare against
     * @return -1 when this total is strictly larger than the other, otherwise 1
     */
    public int compareTo(FlowBean o) {
        if (o.getFlowsum() < this.flowsum) {
            return -1;
        }
        return 1;
    }
}
FlowSortMapper类
package com.hsiehchou.logs2;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
 * Mapper of the sort job: turns each tab-separated input line into a
 * (FlowBean, phone number) pair so that records are keyed by their traffic.
 */
public class FlowSortMapper extends Mapper<LongWritable,Text,FlowBean,Text> {

    /** Columns a line must have: phone number, upstream, downstream. */
    private static final int REQUIRED_FIELDS = 3;

    /**
     * Parses one input line and emits its traffic as key and its phone number
     * as value.
     *
     * <p>Lines with fewer than three tab-separated columns (for example blank
     * lines) are skipped and counted under the counter
     * {@code FlowSortMapper / MALFORMED_RECORDS} instead of aborting the task
     * with an ArrayIndexOutOfBoundsException. A non-numeric traffic column
     * still raises NumberFormatException.
     *
     * @param key     input key supplied by the framework; not used
     * @param value   one line of input text
     * @param context sink for the emitted pair
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //1. Read the incoming line
        String line = value.toString();
        //2. Split on \t
        String[] fields = line.split("\t");
        if (fields.length < REQUIRED_FIELDS) {
            context.getCounter("FlowSortMapper", "MALFORMED_RECORDS").increment(1);
            return;
        }
        //3. Extract the key fields: phone number, upstream traffic, downstream traffic
        String phoneNr = fields[0];
        long upFlow = Long.parseLong(fields[1]);
        long dfFlow = Long.parseLong(fields[2]);
        //4. Emit to the reducer
        context.write(new FlowBean(upFlow,dfFlow),new Text(phoneNr));
    }
}
FlowSortReducer类
package com.hsiehchou.logs2;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
 * Reducer of the sort job: swaps the pair back so the output reads
 * phone number followed by its traffic record.
 */
public class FlowSortReducer extends Reducer<FlowBean, Text, Text, FlowBean> {

    /**
     * Writes one output record per phone number received for the key.
     *
     * <p>Every value is emitted, not just the first, so no phone number is
     * lost if several records ever arrive under the same key.
     *
     * @param key     the traffic record
     * @param values  the phone numbers received for that key
     * @param context sink for the emitted pairs
     */
    @Override
    protected void reduce(FlowBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        //phone number, traffic
        for (Text phoneNr : values) {
            context.write(phoneNr, key);
        }
    }
}
FlowSortPartitioner类
package com.hsiehchou.logs2;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
/**
 * Partitioner of the sort job: routes each record by the first three
 * characters of its value (the phone number).
 *
 * <p>Prefixes 135, 137, 138 and 139 map to partitions 0 to 3; every other
 * value goes to partition 4. Five distinct indices can be returned, so the
 * driver is expected to configure five reduce tasks.
 */
public class FlowSortPartitioner extends Partitioner<FlowBean, Text> {

    /** Partition used for phone numbers that match none of the known prefixes. */
    private static final int DEFAULT_PARTITION = 4;

    /**
     * Chooses the partition for one record.
     *
     * @param key           the traffic record; not consulted
     * @param value         the phone number
     * @param numPartitions number of partitions of the job; not consulted
     * @return 0 to 3 for the prefixes 135, 137, 138, 139, otherwise 4
     */
    @Override
    public int getPartition(FlowBean key, Text value, int numPartitions) {
        // startsWith instead of substring(0, 3): a value shorter than three
        // characters falls through to the default partition rather than
        // throwing StringIndexOutOfBoundsException.
        String phoneNum = value.toString();
        if (phoneNum.startsWith("135")) {
            return 0;
        } else if (phoneNum.startsWith("137")) {
            return 1;
        } else if (phoneNum.startsWith("138")) {
            return 2;
        } else if (phoneNum.startsWith("139")) {
            return 3;
        }
        return DEFAULT_PARTITION;
    }
}
FlowSortDriver类
package com.hsiehchou.logs2;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
 * Driver that configures and submits the traffic sort job.
 */
public class FlowSortDriver {

    /** Input path used when no first command-line argument is given. */
    private static final String DEFAULT_INPUT_PATH = "E:/test/flow/out";

    /** Output path used when no second command-line argument is given. */
    private static final String DEFAULT_OUTPUT_PATH = "E:/test/flow/out4";

    /**
     * Builds the job, waits for it to finish and exits with 0 on success or 1
     * on failure.
     *
     * @param args optional: {@code args[0]} overrides the input path and
     *             {@code args[1]} overrides the output path; without
     *             arguments the built-in default paths are used
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        String inputPath = args.length > 0 ? args[0] : DEFAULT_INPUT_PATH;
        String outputPath = args.length > 1 ? args[1] : DEFAULT_OUTPUT_PATH;

        //1. Create the job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        //2. Specify the location of the jar
        job.setJarByClass(FlowSortDriver.class);
        //3. Register the Mapper
        job.setMapperClass(FlowSortMapper.class);
        //4. Register the Reducer
        job.setReducerClass(FlowSortReducer.class);
        //5. Set the data types emitted by the map phase
        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);
        //6. Set the data types emitted by the reduce phase
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);
        //Register the custom partitioner
        job.setPartitionerClass(FlowSortPartitioner.class);
        //Five reduce tasks, matching the five partitions FlowSortPartitioner can return
        job.setNumReduceTasks(5);
        //7. Set the input path of the data
        FileInputFormat.setInputPaths(job, new Path(inputPath));
        //8. Set the output path of the data
        FileOutputFormat.setOutputPath(job, new Path(outputPath));
        //9. Submit the job
        boolean rs = job.waitForCompletion(true);
        System.exit(rs ? 0 : 1);
    }
}
3、combiner 合并
1)combiner是一个组件
注意:是Mapper和Reducer之外的一种组件
但是这个组件的父类是Reducer
2)如果想使用combiner继承Reducer即可
3)通过编写combiner发现与Reducer代码相同
只需在Driver端指定
setCombinerClass(WordCountReduer.class)
注意:前提是不能影响业务逻辑。例如求和场景:<a,1><c,1> <a,2><a,1>,其中 <a,2><a,1> 局部合并为 <a,3>,不影响最终结果
数学运算(反例:求平均值时局部合并会改变结果):
(3 + 5 + 7)/3 = 5
(2 + 6)/2 = 4
不进行局部累加:(3 + 5 + 7 + 2 + 6)/5 = 23/5
进行了局部累加:(5+4)/2 = 9/2=4.5 不等于 23/5=4.6
4、数据压缩
为什么对数据进行压缩?
MapReduce操作需要对大量数据进行传输
压缩技术能有效地减少底层存储系统(HDFS)读写的字节数
压缩提高网络带宽和磁盘空间效率
数据压缩节省资源,减少网络I/O
通过压缩可以影响到MapReduce性能。(小文件优化,combiner)代码角度进行优化
注意:利用好压缩提高性能,运用不好会降低性能
压缩 -》 解压缩
mapreduce常用的压缩编码
压缩格式 | 是否需要安装 | 文件扩展名 | 是否可以切分 |
---|---|---|---|
DEFAULT | 直接使用 | .deflate | 否 |
bzip2 | 直接使用 | .bz2 | 是 |
Gzip | 直接使用 | .gz | 否 |
LZO | 需要安装 | .lzo | 是 |
Snappy | 需要安装 | .snappy | 否 |
性能测试
压缩格式 | 原文件大小 | 压缩后大小 | 压缩速度 | 解压速度 |
---|---|---|---|---|
gzip | 8.3GB | 1.8GB | 20MB/s | 60MB/s |
LZO | 8.3GB | 3GB | 50MB/s | 70MB/s |
bzip2 | 8.3GB | 1.1GB | 3MB/s | 10MB/s |