Start working on the frontend side

This commit is contained in:
Warp Agent
2025-12-24 19:53:45 +00:00
parent 3aa0169af0
commit c962a223c6
84 changed files with 14761 additions and 58 deletions

View File

@@ -0,0 +1,383 @@
package monitoring
import (
	"context"
	"database/sql"
	"encoding/json"
	"errors"
	"fmt"
	"time"

	"github.com/atlasos/calypso/internal/common/database"
	"github.com/atlasos/calypso/internal/common/logger"
	"github.com/google/uuid"
)
// AlertSeverity represents the severity level of an alert.
// Values are stored as plain strings in the alerts table.
type AlertSeverity string

const (
	AlertSeverityInfo     AlertSeverity = "info"     // informational, no action required
	AlertSeverityWarning  AlertSeverity = "warning"  // degraded condition worth attention
	AlertSeverityCritical AlertSeverity = "critical" // requires operator intervention
)

// AlertSource represents where the alert originated.
// Each value corresponds to a subsystem of the service.
type AlertSource string

const (
	AlertSourceSystem  AlertSource = "system"
	AlertSourceStorage AlertSource = "storage"
	AlertSourceSCST    AlertSource = "scst"
	AlertSourceTape    AlertSource = "tape"
	AlertSourceVTL     AlertSource = "vtl"
	AlertSourceTask    AlertSource = "task"
	AlertSourceAPI     AlertSource = "api"
)
// Alert represents a system alert as stored in the alerts table and as
// serialized to API/WebSocket consumers.
type Alert struct {
	ID       string        `json:"id"` // UUID, generated at creation time
	Severity AlertSeverity `json:"severity"`
	Source   AlertSource   `json:"source"`
	Title    string        `json:"title"`
	Message  string        `json:"message"`
	// ResourceType/ResourceID optionally link the alert to the resource
	// that triggered it (nullable columns in the database).
	ResourceType   string     `json:"resource_type,omitempty"`
	ResourceID     string     `json:"resource_id,omitempty"`
	IsAcknowledged bool       `json:"is_acknowledged"`
	AcknowledgedBy string     `json:"acknowledged_by,omitempty"` // user ID, set by AcknowledgeAlert
	AcknowledgedAt *time.Time `json:"acknowledged_at,omitempty"`
	ResolvedAt     *time.Time `json:"resolved_at,omitempty"` // non-nil once resolved
	CreatedAt      time.Time  `json:"created_at"`
	// Metadata carries arbitrary extra context; persisted as a JSON text column.
	Metadata map[string]interface{} `json:"metadata,omitempty"`
}
// AlertService manages alerts: creation, listing, acknowledgement and
// resolution. It is safe to construct without an event hub; broadcasting
// is skipped until SetEventHub is called.
type AlertService struct {
	db       *database.DB
	logger   *logger.Logger
	eventHub *EventHub // optional; nil until SetEventHub is called
}
// NewAlertService constructs an AlertService backed by the given database
// handle and logger. The event hub is left unset; attach one later with
// SetEventHub to enable WebSocket broadcasting.
func NewAlertService(db *database.DB, log *logger.Logger) *AlertService {
	svc := &AlertService{}
	svc.db = db
	svc.logger = log
	return svc
}
// SetEventHub sets the event hub for broadcasting alerts.
// NOTE(review): not synchronized — call before the service is used
// concurrently; confirm callers only set it during startup.
func (s *AlertService) SetEventHub(eventHub *EventHub) {
	s.eventHub = eventHub
}
// CreateAlert persists a new alert and, when an event hub is attached,
// pushes it to connected WebSocket clients. The alert's ID and CreatedAt
// fields are populated here before insertion.
func (s *AlertService) CreateAlert(ctx context.Context, alert *Alert) error {
	alert.ID = uuid.New().String()
	alert.CreatedAt = time.Now()

	// Metadata is stored in a JSON text column; a nil map becomes SQL NULL.
	var metadataJSON *string
	if alert.Metadata != nil {
		encoded, err := json.Marshal(alert.Metadata)
		if err != nil {
			return fmt.Errorf("failed to marshal metadata: %w", err)
		}
		text := string(encoded)
		metadataJSON = &text
	}

	const insertSQL = `
		INSERT INTO alerts (id, severity, source, title, message, resource_type, resource_id, metadata)
		VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
	`
	if _, err := s.db.ExecContext(ctx, insertSQL,
		alert.ID,
		string(alert.Severity),
		string(alert.Source),
		alert.Title,
		alert.Message,
		alert.ResourceType,
		alert.ResourceID,
		metadataJSON,
	); err != nil {
		return fmt.Errorf("failed to create alert: %w", err)
	}

	s.logger.Info("Alert created",
		"alert_id", alert.ID,
		"severity", alert.Severity,
		"source", alert.Source,
		"title", alert.Title,
	)

	// Notify live subscribers when a hub has been attached.
	if hub := s.eventHub; hub != nil {
		hub.BroadcastAlert(alert)
	}
	return nil
}
// ListAlerts retrieves alerts with optional filters.
//
// All supplied filters are ANDed together; a nil filters value returns every
// alert. Results are ordered newest-first; filters.Limit (when > 0) caps the
// number of rows. Placeholders are numbered dynamically as filters are added,
// so the argument slice always matches the generated SQL.
func (s *AlertService) ListAlerts(ctx context.Context, filters *AlertFilters) ([]*Alert, error) {
	// Base query; "WHERE 1=1" lets every filter clause append with "AND".
	query := `
		SELECT id, severity, source, title, message, resource_type, resource_id,
		       is_acknowledged, acknowledged_by, acknowledged_at, resolved_at,
		       created_at, metadata
		FROM alerts
		WHERE 1=1
	`
	args := []interface{}{}
	argIndex := 1
	if filters != nil {
		if filters.Severity != "" {
			query += fmt.Sprintf(" AND severity = $%d", argIndex)
			args = append(args, string(filters.Severity))
			argIndex++
		}
		if filters.Source != "" {
			query += fmt.Sprintf(" AND source = $%d", argIndex)
			args = append(args, string(filters.Source))
			argIndex++
		}
		if filters.IsAcknowledged != nil {
			query += fmt.Sprintf(" AND is_acknowledged = $%d", argIndex)
			args = append(args, *filters.IsAcknowledged)
			argIndex++
		}
		if filters.ResourceType != "" {
			query += fmt.Sprintf(" AND resource_type = $%d", argIndex)
			args = append(args, filters.ResourceType)
			argIndex++
		}
		if filters.ResourceID != "" {
			query += fmt.Sprintf(" AND resource_id = $%d", argIndex)
			args = append(args, filters.ResourceID)
			argIndex++
		}
	}
	query += " ORDER BY created_at DESC"
	if filters != nil && filters.Limit > 0 {
		query += fmt.Sprintf(" LIMIT $%d", argIndex)
		args = append(args, filters.Limit)
	}
	rows, err := s.db.QueryContext(ctx, query, args...)
	if err != nil {
		return nil, fmt.Errorf("failed to query alerts: %w", err)
	}
	defer rows.Close()
	var alerts []*Alert
	for rows.Next() {
		alert, err := s.scanAlert(rows)
		if err != nil {
			return nil, fmt.Errorf("failed to scan alert: %w", err)
		}
		alerts = append(alerts, alert)
	}
	// rows.Err surfaces errors that terminated the iteration early.
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("error iterating alerts: %w", err)
	}
	return alerts, nil
}
// GetAlert retrieves a single alert by ID.
//
// Returns an "alert not found" error when no row matches, and a wrapped
// driver error for any other failure.
func (s *AlertService) GetAlert(ctx context.Context, alertID string) (*Alert, error) {
	query := `
		SELECT id, severity, source, title, message, resource_type, resource_id,
		       is_acknowledged, acknowledged_by, acknowledged_at, resolved_at,
		       created_at, metadata
		FROM alerts
		WHERE id = $1
	`
	row := s.db.QueryRowContext(ctx, query, alertID)
	alert, err := s.scanAlertRow(row)
	if err != nil {
		// Use errors.Is rather than ==: sql.ErrNoRows may arrive wrapped
		// (e.g. by a driver or by future changes to the scan helper).
		if errors.Is(err, sql.ErrNoRows) {
			return nil, fmt.Errorf("alert not found")
		}
		return nil, fmt.Errorf("failed to get alert: %w", err)
	}
	return alert, nil
}
// AcknowledgeAlert marks an alert as acknowledged by the given user.
// The WHERE clause guards against double-acknowledgement, so an error is
// returned both when the ID is unknown and when it was already acknowledged.
func (s *AlertService) AcknowledgeAlert(ctx context.Context, alertID string, userID string) error {
	const updateSQL = `
		UPDATE alerts
		SET is_acknowledged = true, acknowledged_by = $1, acknowledged_at = NOW()
		WHERE id = $2 AND is_acknowledged = false
	`
	result, err := s.db.ExecContext(ctx, updateSQL, userID, alertID)
	if err != nil {
		return fmt.Errorf("failed to acknowledge alert: %w", err)
	}
	affected, err := result.RowsAffected()
	if err != nil {
		return fmt.Errorf("failed to get rows affected: %w", err)
	}
	// Zero affected rows means the guard in the WHERE clause rejected the update.
	if affected == 0 {
		return fmt.Errorf("alert not found or already acknowledged")
	}
	s.logger.Info("Alert acknowledged", "alert_id", alertID, "user_id", userID)
	return nil
}
// ResolveAlert marks an alert as resolved by stamping resolved_at.
// Resolving is idempotent-hostile by design: re-resolving returns an error.
func (s *AlertService) ResolveAlert(ctx context.Context, alertID string) error {
	const updateSQL = `
		UPDATE alerts
		SET resolved_at = NOW()
		WHERE id = $1 AND resolved_at IS NULL
	`
	result, err := s.db.ExecContext(ctx, updateSQL, alertID)
	if err != nil {
		return fmt.Errorf("failed to resolve alert: %w", err)
	}
	affected, err := result.RowsAffected()
	if err != nil {
		return fmt.Errorf("failed to get rows affected: %w", err)
	}
	if affected == 0 {
		return fmt.Errorf("alert not found or already resolved")
	}
	s.logger.Info("Alert resolved", "alert_id", alertID)
	return nil
}
// DeleteAlert deletes an alert (soft delete by resolving it).
// Alerts are never hard-deleted so the audit history stays intact; callers
// get the same "not found or already resolved" semantics as ResolveAlert.
func (s *AlertService) DeleteAlert(ctx context.Context, alertID string) error {
	// For safety, we'll just resolve it instead of hard delete
	return s.ResolveAlert(ctx, alertID)
}
// AlertFilters represents filters for listing alerts.
// Zero values (empty string, nil, 0) mean "do not filter on this field".
type AlertFilters struct {
	Severity AlertSeverity
	Source   AlertSource
	// IsAcknowledged is a tri-state: nil = any, otherwise match the bool.
	IsAcknowledged *bool
	ResourceType   string
	ResourceID     string
	Limit          int // maximum rows to return; <= 0 means unlimited
}
// scanAlert scans the current row of a result set into an Alert.
//
// Nullable columns are read through sql.Null* wrappers and copied into the
// Alert only when valid. Column order must match the SELECT lists used by
// ListAlerts/GetAlert.
func (s *AlertService) scanAlert(rows *sql.Rows) (*Alert, error) {
	var alert Alert
	var severity, source string
	var resourceType, resourceID, acknowledgedBy sql.NullString
	var acknowledgedAt, resolvedAt sql.NullTime
	var metadata sql.NullString
	err := rows.Scan(
		&alert.ID,
		&severity,
		&source,
		&alert.Title,
		&alert.Message,
		&resourceType,
		&resourceID,
		&alert.IsAcknowledged,
		&acknowledgedBy,
		&acknowledgedAt,
		&resolvedAt,
		&alert.CreatedAt,
		&metadata,
	)
	if err != nil {
		return nil, err
	}
	alert.Severity = AlertSeverity(severity)
	alert.Source = AlertSource(source)
	if resourceType.Valid {
		alert.ResourceType = resourceType.String
	}
	if resourceID.Valid {
		alert.ResourceID = resourceID.String
	}
	if acknowledgedBy.Valid {
		alert.AcknowledgedBy = acknowledgedBy.String
	}
	if acknowledgedAt.Valid {
		alert.AcknowledgedAt = &acknowledgedAt.Time
	}
	if resolvedAt.Valid {
		alert.ResolvedAt = &resolvedAt.Time
	}
	if metadata.Valid && metadata.String != "" {
		// Corrupt metadata should not make the whole alert unreadable:
		// log the decode failure and return the alert without metadata
		// (previously the error was silently discarded).
		if err := json.Unmarshal([]byte(metadata.String), &alert.Metadata); err != nil {
			s.logger.Warn("Failed to decode alert metadata", "alert_id", alert.ID, "error", err)
		}
	}
	return &alert, nil
}
// scanAlertRow scans a single *sql.Row into an Alert.
//
// Mirrors scanAlert (which takes *sql.Rows); the two exist because *sql.Row
// and *sql.Rows do not share a Scan interface in this codebase's style.
// Column order must match the SELECT list used by GetAlert.
func (s *AlertService) scanAlertRow(row *sql.Row) (*Alert, error) {
	var alert Alert
	var severity, source string
	var resourceType, resourceID, acknowledgedBy sql.NullString
	var acknowledgedAt, resolvedAt sql.NullTime
	var metadata sql.NullString
	err := row.Scan(
		&alert.ID,
		&severity,
		&source,
		&alert.Title,
		&alert.Message,
		&resourceType,
		&resourceID,
		&alert.IsAcknowledged,
		&acknowledgedBy,
		&acknowledgedAt,
		&resolvedAt,
		&alert.CreatedAt,
		&metadata,
	)
	if err != nil {
		return nil, err
	}
	alert.Severity = AlertSeverity(severity)
	alert.Source = AlertSource(source)
	if resourceType.Valid {
		alert.ResourceType = resourceType.String
	}
	if resourceID.Valid {
		alert.ResourceID = resourceID.String
	}
	if acknowledgedBy.Valid {
		alert.AcknowledgedBy = acknowledgedBy.String
	}
	if acknowledgedAt.Valid {
		alert.AcknowledgedAt = &acknowledgedAt.Time
	}
	if resolvedAt.Valid {
		alert.ResolvedAt = &resolvedAt.Time
	}
	if metadata.Valid && metadata.String != "" {
		// Log (rather than silently drop) metadata decode failures; the
		// alert itself is still returned without metadata.
		if err := json.Unmarshal([]byte(metadata.String), &alert.Metadata); err != nil {
			s.logger.Warn("Failed to decode alert metadata", "alert_id", alert.ID, "error", err)
		}
	}
	return &alert, nil
}

