Add filesystem access via cluster node

This commit is contained in:
Ingo Oppermann 2024-03-13 15:11:20 +01:00
parent 5ba18569e9
commit 727c358921
No known key found for this signature in database
GPG Key ID: 2AB32426E9DD229E
5 changed files with 203 additions and 12 deletions

View File

@ -43,8 +43,10 @@ type NodeReader interface {
Version() NodeVersion
Resources() NodeResources
FileList(storage, pattern string) ([]clientapi.FileInfo, error)
ProxyFileList() NodeFiles
ListFiles(storage, pattern string) ([]clientapi.FileInfo, error)
DeleteFile(storage, path string) error
PutFile(storage, path string, data io.Reader) error
ListResources() NodeFiles
GetURL(prefix, path string) (*url.URL, error)
GetFile(prefix, path string, offset int64) (io.ReadCloser, error)
@ -583,7 +585,7 @@ func (n *node) Version() NodeVersion {
return version
}
func (n *node) ProxyFileList() NodeFiles {
func (n *node) ListResources() NodeFiles {
id := n.About().ID
files := NodeFiles{
@ -730,7 +732,7 @@ func (n *node) files() []string {
return filesList
}
func (n *node) FileList(storage, pattern string) ([]clientapi.FileInfo, error) {
func (n *node) ListFiles(storage, pattern string) ([]clientapi.FileInfo, error) {
n.peerLock.RLock()
defer n.peerLock.RUnlock()
@ -750,6 +752,28 @@ func (n *node) FileList(storage, pattern string) ([]clientapi.FileInfo, error) {
return files, nil
}
func (n *node) PutFile(storage, path string, data io.Reader) error {
n.peerLock.RLock()
defer n.peerLock.RUnlock()
if n.peer == nil {
return ErrNoPeer
}
return n.peer.FilesystemAddFile(storage, path, data)
}
func (n *node) DeleteFile(storage, path string) error {
n.peerLock.RLock()
defer n.peerLock.RUnlock()
if n.peer == nil {
return ErrNoPeer
}
return n.peer.FilesystemDeleteFile(storage, path)
}
func cloneURL(src *url.URL) *url.URL {
dst := &url.URL{
Scheme: src.Scheme,

View File

@ -372,7 +372,7 @@ func (p *proxy) ListFiles(storage, pattern string) []clientapi.FileInfo {
go func(node Node, p chan<- []clientapi.FileInfo) {
defer wg.Done()
files, err := node.FileList(storage, pattern)
files, err := node.ListFiles(storage, pattern)
if err != nil {
return
}

View File

@ -95,9 +95,9 @@ func (h *ClusterHandler) GetNodeVersion(c echo.Context) error {
return c.JSON(http.StatusOK, version)
}
// GetNodeFiles returns the files from the proxy node with the given ID
// @Summary List the files of a proxy node by its ID
// @Description List the files of a proxy node by its ID
// GetNodeResources returns the resources from the proxy node with the given ID
// @Summary List the resources of a proxy node by its ID
// @Description List the resources of a proxy node by its ID
// @Tags v16.?.?
// @ID cluster-3-get-node-files
// @Produce json
@ -106,7 +106,7 @@ func (h *ClusterHandler) GetNodeVersion(c echo.Context) error {
// @Failure 404 {object} api.Error
// @Security ApiKeyAuth
// @Router /api/v3/cluster/node/{id}/files [get]
func (h *ClusterHandler) GetNodeFiles(c echo.Context) error {
func (h *ClusterHandler) GetNodeResources(c echo.Context) error {
id := util.PathParam(c, "id")
peer, err := h.proxy.GetNodeReader(id)
@ -118,7 +118,7 @@ func (h *ClusterHandler) GetNodeFiles(c echo.Context) error {
Files: make(map[string][]string),
}
peerFiles := peer.ProxyFileList()
peerFiles := peer.ListResources()
files.LastUpdate = peerFiles.LastUpdate.Unix()
@ -136,6 +136,168 @@ func (h *ClusterHandler) GetNodeFiles(c echo.Context) error {
return c.JSON(http.StatusOK, files)
}
// NodeFSListFiles lists all files on a filesystem on a node
// @Summary List all files on a filesystem on a node
// @Description List all files on a filesystem on a node. The listing can be ordered by name, size, or date of last modification in ascending or descending order.
// @Tags v16.?.?
// @ID cluster-3-node-fs-list-files
// @Produce json
// @Param id path string true "Node ID"
// @Param storage path string true "Name of the filesystem"
// @Param glob query string false "glob pattern for file names"
// @Param sort query string false "none, name, size, lastmod"
// @Param order query string false "asc, desc"
// @Success 200 {array} api.FileInfo
// @Success 500 {object} api.Error
// @Security ApiKeyAuth
// @Router /api/v3/cluster/node/{id}/fs/{storage} [get]
func (h *ClusterHandler) NodeFSListFiles(c echo.Context) error {
id := util.PathParam(c, "id")
name := util.PathParam(c, "storage")
pattern := util.DefaultQuery(c, "glob", "")
sortby := util.DefaultQuery(c, "sort", "none")
order := util.DefaultQuery(c, "order", "asc")
peer, err := h.proxy.GetNodeReader(id)
if err != nil {
return api.Err(http.StatusNotFound, "", "node not found: %s", err.Error())
}
files, err := peer.ListFiles(name, pattern)
if err != nil {
return api.Err(http.StatusInternalServerError, "", "retrieving file list: %s", err.Error())
}
var sortFunc func(i, j int) bool
switch sortby {
case "name":
if order == "desc" {
sortFunc = func(i, j int) bool { return files[i].Name > files[j].Name }
} else {
sortFunc = func(i, j int) bool { return files[i].Name < files[j].Name }
}
case "size":
if order == "desc" {
sortFunc = func(i, j int) bool { return files[i].Size > files[j].Size }
} else {
sortFunc = func(i, j int) bool { return files[i].Size < files[j].Size }
}
default:
if order == "asc" {
sortFunc = func(i, j int) bool { return files[i].LastMod < files[j].LastMod }
} else {
sortFunc = func(i, j int) bool { return files[i].LastMod > files[j].LastMod }
}
}
sort.Slice(files, sortFunc)
return c.JSON(http.StatusOK, files)
}
// NodeFSGetFile returns the file at the given path on a node
// @Summary Fetch a file from a filesystem on a node
// @Description Fetch a file from a filesystem on a node
// @Tags v16.?.?
// @ID cluster-3-node-fs-get-file
// @Produce application/data
// @Produce json
// @Param id path string true "Node ID"
// @Param storage path string true "Name of the filesystem"
// @Param filepath path string true "Path to file"
// @Success 200 {file} byte
// @Success 301 {string} string
// @Failure 404 {object} api.Error
// @Security ApiKeyAuth
// @Router /api/v3/cluster/node/{id}/fs/{storage}/{filepath} [get]
func (h *ClusterHandler) NodeFSGetFile(c echo.Context) error {
id := util.PathParam(c, "id")
storage := util.PathParam(c, "storage")
path := util.PathWildcardParam(c)
peer, err := h.proxy.GetNodeReader(id)
if err != nil {
return api.Err(http.StatusNotFound, "", "node not found: %s", err.Error())
}
file, err := peer.GetFile(storage, path, 0)
if err != nil {
return api.Err(http.StatusNotFound, "", "%s", err.Error())
}
defer file.Close()
return c.Stream(http.StatusOK, "application/data", file)
}
// NodeFSPutFile adds or overwrites a file at the given path on a node
// @Summary Add a file to a filesystem on a node
// @Description Writes or overwrites a file on a filesystem on a node
// @Tags v16.?.?
// @ID cluster-3-node-fs-put-file
// @Accept application/data
// @Produce text/plain
// @Produce json
// @Param id path string true "Node ID"
// @Param storage path string true "Name of the filesystem"
// @Param filepath path string true "Path to file"
// @Param data body []byte true "File data"
// @Success 201 {string} string
// @Failure 400 {object} api.Error
// @Security ApiKeyAuth
// @Router /api/v3/cluster/node/{id}/fs/{storage}/{filepath} [put]
func (h *ClusterHandler) NodeFSPutFile(c echo.Context) error {
id := util.PathParam(c, "id")
storage := util.PathParam(c, "storage")
path := util.PathWildcardParam(c)
peer, err := h.proxy.GetNodeReader(id)
if err != nil {
return api.Err(http.StatusNotFound, "", "node not found: %s", err.Error())
}
req := c.Request()
err = peer.PutFile(storage, path, req.Body)
if err != nil {
return api.Err(http.StatusBadRequest, "", "%s", err.Error())
}
return c.JSON(http.StatusCreated, nil)
}
// NodeFSDeleteFile removes a file from a filesystem on a node
// @Summary Remove a file from a filesystem on a node
// @Description Remove a file from a filesystem on a node
// @Tags v16.?.?
// @ID cluster-3-node-fs-delete-file
// @Produce text/plain
// @Param id path string true "Node ID"
// @Param storage path string true "Name of the filesystem"
// @Param filepath path string true "Path to file"
// @Success 200 {string} string
// @Failure 404 {object} api.Error
// @Security ApiKeyAuth
// @Router /api/v3/cluster/node/{id}/fs/{storage}/{filepath} [delete]
func (h *ClusterHandler) NodeFSDeleteFile(c echo.Context) error {
id := util.PathParam(c, "id")
storage := util.PathParam(c, "storage")
path := util.PathWildcardParam(c)
peer, err := h.proxy.GetNodeReader(id)
if err != nil {
return api.Err(http.StatusNotFound, "", "node not found: %s", err.Error())
}
err = peer.DeleteFile(storage, path)
if err != nil {
return api.Err(http.StatusNotFound, "", "%s", err.Error())
}
return c.JSON(http.StatusOK, nil)
}
// ListNodeProcesses returns the list of processes running on a node of the cluster
// @Summary List of processes in the cluster on a node
// @Description List of processes in the cluster on a node

View File

@ -70,7 +70,7 @@ func (h *FSHandler) GetFile(c echo.Context) error {
// @Param data body []byte true "File data"
// @Success 201 {string} string
// @Success 204 {string} string
// @Failure 507 {object} api.Error
// @Failure 400 {object} api.Error
// @Security ApiKeyAuth
// @Router /api/v3/fs/{storage}/{filepath} [put]
func (h *FSHandler) PutFile(c echo.Context) error {

View File

@ -747,7 +747,9 @@ func (s *server) setRoutesV3(v3 *echo.Group) {
v3.GET("/cluster/node", s.v3handler.cluster.GetNodes)
v3.GET("/cluster/node/:id", s.v3handler.cluster.GetNode)
v3.GET("/cluster/node/:id/files", s.v3handler.cluster.GetNodeFiles)
v3.GET("/cluster/node/:id/files", s.v3handler.cluster.GetNodeResources)
v3.GET("/cluster/node/:id/fs/:storage", s.v3handler.cluster.NodeFSListFiles)
v3.GET("/cluster/node/:id/fs/:storage/*", s.v3handler.cluster.NodeFSGetFile)
v3.GET("/cluster/node/:id/process", s.v3handler.cluster.ListNodeProcesses)
v3.GET("/cluster/node/:id/version", s.v3handler.cluster.GetNodeVersion)
@ -765,6 +767,9 @@ func (s *server) setRoutesV3(v3 *echo.Group) {
v3.PUT("/cluster/process/:id/command", s.v3handler.cluster.SetProcessCommand)
v3.PUT("/cluster/process/:id/metadata/:key", s.v3handler.cluster.SetProcessMetadata)
v3.DELETE("/cluster/node/:id/fs/:storage/*", s.v3handler.cluster.NodeFSDeleteFile)
v3.PUT("/cluster/node/:id/fs/:storage/*", s.v3handler.cluster.NodeFSPutFile)
v3.PUT("/cluster/iam/reload", s.v3handler.cluster.ReloadIAM)
v3.POST("/cluster/iam/user", s.v3handler.cluster.AddIdentity)
v3.PUT("/cluster/iam/user/:name", s.v3handler.cluster.UpdateIdentity)