diff --git a/backend/bin/calypso-api b/backend/bin/calypso-api index 7e42af4..41bd1bc 100755 Binary files a/backend/bin/calypso-api and b/backend/bin/calypso-api differ diff --git a/backend/internal/backup/service.go b/backend/internal/backup/service.go index 45e1052..7a2a9fd 100644 --- a/backend/internal/backup/service.go +++ b/backend/internal/backup/service.go @@ -9,32 +9,41 @@ import ( "strings" "time" + "github.com/atlasos/calypso/internal/common/config" "github.com/atlasos/calypso/internal/common/database" "github.com/atlasos/calypso/internal/common/logger" ) // Service handles backup job operations type Service struct { - db *database.DB - baculaDB *database.DB // Optional: separate connection to Bacula database - logger *logger.Logger - baculaDBName string // Bacula database name (bacula, bareos, etc.) - dbPassword string // Database password for dblink (optional, will try without if empty) + db *database.DB + baculaDB *database.DB // Direct connection to Bacula database + logger *logger.Logger } // NewService creates a new backup service func NewService(db *database.DB, log *logger.Logger) *Service { return &Service{ - db: db, - logger: log, - baculaDBName: "bacula", // Default Bacula database name + db: db, + logger: log, } } -// SetDatabasePassword sets the database password for dblink connections -func (s *Service) SetDatabasePassword(password string) { - s.dbPassword = password - s.logger.Debug("Database password set for dblink", "has_password", password != "", "password_length", len(password)) +// SetBaculaDatabase sets up direct connection to Bacula database +func (s *Service) SetBaculaDatabase(cfg config.DatabaseConfig, baculaDBName string) error { + // Create new database config for Bacula database + baculaCfg := cfg + baculaCfg.Database = baculaDBName // Override database name + + // Create connection to Bacula database + baculaDB, err := database.NewConnection(baculaCfg) + if err != nil { + return fmt.Errorf("failed to connect to Bacula database: %w", err) + } + + s.baculaDB = baculaDB + s.logger.Info("Connected to Bacula database", "database", baculaDBName, "host", cfg.Host, "port", cfg.Port) + return nil } // Job represents a backup job @@ -72,14 +81,24 @@ type ListJobsOptions struct { // SyncJobsFromBacula syncs jobs from Bacula/Bareos to the database // Tries to query Bacula database directly first, falls back to bconsole if database access fails func (s *Service) SyncJobsFromBacula(ctx context.Context) error { + s.logger.Info("Starting sync from Bacula database", "bacula_db_configured", s.baculaDB != nil) + + // Check if Bacula database connection is configured + if s.baculaDB == nil { + s.logger.Warn("Bacula database connection not configured, trying bconsole fallback") + return s.syncFromBconsole(ctx) + } + // Try to query Bacula database directly (if user has access) jobs, err := s.queryBaculaDatabase(ctx) if err != nil { - s.logger.Debug("Failed to query Bacula database directly, trying bconsole", "error", err) + s.logger.Warn("Failed to query Bacula database directly, trying bconsole", "error", err) // Fallback to bconsole return s.syncFromBconsole(ctx) } + s.logger.Info("Queried Bacula database", "jobs_found", len(jobs)) + if len(jobs) == 0 { s.logger.Debug("No jobs found in Bacula database") return nil @@ -87,99 +106,36 @@ func (s *Service) SyncJobsFromBacula(ctx context.Context) error { // Upsert jobs to Calypso database successCount := 0 + errorCount := 0 for _, job := range jobs { err := s.upsertJob(ctx, job) if err != nil { - s.logger.Error("Failed to upsert job", "job_id", job.JobID, "error", err) + s.logger.Error("Failed to upsert job", "job_id", job.JobID, "job_name", job.JobName, "error", err) + errorCount++ continue } successCount++ + s.logger.Debug("Upserted job", "job_id", job.JobID, "job_name", job.JobName) + } + + s.logger.Info("Synced jobs from Bacula database", "total", len(jobs), "success", successCount, "errors", errorCount) + + if errorCount > 0 { + return fmt.Errorf("failed to sync %d out of %d jobs", errorCount, len(jobs)) } - s.logger.Info("Synced jobs from Bacula database", "total", len(jobs), "success", successCount) return nil } -// getBaculaConnection gets or creates a connection to Bacula database -// Tries to create connection using same host/port/user but different database name -func (s *Service) getBaculaConnection(ctx context.Context) (*database.DB, error) { - if s.baculaDB != nil { - // Test if connection is still alive - if err := s.baculaDB.Ping(); err == nil { - return s.baculaDB, nil - } - // Connection is dead, close it - s.baculaDB.Close() - s.baculaDB = nil - } - - // Try to get connection info from current database connection - // We'll query the current database to get connection parameters - var currentDB, currentUser, currentHost string - var currentPort int - - // Get current database connection info - query := `SELECT current_database(), current_user, inet_server_addr(), inet_server_port()` - err := s.db.QueryRowContext(ctx, query).Scan(¤tDB, ¤tUser, ¤tHost, ¤tPort) - if err != nil { - return nil, fmt.Errorf("failed to get current database info: %w", err) - } - - // If host is null, it's a local connection (Unix socket) - if currentHost == "" { - currentHost = "localhost" - } - if currentPort == 0 { - currentPort = 5432 // Default PostgreSQL port - } - - // Try common Bacula database names - databases := []string{"bacula", "bareos", s.baculaDBName} - - for _, dbName := range databases { - if dbName == "" { - continue - } - - // Try to create connection to Bacula database - // We'll use the same connection parameters but different database name - // Note: This assumes same host/port/user/password - // For production, you'd want to configure this separately - - // We can't create a new connection without password - // So we'll try to query using dblink or assume same connection can access Bacula DB - // For now, return nil and let queryBaculaDatabase handle it via dblink or direct query - } - - return nil, fmt.Errorf("Bacula database connection not configured - will use dblink or direct query") -} - // queryBaculaDatabase queries Bacula database directly -// Following Bacularis approach: query Job table directly from Bacula database -// Since Bacula is in separate database, prioritize dblink over direct query +// Uses direct connection to Bacula database (no dblink needed) func (s *Service) queryBaculaDatabase(ctx context.Context) ([]Job, error) { - // Method 1: Try using dblink extension for cross-database query (preferred for separate databases) - checkDblink := `SELECT EXISTS(SELECT 1 FROM pg_extension WHERE extname = 'dblink')` - var dblinkExists bool - err := s.db.QueryRowContext(ctx, checkDblink).Scan(&dblinkExists) - if err == nil && dblinkExists { - jobs, err := s.queryBaculaViaDblink(ctx) - if err == nil && len(jobs) > 0 { - return jobs, nil - } - s.logger.Debug("dblink query failed, trying direct query", "error", err) - } else { - s.logger.Debug("dblink extension not found, trying direct query") + // Use direct connection to Bacula database + if s.baculaDB == nil { + return nil, fmt.Errorf("Bacula database connection not configured") } - // Method 2: Try querying Job table directly (if Bacula is in same database) - jobs, err := s.queryBaculaDirect(ctx) - if err == nil && len(jobs) > 0 { - return jobs, nil - } - s.logger.Debug("Direct query also failed", "error", err) - - return nil, fmt.Errorf("failed to query Bacula database: dblink and direct query both failed") + return s.queryBaculaDirect(ctx) } // queryBaculaDirect queries Job table directly (Bacularis approach) @@ -225,9 +181,10 @@ func (s *Service) queryBaculaDirect(ctx context.Context) ([]Job, error) { LIMIT 1000 ` - rows, err := s.db.QueryContext(ctx, query) + // Use direct connection to Bacula database + rows, err := s.baculaDB.QueryContext(ctx, query) if err != nil { - return nil, fmt.Errorf("Job table not found or not accessible: %w", err) + return nil, fmt.Errorf("failed to query Bacula Job table: %w", err) } defer rows.Close() @@ -273,158 +230,6 @@ func (s *Service) queryBaculaDirect(ctx context.Context) ([]Job, error) { return jobs, nil // Return empty list, not an error } -// queryBaculaViaDblink queries Bacula database using dblink extension -// Assumes dblink is installed and user has access to bacula database -func (s *Service) queryBaculaViaDblink(ctx context.Context) ([]Job, error) { - // Get current user and connection info for dblink - var currentUser, currentHost string - var currentPort int - - // Get current connection info - err := s.db.QueryRowContext(ctx, - `SELECT current_user, COALESCE(inet_server_addr()::text, ''), COALESCE(inet_server_port(), 5432)`).Scan( - ¤tUser, ¤tHost, ¤tPort) - if err != nil { - return nil, fmt.Errorf("failed to get connection info: %w", err) - } - - // Log connection info (without password) - s.logger.Debug("Preparing dblink connection", "user", currentUser, "host", currentHost, "port", currentPort, "has_password", s.dbPassword != "") - - // Try common Bacula database names - databases := []string{"bacula", "bareos", s.baculaDBName} - - for _, dbName := range databases { - if dbName == "" { - continue - } - - // Build dblink connection string - // Format: 'dbname=database_name user=username password=password' - // dblink requires password even for local connections - connStr := fmt.Sprintf("dbname=%s user=%s", dbName, currentUser) - - // Add password if available (required for dblink) - if s.dbPassword != "" { - // Escape special characters in password for connection string - // Replace single quotes with \' and backslashes with \\ - escapedPassword := strings.ReplaceAll(s.dbPassword, "\\", "\\\\") - escapedPassword = strings.ReplaceAll(escapedPassword, "'", "\\'") - connStr += fmt.Sprintf(" password='%s'", escapedPassword) - } - - // Add host/port for remote connections - if currentHost != "" { - connStr += fmt.Sprintf(" host=%s port=%d", currentHost, currentPort) - } - - // Query using dblink - get all data in one query with JOIN - // Escape single quotes in SQL string for dblink (double them) - innerQuery := `SELECT - j.JobId, - j.Name, - j.Type, - j.Level, - j.JobStatus, - j.JobBytes, - j.JobFiles, - j.StartTime, - j.EndTime, - COALESCE(c.Name, 'unknown') as ClientName - FROM Job j - LEFT JOIN Client c ON j.ClientId = c.ClientId - ORDER BY j.StartTime DESC - LIMIT 1000` - - // Escape single quotes in inner query for dblink (double them) - escapedQuery := strings.ReplaceAll(innerQuery, "'", "''") - - query := fmt.Sprintf(` - SELECT - JobId as job_id, - Name as job_name, - ClientName as client_name, - CASE - WHEN Type = 'B' THEN 'Backup' - WHEN Type = 'R' THEN 'Restore' - WHEN Type = 'V' THEN 'Verify' - WHEN Type = 'C' THEN 'Copy' - WHEN Type = 'M' THEN 'Migrate' - ELSE 'Backup' - END as job_type, - CASE - WHEN Level = 'F' THEN 'Full' - WHEN Level = 'I' THEN 'Incremental' - WHEN Level = 'D' THEN 'Differential' - WHEN Level = 'S' THEN 'Since' - ELSE 'Full' - END as job_level, - CASE - WHEN JobStatus = 'T' THEN 'Running' - WHEN JobStatus = 'C' THEN 'Completed' - WHEN JobStatus = 'f' OR JobStatus = 'F' THEN 'Failed' - WHEN JobStatus = 'A' THEN 'Canceled' - WHEN JobStatus = 'W' THEN 'Waiting' - ELSE 'Waiting' - END as status, - COALESCE(JobBytes, 0) as bytes_written, - COALESCE(JobFiles, 0) as files_written, - StartTime as started_at, - EndTime as ended_at - FROM dblink('%s', '%s') AS t(JobId int, Name text, Type char, Level char, JobStatus char, JobBytes bigint, JobFiles int, StartTime timestamp, EndTime timestamp, ClientName text) - `, connStr, escapedQuery) - - rows, err := s.db.QueryContext(ctx, query) - if err != nil { - s.logger.Error("Failed to query Bacula via dblink", "database", dbName, "connection", connStr, "error", err) - continue - } - defer rows.Close() - - var jobs []Job - for rows.Next() { - var job Job - var startedAt, endedAt sql.NullTime - - err := rows.Scan( - &job.JobID, &job.JobName, &job.ClientName, - &job.JobType, &job.JobLevel, &job.Status, - &job.BytesWritten, &job.FilesWritten, &startedAt, &endedAt, - ) - if err != nil { - s.logger.Error("Failed to scan Bacula job from dblink", "error", err) - continue - } - - if startedAt.Valid { - job.StartedAt = &startedAt.Time - } - if endedAt.Valid { - job.EndedAt = &endedAt.Time - // Calculate duration - if job.StartedAt != nil { - duration := int(endedAt.Time.Sub(*job.StartedAt).Seconds()) - job.DurationSeconds = &duration - } - } - - jobs = append(jobs, job) - } - - if err := rows.Err(); err != nil { - s.logger.Debug("Error iterating dblink results", "database", dbName, "error", err) - continue - } - - if len(jobs) > 0 { - s.logger.Info("Successfully queried Bacula database via dblink", "database", dbName, "count", len(jobs)) - return jobs, nil - } - } - - return nil, fmt.Errorf("failed to query Bacula database via dblink from any database") -} - // syncFromBconsole syncs jobs using bconsole command (fallback method) func (s *Service) syncFromBconsole(ctx context.Context) error { // Execute bconsole command to list jobs @@ -651,16 +456,19 @@ func (s *Service) upsertJob(ctx context.Context, job Job) error { query := ` INSERT INTO backup_jobs ( job_id, job_name, client_name, job_type, job_level, status, - bytes_written, files_written, started_at, updated_at - ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, NOW()) + bytes_written, files_written, started_at, ended_at, duration_seconds, updated_at + ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, NOW()) ON CONFLICT (job_id) DO UPDATE SET job_name = EXCLUDED.job_name, + client_name = EXCLUDED.client_name, job_type = EXCLUDED.job_type, job_level = EXCLUDED.job_level, status = EXCLUDED.status, bytes_written = EXCLUDED.bytes_written, files_written = EXCLUDED.files_written, started_at = EXCLUDED.started_at, + ended_at = EXCLUDED.ended_at, + duration_seconds = EXCLUDED.duration_seconds, updated_at = NOW() ` @@ -670,12 +478,20 @@ func (s *Service) upsertJob(ctx context.Context, job Job) error { clientName = "unknown" } - _, err := s.db.ExecContext(ctx, query, + result, err := s.db.ExecContext(ctx, query, job.JobID, job.JobName, clientName, job.JobType, job.JobLevel, job.Status, - job.BytesWritten, job.FilesWritten, job.StartedAt, + job.BytesWritten, job.FilesWritten, job.StartedAt, job.EndedAt, job.DurationSeconds, ) - return err + if err != nil { + s.logger.Error("Database error in upsertJob", "job_id", job.JobID, "error", err) + return err + } + + rowsAffected, _ := result.RowsAffected() + s.logger.Debug("Upserted job to database", "job_id", job.JobID, "rows_affected", rowsAffected) + + return nil } // ListJobs lists backup jobs with optional filters @@ -683,10 +499,13 @@ func (s *Service) ListJobs(ctx context.Context, opts ListJobsOptions) ([]Job, in // Try to sync jobs from Bacula first (non-blocking - if it fails, continue with database) // Don't return error if sync fails, just log it and continue // This allows the API to work even if bconsole is not available + s.logger.Info("ListJobs called, syncing from Bacula first") syncErr := s.SyncJobsFromBacula(ctx) if syncErr != nil { - s.logger.Debug("Failed to sync jobs from Bacula, using database only", "error", syncErr) + s.logger.Warn("Failed to sync jobs from Bacula, using database only", "error", syncErr) // Continue anyway - we'll use whatever is in the database + } else { + s.logger.Info("Successfully synced jobs from Bacula") } // Build WHERE clause diff --git a/backend/internal/common/database/migrations/011_sync_bacula_jobs_function.sql b/backend/internal/common/database/migrations/011_sync_bacula_jobs_function.sql new file mode 100644 index 0000000..8cc0af0 --- /dev/null +++ b/backend/internal/common/database/migrations/011_sync_bacula_jobs_function.sql @@ -0,0 +1,209 @@ +-- AtlasOS - Calypso +-- PostgreSQL Function to Sync Jobs from Bacula to Calypso +-- Version: 11.0 +-- +-- This function syncs jobs from Bacula database (Job table) to Calypso database (backup_jobs table) +-- Uses dblink extension to query Bacula database from Calypso database +-- +-- Prerequisites: +-- 1. dblink extension must be installed: CREATE EXTENSION IF NOT EXISTS dblink; +-- 2. User must have access to both databases +-- 3. Connection parameters must be configured in the function + +-- Create function to sync jobs from Bacula to Calypso +CREATE OR REPLACE FUNCTION sync_bacula_jobs( + bacula_db_name TEXT DEFAULT 'bacula', + bacula_host TEXT DEFAULT 'localhost', + bacula_port INTEGER DEFAULT 5432, + bacula_user TEXT DEFAULT 'calypso', + bacula_password TEXT DEFAULT '' +) +RETURNS TABLE( + jobs_synced INTEGER, + jobs_inserted INTEGER, + jobs_updated INTEGER, + errors INTEGER +) AS $$ +DECLARE + conn_str TEXT; + jobs_count INTEGER := 0; + inserted_count INTEGER := 0; + updated_count INTEGER := 0; + error_count INTEGER := 0; + job_record RECORD; +BEGIN + -- Build dblink connection string + conn_str := format( + 'dbname=%s host=%s port=%s user=%s password=%s', + bacula_db_name, + bacula_host, + bacula_port, + bacula_user, + bacula_password + ); + + -- Query jobs from Bacula database using dblink + FOR job_record IN + SELECT * FROM dblink( + conn_str, + $QUERY$ + SELECT + j.JobId, + j.Name as job_name, + COALESCE(c.Name, 'unknown') as client_name, + CASE + WHEN j.Type = 'B' THEN 'Backup' + WHEN j.Type = 'R' THEN 'Restore' + WHEN j.Type = 'V' THEN 'Verify' + WHEN j.Type = 'C' THEN 'Copy' + WHEN j.Type = 'M' THEN 'Migrate' + ELSE 'Backup' + END as job_type, + CASE + WHEN j.Level = 'F' THEN 'Full' + WHEN j.Level = 'I' THEN 'Incremental' + WHEN j.Level = 'D' THEN 'Differential' + WHEN j.Level = 'S' THEN 'Since' + ELSE 'Full' + END as job_level, + CASE + WHEN j.JobStatus = 'T' THEN 'Running' + WHEN j.JobStatus = 'C' THEN 'Completed' + WHEN j.JobStatus = 'f' OR j.JobStatus = 'F' THEN 'Failed' + WHEN j.JobStatus = 'A' THEN 'Canceled' + WHEN j.JobStatus = 'W' THEN 'Waiting' + ELSE 'Waiting' + END as status, + COALESCE(j.JobBytes, 0) as bytes_written, + COALESCE(j.JobFiles, 0) as files_written, + j.StartTime as started_at, + j.EndTime as ended_at, + CASE + WHEN j.EndTime IS NOT NULL AND j.StartTime IS NOT NULL + THEN EXTRACT(EPOCH FROM (j.EndTime - j.StartTime))::INTEGER + ELSE NULL + END as duration_seconds + FROM Job j + LEFT JOIN Client c ON j.ClientId = c.ClientId + ORDER BY j.StartTime DESC + LIMIT 1000 + $QUERY$ + ) AS t( + job_id INTEGER, + job_name TEXT, + client_name TEXT, + job_type TEXT, + job_level TEXT, + status TEXT, + bytes_written BIGINT, + files_written INTEGER, + started_at TIMESTAMP, + ended_at TIMESTAMP, + duration_seconds INTEGER + ) + LOOP + BEGIN + -- Check if job already exists (before insert/update) + IF EXISTS (SELECT 1 FROM backup_jobs WHERE job_id = job_record.job_id) THEN + updated_count := updated_count + 1; + ELSE + inserted_count := inserted_count + 1; + END IF; + + -- Upsert job to backup_jobs table + INSERT INTO backup_jobs ( + job_id, job_name, client_name, job_type, job_level, status, + bytes_written, files_written, started_at, ended_at, duration_seconds, + updated_at + ) VALUES ( + job_record.job_id, + job_record.job_name, + job_record.client_name, + job_record.job_type, + job_record.job_level, + job_record.status, + job_record.bytes_written, + job_record.files_written, + job_record.started_at, + job_record.ended_at, + job_record.duration_seconds, + NOW() + ) + ON CONFLICT (job_id) DO UPDATE SET + job_name = EXCLUDED.job_name, + client_name = EXCLUDED.client_name, + job_type = EXCLUDED.job_type, + job_level = EXCLUDED.job_level, + status = EXCLUDED.status, + bytes_written = EXCLUDED.bytes_written, + files_written = EXCLUDED.files_written, + started_at = EXCLUDED.started_at, + ended_at = EXCLUDED.ended_at, + duration_seconds = EXCLUDED.duration_seconds, + updated_at = NOW(); + + jobs_count := jobs_count + 1; + EXCEPTION + WHEN OTHERS THEN + error_count := error_count + 1; + -- Log error but continue with next job + RAISE WARNING 'Error syncing job %: %', job_record.job_id, SQLERRM; + END; + END LOOP; + + -- Return summary + RETURN QUERY SELECT jobs_count, inserted_count, updated_count, error_count; +END; +$$ LANGUAGE plpgsql; + +-- Create a simpler version that uses current database connection settings +-- This version assumes Bacula is on same host/port with same user +CREATE OR REPLACE FUNCTION sync_bacula_jobs_simple() +RETURNS TABLE( + jobs_synced INTEGER, + jobs_inserted INTEGER, + jobs_updated INTEGER, + errors INTEGER +) AS $$ +DECLARE + current_user_name TEXT; + current_host TEXT; + current_port INTEGER; + current_db TEXT; +BEGIN + -- Get current connection info + SELECT + current_user, + COALESCE(inet_server_addr()::TEXT, 'localhost'), + COALESCE(inet_server_port(), 5432), + current_database() + INTO + current_user_name, + current_host, + current_port, + current_db; + + -- Call main function with current connection settings + -- Note: password needs to be passed or configured in .pgpass + RETURN QUERY + SELECT * FROM sync_bacula_jobs( + 'bacula', -- Try 'bacula' first + current_host, + current_port, + current_user_name, + '' -- Empty password - will use .pgpass or peer authentication + ); +END; +$$ LANGUAGE plpgsql; + +-- Grant execute permission to calypso user +GRANT EXECUTE ON FUNCTION sync_bacula_jobs(TEXT, TEXT, INTEGER, TEXT, TEXT) TO calypso; +GRANT EXECUTE ON FUNCTION sync_bacula_jobs_simple() TO calypso; + +-- Create index if not exists (should already exist from migration 009) +CREATE INDEX IF NOT EXISTS idx_backup_jobs_job_id ON backup_jobs(job_id); +CREATE INDEX IF NOT EXISTS idx_backup_jobs_updated_at ON backup_jobs(updated_at); + +COMMENT ON FUNCTION sync_bacula_jobs IS 'Syncs jobs from Bacula database to Calypso backup_jobs table using dblink'; +COMMENT ON FUNCTION sync_bacula_jobs_simple IS 'Simplified version that uses current connection settings (requires .pgpass for password)'; + diff --git a/backend/internal/common/database/migrations/011_sync_bacula_jobs_function.sql.bak b/backend/internal/common/database/migrations/011_sync_bacula_jobs_function.sql.bak new file mode 100644 index 0000000..24c40e7 --- /dev/null +++ b/backend/internal/common/database/migrations/011_sync_bacula_jobs_function.sql.bak @@ -0,0 +1,209 @@ +-- AtlasOS - Calypso +-- PostgreSQL Function to Sync Jobs from Bacula to Calypso +-- Version: 11.0 +-- +-- This function syncs jobs from Bacula database (Job table) to Calypso database (backup_jobs table) +-- Uses dblink extension to query Bacula database from Calypso database +-- +-- Prerequisites: +-- 1. dblink extension must be installed: CREATE EXTENSION IF NOT EXISTS dblink; +-- 2. User must have access to both databases +-- 3. Connection parameters must be configured in the function + +-- Create function to sync jobs from Bacula to Calypso +CREATE OR REPLACE FUNCTION sync_bacula_jobs( + bacula_db_name TEXT DEFAULT 'bacula', + bacula_host TEXT DEFAULT 'localhost', + bacula_port INTEGER DEFAULT 5432, + bacula_user TEXT DEFAULT 'calypso', + bacula_password TEXT DEFAULT '' +) +RETURNS TABLE( + jobs_synced INTEGER, + jobs_inserted INTEGER, + jobs_updated INTEGER, + errors INTEGER +) AS $$ +DECLARE + conn_str TEXT; + jobs_count INTEGER := 0; + inserted_count INTEGER := 0; + updated_count INTEGER := 0; + error_count INTEGER := 0; + job_record RECORD; +BEGIN + -- Build dblink connection string + conn_str := format( + 'dbname=%s host=%s port=%s user=%s password=%s', + bacula_db_name, + bacula_host, + bacula_port, + bacula_user, + bacula_password + ); + + -- Query jobs from Bacula database using dblink + FOR job_record IN + SELECT * FROM dblink( + conn_str, + $$ + SELECT + j.JobId, + j.Name as job_name, + COALESCE(c.Name, 'unknown') as client_name, + CASE + WHEN j.Type = 'B' THEN 'Backup' + WHEN j.Type = 'R' THEN 'Restore' + WHEN j.Type = 'V' THEN 'Verify' + WHEN j.Type = 'C' THEN 'Copy' + WHEN j.Type = 'M' THEN 'Migrate' + ELSE 'Backup' + END as job_type, + CASE + WHEN j.Level = 'F' THEN 'Full' + WHEN j.Level = 'I' THEN 'Incremental' + WHEN j.Level = 'D' THEN 'Differential' + WHEN j.Level = 'S' THEN 'Since' + ELSE 'Full' + END as job_level, + CASE + WHEN j.JobStatus = 'T' THEN 'Running' + WHEN j.JobStatus = 'C' THEN 'Completed' + WHEN j.JobStatus = 'f' OR j.JobStatus = 'F' THEN 'Failed' + WHEN j.JobStatus = 'A' THEN 'Canceled' + WHEN j.JobStatus = 'W' THEN 'Waiting' + ELSE 'Waiting' + END as status, + COALESCE(j.JobBytes, 0) as bytes_written, + COALESCE(j.JobFiles, 0) as files_written, + j.StartTime as started_at, + j.EndTime as ended_at, + CASE + WHEN j.EndTime IS NOT NULL AND j.StartTime IS NOT NULL + THEN EXTRACT(EPOCH FROM (j.EndTime - j.StartTime))::INTEGER + ELSE NULL + END as duration_seconds + FROM Job j + LEFT JOIN Client c ON j.ClientId = c.ClientId + ORDER BY j.StartTime DESC + LIMIT 1000 + $$ + ) AS t( + job_id INTEGER, + job_name TEXT, + client_name TEXT, + job_type TEXT, + job_level TEXT, + status TEXT, + bytes_written BIGINT, + files_written INTEGER, + started_at TIMESTAMP, + ended_at TIMESTAMP, + duration_seconds INTEGER + ) + LOOP + BEGIN + -- Check if job already exists (before insert/update) + IF EXISTS (SELECT 1 FROM backup_jobs WHERE job_id = job_record.job_id) THEN + updated_count := updated_count + 1; + ELSE + inserted_count := inserted_count + 1; + END IF; + + -- Upsert job to backup_jobs table + INSERT INTO backup_jobs ( + job_id, job_name, client_name, job_type, job_level, status, + bytes_written, files_written, started_at, ended_at, duration_seconds, + updated_at + ) VALUES ( + job_record.job_id, + job_record.job_name, + job_record.client_name, + job_record.job_type, + job_record.job_level, + job_record.status, + job_record.bytes_written, + job_record.files_written, + job_record.started_at, + job_record.ended_at, + job_record.duration_seconds, + NOW() + ) + ON CONFLICT (job_id) DO UPDATE SET + job_name = EXCLUDED.job_name, + client_name = EXCLUDED.client_name, + job_type = EXCLUDED.job_type, + job_level = EXCLUDED.job_level, + status = EXCLUDED.status, + bytes_written = EXCLUDED.bytes_written, + files_written = EXCLUDED.files_written, + started_at = EXCLUDED.started_at, + ended_at = EXCLUDED.ended_at, + duration_seconds = EXCLUDED.duration_seconds, + updated_at = NOW(); + + jobs_count := jobs_count + 1; + EXCEPTION + WHEN OTHERS THEN + error_count := error_count + 1; + -- Log error but continue with next job + RAISE WARNING 'Error syncing job %: %', job_record.job_id, SQLERRM; + END; + END LOOP; + + -- Return summary + RETURN QUERY SELECT jobs_count, inserted_count, updated_count, error_count; +END; +$$ LANGUAGE plpgsql; + +-- Create a simpler version that uses current database connection settings +-- This version assumes Bacula is on same host/port with same user +CREATE OR REPLACE FUNCTION sync_bacula_jobs_simple() +RETURNS TABLE( + jobs_synced INTEGER, + jobs_inserted INTEGER, + jobs_updated INTEGER, + errors INTEGER +) AS $$ +DECLARE + current_user_name TEXT; + current_host TEXT; + current_port INTEGER; + current_db TEXT; +BEGIN + -- Get current connection info + SELECT + current_user, + COALESCE(inet_server_addr()::TEXT, 'localhost'), + COALESCE(inet_server_port(), 5432), + current_database() + INTO + current_user_name, + current_host, + current_port, + current_db; + + -- Call main function with current connection settings + -- Note: password needs to be passed or configured in .pgpass + RETURN QUERY + SELECT * FROM sync_bacula_jobs( + 'bacula', -- Try 'bacula' first + current_host, + current_port, + current_user_name, + '' -- Empty password - will use .pgpass or peer authentication + ); +END; +$$ LANGUAGE plpgsql; + +-- Grant execute permission to calypso user +GRANT EXECUTE ON FUNCTION sync_bacula_jobs(TEXT, TEXT, INTEGER, TEXT, TEXT) TO calypso; +GRANT EXECUTE ON FUNCTION sync_bacula_jobs_simple() TO calypso; + +-- Create index if not exists (should already exist from migration 009) +CREATE INDEX IF NOT EXISTS idx_backup_jobs_job_id ON backup_jobs(job_id); +CREATE INDEX IF NOT EXISTS idx_backup_jobs_updated_at ON backup_jobs(updated_at); + +COMMENT ON FUNCTION sync_bacula_jobs IS 'Syncs jobs from Bacula database to Calypso backup_jobs table using dblink'; +COMMENT ON FUNCTION sync_bacula_jobs_simple IS 'Simplified version that uses current connection settings (requires .pgpass for password)'; + diff --git a/backend/internal/common/router/router.go b/backend/internal/common/router/router.go index 889c34c..d963935 100644 --- a/backend/internal/common/router/router.go +++ b/backend/internal/common/router/router.go @@ -315,8 +315,17 @@ func NewRouter(cfg *config.Config, db *database.DB, log *logger.Logger) *gin.Eng // Backup Jobs backupService := backup.NewService(db, log) - // Set database password for dblink connections - backupService.SetDatabasePassword(cfg.Database.Password) + // Set up direct connection to Bacula database + // Try common Bacula database names + baculaDBName := "bacula" // Default + if err := backupService.SetBaculaDatabase(cfg.Database, baculaDBName); err != nil { + log.Warn("Failed to connect to Bacula database, trying 'bareos'", "error", err) + // Try 'bareos' as alternative + if err := backupService.SetBaculaDatabase(cfg.Database, "bareos"); err != nil { + log.Error("Failed to connect to Bacula database", "error", err, "tried", []string{"bacula", "bareos"}) + // Continue anyway - will fallback to bconsole + } + } backupHandler := backup.NewHandler(backupService, log) backupGroup := protected.Group("/backup") backupGroup.Use(requirePermission("backup", "read"))