Add diskfs proxying
This commit is contained in:
parent
c31fd657be
commit
5af85c6a8b
@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"path/filepath"
|
||||
"regexp"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@ -16,7 +17,7 @@ type Cluster interface {
|
||||
ListNodes() []NodeReader
|
||||
GetNode(id string) (NodeReader, error)
|
||||
Stop()
|
||||
GetFile(path string) (string, error)
|
||||
GetURL(path string) (string, error)
|
||||
}
|
||||
|
||||
type ClusterConfig struct {
|
||||
@ -35,6 +36,8 @@ type cluster struct {
|
||||
cancel context.CancelFunc
|
||||
once sync.Once
|
||||
|
||||
prefix *regexp.Regexp
|
||||
|
||||
logger log.Logger
|
||||
}
|
||||
|
||||
@ -45,6 +48,7 @@ func New(config ClusterConfig) (Cluster, error) {
|
||||
idupdate: map[string]time.Time{},
|
||||
fileid: map[string]string{},
|
||||
updates: make(chan NodeState, 64),
|
||||
prefix: regexp.MustCompile(`^[a-z]+:`),
|
||||
logger: config.Logger,
|
||||
}
|
||||
|
||||
@ -170,7 +174,7 @@ func (c *cluster) GetNode(id string) (NodeReader, error) {
|
||||
return node, nil
|
||||
}
|
||||
|
||||
func (c *cluster) GetFile(path string) (string, error) {
|
||||
func (c *cluster) GetURL(path string) (string, error) {
|
||||
c.lock.RLock()
|
||||
defer c.lock.RUnlock()
|
||||
|
||||
@ -199,7 +203,20 @@ func (c *cluster) GetFile(path string) (string, error) {
|
||||
return "", fmt.Errorf("file not found")
|
||||
}
|
||||
|
||||
url := node.Address() + "/" + filepath.Join("memfs", path)
|
||||
// Remove prefix from path
|
||||
prefix := c.prefix.FindString(path)
|
||||
path = c.prefix.ReplaceAllString(path, "")
|
||||
|
||||
url := ""
|
||||
|
||||
if prefix == "memfs:" {
|
||||
url = node.Address() + "/" + filepath.Join("memfs", path)
|
||||
} else if prefix == "diskfs:" {
|
||||
url = node.Address() + path
|
||||
} else {
|
||||
c.logger.Debug().WithField("path", path).WithField("prefix", prefix).Log("unknown prefix")
|
||||
return "", fmt.Errorf("file not found")
|
||||
}
|
||||
|
||||
c.logger.Debug().WithField("url", url).Log("file cluster url")
|
||||
|
||||
|
||||
@ -131,11 +131,12 @@ func (n *node) stop() {
|
||||
}
|
||||
|
||||
func (n *node) files() {
|
||||
files, err := n.peer.MemFSList("name", "asc")
|
||||
memfsfiles, errMemfs := n.peer.MemFSList("name", "asc")
|
||||
diskfsfiles, errDiskfs := n.peer.DiskFSList("name", "asc")
|
||||
|
||||
n.lastUpdate = time.Now()
|
||||
|
||||
if err != nil {
|
||||
if errMemfs != nil || errDiskfs != nil {
|
||||
n.fileList = nil
|
||||
n.state = stateDisconnected
|
||||
return
|
||||
@ -143,10 +144,18 @@ func (n *node) files() {
|
||||
|
||||
n.state = stateConnected
|
||||
|
||||
n.fileList = make([]string, len(files))
|
||||
n.fileList = make([]string, len(memfsfiles)+len(diskfsfiles))
|
||||
|
||||
for i, file := range files {
|
||||
n.fileList[i] = file.Name
|
||||
nfiles := 0
|
||||
|
||||
for _, file := range memfsfiles {
|
||||
n.fileList[nfiles] = "memfs:" + file.Name
|
||||
nfiles++
|
||||
}
|
||||
|
||||
for _, file := range diskfsfiles {
|
||||
n.fileList[nfiles] = "diskfs:" + file.Name
|
||||
nfiles++
|
||||
}
|
||||
|
||||
return
|
||||
|
||||
@ -16,12 +16,14 @@ type Filesystem interface {
|
||||
type filesystem struct {
|
||||
fs.Filesystem
|
||||
|
||||
what string
|
||||
cluster cluster.Cluster
|
||||
}
|
||||
|
||||
func NewClusterFS(what string, fs fs.Filesystem, cluster cluster.Cluster) Filesystem {
|
||||
f := &filesystem{
|
||||
Filesystem: fs,
|
||||
what: what,
|
||||
cluster: cluster,
|
||||
}
|
||||
|
||||
@ -35,7 +37,7 @@ func (fs *filesystem) Open(path string) fs.File {
|
||||
}
|
||||
|
||||
// Check if the file is available in the cluster
|
||||
url, err := fs.cluster.GetFile(path)
|
||||
url, err := fs.cluster.GetURL(fs.what + ":" + path)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -49,7 +49,10 @@ func (h *DiskFSHandler) GetFile(c echo.Context) error {
|
||||
return api.Err(http.StatusNotFound, "File not found", path)
|
||||
}
|
||||
|
||||
stat, _ := file.Stat()
|
||||
stat, err := file.Stat()
|
||||
if err != nil {
|
||||
return api.Err(http.StatusNotFound, "File not found", path)
|
||||
}
|
||||
|
||||
if stat.IsDir() {
|
||||
path = filepath.Join(path, "index.html")
|
||||
@ -61,7 +64,10 @@ func (h *DiskFSHandler) GetFile(c echo.Context) error {
|
||||
return api.Err(http.StatusNotFound, "File not found", path)
|
||||
}
|
||||
|
||||
stat, _ = file.Stat()
|
||||
stat, err = file.Stat()
|
||||
if err != nil {
|
||||
return api.Err(http.StatusNotFound, "File not found", path)
|
||||
}
|
||||
}
|
||||
|
||||
defer file.Close()
|
||||
|
||||
@ -184,8 +184,13 @@ func NewServer(config Config) (Server, error) {
|
||||
config.Cache,
|
||||
)
|
||||
|
||||
filesystem := config.DiskFS
|
||||
if config.Cluster != nil {
|
||||
filesystem = clusterfs.NewClusterFS("diskfs", filesystem, config.Cluster)
|
||||
}
|
||||
|
||||
s.handler.diskfs = handler.NewDiskFS(
|
||||
config.DiskFS,
|
||||
filesystem,
|
||||
config.Cache,
|
||||
)
|
||||
|
||||
@ -230,7 +235,7 @@ func NewServer(config Config) (Server, error) {
|
||||
|
||||
filesystem := config.MemFS.Filesystem
|
||||
if config.Cluster != nil {
|
||||
filesystem = clusterfs.NewClusterFS("TODO", filesystem, config.Cluster)
|
||||
filesystem = clusterfs.NewClusterFS("memfs", filesystem, config.Cluster)
|
||||
}
|
||||
|
||||
s.handler.memfs = handler.NewMemFS(
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user