大数据之MapReduce小实战


手写wordcount的程序

1、pom.xml

  <dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-hdfs-client -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.3</version>
        </dependency>
    </dependencies>

2、新建Mapper类

package com.hsiehchou.wordcount;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
 * 海量数据
 *
 * hello hsiehchou
 * nihao
 *
 * 数据的输入与输出以Key value进行传输
 * keyIN:LongWritable(Long) 数据的起始偏移量
 * valuewIN:具体数据
 *
 * mapper需要把数据传递到reducer阶段(<hello,1>)
 * keyOut:单词 Text
 * valueOut:出现的次数IntWritable
 *
 */
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    //对数据进行打散 ctrl+o
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //1、接入数据 hello nihao
        String line = value.toString();
        //2、对数据进行切分
        String[] words = line.split(" ");
        //3、写出以<hello,1>
        for (String w:words){
            //写出reducer端
            context.write(new Text(w), new IntWritable(1));
        }
    }
}

mapper端原理

mapper端原理

3、新建Reducer类

package com.hsiehchou.wordcount;
import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
 * reducer阶段接收的是Mapper输出的数据
 * mapper的输出是reducer输入
 *
 * keyIn:mapper输出的key的类型
 * valueIn:mapper输出的value的类型
 *
 * reducer端输出的数据类型,想要一个什么样的结果<hello,1888>
 * keyOut:Text
 * valueOut:IntWritalble
 *
 */
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    //key-->单词  value-->次数
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        //1、记录出现的次数
        int sum = 0;
        for (IntWritable v:values){
            sum += v.get();
        }
        //2、l累加求和输出
        context.write(key, new IntWritable(sum));
    }
}

4、新建驱动类

package com.hsiehchou.wordcount;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class WordCountDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        //1、创建job任务
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        //2、指定jar包位置
        job.setJarByClass(WordCountDriver.class);
        //3、关联使用的Mapper类
        job.setMapperClass(WordCountMapper.class);
        //4、关联使用的Reducer类
        job.setReducerClass(WordCountReducer.class);
        //5、设置mapper阶段输出的数据类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        //6、设置reducer阶段输出的数据类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        //7、设置数据输入的路径
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        //8设置数据输出的路径
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        //9、提交任务
        boolean rs = job.waitForCompletion(true);
        System.exit(rs ? 0:1);
    }
}

运行结果
[root@hsiehchou121 ~]# hadoop jar mapreduce-1.0-SNAPSHOT.jar com.hsiehchou.wordcount.WordCountDriver /wc/in /wc/out
[root@hsiehchou121 ~]# hdfs dfs -cat /wc/out/part-r-00000
fd 1
fdgs 1
fdsbv 1
gd 1
hello 3

5、IDEA的相关使用

Ctrl+O导入相关未实现的方法
Maven中的Lifecycle的package可以直接打包成jar

案例分析
需求:运营商流量日志
10086
计算每个用户当前使用的总流量
思路?总流量 = 上行流量+下行流量
三个字段:手机号 上行流量 下行流量
技术选型:PB+
数据分析:海量数据(存储hdfs)
海量数据计算(分布式计算框架MapReduce)

4、实现

FlowBean类

package com.hsiehchou.logs;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
 * 封装数据类型需要怎么做
 * hadoop数据类型实现了序列化接口
 * 如果自定义需要实现这个序列化接口
 */
public class FlowBean implements Writable {
    //定义属性:上行流量 下行流量 总流量总和
    private long upFlow;
    private long dfFlow;
    private long flowsum;

    public FlowBean(){}
    public FlowBean(long upFlow, long dfFlow){
        this.upFlow = upFlow;
        this.dfFlow = dfFlow;
        this.flowsum = upFlow + dfFlow;
    }
    public long getUpFlow(){
        return upFlow;
    }

    public void setUpFlow(long upFlow){
        this.upFlow = upFlow;
    }
    public long getDfFlow(){
        return dfFlow;
    }
    public void setDfFlow(long dfFlow){
        this.dfFlow = dfFlow;
    }
    public long getFlowsum(){
        return flowsum;
    }
    public void setFlowsum(long flowsum){
        this.flowsum = flowsum;
    }
    //序列化
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(dfFlow);
        out.writeLong(flowsum);
    }
    //反序列化
    public void readFields(DataInput in) throws IOException {
        upFlow = in.readLong();
        dfFlow = in.readLong();
        flowsum = in.readLong();
    }
    @Override
    public String toString() {
        return upFlow + "\t" + dfFlow + "\t" + flowsum;
    }
}

