fixing storage management dashboard

This commit is contained in:
Warp Agent
2025-12-25 20:02:59 +00:00
parent a5e6197bca
commit 419fcb7625
20 changed files with 3229 additions and 396 deletions

View File

@@ -0,0 +1,35 @@
-- AtlasOS - Calypso
-- Add ZFS Datasets Table
-- Version: 5.0
-- Description: Stores ZFS dataset metadata in database for faster queries and consistency
-- ZFS datasets table
CREATE TABLE IF NOT EXISTS zfs_datasets (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
name VARCHAR(512) NOT NULL UNIQUE, -- Full dataset name (e.g., pool/dataset)
pool_id UUID NOT NULL REFERENCES zfs_pools(id) ON DELETE CASCADE,
pool_name VARCHAR(255) NOT NULL, -- Denormalized for faster queries
type VARCHAR(50) NOT NULL, -- filesystem, volume, snapshot
mount_point TEXT, -- Mount point path (null for volumes)
used_bytes BIGINT NOT NULL DEFAULT 0,
available_bytes BIGINT NOT NULL DEFAULT 0,
referenced_bytes BIGINT NOT NULL DEFAULT 0,
compression VARCHAR(50) NOT NULL DEFAULT 'lz4', -- off, lz4, zstd, gzip
deduplication VARCHAR(50) NOT NULL DEFAULT 'off', -- off, on, verify
quota BIGINT DEFAULT -1, -- -1 for unlimited, bytes otherwise
reservation BIGINT NOT NULL DEFAULT 0, -- Reserved space in bytes
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
updated_at TIMESTAMP NOT NULL DEFAULT NOW(),
created_by UUID REFERENCES users(id)
);
-- Create indexes for faster lookups
CREATE INDEX IF NOT EXISTS idx_zfs_datasets_pool_id ON zfs_datasets(pool_id);
CREATE INDEX IF NOT EXISTS idx_zfs_datasets_pool_name ON zfs_datasets(pool_name);
CREATE INDEX IF NOT EXISTS idx_zfs_datasets_name ON zfs_datasets(name);
CREATE INDEX IF NOT EXISTS idx_zfs_datasets_type ON zfs_datasets(type);
CREATE INDEX IF NOT EXISTS idx_zfs_datasets_created_by ON zfs_datasets(created_by);
-- Composite index for common queries (list datasets by pool)
CREATE INDEX IF NOT EXISTS idx_zfs_datasets_pool_type ON zfs_datasets(pool_id, type);

View File

@@ -0,0 +1,50 @@
-- AtlasOS - Calypso
-- Add ZFS Shares and iSCSI Export Tables
-- Version: 6.0
-- Description: Separate tables for filesystem shares (NFS/SMB) and volume iSCSI exports
-- ZFS Filesystem Shares Table (for NFS/SMB)
CREATE TABLE IF NOT EXISTS zfs_shares (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
dataset_id UUID NOT NULL REFERENCES zfs_datasets(id) ON DELETE CASCADE,
share_type VARCHAR(50) NOT NULL, -- 'nfs', 'smb', 'both'
nfs_enabled BOOLEAN NOT NULL DEFAULT false,
nfs_options TEXT, -- e.g., "rw,sync,no_subtree_check"
nfs_clients TEXT[], -- Allowed client IPs/networks
smb_enabled BOOLEAN NOT NULL DEFAULT false,
smb_share_name VARCHAR(255), -- SMB share name
smb_path TEXT, -- SMB share path (usually same as mount_point)
smb_comment TEXT,
smb_guest_ok BOOLEAN NOT NULL DEFAULT false,
smb_read_only BOOLEAN NOT NULL DEFAULT false,
smb_browseable BOOLEAN NOT NULL DEFAULT true,
is_active BOOLEAN NOT NULL DEFAULT true,
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
updated_at TIMESTAMP NOT NULL DEFAULT NOW(),
created_by UUID REFERENCES users(id),
UNIQUE(dataset_id) -- One share config per dataset
);
-- ZFS Volume iSCSI Exports Table
CREATE TABLE IF NOT EXISTS zfs_iscsi_exports (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
dataset_id UUID NOT NULL REFERENCES zfs_datasets(id) ON DELETE CASCADE,
target_id UUID REFERENCES scst_targets(id) ON DELETE SET NULL, -- Link to SCST target
lun_number INTEGER, -- LUN number in the target
device_path TEXT, -- /dev/zvol/pool/volume path
is_active BOOLEAN NOT NULL DEFAULT true,
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
updated_at TIMESTAMP NOT NULL DEFAULT NOW(),
created_by UUID REFERENCES users(id),
UNIQUE(dataset_id) -- One iSCSI export per volume
);
-- Create indexes
CREATE INDEX IF NOT EXISTS idx_zfs_shares_dataset_id ON zfs_shares(dataset_id);
CREATE INDEX IF NOT EXISTS idx_zfs_shares_type ON zfs_shares(share_type);
CREATE INDEX IF NOT EXISTS idx_zfs_shares_active ON zfs_shares(is_active);
CREATE INDEX IF NOT EXISTS idx_zfs_iscsi_exports_dataset_id ON zfs_iscsi_exports(dataset_id);
CREATE INDEX IF NOT EXISTS idx_zfs_iscsi_exports_target_id ON zfs_iscsi_exports(target_id);
CREATE INDEX IF NOT EXISTS idx_zfs_iscsi_exports_active ON zfs_iscsi_exports(is_active);