View File

@@ -0,0 +1,159 @@
package monitoring
import (
"encoding/json"
"sync"
"time"
"github.com/atlasos/calypso/internal/common/logger"
"github.com/gorilla/websocket"
)
// EventType represents the type of event delivered over the WebSocket stream.
type EventType string

const (
	EventTypeAlert   EventType = "alert"
	EventTypeTask    EventType = "task"
	EventTypeSystem  EventType = "system"
	EventTypeStorage EventType = "storage"
	EventTypeSCST    EventType = "scst"
	EventTypeTape    EventType = "tape"
	EventTypeVTL     EventType = "vtl"
	EventTypeMetrics EventType = "metrics"
)

// Event represents a system event as serialized to WebSocket clients.
type Event struct {
	Type      EventType `json:"type"`
	Timestamp time.Time `json:"timestamp"`
	// Data is an untyped payload whose shape depends on Type (see the
	// Broadcast* helpers for the shapes actually emitted).
	Data map[string]interface{} `json:"data"`
}
// EventHub manages WebSocket connections and broadcasts events to them.
// The register/unregister/broadcast channels are serviced by Run; the mutex
// guards the clients map for readers outside the Run loop (GetClientCount).
type EventHub struct {
	clients    map[*websocket.Conn]bool
	broadcast  chan *Event          // buffered; Broadcast drops events when full
	register   chan *websocket.Conn // new connections from the HTTP handler
	unregister chan *websocket.Conn // connections to tear down
	mu         sync.RWMutex
	logger     *logger.Logger
}
// NewEventHub creates a new event hub. The broadcast channel is buffered
// (256 events) so producers are never blocked by slow consumers; overflow
// is dropped by Broadcast. Call Run in a goroutine to start dispatching.
func NewEventHub(log *logger.Logger) *EventHub {
	hub := &EventHub{
		logger:     log,
		clients:    map[*websocket.Conn]bool{},
		broadcast:  make(chan *Event, 256),
		register:   make(chan *websocket.Conn),
		unregister: make(chan *websocket.Conn),
	}
	return hub
}
// Run starts the event hub's dispatch loop: it services registrations,
// unregistrations and broadcasts. Run blocks forever and is intended to be
// launched in its own goroutine.
func (h *EventHub) Run() {
	for {
		select {
		case conn := <-h.register:
			h.mu.Lock()
			h.clients[conn] = true
			total := len(h.clients) // read under the lock to avoid a race
			h.mu.Unlock()
			h.logger.Info("WebSocket client connected", "total_clients", total)
		case conn := <-h.unregister:
			h.mu.Lock()
			if _, ok := h.clients[conn]; ok {
				delete(h.clients, conn)
				conn.Close()
			}
			total := len(h.clients)
			h.mu.Unlock()
			h.logger.Info("WebSocket client disconnected", "total_clients", total)
		case event := <-h.broadcast:
			// Write to every client under the read lock, collecting failed
			// connections; the map is mutated only after iteration finishes.
			// (The previous implementation released and re-acquired the lock
			// while still ranging over the map — a data race — and its
			// select{<-time.After(...): ... default: ...} timeout branch was
			// dead code, since the default case always wins immediately while
			// allocating a throwaway timer per client per event. Slow writers
			// are instead bounded by the per-write deadline below.)
			var dead []*websocket.Conn
			h.mu.RLock()
			for conn := range h.clients {
				conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
				if err := conn.WriteJSON(event); err != nil {
					h.logger.Error("Failed to send event to client", "error", err)
					dead = append(dead, conn)
				}
			}
			h.mu.RUnlock()
			if len(dead) > 0 {
				h.mu.Lock()
				for _, conn := range dead {
					delete(h.clients, conn)
					conn.Close()
				}
				h.mu.Unlock()
			}
		}
	}
}
// Broadcast queues an event for delivery to all connected clients.
// The send is non-blocking: when the buffered channel is full the event is
// dropped and a warning is logged, so producers are never stalled.
func (h *EventHub) Broadcast(eventType EventType, data map[string]interface{}) {
	ev := &Event{Data: data}
	ev.Type = eventType
	ev.Timestamp = time.Now()

	select {
	case h.broadcast <- ev:
		// queued for the Run loop
	default:
		h.logger.Warn("Event broadcast channel full, dropping event", "type", eventType)
	}
}
// BroadcastAlert broadcasts an alert event, flattening the Alert into the
// generic event payload map used by WebSocket consumers.
func (h *EventHub) BroadcastAlert(alert *Alert) {
	payload := map[string]interface{}{}
	payload["id"] = alert.ID
	payload["severity"] = alert.Severity
	payload["source"] = alert.Source
	payload["title"] = alert.Title
	payload["message"] = alert.Message
	payload["resource_type"] = alert.ResourceType
	payload["resource_id"] = alert.ResourceID
	payload["is_acknowledged"] = alert.IsAcknowledged
	payload["created_at"] = alert.CreatedAt
	h.Broadcast(EventTypeAlert, payload)
}
// BroadcastTaskUpdate broadcasts a task progress/status change event.
func (h *EventHub) BroadcastTaskUpdate(taskID string, status string, progress int, message string) {
	payload := map[string]interface{}{}
	payload["task_id"] = taskID
	payload["status"] = status
	payload["progress"] = progress
	payload["message"] = message
	h.Broadcast(EventTypeTask, payload)
}
// BroadcastMetrics broadcasts a metrics snapshot. The struct is round-tripped
// through JSON to flatten it into the generic map payload used by Event.
// Encoding errors are logged and the event is skipped (previously both the
// Marshal and Unmarshal errors were silently ignored, which could broadcast
// an empty payload).
func (h *EventHub) BroadcastMetrics(metrics *Metrics) {
	encoded, err := json.Marshal(metrics)
	if err != nil {
		h.logger.Error("Failed to encode metrics for broadcast", "error", err)
		return
	}
	data := make(map[string]interface{})
	if err := json.Unmarshal(encoded, &data); err != nil {
		h.logger.Error("Failed to decode metrics for broadcast", "error", err)
		return
	}
	h.Broadcast(EventTypeMetrics, data)
}
// GetClientCount reports how many WebSocket clients are currently connected.
func (h *EventHub) GetClientCount() int {
	h.mu.RLock()
	n := len(h.clients)
	h.mu.RUnlock()
	return n
}

