电信大数据


一、项目背景

通信运营商每时每刻会产生大量的通信数据,例如通话记录,短信记录,彩信记录,第三方服务资费等等繁多信息。数据量如此巨大,除了要满足用户的实时查询和展示之外,还需要定时定期的对已有数据进行离线的分析处理。例如,当日话单,月度话单,季度话单,年度话单,通话详情,通话记录等等+。我们以此为背景,寻找一个切入点,学习其中的方法论

二、项目架构

电信项目架构

三、项目实现

系统环境

系统 版本
windows 10 专业版
linux CentOS7.2 1611内核

开发工具

工具 版本
idea 2018.2.5旗舰版
maven 3.3.9
JDK 1.8+

尖叫提示:idea2018.2.5必须使用Maven3.3.9,不要使用Maven3.5,有部分兼容性问题

四、数据生产

此情此景,对于该模块的业务,即数据生产过程,一般并不会让你来进行操作,数据生产是一套完整且严密的体系,这样可以保证数据的鲁棒性。但是如果涉及到项目的一体化方案的设计(数据的产生、存储、分析、展示),则必须清楚每一个环节是如何处理的,包括其中每个环境可能隐藏的问题;数据结构,数据内容可能出现的问题

1、数据结构

我们将在HBase中存储两个电话号码,以及通话建立的时间和通话持续时间,最后再加上一个flag作为判断第一个电话号码是否为主叫。姓名字段的存储我们可以放置于另外一张表做关联查询,当然也可以插入到当前表中

列名 解释 举例
caller 第一个手机号码 15369468720
callerName 第一个手机号码人姓名(非必须) 李雁
callee 第二个手机号码 19920860202
calleename 第二个手机号码人姓名(非必须) 卫艺
dateTime 建立通话的时间 20181126091236
date_time_ts 建立通话的时间(时间戳形式)
duration 通话持续时间(秒) 0820
flag 用于标记本次通话第一个字段(caller)是主叫还是被叫 1为主叫,0为被叫

2、编写代码

思路
a)创建Java集合类存放模拟的电话号码和联系人;
b) 随机选取两个手机号码当做“主叫”与“被叫”(注意判断两个手机号不能重复),产出callercall2字段数据;
c) 创建随机生成通话建立时间的方法,可指定随机范围,最后生成通话建立时间,产出date_time字段数据;
d)随机一个通话时长,单位:秒,产出duration字段数据;
e)将产出的一条数据拼接封装到一个字符串中;
f)使用IO操作将产出的一条通话数据写入到本地文件中;

新建module项目:ct_producer

父pom.xml文件配置

<properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <!-- https://mvnrepository.com/artifact/junit/junit -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.12.4</version>
                <configuration>
                    <skipTests>true</skipTests>
                </configuration>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>

        </plugins>
    </build>

(1)随机输入一些手机号码以及联系人,保存于Java的集合中
新建类:ProductLog

package producer;

import java.io.FileOutputStream;
import java.io.OutputStreamWriter;
import java.text.DecimalFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;

public class ProductLog {

    String startTime = "2018-01-01";
    String endTime = "2018-12-31";

    //存放tel的List集合
    private List<String>  phoneList = new ArrayList<String>();
    //存放tel和Name的Map集合
    private Map<String, String> phoneNameMap = new HashMap<>();

    /**
     * 初始化数据
     */
    public void initPhone(){
        phoneList.add("17078388295");
        phoneList.add("13980337439");
        phoneList.add("14575535933");
        phoneList.add("18902496992");
        phoneList.add("18549641558");
        phoneList.add("17005930322");
        phoneList.add("18468618874");
        phoneList.add("18576581848");
        phoneList.add("15978226424");
        phoneList.add("15542823911");
        phoneList.add("17526304161");
        phoneList.add("15422018558");
        phoneList.add("17269452013");
        phoneList.add("17764278604");
        phoneList.add("15711910344");
        phoneList.add("15714728273");
        phoneList.add("16061028454");
        phoneList.add("16264433631");
        phoneList.add("17601615878");
        phoneList.add("15897468949");

        phoneNameMap.put("17078388295", "李为");
        phoneNameMap.put("13980337439", "王军");
        phoneNameMap.put("14575535933", "时俊");
        phoneNameMap.put("18902496992", "天机");
        phoneNameMap.put("18549641558", "蔡铭");
        phoneNameMap.put("17005930322", "陶尚");
        phoneNameMap.put("18468618874", "魏山帅");
        phoneNameMap.put("18576581848", "华倩");
        phoneNameMap.put("15978226424", "焦君山");
        phoneNameMap.put("15542823911", "钟尾田");
        phoneNameMap.put("17526304161", "司可可");
        phoneNameMap.put("15422018558", "官渡");
        phoneNameMap.put("17269452013", "上贵坡");
        phoneNameMap.put("17764278604", "时光机");
        phoneNameMap.put("15711910344", "李发");
        phoneNameMap.put("15714728273", "蒂冈");
        phoneNameMap.put("16061028454", "范德");
        phoneNameMap.put("16264433631", "周朝王");
        phoneNameMap.put("17601615878", "谢都都");
        phoneNameMap.put("15897468949", "刘何思");
    }

(2)创建随机生成通话时间的方法:randomDate
该时间生成后的格式为yyyy-MM-dd HH:mm:ss,并使之可以根据传入的起始时间和结束时间来随机生成

   /**
     * 注:传入时间要在时间[startTime, endTime]
     * 公式:起始时间 + (结束时间 - 起始时间)* Math.random()
     * @param startTime
     * @param endTime
     */
    private String randomBuildTime(String startTime, String endTime) {

        try {
            SimpleDateFormat sdf1 = new SimpleDateFormat("yyyy-MM-dd");
            Date startDate = sdf1.parse(startTime);
            Date endDate = sdf1.parse(endTime);

            if(endDate.getTime() <= startDate.getTime()){
                return null;
            }
            //公式:起始时间 + (结束时间 - 起始时间)* Math.random()
            long randomTs = startDate.getTime() + (long) ((endDate.getTime() - startDate.getTime()) * Math.random());

            Date resultDate = new Date(randomTs);
            SimpleDateFormat sdf2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            String resultTimeString = sdf2.format(resultDate);

            return resultTimeString;
        } catch (ParseException e) {
            e.printStackTrace();
        }

        return null;
    }

(3)创建生产日志一条日志的方法:productLog
随机抽取两个电话号码,随机产生通话建立时间,随机通话时长,将这几个字段拼接成一个字符串,然后return,便可以产生一条通话的记录。需要注意的是,如果随机出的两个电话号码一样,需要重新随机(随机过程可优化,但并非此次重点)。通话时长的随机为20分钟以内,即:60秒 * 30,并格式化为4位数字,例如:0600(10分钟)

   /**
     * 产生数据
     * 格式: caller,callee,buildTime,duration
     * @return
     */
    public String product(){
        //ctrl + d 复制此行 , ctrl + x 剪切此行 ,ctrl + y 删除此行
        //主叫
        String caller = null;
        String callerName = null;
        //被叫
        String callee = null;
        String calleeName = null;

        //ctrl + alt + v 推导出前面的对象类型  Home前  End后
        int callerIndex = (int) (Math.random() * phoneList.size());
        caller = phoneList.get(callerIndex);
        callerName = phoneNameMap.get(caller);

        while(true) {
            //ctrl + shift + 下  :下移这行
            int calleeIndex = (int) (Math.random() * phoneList.size());
            callee = phoneList.get(calleeIndex);
            calleeName = phoneNameMap.get(callee);
            if(!caller.equals(callee)) break;
        }

        //第三个字段
        String buildTime = randomBuildTime(startTime, endTime);

        //第四个字段,最多时长
        DecimalFormat df = new DecimalFormat("0000");
        String duration = df.format((int) 30 * 60 * Math.random());

        StringBuilder sb = new StringBuilder();
        sb.append(caller + ",").append(callee + ",").append(buildTime + ",").append( duration);

        return sb.toString();
    }

(4)创建写入日志方法:writeLog
productLog每产生一条日志,便将日志写入到本地文件中,所以建立一个专门用于日志写入的方法,需要涉及到IO操作,需要注意的是,输出流每次写一条日之后需要flush,不然可能导致积攒多条数据才输出一次。最后需要将productLog方法放置于while死循环中执行

/**
 * 把数据写到文件当中
 * @param filePath
 */
public void writeLog(String filePath){
    try {
        OutputStreamWriter osw = new OutputStreamWriter(new FileOutputStream(filePath, true), "UTF-8");
        while(true){
            Thread.sleep(200);
            String log = product();
            System.out.println(log);
             //一定要手动flush才可以确保每条数据都写入到文件一次
            osw.write(log + "\n");
            osw.flush();
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}

(5)在主函数中初始化以上逻辑,并测试:

 public static void main(String[] args) {
        //args = new String[]{"E:\\CT Project file\\calllog.csv"};
        if(args == null || args.length <= 0){
            System.out.println("没写路径");
            return ;
        }
        ProductLog productLog = new ProductLog();
        productLog.initPhone();
        productLog.writeLog(args[0]);
    }

3、打包测试

1)Maven打包方式
分别在Windows上和Linux中进行测试:
java -cp jar包的绝对路径 全类名 输出路径

2)将此包放在在/opt/jar下面,并写如下脚本

product.sh

#!bin.bash
java -cp /opt/jars/CT_producer-1.0-SNAPSHOT.jar producer.ProductLog /opt/jars/calllog.csv

3)运行
sh product.sh
产生calllog.csv文件

五、数据采集/消费(存储)

欢迎来到数据采集模块(消费),在企业中你要清楚流式数据采集框架Flume和Kafka的定位是什么。我们在此需要将实时数据通过Flume采集到Kafka然后供给给HBase消费

Flume:Cloudera公司研发
适合下游数据消费者不多的情况;
适合数据安全性要求不高的操作;
适合与Hadoop生态圈对接的操作

Kafka:linkedin公司研发
适合数据下游消费众多的情况;
适合数据安全性要求较高的操作(支持replication);

因此我们常用的一种模型是
线上数据 –> Flume –> Kafka –> Flume(根据情景增删该流程) –> HDFS

消费存储模块流程图

消费存储模块流程图

1、数据采集:采集实时产生的数据到kafka集群

0)基础配置

  • 配置Kafka 略
  • 配置Flume(flume2kafka.conf)
# define
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F -c +0 /opt/jars/calllog.csv
a1.sources.r1.shell = /bin/bash -c

# sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.brokerList = hsiehchou121:9092,hsiehchou122:9092,hsiehchou123:9092
a1.sinks.k1.topic = calllog
a1.sinks.k1.batchSize = 20
a1.sinks.k1.requiredAcks = 1

# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

1)进入Flume根目录下,启动flume
/opt/module/flume-1.8.0/bin/flume-ng agent –conf /opt/module/flume-1.8.0/conf/ –name a1 –conf-file /opt/jars/flume2kafka.conf

2)运行生产日志的任务脚本,观察kafka控制台消费者是否成功显示产生的数据
$ sh productlog.sh

2、编写代码:数据消费(HBase)

如果以上操作均成功,则开始编写操作HBase的代码,用于消费数据,将产生的数据实时存储在HBase中

思路
a) 编写Kafka消费者,读取kafka集群中缓存的消息,并打印到控制台以观察是否成功;

b)既然能够读取到kafka中的数据了,就可以将读取出来的数据写入到HBase中,所以编写调用HBaseAPI相关方法,将从Kafka中读取出来的数据写入到HBase;

c) 以上两步已经足够完成消费数据,存储数据的任务,但是涉及到解耦,所以过程中需要将一些属性文件外部化,HBase通用性方法封装到某一个类中

创建新的module项目:ct_consumer

pom.xml文件配置

<dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.11.0.2</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-client -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.3.0</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-server -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>1.3.0</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.13</version>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.12.4</version>
                <configuration>
                    <skipTests>true</skipTests>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

1)新建类:HBaseConsumer(kafka的package)
该类主要用于读取kafka中缓存的数据,然后调用HBaseAPI,持久化数据

package kafka;

import hbase.HBaseDao;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import utils.PropertiesUtil;

import java.util.Arrays;

public class HBaseConsumer {
    public static void main(String[] args) {
        //消费者API
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(PropertiesUtil.properties);
        //kafka Topic
        kafkaConsumer.subscribe(Arrays.asList(PropertiesUtil.getProperty("kafka.topics")));

        //创建写入HBase的对象
        HBaseDao hd = new HBaseDao();
        while(true) {
            //消费拉取数据
            ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
            //遍历打印数据
            for(ConsumerRecord<String, String> cr : records){
                String value = cr.value();
                //13980337439,16264433631,2018-02-08 10:27:32,1740
                System.out.println(value);
                //把数据写入到HBase中
                hd.put(value);
            }
        }
    }
}

