Spark Streaming--应用与实战(二)

然后就开始写代码了

  • 总体思路就是:
  1. put数据构造json数据,写入kafka;
  2. spark streaming任务启动后首先去zookeeper中去读取offset,组装成fromOffsets;
  3. spark streaming 获取到fromOffsets后通过KafkaUtils.createDirectStream去消费Kafka的数据;
  4. 读取kafka数据返回一个InputDStream的信息,foreachRDD遍历,同时记录读取到的offset到zk中;
  5. 写入数据到HBase
    详细一点的架构图

初始化与配置加载

  • 下面是一些接收参数,加载配置,获取配置中的topic,还有初始化配置,代码如下:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    //接收参数
    val Array(kafka_topic, timeWindow, maxRatePerPartition) = args

    //加载配置
    val prop: Properties = new Properties()
    prop.load(this.getClass().getResourceAsStream("/kafka.properties"))

    val groupName = prop.getProperty("group.id")

    //获取配置文件中的topic
    val kafkaTopics: String = prop.getProperty("kafka.topic." + kafka_topic)
    if (kafkaTopics == null || kafkaTopics.length <= 0) {
    System.err.println("Usage: KafkaDataStream <kafka_topic> is number from kafka.properties")
    System.exit(1)
    }

    val topics: Set[String] = kafkaTopics.split(",").toSet

    val kafkaParams = scala.collection.immutable.Map[String, String](
    "metadata.broker.list" -> prop.getProperty("bootstrap.servers"),
    "group.id" -> groupName,
    "auto.offset.reset" -> "largest")

    val kc = new KafkaCluster(kafkaParams)

    //初始化配置
    val sparkConf = new SparkConf()
    .setAppName(KafkaDataStream.getClass.getSimpleName + topics.toString())
    .set("spark.yarn.am.memory", prop.getProperty("am.memory"))
    .set("spark.yarn.am.memoryOverhead", prop.getProperty("am.memoryOverhead"))
    .set("spark.yarn.executor.memoryOverhead", prop.getProperty("executor.memoryOverhead"))
    .set("spark.streaming.kafka.maxRatePerPartition", maxRatePerPartition) //此处为每秒每个partition的条数
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .set("spark.reducer.maxSizeInFlight", "1m")

    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(timeWindow.toInt)) //多少秒处理一次请求

只是需要注意一下,这里的KafkaCluster,需要把源码拷贝过来,修改一下,因为里面有些方法是私有的。copy过来后改为public 即可。

链接ZK

  • 注意:这里的ZKStringSerializer,需要把源码拷贝过来,修改一下
    1
    2
    3
    //zk
    val zkClient = new ZkClient(prop.getProperty("zk.connect"), Integer.MAX_VALUE, 100000, ZKStringSerializer)
    val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.topic, mmd.message())

组装fromOffsets

  • 组装fromOffsets,createDirectStream接收的是一个map的结构,所以可以支持多个topic的消费
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    var fromOffsets: Map[TopicAndPartition, Long] = Map() //多个partition的offset


    //支持多个topic : Set[String]
    topics.foreach(topicName => {

    //去brokers中获取partition数量,注意:新增partition后需要重启
    val children = zkClient.countChildren(ZkUtils.getTopicPartitionsPath(topicName))
    for (i <- 0 until children) {

    //kafka consumer 中是否有该partition的消费记录,如果没有设置为0
    val tp = TopicAndPartition(topicName, i)
    val path: String = s"${new ZKGroupTopicDirs(groupName, topicName).consumerOffsetDir}/$i"
    if (zkClient.exists(path)) {
    fromOffsets += (tp -> zkClient.readData[String](path).toLong)
    } else {
    fromOffsets += (tp -> 0)
    }
    }
    })

通过createDirectStream接受数据

  • 使用KafkaUtils里面的createDirectStream方法去消费kafka数据,createDirectStream使用的是kafka简单的Consumer API,所以需要自己去管理offset,我们把offset写入到zk中,这样也方便了一些监控软件读取记录
    1
    2
    3
    //创建Kafka持续读取流,通过zk中记录的offset
    val messages: InputDStream[(String, String)] =
    KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets, messageHandler)

入库

  • 入库HBase

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24

    //数据操作
    messages.foreachRDD(rdd => {
    val offsetsList: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

    //data 处理
    rdd.foreachPartition(partitionRecords => {
    //TaskContext 上下文
    val offsetRange: OffsetRange = offsetsList(TaskContext.get.partitionId)
    logger.info(s"${offsetRange.topic} ${offsetRange.partition} ${offsetRange.fromOffset} ${offsetRange.untilOffset}")

    partitionRecords.foreach(data => {
    HBaseDao.insert(data)
    })

    //TopicAndPartition 主构造参数第一个是topic,第二个是Kafka partition id
    val topicAndPartition = TopicAndPartition(offsetRange.topic, offsetRange.partition)
    val either = kc.setConsumerOffsets(groupName, Map((topicAndPartition, offsetRange.untilOffset))) //是
    if (either.isLeft) {
    logger.info(s"Error updating the offset to Kafka cluster: ${either.left.get}")
    }
    })

    })
  • 插入数据到具体HBase数据库

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    /**
    *
    * 插入数据到 HBase
    *
    * 参数( tableName , json ) ):
    *
    * Json格式:
    * {
    * "rowKey": "00000-0",
    * "family:qualifier": "value",
    * "family:qualifier": "value",
    * ......
    * }
    *
    * @param data
    * @return
    */

    def insert(data: (String, String)): Boolean = {

    val t: HTable = getTable(data._1) //HTable
    try {
    val map: mutable.HashMap[String, Object] = JsonUtils.json2Map(data._2)
    val rowKey: Array[Byte] = String.valueOf(map.get("rowKey")).getBytes //rowKey
    val put = new Put(rowKey)

    for ((k, v) <- map) {
    val keys: Array[String] = k.split(":")
    if (keys.length == 2){
    put.addColumn(keys(0).getBytes, keys(1).getBytes, String.valueOf(v).getBytes)
    }
    }

    Try(t.put(put)).getOrElse(t.close())
    true
    } catch {
    case e: Exception =>
    e.printStackTrace()
    false
    }
    }

运行并查看结果

  • 运行命令:
    /opt/cloudera/parcels/CDH/bin/spark-submit --master yarn-client --class com.xiaoxiaomo.streaming.KafkaDataStream hspark-1.0.jar 1 3 1000
    运行后可以去spark UI中去查看相关运行情况,UI中具体细节见下篇博客
    Streaming Statistics数据统计图
    Completed Batches

当前网速较慢或者你使用的浏览器不支持博客特定功能,请尝试刷新或换用Chrome、Firefox等现代浏览器