View File

@@ -6,6 +6,7 @@ import (
"encoding/hex"
"fmt"
"net/http"
"strings"
"time"
"github.com/atlasos/calypso/internal/common/cache"
@@ -18,21 +19,21 @@ func GenerateKey(prefix string, parts ...string) string {
for _, part := range parts {
key += ":" + part
}
// Hash long keys to keep them manageable
if len(key) > 200 {
hash := sha256.Sum256([]byte(key))
return prefix + ":" + hex.EncodeToString(hash[:])
}
return key
}
// CacheConfig holds cache configuration
type CacheConfig struct {
Enabled bool
DefaultTTL time.Duration
MaxAge int // seconds for Cache-Control header
Enabled bool
DefaultTTL time.Duration
MaxAge int // seconds for Cache-Control header
}
// cacheMiddleware creates a caching middleware
@@ -74,7 +75,7 @@ func cacheMiddleware(cfg CacheConfig, cache *cache.Cache) gin.HandlerFunc {
// Cache miss - capture response
writer := &responseWriter{
ResponseWriter: c.Writer,
body: &bytes.Buffer{},
body: &bytes.Buffer{},
}
c.Writer = writer
@@ -136,6 +137,9 @@ func cacheControlMiddleware() gin.HandlerFunc {
case path == "/api/v1/system/services":
// Service list can be cached briefly
c.Header("Cache-Control", "public, max-age=60")
case strings.HasPrefix(path, "/api/v1/storage/zfs/pools/") && strings.HasSuffix(path, "/datasets"):
// ZFS datasets should not be cached - they change frequently
c.Header("Cache-Control", "no-cache, no-store, must-revalidate")
default:
// Default: no cache for other endpoints
c.Header("Cache-Control", "no-cache, no-store, must-revalidate")
@@ -168,4 +172,3 @@ func InvalidateCachePattern(cache *cache.Cache, pattern string) {
cache.Clear()
}
}

View File

@@ -4,12 +4,12 @@ import (
"context"
"time"
"github.com/atlasos/calypso/internal/audit"
"github.com/atlasos/calypso/internal/auth"
"github.com/atlasos/calypso/internal/common/cache"
"github.com/atlasos/calypso/internal/common/config"
"github.com/atlasos/calypso/internal/common/database"
"github.com/atlasos/calypso/internal/common/logger"
"github.com/atlasos/calypso/internal/audit"
"github.com/atlasos/calypso/internal/auth"
"github.com/atlasos/calypso/internal/iam"
"github.com/atlasos/calypso/internal/monitoring"
"github.com/atlasos/calypso/internal/scst"
@@ -44,10 +44,10 @@ func NewRouter(cfg *config.Config, db *database.DB, log *logger.Logger) *gin.Eng
r.Use(securityHeadersMiddleware(cfg))
r.Use(rateLimitMiddleware(cfg, log))
r.Use(corsMiddleware(cfg))
// Cache control headers (always applied)
r.Use(cacheControlMiddleware())
// Response caching middleware (if enabled)
if cfg.Server.Cache.Enabled {
cacheConfig := CacheConfig{
@@ -84,7 +84,7 @@ func NewRouter(cfg *config.Config, db *database.DB, log *logger.Logger) *gin.Eng
// Initialize and start alert rule engine
alertRuleEngine := monitoring.NewAlertRuleEngine(db, log, alertService)
// Register default alert rules
alertRuleEngine.RegisterRule(monitoring.NewAlertRule(
"storage-capacity-warning",
@@ -160,6 +160,10 @@ func NewRouter(cfg *config.Config, db *database.DB, log *logger.Logger) *gin.Eng
// Storage
storageHandler := storage.NewHandler(db, log)
// Pass cache to storage handler for cache invalidation
if responseCache != nil {
storageHandler.SetCache(responseCache)
}
storageGroup := protected.Group("/storage")
storageGroup.Use(requirePermission("storage", "read"))
{
@@ -180,6 +184,8 @@ func NewRouter(cfg *config.Config, db *database.DB, log *logger.Logger) *gin.Eng
storageGroup.GET("/zfs/pools/:id/datasets", storageHandler.ListZFSDatasets)
storageGroup.POST("/zfs/pools/:id/datasets", requirePermission("storage", "write"), storageHandler.CreateZFSDataset)
storageGroup.DELETE("/zfs/pools/:id/datasets/:dataset", requirePermission("storage", "write"), storageHandler.DeleteZFSDataset)
// ZFS ARC Stats
storageGroup.GET("/zfs/arc/stats", storageHandler.GetARCStats)
}
// SCST
@@ -286,6 +292,3 @@ func ginLogger(log *logger.Logger) gin.HandlerFunc {
)
}
}

View File

@@ -0,0 +1,111 @@
package storage
import (
"bufio"
"context"
"fmt"
"os"
"strconv"
"strings"
"time"
"github.com/atlasos/calypso/internal/common/logger"
)
// ARCStats represents ZFS ARC (Adaptive Replacement Cache) statistics
type ARCStats struct {
HitRatio float64 `json:"hit_ratio"` // Percentage of cache hits
CacheUsage float64 `json:"cache_usage"` // Percentage of cache used
CacheSize int64 `json:"cache_size"` // Current ARC size in bytes
CacheMax int64 `json:"cache_max"` // Maximum ARC size in bytes
Hits int64 `json:"hits"` // Total cache hits
Misses int64 `json:"misses"` // Total cache misses
DemandHits int64 `json:"demand_hits"` // Demand data/metadata hits
PrefetchHits int64 `json:"prefetch_hits"` // Prefetch hits
MRUHits int64 `json:"mru_hits"` // Most Recently Used hits
MFUHits int64 `json:"mfu_hits"` // Most Frequently Used hits
CollectedAt string `json:"collected_at"` // Timestamp when stats were collected
}
// ARCService handles ZFS ARC statistics collection
type ARCService struct {
logger *logger.Logger
}
// NewARCService creates a new ARC service
func NewARCService(log *logger.Logger) *ARCService {
return &ARCService{
logger: log,
}
}
// GetARCStats reads and parses ARC statistics from /proc/spl/kstat/zfs/arcstats
func (s *ARCService) GetARCStats(ctx context.Context) (*ARCStats, error) {
stats := &ARCStats{}
// Read ARC stats file
file, err := os.Open("/proc/spl/kstat/zfs/arcstats")
if err != nil {
return nil, fmt.Errorf("failed to open arcstats file: %w", err)
}
defer file.Close()
// Parse the file
scanner := bufio.NewScanner(file)
arcData := make(map[string]int64)
for scanner.Scan() {
line := strings.TrimSpace(scanner.Text())
// Skip empty lines and header lines
if line == "" || strings.HasPrefix(line, "name") || strings.HasPrefix(line, "9") {
continue
}
// Parse lines like: "hits 4 311154"
fields := strings.Fields(line)
if len(fields) >= 3 {
key := fields[0]
// The value is in the last field (field index 2)
if value, err := strconv.ParseInt(fields[len(fields)-1], 10, 64); err == nil {
arcData[key] = value
}
}
}
if err := scanner.Err(); err != nil {
return nil, fmt.Errorf("failed to read arcstats file: %w", err)
}
// Extract key metrics
stats.Hits = arcData["hits"]
stats.Misses = arcData["misses"]
stats.DemandHits = arcData["demand_data_hits"] + arcData["demand_metadata_hits"]
stats.PrefetchHits = arcData["prefetch_data_hits"] + arcData["prefetch_metadata_hits"]
stats.MRUHits = arcData["mru_hits"]
stats.MFUHits = arcData["mfu_hits"]
// Current ARC size (c) and max size (c_max)
stats.CacheSize = arcData["c"]
stats.CacheMax = arcData["c_max"]
// Calculate hit ratio
totalRequests := stats.Hits + stats.Misses
if totalRequests > 0 {
stats.HitRatio = float64(stats.Hits) / float64(totalRequests) * 100.0
} else {
stats.HitRatio = 0.0
}
// Calculate cache usage percentage
if stats.CacheMax > 0 {
stats.CacheUsage = float64(stats.CacheSize) / float64(stats.CacheMax) * 100.0
} else {
stats.CacheUsage = 0.0
}
// Set collection timestamp
stats.CollectedAt = time.Now().Format(time.RFC3339)
return stats, nil
}

View File

@@ -5,6 +5,7 @@ import (
"net/http"
"strings"
"github.com/atlasos/calypso/internal/common/cache"
"github.com/atlasos/calypso/internal/common/database"
"github.com/atlasos/calypso/internal/common/logger"
"github.com/atlasos/calypso/internal/tasks"
@@ -16,9 +17,16 @@ type Handler struct {
diskService *DiskService
lvmService *LVMService
zfsService *ZFSService
arcService *ARCService
taskEngine *tasks.Engine
db *database.DB
logger *logger.Logger
cache *cache.Cache // Cache for invalidation
}
// SetCache sets the cache instance for cache invalidation
func (h *Handler) SetCache(c *cache.Cache) {
h.cache = c
}
// NewHandler creates a new storage handler
@@ -27,6 +35,7 @@ func NewHandler(db *database.DB, log *logger.Logger) *Handler {
diskService: NewDiskService(db, log),
lvmService: NewLVMService(db, log),
zfsService: NewZFSService(db, log),
arcService: NewARCService(log),
taskEngine: tasks.NewEngine(db, log),
db: db,
logger: log,
@@ -350,6 +359,11 @@ func (h *Handler) ListZFSDatasets(c *gin.Context) {
return
}
// Ensure we return an empty array instead of null
if datasets == nil {
datasets = []*ZFSDataset{}
}
c.JSON(http.StatusOK, gin.H{"datasets": datasets})
}
@@ -392,6 +406,12 @@ func (h *Handler) CreateZFSDataset(c *gin.Context) {
return
}
// Validate mount point: volumes cannot have mount points
if req.Type == "volume" && req.MountPoint != "" {
c.JSON(http.StatusBadRequest, gin.H{"error": "mount point cannot be set for volume datasets (volumes are block devices for iSCSI export)"})
return
}
// Validate dataset name (should not contain pool name)
if strings.Contains(req.Name, "/") {
c.JSON(http.StatusBadRequest, gin.H{"error": "dataset name should not contain '/' (pool name is automatically prepended)"})
@@ -454,5 +474,26 @@ func (h *Handler) DeleteZFSDataset(c *gin.Context) {
return
}
// Invalidate cache for this pool's datasets list
if h.cache != nil {
// Generate cache key using the same format as cache middleware
cacheKey := fmt.Sprintf("http:/api/v1/storage/zfs/pools/%s/datasets:", poolID)
h.cache.Delete(cacheKey)
// Also invalidate any cached responses with query parameters
h.logger.Debug("Cache invalidated for dataset list", "pool_id", poolID, "key", cacheKey)
}
c.JSON(http.StatusOK, gin.H{"message": "Dataset deleted successfully"})
}
// GetARCStats returns ZFS ARC statistics
func (h *Handler) GetARCStats(c *gin.Context) {
stats, err := h.arcService.GetARCStats(c.Request.Context())
if err != nil {
h.logger.Error("Failed to get ARC stats", "error", err)
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to get ARC stats: " + err.Error()})
return
}
c.JSON(http.StatusOK, stats)
}

View File

@@ -4,13 +4,15 @@ import (
"context"
"database/sql"
"fmt"
"os"
"os/exec"
"path/filepath"
"strings"
"time"
"github.com/lib/pq"
"github.com/atlasos/calypso/internal/common/database"
"github.com/atlasos/calypso/internal/common/logger"
"github.com/lib/pq"
)
// ZFSService handles ZFS pool management
@@ -29,23 +31,23 @@ func NewZFSService(db *database.DB, log *logger.Logger) *ZFSService {
// ZFSPool represents a ZFS pool
type ZFSPool struct {
ID string `json:"id"`
Name string `json:"name"`
Description string `json:"description"`
RaidLevel string `json:"raid_level"` // stripe, mirror, raidz, raidz2, raidz3
Disks []string `json:"disks"` // device paths
SpareDisks []string `json:"spare_disks"` // spare disk paths
SizeBytes int64 `json:"size_bytes"`
UsedBytes int64 `json:"used_bytes"`
Compression string `json:"compression"` // off, lz4, zstd, gzip
Deduplication bool `json:"deduplication"`
AutoExpand bool `json:"auto_expand"`
ScrubInterval int `json:"scrub_interval"` // days
IsActive bool `json:"is_active"`
HealthStatus string `json:"health_status"` // online, degraded, faulted, offline
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
CreatedBy string `json:"created_by"`
ID string `json:"id"`
Name string `json:"name"`
Description string `json:"description"`
RaidLevel string `json:"raid_level"` // stripe, mirror, raidz, raidz2, raidz3
Disks []string `json:"disks"` // device paths
SpareDisks []string `json:"spare_disks"` // spare disk paths
SizeBytes int64 `json:"size_bytes"`
UsedBytes int64 `json:"used_bytes"`
Compression string `json:"compression"` // off, lz4, zstd, gzip
Deduplication bool `json:"deduplication"`
AutoExpand bool `json:"auto_expand"`
ScrubInterval int `json:"scrub_interval"` // days
IsActive bool `json:"is_active"`
HealthStatus string `json:"health_status"` // online, degraded, faulted, offline
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
CreatedBy string `json:"created_by"`
}
// CreatePool creates a new ZFS pool
@@ -559,122 +561,72 @@ func (s *ZFSService) AddSpareDisk(ctx context.Context, poolID string, diskPaths
// ZFSDataset represents a ZFS dataset
type ZFSDataset struct {
Name string `json:"name"`
Pool string `json:"pool"`
Type string `json:"type"` // filesystem, volume, snapshot
MountPoint string `json:"mount_point"`
UsedBytes int64 `json:"used_bytes"`
AvailableBytes int64 `json:"available_bytes"`
ReferencedBytes int64 `json:"referenced_bytes"`
Compression string `json:"compression"`
Deduplication string `json:"deduplication"`
Quota int64 `json:"quota"` // -1 for unlimited
Reservation int64 `json:"reservation"`
CreatedAt time.Time `json:"created_at"`
Name string `json:"name"`
Pool string `json:"pool"`
Type string `json:"type"` // filesystem, volume, snapshot
MountPoint string `json:"mount_point"`
UsedBytes int64 `json:"used_bytes"`
AvailableBytes int64 `json:"available_bytes"`
ReferencedBytes int64 `json:"referenced_bytes"`
Compression string `json:"compression"`
Deduplication string `json:"deduplication"`
Quota int64 `json:"quota"` // -1 for unlimited
Reservation int64 `json:"reservation"`
CreatedAt time.Time `json:"created_at"`
}
// ListDatasets lists all datasets in a ZFS pool
// ListDatasets lists all datasets in a ZFS pool from database
func (s *ZFSService) ListDatasets(ctx context.Context, poolName string) ([]*ZFSDataset, error) {
// Get all datasets in the pool using zfs list
cmd := exec.CommandContext(ctx, "zfs", "list", "-H", "-o", "name,used,avail,refer,compress,dedup,quota,reservation,mountpoint", "-r", poolName)
output, err := cmd.CombinedOutput()
// Get datasets from database
query := `
SELECT name, pool_name, type, mount_point,
used_bytes, available_bytes, referenced_bytes,
compression, deduplication, quota, reservation,
created_at
FROM zfs_datasets
WHERE pool_name = $1
ORDER BY name
`
rows, err := s.db.QueryContext(ctx, query, poolName)
if err != nil {
// If pool doesn't exist, return empty list
if strings.Contains(string(output), "does not exist") {
// If table doesn't exist, return empty list (migration not run yet)
if strings.Contains(err.Error(), "does not exist") {
s.logger.Warn("zfs_datasets table does not exist, returning empty list", "pool", poolName)
return []*ZFSDataset{}, nil
}
return nil, fmt.Errorf("failed to list datasets: %s: %w", string(output), err)
return nil, fmt.Errorf("failed to list datasets from database: %w", err)
}
defer rows.Close()
var datasets []*ZFSDataset
lines := strings.Split(strings.TrimSpace(string(output)), "\n")
for _, line := range lines {
if line == "" {
for rows.Next() {
var ds ZFSDataset
var mountPoint sql.NullString
err := rows.Scan(
&ds.Name, &ds.Pool, &ds.Type, &mountPoint,
&ds.UsedBytes, &ds.AvailableBytes, &ds.ReferencedBytes,
&ds.Compression, &ds.Deduplication, &ds.Quota, &ds.Reservation,
&ds.CreatedAt,
)
if err != nil {
s.logger.Error("Failed to scan dataset row", "error", err)
continue
}
fields := strings.Fields(line)
if len(fields) < 9 {
continue
// Handle nullable mount_point
if mountPoint.Valid {
ds.MountPoint = mountPoint.String
} else {
ds.MountPoint = "none"
}
datasetName := fields[0]
// Skip the pool itself (root dataset)
if datasetName == poolName {
continue
}
datasets = append(datasets, &ds)
}
// Extract pool name from dataset name (e.g., "pool/dataset" -> "pool")
poolFromName := strings.Split(datasetName, "/")[0]
if poolFromName != poolName {
continue
}
usedBytes, _ := parseZFSSize(fields[1])
availableBytes, _ := parseZFSSize(fields[2])
referencedBytes, _ := parseZFSSize(fields[3])
compression := fields[4]
deduplication := fields[5]
quotaStr := fields[6]
reservationStr := fields[7]
mountPoint := fields[8]
quota := int64(-1) // -1 means unlimited
if quotaStr != "-" && quotaStr != "none" {
if q, err := parseZFSSize(quotaStr); err == nil {
quota = q
}
}
reservation := int64(0)
if reservationStr != "-" && reservationStr != "none" {
if r, err := parseZFSSize(reservationStr); err == nil {
reservation = r
}
}
// Determine dataset type
datasetType := "filesystem"
volCmd := exec.CommandContext(ctx, "zfs", "get", "-H", "-o", "value", "type", datasetName)
volOutput, err := volCmd.Output()
if err == nil {
volType := strings.TrimSpace(string(volOutput))
if volType == "volume" {
datasetType = "volume"
} else if strings.Contains(volType, "snapshot") {
datasetType = "snapshot"
}
}
// Get creation time
createdAt := time.Now()
creationCmd := exec.CommandContext(ctx, "zfs", "get", "-H", "-o", "value", "creation", datasetName)
creationOutput, err := creationCmd.Output()
if err == nil {
creationStr := strings.TrimSpace(string(creationOutput))
// Try parsing different date formats
if t, err := time.Parse("Mon Jan 2 15:04:05 2006", creationStr); err == nil {
createdAt = t
} else if t, err := time.Parse(time.RFC3339, creationStr); err == nil {
createdAt = t
}
}
datasets = append(datasets, &ZFSDataset{
Name: datasetName,
Pool: poolName,
Type: datasetType,
MountPoint: mountPoint,
UsedBytes: usedBytes,
AvailableBytes: availableBytes,
ReferencedBytes: referencedBytes,
Compression: compression,
Deduplication: deduplication,
Quota: quota,
Reservation: reservation,
CreatedAt: createdAt,
})
if err := rows.Err(); err != nil {
return nil, fmt.Errorf("error iterating dataset rows: %w", err)
}
return datasets, nil
@@ -682,12 +634,12 @@ func (s *ZFSService) ListDatasets(ctx context.Context, poolName string) ([]*ZFSD
// CreateDatasetRequest represents a request to create a ZFS dataset
type CreateDatasetRequest struct {
Name string `json:"name"` // Dataset name (e.g., "pool/dataset" or just "dataset")
Type string `json:"type"` // "filesystem" or "volume"
Compression string `json:"compression"` // off, lz4, zstd, gzip, etc.
Quota int64 `json:"quota"` // -1 for unlimited
Reservation int64 `json:"reservation"` // 0 for none
MountPoint string `json:"mount_point"` // Optional mount point
Name string `json:"name"` // Dataset name (e.g., "pool/dataset" or just "dataset")
Type string `json:"type"` // "filesystem" or "volume"
Compression string `json:"compression"` // off, lz4, zstd, gzip, etc.
Quota int64 `json:"quota"` // -1 for unlimited
Reservation int64 `json:"reservation"` // 0 for none
MountPoint string `json:"mount_point"` // Optional mount point
}
// CreateDataset creates a new ZFS dataset
@@ -695,9 +647,41 @@ func (s *ZFSService) CreateDataset(ctx context.Context, poolName string, req Cre
// Construct full dataset name
fullName := poolName + "/" + req.Name
// For filesystem datasets, create mount directory if mount point is provided
if req.Type == "filesystem" && req.MountPoint != "" {
// Clean and validate mount point path
mountPath := filepath.Clean(req.MountPoint)
// Check if directory already exists
if info, err := os.Stat(mountPath); err == nil {
if !info.IsDir() {
return nil, fmt.Errorf("mount point path exists but is not a directory: %s", mountPath)
}
// Directory exists, check if it's empty
dir, err := os.Open(mountPath)
if err == nil {
entries, err := dir.Readdirnames(1)
dir.Close()
if err == nil && len(entries) > 0 {
s.logger.Warn("Mount directory is not empty", "path", mountPath)
// Continue anyway, ZFS will mount over it
}
}
} else if os.IsNotExist(err) {
// Create directory with proper permissions (0755)
s.logger.Info("Creating mount directory", "path", mountPath)
if err := os.MkdirAll(mountPath, 0755); err != nil {
return nil, fmt.Errorf("failed to create mount directory %s: %w", mountPath, err)
}
s.logger.Info("Mount directory created successfully", "path", mountPath)
} else {
return nil, fmt.Errorf("failed to check mount directory %s: %w", mountPath, err)
}
}
// Build zfs create command
args := []string{"create"}
// Add type if volume
if req.Type == "volume" {
// For volumes, we need size (use quota as size)
@@ -715,8 +699,8 @@ func (s *ZFSService) CreateDataset(ctx context.Context, poolName string, req Cre
args = append(args, "-o", fmt.Sprintf("compression=%s", req.Compression))
}
// Set mount point if provided
if req.MountPoint != "" {
// Set mount point if provided (only for filesystems, not volumes)
if req.Type == "filesystem" && req.MountPoint != "" {
args = append(args, "-o", fmt.Sprintf("mountpoint=%s", req.MountPoint))
}
@@ -746,38 +730,196 @@ func (s *ZFSService) CreateDataset(ctx context.Context, poolName string, req Cre
}
}
// Get the created dataset info
datasets, err := s.ListDatasets(ctx, poolName)
// Get pool ID from pool name
var poolID string
err = s.db.QueryRowContext(ctx, "SELECT id FROM zfs_pools WHERE name = $1", poolName).Scan(&poolID)
if err != nil {
return nil, fmt.Errorf("failed to list datasets after creation: %w", err)
s.logger.Error("Failed to get pool ID", "pool", poolName, "error", err)
// Try to destroy the dataset if we can't save to database
exec.CommandContext(ctx, "zfs", "destroy", "-r", fullName).Run()
return nil, fmt.Errorf("failed to get pool ID: %w", err)
}
// Find the newly created dataset
for _, ds := range datasets {
if ds.Name == fullName {
s.logger.Info("ZFS dataset created successfully", "name", fullName)
return ds, nil
// Get dataset info from ZFS to save to database
cmd = exec.CommandContext(ctx, "zfs", "list", "-H", "-o", "name,used,avail,refer,compress,dedup,quota,reservation,mountpoint", fullName)
output, err = cmd.CombinedOutput()
if err != nil {
s.logger.Error("Failed to get dataset info", "name", fullName, "error", err)
// Try to destroy the dataset if we can't get info
exec.CommandContext(ctx, "zfs", "destroy", "-r", fullName).Run()
return nil, fmt.Errorf("failed to get dataset info: %w", err)
}
// Parse dataset info
lines := strings.TrimSpace(string(output))
if lines == "" {
exec.CommandContext(ctx, "zfs", "destroy", "-r", fullName).Run()
return nil, fmt.Errorf("dataset not found after creation")
}
fields := strings.Fields(lines)
if len(fields) < 9 {
exec.CommandContext(ctx, "zfs", "destroy", "-r", fullName).Run()
return nil, fmt.Errorf("invalid dataset info format")
}
usedBytes, _ := parseZFSSize(fields[1])
availableBytes, _ := parseZFSSize(fields[2])
referencedBytes, _ := parseZFSSize(fields[3])
compression := fields[4]
deduplication := fields[5]
quotaStr := fields[6]
reservationStr := fields[7]
mountPoint := fields[8]
// Determine dataset type
datasetType := req.Type
typeCmd := exec.CommandContext(ctx, "zfs", "get", "-H", "-o", "value", "type", fullName)
if typeOutput, err := typeCmd.Output(); err == nil {
volType := strings.TrimSpace(string(typeOutput))
if volType == "volume" {
datasetType = "volume"
} else if strings.Contains(volType, "snapshot") {
datasetType = "snapshot"
}
}
return nil, fmt.Errorf("dataset created but not found in list")
// Parse quota
quota := int64(-1)
if datasetType == "volume" {
// For volumes, get volsize
volsizeCmd := exec.CommandContext(ctx, "zfs", "get", "-H", "-o", "value", "volsize", fullName)
if volsizeOutput, err := volsizeCmd.Output(); err == nil {
volsizeStr := strings.TrimSpace(string(volsizeOutput))
if volsizeStr != "-" && volsizeStr != "none" {
if vs, err := parseZFSSize(volsizeStr); err == nil {
quota = vs
}
}
}
} else if quotaStr != "-" && quotaStr != "none" {
if q, err := parseZFSSize(quotaStr); err == nil {
quota = q
}
}
// Parse reservation
reservation := int64(0)
if reservationStr != "-" && reservationStr != "none" {
if r, err := parseZFSSize(reservationStr); err == nil {
reservation = r
}
}
// Normalize mount point for volumes
if datasetType == "volume" && mountPoint == "-" {
mountPoint = "none"
}
// Get creation time
createdAt := time.Now()
creationCmd := exec.CommandContext(ctx, "zfs", "get", "-H", "-o", "value", "creation", fullName)
if creationOutput, err := creationCmd.Output(); err == nil {
creationStr := strings.TrimSpace(string(creationOutput))
if t, err := time.Parse("Mon Jan 2 15:04:05 2006", creationStr); err == nil {
createdAt = t
} else if t, err := time.Parse(time.RFC3339, creationStr); err == nil {
createdAt = t
}
}
// Save to database (works for both filesystem and volume datasets)
// Volume datasets are stored in the same zfs_datasets table with type='volume'
insertQuery := `
INSERT INTO zfs_datasets (
name, pool_id, pool_name, type, mount_point,
used_bytes, available_bytes, referenced_bytes,
compression, deduplication, quota, reservation,
created_at, updated_at
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, NOW())
RETURNING id
`
var datasetID string
err = s.db.QueryRowContext(ctx, insertQuery,
fullName, poolID, poolName, datasetType, mountPoint,
usedBytes, availableBytes, referencedBytes,
compression, deduplication, quota, reservation,
createdAt,
).Scan(&datasetID)
if err != nil {
s.logger.Error("Failed to save dataset to database", "name", fullName, "error", err)
// Try to destroy the dataset if we can't save to database
exec.CommandContext(ctx, "zfs", "destroy", "-r", fullName).Run()
return nil, fmt.Errorf("failed to save dataset to database: %w", err)
}
// Return dataset info
dataset := &ZFSDataset{
Name: fullName,
Pool: poolName,
Type: datasetType,
MountPoint: mountPoint,
UsedBytes: usedBytes,
AvailableBytes: availableBytes,
ReferencedBytes: referencedBytes,
Compression: compression,
Deduplication: deduplication,
Quota: quota,
Reservation: reservation,
CreatedAt: createdAt,
}
s.logger.Info("ZFS dataset created and saved to database", "name", fullName, "id", datasetID)
return dataset, nil
}
// DeleteDataset deletes a ZFS dataset
func (s *ZFSService) DeleteDataset(ctx context.Context, datasetName string) error {
// Check if dataset exists
cmd := exec.CommandContext(ctx, "zfs", "list", "-H", "-o", "name", datasetName)
// Check if dataset exists and get its mount point before deletion
var mountPoint string
cmd := exec.CommandContext(ctx, "zfs", "list", "-H", "-o", "name,mountpoint", datasetName)
output, err := cmd.CombinedOutput()
if err != nil {
return fmt.Errorf("dataset %s does not exist: %w", datasetName, err)
}
if strings.TrimSpace(string(output)) != datasetName {
lines := strings.TrimSpace(string(output))
if lines == "" {
return fmt.Errorf("dataset %s not found", datasetName)
}
// Delete the dataset (use -r for recursive to delete children)
s.logger.Info("Deleting ZFS dataset", "name", datasetName)
// Parse output to get mount point
fields := strings.Fields(lines)
if len(fields) >= 2 {
mountPoint = fields[1]
}
// Get dataset type to determine if we should clean up mount directory
var datasetType string
typeCmd := exec.CommandContext(ctx, "zfs", "get", "-H", "-o", "value", "type", datasetName)
typeOutput, err := typeCmd.Output()
if err == nil {
datasetType = strings.TrimSpace(string(typeOutput))
}
// Delete from database first (before ZFS deletion, so we have the record)
// This ensures we can clean up even if ZFS deletion partially fails
// Works for both filesystem and volume datasets
deleteQuery := "DELETE FROM zfs_datasets WHERE name = $1"
result, err := s.db.ExecContext(ctx, deleteQuery, datasetName)
if err != nil {
s.logger.Warn("Failed to delete dataset from database (may not exist)", "name", datasetName, "error", err)
// Continue with ZFS deletion anyway
} else {
rowsAffected, _ := result.RowsAffected()
if rowsAffected > 0 {
s.logger.Info("Dataset removed from database", "name", datasetName)
}
}
// Delete the dataset from ZFS (use -r for recursive to delete children)
s.logger.Info("Deleting ZFS dataset", "name", datasetName, "mountpoint", mountPoint)
cmd = exec.CommandContext(ctx, "zfs", "destroy", "-r", datasetName)
output, err = cmd.CombinedOutput()
if err != nil {
@@ -786,7 +928,35 @@ func (s *ZFSService) DeleteDataset(ctx context.Context, datasetName string) erro
return fmt.Errorf("failed to delete dataset: %s", errorMsg)
}
// Clean up mount directory if it exists and is a filesystem dataset
// Only remove if mount point is not "-" (volumes) and not "none" or "legacy"
if datasetType == "filesystem" && mountPoint != "" && mountPoint != "-" && mountPoint != "none" && mountPoint != "legacy" {
mountPath := filepath.Clean(mountPoint)
// Check if directory exists
if info, err := os.Stat(mountPath); err == nil && info.IsDir() {
// Check if directory is empty
dir, err := os.Open(mountPath)
if err == nil {
entries, err := dir.Readdirnames(1)
dir.Close()
// Only remove if directory is empty
if err == nil && len(entries) == 0 {
s.logger.Info("Removing empty mount directory", "path", mountPath)
if err := os.Remove(mountPath); err != nil {
s.logger.Warn("Failed to remove mount directory", "path", mountPath, "error", err)
// Don't fail the deletion if we can't remove the directory
} else {
s.logger.Info("Mount directory removed successfully", "path", mountPath)
}
} else {
s.logger.Info("Mount directory is not empty, keeping it", "path", mountPath)
}
}
}
}
s.logger.Info("ZFS dataset deleted successfully", "name", datasetName)
return nil
}