HBase 操作


1、HBase API操作

1)首先将core-site.xml、hbase-site.xml、hdfs-site.xml引入maven工程的resources下面

2)配置pom.xml文件
增加hbase依赖

<dependencies>
   <dependency>
       <groupId>org.apache.hbase</groupId>
       <artifactId>hbase-server</artifactId>
       <version>1.3.0</version>
   </dependency>

   <dependency>
       <groupId>org.apache.hbase</groupId>
       <artifactId>hbase-client</artifactId>
       <version>1.3.0</version>
   </dependency>
</dependencies>

3)创建HbaseTest.java

package com.hsiehchou.hbase;

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.hbase.*; 
import org.apache.hadoop.hbase.client.*; 
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException; 
import java.util.ArrayList; 
import java.util.List;

public class HbaseTest { 
    // Shared HBase client configuration used by every helper method in this class.
    public static Configuration conf; 
    // Build the configuration once, when the class is loaded.
    // NOTE(review): HBaseConfiguration.create() is presumably what picks up the
    // *-site.xml files placed under resources in step 1 - confirm against the HBase docs.
    static{ 
        // IDE hint from the original author: Alt+Enter to auto-import.
        conf = HBaseConfiguration.create(); 
    }

判断HBase中表是否存在

/**
 * 1. Checks whether a table exists in HBase.
 *
 * <p>Table-level (DDL) operations go through an {@code Admin} obtained from a
 * {@code Connection}; the old {@code new HBaseAdmin(conf)} constructor is the
 * legacy way of doing this.
 *
 * @param tableName name of the table to look up
 * @return {@code true} if the table exists, {@code false} otherwise
 * @throws IOException if the cluster cannot be reached or the lookup fails
 */
public static boolean isExist(String tableName) throws IOException {
    // Both Connection and Admin hold cluster resources; close them even when
    // the lookup throws, otherwise every call leaks a connection.
    try (Connection connection = ConnectionFactory.createConnection(conf);
         Admin admin = connection.getAdmin()) {
        return admin.tableExists(TableName.valueOf(tableName));
    }
}

在HBase中创建表

/**
 * 2. Creates a table in HBase with the given column families.
 *
 * @param tableName    name of the table to create
 * @param columnFamily one or more column family names to add to the table
 * @throws IOException if the cluster cannot be reached or the table cannot be created
 */
public static void createTable(String tableName, String... columnFamily) throws IOException {
    // Step 1: table-level operations need an Admin. Connection and Admin are
    // closed automatically so the call does not leak cluster resources.
    try (Connection connection = ConnectionFactory.createConnection(conf);
         Admin admin = connection.getAdmin()) {

        // Step 2: build the table descriptor.
        HTableDescriptor descriptor = new HTableDescriptor(TableName.valueOf(tableName));

        // Step 3: register every requested column family.
        for (String family : columnFamily) {
            descriptor.addFamily(new HColumnDescriptor(family));
        }

        // Step 4: create the table.
        admin.createTable(descriptor);
        System.out.println("表已经创建成功!!!!");
    }
}

bin/hbase shell操作
list
scan 'ni'
describe 'ni'

向表中添加数据

/**
 * 3. Writes a single cell into a table (equivalent to the shell's {@code put}).
 *
 * @param tableName name of the target table
 * @param rowkey    row key of the row to write
 * @param cf        column family
 * @param column    column qualifier inside the family
 * @param value     value to store
 * @throws IOException if the cluster cannot be reached or the write fails
 */
public static void addData(String tableName, String rowkey, String cf, String column, String value) throws IOException {
  // Connection and Table are closed automatically so the call does not leak them.
  try (Connection connection = ConnectionFactory.createConnection(conf);
       Table table = connection.getTable(TableName.valueOf(tableName))) {

    // Build the Put for this row key.
    Put put = new Put(Bytes.toBytes(rowkey));

    // Column family, qualifier and value.
    put.addColumn(Bytes.toBytes(cf), Bytes.toBytes(column), Bytes.toBytes(value));

    table.put(put);
  }
}

删除一行数据

/**
 * 4. Deletes one whole row from a table.
 *
 * @param tableName name of the table
 * @param rowkey    row key of the row to delete
 * @throws IOException if the cluster cannot be reached or the delete fails
 */
public static void deleteRow(String tableName, String rowkey) throws IOException {
    // Connection and Table are closed automatically so the call does not leak them.
    try (Connection connection = ConnectionFactory.createConnection(conf);
         Table table = connection.getTable(TableName.valueOf(tableName))) {
        table.delete(new Delete(Bytes.toBytes(rowkey)));
    }
}

删除多个rowkey的数据

/**
 * 5. Deletes several rows from a table in one batch.
 *
 * @param tableName name of the table
 * @param rowkey    row keys of the rows to delete
 * @throws IOException if the cluster cannot be reached or the batch delete fails
 */
public static void deleteMore(String tableName, String... rowkey) throws IOException {
    // Connection and Table are closed automatically so the call does not leak them.
    try (Connection connection = ConnectionFactory.createConnection(conf);
         Table table = connection.getTable(TableName.valueOf(tableName))) {

        // Wrap each row key in a Delete. A mutable list is used on purpose,
        // because it is handed to the client's batch delete call.
        List<Delete> deletes = new ArrayList<>(rowkey.length);
        for (String rk : rowkey) {
            deletes.add(new Delete(Bytes.toBytes(rk)));
        }

        table.delete(deletes);
    }
}

全表扫描

/**
 * 6. Scans the whole table and prints the row key, column family and value of
 * every cell to standard output.
 *
 * @param tableName name of the table to scan
 * @throws IOException if the cluster cannot be reached or the scan fails
 */
public static void scanAll(String tableName) throws IOException {
    // The scanner holds server-side resources for as long as it is open, so
    // it is closed together with the Table and the Connection.
    try (Connection connection = ConnectionFactory.createConnection(conf);
         Table table = connection.getTable(TableName.valueOf(tableName));
         ResultScanner scanner = table.getScanner(new Scan())) {

        // One Result per row.
        for (Result row : scanner) {
            // One Cell per stored column value.
            for (Cell cell : row.rawCells()) {
                System.out.println("rowkey为:" + Bytes.toString(CellUtil.cloneRow(cell)));
                System.out.println("列族为:" + Bytes.toString(CellUtil.cloneFamily(cell)));
                System.out.println("值为:" + Bytes.toString(CellUtil.cloneValue(cell)));
            }
        }
    }
}

删除表

/**
 * 7. Deletes a table. The table is disabled first, because the delete call
 * below is preceded by a disable in the original workflow as well.
 *
 * @param tableName name of the table to delete
 * @throws IOException if the cluster cannot be reached or the table cannot be
 *                     disabled or deleted
 */
public static void deleteTable(String tableName) throws IOException {
    TableName name = TableName.valueOf(tableName);

    // Table-level operations need an Admin. Connection and Admin are closed
    // automatically so the call does not leak cluster resources.
    try (Connection connection = ConnectionFactory.createConnection(conf);
         Admin admin = connection.getAdmin()) {

        // Only disable when needed, so that a table which is already disabled
        // can still be deleted instead of failing on the disable step.
        if (admin.isTableEnabled(name)) {
            admin.disableTable(name);
        }

        admin.deleteTable(name);
    }
}
public static void main(String[] args) throws IOException { 
    //System.out.println(isExist(“user”)); 
    //create ‘表名’,’列族名’ 
    //createTable(“ni”,”info1”,”info2”,”info3”); 
    //addData(“ni”,”shanghai”,”info1”,”name”,”lilei”); 
    //deleteRow(“ni”,”shanghai”); 
    //deleteMore(“ni”,”shanghai1”,”shanghai2”);

    //scanAll(“ni”); 
    deleteTable(“ni”); 
} 
}

