Reapply "Merge branch 'canary' into kucherenko/canary"

This reverts commit e6cb6454db.
This commit is contained in:
Mauricio Siu
2025-03-02 00:30:02 -06:00
parent e6cb6454db
commit 747c2137c9
639 changed files with 82888 additions and 17188 deletions

View File

@@ -0,0 +1,52 @@
package database
import (
"database/sql"
"log"
"time"
"github.com/robfig/cron/v3"
)
// CleanupMetrics deletes metrics older than the retention period
func CleanupMetrics(db *sql.DB, retentionDays int) error {
cutoffDate := time.Now().AddDate(0, 0, -retentionDays)
cutoffDateStr := cutoffDate.UTC().Format(time.RFC3339Nano)
containerQuery := `DELETE FROM container_metrics WHERE timestamp < ?`
_, err := db.Exec(containerQuery, cutoffDateStr)
if err != nil {
return err
}
serverQuery := `DELETE FROM server_metrics WHERE timestamp < ?`
_, err = db.Exec(serverQuery, cutoffDateStr)
if err != nil {
return err
}
log.Printf("Metrics deleted (older than %d days)", retentionDays)
log.Printf("Cutoff date for both tables: %s", cutoffDateStr)
return nil
}
// StartMetricsCleanup starts a cron job to periodically clean up metrics
func StartMetricsCleanup(db *sql.DB, retentionDays int, cronExpression string) (*cron.Cron, error) {
c := cron.New()
_, err := c.AddFunc(cronExpression, func() {
if err := CleanupMetrics(db, retentionDays); err != nil {
log.Printf("Error during metrics cleanup: %v", err)
}
})
if err != nil {
return nil, err
}
c.Start()
log.Printf("Started metrics cleanup job (retention: %d days, cron: %s)",
retentionDays, cronExpression)
return c, nil
}

View File

@@ -0,0 +1,160 @@
package database
import (
"encoding/json"
"fmt"
"strings"
)
func (db *DB) InitContainerMetricsTable() error {
_, err := db.Exec(`
CREATE TABLE IF NOT EXISTS container_metrics (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT NOT NULL,
container_id TEXT NOT NULL,
container_name TEXT NOT NULL,
metrics_json TEXT NOT NULL
)
`)
if err != nil {
return fmt.Errorf("error creating container_metrics table: %v", err)
}
// Crear índices para mejorar el rendimiento
_, err = db.Exec(`CREATE INDEX IF NOT EXISTS idx_container_metrics_timestamp ON container_metrics(timestamp)`)
if err != nil {
return fmt.Errorf("error creating timestamp index: %v", err)
}
_, err = db.Exec(`CREATE INDEX IF NOT EXISTS idx_container_metrics_name ON container_metrics(container_name)`)
if err != nil {
return fmt.Errorf("error creating name index: %v", err)
}
return nil
}
func (db *DB) SaveContainerMetric(metric *ContainerMetric) error {
metricsJSON, err := json.Marshal(metric)
if err != nil {
return fmt.Errorf("error marshaling metrics: %v", err)
}
_, err = db.Exec(`
INSERT INTO container_metrics (timestamp, container_id, container_name, metrics_json)
VALUES (?, ?, ?, ?)
`, metric.Timestamp, metric.ID, metric.Name, string(metricsJSON))
return err
}
func (db *DB) GetLastNContainerMetrics(containerName string, limit int) ([]ContainerMetric, error) {
name := strings.TrimPrefix(containerName, "/")
parts := strings.Split(name, "-")
if len(parts) > 1 {
containerName = strings.Join(parts[:len(parts)-1], "-")
}
query := `
WITH recent_metrics AS (
SELECT metrics_json
FROM container_metrics
WHERE container_name LIKE ? || '%'
ORDER BY timestamp DESC
LIMIT ?
)
SELECT metrics_json FROM recent_metrics ORDER BY json_extract(metrics_json, '$.timestamp') ASC
`
rows, err := db.Query(query, containerName, limit)
if err != nil {
return nil, err
}
defer rows.Close()
var metrics []ContainerMetric
for rows.Next() {
var metricsJSON string
err := rows.Scan(&metricsJSON)
if err != nil {
return nil, err
}
var metric ContainerMetric
if err := json.Unmarshal([]byte(metricsJSON), &metric); err != nil {
return nil, err
}
metrics = append(metrics, metric)
}
return metrics, nil
}
func (db *DB) GetAllMetricsContainer(containerName string) ([]ContainerMetric, error) {
name := strings.TrimPrefix(containerName, "/")
parts := strings.Split(name, "-")
if len(parts) > 1 {
containerName = strings.Join(parts[:len(parts)-1], "-")
}
query := `
WITH recent_metrics AS (
SELECT metrics_json
FROM container_metrics
WHERE container_name LIKE ? || '%'
ORDER BY timestamp DESC
)
SELECT metrics_json FROM recent_metrics ORDER BY json_extract(metrics_json, '$.timestamp') ASC
`
rows, err := db.Query(query, containerName)
if err != nil {
return nil, err
}
defer rows.Close()
var metrics []ContainerMetric
for rows.Next() {
var metricsJSON string
err := rows.Scan(&metricsJSON)
if err != nil {
return nil, err
}
var metric ContainerMetric
if err := json.Unmarshal([]byte(metricsJSON), &metric); err != nil {
return nil, err
}
metrics = append(metrics, metric)
}
return metrics, nil
}
type ContainerMetric struct {
Timestamp string `json:"timestamp"`
CPU float64 `json:"CPU"`
Memory MemoryMetric `json:"Memory"`
Network NetworkMetric `json:"Network"`
BlockIO BlockIOMetric `json:"BlockIO"`
Container string `json:"Container"`
ID string `json:"ID"`
Name string `json:"Name"`
}
type MemoryMetric struct {
Percentage float64 `json:"percentage"`
Used float64 `json:"used"`
Total float64 `json:"total"`
UsedUnit string `json:"usedUnit"`
TotalUnit string `json:"totalUnit"`
}
type NetworkMetric struct {
Input float64 `json:"input"`
Output float64 `json:"output"`
InputUnit string `json:"inputUnit"`
OutputUnit string `json:"outputUnit"`
}
type BlockIOMetric struct {
Read float64 `json:"read"`
Write float64 `json:"write"`
ReadUnit string `json:"readUnit"`
WriteUnit string `json:"writeUnit"`
}