2) 新建类:PropertiesUtil(utils的package)
该类主要用于将常用的项目所需的参数外部化,解耦,方便配置

package utils;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

public class PropertiesUtil {
    public static Properties properties = null;

    static {
        //ctrl + alt + v
        InputStream is = ClassLoader.getSystemResourceAsStream("hbase_consumer.properties");
        properties = new Properties();
        try {
            properties.load(is);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static String getProperty(String key){
        return properties.getProperty(key);
    }
}

3) 创建kafka.properties文件,并放置于resources目录下

# 设置kafka的brokerlist
bootstrap.servers=hsiehchou121:9092,hsiehchou122:9092,hsiehchou123:9092
# 设置消费者所属的消费组
group.id=hbase_consumer_group
# 设置是否自动确认offset
enable.auto.commit=true
# 自动确认offset的时间间隔
auto.commit.interval.ms=30000
# 设置key,value的反序列化类的全名
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

# 以下为自定义属性设置
# 设置本次消费的主题
kafka.topics=calllog

# 设置HBase的一些变量
hbase.calllog.regions=6
hbase.calllog.namespace=ns_ct
hbase.calllog.tablename=ns_ct:calllog

4)将hdfs-site.xml、core-site.xml、hbase-site.xml、log4j.properties放置于resources目录

5)新建类:HBaseUtil(utils的package)
该类主要用于封装一些HBase的常用操作,比如创建命名空间,创建表等等

package utils;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;
import java.text.DecimalFormat;
import java.util.Iterator;
import java.util.TreeSet;

/**
 * 1、NameSpace ====>  命名空间
 * 2、createTable ===> 表
 * 3、isTable   ====>  判断表是否存在
 * 4、Region、RowKey、分区键
 */
public class HBaseUtil {

    /**
     * 初始化命名空间
     *
     * @param conf  配置对象
     * @param namespace 命名空间的名字
     */
    public static void initNameSpace(Configuration conf, String namespace) throws IOException {
        //获取链接connection
        Connection connection = ConnectionFactory.createConnection(conf);
        //获取admin对象
        Admin admin = connection.getAdmin();

        //创建命名空间,命名空间描述器
        NamespaceDescriptor nd = NamespaceDescriptor
                .create(namespace)
                //add配置信息不强制加
                .addConfiguration("create_time", String.valueOf(System.currentTimeMillis()))
                .build();

        //通过admin对象创建namespace
        admin.createNamespace(nd);

        close(admin,connection);
    }

    /**
     * 初始化表
     *
     * @param conf
     * @param tableName
     * @param regions
     * @param columnFamily
     */
    public static void createTable(Configuration conf, String tableName, int regions, String... columnFamily) throws IOException {
        //获取链接connection
        Connection connection = ConnectionFactory.createConnection(conf);
        //获取admin对象
        Admin admin = connection.getAdmin();

        //如果表已存在,就返回
        if (isExistTable(conf, tableName)){
            return ;
        }

        //创建表对象
        HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName));

        for (String cf : columnFamily){
            htd.addFamily(new HColumnDescriptor(cf));
        }

        //添加协处理器的全类名
        htd.addCoprocessor("hbase.CalleeWriteObserver");

        //通过admin创建表(htd(列族),分裂的regions)
        admin.createTable(htd, getSplitKeys(regions));

        //关闭
        close(admin, connection);

    }

    /**
     * 分区
     *
     * @param regions
     * @return
     */
    private static byte[][] getSplitKeys(int regions) {
        //第一步:定义分区键数组
        String[] keys = new String[regions];

        //分区位数格式化
        DecimalFormat df = new DecimalFormat("00");

        //  00|01|02|03|04|05
        for (int i = 0; i < regions; i++){
            keys[i] = df.format(i) + "|";
        }

        //第二步
        byte[][] splitsKeys = new byte[regions][];

        //分区间有序
        TreeSet<byte[]> treeSet = new TreeSet<>(Bytes.BYTES_COMPARATOR);
        for (int i = 0; i < regions; i++){
            treeSet.add(Bytes.toBytes(keys[i]));
        }

        //第三步
        Iterator<byte[]> splitKeysIterator = treeSet.iterator();
        int index = 0;
        while (splitKeysIterator.hasNext()){
            byte[] next = splitKeysIterator.next();
            splitsKeys[index++] = next;
        }

        return splitsKeys;
    }


    /**
     * 判断表是否存在
     *
     * @param conf
     * @param tableName
     */
    public static boolean isExistTable(Configuration conf, String tableName) throws IOException {
        //获取链接connection
        Connection connection = ConnectionFactory.createConnection(conf);
        //获取admin对象
        Admin admin = connection.getAdmin();
        //判断表API
        boolean b = admin.tableExists(TableName.valueOf(tableName));
        //关闭
        close(admin, connection);

        return b;
    }

    /**
     * 关闭
     *
     * @param admin
     * @param connection
     */
    public static void close(Admin admin, Connection connection) throws IOException {
        if (admin != null) {
            admin.close();
        }
        if (connection != null) {
            connection.close();
        }
    }

    /**
     * regionCode, caller, buildTime, callee, flag, duration
     * regionCode(rowkey前的离散串)
     * duration(通话建立时间)
     * 主叫(flag:1):13980337439,16264433631,2018-02-08 10:27:32,1740   ==>f1列族
     * 被叫(flag:0):16264433631,13980337439,2018-02-08 10:27:32,1740   ==>f2列族
     *
     * 面试常问rowkey相关的问题:你们公司如何设计的RowKey?怎么设计RowKey才能避免热点问题(频繁访问某个区)?
     *
     * @param regionCode 散列的键
     * @param caller     叫
     * @param buildTime  建立时间
     * @param callee     被叫
     * @param flag       标明是主叫还是被叫
     * @param duration   通话持续时间
     * @return
     */
    public static String getRowKey(String regionCode, String caller, String buildTime, String callee, String flag, String duration){
        StringBuilder sb = new StringBuilder();
        sb.append(regionCode + "_")
                .append(caller + "_")
                .append(buildTime + "_")
                .append(callee + "_")
                .append(flag + "_")
                .append(duration);

        return sb.toString();
    }

    /**
     * 当数据进入HBase的Region的时候是足够的离散
     *
     * @param caller 主叫
     * @param buildTime 通话建立时间
     * @param regions region个数
     * @return 返回分区号
     */
    public static String getRegionCode(String caller, String buildTime, int regions){
        //取出主叫的后四位,lastPhone caller最后的后四位
        String lastPhone = caller.substring(caller.length() - 4);

        //取出年月   2018-02-08 10:27:32,1740 中取出年月
        String yearMonth = buildTime
                .replaceAll("-", "")
                .replaceAll(":", "")
                .replaceAll(" ", "")
                .substring(0, 6);

        //离散操作1:做异或处理 ^
        Integer x = Integer.valueOf(lastPhone) ^ Integer.valueOf(yearMonth);

        //离散操作2:把离散1的值再做hashcode
        int y = x.hashCode();

        //最终想要的分区号
        int regionCode = y % regions;

        DecimalFormat df = new DecimalFormat("00");

        return df.format(regionCode);
    }
}

6)新建类:ConnectionInstance(utils的package)

package utils;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

import java.io.IOException;

public class ConnectionInstance {
    private static Connection conn;

    public static synchronized Connection getConnection(Configuration configuration) {
        try {
            if (conn == null || conn.isClosed()) {
                conn = ConnectionFactory.createConnection(configuration);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return conn;
    }
}

7)新建类:HBaseDAO(完成以下内容后,考虑数据put的效率如何优化)(hbase的package)
该类主要用于执行具体的保存数据的操作,rowkey的生成规则等等

package hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import utils.ConnectionInstance;
import utils.HBaseUtil;
import utils.PropertiesUtil;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.List;

public class HBaseDao {

    public static final Configuration CONF;
    private String namespace;
    private int regions;
    private String tableName;
    private HTable table;
    private Connection connection;
    private SimpleDateFormat sdf1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    private SimpleDateFormat sdf2 = new SimpleDateFormat("yyyyMMddHHmmss");

    //用来存放一小堆数据(30行),用于优化
    private List<Put> cacheList = new ArrayList<>();

    static {
        CONF = HBaseConfiguration.create();
    }

    //Alt + Insert  Constructor
    /**
     * 用于构造命名空间和表
     */
    public HBaseDao() {
        try {
            namespace = PropertiesUtil.getProperty("hbase.calllog.namespace");
            tableName = PropertiesUtil.getProperty("hbase.calllog.tablename");
            regions = Integer.valueOf(PropertiesUtil.getProperty("hbase.calllog.regions"));

            if (!HBaseUtil.isExistTable(CONF, tableName)){
                HBaseUtil.initNameSpace(CONF, namespace);
                HBaseUtil.createTable(CONF, tableName, regions, "f1", "f2");
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     *
     * @param value 13980337439,16264433631,2018-02-08 10:27:32,1740
     */
    public void put(String value) {
        try {
            if(cacheList.size() == 0){
                connection = ConnectionInstance.getConnection(CONF);
                table = (HTable) connection.getTable(TableName.valueOf(tableName));
                table.setAutoFlushTo(false);
                table.setWriteBufferSize(2 * 1024 * 1024);
            }

            //如果出现下标越界异常
            String[] splitValue = value.split(",");
            String caller = splitValue[0];
            String callee = splitValue[1];
            String buildTime = splitValue[2];
            String duration = splitValue[3];

            //散列得分区号
            String regionCode = HBaseUtil.getRegionCode(caller, buildTime, regions);

            //这个变量用于插入到HBase的列中
            String buildTimeReplace = sdf2.format(sdf1.parse(buildTime));
            //作为rowkey所需的参数
            String buildTimeTs = String.valueOf(sdf1.parse(buildTime).getTime());

            String rowkey = HBaseUtil.getRowKey(regionCode, caller, buildTimeReplace, callee, "1", duration);

            Put put = new Put(Bytes.toBytes(rowkey));
            //通过put对象添加rowkey和列值,参数说明:(列族:f1),(列名:caller),(列值:caller)
            //快捷键:ctrl + d
            put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("caller"), Bytes.toBytes(caller));
            put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("callee"), Bytes.toBytes(callee));
            put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("build_time"), Bytes.toBytes(buildTimeReplace));
            put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("build_time_ts"), Bytes.toBytes(buildTimeTs));
            put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("flag"), Bytes.toBytes("1"));
            put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("duration"), Bytes.toBytes(duration));

            //把rowkey,列族,列名,列值放到cacheList的对象中
            cacheList.add(put);

            if(cacheList.size() >= 30) {
                table.put(cacheList);
                table.flushCommits();

                table.close();
                cacheList.clear();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

8)新建类:HBaseDao(hbase的package)

package hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import utils.ConnectionInstance;
import utils.HBaseUtil;
import utils.PropertiesUtil;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.List;

public class HBaseDao {

    public static final Configuration CONF;
    private String namespace;
    private int regions;
    private String tableName;
    private HTable table;
    private Connection connection;
    private SimpleDateFormat sdf1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    private SimpleDateFormat sdf2 = new SimpleDateFormat("yyyyMMddHHmmss");

    //用来存放一小堆数据(30行),用于优化
    private List<Put> cacheList = new ArrayList<>();

    static {
        CONF = HBaseConfiguration.create();
    }

