Kafka--集群及API操作

  本篇博客,主要讲解kafka集群配置kafka的容错kafka扩展机制。以及使用kafka模拟一个网络流量实时统计来看一看Kafka的一些api使用方法。

kafka集群

集群搭建

  • 一、准备集群机器(这里使用3台):主机名xxo08、xxo09、xxo10(我把zookeeper集群也放在了这里)。
  • 二、搭建并启动 zookeeper集群

  • 三、修改配置文件

  1. /opt/kafka/config/server.properties broker.id=0  ##broker的id每一个broker应该不同
  2. log.dirs=/opt/kafka_logs  ##
  3. zookeeper.connect=xxo08:2181,xxo09:2181,xxo10:2181 ##注意:我的zokeeper也在这三台机器上
  • 四、同步其它节点
    scp -r /opt/kafka/ root@xxo09:/opt/
    scp -r /opt/kafka/ root@xxo10:/opt/
    并修改
    xxo09 /opt/kafka/config/server.properties broker.id=1  ##broker的id
    xxo10 /opt/kafka/config/server.properties broker.id=2  ##broker的id
  • 五、启动所有server
    nohup /opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties >>/opt/logs/kafka-server.log 2>&1 &
  • 六、查看进程

    1
    2
    3
    4
    [root@xxo08 ~]# jps
    1337 QuorumPeerMain ##zookeeper
    1411 Jps
    1362 Kafka ##kafka
  • 总结: 对于broker来说是所有broker是独立的,由zk来协调管理。

容错和扩展机制

上篇博客Kafka-分布式消息队列/中我们使用了单机的kafka,在创建一个topic的时候,副本 replication-factor 指定为 1(我们只有一个broker也只能指定为1)。现在我们搭建了集群,三个kafka服务(3个broker)我们最多就能创建三个副本了。记住:kafka容错的保障是他的副本机制!

容错

如果,kafka集群中某个broker挂掉(fails),则zk会选择新的broker提供服务。

  • 下面来做一个测试,模拟节点宕机或挂掉,这里就强制kill掉某个kafka服务吧!
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    ########################1、创建一个hello的topic##################################
    [root@xxo08 kafka]# bin/kafka-topics.sh --create --zookeeper xxo08:2181,xxo09:2181,xxo10:2181 --replication-factor 3 --partition 4 --topic world
    Created topic "world".

    ########################2、查看一下hello (分布在4个partition中,每个P有三个备份)####
    [root@xxo08 kafka]# bin/kafka-topics.sh --describe --zookeeper xxo08:2181,xxo09:2181,xxo10:2181 --topic world
    Topic:world PartitionCount:4 ReplicationFactor:3 Configs:
    Topic: world Partition: 0 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0
    Topic: world Partition: 1 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1
    Topic: world Partition: 2 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2
    Topic: world Partition: 3 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
    ###############PartitionCount: 当前topic分区数
    ###############ReplicationFactor: 当前topic副本数
    ###############Configs: 当前topic自定义配置信息
    ###############Partition: patition序号
    ###############Leader: Leader的brokerID
    ###############Replicas: 所有副本的brokerId
    ###############Isr: 所有存活(可用)的brokerId

    #######################3、下面我们来强制kill掉,brokerId等于2的#####################
    #######################注意我们在xxo10(brokerId=2)上操作#########################
    [root@xxo10 ~]# jps
    1270 Kafka
    1372 Jps
    1245 QuorumPeerMain
    [root@xxo10 ~]# kill -9 1270 ###########强制kill掉
    [root@xxo10 ~]# jps
    1245 QuorumPeerMain
    1382 Jps
    [root@xxo10 ~]# cd /opt/kafka

    #######################4、再来看一下topic 就会发现平衡机制已经重启分配partitoin######
    [root@xxo10 kafka]# bin/kafka-topics.sh --describe --zookeeper xxo08:2181,xxo09:2181,xxo10:2181 --topic world
    Topic:world PartitionCount:4 ReplicationFactor:3 Configs:
    Topic: world Partition: 0 Leader: 1 Replicas: 2,1,0 Isr: 1,0
    Topic: world Partition: 1 Leader: 0 Replicas: 0,2,1 Isr: 0,1
    Topic: world Partition: 2 Leader: 1 Replicas: 1,0,2 Isr: 1,0
    Topic: world Partition: 3 Leader: 0 Replicas: 2,0,1 Isr: 0,1

扩容

