From 7f59c188cf3585332e2287009009a5ae0daed8bd Mon Sep 17 00:00:00 2001 From: Ingo Oppermann Date: Thu, 4 May 2023 19:49:53 +0200 Subject: [PATCH] Remove node storage, use raft configuration instead; re-establish file and stream proxying --- app/api/api.go | 4 +- client/client.go | 59 +++++- cluster/api.go | 139 ++++++++++++++- cluster/cluster.go | 297 ++++++++++++++++++++++++------- cluster/forwarder.go | 132 +++----------- cluster/node.go | 211 ++++++++++++++++------ cluster/proxy.go | 98 ++++++++-- cluster/store.go | 40 ++++- docs/docs.go | 345 ++++++++++++++++++------------------ docs/swagger.json | 345 ++++++++++++++++++------------------ docs/swagger.yaml | 231 ++++++++++++------------ http/api/cluster.go | 32 +++- http/fs/cluster.go | 12 +- http/handler/api/cluster.go | 127 ++++++++++++- http/server.go | 19 +- rtmp/rtmp.go | 12 +- srt/srt.go | 12 +- 17 files changed, 1353 insertions(+), 762 deletions(-) diff --git a/app/api/api.go b/app/api/api.go index e1fb0b14..b2aa3116 100644 --- a/app/api/api.go +++ b/app/api/api.go @@ -948,7 +948,7 @@ func (a *api) start() error { Token: cfg.RTMP.Token, Logger: a.log.logger.rtmp, Collector: a.sessions.Collector("rtmp"), - Cluster: a.cluster, + Proxy: a.cluster.ProxyReader(), } if cfg.RTMP.EnableTLS { @@ -977,7 +977,7 @@ func (a *api) start() error { Token: cfg.SRT.Token, Logger: a.log.logger.core.WithComponent("SRT").WithField("address", cfg.SRT.Address), Collector: a.sessions.Collector("srt"), - Cluster: a.cluster, + Proxy: a.cluster.ProxyReader(), } if cfg.SRT.Log.Enable { diff --git a/client/client.go b/client/client.go index 039341e9..21c7d666 100644 --- a/client/client.go +++ b/client/client.go @@ -6,6 +6,7 @@ import ( "fmt" "io" "net/http" + "net/url" "strings" "time" @@ -32,7 +33,8 @@ type RestClient interface { // Address returns the address of the connected datarhei Core Address() string - About() api.About // GET / + About() api.About // GET / + Ping() (bool, time.Duration) // GET /ping Config() (api.Config, error) // GET /config ConfigSet(config api.ConfigData) error // POST /config @@ -120,6 +122,27 @@ func New(config Config) (RestClient, error) { client: config.Client, } + u, err := url.Parse(r.address) + if err != nil { + return nil, err + } + + username := u.User.Username() + if len(username) != 0 { + r.username = username + } + + if password, ok := u.User.Password(); ok { + r.password = password + } + + u.User = nil + u.RawQuery = "" + u.Fragment = "" + + r.address = u.String() + fmt.Printf("address: %s\n", r.address) + if r.client == nil { r.client = &http.Client{ Timeout: 15 * time.Second, @@ -137,6 +160,12 @@ func New(config Config) (RestClient, error) { return nil, fmt.Errorf("didn't receive the expected API response (got: %s, want: %s)", r.about.Name, coreapp) } + if len(r.about.ID) == 0 { + if err := r.login(); err != nil { + return nil, err + } + } + c, _ := semver.NewConstraint(coreversion) v, err := semver.NewVersion(r.about.Version.Number) if err != nil { @@ -147,12 +176,6 @@ func New(config Config) (RestClient, error) { return nil, fmt.Errorf("the core version (%s) is not supported (%s)", r.about.Version.Number, coreversion) } - if len(r.about.ID) == 0 { - if err := r.login(); err != nil { - return nil, err - } - } - return r, nil } @@ -172,6 +195,28 @@ func (r *restclient) About() api.About { return r.about } +func (r *restclient) Ping() (bool, time.Duration) { + req, err := http.NewRequest(http.MethodGet, r.address+"/ping", nil) + if err != nil { + return false, time.Duration(0) + } + + start := time.Now() + + status, body, err := r.request(req) + if err != nil { + return false, time.Since(start) + } + + defer body.Close() + + if status != 200 { + return false, time.Since(start) + } + + return true, time.Since(start) +} + func (r *restclient) login() error { login := api.Login{} diff --git a/cluster/api.go b/cluster/api.go index fd68f185..81fff71f 100644 --- a/cluster/api.go +++ b/cluster/api.go @@ -3,8 +3,12 @@ package cluster import ( "bytes" "context" + "encoding/json" + "fmt" + "io" "net/http" "strings" + "time" httpapi "github.com/datarhei/core/v16/http/api" "github.com/datarhei/core/v16/http/errorhandler" @@ -60,7 +64,7 @@ func NewAPI(config APIConfig) (API, error) { a.logger = log.New("") } - address, err := config.Cluster.APIAddr("") + address, err := config.Cluster.ClusterAPIAddress("") if err != nil { return nil, err } @@ -97,7 +101,7 @@ func NewAPI(config APIConfig) (API, error) { return httpapi.Err(http.StatusBadRequest, "Invalid JSON", "%s", err) } - a.logger.Debug().Log("got join request: %+v", r) + a.logger.Debug().WithField("id", r.ID).Log("got join request: %+v", r) if r.Origin == a.id { return httpapi.Err(http.StatusLoopDetected, "", "breaking circuit") @@ -105,7 +109,7 @@ func NewAPI(config APIConfig) (API, error) { err := a.cluster.Join(r.Origin, r.ID, r.RaftAddress, r.APIAddress, "") if err != nil { - a.logger.Debug().WithError(err).Log("unable to join cluster") + a.logger.Debug().WithError(err).WithField("id", r.ID).Log("unable to join cluster") return httpapi.Err(http.StatusInternalServerError, "unable to join cluster", "%s", err) } @@ -119,7 +123,7 @@ func NewAPI(config APIConfig) (API, error) { return httpapi.Err(http.StatusBadRequest, "Invalid JSON", "%s", err) } - a.logger.Debug().Log("got leave request: %+v", r) + a.logger.Debug().WithField("id", r.ID).Log("got leave request: %+v", r) if r.Origin == a.id { return httpapi.Err(http.StatusLoopDetected, "", "breaking circuit") @@ -127,7 +131,7 @@ func NewAPI(config APIConfig) (API, error) { err := a.cluster.Leave(r.Origin, r.ID) if err != nil { - a.logger.Debug().WithError(err).Log("unable to leave cluster") + a.logger.Debug().WithError(err).WithField("id", r.ID).Log("unable to leave cluster") return httpapi.Err(http.StatusInternalServerError, "unable to leave cluster", "%s", err) } @@ -141,7 +145,9 @@ func NewAPI(config APIConfig) (API, error) { return httpapi.Err(http.StatusInternalServerError, "unable to create snapshot", "%s", err) } - return c.Stream(http.StatusOK, "application/octet-stream", bytes.NewReader(data)) + defer data.Close() + + return c.Stream(http.StatusOK, "application/octet-stream", data) }) a.router.POST("/v1/process", func(c echo.Context) error { @@ -152,6 +158,11 @@ func NewAPI(config APIConfig) (API, error) { return httpapi.Err(http.StatusNotImplemented, "") }) + a.router.GET("/v1/core", func(c echo.Context) error { + address, _ := a.cluster.CoreAPIAddress("") + return c.JSON(http.StatusOK, address) + }) + return a, nil } @@ -163,3 +174,119 @@ func (a *api) Start() error { func (a *api) Shutdown(ctx context.Context) error { return a.router.Shutdown(ctx) } + +type APIClient struct { + Address string + Client *http.Client +} + +func (c *APIClient) CoreAPIAddress() (string, error) { + data, err := c.call(http.MethodGet, "/core", "", nil) + if err != nil { + return "", err + } + + var address string + err = json.Unmarshal(data, &address) + if err != nil { + return "", err + } + + return address, nil +} + +func (c *APIClient) Join(r JoinRequest) error { + data, err := json.Marshal(&r) + if err != nil { + return err + } + + _, err = c.call(http.MethodPost, "/join", "application/json", bytes.NewReader(data)) + + return err +} + +func (c *APIClient) Leave(r LeaveRequest) error { + data, err := json.Marshal(&r) + if err != nil { + return err + } + + _, err = c.call(http.MethodPost, "/leave", "application/json", bytes.NewReader(data)) + + return err +} + +func (c *APIClient) Snapshot() (io.ReadCloser, error) { + return c.stream(http.MethodGet, "/snapshot", "", nil) +} + +func (c *APIClient) stream(method, path, contentType string, data io.Reader) (io.ReadCloser, error) { + if len(c.Address) == 0 { + return nil, fmt.Errorf("no address defined") + } + + address := "http://" + c.Address + "/v1" + path + + req, err := http.NewRequest(method, address, data) + if err != nil { + return nil, err + } + + if method == "POST" || method == "PUT" { + req.Header.Add("Content-Type", contentType) + } + + status, body, err := c.request(req) + if err != nil { + return nil, err + } + + if status < 200 || status >= 300 { + e := httpapi.Error{} + + defer body.Close() + + x, _ := io.ReadAll(body) + + json.Unmarshal(x, &e) + + return nil, e + } + + return body, nil +} + +func (c *APIClient) call(method, path, contentType string, data io.Reader) ([]byte, error) { + body, err := c.stream(method, path, contentType, data) + if err != nil { + return nil, err + } + + defer body.Close() + + x, _ := io.ReadAll(body) + + return x, nil +} + +func (c *APIClient) request(req *http.Request) (int, io.ReadCloser, error) { + if c.Client == nil { + tr := &http.Transport{ + MaxIdleConns: 10, + IdleConnTimeout: 30 * time.Second, + } + + c.Client = &http.Client{ + Transport: tr, + Timeout: 5 * time.Second, + } + } + + resp, err := c.Client.Do(req) + if err != nil { + return -1, nil, err + } + + return resp.StatusCode, resp.Body, nil +} diff --git a/cluster/cluster.go b/cluster/cluster.go index 9ff7ff8b..9553c4ec 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -27,14 +27,14 @@ import ( /* /api/v3: - GET /cluster/db/node - list all nodes that are stored in the FSM - Cluster.Store.ListNodes() - POST /cluster/db/node - add a node to the FSM - Cluster.Store.AddNode() - DELETE /cluster/db/node/:id - remove a node from the FSM - Cluster.Store.RemoveNode() + GET /cluster/store/node - list all nodes that are stored in the FSM - Cluster.Store.ListNodes() + POST /cluster/store/node - add a node to the FSM - Cluster.Store.AddNode() + DELETE /cluster/store/node/:id - remove a node from the FSM - Cluster.Store.RemoveNode() - GET /cluster/db/process - list all process configs that are stored in the FSM - Cluster.Store.ListProcesses() - POST /cluster/db/process - add a process config to the FSM - Cluster.Store.AddProcess() - PUT /cluster/db/process/:id - update a process config in the FSM - Cluster.Store.UpdateProcess() - DELETE /cluster/db/process/:id - remove a process config from the FSM - Cluster.Store.RemoveProcess() + GET /cluster/store/process - list all process configs that are stored in the FSM - Cluster.Store.ListProcesses() + POST /cluster/store/process - add a process config to the FSM - Cluster.Store.AddProcess() + PUT /cluster/store/process/:id - update a process config in the FSM - Cluster.Store.UpdateProcess() + DELETE /cluster/store/process/:id - remove a process config from the FSM - Cluster.Store.RemoveProcess() ** for the processes, the leader will decide where to actually run them. the process configs will also be added to the regular process DB of each core. @@ -47,32 +47,23 @@ import ( var ErrNodeNotFound = errors.New("node not found") -type ClusterReader interface { - GetURL(path string) (string, error) - GetFile(path string) (io.ReadCloser, error) -} - -type dummyClusterReader struct{} - -func NewDummyClusterReader() ClusterReader { - return &dummyClusterReader{} -} - -func (r *dummyClusterReader) GetURL(path string) (string, error) { - return "", fmt.Errorf("not implemented in dummy cluster") -} - -func (r *dummyClusterReader) GetFile(path string) (io.ReadCloser, error) { - return nil, fmt.Errorf("not implemented in dummy cluster") -} - type Cluster interface { - Addr() string - APIAddr(raftAddress string) (string, error) + // Address returns the raft address of this node + Address() string + + // ClusterAPIAddress returns the address of the cluster API of a node + // with the given raft address. + ClusterAPIAddress(raftAddress string) (string, error) + + // CoreAPIAddress returns the address of the core API of a node with + // the given raft address. + CoreAPIAddress(raftAddress string) (string, error) + + About() (ClusterAbout, error) Join(origin, id, raftAddress, apiAddress, peerAddress string) error Leave(origin, id string) error // gracefully remove a node from the cluster - Snapshot() ([]byte, error) + Snapshot() (io.ReadCloser, error) Shutdown() error @@ -81,7 +72,7 @@ type Cluster interface { ListNodes() []addNodeCommand GetNode(id string) (addNodeCommand, error) - ClusterReader + ProxyReader() ProxyReader } type Peer struct { @@ -143,6 +134,9 @@ type cluster struct { hasRaftLeader bool isLeader bool leaderLock sync.Mutex + + nodes map[string]Node + nodesLock sync.RWMutex } func New(config ClusterConfig) (Cluster, error) { @@ -158,6 +152,8 @@ func New(config ClusterConfig) (Cluster, error) { reassertLeaderCh: make(chan chan error), leaveCh: make(chan struct{}), shutdownCh: make(chan struct{}), + + nodes: map[string]Node{}, } u, err := url.Parse(config.CoreAPIAddress) @@ -216,6 +212,8 @@ func New(config ClusterConfig) (Cluster, error) { c.proxy = proxy + go c.trackNodeChanges() + if forwarder, err := NewForwarder(ForwarderConfig{ ID: c.id, Logger: c.logger.WithField("logname", "forwarder"), @@ -236,7 +234,7 @@ func New(config ClusterConfig) (Cluster, error) { if len(config.Peers) != 0 { for i := 0; i < len(config.Peers); i++ { - peerAddress, err := c.APIAddr(config.Peers[i].Address) + peerAddress, err := c.ClusterAPIAddress(config.Peers[i].Address) if err != nil { c.shutdownAPI() c.shutdownRaft() @@ -268,13 +266,13 @@ func New(config ClusterConfig) (Cluster, error) { return c, nil } -func (c *cluster) Addr() string { +func (c *cluster) Address() string { return c.raftAddress } -func (c *cluster) APIAddr(raftAddress string) (string, error) { +func (c *cluster) ClusterAPIAddress(raftAddress string) (string, error) { if len(raftAddress) == 0 { - raftAddress = c.raftAddress + raftAddress = c.Address() } host, port, _ := gonet.SplitHostPort(raftAddress) @@ -287,6 +285,29 @@ func (c *cluster) APIAddr(raftAddress string) (string, error) { return gonet.JoinHostPort(host, strconv.Itoa(p+1)), nil } +func (c *cluster) CoreAPIAddress(raftAddress string) (string, error) { + if len(raftAddress) == 0 { + raftAddress = c.Address() + } + + if raftAddress == c.Address() { + return c.coreAddress, nil + } + + addr, err := c.ClusterAPIAddress(raftAddress) + if err != nil { + return "", err + } + + client := APIClient{ + Address: addr, + } + + coreAddress, err := client.CoreAPIAddress() + + return coreAddress, err +} + func (c *cluster) Shutdown() error { c.logger.Info().Log("shutting down cluster") c.shutdownLock.Lock() @@ -299,6 +320,11 @@ func (c *cluster) Shutdown() error { c.shutdown = true close(c.shutdownCh) + for id, node := range c.nodes { + node.Disconnect() + c.proxy.RemoveNode(id) + } + if c.proxy != nil { c.proxy.Stop() } @@ -437,11 +463,6 @@ func (c *cluster) Leave(origin, id string) error { return nil } - err := c.RemoveNode(id) - if err != nil { - c.logger.Error().WithError(err).Log("failed to apply log for removal") - } - // Remove another sever from the cluster if future := c.raft.RemoveServer(raft.ServerID(id), 0, 0); future.Error() != nil { c.logger.Error().WithError(future.Error()).WithFields(log.Fields{ @@ -468,6 +489,17 @@ func (c *cluster) Join(origin, id, raftAddress, apiAddress, peerAddress string) }).Log("received join request for remote node") // connect to the peer's API in order to find out if our version is compatible + address, err := c.CoreAPIAddress(raftAddress) + if err != nil { + return fmt.Errorf("peer API doesn't respond: %w", err) + } + + node := NewNode(address) + err = node.Connect() + if err != nil { + return fmt.Errorf("couldn't connect to peer: %w", err) + } + defer node.Disconnect() configFuture := c.raft.GetConfiguration() if err := configFuture.Error(); err != nil { @@ -509,21 +541,6 @@ func (c *cluster) Join(origin, id, raftAddress, apiAddress, peerAddress string) } } - if err := c.AddNode(id, apiAddress); err != nil { - /* - future := c.raft.RemoveServer(raft.ServerID(id), 0, 0) - if err := future.Error(); err != nil { - c.logger.Error().WithError(err).WithFields(log.Fields{ - "nodeid": id, - "address": raftAddress, - }).Log("error removing existing node") - return err - } - return err - */ - c.logger.Debug().WithError(err).Log("") - } - c.logger.Info().WithFields(log.Fields{ "nodeid": id, "address": raftAddress, @@ -537,7 +554,7 @@ type Snapshot struct { Data []byte } -func (c *cluster) Snapshot() ([]byte, error) { +func (c *cluster) Snapshot() (io.ReadCloser, error) { if !c.IsRaftLeader() { c.logger.Debug().Log("not leader, forwarding to leader") return c.forwarder.Snapshot() @@ -573,7 +590,19 @@ func (c *cluster) Snapshot() ([]byte, error) { return nil, err } - return buffer.Bytes(), nil + return &readCloserWrapper{&buffer}, nil +} + +type readCloserWrapper struct { + io.Reader +} + +func (rcw *readCloserWrapper) Read(p []byte) (int, error) { + return rcw.Reader.Read(p) +} + +func (rcw *readCloserWrapper) Close() error { + return nil } func (c *cluster) ListNodes() []addNodeCommand { @@ -639,6 +668,81 @@ func (c *cluster) RemoveNode(id string) error { return nil } +func (c *cluster) trackNodeChanges() { + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + // Get the latest configuration. + future := c.raft.GetConfiguration() + if err := future.Error(); err != nil { + c.logger.Error().WithError(err).Log("failed to get raft configuration") + continue + } + + c.nodesLock.Lock() + + removeNodes := map[string]struct{}{} + for id := range c.nodes { + removeNodes[id] = struct{}{} + } + + for _, server := range future.Configuration().Servers { + id := string(server.ID) + if id == c.id { + continue + } + + _, ok := c.nodes[id] + if !ok { + address, err := c.CoreAPIAddress(string(server.Address)) + if err != nil { + c.logger.Warn().WithError(err).WithFields(log.Fields{ + "id": id, + "address": server.Address, + }).Log("Discovering core API address failed") + continue + } + + node := NewNode(address) + err = node.Connect() + if err != nil { + c.logger.Warn().WithError(err).WithFields(log.Fields{ + "id": id, + "address": server.Address, + }).Log("Connecting to core API failed") + continue + } + + if _, err := c.proxy.AddNode(id, node); err != nil { + c.logger.Warn().WithError(err).WithFields(log.Fields{ + "id": id, + "address": address, + }).Log("Adding node failed") + } + + c.nodes[id] = node + } else { + delete(removeNodes, id) + } + } + + for id := range removeNodes { + if node, ok := c.nodes[id]; ok { + node.Disconnect() + c.proxy.RemoveNode(id) + } + } + + c.nodesLock.Unlock() + case <-c.shutdownCh: + return + } + } +} + // trackLeaderChanges registers an Observer with raft in order to receive updates // about leader changes, in order to keep the forwarder up to date. func (c *cluster) trackLeaderChanges() { @@ -661,7 +765,7 @@ func (c *cluster) trackLeaderChanges() { }).Log("new leader observation") addr := string(leaderObs.LeaderAddr) if len(addr) != 0 { - addr, _ = c.APIAddr(addr) + addr, _ = c.ClusterAPIAddress(addr) } c.forwarder.SetLeader(addr) c.leaderLock.Lock() @@ -901,6 +1005,76 @@ func (c *cluster) applyCommand(cmd *command) error { return nil } +type ClusterServer struct { + ID string + Address string + Voter bool + Leader bool +} + +type ClusterStats struct { + State string + LastContact time.Duration + NumPeers uint64 +} + +type ClusterAbout struct { + ID string + Address string + ClusterAPIAddress string + CoreAPIAddress string + Nodes []ClusterServer + Stats ClusterStats +} + +func (c *cluster) About() (ClusterAbout, error) { + about := ClusterAbout{ + ID: c.id, + Address: c.Address(), + } + + if address, err := c.ClusterAPIAddress(""); err == nil { + about.ClusterAPIAddress = address + } + + if address, err := c.CoreAPIAddress(""); err == nil { + about.CoreAPIAddress = address + } + + stats := c.raft.Stats() + + about.Stats.State = stats["state"] + + if x, err := time.ParseDuration(stats["last_contact"]); err == nil { + about.Stats.LastContact = x + } + + if x, err := strconv.ParseUint(stats["num_peers"], 10, 64); err == nil { + about.Stats.NumPeers = x + } + + _, leaderID := c.raft.LeaderWithID() + + future := c.raft.GetConfiguration() + if err := future.Error(); err != nil { + c.logger.Error().WithError(err).Log("failed to get raft configuration") + return ClusterAbout{}, err + } + + for _, server := range future.Configuration().Servers { + node := ClusterServer{ + ID: string(server.ID), + Address: string(server.Address), + Voter: server.Suffrage == raft.Voter, + Leader: server.ID == leaderID, + } + + about.Nodes = append(about.Nodes, node) + } + + return about, nil +} + func (c *cluster) sentinel() { ticker := time.NewTicker(time.Second) defer ticker.Stop() @@ -918,7 +1092,6 @@ func (c *cluster) sentinel() { stats := c.raft.Stats() fields := log.Fields{} - for k, v := range stats { fields[k] = v } @@ -950,10 +1123,6 @@ func (c *cluster) sentinel() { } } -func (c *cluster) GetURL(path string) (string, error) { - return c.proxy.GetURL(path) -} - -func (c *cluster) GetFile(path string) (io.ReadCloser, error) { - return c.proxy.GetFile(path) +func (c *cluster) ProxyReader() ProxyReader { + return c.proxy.Reader() } diff --git a/cluster/forwarder.go b/cluster/forwarder.go index 3171d018..2d25bec4 100644 --- a/cluster/forwarder.go +++ b/cluster/forwarder.go @@ -1,15 +1,12 @@ package cluster import ( - "bytes" - "encoding/json" "fmt" "io" "net/http" "sync" "time" - httpapi "github.com/datarhei/core/v16/http/api" "github.com/datarhei/core/v16/log" ) @@ -19,18 +16,17 @@ type Forwarder interface { HasLeader() bool Join(origin, id, raftAddress, apiAddress, peerAddress string) error Leave(origin, id string) error - Snapshot() ([]byte, error) + Snapshot() (io.ReadCloser, error) AddProcess() error UpdateProcess() error RemoveProcess() error } type forwarder struct { - id string - leaderAddr string - lock sync.RWMutex + id string + lock sync.RWMutex - client *http.Client + client APIClient logger log.Logger } @@ -60,7 +56,9 @@ func NewForwarder(config ForwarderConfig) (Forwarder, error) { Timeout: 5 * time.Second, } - f.client = client + f.client = APIClient{ + Client: client, + } return f, nil } @@ -69,17 +67,17 @@ func (f *forwarder) SetLeader(address string) { f.lock.Lock() defer f.lock.Unlock() - if f.leaderAddr == address { + if f.client.Address == address { return } f.logger.Debug().Log("setting leader address to %s", address) - f.leaderAddr = address + f.client.Address = address } func (f *forwarder) HasLeader() bool { - return len(f.leaderAddr) != 0 + return len(f.client.Address) != 0 } func (f *forwarder) Join(origin, id, raftAddress, apiAddress, peerAddress string) error { @@ -96,14 +94,18 @@ func (f *forwarder) Join(origin, id, raftAddress, apiAddress, peerAddress string f.logger.Debug().WithField("request", r).Log("forwarding to leader") - data, err := json.Marshal(&r) - if err != nil { - return err + f.lock.RLock() + client := f.client + f.lock.RUnlock() + + if len(peerAddress) != 0 { + client = APIClient{ + Address: peerAddress, + Client: f.client.Client, + } } - _, err = f.call(http.MethodPost, "/join", "application/json", bytes.NewReader(data), peerAddress) - - return err + return client.Join(r) } func (f *forwarder) Leave(origin, id string) error { @@ -118,28 +120,19 @@ func (f *forwarder) Leave(origin, id string) error { f.logger.Debug().WithField("request", r).Log("forwarding to leader") - data, err := json.Marshal(&r) - if err != nil { - return err - } + f.lock.RLock() + client := f.client + f.lock.RUnlock() - _, err = f.call(http.MethodPost, "/leave", "application/json", bytes.NewReader(data), "") - - return err + return client.Leave(r) } -func (f *forwarder) Snapshot() ([]byte, error) { - r, err := f.stream(http.MethodGet, "/snapshot", "", nil, "") - if err != nil { - return nil, err - } +func (f *forwarder) Snapshot() (io.ReadCloser, error) { + f.lock.RLock() + client := f.client + f.lock.RUnlock() - data, err := io.ReadAll(r) - if err != nil { - return nil, err - } - - return data, nil + return client.Snapshot() } func (f *forwarder) AddProcess() error { @@ -153,70 +146,3 @@ func (f *forwarder) UpdateProcess() error { func (f *forwarder) RemoveProcess() error { return fmt.Errorf("not implemented") } - -func (f *forwarder) stream(method, path, contentType string, data io.Reader, leaderOverride string) (io.ReadCloser, error) { - leaderAddr := f.leaderAddr - if len(leaderOverride) != 0 { - leaderAddr = leaderOverride - } - - if len(leaderAddr) == 0 { - return nil, fmt.Errorf("no leader address defined") - } - - f.lock.RLock() - address := "http://" + leaderAddr + "/v1" + path - f.lock.RUnlock() - - f.logger.Debug().Log("address: %s", address) - - req, err := http.NewRequest(method, address, data) - if err != nil { - return nil, err - } - - if method == "POST" || method == "PUT" { - req.Header.Add("Content-Type", contentType) - } - - status, body, err := f.request(req) - if err != nil { - return nil, err - } - - if status < 200 || status >= 300 { - e := httpapi.Error{} - - defer body.Close() - - x, _ := io.ReadAll(body) - - json.Unmarshal(x, &e) - - return nil, e - } - - return body, nil -} - -func (f *forwarder) call(method, path, contentType string, data io.Reader, leaderOverride string) ([]byte, error) { - body, err := f.stream(method, path, contentType, data, leaderOverride) - if err != nil { - return nil, err - } - - defer body.Close() - - x, _ := io.ReadAll(body) - - return x, nil -} - -func (f *forwarder) request(req *http.Request) (int, io.ReadCloser, error) { - resp, err := f.client.Do(req) - if err != nil { - return -1, nil, err - } - - return resp.StatusCode, resp.Body, nil -} diff --git a/cluster/node.go b/cluster/node.go index ecb3e154..e519f9e5 100644 --- a/cluster/node.go +++ b/cluster/node.go @@ -16,7 +16,21 @@ import ( "github.com/datarhei/core/v16/client" ) +type Node interface { + Connect() error + Disconnect() + + Start(updates chan<- NodeState) error + Stop() + + GetURL(path string) (string, error) + GetFile(path string) (io.ReadCloser, error) + + NodeReader +} + type NodeReader interface { + ID() string Address() string IPs() []string State() NodeState @@ -26,12 +40,9 @@ type NodeState struct { ID string State string Files []string + LastPing time.Time LastUpdate time.Time -} - -type NodeSpecs struct { - ID string - Address string + Latency time.Duration } type nodeState string @@ -46,18 +57,24 @@ const ( ) type node struct { - address string - ips []string - state nodeState - username string - password string - updates chan<- NodeState + address string + ips []string + peer client.RestClient - filesList []string - lastUpdate time.Time - lock sync.RWMutex - cancel context.CancelFunc - once sync.Once + peerLock sync.RWMutex + lastPing time.Time + cancelPing context.CancelFunc + + state nodeState + latency float64 // Seconds + stateLock sync.RWMutex + updates chan<- NodeState + filesList []string + lastUpdate time.Time + cancelFiles context.CancelFunc + + runningLock sync.Mutex + running bool host string secure bool @@ -72,56 +89,54 @@ type node struct { prefix *regexp.Regexp } -func newNode(address string, updates chan<- NodeState) (*node, error) { - u, err := url.Parse(address) +func NewNode(address string) Node { + n := &node{ + address: address, + state: stateDisconnected, + prefix: regexp.MustCompile(`^[a-z]+:`), + secure: strings.HasPrefix(address, "https://"), + } + + return n +} + +func (n *node) Connect() error { + n.peerLock.Lock() + defer n.peerLock.Unlock() + + if n.peer != nil { + return nil + } + + u, err := url.Parse(n.address) if err != nil { - return nil, fmt.Errorf("invalid address: %w", err) + return fmt.Errorf("invalid address: %w", err) } host, _, err := net.SplitHostPort(u.Host) if err != nil { - return nil, fmt.Errorf("invalid address: %w", err) + return fmt.Errorf("invalid address: %w", err) } addrs, err := net.LookupHost(host) if err != nil { - return nil, fmt.Errorf("lookup failed: %w", err) - } - - username := u.User.Username() - password := "" - if pw, ok := u.User.Password(); ok { - password = pw - } - - n := &node{ - address: address, - ips: addrs, - username: username, - password: password, - state: stateDisconnected, - updates: updates, - prefix: regexp.MustCompile(`^[a-z]+:`), - host: host, - secure: strings.HasPrefix(address, "https://"), + return fmt.Errorf("lookup failed: %w", err) } peer, err := client.New(client.Config{ - Address: address, - Username: username, - Password: password, + Address: n.address, Auth0Token: "", Client: &http.Client{ Timeout: 5 * time.Second, }, }) if err != nil { - return nil, err + return fmt.Errorf("creating client failed (%s): %w", n.address, err) } config, err := peer.Config() if err != nil { - return nil, err + return err } if config.Config.RTMP.Enable { @@ -159,10 +174,67 @@ func newNode(address string, updates chan<- NodeState) (*node, error) { } } + n.ips = addrs + n.host = host + n.peer = peer ctx, cancel := context.WithCancel(context.Background()) - n.cancel = cancel + n.cancelPing = cancel + + go func(ctx context.Context) { + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + // ping + ok, latency := n.peer.Ping() + + n.stateLock.Lock() + if !ok { + n.state = stateDisconnected + } else { + n.lastPing = time.Now() + n.state = stateConnected + } + n.latency = n.latency*0.2 + latency.Seconds()*0.8 + n.stateLock.Unlock() + case <-ctx.Done(): + return + } + } + }(ctx) + + return nil +} + +func (n *node) Disconnect() { + n.peerLock.Lock() + defer n.peerLock.Unlock() + + if n.cancelPing != nil { + n.cancelPing() + n.cancelPing = nil + } + + n.peer = nil +} + +func (n *node) Start(updates chan<- NodeState) error { + n.runningLock.Lock() + defer n.runningLock.Unlock() + + if n.running { + return nil + } + + n.running = true + n.updates = updates + + ctx, cancel := context.WithCancel(context.Background()) + n.cancelFiles = cancel go func(ctx context.Context) { ticker := time.NewTicker(time.Second) @@ -183,7 +255,20 @@ func newNode(address string, updates chan<- NodeState) (*node, error) { } }(ctx) - return n, nil + return nil +} + +func (n *node) Stop() { + n.runningLock.Lock() + defer n.runningLock.Unlock() + + if !n.running { + return + } + + n.running = false + + n.cancelFiles() } func (n *node) Address() string { @@ -199,12 +284,14 @@ func (n *node) ID() string { } func (n *node) State() NodeState { - n.lock.RLock() - defer n.lock.RUnlock() + n.stateLock.RLock() + defer n.stateLock.RUnlock() state := NodeState{ ID: n.peer.ID(), + LastPing: n.lastPing, LastUpdate: n.lastUpdate, + Latency: time.Duration(n.latency * float64(time.Second)), } if n.state == stateDisconnected || time.Since(n.lastUpdate) > 2*time.Second { @@ -218,10 +305,6 @@ func (n *node) State() NodeState { return state } -func (n *node) stop() { - n.once.Do(func() { n.cancel() }) -} - func (n *node) files() { filesChan := make(chan string, 1024) filesList := []string{} @@ -247,6 +330,9 @@ func (n *node) files() { go func(f chan<- string) { defer wg.Done() + n.peerLock.RLock() + defer n.peerLock.RUnlock() + files, err := n.peer.MemFSList("name", "asc") if err != nil { return @@ -260,6 +346,9 @@ func (n *node) files() { go func(f chan<- string) { defer wg.Done() + n.peerLock.RLock() + defer n.peerLock.RUnlock() + files, err := n.peer.DiskFSList("name", "asc") if err != nil { return @@ -276,6 +365,9 @@ func (n *node) files() { go func(f chan<- string) { defer wg.Done() + n.peerLock.RLock() + defer n.peerLock.RUnlock() + files, err := n.peer.RTMPChannels() if err != nil { return @@ -293,6 +385,9 @@ func (n *node) files() { go func(f chan<- string) { defer wg.Done() + n.peerLock.RLock() + defer n.peerLock.RUnlock() + files, err := n.peer.SRTChannels() if err != nil { return @@ -310,17 +405,16 @@ func (n *node) files() { wgList.Wait() - n.lock.Lock() + n.stateLock.Lock() n.filesList = make([]string, len(filesList)) copy(n.filesList, filesList) n.lastUpdate = time.Now() - n.state = stateConnected - n.lock.Unlock() + n.stateLock.Unlock() } -func (n *node) getURL(path string) (string, error) { +func (n *node) GetURL(path string) (string, error) { // Remove prefix from path prefix := n.prefix.FindString(path) path = n.prefix.ReplaceAllString(path, "") @@ -353,11 +447,14 @@ func (n *node) getURL(path string) (string, error) { return u, nil } -func (n *node) getFile(path string) (io.ReadCloser, error) { +func (n *node) GetFile(path string) (io.ReadCloser, error) { // Remove prefix from path prefix := n.prefix.FindString(path) path = n.prefix.ReplaceAllString(path, "") + n.peerLock.RLock() + defer n.peerLock.RUnlock() + if prefix == "mem:" { return n.peer.MemFSGetFile(path) } else if prefix == "disk:" { diff --git a/cluster/proxy.go b/cluster/proxy.go index 28e1a2aa..05c2041e 100644 --- a/cluster/proxy.go +++ b/cluster/proxy.go @@ -15,8 +15,15 @@ type Proxy interface { Start() Stop() - AddNode(address string) (string, error) + AddNode(id string, node Node) (string, error) RemoveNode(id string) error + + ProxyReader + + Reader() ProxyReader +} + +type ProxyReader interface { ListNodes() []NodeReader GetNode(id string) (NodeReader, error) @@ -24,6 +31,46 @@ type Proxy interface { GetFile(path string) (io.ReadCloser, error) } +func NewNullProxyReader() ProxyReader { + return &proxyReader{} +} + +type proxyReader struct { + proxy *proxy +} + +func (p *proxyReader) ListNodes() []NodeReader { + if p.proxy == nil { + return nil + } + + return p.proxy.ListNodes() +} + +func (p *proxyReader) GetNode(id string) (NodeReader, error) { + if p.proxy == nil { + return nil, fmt.Errorf("no proxy provided") + } + + return p.proxy.GetNode(id) +} + +func (p *proxyReader) GetURL(path string) (string, error) { + if p.proxy == nil { + return "", fmt.Errorf("no proxy provided") + } + + return p.proxy.GetURL(path) +} + +func (p *proxyReader) GetFile(path string) (io.ReadCloser, error) { + if p.proxy == nil { + return nil, fmt.Errorf("no proxy provided") + } + + return p.proxy.GetFile(path) +} + type ProxyConfig struct { ID string // ID of the node Name string // Name of the node @@ -36,7 +83,7 @@ type proxy struct { id string name string - nodes map[string]*node // List of known nodes + nodes map[string]Node // List of known nodes idfiles map[string][]string // Map from nodeid to list of files idupdate map[string]time.Time // Map from nodeid to time of last update fileid map[string]string // Map from file name to nodeid @@ -57,7 +104,7 @@ func NewProxy(config ProxyConfig) (Proxy, error) { p := &proxy{ id: config.ID, name: config.Name, - nodes: map[string]*node{}, + nodes: map[string]Node{}, idfiles: map[string][]string{}, idupdate: map[string]time.Time{}, fileid: map[string]string{}, @@ -141,30 +188,45 @@ func (p *proxy) Stop() { p.logger.Debug().Log("stopping proxy") + p.cancel() + p.cancel = nil + for _, node := range p.nodes { - node.stop() + node.Stop() } - p.nodes = map[string]*node{} + p.nodes = map[string]Node{} } -func (p *proxy) AddNode(address string) (string, error) { - node, err := newNode(address, p.updates) - if err != nil { - return "", err +func (p *proxy) Reader() ProxyReader { + return &proxyReader{ + proxy: p, } +} - id := node.ID() - +func (p *proxy) AddNode(id string, node Node) (string, error) { if id == p.id { return "", fmt.Errorf("can't add myself as node or a node with the same ID") } + if id != node.ID() { + return "", fmt.Errorf("the provided (%s) and retrieved (%s) ID's don't match", id, node.ID()) + } + p.lock.Lock() defer p.lock.Unlock() - if _, ok := p.nodes[id]; ok { - node.stop() + if n, ok := p.nodes[id]; ok { + n.Stop() + + delete(p.nodes, id) + + ips := node.IPs() + + for _, ip := range ips { + p.limiter.RemoveBlock(ip) + } + return id, nil } @@ -175,8 +237,10 @@ func (p *proxy) AddNode(address string) (string, error) { p.nodes[id] = node + node.Start(p.updates) + p.logger.Info().WithFields(log.Fields{ - "address": address, + "address": node.Address(), "id": id, }).Log("Added node") @@ -192,7 +256,7 @@ func (p *proxy) RemoveNode(id string) error { return ErrNodeNotFound } - node.stop() + node.Stop() delete(p.nodes, id) @@ -261,7 +325,7 @@ func (c *proxy) GetURL(path string) (string, error) { return "", fmt.Errorf("file not found") } - url, err := node.getURL(path) + url, err := node.GetURL(path) if err != nil { c.logger.Debug().WithField("path", path).Log("Invalid path") return "", fmt.Errorf("file not found") @@ -299,7 +363,7 @@ func (p *proxy) GetFile(path string) (io.ReadCloser, error) { return nil, fmt.Errorf("file not found") } - data, err := node.getFile(path) + data, err := node.GetFile(path) if err != nil { p.logger.Debug().WithField("path", path).Log("Invalid path") return nil, fmt.Errorf("file not found") diff --git a/cluster/store.go b/cluster/store.go index 6322bd40..441f92c1 100644 --- a/cluster/store.go +++ b/cluster/store.go @@ -12,8 +12,8 @@ import ( type Store interface { raft.FSM - ListNodes() []string - GetNode(id string) string + ListNodes() []StoreNode + GetNode(id string) (StoreNode, error) } type operation string @@ -39,6 +39,11 @@ type removeNodeCommand struct { ID string } +type StoreNode struct { + ID string + Address string +} + type addProcessCommand struct { Config []byte } @@ -130,12 +135,35 @@ func (s *store) Restore(snapshot io.ReadCloser) error { return nil } -func (s *store) ListNodes() []string { - return nil +func (s *store) ListNodes() []StoreNode { + nodes := []StoreNode{} + + s.lock.Lock() + defer s.lock.Unlock() + + for id, address := range s.Nodes { + nodes = append(nodes, StoreNode{ + ID: id, + Address: address, + }) + } + + return nodes } -func (s *store) GetNode(id string) string { - return "" +func (s *store) GetNode(id string) (StoreNode, error) { + s.lock.Lock() + defer s.lock.Unlock() + + address, ok := s.Nodes[id] + if !ok { + return StoreNode{}, fmt.Errorf("not found") + } + + return StoreNode{ + ID: id, + Address: address, + }, nil } type fsmSnapshot struct { diff --git a/docs/docs.go b/docs/docs.go index 4f0a7414..cd5177f8 100644 --- a/docs/docs.go +++ b/docs/docs.go @@ -221,6 +221,35 @@ const docTemplate = `{ ], "summary": "List of nodes in the cluster", "operationId": "cluster-3-get-cluster", + "responses": { + "200": { + "description": "OK", + "schema": { + "$ref": "#/definitions/api.ClusterAbout" + } + }, + "404": { + "description": "Not Found", + "schema": { + "$ref": "#/definitions/api.Error" + } + } + } + } + }, + "/api/v3/cluster/proxy": { + "get": { + "security": [ + { + "ApiKeyAuth": [] + } + ], + "description": "List of proxy nodes in the cluster", + "produces": [ + "application/json" + ], + "summary": "List of proxy nodes in the cluster", + "operationId": "cluster-3-get-proxy-nodes", "responses": { "200": { "description": "OK", @@ -240,62 +269,19 @@ const docTemplate = `{ } } }, - "/api/v3/cluster/node": { - "post": { - "security": [ - { - "ApiKeyAuth": [] - } - ], - "description": "Add a new node to the cluster", - "consumes": [ - "application/json" - ], - "produces": [ - "application/json" - ], - "summary": "Add a new node", - "operationId": "cluster-3-add-node", - "parameters": [ - { - "description": "Node config", - "name": "config", - "in": "body", - "required": true, - "schema": { - "$ref": "#/definitions/api.ClusterNodeConfig" - } - } - ], - "responses": { - "200": { - "description": "OK", - "schema": { - "type": "string" - } - }, - "400": { - "description": "Bad Request", - "schema": { - "$ref": "#/definitions/api.Error" - } - } - } - } - }, - "/api/v3/cluster/node/{id}": { + "/api/v3/cluster/proxy/node/{id}": { "get": { "security": [ { "ApiKeyAuth": [] } ], - "description": "List a node by its ID", + "description": "List a proxy node by its ID", "produces": [ "application/json" ], - "summary": "List a node by its ID", - "operationId": "cluster-3-get-node", + "summary": "List a proxy node by its ID", + "operationId": "cluster-3-get-proxy-node", "parameters": [ { "type": "string", @@ -319,117 +305,21 @@ const docTemplate = `{ } } } - }, - "put": { - "security": [ - { - "ApiKeyAuth": [] - } - ], - "description": "Replaces an existing node and returns the new node ID", - "consumes": [ - "application/json" - ], - "produces": [ - "application/json" - ], - "summary": "Replaces an existing node", - "operationId": "cluster-3-update-node", - "parameters": [ - { - "type": "string", - "description": "Node ID", - "name": "id", - "in": "path", - "required": true - }, - { - "description": "Node config", - "name": "config", - "in": "body", - "required": true, - "schema": { - "$ref": "#/definitions/api.ClusterNodeConfig" - } - } - ], - "responses": { - "200": { - "description": "OK", - "schema": { - "type": "string" - } - }, - "400": { - "description": "Bad Request", - "schema": { - "$ref": "#/definitions/api.Error" - } - }, - "404": { - "description": "Not Found", - "schema": { - "$ref": "#/definitions/api.Error" - } - } - } - }, - "delete": { - "security": [ - { - "ApiKeyAuth": [] - } - ], - "description": "Delete a node by its ID", - "produces": [ - "application/json" - ], - "summary": "Delete a node by its ID", - "operationId": "cluster-3-delete-node", - "parameters": [ - { - "type": "string", - "description": "Node ID", - "name": "id", - "in": "path", - "required": true - } - ], - "responses": { - "200": { - "description": "OK", - "schema": { - "type": "string" - } - }, - "404": { - "description": "Not Found", - "schema": { - "$ref": "#/definitions/api.Error" - } - }, - "500": { - "description": "Internal Server Error", - "schema": { - "$ref": "#/definitions/api.Error" - } - } - } } }, - "/api/v3/cluster/node/{id}/proxy": { + "/api/v3/cluster/proxy/node/{id}/files": { "get": { "security": [ { "ApiKeyAuth": [] } ], - "description": "List the files of a node by its ID", + "description": "List the files of a proxy node by its ID", "produces": [ "application/json" ], - "summary": "List the files of a node by its ID", - "operationId": "cluster-3-get-node-proxy", + "summary": "List the files of a proxy node by its ID", + "operationId": "cluster-3-get-proxy-node-files", "parameters": [ { "type": "string", @@ -2342,6 +2232,32 @@ const docTemplate = `{ } } }, + "api.ClusterAbout": { + "type": "object", + "properties": { + "address": { + "type": "string" + }, + "cluster_api_address": { + "type": "string" + }, + "core_api_address": { + "type": "string" + }, + "id": { + "type": "string" + }, + "nodes": { + "type": "array", + "items": { + "$ref": "#/definitions/api.ClusterServer" + } + }, + "stats": { + "$ref": "#/definitions/api.ClusterStats" + } + } + }, "api.ClusterNode": { "type": "object", "properties": { @@ -2351,28 +2267,21 @@ const docTemplate = `{ "id": { "type": "string" }, + "last_ping": { + "type": "integer" + }, "last_update": { "type": "integer" }, + "latency_ms": { + "description": "milliseconds", + "type": "number" + }, "state": { "type": "string" } } }, - "api.ClusterNodeConfig": { - "type": "object", - "properties": { - "address": { - "type": "string" - }, - "password": { - "type": "string" - }, - "username": { - "type": "string" - } - } - }, "api.ClusterNodeFiles": { "type": "object", "additionalProperties": { @@ -2382,6 +2291,37 @@ const docTemplate = `{ } } }, + "api.ClusterServer": { + "type": "object", + "properties": { + "address": { + "type": "string" + }, + "id": { + "type": "string" + }, + "leader": { + "type": "boolean" + }, + "voter": { + "type": "boolean" + } + } + }, + "api.ClusterStats": { + "type": "object", + "properties": { + "last_contact_ms": { + "type": "number" + }, + "num_peers": { + "type": "integer" + }, + "state": { + "type": "string" + } + } + }, "api.Command": { "type": "object", "required": [ @@ -2491,6 +2431,32 @@ const docTemplate = `{ } } }, + "cluster": { + "type": "object", + "properties": { + "address": { + "type": "string" + }, + "bootstrap": { + "type": "boolean" + }, + "debug": { + "type": "boolean" + }, + "enable": { + "type": "boolean" + }, + "peers": { + "type": "array", + "items": { + "type": "string" + } + }, + "recover": { + "type": "boolean" + } + } + }, "created_at": { "description": "When this config has been persisted", "type": "string" @@ -3334,6 +3300,10 @@ const docTemplate = `{ }, "type": { "type": "string" + }, + "updated_at": { + "type": "integer", + "format": "int64" } } }, @@ -3648,18 +3618,7 @@ const docTemplate = `{ "format": "uint64" }, "framerate": { - "type": "object", - "properties": { - "avg": { - "type": "number" - }, - "max": { - "type": "number" - }, - "min": { - "type": "number" - } - } + "$ref": "#/definitions/api.ProgressIOFramerate" }, "height": { "type": "integer", @@ -3717,6 +3676,20 @@ const docTemplate = `{ } } }, + "api.ProgressIOFramerate": { + "type": "object", + "properties": { + "avg": { + "type": "number" + }, + "max": { + "type": "number" + }, + "min": { + "type": "number" + } + } + }, "api.RTMPChannel": { "type": "object", "properties": { @@ -4293,6 +4266,32 @@ const docTemplate = `{ } } }, + "cluster": { + "type": "object", + "properties": { + "address": { + "type": "string" + }, + "bootstrap": { + "type": "boolean" + }, + "debug": { + "type": "boolean" + }, + "enable": { + "type": "boolean" + }, + "peers": { + "type": "array", + "items": { + "type": "string" + } + }, + "recover": { + "type": "boolean" + } + } + }, "created_at": { "description": "When this config has been persisted", "type": "string" diff --git a/docs/swagger.json b/docs/swagger.json index 4bce2bea..93fe4b68 100644 --- a/docs/swagger.json +++ b/docs/swagger.json @@ -214,6 +214,35 @@ ], "summary": "List of nodes in the cluster", "operationId": "cluster-3-get-cluster", + "responses": { + "200": { + "description": "OK", + "schema": { + "$ref": "#/definitions/api.ClusterAbout" + } + }, + "404": { + "description": "Not Found", + "schema": { + "$ref": "#/definitions/api.Error" + } + } + } + } + }, + "/api/v3/cluster/proxy": { + "get": { + "security": [ + { + "ApiKeyAuth": [] + } + ], + "description": "List of proxy nodes in the cluster", + "produces": [ + "application/json" + ], + "summary": "List of proxy nodes in the cluster", + "operationId": "cluster-3-get-proxy-nodes", "responses": { "200": { "description": "OK", @@ -233,62 +262,19 @@ } } }, - "/api/v3/cluster/node": { - "post": { - "security": [ - { - "ApiKeyAuth": [] - } - ], - "description": "Add a new node to the cluster", - "consumes": [ - "application/json" - ], - "produces": [ - "application/json" - ], - "summary": "Add a new node", - "operationId": "cluster-3-add-node", - "parameters": [ - { - "description": "Node config", - "name": "config", - "in": "body", - "required": true, - "schema": { - "$ref": "#/definitions/api.ClusterNodeConfig" - } - } - ], - "responses": { - "200": { - "description": "OK", - "schema": { - "type": "string" - } - }, - "400": { - "description": "Bad Request", - "schema": { - "$ref": "#/definitions/api.Error" - } - } - } - } - }, - "/api/v3/cluster/node/{id}": { + "/api/v3/cluster/proxy/node/{id}": { "get": { "security": [ { "ApiKeyAuth": [] } ], - "description": "List a node by its ID", + "description": "List a proxy node by its ID", "produces": [ "application/json" ], - "summary": "List a node by its ID", - "operationId": "cluster-3-get-node", + "summary": "List a proxy node by its ID", + "operationId": "cluster-3-get-proxy-node", "parameters": [ { "type": "string", @@ -312,117 +298,21 @@ } } } - }, - "put": { - "security": [ - { - "ApiKeyAuth": [] - } - ], - "description": "Replaces an existing node and returns the new node ID", - "consumes": [ - "application/json" - ], - "produces": [ - "application/json" - ], - "summary": "Replaces an existing node", - "operationId": "cluster-3-update-node", - "parameters": [ - { - "type": "string", - "description": "Node ID", - "name": "id", - "in": "path", - "required": true - }, - { - "description": "Node config", - "name": "config", - "in": "body", - "required": true, - "schema": { - "$ref": "#/definitions/api.ClusterNodeConfig" - } - } - ], - "responses": { - "200": { - "description": "OK", - "schema": { - "type": "string" - } - }, - "400": { - "description": "Bad Request", - "schema": { - "$ref": "#/definitions/api.Error" - } - }, - "404": { - "description": "Not Found", - "schema": { - "$ref": "#/definitions/api.Error" - } - } - } - }, - "delete": { - "security": [ - { - "ApiKeyAuth": [] - } - ], - "description": "Delete a node by its ID", - "produces": [ - "application/json" - ], - "summary": "Delete a node by its ID", - "operationId": "cluster-3-delete-node", - "parameters": [ - { - "type": "string", - "description": "Node ID", - "name": "id", - "in": "path", - "required": true - } - ], - "responses": { - "200": { - "description": "OK", - "schema": { - "type": "string" - } - }, - "404": { - "description": "Not Found", - "schema": { - "$ref": "#/definitions/api.Error" - } - }, - "500": { - "description": "Internal Server Error", - "schema": { - "$ref": "#/definitions/api.Error" - } - } - } } }, - "/api/v3/cluster/node/{id}/proxy": { + "/api/v3/cluster/proxy/node/{id}/files": { "get": { "security": [ { "ApiKeyAuth": [] } ], - "description": "List the files of a node by its ID", + "description": "List the files of a proxy node by its ID", "produces": [ "application/json" ], - "summary": "List the files of a node by its ID", - "operationId": "cluster-3-get-node-proxy", + "summary": "List the files of a proxy node by its ID", + "operationId": "cluster-3-get-proxy-node-files", "parameters": [ { "type": "string", @@ -2335,6 +2225,32 @@ } } }, + "api.ClusterAbout": { + "type": "object", + "properties": { + "address": { + "type": "string" + }, + "cluster_api_address": { + "type": "string" + }, + "core_api_address": { + "type": "string" + }, + "id": { + "type": "string" + }, + "nodes": { + "type": "array", + "items": { + "$ref": "#/definitions/api.ClusterServer" + } + }, + "stats": { + "$ref": "#/definitions/api.ClusterStats" + } + } + }, "api.ClusterNode": { "type": "object", "properties": { @@ -2344,28 +2260,21 @@ "id": { "type": "string" }, + "last_ping": { + "type": "integer" + }, "last_update": { "type": "integer" }, + "latency_ms": { + "description": "milliseconds", + "type": "number" + }, "state": { "type": "string" } } }, - "api.ClusterNodeConfig": { - "type": "object", - "properties": { - "address": { - "type": "string" - }, - "password": { - "type": "string" - }, - "username": { - "type": "string" - } - } - }, "api.ClusterNodeFiles": { "type": "object", "additionalProperties": { @@ -2375,6 +2284,37 @@ } } }, + "api.ClusterServer": { + "type": "object", + "properties": { + "address": { + "type": "string" + }, + "id": { + "type": "string" + }, + "leader": { + "type": "boolean" + }, + "voter": { + "type": "boolean" + } + } + }, + "api.ClusterStats": { + "type": "object", + "properties": { + "last_contact_ms": { + "type": "number" + }, + "num_peers": { + "type": "integer" + }, + "state": { + "type": "string" + } + } + }, "api.Command": { "type": "object", "required": [ @@ -2484,6 +2424,32 @@ } } }, + "cluster": { + "type": "object", + "properties": { + "address": { + "type": "string" + }, + "bootstrap": { + "type": "boolean" + }, + "debug": { + "type": "boolean" + }, + "enable": { + "type": "boolean" + }, + "peers": { + "type": "array", + "items": { + "type": "string" + } + }, + "recover": { + "type": "boolean" + } + } + }, "created_at": { "description": "When this config has been persisted", "type": "string" @@ -3327,6 +3293,10 @@ }, "type": { "type": "string" + }, + "updated_at": { + "type": "integer", + "format": "int64" } } }, @@ -3641,18 +3611,7 @@ "format": "uint64" }, "framerate": { - "type": "object", - "properties": { - "avg": { - "type": "number" - }, - "max": { - "type": "number" - }, - "min": { - "type": "number" - } - } + "$ref": "#/definitions/api.ProgressIOFramerate" }, "height": { "type": "integer", @@ -3710,6 +3669,20 @@ } } }, + "api.ProgressIOFramerate": { + "type": "object", + "properties": { + "avg": { + "type": "number" + }, + "max": { + "type": "number" + }, + "min": { + "type": "number" + } + } + }, "api.RTMPChannel": { "type": "object", "properties": { @@ -4286,6 +4259,32 @@ } } }, + "cluster": { + "type": "object", + "properties": { + "address": { + "type": "string" + }, + "bootstrap": { + "type": "boolean" + }, + "debug": { + "type": "boolean" + }, + "enable": { + "type": "boolean" + }, + "peers": { + "type": "array", + "items": { + "type": "string" + } + }, + "recover": { + "type": "boolean" + } + } + }, "created_at": { "description": "When this config has been persisted", "type": "string" diff --git a/docs/swagger.yaml b/docs/swagger.yaml index 91e691dd..dde04d36 100644 --- a/docs/swagger.yaml +++ b/docs/swagger.yaml @@ -62,32 +62,65 @@ definitions: version: $ref: '#/definitions/api.Version' type: object + api.ClusterAbout: + properties: + address: + type: string + cluster_api_address: + type: string + core_api_address: + type: string + id: + type: string + nodes: + items: + $ref: '#/definitions/api.ClusterServer' + type: array + stats: + $ref: '#/definitions/api.ClusterStats' + type: object api.ClusterNode: properties: address: type: string id: type: string + last_ping: + type: integer last_update: type: integer + latency_ms: + description: milliseconds + type: number state: type: string type: object - api.ClusterNodeConfig: - properties: - address: - type: string - password: - type: string - username: - type: string - type: object api.ClusterNodeFiles: additionalProperties: items: type: string type: array type: object + api.ClusterServer: + properties: + address: + type: string + id: + type: string + leader: + type: boolean + voter: + type: boolean + type: object + api.ClusterStats: + properties: + last_contact_ms: + type: number + num_peers: + type: integer + state: + type: string + type: object api.Command: properties: command: @@ -159,6 +192,23 @@ definitions: read_only: type: boolean type: object + cluster: + properties: + address: + type: string + bootstrap: + type: boolean + debug: + type: boolean + enable: + type: boolean + peers: + items: + type: string + type: array + recover: + type: boolean + type: object created_at: description: When this config has been persisted type: string @@ -727,6 +777,9 @@ definitions: $ref: '#/definitions/api.ProcessState' type: type: string + updated_at: + format: int64 + type: integer type: object api.ProcessConfig: properties: @@ -940,14 +993,7 @@ definitions: format: uint64 type: integer framerate: - properties: - avg: - type: number - max: - type: number - min: - type: number - type: object + $ref: '#/definitions/api.ProgressIOFramerate' height: format: uint64 type: integer @@ -989,6 +1035,15 @@ definitions: format: uint64 type: integer type: object + api.ProgressIOFramerate: + properties: + avg: + type: number + max: + type: number + min: + type: number + type: object api.RTMPChannel: properties: name: @@ -1441,6 +1496,23 @@ definitions: read_only: type: boolean type: object + cluster: + properties: + address: + type: string + bootstrap: + type: boolean + debug: + type: boolean + enable: + type: boolean + peers: + items: + type: string + type: array + recover: + type: boolean + type: object created_at: description: When this config has been persisted type: string @@ -2092,6 +2164,24 @@ paths: operationId: cluster-3-get-cluster produces: - application/json + responses: + "200": + description: OK + schema: + $ref: '#/definitions/api.ClusterAbout' + "404": + description: Not Found + schema: + $ref: '#/definitions/api.Error' + security: + - ApiKeyAuth: [] + summary: List of nodes in the cluster + /api/v3/cluster/proxy: + get: + description: List of proxy nodes in the cluster + operationId: cluster-3-get-proxy-nodes + produces: + - application/json responses: "200": description: OK @@ -2105,65 +2195,11 @@ paths: $ref: '#/definitions/api.Error' security: - ApiKeyAuth: [] - summary: List of nodes in the cluster - /api/v3/cluster/node: - post: - consumes: - - application/json - description: Add a new node to the cluster - operationId: cluster-3-add-node - parameters: - - description: Node config - in: body - name: config - required: true - schema: - $ref: '#/definitions/api.ClusterNodeConfig' - produces: - - application/json - responses: - "200": - description: OK - schema: - type: string - "400": - description: Bad Request - schema: - $ref: '#/definitions/api.Error' - security: - - ApiKeyAuth: [] - summary: Add a new node - /api/v3/cluster/node/{id}: - delete: - description: Delete a node by its ID - operationId: cluster-3-delete-node - parameters: - - description: Node ID - in: path - name: id - required: true - type: string - produces: - - application/json - responses: - "200": - description: OK - schema: - type: string - "404": - description: Not Found - schema: - $ref: '#/definitions/api.Error' - "500": - description: Internal Server Error - schema: - $ref: '#/definitions/api.Error' - security: - - ApiKeyAuth: [] - summary: Delete a node by its ID + summary: List of proxy nodes in the cluster + /api/v3/cluster/proxy/node/{id}: get: - description: List a node by its ID - operationId: cluster-3-get-node + description: List a proxy node by its ID + operationId: cluster-3-get-proxy-node parameters: - description: Node ID in: path @@ -2183,46 +2219,11 @@ paths: $ref: '#/definitions/api.Error' security: - ApiKeyAuth: [] - summary: List a node by its ID - put: - consumes: - - application/json - description: Replaces an existing node and returns the new node ID - operationId: cluster-3-update-node - parameters: - - description: Node ID - in: path - name: id - required: true - type: string - - description: Node config - in: body - name: config - required: true - schema: - $ref: '#/definitions/api.ClusterNodeConfig' - produces: - - application/json - responses: - "200": - description: OK - schema: - type: string - "400": - description: Bad Request - schema: - $ref: '#/definitions/api.Error' - "404": - description: Not Found - schema: - $ref: '#/definitions/api.Error' - security: - - ApiKeyAuth: [] - summary: Replaces an existing node - /api/v3/cluster/node/{id}/proxy: + summary: List a proxy node by its ID + /api/v3/cluster/proxy/node/{id}/files: get: - description: List the files of a node by its ID - operationId: cluster-3-get-node-proxy + description: List the files of a proxy node by its ID + operationId: cluster-3-get-proxy-node-files parameters: - description: Node ID in: path @@ -2242,7 +2243,7 @@ paths: $ref: '#/definitions/api.Error' security: - ApiKeyAuth: [] - summary: List the files of a node by its ID + summary: List the files of a proxy node by its ID /api/v3/config: get: description: Retrieve the currently active Restreamer configuration diff --git a/http/api/cluster.go b/http/api/cluster.go index 770ef763..2dedb103 100644 --- a/http/api/cluster.go +++ b/http/api/cluster.go @@ -7,10 +7,34 @@ type ClusterNodeConfig struct { } type ClusterNode struct { - Address string `json:"address"` - ID string `json:"id"` - LastUpdate int64 `json:"last_update"` - State string `json:"state"` + Address string `json:"address"` + ID string `json:"id"` + LastPing int64 `json:"last_ping"` + LastUpdate int64 `json:"last_update"` + Latency float64 `json:"latency_ms"` // milliseconds + State string `json:"state"` } type ClusterNodeFiles map[string][]string + +type ClusterServer struct { + ID string `json:"id"` + Address string `json:"address"` + Voter bool `json:"voter"` + Leader bool `json:"leader"` +} + +type ClusterStats struct { + State string `json:"state"` + LastContact float64 `json:"last_contact_ms"` + NumPeers uint64 `json:"num_peers"` +} + +type ClusterAbout struct { + ID string `json:"id"` + Address string `json:"address"` + ClusterAPIAddress string `json:"cluster_api_address"` + CoreAPIAddress string `json:"core_api_address"` + Nodes []ClusterServer `json:"nodes"` + Stats ClusterStats `json:"stats"` +} diff --git a/http/fs/cluster.go b/http/fs/cluster.go index 309f8ca2..bb79cb21 100644 --- a/http/fs/cluster.go +++ b/http/fs/cluster.go @@ -16,19 +16,19 @@ type Filesystem interface { type filesystem struct { fs.Filesystem - name string - cluster cluster.ClusterReader + name string + proxy cluster.ProxyReader } -func NewClusterFS(name string, fs fs.Filesystem, cluster cluster.Cluster) Filesystem { - if cluster == nil { +func NewClusterFS(name string, fs fs.Filesystem, proxy cluster.ProxyReader) Filesystem { + if proxy == nil { return fs } f := &filesystem{ Filesystem: fs, name: name, - cluster: cluster, + proxy: proxy, } return f @@ -41,7 +41,7 @@ func (fs *filesystem) Open(path string) fs.File { } // Check if the file is available in the cluster - data, err := fs.cluster.GetFile(fs.name + ":" + path) + data, err := fs.proxy.GetFile(fs.name + ":" + path) if err != nil { return nil } diff --git a/http/handler/api/cluster.go b/http/handler/api/cluster.go index 964484c8..d7ad8b1a 100644 --- a/http/handler/api/cluster.go +++ b/http/handler/api/cluster.go @@ -1,37 +1,44 @@ package api import ( + "net/http" "regexp" + "sort" + "strings" "github.com/datarhei/core/v16/cluster" + "github.com/datarhei/core/v16/http/api" + "github.com/datarhei/core/v16/http/handler/util" + "github.com/labstack/echo/v4" ) // The ClusterHandler type provides handler functions for manipulating the cluster config. type ClusterHandler struct { cluster cluster.Cluster + proxy cluster.ProxyReader prefix *regexp.Regexp } -/* // NewCluster return a new ClusterHandler type. You have to provide a cluster. func NewCluster(cluster cluster.Cluster) *ClusterHandler { return &ClusterHandler{ cluster: cluster, + proxy: cluster.ProxyReader(), prefix: regexp.MustCompile(`^[a-z]+:`), } } -// GetCluster returns the list of nodes in the cluster -// @Summary List of nodes in the cluster -// @Description List of nodes in the cluster -// @ID cluster-3-get-cluster +// GetProxyNodes returns the list of proxy nodes in the cluster +// @Summary List of proxy nodes in the cluster +// @Description List of proxy nodes in the cluster +// @ID cluster-3-get-proxy-nodes // @Produce json // @Success 200 {array} api.ClusterNode // @Failure 404 {object} api.Error // @Security ApiKeyAuth -// @Router /api/v3/cluster [get] -func (h *ClusterHandler) GetCluster(c echo.Context) error { - nodes := h.cluster.ListNodesX() +// @Router /api/v3/cluster/proxy [get] +func (h *ClusterHandler) GetProxyNodes(c echo.Context) error { + nodes := h.proxy.ListNodes() list := []api.ClusterNode{} @@ -40,7 +47,9 @@ func (h *ClusterHandler) GetCluster(c echo.Context) error { n := api.ClusterNode{ Address: node.Address(), ID: state.ID, + LastPing: state.LastPing.Unix(), LastUpdate: state.LastUpdate.Unix(), + Latency: state.Latency.Seconds() * 1000, State: state.State, } @@ -50,6 +59,108 @@ func (h *ClusterHandler) GetCluster(c echo.Context) error { return c.JSON(http.StatusOK, list) } +// GetProxyNode returns the proxy node with the given ID +// @Summary List a proxy node by its ID +// @Description List a proxy node by its ID +// @ID cluster-3-get-proxy-node +// @Produce json +// @Param id path string true "Node ID" +// @Success 200 {object} api.ClusterNode +// @Failure 404 {object} api.Error +// @Security ApiKeyAuth +// @Router /api/v3/cluster/proxy/node/{id} [get] +func (h *ClusterHandler) GetProxyNode(c echo.Context) error { + id := util.PathParam(c, "id") + + peer, err := h.proxy.GetNode(id) + if err != nil { + return api.Err(http.StatusNotFound, "Node not found", "%s", err) + } + + state := peer.State() + + node := api.ClusterNode{ + Address: peer.Address(), + ID: state.ID, + LastUpdate: state.LastUpdate.Unix(), + State: state.State, + } + + return c.JSON(http.StatusOK, node) +} + +// GetProxyNodeFiles returns the files from the proxy node with the given ID +// @Summary List the files of a proxy node by its ID +// @Description List the files of a proxy node by its ID +// @ID cluster-3-get-proxy-node-files +// @Produce json +// @Param id path string true "Node ID" +// @Success 200 {object} api.ClusterNodeFiles +// @Failure 404 {object} api.Error +// @Security ApiKeyAuth +// @Router /api/v3/cluster/proxy/node/{id}/files [get] +func (h *ClusterHandler) GetProxyNodeFiles(c echo.Context) error { + id := util.PathParam(c, "id") + + peer, err := h.proxy.GetNode(id) + if err != nil { + return api.Err(http.StatusNotFound, "Node not found", "%s", err) + } + + files := api.ClusterNodeFiles{} + + state := peer.State() + + sort.Strings(state.Files) + + for _, path := range state.Files { + prefix := strings.TrimSuffix(h.prefix.FindString(path), ":") + path = h.prefix.ReplaceAllString(path, "") + + files[prefix] = append(files[prefix], path) + } + + return c.JSON(http.StatusOK, files) +} + +// GetCluster returns the list of nodes in the cluster +// @Summary List of nodes in the cluster +// @Description List of nodes in the cluster +// @ID cluster-3-get-cluster +// @Produce json +// @Success 200 {object} api.ClusterAbout +// @Failure 404 {object} api.Error +// @Security ApiKeyAuth +// @Router /api/v3/cluster [get] +func (h *ClusterHandler) About(c echo.Context) error { + state, _ := h.cluster.About() + + about := api.ClusterAbout{ + ID: state.ID, + Address: state.Address, + ClusterAPIAddress: state.ClusterAPIAddress, + CoreAPIAddress: state.CoreAPIAddress, + Nodes: []api.ClusterServer{}, + Stats: api.ClusterStats{ + State: state.Stats.State, + LastContact: state.Stats.LastContact.Seconds() * 1000, + NumPeers: state.Stats.NumPeers, + }, + } + + for _, n := range state.Nodes { + about.Nodes = append(about.Nodes, api.ClusterServer{ + ID: n.ID, + Address: n.Address, + Voter: n.Voter, + Leader: n.Leader, + }) + } + + return c.JSON(http.StatusOK, about) +} + +/* // AddNode adds a new node // @Summary Add a new node // @Description Add a new node to the cluster diff --git a/http/server.go b/http/server.go index 05c3aaf0..0a3970b0 100644 --- a/http/server.go +++ b/http/server.go @@ -195,7 +195,7 @@ func NewServer(config Config) (Server, error) { corsPrefixes[httpfs.Mountpoint] = config.Cors.Origins if httpfs.Filesystem.Type() == "disk" || httpfs.Filesystem.Type() == "mem" { - httpfs.Filesystem = fs.NewClusterFS(httpfs.Filesystem.Name(), httpfs.Filesystem, config.Cluster) + httpfs.Filesystem = fs.NewClusterFS(httpfs.Filesystem.Name(), httpfs.Filesystem, config.Cluster.ProxyReader()) } filesystem := &filesystem{ @@ -314,7 +314,7 @@ func NewServer(config Config) (Server, error) { }) if config.Cluster != nil { - //s.v3handler.cluster = api.NewCluster(config.Cluster) + s.v3handler.cluster = api.NewCluster(config.Cluster) } if middleware, err := mwcors.NewWithConfig(mwcors.Config{ @@ -656,19 +656,20 @@ func (s *server) setRoutesV3(v3 *echo.Group) { } // v3 Cluster - /* - if s.v3handler.cluster != nil { - v3.GET("/cluster", s.v3handler.cluster.GetCluster) - v3.GET("/cluster/node/:id", s.v3handler.cluster.GetNode) - v3.GET("/cluster/node/:id/proxy", s.v3handler.cluster.GetNodeProxy) + if s.v3handler.cluster != nil { + v3.GET("/cluster", s.v3handler.cluster.About) + v3.GET("/cluster/proxy", s.v3handler.cluster.GetProxyNodes) + v3.GET("/cluster/proxy/node/:id", s.v3handler.cluster.GetProxyNode) + v3.GET("/cluster/proxy/node/:id/files", s.v3handler.cluster.GetProxyNodeFiles) + /* if !s.readOnly { v3.POST("/cluster/node", s.v3handler.cluster.AddNode) v3.PUT("/cluster/node/:id", s.v3handler.cluster.UpdateNode) v3.DELETE("/cluster/node/:id", s.v3handler.cluster.DeleteNode) } - } - */ + */ + } // v3 Log v3.GET("/log", s.v3handler.log.Log) diff --git a/rtmp/rtmp.go b/rtmp/rtmp.go index f7ff12bc..a3e0bc38 100644 --- a/rtmp/rtmp.go +++ b/rtmp/rtmp.go @@ -58,7 +58,7 @@ type Config struct { // with methods like tls.Config.SetSessionTicketKeys. TLSConfig *tls.Config - Cluster cluster.ClusterReader + Proxy cluster.ProxyReader } // Server represents a RTMP server @@ -93,7 +93,7 @@ type server struct { channels map[string]*channel lock sync.RWMutex - cluster cluster.ClusterReader + proxy cluster.ProxyReader } // New creates a new RTMP server according to the given config @@ -111,15 +111,15 @@ func New(config Config) (Server, error) { token: config.Token, logger: config.Logger, collector: config.Collector, - cluster: config.Cluster, + proxy: config.Proxy, } if s.collector == nil { s.collector = session.NewNullCollector() } - if s.cluster == nil { - s.cluster = cluster.NewDummyClusterReader() + if s.proxy == nil { + s.proxy = cluster.NewNullProxyReader() } s.server = &rtmp.Server{ @@ -254,7 +254,7 @@ func (s *server) handlePlay(conn *rtmp.Conn) { if ch == nil { // Check in the cluster for that stream - url, err := s.cluster.GetURL("rtmp:" + conn.URL.Path) + url, err := s.proxy.GetURL("rtmp:" + conn.URL.Path) if err != nil { s.log("PLAY", "NOTFOUND", conn.URL.Path, "", client) return diff --git a/srt/srt.go b/srt/srt.go index c9cb6cb2..6d8196cc 100644 --- a/srt/srt.go +++ b/srt/srt.go @@ -40,7 +40,7 @@ type Config struct { SRTLogTopics []string - Cluster cluster.ClusterReader + Proxy cluster.ProxyReader } // Server represents a SRT server @@ -77,7 +77,7 @@ type server struct { srtlog map[string]*ring.Ring // Per logtopic a dedicated ring buffer srtlogLock sync.RWMutex - cluster cluster.ClusterReader + proxy cluster.ProxyReader } func New(config Config) (Server, error) { @@ -87,15 +87,15 @@ func New(config Config) (Server, error) { passphrase: config.Passphrase, collector: config.Collector, logger: config.Logger, - cluster: config.Cluster, + proxy: config.Proxy, } if s.collector == nil { s.collector = session.NewNullCollector() } - if s.cluster == nil { - s.cluster = cluster.NewDummyClusterReader() + if s.proxy == nil { + s.proxy = cluster.NewNullProxyReader() } if s.logger == nil { @@ -438,7 +438,7 @@ func (s *server) handleSubscribe(conn srt.Conn) { if ch == nil { // Check in the cluster for the stream and proxy it - srturl, err := s.cluster.GetURL("srt:" + si.resource) + srturl, err := s.proxy.GetURL("srt:" + si.resource) if err != nil { s.log("SUBSCRIBE", "NOTFOUND", si.resource, "no publisher for this resource found", client) return