package storage

import (
	"context"
	"database/sql"
	"encoding/json"
	"errors"
	"fmt"
	"regexp"
	"sort"
	"strings"
	"time"

	"github.com/atlasos/calypso/internal/common/database"
	"github.com/atlasos/calypso/internal/common/logger"
	"github.com/google/uuid"
)

// snapshotScheduleColumns lists, in scan order, every snapshot_schedules
// column returned by the queries in this file. It must stay in sync with
// scanScheduleFrom.
const snapshotScheduleColumns = `id, name, dataset, snapshot_name_template, schedule_type, schedule_config,
	recursive, enabled, retention_count, retention_days,
	last_run_at, next_run_at, created_by, created_at, updated_at`

// snapshotScheduleScanner is the Scan method shared by *sql.Row and *sql.Rows,
// letting one function decode a schedule from either.
type snapshotScheduleScanner interface {
	Scan(dest ...interface{}) error
}

// SnapshotScheduleService manages snapshot schedules stored in the
// snapshot_schedules table: CRUD, next-run computation, execution of due
// schedules and their retention policies.
type SnapshotScheduleService struct {
	db              *database.DB
	logger          *logger.Logger
	snapshotService *SnapshotService
}

// NewSnapshotScheduleService creates a new snapshot schedule service.
func NewSnapshotScheduleService(db *database.DB, log *logger.Logger, snapshotService *SnapshotService) *SnapshotScheduleService {
	return &SnapshotScheduleService{
		db:              db,
		logger:          log,
		snapshotService: snapshotService,
	}
}

// SnapshotSchedule represents a scheduled snapshot task.
//
// ScheduleConfig keys read by calculateNextRun:
//   - "time": "HH:MM" time of day (daily, weekly, monthly; default "00:00")
//   - "day":  weekday 0 (Sunday) – 6 for weekly (default 0), or
//     day of month 1 – 31 for monthly (default 1)
//   - "cron": cron expression string (cron; required)
type SnapshotSchedule struct {
	ID                   string                 `json:"id"`
	Name                 string                 `json:"name"`
	Dataset              string                 `json:"dataset"`
	SnapshotNameTemplate string                 `json:"snapshot_name_template"`
	ScheduleType         string                 `json:"schedule_type"` // hourly, daily, weekly, monthly, cron
	ScheduleConfig       map[string]interface{} `json:"schedule_config"`
	Recursive            bool                   `json:"recursive"`
	Enabled              bool                   `json:"enabled"`
	RetentionCount       *int                   `json:"retention_count,omitempty"`
	RetentionDays        *int                   `json:"retention_days,omitempty"`
	LastRunAt            *time.Time             `json:"last_run_at,omitempty"`
	NextRunAt            *time.Time             `json:"next_run_at,omitempty"`
	CreatedBy            string                 `json:"created_by,omitempty"`
	CreatedAt            time.Time              `json:"created_at"`
	UpdatedAt            time.Time              `json:"updated_at"`
}

// CreateScheduleRequest represents a request to create (or fully update) a
// snapshot schedule.
type CreateScheduleRequest struct {
	Name                 string                 `json:"name" binding:"required"`
	Dataset              string                 `json:"dataset" binding:"required"`
	SnapshotNameTemplate string                 `json:"snapshot_name_template" binding:"required"`
	ScheduleType         string                 `json:"schedule_type" binding:"required"`
	ScheduleConfig       map[string]interface{} `json:"schedule_config" binding:"required"`
	Recursive            bool                   `json:"recursive"`
	RetentionCount       *int                   `json:"retention_count"`
	RetentionDays        *int                   `json:"retention_days"`
}

// ListSchedules returns every snapshot schedule, newest first.
//
// Rows that fail to decode are logged and skipped so a single bad row does
// not hide the rest. Any iteration error from the driver is returned
// alongside the schedules collected so far.
func (s *SnapshotScheduleService) ListSchedules(ctx context.Context) ([]*SnapshotSchedule, error) {
	query := `SELECT ` + snapshotScheduleColumns + `
		FROM snapshot_schedules
		ORDER BY created_at DESC`

	rows, err := s.db.QueryContext(ctx, query)
	if err != nil {
		return nil, fmt.Errorf("failed to query schedules: %w", err)
	}
	defer rows.Close()

	return s.collectSchedules(rows)
}

