Kafka--分布式消息队列
Kafka
分布式的发布-订阅消息系统。最初由 LinkedIn 公司开发,使用 Scala 语言编写,2010年12月份开源成为 Apache 项目的一部分。Kafka 是一个高吞吐量的、持久性的、分布式发布订阅消息系统。它主要用于处理活跃的数据(登录、浏览、点击、分享、喜欢等用户行为产生的数据)。
Kafka概述
设计
- 主要的设计元素:
- Kafka在设计之时为就将持久化消息作为通常的使用情况进行了考虑。
- 主要的设计约束是吞吐量而不是功能。
- 有关哪些数据已经被使用了的状态信息保存为数据使用者(consumer)的一部分,而不是保存在服务器之上。
- Kafka是一种显式的分布式系统。它假设,数据生产者(producer)、代理(brokers)和数据使用者(consumer)分散于多台机器之上。
- 在对消息进行存储和缓存时,Kafka严重地依赖于文件系统。所有数据都要立即写入文件系统持久化的日志中但不进行刷新数据的任何调用(有刷新策略,可以配置)。就意味着,数据被传输到OS内核的页面缓存中了,OS随后会将这些数据刷新到磁盘的。
特点
- 高吞吐量、低延迟:可以满足每秒百万级别消息的生产和消费、延迟最低只有几毫秒。
- 持久性:有一套完善的消息存储机制,确保数据的高效安全的持久化。
- 容错性:允许集群中节点失败(若副本数量为n,则允许n-1个节点失败)
- 可扩展性:kafka集群支持热扩展。
- 高并发:支持数千个客户端同时读写。
服务
- broker: 缓存代理,kafka集群中的一个kafka服务节点称为一个broker,主要存储消息数据。不是主从关系,各个broker在集群中地位一样,可以随意的增删broker节点。
- topic: kafka 处理的消息的分类。
- partition: 一个topic在broker中被分为1个或者多个partition,分区在创建topic的时候指定。
- message: 消息(offset,MessageSize,data),是通信的基本单位,每个消息都属于一个partition。
- producer: 生产者,向kafka的一个topic发布消息。
- consumer: 消费者,订阅topic并处理其发布的消息。
- zookeeper: 协调kafka的正常运行。
- push-and-pull : Kafka中的Producer和consumer采用的是push-and-pull模式,即Producer只管向broker push消息,consumer只管从broker pull消息,两者对消息的生产和消费是异步的。
消息发送流程
- 1、Producer: 可以根据用户自定义算法来根据消息的key来计算输入到哪个partition。可以指定同步异步(producer.type),异步发送模式允许进行批量发送,先将消息缓存在内存中,然后一次请求批量发送出去。(queue.buffering.max.ms=5000、queue.buffering.max.messages=10000)。
- 2、broker 为了减少磁盘写入的次数,broker会将消息暂时buffer起来,当消息的个数达到一定阀值或者过了一定的时间间隔时,再flush到磁盘,这样减少了磁盘IO调用的次数。配置(log.flush.interval…、log.retention…默认7天)。
- 3、kafka: 集群接收到Producer发过来的消息后,将其持久化到硬盘,并保留消息指定时长(可配置),不关注消息是否被消费。
- 4、Consumer: 从kafka集群pull数据,并控制获取消息的offset。每个consumer属于一个consumer group,可以指定组id。Group.id。
Kafka安装
下载 https://www.apache.org/dyn/closer.cgi?path=/kafka/0.8.2.2/kafka_2.11-0.8.2.2.tgz
解压并修改目录
1
2[root@xxo07 ~]# tar -zxvf kafka_2.11-0.8.2.2.tgz -C /opt/ ##解压到指定目录
[root@xxo07 ~]# mv /opt/kafka_2.11-0.8.2.2/ /opt/kafka ##改一下目录名配置和启动zookeeper服务(使用kafka内置zk)
1
2
3
4
5
6
7
8###################启用zk服务 #############################
[root@xxo07 ~]# mkdir /opt/logs ##创建一个logs目录,用于存放日子文件
[root@xxo07 ~]# nohup /opt/kafka/bin/zookeeper-server-start.sh /opt/kafka/config/zookeeper.properties > /opt/logs/kafka-zk.log 2>&1 & ##后台启动zk服务
[1] 1212
[root@xxo07 ~]# jps ##查看进程
1234 Jps
1212 QuorumPeerMain ##zk已经启动
###################查看日志:more /opt/logs/kafka-zk.log ######启动 Kafka-server
1
2
3
4
5
6
7[root@xxo07 ~]# nohup /opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties > /opt/logs/kafka-server.log 2>&1 &
[2] 1244
[root@xxo07 ~]# jps ##查看进程
1289 Jps
1244 Kafka ##Kafka服务
1212 QuorumPeerMain
###################查看日志: more /opt/logs/kafka-server.log ######
kafka操作
主题topic
kafka-topics.sh 操作 如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64[root@xxo08 kafka]# bin/kafka-topics.sh ##看一下怎么使用,需要什么参数
Create, delete, describe, or change a topic.
Option Description
------ -----------
--alter Alter the configuration for the topic.
--config <name=value> A topic configuration override for the
topic being created or altered.The
following is a list of valid
configurations:
unclean.leader.election.enable
delete.retention.ms
segment.jitter.ms
retention.ms
flush.ms
segment.bytes
flush.messages
segment.ms
retention.bytes
cleanup.policy
segment.index.bytes
min.cleanable.dirty.ratio
max.message.bytes
file.delete.delay.ms
min.insync.replicas
index.interval.bytes
See the Kafka documentation for full
details on the topic configs.
--create Create a new topic.
--delete Delete a topic
--delete-config <name> A topic configuration override to be
removed for an existing topic (see
the list of configurations under the
--config option).
--describe List details for the given topics.
--help Print usage information.
--list List all available topics.
--partitions <Integer: # of partitions> The number of partitions for the topic
being created or altered (WARNING:
If partitions are increased for a
topic that has a key, the partition
logic or ordering of the messages
will be affected
--replica-assignment A list of manual partition-to-broker
<broker_id_for_part1_replica1 : assignments for the topic being
broker_id_for_part1_replica2 , created or altered.
broker_id_for_part2_replica1 :
broker_id_for_part2_replica2 , ...>
--replication-factor <Integer: The replication factor for each
replication factor> partition in the topic being created.
--topic <topic> The topic to be create, alter or
describe. Can also accept a regular
expression except for --create option
--topics-with-overrides if set when describing topics, only
show topics that have overridden
configs
--unavailable-partitions if set when describing topics, only
show partitions whose leader is not
available
--under-replicated-partitions if set when describing topics, only
show under replicated partitions
--zookeeper <urls> REQUIRED: The connection string for
the zookeeper connection in the form
host:port. Multiple URLS can be
given to allow fail-over.新增 一个topic
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15[root@xxo07 ~]# cd /opt/kafka/bin/ ##这里进入一下kafka/bin目录
################新增一个topic:"hello",为它分配一个分区,保存一个副本############
[root@xxo07 bin]# kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic hello
Created topic "hello".
################replication-factor不能大于broker数(这里我们只有一个partitions)#
[root@xxo07 bin]# kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 2 --topic world
Error while executing topic command replication factor: 2 larger than available brokers: 1
kafka.admin.AdminOperationException: replication factor: 2 larger than available brokers: 1
at kafka.admin.AdminUtils$.assignReplicasToBrokers(AdminUtils.scala:70)
at kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:171)
at kafka.admin.TopicCommand$.createTopic(TopicCommand.scala:93)
at kafka.admin.TopicCommand$.main(TopicCommand.scala:55)
at kafka.admin.TopicCommand.main(TopicCommand.scala)查询 topic
1
2
3
4
5
6
7
8
9[root@xxo07 bin]# kafka-topics.sh --describe --zookeeper localhost:2181 --topic hello
Topic:hello PartitionCount:2 ReplicationFactor:1 Configs:
Topic: hello Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: hello Partition: 1 Leader: 0 Replicas: 0 Isr: 0
#########################查询所有可以使用的topic######################
[root@xxo07 bin]# kafka-topics.sh --list --zookeeper localhost:2181
hello
world修改 topic
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18#########################修改hello为2个分区#########################
[root@xxo07 bin]# kafka-topics.sh --alter --zookeeper localhost:2181 -partitions 3 --topic hello
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!
########################修改partition数(只能增加)#################
[root@xxo07 bin]# kafka-topics.sh --alter --zookeeper localhost:2181 -partitions 2 --topic hello
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Error while executing topic command The number of partitions for a topic can only be increased
kafka.admin.AdminOperationException: The number of partitions for a topic can only be increased
at kafka.admin.AdminUtils$.addPartitions(AdminUtils.scala:114)
at kafka.admin.TopicCommand$$anonfun$alterTopic$1.apply(TopicCommand.scala:119)
at kafka.admin.TopicCommand$$anonfun$alterTopic$1.apply(TopicCommand.scala:100)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at kafka.admin.TopicCommand$.alterTopic(TopicCommand.scala:100)
at kafka.admin.TopicCommand$.main(TopicCommand.scala:57)
at kafka.admin.TopicCommand.main(TopicCommand.scala)删除 topic
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29#######################删除world topic ############################
[root@xxo07 bin]# kafka-topics.sh --delete --zookeeper localhost:2181 --topic world
Topic world is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
[root@xxo07 bin]# kafka-topics.sh --list --zookeeper localhost:2181
hello
world - marked for deletion ##可以看见还在,如果重启可以删除
###是否开启topic的删除功能:默认为false 修改delete.topic.enable=true可以不用重启
[root@xxo07 bin]# vim ../config/server.properties ###修改配置如下图
[root@xxo07 bin]# kafka-server-stop.sh ###关闭kafka
[root@xxo07 bin]# jps
1645 Jps
1212 QuorumPeerMain
[root@xxo07 bin]# nohup /opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties > /opt/logs/kafka-server.log 2>&1 &
[2] 1665
[root@xxo07 bin]# jps #####################已经重新启动
1665 Kafka
1710 Jps
1212 QuorumPeerMain
[root@xxo07 bin]# kafka-topics.sh --list --zookeeper localhost:2181 ########查看,已经被删除了
hello
[root@xxo07 bin]# kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic world
Created topic "world". ######################重新创建了一个
[root@xxo07 bin]# kafka-topics.sh --delete --zookeeper localhost:2181 --topic world ###再次删除
Topic world is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true. ######这个为提示信息
[root@xxo07 bin]# kafka-topics.sh --list --zookeeper localhost:2181 ########查看,已经被删除了
hello
生产/消费
创建生产者 producer
1
2############broker-list:必须的参数,kafka的服务地址[多个用逗号隔开]#############
[root@xxo07 bin]# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic hello创建消费者 consumer
1
2
3
4
5
6
7###################Zookeeper:必须的参数,kafka的zk集群地址####################
############Topic\whitelist\blacklist:
############1、具体的单个topic
############2、多个白名单topic字符串[逗号隔开]。
############3、多个黑名单topic字符串[逗号隔开]。
[root@xxo07 bin]# kafka-console-consumer.sh --zookeeper localhost:2181 --topic hello --from-beginning
##################标记删除的topic也可以使用###################################实例
- 概念理解:
消费者组理解:很多consumer可以组成一个组,一个消息在组中只能被一个consumer消费,可以被不同的组消费。
消息持久化:Kafka中会把消息持久化到本地文件系统中,并且保持极高的效率。
消息状态:在Kafka中,消息的状态被保存在consumer中,broker不会关心哪个消息被消费了被谁消费了,只记录一个offset值(指向partition中下一个要被消费的消息位置)。
消息有效期:Kafka会长久(默认七天,一般公司生产环境中会设置很长)保留其中的消息,以便consumer可以多次消费。
批量发送:Kafka支持以消息集合为单位进行批量发送,以提高push效率。
同步异步:Producer采用异步push方式,极大提高Kafka系统的吞吐率(可以通过参数控制)。
分区机制partition:Kafka的broker端支持消息分区,Producer可以决定把消息发到哪个分区,在一个分区中消息的顺序就是Producer发送消息的顺序,一个主题中可以有多个分区,具体分区的数量是可配置的。分区的意义很重大,后面的内容会逐渐体现。
离线数据装载:Kafka由于对可拓展的数据持久化的支持,它也非常适合向Hadoop或者数据仓库中进行数据装载。
插件支持:现在不少活跃的社区已经开发出不少插件来拓展Kafka的功能,如用来配合Storm、Hadoop、flume相关的插件。