Files
calypso/backend/internal/storage/zfs_pool_monitor.go
2025-12-26 17:47:20 +00:00

251 lines
6.6 KiB
Go

package storage
import (
	"context"
	"database/sql"
	"errors"
	"os/exec"
	"regexp"
	"strconv"
	"strings"
	"time"

	"github.com/atlasos/calypso/internal/common/database"
	"github.com/atlasos/calypso/internal/common/logger"
)
// ZFSPoolMonitor periodically reads ZFS pool state from the host and writes it
// to the database. Build one with NewZFSPoolMonitor, run it with Start, and
// end it with Stop or by cancelling the context given to Start.
type ZFSPoolMonitor struct {
	// zfsService provides the database handle used by the sync queries.
	zfsService *ZFSService
	// logger receives lifecycle and sync messages.
	logger *logger.Logger
	// interval is the period of the sync ticker.
	interval time.Duration
	// stopCh is closed by Stop to make the Start loop return.
	stopCh chan struct{}
}
// NewZFSPoolMonitor returns a monitor that syncs pool state every interval.
// It creates its own ZFSService from db and log. No work happens until Start
// is called.
func NewZFSPoolMonitor(db *database.DB, log *logger.Logger, interval time.Duration) *ZFSPoolMonitor {
	monitor := new(ZFSPoolMonitor)
	monitor.zfsService = NewZFSService(db, log)
	monitor.logger = log
	monitor.interval = interval
	monitor.stopCh = make(chan struct{})
	return monitor
}
// Start runs the monitor loop on the calling goroutine. It performs one sync
// immediately, then one per ticker interval, and returns once ctx is done or
// Stop has been called.
func (m *ZFSPoolMonitor) Start(ctx context.Context) {
	m.logger.Info("Starting ZFS pool monitor service", "interval", m.interval)

	tick := time.NewTicker(m.interval)
	defer tick.Stop()

	// First sync happens right away rather than after one full interval.
	m.syncPools(ctx)

	for {
		select {
		case <-tick.C:
			m.syncPools(ctx)
			continue
		case <-ctx.Done():
		case <-m.stopCh:
		}
		// Either shutdown signal lands here.
		m.logger.Info("ZFS pool monitor service stopped")
		return
	}
}
// Stop signals the Start loop to exit by closing stopCh. It does not wait for
// the loop to return.
//
// Calling Stop again after an earlier call has completed is a no-op rather
// than a double-close panic. Concurrent calls to Stop are not synchronized.
func (m *ZFSPoolMonitor) Stop() {
	select {
	case <-m.stopCh:
		// Nothing is ever sent on stopCh, so a successful receive means it
		// has already been closed.
	default:
		close(m.stopCh)
	}
}
// syncPools performs one sync pass: it reads the pools present on the host,
// pushes each one's status to the database, and then reconciles database rows
// whose pool is no longer present. Failures are logged, not returned; if the
// host pools cannot be listed the pass ends without touching the database.
func (m *ZFSPoolMonitor) syncPools(ctx context.Context) {
	log := m.logger
	log.Debug("Running periodic ZFS pool sync")

	found, err := m.getSystemPools(ctx)
	if err != nil {
		log.Error("Failed to get system pools", "error", err)
		return
	}
	log.Debug("Found pools in system", "count", len(found))

	// One failing pool must not stop the others from being updated.
	for name, info := range found {
		updateErr := m.updatePoolStatus(ctx, name, info)
		if updateErr == nil {
			continue
		}
		log.Error("Failed to update pool status", "pool", name, "error", updateErr)
	}

	if reconcileErr := m.markMissingPoolsOffline(ctx, found); reconcileErr != nil {
		log.Error("Failed to mark missing pools offline", "error", reconcileErr)
	}

	log.Debug("ZFS pool sync completed")
}
// PoolInfo is a snapshot of a single pool as read from the host.
type PoolInfo struct {
	Name                 string // pool name
	SizeBytes, UsedBytes int64  // total and allocated capacity, in bytes
	Health               string // online, degraded, faulted, offline, unavailable, removed
}
// getSystemPools runs `zpool list` and returns the reported pools keyed by
// pool name.
//
// The command is invoked with -H (scripted mode: no header, tab-separated
// columns) and -p (exact, parsable numbers) so that size and alloc arrive as
// whole byte counts instead of rounded human-readable values like "95.5G".
//
// Lines with fewer than five columns are ignored. A pool whose size or alloc
// column makes parseSize return an error is logged as a warning and left out
// of the result. Health is stored lower-cased.
//
// A non-nil error means the zpool command itself failed; the returned map is
// nil in that case.
func (m *ZFSPoolMonitor) getSystemPools(ctx context.Context) (map[string]PoolInfo, error) {
	cmd := exec.CommandContext(ctx, "zpool", "list", "-H", "-p", "-o", "name,size,alloc,free,health")
	output, err := cmd.Output()
	if err != nil {
		return nil, err
	}

	pools := make(map[string]PoolInfo)
	for _, line := range strings.Split(strings.TrimSpace(string(output)), "\n") {
		// Column order follows the -o list: name, size, alloc, free, health.
		// Blank lines yield zero fields and are skipped by the same check.
		fields := strings.Fields(line)
		if len(fields) < 5 {
			continue
		}
		poolName, sizeStr, allocStr, health := fields[0], fields[1], fields[2], fields[4]

		sizeBytes, err := parseSize(sizeStr)
		if err != nil {
			m.logger.Warn("Failed to parse pool size", "pool", poolName, "size", sizeStr, "error", err)
			continue
		}

		usedBytes, err := parseSize(allocStr)
		if err != nil {
			m.logger.Warn("Failed to parse pool used size", "pool", poolName, "alloc", allocStr, "error", err)
			continue
		}

		pools[poolName] = PoolInfo{
			Name:      poolName,
			SizeBytes: sizeBytes,
			UsedBytes: usedBytes,
			Health:    strings.ToLower(health),
		}
	}
	return pools, nil
}
// zpoolSizeRe matches a decimal number followed by an optional single-letter
// binary unit, e.g. "95.5G", "1.2T", "512M", or a bare byte count. It is
// compiled once at package scope instead of on every parseSize call.
var zpoolSizeRe = regexp.MustCompile(`^([\d.]+)([KMGTPE]?)$`)

