// Package backup synchronizes Bacula/Bareos job records into the Calypso
// database and exposes them for listing, lookup and creation.
package backup

import (
	"context"
	"database/sql"
	"errors"
	"fmt"
	"os/exec"
	"strconv"
	"strings"
	"time"

	"github.com/atlasos/calypso/internal/common/config"
	"github.com/atlasos/calypso/internal/common/database"
	"github.com/atlasos/calypso/internal/common/logger"
)

const (
	// defaultListLimit is the page size ListJobs uses when the caller gives none.
	defaultListLimit = 50

	// maxListLimit is the largest page size ListJobs will honor.
	maxListLimit = 100

	// bconsoleTimeLayout is the timestamp layout of the starttime column in
	// bconsole "list jobs" output, e.g. "2025-12-27 23:05:02".
	bconsoleTimeLayout = "2006-01-02 15:04:05"

	// jobColumns is the backup_jobs column list shared by every query whose
	// result is decoded by scanJobRow. The order must match scanJobRow.
	jobColumns = `id, job_id, job_name, client_name, job_type, job_level, status,
		bytes_written, files_written, duration_seconds,
		started_at, ended_at, error_message,
		storage_name, pool_name, volume_name,
		created_at, updated_at`
)

// ErrJobNotFound is returned by GetJob when no row matches the requested ID.
var ErrJobNotFound = errors.New("job not found")

// Service handles backup job operations.
type Service struct {
	db       *database.DB
	baculaDB *database.DB // Direct connection to the Bacula database; nil until SetBaculaDatabase succeeds.
	logger   *logger.Logger
}

// NewService creates a new backup service backed by db and logging to log.
// The Bacula connection is left unset; call SetBaculaDatabase to configure it.
func NewService(db *database.DB, log *logger.Logger) *Service {
	return &Service{
		db:     db,
		logger: log,
	}
}

// SetBaculaDatabase opens a direct connection to the Bacula database.
//
// The connection reuses cfg with only the database name replaced by
// baculaDBName. On failure the service's current Bacula connection is left
// untouched and the wrapped error is returned.
func (s *Service) SetBaculaDatabase(cfg config.DatabaseConfig, baculaDBName string) error {
	baculaCfg := cfg
	baculaCfg.Database = baculaDBName

	baculaDB, err := database.NewConnection(baculaCfg)
	if err != nil {
		return fmt.Errorf("failed to connect to Bacula database: %w", err)
	}

	// NOTE(review): a previously configured connection is replaced without
	// being closed; confirm whether database.DB needs an explicit Close.
	s.baculaDB = baculaDB
	s.logger.Info("Connected to Bacula database", "database", baculaDBName, "host", cfg.Host, "port", cfg.Port)
	return nil
}

// Job represents a backup job.
type Job struct {
	ID              string     `json:"id"`
	JobID           int        `json:"job_id"`
	JobName         string     `json:"job_name"`
	ClientName      string     `json:"client_name"`
	JobType         string     `json:"job_type"`
	JobLevel        string     `json:"job_level"`
	Status          string     `json:"status"`
	BytesWritten    int64      `json:"bytes_written"`
	FilesWritten    int        `json:"files_written"`
	DurationSeconds *int       `json:"duration_seconds,omitempty"`
	StartedAt       *time.Time `json:"started_at,omitempty"`
	EndedAt         *time.Time `json:"ended_at,omitempty"`
	ErrorMessage    *string    `json:"error_message,omitempty"`
	StorageName     *string    `json:"storage_name,omitempty"`
	PoolName        *string    `json:"pool_name,omitempty"`
	VolumeName      *string    `json:"volume_name,omitempty"`
	CreatedAt       time.Time  `json:"created_at"`
	UpdatedAt       time.Time  `json:"updated_at"`
}

// ListJobsOptions represents filtering and pagination options.
type ListJobsOptions struct {
	Status     string // Filter by status: "Running", "Completed", "Failed", etc.
	JobType    string // Filter by job type: "Backup", "Restore", etc.
	ClientName string // Filter by client name (case-insensitive substring match)
	JobName    string // Filter by job name (case-insensitive substring match)
	Limit      int    // Number of results to return
	Offset     int    // Offset for pagination
}

// jobTypeFromCode maps a Bacula job type code to its display name.
// Unrecognized codes are reported as "Backup".
func jobTypeFromCode(code string) string {
	switch code {
	case "R":
		return "Restore"
	case "V":
		return "Verify"
	case "C":
		return "Copy"
	case "M":
		return "Migrate"
	default:
		return "Backup"
	}
}

