OWL--监控系统实战五二次开发

  二次开发? 当然就是修改源代码了。
  为什么进行二次开发? 当然就是有新需求了。

  • 需求的来由

    因为,我们做的是一个多租户的平台,需要实现用户资源的隔离。
    在资源隔离这一块,选择了对yarn的改造,我们知道在yarn的这一块已经对cpu和内存进行了隔离,但是还没有网络IO和磁盘IO资源的隔离。
    然后我们就想着在监控系统采集这一层,去采集到用户维度的磁盘IO和网络IO。
    所以,采集用户维度的资源(网络,磁盘,内存,cpu),不仅仅有利于我们下一步对yarn的二次开发,而且也有益于我们更细维度的监控。

监控系统实战六二次开发

思路

能不能直接获?

在linux里面有没有地方直接获取用户维度的信息(网络,磁盘,内存,cpu)?并没有!
top行吗?来看一看,发现并没有网络和磁盘相关的数据
top
iostat呢?发现也没有用户,直接能获取的这条路是走不通了。
iostat

想一下其他的途径

我们知道,在linux系统中我们这些东西(看到的所有东西)都是以文件的形式体现的,那在文件里是否有我们想要的数据呢?
ls /proc 看一下,哈哈,感觉快找到我们想要的了。
ls /proc

东西在这儿,怎么拿走?

/proc下面有好多东西,现在就去分析一下拿我们想要的就行了。
暂时别急,发现只有总的信息,还有一堆数字(进程号),暂时还没法区分用户!(直接拿不到想要的)

通过进程获取

直接拿不到用户的资源信息,那就只能通过进程然后汇总间接获取了!
ls /proc/1看一下进程号为“1”的进程下面的信息,果然还是找到想要的了
ls /proc/1

用户和进程关系的映射

在进程下面算是能获取到资源了(网络,磁盘,内存,cpu),现在要做的就是让它与用户关联。
cat /proc/1/status,在status找到了用户相关的信息,Uid,Gid
/proc/1/status

优化用户和进程的映射

  1. 通过上面的分析,现在直观的思路就是

    • /proc下找到所有num(进程号)
    • 然后通过进程下面的status文件找到用户/组ID
    • 把用户ID,拿到/etc/passwd去获取用户名称
    • 组ID,去/etc/group去获取组名称
    • 然后再去,做用户/组合进程的映射
  2. 这种方法能实现,但是过于复杂性能差

  3. 我们换一种思路,先去获取用户,然后再通过用户去找进程,这种办法是不是更好呢?

    • 最开始想通过/run/user来获取用户,发现并不全,最后通过ps获取用户
      ps -aux
    • ps -o ruser=userForLongName -e|sort| uniq ps 排序去重就获取到了所有用户
      ps -o ruser=userForLongName -e |sort| uniq
    • 然后通过ps -u userName 获取用户进程(这样关系就很清晰了)
      ps -u hadoop | awk '{if(NR>1){print $1}}'
  4. 总结一下

    • 通过ps命令组装用户和进程的关系
    • 遍历映射关系,去/proc/<pid>/取数据
    • 保存数据

具体代码实现

准备开始coding

  • 在之前的博客,我们已近分析了owl的源码,应该是有个整体的了解,以及我们需要修改哪儿的代码也应该是有一个比较清晰的思路了,我们再来看看入口:
    owl二次开发程序入口