// GetSchedule retrieves a schedule by ID. It returns a "schedule not found"
// error when no row has that ID.
func (s *SnapshotScheduleService) GetSchedule(ctx context.Context, id string) (*SnapshotSchedule, error) {
	query := `SELECT ` + snapshotScheduleColumns + `
		FROM snapshot_schedules
		WHERE id = $1`

	schedule, err := s.scanScheduleRow(s.db.QueryRowContext(ctx, query, id))
	if err != nil {
		if errors.Is(err, sql.ErrNoRows) {
			return nil, errors.New("schedule not found")
		}
		return nil, fmt.Errorf("failed to get schedule: %w", err)
	}
	return schedule, nil
}

// CreateSchedule validates req (by computing its first run time), inserts a
// new enabled schedule and returns the stored row.
func (s *SnapshotScheduleService) CreateSchedule(ctx context.Context, req *CreateScheduleRequest, createdBy string) (*SnapshotSchedule, error) {
	// Computing the next run also rejects unknown schedule types and malformed configs.
	nextRunAt, err := s.calculateNextRun(req.ScheduleType, req.ScheduleConfig)
	if err != nil {
		return nil, fmt.Errorf("failed to calculate next run time: %w", err)
	}

	configJSON, err := json.Marshal(req.ScheduleConfig)
	if err != nil {
		return nil, fmt.Errorf("failed to marshal schedule config: %w", err)
	}

	id := uuid.New().String()
	query := `
		INSERT INTO snapshot_schedules (
			id, name, dataset, snapshot_name_template, schedule_type, schedule_config,
			recursive, enabled, retention_count, retention_days,
			next_run_at, created_by, created_at, updated_at
		) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, NOW(), NOW())
		RETURNING ` + snapshotScheduleColumns

	row := s.db.QueryRowContext(ctx, query,
		id, req.Name, req.Dataset, req.SnapshotNameTemplate, req.ScheduleType, string(configJSON),
		req.Recursive, true, // new schedules start enabled
		req.RetentionCount, req.RetentionDays,
		nextRunAt, createdBy,
	)

	schedule, err := s.scanScheduleRow(row)
	if err != nil {
		return nil, fmt.Errorf("failed to create schedule: %w", err)
	}

	s.logger.Info("Snapshot schedule created", "id", id, "name", req.Name, "dataset", req.Dataset)
	return schedule, nil
}

// UpdateSchedule replaces the editable fields of schedule id with req and
// recomputes its next run time. The enabled flag and last run time are left
// untouched. Returns a "schedule not found" error when id does not exist.
func (s *SnapshotScheduleService) UpdateSchedule(ctx context.Context, id string, req *CreateScheduleRequest) (*SnapshotSchedule, error) {
	nextRunAt, err := s.calculateNextRun(req.ScheduleType, req.ScheduleConfig)
	if err != nil {
		return nil, fmt.Errorf("failed to calculate next run time: %w", err)
	}

	configJSON, err := json.Marshal(req.ScheduleConfig)
	if err != nil {
		return nil, fmt.Errorf("failed to marshal schedule config: %w", err)
	}

	query := `
		UPDATE snapshot_schedules
		SET name = $1, dataset = $2, snapshot_name_template = $3, schedule_type = $4,
			schedule_config = $5, recursive = $6, retention_count = $7, retention_days = $8,
			next_run_at = $9, updated_at = NOW()
		WHERE id = $10
		RETURNING ` + snapshotScheduleColumns

	row := s.db.QueryRowContext(ctx, query,
		req.Name, req.Dataset, req.SnapshotNameTemplate, req.ScheduleType,
		string(configJSON), req.Recursive, req.RetentionCount, req.RetentionDays,
		nextRunAt, id,
	)

	schedule, err := s.scanScheduleRow(row)
	if err != nil {
		if errors.Is(err, sql.ErrNoRows) {
			return nil, errors.New("schedule not found")
		}
		return nil, fmt.Errorf("failed to update schedule: %w", err)
	}

	s.logger.Info("Snapshot schedule updated", "id", id)
	return schedule, nil
}

// DeleteSchedule deletes a schedule. It returns a "schedule not found" error
// when no row was removed.
func (s *SnapshotScheduleService) DeleteSchedule(ctx context.Context, id string) error {
	query := `DELETE FROM snapshot_schedules WHERE id = $1`

	result, err := s.db.ExecContext(ctx, query, id)
	if err != nil {
		return fmt.Errorf("failed to delete schedule: %w", err)
	}

	rowsAffected, err := result.RowsAffected()
	if err != nil {
		return fmt.Errorf("failed to get rows affected: %w", err)
	}
	if rowsAffected == 0 {
		return errors.New("schedule not found")
	}

	s.logger.Info("Snapshot schedule deleted", "id", id)
	return nil
}

