Flume--kafka source启动异常

  • 在flume中配置了一个kafka source的agent。如下:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    # 使用kafka作为一个sources

    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # kfka source
    a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
    a1.sources.r1.zookeeperConnect = xxo08:2181,xxo09:2181,xxo10:2181##zk集群
    a1.sources.r1.topic = word
    a1.sources.r1.groupId = flume
    a1.sources.r1.kafka.consumer.timeout.ms = 3000

    # Describe the sink
    a1.sinks.k1.type = logger

    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100

    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

报错

  • 启动一个agent的时候报了一个错:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    2016-05-20 01:11:07,710 (lifecycleSupervisor-1-3) [ERROR - org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:253)] Unable to start PollableSourceRunner: { source:org.apache.flume.source.kafka.KafkaSource{name:r1,state:IDLE} counterGroup:{ name:null counters:{} } } - Exception follows.
    java.lang.NoClassDefFoundError: org/apache/zookeeper/Watcher
    at java.lang.ClassLoader.defineClass1(Native Method)
    at java.lang.ClassLoader.defineClass(ClassLoader.java:800)
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:449)
    at java.net.URLClassLoader.access$100(URLClassLoader.java:71)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
    at kafka.consumer.ZookeeperConsumerConnector.connectZk(ZookeeperConsumerConnector.scala:156)
    at kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:114)
    at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:65)
    at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:67)
    at kafka.consumer.Consumer$.createJavaConsumerConnector(ConsumerConnector.scala:100)
    at kafka.consumer.Consumer.createJavaConsumerConnector(ConsumerConnector.scala)
    at org.apache.flume.source.kafka.KafkaSourceUtil.getConsumer(KafkaSourceUtil.java:47)
    at org.apache.flume.source.kafka.KafkaSource.start(KafkaSource.java:200)
    at org.apache.flume.source.PollableSourceRunner.start(PollableSourceRunner.java:74)
    at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:251)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
    Caused by: java.lang.ClassNotFoundException: org.apache.zookeeper.Watcher
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
    ... 29 more
  • 具体启动信息如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    [root@xxo08 flume]# bin/flume-ng agent --conf conf/ --conf-file conf/kafka_source.conf --name a1 -Dflume.root.logger=INFO,console
    Info: Sourcing environment configuration script /opt/flume/conf/flume-env.sh
    Info: Including Hive libraries found via () for Hive access
    + exec /usr/local/jdk1.7.0_79/bin/java -Xmx20m -Dflume.root.logger=INFO,console -cp '/opt/flume/conf:/opt/flume/lib/*:/lib/*' -Djava.library.path= org.apache.flume.node.Application --conf-file conf/kafka_source.conf --name a1
    2016-05-20 01:11:06,943 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.node.PollingPropertiesFileConfigurationProvider.start(PollingPropertiesFileConfigurationProvider.java:61)] Configuration provider starting
    2016-05-20 01:11:06,949 (conf-file-poller-0) [INFO - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:133)] Reloading configuration file:conf/kafka_source.conf
    2016-05-20 01:11:06,960 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:931)] Added sinks: k1 Agent: a1
    2016-05-20 01:11:06,961 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1017)] Processing:k1
    2016-05-20 01:11:06,962 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1017)] Processing:k1
    2016-05-20 01:11:06,981 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:141)] Post-validation flume configuration contains configuration for agents: [a1]
    2016-05-20 01:11:06,982 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:145)] Creating channels
    2016-05-20 01:11:06,994 (conf-file-poller-0) [INFO - org.apache.flume.channel.DefaultChannelFactory.create(DefaultChannelFactory.java:42)] Creating instance of channel c1 type memory
    2016-05-20 01:11:07,001 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:200)] Created channel c1
    2016-05-20 01:11:07,003 (conf-file-poller-0) [INFO - org.apache.flume.source.DefaultSourceFactory.create(DefaultSourceFactory.java:41)] Creating instance of source r1, type org.apache.flume.source.kafka.KafkaSource
    2016-05-20 01:11:07,011 (conf-file-poller-0) [INFO - org.apache.flume.source.kafka.KafkaSourceUtil.getKafkaProperties(KafkaSourceUtil.java:37)] context={ parameters:{topic=hello, groupId=flume, zookeeperConnect=xxo08:2181,xxo09:2181,xxo10:2181, kafka.consumer.timeout.ms=3000, channels=c1, type=org.apache.flume.source.kafka.KafkaSource} }
    2016-05-20 01:11:07,051 (conf-file-poller-0) [INFO - org.apache.flume.sink.DefaultSinkFactory.create(DefaultSinkFactory.java:42)] Creating instance of sink: k1, type: logger
    2016-05-20 01:11:07,057 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:114)] Channel c1 connected to [r1, k1]
    2016-05-20 01:11:07,071 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:138)] Starting new configuration:{ sourceRunners:{r1=PollableSourceRunner: { source:org.apache.flume.source.kafka.KafkaSource{name:r1,state:IDLE} counterGroup:{ name:null counters:{} } }} sinkRunners:{k1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@4ab9ba02 counterGroup:{ name:null counters:{} } }} channels:{c1=org.apache.flume.channel.MemoryChannel{name: c1}} }
    2016-05-20 01:11:07,071 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:145)] Starting Channel c1
    2016-05-20 01:11:07,140 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:120)] Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
    2016-05-20 01:11:07,142 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:96)] Component type: CHANNEL, name: c1 started
    2016-05-20 01:11:07,143 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:173)] Starting Sink k1
    2016-05-20 01:11:07,148 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:184)] Starting Source r1
    2016-05-20 01:11:07,149 (lifecycleSupervisor-1-3) [INFO - org.apache.flume.source.kafka.KafkaSource.start(KafkaSource.java:196)] Starting org.apache.flume.source.kafka.KafkaSource{name:r1,state:IDLE}...
    2016-05-20 01:11:07,601 (lifecycleSupervisor-1-3) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Verifying properties
    2016-05-20 01:11:07,663 (lifecycleSupervisor-1-3) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Property auto.commit.enable is overridden to false
    2016-05-20 01:11:07,663 (lifecycleSupervisor-1-3) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Property consumer.timeout.ms is overridden to 3000
    2016-05-20 01:11:07,663 (lifecycleSupervisor-1-3) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Property group.id is overridden to flume
    2016-05-20 01:11:07,664 (lifecycleSupervisor-1-3) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Property zookeeper.connect is overridden to xxo08:2181,xxo09:2181,xxo10:2181
    2016-05-20 01:11:07,708 (lifecycleSupervisor-1-3) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] [flume_xxo08-1463677867706-e244b0f9], Connecting to zookeeper instance at xxo08:2181,xxo09:2181,xxo10:2181
    2016-05-20 01:11:07,710 (lifecycleSupervisor-1-3) [ERROR - org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:253)] Unable to start PollableSourceRunner: { source:org.apache.flume.source.kafka.KafkaSource{name:r1,state:IDLE} counterGroup:{ name:null counters:{} } } - Exception follows.
    java.lang.NoClassDefFoundError: org/apache/zookeeper/Watcher
    at java.lang.ClassLoader.defineClass1(Native Method)
    at java.lang.ClassLoader.defineClass(ClassLoader.java:800)
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:449)
    at java.net.URLClassLoader.access$100(URLClassLoader.java:71)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
    at kafka.consumer.ZookeeperConsumerConnector.connectZk(ZookeeperConsumerConnector.scala:156)
    at kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:114)
    at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:65)
    at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:67)
    at kafka.consumer.Consumer$.createJavaConsumerConnector(ConsumerConnector.scala:100)
    at kafka.consumer.Consumer.createJavaConsumerConnector(ConsumerConnector.scala)
    at org.apache.flume.source.kafka.KafkaSourceUtil.getConsumer(KafkaSourceUtil.java:47)
    at org.apache.flume.source.kafka.KafkaSource.start(KafkaSource.java:200)
    at org.apache.flume.source.PollableSourceRunner.start(PollableSourceRunner.java:74)
    at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:251)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
    Caused by: java.lang.ClassNotFoundException: org.apache.zookeeper.Watcher
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
    ... 29 more
    2016-05-20 01:11:07,719 (lifecycleSupervisor-1-3) [ERROR - org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:264)] Unsuccessful attempt to shutdown component: {} due to missing dependencies. Please shutdown the agentor disable this component, or the agent will bein an undefined state.
    java.lang.NullPointerException
    at org.apache.flume.source.PollableSourceRunner$PollingRunner.access$200(PollableSourceRunner.java:125)
    at org.apache.flume.source.PollableSourceRunner.stop(PollableSourceRunner.java:93)
    at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:259)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
    2016-05-20 01:11:10,723 (lifecycleSupervisor-1-2) [INFO - org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:233)] Component PollableSourceRunner: { source:org.apache.flume.source.kafka.KafkaSource{name:r1,state:IDLE} counterGroup:{ name:null counters:{} } } is in error state, and Flume will notattempt to change its state
    2016-05-20 01:11:13,726 (lifecycleSupervisor-1-2) [INFO - org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:233)] Component PollableSourceRunner: { source:org.apache.flume.source.kafka.KafkaSource{name:r1,state:IDLE} counterGroup:{ name:null counters:{} } } is in error state, and Flume will notattempt to change its state
    ^C2016-05-20 01:11:14,610 (agent-shutdown-hook) [INFO - org.apache.flume.lifecycle.LifecycleSupervisor.stop(LifecycleSupervisor.java:79)] Stopping lifecycle supervisor 10
    2016-05-20 01:11:14,615 (agent-shutdown-hook) [INFO - org.apache.flume.node.PollingPropertiesFileConfigurationProvider.stop(PollingPropertiesFileConfigurationProvider.java:83)] Configuration provider stopping
    2016-05-20 01:11:14,616 (agent-shutdown-hook) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.stop(MonitoredCounterGroup.java:150)] Component type: CHANNEL, name: c1 stopped
    2016-05-20 01:11:14,617 (agent-shutdown-hook) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.stop(MonitoredCounterGroup.java:156)] Shutdown Metric for type: CHANNEL, name: c1. channel.start.time == 1463677867142
    2016-05-20 01:11:14,617 (agent-shutdown-hook) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.stop(MonitoredCounterGroup.java:162)] Shutdown Metric for type: CHANNEL, name: c1. channel.stop.time == 1463677874616
    2016-05-20 01:11:14,618 (agent-shutdown-hook) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.stop(MonitoredCounterGroup.java:178)] Shutdown Metric for type: CHANNEL, name: c1. channel.capacity == 1000
    2016-05-20 01:11:14,618 (agent-shutdown-hook) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.stop(MonitoredCounterGroup.java:178)] Shutdown Metric for type: CHANNEL, name: c1. channel.current.size == 0
    2016-05-20 01:11:14,618 (agent-shutdown-hook) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.stop(MonitoredCounterGroup.java:178)] Shutdown Metric for type: CHANNEL, name: c1. channel.event.put.attempt == 0
    2016-05-20 01:11:14,618 (agent-shutdown-hook) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.stop(MonitoredCounterGroup.java:178)] Shutdown Metric for type: CHANNEL, name: c1. channel.event.put.success == 0
    2016-05-20 01:11:14,619 (agent-shutdown-hook) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.stop(MonitoredCounterGroup.java:178)] Shutdown Metric for type: CHANNEL, name: c1. channel.event.take.attempt == 2
    2016-05-20 01:11:14,620 (agent-shutdown-hook) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.stop(MonitoredCounterGroup.java:178)] Shutdown Metric for type: CHANNEL, name: c1. channel.event.take.success == 0

原因

  • 缺少了zookeeper-**.jar,博主这里的为:zookeeper-3.4.6.jar**

解决办法

  • 三种解决办法
  1. 把 zookeeper-3.4.6.jar 拷贝到${FLUME_HOME}/lib下面,比如我的flume在/opt下面,我需要拷贝到/opt/flume/lib/下面。
  2. 配置 FLUME_CLASSPATH=”zookeeper jar 的地址“(默认注释掉了),例如:FLUME_CLASSPATH="/opt/zookeeper/zookeeper-3.4.6.jar"(即指定到我zk下面的jar)。
  3. 博主测试时,机器中同时安装了 hadoop 也是可以的,因为 hadoop lib 下面默认带了 zk 的jar。

        

当前网速较慢或者你使用的浏览器不支持博客特定功能,请尝试刷新或换用Chrome、Firefox等现代浏览器