// jobLevelFromCode maps a Bacula job level code to its display name.
// Unrecognized codes are reported as "Full".
func jobLevelFromCode(code string) string {
	switch code {
	case "I":
		return "Incremental"
	case "D":
		return "Differential"
	case "S":
		return "Since"
	default:
		return "Full"
	}
}

// jobStatusFromCode maps a Bacula JobStatus code to a Calypso status.
//
// Codes are case-sensitive. Per the Bacula catalog documentation, "R" is a
// running job, "T" and "W" are jobs that terminated normally (the latter with
// warnings), "E", "e" and "f" are error states, and "A" is a job canceled by
// the user. Everything else (created-but-not-started and the various
// "waiting on ..." states) is reported as "Waiting". The SQL CASE expression
// in queryBaculaDirect implements the same mapping.
func jobStatusFromCode(code string) string {
	switch code {
	case "R":
		return "Running"
	case "T", "W":
		return "Completed"
	case "E", "e", "f":
		return "Failed"
	case "A":
		return "Canceled"
	default:
		return "Waiting"
	}
}

// parseBconsoleInt parses a numeric bconsole table cell. bconsole groups
// digits with commas (e.g. "1,234,567"), so separators are removed before
// parsing. Empty cells and "-" placeholders yield an error.
func parseBconsoleInt(field string) (int64, error) {
	cleaned := strings.ReplaceAll(strings.TrimSpace(field), ",", "")
	return strconv.ParseInt(cleaned, 10, 64)
}

// runBconsole starts bconsole, feeds commands to it on standard input and
// returns its combined stdout/stderr. No shell is involved, so the result
// does not depend on which /bin/sh (and which echo built-in) is installed.
func runBconsole(ctx context.Context, commands string) ([]byte, error) {
	cmd := exec.CommandContext(ctx, "bconsole")
	cmd.Stdin = strings.NewReader(commands)
	return cmd.CombinedOutput()
}

// jobRowScanner is the part of *sql.Row and *sql.Rows that scanJobRow needs.
type jobRowScanner interface {
	Scan(dest ...interface{}) error
}

// scanJobRow decodes one backup_jobs row selected with jobColumns. Nullable
// columns become nil pointers on the returned Job when the value is NULL.
func scanJobRow(row jobRowScanner) (Job, error) {
	var (
		job             Job
		durationSeconds sql.NullInt64
		startedAt       sql.NullTime
		endedAt         sql.NullTime
		errorMessage    sql.NullString
		storageName     sql.NullString
		poolName        sql.NullString
		volumeName      sql.NullString
	)

	err := row.Scan(
		&job.ID, &job.JobID, &job.JobName, &job.ClientName,
		&job.JobType, &job.JobLevel, &job.Status,
		&job.BytesWritten, &job.FilesWritten, &durationSeconds,
		&startedAt, &endedAt, &errorMessage,
		&storageName, &poolName, &volumeName,
		&job.CreatedAt, &job.UpdatedAt,
	)
	if err != nil {
		return Job{}, err
	}

	if durationSeconds.Valid {
		dur := int(durationSeconds.Int64)
		job.DurationSeconds = &dur
	}
	if startedAt.Valid {
		job.StartedAt = &startedAt.Time
	}
	if endedAt.Valid {
		job.EndedAt = &endedAt.Time
	}
	if errorMessage.Valid {
		job.ErrorMessage = &errorMessage.String
	}
	if storageName.Valid {
		job.StorageName = &storageName.String
	}
	if poolName.Valid {
		job.PoolName = &poolName.String
	}
	if volumeName.Valid {
		job.VolumeName = &volumeName.String
	}
	return job, nil
}