View File

@@ -0,0 +1,184 @@
package monitoring
import (
"net/http"
"strconv"
"time"
"github.com/atlasos/calypso/internal/common/database"
"github.com/atlasos/calypso/internal/common/logger"
"github.com/atlasos/calypso/internal/iam"
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
)
// Handler handles monitoring API requests (alerts, metrics, and the
// WebSocket event stream). All dependencies are injected via NewHandler.
type Handler struct {
	alertService   *AlertService
	metricsService *MetricsService
	eventHub       *EventHub
	db             *database.DB
	logger         *logger.Logger
}
// NewHandler creates a new monitoring handler wired to the given services.
func NewHandler(db *database.DB, log *logger.Logger, alertService *AlertService, metricsService *MetricsService, eventHub *EventHub) *Handler {
	h := new(Handler)
	h.db = db
	h.logger = log
	h.alertService = alertService
	h.metricsService = metricsService
	h.eventHub = eventHub
	return h
}
// ListAlerts lists alerts with optional filters taken from query parameters:
// severity, source, acknowledged (bool), resource_type, resource_id, limit.
// Invalid values for acknowledged/limit are silently ignored rather than
// rejected, so malformed filters fall back to "no filter".
func (h *Handler) ListAlerts(c *gin.Context) {
	filters := &AlertFilters{}
	// Parse query parameters
	if severity := c.Query("severity"); severity != "" {
		filters.Severity = AlertSeverity(severity)
	}
	if source := c.Query("source"); source != "" {
		filters.Source = AlertSource(source)
	}
	if acknowledged := c.Query("acknowledged"); acknowledged != "" {
		ack, err := strconv.ParseBool(acknowledged)
		if err == nil {
			filters.IsAcknowledged = &ack
		}
	}
	if resourceType := c.Query("resource_type"); resourceType != "" {
		filters.ResourceType = resourceType
	}
	if resourceID := c.Query("resource_id"); resourceID != "" {
		filters.ResourceID = resourceID
	}
	if limitStr := c.Query("limit"); limitStr != "" {
		if limit, err := strconv.Atoi(limitStr); err == nil && limit > 0 {
			filters.Limit = limit
		}
	}
	alerts, err := h.alertService.ListAlerts(c.Request.Context(), filters)
	if err != nil {
		h.logger.Error("Failed to list alerts", "error", err)
		c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to list alerts"})
		return
	}
	c.JSON(http.StatusOK, gin.H{"alerts": alerts})
}
// GetAlert retrieves a single alert by its path-parameter ID.
// Every service-layer failure is mapped to 404 here; the detailed error is
// only logged server-side.
func (h *Handler) GetAlert(c *gin.Context) {
	id := c.Param("id")
	alert, err := h.alertService.GetAlert(c.Request.Context(), id)
	if err == nil {
		c.JSON(http.StatusOK, alert)
		return
	}
	h.logger.Error("Failed to get alert", "alert_id", id, "error", err)
	c.JSON(http.StatusNotFound, gin.H{"error": "alert not found"})
}
// AcknowledgeAlert acknowledges an alert on behalf of the authenticated user.
// The auth middleware is expected to have stored an *iam.User under "user"
// in the gin context.
func (h *Handler) AcknowledgeAlert(c *gin.Context) {
	id := c.Param("id")

	rawUser, exists := c.Get("user")
	if !exists {
		c.JSON(http.StatusUnauthorized, gin.H{"error": "unauthorized"})
		return
	}
	authUser, ok := rawUser.(*iam.User)
	if !ok {
		c.JSON(http.StatusInternalServerError, gin.H{"error": "invalid user context"})
		return
	}

	err := h.alertService.AcknowledgeAlert(c.Request.Context(), id, authUser.ID)
	if err != nil {
		h.logger.Error("Failed to acknowledge alert", "alert_id", id, "error", err)
		c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
		return
	}
	c.JSON(http.StatusOK, gin.H{"message": "alert acknowledged"})
}
// ResolveAlert resolves the alert identified by the path parameter.
func (h *Handler) ResolveAlert(c *gin.Context) {
	id := c.Param("id")
	err := h.alertService.ResolveAlert(c.Request.Context(), id)
	if err == nil {
		c.JSON(http.StatusOK, gin.H{"message": "alert resolved"})
		return
	}
	h.logger.Error("Failed to resolve alert", "alert_id", id, "error", err)
	c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
}
// GetMetrics collects and returns a fresh snapshot of system metrics.
func (h *Handler) GetMetrics(c *gin.Context) {
	snapshot, err := h.metricsService.CollectMetrics(c.Request.Context())
	if err == nil {
		c.JSON(http.StatusOK, snapshot)
		return
	}
	h.logger.Error("Failed to collect metrics", "error", err)
	c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to collect metrics"})
}
// WebSocketHandler upgrades the request to a WebSocket connection, registers
// it with the event hub, and keeps it alive with a ping/pong loop.
//
// A dedicated read goroutine is required: gorilla/websocket only invokes the
// pong handler from within a Read call, so without a read loop the read
// deadline would never be extended and every connection would die after 60s.
// (The original version set a pong handler but never read, which had exactly
// that effect.)
func (h *Handler) WebSocketHandler(c *gin.Context) {
	upgrader := websocket.Upgrader{
		CheckOrigin: func(r *http.Request) bool {
			// Allow all origins for now (should be restricted in production)
			return true
		},
	}
	conn, err := upgrader.Upgrade(c.Writer, c.Request, nil)
	if err != nil {
		h.logger.Error("Failed to upgrade WebSocket connection", "error", err)
		return
	}
	// Register client with the hub; the hub's Run loop owns delivery.
	h.eventHub.register <- conn

	go func() {
		defer func() {
			h.eventHub.unregister <- conn
		}()
		conn.SetReadDeadline(time.Now().Add(60 * time.Second))
		conn.SetPongHandler(func(string) error {
			// Each pong pushes the read deadline forward.
			conn.SetReadDeadline(time.Now().Add(60 * time.Second))
			return nil
		})

		// Read pump: discards client data but processes control frames
		// (pings/pongs/close). Exits on any read error, including an
		// expired read deadline.
		done := make(chan struct{})
		go func() {
			defer close(done)
			for {
				if _, _, err := conn.ReadMessage(); err != nil {
					return
				}
			}
		}()

		// Ping every 30 seconds so the client has something to pong to.
		ticker := time.NewTicker(30 * time.Second)
		defer ticker.Stop()
		for {
			select {
			case <-done:
				return
			case <-ticker.C:
				if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil {
					return
				}
			}
		}
	}()
}

