477 lines
14 KiB
Go
477 lines
14 KiB
Go
package api
|
|
|
|
import (
	"context"
	"crypto/rand"
	"crypto/subtle"
	"encoding/hex"
	"encoding/json"
	"errors"
	"net/http"
	"strings"

	"github.com/go-chi/chi/v5"

	"jagacloud/node-agent/pkg/compute/libvirt"
	"jagacloud/node-agent/pkg/config"
	"jagacloud/node-agent/pkg/containers/lxc"
	"jagacloud/node-agent/pkg/containers/podman"
	"jagacloud/node-agent/pkg/tasks"
	"jagacloud/node-agent/pkg/validators"
)
|
|
|
|
// writeJSON sends v to the client as a JSON document with the given HTTP
// status code and an "application/json" content type.
func writeJSON(w http.ResponseWriter, status int, v interface{}) {
	hdr := w.Header()
	hdr.Set("Content-Type", "application/json")
	w.WriteHeader(status)

	enc := json.NewEncoder(w)
	// The status has already been written at this point, so an encoding
	// failure is deliberately dropped rather than reported.
	_ = enc.Encode(v)
}
|
|
|
|
// Node info
|
|
func handleNodeInfo(cfg config.Config, svc Services) http.HandlerFunc {
|
|
return func(w http.ResponseWriter, r *http.Request) {
|
|
// TODO: collect real stats (CPU, mem, storage, bridges) using cfg.
|
|
writeJSON(w, http.StatusOK, map[string]interface{}{
|
|
"hostname": "TODO",
|
|
"version": "0.1.0",
|
|
})
|
|
}
|
|
}
|
|
|
|
// VM handlers
|
|
func handleListVMs(cfg config.Config, svc Services) http.HandlerFunc {
|
|
return func(w http.ResponseWriter, r *http.Request) {
|
|
stored, _ := svc.Store.ListVMs()
|
|
runtime, _ := svc.Libvirt.ListVMs()
|
|
// Merge status from runtime into stored specs
|
|
statusMap := map[string]string{}
|
|
for _, vm := range runtime {
|
|
statusMap[vm.ID] = vm.Status
|
|
}
|
|
out := []interface{}{}
|
|
for _, spec := range stored {
|
|
status := statusMap[spec.ID]
|
|
out = append(out, map[string]interface{}{
|
|
"id": spec.ID,
|
|
"name": spec.Name,
|
|
"status": status,
|
|
"cpus": spec.CPU,
|
|
"memory_mb": spec.MemoryMB,
|
|
})
|
|
}
|
|
writeJSON(w, http.StatusOK, out)
|
|
}
|
|
}
|
|
|
|
func handleCreateVM(cfg config.Config, svc Services) http.HandlerFunc {
|
|
return func(w http.ResponseWriter, r *http.Request) {
|
|
var spec libvirt.VMSpec
|
|
if err := json.NewDecoder(r.Body).Decode(&spec); err != nil {
|
|
writeJSON(w, http.StatusBadRequest, map[string]string{"error": err.Error()})
|
|
return
|
|
}
|
|
if spec.ID == "" && spec.Name != "" {
|
|
spec.ID = spec.Name
|
|
}
|
|
if spec.ID == "" {
|
|
writeJSON(w, http.StatusBadRequest, map[string]string{"error": "id or name required"})
|
|
return
|
|
}
|
|
if err := validators.CheckBridgeSet(spec.NICs); err != nil {
|
|
writeJSON(w, http.StatusBadRequest, map[string]string{"error": err.Error()})
|
|
return
|
|
}
|
|
if err := validateVM(spec, cfg); err != nil {
|
|
writeJSON(w, http.StatusBadRequest, map[string]string{"error": err.Error()})
|
|
return
|
|
}
|
|
if err := validators.CheckStoragePoolsVM(spec.Disks, cfg); err != nil {
|
|
writeJSON(w, http.StatusBadRequest, map[string]string{"error": err.Error()})
|
|
return
|
|
}
|
|
if err := svc.Store.SaveVM(spec); err != nil {
|
|
writeJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()})
|
|
return
|
|
}
|
|
taskID := enqueueWork(svc.Tasks, "vm.create", func(ctx context.Context) (interface{}, error) {
|
|
unlock := svc.StoreLock(spec.ID)
|
|
defer unlock()
|
|
return svc.Libvirt.CreateVM(spec)
|
|
})
|
|
writeJSON(w, http.StatusAccepted, map[string]string{"status": "queued", "task": taskID})
|
|
}
|
|
}
|
|
|
|
func handleGetVM(cfg config.Config, svc Services) http.HandlerFunc {
|
|
return func(w http.ResponseWriter, r *http.Request) {
|
|
id := chi.URLParam(r, "id")
|
|
spec, err := svc.Store.LoadVM(id)
|
|
if err == nil {
|
|
writeJSON(w, http.StatusOK, spec)
|
|
return
|
|
}
|
|
vms, err := svc.Libvirt.ListVMs()
|
|
if err != nil {
|
|
writeJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()})
|
|
return
|
|
}
|
|
for _, vm := range vms {
|
|
if vm.ID == id || vm.Name == id {
|
|
writeJSON(w, http.StatusOK, vm)
|
|
return
|
|
}
|
|
}
|
|
writeJSON(w, http.StatusNotFound, map[string]string{"error": "not found"})
|
|
}
|
|
}
|
|
|
|
func handleStartVM(cfg config.Config, svc Services) http.HandlerFunc {
|
|
return lifecycleVM(cfg, svc, "start")
|
|
}
|
|
func handleStopVM(cfg config.Config, svc Services) http.HandlerFunc {
|
|
return lifecycleVM(cfg, svc, "stop")
|
|
}
|
|
func handleRebootVM(cfg config.Config, svc Services) http.HandlerFunc {
|
|
return lifecycleVM(cfg, svc, "reboot")
|
|
}
|
|
func handleDeleteVM(cfg config.Config, svc Services) http.HandlerFunc {
|
|
return lifecycleVM(cfg, svc, "delete")
|
|
}
|
|
|
|
func lifecycleVM(cfg config.Config, svc Services, action string) http.HandlerFunc {
|
|
return func(w http.ResponseWriter, r *http.Request) {
|
|
id := chi.URLParam(r, "id")
|
|
// Ensure VM exists either in store or runtime
|
|
spec, specErr := svc.Store.LoadVM(id)
|
|
runtimeExists := false
|
|
if specErr != nil {
|
|
if vms, err := svc.Libvirt.ListVMs(); err == nil {
|
|
for _, vm := range vms {
|
|
if vm.ID == id || vm.Name == id {
|
|
runtimeExists = true
|
|
break
|
|
}
|
|
}
|
|
}
|
|
}
|
|
if specErr != nil && !runtimeExists {
|
|
writeJSON(w, http.StatusNotFound, map[string]string{"error": "vm not found"})
|
|
return
|
|
}
|
|
if specErr == nil {
|
|
if err := validators.CheckStoragePoolsVM(spec.Disks, cfg); err != nil {
|
|
writeJSON(w, http.StatusBadRequest, map[string]string{"error": err.Error()})
|
|
return
|
|
}
|
|
}
|
|
taskID := enqueueWork(svc.Tasks, "vm."+action, func(ctx context.Context) (interface{}, error) {
|
|
unlock := svc.StoreLock(id)
|
|
defer unlock()
|
|
switch action {
|
|
case "start":
|
|
return nil, svc.Libvirt.StartVM(id)
|
|
case "stop":
|
|
return nil, svc.Libvirt.StopVM(id)
|
|
case "reboot":
|
|
return nil, svc.Libvirt.RebootVM(id)
|
|
case "delete":
|
|
if err := svc.Libvirt.DeleteVM(id); err != nil {
|
|
return nil, err
|
|
}
|
|
_ = svc.Store.DeleteVM(id)
|
|
return nil, nil
|
|
default:
|
|
return nil, errors.New("unsupported action")
|
|
}
|
|
})
|
|
writeJSON(w, http.StatusAccepted, map[string]string{"id": id, "action": action, "status": "queued", "task": taskID})
|
|
}
|
|
}
|
|
|
|
// Container (LXC) handlers
|
|
func handleListCT(cfg config.Config, svc Services) http.HandlerFunc {
|
|
return func(w http.ResponseWriter, r *http.Request) {
|
|
stored, _ := svc.Store.ListCTs()
|
|
runtime, _ := svc.LXC.List()
|
|
statusMap := map[string]string{}
|
|
for _, ct := range runtime {
|
|
statusMap[ct.ID] = ct.Status
|
|
}
|
|
out := []interface{}{}
|
|
for _, spec := range stored {
|
|
status := statusMap[spec.ID]
|
|
out = append(out, map[string]interface{}{
|
|
"id": spec.ID,
|
|
"name": spec.Name,
|
|
"status": status,
|
|
"limits": spec.Limits,
|
|
"nics": spec.NICs,
|
|
"template": spec.Template,
|
|
})
|
|
}
|
|
writeJSON(w, http.StatusOK, out)
|
|
}
|
|
}
|
|
|
|
func handleCreateCT(cfg config.Config, svc Services) http.HandlerFunc {
|
|
return func(w http.ResponseWriter, r *http.Request) {
|
|
var spec lxc.Spec
|
|
if err := json.NewDecoder(r.Body).Decode(&spec); err != nil {
|
|
writeJSON(w, http.StatusBadRequest, map[string]string{"error": err.Error()})
|
|
return
|
|
}
|
|
if spec.ID == "" && spec.Name != "" {
|
|
spec.ID = spec.Name
|
|
}
|
|
if spec.ID == "" {
|
|
writeJSON(w, http.StatusBadRequest, map[string]string{"error": "id or name required"})
|
|
return
|
|
}
|
|
if err := validators.CheckBridgeSetCT(spec.NICs); err != nil {
|
|
writeJSON(w, http.StatusBadRequest, map[string]string{"error": err.Error()})
|
|
return
|
|
}
|
|
if err := validators.CheckStoragePoolsCT(spec, cfg); err != nil {
|
|
writeJSON(w, http.StatusBadRequest, map[string]string{"error": err.Error()})
|
|
return
|
|
}
|
|
if err := svc.Store.SaveCT(spec); err != nil {
|
|
writeJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()})
|
|
return
|
|
}
|
|
taskID := enqueueWork(svc.Tasks, "ct.create", func(ctx context.Context) (interface{}, error) {
|
|
unlock := svc.StoreLock(spec.ID)
|
|
defer unlock()
|
|
return svc.LXC.Create(spec)
|
|
})
|
|
writeJSON(w, http.StatusAccepted, map[string]string{"status": "queued", "task": taskID})
|
|
}
|
|
}
|
|
|
|
func handleGetCT(cfg config.Config, svc Services) http.HandlerFunc {
|
|
return func(w http.ResponseWriter, r *http.Request) {
|
|
id := chi.URLParam(r, "id")
|
|
if spec, err := svc.Store.LoadCT(id); err == nil {
|
|
writeJSON(w, http.StatusOK, spec)
|
|
return
|
|
}
|
|
cts, err := svc.LXC.List()
|
|
if err != nil {
|
|
writeJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()})
|
|
return
|
|
}
|
|
for _, ct := range cts {
|
|
if ct.ID == id || ct.Name == id {
|
|
writeJSON(w, http.StatusOK, ct)
|
|
return
|
|
}
|
|
}
|
|
writeJSON(w, http.StatusNotFound, map[string]string{"error": "not found"})
|
|
}
|
|
}
|
|
|
|
func handleStartCT(cfg config.Config, svc Services) http.HandlerFunc {
|
|
return lifecycleCT(cfg, svc, "start")
|
|
}
|
|
func handleStopCT(cfg config.Config, svc Services) http.HandlerFunc {
|
|
return lifecycleCT(cfg, svc, "stop")
|
|
}
|
|
func handleDeleteCT(cfg config.Config, svc Services) http.HandlerFunc {
|
|
return lifecycleCT(cfg, svc, "delete")
|
|
}
|
|
|
|
func lifecycleCT(cfg config.Config, svc Services, action string) http.HandlerFunc {
|
|
return func(w http.ResponseWriter, r *http.Request) {
|
|
id := chi.URLParam(r, "id")
|
|
spec, specErr := svc.Store.LoadCT(id)
|
|
runtimeExists := false
|
|
if specErr != nil {
|
|
if cts, err := svc.LXC.List(); err == nil {
|
|
for _, ct := range cts {
|
|
if ct.ID == id || ct.Name == id {
|
|
runtimeExists = true
|
|
break
|
|
}
|
|
}
|
|
}
|
|
}
|
|
if specErr != nil && !runtimeExists {
|
|
writeJSON(w, http.StatusNotFound, map[string]string{"error": "ct not found"})
|
|
return
|
|
}
|
|
if specErr == nil {
|
|
if err := validators.CheckStoragePoolsCT(spec, cfg); err != nil {
|
|
writeJSON(w, http.StatusBadRequest, map[string]string{"error": err.Error()})
|
|
return
|
|
}
|
|
}
|
|
taskID := enqueueWork(svc.Tasks, "ct."+action, func(ctx context.Context) (interface{}, error) {
|
|
unlock := svc.StoreLock(id)
|
|
defer unlock()
|
|
switch action {
|
|
case "start":
|
|
return nil, svc.LXC.Start(id)
|
|
case "stop":
|
|
return nil, svc.LXC.Stop(id)
|
|
case "delete":
|
|
if err := svc.LXC.Delete(id); err != nil {
|
|
return nil, err
|
|
}
|
|
_ = svc.Store.DeleteCT(id)
|
|
return nil, nil
|
|
default:
|
|
return nil, errors.New("unsupported action")
|
|
}
|
|
})
|
|
writeJSON(w, http.StatusAccepted, map[string]string{"id": id, "action": action, "status": "queued", "task": taskID})
|
|
}
|
|
}
|
|
|
|
// OCI-in-LXC handlers
|
|
func handleListOCI(cfg config.Config, svc Services) http.HandlerFunc {
|
|
return func(w http.ResponseWriter, r *http.Request) {
|
|
id := chi.URLParam(r, "id")
|
|
items, err := svc.Podman.List(id)
|
|
if err != nil {
|
|
writeJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()})
|
|
return
|
|
}
|
|
writeJSON(w, http.StatusOK, items)
|
|
}
|
|
}
|
|
|
|
func handleCreateOCI(cfg config.Config, svc Services) http.HandlerFunc {
|
|
return func(w http.ResponseWriter, r *http.Request) {
|
|
ctID := chi.URLParam(r, "id")
|
|
var spec podman.CreateSpec
|
|
if err := json.NewDecoder(r.Body).Decode(&spec); err != nil {
|
|
writeJSON(w, http.StatusBadRequest, map[string]string{"error": err.Error()})
|
|
return
|
|
}
|
|
taskID := enqueueWork(svc.Tasks, "oci.create", func(ctx context.Context) (interface{}, error) {
|
|
return svc.Podman.Create(ctID, spec)
|
|
})
|
|
writeJSON(w, http.StatusAccepted, map[string]string{"status": "queued", "task": taskID})
|
|
}
|
|
}
|
|
|
|
func handleGetOCI(cfg config.Config, svc Services) http.HandlerFunc {
|
|
return func(w http.ResponseWriter, r *http.Request) {
|
|
cid := chi.URLParam(r, "cid")
|
|
ctID := chi.URLParam(r, "id")
|
|
items, err := svc.Podman.List(ctID)
|
|
if err != nil {
|
|
writeJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()})
|
|
return
|
|
}
|
|
for _, item := range items {
|
|
if item.ID == cid {
|
|
writeJSON(w, http.StatusOK, item)
|
|
return
|
|
}
|
|
}
|
|
writeJSON(w, http.StatusNotFound, map[string]string{"error": "not found"})
|
|
}
|
|
}
|
|
|
|
func handleStartOCI(cfg config.Config, svc Services) http.HandlerFunc {
|
|
return lifecycleOCI(cfg, svc, "start")
|
|
}
|
|
func handleStopOCI(cfg config.Config, svc Services) http.HandlerFunc {
|
|
return lifecycleOCI(cfg, svc, "stop")
|
|
}
|
|
func handleDeleteOCI(cfg config.Config, svc Services) http.HandlerFunc {
|
|
return lifecycleOCI(cfg, svc, "delete")
|
|
}
|
|
|
|
func lifecycleOCI(cfg config.Config, svc Services, action string) http.HandlerFunc {
|
|
return func(w http.ResponseWriter, r *http.Request) {
|
|
cid := chi.URLParam(r, "cid")
|
|
ctID := chi.URLParam(r, "id")
|
|
taskID := enqueueWork(svc.Tasks, "oci."+action, func(ctx context.Context) (interface{}, error) {
|
|
switch action {
|
|
case "start":
|
|
return nil, svc.Podman.Start(ctID, cid)
|
|
case "stop":
|
|
return nil, svc.Podman.Stop(ctID, cid)
|
|
case "delete":
|
|
return nil, svc.Podman.Delete(ctID, cid)
|
|
default:
|
|
return nil, errors.New("unsupported action")
|
|
}
|
|
})
|
|
writeJSON(w, http.StatusAccepted, map[string]string{"id": cid, "action": action, "status": "queued", "task": taskID})
|
|
}
|
|
}
|
|
|
|
// Task handlers
|
|
func handleGetTask(cfg config.Config, svc Services) http.HandlerFunc {
|
|
return func(w http.ResponseWriter, r *http.Request) {
|
|
id := chi.URLParam(r, "id")
|
|
t, ok := svc.Tasks.Get(id)
|
|
if !ok {
|
|
writeJSON(w, http.StatusNotFound, map[string]string{"error": "not found"})
|
|
return
|
|
}
|
|
writeJSON(w, http.StatusOK, t)
|
|
}
|
|
}
|
|
|
|
// enqueue creates a task entry with queued status.
|
|
func enqueueWork(reg *tasks.Registry, kind string, fn tasks.WorkFunc) string {
|
|
id := randomID()
|
|
reg.EnqueueWork(id, wrapWork(kind, fn))
|
|
return id
|
|
}
|
|
|
|
// randomID returns 8 random bytes from crypto/rand encoded as a 16-character
// lowercase hex string. If the random source fails, the fixed string
// "task-unknown" is returned instead.
func randomID() string {
	var raw [8]byte
	_, err := rand.Read(raw[:])
	if err != nil {
		return "task-unknown"
	}
	return hex.EncodeToString(raw[:])
}
|
|
|
|
func wrapWork(kind string, fn tasks.WorkFunc) tasks.WorkFunc {
|
|
return func(ctx context.Context) (interface{}, error) {
|
|
if fn == nil {
|
|
return nil, errors.New("no work function")
|
|
}
|
|
res, err := fn(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return map[string]interface{}{"kind": kind, "result": res}, nil
|
|
}
|
|
}
|
|
|
|
// authMiddleware enforces a static bearer token.
|
|
func authMiddleware(token string) func(http.Handler) http.Handler {
|
|
return func(next http.Handler) http.Handler {
|
|
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
header := r.Header.Get("Authorization")
|
|
if !strings.HasPrefix(header, "Bearer ") {
|
|
writeJSON(w, http.StatusUnauthorized, map[string]string{"error": "missing bearer token"})
|
|
return
|
|
}
|
|
supplied := strings.TrimPrefix(header, "Bearer ")
|
|
if supplied != token {
|
|
writeJSON(w, http.StatusForbidden, map[string]string{"error": "invalid token"})
|
|
return
|
|
}
|
|
ctx := context.WithValue(r.Context(), "auth", "ok")
|
|
next.ServeHTTP(w, r.WithContext(ctx))
|
|
})
|
|
}
|
|
}
|
|
|
|
func validateVM(spec libvirt.VMSpec, cfg config.Config) error {
|
|
if spec.CPU <= 0 {
|
|
return errors.New("cpu must be > 0")
|
|
}
|
|
if spec.MemoryMB <= 0 {
|
|
return errors.New("memory_mb must be > 0")
|
|
}
|
|
// storage pools validated elsewhere
|
|
return nil
|
|
}
|