From 553c666723ef345a9b36e3315216ea9380aaee17 Mon Sep 17 00:00:00 2001 From: cuigh Date: Tue, 13 Mar 2018 18:48:16 +0800 Subject: [PATCH] Add ability to scale services automatically --- README.md | 12 ++- biz/docker/service.go | 18 ++++- biz/metric.go | 33 ++++++++ controller/service.go | 2 +- main.go | 24 +++--- scaler/scaler.go | 183 ++++++++++++++++++++++++++++++++++++++++++ 6 files changed, 259 insertions(+), 13 deletions(-) create mode 100644 scaler/scaler.go diff --git a/README.md b/README.md index 4400563..9d9585e 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ [![Swirl](https://goreportcard.com/badge/cuigh/swirl)](https://goreportcard.com/report/cuigh/swirl) -Swirl is a web management tool for Docker, focused on swarm cluster. +**Swirl** is a web management tool for Docker, focused on swarm cluster. ## Features @@ -10,6 +10,7 @@ Swirl is a web management tool for Docker, focused on swarm cluster. * Image and container management * Compose management with deployment support * Service monitoring based on Prometheus +* Service auto scaling * LDAP authentication support * Full permission control based on RBAC model * Scale out as you want @@ -120,6 +121,15 @@ docker service create \ docker stack deploy -c compose.yml swirl ``` +## Advanced features + +**Swirl** use service labels to support some features, the labels in the table below are currently supported. + +Name | Description | Examples +--- | --- | --- +swirl.scale | Service auto scaling | `swirl.scale=min=1,max=5,cpu=30:50` +swirl.metrics | Add additional metrics to service stats page | `swirl.metrics=java`, `swirl.metrics=go` + ## Build **Swirl** use `dep` as dependency management tool. You can build **Swirl** diff --git a/biz/docker/service.go b/biz/docker/service.go index a4d3524..ad88114 100644 --- a/biz/docker/service.go +++ b/biz/docker/service.go @@ -96,6 +96,16 @@ func ServiceList(name string, pageIndex, pageSize int) (infos []*model.ServiceLi return } +// ServiceSearch search services with args. +func ServiceSearch(args filters.Args) (services []swarm.Service, err error) { // nolint: gocyclo + err = mgr.Do(func(ctx context.Context, cli *client.Client) (err error) { + opts := types.ServiceListOptions{Filters: args} + services, err = cli.ServiceList(ctx, opts) + return + }) + return +} + // ServiceCount return number of services. func ServiceCount() (count int, err error) { err = mgr.Do(func(ctx context.Context, cli *client.Client) (err error) { @@ -317,7 +327,7 @@ func ServiceUpdate(info *model.ServiceInfo) error { // nolint: gocyclo } // ServiceScale adjust replicas of a service. -func ServiceScale(name string, count uint64) error { +func ServiceScale(name string, version, count uint64) error { return mgr.Do(func(ctx context.Context, cli *client.Client) (err error) { service, _, err := cli.ServiceInspectWithRaw(ctx, name, types.ServiceInspectOptions{}) if err != nil { @@ -334,7 +344,11 @@ func ServiceScale(name string, count uint64) error { RegistryAuthFrom: types.RegistryAuthFromSpec, QueryRegistry: false, } - resp, err := cli.ServiceUpdate(context.Background(), name, service.Version, spec, options) + ver := service.Version + if version > 0 { + ver = swarm.Version{Index: version} + } + resp, err := cli.ServiceUpdate(context.Background(), name, ver, spec, options) if err == nil && len(resp.Warnings) > 0 { mgr.Logger().Warnf("service %s was scaled but got warnings: %v", name, resp.Warnings) } diff --git a/biz/metric.go b/biz/metric.go index 5edaa1f..28ed309 100644 --- a/biz/metric.go +++ b/biz/metric.go @@ -88,6 +88,39 @@ func (b *metricBiz) GetMatrix(query, label string, start, end time.Time) (lines return } +func (b *metricBiz) GetScalar(query string, t time.Time) (v float64, err error) { + api, err := b.getAPI() + if err != nil { + return 0, err + } + + value, err := api.Query(context.Background(), query, t) + if err != nil { + return 0, err + } + + scalar := value.(*pmodel.Scalar) + return float64(scalar.Value), nil +} + +func (b *metricBiz) GetVector(query string, t time.Time) (values []float64, err error) { + api, err := b.getAPI() + if err != nil { + return nil, err + } + + value, err := api.Query(context.Background(), query, t) + if err != nil { + return nil, err + } + + vector := value.(pmodel.Vector) + for _, sample := range vector { + values = append(values, float64(sample.Value)) + } + return +} + func (b *metricBiz) calcStep(period time.Duration) (step time.Duration) { if period >= times.Day { step = 20 * time.Minute diff --git a/controller/service.go b/controller/service.go index e1283df..2fc758e 100644 --- a/controller/service.go +++ b/controller/service.go @@ -254,7 +254,7 @@ func serviceScale(ctx web.Context) error { return err } - err = docker.ServiceScale(name, uint64(count)) + err = docker.ServiceScale(name, 0, uint64(count)) if err == nil { biz.Event.CreateService(model.EventActionScale, name, ctx.User()) } diff --git a/main.go b/main.go index 41c6905..14b4e7e 100644 --- a/main.go +++ b/main.go @@ -1,8 +1,8 @@ package main import ( - "fmt" "net/http" + "os" "path/filepath" "runtime" "time" @@ -12,6 +12,7 @@ import ( _ "github.com/cuigh/auxo/cache/memory" "github.com/cuigh/auxo/config" "github.com/cuigh/auxo/data/valid" + "github.com/cuigh/auxo/log" "github.com/cuigh/auxo/net/web" "github.com/cuigh/auxo/net/web/filter" "github.com/cuigh/auxo/net/web/filter/auth" @@ -19,6 +20,8 @@ import ( "github.com/cuigh/swirl/biz" "github.com/cuigh/swirl/controller" "github.com/cuigh/swirl/misc" + "github.com/cuigh/swirl/model" + "github.com/cuigh/swirl/scaler" "github.com/cuigh/swirl/security" ) @@ -26,22 +29,25 @@ func main() { misc.BindOptions() app.Name = "Swirl" - app.Version = "0.7.0" + app.Version = "0.7.1" app.Desc = "A web management UI for Docker, focused on swarm cluster" app.Action = func(ctx *app.Context) { misc.LoadOptions() - app.Run(server()) + + setting, err := biz.Setting.Get() + if err != nil { + log.Get(app.Name).Error("Load setting failed: ", err) + os.Exit(1) + } + + scaler.Start() + app.Run(server(setting)) } app.Flags.Register(flag.All) app.Start() } -func server() *web.Server { - setting, err := biz.Setting.Get() - if err != nil { - panic(fmt.Sprintf("Load setting failed: %v", err)) - } - +func server(setting *model.Setting) *web.Server { ws := web.Auto() // customize error handler diff --git a/scaler/scaler.go b/scaler/scaler.go new file mode 100644 index 0000000..a6e28f7 --- /dev/null +++ b/scaler/scaler.go @@ -0,0 +1,183 @@ +package scaler + +import ( + "fmt" + "strings" + "time" + + "github.com/cuigh/auxo/data" + "github.com/cuigh/auxo/data/set" + "github.com/cuigh/auxo/log" + "github.com/cuigh/auxo/util/cast" + "github.com/cuigh/auxo/util/run" + "github.com/cuigh/swirl/biz" + "github.com/cuigh/swirl/biz/docker" + "github.com/docker/docker/api/types/filters" + "github.com/docker/docker/api/types/swarm" +) + +type checker func(service string, low, high float64) (scaleType, float64) + +var checkers = map[string]checker{ + "cpu": cpuChecker, +} + +type scaleType int + +const ( + scaleNone scaleType = iota + scaleUp + scaleDown +) + +type policyType string + +const ( + policyAny policyType = "any" + policyAll policyType = "all" +) + +// Start starts a timer to scale services automatically. +func Start() { + const labelScale = "swirl.scale" + + run.Schedule(time.Minute, func() { + args := filters.NewArgs() + args.Add("mode", "replicated") + args.Add("label", labelScale) + services, err := docker.ServiceSearch(args) + if err != nil { + log.Get("scaler").Error("scaler > Failed to search service: ", err) + return + } + + for _, service := range services { + label := service.Spec.Labels[labelScale] + opts := data.ParseOptions(label, ",", "=") + tryScale(&service, opts) + } + }, nil) +} + +// nolint: gocyclo +func tryScale(service *swarm.Service, opts data.Options) { + // ignore services with global mode + if service.Spec.Mode.Replicated == nil { + return + } + + // ignore services that have been updated in 3 minutes + if service.UpdatedAt.Add(3 * time.Minute).After(time.Now()) { + return + } + + var ( + min = uint64(2) + max = uint64(5) + policy = policyAny + args data.Options + ) + for _, opt := range opts { + switch opt.Name { + case "min": + min = cast.ToUint64(opt.Value, 1) + case "max": + max = cast.ToUint64(opt.Value, 2) + case "policy": + policy = policyType(opt.Value) + default: + args = append(args, opt) + } + } + + result := check(service, policy, args) + if result.Type == scaleNone { + return + } + + replicas := *service.Spec.Mode.Replicated.Replicas + if result.Type == scaleUp { + if replicas < max { + docker.ServiceScale(service.Spec.Name, service.Version.Index, replicas+1) + log.Get("scaler").Infof("scaler > Service '%s' scaled up for: %v", service.Spec.Name, result.Reasons) + } + } else if result.Type == scaleDown { + if replicas > min { + docker.ServiceScale(service.Spec.Name, service.Version.Index, replicas-1) + log.Get("scaler").Infof("scaler > Service '%s' scaled down for: %v", service.Spec.Name, result.Reasons) + } + } +} + +func check(service *swarm.Service, policy policyType, args data.Options) checkResult { + result := checkResult{ + Reasons: make(map[string]float64), + } + if policy == policyAny { + for _, arg := range args { + st, value := checkArg(service.Spec.Name, arg) + if st == scaleNone { + continue + } + result.Type = st + result.Reasons[arg.Name] = value + break + } + } else if policy == policyAll { + types := set.Set{} + for _, arg := range args { + st, value := checkArg(service.Spec.Name, arg) + types.Add(st) + if types.Len() > 1 { + result.Type = scaleNone + return result + } + result.Type = st + result.Reasons[arg.Name] = value + } + } + return result +} + +func checkArg(service string, arg data.Option) (scaleType, float64) { + items := strings.Split(arg.Value, ":") + if len(items) != 2 { + log.Get("scaler").Warnf("scaler > Invalid scale argument: %s=%s", arg.Name, arg.Value) + return scaleNone, 0 + } + + c := checkers[arg.Name] + if c == nil { + log.Get("scaler").Warnf("scaler > Metric checker '%s' not found", arg.Name) + return scaleNone, 0 + } + + low := cast.ToFloat64(items[0]) + high := cast.ToFloat64(items[1]) + return c(service, low, high) +} + +func cpuChecker(service string, low, high float64) (scaleType, float64) { + query := fmt.Sprintf(`avg(rate(container_cpu_user_seconds_total{container_label_com_docker_swarm_service_name="%s"}[5m]) * 100)`, service) + values, err := biz.Metric.GetVector(query, time.Now()) + if err != nil { + log.Get("scaler").Error("scaler > Failed to query metrics: ", err) + return scaleNone, 0 + } + if len(values) == 0 { + return scaleNone, 0 + } + + value := values[0] + if value <= low { + return scaleDown, value + } else if value >= high { + return scaleUp, value + } + return scaleNone, 0 +} + +type checkResult struct { + Type scaleType + Reasons map[string]float64 +}