    //Alt + Insert  Constructor
    /**
     * 用于构造命名空间和表
     */
    public HBaseDao() {
        try {
            namespace = PropertiesUtil.getProperty("hbase.calllog.namespace");
            tableName = PropertiesUtil.getProperty("hbase.calllog.tablename");
            regions = Integer.valueOf(PropertiesUtil.getProperty("hbase.calllog.regions"));

            if (!HBaseUtil.isExistTable(CONF, tableName)){
                HBaseUtil.initNameSpace(CONF, namespace);
                HBaseUtil.createTable(CONF, tableName, regions, "f1", "f2");
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     *
     * @param value 13980337439,16264433631,2018-02-08 10:27:32,1740
     */
    public void put(String value) {
        try {
            if(cacheList.size() == 0){
                connection = ConnectionInstance.getConnection(CONF);
                table = (HTable) connection.getTable(TableName.valueOf(tableName));
                table.setAutoFlushTo(false);
                table.setWriteBufferSize(2 * 1024 * 1024);
            }

            //如果出现下标越界异常
            String[] splitValue = value.split(",");
            String caller = splitValue[0];
            String callee = splitValue[1];
            String buildTime = splitValue[2];
            String duration = splitValue[3];

            //散列得分区号
            String regionCode = HBaseUtil.getRegionCode(caller, buildTime, regions);

            //这个变量用于插入到HBase的列中
            String buildTimeReplace = sdf2.format(sdf1.parse(buildTime));
            //作为rowkey所需的参数
            String buildTimeTs = String.valueOf(sdf1.parse(buildTime).getTime());

            String rowkey = HBaseUtil.getRowKey(regionCode, caller, buildTimeReplace, callee, "1", duration);

            Put put = new Put(Bytes.toBytes(rowkey));
            //通过put对象添加rowkey和列值,参数说明:(列族:f1),(列名:caller),(列值:caller)
            //快捷键:ctrl + d
            put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("caller"), Bytes.toBytes(caller));
            put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("callee"), Bytes.toBytes(callee));
            put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("build_time"), Bytes.toBytes(buildTimeReplace));
            put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("build_time_ts"), Bytes.toBytes(buildTimeTs));
            put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("flag"), Bytes.toBytes("1"));
            put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("duration"), Bytes.toBytes(duration));

            //把rowkey,列族,列名,列值放到cacheList的对象中
            cacheList.add(put);

            if(cacheList.size() >= 30) {
                table.put(cacheList);
                table.flushCommits();

                table.close();
                cacheList.clear();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

3、运行测试:HBase消费数据

尖叫提示:请将Linux允许打开的文件个数和进程数进行优化,优化RegionServer与Zookeeper会话的超时时间。(参考HBase文档中优化章节)
项目成功后,则将项目打包后在linux中运行测试

打包HBase消费者代码
a) 在windows中,进入工程的pom.xml所在目录下(建议将该工程的pom.xml文件拷贝到其他临时目录中,例如我把pom.xml文件拷贝到了F:\maven-lib\目录下),然后使用mvn命令下载工程所有依赖的jar包
mvn -DoutputDirectory=./lib -DgroupId=com.hsiehchou -DartifactId=ct_consumer -Dversion=r-1.0-SNAPSHOT dependency:copy-dependencies
b) 使用maven打包工程
c) 测试执行该jar包

方案一:推荐,使用通配符,将所有依赖加入到classpath中,不可使用.jar的方式
注意:如果是在Linux中实行,注意文件夹之间的分隔符。自己的工程要单独在cp中指定,不要直接放在maven-lib/lib目录下
java -cp F:\maven-lib\CT_consumerr-1.0-SNAPSHOT.jar;F:\maven-lib\lib\
com.hsiehchou.CT_kafka.HBaseConsumer

方案二:最最推荐,使用java.ext.dirs参数将所有依赖的目录添加进classpath中
注意:-Djava.ext.dirs=属性后边的路径不能为”~”
java -Djava.ext.dirs=F:\maven-lib\lib\ -cp F:\maven-lib\CT_consumerr-1.0-SNAPSHOT.jar com.hsiehchou.CT_consumer.kafka.HBaseConsumer

4、编写代码:优化数据存储方案

现在我们要使用HBase查找数据时,尽可能的使用rowKey去精准的定位数据位置,而非使用ColumnValueFilter或者SingleColumnValueFilter,按照单元格Cell中的Value过滤数据,这样做在数据量巨大的情况下,效率是极低的——如果要涉及到全表扫描。所以尽量不要做这样可怕的事情。注意,这并非ColumnValueFilter就无用武之地。现在,我们将使用协处理器,将数据一分为二

思路

a)编写协处理器类,用于协助处理HBase的相关操作(增删改查)
b)在协处理器中,一条主叫日志成功插入后,将该日志切换为被叫视角再次插入一次,放入到与主叫日志不同的列族中
c)重新创建hbase表,并设置为该表设置协处理器
d)编译项目,发布协处理器的jar包到hbase的lib目录下,并群发该jar包
e)修改hbase-site.xml文件,设置协处理器,并群发该hbase-site.xml文件

编码

1) 新建协处理器类:CalleeWriteObserver,并覆写postPut方法,该方法会在数据成功插入之后被回调(hbase的package)

package hbase;

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import utils.HBaseUtil;
import utils.PropertiesUtil;

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;

public class CalleeWriteObserver extends BaseRegionObserver {

    SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss");

    //ctrl + 0
    /**
     * 插入主叫数据后,随即插入被叫数据
     * @param e
     * @param put
     * @param edit
     * @param durability
     * @throws IOException
     */
    @Override
    public void postPut(ObserverContext<RegionCoprocessorEnvironment> e,
                        Put put,
                        WALEdit edit,
                        Durability durability) throws IOException {
        //注意:一定到删除super.postPut(e, put, edit, durability);

        //操作的目标表
        String targetTableName = PropertiesUtil.getProperty("hbase.calllog.tablename");

        //当前操作put后的表
        String currentTableName = e.getEnvironment().getRegionInfo().getTable().getNameAsString();

        //不是同一个表返回
        if (!targetTableName.equals(currentTableName)){
            return;
        }

        //05_18902496992_20180720182543_14575535933_1_0076
        String oriRowKey = Bytes.toString(put.getRow());

        System.out.println(oriRowKey);

        String[] splitOriRowKey = oriRowKey.split("_");
        String caller = splitOriRowKey[1];
        String callee = splitOriRowKey[3];
        String buildTime = splitOriRowKey[2];
        String duration = splitOriRowKey[5];

        //如果当前插入的是被叫数据,则直接返回(因为默认提供的数据全部为主叫数据)
        String flag = splitOriRowKey[4];

        String calleeflag = "0";

        if (flag.equals(calleeflag) ){
            return;
        }
        flag = calleeflag;

        Integer regions = Integer.valueOf(PropertiesUtil.getProperty("hbase.calllog.regions"));

        String regionCode = HBaseUtil.getRegionCode(callee, buildTime, regions);

        String calleeRowKey = HBaseUtil.getRowKey(regionCode, callee, buildTime, caller, flag, duration);

        String buildTimeTs = "";
        try {
            buildTimeTs = String.valueOf(sdf.parse(buildTime).getTime());
        } catch (ParseException e1) {
            e1.printStackTrace();
        }

        Put calleePut = new Put(Bytes.toBytes(calleeRowKey));
        calleePut.addColumn(Bytes.toBytes("f2"), Bytes.toBytes("callee"), Bytes.toBytes(caller));
        calleePut.addColumn(Bytes.toBytes("f2"), Bytes.toBytes("caller"), Bytes.toBytes(callee));
        calleePut.addColumn(Bytes.toBytes("f2"), Bytes.toBytes("build_time"), Bytes.toBytes(buildTime));
        calleePut.addColumn(Bytes.toBytes("f2"), Bytes.toBytes("build_time_ts"), Bytes.toBytes(buildTimeTs));
        calleePut.addColumn(Bytes.toBytes("f2"), Bytes.toBytes("flag"), Bytes.toBytes(flag));
        calleePut.addColumn(Bytes.toBytes("f2"), Bytes.toBytes("duration"), Bytes.toBytes(duration));

        Bytes.toBytes(100L);

        Table table = e.getEnvironment().getTable(TableName.valueOf(targetTableName));
        table.put(calleePut);
        table.close();
    }
}

2)重新创建HBase表,并设置为该表设置协处理器。在“表描述器”中调用addCoprocessor方法进行协处理器的设置,大概是这样的:(你需要找到你的建表的那部分代码,添加如下逻辑)
tableDescriptor.addCoprocessor(“hbase.CalleeWriteObserver”);

5、运行测试:协处理器

重新编译项目,发布jar包到hbase的lib目录下(注意需群发):
$ scp -r CT_consumer-1.0-SNAPSHOT.jar root@hsiehchou121:pwd

重新修改hbase-site.xml

    <property>
        <name>hbase.coprocessor.region.classes</name>
        <value>hbase.CalleeWriteObserver</value>
    </property>

完成以上步骤后,重新消费数据进行测试

6、编写测试单元:范围查找数据

思路
a)已知要查询的手机号码以及起始时间节点和结束时间节点,查询该节点范围内的该手机号码的通话记录

b)拼装startRowKey和stopRowKey,即扫描范围,要想拼接出扫描范围,首先需要了解rowkey组成结构,我们再来复习一下,举个大栗子
rowkey:
分区号_手机号码1_通话建立时间_手机号码2_主(被)叫标记_通话持续时间
01_15837312345_20180725071833_1_0180

c)比如按月查询通话记录,则startRowKey举例:
regionHash_158373123456_20180805010000
stopRowKey举例:
regionHash_158373123456_20180805010000

注意:startRowKey和stopRowKey设计时,后面的部分已经被去掉

尖叫提示:rowKey的扫描范围为前闭后开

尖叫提示:rowKey默认是有序的,排序规则为字符的按位比较
d)如果查找所有的,需要多次scan表,每次scan设置为下一个时间窗口即可,该操作可放置于for循环中

编码
e)运行测试
观察是否已经按照时间范围查询出对应的数据

7、将数据从本地读取到HBase

1)启动ZooKeeper(配置了全局环境变量)

zkServer.sh start

2)启动Kafka(配置了全局环境变量)

kafka-server-start.sh /root/hd/kafka/config/server.properties &  

创建主题

bin/kafka-topics.sh --zookeeper hsiehchou121:2181 --topic calllog --create --replication-factor 1 --partitions 3

列出所有主题

bin/kafka-topics.sh --zookeeper hsiehchou121:2181 --list

启动 Kafka消费者

bin/kafka-console-consumer.sh --bootstrap-server hsiehchou121:9092 --topic calllog --from-beginning 

3)启动Hadoop(配置了全局环境变量)

start-all.sh

4)启动HBase(配置了全局环境变量)
start-hbase.sh

5)启动Flume(没有配置全局环境变量,去flume目录下)

bin/flume-ng agent --conf conf/ --name a1 --conf-file myagent/flume2kafka.conf

6)IDEA打包CT_consumer.jar
此jar包要放入hbase的lib下面,不然HBase写不进数据

7)在IDEA里面运行HBaseConsumer.java

8、总结

数据(本地)->Flume采集数据->Kafka消费数据->HBase

六、数据分析

我们的数据已经完整的采集到了HBase集群中,这次我们需要对采集到的数据进行分析,统计出我们想要的结果。注意,在分析的过程中,我们不一定会采取一个业务指标对应一个MapReduce-Job的方式,如果情景允许,我们会采取一个MapReduce分析多个业务指标的方式来进行任务

数据分析模块流程图

数据分析模块流程图

业务指标
a)用户每天主叫通话个数统计,通话时间统计
b)用户每月通话记录统计,通话时间统计
c)用户之间亲密关系统计。(通话次数与通话时间体现用户亲密关系)

1、MySQL表结构设计

我们将分析的结果数据保存到Mysql中,以方便Web端进行查询展示
1)表:db_telecom.tb_contacts

用于存放用户手机号码与联系人姓名

备注 类型
id 自增主键 int(11) NOT NULL
telephone 手机号码 varchar(255) NOT NULL
name 联系人姓名 varchar(255) NOT NULL

2)表:db_telecom.tb_call

用于存放某个时间维度下通话次数与通话时长的总和

备注 类型
id_date_contact 复合主键(联系人维度id,时间维度id) varchar(255) NOT NULL
id_date_dimension 时间维度id int(11) NOT NULL
id_contact 查询人的电话号码 int(11) NOT NULL
call_sum 通话次数总和 int(11) NOT NULL DEFAULT 0
call_duration_sum 通话时长总和 int(11) NOT NULL DEFAULT 0

3)表:db_telecom.tb_dimension_date

用于存放时间维度的相关数据

备注 类型
id 自增主键 int(11) NOT NULL
year 年,当前通话信息所在年 int(11) NOT NULL
month 月,当前通话信息所在月,如果按照年来统计信息,则month为-1 int(11) NOT NULL
day 日,当前通话信息所在日,如果是按照月来统计信息,则day为-1 int(11) NOT NULL

MySQL的建表语句:

