package biz import ( "context" "os" "time" "github.com/cuigh/auxo/data" "github.com/cuigh/auxo/errors" "github.com/cuigh/auxo/log" "github.com/cuigh/auxo/net/web" "github.com/cuigh/swirl/dao" "github.com/jinzhu/copier" ) var builtins = []*dao.Chart{ dao.NewChart("service", "$cpu", "CPU", "${name}", `rate(container_cpu_user_seconds_total{container_label_com_docker_swarm_service_name="${service}"}[5m]) * 100`, "percent:100", 60), dao.NewChart("service", "$memory", "Memory", "${name}", `container_memory_usage_bytes{container_label_com_docker_swarm_service_name="${service}"}`, "size:bytes", 60), dao.NewChart("service", "$network_in", "Network Receive", "${name}", `sum(irate(container_network_receive_bytes_total{container_label_com_docker_swarm_service_name="${service}"}[5m])) by(name)`, "size:bytes", 60), dao.NewChart("service", "$network_out", "Network Send", "${name}", `sum(irate(container_network_transmit_bytes_total{container_label_com_docker_swarm_service_name="${service}"}[5m])) by(name)`, "size:bytes", 60), } type DashboardBiz interface { FetchData(ctx context.Context, key string, ids []string, period time.Duration) (data.Map, error) FindDashboard(ctx context.Context, name, key string) (dashboard *dao.Dashboard, err error) UpdateDashboard(ctx context.Context, dashboard *dao.Dashboard, user web.User) (err error) } func NewDashboard(d dao.Interface, mb MetricBiz, eb EventBiz) DashboardBiz { return &dashboardBiz{ d: d, mb: mb, eb: eb, } } type dashboardBiz struct { d dao.Interface cb ChartBiz mb MetricBiz eb EventBiz } func (b *dashboardBiz) FindDashboard(ctx context.Context, name, key string) (dashboard *dao.Dashboard, err error) { if dashboard, err = b.d.DashboardGet(ctx, name, key); err != nil { return } if dashboard == nil { dashboard = b.defaultDashboard(name, key) } err = b.fillCharts(ctx, dashboard) return } func (b *dashboardBiz) UpdateDashboard(ctx context.Context, dashboard *dao.Dashboard, user web.User) (err error) { dashboard.UpdatedAt = now() dashboard.UpdatedBy = newOperator(user) return b.d.DashboardUpdate(ctx, dashboard) } func (b *dashboardBiz) FetchData(ctx context.Context, key string, ids []string, period time.Duration) (data.Map, error) { if !b.mb.Enabled() { return data.Map{}, nil } charts, err := b.getCharts(ctx, ids) if err != nil { return nil, err } type Data struct { id string data interface{} err error } ch := make(chan Data, len(charts)) end := time.Now() start := end.Add(-period) for _, chart := range charts { go func(c *dao.Chart) { d := Data{id: c.ID} switch c.Type { case "line", "bar": d.data, d.err = b.fetchMatrixData(ctx, c, key, start, end) case "pie": d.data, d.err = b.fetchVectorData(ctx, c, key, end) case "gauge": d.data, d.err = b.fetchScalarData(ctx, c, key, end) default: d.err = errors.New("invalid chart type: " + c.Type) } ch <- d }(chart) } ds := data.Map{} for range charts { d := <-ch if d.err != nil { log.Get("metric").Error(d.err) } else { ds.Set(d.id, d.data) } } close(ch) return ds, nil } func (b *dashboardBiz) fetchMatrixData(ctx context.Context, chart *dao.Chart, key string, start, end time.Time) (md *MatrixData, err error) { var ( q string d *MatrixData ) for i, m := range chart.Metrics { q, err = b.formatQuery(m.Query, chart.Dashboard, key) if err != nil { return nil, err } if d, err = b.mb.GetMatrix(ctx, q, m.Legend, start, end); err != nil { log.Get("metric").Error(err) } else if i == 0 { md = d } else { md.Legend = append(md.Legend, d.Legend...) md.Series = append(md.Series, d.Series...) } } return md, nil } func (b *dashboardBiz) fetchVectorData(ctx context.Context, chart *dao.Chart, key string, end time.Time) (cvd *VectorData, err error) { var ( q string d *VectorData ) for i, m := range chart.Metrics { q, err = b.formatQuery(m.Query, chart.Dashboard, key) if err != nil { return nil, err } if d, err = b.mb.GetVector(ctx, q, m.Legend, end); err != nil { log.Get("metric").Error(err) } else if i == 0 { cvd = d } else { cvd.Legend = append(cvd.Legend, d.Legend...) cvd.Data = append(cvd.Data, d.Data...) } } return cvd, nil } func (b *dashboardBiz) fetchScalarData(ctx context.Context, chart *dao.Chart, key string, end time.Time) (*VectorValue, error) { query, err := b.formatQuery(chart.Metrics[0].Query, chart.Dashboard, key) if err != nil { return nil, err } v, err := b.mb.GetScalar(ctx, query, end) if err != nil { return nil, err } return &VectorValue{ //Name: "", Value: v, }, nil } func (b *dashboardBiz) formatQuery(query, dashboard, key string) (string, error) { if dashboard == "home" { return query, nil } var errs []error m := map[string]string{dashboard: key} q := os.Expand(query, func(k string) string { if v, ok := m[k]; ok { return v } errs = append(errs, errors.New("invalid argument in query: "+query)) return "" }) if len(errs) == 0 { return q, nil } return "", errs[0] } func (b *dashboardBiz) getCharts(ctx context.Context, ids []string) (charts map[string]*dao.Chart, err error) { var ( customIds []string customCharts []*dao.Chart ) charts = make(map[string]*dao.Chart) for _, id := range ids { if id[0] == '$' { for _, c := range builtins { if c.ID == id { charts[id] = c } } } else { customIds = append(customIds, id) } } if len(customIds) > 0 { if customCharts, err = b.d.ChartGetBatch(ctx, customIds...); err == nil { for _, chart := range customCharts { charts[chart.ID] = chart } } } return } func (b *dashboardBiz) fillCharts(ctx context.Context, d *dao.Dashboard) (err error) { if len(d.Charts) == 0 { return } var ( m map[string]*dao.Chart ids = make([]string, len(d.Charts)) ) for i, c := range d.Charts { ids[i] = c.ID } m, err = b.getCharts(ctx, ids) if err != nil { return err } for i := range d.Charts { if c := m[d.Charts[i].ID]; c != nil { _ = copier.CopyWithOption(&d.Charts[i], c, copier.Option{IgnoreEmpty: true}) } } return nil } func (b *dashboardBiz) defaultDashboard(name, key string) *dao.Dashboard { d := &dao.Dashboard{ Name: name, Key: key, Period: 30, Interval: 15, } if name == "service" { d.Charts = []dao.ChartInfo{ {ID: "$cpu"}, {ID: "$memory"}, {ID: "$network_in"}, {ID: "$network_out"}, } } return d }