package api import ( "context" "crypto/rand" "encoding/hex" "encoding/json" "errors" "net/http" "strings" "github.com/go-chi/chi/v5" "jagacloud/node-agent/pkg/compute/libvirt" "jagacloud/node-agent/pkg/config" "jagacloud/node-agent/pkg/containers/lxc" "jagacloud/node-agent/pkg/containers/podman" "jagacloud/node-agent/pkg/tasks" "jagacloud/node-agent/pkg/validators" ) // Utility to write JSON. func writeJSON(w http.ResponseWriter, status int, v interface{}) { w.Header().Set("Content-Type", "application/json") w.WriteHeader(status) _ = json.NewEncoder(w).Encode(v) } // Node info func handleNodeInfo(cfg config.Config, svc Services) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { // TODO: collect real stats (CPU, mem, storage, bridges) using cfg. writeJSON(w, http.StatusOK, map[string]interface{}{ "hostname": "TODO", "version": "0.1.0", }) } } // VM handlers func handleListVMs(cfg config.Config, svc Services) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { stored, _ := svc.Store.ListVMs() runtime, _ := svc.Libvirt.ListVMs() // Merge status from runtime into stored specs statusMap := map[string]string{} for _, vm := range runtime { statusMap[vm.ID] = vm.Status } out := []interface{}{} for _, spec := range stored { status := statusMap[spec.ID] out = append(out, map[string]interface{}{ "id": spec.ID, "name": spec.Name, "status": status, "cpus": spec.CPU, "memory_mb": spec.MemoryMB, }) } writeJSON(w, http.StatusOK, out) } } func handleCreateVM(cfg config.Config, svc Services) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { var spec libvirt.VMSpec if err := json.NewDecoder(r.Body).Decode(&spec); err != nil { writeJSON(w, http.StatusBadRequest, map[string]string{"error": err.Error()}) return } if spec.ID == "" && spec.Name != "" { spec.ID = spec.Name } if spec.ID == "" { writeJSON(w, http.StatusBadRequest, map[string]string{"error": "id or name required"}) return } if err := validators.CheckBridgeSet(spec.NICs); err != nil { writeJSON(w, http.StatusBadRequest, map[string]string{"error": err.Error()}) return } if err := validateVM(spec, cfg); err != nil { writeJSON(w, http.StatusBadRequest, map[string]string{"error": err.Error()}) return } if err := validators.CheckStoragePoolsVM(spec.Disks, cfg); err != nil { writeJSON(w, http.StatusBadRequest, map[string]string{"error": err.Error()}) return } if err := svc.Store.SaveVM(spec); err != nil { writeJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()}) return } taskID := enqueueWork(svc.Tasks, "vm.create", func(ctx context.Context) (interface{}, error) { unlock := svc.StoreLock(spec.ID) defer unlock() return svc.Libvirt.CreateVM(spec) }) writeJSON(w, http.StatusAccepted, map[string]string{"status": "queued", "task": taskID}) } } func handleGetVM(cfg config.Config, svc Services) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { id := chi.URLParam(r, "id") spec, err := svc.Store.LoadVM(id) if err == nil { writeJSON(w, http.StatusOK, spec) return } vms, err := svc.Libvirt.ListVMs() if err != nil { writeJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()}) return } for _, vm := range vms { if vm.ID == id || vm.Name == id { writeJSON(w, http.StatusOK, vm) return } } writeJSON(w, http.StatusNotFound, map[string]string{"error": "not found"}) } } func handleStartVM(cfg config.Config, svc Services) http.HandlerFunc { return lifecycleVM(cfg, svc, "start") } func handleStopVM(cfg config.Config, svc Services) http.HandlerFunc { return lifecycleVM(cfg, svc, "stop") } func handleRebootVM(cfg config.Config, svc Services) http.HandlerFunc { return lifecycleVM(cfg, svc, "reboot") } func handleDeleteVM(cfg config.Config, svc Services) http.HandlerFunc { return lifecycleVM(cfg, svc, "delete") } func lifecycleVM(cfg config.Config, svc Services, action string) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { id := chi.URLParam(r, "id") taskID := enqueueWork(svc.Tasks, "vm."+action, func(ctx context.Context) (interface{}, error) { unlock := svc.StoreLock(id) defer unlock() switch action { case "start": return nil, svc.Libvirt.StartVM(id) case "stop": return nil, svc.Libvirt.StopVM(id) case "reboot": return nil, svc.Libvirt.RebootVM(id) case "delete": if err := svc.Libvirt.DeleteVM(id); err != nil { return nil, err } _ = svc.Store.DeleteVM(id) return nil, nil default: return nil, errors.New("unsupported action") } }) writeJSON(w, http.StatusAccepted, map[string]string{"id": id, "action": action, "status": "queued", "task": taskID}) } } // Container (LXC) handlers func handleListCT(cfg config.Config, svc Services) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { stored, _ := svc.Store.ListCTs() runtime, _ := svc.LXC.List() statusMap := map[string]string{} for _, ct := range runtime { statusMap[ct.ID] = ct.Status } out := []interface{}{} for _, spec := range stored { status := statusMap[spec.ID] out = append(out, map[string]interface{}{ "id": spec.ID, "name": spec.Name, "status": status, "limits": spec.Limits, "nics": spec.NICs, "template": spec.Template, }) } writeJSON(w, http.StatusOK, out) } } func handleCreateCT(cfg config.Config, svc Services) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { var spec lxc.Spec if err := json.NewDecoder(r.Body).Decode(&spec); err != nil { writeJSON(w, http.StatusBadRequest, map[string]string{"error": err.Error()}) return } if spec.ID == "" && spec.Name != "" { spec.ID = spec.Name } if spec.ID == "" { writeJSON(w, http.StatusBadRequest, map[string]string{"error": "id or name required"}) return } if err := validators.CheckBridgeSetCT(spec.NICs); err != nil { writeJSON(w, http.StatusBadRequest, map[string]string{"error": err.Error()}) return } if err := validators.CheckStoragePoolsCT(spec, cfg); err != nil { writeJSON(w, http.StatusBadRequest, map[string]string{"error": err.Error()}) return } if err := svc.Store.SaveCT(spec); err != nil { writeJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()}) return } taskID := enqueueWork(svc.Tasks, "ct.create", func(ctx context.Context) (interface{}, error) { unlock := svc.StoreLock(spec.ID) defer unlock() return svc.LXC.Create(spec) }) writeJSON(w, http.StatusAccepted, map[string]string{"status": "queued", "task": taskID}) } } func handleGetCT(cfg config.Config, svc Services) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { id := chi.URLParam(r, "id") if spec, err := svc.Store.LoadCT(id); err == nil { writeJSON(w, http.StatusOK, spec) return } cts, err := svc.LXC.List() if err != nil { writeJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()}) return } for _, ct := range cts { if ct.ID == id || ct.Name == id { writeJSON(w, http.StatusOK, ct) return } } writeJSON(w, http.StatusNotFound, map[string]string{"error": "not found"}) } } func handleStartCT(cfg config.Config, svc Services) http.HandlerFunc { return lifecycleCT(cfg, svc, "start") } func handleStopCT(cfg config.Config, svc Services) http.HandlerFunc { return lifecycleCT(cfg, svc, "stop") } func handleDeleteCT(cfg config.Config, svc Services) http.HandlerFunc { return lifecycleCT(cfg, svc, "delete") } func lifecycleCT(cfg config.Config, svc Services, action string) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { id := chi.URLParam(r, "id") taskID := enqueueWork(svc.Tasks, "ct."+action, func(ctx context.Context) (interface{}, error) { unlock := svc.StoreLock(id) defer unlock() switch action { case "start": return nil, svc.LXC.Start(id) case "stop": return nil, svc.LXC.Stop(id) case "delete": if err := svc.LXC.Delete(id); err != nil { return nil, err } _ = svc.Store.DeleteCT(id) return nil, nil default: return nil, errors.New("unsupported action") } }) writeJSON(w, http.StatusAccepted, map[string]string{"id": id, "action": action, "status": "queued", "task": taskID}) } } // OCI-in-LXC handlers func handleListOCI(cfg config.Config, svc Services) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { id := chi.URLParam(r, "id") items, err := svc.Podman.List(id) if err != nil { writeJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()}) return } writeJSON(w, http.StatusOK, items) } } func handleCreateOCI(cfg config.Config, svc Services) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { ctID := chi.URLParam(r, "id") var spec podman.CreateSpec if err := json.NewDecoder(r.Body).Decode(&spec); err != nil { writeJSON(w, http.StatusBadRequest, map[string]string{"error": err.Error()}) return } taskID := enqueueWork(svc.Tasks, "oci.create", func(ctx context.Context) (interface{}, error) { return svc.Podman.Create(ctID, spec) }) writeJSON(w, http.StatusAccepted, map[string]string{"status": "queued", "task": taskID}) } } func handleGetOCI(cfg config.Config, svc Services) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { cid := chi.URLParam(r, "cid") ctID := chi.URLParam(r, "id") items, err := svc.Podman.List(ctID) if err != nil { writeJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()}) return } for _, item := range items { if item.ID == cid { writeJSON(w, http.StatusOK, item) return } } writeJSON(w, http.StatusNotFound, map[string]string{"error": "not found"}) } } func handleStartOCI(cfg config.Config, svc Services) http.HandlerFunc { return lifecycleOCI(cfg, svc, "start") } func handleStopOCI(cfg config.Config, svc Services) http.HandlerFunc { return lifecycleOCI(cfg, svc, "stop") } func handleDeleteOCI(cfg config.Config, svc Services) http.HandlerFunc { return lifecycleOCI(cfg, svc, "delete") } func lifecycleOCI(cfg config.Config, svc Services, action string) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { cid := chi.URLParam(r, "cid") ctID := chi.URLParam(r, "id") taskID := enqueueWork(svc.Tasks, "oci."+action, func(ctx context.Context) (interface{}, error) { switch action { case "start": return nil, svc.Podman.Start(ctID, cid) case "stop": return nil, svc.Podman.Stop(ctID, cid) case "delete": return nil, svc.Podman.Delete(ctID, cid) default: return nil, errors.New("unsupported action") } }) writeJSON(w, http.StatusAccepted, map[string]string{"id": cid, "action": action, "status": "queued", "task": taskID}) } } // Task handlers func handleGetTask(cfg config.Config, svc Services) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { id := chi.URLParam(r, "id") t, ok := svc.Tasks.Get(id) if !ok { writeJSON(w, http.StatusNotFound, map[string]string{"error": "not found"}) return } writeJSON(w, http.StatusOK, t) } } // enqueue creates a task entry with queued status. func enqueueWork(reg *tasks.Registry, kind string, fn tasks.WorkFunc) string { id := randomID() reg.EnqueueWork(id, wrapWork(kind, fn)) return id } func randomID() string { b := make([]byte, 8) if _, err := rand.Read(b); err != nil { return "task-unknown" } return hex.EncodeToString(b) } func wrapWork(kind string, fn tasks.WorkFunc) tasks.WorkFunc { return func(ctx context.Context) (interface{}, error) { if fn == nil { return nil, errors.New("no work function") } res, err := fn(ctx) if err != nil { return nil, err } return map[string]interface{}{"kind": kind, "result": res}, nil } } // authMiddleware enforces a static bearer token. func authMiddleware(token string) func(http.Handler) http.Handler { return func(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { header := r.Header.Get("Authorization") if !strings.HasPrefix(header, "Bearer ") { writeJSON(w, http.StatusUnauthorized, map[string]string{"error": "missing bearer token"}) return } supplied := strings.TrimPrefix(header, "Bearer ") if supplied != token { writeJSON(w, http.StatusForbidden, map[string]string{"error": "invalid token"}) return } ctx := context.WithValue(r.Context(), "auth", "ok") next.ServeHTTP(w, r.WithContext(ctx)) }) } } func validateVM(spec libvirt.VMSpec, cfg config.Config) error { if spec.CPU <= 0 { return errors.New("cpu must be > 0") } if spec.MemoryMB <= 0 { return errors.New("memory_mb must be > 0") } // storage pools validated elsewhere return nil }