OWL--监控系统实战五二次开发
二次开发? 当然就是修改源代码了。
为什么进行二次开发? 当然就是有新需求了。
- 需求的来由
因为,我们做的是一个多租户的平台,需要实现用户资源的隔离。
在资源隔离这一块,选择了对yarn的改造,我们知道在yarn的这一块已经对cpu和内存进行了隔离,但是还没有网络IO和磁盘IO资源的隔离。
然后我们就想着在监控系统采集这一层,去采集到用户维度的磁盘IO和网络IO。
所以,采集用户维度的资源(网络,磁盘,内存,cpu),不仅仅有利于我们下一步对yarn的二次开发,而且也有益于我们更细维度的监控。
监控系统实战六二次开发
思路
能不能直接获?
在linux里面有没有地方直接获取用户维度的信息(网络,磁盘,内存,cpu)?并没有!top
行吗?来看一看,发现并没有网络和磁盘相关的数据iostat
呢?发现也没有用户,直接能获取的这条路是走不通了。
想一下其他的途径
我们知道,在linux系统中我们这些东西(看到的所有东西)都是以文件的形式体现的,那在文件里是否有我们想要的数据呢?ls /proc
看一下,哈哈,感觉快找到我们想要的了。
东西在这儿,怎么拿走?
在/proc
下面有好多东西,现在就去分析一下拿我们想要的就行了。
暂时别急,发现只有总的信息,还有一堆数字(进程号),暂时还没法区分用户!(直接拿不到想要的)
通过进程获取
直接拿不到用户的资源信息,那就只能通过进程然后汇总间接获取了!ls /proc/1
看一下进程号为“1”的进程下面的信息,果然还是找到想要的了
用户和进程关系的映射
在进程下面算是能获取到资源了(网络,磁盘,内存,cpu),现在要做的就是让它与用户关联。cat /proc/1/status
,在status找到了用户相关的信息,Uid,Gid
优化用户和进程的映射
通过上面的分析,现在直观的思路就是:
- 在
/proc
下找到所有num(进程号) - 然后通过进程下面的status文件找到用户/组ID
- 把用户ID,拿到
/etc/passwd
去获取用户名称 - 组ID,去
/etc/group
去获取组名称 - 然后再去,做用户/组合进程的映射
- 在
这种方法能实现,但是过于复杂,性能差
我们换一种思路,先去获取用户,然后再通过用户去找进程,这种办法是不是更好呢?
- 最开始想通过
/run/user
来获取用户,发现并不全,最后通过ps
获取用户 ps -o ruser=userForLongName -e|sort| uniq
ps 排序去重就获取到了所有用户- 然后通过
ps -u userName
获取用户进程(这样关系就很清晰了)
- 最开始想通过
总结一下
- 通过
ps命令
组装用户和进程的关系 - 遍历映射关系,去
/proc/<pid>/
取数据 - 保存数据
- 通过
具体代码实现
准备开始coding
- 在之前的博客,我们已近分析了owl的源码,应该是有个整体的了解,以及我们需要修改哪儿的代码也应该是有一个比较清晰的思路了,我们再来看看入口:
获取用户与进程映射
定义一个映射结构体
1
2
3
4
5
6
7
8// 用户属性
type UserAttr struct {
Uid string
UName string
Gid string
GName string
Pid []string
}获取所有正在运行的用户
1
2
3
4
5//获取所有正在运行的用户
getUserCommand, err := ExecCommand("/bin/sh", "-c", "ps -o ruser=userForLongName -e |sort| uniq")
if err != nil {
return nil
}通过用户名,获取用户信息(uid,gid,uName,gName)
1
2
3
4
5//通过用户名,获取用户信息(uid,gid,uName,gName)
idCommand, err := ExecCommand("id", scannerUser.Text())
if err != nil {
continue
}通过用户名获取用户所有进程ID
1
2
3
4
5//用户所有进程ID
psCommand, err := ExecCommand("/bin/sh", "-c", "ps -u userName | awk '{if(NR>1){print $1}}'")
if err != nil {
continue
}获取用户信息以及用户所运行的进程关系,上面的代码整体后如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47// 获取用户信息以及用户所运行的进程
func getUserAttr() []UserAttr{
//获取所有正在运行的用户
getUserCommand, err := ExecCommand("/bin/sh", "-c", "ps -o ruser=userForLongName -e |sort| uniq")
// ......
userAttrs := make([]UserAttr, 0)
scannerUser := bufio.NewScanner(bytes.NewReader(getUserCommand))
for scannerUser.Scan() {
if scannerUser.Text() != "userForLongName" {
//通过用户名,获取用户信息(uid,gid,uName,gName)
idCommand, err := ExecCommand("id", scannerUser.Text())
// ......
scannerId := bufio.NewScanner(bytes.NewReader(idCommand))
for scannerId.Scan() {
//用户&&用户组信息
f1 := strings.Split(scannerId.Text(), " ")
user := strings.Split(f1[0], "=")[1] //用户信息
uidAndName := strings.Split(user, "(")
group := strings.Split(f1[1], "=")[1] //用户组信息
gidAndName := strings.Split(group, "(")
//用户所有进程ID
psCommand, err := ExecCommand("/bin/sh", "-c", "ps -u "+scannerUser.Text()+" | awk '{if(NR>1){print $1}}'")
// ......
scannerPs := bufio.NewScanner(bytes.NewReader(psCommand))
pid := []string{}
for scannerPs.Scan() {
pid = append(pid, scannerPs.Text())
}
userAttr := UserAttr{
Uid: uidAndName[0],
UName: scannerUser.Text(),
Gid: gidAndName[0],
GName: strings.TrimSuffix(gidAndName[1], ")"),
Pid: pid,
}
userAttrs = append(userAttrs, userAttr)
}
}
}
// ......
}
遍历进程号组装数据
组装好用户和进程的关系后,下面就是遍历关系,获取进程下面的(网络,磁盘,内存,cpu)
这里就不一一举例收集上面四个了,以网络为例来说明
网络数据在那个文件?
这个得确认好,笔者最开始是收集/proc/<pid>/net/snmp
下的tcp和udp等网络数据最后写完了发现不对!!
最后重写,收集/proc/<pid>/net/dev
下的网卡流量信息,cat /proc/1/net/dev
来看一看结构(网卡|接收|传输):定义数据结构
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15type NETPIDCountersStat struct {
RBytes uint64 // 表示收到的字节
RPackets uint64 // 表示收到正确的包量
RErrs uint64 // 表示收到错误的包量
RDrop uint64 // 表示收到丢弃的包量
TBytes uint64 // 表示发送的字节
TPackets uint64 // 表示发送正确的包量
TErrs uint64 // 表示发送错误的包量
TDrop uint64 // 表示发送丢弃的包量
UName string
GName string
NetCar string
}解析dev文件组装数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41func NETPIDCounters(netCarName string) (map[string]NETPIDCountersStat, error) {
// ......
for _, pid := range u.Pid {
data, err := ioutil.ReadFile("/proc/" + pid + "/net/dev")
if err != nil {
//打印异常但不停止程序 可能有些进程在获取到pid后获取信息时已经停止了
logs.Error(err.Error())
continue
}
scanner := bufio.NewScanner(bytes.NewReader(data))
d := NETPIDCountersStat{}
// 获取pid的具体io值
for scanner.Scan() {
f := strings.Split(scanner.Text(), ": ")
if( len(f) == 2 && strings.TrimSpace(f[0]) == netName) {
if( len(f) == 2 && strings.TrimSpace(f[0]) == v){
fields := strings.Fields(f[1]) //网络流量 数据
rBytes, err := strconv.ParseUint((fields[0]), 10, 64)
if err != nil {
return ret, err
}
d.RBytes = rBytes
//......
if d == empty {
continue
}
d.UName = name
d.GName = gName
d.NetCar = v
ret[v+pid] = d
}
}
}
}
// ......
}解析结构体数据,组装成
types.TimeSeriesData
数据1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27// 用户级别的网络流量
func netUserMetrics(netCarName string) []*types.TimeSeriesData {
cnt, err := netstat.NETPIDCounters(netCarName)
if err != nil {
return nil
}
ts := time.Now().Unix()
metrics := []*types.TimeSeriesData{}
for k, v := range cnt {
if NameNotAvalid(v.NetCar) {
continue
}
metrics = append(metrics,
&types.TimeSeriesData{
Metric: "net.user.rbytes",
Value: float64(v.RBytes),
Cycle: Cycle,
Timestamp: ts,
DataType: "COUNTER",
Tags: map[string]string{"user": v.UName, "group": v.GName, "iface": k},
},
// ......
}
// ......
}
遇到的问题
- 除了上面提到的最开始解析错文件,还遇到了一些问题
ps -aux |awk '{if(NR>1){print $1}}' |sort| uniq
想查找用户信息,结果显示不全
解决办法:http://www.linuxdiyf.com/linux/29239.html统计用户所有进程网络io后,然后想加和保存一个汇总的结果到数据库(当然很好的是数据量会小),结果引发了一系列问题
- 这样汇总统计后,发现root用户的流量统计出来太吓人了
- 直觉可能是自己单位换算有问题,或者是我之前的思路有问题?看了源码然后问了下作者发现没啥问题,单位都正常bytes。
- 于是就把每一个进程的详细记录值打印出来,查看了一下没有问题,差值不明显,看不出来结果!
- 然后又统计了和上一秒的差值(上一秒root用户总值 -减去 当前root用户总值),卧槽,怎么还有负数!后来上面的文本往下看(每次进程数不一样,左边为168行即168个进程,右边170行即170个进程,绿色的30706那个进程左边就没有!)!!
- 最后想着求一下均值,也是没法解决这个问题,果断放弃了汇总后保存结果,只能分别保存每一个pid的信息了
- 这样汇总统计后,发现root用户的流量统计出来太吓人了
Golang的类型也是一个坑,注意一下类型转换还有类似于这种(uint|int)的区别。
- 线上环境网卡的问题,因为线上网络的吞吐量非常大通常使用网卡bond把多个物理网卡绑定为一个逻辑网卡,需要确定统计那个网卡
优化
模拟单例模式做了一个25s的缓存
- 因为我们每一次都需要去调用获取用户与进程的关系,它们(网络,磁盘,内存,cpu)的数据组装是分开写的,所以每个Cycle(60s)时间内需要调用4次
- 然后自己就把获取进程信息那块代码做了一个缓存
1
2
3
4
5
6
7
8
9
10
11
12
13// 模拟单例模式做了一个25s的缓存
var u []UserAttr
var currTime int64
var lock *sync.Mutex = &sync.Mutex{}
func GetUserPidInfo() []UserAttr {
if u == nil || time.Now().Unix() - currTime > 25 {
lock.Lock()
defer lock.Unlock()
u = getUserAttr()
currTime = time.Now().Unix()
}
return u
}
多网卡灵活配置
- 通过配置,传入需要统计的网卡名称(多个网卡以逗号分隔),如果不传会统计指定的默认网卡
- 通过配置,传入需要统计的网卡名称(多个网卡以逗号分隔),如果不传会统计指定的默认网卡