CREATE TABLE tb_call (
id_date_contact varchar(255) NOT NULL,
id_date_dimension int(11) NOT NULL,
id_contact int(11) NOT NULL,
call_sum int(11) NOT NULL,
call_duration_sum int(11) NOT NULL,
PRIMARY KEY (id_date_contact)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

CREATE TABLE tb_contacts (
id int(11) NOT NULL AUTO_INCREMENT,
telephone varchar(255) NOT NULL,
name varchar(255) NOT NULL,
PRIMARY KEY (id)
) ENGINE=InnoDB AUTO_INCREMENT=21 DEFAULT CHARSET=utf8;

CREATE TABLE tb_dimension_date (
id int(11) NOT NULL AUTO_INCREMENT,
year int(11) NOT NULL,
month int(11) NOT NULL,
day int(11) NOT NULL,
PRIMARY KEY (id)
) ENGINE=InnoDB AUTO_INCREMENT=263 DEFAULT CHARSET=utf8;

2、需求:按照不同的维度统计通话

根据需求目标,设计出如上表结构。我们需要按照时间范围(年月日),结合MapReduce统计出所属时间范围内所有手机号码的通话次数总和以及通话时长总和。
思路:
a)维度,即某个角度,某个视角,按照时间维度来统计通话,比如我想统计2018年所有月份所有日子的通话记录,那这个维度我们大概可以表述为2018年**
b)通过Mapper将数据按照不同维度聚合给Reducer
c)通过Reducer拿到按照各个维度聚合过来的数据,进行汇总,输出。
d)根据业务需求,将Reducer的输出通过Outputformat把数据
数据输入:HBase
数据输出:MySQL
HBase中数据源结构:

标签 举例&说明
rowkey hashregion_caller_datetime_callee_flag_duration; 01_15837312345_20180527081033_13766889900_1_0180
family f1列族:存放主叫信息; f2列族:存放被叫信息
caller 第一个手机号码
callee 第二个手机号码
date_time 通话建立的时间,例如:20181017081520
date_time_ts date_time对应的时间戳形式
duration 通话时长(单位:秒)
flag 标记caller是主叫还是被叫(caller的身份与call2的身份互斥)

a)已知目标,那么需要结合目标思考已有数据是否能够支撑目标实现;
b) 根据目标数据结构,构建MySQL表结构,建表;
c)思考代码需要涉及到哪些功能模块,建立不同功能模块对应的包结构。
d)描述数据,一定是基于某个维度(视角)的,所以构建维度类。比如按照“年”与“手机号码”的组合作为key聚合所有的数据,便可以统计这个手机号码,这一年的相关结果
e)自定义OutputFormat用于对接MySQL,使数据输出
f)创建相关工具类

3、环境准备

1) 新建module:ct_analysis
pom文件配置

  <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.13</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.3.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>1.3.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-common</artifactId>
            <version>1.3.0</version>
        </dependency>

        <dependency>
            <groupId>org.aspectj</groupId>
            <artifactId>aspectjweaver</artifactId>
            <version>1.8.10</version>
        </dependency>

        <!--简化javabean-->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.16.18</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.12.4</version>
                <configuration>
                    <skipTests>true</skipTests>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

2)创建包结构,根包:com.hsiehchou(不同颜色代表不同层级的递进)

CT_XZ

直接看CT_analysis

3) 类表
| 类名 | 备注 |
| :——–: | :——–:|
| DimensionConverter | 负责实际的维度转id功能接口|
| DimensionConverterImpl | DimensionConverter 实现类,负责实际的维度转id功能 |
| BaseDimension | 维度(key)基类 |
| BaseValue | 值(value)基类 |
| ComDimension | 时间维度+联系人维度的组合维度 |
| ContactDimension | 联系人维度 |
| DateDimension | 时间维度 |
| CountDurationValue | 通话次数与通话时长的封装 |
| CountDurationMapper | 数据分析的Mapper类,继承自TableMapper |
| MysqlOutputFormat | 自定义Outputformat,对接Mysql |
| CountDurationReducer | 数据分析的Reducer类,继承自Reduccer |
| CountDurationRunner | 数据分析的驱动类,组装Job |
| JDBCInstance | 获取连接实例 |
| JDBCUtils | 连接Mysql的工具类 |
| LRUCache | 用于缓存已知的维度id,减少对mysql的操作次数,提高效率 |

4、编写代码:数据分析

1)创建类:CountDurationMapper

package mapper;

import kv.key.ComDimension;
import kv.key.ContactDimension;
import kv.key.DateDimension;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class CountDurationMapper extends TableMapper<Comparable, Text> {

    private ComDimension comDimension = new ComDimension();
    private Text durationText = new Text();
    private Map<String, String> phoneMap;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        phoneMap = new HashMap<>(20);
        //批量修改名字Ctrl + Alt + Shift + J
        phoneMap.put("17078388295", "李为");
        phoneMap.put("13980337439", "王军");
        phoneMap.put("14575535933", "时俊");
        phoneMap.put("18902496992", "天机");
        phoneMap.put("18549641558", "蔡铭");
        phoneMap.put("17005930322", "陶尚");
        phoneMap.put("18468618874", "魏山帅");
        phoneMap.put("18576581848", "华倩");
        phoneMap.put("15978226424", "焦君山");
        phoneMap.put("15542823911", "钟尾田");
        phoneMap.put("17526304161", "司可可");
        phoneMap.put("15422018558", "官渡");
        phoneMap.put("17269452013", "上贵坡");
        phoneMap.put("17764278604", "时光机");
        phoneMap.put("15711910344", "李发");
        phoneMap.put("15714728273", "蒂冈");
        phoneMap.put("16061028454", "范德");
        phoneMap.put("16264433631", "周朝王");
        phoneMap.put("17601615878", "谢都都");
        phoneMap.put("15897468949", "刘何思");
    }

    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {

        //05_18902496992_1525454104000_15711910344_1_1705
        String rowkey = Bytes.toString(key.get());
        String[] splits = rowkey.split("_");
        if ("0".equals(splits[4])){
            return;
        }

        //聚合的是主叫数据
        String caller = splits[1];
        String callee = splits[3];
        String buildTime = splits[2];
        String duration = splits[5];
        durationText.set(duration);

        String year = buildTime.substring(0,4);
        String month = buildTime.substring(4,6);
        String day = buildTime.substring(6,8);

        //年、月、日整数
        DateDimension yearDimension = new DateDimension(year, "-1", "-1");
        DateDimension monthDimension = new DateDimension(year, month, "-1");
        DateDimension dayDimension = new DateDimension(year, month, day);

        //主叫callerContactDimension
        ContactDimension callerContactDimension = new ContactDimension(caller, phoneMap.get(caller));

        comDimension.setContactDimension(callerContactDimension);

        //年
        comDimension.setDateDimension(yearDimension);
        context.write(comDimension, durationText);

        //月
        comDimension.setDateDimension(monthDimension);
        context.write(comDimension, durationText);

        //日
        comDimension.setDateDimension(dayDimension);
        context.write(comDimension, durationText);

        //被叫callerContactDimension
        ContactDimension calleeContactDimension = new ContactDimension(callee, phoneMap.get(callee));

        comDimension.setContactDimension(calleeContactDimension);

        //年
        comDimension.setDateDimension(yearDimension);
        context.write(comDimension, durationText);

        //月
        comDimension.setDateDimension(monthDimension);
        context.write(comDimension, durationText);

        //日
        comDimension.setDateDimension(dayDimension);
        context.write(comDimension, durationText);
    }
}

2)创建类:CountDurationReducer

package reducer;

import kv.key.ComDimension;
import kv.value.CountDurationValue;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class CountDurationReducer extends Reducer<ComDimension, Text, ComDimension, CountDurationValue> {

    private CountDurationValue countDurationValue = new CountDurationValue();

    @Override
    protected void reduce(ComDimension key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        int callSum = 0;
        int callDuration = 0;
        for (Text t : values){
            callSum++;
            callDuration += Integer.valueOf(t.toString());
        }
        countDurationValue.setCallSum(String.valueOf(callSum));
        countDurationValue.setCallDurationSum(String.valueOf(callDuration));

        context.write(key, countDurationValue);
    }
}

3)创建类:CountDurationRunner

package runner;

import kv.key.ComDimension;
import kv.value.CountDurationValue;
import mapper.CountDurationMapper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import outputformat.MysqlOutputFormat;
import reducer.CountDurationReducer;

import java.io.IOException;

public class CountDurationRunner implements Tool {

    private Configuration conf = null;

    @Override
    public void setConf(Configuration conf) {
        this.conf = HBaseConfiguration.create(conf);
    }

    @Override
    public Configuration getConf() {
        return this.conf;
    }

    @Override
    public int run(String[] strings) throws Exception {
        //得到conf
        Configuration conf = this.getConf();
        //实例化Job
        Job job = Job.getInstance(conf);
        job.setJarByClass(CountDurationRunner.class);
        //组装Mapper InputFormat
        initHBaseInputConfig(job);
        //组装Reducer OutputFormay
        initReducerOutputConfig(job);
        return job.waitForCompletion(true) ? 0:1;
    }

    private void initHBaseInputConfig(Job job) {
        Connection connection = null;
        Admin admin = null;
        try {
            String tableName = "ns_ct:calllog";
            connection = ConnectionFactory.createConnection(job.getConfiguration());
            admin = connection.getAdmin();
            if (!admin.tableExists(TableName.valueOf(tableName))){
                throw new RuntimeException("无法找到目标表");
            }
            Scan scan = new Scan();
            //可以优化
            TableMapReduceUtil.initTableMapperJob(
                    tableName,
                    scan,
                    CountDurationMapper.class,
                    ComDimension.class,
                    Text.class,
                    job,
                    true);

        } catch (IOException e) {
            e.printStackTrace();
        }finally {
            try {
                if (admin != null){
                    admin.close();
                }
                if (connection != null && !connection.isClosed()){
                    connection.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    private void initReducerOutputConfig(Job job){
        job.setReducerClass(CountDurationReducer.class);
        job.setOutputKeyClass(ComDimension.class);
        job.setOutputKeyClass(CountDurationValue.class);
        job.setOutputFormatClass(MysqlOutputFormat.class);
    }

    public static void main(String[] args) {
        try {
            int status = ToolRunner.run(new CountDurationRunner(), args);
            System.exit(status);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

4)创建类:MysqlOutputFormat

package outputformat;

import converter.DimensionConverterImpl;
import kv.key.ComDimension;
import kv.value.CountDurationValue;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import utils.JDBCInstance;
import utils.JDBCUtils;

import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class MysqlOutputFormat extends OutputFormat<ComDimension, CountDurationValue> {

    private OutputCommitter committer = null;

    @Override
    public RecordWriter<ComDimension, CountDurationValue> getRecordWriter(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        //初始化JDBC连接对象
        Connection conn = null;
        conn = JDBCInstance.getInstance();

        try {
            //出问题的点之一,报空指针
            conn.setAutoCommit(false);
        } catch (SQLException e) {
            throw new RuntimeException(e.getMessage());
        }
        return new MysqlRecordWriter(conn);
    }

    //输出校验
    @Override
    public void checkOutputSpecs(JobContext jobContext) throws InterruptedException {
    }

    @Override
    public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException {
        //此方法点击OutputFormat(按Ctrl + H的源码,复制getOutputCommitter,在FileOutputFormat)
        if (committer == null){
            String name = context.getConfiguration().get(FileOutputFormat.OUTDIR);
            Path outputPath = name==null ? null:new Path(name);
            committer = new FileOutputCommitter(outputPath, context);
        }
        return committer;
    }

    private static class MysqlRecordWriter extends RecordWriter<ComDimension, CountDurationValue>{
        private DimensionConverterImpl dci = new DimensionConverterImpl();
        private Connection conn = null;
        private PreparedStatement preparedStatement = null;
        private String insertSQL =null;
        private int count = 0;
        private final int BATCH_SIZE = 500;//批次大小

        public MysqlRecordWriter(Connection conn){
            this.conn = conn;
        }

        /**
         * 输出到mysql
         * @param key
         * @param value
         * @throws IOException
         * @throws InterruptedException
         */
        @Override
        public void write(ComDimension key, CountDurationValue value) throws IOException, InterruptedException {
            try{
                //tb_call
                //id_date_contact, id_date_dimension, id_cantact, call_sum, call_duration_sum

                //year month day
                int idDateDimension = dci.getDimensionID(key.getDateDimension());

                //telephone name
                int idContactDimension = dci.getDimensionID(key.getContactDimension());

                String idDateContact = idDateDimension + "_" + idContactDimension;

                int callSum = Integer.valueOf(value.getCallSum());
                int callDurationSum = Integer.valueOf(value.getCallDurationSum());

                if (insertSQL == null){
                    insertSQL = "INSERT INTO `tb_call` (`id_date_contact`, `id_date_dimension`, `id_contact`,  `call_sum`, `call_duration_sum`) values (?,?,?,?,?) ON DUPLICATE KEY UPDATE `id_date_contact` = ?;";
                }

                if (preparedStatement == null){
                    preparedStatement = conn.prepareStatement(insertSQL);
                }
                //本次SQL
                int i = 0;
                preparedStatement.setString(++i, idDateContact);
                preparedStatement.setInt(++i, idDateDimension);
                preparedStatement.setInt(++i, idContactDimension);
                preparedStatement.setInt(++i, callSum);
                preparedStatement.setInt(++i, callDurationSum);

                //无则插入,有则更新的判断依据,增加批次
                preparedStatement.setString(++i, idDateContact);
                preparedStatement.addBatch();

                //当前缓存了多少个sql语句等待批量执行,计数器
                count++;
                if (count >= BATCH_SIZE){
                    preparedStatement.executeBatch();//执行批处理命令
                    conn.commit();
                    count = 0;
                    preparedStatement.clearBatch();//清除批处理命令
                }
            }catch (Exception e){
                e.printStackTrace();
            }
        }

        @Override
        public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
            try {
                if (preparedStatement != null){
                    preparedStatement.executeBatch();
                        this.conn.commit();
                }
            } catch (SQLException e) {
                e.printStackTrace();
            }finally {
                JDBCUtils.close(conn, preparedStatement, null);
            }
        }
    }
}

5)创建类:BaseDimension

package kv.base;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public abstract class BaseDimension implements WritableComparable<BaseDimension> {

    public abstract int compareTo(BaseDimension o);

    //将字节写入二进制流
    public abstract void write(DataOutput out) throws IOException;

    //从二进制流读取字节
    public abstract void readFields(DataInput in) throws IOException;

}

6)创建类:BaseValue

package kv.base;

import org.apache.hadoop.io.Writable;

public abstract class BaseValue implements Writable {
}

7)创建类:ComDimension

package kv.key;

import kv.base.BaseDimension;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
public class ComDimension extends BaseDimension {

    //联系人维度
    private ContactDimension contactDimension = new ContactDimension();

    //时间维度
    private DateDimension dateDimension = new DateDimension();

    @Override
    public int compareTo(BaseDimension o) {
        ComDimension o1 = (ComDimension) o;
        int result = this.dateDimension.compareTo(o1.dateDimension);
        if (result != 0) {
            return result;
        }

        result = this.contactDimension.compareTo(o1.contactDimension);
        return result;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        contactDimension.write(out);
        dateDimension.write(out);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.contactDimension.readFields(in);
        this.dateDimension.readFields(in);
    }
}

8)创建类:ContactDimension

package kv.key;

import kv.base.BaseDimension;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * 联系人维度类
 */
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
public class ContactDimension extends BaseDimension {