View File

@@ -0,0 +1,201 @@
package monitoring
import (
"context"
"time"
"github.com/atlasos/calypso/internal/common/database"
"github.com/atlasos/calypso/internal/common/logger"
)
// HealthStatus represents the health status of a component.
type HealthStatus string

const (
	HealthStatusHealthy   HealthStatus = "healthy"
	HealthStatusDegraded  HealthStatus = "degraded"  // functional but impaired
	HealthStatusUnhealthy HealthStatus = "unhealthy" // not functional
	HealthStatusUnknown   HealthStatus = "unknown"   // check itself failed
)

// ComponentHealth represents the health of a single system component
// at the moment the check ran.
type ComponentHealth struct {
	Name      string       `json:"name"`
	Status    HealthStatus `json:"status"`
	Message   string       `json:"message,omitempty"` // human-readable failure detail
	Timestamp time.Time    `json:"timestamp"`
}

// EnhancedHealth represents the enhanced health check response: an overall
// status derived from the per-component results, plus process uptime.
type EnhancedHealth struct {
	Status     string            `json:"status"`
	Service    string            `json:"service"`
	Version    string            `json:"version,omitempty"`
	Uptime     int64             `json:"uptime_seconds"`
	Components []ComponentHealth `json:"components"`
	Timestamp  time.Time         `json:"timestamp"`
}
// HealthService provides enhanced health checking across the database,
// storage, and SCST subsystems. startTime anchors the uptime calculation.
type HealthService struct {
	db        *database.DB
	logger    *logger.Logger
	startTime time.Time // set at construction; used for uptime reporting
	// metricsService is injected but not used by the visible checks;
	// NOTE(review): confirm whether it is still needed.
	metricsService *MetricsService
}
// NewHealthService creates a new health service; uptime is measured from
// the moment of construction.
func NewHealthService(db *database.DB, log *logger.Logger, metricsService *MetricsService) *HealthService {
	svc := new(HealthService)
	svc.db = db
	svc.logger = log
	svc.metricsService = metricsService
	svc.startTime = time.Now()
	return svc
}
// CheckHealth performs a comprehensive health check and derives the overall
// status from the worst component status: any unhealthy component makes the
// whole service unhealthy; otherwise any degraded component degrades it.
func (s *HealthService) CheckHealth(ctx context.Context) *EnhancedHealth {
	components := []ComponentHealth{
		s.checkDatabase(ctx),
		s.checkStorage(ctx),
		s.checkSCST(ctx),
	}

	overall := HealthStatusHealthy
	for _, comp := range components {
		switch comp.Status {
		case HealthStatusUnhealthy:
			overall = HealthStatusUnhealthy
		case HealthStatusDegraded:
			if overall != HealthStatusUnhealthy {
				overall = HealthStatusDegraded
			}
		}
	}

	return &EnhancedHealth{
		Status:     string(overall),
		Service:    "calypso-api",
		Uptime:     int64(time.Since(s.startTime).Seconds()),
		Timestamp:  time.Now(),
		Components: components,
	}
}
// checkDatabase verifies both connectivity (ping) and queryability of the
// database. A failed ping is unhealthy; a successful ping with a failed
// query is only degraded.
func (s *HealthService) checkDatabase(ctx context.Context) ComponentHealth {
	// Bound the whole check so a hung database cannot stall the health endpoint.
	ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
	defer cancel()

	health := ComponentHealth{
		Name:      "database",
		Status:    HealthStatusHealthy,
		Timestamp: time.Now(),
	}

	if err := s.db.PingContext(ctx); err != nil {
		health.Status = HealthStatusUnhealthy
		health.Message = "Database connection failed: " + err.Error()
		return health
	}

	var one int
	if err := s.db.QueryRowContext(ctx, "SELECT 1").Scan(&one); err != nil {
		health.Status = HealthStatusDegraded
		health.Message = "Database query failed: " + err.Error()
	}
	return health
}
// checkStorage checks storage component health in three stages:
// (1) the repository table must be queryable, (2) at least one active
// repository must exist, (3) aggregate usage should be below 95%.
// Stages 1-2 report degraded; a failure in the capacity query itself is
// ignored (best effort) rather than reported.
func (s *HealthService) checkStorage(ctx context.Context) ComponentHealth {
	// Check if we have any active repositories
	var count int
	if err := s.db.QueryRowContext(ctx, "SELECT COUNT(*) FROM disk_repositories WHERE is_active = true").Scan(&count); err != nil {
		return ComponentHealth{
			Name:      "storage",
			Status:    HealthStatusDegraded,
			Message:   "Failed to query storage repositories",
			Timestamp: time.Now(),
		}
	}
	if count == 0 {
		return ComponentHealth{
			Name:      "storage",
			Status:    HealthStatusDegraded,
			Message:   "No active storage repositories configured",
			Timestamp: time.Now(),
		}
	}
	// Check repository capacity. NULLIF guards the division against a zero
	// total; COALESCE maps the all-NULL case to 0%.
	var usagePercent float64
	query := `
		SELECT COALESCE(
			SUM(used_bytes)::float / NULLIF(SUM(total_bytes), 0) * 100,
			0
		)
		FROM disk_repositories
		WHERE is_active = true
	`
	if err := s.db.QueryRowContext(ctx, query).Scan(&usagePercent); err == nil {
		if usagePercent > 95 {
			return ComponentHealth{
				Name:      "storage",
				Status:    HealthStatusDegraded,
				Message:   "Storage repositories are nearly full",
				Timestamp: time.Now(),
			}
		}
	}
	return ComponentHealth{
		Name:      "storage",
		Status:    HealthStatusHealthy,
		Timestamp: time.Now(),
	}
}
// checkSCST checks SCST component health. Queryability of the targets table
// alone counts as healthy — zero configured targets is a valid state — and a
// failed query yields "unknown" rather than "unhealthy".
func (s *HealthService) checkSCST(ctx context.Context) ComponentHealth {
	var targetCount int
	err := s.db.QueryRowContext(ctx, "SELECT COUNT(*) FROM scst_targets").Scan(&targetCount)
	if err != nil {
		return ComponentHealth{
			Name:      "scst",
			Status:    HealthStatusUnknown,
			Message:   "Failed to query SCST targets",
			Timestamp: time.Now(),
		}
	}
	return ComponentHealth{
		Name:      "scst",
		Status:    HealthStatusHealthy,
		Timestamp: time.Now(),
	}
}

