working on storage dashboard: add ZFS pool and dataset management service
backend/internal/storage/zfs.go | 792 | Normal file (new)

@@ -0,0 +1,792 @@
package storage

import (
	"context"
	"database/sql"
	"fmt"
	"os/exec"
	"strconv"
	"strings"
	"time"

	"github.com/lib/pq"
	"github.com/atlasos/calypso/internal/common/database"
	"github.com/atlasos/calypso/internal/common/logger"
)

// ZFSService handles ZFS pool management
type ZFSService struct {
	db     *database.DB
	logger *logger.Logger
}

// NewZFSService creates a new ZFS service
func NewZFSService(db *database.DB, log *logger.Logger) *ZFSService {
	return &ZFSService{
		db:     db,
		logger: log,
	}
}

// ZFSPool represents a ZFS pool
type ZFSPool struct {
	ID            string    `json:"id"`
	Name          string    `json:"name"`
	Description   string    `json:"description"`
	RaidLevel     string    `json:"raid_level"`  // stripe, mirror, raidz, raidz2, raidz3
	Disks         []string  `json:"disks"`       // device paths
	SpareDisks    []string  `json:"spare_disks"` // spare disk paths
	SizeBytes     int64     `json:"size_bytes"`
	UsedBytes     int64     `json:"used_bytes"`
	Compression   string    `json:"compression"` // off, lz4, zstd, gzip
	Deduplication bool      `json:"deduplication"`
	AutoExpand    bool      `json:"auto_expand"`
	ScrubInterval int       `json:"scrub_interval"` // days
	IsActive      bool      `json:"is_active"`
	HealthStatus  string    `json:"health_status"` // online, degraded, faulted, offline
	CreatedAt     time.Time `json:"created_at"`
	UpdatedAt     time.Time `json:"updated_at"`
	CreatedBy     string    `json:"created_by"`
}

// CreatePool creates a new ZFS pool
func (s *ZFSService) CreatePool(ctx context.Context, name string, raidLevel string, disks []string, compression string, deduplication bool, autoExpand bool, createdBy string) (*ZFSPool, error) {
	// Validate inputs
	if name == "" {
		return nil, fmt.Errorf("pool name is required")
	}
	if len(disks) == 0 {
		return nil, fmt.Errorf("at least one disk is required")
	}

	// Validate RAID level
	validRaidLevels := map[string]int{
		"stripe": 1,
		"mirror": 2,
		"raidz":  3,
		"raidz2": 4,
		"raidz3": 5,
	}
	minDisks, ok := validRaidLevels[raidLevel]
	if !ok {
		return nil, fmt.Errorf("invalid RAID level: %s", raidLevel)
	}
	if len(disks) < minDisks {
		return nil, fmt.Errorf("RAID level %s requires at least %d disks, got %d", raidLevel, minDisks, len(disks))
	}

	// Check if pool already exists
	var existingID string
	err := s.db.QueryRowContext(ctx,
		"SELECT id FROM zfs_pools WHERE name = $1",
		name,
	).Scan(&existingID)
	if err == nil {
		return nil, fmt.Errorf("pool with name %s already exists", name)
	} else if err != sql.ErrNoRows {
		// Check if table exists - if not, this is a migration issue
		if strings.Contains(err.Error(), "does not exist") || strings.Contains(err.Error(), "relation") {
			return nil, fmt.Errorf("zfs_pools table does not exist - please run database migrations")
		}
		return nil, fmt.Errorf("failed to check existing pool: %w", err)
	}

	// Check if disks are available (not used)
	for _, diskPath := range disks {
		var isUsed bool
		err := s.db.QueryRowContext(ctx,
			"SELECT is_used FROM physical_disks WHERE device_path = $1",
			diskPath,
		).Scan(&isUsed)
		if err == sql.ErrNoRows {
			// Disk not in database, that's okay - we'll still try to use it
			s.logger.Warn("Disk not found in database, will attempt to use anyway", "disk", diskPath)
		} else if err != nil {
			return nil, fmt.Errorf("failed to check disk %s: %w", diskPath, err)
		} else if isUsed {
			return nil, fmt.Errorf("disk %s is already in use", diskPath)
		}
	}

	// Build zpool create command
	var args []string
	args = append(args, "create", "-f") // -f to force creation

	// Note: compression is a filesystem property, not a pool property.
	// We'll set it after pool creation using zfs set.

	// Deduplication (dedup) is a dataset property, not a pool property, so set it
	// on the pool's root filesystem at creation time with -O (capital O)
	if deduplication {
		args = append(args, "-O", "dedup=on")
	}

	// Add autoexpand property (this IS a pool property)
	if autoExpand {
		args = append(args, "-o", "autoexpand=on")
	}

	// Add pool name
	args = append(args, name)

	// Add RAID level and disks
	switch raidLevel {
	case "stripe":
		// Simple stripe: just list all disks
		args = append(args, disks...)
	case "mirror":
		// Mirror: group disks in pairs
		if len(disks)%2 != 0 {
			return nil, fmt.Errorf("mirror requires even number of disks")
		}
		for i := 0; i < len(disks); i += 2 {
			args = append(args, "mirror", disks[i], disks[i+1])
		}
	case "raidz":
		args = append(args, "raidz")
		args = append(args, disks...)
	case "raidz2":
		args = append(args, "raidz2")
		args = append(args, disks...)
	case "raidz3":
		args = append(args, "raidz3")
		args = append(args, disks...)
	}

	// Execute zpool create
	s.logger.Info("Creating ZFS pool", "name", name, "raid_level", raidLevel, "disks", disks, "args", args)
	cmd := exec.CommandContext(ctx, "zpool", args...)
	output, err := cmd.CombinedOutput()
	if err != nil {
		errorMsg := string(output)
		s.logger.Error("Failed to create ZFS pool", "name", name, "error", err, "output", errorMsg)
		return nil, fmt.Errorf("failed to create ZFS pool: %s", errorMsg)
	}

	s.logger.Info("ZFS pool created successfully", "name", name, "output", string(output))

	// Set filesystem properties (compression, etc.) after pool creation.
	// ZFS creates a root filesystem with the same name as the pool.
	if compression != "" && compression != "off" {
		cmd = exec.CommandContext(ctx, "zfs", "set", fmt.Sprintf("compression=%s", compression), name)
		output, err = cmd.CombinedOutput()
		if err != nil {
			s.logger.Warn("Failed to set compression property", "pool", name, "compression", compression, "error", string(output))
			// Don't fail pool creation if compression setting fails, just log warning
		} else {
			s.logger.Info("Compression property set", "pool", name, "compression", compression)
		}
	}

	// Get pool information
	poolInfo, err := s.getPoolInfo(ctx, name)
	if err != nil {
		// Try to destroy the pool if we can't get info
		s.logger.Warn("Failed to get pool info, attempting to destroy pool", "name", name, "error", err)
		exec.CommandContext(ctx, "zpool", "destroy", "-f", name).Run()
		return nil, fmt.Errorf("failed to get pool info after creation: %w", err)
	}

	// Mark disks as used
	for _, diskPath := range disks {
		_, err = s.db.ExecContext(ctx,
			"UPDATE physical_disks SET is_used = true, updated_at = NOW() WHERE device_path = $1",
			diskPath,
		)
		if err != nil {
			s.logger.Warn("Failed to mark disk as used", "disk", diskPath, "error", err)
		}
	}

	// Insert into database
	query := `
		INSERT INTO zfs_pools (
			name, raid_level, disks, size_bytes, used_bytes,
			compression, deduplication, auto_expand, scrub_interval,
			is_active, health_status, created_by
		) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)
		RETURNING id, created_at, updated_at
	`

	var pool ZFSPool
	err = s.db.QueryRowContext(ctx, query,
		name, raidLevel, pq.Array(disks), poolInfo.SizeBytes, poolInfo.UsedBytes,
		compression, deduplication, autoExpand, 30, // default scrub interval 30 days
		true, "online", createdBy,
	).Scan(&pool.ID, &pool.CreatedAt, &pool.UpdatedAt)
	if err != nil {
		// Cleanup: destroy pool if database insert fails
		s.logger.Error("Failed to save pool to database, destroying pool", "name", name, "error", err)
		exec.CommandContext(ctx, "zpool", "destroy", "-f", name).Run()
		return nil, fmt.Errorf("failed to save pool to database: %w", err)
	}

	pool.Name = name
	pool.RaidLevel = raidLevel
	pool.Disks = disks
	pool.SizeBytes = poolInfo.SizeBytes
	pool.UsedBytes = poolInfo.UsedBytes
	pool.Compression = compression
	pool.Deduplication = deduplication
	pool.AutoExpand = autoExpand
	pool.ScrubInterval = 30
	pool.IsActive = true
	pool.HealthStatus = "online"
	pool.CreatedBy = createdBy

	s.logger.Info("ZFS pool created", "name", name, "raid_level", raidLevel, "disks", len(disks))
	return &pool, nil
}
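
// Example (illustrative, hypothetical device names): for a four-disk raidz2 pool named
// "tank" with deduplication and autoexpand enabled and lz4 compression requested,
// CreatePool above runs roughly:
//
//	zpool create -f -O dedup=on -o autoexpand=on tank raidz2 /dev/sda /dev/sdb /dev/sdc /dev/sdd
//	zfs set compression=lz4 tank
//
// The exact arguments depend on the request; compression is applied afterwards because
// it is a filesystem property rather than a pool property.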

// getPoolInfo retrieves information about a ZFS pool
func (s *ZFSService) getPoolInfo(ctx context.Context, poolName string) (*ZFSPool, error) {
	// Get pool size and used space
	cmd := exec.CommandContext(ctx, "zpool", "list", "-H", "-o", "name,size,allocated", poolName)
	output, err := cmd.CombinedOutput()
	if err != nil {
		errorMsg := string(output)
		s.logger.Error("Failed to get pool info", "pool", poolName, "error", err, "output", errorMsg)
		return nil, fmt.Errorf("failed to get pool info: %s", errorMsg)
	}

	outputStr := strings.TrimSpace(string(output))
	if outputStr == "" {
		return nil, fmt.Errorf("pool %s not found or empty output", poolName)
	}

	fields := strings.Fields(outputStr)
	if len(fields) < 3 {
		s.logger.Error("Unexpected zpool list output", "pool", poolName, "output", outputStr, "fields", len(fields))
		return nil, fmt.Errorf("unexpected zpool list output: %s (expected 3+ fields, got %d)", outputStr, len(fields))
	}

	// Parse size (format: 100G, 1T, etc.)
	sizeBytes, err := parseZFSSize(fields[1])
	if err != nil {
		return nil, fmt.Errorf("failed to parse pool size: %w", err)
	}

	usedBytes, err := parseZFSSize(fields[2])
	if err != nil {
		return nil, fmt.Errorf("failed to parse used size: %w", err)
	}

	return &ZFSPool{
		Name:      poolName,
		SizeBytes: sizeBytes,
		UsedBytes: usedBytes,
	}, nil
}

// parseZFSSize parses ZFS size strings like "100G", "1T", "500M", including
// fractional values such as "9.94G" that zpool/zfs print in human-readable mode
func parseZFSSize(sizeStr string) (int64, error) {
	sizeStr = strings.TrimSpace(sizeStr)
	if sizeStr == "" {
		return 0, nil
	}

	var multiplier int64 = 1
	lastChar := sizeStr[len(sizeStr)-1]
	if lastChar >= '0' && lastChar <= '9' {
		// No suffix, assume bytes
		size, err := strconv.ParseFloat(sizeStr, 64)
		if err != nil {
			return 0, err
		}
		return int64(size), nil
	}

	switch strings.ToUpper(string(lastChar)) {
	case "K":
		multiplier = 1024
	case "M":
		multiplier = 1024 * 1024
	case "G":
		multiplier = 1024 * 1024 * 1024
	case "T":
		multiplier = 1024 * 1024 * 1024 * 1024
	case "P":
		multiplier = 1024 * 1024 * 1024 * 1024 * 1024
	default:
		return 0, fmt.Errorf("unknown size suffix: %c", lastChar)
	}

	// Parse the numeric part as a float before applying the multiplier so that
	// values like "9.94G" are not silently truncated.
	size, err := strconv.ParseFloat(sizeStr[:len(sizeStr)-1], 64)
	if err != nil {
		return 0, err
	}

	return int64(size * float64(multiplier)), nil
}
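
// Illustrative conversions for parseZFSSize above: "512M" yields 536870912,
// "1.5G" yields 1610612736, and a bare number such as "4096" is returned
// unchanged as bytes.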

// getSpareDisks retrieves spare disks from zpool status
func (s *ZFSService) getSpareDisks(ctx context.Context, poolName string) ([]string, error) {
	cmd := exec.CommandContext(ctx, "zpool", "status", poolName)
	output, err := cmd.CombinedOutput()
	if err != nil {
		return nil, fmt.Errorf("failed to get pool status: %w", err)
	}

	outputStr := string(output)
	var spareDisks []string

	// Parse spare disks from zpool status output
	// Format: spares\n  sde  AVAIL
	lines := strings.Split(outputStr, "\n")
	inSparesSection := false
	for _, line := range lines {
		line = strings.TrimSpace(line)
		if strings.HasPrefix(line, "spares") {
			inSparesSection = true
			continue
		}
		if inSparesSection {
			if line == "" || strings.HasPrefix(line, "errors:") || strings.HasPrefix(line, "config:") {
				break
			}
			// Extract disk name (e.g., "sde AVAIL" -> "sde")
			fields := strings.Fields(line)
			if len(fields) > 0 {
				diskName := fields[0]
				// Convert to full device path
				if !strings.HasPrefix(diskName, "/dev/") {
					diskName = "/dev/" + diskName
				}
				spareDisks = append(spareDisks, diskName)
			}
		}
	}

	return spareDisks, nil
}
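
// Illustrative zpool status excerpt that getSpareDisks above expects
// (device names hypothetical):
//
//	spares
//	  sde    AVAIL
//	  sdf    AVAIL
//
//	errors: No known data errors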

// ListPools lists all ZFS pools
func (s *ZFSService) ListPools(ctx context.Context) ([]*ZFSPool, error) {
	query := `
		SELECT id, name, description, raid_level, disks, size_bytes, used_bytes,
		       compression, deduplication, auto_expand, scrub_interval,
		       is_active, health_status, created_at, updated_at, created_by
		FROM zfs_pools
		ORDER BY created_at DESC
	`

	rows, err := s.db.QueryContext(ctx, query)
	if err != nil {
		// Check if table exists
		errStr := err.Error()
		if strings.Contains(errStr, "does not exist") || strings.Contains(errStr, "relation") {
			return nil, fmt.Errorf("zfs_pools table does not exist - please run database migrations")
		}
		return nil, fmt.Errorf("failed to query pools: %w", err)
	}
	defer rows.Close()

	var pools []*ZFSPool
	for rows.Next() {
		var pool ZFSPool
		var description sql.NullString
		err := rows.Scan(
			&pool.ID, &pool.Name, &description, &pool.RaidLevel, pq.Array(&pool.Disks),
			&pool.SizeBytes, &pool.UsedBytes, &pool.Compression, &pool.Deduplication,
			&pool.AutoExpand, &pool.ScrubInterval, &pool.IsActive, &pool.HealthStatus,
			&pool.CreatedAt, &pool.UpdatedAt, &pool.CreatedBy,
		)
		if err != nil {
			return nil, fmt.Errorf("failed to scan pool: %w", err)
		}
		if description.Valid {
			pool.Description = description.String
		}

		// Get spare disks from zpool status
		spareDisks, err := s.getSpareDisks(ctx, pool.Name)
		if err != nil {
			s.logger.Warn("Failed to get spare disks", "pool", pool.Name, "error", err)
			pool.SpareDisks = []string{}
		} else {
			pool.SpareDisks = spareDisks
		}

		pools = append(pools, &pool)
	}
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("failed to iterate pools: %w", err)
	}

	return pools, nil
}

// GetPool retrieves a ZFS pool by ID
func (s *ZFSService) GetPool(ctx context.Context, poolID string) (*ZFSPool, error) {
	query := `
		SELECT id, name, description, raid_level, disks, size_bytes, used_bytes,
		       compression, deduplication, auto_expand, scrub_interval,
		       is_active, health_status, created_at, updated_at, created_by
		FROM zfs_pools
		WHERE id = $1
	`

	var pool ZFSPool
	var description sql.NullString
	err := s.db.QueryRowContext(ctx, query, poolID).Scan(
		&pool.ID, &pool.Name, &description, &pool.RaidLevel, pq.Array(&pool.Disks),
		&pool.SizeBytes, &pool.UsedBytes, &pool.Compression, &pool.Deduplication,
		&pool.AutoExpand, &pool.ScrubInterval, &pool.IsActive, &pool.HealthStatus,
		&pool.CreatedAt, &pool.UpdatedAt, &pool.CreatedBy,
	)
	if err == sql.ErrNoRows {
		return nil, fmt.Errorf("pool not found")
	}
	if err != nil {
		return nil, fmt.Errorf("failed to get pool: %w", err)
	}

	if description.Valid {
		pool.Description = description.String
	}

	// Get spare disks from zpool status
	spareDisks, err := s.getSpareDisks(ctx, pool.Name)
	if err != nil {
		s.logger.Warn("Failed to get spare disks", "pool", pool.Name, "error", err)
		pool.SpareDisks = []string{}
	} else {
		pool.SpareDisks = spareDisks
	}

	return &pool, nil
}

// DeletePool destroys a ZFS pool
func (s *ZFSService) DeletePool(ctx context.Context, poolID string) error {
	pool, err := s.GetPool(ctx, poolID)
	if err != nil {
		return err
	}

	// Destroy ZFS pool
	cmd := exec.CommandContext(ctx, "zpool", "destroy", pool.Name)
	output, err := cmd.CombinedOutput()
	if err != nil {
		return fmt.Errorf("failed to destroy ZFS pool: %s: %w", string(output), err)
	}

	// Mark disks as unused
	for _, diskPath := range pool.Disks {
		_, err = s.db.ExecContext(ctx,
			"UPDATE physical_disks SET is_used = false, updated_at = NOW() WHERE device_path = $1",
			diskPath,
		)
		if err != nil {
			s.logger.Warn("Failed to mark disk as unused", "disk", diskPath, "error", err)
		}
	}

	// Delete from database
	_, err = s.db.ExecContext(ctx, "DELETE FROM zfs_pools WHERE id = $1", poolID)
	if err != nil {
		return fmt.Errorf("failed to delete pool from database: %w", err)
	}

	s.logger.Info("ZFS pool deleted", "name", pool.Name)
	return nil
}

// AddSpareDisk adds one or more spare disks to a ZFS pool
func (s *ZFSService) AddSpareDisk(ctx context.Context, poolID string, diskPaths []string) error {
	if len(diskPaths) == 0 {
		return fmt.Errorf("at least one disk must be specified")
	}

	// Get pool information
	pool, err := s.GetPool(ctx, poolID)
	if err != nil {
		return err
	}

	// Verify pool exists in ZFS and check if disks are already spare
	cmd := exec.CommandContext(ctx, "zpool", "status", pool.Name)
	output, err := cmd.CombinedOutput()
	if err != nil {
		return fmt.Errorf("pool %s does not exist in ZFS: %w", pool.Name, err)
	}
	outputStr := string(output)

	// Check if any disk is already a spare in this pool
	for _, diskPath := range diskPaths {
		// Extract just the device name (e.g., /dev/sde -> sde)
		diskName := strings.TrimPrefix(diskPath, "/dev/")
		if strings.Contains(outputStr, "spares") && strings.Contains(outputStr, diskName) {
			s.logger.Warn("Disk is already a spare in this pool", "disk", diskPath, "pool", pool.Name)
			// Don't return error, just skip - zpool add will handle duplicate gracefully
		}
	}

	// Build zpool add command with spare option
	args := []string{"add", pool.Name, "spare"}
	args = append(args, diskPaths...)

	// Execute zpool add
	s.logger.Info("Adding spare disks to ZFS pool", "pool", pool.Name, "disks", diskPaths)
	cmd = exec.CommandContext(ctx, "zpool", args...)
	output, err = cmd.CombinedOutput()
	if err != nil {
		errorMsg := string(output)
		s.logger.Error("Failed to add spare disks to ZFS pool", "pool", pool.Name, "disks", diskPaths, "error", err, "output", errorMsg)
		return fmt.Errorf("failed to add spare disks: %s", errorMsg)
	}

	s.logger.Info("Spare disks added successfully", "pool", pool.Name, "disks", diskPaths)

	// Mark disks as used
	for _, diskPath := range diskPaths {
		_, err = s.db.ExecContext(ctx,
			"UPDATE physical_disks SET is_used = true, updated_at = NOW() WHERE device_path = $1",
			diskPath,
		)
		if err != nil {
			s.logger.Warn("Failed to mark disk as used", "disk", diskPath, "error", err)
		}
	}

	// Update pool's updated_at timestamp
	_, err = s.db.ExecContext(ctx,
		"UPDATE zfs_pools SET updated_at = NOW() WHERE id = $1",
		poolID,
	)
	if err != nil {
		s.logger.Warn("Failed to update pool timestamp", "pool_id", poolID, "error", err)
	}

	return nil
}
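
// Example (illustrative, hypothetical devices): adding two spares to pool "tank"
// with AddSpareDisk above runs roughly:
//
//	zpool add tank spare /dev/sde /dev/sdf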

// ZFSDataset represents a ZFS dataset
type ZFSDataset struct {
	Name            string    `json:"name"`
	Pool            string    `json:"pool"`
	Type            string    `json:"type"` // filesystem, volume, snapshot
	MountPoint      string    `json:"mount_point"`
	UsedBytes       int64     `json:"used_bytes"`
	AvailableBytes  int64     `json:"available_bytes"`
	ReferencedBytes int64     `json:"referenced_bytes"`
	Compression     string    `json:"compression"`
	Deduplication   string    `json:"deduplication"`
	Quota           int64     `json:"quota"` // -1 for unlimited
	Reservation     int64     `json:"reservation"`
	CreatedAt       time.Time `json:"created_at"`
}

// ListDatasets lists all datasets in a ZFS pool
func (s *ZFSService) ListDatasets(ctx context.Context, poolName string) ([]*ZFSDataset, error) {
	// Get all datasets in the pool using zfs list
	cmd := exec.CommandContext(ctx, "zfs", "list", "-H", "-o", "name,used,avail,refer,compress,dedup,quota,reservation,mountpoint", "-r", poolName)
	output, err := cmd.CombinedOutput()
	if err != nil {
		// If pool doesn't exist, return empty list
		if strings.Contains(string(output), "does not exist") {
			return []*ZFSDataset{}, nil
		}
		return nil, fmt.Errorf("failed to list datasets: %s: %w", string(output), err)
	}

	var datasets []*ZFSDataset
	lines := strings.Split(strings.TrimSpace(string(output)), "\n")

	for _, line := range lines {
		if line == "" {
			continue
		}

		fields := strings.Fields(line)
		if len(fields) < 9 {
			continue
		}

		datasetName := fields[0]
		// Skip the pool itself (root dataset)
		if datasetName == poolName {
			continue
		}

		// Extract pool name from dataset name (e.g., "pool/dataset" -> "pool")
		poolFromName := strings.Split(datasetName, "/")[0]
		if poolFromName != poolName {
			continue
		}

		usedBytes, _ := parseZFSSize(fields[1])
		availableBytes, _ := parseZFSSize(fields[2])
		referencedBytes, _ := parseZFSSize(fields[3])
		compression := fields[4]
		deduplication := fields[5]
		quotaStr := fields[6]
		reservationStr := fields[7]
		mountPoint := fields[8]

		quota := int64(-1) // -1 means unlimited
		if quotaStr != "-" && quotaStr != "none" {
			if q, err := parseZFSSize(quotaStr); err == nil {
				quota = q
			}
		}

		reservation := int64(0)
		if reservationStr != "-" && reservationStr != "none" {
			if r, err := parseZFSSize(reservationStr); err == nil {
				reservation = r
			}
		}

		// Determine dataset type
		datasetType := "filesystem"
		volCmd := exec.CommandContext(ctx, "zfs", "get", "-H", "-o", "value", "type", datasetName)
		volOutput, err := volCmd.Output()
		if err == nil {
			volType := strings.TrimSpace(string(volOutput))
			if volType == "volume" {
				datasetType = "volume"
			} else if strings.Contains(volType, "snapshot") {
				datasetType = "snapshot"
			}
		}

		// Get creation time
		createdAt := time.Now()
		creationCmd := exec.CommandContext(ctx, "zfs", "get", "-H", "-o", "value", "creation", datasetName)
		creationOutput, err := creationCmd.Output()
		if err == nil {
			creationStr := strings.TrimSpace(string(creationOutput))
			// Try parsing different date formats; zfs typically prints the creation
			// time without seconds (e.g. "Thu Feb  8 14:32 2024")
			if t, err := time.Parse("Mon Jan _2 15:04 2006", creationStr); err == nil {
				createdAt = t
			} else if t, err := time.Parse("Mon Jan 2 15:04:05 2006", creationStr); err == nil {
				createdAt = t
			} else if t, err := time.Parse(time.RFC3339, creationStr); err == nil {
				createdAt = t
			}
		}

		datasets = append(datasets, &ZFSDataset{
			Name:            datasetName,
			Pool:            poolName,
			Type:            datasetType,
			MountPoint:      mountPoint,
			UsedBytes:       usedBytes,
			AvailableBytes:  availableBytes,
			ReferencedBytes: referencedBytes,
			Compression:     compression,
			Deduplication:   deduplication,
			Quota:           quota,
			Reservation:     reservation,
			CreatedAt:       createdAt,
		})
	}

	return datasets, nil
}
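
// Illustrative line of the zfs list output parsed by ListDatasets above
// (tab-separated because of -H; dataset name and values are hypothetical):
//
//	tank/data	1.21G	97.8G	1.21G	lz4	off	none	none	/tank/data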

// CreateDatasetRequest represents a request to create a ZFS dataset
type CreateDatasetRequest struct {
	Name        string `json:"name"`        // Dataset name (e.g., "pool/dataset" or just "dataset")
	Type        string `json:"type"`        // "filesystem" or "volume"
	Compression string `json:"compression"` // off, lz4, zstd, gzip, etc.
	Quota       int64  `json:"quota"`       // -1 for unlimited
	Reservation int64  `json:"reservation"` // 0 for none
	MountPoint  string `json:"mount_point"` // Optional mount point
}

// CreateDataset creates a new ZFS dataset
func (s *ZFSService) CreateDataset(ctx context.Context, poolName string, req CreateDatasetRequest) (*ZFSDataset, error) {
	// Construct full dataset name
	fullName := poolName + "/" + req.Name

	// Build zfs create command; -o options (and -V for volumes) must come
	// before the dataset name
	args := []string{"create"}

	// Set compression
	if req.Compression != "" && req.Compression != "off" {
		args = append(args, "-o", fmt.Sprintf("compression=%s", req.Compression))
	}

	// Set mount point if provided
	if req.MountPoint != "" {
		args = append(args, "-o", fmt.Sprintf("mountpoint=%s", req.MountPoint))
	}

	if req.Type == "volume" {
		// For volumes, we need a size (use quota as size)
		if req.Quota <= 0 {
			return nil, fmt.Errorf("volume size (quota) must be specified and greater than 0")
		}
		args = append(args, "-V", fmt.Sprintf("%d", req.Quota), fullName)
	} else {
		// For filesystems, the dataset name is the last argument
		args = append(args, fullName)
	}

	// Execute zfs create
	s.logger.Info("Creating ZFS dataset", "name", fullName, "type", req.Type)
	cmd := exec.CommandContext(ctx, "zfs", args...)
	output, err := cmd.CombinedOutput()
	if err != nil {
		errorMsg := string(output)
		s.logger.Error("Failed to create dataset", "name", fullName, "error", err, "output", errorMsg)
		return nil, fmt.Errorf("failed to create dataset: %s", errorMsg)
	}

	// Set quota if specified (for filesystems)
	if req.Type != "volume" && req.Quota > 0 {
		quotaCmd := exec.CommandContext(ctx, "zfs", "set", fmt.Sprintf("quota=%d", req.Quota), fullName)
		if quotaOutput, err := quotaCmd.CombinedOutput(); err != nil {
			s.logger.Warn("Failed to set quota", "dataset", fullName, "error", err, "output", string(quotaOutput))
		}
	}

	// Set reservation if specified
	if req.Reservation > 0 {
		resvCmd := exec.CommandContext(ctx, "zfs", "set", fmt.Sprintf("reservation=%d", req.Reservation), fullName)
		if resvOutput, err := resvCmd.CombinedOutput(); err != nil {
			s.logger.Warn("Failed to set reservation", "dataset", fullName, "error", err, "output", string(resvOutput))
		}
	}

	// Get the created dataset info
	datasets, err := s.ListDatasets(ctx, poolName)
	if err != nil {
		return nil, fmt.Errorf("failed to list datasets after creation: %w", err)
	}

	// Find the newly created dataset
	for _, ds := range datasets {
		if ds.Name == fullName {
			s.logger.Info("ZFS dataset created successfully", "name", fullName)
			return ds, nil
		}
	}

	return nil, fmt.Errorf("dataset created but not found in list")
}
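
// Example (illustrative): creating a 10 GiB volume named "vm-disk0" on pool "tank"
// with lz4 compression via CreateDataset above runs roughly:
//
//	zfs create -o compression=lz4 -V 10737418240 tank/vm-disk0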

// DeleteDataset deletes a ZFS dataset
func (s *ZFSService) DeleteDataset(ctx context.Context, datasetName string) error {
	// Check if dataset exists
	cmd := exec.CommandContext(ctx, "zfs", "list", "-H", "-o", "name", datasetName)
	output, err := cmd.CombinedOutput()
	if err != nil {
		return fmt.Errorf("dataset %s does not exist: %w", datasetName, err)
	}

	if strings.TrimSpace(string(output)) != datasetName {
		return fmt.Errorf("dataset %s not found", datasetName)
	}

	// Delete the dataset (use -r for recursive to delete children)
	s.logger.Info("Deleting ZFS dataset", "name", datasetName)
	cmd = exec.CommandContext(ctx, "zfs", "destroy", "-r", datasetName)
	output, err = cmd.CombinedOutput()
	if err != nil {
		errorMsg := string(output)
		s.logger.Error("Failed to delete dataset", "name", datasetName, "error", err, "output", errorMsg)
		return fmt.Errorf("failed to delete dataset: %s", errorMsg)
	}

	s.logger.Info("ZFS dataset deleted successfully", "name", datasetName)
	return nil
}