    //手机号码
    private String telephone;
    //姓名
    private String name;

    @Override
    public int compareTo(BaseDimension o) {
        ContactDimension o1 = (ContactDimension) o;
        int result = this.name.compareTo(o1.name);
        if (result != 0){
            return result;
        }
        result = this.telephone.compareTo(o1.telephone);
        return result;
    }

    //将字节写入二进制流
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(this.telephone);
        out.writeUTF(this.name);
    }

    //从二进制流读取字节
    // Alt + Enter
    @Override
    public void readFields(DataInput in) throws IOException {
        this.telephone = in.readUTF();
        this.name = in.readUTF();
    }
}

9)创建类:DateDimension

package kv.key;

import kv.base.BaseDimension;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * 时间维度类
 */
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
public class DateDimension extends BaseDimension {

    //时间维度:当前通话信息所在年
    private String year;
    //时间维度:当前通话信息所在月,如果按照年来统计信息,则month为-1
    private String month;
    //时间维度:当前通话信息所在日,如果按照年来统计信息,则day为-1。
    private String day;

    @Override
    public int compareTo(BaseDimension o) {
        DateDimension o1 = (DateDimension) o;
        int result = this.year.compareTo(o1.year);
        if (result != 0){
            return result;
        }

        result = this.month.compareTo(o1.month);
        if (result != 0){
           return result;
        }

        result = this.day.compareTo(o1.day);
        return result;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(this.year);
        out.writeUTF(this.month);
        out.writeUTF(this.day);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.year = in.readUTF();
        this.month = in.readUTF();
        this.day = in.readUTF();
    }
}

10)创建类:CountDurationValue

package kv.value;

import kv.base.BaseValue;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

@Setter
@Getter
@AllArgsConstructor
@NoArgsConstructor
public class CountDurationValue extends BaseValue {

    //某个维度通话次数总和
    private String callSum;

    //某个维度通话时间总和
    private String callDurationSum;

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(this.callSum);
        dataOutput.writeUTF(this.callDurationSum);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.callSum = dataInput.readUTF();
        this.callDurationSum = dataInput.readUTF();
    }
}

11) 创建类:JDBCUtil

package utils;

import java.sql.*;

public class JDBCUtils {

    private static final String MYSQL_DRIVER_CLASS = "com.mysql.cj.jdbc.Driver";
    private static final String MYSQL_URL = "jdbc:mysql://hsiehchou121:3306/db_telecom?useUnicode=true&characterEncoding=UTF-8";
    private static final String MYSQL_USERNAME = "root";
    private static final String MYSQL_PASSWORD = "root";

    /**
     * 实例化JDBC连接器
     * @return
     */
    public static Connection getConnection(){
        try {
            Class.forName(MYSQL_DRIVER_CLASS);
            return DriverManager.getConnection(MYSQL_URL, MYSQL_USERNAME, MYSQL_PASSWORD);
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        } catch (SQLException e) {
            e.printStackTrace();
        }
        return null;
    }

    public static void close(Connection connection, Statement statement, ResultSet resultSet){

        try {
            if (resultSet != null && !resultSet.isClosed()){
                resultSet.close();
            }
            if (statement != null && !statement.isClosed()){
                statement.close();
            }
            if (connection != null && !connection.isClosed()){
                connection.close();
            }
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
}

12)创建类:JDBCInstance

package utils;

import java.sql.Connection;
import java.sql.SQLException;

/**
 * 获取链接实例
 */
public class JDBCInstance {
    private static Connection connection = null;

    public JDBCInstance() {
    }

    public static Connection getInstance(){
        try {
            if (connection == null || connection.isClosed() || !connection.isValid(3)){
                connection = JDBCUtils.getConnection();
            }
        } catch (SQLException e) {
            e.printStackTrace();
        }
        return connection;
    }
}

13) 创建接口:DimensionConverter

package converter;

import kv.base.BaseDimension;

public interface DimensionConverter {
    int getDimensionID(BaseDimension dimension);
}

14)创建类:DimensionConverterImpl

package converter;

import kv.base.BaseDimension;
import kv.key.ContactDimension;
import kv.key.DateDimension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import utils.JDBCInstance;
import utils.JDBCUtils;
import utils.LRUCache;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

/**
 * 1、根据传入的维度数据,得到该数据对应的在表中的主键id
 * ** 做内存缓存,LRUCache
 * 分支
 * -- 缓存中有数据 -> 直接返回id
 * -- 缓存中无数据 ->
 * ** 查询Mysql
 * 分支:
 * -- Mysql中有该条数据 -> 直接返回id -> 将本次读取到的id缓存到内存中
 * -- Mysql中没有该数据  -> 插入该条数据 -> 再次反查该数据,得到id并返回 -> 缓存到内存中
 */
public class DimensionConverterImpl implements DimensionConverter {

    // Logger 打印该类的日志,取代resources里的log4j.properties
    private static final Logger logger = LoggerFactory.getLogger(DimensionConverterImpl.class);

    //对象线程化,用于每个线程管理自己的JDBC连接器
    private ThreadLocal<Connection> threadLocalConnection = new ThreadLocal<>();

    //构建内存缓存对象
    private LRUCache lruCache = new LRUCache(3000);

    public DimensionConverterImpl() {
        //jvm关闭时,释放资源
        Runtime.getRuntime().addShutdownHook(new Thread(() -> JDBCUtils.close(threadLocalConnection.get(), null, null)));
    }

    @Override
    public int getDimensionID(BaseDimension dimension) {

        //1、根据传入的维度对象获取对应的主键id,先从LRUCache中获取
        //时间维度:date_dimension_year_month_day, 10
        //联系人维度:contact_dimension_telephone_name(到了电话号码就不会重复了), 12
        String cacheKey = getCacheKey(dimension);
        //尝试获取缓存的id
        if (lruCache.containsKey(cacheKey)) {
            return lruCache.get(cacheKey);
        }
        //没有得到缓存id,需要执行select操作
        //sqls包含了1组sql语句:查询和插入
        String[] sqls = null;
        if (dimension instanceof DateDimension) {
            sqls = getDateDimensionSQL();
        } else if (dimension instanceof ContactDimension) {
            sqls = getContactDimensionSQL();
        } else {
            throw new RuntimeException("没有匹配到对应维度信息.");
        }

        //准备对Mysql表进行操作,先查询,有可能再插入
        Connection conn = this.getConnection();
        int id = -1;
        synchronized (this) {
            id = execSQL(conn, sqls, dimension);
        }
        //将刚查询的id加入到缓存中
        lruCache.put(cacheKey, id);
        return id;
    }

    /**
     * 得到当前线程维护的Connection对象
     *
     * @return
     */
    public Connection getConnection() {
        Connection conn = null;
        try {
            conn = threadLocalConnection.get();
            if (conn == null || conn.isClosed()) {
                conn = JDBCInstance.getInstance();
                threadLocalConnection.set(conn);
            }
        } catch (SQLException e) {
            e.printStackTrace();
        }
        return conn;
    }

    /**
     * @param conn      JDBC连接器
     * @param sqls      长度为2,第一个位置为查询语句,第二个位置为插入语句
     * @param dimension 对应维度所保存的数据
     * @return
     */
    private int execSQL(Connection conn, String[] sqls, BaseDimension dimension) {
        PreparedStatement preparedStatement = null;
        ResultSet resultSet = null;

        try {
            //1
            //查询的preparedStatement
            preparedStatement = conn.prepareStatement(sqls[0]);
            //根据不同的维度,封装不同的SQL语句
            setArguments(preparedStatement, dimension);
            //执行查询
            resultSet = preparedStatement.executeQuery();
            if (resultSet.next()) {
                int result = resultSet.getInt(1);
                //释放资源
                JDBCUtils.close(null, preparedStatement, resultSet);
                return result;
            }
            //释放资源
            JDBCUtils.close(null, preparedStatement, resultSet);

            //2
            //执行插入,封装插入的sql语句
            preparedStatement = conn.prepareStatement(sqls[1]);
            setArguments(preparedStatement, dimension);
            //执行插入
            preparedStatement.executeUpdate();
            //释放资源
            JDBCUtils.close(null, preparedStatement, null);

            //3
            //查询的preparedStatement
            preparedStatement = conn.prepareStatement(sqls[0]);
            //根据不同的维度,封装不同的SQL语句
            setArguments(preparedStatement, dimension);
            //执行查询
            resultSet = preparedStatement.executeQuery();
            if (resultSet.next()) {
                return resultSet.getInt(1);
            }
        } catch (SQLException e) {
            e.printStackTrace();
        } finally {
            //释放资源
            JDBCUtils.close(null, preparedStatement, resultSet);
        }
        return -1;
    }

    /**
     * 设置SQL语句的具体参数
     *
     * @param preparedStatement
     * @param dimension
     */
    private void setArguments(PreparedStatement preparedStatement, BaseDimension dimension) {
        int i = 0;
        try {
            if (dimension instanceof DateDimension) {
                //可以优化
                DateDimension dateDimension = (DateDimension) dimension;
                preparedStatement.setString(++i, dateDimension.getYear());
                preparedStatement.setString(++i, dateDimension.getMonth());
                preparedStatement.setString(++i, dateDimension.getDay());
            } else if (dimension instanceof ContactDimension) {
                ContactDimension contactDimension = (ContactDimension) dimension;
                preparedStatement.setString(++i, contactDimension.getTelephone());
                preparedStatement.setString(++i, contactDimension.getName());
            }
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }

    /**
     * 返回联系人表的查询和插入语句
     *
     * @return
     */
    private String[] getContactDimensionSQL() {
        String query = "SELECT `id` FROM `tb_contacts` WHERE `telephone` = ? AND `name` = ? ORDER BY `id`;";
        String insert = "INSERT INTO `tb_contacts`(`telephone`, `name`) VALUES(?, ?);";
        return new String[]{query, insert};
    }

    /**
     * 返回时间表的查询和插入语句
     *
     * @return
     */
    private String[] getDateDimensionSQL() {
        String query = "SELECT `id` FROM `tb_dimension_date` WHERE `year` = ? AND `month` = ? AND `day` = ? ORDER BY `id`;";
        String insert = "INSERT INTO `tb_dimension_date`(`year`,`month`,`day`) VALUES(?, ?, ?);";
        return new String[]{query, insert};
    }

    /**
     * 根据维度信息得到维度对应的缓存键
     *
     * @param dimension
     * @return
     */
    private String getCacheKey(BaseDimension dimension) {
        StringBuilder sb = new StringBuilder();
        if (dimension instanceof DateDimension) {
            DateDimension dateDimension = (DateDimension) dimension;
            sb.append("date_dimension")
                    .append(dateDimension.getYear())
                    .append(dateDimension.getMonth())
                    .append(dateDimension.getDay());
        } else if (dimension instanceof ContactDimension) {
            ContactDimension contactDimension = (ContactDimension) dimension;
            sb.append("contact_dimension")
                    .append(contactDimension.getTelephone());
        }
        return sb.toString();
    }
}

15) 创建类:LRUCache