View File

@@ -0,0 +1,405 @@
package monitoring
import (
"context"
"database/sql"
"fmt"
"runtime"
"time"
"github.com/atlasos/calypso/internal/common/database"
"github.com/atlasos/calypso/internal/common/logger"
)
// Metrics represents a full snapshot of system metrics, grouped by subsystem.
type Metrics struct {
	System      SystemMetrics  `json:"system"`
	Storage     StorageMetrics `json:"storage"`
	SCST        SCSTMetrics    `json:"scst"`
	Tape        TapeMetrics    `json:"tape"`
	VTL         VTLMetrics     `json:"vtl"`
	Tasks       TaskMetrics    `json:"tasks"`
	API         APIMetrics     `json:"api"`
	CollectedAt time.Time      `json:"collected_at"`
}

// SystemMetrics represents system-level metrics. Memory figures come from
// the Go runtime (not the OS); CPU and disk fields are currently placeholders
// (see collectSystemMetrics).
type SystemMetrics struct {
	CPUUsagePercent float64 `json:"cpu_usage_percent"`
	MemoryUsed      int64   `json:"memory_used_bytes"`
	MemoryTotal     int64   `json:"memory_total_bytes"`
	MemoryPercent   float64 `json:"memory_usage_percent"`
	DiskUsed        int64   `json:"disk_used_bytes"`
	DiskTotal       int64   `json:"disk_total_bytes"`
	DiskPercent     float64 `json:"disk_usage_percent"`
	UptimeSeconds   int64   `json:"uptime_seconds"`
}

// StorageMetrics represents aggregate storage metrics across active
// disks and repositories.
type StorageMetrics struct {
	TotalDisks         int     `json:"total_disks"`
	TotalRepositories  int     `json:"total_repositories"`
	TotalCapacityBytes int64   `json:"total_capacity_bytes"`
	UsedCapacityBytes  int64   `json:"used_capacity_bytes"`
	AvailableBytes     int64   `json:"available_bytes"`
	UsagePercent       float64 `json:"usage_percent"`
}

// SCSTMetrics represents SCST metrics. An "active" target is one with at
// least one LUN assigned.
type SCSTMetrics struct {
	TotalTargets    int `json:"total_targets"`
	TotalLUNs       int `json:"total_luns"`
	TotalInitiators int `json:"total_initiators"`
	ActiveTargets   int `json:"active_targets"`
}

// TapeMetrics represents physical tape metrics. An occupied slot is one
// with a tape barcode recorded.
type TapeMetrics struct {
	TotalLibraries int `json:"total_libraries"`
	TotalDrives    int `json:"total_drives"`
	TotalSlots     int `json:"total_slots"`
	OccupiedSlots  int `json:"occupied_slots"`
}

// VTLMetrics represents virtual tape library metrics. An "active" drive
// has a tape loaded.
type VTLMetrics struct {
	TotalLibraries int `json:"total_libraries"`
	TotalDrives    int `json:"total_drives"`
	TotalTapes     int `json:"total_tapes"`
	ActiveDrives   int `json:"active_drives"`
	LoadedTapes    int `json:"loaded_tapes"`
}

// TaskMetrics represents task execution metrics, counted by status.
type TaskMetrics struct {
	TotalTasks     int     `json:"total_tasks"`
	PendingTasks   int     `json:"pending_tasks"`
	RunningTasks   int     `json:"running_tasks"`
	CompletedTasks int     `json:"completed_tasks"`
	FailedTasks    int     `json:"failed_tasks"`
	AvgDurationSec float64 `json:"avg_duration_seconds"`
}

// APIMetrics represents API metrics. These are populated elsewhere (via
// middleware); CollectMetrics emits a zero-valued placeholder.
type APIMetrics struct {
	TotalRequests     int64   `json:"total_requests"`
	RequestsPerSec    float64 `json:"requests_per_second"`
	ErrorRate         float64 `json:"error_rate"`
	AvgLatencyMs      float64 `json:"avg_latency_ms"`
	ActiveConnections int     `json:"active_connections"`
}

