数据压缩、数据倾斜join操作


1、数据压缩发生阶段

操作 Col3
数据源 》数据传输 数据压缩
mapper map端输出压缩
》数据传输 数据压缩
reducer reduce端输出压缩
》数据传输 数据压缩
结果数据

设置map端输出压缩
1)开启压缩
conf.setBoolean
//开启map端输出压缩
conf.setBoolean(“mapreduce.map.output.compress”,true);

2)设置具体压缩编码
conf.setClass
//设置压缩方式
//conf.setClass(“mapreduce.map.output.compress.codec”, BZip2Codec.class, CompressionCodec.class);

conf.setClass(“mapreduce.map.output.compress.codec”, DefaultCodec.class, CompressionCodec.class);

设置reduce端输出压缩
1)设置reduce输出压缩
FileOutputFormat.setCompressOutput

//设置reduce端输出压缩
FileOutputFormat.setCompressOutput(job,true);

2)设置具体压缩编码
FileOutputFormat.setOutputCompressorClass

//设置压缩方式
//FileOutputFormat.setOutputCompressorClass(job,BZip2Codec.class);

//FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);

FileOutputFormat.setOutputCompressorClass(job,DefaultCodec.class);
hive数据仓库:mapreduce 用hsql处理大数据

2、压缩编码使用场景

1-> Gzip压缩方式

压缩率比较高,并且压缩解压缩速度很快
hadoop自身支持的压缩方式,用gzip格式处理数据就像直接处理文本数据是完全一样
的;
在linux系统自带gzip命令,使用很方便简洁
不支持split
使用每个文件压缩之后大小需要在128M以下(块大小)
200M-》设置块大小

2->LZO压缩方式

压缩解压速度比较快并且,压缩率比较合理
支持split
在linux系统不可以直接使用,但是可以进行安装
压缩率比gzip和bzip2要弱,hadoop本身不支持
需要安装

3->Bzip2压缩方式

支持压缩,具有很强的压缩率。hadoop本身支持
linux中可以安装
压缩解压缩速度很慢

4->Snappy压缩方式

压缩解压缩速度很快,而且有合理的压缩率
不支持split

3、数据倾斜

reduce join
数据倾斜就是我们在计算数据的时候,数据的分散度不够,导致大量的数据集中到了一台或者几台机器上计算,这些数据的计算速度远远低于平均计算速度,导致整个计算过程过慢

4、Hadoop中有哪些组件

HDFS:数据的分布式存储
MapReduce:数据的分布式计算
Yarn:资源调度(cpu/内存…)
Yarn节点:resourceManager、nodeManager

5、优化

MapReduce程序的编写过程中考虑的问题
优化目的:提高程序运行的效率
优化方案:
存储和处理海量数据,如何优化MR
影响MR程序的因素
1)硬件
压缩
CPU/磁盘(固态、机械)/内存/网络…

2)I/O优化
传输
-》maptask与reducetask合理设置个数
-》数据倾斜(reducetask-》merge)
避免出现数据倾斜
-》大量小文件情况 (combineTextInputFormat)
-》combiner优化(不影响业务逻辑)

具体优化方式:
MR(数据接入、Map、Reduce、IO传输、处理倾斜、参数优化)
数据接入:小文件的话 进行合并 ,namenode存储元数据信息,sn
解决方式:CombineTextInputFormat

Map:会发生溢写,如果减少溢写次数也能达到优化
溢写内存增加这样就减少了溢写次数
解决方式:mapred-site.xml
属性:
mapreduce.task.io.sort.mb
100
调大

mapreduce.map.sort.spill.percent
0.8
调大

combiner:map后优化

Reduce:reduceTask设置合理的个数
写mr程序可以合理避免写reduce阶段
设置map/reduce共存
属性:
mapred-site.xml
mapreduce.job.reduce.slowstart.completedmaps
0.05
减少

IO传输:压缩
数据倾斜:避免出现数据倾斜,map端合并。手动的对数据进行分段处理,合理的
分区

JVM重用
不关JVM
一个map运行一个jvm,开启重用,在运行完这个map后JVM继续运行其它map。
线程池
属性:mapreduce.job.jvm.numtasks
20
启动40%运行时间

6、进行两个表的拼接

DistributedCacheMapper类

package com.hsiehchou.mapjoin;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;
/**
 * mapjoin
 * 完成两张表数据的关联操作
 */
public class DistributedCacheMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    HashMap<String, String> pdMap = new HashMap<String, String>();
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        //1.加载缓存文件
        URI[] cacheFiles = context.getCacheFiles();
        BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(cacheFiles[0].getPath()), "UTF-8"));
        //这里可以将文件放在当前项目文件下,如果不放就用上面的那两句
        //BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream("pd.txt"), "UTF-8"));

        String line;
        //2.判断缓存文件不为空
        while(StringUtils.isNotEmpty(line = br.readLine())){
            //切割数据
            String[] fields = line.split("\t");
            //缓冲 到 集合; 商品ID  商品名
            pdMap.put(fields[0],fields[1]);
        }
        br.close();
    }
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //1.获取数据
        String line = value.toString();
        //2.切分数据
        String[] fields = line.split("\t");
        //3.获取商品的pid,商品名称
        String pid = fields[1];
        String pName = pdMap.get(pid);
        //4.拼接
        line = line + "\t" + pName;
        //5.输出
        context.write(new Text(line),NullWritable.get());
    }
}

DistributedCacheDriver类

package com.hsiehchou.mapjoin;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
public class DistributedCacheDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
      //创建job任务
      Configuration conf = new Configuration();
      Job job = Job.getInstance(conf);
      //指定jar包位置
      job.setJarByClass(DistributedCacheDriver.class);
      //关联使用的Mapper
      job.setMapperClass(DistributedCacheMapper.class);
      //设置最终的输出的数据类型
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(NullWritable.class);
      //设置数据输入的路径
      FileInputFormat.setInputPaths(job,new Path("e://test//table//in"));
      //设置数据输出的路径
      FileOutputFormat.setOutputPath(job,new Path("e://test//table//out"));
      //加载缓存数据
      job.addCacheFile(new URI("file:///e:/test/inputcache/pd.txt"));
      //注意:没有跑reducer  需要指定reduceTask为0
      job.setNumReduceTasks(0);
      //提交任务
      boolean rs = job.waitForCompletion(true);
      System.exit(rs? 0:1);
    }
}

本地模式测试
URI[] cacheFiles = context.getCacheFiles();
BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(cacheFiles[0].getPath()), “UTF-8”));

集群模式时
conf.set(“mapreduce.framework.name”, “yarn”);yarn模式
job.addCacheFile(new URI(“hdfs:///test2/pd.txt”));//添加hdfs文件做缓存


文章作者: 谢舟
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 谢舟 !
 上一篇
大数据常用基本算法 大数据常用基本算法
1、冒泡排序冒泡排序(Bubble Sort),是一种计算机科学领域的较简单的排序算法,它重复地走访过要排序的元素列,依次比较两个相邻的元素,如果他们的顺序(如从大到小、首字母从A到Z)错误就把他们交换过来。走访元素的工作是重复地进行直到没
2019-02-18
下一篇 
大数据之排序、combiner、压缩 大数据之排序、combiner、压缩
1、自定义分区需求:统计结果进行分区,根据手机号前三位来进行分区总结:1)自定义类继承partitioner<key,value>2)重写方法getPartition()3)业务逻辑4)在driver类中加入setPartiti
  目录