对于扩容,是公司中使用教多的,比如集群不够了,重新购买了服务器,需要把这台服务器加入到kafak集群中。新加入的这个节点zk会自动识别并让它在适当的机会选择此节点提供服务。

  1. 新加入的节点,配置信息(server.properties)记得保持相对一致
  2. 修改broker.idlog.dirszookeeper.connect
  3. 如果直接拷贝kafka,记得拷贝后新节点zk和kafka的存储路径应该把数据都删掉,防止数据不一致问题。因为zk的dataDir里面存储了zk和kafka的元数据信息,kafka的数据在kafka的Data目录就会多出一下奇怪的数据。
  • 下面我们就假如新加入了一个节点就是xxo10(brokerId=2),先来启动该节点
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    [root@xxo10 kafka]# nohup bin/kafka-server-start.sh config/server.properties >> ../logs/kafka-server.log 2>
    &1 &
    [1] 1412
    ##########1、启动后会发现这个节点不会是任何分区的leader,在Isr能看见它以存活########
    [root@xxo10 kafka]# bin/kafka-topics.sh --describe --zookeeper xxo08:2181,xxo09:2181,xxo10:2181 --topic world
    Topic:world PartitionCount:4 ReplicationFactor:3 Configs:
    Topic: world Partition: 0 Leader: 1 Replicas: 2,1,0 Isr: 1,0,2
    Topic: world Partition: 1 Leader: 0 Replicas: 0,2,1 Isr: 0,1,2
    Topic: world Partition: 2 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2
    Topic: world Partition: 3 Leader: 0 Replicas: 2,0,1 Isr: 0,1,2

    ##########2、怎么重新均匀分配?########
    ##########2.1、Broker配置中的自动均衡策略######################################
    #####Auto.leader.rebalance.enable=true######################################
    #####leader.imbalance.check.interval.seconds 默认值:300#####################

    ##########2.2、手动配置执行###################################################
    ##########下面我们手动配置一下#################################################
    [root@xxo10 kafka]# bin/kafka-preferred-replica-election.sh --zookeeper xxo08:2181,xxo09:2181,xxo10:2181
    Successfully started preferred replica election for partitions Set([world,0], [world,2], [world,3], [world,1])
    [root@xxo10 kafka]# bin/kafka-topics.sh --describe --zookeeper xxo08:2181,xxo09:2181,xxo10:2181 --topic world
    Topic:world PartitionCount:4 ReplicationFactor:3 Configs:
    Topic: world Partition: 0 Leader: 2 Replicas: 2,1,0 Isr: 1,0,2
    Topic: world Partition: 1 Leader: 0 Replicas: 0,2,1 Isr: 0,1,2
    Topic: world Partition: 2 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2
    Topic: world Partition: 3 Leader: 2 Replicas: 2,0,1 Isr: 0,1,2


    ####负载均衡,Kafka提供了一个 metadata API来管理broker之间的负载(对于0.7.x主要靠zookeeper来实现负载均衡)。

API操作

kafka使用了scala编写,但是提供了很多语言的API接口。比如scala、java、python、R、PHP等。
这里我就使用java来模拟一个网页流量的实时统计处理

  • 项目结构
    项目结构

  • 一、配置文件


1
2
3
4
5
6
7
8
9
10
11
################producer.properties##########################
metadata.broker.list=xxo08:9092,xxo09:9092,xxo10:9092
partitioner.class=com.xxo.kafka.MyPartition
producer.type=sync
compression.codec=none
serializer.class=kafka.serializer.StringEncoder

