离线部分
Hadoop->离线计算(hdfs / mapreduce) yarn
zookeeper->分布式协调(动物管理员)
hive->数据仓库(离线计算 / sql)easy coding
flume->数据采集
sqoop->数据迁移mysql->hdfs/hive hdfs/hive->mysql
Azkaban->任务调度工具
hbase->数据库(nosql)列式存储 读写速度
实时
kafka
storm
一、Kafka是什么
kafka一般用来缓存数据
1、开源消息系统
2、最初是LinkedIn公司开发,2011年开源
2012年10月从Apache Incubator毕业
项目目标是为处理实时数据,提供一个统一、高通量、低等待的平台
3、Kafka是一个分布式消息队列
消息根据Topic来归类,发送消息 Producer,接收 Consumer
kafka集群有多个kafka实例组成,每个实例成为broker
4、依赖于 Zookeeper 集群
无论是kafka集群,还是 Producer、Consumer 都依赖于 Zookeeper 集群保存元信息,来保证系统可用性
官网
http://kafka.apache.org/
ApacheKafka?是一个分布式流媒体平台
流媒体平台有三个关键功能:
- 发布和订阅记录流,类似于消息队列或企业消息传递系统
- 以容错的持久方式存储记录流
- 记录发生时处理流
Kafka通常用于两大类应用:
构建可在系统或应用程序之间可靠获取数据的实时流数据管道
构建转换或响应数据流的实时流应用程序
kafka在流计算中,kafka主要功能是用来缓存数据,storm可以通过消费kafka中的数据进行流计算,是一套开源的消息系统,由scala写成,支持javaAPI。
kafka最初由LinkedIn公司开发,2011年开源,2012年从Apache毕业,是一个分布式消息队列,kafka读消息保存采用Topic进行归类
二、消息队列
点对点
发布、订阅模式
角色
发送消息:Producer(生产者)
接收消息:Consumer(消费者)
三、为什么需要消息队列
1、解耦
为了避免出现问题
2、冗余
消息队列把数据进行持久化,直到他们已经被完全处理
3、扩展性(拓展性)
可增加处理过程
4、灵活性
面对访问量剧增,不会因为超负荷请求而完全瘫痪
5、可恢复性
一部分组件失效,不会影响整个系统。可以进行恢复
6、顺序保证(相对)
kafka保证一个Partition内部的消息有序,对消息进行有序处理
7、缓冲
控制数据流经过系统的速度
8、异步通信
akka,消息队列提供了异步处理的机制
很多时候,用户不想也不需要立即处理消息,消息队列提供异步处理机制,允许用户把消息放入队列,但不立即处理
四、Kafka架构
1、Producer:消息生产者
就是往kafka中发消息的客户端
2、Consumer:消息消费者
向kafka broker中取消息的客户端
3、Topic 理解为队列
4、Consumer Group 消费者组
组内有多个消费者实例,共享一个公共的ID,即groupID
组内所有消费者协调在一起,消费Topic
每个分区,只能有同一个消费组内的一个Consumer消费
5、broker
一台kafka服务器就是一个broker
6、partition:一个topic分为多个partition
每个partition是一个有序队列
kafka保证按一个partition中的顺序将消息发送个consumer
不能保证topic整体有序
7、offset:Kafka存储文件按照offset.kafka命名
五、Kafka部署
前提:Zookeeper
官网下载安装包
http://kafka.apache.org/downloads
上传tar
解压
[root@hsiehchou121 hd]# tar -zxvf kafka_2.11-2.1.1.tgz
[root@hsiehchou121 hd]# mv kafka_2.11-2.1.1 kafka
在kafka目录中,创建一个logs文件夹
[root@hsiehchou121 kafka]# mkdir logs
如果不创建,默认放在 /tmp 目录下
修改文件
config/server.properties
21 broker.id=0
broker 的 全局唯一编号,不能重复
新增
22 delete.topic.enable=true
允许删除topic
#
The number of threads that the server uses for receiving requests from the network and sending responses to the network
42 num.network.threads=3
处理网络请求的线程数量
#
The number of threads that the server uses for processing requests, which may include disk I/O
46 num.io.threads=8
用来处理磁盘IO的线程数量
#
A comma separated list of directories under which to store log files
62 log.dirs=/root/hd/kafka/logs
kafka运行日志存放的路径
#
The default number of log partitions per topic. More partitions allow greater#
parallelism for consumption, but this will also result in more files across#
the brokers.
67 num.partitions=1
当前主题在broker上的分区个数
#
· The number of threads per data directory to be used for log recovery at start up and flushing at shutdown.#
This value is recommended to be increased for installations with data dirs lo cated in RAID array.
71 num.recovery.threads.per.data.dir=1
恢复和清理data下的线程数量
125 zookeeper.connect=hsiehchou121:2181,hsiehchou122:2181,hsiehchou123:2181
zookeeper相关信息
#
Timeout in ms for connecting to zookeeper
128 zookeeper.connection.timeout.ms=6000
zookeeper连接超时的时间
添加全局环境变量,以便于在任何地方都可以启动
[root@hsiehchou121 config]# vi /etc/profile#
kafka_home
export KAFKA_HOME=/root/hd/kafka
export PATH=$KAFKA_HOME/bin:$PATH
[root@hsiehchou121 config]# source /etc/profile
分发安装包
注意:要修改配置文件中 borker.id的值
broker id 不得重复
21 broker.id=0 —- hsiehchou121
21 broker.id=1 —- hsiehchou122
21 broker.id=2 —- hsiehchou123
启动kafka集群
首先需要先启动zookeeper
zkServer.sh start
[root@hsiehchou121 kafka]# zkServer.sh start
[root@hsiehchou122 kafka]# zkServer.sh start
[root@hsiehchou123 kafka]# zkServer.sh start
(必须先启动!!!!!!!!!)
./bin/kafka-server-start.sh config/server.properties &
&====后台运行
[root@hsiehchou121 kafka]# ./bin/kafka-server-start.sh config/server.properties &
[root@hsiehchou122 kafka]# ./bin/kafka-server-start.sh config/server.properties &
[root@hsiehchou123 kafka]# ./bin/kafka-server-start.sh config/server.properties &
jps命令可以看Kafka进程
[root@hsiehchou121 kafka]# jps
7104 Jps
6742 Kafka
6715 QuorumPeerMain
关闭命令:
./bin/kafka-server-stop.sh stop
或者./bin/kafka-server-stop.sh
六、Kafka命令行操作
1、查看当前服务器中所有的topic
[root@hsiehchou121 kafka]# ./bin/kafka-topics.sh --zookeeper
hsiehchou121:2181 --list
2、创建topic
[root@hsiehchou121 kafka]# ./bin/kafka-topics.sh --zookeeper
hsiehchou121:2181 --create
--replication-factor
3 --partitions
1 --topic
second
[root@hsiehchou121 kafka]# ./bin/kafka-topics.sh --zookeeper
hsiehchou121:2181 --create
--replication-factor
3 --partitions
1 --topic
xz
--zookeeper
:连接zk集群--create
:创建--replication-factor
: 副本--partitions
:分区--topic
:主题名
3、删除topic
[root@hsiehchou121 kafka]# ./bin/kafka-topics.sh --zookeeper
hsiehchou121:2181 --delete
--topic
xz
4、发送消息
[root@hsiehchou121 kafka]# ./bin/kafka-console-producer.sh --broker-list
hsiehchou121:9092 --topic
second>
输入消息
5、消费消息
[root@hsiehchou122 kafka]# ./bin/kafka-console-consumer.sh --bootstrap-server
hsiehchou121:9092 --from-beginning
--topic
second
接收消息
注意:这里的--from-beginning
是从头开始消费,不加则是消费当前正在发送到该topic的消息
6、查看主题描述信息
[root@hsiehchou121 kafka]# ./bin/kafka-topics.sh –zookeeper hsiehchou121:2181 –describe –topic second
Topic:second PartitionCount:1 ReplicationFactor:3 Configs:
Topic: second Partition: 0 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0
7、消费者组消费
修改 consumer.properties 配置文件
consumer group id
group.id=xz
8、启动生产者
[root@hsiehchou121 kafka]# ./bin/kafka-console-producer.sh --broker-list
hsiehchou121:9092 --topic
second
9、启动消费者
[root@hsiehchou122 kafka]# ./bin/kafka-console-consumer.sh --bootstrap-server
hsiehchou121:9092 --topic
second --consumer.config
config/consumer.properties
七、Kafka工作流程分析
1、Kafka生产过程分析
1)写入方式
producer采用推(push)模式将消息发布到broker,每条消息都被追加(append)到分区(patition)中,属于顺序写磁盘(顺序写磁盘效率比随机写内存要高,保障kafka吞吐率)
2) 分区(Partition)
Kafka集群有多个消息代理服务器(broker-server)组成,发布到Kafka集群的每条消息都有一个类别,用主题(topic)来表示。通常,不同应用产生不同类型的数据,可以设置不同的主题。一个主题一般会有多个消息的订阅者,当生产者发布消息到某个主题时,订阅了这个主题的消费者都可以接收到生成者写入的新消息
Kafka集群为每个主题维护了分布式的分区(partition)日志文件,物理意义上可以把主题(topic)看作进行了分区的日志文件(partition log)。主题的每个分区都是一个有序的、不可变的记录序列,新的消息会不断追加到日志中。分区中的每条消息都会按照时间顺序分配到一个单调递增的顺序编号,叫做偏移量(offset),这个偏移量能够唯一地定位当前分区中的每一条消息
消息发送时都被发送到一个topic,其本质就是一个目录,而topic是由一些Partition Logs(分区日志)组成,其组织结构如下图所示:
下图中的topic有3个分区,每个分区的偏移量都从0开始,不同分区之间的偏移量都是独立的,不会相互影响
我们可以看到,每个Partition中的消息都是有序的,生产的消息被不断追加到Partition log上,其中的每一个消息都被赋予了一个唯一的offset值
发布到Kafka主题的每条消息包括键值和时间戳。消息到达服务器端的指定分区后,都会分配到一个自增的偏移量。原始的消息内容和分配的偏移量以及其他一些元数据信息最后都会存储到分区日志文件中。消息的键也可以不用设置,这种情况下消息会均衡地分布到不同的分区
- 分区的原因
(1)方便在集群中扩展,每个Partition可以通过调整以适应它所在的机器,而一个topic又可以有多个Partition组成,因此整个集群就可以适应任意大小的数据了。
(2)可以提高并发,因为可以以Partition为单位读写了。
传统消息系统在服务端保持消息的顺序,如果有多个消费者消费同一个消息队列,服务端会以消费存储的顺序依次发送给消费者。但由于消息是异步发送给消费者的,消息到达消费者的顺序可能是无序的,这就意味着在并行消费时,传统消息系统无法很好地保证消息被顺序处理。虽然我们可以设置一个专用的消费者只消费一个队列,以此来解决消息顺序的问题,但是这就使得消费处理无法真正执行。
Kafka比传统消息系统有更强的顺序性保证,它使用主题的分区作为消息处理的并行单元。Kafka以分区作为最小的粒度,将每个分区分配给消费者组中不同的而且是唯一的消费者,并确保一个分区只属于一个消费者,即这个消费者就是这个分区的唯一读取线程。那么,只要分区的消息是有序的,消费者处理的消息顺序就有保证。每个主题有多个分区,不同的消费者处理不同的分区,所以Kafka不仅保证了消息的有序性,也做到了消费者的负载均衡。 - 分区的原则
(1)指定了patition,则直接使用;
(2)未指定patition但指定key,通过对key的value进行hash出一个patition
(3)patition和key都未指定,使用轮询选出一个patition。
DefaultPartitioner类
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if (keyBytes == null) {
int nextValue = nextValue(topic);
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (availablePartitions.size() > 0) {
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return availablePartitions.get(part).partition();
} else {
// no partitions are available, give a non-available partition
return Utils.toPositive(nextValue) % numPartitions;
}
} else {
// hash the keyBytes to choose a partition
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}
3)副本(Replication)
同一个partition可能会有多个replication(对应 server.properties 配置中的 default.replication.factor=N)。没有replication的情况下,一旦broker 宕机,其上所有 patition 的数据都不可被消费,同时producer也不能再将数据存于其上的patition。引入replication之后,同一个partition可能会有多个replication,而这时需要在这些replication之间选出一个leader,producer和consumer只与这个leader交互,其它replication作为follower从leader 中复制数据
4) 写入流程
producer写入消息流程如下:
- producer先从zookeeper的 “/brokers/…/state”节点找到该partition的leader
- producer将消息发送给该leader
- leader将消息写入本地log
- followers从leader pull消息,写入本地log后向leader发送ACK
- leader收到所有ISR中的replication的ACK后,增加HW(high watermark,最后commit 的offset)并向producer发送ACK
2、Broker 保存消息
1)存储方式
物理上把topic分成一个或多个patition(对应 server.properties 中的num.partitions=3配置),每个patition物理上对应一个文件夹(该文件夹存储该patition的所有消息和索引文件),如下:
[root @hsiehchou121 logs]$ ll
drwxrwxr-x. 2 root root 4096 4月 6 14:37 first-0
drwxrwxr-x. 2 root root 4096 4月 6 14:35 first-1
drwxrwxr-x. 2 root root 4096 4月 6 14:37 first-2
[root @hsiehchou121 logs]$ cd first-0
[root @hsiehchou121 first-0]$ ll
-rw-rw-r--. 1 root root 10485760 4月 6 14:33 00000000000000000000.index
-rw-rw-r--. 1 root root 219 4月 6 15:07 00000000000000000000.log
-rw-rw-r--. 1 root root 10485756 4月 6 14:33 00000000000000000000.timeindex
-rw-rw-r--. 1 root root 8 4月 6 14:37 leader-epoch-checkpoint
2)存储策略
无论消息是否被消费,kafka都会保留所有消息。有两种策略可以删除旧数据:
- 基于时间:log.retention.hours=168
- 基于大小:log.retention.bytes=1073741824
需要注意的是,因为Kafka读取特定消息的时间复杂度为O(1),即与文件大小无关,所以这里删除过期文件与提高 Kafka 性能无关。
3)Zookeeper存储结构
注意:producer不在zk中注册,消费者在zk中注册
3、Kafka消费过程分析
kafka提供了两套consumer API:高级Consumer API和低级API
1)消费模型
消息由生产者发布到Kafka集群后,会被消费者消费。消息的消费模型有两种:推送模型(push)和拉取模型(pull)
基于推送模型(push)的消息系统,由消息代理记录消费者的消费状态。消息代理在将消息推送到消费者后,标记这条消息为已消费,但这种方式无法很好地保证消息被处理。比如,消息代理把消息发送出去后,当消费进程挂掉或者由于网络原因没有收到这条消息时,就有可能造成消息丢失(因为消息代理已经把这条消息标记为已消费了,但实际上这条消息并没有被实际处理)。如果要保证消息被处理,消息代理发送完消息后,要设置状态为“已发送”,只有收到消费者的确认请求后才更新为“已消费”,这就需要消息代理中记录所有的消费状态,这种做法显然是不可取的
Kafka采用拉取模型,由消费者自己记录消费状态,每个消费者互相独立地顺序读取每个分区的消息。如下图所示,有两个消费者(不同消费者组)拉取同一个主题的消息,消费者A的消费进度是3,消费者B的消费进度是6。消费者拉取的最大上限通过最高水位(watermark)控制,生产者最新写入的消息如果还没有达到备份数量,对消费者是不可见的。这种由消费者控制偏移量的优点是:消费者可以按照任意的顺序消费消息。比如,消费者可以重置到旧的偏移量,重新处理之前已经消费过的消息;或者直接跳到最近的位置,从当前的时刻开始消费
在一些消息系统中,消息代理会在消息被消费之后立即删除消息。如果有不同类型的消费者订阅同一个主题,消息代理可能需要冗余地存储同一消息;或者等所有消费者都消费完才删除,这就需要消息代理跟踪每个消费者的消费状态,这种设计很大程度上限制了消息系统的整体吞吐量和处理延迟。Kafka的做法是生产者发布的所有消息会一致保存在Kafka集群中,不管消息有没有被消费。用户可以通过设置保留时间来清理过期的数据,比如,设置保留策略为两天。那么,在消息发布之后,它可以被不同的消费者消费,在两天之后,过期的消息就会自动清理掉
2)高级API
- 高级API优点
高级API 写起来简单
不需要自行去管理offset,系统通过zookeeper自行管理。
不需要管理分区,副本等情况,.系统自动管理。
消费者断线会自动根据上一次记录在zookeeper中的offset去接着获取数据(默认设置1分钟更新一下zookeeper中存的offset)
可以使用group来区分对同一个topic 的不同程序访问分离开来(不同的group记录不同的offset,这样不同程序读取同一个topic才不会因为offset互相影响) - 高级API缺点
不能自行控制offset(对于某些特殊需求来说)
不能细化控制如分区、副本、zk等
3)低级API
- 低级 API 优点
能够让开发者自己控制offset,想从哪里读取就从哪里读取。
自行控制连接分区,对分区自定义进行负载均衡
对zookeeper的依赖性降低(如:offset不一定非要靠zk存储,自行存储offset即可,比如存在文件或者内存中) - 低级API缺点
太过复杂,需要自行控制offset,连接哪个分区,找到分区leader 等。
4)消费者组
消费者是以consumer group消费者组的方式工作,由一个或者多个消费者组成一个组,共同消费一个topic。每个分区在同一时间只能由group中的一个消费者读取,但是多个group可以同时消费这个partition。在图中,有一个由三个消费者组成的group,有一个消费者读取主题中的两个分区,另外两个分别读取一个分区。某个消费者读取某个分区,也可以叫做某个消费者是某个分区的拥有者
在这种情况下,消费者可以通过水平扩展的方式同时读取大量的消息。另外,如果一个消费者失败了,那么其他的group成员会自动负载均衡读取之前失败的消费者读取的分区
5)消费方式
consumer采用pull(拉)模式从broker中读取数据
push(推)模式很难适应消费速率不同的消费者,因为消息发送速率是由broker决定的。它的目标是尽可能以最快速度传递消息,但是这样很容易造成consumer来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而pull模式则可以根据consumer的消费能力以适当的速率消费消息
对于Kafka而言,pull模式更合适,它可简化broker的设计,consumer可自主控制消费消息的速率,同时consumer可以自己控制消费方式——即可批量消费也可逐条消费,同时还能选择不同的提交方式从而实现不同的传输语义
pull模式不足之处是,如果kafka没有数据,消费者可能会陷入循环中,一直等待数据到达。为了避免这种情况,我们在我们的拉请求中有参数,允许消费者请求在等待数据到达的“长轮询”中进行阻塞(并且可选地等待到给定的字节数,以确保大的传输大小)
6) 消费者组案例
- 需求:测试同一个消费者组中的消费者,同一时刻只能有一个消费者消费
- 案例实操
(1)在hsiehchou121、hsiehchou122上修改/root/hd/kafka/config/consumer.properties配置文件中的group.id属性为任意组名
[root@hsiehchou122 config]$ vi consumer.properties
group.id=root
规划:hsiehchou121生产者,hsiehchou122消费者,hsiehchou123消费者
(2)在hsiehchou122、hsiehchou123上分别启动消费者
[root@hsiehchou122 kafka]$ ./bin/kafka-console-consumer.sh --bootstrap-server hsiehchou121:9092 --topic second --consumer.config config/consumer.properties
[root@hsiehchou123 kafka]$ ./bin/kafka-console-consumer.sh --bootstrap-server hsiehchou121:9092 --topic second --consumer.config config/consumer.properties
(3)在hsiehchou121上启动生产者
[root@hsiehchou121 kafka]$ bin/kafka-console-producer.sh --broker-list hsiehchou121:9092 --topic first
>hello world
(4)查看hsiehchou122和hsiehchou123的接收者
同一时刻只有一个消费者接收到消息
八、 Kafka API实战
1、 环境准备
1)在eclipse中创建一个java工程
2)在工程的根目录创建一个lib文件夹
3)解压kafka安装包,将安装包libs目录下的jar包拷贝到工程的lib目录下,并build path
4)启动zk和kafka集群,在kafka集群中打开一个消费者
[root@hsiehchou121 kafka]$ bin/kafka-console-consumer.sh --zookeeper hsiehchou121:2181 --topic first
2、Kafka生产者Java API
1)创建生产者
Producer1 类
package com.hsiehchou.kafka.kafka_producer;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
/**
* kafka
* @author hsiehchou
*/
public class Producer1 {
public static void main(String[] args) {
//1.配置生产者的属性
Properties prop = new Properties();
//2.参数配置
//kafka节点的地址,Kafka服务端的主机名和端口号
prop.put("bootstrap.servers", "192.168.116.121:9092");
//发送消息是否等待应答,等待所有副本节点的应答
prop.put("acks", "all");
//配置发送消息失败重试,消息发送最大尝试次数
prop.put("retries", "0");
//配置批量处理消息的大小,一批消息处理大小
prop.put("batch.size", "16384");
//配置批量处理数据的延迟,请求延时
prop.put("linger.ms", "1");
//配置内存缓冲区的大小,发送缓存区内存大小
prop.put("buffer.memory", 33445552);
//消息在发送前必须要序列化,key序列化
prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//3.实例化producer
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(prop);
//4.发送消息
for (int i = 0; i < 50; i++) {
producer.send(new ProducerRecord<String, String>("first", Integer.toString(i), "hello world-" + i));
}
//5.关闭资源
producer.close();
}
}
[root@hsiehchou122 kafka]# ./bin/kafka-console-consumer.sh --bootstrap-server
hsiehchou121:9092 --topic
first --consumer.config
config/consumer.properties
hello world-0
hello world-1
hello world-2
hello world-3......
hello world-49
2)创建生产者带回调函数
Producer2类
package com.hsiehchou.kafka.kafka_producer;
import java.util.Properties;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
public class Producer2 {
public static void main(String[] args) {
//1.配置生产者的属性
Properties prop = new Properties();
//2.参数配置
//kafka节点的地址,Kafka服务端的主机名和端口号
prop.put("bootstrap.servers", "192.168.116.121:9092");
//发送消息是否等待应答,等待所有副本节点的应答
prop.put("acks", "all");
//配置发送消息失败重试,消息发送最大尝试次数
prop.put("retries", "0");
//配置批量处理消息的大小,一批消息处理大小
prop.put("batch.size", "16384");
//配置批量处理数据的延迟,请求延时
prop.put("linger.ms", "1");
//配置内存缓冲区的大小,发送缓存区内存大小
prop.put("buffer.memory", 33445552);
//消息在发送前必须要序列化,key序列化
prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//3.实例化producer
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(prop);
//4.发送消息
for (int i = 0; i < 50; i++) {
producer.send(new ProducerRecord<String, String>("first", Integer.toString(i), "hello world-" + i), new Callback() {
public void onCompletion(RecordMetadata metadata, Exception exception) {
//如果metadata不为null,拿到当前的数据偏移量与分区
if(metadata != null) {
System.out.println(metadata.topic() + "----" + metadata.offset() + "----" + metadata.partition());
}
}
});
}
//5.关闭资源
producer.close();
}
}
[root@hsiehchou122 kafka]# ./bin/kafka-console-consumer.sh --bootstrap-server
hsiehchou121:9092 --topic
three --consumer.config
config/consumer.properties
控制台输出:
first----
00----
0
first----
01----
0
first----
02----
0
first----
03----
0......
first----
48----
0
first----
49----
0
3)自定义分区生产者
Partition1 类
package com.hsiehchou.kafka.kafka_producer;
import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
/**
* 自定义分区
* @author hsiehchou
*/
public class Partition1 implements Partitioner{
//设置
public void configure(Map<String, ?> configs) {
}
//分区逻辑,控制分区
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
return 2;
}
//释放资源
public void close() {
}
}
Producer2类中新增
prop.put("partitioner.class", "com.hsiehchou.kafka.kafka_producer.Partition1");
[root@hsiehchou122 kafka]# ./bin/kafka-console-consumer.sh --bootstrap-server
hsiehchou121:9092 --topic
three --consumer.config
config/consumer.properties
first—–1—-2
first—–1—-2
first—–1—-2......
first—–1—-2
first—–1—-2
3、Kafka消费者Java API
Consumer1 类
package com.hsiehchou.kafka.kafka_consumer;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
/**
* 创建消费者
* @author hsiehchou
*/
public class Consumer1 {
public static void main(String[] args) {
//配置消费者属性
Properties prop = new Properties();
//2.参数配置
//kafka节点的地址,Kafka服务端的主机名和端口号
prop.put("bootstrap.servers", "192.168.116.122:9092");
//配置消费者组
prop.put("group.id", "g1");
//配置是否自动确认offset
prop.put("enable.auto.commit", "true");
//序列化
prop.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
prop.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//3.实例消费者,定义consumer
final KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(prop);
//释放资源
//5.释放资源,线程安全
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
public void run() {
if(consumer != null) {
consumer.close();
}
}
}));
//订阅消息
consumer.subscribe(Arrays.asList("first"));
//4.拉消息 推push 拉poll
while(true) {
ConsumerRecords<String, String> records = consumer.poll(100);
//遍历消息
for(ConsumerRecord<String, String> record:records) {
System.out.println(record.topic() + "--------" + record.value());
}
}
}
}
Producer1 类
跟之前的一样,此处省略
strong text启动kafka集群
[root@hsiehchou121 kafka]# ./bin/kafka-server-start.sh config/server.properties &
[root@hsiehchou122 kafka]# ./bin/kafka-server-start.sh config/server.properties &
[root@hsiehchou123 kafka]# ./bin/kafka-server-start.sh config/server.properties &
分别对Consumer1类和Producer1类依次进行Run as Java Application
控制台输出
first——–1556248624834-hello world-0
first——–1556248625017-hello world-1
first——–1556248625017-hello world-2
first——–1556248625017-hello world-3......
first——–1556248625021-hello world-48
first——–1556248625021-hello world-49
九、Kafka producer拦截器(interceptor)
1、拦截器原理
Producer拦截器(interceptor)是在Kafka 0.10版本被引入的,主要用于实现clients端的定制化控制逻辑
对于producer而言,interceptor使得用户在消息发送前以及producer回调逻辑前有机会对消息做一些定制化需求,比如修改消息等。同时,producer允许用户指定多个interceptor按序作用于同一条消息从而形成一个拦截链(interceptor chain)。Intercetpor的实现接口是org.apache.kafka.clients.producer.ProducerInterceptor,其定义的方法包括:
1)configure(configs)
获取配置信息和初始化数据时调用
2)onSend(ProducerRecord)
该方法封装进KafkaProducer.send方法中,即它运行在用户主线程中。Producer确保在消息被序列化以及计算分区前调用该方法。用户可以在该方法中对消息做任何操作,但最好保证不要修改消息所属的topic和分区,否则会影响目标分区的计算
3)onAcknowledgement(RecordMetadata, Exception)
该方法会在消息被应答或消息发送失败时调用,并且通常都是在producer回调逻辑触发之前。onAcknowledgement运行在producer的IO线程中,因此不要在该方法中放入很重的逻辑,否则会拖慢producer的消息发送效率
4)close
关闭interceptor,主要用于执行一些资源清理工作
如前所述,interceptor可能被运行在多个线程中,因此在具体实现时用户需要自行确保线程安全。另外倘若指定了多个interceptor,则producer将按照指定顺序调用它们,并仅仅是捕获每个interceptor可能抛出的异常记录到错误日志中而非在向上传递。这在使用过程中要特别留意
2、拦截器案例
1)需求:
实现一个简单的双interceptor组成的拦截链。第一个interceptor会在消息发送前将时间戳信息加到消息value的最前部;第二个interceptor会在消息发送后更新成功发送消息数或失败发送消息数。
2)案例实操
(1)增加时间戳拦截器
TimeInterceptor 类
package com.hsiehchou.kafka.interceptor;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
public class TimeInterceptor implements ProducerInterceptor<String, String> {
//配置信息
public void configure(Map<String, ?> configs) {
}
//业务逻辑
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
return new ProducerRecord<String, String>(
record.topic(),
record.partition(),
record.timestamp(),
record.key(),
System.currentTimeMillis() + "-" + record.value()
);
}
//发送失败调用
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
}
//关闭资源
public void close() {
}
}
(2)统计发送消息成功和发送失败消息数,并在producer关闭时打印这两个计数器
CounterInterceptor 类
package com.hsiehchou.kafka.interceptor;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
public class CounterInterceptor implements ProducerInterceptor<String, String> {
private int errorCounter = 0;
private int successCounter = 0;
public void configure(Map<String, ?> configs) {
}
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
return record;
}
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
//统计成功和失败的次数
if(exception == null) {
successCounter++;
}else {
errorCounter++;
}
}
public void close() {
//保存结果
System.out.println("Successful sent:" + successCounter);
System.out.println("Failed sent:" + errorCounter);
}
}
在Producer1类中增加
//拦截器
ArrayList<String> inList = new ArrayList<String>();
inList.add("com.hsiehchou.kafka.interceptor.TimeInterceptor");
inList.add("com.hsiehchou.kafka.interceptor.CounterInterceptor");
prop.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, inList);
测试一:
[root@hsiehchou122 kafka]# ./bin/kafka-server-start.sh config/server.properties &
--from beginning
是从头开始消费,不加则是消费当前正在发送到该topic的消息
[root@hsiehchou123 kafka]#./bin/kafka-console-consumer.sh --bootstrap-server
hsiehchou121:9092 --topic
first --from-beginning
测试二:
[root@hsiehchou122 kafka]# ./bin/kafka-server-start.sh config/server.properties &
[root@hsiehchou122 kafka]# ./bin/kafka-console-consumer.sh --bootstrap-server
hsiehchou121:9092 --topic
first --consumer.config
config/consumer.properties
结果:
1556253447912-hello world-0
1556253448100-hello world-1
1556253448100-hello world-2
1556253448100-hello world-3
1556253448100-hello world-4......
1556253448100-hello world-12
1556253448100-hello world-13
1556253448104-hello world-14
1556253448104-hello world-15
1556253448104-hello world-16......
1556253448104-hello world-48
1556253448104-hello world-49
Successful sent:50
Failed sent:0
十、Kafka Streams
1、概述
1)Kafka Streams
Kafka Streams。Apache Kafka开源项目的一个组成部分。是一个功能强大,易于使用的库。用于在Kafka上构建高可分布式、拓展性,容错的应用程序
2)Kafka Streams特点
- 功能强大
高扩展性,弹性,容错 - 轻量级
无需专门的集群
一个库,而不是框架 - 完全集成
100%的Kafka 0.10.0版本兼容
易于集成到现有的应用程序 - 实时性
毫秒级延迟
并非微批处理
窗口允许乱序数据
允许迟到数据
3)为什么要有Kafka Stream
当前已经有非常多的流式处理系统,最知名且应用最多的开源流式处理系统有Spark Streaming和Apache Storm。Apache Storm发展多年,应用广泛,提供记录级别的处理能力,当前也支持SQL on Stream。而Spark Streaming基于Apache Spark,可以非常方便与图计算,SQL处理等集成,功能强大,对于熟悉其它Spark应用开发的用户而言使用门槛低。另外,目前主流的Hadoop发行版,如Cloudera和Hortonworks,都集成了Apache Storm和Apache Spark,使得部署更容易
既然Apache Spark与Apache Storm拥用如此多的优势,那为何还需要Kafka Stream呢?主要有如下原因
第一,Spark和Storm都是流式处理框架,而Kafka Stream提供的是一个基于Kafka的流式处理类库。框架要求开发者按照特定的方式去开发逻辑部分,供框架调用。开发者很难了解框架的具体运行方式,从而使得调试成本高,并且使用受限。而Kafka Stream作为流式处理类库,直接提供具体的类给开发者调用,整个应用的运行方式主要由开发者控制,方便使用和调试
第二,虽然Cloudera与Hortonworks方便了Storm和Spark的部署,但是这些框架的部署仍然相对复杂。而Kafka Stream作为类库,可以非常方便的嵌入应用程序中,它对应用的打包和部署基本没有任何要求
第三,就流式处理系统而言,基本都支持Kafka作为数据源。例如Storm具有专门的kafka-spout,而Spark也提供专门的spark-streaming-kafka模块。事实上,Kafka基本上是主流的流式处理系统的标准数据源。换言之,大部分流式系统中都已部署了Kafka,此时使用Kafka Stream的成本非常低
第四,使用Storm或Spark Streaming时,需要为框架本身的进程预留资源,如Storm的supervisor和Spark on YARN的node manager。即使对于应用实例而言,框架本身也会占用部分资源,如Spark Streaming需要为shuffle和storage预留内存。但是Kafka作为类库不占用系统资源
第五,由于Kafka本身提供数据持久化,因此Kafka Stream提供滚动部署和滚动升级以及重新计算的能力
第六,由于Kafka Consumer Rebalance机制,Kafka Stream可以在线动态调整并行度
2、Kafka Stream数据清洗案例
1)需求:
实时处理单词带有”>>>”前缀的内容。例如输入”itstar>>>ximenqing”,最终处理成“ximenqing”
2)需求分析:
3)案例实操
- 创建一个工程,并添加jar包
- 创建主类
Application类
package com.hsiehchou.kafka.kafka_stream;
import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorSupplier;
/**
*
* 需求:对数据进行清洗操作
* 思路:xie-hs 把-清洗掉
* @author hsiehchou
*/
public class Application {
public static void main(String[] args) {
//1.定义主题 发送到 另外一个主题中 数据清洗
String oneTopic = "t1";
String twoTopic = "t1";
//2.设置参数
Properties prop = new Properties();
prop.put(StreamsConfig.APPLICATION_ID_CONFIG, "logProcessor");
prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.116.121:9092,192.168.116.122:9092,192.168.116.123:9092");
//3.实例化对象
StreamsConfig config = new StreamsConfig(prop);
//4. 流计算 拓扑
Topology builder = new Topology();
//5.定义kafka组件数据源
builder.addSource("Source", oneTopic).addProcessor("Processor", new ProcessorSupplier<byte[], byte[]>() {
public Processor<byte[], byte[]> get() {
return new LogProcesser();
}
//从哪里来
}, "Source")
//到哪里去
.addSink("Sink", twoTopic, "Processor");
//6.实例化KafkaStream
KafkaStreams kafkaStreams = new KafkaStreams(builder, prop);
kafkaStreams.start();
}
}
LogPRocessor类
package com.hsiehchou.kafka.kafka_stream;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
public class LogProcesser implements Processor<byte[], byte[]>{
private ProcessorContext context;
//初始化
public void init(ProcessorContext context) {
//传输
this.context = context;
}
//业务逻辑
public void process(byte[] key, byte[] value) {
//1.拿到消息数据
String message = new String(value);
//2.如果包含 - 去除
if(message.contains("-")) {
//3.把-去掉之后去掉左侧数据
message = message.split("-")[1];
//4.发送数据
context.forward(key, message.getBytes());
}
}
//释放资源
public void close() {
}
}
[root@hsiehchou121 kafka]#./bin/kafka-topics.sh --zookeeper
hsiehchou121:2181 --create
--replication-factor
3 --partitions
1 --topic
t1
[root@hsiehchou122 kafka]# ./bin/kafka-console-consumer.sh --bootstrap-server
hsiehchou122:9092 --topic
t2 --from-beginning
--consumer.config
config/consumer.properties
[root@hsiehchou121 kafka]# ./bin/kafka-console-producer.sh --broker-list
hsiehchou121:9092 --topic
t1