// ToggleSchedule enables or disables a schedule. It returns a
// "schedule not found" error when no row was updated.
func (s *SnapshotScheduleService) ToggleSchedule(ctx context.Context, id string, enabled bool) error {
	query := `UPDATE snapshot_schedules SET enabled = $1, updated_at = NOW() WHERE id = $2`

	result, err := s.db.ExecContext(ctx, query, enabled, id)
	if err != nil {
		return fmt.Errorf("failed to toggle schedule: %w", err)
	}

	rowsAffected, err := result.RowsAffected()
	if err != nil {
		return fmt.Errorf("failed to get rows affected: %w", err)
	}
	if rowsAffected == 0 {
		return errors.New("schedule not found")
	}

	s.logger.Info("Snapshot schedule toggled", "id", id, "enabled", enabled)
	return nil
}

// scanScheduleRow decodes a single-row query result. A missing row surfaces
// as sql.ErrNoRows, unwrapped.
func (s *SnapshotScheduleService) scanScheduleRow(row *sql.Row) (*SnapshotSchedule, error) {
	return s.scanScheduleFrom(row)
}

// scanSchedule decodes the current row of a multi-row query result.
func (s *SnapshotScheduleService) scanSchedule(rows *sql.Rows) (*SnapshotSchedule, error) {
	return s.scanScheduleFrom(rows)
}

// scanScheduleFrom decodes one row laid out as snapshotScheduleColumns,
// converting nullable columns into nil pointers / empty strings and parsing
// schedule_config from JSON. Scan errors are returned unwrapped.
func (s *SnapshotScheduleService) scanScheduleFrom(src snapshotScheduleScanner) (*SnapshotSchedule, error) {
	var (
		schedule                      SnapshotSchedule
		configJSON                    string
		lastRunAt, nextRunAt          sql.NullTime
		retentionCount, retentionDays sql.NullInt64
		createdBy                     sql.NullString
	)

	err := src.Scan(
		&schedule.ID, &schedule.Name, &schedule.Dataset, &schedule.SnapshotNameTemplate,
		&schedule.ScheduleType, &configJSON, &schedule.Recursive, &schedule.Enabled,
		&retentionCount, &retentionDays, &lastRunAt, &nextRunAt,
		&createdBy, &schedule.CreatedAt, &schedule.UpdatedAt,
	)
	if err != nil {
		return nil, err
	}

	if err := json.Unmarshal([]byte(configJSON), &schedule.ScheduleConfig); err != nil {
		return nil, fmt.Errorf("failed to unmarshal schedule config: %w", err)
	}

	if lastRunAt.Valid {
		schedule.LastRunAt = &lastRunAt.Time
	}
	if nextRunAt.Valid {
		schedule.NextRunAt = &nextRunAt.Time
	}
	if retentionCount.Valid {
		count := int(retentionCount.Int64)
		schedule.RetentionCount = &count
	}
	if retentionDays.Valid {
		days := int(retentionDays.Int64)
		schedule.RetentionDays = &days
	}
	if createdBy.Valid {
		schedule.CreatedBy = createdBy.String
	}

	return &schedule, nil
}

// collectSchedules drains rows, logging and skipping rows that fail to
// decode, and returns the decoded schedules together with rows.Err().
func (s *SnapshotScheduleService) collectSchedules(rows *sql.Rows) ([]*SnapshotSchedule, error) {
	var schedules []*SnapshotSchedule
	for rows.Next() {
		schedule, err := s.scanSchedule(rows)
		if err != nil {
			s.logger.Error("Failed to scan schedule", "error", err)
			continue
		}
		schedules = append(schedules, schedule)
	}
	return schedules, rows.Err()
}

// calculateNextRun returns the next run time, strictly after the current
// time, for the given schedule type and config. It errors on unknown types
// and malformed configs.
func (s *SnapshotScheduleService) calculateNextRun(scheduleType string, config map[string]interface{}) (*time.Time, error) {
	return s.calculateNextRunAt(scheduleType, config, time.Now())
}

