From 50164ea78e64eb6594f3424737cc12ac391bb410 Mon Sep 17 00:00:00 2001 From: Ingo Oppermann Date: Tue, 16 Aug 2022 14:32:58 +0300 Subject: [PATCH] Get files from diskfs and memfs via API --- client/client.go | 47 ++++++++++++++++++++++++---------- client/diskfs.go | 6 ++++- client/memfs.go | 6 ++++- cluster/cluster.go | 46 ++++++++++++++++++++++++++++++++- cluster/node.go | 17 +++++++++++- http/fs/cluster.go | 19 ++------------ http/middleware/session/HLS.go | 2 +- 7 files changed, 108 insertions(+), 35 deletions(-) diff --git a/client/client.go b/client/client.go index 0ddaa013..039341e9 100644 --- a/client/client.go +++ b/client/client.go @@ -39,12 +39,14 @@ type RestClient interface { ConfigReload() error // GET /config/reload DiskFSList(sort, order string) ([]api.FileInfo, error) // GET /fs/disk - DiskFSHasFile(path string) bool // GET /fs/disk/{path} + DiskFSHasFile(path string) bool // HEAD /fs/disk/{path} + DiskFSGetFile(path string) (io.ReadCloser, error) // GET /fs/disk/{path} DiskFSDeleteFile(path string) error // DELETE /fs/disk/{path} DiskFSAddFile(path string, data io.Reader) error // PUT /fs/disk/{path} MemFSList(sort, order string) ([]api.FileInfo, error) // GET /fs/mem - MemFSHasFile(path string) bool // GET /fs/mem/{path} + MemFSHasFile(path string) bool // HEAD /fs/mem/{path} + MemFSGetFile(path string) (io.ReadCloser, error) // GET /fs/mem/{path} MemFSDeleteFile(path string) error // DELETE /fs/mem/{path} MemFSAddFile(path string, data io.Reader) error // PUT /fs/mem/{path} @@ -235,9 +237,11 @@ func (r *restclient) login() error { return fmt.Errorf("wrong username and/or password") } + data, _ := io.ReadAll(body) + jwt := api.JWT{} - json.Unmarshal(body, &jwt) + json.Unmarshal(data, &jwt) r.accessToken = jwt.AccessToken r.refreshToken = jwt.RefreshToken @@ -277,9 +281,11 @@ func (r *restclient) refresh() error { return fmt.Errorf("invalid refresh token") } + data, _ := io.ReadAll(body) + jwt := api.JWTRefresh{} - json.Unmarshal(body, &jwt) + json.Unmarshal(data, &jwt) r.accessToken = jwt.AccessToken @@ -305,14 +311,16 @@ func (r *restclient) info() (api.About, error) { return api.About{}, fmt.Errorf("access to API failed (%d)", status) } + data, _ := io.ReadAll(body) + about := api.About{} - json.Unmarshal(body, &about) + json.Unmarshal(data, &about) return about, nil } -func (r *restclient) call(method, path, contentType string, data io.Reader) ([]byte, error) { +func (r *restclient) stream(method, path, contentType string, data io.Reader) (io.ReadCloser, error) { req, err := http.NewRequest(method, r.address+r.prefix+"/v3"+path, data) if err != nil { return nil, err @@ -345,7 +353,11 @@ func (r *restclient) call(method, path, contentType string, data io.Reader) ([]b if status < 200 || status >= 300 { e := api.Error{} - json.Unmarshal(body, &e) + defer body.Close() + + x, _ := io.ReadAll(body) + + json.Unmarshal(x, &e) return nil, fmt.Errorf("%w", e) } @@ -353,15 +365,24 @@ func (r *restclient) call(method, path, contentType string, data io.Reader) ([]b return body, nil } -func (r *restclient) request(req *http.Request) (int, []byte, error) { +func (r *restclient) call(method, path, contentType string, data io.Reader) ([]byte, error) { + body, err := r.stream(method, path, contentType, data) + if err != nil { + return nil, err + } + + defer body.Close() + + x, _ := io.ReadAll(body) + + return x, nil +} + +func (r *restclient) request(req *http.Request) (int, io.ReadCloser, error) { resp, err := r.client.Do(req) if err != nil { return -1, nil, err } - defer resp.Body.Close() - - body, _ := io.ReadAll(resp.Body) - - return resp.StatusCode, body, nil + return resp.StatusCode, resp.Body, nil } diff --git a/client/diskfs.go b/client/diskfs.go index 4346b2ee..f74b6a02 100644 --- a/client/diskfs.go +++ b/client/diskfs.go @@ -37,11 +37,15 @@ func (r *restclient) DiskFSList(sort, order string) ([]api.FileInfo, error) { } func (r *restclient) DiskFSHasFile(path string) bool { - _, err := r.call("GET", "/fs/disk"+path, "", nil) + _, err := r.call("HEAD", "/fs/disk"+path, "", nil) return err == nil } +func (r *restclient) DiskFSGetFile(path string) (io.ReadCloser, error) { + return r.stream("GET", "/fs/disk"+path, "", nil) +} + func (r *restclient) DiskFSDeleteFile(path string) error { _, err := r.call("DELETE", "/fs/disk"+path, "", nil) diff --git a/client/memfs.go b/client/memfs.go index 2ae30da6..a55cec73 100644 --- a/client/memfs.go +++ b/client/memfs.go @@ -26,11 +26,15 @@ func (r *restclient) MemFSList(sort, order string) ([]api.FileInfo, error) { } func (r *restclient) MemFSHasFile(path string) bool { - _, err := r.call("GET", "/fs/mem"+path, "", nil) + _, err := r.call("HEAD", "/fs/mem"+path, "", nil) return err == nil } +func (r *restclient) MemFSGetFile(path string) (io.ReadCloser, error) { + return r.stream("GET", "/fs/mem"+path, "", nil) +} + func (r *restclient) MemFSDeleteFile(path string) error { _, err := r.call("DELETE", "/fs/mem"+path, "", nil) diff --git a/cluster/cluster.go b/cluster/cluster.go index e19cfc93..71b4d497 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -3,6 +3,7 @@ package cluster import ( "context" "fmt" + "io" "sync" "time" @@ -12,6 +13,7 @@ import ( type ClusterReader interface { GetURL(path string) (string, error) + GetFile(path string) (io.ReadCloser, error) } type dummyClusterReader struct{} @@ -24,6 +26,10 @@ func (r *dummyClusterReader) GetURL(path string) (string, error) { return "", fmt.Errorf("not implemented in dummy cluster") } +func (r *dummyClusterReader) GetFile(path string) (io.ReadCloser, error) { + return nil, fmt.Errorf("not implemented in dummy cluster") +} + type Cluster interface { AddNode(address, username, password string) (string, error) RemoveNode(id string) error @@ -240,7 +246,7 @@ func (c *cluster) GetURL(path string) (string, error) { return "", fmt.Errorf("file not found") } - url, err := node.GetURL(path) + url, err := node.getURL(path) if err != nil { c.logger.Debug().WithField("path", path).Log("Invalid path") return "", fmt.Errorf("file not found") @@ -250,3 +256,41 @@ func (c *cluster) GetURL(path string) (string, error) { return url, nil } + +func (c *cluster) GetFile(path string) (io.ReadCloser, error) { + c.lock.RLock() + defer c.lock.RUnlock() + + id, ok := c.fileid[path] + if !ok { + c.logger.Debug().WithField("path", path).Log("Not found") + return nil, fmt.Errorf("file not found") + } + + ts, ok := c.idupdate[id] + if !ok { + c.logger.Debug().WithField("path", path).Log("No age information found") + return nil, fmt.Errorf("file not found") + } + + if time.Since(ts) > 2*time.Second { + c.logger.Debug().WithField("path", path).Log("File too old") + return nil, fmt.Errorf("file not found") + } + + node, ok := c.nodes[id] + if !ok { + c.logger.Debug().WithField("path", path).Log("Unknown node") + return nil, fmt.Errorf("file not found") + } + + data, err := node.getFile(path) + if err != nil { + c.logger.Debug().WithField("path", path).Log("Invalid path") + return nil, fmt.Errorf("file not found") + } + + c.logger.Debug().WithField("path", path).Log("File cluster path") + + return data, nil +} diff --git a/cluster/node.go b/cluster/node.go index 2a14cf14..3f93975d 100644 --- a/cluster/node.go +++ b/cluster/node.go @@ -3,6 +3,7 @@ package cluster import ( "context" "fmt" + "io" "net" "net/http" "net/url" @@ -253,7 +254,7 @@ func (n *node) files() { } } -func (n *node) GetURL(path string) (string, error) { +func (n *node) getURL(path string) (string, error) { // Remove prefix from path prefix := n.prefix.FindString(path) path = n.prefix.ReplaceAllString(path, "") @@ -285,3 +286,17 @@ func (n *node) GetURL(path string) (string, error) { return u, nil } + +func (n *node) getFile(path string) (io.ReadCloser, error) { + // Remove prefix from path + prefix := n.prefix.FindString(path) + path = n.prefix.ReplaceAllString(path, "") + + if prefix == "memfs:" { + return n.peer.MemFSGetFile(path) + } else if prefix == "diskfs:" { + return n.peer.DiskFSGetFile(path) + } + + return nil, fmt.Errorf("unknown prefix") +} diff --git a/http/fs/cluster.go b/http/fs/cluster.go index 6848d5f6..e42d091a 100644 --- a/http/fs/cluster.go +++ b/http/fs/cluster.go @@ -2,7 +2,6 @@ package fs import ( "io" - "net/http" "time" "github.com/datarhei/core/v16/cluster" @@ -37,27 +36,13 @@ func (fs *filesystem) Open(path string) fs.File { } // Check if the file is available in the cluster - url, err := fs.cluster.GetURL(fs.what + ":" + path) - if err != nil { - return nil - } - - req, err := http.NewRequest("GET", url, nil) - if err != nil { - return nil - } - - client := &http.Client{ - Timeout: 15 * time.Second, - } - - resp, err := client.Do(req) + data, err := fs.cluster.GetFile(fs.what + ":" + path) if err != nil { return nil } file := &file{ - ReadCloser: resp.Body, + ReadCloser: data, name: path, } diff --git a/http/middleware/session/HLS.go b/http/middleware/session/HLS.go index 3de73be4..e8b791b5 100644 --- a/http/middleware/session/HLS.go +++ b/http/middleware/session/HLS.go @@ -186,7 +186,7 @@ func (h *hls) handleEgress(c echo.Context, next echo.HandlerFunc) error { // Add the new session's top bitrate to the ingress top bitrate resultingBitrate := currentBitrate + streamBitrate - if resultingBitrate <= 0.5 || resultingBitrate >= maxBitrate { + if resultingBitrate >= maxBitrate { return echo.NewHTTPError(509, "Bitrate limit exceeded") } }