OWL--监控系统实战三源码阅读
本篇博客,主要是带你看看具体的源码,从源码的解读理解一下OWL这个监控系统,以及为后面我们做二次开发和插件开发打下基础。OWL的一些介绍还可参考官方文档:OWL监控系统最佳实践.pdf
OWL的介绍
来看看OWL监控系统最佳实践.pdf里面的一张结构图吧:
刚开始要去了解OWL这么多组件,容易看花眼,各种曲线箭头。
- 所以得把问题简单化,简单到我们分为三个部分:
- 数据的采集(netcollect和agent)
- 数据的存储(TSDB和MySQL)
- 数据的查询(API和Controller)
- 然后就只剩下Repeater和CFC:
- Repeater:接收监控采集的数据写到OpenTSDB
- CFC:维护一些元数据(WEB界面交互数据),然后保存在MySQL中,(主机状态、metric信息、插件列表)
- 哈哈!这样是不是更好理解一点呢?接着往下看具体源码吧
一起来看看具体的源码
Agent数据的采集
首先看看agent的main.go,主要是做初始化,还有程序运行的一些入口
看一个配置文件的获取
agent.SendConfig2CFC()发送配置信息到cfc
上面看到的types.MessageType=types.MESS_POST_HOST_CONFIG
,具体代码可以在handle中看到1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22case types.MESS_POST_HOST_CONFIG:
host := &types.Host{}
err = host.Decode(data[1:])
if err != nil {
lg.Error(err.Error())
sess.Close()
return
}
if h := mydb.GetHost(host.ID); h == nil { //主机不存在
err := mydb.CreateHost(host.ID, host.SN, host.IP, host.Hostname, host.AgentVersion)
if err != nil {
lg.Error("create host error: %s", err.Error())
} else {
lg.Info("create host:%v", host)
}
} else if h.IP != host.IP ||
h.Hostname != host.Hostname ||
h.AgentVersion != host.AgentVersion ||
h.SN != host.SN {
lg.Info("update host: %v->%v", h, host)
mydb.UpdateHost(host)
}agent.GetPluginList() 获取插件
1
2
3
4
5
6
7
8
9
10//获取需要执行的插件列表
case types.MESS_GET_HOST_PLUGIN_LIST:
host := &types.Host{}
err = host.Decode(data[1:])
if err != nil {
return
}
sess.Send(types.Pack(types.MESS_GET_HOST_PLUGIN_LIST_RESP,
&types.GetPluginResp{HostID: host.ID, Plugins: mydb.GetPlugins(host.ID)}),
)agent.SendTSD2Repeater() 发送metrics信息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24//repeater 发送数据
case types.MESS_POST_TSD:
repeater.buffer <- data[1:]
//cfc 同步metric 到MySQL
case types.MESS_POST_METRIC:
cfg := types.MetricConfig{}
err = cfg.Decode(data[1:])
if err != nil {
lg.Error(err.Error())
sess.Close()
}
host := mydb.GetHost(cfg.HostID)
if host == nil {
return
}
// 这是去mysql里面查询metrics信息,如果不存在该metrics就创建
// 在“设备列表”中点击“metrics数”看见的metric名就是这儿保存的
if mydb.MetricIsExists(cfg.HostID, cfg.SeriesData.GetMetric()) {
return
}
if err = mydb.CreateMetric(cfg.HostID, cfg.SeriesData); err != nil {
lg.Error("create metric error %v", err.Error())
}agent.SendAgentAlive2Repeater() & agent.SendHostAlive2CFC() 发送心跳报告
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27func (this *Agent) SendAgentAlive2Repeater() {
go func() {
for {
this.SendChan <- types.TimeSeriesData{
Metric: "agent.alive",
DataType: "GAUGE",
Value: 1,
Cycle: 30,
Timestamp: time.Now().Unix(),
}
time.Sleep(time.Second * 30)
}
}()
}
func (this *Agent) SendHostAlive2CFC() {
go func() {
for {
if this.cfc.IsClosed() {
time.Sleep(time.Second * 10)
continue
}
this.cfc.Send(types.Pack(types.MESS_POST_HOST_ALIVE, NewHostConfig()))
time.Sleep(time.Minute * 1)
}
}()
}agent.RunBuiltinMetric() 具体metrics采集(之后的二次开发也是修改这些代码)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28// 看一个简单的LoadMetrics 主要实现就是组装TimeSeriesData数据
func LoadMetrics(cycle int, ch chan types.TimeSeriesData) {
for _, metric := range loadMetrics(cycle) {
if metric == nil {
continue
}
ch <- *metric
}
}
func loadMetrics(cycle int) []*types.TimeSeriesData {
cnt, err := load.Avg()
if err != nil {
return nil
}
ts := time.Now().Unix()
metrics := make([]*types.TimeSeriesData, 3)
metrics[0] = &types.TimeSeriesData{
Metric: "sys.load1",
Value: cnt.Load1,
Cycle: cycle,
Timestamp: ts,
DataType: "GAUGE",
}
// ...
return metrics
}
- 到这里我们agent的核心代码就看完了!
repeater核心代码
- main方法入口
1
go repeater.Forward()
cfc核心代码
- main方法入口
1
UpdatHostAive()
api核心代码
有必要看看api模块main里面的初始化
InitServer
我们可以直接调用api
api查询调用以及返回值,看一个query就够了
1
2
3
4query := authorized.Group("/query")
{
query.GET("", data)
}
剩下源码
- 看完前面的源码,总体思路我想应该是有了。剩下的源码,你们就自己去阅读吧,后面的也都很简单,因为go语言本身就有阅读简单的特点。
- 运行起来看看具体的日志,更加容易理解