// MetricsService collects and provides system metrics from the database
// and the Go runtime. startTime anchors uptime reporting.
type MetricsService struct {
	db        *database.DB
	logger    *logger.Logger
	startTime time.Time
}
// NewMetricsService creates a new metrics service; uptime is measured from
// the moment of construction.
func NewMetricsService(db *database.DB, log *logger.Logger) *MetricsService {
	svc := new(MetricsService)
	svc.db = db
	svc.logger = log
	svc.startTime = time.Now()
	return svc
}
// CollectMetrics collects all system metrics.
//
// Collection is best-effort per subsystem: each collector's failure is
// logged and its section left zero-valued rather than failing the whole
// snapshot, so the returned error is currently always nil.
func (s *MetricsService) CollectMetrics(ctx context.Context) (*Metrics, error) {
	metrics := &Metrics{
		CollectedAt: time.Now(),
	}
	// Collect system metrics
	sysMetrics, err := s.collectSystemMetrics(ctx)
	if err != nil {
		s.logger.Error("Failed to collect system metrics", "error", err)
	} else {
		metrics.System = *sysMetrics
	}
	// Collect storage metrics
	storageMetrics, err := s.collectStorageMetrics(ctx)
	if err != nil {
		s.logger.Error("Failed to collect storage metrics", "error", err)
	} else {
		metrics.Storage = *storageMetrics
	}
	// Collect SCST metrics
	scstMetrics, err := s.collectSCSTMetrics(ctx)
	if err != nil {
		s.logger.Error("Failed to collect SCST metrics", "error", err)
	} else {
		metrics.SCST = *scstMetrics
	}
	// Collect tape metrics
	tapeMetrics, err := s.collectTapeMetrics(ctx)
	if err != nil {
		s.logger.Error("Failed to collect tape metrics", "error", err)
	} else {
		metrics.Tape = *tapeMetrics
	}
	// Collect VTL metrics
	vtlMetrics, err := s.collectVTLMetrics(ctx)
	if err != nil {
		s.logger.Error("Failed to collect VTL metrics", "error", err)
	} else {
		metrics.VTL = *vtlMetrics
	}
	// Collect task metrics
	taskMetrics, err := s.collectTaskMetrics(ctx)
	if err != nil {
		s.logger.Error("Failed to collect task metrics", "error", err)
	} else {
		metrics.Tasks = *taskMetrics
	}
	// API metrics are collected separately via middleware
	metrics.API = APIMetrics{} // Placeholder
	return metrics, nil
}
// collectSystemMetrics collects system-level metrics.
//
// Memory figures come from the Go runtime's own view (Alloc = live heap
// bytes, Sys = bytes obtained from the OS), not from the host. CPU and
// disk fields require /proc or syscalls and are left as zero placeholders.
func (s *MetricsService) collectSystemMetrics(ctx context.Context) (*SystemMetrics, error) {
	var m runtime.MemStats
	runtime.ReadMemStats(&m)

	memoryUsed := int64(m.Alloc)
	memoryTotal := int64(m.Sys)
	// Guard the division: a zero total would otherwise yield NaN, which
	// encoding/json cannot serialize (json.Marshal fails on NaN).
	memoryPercent := 0.0
	if memoryTotal > 0 {
		memoryPercent = float64(memoryUsed) / float64(memoryTotal) * 100
	}

	// Uptime
	uptime := time.Since(s.startTime).Seconds()

	metrics := &SystemMetrics{
		CPUUsagePercent: 0.0, // Would need to read from /proc/stat
		MemoryUsed:      memoryUsed,
		MemoryTotal:     memoryTotal,
		MemoryPercent:   memoryPercent,
		DiskUsed:        0, // Would need to read from df
		DiskTotal:       0,
		DiskPercent:     0,
		UptimeSeconds:   int64(uptime),
	}
	return metrics, nil
}
// collectStorageMetrics collects storage metrics: active disk count plus
// aggregate repository capacity/usage. Any query failure aborts the whole
// storage section.
func (s *MetricsService) collectStorageMetrics(ctx context.Context) (*StorageMetrics, error) {
	// Count disks
	diskQuery := `SELECT COUNT(*) FROM physical_disks WHERE is_active = true`
	var totalDisks int
	if err := s.db.QueryRowContext(ctx, diskQuery).Scan(&totalDisks); err != nil {
		return nil, fmt.Errorf("failed to count disks: %w", err)
	}
	// Count repositories and calculate capacity. COALESCE maps the
	// empty-table case (SUM = NULL) to 0.
	repoQuery := `
		SELECT COUNT(*), COALESCE(SUM(total_bytes), 0), COALESCE(SUM(used_bytes), 0)
		FROM disk_repositories
		WHERE is_active = true
	`
	var totalRepos int
	var totalCapacity, usedCapacity int64
	if err := s.db.QueryRowContext(ctx, repoQuery).Scan(&totalRepos, &totalCapacity, &usedCapacity); err != nil {
		return nil, fmt.Errorf("failed to query repositories: %w", err)
	}
	availableBytes := totalCapacity - usedCapacity
	// Guard against division by zero when no capacity is configured.
	usagePercent := 0.0
	if totalCapacity > 0 {
		usagePercent = float64(usedCapacity) / float64(totalCapacity) * 100
	}
	return &StorageMetrics{
		TotalDisks:         totalDisks,
		TotalRepositories:  totalRepos,
		TotalCapacityBytes: totalCapacity,
		UsedCapacityBytes:  usedCapacity,
		AvailableBytes:     availableBytes,
		UsagePercent:       usagePercent,
	}, nil
}
// collectSCSTMetrics collects SCST metrics: target/LUN/initiator counts.
// The "active targets" count (targets with at least one LUN) is best-effort
// and falls back to 0 on query failure.
func (s *MetricsService) collectSCSTMetrics(ctx context.Context) (*SCSTMetrics, error) {
	// Count targets
	targetQuery := `SELECT COUNT(*) FROM scst_targets`
	var totalTargets int
	if err := s.db.QueryRowContext(ctx, targetQuery).Scan(&totalTargets); err != nil {
		return nil, fmt.Errorf("failed to count targets: %w", err)
	}
	// Count LUNs
	lunQuery := `SELECT COUNT(*) FROM scst_luns`
	var totalLUNs int
	if err := s.db.QueryRowContext(ctx, lunQuery).Scan(&totalLUNs); err != nil {
		return nil, fmt.Errorf("failed to count LUNs: %w", err)
	}
	// Count initiators
	initQuery := `SELECT COUNT(*) FROM scst_initiators`
	var totalInitiators int
	if err := s.db.QueryRowContext(ctx, initQuery).Scan(&totalInitiators); err != nil {
		return nil, fmt.Errorf("failed to count initiators: %w", err)
	}
	// Active targets (targets with at least one LUN)
	activeQuery := `
		SELECT COUNT(DISTINCT target_id)
		FROM scst_luns
	`
	var activeTargets int
	if err := s.db.QueryRowContext(ctx, activeQuery).Scan(&activeTargets); err != nil {
		activeTargets = 0 // Not critical
	}
	return &SCSTMetrics{
		TotalTargets:    totalTargets,
		TotalLUNs:       totalLUNs,
		TotalInitiators: totalInitiators,
		ActiveTargets:   activeTargets,
	}, nil
}
// collectTapeMetrics collects physical tape metrics: library, drive and
// slot counts. A slot counts as occupied when it has a tape barcode.
func (s *MetricsService) collectTapeMetrics(ctx context.Context) (*TapeMetrics, error) {
	// Count libraries
	libQuery := `SELECT COUNT(*) FROM physical_tape_libraries`
	var totalLibraries int
	if err := s.db.QueryRowContext(ctx, libQuery).Scan(&totalLibraries); err != nil {
		return nil, fmt.Errorf("failed to count libraries: %w", err)
	}
	// Count drives
	driveQuery := `SELECT COUNT(*) FROM physical_tape_drives`
	var totalDrives int
	if err := s.db.QueryRowContext(ctx, driveQuery).Scan(&totalDrives); err != nil {
		return nil, fmt.Errorf("failed to count drives: %w", err)
	}
	// Count total and occupied slots in one pass; the conditional COUNT
	// only tallies slots whose tape_barcode is non-NULL.
	slotQuery := `
		SELECT COUNT(*), COUNT(CASE WHEN tape_barcode IS NOT NULL THEN 1 END)
		FROM physical_tape_slots
	`
	var totalSlots, occupiedSlots int
	if err := s.db.QueryRowContext(ctx, slotQuery).Scan(&totalSlots, &occupiedSlots); err != nil {
		return nil, fmt.Errorf("failed to count slots: %w", err)
	}
	return &TapeMetrics{
		TotalLibraries: totalLibraries,
		TotalDrives:    totalDrives,
		TotalSlots:     totalSlots,
		OccupiedSlots:  occupiedSlots,
	}, nil
}
// collectVTLMetrics collects virtual tape library metrics. The core counts
// (libraries/drives/tapes) are required; the active-drive and loaded-tape
// counts are best-effort and fall back to 0 on query failure.
func (s *MetricsService) collectVTLMetrics(ctx context.Context) (*VTLMetrics, error) {
	// Count libraries
	libQuery := `SELECT COUNT(*) FROM virtual_tape_libraries`
	var totalLibraries int
	if err := s.db.QueryRowContext(ctx, libQuery).Scan(&totalLibraries); err != nil {
		return nil, fmt.Errorf("failed to count VTL libraries: %w", err)
	}
	// Count drives
	driveQuery := `SELECT COUNT(*) FROM virtual_tape_drives`
	var totalDrives int
	if err := s.db.QueryRowContext(ctx, driveQuery).Scan(&totalDrives); err != nil {
		return nil, fmt.Errorf("failed to count VTL drives: %w", err)
	}
	// Count tapes
	tapeQuery := `SELECT COUNT(*) FROM virtual_tapes`
	var totalTapes int
	if err := s.db.QueryRowContext(ctx, tapeQuery).Scan(&totalTapes); err != nil {
		return nil, fmt.Errorf("failed to count VTL tapes: %w", err)
	}
	// Count active drives (drives with loaded tape)
	activeQuery := `
		SELECT COUNT(*)
		FROM virtual_tape_drives
		WHERE loaded_tape_id IS NOT NULL
	`
	var activeDrives int
	if err := s.db.QueryRowContext(ctx, activeQuery).Scan(&activeDrives); err != nil {
		activeDrives = 0
	}
	// Count loaded tapes
	loadedQuery := `
		SELECT COUNT(*)
		FROM virtual_tapes
		WHERE is_loaded = true
	`
	var loadedTapes int
	if err := s.db.QueryRowContext(ctx, loadedQuery).Scan(&loadedTapes); err != nil {
		loadedTapes = 0
	}
	return &VTLMetrics{
		TotalLibraries: totalLibraries,
		TotalDrives:    totalDrives,
		TotalTapes:     totalTapes,
		ActiveDrives:   activeDrives,
		LoadedTapes:    loadedTapes,
	}, nil
}
// collectTaskMetrics collects task execution metrics.
// Status buckets are computed in a single aggregate query; the average
// duration is best-effort and left at zero when unavailable.
func (s *MetricsService) collectTaskMetrics(ctx context.Context) (*TaskMetrics, error) {
	// One pass over tasks, bucketing counts by status with FILTER clauses.
	query := `
		SELECT
			COUNT(*) as total,
			COUNT(*) FILTER (WHERE status = 'pending') as pending,
			COUNT(*) FILTER (WHERE status = 'running') as running,
			COUNT(*) FILTER (WHERE status = 'completed') as completed,
			COUNT(*) FILTER (WHERE status = 'failed') as failed
		FROM tasks
	`
	m := &TaskMetrics{}
	if err := s.db.QueryRowContext(ctx, query).Scan(
		&m.TotalTasks, &m.PendingTasks, &m.RunningTasks, &m.CompletedTasks, &m.FailedTasks,
	); err != nil {
		return nil, fmt.Errorf("failed to count tasks: %w", err)
	}
	// Mean wall-clock seconds for completed tasks; AVG yields NULL when no
	// task has both timestamps, hence the NullFloat64.
	avgDurationQuery := `
		SELECT AVG(EXTRACT(EPOCH FROM (completed_at - started_at)))
		FROM tasks
		WHERE status = 'completed' AND started_at IS NOT NULL AND completed_at IS NOT NULL
	`
	var avg sql.NullFloat64
	if err := s.db.QueryRowContext(ctx, avgDurationQuery).Scan(&avg); err != nil {
		avg = sql.NullFloat64{} // best-effort: treat failure as "no data"
	}
	if avg.Valid {
		m.AvgDurationSec = avg.Float64
	}
	return m, nil
}

