Kafka--集群及API操作
本篇博客,主要讲解kafka集群配置、kafka的容错和kafka扩展机制。以及使用kafka模拟一个网络流量实时统计来看一看Kafka的一些api使用方法。
kafka集群
集群搭建
- 一、准备集群机器(这里使用3台):主机名xxo08、xxo09、xxo10(我把zookeeper集群也放在了这里)。
二、搭建并启动 zookeeper集群。
三、修改配置文件
- /opt/kafka/config/server.properties broker.id=0 ##broker的id每一个broker应该不同
- log.dirs=/opt/kafka_logs ##
- zookeeper.connect=xxo08:2181,xxo09:2181,xxo10:2181 ##注意:我的zokeeper也在这三台机器上
- 四、同步其它节点
scp -r /opt/kafka/ root@xxo09:/opt/
scp -r /opt/kafka/ root@xxo10:/opt/
并修改
xxo09 /opt/kafka/config/server.properties broker.id=1 ##broker的id
xxo10 /opt/kafka/config/server.properties broker.id=2 ##broker的id
- 五、启动所有server
nohup /opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties >>/opt/logs/kafka-server.log 2>&1 &
六、查看进程
1
2
3
4[root@xxo08 ~]# jps
1337 QuorumPeerMain ##zookeeper
1411 Jps
1362 Kafka ##kafka总结: 对于broker来说是所有broker是独立的,由zk来协调管理。
容错和扩展机制
上篇博客Kafka-分布式消息队列/中我们使用了单机的kafka,在创建一个topic的时候,副本 replication-factor 指定为 1(我们只有一个broker也只能指定为1)。现在我们搭建了集群,三个kafka服务(3个broker)我们最多就能创建三个副本了。记住:kafka容错的保障是他的副本机制!
容错
如果,kafka集群中某个broker挂掉(fails),则zk会选择新的broker提供服务。
- 下面来做一个测试,模拟节点宕机或挂掉,这里就强制kill掉某个kafka服务吧!
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38########################1、创建一个hello的topic##################################
[root@xxo08 kafka]# bin/kafka-topics.sh --create --zookeeper xxo08:2181,xxo09:2181,xxo10:2181 --replication-factor 3 --partitions 4 --topic world
Created topic "world".
########################2、查看一下hello (分布在4个partition中,每个P有三个备份)####
[root@xxo08 kafka]# bin/kafka-topics.sh --describe --zookeeper xxo08:2181,xxo09:2181,xxo10:2181 --topic world
Topic:world PartitionCount:4 ReplicationFactor:3 Configs:
Topic: world Partition: 0 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0
Topic: world Partition: 1 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1
Topic: world Partition: 2 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2
Topic: world Partition: 3 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
###############PartitionCount: 当前topic分区数
###############ReplicationFactor: 当前topic副本数
###############Configs: 当前topic自定义配置信息
###############Partition: patition序号
###############Leader: Leader的brokerID
###############Replicas: 所有副本的brokerId
###############Isr: 所有存活(可用)的brokerId
#######################3、下面我们来强制kill掉,brokerId等于2的#####################
#######################注意我们在xxo10(brokerId=2)上操作#########################
[root@xxo10 ~]# jps
1270 Kafka
1372 Jps
1245 QuorumPeerMain
[root@xxo10 ~]# kill -9 1270 ###########强制kill掉
[root@xxo10 ~]# jps
1245 QuorumPeerMain
1382 Jps
[root@xxo10 ~]# cd /opt/kafka
#######################4、再来看一下topic 就会发现平衡机制已经重启分配partitoin######
[root@xxo10 kafka]# bin/kafka-topics.sh --describe --zookeeper xxo08:2181,xxo09:2181,xxo10:2181 --topic world
Topic:world PartitionCount:4 ReplicationFactor:3 Configs:
Topic: world Partition: 0 Leader: 1 Replicas: 2,1,0 Isr: 1,0
Topic: world Partition: 1 Leader: 0 Replicas: 0,2,1 Isr: 0,1
Topic: world Partition: 2 Leader: 1 Replicas: 1,0,2 Isr: 1,0
Topic: world Partition: 3 Leader: 0 Replicas: 2,0,1 Isr: 0,1
扩容
对于扩容,是公司中使用教多的,比如集群不够了,重新购买了服务器,需要把这台服务器加入到kafak集群中。新加入的这个节点zk会自动识别并让它在适当的机会选择此节点提供服务。
- 新加入的节点,配置信息(server.properties)记得保持相对一致。
- 修改broker.id、log.dirs、zookeeper.connect
- 如果直接拷贝kafka,记得拷贝后新节点zk和kafka的存储路径应该把数据都删掉,防止数据不一致问题。因为zk的dataDir里面存储了zk和kafka的元数据信息,kafka的数据在kafka的Data目录就会多出一下奇怪的数据。
- 下面我们就假如新加入了一个节点就是xxo10(brokerId=2),先来启动该节点
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29[root@xxo10 kafka]# nohup bin/kafka-server-start.sh config/server.properties >> ../logs/kafka-server.log 2>
&1 &
[1] 1412
##########1、启动后会发现这个节点不会是任何分区的leader,在Isr能看见它以存活########
[root@xxo10 kafka]# bin/kafka-topics.sh --describe --zookeeper xxo08:2181,xxo09:2181,xxo10:2181 --topic world
Topic:world PartitionCount:4 ReplicationFactor:3 Configs:
Topic: world Partition: 0 Leader: 1 Replicas: 2,1,0 Isr: 1,0,2
Topic: world Partition: 1 Leader: 0 Replicas: 0,2,1 Isr: 0,1,2
Topic: world Partition: 2 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2
Topic: world Partition: 3 Leader: 0 Replicas: 2,0,1 Isr: 0,1,2
##########2、怎么重新均匀分配?########
##########2.1、Broker配置中的自动均衡策略######################################
#####Auto.leader.rebalance.enable=true######################################
#####leader.imbalance.check.interval.seconds 默认值:300#####################
##########2.2、手动配置执行###################################################
##########下面我们手动配置一下#################################################
[root@xxo10 kafka]# bin/kafka-preferred-replica-election.sh --zookeeper xxo08:2181,xxo09:2181,xxo10:2181
Successfully started preferred replica election for partitions Set([world,0], [world,2], [world,3], [world,1])
[root@xxo10 kafka]# bin/kafka-topics.sh --describe --zookeeper xxo08:2181,xxo09:2181,xxo10:2181 --topic world
Topic:world PartitionCount:4 ReplicationFactor:3 Configs:
Topic: world Partition: 0 Leader: 2 Replicas: 2,1,0 Isr: 1,0,2
Topic: world Partition: 1 Leader: 0 Replicas: 0,2,1 Isr: 0,1,2
Topic: world Partition: 2 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2
Topic: world Partition: 3 Leader: 2 Replicas: 2,0,1 Isr: 0,1,2
####负载均衡,Kafka提供了一个 metadata API来管理broker之间的负载(对于0.7.x主要靠zookeeper来实现负载均衡)。
API操作
kafka使用了scala编写,但是提供了很多语言的API接口。比如scala、java、python、R、PHP等。
这里我就使用java来模拟一个网页流量的实时统计处理:
项目结构:
一、配置文件
1 | ############### |
二、生产者,模拟用户访问数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78package com.xxo.kafka;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.*;
/**
*
* 模拟生产用户访问数据
* Created by xiaoxiaomo on 2016/5/14.
*/
public class ProduceUserData {
static Map<Integer,String> path = new HashMap<Integer, String>();
static Map<Integer,Integer> userId = new HashMap<Integer, Integer>();
public static void main(String[] args) throws IOException, InterruptedException {
//1. 创建一个生产者对象
Properties prop = new Properties();
prop.load( ProduceUserData.class.getClassLoader().getResourceAsStream("producer.properties") );
Producer<String, String> producer = new Producer<String, String>(new ProducerConfig( prop ));
//2. 构造message
String topic = "world" ; //world 主题
List<KeyedMessage<String,String>> list = new ArrayList<KeyedMessage<String, String>>() ;
//用户数据
//init path
path.put(0,"http://xiaoxiaomo.com/");
path.put(1,"http://blog.xiaoxiaomo.com/");
path.put(2,"http://blog.xiaoxiaomo.com/archives/");
path.put(3,"http://blog.xiaoxiaomo.com/photo/");
path.put(4,"http://blog.xiaoxiaomo.com/about/");
//init userId
userId.put(0,2010);
userId.put(1,1001);
userId.put(2,1002);
userId.put(3,2001);
userId.put(4,2002);
userId.put(5,3003);
userId.put(6,4004);
userId.put(7,1007);
userId.put(8,1008);
userId.put(9,1009);
Random random = new Random();
while ( true ){
//随机产生数据
int pathIndex=random.nextInt(5);
int userIndex=random.nextInt(10);
int isVip=random.nextInt(2);
//构造一个用户访问数据
String visData = String.format("%s\t%s\t%s\t%s",
userId.get(userIndex),
path.get(pathIndex),
new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()), isVip) ;
//使用了pathIndex作为一个Key分区用
list.add( new KeyedMessage<String, String>( topic ,String.valueOf(pathIndex) , visData ) );
//3. 发送
producer.send(list);
Thread.sleep( random.nextInt(2000) );
}
//4. 关闭
//producer.close();
}
}三、消费者,处理用户访问日志信息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88package com.xxo.kafka;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import java.io.IOException;
import java.util.*;
/**
* 消费者
* 处理用户访问日志信息
* Created by xiaoxiaomo on 2016/5/14.
*/
public class ConsumerUserVisit {
private static int times = 0;
static Map<String,Integer> map = new HashMap<String, Integer>();
public static void main(String[] args) throws IOException {
//1. 创建消费者
Properties prop = new Properties();
prop.load( ConsumerUserVisit.class.getClassLoader().getResourceAsStream( "consumer.properties" ) );
ConsumerConnector connector = Consumer.createJavaConsumerConnector(new ConsumerConfig(prop));
//2.
String topic = "world" ;
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put( topic , 3 ) ; //这里启用了三个消费者线程
Map<String, List<KafkaStream<byte[], byte[]>>> streams = connector.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streamList = streams.get(topic);
for (KafkaStream<byte[], byte[]> stream : streamList) {
ConsumerIterator<byte[], byte[]> iterator = stream.iterator();
new Thread( new RunConsumer( iterator ) ).start();
}
//处理访问数据
Timer timer = new Timer(); //定时来打印一下信息
timer.schedule(new TimerTask() {
public void run() {
synchronized (map) {
for (String key:map.keySet()){
System.out.println(String.format("访问path: %s, 的次数: %s",key,map.get(key)));
}
System.out.println("总访问次数: "+times);
map.clear();
times=0;
}
}
} , 0 ,5*1000 );
}
/**
* 多线程处理数据,防阻塞
*/
public static class RunConsumer implements Runnable{
private ConsumerIterator<byte[], byte[]> iterator ;
public RunConsumer(ConsumerIterator<byte[], byte[]> iterator) {
this.iterator = iterator ;
}
public void run() {
while ( iterator.hasNext() ){
MessageAndMetadata<byte[], byte[]> next = iterator.next();
String str = new String(next.message());
String[] strs = str.split("\t");
Integer key = map.get(strs[1]);
synchronized ( map ) {
int count = 1 ;
if( key != null ){
count += key ;
}
map.put( strs[1] , count ) ;
times ++ ;
}
}
}
}
}四、自定义Partition
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21package com.xxo.kafka;
import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;
/**
* 自定义partition
* Created by xiaoxiaomo on 2016/5/14.
*/
public class MyPartition implements Partitioner {
public MyPartition(VerifiableProperties verifiableProperties) {
//记得要有这个构造函数,不然会报错!
}
public int partition(Object key, int numPartitions) {
if( key == null ) return 0 ;
Integer k = Integer.parseInt(key+"") ;
return k % numPartitions;
}
}输出结果
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23总访问次数: 0
总访问次数: 0
访问path: http://xiaoxiaomo.com/, 的次数: 5
访问path: http://blog.xiaoxiaomo.com/photo/, 的次数: 3
访问path: http://blog.xiaoxiaomo.com/archives/, 的次数: 2
总访问次数: 10
访问path: http://xiaoxiaomo.com/, 的次数: 6
访问path: http://blog.xiaoxiaomo.com/about/, 的次数: 3
访问path: http://blog.xiaoxiaomo.com/photo/, 的次数: 3
访问path: http://blog.xiaoxiaomo.com/archives/, 的次数: 6
总访问次数: 18
访问path: http://xiaoxiaomo.com/, 的次数: 6
访问path: http://blog.xiaoxiaomo.com/, 的次数: 4
访问path: http://blog.xiaoxiaomo.com/about/, 的次数: 3
访问path: http://blog.xiaoxiaomo.com/photo/, 的次数: 5
访问path: http://blog.xiaoxiaomo.com/archives/, 的次数: 9
总访问次数: 27
访问path: http://xiaoxiaomo.com/, 的次数: 24
访问path: http://blog.xiaoxiaomo.com/, 的次数: 23
访问path: http://blog.xiaoxiaomo.com/about/, 的次数: 16
访问path: http://blog.xiaoxiaomo.com/photo/, 的次数: 21
访问path: http://blog.xiaoxiaomo.com/archives/, 的次数: 32
总访问次数: 116本博客所用源码下载地址:http://download.csdn.net/detail/tang__xuandong/9520209