-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpushDatas.go
378 lines (313 loc) · 15.9 KB
/
pushDatas.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
//main uploadCadvisorData 程序会调用这里的函数,这里控制着获取并推送数据的主节奏
package main
import (
"encoding/json"
"fmt"
"strconv"
"strings"
"time"
)
func pushData() {
//AllDockerBrief 变量存放所有docker的基本信息JSON格式 , 原理 (echo -e "GET /containers/json HTTP/1.1\r\nHost: www.test.com\r\n")|nc -U //var/run/docker.sock
AllDockerBrief, err1 := getAllDockerBrief()
if err1 != nil {
LogErr(err1, "from pushDatas.go getAllDockerBrief. return")
return
}
var a []AllDockerBriefStruct
if err := json.Unmarshal([]byte(AllDockerBrief), &a); err != nil {
LogErr(err, "from pushDatas.go unmarshal AllDockerBrief. return ")
return
}
//循环读出所有的容器ID, 以容器ID为唯一标示去取所有内容。
for i := 0; i < len(a); i++ {
LogRun("=========>Begin [" + strconv.Itoa(i) + "/" + strconv.Itoa(len(a)) + "] ->" + a[i].ID + "->" + a[i].Names[0] + "=========>")
//-----------------------------------
//--- 通过容器Id获取inspect信息------ 原理:(echo -e "GET /containers/XXXXXXXXXX/json HTTP/1.1\r\nHost: www.test.com\r\n")|nc -U //var/run/docker.sock
DockerInspect, err := getDockerData(a[i].ID)
if err != nil {
LogErr(err, "from pushDatas.go getDockerData ..continue ")
continue
}
var b DockerInspectStruct
if err := json.Unmarshal([]byte(DockerInspect), &b); err != nil {
LogErr(err, "from pushDatas.go unmarshal DockerInspect ..continue")
continue
}
// inspect结构体中信息非常丰富,目前只取出了很少的信息,以后可以多加利用 ,例如 Env ,Labels等
endpoint := getEndPoint(b.Name) // 容器的名字也可以在AllDockerBrief结构体中获取
containerIP := b.NetworkSettings.IPAddress + "/" + strconv.Itoa(b.NetworkSettings.IPPrefixLen)
tag := "" //函数已经有了,目前没用上,对应的容器的label
LogRun("=>" + endpoint + "=>" + containerIP) // 可以通过inspect获取容器的IP
//----------------------------------------
//---通过容器ID, 获取容器对应的cadvisor的详细信息---
OneContainerCadvisorJSON, err2 := getOneContainerCadvisorData(a[i].ID)
if err2 != nil {
LogErr(err2, "from pushDatas.go getOneContainerCadvisorData .return ")
return
}
CutIndex := strings.IndexAny(OneContainerCadvisorJSON, ":")
if CutIndex == -1 {
LogErr(nil, "from pushDatas.go CutIndex,can not find maohao .return ")
return
}
// 由于原始的JSON串前边的key是不固定的字符,无法进行json解析,所以需要先截取成可以解析的结构体JSON
OneContainerCadvisorJSONAfterCut := OneContainerCadvisorJSON[CutIndex+1 : len(OneContainerCadvisorJSON)-1]
var c ContainerInfo // 使用官方cadvisor api v1中提供的api进行解析
if err := json.Unmarshal([]byte(OneContainerCadvisorJSONAfterCut), &c); err != nil {
LogErr(err, "from pushDatas.go unmarshal ContainerInfo ..return")
continue
}
//----------------------------------------
//--下边就可以任意取出cadvisor对容器的描述了---
//LogRun("=>" + "开始获取这个容器的cadvisor数据" + c.ContainerReference.Id)
timestamp := fmt.Sprintf("%d", time.Now().Unix()) //时间格式变化会上报失败,timestamp := time.Now().Format("2006-01-02 15:04:05")
//---内存上报---------
if err := pushMem(c, timestamp, tag, a[i].ID, endpoint); err != nil { //get cadvisor data about Memery
LogErr(err, "from pushDatas.go pushMem function ")
}
//-----cpu上报------------
if err := NewPushCPU(c, timestamp, tag, a[i].ID, endpoint); err != nil {
LogErr(err, "from pushDatas.go NewPushCPU function ")
}
//-----IO上报-------------
if err := pushDiskIo(c, timestamp, tag, a[i].ID, endpoint); err != nil {
LogErr(err, "from pushDatas.go pushDisoIo function")
}
//------网卡流量上报--------
if err := NewPushNetwork(c, timestamp, tag, a[i].ID, endpoint); err != nil {
LogErr(err, "from pushDatas.go NewPushNetwork function ")
}
}
// os.Exit(1)
}
// 本函数上报了四个数值,内存使用量,内存最大值,内存百分比, 如果需要上报的更多, 可以增加这个函数的参数。
func pushMem(_c ContainerInfo, timestamp, tags, containerID, endpoint string) error {
LogRun("begin to push Mem Info")
memLimit := _c.Spec.Memory.Limit //资源配额是个固定值,取一次即可
//-----求出memUsage平均值--------
var memUseage uint64
for _, u := range _c.Stats {
//LogRun(strconv.Itoa(i) + fmt.Sprint(u.Memory.Usage))
memUseage = memUseage + u.Memory.Usage
}
memUseage = (memUseage) / uint64(len(_c.Stats)) // 内存用量是个变动数值,把stats数组中的所有数值加起来,求平均,以防止某一个为空造成的取值不准。
//-----求出memcache平均值--------
var memCache uint64
for _, u := range _c.Stats {
memCache += u.Memory.Cache
}
memCache = memCache / uint64(len(_c.Stats))
//-----求出rss平均值--------
var memRss uint64
for _, u := range _c.Stats {
memRss += u.Memory.RSS
}
memRss = memRss / uint64(len(_c.Stats))
//-------求出working_set平均值---------------------
var memWorkingSet uint64
for _, u := range _c.Stats {
memWorkingSet += u.Memory.WorkingSet
}
memWorkingSet = memWorkingSet / uint64(len(_c.Stats))
//LogRun(fmt.Sprint(float64(memUseage)) + "---" + fmt.Sprint(float64(memLimit)))
// 上报内存使用量
if err := pushIt(fmt.Sprint(memUseage), timestamp, "mem.memused", tags, containerID, "GAUGE", endpoint); err != nil {
//LogErr(err, "pushIt err in pushMem")
return err
}
// 上报内存总量
if err := pushIt(fmt.Sprint(memLimit), timestamp, "mem.memtotal", tags, containerID, "GAUGE", endpoint); err != nil {
//LogErr(err, "pushIt err in pushMem")
return err
}
//上报总内存使用率
memUsagePrecent := float64(memUseage) / float64(memLimit) // 注意上报的这个数值已经乘以了100,即:上报的是百分数
if err := pushIt(fmt.Sprintf("%.3f", memUsagePrecent*100), timestamp, "mem.totalUsed.percent", tags, containerID, "GAUGE", endpoint); err != nil {
// LogErr(err, "pushIt err in pushMem")
return err
}
//(rss)memlimit
memRssUsagePrecent := float64(memRss) / float64(memLimit) // 注意上报的这个数值已经乘以了100,即:上报的是百分数
if err := pushIt(fmt.Sprintf("%.3f", memRssUsagePrecent*100), timestamp, "mem.Rss.percent", tags, containerID, "GAUGE", endpoint); err != nil {
// LogErr(err, "pushIt err in pushMem")
return err
}
//(cache)memlimit
memCacheUsagePrecent := float64(memCache) / float64(memLimit) // 注意上报的这个数值已经乘以了100,即:上报的是百分数
if err := pushIt(fmt.Sprintf("%.3f", memCacheUsagePrecent*100), timestamp, "mem.cache.percent", tags, containerID, "GAUGE", endpoint); err != nil {
// LogErr(err, "pushIt err in pushMem")
return err
}
//(WorkingSet)memlimit
memWorkingSetUsagePrecent := float64(memWorkingSet) / float64(memLimit) // 注意上报的这个数值已经乘以了100,即:上报的是百分数
if err := pushIt(fmt.Sprintf("%.3f", memWorkingSetUsagePrecent*100), timestamp, "mem.WorkingSet.percent", tags, containerID, "GAUGE", endpoint); err != nil {
// LogErr(err, "pushIt err in pushMem")
return err
}
return nil
}
// pushDiskIo 用于上报磁盘使用情况,
func pushDiskIo(_c ContainerInfo, timestamp, tags, containerID, endpoint string) error {
LogRun("begin to push DiskIo Info")
var writeUsage uint64
var readUsage uint64
var StatsCount uint64 //计数器存放, read 或者 write 值被累加的次数,作为求平均的分母;
for _, u := range _c.Stats {
StatsCount++
for _, j := range u.DiskIo.IoServiceBytes {
//这个数组的长度是4,不知道是否为固定的,不过无所谓, 全部进行和运算。
//LogRun(strconv.Itoa(i) + strconv.Itoa(k) + "--" + fmt.Sprint(j.Stats["Read"]) + "---" + fmt.Sprint(j.Stats["Write"]))
writeUsage += j.Stats["Write"]
readUsage += j.Stats["Read"]
}
}
writeUsage = writeUsage / StatsCount
readUsage = readUsage / StatsCount
if err := pushIt(fmt.Sprintf("%.2f", float64(writeUsage)/1048576), timestamp, "disk.io.write_MBps", tags, containerID, "GAUGE", endpoint); err != nil {
LogErr(err, "pushIt err in pushDiskIo")
}
if err := pushIt(fmt.Sprintf("%.2f", float64(readUsage)/1048576), timestamp, "disk.io.read_MBps", tags, containerID, "GAUGE", endpoint); err != nil {
LogErr(err, "pushIt err in pushDiskIo")
}
return nil
}
//NewPushCPU 函数用于上报stats结构体中cpu的信息,这些是全部的信息了。
//简单的做了一个除法,从ns->us->ms->s换算为s , 这个数值应该可以说明cpu的事情情况,以后有更优秀的算法再改进。
//20170419 修改NewPushCPU函数, 因为发现stats 中的cpu时间为增量值,需要计算出变化量。
func NewPushCPU(_c ContainerInfo, timestamp, tags, containerID, endpoint string) error {
LogRun("begin to push CPU Info")
//---计算cpuLoadAverage-------
var cpuLoadAverage int32
for _, u := range _c.Stats {
//LogRun(strconv.Itoa(i) + "--" + fmt.Sprint(u.Cpu.LoadAverage) + "---" + fmt.Sprint(u.Cpu.Usage.Total))
cpuLoadAverage += u.Cpu.LoadAverage
}
cpuLoadAverage = cpuLoadAverage / int32(len(_c.Stats))
//---计算cpuUsageTotal -----
var cpuUsageTotal uint64
LogRun("NeWPushCPU len([]Stats) should be >= 2 , now is " + strconv.Itoa(len(_c.Stats)))
if len(_c.Stats) < 2 {
if err := pushIt("0", timestamp, "cpu.usageTotalPercent", tags, containerID, "GAUGE", endpoint); err != nil {
return err
}
return nil
}
// 阶梯运算,取出所有的差值
for i := 0; i < len(_c.Stats)-1; i++ {
//LogRun(strconv.Itoa(i) + "--" + fmt.Sprint(_c.Stats[i+1].Cpu.Usage.Total-_c.Stats[i].Cpu.Usage.Total))
cpuUsageTotal += _c.Stats[i+1].Cpu.Usage.Total - _c.Stats[i].Cpu.Usage.Total
}
//求差值的平均值
cpuUsageTotal = cpuUsageTotal / uint64(len(_c.Stats)-1)
// 上报cpuload平均值, 这个值测试阶段一直为0,不知道线上知否能用到
if err := pushIt(fmt.Sprintf("%.2f", float64(cpuLoadAverage)), timestamp, "cpu.loadaverage", tags, containerID, "GAUGE", endpoint); err != nil {
return err
}
if err := pushIt(fmt.Sprint(float64(cpuUsageTotal)/1000000000), timestamp, "cpu.usageTotalSec", tags, containerID, "GAUGE", endpoint); err != nil {
return err
}
return nil
}
// func NewPushCPU(_c ContainerInfo, timestamp, tags, containerID, endpoint string) error {
// LogRun("begin to push CPU Info")
//
// // 以下三个参数是可以提供上报的,暂时隐藏掉。
// // cpuUsageUser := _c.Stats[STATSINDEX].Cpu.Usage.User 用户空间占用的cpu量
// // cpuUsageSystem := _c.Stats[STATSINDEX].Cpu.Usage.System 内核空间占用的cpu值
// // cpuUsagePerCPUUsage := _c.Stats[STATSINDEX].Cpu.Usage.PerCpu // 每个cpu核心占用的cpu量值
//
// var cpuLoadAverage int32
// var cpuUsageTotal uint64
// for _, u := range _c.Stats {
//
// //LogRun(strconv.Itoa(i) + "--" + fmt.Sprint(u.Cpu.LoadAverage) + "---" + fmt.Sprint(u.Cpu.Usage.Total))
// cpuLoadAverage += u.Cpu.LoadAverage
// cpuUsageTotal += u.Cpu.Usage.Total
//
// }
// cpuLoadAverage = cpuLoadAverage / int32(len(_c.Stats))
// cpuUsageTotal = cpuUsageTotal / uint64(len(_c.Stats))
//
// // 上报cpuload平均值, 这个值测试阶段一直为0,不知道线上知否能用到
// if err := pushIt(fmt.Sprintf("%.2f", float64(cpuLoadAverage)), timestamp, "cpu.loadaverage", tags, containerID, "GAUGE", endpoint); err != nil {
// return err
// }
// //上报cpu的总使用量,包括用户空间+内核 1000,000,000,000,000 * 100% 这个数值和docker stats 观察到的cpu百分比很像, 估且就当作百分比吧,如果不是至少可以从这个数值看出cpu的负载情况。
// //即(cpu耗时 / 百万分之一秒) *100% 和cpu使用率很像。
// if err := pushIt(fmt.Sprintf("%.5f", float64(cpuUsageTotal)/10000000000000), timestamp, "cpu.usageTotalPercent", tags, containerID, "GAUGE", endpoint); err != nil {
// return err
// }
// // if err := pushIt(fmt.Sprintf("%.2f", float64(cpuUsageUser)/1000000000), timestamp, "cpu.UsageUser", tags, containerID, "GAUGE", endpoint); err != nil {
// // return err
// // }
// // if err := pushIt(fmt.Sprintf("%.2f", float64(cpuUsageSystem)/1000000000), timestamp, "cpu.UsageSystem", tags, containerID, "GAUGE", endpoint); err != nil {
// // return err
// // }
// // for i, u := range cpuUsagePerCPUUsage {
// //
// // if err := pushIt(fmt.Sprintf("%.2f", float64(u)/1000000000), timestamp, "cpu.perCpuUsage-"+strconv.Itoa(i), tags, containerID, "GAUGE", endpoint); err != nil {
// // return err
// // }
// //
// // }
// return nil
// }
//NewPushNetwork 函数作用为容器上报网卡流量
// 由于采集的是网卡的累计值,每次采集的stats数组数量又不确定,所以书写算法: stats_rxbytes[i+1] - stats_rxbytes[i] , 依次可以计算出多个增量值, 返回增量值的平均数。
// 流量的单位是每秒, 丢包是以每秒平均丢包个数计算的
func NewPushNetwork(_c ContainerInfo, timestamp, tags, containerID, endpoint string) error {
LogRun("begin to push NetWork Info")
var rxBytes uint64
var rxDropped uint64
var txBytes uint64
var txDropped uint64
LogRun("NeWPushNetwork len([]Stats) should be >= 2 , now is " + strconv.Itoa(len(_c.Stats)))
// 这种情况说明Stats数组太短,无法看出趋势
if len(_c.Stats) < 2 {
//push 所有数值为0 ,防止图形断点
if err := pushIt("0", timestamp, "net.rx.KBps", tags, containerID, "GAUGE", endpoint); err != nil {
return err
}
if err := pushIt("0", timestamp, "net.tx.KBps", tags, containerID, "GAUGE", endpoint); err != nil {
return err
}
if err := pushIt("0", timestamp, "net.rxDrop.packets", tags, containerID, "GAUGE", endpoint); err != nil {
return err
}
if err := pushIt("0", timestamp, "net.txDrop.packets", tags, containerID, "GAUGE", endpoint); err != nil {
return err
}
return nil
}
// 控制循环的次数 = 数组长度-1
for i := 0; i < len(_c.Stats)-1; i++ {
// 调试错误的时候可以放开
//LogRun(strconv.Itoa(i) + "--" + fmt.Sprint(_c.Stats[i+1].Network.RxBytes-_c.Stats[i].Network.RxBytes) + "---" + fmt.Sprint(_c.Stats[i+1].Network.TxBytes-_c.Stats[i].Network.TxBytes))
//LogRun(strconv.Itoa(i) + "--" + fmt.Sprint(_c.Stats[i+1].Network.RxDropped-_c.Stats[i].Network.RxDropped) + "---" + fmt.Sprint(_c.Stats[i+1].Network.TxDropped-_c.Stats[i].Network.TxDropped))
rxBytes += _c.Stats[i+1].Network.RxBytes - _c.Stats[i].Network.RxBytes
txBytes += _c.Stats[i+1].Network.TxBytes - _c.Stats[i].Network.TxBytes
rxDropped += _c.Stats[i+1].Network.RxDropped - _c.Stats[i].Network.RxDropped
txDropped += _c.Stats[i+1].Network.TxDropped - _c.Stats[i].Network.TxDropped
}
rxBytes = rxBytes / uint64(len(_c.Stats)-1) //本次统计周期的tx增量平均值
txBytes = txBytes / uint64(len(_c.Stats)-1) //本次统计周期的tx增量平均值
rxDropped = rxDropped / uint64(len(_c.Stats)-1)
txDropped = txDropped / uint64(len(_c.Stats)-1)
LogRun("average_rxtx_KBps:" + fmt.Sprintf("%.2f", float64(rxBytes)/1000) + "<-->" + fmt.Sprintf("%.2f", float64(txBytes)/1000)) //Stats[]中的数据都是一秒一更新,所以单位为KBps
LogRun("average_rxtxDrop_Package:" + fmt.Sprint(rxDropped) + "<-->" + fmt.Sprint(txDropped))
// 所需数据在上边都已经计算完毕,以下为推送数据的代码
if err := pushIt(fmt.Sprintf("%.2f", float64(rxBytes)/1000), timestamp, "net.rx.KBps", tags, containerID, "GAUGE", endpoint); err != nil {
return err
}
if err := pushIt(fmt.Sprintf("%.2f", float64(txBytes)/1000), timestamp, "net.tx.KBps", tags, containerID, "GAUGE", endpoint); err != nil {
return err
}
if err := pushIt(fmt.Sprint(rxDropped), timestamp, "net.rxDrop.packets", tags, containerID, "GAUGE", endpoint); err != nil {
return err
}
if err := pushIt(fmt.Sprint(txDropped), timestamp, "net.txDrop.packets", tags, containerID, "GAUGE", endpoint); err != nil {
return err
}
return nil
}