// calculateNextRunAt is calculateNextRun with an explicit reference time, so
// the scheduling arithmetic can be exercised deterministically. All wall-clock
// values are interpreted in now's location.
func (s *SnapshotScheduleService) calculateNextRunAt(scheduleType string, config map[string]interface{}, now time.Time) (*time.Time, error) {
	var next time.Time

	switch scheduleType {
	case "hourly":
		// Top of the next hour. Truncating first (rather than adding an hour
		// and then truncating and adding again) keeps runs one hour apart.
		next = now.Truncate(time.Hour).Add(time.Hour)

	case "daily":
		hour, minute, err := s.scheduleTimeOfDay(config)
		if err != nil {
			return nil, err
		}
		next = time.Date(now.Year(), now.Month(), now.Day(), hour, minute, 0, 0, now.Location())
		if !next.After(now) {
			// Rebuild from the calendar date instead of adding 24h so the
			// wall-clock time survives DST transitions.
			next = time.Date(now.Year(), now.Month(), now.Day()+1, hour, minute, 0, 0, now.Location())
		}

	case "weekly":
		weekday := 0 // Sunday when "day" is absent
		if v, ok := config["day"].(float64); ok { // JSON numbers decode as float64
			weekday = int(v)
		}
		if weekday == 7 {
			weekday = 0 // accept 7 as Sunday (cron convention)
		}
		if weekday < 0 || weekday > 6 {
			return nil, fmt.Errorf("invalid weekly schedule day %d: expected 0 (Sunday) through 6 (Saturday)", weekday)
		}
		hour, minute, err := s.scheduleTimeOfDay(config)
		if err != nil {
			return nil, err
		}
		daysUntil := (weekday - int(now.Weekday()) + 7) % 7
		next = time.Date(now.Year(), now.Month(), now.Day()+daysUntil, hour, minute, 0, 0, now.Location())
		if !next.After(now) {
			// Same weekday but the time has already passed: go a week ahead.
			next = time.Date(now.Year(), now.Month(), now.Day()+daysUntil+7, hour, minute, 0, 0, now.Location())
		}

	case "monthly":
		dayOfMonth := 1
		if v, ok := config["day"].(float64); ok {
			dayOfMonth = int(v)
		}
		if dayOfMonth < 1 || dayOfMonth > 31 {
			return nil, fmt.Errorf("invalid monthly schedule day %d: expected 1 through 31", dayOfMonth)
		}
		hour, minute, err := s.scheduleTimeOfDay(config)
		if err != nil {
			return nil, err
		}
		next = s.monthlyRunAt(now.Year(), now.Month(), dayOfMonth, hour, minute, now.Location())
		if !next.After(now) {
			next = s.monthlyRunAt(now.Year(), now.Month()+1, dayOfMonth, hour, minute, now.Location())
		}

	case "cron":
		if _, ok := config["cron"].(string); !ok {
			return nil, errors.New("cron expression not found in config")
		}
		// TODO: parse the cron expression with a proper cron library; until
		// then cron schedules simply run one hour from now.
		next = now.Add(1 * time.Hour)

	default:
		return nil, fmt.Errorf("unknown schedule type: %s", scheduleType)
	}

	return &next, nil
}

// scheduleTimeOfDay reads the "HH:MM" time of day from config["time"].
// A missing, non-string or blank value means midnight. Anything that does not
// parse as hour:minute, or is out of range, is an error instead of silently
// becoming midnight.
func (s *SnapshotScheduleService) scheduleTimeOfDay(config map[string]interface{}) (hour, minute int, err error) {
	timeStr, ok := config["time"].(string)
	if !ok || strings.TrimSpace(timeStr) == "" {
		return 0, 0, nil
	}

	n, scanErr := fmt.Sscanf(timeStr, "%d:%d", &hour, &minute)
	if scanErr != nil || n != 2 {
		return 0, 0, fmt.Errorf("invalid schedule time %q: expected HH:MM", timeStr)
	}
	if hour < 0 || hour > 23 || minute < 0 || minute > 59 {
		return 0, 0, fmt.Errorf("invalid schedule time %q: expected HH:MM with hour 0-23 and minute 0-59", timeStr)
	}
	return hour, minute, nil
}

