1、数据压缩发生阶段
端 | 操作 | Col3 |
---|---|---|
数据源 | 》数据传输 | 数据压缩 |
mapper | map端输出压缩 | |
》数据传输 | 数据压缩 | |
reducer | reduce端输出压缩 | |
》数据传输 | 数据压缩 | |
结果数据 |
设置map端输出压缩
1)开启压缩
conf.setBoolean
//开启map端输出压缩
conf.setBoolean(“mapreduce.map.output.compress”,true);
2)设置具体压缩编码
conf.setClass
//设置压缩方式
//conf.setClass(“mapreduce.map.output.compress.codec”, BZip2Codec.class, CompressionCodec.class);
conf.setClass(“mapreduce.map.output.compress.codec”, DefaultCodec.class, CompressionCodec.class);
设置reduce端输出压缩
1)设置reduce输出压缩
FileOutputFormat.setCompressOutput
//设置reduce端输出压缩
FileOutputFormat.setCompressOutput(job,true);
2)设置具体压缩编码
FileOutputFormat.setOutputCompressorClass
//设置压缩方式
//FileOutputFormat.setOutputCompressorClass(job,BZip2Codec.class);
//FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
FileOutputFormat.setOutputCompressorClass(job,DefaultCodec.class);
hive数据仓库:mapreduce 用hsql处理大数据
2、压缩编码使用场景
1-> Gzip压缩方式
压缩率比较高,并且压缩解压缩速度很快
hadoop自身支持的压缩方式,用gzip格式处理数据就像直接处理文本数据是完全一样
的;
在linux系统自带gzip命令,使用很方便简洁
不支持split
使用每个文件压缩之后大小需要在128M以下(块大小)
200M-》设置块大小
2->LZO压缩方式
压缩解压速度比较快并且,压缩率比较合理
支持split
在linux系统不可以直接使用,但是可以进行安装
压缩率比gzip和bzip2要弱,hadoop本身不支持
需要安装
3->Bzip2压缩方式
支持压缩,具有很强的压缩率。hadoop本身支持
linux中可以安装
压缩解压缩速度很慢
4->Snappy压缩方式
压缩解压缩速度很快,而且有合理的压缩率
不支持split
3、数据倾斜
reduce join
数据倾斜就是我们在计算数据的时候,数据的分散度不够,导致大量的数据集中到了一台或者几台机器上计算,这些数据的计算速度远远低于平均计算速度,导致整个计算过程过慢
4、Hadoop中有哪些组件
HDFS:数据的分布式存储
MapReduce:数据的分布式计算
Yarn:资源调度(cpu/内存…)
Yarn节点:resourceManager、nodeManager
5、优化
MapReduce程序的编写过程中考虑的问题
优化目的:提高程序运行的效率
优化方案:
存储和处理海量数据,如何优化MR
影响MR程序的因素
1)硬件
压缩
CPU/磁盘(固态、机械)/内存/网络…
2)I/O优化
传输
-》maptask与reducetask合理设置个数
-》数据倾斜(reducetask-》merge)
避免出现数据倾斜
-》大量小文件情况 (combineTextInputFormat)
-》combiner优化(不影响业务逻辑)
具体优化方式:
MR(数据接入、Map、Reduce、IO传输、处理倾斜、参数优化)
数据接入:小文件的话 进行合并 ,namenode存储元数据信息,sn
解决方式:CombineTextInputFormat
Map:会发生溢写,如果减少溢写次数也能达到优化
溢写内存增加这样就减少了溢写次数
解决方式:mapred-site.xml
属性:
mapreduce.task.io.sort.mb
100
调大
mapreduce.map.sort.spill.percent
0.8
调大
combiner:map后优化
Reduce:reduceTask设置合理的个数
写mr程序可以合理避免写reduce阶段
设置map/reduce共存
属性:
mapred-site.xml
mapreduce.job.reduce.slowstart.completedmaps
0.05
减少
IO传输:压缩
数据倾斜:避免出现数据倾斜,map端合并。手动的对数据进行分段处理,合理的
分区
JVM重用
不关JVM
一个map运行一个jvm,开启重用,在运行完这个map后JVM继续运行其它map。
线程池
属性:mapreduce.job.jvm.numtasks
20
启动40%运行时间
6、进行两个表的拼接
DistributedCacheMapper类
package com.hsiehchou.mapjoin;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;
/**
* mapjoin
* 完成两张表数据的关联操作
*/
public class DistributedCacheMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
HashMap<String, String> pdMap = new HashMap<String, String>();
@Override
protected void setup(Context context) throws IOException, InterruptedException {
//1.加载缓存文件
URI[] cacheFiles = context.getCacheFiles();
BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(cacheFiles[0].getPath()), "UTF-8"));
//这里可以将文件放在当前项目文件下,如果不放就用上面的那两句
//BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream("pd.txt"), "UTF-8"));
String line;
//2.判断缓存文件不为空
while(StringUtils.isNotEmpty(line = br.readLine())){
//切割数据
String[] fields = line.split("\t");
//缓冲 到 集合; 商品ID 商品名
pdMap.put(fields[0],fields[1]);
}
br.close();
}
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//1.获取数据
String line = value.toString();
//2.切分数据
String[] fields = line.split("\t");
//3.获取商品的pid,商品名称
String pid = fields[1];
String pName = pdMap.get(pid);
//4.拼接
line = line + "\t" + pName;
//5.输出
context.write(new Text(line),NullWritable.get());
}
}
DistributedCacheDriver类
package com.hsiehchou.mapjoin;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
public class DistributedCacheDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
//创建job任务
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
//指定jar包位置
job.setJarByClass(DistributedCacheDriver.class);
//关联使用的Mapper
job.setMapperClass(DistributedCacheMapper.class);
//设置最终的输出的数据类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
//设置数据输入的路径
FileInputFormat.setInputPaths(job,new Path("e://test//table//in"));
//设置数据输出的路径
FileOutputFormat.setOutputPath(job,new Path("e://test//table//out"));
//加载缓存数据
job.addCacheFile(new URI("file:///e:/test/inputcache/pd.txt"));
//注意:没有跑reducer 需要指定reduceTask为0
job.setNumReduceTasks(0);
//提交任务
boolean rs = job.waitForCompletion(true);
System.exit(rs? 0:1);
}
}
本地模式测试
URI[] cacheFiles = context.getCacheFiles();
BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(cacheFiles[0].getPath()), “UTF-8”));
集群模式时
conf.set(“mapreduce.framework.name”, “yarn”);yarn模式
job.addCacheFile(new URI(“hdfs:///test2/pd.txt”));//添加hdfs文件做缓存