storage: add ZFS snapshot, snapshot schedule, and replication task management

This commit is contained in:
2026-01-15 09:44:57 +00:00
parent 9b1f85479b
commit 1d9406c93a
19 changed files with 4922 additions and 887 deletions

View File

@@ -15,14 +15,17 @@ import (
// Handler handles storage-related API requests
type Handler struct {
	diskService             *DiskService
	lvmService              *LVMService
	zfsService              *ZFSService
	snapshotService         *SnapshotService
	snapshotScheduleService *SnapshotScheduleService
	replicationService      *ReplicationService
	arcService              *ARCService
	taskEngine              *tasks.Engine
	db                      *database.DB
	logger                  *logger.Logger
	cache                   *cache.Cache // Cache for invalidation
}
// SetCache sets the cache instance for cache invalidation
@@ -32,14 +35,18 @@ func (h *Handler) SetCache(c *cache.Cache) {
// NewHandler creates a new storage handler with all storage sub-services wired up.
// The snapshot service is created first because the schedule service depends on it.
func NewHandler(db *database.DB, log *logger.Logger) *Handler {
	snapshotService := NewSnapshotService(db, log)
	return &Handler{
		diskService:             NewDiskService(db, log),
		lvmService:              NewLVMService(db, log),
		zfsService:              NewZFSService(db, log),
		snapshotService:         snapshotService,
		snapshotScheduleService: NewSnapshotScheduleService(db, log, snapshotService),
		replicationService:      NewReplicationService(db, log),
		arcService:              NewARCService(log),
		taskEngine:              tasks.NewEngine(db, log),
		db:                      db,
		logger:                  log,
	}
}
@@ -509,3 +516,417 @@ func (h *Handler) GetARCStats(c *gin.Context) {
c.JSON(http.StatusOK, stats)
}
// ListSnapshots lists all snapshots, optionally filtered by the "dataset"
// query parameter (empty means no filter).
func (h *Handler) ListSnapshots(c *gin.Context) {
	filter := c.DefaultQuery("dataset", "")
	list, err := h.snapshotService.ListSnapshots(c.Request.Context(), filter)
	if err != nil {
		h.logger.Error("Failed to list snapshots", "error", err, "dataset", filter)
		c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to list snapshots: " + err.Error()})
		return
	}
	c.JSON(http.StatusOK, gin.H{"snapshots": list})
}
// CreateSnapshotRequest represents a request to create a snapshot
type CreateSnapshotRequest struct {
	Dataset   string `json:"dataset" binding:"required"` // ZFS dataset to snapshot
	Name      string `json:"name" binding:"required"`    // snapshot name (semantics defined by SnapshotService)
	Recursive bool   `json:"recursive"`                  // presumably snapshots child datasets too — confirm in SnapshotService
}
// CreateSnapshot creates a new snapshot from a JSON request body.
// Responds 400 on a malformed body, 500 on a service failure, 201 on success.
func (h *Handler) CreateSnapshot(c *gin.Context) {
	var body CreateSnapshotRequest
	if bindErr := c.ShouldBindJSON(&body); bindErr != nil {
		h.logger.Error("Invalid create snapshot request", "error", bindErr)
		c.JSON(http.StatusBadRequest, gin.H{"error": "invalid request: " + bindErr.Error()})
		return
	}
	err := h.snapshotService.CreateSnapshot(c.Request.Context(), body.Dataset, body.Name, body.Recursive)
	if err != nil {
		h.logger.Error("Failed to create snapshot", "error", err, "dataset", body.Dataset, "name", body.Name)
		c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to create snapshot: " + err.Error()})
		return
	}
	c.JSON(http.StatusCreated, gin.H{"message": "snapshot created successfully"})
}
// DeleteSnapshot deletes the snapshot named in the URL path.
// The optional "recursive" query parameter (default "false") requests
// recursive deletion.
func (h *Handler) DeleteSnapshot(c *gin.Context) {
	name := c.Param("name")
	isRecursive := c.DefaultQuery("recursive", "false") == "true"
	err := h.snapshotService.DeleteSnapshot(c.Request.Context(), name, isRecursive)
	if err != nil {
		h.logger.Error("Failed to delete snapshot", "error", err, "snapshot", name)
		c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to delete snapshot: " + err.Error()})
		return
	}
	c.JSON(http.StatusOK, gin.H{"message": "snapshot deleted successfully"})
}
// RollbackSnapshotRequest represents a request to rollback to a snapshot
type RollbackSnapshotRequest struct {
	Force bool `json:"force"` // force the rollback; optional, defaults to false when no body is sent
}
// RollbackSnapshot rolls back a dataset to the snapshot named in the URL path.
// The JSON body is optional; when absent or malformed the rollback is non-forced.
func (h *Handler) RollbackSnapshot(c *gin.Context) {
	name := c.Param("name")
	var body RollbackSnapshotRequest
	if bindErr := c.ShouldBindJSON(&body); bindErr != nil {
		// Missing/invalid body just means "do not force".
		body.Force = false
	}
	err := h.snapshotService.RollbackSnapshot(c.Request.Context(), name, body.Force)
	if err != nil {
		h.logger.Error("Failed to rollback snapshot", "error", err, "snapshot", name)
		c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to rollback snapshot: " + err.Error()})
		return
	}
	c.JSON(http.StatusOK, gin.H{"message": "snapshot rollback completed successfully"})
}
// CloneSnapshotRequest represents a request to clone a snapshot
type CloneSnapshotRequest struct {
	CloneName string `json:"clone_name" binding:"required"` // name of the new dataset created from the snapshot
}
// CloneSnapshot clones the snapshot named in the URL path into a new dataset
// whose name is taken from the JSON body.
func (h *Handler) CloneSnapshot(c *gin.Context) {
	name := c.Param("name")
	var body CloneSnapshotRequest
	if bindErr := c.ShouldBindJSON(&body); bindErr != nil {
		h.logger.Error("Invalid clone snapshot request", "error", bindErr)
		c.JSON(http.StatusBadRequest, gin.H{"error": "invalid request: " + bindErr.Error()})
		return
	}
	err := h.snapshotService.CloneSnapshot(c.Request.Context(), name, body.CloneName)
	if err != nil {
		h.logger.Error("Failed to clone snapshot", "error", err, "snapshot", name, "clone", body.CloneName)
		c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to clone snapshot: " + err.Error()})
		return
	}
	c.JSON(http.StatusCreated, gin.H{"message": "snapshot cloned successfully", "clone_name": body.CloneName})
}
// ListSnapshotSchedules lists all snapshot schedules.
func (h *Handler) ListSnapshotSchedules(c *gin.Context) {
	result, err := h.snapshotScheduleService.ListSchedules(c.Request.Context())
	if err != nil {
		h.logger.Error("Failed to list snapshot schedules", "error", err)
		c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to list snapshot schedules: " + err.Error()})
		return
	}
	c.JSON(http.StatusOK, gin.H{"schedules": result})
}
// GetSnapshotSchedule retrieves a snapshot schedule by its URL path ID.
// Responds 404 when the schedule does not exist, 500 on other failures.
func (h *Handler) GetSnapshotSchedule(c *gin.Context) {
	scheduleID := c.Param("id")
	schedule, err := h.snapshotScheduleService.GetSchedule(c.Request.Context(), scheduleID)
	switch {
	case err == nil:
		c.JSON(http.StatusOK, schedule)
	case err.Error() == "schedule not found":
		// NOTE: not-found is signaled by the service via this exact error string.
		c.JSON(http.StatusNotFound, gin.H{"error": "schedule not found"})
	default:
		h.logger.Error("Failed to get snapshot schedule", "error", err, "id", scheduleID)
		c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to get snapshot schedule: " + err.Error()})
	}
}
// CreateSnapshotScheduleRequest represents a request to create a snapshot schedule
type CreateSnapshotScheduleRequest struct {
	Name                 string                 `json:"name" binding:"required"`                   // display name of the schedule
	Dataset              string                 `json:"dataset" binding:"required"`                // ZFS dataset the schedule snapshots
	SnapshotNameTemplate string                 `json:"snapshot_name_template" binding:"required"` // template for generated snapshot names — format defined by the schedule service
	ScheduleType         string                 `json:"schedule_type" binding:"required"`          // schedule kind; valid values defined by the schedule service
	ScheduleConfig       map[string]interface{} `json:"schedule_config" binding:"required"`        // free-form config keyed by schedule type
	Recursive            bool                   `json:"recursive"`                                 // presumably snapshots child datasets — confirm in SnapshotService
	RetentionCount       *int                   `json:"retention_count"`                           // optional: keep at most this many snapshots
	RetentionDays        *int                   `json:"retention_days"`                            // optional: keep snapshots for this many days
}
// CreateSnapshotSchedule creates a new snapshot schedule from the JSON body,
// recording the authenticated user (from the "user_id" context key, set by
// auth middleware) as the creator. Responds 400 on a bad body, 500 on a
// service failure, 201 with the created schedule on success.
func (h *Handler) CreateSnapshotSchedule(c *gin.Context) {
	var req CreateSnapshotScheduleRequest
	if err := c.ShouldBindJSON(&req); err != nil {
		h.logger.Error("Invalid create snapshot schedule request", "error", err)
		c.JSON(http.StatusBadRequest, gin.H{"error": "invalid request: " + err.Error()})
		return
	}
	// Use the comma-ok form: a bare userID.(string) would panic the handler
	// if middleware ever stores a non-string value under "user_id".
	userIDStr := ""
	if userID, exists := c.Get("user_id"); exists {
		if s, ok := userID.(string); ok {
			userIDStr = s
		}
	}
	schedule, err := h.snapshotScheduleService.CreateSchedule(c.Request.Context(), &CreateScheduleRequest{
		Name:                 req.Name,
		Dataset:              req.Dataset,
		SnapshotNameTemplate: req.SnapshotNameTemplate,
		ScheduleType:         req.ScheduleType,
		ScheduleConfig:       req.ScheduleConfig,
		Recursive:            req.Recursive,
		RetentionCount:       req.RetentionCount,
		RetentionDays:        req.RetentionDays,
	}, userIDStr)
	if err != nil {
		h.logger.Error("Failed to create snapshot schedule", "error", err)
		c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to create snapshot schedule: " + err.Error()})
		return
	}
	c.JSON(http.StatusCreated, schedule)
}
// UpdateSnapshotSchedule updates an existing snapshot schedule identified by
// the URL path ID. The full schedule definition is re-submitted in the body.
func (h *Handler) UpdateSnapshotSchedule(c *gin.Context) {
	scheduleID := c.Param("id")
	var body CreateSnapshotScheduleRequest
	if bindErr := c.ShouldBindJSON(&body); bindErr != nil {
		h.logger.Error("Invalid update snapshot schedule request", "error", bindErr)
		c.JSON(http.StatusBadRequest, gin.H{"error": "invalid request: " + bindErr.Error()})
		return
	}
	// Translate the API request into the service-layer request type.
	payload := &CreateScheduleRequest{
		Name:                 body.Name,
		Dataset:              body.Dataset,
		SnapshotNameTemplate: body.SnapshotNameTemplate,
		ScheduleType:         body.ScheduleType,
		ScheduleConfig:       body.ScheduleConfig,
		Recursive:            body.Recursive,
		RetentionCount:       body.RetentionCount,
		RetentionDays:        body.RetentionDays,
	}
	schedule, err := h.snapshotScheduleService.UpdateSchedule(c.Request.Context(), scheduleID, payload)
	if err != nil {
		if err.Error() == "schedule not found" {
			c.JSON(http.StatusNotFound, gin.H{"error": "schedule not found"})
			return
		}
		h.logger.Error("Failed to update snapshot schedule", "error", err, "id", scheduleID)
		c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to update snapshot schedule: " + err.Error()})
		return
	}
	c.JSON(http.StatusOK, schedule)
}
// DeleteSnapshotSchedule deletes the snapshot schedule named by the URL path ID.
func (h *Handler) DeleteSnapshotSchedule(c *gin.Context) {
	scheduleID := c.Param("id")
	err := h.snapshotScheduleService.DeleteSchedule(c.Request.Context(), scheduleID)
	if err != nil {
		if err.Error() == "schedule not found" {
			c.JSON(http.StatusNotFound, gin.H{"error": "schedule not found"})
			return
		}
		h.logger.Error("Failed to delete snapshot schedule", "error", err, "id", scheduleID)
		c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to delete snapshot schedule: " + err.Error()})
		return
	}
	c.JSON(http.StatusOK, gin.H{"message": "snapshot schedule deleted successfully"})
}
// ToggleSnapshotSchedule enables or disables a snapshot schedule.
// The body must contain {"enabled": true|false}.
func (h *Handler) ToggleSnapshotSchedule(c *gin.Context) {
	id := c.Param("id")
	var req struct {
		// Pointer so binding:"required" accepts an explicit false: with a plain
		// bool, validator treats false as the zero value and {"enabled": false}
		// would be rejected with a 400, making it impossible to disable a schedule.
		Enabled *bool `json:"enabled" binding:"required"`
	}
	if err := c.ShouldBindJSON(&req); err != nil {
		h.logger.Error("Invalid toggle snapshot schedule request", "error", err)
		c.JSON(http.StatusBadRequest, gin.H{"error": "invalid request: " + err.Error()})
		return
	}
	if err := h.snapshotScheduleService.ToggleSchedule(c.Request.Context(), id, *req.Enabled); err != nil {
		if err.Error() == "schedule not found" {
			c.JSON(http.StatusNotFound, gin.H{"error": "schedule not found"})
			return
		}
		h.logger.Error("Failed to toggle snapshot schedule", "error", err, "id", id)
		c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to toggle snapshot schedule: " + err.Error()})
		return
	}
	c.JSON(http.StatusOK, gin.H{"message": "snapshot schedule toggled successfully"})
}
// ListReplicationTasks lists all replication tasks. The optional "direction"
// query parameter filters by "outbound" or "inbound".
func (h *Handler) ListReplicationTasks(c *gin.Context) {
	directionFilter := c.Query("direction")
	result, err := h.replicationService.ListReplicationTasks(c.Request.Context(), directionFilter)
	if err != nil {
		h.logger.Error("Failed to list replication tasks", "error", err)
		c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to list replication tasks: " + err.Error()})
		return
	}
	c.JSON(http.StatusOK, gin.H{"tasks": result})
}
// GetReplicationTask retrieves a replication task by its URL path ID.
// Responds 404 when the task does not exist, 500 on other failures.
func (h *Handler) GetReplicationTask(c *gin.Context) {
	taskID := c.Param("id")
	task, err := h.replicationService.GetReplicationTask(c.Request.Context(), taskID)
	switch {
	case err == nil:
		c.JSON(http.StatusOK, task)
	case err.Error() == "replication task not found":
		// NOTE: not-found is signaled by the service via this exact error string.
		c.JSON(http.StatusNotFound, gin.H{"error": "replication task not found"})
	default:
		h.logger.Error("Failed to get replication task", "error", err, "id", taskID)
		c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to get replication task: " + err.Error()})
	}
}
// CreateReplicationTaskRequest represents a request to create a replication task.
// Which optional fields are required depends on Direction: the service layer
// validates source_dataset/target_host/target_dataset for outbound tasks and
// source_host/source_dataset/local_dataset for inbound tasks.
type CreateReplicationTaskRequest struct {
	Name             string                 `json:"name" binding:"required"`
	Direction        string                 `json:"direction" binding:"required"` // "outbound" or "inbound"
	SourceDataset    *string                `json:"source_dataset"`
	TargetHost       *string                `json:"target_host"`
	TargetPort       *int                   `json:"target_port"` // defaults to 22 in the service layer
	TargetUser       *string                `json:"target_user"` // defaults to "root" in the service layer
	TargetDataset    *string                `json:"target_dataset"`
	TargetSSHKeyPath *string                `json:"target_ssh_key_path"`
	SourceHost       *string                `json:"source_host"`
	SourcePort       *int                   `json:"source_port"` // defaults to 22 in the service layer
	SourceUser       *string                `json:"source_user"` // defaults to "root" in the service layer
	LocalDataset     *string                `json:"local_dataset"`
	ScheduleType     *string                `json:"schedule_type"`
	ScheduleConfig   map[string]interface{} `json:"schedule_config"`
	Compression      string                 `json:"compression"` // defaults to "lz4" in the service layer
	Encryption       bool                   `json:"encryption"`
	Recursive        bool                   `json:"recursive"`
	Incremental      bool                   `json:"incremental"`
	AutoSnapshot     bool                   `json:"auto_snapshot"`
	Enabled          bool                   `json:"enabled"`
}
// CreateReplicationTask creates a new replication task from the JSON body,
// recording the authenticated user (from the "user_id" context key, set by
// auth middleware) as the creator. Responds 400 on a bad body, 500 on a
// service failure, 201 with the created task on success.
func (h *Handler) CreateReplicationTask(c *gin.Context) {
	var req CreateReplicationTaskRequest
	if err := c.ShouldBindJSON(&req); err != nil {
		h.logger.Error("Invalid create replication task request", "error", err)
		c.JSON(http.StatusBadRequest, gin.H{"error": "invalid request: " + err.Error()})
		return
	}
	// Use the comma-ok form: a bare userID.(string) would panic the handler
	// if middleware ever stores a non-string value under "user_id".
	userIDStr := ""
	if userID, exists := c.Get("user_id"); exists {
		if s, ok := userID.(string); ok {
			userIDStr = s
		}
	}
	task, err := h.replicationService.CreateReplicationTask(c.Request.Context(), &CreateReplicationRequest{
		Name:             req.Name,
		Direction:        req.Direction,
		SourceDataset:    req.SourceDataset,
		TargetHost:       req.TargetHost,
		TargetPort:       req.TargetPort,
		TargetUser:       req.TargetUser,
		TargetDataset:    req.TargetDataset,
		TargetSSHKeyPath: req.TargetSSHKeyPath,
		SourceHost:       req.SourceHost,
		SourcePort:       req.SourcePort,
		SourceUser:       req.SourceUser,
		LocalDataset:     req.LocalDataset,
		ScheduleType:     req.ScheduleType,
		ScheduleConfig:   req.ScheduleConfig,
		Compression:      req.Compression,
		Encryption:       req.Encryption,
		Recursive:        req.Recursive,
		Incremental:      req.Incremental,
		AutoSnapshot:     req.AutoSnapshot,
		Enabled:          req.Enabled,
	}, userIDStr)
	if err != nil {
		h.logger.Error("Failed to create replication task", "error", err)
		c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to create replication task: " + err.Error()})
		return
	}
	c.JSON(http.StatusCreated, task)
}
// UpdateReplicationTask updates an existing replication task identified by
// the URL path ID. The full task definition is re-submitted in the body.
func (h *Handler) UpdateReplicationTask(c *gin.Context) {
	taskID := c.Param("id")
	var body CreateReplicationTaskRequest
	if bindErr := c.ShouldBindJSON(&body); bindErr != nil {
		h.logger.Error("Invalid update replication task request", "error", bindErr)
		c.JSON(http.StatusBadRequest, gin.H{"error": "invalid request: " + bindErr.Error()})
		return
	}
	// Translate the API request into the service-layer request type.
	payload := &CreateReplicationRequest{
		Name:             body.Name,
		Direction:        body.Direction,
		SourceDataset:    body.SourceDataset,
		TargetHost:       body.TargetHost,
		TargetPort:       body.TargetPort,
		TargetUser:       body.TargetUser,
		TargetDataset:    body.TargetDataset,
		TargetSSHKeyPath: body.TargetSSHKeyPath,
		SourceHost:       body.SourceHost,
		SourcePort:       body.SourcePort,
		SourceUser:       body.SourceUser,
		LocalDataset:     body.LocalDataset,
		ScheduleType:     body.ScheduleType,
		ScheduleConfig:   body.ScheduleConfig,
		Compression:      body.Compression,
		Encryption:       body.Encryption,
		Recursive:        body.Recursive,
		Incremental:      body.Incremental,
		AutoSnapshot:     body.AutoSnapshot,
		Enabled:          body.Enabled,
	}
	task, err := h.replicationService.UpdateReplicationTask(c.Request.Context(), taskID, payload)
	if err != nil {
		if err.Error() == "replication task not found" {
			c.JSON(http.StatusNotFound, gin.H{"error": "replication task not found"})
			return
		}
		h.logger.Error("Failed to update replication task", "error", err, "id", taskID)
		c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to update replication task: " + err.Error()})
		return
	}
	c.JSON(http.StatusOK, task)
}
// DeleteReplicationTask deletes the replication task named by the URL path ID.
func (h *Handler) DeleteReplicationTask(c *gin.Context) {
	taskID := c.Param("id")
	err := h.replicationService.DeleteReplicationTask(c.Request.Context(), taskID)
	if err != nil {
		if err.Error() == "replication task not found" {
			c.JSON(http.StatusNotFound, gin.H{"error": "replication task not found"})
			return
		}
		h.logger.Error("Failed to delete replication task", "error", err, "id", taskID)
		c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to delete replication task: " + err.Error()})
		return
	}
	c.JSON(http.StatusOK, gin.H{"message": "replication task deleted successfully"})
}

View File

@@ -0,0 +1,553 @@
package storage
import (
	"context"
	"database/sql"
	"encoding/json"
	"errors"
	"fmt"
	"time"

	"github.com/atlasos/calypso/internal/common/database"
	"github.com/atlasos/calypso/internal/common/logger"
	"github.com/google/uuid"
)
// ReplicationService handles ZFS replication task management.
// It persists task definitions in the replication_tasks database table;
// actual transfer execution is handled elsewhere.
type ReplicationService struct {
	db     *database.DB    // database handle used for all task queries
	logger *logger.Logger  // structured logger for service events
}
// NewReplicationService creates a new replication service backed by the
// given database handle and logger.
func NewReplicationService(db *database.DB, log *logger.Logger) *ReplicationService {
	svc := &ReplicationService{db: db, logger: log}
	return svc
}
// ReplicationTask represents a ZFS replication task as stored in the
// replication_tasks table. Pointer fields map to nullable columns; which
// of the source/target fields are populated depends on Direction (see the
// validation in CreateReplicationTask).
type ReplicationTask struct {
	ID        string `json:"id"`
	Name      string `json:"name"`
	Direction string `json:"direction"` // "outbound" or "inbound"
	// Used by both directions (see service validation).
	SourceDataset *string `json:"source_dataset,omitempty"`
	// Outbound destination settings.
	TargetHost       *string `json:"target_host,omitempty"`
	TargetPort       *int    `json:"target_port,omitempty"` // defaulted to 22 on create/update
	TargetUser       *string `json:"target_user,omitempty"` // defaulted to "root" on create/update
	TargetDataset    *string `json:"target_dataset,omitempty"`
	TargetSSHKeyPath *string `json:"target_ssh_key_path,omitempty"`
	// Inbound source settings.
	SourceHost   *string `json:"source_host,omitempty"`
	SourcePort   *int    `json:"source_port,omitempty"` // defaulted to 22 on create/update
	SourceUser   *string `json:"source_user,omitempty"` // defaulted to "root" on create/update
	LocalDataset *string `json:"local_dataset,omitempty"`
	// Scheduling; config is stored as JSON in the schedule_config column.
	ScheduleType   *string                `json:"schedule_type,omitempty"`
	ScheduleConfig map[string]interface{} `json:"schedule_config,omitempty"`
	// Transfer options.
	Compression  string `json:"compression"` // defaulted to "lz4" on create/update
	Encryption   bool   `json:"encryption"`
	Recursive    bool   `json:"recursive"`
	Incremental  bool   `json:"incremental"`
	AutoSnapshot bool   `json:"auto_snapshot"`
	Enabled      bool   `json:"enabled"`
	Status       string `json:"status"` // set to "idle" on create
	// Run bookkeeping, maintained outside this service (not updated here).
	LastRunAt            *time.Time `json:"last_run_at,omitempty"`
	LastRunStatus        *string    `json:"last_run_status,omitempty"`
	LastRunError         *string    `json:"last_run_error,omitempty"`
	NextRunAt            *time.Time `json:"next_run_at,omitempty"`
	LastSnapshotSent     *string    `json:"last_snapshot_sent,omitempty"`
	LastSnapshotReceived *string    `json:"last_snapshot_received,omitempty"`
	TotalRuns            int        `json:"total_runs"`
	SuccessfulRuns       int        `json:"successful_runs"`
	FailedRuns           int        `json:"failed_runs"`
	BytesSent            int64      `json:"bytes_sent"`
	BytesReceived        int64      `json:"bytes_received"`
	CreatedBy            string     `json:"created_by,omitempty"`
	CreatedAt            time.Time  `json:"created_at"`
	UpdatedAt            time.Time  `json:"updated_at"`
}
// CreateReplicationRequest represents a request to create (or fully update)
// a replication task. The service validates direction-specific required
// fields and fills in defaults (compression "lz4", ports 22, users "root")
// before persisting — note that defaulting mutates the request in place.
type CreateReplicationRequest struct {
	Name             string                 `json:"name" binding:"required"`
	Direction        string                 `json:"direction" binding:"required"` // "outbound" or "inbound"
	SourceDataset    *string                `json:"source_dataset"`               // required for both directions
	TargetHost       *string                `json:"target_host"`                  // required for outbound
	TargetPort       *int                   `json:"target_port"`                  // defaulted to 22
	TargetUser       *string                `json:"target_user"`                  // defaulted to "root"
	TargetDataset    *string                `json:"target_dataset"`               // required for outbound
	TargetSSHKeyPath *string                `json:"target_ssh_key_path"`
	SourceHost       *string                `json:"source_host"` // required for inbound
	SourcePort       *int                   `json:"source_port"` // defaulted to 22
	SourceUser       *string                `json:"source_user"` // defaulted to "root"
	LocalDataset     *string                `json:"local_dataset"` // required for inbound
	ScheduleType     *string                `json:"schedule_type"`
	ScheduleConfig   map[string]interface{} `json:"schedule_config"` // stored as JSON
	Compression      string                 `json:"compression"`     // defaulted to "lz4" when empty
	Encryption       bool                   `json:"encryption"`
	Recursive        bool                   `json:"recursive"`
	Incremental      bool                   `json:"incremental"`
	AutoSnapshot     bool                   `json:"auto_snapshot"`
	Enabled          bool                   `json:"enabled"`
}
// ListReplicationTasks lists all replication tasks, optionally filtered by
// direction ("outbound"/"inbound"; empty string means no filter).
// Rows that fail to scan are logged and skipped so one bad row does not
// hide the rest of the list.
func (s *ReplicationService) ListReplicationTasks(ctx context.Context, directionFilter string) ([]*ReplicationTask, error) {
	// Single shared column list: the original duplicated this 30-line SELECT
	// in both branches, which invites drift against scanReplicationTask's
	// scan order. Only the WHERE/ORDER BY tail differs per branch.
	query := `
		SELECT id, name, direction, source_dataset, target_host, target_port, target_user, target_dataset,
		       target_ssh_key_path, source_host, source_port, source_user, local_dataset,
		       schedule_type, schedule_config, compression, encryption, recursive, incremental,
		       auto_snapshot, enabled, status, last_run_at, last_run_status, last_run_error,
		       next_run_at, last_snapshot_sent, last_snapshot_received,
		       total_runs, successful_runs, failed_runs, bytes_sent, bytes_received,
		       created_by, created_at, updated_at
		FROM replication_tasks
	`
	var args []interface{}
	if directionFilter != "" {
		query += ` WHERE direction = $1 ORDER BY created_at DESC`
		args = append(args, directionFilter)
	} else {
		query += ` ORDER BY direction, created_at DESC`
	}
	rows, err := s.db.QueryContext(ctx, query, args...)
	if err != nil {
		return nil, fmt.Errorf("failed to query replication tasks: %w", err)
	}
	defer rows.Close()
	var tasks []*ReplicationTask
	for rows.Next() {
		task, scanErr := s.scanReplicationTask(rows)
		if scanErr != nil {
			// Deliberate best-effort: log and keep listing the remaining rows.
			s.logger.Error("Failed to scan replication task", "error", scanErr)
			continue
		}
		tasks = append(tasks, task)
	}
	return tasks, rows.Err()
}
// GetReplicationTask retrieves a replication task by ID.
// Returns the sentinel error text "replication task not found" (matched by
// the HTTP layer) when no row exists.
func (s *ReplicationService) GetReplicationTask(ctx context.Context, id string) (*ReplicationTask, error) {
	query := `
		SELECT id, name, direction, source_dataset, target_host, target_port, target_user, target_dataset,
		       target_ssh_key_path, source_host, source_port, source_user, local_dataset,
		       schedule_type, schedule_config, compression, encryption, recursive, incremental,
		       auto_snapshot, enabled, status, last_run_at, last_run_status, last_run_error,
		       next_run_at, last_snapshot_sent, last_snapshot_received,
		       total_runs, successful_runs, failed_runs, bytes_sent, bytes_received,
		       created_by, created_at, updated_at
		FROM replication_tasks
		WHERE id = $1
	`
	row := s.db.QueryRowContext(ctx, query, id)
	task, err := s.scanReplicationTaskRow(row)
	if err != nil {
		// errors.Is instead of ==: still matches if the driver or a helper
		// ever wraps sql.ErrNoRows.
		if errors.Is(err, sql.ErrNoRows) {
			return nil, fmt.Errorf("replication task not found")
		}
		return nil, fmt.Errorf("failed to get replication task: %w", err)
	}
	return task, nil
}
// CreateReplicationTask creates a new replication task.
// It validates the direction-specific required fields, fills in defaults
// (compression "lz4", SSH port 22, user "root") — mutating req in place —
// then inserts the row with status "idle" and returns the task as scanned
// back from the INSERT ... RETURNING result.
func (s *ReplicationService) CreateReplicationTask(ctx context.Context, req *CreateReplicationRequest, createdBy string) (*ReplicationTask, error) {
	id := uuid.New().String()
	// Validate direction-specific fields: each direction has its own set of
	// mandatory nullable columns.
	if req.Direction == "outbound" {
		if req.SourceDataset == nil || req.TargetHost == nil || req.TargetDataset == nil {
			return nil, fmt.Errorf("outbound replication requires source_dataset, target_host, and target_dataset")
		}
	} else if req.Direction == "inbound" {
		if req.SourceHost == nil || req.SourceDataset == nil || req.LocalDataset == nil {
			return nil, fmt.Errorf("inbound replication requires source_host, source_dataset, and local_dataset")
		}
	} else {
		return nil, fmt.Errorf("invalid direction: must be 'outbound' or 'inbound'")
	}
	// Set defaults for optional connection settings (NOTE: mutates req).
	if req.Compression == "" {
		req.Compression = "lz4"
	}
	if req.TargetPort == nil {
		defaultPort := 22
		req.TargetPort = &defaultPort
	}
	if req.SourcePort == nil {
		defaultPort := 22
		req.SourcePort = &defaultPort
	}
	if req.TargetUser == nil {
		defaultUser := "root"
		req.TargetUser = &defaultUser
	}
	if req.SourceUser == nil {
		defaultUser := "root"
		req.SourceUser = &defaultUser
	}
	// Marshal schedule config to JSON; stays SQL NULL when no config given.
	var scheduleConfigJSON sql.NullString
	if req.ScheduleConfig != nil {
		configJSON, err := json.Marshal(req.ScheduleConfig)
		if err != nil {
			return nil, fmt.Errorf("failed to marshal schedule config: %w", err)
		}
		scheduleConfigJSON = sql.NullString{String: string(configJSON), Valid: true}
	}
	// RETURNING echoes the full row so the caller gets DB-generated
	// timestamps and default columns without a second query.
	query := `
		INSERT INTO replication_tasks (
			id, name, direction, source_dataset, target_host, target_port, target_user, target_dataset,
			target_ssh_key_path, source_host, source_port, source_user, local_dataset,
			schedule_type, schedule_config, compression, encryption, recursive, incremental,
			auto_snapshot, enabled, status, created_by, created_at, updated_at
		) VALUES (
			$1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, NOW(), NOW()
		)
		RETURNING id, name, direction, source_dataset, target_host, target_port, target_user, target_dataset,
		          target_ssh_key_path, source_host, source_port, source_user, local_dataset,
		          schedule_type, schedule_config, compression, encryption, recursive, incremental,
		          auto_snapshot, enabled, status, last_run_at, last_run_status, last_run_error,
		          next_run_at, last_snapshot_sent, last_snapshot_received,
		          total_runs, successful_runs, failed_runs, bytes_sent, bytes_received,
		          created_by, created_at, updated_at
	`
	// Nullable schedule_type column: only valid when the caller supplied one.
	var scheduleTypeStr sql.NullString
	if req.ScheduleType != nil {
		scheduleTypeStr = sql.NullString{String: *req.ScheduleType, Valid: true}
	}
	// Argument order must match the $1..$23 placeholders above exactly.
	row := s.db.QueryRowContext(ctx, query,
		id, req.Name, req.Direction,
		req.SourceDataset, req.TargetHost, req.TargetPort, req.TargetUser, req.TargetDataset,
		req.TargetSSHKeyPath, req.SourceHost, req.SourcePort, req.SourceUser, req.LocalDataset,
		scheduleTypeStr, scheduleConfigJSON, req.Compression, req.Encryption, req.Recursive, req.Incremental,
		req.AutoSnapshot, req.Enabled, "idle", createdBy,
	)
	task, err := s.scanReplicationTaskRow(row)
	if err != nil {
		return nil, fmt.Errorf("failed to create replication task: %w", err)
	}
	s.logger.Info("Replication task created", "id", id, "name", req.Name, "direction", req.Direction)
	return task, nil
}
// UpdateReplicationTask updates an existing replication task with a full new
// definition. Validation and defaulting mirror CreateReplicationTask
// (defaults mutate req in place). Run-statistics columns are not touched.
// Returns the sentinel error text "replication task not found" (matched by
// the HTTP layer) when no row has the given id.
func (s *ReplicationService) UpdateReplicationTask(ctx context.Context, id string, req *CreateReplicationRequest) (*ReplicationTask, error) {
	// Validate direction-specific fields (same rules as on create).
	if req.Direction == "outbound" {
		if req.SourceDataset == nil || req.TargetHost == nil || req.TargetDataset == nil {
			return nil, fmt.Errorf("outbound replication requires source_dataset, target_host, and target_dataset")
		}
	} else if req.Direction == "inbound" {
		if req.SourceHost == nil || req.SourceDataset == nil || req.LocalDataset == nil {
			return nil, fmt.Errorf("inbound replication requires source_host, source_dataset, and local_dataset")
		}
	} else {
		return nil, fmt.Errorf("invalid direction: must be 'outbound' or 'inbound'")
	}
	// Set defaults for optional connection settings (NOTE: mutates req).
	if req.Compression == "" {
		req.Compression = "lz4"
	}
	if req.TargetPort == nil {
		defaultPort := 22
		req.TargetPort = &defaultPort
	}
	if req.SourcePort == nil {
		defaultPort := 22
		req.SourcePort = &defaultPort
	}
	if req.TargetUser == nil {
		defaultUser := "root"
		req.TargetUser = &defaultUser
	}
	if req.SourceUser == nil {
		defaultUser := "root"
		req.SourceUser = &defaultUser
	}
	// Marshal schedule config to JSON; stays SQL NULL when no config given.
	var scheduleConfigJSON sql.NullString
	if req.ScheduleConfig != nil {
		configJSON, err := json.Marshal(req.ScheduleConfig)
		if err != nil {
			return nil, fmt.Errorf("failed to marshal schedule config: %w", err)
		}
		scheduleConfigJSON = sql.NullString{String: string(configJSON), Valid: true}
	}
	// Nullable schedule_type column: only valid when the caller supplied one.
	var scheduleTypeStr sql.NullString
	if req.ScheduleType != nil {
		scheduleTypeStr = sql.NullString{String: *req.ScheduleType, Valid: true}
	}
	// RETURNING echoes the updated row; scanning it with ErrNoRows doubles
	// as the existence check for the id.
	query := `
		UPDATE replication_tasks
		SET name = $1, direction = $2, source_dataset = $3, target_host = $4, target_port = $5,
		    target_user = $6, target_dataset = $7, target_ssh_key_path = $8, source_host = $9,
		    source_port = $10, source_user = $11, local_dataset = $12, schedule_type = $13,
		    schedule_config = $14, compression = $15, encryption = $16, recursive = $17,
		    incremental = $18, auto_snapshot = $19, enabled = $20, updated_at = NOW()
		WHERE id = $21
		RETURNING id, name, direction, source_dataset, target_host, target_port, target_user, target_dataset,
		          target_ssh_key_path, source_host, source_port, source_user, local_dataset,
		          schedule_type, schedule_config, compression, encryption, recursive, incremental,
		          auto_snapshot, enabled, status, last_run_at, last_run_status, last_run_error,
		          next_run_at, last_snapshot_sent, last_snapshot_received,
		          total_runs, successful_runs, failed_runs, bytes_sent, bytes_received,
		          created_by, created_at, updated_at
	`
	// Argument order must match the $1..$21 placeholders above exactly.
	row := s.db.QueryRowContext(ctx, query,
		req.Name, req.Direction,
		req.SourceDataset, req.TargetHost, req.TargetPort, req.TargetUser, req.TargetDataset,
		req.TargetSSHKeyPath, req.SourceHost, req.SourcePort, req.SourceUser, req.LocalDataset,
		scheduleTypeStr, scheduleConfigJSON, req.Compression, req.Encryption, req.Recursive, req.Incremental,
		req.AutoSnapshot, req.Enabled, id,
	)
	task, err := s.scanReplicationTaskRow(row)
	if err != nil {
		if err == sql.ErrNoRows {
			return nil, fmt.Errorf("replication task not found")
		}
		return nil, fmt.Errorf("failed to update replication task: %w", err)
	}
	s.logger.Info("Replication task updated", "id", id)
	return task, nil
}
// DeleteReplicationTask deletes a replication task by ID.
// Returns the sentinel error text "replication task not found" (matched by
// the HTTP layer) when no row was deleted.
func (s *ReplicationService) DeleteReplicationTask(ctx context.Context, id string) error {
	result, err := s.db.ExecContext(ctx, `DELETE FROM replication_tasks WHERE id = $1`, id)
	if err != nil {
		return fmt.Errorf("failed to delete replication task: %w", err)
	}
	// Distinguish "nothing matched" from a successful delete.
	affected, raErr := result.RowsAffected()
	switch {
	case raErr != nil:
		return fmt.Errorf("failed to get rows affected: %w", raErr)
	case affected == 0:
		return fmt.Errorf("replication task not found")
	}
	s.logger.Info("Replication task deleted", "id", id)
	return nil
}
// scanReplicationTaskRow scans a single replication task row into a
// ReplicationTask, converting nullable SQL columns into pointer fields
// (nil when the column is NULL) and unmarshalling the schedule_config JSON.
// The Scan argument order below must match the SELECT column order used by
// every query in this service. Returns the underlying scan error unwrapped
// so callers can compare against sql.ErrNoRows.
func (s *ReplicationService) scanReplicationTaskRow(row *sql.Row) (*ReplicationTask, error) {
	var task ReplicationTask
	// Staging variables for nullable columns; copied into pointer fields below.
	var sourceDataset, targetHost, targetUser, targetDataset, targetSSHKeyPath sql.NullString
	var sourceHost, sourceUser, localDataset sql.NullString
	var targetPort, sourcePort sql.NullInt64
	var scheduleType, scheduleConfigJSON sql.NullString
	var lastRunAt, nextRunAt sql.NullTime
	var lastRunStatus, lastRunError, lastSnapshotSent, lastSnapshotReceived sql.NullString
	var createdBy sql.NullString
	err := row.Scan(
		&task.ID, &task.Name, &task.Direction,
		&sourceDataset, &targetHost, &targetPort, &targetUser, &targetDataset,
		&targetSSHKeyPath, &sourceHost, &sourcePort, &sourceUser, &localDataset,
		&scheduleType, &scheduleConfigJSON, &task.Compression, &task.Encryption,
		&task.Recursive, &task.Incremental, &task.AutoSnapshot, &task.Enabled, &task.Status,
		&lastRunAt, &lastRunStatus, &lastRunError, &nextRunAt,
		&lastSnapshotSent, &lastSnapshotReceived,
		&task.TotalRuns, &task.SuccessfulRuns, &task.FailedRuns,
		&task.BytesSent, &task.BytesReceived,
		&createdBy, &task.CreatedAt, &task.UpdatedAt,
	)
	if err != nil {
		return nil, err
	}
	// Handle nullable fields: only set pointers for non-NULL columns.
	if sourceDataset.Valid {
		task.SourceDataset = &sourceDataset.String
	}
	if targetHost.Valid {
		task.TargetHost = &targetHost.String
	}
	if targetPort.Valid {
		port := int(targetPort.Int64)
		task.TargetPort = &port
	}
	if targetUser.Valid {
		task.TargetUser = &targetUser.String
	}
	if targetDataset.Valid {
		task.TargetDataset = &targetDataset.String
	}
	if targetSSHKeyPath.Valid {
		task.TargetSSHKeyPath = &targetSSHKeyPath.String
	}
	if sourceHost.Valid {
		task.SourceHost = &sourceHost.String
	}
	if sourcePort.Valid {
		port := int(sourcePort.Int64)
		task.SourcePort = &port
	}
	if sourceUser.Valid {
		task.SourceUser = &sourceUser.String
	}
	if localDataset.Valid {
		task.LocalDataset = &localDataset.String
	}
	if scheduleType.Valid {
		task.ScheduleType = &scheduleType.String
	}
	// schedule_config is stored as a JSON string; decode into the map field.
	if scheduleConfigJSON.Valid {
		if err := json.Unmarshal([]byte(scheduleConfigJSON.String), &task.ScheduleConfig); err != nil {
			return nil, fmt.Errorf("failed to unmarshal schedule config: %w", err)
		}
	}
	if lastRunAt.Valid {
		task.LastRunAt = &lastRunAt.Time
	}
	if lastRunStatus.Valid {
		task.LastRunStatus = &lastRunStatus.String
	}
	if lastRunError.Valid {
		task.LastRunError = &lastRunError.String
	}
	if nextRunAt.Valid {
		task.NextRunAt = &nextRunAt.Time
	}
	if lastSnapshotSent.Valid {
		task.LastSnapshotSent = &lastSnapshotSent.String
	}
	if lastSnapshotReceived.Valid {
		task.LastSnapshotReceived = &lastSnapshotReceived.String
	}
	if createdBy.Valid {
		task.CreatedBy = createdBy.String
	}
	return &task, nil
}
// scanReplicationTask scans a replication task from rows
func (s *ReplicationService) scanReplicationTask(rows *sql.Rows) (*ReplicationTask, error) {
	var task ReplicationTask
	var (
		srcDataset, tgtHost, tgtUser, tgtDataset, tgtKeyPath sql.NullString
		srcHost, srcUser, localDS                            sql.NullString
		tgtPort, srcPort                                     sql.NullInt64
		schedType, schedConfig                               sql.NullString
		lastRun, nextRun                                     sql.NullTime
		lastStatus, lastErr, snapSent, snapRecv              sql.NullString
		creator                                              sql.NullString
	)
	// Column order here must match the SELECT used by the caller.
	if err := rows.Scan(
		&task.ID, &task.Name, &task.Direction,
		&srcDataset, &tgtHost, &tgtPort, &tgtUser, &tgtDataset,
		&tgtKeyPath, &srcHost, &srcPort, &srcUser, &localDS,
		&schedType, &schedConfig, &task.Compression, &task.Encryption,
		&task.Recursive, &task.Incremental, &task.AutoSnapshot, &task.Enabled, &task.Status,
		&lastRun, &lastStatus, &lastErr, &nextRun,
		&snapSent, &snapRecv,
		&task.TotalRuns, &task.SuccessfulRuns, &task.FailedRuns,
		&task.BytesSent, &task.BytesReceived,
		&creator, &task.CreatedAt, &task.UpdatedAt,
	); err != nil {
		return nil, err
	}
	// Helpers converting nullable columns into optional (pointer) fields;
	// a NULL column leaves the corresponding field nil.
	strPtr := func(ns sql.NullString) *string {
		if !ns.Valid {
			return nil
		}
		v := ns.String
		return &v
	}
	intPtr := func(ni sql.NullInt64) *int {
		if !ni.Valid {
			return nil
		}
		v := int(ni.Int64)
		return &v
	}
	task.SourceDataset = strPtr(srcDataset)
	task.TargetHost = strPtr(tgtHost)
	task.TargetPort = intPtr(tgtPort)
	task.TargetUser = strPtr(tgtUser)
	task.TargetDataset = strPtr(tgtDataset)
	task.TargetSSHKeyPath = strPtr(tgtKeyPath)
	task.SourceHost = strPtr(srcHost)
	task.SourcePort = intPtr(srcPort)
	task.SourceUser = strPtr(srcUser)
	task.LocalDataset = strPtr(localDS)
	task.ScheduleType = strPtr(schedType)
	// Schedule config is stored as a JSON document
	if schedConfig.Valid {
		if err := json.Unmarshal([]byte(schedConfig.String), &task.ScheduleConfig); err != nil {
			return nil, fmt.Errorf("failed to unmarshal schedule config: %w", err)
		}
	}
	if lastRun.Valid {
		task.LastRunAt = &lastRun.Time
	}
	if nextRun.Valid {
		task.NextRunAt = &nextRun.Time
	}
	task.LastRunStatus = strPtr(lastStatus)
	task.LastRunError = strPtr(lastErr)
	task.LastSnapshotSent = strPtr(snapSent)
	task.LastSnapshotReceived = strPtr(snapRecv)
	// CreatedBy is a plain string; NULL maps to the empty string
	if creator.Valid {
		task.CreatedBy = creator.String
	}
	return &task, nil
}

View File

@@ -0,0 +1,259 @@
package storage
import (
"context"
"fmt"
"strings"
"time"
"github.com/atlasos/calypso/internal/common/database"
"github.com/atlasos/calypso/internal/common/logger"
)
// SnapshotService handles ZFS snapshot operations (list, create, delete,
// rollback, clone) by shelling out to the zfs CLI.
type SnapshotService struct {
	db     *database.DB   // NOTE(review): not used by the methods visible in this file — confirm before removing
	logger *logger.Logger // structured logger for snapshot operations
}
// NewSnapshotService creates a new snapshot service
func NewSnapshotService(db *database.DB, log *logger.Logger) *SnapshotService {
	return &SnapshotService{db: db, logger: log}
}
// Snapshot represents a ZFS snapshot as reported by `zfs list -t snapshot`.
type Snapshot struct {
	ID           string    `json:"id"`            // Same value as Name; the full snapshot name is the unique key
	Name         string    `json:"name"`          // Full snapshot name (e.g., "pool/dataset@snapshot-name")
	Dataset      string    `json:"dataset"`       // Dataset name (e.g., "pool/dataset")
	SnapshotName string    `json:"snapshot_name"` // Snapshot name only (e.g., "snapshot-name")
	Created      time.Time `json:"created"`       // Creation time reported by ZFS
	Referenced   int64     `json:"referenced"`    // Size in bytes
	Used         int64     `json:"used"`          // Used space in bytes
	IsLatest     bool      `json:"is_latest"`     // Whether this is the latest snapshot for the dataset
}
// ListSnapshots lists all snapshots, optionally filtered by dataset.
//
// Output is requested with -H (no header, tab-separated) AND -p (machine
// parseable: creation as Unix seconds, sizes as exact byte counts).
// Without -p the creation column is a human-readable date that contains
// spaces, so whitespace splitting misaligned every field after the name;
// splitting on tabs with -p makes parsing exact.
func (s *SnapshotService) ListSnapshots(ctx context.Context, datasetFilter string) ([]*Snapshot, error) {
	// Build zfs list command
	args := []string{"list", "-t", "snapshot", "-H", "-p", "-o", "name,creation,referenced,used"}
	if datasetFilter != "" {
		// List snapshots for specific dataset
		args = append(args, datasetFilter)
	} else {
		// List all snapshots
		args = append(args, "-r")
	}
	cmd := zfsCommand(ctx, args...)
	output, err := cmd.Output()
	if err != nil {
		return nil, fmt.Errorf("failed to list snapshots: %w", err)
	}
	lines := strings.Split(strings.TrimSpace(string(output)), "\n")
	var snapshots []*Snapshot
	// Track latest snapshot per dataset
	latestSnapshots := make(map[string]*Snapshot)
	for _, line := range lines {
		if line == "" {
			continue
		}
		// -H output is tab-separated; never use strings.Fields here, the
		// creation column may contain spaces in non -p output.
		fields := strings.Split(line, "\t")
		if len(fields) < 4 {
			continue
		}
		fullName := fields[0]
		creationStr := fields[1]
		referencedStr := fields[2]
		usedStr := fields[3]
		// Parse snapshot name (format: pool/dataset@snapshot-name)
		parts := strings.Split(fullName, "@")
		if len(parts) != 2 {
			continue
		}
		dataset := parts[0]
		snapshotName := parts[1]
		// With -p the creation column is a Unix timestamp; fall back to the
		// current time if it cannot be parsed
		creationTime := time.Now()
		if timestamp, err := parseZFSUnixTimestamp(creationStr); err == nil {
			creationTime = timestamp
		}
		// With -p sizes are exact byte counts; unparseable values stay 0
		var referenced, used int64
		fmt.Sscanf(referencedStr, "%d", &referenced)
		fmt.Sscanf(usedStr, "%d", &used)
		snapshot := &Snapshot{
			ID:           fullName,
			Name:         fullName,
			Dataset:      dataset,
			SnapshotName: snapshotName,
			Created:      creationTime,
			Referenced:   referenced,
			Used:         used,
			IsLatest:     false,
		}
		snapshots = append(snapshots, snapshot)
		// Track latest snapshot per dataset
		if latest, exists := latestSnapshots[dataset]; !exists || snapshot.Created.After(latest.Created) {
			latestSnapshots[dataset] = snapshot
		}
	}
	// Mark latest snapshots
	for _, snapshot := range snapshots {
		if latest, exists := latestSnapshots[snapshot.Dataset]; exists && latest.ID == snapshot.ID {
			snapshot.IsLatest = true
		}
	}
	return snapshots, nil
}
// CreateSnapshot creates a new snapshot
func (s *SnapshotService) CreateSnapshot(ctx context.Context, dataset, snapshotName string, recursive bool) error {
	// Ensure the target dataset exists before attempting the snapshot
	if err := zfsCommand(ctx, "list", "-H", "-o", "name", dataset).Run(); err != nil {
		return fmt.Errorf("dataset %s does not exist: %w", dataset, err)
	}
	// Full name has the form "pool/dataset@snapshot-name"
	full := fmt.Sprintf("%s@%s", dataset, snapshotName)
	createArgs := []string{"snapshot"}
	if recursive {
		createArgs = append(createArgs, "-r")
	}
	createArgs = append(createArgs, full)
	if out, err := zfsCommand(ctx, createArgs...).CombinedOutput(); err != nil {
		return fmt.Errorf("failed to create snapshot: %s: %w", string(out), err)
	}
	s.logger.Info("Snapshot created", "snapshot", full, "dataset", dataset, "recursive", recursive)
	return nil
}
// DeleteSnapshot deletes a snapshot
func (s *SnapshotService) DeleteSnapshot(ctx context.Context, snapshotName string, recursive bool) error {
	destroyArgs := []string{"destroy"}
	if recursive {
		destroyArgs = append(destroyArgs, "-r")
	}
	destroyArgs = append(destroyArgs, snapshotName)
	if out, err := zfsCommand(ctx, destroyArgs...).CombinedOutput(); err != nil {
		return fmt.Errorf("failed to delete snapshot: %s: %w", string(out), err)
	}
	s.logger.Info("Snapshot deleted", "snapshot", snapshotName, "recursive", recursive)
	return nil
}
// RollbackSnapshot rolls back a dataset to a snapshot.
// The -r flag is always passed; -f is added in addition when force is set.
func (s *SnapshotService) RollbackSnapshot(ctx context.Context, snapshotName string, force bool) error {
	rollbackArgs := []string{"rollback", "-r"}
	if force {
		rollbackArgs = append(rollbackArgs, "-f")
	}
	rollbackArgs = append(rollbackArgs, snapshotName)
	if out, err := zfsCommand(ctx, rollbackArgs...).CombinedOutput(); err != nil {
		return fmt.Errorf("failed to rollback snapshot: %s: %w", string(out), err)
	}
	s.logger.Info("Snapshot rollback completed", "snapshot", snapshotName, "force", force)
	return nil
}
// CloneSnapshot clones a snapshot to a new dataset
func (s *SnapshotService) CloneSnapshot(ctx context.Context, snapshotName, cloneName string) error {
	if out, err := zfsCommand(ctx, "clone", snapshotName, cloneName).CombinedOutput(); err != nil {
		return fmt.Errorf("failed to clone snapshot: %s: %w", string(out), err)
	}
	s.logger.Info("Snapshot cloned", "snapshot", snapshotName, "clone", cloneName)
	return nil
}
// parseZFSUnixTimestamp parses ZFS Unix timestamp string
func parseZFSUnixTimestamp(timestampStr string) (time.Time, error) {
// ZFS returns Unix timestamp as string
timestamp, err := time.Parse("20060102150405", timestampStr)
if err != nil {
// Try parsing as Unix timestamp integer
var unixTime int64
if _, err := fmt.Sscanf(timestampStr, "%d", &unixTime); err == nil {
return time.Unix(unixTime, 0), nil
}
return time.Time{}, err
}
return timestamp, nil
}
// parseSnapshotSize parses a ZFS size string (e.g., "1.2M", "4.5G", "850K").
// Note: a similar helper is also defined in zfs.go; this one is needed here
// for snapshot parsing.
//
// With `zfs list -p` the size is a plain byte count ("12345"); with the
// default human-readable output it carries a unit suffix. Both forms are
// accepted. "-" (no value), empty, and unparseable input yield 0.
func parseSnapshotSize(sizeStr string) int64 {
	// Remove any whitespace
	sizeStr = strings.TrimSpace(sizeStr)
	if sizeStr == "-" || sizeStr == "" {
		return 0
	}
	// Parse size with optional unit. Sscanf reports an error when the unit
	// is absent even though the number itself parsed fine, so the item
	// count — not the error — decides success. (The old err!=nil check made
	// plain byte counts from `zfs list -p` come back as 0.)
	var size float64
	var unit string
	n, _ := fmt.Sscanf(sizeStr, "%f%s", &size, &unit)
	if n == 0 {
		return 0
	}
	// Convert to bytes
	switch strings.ToUpper(unit) {
	case "K", "KB":
		return int64(size * 1024)
	case "M", "MB":
		return int64(size * 1024 * 1024)
	case "G", "GB":
		return int64(size * 1024 * 1024 * 1024)
	case "T", "TB":
		return int64(size * 1024 * 1024 * 1024 * 1024)
	default:
		// No unit (or an unknown one): treat the value as bytes
		return int64(size)
	}
}

View File

@@ -0,0 +1,606 @@
package storage
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"sort"
"strings"
"time"
"github.com/atlasos/calypso/internal/common/database"
"github.com/atlasos/calypso/internal/common/logger"
"github.com/google/uuid"
)
// SnapshotScheduleService handles snapshot schedule management: CRUD over
// the snapshot_schedules table plus due-schedule execution and retention.
type SnapshotScheduleService struct {
	db              *database.DB     // backing database for schedule persistence
	logger          *logger.Logger   // structured logger
	snapshotService *SnapshotService // used to create/delete the actual ZFS snapshots
}
// NewSnapshotScheduleService creates a new snapshot schedule service
func NewSnapshotScheduleService(db *database.DB, log *logger.Logger, snapshotService *SnapshotService) *SnapshotScheduleService {
	return &SnapshotScheduleService{db: db, logger: log, snapshotService: snapshotService}
}
// SnapshotSchedule represents a scheduled snapshot task as stored in the
// snapshot_schedules table.
type SnapshotSchedule struct {
	ID                   string                 `json:"id"`
	Name                 string                 `json:"name"`
	Dataset              string                 `json:"dataset"`                // ZFS dataset to snapshot
	SnapshotNameTemplate string                 `json:"snapshot_name_template"` // may use %Y %m %d %H %M %S placeholders
	ScheduleType         string                 `json:"schedule_type"`          // hourly, daily, weekly, monthly, cron
	ScheduleConfig       map[string]interface{} `json:"schedule_config"`        // type-specific settings (e.g. "time", "day", "cron")
	Recursive            bool                   `json:"recursive"`              // snapshot child datasets too (zfs -r)
	Enabled              bool                   `json:"enabled"`
	RetentionCount       *int                   `json:"retention_count,omitempty"` // keep at most N matching snapshots
	RetentionDays        *int                   `json:"retention_days,omitempty"`  // delete matching snapshots older than N days
	LastRunAt            *time.Time             `json:"last_run_at,omitempty"`
	NextRunAt            *time.Time             `json:"next_run_at,omitempty"`
	CreatedBy            string                 `json:"created_by,omitempty"`
	CreatedAt            time.Time              `json:"created_at"`
	UpdatedAt            time.Time              `json:"updated_at"`
}
// CreateScheduleRequest represents a request to create a snapshot schedule.
// The same payload is reused by UpdateSchedule, where its fields overwrite
// the existing row.
type CreateScheduleRequest struct {
	Name                 string                 `json:"name" binding:"required"`
	Dataset              string                 `json:"dataset" binding:"required"`
	SnapshotNameTemplate string                 `json:"snapshot_name_template" binding:"required"`
	ScheduleType         string                 `json:"schedule_type" binding:"required"` // hourly, daily, weekly, monthly, cron
	ScheduleConfig       map[string]interface{} `json:"schedule_config" binding:"required"`
	Recursive            bool                   `json:"recursive"`
	RetentionCount       *int                   `json:"retention_count"` // optional: keep at most N matching snapshots
	RetentionDays        *int                   `json:"retention_days"`  // optional: delete matching snapshots older than N days
}
// ListSchedules lists all snapshot schedules, newest first.
func (s *SnapshotScheduleService) ListSchedules(ctx context.Context) ([]*SnapshotSchedule, error) {
	query := `
		SELECT id, name, dataset, snapshot_name_template, schedule_type, schedule_config,
			recursive, enabled, retention_count, retention_days,
			last_run_at, next_run_at, created_by, created_at, updated_at
		FROM snapshot_schedules
		ORDER BY created_at DESC
	`
	rows, err := s.db.QueryContext(ctx, query)
	if err != nil {
		return nil, fmt.Errorf("failed to query schedules: %w", err)
	}
	defer rows.Close()
	var schedules []*SnapshotSchedule
	for rows.Next() {
		sched, scanErr := s.scanSchedule(rows)
		if scanErr != nil {
			// Skip unreadable rows rather than failing the whole listing
			s.logger.Error("Failed to scan schedule", "error", scanErr)
			continue
		}
		schedules = append(schedules, sched)
	}
	return schedules, rows.Err()
}
// GetSchedule retrieves a schedule by ID.
func (s *SnapshotScheduleService) GetSchedule(ctx context.Context, id string) (*SnapshotSchedule, error) {
	query := `
		SELECT id, name, dataset, snapshot_name_template, schedule_type, schedule_config,
			recursive, enabled, retention_count, retention_days,
			last_run_at, next_run_at, created_by, created_at, updated_at
		FROM snapshot_schedules
		WHERE id = $1
	`
	schedule, err := s.scanScheduleRow(s.db.QueryRowContext(ctx, query, id))
	if err != nil {
		// sql.ErrNoRows means the id does not exist
		if err == sql.ErrNoRows {
			return nil, fmt.Errorf("schedule not found")
		}
		return nil, fmt.Errorf("failed to get schedule: %w", err)
	}
	return schedule, nil
}
// CreateSchedule creates a new snapshot schedule.
// The schedule is created enabled, with its first run time computed from
// the schedule type/config, and the row as persisted (including DB-side
// NOW() timestamps) is returned via INSERT ... RETURNING.
func (s *SnapshotScheduleService) CreateSchedule(ctx context.Context, req *CreateScheduleRequest, createdBy string) (*SnapshotSchedule, error) {
	id := uuid.New().String()
	// Calculate next run time
	nextRunAt, err := s.calculateNextRun(req.ScheduleType, req.ScheduleConfig)
	if err != nil {
		return nil, fmt.Errorf("failed to calculate next run time: %w", err)
	}
	// Marshal schedule config to JSON for the schedule_config column
	configJSON, err := json.Marshal(req.ScheduleConfig)
	if err != nil {
		return nil, fmt.Errorf("failed to marshal schedule config: %w", err)
	}
	query := `
		INSERT INTO snapshot_schedules (
			id, name, dataset, snapshot_name_template, schedule_type, schedule_config,
			recursive, enabled, retention_count, retention_days,
			next_run_at, created_by, created_at, updated_at
		) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, NOW(), NOW())
		RETURNING id, name, dataset, snapshot_name_template, schedule_type, schedule_config,
			recursive, enabled, retention_count, retention_days,
			last_run_at, next_run_at, created_by, created_at, updated_at
	`
	// Argument order must match the $n placeholders above; enabled ($8) is
	// hardcoded to true for new schedules.
	row := s.db.QueryRowContext(ctx, query,
		id, req.Name, req.Dataset, req.SnapshotNameTemplate, req.ScheduleType,
		string(configJSON), req.Recursive, true, req.RetentionCount, req.RetentionDays,
		nextRunAt, createdBy,
	)
	schedule, err := s.scanScheduleRow(row)
	if err != nil {
		return nil, fmt.Errorf("failed to create schedule: %w", err)
	}
	s.logger.Info("Snapshot schedule created", "id", id, "name", req.Name, "dataset", req.Dataset)
	return schedule, nil
}
// UpdateSchedule updates an existing schedule.
// All mutable fields are overwritten from req and next_run_at is recomputed;
// the enabled flag is intentionally left untouched (use ToggleSchedule).
func (s *SnapshotScheduleService) UpdateSchedule(ctx context.Context, id string, req *CreateScheduleRequest) (*SnapshotSchedule, error) {
	// Calculate next run time
	nextRunAt, err := s.calculateNextRun(req.ScheduleType, req.ScheduleConfig)
	if err != nil {
		return nil, fmt.Errorf("failed to calculate next run time: %w", err)
	}
	// Marshal schedule config to JSON
	configJSON, err := json.Marshal(req.ScheduleConfig)
	if err != nil {
		return nil, fmt.Errorf("failed to marshal schedule config: %w", err)
	}
	query := `
		UPDATE snapshot_schedules
		SET name = $1, dataset = $2, snapshot_name_template = $3, schedule_type = $4,
			schedule_config = $5, recursive = $6, retention_count = $7, retention_days = $8,
			next_run_at = $9, updated_at = NOW()
		WHERE id = $10
		RETURNING id, name, dataset, snapshot_name_template, schedule_type, schedule_config,
			recursive, enabled, retention_count, retention_days,
			last_run_at, next_run_at, created_by, created_at, updated_at
	`
	// Argument order must match the $n placeholders above
	row := s.db.QueryRowContext(ctx, query,
		req.Name, req.Dataset, req.SnapshotNameTemplate, req.ScheduleType,
		string(configJSON), req.Recursive, req.RetentionCount, req.RetentionDays,
		nextRunAt, id,
	)
	schedule, err := s.scanScheduleRow(row)
	if err != nil {
		// UPDATE ... RETURNING yields no row when the id does not exist
		if err == sql.ErrNoRows {
			return nil, fmt.Errorf("schedule not found")
		}
		return nil, fmt.Errorf("failed to update schedule: %w", err)
	}
	s.logger.Info("Snapshot schedule updated", "id", id)
	return schedule, nil
}
// DeleteSchedule deletes a schedule by ID.
func (s *SnapshotScheduleService) DeleteSchedule(ctx context.Context, id string) error {
	result, err := s.db.ExecContext(ctx, `DELETE FROM snapshot_schedules WHERE id = $1`, id)
	if err != nil {
		return fmt.Errorf("failed to delete schedule: %w", err)
	}
	affected, err := result.RowsAffected()
	if err != nil {
		return fmt.Errorf("failed to get rows affected: %w", err)
	}
	// Zero affected rows means the id did not exist
	if affected == 0 {
		return fmt.Errorf("schedule not found")
	}
	s.logger.Info("Snapshot schedule deleted", "id", id)
	return nil
}
// ToggleSchedule enables or disables a schedule.
func (s *SnapshotScheduleService) ToggleSchedule(ctx context.Context, id string, enabled bool) error {
	result, err := s.db.ExecContext(ctx,
		`UPDATE snapshot_schedules SET enabled = $1, updated_at = NOW() WHERE id = $2`,
		enabled, id,
	)
	if err != nil {
		return fmt.Errorf("failed to toggle schedule: %w", err)
	}
	affected, err := result.RowsAffected()
	if err != nil {
		return fmt.Errorf("failed to get rows affected: %w", err)
	}
	// Zero affected rows means the id did not exist
	if affected == 0 {
		return fmt.Errorf("schedule not found")
	}
	s.logger.Info("Snapshot schedule toggled", "id", id, "enabled", enabled)
	return nil
}
// scanScheduleRow scans a single schedule row (from QueryRowContext).
// The column order must match the SELECT/RETURNING lists used in this file:
// id, name, dataset, snapshot_name_template, schedule_type, schedule_config,
// recursive, enabled, retention_count, retention_days, last_run_at,
// next_run_at, created_by, created_at, updated_at.
func (s *SnapshotScheduleService) scanScheduleRow(row *sql.Row) (*SnapshotSchedule, error) {
	var schedule SnapshotSchedule
	var configJSON string
	var lastRunAt, nextRunAt sql.NullTime
	var retentionCount, retentionDays sql.NullInt64
	var createdBy sql.NullString
	err := row.Scan(
		&schedule.ID, &schedule.Name, &schedule.Dataset, &schedule.SnapshotNameTemplate,
		&schedule.ScheduleType, &configJSON, &schedule.Recursive, &schedule.Enabled,
		&retentionCount, &retentionDays, &lastRunAt, &nextRunAt,
		&createdBy, &schedule.CreatedAt, &schedule.UpdatedAt,
	)
	if err != nil {
		// May be sql.ErrNoRows; callers translate that into "schedule not found"
		return nil, err
	}
	// Parse schedule config JSON
	if err := json.Unmarshal([]byte(configJSON), &schedule.ScheduleConfig); err != nil {
		return nil, fmt.Errorf("failed to unmarshal schedule config: %w", err)
	}
	// Handle nullable fields: pointers stay nil for NULL columns
	if lastRunAt.Valid {
		schedule.LastRunAt = &lastRunAt.Time
	}
	if nextRunAt.Valid {
		schedule.NextRunAt = &nextRunAt.Time
	}
	if retentionCount.Valid {
		count := int(retentionCount.Int64)
		schedule.RetentionCount = &count
	}
	if retentionDays.Valid {
		days := int(retentionDays.Int64)
		schedule.RetentionDays = &days
	}
	if createdBy.Valid {
		schedule.CreatedBy = createdBy.String
	}
	return &schedule, nil
}
// scanSchedule scans a schedule from rows (same column order as scanScheduleRow).
func (s *SnapshotScheduleService) scanSchedule(rows *sql.Rows) (*SnapshotSchedule, error) {
	var sched SnapshotSchedule
	var rawConfig string
	var lastRun, nextRun sql.NullTime
	var keepCount, keepDays sql.NullInt64
	var creator sql.NullString
	// Column order must match the SELECT statements in this file.
	if err := rows.Scan(
		&sched.ID, &sched.Name, &sched.Dataset, &sched.SnapshotNameTemplate,
		&sched.ScheduleType, &rawConfig, &sched.Recursive, &sched.Enabled,
		&keepCount, &keepDays, &lastRun, &nextRun,
		&creator, &sched.CreatedAt, &sched.UpdatedAt,
	); err != nil {
		return nil, err
	}
	// Schedule config is stored as a JSON document
	if err := json.Unmarshal([]byte(rawConfig), &sched.ScheduleConfig); err != nil {
		return nil, fmt.Errorf("failed to unmarshal schedule config: %w", err)
	}
	// NULL columns leave the corresponding optional fields nil
	if lastRun.Valid {
		sched.LastRunAt = &lastRun.Time
	}
	if nextRun.Valid {
		sched.NextRunAt = &nextRun.Time
	}
	if keepCount.Valid {
		n := int(keepCount.Int64)
		sched.RetentionCount = &n
	}
	if keepDays.Valid {
		n := int(keepDays.Int64)
		sched.RetentionDays = &n
	}
	if creator.Valid {
		sched.CreatedBy = creator.String
	}
	return &sched, nil
}
// calculateNextRun calculates the next run time based on schedule type and config.
//
// Supported schedule types:
//   - "hourly":  top of the next hour
//   - "daily":   config["time"] = "HH:MM" (defaults to "00:00")
//   - "weekly":  config["day"] = weekday number (0 = Sunday), config["time"] = "HH:MM"
//   - "monthly": config["day"] = day of month, config["time"] = "HH:MM"
//   - "cron":    config["cron"] must be present; currently approximated as +1h (TODO)
//
// Malformed "HH:MM" strings fall back to midnight (Sscanf leaves hour/min at 0).
func (s *SnapshotScheduleService) calculateNextRun(scheduleType string, config map[string]interface{}) (*time.Time, error) {
	now := time.Now()
	switch scheduleType {
	case "hourly":
		// Round up to the next hour boundary. (The previous implementation
		// added an hour before truncating and therefore always skipped the
		// immediate next boundary, e.g. 10:30 -> 12:00 instead of 11:00.)
		nextRun := now.Truncate(time.Hour).Add(time.Hour)
		return &nextRun, nil
	case "daily":
		timeStr, ok := config["time"].(string)
		if !ok {
			timeStr = "00:00"
		}
		// Parse time (HH:MM format)
		hour, min := 0, 0
		fmt.Sscanf(timeStr, "%d:%d", &hour, &min)
		nextRun := time.Date(now.Year(), now.Month(), now.Day(), hour, min, 0, 0, now.Location())
		if nextRun.Before(now) {
			// Today's run time already passed — schedule for tomorrow
			nextRun = nextRun.Add(24 * time.Hour)
		}
		return &nextRun, nil
	case "weekly":
		dayOfWeek, ok := config["day"].(float64) // JSON numbers are float64
		if !ok {
			dayOfWeek = 0 // Sunday
		}
		timeStr, ok := config["time"].(string)
		if !ok {
			timeStr = "00:00"
		}
		hour, min := 0, 0
		fmt.Sscanf(timeStr, "%d:%d", &hour, &min)
		// Days until the target weekday (0 means today)
		currentDay := int(now.Weekday())
		targetDay := int(dayOfWeek)
		daysUntil := (targetDay - currentDay + 7) % 7
		if daysUntil == 0 {
			// Same weekday: if today's run time has already passed, wait a week
			today := time.Date(now.Year(), now.Month(), now.Day(), hour, min, 0, 0, now.Location())
			if today.Before(now) {
				daysUntil = 7
			}
		}
		nextRun := time.Date(now.Year(), now.Month(), now.Day(), hour, min, 0, 0, now.Location()).AddDate(0, 0, daysUntil)
		return &nextRun, nil
	case "monthly":
		day, ok := config["day"].(float64)
		if !ok {
			day = 1
		}
		timeStr, ok := config["time"].(string)
		if !ok {
			timeStr = "00:00"
		}
		hour, min := 0, 0
		fmt.Sscanf(timeStr, "%d:%d", &hour, &min)
		// This month's occurrence; if already past, move one month ahead.
		// NOTE(review): time.Date normalizes out-of-range days, so day=31 in
		// a 30-day month rolls into the next month — confirm this is intended.
		nextRun := time.Date(now.Year(), now.Month(), int(day), hour, min, 0, 0, now.Location())
		if nextRun.Before(now) {
			nextRun = nextRun.AddDate(0, 1, 0)
		}
		return &nextRun, nil
	case "cron":
		// For cron, we'll use a simple implementation.
		// TODO: Implement proper cron parsing (use a cron parser library);
		// for now just require the expression and schedule one hour out.
		if _, ok := config["cron"].(string); !ok {
			return nil, fmt.Errorf("cron expression not found in config")
		}
		nextRun := now.Add(1 * time.Hour)
		return &nextRun, nil
	default:
		return nil, fmt.Errorf("unknown schedule type: %s", scheduleType)
	}
}
// GetDueSchedules retrieves all enabled schedules whose next_run_at is at or
// before "now", soonest first.
func (s *SnapshotScheduleService) GetDueSchedules(ctx context.Context) ([]*SnapshotSchedule, error) {
	query := `
		SELECT id, name, dataset, snapshot_name_template, schedule_type, schedule_config,
			recursive, enabled, retention_count, retention_days,
			last_run_at, next_run_at, created_by, created_at, updated_at
		FROM snapshot_schedules
		WHERE enabled = true AND next_run_at <= $1
		ORDER BY next_run_at ASC
	`
	rows, err := s.db.QueryContext(ctx, query, time.Now())
	if err != nil {
		return nil, fmt.Errorf("failed to query due schedules: %w", err)
	}
	defer rows.Close()
	var due []*SnapshotSchedule
	for rows.Next() {
		sched, scanErr := s.scanSchedule(rows)
		if scanErr != nil {
			// Skip unreadable rows rather than failing the whole poll
			s.logger.Error("Failed to scan schedule", "error", scanErr)
			continue
		}
		due = append(due, sched)
	}
	return due, rows.Err()
}
// ExecuteSchedule executes a snapshot schedule: it creates the snapshot,
// records last/next run times, and applies the retention policy.
// Only snapshot creation can fail the call; bookkeeping and retention
// failures afterwards are logged and swallowed (best effort).
func (s *SnapshotScheduleService) ExecuteSchedule(ctx context.Context, schedule *SnapshotSchedule) error {
	now := time.Now()
	// Generate snapshot name from template
	snapshotName := s.generateSnapshotName(schedule.SnapshotNameTemplate, now)
	// Create snapshot — the only step whose failure aborts the run
	if err := s.snapshotService.CreateSnapshot(ctx, schedule.Dataset, snapshotName, schedule.Recursive); err != nil {
		return fmt.Errorf("failed to create snapshot: %w", err)
	}
	s.logger.Info("Snapshot created from schedule", "schedule", schedule.Name, "snapshot", snapshotName, "dataset", schedule.Dataset)
	// Calculate next run time
	nextRunAt, err := s.calculateNextRun(schedule.ScheduleType, schedule.ScheduleConfig)
	if err != nil {
		s.logger.Error("Failed to calculate next run time", "error", err, "schedule", schedule.Name)
		// Set a default next run time (1 hour from now) if calculation fails,
		// so the schedule does not stall permanently
		defaultNextRun := now.Add(1 * time.Hour)
		nextRunAt = &defaultNextRun
	}
	// Update schedule with last run time and next run time
	query := `
		UPDATE snapshot_schedules
		SET last_run_at = $1, next_run_at = $2, updated_at = NOW()
		WHERE id = $3
	`
	if _, err := s.db.ExecContext(ctx, query, now, nextRunAt, schedule.ID); err != nil {
		s.logger.Error("Failed to update schedule after execution", "error", err, "schedule", schedule.Name)
		// Don't return error here - snapshot was created successfully
	}
	// Handle retention
	if err := s.handleRetention(ctx, schedule); err != nil {
		s.logger.Error("Failed to handle retention", "error", err, "schedule", schedule.Name)
		// Don't return error - retention is best effort
	}
	return nil
}
// generateSnapshotName generates a snapshot name from a template.
//
// Supported strftime-style placeholders: %Y %m %d %H %M %S. Compound
// patterns such as "%Y-%m-%d" are fully covered by the single-token
// replacements, so the previous extra ReplaceAll calls for compound
// patterns were unreachable dead code and have been removed (behavior is
// unchanged). If the template contains no placeholders at all, a Unix
// timestamp is appended so generated names stay unique.
func (s *SnapshotScheduleService) generateSnapshotName(template string, t time.Time) string {
	replacements := [][2]string{
		{"%Y", t.Format("2006")},
		{"%m", t.Format("01")},
		{"%d", t.Format("02")},
		{"%H", t.Format("15")},
		{"%M", t.Format("04")},
		{"%S", t.Format("05")},
	}
	name := template
	for _, r := range replacements {
		name = strings.ReplaceAll(name, r[0], r[1])
	}
	// If no placeholders were replaced, append timestamp
	if name == template {
		name = fmt.Sprintf("%s-%d", template, t.Unix())
	}
	return name
}
// handleRetention handles snapshot retention based on schedule settings.
// Two independent policies may apply: retention_count (keep at most N
// matching snapshots, oldest deleted first) and retention_days (delete
// matching snapshots older than N days). Only snapshots whose names match
// the schedule's template prefix are considered; deletions are best effort.
func (s *SnapshotScheduleService) handleRetention(ctx context.Context, schedule *SnapshotSchedule) error {
	// Get all snapshots for this dataset
	snapshots, err := s.snapshotService.ListSnapshots(ctx, schedule.Dataset)
	if err != nil {
		return fmt.Errorf("failed to list snapshots: %w", err)
	}
	// Filter snapshots that match this schedule's naming pattern, so manual
	// snapshots and those from other schedules are never deleted
	matchingSnapshots := make([]*Snapshot, 0)
	for _, snapshot := range snapshots {
		// Check if snapshot name matches the template pattern
		if s.snapshotMatchesTemplate(snapshot.SnapshotName, schedule.SnapshotNameTemplate) {
			matchingSnapshots = append(matchingSnapshots, snapshot)
		}
	}
	now := time.Now()
	snapshotsToDelete := make([]*Snapshot, 0)
	// Apply retention_count if set: mark the oldest snapshots beyond the cap
	if schedule.RetentionCount != nil && *schedule.RetentionCount > 0 {
		if len(matchingSnapshots) > *schedule.RetentionCount {
			// Sort by creation time (oldest first)
			sort.Slice(matchingSnapshots, func(i, j int) bool {
				return matchingSnapshots[i].Created.Before(matchingSnapshots[j].Created)
			})
			// Mark excess snapshots for deletion
			excessCount := len(matchingSnapshots) - *schedule.RetentionCount
			for i := 0; i < excessCount; i++ {
				snapshotsToDelete = append(snapshotsToDelete, matchingSnapshots[i])
			}
		}
	}
	// Apply retention_days if set: mark snapshots older than the cutoff
	if schedule.RetentionDays != nil && *schedule.RetentionDays > 0 {
		cutoffTime := now.AddDate(0, 0, -*schedule.RetentionDays)
		for _, snapshot := range matchingSnapshots {
			if snapshot.Created.Before(cutoffTime) {
				// Check if not already marked for deletion by the count policy
				alreadyMarked := false
				for _, marked := range snapshotsToDelete {
					if marked.ID == snapshot.ID {
						alreadyMarked = true
						break
					}
				}
				if !alreadyMarked {
					snapshotsToDelete = append(snapshotsToDelete, snapshot)
				}
			}
		}
	}
	// Delete snapshots; individual failures are logged and skipped
	for _, snapshot := range snapshotsToDelete {
		if err := s.snapshotService.DeleteSnapshot(ctx, snapshot.Name, schedule.Recursive); err != nil {
			s.logger.Error("Failed to delete snapshot for retention", "error", err, "snapshot", snapshot.Name)
			continue
		}
		s.logger.Info("Deleted snapshot for retention", "snapshot", snapshot.Name, "schedule", schedule.Name)
	}
	return nil
}
// snapshotMatchesTemplate checks if a snapshot name matches a template pattern.
// Matching is by literal prefix: everything in the template before the first
// "%" placeholder must prefix the snapshot name. A template made entirely of
// placeholders matches everything; an empty template matches nothing.
func (s *SnapshotScheduleService) snapshotMatchesTemplate(snapshotName, template string) bool {
	if template == "" {
		return false
	}
	// Literal prefix of the template: everything before the first placeholder.
	// Handles templates like "auto-%Y-%m-%d" or "daily-backup-%Y%m%d".
	prefix := template
	if i := strings.Index(template, "%"); i >= 0 {
		prefix = template[:i]
	}
	if prefix == "" {
		// Template starts with a placeholder — lenient match: accept any name
		return true
	}
	// e.g. template "auto-%Y-%m-%d" matches snapshot "auto-2024-01-15"
	return strings.HasPrefix(snapshotName, prefix)
}

View File

@@ -0,0 +1,87 @@
package storage
import (
"context"
"time"
"github.com/atlasos/calypso/internal/common/database"
"github.com/atlasos/calypso/internal/common/logger"
)
// SnapshotScheduleWorker handles periodic execution of snapshot schedules
// by polling the database for due schedules at a fixed interval.
type SnapshotScheduleWorker struct {
	scheduleService *SnapshotScheduleService // provides due-schedule queries and execution
	logger          *logger.Logger           // structured logger
	interval        time.Duration            // polling interval between schedule checks
	stopCh          chan struct{}            // closed by Stop() to terminate Start's loop
}
// NewSnapshotScheduleWorker creates a new snapshot schedule worker
func NewSnapshotScheduleWorker(db *database.DB, log *logger.Logger, snapshotService *SnapshotService, interval time.Duration) *SnapshotScheduleWorker {
	return &SnapshotScheduleWorker{
		scheduleService: NewSnapshotScheduleService(db, log, snapshotService),
		logger:          log,
		interval:        interval,
		stopCh:          make(chan struct{}),
	}
}
// Start starts the snapshot schedule worker background service.
// It runs one pass immediately, then polls every w.interval until the
// context is cancelled or Stop is called. Start blocks; run it in a goroutine.
func (w *SnapshotScheduleWorker) Start(ctx context.Context) {
	w.logger.Info("Starting snapshot schedule worker", "interval", w.interval)
	ticker := time.NewTicker(w.interval)
	defer ticker.Stop()
	// Run initial check immediately so a freshly started worker does not
	// wait a full interval before processing already-overdue schedules
	w.processSchedules(ctx)
	for {
		select {
		case <-ctx.Done():
			w.logger.Info("Snapshot schedule worker stopped")
			return
		case <-w.stopCh:
			w.logger.Info("Snapshot schedule worker stopped")
			return
		case <-ticker.C:
			w.processSchedules(ctx)
		}
	}
}
// Stop stops the snapshot schedule worker service by closing stopCh.
// NOTE(review): calling Stop more than once panics (double close of a
// channel) — callers must ensure it is invoked at most once.
func (w *SnapshotScheduleWorker) Stop() {
	close(w.stopCh)
}
// processSchedules processes all due snapshot schedules. Failures of
// individual schedules are logged and do not stop the remaining ones.
func (w *SnapshotScheduleWorker) processSchedules(ctx context.Context) {
	w.logger.Debug("Checking for due snapshot schedules")
	due, err := w.scheduleService.GetDueSchedules(ctx)
	if err != nil {
		w.logger.Error("Failed to get due schedules", "error", err)
		return
	}
	if len(due) == 0 {
		w.logger.Debug("No snapshot schedules due to run")
		return
	}
	w.logger.Info("Found due snapshot schedules", "count", len(due))
	for _, sched := range due {
		w.logger.Info("Executing snapshot schedule", "schedule", sched.Name, "dataset", sched.Dataset)
		if execErr := w.scheduleService.ExecuteSchedule(ctx, sched); execErr != nil {
			w.logger.Error("Failed to execute snapshot schedule", "error", execErr, "schedule", sched.Name)
			continue
		}
		w.logger.Info("Successfully executed snapshot schedule", "schedule", sched.Name)
	}
}