// monthlyRunAt returns hour:minute on dayOfMonth of the given month. It
// clamps dayOfMonth to that month's last day, so 31 means 30 April and the
// last day of February, rather than spilling into the next month. month may
// exceed December; time.Date normalizes it into the following year.
func (s *SnapshotScheduleService) monthlyRunAt(year int, month time.Month, dayOfMonth, hour, minute int, loc *time.Location) time.Time {
	// Day 0 of the following month is the last day of this month.
	lastDay := time.Date(year, month+1, 0, 0, 0, 0, 0, loc).Day()
	if dayOfMonth > lastDay {
		dayOfMonth = lastDay
	}
	return time.Date(year, month, dayOfMonth, hour, minute, 0, 0, loc)
}

// GetDueSchedules retrieves all enabled schedules whose next_run_at is at or
// before the current time, oldest due first. Undecodable rows are logged and
// skipped.
func (s *SnapshotScheduleService) GetDueSchedules(ctx context.Context) ([]*SnapshotSchedule, error) {
	now := time.Now()
	query := `SELECT ` + snapshotScheduleColumns + `
		FROM snapshot_schedules
		WHERE enabled = true AND next_run_at <= $1
		ORDER BY next_run_at ASC`

	rows, err := s.db.QueryContext(ctx, query, now)
	if err != nil {
		return nil, fmt.Errorf("failed to query due schedules: %w", err)
	}
	defer rows.Close()

	return s.collectSchedules(rows)
}

// ExecuteSchedule creates the snapshot for schedule, then records the run
// and next run time and applies retention.
//
// Only a failure to create the snapshot is returned. Failures in the
// bookkeeping UPDATE and in retention are logged and otherwise ignored,
// because the snapshot itself already exists. If the next run time cannot be
// computed, it falls back to one hour after this run.
func (s *SnapshotScheduleService) ExecuteSchedule(ctx context.Context, schedule *SnapshotSchedule) error {
	now := time.Now()

	snapshotName := s.generateSnapshotName(schedule.SnapshotNameTemplate, now)
	if err := s.snapshotService.CreateSnapshot(ctx, schedule.Dataset, snapshotName, schedule.Recursive); err != nil {
		return fmt.Errorf("failed to create snapshot: %w", err)
	}

	s.logger.Info("Snapshot created from schedule", "schedule", schedule.Name, "snapshot", snapshotName, "dataset", schedule.Dataset)

	nextRunAt, err := s.calculateNextRun(schedule.ScheduleType, schedule.ScheduleConfig)
	if err != nil {
		s.logger.Error("Failed to calculate next run time", "error", err, "schedule", schedule.Name)
		defaultNextRun := now.Add(1 * time.Hour)
		nextRunAt = &defaultNextRun
	}

	query := `
		UPDATE snapshot_schedules
		SET last_run_at = $1, next_run_at = $2, updated_at = NOW()
		WHERE id = $3
	`
	if _, err := s.db.ExecContext(ctx, query, now, nextRunAt, schedule.ID); err != nil {
		// Not returned: the snapshot was created successfully.
		s.logger.Error("Failed to update schedule after execution", "error", err, "schedule", schedule.Name)
	}

	if err := s.handleRetention(ctx, schedule); err != nil {
		// Retention is best effort.
		s.logger.Error("Failed to handle retention", "error", err, "schedule", schedule.Name)
	}

	return nil
}

// generateSnapshotName expands the strftime-style placeholders %Y, %m, %d,
// %H, %M and %S in template using t. Composite forms such as "%Y-%m-%d" are
// covered by the individual tokens. If template contains none of these
// placeholders, "-<unix seconds>" is appended so successive names stay unique.
func (s *SnapshotScheduleService) generateSnapshotName(template string, t time.Time) string {
	name := strings.NewReplacer(
		"%Y", t.Format("2006"),
		"%m", t.Format("01"),
		"%d", t.Format("02"),
		"%H", t.Format("15"),
		"%M", t.Format("04"),
		"%S", t.Format("05"),
	).Replace(template)

	if name == template {
		return fmt.Sprintf("%s-%d", template, t.Unix())
	}
	return name
}

