swirl/docker/docker.go
2022-01-06 16:54:14 +08:00

202 lines
4.1 KiB
Go

package docker
import (
"context"
"strings"
"sync"
"time"
"github.com/cuigh/auxo/app/container"
"github.com/cuigh/auxo/cache"
"github.com/cuigh/auxo/errors"
"github.com/cuigh/auxo/log"
"github.com/cuigh/auxo/util/lazy"
"github.com/cuigh/swirl/misc"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/filters"
"github.com/docker/docker/api/types/swarm"
"github.com/docker/docker/client"
)
func newVersion(v uint64) swarm.Version {
return swarm.Version{Index: v}
}
type Docker struct {
c *client.Client
locker sync.Mutex
logger log.Logger
nodes cache.Value
agents sync.Map
networks sync.Map
}
func NewDocker() *Docker {
d := &Docker{
logger: log.Get("docker"),
nodes: cache.Value{TTL: 30 * time.Minute},
}
d.nodes.Load = d.loadCache
return d
}
func IsErrNotFound(err error) bool {
return client.IsErrNotFound(err)
}
func (d *Docker) call(fn func(c *client.Client) error) error {
c, err := d.client()
if err == nil {
err = fn(c)
}
return err
}
func (d *Docker) client() (c *client.Client, err error) {
if d.c == nil {
d.locker.Lock()
defer d.locker.Unlock()
if d.c == nil {
var opt client.Opt
if misc.Options.DockerEndpoint == "" {
opt = client.FromEnv
} else {
opt = client.WithHost(misc.Options.DockerEndpoint)
}
d.c, err = client.NewClientWithOpts(opt, client.WithVersion(misc.Options.DockerAPIVersion))
if err != nil {
return
}
}
}
return d.c, nil
}
func (d *Docker) agent(node string) (*client.Client, error) {
host, err := d.getAgent(node)
if err != nil {
d.logger.Error("failed to find node agent: ", err)
}
if host == "" {
return d.client()
}
value, _ := d.agents.LoadOrStore(node, &lazy.Value{
New: func() (interface{}, error) {
c, e := client.NewClientWithOpts(
client.WithHost("tcp://"+host),
client.WithVersion(misc.Options.DockerAPIVersion),
)
return c, e
},
})
c, err := value.(*lazy.Value).Get()
if err != nil {
return nil, err
}
return c.(*client.Client), nil
}
func (d *Docker) getAgent(node string) (agent string, err error) {
if node == "" || node == "-" {
return "", nil
}
nodes, err := d.NodeMap()
if err != nil {
return
}
if n, ok := nodes[node]; ok {
agent = n.Agent
}
return
}
func (d *Docker) loadCache() (interface{}, error) {
c, err := d.client()
if err != nil {
return nil, err
}
ctx, cancel := misc.Context(time.Minute)
defer cancel()
agents, err := d.loadAgents(ctx, c)
if err != nil {
return nil, errors.Wrap(err, "failed to load agents")
}
nodes, err := d.loadNodes(ctx, c)
if err != nil {
return nil, err
}
for i := range nodes {
nodes[i].Agent = agents[nodes[i].ID]
}
return nodes, nil
}
func (d *Docker) loadNodes(ctx context.Context, c *client.Client) (nodes map[string]*Node, err error) {
var list []swarm.Node
list, err = c.NodeList(ctx, types.NodeListOptions{})
if err == nil {
nodes = make(map[string]*Node)
for _, n := range list {
ni := &Node{
ID: n.ID,
Name: n.Spec.Name,
State: n.Status.State,
}
if ni.Name == "" {
ni.Name = n.Description.Hostname
}
nodes[n.ID] = ni
}
}
return
}
func (d *Docker) loadAgents(ctx context.Context, c *client.Client) (agents map[string]string, err error) {
var tasks []swarm.Task
agents = make(map[string]string)
for _, agent := range misc.Options.Agents {
pair := strings.SplitN(agent, ":", 2)
args := filters.NewArgs(
filters.Arg("desired-state", string(swarm.TaskStateRunning)),
filters.Arg("service", pair[0]),
)
tasks, err = c.TaskList(ctx, types.TaskListOptions{Filters: args})
if err != nil {
return
}
port := "2375"
if len(pair) > 1 {
port = pair[1]
}
for _, t := range tasks {
if len(t.NetworksAttachments) > 0 {
pair = strings.SplitN(t.NetworksAttachments[0].Addresses[0], "/", 2)
agents[t.NodeID] = pair[0] + ":" + port
}
}
}
return
}
type Node struct {
ID string `json:"id,omitempty"`
Name string `json:"name,omitempty"`
State swarm.NodeState `json:"-"`
Agent string `json:"-"`
}
func init() {
container.Put(NewDocker)
}