Update framework and workspace codebase
438  internal/monitoring/collectors.go  Normal file
@@ -0,0 +1,438 @@
package monitoring

import (
    "context"
    "fmt"
    "os"
    "strconv"
    "strings"
    "time"

    "github.com/example/storage-appliance/internal/infra/osexec"
    "github.com/example/storage-appliance/internal/service"
)

const (
    DefaultTimeout = 5 * time.Second
)

// MetricValue represents a single metric value
type MetricValue struct {
    Name   string
    Labels map[string]string
    Value  float64
    Type   string // "gauge" or "counter"
}

// MetricCollection represents a collection of metrics
type MetricCollection struct {
    Metrics []MetricValue
    Errors  []string
}

// Collector interface for different metric collectors
type Collector interface {
    Collect(ctx context.Context) MetricCollection
    Name() string
}
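// Illustrative sketch, not part of the collectors below: one plausible way a caller
// could fan out over a set of Collectors and render each MetricValue as a
// Prometheus-style text line. CollectAll and FormatMetric are hypothetical helpers
// added here only to show how the interface above might be consumed.
func CollectAll(ctx context.Context, collectors []Collector) []MetricCollection {
    results := make([]MetricCollection, 0, len(collectors))
    for _, c := range collectors {
        // Each collector enforces its own DefaultTimeout inside Collect.
        results = append(results, c.Collect(ctx))
    }
    return results
}

// FormatMetric renders e.g. zfs_pool_health{pool="tank"} 1 (label order is not
// stable, since Go map iteration order is randomized).
func FormatMetric(m MetricValue) string {
    pairs := make([]string, 0, len(m.Labels))
    for k, v := range m.Labels {
        pairs = append(pairs, fmt.Sprintf("%s=%q", k, v))
    }
    return fmt.Sprintf("%s{%s} %g", m.Name, strings.Join(pairs, ","), m.Value)
}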
// ZFSCollector collects ZFS pool health and scrub status
type ZFSCollector struct {
    ZFSSvc service.ZFSService
    Runner osexec.Runner
}

func NewZFSCollector(zfsSvc service.ZFSService, runner osexec.Runner) *ZFSCollector {
    return &ZFSCollector{ZFSSvc: zfsSvc, Runner: runner}
}

func (c *ZFSCollector) Name() string {
    return "zfs"
}

func (c *ZFSCollector) Collect(ctx context.Context) MetricCollection {
    ctx, cancel := context.WithTimeout(ctx, DefaultTimeout)
    defer cancel()

    collection := MetricCollection{
        Metrics: []MetricValue{},
        Errors:  []string{},
    }

    // Get pool list
    pools, err := c.ZFSSvc.ListPools(ctx)
    if err != nil {
        collection.Errors = append(collection.Errors, fmt.Sprintf("failed to list pools: %v", err))
        return collection
    }

    for _, pool := range pools {
        // Pool health metric (1 = ONLINE, 0.5 = DEGRADED, 0 = FAULTED/OFFLINE)
        healthValue := 0.0
        switch strings.ToUpper(pool.Health) {
        case "ONLINE":
            healthValue = 1.0
        case "DEGRADED":
            healthValue = 0.5
        case "FAULTED", "OFFLINE", "UNAVAIL":
            healthValue = 0.0
        }

        collection.Metrics = append(collection.Metrics, MetricValue{
            Name:   "zfs_pool_health",
            Labels: map[string]string{"pool": pool.Name},
            Value:  healthValue,
            Type:   "gauge",
        })

        // Get scrub status
        scrubStatus, err := c.getScrubStatus(ctx, pool.Name)
        if err != nil {
            collection.Errors = append(collection.Errors, fmt.Sprintf("failed to get scrub status for %s: %v", pool.Name, err))
            continue
        }

        // Scrub in progress (1 = yes, 0 = no)
        scrubInProgress := 0.0
        if strings.Contains(scrubStatus, "scan: scrub in progress") {
            scrubInProgress = 1.0
        }

        collection.Metrics = append(collection.Metrics, MetricValue{
            Name:   "zfs_pool_scrub_in_progress",
            Labels: map[string]string{"pool": pool.Name},
            Value:  scrubInProgress,
            Type:   "gauge",
        })
    }

    return collection
}

func (c *ZFSCollector) getScrubStatus(ctx context.Context, pool string) (string, error) {
    out, _, _, err := osexec.ExecWithRunner(c.Runner, ctx, "zpool", "status", pool)
    if err != nil {
        return "", err
    }
    for _, line := range strings.Split(out, "\n") {
        if strings.Contains(line, "scan:") {
            return strings.TrimSpace(line), nil
        }
    }
    return "no-scan", nil
}
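// For reference, the "scan:" line returned by getScrubStatus typically looks like one
// of the following (exact wording varies across OpenZFS versions, so treat these as
// assumed examples rather than a guaranteed format):
//
//	scan: scrub in progress since Sun Feb  4 10:00:00 2024
//	scan: scrub repaired 0B in 01:23:45 with 0 errors on Sun Feb  4 11:23:45 2024
//	scan: resilver in progress since Sun Feb  4 10:00:00 2024
//
// Only the first form matches the "scan: scrub in progress" check in Collect, so a
// completed scrub or an active resilver reports zfs_pool_scrub_in_progress = 0.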
// SMARTCollector collects SMART health status
type SMARTCollector struct {
    Runner osexec.Runner
}

func NewSMARTCollector(runner osexec.Runner) *SMARTCollector {
    return &SMARTCollector{Runner: runner}
}

func (c *SMARTCollector) Name() string {
    return "smart"
}

func (c *SMARTCollector) Collect(ctx context.Context) MetricCollection {
    ctx, cancel := context.WithTimeout(ctx, DefaultTimeout)
    defer cancel()

    collection := MetricCollection{
        Metrics: []MetricValue{},
        Errors:  []string{},
    }

    // List all disks (simplified - try common devices)
    // In a real implementation, you'd scan /dev/ or use lsblk
    commonDisks := []string{"sda", "sdb", "sdc", "nvme0n1", "nvme1n1"}
    disks := []string{}
    for _, d := range commonDisks {
        disks = append(disks, fmt.Sprintf("/dev/%s", d))
    }

    // Check SMART health for each disk
    for _, disk := range disks {
        health, err := c.getSMARTHealth(ctx, disk)
        if err != nil {
            // Skip devices that don't exist or don't support SMART
            continue
        }

        // SMART health: 1 = PASSED, 0 = FAILED
        healthValue := 0.0
        if strings.Contains(strings.ToUpper(health), "PASSED") {
            healthValue = 1.0
        }

        collection.Metrics = append(collection.Metrics, MetricValue{
            Name:   "smart_health",
            Labels: map[string]string{"device": disk},
            Value:  healthValue,
            Type:   "gauge",
        })
    }

    return collection
}
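// Illustrative sketch: Collect above probes a hard-coded device list, and its comment
// suggests scanning /dev/ or using lsblk instead. Assuming the same osexec.Runner
// contract used elsewhere in this file, a discovery helper could look like the
// following. listBlockDevices is hypothetical and is not called by the collector.
func listBlockDevices(ctx context.Context, runner osexec.Runner) ([]string, error) {
    // lsblk -d skips partitions, -n drops the header, -o NAME prints only device names
    out, _, code, err := osexec.ExecWithRunner(runner, ctx, "lsblk", "-dn", "-o", "NAME")
    if err != nil || code != 0 {
        return nil, fmt.Errorf("lsblk failed: %v", err)
    }
    devices := []string{}
    for _, line := range strings.Split(out, "\n") {
        name := strings.TrimSpace(line)
        if name == "" {
            continue
        }
        devices = append(devices, "/dev/"+name)
    }
    return devices, nil
}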
func (c *SMARTCollector) getSMARTHealth(ctx context.Context, device string) (string, error) {
    // Use smartctl -H to get health status
    out, _, code, err := osexec.ExecWithRunner(c.Runner, ctx, "smartctl", "-H", device)
    if err != nil || code != 0 {
        return "", fmt.Errorf("smartctl failed: %v", err)
    }
    return out, nil
}

// ServiceCollector collects service states
type ServiceCollector struct {
    Runner osexec.Runner
}

func NewServiceCollector(runner osexec.Runner) *ServiceCollector {
    return &ServiceCollector{Runner: runner}
}

func (c *ServiceCollector) Name() string {
    return "services"
}

func (c *ServiceCollector) Collect(ctx context.Context) MetricCollection {
    ctx, cancel := context.WithTimeout(ctx, DefaultTimeout)
    defer cancel()

    collection := MetricCollection{
        Metrics: []MetricValue{},
        Errors:  []string{},
    }

    services := []string{"nfs-server", "smbd", "iscsid", "iscsi", "minio"}

    for _, svc := range services {
        status, err := c.getServiceStatus(ctx, svc)
        if err != nil {
            collection.Errors = append(collection.Errors, fmt.Sprintf("failed to check %s: %v", svc, err))
            continue
        }

        // Service state: 1 = active/running, 0 = inactive/stopped
        stateValue := 0.0
        if strings.Contains(strings.ToLower(status), "active") || strings.Contains(strings.ToLower(status), "running") {
            stateValue = 1.0
        }

        collection.Metrics = append(collection.Metrics, MetricValue{
            Name:   "service_state",
            Labels: map[string]string{"service": svc},
            Value:  stateValue,
            Type:   "gauge",
        })
    }

    return collection
}

func (c *ServiceCollector) getServiceStatus(ctx context.Context, service string) (string, error) {
    // Try systemctl first
    out, _, code, err := osexec.ExecWithRunner(c.Runner, ctx, "systemctl", "is-active", service)
    if err == nil && code == 0 {
        return out, nil
    }

    // Fallback to checking process
    out, _, code, err = osexec.ExecWithRunner(c.Runner, ctx, "pgrep", "-f", service)
    if err == nil && code == 0 && strings.TrimSpace(out) != "" {
        return "running", nil
    }

    return "inactive", nil
}

// HostCollector collects host metrics from /proc
type HostCollector struct{}

func NewHostCollector() *HostCollector {
    return &HostCollector{}
}

func (c *HostCollector) Name() string {
    return "host"
}

func (c *HostCollector) Collect(ctx context.Context) MetricCollection {
    ctx, cancel := context.WithTimeout(ctx, DefaultTimeout)
    defer cancel()

    collection := MetricCollection{
        Metrics: []MetricValue{},
        Errors:  []string{},
    }

    // Load average
    loadavg, err := c.readLoadAvg()
    if err != nil {
        collection.Errors = append(collection.Errors, fmt.Sprintf("failed to read loadavg: %v", err))
    } else {
        collection.Metrics = append(collection.Metrics, MetricValue{
            Name:   "host_load1",
            Labels: map[string]string{},
            Value:  loadavg.Load1,
            Type:   "gauge",
        })
        collection.Metrics = append(collection.Metrics, MetricValue{
            Name:   "host_load5",
            Labels: map[string]string{},
            Value:  loadavg.Load5,
            Type:   "gauge",
        })
        collection.Metrics = append(collection.Metrics, MetricValue{
            Name:   "host_load15",
            Labels: map[string]string{},
            Value:  loadavg.Load15,
            Type:   "gauge",
        })
    }

    // Memory info
    meminfo, err := c.readMemInfo()
    if err != nil {
        collection.Errors = append(collection.Errors, fmt.Sprintf("failed to read meminfo: %v", err))
    } else {
        collection.Metrics = append(collection.Metrics, MetricValue{
            Name:   "host_memory_total_bytes",
            Labels: map[string]string{},
            Value:  meminfo.MemTotal,
            Type:   "gauge",
        })
        collection.Metrics = append(collection.Metrics, MetricValue{
            Name:   "host_memory_free_bytes",
            Labels: map[string]string{},
            Value:  meminfo.MemFree,
            Type:   "gauge",
        })
        collection.Metrics = append(collection.Metrics, MetricValue{
            Name:   "host_memory_available_bytes",
            Labels: map[string]string{},
            Value:  meminfo.MemAvailable,
            Type:   "gauge",
        })
    }

    // Disk IO (simplified - read from /proc/diskstats)
    diskIO, err := c.readDiskIO()
    if err != nil {
        collection.Errors = append(collection.Errors, fmt.Sprintf("failed to read disk IO: %v", err))
    } else {
        for device, io := range diskIO {
            collection.Metrics = append(collection.Metrics, MetricValue{
                Name:   "host_disk_reads_completed",
                Labels: map[string]string{"device": device},
                Value:  io.ReadsCompleted,
                Type:   "counter",
            })
            collection.Metrics = append(collection.Metrics, MetricValue{
                Name:   "host_disk_writes_completed",
                Labels: map[string]string{"device": device},
                Value:  io.WritesCompleted,
                Type:   "counter",
            })
        }
    }

    return collection
}

type LoadAvg struct {
    Load1  float64
    Load5  float64
    Load15 float64
}

func (c *HostCollector) readLoadAvg() (LoadAvg, error) {
    data, err := os.ReadFile("/proc/loadavg")
    if err != nil {
        return LoadAvg{}, err
    }

    fields := strings.Fields(string(data))
    if len(fields) < 3 {
        return LoadAvg{}, fmt.Errorf("invalid loadavg format")
    }

    load1, _ := strconv.ParseFloat(fields[0], 64)
    load5, _ := strconv.ParseFloat(fields[1], 64)
    load15, _ := strconv.ParseFloat(fields[2], 64)

    return LoadAvg{Load1: load1, Load5: load5, Load15: load15}, nil
}
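// For reference, /proc/loadavg is a single line such as:
//
//	0.52 0.58 0.59 2/1310 48923
//
// fields[0..2] are the 1-, 5- and 15-minute load averages parsed above; the remaining
// fields (runnable/total tasks and the most recent PID) are ignored.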
type MemInfo struct {
    MemTotal     float64
    MemFree      float64
    MemAvailable float64
}

func (c *HostCollector) readMemInfo() (MemInfo, error) {
    data, err := os.ReadFile("/proc/meminfo")
    if err != nil {
        return MemInfo{}, err
    }

    info := MemInfo{}
    lines := strings.Split(string(data), "\n")
    for _, line := range lines {
        fields := strings.Fields(line)
        if len(fields) < 2 {
            continue
        }
        key := strings.TrimSuffix(fields[0], ":")
        value, _ := strconv.ParseFloat(fields[1], 64)
        // Values are in KB, convert to bytes
        valueBytes := value * 1024

        switch key {
        case "MemTotal":
            info.MemTotal = valueBytes
        case "MemFree":
            info.MemFree = valueBytes
        case "MemAvailable":
            info.MemAvailable = valueBytes
        }
    }

    return info, nil
}
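// For reference, /proc/meminfo reports these values in kB (KiB), e.g.:
//
//	MemTotal:       16384256 kB
//	MemFree:         1203944 kB
//	MemAvailable:    9812340 kB
//
// which is why readMemInfo multiplies by 1024: 16384256 kB becomes 16777478144 bytes.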
type DiskIO struct {
    ReadsCompleted  float64
    WritesCompleted float64
}

func (c *HostCollector) readDiskIO() (map[string]DiskIO, error) {
    data, err := os.ReadFile("/proc/diskstats")
    if err != nil {
        return nil, err
    }

    result := make(map[string]DiskIO)
    lines := strings.Split(string(data), "\n")
    for _, line := range lines {
        fields := strings.Fields(line)
        if len(fields) < 14 {
            continue
        }
        device := fields[2]
        reads, _ := strconv.ParseFloat(fields[3], 64)
        writes, _ := strconv.ParseFloat(fields[7], 64)

        result[device] = DiskIO{
            ReadsCompleted:  reads,
            WritesCompleted: writes,
        }
    }

    return result, nil
}
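// Illustrative sketch: ReadsCompleted and WritesCompleted are cumulative counters
// (hence Type "counter" in Collect), so they are mainly useful as rates. A hypothetical
// caller could sample readDiskIO twice and divide the delta by the elapsed time;
// diskIORate is not used by the collector above.
func diskIORate(prev, curr DiskIO, elapsed time.Duration) (readsPerSec, writesPerSec float64) {
    secs := elapsed.Seconds()
    if secs <= 0 {
        return 0, 0
    }
    readsPerSec = (curr.ReadsCompleted - prev.ReadsCompleted) / secs
    writesPerSec = (curr.WritesCompleted - prev.WritesCompleted) / secs
    return readsPerSec, writesPerSec
}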