// SyncJobsFromBacula syncs jobs from Bacula/Bareos to the Calypso database.
//
// It queries the Bacula database directly when a connection is configured and
// falls back to bconsole when there is no connection or the query fails. When
// the direct path is used, an error is returned if any job could not be
// upserted; the remaining jobs are still written.
func (s *Service) SyncJobsFromBacula(ctx context.Context) error {
	s.logger.Info("Starting sync from Bacula database", "bacula_db_configured", s.baculaDB != nil)

	if s.baculaDB == nil {
		s.logger.Warn("Bacula database connection not configured, trying bconsole fallback")
		return s.syncFromBconsole(ctx)
	}

	jobs, err := s.queryBaculaDatabase(ctx)
	if err != nil {
		s.logger.Warn("Failed to query Bacula database directly, trying bconsole", "error", err)
		return s.syncFromBconsole(ctx)
	}

	s.logger.Info("Queried Bacula database", "jobs_found", len(jobs))
	if len(jobs) == 0 {
		s.logger.Debug("No jobs found in Bacula database")
		return nil
	}

	successCount, errorCount := 0, 0
	for _, job := range jobs {
		if err := s.upsertJob(ctx, job); err != nil {
			s.logger.Error("Failed to upsert job", "job_id", job.JobID, "job_name", job.JobName, "error", err)
			errorCount++
			continue
		}
		successCount++
		s.logger.Debug("Upserted job", "job_id", job.JobID, "job_name", job.JobName)
	}

	s.logger.Info("Synced jobs from Bacula database", "total", len(jobs), "success", successCount, "errors", errorCount)
	if errorCount > 0 {
		return fmt.Errorf("failed to sync %d out of %d jobs", errorCount, len(jobs))
	}
	return nil
}

// queryBaculaDatabase reads jobs over the direct Bacula database connection
// (no dblink needed). It fails if that connection has not been configured.
func (s *Service) queryBaculaDatabase(ctx context.Context) ([]Job, error) {
	if s.baculaDB == nil {
		return nil, fmt.Errorf("Bacula database connection not configured")
	}
	return s.queryBaculaDirect(ctx)
}

// queryBaculaDirect reads the 1000 most recently started rows of the Bacula
// Job table, joined to Client for the client name, and converts Bacula's
// single-letter type/level/status codes into Calypso's display values.
//
// The status CASE expression mirrors jobStatusFromCode; keep the two in sync.
// Rows that fail to scan are logged and skipped. An empty result is returned
// as an empty list, not an error.
func (s *Service) queryBaculaDirect(ctx context.Context) ([]Job, error) {
	query := `
		SELECT
			j.JobId as job_id,
			j.Name as job_name,
			COALESCE(c.Name, 'unknown') as client_name,
			CASE
				WHEN j.Type = 'B' THEN 'Backup'
				WHEN j.Type = 'R' THEN 'Restore'
				WHEN j.Type = 'V' THEN 'Verify'
				WHEN j.Type = 'C' THEN 'Copy'
				WHEN j.Type = 'M' THEN 'Migrate'
				ELSE 'Backup'
			END as job_type,
			CASE
				WHEN j.Level = 'F' THEN 'Full'
				WHEN j.Level = 'I' THEN 'Incremental'
				WHEN j.Level = 'D' THEN 'Differential'
				WHEN j.Level = 'S' THEN 'Since'
				ELSE 'Full'
			END as job_level,
			CASE
				WHEN j.JobStatus = 'R' THEN 'Running'
				WHEN j.JobStatus IN ('T', 'W') THEN 'Completed'
				WHEN j.JobStatus IN ('E', 'e', 'f') THEN 'Failed'
				WHEN j.JobStatus = 'A' THEN 'Canceled'
				ELSE 'Waiting'
			END as status,
			COALESCE(j.JobBytes, 0) as bytes_written,
			COALESCE(j.JobFiles, 0) as files_written,
			j.StartTime as started_at,
			j.EndTime as ended_at
		FROM Job j
		LEFT JOIN Client c ON j.ClientId = c.ClientId
		ORDER BY j.StartTime DESC
		LIMIT 1000
	`

	rows, err := s.baculaDB.QueryContext(ctx, query)
	if err != nil {
		return nil, fmt.Errorf("failed to query Bacula Job table: %w", err)
	}
	defer rows.Close()

	var jobs []Job
	for rows.Next() {
		var (
			job       Job
			startedAt sql.NullTime
			endedAt   sql.NullTime
		)
		err := rows.Scan(
			&job.JobID, &job.JobName, &job.ClientName,
			&job.JobType, &job.JobLevel, &job.Status,
			&job.BytesWritten, &job.FilesWritten, &startedAt, &endedAt,
		)
		if err != nil {
			s.logger.Error("Failed to scan Bacula job", "error", err)
			continue
		}

		if startedAt.Valid {
			job.StartedAt = &startedAt.Time
		}
		if endedAt.Valid {
			job.EndedAt = &endedAt.Time
			// A duration can only be derived when both timestamps are known.
			if job.StartedAt != nil {
				duration := int(endedAt.Time.Sub(*job.StartedAt).Seconds())
				job.DurationSeconds = &duration
			}
		}
		jobs = append(jobs, job)
	}
	if err := rows.Err(); err != nil {
		return nil, err
	}

	if len(jobs) > 0 {
		s.logger.Info("Successfully queried Bacula database (direct)", "count", len(jobs))
	}
	return jobs, nil
}