View File

@@ -0,0 +1,233 @@
package monitoring
import (
"context"
"fmt"
"time"
"github.com/atlasos/calypso/internal/common/database"
"github.com/atlasos/calypso/internal/common/logger"
)
// AlertRule represents a rule that can trigger alerts.
// Rules are registered with an AlertRuleEngine, which periodically evaluates
// each enabled rule's Condition and persists any alert it produces.
type AlertRule struct {
	// ID uniquely identifies the rule.
	ID string
	// Name is a human-readable rule name (used in logs).
	Name string
	// Source is stamped onto alerts produced by this rule.
	Source AlertSource
	// Condition is the predicate evaluated on each engine tick.
	Condition AlertCondition
	// Severity is stamped onto alerts produced by this rule.
	Severity AlertSeverity
	// Enabled gates evaluation; disabled rules are skipped.
	Enabled bool
	// Description is a free-form explanation of what the rule checks.
	Description string
}
// NewAlertRule creates a new alert rule (helper function).
// It simply bundles the given settings into an AlertRule; no validation is
// performed and the rule is not registered anywhere.
func NewAlertRule(id, name string, source AlertSource, condition AlertCondition, severity AlertSeverity, enabled bool, description string) *AlertRule {
	rule := AlertRule{
		ID:          id,
		Name:        name,
		Source:      source,
		Condition:   condition,
		Severity:    severity,
		Enabled:     enabled,
		Description: description,
	}
	return &rule
}
// AlertCondition represents a condition that triggers an alert.
//
// Evaluate returns (triggered, alert, err). When triggered is true the
// accompanying alert is handed to the AlertService by the engine (which
// overwrites its Severity and Source from the owning rule). A non-nil err
// aborts this rule's evaluation for the current tick only.
type AlertCondition interface {
	Evaluate(ctx context.Context, db *database.DB, logger *logger.Logger) (bool, *Alert, error)
}
// AlertRuleEngine manages alert rules and evaluation.
// It periodically evaluates registered rules and forwards triggered alerts
// to the AlertService.
type AlertRuleEngine struct {
	db *database.DB // passed to each condition's Evaluate
	logger *logger.Logger
	service *AlertService // sink for alerts produced by triggered rules
	rules []*AlertRule // registered rules; appended without locking — register before Start
	interval time.Duration // how often rules are evaluated
	stopCh chan struct{} // closed by Stop to terminate Start's loop
}
// NewAlertRuleEngine creates a new alert rule engine.
// The returned engine starts with no rules and evaluates on a fixed
// 30-second cadence once Start is called.
func NewAlertRuleEngine(db *database.DB, log *logger.Logger, service *AlertService) *AlertRuleEngine {
	engine := &AlertRuleEngine{
		db:       db,
		logger:   log,
		service:  service,
		rules:    []*AlertRule{},
		stopCh:   make(chan struct{}),
		interval: 30 * time.Second, // evaluation cadence
	}
	return engine
}
// RegisterRule registers an alert rule with the engine.
// rules is appended without synchronization, so register all rules before
// calling Start.
func (e *AlertRuleEngine) RegisterRule(rule *AlertRule) {
	e.logger.Info("Alert rule registered", "rule_id", rule.ID, "name", rule.Name)
	e.rules = append(e.rules, rule)
}
// Start starts the alert rule engine background monitoring.
// It blocks, evaluating all rules every e.interval, until either the given
// context is cancelled or Stop is called.
func (e *AlertRuleEngine) Start(ctx context.Context) {
	e.logger.Info("Starting alert rule engine", "interval", e.interval)
	ticker := time.NewTicker(e.interval)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			e.evaluateRules(ctx)
		case <-ctx.Done():
			e.logger.Info("Alert rule engine stopped")
			return
		case <-e.stopCh:
			e.logger.Info("Alert rule engine stopped")
			return
		}
	}
}
// Stop stops the alert rule engine.
//
// Must be called at most once: it closes stopCh, and closing an
// already-closed channel panics. NOTE(review): consider guarding with
// sync.Once if multiple callers may stop the engine.
func (e *AlertRuleEngine) Stop() {
	close(e.stopCh)
}
// evaluateRules evaluates all registered rules.
// Each rule is independent: an evaluation error or a failed alert insert is
// logged and does not prevent the remaining rules from running.
func (e *AlertRuleEngine) evaluateRules(ctx context.Context) {
	for _, r := range e.rules {
		if !r.Enabled {
			continue
		}
		fired, alert, err := r.Condition.Evaluate(ctx, e.db, e.logger)
		if err != nil {
			e.logger.Error("Error evaluating alert rule",
				"rule_id", r.ID,
				"rule_name", r.Name,
				"error", err,
			)
			continue
		}
		if !fired || alert == nil {
			continue
		}
		// The rule, not the condition, decides severity and source.
		alert.Severity = r.Severity
		alert.Source = r.Source
		if createErr := e.service.CreateAlert(ctx, alert); createErr != nil {
			e.logger.Error("Failed to create alert from rule",
				"rule_id", r.ID,
				"error", createErr,
			)
		}
	}
}
// Built-in alert conditions

