160 lines
4.0 KiB
Go
160 lines
4.0 KiB
Go
package monitoring
|
|
|
|
import (
|
|
"encoding/json"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/atlasos/calypso/internal/common/logger"
|
|
"github.com/gorilla/websocket"
|
|
)
|
|
|
|
// EventType represents the type of event
|
|
type EventType string
|
|
|
|
const (
|
|
EventTypeAlert EventType = "alert"
|
|
EventTypeTask EventType = "task"
|
|
EventTypeSystem EventType = "system"
|
|
EventTypeStorage EventType = "storage"
|
|
EventTypeSCST EventType = "scst"
|
|
EventTypeTape EventType = "tape"
|
|
EventTypeVTL EventType = "vtl"
|
|
EventTypeMetrics EventType = "metrics"
|
|
)
|
|
|
|
// Event represents a system event
|
|
type Event struct {
|
|
Type EventType `json:"type"`
|
|
Timestamp time.Time `json:"timestamp"`
|
|
Data map[string]interface{} `json:"data"`
|
|
}
|
|
|
|
// EventHub manages WebSocket connections and broadcasts events
|
|
type EventHub struct {
|
|
clients map[*websocket.Conn]bool
|
|
broadcast chan *Event
|
|
register chan *websocket.Conn
|
|
unregister chan *websocket.Conn
|
|
mu sync.RWMutex
|
|
logger *logger.Logger
|
|
}
|
|
|
|
// NewEventHub creates a new event hub
|
|
func NewEventHub(log *logger.Logger) *EventHub {
|
|
return &EventHub{
|
|
clients: make(map[*websocket.Conn]bool),
|
|
broadcast: make(chan *Event, 256),
|
|
register: make(chan *websocket.Conn),
|
|
unregister: make(chan *websocket.Conn),
|
|
logger: log,
|
|
}
|
|
}
|
|
|
|
// Run starts the event hub
|
|
func (h *EventHub) Run() {
|
|
for {
|
|
select {
|
|
case conn := <-h.register:
|
|
h.mu.Lock()
|
|
h.clients[conn] = true
|
|
h.mu.Unlock()
|
|
h.logger.Info("WebSocket client connected", "total_clients", len(h.clients))
|
|
|
|
case conn := <-h.unregister:
|
|
h.mu.Lock()
|
|
if _, ok := h.clients[conn]; ok {
|
|
delete(h.clients, conn)
|
|
conn.Close()
|
|
}
|
|
h.mu.Unlock()
|
|
h.logger.Info("WebSocket client disconnected", "total_clients", len(h.clients))
|
|
|
|
case event := <-h.broadcast:
|
|
h.mu.RLock()
|
|
for conn := range h.clients {
|
|
select {
|
|
case <-time.After(5 * time.Second):
|
|
// Timeout - close connection
|
|
h.mu.RUnlock()
|
|
h.mu.Lock()
|
|
delete(h.clients, conn)
|
|
conn.Close()
|
|
h.mu.Unlock()
|
|
h.mu.RLock()
|
|
default:
|
|
conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
|
|
if err := conn.WriteJSON(event); err != nil {
|
|
h.logger.Error("Failed to send event to client", "error", err)
|
|
h.mu.RUnlock()
|
|
h.mu.Lock()
|
|
delete(h.clients, conn)
|
|
conn.Close()
|
|
h.mu.Unlock()
|
|
h.mu.RLock()
|
|
}
|
|
}
|
|
}
|
|
h.mu.RUnlock()
|
|
}
|
|
}
|
|
}
|
|
|
|
// Broadcast broadcasts an event to all connected clients
|
|
func (h *EventHub) Broadcast(eventType EventType, data map[string]interface{}) {
|
|
event := &Event{
|
|
Type: eventType,
|
|
Timestamp: time.Now(),
|
|
Data: data,
|
|
}
|
|
|
|
select {
|
|
case h.broadcast <- event:
|
|
default:
|
|
h.logger.Warn("Event broadcast channel full, dropping event", "type", eventType)
|
|
}
|
|
}
|
|
|
|
// BroadcastAlert broadcasts an alert event
|
|
func (h *EventHub) BroadcastAlert(alert *Alert) {
|
|
data := map[string]interface{}{
|
|
"id": alert.ID,
|
|
"severity": alert.Severity,
|
|
"source": alert.Source,
|
|
"title": alert.Title,
|
|
"message": alert.Message,
|
|
"resource_type": alert.ResourceType,
|
|
"resource_id": alert.ResourceID,
|
|
"is_acknowledged": alert.IsAcknowledged,
|
|
"created_at": alert.CreatedAt,
|
|
}
|
|
h.Broadcast(EventTypeAlert, data)
|
|
}
|
|
|
|
// BroadcastTaskUpdate broadcasts a task update event
|
|
func (h *EventHub) BroadcastTaskUpdate(taskID string, status string, progress int, message string) {
|
|
data := map[string]interface{}{
|
|
"task_id": taskID,
|
|
"status": status,
|
|
"progress": progress,
|
|
"message": message,
|
|
}
|
|
h.Broadcast(EventTypeTask, data)
|
|
}
|
|
|
|
// BroadcastMetrics broadcasts metrics update
|
|
func (h *EventHub) BroadcastMetrics(metrics *Metrics) {
|
|
data := make(map[string]interface{})
|
|
bytes, _ := json.Marshal(metrics)
|
|
json.Unmarshal(bytes, &data)
|
|
h.Broadcast(EventTypeMetrics, data)
|
|
}
|
|
|
|
// GetClientCount returns the number of connected clients
|
|
func (h *EventHub) GetClientCount() int {
|
|
h.mu.RLock()
|
|
defer h.mu.RUnlock()
|
|
return len(h.clients)
|
|
}
|
|
|