mirror of
https://github.com/cuigh/swirl
synced 2025-01-30 22:37:21 +00:00
Add ability to scale services automatically
This commit is contained in:
parent
c600bcb0dd
commit
553c666723
12
README.md
12
README.md
@ -2,7 +2,7 @@
|
||||
|
||||
[![Swirl](https://goreportcard.com/badge/cuigh/swirl)](https://goreportcard.com/report/cuigh/swirl)
|
||||
|
||||
Swirl is a web management tool for Docker, focused on swarm cluster.
|
||||
**Swirl** is a web management tool for Docker, focused on swarm cluster.
|
||||
|
||||
## Features
|
||||
|
||||
@ -10,6 +10,7 @@ Swirl is a web management tool for Docker, focused on swarm cluster.
|
||||
* Image and container management
|
||||
* Compose management with deployment support
|
||||
* Service monitoring based on Prometheus
|
||||
* Service auto scaling
|
||||
* LDAP authentication support
|
||||
* Full permission control based on RBAC model
|
||||
* Scale out as you want
|
||||
@ -120,6 +121,15 @@ docker service create \
|
||||
docker stack deploy -c compose.yml swirl
|
||||
```
|
||||
|
||||
## Advanced features
|
||||
|
||||
**Swirl** use service labels to support some features, the labels in the table below are currently supported.
|
||||
|
||||
Name | Description | Examples
|
||||
--- | --- | ---
|
||||
swirl.scale | Service auto scaling | `swirl.scale=min=1,max=5,cpu=30:50`
|
||||
swirl.metrics | Add additional metrics to service stats page | `swirl.metrics=java`, `swirl.metrics=go`
|
||||
|
||||
## Build
|
||||
|
||||
**Swirl** use `dep` as dependency management tool. You can build **Swirl**
|
||||
|
@ -96,6 +96,16 @@ func ServiceList(name string, pageIndex, pageSize int) (infos []*model.ServiceLi
|
||||
return
|
||||
}
|
||||
|
||||
// ServiceSearch search services with args.
|
||||
func ServiceSearch(args filters.Args) (services []swarm.Service, err error) { // nolint: gocyclo
|
||||
err = mgr.Do(func(ctx context.Context, cli *client.Client) (err error) {
|
||||
opts := types.ServiceListOptions{Filters: args}
|
||||
services, err = cli.ServiceList(ctx, opts)
|
||||
return
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
// ServiceCount return number of services.
|
||||
func ServiceCount() (count int, err error) {
|
||||
err = mgr.Do(func(ctx context.Context, cli *client.Client) (err error) {
|
||||
@ -317,7 +327,7 @@ func ServiceUpdate(info *model.ServiceInfo) error { // nolint: gocyclo
|
||||
}
|
||||
|
||||
// ServiceScale adjust replicas of a service.
|
||||
func ServiceScale(name string, count uint64) error {
|
||||
func ServiceScale(name string, version, count uint64) error {
|
||||
return mgr.Do(func(ctx context.Context, cli *client.Client) (err error) {
|
||||
service, _, err := cli.ServiceInspectWithRaw(ctx, name, types.ServiceInspectOptions{})
|
||||
if err != nil {
|
||||
@ -334,7 +344,11 @@ func ServiceScale(name string, count uint64) error {
|
||||
RegistryAuthFrom: types.RegistryAuthFromSpec,
|
||||
QueryRegistry: false,
|
||||
}
|
||||
resp, err := cli.ServiceUpdate(context.Background(), name, service.Version, spec, options)
|
||||
ver := service.Version
|
||||
if version > 0 {
|
||||
ver = swarm.Version{Index: version}
|
||||
}
|
||||
resp, err := cli.ServiceUpdate(context.Background(), name, ver, spec, options)
|
||||
if err == nil && len(resp.Warnings) > 0 {
|
||||
mgr.Logger().Warnf("service %s was scaled but got warnings: %v", name, resp.Warnings)
|
||||
}
|
||||
|
@ -88,6 +88,39 @@ func (b *metricBiz) GetMatrix(query, label string, start, end time.Time) (lines
|
||||
return
|
||||
}
|
||||
|
||||
func (b *metricBiz) GetScalar(query string, t time.Time) (v float64, err error) {
|
||||
api, err := b.getAPI()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
value, err := api.Query(context.Background(), query, t)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
scalar := value.(*pmodel.Scalar)
|
||||
return float64(scalar.Value), nil
|
||||
}
|
||||
|
||||
func (b *metricBiz) GetVector(query string, t time.Time) (values []float64, err error) {
|
||||
api, err := b.getAPI()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
value, err := api.Query(context.Background(), query, t)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
vector := value.(pmodel.Vector)
|
||||
for _, sample := range vector {
|
||||
values = append(values, float64(sample.Value))
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (b *metricBiz) calcStep(period time.Duration) (step time.Duration) {
|
||||
if period >= times.Day {
|
||||
step = 20 * time.Minute
|
||||
|
@ -254,7 +254,7 @@ func serviceScale(ctx web.Context) error {
|
||||
return err
|
||||
}
|
||||
|
||||
err = docker.ServiceScale(name, uint64(count))
|
||||
err = docker.ServiceScale(name, 0, uint64(count))
|
||||
if err == nil {
|
||||
biz.Event.CreateService(model.EventActionScale, name, ctx.User())
|
||||
}
|
||||
|
24
main.go
24
main.go
@ -1,8 +1,8 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
"time"
|
||||
@ -12,6 +12,7 @@ import (
|
||||
_ "github.com/cuigh/auxo/cache/memory"
|
||||
"github.com/cuigh/auxo/config"
|
||||
"github.com/cuigh/auxo/data/valid"
|
||||
"github.com/cuigh/auxo/log"
|
||||
"github.com/cuigh/auxo/net/web"
|
||||
"github.com/cuigh/auxo/net/web/filter"
|
||||
"github.com/cuigh/auxo/net/web/filter/auth"
|
||||
@ -19,6 +20,8 @@ import (
|
||||
"github.com/cuigh/swirl/biz"
|
||||
"github.com/cuigh/swirl/controller"
|
||||
"github.com/cuigh/swirl/misc"
|
||||
"github.com/cuigh/swirl/model"
|
||||
"github.com/cuigh/swirl/scaler"
|
||||
"github.com/cuigh/swirl/security"
|
||||
)
|
||||
|
||||
@ -26,22 +29,25 @@ func main() {
|
||||
misc.BindOptions()
|
||||
|
||||
app.Name = "Swirl"
|
||||
app.Version = "0.7.0"
|
||||
app.Version = "0.7.1"
|
||||
app.Desc = "A web management UI for Docker, focused on swarm cluster"
|
||||
app.Action = func(ctx *app.Context) {
|
||||
misc.LoadOptions()
|
||||
app.Run(server())
|
||||
|
||||
setting, err := biz.Setting.Get()
|
||||
if err != nil {
|
||||
log.Get(app.Name).Error("Load setting failed: ", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
scaler.Start()
|
||||
app.Run(server(setting))
|
||||
}
|
||||
app.Flags.Register(flag.All)
|
||||
app.Start()
|
||||
}
|
||||
|
||||
func server() *web.Server {
|
||||
setting, err := biz.Setting.Get()
|
||||
if err != nil {
|
||||
panic(fmt.Sprintf("Load setting failed: %v", err))
|
||||
}
|
||||
|
||||
func server(setting *model.Setting) *web.Server {
|
||||
ws := web.Auto()
|
||||
|
||||
// customize error handler
|
||||
|
183
scaler/scaler.go
Normal file
183
scaler/scaler.go
Normal file
@ -0,0 +1,183 @@
|
||||
package scaler
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/cuigh/auxo/data"
|
||||
"github.com/cuigh/auxo/data/set"
|
||||
"github.com/cuigh/auxo/log"
|
||||
"github.com/cuigh/auxo/util/cast"
|
||||
"github.com/cuigh/auxo/util/run"
|
||||
"github.com/cuigh/swirl/biz"
|
||||
"github.com/cuigh/swirl/biz/docker"
|
||||
"github.com/docker/docker/api/types/filters"
|
||||
"github.com/docker/docker/api/types/swarm"
|
||||
)
|
||||
|
||||
type checker func(service string, low, high float64) (scaleType, float64)
|
||||
|
||||
var checkers = map[string]checker{
|
||||
"cpu": cpuChecker,
|
||||
}
|
||||
|
||||
type scaleType int
|
||||
|
||||
const (
|
||||
scaleNone scaleType = iota
|
||||
scaleUp
|
||||
scaleDown
|
||||
)
|
||||
|
||||
type policyType string
|
||||
|
||||
const (
|
||||
policyAny policyType = "any"
|
||||
policyAll policyType = "all"
|
||||
)
|
||||
|
||||
// Start starts a timer to scale services automatically.
|
||||
func Start() {
|
||||
const labelScale = "swirl.scale"
|
||||
|
||||
run.Schedule(time.Minute, func() {
|
||||
args := filters.NewArgs()
|
||||
args.Add("mode", "replicated")
|
||||
args.Add("label", labelScale)
|
||||
services, err := docker.ServiceSearch(args)
|
||||
if err != nil {
|
||||
log.Get("scaler").Error("scaler > Failed to search service: ", err)
|
||||
return
|
||||
}
|
||||
|
||||
for _, service := range services {
|
||||
label := service.Spec.Labels[labelScale]
|
||||
opts := data.ParseOptions(label, ",", "=")
|
||||
tryScale(&service, opts)
|
||||
}
|
||||
}, nil)
|
||||
}
|
||||
|
||||
// nolint: gocyclo
|
||||
func tryScale(service *swarm.Service, opts data.Options) {
|
||||
// ignore services with global mode
|
||||
if service.Spec.Mode.Replicated == nil {
|
||||
return
|
||||
}
|
||||
|
||||
// ignore services that have been updated in 3 minutes
|
||||
if service.UpdatedAt.Add(3 * time.Minute).After(time.Now()) {
|
||||
return
|
||||
}
|
||||
|
||||
var (
|
||||
min = uint64(2)
|
||||
max = uint64(5)
|
||||
policy = policyAny
|
||||
args data.Options
|
||||
)
|
||||
for _, opt := range opts {
|
||||
switch opt.Name {
|
||||
case "min":
|
||||
min = cast.ToUint64(opt.Value, 1)
|
||||
case "max":
|
||||
max = cast.ToUint64(opt.Value, 2)
|
||||
case "policy":
|
||||
policy = policyType(opt.Value)
|
||||
default:
|
||||
args = append(args, opt)
|
||||
}
|
||||
}
|
||||
|
||||
result := check(service, policy, args)
|
||||
if result.Type == scaleNone {
|
||||
return
|
||||
}
|
||||
|
||||
replicas := *service.Spec.Mode.Replicated.Replicas
|
||||
if result.Type == scaleUp {
|
||||
if replicas < max {
|
||||
docker.ServiceScale(service.Spec.Name, service.Version.Index, replicas+1)
|
||||
log.Get("scaler").Infof("scaler > Service '%s' scaled up for: %v", service.Spec.Name, result.Reasons)
|
||||
}
|
||||
} else if result.Type == scaleDown {
|
||||
if replicas > min {
|
||||
docker.ServiceScale(service.Spec.Name, service.Version.Index, replicas-1)
|
||||
log.Get("scaler").Infof("scaler > Service '%s' scaled down for: %v", service.Spec.Name, result.Reasons)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func check(service *swarm.Service, policy policyType, args data.Options) checkResult {
|
||||
result := checkResult{
|
||||
Reasons: make(map[string]float64),
|
||||
}
|
||||
if policy == policyAny {
|
||||
for _, arg := range args {
|
||||
st, value := checkArg(service.Spec.Name, arg)
|
||||
if st == scaleNone {
|
||||
continue
|
||||
}
|
||||
result.Type = st
|
||||
result.Reasons[arg.Name] = value
|
||||
break
|
||||
}
|
||||
} else if policy == policyAll {
|
||||
types := set.Set{}
|
||||
for _, arg := range args {
|
||||
st, value := checkArg(service.Spec.Name, arg)
|
||||
types.Add(st)
|
||||
if types.Len() > 1 {
|
||||
result.Type = scaleNone
|
||||
return result
|
||||
}
|
||||
result.Type = st
|
||||
result.Reasons[arg.Name] = value
|
||||
}
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
func checkArg(service string, arg data.Option) (scaleType, float64) {
|
||||
items := strings.Split(arg.Value, ":")
|
||||
if len(items) != 2 {
|
||||
log.Get("scaler").Warnf("scaler > Invalid scale argument: %s=%s", arg.Name, arg.Value)
|
||||
return scaleNone, 0
|
||||
}
|
||||
|
||||
c := checkers[arg.Name]
|
||||
if c == nil {
|
||||
log.Get("scaler").Warnf("scaler > Metric checker '%s' not found", arg.Name)
|
||||
return scaleNone, 0
|
||||
}
|
||||
|
||||
low := cast.ToFloat64(items[0])
|
||||
high := cast.ToFloat64(items[1])
|
||||
return c(service, low, high)
|
||||
}
|
||||
|
||||
func cpuChecker(service string, low, high float64) (scaleType, float64) {
|
||||
query := fmt.Sprintf(`avg(rate(container_cpu_user_seconds_total{container_label_com_docker_swarm_service_name="%s"}[5m]) * 100)`, service)
|
||||
values, err := biz.Metric.GetVector(query, time.Now())
|
||||
if err != nil {
|
||||
log.Get("scaler").Error("scaler > Failed to query metrics: ", err)
|
||||
return scaleNone, 0
|
||||
}
|
||||
if len(values) == 0 {
|
||||
return scaleNone, 0
|
||||
}
|
||||
|
||||
value := values[0]
|
||||
if value <= low {
|
||||
return scaleDown, value
|
||||
} else if value >= high {
|
||||
return scaleUp, value
|
||||
}
|
||||
return scaleNone, 0
|
||||
}
|
||||
|
||||
type checkResult struct {
|
||||
Type scaleType
|
||||
Reasons map[string]float64
|
||||
}
|
Loading…
Reference in New Issue
Block a user