OWL--监控系统实战三源码阅读

  本篇博客,主要是带你看看具体的源码,从源码的解读理解一下OWL这个监控系统,以及为后面我们做二次开发插件开发打下基础。OWL的一些介绍还可参考官方文档:OWL监控系统最佳实践.pdf

OWL的介绍

  来看看OWL监控系统最佳实践.pdf里面的一张结构图吧:
访问OpenTSDB

  刚开始要去了解OWL这么多组件,容易看花眼,各种曲线箭头。

  • 所以得把问题简单化,简单到我们分为三个部分
    1. 数据的采集(netcollect和agent)
    2. 数据的存储(TSDB和MySQL)
    3. 数据的查询(API和Controller)
  • 然后就只剩下Repeater和CFC:
    1. Repeater:接收监控采集的数据写到OpenTSDB
    2. CFC:维护一些元数据(WEB界面交互数据),然后保存在MySQL中,(主机状态、metric信息、插件列表)
  • 哈哈!这样是不是更好理解一点呢?接着往下看具体源码吧

一起来看看具体的源码

Agent数据的采集

  1. 首先看看agent的main.go,主要是做初始化,还有程序运行的一些入口

  2. 看一个配置文件的获取

  3. agent.SendConfig2CFC()发送配置信息到cfc

    上面看到的types.MessageType=types.MESS_POST_HOST_CONFIG,具体代码可以在handle中看到

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    case types.MESS_POST_HOST_CONFIG:
    host := &types.Host{}
    err = host.Decode(data[1:])
    if err != nil {
    lg.Error(err.Error())
    sess.Close()
    return
    }
    if h := mydb.GetHost(host.ID); h == nil { //主机不存在
    err := mydb.CreateHost(host.ID, host.SN, host.IP, host.Hostname, host.AgentVersion)
    if err != nil {
    lg.Error("create host error: %s", err.Error())
    } else {
    lg.Info("create host:%v", host)
    }
    } else if h.IP != host.IP ||
    h.Hostname != host.Hostname ||
    h.AgentVersion != host.AgentVersion ||
    h.SN != host.SN {
    lg.Info("update host: %v->%v", h, host)
    mydb.UpdateHost(host)
    }
  4. agent.GetPluginList() 获取插件

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    	//获取需要执行的插件列表
    case types.MESS_GET_HOST_PLUGIN_LIST:
    host := &types.Host{}
    err = host.Decode(data[1:])
    if err != nil {
    return
    }
    sess.Send(types.Pack(types.MESS_GET_HOST_PLUGIN_LIST_RESP,
    &types.GetPluginResp{HostID: host.ID, Plugins: mydb.GetPlugins(host.ID)}),
    )
  5. agent.SendTSD2Repeater() 发送metrics信息

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
           //repeater 发送数据
    case types.MESS_POST_TSD:
    repeater.buffer <- data[1:]

    //cfc 同步metric 到MySQL
    case types.MESS_POST_METRIC:
    cfg := types.MetricConfig{}
    err = cfg.Decode(data[1:])
    if err != nil {
    lg.Error(err.Error())
    sess.Close()
    }
    host := mydb.GetHost(cfg.HostID)
    if host == nil {
    return
    }
    // 这是去mysql里面查询metrics信息,如果不存在该metrics就创建
    // 在“设备列表”中点击“metrics数”看见的metric名就是这儿保存的
    if mydb.MetricIsExists(cfg.HostID, cfg.SeriesData.GetMetric()) {
    return
    }
    if err = mydb.CreateMetric(cfg.HostID, cfg.SeriesData); err != nil {
    lg.Error("create metric error %v", err.Error())
    }
  6. agent.SendAgentAlive2Repeater() & agent.SendHostAlive2CFC() 发送心跳报告

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    func (this *Agent) SendAgentAlive2Repeater() {
    go func() {
    for {
    this.SendChan <- types.TimeSeriesData{
    Metric: "agent.alive",
    DataType: "GAUGE",
    Value: 1,
    Cycle: 30,
    Timestamp: time.Now().Unix(),
    }
    time.Sleep(time.Second * 30)
    }
    }()
    }

    func (this *Agent) SendHostAlive2CFC() {
    go func() {
    for {
    if this.cfc.IsClosed() {
    time.Sleep(time.Second * 10)
    continue
    }
    this.cfc.Send(types.Pack(types.MESS_POST_HOST_ALIVE, NewHostConfig()))
    time.Sleep(time.Minute * 1)
    }
    }()
    }
  7. agent.RunBuiltinMetric() 具体metrics采集(之后的二次开发也是修改这些代码)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    // 看一个简单的LoadMetrics 主要实现就是组装TimeSeriesData数据
    func LoadMetrics(cycle int, ch chan types.TimeSeriesData) {
    for _, metric := range loadMetrics(cycle) {
    if metric == nil {
    continue
    }
    ch <- *metric
    }
    }

    func loadMetrics(cycle int) []*types.TimeSeriesData {
    cnt, err := load.Avg()
    if err != nil {
    return nil
    }
    ts := time.Now().Unix()
    metrics := make([]*types.TimeSeriesData, 3)

    metrics[0] = &types.TimeSeriesData{
    Metric: "sys.load1",
    Value: cnt.Load1,
    Cycle: cycle,
    Timestamp: ts,
    DataType: "GAUGE",
    }
    // ...
    return metrics
    }
  • 到这里我们agent的核心代码就看完了!

repeater核心代码

  1. main方法入口
    1
    go repeater.Forward()

写入数据到opentsdb
opentsdb具体写逻辑

cfc核心代码

  1. main方法入口
    1
    UpdatHostAive()

cfc

api核心代码

  1. 有必要看看api模块main里面的初始化
    api main

  2. InitServer
    api InitServer

  3. 我们可以直接调用api
    api InitServer

  4. api查询调用以及返回值,看一个query就够了

    1
    2
    3
    4
    query := authorized.Group("/query")
    {
    query.GET("", data)
    }


剩下源码

  • 看完前面的源码,总体思路我想应该是有了。剩下的源码,你们就自己去阅读吧,后面的也都很简单,因为go语言本身就有阅读简单的特点。
  • 运行起来看看具体的日志,更加容易理解
    owl log

当前网速较慢或者你使用的浏览器不支持博客特定功能,请尝试刷新或换用Chrome、Firefox等现代浏览器