Files
jagacloud/pkg/api/handlers.go
2025-11-24 15:20:05 +07:00

511 lines
15 KiB
Go

package api
import (
"context"
"crypto/rand"
"encoding/hex"
"encoding/json"
"errors"
"net/http"
"strings"
"github.com/go-chi/chi/v5"
"jagacloud/node-agent/pkg/compute/libvirt"
"jagacloud/node-agent/pkg/config"
"jagacloud/node-agent/pkg/containers/lxc"
"jagacloud/node-agent/pkg/containers/podman"
"jagacloud/node-agent/pkg/storage"
"jagacloud/node-agent/pkg/tasks"
"jagacloud/node-agent/pkg/validators"
)
// Utility to write JSON.
func writeJSON(w http.ResponseWriter, status int, v interface{}) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(status)
_ = json.NewEncoder(w).Encode(v)
}
// Node info
func handleNodeInfo(cfg config.Config, svc Services) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
// TODO: collect real stats (CPU, mem, storage, bridges) using cfg.
writeJSON(w, http.StatusOK, map[string]interface{}{
"hostname": "TODO",
"version": "0.1.0",
})
}
}
// VM handlers
func handleListVMs(cfg config.Config, svc Services) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
stored, _ := svc.Store.ListVMs()
runtime, _ := svc.Libvirt.ListVMs()
// Merge status from runtime into stored specs
statusMap := map[string]string{}
for _, vm := range runtime {
statusMap[vm.ID] = vm.Status
}
out := []interface{}{}
for _, spec := range stored {
status := statusMap[spec.ID]
out = append(out, map[string]interface{}{
"id": spec.ID,
"name": spec.Name,
"status": status,
"cpus": spec.CPU,
"memory_mb": spec.MemoryMB,
})
}
writeJSON(w, http.StatusOK, out)
}
}
func handleCreateVM(cfg config.Config, svc Services) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
var spec libvirt.VMSpec
if err := json.NewDecoder(r.Body).Decode(&spec); err != nil {
writeJSON(w, http.StatusBadRequest, map[string]string{"error": err.Error()})
return
}
if spec.ID == "" && spec.Name != "" {
spec.ID = spec.Name
}
if spec.ID == "" {
writeJSON(w, http.StatusBadRequest, map[string]string{"error": "id or name required"})
return
}
if err := validators.CheckBridgeSet(spec.NICs); err != nil {
writeJSON(w, http.StatusBadRequest, map[string]string{"error": err.Error()})
return
}
if err := validateVM(spec, cfg); err != nil {
writeJSON(w, http.StatusBadRequest, map[string]string{"error": err.Error()})
return
}
if err := validators.CheckStoragePoolsVM(spec.Disks, cfg); err != nil {
writeJSON(w, http.StatusBadRequest, map[string]string{"error": err.Error()})
return
}
// Resolve disk paths for dir pools
for i := range spec.Disks {
if spec.Disks[i].Path == "" && spec.Disks[i].Pool != "" {
if path, err := storage.ResolveVolume(toPoolConfigs(cfg.StoragePools), spec.Disks[i].Pool, spec.Disks[i].Name+".qcow2"); err == nil {
spec.Disks[i].Path = path
}
}
}
if err := svc.Store.SaveVM(spec); err != nil {
writeJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()})
return
}
taskID := enqueueWork(svc.Tasks, "vm.create", func(ctx context.Context) (interface{}, error) {
unlock := svc.StoreLock(spec.ID)
defer unlock()
return svc.Libvirt.CreateVM(spec)
})
writeJSON(w, http.StatusAccepted, map[string]string{"status": "queued", "task": taskID})
}
}
func handleGetVM(cfg config.Config, svc Services) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
id := chi.URLParam(r, "id")
spec, err := svc.Store.LoadVM(id)
if err == nil {
writeJSON(w, http.StatusOK, spec)
return
}
vms, err := svc.Libvirt.ListVMs()
if err != nil {
writeJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()})
return
}
for _, vm := range vms {
if vm.ID == id || vm.Name == id {
writeJSON(w, http.StatusOK, vm)
return
}
}
writeJSON(w, http.StatusNotFound, map[string]string{"error": "not found"})
}
}
func handleStartVM(cfg config.Config, svc Services) http.HandlerFunc {
return lifecycleVM(cfg, svc, "start")
}
func handleStopVM(cfg config.Config, svc Services) http.HandlerFunc {
return lifecycleVM(cfg, svc, "stop")
}
func handleRebootVM(cfg config.Config, svc Services) http.HandlerFunc {
return lifecycleVM(cfg, svc, "reboot")
}
func handleDeleteVM(cfg config.Config, svc Services) http.HandlerFunc {
return lifecycleVM(cfg, svc, "delete")
}
func lifecycleVM(cfg config.Config, svc Services, action string) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
id := chi.URLParam(r, "id")
// Ensure VM exists either in store or runtime
spec, specErr := svc.Store.LoadVM(id)
runtimeExists := false
if specErr != nil {
if vms, err := svc.Libvirt.ListVMs(); err == nil {
for _, vm := range vms {
if vm.ID == id || vm.Name == id {
runtimeExists = true
break
}
}
}
}
if specErr != nil && !runtimeExists {
writeJSON(w, http.StatusNotFound, map[string]string{"error": "vm not found"})
return
}
if specErr == nil {
if err := validators.CheckStoragePoolsVM(spec.Disks, cfg); err != nil {
writeJSON(w, http.StatusBadRequest, map[string]string{"error": err.Error()})
return
}
} else {
// runtime-only VM: attempt pool validation via virsh and config
if err := validators.CheckStoragePoolsRuntime([]string{}, cfg); err != nil {
writeJSON(w, http.StatusBadRequest, map[string]string{"error": err.Error()})
return
}
}
taskID := enqueueWork(svc.Tasks, "vm."+action, func(ctx context.Context) (interface{}, error) {
unlock := svc.StoreLock(id)
defer unlock()
switch action {
case "start":
return nil, svc.Libvirt.StartVM(id)
case "stop":
return nil, svc.Libvirt.StopVM(id)
case "reboot":
return nil, svc.Libvirt.RebootVM(id)
case "delete":
if err := svc.Libvirt.DeleteVM(id); err != nil {
return nil, err
}
_ = svc.Store.DeleteVM(id)
return nil, nil
default:
return nil, errors.New("unsupported action")
}
})
writeJSON(w, http.StatusAccepted, map[string]string{"id": id, "action": action, "status": "queued", "task": taskID})
}
}
// Container (LXC) handlers
func handleListCT(cfg config.Config, svc Services) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
stored, _ := svc.Store.ListCTs()
runtime, _ := svc.LXC.List()
statusMap := map[string]string{}
for _, ct := range runtime {
statusMap[ct.ID] = ct.Status
}
out := []interface{}{}
for _, spec := range stored {
status := statusMap[spec.ID]
out = append(out, map[string]interface{}{
"id": spec.ID,
"name": spec.Name,
"status": status,
"limits": spec.Limits,
"nics": spec.NICs,
"template": spec.Template,
})
}
writeJSON(w, http.StatusOK, out)
}
}
func handleCreateCT(cfg config.Config, svc Services) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
var spec lxc.Spec
if err := json.NewDecoder(r.Body).Decode(&spec); err != nil {
writeJSON(w, http.StatusBadRequest, map[string]string{"error": err.Error()})
return
}
if spec.ID == "" && spec.Name != "" {
spec.ID = spec.Name
}
if spec.ID == "" {
writeJSON(w, http.StatusBadRequest, map[string]string{"error": "id or name required"})
return
}
if err := validators.CheckBridgeSetCT(spec.NICs); err != nil {
writeJSON(w, http.StatusBadRequest, map[string]string{"error": err.Error()})
return
}
if err := validators.CheckStoragePoolsCT(spec, cfg); err != nil {
writeJSON(w, http.StatusBadRequest, map[string]string{"error": err.Error()})
return
}
if spec.RootfsPool != "" && spec.RootfsSizeG > 0 && spec.RootfsPath == "" {
if path, err := storage.ResolveVolume(toPoolConfigs(cfg.StoragePools), spec.RootfsPool, spec.ID+"-rootfs"); err == nil {
spec.RootfsPath = path
}
}
if err := svc.Store.SaveCT(spec); err != nil {
writeJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()})
return
}
taskID := enqueueWork(svc.Tasks, "ct.create", func(ctx context.Context) (interface{}, error) {
unlock := svc.StoreLock(spec.ID)
defer unlock()
return svc.LXC.Create(spec)
})
writeJSON(w, http.StatusAccepted, map[string]string{"status": "queued", "task": taskID})
}
}
func handleGetCT(cfg config.Config, svc Services) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
id := chi.URLParam(r, "id")
if spec, err := svc.Store.LoadCT(id); err == nil {
writeJSON(w, http.StatusOK, spec)
return
}
cts, err := svc.LXC.List()
if err != nil {
writeJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()})
return
}
for _, ct := range cts {
if ct.ID == id || ct.Name == id {
writeJSON(w, http.StatusOK, ct)
return
}
}
writeJSON(w, http.StatusNotFound, map[string]string{"error": "not found"})
}
}
func handleStartCT(cfg config.Config, svc Services) http.HandlerFunc {
return lifecycleCT(cfg, svc, "start")
}
func handleStopCT(cfg config.Config, svc Services) http.HandlerFunc {
return lifecycleCT(cfg, svc, "stop")
}
func handleDeleteCT(cfg config.Config, svc Services) http.HandlerFunc {
return lifecycleCT(cfg, svc, "delete")
}
func lifecycleCT(cfg config.Config, svc Services, action string) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
id := chi.URLParam(r, "id")
spec, specErr := svc.Store.LoadCT(id)
runtimeExists := false
if specErr != nil {
if cts, err := svc.LXC.List(); err == nil {
for _, ct := range cts {
if ct.ID == id || ct.Name == id {
runtimeExists = true
break
}
}
}
}
if specErr != nil && !runtimeExists {
writeJSON(w, http.StatusNotFound, map[string]string{"error": "ct not found"})
return
}
if specErr == nil {
if err := validators.CheckStoragePoolsCT(spec, cfg); err != nil {
writeJSON(w, http.StatusBadRequest, map[string]string{"error": err.Error()})
return
}
}
taskID := enqueueWork(svc.Tasks, "ct."+action, func(ctx context.Context) (interface{}, error) {
unlock := svc.StoreLock(id)
defer unlock()
switch action {
case "start":
return nil, svc.LXC.Start(id)
case "stop":
return nil, svc.LXC.Stop(id)
case "delete":
if err := svc.LXC.Delete(id); err != nil {
return nil, err
}
_ = svc.Store.DeleteCT(id)
return nil, nil
default:
return nil, errors.New("unsupported action")
}
})
writeJSON(w, http.StatusAccepted, map[string]string{"id": id, "action": action, "status": "queued", "task": taskID})
}
}
// OCI-in-LXC handlers
func handleListOCI(cfg config.Config, svc Services) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
id := chi.URLParam(r, "id")
items, err := svc.Podman.List(id)
if err != nil {
writeJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()})
return
}
writeJSON(w, http.StatusOK, items)
}
}
func handleCreateOCI(cfg config.Config, svc Services) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
ctID := chi.URLParam(r, "id")
var spec podman.CreateSpec
if err := json.NewDecoder(r.Body).Decode(&spec); err != nil {
writeJSON(w, http.StatusBadRequest, map[string]string{"error": err.Error()})
return
}
taskID := enqueueWork(svc.Tasks, "oci.create", func(ctx context.Context) (interface{}, error) {
return svc.Podman.Create(ctID, spec)
})
writeJSON(w, http.StatusAccepted, map[string]string{"status": "queued", "task": taskID})
}
}
func handleGetOCI(cfg config.Config, svc Services) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
cid := chi.URLParam(r, "cid")
ctID := chi.URLParam(r, "id")
items, err := svc.Podman.List(ctID)
if err != nil {
writeJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()})
return
}
for _, item := range items {
if item.ID == cid {
writeJSON(w, http.StatusOK, item)
return
}
}
writeJSON(w, http.StatusNotFound, map[string]string{"error": "not found"})
}
}
func handleStartOCI(cfg config.Config, svc Services) http.HandlerFunc {
return lifecycleOCI(cfg, svc, "start")
}
func handleStopOCI(cfg config.Config, svc Services) http.HandlerFunc {
return lifecycleOCI(cfg, svc, "stop")
}
func handleDeleteOCI(cfg config.Config, svc Services) http.HandlerFunc {
return lifecycleOCI(cfg, svc, "delete")
}
func lifecycleOCI(cfg config.Config, svc Services, action string) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
cid := chi.URLParam(r, "cid")
ctID := chi.URLParam(r, "id")
taskID := enqueueWork(svc.Tasks, "oci."+action, func(ctx context.Context) (interface{}, error) {
switch action {
case "start":
return nil, svc.Podman.Start(ctID, cid)
case "stop":
return nil, svc.Podman.Stop(ctID, cid)
case "delete":
return nil, svc.Podman.Delete(ctID, cid)
default:
return nil, errors.New("unsupported action")
}
})
writeJSON(w, http.StatusAccepted, map[string]string{"id": cid, "action": action, "status": "queued", "task": taskID})
}
}
// Task handlers
func handleGetTask(cfg config.Config, svc Services) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
id := chi.URLParam(r, "id")
t, ok := svc.Tasks.Get(id)
if !ok {
writeJSON(w, http.StatusNotFound, map[string]string{"error": "not found"})
return
}
writeJSON(w, http.StatusOK, t)
}
}
// enqueue creates a task entry with queued status.
func enqueueWork(reg *tasks.Registry, kind string, fn tasks.WorkFunc) string {
id := randomID()
reg.EnqueueWork(id, wrapWork(kind, fn))
return id
}
func randomID() string {
b := make([]byte, 8)
if _, err := rand.Read(b); err != nil {
return "task-unknown"
}
return hex.EncodeToString(b)
}
func wrapWork(kind string, fn tasks.WorkFunc) tasks.WorkFunc {
return func(ctx context.Context) (interface{}, error) {
if fn == nil {
return nil, errors.New("no work function")
}
res, err := fn(ctx)
if err != nil {
return nil, err
}
return map[string]interface{}{"kind": kind, "result": res}, nil
}
}
// authMiddleware enforces a static bearer token.
func authMiddleware(token string) func(http.Handler) http.Handler {
return func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
header := r.Header.Get("Authorization")
if !strings.HasPrefix(header, "Bearer ") {
writeJSON(w, http.StatusUnauthorized, map[string]string{"error": "missing bearer token"})
return
}
supplied := strings.TrimPrefix(header, "Bearer ")
if supplied != token {
writeJSON(w, http.StatusForbidden, map[string]string{"error": "invalid token"})
return
}
ctx := context.WithValue(r.Context(), "auth", "ok")
next.ServeHTTP(w, r.WithContext(ctx))
})
}
}
func validateVM(spec libvirt.VMSpec, cfg config.Config) error {
if spec.CPU <= 0 {
return errors.New("cpu must be > 0")
}
if spec.MemoryMB <= 0 {
return errors.New("memory_mb must be > 0")
}
// storage pools validated elsewhere
return nil
}
// toPoolConfigs converts config pools to storage pool configs.
func toPoolConfigs(p []config.StoragePool) []storage.PoolConfig {
out := make([]storage.PoolConfig, 0, len(p))
for _, sp := range p {
out = append(out, storage.PoolConfig{
Name: sp.Name,
Type: sp.Type,
Path: sp.Path,
VG: sp.Path, // reuse Path for now; real config should split
})
}
return out
}