293 lines
8.2 KiB
Go
293 lines
8.2 KiB
Go
package system
|
|
|
|
import (
	"context"
	"fmt"
	"math"
	"os"
	"os/exec"
	"path/filepath"
	"strconv"
	"strings"
	"time"

	"github.com/atlasos/calypso/internal/common/logger"
)
|
|
|
|
// RRDService handles RRD database operations for network monitoring
|
|
type RRDService struct {
|
|
logger *logger.Logger
|
|
rrdDir string
|
|
interfaceName string
|
|
}
|
|
|
|
// NewRRDService creates a new RRD service
|
|
func NewRRDService(log *logger.Logger, rrdDir string, interfaceName string) *RRDService {
|
|
return &RRDService{
|
|
logger: log,
|
|
rrdDir: rrdDir,
|
|
interfaceName: interfaceName,
|
|
}
|
|
}
|
|
|
|
// NetworkStats is a snapshot of the cumulative traffic counters of a single
// network interface.
type NetworkStats struct {
	// Interface is the name of the interface the counters belong to.
	Interface string `json:"interface"`
	// RxBytes is the cumulative number of bytes received.
	RxBytes uint64 `json:"rx_bytes"`
	// TxBytes is the cumulative number of bytes transmitted.
	TxBytes uint64 `json:"tx_bytes"`
	// RxPackets is the cumulative number of packets received.
	RxPackets uint64 `json:"rx_packets"`
	// TxPackets is the cumulative number of packets transmitted.
	TxPackets uint64 `json:"tx_packets"`
	// Timestamp is the moment the counters were read.
	Timestamp time.Time `json:"timestamp"`
}
|
|
|
|
// GetNetworkStats reads network statistics from /proc/net/dev
|
|
func (r *RRDService) GetNetworkStats(ctx context.Context, interfaceName string) (*NetworkStats, error) {
|
|
data, err := os.ReadFile("/proc/net/dev")
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to read /proc/net/dev: %w", err)
|
|
}
|
|
|
|
lines := strings.Split(string(data), "\n")
|
|
for _, line := range lines {
|
|
line = strings.TrimSpace(line)
|
|
if !strings.HasPrefix(line, interfaceName+":") {
|
|
continue
|
|
}
|
|
|
|
// Parse line: interface: rx_bytes rx_packets ... tx_bytes tx_packets ...
|
|
parts := strings.Fields(line)
|
|
if len(parts) < 17 {
|
|
continue
|
|
}
|
|
|
|
// Extract statistics
|
|
// Format: interface: rx_bytes rx_packets rx_errs rx_drop ... tx_bytes tx_packets ...
|
|
rxBytes, err := strconv.ParseUint(parts[1], 10, 64)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
rxPackets, err := strconv.ParseUint(parts[2], 10, 64)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
txBytes, err := strconv.ParseUint(parts[9], 10, 64)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
txPackets, err := strconv.ParseUint(parts[10], 10, 64)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
|
|
return &NetworkStats{
|
|
Interface: interfaceName,
|
|
RxBytes: rxBytes,
|
|
TxBytes: txBytes,
|
|
RxPackets: rxPackets,
|
|
TxPackets: txPackets,
|
|
Timestamp: time.Now(),
|
|
}, nil
|
|
}
|
|
|
|
return nil, fmt.Errorf("interface %s not found in /proc/net/dev", interfaceName)
|
|
}
|
|
|
|
// InitializeRRD creates RRD database if it doesn't exist
|
|
func (r *RRDService) InitializeRRD(ctx context.Context) error {
|
|
// Ensure RRD directory exists
|
|
if err := os.MkdirAll(r.rrdDir, 0755); err != nil {
|
|
return fmt.Errorf("failed to create RRD directory: %w", err)
|
|
}
|
|
|
|
rrdFile := filepath.Join(r.rrdDir, fmt.Sprintf("network-%s.rrd", r.interfaceName))
|
|
|
|
// Check if RRD file already exists
|
|
if _, err := os.Stat(rrdFile); err == nil {
|
|
r.logger.Info("RRD file already exists", "file", rrdFile)
|
|
return nil
|
|
}
|
|
|
|
// Create RRD database
|
|
// Use COUNTER type to track cumulative bytes, RRD will calculate rate automatically
|
|
// DS:inbound:COUNTER:20:0:U - inbound cumulative bytes, 20s heartbeat
|
|
// DS:outbound:COUNTER:20:0:U - outbound cumulative bytes, 20s heartbeat
|
|
// RRA:AVERAGE:0.5:1:600 - 1 sample per step, 600 steps (100 minutes at 10s interval)
|
|
// RRA:AVERAGE:0.5:6:700 - 6 samples per step, 700 steps (11.6 hours at 1min interval)
|
|
// RRA:AVERAGE:0.5:60:730 - 60 samples per step, 730 steps (5 days at 1hour interval)
|
|
// RRA:MAX:0.5:1:600 - Max values for same intervals
|
|
// RRA:MAX:0.5:6:700
|
|
// RRA:MAX:0.5:60:730
|
|
cmd := exec.CommandContext(ctx, "rrdtool", "create", rrdFile,
|
|
"--step", "10", // 10 second step
|
|
"DS:inbound:COUNTER:20:0:U", // Inbound cumulative bytes, 20s heartbeat
|
|
"DS:outbound:COUNTER:20:0:U", // Outbound cumulative bytes, 20s heartbeat
|
|
"RRA:AVERAGE:0.5:1:600", // 10s resolution, 100 minutes
|
|
"RRA:AVERAGE:0.5:6:700", // 1min resolution, 11.6 hours
|
|
"RRA:AVERAGE:0.5:60:730", // 1hour resolution, 5 days
|
|
"RRA:MAX:0.5:1:600", // Max values
|
|
"RRA:MAX:0.5:6:700",
|
|
"RRA:MAX:0.5:60:730",
|
|
)
|
|
|
|
output, err := cmd.CombinedOutput()
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create RRD: %s: %w", string(output), err)
|
|
}
|
|
|
|
r.logger.Info("RRD database created", "file", rrdFile)
|
|
return nil
|
|
}
|
|
|
|
// UpdateRRD updates RRD database with new network statistics
|
|
func (r *RRDService) UpdateRRD(ctx context.Context, stats *NetworkStats) error {
|
|
rrdFile := filepath.Join(r.rrdDir, fmt.Sprintf("network-%s.rrd", stats.Interface))
|
|
|
|
// Update with cumulative byte counts (COUNTER type)
|
|
// RRD will automatically calculate the rate (bytes per second)
|
|
cmd := exec.CommandContext(ctx, "rrdtool", "update", rrdFile,
|
|
fmt.Sprintf("%d:%d:%d", stats.Timestamp.Unix(), stats.RxBytes, stats.TxBytes),
|
|
)
|
|
|
|
output, err := cmd.CombinedOutput()
|
|
if err != nil {
|
|
return fmt.Errorf("failed to update RRD: %s: %w", string(output), err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// FetchRRDData fetches data from RRD database for graphing
|
|
func (r *RRDService) FetchRRDData(ctx context.Context, startTime time.Time, endTime time.Time, resolution string) ([]NetworkDataPoint, error) {
|
|
rrdFile := filepath.Join(r.rrdDir, fmt.Sprintf("network-%s.rrd", r.interfaceName))
|
|
|
|
// Check if RRD file exists
|
|
if _, err := os.Stat(rrdFile); os.IsNotExist(err) {
|
|
return []NetworkDataPoint{}, nil
|
|
}
|
|
|
|
// Fetch data using rrdtool fetch
|
|
// Use AVERAGE consolidation with appropriate resolution
|
|
cmd := exec.CommandContext(ctx, "rrdtool", "fetch", rrdFile,
|
|
"AVERAGE",
|
|
"--start", fmt.Sprintf("%d", startTime.Unix()),
|
|
"--end", fmt.Sprintf("%d", endTime.Unix()),
|
|
"--resolution", resolution,
|
|
)
|
|
|
|
output, err := cmd.CombinedOutput()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to fetch RRD data: %s: %w", string(output), err)
|
|
}
|
|
|
|
// Parse rrdtool fetch output
|
|
// Format:
|
|
// inbound outbound
|
|
// 1234567890: 1.2345678901e+06 2.3456789012e+06
|
|
points := []NetworkDataPoint{}
|
|
lines := strings.Split(string(output), "\n")
|
|
|
|
// Skip header lines
|
|
dataStart := false
|
|
for _, line := range lines {
|
|
line = strings.TrimSpace(line)
|
|
if line == "" {
|
|
continue
|
|
}
|
|
|
|
// Check if this is the data section
|
|
if strings.Contains(line, "inbound") && strings.Contains(line, "outbound") {
|
|
dataStart = true
|
|
continue
|
|
}
|
|
|
|
if !dataStart {
|
|
continue
|
|
}
|
|
|
|
// Parse data line: timestamp: inbound_value outbound_value
|
|
parts := strings.Fields(line)
|
|
if len(parts) < 3 {
|
|
continue
|
|
}
|
|
|
|
// Parse timestamp
|
|
timestampStr := strings.TrimSuffix(parts[0], ":")
|
|
timestamp, err := strconv.ParseInt(timestampStr, 10, 64)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
|
|
// Parse inbound (bytes per second from COUNTER, convert to Mbps)
|
|
inboundStr := parts[1]
|
|
inbound, err := strconv.ParseFloat(inboundStr, 64)
|
|
if err != nil || inbound < 0 {
|
|
// Skip NaN or negative values
|
|
continue
|
|
}
|
|
// Convert bytes per second to Mbps (bytes/s * 8 / 1000000)
|
|
inboundMbps := inbound * 8 / 1000000
|
|
|
|
// Parse outbound
|
|
outboundStr := parts[2]
|
|
outbound, err := strconv.ParseFloat(outboundStr, 64)
|
|
if err != nil || outbound < 0 {
|
|
// Skip NaN or negative values
|
|
continue
|
|
}
|
|
outboundMbps := outbound * 8 / 1000000
|
|
|
|
// Format time as MM:SS
|
|
t := time.Unix(timestamp, 0)
|
|
timeStr := fmt.Sprintf("%02d:%02d", t.Minute(), t.Second())
|
|
|
|
points = append(points, NetworkDataPoint{
|
|
Time: timeStr,
|
|
Inbound: inboundMbps,
|
|
Outbound: outboundMbps,
|
|
})
|
|
}
|
|
|
|
return points, nil
|
|
}
|
|
|
|
// NetworkDataPoint is one sample of a traffic graph.
type NetworkDataPoint struct {
	// Time is the label of the sample on the time axis.
	Time string `json:"time"`
	// Inbound is the receive rate in Mbps.
	Inbound float64 `json:"inbound"`
	// Outbound is the transmit rate in Mbps.
	Outbound float64 `json:"outbound"`
}
|
|
|
|
// StartCollector starts a background goroutine to periodically collect and update RRD
|
|
func (r *RRDService) StartCollector(ctx context.Context, interval time.Duration) error {
|
|
// Initialize RRD if needed
|
|
if err := r.InitializeRRD(ctx); err != nil {
|
|
return fmt.Errorf("failed to initialize RRD: %w", err)
|
|
}
|
|
|
|
go func() {
|
|
ticker := time.NewTicker(interval)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-ticker.C:
|
|
// Get current stats
|
|
stats, err := r.GetNetworkStats(ctx, r.interfaceName)
|
|
if err != nil {
|
|
r.logger.Warn("Failed to get network stats", "error", err)
|
|
continue
|
|
}
|
|
|
|
// Update RRD with cumulative byte counts
|
|
// RRD COUNTER type will automatically calculate rate
|
|
if err := r.UpdateRRD(ctx, stats); err != nil {
|
|
r.logger.Warn("Failed to update RRD", "error", err)
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
|
|
return nil
|
|
}
|