FlowCountMapper类

package com.hsiehchou.logs;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
 * keyIN:
 * valueIN:
 *
 * 思路:根据想要的结果的kv类型  手机号  流量总和(上行+下行)自定义类
 * keyOut:
 * valueOut:
 */
public class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //1、接入数据源
        String line = value.toString();
        //2、切割   \t
        String[] fields = line.split("\t");
        //3、拿到关键字段
        String phoneNr = fields[1];
        long upFlow = Long.parseLong(fields[fields.length - 3]);
        long dfFlow = Long.parseLong(fields[fields.length - 2]);
        //4、写出到reducer
        context.write(new Text(phoneNr), new FlowBean(upFlow,dfFlow));
    }
}

FlowCountReducer类

package com.hsiehchou.logs;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
        long upFlow_sum = 0;
        long dfFlow_sum = 0;
        for (FlowBean v:values){
            upFlow_sum += v.getUpFlow();
            dfFlow_sum += v.getDfFlow();
        }
        FlowBean rsSum = new FlowBean(upFlow_sum, dfFlow_sum);
        //输出结果
        context.write(key, rsSum);
    }
}

FlowCountDriver类

package com.hsiehchou.logs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class FlowCountDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        //1.创建job任务
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        //2.指定kjar包位置
        job.setJarByClass(FlowCountDriver.class);
        //3.关联使用的Mapper
        job.setMapperClass(FlowCountMapper.class);
        //4.关联使用的Reducer类
        job.setReducerClass(FlowCountReducer.class);
        //5.设置mapper阶段输出的数据类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);
        //6.设置reducer阶段输出的数据类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);
        //优化含有大量小文件的数据
        //设置读取数据切片的类
        job.setInputFormatClass(CombineTextInputFormat.class);
        //最大切片大小8M
        CombineTextInputFormat.setMaxInputSplitSize(job, 8388608);
        //最小切片大小6M
        CombineTextInputFormat.setMinInputSplitSize(job, 6291456);
        //7.设置数据输入的路径
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        //8.设置数据输出的路径
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        //9.提交任务
        boolean  rs = job.waitForCompletion(true);
        System.exit(rs? 0:1);
    }
}

运行结果
[root@hsiehchou121 ~]# hdfs dfs -mkdir -p /flow/in
[root@hsiehchou121 ~]# hdfs dfs -put HTTP_20180313143750.dat /flow/in
[root@hsiehchou121 ~]# hadoop jar mapreduce-1.0-SNAPSHOT.jar com.hsiehchou.logs.FlowCountDriver /flow/in /flow/out
[root@hsiehchou121 ~]# hdfs dfs -cat /flow/out/part-r-00000
13480253104 120 1320 1440
13502468823 735 11349 12084
13510439658 1116 954 2070
13560436326 1136 94 1230
13560436666 1136 94 1230
13560439658 918 4938 5856
13602846565 198 910 1108
13660577991 660 690 1350
13719199419 240 0 240
13726130503 299 681 980
13726238888 2481 24681 27162
13760778710 120 120 240
13822544101 264 0 264
13884138413 4116 1432 5548
13922314466 3008 3720 6728
13925057413 11058 4243 15301
13926251106 240 0 240
13926435656 132 1512 1644
15013685858 369 338 707
15889002119 938 380 1318
15920133257 316 296 612
18212575961 1527 2106 3633
18320173382 9531 212 9743

小文件优化

如果企业中存在海量的小文件数据
TextInputFormat按照文件规划切片,文件不管多小都是一个单独的切片,启动mapt
ask任务去执行,这样会产生大量的maptask,浪费资源

优化手段

小文件合并大文件,如果不动这个小文件内容


文章作者: 谢舟
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 谢舟 !
 上一篇
大数据之排序、combiner、压缩 大数据之排序、combiner、压缩
1、自定义分区需求:统计结果进行分区,根据手机号前三位来进行分区总结:1)自定义类继承partitioner<key,value>2)重写方法getPartition()3)业务逻辑4)在driver类中加入setPartiti
下一篇 
大数据基础之HDFS3 大数据基础之HDFS3
1、hdfs的副本的配置修改hdfs-site.xml文件 <!-- 注释配置数据块的冗余度,默认是3 --> <property> <name>dfs.replication</name&
2019-02-12
  目录