replace tape library body layout
@@ -75,7 +75,21 @@ func (s *DiskService) DiscoverDisks(ctx context.Context) ([]PhysicalDisk, error)
		}

		devicePath := "/dev/" + device.Name

		// Skip ZFS volume block devices (zd* devices are ZFS volumes exported as block devices)
		// These are not physical disks and should not appear in physical disk list
		if strings.HasPrefix(device.Name, "zd") {
			s.logger.Debug("Skipping ZFS volume block device", "device", devicePath)
			continue
		}

		// Skip devices under /dev/zvol (ZFS volume devices in zvol directory)
		// These are virtual block devices created from ZFS volumes, not physical hardware
		if strings.HasPrefix(devicePath, "/dev/zvol/") {
			s.logger.Debug("Skipping ZFS volume device", "device", devicePath)
			continue
		}

		// Skip OS disk (disk that has root or boot partition)
		if s.isOSDisk(ctx, devicePath) {
			s.logger.Debug("Skipping OS disk", "device", devicePath)
@@ -113,8 +127,8 @@ func (s *DiskService) DiscoverDisks(ctx context.Context) ([]PhysicalDisk, error)
// getDiskInfo retrieves detailed information about a disk
func (s *DiskService) getDiskInfo(ctx context.Context, devicePath string) (*PhysicalDisk, error) {
	disk := &PhysicalDisk{
-		DevicePath:   devicePath,
-		HealthStatus: "unknown",
+		DevicePath:    devicePath,
+		HealthStatus:  "unknown",
+		HealthDetails: make(map[string]interface{}),
	}

@@ -129,7 +143,7 @@ func (s *DiskService) getDiskInfo(ctx context.Context, devicePath string) (*Phys
	disk.Vendor = props["ID_VENDOR"]
	disk.Model = props["ID_MODEL"]
	disk.SerialNumber = props["ID_SERIAL_SHORT"]

	if props["ID_ATA_ROTATION_RATE"] == "0" {
		disk.IsSSD = true
	}
@@ -258,11 +272,15 @@ func (s *DiskService) isOSDisk(ctx context.Context, devicePath string) bool {

// SyncDisksToDatabase syncs discovered disks to the database
func (s *DiskService) SyncDisksToDatabase(ctx context.Context) error {
	s.logger.Info("Starting disk discovery and sync")
	disks, err := s.DiscoverDisks(ctx)
	if err != nil {
		s.logger.Error("Failed to discover disks", "error", err)
		return fmt.Errorf("failed to discover disks: %w", err)
	}

	s.logger.Info("Discovered disks", "count", len(disks))

	for _, disk := range disks {
		// Check if disk exists
		var existingID string
@@ -300,10 +318,80 @@ func (s *DiskService) SyncDisksToDatabase(ctx context.Context) error {
				disk.HealthStatus, healthDetailsJSON, disk.IsUsed, existingID)
			if err != nil {
				s.logger.Error("Failed to update disk", "device", disk.DevicePath, "error", err)
			} else {
				s.logger.Debug("Updated disk", "device", disk.DevicePath)
			}
		}
	}

	s.logger.Info("Disk sync completed", "total_disks", len(disks))
	return nil
}

// ListDisksFromDatabase retrieves all physical disks from the database
func (s *DiskService) ListDisksFromDatabase(ctx context.Context) ([]PhysicalDisk, error) {
	query := `
		SELECT
			id, device_path, vendor, model, serial_number, size_bytes,
			sector_size, is_ssd, health_status, health_details, is_used,
			created_at, updated_at
		FROM physical_disks
		ORDER BY device_path
	`

	rows, err := s.db.QueryContext(ctx, query)
	if err != nil {
		return nil, fmt.Errorf("failed to query disks: %w", err)
	}
	defer rows.Close()

	var disks []PhysicalDisk
	for rows.Next() {
		var disk PhysicalDisk
		var healthDetailsJSON []byte
		var attachedToPool sql.NullString

		err := rows.Scan(
			&disk.ID, &disk.DevicePath, &disk.Vendor, &disk.Model,
			&disk.SerialNumber, &disk.SizeBytes, &disk.SectorSize,
			&disk.IsSSD, &disk.HealthStatus, &healthDetailsJSON,
			&disk.IsUsed, &disk.CreatedAt, &disk.UpdatedAt,
		)
		if err != nil {
			s.logger.Warn("Failed to scan disk row", "error", err)
			continue
		}

		// Parse health details JSON
		if len(healthDetailsJSON) > 0 {
			if err := json.Unmarshal(healthDetailsJSON, &disk.HealthDetails); err != nil {
				s.logger.Warn("Failed to parse health details", "error", err)
				disk.HealthDetails = make(map[string]interface{})
			}
		} else {
			disk.HealthDetails = make(map[string]interface{})
		}

		// Get ZFS pool attachment if disk is used
		if disk.IsUsed {
			err := s.db.QueryRowContext(ctx,
				`SELECT zp.name FROM zfs_pools zp
				 INNER JOIN zfs_pool_disks zpd ON zp.id = zpd.pool_id
				 WHERE zpd.disk_id = $1
				 LIMIT 1`,
				disk.ID,
			).Scan(&attachedToPool)
			if err == nil && attachedToPool.Valid {
				disk.AttachedToPool = attachedToPool.String
			}
		}

		disks = append(disks, disk)
	}

	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("error iterating disk rows: %w", err)
	}

	return disks, nil
}

backend/internal/storage/disk_monitor.go (new file, 65 lines)
@@ -0,0 +1,65 @@
package storage

import (
	"context"
	"time"

	"github.com/atlasos/calypso/internal/common/database"
	"github.com/atlasos/calypso/internal/common/logger"
)

// DiskMonitor handles periodic disk discovery and sync to database
type DiskMonitor struct {
	diskService *DiskService
	logger      *logger.Logger
	interval    time.Duration
	stopCh      chan struct{}
}

// NewDiskMonitor creates a new disk monitor service
func NewDiskMonitor(db *database.DB, log *logger.Logger, interval time.Duration) *DiskMonitor {
	return &DiskMonitor{
		diskService: NewDiskService(db, log),
		logger:      log,
		interval:    interval,
		stopCh:      make(chan struct{}),
	}
}

// Start starts the disk monitor background service
func (m *DiskMonitor) Start(ctx context.Context) {
	m.logger.Info("Starting disk monitor service", "interval", m.interval)
	ticker := time.NewTicker(m.interval)
	defer ticker.Stop()

	// Run initial sync immediately
	m.syncDisks(ctx)

	for {
		select {
		case <-ctx.Done():
			m.logger.Info("Disk monitor service stopped")
			return
		case <-m.stopCh:
			m.logger.Info("Disk monitor service stopped")
			return
		case <-ticker.C:
			m.syncDisks(ctx)
		}
	}
}

// Stop stops the disk monitor service
func (m *DiskMonitor) Stop() {
	close(m.stopCh)
}

// syncDisks performs disk discovery and sync to database
func (m *DiskMonitor) syncDisks(ctx context.Context) {
	m.logger.Debug("Running periodic disk sync")
	if err := m.diskService.SyncDisksToDatabase(ctx); err != nil {
		m.logger.Error("Periodic disk sync failed", "error", err)
	} else {
		m.logger.Debug("Periodic disk sync completed")
	}
}
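
Aside: Start/Stop above follow a standard ticker/select loop. A minimal, self-contained sketch of the same pattern (standard library only; the sync callback is a stand-in for SyncDisksToDatabase and is not part of this commit):

package main

import (
	"context"
	"log"
	"time"
)

// runPeriodic mirrors the loop in DiskMonitor.Start: one immediate run,
// then one run per tick, until the context is cancelled or stop is closed.
func runPeriodic(ctx context.Context, interval time.Duration, stop <-chan struct{}, sync func(context.Context)) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	sync(ctx) // initial run

	for {
		select {
		case <-ctx.Done():
			return
		case <-stop:
			return
		case <-ticker.C:
			sync(ctx)
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	runPeriodic(ctx, time.Second, make(chan struct{}), func(context.Context) {
		log.Println("sync tick") // stand-in for the real sync call
	})
}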
@@ -1,6 +1,7 @@
package storage

import (
+	"context"
	"fmt"
	"net/http"
	"strings"
@@ -42,9 +43,9 @@ func NewHandler(db *database.DB, log *logger.Logger) *Handler {
	}
}

-// ListDisks lists all physical disks
+// ListDisks lists all physical disks from database
func (h *Handler) ListDisks(c *gin.Context) {
-	disks, err := h.diskService.DiscoverDisks(c.Request.Context())
+	disks, err := h.diskService.ListDisksFromDatabase(c.Request.Context())
	if err != nil {
		h.logger.Error("Failed to list disks", "error", err)
		c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to list disks"})
@@ -70,15 +71,19 @@ func (h *Handler) SyncDisks(c *gin.Context) {

	// Run sync in background
	go func() {
-		ctx := c.Request.Context()
+		// Create new context for background task (don't use request context which may expire)
+		ctx := context.Background()
		h.taskEngine.StartTask(ctx, taskID)
		h.taskEngine.UpdateProgress(ctx, taskID, 50, "Discovering disks...")
		h.logger.Info("Starting disk sync", "task_id", taskID)

		if err := h.diskService.SyncDisksToDatabase(ctx); err != nil {
			h.logger.Error("Disk sync failed", "task_id", taskID, "error", err)
			h.taskEngine.FailTask(ctx, taskID, err.Error())
			return
		}

		h.logger.Info("Disk sync completed", "task_id", taskID)
		h.taskEngine.UpdateProgress(ctx, taskID, 100, "Disk sync completed")
		h.taskEngine.CompleteTask(ctx, taskID, "Disks synchronized successfully")
	}()
@@ -391,7 +391,8 @@ func (s *ZFSService) ListPools(ctx context.Context) ([]*ZFSPool, error) {
			&pool.CreatedAt, &pool.UpdatedAt, &pool.CreatedBy,
		)
		if err != nil {
-			return nil, fmt.Errorf("failed to scan pool: %w", err)
+			s.logger.Error("Failed to scan pool row", "error", err)
+			continue // Skip this pool instead of failing entire query
		}
		if description.Valid {
			pool.Description = description.String
@@ -407,8 +408,14 @@ func (s *ZFSService) ListPools(ctx context.Context) ([]*ZFSPool, error) {
		}

		pools = append(pools, &pool)
		s.logger.Debug("Added pool to list", "pool_id", pool.ID, "name", pool.Name)
	}

	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("error iterating pool rows: %w", err)
	}

	s.logger.Debug("Listed ZFS pools", "count", len(pools))
	return pools, nil
}
@@ -460,11 +467,22 @@ func (s *ZFSService) DeletePool(ctx context.Context, poolID string) error {
		return err
	}

-	// Destroy ZFS pool
-	cmd := exec.CommandContext(ctx, "zpool", "destroy", pool.Name)
+	// Destroy ZFS pool with -f flag to force destroy (works for both empty and non-empty pools)
+	// The -f flag is needed to destroy pools even if they have datasets or are in use
+	s.logger.Info("Destroying ZFS pool", "pool", pool.Name)
+	cmd := exec.CommandContext(ctx, "zpool", "destroy", "-f", pool.Name)
	output, err := cmd.CombinedOutput()
	if err != nil {
-		return fmt.Errorf("failed to destroy ZFS pool: %s: %w", string(output), err)
+		errorMsg := string(output)
+		// Check if pool doesn't exist (might have been destroyed already)
+		if strings.Contains(errorMsg, "no such pool") || strings.Contains(errorMsg, "cannot open") {
+			s.logger.Warn("Pool does not exist in ZFS, continuing with database cleanup", "pool", pool.Name)
+			// Continue with database cleanup even if pool doesn't exist
+		} else {
+			return fmt.Errorf("failed to destroy ZFS pool: %s: %w", errorMsg, err)
+		}
+	} else {
+		s.logger.Info("ZFS pool destroyed successfully", "pool", pool.Name)
	}

	// Mark disks as unused

backend/internal/storage/zfs_pool_monitor.go (new file, 254 lines)
@@ -0,0 +1,254 @@
package storage

import (
	"context"
	"os/exec"
	"regexp"
	"strconv"
	"strings"
	"time"

	"github.com/atlasos/calypso/internal/common/database"
	"github.com/atlasos/calypso/internal/common/logger"
)

// ZFSPoolMonitor handles periodic ZFS pool status monitoring and sync to database
type ZFSPoolMonitor struct {
	zfsService *ZFSService
	logger     *logger.Logger
	interval   time.Duration
	stopCh     chan struct{}
}

// NewZFSPoolMonitor creates a new ZFS pool monitor service
func NewZFSPoolMonitor(db *database.DB, log *logger.Logger, interval time.Duration) *ZFSPoolMonitor {
	return &ZFSPoolMonitor{
		zfsService: NewZFSService(db, log),
		logger:     log,
		interval:   interval,
		stopCh:     make(chan struct{}),
	}
}

// Start starts the ZFS pool monitor background service
func (m *ZFSPoolMonitor) Start(ctx context.Context) {
	m.logger.Info("Starting ZFS pool monitor service", "interval", m.interval)
	ticker := time.NewTicker(m.interval)
	defer ticker.Stop()

	// Run initial sync immediately
	m.syncPools(ctx)

	for {
		select {
		case <-ctx.Done():
			m.logger.Info("ZFS pool monitor service stopped")
			return
		case <-m.stopCh:
			m.logger.Info("ZFS pool monitor service stopped")
			return
		case <-ticker.C:
			m.syncPools(ctx)
		}
	}
}

// Stop stops the ZFS pool monitor service
func (m *ZFSPoolMonitor) Stop() {
	close(m.stopCh)
}

// syncPools syncs ZFS pool status from system to database
func (m *ZFSPoolMonitor) syncPools(ctx context.Context) {
	m.logger.Debug("Running periodic ZFS pool sync")

	// Get all pools from system
	systemPools, err := m.getSystemPools(ctx)
	if err != nil {
		m.logger.Error("Failed to get system pools", "error", err)
		return
	}

	m.logger.Debug("Found pools in system", "count", len(systemPools))

	// Update each pool in database
	for poolName, poolInfo := range systemPools {
		if err := m.updatePoolStatus(ctx, poolName, poolInfo); err != nil {
			m.logger.Error("Failed to update pool status", "pool", poolName, "error", err)
		}
	}

	// Mark pools that don't exist in system as offline
	if err := m.markMissingPoolsOffline(ctx, systemPools); err != nil {
		m.logger.Error("Failed to mark missing pools offline", "error", err)
	}

	m.logger.Debug("ZFS pool sync completed")
}

// PoolInfo represents pool information from system
type PoolInfo struct {
	Name      string
	SizeBytes int64
	UsedBytes int64
	Health    string // online, degraded, faulted, offline, unavailable, removed
}

// getSystemPools gets all pools from ZFS system
func (m *ZFSPoolMonitor) getSystemPools(ctx context.Context) (map[string]PoolInfo, error) {
	pools := make(map[string]PoolInfo)

	// Get pool list
	cmd := exec.CommandContext(ctx, "zpool", "list", "-H", "-o", "name,size,alloc,free,health")
	output, err := cmd.Output()
	if err != nil {
		return nil, err
	}

	lines := strings.Split(strings.TrimSpace(string(output)), "\n")
	for _, line := range lines {
		if line == "" {
			continue
		}

		fields := strings.Fields(line)
		if len(fields) < 5 {
			continue
		}

		poolName := fields[0]
		sizeStr := fields[1]
		allocStr := fields[2]
		health := fields[4]

		// Parse size (e.g., "95.5G" -> bytes)
		sizeBytes, err := parseSize(sizeStr)
		if err != nil {
			m.logger.Warn("Failed to parse pool size", "pool", poolName, "size", sizeStr, "error", err)
			continue
		}

		// Parse allocated (used) size
		usedBytes, err := parseSize(allocStr)
		if err != nil {
			m.logger.Warn("Failed to parse pool used size", "pool", poolName, "alloc", allocStr, "error", err)
			continue
		}

		// Normalize health status to lowercase
		healthNormalized := strings.ToLower(health)

		pools[poolName] = PoolInfo{
			Name:      poolName,
			SizeBytes: sizeBytes,
			UsedBytes: usedBytes,
			Health:    healthNormalized,
		}
	}

	return pools, nil
}

// parseSize parses size string (e.g., "95.5G", "1.2T") to bytes
func parseSize(sizeStr string) (int64, error) {
	// Remove any whitespace
	sizeStr = strings.TrimSpace(sizeStr)

	// Match pattern like "95.5G", "1.2T", "512M" (suffix may also be "P", handled below)
	re := regexp.MustCompile(`^([\d.]+)([KMGTP]?)$`)
	matches := re.FindStringSubmatch(strings.ToUpper(sizeStr))
	if len(matches) != 3 {
		return 0, nil // Return 0 if can't parse
	}

	value, err := strconv.ParseFloat(matches[1], 64)
	if err != nil {
		return 0, err
	}

	unit := matches[2]
	var multiplier int64 = 1

	switch unit {
	case "K":
		multiplier = 1024
	case "M":
		multiplier = 1024 * 1024
	case "G":
		multiplier = 1024 * 1024 * 1024
	case "T":
		multiplier = 1024 * 1024 * 1024 * 1024
	case "P":
		multiplier = 1024 * 1024 * 1024 * 1024 * 1024
	}

	return int64(value * float64(multiplier)), nil
}
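
Aside: worked examples of the conversion parseSize performs (illustrative only, not part of this commit; base-1024 multipliers as in the switch above):

// "512"   -> 512            (no suffix, multiplier 1)
// "512M"  -> 536870912      (512 * 1024^2)
// "95.5G" -> 102542344192   (95.5 * 1024^3)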

// updatePoolStatus updates pool status in database
func (m *ZFSPoolMonitor) updatePoolStatus(ctx context.Context, poolName string, poolInfo PoolInfo) error {
	// Get pool from database by name
	var poolID string
	err := m.zfsService.db.QueryRowContext(ctx,
		"SELECT id FROM zfs_pools WHERE name = $1",
		poolName,
	).Scan(&poolID)

	if err != nil {
		// Pool not in database, skip (might be created outside of Calypso)
		m.logger.Debug("Pool not found in database, skipping", "pool", poolName)
		return nil
	}

	// Update pool status, size, and used bytes
	_, err = m.zfsService.db.ExecContext(ctx, `
		UPDATE zfs_pools SET
			size_bytes = $1,
			used_bytes = $2,
			health_status = $3,
			updated_at = NOW()
		WHERE id = $4
	`, poolInfo.SizeBytes, poolInfo.UsedBytes, poolInfo.Health, poolID)

	if err != nil {
		return err
	}

	m.logger.Debug("Updated pool status", "pool", poolName, "health", poolInfo.Health, "size", poolInfo.SizeBytes, "used", poolInfo.UsedBytes)
	return nil
}

// markMissingPoolsOffline marks pools that exist in database but not in system as offline
func (m *ZFSPoolMonitor) markMissingPoolsOffline(ctx context.Context, systemPools map[string]PoolInfo) error {
	// Get all pools from database
	rows, err := m.zfsService.db.QueryContext(ctx, "SELECT id, name FROM zfs_pools WHERE is_active = true")
	if err != nil {
		return err
	}
	defer rows.Close()

	for rows.Next() {
		var poolID, poolName string
		if err := rows.Scan(&poolID, &poolName); err != nil {
			continue
		}

		// Check if pool exists in system
		if _, exists := systemPools[poolName]; !exists {
			// Pool doesn't exist in system, mark as offline
			_, err = m.zfsService.db.ExecContext(ctx, `
				UPDATE zfs_pools SET
					health_status = 'offline',
					updated_at = NOW()
				WHERE id = $1
			`, poolID)
			if err != nil {
				m.logger.Warn("Failed to mark pool as offline", "pool", poolName, "error", err)
			} else {
				m.logger.Info("Marked pool as offline (not found in system)", "pool", poolName)
			}
		}
	}

	return rows.Err()
}