package utils;

import java.util.LinkedHashMap;
import java.util.Map;

public class LRUCache extends LinkedHashMap<String, Integer> {

    private static  final long serialVersionUID = 1L;

    protected int maxElements;

    public LRUCache(int maxSize){
        super(maxSize, 0.75F, true);
        this.maxElements = maxSize;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<String, Integer> eldest) {
        return (size() > this.maxElements);
    }
}

5、运行测试

1)将mysql驱动包放入到/opt/jars的lib目录下
mysql-connector-java-8.0.13.jar

2)提交任务

$ /root/hd/hadoop-2.8.4/bin/yarn jar /opt/jars/CT_analysis-1.0-SNAPSHOT.jar runner.CountDurationRunner -libjars /opt/jars/lib/mysql-connector-java-8.0.13.jar

观察Mysql中的结果

七、数据展示

令人兴奋的时刻马上到了,接下来我们需要将某人按照不同维度查询出来的结果,展示到web页面上
数据展示模块流程图

数据展示模块流程图

1、环境准备

1)新建module或项目:CT_web
pom.xml配置文件:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.hsiehchou</groupId>
  <artifactId>CT_web</artifactId>
  <packaging>war</packaging>
  <version>1.0-SNAPSHOT</version>
  <name>ct_web Maven Webapp</name>
  <url>http://maven.apache.org</url>
  <dependencies>
    <!-- https://mvnrepository.com/artifact/junit/junit -->
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.12</version>
      <scope>test</scope>
    </dependency>

    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>8.0.13</version>
    </dependency>

    <dependency>
      <groupId>c3p0</groupId>
      <artifactId>c3p0</artifactId>
      <version>0.9.1.2</version>
    </dependency>

    <dependency>
      <groupId>org.mybatis</groupId>
      <artifactId>mybatis</artifactId>
      <version>3.2.1</version>
    </dependency>

    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-context-support</artifactId>
      <version>4.3.3.RELEASE</version>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-jdbc</artifactId>
      <version>4.3.3.RELEASE</version>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-webmvc</artifactId>
      <version>4.3.3.RELEASE</version>
    </dependency>
    <dependency>
      <groupId>org.mybatis</groupId>
      <artifactId>mybatis-spring</artifactId>
      <version>1.3.0</version>
    </dependency>
    <dependency>
      <groupId>org.aspectj</groupId>
      <artifactId>aspectjweaver</artifactId>
      <version>1.8.10</version>
    </dependency>
    <dependency>
      <groupId>javax.servlet</groupId>
      <artifactId>servlet-api</artifactId>
      <version>2.5</version>
    </dependency>
    <dependency>
      <groupId>javax.servlet</groupId>
      <artifactId>jstl</artifactId>
      <version>1.2</version>
    </dependency>
      <dependency>
          <groupId>org.springframework</groupId>
          <artifactId>spring-beans</artifactId>
          <version>4.3.3.RELEASE</version>
      </dependency>
  </dependencies>
  <build>
    <finalName>ct_web</finalName>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-surefire-plugin</artifactId>
        <version>2.12.4</version>
        <configuration>
          <skipTests>true</skipTests>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.8.1</version>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>

2)创建包结构,根包:com.hsiehchou
bean
controller
dao
3)类表

类名 备注
CallLog 用于封装数据分析结果的JavaBean
QueryInfo 用于封装向服务器发来的请求参数
CallLogHandler 用于处理请求的Controller
CallLogDAO 查询某人某个维度通话记录的DAO

4)web目录结构,web部分的根目录:webapp

文件夹名 备注
css 存放css静态资源的文件夹
html 存放html静态资源的文件夹
images 存放图片静态资源文件夹
js 存放js静态资源的文件夹
jsp 存放jsp页面的文件夹
WEB-INF 存放web相关配置的文件夹

5) resources目录下创建spring相关配置文件

dbconfig.properties:用于存放数据库连接配置

jdbc.user=root
jdbc.password=root
jdbc.jdbcUrl=jdbc:mysql://hsiehchou121:3306/db_telecom?useUnicode=true&characterEncoding=UTF-8
jdbc.driverClass=com.mysql.cj.jdbc.Driver

applicationContext.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:p="http://www.springframework.org/schema/p"
       xmlns:mvc="http://www.springframework.org/schema/mvc"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
        http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd
        http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc.xsd
        http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee.xsd
        http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd">
    <context:property-placeholder location="classpath:dbconfig.properties"/>

    <bean id="dataSource" class="com.mchange.v2.c3p0.ComboPooledDataSource">
        <property name="user" value="${jdbc.user}"/>
        <property name="password" value="${jdbc.password}"/>
        <property name="driverClass" value="${jdbc.driverClass}"/>
        <property name="jdbcUrl" value="${jdbc.jdbcUrl}"/>
    </bean>
    <bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate">
        <constructor-arg name="dataSource" value="#{dataSource}"></constructor-arg>
    </bean>

    <bean id="namedParameterJdbcTemplate" class="org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate">
        <constructor-arg name="dataSource" value="#{dataSource}"></constructor-arg>
    </bean>

    <!-- 包扫描 -->
    <context:component-scan base-package="controller"></context:component-scan>
    <context:component-scan base-package="dao"></context:component-scan>

    <!-- 配置视图解析器-->
    <bean id="viewResolver" class="org.springframework.web.servlet.view.InternalResourceViewResolver">
        <property name="prefix" value="/"/>
        <property name="suffix" value=".jsp"/>
    </bean>

    <!--<mvc:annotation-driven />-->
    <!--<mvc:default-servlet-handler />-->
    <!--<mvc:resources location="/images/" mapping="/images/**"/>-->
    <!--<mvc:resources location="/js/" mapping="/js/**"/>-->
    <!--<mvc:resources location="/css/" mapping="/css/**"/>-->
</beans>

6) webapp的WEB-INF目录下创建web相关配置
web.xml

<?xml version="1.0" encoding="UTF-8"?>
<web-app xmlns="http://xmlns.jcp.org/xml/ns/javaee"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/web-app_3_1.xsd"
         version="3.1">
    <display-name>SpringMVC_CRUD</display-name>
    <!-- spring拦截器 -->
    <servlet>
        <servlet-name>dispatcherServlet</servlet-name>
        <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
        <init-param>
            <param-name>contextConfigLocation</param-name>
            <param-value>classpath:applicationContext.xml</param-value>
        </init-param>
        <load-on-startup>1</load-on-startup>
    </servlet>
    <servlet-mapping>
        <servlet-name>dispatcherServlet</servlet-name>
        <url-pattern>/</url-pattern>
    </servlet-mapping>

    <welcome-file-list>
        <welcome-file>index.jsp</welcome-file>
    </welcome-file-list>

    <filter>
        <filter-name>CharacterEncodingFilter</filter-name>
        <filter-class>org.springframework.web.filter.CharacterEncodingFilter</filter-class>
        <init-param>
            <param-name>encoding</param-name>
            <param-value>utf-8</param-value>
        </init-param>
        <init-param>
            <param-name>forceEncoding</param-name>
            <param-value>true</param-value>
        </init-param>
    </filter>
    <filter-mapping>
        <filter-name>CharacterEncodingFilter</filter-name>
        <url-pattern>/*</url-pattern>
    </filter-mapping>

    <servlet-mapping>
        <servlet-name>default</servlet-name>
        <url-pattern>*.jpg</url-pattern>
    </servlet-mapping>
    <servlet-mapping>
        <servlet-name>default</servlet-name>
        <url-pattern>*.js</url-pattern>
    </servlet-mapping>
    <servlet-mapping>
        <servlet-name>default</servlet-name>
        <url-pattern>*.css</url-pattern>
    </servlet-mapping>
</web-app>

7)拷贝js框架到webapp的js目录下
框架名称:
echarts.min.js

2、编写代码

思路:
a)首先测试数据通顺以及完整性,写一个联系人的测试用例。
b)测试通过后,通过输入手机号码以及时间参数,查询指定维度的数据,并以图表展示。
代码:
1)新建类: CallLog

package bean;

/**
 * 用于存放返回给用户的数据
 */
public class CallLog {
    private String id_date_contact;
    private String id_date_dimension;
    private String id_contact;
    private String call_sum;
    private String call_duration_sum;
    private String telephone;
    private String name;
    private String year;
    private String month;
    private String day;

    public String getId_date_contact() {
        return id_date_contact;
    }

    public void setId_date_contact(String id_date_contact) {
        this.id_date_contact = id_date_contact;
    }

    public String getId_date_dimension() {
        return id_date_dimension;
    }

    public void setId_date_dimension(String id_date_dimension) {
        this.id_date_dimension = id_date_dimension;
    }

    public String getId_contact() {
        return id_contact;
    }

    public void setId_contact(String id_contact) {
        this.id_contact = id_contact;
    }

    public String getCall_sum() {
        return call_sum;
    }

    public void setCall_sum(String call_sum) {
        this.call_sum = call_sum;
    }

    public String getCall_duration_sum() {
        return call_duration_sum;
    }

    public void setCall_duration_sum(String call_duration_sum) {
        this.call_duration_sum = call_duration_sum;
    }

    public String getTelephone() {
        return telephone;
    }

    public void setTelephone(String telephone) {
        this.telephone = telephone;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getYear() {
        return year;
    }

    public void setYear(String year) {
        this.year = year;
    }

    public String getMonth() {
        return month;
    }

    public void setMonth(String month) {
        this.month = month;
    }

    public String getDay() {
        return day;
    }

    public void setDay(String day) {
        this.day = day;
    }

    @Override
    public String toString() {
        return "CallLog{" +
                "call_sum='" + call_sum + '\'' +
                ", call_duration_sum='" + call_duration_sum + '\'' +
                ", telephone='" + telephone + '\'' +
                ", name='" + name + '\'' +
                ", year='" + year + '\'' +
                ", month='" + month + '\'' +
                ", day='" + day + '\'' +
                '}';
    }
}

2)新建类:QueryInfo

package bean;

/**
 * 该类用于存放用户请求的数据
 */
public class QueryInfo {
    private String telephone;
    private String year;
    private String month;
    private String day;

    public QueryInfo() {
        super();
    }

    public QueryInfo(String telephone, String year, String month, String day) {
        super();
        this.telephone = telephone;
        this.year = year;
        this.month = month;
        this.day = day;
    }

    public String getTelephone() {
        return telephone;
    }

    public void setTelephone(String telephone) {
        this.telephone = telephone;
    }

    public String getYear() {
        return year;
    }

    public void setYear(String year) {
        this.year = year;
    }

    public String getMonth() {
        return month;
    }

    public void setMonth(String month) {
        this.month = month;
    }

    public String getDay() {
        return day;
    }

    public void setDay(String day) {
        this.day = day;
    }
}

3)新建类: CallLogDAO

package dao;

import bean.CallLog;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.BeanPropertyRowMapper;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
import org.springframework.stereotype.Repository;

import java.util.HashMap;
import java.util.List;

@Repository
public class CallLogDAO {

    @Autowired
    private NamedParameterJdbcTemplate namedParameterJdbcTemplate;