获取用户与进程映射

  1. 定义一个映射结构体

    1
    2
    3
    4
    5
    6
    7
    8
    // 用户属性
    type UserAttr struct {
    Uid string
    UName string
    Gid string
    GName string
    Pid []string
    }
  2. 获取所有正在运行的用户

    1
    2
    3
    4
    5
    //获取所有正在运行的用户
    getUserCommand, err := ExecCommand("/bin/sh", "-c", "ps -o ruser=userForLongName -e |sort| uniq")
    if err != nil {
    return nil
    }
  3. 通过用户名,获取用户信息(uid,gid,uName,gName)

    1
    2
    3
    4
    5
    //通过用户名,获取用户信息(uid,gid,uName,gName)
    idCommand, err := ExecCommand("id", scannerUser.Text())
    if err != nil {
    continue
    }
  4. 通过用户名获取用户所有进程ID

    1
    2
    3
    4
    5
    //用户所有进程ID
    psCommand, err := ExecCommand("/bin/sh", "-c", "ps -u userName | awk '{if(NR>1){print $1}}'")
    if err != nil {
    continue
    }
  5. 获取用户信息以及用户所运行的进程关系,上面的代码整体后如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    // 获取用户信息以及用户所运行的进程
    func getUserAttr() []UserAttr{
    //获取所有正在运行的用户
    getUserCommand, err := ExecCommand("/bin/sh", "-c", "ps -o ruser=userForLongName -e |sort| uniq")
    // ......

    userAttrs := make([]UserAttr, 0)
    scannerUser := bufio.NewScanner(bytes.NewReader(getUserCommand))
    for scannerUser.Scan() {
    if scannerUser.Text() != "userForLongName" {
    //通过用户名,获取用户信息(uid,gid,uName,gName)
    idCommand, err := ExecCommand("id", scannerUser.Text())
    // ......
    scannerId := bufio.NewScanner(bytes.NewReader(idCommand))
    for scannerId.Scan() {
    //用户&&用户组信息
    f1 := strings.Split(scannerId.Text(), " ")
    user := strings.Split(f1[0], "=")[1] //用户信息
    uidAndName := strings.Split(user, "(")

    group := strings.Split(f1[1], "=")[1] //用户组信息
    gidAndName := strings.Split(group, "(")

    //用户所有进程ID
    psCommand, err := ExecCommand("/bin/sh", "-c", "ps -u "+scannerUser.Text()+" | awk '{if(NR>1){print $1}}'")
    // ......
    scannerPs := bufio.NewScanner(bytes.NewReader(psCommand))
    pid := []string{}

    for scannerPs.Scan() {
    pid = append(pid, scannerPs.Text())
    }

    userAttr := UserAttr{
    Uid: uidAndName[0],
    UName: scannerUser.Text(),
    Gid: gidAndName[0],
    GName: strings.TrimSuffix(gidAndName[1], ")"),
    Pid: pid,
    }
    userAttrs = append(userAttrs, userAttr)
    }
    }

    }
    // ......
    }

遍历进程号组装数据

