From 5af85c6a8b4e1e9ac2abb770fe2e0f1cd3b8d902 Mon Sep 17 00:00:00 2001 From: Ingo Oppermann Date: Fri, 5 Aug 2022 12:30:25 +0200 Subject: [PATCH] Add diskfs proxying --- cluster/cluster.go | 23 ++++++++++++++++++++--- cluster/node.go | 19 ++++++++++++++----- http/fs/cluster.go | 4 +++- http/handler/diskfs.go | 10 ++++++++-- http/server.go | 9 +++++++-- 5 files changed, 52 insertions(+), 13 deletions(-) diff --git a/cluster/cluster.go b/cluster/cluster.go index 910546fd..d0567ec5 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "path/filepath" + "regexp" "sync" "time" @@ -16,7 +17,7 @@ type Cluster interface { ListNodes() []NodeReader GetNode(id string) (NodeReader, error) Stop() - GetFile(path string) (string, error) + GetURL(path string) (string, error) } type ClusterConfig struct { @@ -35,6 +36,8 @@ type cluster struct { cancel context.CancelFunc once sync.Once + prefix *regexp.Regexp + logger log.Logger } @@ -45,6 +48,7 @@ func New(config ClusterConfig) (Cluster, error) { idupdate: map[string]time.Time{}, fileid: map[string]string{}, updates: make(chan NodeState, 64), + prefix: regexp.MustCompile(`^[a-z]+:`), logger: config.Logger, } @@ -170,7 +174,7 @@ func (c *cluster) GetNode(id string) (NodeReader, error) { return node, nil } -func (c *cluster) GetFile(path string) (string, error) { +func (c *cluster) GetURL(path string) (string, error) { c.lock.RLock() defer c.lock.RUnlock() @@ -199,7 +203,20 @@ func (c *cluster) GetFile(path string) (string, error) { return "", fmt.Errorf("file not found") } - url := node.Address() + "/" + filepath.Join("memfs", path) + // Remove prefix from path + prefix := c.prefix.FindString(path) + path = c.prefix.ReplaceAllString(path, "") + + url := "" + + if prefix == "memfs:" { + url = node.Address() + "/" + filepath.Join("memfs", path) + } else if prefix == "diskfs:" { + url = node.Address() + path + } else { + c.logger.Debug().WithField("path", path).WithField("prefix", prefix).Log("unknown prefix") + return "", fmt.Errorf("file not found") + } c.logger.Debug().WithField("url", url).Log("file cluster url") diff --git a/cluster/node.go b/cluster/node.go index e1cf8a0a..5daea55e 100644 --- a/cluster/node.go +++ b/cluster/node.go @@ -131,11 +131,12 @@ func (n *node) stop() { } func (n *node) files() { - files, err := n.peer.MemFSList("name", "asc") + memfsfiles, errMemfs := n.peer.MemFSList("name", "asc") + diskfsfiles, errDiskfs := n.peer.DiskFSList("name", "asc") n.lastUpdate = time.Now() - if err != nil { + if errMemfs != nil || errDiskfs != nil { n.fileList = nil n.state = stateDisconnected return @@ -143,10 +144,18 @@ func (n *node) files() { n.state = stateConnected - n.fileList = make([]string, len(files)) + n.fileList = make([]string, len(memfsfiles)+len(diskfsfiles)) - for i, file := range files { - n.fileList[i] = file.Name + nfiles := 0 + + for _, file := range memfsfiles { + n.fileList[nfiles] = "memfs:" + file.Name + nfiles++ + } + + for _, file := range diskfsfiles { + n.fileList[nfiles] = "diskfs:" + file.Name + nfiles++ } return diff --git a/http/fs/cluster.go b/http/fs/cluster.go index 10d1f1dc..8c713000 100644 --- a/http/fs/cluster.go +++ b/http/fs/cluster.go @@ -16,12 +16,14 @@ type Filesystem interface { type filesystem struct { fs.Filesystem + what string cluster cluster.Cluster } func NewClusterFS(what string, fs fs.Filesystem, cluster cluster.Cluster) Filesystem { f := &filesystem{ Filesystem: fs, + what: what, cluster: cluster, } @@ -35,7 +37,7 @@ func (fs *filesystem) Open(path string) fs.File { } // Check if the file is available in the cluster - url, err := fs.cluster.GetFile(path) + url, err := fs.cluster.GetURL(fs.what + ":" + path) if err != nil { return nil } diff --git a/http/handler/diskfs.go b/http/handler/diskfs.go index 9726c258..545cf149 100644 --- a/http/handler/diskfs.go +++ b/http/handler/diskfs.go @@ -49,7 +49,10 @@ func (h *DiskFSHandler) GetFile(c echo.Context) error { return api.Err(http.StatusNotFound, "File not found", path) } - stat, _ := file.Stat() + stat, err := file.Stat() + if err != nil { + return api.Err(http.StatusNotFound, "File not found", path) + } if stat.IsDir() { path = filepath.Join(path, "index.html") @@ -61,7 +64,10 @@ func (h *DiskFSHandler) GetFile(c echo.Context) error { return api.Err(http.StatusNotFound, "File not found", path) } - stat, _ = file.Stat() + stat, err = file.Stat() + if err != nil { + return api.Err(http.StatusNotFound, "File not found", path) + } } defer file.Close() diff --git a/http/server.go b/http/server.go index 07a8264a..f8bd7317 100644 --- a/http/server.go +++ b/http/server.go @@ -184,8 +184,13 @@ func NewServer(config Config) (Server, error) { config.Cache, ) + filesystem := config.DiskFS + if config.Cluster != nil { + filesystem = clusterfs.NewClusterFS("diskfs", filesystem, config.Cluster) + } + s.handler.diskfs = handler.NewDiskFS( - config.DiskFS, + filesystem, config.Cache, ) @@ -230,7 +235,7 @@ func NewServer(config Config) (Server, error) { filesystem := config.MemFS.Filesystem if config.Cluster != nil { - filesystem = clusterfs.NewClusterFS("TODO", filesystem, config.Cluster) + filesystem = clusterfs.NewClusterFS("memfs", filesystem, config.Cluster) } s.handler.memfs = handler.NewMemFS(