// syncFromBconsole syncs jobs by running the bconsole "list jobs" command
// (fallback method).
//
// This path is deliberately best-effort: a failing or missing bconsole, empty
// output and individual upsert failures are only logged, and nil is returned
// in every case.
func (s *Service) syncFromBconsole(ctx context.Context) error {
	output, err := runBconsole(ctx, "list jobs\nquit\n")
	if err != nil {
		s.logger.Debug("Failed to execute bconsole", "error", err, "output", string(output))
		return nil // Don't fail, just return empty
	}
	if len(output) == 0 {
		s.logger.Debug("bconsole returned empty output")
		return nil
	}

	jobs := s.parseBconsoleOutput(ctx, string(output))
	if len(jobs) == 0 {
		s.logger.Debug("No jobs found in bconsole output")
		return nil
	}

	successCount := 0
	for _, job := range jobs {
		if err := s.upsertJob(ctx, job); err != nil {
			s.logger.Error("Failed to upsert job", "job_id", job.JobID, "error", err)
			continue
		}
		successCount++
	}

	s.logger.Info("Synced jobs from bconsole", "total", len(jobs), "success", successCount)
	return nil
}

// parseBconsoleOutput parses the table printed by bconsole "list jobs".
//
// Expected row layout:
//
//	| jobid | name | starttime | type | level | jobfiles | jobbytes | jobstatus |
//
// Everything before the header row is ignored, and parsing stops at the first
// line starting with "*" after the header. Rows with too few columns or an
// unparsable job ID are skipped. The client name is looked up with one extra
// bconsole call per row; when that yields nothing the job name is used.
func (s *Service) parseBconsoleOutput(ctx context.Context, output string) []Job {
	var jobs []Job
	inDataSection := false

	for _, line := range strings.Split(output, "\n") {
		line = strings.TrimSpace(line)

		// Skip blank lines and the "+-----+" table borders.
		if line == "" || strings.HasPrefix(line, "+") {
			continue
		}

		// The header row opens the data section. Match it case-insensitively
		// so that a "JobId" spelling is accepted as well.
		if strings.HasPrefix(strings.ToLower(line), "| jobid") {
			inDataSection = true
			continue
		}
		if !inDataSection {
			continue
		}

		// Only treat "*" as the footer once the table has started, so that
		// an earlier "*" line cannot abort parsing before any row is read.
		if strings.HasPrefix(line, "*") {
			break
		}
		if !strings.HasPrefix(line, "|") {
			continue
		}

		// Splitting "| a | b |" yields an empty first element, so the eight
		// columns live at indexes 1..8.
		fields := strings.Split(line, "|")
		if len(fields) < 9 {
			continue
		}
		for i := range fields {
			fields[i] = strings.TrimSpace(fields[i])
		}

		id, err := parseBconsoleInt(fields[1])
		if err != nil {
			s.logger.Warn("Failed to parse job ID", "value", fields[1], "error", err)
			continue
		}
		jobID := int(id)
		jobName := fields[2]

		// NOTE(review): time.Parse interprets the timestamp as UTC; confirm
		// whether bconsole prints director-local time instead.
		var startedAt *time.Time
		if ts := fields[3]; ts != "" && ts != "-" {
			if parsed, err := time.Parse(bconsoleTimeLayout, ts); err == nil {
				startedAt = &parsed
			}
		}

		// Unparsable counters ("", "-", garbage) are recorded as zero.
		filesWritten := 0
		if n, err := parseBconsoleInt(fields[6]); err == nil {
			filesWritten = int(n)
		}
		var bytesWritten int64
		if n, err := parseBconsoleInt(fields[7]); err == nil {
			bytesWritten = n
		}

		clientName := "unknown"
		if jobName != "" {
			clientName = s.getClientNameFromJob(ctx, jobID)
			if clientName == "" {
				// Fallback: use job name as client name.
				clientName = jobName
			}
		}

		jobs = append(jobs, Job{
			JobID:        jobID,
			JobName:      jobName,
			ClientName:   clientName,
			JobType:      jobTypeFromCode(fields[4]),
			JobLevel:     jobLevelFromCode(fields[5]),
			Status:       jobStatusFromCode(fields[8]),
			BytesWritten: bytesWritten,
			FilesWritten: filesWritten,
			StartedAt:    startedAt,
		})
	}

	return jobs
}

