-- AtlasOS - Calypso -- PostgreSQL Function to Sync Jobs from Bacula to Calypso -- Version: 11.0 -- -- This function syncs jobs from Bacula database (Job table) to Calypso database (backup_jobs table) -- Uses dblink extension to query Bacula database from Calypso database -- -- Prerequisites: -- 1. dblink extension must be installed: CREATE EXTENSION IF NOT EXISTS dblink; -- 2. User must have access to both databases -- 3. Connection parameters must be configured in the function -- Create function to sync jobs from Bacula to Calypso CREATE OR REPLACE FUNCTION sync_bacula_jobs( bacula_db_name TEXT DEFAULT 'bacula', bacula_host TEXT DEFAULT 'localhost', bacula_port INTEGER DEFAULT 5432, bacula_user TEXT DEFAULT 'calypso', bacula_password TEXT DEFAULT '' ) RETURNS TABLE( jobs_synced INTEGER, jobs_inserted INTEGER, jobs_updated INTEGER, errors INTEGER ) AS $$ DECLARE conn_str TEXT; jobs_count INTEGER := 0; inserted_count INTEGER := 0; updated_count INTEGER := 0; error_count INTEGER := 0; job_record RECORD; BEGIN -- Build dblink connection string conn_str := format( 'dbname=%s host=%s port=%s user=%s password=%s', bacula_db_name, bacula_host, bacula_port, bacula_user, bacula_password ); -- Query jobs from Bacula database using dblink FOR job_record IN SELECT * FROM dblink( conn_str, $QUERY$ SELECT j.JobId, j.Name as job_name, COALESCE(c.Name, 'unknown') as client_name, CASE WHEN j.Type = 'B' THEN 'Backup' WHEN j.Type = 'R' THEN 'Restore' WHEN j.Type = 'V' THEN 'Verify' WHEN j.Type = 'C' THEN 'Copy' WHEN j.Type = 'M' THEN 'Migrate' ELSE 'Backup' END as job_type, CASE WHEN j.Level = 'F' THEN 'Full' WHEN j.Level = 'I' THEN 'Incremental' WHEN j.Level = 'D' THEN 'Differential' WHEN j.Level = 'S' THEN 'Since' ELSE 'Full' END as job_level, CASE WHEN j.JobStatus = 'T' THEN 'Running' WHEN j.JobStatus = 'C' THEN 'Completed' WHEN j.JobStatus = 'f' OR j.JobStatus = 'F' THEN 'Failed' WHEN j.JobStatus = 'A' THEN 'Canceled' WHEN j.JobStatus = 'W' THEN 'Waiting' ELSE 'Waiting' END as status, COALESCE(j.JobBytes, 0) as bytes_written, COALESCE(j.JobFiles, 0) as files_written, j.StartTime as started_at, j.EndTime as ended_at, CASE WHEN j.EndTime IS NOT NULL AND j.StartTime IS NOT NULL THEN EXTRACT(EPOCH FROM (j.EndTime - j.StartTime))::INTEGER ELSE NULL END as duration_seconds FROM Job j LEFT JOIN Client c ON j.ClientId = c.ClientId ORDER BY j.StartTime DESC LIMIT 1000 $QUERY$ ) AS t( job_id INTEGER, job_name TEXT, client_name TEXT, job_type TEXT, job_level TEXT, status TEXT, bytes_written BIGINT, files_written INTEGER, started_at TIMESTAMP, ended_at TIMESTAMP, duration_seconds INTEGER ) LOOP BEGIN -- Check if job already exists (before insert/update) IF EXISTS (SELECT 1 FROM backup_jobs WHERE job_id = job_record.job_id) THEN updated_count := updated_count + 1; ELSE inserted_count := inserted_count + 1; END IF; -- Upsert job to backup_jobs table INSERT INTO backup_jobs ( job_id, job_name, client_name, job_type, job_level, status, bytes_written, files_written, started_at, ended_at, duration_seconds, updated_at ) VALUES ( job_record.job_id, job_record.job_name, job_record.client_name, job_record.job_type, job_record.job_level, job_record.status, job_record.bytes_written, job_record.files_written, job_record.started_at, job_record.ended_at, job_record.duration_seconds, NOW() ) ON CONFLICT (job_id) DO UPDATE SET job_name = EXCLUDED.job_name, client_name = EXCLUDED.client_name, job_type = EXCLUDED.job_type, job_level = EXCLUDED.job_level, status = EXCLUDED.status, bytes_written = EXCLUDED.bytes_written, files_written = EXCLUDED.files_written, started_at = EXCLUDED.started_at, ended_at = EXCLUDED.ended_at, duration_seconds = EXCLUDED.duration_seconds, updated_at = NOW(); jobs_count := jobs_count + 1; EXCEPTION WHEN OTHERS THEN error_count := error_count + 1; -- Log error but continue with next job RAISE WARNING 'Error syncing job %: %', job_record.job_id, SQLERRM; END; END LOOP; -- Return summary RETURN QUERY SELECT jobs_count, inserted_count, updated_count, error_count; END; $$ LANGUAGE plpgsql; -- Create a simpler version that uses current database connection settings -- This version assumes Bacula is on same host/port with same user CREATE OR REPLACE FUNCTION sync_bacula_jobs_simple() RETURNS TABLE( jobs_synced INTEGER, jobs_inserted INTEGER, jobs_updated INTEGER, errors INTEGER ) AS $$ DECLARE current_user_name TEXT; current_host TEXT; current_port INTEGER; current_db TEXT; BEGIN -- Get current connection info SELECT current_user, COALESCE(inet_server_addr()::TEXT, 'localhost'), COALESCE(inet_server_port(), 5432), current_database() INTO current_user_name, current_host, current_port, current_db; -- Call main function with current connection settings -- Note: password needs to be passed or configured in .pgpass RETURN QUERY SELECT * FROM sync_bacula_jobs( 'bacula', -- Try 'bacula' first current_host, current_port, current_user_name, '' -- Empty password - will use .pgpass or peer authentication ); END; $$ LANGUAGE plpgsql; -- Grant execute permission to calypso user GRANT EXECUTE ON FUNCTION sync_bacula_jobs(TEXT, TEXT, INTEGER, TEXT, TEXT) TO calypso; GRANT EXECUTE ON FUNCTION sync_bacula_jobs_simple() TO calypso; -- Create index if not exists (should already exist from migration 009) CREATE INDEX IF NOT EXISTS idx_backup_jobs_job_id ON backup_jobs(job_id); CREATE INDEX IF NOT EXISTS idx_backup_jobs_updated_at ON backup_jobs(updated_at); COMMENT ON FUNCTION sync_bacula_jobs IS 'Syncs jobs from Bacula database to Calypso backup_jobs table using dblink'; COMMENT ON FUNCTION sync_bacula_jobs_simple IS 'Simplified version that uses current connection settings (requires .pgpass for password)';