// parseSize converts a size string such as "95.5G" or "1.2T" into bytes.
//
// Surrounding whitespace is ignored and the unit letter is case-insensitive.
// Units are powers of 1024: K, M, G, T, P, E. A value with no unit is taken as
// bytes. The result is truncated toward zero.
//
// Input that does not look like a size at all (for example "-") yields
// (0, nil); this best-effort behaviour is kept so callers do not treat a
// placeholder as a failure. An error is returned only when the numeric part
// matches the pattern but is not a valid number (for example "1.2.3").
func parseSize(sizeStr string) (int64, error) {
	matches := zpoolSizeRe.FindStringSubmatch(strings.ToUpper(strings.TrimSpace(sizeStr)))
	if matches == nil {
		return 0, nil // Return 0 if can't parse
	}

	value, err := strconv.ParseFloat(matches[1], 64)
	if err != nil {
		return 0, err
	}

	// Each unit step is a factor of 1024, i.e. ten more bits of shift.
	var shift uint
	switch matches[2] {
	case "K":
		shift = 10
	case "M":
		shift = 20
	case "G":
		shift = 30
	case "T":
		shift = 40
	case "P":
		shift = 50
	case "E":
		shift = 60
	}

	return int64(value * float64(int64(1)<<shift)), nil
}
// updatePoolStatus writes the size, used bytes and health from poolInfo to the
// zfs_pools row whose name is poolName, and refreshes updated_at.
//
// A pool with no matching row is skipped and nil is returned (it may have been
// created outside of Calypso). Any other lookup failure, and any failure of
// the UPDATE, is returned to the caller instead of being treated as
// "not found".
func (m *ZFSPoolMonitor) updatePoolStatus(ctx context.Context, poolName string, poolInfo PoolInfo) error {
	db := m.zfsService.db

	// Resolve the row id by name first.
	var poolID string
	err := db.QueryRowContext(ctx,
		"SELECT id FROM zfs_pools WHERE name = $1",
		poolName,
	).Scan(&poolID)
	// NOTE(review): assumes the database handle surfaces sql.ErrNoRows from
	// Scan the way database/sql does - confirm against the database package.
	if errors.Is(err, sql.ErrNoRows) {
		m.logger.Debug("Pool not found in database, skipping", "pool", poolName)
		return nil
	}
	if err != nil {
		return err
	}

	_, err = db.ExecContext(ctx, `
		UPDATE zfs_pools SET
			size_bytes = $1,
			used_bytes = $2,
			health_status = $3,
			updated_at = NOW()
		WHERE id = $4
	`, poolInfo.SizeBytes, poolInfo.UsedBytes, poolInfo.Health, poolID)
	if err != nil {
		return err
	}

	m.logger.Debug("Updated pool status", "pool", poolName, "health", poolInfo.Health, "size", poolInfo.SizeBytes, "used", poolInfo.UsedBytes)
	return nil
}
// markMissingPoolsOffline reconciles active zfs_pools rows against the pools
// present on the host. Despite its name it does not set an offline status:
// every active row whose name is absent from systemPools is DELETED from the
// database, on the assumption that the pool was destroyed.
//
// Rows are collected first and the result set is closed before any DELETE is
// issued, so the deletes do not run while the query still holds a connection.
// A row that fails to scan is logged and skipped. A failed DELETE is logged
// and does not stop the remaining deletes. The returned error is the query
// error or the row-iteration error, if any.
func (m *ZFSPoolMonitor) markMissingPoolsOffline(ctx context.Context, systemPools map[string]PoolInfo) error {
	db := m.zfsService.db

	rows, err := db.QueryContext(ctx, "SELECT id, name FROM zfs_pools WHERE is_active = true")
	if err != nil {
		return err
	}

	type stalePool struct{ id, name string }
	var stale []stalePool
	for rows.Next() {
		var p stalePool
		if scanErr := rows.Scan(&p.id, &p.name); scanErr != nil {
			m.logger.Warn("Failed to scan pool row", "error", scanErr)
			continue
		}
		if _, present := systemPools[p.name]; !present {
			stale = append(stale, p)
		}
	}
	iterErr := rows.Err()
	// Release the result set before writing; iteration errors were captured above.
	rows.Close()

	for _, p := range stale {
		m.logger.Info("Pool not found in system, removing from database", "pool", p.name)
		if _, delErr := db.ExecContext(ctx, "DELETE FROM zfs_pools WHERE id = $1", p.id); delErr != nil {
			m.logger.Warn("Failed to delete missing pool from database", "pool", p.name, "error", delErr)
			continue
		}
		m.logger.Info("Removed missing pool from database", "pool", p.name)
	}

	return iterErr
}