Storm集群和集成


一、Storm集群任务提交流程

Storm集群任务提交流程

二、Storm内部通信机制

Worker进程

Worker

三、集成Storm

1、与JDBC集成

  • 将Storm Bolt处理的结果插入MySQL数据库中

  • 需要依赖的jar包
    $STORM_HOME\external\sql\storm-sql-core*.jar
    $STORM_HOME\external\storm-jdbc\storm-jdbc-1.0.3.jar
     mysql的驱动
     commons-lang3-3.1.jar

  • 与JDBC集成的代码实现
     修改主程序WordCountTopology,增加如下代码:

    //创建一个JDBCBolt将结果插入数据库中
    builder.setBolt("wordcount_jdbcBolt", createJDBCBolt()).shuffleGrouping("wordcount_count");

增加一个新方法创建JDBCBolt组件

//创建JDBC Insert Bolt组件
//需要事先在MySQL数据库中创建对应的表,result
private static IRichBolt createJDBCBolt(){
    ConnectionProvider connectionProvider = new MyConnectionProvider();
    JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper("aaa", connectionProvider);

    return new JdbcInsertBolt(connectionProvider, simpleJdbcMapper).withTableName("result").withQueryTimeoutSecs(30);
}

 实现ConnectionProvider接口

class MyConnectionProvider implements ConnectionProvider{
    private static String driver = "com.mysql.cj.jdbc.Driver";
    private static String url = "jdbc:mysql://192.168.116.121:3306/demo";
    private static String user = "root";
    private static String password = "password";

    //静态块
    static{//注册驱动
        try{
            Class.forName(driver);
        }catch(ClassNotFoundException e){
            throw new ExceptionInInitializerError(e);
        }
    }

    @Override
    public Connection getConnection(){
        try{
            return DriverManager.getConnection(url, user, password);
        }catch{
            e.printStackTrace();
        }    
        return null;    
    }
    public void cleanup(){}
    public void prepare(){}
}

 修改WordCountSplitBolt组件,将统计后的结果发送给下一个组件写入MySQL

