Kafka简介
Kafka是一种分布式的,基于发布/订阅的消息系统,原本开发自Linkedln,用作Linkedln的活动流(Activity Stream)和运营数据处理管道(Pipeline)的基础。
- Kafka被用于构建实时数据管道和流处理,支持横向扩展、容错、极快,能够在上千台服务器上运行。
- Apache Kafka是开源消息系统,由Scala写成,是由Apache软件基金会开发的一个开源消息系统项目。Kafka最初是由Linkedln开发,并于2011年开源。2012年10月从Apache lncudator毕业。该项目的目标是为处理实时数据提供一个统一、高通量、低等待的平台。
- Kafka是一个分布式消息队列:生产者、消费者的功能。它提供了类似于JMS的特性,但是在设计实现上完全不同,此外它并不是JMS规范实现
生产者消费者模式
生产者消费者问题,也称为有限缓冲问题,是一个多线程同步问题的经典案例。该问题描述了共享固定大小缓冲区的两个线程—所谓的“生产者”和“消费者”—-在实际运行时会发生的问题。生产者的主要作用时生产一定量的数据放到缓冲区中,然后重复此过程。与此同时,消费者也在缓存区消耗这些数据。该问题的关键就是要保证生产者不会在缓冲区满的时候加入数据,消费者也不会在缓冲区空时消耗数据。
Kafka安装
安装kafka配置
- 下载kafka_**.tgz,解压
- 配置KAFKA_HOME
开启:
- 开启zookeeper,需要一直在服务器端实时运行,以守护进程运行
1
/root/bigdata/kafka/bin/zookeeper-server-start.sh -daemon /root/bigdata/kafka/config/zookeeper.properties
- 开启kafka服务:
1
/root/bigdata/kafka/bin/kafka-server-start.sh /root/bigdata/kafka/config/server.properties
- 开启生产这与消费者
1
2
3
4开启消息生产者
/root/bigdata/kafka/bin/kafka-console-producer.sh --broker-list 192.168.19.137:9092 --sync --topic click-trace
开启消费者
/root/bigdata/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.19.137:9092 --topic click-traceKafka架构
一个典型的Kafka集群中包含若干Producer(可以时web前端生产的Page View,或者是服务器日志、系统CPU、Menory等),若干个Broker(Kafka支持水平扩展,一般Broker数量越多,集群吞吐率越高),若干Consumer Group,以及一个Zookeeper集群。
- Broker:Kafka集群包含一个或多个服务器,这种服务器被称为broker
- 作用:接收Producer和Consumer的请求,并把Message持久化到本地磁盘
- Topic:每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需要指定消息的Topic即可生产或消费数据而不必关心数据存于何处)
- Partition:Partition是物理上的概念,每个Topic包含一个或多个Partition。
- Producer:负责发布消息到Kafka Broker
- Consumer:消息消费者,向Kafka Broker读取消息的客户端
- Consumer Group:每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)。
- Zookeeper:保存着集群broker、topic、partition等meta数据;负责broker故障发现,partition leader选举,负载均衡等功能
- 运行过程Producer使用push模式将消息发布到broker,Consumer使用pull模式从broker订阅并消费消息。
Kafka设计原理
Topic & Partition(存储设计原理)
- Topic在逻辑上可以被认为一个queue,每条消费都必须指定它的Topic,可以简单理解为必须指明把这条消息放进哪个queue里。可以使得Kafka的吞吐率可以线性提高
- 物理上把Topic分成一个或多个Partition,每个Partition在物理上对应一个文件夹,该文件夹下存储这和Partition的所有消息和索引文件
- 若创建topic1和topic2两个topic,且分别有13个和19个分区,则整个集群上会相应生成共32个文件夹,
- 一般建议选择broker num * consumer num,这样平均每个consumer会同时读取broker数个partition,这些partition压力可以摊到每台broker上。
Partition的数据文件
- offset:offset表示Message在这个partition中的偏移量,offset不是该Message在partition数据文件中实际存储位置,而是逻辑上一个值,可以认为offset是partition中Message的id。
- MessageSize:MessageSize表示消息内容data的大小
- data:data为Message的具体内容
partition中每条Message包含三个属性:partition的数据文件由以上格式的Message组成,按offset由小到大排列在一起,如果一个partition多个数据文件,Kafka通过片段和索引来提高查询效率
- 数据文件分段segment:partition物理上由多个segment文件组成,每个segment大小相等,顺序读写。每个segment数据文件以该段中最小的offset命名,文件扩展名为.log。这样在查找指定offset的Message的时候,用而分查找就可以定位到该Message在哪个segment数据文件中。
- 数据文件索引:数据文件分段使得可以在一个较小的数据文件中查找对应offset的Messagel,但是这一让需要顺序扫描才能找到对应offset的Message。为了进一步提高查找的效率,Kafka为每个分段后的数据文件建立索引文件,文件名于数据文件的名字是一样的,只是文件扩展名为.index
索引文件中包含若干个索引条目,每个条目表示数据文件中一条Message的索引。索引包含两个部分,分别为相对offset和position(位置)
segment中index<—->data file对应关系物理结构如下:
- 查找过程:查找某个offset的消息,先二分法找出消息所在的segment文件;然后加载对应的index索引文件到内存,同样二分法找出小于等于给定offset的最大的那个offset记录;最后根据position到log文件中,顺序查找出offset等于给定offset值得消息
- 由于消息在partition的segment数据文件中是顺序读写的,且消息消费后不会删除(删除策略只针对过期的segment文件),这种顺序磁盘IO存储设计是Kafka高性能很重要的原因。
- Kafka运行时很少由大量读磁盘的操作,主要时定期批量写磁盘操作,因此操作磁盘很高效,这跟Kafka文件存储中读写message的设计是息息相关的。Kafka中读写message由如下特点:
- 写message(生产者)
- 消息从java堆转入page cache(即物理内存)。
- 由异步线程刷盘,消息从page cache刷入磁盘。
- 读message(消费者)
- 消息直接从page cache转入socket发送出去。
- 当从page cache没有找到相应数据时,此时会产生磁盘IO,从磁 盘Load消息到page cache,然后直接从socket发出去
- 写message(生产者)
- Kafka高效文件存储设计特点:
- Kafka把Topic中一个partition大文件分成多个小文件段,通过多个小文件段,就容易定期清除或删除已经消费完的文件,减少磁盘占用。
- 通过索引信息可以快速定位Message和确定response的最大大小
- 通过index元数据全部映射到memory,可以避免segment file的IO磁盘操作
- 通过索引文件稀疏存储,可以大幅降低index文件元数据占用空间大小
Producer(生产者)
producer采用推(push)模式将消息发布到broker,每条消息都被追加(append)到分区(partition)中,属于顺序写磁盘(顺序写磁盘效率比随机写内存要高,保证kafka吞吐率)
- 创建一个ProducerRecord,这个对象需要包含消息的主题(Topic)和值(value),可以选择性指定一个键值(key)或分区(partition)。key和value序列化为ByteArrays,以便它们可以通过网络发送
- 发送到分配器(partitioner):如果我们指定了分区,那么分配器返回该分区即可;否则,分配器将会基于键值来选择一个分区并返回。选择完分区后,生产者知道了消息所属的主题和分区,它将这条记录添加到相同主题和分区的批量消息中,另一个线程负责发送这些批量消息到对应的Kafka broker。
- broker接收到数据的时候,如果数据已被成功写入到Kafka,会返回一个包含topic、分区和偏移量offset的RecordMetadata对象;
- 如果broker写入数据失败,会返回一个异常信息给生产者。当生产者接收到异常信息时会尝试重新发送数据,如果尝试失败则抛出异常。
Consumer Group
- 任何Consumer必须属于有一个Consumer Group
- 使用 Consumer high level API 时,同一 Topic 的一条消息只能被同一个 Consumer Group 内的一个 Consumer 消费,但多个 Consumer Group 可同时消费这一消息。
- 特点:Kafka 用来实现一个 Topic 消息的广播(发给所有的 Consumer)和单播(发给某一个 Consumer)的手段。一个 Topic 可以对应多个 Consumer Group。
- 如果需要实现广播,只要每个 Consumer 有一个独立的 Group 就可以了。
- 要实现单播只要所有的 Consumer 在同一个 Group 里。用 Consumer Group 还可以将 Consumer 进行自由的分组而不需要多次发送消息到不同的 Topic。
消费之后一般会删除已经被消费的消息,而Kafka集群会保留所有的消息,无论其被消费与否。当然,因为磁盘限制,不可能永久保留所有数据(实际上也没必要),因此Kafka提供两种策略删除旧数据。一是基于时间,二是基于Partition文件大小。例如可以通过配置$KAFKA_HOME/config/server.properties
,让Kafka删除一周前的数据,也可在Partition文件超过1GB时删除旧数据,配置如下所示。
1 | # The minimum age of a log file to be eligible for deletion |
Message Queue常见对比
- RabbitMQ:RabbitMQ 是使用 Erlang 编写的一个开源的消息队列,本身支持很多的协议:AMQP,XMPP, SMTP, STOMP,也正因如此,它非常重量级,更适合于企业级的开发。同时实现了 Broker 构架,消息在发送给客户端时先在中心队列排队。对路由,负载均衡或者数据持久化都有很好的支持。
- Kafka/Jafka
- 快速持久化,可以在 O(1) 的系统开销下进行消息持久化;
- 高吞吐;,完全的分布式系统,Broker、Producer、Consumer 都原生自动支持分布式,自动实现负载均衡;Kafka 相对于 ActiveMQ 是一个非常轻量级的消息系统,除了性能非常好之外,还是一个工作良好的分布式系统。
Kafka、ActiveMQ、RabbitMQ、RocketMQ 有什么优缺点?
特性 | ActiveMQ | RabbitMQ | RocketMQ | Kafka |
---|---|---|---|---|
单机吞吐量 | 万级,比 RocketMQ、Kafka 低一个数量级 | 同 ActiveMQ | 10 万级,支撑高吞吐 | 10 万级,高吞吐,一般配合大数据类的系统来进行实时数据计算、日志采集等场景 |
topic 数量对吞吐量的影响 | topic 可以达到几百/几千的级别,吞吐量会有较小幅度的下降,这是 RocketMQ 的一大优势,在同等机器下,可以支撑大量的 topic | topic 从几十到几百个时候,吞吐量会大幅度下降,在同等机器下,Kafka 尽量保证 topic 数量不要过多,如果要支撑大规模的 topic,需要增加更多的机器资源 | ||
时效性 | ms 级 | 微秒级,这是 RabbitMQ 的一大特点,延迟最低 | ms 级 | 延迟在 ms 级以内 |
可用性 | 高,基于主从架构实现高可用 | 同 ActiveMQ | 非常高,分布式架构 | 非常高,分布式,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用 |
消息可靠性 | 有较低的概率丢失数据 | 经过参数优化配置,可以做到 0 丢失 | 同 RocketMQ | |
功能支持 | MQ 领域的功能极其完备 | 基于 erlang 开发,并发能力很强,性能极好,延时很低 | MQ 功能较为完善,还是分布式的,扩展性好 | 功能较为简单,主要支持简单的 MQ 功能,在大数据领域的实时计算以及日志采集被大规模使用 |
如何选择MQ
- 所以中小型公司,技术实力较为一般,技术挑战不是特别高,用 RabbitMQ 是不错的选择;
- 如果是大数据领域的实时计算、日志采集等场景,用 Kafka 是业内标准的,绝对没问题,社区活跃度很高,绝对不会黄,何况几乎是全世界这个领域的事实性规范。
Kafka Python API
环境安装
- 下载安装kafka-python:
pip install kafka-python
- 验证是否安装成功:
import kafka
命令使用
- 创建topic命令
- bin/kafka-topics.sh –create –zookeeper localhost:2181 –replication-factor 1 –partitions 1 **–topic test ** –group testgroup
- replication-factor:副本数量
- partitions:分区数量
- group:创建分组
- 通过生产者发送消息
- bin/kafka-console-producer.sh –broker-list localhost:9092 –topic test
- 通过消费者消费消息
- bin/kafka-console-consumer.sh –bootstrap-server localhost:9092 –topic test –from-beginning
- -from-beginning:从最开始生产队的数据开始消费
- 其它Kafka命令
- 查看所有topic
- bin/kafka-topics.sh –list –zookeeper localhost:2181
- 查看所有topic
Python 代码使用创建生产者
创建Kafka生产者有三个基本属性:
bootstrap.servers:属性值是一个host:port的broker列表。
key.serializer:因此需要将这些对象序列化成字节数组。key.serializer指定的类需要实现org.apache.kafka.common.serialization.Serializer接口,Kafka客户端包中包含了几个默认实现,例如ByteArraySerializer、StringSerializer和IntegerSerialier。
value.serializer:属性值是类的名称。这个属性指定了用来序列化消息记录的类,与key.serializer差不多。
命令行方式:普通的发送方式:
- 导入KafkaProducer,创建连接到192.168.19.137:9092
- 向这个Broker的Producer,循环向test_topic这个Topic发送100个消息,消息内容都是’some_message_bytes’,这种发送方式不指定Partition,kafka会均匀的把这些消息分别写入n个Partiton里面
1
2
3
4from kafka import KafkaProducer
'192.168.19.137:9092') producer = KafkaProducer(bootstrap_servers=
for _ in range(100):
'test_topic',b'some_message_bytes') producer.send(注:kafka发送的数据是字节,需要将字符串转换为字节
2、命令行方式:发送json字符串
1
2
3
4'node-teach:9092',value_serializer=lambda v: json.dumps(v).encode('utf-8')) producer = KafkaProducer(bootstrap_servers=
'test_topic', {'key1': 'value1'}) producer.send(
<kafka.producer.future.FutureRecordMetadata object at 0x2a9ebd0>
>>>3、命令行方式:发送普通字符串
compression_type:压缩类型,’gzip’
1
2'node-teach:9092',compression_type='gzip') producer = KafkaProducer(bootstrap_servers=
'test_topic', b'msg') producer.send(案例
目的:实现一个消息发送与消息接受功能程序
步骤:
- 1、创建producer,利用producer将某个目录下的所有文件名发送到指定topic,并由consumer来接收
- 2、 创建consumer进行消费
1、创建producer,利用producer将某个目录下的所有文件名发送到指定topic,并由consumer来接收
1 | from kafka import KafkaProducer |
2、创建consumer进行消费
1 | from kafka import KafkaConsumer |
High Available(高可用)
为什么需要高可用
- Kafka在0.8以前的版本中,并不提供High Availablity机制,一旦一个或多个Broker宕机,则宕机期间其上所有Partition都无法继续提供服务。若该Broker永远不能再恢复,亦或磁盘故障,则其上数据将丢失
- Kafka从0.8开始提供High Availability机制。本文从Data Replication和Leader Election两方面介绍了Kafka的HA机制。
Replication(副本)设计
作为消息中间件,数据的可靠性以及系统的可用性,必然依赖数据副本的设计。
Kafka的replica副本单元是topic的partition,一个partition的replica数量不能超过broker的数量,因为一个broker最多只会存储这个partition的一个副本。所有消息生产、消费请求都是由partition的leader replica来处理,其他follower replica负责从leader复制数据进行备份。
Replica均匀分布到整个集群,Replica的算法如下:
- 将所有Broker(假设共n个Broker)和待分配的Partition排序
- 将第i个Partition分配到第(i mod n)个Broker上
- 将第i个Partition的第j个Replica分配到第((i + j) mode n)个Broker上
如图,TopicA有三个partition:part0、part1、part2,每个partition的replica数等于2(一个是leader,另一个是follower),按照以上算法会均匀落到三个broker上。
Leader Election(选举机制)
- 为什么需要Leader?
- 引入Replication之后,同一个Partition可能会有多个Replica,而这时需要在这些Replica中选出一个Leader,Producer和Consumer只与这个Leader交互,其它Replica作为Follower从Leader中复制数据。因为需要保证同一个Partition的多个Replica之间的数据一致性(其中一个宕机后其它Replica必须要能继续服务并且即不能造成数据重复也不能造成数据丢失)。
如果没有一个Leader,所有Replica都可同时读/写数据,那就需要保证多个Replica之间互相(N×N条通路)同步数据,数据的一致性和有序性非常难保证,大大增加了Replication实现的复杂性,同时也增加了出现异常的几率。而引入Leader后,只有Leader负责数据读写,Follower只向Leader顺序Fetch数据(N条通路),系统更加简单且高效。
kafka消息投递(delivery guarantee)
要确定Kafka的消息是否丢失或重复,从两个方面分析入手:消息发送和消息消费
- At most once——最多一次,消息可能会丢失,但不会重复
- At least once——最少一次,消息不会丢失,可能会重复
- Exactly once——只且一次,消息不丢失不重复,只且消费一次。
但是整体的消息投递语义需要Producer端和Consumer端两者来保证。
转载请注明来源,欢迎对文章中的引用来源进行考证,欢迎指出任何有错误或不够清晰的表达。可以在下面评论区评论,也可以邮件至 zoubinbf@163.com