    public List<CallLog> getCallLogList(HashMap<String, String> paramsMap) {
        //按照年统计:统计某个用户,1年12个月的所有的数据(不精确到day)
        String sql = "SELECT `call_sum`, `call_duration_sum`, `telephone`, `name`, `year` , `month`, `day` FROM tb_dimension_date t4 INNER JOIN ( SELECT `id_date_dimension`, `call_sum`, `call_duration_sum`, `telephone`, `name` FROM tb_call t2 INNER JOIN ( SELECT `id`, `telephone`, `name` FROM tb_contacts WHERE telephone = :telephone ) t1 ON t2.id_contact = t1.id ) t3 ON t4.id = t3.id_date_dimension WHERE `year` = :year AND `month` != :month AND `day` = :day ORDER BY `year`, `month`;";
        BeanPropertyRowMapper<CallLog> beanPropertyRowMapper = new BeanPropertyRowMapper<>(CallLog.class);
        List<CallLog> list = namedParameterJdbcTemplate.query(sql, paramsMap, beanPropertyRowMapper);
        return list;
    }
}

4)新建类:CallLogHandler

package controller;

import bean.CallLog;
import bean.QueryInfo;
import dao.CallLogDAO;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.stereotype.Controller;
import org.springframework.ui.Model;
import org.springframework.web.bind.annotation.RequestMapping;

import java.util.HashMap;
import java.util.List;

@Controller
public class CallLogHandler {

    @RequestMapping("/queryCallLogList")
    public String queryCallLog(Model model, QueryInfo queryInfo){
        ApplicationContext applicationContext = new ClassPathXmlApplicationContext("applicationContext.xml");
        CallLogDAO callLogDAO = applicationContext.getBean(CallLogDAO.class);

        HashMap<String, String> hashMap = new HashMap<>();
        hashMap.put("telephone", queryInfo.getTelephone());
        hashMap.put("year", queryInfo.getYear());
        hashMap.put("month", queryInfo.getMonth());
        hashMap.put("day", queryInfo.getDay());

        List<CallLog> list = callLogDAO.getCallLogList(hashMap);

        StringBuilder dateSB = new StringBuilder();
        StringBuilder callSumSB = new StringBuilder();
        StringBuilder callDurationSumSB = new StringBuilder();

        for(int i = 0; i < list.size(); i++){
            CallLog callLog = list.get(i);
            //1月, 2月, ....12月,
            dateSB.append(callLog.getMonth() + "月,");
            callSumSB.append(callLog.getCall_sum() + ",");
            callDurationSumSB.append(callLog.getCall_duration_sum() + ",");
        }

        dateSB.deleteCharAt(dateSB.length() - 1);
        callSumSB.deleteCharAt(callSumSB.length() - 1);
        callDurationSumSB.deleteCharAt(callDurationSumSB.length() - 1);

        //通过model返回数据
        model.addAttribute("telephone", list.get(0).getTelephone());
        model.addAttribute("name", list.get(0).getName());
        model.addAttribute("date", dateSB.toString());
        model.addAttribute("count", callSumSB.toString());
        model.addAttribute("duration", callDurationSumSB.toString());

        return "jsp/CallLogListEchart";
    }
}

5)新建:index.jsp

<%@ taglib prefix="c" uri="http://java.sun.com/jsp/jstl/core" %>
<%@ page language="java" contentType="text/html; charset=utf-8"
         pageEncoding="utf-8" %>
<%
    String path = request.getContextPath();
%>
<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
<html>
    <head>
        <meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
        <title>电信查询系统</title>
        <link href="//cdn.bootcss.com/bootstrap/3.3.6/css/bootstrap.min.css" rel="stylesheet">
        <script src="//cdn.bootcss.com/jquery/2.1.1/jquery.min.js"></script>
        <script src="//cdn.bootcss.com/bootstrap/3.3.6/js/bootstrap.min.js"></script>
    </head>

    <body>
        <div style="width:410px;margin: 0 auto;">
            <h3>电信查询用户通话次数和通话时长系统</h3>
            <br />
            <form role="form" action="/queryCallLogList" method="post">
                <div class="form-group" style="margin-bottom: 0;">
                    <label for="name">手机号码</label>
                    <input type="text" name="telephone" class="form-control" id="name" placeholder="请输入手机号码">
                </div>
                <div class="form-group" style="margin-bottom: 0;">
                    <label for="name">年</label>
                    <input type="text" name="year" class="form-control" id="name" placeholder="请输入年">
                </div>
                <div class="form-group" style="margin-bottom: 0;">
                    <label for="name">月</label>
                    <input type="text" name="month" class="form-control" id="name" placeholder="请输入月">
                </div>
                <div class="form-group" style="margin-bottom: 0;">
                    <label for="name">日</label>
                    <input type="text" name="day" class="form-control" id="name" placeholder="请输入日">
                </div>
                <button type="submit" class="btn btn-default">查询</button>
            </form>
        </div>
        <br />
        <div style="width: 1000px;margin: 0 auto;">
            <table class="table">
                <h4>数据库电话号码表</h4>
                <thead>
                    <tr>
                        <th>姓名</th>
                        <th>手机号</th>
                        <th></th>

                        <th>姓名</th>
                        <th>手机号</th>
                        <th></th>

                        <th>姓名</th>
                        <th>手机号</th>
                        <th></th>

                        <th>姓名</th>
                        <th>手机号</th>
                        <th></th>

                        <th>姓名</th>
                        <th>手机号</th>
                    </tr>
                </thead>
                <tbody>
                    <tr class="active">
                        <td>李为</td>
                        <td>17078388295</td>
                        <td></td>

                        <td>王军</td>
                        <td>13980337439</td>
                        <td></td>

                        <td>时俊</td>
                        <td>14575535933</td>
                        <td></td>

                        <td>天机</td>
                        <td>18902496992</td>
                        <td></td>

                        <td>蔡铭</td>
                        <td>18549641558</td>
                    </tr>
                    <tr class="success">
                        <td>陶尚</td>
                        <td>17005930322</td>
                        <td></td>

                        <td>魏山帅</td>
                        <td>18468618874</td>
                        <td></td>

                        <td>华倩</td>
                        <td>18576581848</td>
                        <td></td>

                        <td>焦君山</td>
                        <td>15978226424</td>
                        <td></td>

                        <td>钟尾田</td>
                        <td>15542823911</td>
                    </tr>
                    <tr  class="warning">
                        <td>司可可</td>
                        <td>17526304161</td>
                        <td></td>

                        <td>官渡</td>
                        <td>15422018558</td>
                        <td></td>

                        <td>上贵坡</td>
                        <td>17269452013</td>
                        <td></td>

                        <td>时光机</td>
                        <td>17764278604</td>
                        <td></td>

                        <td>李发</td>
                        <td>15711910344</td>
                    </tr>
                    <tr  class="danger">
                        <td>蒂冈</td>
                        <td>15714728273</td>
                        <td></td>

                        <td>范德</td>
                        <td>16061028454</td>
                        <td></td>

                        <td>周朝王</td>
                        <td>16264433631</td>
                        <td></td>

                        <td>谢都都</td>
                        <td>17601615878</td>
                        <td></td>

                        <td>刘何思</td>
                        <td>15897468949</td>
                    </tr>
                </tbody>
            </table>
        </div>
    </body>
</html>

6)新建:CallLogListEchart.jsp

<%--
  Created by IntelliJ IDEA.
  User: Z
  Date: 2017/10/28
  Time: 14:36
  To change this template use File | Settings | File Templates.
--%>
<%@ page contentType="text/html;charset=UTF-8" language="java" isELIgnored="false" %>
<html>
<head>
    <meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
    <title>显示通话记录</title>
    <script type="text/javascript" src="../js/echarts.min.js"></script>
    <%--<script type="text/javascript" src="${pageContext.request.contextPath}/js/echarts.min.js"></script>--%>
    <%--<script type="text/javascript" src="${pageContext.request.contextPath}/jquery-3.2.0.min.js"></script>--%>
    <%--<script type="text/javascript" src="http://echarts.baidu.com/gallery/vendors/echarts/echarts-all-3.js"></script>--%>
</head>
<body style="height: 100%; margin: 0; background-color: #3C3F41">
<style type="text/css">
    h3 {
        font-size: 12px;
        color: #ffffff;
        display: inline
    }
</style>
<h4 style="color: #ffffff;text-align:center">通话月单查询:${requestScope.name}</h4>
<%--<h3 style="margin-left: 70%">通话次数</h3>--%>
<%--<h3 style="margin-left: 20%">通话时长</h3>--%>
<div id="container1" style="height: 80%; width: 50%; float:left"></div>
<div id="container2" style="height: 80%; width: 50%; float:right"></div>
<script type="text/javascript">
    var telephone = "${requestScope.telephone}"
    var name = "${requestScope.name}"

    var date = "${requestScope.date}"//1月,2月,3月,xxxxx
    var count = "${requestScope.count}"

    var duration = "${requestScope.duration}"
    var pieData = converterFun(duration.split(","), date.split(","))
    callog1();
    callog2();

    function converterFun(duration, date) {
        var array = [];
        for (var i = 0; i < duration.length; i++) {
            var map = {};
            map['value'] = parseFloat(duration[i]);
            map['name'] = date[i];
            array.push(map);
        }
        return array;
    }

    function callog1() {
        var dom = document.getElementById("container1");
        var myChart = echarts.init(dom);
        myChart.showLoading();
        var option = {
            title: {
                text: '通话次数',
                textStyle: {
                    //文字颜色
                    color: '#ffffff',
                    //字体风格,'normal','italic','oblique'
                    fontStyle: 'normal',
                    //字体粗细 'normal','bold','bolder','lighter',100 | 200 | 300 | 400...
                    fontWeight: 'bold',
                    //字体系列
                    fontFamily: 'sans-serif',
                    //字体大小
                    fontSize: 13
                },
                itemGap: 12,
            },
            grid: {
                x: 80,
                y: 60,
                x2: 80,
                y2: 60,
                backgroundColor: 'rgba(0,0,0,0)',
                borderWidth: 1,
                borderColor: '#ffffff'
            },
            tooltip: {
                trigger: 'axis'
            },
            legend: {
                borderColor: '#ffffff',
                itemGap: 10,
                data: ['通话次数'],
                textStyle: {
                    color: '#ffffff'// 图例文字颜色
                }
            },
            toolbox: {
                show: false,
                feature: {
                    dataZoom: {
                        yAxisIndex: 'none'
                    },
                    dataView: {readOnly: false},
                    magicType: {type: ['line', 'bar']},
                    restore: {},
                    saveAsImage: {}
                }
            },
            xAxis: {
                data: date.split(","),
                axisLine: {
                    lineStyle: {
                        color: '#ffffff',
                        width: 2
                    }
                }
            },
            yAxis: {
                axisLine: {
                    lineStyle: {
                        color: '#ffffff',
                        width: 2
                    }
                }
            },
            series: [
                {
                    type: 'line',
                    data: count.split(","),
                    itemStyle: {
                        normal: {
                            color: '#ffca29',
                            lineStyle: {
                                color: '#ffd80d',
                                width: 2
                            }
                        }
                    },
                    markPoint: {
                        data: [
                            {type: 'max', name: '最大值'},
                            {type: 'min', name: '最小值'}
                        ]
                    },
                    markLine: {
                        data: [
                            {type: 'average', name: '平均值'}
                        ]
                    }
                }
            ]
        };
        if (option && typeof option === "object") {
            myChart.setOption(option, true);
        }
        myChart.hideLoading()
    }

    function callog2() {
        var dom = document.getElementById("container2");
        var myChart = echarts.init(dom);
        myChart.showLoading();
        var option = {
            title: {
                text: '通话时长',
                textStyle: {
                    //文字颜色
                    color: '#ffffff',
                    //字体风格,'normal','italic','oblique'
                    fontStyle: 'normal',
                    //字体粗细 'normal','bold','bolder','lighter',100 | 200 | 300 | 400...
                    fontWeight: 'bold',
                    //字体系列
                    fontFamily: 'sans-serif',
                    //字体大小
                    fontSize: 13
                },
                itemGap: 12,
            },
            tooltip: {
                trigger: 'item',
                formatter: "{a} <br/>{b} : {c} ({d}%)"
            },
            visualMap: {
                show: false,
                min: Math.min.apply(null, duration.split(",")),
                max: Math.max.apply(null, duration.split(",")),
                inRange: {
                    colorLightness: [0, 0.5]
                }
            },
            series: [
                {
                    name: '通话时长',
                    type: 'pie',
                    radius: '55%',
                    center: ['50%', '50%'],
                    data: pieData.sort(function (a, b) {
                        return a.value - b.value;
                    }),
                    roseType: 'radius',
                    label: {
                        normal: {
                            textStyle: {
                                color: 'rgba(255, 255, 255, 0.3)'
                            }
                        }
                    },
                    labelLine: {
                        normal: {
                            lineStyle: {
                                color: 'rgba(255, 255, 255, 0.3)'
                            },
                            smooth: 0.2,
                            length: 10,
                            length2: 20
                        }
                    },
                    itemStyle: {
                        normal: {
                            color: '#01c1c2',
                            shadowBlur: 200,
                            shadowColor: 'rgba(0, 0, 0, 0.5)'
                        }
                    },

                    animationType: 'scale',
                    animationEasing: 'elasticOut',
                    animationDelay: function (idx) {
                        return Math.random() * 200;
                    }
                }
            ]
        };
        if (option && typeof option === "object") {
            myChart.setOption(option, true);
        }
        myChart.hideLoading()
    }
