Files
calypso/backend/internal/backup/service.go
2025-12-30 23:31:07 +07:00

1118 lines
32 KiB
Go

package backup
import (
	"context"
	"database/sql"
	"errors"
	"fmt"
	"os/exec"
	"strconv"
	"strings"
	"time"

	"github.com/atlasos/calypso/internal/common/config"
	"github.com/atlasos/calypso/internal/common/database"
	"github.com/atlasos/calypso/internal/common/logger"
)
// Service handles backup job operations.
//
// It holds two database handles: db is the Calypso application database
// (where synced jobs live in backup_jobs), and baculaDB is an optional
// direct connection to the Bacula catalog database set up via
// SetBaculaDatabase. When baculaDB is nil, code paths fall back to
// shelling out to bconsole.
type Service struct {
db *database.DB
baculaDB *database.DB // Direct connection to Bacula database
logger *logger.Logger
}
// NewService creates a new backup service bound to the given Calypso
// database connection and logger. The direct Bacula catalog connection
// is not established here; call SetBaculaDatabase to configure it.
func NewService(db *database.DB, log *logger.Logger) *Service {
	svc := &Service{
		db:     db,
		logger: log,
	}
	return svc
}
// SetBaculaDatabase sets up direct connection to Bacula database.
// It copies the supplied config, overrides only the database name, and
// opens a new connection; on success the handle is stored on the
// service for later catalog queries.
func (s *Service) SetBaculaDatabase(cfg config.DatabaseConfig, baculaDBName string) error {
	// Reuse host/port/credentials from the Calypso config; only the
	// database name differs.
	baculaCfg := cfg
	baculaCfg.Database = baculaDBName

	conn, err := database.NewConnection(baculaCfg)
	if err != nil {
		return fmt.Errorf("failed to connect to Bacula database: %w", err)
	}

	s.baculaDB = conn
	s.logger.Info("Connected to Bacula database", "database", baculaDBName, "host", cfg.Host, "port", cfg.Port)
	return nil
}
// Job represents a backup job as stored in the Calypso backup_jobs table.
// Pointer fields map to nullable columns and are omitted from JSON when
// unset. JobID is the numeric Bacula JobId; ID is the Calypso row ID.
type Job struct {
ID string `json:"id"`
JobID int `json:"job_id"`
JobName string `json:"job_name"`
ClientName string `json:"client_name"`
JobType string `json:"job_type"` // "Backup", "Restore", "Verify", "Copy", "Migrate"
JobLevel string `json:"job_level"` // "Full", "Incremental", "Differential", "Since"
Status string `json:"status"` // "Running", "Completed", "Failed", "Canceled", "Waiting"
BytesWritten int64 `json:"bytes_written"`
FilesWritten int `json:"files_written"`
DurationSeconds *int `json:"duration_seconds,omitempty"` // derived from started_at/ended_at when both are set
StartedAt *time.Time `json:"started_at,omitempty"`
EndedAt *time.Time `json:"ended_at,omitempty"`
ErrorMessage *string `json:"error_message,omitempty"`
StorageName *string `json:"storage_name,omitempty"`
PoolName *string `json:"pool_name,omitempty"`
VolumeName *string `json:"volume_name,omitempty"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}
// ListJobsOptions represents filtering and pagination options
// accepted by ListJobs. Status and JobType are exact matches;
// ClientName and JobName are case-insensitive substring matches (ILIKE).
// A Limit <= 0 is replaced with a default and values above the cap are
// clamped by ListJobs.
type ListJobsOptions struct {
Status string // Filter by status: "Running", "Completed", "Failed", etc.
JobType string // Filter by job type: "Backup", "Restore", etc.
ClientName string // Filter by client name
JobName string // Filter by job name
Limit int // Number of results to return
Offset int // Offset for pagination
}
// Client represents a backup client, sourced either from the Bacula
// catalog Client table (all fields populated) or from bconsole output
// (only ClientID=0, Name, and Enabled=true are available).
type Client struct {
ClientID int `json:"client_id"`
Name string `json:"name"`
Uname *string `json:"uname,omitempty"` // client daemon's reported uname string, when known
Enabled bool `json:"enabled"`
AutoPrune *bool `json:"auto_prune,omitempty"`
FileRetention *int64 `json:"file_retention,omitempty"` // retention period from the catalog
JobRetention *int64 `json:"job_retention,omitempty"` // retention period from the catalog
LastBackupAt *time.Time `json:"last_backup_at,omitempty"` // MAX(Job.StartTime) for this client
TotalJobs *int `json:"total_jobs,omitempty"` // COUNT(DISTINCT JobId) for this client
TotalBytes *int64 `json:"total_bytes,omitempty"` // SUM(JobBytes) for this client
Status *string `json:"status,omitempty"` // "online" or "offline"
}
// ListClientsOptions represents filtering options for clients.
// Used by ListClients; Search is a case-insensitive substring match on
// the client name.
type ListClientsOptions struct {
Enabled *bool // Filter by enabled status (nil = all)
Search string // Search by client name
}
// SyncJobsFromBacula syncs jobs from Bacula/Bareos to the database.
// The direct catalog connection is preferred; when it is absent or the
// query fails, the bconsole fallback is used instead. Each fetched job
// is upserted into the Calypso backup_jobs table; an error is returned
// only when at least one upsert fails.
func (s *Service) SyncJobsFromBacula(ctx context.Context) error {
	s.logger.Info("Starting sync from Bacula database", "bacula_db_configured", s.baculaDB != nil)

	// No direct catalog connection -> bconsole fallback.
	if s.baculaDB == nil {
		s.logger.Warn("Bacula database connection not configured, trying bconsole fallback")
		return s.syncFromBconsole(ctx)
	}

	jobs, err := s.queryBaculaDatabase(ctx)
	if err != nil {
		s.logger.Warn("Failed to query Bacula database directly, trying bconsole", "error", err)
		return s.syncFromBconsole(ctx)
	}
	s.logger.Info("Queried Bacula database", "jobs_found", len(jobs))

	if len(jobs) == 0 {
		s.logger.Debug("No jobs found in Bacula database")
		return nil
	}

	// Upsert every job, counting outcomes so partial failures are visible.
	var okCount, failCount int
	for _, j := range jobs {
		if err := s.upsertJob(ctx, j); err != nil {
			s.logger.Error("Failed to upsert job", "job_id", j.JobID, "job_name", j.JobName, "error", err)
			failCount++
			continue
		}
		okCount++
		s.logger.Debug("Upserted job", "job_id", j.JobID, "job_name", j.JobName)
	}

	s.logger.Info("Synced jobs from Bacula database", "total", len(jobs), "success", okCount, "errors", failCount)
	if failCount > 0 {
		return fmt.Errorf("failed to sync %d out of %d jobs", failCount, len(jobs))
	}
	return nil
}
// queryBaculaDatabase queries Bacula database directly.
// It requires the connection set by SetBaculaDatabase (no dblink is
// involved) and delegates the actual query to queryBaculaDirect.
func (s *Service) queryBaculaDatabase(ctx context.Context) ([]Job, error) {
	if s.baculaDB == nil {
		return nil, fmt.Errorf("Bacula database connection not configured")
	}
	return s.queryBaculaDirect(ctx)
}
// queryBaculaDirect queries the Bacula catalog Job table directly
// (Bacularis-style), joining Client for the client name and mapping
// Bacula's single-character Type/Level/JobStatus codes onto Calypso's
// vocabulary. Returns at most the 1000 most recent jobs; an empty
// catalog yields an empty slice, not an error.
//
// Bug fix: the previous JobStatus CASE mapped 'T' to Running and 'C' to
// Completed. Per the Bacula documentation, 'T' means "terminated
// normally" (Completed), 'R' is Running, 'C' is "created but not yet
// running", 'W' is "terminated with warnings" (treated as Completed
// here), and 'E'/'e'/'f' are error terminations.
func (s *Service) queryBaculaDirect(ctx context.Context) ([]Job, error) {
	query := `
SELECT
	j.JobId as job_id,
	j.Name as job_name,
	COALESCE(c.Name, 'unknown') as client_name,
	CASE
		WHEN j.Type = 'B' THEN 'Backup'
		WHEN j.Type = 'R' THEN 'Restore'
		WHEN j.Type = 'V' THEN 'Verify'
		WHEN j.Type = 'C' THEN 'Copy'
		WHEN j.Type = 'M' THEN 'Migrate'
		ELSE 'Backup'
	END as job_type,
	CASE
		WHEN j.Level = 'F' THEN 'Full'
		WHEN j.Level = 'I' THEN 'Incremental'
		WHEN j.Level = 'D' THEN 'Differential'
		WHEN j.Level = 'S' THEN 'Since'
		ELSE 'Full'
	END as job_level,
	CASE
		WHEN j.JobStatus = 'R' THEN 'Running'
		WHEN j.JobStatus IN ('T', 'W') THEN 'Completed'
		WHEN j.JobStatus IN ('f', 'E', 'e') THEN 'Failed'
		WHEN j.JobStatus = 'A' THEN 'Canceled'
		WHEN j.JobStatus = 'C' THEN 'Waiting'
		ELSE 'Waiting'
	END as status,
	COALESCE(j.JobBytes, 0) as bytes_written,
	COALESCE(j.JobFiles, 0) as files_written,
	j.StartTime as started_at,
	j.EndTime as ended_at
FROM Job j
LEFT JOIN Client c ON j.ClientId = c.ClientId
ORDER BY j.StartTime DESC
LIMIT 1000
`
	rows, err := s.baculaDB.QueryContext(ctx, query)
	if err != nil {
		return nil, fmt.Errorf("failed to query Bacula Job table: %w", err)
	}
	defer rows.Close()

	var jobs []Job
	for rows.Next() {
		var job Job
		var startedAt, endedAt sql.NullTime
		if err := rows.Scan(
			&job.JobID, &job.JobName, &job.ClientName,
			&job.JobType, &job.JobLevel, &job.Status,
			&job.BytesWritten, &job.FilesWritten, &startedAt, &endedAt,
		); err != nil {
			// Skip unparseable rows rather than aborting the whole sync.
			s.logger.Error("Failed to scan Bacula job", "error", err)
			continue
		}
		if startedAt.Valid {
			job.StartedAt = &startedAt.Time
		}
		if endedAt.Valid {
			job.EndedAt = &endedAt.Time
			// Derive duration only when both endpoints are known.
			if job.StartedAt != nil {
				duration := int(endedAt.Time.Sub(*job.StartedAt).Seconds())
				job.DurationSeconds = &duration
			}
		}
		jobs = append(jobs, job)
	}
	if err := rows.Err(); err != nil {
		return nil, err
	}
	if len(jobs) > 0 {
		s.logger.Info("Successfully queried Bacula database (direct)", "count", len(jobs))
	}
	return jobs, nil // empty (not an error) when the catalog has no jobs
}
// syncFromBconsole syncs jobs using bconsole command (fallback method).
// Failures are deliberately swallowed (logged, never returned) so the
// API keeps working even when bconsole is unavailable.
func (s *Service) syncFromBconsole(ctx context.Context) error {
	// Use the shared printf-based helper instead of spawning
	// "echo -e 'list jobs\nquit' | bconsole": echo -e is not portable
	// across /bin/sh implementations (e.g. dash prints "-e" literally,
	// corrupting the piped commands). ExecuteBconsoleCommand also
	// appends the trailing "quit" for us.
	output, err := s.ExecuteBconsoleCommand(ctx, "list jobs")
	if err != nil {
		s.logger.Debug("Failed to execute bconsole", "error", err, "output", output)
		return nil // Don't fail, just return empty
	}
	if len(output) == 0 {
		s.logger.Debug("bconsole returned empty output")
		return nil
	}

	// Parse the table output into Job structs.
	jobs := s.parseBconsoleOutput(ctx, output)
	if len(jobs) == 0 {
		s.logger.Debug("No jobs found in bconsole output")
		return nil
	}

	// Upsert jobs to database; individual failures are logged and skipped.
	successCount := 0
	for _, job := range jobs {
		if err := s.upsertJob(ctx, job); err != nil {
			s.logger.Error("Failed to upsert job", "job_id", job.JobID, "error", err)
			continue
		}
		successCount++
	}
	s.logger.Info("Synced jobs from bconsole", "total", len(jobs), "success", successCount)
	return nil
}
// parseBconsoleOutput parses bconsole "list jobs" output.
// Expected data-row format:
//
//	| jobid | name | starttime | type | level | jobfiles | jobbytes | jobstatus |
//
// Rows before the "| jobid" header are ignored; parsing stops at the
// bconsole prompt ("*"). Unparseable rows are skipped with a warning.
//
// Bug fix: the JobStatus switch previously lowercased the status char
// and then matched uppercase cases ("A", "W", "T", "C", "F"), making
// those cases unreachable; it also mapped 'T' to Running. Bacula status
// characters are case-sensitive ('f' fatal error vs 'F' waiting on FD),
// so they are matched exactly here: R=Running, T=terminated normally,
// W=terminated with warnings, E/e/f=error states, A=canceled,
// C=created but not yet running.
func (s *Service) parseBconsoleOutput(ctx context.Context, output string) []Job {
	var jobs []Job
	inDataSection := false
	for _, line := range strings.Split(output, "\n") {
		line = strings.TrimSpace(line)
		// Skip empty lines and "+---+" table separators.
		if line == "" || strings.HasPrefix(line, "+") {
			continue
		}
		// The header row marks the start of the data section.
		if strings.HasPrefix(line, "| jobid") {
			inDataSection = true
			continue
		}
		// The bconsole prompt marks the end of the listing.
		if strings.HasPrefix(line, "*") {
			break
		}
		if !inDataSection || !strings.HasPrefix(line, "|") {
			continue
		}

		parts := strings.Split(line, "|")
		if len(parts) < 9 {
			continue
		}
		// parts[0] is the empty string before the leading "|".
		jobIDStr := strings.TrimSpace(parts[1])
		jobName := strings.TrimSpace(parts[2])
		startTimeStr := strings.TrimSpace(parts[3])
		jobTypeChar := strings.TrimSpace(parts[4])
		jobLevelChar := strings.TrimSpace(parts[5])
		jobFilesStr := strings.TrimSpace(parts[6])
		jobBytesStr := strings.TrimSpace(parts[7])
		jobStatusChar := strings.TrimSpace(parts[8])

		jobID, err := strconv.Atoi(jobIDStr)
		if err != nil {
			s.logger.Warn("Failed to parse job ID", "value", jobIDStr, "error", err)
			continue
		}

		// Start time, e.g. "2025-12-27 23:05:02"; "-" means not started.
		var startedAt *time.Time
		if startTimeStr != "" && startTimeStr != "-" {
			if parsedTime, err := time.Parse("2006-01-02 15:04:05", startTimeStr); err == nil {
				startedAt = &parsedTime
			}
		}

		// Map Bacula job type codes; unknown codes default to Backup.
		jobType := "Backup"
		switch jobTypeChar {
		case "B":
			jobType = "Backup"
		case "R":
			jobType = "Restore"
		case "V":
			jobType = "Verify"
		case "C":
			jobType = "Copy"
		case "M":
			jobType = "Migrate"
		}

		// Map Bacula job level codes; unknown codes default to Full.
		jobLevel := "Full"
		switch jobLevelChar {
		case "F":
			jobLevel = "Full"
		case "I":
			jobLevel = "Incremental"
		case "D":
			jobLevel = "Differential"
		case "S":
			jobLevel = "Since"
		}

		// Files/bytes; "-" means not reported.
		filesWritten := 0
		if jobFilesStr != "" && jobFilesStr != "-" {
			if f, err := strconv.Atoi(jobFilesStr); err == nil {
				filesWritten = f
			}
		}
		bytesWritten := int64(0)
		if jobBytesStr != "" && jobBytesStr != "-" {
			if b, err := strconv.ParseInt(jobBytesStr, 10, 64); err == nil {
				bytesWritten = b
			}
		}

		// Map Bacula JobStatus codes (case-sensitive, see doc comment).
		status := "Waiting"
		switch jobStatusChar {
		case "R":
			status = "Running"
		case "T", "W":
			status = "Completed"
		case "f", "E", "e":
			status = "Failed"
		case "A":
			status = "Canceled"
		case "C":
			status = "Waiting"
		}

		// Resolve the client name via job details; fall back to the job
		// name when bconsole does not report one.
		clientName := "unknown"
		if jobName != "" {
			if fromJob := s.getClientNameFromJob(ctx, jobID); fromJob != "" {
				clientName = fromJob
			} else {
				clientName = jobName
			}
		}

		jobs = append(jobs, Job{
			JobID:        jobID,
			JobName:      jobName,
			ClientName:   clientName,
			JobType:      jobType,
			JobLevel:     jobLevel,
			Status:       status,
			BytesWritten: bytesWritten,
			FilesWritten: filesWritten,
			StartedAt:    startedAt,
		})
	}
	return jobs
}
// getClientNameFromJob gets client name from job details using bconsole.
// It runs "list job jobid=N" and scans the output for a "Client:" line.
// Returns "" when the lookup fails or no client line is found.
func (s *Service) getClientNameFromJob(ctx context.Context, jobID int) string {
	// Use the shared printf-based helper rather than "echo -e", which is
	// not portable across /bin/sh implementations (dash prints "-e"
	// literally). The helper also appends the trailing "quit".
	output, err := s.ExecuteBconsoleCommand(ctx, fmt.Sprintf("list job jobid=%d", jobID))
	if err != nil {
		s.logger.Debug("Failed to get job details", "job_id", jobID, "error", err)
		return ""
	}
	for _, line := range strings.Split(output, "\n") {
		line = strings.TrimSpace(line)
		if strings.HasPrefix(line, "Client:") {
			// TrimPrefix keeps any colons inside the value intact,
			// unlike the previous Split-on-":" + parts[1], which would
			// truncate a value containing ':'.
			return strings.TrimSpace(strings.TrimPrefix(line, "Client:"))
		}
	}
	return ""
}
// ExecuteBconsoleCommand executes a bconsole command and returns the output.
// The input is trimmed, any trailing "quit" is stripped (one is always
// appended), and the command is piped to bconsole via printf, which is
// portable where "echo -e" is not.
//
// Bug fix: the previous implementation lowercased the ENTIRE command
// while stripping a trailing "quit", corrupting case-sensitive arguments
// such as job or client names. The suffix is now matched
// case-insensitively without modifying the rest of the command.
func (s *Service) ExecuteBconsoleCommand(ctx context.Context, command string) (string, error) {
	command = strings.TrimSpace(command)
	if command == "" {
		return "", fmt.Errorf("command cannot be empty")
	}
	// Strip a trailing "quit" (any case) without touching the body of
	// the command; we always append our own below.
	if strings.HasSuffix(strings.ToLower(command), "quit") {
		command = strings.TrimSpace(command[:len(command)-len("quit")])
	}
	// Ensure the session terminates.
	commandWithQuit := command + "\nquit"
	// Escape single quotes so the text survives the single-quoted sh
	// argument: ' becomes '"'"'.
	escapedCommand := strings.ReplaceAll(commandWithQuit, "'", "'\"'\"'")
	// printf avoids the non-portable "echo -e".
	cmd := exec.CommandContext(ctx, "sh", "-c", fmt.Sprintf("printf '%%s\\n' '%s' | bconsole", escapedCommand))
	output, err := cmd.CombinedOutput()
	if err != nil {
		// bconsole may return non-zero exit code even on success; if it
		// produced output, treat the call as successful.
		outputStr := string(output)
		if len(outputStr) > 0 {
			return outputStr, nil
		}
		return outputStr, fmt.Errorf("bconsole error: %w", err)
	}
	return string(output), nil
}
// ListClients lists all backup clients from Bacula database Client table.
// The direct catalog connection is preferred when configured; bconsole
// is used as a fallback when the connection is absent or the query fails.
func (s *Service) ListClients(ctx context.Context, opts ListClientsOptions) ([]Client, error) {
	if s.baculaDB != nil {
		clients, dbErr := s.queryClientsFromDatabase(ctx, opts)
		if dbErr == nil {
			s.logger.Debug("Queried clients from Bacula database", "count", len(clients))
			return clients, nil
		}
		s.logger.Warn("Failed to query clients from database, trying bconsole fallback", "error", dbErr)
	}
	s.logger.Info("Using bconsole fallback for list clients")
	return s.queryClientsFromBconsole(ctx, opts)
}
// queryClientsFromDatabase queries clients from Bacula database.
// It aggregates each client's job history (last backup time, job count,
// total bytes) via a LEFT JOIN on Job, applies the optional
// Enabled/Search filters, and derives an "online"/"offline" status from
// the enabled flag and backup recency.
func (s *Service) queryClientsFromDatabase(ctx context.Context, opts ListClientsOptions) ([]Client, error) {
// First, try a simple query to check if Client table exists and has data
simpleQuery := `SELECT COUNT(*) FROM Client`
var count int
err := s.baculaDB.QueryRowContext(ctx, simpleQuery).Scan(&count)
if err != nil {
s.logger.Warn("Failed to count clients from Client table", "error", err)
return nil, fmt.Errorf("failed to query Client table: %w", err)
}
s.logger.Debug("Total clients in database", "count", count)
if count == 0 {
s.logger.Info("No clients found in Bacula database")
return []Client{}, nil
}
// Build query with filters
// NOTE(review): enabled is hard-coded to true below — presumably the
// Bacula Client table carries no enabled column; confirm against the
// catalog schema.
query := `
SELECT
c.ClientId,
c.Name,
c.Uname,
true as enabled,
c.AutoPrune,
c.FileRetention,
c.JobRetention,
MAX(j.StartTime) as last_backup_at,
COUNT(DISTINCT j.JobId) as total_jobs,
COALESCE(SUM(j.JobBytes), 0) as total_bytes
FROM Client c
LEFT JOIN Job j ON c.ClientId = j.ClientId
WHERE 1=1
`
args := []interface{}{}
argIndex := 1
// Because enabled is the constant true, this clause compares that
// constant against the requested filter value.
if opts.Enabled != nil {
query += fmt.Sprintf(" AND true = $%d", argIndex)
args = append(args, *opts.Enabled)
argIndex++
}
// Case-insensitive substring match on the client name.
if opts.Search != "" {
query += fmt.Sprintf(" AND c.Name ILIKE $%d", argIndex)
args = append(args, "%"+opts.Search+"%")
argIndex++
}
query += " GROUP BY c.ClientId, c.Name, c.Uname, c.AutoPrune, c.FileRetention, c.JobRetention"
query += " ORDER BY c.Name"
s.logger.Debug("Executing clients query", "query", query, "args", args)
rows, err := s.baculaDB.QueryContext(ctx, query, args...)
if err != nil {
s.logger.Error("Failed to execute clients query", "error", err, "query", query)
return nil, fmt.Errorf("failed to query clients from Bacula database: %w", err)
}
defer rows.Close()
var clients []Client
for rows.Next() {
var client Client
// Nullable columns are scanned into sql.Null* and copied into the
// Client's optional pointer fields below.
var uname sql.NullString
var autoPrune sql.NullBool
var fileRetention, jobRetention sql.NullInt64
var lastBackupAt sql.NullTime
var totalJobs sql.NullInt64
var totalBytes sql.NullInt64
err := rows.Scan(
&client.ClientID,
&client.Name,
&uname,
&client.Enabled,
&autoPrune,
&fileRetention,
&jobRetention,
&lastBackupAt,
&totalJobs,
&totalBytes,
)
if err != nil {
// Skip rows that fail to scan rather than aborting the listing.
s.logger.Error("Failed to scan client row", "error", err)
continue
}
if uname.Valid {
client.Uname = &uname.String
}
if autoPrune.Valid {
client.AutoPrune = &autoPrune.Bool
}
if fileRetention.Valid {
val := fileRetention.Int64
client.FileRetention = &val
}
if jobRetention.Valid {
val := jobRetention.Int64
client.JobRetention = &val
}
if lastBackupAt.Valid {
client.LastBackupAt = &lastBackupAt.Time
}
if totalJobs.Valid {
val := int(totalJobs.Int64)
client.TotalJobs = &val
}
if totalBytes.Valid {
val := totalBytes.Int64
client.TotalBytes = &val
}
// Determine client status based on enabled and last backup
// If client is enabled and has recent backup (within 24 hours), consider it online
// Otherwise, mark as offline
if client.Enabled {
if lastBackupAt.Valid {
timeSinceLastBackup := time.Since(lastBackupAt.Time)
if timeSinceLastBackup < 24*time.Hour {
status := "online"
client.Status = &status
} else {
status := "offline"
client.Status = &status
}
} else {
// No backup yet, but enabled - assume online
status := "online"
client.Status = &status
}
} else {
status := "offline"
client.Status = &status
}
clients = append(clients, client)
}
if err := rows.Err(); err != nil {
return nil, fmt.Errorf("error iterating client rows: %w", err)
}
s.logger.Debug("Queried clients from Bacula database", "count", len(clients))
return clients, nil
}
// queryClientsFromBconsole queries clients using bconsole command
// (fallback method). Since bconsole output carries no enabled flag or
// job statistics, parsed clients only have a name and Enabled=true; the
// opts filters are applied in memory after parsing.
func (s *Service) queryClientsFromBconsole(ctx context.Context, opts ListClientsOptions) ([]Client, error) {
	s.logger.Debug("Executing bconsole list clients command")
	output, err := s.ExecuteBconsoleCommand(ctx, "list clients")
	if err != nil {
		s.logger.Error("Failed to execute bconsole list clients", "error", err)
		return nil, fmt.Errorf("failed to execute bconsole list clients: %w", err)
	}

	// Log only a short preview of the raw output.
	preview := output
	if len(preview) > 200 {
		preview = preview[:200]
	}
	s.logger.Debug("bconsole output", "output_length", len(output), "output_preview", preview)

	parsed := s.parseBconsoleClientsOutput(output)
	s.logger.Debug("Parsed clients from bconsole", "count", len(parsed))

	// Apply the Enabled and Search filters in memory.
	filtered := make([]Client, 0, len(parsed))
	for _, c := range parsed {
		if opts.Enabled != nil && c.Enabled != *opts.Enabled {
			continue
		}
		if opts.Search != "" && !strings.Contains(strings.ToLower(c.Name), strings.ToLower(opts.Search)) {
			continue
		}
		filtered = append(filtered, c)
	}
	return filtered, nil
}
// parseBconsoleClientsOutput parses bconsole "list clients" output.
// It skips connection chatter and table decoration, then pulls client
// names either from "|"-delimited table rows (second column) or, as a
// fallback, from the first whitespace-separated field of plain lines.
func (s *Service) parseBconsoleClientsOutput(output string) []Client {
	var clients []Client

	// Lines containing any of these markers are bconsole chatter, not data.
	noiseMarkers := []string{
		"Connecting to Director",
		"Enter a period",
		"list clients",
		"quit",
		"You have messages",
		"Automatically selected",
		"Using Catalog",
	}
	isNoise := func(line string) bool {
		for _, marker := range noiseMarkers {
			if strings.Contains(line, marker) {
				return true
			}
		}
		return false
	}

	// addClient validates a raw name cell and appends it as a Client.
	addClient := func(raw string) {
		name := strings.Trim(raw, "\"'")
		if name == "" || name == "Name" || strings.HasPrefix(name, "-") {
			return
		}
		clients = append(clients, Client{
			ClientID: 0,
			Name:     name,
			Enabled:  true,
		})
	}

	inTable := false
	headerSeen := false
	for _, raw := range strings.Split(output, "\n") {
		line := strings.TrimSpace(raw)
		if isNoise(line) {
			continue
		}
		// The header row ("... Name ...") marks the start of the table.
		if !headerSeen && strings.Contains(line, "Name") {
			headerSeen = true
			inTable = true
			continue
		}
		// Skip "+---+" separators and blank lines.
		if strings.HasPrefix(line, "+") && strings.Contains(line, "-") {
			continue
		}
		if line == "" {
			continue
		}
		if !inTable {
			continue
		}
		if strings.Contains(line, "|") {
			// Table row: | clientname | address | ...
			cols := strings.Split(line, "|")
			if len(cols) >= 2 {
				addClient(strings.TrimSpace(cols[1]))
			}
		} else if fields := strings.Fields(line); len(fields) > 0 {
			// Fallback format: bare name in the first field.
			addClient(fields[0])
		}
	}
	return clients
}
// upsertJob inserts or updates a job in the database.
// The Bacula job_id is the conflict key; on conflict every mutable
// column is refreshed so repeated syncs converge on the latest state.
func (s *Service) upsertJob(ctx context.Context, job Job) error {
	const query = `
INSERT INTO backup_jobs (
job_id, job_name, client_name, job_type, job_level, status,
bytes_written, files_written, started_at, ended_at, duration_seconds, updated_at
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, NOW())
ON CONFLICT (job_id) DO UPDATE SET
job_name = EXCLUDED.job_name,
client_name = EXCLUDED.client_name,
job_type = EXCLUDED.job_type,
job_level = EXCLUDED.job_level,
status = EXCLUDED.status,
bytes_written = EXCLUDED.bytes_written,
files_written = EXCLUDED.files_written,
started_at = EXCLUDED.started_at,
ended_at = EXCLUDED.ended_at,
duration_seconds = EXCLUDED.duration_seconds,
updated_at = NOW()
`
	// Never persist an empty client name.
	clientName := job.ClientName
	if clientName == "" {
		clientName = "unknown"
	}

	result, err := s.db.ExecContext(ctx, query,
		job.JobID, job.JobName, clientName, job.JobType, job.JobLevel, job.Status,
		job.BytesWritten, job.FilesWritten, job.StartedAt, job.EndedAt, job.DurationSeconds,
	)
	if err != nil {
		s.logger.Error("Database error in upsertJob", "job_id", job.JobID, "error", err)
		return err
	}

	affected, _ := result.RowsAffected()
	s.logger.Debug("Upserted job to database", "job_id", job.JobID, "rows_affected", affected)
	return nil
}
// ListJobs lists backup jobs with optional filters.
// It first performs a best-effort sync from Bacula (sync errors are
// logged, never returned), then pages through the Calypso backup_jobs
// table applying the filters in opts. Returns the requested page, the
// total count of matching rows (ignoring pagination), and any query
// error. Limit defaults to 50 and is capped at 100.
func (s *Service) ListJobs(ctx context.Context, opts ListJobsOptions) ([]Job, int, error) {
// Try to sync jobs from Bacula first (non-blocking - if it fails, continue with database)
// Don't return error if sync fails, just log it and continue
// This allows the API to work even if bconsole is not available
s.logger.Info("ListJobs called, syncing from Bacula first")
syncErr := s.SyncJobsFromBacula(ctx)
if syncErr != nil {
s.logger.Warn("Failed to sync jobs from Bacula, using database only", "error", syncErr)
// Continue anyway - we'll use whatever is in the database
} else {
s.logger.Info("Successfully synced jobs from Bacula")
}
// Build WHERE clause; placeholders are numbered $1..$n in step with args.
whereClauses := []string{"1=1"}
args := []interface{}{}
argIndex := 1
if opts.Status != "" {
whereClauses = append(whereClauses, fmt.Sprintf("status = $%d", argIndex))
args = append(args, opts.Status)
argIndex++
}
if opts.JobType != "" {
whereClauses = append(whereClauses, fmt.Sprintf("job_type = $%d", argIndex))
args = append(args, opts.JobType)
argIndex++
}
// Client/job name filters are case-insensitive substring matches.
if opts.ClientName != "" {
whereClauses = append(whereClauses, fmt.Sprintf("client_name ILIKE $%d", argIndex))
args = append(args, "%"+opts.ClientName+"%")
argIndex++
}
if opts.JobName != "" {
whereClauses = append(whereClauses, fmt.Sprintf("job_name ILIKE $%d", argIndex))
args = append(args, "%"+opts.JobName+"%")
argIndex++
}
whereClause := ""
if len(whereClauses) > 0 {
whereClause = "WHERE " + whereClauses[0]
for i := 1; i < len(whereClauses); i++ {
whereClause += " AND " + whereClauses[i]
}
}
// Get total count of matching rows (uses the filter args only; limit
// and offset have not been appended to args yet).
countQuery := fmt.Sprintf("SELECT COUNT(*) FROM backup_jobs %s", whereClause)
var totalCount int
err := s.db.QueryRowContext(ctx, countQuery, args...).Scan(&totalCount)
if err != nil {
return nil, 0, fmt.Errorf("failed to count jobs: %w", err)
}
// Set default limit (50) and clamp to the maximum (100).
limit := opts.Limit
if limit <= 0 {
limit = 50
}
if limit > 100 {
limit = 100
}
// Build query with pagination; $argIndex and $argIndex+1 bind limit/offset.
query := fmt.Sprintf(`
SELECT id, job_id, job_name, client_name, job_type, job_level, status,
bytes_written, files_written, duration_seconds,
started_at, ended_at, error_message,
storage_name, pool_name, volume_name,
created_at, updated_at
FROM backup_jobs
%s
ORDER BY started_at DESC NULLS LAST, created_at DESC
LIMIT $%d OFFSET $%d
`, whereClause, argIndex, argIndex+1)
args = append(args, limit, opts.Offset)
rows, err := s.db.QueryContext(ctx, query, args...)
if err != nil {
return nil, 0, fmt.Errorf("failed to query jobs: %w", err)
}
defer rows.Close()
var jobs []Job
for rows.Next() {
var job Job
// Nullable columns scan into sql.Null* and are copied into the
// Job's optional pointer fields below.
var durationSeconds sql.NullInt64
var startedAt, endedAt sql.NullTime
var errorMessage, storageName, poolName, volumeName sql.NullString
err := rows.Scan(
&job.ID, &job.JobID, &job.JobName, &job.ClientName,
&job.JobType, &job.JobLevel, &job.Status,
&job.BytesWritten, &job.FilesWritten, &durationSeconds,
&startedAt, &endedAt, &errorMessage,
&storageName, &poolName, &volumeName,
&job.CreatedAt, &job.UpdatedAt,
)
if err != nil {
// Skip rows that fail to scan rather than aborting the listing.
s.logger.Error("Failed to scan job", "error", err)
continue
}
if durationSeconds.Valid {
dur := int(durationSeconds.Int64)
job.DurationSeconds = &dur
}
if startedAt.Valid {
job.StartedAt = &startedAt.Time
}
if endedAt.Valid {
job.EndedAt = &endedAt.Time
}
if errorMessage.Valid {
job.ErrorMessage = &errorMessage.String
}
if storageName.Valid {
job.StorageName = &storageName.String
}
if poolName.Valid {
job.PoolName = &poolName.String
}
if volumeName.Valid {
job.VolumeName = &volumeName.String
}
jobs = append(jobs, job)
}
return jobs, totalCount, rows.Err()
}
// GetJob retrieves a job by ID (the Calypso row ID, not the Bacula
// JobId). Returns a "job not found" error when no row matches, and a
// wrapped query error otherwise.
func (s *Service) GetJob(ctx context.Context, id string) (*Job, error) {
	query := `
SELECT id, job_id, job_name, client_name, job_type, job_level, status,
bytes_written, files_written, duration_seconds,
started_at, ended_at, error_message,
storage_name, pool_name, volume_name,
created_at, updated_at
FROM backup_jobs
WHERE id = $1
`
	var job Job
	var durationSeconds sql.NullInt64
	var startedAt, endedAt sql.NullTime
	var errorMessage, storageName, poolName, volumeName sql.NullString
	err := s.db.QueryRowContext(ctx, query, id).Scan(
		&job.ID, &job.JobID, &job.JobName, &job.ClientName,
		&job.JobType, &job.JobLevel, &job.Status,
		&job.BytesWritten, &job.FilesWritten, &durationSeconds,
		&startedAt, &endedAt, &errorMessage,
		&storageName, &poolName, &volumeName,
		&job.CreatedAt, &job.UpdatedAt,
	)
	if err != nil {
		// Use errors.Is (not ==) so the sentinel still matches if the
		// driver or an intermediate layer wraps it.
		if errors.Is(err, sql.ErrNoRows) {
			return nil, fmt.Errorf("job not found")
		}
		return nil, fmt.Errorf("failed to get job: %w", err)
	}
	// Copy nullable columns into the Job's optional pointer fields.
	if durationSeconds.Valid {
		dur := int(durationSeconds.Int64)
		job.DurationSeconds = &dur
	}
	if startedAt.Valid {
		job.StartedAt = &startedAt.Time
	}
	if endedAt.Valid {
		job.EndedAt = &endedAt.Time
	}
	if errorMessage.Valid {
		job.ErrorMessage = &errorMessage.String
	}
	if storageName.Valid {
		job.StorageName = &storageName.String
	}
	if poolName.Valid {
		job.PoolName = &poolName.String
	}
	if volumeName.Valid {
		job.VolumeName = &volumeName.String
	}
	return &job, nil
}
// CreateJobRequest represents a request to create a new backup job.
// All string fields are required by binding validation; StorageName and
// PoolName are optional.
type CreateJobRequest struct {
JobName string `json:"job_name" binding:"required"`
ClientName string `json:"client_name" binding:"required"`
JobType string `json:"job_type" binding:"required"` // 'Backup', 'Restore', 'Verify', 'Copy', 'Migrate'
JobLevel string `json:"job_level" binding:"required"` // 'Full', 'Incremental', 'Differential', 'Since'
StorageName *string `json:"storage_name,omitempty"`
PoolName *string `json:"pool_name,omitempty"`
}
// CreateJob creates a new backup job.
// The job is inserted directly into the Calypso backup_jobs table with
// status "Waiting"; nothing is submitted to Bareos/Bacula here (see the
// comment below about the job ID).
// NOTE(review): job IDs come from MAX(job_id)+1, which is racy under
// concurrent creates — two requests can compute the same ID. Confirm
// whether a DB sequence (or ON CONFLICT retry) should be used instead.
func (s *Service) CreateJob(ctx context.Context, req CreateJobRequest) (*Job, error) {
// Generate a unique job ID (in real implementation, this would come from Bareos)
// For now, we'll use a simple incrementing approach or timestamp-based ID
var jobID int
err := s.db.QueryRowContext(ctx, `
SELECT COALESCE(MAX(job_id), 0) + 1 FROM backup_jobs
`).Scan(&jobID)
if err != nil {
return nil, fmt.Errorf("failed to generate job ID: %w", err)
}
// Insert the job into database and read the full row back via RETURNING.
query := `
INSERT INTO backup_jobs (
job_id, job_name, client_name, job_type, job_level,
status, bytes_written, files_written,
storage_name, pool_name, started_at
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, NOW())
RETURNING id, job_id, job_name, client_name, job_type, job_level, status,
bytes_written, files_written, duration_seconds,
started_at, ended_at, error_message,
storage_name, pool_name, volume_name,
created_at, updated_at
`
var job Job
// Nullable columns scan into sql.Null* and are copied into the Job's
// optional pointer fields below.
var durationSeconds sql.NullInt64
var startedAt, endedAt sql.NullTime
var errorMessage, storageName, poolName, volumeName sql.NullString
err = s.db.QueryRowContext(ctx, query,
jobID, req.JobName, req.ClientName, req.JobType, req.JobLevel,
"Waiting", 0, 0,
req.StorageName, req.PoolName,
).Scan(
&job.ID, &job.JobID, &job.JobName, &job.ClientName,
&job.JobType, &job.JobLevel, &job.Status,
&job.BytesWritten, &job.FilesWritten, &durationSeconds,
&startedAt, &endedAt, &errorMessage,
&storageName, &poolName, &volumeName,
&job.CreatedAt, &job.UpdatedAt,
)
if err != nil {
return nil, fmt.Errorf("failed to create job: %w", err)
}
if durationSeconds.Valid {
dur := int(durationSeconds.Int64)
job.DurationSeconds = &dur
}
if startedAt.Valid {
job.StartedAt = &startedAt.Time
}
if endedAt.Valid {
job.EndedAt = &endedAt.Time
}
if errorMessage.Valid {
job.ErrorMessage = &errorMessage.String
}
if storageName.Valid {
job.StorageName = &storageName.String
}
if poolName.Valid {
job.PoolName = &poolName.String
}
if volumeName.Valid {
job.VolumeName = &volumeName.String
}
s.logger.Info("Backup job created",
"job_id", job.JobID,
"job_name", job.JobName,
"client_name", job.ClientName,
"job_type", job.JobType,
)
return &job, nil
}