View File

@@ -0,0 +1,47 @@
package database
import (
"database/sql"
_ "github.com/mattn/go-sqlite3"
)
type DB struct {
*sql.DB
}
func InitDB() (*DB, error) {
db, err := sql.Open("sqlite3", "./monitoring.db")
if err != nil {
return nil, err
}
// Create metrics table if it doesn't exist
_, err = db.Exec(`
CREATE TABLE IF NOT EXISTS server_metrics (
timestamp TEXT PRIMARY KEY,
cpu REAL,
cpu_model TEXT,
cpu_cores INTEGER,
cpu_physical_cores INTEGER,
cpu_speed REAL,
os TEXT,
distro TEXT,
kernel TEXT,
arch TEXT,
mem_used REAL,
mem_used_gb REAL,
mem_total REAL,
uptime INTEGER,
disk_used REAL,
total_disk REAL,
network_in REAL,
network_out REAL
)
`)
if err != nil {
return nil, err
}
return &DB{db}, nil
}

View File

@@ -0,0 +1,115 @@
package database
import (
"time"
_ "github.com/mattn/go-sqlite3"
)
type ServerMetric struct {
Timestamp string `json:"timestamp"`
CPU float64 `json:"cpu"`
CPUModel string `json:"cpuModel"`
CPUCores int32 `json:"cpuCores"`
CPUPhysicalCores int32 `json:"cpuPhysicalCores"`
CPUSpeed float64 `json:"cpuSpeed"`
OS string `json:"os"`
Distro string `json:"distro"`
Kernel string `json:"kernel"`
Arch string `json:"arch"`
MemUsed float64 `json:"memUsed"`
MemUsedGB float64 `json:"memUsedGB"`
MemTotal float64 `json:"memTotal"`
Uptime uint64 `json:"uptime"`
DiskUsed float64 `json:"diskUsed"`
TotalDisk float64 `json:"totalDisk"`
NetworkIn float64 `json:"networkIn"`
NetworkOut float64 `json:"networkOut"`
}
func (db *DB) SaveMetric(metric ServerMetric) error {
if metric.Timestamp == "" {
metric.Timestamp = time.Now().UTC().Format(time.RFC3339Nano)
}
_, err := db.Exec(`
INSERT INTO server_metrics (timestamp, cpu, cpu_model, cpu_cores, cpu_physical_cores, cpu_speed, os, distro, kernel, arch, mem_used, mem_used_gb, mem_total, uptime, disk_used, total_disk, network_in, network_out)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
`, metric.Timestamp, metric.CPU, metric.CPUModel, metric.CPUCores, metric.CPUPhysicalCores, metric.CPUSpeed, metric.OS, metric.Distro, metric.Kernel, metric.Arch, metric.MemUsed, metric.MemUsedGB, metric.MemTotal, metric.Uptime, metric.DiskUsed, metric.TotalDisk, metric.NetworkIn, metric.NetworkOut)
return err
}
func (db *DB) GetMetricsInRange(start, end time.Time) ([]ServerMetric, error) {
rows, err := db.Query(`
SELECT timestamp, cpu, cpu_model, cpu_cores, cpu_physical_cores, cpu_speed, os, distro, kernel, arch, mem_used, mem_used_gb, mem_total, uptime, disk_used, total_disk, network_in, network_out
FROM server_metrics
WHERE timestamp BETWEEN ? AND ?
ORDER BY timestamp ASC
`, start.UTC().Format(time.RFC3339Nano), end.UTC().Format(time.RFC3339Nano))
if err != nil {
return nil, err
}
defer rows.Close()
var metrics []ServerMetric
for rows.Next() {
var m ServerMetric
err := rows.Scan(&m.Timestamp, &m.CPU, &m.CPUModel, &m.CPUCores, &m.CPUPhysicalCores, &m.CPUSpeed, &m.OS, &m.Distro, &m.Kernel, &m.Arch, &m.MemUsed, &m.MemUsedGB, &m.MemTotal, &m.Uptime, &m.DiskUsed, &m.TotalDisk, &m.NetworkIn, &m.NetworkOut)
if err != nil {
return nil, err
}
metrics = append(metrics, m)
}
return metrics, nil
}
func (db *DB) GetLastNMetrics(n int) ([]ServerMetric, error) {
rows, err := db.Query(`
WITH recent_metrics AS (
SELECT timestamp, cpu, cpu_model, cpu_cores, cpu_physical_cores, cpu_speed, os, distro, kernel, arch, mem_used, mem_used_gb, mem_total, uptime, disk_used, total_disk, network_in, network_out
FROM server_metrics
ORDER BY timestamp DESC
LIMIT ?
)
SELECT * FROM recent_metrics
ORDER BY timestamp ASC
`, n)
if err != nil {
return nil, err
}
defer rows.Close()
var metrics []ServerMetric
for rows.Next() {
var m ServerMetric
err := rows.Scan(&m.Timestamp, &m.CPU, &m.CPUModel, &m.CPUCores, &m.CPUPhysicalCores, &m.CPUSpeed, &m.OS, &m.Distro, &m.Kernel, &m.Arch, &m.MemUsed, &m.MemUsedGB, &m.MemTotal, &m.Uptime, &m.DiskUsed, &m.TotalDisk, &m.NetworkIn, &m.NetworkOut)
if err != nil {
return nil, err
}
metrics = append(metrics, m)
}
return metrics, nil
}
func (db *DB) GetAllMetrics() ([]ServerMetric, error) {
rows, err := db.Query(`
SELECT timestamp, cpu, cpu_model, cpu_cores, cpu_physical_cores, cpu_speed, os, distro, kernel, arch, mem_used, mem_used_gb, mem_total, uptime, disk_used, total_disk, network_in, network_out
FROM server_metrics
ORDER BY timestamp ASC
`)
if err != nil {
return nil, err
}
defer rows.Close()
var metrics []ServerMetric
for rows.Next() {
var m ServerMetric
err := rows.Scan(&m.Timestamp, &m.CPU, &m.CPUModel, &m.CPUCores, &m.CPUPhysicalCores, &m.CPUSpeed, &m.OS, &m.Distro, &m.Kernel, &m.Arch, &m.MemUsed, &m.MemUsedGB, &m.MemTotal, &m.Uptime, &m.DiskUsed, &m.TotalDisk, &m.NetworkIn, &m.NetworkOut)
if err != nil {
return nil, err
}
metrics = append(metrics, m)
}
return metrics, nil
}