-- AtlasOS - Calypso
-- PostgreSQL Function to Sync Jobs from Bacula to Calypso
-- Version: 11.0
--
-- This function syncs jobs from the Bacula database (Job table) to the Calypso database (backup_jobs table).
-- It uses the dblink extension to query the Bacula database from the Calypso database.
--
-- Prerequisites:
-- 1. The dblink extension must be installed: CREATE EXTENSION IF NOT EXISTS dblink;
-- 2. The user must have access to both databases
-- 3. Connection parameters must be configured in the function

-- Create function to sync jobs from Bacula to Calypso
--
-- Reads the 1000 most recently started rows of the Bacula Job table (joined to
-- Client for the client name) through dblink and upserts each one into
-- backup_jobs, keyed on job_id.
--
-- Parameters (all become keywords of the dblink/libpq connection string):
--   bacula_db_name   database holding the Bacula catalog
--   bacula_host      server host
--   bacula_port      server port
--   bacula_user      role used for the remote connection
--   bacula_password  password for that role; NULL or '' omits the keyword
--
-- Returns a single row:
--   jobs_synced    rows upserted successfully
--   jobs_inserted  of those, rows that did not exist in backup_jobs before
--   jobs_updated   of those, rows that already existed
--   errors         rows whose upsert raised an error (reported via WARNING)
--
-- A failure on one job does not abort the run; a failure to connect or to run
-- the remote query is not caught and propagates to the caller.
CREATE OR REPLACE FUNCTION sync_bacula_jobs(
    bacula_db_name TEXT DEFAULT 'bacula',
    bacula_host TEXT DEFAULT 'localhost',
    bacula_port INTEGER DEFAULT 5432,
    bacula_user TEXT DEFAULT 'calypso',
    bacula_password TEXT DEFAULT ''
)
RETURNS TABLE(
    jobs_synced INTEGER,
    jobs_inserted INTEGER,
    jobs_updated INTEGER,
    errors INTEGER
) AS $$
DECLARE
    conn_str TEXT;
    jobs_count INTEGER := 0;
    inserted_count INTEGER := 0;
    updated_count INTEGER := 0;
    error_count INTEGER := 0;
    job_record RECORD;
    job_exists BOOLEAN;
