package monitoring import ( "encoding/json" "sync" "time" "github.com/atlasos/calypso/internal/common/logger" "github.com/gorilla/websocket" ) // EventType represents the type of event type EventType string const ( EventTypeAlert EventType = "alert" EventTypeTask EventType = "task" EventTypeSystem EventType = "system" EventTypeStorage EventType = "storage" EventTypeSCST EventType = "scst" EventTypeTape EventType = "tape" EventTypeVTL EventType = "vtl" EventTypeMetrics EventType = "metrics" ) // Event represents a system event type Event struct { Type EventType `json:"type"` Timestamp time.Time `json:"timestamp"` Data map[string]interface{} `json:"data"` } // EventHub manages WebSocket connections and broadcasts events type EventHub struct { clients map[*websocket.Conn]bool broadcast chan *Event register chan *websocket.Conn unregister chan *websocket.Conn mu sync.RWMutex logger *logger.Logger } // NewEventHub creates a new event hub func NewEventHub(log *logger.Logger) *EventHub { return &EventHub{ clients: make(map[*websocket.Conn]bool), broadcast: make(chan *Event, 256), register: make(chan *websocket.Conn), unregister: make(chan *websocket.Conn), logger: log, } } // Run starts the event hub func (h *EventHub) Run() { for { select { case conn := <-h.register: h.mu.Lock() h.clients[conn] = true h.mu.Unlock() h.logger.Info("WebSocket client connected", "total_clients", len(h.clients)) case conn := <-h.unregister: h.mu.Lock() if _, ok := h.clients[conn]; ok { delete(h.clients, conn) conn.Close() } h.mu.Unlock() h.logger.Info("WebSocket client disconnected", "total_clients", len(h.clients)) case event := <-h.broadcast: h.mu.RLock() for conn := range h.clients { select { case <-time.After(5 * time.Second): // Timeout - close connection h.mu.RUnlock() h.mu.Lock() delete(h.clients, conn) conn.Close() h.mu.Unlock() h.mu.RLock() default: conn.SetWriteDeadline(time.Now().Add(10 * time.Second)) if err := conn.WriteJSON(event); err != nil { h.logger.Error("Failed to send event to client", "error", err) h.mu.RUnlock() h.mu.Lock() delete(h.clients, conn) conn.Close() h.mu.Unlock() h.mu.RLock() } } } h.mu.RUnlock() } } } // Broadcast broadcasts an event to all connected clients func (h *EventHub) Broadcast(eventType EventType, data map[string]interface{}) { event := &Event{ Type: eventType, Timestamp: time.Now(), Data: data, } select { case h.broadcast <- event: default: h.logger.Warn("Event broadcast channel full, dropping event", "type", eventType) } } // BroadcastAlert broadcasts an alert event func (h *EventHub) BroadcastAlert(alert *Alert) { data := map[string]interface{}{ "id": alert.ID, "severity": alert.Severity, "source": alert.Source, "title": alert.Title, "message": alert.Message, "resource_type": alert.ResourceType, "resource_id": alert.ResourceID, "is_acknowledged": alert.IsAcknowledged, "created_at": alert.CreatedAt, } h.Broadcast(EventTypeAlert, data) } // BroadcastTaskUpdate broadcasts a task update event func (h *EventHub) BroadcastTaskUpdate(taskID string, status string, progress int, message string) { data := map[string]interface{}{ "task_id": taskID, "status": status, "progress": progress, "message": message, } h.Broadcast(EventTypeTask, data) } // BroadcastMetrics broadcasts metrics update func (h *EventHub) BroadcastMetrics(metrics *Metrics) { data := make(map[string]interface{}) bytes, _ := json.Marshal(metrics) json.Unmarshal(bytes, &data) h.Broadcast(EventTypeMetrics, data) } // GetClientCount returns the number of connected clients func (h *EventHub) GetClientCount() int { h.mu.RLock() defer h.mu.RUnlock() return len(h.clients) }