################consumer.properties##########################
zookeeper.connect=xxo08:2181,xxo08:2181,xxo08:2181
zookeeper.connection.timeout.ms=6000
group.id=xiaoxiaomo ###组ID
  • 二、生产者,模拟用户访问数据

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    package com.xxo.kafka;

    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;

    import java.io.IOException;
    import java.text.SimpleDateFormat;
    import java.util.*;

    /**
    *
    * 模拟生产用户访问数据
    * Created by xiaoxiaomo on 2016/5/14.
    */

    public class ProduceUserData {

    static Map<Integer,String> path = new HashMap<Integer, String>();
    static Map<Integer,Integer> userId = new HashMap<Integer, Integer>();

    public static void main(String[] args) throws IOException, InterruptedException {

    //1. 创建一个生产者对象
    Properties prop = new Properties();
    prop.load( ProduceUserData.class.getClassLoader().getResourceAsStream("producer.properties") );
    Producer<String, String> producer = new Producer<String, String>(new ProducerConfig( prop ));

    //2. 构造message
    String topic = "world" ; //world 主题
    List<KeyedMessage<String,String>> list = new ArrayList<KeyedMessage<String, String>>() ;

    //用户数据
    //init path
    path.put(0,"http://xiaoxiaomo.com/");
    path.put(1,"http://blog.xiaoxiaomo.com/");
    path.put(2,"http://blog.xiaoxiaomo.com/archives/");
    path.put(3,"http://blog.xiaoxiaomo.com/photo/");
    path.put(4,"http://blog.xiaoxiaomo.com/about/");

    //init userId
    userId.put(0,2010);
    userId.put(1,1001);
    userId.put(2,1002);
    userId.put(3,2001);
    userId.put(4,2002);
    userId.put(5,3003);
    userId.put(6,4004);
    userId.put(7,1007);
    userId.put(8,1008);
    userId.put(9,1009);


    Random random = new Random();
    while ( true ){
    //随机产生数据
    int pathIndex=random.nextInt(5);
    int userIndex=random.nextInt(10);
    int isVip=random.nextInt(2);

    //构造一个用户访问数据
    String visData = String.format("%s\t%s\t%s\t%s",
    userId.get(userIndex),
    path.get(pathIndex),
    new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()), isVip) ;

    //使用了pathIndex作为一个Key分区用
    list.add( new KeyedMessage<String, String>( topic ,String.valueOf(pathIndex) , visData ) );

    //3. 发送
    producer.send(list);

    Thread.sleep( random.nextInt(2000) );
    }

    //4. 关闭
    //producer.close();
    }
    }
  • 三、消费者,处理用户访问日志信息

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    package com.xxo.kafka;

    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;
    import kafka.message.MessageAndMetadata;

    import java.io.IOException;
    import java.util.*;

    /**
    * 消费者
    * 处理用户访问日志信息
    * Created by xiaoxiaomo on 2016/5/14.
    */

    public class ConsumerUserVisit {

    private static int times = 0;
    static Map<String,Integer> map = new HashMap<String, Integer>();

    public static void main(String[] args) throws IOException {

    //1. 创建消费者
    Properties prop = new Properties();
    prop.load( ConsumerUserVisit.class.getClassLoader().getResourceAsStream( "consumer.properties" ) );
    ConsumerConnector connector = Consumer.createJavaConsumerConnector(new ConsumerConfig(prop));

    //2.
    String topic = "world" ;
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put( topic , 3 ) ; //这里启用了三个消费者线程

    Map<String, List<KafkaStream<byte[], byte[]>>> streams = connector.createMessageStreams(topicCountMap);
    List<KafkaStream<byte[], byte[]>> streamList = streams.get(topic);
    for (KafkaStream<byte[], byte[]> stream : streamList) {
    ConsumerIterator<byte[], byte[]> iterator = stream.iterator();
    new Thread( new RunConsumer( iterator ) ).start();
    }

    //处理访问数据
    Timer timer = new Timer(); //定时来打印一下信息
    timer.schedule(new TimerTask() {
    @Override
    public void run() {
    synchronized (map) {

    for (String key:map.keySet()){
    System.out.println(String.format("访问path: %s, 的次数: %s",key,map.get(key)));
    }
    System.out.println("总访问次数: "+times);
    map.clear();
    times=0;
    }
    }
    } , 0 ,5*1000 );

    }

    /**
    * 多线程处理数据,防阻塞
    */

    public static class RunConsumer implements Runnable{

    private ConsumerIterator<byte[], byte[]> iterator ;
    public RunConsumer(ConsumerIterator<byte[], byte[]> iterator) {
    this.iterator = iterator ;
    }

    public void run() {
    while ( iterator.hasNext() ){
    MessageAndMetadata<byte[], byte[]> next = iterator.next();
    String str = new String(next.message());
    String[] strs = str.split("\t");
    Integer key = map.get(strs[1]);
    synchronized ( map ) {
    int count = 1 ;
    if( key != null ){
    count += key ;
    }
    map.put( strs[1] , count ) ;
    times ++ ;
    }
    }
    }
    }
    }
  • 四、自定义Partition

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    package com.xxo.kafka;

    import kafka.producer.Partitioner;
    import kafka.utils.VerifiableProperties;

    /**
    * 自定义partition
    * Created by xiaoxiaomo on 2016/5/14.
    */

    public class MyPartition implements Partitioner {

    public MyPartition(VerifiableProperties verifiableProperties) {
    //记得要有这个构造函数,不然会报错!
    }

    public int partition(Object key, int numPartitions) {
    if( key == null ) return 0 ;
    Integer k = Integer.parseInt(key+"") ;
    return k % numPartitions;
    }
    }
  • 输出结果

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    总访问次数: 0
    总访问次数: 0
    访问path: http://xiaoxiaomo.com/, 的次数: 5
    访问path: http://blog.xiaoxiaomo.com/photo/, 的次数: 3
    访问path: http://blog.xiaoxiaomo.com/archives/, 的次数: 2
    总访问次数: 10
    访问path: http://xiaoxiaomo.com/, 的次数: 6
    访问path: http://blog.xiaoxiaomo.com/about/, 的次数: 3
    访问path: http://blog.xiaoxiaomo.com/photo/, 的次数: 3
    访问path: http://blog.xiaoxiaomo.com/archives/, 的次数: 6
    总访问次数: 18
    访问path: http://xiaoxiaomo.com/, 的次数: 6
    访问path: http://blog.xiaoxiaomo.com/, 的次数: 4
    访问path: http://blog.xiaoxiaomo.com/about/, 的次数: 3
    访问path: http://blog.xiaoxiaomo.com/photo/, 的次数: 5
    访问path: http://blog.xiaoxiaomo.com/archives/, 的次数: 9
    总访问次数: 27
    访问path: http://xiaoxiaomo.com/, 的次数: 24
    访问path: http://blog.xiaoxiaomo.com/, 的次数: 23
    访问path: http://blog.xiaoxiaomo.com/about/, 的次数: 16
    访问path: http://blog.xiaoxiaomo.com/photo/, 的次数: 21
    访问path: http://blog.xiaoxiaomo.com/archives/, 的次数: 32
    总访问次数: 116
  • 本博客所用源码下载地址:http://download.csdn.net/detail/tang__xuandong/9520209

当前网速较慢或者你使用的浏览器不支持博客特定功能,请尝试刷新或换用Chrome、Firefox等现代浏览器