组装好用户和进程的关系后,下面就是遍历关系,获取进程下面的(网络,磁盘,内存,cpu)
这里就不一一举例收集上面四个了,以网络为例来说明

  1. 网络数据在那个文件?
    这个得确认好,笔者最开始是收集/proc/<pid>/net/snmp下的tcp和udp等网络数据最后写完了发现不对!!
    最后重写,收集/proc/<pid>/net/dev下的网卡流量信息,cat /proc/1/net/dev来看一看结构(网卡|接收|传输):
    /proc/1/net/dev

  2. 定义数据结构

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    type NETPIDCountersStat struct {
    RBytes uint64 // 表示收到的字节
    RPackets uint64 // 表示收到正确的包量
    RErrs uint64 // 表示收到错误的包量
    RDrop uint64 // 表示收到丢弃的包量

    TBytes uint64 // 表示发送的字节
    TPackets uint64 // 表示发送正确的包量
    TErrs uint64 // 表示发送错误的包量
    TDrop uint64 // 表示发送丢弃的包量

    UName string
    GName string
    NetCar string
    }
  3. 解析dev文件组装数据

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    func NETPIDCounters(netCarName string) (map[string]NETPIDCountersStat, error) {
    // ......
    for _, pid := range u.Pid {
    data, err := ioutil.ReadFile("/proc/" + pid + "/net/dev")
    if err != nil {
    //打印异常但不停止程序 可能有些进程在获取到pid后获取信息时已经停止了
    logs.Error(err.Error())
    continue
    }

    scanner := bufio.NewScanner(bytes.NewReader(data))
    d := NETPIDCountersStat{}
    // 获取pid的具体io值
    for scanner.Scan() {
    f := strings.Split(scanner.Text(), ": ")
    if( len(f) == 2 && strings.TrimSpace(f[0]) == netName) {
    if( len(f) == 2 && strings.TrimSpace(f[0]) == v){
    fields := strings.Fields(f[1]) //网络流量 数据
    rBytes, err := strconv.ParseUint((fields[0]), 10, 64)
    if err != nil {
    return ret, err
    }
    d.RBytes = rBytes

    //......

    if d == empty {
    continue
    }
    d.UName = name
    d.GName = gName
    d.NetCar = v
    ret[v+pid] = d

    }
    }
    }

    }
    // ......
    }
  4. 解析结构体数据,组装成types.TimeSeriesData数据

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    // 用户级别的网络流量
    func netUserMetrics(netCarName string) []*types.TimeSeriesData {
    cnt, err := netstat.NETPIDCounters(netCarName)
    if err != nil {
    return nil
    }
    ts := time.Now().Unix()
    metrics := []*types.TimeSeriesData{}

    for k, v := range cnt {
    if NameNotAvalid(v.NetCar) {
    continue
    }
    metrics = append(metrics,
    &types.TimeSeriesData{
    Metric: "net.user.rbytes",
    Value: float64(v.RBytes),
    Cycle: Cycle,
    Timestamp: ts,
    DataType: "COUNTER",
    Tags: map[string]string{"user": v.UName, "group": v.GName, "iface": k},
    },

    // ......
    }
    // ......
    }

遇到的问题

  • 除了上面提到的最开始解析错文件,还遇到了一些问题
  1. ps -aux |awk '{if(NR>1){print $1}}' |sort| uniq 想查找用户信息,结果显示不全
    ps -aux |awk '{if(NR>1){print $1}}' |sort| uniq
    解决办法:http://www.linuxdiyf.com/linux/29239.html

  2. 统计用户所有进程网络io后,然后想加和保存一个汇总的结果到数据库(当然很好的是数据量会小),结果引发了一系列问题

    • 这样汇总统计后,发现root用户的流量统计出来太吓人了
      root用户的流量
    • 直觉可能是自己单位换算有问题,或者是我之前的思路有问题?看了源码然后问了下作者发现没啥问题,单位都正常bytes。
    • 于是就把每一个进程的详细记录值打印出来,查看了一下没有问题,差值不明显,看不出来结果!
      log
    • 然后又统计了和上一秒的差值(上一秒root用户总值 -减去 当前root用户总值),卧槽,怎么还有负数!后来上面的文本往下看(每次进程数不一样,左边为168行即168个进程,右边170行即170个进程,绿色的30706那个进程左边就没有!)!!
      log
    • 最后想着求一下均值,也是没法解决这个问题,果断放弃了汇总后保存结果,只能分别保存每一个pid的信息了
  3. Golang的类型也是一个坑,注意一下类型转换还有类似于这种(uint|int)的区别。

  4. 线上环境网卡的问题,因为线上网络的吞吐量非常大通常使用网卡bond把多个物理网卡绑定为一个逻辑网卡,需要确定统计那个网卡
    网卡bond

优化

  1. 模拟单例模式做了一个25s的缓存

    • 因为我们每一次都需要去调用获取用户与进程的关系,它们(网络,磁盘,内存,cpu)的数据组装是分开写的,所以每个Cycle(60s)时间内需要调用4次
    • 然后自己就把获取进程信息那块代码做了一个缓存
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      // 模拟单例模式做了一个25s的缓存
      var u []UserAttr
      var currTime int64
      var lock *sync.Mutex = &sync.Mutex{}
      func GetUserPidInfo() []UserAttr {
      if u == nil || time.Now().Unix() - currTime > 25 {
      lock.Lock()
      defer lock.Unlock()
      u = getUserAttr()
      currTime = time.Now().Unix()
      }
      return u
      }
  2. 多网卡灵活配置

    • 通过配置,传入需要统计的网卡名称(多个网卡以逗号分隔),如果不传会统计指定的默认网卡
      多网卡支持

效果

网络
磁盘

当前网速较慢或者你使用的浏览器不支持博客特定功能,请尝试刷新或换用Chrome、Firefox等现代浏览器