Files
calypso/backend/internal/backup/service.go
2025-12-31 03:04:11 +07:00

2323 lines
67 KiB
Go

package backup
import (
	"context"
	"database/sql"
	"errors"
	"fmt"
	"os/exec"
	"strconv"
	"strings"
	"time"

	"github.com/atlasos/calypso/internal/common/config"
	"github.com/atlasos/calypso/internal/common/database"
	"github.com/atlasos/calypso/internal/common/logger"
)
// Service handles backup job operations. It uses two database handles:
// the Calypso application database (db) for the backup_jobs table, and
// an optional direct connection to the Bacula catalog (baculaDB) used
// for job/client queries; baculaDB stays nil until SetBaculaDatabase
// is called successfully, in which case bconsole is used as a fallback.
type Service struct {
	db       *database.DB
	baculaDB *database.DB // Direct connection to Bacula database; nil until SetBaculaDatabase succeeds
	logger   *logger.Logger
}
// NewService constructs a backup Service backed by the given Calypso
// database handle and logger. The direct Bacula catalog connection is
// configured separately via SetBaculaDatabase.
func NewService(db *database.DB, log *logger.Logger) *Service {
	svc := &Service{}
	svc.db = db
	svc.logger = log
	return svc
}
// SetBaculaDatabase opens a direct connection to the Bacula catalog
// database. The supplied Calypso database config is reused as-is with
// only the database name overridden by baculaDBName.
func (s *Service) SetBaculaDatabase(cfg config.DatabaseConfig, baculaDBName string) error {
	// Copy the config and point it at the Bacula catalog.
	baculaCfg := cfg
	baculaCfg.Database = baculaDBName
	conn, err := database.NewConnection(baculaCfg)
	if err != nil {
		return fmt.Errorf("failed to connect to Bacula database: %w", err)
	}
	s.baculaDB = conn
	s.logger.Info("Connected to Bacula database", "database", baculaDBName, "host", cfg.Host, "port", cfg.Port)
	return nil
}
// Job represents a backup job as exposed by the Calypso API. Rows are
// synced from the Bacula catalog (or parsed from bconsole output) into
// the backup_jobs table. Pointer fields are optional and omitted from
// JSON when unset.
type Job struct {
	ID           string     `json:"id"`       // Calypso row ID (backup_jobs.id)
	JobID        int        `json:"job_id"`   // Bacula JobId
	JobName      string     `json:"job_name"`
	ClientName   string     `json:"client_name"` // "unknown" when no client could be resolved
	JobType      string     `json:"job_type"`    // "Backup", "Restore", "Verify", "Copy", "Migrate"
	JobLevel     string     `json:"job_level"`   // "Full", "Incremental", "Differential", "Since"
	Status       string     `json:"status"`      // "Running", "Completed", "Failed", "Canceled", "Waiting"
	BytesWritten int64      `json:"bytes_written"`
	FilesWritten int        `json:"files_written"`
	DurationSeconds *int    `json:"duration_seconds,omitempty"` // derived from ended_at - started_at when both are set
	StartedAt    *time.Time `json:"started_at,omitempty"`
	EndedAt      *time.Time `json:"ended_at,omitempty"`
	ErrorMessage *string    `json:"error_message,omitempty"`
	StorageName  *string    `json:"storage_name,omitempty"`
	PoolName     *string    `json:"pool_name,omitempty"`
	VolumeName   *string    `json:"volume_name,omitempty"`
	CreatedAt    time.Time  `json:"created_at"`
	UpdatedAt    time.Time  `json:"updated_at"`
}
// ListJobsOptions represents filtering and pagination options for
// ListJobs. String filters that are empty are ignored; ClientName and
// JobName match as case-insensitive substrings (ILIKE).
type ListJobsOptions struct {
	Status     string // Filter by status: "Running", "Completed", "Failed", etc.
	JobType    string // Filter by job type: "Backup", "Restore", etc.
	ClientName string // Filter by client name (substring match)
	JobName    string // Filter by job name (substring match)
	Limit      int    // Number of results to return (defaults to 50, capped at 100)
	Offset     int    // Offset for pagination
}
// Client represents a backup client. Pointer fields are only populated
// by the database path (queryClientsFromDatabase); the bconsole
// fallback yields only Name with ClientID 0 and Enabled true.
type Client struct {
	ClientID      int        `json:"client_id"` // Bacula ClientId (0 when sourced from bconsole)
	Name          string     `json:"name"`
	Uname         *string    `json:"uname,omitempty"` // client OS/version string from Bacula
	Enabled       bool       `json:"enabled"`
	AutoPrune     *bool      `json:"auto_prune,omitempty"`
	FileRetention *int64     `json:"file_retention,omitempty"`
	JobRetention  *int64     `json:"job_retention,omitempty"`
	LastBackupAt  *time.Time `json:"last_backup_at,omitempty"` // MAX(Job.StartTime) for this client
	TotalJobs     *int       `json:"total_jobs,omitempty"`
	TotalBytes    *int64     `json:"total_bytes,omitempty"`
	Status        *string    `json:"status,omitempty"` // "online" or "offline" (derived, see queryClientsFromDatabase)
}
// ListClientsOptions represents filtering options for ListClients.
type ListClientsOptions struct {
	Enabled *bool  // Filter by enabled status (nil = all)
	Search  string // Case-insensitive substring match on client name
}
// DashboardStats represents statistics for the backup dashboard,
// combining director status from bconsole with job/pool data from the
// Calypso database.
type DashboardStats struct {
	DirectorStatus  string     `json:"director_status"` // "Active" or "Inactive"
	DirectorUptime  string     `json:"director_uptime"` // e.g., "14d 2h 12m"; "Unknown" when not determinable
	LastJob         *Job       `json:"last_job,omitempty"` // most recently completed job, if any
	ActiveJobsCount int        `json:"active_jobs_count"`
	DefaultPool     *PoolStats `json:"default_pool,omitempty"`
}
// PoolStats represents pool storage statistics for the dashboard.
type PoolStats struct {
	Name         string  `json:"name"`
	UsedBytes    int64   `json:"used_bytes"`
	TotalBytes   int64   `json:"total_bytes"`
	UsagePercent float64 `json:"usage_percent"` // 0-100
}
// StoragePool represents a Bacula storage pool with aggregated volume
// usage figures.
type StoragePool struct {
	PoolID       int     `json:"pool_id"` // Bacula PoolId
	Name         string  `json:"name"`
	PoolType     string  `json:"pool_type"`
	LabelFormat  *string `json:"label_format,omitempty"`
	Recycle      *bool   `json:"recycle,omitempty"`
	AutoPrune    *bool   `json:"auto_prune,omitempty"`
	VolumeCount  int     `json:"volume_count"`
	UsedBytes    int64   `json:"used_bytes"`
	TotalBytes   int64   `json:"total_bytes"`
	UsagePercent float64 `json:"usage_percent"` // 0-100
}
// StorageVolume represents a Bacula storage volume (Media row).
type StorageVolume struct {
	VolumeID   int    `json:"volume_id"`
	MediaID    int    `json:"media_id"` // Bacula MediaId
	VolumeName string `json:"volume_name"`
	PoolName   string `json:"pool_name"`
	MediaType  string `json:"media_type"`
	VolStatus  string `json:"vol_status"` // Full, Append, Used, Error, etc.
	VolBytes   int64  `json:"vol_bytes"`
	MaxVolBytes int64 `json:"max_vol_bytes"` // 0 means unlimited in Bacula — TODO confirm
	VolFiles   int    `json:"vol_files"`
	// NOTE(review): Bacula's VolRetention is a duration (seconds), not a
	// timestamp — confirm the time.Time mapping is intended here.
	VolRetention *time.Time `json:"vol_retention,omitempty"`
	LastWritten  *time.Time `json:"last_written,omitempty"`
	RecycleCount int        `json:"recycle_count"`
}
// StorageDaemon represents a Bacula storage daemon endpoint.
type StorageDaemon struct {
	StorageID  int    `json:"storage_id"`
	Name       string `json:"name"`
	Address    string `json:"address"`
	Port       int    `json:"port"`
	DeviceName string `json:"device_name"`
	MediaType  string `json:"media_type"`
	Status     string `json:"status"` // Online, Offline
}
// SyncJobsFromBacula syncs jobs from Bacula/Bareos into the Calypso
// database. It prefers querying the Bacula catalog directly and falls
// back to bconsole when no connection is configured or the query fails.
// An error is returned only when one or more upserts fail.
func (s *Service) SyncJobsFromBacula(ctx context.Context) error {
	s.logger.Info("Starting sync from Bacula database", "bacula_db_configured", s.baculaDB != nil)

	// No direct catalog connection: fall back to bconsole immediately.
	if s.baculaDB == nil {
		s.logger.Warn("Bacula database connection not configured, trying bconsole fallback")
		return s.syncFromBconsole(ctx)
	}

	jobs, err := s.queryBaculaDatabase(ctx)
	if err != nil {
		// Direct query failed (permissions, schema, ...): try bconsole.
		s.logger.Warn("Failed to query Bacula database directly, trying bconsole", "error", err)
		return s.syncFromBconsole(ctx)
	}
	s.logger.Info("Queried Bacula database", "jobs_found", len(jobs))
	if len(jobs) == 0 {
		s.logger.Debug("No jobs found in Bacula database")
		return nil
	}

	// Upsert each job into the Calypso backup_jobs table, counting
	// failures instead of aborting on the first one.
	succeeded, failed := 0, 0
	for _, j := range jobs {
		if upsertErr := s.upsertJob(ctx, j); upsertErr != nil {
			s.logger.Error("Failed to upsert job", "job_id", j.JobID, "job_name", j.JobName, "error", upsertErr)
			failed++
			continue
		}
		succeeded++
		s.logger.Debug("Upserted job", "job_id", j.JobID, "job_name", j.JobName)
	}
	s.logger.Info("Synced jobs from Bacula database", "total", len(jobs), "success", succeeded, "errors", failed)
	if failed > 0 {
		return fmt.Errorf("failed to sync %d out of %d jobs", failed, len(jobs))
	}
	return nil
}
// queryBaculaDatabase reads jobs straight from the Bacula catalog using
// the dedicated connection (no dblink needed). It errors when the
// connection has not been configured via SetBaculaDatabase.
func (s *Service) queryBaculaDatabase(ctx context.Context) ([]Job, error) {
	if s.baculaDB != nil {
		return s.queryBaculaDirect(ctx)
	}
	return nil, fmt.Errorf("Bacula database connection not configured")
}
// queryBaculaDirect queries the Job table directly (Bacularis approach),
// assuming the Bacula tables are in the same database or reachable via
// search_path. Bacula's single-character Type/Level/JobStatus codes are
// mapped to the human-readable values used by the Calypso API, and the
// duration is derived when both start and end timestamps are present.
// Returns at most the 1000 most recently started jobs; an empty slice
// (not an error) when the table has no rows.
func (s *Service) queryBaculaDirect(ctx context.Context) ([]Job, error) {
	// Bacularis-style query: direct query to Job table with JOIN to Client.
	query := `
SELECT
j.JobId as job_id,
j.Name as job_name,
COALESCE(c.Name, 'unknown') as client_name,
CASE
WHEN j.Type = 'B' THEN 'Backup'
WHEN j.Type = 'R' THEN 'Restore'
WHEN j.Type = 'V' THEN 'Verify'
WHEN j.Type = 'C' THEN 'Copy'
WHEN j.Type = 'M' THEN 'Migrate'
ELSE 'Backup'
END as job_type,
CASE
WHEN j.Level = 'F' THEN 'Full'
WHEN j.Level = 'I' THEN 'Incremental'
WHEN j.Level = 'D' THEN 'Differential'
WHEN j.Level = 'S' THEN 'Since'
ELSE 'Full'
END as job_level,
CASE
WHEN j.JobStatus = 'T' THEN 'Running'
WHEN j.JobStatus = 'C' THEN 'Completed'
WHEN j.JobStatus = 'f' OR j.JobStatus = 'F' THEN 'Failed'
WHEN j.JobStatus = 'A' THEN 'Canceled'
WHEN j.JobStatus = 'W' THEN 'Waiting'
ELSE 'Waiting'
END as status,
COALESCE(j.JobBytes, 0) as bytes_written,
COALESCE(j.JobFiles, 0) as files_written,
j.StartTime as started_at,
j.EndTime as ended_at
FROM Job j
LEFT JOIN Client c ON j.ClientId = c.ClientId
ORDER BY j.StartTime DESC
LIMIT 1000
`
	rows, err := s.baculaDB.QueryContext(ctx, query)
	if err != nil {
		return nil, fmt.Errorf("failed to query Bacula Job table: %w", err)
	}
	defer rows.Close()

	var jobs []Job
	for rows.Next() {
		var job Job
		var startedAt, endedAt sql.NullTime
		err := rows.Scan(
			&job.JobID, &job.JobName, &job.ClientName,
			&job.JobType, &job.JobLevel, &job.Status,
			&job.BytesWritten, &job.FilesWritten, &startedAt, &endedAt,
		)
		if err != nil {
			// Skip malformed rows rather than aborting the whole sync.
			s.logger.Error("Failed to scan Bacula job", "error", err)
			continue
		}
		if startedAt.Valid {
			job.StartedAt = &startedAt.Time
		}
		if endedAt.Valid {
			job.EndedAt = &endedAt.Time
			// Derive duration when both timestamps are available.
			if job.StartedAt != nil {
				duration := int(endedAt.Time.Sub(*job.StartedAt).Seconds())
				job.DurationSeconds = &duration
			}
		}
		jobs = append(jobs, job)
	}
	if err := rows.Err(); err != nil {
		return nil, err
	}
	if len(jobs) > 0 {
		s.logger.Info("Successfully queried Bacula database (direct)", "count", len(jobs))
	}
	// An empty result is not an error; the previous duplicated return
	// branches are collapsed into this single return.
	return jobs, nil
}
// syncFromBconsole syncs jobs by parsing the output of bconsole's
// "list jobs" command (fallback method). bconsole failures are logged
// and swallowed deliberately: the caller continues with whatever is
// already cached in the Calypso database.
func (s *Service) syncFromBconsole(ctx context.Context) error {
	// Use printf rather than `echo -e`: under POSIX sh (e.g. dash on
	// Debian/Ubuntu) echo has no -e flag and would print "-e" literally,
	// corrupting the command stream sent to bconsole. This matches the
	// approach already used by ExecuteBconsoleCommand.
	cmd := exec.CommandContext(ctx, "sh", "-c", "printf 'list jobs\\nquit\\n' | bconsole")
	output, err := cmd.CombinedOutput()
	if err != nil {
		s.logger.Debug("Failed to execute bconsole", "error", err, "output", string(output))
		return nil // Don't fail, just return empty
	}
	if len(output) == 0 {
		s.logger.Debug("bconsole returned empty output")
		return nil
	}
	// Parse the tabular bconsole output into Job values.
	jobs := s.parseBconsoleOutput(ctx, string(output))
	if len(jobs) == 0 {
		s.logger.Debug("No jobs found in bconsole output")
		return nil
	}
	// Upsert jobs into the Calypso database, skipping individual failures.
	successCount := 0
	for _, job := range jobs {
		err := s.upsertJob(ctx, job)
		if err != nil {
			s.logger.Error("Failed to upsert job", "job_id", job.JobID, "error", err)
			continue
		}
		successCount++
	}
	s.logger.Info("Synced jobs from bconsole", "total", len(jobs), "success", successCount)
	return nil
}
// parseBconsoleOutput parses bconsole "list jobs" table output into Job
// values.
//
// Expected row format:
//
//	| jobid | name | starttime | type | level | jobfiles | jobbytes | jobstatus |
//
// Rows that cannot be parsed are skipped with a warning. The client
// name is resolved via a per-job bconsole lookup, falling back to the
// job name when the lookup yields nothing.
func (s *Service) parseBconsoleOutput(ctx context.Context, output string) []Job {
	var jobs []Job
	lines := strings.Split(output, "\n")
	inDataSection := false
	for _, line := range lines {
		line = strings.TrimSpace(line)
		// Skip empty lines and +----+ separators.
		if line == "" || strings.HasPrefix(line, "+") {
			continue
		}
		// The header row marks the start of the data section.
		if strings.HasPrefix(line, "| jobid") {
			inDataSection = true
			continue
		}
		// The trailing "*" prompt marks the end of the table.
		if strings.HasPrefix(line, "*") {
			break
		}
		if !inDataSection {
			continue
		}
		if !strings.HasPrefix(line, "|") {
			continue
		}
		parts := strings.Split(line, "|")
		if len(parts) < 9 {
			continue
		}
		// Extract the cells; parts[0] is the empty text before the first "|".
		jobIDStr := strings.TrimSpace(parts[1])
		jobName := strings.TrimSpace(parts[2])
		startTimeStr := strings.TrimSpace(parts[3])
		jobTypeChar := strings.TrimSpace(parts[4])
		jobLevelChar := strings.TrimSpace(parts[5])
		jobFilesStr := strings.TrimSpace(parts[6])
		jobBytesStr := strings.TrimSpace(parts[7])
		jobStatusChar := strings.TrimSpace(parts[8])

		jobID, err := strconv.Atoi(jobIDStr)
		if err != nil {
			s.logger.Warn("Failed to parse job ID", "value", jobIDStr, "error", err)
			continue
		}

		// Start time, e.g. "2025-12-27 23:05:02"; "-" means not started.
		var startedAt *time.Time
		if startTimeStr != "" && startTimeStr != "-" {
			if parsedTime, err := time.Parse("2006-01-02 15:04:05", startTimeStr); err == nil {
				startedAt = &parsedTime
			}
		}

		// Map Bacula type codes (defaults to "Backup"), mirroring the
		// CASE expression in queryBaculaDirect.
		jobType := "Backup"
		switch jobTypeChar {
		case "B":
			jobType = "Backup"
		case "R":
			jobType = "Restore"
		case "V":
			jobType = "Verify"
		case "C":
			jobType = "Copy"
		case "M":
			jobType = "Migrate"
		}

		// Map level codes (defaults to "Full").
		jobLevel := "Full"
		switch jobLevelChar {
		case "F":
			jobLevel = "Full"
		case "I":
			jobLevel = "Incremental"
		case "D":
			jobLevel = "Differential"
		case "S":
			jobLevel = "Since"
		}

		// Files/bytes columns; "-" means unknown and is treated as zero.
		filesWritten := 0
		if jobFilesStr != "" && jobFilesStr != "-" {
			if f, err := strconv.Atoi(jobFilesStr); err == nil {
				filesWritten = f
			}
		}
		bytesWritten := int64(0)
		if jobBytesStr != "" && jobBytesStr != "-" {
			if b, err := strconv.ParseInt(jobBytesStr, 10, 64); err == nil {
				bytesWritten = b
			}
		}

		// Map status codes. FIX: the previous code switched on
		// strings.ToLower(jobStatusChar), which made the upper-case cases
		// ("A", "W", and the upper halves of the others) unreachable —
		// canceled jobs ('A') were silently reported as "Waiting".
		// Switch on the raw character and accept both cases instead.
		status := "Waiting"
		switch jobStatusChar {
		case "t", "T":
			status = "Running"
		case "c", "C":
			status = "Completed"
		case "f", "F":
			status = "Failed"
		case "a", "A":
			status = "Canceled"
		case "w", "W":
			status = "Waiting"
		}

		// Resolve the client name from the job details; fall back to the
		// job name when the lookup returns nothing.
		clientName := "unknown"
		if jobName != "" {
			if fromJob := s.getClientNameFromJob(ctx, jobID); fromJob != "" {
				clientName = fromJob
			} else {
				clientName = jobName
			}
		}

		jobs = append(jobs, Job{
			JobID:        jobID,
			JobName:      jobName,
			ClientName:   clientName,
			JobType:      jobType,
			JobLevel:     jobLevel,
			Status:       status,
			BytesWritten: bytesWritten,
			FilesWritten: filesWritten,
			StartedAt:    startedAt,
		})
	}
	return jobs
}
// getClientNameFromJob looks up the client name for a job by running
// bconsole "list job jobid=N" and scanning the output for a "Client:"
// line. Returns "" when the lookup fails or no Client line is found.
func (s *Service) getClientNameFromJob(ctx context.Context, jobID int) string {
	// printf instead of `echo -e` for POSIX sh compatibility: dash's
	// built-in echo has no -e flag and would emit it literally.
	script := fmt.Sprintf("printf 'list job jobid=%d\\nquit\\n' | bconsole", jobID)
	cmd := exec.CommandContext(ctx, "sh", "-c", script)
	output, err := cmd.CombinedOutput()
	if err != nil {
		s.logger.Debug("Failed to get job details", "job_id", jobID, "error", err)
		return ""
	}
	// Scan for the first "Client:" line and return its value.
	for _, line := range strings.Split(string(output), "\n") {
		line = strings.TrimSpace(line)
		if strings.HasPrefix(line, "Client:") {
			// SplitN keeps any ":" characters inside the client name
			// intact (plain Split would have truncated at the next colon).
			parts := strings.SplitN(line, ":", 2)
			if len(parts) == 2 {
				return strings.TrimSpace(parts[1])
			}
		}
	}
	return ""
}
// ExecuteBconsoleCommand runs a single command through bconsole and
// returns the combined stdout/stderr output.
//
// The input is trimmed, any trailing "quit" is stripped, and a fresh
// "quit" is appended so the bconsole session always terminates. Note
// that bconsole may exit non-zero even on success, so any produced
// output is returned without an error regardless of the exit status.
func (s *Service) ExecuteBconsoleCommand(ctx context.Context, command string) (string, error) {
	command = strings.TrimSpace(command)
	if command == "" {
		return "", fmt.Errorf("command cannot be empty")
	}
	// Strip a trailing "quit" case-insensitively. FIX: the previous code
	// assigned strings.ToLower(command) back unconditionally, lowercasing
	// the entire command and corrupting case-sensitive arguments such as
	// job or client names.
	if strings.HasSuffix(strings.ToLower(command), "quit") {
		command = strings.TrimSpace(command[:len(command)-len("quit")])
	}
	// Ensure the session ends with quit.
	commandWithQuit := command + "\nquit"
	// printf is used instead of `echo -e` because POSIX sh echo has no -e.
	// Escape embedded single quotes so the command survives shell quoting.
	escapedCommand := strings.ReplaceAll(commandWithQuit, "'", "'\"'\"'")
	cmd := exec.CommandContext(ctx, "sh", "-c", fmt.Sprintf("printf '%%s\\n' '%s' | bconsole", escapedCommand))
	output, err := cmd.CombinedOutput()
	if err != nil {
		outputStr := string(output)
		if len(outputStr) > 0 {
			// bconsole may return non-zero even on success; trust the output.
			return outputStr, nil
		}
		return outputStr, fmt.Errorf("bconsole error: %w", err)
	}
	return string(output), nil
}
// ListClients lists all backup clients, preferring the Bacula catalog's
// Client table and falling back to bconsole when the database path is
// unavailable or fails.
func (s *Service) ListClients(ctx context.Context, opts ListClientsOptions) ([]Client, error) {
	// Prefer the direct database connection when configured.
	if s.baculaDB != nil {
		clients, err := s.queryClientsFromDatabase(ctx, opts)
		if err != nil {
			s.logger.Warn("Failed to query clients from database, trying bconsole fallback", "error", err)
		} else {
			s.logger.Debug("Queried clients from Bacula database", "count", len(clients))
			return clients, nil
		}
	}
	s.logger.Info("Using bconsole fallback for list clients")
	return s.queryClientsFromBconsole(ctx, opts)
}
// queryClientsFromDatabase queries clients from the Bacula catalog,
// aggregating per-client job statistics (last backup time, job count,
// total bytes) via a LEFT JOIN on Job, and derives an online/offline
// status for each client.
//
// NOTE(review): Bacula's Client table has no "enabled" column, so
// enabled is hard-coded to true in the SELECT and the Enabled filter is
// expressed as "true = $n" — confirm this placeholder is intentional.
func (s *Service) queryClientsFromDatabase(ctx context.Context, opts ListClientsOptions) ([]Client, error) {
	// Cheap probe: confirm the Client table exists and has rows before
	// running the heavier aggregate query.
	simpleQuery := `SELECT COUNT(*) FROM Client`
	var count int
	err := s.baculaDB.QueryRowContext(ctx, simpleQuery).Scan(&count)
	if err != nil {
		s.logger.Warn("Failed to count clients from Client table", "error", err)
		return nil, fmt.Errorf("failed to query Client table: %w", err)
	}
	s.logger.Debug("Total clients in database", "count", count)
	if count == 0 {
		s.logger.Info("No clients found in Bacula database")
		return []Client{}, nil
	}
	// Base aggregate query; optional filters are appended below with
	// positional placeholders.
	query := `
SELECT
c.ClientId,
c.Name,
c.Uname,
true as enabled,
c.AutoPrune,
c.FileRetention,
c.JobRetention,
MAX(j.StartTime) as last_backup_at,
COUNT(DISTINCT j.JobId) as total_jobs,
COALESCE(SUM(j.JobBytes), 0) as total_bytes
FROM Client c
LEFT JOIN Job j ON c.ClientId = j.ClientId
WHERE 1=1
`
	args := []interface{}{}
	argIndex := 1
	if opts.Enabled != nil {
		// See NOTE above: compares the constant true to the parameter.
		query += fmt.Sprintf(" AND true = $%d", argIndex)
		args = append(args, *opts.Enabled)
		argIndex++
	}
	if opts.Search != "" {
		// Case-insensitive substring match on the client name.
		query += fmt.Sprintf(" AND c.Name ILIKE $%d", argIndex)
		args = append(args, "%"+opts.Search+"%")
		argIndex++
	}
	query += " GROUP BY c.ClientId, c.Name, c.Uname, c.AutoPrune, c.FileRetention, c.JobRetention"
	query += " ORDER BY c.Name"
	s.logger.Debug("Executing clients query", "query", query, "args", args)
	rows, err := s.baculaDB.QueryContext(ctx, query, args...)
	if err != nil {
		s.logger.Error("Failed to execute clients query", "error", err, "query", query)
		return nil, fmt.Errorf("failed to query clients from Bacula database: %w", err)
	}
	defer rows.Close()
	var clients []Client
	for rows.Next() {
		var client Client
		// Nullable columns are scanned into sql.Null* and copied to the
		// Client's pointer fields only when valid.
		var uname sql.NullString
		var autoPrune sql.NullBool
		var fileRetention, jobRetention sql.NullInt64
		var lastBackupAt sql.NullTime
		var totalJobs sql.NullInt64
		var totalBytes sql.NullInt64
		err := rows.Scan(
			&client.ClientID,
			&client.Name,
			&uname,
			&client.Enabled,
			&autoPrune,
			&fileRetention,
			&jobRetention,
			&lastBackupAt,
			&totalJobs,
			&totalBytes,
		)
		if err != nil {
			// Skip malformed rows rather than failing the whole listing.
			s.logger.Error("Failed to scan client row", "error", err)
			continue
		}
		if uname.Valid {
			client.Uname = &uname.String
		}
		if autoPrune.Valid {
			client.AutoPrune = &autoPrune.Bool
		}
		if fileRetention.Valid {
			val := fileRetention.Int64
			client.FileRetention = &val
		}
		if jobRetention.Valid {
			val := jobRetention.Int64
			client.JobRetention = &val
		}
		if lastBackupAt.Valid {
			client.LastBackupAt = &lastBackupAt.Time
		}
		if totalJobs.Valid {
			val := int(totalJobs.Int64)
			client.TotalJobs = &val
		}
		if totalBytes.Valid {
			val := totalBytes.Int64
			client.TotalBytes = &val
		}
		// Derive status: an enabled client with a backup within the last
		// 24 hours (or no backup yet) is "online"; otherwise "offline".
		if client.Enabled {
			if lastBackupAt.Valid {
				timeSinceLastBackup := time.Since(lastBackupAt.Time)
				if timeSinceLastBackup < 24*time.Hour {
					status := "online"
					client.Status = &status
				} else {
					status := "offline"
					client.Status = &status
				}
			} else {
				// No backup yet, but enabled - assume online.
				status := "online"
				client.Status = &status
			}
		} else {
			status := "offline"
			client.Status = &status
		}
		clients = append(clients, client)
	}
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("error iterating client rows: %w", err)
	}
	s.logger.Debug("Queried clients from Bacula database", "count", len(clients))
	return clients, nil
}
// queryClientsFromBconsole lists clients via the bconsole command
// (fallback method) and applies the Enabled/Search filters in memory,
// since bconsole output cannot be filtered server-side.
func (s *Service) queryClientsFromBconsole(ctx context.Context, opts ListClientsOptions) ([]Client, error) {
	s.logger.Debug("Executing bconsole list clients command")
	output, err := s.ExecuteBconsoleCommand(ctx, "list clients")
	if err != nil {
		s.logger.Error("Failed to execute bconsole list clients", "error", err)
		return nil, fmt.Errorf("failed to execute bconsole list clients: %w", err)
	}
	// Log a short preview of the raw output for debugging.
	preview := output
	if len(preview) > 200 {
		preview = preview[:200]
	}
	s.logger.Debug("bconsole output", "output_length", len(output), "output_preview", preview)

	parsed := s.parseBconsoleClientsOutput(output)
	s.logger.Debug("Parsed clients from bconsole", "count", len(parsed))

	// Apply the in-memory filters.
	filtered := []Client{}
	for _, c := range parsed {
		if opts.Enabled != nil && c.Enabled != *opts.Enabled {
			continue
		}
		if opts.Search != "" && !strings.Contains(strings.ToLower(c.Name), strings.ToLower(opts.Search)) {
			continue
		}
		filtered = append(filtered, c)
	}
	return filtered, nil
}
// parseBconsoleClientsOutput parses bconsole "list clients" output.
// It handles both the pipe-delimited table format ("| name | addr |")
// and a whitespace-separated fallback format, skipping banner lines,
// separators, and header artifacts. bconsole exposes neither IDs nor
// enabled state here, so parsed clients get ClientID 0 / Enabled true.
func (s *Service) parseBconsoleClientsOutput(output string) []Client {
	var clients []Client
	inTable := false
	headerFound := false
	for _, line := range strings.Split(output, "\n") {
		line = strings.TrimSpace(line)
		// Skip connection banner and command-echo lines.
		if strings.Contains(line, "Connecting to Director") ||
			strings.Contains(line, "Enter a period") ||
			strings.Contains(line, "list clients") ||
			strings.Contains(line, "quit") ||
			strings.Contains(line, "You have messages") ||
			strings.Contains(line, "Automatically selected") ||
			strings.Contains(line, "Using Catalog") {
			continue
		}
		// The header row marks the start of the table.
		if !headerFound && (strings.Contains(line, "Name") || strings.Contains(line, "| Name")) {
			headerFound = true
			inTable = true
			continue
		}
		// Skip +----+ separators and empty lines.
		if strings.HasPrefix(line, "+") && strings.Contains(line, "-") {
			continue
		}
		if line == "" {
			continue
		}
		if !inTable {
			continue
		}
		// Extract the raw name cell from either format, then build the
		// Client through one shared path (previously duplicated in both
		// branches).
		var raw string
		if strings.Contains(line, "|") {
			parts := strings.Split(line, "|")
			if len(parts) < 2 {
				continue
			}
			raw = parts[1]
		} else {
			fields := strings.Fields(line)
			if len(fields) == 0 {
				continue
			}
			raw = fields[0]
		}
		if client, ok := clientFromBconsoleCell(raw); ok {
			clients = append(clients, client)
		}
	}
	return clients
}

