fix list backup jobs on backup management console
This commit is contained in:
Binary file not shown.
@@ -9,32 +9,41 @@ import (
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/atlasos/calypso/internal/common/config"
|
||||
"github.com/atlasos/calypso/internal/common/database"
|
||||
"github.com/atlasos/calypso/internal/common/logger"
|
||||
)
|
||||
|
||||
// Service handles backup job operations
|
||||
type Service struct {
|
||||
db *database.DB
|
||||
baculaDB *database.DB // Optional: separate connection to Bacula database
|
||||
logger *logger.Logger
|
||||
baculaDBName string // Bacula database name (bacula, bareos, etc.)
|
||||
dbPassword string // Database password for dblink (optional, will try without if empty)
|
||||
db *database.DB
|
||||
baculaDB *database.DB // Direct connection to Bacula database
|
||||
logger *logger.Logger
|
||||
}
|
||||
|
||||
// NewService creates a new backup service
|
||||
func NewService(db *database.DB, log *logger.Logger) *Service {
|
||||
return &Service{
|
||||
db: db,
|
||||
logger: log,
|
||||
baculaDBName: "bacula", // Default Bacula database name
|
||||
db: db,
|
||||
logger: log,
|
||||
}
|
||||
}
|
||||
|
||||
// SetDatabasePassword sets the database password for dblink connections
|
||||
func (s *Service) SetDatabasePassword(password string) {
|
||||
s.dbPassword = password
|
||||
s.logger.Debug("Database password set for dblink", "has_password", password != "", "password_length", len(password))
|
||||
// SetBaculaDatabase sets up direct connection to Bacula database
|
||||
func (s *Service) SetBaculaDatabase(cfg config.DatabaseConfig, baculaDBName string) error {
|
||||
// Create new database config for Bacula database
|
||||
baculaCfg := cfg
|
||||
baculaCfg.Database = baculaDBName // Override database name
|
||||
|
||||
// Create connection to Bacula database
|
||||
baculaDB, err := database.NewConnection(baculaCfg)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to connect to Bacula database: %w", err)
|
||||
}
|
||||
|
||||
s.baculaDB = baculaDB
|
||||
s.logger.Info("Connected to Bacula database", "database", baculaDBName, "host", cfg.Host, "port", cfg.Port)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Job represents a backup job
|
||||
@@ -72,14 +81,24 @@ type ListJobsOptions struct {
|
||||
// SyncJobsFromBacula syncs jobs from Bacula/Bareos to the database
|
||||
// Tries to query Bacula database directly first, falls back to bconsole if database access fails
|
||||
func (s *Service) SyncJobsFromBacula(ctx context.Context) error {
|
||||
s.logger.Info("Starting sync from Bacula database", "bacula_db_configured", s.baculaDB != nil)
|
||||
|
||||
// Check if Bacula database connection is configured
|
||||
if s.baculaDB == nil {
|
||||
s.logger.Warn("Bacula database connection not configured, trying bconsole fallback")
|
||||
return s.syncFromBconsole(ctx)
|
||||
}
|
||||
|
||||
// Try to query Bacula database directly (if user has access)
|
||||
jobs, err := s.queryBaculaDatabase(ctx)
|
||||
if err != nil {
|
||||
s.logger.Debug("Failed to query Bacula database directly, trying bconsole", "error", err)
|
||||
s.logger.Warn("Failed to query Bacula database directly, trying bconsole", "error", err)
|
||||
// Fallback to bconsole
|
||||
return s.syncFromBconsole(ctx)
|
||||
}
|
||||
|
||||
s.logger.Info("Queried Bacula database", "jobs_found", len(jobs))
|
||||
|
||||
if len(jobs) == 0 {
|
||||
s.logger.Debug("No jobs found in Bacula database")
|
||||
return nil
|
||||
@@ -87,99 +106,36 @@ func (s *Service) SyncJobsFromBacula(ctx context.Context) error {
|
||||
|
||||
// Upsert jobs to Calypso database
|
||||
successCount := 0
|
||||
errorCount := 0
|
||||
for _, job := range jobs {
|
||||
err := s.upsertJob(ctx, job)
|
||||
if err != nil {
|
||||
s.logger.Error("Failed to upsert job", "job_id", job.JobID, "error", err)
|
||||
s.logger.Error("Failed to upsert job", "job_id", job.JobID, "job_name", job.JobName, "error", err)
|
||||
errorCount++
|
||||
continue
|
||||
}
|
||||
successCount++
|
||||
s.logger.Debug("Upserted job", "job_id", job.JobID, "job_name", job.JobName)
|
||||
}
|
||||
|
||||
s.logger.Info("Synced jobs from Bacula database", "total", len(jobs), "success", successCount, "errors", errorCount)
|
||||
|
||||
if errorCount > 0 {
|
||||
return fmt.Errorf("failed to sync %d out of %d jobs", errorCount, len(jobs))
|
||||
}
|
||||
|
||||
s.logger.Info("Synced jobs from Bacula database", "total", len(jobs), "success", successCount)
|
||||
return nil
|
||||
}
|
||||
|
||||
// getBaculaConnection gets or creates a connection to Bacula database
|
||||
// Tries to create connection using same host/port/user but different database name
|
||||
func (s *Service) getBaculaConnection(ctx context.Context) (*database.DB, error) {
|
||||
if s.baculaDB != nil {
|
||||
// Test if connection is still alive
|
||||
if err := s.baculaDB.Ping(); err == nil {
|
||||
return s.baculaDB, nil
|
||||
}
|
||||
// Connection is dead, close it
|
||||
s.baculaDB.Close()
|
||||
s.baculaDB = nil
|
||||
}
|
||||
|
||||
// Try to get connection info from current database connection
|
||||
// We'll query the current database to get connection parameters
|
||||
var currentDB, currentUser, currentHost string
|
||||
var currentPort int
|
||||
|
||||
// Get current database connection info
|
||||
query := `SELECT current_database(), current_user, inet_server_addr(), inet_server_port()`
|
||||
err := s.db.QueryRowContext(ctx, query).Scan(¤tDB, ¤tUser, ¤tHost, ¤tPort)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get current database info: %w", err)
|
||||
}
|
||||
|
||||
// If host is null, it's a local connection (Unix socket)
|
||||
if currentHost == "" {
|
||||
currentHost = "localhost"
|
||||
}
|
||||
if currentPort == 0 {
|
||||
currentPort = 5432 // Default PostgreSQL port
|
||||
}
|
||||
|
||||
// Try common Bacula database names
|
||||
databases := []string{"bacula", "bareos", s.baculaDBName}
|
||||
|
||||
for _, dbName := range databases {
|
||||
if dbName == "" {
|
||||
continue
|
||||
}
|
||||
|
||||
// Try to create connection to Bacula database
|
||||
// We'll use the same connection parameters but different database name
|
||||
// Note: This assumes same host/port/user/password
|
||||
// For production, you'd want to configure this separately
|
||||
|
||||
// We can't create a new connection without password
|
||||
// So we'll try to query using dblink or assume same connection can access Bacula DB
|
||||
// For now, return nil and let queryBaculaDatabase handle it via dblink or direct query
|
||||
}
|
||||
|
||||
return nil, fmt.Errorf("Bacula database connection not configured - will use dblink or direct query")
|
||||
}
|
||||
|
||||
// queryBaculaDatabase queries Bacula database directly
|
||||
// Following Bacularis approach: query Job table directly from Bacula database
|
||||
// Since Bacula is in separate database, prioritize dblink over direct query
|
||||
// Uses direct connection to Bacula database (no dblink needed)
|
||||
func (s *Service) queryBaculaDatabase(ctx context.Context) ([]Job, error) {
|
||||
// Method 1: Try using dblink extension for cross-database query (preferred for separate databases)
|
||||
checkDblink := `SELECT EXISTS(SELECT 1 FROM pg_extension WHERE extname = 'dblink')`
|
||||
var dblinkExists bool
|
||||
err := s.db.QueryRowContext(ctx, checkDblink).Scan(&dblinkExists)
|
||||
if err == nil && dblinkExists {
|
||||
jobs, err := s.queryBaculaViaDblink(ctx)
|
||||
if err == nil && len(jobs) > 0 {
|
||||
return jobs, nil
|
||||
}
|
||||
s.logger.Debug("dblink query failed, trying direct query", "error", err)
|
||||
} else {
|
||||
s.logger.Debug("dblink extension not found, trying direct query")
|
||||
// Use direct connection to Bacula database
|
||||
if s.baculaDB == nil {
|
||||
return nil, fmt.Errorf("Bacula database connection not configured")
|
||||
}
|
||||
|
||||
// Method 2: Try querying Job table directly (if Bacula is in same database)
|
||||
jobs, err := s.queryBaculaDirect(ctx)
|
||||
if err == nil && len(jobs) > 0 {
|
||||
return jobs, nil
|
||||
}
|
||||
s.logger.Debug("Direct query also failed", "error", err)
|
||||
|
||||
return nil, fmt.Errorf("failed to query Bacula database: dblink and direct query both failed")
|
||||
return s.queryBaculaDirect(ctx)
|
||||
}
|
||||
|
||||
// queryBaculaDirect queries Job table directly (Bacularis approach)
|
||||
@@ -225,9 +181,10 @@ func (s *Service) queryBaculaDirect(ctx context.Context) ([]Job, error) {
|
||||
LIMIT 1000
|
||||
`
|
||||
|
||||
rows, err := s.db.QueryContext(ctx, query)
|
||||
// Use direct connection to Bacula database
|
||||
rows, err := s.baculaDB.QueryContext(ctx, query)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Job table not found or not accessible: %w", err)
|
||||
return nil, fmt.Errorf("failed to query Bacula Job table: %w", err)
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
@@ -273,158 +230,6 @@ func (s *Service) queryBaculaDirect(ctx context.Context) ([]Job, error) {
|
||||
return jobs, nil // Return empty list, not an error
|
||||
}
|
||||
|
||||
// queryBaculaViaDblink queries Bacula database using dblink extension
|
||||
// Assumes dblink is installed and user has access to bacula database
|
||||
func (s *Service) queryBaculaViaDblink(ctx context.Context) ([]Job, error) {
|
||||
// Get current user and connection info for dblink
|
||||
var currentUser, currentHost string
|
||||
var currentPort int
|
||||
|
||||
// Get current connection info
|
||||
err := s.db.QueryRowContext(ctx,
|
||||
`SELECT current_user, COALESCE(inet_server_addr()::text, ''), COALESCE(inet_server_port(), 5432)`).Scan(
|
||||
¤tUser, ¤tHost, ¤tPort)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get connection info: %w", err)
|
||||
}
|
||||
|
||||
// Log connection info (without password)
|
||||
s.logger.Debug("Preparing dblink connection", "user", currentUser, "host", currentHost, "port", currentPort, "has_password", s.dbPassword != "")
|
||||
|
||||
// Try common Bacula database names
|
||||
databases := []string{"bacula", "bareos", s.baculaDBName}
|
||||
|
||||
for _, dbName := range databases {
|
||||
if dbName == "" {
|
||||
continue
|
||||
}
|
||||
|
||||
// Build dblink connection string
|
||||
// Format: 'dbname=database_name user=username password=password'
|
||||
// dblink requires password even for local connections
|
||||
connStr := fmt.Sprintf("dbname=%s user=%s", dbName, currentUser)
|
||||
|
||||
// Add password if available (required for dblink)
|
||||
if s.dbPassword != "" {
|
||||
// Escape special characters in password for connection string
|
||||
// Replace single quotes with \' and backslashes with \\
|
||||
escapedPassword := strings.ReplaceAll(s.dbPassword, "\\", "\\\\")
|
||||
escapedPassword = strings.ReplaceAll(escapedPassword, "'", "\\'")
|
||||
connStr += fmt.Sprintf(" password='%s'", escapedPassword)
|
||||
}
|
||||
|
||||
// Add host/port for remote connections
|
||||
if currentHost != "" {
|
||||
connStr += fmt.Sprintf(" host=%s port=%d", currentHost, currentPort)
|
||||
}
|
||||
|
||||
// Query using dblink - get all data in one query with JOIN
|
||||
// Escape single quotes in SQL string for dblink (double them)
|
||||
innerQuery := `SELECT
|
||||
j.JobId,
|
||||
j.Name,
|
||||
j.Type,
|
||||
j.Level,
|
||||
j.JobStatus,
|
||||
j.JobBytes,
|
||||
j.JobFiles,
|
||||
j.StartTime,
|
||||
j.EndTime,
|
||||
COALESCE(c.Name, 'unknown') as ClientName
|
||||
FROM Job j
|
||||
LEFT JOIN Client c ON j.ClientId = c.ClientId
|
||||
ORDER BY j.StartTime DESC
|
||||
LIMIT 1000`
|
||||
|
||||
// Escape single quotes in inner query for dblink (double them)
|
||||
escapedQuery := strings.ReplaceAll(innerQuery, "'", "''")
|
||||
|
||||
query := fmt.Sprintf(`
|
||||
SELECT
|
||||
JobId as job_id,
|
||||
Name as job_name,
|
||||
ClientName as client_name,
|
||||
CASE
|
||||
WHEN Type = 'B' THEN 'Backup'
|
||||
WHEN Type = 'R' THEN 'Restore'
|
||||
WHEN Type = 'V' THEN 'Verify'
|
||||
WHEN Type = 'C' THEN 'Copy'
|
||||
WHEN Type = 'M' THEN 'Migrate'
|
||||
ELSE 'Backup'
|
||||
END as job_type,
|
||||
CASE
|
||||
WHEN Level = 'F' THEN 'Full'
|
||||
WHEN Level = 'I' THEN 'Incremental'
|
||||
WHEN Level = 'D' THEN 'Differential'
|
||||
WHEN Level = 'S' THEN 'Since'
|
||||
ELSE 'Full'
|
||||
END as job_level,
|
||||
CASE
|
||||
WHEN JobStatus = 'T' THEN 'Running'
|
||||
WHEN JobStatus = 'C' THEN 'Completed'
|
||||
WHEN JobStatus = 'f' OR JobStatus = 'F' THEN 'Failed'
|
||||
WHEN JobStatus = 'A' THEN 'Canceled'
|
||||
WHEN JobStatus = 'W' THEN 'Waiting'
|
||||
ELSE 'Waiting'
|
||||
END as status,
|
||||
COALESCE(JobBytes, 0) as bytes_written,
|
||||
COALESCE(JobFiles, 0) as files_written,
|
||||
StartTime as started_at,
|
||||
EndTime as ended_at
|
||||
FROM dblink('%s', '%s') AS t(JobId int, Name text, Type char, Level char, JobStatus char, JobBytes bigint, JobFiles int, StartTime timestamp, EndTime timestamp, ClientName text)
|
||||
`, connStr, escapedQuery)
|
||||
|
||||
rows, err := s.db.QueryContext(ctx, query)
|
||||
if err != nil {
|
||||
s.logger.Error("Failed to query Bacula via dblink", "database", dbName, "connection", connStr, "error", err)
|
||||
continue
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
var jobs []Job
|
||||
for rows.Next() {
|
||||
var job Job
|
||||
var startedAt, endedAt sql.NullTime
|
||||
|
||||
err := rows.Scan(
|
||||
&job.JobID, &job.JobName, &job.ClientName,
|
||||
&job.JobType, &job.JobLevel, &job.Status,
|
||||
&job.BytesWritten, &job.FilesWritten, &startedAt, &endedAt,
|
||||
)
|
||||
if err != nil {
|
||||
s.logger.Error("Failed to scan Bacula job from dblink", "error", err)
|
||||
continue
|
||||
}
|
||||
|
||||
if startedAt.Valid {
|
||||
job.StartedAt = &startedAt.Time
|
||||
}
|
||||
if endedAt.Valid {
|
||||
job.EndedAt = &endedAt.Time
|
||||
// Calculate duration
|
||||
if job.StartedAt != nil {
|
||||
duration := int(endedAt.Time.Sub(*job.StartedAt).Seconds())
|
||||
job.DurationSeconds = &duration
|
||||
}
|
||||
}
|
||||
|
||||
jobs = append(jobs, job)
|
||||
}
|
||||
|
||||
if err := rows.Err(); err != nil {
|
||||
s.logger.Debug("Error iterating dblink results", "database", dbName, "error", err)
|
||||
continue
|
||||
}
|
||||
|
||||
if len(jobs) > 0 {
|
||||
s.logger.Info("Successfully queried Bacula database via dblink", "database", dbName, "count", len(jobs))
|
||||
return jobs, nil
|
||||
}
|
||||
}
|
||||
|
||||
return nil, fmt.Errorf("failed to query Bacula database via dblink from any database")
|
||||
}
|
||||
|
||||
// syncFromBconsole syncs jobs using bconsole command (fallback method)
|
||||
func (s *Service) syncFromBconsole(ctx context.Context) error {
|
||||
// Execute bconsole command to list jobs
|
||||
@@ -651,16 +456,19 @@ func (s *Service) upsertJob(ctx context.Context, job Job) error {
|
||||
query := `
|
||||
INSERT INTO backup_jobs (
|
||||
job_id, job_name, client_name, job_type, job_level, status,
|
||||
bytes_written, files_written, started_at, updated_at
|
||||
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, NOW())
|
||||
bytes_written, files_written, started_at, ended_at, duration_seconds, updated_at
|
||||
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, NOW())
|
||||
ON CONFLICT (job_id) DO UPDATE SET
|
||||
job_name = EXCLUDED.job_name,
|
||||
client_name = EXCLUDED.client_name,
|
||||
job_type = EXCLUDED.job_type,
|
||||
job_level = EXCLUDED.job_level,
|
||||
status = EXCLUDED.status,
|
||||
bytes_written = EXCLUDED.bytes_written,
|
||||
files_written = EXCLUDED.files_written,
|
||||
started_at = EXCLUDED.started_at,
|
||||
ended_at = EXCLUDED.ended_at,
|
||||
duration_seconds = EXCLUDED.duration_seconds,
|
||||
updated_at = NOW()
|
||||
`
|
||||
|
||||
@@ -670,12 +478,20 @@ func (s *Service) upsertJob(ctx context.Context, job Job) error {
|
||||
clientName = "unknown"
|
||||
}
|
||||
|
||||
_, err := s.db.ExecContext(ctx, query,
|
||||
result, err := s.db.ExecContext(ctx, query,
|
||||
job.JobID, job.JobName, clientName, job.JobType, job.JobLevel, job.Status,
|
||||
job.BytesWritten, job.FilesWritten, job.StartedAt,
|
||||
job.BytesWritten, job.FilesWritten, job.StartedAt, job.EndedAt, job.DurationSeconds,
|
||||
)
|
||||
|
||||
return err
|
||||
if err != nil {
|
||||
s.logger.Error("Database error in upsertJob", "job_id", job.JobID, "error", err)
|
||||
return err
|
||||
}
|
||||
|
||||
rowsAffected, _ := result.RowsAffected()
|
||||
s.logger.Debug("Upserted job to database", "job_id", job.JobID, "rows_affected", rowsAffected)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// ListJobs lists backup jobs with optional filters
|
||||
@@ -683,10 +499,13 @@ func (s *Service) ListJobs(ctx context.Context, opts ListJobsOptions) ([]Job, in
|
||||
// Try to sync jobs from Bacula first (non-blocking - if it fails, continue with database)
|
||||
// Don't return error if sync fails, just log it and continue
|
||||
// This allows the API to work even if bconsole is not available
|
||||
s.logger.Info("ListJobs called, syncing from Bacula first")
|
||||
syncErr := s.SyncJobsFromBacula(ctx)
|
||||
if syncErr != nil {
|
||||
s.logger.Debug("Failed to sync jobs from Bacula, using database only", "error", syncErr)
|
||||
s.logger.Warn("Failed to sync jobs from Bacula, using database only", "error", syncErr)
|
||||
// Continue anyway - we'll use whatever is in the database
|
||||
} else {
|
||||
s.logger.Info("Successfully synced jobs from Bacula")
|
||||
}
|
||||
|
||||
// Build WHERE clause
|
||||
|
||||
@@ -0,0 +1,209 @@
|
||||
-- AtlasOS - Calypso
|
||||
-- PostgreSQL Function to Sync Jobs from Bacula to Calypso
|
||||
-- Version: 11.0
|
||||
--
|
||||
-- This function syncs jobs from Bacula database (Job table) to Calypso database (backup_jobs table)
|
||||
-- Uses dblink extension to query Bacula database from Calypso database
|
||||
--
|
||||
-- Prerequisites:
|
||||
-- 1. dblink extension must be installed: CREATE EXTENSION IF NOT EXISTS dblink;
|
||||
-- 2. User must have access to both databases
|
||||
-- 3. Connection parameters must be configured in the function
|
||||
|
||||
-- Create function to sync jobs from Bacula to Calypso
|
||||
CREATE OR REPLACE FUNCTION sync_bacula_jobs(
|
||||
bacula_db_name TEXT DEFAULT 'bacula',
|
||||
bacula_host TEXT DEFAULT 'localhost',
|
||||
bacula_port INTEGER DEFAULT 5432,
|
||||
bacula_user TEXT DEFAULT 'calypso',
|
||||
bacula_password TEXT DEFAULT ''
|
||||
)
|
||||
RETURNS TABLE(
|
||||
jobs_synced INTEGER,
|
||||
jobs_inserted INTEGER,
|
||||
jobs_updated INTEGER,
|
||||
errors INTEGER
|
||||
) AS $$
|
||||
DECLARE
|
||||
conn_str TEXT;
|
||||
jobs_count INTEGER := 0;
|
||||
inserted_count INTEGER := 0;
|
||||
updated_count INTEGER := 0;
|
||||
error_count INTEGER := 0;
|
||||
job_record RECORD;
|
||||
BEGIN
|
||||
-- Build dblink connection string
|
||||
conn_str := format(
|
||||
'dbname=%s host=%s port=%s user=%s password=%s',
|
||||
bacula_db_name,
|
||||
bacula_host,
|
||||
bacula_port,
|
||||
bacula_user,
|
||||
bacula_password
|
||||
);
|
||||
|
||||
-- Query jobs from Bacula database using dblink
|
||||
FOR job_record IN
|
||||
SELECT * FROM dblink(
|
||||
conn_str,
|
||||
$QUERY$
|
||||
SELECT
|
||||
j.JobId,
|
||||
j.Name as job_name,
|
||||
COALESCE(c.Name, 'unknown') as client_name,
|
||||
CASE
|
||||
WHEN j.Type = 'B' THEN 'Backup'
|
||||
WHEN j.Type = 'R' THEN 'Restore'
|
||||
WHEN j.Type = 'V' THEN 'Verify'
|
||||
WHEN j.Type = 'C' THEN 'Copy'
|
||||
WHEN j.Type = 'M' THEN 'Migrate'
|
||||
ELSE 'Backup'
|
||||
END as job_type,
|
||||
CASE
|
||||
WHEN j.Level = 'F' THEN 'Full'
|
||||
WHEN j.Level = 'I' THEN 'Incremental'
|
||||
WHEN j.Level = 'D' THEN 'Differential'
|
||||
WHEN j.Level = 'S' THEN 'Since'
|
||||
ELSE 'Full'
|
||||
END as job_level,
|
||||
CASE
|
||||
WHEN j.JobStatus = 'T' THEN 'Running'
|
||||
WHEN j.JobStatus = 'C' THEN 'Completed'
|
||||
WHEN j.JobStatus = 'f' OR j.JobStatus = 'F' THEN 'Failed'
|
||||
WHEN j.JobStatus = 'A' THEN 'Canceled'
|
||||
WHEN j.JobStatus = 'W' THEN 'Waiting'
|
||||
ELSE 'Waiting'
|
||||
END as status,
|
||||
COALESCE(j.JobBytes, 0) as bytes_written,
|
||||
COALESCE(j.JobFiles, 0) as files_written,
|
||||
j.StartTime as started_at,
|
||||
j.EndTime as ended_at,
|
||||
CASE
|
||||
WHEN j.EndTime IS NOT NULL AND j.StartTime IS NOT NULL
|
||||
THEN EXTRACT(EPOCH FROM (j.EndTime - j.StartTime))::INTEGER
|
||||
ELSE NULL
|
||||
END as duration_seconds
|
||||
FROM Job j
|
||||
LEFT JOIN Client c ON j.ClientId = c.ClientId
|
||||
ORDER BY j.StartTime DESC
|
||||
LIMIT 1000
|
||||
$QUERY$
|
||||
) AS t(
|
||||
job_id INTEGER,
|
||||
job_name TEXT,
|
||||
client_name TEXT,
|
||||
job_type TEXT,
|
||||
job_level TEXT,
|
||||
status TEXT,
|
||||
bytes_written BIGINT,
|
||||
files_written INTEGER,
|
||||
started_at TIMESTAMP,
|
||||
ended_at TIMESTAMP,
|
||||
duration_seconds INTEGER
|
||||
)
|
||||
LOOP
|
||||
BEGIN
|
||||
-- Check if job already exists (before insert/update)
|
||||
IF EXISTS (SELECT 1 FROM backup_jobs WHERE job_id = job_record.job_id) THEN
|
||||
updated_count := updated_count + 1;
|
||||
ELSE
|
||||
inserted_count := inserted_count + 1;
|
||||
END IF;
|
||||
|
||||
-- Upsert job to backup_jobs table
|
||||
INSERT INTO backup_jobs (
|
||||
job_id, job_name, client_name, job_type, job_level, status,
|
||||
bytes_written, files_written, started_at, ended_at, duration_seconds,
|
||||
updated_at
|
||||
) VALUES (
|
||||
job_record.job_id,
|
||||
job_record.job_name,
|
||||
job_record.client_name,
|
||||
job_record.job_type,
|
||||
job_record.job_level,
|
||||
job_record.status,
|
||||
job_record.bytes_written,
|
||||
job_record.files_written,
|
||||
job_record.started_at,
|
||||
job_record.ended_at,
|
||||
job_record.duration_seconds,
|
||||
NOW()
|
||||
)
|
||||
ON CONFLICT (job_id) DO UPDATE SET
|
||||
job_name = EXCLUDED.job_name,
|
||||
client_name = EXCLUDED.client_name,
|
||||
job_type = EXCLUDED.job_type,
|
||||
job_level = EXCLUDED.job_level,
|
||||
status = EXCLUDED.status,
|
||||
bytes_written = EXCLUDED.bytes_written,
|
||||
files_written = EXCLUDED.files_written,
|
||||
started_at = EXCLUDED.started_at,
|
||||
ended_at = EXCLUDED.ended_at,
|
||||
duration_seconds = EXCLUDED.duration_seconds,
|
||||
updated_at = NOW();
|
||||
|
||||
jobs_count := jobs_count + 1;
|
||||
EXCEPTION
|
||||
WHEN OTHERS THEN
|
||||
error_count := error_count + 1;
|
||||
-- Log error but continue with next job
|
||||
RAISE WARNING 'Error syncing job %: %', job_record.job_id, SQLERRM;
|
||||
END;
|
||||
END LOOP;
|
||||
|
||||
-- Return summary
|
||||
RETURN QUERY SELECT jobs_count, inserted_count, updated_count, error_count;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
|
||||
-- Create a simpler version that uses current database connection settings
|
||||
-- This version assumes Bacula is on same host/port with same user
|
||||
CREATE OR REPLACE FUNCTION sync_bacula_jobs_simple()
|
||||
RETURNS TABLE(
|
||||
jobs_synced INTEGER,
|
||||
jobs_inserted INTEGER,
|
||||
jobs_updated INTEGER,
|
||||
errors INTEGER
|
||||
) AS $$
|
||||
DECLARE
|
||||
current_user_name TEXT;
|
||||
current_host TEXT;
|
||||
current_port INTEGER;
|
||||
current_db TEXT;
|
||||
BEGIN
|
||||
-- Get current connection info
|
||||
SELECT
|
||||
current_user,
|
||||
COALESCE(inet_server_addr()::TEXT, 'localhost'),
|
||||
COALESCE(inet_server_port(), 5432),
|
||||
current_database()
|
||||
INTO
|
||||
current_user_name,
|
||||
current_host,
|
||||
current_port,
|
||||
current_db;
|
||||
|
||||
-- Call main function with current connection settings
|
||||
-- Note: password needs to be passed or configured in .pgpass
|
||||
RETURN QUERY
|
||||
SELECT * FROM sync_bacula_jobs(
|
||||
'bacula', -- Try 'bacula' first
|
||||
current_host,
|
||||
current_port,
|
||||
current_user_name,
|
||||
'' -- Empty password - will use .pgpass or peer authentication
|
||||
);
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
|
||||
-- Grant execute permission to calypso user
|
||||
GRANT EXECUTE ON FUNCTION sync_bacula_jobs(TEXT, TEXT, INTEGER, TEXT, TEXT) TO calypso;
|
||||
GRANT EXECUTE ON FUNCTION sync_bacula_jobs_simple() TO calypso;
|
||||
|
||||
-- Create index if not exists (should already exist from migration 009)
|
||||
CREATE INDEX IF NOT EXISTS idx_backup_jobs_job_id ON backup_jobs(job_id);
|
||||
CREATE INDEX IF NOT EXISTS idx_backup_jobs_updated_at ON backup_jobs(updated_at);
|
||||
|
||||
COMMENT ON FUNCTION sync_bacula_jobs IS 'Syncs jobs from Bacula database to Calypso backup_jobs table using dblink';
|
||||
COMMENT ON FUNCTION sync_bacula_jobs_simple IS 'Simplified version that uses current connection settings (requires .pgpass for password)';
|
||||
|
||||
@@ -0,0 +1,209 @@
|
||||
-- AtlasOS - Calypso
|
||||
-- PostgreSQL Function to Sync Jobs from Bacula to Calypso
|
||||
-- Version: 11.0
|
||||
--
|
||||
-- This function syncs jobs from Bacula database (Job table) to Calypso database (backup_jobs table)
|
||||
-- Uses dblink extension to query Bacula database from Calypso database
|
||||
--
|
||||
-- Prerequisites:
|
||||
-- 1. dblink extension must be installed: CREATE EXTENSION IF NOT EXISTS dblink;
|
||||
-- 2. User must have access to both databases
|
||||
-- 3. Connection parameters must be configured in the function
|
||||
|
||||
-- Create function to sync jobs from Bacula to Calypso
|
||||
CREATE OR REPLACE FUNCTION sync_bacula_jobs(
|
||||
bacula_db_name TEXT DEFAULT 'bacula',
|
||||
bacula_host TEXT DEFAULT 'localhost',
|
||||
bacula_port INTEGER DEFAULT 5432,
|
||||
bacula_user TEXT DEFAULT 'calypso',
|
||||
bacula_password TEXT DEFAULT ''
|
||||
)
|
||||
RETURNS TABLE(
|
||||
jobs_synced INTEGER,
|
||||
jobs_inserted INTEGER,
|
||||
jobs_updated INTEGER,
|
||||
errors INTEGER
|
||||
) AS $$
|
||||
DECLARE
|
||||
conn_str TEXT;
|
||||
jobs_count INTEGER := 0;
|
||||
inserted_count INTEGER := 0;
|
||||
updated_count INTEGER := 0;
|
||||
error_count INTEGER := 0;
|
||||
job_record RECORD;
|
||||
BEGIN
|
||||
-- Build dblink connection string
|
||||
conn_str := format(
|
||||
'dbname=%s host=%s port=%s user=%s password=%s',
|
||||
bacula_db_name,
|
||||
bacula_host,
|
||||
bacula_port,
|
||||
bacula_user,
|
||||
bacula_password
|
||||
);
|
||||
|
||||
-- Query jobs from Bacula database using dblink
|
||||
FOR job_record IN
|
||||
SELECT * FROM dblink(
|
||||
conn_str,
|
||||
$$
|
||||
SELECT
|
||||
j.JobId,
|
||||
j.Name as job_name,
|
||||
COALESCE(c.Name, 'unknown') as client_name,
|
||||
CASE
|
||||
WHEN j.Type = 'B' THEN 'Backup'
|
||||
WHEN j.Type = 'R' THEN 'Restore'
|
||||
WHEN j.Type = 'V' THEN 'Verify'
|
||||
WHEN j.Type = 'C' THEN 'Copy'
|
||||
WHEN j.Type = 'M' THEN 'Migrate'
|
||||
ELSE 'Backup'
|
||||
END as job_type,
|
||||
CASE
|
||||
WHEN j.Level = 'F' THEN 'Full'
|
||||
WHEN j.Level = 'I' THEN 'Incremental'
|
||||
WHEN j.Level = 'D' THEN 'Differential'
|
||||
WHEN j.Level = 'S' THEN 'Since'
|
||||
ELSE 'Full'
|
||||
END as job_level,
|
||||
CASE
|
||||
WHEN j.JobStatus = 'T' THEN 'Running'
|
||||
WHEN j.JobStatus = 'C' THEN 'Completed'
|
||||
WHEN j.JobStatus = 'f' OR j.JobStatus = 'F' THEN 'Failed'
|
||||
WHEN j.JobStatus = 'A' THEN 'Canceled'
|
||||
WHEN j.JobStatus = 'W' THEN 'Waiting'
|
||||
ELSE 'Waiting'
|
||||
END as status,
|
||||
COALESCE(j.JobBytes, 0) as bytes_written,
|
||||
COALESCE(j.JobFiles, 0) as files_written,
|
||||
j.StartTime as started_at,
|
||||
j.EndTime as ended_at,
|
||||
CASE
|
||||
WHEN j.EndTime IS NOT NULL AND j.StartTime IS NOT NULL
|
||||
THEN EXTRACT(EPOCH FROM (j.EndTime - j.StartTime))::INTEGER
|
||||
ELSE NULL
|
||||
END as duration_seconds
|
||||
FROM Job j
|
||||
LEFT JOIN Client c ON j.ClientId = c.ClientId
|
||||
ORDER BY j.StartTime DESC
|
||||
LIMIT 1000
|
||||
$$
|
||||
) AS t(
|
||||
job_id INTEGER,
|
||||
job_name TEXT,
|
||||
client_name TEXT,
|
||||
job_type TEXT,
|
||||
job_level TEXT,
|
||||
status TEXT,
|
||||
bytes_written BIGINT,
|
||||
files_written INTEGER,
|
||||
started_at TIMESTAMP,
|
||||
ended_at TIMESTAMP,
|
||||
duration_seconds INTEGER
|
||||
)
|
||||
LOOP
|
||||
BEGIN
|
||||
-- Check if job already exists (before insert/update)
|
||||
IF EXISTS (SELECT 1 FROM backup_jobs WHERE job_id = job_record.job_id) THEN
|
||||
updated_count := updated_count + 1;
|
||||
ELSE
|
||||
inserted_count := inserted_count + 1;
|
||||
END IF;
|
||||
|
||||
-- Upsert job to backup_jobs table
|
||||
INSERT INTO backup_jobs (
|
||||
job_id, job_name, client_name, job_type, job_level, status,
|
||||
bytes_written, files_written, started_at, ended_at, duration_seconds,
|
||||
updated_at
|
||||
) VALUES (
|
||||
job_record.job_id,
|
||||
job_record.job_name,
|
||||
job_record.client_name,
|
||||
job_record.job_type,
|
||||
job_record.job_level,
|
||||
job_record.status,
|
||||
job_record.bytes_written,
|
||||
job_record.files_written,
|
||||
job_record.started_at,
|
||||
job_record.ended_at,
|
||||
job_record.duration_seconds,
|
||||
NOW()
|
||||
)
|
||||
ON CONFLICT (job_id) DO UPDATE SET
|
||||
job_name = EXCLUDED.job_name,
|
||||
client_name = EXCLUDED.client_name,
|
||||
job_type = EXCLUDED.job_type,
|
||||
job_level = EXCLUDED.job_level,
|
||||
status = EXCLUDED.status,
|
||||
bytes_written = EXCLUDED.bytes_written,
|
||||
files_written = EXCLUDED.files_written,
|
||||
started_at = EXCLUDED.started_at,
|
||||
ended_at = EXCLUDED.ended_at,
|
||||
duration_seconds = EXCLUDED.duration_seconds,
|
||||
updated_at = NOW();
|
||||
|
||||
jobs_count := jobs_count + 1;
|
||||
EXCEPTION
|
||||
WHEN OTHERS THEN
|
||||
error_count := error_count + 1;
|
||||
-- Log error but continue with next job
|
||||
RAISE WARNING 'Error syncing job %: %', job_record.job_id, SQLERRM;
|
||||
END;
|
||||
END LOOP;
|
||||
|
||||
-- Return summary
|
||||
RETURN QUERY SELECT jobs_count, inserted_count, updated_count, error_count;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
|
||||
-- Create a simpler version that uses current database connection settings
|
||||
-- This version assumes Bacula is on same host/port with same user
|
||||
CREATE OR REPLACE FUNCTION sync_bacula_jobs_simple()
|
||||
RETURNS TABLE(
|
||||
jobs_synced INTEGER,
|
||||
jobs_inserted INTEGER,
|
||||
jobs_updated INTEGER,
|
||||
errors INTEGER
|
||||
) AS $$
|
||||
DECLARE
|
||||
current_user_name TEXT;
|
||||
current_host TEXT;
|
||||
current_port INTEGER;
|
||||
current_db TEXT;
|
||||
BEGIN
|
||||
-- Get current connection info
|
||||
SELECT
|
||||
current_user,
|
||||
COALESCE(inet_server_addr()::TEXT, 'localhost'),
|
||||
COALESCE(inet_server_port(), 5432),
|
||||
current_database()
|
||||
INTO
|
||||
current_user_name,
|
||||
current_host,
|
||||
current_port,
|
||||
current_db;
|
||||
|
||||
-- Call main function with current connection settings
|
||||
-- Note: password needs to be passed or configured in .pgpass
|
||||
RETURN QUERY
|
||||
SELECT * FROM sync_bacula_jobs(
|
||||
'bacula', -- Try 'bacula' first
|
||||
current_host,
|
||||
current_port,
|
||||
current_user_name,
|
||||
'' -- Empty password - will use .pgpass or peer authentication
|
||||
);
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
|
||||
-- Grant execute permission to calypso user
|
||||
GRANT EXECUTE ON FUNCTION sync_bacula_jobs(TEXT, TEXT, INTEGER, TEXT, TEXT) TO calypso;
|
||||
GRANT EXECUTE ON FUNCTION sync_bacula_jobs_simple() TO calypso;
|
||||
|
||||
-- Create index if not exists (should already exist from migration 009)
|
||||
CREATE INDEX IF NOT EXISTS idx_backup_jobs_job_id ON backup_jobs(job_id);
|
||||
CREATE INDEX IF NOT EXISTS idx_backup_jobs_updated_at ON backup_jobs(updated_at);
|
||||
|
||||
COMMENT ON FUNCTION sync_bacula_jobs IS 'Syncs jobs from Bacula database to Calypso backup_jobs table using dblink';
|
||||
COMMENT ON FUNCTION sync_bacula_jobs_simple IS 'Simplified version that uses current connection settings (requires .pgpass for password)';
|
||||
|
||||
@@ -315,8 +315,17 @@ func NewRouter(cfg *config.Config, db *database.DB, log *logger.Logger) *gin.Eng
|
||||
|
||||
// Backup Jobs
|
||||
backupService := backup.NewService(db, log)
|
||||
// Set database password for dblink connections
|
||||
backupService.SetDatabasePassword(cfg.Database.Password)
|
||||
// Set up direct connection to Bacula database
|
||||
// Try common Bacula database names
|
||||
baculaDBName := "bacula" // Default
|
||||
if err := backupService.SetBaculaDatabase(cfg.Database, baculaDBName); err != nil {
|
||||
log.Warn("Failed to connect to Bacula database, trying 'bareos'", "error", err)
|
||||
// Try 'bareos' as alternative
|
||||
if err := backupService.SetBaculaDatabase(cfg.Database, "bareos"); err != nil {
|
||||
log.Error("Failed to connect to Bacula database", "error", err, "tried", []string{"bacula", "bareos"})
|
||||
// Continue anyway - will fallback to bconsole
|
||||
}
|
||||
}
|
||||
backupHandler := backup.NewHandler(backupService, log)
|
||||
backupGroup := protected.Group("/backup")
|
||||
backupGroup.Use(requirePermission("backup", "read"))
|
||||
|
||||
Reference in New Issue
Block a user