2、HBase-MR

HBase主要擅长的领域是存储数据,不擅长分析数据

HBase如果想计算的话需要结合Hadoop的MapReduce

HBase-MR所需的jar包查看
bin/hbase mapredcp

配置临时环境变量

export HBASE_HOME=/root/hd/hbase-1.3.0
export HADOOP_HOME=/root/hd/hadoop-2.8.4
export HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp`
跑hbase-mr程序
bin/yarn jar /root/hd/hbase-1.3.0/lib/hbase-server-1.3.0.jar rowcounter user

3、HBase的表操作

场景一
region分片
指定列的过滤
name age high
name

代码实现
ReadLoveMapper.java

package com.hsiehchou.mr;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;

/**
 * HBase-MR mapper that reads rows from the source table and keeps only the
 * {@code info:name} cell of each row.
 *
 * <p>Input key: the {@link ImmutableBytesWritable} row key of the HBase row.
 * Input value: the {@link Result} holding that row's cells.
 * Output: the row key together with a {@link Put} carrying the retained cells.
 */
public class ReadLoveMapper extends TableMapper<ImmutableBytesWritable, Put> {

    /** Column family that is kept. */
    private static final byte[] FAMILY = Bytes.toBytes("info");

    /** Column qualifier that is kept. */
    private static final byte[] QUALIFIER = Bytes.toBytes("name");

    /**
     * Copies the {@code info:name} cells of one row into a {@link Put} and emits it.
     *
     * @param key     row key of the current row
     * @param value   all cells of the current row
     * @param context MapReduce context used to emit the output pair
     * @throws IOException          if a cell cannot be added or the write fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
        // 1. Start a Put for the same row key.
        Put put = new Put(key.get());

        // 2. Keep only cells from family "info" with qualifier "name".
        //    Comparing bytes in place avoids cloning two arrays per cell.
        for (Cell cell : value.rawCells()) {
            if (CellUtil.matchingFamily(cell, FAMILY) && CellUtil.matchingQualifier(cell, QUALIFIER)) {
                put.add(cell);
            }
        }

        // 3. Emit to the reducer. Rows without an info:name cell are skipped:
        //    an empty Put is rejected by the HBase client ("No columns to
        //    insert"), which would fail the whole job.
        if (!put.isEmpty()) {
            context.write(key, put);
        }
    }
}

WriteLoveReducer.java

package com.hsiehchou.mr;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.NullWritable;

import java.io.IOException;

/**
 * Reducer that forwards every incoming {@link Put} unchanged.
 *
 * <p>Input key: {@link ImmutableBytesWritable}.
 * Input value: {@link Put}.
 * Output key: {@link NullWritable} - the Put already carries its row key, so
 * no separate key is needed.
 */
public class WriteLoveReducer extends TableReducer<ImmutableBytesWritable, Put, NullWritable> {

    /**
     * Writes each Put of the current key group to the output.
     *
     * @param key     row key shared by all values in this group (not used for output)
     * @param values  the Puts produced by the mappers for this row key
     * @param context MapReduce context used to emit the Puts
     * @throws IOException          if writing a Put fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context) throws IOException, InterruptedException {
        final NullWritable noKey = NullWritable.get();
        final java.util.Iterator<Put> puts = values.iterator();
        while (puts.hasNext()) {
            context.write(noKey, puts.next());
        }
    }
}

LoverDriver.java

package com.hsiehchou.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class LoverDriver implements Tool {

    private Configuration conf;

    public void setConf(Configuration configuration) {
        this.conf = HBaseConfiguration.create(configuration);
    }

    public Configuration getConf() {
        return this.conf;
    }

    public int run(String[] strings) throws Exception {
        //1.创建任务
        Job job = Job.getInstance(conf);

        //2.指定运行的主类
        job.setJarByClass(LoverDriver.class);

        //3.配置job
        Scan scan = new Scan();

        //4.设置具体运行的mapper类
        TableMapReduceUtil.initTableMapperJob("love",
                    scan,
                    ReadLoveMapper.class,
                    ImmutableBytesWritable.class,
                    Put.class,
                    job
                );

        //5.设置具体运行的Reducer类
        TableMapReduceUtil.initTableReducerJob("lovemr",
                    WriteLoveReducer.class,
                    job
                );

        //6.设置reduceTask
        job.setNumReduceTasks(1);

        boolean rs = job.waitForCompletion(true);

        return rs?0:1;
    }

    public static void main(String[] args) {
        try {
            //状态码
            int sts = ToolRunner.run(new LoverDriver(), args);
            System.exit(sts);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

场景二

把HDFS中的数据导入到HBase表中
HBase-MR

代码实现

ReadHdfsMapper.java

package com.hsiehchou.mr1;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Mapper for the HDFS-to-HBase import: turns each tab-separated input line
 * into a {@link Put}.
 *
 * <p>Line layout: {@code rowkey \t name \t desc}. The values are stored as
 * {@code info:name} and {@code info:desc}.
 */
public class ReadHdfsMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {

    /** Column family that receives the imported values. */
    private static final byte[] FAMILY = Bytes.toBytes("info");

    /** Minimum number of tab-separated fields a line must have. */
    private static final int REQUIRED_FIELDS = 3;

    /**
     * Converts one input line into a Put keyed by the line's first field.
     *
     * @param key     offset of the line in the input file (unused)
     * @param value   the text of the line
     * @param context MapReduce context used to emit the output pair
     * @throws IOException          if writing the output fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        // 1. Read the line.
        String line = value.toString();

        // 2. Split it into fields.
        String[] fields = line.split("\t");

        // Skip blank or short lines and lines with an empty row key. Without
        // this guard they raise ArrayIndexOutOfBoundsException, or
        // IllegalArgumentException from Put, and abort the job.
        if (fields.length < REQUIRED_FIELDS || fields[0].isEmpty()) {
            context.getCounter("ReadHdfsMapper", "MALFORMED_LINES").increment(1);
            return;
        }

        // 3. Encode the fields.
        byte[] rowkey = Bytes.toBytes(fields[0]);
        byte[] name = Bytes.toBytes(fields[1]);
        byte[] desc = Bytes.toBytes(fields[2]);

        // 4. Build the Put.
        Put put = new Put(rowkey);
        put.addColumn(FAMILY, Bytes.toBytes("name"), name);
        put.addColumn(FAMILY, Bytes.toBytes("desc"), desc);

        // 5. Emit to the reducer.
        context.write(new ImmutableBytesWritable(rowkey), put);
    }
}

WriteHbaseReducer.java

package com.hsiehchou.mr1;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.NullWritable;

import java.io.IOException;

/**
 * Reducer for the HDFS-to-HBase import: passes every {@link Put} through
 * unchanged, using {@link NullWritable} as the output key.
 */
public class WriteHbaseReducer extends TableReducer<ImmutableBytesWritable, Put, NullWritable> {

    /**
     * Writes each Put of the current key group to the output.
     *
     * @param key     row key shared by all values in this group (not used for output)
     * @param values  the Puts produced by the mappers for this row key
     * @param context MapReduce context used to emit the Puts
     * @throws IOException          if writing a Put fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context) throws IOException, InterruptedException {
        java.util.Iterator<Put> pending = values.iterator();
        while (pending.hasNext()) {
            Put current = pending.next();
            context.write(NullWritable.get(), current);
        }
    }
}

LoveDriver.java

package com.hsiehchou.mr1;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class LoveDriver implements Tool {

    private Configuration conf = null;

    public void setConf(Configuration configuration) {
        this.conf = HBaseConfiguration.create(configuration);
    }

    public Configuration getConf() {
        return this.conf;
    }

    public int run(String[] strings) throws Exception {
        //1.创建job
        Job job = Job.getInstance();
        job.setJarByClass(LoveDriver.class);

        //2.配置mapper
        job.setMapperClass(ReadHdfsMapper.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);

        //3.配置reducer
        TableMapReduceUtil.initTableReducerJob("lovehdfs", WriteHbaseReducer.class, job);

        //4.输入配置 hdfs读数据 inputformat
        FileInputFormat.addInputPath(job,new Path("/lovehbase/"));

        //5.需要配置outputformat吗?不需要 reducer中已经指定了表

        return job.waitForCompletion(true)? 0:1;
    }

    public static void main(String[] args) {
        try {
            int sts = ToolRunner.run(new LoveDriver(),args);
            System.exit(sts);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

4、HBase优化

1)预分区问题
region分片?表很大 bigtable

分布式?数据量大

region存储数据,如果有多个region,每个region负责维护一部分的rowkey{startrowkey, endrowkey}
110001
1
2001 1980
2001~40002

分多少片?提前规划好,提高hbase的性能
进行存储数据前做好rowkey的预分区优化hbase

实际操作:
create 'user_p','info','partition',SPLITS => ['201','202','203','204']

Table Regions

Region Server Start Key End Key
hsiehchou123:16020 -∞ 201
hsiehchou124:16020 201 202
hsiehchou124:16020 202 203
hsiehchou123:16020 203 204
hsiehchou122:16020 204 +∞

create 'user_pppp','partition',SPLITS_FILE => 'partitions.txt'

partitions.txt 放在 hbase-shell 路径下

2)rowkey如何设计
rowkey是数据的唯一标识,这条数据存储在哪个分区由预分区范围决定

合理设计rowkey
如一份数据分为5个region存储
但是我们需要尽可能的保持每个region中的数据量差不多

尽可能的打散数据,平均分配到每个region中即可

解决方案:
生成随机数、hash/散列值
原本的rowkey是201,hash后
dfgyfugpgdcjhgfd11412nod
202变为:
21dqddwdgjohfxsovbxiufq12

字符串拼接:
20190316_a3d4
20190316_g04f

反转字符串:
201903161->161309102
201903162->261309102

3)HBase基础优化
HBase用的HDFS存储
DataNode允许最大文件打开数
默认4096 调大
dfs.datanode.max.transfer.threads
hdfs-site.xml

优化等待时间
dfs.image.transfer.timeout
默认60000毫秒
调大

内存优化:
hadoop-env.sh设置内存的堆大小
30%~40%最好

2G
512m

export HADOOP_PORTMAP_OPTS="-Xmx512m $HADOOP_PORTMAP_OPTS"


文章作者: 谢舟
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 谢舟 !
 上一篇
ElasticSearch(一) ElasticSearch(一)
1. 全文检索技术简介什么是搜索?搜索,就是在任何场景下,找寻你想要的信息,这个时候,会输入一段你要搜索的关键字,然后就期望找到这个关键字相关的有些信息 如何实现搜索?OA系统,比如:通过名字搜索员工等等mysql :select * fr
2019-03-18
下一篇 
HBase基础 HBase基础
1、hbasegoogle:gfs –> hdfsmapreduce –> mapreducebigtable –> hbase Apache HBase™是Hadoop数据库,是一个分布式,可扩展的大数据存储 当您需要对
2019-03-12
  目录