234 lines
5.9 KiB
Go
234 lines
5.9 KiB
Go
package monitoring
|
|
|
|
import (
	"context"
	"fmt"
	"sync"
	"time"

	"github.com/atlasos/calypso/internal/common/database"
	"github.com/atlasos/calypso/internal/common/logger"
)
|
|
|
|
// AlertRule represents a rule that can trigger alerts
|
|
type AlertRule struct {
|
|
ID string
|
|
Name string
|
|
Source AlertSource
|
|
Condition AlertCondition
|
|
Severity AlertSeverity
|
|
Enabled bool
|
|
Description string
|
|
}
|
|
|
|
// NewAlertRule creates a new alert rule (helper function)
|
|
func NewAlertRule(id, name string, source AlertSource, condition AlertCondition, severity AlertSeverity, enabled bool, description string) *AlertRule {
|
|
return &AlertRule{
|
|
ID: id,
|
|
Name: name,
|
|
Source: source,
|
|
Condition: condition,
|
|
Severity: severity,
|
|
Enabled: enabled,
|
|
Description: description,
|
|
}
|
|
}
|
|
|
|
// AlertCondition represents a condition that triggers an alert
|
|
type AlertCondition interface {
|
|
Evaluate(ctx context.Context, db *database.DB, logger *logger.Logger) (bool, *Alert, error)
|
|
}
|
|
|
|
// AlertRuleEngine manages alert rules and evaluation
|
|
type AlertRuleEngine struct {
|
|
db *database.DB
|
|
logger *logger.Logger
|
|
service *AlertService
|
|
rules []*AlertRule
|
|
interval time.Duration
|
|
stopCh chan struct{}
|
|
}
|
|
|
|
// NewAlertRuleEngine creates a new alert rule engine
|
|
func NewAlertRuleEngine(db *database.DB, log *logger.Logger, service *AlertService) *AlertRuleEngine {
|
|
return &AlertRuleEngine{
|
|
db: db,
|
|
logger: log,
|
|
service: service,
|
|
rules: []*AlertRule{},
|
|
interval: 30 * time.Second, // Check every 30 seconds
|
|
stopCh: make(chan struct{}),
|
|
}
|
|
}
|
|
|
|
// RegisterRule registers an alert rule
|
|
func (e *AlertRuleEngine) RegisterRule(rule *AlertRule) {
|
|
e.rules = append(e.rules, rule)
|
|
e.logger.Info("Alert rule registered", "rule_id", rule.ID, "name", rule.Name)
|
|
}
|
|
|
|
// Start starts the alert rule engine background monitoring
|
|
func (e *AlertRuleEngine) Start(ctx context.Context) {
|
|
e.logger.Info("Starting alert rule engine", "interval", e.interval)
|
|
ticker := time.NewTicker(e.interval)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
e.logger.Info("Alert rule engine stopped")
|
|
return
|
|
case <-e.stopCh:
|
|
e.logger.Info("Alert rule engine stopped")
|
|
return
|
|
case <-ticker.C:
|
|
e.evaluateRules(ctx)
|
|
}
|
|
}
|
|
}
|
|
|
|
// Stop stops the alert rule engine
|
|
func (e *AlertRuleEngine) Stop() {
|
|
close(e.stopCh)
|
|
}
|
|
|
|
// evaluateRules evaluates all registered rules
|
|
func (e *AlertRuleEngine) evaluateRules(ctx context.Context) {
|
|
for _, rule := range e.rules {
|
|
if !rule.Enabled {
|
|
continue
|
|
}
|
|
|
|
triggered, alert, err := rule.Condition.Evaluate(ctx, e.db, e.logger)
|
|
if err != nil {
|
|
e.logger.Error("Error evaluating alert rule",
|
|
"rule_id", rule.ID,
|
|
"rule_name", rule.Name,
|
|
"error", err,
|
|
)
|
|
continue
|
|
}
|
|
|
|
if triggered && alert != nil {
|
|
alert.Severity = rule.Severity
|
|
alert.Source = rule.Source
|
|
if err := e.service.CreateAlert(ctx, alert); err != nil {
|
|
e.logger.Error("Failed to create alert from rule",
|
|
"rule_id", rule.ID,
|
|
"error", err,
|
|
)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Built-in alert conditions
|
|
|
|
// StorageCapacityCondition checks if storage capacity is below threshold
|
|
type StorageCapacityCondition struct {
|
|
ThresholdPercent float64
|
|
}
|
|
|
|
func (c *StorageCapacityCondition) Evaluate(ctx context.Context, db *database.DB, logger *logger.Logger) (bool, *Alert, error) {
|
|
query := `
|
|
SELECT id, name, used_bytes, total_bytes
|
|
FROM disk_repositories
|
|
WHERE is_active = true
|
|
`
|
|
|
|
rows, err := db.QueryContext(ctx, query)
|
|
if err != nil {
|
|
return false, nil, fmt.Errorf("failed to query repositories: %w", err)
|
|
}
|
|
defer rows.Close()
|
|
|
|
for rows.Next() {
|
|
var id, name string
|
|
var usedBytes, totalBytes int64
|
|
|
|
if err := rows.Scan(&id, &name, &usedBytes, &totalBytes); err != nil {
|
|
continue
|
|
}
|
|
|
|
if totalBytes == 0 {
|
|
continue
|
|
}
|
|
|
|
usagePercent := float64(usedBytes) / float64(totalBytes) * 100
|
|
|
|
if usagePercent >= c.ThresholdPercent {
|
|
alert := &Alert{
|
|
Title: fmt.Sprintf("Storage repository %s is %d%% full", name, int(usagePercent)),
|
|
Message: fmt.Sprintf("Repository %s has used %d%% of its capacity (%d/%d bytes)", name, int(usagePercent), usedBytes, totalBytes),
|
|
ResourceType: "repository",
|
|
ResourceID: id,
|
|
Metadata: map[string]interface{}{
|
|
"usage_percent": usagePercent,
|
|
"used_bytes": usedBytes,
|
|
"total_bytes": totalBytes,
|
|
},
|
|
}
|
|
return true, alert, nil
|
|
}
|
|
}
|
|
|
|
return false, nil, nil
|
|
}
|
|
|
|
// TaskFailureCondition checks for failed tasks
|
|
type TaskFailureCondition struct {
|
|
LookbackMinutes int
|
|
}
|
|
|
|
func (c *TaskFailureCondition) Evaluate(ctx context.Context, db *database.DB, logger *logger.Logger) (bool, *Alert, error) {
|
|
query := `
|
|
SELECT id, type, error_message, created_at
|
|
FROM tasks
|
|
WHERE status = 'failed'
|
|
AND created_at > NOW() - INTERVAL '%d minutes'
|
|
ORDER BY created_at DESC
|
|
LIMIT 1
|
|
`
|
|
|
|
rows, err := db.QueryContext(ctx, fmt.Sprintf(query, c.LookbackMinutes))
|
|
if err != nil {
|
|
return false, nil, fmt.Errorf("failed to query failed tasks: %w", err)
|
|
}
|
|
defer rows.Close()
|
|
|
|
if rows.Next() {
|
|
var id, taskType, errorMsg string
|
|
var createdAt time.Time
|
|
|
|
if err := rows.Scan(&id, &taskType, &errorMsg, &createdAt); err != nil {
|
|
return false, nil, err
|
|
}
|
|
|
|
alert := &Alert{
|
|
Title: fmt.Sprintf("Task %s failed", taskType),
|
|
Message: errorMsg,
|
|
ResourceType: "task",
|
|
ResourceID: id,
|
|
Metadata: map[string]interface{}{
|
|
"task_type": taskType,
|
|
"created_at": createdAt,
|
|
},
|
|
}
|
|
return true, alert, nil
|
|
}
|
|
|
|
return false, nil, nil
|
|
}
|
|
|
|
// SystemServiceDownCondition checks if critical services are down
|
|
type SystemServiceDownCondition struct {
|
|
CriticalServices []string
|
|
}
|
|
|
|
func (c *SystemServiceDownCondition) Evaluate(ctx context.Context, db *database.DB, logger *logger.Logger) (bool, *Alert, error) {
|
|
// This would check systemd service status
|
|
// For now, we'll return false as this requires systemd integration
|
|
// This is a placeholder for future implementation
|
|
return false, nil, nil
|
|
}
|
|
|