// getClientNameFromJob asks bconsole for the details of jobID and returns the
// value of the first output line that starts with "Client:". It returns ""
// when bconsole fails or no such line is present.
func (s *Service) getClientNameFromJob(ctx context.Context, jobID int) string {
	output, err := runBconsole(ctx, fmt.Sprintf("list job jobid=%d\nquit\n", jobID))
	if err != nil {
		s.logger.Debug("Failed to get job details", "job_id", jobID, "error", err)
		return ""
	}

	for _, line := range strings.Split(string(output), "\n") {
		line = strings.TrimSpace(line)
		if strings.HasPrefix(line, "Client:") {
			// Keep everything after the label so values that themselves
			// contain a colon are not truncated.
			return strings.TrimSpace(strings.TrimPrefix(line, "Client:"))
		}
	}
	return ""
}

// upsertJob inserts job into backup_jobs, or updates the existing row with
// the same job_id. An empty client name is stored as "unknown".
func (s *Service) upsertJob(ctx context.Context, job Job) error {
	query := `
		INSERT INTO backup_jobs (
			job_id, job_name, client_name, job_type, job_level, status,
			bytes_written, files_written, started_at, ended_at, duration_seconds,
			updated_at
		) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, NOW())
		ON CONFLICT (job_id) DO UPDATE SET
			job_name = EXCLUDED.job_name,
			client_name = EXCLUDED.client_name,
			job_type = EXCLUDED.job_type,
			job_level = EXCLUDED.job_level,
			status = EXCLUDED.status,
			bytes_written = EXCLUDED.bytes_written,
			files_written = EXCLUDED.files_written,
			started_at = EXCLUDED.started_at,
			ended_at = EXCLUDED.ended_at,
			duration_seconds = EXCLUDED.duration_seconds,
			updated_at = NOW()
	`

	clientName := job.ClientName
	if clientName == "" {
		clientName = "unknown"
	}

	result, err := s.db.ExecContext(ctx, query,
		job.JobID, job.JobName, clientName, job.JobType, job.JobLevel, job.Status,
		job.BytesWritten, job.FilesWritten, job.StartedAt, job.EndedAt, job.DurationSeconds,
	)
	if err != nil {
		// The caller logs the failure; wrap instead of logging it twice.
		return fmt.Errorf("upserting job %d: %w", job.JobID, err)
	}

	// The row count only feeds the debug log below, so a driver that cannot
	// report it is not treated as a failure.
	rowsAffected, _ := result.RowsAffected()
	s.logger.Debug("Upserted job to database", "job_id", job.JobID, "rows_affected", rowsAffected)
	return nil
}