// handleRetention deletes snapshots produced by this schedule that exceed its
// retention policy:
//   - RetentionCount > 0 keeps only the newest N matching snapshots.
//   - RetentionDays > 0 removes matching snapshots created before now-N days.
//
// Only snapshots whose name could have been produced by the schedule's
// template are considered, so manual snapshots and snapshots belonging to
// other schedules are never touched. Individual delete failures are logged
// and skipped.
func (s *SnapshotScheduleService) handleRetention(ctx context.Context, schedule *SnapshotSchedule) error {
	keepCount := 0
	if schedule.RetentionCount != nil && *schedule.RetentionCount > 0 {
		keepCount = *schedule.RetentionCount
	}
	keepDays := 0
	if schedule.RetentionDays != nil && *schedule.RetentionDays > 0 {
		keepDays = *schedule.RetentionDays
	}
	if (keepCount == 0 && keepDays == 0) || schedule.SnapshotNameTemplate == "" {
		// No policy configured, or no template to identify this schedule's snapshots.
		return nil
	}

	pattern, err := s.snapshotTemplatePattern(schedule.SnapshotNameTemplate)
	if err != nil {
		return fmt.Errorf("failed to build snapshot name pattern: %w", err)
	}

	snapshots, err := s.snapshotService.ListSnapshots(ctx, schedule.Dataset)
	if err != nil {
		return fmt.Errorf("failed to list snapshots: %w", err)
	}

	matching := make([]*Snapshot, 0, len(snapshots))
	for _, snapshot := range snapshots {
		if pattern.MatchString(snapshot.SnapshotName) {
			matching = append(matching, snapshot)
		}
	}

	// Oldest first, so the leading entries are the ones beyond the count limit.
	sort.Slice(matching, func(i, j int) bool {
		return matching[i].Created.Before(matching[j].Created)
	})

	excess := 0
	if keepCount > 0 && len(matching) > keepCount {
		excess = len(matching) - keepCount
	}
	var cutoff time.Time
	if keepDays > 0 {
		cutoff = time.Now().AddDate(0, 0, -keepDays)
	}

	// Each snapshot is visited once, so no de-duplication between the two
	// rules is needed.
	for i, snapshot := range matching {
		expiredByCount := i < excess
		expiredByAge := keepDays > 0 && snapshot.Created.Before(cutoff)
		if !expiredByCount && !expiredByAge {
			continue
		}
		if err := ctx.Err(); err != nil {
			return fmt.Errorf("retention interrupted: %w", err)
		}
		if err := s.snapshotService.DeleteSnapshot(ctx, snapshot.Name, schedule.Recursive); err != nil {
			s.logger.Error("Failed to delete snapshot for retention", "error", err, "snapshot", snapshot.Name)
			continue
		}
		s.logger.Info("Deleted snapshot for retention", "snapshot", snapshot.Name, "schedule", schedule.Name)
	}

	return nil
}

// snapshotMatchesTemplate reports whether snapshotName is a name that
// generateSnapshotName can produce from template. An empty template matches
// nothing.
func (s *SnapshotScheduleService) snapshotMatchesTemplate(snapshotName, template string) bool {
	if template == "" {
		return false
	}
	pattern, err := s.snapshotTemplatePattern(template)
	if err != nil {
		return false
	}
	return pattern.MatchString(snapshotName)
}

// snapshotTemplatePattern compiles an anchored regular expression matching
// exactly the names generateSnapshotName produces from template:
//   - %Y becomes four digits.
//   - %m, %d, %H, %M and %S each become two digits.
//   - Every other character (including unknown %x sequences) is literal.
//   - A template with no placeholders matches template followed by "-<digits>".
//
// A template that starts with a placeholder therefore no longer matches every
// snapshot on the dataset.
func (s *SnapshotScheduleService) snapshotTemplatePattern(template string) (*regexp.Regexp, error) {
	var pattern strings.Builder
	pattern.WriteString("^")

	hasPlaceholder := false
	rest := template
	for rest != "" {
		idx := strings.IndexByte(rest, '%')
		if idx < 0 || idx == len(rest)-1 {
			pattern.WriteString(regexp.QuoteMeta(rest))
			break
		}

		var digits string
		switch rest[idx+1] {
		case 'Y':
			digits = `\d{4}`
		case 'm', 'd', 'H', 'M', 'S':
			digits = `\d{2}`
		}

		if digits == "" {
			// Unknown sequence: emit the '%' literally and rescan from the next
			// byte, mirroring the left-to-right replacement in generateSnapshotName.
			pattern.WriteString(regexp.QuoteMeta(rest[:idx+1]))
			rest = rest[idx+1:]
			continue
		}

		pattern.WriteString(regexp.QuoteMeta(rest[:idx]))
		pattern.WriteString(digits)
		hasPlaceholder = true
		rest = rest[idx+2:]
	}

	if !hasPlaceholder {
		pattern.WriteString(`-\d+`)
	}
	pattern.WriteString("$")

	return regexp.Compile(pattern.String())
}