</script>
</body>
</html>

3、最终预览

查询人通话时长与通话次数统计大概如下所示:

首页

index

统一展示

图形化展示

八、定时任务

新的数据每天都会产生,所以我们每天都需要更新离线的分析结果,所以此时我们可以用各种各样的定时任务调度工具来完成此操作。此例我们使用crontab来执行该操作。
1)编写任务脚本:analysis.sh

#!/bin/bash
/root/hd/hadoop-2.8.4/bin/yarn jar /opt/jars/CT_analysis-1.0-SNAPSHOT.jar runner.CountDurationRunner -libjars /root/hd/hadoop-2.8.4/lib/*

2) 制定crontab任务

# .------------------------------------------minute(0~59)
# | .----------------------------------------hours(0~23)
# | | .--------------------------------------day of month(1~31)
# | | | .------------------------------------month(1~12)
# | | | | .----------------------------------day of week(0~6)
# | | | | | .--------------------------------command
# | | | | | |
# | | | | | |
0 0 * * * /opt/jars/analysis.sh

3)考虑数据处理手段是否安全
a、定时任务统计结果是否会重复
b、定时任务处理的数据是否全面

九、项目总结

重新总结梳理整个项目流程和方法论
1、实现月查询(某个月每一天的数据展示)
2、用户亲密度展示
3、考虑Hive实现
4、用户按照时间区间,查找所有的通话数据
5、给读者建议—–按代码来—–》成功运行——》掌握此项目

十、附录

1、/flume/myagent/flume2kafka.conf

# define
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F -c +0 /opt/jars/calllog.csv
a1.sources.r1.shell = /bin/bash -c

# sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.brokerList = hsiehchou121:9092,hsiehchou122:9092,hsiehchou123:9092
a1.sinks.k1.topic = calllog
a1.sinks.k1.batchSize = 20
a1.sinks.k1.requiredAcks = 1

# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

2、core-site.xml

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
  Licensed under the Apache License, Version 2.0 (the "License");
  you may not use this file except in compliance with the License.
  You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License. See accompanying LICENSE file.
-->

<!-- Put site-specific property overrides in this file. -->
<configuration>
    <!-- 指定hdfs的nameservice为mycluster -->
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://mycluster</value>
    </property>

    <!-- 指定hadoop临时目录 -->
    <property>
        <name>hadoop.tmp.dir</name>
        <value>/root/hd/hadoop-2.8.4/tmp</value>
    </property>

    <!-- 指定zookeeper地址 -->
    <property>
        <name>ha.zookeeper.quorum</name>
        <value>hsiehchou121:2181,hsiehchou122:2181,hsiehchou123:2181</value>
    </property>

    <!--<property>
        <name>ipc.client.connect.max.retries</name>
        <value>30</value>
    </property>

    <property>
        <name>ipc.client.connect.retry.interval</name>
        <value>1000</value>
    </property> -->
</configuration>

3、hdfs-site.xml

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
  Licensed under the Apache License, Version 2.0 (the "License");
  you may not use this file except in compliance with the License.
  You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License. See accompanying LICENSE file.
-->

<!-- Put site-specific property overrides in this file. -->
<configuration> 
    <!--指定hdfs的nameservice为mycluster,需要和core-site.xml中的保持一致 -->
    <property>
        <name>dfs.nameservices</name>
        <value>mycluster</value>
    </property>

    <!-- mycluster下面有两个NameNode,分别是nn1,nn2 -->
    <property>
        <name>dfs.ha.namenodes.mycluster</name>
        <value>nn1,nn2</value>
    </property>

    <!-- nn1的RPC通信地址 -->
    <property>
        <name>dfs.namenode.rpc-address.mycluster.nn1</name>
        <value>hsiehchou121:9000</value>
    </property>

    <!-- nn1的http通信地址 -->
    <property>
        <name>dfs.namenode.http-address.mycluster.nn1</name>
        <value>hsiehchou121:50070</value>
    </property>

    <!-- nn2的RPC通信地址 -->
    <property>
        <name>dfs.namenode.rpc-address.mycluster.nn2</name>
        <value>hsiehchou122:9000</value>
    </property>

    <!-- nn2的http通信地址 -->
    <property>
        <name>dfs.namenode.http-address.mycluster.nn2</name>
        <value>hsiehchou122:50070</value>
    </property>

    <!-- 指定NameNode的日志在JournalNode上的存放位置 -->
    <property>
        <name>dfs.namenode.shared.edits.dir</name>
        <value>qjournal://hsiehchou121:8485;hsiehchou122:8485;/mycluster</value>
    </property>

    <!-- 指定JournalNode在本地磁盘存放数据的位置 -->
    <property>
        <name>dfs.journalnode.edits.dir</name>
        <value>/root/hd/hadoop-2.8.4/journal</value>
    </property>

    <!-- 开启NameNode失败自动切换 -->
    <property>
        <name>dfs.ha.automatic-failover.enabled</name>
        <value>true</value>
    </property>

    <!-- 配置失败自动切换实现方式 -->
    <property>
        <name>dfs.client.failover.proxy.provider.mycluster</name>
        <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
    </property>

    <!-- 配置隔离机制方法,多个机制用换行分割,即每个机制暂用一行-->
    <property>
        <name>dfs.ha.fencing.methods</name>
        <value>
            sshfence
            shell(/bin/true)
        </value>
    </property>

    <!-- 使用sshfence隔离机制时需要ssh免登陆 -->
    <property>
        <name>dfs.ha.fencing.ssh.private-key-files</name>
        <value>/root/.ssh/id_rsa</value>
    </property>

    <!-- 配置sshfence隔离机制超时时间 -->
    <property>
        <name>dfs.ha.fencing.ssh.connect-timeout</name>
        <value>30000</value>
    </property>
</configuration>

4、hbase-site.xml

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
-->
<configuration> 
    <property> 
        <name>hbase.cluster.distributed</name> 
        <value>true</value> 
    </property> 

    <property> 
        <name>hbase.rootdir</name> 
        <value>hdfs://mycluster/hbase</value> 
    </property> 

    <property> 
        <name>hbase.master.port</name> 
        <value>16000</value> 
    </property> 


    <property> 
        <name>hbase.zookeeper.quorum</name> 
        <value>hsiehchou121,hsiehchou122,hsiehchou123</value> 
    </property> 

    <!-- hbase的元数据信息存储在zookeeper的位置 --> 
    <property> 
        <name>hbase.zookeeper.property.dataDir</name> 
        <value>/root/hd/zookeeper-3.4.10/zkData</value> 
    </property> 

    <property> 
        <name>hbase.zookeeper.property.clientPort</name> 
        <value>2181</value> 
    </property> 

    <property> 
        <name>zookeeper.session.timeout</name> 
        <value>120000</value> 
    </property> 

    <property> 
        <name>hbase.zookeeper.property.tickTime</name> 
        <value>6000</value> 
    </property> 

    <!-- 保证HBase之间时间同步 -->
    <property>
        <name>hbase.master.maxclockskew</name>
        <value>180000</value>
        <description>Time difference of regionserver from master</description>
    </property>

    <!-- 使用HBase Coprocessor协处理器 -->
    <property>
        <name>hbase.coprocessor.region.classes</name>
        <value>hbase.CalleeWriteObserver</value>
    </property>

</configuration>

5、log4j.properties

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Define some default values that can be overridden by system properties
hbase.root.logger=INFO,console
hbase.security.logger=INFO,console
hbase.log.dir=.
hbase.log.file=hbase.log

# Define the root logger to the system property "hbase.root.logger".
log4j.rootLogger=${hbase.root.logger}

# Logging Threshold
log4j.threshold=ALL

#
# Daily Rolling File Appender
#
log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
log4j.appender.DRFA.File=${hbase.log.dir}/${hbase.log.file}

# Rollver at midnight
log4j.appender.DRFA.DatePattern=.yyyy-MM-dd

# 30-day backup
#log4j.appender.DRFA.MaxBackupIndex=30
log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout

# Pattern format: Date LogLevel LoggerName LogMessage
log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2}: %m%n

# Rolling File Appender properties
hbase.log.maxfilesize=256MB
hbase.log.maxbackupindex=20

# Rolling File Appender
log4j.appender.RFA=org.apache.log4j.RollingFileAppender
log4j.appender.RFA.File=${hbase.log.dir}/${hbase.log.file}

log4j.appender.RFA.MaxFileSize=${hbase.log.maxfilesize}
log4j.appender.RFA.MaxBackupIndex=${hbase.log.maxbackupindex}

log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2}: %m%n

#
# Security audit appender
#
hbase.security.log.file=SecurityAuth.audit
hbase.security.log.maxfilesize=256MB
hbase.security.log.maxbackupindex=20
log4j.appender.RFAS=org.apache.log4j.RollingFileAppender
log4j.appender.RFAS.File=${hbase.log.dir}/${hbase.security.log.file}
log4j.appender.RFAS.MaxFileSize=${hbase.security.log.maxfilesize}
log4j.appender.RFAS.MaxBackupIndex=${hbase.security.log.maxbackupindex}
log4j.appender.RFAS.layout=org.apache.log4j.PatternLayout
log4j.appender.RFAS.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
log4j.category.SecurityLogger=${hbase.security.logger}
log4j.additivity.SecurityLogger=false
#log4j.logger.SecurityLogger.org.apache.hadoop.hbase.security.access.AccessController=TRACE
#log4j.logger.SecurityLogger.org.apache.hadoop.hbase.security.visibility.VisibilityController=TRACE

#
# Null Appender
#
log4j.appender.NullAppender=org.apache.log4j.varia.NullAppender

#
# console
# Add "console" to rootlogger above if you want to use this
#
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2}: %m%n

log4j.appender.asyncconsole=org.apache.hadoop.hbase.AsyncConsoleAppender
log4j.appender.asyncconsole.target=System.err

# Custom Logging levels

log4j.logger.org.apache.zookeeper=INFO
#log4j.logger.org.apache.hadoop.fs.FSNamesystem=DEBUG
log4j.logger.org.apache.hadoop.hbase=INFO
# Make these two classes INFO-level. Make them DEBUG to see more zk debug.
log4j.logger.org.apache.hadoop.hbase.zookeeper.ZKUtil=INFO
log4j.logger.org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher=INFO
#log4j.logger.org.apache.hadoop.dfs=DEBUG
# Set this class to log INFO only otherwise its OTT
# Enable this to get detailed connection error/retry logging.
# log4j.logger.org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation=TRACE


# Uncomment this line to enable tracing on _every_ RPC call (this can be a lot of output)
#log4j.logger.org.apache.hadoop.ipc.HBaseServer.trace=DEBUG

# Uncomment the below if you want to remove logging of client region caching'
# and scan of hbase:meta messages
# log4j.logger.org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation=INFO
# log4j.logger.org.apache.hadoop.hbase.client.MetaScanner=INFO

# Prevent metrics subsystem start/stop messages (HBASE-17722)
log4j.logger.org.apache.hadoop.metrics2.impl.MetricsConfig=WARN
log4j.logger.org.apache.hadoop.metrics2.impl.MetricsSinkAdapter=WARN
log4j.logger.org.apache.hadoop.metrics2.impl.MetricsSystemImpl=WARN

文章作者: 谢舟
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 谢舟 !
 上一篇
系统定时任务 系统定时任务
crond系统定时任务1、crond服务管理service crond restart (重新启动服务) 2、crontab定时任务设置1)基本语法crontab [选项]选项: -e: 编辑crontab定时任
2019-06-04
下一篇 
Flink练习 Flink练习
一、Flink开发IDEA环境搭建与测试1、IDEA开发环境先虚拟机联网,然后执行yum -y install ncnc是用来打开端口的工具然后nc -l 9000 1.pom文件设置 <properties>
2019-05-18
  目录