|
| 1 | +package agent |
| 2 | + |
| 3 | +import ( |
| 4 | + "errors" |
| 5 | + "os" |
| 6 | + "path" |
| 7 | + "strings" |
| 8 | + "time" |
| 9 | + |
| 10 | + "github.com/intelsdi-x/snap-plugin-lib-go/v1/plugin" |
| 11 | + logging "github.com/op/go-logging" |
| 12 | +) |
| 13 | + |
| 14 | +var log = logging.MustGetLogger("processor") |
| 15 | +var logFormatter = logging.MustStringFormatter( |
| 16 | + ` %{level:.1s}%{time:0102 15:04:05.999999} %{pid} %{shortfile}] %{message}`, |
| 17 | +) |
| 18 | + |
| 19 | +type FileLog struct { |
| 20 | + Name string |
| 21 | + Logger *logging.Logger |
| 22 | + LogFile *os.File |
| 23 | +} |
| 24 | + |
| 25 | +type PreviousData struct { |
| 26 | + Data float64 |
| 27 | + Create time.Time |
| 28 | +} |
| 29 | + |
| 30 | +// Processor test processor |
| 31 | +type SnapProcessor struct { |
| 32 | + Cache map[string]PreviousData |
| 33 | +} |
| 34 | + |
| 35 | +// NewProcessor generate processor |
| 36 | +func NewProcessor() plugin.Processor { |
| 37 | + return &SnapProcessor{ |
| 38 | + Cache: make(map[string]PreviousData), |
| 39 | + } |
| 40 | +} |
| 41 | + |
| 42 | +func NewLogger(filesPath string, name string) (*FileLog, error) { |
| 43 | + logDirPath := path.Join(filesPath, "log") |
| 44 | + if _, err := os.Stat(logDirPath); os.IsNotExist(err) { |
| 45 | + os.Mkdir(logDirPath, 0777) |
| 46 | + } |
| 47 | + |
| 48 | + logFilePath := path.Join(logDirPath, name+".log") |
| 49 | + logFile, err := os.OpenFile(logFilePath, os.O_CREATE|os.O_RDWR, 0666) |
| 50 | + if err != nil { |
| 51 | + return nil, errors.New("Unable to create log file:" + err.Error()) |
| 52 | + } |
| 53 | + |
| 54 | + fileLog := logging.NewLogBackend(logFile, "["+name+"]", 0) |
| 55 | + fileLogLevel := logging.AddModuleLevel(fileLog) |
| 56 | + fileLogLevel.SetLevel(logging.INFO, "") |
| 57 | + fileLogBackend := logging.NewBackendFormatter(fileLog, logFormatter) |
| 58 | + |
| 59 | + log.SetBackend(logging.SetBackend(fileLogBackend)) |
| 60 | + |
| 61 | + return &FileLog{ |
| 62 | + Name: name, |
| 63 | + Logger: log, |
| 64 | + LogFile: logFile, |
| 65 | + }, nil |
| 66 | +} |
| 67 | + |
| 68 | +// Process test process function |
| 69 | +func (p *SnapProcessor) Process(mts []plugin.Metric, cfg plugin.Config) ([]plugin.Metric, error) { |
| 70 | + processLog, err := NewLogger("/tmp", "processor") |
| 71 | + if err != nil { |
| 72 | + return mts, errors.New("Error creating process logger: " + err.Error()) |
| 73 | + } |
| 74 | + defer processLog.LogFile.Close() |
| 75 | + |
| 76 | + log := processLog.Logger |
| 77 | + log.Infof("Process received metric size: %d", len(mts)) |
| 78 | + |
| 79 | + namespacesConfig, err := cfg.GetString("namespaces") |
| 80 | + if err != nil { |
| 81 | + return mts, errors.New("Unable to read namespaces config: " + err.Error()) |
| 82 | + } |
| 83 | + processNamespaces := strings.Split(namespacesConfig, ",") |
| 84 | + processNamespaces = append(processNamespaces, "") |
| 85 | + log.Infof("Process namespaces: %+v", processNamespaces) |
| 86 | + |
| 87 | + filterMetricKeywordsConfig, err := cfg.GetString("filterMetricKeywords") |
| 88 | + if err != nil { |
| 89 | + return mts, errors.New("Unable to read filterMetricKeywords config: " + err.Error()) |
| 90 | + } |
| 91 | + filterMetricKeywords := strings.Split(filterMetricKeywordsConfig, ",") |
| 92 | + log.Infof("Process filterMetricKeywords: %+v", filterMetricKeywords) |
| 93 | + |
| 94 | + metrics := []plugin.Metric{} |
| 95 | + for _, mt := range mts { |
| 96 | + podNamespace, _ := mt.Tags["io.kubernetes.pod.namespace"] |
| 97 | + if inArray(podNamespace, processNamespaces) { |
| 98 | + averageData := p.caluAverageData(mt, filterMetricKeywords, log) |
| 99 | + if averageData != -1 { |
| 100 | + mt.Data = averageData |
| 101 | + mt.Tags["processed"] = "true" |
| 102 | + } |
| 103 | + metrics = append(metrics, mt) |
| 104 | + } |
| 105 | + } |
| 106 | + |
| 107 | + log.Infof("Process filter metric size %d: ", len(metrics)) |
| 108 | + log.Infof("Process filter metric %+v: ", metrics) |
| 109 | + return metrics, nil |
| 110 | +} |
| 111 | + |
| 112 | +/* |
| 113 | + GetConfigPolicy() returns the configPolicy for your plugin. |
| 114 | +
|
| 115 | + A config policy is how users can provide configuration info to |
| 116 | + plugin. Here you define what sorts of config info your plugin |
| 117 | + needs and/or requires. |
| 118 | +*/ |
| 119 | +func (p *SnapProcessor) GetConfigPolicy() (plugin.ConfigPolicy, error) { |
| 120 | + policy := plugin.NewConfigPolicy() |
| 121 | + return *policy, nil |
| 122 | +} |
| 123 | + |
| 124 | +func (p *SnapProcessor) caluAverageData( |
| 125 | + mt plugin.Metric, |
| 126 | + filterMetricKeywords []string, |
| 127 | + log *logging.Logger) float64 { |
| 128 | + namespaces := mt.Namespace.Strings() |
| 129 | + mapKey := strings.Join(namespaces, "/") |
| 130 | + averageData := float64(-1) |
| 131 | + previousData, ok := p.Cache[mapKey] |
| 132 | + if ok { |
| 133 | + log.Infof("Find %s previous cache metric vaule: %+v", mapKey, previousData) |
| 134 | + diffSeconds := mt.Timestamp.Sub(previousData.Create).Seconds() |
| 135 | + diffValue := (convertInterface(mt.Data) - previousData.Data) |
| 136 | + if diffSeconds > 0 && diffValue > 0 { |
| 137 | + averageData = (convertInterface(mt.Data) - previousData.Data) / diffSeconds |
| 138 | + log.Infof("Calculate %s averageData(%f) on %s", mapKey, averageData, mt.Timestamp) |
| 139 | + } |
| 140 | + } |
| 141 | + |
| 142 | + // filter do not need counter metric |
| 143 | + caluMetric := namespaces[len(namespaces)-1] |
| 144 | + filterCache := false |
| 145 | + for _, metricKeyword := range filterMetricKeywords { |
| 146 | + if strings.Contains(caluMetric, metricKeyword) { |
| 147 | + filterCache = true |
| 148 | + break |
| 149 | + } |
| 150 | + } |
| 151 | + |
| 152 | + if !filterCache { |
| 153 | + p.Cache[mapKey] = PreviousData{ |
| 154 | + Data: convertInterface(mt.Data), |
| 155 | + Create: mt.Timestamp, |
| 156 | + } |
| 157 | + log.Infof("Cache this time metric vaule: %+v", p.Cache[mapKey]) |
| 158 | + } |
| 159 | + |
| 160 | + return averageData |
| 161 | +} |
| 162 | + |
| 163 | +func convertInterface(data interface{}) float64 { |
| 164 | + switch data.(type) { |
| 165 | + case int: |
| 166 | + return float64(data.(int)) |
| 167 | + case int8: |
| 168 | + return float64(data.(int8)) |
| 169 | + case int16: |
| 170 | + return float64(data.(int16)) |
| 171 | + case int32: |
| 172 | + return float64(data.(int32)) |
| 173 | + case int64: |
| 174 | + return float64(data.(int64)) |
| 175 | + case uint64: |
| 176 | + return float64(data.(uint64)) |
| 177 | + case float32: |
| 178 | + return float64(data.(float32)) |
| 179 | + case float64: |
| 180 | + return float64(data.(float64)) |
| 181 | + default: |
| 182 | + return float64(0) |
| 183 | + } |
| 184 | +} |
| 185 | + |
| 186 | +func inArray(a string, list []string) bool { |
| 187 | + for _, b := range list { |
| 188 | + if b == a { |
| 189 | + return true |
| 190 | + } |
| 191 | + } |
| 192 | + return false |
| 193 | +} |
0 commit comments