package system import ( "context" "fmt" "os" "os/exec" "path/filepath" "strconv" "strings" "time" "github.com/atlasos/calypso/internal/common/logger" ) // RRDService handles RRD database operations for network monitoring type RRDService struct { logger *logger.Logger rrdDir string interfaceName string } // NewRRDService creates a new RRD service func NewRRDService(log *logger.Logger, rrdDir string, interfaceName string) *RRDService { return &RRDService{ logger: log, rrdDir: rrdDir, interfaceName: interfaceName, } } // NetworkStats represents network interface statistics type NetworkStats struct { Interface string `json:"interface"` RxBytes uint64 `json:"rx_bytes"` TxBytes uint64 `json:"tx_bytes"` RxPackets uint64 `json:"rx_packets"` TxPackets uint64 `json:"tx_packets"` Timestamp time.Time `json:"timestamp"` } // GetNetworkStats reads network statistics from /proc/net/dev func (r *RRDService) GetNetworkStats(ctx context.Context, interfaceName string) (*NetworkStats, error) { data, err := os.ReadFile("/proc/net/dev") if err != nil { return nil, fmt.Errorf("failed to read /proc/net/dev: %w", err) } lines := strings.Split(string(data), "\n") for _, line := range lines { line = strings.TrimSpace(line) if !strings.HasPrefix(line, interfaceName+":") { continue } // Parse line: interface: rx_bytes rx_packets ... tx_bytes tx_packets ... parts := strings.Fields(line) if len(parts) < 17 { continue } // Extract statistics // Format: interface: rx_bytes rx_packets rx_errs rx_drop ... tx_bytes tx_packets ... rxBytes, err := strconv.ParseUint(parts[1], 10, 64) if err != nil { continue } rxPackets, err := strconv.ParseUint(parts[2], 10, 64) if err != nil { continue } txBytes, err := strconv.ParseUint(parts[9], 10, 64) if err != nil { continue } txPackets, err := strconv.ParseUint(parts[10], 10, 64) if err != nil { continue } return &NetworkStats{ Interface: interfaceName, RxBytes: rxBytes, TxBytes: txBytes, RxPackets: rxPackets, TxPackets: txPackets, Timestamp: time.Now(), }, nil } return nil, fmt.Errorf("interface %s not found in /proc/net/dev", interfaceName) } // InitializeRRD creates RRD database if it doesn't exist func (r *RRDService) InitializeRRD(ctx context.Context) error { // Ensure RRD directory exists if err := os.MkdirAll(r.rrdDir, 0755); err != nil { return fmt.Errorf("failed to create RRD directory: %w", err) } rrdFile := filepath.Join(r.rrdDir, fmt.Sprintf("network-%s.rrd", r.interfaceName)) // Check if RRD file already exists if _, err := os.Stat(rrdFile); err == nil { r.logger.Info("RRD file already exists", "file", rrdFile) return nil } // Create RRD database // Use COUNTER type to track cumulative bytes, RRD will calculate rate automatically // DS:inbound:COUNTER:20:0:U - inbound cumulative bytes, 20s heartbeat // DS:outbound:COUNTER:20:0:U - outbound cumulative bytes, 20s heartbeat // RRA:AVERAGE:0.5:1:600 - 1 sample per step, 600 steps (100 minutes at 10s interval) // RRA:AVERAGE:0.5:6:700 - 6 samples per step, 700 steps (11.6 hours at 1min interval) // RRA:AVERAGE:0.5:60:730 - 60 samples per step, 730 steps (5 days at 1hour interval) // RRA:MAX:0.5:1:600 - Max values for same intervals // RRA:MAX:0.5:6:700 // RRA:MAX:0.5:60:730 cmd := exec.CommandContext(ctx, "rrdtool", "create", rrdFile, "--step", "10", // 10 second step "DS:inbound:COUNTER:20:0:U", // Inbound cumulative bytes, 20s heartbeat "DS:outbound:COUNTER:20:0:U", // Outbound cumulative bytes, 20s heartbeat "RRA:AVERAGE:0.5:1:600", // 10s resolution, 100 minutes "RRA:AVERAGE:0.5:6:700", // 1min resolution, 11.6 hours "RRA:AVERAGE:0.5:60:730", // 1hour resolution, 5 days "RRA:MAX:0.5:1:600", // Max values "RRA:MAX:0.5:6:700", "RRA:MAX:0.5:60:730", ) output, err := cmd.CombinedOutput() if err != nil { return fmt.Errorf("failed to create RRD: %s: %w", string(output), err) } r.logger.Info("RRD database created", "file", rrdFile) return nil } // UpdateRRD updates RRD database with new network statistics func (r *RRDService) UpdateRRD(ctx context.Context, stats *NetworkStats) error { rrdFile := filepath.Join(r.rrdDir, fmt.Sprintf("network-%s.rrd", stats.Interface)) // Update with cumulative byte counts (COUNTER type) // RRD will automatically calculate the rate (bytes per second) cmd := exec.CommandContext(ctx, "rrdtool", "update", rrdFile, fmt.Sprintf("%d:%d:%d", stats.Timestamp.Unix(), stats.RxBytes, stats.TxBytes), ) output, err := cmd.CombinedOutput() if err != nil { return fmt.Errorf("failed to update RRD: %s: %w", string(output), err) } return nil } // FetchRRDData fetches data from RRD database for graphing func (r *RRDService) FetchRRDData(ctx context.Context, startTime time.Time, endTime time.Time, resolution string) ([]NetworkDataPoint, error) { rrdFile := filepath.Join(r.rrdDir, fmt.Sprintf("network-%s.rrd", r.interfaceName)) // Check if RRD file exists if _, err := os.Stat(rrdFile); os.IsNotExist(err) { return []NetworkDataPoint{}, nil } // Fetch data using rrdtool fetch // Use AVERAGE consolidation with appropriate resolution cmd := exec.CommandContext(ctx, "rrdtool", "fetch", rrdFile, "AVERAGE", "--start", fmt.Sprintf("%d", startTime.Unix()), "--end", fmt.Sprintf("%d", endTime.Unix()), "--resolution", resolution, ) output, err := cmd.CombinedOutput() if err != nil { return nil, fmt.Errorf("failed to fetch RRD data: %s: %w", string(output), err) } // Parse rrdtool fetch output // Format: // inbound outbound // 1234567890: 1.2345678901e+06 2.3456789012e+06 points := []NetworkDataPoint{} lines := strings.Split(string(output), "\n") // Skip header lines dataStart := false for _, line := range lines { line = strings.TrimSpace(line) if line == "" { continue } // Check if this is the data section if strings.Contains(line, "inbound") && strings.Contains(line, "outbound") { dataStart = true continue } if !dataStart { continue } // Parse data line: timestamp: inbound_value outbound_value parts := strings.Fields(line) if len(parts) < 3 { continue } // Parse timestamp timestampStr := strings.TrimSuffix(parts[0], ":") timestamp, err := strconv.ParseInt(timestampStr, 10, 64) if err != nil { continue } // Parse inbound (bytes per second from COUNTER, convert to Mbps) inboundStr := parts[1] inbound, err := strconv.ParseFloat(inboundStr, 64) if err != nil || inbound < 0 { // Skip NaN or negative values continue } // Convert bytes per second to Mbps (bytes/s * 8 / 1000000) inboundMbps := inbound * 8 / 1000000 // Parse outbound outboundStr := parts[2] outbound, err := strconv.ParseFloat(outboundStr, 64) if err != nil || outbound < 0 { // Skip NaN or negative values continue } outboundMbps := outbound * 8 / 1000000 // Format time as MM:SS t := time.Unix(timestamp, 0) timeStr := fmt.Sprintf("%02d:%02d", t.Minute(), t.Second()) points = append(points, NetworkDataPoint{ Time: timeStr, Inbound: inboundMbps, Outbound: outboundMbps, }) } return points, nil } // NetworkDataPoint represents a single data point for graphing type NetworkDataPoint struct { Time string `json:"time"` Inbound float64 `json:"inbound"` // Mbps Outbound float64 `json:"outbound"` // Mbps } // StartCollector starts a background goroutine to periodically collect and update RRD func (r *RRDService) StartCollector(ctx context.Context, interval time.Duration) error { // Initialize RRD if needed if err := r.InitializeRRD(ctx); err != nil { return fmt.Errorf("failed to initialize RRD: %w", err) } go func() { ticker := time.NewTicker(interval) defer ticker.Stop() for { select { case <-ctx.Done(): return case <-ticker.C: // Get current stats stats, err := r.GetNetworkStats(ctx, r.interfaceName) if err != nil { r.logger.Warn("Failed to get network stats", "error", err) continue } // Update RRD with cumulative byte counts // RRD COUNTER type will automatically calculate rate if err := r.UpdateRRD(ctx, stats); err != nil { r.logger.Warn("Failed to update RRD", "error", err) } } } }() return nil }