Spark Streaming -- Application and Practice (Part 2)
Now we can start writing the code.
- The overall approach is:
- construct JSON from the put data and write it to Kafka (a producer sketch follows this list);
- when the Spark Streaming job starts, it first reads the offsets stored in ZooKeeper and assembles them into fromOffsets;
- with fromOffsets in hand, Spark Streaming consumes the Kafka data through KafkaUtils.createDirectStream;
- reading from Kafka returns an InputDStream, which we iterate with foreachRDD, recording the offsets we have read back to ZK as we go;
- write the data into HBase.
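The producer side is not shown in this post; as a rough illustration of the first step (the topic name, broker list, and column values below are all made up), each message is a JSON document in the shape that HBaseDao.insert expects later on, and the topic name is what the consumer later uses as the HBase table name:

```scala
import java.util.Properties
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

// Hypothetical producer sketch using the Kafka 0.8 producer API.
object JsonProducerSketch {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("metadata.broker.list", "broker1:9092,broker2:9092") // placeholder brokers
    props.put("serializer.class", "kafka.serializer.StringEncoder")
    val producer = new Producer[String, String](new ProducerConfig(props))

    // One HBase row per message: a rowKey plus "family:qualifier" -> value pairs
    val json = """{"rowKey":"00000-0","f:name":"xiaoxiaomo","f:age":"18"}"""
    producer.send(new KeyedMessage[String, String]("t_test", json)) // topic "t_test" is made up
    producer.close()
  }
}
```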
Initialization and Configuration Loading
- The code below receives the command-line arguments, loads the configuration, reads the topic from it, and initializes the Spark configuration:
// Receive the command-line arguments
val Array(kafka_topic, timeWindow, maxRatePerPartition) = args
// Load the configuration
val prop: Properties = new Properties()
prop.load(this.getClass().getResourceAsStream("/kafka.properties"))
val groupName = prop.getProperty("group.id")
// Get the topic from the configuration file
val kafkaTopics: String = prop.getProperty("kafka.topic." + kafka_topic)
if (kafkaTopics == null || kafkaTopics.length <= 0) {
System.err.println("Usage: KafkaDataStream <kafka_topic> is number from kafka.properties")
System.exit(1)
}
val topics: Set[String] = kafkaTopics.split(",").toSet
val kafkaParams = scala.collection.immutable.Map[String, String](
"metadata.broker.list" -> prop.getProperty("bootstrap.servers"),
"group.id" -> groupName,
"auto.offset.reset" -> "largest")
val kc = new KafkaCluster(kafkaParams)
// Initialize the Spark configuration
val sparkConf = new SparkConf()
.setAppName(KafkaDataStream.getClass.getSimpleName + topics.toString())
.set("spark.yarn.am.memory", prop.getProperty("am.memory"))
.set("spark.yarn.am.memoryOverhead", prop.getProperty("am.memoryOverhead"))
.set("spark.yarn.executor.memoryOverhead", prop.getProperty("executor.memoryOverhead"))
.set("spark.streaming.kafka.maxRatePerPartition", maxRatePerPartition) //此处为每秒每个partition的条数
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.set("spark.reducer.maxSizeInFlight", "1m")
val sc = new SparkContext(sparkConf)
val ssc = new StreamingContext(sc, Seconds(timeWindow.toInt)) // batch interval in seconds
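For reference, the kafka.properties file read above needs at least the keys used in this post; something along these lines (all values here are placeholders):

```properties
# Kafka brokers and consumer group
bootstrap.servers=broker1:9092,broker2:9092
group.id=kafka_data_stream
# topics are looked up as kafka.topic.<n>, where <n> is the first command-line argument
kafka.topic.1=t_test
# ZooKeeper connection string, used below for the offset bookkeeping
zk.connect=zk1:2181,zk2:2181,zk3:2181
# YARN memory settings pushed into SparkConf
am.memory=1024m
am.memoryOverhead=512
executor.memoryOverhead=1024
```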
One thing to note: the KafkaCluster used here has to be copied out of the Spark source and modified, because some of its methods are private; after copying it, simply change them to public.
Connecting to ZK
- Note: the ZKStringSerializer used here also has to be copied out of the Kafka source and modified (a sketch follows the snippet below).
// ZK client
val zkClient = new ZkClient(prop.getProperty("zk.connect"), Integer.MAX_VALUE, 100000, ZKStringSerializer)
val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.topic, mmd.message())
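A public copy of ZKStringSerializer is only a few lines; roughly (a sketch of the idea, not the verbatim Kafka source), it just reads and writes ZK node data as UTF-8 strings:

```scala
import org.I0Itec.zkclient.serialize.ZkSerializer

object ZKStringSerializer extends ZkSerializer {
  // ZK stores byte arrays; this serializer treats every node value as a UTF-8 string
  override def serialize(data: Object): Array[Byte] =
    data.asInstanceOf[String].getBytes("UTF-8")

  override def deserialize(bytes: Array[Byte]): Object =
    if (bytes == null) null else new String(bytes, "UTF-8")
}
```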
Assembling fromOffsets
- Assemble fromOffsets; createDirectStream accepts a Map, so consuming multiple topics is supported.
var fromOffsets: Map[TopicAndPartition, Long] = Map() // offsets for multiple partitions
// Multiple topics are supported: Set[String]
topics.foreach(topicName => {
// Read the topic's partition count from ZK; note: the job must be restarted after adding partitions
val children = zkClient.countChildren(ZkUtils.getTopicPartitionsPath(topicName))
for (i <- 0 until children) {
// Check whether this consumer group already has an offset recorded for this partition; if not, start from 0
val tp = TopicAndPartition(topicName, i)
val path: String = s"${new ZKGroupTopicDirs(groupName, topicName).consumerOffsetDir}/$i"
if (zkClient.exists(path)) {
fromOffsets += (tp -> zkClient.readData[String](path).toLong)
} else {
fromOffsets += (tp -> 0)
}
}
})
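One caveat with the logic above (not part of the original code): an offset stored in ZK, or the default 0, may already have been deleted by Kafka's retention, in which case createDirectStream fails with an offset-out-of-range error. Assuming the copied KafkaCluster keeps its getEarliestLeaderOffsets method, the stored offsets can be clamped to the earliest ones still available:

```scala
// Sketch: never start below the earliest offset Kafka still holds for each partition
kc.getEarliestLeaderOffsets(fromOffsets.keySet).right.foreach { earliest =>
  fromOffsets = fromOffsets.map { case (tp, offset) =>
    tp -> math.max(offset, earliest(tp).offset)
  }
}
```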
Receiving Data via createDirectStream
- Use KafkaUtils.createDirectStream to consume the Kafka data. createDirectStream is built on Kafka's simple consumer API, so we have to manage offsets ourselves; we write them to ZK, which also makes it easy for monitoring tools to read the records.
// Create the direct Kafka stream, starting from the offsets recorded in ZK
val messages: InputDStream[(String, String)] =
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets, messageHandler)
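The snippets in this post omit imports; with Spark 1.x and the spark-streaming-kafka integration for Kafka 0.8 (the exact versions are an assumption), they come down to roughly:

```scala
import java.util.Properties

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
import org.I0Itec.zkclient.ZkClient
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext, TaskContext}
// plus the copied KafkaCluster and ZKStringSerializer from whichever package you placed them in
```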
Persisting the Data
Writing to HBase
// Process the data
messages.foreachRDD(rdd => {
val offsetsList: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
// Process each partition's records
rdd.foreachPartition(partitionRecords => {
// Look up this partition's offset range via the TaskContext
val offsetRange: OffsetRange = offsetsList(TaskContext.get.partitionId)
logger.info(s"${offsetRange.topic} ${offsetRange.partition} ${offsetRange.fromOffset} ${offsetRange.untilOffset}")
partitionRecords.foreach(data => {
HBaseDao.insert(data)
})
// TopicAndPartition's first constructor argument is the topic, the second is the Kafka partition id
val topicAndPartition = TopicAndPartition(offsetRange.topic, offsetRange.partition)
val either = kc.setConsumerOffsets(groupName, Map((topicAndPartition, offsetRange.untilOffset))) // commit the consumed offset for this partition back to ZK
if (either.isLeft) {
logger.info(s"Error updating the offset to Kafka cluster: ${either.left.get}")
}
})
})
Inserting the data into the actual HBase table:
/**
 *
 * Insert data into HBase
 *
 * Parameters: (tableName, json)
 *
 * JSON format:
 * {
 * "rowKey": "00000-0",
 * "family:qualifier": "value",
 * "family:qualifier": "value",
 * ......
 * }
 *
 * @param data a (tableName, json) tuple
 * @return true if the row was written, false otherwise
 */
def insert(data: (String, String)): Boolean = {
val t: HTable = getTable(data._1) // HTable for the target table
try {
val map: mutable.HashMap[String, Object] = JsonUtils.json2Map(data._2)
val rowKey: Array[Byte] = String.valueOf(map("rowKey")).getBytes // row key (use apply, not get, to avoid stringifying an Option)
val put = new Put(rowKey)
for ((k, v) <- map) {
val keys: Array[String] = k.split(":")
if (keys.length == 2){
put.addColumn(keys(0).getBytes, keys(1).getBytes, String.valueOf(v).getBytes)
}
}
// Put the row; on failure close the (possibly cached) table and report false instead of silently returning true
val ok = Try(t.put(put)).isSuccess
if (!ok) t.close()
ok
} catch {
case e: Exception =>
e.printStackTrace()
false
}
}
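For illustration, a call with made-up values (table t_test, column family f) would look like this:

```scala
// Hypothetical usage: one row keyed "00000-0" with two columns in family "f"
val ok: Boolean = HBaseDao.insert(("t_test",
  """{"rowKey":"00000-0","f:name":"xiaoxiaomo","f:age":"18"}"""))
```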
Running and Checking the Results
- Launch command (the three trailing arguments are kafka_topic, timeWindow, and maxRatePerPartition, as received at the top of the code):
/opt/cloudera/parcels/CDH/bin/spark-submit --master yarn-client --class com.xiaoxiaomo.streaming.KafkaDataStream hspark-1.0.jar 1 3 1000
Once it is running, you can check the job's status in the Spark UI; the UI details will be covered in the next post.