// clientFromBconsoleCell normalizes a raw client-name cell and builds a
// Client entry; ok is false when the cell is empty or a header/separator
// artifact ("Name", "----").
func clientFromBconsoleCell(raw string) (Client, bool) {
	name := strings.Trim(strings.TrimSpace(raw), "\"'")
	if name == "" || name == "Name" || strings.HasPrefix(name, "-") {
		return Client{}, false
	}
	return Client{
		ClientID: 0,
		Name:     name,
		Enabled:  true,
	}, true
}
// upsertJob inserts a job row into backup_jobs, or updates the existing
// row in place when one with the same job_id already exists.
func (s *Service) upsertJob(ctx context.Context, job Job) error {
	const query = `
INSERT INTO backup_jobs (
job_id, job_name, client_name, job_type, job_level, status,
bytes_written, files_written, started_at, ended_at, duration_seconds, updated_at
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, NOW())
ON CONFLICT (job_id) DO UPDATE SET
job_name = EXCLUDED.job_name,
client_name = EXCLUDED.client_name,
job_type = EXCLUDED.job_type,
job_level = EXCLUDED.job_level,
status = EXCLUDED.status,
bytes_written = EXCLUDED.bytes_written,
files_written = EXCLUDED.files_written,
started_at = EXCLUDED.started_at,
ended_at = EXCLUDED.ended_at,
duration_seconds = EXCLUDED.duration_seconds,
updated_at = NOW()
`
	// client_name is NOT NULL downstream; substitute "unknown" when the
	// source row had no resolvable client.
	clientName := job.ClientName
	if clientName == "" {
		clientName = "unknown"
	}
	res, err := s.db.ExecContext(ctx, query,
		job.JobID, job.JobName, clientName, job.JobType, job.JobLevel, job.Status,
		job.BytesWritten, job.FilesWritten, job.StartedAt, job.EndedAt, job.DurationSeconds,
	)
	if err != nil {
		s.logger.Error("Database error in upsertJob", "job_id", job.JobID, "error", err)
		return err
	}
	affected, _ := res.RowsAffected()
	s.logger.Debug("Upserted job to database", "job_id", job.JobID, "rows_affected", affected)
	return nil
}
// ListJobs lists backup jobs with optional filters and pagination.
//
// It first performs a best-effort sync from Bacula (failures are logged
// and ignored so the API still serves cached rows), then queries the
// Calypso backup_jobs table. Returns the requested page of jobs, the
// total count matching the filters, and any query error.
func (s *Service) ListJobs(ctx context.Context, opts ListJobsOptions) ([]Job, int, error) {
	s.logger.Info("ListJobs called, syncing from Bacula first")
	if syncErr := s.SyncJobsFromBacula(ctx); syncErr != nil {
		// Non-fatal: serve whatever is already in the database.
		s.logger.Warn("Failed to sync jobs from Bacula, using database only", "error", syncErr)
	} else {
		s.logger.Info("Successfully synced jobs from Bacula")
	}

	// Build the WHERE clause from the optional filters. The "1=1" seed
	// keeps the clause valid when no filters are set.
	whereClauses := []string{"1=1"}
	args := []interface{}{}
	argIndex := 1
	if opts.Status != "" {
		whereClauses = append(whereClauses, fmt.Sprintf("status = $%d", argIndex))
		args = append(args, opts.Status)
		argIndex++
	}
	if opts.JobType != "" {
		whereClauses = append(whereClauses, fmt.Sprintf("job_type = $%d", argIndex))
		args = append(args, opts.JobType)
		argIndex++
	}
	if opts.ClientName != "" {
		whereClauses = append(whereClauses, fmt.Sprintf("client_name ILIKE $%d", argIndex))
		args = append(args, "%"+opts.ClientName+"%")
		argIndex++
	}
	if opts.JobName != "" {
		whereClauses = append(whereClauses, fmt.Sprintf("job_name ILIKE $%d", argIndex))
		args = append(args, "%"+opts.JobName+"%")
		argIndex++
	}
	// whereClauses is never empty (seeded with "1=1"), so join directly
	// instead of the previous manual concatenation loop.
	whereClause := "WHERE " + strings.Join(whereClauses, " AND ")

	// Total count across all pages for the same filters.
	countQuery := fmt.Sprintf("SELECT COUNT(*) FROM backup_jobs %s", whereClause)
	var totalCount int
	err := s.db.QueryRowContext(ctx, countQuery, args...).Scan(&totalCount)
	if err != nil {
		return nil, 0, fmt.Errorf("failed to count jobs: %w", err)
	}

	// Default and cap the page size.
	limit := opts.Limit
	if limit <= 0 {
		limit = 50
	}
	if limit > 100 {
		limit = 100
	}

	// Page query; placeholders for LIMIT/OFFSET follow the filter args.
	query := fmt.Sprintf(`
SELECT id, job_id, job_name, client_name, job_type, job_level, status,
bytes_written, files_written, duration_seconds,
started_at, ended_at, error_message,
storage_name, pool_name, volume_name,
created_at, updated_at
FROM backup_jobs
%s
ORDER BY started_at DESC NULLS LAST, created_at DESC
LIMIT $%d OFFSET $%d
`, whereClause, argIndex, argIndex+1)
	args = append(args, limit, opts.Offset)
	rows, err := s.db.QueryContext(ctx, query, args...)
	if err != nil {
		return nil, 0, fmt.Errorf("failed to query jobs: %w", err)
	}
	defer rows.Close()

	var jobs []Job
	for rows.Next() {
		var job Job
		// Nullable columns go through sql.Null* wrappers.
		var durationSeconds sql.NullInt64
		var startedAt, endedAt sql.NullTime
		var errorMessage, storageName, poolName, volumeName sql.NullString
		err := rows.Scan(
			&job.ID, &job.JobID, &job.JobName, &job.ClientName,
			&job.JobType, &job.JobLevel, &job.Status,
			&job.BytesWritten, &job.FilesWritten, &durationSeconds,
			&startedAt, &endedAt, &errorMessage,
			&storageName, &poolName, &volumeName,
			&job.CreatedAt, &job.UpdatedAt,
		)
		if err != nil {
			// Skip malformed rows rather than failing the whole page.
			s.logger.Error("Failed to scan job", "error", err)
			continue
		}
		if durationSeconds.Valid {
			dur := int(durationSeconds.Int64)
			job.DurationSeconds = &dur
		}
		if startedAt.Valid {
			job.StartedAt = &startedAt.Time
		}
		if endedAt.Valid {
			job.EndedAt = &endedAt.Time
		}
		if errorMessage.Valid {
			job.ErrorMessage = &errorMessage.String
		}
		if storageName.Valid {
			job.StorageName = &storageName.String
		}
		if poolName.Valid {
			job.PoolName = &poolName.String
		}
		if volumeName.Valid {
			job.VolumeName = &volumeName.String
		}
		jobs = append(jobs, job)
	}
	return jobs, totalCount, rows.Err()
}
// GetJob retrieves a single job from backup_jobs by its Calypso row ID.
// Returns a "job not found" error when no row matches.
func (s *Service) GetJob(ctx context.Context, id string) (*Job, error) {
	query := `
SELECT id, job_id, job_name, client_name, job_type, job_level, status,
bytes_written, files_written, duration_seconds,
started_at, ended_at, error_message,
storage_name, pool_name, volume_name,
created_at, updated_at
FROM backup_jobs
WHERE id = $1
`
	var job Job
	// Nullable columns go through sql.Null* wrappers.
	var durationSeconds sql.NullInt64
	var startedAt, endedAt sql.NullTime
	var errorMessage, storageName, poolName, volumeName sql.NullString
	err := s.db.QueryRowContext(ctx, query, id).Scan(
		&job.ID, &job.JobID, &job.JobName, &job.ClientName,
		&job.JobType, &job.JobLevel, &job.Status,
		&job.BytesWritten, &job.FilesWritten, &durationSeconds,
		&startedAt, &endedAt, &errorMessage,
		&storageName, &poolName, &volumeName,
		&job.CreatedAt, &job.UpdatedAt,
	)
	if err != nil {
		// errors.Is instead of == so wrapped ErrNoRows is still detected.
		if errors.Is(err, sql.ErrNoRows) {
			return nil, fmt.Errorf("job not found")
		}
		return nil, fmt.Errorf("failed to get job: %w", err)
	}
	if durationSeconds.Valid {
		dur := int(durationSeconds.Int64)
		job.DurationSeconds = &dur
	}
	if startedAt.Valid {
		job.StartedAt = &startedAt.Time
	}
	if endedAt.Valid {
		job.EndedAt = &endedAt.Time
	}
	if errorMessage.Valid {
		job.ErrorMessage = &errorMessage.String
	}
	if storageName.Valid {
		job.StorageName = &storageName.String
	}
	if poolName.Valid {
		job.PoolName = &poolName.String
	}
	if volumeName.Valid {
		job.VolumeName = &volumeName.String
	}
	return &job, nil
}
// CreateJobRequest represents a request to create a new backup job via
// the API. The four required fields are validated by the HTTP binding
// layer ("binding" tags).
type CreateJobRequest struct {
	JobName     string  `json:"job_name" binding:"required"`
	ClientName  string  `json:"client_name" binding:"required"`
	JobType     string  `json:"job_type" binding:"required"`  // 'Backup', 'Restore', 'Verify', 'Copy', 'Migrate'
	JobLevel    string  `json:"job_level" binding:"required"` // 'Full', 'Incremental', 'Differential', 'Since'
	StorageName *string `json:"storage_name,omitempty"`
	PoolName    *string `json:"pool_name,omitempty"`
}
// CreateJob creates a new backup job row in the Calypso database with
// status "Waiting" and started_at = NOW(), returning the full inserted
// row. The job ID is generated locally rather than by Bareos.
//
// NOTE(review): the SELECT MAX(job_id)+1 / INSERT sequence is not
// atomic — two concurrent CreateJob calls can race and pick the same
// job_id. Consider a database sequence or an ON CONFLICT retry.
func (s *Service) CreateJob(ctx context.Context, req CreateJobRequest) (*Job, error) {
	// Generate a unique job ID (in a real implementation this would come
	// from Bareos). See the race NOTE above.
	var jobID int
	err := s.db.QueryRowContext(ctx, `
SELECT COALESCE(MAX(job_id), 0) + 1 FROM backup_jobs
`).Scan(&jobID)
	if err != nil {
		return nil, fmt.Errorf("failed to generate job ID: %w", err)
	}
	// Insert the job and read the complete row back via RETURNING.
	query := `
INSERT INTO backup_jobs (
job_id, job_name, client_name, job_type, job_level,
status, bytes_written, files_written,
storage_name, pool_name, started_at
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, NOW())
RETURNING id, job_id, job_name, client_name, job_type, job_level, status,
bytes_written, files_written, duration_seconds,
started_at, ended_at, error_message,
storage_name, pool_name, volume_name,
created_at, updated_at
`
	var job Job
	// Nullable columns go through sql.Null* wrappers.
	var durationSeconds sql.NullInt64
	var startedAt, endedAt sql.NullTime
	var errorMessage, storageName, poolName, volumeName sql.NullString
	err = s.db.QueryRowContext(ctx, query,
		jobID, req.JobName, req.ClientName, req.JobType, req.JobLevel,
		"Waiting", 0, 0,
		req.StorageName, req.PoolName,
	).Scan(
		&job.ID, &job.JobID, &job.JobName, &job.ClientName,
		&job.JobType, &job.JobLevel, &job.Status,
		&job.BytesWritten, &job.FilesWritten, &durationSeconds,
		&startedAt, &endedAt, &errorMessage,
		&storageName, &poolName, &volumeName,
		&job.CreatedAt, &job.UpdatedAt,
	)
	if err != nil {
		return nil, fmt.Errorf("failed to create job: %w", err)
	}
	if durationSeconds.Valid {
		dur := int(durationSeconds.Int64)
		job.DurationSeconds = &dur
	}
	if startedAt.Valid {
		job.StartedAt = &startedAt.Time
	}
	if endedAt.Valid {
		job.EndedAt = &endedAt.Time
	}
	if errorMessage.Valid {
		job.ErrorMessage = &errorMessage.String
	}
	if storageName.Valid {
		job.StorageName = &storageName.String
	}
	if poolName.Valid {
		job.PoolName = &poolName.String
	}
	if volumeName.Valid {
		job.VolumeName = &volumeName.String
	}
	s.logger.Info("Backup job created",
		"job_id", job.JobID,
		"job_name", job.JobName,
		"client_name", job.ClientName,
		"job_type", job.JobType,
	)
	return &job, nil
}
// GetDashboardStats returns statistics for the backup dashboard.
//
// It aggregates four independent pieces of data, tolerating partial
// failure (each failure is logged and the corresponding field keeps its
// default, so a non-nil stats object is always returned):
//   - director status/uptime, probed via bconsole "status director"
//   - the most recently completed job from the backup_jobs table
//   - the number of currently running jobs
//   - default-pool usage from the Bacula catalog (only when the Bacula
//     DB connection has been configured via SetBaculaDatabase)
func (s *Service) GetDashboardStats(ctx context.Context) (*DashboardStats, error) {
	stats := &DashboardStats{
		DirectorStatus: "Active", // Default to active
		ActiveJobsCount: 0,
	}
	// Get director status and uptime from bconsole
	output, err := s.ExecuteBconsoleCommand(ctx, "status director")
	if err == nil && len(output) > 0 {
		// If bconsole returns output, director is active
		// Parse output to extract uptime
		lines := strings.Split(output, "\n")
		for _, line := range lines {
			line = strings.TrimSpace(line)
			// Look for "Daemon started" line which contains uptime info
			if strings.Contains(line, "Daemon started") {
				stats.DirectorStatus = "Active"
				stats.DirectorUptime = s.parseUptimeFromStatus(line)
				break
			}
			// Also check for version line as indicator of active director
			if strings.Contains(line, "Version:") {
				stats.DirectorStatus = "Active"
			}
		}
		// If we didn't find uptime yet, try to parse from any date in the output
		if stats.DirectorUptime == "" {
			for _, line := range lines {
				line = strings.TrimSpace(line)
				if strings.Contains(line, "started") || strings.Contains(line, "since") {
					uptime := s.parseUptimeFromStatus(line)
					if uptime != "Unknown" {
						stats.DirectorUptime = uptime
						break
					}
				}
			}
		}
		// If still no uptime, set default
		if stats.DirectorUptime == "" {
			stats.DirectorUptime = "Active"
		}
	} else {
		// NOTE(review): this branch is also taken when err == nil but the
		// output is empty, in which case "error" is logged as nil.
		s.logger.Warn("Failed to get director status from bconsole", "error", err)
		stats.DirectorStatus = "Inactive"
		stats.DirectorUptime = "Unknown"
	}
	// Get last completed job
	lastJobQuery := `
		SELECT id, job_id, job_name, client_name, job_type, job_level, status,
		bytes_written, files_written, started_at, ended_at, created_at, updated_at
		FROM backup_jobs
		WHERE status = 'Completed'
		ORDER BY ended_at DESC NULLS LAST, started_at DESC
		LIMIT 1
	`
	var lastJob Job
	var startedAt, endedAt sql.NullTime
	err = s.db.QueryRowContext(ctx, lastJobQuery).Scan(
		&lastJob.ID, &lastJob.JobID, &lastJob.JobName, &lastJob.ClientName,
		&lastJob.JobType, &lastJob.JobLevel, &lastJob.Status,
		&lastJob.BytesWritten, &lastJob.FilesWritten,
		&startedAt, &endedAt, &lastJob.CreatedAt, &lastJob.UpdatedAt,
	)
	if err == nil {
		if startedAt.Valid {
			lastJob.StartedAt = &startedAt.Time
		}
		if endedAt.Valid {
			lastJob.EndedAt = &endedAt.Time
		}
		// Calculate duration
		if lastJob.StartedAt != nil && lastJob.EndedAt != nil {
			duration := int(lastJob.EndedAt.Sub(*lastJob.StartedAt).Seconds())
			lastJob.DurationSeconds = &duration
		}
		stats.LastJob = &lastJob
	} else if err != sql.ErrNoRows {
		// sql.ErrNoRows (no completed job yet) is not worth logging.
		s.logger.Warn("Failed to get last job", "error", err)
	}
	// Count active (running) jobs
	activeJobsQuery := `SELECT COUNT(*) FROM backup_jobs WHERE status = 'Running'`
	err = s.db.QueryRowContext(ctx, activeJobsQuery).Scan(&stats.ActiveJobsCount)
	if err != nil {
		s.logger.Warn("Failed to count active jobs", "error", err)
	}
	// Get default pool stats from Bacula database
	if s.baculaDB != nil {
		poolStats, err := s.getDefaultPoolStats(ctx)
		if err == nil {
			stats.DefaultPool = poolStats
		} else {
			s.logger.Warn("Failed to get pool stats", "error", err)
		}
	}
	return stats, nil
}
// parseUptimeFromStatus extracts a start timestamp from a bconsole
// status line (e.g. "Daemon started 28-Dec-25 01:45") and returns the
// elapsed time since then, formatted by formatUptime.
// Returns "Unknown" when no parsable date is found in the line.
func (s *Service) parseUptimeFromStatus(line string) string {
	// Bacula prints dates as "DD-MMM-YY HH:MM" or "DD-MMM-YYYY HH:MM:SS".
	months := []string{"Jan", "Feb", "Mar", "Apr", "May", "Jun",
		"Jul", "Aug", "Sep", "Oct", "Nov", "Dec"}
	// Date+time layouts are tried first (date token + following token).
	dateTimeFormats := []string{
		"02-Jan-06 15:04",       // "28-Dec-25 01:45"
		"02-Jan-2006 15:04:05",  // "30-Dec-2025 02:24:58"
		"02-Jan-2006 15:04",     // "30-Dec-2025 02:24"
		"2006-01-02 15:04:05",   // ISO format
		"2006-01-02 15:04",      // ISO format without seconds
	}
	// Date-only layouts are a fallback when the token following the date
	// is missing or is not a parsable time (the original implementation
	// returned "Unknown" in that case even though the date was usable).
	dateOnlyFormats := []string{
		"02-Jan-06",
		"02-Jan-2006",
	}
	words := strings.Fields(line)
	for i, word := range words {
		// Candidate date tokens contain "-" and a month abbreviation.
		isDate := false
		if strings.Contains(word, "-") {
			for _, m := range months {
				if strings.Contains(word, m) {
					isDate = true
					break
				}
			}
		}
		if !isDate {
			continue
		}
		// Try date + following time token first (two-word timestamp).
		if i+1 < len(words) {
			dateTimeStr := word + " " + words[i+1]
			for _, format := range dateTimeFormats {
				if t, err := time.Parse(format, dateTimeStr); err == nil {
					return s.formatUptime(time.Since(t))
				}
			}
		}
		// Fall back to the date token alone.
		for _, format := range dateOnlyFormats {
			if t, err := time.Parse(format, word); err == nil {
				return s.formatUptime(time.Since(t))
			}
		}
	}
	return "Unknown"
}
// formatUptime renders a duration as a compact human-readable string
// such as "3d 4h 5m", "4h 5m", or "5m", omitting leading zero units.
func (s *Service) formatUptime(duration time.Duration) string {
	totalMinutes := int(duration.Minutes())
	days := totalMinutes / (24 * 60)
	hours := (totalMinutes / 60) % 24
	minutes := totalMinutes % 60
	switch {
	case days > 0:
		return fmt.Sprintf("%dd %dh %dm", days, hours, minutes)
	case hours > 0:
		return fmt.Sprintf("%dh %dm", hours, minutes)
	default:
		return fmt.Sprintf("%dm", minutes)
	}
}
// getDefaultPoolStats gets statistics for the default pool from Bacula database.
//
// The "default" pool is the one named "Default" or "Full"; when neither
// exists, the pool owning the most volumes is used instead.
// NOTE(review): the query joins a "Volumes" table in addition to Media;
// a stock Bacula catalog keeps volume byte counts in Media only, so this
// presumably targets a customized schema — verify against the database.
func (s *Service) getDefaultPoolStats(ctx context.Context) (*PoolStats, error) {
	// Query Pool table for default pool (usually named "Default" or "Full")
	query := `
		SELECT 
			p.Name,
			COALESCE(SUM(v.VolBytes), 0) as used_bytes,
			COALESCE(SUM(v.MaxVolBytes), 0) as total_bytes
		FROM Pool p
		LEFT JOIN Media m ON p.PoolId = m.PoolId
		LEFT JOIN Volumes v ON m.MediaId = v.MediaId
		WHERE p.Name = 'Default' OR p.Name = 'Full'
		GROUP BY p.Name
		ORDER BY p.Name
		LIMIT 1
	`
	var pool PoolStats
	var name sql.NullString
	var usedBytes, totalBytes sql.NullInt64
	err := s.baculaDB.QueryRowContext(ctx, query).Scan(&name, &usedBytes, &totalBytes)
	if err != nil {
		if err == sql.ErrNoRows {
			// Try alternative query - get pool with most volumes
			altQuery := `
				SELECT 
					p.Name,
					COALESCE(SUM(v.VolBytes), 0) as used_bytes,
					COALESCE(SUM(v.MaxVolBytes), 0) as total_bytes
				FROM Pool p
				LEFT JOIN Media m ON p.PoolId = m.PoolId
				LEFT JOIN Volumes v ON m.MediaId = v.MediaId
				GROUP BY p.Name
				ORDER BY COUNT(m.MediaId) DESC
				LIMIT 1
			`
			err = s.baculaDB.QueryRowContext(ctx, altQuery).Scan(&name, &usedBytes, &totalBytes)
			if err != nil {
				return nil, fmt.Errorf("failed to query pool stats: %w", err)
			}
		} else {
			return nil, fmt.Errorf("failed to query pool stats: %w", err)
		}
	}
	// Nullable columns (LEFT JOINs) fall back to the zero value.
	if name.Valid {
		pool.Name = name.String
	}
	if usedBytes.Valid {
		pool.UsedBytes = usedBytes.Int64
	}
	if totalBytes.Valid {
		pool.TotalBytes = totalBytes.Int64
	}
	// Calculate usage percent
	if pool.TotalBytes > 0 {
		pool.UsagePercent = float64(pool.UsedBytes) / float64(pool.TotalBytes) * 100
	} else {
		// If total is 0, set total to used to show 100% if there's data
		if pool.UsedBytes > 0 {
			pool.TotalBytes = pool.UsedBytes
			pool.UsagePercent = 100.0
		}
	}
	return &pool, nil
}
// ListStoragePools lists all storage pools from Bacula database,
// including per-pool volume counts and byte totals, ordered by name.
//
// NOTE(review): the query selects SUM(m.VolBytes) for BOTH used_bytes
// and total_bytes, so UsagePercent computes to 100% for any pool that
// holds data — confirm whether total should come from MaxVolBytes.
func (s *Service) ListStoragePools(ctx context.Context) ([]StoragePool, error) {
	if s.baculaDB == nil {
		return nil, fmt.Errorf("Bacula database connection not configured")
	}
	query := `
		SELECT 
			p.PoolId,
			p.Name,
			COALESCE(p.PoolType, 'Backup') as PoolType,
			p.LabelFormat,
			p.Recycle,
			p.AutoPrune,
			COALESCE(COUNT(DISTINCT m.MediaId), 0) as volume_count,
			COALESCE(SUM(m.VolBytes), 0) as used_bytes,
			COALESCE(SUM(m.VolBytes), 0) as total_bytes
		FROM Pool p
		LEFT JOIN Media m ON p.PoolId = m.PoolId
		GROUP BY p.PoolId, p.Name, p.PoolType, p.LabelFormat, p.Recycle, p.AutoPrune
		ORDER BY p.Name
	`
	rows, err := s.baculaDB.QueryContext(ctx, query)
	if err != nil {
		return nil, fmt.Errorf("failed to query storage pools: %w", err)
	}
	defer rows.Close()
	var pools []StoragePool
	for rows.Next() {
		var pool StoragePool
		var labelFormat, poolType sql.NullString
		var recycle, autoPrune sql.NullBool
		var usedBytes, totalBytes sql.NullInt64
		err := rows.Scan(
			&pool.PoolID, &pool.Name, &poolType, &labelFormat,
			&recycle, &autoPrune, &pool.VolumeCount,
			&usedBytes, &totalBytes,
		)
		if err != nil {
			// Skip unreadable rows rather than failing the whole listing.
			s.logger.Error("Failed to scan pool row", "error", err)
			continue
		}
		// Set default pool type if null
		if poolType.Valid && poolType.String != "" {
			pool.PoolType = poolType.String
		} else {
			pool.PoolType = "Backup" // Default
		}
		if labelFormat.Valid && labelFormat.String != "" {
			pool.LabelFormat = &labelFormat.String
		}
		if recycle.Valid {
			pool.Recycle = &recycle.Bool
		}
		if autoPrune.Valid {
			pool.AutoPrune = &autoPrune.Bool
		}
		if usedBytes.Valid {
			pool.UsedBytes = usedBytes.Int64
		} else {
			pool.UsedBytes = 0
		}
		if totalBytes.Valid {
			pool.TotalBytes = totalBytes.Int64
		} else {
			pool.TotalBytes = 0
		}
		// Calculate usage percent
		if pool.TotalBytes > 0 {
			pool.UsagePercent = float64(pool.UsedBytes) / float64(pool.TotalBytes) * 100
		} else if pool.UsedBytes > 0 {
			pool.TotalBytes = pool.UsedBytes
			pool.UsagePercent = 100.0
		} else {
			pool.UsagePercent = 0.0
		}
		s.logger.Debug("Loaded pool", "pool_id", pool.PoolID, "name", pool.Name, "type", pool.PoolType, "volumes", pool.VolumeCount)
		pools = append(pools, pool)
	}
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("error iterating pool rows: %w", err)
	}
	return pools, nil
}
// ListStorageVolumes lists all storage volumes from Bacula database,
// optionally filtered by pool name (an empty string means all pools).
// Results are ordered by most-recently-written first.
func (s *Service) ListStorageVolumes(ctx context.Context, poolName string) ([]StorageVolume, error) {
	if s.baculaDB == nil {
		return nil, fmt.Errorf("Bacula database connection not configured")
	}
	query := `
		SELECT 
			v.VolumeId,
			v.MediaId,
			v.VolumeName,
			COALESCE(p.Name, 'Unknown') as pool_name,
			m.MediaType,
			v.VolStatus,
			COALESCE(v.VolBytes, 0) as vol_bytes,
			COALESCE(v.MaxVolBytes, 0) as max_vol_bytes,
			COALESCE(v.VolFiles, 0) as vol_files,
			v.VolRetention,
			v.LastWritten,
			COALESCE(v.RecycleCount, 0) as recycle_count
		FROM Volumes v
		LEFT JOIN Media m ON v.MediaId = m.MediaId
		LEFT JOIN Pool p ON m.PoolId = p.PoolId
		WHERE 1=1
	`
	// Apply the optional pool filter; at most one placeholder is used.
	var args []interface{}
	if poolName != "" {
		query += " AND p.Name = $1"
		args = append(args, poolName)
	}
	query += " ORDER BY v.LastWritten DESC NULLS LAST, v.VolumeName"
	rows, err := s.baculaDB.QueryContext(ctx, query, args...)
	if err != nil {
		return nil, fmt.Errorf("failed to query storage volumes: %w", err)
	}
	defer rows.Close()
	var result []StorageVolume
	for rows.Next() {
		var (
			v         StorageVolume
			retention sql.NullTime
			written   sql.NullTime
		)
		if scanErr := rows.Scan(
			&v.VolumeID, &v.MediaID, &v.VolumeName, &v.PoolName,
			&v.MediaType, &v.VolStatus, &v.VolBytes, &v.MaxVolBytes,
			&v.VolFiles, &retention, &written, &v.RecycleCount,
		); scanErr != nil {
			// Skip unreadable rows rather than failing the whole listing.
			s.logger.Error("Failed to scan volume row", "error", scanErr)
			continue
		}
		if retention.Valid {
			v.VolRetention = &retention.Time
		}
		if written.Valid {
			v.LastWritten = &written.Time
		}
		result = append(result, v)
	}
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("error iterating volume rows: %w", err)
	}
	return result, nil
}
// ListStorageDaemons lists all storage daemons from Bacula database.
// Nullable catalog columns fall back to zero values, and Status is
// hard-coded to "Online" (no live connectivity check is performed).
func (s *Service) ListStorageDaemons(ctx context.Context) ([]StorageDaemon, error) {
	if s.baculaDB == nil {
		return nil, fmt.Errorf("Bacula database connection not configured")
	}
	query := `
		SELECT 
			s.StorageId,
			s.Name,
			s.Address,
			s.Port,
			s.DeviceName,
			s.MediaType
		FROM Storage s
		ORDER BY s.Name
	`
	rows, err := s.baculaDB.QueryContext(ctx, query)
	if err != nil {
		return nil, fmt.Errorf("failed to query storage daemons: %w", err)
	}
	defer rows.Close()
	var result []StorageDaemon
	for rows.Next() {
		var (
			d     StorageDaemon
			addr  sql.NullString
			dev   sql.NullString
			mtype sql.NullString
			port  sql.NullInt64
		)
		if scanErr := rows.Scan(&d.StorageID, &d.Name, &addr, &port, &dev, &mtype); scanErr != nil {
			// Skip unreadable rows rather than failing the whole listing.
			s.logger.Error("Failed to scan storage daemon row", "error", scanErr)
			continue
		}
		if addr.Valid {
			d.Address = addr.String
		}
		if port.Valid {
			d.Port = int(port.Int64)
		}
		if dev.Valid {
			d.DeviceName = dev.String
		}
		if mtype.Valid {
			d.MediaType = mtype.String
		}
		// Default status to Online (could be enhanced with actual connection check)
		d.Status = "Online"
		result = append(result, d)
	}
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("error iterating storage daemon rows: %w", err)
	}
	return result, nil
}
// CreatePoolRequest represents a request to create a new storage pool.
type CreatePoolRequest struct {
	Name string `json:"name" binding:"required"` // unique pool name (required)
	PoolType string `json:"pool_type"` // Backup, Scratch, Recycle; defaults to "Backup" when empty
	LabelFormat *string `json:"label_format,omitempty"` // optional volume label format
	Recycle *bool `json:"recycle,omitempty"` // whether volumes in the pool may be recycled
	AutoPrune *bool `json:"auto_prune,omitempty"` // whether expired volumes are pruned automatically
}
// CreateStoragePool creates a new storage pool in Bacula database.
// The pool name must be non-empty and not collide with an existing
// pool; the pool type defaults to "Backup" when not supplied.
func (s *Service) CreateStoragePool(ctx context.Context, req CreatePoolRequest) (*StoragePool, error) {
	if s.baculaDB == nil {
		return nil, fmt.Errorf("Bacula database connection not configured")
	}
	if req.Name == "" {
		return nil, fmt.Errorf("pool name is required")
	}
	// Reject duplicate pool names up front.
	var existingID int
	switch err := s.baculaDB.QueryRowContext(ctx, "SELECT PoolId FROM Pool WHERE Name = $1", req.Name).Scan(&existingID); err {
	case nil:
		return nil, fmt.Errorf("pool with name %s already exists", req.Name)
	case sql.ErrNoRows:
		// Name is free — proceed.
	default:
		return nil, fmt.Errorf("failed to check existing pool: %w", err)
	}
	poolType := req.PoolType
	if poolType == "" {
		poolType = "Backup" // Default to Backup pool
	}
	query := `
		INSERT INTO Pool (Name, PoolType, LabelFormat, Recycle, AutoPrune)
		VALUES ($1, $2, $3, $4, $5)
		RETURNING PoolId, Name, PoolType, LabelFormat, Recycle, AutoPrune
	`
	var (
		pool        StoragePool
		labelFormat sql.NullString
		recycle     sql.NullBool
		autoPrune   sql.NullBool
	)
	err := s.baculaDB.QueryRowContext(ctx, query,
		req.Name, poolType, req.LabelFormat, req.Recycle, req.AutoPrune,
	).Scan(&pool.PoolID, &pool.Name, &pool.PoolType, &labelFormat, &recycle, &autoPrune)
	if err != nil {
		return nil, fmt.Errorf("failed to create pool: %w", err)
	}
	if labelFormat.Valid {
		pool.LabelFormat = &labelFormat.String
	}
	if recycle.Valid {
		pool.Recycle = &recycle.Bool
	}
	if autoPrune.Valid {
		pool.AutoPrune = &autoPrune.Bool
	}
	// A freshly created pool has no volumes or data yet.
	pool.VolumeCount = 0
	pool.UsedBytes = 0
	pool.TotalBytes = 0
	pool.UsagePercent = 0
	s.logger.Info("Storage pool created", "pool_id", pool.PoolID, "name", pool.Name, "type", pool.PoolType)
	return &pool, nil
}
// DeleteStoragePool deletes a storage pool from Bacula database.
// Deletion is refused while the pool still owns any volumes.
func (s *Service) DeleteStoragePool(ctx context.Context, poolID int) error {
	if s.baculaDB == nil {
		return fmt.Errorf("Bacula database connection not configured")
	}
	// Resolve the pool name; this also confirms the pool exists.
	var poolName string
	if err := s.baculaDB.QueryRowContext(ctx, "SELECT Name FROM Pool WHERE PoolId = $1", poolID).Scan(&poolName); err != nil {
		if err == sql.ErrNoRows {
			return fmt.Errorf("pool not found")
		}
		return fmt.Errorf("failed to check pool: %w", err)
	}
	// Refuse to delete a non-empty pool.
	countQuery := `
		SELECT COUNT(*) FROM Media m
		INNER JOIN Pool p ON m.PoolId = p.PoolId
		WHERE p.PoolId = $1
	`
	var volumeCount int
	if err := s.baculaDB.QueryRowContext(ctx, countQuery, poolID).Scan(&volumeCount); err != nil {
		return fmt.Errorf("failed to check pool volumes: %w", err)
	}
	if volumeCount > 0 {
		return fmt.Errorf("cannot delete pool %s: pool contains %d volumes. Please remove or move volumes first", poolName, volumeCount)
	}
	if _, err := s.baculaDB.ExecContext(ctx, "DELETE FROM Pool WHERE PoolId = $1", poolID); err != nil {
		return fmt.Errorf("failed to delete pool: %w", err)
	}
	s.logger.Info("Storage pool deleted", "pool_id", poolID, "name", poolName)
	return nil
}
// CreateVolumeRequest represents a request to create a new storage volume.
type CreateVolumeRequest struct {
	VolumeName string `json:"volume_name" binding:"required"` // unique volume name (required)
	PoolName string `json:"pool_name" binding:"required"` // existing pool to attach the volume to (required)
	MediaType string `json:"media_type"` // File, Tape, etc.; defaults to "File" when empty
	MaxVolBytes *int64 `json:"max_vol_bytes,omitempty"` // optional maximum volume size in bytes (NULL = unlimited)
	VolRetention *int `json:"vol_retention,omitempty"` // Retention period in days
}
// CreateStorageVolume creates a new storage volume in Bacula database.
//
// It inserts a Media row first (Volumes references Media) and then the
// Volumes row; if the second insert fails, the Media row is removed as
// best-effort cleanup. MediaType defaults to "File" and VolRetention,
// when given, is interpreted as a number of days from now.
func (s *Service) CreateStorageVolume(ctx context.Context, req CreateVolumeRequest) (*StorageVolume, error) {
	if s.baculaDB == nil {
		return nil, fmt.Errorf("Bacula database connection not configured")
	}
	// Validate volume name
	if req.VolumeName == "" {
		return nil, fmt.Errorf("volume name is required")
	}
	// Get pool ID
	var poolID int
	err := s.baculaDB.QueryRowContext(ctx, "SELECT PoolId FROM Pool WHERE Name = $1", req.PoolName).Scan(&poolID)
	if err == sql.ErrNoRows {
		return nil, fmt.Errorf("pool %s not found", req.PoolName)
	} else if err != nil {
		return nil, fmt.Errorf("failed to get pool: %w", err)
	}
	// Set defaults
	mediaType := req.MediaType
	if mediaType == "" {
		mediaType = "File" // Default to File for disk volumes
	}
	// Create Media entry first (Volumes table references Media)
	var mediaID int
	mediaQuery := `
		INSERT INTO Media (PoolId, MediaType, VolumeName, VolBytes, VolFiles, VolStatus, LastWritten)
		VALUES ($1, $2, $3, 0, 0, 'Append', NOW())
		RETURNING MediaId
	`
	err = s.baculaDB.QueryRowContext(ctx, mediaQuery, poolID, mediaType, req.VolumeName).Scan(&mediaID)
	if err != nil {
		return nil, fmt.Errorf("failed to create media entry: %w", err)
	}
	// Create Volume entry
	var maxVolBytes sql.NullInt64
	if req.MaxVolBytes != nil {
		maxVolBytes = sql.NullInt64{Int64: *req.MaxVolBytes, Valid: true}
	}
	var volRetention sql.NullTime
	if req.VolRetention != nil {
		retentionTime := time.Now().AddDate(0, 0, *req.VolRetention)
		volRetention = sql.NullTime{Time: retentionTime, Valid: true}
	}
	volumeQuery := `
		INSERT INTO Volumes (MediaId, VolumeName, VolBytes, MaxVolBytes, VolFiles, VolStatus, VolRetention, LastWritten)
		VALUES ($1, $2, 0, $3, 0, 'Append', $4, NOW())
		RETURNING VolumeId, MediaId, VolumeName, VolBytes, MaxVolBytes, VolFiles, VolRetention, LastWritten
	`
	var vol StorageVolume
	// MaxVolBytes is NULL when no size limit was requested; scanning it
	// into a plain int64 (as before) made the Scan fail and rolled back
	// every volume created without a limit.
	var returnedMaxBytes sql.NullInt64
	var lastWritten sql.NullTime
	err = s.baculaDB.QueryRowContext(ctx, volumeQuery,
		mediaID, req.VolumeName, maxVolBytes, volRetention,
	).Scan(
		&vol.VolumeID, &vol.MediaID, &vol.VolumeName, &vol.VolBytes, &returnedMaxBytes,
		&vol.VolFiles, &volRetention, &lastWritten,
	)
	if err != nil {
		// Cleanup: delete media if volume creation fails. Best-effort —
		// a cleanup failure is logged (not silently ignored) so orphaned
		// Media rows can be found later.
		if _, cleanupErr := s.baculaDB.ExecContext(ctx, "DELETE FROM Media WHERE MediaId = $1", mediaID); cleanupErr != nil {
			s.logger.Warn("Failed to clean up media entry after volume creation failure", "media_id", mediaID, "error", cleanupErr)
		}
		return nil, fmt.Errorf("failed to create volume: %w", err)
	}
	if returnedMaxBytes.Valid {
		vol.MaxVolBytes = returnedMaxBytes.Int64
	}
	vol.PoolName = req.PoolName
	vol.MediaType = mediaType
	vol.VolStatus = "Append"
	vol.RecycleCount = 0
	if volRetention.Valid {
		vol.VolRetention = &volRetention.Time
	}
	if lastWritten.Valid {
		vol.LastWritten = &lastWritten.Time
	}
	s.logger.Info("Storage volume created", "volume_id", vol.VolumeID, "name", vol.VolumeName, "pool", req.PoolName)
	return &vol, nil
}
// UpdateVolumeRequest represents a request to update a storage volume.
// Nil fields are left unchanged; at least one field must be set.
type UpdateVolumeRequest struct {
	MaxVolBytes *int64 `json:"max_vol_bytes,omitempty"` // new maximum volume size in bytes
	VolRetention *int `json:"vol_retention,omitempty"` // Retention period in days
}
// UpdateStorageVolume updates a storage volume's meta-data in Bacula database.
//
// Only the fields present in req are written; VolRetention is interpreted
// as days from now and stored as an absolute timestamp.
// NOTE(review): the final re-read lists ALL volumes and scans for the
// updated one — O(n) per update; a targeted SELECT would be cheaper.
func (s *Service) UpdateStorageVolume(ctx context.Context, volumeID int, req UpdateVolumeRequest) (*StorageVolume, error) {
	if s.baculaDB == nil {
		return nil, fmt.Errorf("Bacula database connection not configured")
	}
	// Check if volume exists
	var volumeName string
	err := s.baculaDB.QueryRowContext(ctx, "SELECT VolumeName FROM Volumes WHERE VolumeId = $1", volumeID).Scan(&volumeName)
	if err == sql.ErrNoRows {
		return nil, fmt.Errorf("volume not found")
	} else if err != nil {
		return nil, fmt.Errorf("failed to check volume: %w", err)
	}
	// Build update query dynamically, numbering placeholders as we go.
	updates := []string{}
	args := []interface{}{}
	argIndex := 1
	if req.MaxVolBytes != nil {
		updates = append(updates, fmt.Sprintf("MaxVolBytes = $%d", argIndex))
		args = append(args, *req.MaxVolBytes)
		argIndex++
	}
	if req.VolRetention != nil {
		// Convert the relative retention (days) to an absolute timestamp.
		retentionTime := time.Now().AddDate(0, 0, *req.VolRetention)
		updates = append(updates, fmt.Sprintf("VolRetention = $%d", argIndex))
		args = append(args, retentionTime)
		argIndex++
	}
	if len(updates) == 0 {
		return nil, fmt.Errorf("no fields to update")
	}
	// The final placeholder ($argIndex) is the WHERE clause's volume ID.
	args = append(args, volumeID)
	query := fmt.Sprintf("UPDATE Volumes SET %s WHERE VolumeId = $%d", strings.Join(updates, ", "), argIndex)
	_, err = s.baculaDB.ExecContext(ctx, query, args...)
	if err != nil {
		return nil, fmt.Errorf("failed to update volume: %w", err)
	}
	// Get updated volume
	volumes, err := s.ListStorageVolumes(ctx, "")
	if err != nil {
		return nil, fmt.Errorf("failed to get updated volume: %w", err)
	}
	for _, vol := range volumes {
		if vol.VolumeID == volumeID {
			s.logger.Info("Storage volume updated", "volume_id", volumeID, "name", volumeName)
			return &vol, nil
		}
	}
	return nil, fmt.Errorf("updated volume not found")
}
// DeleteStorageVolume deletes a storage volume from Bacula database.
//
// A volume that still contains data (VolBytes > 0) is refused; purge or
// truncate it first. The associated Media row is removed as well, but a
// failure there is only logged since the volume itself is already gone.
func (s *Service) DeleteStorageVolume(ctx context.Context, volumeID int) error {
	if s.baculaDB == nil {
		return fmt.Errorf("Bacula database connection not configured")
	}
	// Fetch name, media ID and byte count in a single consistent read.
	// (Previously this issued two SELECTs on the same row, allowing it to
	// change in between.) COALESCE guards against a NULL VolBytes column.
	var volumeName string
	var mediaID int
	var volBytes int64
	err := s.baculaDB.QueryRowContext(ctx,
		"SELECT VolumeName, MediaId, COALESCE(VolBytes, 0) FROM Volumes WHERE VolumeId = $1",
		volumeID,
	).Scan(&volumeName, &mediaID, &volBytes)
	if err == sql.ErrNoRows {
		return fmt.Errorf("volume not found")
	} else if err != nil {
		return fmt.Errorf("failed to check volume: %w", err)
	}
	if volBytes > 0 {
		return fmt.Errorf("cannot delete volume %s: volume contains data. Please purge or truncate first", volumeName)
	}
	// Delete volume
	_, err = s.baculaDB.ExecContext(ctx, "DELETE FROM Volumes WHERE VolumeId = $1", volumeID)
	if err != nil {
		return fmt.Errorf("failed to delete volume: %w", err)
	}
	// Delete associated media entry
	_, err = s.baculaDB.ExecContext(ctx, "DELETE FROM Media WHERE MediaId = $1", mediaID)
	if err != nil {
		s.logger.Warn("Failed to delete media entry", "media_id", mediaID, "error", err)
		// Continue anyway, volume is deleted
	}
	s.logger.Info("Storage volume deleted", "volume_id", volumeID, "name", volumeName)
	return nil
}
// Media represents a media entry from bconsole "list media".
type Media struct {
	MediaID int `json:"media_id"` // catalog MediaId
	VolumeName string `json:"volume_name"` // volume label
	PoolName string `json:"pool_name"` // owning pool (enriched from DB; parser defaults to "Default")
	MediaType string `json:"media_type"` // media type string, e.g. "File" or an LTO/tape type
	Status string `json:"status"` // VolStatus column, e.g. "Append"
	VolBytes int64 `json:"vol_bytes"` // bytes written to the volume
	MaxVolBytes int64 `json:"max_vol_bytes"` // always 0: not present in "list media" output
	VolFiles int `json:"vol_files"` // file count on the volume
	LastWritten string `json:"last_written,omitempty"` // raw timestamp string as printed by bconsole
	RecycleCount int `json:"recycle_count"` // times the volume was recycled
	Slot int `json:"slot,omitempty"` // Slot number in library
	InChanger int `json:"in_changer,omitempty"` // 1 if in changer, 0 if not
	LibraryName string `json:"library_name,omitempty"` // Library name (for tape media)
}
// ListMedia lists all media from bconsole "list media" command.
// The textual table is parsed into Media structs and, when a Bacula DB
// connection is available, enriched with pool and library names.
func (s *Service) ListMedia(ctx context.Context) ([]Media, error) {
	s.logger.Debug("Executing bconsole list media command")
	output, err := s.ExecuteBconsoleCommand(ctx, "list media")
	if err != nil {
		s.logger.Error("Failed to execute bconsole list media", "error", err)
		return nil, fmt.Errorf("failed to execute bconsole list media: %w", err)
	}
	// Log at most the first 500 bytes of raw output for debugging.
	preview := output
	if len(preview) > 500 {
		preview = preview[:500]
	}
	s.logger.Debug("bconsole list media output", "output_length", len(output), "output_preview", preview)
	items := s.parseBconsoleMediaOutput(output)
	s.logger.Debug("Parsed media from bconsole", "count", len(items))
	if s.baculaDB != nil && len(items) > 0 {
		items = s.enrichMediaWithPoolNames(ctx, items)
	}
	return items, nil
}
// enrichMediaWithPoolNames enriches media list with pool names from database.
//
// Two lookups are performed: (1) a single IN-query mapping MediaId to
// pool name; (2) for each non-file storage daemon, a bconsole
// "list volumes storage=<name>" call whose media IDs map volumes to a
// library. Tape media (MediaType containing "lto"/"tape") get their
// LibraryName from that map, falling back to "Unknown Library" when the
// volume is flagged in-changer. Failures in either lookup are logged
// and the input is returned with whatever enrichment succeeded.
func (s *Service) enrichMediaWithPoolNames(ctx context.Context, media []Media) []Media {
	// Create maps of media_id to pool_name and library_name
	poolMap := make(map[int]string)
	libraryMap := make(map[int]string)
	if len(media) == 0 {
		return media
	}
	// Query database to get pool names for all media
	mediaIDs := make([]interface{}, len(media))
	for i, m := range media {
		mediaIDs[i] = m.MediaID
	}
	// Build query with placeholders ($1, $2, ... one per media ID)
	placeholders := make([]string, len(mediaIDs))
	args := make([]interface{}, len(mediaIDs))
	for i := range mediaIDs {
		placeholders[i] = fmt.Sprintf("$%d", i+1)
		args[i] = mediaIDs[i]
	}
	// First, get pool names
	query := fmt.Sprintf(`
		SELECT m.MediaId, COALESCE(p.Name, 'Unknown') as pool_name
		FROM Media m
		LEFT JOIN Pool p ON m.PoolId = p.PoolId
		WHERE m.MediaId IN (%s)
	`, strings.Join(placeholders, ","))
	rows, err := s.baculaDB.QueryContext(ctx, query, args...)
	if err != nil {
		s.logger.Warn("Failed to query pool names for media", "error", err)
	} else {
		// NOTE(review): this defer is held until the function returns,
		// keeping the rows open across the bconsole calls below.
		defer rows.Close()
		for rows.Next() {
			var mediaID int
			var poolName string
			if err := rows.Scan(&mediaID, &poolName); err == nil {
				poolMap[mediaID] = poolName
			}
		}
	}
	// Get storage names for tape media
	// Since Storage table doesn't have MediaType column, we'll use bconsole to match
	// For each storage, check which media belong to it using "list volumes storage=<name>"
	storageQuery := `
		SELECT Name
		FROM Storage
		ORDER BY Name
	`
	storageRows, err := s.baculaDB.QueryContext(ctx, storageQuery)
	if err == nil {
		defer storageRows.Close()
		var storageNames []string
		for storageRows.Next() {
			var storageName string
			if err := storageRows.Scan(&storageName); err == nil {
				storageNames = append(storageNames, storageName)
			}
		}
		// For each storage, use bconsole to get list of media volumes
		// This will tell us which media belong to which storage
		for _, storageName := range storageNames {
			// Skip file storages (File1, File2)
			if strings.Contains(strings.ToLower(storageName), "file") {
				continue
			}
			// Use bconsole to list volumes for this storage
			cmd := fmt.Sprintf("list volumes storage=%s", storageName)
			output, err := s.ExecuteBconsoleCommand(ctx, cmd)
			if err != nil {
				s.logger.Debug("Failed to get volumes for storage", "storage", storageName, "error", err)
				continue
			}
			// Parse output to get media IDs
			// (this local mediaIDs shadows the []interface{} built above)
			mediaIDs := s.parseMediaIDsFromBconsoleOutput(output)
			for _, mediaID := range mediaIDs {
				if mediaID > 0 {
					libraryMap[mediaID] = storageName
				}
			}
		}
	}
	// Update media with pool names and library names
	for i := range media {
		if poolName, ok := poolMap[media[i].MediaID]; ok {
			media[i].PoolName = poolName
		}
		// Set library name for tape media that are in changer
		if media[i].MediaType != "" && (strings.Contains(strings.ToLower(media[i].MediaType), "lto") || strings.Contains(strings.ToLower(media[i].MediaType), "tape")) {
			if libraryName, ok := libraryMap[media[i].MediaID]; ok && libraryName != "" {
				media[i].LibraryName = libraryName
			} else if media[i].InChanger > 0 {
				// If in changer but no storage name, use generic name
				media[i].LibraryName = "Unknown Library"
			}
		}
	}
	return media
}
// parseMediaIDsFromBconsoleOutput parses media IDs from bconsole
// "list volumes storage=..." output.
//
// The output is an ASCII table ("| mediaid | ... |"): the header row is
// located first to learn which column holds the media ID, then each data
// row's value in that column is collected. Returns nil when no table
// (or no valid ID) is found.
func (s *Service) parseMediaIDsFromBconsoleOutput(output string) []int {
	var mediaIDs []int
	lines := strings.Split(output, "\n")
	inTable := false
	headerFound := false
	mediaIDColIndex := -1
	for _, line := range lines {
		line = strings.TrimSpace(line)
		// Skip connection messages
		if strings.Contains(line, "Connecting to Director") ||
			strings.Contains(line, "Enter a period") ||
			strings.Contains(line, "list volumes") ||
			strings.Contains(line, "quit") ||
			strings.Contains(line, "You have messages") ||
			strings.Contains(line, "Automatically selected") ||
			strings.Contains(line, "Using Catalog") ||
			strings.Contains(line, "Pool:") {
			continue
		}
		// Detect table header and record the mediaid column position
		if !headerFound && strings.Contains(line, "|") && strings.Contains(strings.ToLower(line), "mediaid") {
			parts := strings.Split(line, "|")
			for i, part := range parts {
				if strings.Contains(strings.ToLower(strings.TrimSpace(part)), "mediaid") {
					mediaIDColIndex = i
					break
				}
			}
			headerFound = true
			inTable = true
			continue
		}
		// Detect table separator
		if strings.HasPrefix(line, "+") && strings.Contains(line, "-") {
			continue
		}
		// Skip empty lines
		if line == "" {
			continue
		}
		// Parse table rows
		if inTable && strings.Contains(line, "|") && mediaIDColIndex >= 0 {
			parts := strings.Split(line, "|")
			if mediaIDColIndex < len(parts) {
				// Numbers may contain thousands separators ("1,234").
				mediaIDStr := strings.ReplaceAll(strings.TrimSpace(parts[mediaIDColIndex]), ",", "")
				// A successful Atoi already excludes header text such as
				// "mediaid", so no extra string comparison is needed
				// (the old != "mediaid" check was unreachable).
				if mediaID, err := strconv.Atoi(mediaIDStr); err == nil && mediaID > 0 {
					mediaIDs = append(mediaIDs, mediaID)
				}
			}
		}
	}
	return mediaIDs
}
// parseBconsoleMediaOutput parses bconsole "list media" output.
//
// The output is an ASCII table; the header row is parsed into a
// name-to-column-index map so values can be extracted by column name
// regardless of column order. Rows that fail to yield a media ID and a
// volume name are skipped. Pool name and MaxVolBytes are not present in
// the bconsole table and are filled with placeholders here (enrichment
// happens later in enrichMediaWithPoolNames).
func (s *Service) parseBconsoleMediaOutput(output string) []Media {
	var mediaList []Media
	lines := strings.Split(output, "\n")
	inTable := false
	headerFound := false
	headerMap := make(map[string]int) // Map header name to column index
	for _, line := range lines {
		line = strings.TrimSpace(line)
		// Skip connection messages and command echo
		if strings.Contains(line, "Connecting to Director") ||
			strings.Contains(line, "Enter a period") ||
			strings.Contains(line, "list media") ||
			strings.Contains(line, "quit") ||
			strings.Contains(line, "You have messages") ||
			strings.Contains(line, "Automatically selected") ||
			strings.Contains(line, "Using Catalog") ||
			strings.Contains(line, "Pool:") {
			continue
		}
		// Detect table header - format: | mediaid | volumename | volstatus | ...
		if !headerFound && strings.Contains(line, "|") && (strings.Contains(strings.ToLower(line), "mediaid") || strings.Contains(strings.ToLower(line), "volumename")) {
			// Parse header to get column positions
			parts := strings.Split(line, "|")
			for i, part := range parts {
				headerName := strings.ToLower(strings.TrimSpace(part))
				if headerName != "" {
					headerMap[headerName] = i
				}
			}
			headerFound = true
			inTable = true
			s.logger.Debug("Found media table header", "headers", headerMap)
			continue
		}
		// Detect table separator
		if strings.HasPrefix(line, "+") && strings.Contains(line, "-") {
			continue
		}
		// Skip empty lines
		if line == "" {
			continue
		}
		// Parse table rows
		if inTable && strings.Contains(line, "|") {
			parts := strings.Split(line, "|")
			if len(parts) > 1 {
				// Helper to get string value safely (empty when the
				// column is absent or out of range for this row)
				getString := func(colName string) string {
					if idx, ok := headerMap[colName]; ok && idx < len(parts) {
						return strings.TrimSpace(parts[idx])
					}
					return ""
				}
				// Helper to get int value safely (0 on parse failure)
				getInt := func(colName string) int {
					valStr := getString(colName)
					// Remove commas from numbers
					valStr = strings.ReplaceAll(valStr, ",", "")
					if val, e := strconv.Atoi(valStr); e == nil {
						return val
					}
					return 0
				}
				// Helper to get int64 value safely (0 on parse failure)
				getInt64 := func(colName string) int64 {
					valStr := getString(colName)
					// Remove commas from numbers
					valStr = strings.ReplaceAll(valStr, ",", "")
					if val, e := strconv.ParseInt(valStr, 10, 64); e == nil {
						return val
					}
					return 0
				}
				mediaID := getInt("mediaid")
				volumeName := getString("volumename")
				volStatus := getString("volstatus")
				volBytes := getInt64("volbytes")
				volFiles := getInt("volfiles")
				mediaType := getString("mediatype")
				lastWritten := getString("lastwritten")
				recycleCount := getInt("recycle")
				slot := getInt("slot")
				inChanger := getInt("inchanger")
				// Skip header row or invalid rows
				if mediaID == 0 || volumeName == "" || volumeName == "volumename" {
					continue
				}
				// Get pool name - it's not in the table, we'll need to get it from database or set default
				// For now, we'll use "Default" as pool name since bconsole list media doesn't show pool
				poolName := "Default"
				// MaxVolBytes is not in the output, we'll set to 0 for now
				maxVolBytes := int64(0)
				media := Media{
					MediaID: mediaID,
					VolumeName: volumeName,
					PoolName: poolName,
					MediaType: mediaType,
					Status: volStatus,
					VolBytes: volBytes,
					MaxVolBytes: maxVolBytes,
					VolFiles: volFiles,
					LastWritten: lastWritten,
					RecycleCount: recycleCount,
					Slot: slot,
					InChanger: inChanger,
				}
				mediaList = append(mediaList, media)
			}
		}
	}
	s.logger.Debug("Parsed media from bconsole", "count", len(mediaList))
	return mediaList
}