Files
calypso/backend/internal/bacula/handler.go
Othman H. Suseno 70b7841d1a add sources
2026-01-15 17:39:32 +07:00

677 lines
20 KiB
Go

package bacula
import (
	"context"
	"database/sql"
	"encoding/json"
	"errors"
	"io"
	"net/http"
	"time"

	"github.com/atlasos/calypso/internal/common/database"
	"github.com/atlasos/calypso/internal/common/logger"
	"github.com/atlasos/calypso/internal/iam"
	"github.com/gin-gonic/gin"
	"github.com/lib/pq"
	"go.uber.org/zap"
)
const (
	// requestTimeout bounds the context each handler derives from the incoming
	// request before it talks to the database.
	requestTimeout = 5 * time.Second
	// maxHistoryEntries caps how many capability-history entries attachHistory
	// appends to a single client response.
	maxHistoryEntries = 10
)
// Handler groups the gin handlers for Bacula client registration, capability
// updates, heartbeats and client lookups.
type Handler struct {
	// db is the database handle every query and transaction goes through.
	db *database.DB
	// logger is the logger the handlers report failures to.
	logger *logger.Logger
}
// NewHandler returns a Handler backed by db. Its logger is derived from log
// with the field "component" set to "bacula-handler".
func NewHandler(db *database.DB, log *logger.Logger) *Handler {
	scoped := log.WithFields(zap.String("component", "bacula-handler"))
	h := &Handler{
		db:     db,
		logger: scoped,
	}
	return h
}
// RegisterRequest is the JSON body accepted by Handler.Register.
type RegisterRequest struct {
	// Hostname is required; Register upserts the client on this value.
	Hostname string `json:"hostname" binding:"required"`
	// IPAddress is required and stored as the client's address.
	IPAddress string `json:"ip_address" binding:"required"`
	// AgentVersion is optional and stored as sent.
	AgentVersion string `json:"agent_version"`
	// Status is optional; Register substitutes "online" when it is empty.
	Status string `json:"status"`
	// BackupTypes is required; Register also rejects an empty list.
	BackupTypes []string `json:"backup_types" binding:"required"`
	// Metadata is optional; Register marshals it to JSON only when non-empty.
	Metadata map[string]string `json:"metadata"`
}
// UpdateCapabilitiesRequest is the JSON body accepted by
// Handler.UpdateCapabilities.
type UpdateCapabilitiesRequest struct {
	// BackupTypes is required; UpdateCapabilities also rejects an empty list.
	BackupTypes []string `json:"backup_types" binding:"required"`
	// Notes is optional free text stored with the pending update and with the
	// capability-history entry.
	Notes string `json:"notes"`
}
// PingRequest is the optional JSON body accepted by Handler.Ping.
type PingRequest struct {
	// Status, when non-empty, replaces the client's stored status; an empty
	// value leaves the stored status unchanged.
	Status string `json:"status"`
}
// ClientResponse is the JSON representation of one bacula_clients row as built
// by buildClientResponse, optionally extended with capability history.
type ClientResponse struct {
	// ID is the client's id column.
	ID string `json:"id"`
	// Hostname is the client's hostname column.
	Hostname string `json:"hostname"`
	// IPAddress is the client's ip_address column.
	IPAddress string `json:"ip_address"`
	// AgentVersion is the client's agent_version column.
	AgentVersion string `json:"agent_version"`
	// Status is the client's status column.
	Status string `json:"status"`
	// BackupTypes is the decoded backup_types column.
	BackupTypes []string `json:"backup_types"`
	// PendingBackupTypes is the decoded pending_backup_types column; it is
	// set only when that list is non-empty.
	PendingBackupTypes []string `json:"pending_backup_types,omitempty"`
	// PendingRequestedBy is set only alongside PendingBackupTypes, from
	// pending_requested_by when that column is non-NULL.
	PendingRequestedBy string `json:"pending_requested_by,omitempty"`
	// PendingRequestedAt is set only alongside PendingBackupTypes, from
	// pending_requested_at when that column is non-NULL.
	PendingRequestedAt *time.Time `json:"pending_requested_at,omitempty"`
	// PendingNotes is set only alongside PendingBackupTypes, from
	// pending_notes when that column is non-NULL.
	PendingNotes string `json:"pending_notes,omitempty"`
	// Metadata is the decoded metadata column; nil when the column is empty.
	Metadata map[string]interface{} `json:"metadata,omitempty"`
	// RegisteredBy is the registered_by_user_id column.
	RegisteredBy string `json:"registered_by"`
	// LastSeen is nil when last_seen is NULL.
	LastSeen *time.Time `json:"last_seen,omitempty"`
	// CreatedAt is the created_at column.
	CreatedAt time.Time `json:"created_at"`
	// UpdatedAt is the updated_at column.
	UpdatedAt time.Time `json:"updated_at"`
	// CapabilityHistory is filled by attachHistory, newest entry first and at
	// most maxHistoryEntries long; buildClientResponse leaves it empty.
	CapabilityHistory []CapabilityHistoryEntry `json:"capability_history,omitempty"`
}
// CapabilityHistoryEntry is one row of bacula_client_capability_history as
// exposed on a ClientResponse.
type CapabilityHistoryEntry struct {
	// BackupTypes is the decoded backup_types column of the history row.
	BackupTypes []string `json:"backup_types"`
	// Source is the stored source value; the handlers in this file write
	// "agent" (Register) and "ui" (UpdateCapabilities).
	Source string `json:"source"`
	// RequestedBy is requested_by_user_id, left empty when that is NULL.
	RequestedBy string `json:"requested_by,omitempty"`
	// RequestedAt is the requested_at column.
	RequestedAt time.Time `json:"requested_at"`
	// Notes is the notes column, left empty when that is NULL.
	Notes string `json:"notes,omitempty"`
}
// PendingUpdateResponse describes a pending capability update; it is returned
// by both UpdateCapabilities and GetPendingUpdate.
type PendingUpdateResponse struct {
	// BackupTypes is the requested list of backup types.
	BackupTypes []string `json:"backup_types"`
	// RequestedBy is the ID of the user who requested the update, if known.
	RequestedBy string `json:"requested_by,omitempty"`
	// RequestedAt is when the update was requested. It is not a pointer, so a
	// missing timestamp serialises as the zero time rather than being omitted.
	RequestedAt time.Time `json:"requested_at"`
	// Notes is the optional free text attached to the request.
	Notes string `json:"notes,omitempty"`
}
// Register upserts the calling agent's client record, keyed by hostname, and
// responds with the stored client.
//
// The body must carry hostname, ip_address and a non-empty backup_types list;
// status defaults to "online" when omitted. Inside a single transaction the
// handler:
//
//  1. inserts the client or, on a hostname conflict, overwrites its address,
//     agent version, status, backup types, registering user and last_seen
//     (metadata is kept when the request carries none);
//  2. appends an "agent" entry to the capability history;
//  3. clears a pending capability update whose backup types equal the ones
//     just reported (same elements in the same order).
//
// Responses: 200 with a ClientResponse, 400 for an invalid payload, 401 when
// currentUser finds no user on the context, 500 for any encoding or database
// failure, in which case the deferred rollback discards the transaction.
func (h *Handler) Register(c *gin.Context) {
	var req RegisterRequest
	if err := c.ShouldBindJSON(&req); err != nil {
		c.JSON(http.StatusBadRequest, gin.H{"error": "invalid request payload"})
		return
	}
	// The "required" binding still lets an empty JSON array through.
	if len(req.BackupTypes) == 0 {
		c.JSON(http.StatusBadRequest, gin.H{"error": "backup_types is required"})
		return
	}
	user, err := currentUser(c)
	if err != nil {
		c.JSON(http.StatusUnauthorized, gin.H{"error": "authentication required"})
		return
	}
	ctx, cancel := context.WithTimeout(c.Request.Context(), requestTimeout)
	defer cancel()
	backupPayload, err := json.Marshal(req.BackupTypes)
	if err != nil {
		h.logger.Error("failed to marshal backup types", "error", err)
		c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to encode backup types"})
		return
	}
	// Metadata is optional. When absent it is bound as an untyped nil so the
	// driver always sends SQL NULL: a nil []byte wrapped in an interface is
	// not a nil driver value and may be sent as an empty string instead,
	// which is not valid JSON and would defeat the COALESCE in the upsert.
	var metadataArg interface{}
	if len(req.Metadata) > 0 {
		metadataPayload, err := json.Marshal(req.Metadata)
		if err != nil {
			h.logger.Error("failed to marshal metadata", "error", err)
			c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to encode metadata"})
			return
		}
		metadataArg = metadataPayload
	}
	status := req.Status
	if status == "" {
		status = "online"
	}
	tx, err := h.db.BeginTx(ctx, nil)
	if err != nil {
		h.logger.Error("failed to begin database transaction", "error", err)
		c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to register client"})
		return
	}
	// Covers every early return below; after a successful Commit it only
	// reports that the transaction is already done.
	defer tx.Rollback()
	var row clientRow
	err = tx.QueryRowContext(ctx, `
INSERT INTO bacula_clients (
hostname, ip_address, agent_version, status, backup_types, metadata,
registered_by_user_id, last_seen, updated_at
) VALUES ($1, $2, $3, $4, $5, $6, $7, NOW(), NOW())
ON CONFLICT (hostname) DO UPDATE SET
ip_address = EXCLUDED.ip_address,
agent_version = EXCLUDED.agent_version,
status = EXCLUDED.status,
backup_types = EXCLUDED.backup_types,
metadata = COALESCE(EXCLUDED.metadata, bacula_clients.metadata),
registered_by_user_id = EXCLUDED.registered_by_user_id,
last_seen = EXCLUDED.last_seen,
updated_at = NOW()
RETURNING id, hostname, ip_address, agent_version, status, backup_types, metadata,
pending_backup_types, pending_requested_by, pending_requested_at, pending_notes,
registered_by_user_id, last_seen, created_at, updated_at
`, req.Hostname, req.IPAddress, req.AgentVersion, status, backupPayload, metadataArg, user.ID).Scan(
		&row.ID, &row.Hostname, &row.IPAddress, &row.AgentVersion, &row.Status, &row.BackupJSON,
		&row.MetadataJSON, &row.PendingBackupJSON, &row.PendingRequestedBy, &row.PendingRequestedAt,
		&row.PendingNotes, &row.RegisteredBy, &row.LastSeen, &row.CreatedAt, &row.UpdatedAt,
	)
	if err != nil {
		h.logger.Error("failed to ensure bacula client", "error", err)
		c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to register client"})
		return
	}
	resp, err := buildClientResponse(&row)
	if err != nil {
		h.logger.Error("failed to marshal client response", "error", err)
		c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to build response"})
		return
	}
	if err := insertCapabilityHistory(ctx, tx, row.ID, req.BackupTypes, "agent", user.ID, "agent registration"); err != nil {
		h.logger.Error("failed to record capability history", "error", err)
		c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to record capability history"})
		return
	}
	// The agent now reports exactly what was requested, so the pending update
	// is fulfilled and can be dropped from both the row and the response.
	if len(resp.PendingBackupTypes) > 0 && stringSlicesEqual(resp.PendingBackupTypes, resp.BackupTypes) {
		if _, err := tx.ExecContext(ctx, `
UPDATE bacula_clients
SET pending_backup_types = NULL,
pending_requested_by = NULL,
pending_requested_at = NULL,
pending_notes = NULL,
updated_at = NOW()
WHERE id = $1
`, resp.ID); err != nil {
			h.logger.Error("failed to clear pending capability update", "error", err)
			c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to update client"})
			return
		}
		resp.PendingBackupTypes = nil
		resp.PendingRequestedBy = ""
		resp.PendingRequestedAt = nil
		resp.PendingNotes = ""
	}
	if err := tx.Commit(); err != nil {
		h.logger.Error("failed to commit client registration", "error", err)
		c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to register client"})
		return
	}
	c.JSON(http.StatusOK, resp)
}
// UpdateCapabilities stores a requested set of backup types as the pending
// update of client :id and records the request in the capability history with
// source "ui". Both writes happen in one transaction.
//
// The pending columns are written with a single UPDATE ... RETURNING, so a
// missing client is detected by the update itself and the response carries the
// pending_requested_at value that was actually persisted.
//
// Responses: 200 with a PendingUpdateResponse, 400 for an invalid payload or a
// missing id, 401 when currentUser finds no user on the context, 404 when no
// client has that id, 500 for any encoding or database failure.
func (h *Handler) UpdateCapabilities(c *gin.Context) {
	var req UpdateCapabilitiesRequest
	if err := c.ShouldBindJSON(&req); err != nil {
		c.JSON(http.StatusBadRequest, gin.H{"error": "invalid request payload"})
		return
	}
	// The "required" binding still lets an empty JSON array through.
	if len(req.BackupTypes) == 0 {
		c.JSON(http.StatusBadRequest, gin.H{"error": "backup_types is required"})
		return
	}
	user, err := currentUser(c)
	if err != nil {
		c.JSON(http.StatusUnauthorized, gin.H{"error": "authentication required"})
		return
	}
	clientID := c.Param("id")
	if clientID == "" {
		c.JSON(http.StatusBadRequest, gin.H{"error": "client id is required"})
		return
	}
	backupPayload, err := json.Marshal(req.BackupTypes)
	if err != nil {
		h.logger.Error("failed to marshal backup types", "error", err)
		c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to encode backup types"})
		return
	}
	ctx, cancel := context.WithTimeout(c.Request.Context(), requestTimeout)
	defer cancel()
	tx, err := h.db.BeginTx(ctx, nil)
	if err != nil {
		h.logger.Error("failed to begin transaction", "error", err)
		c.JSON(http.StatusInternalServerError, gin.H{"error": "unable to update capabilities"})
		return
	}
	// Covers every early return below; after a successful Commit it only
	// reports that the transaction is already done.
	defer tx.Rollback()
	var requestedAt time.Time
	err = tx.QueryRowContext(ctx, `
UPDATE bacula_clients
SET pending_backup_types = $1,
pending_requested_by = $2,
pending_requested_at = NOW(),
pending_notes = $3,
updated_at = NOW()
WHERE id = $4
RETURNING pending_requested_at
`, backupPayload, user.ID, req.Notes, clientID).Scan(&requestedAt)
	if errors.Is(err, sql.ErrNoRows) {
		// No row matched the id, so nothing was updated.
		c.JSON(http.StatusNotFound, gin.H{"error": "client not found"})
		return
	}
	if err != nil {
		h.logger.Error("failed to mark pending update", "error", err)
		c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to update client"})
		return
	}
	if err := insertCapabilityHistory(ctx, tx, clientID, req.BackupTypes, "ui", user.ID, req.Notes); err != nil {
		h.logger.Error("failed to insert capability history", "error", err)
		c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to record capability change"})
		return
	}
	if err := tx.Commit(); err != nil {
		h.logger.Error("failed to commit capability update", "error", err)
		c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to update client"})
		return
	}
	c.JSON(http.StatusOK, PendingUpdateResponse{
		BackupTypes: req.BackupTypes,
		RequestedBy: user.ID,
		RequestedAt: requestedAt,
		Notes:       req.Notes,
	})
}
// GetPendingUpdate returns the pending capability update of client :id.
//
// Responses: 200 with a PendingUpdateResponse when a non-empty pending list is
// stored, 204 when there is none, 400 when the id is missing, 404 when no
// client has that id, 500 when the query or the stored JSON cannot be read.
func (h *Handler) GetPendingUpdate(c *gin.Context) {
	clientID := c.Param("id")
	if clientID == "" {
		c.JSON(http.StatusBadRequest, gin.H{"error": "client id is required"})
		return
	}
	ctx, cancel := context.WithTimeout(c.Request.Context(), requestTimeout)
	defer cancel()
	var pendingJSON []byte
	var requestedBy sql.NullString
	var requestedAt sql.NullTime
	var notes sql.NullString
	err := h.db.QueryRowContext(ctx, `
SELECT pending_backup_types, pending_requested_by, pending_requested_at, pending_notes
FROM bacula_clients
WHERE id = $1
`, clientID).Scan(&pendingJSON, &requestedBy, &requestedAt, &notes)
	if err != nil {
		if errors.Is(err, sql.ErrNoRows) {
			c.JSON(http.StatusNotFound, gin.H{"error": "client not found"})
			return
		}
		h.logger.Error("failed to fetch pending update", "error", err)
		c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to read pending update"})
		return
	}
	// A NULL column scans into an empty slice: nothing is pending.
	if len(pendingJSON) == 0 {
		c.Status(http.StatusNoContent)
		return
	}
	var backupTypes []string
	if err := json.Unmarshal(pendingJSON, &backupTypes); err != nil {
		h.logger.Error("failed to unmarshal pending backup types", "error", err)
		c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to read pending update"})
		return
	}
	// A stored JSON null or empty array carries no request either. This
	// matches buildClientResponse, which only reports a pending update when
	// the decoded list is non-empty.
	if len(backupTypes) == 0 {
		c.Status(http.StatusNoContent)
		return
	}
	response := PendingUpdateResponse{
		BackupTypes: backupTypes,
		Notes:       notes.String,
	}
	if requestedBy.Valid {
		response.RequestedBy = requestedBy.String
	}
	if requestedAt.Valid {
		response.RequestedAt = requestedAt.Time
	}
	c.JSON(http.StatusOK, response)
}
// Ping records a heartbeat for client :id: it sets last_seen and updated_at to
// NOW() and, when the body carries a non-empty status, replaces the stored
// status with it.
//
// The JSON body is optional. An absent body is accepted; a body that is
// present but cannot be decoded is rejected instead of being silently ignored.
//
// Responses: 204 on success, 400 when the id is missing or the body is
// malformed, 404 when no client has that id, 500 on a database failure.
func (h *Handler) Ping(c *gin.Context) {
	clientID := c.Param("id")
	if clientID == "" {
		c.JSON(http.StatusBadRequest, gin.H{"error": "client id is required"})
		return
	}
	var req PingRequest
	// Decoding an empty body yields io.EOF; that is the only bind error that
	// means "no body was sent".
	if err := c.ShouldBindJSON(&req); err != nil && !errors.Is(err, io.EOF) {
		c.JSON(http.StatusBadRequest, gin.H{"error": "invalid request payload"})
		return
	}
	ctx, cancel := context.WithTimeout(c.Request.Context(), requestTimeout)
	defer cancel()
	// NULLIF turns an empty status into NULL so COALESCE keeps the old value.
	query := `
UPDATE bacula_clients
SET last_seen = NOW(),
status = COALESCE(NULLIF($2, ''), status),
updated_at = NOW()
WHERE id = $1
RETURNING id
`
	var id string
	err := h.db.QueryRowContext(ctx, query, clientID, req.Status).Scan(&id)
	if err != nil {
		if errors.Is(err, sql.ErrNoRows) {
			c.JSON(http.StatusNotFound, gin.H{"error": "client not found"})
			return
		}
		h.logger.Error("failed to update heartbeat", "error", err)
		c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to update client"})
		return
	}
	c.Status(http.StatusNoContent)
}
// ListClients responds with every row of bacula_clients, newest created_at
// first, each extended with its capability history via attachHistory.
//
// Responses: 200 with a JSON array (empty, not null, when there are no
// clients), 500 when querying, scanning, decoding or loading history fails.
func (h *Handler) ListClients(c *gin.Context) {
	ctx, cancel := context.WithTimeout(c.Request.Context(), requestTimeout)
	defer cancel()
	rows, err := h.db.QueryContext(ctx, `
SELECT id, hostname, ip_address, agent_version, status, backup_types, metadata,
pending_backup_types, pending_requested_by, pending_requested_at, pending_notes,
registered_by_user_id, last_seen, created_at, updated_at
FROM bacula_clients
ORDER BY created_at DESC
`)
	if err != nil {
		h.logger.Error("failed to query clients", "error", err)
		c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to fetch clients"})
		return
	}
	defer rows.Close()
	// Non-nil so an empty result encodes as [] rather than null.
	clients := make([]*ClientResponse, 0)
	var ids []string
	for rows.Next() {
		row := clientRow{}
		if err := rows.Scan(&row.ID, &row.Hostname, &row.IPAddress, &row.AgentVersion, &row.Status,
			&row.BackupJSON, &row.MetadataJSON, &row.PendingBackupJSON, &row.PendingRequestedBy,
			&row.PendingRequestedAt, &row.PendingNotes, &row.RegisteredBy, &row.LastSeen,
			&row.CreatedAt, &row.UpdatedAt); err != nil {
			h.logger.Error("failed to scan client row", "error", err)
			c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to fetch clients"})
			return
		}
		resp, err := buildClientResponse(&row)
		if err != nil {
			h.logger.Error("failed to build client response", "error", err)
			c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to fetch clients"})
			return
		}
		clients = append(clients, resp)
		ids = append(ids, resp.ID)
	}
	// Next also returns false when iteration fails part-way (for example on
	// context timeout); without this check a truncated list would be served
	// as if it were complete.
	if err := rows.Err(); err != nil {
		h.logger.Error("failed to iterate client rows", "error", err)
		c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to fetch clients"})
		return
	}
	if len(ids) > 0 {
		if err := h.attachHistory(ctx, ids, clients); err != nil {
			h.logger.Error("failed to attach history", "error", err)
			c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to fetch client history"})
			return
		}
	}
	c.JSON(http.StatusOK, clients)
}
// GetClient responds with the client whose id is :id, extended with its
// capability history via attachHistory.
//
// Responses: 200 with a ClientResponse, 400 when the id is missing, 404 when
// no client has that id, 500 when querying, decoding or loading history fails.
func (h *Handler) GetClient(c *gin.Context) {
	clientID := c.Param("id")
	if clientID == "" {
		c.JSON(http.StatusBadRequest, gin.H{"error": "client id is required"})
		return
	}
	ctx, cancel := context.WithTimeout(c.Request.Context(), requestTimeout)
	defer cancel()
	var row clientRow
	err := h.db.QueryRowContext(ctx, `
SELECT id, hostname, ip_address, agent_version, status, backup_types, metadata,
pending_backup_types, pending_requested_by, pending_requested_at, pending_notes,
registered_by_user_id, last_seen, created_at, updated_at
FROM bacula_clients
WHERE id = $1
`, clientID).Scan(&row.ID, &row.Hostname, &row.IPAddress, &row.AgentVersion, &row.Status,
		&row.BackupJSON, &row.MetadataJSON, &row.PendingBackupJSON, &row.PendingRequestedBy,
		&row.PendingRequestedAt, &row.PendingNotes, &row.RegisteredBy, &row.LastSeen,
		&row.CreatedAt, &row.UpdatedAt)
	if err != nil {
		// errors.Is still matches if the sentinel arrives wrapped.
		if errors.Is(err, sql.ErrNoRows) {
			c.JSON(http.StatusNotFound, gin.H{"error": "client not found"})
			return
		}
		h.logger.Error("failed to read client", "error", err)
		c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to fetch client"})
		return
	}
	resp, err := buildClientResponse(&row)
	if err != nil {
		h.logger.Error("failed to build client response", "error", err)
		c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to fetch client"})
		return
	}
	if err := h.attachHistory(ctx, []string{resp.ID}, []*ClientResponse{resp}); err != nil {
		h.logger.Error("failed to attach history", "error", err)
		c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to fetch client history"})
		return
	}
	c.JSON(http.StatusOK, resp)
}
// attachHistory loads the capability history of the clients named in ids and
// appends it, newest first, to the matching entries of clients. Each client
// receives at most maxHistoryEntries entries; history rows whose client_id has
// no entry in clients are skipped.
//
// It returns nil without querying when ids is empty, and otherwise the first
// query, scan, JSON-decoding or row-iteration error. On error the clients may
// already hold some of the entries.
func (h *Handler) attachHistory(ctx context.Context, ids []string, clients []*ClientResponse) error {
	if len(ids) == 0 {
		return nil
	}
	rows, err := h.db.QueryContext(ctx, `
SELECT client_id, backup_types, source, requested_by_user_id, requested_at, notes
FROM bacula_client_capability_history
WHERE client_id = ANY($1)
ORDER BY requested_at DESC
`, pq.Array(ids))
	if err != nil {
		return err
	}
	defer rows.Close()
	clientMap := make(map[string]*ClientResponse, len(clients))
	for _, client := range clients {
		clientMap[client.ID] = client
	}
	for rows.Next() {
		var clientID string
		var backupJSON []byte
		var source string
		var requestedBy sql.NullString
		var requestedAt time.Time
		var notes sql.NullString
		if err := rows.Scan(&clientID, &backupJSON, &source, &requestedBy, &requestedAt, &notes); err != nil {
			return err
		}
		resp, ok := clientMap[clientID]
		if !ok {
			continue
		}
		// Rows arrive newest first, so once a client is full every further
		// row for it is older and can be dropped.
		if len(resp.CapabilityHistory) >= maxHistoryEntries {
			continue
		}
		var backupTypes []string
		if err := json.Unmarshal(backupJSON, &backupTypes); err != nil {
			return err
		}
		entry := CapabilityHistoryEntry{
			BackupTypes: backupTypes,
			Source:      source,
			RequestedAt: requestedAt,
		}
		if requestedBy.Valid {
			entry.RequestedBy = requestedBy.String
		}
		if notes.Valid {
			entry.Notes = notes.String
		}
		resp.CapabilityHistory = append(resp.CapabilityHistory, entry)
	}
	// Next also returns false when iteration fails part-way; report that
	// instead of presenting a truncated history as complete.
	return rows.Err()
}
// clientRow is the scan target for one bacula_clients row. JSON columns are
// kept as raw bytes and nullable columns as sql.Null* values;
// buildClientResponse converts it into a ClientResponse.
type clientRow struct {
	// ID receives the id column.
	ID string
	// Hostname receives the hostname column.
	Hostname string
	// IPAddress receives the ip_address column.
	IPAddress string
	// AgentVersion receives the agent_version column.
	AgentVersion string
	// Status receives the status column.
	Status string
	// BackupJSON receives the raw backup_types column.
	BackupJSON []byte
	// MetadataJSON receives the raw metadata column.
	MetadataJSON []byte
	// PendingBackupJSON receives the raw pending_backup_types column.
	PendingBackupJSON []byte
	// PendingRequestedBy receives the nullable pending_requested_by column.
	PendingRequestedBy sql.NullString
	// PendingRequestedAt receives the nullable pending_requested_at column.
	PendingRequestedAt sql.NullTime
	// PendingNotes receives the nullable pending_notes column.
	PendingNotes sql.NullString
	// RegisteredBy receives the registered_by_user_id column.
	RegisteredBy string
	// LastSeen receives the nullable last_seen column.
	LastSeen sql.NullTime
	// CreatedAt receives the created_at column.
	CreatedAt time.Time
	// UpdatedAt receives the updated_at column.
	UpdatedAt time.Time
}
// buildClientResponse converts a scanned clientRow into a ClientResponse.
//
// The three JSON columns are decoded first; the first decoding error is
// returned with a nil response. The pending fields are filled only when the
// decoded pending list is non-empty, and LastSeen only when the column is
// non-NULL. The time pointers in the result point into row itself.
// CapabilityHistory is left empty.
func buildClientResponse(row *clientRow) (*ClientResponse, error) {
	var (
		current []string
		pending []string
		meta    map[string]interface{}
		err     error
	)
	if current, err = decodeStringSlice(row.BackupJSON); err != nil {
		return nil, err
	}
	if pending, err = decodeStringSlice(row.PendingBackupJSON); err != nil {
		return nil, err
	}
	if meta, err = decodeMetadata(row.MetadataJSON); err != nil {
		return nil, err
	}

	out := &ClientResponse{
		ID:           row.ID,
		Hostname:     row.Hostname,
		IPAddress:    row.IPAddress,
		AgentVersion: row.AgentVersion,
		Status:       row.Status,
		BackupTypes:  current,
		Metadata:     meta,
		RegisteredBy: row.RegisteredBy,
		CreatedAt:    row.CreatedAt,
		UpdatedAt:    row.UpdatedAt,
	}
	if row.LastSeen.Valid {
		out.LastSeen = &row.LastSeen.Time
	}

	// Without pending types the remaining pending columns are not reported.
	if len(pending) == 0 {
		return out, nil
	}
	out.PendingBackupTypes = pending
	if by := row.PendingRequestedBy; by.Valid {
		out.PendingRequestedBy = by.String
	}
	if row.PendingRequestedAt.Valid {
		out.PendingRequestedAt = &row.PendingRequestedAt.Time
	}
	if note := row.PendingNotes; note.Valid {
		out.PendingNotes = note.String
	}
	return out, nil
}
// currentUser returns the *iam.User stored under the "user" key of the gin
// context. When the key is absent or holds a value of another type it returns
// nil and sql.ErrNoRows.
//
// NOTE(review): sql.ErrNoRows serves as the "no user" sentinel here; the
// callers in this file only test the error for non-nil.
func currentUser(c *gin.Context) (*iam.User, error) {
	if value, found := c.Get("user"); found {
		if authUser, ok := value.(*iam.User); ok {
			return authUser, nil
		}
	}
	return nil, sql.ErrNoRows
}
// decodeStringSlice decodes a JSON array of strings.
//
// On success the result is never nil: empty input and a JSON null both yield
// an empty slice, so the value always encodes as a JSON array rather than
// null. Input that is not a JSON array of strings yields a nil slice and the
// json.Unmarshal error.
func decodeStringSlice(data []byte) ([]string, error) {
	if len(data) == 0 {
		return []string{}, nil
	}
	var dst []string
	if err := json.Unmarshal(data, &dst); err != nil {
		return nil, err
	}
	if dst == nil {
		// JSON null unmarshals to a nil slice; normalise it so callers see
		// the same empty value as for empty input.
		return []string{}, nil
	}
	return dst, nil
}
// decodeMetadata decodes a JSON object into a generic map. Empty input yields
// a nil map and no error; input that does not decode into a map yields a nil
// map and the json.Unmarshal error.
func decodeMetadata(data []byte) (map[string]interface{}, error) {
	var decoded map[string]interface{}
	if len(data) > 0 {
		if err := json.Unmarshal(data, &decoded); err != nil {
			return nil, err
		}
	}
	return decoded, nil
}
// insertCapabilityHistory inserts one row into
// bacula_client_capability_history through tx. backupTypes is stored as its
// JSON encoding; source, requestedBy and notes are stored as given. It returns
// the marshalling error or the statement's execution error, if any.
func insertCapabilityHistory(ctx context.Context, tx *sql.Tx, clientID string, backupTypes []string, source, requestedBy, notes string) error {
	const stmt = `
INSERT INTO bacula_client_capability_history (
client_id, backup_types, source, requested_by_user_id, notes
) VALUES ($1, $2, $3, $4, $5)
`
	encoded, marshalErr := json.Marshal(backupTypes)
	if marshalErr != nil {
		return marshalErr
	}
	if _, execErr := tx.ExecContext(ctx, stmt, clientID, encoded, source, requestedBy, notes); execErr != nil {
		return execErr
	}
	return nil
}
// stringSlicesEqual reports whether a and b have the same length and hold
// equal strings at every index. Order matters, and a nil slice equals an empty
// one.
func stringSlicesEqual(a, b []string) bool {
	if len(a) != len(b) {
		return false
	}
	for idx, val := range a {
		if b[idx] != val {
			return false
		}
	}
	return true
}