Files
calypso/backend/internal/system/rrd.go

293 lines
8.2 KiB
Go

package system
import (
"context"
"fmt"
"os"
"os/exec"
"path/filepath"
"strconv"
"strings"
"time"
"github.com/atlasos/calypso/internal/common/logger"
)
// RRDService handles RRD database operations for network monitoring
type RRDService struct {
logger *logger.Logger
rrdDir string
interfaceName string
}
// NewRRDService creates a new RRD service
func NewRRDService(log *logger.Logger, rrdDir string, interfaceName string) *RRDService {
return &RRDService{
logger: log,
rrdDir: rrdDir,
interfaceName: interfaceName,
}
}
// NetworkStats represents network interface statistics
type NetworkStats struct {
Interface string `json:"interface"`
RxBytes uint64 `json:"rx_bytes"`
TxBytes uint64 `json:"tx_bytes"`
RxPackets uint64 `json:"rx_packets"`
TxPackets uint64 `json:"tx_packets"`
Timestamp time.Time `json:"timestamp"`
}
// GetNetworkStats reads network statistics from /proc/net/dev
func (r *RRDService) GetNetworkStats(ctx context.Context, interfaceName string) (*NetworkStats, error) {
data, err := os.ReadFile("/proc/net/dev")
if err != nil {
return nil, fmt.Errorf("failed to read /proc/net/dev: %w", err)
}
lines := strings.Split(string(data), "\n")
for _, line := range lines {
line = strings.TrimSpace(line)
if !strings.HasPrefix(line, interfaceName+":") {
continue
}
// Parse line: interface: rx_bytes rx_packets ... tx_bytes tx_packets ...
parts := strings.Fields(line)
if len(parts) < 17 {
continue
}
// Extract statistics
// Format: interface: rx_bytes rx_packets rx_errs rx_drop ... tx_bytes tx_packets ...
rxBytes, err := strconv.ParseUint(parts[1], 10, 64)
if err != nil {
continue
}
rxPackets, err := strconv.ParseUint(parts[2], 10, 64)
if err != nil {
continue
}
txBytes, err := strconv.ParseUint(parts[9], 10, 64)
if err != nil {
continue
}
txPackets, err := strconv.ParseUint(parts[10], 10, 64)
if err != nil {
continue
}
return &NetworkStats{
Interface: interfaceName,
RxBytes: rxBytes,
TxBytes: txBytes,
RxPackets: rxPackets,
TxPackets: txPackets,
Timestamp: time.Now(),
}, nil
}
return nil, fmt.Errorf("interface %s not found in /proc/net/dev", interfaceName)
}
// InitializeRRD creates RRD database if it doesn't exist
func (r *RRDService) InitializeRRD(ctx context.Context) error {
// Ensure RRD directory exists
if err := os.MkdirAll(r.rrdDir, 0755); err != nil {
return fmt.Errorf("failed to create RRD directory: %w", err)
}
rrdFile := filepath.Join(r.rrdDir, fmt.Sprintf("network-%s.rrd", r.interfaceName))
// Check if RRD file already exists
if _, err := os.Stat(rrdFile); err == nil {
r.logger.Info("RRD file already exists", "file", rrdFile)
return nil
}
// Create RRD database
// Use COUNTER type to track cumulative bytes, RRD will calculate rate automatically
// DS:inbound:COUNTER:20:0:U - inbound cumulative bytes, 20s heartbeat
// DS:outbound:COUNTER:20:0:U - outbound cumulative bytes, 20s heartbeat
// RRA:AVERAGE:0.5:1:600 - 1 sample per step, 600 steps (100 minutes at 10s interval)
// RRA:AVERAGE:0.5:6:700 - 6 samples per step, 700 steps (11.6 hours at 1min interval)
// RRA:AVERAGE:0.5:60:730 - 60 samples per step, 730 steps (5 days at 1hour interval)
// RRA:MAX:0.5:1:600 - Max values for same intervals
// RRA:MAX:0.5:6:700
// RRA:MAX:0.5:60:730
cmd := exec.CommandContext(ctx, "rrdtool", "create", rrdFile,
"--step", "10", // 10 second step
"DS:inbound:COUNTER:20:0:U", // Inbound cumulative bytes, 20s heartbeat
"DS:outbound:COUNTER:20:0:U", // Outbound cumulative bytes, 20s heartbeat
"RRA:AVERAGE:0.5:1:600", // 10s resolution, 100 minutes
"RRA:AVERAGE:0.5:6:700", // 1min resolution, 11.6 hours
"RRA:AVERAGE:0.5:60:730", // 1hour resolution, 5 days
"RRA:MAX:0.5:1:600", // Max values
"RRA:MAX:0.5:6:700",
"RRA:MAX:0.5:60:730",
)
output, err := cmd.CombinedOutput()
if err != nil {
return fmt.Errorf("failed to create RRD: %s: %w", string(output), err)
}
r.logger.Info("RRD database created", "file", rrdFile)
return nil
}
// UpdateRRD updates RRD database with new network statistics
func (r *RRDService) UpdateRRD(ctx context.Context, stats *NetworkStats) error {
rrdFile := filepath.Join(r.rrdDir, fmt.Sprintf("network-%s.rrd", stats.Interface))
// Update with cumulative byte counts (COUNTER type)
// RRD will automatically calculate the rate (bytes per second)
cmd := exec.CommandContext(ctx, "rrdtool", "update", rrdFile,
fmt.Sprintf("%d:%d:%d", stats.Timestamp.Unix(), stats.RxBytes, stats.TxBytes),
)
output, err := cmd.CombinedOutput()
if err != nil {
return fmt.Errorf("failed to update RRD: %s: %w", string(output), err)
}
return nil
}
// FetchRRDData fetches data from RRD database for graphing
func (r *RRDService) FetchRRDData(ctx context.Context, startTime time.Time, endTime time.Time, resolution string) ([]NetworkDataPoint, error) {
rrdFile := filepath.Join(r.rrdDir, fmt.Sprintf("network-%s.rrd", r.interfaceName))
// Check if RRD file exists
if _, err := os.Stat(rrdFile); os.IsNotExist(err) {
return []NetworkDataPoint{}, nil
}
// Fetch data using rrdtool fetch
// Use AVERAGE consolidation with appropriate resolution
cmd := exec.CommandContext(ctx, "rrdtool", "fetch", rrdFile,
"AVERAGE",
"--start", fmt.Sprintf("%d", startTime.Unix()),
"--end", fmt.Sprintf("%d", endTime.Unix()),
"--resolution", resolution,
)
output, err := cmd.CombinedOutput()
if err != nil {
return nil, fmt.Errorf("failed to fetch RRD data: %s: %w", string(output), err)
}
// Parse rrdtool fetch output
// Format:
// inbound outbound
// 1234567890: 1.2345678901e+06 2.3456789012e+06
points := []NetworkDataPoint{}
lines := strings.Split(string(output), "\n")
// Skip header lines
dataStart := false
for _, line := range lines {
line = strings.TrimSpace(line)
if line == "" {
continue
}
// Check if this is the data section
if strings.Contains(line, "inbound") && strings.Contains(line, "outbound") {
dataStart = true
continue
}
if !dataStart {
continue
}
// Parse data line: timestamp: inbound_value outbound_value
parts := strings.Fields(line)
if len(parts) < 3 {
continue
}
// Parse timestamp
timestampStr := strings.TrimSuffix(parts[0], ":")
timestamp, err := strconv.ParseInt(timestampStr, 10, 64)
if err != nil {
continue
}
// Parse inbound (bytes per second from COUNTER, convert to Mbps)
inboundStr := parts[1]
inbound, err := strconv.ParseFloat(inboundStr, 64)
if err != nil || inbound < 0 {
// Skip NaN or negative values
continue
}
// Convert bytes per second to Mbps (bytes/s * 8 / 1000000)
inboundMbps := inbound * 8 / 1000000
// Parse outbound
outboundStr := parts[2]
outbound, err := strconv.ParseFloat(outboundStr, 64)
if err != nil || outbound < 0 {
// Skip NaN or negative values
continue
}
outboundMbps := outbound * 8 / 1000000
// Format time as MM:SS
t := time.Unix(timestamp, 0)
timeStr := fmt.Sprintf("%02d:%02d", t.Minute(), t.Second())
points = append(points, NetworkDataPoint{
Time: timeStr,
Inbound: inboundMbps,
Outbound: outboundMbps,
})
}
return points, nil
}
// NetworkDataPoint represents a single data point for graphing
type NetworkDataPoint struct {
Time string `json:"time"`
Inbound float64 `json:"inbound"` // Mbps
Outbound float64 `json:"outbound"` // Mbps
}
// StartCollector starts a background goroutine to periodically collect and update RRD
func (r *RRDService) StartCollector(ctx context.Context, interval time.Duration) error {
// Initialize RRD if needed
if err := r.InitializeRRD(ctx); err != nil {
return fmt.Errorf("failed to initialize RRD: %w", err)
}
go func() {
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
// Get current stats
stats, err := r.GetNetworkStats(ctx, r.interfaceName)
if err != nil {
r.logger.Warn("Failed to get network stats", "error", err)
continue
}
// Update RRD with cumulative byte counts
// RRD COUNTER type will automatically calculate rate
if err := r.UpdateRRD(ctx, stats); err != nil {
r.logger.Warn("Failed to update RRD", "error", err)
}
}
}
}()
return nil
}