working on the backup management parts
This commit is contained in:
962
backend/internal/backup/service.go
Normal file
962
backend/internal/backup/service.go
Normal file
@@ -0,0 +1,962 @@
|
||||
package backup
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"os/exec"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/atlasos/calypso/internal/common/database"
|
||||
"github.com/atlasos/calypso/internal/common/logger"
|
||||
)
|
||||
|
||||
// Service handles backup job operations
|
||||
type Service struct {
|
||||
db *database.DB
|
||||
baculaDB *database.DB // Optional: separate connection to Bacula database
|
||||
logger *logger.Logger
|
||||
baculaDBName string // Bacula database name (bacula, bareos, etc.)
|
||||
dbPassword string // Database password for dblink (optional, will try without if empty)
|
||||
}
|
||||
|
||||
// NewService creates a new backup service
|
||||
func NewService(db *database.DB, log *logger.Logger) *Service {
|
||||
return &Service{
|
||||
db: db,
|
||||
logger: log,
|
||||
baculaDBName: "bacula", // Default Bacula database name
|
||||
}
|
||||
}
|
||||
|
||||
// SetDatabasePassword sets the database password for dblink connections
|
||||
func (s *Service) SetDatabasePassword(password string) {
|
||||
s.dbPassword = password
|
||||
s.logger.Debug("Database password set for dblink", "has_password", password != "", "password_length", len(password))
|
||||
}
|
||||
|
||||
// Job represents a backup job
|
||||
type Job struct {
|
||||
ID string `json:"id"`
|
||||
JobID int `json:"job_id"`
|
||||
JobName string `json:"job_name"`
|
||||
ClientName string `json:"client_name"`
|
||||
JobType string `json:"job_type"`
|
||||
JobLevel string `json:"job_level"`
|
||||
Status string `json:"status"`
|
||||
BytesWritten int64 `json:"bytes_written"`
|
||||
FilesWritten int `json:"files_written"`
|
||||
DurationSeconds *int `json:"duration_seconds,omitempty"`
|
||||
StartedAt *time.Time `json:"started_at,omitempty"`
|
||||
EndedAt *time.Time `json:"ended_at,omitempty"`
|
||||
ErrorMessage *string `json:"error_message,omitempty"`
|
||||
StorageName *string `json:"storage_name,omitempty"`
|
||||
PoolName *string `json:"pool_name,omitempty"`
|
||||
VolumeName *string `json:"volume_name,omitempty"`
|
||||
CreatedAt time.Time `json:"created_at"`
|
||||
UpdatedAt time.Time `json:"updated_at"`
|
||||
}
|
||||
|
||||
// ListJobsOptions represents filtering and pagination options
|
||||
type ListJobsOptions struct {
|
||||
Status string // Filter by status: "Running", "Completed", "Failed", etc.
|
||||
JobType string // Filter by job type: "Backup", "Restore", etc.
|
||||
ClientName string // Filter by client name
|
||||
JobName string // Filter by job name
|
||||
Limit int // Number of results to return
|
||||
Offset int // Offset for pagination
|
||||
}
|
||||
|
||||
// SyncJobsFromBacula syncs jobs from Bacula/Bareos to the database
|
||||
// Tries to query Bacula database directly first, falls back to bconsole if database access fails
|
||||
func (s *Service) SyncJobsFromBacula(ctx context.Context) error {
|
||||
// Try to query Bacula database directly (if user has access)
|
||||
jobs, err := s.queryBaculaDatabase(ctx)
|
||||
if err != nil {
|
||||
s.logger.Debug("Failed to query Bacula database directly, trying bconsole", "error", err)
|
||||
// Fallback to bconsole
|
||||
return s.syncFromBconsole(ctx)
|
||||
}
|
||||
|
||||
if len(jobs) == 0 {
|
||||
s.logger.Debug("No jobs found in Bacula database")
|
||||
return nil
|
||||
}
|
||||
|
||||
// Upsert jobs to Calypso database
|
||||
successCount := 0
|
||||
for _, job := range jobs {
|
||||
err := s.upsertJob(ctx, job)
|
||||
if err != nil {
|
||||
s.logger.Error("Failed to upsert job", "job_id", job.JobID, "error", err)
|
||||
continue
|
||||
}
|
||||
successCount++
|
||||
}
|
||||
|
||||
s.logger.Info("Synced jobs from Bacula database", "total", len(jobs), "success", successCount)
|
||||
return nil
|
||||
}
|
||||
|
||||
// getBaculaConnection gets or creates a connection to Bacula database
|
||||
// Tries to create connection using same host/port/user but different database name
|
||||
func (s *Service) getBaculaConnection(ctx context.Context) (*database.DB, error) {
|
||||
if s.baculaDB != nil {
|
||||
// Test if connection is still alive
|
||||
if err := s.baculaDB.Ping(); err == nil {
|
||||
return s.baculaDB, nil
|
||||
}
|
||||
// Connection is dead, close it
|
||||
s.baculaDB.Close()
|
||||
s.baculaDB = nil
|
||||
}
|
||||
|
||||
// Try to get connection info from current database connection
|
||||
// We'll query the current database to get connection parameters
|
||||
var currentDB, currentUser, currentHost string
|
||||
var currentPort int
|
||||
|
||||
// Get current database connection info
|
||||
query := `SELECT current_database(), current_user, inet_server_addr(), inet_server_port()`
|
||||
err := s.db.QueryRowContext(ctx, query).Scan(¤tDB, ¤tUser, ¤tHost, ¤tPort)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get current database info: %w", err)
|
||||
}
|
||||
|
||||
// If host is null, it's a local connection (Unix socket)
|
||||
if currentHost == "" {
|
||||
currentHost = "localhost"
|
||||
}
|
||||
if currentPort == 0 {
|
||||
currentPort = 5432 // Default PostgreSQL port
|
||||
}
|
||||
|
||||
// Try common Bacula database names
|
||||
databases := []string{"bacula", "bareos", s.baculaDBName}
|
||||
|
||||
for _, dbName := range databases {
|
||||
if dbName == "" {
|
||||
continue
|
||||
}
|
||||
|
||||
// Try to create connection to Bacula database
|
||||
// We'll use the same connection parameters but different database name
|
||||
// Note: This assumes same host/port/user/password
|
||||
// For production, you'd want to configure this separately
|
||||
|
||||
// We can't create a new connection without password
|
||||
// So we'll try to query using dblink or assume same connection can access Bacula DB
|
||||
// For now, return nil and let queryBaculaDatabase handle it via dblink or direct query
|
||||
}
|
||||
|
||||
return nil, fmt.Errorf("Bacula database connection not configured - will use dblink or direct query")
|
||||
}
|
||||
|
||||
// queryBaculaDatabase queries Bacula database directly
|
||||
// Following Bacularis approach: query Job table directly from Bacula database
|
||||
// Since Bacula is in separate database, prioritize dblink over direct query
|
||||
func (s *Service) queryBaculaDatabase(ctx context.Context) ([]Job, error) {
|
||||
// Method 1: Try using dblink extension for cross-database query (preferred for separate databases)
|
||||
checkDblink := `SELECT EXISTS(SELECT 1 FROM pg_extension WHERE extname = 'dblink')`
|
||||
var dblinkExists bool
|
||||
err := s.db.QueryRowContext(ctx, checkDblink).Scan(&dblinkExists)
|
||||
if err == nil && dblinkExists {
|
||||
jobs, err := s.queryBaculaViaDblink(ctx)
|
||||
if err == nil && len(jobs) > 0 {
|
||||
return jobs, nil
|
||||
}
|
||||
s.logger.Debug("dblink query failed, trying direct query", "error", err)
|
||||
} else {
|
||||
s.logger.Debug("dblink extension not found, trying direct query")
|
||||
}
|
||||
|
||||
// Method 2: Try querying Job table directly (if Bacula is in same database)
|
||||
jobs, err := s.queryBaculaDirect(ctx)
|
||||
if err == nil && len(jobs) > 0 {
|
||||
return jobs, nil
|
||||
}
|
||||
s.logger.Debug("Direct query also failed", "error", err)
|
||||
|
||||
return nil, fmt.Errorf("failed to query Bacula database: dblink and direct query both failed")
|
||||
}
|
||||
|
||||
// queryBaculaDirect queries Job table directly (Bacularis approach)
|
||||
// Assumes Bacula tables are in same database or accessible via search_path
|
||||
func (s *Service) queryBaculaDirect(ctx context.Context) ([]Job, error) {
|
||||
// Bacularis-style query: direct query to Job table with JOIN to Client
|
||||
// This is the standard way Bacularis queries Bacula database
|
||||
query := `
|
||||
SELECT
|
||||
j.JobId as job_id,
|
||||
j.Name as job_name,
|
||||
COALESCE(c.Name, 'unknown') as client_name,
|
||||
CASE
|
||||
WHEN j.Type = 'B' THEN 'Backup'
|
||||
WHEN j.Type = 'R' THEN 'Restore'
|
||||
WHEN j.Type = 'V' THEN 'Verify'
|
||||
WHEN j.Type = 'C' THEN 'Copy'
|
||||
WHEN j.Type = 'M' THEN 'Migrate'
|
||||
ELSE 'Backup'
|
||||
END as job_type,
|
||||
CASE
|
||||
WHEN j.Level = 'F' THEN 'Full'
|
||||
WHEN j.Level = 'I' THEN 'Incremental'
|
||||
WHEN j.Level = 'D' THEN 'Differential'
|
||||
WHEN j.Level = 'S' THEN 'Since'
|
||||
ELSE 'Full'
|
||||
END as job_level,
|
||||
CASE
|
||||
WHEN j.JobStatus = 'T' THEN 'Running'
|
||||
WHEN j.JobStatus = 'C' THEN 'Completed'
|
||||
WHEN j.JobStatus = 'f' OR j.JobStatus = 'F' THEN 'Failed'
|
||||
WHEN j.JobStatus = 'A' THEN 'Canceled'
|
||||
WHEN j.JobStatus = 'W' THEN 'Waiting'
|
||||
ELSE 'Waiting'
|
||||
END as status,
|
||||
COALESCE(j.JobBytes, 0) as bytes_written,
|
||||
COALESCE(j.JobFiles, 0) as files_written,
|
||||
j.StartTime as started_at,
|
||||
j.EndTime as ended_at
|
||||
FROM Job j
|
||||
LEFT JOIN Client c ON j.ClientId = c.ClientId
|
||||
ORDER BY j.StartTime DESC
|
||||
LIMIT 1000
|
||||
`
|
||||
|
||||
rows, err := s.db.QueryContext(ctx, query)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Job table not found or not accessible: %w", err)
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
var jobs []Job
|
||||
for rows.Next() {
|
||||
var job Job
|
||||
var startedAt, endedAt sql.NullTime
|
||||
|
||||
err := rows.Scan(
|
||||
&job.JobID, &job.JobName, &job.ClientName,
|
||||
&job.JobType, &job.JobLevel, &job.Status,
|
||||
&job.BytesWritten, &job.FilesWritten, &startedAt, &endedAt,
|
||||
)
|
||||
if err != nil {
|
||||
s.logger.Error("Failed to scan Bacula job", "error", err)
|
||||
continue
|
||||
}
|
||||
|
||||
if startedAt.Valid {
|
||||
job.StartedAt = &startedAt.Time
|
||||
}
|
||||
if endedAt.Valid {
|
||||
job.EndedAt = &endedAt.Time
|
||||
// Calculate duration if both start and end times are available
|
||||
if job.StartedAt != nil {
|
||||
duration := int(endedAt.Time.Sub(*job.StartedAt).Seconds())
|
||||
job.DurationSeconds = &duration
|
||||
}
|
||||
}
|
||||
|
||||
jobs = append(jobs, job)
|
||||
}
|
||||
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if len(jobs) > 0 {
|
||||
s.logger.Info("Successfully queried Bacula database (direct)", "count", len(jobs))
|
||||
return jobs, nil
|
||||
}
|
||||
|
||||
return jobs, nil // Return empty list, not an error
|
||||
}
|
||||
|
||||
// queryBaculaViaDblink queries Bacula database using dblink extension
|
||||
// Assumes dblink is installed and user has access to bacula database
|
||||
func (s *Service) queryBaculaViaDblink(ctx context.Context) ([]Job, error) {
|
||||
// Get current user and connection info for dblink
|
||||
var currentUser, currentHost string
|
||||
var currentPort int
|
||||
|
||||
// Get current connection info
|
||||
err := s.db.QueryRowContext(ctx,
|
||||
`SELECT current_user, COALESCE(inet_server_addr()::text, ''), COALESCE(inet_server_port(), 5432)`).Scan(
|
||||
¤tUser, ¤tHost, ¤tPort)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get connection info: %w", err)
|
||||
}
|
||||
|
||||
// Log connection info (without password)
|
||||
s.logger.Debug("Preparing dblink connection", "user", currentUser, "host", currentHost, "port", currentPort, "has_password", s.dbPassword != "")
|
||||
|
||||
// Try common Bacula database names
|
||||
databases := []string{"bacula", "bareos", s.baculaDBName}
|
||||
|
||||
for _, dbName := range databases {
|
||||
if dbName == "" {
|
||||
continue
|
||||
}
|
||||
|
||||
// Build dblink connection string
|
||||
// Format: 'dbname=database_name user=username password=password'
|
||||
// dblink requires password even for local connections
|
||||
connStr := fmt.Sprintf("dbname=%s user=%s", dbName, currentUser)
|
||||
|
||||
// Add password if available (required for dblink)
|
||||
if s.dbPassword != "" {
|
||||
// Escape special characters in password for connection string
|
||||
// Replace single quotes with \' and backslashes with \\
|
||||
escapedPassword := strings.ReplaceAll(s.dbPassword, "\\", "\\\\")
|
||||
escapedPassword = strings.ReplaceAll(escapedPassword, "'", "\\'")
|
||||
connStr += fmt.Sprintf(" password='%s'", escapedPassword)
|
||||
}
|
||||
|
||||
// Add host/port for remote connections
|
||||
if currentHost != "" {
|
||||
connStr += fmt.Sprintf(" host=%s port=%d", currentHost, currentPort)
|
||||
}
|
||||
|
||||
// Query using dblink - get all data in one query with JOIN
|
||||
// Escape single quotes in SQL string for dblink (double them)
|
||||
innerQuery := `SELECT
|
||||
j.JobId,
|
||||
j.Name,
|
||||
j.Type,
|
||||
j.Level,
|
||||
j.JobStatus,
|
||||
j.JobBytes,
|
||||
j.JobFiles,
|
||||
j.StartTime,
|
||||
j.EndTime,
|
||||
COALESCE(c.Name, 'unknown') as ClientName
|
||||
FROM Job j
|
||||
LEFT JOIN Client c ON j.ClientId = c.ClientId
|
||||
ORDER BY j.StartTime DESC
|
||||
LIMIT 1000`
|
||||
|
||||
// Escape single quotes in inner query for dblink (double them)
|
||||
escapedQuery := strings.ReplaceAll(innerQuery, "'", "''")
|
||||
|
||||
query := fmt.Sprintf(`
|
||||
SELECT
|
||||
JobId as job_id,
|
||||
Name as job_name,
|
||||
ClientName as client_name,
|
||||
CASE
|
||||
WHEN Type = 'B' THEN 'Backup'
|
||||
WHEN Type = 'R' THEN 'Restore'
|
||||
WHEN Type = 'V' THEN 'Verify'
|
||||
WHEN Type = 'C' THEN 'Copy'
|
||||
WHEN Type = 'M' THEN 'Migrate'
|
||||
ELSE 'Backup'
|
||||
END as job_type,
|
||||
CASE
|
||||
WHEN Level = 'F' THEN 'Full'
|
||||
WHEN Level = 'I' THEN 'Incremental'
|
||||
WHEN Level = 'D' THEN 'Differential'
|
||||
WHEN Level = 'S' THEN 'Since'
|
||||
ELSE 'Full'
|
||||
END as job_level,
|
||||
CASE
|
||||
WHEN JobStatus = 'T' THEN 'Running'
|
||||
WHEN JobStatus = 'C' THEN 'Completed'
|
||||
WHEN JobStatus = 'f' OR JobStatus = 'F' THEN 'Failed'
|
||||
WHEN JobStatus = 'A' THEN 'Canceled'
|
||||
WHEN JobStatus = 'W' THEN 'Waiting'
|
||||
ELSE 'Waiting'
|
||||
END as status,
|
||||
COALESCE(JobBytes, 0) as bytes_written,
|
||||
COALESCE(JobFiles, 0) as files_written,
|
||||
StartTime as started_at,
|
||||
EndTime as ended_at
|
||||
FROM dblink('%s', '%s') AS t(JobId int, Name text, Type char, Level char, JobStatus char, JobBytes bigint, JobFiles int, StartTime timestamp, EndTime timestamp, ClientName text)
|
||||
`, connStr, escapedQuery)
|
||||
|
||||
rows, err := s.db.QueryContext(ctx, query)
|
||||
if err != nil {
|
||||
s.logger.Error("Failed to query Bacula via dblink", "database", dbName, "connection", connStr, "error", err)
|
||||
continue
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
var jobs []Job
|
||||
for rows.Next() {
|
||||
var job Job
|
||||
var startedAt, endedAt sql.NullTime
|
||||
|
||||
err := rows.Scan(
|
||||
&job.JobID, &job.JobName, &job.ClientName,
|
||||
&job.JobType, &job.JobLevel, &job.Status,
|
||||
&job.BytesWritten, &job.FilesWritten, &startedAt, &endedAt,
|
||||
)
|
||||
if err != nil {
|
||||
s.logger.Error("Failed to scan Bacula job from dblink", "error", err)
|
||||
continue
|
||||
}
|
||||
|
||||
if startedAt.Valid {
|
||||
job.StartedAt = &startedAt.Time
|
||||
}
|
||||
if endedAt.Valid {
|
||||
job.EndedAt = &endedAt.Time
|
||||
// Calculate duration
|
||||
if job.StartedAt != nil {
|
||||
duration := int(endedAt.Time.Sub(*job.StartedAt).Seconds())
|
||||
job.DurationSeconds = &duration
|
||||
}
|
||||
}
|
||||
|
||||
jobs = append(jobs, job)
|
||||
}
|
||||
|
||||
if err := rows.Err(); err != nil {
|
||||
s.logger.Debug("Error iterating dblink results", "database", dbName, "error", err)
|
||||
continue
|
||||
}
|
||||
|
||||
if len(jobs) > 0 {
|
||||
s.logger.Info("Successfully queried Bacula database via dblink", "database", dbName, "count", len(jobs))
|
||||
return jobs, nil
|
||||
}
|
||||
}
|
||||
|
||||
return nil, fmt.Errorf("failed to query Bacula database via dblink from any database")
|
||||
}
|
||||
|
||||
// syncFromBconsole syncs jobs using bconsole command (fallback method)
|
||||
func (s *Service) syncFromBconsole(ctx context.Context) error {
|
||||
// Execute bconsole command to list jobs
|
||||
cmd := exec.CommandContext(ctx, "sh", "-c", "echo -e 'list jobs\nquit' | bconsole")
|
||||
|
||||
output, err := cmd.CombinedOutput()
|
||||
if err != nil {
|
||||
s.logger.Debug("Failed to execute bconsole", "error", err, "output", string(output))
|
||||
return nil // Don't fail, just return empty
|
||||
}
|
||||
|
||||
if len(output) == 0 {
|
||||
s.logger.Debug("bconsole returned empty output")
|
||||
return nil
|
||||
}
|
||||
|
||||
// Parse bconsole output
|
||||
jobs := s.parseBconsoleOutput(ctx, string(output))
|
||||
|
||||
if len(jobs) == 0 {
|
||||
s.logger.Debug("No jobs found in bconsole output")
|
||||
return nil
|
||||
}
|
||||
|
||||
// Upsert jobs to database
|
||||
successCount := 0
|
||||
for _, job := range jobs {
|
||||
err := s.upsertJob(ctx, job)
|
||||
if err != nil {
|
||||
s.logger.Error("Failed to upsert job", "job_id", job.JobID, "error", err)
|
||||
continue
|
||||
}
|
||||
successCount++
|
||||
}
|
||||
|
||||
s.logger.Info("Synced jobs from bconsole", "total", len(jobs), "success", successCount)
|
||||
return nil
|
||||
}
|
||||
|
||||
// parseBconsoleOutput parses bconsole "list jobs" output
|
||||
func (s *Service) parseBconsoleOutput(ctx context.Context, output string) []Job {
|
||||
var jobs []Job
|
||||
lines := strings.Split(output, "\n")
|
||||
|
||||
// Skip header lines until we find the data rows
|
||||
inDataSection := false
|
||||
for _, line := range lines {
|
||||
line = strings.TrimSpace(line)
|
||||
|
||||
// Skip empty lines and separators
|
||||
if line == "" || strings.HasPrefix(line, "+") {
|
||||
continue
|
||||
}
|
||||
|
||||
// Start data section when we see header
|
||||
if strings.HasPrefix(line, "| jobid") {
|
||||
inDataSection = true
|
||||
continue
|
||||
}
|
||||
|
||||
// Stop at footer separator
|
||||
if strings.HasPrefix(line, "*") {
|
||||
break
|
||||
}
|
||||
|
||||
if !inDataSection {
|
||||
continue
|
||||
}
|
||||
|
||||
// Parse data row: | jobid | name | starttime | type | level | jobfiles | jobbytes | jobstatus |
|
||||
if strings.HasPrefix(line, "|") {
|
||||
parts := strings.Split(line, "|")
|
||||
if len(parts) < 9 {
|
||||
continue
|
||||
}
|
||||
|
||||
// Extract fields (skip first empty part)
|
||||
jobIDStr := strings.TrimSpace(parts[1])
|
||||
jobName := strings.TrimSpace(parts[2])
|
||||
startTimeStr := strings.TrimSpace(parts[3])
|
||||
jobTypeChar := strings.TrimSpace(parts[4])
|
||||
jobLevelChar := strings.TrimSpace(parts[5])
|
||||
jobFilesStr := strings.TrimSpace(parts[6])
|
||||
jobBytesStr := strings.TrimSpace(parts[7])
|
||||
jobStatusChar := strings.TrimSpace(parts[8])
|
||||
|
||||
// Parse job ID
|
||||
jobID, err := strconv.Atoi(jobIDStr)
|
||||
if err != nil {
|
||||
s.logger.Warn("Failed to parse job ID", "value", jobIDStr, "error", err)
|
||||
continue
|
||||
}
|
||||
|
||||
// Parse start time
|
||||
var startedAt *time.Time
|
||||
if startTimeStr != "" && startTimeStr != "-" {
|
||||
// Format: 2025-12-27 23:05:02
|
||||
parsedTime, err := time.Parse("2006-01-02 15:04:05", startTimeStr)
|
||||
if err == nil {
|
||||
startedAt = &parsedTime
|
||||
}
|
||||
}
|
||||
|
||||
// Map job type
|
||||
jobType := "Backup"
|
||||
switch jobTypeChar {
|
||||
case "B":
|
||||
jobType = "Backup"
|
||||
case "R":
|
||||
jobType = "Restore"
|
||||
case "V":
|
||||
jobType = "Verify"
|
||||
case "C":
|
||||
jobType = "Copy"
|
||||
case "M":
|
||||
jobType = "Migrate"
|
||||
}
|
||||
|
||||
// Map job level
|
||||
jobLevel := "Full"
|
||||
switch jobLevelChar {
|
||||
case "F":
|
||||
jobLevel = "Full"
|
||||
case "I":
|
||||
jobLevel = "Incremental"
|
||||
case "D":
|
||||
jobLevel = "Differential"
|
||||
case "S":
|
||||
jobLevel = "Since"
|
||||
}
|
||||
|
||||
// Parse files and bytes
|
||||
filesWritten := 0
|
||||
if jobFilesStr != "" && jobFilesStr != "-" {
|
||||
if f, err := strconv.Atoi(jobFilesStr); err == nil {
|
||||
filesWritten = f
|
||||
}
|
||||
}
|
||||
|
||||
bytesWritten := int64(0)
|
||||
if jobBytesStr != "" && jobBytesStr != "-" {
|
||||
if b, err := strconv.ParseInt(jobBytesStr, 10, 64); err == nil {
|
||||
bytesWritten = b
|
||||
}
|
||||
}
|
||||
|
||||
// Map job status
|
||||
status := "Waiting"
|
||||
switch strings.ToLower(jobStatusChar) {
|
||||
case "t", "T":
|
||||
status = "Running"
|
||||
case "c", "C":
|
||||
status = "Completed"
|
||||
case "f", "F":
|
||||
status = "Failed"
|
||||
case "A":
|
||||
status = "Canceled"
|
||||
case "W":
|
||||
status = "Waiting"
|
||||
}
|
||||
|
||||
// Try to extract client name from job name (common pattern: JobName-ClientName)
|
||||
clientName := "unknown"
|
||||
// For now, use job name as client name if it looks like a client name
|
||||
// In real implementation, we'd query job details from Bacula
|
||||
if jobName != "" {
|
||||
// Try to get client name from job details
|
||||
clientNameFromJob := s.getClientNameFromJob(ctx, jobID)
|
||||
if clientNameFromJob != "" {
|
||||
clientName = clientNameFromJob
|
||||
} else {
|
||||
// Fallback: use job name as client name
|
||||
clientName = jobName
|
||||
}
|
||||
}
|
||||
|
||||
job := Job{
|
||||
JobID: jobID,
|
||||
JobName: jobName,
|
||||
ClientName: clientName,
|
||||
JobType: jobType,
|
||||
JobLevel: jobLevel,
|
||||
Status: status,
|
||||
BytesWritten: bytesWritten,
|
||||
FilesWritten: filesWritten,
|
||||
StartedAt: startedAt,
|
||||
}
|
||||
|
||||
jobs = append(jobs, job)
|
||||
}
|
||||
}
|
||||
|
||||
return jobs
|
||||
}
|
||||
|
||||
// getClientNameFromJob gets client name from job details using bconsole
|
||||
func (s *Service) getClientNameFromJob(ctx context.Context, jobID int) string {
|
||||
// Execute bconsole to get job details
|
||||
cmd := exec.CommandContext(ctx, "sh", "-c", fmt.Sprintf("echo -e 'list job jobid=%d\nquit' | bconsole", jobID))
|
||||
|
||||
output, err := cmd.CombinedOutput()
|
||||
if err != nil {
|
||||
s.logger.Debug("Failed to get job details", "job_id", jobID, "error", err)
|
||||
return ""
|
||||
}
|
||||
|
||||
// Parse output to find Client line
|
||||
lines := strings.Split(string(output), "\n")
|
||||
for _, line := range lines {
|
||||
line = strings.TrimSpace(line)
|
||||
if strings.HasPrefix(line, "Client:") {
|
||||
parts := strings.Split(line, ":")
|
||||
if len(parts) >= 2 {
|
||||
return strings.TrimSpace(parts[1])
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return ""
|
||||
}
|
||||
|
||||
// upsertJob inserts or updates a job in the database
|
||||
func (s *Service) upsertJob(ctx context.Context, job Job) error {
|
||||
query := `
|
||||
INSERT INTO backup_jobs (
|
||||
job_id, job_name, client_name, job_type, job_level, status,
|
||||
bytes_written, files_written, started_at, updated_at
|
||||
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, NOW())
|
||||
ON CONFLICT (job_id) DO UPDATE SET
|
||||
job_name = EXCLUDED.job_name,
|
||||
job_type = EXCLUDED.job_type,
|
||||
job_level = EXCLUDED.job_level,
|
||||
status = EXCLUDED.status,
|
||||
bytes_written = EXCLUDED.bytes_written,
|
||||
files_written = EXCLUDED.files_written,
|
||||
started_at = EXCLUDED.started_at,
|
||||
updated_at = NOW()
|
||||
`
|
||||
|
||||
// Use job name as client name if client_name is empty (we'll improve this later)
|
||||
clientName := job.ClientName
|
||||
if clientName == "" {
|
||||
clientName = "unknown"
|
||||
}
|
||||
|
||||
_, err := s.db.ExecContext(ctx, query,
|
||||
job.JobID, job.JobName, clientName, job.JobType, job.JobLevel, job.Status,
|
||||
job.BytesWritten, job.FilesWritten, job.StartedAt,
|
||||
)
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// ListJobs lists backup jobs with optional filters
|
||||
func (s *Service) ListJobs(ctx context.Context, opts ListJobsOptions) ([]Job, int, error) {
|
||||
// Try to sync jobs from Bacula first (non-blocking - if it fails, continue with database)
|
||||
// Don't return error if sync fails, just log it and continue
|
||||
// This allows the API to work even if bconsole is not available
|
||||
syncErr := s.SyncJobsFromBacula(ctx)
|
||||
if syncErr != nil {
|
||||
s.logger.Debug("Failed to sync jobs from Bacula, using database only", "error", syncErr)
|
||||
// Continue anyway - we'll use whatever is in the database
|
||||
}
|
||||
|
||||
// Build WHERE clause
|
||||
whereClauses := []string{"1=1"}
|
||||
args := []interface{}{}
|
||||
argIndex := 1
|
||||
|
||||
if opts.Status != "" {
|
||||
whereClauses = append(whereClauses, fmt.Sprintf("status = $%d", argIndex))
|
||||
args = append(args, opts.Status)
|
||||
argIndex++
|
||||
}
|
||||
|
||||
if opts.JobType != "" {
|
||||
whereClauses = append(whereClauses, fmt.Sprintf("job_type = $%d", argIndex))
|
||||
args = append(args, opts.JobType)
|
||||
argIndex++
|
||||
}
|
||||
|
||||
if opts.ClientName != "" {
|
||||
whereClauses = append(whereClauses, fmt.Sprintf("client_name ILIKE $%d", argIndex))
|
||||
args = append(args, "%"+opts.ClientName+"%")
|
||||
argIndex++
|
||||
}
|
||||
|
||||
if opts.JobName != "" {
|
||||
whereClauses = append(whereClauses, fmt.Sprintf("job_name ILIKE $%d", argIndex))
|
||||
args = append(args, "%"+opts.JobName+"%")
|
||||
argIndex++
|
||||
}
|
||||
|
||||
whereClause := ""
|
||||
if len(whereClauses) > 0 {
|
||||
whereClause = "WHERE " + whereClauses[0]
|
||||
for i := 1; i < len(whereClauses); i++ {
|
||||
whereClause += " AND " + whereClauses[i]
|
||||
}
|
||||
}
|
||||
|
||||
// Get total count
|
||||
countQuery := fmt.Sprintf("SELECT COUNT(*) FROM backup_jobs %s", whereClause)
|
||||
var totalCount int
|
||||
err := s.db.QueryRowContext(ctx, countQuery, args...).Scan(&totalCount)
|
||||
if err != nil {
|
||||
return nil, 0, fmt.Errorf("failed to count jobs: %w", err)
|
||||
}
|
||||
|
||||
// Set default limit
|
||||
limit := opts.Limit
|
||||
if limit <= 0 {
|
||||
limit = 50
|
||||
}
|
||||
if limit > 100 {
|
||||
limit = 100
|
||||
}
|
||||
|
||||
// Build query with pagination
|
||||
query := fmt.Sprintf(`
|
||||
SELECT id, job_id, job_name, client_name, job_type, job_level, status,
|
||||
bytes_written, files_written, duration_seconds,
|
||||
started_at, ended_at, error_message,
|
||||
storage_name, pool_name, volume_name,
|
||||
created_at, updated_at
|
||||
FROM backup_jobs
|
||||
%s
|
||||
ORDER BY started_at DESC NULLS LAST, created_at DESC
|
||||
LIMIT $%d OFFSET $%d
|
||||
`, whereClause, argIndex, argIndex+1)
|
||||
|
||||
args = append(args, limit, opts.Offset)
|
||||
|
||||
rows, err := s.db.QueryContext(ctx, query, args...)
|
||||
if err != nil {
|
||||
return nil, 0, fmt.Errorf("failed to query jobs: %w", err)
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
var jobs []Job
|
||||
for rows.Next() {
|
||||
var job Job
|
||||
var durationSeconds sql.NullInt64
|
||||
var startedAt, endedAt sql.NullTime
|
||||
var errorMessage, storageName, poolName, volumeName sql.NullString
|
||||
|
||||
err := rows.Scan(
|
||||
&job.ID, &job.JobID, &job.JobName, &job.ClientName,
|
||||
&job.JobType, &job.JobLevel, &job.Status,
|
||||
&job.BytesWritten, &job.FilesWritten, &durationSeconds,
|
||||
&startedAt, &endedAt, &errorMessage,
|
||||
&storageName, &poolName, &volumeName,
|
||||
&job.CreatedAt, &job.UpdatedAt,
|
||||
)
|
||||
if err != nil {
|
||||
s.logger.Error("Failed to scan job", "error", err)
|
||||
continue
|
||||
}
|
||||
|
||||
if durationSeconds.Valid {
|
||||
dur := int(durationSeconds.Int64)
|
||||
job.DurationSeconds = &dur
|
||||
}
|
||||
if startedAt.Valid {
|
||||
job.StartedAt = &startedAt.Time
|
||||
}
|
||||
if endedAt.Valid {
|
||||
job.EndedAt = &endedAt.Time
|
||||
}
|
||||
if errorMessage.Valid {
|
||||
job.ErrorMessage = &errorMessage.String
|
||||
}
|
||||
if storageName.Valid {
|
||||
job.StorageName = &storageName.String
|
||||
}
|
||||
if poolName.Valid {
|
||||
job.PoolName = &poolName.String
|
||||
}
|
||||
if volumeName.Valid {
|
||||
job.VolumeName = &volumeName.String
|
||||
}
|
||||
|
||||
jobs = append(jobs, job)
|
||||
}
|
||||
|
||||
return jobs, totalCount, rows.Err()
|
||||
}
|
||||
|
||||
// GetJob retrieves a job by ID
|
||||
func (s *Service) GetJob(ctx context.Context, id string) (*Job, error) {
|
||||
query := `
|
||||
SELECT id, job_id, job_name, client_name, job_type, job_level, status,
|
||||
bytes_written, files_written, duration_seconds,
|
||||
started_at, ended_at, error_message,
|
||||
storage_name, pool_name, volume_name,
|
||||
created_at, updated_at
|
||||
FROM backup_jobs
|
||||
WHERE id = $1
|
||||
`
|
||||
|
||||
var job Job
|
||||
var durationSeconds sql.NullInt64
|
||||
var startedAt, endedAt sql.NullTime
|
||||
var errorMessage, storageName, poolName, volumeName sql.NullString
|
||||
|
||||
err := s.db.QueryRowContext(ctx, query, id).Scan(
|
||||
&job.ID, &job.JobID, &job.JobName, &job.ClientName,
|
||||
&job.JobType, &job.JobLevel, &job.Status,
|
||||
&job.BytesWritten, &job.FilesWritten, &durationSeconds,
|
||||
&startedAt, &endedAt, &errorMessage,
|
||||
&storageName, &poolName, &volumeName,
|
||||
&job.CreatedAt, &job.UpdatedAt,
|
||||
)
|
||||
if err != nil {
|
||||
if err == sql.ErrNoRows {
|
||||
return nil, fmt.Errorf("job not found")
|
||||
}
|
||||
return nil, fmt.Errorf("failed to get job: %w", err)
|
||||
}
|
||||
|
||||
if durationSeconds.Valid {
|
||||
dur := int(durationSeconds.Int64)
|
||||
job.DurationSeconds = &dur
|
||||
}
|
||||
if startedAt.Valid {
|
||||
job.StartedAt = &startedAt.Time
|
||||
}
|
||||
if endedAt.Valid {
|
||||
job.EndedAt = &endedAt.Time
|
||||
}
|
||||
if errorMessage.Valid {
|
||||
job.ErrorMessage = &errorMessage.String
|
||||
}
|
||||
if storageName.Valid {
|
||||
job.StorageName = &storageName.String
|
||||
}
|
||||
if poolName.Valid {
|
||||
job.PoolName = &poolName.String
|
||||
}
|
||||
if volumeName.Valid {
|
||||
job.VolumeName = &volumeName.String
|
||||
}
|
||||
|
||||
return &job, nil
|
||||
}
|
||||
|
||||
// CreateJobRequest represents a request to create a new backup job
|
||||
type CreateJobRequest struct {
|
||||
JobName string `json:"job_name" binding:"required"`
|
||||
ClientName string `json:"client_name" binding:"required"`
|
||||
JobType string `json:"job_type" binding:"required"` // 'Backup', 'Restore', 'Verify', 'Copy', 'Migrate'
|
||||
JobLevel string `json:"job_level" binding:"required"` // 'Full', 'Incremental', 'Differential', 'Since'
|
||||
StorageName *string `json:"storage_name,omitempty"`
|
||||
PoolName *string `json:"pool_name,omitempty"`
|
||||
}
|
||||
|
||||
// CreateJob creates a new backup job
|
||||
func (s *Service) CreateJob(ctx context.Context, req CreateJobRequest) (*Job, error) {
|
||||
// Generate a unique job ID (in real implementation, this would come from Bareos)
|
||||
// For now, we'll use a simple incrementing approach or timestamp-based ID
|
||||
var jobID int
|
||||
err := s.db.QueryRowContext(ctx, `
|
||||
SELECT COALESCE(MAX(job_id), 0) + 1 FROM backup_jobs
|
||||
`).Scan(&jobID)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to generate job ID: %w", err)
|
||||
}
|
||||
|
||||
// Insert the job into database
|
||||
query := `
|
||||
INSERT INTO backup_jobs (
|
||||
job_id, job_name, client_name, job_type, job_level,
|
||||
status, bytes_written, files_written,
|
||||
storage_name, pool_name, started_at
|
||||
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, NOW())
|
||||
RETURNING id, job_id, job_name, client_name, job_type, job_level, status,
|
||||
bytes_written, files_written, duration_seconds,
|
||||
started_at, ended_at, error_message,
|
||||
storage_name, pool_name, volume_name,
|
||||
created_at, updated_at
|
||||
`
|
||||
|
||||
var job Job
|
||||
var durationSeconds sql.NullInt64
|
||||
var startedAt, endedAt sql.NullTime
|
||||
var errorMessage, storageName, poolName, volumeName sql.NullString
|
||||
|
||||
err = s.db.QueryRowContext(ctx, query,
|
||||
jobID, req.JobName, req.ClientName, req.JobType, req.JobLevel,
|
||||
"Waiting", 0, 0,
|
||||
req.StorageName, req.PoolName,
|
||||
).Scan(
|
||||
&job.ID, &job.JobID, &job.JobName, &job.ClientName,
|
||||
&job.JobType, &job.JobLevel, &job.Status,
|
||||
&job.BytesWritten, &job.FilesWritten, &durationSeconds,
|
||||
&startedAt, &endedAt, &errorMessage,
|
||||
&storageName, &poolName, &volumeName,
|
||||
&job.CreatedAt, &job.UpdatedAt,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create job: %w", err)
|
||||
}
|
||||
|
||||
if durationSeconds.Valid {
|
||||
dur := int(durationSeconds.Int64)
|
||||
job.DurationSeconds = &dur
|
||||
}
|
||||
if startedAt.Valid {
|
||||
job.StartedAt = &startedAt.Time
|
||||
}
|
||||
if endedAt.Valid {
|
||||
job.EndedAt = &endedAt.Time
|
||||
}
|
||||
if errorMessage.Valid {
|
||||
job.ErrorMessage = &errorMessage.String
|
||||
}
|
||||
if storageName.Valid {
|
||||
job.StorageName = &storageName.String
|
||||
}
|
||||
if poolName.Valid {
|
||||
job.PoolName = &poolName.String
|
||||
}
|
||||
if volumeName.Valid {
|
||||
job.VolumeName = &volumeName.String
|
||||
}
|
||||
|
||||
s.logger.Info("Backup job created",
|
||||
"job_id", job.JobID,
|
||||
"job_name", job.JobName,
|
||||
"client_name", job.ClientName,
|
||||
"job_type", job.JobType,
|
||||
)
|
||||
|
||||
return &job, nil
|
||||
}
|
||||
Reference in New Issue
Block a user