swirl/scaler/scaler.go
2018-05-26 14:57:32 +08:00

197 lines
4.7 KiB
Go

package scaler
import (
"fmt"
"strings"
"time"
"github.com/cuigh/auxo/data"
"github.com/cuigh/auxo/data/set"
"github.com/cuigh/auxo/log"
"github.com/cuigh/auxo/util/cast"
"github.com/cuigh/auxo/util/run"
"github.com/cuigh/swirl/biz"
"github.com/cuigh/swirl/biz/docker"
"github.com/docker/docker/api/types/filters"
"github.com/docker/docker/api/types/swarm"
)
type checker func(service string, low, high float64) (scaleType, float64)
var checkers = map[string]checker{
"cpu": cpuChecker,
}
type scaleType int
const (
scaleNone scaleType = iota
scaleUp
scaleDown
)
type policyType string
const (
policyAny policyType = "any"
policyAll policyType = "all"
)
// Start starts a timer to scale services automatically.
func Start() {
const labelScale = "swirl.scale"
run.Schedule(time.Minute, func() {
args := filters.NewArgs()
args.Add("mode", "replicated")
args.Add("label", labelScale)
services, err := docker.ServiceSearch(args)
if err != nil {
log.Get("scaler").Error("scaler > Failed to search service: ", err)
return
}
for _, service := range services {
label := service.Spec.Labels[labelScale]
opts := data.ParseOptions(label, ",", "=")
tryScale(&service, opts)
}
}, nil)
}
// nolint: gocyclo
func tryScale(service *swarm.Service, opts data.Options) {
// ignore services with global mode
if service.Spec.Mode.Replicated == nil {
return
}
var (
min uint64 = 2
max uint64 = 8
step uint64 = 2
window = 3 * time.Minute
policy = policyAny
args data.Options
)
for _, opt := range opts {
switch opt.Name {
case "min":
min = cast.ToUint64(opt.Value, min)
case "max":
max = cast.ToUint64(opt.Value, max)
case "step":
step = cast.ToUint64(opt.Value, step)
case "window":
window = cast.ToDuration(opt.Value, window)
case "policy":
policy = policyType(opt.Value)
default:
args = append(args, opt)
}
}
// ignore services that have been updated in window period.
if service.UpdatedAt.Add(window).After(time.Now()) {
return
}
result := check(service, policy, args)
if result.Type == scaleNone {
return
}
logger := log.Get("scaler")
replicas := *service.Spec.Mode.Replicated.Replicas
if result.Type == scaleUp {
if replicas < max {
if err := docker.ServiceScale(service.Spec.Name, service.Version.Index, replicas+step); err != nil {
logger.Errorf("scaler > Failed to scale service '%s': %v", service.Spec.Name, err)
} else {
logger.Infof("scaler > Service '%s' scaled up for: %v", service.Spec.Name, result.Reasons)
}
}
} else if result.Type == scaleDown {
if replicas > min {
if err := docker.ServiceScale(service.Spec.Name, service.Version.Index, replicas-step); err != nil {
logger.Errorf("scaler > Failed to scale service '%s': %v", service.Spec.Name, err)
} else {
logger.Infof("scaler > Service '%s' scaled down for: %v", service.Spec.Name, result.Reasons)
}
}
}
}
func check(service *swarm.Service, policy policyType, args data.Options) checkResult {
result := checkResult{
Reasons: make(map[string]float64),
}
if policy == policyAny {
for _, arg := range args {
st, value := checkArg(service.Spec.Name, arg)
if st == scaleNone {
continue
}
result.Type = st
result.Reasons[arg.Name] = value
break
}
} else if policy == policyAll {
types := set.Set{}
for _, arg := range args {
st, value := checkArg(service.Spec.Name, arg)
types.Add(st)
if types.Len() > 1 {
result.Type = scaleNone
return result
}
result.Type = st
result.Reasons[arg.Name] = value
}
}
return result
}
func checkArg(service string, arg data.Option) (scaleType, float64) {
items := strings.Split(arg.Value, ":")
if len(items) != 2 {
log.Get("scaler").Warnf("scaler > Invalid scale argument: %s=%s", arg.Name, arg.Value)
return scaleNone, 0
}
c := checkers[arg.Name]
if c == nil {
log.Get("scaler").Warnf("scaler > Metric checker '%s' not found", arg.Name)
return scaleNone, 0
}
low := cast.ToFloat64(items[0])
high := cast.ToFloat64(items[1])
return c(service, low, high)
}
func cpuChecker(service string, low, high float64) (scaleType, float64) {
query := fmt.Sprintf(`avg(rate(container_cpu_user_seconds_total{container_label_com_docker_swarm_service_name="%s"}[1m]) * 100)`, service)
vector, err := biz.Metric.GetVector(query, "", time.Now())
if err != nil {
log.Get("scaler").Error("scaler > Failed to query metrics: ", err)
return scaleNone, 0
}
if len(vector.Data) == 0 {
return scaleNone, 0
}
cv := vector.Data[0]
if cv.Value <= low {
return scaleDown, cv.Value
} else if cv.Value >= high {
return scaleUp, cv.Value
}
return scaleNone, 0
}
type checkResult struct {
Type scaleType
Reasons map[string]float64
}