// ListJobs lists backup jobs with optional filters.
//
// It first tries to sync from Bacula; a sync failure is logged and the call
// continues with whatever is already in the database. It returns one page of
// jobs (Limit defaults to 50 and is capped at 100; a negative Offset is
// treated as 0) together with the total number of rows matching the filters.
func (s *Service) ListJobs(ctx context.Context, opts ListJobsOptions) ([]Job, int, error) {
	s.logger.Info("ListJobs called, syncing from Bacula first")
	if err := s.SyncJobsFromBacula(ctx); err != nil {
		s.logger.Warn("Failed to sync jobs from Bacula, using database only", "error", err)
	} else {
		s.logger.Info("Successfully synced jobs from Bacula")
	}

	// Build the WHERE clause. "1=1" keeps the clause valid without filters.
	conditions := []string{"1=1"}
	var args []interface{}
	addFilter := func(format string, value interface{}) {
		args = append(args, value)
		conditions = append(conditions, fmt.Sprintf(format, len(args)))
	}
	if opts.Status != "" {
		addFilter("status = $%d", opts.Status)
	}
	if opts.JobType != "" {
		addFilter("job_type = $%d", opts.JobType)
	}
	if opts.ClientName != "" {
		addFilter("client_name ILIKE $%d", "%"+opts.ClientName+"%")
	}
	if opts.JobName != "" {
		addFilter("job_name ILIKE $%d", "%"+opts.JobName+"%")
	}
	whereClause := "WHERE " + strings.Join(conditions, " AND ")

	var totalCount int
	countQuery := fmt.Sprintf("SELECT COUNT(*) FROM backup_jobs %s", whereClause)
	if err := s.db.QueryRowContext(ctx, countQuery, args...).Scan(&totalCount); err != nil {
		return nil, 0, fmt.Errorf("failed to count jobs: %w", err)
	}

	limit := opts.Limit
	if limit <= 0 {
		limit = defaultListLimit
	}
	if limit > maxListLimit {
		limit = maxListLimit
	}
	offset := opts.Offset
	if offset < 0 {
		offset = 0
	}

	query := "SELECT " + jobColumns + " FROM backup_jobs " + whereClause +
		" ORDER BY started_at DESC NULLS LAST, created_at DESC" +
		fmt.Sprintf(" LIMIT $%d OFFSET $%d", len(args)+1, len(args)+2)
	args = append(args, limit, offset)

	rows, err := s.db.QueryContext(ctx, query, args...)
	if err != nil {
		return nil, 0, fmt.Errorf("failed to query jobs: %w", err)
	}
	defer rows.Close()

	var jobs []Job
	for rows.Next() {
		job, err := scanJobRow(rows)
		if err != nil {
			s.logger.Error("Failed to scan job", "error", err)
			continue
		}
		jobs = append(jobs, job)
	}
	if err := rows.Err(); err != nil {
		return nil, 0, fmt.Errorf("failed to iterate jobs: %w", err)
	}
	return jobs, totalCount, nil
}

// GetJob retrieves a job by its Calypso row ID. It returns ErrJobNotFound
// when no row matches.
func (s *Service) GetJob(ctx context.Context, id string) (*Job, error) {
	query := "SELECT " + jobColumns + " FROM backup_jobs WHERE id = $1"

	job, err := scanJobRow(s.db.QueryRowContext(ctx, query, id))
	if err != nil {
		if errors.Is(err, sql.ErrNoRows) {
			return nil, ErrJobNotFound
		}
		return nil, fmt.Errorf("failed to get job: %w", err)
	}
	return &job, nil
}

// CreateJobRequest represents a request to create a new backup job.
type CreateJobRequest struct {
	JobName     string  `json:"job_name" binding:"required"`
	ClientName  string  `json:"client_name" binding:"required"`
	JobType     string  `json:"job_type" binding:"required"`  // 'Backup', 'Restore', 'Verify', 'Copy', 'Migrate'
	JobLevel    string  `json:"job_level" binding:"required"` // 'Full', 'Incremental', 'Differential', 'Since'
	StorageName *string `json:"storage_name,omitempty"`
	PoolName    *string `json:"pool_name,omitempty"`
}

// CreateJob creates a new backup job row in status "Waiting" with zeroed
// counters and started_at set to the database's current time, and returns
// the stored row.
func (s *Service) CreateJob(ctx context.Context, req CreateJobRequest) (*Job, error) {
	// Generate a job ID as MAX(job_id)+1 (in a real implementation this would
	// come from Bareos).
	// NOTE(review): this read-then-insert is not atomic, so concurrent calls
	// can pick the same ID, and the ID may later collide with a job_id synced
	// from Bacula, whose upsert would then overwrite this row.
	var jobID int
	err := s.db.QueryRowContext(ctx, `
		SELECT COALESCE(MAX(job_id), 0) + 1
		FROM backup_jobs
	`).Scan(&jobID)
	if err != nil {
		return nil, fmt.Errorf("failed to generate job ID: %w", err)
	}

	query := `
		INSERT INTO backup_jobs (
			job_id, job_name, client_name, job_type, job_level,
			status, bytes_written, files_written,
			storage_name, pool_name, started_at
		) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, NOW())
		RETURNING ` + jobColumns

	job, err := scanJobRow(s.db.QueryRowContext(ctx, query,
		jobID, req.JobName, req.ClientName, req.JobType, req.JobLevel,
		"Waiting", 0, 0,
		req.StorageName, req.PoolName,
	))
	if err != nil {
		return nil, fmt.Errorf("failed to create job: %w", err)
	}

	s.logger.Info("Backup job created",
		"job_id", job.JobID,
		"job_name", job.JobName,
		"client_name", job.ClientName,
		"job_type", job.JobType,
	)
	return &job, nil
}