diff --git a/backend/internal/backup/handler.go b/backend/internal/backup/handler.go
index 982dadf..0532a1f 100644
--- a/backend/internal/backup/handler.go
+++ b/backend/internal/backup/handler.go
@@ -156,6 +156,19 @@ func (h *Handler) ListClients(c *gin.Context) {
 	// Parse search query
 	opts.Search = c.Query("search")
 
+	// Parse category filter
+	if category := c.Query("category"); category != "" {
+		// Validate category
+		validCategories := map[string]bool{
+			"File":     true,
+			"Database": true,
+			"Virtual":  true,
+		}
+		if validCategories[category] {
+			opts.Category = category
+		}
+	}
+
 	clients, err := h.service.ListClients(c.Request.Context(), opts)
 	if err != nil {
 		h.logger.Error("Failed to list clients", "error", err)
diff --git a/backend/internal/backup/service.go b/backend/internal/backup/service.go
index d26eec7..3617be1 100644
--- a/backend/internal/backup/service.go
+++ b/backend/internal/backup/service.go
@@ -84,6 +84,7 @@ type Client struct {
 	Name          string  `json:"name"`
 	Uname         *string `json:"uname,omitempty"`
 	Enabled       bool    `json:"enabled"`
+	Category      *string `json:"category,omitempty"` // "File", "Database", "Virtual"
 	AutoPrune     *bool   `json:"auto_prune,omitempty"`
 	FileRetention *int64  `json:"file_retention,omitempty"`
 	JobRetention  *int64  `json:"job_retention,omitempty"`
@@ -95,8 +96,9 @@ type Client struct {
 
 // ListClientsOptions represents filtering options for clients
 type ListClientsOptions struct {
-	Enabled *bool  // Filter by enabled status (nil = all)
-	Search  string // Search by client name
+	Enabled  *bool  // Filter by enabled status (nil = all)
+	Search   string // Search by client name
+	Category string // Filter by category: "File", "Database", "Virtual" (empty = all)
 }
 
 // DashboardStats represents statistics for the backup dashboard
@@ -312,7 +314,8 @@ func (s *Service) queryBaculaDirect(ctx context.Context) ([]Job, error) {
 
 // syncFromBconsole syncs jobs using bconsole command (fallback method)
 func (s *Service) syncFromBconsole(ctx context.Context) error {
 	// Execute bconsole command to list jobs
-	cmd := exec.CommandContext(ctx, "sh", "-c", "echo -e 'list jobs\nquit' | bconsole")
+	bconsoleConfig := "/opt/calypso/conf/bacula/bconsole.conf"
+	cmd := exec.CommandContext(ctx, "sh", "-c", fmt.Sprintf("echo -e 'list jobs\nquit' | bconsole -c %s", bconsoleConfig))
 
 	output, err := cmd.CombinedOutput()
 	if err != nil {
@@ -507,7 +510,8 @@ func (s *Service) parseBconsoleOutput(ctx context.Context, output string) []Job
 
 // getClientNameFromJob gets client name from job details using bconsole
 func (s *Service) getClientNameFromJob(ctx context.Context, jobID int) string {
 	// Execute bconsole to get job details
-	cmd := exec.CommandContext(ctx, "sh", "-c", fmt.Sprintf("echo -e 'list job jobid=%d\nquit' | bconsole", jobID))
+	bconsoleConfig := "/opt/calypso/conf/bacula/bconsole.conf"
+	cmd := exec.CommandContext(ctx, "sh", "-c", fmt.Sprintf("echo -e 'list job jobid=%d\nquit' | bconsole -c %s", jobID, bconsoleConfig))
 
 	output, err := cmd.CombinedOutput()
 	if err != nil {
@@ -550,7 +554,9 @@ func (s *Service) ExecuteBconsoleCommand(ctx context.Context, command string) (s
 	escapedCommand := strings.ReplaceAll(commandWithQuit, "'", "'\"'\"'")
 
 	// Execute bconsole command using printf to avoid echo -e issues
-	cmd := exec.CommandContext(ctx, "sh", "-c", fmt.Sprintf("printf '%%s\\n' '%s' | bconsole", escapedCommand))
+	// Use -c flag to specify config file location
+	bconsoleConfig := "/opt/calypso/conf/bacula/bconsole.conf"
+	cmd := exec.CommandContext(ctx, "sh", "-c", fmt.Sprintf("printf '%%s\\n' '%s' | bconsole -c %s", escapedCommand, bconsoleConfig))
 
 	output, err := cmd.CombinedOutput()
 	if err != nil {
@@ -728,10 +734,94 @@ func (s *Service) queryClientsFromDatabase(ctx context.Context, opts ListClients
 		return nil, fmt.Errorf("error iterating client rows: %w", err)
 	}
 
+	// Load categories from Calypso database
+	if err := s.loadClientCategories(ctx, clients); err != nil {
+		s.logger.Warn("Failed to load client categories", "error", err)
+		// Continue without categories, set default to "File"
+		for i := range clients {
+			if clients[i].Category == nil {
+				defaultCategory := "File"
+				clients[i].Category = &defaultCategory
+			}
+		}
+	}
+
+	// Filter by category if specified
+	if opts.Category != "" {
+		filtered := []Client{}
+		for _, client := range clients {
+			if client.Category != nil && *client.Category == opts.Category {
+				filtered = append(filtered, client)
+			}
+		}
+		clients = filtered
+	}
+
 	s.logger.Debug("Queried clients from Bacula database", "count", len(clients))
 	return clients, nil
 }
 
+// loadClientCategories loads category information from Calypso database
+func (s *Service) loadClientCategories(ctx context.Context, clients []Client) error {
+	if len(clients) == 0 {
+		return nil
+	}
+
+	// Build query to get categories for all client IDs
+	clientIDs := make([]interface{}, len(clients))
+	for i, client := range clients {
+		clientIDs[i] = client.ClientID
+	}
+
+	// Create placeholders for IN clause
+	placeholders := make([]string, len(clientIDs))
+	args := make([]interface{}, len(clientIDs))
+	for i := range clientIDs {
+		placeholders[i] = fmt.Sprintf("$%d", i+1)
+		args[i] = clientIDs[i]
+	}
+
+	query := fmt.Sprintf(`
+		SELECT client_id, category
+		FROM client_metadata
+		WHERE client_id IN (%s)
+	`, strings.Join(placeholders, ","))
+
+	rows, err := s.db.QueryContext(ctx, query, args...)
+	if err != nil {
+		// Table might not exist yet (migration not run), return nil error
+		if strings.Contains(err.Error(), "does not exist") {
+			return nil
+		}
+		return fmt.Errorf("failed to query client categories: %w", err)
+	}
+	defer rows.Close()
+
+	// Create map of client_id -> category
+	categoryMap := make(map[int]string)
+	for rows.Next() {
+		var clientID int
+		var category string
+		if err := rows.Scan(&clientID, &category); err != nil {
+			continue
+		}
+		categoryMap[clientID] = category
+	}
+
+	// Assign categories to clients
+	for i := range clients {
+		if category, exists := categoryMap[clients[i].ClientID]; exists {
+			clients[i].Category = &category
+		} else {
+			// Default to "File" if no category found
+			defaultCategory := "File"
+			clients[i].Category = &defaultCategory
+		}
+	}
+
+	return nil
+}
+
 // queryClientsFromBconsole queries clients using bconsole command (fallback method)
 func (s *Service) queryClientsFromBconsole(ctx context.Context, opts ListClientsOptions) ([]Client, error) {
 	// Execute bconsole command to list clients
@@ -752,6 +842,18 @@ func (s *Service) queryClientsFromBconsole(ctx context.Context, opts ListClients
 	clients := s.parseBconsoleClientsOutput(output)
 	s.logger.Debug("Parsed clients from bconsole", "count", len(clients))
 
+	// Load categories from Calypso database
+	if err := s.loadClientCategories(ctx, clients); err != nil {
+		s.logger.Warn("Failed to load client categories", "error", err)
+		// Continue without categories, set default to "File"
+		for i := range clients {
+			if clients[i].Category == nil {
+				defaultCategory := "File"
+				clients[i].Category = &defaultCategory
+			}
+		}
+	}
+
 	// Apply filters
 	filtered := []Client{}
 	for _, client := range clients {
@@ -761,6 +863,11 @@ func (s *Service) queryClientsFromBconsole(ctx context.Context, opts ListClients
 		if opts.Search != "" && !strings.Contains(strings.ToLower(client.Name), strings.ToLower(opts.Search)) {
 			continue
 		}
+		if opts.Category != "" {
+			if client.Category == nil || *client.Category != opts.Category {
+				continue
+			}
+		}
 		filtered = append(filtered, client)
 	}
 
diff --git a/backend/internal/common/database/migrations/012_add_snapshot_schedules_table.sql b/backend/internal/common/database/migrations/012_add_snapshot_schedules_table.sql
new file mode 100644
index 0000000..8e36c2a
--- /dev/null
+++ b/backend/internal/common/database/migrations/012_add_snapshot_schedules_table.sql
@@ -0,0 +1,23 @@
+-- Snapshot schedules table for automated snapshot creation
+CREATE TABLE IF NOT EXISTS snapshot_schedules (
+    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
+    name VARCHAR(255) NOT NULL,
+    dataset VARCHAR(255) NOT NULL,
+    snapshot_name_template VARCHAR(255) NOT NULL, -- e.g., "auto-%Y-%m-%d-%H%M" or "daily-backup"
+    schedule_type VARCHAR(50) NOT NULL, -- 'hourly', 'daily', 'weekly', 'monthly', 'cron'
+    schedule_config JSONB NOT NULL, -- For cron: {"cron": "0 0 * * *"}, for others: {"time": "00:00", "day": 1, etc.}
+    recursive BOOLEAN NOT NULL DEFAULT false,
+    enabled BOOLEAN NOT NULL DEFAULT true,
+    retention_count INTEGER, -- Keep last N snapshots (null = unlimited)
+    retention_days INTEGER, -- Keep snapshots for N days (null = unlimited)
+    last_run_at TIMESTAMP,
+    next_run_at TIMESTAMP,
+    created_by UUID REFERENCES users(id),
+    created_at TIMESTAMP NOT NULL DEFAULT NOW(),
+    updated_at TIMESTAMP NOT NULL DEFAULT NOW(),
+    UNIQUE(name)
+);
+
+CREATE INDEX IF NOT EXISTS idx_snapshot_schedules_enabled ON snapshot_schedules(enabled);
+CREATE INDEX IF NOT EXISTS idx_snapshot_schedules_next_run ON snapshot_schedules(next_run_at);
+CREATE INDEX IF NOT EXISTS idx_snapshot_schedules_dataset ON snapshot_schedules(dataset);
diff --git a/backend/internal/common/database/migrations/013_add_replication_tasks_table.sql b/backend/internal/common/database/migrations/013_add_replication_tasks_table.sql
new file mode 100644
index 0000000..556d362
--- /dev/null
+++ b/backend/internal/common/database/migrations/013_add_replication_tasks_table.sql
@@ -0,0 +1,76 @@
+-- ZFS Replication Tasks Table
+-- Supports both outbound (sender) and inbound (receiver) replication
+CREATE TABLE IF NOT EXISTS replication_tasks (
+    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
+    name VARCHAR(255) NOT NULL,
+    direction VARCHAR(20) NOT NULL, -- 'outbound' (sender) or 'inbound' (receiver)
+
+    -- For outbound replication (sender)
+    source_dataset VARCHAR(512), -- Source dataset on this system (outbound) or remote system (inbound)
+    target_host VARCHAR(255), -- Target host IP or hostname (for outbound)
+    target_port INTEGER DEFAULT 22, -- SSH port (default 22, for outbound)
+    target_user VARCHAR(255) DEFAULT 'root', -- SSH user (for outbound)
+    target_dataset VARCHAR(512), -- Target dataset on remote system (for outbound)
+    target_ssh_key_path TEXT, -- Path to SSH private key (for outbound)
+
+    -- For inbound replication (receiver)
+    source_host VARCHAR(255), -- Source host IP or hostname (for inbound)
+    source_port INTEGER DEFAULT 22, -- SSH port (for inbound)
+    source_user VARCHAR(255) DEFAULT 'root', -- SSH user (for inbound)
+    local_dataset VARCHAR(512), -- Local dataset to receive snapshots (for inbound)
+
+    -- Common settings
+    schedule_type VARCHAR(50), -- 'manual', 'hourly', 'daily', 'weekly', 'monthly', 'cron'
+    schedule_config JSONB, -- Schedule configuration (similar to snapshot schedules)
+    compression VARCHAR(50) DEFAULT 'lz4', -- 'off', 'lz4', 'gzip', 'zstd'
+    encryption BOOLEAN DEFAULT false, -- Enable encryption during transfer
+    recursive BOOLEAN DEFAULT false, -- Replicate recursively
+    incremental BOOLEAN DEFAULT true, -- Use incremental replication
+    auto_snapshot BOOLEAN DEFAULT true, -- Auto-create snapshot before replication
+
+    -- Status and tracking
+    enabled BOOLEAN NOT NULL DEFAULT true,
+    status VARCHAR(50) DEFAULT 'idle', -- 'idle', 'running', 'failed', 'paused'
+    last_run_at TIMESTAMP,
+    last_run_status VARCHAR(50), -- 'success', 'failed', 'partial'
+    last_run_error TEXT,
+    next_run_at TIMESTAMP,
+    last_snapshot_sent VARCHAR(512), -- Last snapshot successfully sent (for outbound)
+    last_snapshot_received VARCHAR(512), -- Last snapshot successfully received (for inbound)
+
+    -- Statistics
+    total_runs INTEGER DEFAULT 0,
+    successful_runs INTEGER DEFAULT 0,
+    failed_runs INTEGER DEFAULT 0,
+    bytes_sent BIGINT DEFAULT 0, -- Total bytes sent (for outbound)
+    bytes_received BIGINT DEFAULT 0, -- Total bytes received (for inbound)
+
+    created_by UUID REFERENCES users(id),
+    created_at TIMESTAMP NOT NULL DEFAULT NOW(),
+    updated_at TIMESTAMP NOT NULL DEFAULT NOW(),
+
+    -- Validation: ensure required fields based on direction
+    CONSTRAINT chk_direction CHECK (direction IN ('outbound', 'inbound')),
+    CONSTRAINT chk_outbound_fields CHECK (
+        direction != 'outbound' OR (
+            source_dataset IS NOT NULL AND
+            target_host IS NOT NULL AND
+            target_dataset IS NOT NULL
+        )
+    ),
+    CONSTRAINT chk_inbound_fields CHECK (
+        direction != 'inbound' OR (
+            source_host IS NOT NULL AND
+            source_dataset IS NOT NULL AND
+            local_dataset IS NOT NULL
+        )
+    )
+);
+
+-- Create indexes
+CREATE INDEX IF NOT EXISTS idx_replication_tasks_direction ON replication_tasks(direction);
+CREATE INDEX IF NOT EXISTS idx_replication_tasks_enabled ON replication_tasks(enabled);
+CREATE INDEX IF NOT EXISTS idx_replication_tasks_status ON replication_tasks(status);
+CREATE INDEX IF NOT EXISTS idx_replication_tasks_next_run ON replication_tasks(next_run_at);
+CREATE INDEX IF NOT EXISTS idx_replication_tasks_source_dataset ON replication_tasks(source_dataset);
+CREATE INDEX IF NOT EXISTS idx_replication_tasks_target_host ON replication_tasks(target_host);
diff --git a/backend/internal/common/database/migrations/014_add_client_categories_table.sql b/backend/internal/common/database/migrations/014_add_client_categories_table.sql
new file mode 100644
index 0000000..4fad7d1
--- /dev/null
+++ b/backend/internal/common/database/migrations/014_add_client_categories_table.sql
@@ -0,0 +1,30 @@
+-- AtlasOS - Calypso
+-- 
Client Categories Schema +-- Version: 14.0 +-- Adds category support for backup clients (File, Database, Virtual) + +-- Client metadata table to store additional information about clients +-- This extends the Bacula Client table with Calypso-specific metadata +CREATE TABLE IF NOT EXISTS client_metadata ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + client_id INTEGER NOT NULL, -- Bacula Client.ClientId + client_name VARCHAR(255) NOT NULL, -- Bacula Client.Name (for reference) + category VARCHAR(50) NOT NULL DEFAULT 'File', -- 'File', 'Database', 'Virtual' + description TEXT, + tags JSONB, -- Additional tags/metadata as JSON + created_at TIMESTAMP NOT NULL DEFAULT NOW(), + updated_at TIMESTAMP NOT NULL DEFAULT NOW(), + + -- Ensure one metadata entry per client + CONSTRAINT unique_client_id UNIQUE (client_id), + CONSTRAINT chk_category CHECK (category IN ('File', 'Database', 'Virtual')) +); + +-- Indexes for performance +CREATE INDEX IF NOT EXISTS idx_client_metadata_client_id ON client_metadata(client_id); +CREATE INDEX IF NOT EXISTS idx_client_metadata_client_name ON client_metadata(client_name); +CREATE INDEX IF NOT EXISTS idx_client_metadata_category ON client_metadata(category); + +-- Add comment +COMMENT ON TABLE client_metadata IS 'Stores Calypso-specific metadata for backup clients, including category classification'; +COMMENT ON COLUMN client_metadata.category IS 'Client category: File (file system backups), Database (database backups), Virtual (virtual machine backups)'; diff --git a/backend/internal/common/router/router.go b/backend/internal/common/router/router.go index 782b87e..0b8e769 100644 --- a/backend/internal/common/router/router.go +++ b/backend/internal/common/router/router.go @@ -176,6 +176,11 @@ func NewRouter(cfg *config.Config, db *database.DB, log *logger.Logger) *gin.Eng zfsPoolMonitor := storage.NewZFSPoolMonitor(db, log, 2*time.Minute) go zfsPoolMonitor.Start(context.Background()) + // Start snapshot schedule worker in background 
(checks schedules every 1 minute) + snapshotService := storage.NewSnapshotService(db, log) + snapshotScheduleWorker := storage.NewSnapshotScheduleWorker(db, log, snapshotService, 1*time.Minute) + go snapshotScheduleWorker.Start(context.Background()) + storageGroup := protected.Group("/storage") storageGroup.Use(requirePermission("storage", "read")) { @@ -196,6 +201,26 @@ func NewRouter(cfg *config.Config, db *database.DB, log *logger.Logger) *gin.Eng storageGroup.GET("/zfs/pools/:id/datasets", storageHandler.ListZFSDatasets) storageGroup.POST("/zfs/pools/:id/datasets", requirePermission("storage", "write"), storageHandler.CreateZFSDataset) storageGroup.DELETE("/zfs/pools/:id/datasets/:dataset", requirePermission("storage", "write"), storageHandler.DeleteZFSDataset) + // ZFS Snapshots + storageGroup.GET("/zfs/snapshots", storageHandler.ListSnapshots) + storageGroup.POST("/zfs/snapshots", requirePermission("storage", "write"), storageHandler.CreateSnapshot) + storageGroup.DELETE("/zfs/snapshots/:name", requirePermission("storage", "write"), storageHandler.DeleteSnapshot) + storageGroup.POST("/zfs/snapshots/:name/rollback", requirePermission("storage", "write"), storageHandler.RollbackSnapshot) + storageGroup.POST("/zfs/snapshots/:name/clone", requirePermission("storage", "write"), storageHandler.CloneSnapshot) + // Snapshot Schedules + storageGroup.GET("/zfs/snapshot-schedules", storageHandler.ListSnapshotSchedules) + storageGroup.GET("/zfs/snapshot-schedules/:id", storageHandler.GetSnapshotSchedule) + storageGroup.POST("/zfs/snapshot-schedules", requirePermission("storage", "write"), storageHandler.CreateSnapshotSchedule) + storageGroup.PUT("/zfs/snapshot-schedules/:id", requirePermission("storage", "write"), storageHandler.UpdateSnapshotSchedule) + storageGroup.DELETE("/zfs/snapshot-schedules/:id", requirePermission("storage", "write"), storageHandler.DeleteSnapshotSchedule) + storageGroup.POST("/zfs/snapshot-schedules/:id/toggle", requirePermission("storage", 
"write"), storageHandler.ToggleSnapshotSchedule) + + // Replication Tasks + storageGroup.GET("/zfs/replication-tasks", storageHandler.ListReplicationTasks) + storageGroup.GET("/zfs/replication-tasks/:id", storageHandler.GetReplicationTask) + storageGroup.POST("/zfs/replication-tasks", requirePermission("storage", "write"), storageHandler.CreateReplicationTask) + storageGroup.PUT("/zfs/replication-tasks/:id", requirePermission("storage", "write"), storageHandler.UpdateReplicationTask) + storageGroup.DELETE("/zfs/replication-tasks/:id", requirePermission("storage", "write"), storageHandler.DeleteReplicationTask) // ZFS ARC Stats storageGroup.GET("/zfs/arc/stats", storageHandler.GetARCStats) } @@ -228,26 +253,28 @@ func NewRouter(cfg *config.Config, db *database.DB, log *logger.Logger) *gin.Eng objectStorageGroup := protected.Group("/object-storage") objectStorageGroup.Use(requirePermission("storage", "read")) { - // Setup endpoints - objectStorageGroup.GET("/setup/datasets", objectStorageHandler.GetAvailableDatasets) - objectStorageGroup.GET("/setup/current", objectStorageHandler.GetCurrentSetup) - objectStorageGroup.POST("/setup", requirePermission("storage", "write"), objectStorageHandler.SetupObjectStorage) - objectStorageGroup.PUT("/setup", requirePermission("storage", "write"), objectStorageHandler.UpdateObjectStorage) - + // Setup endpoints + objectStorageGroup.GET("/setup/datasets", objectStorageHandler.GetAvailableDatasets) + objectStorageGroup.GET("/setup/current", objectStorageHandler.GetCurrentSetup) + objectStorageGroup.POST("/setup", requirePermission("storage", "write"), objectStorageHandler.SetupObjectStorage) + objectStorageGroup.PUT("/setup", requirePermission("storage", "write"), objectStorageHandler.UpdateObjectStorage) + // Bucket endpoints + // IMPORTANT: More specific routes must come BEFORE less specific ones objectStorageGroup.GET("/buckets", objectStorageHandler.ListBuckets) + objectStorageGroup.GET("/buckets/:name/objects", 
objectStorageHandler.ListObjects) objectStorageGroup.GET("/buckets/:name", objectStorageHandler.GetBucket) - objectStorageGroup.POST("/buckets", requirePermission("storage", "write"), objectStorageHandler.CreateBucket) - objectStorageGroup.DELETE("/buckets/:name", requirePermission("storage", "write"), objectStorageHandler.DeleteBucket) - // User management routes - objectStorageGroup.GET("/users", objectStorageHandler.ListUsers) - objectStorageGroup.POST("/users", requirePermission("storage", "write"), objectStorageHandler.CreateUser) - objectStorageGroup.DELETE("/users/:access_key", requirePermission("storage", "write"), objectStorageHandler.DeleteUser) - // Service account (access key) management routes - objectStorageGroup.GET("/service-accounts", objectStorageHandler.ListServiceAccounts) - objectStorageGroup.POST("/service-accounts", requirePermission("storage", "write"), objectStorageHandler.CreateServiceAccount) - objectStorageGroup.DELETE("/service-accounts/:access_key", requirePermission("storage", "write"), objectStorageHandler.DeleteServiceAccount) - } + objectStorageGroup.POST("/buckets", requirePermission("storage", "write"), objectStorageHandler.CreateBucket) + objectStorageGroup.DELETE("/buckets/:name", requirePermission("storage", "write"), objectStorageHandler.DeleteBucket) + // User management routes + objectStorageGroup.GET("/users", objectStorageHandler.ListUsers) + objectStorageGroup.POST("/users", requirePermission("storage", "write"), objectStorageHandler.CreateUser) + objectStorageGroup.DELETE("/users/:access_key", requirePermission("storage", "write"), objectStorageHandler.DeleteUser) + // Service account (access key) management routes + objectStorageGroup.GET("/service-accounts", objectStorageHandler.ListServiceAccounts) + objectStorageGroup.POST("/service-accounts", requirePermission("storage", "write"), objectStorageHandler.CreateServiceAccount) + objectStorageGroup.DELETE("/service-accounts/:access_key", requirePermission("storage", 
"write"), objectStorageHandler.DeleteServiceAccount) + } } } @@ -347,9 +374,9 @@ func NewRouter(cfg *config.Config, db *database.DB, log *logger.Logger) *gin.Eng systemGroup.GET("/logs", systemHandler.GetSystemLogs) systemGroup.GET("/network/throughput", systemHandler.GetNetworkThroughput) systemGroup.POST("/support-bundle", systemHandler.GenerateSupportBundle) - systemGroup.GET("/interfaces", systemHandler.ListNetworkInterfaces) - systemGroup.GET("/management-ip", systemHandler.GetManagementIPAddress) - systemGroup.PUT("/interfaces/:name", systemHandler.UpdateNetworkInterface) + systemGroup.GET("/interfaces", systemHandler.ListNetworkInterfaces) + systemGroup.GET("/management-ip", systemHandler.GetManagementIPAddress) + systemGroup.PUT("/interfaces/:name", systemHandler.UpdateNetworkInterface) systemGroup.GET("/ntp", systemHandler.GetNTPSettings) systemGroup.POST("/ntp", systemHandler.SaveNTPSettings) systemGroup.POST("/execute", requirePermission("system", "write"), systemHandler.ExecuteCommand) diff --git a/backend/internal/object_storage/handler.go b/backend/internal/object_storage/handler.go index f9ef4e1..49fb4d4 100644 --- a/backend/internal/object_storage/handler.go +++ b/backend/internal/object_storage/handler.go @@ -11,9 +11,9 @@ import ( // Handler handles HTTP requests for object storage type Handler struct { - service *Service + service *Service setupService *SetupService - logger *logger.Logger + logger *logger.Logger } // NewHandler creates a new object storage handler @@ -283,3 +283,18 @@ func (h *Handler) DeleteServiceAccount(c *gin.Context) { c.JSON(http.StatusOK, gin.H{"message": "service account deleted successfully"}) } + +// ListObjects lists objects in a bucket +func (h *Handler) ListObjects(c *gin.Context) { + bucketName := c.Param("name") // Changed from "bucket" to "name" to match route + prefix := c.DefaultQuery("prefix", "") // Optional prefix (folder path) + + objects, err := h.service.ListObjects(c.Request.Context(), bucketName, 
prefix) + if err != nil { + h.logger.Error("Failed to list objects", "bucket", bucketName, "prefix", prefix, "error", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to list objects: " + err.Error()}) + return + } + + c.JSON(http.StatusOK, gin.H{"objects": objects}) +} diff --git a/backend/internal/object_storage/service.go b/backend/internal/object_storage/service.go index 04b97d7..d1d807b 100644 --- a/backend/internal/object_storage/service.go +++ b/backend/internal/object_storage/service.go @@ -4,23 +4,24 @@ import ( "context" "encoding/json" "fmt" + "sort" "strings" "time" "github.com/atlasos/calypso/internal/common/logger" + madmin "github.com/minio/madmin-go/v3" "github.com/minio/minio-go/v7" "github.com/minio/minio-go/v7/pkg/credentials" - madmin "github.com/minio/madmin-go/v3" ) // Service handles MinIO object storage operations type Service struct { - client *minio.Client + client *minio.Client adminClient *madmin.AdminClient - logger *logger.Logger - endpoint string - accessKey string - secretKey string + logger *logger.Logger + endpoint string + accessKey string + secretKey string } // NewService creates a new MinIO service @@ -54,8 +55,8 @@ func NewService(endpoint, accessKey, secretKey string, log *logger.Logger) (*Ser type Bucket struct { Name string `json:"name"` CreationDate time.Time `json:"creation_date"` - Size int64 `json:"size"` // Total size in bytes - Objects int64 `json:"objects"` // Number of objects + Size int64 `json:"size"` // Total size in bytes + Objects int64 `json:"objects"` // Number of objects AccessPolicy string `json:"access_policy"` // private, public-read, public-read-write } @@ -153,7 +154,6 @@ func (s *Service) getBucketPolicy(ctx context.Context, bucketName string) string return "private" } - // CreateBucket creates a new bucket func (s *Service) CreateBucket(ctx context.Context, bucketName string) error { err := s.client.MakeBucket(ctx, bucketName, minio.MakeBucketOptions{}) @@ -177,6 +177,82 @@ func (s 
*Service) GetBucketStats(ctx context.Context, bucketName string) (*Bucke return s.getBucketInfo(ctx, bucketName) } +// Object represents a MinIO object (file or folder) +type Object struct { + Name string `json:"name"` + Key string `json:"key"` // Full path/key + Size int64 `json:"size"` // Size in bytes (0 for folders) + LastModified time.Time `json:"last_modified"` + IsDir bool `json:"is_dir"` // True if this is a folder/prefix + ETag string `json:"etag,omitempty"` +} + +// ListObjects lists objects in a bucket with optional prefix (folder path) +func (s *Service) ListObjects(ctx context.Context, bucketName, prefix string) ([]*Object, error) { + // Ensure prefix ends with / if it's not empty and doesn't already end with / + if prefix != "" && !strings.HasSuffix(prefix, "/") { + prefix += "/" + } + + // List objects with prefix + objectCh := s.client.ListObjects(ctx, bucketName, minio.ListObjectsOptions{ + Prefix: prefix, + Recursive: false, // Don't recurse, show folders as separate items + }) + + var objects []*Object + seenFolders := make(map[string]bool) + + for object := range objectCh { + if object.Err != nil { + s.logger.Warn("Error listing object", "bucket", bucketName, "prefix", prefix, "error", object.Err) + continue + } + + // Extract folder name from object key + relativeKey := strings.TrimPrefix(object.Key, prefix) + + // Check if this is a folder (has / in relative path) + if strings.Contains(relativeKey, "/") { + // Extract folder name (first part before /) + folderName := strings.Split(relativeKey, "/")[0] + folderKey := prefix + folderName + "/" + + // Only add folder once + if !seenFolders[folderKey] { + seenFolders[folderKey] = true + objects = append(objects, &Object{ + Name: folderName, + Key: folderKey, + Size: 0, + LastModified: time.Time{}, + IsDir: true, + }) + } + } else { + // This is a file in the current directory + objects = append(objects, &Object{ + Name: relativeKey, + Key: object.Key, + Size: object.Size, + LastModified: 
object.LastModified, + IsDir: false, + ETag: object.ETag, + }) + } + } + + // Sort: folders first, then files, both alphabetically + sort.Slice(objects, func(i, j int) bool { + if objects[i].IsDir != objects[j].IsDir { + return objects[i].IsDir // Folders first + } + return objects[i].Name < objects[j].Name + }) + + return objects, nil +} + // User represents a MinIO IAM user type User struct { AccessKey string `json:"access_key"` @@ -229,11 +305,11 @@ func (s *Service) DeleteUser(ctx context.Context, accessKey string) error { // ServiceAccount represents a MinIO service account (access key) type ServiceAccount struct { - AccessKey string `json:"access_key"` - SecretKey string `json:"secret_key,omitempty"` // Only returned on creation - ParentUser string `json:"parent_user"` - Expiration time.Time `json:"expiration,omitempty"` - CreatedAt time.Time `json:"created_at"` + AccessKey string `json:"access_key"` + SecretKey string `json:"secret_key,omitempty"` // Only returned on creation + ParentUser string `json:"parent_user"` + Expiration time.Time `json:"expiration,omitempty"` + CreatedAt time.Time `json:"created_at"` } // ListServiceAccounts lists all service accounts in MinIO diff --git a/backend/internal/storage/handler.go b/backend/internal/storage/handler.go index 2f3382c..7c60d23 100644 --- a/backend/internal/storage/handler.go +++ b/backend/internal/storage/handler.go @@ -15,14 +15,17 @@ import ( // Handler handles storage-related API requests type Handler struct { - diskService *DiskService - lvmService *LVMService - zfsService *ZFSService - arcService *ARCService - taskEngine *tasks.Engine - db *database.DB - logger *logger.Logger - cache *cache.Cache // Cache for invalidation + diskService *DiskService + lvmService *LVMService + zfsService *ZFSService + snapshotService *SnapshotService + snapshotScheduleService *SnapshotScheduleService + replicationService *ReplicationService + arcService *ARCService + taskEngine *tasks.Engine + db *database.DB + logger 
*logger.Logger + cache *cache.Cache // Cache for invalidation } // SetCache sets the cache instance for cache invalidation @@ -32,14 +35,18 @@ func (h *Handler) SetCache(c *cache.Cache) { // NewHandler creates a new storage handler func NewHandler(db *database.DB, log *logger.Logger) *Handler { + snapshotService := NewSnapshotService(db, log) return &Handler{ - diskService: NewDiskService(db, log), - lvmService: NewLVMService(db, log), - zfsService: NewZFSService(db, log), - arcService: NewARCService(log), - taskEngine: tasks.NewEngine(db, log), - db: db, - logger: log, + diskService: NewDiskService(db, log), + lvmService: NewLVMService(db, log), + zfsService: NewZFSService(db, log), + snapshotService: snapshotService, + snapshotScheduleService: NewSnapshotScheduleService(db, log, snapshotService), + replicationService: NewReplicationService(db, log), + arcService: NewARCService(log), + taskEngine: tasks.NewEngine(db, log), + db: db, + logger: log, } } @@ -509,3 +516,417 @@ func (h *Handler) GetARCStats(c *gin.Context) { c.JSON(http.StatusOK, stats) } + +// ListSnapshots lists all snapshots, optionally filtered by dataset +func (h *Handler) ListSnapshots(c *gin.Context) { + datasetFilter := c.DefaultQuery("dataset", "") + + snapshots, err := h.snapshotService.ListSnapshots(c.Request.Context(), datasetFilter) + if err != nil { + h.logger.Error("Failed to list snapshots", "error", err, "dataset", datasetFilter) + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to list snapshots: " + err.Error()}) + return + } + + c.JSON(http.StatusOK, gin.H{"snapshots": snapshots}) +} + +// CreateSnapshotRequest represents a request to create a snapshot +type CreateSnapshotRequest struct { + Dataset string `json:"dataset" binding:"required"` + Name string `json:"name" binding:"required"` + Recursive bool `json:"recursive"` +} + +// CreateSnapshot creates a new snapshot +func (h *Handler) CreateSnapshot(c *gin.Context) { + var req CreateSnapshotRequest + if err := 
c.ShouldBindJSON(&req); err != nil { + h.logger.Error("Invalid create snapshot request", "error", err) + c.JSON(http.StatusBadRequest, gin.H{"error": "invalid request: " + err.Error()}) + return + } + + if err := h.snapshotService.CreateSnapshot(c.Request.Context(), req.Dataset, req.Name, req.Recursive); err != nil { + h.logger.Error("Failed to create snapshot", "error", err, "dataset", req.Dataset, "name", req.Name) + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to create snapshot: " + err.Error()}) + return + } + + c.JSON(http.StatusCreated, gin.H{"message": "snapshot created successfully"}) +} + +// DeleteSnapshot deletes a snapshot +func (h *Handler) DeleteSnapshot(c *gin.Context) { + snapshotName := c.Param("name") + recursive := c.DefaultQuery("recursive", "false") == "true" + + if err := h.snapshotService.DeleteSnapshot(c.Request.Context(), snapshotName, recursive); err != nil { + h.logger.Error("Failed to delete snapshot", "error", err, "snapshot", snapshotName) + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to delete snapshot: " + err.Error()}) + return + } + + c.JSON(http.StatusOK, gin.H{"message": "snapshot deleted successfully"}) +} + +// RollbackSnapshotRequest represents a request to rollback to a snapshot +type RollbackSnapshotRequest struct { + Force bool `json:"force"` +} + +// RollbackSnapshot rolls back a dataset to a snapshot +func (h *Handler) RollbackSnapshot(c *gin.Context) { + snapshotName := c.Param("name") + + var req RollbackSnapshotRequest + if err := c.ShouldBindJSON(&req); err != nil { + // Default to false if not provided + req.Force = false + } + + if err := h.snapshotService.RollbackSnapshot(c.Request.Context(), snapshotName, req.Force); err != nil { + h.logger.Error("Failed to rollback snapshot", "error", err, "snapshot", snapshotName) + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to rollback snapshot: " + err.Error()}) + return + } + + c.JSON(http.StatusOK, gin.H{"message": 
"snapshot rollback completed successfully"}) +} + +// CloneSnapshotRequest represents a request to clone a snapshot +type CloneSnapshotRequest struct { + CloneName string `json:"clone_name" binding:"required"` +} + +// CloneSnapshot clones a snapshot to a new dataset +func (h *Handler) CloneSnapshot(c *gin.Context) { + snapshotName := c.Param("name") + + var req CloneSnapshotRequest + if err := c.ShouldBindJSON(&req); err != nil { + h.logger.Error("Invalid clone snapshot request", "error", err) + c.JSON(http.StatusBadRequest, gin.H{"error": "invalid request: " + err.Error()}) + return + } + + if err := h.snapshotService.CloneSnapshot(c.Request.Context(), snapshotName, req.CloneName); err != nil { + h.logger.Error("Failed to clone snapshot", "error", err, "snapshot", snapshotName, "clone", req.CloneName) + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to clone snapshot: " + err.Error()}) + return + } + + c.JSON(http.StatusCreated, gin.H{"message": "snapshot cloned successfully", "clone_name": req.CloneName}) +} + +// ListSnapshotSchedules lists all snapshot schedules +func (h *Handler) ListSnapshotSchedules(c *gin.Context) { + schedules, err := h.snapshotScheduleService.ListSchedules(c.Request.Context()) + if err != nil { + h.logger.Error("Failed to list snapshot schedules", "error", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to list snapshot schedules: " + err.Error()}) + return + } + + c.JSON(http.StatusOK, gin.H{"schedules": schedules}) +} + +// GetSnapshotSchedule retrieves a snapshot schedule by ID +func (h *Handler) GetSnapshotSchedule(c *gin.Context) { + id := c.Param("id") + + schedule, err := h.snapshotScheduleService.GetSchedule(c.Request.Context(), id) + if err != nil { + if err.Error() == "schedule not found" { + c.JSON(http.StatusNotFound, gin.H{"error": "schedule not found"}) + return + } + h.logger.Error("Failed to get snapshot schedule", "error", err, "id", id) + c.JSON(http.StatusInternalServerError, 
gin.H{"error": "failed to get snapshot schedule: " + err.Error()}) + return + } + + c.JSON(http.StatusOK, schedule) +} + +// CreateSnapshotScheduleRequest represents a request to create a snapshot schedule +type CreateSnapshotScheduleRequest struct { + Name string `json:"name" binding:"required"` + Dataset string `json:"dataset" binding:"required"` + SnapshotNameTemplate string `json:"snapshot_name_template" binding:"required"` + ScheduleType string `json:"schedule_type" binding:"required"` + ScheduleConfig map[string]interface{} `json:"schedule_config" binding:"required"` + Recursive bool `json:"recursive"` + RetentionCount *int `json:"retention_count"` + RetentionDays *int `json:"retention_days"` +} + +// CreateSnapshotSchedule creates a new snapshot schedule +func (h *Handler) CreateSnapshotSchedule(c *gin.Context) { + var req CreateSnapshotScheduleRequest + if err := c.ShouldBindJSON(&req); err != nil { + h.logger.Error("Invalid create snapshot schedule request", "error", err) + c.JSON(http.StatusBadRequest, gin.H{"error": "invalid request: " + err.Error()}) + return + } + + userID, _ := c.Get("user_id") + userIDStr := "" + if userID != nil { + userIDStr = userID.(string) + } + + schedule, err := h.snapshotScheduleService.CreateSchedule(c.Request.Context(), &CreateScheduleRequest{ + Name: req.Name, + Dataset: req.Dataset, + SnapshotNameTemplate: req.SnapshotNameTemplate, + ScheduleType: req.ScheduleType, + ScheduleConfig: req.ScheduleConfig, + Recursive: req.Recursive, + RetentionCount: req.RetentionCount, + RetentionDays: req.RetentionDays, + }, userIDStr) + if err != nil { + h.logger.Error("Failed to create snapshot schedule", "error", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to create snapshot schedule: " + err.Error()}) + return + } + + c.JSON(http.StatusCreated, schedule) +} + +// UpdateSnapshotSchedule updates an existing snapshot schedule +func (h *Handler) UpdateSnapshotSchedule(c *gin.Context) { + id := c.Param("id") + + var 
req CreateSnapshotScheduleRequest + if err := c.ShouldBindJSON(&req); err != nil { + h.logger.Error("Invalid update snapshot schedule request", "error", err) + c.JSON(http.StatusBadRequest, gin.H{"error": "invalid request: " + err.Error()}) + return + } + + schedule, err := h.snapshotScheduleService.UpdateSchedule(c.Request.Context(), id, &CreateScheduleRequest{ + Name: req.Name, + Dataset: req.Dataset, + SnapshotNameTemplate: req.SnapshotNameTemplate, + ScheduleType: req.ScheduleType, + ScheduleConfig: req.ScheduleConfig, + Recursive: req.Recursive, + RetentionCount: req.RetentionCount, + RetentionDays: req.RetentionDays, + }) + if err != nil { + if err.Error() == "schedule not found" { + c.JSON(http.StatusNotFound, gin.H{"error": "schedule not found"}) + return + } + h.logger.Error("Failed to update snapshot schedule", "error", err, "id", id) + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to update snapshot schedule: " + err.Error()}) + return + } + + c.JSON(http.StatusOK, schedule) +} + +// DeleteSnapshotSchedule deletes a snapshot schedule +func (h *Handler) DeleteSnapshotSchedule(c *gin.Context) { + id := c.Param("id") + + if err := h.snapshotScheduleService.DeleteSchedule(c.Request.Context(), id); err != nil { + if err.Error() == "schedule not found" { + c.JSON(http.StatusNotFound, gin.H{"error": "schedule not found"}) + return + } + h.logger.Error("Failed to delete snapshot schedule", "error", err, "id", id) + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to delete snapshot schedule: " + err.Error()}) + return + } + + c.JSON(http.StatusOK, gin.H{"message": "snapshot schedule deleted successfully"}) +} + +// ToggleSnapshotSchedule enables or disables a snapshot schedule +func (h *Handler) ToggleSnapshotSchedule(c *gin.Context) { + id := c.Param("id") + + var req struct { + Enabled bool `json:"enabled" binding:"required"` + } + if err := c.ShouldBindJSON(&req); err != nil { + h.logger.Error("Invalid toggle snapshot schedule 
request", "error", err) + c.JSON(http.StatusBadRequest, gin.H{"error": "invalid request: " + err.Error()}) + return + } + + if err := h.snapshotScheduleService.ToggleSchedule(c.Request.Context(), id, req.Enabled); err != nil { + if err.Error() == "schedule not found" { + c.JSON(http.StatusNotFound, gin.H{"error": "schedule not found"}) + return + } + h.logger.Error("Failed to toggle snapshot schedule", "error", err, "id", id) + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to toggle snapshot schedule: " + err.Error()}) + return + } + + c.JSON(http.StatusOK, gin.H{"message": "snapshot schedule toggled successfully"}) +} + +// ListReplicationTasks lists all replication tasks +func (h *Handler) ListReplicationTasks(c *gin.Context) { + direction := c.Query("direction") // Optional filter: "outbound" or "inbound" + tasks, err := h.replicationService.ListReplicationTasks(c.Request.Context(), direction) + if err != nil { + h.logger.Error("Failed to list replication tasks", "error", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to list replication tasks: " + err.Error()}) + return + } + c.JSON(http.StatusOK, gin.H{"tasks": tasks}) +} + +// GetReplicationTask retrieves a replication task by ID +func (h *Handler) GetReplicationTask(c *gin.Context) { + id := c.Param("id") + task, err := h.replicationService.GetReplicationTask(c.Request.Context(), id) + if err != nil { + if err.Error() == "replication task not found" { + c.JSON(http.StatusNotFound, gin.H{"error": "replication task not found"}) + return + } + h.logger.Error("Failed to get replication task", "error", err, "id", id) + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to get replication task: " + err.Error()}) + return + } + c.JSON(http.StatusOK, task) +} + +// CreateReplicationTaskRequest represents a request to create a replication task +type CreateReplicationTaskRequest struct { + Name string `json:"name" binding:"required"` + Direction string `json:"direction" 
binding:"required"` + SourceDataset *string `json:"source_dataset"` + TargetHost *string `json:"target_host"` + TargetPort *int `json:"target_port"` + TargetUser *string `json:"target_user"` + TargetDataset *string `json:"target_dataset"` + TargetSSHKeyPath *string `json:"target_ssh_key_path"` + SourceHost *string `json:"source_host"` + SourcePort *int `json:"source_port"` + SourceUser *string `json:"source_user"` + LocalDataset *string `json:"local_dataset"` + ScheduleType *string `json:"schedule_type"` + ScheduleConfig map[string]interface{} `json:"schedule_config"` + Compression string `json:"compression"` + Encryption bool `json:"encryption"` + Recursive bool `json:"recursive"` + Incremental bool `json:"incremental"` + AutoSnapshot bool `json:"auto_snapshot"` + Enabled bool `json:"enabled"` +} + +// CreateReplicationTask creates a new replication task +func (h *Handler) CreateReplicationTask(c *gin.Context) { + var req CreateReplicationTaskRequest + if err := c.ShouldBindJSON(&req); err != nil { + h.logger.Error("Invalid create replication task request", "error", err) + c.JSON(http.StatusBadRequest, gin.H{"error": "invalid request: " + err.Error()}) + return + } + + userID, _ := c.Get("user_id") + userIDStr := "" + if userID != nil { + userIDStr = userID.(string) + } + + task, err := h.replicationService.CreateReplicationTask(c.Request.Context(), &CreateReplicationRequest{ + Name: req.Name, + Direction: req.Direction, + SourceDataset: req.SourceDataset, + TargetHost: req.TargetHost, + TargetPort: req.TargetPort, + TargetUser: req.TargetUser, + TargetDataset: req.TargetDataset, + TargetSSHKeyPath: req.TargetSSHKeyPath, + SourceHost: req.SourceHost, + SourcePort: req.SourcePort, + SourceUser: req.SourceUser, + LocalDataset: req.LocalDataset, + ScheduleType: req.ScheduleType, + ScheduleConfig: req.ScheduleConfig, + Compression: req.Compression, + Encryption: req.Encryption, + Recursive: req.Recursive, + Incremental: req.Incremental, + AutoSnapshot: 
req.AutoSnapshot, + Enabled: req.Enabled, + }, userIDStr) + if err != nil { + h.logger.Error("Failed to create replication task", "error", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to create replication task: " + err.Error()}) + return + } + + c.JSON(http.StatusCreated, task) +} + +// UpdateReplicationTask updates an existing replication task +func (h *Handler) UpdateReplicationTask(c *gin.Context) { + id := c.Param("id") + var req CreateReplicationTaskRequest + if err := c.ShouldBindJSON(&req); err != nil { + h.logger.Error("Invalid update replication task request", "error", err) + c.JSON(http.StatusBadRequest, gin.H{"error": "invalid request: " + err.Error()}) + return + } + + task, err := h.replicationService.UpdateReplicationTask(c.Request.Context(), id, &CreateReplicationRequest{ + Name: req.Name, + Direction: req.Direction, + SourceDataset: req.SourceDataset, + TargetHost: req.TargetHost, + TargetPort: req.TargetPort, + TargetUser: req.TargetUser, + TargetDataset: req.TargetDataset, + TargetSSHKeyPath: req.TargetSSHKeyPath, + SourceHost: req.SourceHost, + SourcePort: req.SourcePort, + SourceUser: req.SourceUser, + LocalDataset: req.LocalDataset, + ScheduleType: req.ScheduleType, + ScheduleConfig: req.ScheduleConfig, + Compression: req.Compression, + Encryption: req.Encryption, + Recursive: req.Recursive, + Incremental: req.Incremental, + AutoSnapshot: req.AutoSnapshot, + Enabled: req.Enabled, + }) + if err != nil { + if err.Error() == "replication task not found" { + c.JSON(http.StatusNotFound, gin.H{"error": "replication task not found"}) + return + } + h.logger.Error("Failed to update replication task", "error", err, "id", id) + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to update replication task: " + err.Error()}) + return + } + + c.JSON(http.StatusOK, task) +} + +// DeleteReplicationTask deletes a replication task +func (h *Handler) DeleteReplicationTask(c *gin.Context) { + id := c.Param("id") + if err := 
h.replicationService.DeleteReplicationTask(c.Request.Context(), id); err != nil { + if err.Error() == "replication task not found" { + c.JSON(http.StatusNotFound, gin.H{"error": "replication task not found"}) + return + } + h.logger.Error("Failed to delete replication task", "error", err, "id", id) + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to delete replication task: " + err.Error()}) + return + } + + c.JSON(http.StatusOK, gin.H{"message": "replication task deleted successfully"}) +} diff --git a/backend/internal/storage/replication.go b/backend/internal/storage/replication.go new file mode 100644 index 0000000..b0f4be1 --- /dev/null +++ b/backend/internal/storage/replication.go @@ -0,0 +1,553 @@ +package storage + +import ( + "context" + "database/sql" + "encoding/json" + "fmt" + "time" + + "github.com/atlasos/calypso/internal/common/database" + "github.com/atlasos/calypso/internal/common/logger" + "github.com/google/uuid" +) + +// ReplicationService handles ZFS replication task management +type ReplicationService struct { + db *database.DB + logger *logger.Logger +} + +// NewReplicationService creates a new replication service +func NewReplicationService(db *database.DB, log *logger.Logger) *ReplicationService { + return &ReplicationService{ + db: db, + logger: log, + } +} + +// ReplicationTask represents a ZFS replication task +type ReplicationTask struct { + ID string `json:"id"` + Name string `json:"name"` + Direction string `json:"direction"` // "outbound" or "inbound" + SourceDataset *string `json:"source_dataset,omitempty"` + TargetHost *string `json:"target_host,omitempty"` + TargetPort *int `json:"target_port,omitempty"` + TargetUser *string `json:"target_user,omitempty"` + TargetDataset *string `json:"target_dataset,omitempty"` + TargetSSHKeyPath *string `json:"target_ssh_key_path,omitempty"` + SourceHost *string `json:"source_host,omitempty"` + SourcePort *int `json:"source_port,omitempty"` + SourceUser *string 
`json:"source_user,omitempty"` + LocalDataset *string `json:"local_dataset,omitempty"` + ScheduleType *string `json:"schedule_type,omitempty"` + ScheduleConfig map[string]interface{} `json:"schedule_config,omitempty"` + Compression string `json:"compression"` + Encryption bool `json:"encryption"` + Recursive bool `json:"recursive"` + Incremental bool `json:"incremental"` + AutoSnapshot bool `json:"auto_snapshot"` + Enabled bool `json:"enabled"` + Status string `json:"status"` + LastRunAt *time.Time `json:"last_run_at,omitempty"` + LastRunStatus *string `json:"last_run_status,omitempty"` + LastRunError *string `json:"last_run_error,omitempty"` + NextRunAt *time.Time `json:"next_run_at,omitempty"` + LastSnapshotSent *string `json:"last_snapshot_sent,omitempty"` + LastSnapshotReceived *string `json:"last_snapshot_received,omitempty"` + TotalRuns int `json:"total_runs"` + SuccessfulRuns int `json:"successful_runs"` + FailedRuns int `json:"failed_runs"` + BytesSent int64 `json:"bytes_sent"` + BytesReceived int64 `json:"bytes_received"` + CreatedBy string `json:"created_by,omitempty"` + CreatedAt time.Time `json:"created_at"` + UpdatedAt time.Time `json:"updated_at"` +} + +// CreateReplicationRequest represents a request to create a replication task +type CreateReplicationRequest struct { + Name string `json:"name" binding:"required"` + Direction string `json:"direction" binding:"required"` // "outbound" or "inbound" + SourceDataset *string `json:"source_dataset"` + TargetHost *string `json:"target_host"` + TargetPort *int `json:"target_port"` + TargetUser *string `json:"target_user"` + TargetDataset *string `json:"target_dataset"` + TargetSSHKeyPath *string `json:"target_ssh_key_path"` + SourceHost *string `json:"source_host"` + SourcePort *int `json:"source_port"` + SourceUser *string `json:"source_user"` + LocalDataset *string `json:"local_dataset"` + ScheduleType *string `json:"schedule_type"` + ScheduleConfig map[string]interface{} `json:"schedule_config"` + 
Compression string `json:"compression"` + Encryption bool `json:"encryption"` + Recursive bool `json:"recursive"` + Incremental bool `json:"incremental"` + AutoSnapshot bool `json:"auto_snapshot"` + Enabled bool `json:"enabled"` +} + +// ListReplicationTasks lists all replication tasks, optionally filtered by direction +func (s *ReplicationService) ListReplicationTasks(ctx context.Context, directionFilter string) ([]*ReplicationTask, error) { + var query string + var args []interface{} + + if directionFilter != "" { + query = ` + SELECT id, name, direction, source_dataset, target_host, target_port, target_user, target_dataset, + target_ssh_key_path, source_host, source_port, source_user, local_dataset, + schedule_type, schedule_config, compression, encryption, recursive, incremental, + auto_snapshot, enabled, status, last_run_at, last_run_status, last_run_error, + next_run_at, last_snapshot_sent, last_snapshot_received, + total_runs, successful_runs, failed_runs, bytes_sent, bytes_received, + created_by, created_at, updated_at + FROM replication_tasks + WHERE direction = $1 + ORDER BY created_at DESC + ` + args = []interface{}{directionFilter} + } else { + query = ` + SELECT id, name, direction, source_dataset, target_host, target_port, target_user, target_dataset, + target_ssh_key_path, source_host, source_port, source_user, local_dataset, + schedule_type, schedule_config, compression, encryption, recursive, incremental, + auto_snapshot, enabled, status, last_run_at, last_run_status, last_run_error, + next_run_at, last_snapshot_sent, last_snapshot_received, + total_runs, successful_runs, failed_runs, bytes_sent, bytes_received, + created_by, created_at, updated_at + FROM replication_tasks + ORDER BY direction, created_at DESC + ` + args = []interface{}{} + } + + rows, err := s.db.QueryContext(ctx, query, args...) 
+ if err != nil { + return nil, fmt.Errorf("failed to query replication tasks: %w", err) + } + defer rows.Close() + + var tasks []*ReplicationTask + for rows.Next() { + task, err := s.scanReplicationTask(rows) + if err != nil { + s.logger.Error("Failed to scan replication task", "error", err) + continue + } + tasks = append(tasks, task) + } + + return tasks, rows.Err() +} + +// GetReplicationTask retrieves a replication task by ID +func (s *ReplicationService) GetReplicationTask(ctx context.Context, id string) (*ReplicationTask, error) { + query := ` + SELECT id, name, direction, source_dataset, target_host, target_port, target_user, target_dataset, + target_ssh_key_path, source_host, source_port, source_user, local_dataset, + schedule_type, schedule_config, compression, encryption, recursive, incremental, + auto_snapshot, enabled, status, last_run_at, last_run_status, last_run_error, + next_run_at, last_snapshot_sent, last_snapshot_received, + total_runs, successful_runs, failed_runs, bytes_sent, bytes_received, + created_by, created_at, updated_at + FROM replication_tasks + WHERE id = $1 + ` + + row := s.db.QueryRowContext(ctx, query, id) + task, err := s.scanReplicationTaskRow(row) + if err != nil { + if err == sql.ErrNoRows { + return nil, fmt.Errorf("replication task not found") + } + return nil, fmt.Errorf("failed to get replication task: %w", err) + } + + return task, nil +} + +// CreateReplicationTask creates a new replication task +func (s *ReplicationService) CreateReplicationTask(ctx context.Context, req *CreateReplicationRequest, createdBy string) (*ReplicationTask, error) { + id := uuid.New().String() + + // Validate direction-specific fields + if req.Direction == "outbound" { + if req.SourceDataset == nil || req.TargetHost == nil || req.TargetDataset == nil { + return nil, fmt.Errorf("outbound replication requires source_dataset, target_host, and target_dataset") + } + } else if req.Direction == "inbound" { + if req.SourceHost == nil || 
req.SourceDataset == nil || req.LocalDataset == nil { + return nil, fmt.Errorf("inbound replication requires source_host, source_dataset, and local_dataset") + } + } else { + return nil, fmt.Errorf("invalid direction: must be 'outbound' or 'inbound'") + } + + // Set defaults + if req.Compression == "" { + req.Compression = "lz4" + } + if req.TargetPort == nil { + defaultPort := 22 + req.TargetPort = &defaultPort + } + if req.SourcePort == nil { + defaultPort := 22 + req.SourcePort = &defaultPort + } + if req.TargetUser == nil { + defaultUser := "root" + req.TargetUser = &defaultUser + } + if req.SourceUser == nil { + defaultUser := "root" + req.SourceUser = &defaultUser + } + + // Marshal schedule config to JSON + var scheduleConfigJSON sql.NullString + if req.ScheduleConfig != nil { + configJSON, err := json.Marshal(req.ScheduleConfig) + if err != nil { + return nil, fmt.Errorf("failed to marshal schedule config: %w", err) + } + scheduleConfigJSON = sql.NullString{String: string(configJSON), Valid: true} + } + + query := ` + INSERT INTO replication_tasks ( + id, name, direction, source_dataset, target_host, target_port, target_user, target_dataset, + target_ssh_key_path, source_host, source_port, source_user, local_dataset, + schedule_type, schedule_config, compression, encryption, recursive, incremental, + auto_snapshot, enabled, status, created_by, created_at, updated_at + ) VALUES ( + $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, NOW(), NOW() + ) + RETURNING id, name, direction, source_dataset, target_host, target_port, target_user, target_dataset, + target_ssh_key_path, source_host, source_port, source_user, local_dataset, + schedule_type, schedule_config, compression, encryption, recursive, incremental, + auto_snapshot, enabled, status, last_run_at, last_run_status, last_run_error, + next_run_at, last_snapshot_sent, last_snapshot_received, + total_runs, successful_runs, failed_runs, bytes_sent, 
bytes_received, + created_by, created_at, updated_at + ` + + var scheduleTypeStr sql.NullString + if req.ScheduleType != nil { + scheduleTypeStr = sql.NullString{String: *req.ScheduleType, Valid: true} + } + + row := s.db.QueryRowContext(ctx, query, + id, req.Name, req.Direction, + req.SourceDataset, req.TargetHost, req.TargetPort, req.TargetUser, req.TargetDataset, + req.TargetSSHKeyPath, req.SourceHost, req.SourcePort, req.SourceUser, req.LocalDataset, + scheduleTypeStr, scheduleConfigJSON, req.Compression, req.Encryption, req.Recursive, req.Incremental, + req.AutoSnapshot, req.Enabled, "idle", createdBy, + ) + + task, err := s.scanReplicationTaskRow(row) + if err != nil { + return nil, fmt.Errorf("failed to create replication task: %w", err) + } + + s.logger.Info("Replication task created", "id", id, "name", req.Name, "direction", req.Direction) + return task, nil +} + +// UpdateReplicationTask updates an existing replication task +func (s *ReplicationService) UpdateReplicationTask(ctx context.Context, id string, req *CreateReplicationRequest) (*ReplicationTask, error) { + // Validate direction-specific fields + if req.Direction == "outbound" { + if req.SourceDataset == nil || req.TargetHost == nil || req.TargetDataset == nil { + return nil, fmt.Errorf("outbound replication requires source_dataset, target_host, and target_dataset") + } + } else if req.Direction == "inbound" { + if req.SourceHost == nil || req.SourceDataset == nil || req.LocalDataset == nil { + return nil, fmt.Errorf("inbound replication requires source_host, source_dataset, and local_dataset") + } + } else { + return nil, fmt.Errorf("invalid direction: must be 'outbound' or 'inbound'") + } + + // Set defaults + if req.Compression == "" { + req.Compression = "lz4" + } + if req.TargetPort == nil { + defaultPort := 22 + req.TargetPort = &defaultPort + } + if req.SourcePort == nil { + defaultPort := 22 + req.SourcePort = &defaultPort + } + if req.TargetUser == nil { + defaultUser := "root" + 
req.TargetUser = &defaultUser + } + if req.SourceUser == nil { + defaultUser := "root" + req.SourceUser = &defaultUser + } + + // Marshal schedule config to JSON + var scheduleConfigJSON sql.NullString + if req.ScheduleConfig != nil { + configJSON, err := json.Marshal(req.ScheduleConfig) + if err != nil { + return nil, fmt.Errorf("failed to marshal schedule config: %w", err) + } + scheduleConfigJSON = sql.NullString{String: string(configJSON), Valid: true} + } + + var scheduleTypeStr sql.NullString + if req.ScheduleType != nil { + scheduleTypeStr = sql.NullString{String: *req.ScheduleType, Valid: true} + } + + query := ` + UPDATE replication_tasks + SET name = $1, direction = $2, source_dataset = $3, target_host = $4, target_port = $5, + target_user = $6, target_dataset = $7, target_ssh_key_path = $8, source_host = $9, + source_port = $10, source_user = $11, local_dataset = $12, schedule_type = $13, + schedule_config = $14, compression = $15, encryption = $16, recursive = $17, + incremental = $18, auto_snapshot = $19, enabled = $20, updated_at = NOW() + WHERE id = $21 + RETURNING id, name, direction, source_dataset, target_host, target_port, target_user, target_dataset, + target_ssh_key_path, source_host, source_port, source_user, local_dataset, + schedule_type, schedule_config, compression, encryption, recursive, incremental, + auto_snapshot, enabled, status, last_run_at, last_run_status, last_run_error, + next_run_at, last_snapshot_sent, last_snapshot_received, + total_runs, successful_runs, failed_runs, bytes_sent, bytes_received, + created_by, created_at, updated_at + ` + + row := s.db.QueryRowContext(ctx, query, + req.Name, req.Direction, + req.SourceDataset, req.TargetHost, req.TargetPort, req.TargetUser, req.TargetDataset, + req.TargetSSHKeyPath, req.SourceHost, req.SourcePort, req.SourceUser, req.LocalDataset, + scheduleTypeStr, scheduleConfigJSON, req.Compression, req.Encryption, req.Recursive, req.Incremental, + req.AutoSnapshot, req.Enabled, id, + ) + + 
task, err := s.scanReplicationTaskRow(row) + if err != nil { + if err == sql.ErrNoRows { + return nil, fmt.Errorf("replication task not found") + } + return nil, fmt.Errorf("failed to update replication task: %w", err) + } + + s.logger.Info("Replication task updated", "id", id) + return task, nil +} + +// DeleteReplicationTask deletes a replication task +func (s *ReplicationService) DeleteReplicationTask(ctx context.Context, id string) error { + query := `DELETE FROM replication_tasks WHERE id = $1` + result, err := s.db.ExecContext(ctx, query, id) + if err != nil { + return fmt.Errorf("failed to delete replication task: %w", err) + } + + rowsAffected, err := result.RowsAffected() + if err != nil { + return fmt.Errorf("failed to get rows affected: %w", err) + } + + if rowsAffected == 0 { + return fmt.Errorf("replication task not found") + } + + s.logger.Info("Replication task deleted", "id", id) + return nil +} + +// scanReplicationTaskRow scans a single replication task row +func (s *ReplicationService) scanReplicationTaskRow(row *sql.Row) (*ReplicationTask, error) { + var task ReplicationTask + var sourceDataset, targetHost, targetUser, targetDataset, targetSSHKeyPath sql.NullString + var sourceHost, sourceUser, localDataset sql.NullString + var targetPort, sourcePort sql.NullInt64 + var scheduleType, scheduleConfigJSON sql.NullString + var lastRunAt, nextRunAt sql.NullTime + var lastRunStatus, lastRunError, lastSnapshotSent, lastSnapshotReceived sql.NullString + var createdBy sql.NullString + + err := row.Scan( + &task.ID, &task.Name, &task.Direction, + &sourceDataset, &targetHost, &targetPort, &targetUser, &targetDataset, + &targetSSHKeyPath, &sourceHost, &sourcePort, &sourceUser, &localDataset, + &scheduleType, &scheduleConfigJSON, &task.Compression, &task.Encryption, + &task.Recursive, &task.Incremental, &task.AutoSnapshot, &task.Enabled, &task.Status, + &lastRunAt, &lastRunStatus, &lastRunError, &nextRunAt, + &lastSnapshotSent, &lastSnapshotReceived, + 
&task.TotalRuns, &task.SuccessfulRuns, &task.FailedRuns, + &task.BytesSent, &task.BytesReceived, + &createdBy, &task.CreatedAt, &task.UpdatedAt, + ) + if err != nil { + return nil, err + } + + // Handle nullable fields + if sourceDataset.Valid { + task.SourceDataset = &sourceDataset.String + } + if targetHost.Valid { + task.TargetHost = &targetHost.String + } + if targetPort.Valid { + port := int(targetPort.Int64) + task.TargetPort = &port + } + if targetUser.Valid { + task.TargetUser = &targetUser.String + } + if targetDataset.Valid { + task.TargetDataset = &targetDataset.String + } + if targetSSHKeyPath.Valid { + task.TargetSSHKeyPath = &targetSSHKeyPath.String + } + if sourceHost.Valid { + task.SourceHost = &sourceHost.String + } + if sourcePort.Valid { + port := int(sourcePort.Int64) + task.SourcePort = &port + } + if sourceUser.Valid { + task.SourceUser = &sourceUser.String + } + if localDataset.Valid { + task.LocalDataset = &localDataset.String + } + if scheduleType.Valid { + task.ScheduleType = &scheduleType.String + } + if scheduleConfigJSON.Valid { + if err := json.Unmarshal([]byte(scheduleConfigJSON.String), &task.ScheduleConfig); err != nil { + return nil, fmt.Errorf("failed to unmarshal schedule config: %w", err) + } + } + if lastRunAt.Valid { + task.LastRunAt = &lastRunAt.Time + } + if lastRunStatus.Valid { + task.LastRunStatus = &lastRunStatus.String + } + if lastRunError.Valid { + task.LastRunError = &lastRunError.String + } + if nextRunAt.Valid { + task.NextRunAt = &nextRunAt.Time + } + if lastSnapshotSent.Valid { + task.LastSnapshotSent = &lastSnapshotSent.String + } + if lastSnapshotReceived.Valid { + task.LastSnapshotReceived = &lastSnapshotReceived.String + } + if createdBy.Valid { + task.CreatedBy = createdBy.String + } + + return &task, nil +} + +// scanReplicationTask scans a replication task from rows +func (s *ReplicationService) scanReplicationTask(rows *sql.Rows) (*ReplicationTask, error) { + var task ReplicationTask + var sourceDataset, 
targetHost, targetUser, targetDataset, targetSSHKeyPath sql.NullString + var sourceHost, sourceUser, localDataset sql.NullString + var targetPort, sourcePort sql.NullInt64 + var scheduleType, scheduleConfigJSON sql.NullString + var lastRunAt, nextRunAt sql.NullTime + var lastRunStatus, lastRunError, lastSnapshotSent, lastSnapshotReceived sql.NullString + var createdBy sql.NullString + + err := rows.Scan( + &task.ID, &task.Name, &task.Direction, + &sourceDataset, &targetHost, &targetPort, &targetUser, &targetDataset, + &targetSSHKeyPath, &sourceHost, &sourcePort, &sourceUser, &localDataset, + &scheduleType, &scheduleConfigJSON, &task.Compression, &task.Encryption, + &task.Recursive, &task.Incremental, &task.AutoSnapshot, &task.Enabled, &task.Status, + &lastRunAt, &lastRunStatus, &lastRunError, &nextRunAt, + &lastSnapshotSent, &lastSnapshotReceived, + &task.TotalRuns, &task.SuccessfulRuns, &task.FailedRuns, + &task.BytesSent, &task.BytesReceived, + &createdBy, &task.CreatedAt, &task.UpdatedAt, + ) + if err != nil { + return nil, err + } + + // Handle nullable fields (same as scanReplicationTaskRow) + if sourceDataset.Valid { + task.SourceDataset = &sourceDataset.String + } + if targetHost.Valid { + task.TargetHost = &targetHost.String + } + if targetPort.Valid { + port := int(targetPort.Int64) + task.TargetPort = &port + } + if targetUser.Valid { + task.TargetUser = &targetUser.String + } + if targetDataset.Valid { + task.TargetDataset = &targetDataset.String + } + if targetSSHKeyPath.Valid { + task.TargetSSHKeyPath = &targetSSHKeyPath.String + } + if sourceHost.Valid { + task.SourceHost = &sourceHost.String + } + if sourcePort.Valid { + port := int(sourcePort.Int64) + task.SourcePort = &port + } + if sourceUser.Valid { + task.SourceUser = &sourceUser.String + } + if localDataset.Valid { + task.LocalDataset = &localDataset.String + } + if scheduleType.Valid { + task.ScheduleType = &scheduleType.String + } + if scheduleConfigJSON.Valid { + if err := 
json.Unmarshal([]byte(scheduleConfigJSON.String), &task.ScheduleConfig); err != nil { + return nil, fmt.Errorf("failed to unmarshal schedule config: %w", err) + } + } + if lastRunAt.Valid { + task.LastRunAt = &lastRunAt.Time + } + if lastRunStatus.Valid { + task.LastRunStatus = &lastRunStatus.String + } + if lastRunError.Valid { + task.LastRunError = &lastRunError.String + } + if nextRunAt.Valid { + task.NextRunAt = &nextRunAt.Time + } + if lastSnapshotSent.Valid { + task.LastSnapshotSent = &lastSnapshotSent.String + } + if lastSnapshotReceived.Valid { + task.LastSnapshotReceived = &lastSnapshotReceived.String + } + if createdBy.Valid { + task.CreatedBy = createdBy.String + } + + return &task, nil +} diff --git a/backend/internal/storage/snapshot.go b/backend/internal/storage/snapshot.go new file mode 100644 index 0000000..9b5d56e --- /dev/null +++ b/backend/internal/storage/snapshot.go @@ -0,0 +1,259 @@ +package storage + +import ( + "context" + "fmt" + "strings" + "time" + + "github.com/atlasos/calypso/internal/common/database" + "github.com/atlasos/calypso/internal/common/logger" +) + +// SnapshotService handles ZFS snapshot operations +type SnapshotService struct { + db *database.DB + logger *logger.Logger +} + +// NewSnapshotService creates a new snapshot service +func NewSnapshotService(db *database.DB, log *logger.Logger) *SnapshotService { + return &SnapshotService{ + db: db, + logger: log, + } +} + +// Snapshot represents a ZFS snapshot +type Snapshot struct { + ID string `json:"id"` + Name string `json:"name"` // Full snapshot name (e.g., "pool/dataset@snapshot-name") + Dataset string `json:"dataset"` // Dataset name (e.g., "pool/dataset") + SnapshotName string `json:"snapshot_name"` // Snapshot name only (e.g., "snapshot-name") + Created time.Time `json:"created"` + Referenced int64 `json:"referenced"` // Size in bytes + Used int64 `json:"used"` // Used space in bytes + IsLatest bool `json:"is_latest"` // Whether this is the latest snapshot for the 
dataset +} + +// ListSnapshots lists all snapshots, optionally filtered by dataset +func (s *SnapshotService) ListSnapshots(ctx context.Context, datasetFilter string) ([]*Snapshot, error) { + // Build zfs list command + args := []string{"list", "-t", "snapshot", "-H", "-o", "name,creation,referenced,used"} + if datasetFilter != "" { + // List snapshots for specific dataset + args = append(args, datasetFilter) + } else { + // List all snapshots + args = append(args, "-r") + } + + cmd := zfsCommand(ctx, args...) + output, err := cmd.Output() + if err != nil { + return nil, fmt.Errorf("failed to list snapshots: %w", err) + } + + lines := strings.Split(strings.TrimSpace(string(output)), "\n") + var snapshots []*Snapshot + + // Track latest snapshot per dataset + latestSnapshots := make(map[string]*Snapshot) + + for _, line := range lines { + if line == "" { + continue + } + + fields := strings.Fields(line) + if len(fields) < 4 { + continue + } + + fullName := fields[0] + creationStr := fields[1] + referencedStr := fields[2] + usedStr := fields[3] + + // Parse snapshot name (format: pool/dataset@snapshot-name) + parts := strings.Split(fullName, "@") + if len(parts) != 2 { + continue + } + + dataset := parts[0] + snapshotName := parts[1] + + // Parse creation time (Unix timestamp) + var creationTime time.Time + if timestamp, err := parseZFSUnixTimestamp(creationStr); err == nil { + creationTime = timestamp + } else { + // Fallback to current time if parsing fails + creationTime = time.Now() + } + + // Parse sizes + referenced := parseSnapshotSize(referencedStr) + used := parseSnapshotSize(usedStr) + + snapshot := &Snapshot{ + ID: fullName, + Name: fullName, + Dataset: dataset, + SnapshotName: snapshotName, + Created: creationTime, + Referenced: referenced, + Used: used, + IsLatest: false, + } + + snapshots = append(snapshots, snapshot) + + // Track latest snapshot per dataset + if latest, exists := latestSnapshots[dataset]; !exists || 
snapshot.Created.After(latest.Created) { + latestSnapshots[dataset] = snapshot + } + } + + // Mark latest snapshots + for _, snapshot := range snapshots { + if latest, exists := latestSnapshots[snapshot.Dataset]; exists && latest.ID == snapshot.ID { + snapshot.IsLatest = true + } + } + + return snapshots, nil +} + +// CreateSnapshot creates a new snapshot +func (s *SnapshotService) CreateSnapshot(ctx context.Context, dataset, snapshotName string, recursive bool) error { + // Validate dataset exists + cmd := zfsCommand(ctx, "list", "-H", "-o", "name", dataset) + if err := cmd.Run(); err != nil { + return fmt.Errorf("dataset %s does not exist: %w", dataset, err) + } + + // Build snapshot name + fullSnapshotName := fmt.Sprintf("%s@%s", dataset, snapshotName) + + // Build command + args := []string{"snapshot"} + if recursive { + args = append(args, "-r") + } + args = append(args, fullSnapshotName) + + cmd = zfsCommand(ctx, args...) + output, err := cmd.CombinedOutput() + if err != nil { + return fmt.Errorf("failed to create snapshot: %s: %w", string(output), err) + } + + s.logger.Info("Snapshot created", "snapshot", fullSnapshotName, "dataset", dataset, "recursive", recursive) + return nil +} + +// DeleteSnapshot deletes a snapshot +func (s *SnapshotService) DeleteSnapshot(ctx context.Context, snapshotName string, recursive bool) error { + // Build command + args := []string{"destroy"} + if recursive { + args = append(args, "-r") + } + args = append(args, snapshotName) + + cmd := zfsCommand(ctx, args...) 
+ output, err := cmd.CombinedOutput() + if err != nil { + return fmt.Errorf("failed to delete snapshot: %s: %w", string(output), err) + } + + s.logger.Info("Snapshot deleted", "snapshot", snapshotName, "recursive", recursive) + return nil +} + +// RollbackSnapshot rolls back a dataset to a snapshot +func (s *SnapshotService) RollbackSnapshot(ctx context.Context, snapshotName string, force bool) error { + // Build command + args := []string{"rollback"} + if force { + args = append(args, "-r", "-f") + } else { + args = append(args, "-r") + } + args = append(args, snapshotName) + + cmd := zfsCommand(ctx, args...) + output, err := cmd.CombinedOutput() + if err != nil { + return fmt.Errorf("failed to rollback snapshot: %s: %w", string(output), err) + } + + s.logger.Info("Snapshot rollback completed", "snapshot", snapshotName, "force", force) + return nil +} + +// CloneSnapshot clones a snapshot to a new dataset +func (s *SnapshotService) CloneSnapshot(ctx context.Context, snapshotName, cloneName string) error { + // Build command + args := []string{"clone", snapshotName, cloneName} + + cmd := zfsCommand(ctx, args...) 
+ output, err := cmd.CombinedOutput() + if err != nil { + return fmt.Errorf("failed to clone snapshot: %s: %w", string(output), err) + } + + s.logger.Info("Snapshot cloned", "snapshot", snapshotName, "clone", cloneName) + return nil +} + +// parseZFSUnixTimestamp parses ZFS Unix timestamp string +func parseZFSUnixTimestamp(timestampStr string) (time.Time, error) { + // ZFS returns Unix timestamp as string + timestamp, err := time.Parse("20060102150405", timestampStr) + if err != nil { + // Try parsing as Unix timestamp integer + var unixTime int64 + if _, err := fmt.Sscanf(timestampStr, "%d", &unixTime); err == nil { + return time.Unix(unixTime, 0), nil + } + return time.Time{}, err + } + return timestamp, nil +} + +// parseZFSSize parses ZFS size string (e.g., "1.2M", "4.5G", "850K") +// Note: This function is also defined in zfs.go, but we need it here for snapshot parsing +func parseSnapshotSize(sizeStr string) int64 { + if sizeStr == "-" || sizeStr == "" { + return 0 + } + + // Remove any whitespace + sizeStr = strings.TrimSpace(sizeStr) + + // Parse size with unit + var size float64 + var unit string + if _, err := fmt.Sscanf(sizeStr, "%f%s", &size, &unit); err != nil { + return 0 + } + + // Convert to bytes + unit = strings.ToUpper(unit) + switch unit { + case "K", "KB": + return int64(size * 1024) + case "M", "MB": + return int64(size * 1024 * 1024) + case "G", "GB": + return int64(size * 1024 * 1024 * 1024) + case "T", "TB": + return int64(size * 1024 * 1024 * 1024 * 1024) + default: + // Assume bytes if no unit + return int64(size) + } +} diff --git a/backend/internal/storage/snapshot_schedule.go b/backend/internal/storage/snapshot_schedule.go new file mode 100644 index 0000000..52485a3 --- /dev/null +++ b/backend/internal/storage/snapshot_schedule.go @@ -0,0 +1,606 @@ +package storage + +import ( + "context" + "database/sql" + "encoding/json" + "fmt" + "sort" + "strings" + "time" + + "github.com/atlasos/calypso/internal/common/database" + 
"github.com/atlasos/calypso/internal/common/logger" + "github.com/google/uuid" +) + +// SnapshotScheduleService handles snapshot schedule management +type SnapshotScheduleService struct { + db *database.DB + logger *logger.Logger + snapshotService *SnapshotService +} + +// NewSnapshotScheduleService creates a new snapshot schedule service +func NewSnapshotScheduleService(db *database.DB, log *logger.Logger, snapshotService *SnapshotService) *SnapshotScheduleService { + return &SnapshotScheduleService{ + db: db, + logger: log, + snapshotService: snapshotService, + } +} + +// SnapshotSchedule represents a scheduled snapshot task +type SnapshotSchedule struct { + ID string `json:"id"` + Name string `json:"name"` + Dataset string `json:"dataset"` + SnapshotNameTemplate string `json:"snapshot_name_template"` + ScheduleType string `json:"schedule_type"` // hourly, daily, weekly, monthly, cron + ScheduleConfig map[string]interface{} `json:"schedule_config"` + Recursive bool `json:"recursive"` + Enabled bool `json:"enabled"` + RetentionCount *int `json:"retention_count,omitempty"` + RetentionDays *int `json:"retention_days,omitempty"` + LastRunAt *time.Time `json:"last_run_at,omitempty"` + NextRunAt *time.Time `json:"next_run_at,omitempty"` + CreatedBy string `json:"created_by,omitempty"` + CreatedAt time.Time `json:"created_at"` + UpdatedAt time.Time `json:"updated_at"` +} + +// CreateScheduleRequest represents a request to create a snapshot schedule +type CreateScheduleRequest struct { + Name string `json:"name" binding:"required"` + Dataset string `json:"dataset" binding:"required"` + SnapshotNameTemplate string `json:"snapshot_name_template" binding:"required"` + ScheduleType string `json:"schedule_type" binding:"required"` + ScheduleConfig map[string]interface{} `json:"schedule_config" binding:"required"` + Recursive bool `json:"recursive"` + RetentionCount *int `json:"retention_count"` + RetentionDays *int `json:"retention_days"` +} + +// ListSchedules lists all 
snapshot schedules +func (s *SnapshotScheduleService) ListSchedules(ctx context.Context) ([]*SnapshotSchedule, error) { + query := ` + SELECT id, name, dataset, snapshot_name_template, schedule_type, schedule_config, + recursive, enabled, retention_count, retention_days, + last_run_at, next_run_at, created_by, created_at, updated_at + FROM snapshot_schedules + ORDER BY created_at DESC + ` + + rows, err := s.db.QueryContext(ctx, query) + if err != nil { + return nil, fmt.Errorf("failed to query schedules: %w", err) + } + defer rows.Close() + + var schedules []*SnapshotSchedule + for rows.Next() { + schedule, err := s.scanSchedule(rows) + if err != nil { + s.logger.Error("Failed to scan schedule", "error", err) + continue + } + schedules = append(schedules, schedule) + } + + return schedules, rows.Err() +} + +// GetSchedule retrieves a schedule by ID +func (s *SnapshotScheduleService) GetSchedule(ctx context.Context, id string) (*SnapshotSchedule, error) { + query := ` + SELECT id, name, dataset, snapshot_name_template, schedule_type, schedule_config, + recursive, enabled, retention_count, retention_days, + last_run_at, next_run_at, created_by, created_at, updated_at + FROM snapshot_schedules + WHERE id = $1 + ` + + row := s.db.QueryRowContext(ctx, query, id) + schedule, err := s.scanScheduleRow(row) + if err != nil { + if err == sql.ErrNoRows { + return nil, fmt.Errorf("schedule not found") + } + return nil, fmt.Errorf("failed to get schedule: %w", err) + } + + return schedule, nil +} + +// CreateSchedule creates a new snapshot schedule +func (s *SnapshotScheduleService) CreateSchedule(ctx context.Context, req *CreateScheduleRequest, createdBy string) (*SnapshotSchedule, error) { + id := uuid.New().String() + + // Calculate next run time + nextRunAt, err := s.calculateNextRun(req.ScheduleType, req.ScheduleConfig) + if err != nil { + return nil, fmt.Errorf("failed to calculate next run time: %w", err) + } + + // Marshal schedule config to JSON + configJSON, err := 
json.Marshal(req.ScheduleConfig) + if err != nil { + return nil, fmt.Errorf("failed to marshal schedule config: %w", err) + } + + query := ` + INSERT INTO snapshot_schedules ( + id, name, dataset, snapshot_name_template, schedule_type, schedule_config, + recursive, enabled, retention_count, retention_days, + next_run_at, created_by, created_at, updated_at + ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, NOW(), NOW()) + RETURNING id, name, dataset, snapshot_name_template, schedule_type, schedule_config, + recursive, enabled, retention_count, retention_days, + last_run_at, next_run_at, created_by, created_at, updated_at + ` + + row := s.db.QueryRowContext(ctx, query, + id, req.Name, req.Dataset, req.SnapshotNameTemplate, req.ScheduleType, + string(configJSON), req.Recursive, true, req.RetentionCount, req.RetentionDays, + nextRunAt, createdBy, + ) + + schedule, err := s.scanScheduleRow(row) + if err != nil { + return nil, fmt.Errorf("failed to create schedule: %w", err) + } + + s.logger.Info("Snapshot schedule created", "id", id, "name", req.Name, "dataset", req.Dataset) + return schedule, nil +} + +// UpdateSchedule updates an existing schedule +func (s *SnapshotScheduleService) UpdateSchedule(ctx context.Context, id string, req *CreateScheduleRequest) (*SnapshotSchedule, error) { + // Calculate next run time + nextRunAt, err := s.calculateNextRun(req.ScheduleType, req.ScheduleConfig) + if err != nil { + return nil, fmt.Errorf("failed to calculate next run time: %w", err) + } + + // Marshal schedule config to JSON + configJSON, err := json.Marshal(req.ScheduleConfig) + if err != nil { + return nil, fmt.Errorf("failed to marshal schedule config: %w", err) + } + + query := ` + UPDATE snapshot_schedules + SET name = $1, dataset = $2, snapshot_name_template = $3, schedule_type = $4, + schedule_config = $5, recursive = $6, retention_count = $7, retention_days = $8, + next_run_at = $9, updated_at = NOW() + WHERE id = $10 + RETURNING id, name, dataset, 
snapshot_name_template, schedule_type, schedule_config, + recursive, enabled, retention_count, retention_days, + last_run_at, next_run_at, created_by, created_at, updated_at + ` + + row := s.db.QueryRowContext(ctx, query, + req.Name, req.Dataset, req.SnapshotNameTemplate, req.ScheduleType, + string(configJSON), req.Recursive, req.RetentionCount, req.RetentionDays, + nextRunAt, id, + ) + + schedule, err := s.scanScheduleRow(row) + if err != nil { + if err == sql.ErrNoRows { + return nil, fmt.Errorf("schedule not found") + } + return nil, fmt.Errorf("failed to update schedule: %w", err) + } + + s.logger.Info("Snapshot schedule updated", "id", id) + return schedule, nil +} + +// DeleteSchedule deletes a schedule +func (s *SnapshotScheduleService) DeleteSchedule(ctx context.Context, id string) error { + query := `DELETE FROM snapshot_schedules WHERE id = $1` + result, err := s.db.ExecContext(ctx, query, id) + if err != nil { + return fmt.Errorf("failed to delete schedule: %w", err) + } + + rowsAffected, err := result.RowsAffected() + if err != nil { + return fmt.Errorf("failed to get rows affected: %w", err) + } + + if rowsAffected == 0 { + return fmt.Errorf("schedule not found") + } + + s.logger.Info("Snapshot schedule deleted", "id", id) + return nil +} + +// ToggleSchedule enables or disables a schedule +func (s *SnapshotScheduleService) ToggleSchedule(ctx context.Context, id string, enabled bool) error { + query := `UPDATE snapshot_schedules SET enabled = $1, updated_at = NOW() WHERE id = $2` + result, err := s.db.ExecContext(ctx, query, enabled, id) + if err != nil { + return fmt.Errorf("failed to toggle schedule: %w", err) + } + + rowsAffected, err := result.RowsAffected() + if err != nil { + return fmt.Errorf("failed to get rows affected: %w", err) + } + + if rowsAffected == 0 { + return fmt.Errorf("schedule not found") + } + + s.logger.Info("Snapshot schedule toggled", "id", id, "enabled", enabled) + return nil +} + +// scanScheduleRow scans a single schedule 
row +func (s *SnapshotScheduleService) scanScheduleRow(row *sql.Row) (*SnapshotSchedule, error) { + var schedule SnapshotSchedule + var configJSON string + var lastRunAt, nextRunAt sql.NullTime + var retentionCount, retentionDays sql.NullInt64 + var createdBy sql.NullString + + err := row.Scan( + &schedule.ID, &schedule.Name, &schedule.Dataset, &schedule.SnapshotNameTemplate, + &schedule.ScheduleType, &configJSON, &schedule.Recursive, &schedule.Enabled, + &retentionCount, &retentionDays, &lastRunAt, &nextRunAt, + &createdBy, &schedule.CreatedAt, &schedule.UpdatedAt, + ) + if err != nil { + return nil, err + } + + // Parse schedule config JSON + if err := json.Unmarshal([]byte(configJSON), &schedule.ScheduleConfig); err != nil { + return nil, fmt.Errorf("failed to unmarshal schedule config: %w", err) + } + + // Handle nullable fields + if lastRunAt.Valid { + schedule.LastRunAt = &lastRunAt.Time + } + if nextRunAt.Valid { + schedule.NextRunAt = &nextRunAt.Time + } + if retentionCount.Valid { + count := int(retentionCount.Int64) + schedule.RetentionCount = &count + } + if retentionDays.Valid { + days := int(retentionDays.Int64) + schedule.RetentionDays = &days + } + if createdBy.Valid { + schedule.CreatedBy = createdBy.String + } + + return &schedule, nil +} + +// scanSchedule scans a schedule from rows +func (s *SnapshotScheduleService) scanSchedule(rows *sql.Rows) (*SnapshotSchedule, error) { + var schedule SnapshotSchedule + var configJSON string + var lastRunAt, nextRunAt sql.NullTime + var retentionCount, retentionDays sql.NullInt64 + var createdBy sql.NullString + + err := rows.Scan( + &schedule.ID, &schedule.Name, &schedule.Dataset, &schedule.SnapshotNameTemplate, + &schedule.ScheduleType, &configJSON, &schedule.Recursive, &schedule.Enabled, + &retentionCount, &retentionDays, &lastRunAt, &nextRunAt, + &createdBy, &schedule.CreatedAt, &schedule.UpdatedAt, + ) + if err != nil { + return nil, err + } + + // Parse schedule config JSON + if err := 
json.Unmarshal([]byte(configJSON), &schedule.ScheduleConfig); err != nil { + return nil, fmt.Errorf("failed to unmarshal schedule config: %w", err) + } + + // Handle nullable fields + if lastRunAt.Valid { + schedule.LastRunAt = &lastRunAt.Time + } + if nextRunAt.Valid { + schedule.NextRunAt = &nextRunAt.Time + } + if retentionCount.Valid { + count := int(retentionCount.Int64) + schedule.RetentionCount = &count + } + if retentionDays.Valid { + days := int(retentionDays.Int64) + schedule.RetentionDays = &days + } + if createdBy.Valid { + schedule.CreatedBy = createdBy.String + } + + return &schedule, nil +} + +// calculateNextRun calculates the next run time based on schedule type and config +func (s *SnapshotScheduleService) calculateNextRun(scheduleType string, config map[string]interface{}) (*time.Time, error) { + now := time.Now() + + switch scheduleType { + case "hourly": + nextRun := now.Add(1 * time.Hour) + // Round to next hour + nextRun = nextRun.Truncate(time.Hour).Add(1 * time.Hour) + return &nextRun, nil + + case "daily": + timeStr, ok := config["time"].(string) + if !ok { + timeStr = "00:00" + } + // Parse time (HH:MM format) + hour, min := 0, 0 + fmt.Sscanf(timeStr, "%d:%d", &hour, &min) + nextRun := time.Date(now.Year(), now.Month(), now.Day(), hour, min, 0, 0, now.Location()) + if nextRun.Before(now) { + nextRun = nextRun.Add(24 * time.Hour) + } + return &nextRun, nil + + case "weekly": + dayOfWeek, ok := config["day"].(float64) // JSON numbers are float64 + if !ok { + dayOfWeek = 0 // Sunday + } + timeStr, ok := config["time"].(string) + if !ok { + timeStr = "00:00" + } + hour, min := 0, 0 + fmt.Sscanf(timeStr, "%d:%d", &hour, &min) + + // Calculate next occurrence + currentDay := int(now.Weekday()) + targetDay := int(dayOfWeek) + daysUntil := (targetDay - currentDay + 7) % 7 + if daysUntil == 0 { + // If same day, check if time has passed + today := time.Date(now.Year(), now.Month(), now.Day(), hour, min, 0, 0, now.Location()) + if today.Before(now) 
{ + daysUntil = 7 + } + } + nextRun := time.Date(now.Year(), now.Month(), now.Day(), hour, min, 0, 0, now.Location()).AddDate(0, 0, daysUntil) + return &nextRun, nil + + case "monthly": + day, ok := config["day"].(float64) + if !ok { + day = 1 + } + timeStr, ok := config["time"].(string) + if !ok { + timeStr = "00:00" + } + hour, min := 0, 0 + fmt.Sscanf(timeStr, "%d:%d", &hour, &min) + + // Calculate next occurrence + nextRun := time.Date(now.Year(), now.Month(), int(day), hour, min, 0, 0, now.Location()) + if nextRun.Before(now) { + nextRun = nextRun.AddDate(0, 1, 0) + } + return &nextRun, nil + + case "cron": + // For cron, we'll use a simple implementation + // In production, you'd want to use a proper cron parser library + _, ok := config["cron"].(string) + if !ok { + return nil, fmt.Errorf("cron expression not found in config") + } + // Simple implementation: assume next run is in 1 hour + // TODO: Implement proper cron parsing + nextRun := now.Add(1 * time.Hour) + return &nextRun, nil + + default: + return nil, fmt.Errorf("unknown schedule type: %s", scheduleType) + } +} + +// GetDueSchedules retrieves all enabled schedules that are due to run +func (s *SnapshotScheduleService) GetDueSchedules(ctx context.Context) ([]*SnapshotSchedule, error) { + now := time.Now() + query := ` + SELECT id, name, dataset, snapshot_name_template, schedule_type, schedule_config, + recursive, enabled, retention_count, retention_days, + last_run_at, next_run_at, created_by, created_at, updated_at + FROM snapshot_schedules + WHERE enabled = true AND next_run_at <= $1 + ORDER BY next_run_at ASC + ` + + rows, err := s.db.QueryContext(ctx, query, now) + if err != nil { + return nil, fmt.Errorf("failed to query due schedules: %w", err) + } + defer rows.Close() + + var schedules []*SnapshotSchedule + for rows.Next() { + schedule, err := s.scanSchedule(rows) + if err != nil { + s.logger.Error("Failed to scan schedule", "error", err) + continue + } + schedules = append(schedules, 
schedule)
+	}
+
+	return schedules, rows.Err()
+}
+
+// ExecuteSchedule executes a snapshot schedule
+func (s *SnapshotScheduleService) ExecuteSchedule(ctx context.Context, schedule *SnapshotSchedule) error {
+	now := time.Now()
+
+	// Generate snapshot name from template
+	snapshotName := s.generateSnapshotName(schedule.SnapshotNameTemplate, now)
+
+	// Create snapshot
+	if err := s.snapshotService.CreateSnapshot(ctx, schedule.Dataset, snapshotName, schedule.Recursive); err != nil {
+		return fmt.Errorf("failed to create snapshot: %w", err)
+	}
+
+	s.logger.Info("Snapshot created from schedule", "schedule", schedule.Name, "snapshot", snapshotName, "dataset", schedule.Dataset)
+
+	// Calculate next run time
+	nextRunAt, err := s.calculateNextRun(schedule.ScheduleType, schedule.ScheduleConfig)
+	if err != nil {
+		s.logger.Error("Failed to calculate next run time", "error", err, "schedule", schedule.Name)
+		// Set a default next run time (1 hour from now) if calculation fails
+		defaultNextRun := now.Add(1 * time.Hour)
+		nextRunAt = &defaultNextRun
+	}
+
+	// Update schedule with last run time and next run time
+	query := `
+		UPDATE snapshot_schedules
+		SET last_run_at = $1, next_run_at = $2, updated_at = NOW()
+		WHERE id = $3
+	`
+	if _, err := s.db.ExecContext(ctx, query, now, nextRunAt, schedule.ID); err != nil {
+		s.logger.Error("Failed to update schedule after execution", "error", err, "schedule", schedule.Name)
+		// Don't return error here - snapshot was created successfully
+	}
+
+	// Handle retention
+	if err := s.handleRetention(ctx, schedule); err != nil {
+		s.logger.Error("Failed to handle retention", "error", err, "schedule", schedule.Name)
+		// Don't return error - retention is best effort
+	}
+
+	return nil
+}
+
+// generateSnapshotName generates a snapshot name from a template
+func (s *SnapshotScheduleService) generateSnapshotName(template string, t time.Time) string {
+	// Replace time placeholders, longest patterns first so compounds survive
+	name := template
+	name = strings.ReplaceAll(name, "%Y-%m-%d-%H%M%S", t.Format("2006-01-02-150405"))
+	name = strings.ReplaceAll(name, "%Y-%m-%d-%H%M", t.Format("2006-01-02-1504"))
+	name = strings.ReplaceAll(name, "%Y-%m-%d", t.Format("2006-01-02"))
+	name = strings.ReplaceAll(name, "%Y", t.Format("2006"))
+	name = strings.ReplaceAll(name, "%m", t.Format("01"))
+	name = strings.ReplaceAll(name, "%d", t.Format("02"))
+	name = strings.ReplaceAll(name, "%H", t.Format("15"))
+	name = strings.ReplaceAll(name, "%M", t.Format("04"))
+	name = strings.ReplaceAll(name, "%S", t.Format("05"))
+
+	// If no placeholders were replaced, append timestamp
+	if name == template {
+		name = fmt.Sprintf("%s-%d", template, t.Unix())
+	}
+
+	return name
+}
+
+// handleRetention handles snapshot retention based on schedule settings
+func (s *SnapshotScheduleService) handleRetention(ctx context.Context, schedule *SnapshotSchedule) error {
+	// Get all snapshots for this dataset
+	snapshots, err := s.snapshotService.ListSnapshots(ctx, schedule.Dataset)
+	if err != nil {
+		return fmt.Errorf("failed to list snapshots: %w", err)
+	}
+
+	// Filter snapshots that match this schedule's naming pattern
+	matchingSnapshots := make([]*Snapshot, 0)
+	for _, snapshot := range snapshots {
+		// Check if snapshot name matches the template pattern
+		if s.snapshotMatchesTemplate(snapshot.SnapshotName, schedule.SnapshotNameTemplate) {
+			matchingSnapshots = append(matchingSnapshots, snapshot)
+		}
+	}
+
+	now := time.Now()
+	snapshotsToDelete := make([]*Snapshot, 0)
+
+	// Apply retention_count if set
+	if schedule.RetentionCount != nil && *schedule.RetentionCount > 0 {
+		if len(matchingSnapshots) > *schedule.RetentionCount {
+			// Sort by creation time (oldest first)
+			sort.Slice(matchingSnapshots, func(i, j int) bool {
+				return matchingSnapshots[i].Created.Before(matchingSnapshots[j].Created)
+			})
+			// Mark excess snapshots for deletion
+			excessCount := len(matchingSnapshots) - *schedule.RetentionCount
+			for i := 0; i < excessCount;
i++ { + snapshotsToDelete = append(snapshotsToDelete, matchingSnapshots[i]) + } + } + } + + // Apply retention_days if set + if schedule.RetentionDays != nil && *schedule.RetentionDays > 0 { + cutoffTime := now.AddDate(0, 0, -*schedule.RetentionDays) + for _, snapshot := range matchingSnapshots { + if snapshot.Created.Before(cutoffTime) { + // Check if not already marked for deletion + alreadyMarked := false + for _, marked := range snapshotsToDelete { + if marked.ID == snapshot.ID { + alreadyMarked = true + break + } + } + if !alreadyMarked { + snapshotsToDelete = append(snapshotsToDelete, snapshot) + } + } + } + } + + // Delete snapshots + for _, snapshot := range snapshotsToDelete { + if err := s.snapshotService.DeleteSnapshot(ctx, snapshot.Name, schedule.Recursive); err != nil { + s.logger.Error("Failed to delete snapshot for retention", "error", err, "snapshot", snapshot.Name) + continue + } + s.logger.Info("Deleted snapshot for retention", "snapshot", snapshot.Name, "schedule", schedule.Name) + } + + return nil +} + +// snapshotMatchesTemplate checks if a snapshot name matches a template pattern +func (s *SnapshotScheduleService) snapshotMatchesTemplate(snapshotName, template string) bool { + if template == "" { + return false + } + + // Extract the base name from template (everything before the first %) + // This handles templates like "auto-%Y-%m-%d" or "daily-backup-%Y%m%d" + baseName := template + if idx := strings.Index(template, "%"); idx != -1 { + baseName = template[:idx] + } + + // If template starts with a placeholder, check if snapshot name matches the pattern + // by checking if it starts with any reasonable prefix + if baseName == "" { + // Template starts with placeholder - use a more lenient check + // Check if snapshot name matches common patterns + return true // Match all if template is just placeholders + } + + // Check if snapshot name starts with the base name + // This handles cases like template "auto-%Y-%m-%d" matching snapshot 
"auto-2024-01-15" + return strings.HasPrefix(snapshotName, baseName) +} diff --git a/backend/internal/storage/snapshot_schedule_worker.go b/backend/internal/storage/snapshot_schedule_worker.go new file mode 100644 index 0000000..1527fa2 --- /dev/null +++ b/backend/internal/storage/snapshot_schedule_worker.go @@ -0,0 +1,87 @@ +package storage + +import ( + "context" + "time" + + "github.com/atlasos/calypso/internal/common/database" + "github.com/atlasos/calypso/internal/common/logger" +) + +// SnapshotScheduleWorker handles periodic execution of snapshot schedules +type SnapshotScheduleWorker struct { + scheduleService *SnapshotScheduleService + logger *logger.Logger + interval time.Duration + stopCh chan struct{} +} + +// NewSnapshotScheduleWorker creates a new snapshot schedule worker +func NewSnapshotScheduleWorker(db *database.DB, log *logger.Logger, snapshotService *SnapshotService, interval time.Duration) *SnapshotScheduleWorker { + scheduleService := NewSnapshotScheduleService(db, log, snapshotService) + return &SnapshotScheduleWorker{ + scheduleService: scheduleService, + logger: log, + interval: interval, + stopCh: make(chan struct{}), + } +} + +// Start starts the snapshot schedule worker background service +func (w *SnapshotScheduleWorker) Start(ctx context.Context) { + w.logger.Info("Starting snapshot schedule worker", "interval", w.interval) + ticker := time.NewTicker(w.interval) + defer ticker.Stop() + + // Run initial check immediately + w.processSchedules(ctx) + + for { + select { + case <-ctx.Done(): + w.logger.Info("Snapshot schedule worker stopped") + return + case <-w.stopCh: + w.logger.Info("Snapshot schedule worker stopped") + return + case <-ticker.C: + w.processSchedules(ctx) + } + } +} + +// Stop stops the snapshot schedule worker service +func (w *SnapshotScheduleWorker) Stop() { + close(w.stopCh) +} + +// processSchedules processes all due snapshot schedules +func (w *SnapshotScheduleWorker) processSchedules(ctx context.Context) { + 
w.logger.Debug("Checking for due snapshot schedules") + + // Get all schedules that are due to run + schedules, err := w.scheduleService.GetDueSchedules(ctx) + if err != nil { + w.logger.Error("Failed to get due schedules", "error", err) + return + } + + if len(schedules) == 0 { + w.logger.Debug("No snapshot schedules due to run") + return + } + + w.logger.Info("Found due snapshot schedules", "count", len(schedules)) + + // Execute each schedule + for _, schedule := range schedules { + w.logger.Info("Executing snapshot schedule", "schedule", schedule.Name, "dataset", schedule.Dataset) + + if err := w.scheduleService.ExecuteSchedule(ctx, schedule); err != nil { + w.logger.Error("Failed to execute snapshot schedule", "error", err, "schedule", schedule.Name) + continue + } + + w.logger.Info("Successfully executed snapshot schedule", "schedule", schedule.Name) + } +} diff --git a/frontend/src/api/backup.ts b/frontend/src/api/backup.ts index 3d64b62..724ef46 100644 --- a/frontend/src/api/backup.ts +++ b/frontend/src/api/backup.ts @@ -51,6 +51,7 @@ export interface BackupClient { name: string uname?: string enabled: boolean + category?: 'File' | 'Database' | 'Virtual' auto_prune?: boolean file_retention?: number job_retention?: number @@ -68,6 +69,7 @@ export interface ListClientsResponse { export interface ListClientsParams { enabled?: boolean search?: string + category?: 'File' | 'Database' | 'Virtual' } export interface PoolStats { @@ -120,6 +122,7 @@ export const backupAPI = { const queryParams = new URLSearchParams() if (params?.enabled !== undefined) queryParams.append('enabled', params.enabled.toString()) if (params?.search) queryParams.append('search', params.search) + if (params?.category) queryParams.append('category', params.category) const response = await apiClient.get( `/backup/clients${queryParams.toString() ? 
`?${queryParams.toString()}` : ''}` diff --git a/frontend/src/api/objectStorage.ts b/frontend/src/api/objectStorage.ts index f638334..58afd46 100644 --- a/frontend/src/api/objectStorage.ts +++ b/frontend/src/api/objectStorage.ts @@ -90,6 +90,13 @@ export const objectStorageApi = { deleteServiceAccount: async (accessKey: string): Promise => { await apiClient.delete(`/object-storage/service-accounts/${encodeURIComponent(accessKey)}`) }, + + // Object browsing + listObjects: async (bucketName: string, prefix?: string): Promise => { + const params = prefix ? `?prefix=${encodeURIComponent(prefix)}` : '' + const response = await apiClient.get<{ objects: Object[] }>(`/object-storage/buckets/${encodeURIComponent(bucketName)}/objects${params}`) + return response.data.objects || [] + }, } export interface PoolDatasetInfo { @@ -143,3 +150,12 @@ export interface CreateServiceAccountRequest { policy?: string expiration?: string // ISO 8601 format } + +export interface Object { + name: string + key: string // Full path/key + size: number // Size in bytes (0 for folders) + last_modified: string + is_dir: boolean // True if this is a folder/prefix + etag?: string +} diff --git a/frontend/src/api/storage.ts b/frontend/src/api/storage.ts index 0e6173b..c0d772c 100644 --- a/frontend/src/api/storage.ts +++ b/frontend/src/api/storage.ts @@ -165,6 +165,121 @@ export const zfsApi = { }, } +// Snapshot interfaces and API +export interface Snapshot { + id: string + name: string // Full snapshot name (e.g., "pool/dataset@snapshot-name") + dataset: string // Dataset name (e.g., "pool/dataset") + snapshot_name: string // Snapshot name only (e.g., "snapshot-name") + created: string + referenced: number // Size in bytes + used: number // Used space in bytes + is_latest: boolean // Whether this is the latest snapshot for the dataset +} + +export interface CreateSnapshotRequest { + dataset: string + name: string + recursive?: boolean +} + +export interface CloneSnapshotRequest { + clone_name: 
string +} + +export const snapshotApi = { + listSnapshots: async (dataset?: string): Promise => { + const params = dataset ? `?dataset=${encodeURIComponent(dataset)}` : '' + const response = await apiClient.get<{ snapshots: Snapshot[] }>(`/storage/zfs/snapshots${params}`) + return response.data.snapshots || [] + }, + + createSnapshot: async (data: CreateSnapshotRequest): Promise => { + await apiClient.post('/storage/zfs/snapshots', data) + }, + + deleteSnapshot: async (snapshotName: string, recursive?: boolean): Promise => { + const params = recursive ? '?recursive=true' : '' + await apiClient.delete(`/storage/zfs/snapshots/${encodeURIComponent(snapshotName)}${params}`) + }, + + rollbackSnapshot: async (snapshotName: string, force?: boolean): Promise => { + await apiClient.post(`/storage/zfs/snapshots/${encodeURIComponent(snapshotName)}/rollback`, { force: force || false }) + }, + + cloneSnapshot: async (snapshotName: string, data: CloneSnapshotRequest): Promise => { + await apiClient.post(`/storage/zfs/snapshots/${encodeURIComponent(snapshotName)}/clone`, data) + }, +} + +// Snapshot Schedule interfaces and API +export interface SnapshotSchedule { + id: string + name: string + dataset: string + snapshot_name_template: string + schedule_type: 'hourly' | 'daily' | 'weekly' | 'monthly' | 'cron' + schedule_config: { + time?: string // For daily, weekly, monthly (HH:MM format) + day?: number // For weekly (0-6, Sunday=0), monthly (1-31) + cron?: string // For cron type + } + recursive: boolean + enabled: boolean + retention_count?: number + retention_days?: number + last_run_at?: string + next_run_at?: string + created_by?: string + created_at: string + updated_at: string +} + +export interface CreateSnapshotScheduleRequest { + name: string + dataset: string + snapshot_name_template: string + schedule_type: 'hourly' | 'daily' | 'weekly' | 'monthly' | 'cron' + schedule_config: { + time?: string + day?: number + cron?: string + } + recursive?: boolean + retention_count?: 
number + retention_days?: number +} + +export const snapshotScheduleApi = { + listSchedules: async (): Promise => { + const response = await apiClient.get<{ schedules: SnapshotSchedule[] }>('/storage/zfs/snapshot-schedules') + return response.data.schedules || [] + }, + + getSchedule: async (id: string): Promise => { + const response = await apiClient.get(`/storage/zfs/snapshot-schedules/${id}`) + return response.data + }, + + createSchedule: async (data: CreateSnapshotScheduleRequest): Promise => { + const response = await apiClient.post('/storage/zfs/snapshot-schedules', data) + return response.data + }, + + updateSchedule: async (id: string, data: CreateSnapshotScheduleRequest): Promise => { + const response = await apiClient.put(`/storage/zfs/snapshot-schedules/${id}`, data) + return response.data + }, + + deleteSchedule: async (id: string): Promise => { + await apiClient.delete(`/storage/zfs/snapshot-schedules/${id}`) + }, + + toggleSchedule: async (id: string, enabled: boolean): Promise => { + await apiClient.post(`/storage/zfs/snapshot-schedules/${id}/toggle`, { enabled }) + }, +} + export interface ZFSDataset { id: string name: string @@ -195,3 +310,100 @@ export interface ARCStats { collected_at: string } +// Replication Task interfaces and API +export interface ReplicationTask { + id: string + name: string + direction: 'outbound' | 'inbound' + source_dataset?: string + target_host?: string + target_port?: number + target_user?: string + target_dataset?: string + target_ssh_key_path?: string + source_host?: string + source_port?: number + source_user?: string + local_dataset?: string + schedule_type?: string + schedule_config?: { + time?: string + day?: number + cron?: string + } + compression: string + encryption: boolean + recursive: boolean + incremental: boolean + auto_snapshot: boolean + enabled: boolean + status: 'idle' | 'running' | 'failed' | 'paused' + last_run_at?: string + last_run_status?: 'success' | 'failed' | 'partial' + last_run_error?: 
string + next_run_at?: string + last_snapshot_sent?: string + last_snapshot_received?: string + total_runs: number + successful_runs: number + failed_runs: number + bytes_sent: number + bytes_received: number + created_by?: string + created_at: string + updated_at: string +} + +export interface CreateReplicationTaskRequest { + name: string + direction: 'outbound' | 'inbound' + source_dataset?: string + target_host?: string + target_port?: number + target_user?: string + target_dataset?: string + target_ssh_key_path?: string + source_host?: string + source_port?: number + source_user?: string + local_dataset?: string + schedule_type?: string + schedule_config?: { + time?: string + day?: number + cron?: string + } + compression?: string + encryption?: boolean + recursive?: boolean + incremental?: boolean + auto_snapshot?: boolean + enabled?: boolean +} + +export const replicationApi = { + listTasks: async (direction?: 'outbound' | 'inbound'): Promise => { + const params = direction ? `?direction=${direction}` : '' + const response = await apiClient.get<{ tasks: ReplicationTask[] }>(`/storage/zfs/replication-tasks${params}`) + return response.data.tasks || [] + }, + + getTask: async (id: string): Promise => { + const response = await apiClient.get(`/storage/zfs/replication-tasks/${id}`) + return response.data + }, + + createTask: async (data: CreateReplicationTaskRequest): Promise => { + const response = await apiClient.post('/storage/zfs/replication-tasks', data) + return response.data + }, + + updateTask: async (id: string, data: CreateReplicationTaskRequest): Promise => { + const response = await apiClient.put(`/storage/zfs/replication-tasks/${id}`, data) + return response.data + }, + + deleteTask: async (id: string): Promise => { + await apiClient.delete(`/storage/zfs/replication-tasks/${id}`) + }, +} diff --git a/frontend/src/pages/BackupManagement.tsx b/frontend/src/pages/BackupManagement.tsx index f1229db..c59776a 100644 --- 
a/frontend/src/pages/BackupManagement.tsx +++ b/frontend/src/pages/BackupManagement.tsx @@ -1066,6 +1066,7 @@ function BackupConsoleTab() { $ setCurrentCommand(e.target.value)} @@ -1091,18 +1092,46 @@ function BackupConsoleTab() { function ClientsManagementTab({ onSwitchToConsole }: { onSwitchToConsole?: () => void }) { const [searchQuery, setSearchQuery] = useState('') const [statusFilter, setStatusFilter] = useState('all') + const [categoryFilter, setCategoryFilter] = useState('all') const [expandedRows, setExpandedRows] = useState>(new Set()) const [selectAll, setSelectAll] = useState(false) const [selectedClients, setSelectedClients] = useState>(new Set()) + const [openMenuId, setOpenMenuId] = useState(null) + const [isStartingBackup, setIsStartingBackup] = useState(null) + const queryClient = useQueryClient() const { data, isLoading, error } = useQuery({ - queryKey: ['backup-clients', statusFilter, searchQuery], + queryKey: ['backup-clients', statusFilter, searchQuery, categoryFilter], queryFn: () => backupAPI.listClients({ enabled: statusFilter === 'all' ? undefined : statusFilter === 'enabled', search: searchQuery || undefined, + category: categoryFilter === 'all' ? 
undefined : categoryFilter as 'File' | 'Database' | 'Virtual', }), }) + // Mutation untuk start backup job + const startBackupMutation = useMutation({ + mutationFn: async (clientName: string) => { + // Gunakan bconsole command untuk run backup job + // Format: run job= client= + // Kita akan coba run job dengan nama yang sama dengan client name, atau "Backup-" + const jobName = `Backup-${clientName}` + const command = `run job=${jobName} client=${clientName} yes` + return backupAPI.executeBconsoleCommand(command) + }, + onSuccess: () => { + // Refresh clients list dan jobs list + queryClient.invalidateQueries({ queryKey: ['backup-clients'] }) + queryClient.invalidateQueries({ queryKey: ['backup-jobs'] }) + setIsStartingBackup(null) + }, + onError: (error: any) => { + console.error('Failed to start backup:', error) + setIsStartingBackup(null) + alert(`Failed to start backup: ${error?.response?.data?.details || error.message || 'Unknown error'}`) + }, + }) + const clients = data?.clients || [] const total = data?.total || 0 @@ -1188,6 +1217,81 @@ function ClientsManagementTab({ onSwitchToConsole }: { onSwitchToConsole?: () => } } + // Handler untuk start backup + const handleStartBackup = (client: any) => { + if (!client.name) { + alert('Client name is required') + return + } + setIsStartingBackup(client.client_id) + startBackupMutation.mutate(client.name) + } + + // Handler untuk edit config - redirect ke console dengan command + const handleEditConfig = (client: any) => { + const command = `show client=${client.name}` + if (onSwitchToConsole) { + onSwitchToConsole() + // Set command di console setelah switch menggunakan custom event + setTimeout(() => { + const consoleInput = document.querySelector('input[data-console-input]') as HTMLInputElement + if (consoleInput) { + // Use Object.defineProperty to set value and trigger React onChange + const nativeInputValueSetter = Object.getOwnPropertyDescriptor(window.HTMLInputElement.prototype, 'value')?.set + if 
(nativeInputValueSetter) { + nativeInputValueSetter.call(consoleInput, command) + const event = new Event('input', { bubbles: true }) + consoleInput.dispatchEvent(event) + } else { + consoleInput.value = command + consoleInput.dispatchEvent(new Event('input', { bubbles: true })) + } + consoleInput.focus() + } + }, 200) + } else { + // Fallback: buka console tab + const consoleTab = document.querySelector('[data-tab="console"]') as HTMLElement + if (consoleTab) { + consoleTab.click() + setTimeout(() => { + const consoleInput = document.querySelector('input[data-console-input]') as HTMLInputElement + if (consoleInput) { + const nativeInputValueSetter = Object.getOwnPropertyDescriptor(window.HTMLInputElement.prototype, 'value')?.set + if (nativeInputValueSetter) { + nativeInputValueSetter.call(consoleInput, command) + const event = new Event('input', { bubbles: true }) + consoleInput.dispatchEvent(event) + } else { + consoleInput.value = command + consoleInput.dispatchEvent(new Event('input', { bubbles: true })) + } + consoleInput.focus() + } + }, 200) + } + } + } + + // Handler untuk toggle dropdown menu + const toggleMenu = (clientId: number) => { + setOpenMenuId(openMenuId === clientId ? 
null : clientId) + } + + // Close menu when clicking outside + useEffect(() => { + const handleClickOutside = (event: MouseEvent) => { + if (openMenuId !== null) { + const target = event.target as HTMLElement + if (!target.closest('.dropdown-menu') && !target.closest('.menu-trigger')) { + setOpenMenuId(null) + } + } + } + document.addEventListener('mousedown', handleClickOutside) + return () => document.removeEventListener('mousedown', handleClickOutside) + }, [openMenuId]) + return ( <> @@ -1240,47 +1344,91 @@ function ClientsManagementTab({ onSwitchToConsole }: { onSwitchToConsole?: () => className="w-full bg-surface-highlight border border-border-dark text-white text-sm rounded-lg pl-10 pr-4 py-2 focus:ring-1 focus:ring-primary focus:border-primary outline-none placeholder-text-secondary/70 transition-all" /> -
- - - - +
+
+ + + + +
+
+ + + + +
@@ -1320,6 +1468,7 @@ function ClientsManagementTab({ onSwitchToConsole }: { onSwitchToConsole?: () => Client Name + Category Connection Status Last Backup @@ -1372,6 +1521,26 @@ function ClientsManagementTab({ onSwitchToConsole }: { onSwitchToConsole?: () =>
+ + {client.category ? ( + + {client.category === 'File' && folder} + {client.category === 'Database' && storage} + {client.category === 'Virtual' && computer} + {client.category} + + ) : ( + + File + + )} +

{connection.ip}

@@ -1414,22 +1583,139 @@ function ClientsManagementTab({ onSwitchToConsole }: { onSwitchToConsole?: () => -
- - - +
+ + {openMenuId === client.client_id && ( +
+ + + + +
+ +
+ )} +
{isExpanded && ( - +
Installed Agents & Plugins diff --git a/frontend/src/pages/ObjectStorage.tsx b/frontend/src/pages/ObjectStorage.tsx index c3f7a7e..ef7a175 100644 --- a/frontend/src/pages/ObjectStorage.tsx +++ b/frontend/src/pages/ObjectStorage.tsx @@ -1,7 +1,7 @@ import { useState } from 'react' import { useQuery, useMutation, useQueryClient } from '@tanstack/react-query' import { formatBytes } from '@/lib/format' -import { objectStorageApi, PoolDatasetInfo, CurrentSetup } from '@/api/objectStorage' +import { objectStorageApi, PoolDatasetInfo, CurrentSetup, Object } from '@/api/objectStorage' import UsersAndKeys from './UsersAndKeys' import { systemAPI } from '@/api/system' import { @@ -23,13 +23,17 @@ import { Filter, RefreshCw, Trash2, - AlertCircle + AlertCircle, + FolderOpen, + File, + ChevronRight, + ArrowLeft } from 'lucide-react' const S3_PORT = 9000 export default function ObjectStorage() { - const [activeTab, setActiveTab] = useState<'buckets' | 'users' | 'monitoring' | 'settings'>('buckets') + const [activeTab, setActiveTab] = useState<'buckets' | 'browse' | 'users' | 'monitoring' | 'settings'>('buckets') const [searchQuery, setSearchQuery] = useState('') const [currentPage, setCurrentPage] = useState(1) const [showSetupModal, setShowSetupModal] = useState(false) @@ -99,103 +103,63 @@ export default function ObjectStorage() { // Create bucket mutation with optimistic update const createBucketMutation = useMutation({ - mutationFn: (bucketName: string) => objectStorageApi.createBucket(bucketName), - onMutate: async (bucketName: string) => { - // Cancel any outgoing refetches + mutationFn: (name: string) => objectStorageApi.createBucket(name), + onMutate: async (name: string) => { await queryClient.cancelQueries({ queryKey: ['object-storage-buckets'] }) - - // Snapshot the previous value const previousBuckets = queryClient.getQueryData(['object-storage-buckets']) - - // Optimistically add the new bucket to the list queryClient.setQueryData(['object-storage-buckets'], (old: 
any[] = []) => { const newBucket = { - name: bucketName, + name, creation_date: new Date().toISOString(), size: 0, objects: 0, - access_policy: 'private' as const, + access_policy: 'private', } return [...old, newBucket] }) - - // Close modal immediately setShowCreateBucketModal(false) setNewBucketName('') - - // Return a context object with the snapshotted value return { previousBuckets } }, - onError: (error: any, _bucketName: string, context: any) => { - // If the mutation fails, use the context returned from onMutate to roll back + onError: (error: any, _name: string, context: any) => { if (context?.previousBuckets) { queryClient.setQueryData(['object-storage-buckets'], context.previousBuckets) } - // Reopen modal on error - setShowCreateBucketModal(true) + setShowCreateBucketModal(true) // Reopen modal on error alert(error.response?.data?.error || 'Failed to create bucket') }, onSuccess: () => { - // Refetch to ensure we have the latest data from server queryClient.invalidateQueries({ queryKey: ['object-storage-buckets'] }) alert('Bucket created successfully!') }, onSettled: () => { - // Always refetch after error or success to ensure consistency queryClient.invalidateQueries({ queryKey: ['object-storage-buckets'] }) }, }) - // Delete bucket mutation with optimistic update + // Delete bucket mutation const deleteBucketMutation = useMutation({ - mutationFn: (bucketName: string) => objectStorageApi.deleteBucket(bucketName), - onMutate: async (bucketName: string) => { - // Cancel any outgoing refetches - await queryClient.cancelQueries({ queryKey: ['object-storage-buckets'] }) - - // Snapshot the previous value - const previousBuckets = queryClient.getQueryData(['object-storage-buckets']) - - // Optimistically update to the new value - queryClient.setQueryData(['object-storage-buckets'], (old: any[] = []) => - old.filter((bucket: any) => bucket.name !== bucketName) - ) - - // Return a context object with the snapshotted value - return { previousBuckets } + 
mutationFn: (name: string) => objectStorageApi.deleteBucket(name), + onSuccess: () => { + queryClient.invalidateQueries({ queryKey: ['object-storage-buckets'] }) + setDeleteConfirmBucket(null) + alert('Bucket deleted successfully!') }, - onError: (error: any, _bucketName: string, context: any) => { - // If the mutation fails, use the context returned from onMutate to roll back - if (context?.previousBuckets) { - queryClient.setQueryData(['object-storage-buckets'], context.previousBuckets) - } + onError: (error: any) => { alert(error.response?.data?.error || 'Failed to delete bucket') }, - onSuccess: () => { - // Refetch to ensure we have the latest data - queryClient.invalidateQueries({ queryKey: ['object-storage-buckets'] }) - }, - onSettled: () => { - // Always refetch after error or success to ensure consistency - queryClient.invalidateQueries({ queryKey: ['object-storage-buckets'] }) - }, }) - // Transform buckets from API to UI format - const transformedBuckets = buckets.map((bucket, index) => ({ - id: bucket.name, - name: bucket.name, - type: 'standard' as const, // Default type, can be enhanced later - usage: bucket.size, - usagePercent: 0, // Will be calculated if we have quota info - objects: bucket.objects, - accessPolicy: bucket.access_policy, - created: bucket.creation_date, - color: index % 3 === 0 ? 'blue' : index % 3 === 1 ? 
'purple' : 'orange', + // Transform buckets data + const transformedBuckets = buckets.map((bucket) => ({ + ...bucket, + usage: bucket.size || 0, + objects: bucket.objects || 0, + accessPolicy: bucket.access_policy || 'private', })) - // Filter buckets by search query - const filteredBuckets = transformedBuckets.filter(bucket => + // Filter buckets + const filteredBuckets = transformedBuckets.filter((bucket) => bucket.name.toLowerCase().includes(searchQuery.toLowerCase()) ) @@ -210,30 +174,6 @@ export default function ObjectStorage() { const totalUsage = transformedBuckets.reduce((sum, b) => sum + b.usage, 0) const totalObjects = transformedBuckets.reduce((sum, b) => sum + b.objects, 0) - // Copy endpoint to clipboard - const copyEndpoint = async () => { - try { - await navigator.clipboard.writeText(S3_ENDPOINT) - alert('Endpoint copied to clipboard!') - } catch (error) { - console.error('Failed to copy endpoint:', error) - // Fallback: select text - const textArea = document.createElement('textarea') - textArea.value = S3_ENDPOINT - textArea.style.position = 'fixed' - textArea.style.left = '-999999px' - document.body.appendChild(textArea) - textArea.select() - try { - document.execCommand('copy') - alert('Endpoint copied to clipboard!') - } catch (err) { - alert(`Failed to copy. 
Endpoint: ${S3_ENDPOINT}`) - } - document.body.removeChild(textArea) - } - } - // Get bucket icon const getBucketIcon = (bucket: typeof transformedBuckets[0]) => { if (bucket.accessPolicy === 'public-read' || bucket.accessPolicy === 'public-read-write') { @@ -242,26 +182,6 @@ export default function ObjectStorage() { return } - // Get bucket color class - const getBucketColorClass = (color: string) => { - const colors: Record = { - blue: 'bg-blue-500/10 text-blue-500', - purple: 'bg-purple-500/10 text-purple-500', - orange: 'bg-orange-500/10 text-orange-500', - } - return colors[color] || colors.blue - } - - // Get progress bar color - const getProgressColor = (color: string) => { - const colors: Record = { - blue: 'bg-primary', - purple: 'bg-purple-500', - orange: 'bg-orange-500', - } - return colors[color] || colors.blue - } - // Get access policy badge const getAccessPolicyBadge = (policy: string) => { if (policy === 'public-read-write') { @@ -297,7 +217,7 @@ export default function ObjectStorage() { await navigator.clipboard.writeText(text) alert(`${label} copied to clipboard!`) } catch (error) { - console.error('Failed to copy:', error) + console.error(`Failed to copy ${label}:`, error) // Fallback: select text const textArea = document.createElement('textarea') textArea.value = text @@ -316,7 +236,7 @@ export default function ObjectStorage() { } return ( -
+
{/* Main Content */}
@@ -428,279 +348,239 @@ export default function ObjectStorage() {

Uptime

- +
-

24d 12h

-

Since last patch

+

99.9%

+

Last 30 days

- {/* Endpoint Info */} -
-
-
-
- -

S3 Endpoint URL

-
-

- Use this URL to configure your S3 clients (MinIO, AWS CLI, Veeam, etc.). -

-
-
-
- - -
-
-
- - {/* Tabs & Main Action Area */} -
- {/* Tabs Header */} -
-
- - - - -
-
- - {/* Toolbar */} -
-
- - setSearchQuery(e.target.value)} - /> -
+ {/* Tabs Header */} +
+
+ + + +
+
- {/* Tab Content */} - {activeTab === 'buckets' && ( - <> - {/* Data Table */} -
+ {/* Tab Content */} + {activeTab === 'browse' && } + {activeTab === 'buckets' && ( + <> + {/* Buckets Section */} +
+ {/* Search and Create */} +
+
+ + { + setSearchQuery(e.target.value) + setCurrentPage(1) // Reset to first page on search + }} + className="w-full pl-10 pr-4 py-2 bg-[#1c2936] border border-border-dark rounded-lg text-white placeholder-text-secondary focus:outline-none focus:ring-2 focus:ring-primary focus:border-transparent" + /> +
+ +
+ + {/* Buckets Table */} +
- - - - - {paginatedSnapshots.map((snapshot) => ( - - - - - - - + - ))} + ) : paginatedSnapshots.length === 0 ? ( + + + + ) : ( + paginatedSnapshots.map((snapshot) => ( + + + + + + + + + )) + )}
+ + + + - - - - - {bucketsLoading ? ( - - + ) : paginatedBuckets.length === 0 ? ( - - + ) : ( paginatedBuckets.map((bucket) => ( - - + + + + + + - - - - - - + + )) )}
Name + Usage + Objects + Access Policy + Created + Actions
+
Loading buckets...
- No buckets found +
+ {searchQuery ? 'No buckets found matching your search.' : 'No buckets yet. Create your first bucket to get started.'}
-
-
+
+
{getBucketIcon(bucket)} +
+ {bucket.name} + standard +
-
-

{bucket.name}

-

{bucket.type}

+
{formatBytes(bucket.usage, 1)}{bucket.objects}{getAccessPolicyBadge(bucket.accessPolicy)}{formatDate(bucket.creation_date)} +
+ + +
- -
-
-
- {formatBytes(bucket.usage, 1)} -
-
-
-
-
-
-

{bucket.objects.toLocaleString()}

-
{getAccessPolicyBadge(bucket.accessPolicy)} -

{formatDate(bucket.created)}

-
-
- - - -
-
{/* Pagination */} -
-

- Showing {paginatedBuckets.length > 0 ? (currentPage - 1) * itemsPerPage + 1 : 0} to{' '} - - {Math.min(currentPage * itemsPerPage, filteredBuckets.length)} - {' '} - of {filteredBuckets.length} buckets -

-
- - + {totalPages > 1 && ( +
+
+ Showing {(currentPage - 1) * itemsPerPage + 1} to {Math.min(currentPage * itemsPerPage, filteredBuckets.length)} of {filteredBuckets.length} buckets +
+
+ + +
-
+ )}
- - )} - - {activeTab === 'users' && } - - {activeTab === 'monitoring' && ( -
- -

Monitoring dashboard coming soon

- )} + + )} - {activeTab === 'settings' && ( -
- -

Settings configuration coming soon

-
- )} - + {activeTab === 'users' && } + + {activeTab === 'monitoring' && ( +
+ +

Monitoring dashboard coming soon

+
+ )} + + {activeTab === 'settings' && ( +
+ +

Settings configuration coming soon

+
+ )} -
{/* Create Bucket Modal */} @@ -719,28 +599,25 @@ export default function ObjectStorage() { ✕ -
- + setNewBucketName(e.target.value)} - placeholder="e.g., my-bucket" - className="w-full bg-[#233648] border border-border-dark rounded-lg px-4 py-2 text-white text-sm focus:ring-1 focus:ring-primary focus:border-primary outline-none" + placeholder="Enter bucket name" + className="w-full px-4 py-2 bg-[#233648] border border-border-dark rounded-lg text-white placeholder-text-secondary focus:outline-none focus:ring-2 focus:ring-primary" onKeyDown={(e) => { if (e.key === 'Enter' && newBucketName.trim()) { createBucketMutation.mutate(newBucketName.trim()) } }} - autoFocus /> -

- Bucket names must be unique and follow S3 naming conventions (lowercase, numbers, hyphens only) +

+ Bucket names must be unique and follow S3 naming conventions

-
)} + {/* Delete Bucket Confirmation Dialog */} + {deleteConfirmBucket && ( +
+
+
+

Confirm Delete Bucket

+ +
+
+
+
+ +
+

Warning

+

+ This action cannot be undone. All objects within the bucket "{deleteConfirmBucket}" will be permanently deleted. +

+
+
+
+
+ + +
+
+
+
+ )} + {/* Setup Modal */} {showSetupModal && (
@@ -785,174 +708,387 @@ export default function ObjectStorage() { ✕
- -
+
- +
-
- -
+ {selectedPool && ( +
+ +
+ + +
- {createNewDataset ? ( -
- - setNewDatasetName(e.target.value)} - placeholder="e.g., object-storage" - className="w-full bg-[#233648] border border-border-dark rounded-lg px-4 py-2 text-white text-sm focus:ring-1 focus:ring-primary focus:border-primary outline-none" - /> -
- ) : ( -
- - setSelectedDataset(e.target.value)} + className="w-full px-4 py-2 bg-[#233648] border border-border-dark rounded-lg text-white focus:outline-none focus:ring-2 focus:ring-primary" + > + + {availableDatasets + .find((p) => p.pool_id === selectedPool) + ?.datasets.map((dataset) => ( + ))} - -
- )} - - {currentSetup && ( -
-

Current Configuration:

-

- Dataset: {currentSetup.dataset_path} -

-

- Mount Point: {currentSetup.mount_point} -

-

- ⚠️ Warning: Changing the dataset will update MinIO configuration. Existing data in the current dataset will NOT be automatically migrated. Make sure to backup or migrate data before changing. -

+ + ) : ( + setNewDatasetName(e.target.value)} + placeholder="Enter new dataset name" + className="w-full px-4 py-2 bg-[#233648] border border-border-dark rounded-lg text-white placeholder-text-secondary focus:outline-none focus:ring-2 focus:ring-primary" + /> + )}
)}
)} - {/* Delete Bucket Confirmation Dialog */} - {deleteConfirmBucket && ( -
-
-
-
- -
-
-

Delete Bucket

-

This action cannot be undone

-
-
-
-

- Are you sure you want to delete bucket {deleteConfirmBucket}? -

-

- All objects in this bucket will be permanently deleted. This action cannot be undone. -

-
-
- - -
-
-
- )}
) } +// Bucket Browser Component +function BucketBrowser({ S3_ENDPOINT }: { S3_ENDPOINT: string }) { + const [selectedBucket, setSelectedBucket] = useState(null) + const [currentPath, setCurrentPath] = useState('') + const queryClient = useQueryClient() + + // Fetch buckets + const { data: buckets = [] } = useQuery({ + queryKey: ['object-storage-buckets'], + queryFn: objectStorageApi.listBuckets, + refetchInterval: 5000, // Auto-refresh every 5 seconds + staleTime: 0, + }) + + // Fetch objects in selected bucket + const { data: objects = [], isLoading: objectsLoading } = useQuery({ + queryKey: ['object-storage-objects', selectedBucket, currentPath], + queryFn: () => { + if (!selectedBucket) return Promise.resolve([]) + return objectStorageApi.listObjects(selectedBucket, currentPath || undefined) + }, + enabled: !!selectedBucket, + refetchInterval: 5000, // Auto-refresh every 5 seconds + staleTime: 0, + }) + + const navigateToFolder = (folderKey: string) => { + setCurrentPath(folderKey) + } + + const navigateUp = () => { + if (!currentPath) { + setSelectedBucket(null) + return + } + // Remove last folder from path + const parts = currentPath.split('/').filter(p => p) + if (parts.length > 0) { + parts.pop() + setCurrentPath(parts.length > 0 ? parts.join('/') + '/' : '') + } else { + setCurrentPath('') + } + } + + const getBreadcrumbs = () => { + const crumbs = [{ name: selectedBucket || '', path: '' }] + if (currentPath) { + const parts = currentPath.split('/').filter(p => p) + let path = '' + parts.forEach(part => { + path += part + '/' + crumbs.push({ name: part, path }) + }) + } + return crumbs + } + + const formatDate = (dateString: string) => { + if (!dateString) return '-' + const date = new Date(dateString) + return date.toLocaleString('en-US', { + month: 'short', + day: 'numeric', + year: 'numeric', + hour: '2-digit', + minute: '2-digit' + }) + } + + if (!selectedBucket) { + return ( +
+
+

Select a Bucket to Browse

+
+
+ {buckets.map(bucket => ( + + ))} +
+ {buckets.length === 0 && ( +
+ +

No buckets available

+
+ )} +
+ ) + } + + const breadcrumbs = getBreadcrumbs() + + return ( +
+ {/* Header with breadcrumbs */} +
+
+ + + {breadcrumbs.map((crumb, index) => ( +
+ {index > 0 && } + +
+ ))} +
+ +
+ + {/* Objects table */} +
+
+ + + + + + + + + + + {currentPath && ( + + + + + + + )} + {objectsLoading ? ( + + + + ) : objects.length === 0 ? ( + + + + ) : ( + objects.map((obj) => ( + + + + + + + )) + )} + +
+ Name + + Size + + Last Modified + + Actions +
+ + --
+ Loading... +
+ {currentPath ? 'This folder is empty' : 'This bucket is empty'} +
+ {obj.is_dir ? ( + + ) : ( +
+ + {obj.name} +
+ )} +
+ {obj.is_dir ? '-' : formatBytes(obj.size, 1)} + + {formatDate(obj.last_modified)} + + {!obj.is_dir && ( +
+ +
+ )} +
+
+
+
+ ) +} + +// Helper function for copy to clipboard (reuse from main component) +function copyToClipboard(text: string, label: string) { + navigator.clipboard.writeText(text).then(() => { + alert(`${label} copied to clipboard!`) + }).catch(() => { + alert(`Failed to copy. ${label}: ${text}`) + }) +} \ No newline at end of file diff --git a/frontend/src/pages/SnapshotReplication.tsx b/frontend/src/pages/SnapshotReplication.tsx index fe66798..a136573 100644 --- a/frontend/src/pages/SnapshotReplication.tsx +++ b/frontend/src/pages/SnapshotReplication.tsx @@ -1,13 +1,12 @@ -import { useState } from 'react' -import { useQuery } from '@tanstack/react-query' +import { useState, useEffect } from 'react' +import { useQuery, useMutation, useQueryClient } from '@tanstack/react-query' import { formatBytes } from '@/lib/format' -import { zfsApi } from '@/api/storage' +import { zfsApi, snapshotApi, snapshotScheduleApi, replicationApi, type Snapshot, type CreateSnapshotRequest, type SnapshotSchedule, type CreateSnapshotScheduleRequest, type ReplicationTask, type CreateReplicationTaskRequest } from '@/api/storage' import { Camera, History, Plus, Search, - Filter, Calendar, RotateCcw, Copy, @@ -19,98 +18,64 @@ import { AlertCircle, MoreVertical, X, - Save + Save, + Edit } from 'lucide-react' import { Link } from 'react-router-dom' import { ChevronRight } from 'lucide-react' import { Button } from '@/components/ui/button' -// Mock data - will be replaced with API calls -const MOCK_SNAPSHOTS = [ - { - id: '1', - name: 'auto-2023-10-27-0000', - dataset: 'tank/home', - created: '2023-10-27T00:00:00Z', - referenced: 1.2 * 1024 * 1024, // 1.2 MB - isLatest: true, - }, - { - id: '2', - name: 'manual-backup-pre-upgrade', - dataset: 'tank/services/db', - created: '2023-10-26T16:30:00Z', - referenced: 4.5 * 1024 * 1024 * 1024, // 4.5 GB - isLatest: false, - }, - { - id: '3', - name: 'auto-2023-10-26-0000', - dataset: 'tank/home', - created: '2023-10-26T00:00:00Z', - referenced: 850 * 1024, // 
850 KB - isLatest: false, - }, - { - id: '4', - name: 'auto-2023-10-25-0000', - dataset: 'tank/home', - created: '2023-10-25T00:00:00Z', - referenced: 1.1 * 1024 * 1024, // 1.1 MB - isLatest: false, - }, - { - id: '5', - name: 'auto-2023-10-24-0000', - dataset: 'tank/home', - created: '2023-10-24T00:00:00Z', - referenced: 920 * 1024, // 920 KB - isLatest: false, - }, -] - -const MOCK_REPLICATIONS = [ - { - id: '1', - name: 'Daily Offsite (tank/backup)', - target: '192.168.20.5 (ssh)', - status: 'idle', - lastRun: '15m ago', - progress: 0, - }, - { - id: '2', - name: 'Hourly Sync (tank/projects)', - target: '192.168.20.5 (ssh)', - status: 'running', - lastRun: 'Running...', - progress: 45, - }, -] export default function SnapshotReplication() { - const [activeTab, setActiveTab] = useState<'snapshots' | 'replication' | 'restore'>('snapshots') + const [activeTab, setActiveTab] = useState<'snapshots' | 'schedules' | 'replication' | 'restore'>('snapshots') const [searchQuery, setSearchQuery] = useState('') const [currentPage, setCurrentPage] = useState(1) const [showCreateReplication, setShowCreateReplication] = useState(false) const [showCreateSnapshot, setShowCreateSnapshot] = useState(false) + const [showCreateSchedule, setShowCreateSchedule] = useState(false) + const [editingSchedule, setEditingSchedule] = useState(null) + const [snapshotForm, setSnapshotForm] = useState({ + dataset: '', + name: '', + recursive: false, + }) + const [scheduleForm, setScheduleForm] = useState({ + name: '', + dataset: '', + snapshot_name_template: 'auto-%Y-%m-%d-%H%M', + schedule_type: 'daily', + schedule_config: { time: '00:00' }, + recursive: false, + }) + const [selectedDatasetFilter, setSelectedDatasetFilter] = useState('') + const [deleteConfirmSnapshot, setDeleteConfirmSnapshot] = useState(null) + const [rollbackConfirmSnapshot, setRollbackConfirmSnapshot] = useState(null) const itemsPerPage = 10 + const queryClient = useQueryClient() // Form state for replication - const 
[replicationForm, setReplicationForm] = useState({ + const [replicationForm, setReplicationForm] = useState({ name: '', - sourceDataset: '', - targetHost: '', - targetDataset: '', - targetPort: '22', - targetUser: 'root', - schedule: 'daily', - scheduleTime: '00:00', + direction: 'outbound', + source_dataset: '', + target_host: '', + target_port: 22, + target_user: 'root', + target_dataset: '', + source_host: '', + source_port: 22, + source_user: 'root', + local_dataset: '', + schedule_type: 'daily', + schedule_config: { time: '00:00' }, compression: 'lz4', + recursive: false, + incremental: true, + auto_snapshot: true, encryption: false, - recursive: true, - autoSnapshot: true, + enabled: true, }) + const [editingReplication, setEditingReplication] = useState(null) // Fetch pools and datasets for form const { data: pools = [] } = useQuery({ @@ -118,26 +83,170 @@ export default function SnapshotReplication() { queryFn: zfsApi.listPools, }) + // Fetch snapshots from API + const { data: snapshots = [], isLoading: snapshotsLoading, refetch: refetchSnapshots } = useQuery({ + queryKey: ['snapshots', selectedDatasetFilter], + queryFn: () => snapshotApi.listSnapshots(selectedDatasetFilter || undefined), + refetchInterval: 5000, // Auto-refresh every 5 seconds + staleTime: 0, + }) + + // Fetch snapshot schedules + const { data: schedules = [], isLoading: schedulesLoading, refetch: refetchSchedules } = useQuery({ + queryKey: ['snapshot-schedules'], + queryFn: snapshotScheduleApi.listSchedules, + refetchInterval: 5000, + staleTime: 0, + }) + + // Fetch replication tasks + const { data: allReplicationTasks = [], isLoading: replicationLoading, refetch: refetchReplications } = useQuery({ + queryKey: ['replication-tasks'], + queryFn: () => replicationApi.listTasks(), + refetchInterval: 5000, + staleTime: 0, + }) + + // Separate outbound and inbound replication tasks + const outboundTasks = allReplicationTasks.filter(task => task.direction === 'outbound') + const inboundTasks = 
allReplicationTasks.filter(task => task.direction === 'inbound') + const [datasets, setDatasets] = useState>([]) // Fetch datasets for selected pool const fetchDatasets = async (poolName: string) => { try { - const poolDatasets = await zfsApi.listDatasets(poolName) + const pool = pools.find(p => p.name === poolName || p.id === poolName) + if (!pool) return + + // Check if datasets already loaded for this pool + const existing = datasets.find(d => d.pool === poolName || d.pool === pool.id || d.pool === pool.name) + if (existing) return + + const poolDatasets = await zfsApi.listDatasets(pool.id) setDatasets((prev) => { - const filtered = prev.filter((d) => d.pool !== poolName) - return [...filtered, { pool: poolName, datasets: poolDatasets }] + const filtered = prev.filter((d) => d.pool !== poolName && d.pool !== pool.id && d.pool !== pool.name) + return [...filtered, { pool: pool.name, datasets: poolDatasets }] }) } catch (error) { console.error('Failed to fetch datasets:', error) } } + // Fetch datasets for all pools when modal opens + const fetchAllDatasets = async () => { + if (pools.length === 0) return + + for (const pool of pools) { + try { + // Check if already loaded + const existing = datasets.find(d => d.pool === pool.name || d.pool === pool.id) + if (existing) continue + + const poolDatasets = await zfsApi.listDatasets(pool.id) + setDatasets((prev) => { + const filtered = prev.filter((d) => d.pool !== pool.name && d.pool !== pool.id) + return [...filtered, { pool: pool.name, datasets: poolDatasets }] + }) + } catch (error) { + console.error(`Failed to fetch datasets for pool ${pool.name}:`, error) + } + } + } + + // Fetch datasets when modal opens + useEffect(() => { + if (showCreateSnapshot && pools.length > 0) { + fetchAllDatasets() + } + }, [showCreateSnapshot, pools.length]) + + // Load datasets when schedule modal opens + useEffect(() => { + if (showCreateSchedule && pools.length > 0) { + fetchAllDatasets() + } + }, [showCreateSchedule, 
pools.length]) + + // Populate form when editing schedule + useEffect(() => { + if (editingSchedule) { + setScheduleForm({ + name: editingSchedule.name, + dataset: editingSchedule.dataset, + snapshot_name_template: editingSchedule.snapshot_name_template, + schedule_type: editingSchedule.schedule_type, + schedule_config: editingSchedule.schedule_config, + recursive: editingSchedule.recursive, + retention_count: editingSchedule.retention_count, + retention_days: editingSchedule.retention_days, + }) + } else if (!showCreateSchedule) { + // Reset form when modal closes + setScheduleForm({ + name: '', + dataset: '', + snapshot_name_template: 'auto-%Y-%m-%d-%H%M', + schedule_type: 'daily', + schedule_config: { time: '00:00' }, + recursive: false, + }) + } + }, [editingSchedule, showCreateSchedule]) + + // Create snapshot mutation + const createSnapshotMutation = useMutation({ + mutationFn: (data: CreateSnapshotRequest) => snapshotApi.createSnapshot(data), + onSuccess: async () => { + // Close modal first + setShowCreateSnapshot(false) + setSnapshotForm({ dataset: '', name: '', recursive: false }) + + // Invalidate and refetch snapshots immediately + await queryClient.invalidateQueries({ queryKey: ['snapshots'] }) + await refetchSnapshots() + + alert('Snapshot created successfully!') + }, + onError: (error: any) => { + alert(error.response?.data?.error || 'Failed to create snapshot') + }, + }) + + // Delete snapshot mutation + const deleteSnapshotMutation = useMutation({ + mutationFn: ({ name, recursive }: { name: string; recursive?: boolean }) => + snapshotApi.deleteSnapshot(name, recursive), + onSuccess: () => { + queryClient.invalidateQueries({ queryKey: ['snapshots'] }) + setDeleteConfirmSnapshot(null) + alert('Snapshot deleted successfully!') + }, + onError: (error: any) => { + alert(error.response?.data?.error || 'Failed to delete snapshot') + }, + }) + + // Rollback snapshot mutation + const rollbackSnapshotMutation = useMutation({ + mutationFn: ({ name, force }: { 
name: string; force?: boolean }) => + snapshotApi.rollbackSnapshot(name, force), + onSuccess: () => { + queryClient.invalidateQueries({ queryKey: ['snapshots'] }) + setRollbackConfirmSnapshot(null) + alert('Snapshot rollback completed successfully!') + }, + onError: (error: any) => { + alert(error.response?.data?.error || 'Failed to rollback snapshot') + }, + }) + // Filter snapshots - const filteredSnapshots = MOCK_SNAPSHOTS.filter((snapshot) => { + const filteredSnapshots = snapshots.filter((snapshot) => { const matchesSearch = snapshot.name.toLowerCase().includes(searchQuery.toLowerCase()) || - snapshot.dataset.toLowerCase().includes(searchQuery.toLowerCase()) + snapshot.dataset.toLowerCase().includes(searchQuery.toLowerCase()) || + snapshot.snapshot_name.toLowerCase().includes(searchQuery.toLowerCase()) return matchesSearch }) @@ -149,8 +258,8 @@ export default function SnapshotReplication() { ) // Calculate totals - const totalSnapshots = MOCK_SNAPSHOTS.length - const totalReclaimable = MOCK_SNAPSHOTS.reduce((sum, s) => sum + s.referenced, 0) + const totalSnapshots = snapshots.length + const totalReclaimable = snapshots.reduce((sum, s) => sum + s.referenced, 0) // Format date const formatDate = (dateString: string) => { @@ -210,6 +319,9 @@ export default function SnapshotReplication() { onClick={() => { if (activeTab === 'replication') { setShowCreateReplication(true) + } else if (activeTab === 'schedules') { + setEditingSchedule(null) + setShowCreateSchedule(true) } else { setShowCreateSnapshot(true) } @@ -217,7 +329,11 @@ export default function SnapshotReplication() { className="px-4 py-2 bg-primary hover:bg-blue-600 text-white text-sm font-bold rounded-lg shadow-[0_4px_12px_rgba(19,127,236,0.3)] transition-all flex items-center gap-2" > - {activeTab === 'replication' ? 'Create Replication' : 'Create Snapshot'} + {activeTab === 'replication' + ? 'Create Replication' + : activeTab === 'schedules' + ? 
'Create Schedule' + : 'Create Snapshot'} @@ -280,6 +396,22 @@ export default function SnapshotReplication() { Snapshots + +
- - -
- - {snapshot.name} - {snapshot.isLatest && ( - - LATEST - - )} -
-
- {snapshot.dataset} - - {formatDate(snapshot.created)} - - {formatBytes(snapshot.referenced)} - -
- - - -
+ {snapshotsLoading ? ( +
+ Loading snapshots...
+ {searchQuery ? 'No snapshots found matching your search.' : 'No snapshots yet. Create your first snapshot to get started.'} +
+ + +
+ + {snapshot.snapshot_name} + {snapshot.is_latest && ( + + LATEST + + )} +
+
+ {snapshot.dataset} + + {formatDate(snapshot.created)} + + {formatBytes(snapshot.referenced)} + +
+ + + +
+
@@ -456,58 +628,361 @@ export default function SnapshotReplication() { )} - {activeTab === 'replication' && ( + {activeTab === 'schedules' && (
+

Snapshot Schedules

+ +
+ {schedulesLoading ? ( +
Loading schedules...
+ ) : schedules.length === 0 ? ( +
+ No schedules configured. Create your first schedule to automate snapshot creation. +
+ ) : ( +
+ {schedules.map((schedule) => ( +
+
+
+
+

{schedule.name}

+ + {schedule.schedule_type} + +
+

+ Dataset: {schedule.dataset} • Template: {schedule.snapshot_name_template} +

+ {schedule.next_run_at && ( +

+ Next run: {new Date(schedule.next_run_at).toLocaleString()} +

+ )} +
+
+ + + +
+
+ ))} +
+ )} +
+ )} + + {activeTab === 'replication' && ( +
+

Replication Tasks

-
- {MOCK_REPLICATIONS.map((replication) => ( -
-
-
-

{replication.name}

-

Target: {replication.target}

- {replication.status === 'running' && ( -
-
-
- )} -
-
-

- {replication.status === 'running' ? `${replication.progress}%` : 'Idle'} -

-

{replication.lastRun}

-
- + + {/* Outbound Replication Section */} +
+
+ +

Outbound Replication

+ (Sending to remote) +
+ {replicationLoading ? ( +
Loading replication tasks...
+ ) : outboundTasks.length === 0 ? ( +
+ No outbound replication tasks configured.
- ))} + ) : ( +
+ {outboundTasks.map((task) => ( +
+
+
+
+

{task.name}

+ {!task.enabled && ( + + Disabled + + )} +
+

+ Source: {task.source_dataset} → Target: {task.target_host}:{task.target_port} ({task.target_dataset}) +

+ {task.last_run_at && ( +

+ Last run: {new Date(task.last_run_at).toLocaleString()} + {task.last_run_status && ( + + ({task.last_run_status}) + + )} +

+ )} +
+
+ + +
+
+ ))} +
+ )} +
+ + {/* Inbound Replication Section */} +
+
+ +

Inbound Replication

+ (Receiving from remote) +
+ {replicationLoading ? ( +
Loading replication tasks...
+ ) : inboundTasks.length === 0 ? ( +
+ No inbound replication tasks configured. +
+ ) : ( +
+ {inboundTasks.map((task) => ( +
+
+
+
+

{task.name}

+ {!task.enabled && ( + + Disabled + + )} +
+

+ Source: {task.source_host}:{task.source_port} ({task.source_dataset}) → Local: {task.local_dataset} +

+ {task.last_run_at && ( +

+ Last run: {new Date(task.last_run_at).toLocaleString()} + {task.last_run_status && ( + + ({task.last_run_status}) + + )} +

+ )} +
+
+ + +
+
+ ))} +
+ )}
)} @@ -531,42 +1006,47 @@ export default function SnapshotReplication() {
- {MOCK_REPLICATIONS.map((replication) => ( + {allReplicationTasks.slice(0, 3).map((task) => (
-

{replication.name}

-

Target: {replication.target}

- {replication.status === 'running' && ( -
-
-
- )} +

{task.name}

+

+ {task.direction === 'outbound' + ? `Target: ${task.target_host}:${task.target_port}` + : `Source: ${task.source_host}:${task.source_port}`} +

- {replication.status === 'running' ? `${replication.progress}%` : 'Idle'} + {task.status === 'running' ? 'Running' : task.status}

-

{replication.lastRun}

+ {task.last_run_at && ( +

+ {new Date(task.last_run_at).toLocaleString()} +

+ )}
))} + {allReplicationTasks.length === 0 && ( +

No replication tasks

+ )}
@@ -598,9 +1078,35 @@ export default function SnapshotReplication() { onClick={(e) => e.stopPropagation()} >
-

Create Replication Task

+

+ {editingReplication ? 'Edit Replication Task' : 'Create Replication Task'} +

- {/* Source Dataset */} + {/* Direction */}
- +
- {/* Target Configuration */} -
-
- - setReplicationForm({ ...replicationForm, targetHost: e.target.value })} - /> -
-
- - setReplicationForm({ ...replicationForm, targetPort: e.target.value })} - /> -
-
+ {/* Outbound Fields */} + {replicationForm.direction === 'outbound' && ( + <> + {/* Source Dataset */} +
+ + +
+ + )} -
-
- - setReplicationForm({ ...replicationForm, targetUser: e.target.value })} - /> -
-
- - setReplicationForm({ ...replicationForm, targetDataset: e.target.value })} - /> -
-
+ {/* Inbound Fields */} + {replicationForm.direction === 'inbound' && ( + <> + {/* Source Host */} +
+ + setReplicationForm({ ...replicationForm, source_host: e.target.value || undefined })} + /> +
+ {/* Source Dataset for Inbound */} +
+ + setReplicationForm({ ...replicationForm, source_dataset: e.target.value || undefined })} + /> +
+ {/* Local Dataset */} +
+ + +
+ + )} + + {/* Target Configuration (for Outbound) */} + {replicationForm.direction === 'outbound' && ( + <> +
+
+ + setReplicationForm({ ...replicationForm, target_host: e.target.value || undefined })} + /> +
+
+ + setReplicationForm({ ...replicationForm, target_port: parseInt(e.target.value) || 22 })} + /> +
+
+ +
+
+ + setReplicationForm({ ...replicationForm, target_user: e.target.value || undefined })} + /> +
+
+ + setReplicationForm({ ...replicationForm, target_dataset: e.target.value || undefined })} + /> +
+
+ + )} {/* Schedule */}
@@ -702,14 +1287,14 @@ export default function SnapshotReplication() {
@@ -717,8 +1302,11 @@ export default function SnapshotReplication() { setReplicationForm({ ...replicationForm, scheduleTime: e.target.value })} + value={replicationForm.schedule_config?.time || '00:00'} + onChange={(e) => setReplicationForm({ + ...replicationForm, + schedule_config: { ...replicationForm.schedule_config, time: e.target.value } + })} />
@@ -752,8 +1340,8 @@ export default function SnapshotReplication() {
@@ -814,7 +1420,10 @@ export default function SnapshotReplication() { {showCreateSnapshot && (
setShowCreateSnapshot(false)} + onClick={() => { + setShowCreateSnapshot(false) + setSnapshotForm({ dataset: '', name: '', recursive: false }) + }} >

Create Snapshot

+
+ + +
setSnapshotForm({ ...snapshotForm, name: e.target.value })} />
-
- - +
+ setSnapshotForm({ ...snapshotForm, recursive: e.target.checked })} + /> +
)} + + {/* Delete Snapshot Confirmation Dialog */} + {deleteConfirmSnapshot && ( +
+
+
+

Confirm Delete Snapshot

+ +
+
+
+
+ +
+

Warning

+

+ This action cannot be undone. Snapshot "{deleteConfirmSnapshot}" will be permanently deleted. +

+
+
+
+
+ + +
+
+
+
+ )} + + {/* Rollback Snapshot Confirmation Dialog */} + {rollbackConfirmSnapshot && ( +
+
+
+

Confirm Rollback to Snapshot

+ +
+
+
+
+ +
+

Warning

+

+ Rolling back to snapshot "{rollbackConfirmSnapshot}" will destroy all changes made after this snapshot was created. This action cannot be undone. +

+
+
+
+
+ + +
+
+
+
+ )} + + {/* Create/Edit Schedule Modal */} + {showCreateSchedule && ( +
+
+
+
+

+ {editingSchedule ? 'Edit Schedule' : 'Create Snapshot Schedule'} +

+ +
+
+
+ {/* Name */} +
+ + setScheduleForm({ ...scheduleForm, name: e.target.value })} + placeholder="e.g., Daily Backup" + className="w-full px-4 py-2 bg-[#111a22] border border-[#2a3b4d] rounded-lg text-white placeholder-text-secondary focus:outline-none focus:border-primary" + /> +
+ + {/* Dataset */} +
+ + +
+ + {/* Snapshot Name Template */} +
+ + setScheduleForm({ ...scheduleForm, snapshot_name_template: e.target.value })} + placeholder="e.g., auto-%Y-%m-%d-%H%M or daily-backup" + className="w-full px-4 py-2 bg-[#111a22] border border-[#2a3b4d] rounded-lg text-white placeholder-text-secondary focus:outline-none focus:border-primary" + /> +

+ Use %Y, %m, %d, %H, %M for date/time formatting (e.g., %Y-%m-%d-%H%M) +

+
+ + {/* Schedule Type */} +
+ + +
+ + {/* Schedule Config */} + {scheduleForm.schedule_type === 'daily' && ( +
+ + + setScheduleForm({ + ...scheduleForm, + schedule_config: { ...scheduleForm.schedule_config, time: e.target.value }, + }) + } + className="w-full px-4 py-2 bg-[#111a22] border border-[#2a3b4d] rounded-lg text-white focus:outline-none focus:border-primary" + /> +
+ )} + + {scheduleForm.schedule_type === 'weekly' && ( + <> +
+ + +
+
+ + + setScheduleForm({ + ...scheduleForm, + schedule_config: { ...scheduleForm.schedule_config, time: e.target.value }, + }) + } + className="w-full px-4 py-2 bg-[#111a22] border border-[#2a3b4d] rounded-lg text-white focus:outline-none focus:border-primary" + /> +
+ + )} + + {scheduleForm.schedule_type === 'monthly' && ( + <> +
+ + + setScheduleForm({ + ...scheduleForm, + schedule_config: { ...scheduleForm.schedule_config, day: parseInt(e.target.value) }, + }) + } + className="w-full px-4 py-2 bg-[#111a22] border border-[#2a3b4d] rounded-lg text-white focus:outline-none focus:border-primary" + /> +
+
+ + + setScheduleForm({ + ...scheduleForm, + schedule_config: { ...scheduleForm.schedule_config, time: e.target.value }, + }) + } + className="w-full px-4 py-2 bg-[#111a22] border border-[#2a3b4d] rounded-lg text-white focus:outline-none focus:border-primary" + /> +
+ + )} + + {scheduleForm.schedule_type === 'cron' && ( +
+ + + setScheduleForm({ + ...scheduleForm, + schedule_config: { ...scheduleForm.schedule_config, cron: e.target.value }, + }) + } + placeholder="0 0 * * *" + className="w-full px-4 py-2 bg-[#111a22] border border-[#2a3b4d] rounded-lg text-white placeholder-text-secondary focus:outline-none focus:border-primary" + /> +

Format: minute hour day month weekday

+
+ )} + + {/* Recursive */} +
+ setScheduleForm({ ...scheduleForm, recursive: e.target.checked })} + className="w-4 h-4 rounded border-[#2a3b4d] bg-[#111a22] text-primary focus:ring-primary" + /> + +
+ + {/* Retention Count */} +
+ + + setScheduleForm({ + ...scheduleForm, + retention_count: e.target.value ? parseInt(e.target.value) : undefined, + }) + } + placeholder="Keep last N snapshots (leave empty for unlimited)" + className="w-full px-4 py-2 bg-[#111a22] border border-[#2a3b4d] rounded-lg text-white placeholder-text-secondary focus:outline-none focus:border-primary" + /> +
+ + {/* Retention Days */} +
+ + + setScheduleForm({ + ...scheduleForm, + retention_days: e.target.value ? parseInt(e.target.value) : undefined, + }) + } + placeholder="Keep snapshots for N days (leave empty for unlimited)" + className="w-full px-4 py-2 bg-[#111a22] border border-[#2a3b4d] rounded-lg text-white placeholder-text-secondary focus:outline-none focus:border-primary" + /> +
+ + {/* Buttons */} +
+ + +
+
+
+
+ )}
) }