// StorageCapacityCondition checks if storage capacity is below threshold.
type StorageCapacityCondition struct {
	// ThresholdPercent is the usage percentage (0-100) at or above which an
	// active repository triggers an alert.
	ThresholdPercent float64
}
// Evaluate scans all active disk repositories and triggers on the first one
// whose usage percentage meets or exceeds ThresholdPercent.
// Rows that fail to scan or report zero total capacity are skipped.
func (c *StorageCapacityCondition) Evaluate(ctx context.Context, db *database.DB, logger *logger.Logger) (bool, *Alert, error) {
	query := `
		SELECT id, name, used_bytes, total_bytes
		FROM disk_repositories
		WHERE is_active = true
	`
	rows, err := db.QueryContext(ctx, query)
	if err != nil {
		return false, nil, fmt.Errorf("failed to query repositories: %w", err)
	}
	defer rows.Close()
	for rows.Next() {
		var id, name string
		var usedBytes, totalBytes int64
		if err := rows.Scan(&id, &name, &usedBytes, &totalBytes); err != nil {
			// Best-effort: skip rows that fail to scan rather than aborting the rule.
			continue
		}
		if totalBytes == 0 {
			// Avoid division by zero for repositories with unknown capacity.
			continue
		}
		usagePercent := float64(usedBytes) / float64(totalBytes) * 100
		if usagePercent >= c.ThresholdPercent {
			alert := &Alert{
				Title:        fmt.Sprintf("Storage repository %s is %d%% full", name, int(usagePercent)),
				Message:      fmt.Sprintf("Repository %s has used %d%% of its capacity (%d/%d bytes)", name, int(usagePercent), usedBytes, totalBytes),
				ResourceType: "repository",
				ResourceID:   id,
				Metadata: map[string]interface{}{
					"usage_percent": usagePercent,
					"used_bytes":    usedBytes,
					"total_bytes":   totalBytes,
				},
			}
			return true, alert, nil
		}
	}
	// Fix: surface iteration errors instead of silently treating a failed
	// result set as "no alert".
	if err := rows.Err(); err != nil {
		return false, nil, fmt.Errorf("error iterating repositories: %w", err)
	}
	return false, nil, nil
}
// TaskFailureCondition checks for failed tasks.
type TaskFailureCondition struct {
	// LookbackMinutes bounds how far back in time failed tasks are considered.
	LookbackMinutes int
}
// Evaluate triggers on the most recent task that failed within the lookback
// window, producing one alert for it. Returns (false, nil, nil) when no task
// failed in the window.
func (c *TaskFailureCondition) Evaluate(ctx context.Context, db *database.DB, logger *logger.Logger) (bool, *Alert, error) {
	// Fix: bind LookbackMinutes as a query parameter instead of splicing it
	// into the SQL text with fmt.Sprintf. Multiplying an integer by
	// INTERVAL '1 minute' yields the same interval PostgreSQL built from
	// the old "'%d minutes'" literal.
	query := `
		SELECT id, type, error_message, created_at
		FROM tasks
		WHERE status = 'failed'
		AND created_at > NOW() - ($1 * INTERVAL '1 minute')
		ORDER BY created_at DESC
		LIMIT 1
	`
	rows, err := db.QueryContext(ctx, query, c.LookbackMinutes)
	if err != nil {
		return false, nil, fmt.Errorf("failed to query failed tasks: %w", err)
	}
	defer rows.Close()
	if rows.Next() {
		var id, taskType, errorMsg string
		var createdAt time.Time
		if err := rows.Scan(&id, &taskType, &errorMsg, &createdAt); err != nil {
			return false, nil, err
		}
		alert := &Alert{
			Title:        fmt.Sprintf("Task %s failed", taskType),
			Message:      errorMsg,
			ResourceType: "task",
			ResourceID:   id,
			Metadata: map[string]interface{}{
				"task_type":  taskType,
				"created_at": createdAt,
			},
		}
		return true, alert, nil
	}
	// Fix: distinguish "no failed tasks" from a result-set error.
	if err := rows.Err(); err != nil {
		return false, nil, fmt.Errorf("failed to query failed tasks: %w", err)
	}
	return false, nil, nil
}
// SystemServiceDownCondition checks if critical services are down.
type SystemServiceDownCondition struct {
	// CriticalServices lists service names to monitor; currently unused by
	// the placeholder Evaluate implementation.
	CriticalServices []string
}
// Evaluate is a placeholder that never triggers.
// The real implementation would check systemd service status for each entry
// in CriticalServices, which requires systemd integration not yet available.
func (c *SystemServiceDownCondition) Evaluate(ctx context.Context, db *database.DB, logger *logger.Logger) (bool, *Alert, error) {
	// This would check systemd service status
	// For now, we'll return false as this requires systemd integration
	// This is a placeholder for future implementation
	return false, nil, nil
}