public class WordCountSplitBolt extends BaseRichBolt {
    private Map<String, Integer> result = new HashMap<String, Integer>();
    private OutputCollector collector;
    @Override
    public void execute(Tuple tuple){
        String word = tuple.getStringByField("word");
        int count = tuple.getIntegerByField("count");

        if(result.containsKey(word)) {
            int total = result.get(word);
            result.put(word, total+count);
        }else{
            result.put(word, 1);
        }
        //直接输出到屏幕
        //System.out.println("输出的结果是:" + result);
        //将统计结果发送下一个Bolt,插入数据
        this.collector.emit(new Values*(word, result.get(word));
    }

    @Override
    public void prepare(Map arg0, TopologyContext arg1, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void declareOutputFields(OutputFieldsDecler declare){
        declare.declarer(new Fields("word", "sum"));
    }
}

2、与Redis集成

Redis是一个key-value存储系统。和Memcached类似,它支持存储的value类型相对更多,包括string(字符串)、list(链表)、set(集合)、zset(sorted set –有序集合)和hash(哈希类型)。与Memcached一样,为了保证效率,数据都是缓存在内存中。区别的是redis会周期性的把更新的数据写入磁盘或者把修改操作写入追加的记录文件,并且在此基础上实现了master-slave(主从)同步

Redis 是一个高性能的key-value数据库。Redis的出现,很大程度补偿了memcached这类key/value存储的不足,在部分场合可以对关系数据库起到很好的补充作用。它提供了Java,C/C++,C#,PHP,JavaScript,Perl,Object-C,Python,Ruby,Erlang等客户端,使用很方便。

Redis支持主从同步。数据可以从主服务器向任意数量的从服务器上同步,从服务器可以是关联其他从服务器的主服务器

 修改代码:WordCountTopology.java

builder.setBolt("wordcount_redisBolt", createRedisBolt()).shuffleGrouping("wordcount_count");

//创建Redis Bolt 组件
private staticIRichBolt createRedisBolt(){
    JedisPoolConfig.Builder builder = new JedisPoolConfig.Builder();
    builder.setHost("192.168.116.121");
    builder.setPort(6379);
    JedisPoolConfig poolConfig = builder.build();

    //RedisStoreMapper用于指定存入Redis中的数据格式
    return new RedisStoreBolt(poolConfig, new RedisStoreMapper(){
        @Override
        public RedisDateTypeDescription getDataTypeDescription(){
            return new RedisDateTypeDescription(RedisDateTypeDescription.RedisDateType.HASH, "wordCount");
        }

        @Override
        public String getValueFromTuple(){
            return String.valueOf(tuple.getIntegerByField("total"));
        }

        @Override
        public String getKeyFromTuple(){
            return tuple.getStringByField("word");    
        }
    })
}

3、与HDFS集成

  • 需要的jar包:
    $STORM_HOME\external\storm-hdfs\storm-hdfs-1.0.3.jar
     HDFS相关的jar包

  • 开发新的bolt组件

//创建一个新的HDFS Bolt组件,把前一个bolt组件处理的结果存入HDFS
private static IRichBolt createHDFSBolt(){
    HdfsBolt bolt = new HdfsBolt();
    //HDFS的位置
    bolt.withFsUrl("hdfs://192.168.116.121:9000");

    //数据保存在HDFS上的目录
    bolt.withFileNameFormat(new DefaultFileNameFormat().withPath("/stormdata"));

    //写入HDFS的数据的分隔符 | 结果:Beijing|10
    bolt.withRecordFormat(new DelimitedRecordFormat().withFieldDelimiter("|"));

    //每5M的数据生成一个文件
    bolt.withRotationPolicy(new FileSizeRotationPolicy(5.0f, Units.MB));
    //Bolt输出tuple,当tuple达到一定的大小(没1K),与HDFS进行同步
    bolt.withSyncPolicy(new CountSyncPolicy(1000));

    return bolt;
}

4、与HBase集成

  • 需要的jar包:HBase的相关包

  • 开发新的bolt组件(WordCountBoltHBase.java)

    /**
    * 在HBase中创建表,保存数据
    * create 'result','info'
    */
    public class WordCountBoltHBase extends BaseRichBolt {
    public void execute(Tuple tuple){
       //如何处理?将上一个bolt组件发送过来的结果,存入HBase
       //取出上一个组件发送过来的数据
       String word = tuple.getStringByField("word");
       int total = tuple.getIntegerByField("total");
    
       //构造一个Put对象
       Put put = new Put(Bytes.toBytes(word));
       put.add(Bytes.toBytes("info"), Bytes.toBytes("word"), Bytes.toBytes(word));
       put.add(Bytes.toBytes("info"), Bytes.toBytes("total"), Bytes.toBytes(String.valueOf(total)));
    
       //把数据插入HBase
       try{
           table.put(put);
       }catch(Exception e){
           e.printStackTrace();
       }
    }
    
    public void prepare(Map arg0, TopologyContext arg1, OutputCollector arg2){
       //初始化
       //指定ZK的地址
       Configuration conf = new Configuration();
       conf.set("hbase.zookeeper.quorum","192.168.116.121");
    
       //创建table的客户端
       try{
           table = new HTable(conf, "result");
       }catch(Exception ex){
           ex.printStackTrace();
       }
    }
    }

5、与Apache Kafka集成

  • 注意:需要把slf4j-log4j12-1.6.4.jar包去掉,有冲突(有两个)
private static IRichSpout creatKafkaSpout(){
    //定义ZK地址
    BrokerHosts hosts = new ZkHosts("hadoop121:2181,hadoop122:2181,hadoop123:2181");

    //指定Topic的信息
    SpoutConfig spoutConf = new SpoutConfig(hosts, "mydemo2", "/mydemo2", UUID.randomUUID().toString());

    //定义收到消息的Schema格式
    spoutConf.scheme = new SchemeAsMultiScheme(new Scheme(){
        @Override
        public Fields getOutputFields(){
            return new Fields("sentence");
        }

        @Override
        public List<Object> deserialize(ByteBuffer buffer){
            try{
                String msg = (Charset.forName("UTF-8").newDecoder()).decode(buffer).asReadOnlyBuffer().toString();
                System.out.println("**********收到的数据是msg" + msg);
                return new Values(msg);
            }catch(Exception e){
                e.printStackTrace();
            }
            return null;
        }
    });
    return new KafkaSpout(spoutConf);
}

6、与Hive集成

  • 由于集成Storm和Hive依赖的jar较多,并且冲突的jar包很多,强烈建议使用Maven来搭建新的工程
<dependencies>
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <version>1.0.3</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-hive</artifactId>
        <version>1.0.3</version>
        <type>jar</type>
    </dependency>
</dependencies>
  • 需要对Hive做一定的配置(在hive-site.xml文件中):
<property>  
<name>hive.in.test</name>  
<value>true</value>  
</property>
  • 需要使用下面的语句在hive中创建表:
create table wordcount
(word string,total int)
clustered by (word) into 10 buckets
stored as orc TBLPROPERTIES('transactional'='true');
  • 启动metastore服务:hive –service metastore
  • 开发新的bolt组件,用于将前一个bolt处理的结果写入Hive
private static IRichBolt createHiveBolt(){
    //设置环境变量,能找到winutils.exe
    System.setProperty("hadoop.home.dir", "D:\\tools\\hadoop-2.8.4");

    //作用:将bolt组件处理后的结果tuple,如何存入hive表
    DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper().withColumnFields(new Fields("word", "total"));

    //配置Hive的参数信息
    HiveOptions options = new HiveOptions("thrift://hadoop121:9083",//hive的metastore
                                          "default",//hive数据库的名字
                                          "wordcount",//保存数据的表                            mapper)
                            .withTxnsPerBatch(10)
                            .withBatchSize(1000)
                            .withIdleTimeout(10);

    //创建一个Hive的bolt组件,将单词计数后的结果存入hive
    HiveBolt bolt = new HiveBolt(options);
    return bolt;
}
  • 为了测试的方便,我们依然采用之前随机产生字符串的Spout组件产生数据

7、与JMS集成

JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持

JMS的两种消息类型:Queue和Topic
基于Weblogic的JMS架构 :

基于Weblogic的JMS架构

private static IRichBolt createJMSBolt(){
    //创建一个JMS Bolt,将前一个bolt发送过来的数据 写入JMS
    JmsBolt bolt = new JmsBolt();

    //指定JMSBolt的provider
    bolt.setJmsProvider(new MyJMSProvider());

    //指定bolt如何解析信息
    bolt.setJmsMessageProducer(new JmsMessageProducer(){

        @Override
        public Message toMessage(Session session, ITuple tuple) throws JMSException {
            //取出上一个组件发送过来的数据
            String word = tuple.getStringByField("word");
            int total = tuple.getIntegerByField("total");
            return session.createTextMessage(word + "   " + total);
        }
    });
    return bolt;
}
  • 需要的weblogic的jar包

wljmsclient.jar
wlclient.jar

  • permission javax.management.MBeanTrustPermission “register”;

文章作者: 谢舟
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 谢舟 !
 上一篇
Storm练习 Storm练习
Storm练习 一、需求需求:统计网站访问量(实时统计) 技术选型:特点(数据量大、做计算、实时) 实时计算框架:storm1)spout 数据源,接入数据 本地文件 2)bolt 业务逻辑处理 切分数据 查到
2019-05-14
下一篇 
私有网络软件仓库搭建和挂载网络系统镜像 私有网络软件仓库搭建和挂载网络系统镜像
一、私有网络软件仓库在集群安装的过程中,要求每个节点都必须挂载光驱, 而对于每台节点都手动的去挂载光驱太麻烦,也不方便。这里使用每个节点都指向同一个私有网络镜像来解决这个问题 我们的集群采用的是全离线安装,也不可能逐个节点的安装,同样是也使
2019-05-10
  目录