BEGIN
    -- Build the dblink connection string.
    -- Every value is wrapped in single quotes with backslashes and single
    -- quotes backslash-escaped (libpq conninfo quoting), so values containing
    -- spaces or quotes cannot break or extend the string. NULL/empty values
    -- are left out so that libpq applies its own default for that keyword
    -- instead of mis-reading the following "key=value" pair as the value.
    SELECT string_agg(
               p.keyword || '=''' ||
               replace(replace(p.val, E'\\', E'\\\\'), '''', E'\\''') || '''',
               ' ' ORDER BY p.ord
           )
    INTO conn_str
    FROM unnest(
             ARRAY['dbname', 'host', 'port', 'user', 'password'],
             ARRAY[bacula_db_name, bacula_host, bacula_port::TEXT, bacula_user, bacula_password]
         ) WITH ORDINALITY AS p(keyword, val, ord)
    WHERE p.val IS NOT NULL
      AND p.val <> '';

    -- Query jobs from Bacula database using dblink
    FOR job_record IN
        SELECT
            t.job_id,
            t.job_name,
            t.client_name,
            t.job_type,
            t.job_level,
            t.status,
            t.bytes_written,
            t.files_written,
            t.started_at,
            t.ended_at,
            t.duration_seconds
        FROM dblink(
            conn_str,
            $QUERY$
            SELECT
                j.JobId,
                j.Name AS job_name,
                COALESCE(c.Name, 'unknown') AS client_name,
                CASE
                    WHEN j.Type = 'B' THEN 'Backup'
                    WHEN j.Type = 'R' THEN 'Restore'
                    WHEN j.Type = 'V' THEN 'Verify'
                    WHEN j.Type = 'C' THEN 'Copy'
                    WHEN j.Type = 'M' THEN 'Migrate'
                    ELSE 'Backup'
                END AS job_type,
                CASE
                    WHEN j.Level = 'F' THEN 'Full'
                    WHEN j.Level = 'I' THEN 'Incremental'
                    WHEN j.Level = 'D' THEN 'Differential'
                    WHEN j.Level = 'S' THEN 'Since'
                    ELSE 'Full'
                END AS job_level,
                -- Bacula JobStatus codes (per the Bacula catalog docs):
                --   R = running, T = terminated normally,
                --   W = terminated with warnings, E = terminated in error,
                --   e = non-fatal error, f = fatal error, A = canceled.
                -- Every other code (C = created, F/S = waiting on a daemon,
                -- and the remaining wait states) is reported as 'Waiting'.
                CASE
                    WHEN j.JobStatus = 'R' THEN 'Running'
                    WHEN j.JobStatus IN ('T', 'W') THEN 'Completed'
                    WHEN j.JobStatus IN ('E', 'e', 'f') THEN 'Failed'
                    WHEN j.JobStatus = 'A' THEN 'Canceled'
                    ELSE 'Waiting'
                END AS status,
                COALESCE(j.JobBytes, 0) AS bytes_written,
                COALESCE(j.JobFiles, 0) AS files_written,
                j.StartTime AS started_at,
                j.EndTime AS ended_at,
                CASE
                    WHEN j.EndTime IS NOT NULL AND j.StartTime IS NOT NULL
                        THEN EXTRACT(EPOCH FROM (j.EndTime - j.StartTime))::INTEGER
                    ELSE NULL
                END AS duration_seconds
            FROM Job j
            LEFT JOIN Client c ON j.ClientId = c.ClientId
            ORDER BY j.StartTime DESC
            LIMIT 1000
            $QUERY$
        ) AS t(
            job_id INTEGER,
            job_name TEXT,
            client_name TEXT,
            job_type TEXT,
            job_level TEXT,
            status TEXT,
            bytes_written BIGINT,
            files_written INTEGER,
            started_at TIMESTAMP,
            ended_at TIMESTAMP,
            duration_seconds INTEGER
        )
    LOOP
        -- Each job runs in its own sub-block so one bad row only rolls back
        -- its own upsert and the loop carries on.
        BEGIN
            -- Remember whether the job existed before the upsert; this only
            -- decides which counter to bump once the upsert has succeeded.
            job_exists := EXISTS (
                SELECT 1
                FROM backup_jobs AS bj
                WHERE bj.job_id = job_record.job_id
            );

            -- Upsert job to backup_jobs table
            INSERT INTO backup_jobs (
                job_id, job_name, client_name, job_type, job_level, status,
                bytes_written, files_written, started_at, ended_at, duration_seconds,
                updated_at
            ) VALUES (
                job_record.job_id,
                job_record.job_name,
                job_record.client_name,
                job_record.job_type,
                job_record.job_level,
                job_record.status,
                job_record.bytes_written,
                job_record.files_written,
                job_record.started_at,
                job_record.ended_at,
                job_record.duration_seconds,
                NOW()
            )
            ON CONFLICT (job_id) DO UPDATE SET
                job_name = EXCLUDED.job_name,
                client_name = EXCLUDED.client_name,
                job_type = EXCLUDED.job_type,
                job_level = EXCLUDED.job_level,
                status = EXCLUDED.status,
                bytes_written = EXCLUDED.bytes_written,
                files_written = EXCLUDED.files_written,
                started_at = EXCLUDED.started_at,
                ended_at = EXCLUDED.ended_at,
                duration_seconds = EXCLUDED.duration_seconds,
                updated_at = NOW();

            -- Count only after the upsert succeeded: PL/pgSQL variables are
            -- not rolled back when the exception handler runs, so counting
            -- earlier would report failed rows as inserted/updated too.
            IF job_exists THEN
                updated_count := updated_count + 1;
            ELSE
                inserted_count := inserted_count + 1;
            END IF;
            jobs_count := jobs_count + 1;
        EXCEPTION
            WHEN OTHERS THEN
                error_count := error_count + 1;
                -- Log error but continue with next job
                RAISE WARNING 'Error syncing job %: %', job_record.job_id, SQLERRM;
        END;
    END LOOP;

    -- Return summary
    RETURN QUERY SELECT jobs_count, inserted_count, updated_count, error_count;
END;
$$ LANGUAGE plpgsql;

-- Create a simpler version that uses current database connection settings
--
-- Calls sync_bacula_jobs() for the database named 'bacula', reusing the
-- current session's role name and the server address/port this session is
-- connected to. This assumes Bacula is on the same host/port with the same
-- user.
--
-- No password is supplied (an empty string is passed), so authentication has
-- to succeed without one being given here (e.g. .pgpass).
--
-- Returns the summary row produced by sync_bacula_jobs():
--   jobs_synced, jobs_inserted, jobs_updated, errors
CREATE OR REPLACE FUNCTION sync_bacula_jobs_simple()
RETURNS TABLE(
    jobs_synced INTEGER,
    jobs_inserted INTEGER,
    jobs_updated INTEGER,
    errors INTEGER
) AS $$
DECLARE
    current_user_name TEXT;
    current_host TEXT;
    current_port INTEGER;
BEGIN
    -- Get current connection info.
    -- host() is used rather than a cast to TEXT because the cast keeps the
    -- netmask (e.g. '192.168.1.5/32'), which is not a valid host value.
    -- inet_server_addr()/inet_server_port() are NULL for Unix-socket
    -- sessions, in which case 'localhost' and 5432 are used instead.
    SELECT
        current_user,
        COALESCE(host(inet_server_addr()), 'localhost'),
        COALESCE(inet_server_port(), 5432)
    INTO
        current_user_name,
        current_host,
        current_port;

    -- Call main function with current connection settings
    -- Note: password needs to be passed or configured in .pgpass
    RETURN QUERY
    SELECT
        s.jobs_synced,
        s.jobs_inserted,
        s.jobs_updated,
        s.errors
    FROM sync_bacula_jobs(
        'bacula',           -- Bacula catalog database name
        current_host,
        current_port,
        current_user_name,
        ''                  -- Empty password - will use .pgpass or peer authentication
    ) AS s;
END;
$$ LANGUAGE plpgsql;

-- Let the calypso role execute both sync functions
GRANT EXECUTE ON FUNCTION
    sync_bacula_jobs(TEXT, TEXT, INTEGER, TEXT, TEXT),
    sync_bacula_jobs_simple()
TO calypso;

-- Supporting indexes on backup_jobs; created only when missing
-- (they should already exist from migration 009)
CREATE INDEX IF NOT EXISTS idx_backup_jobs_updated_at
    ON backup_jobs (updated_at);
CREATE INDEX IF NOT EXISTS idx_backup_jobs_job_id
    ON backup_jobs (job_id);

-- Catalog descriptions for the two functions
COMMENT ON FUNCTION sync_bacula_jobs(TEXT, TEXT, INTEGER, TEXT, TEXT)
    IS 'Syncs jobs from Bacula database to Calypso backup_jobs table using dblink';
COMMENT ON FUNCTION sync_bacula_jobs_simple()
    IS 'Simplified version that uses current connection settings (requires .pgpass for password)';