Remove node storage, use raft configuration instead; re-establish file and stream proxying

This commit is contained in:
Ingo Oppermann 2023-05-04 19:49:53 +02:00
parent d201921a33
commit 7f59c188cf
No known key found for this signature in database
GPG Key ID: 2AB32426E9DD229E
17 changed files with 1353 additions and 762 deletions

View File

@ -948,7 +948,7 @@ func (a *api) start() error {
Token: cfg.RTMP.Token,
Logger: a.log.logger.rtmp,
Collector: a.sessions.Collector("rtmp"),
Cluster: a.cluster,
Proxy: a.cluster.ProxyReader(),
}
if cfg.RTMP.EnableTLS {
@ -977,7 +977,7 @@ func (a *api) start() error {
Token: cfg.SRT.Token,
Logger: a.log.logger.core.WithComponent("SRT").WithField("address", cfg.SRT.Address),
Collector: a.sessions.Collector("srt"),
Cluster: a.cluster,
Proxy: a.cluster.ProxyReader(),
}
if cfg.SRT.Log.Enable {

View File

@ -6,6 +6,7 @@ import (
"fmt"
"io"
"net/http"
"net/url"
"strings"
"time"
@ -32,7 +33,8 @@ type RestClient interface {
// Address returns the address of the connected datarhei Core
Address() string
About() api.About // GET /
About() api.About // GET /
Ping() (bool, time.Duration) // GET /ping
Config() (api.Config, error) // GET /config
ConfigSet(config api.ConfigData) error // POST /config
@ -120,6 +122,27 @@ func New(config Config) (RestClient, error) {
client: config.Client,
}
u, err := url.Parse(r.address)
if err != nil {
return nil, err
}
username := u.User.Username()
if len(username) != 0 {
r.username = username
}
if password, ok := u.User.Password(); ok {
r.password = password
}
u.User = nil
u.RawQuery = ""
u.Fragment = ""
r.address = u.String()
fmt.Printf("address: %s\n", r.address)
if r.client == nil {
r.client = &http.Client{
Timeout: 15 * time.Second,
@ -137,6 +160,12 @@ func New(config Config) (RestClient, error) {
return nil, fmt.Errorf("didn't receive the expected API response (got: %s, want: %s)", r.about.Name, coreapp)
}
if len(r.about.ID) == 0 {
if err := r.login(); err != nil {
return nil, err
}
}
c, _ := semver.NewConstraint(coreversion)
v, err := semver.NewVersion(r.about.Version.Number)
if err != nil {
@ -147,12 +176,6 @@ func New(config Config) (RestClient, error) {
return nil, fmt.Errorf("the core version (%s) is not supported (%s)", r.about.Version.Number, coreversion)
}
if len(r.about.ID) == 0 {
if err := r.login(); err != nil {
return nil, err
}
}
return r, nil
}
@ -172,6 +195,28 @@ func (r *restclient) About() api.About {
return r.about
}
func (r *restclient) Ping() (bool, time.Duration) {
req, err := http.NewRequest(http.MethodGet, r.address+"/ping", nil)
if err != nil {
return false, time.Duration(0)
}
start := time.Now()
status, body, err := r.request(req)
if err != nil {
return false, time.Since(start)
}
defer body.Close()
if status != 200 {
return false, time.Since(start)
}
return true, time.Since(start)
}
func (r *restclient) login() error {
login := api.Login{}

View File

@ -3,8 +3,12 @@ package cluster
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"strings"
"time"
httpapi "github.com/datarhei/core/v16/http/api"
"github.com/datarhei/core/v16/http/errorhandler"
@ -60,7 +64,7 @@ func NewAPI(config APIConfig) (API, error) {
a.logger = log.New("")
}
address, err := config.Cluster.APIAddr("")
address, err := config.Cluster.ClusterAPIAddress("")
if err != nil {
return nil, err
}
@ -97,7 +101,7 @@ func NewAPI(config APIConfig) (API, error) {
return httpapi.Err(http.StatusBadRequest, "Invalid JSON", "%s", err)
}
a.logger.Debug().Log("got join request: %+v", r)
a.logger.Debug().WithField("id", r.ID).Log("got join request: %+v", r)
if r.Origin == a.id {
return httpapi.Err(http.StatusLoopDetected, "", "breaking circuit")
@ -105,7 +109,7 @@ func NewAPI(config APIConfig) (API, error) {
err := a.cluster.Join(r.Origin, r.ID, r.RaftAddress, r.APIAddress, "")
if err != nil {
a.logger.Debug().WithError(err).Log("unable to join cluster")
a.logger.Debug().WithError(err).WithField("id", r.ID).Log("unable to join cluster")
return httpapi.Err(http.StatusInternalServerError, "unable to join cluster", "%s", err)
}
@ -119,7 +123,7 @@ func NewAPI(config APIConfig) (API, error) {
return httpapi.Err(http.StatusBadRequest, "Invalid JSON", "%s", err)
}
a.logger.Debug().Log("got leave request: %+v", r)
a.logger.Debug().WithField("id", r.ID).Log("got leave request: %+v", r)
if r.Origin == a.id {
return httpapi.Err(http.StatusLoopDetected, "", "breaking circuit")
@ -127,7 +131,7 @@ func NewAPI(config APIConfig) (API, error) {
err := a.cluster.Leave(r.Origin, r.ID)
if err != nil {
a.logger.Debug().WithError(err).Log("unable to leave cluster")
a.logger.Debug().WithError(err).WithField("id", r.ID).Log("unable to leave cluster")
return httpapi.Err(http.StatusInternalServerError, "unable to leave cluster", "%s", err)
}
@ -141,7 +145,9 @@ func NewAPI(config APIConfig) (API, error) {
return httpapi.Err(http.StatusInternalServerError, "unable to create snapshot", "%s", err)
}
return c.Stream(http.StatusOK, "application/octet-stream", bytes.NewReader(data))
defer data.Close()
return c.Stream(http.StatusOK, "application/octet-stream", data)
})
a.router.POST("/v1/process", func(c echo.Context) error {
@ -152,6 +158,11 @@ func NewAPI(config APIConfig) (API, error) {
return httpapi.Err(http.StatusNotImplemented, "")
})
a.router.GET("/v1/core", func(c echo.Context) error {
address, _ := a.cluster.CoreAPIAddress("")
return c.JSON(http.StatusOK, address)
})
return a, nil
}
@ -163,3 +174,119 @@ func (a *api) Start() error {
func (a *api) Shutdown(ctx context.Context) error {
return a.router.Shutdown(ctx)
}
type APIClient struct {
Address string
Client *http.Client
}
func (c *APIClient) CoreAPIAddress() (string, error) {
data, err := c.call(http.MethodGet, "/core", "", nil)
if err != nil {
return "", err
}
var address string
err = json.Unmarshal(data, &address)
if err != nil {
return "", err
}
return address, nil
}
func (c *APIClient) Join(r JoinRequest) error {
data, err := json.Marshal(&r)
if err != nil {
return err
}
_, err = c.call(http.MethodPost, "/join", "application/json", bytes.NewReader(data))
return err
}
func (c *APIClient) Leave(r LeaveRequest) error {
data, err := json.Marshal(&r)
if err != nil {
return err
}
_, err = c.call(http.MethodPost, "/leave", "application/json", bytes.NewReader(data))
return err
}
func (c *APIClient) Snapshot() (io.ReadCloser, error) {
return c.stream(http.MethodGet, "/snapshot", "", nil)
}
func (c *APIClient) stream(method, path, contentType string, data io.Reader) (io.ReadCloser, error) {
if len(c.Address) == 0 {
return nil, fmt.Errorf("no address defined")
}
address := "http://" + c.Address + "/v1" + path
req, err := http.NewRequest(method, address, data)
if err != nil {
return nil, err
}
if method == "POST" || method == "PUT" {
req.Header.Add("Content-Type", contentType)
}
status, body, err := c.request(req)
if err != nil {
return nil, err
}
if status < 200 || status >= 300 {
e := httpapi.Error{}
defer body.Close()
x, _ := io.ReadAll(body)
json.Unmarshal(x, &e)
return nil, e
}
return body, nil
}
func (c *APIClient) call(method, path, contentType string, data io.Reader) ([]byte, error) {
body, err := c.stream(method, path, contentType, data)
if err != nil {
return nil, err
}
defer body.Close()
x, _ := io.ReadAll(body)
return x, nil
}
func (c *APIClient) request(req *http.Request) (int, io.ReadCloser, error) {
if c.Client == nil {
tr := &http.Transport{
MaxIdleConns: 10,
IdleConnTimeout: 30 * time.Second,
}
c.Client = &http.Client{
Transport: tr,
Timeout: 5 * time.Second,
}
}
resp, err := c.Client.Do(req)
if err != nil {
return -1, nil, err
}
return resp.StatusCode, resp.Body, nil
}

View File

@ -27,14 +27,14 @@ import (
/*
/api/v3:
GET /cluster/db/node - list all nodes that are stored in the FSM - Cluster.Store.ListNodes()
POST /cluster/db/node - add a node to the FSM - Cluster.Store.AddNode()
DELETE /cluster/db/node/:id - remove a node from the FSM - Cluster.Store.RemoveNode()
GET /cluster/store/node - list all nodes that are stored in the FSM - Cluster.Store.ListNodes()
POST /cluster/store/node - add a node to the FSM - Cluster.Store.AddNode()
DELETE /cluster/store/node/:id - remove a node from the FSM - Cluster.Store.RemoveNode()
GET /cluster/db/process - list all process configs that are stored in the FSM - Cluster.Store.ListProcesses()
POST /cluster/db/process - add a process config to the FSM - Cluster.Store.AddProcess()
PUT /cluster/db/process/:id - update a process config in the FSM - Cluster.Store.UpdateProcess()
DELETE /cluster/db/process/:id - remove a process config from the FSM - Cluster.Store.RemoveProcess()
GET /cluster/store/process - list all process configs that are stored in the FSM - Cluster.Store.ListProcesses()
POST /cluster/store/process - add a process config to the FSM - Cluster.Store.AddProcess()
PUT /cluster/store/process/:id - update a process config in the FSM - Cluster.Store.UpdateProcess()
DELETE /cluster/store/process/:id - remove a process config from the FSM - Cluster.Store.RemoveProcess()
** for the processes, the leader will decide where to actually run them. the process configs will
also be added to the regular process DB of each core.
@ -47,32 +47,23 @@ import (
var ErrNodeNotFound = errors.New("node not found")
type ClusterReader interface {
GetURL(path string) (string, error)
GetFile(path string) (io.ReadCloser, error)
}
type dummyClusterReader struct{}
func NewDummyClusterReader() ClusterReader {
return &dummyClusterReader{}
}
func (r *dummyClusterReader) GetURL(path string) (string, error) {
return "", fmt.Errorf("not implemented in dummy cluster")
}
func (r *dummyClusterReader) GetFile(path string) (io.ReadCloser, error) {
return nil, fmt.Errorf("not implemented in dummy cluster")
}
type Cluster interface {
Addr() string
APIAddr(raftAddress string) (string, error)
// Address returns the raft address of this node
Address() string
// ClusterAPIAddress returns the address of the cluster API of a node
// with the given raft address.
ClusterAPIAddress(raftAddress string) (string, error)
// CoreAPIAddress returns the address of the core API of a node with
// the given raft address.
CoreAPIAddress(raftAddress string) (string, error)
About() (ClusterAbout, error)
Join(origin, id, raftAddress, apiAddress, peerAddress string) error
Leave(origin, id string) error // gracefully remove a node from the cluster
Snapshot() ([]byte, error)
Snapshot() (io.ReadCloser, error)
Shutdown() error
@ -81,7 +72,7 @@ type Cluster interface {
ListNodes() []addNodeCommand
GetNode(id string) (addNodeCommand, error)
ClusterReader
ProxyReader() ProxyReader
}
type Peer struct {
@ -143,6 +134,9 @@ type cluster struct {
hasRaftLeader bool
isLeader bool
leaderLock sync.Mutex
nodes map[string]Node
nodesLock sync.RWMutex
}
func New(config ClusterConfig) (Cluster, error) {
@ -158,6 +152,8 @@ func New(config ClusterConfig) (Cluster, error) {
reassertLeaderCh: make(chan chan error),
leaveCh: make(chan struct{}),
shutdownCh: make(chan struct{}),
nodes: map[string]Node{},
}
u, err := url.Parse(config.CoreAPIAddress)
@ -216,6 +212,8 @@ func New(config ClusterConfig) (Cluster, error) {
c.proxy = proxy
go c.trackNodeChanges()
if forwarder, err := NewForwarder(ForwarderConfig{
ID: c.id,
Logger: c.logger.WithField("logname", "forwarder"),
@ -236,7 +234,7 @@ func New(config ClusterConfig) (Cluster, error) {
if len(config.Peers) != 0 {
for i := 0; i < len(config.Peers); i++ {
peerAddress, err := c.APIAddr(config.Peers[i].Address)
peerAddress, err := c.ClusterAPIAddress(config.Peers[i].Address)
if err != nil {
c.shutdownAPI()
c.shutdownRaft()
@ -268,13 +266,13 @@ func New(config ClusterConfig) (Cluster, error) {
return c, nil
}
func (c *cluster) Addr() string {
func (c *cluster) Address() string {
return c.raftAddress
}
func (c *cluster) APIAddr(raftAddress string) (string, error) {
func (c *cluster) ClusterAPIAddress(raftAddress string) (string, error) {
if len(raftAddress) == 0 {
raftAddress = c.raftAddress
raftAddress = c.Address()
}
host, port, _ := gonet.SplitHostPort(raftAddress)
@ -287,6 +285,29 @@ func (c *cluster) APIAddr(raftAddress string) (string, error) {
return gonet.JoinHostPort(host, strconv.Itoa(p+1)), nil
}
func (c *cluster) CoreAPIAddress(raftAddress string) (string, error) {
if len(raftAddress) == 0 {
raftAddress = c.Address()
}
if raftAddress == c.Address() {
return c.coreAddress, nil
}
addr, err := c.ClusterAPIAddress(raftAddress)
if err != nil {
return "", err
}
client := APIClient{
Address: addr,
}
coreAddress, err := client.CoreAPIAddress()
return coreAddress, err
}
func (c *cluster) Shutdown() error {
c.logger.Info().Log("shutting down cluster")
c.shutdownLock.Lock()
@ -299,6 +320,11 @@ func (c *cluster) Shutdown() error {
c.shutdown = true
close(c.shutdownCh)
for id, node := range c.nodes {
node.Disconnect()
c.proxy.RemoveNode(id)
}
if c.proxy != nil {
c.proxy.Stop()
}
@ -437,11 +463,6 @@ func (c *cluster) Leave(origin, id string) error {
return nil
}
err := c.RemoveNode(id)
if err != nil {
c.logger.Error().WithError(err).Log("failed to apply log for removal")
}
// Remove another sever from the cluster
if future := c.raft.RemoveServer(raft.ServerID(id), 0, 0); future.Error() != nil {
c.logger.Error().WithError(future.Error()).WithFields(log.Fields{
@ -468,6 +489,17 @@ func (c *cluster) Join(origin, id, raftAddress, apiAddress, peerAddress string)
}).Log("received join request for remote node")
// connect to the peer's API in order to find out if our version is compatible
address, err := c.CoreAPIAddress(raftAddress)
if err != nil {
return fmt.Errorf("peer API doesn't respond: %w", err)
}
node := NewNode(address)
err = node.Connect()
if err != nil {
return fmt.Errorf("couldn't connect to peer: %w", err)
}
defer node.Disconnect()
configFuture := c.raft.GetConfiguration()
if err := configFuture.Error(); err != nil {
@ -509,21 +541,6 @@ func (c *cluster) Join(origin, id, raftAddress, apiAddress, peerAddress string)
}
}
if err := c.AddNode(id, apiAddress); err != nil {
/*
future := c.raft.RemoveServer(raft.ServerID(id), 0, 0)
if err := future.Error(); err != nil {
c.logger.Error().WithError(err).WithFields(log.Fields{
"nodeid": id,
"address": raftAddress,
}).Log("error removing existing node")
return err
}
return err
*/
c.logger.Debug().WithError(err).Log("")
}
c.logger.Info().WithFields(log.Fields{
"nodeid": id,
"address": raftAddress,
@ -537,7 +554,7 @@ type Snapshot struct {
Data []byte
}
func (c *cluster) Snapshot() ([]byte, error) {
func (c *cluster) Snapshot() (io.ReadCloser, error) {
if !c.IsRaftLeader() {
c.logger.Debug().Log("not leader, forwarding to leader")
return c.forwarder.Snapshot()
@ -573,7 +590,19 @@ func (c *cluster) Snapshot() ([]byte, error) {
return nil, err
}
return buffer.Bytes(), nil
return &readCloserWrapper{&buffer}, nil
}
type readCloserWrapper struct {
io.Reader
}
func (rcw *readCloserWrapper) Read(p []byte) (int, error) {
return rcw.Reader.Read(p)
}
func (rcw *readCloserWrapper) Close() error {
return nil
}
func (c *cluster) ListNodes() []addNodeCommand {
@ -639,6 +668,81 @@ func (c *cluster) RemoveNode(id string) error {
return nil
}
func (c *cluster) trackNodeChanges() {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
// Get the latest configuration.
future := c.raft.GetConfiguration()
if err := future.Error(); err != nil {
c.logger.Error().WithError(err).Log("failed to get raft configuration")
continue
}
c.nodesLock.Lock()
removeNodes := map[string]struct{}{}
for id := range c.nodes {
removeNodes[id] = struct{}{}
}
for _, server := range future.Configuration().Servers {
id := string(server.ID)
if id == c.id {
continue
}
_, ok := c.nodes[id]
if !ok {
address, err := c.CoreAPIAddress(string(server.Address))
if err != nil {
c.logger.Warn().WithError(err).WithFields(log.Fields{
"id": id,
"address": server.Address,
}).Log("Discovering core API address failed")
continue
}
node := NewNode(address)
err = node.Connect()
if err != nil {
c.logger.Warn().WithError(err).WithFields(log.Fields{
"id": id,
"address": server.Address,
}).Log("Connecting to core API failed")
continue
}
if _, err := c.proxy.AddNode(id, node); err != nil {
c.logger.Warn().WithError(err).WithFields(log.Fields{
"id": id,
"address": address,
}).Log("Adding node failed")
}
c.nodes[id] = node
} else {
delete(removeNodes, id)
}
}
for id := range removeNodes {
if node, ok := c.nodes[id]; ok {
node.Disconnect()
c.proxy.RemoveNode(id)
}
}
c.nodesLock.Unlock()
case <-c.shutdownCh:
return
}
}
}
// trackLeaderChanges registers an Observer with raft in order to receive updates
// about leader changes, in order to keep the forwarder up to date.
func (c *cluster) trackLeaderChanges() {
@ -661,7 +765,7 @@ func (c *cluster) trackLeaderChanges() {
}).Log("new leader observation")
addr := string(leaderObs.LeaderAddr)
if len(addr) != 0 {
addr, _ = c.APIAddr(addr)
addr, _ = c.ClusterAPIAddress(addr)
}
c.forwarder.SetLeader(addr)
c.leaderLock.Lock()
@ -901,6 +1005,76 @@ func (c *cluster) applyCommand(cmd *command) error {
return nil
}
type ClusterServer struct {
ID string
Address string
Voter bool
Leader bool
}
type ClusterStats struct {
State string
LastContact time.Duration
NumPeers uint64
}
type ClusterAbout struct {
ID string
Address string
ClusterAPIAddress string
CoreAPIAddress string
Nodes []ClusterServer
Stats ClusterStats
}
func (c *cluster) About() (ClusterAbout, error) {
about := ClusterAbout{
ID: c.id,
Address: c.Address(),
}
if address, err := c.ClusterAPIAddress(""); err == nil {
about.ClusterAPIAddress = address
}
if address, err := c.CoreAPIAddress(""); err == nil {
about.CoreAPIAddress = address
}
stats := c.raft.Stats()
about.Stats.State = stats["state"]
if x, err := time.ParseDuration(stats["last_contact"]); err == nil {
about.Stats.LastContact = x
}
if x, err := strconv.ParseUint(stats["num_peers"], 10, 64); err == nil {
about.Stats.NumPeers = x
}
_, leaderID := c.raft.LeaderWithID()
future := c.raft.GetConfiguration()
if err := future.Error(); err != nil {
c.logger.Error().WithError(err).Log("failed to get raft configuration")
return ClusterAbout{}, err
}
for _, server := range future.Configuration().Servers {
node := ClusterServer{
ID: string(server.ID),
Address: string(server.Address),
Voter: server.Suffrage == raft.Voter,
Leader: server.ID == leaderID,
}
about.Nodes = append(about.Nodes, node)
}
return about, nil
}
func (c *cluster) sentinel() {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
@ -918,7 +1092,6 @@ func (c *cluster) sentinel() {
stats := c.raft.Stats()
fields := log.Fields{}
for k, v := range stats {
fields[k] = v
}
@ -950,10 +1123,6 @@ func (c *cluster) sentinel() {
}
}
func (c *cluster) GetURL(path string) (string, error) {
return c.proxy.GetURL(path)
}
func (c *cluster) GetFile(path string) (io.ReadCloser, error) {
return c.proxy.GetFile(path)
func (c *cluster) ProxyReader() ProxyReader {
return c.proxy.Reader()
}

View File

@ -1,15 +1,12 @@
package cluster
import (
"bytes"
"encoding/json"
"fmt"
"io"
"net/http"
"sync"
"time"
httpapi "github.com/datarhei/core/v16/http/api"
"github.com/datarhei/core/v16/log"
)
@ -19,18 +16,17 @@ type Forwarder interface {
HasLeader() bool
Join(origin, id, raftAddress, apiAddress, peerAddress string) error
Leave(origin, id string) error
Snapshot() ([]byte, error)
Snapshot() (io.ReadCloser, error)
AddProcess() error
UpdateProcess() error
RemoveProcess() error
}
type forwarder struct {
id string
leaderAddr string
lock sync.RWMutex
id string
lock sync.RWMutex
client *http.Client
client APIClient
logger log.Logger
}
@ -60,7 +56,9 @@ func NewForwarder(config ForwarderConfig) (Forwarder, error) {
Timeout: 5 * time.Second,
}
f.client = client
f.client = APIClient{
Client: client,
}
return f, nil
}
@ -69,17 +67,17 @@ func (f *forwarder) SetLeader(address string) {
f.lock.Lock()
defer f.lock.Unlock()
if f.leaderAddr == address {
if f.client.Address == address {
return
}
f.logger.Debug().Log("setting leader address to %s", address)
f.leaderAddr = address
f.client.Address = address
}
func (f *forwarder) HasLeader() bool {
return len(f.leaderAddr) != 0
return len(f.client.Address) != 0
}
func (f *forwarder) Join(origin, id, raftAddress, apiAddress, peerAddress string) error {
@ -96,14 +94,18 @@ func (f *forwarder) Join(origin, id, raftAddress, apiAddress, peerAddress string
f.logger.Debug().WithField("request", r).Log("forwarding to leader")
data, err := json.Marshal(&r)
if err != nil {
return err
f.lock.RLock()
client := f.client
f.lock.RUnlock()
if len(peerAddress) != 0 {
client = APIClient{
Address: peerAddress,
Client: f.client.Client,
}
}
_, err = f.call(http.MethodPost, "/join", "application/json", bytes.NewReader(data), peerAddress)
return err
return client.Join(r)
}
func (f *forwarder) Leave(origin, id string) error {
@ -118,28 +120,19 @@ func (f *forwarder) Leave(origin, id string) error {
f.logger.Debug().WithField("request", r).Log("forwarding to leader")
data, err := json.Marshal(&r)
if err != nil {
return err
}
f.lock.RLock()
client := f.client
f.lock.RUnlock()
_, err = f.call(http.MethodPost, "/leave", "application/json", bytes.NewReader(data), "")
return err
return client.Leave(r)
}
func (f *forwarder) Snapshot() ([]byte, error) {
r, err := f.stream(http.MethodGet, "/snapshot", "", nil, "")
if err != nil {
return nil, err
}
func (f *forwarder) Snapshot() (io.ReadCloser, error) {
f.lock.RLock()
client := f.client
f.lock.RUnlock()
data, err := io.ReadAll(r)
if err != nil {
return nil, err
}
return data, nil
return client.Snapshot()
}
func (f *forwarder) AddProcess() error {
@ -153,70 +146,3 @@ func (f *forwarder) UpdateProcess() error {
func (f *forwarder) RemoveProcess() error {
return fmt.Errorf("not implemented")
}
func (f *forwarder) stream(method, path, contentType string, data io.Reader, leaderOverride string) (io.ReadCloser, error) {
leaderAddr := f.leaderAddr
if len(leaderOverride) != 0 {
leaderAddr = leaderOverride
}
if len(leaderAddr) == 0 {
return nil, fmt.Errorf("no leader address defined")
}
f.lock.RLock()
address := "http://" + leaderAddr + "/v1" + path
f.lock.RUnlock()
f.logger.Debug().Log("address: %s", address)
req, err := http.NewRequest(method, address, data)
if err != nil {
return nil, err
}
if method == "POST" || method == "PUT" {
req.Header.Add("Content-Type", contentType)
}
status, body, err := f.request(req)
if err != nil {
return nil, err
}
if status < 200 || status >= 300 {
e := httpapi.Error{}
defer body.Close()
x, _ := io.ReadAll(body)
json.Unmarshal(x, &e)
return nil, e
}
return body, nil
}
func (f *forwarder) call(method, path, contentType string, data io.Reader, leaderOverride string) ([]byte, error) {
body, err := f.stream(method, path, contentType, data, leaderOverride)
if err != nil {
return nil, err
}
defer body.Close()
x, _ := io.ReadAll(body)
return x, nil
}
func (f *forwarder) request(req *http.Request) (int, io.ReadCloser, error) {
resp, err := f.client.Do(req)
if err != nil {
return -1, nil, err
}
return resp.StatusCode, resp.Body, nil
}

View File

@ -16,7 +16,21 @@ import (
"github.com/datarhei/core/v16/client"
)
type Node interface {
Connect() error
Disconnect()
Start(updates chan<- NodeState) error
Stop()
GetURL(path string) (string, error)
GetFile(path string) (io.ReadCloser, error)
NodeReader
}
type NodeReader interface {
ID() string
Address() string
IPs() []string
State() NodeState
@ -26,12 +40,9 @@ type NodeState struct {
ID string
State string
Files []string
LastPing time.Time
LastUpdate time.Time
}
type NodeSpecs struct {
ID string
Address string
Latency time.Duration
}
type nodeState string
@ -46,18 +57,24 @@ const (
)
type node struct {
address string
ips []string
state nodeState
username string
password string
updates chan<- NodeState
address string
ips []string
peer client.RestClient
filesList []string
lastUpdate time.Time
lock sync.RWMutex
cancel context.CancelFunc
once sync.Once
peerLock sync.RWMutex
lastPing time.Time
cancelPing context.CancelFunc
state nodeState
latency float64 // Seconds
stateLock sync.RWMutex
updates chan<- NodeState
filesList []string
lastUpdate time.Time
cancelFiles context.CancelFunc
runningLock sync.Mutex
running bool
host string
secure bool
@ -72,56 +89,54 @@ type node struct {
prefix *regexp.Regexp
}
func newNode(address string, updates chan<- NodeState) (*node, error) {
u, err := url.Parse(address)
func NewNode(address string) Node {
n := &node{
address: address,
state: stateDisconnected,
prefix: regexp.MustCompile(`^[a-z]+:`),
secure: strings.HasPrefix(address, "https://"),
}
return n
}
func (n *node) Connect() error {
n.peerLock.Lock()
defer n.peerLock.Unlock()
if n.peer != nil {
return nil
}
u, err := url.Parse(n.address)
if err != nil {
return nil, fmt.Errorf("invalid address: %w", err)
return fmt.Errorf("invalid address: %w", err)
}
host, _, err := net.SplitHostPort(u.Host)
if err != nil {
return nil, fmt.Errorf("invalid address: %w", err)
return fmt.Errorf("invalid address: %w", err)
}
addrs, err := net.LookupHost(host)
if err != nil {
return nil, fmt.Errorf("lookup failed: %w", err)
}
username := u.User.Username()
password := ""
if pw, ok := u.User.Password(); ok {
password = pw
}
n := &node{
address: address,
ips: addrs,
username: username,
password: password,
state: stateDisconnected,
updates: updates,
prefix: regexp.MustCompile(`^[a-z]+:`),
host: host,
secure: strings.HasPrefix(address, "https://"),
return fmt.Errorf("lookup failed: %w", err)
}
peer, err := client.New(client.Config{
Address: address,
Username: username,
Password: password,
Address: n.address,
Auth0Token: "",
Client: &http.Client{
Timeout: 5 * time.Second,
},
})
if err != nil {
return nil, err
return fmt.Errorf("creating client failed (%s): %w", n.address, err)
}
config, err := peer.Config()
if err != nil {
return nil, err
return err
}
if config.Config.RTMP.Enable {
@ -159,10 +174,67 @@ func newNode(address string, updates chan<- NodeState) (*node, error) {
}
}
n.ips = addrs
n.host = host
n.peer = peer
ctx, cancel := context.WithCancel(context.Background())
n.cancel = cancel
n.cancelPing = cancel
go func(ctx context.Context) {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
// ping
ok, latency := n.peer.Ping()
n.stateLock.Lock()
if !ok {
n.state = stateDisconnected
} else {
n.lastPing = time.Now()
n.state = stateConnected
}
n.latency = n.latency*0.2 + latency.Seconds()*0.8
n.stateLock.Unlock()
case <-ctx.Done():
return
}
}
}(ctx)
return nil
}
func (n *node) Disconnect() {
n.peerLock.Lock()
defer n.peerLock.Unlock()
if n.cancelPing != nil {
n.cancelPing()
n.cancelPing = nil
}
n.peer = nil
}
func (n *node) Start(updates chan<- NodeState) error {
n.runningLock.Lock()
defer n.runningLock.Unlock()
if n.running {
return nil
}
n.running = true
n.updates = updates
ctx, cancel := context.WithCancel(context.Background())
n.cancelFiles = cancel
go func(ctx context.Context) {
ticker := time.NewTicker(time.Second)
@ -183,7 +255,20 @@ func newNode(address string, updates chan<- NodeState) (*node, error) {
}
}(ctx)
return n, nil
return nil
}
func (n *node) Stop() {
n.runningLock.Lock()
defer n.runningLock.Unlock()
if !n.running {
return
}
n.running = false
n.cancelFiles()
}
func (n *node) Address() string {
@ -199,12 +284,14 @@ func (n *node) ID() string {
}
func (n *node) State() NodeState {
n.lock.RLock()
defer n.lock.RUnlock()
n.stateLock.RLock()
defer n.stateLock.RUnlock()
state := NodeState{
ID: n.peer.ID(),
LastPing: n.lastPing,
LastUpdate: n.lastUpdate,
Latency: time.Duration(n.latency * float64(time.Second)),
}
if n.state == stateDisconnected || time.Since(n.lastUpdate) > 2*time.Second {
@ -218,10 +305,6 @@ func (n *node) State() NodeState {
return state
}
func (n *node) stop() {
n.once.Do(func() { n.cancel() })
}
func (n *node) files() {
filesChan := make(chan string, 1024)
filesList := []string{}
@ -247,6 +330,9 @@ func (n *node) files() {
go func(f chan<- string) {
defer wg.Done()
n.peerLock.RLock()
defer n.peerLock.RUnlock()
files, err := n.peer.MemFSList("name", "asc")
if err != nil {
return
@ -260,6 +346,9 @@ func (n *node) files() {
go func(f chan<- string) {
defer wg.Done()
n.peerLock.RLock()
defer n.peerLock.RUnlock()
files, err := n.peer.DiskFSList("name", "asc")
if err != nil {
return
@ -276,6 +365,9 @@ func (n *node) files() {
go func(f chan<- string) {
defer wg.Done()
n.peerLock.RLock()
defer n.peerLock.RUnlock()
files, err := n.peer.RTMPChannels()
if err != nil {
return
@ -293,6 +385,9 @@ func (n *node) files() {
go func(f chan<- string) {
defer wg.Done()
n.peerLock.RLock()
defer n.peerLock.RUnlock()
files, err := n.peer.SRTChannels()
if err != nil {
return
@ -310,17 +405,16 @@ func (n *node) files() {
wgList.Wait()
n.lock.Lock()
n.stateLock.Lock()
n.filesList = make([]string, len(filesList))
copy(n.filesList, filesList)
n.lastUpdate = time.Now()
n.state = stateConnected
n.lock.Unlock()
n.stateLock.Unlock()
}
func (n *node) getURL(path string) (string, error) {
func (n *node) GetURL(path string) (string, error) {
// Remove prefix from path
prefix := n.prefix.FindString(path)
path = n.prefix.ReplaceAllString(path, "")
@ -353,11 +447,14 @@ func (n *node) getURL(path string) (string, error) {
return u, nil
}
func (n *node) getFile(path string) (io.ReadCloser, error) {
func (n *node) GetFile(path string) (io.ReadCloser, error) {
// Remove prefix from path
prefix := n.prefix.FindString(path)
path = n.prefix.ReplaceAllString(path, "")
n.peerLock.RLock()
defer n.peerLock.RUnlock()
if prefix == "mem:" {
return n.peer.MemFSGetFile(path)
} else if prefix == "disk:" {

View File

@ -15,8 +15,15 @@ type Proxy interface {
Start()
Stop()
AddNode(address string) (string, error)
AddNode(id string, node Node) (string, error)
RemoveNode(id string) error
ProxyReader
Reader() ProxyReader
}
type ProxyReader interface {
ListNodes() []NodeReader
GetNode(id string) (NodeReader, error)
@ -24,6 +31,46 @@ type Proxy interface {
GetFile(path string) (io.ReadCloser, error)
}
func NewNullProxyReader() ProxyReader {
return &proxyReader{}
}
type proxyReader struct {
proxy *proxy
}
func (p *proxyReader) ListNodes() []NodeReader {
if p.proxy == nil {
return nil
}
return p.proxy.ListNodes()
}
func (p *proxyReader) GetNode(id string) (NodeReader, error) {
if p.proxy == nil {
return nil, fmt.Errorf("no proxy provided")
}
return p.proxy.GetNode(id)
}
func (p *proxyReader) GetURL(path string) (string, error) {
if p.proxy == nil {
return "", fmt.Errorf("no proxy provided")
}
return p.proxy.GetURL(path)
}
func (p *proxyReader) GetFile(path string) (io.ReadCloser, error) {
if p.proxy == nil {
return nil, fmt.Errorf("no proxy provided")
}
return p.proxy.GetFile(path)
}
type ProxyConfig struct {
ID string // ID of the node
Name string // Name of the node
@ -36,7 +83,7 @@ type proxy struct {
id string
name string
nodes map[string]*node // List of known nodes
nodes map[string]Node // List of known nodes
idfiles map[string][]string // Map from nodeid to list of files
idupdate map[string]time.Time // Map from nodeid to time of last update
fileid map[string]string // Map from file name to nodeid
@ -57,7 +104,7 @@ func NewProxy(config ProxyConfig) (Proxy, error) {
p := &proxy{
id: config.ID,
name: config.Name,
nodes: map[string]*node{},
nodes: map[string]Node{},
idfiles: map[string][]string{},
idupdate: map[string]time.Time{},
fileid: map[string]string{},
@ -141,30 +188,45 @@ func (p *proxy) Stop() {
p.logger.Debug().Log("stopping proxy")
p.cancel()
p.cancel = nil
for _, node := range p.nodes {
node.stop()
node.Stop()
}
p.nodes = map[string]*node{}
p.nodes = map[string]Node{}
}
func (p *proxy) AddNode(address string) (string, error) {
node, err := newNode(address, p.updates)
if err != nil {
return "", err
func (p *proxy) Reader() ProxyReader {
return &proxyReader{
proxy: p,
}
}
id := node.ID()
func (p *proxy) AddNode(id string, node Node) (string, error) {
if id == p.id {
return "", fmt.Errorf("can't add myself as node or a node with the same ID")
}
if id != node.ID() {
return "", fmt.Errorf("the provided (%s) and retrieved (%s) ID's don't match", id, node.ID())
}
p.lock.Lock()
defer p.lock.Unlock()
if _, ok := p.nodes[id]; ok {
node.stop()
if n, ok := p.nodes[id]; ok {
n.Stop()
delete(p.nodes, id)
ips := node.IPs()
for _, ip := range ips {
p.limiter.RemoveBlock(ip)
}
return id, nil
}
@ -175,8 +237,10 @@ func (p *proxy) AddNode(address string) (string, error) {
p.nodes[id] = node
node.Start(p.updates)
p.logger.Info().WithFields(log.Fields{
"address": address,
"address": node.Address(),
"id": id,
}).Log("Added node")
@ -192,7 +256,7 @@ func (p *proxy) RemoveNode(id string) error {
return ErrNodeNotFound
}
node.stop()
node.Stop()
delete(p.nodes, id)
@ -261,7 +325,7 @@ func (c *proxy) GetURL(path string) (string, error) {
return "", fmt.Errorf("file not found")
}
url, err := node.getURL(path)
url, err := node.GetURL(path)
if err != nil {
c.logger.Debug().WithField("path", path).Log("Invalid path")
return "", fmt.Errorf("file not found")
@ -299,7 +363,7 @@ func (p *proxy) GetFile(path string) (io.ReadCloser, error) {
return nil, fmt.Errorf("file not found")
}
data, err := node.getFile(path)
data, err := node.GetFile(path)
if err != nil {
p.logger.Debug().WithField("path", path).Log("Invalid path")
return nil, fmt.Errorf("file not found")

View File

@ -12,8 +12,8 @@ import (
type Store interface {
raft.FSM
ListNodes() []string
GetNode(id string) string
ListNodes() []StoreNode
GetNode(id string) (StoreNode, error)
}
type operation string
@ -39,6 +39,11 @@ type removeNodeCommand struct {
ID string
}
type StoreNode struct {
ID string
Address string
}
type addProcessCommand struct {
Config []byte
}
@ -130,12 +135,35 @@ func (s *store) Restore(snapshot io.ReadCloser) error {
return nil
}
func (s *store) ListNodes() []string {
return nil
func (s *store) ListNodes() []StoreNode {
nodes := []StoreNode{}
s.lock.Lock()
defer s.lock.Unlock()
for id, address := range s.Nodes {
nodes = append(nodes, StoreNode{
ID: id,
Address: address,
})
}
return nodes
}
func (s *store) GetNode(id string) string {
return ""
func (s *store) GetNode(id string) (StoreNode, error) {
s.lock.Lock()
defer s.lock.Unlock()
address, ok := s.Nodes[id]
if !ok {
return StoreNode{}, fmt.Errorf("not found")
}
return StoreNode{
ID: id,
Address: address,
}, nil
}
type fsmSnapshot struct {

View File

@ -221,6 +221,35 @@ const docTemplate = `{
],
"summary": "List of nodes in the cluster",
"operationId": "cluster-3-get-cluster",
"responses": {
"200": {
"description": "OK",
"schema": {
"$ref": "#/definitions/api.ClusterAbout"
}
},
"404": {
"description": "Not Found",
"schema": {
"$ref": "#/definitions/api.Error"
}
}
}
}
},
"/api/v3/cluster/proxy": {
"get": {
"security": [
{
"ApiKeyAuth": []
}
],
"description": "List of proxy nodes in the cluster",
"produces": [
"application/json"
],
"summary": "List of proxy nodes in the cluster",
"operationId": "cluster-3-get-proxy-nodes",
"responses": {
"200": {
"description": "OK",
@ -240,62 +269,19 @@ const docTemplate = `{
}
}
},
"/api/v3/cluster/node": {
"post": {
"security": [
{
"ApiKeyAuth": []
}
],
"description": "Add a new node to the cluster",
"consumes": [
"application/json"
],
"produces": [
"application/json"
],
"summary": "Add a new node",
"operationId": "cluster-3-add-node",
"parameters": [
{
"description": "Node config",
"name": "config",
"in": "body",
"required": true,
"schema": {
"$ref": "#/definitions/api.ClusterNodeConfig"
}
}
],
"responses": {
"200": {
"description": "OK",
"schema": {
"type": "string"
}
},
"400": {
"description": "Bad Request",
"schema": {
"$ref": "#/definitions/api.Error"
}
}
}
}
},
"/api/v3/cluster/node/{id}": {
"/api/v3/cluster/proxy/node/{id}": {
"get": {
"security": [
{
"ApiKeyAuth": []
}
],
"description": "List a node by its ID",
"description": "List a proxy node by its ID",
"produces": [
"application/json"
],
"summary": "List a node by its ID",
"operationId": "cluster-3-get-node",
"summary": "List a proxy node by its ID",
"operationId": "cluster-3-get-proxy-node",
"parameters": [
{
"type": "string",
@ -319,117 +305,21 @@ const docTemplate = `{
}
}
}
},
"put": {
"security": [
{
"ApiKeyAuth": []
}
],
"description": "Replaces an existing node and returns the new node ID",
"consumes": [
"application/json"
],
"produces": [
"application/json"
],
"summary": "Replaces an existing node",
"operationId": "cluster-3-update-node",
"parameters": [
{
"type": "string",
"description": "Node ID",
"name": "id",
"in": "path",
"required": true
},
{
"description": "Node config",
"name": "config",
"in": "body",
"required": true,
"schema": {
"$ref": "#/definitions/api.ClusterNodeConfig"
}
}
],
"responses": {
"200": {
"description": "OK",
"schema": {
"type": "string"
}
},
"400": {
"description": "Bad Request",
"schema": {
"$ref": "#/definitions/api.Error"
}
},
"404": {
"description": "Not Found",
"schema": {
"$ref": "#/definitions/api.Error"
}
}
}
},
"delete": {
"security": [
{
"ApiKeyAuth": []
}
],
"description": "Delete a node by its ID",
"produces": [
"application/json"
],
"summary": "Delete a node by its ID",
"operationId": "cluster-3-delete-node",
"parameters": [
{
"type": "string",
"description": "Node ID",
"name": "id",
"in": "path",
"required": true
}
],
"responses": {
"200": {
"description": "OK",
"schema": {
"type": "string"
}
},
"404": {
"description": "Not Found",
"schema": {
"$ref": "#/definitions/api.Error"
}
},
"500": {
"description": "Internal Server Error",
"schema": {
"$ref": "#/definitions/api.Error"
}
}
}
}
},
"/api/v3/cluster/node/{id}/proxy": {
"/api/v3/cluster/proxy/node/{id}/files": {
"get": {
"security": [
{
"ApiKeyAuth": []
}
],
"description": "List the files of a node by its ID",
"description": "List the files of a proxy node by its ID",
"produces": [
"application/json"
],
"summary": "List the files of a node by its ID",
"operationId": "cluster-3-get-node-proxy",
"summary": "List the files of a proxy node by its ID",
"operationId": "cluster-3-get-proxy-node-files",
"parameters": [
{
"type": "string",
@ -2342,6 +2232,32 @@ const docTemplate = `{
}
}
},
"api.ClusterAbout": {
"type": "object",
"properties": {
"address": {
"type": "string"
},
"cluster_api_address": {
"type": "string"
},
"core_api_address": {
"type": "string"
},
"id": {
"type": "string"
},
"nodes": {
"type": "array",
"items": {
"$ref": "#/definitions/api.ClusterServer"
}
},
"stats": {
"$ref": "#/definitions/api.ClusterStats"
}
}
},
"api.ClusterNode": {
"type": "object",
"properties": {
@ -2351,28 +2267,21 @@ const docTemplate = `{
"id": {
"type": "string"
},
"last_ping": {
"type": "integer"
},
"last_update": {
"type": "integer"
},
"latency_ms": {
"description": "milliseconds",
"type": "number"
},
"state": {
"type": "string"
}
}
},
"api.ClusterNodeConfig": {
"type": "object",
"properties": {
"address": {
"type": "string"
},
"password": {
"type": "string"
},
"username": {
"type": "string"
}
}
},
"api.ClusterNodeFiles": {
"type": "object",
"additionalProperties": {
@ -2382,6 +2291,37 @@ const docTemplate = `{
}
}
},
"api.ClusterServer": {
"type": "object",
"properties": {
"address": {
"type": "string"
},
"id": {
"type": "string"
},
"leader": {
"type": "boolean"
},
"voter": {
"type": "boolean"
}
}
},
"api.ClusterStats": {
"type": "object",
"properties": {
"last_contact_ms": {
"type": "number"
},
"num_peers": {
"type": "integer"
},
"state": {
"type": "string"
}
}
},
"api.Command": {
"type": "object",
"required": [
@ -2491,6 +2431,32 @@ const docTemplate = `{
}
}
},
"cluster": {
"type": "object",
"properties": {
"address": {
"type": "string"
},
"bootstrap": {
"type": "boolean"
},
"debug": {
"type": "boolean"
},
"enable": {
"type": "boolean"
},
"peers": {
"type": "array",
"items": {
"type": "string"
}
},
"recover": {
"type": "boolean"
}
}
},
"created_at": {
"description": "When this config has been persisted",
"type": "string"
@ -3334,6 +3300,10 @@ const docTemplate = `{
},
"type": {
"type": "string"
},
"updated_at": {
"type": "integer",
"format": "int64"
}
}
},
@ -3648,18 +3618,7 @@ const docTemplate = `{
"format": "uint64"
},
"framerate": {
"type": "object",
"properties": {
"avg": {
"type": "number"
},
"max": {
"type": "number"
},
"min": {
"type": "number"
}
}
"$ref": "#/definitions/api.ProgressIOFramerate"
},
"height": {
"type": "integer",
@ -3717,6 +3676,20 @@ const docTemplate = `{
}
}
},
"api.ProgressIOFramerate": {
"type": "object",
"properties": {
"avg": {
"type": "number"
},
"max": {
"type": "number"
},
"min": {
"type": "number"
}
}
},
"api.RTMPChannel": {
"type": "object",
"properties": {
@ -4293,6 +4266,32 @@ const docTemplate = `{
}
}
},
"cluster": {
"type": "object",
"properties": {
"address": {
"type": "string"
},
"bootstrap": {
"type": "boolean"
},
"debug": {
"type": "boolean"
},
"enable": {
"type": "boolean"
},
"peers": {
"type": "array",
"items": {
"type": "string"
}
},
"recover": {
"type": "boolean"
}
}
},
"created_at": {
"description": "When this config has been persisted",
"type": "string"

View File

@ -214,6 +214,35 @@
],
"summary": "List of nodes in the cluster",
"operationId": "cluster-3-get-cluster",
"responses": {
"200": {
"description": "OK",
"schema": {
"$ref": "#/definitions/api.ClusterAbout"
}
},
"404": {
"description": "Not Found",
"schema": {
"$ref": "#/definitions/api.Error"
}
}
}
}
},
"/api/v3/cluster/proxy": {
"get": {
"security": [
{
"ApiKeyAuth": []
}
],
"description": "List of proxy nodes in the cluster",
"produces": [
"application/json"
],
"summary": "List of proxy nodes in the cluster",
"operationId": "cluster-3-get-proxy-nodes",
"responses": {
"200": {
"description": "OK",
@ -233,62 +262,19 @@
}
}
},
"/api/v3/cluster/node": {
"post": {
"security": [
{
"ApiKeyAuth": []
}
],
"description": "Add a new node to the cluster",
"consumes": [
"application/json"
],
"produces": [
"application/json"
],
"summary": "Add a new node",
"operationId": "cluster-3-add-node",
"parameters": [
{
"description": "Node config",
"name": "config",
"in": "body",
"required": true,
"schema": {
"$ref": "#/definitions/api.ClusterNodeConfig"
}
}
],
"responses": {
"200": {
"description": "OK",
"schema": {
"type": "string"
}
},
"400": {
"description": "Bad Request",
"schema": {
"$ref": "#/definitions/api.Error"
}
}
}
}
},
"/api/v3/cluster/node/{id}": {
"/api/v3/cluster/proxy/node/{id}": {
"get": {
"security": [
{
"ApiKeyAuth": []
}
],
"description": "List a node by its ID",
"description": "List a proxy node by its ID",
"produces": [
"application/json"
],
"summary": "List a node by its ID",
"operationId": "cluster-3-get-node",
"summary": "List a proxy node by its ID",
"operationId": "cluster-3-get-proxy-node",
"parameters": [
{
"type": "string",
@ -312,117 +298,21 @@
}
}
}
},
"put": {
"security": [
{
"ApiKeyAuth": []
}
],
"description": "Replaces an existing node and returns the new node ID",
"consumes": [
"application/json"
],
"produces": [
"application/json"
],
"summary": "Replaces an existing node",
"operationId": "cluster-3-update-node",
"parameters": [
{
"type": "string",
"description": "Node ID",
"name": "id",
"in": "path",
"required": true
},
{
"description": "Node config",
"name": "config",
"in": "body",
"required": true,
"schema": {
"$ref": "#/definitions/api.ClusterNodeConfig"
}
}
],
"responses": {
"200": {
"description": "OK",
"schema": {
"type": "string"
}
},
"400": {
"description": "Bad Request",
"schema": {
"$ref": "#/definitions/api.Error"
}
},
"404": {
"description": "Not Found",
"schema": {
"$ref": "#/definitions/api.Error"
}
}
}
},
"delete": {
"security": [
{
"ApiKeyAuth": []
}
],
"description": "Delete a node by its ID",
"produces": [
"application/json"
],
"summary": "Delete a node by its ID",
"operationId": "cluster-3-delete-node",
"parameters": [
{
"type": "string",
"description": "Node ID",
"name": "id",
"in": "path",
"required": true
}
],
"responses": {
"200": {
"description": "OK",
"schema": {
"type": "string"
}
},
"404": {
"description": "Not Found",
"schema": {
"$ref": "#/definitions/api.Error"
}
},
"500": {
"description": "Internal Server Error",
"schema": {
"$ref": "#/definitions/api.Error"
}
}
}
}
},
"/api/v3/cluster/node/{id}/proxy": {
"/api/v3/cluster/proxy/node/{id}/files": {
"get": {
"security": [
{
"ApiKeyAuth": []
}
],
"description": "List the files of a node by its ID",
"description": "List the files of a proxy node by its ID",
"produces": [
"application/json"
],
"summary": "List the files of a node by its ID",
"operationId": "cluster-3-get-node-proxy",
"summary": "List the files of a proxy node by its ID",
"operationId": "cluster-3-get-proxy-node-files",
"parameters": [
{
"type": "string",
@ -2335,6 +2225,32 @@
}
}
},
"api.ClusterAbout": {
"type": "object",
"properties": {
"address": {
"type": "string"
},
"cluster_api_address": {
"type": "string"
},
"core_api_address": {
"type": "string"
},
"id": {
"type": "string"
},
"nodes": {
"type": "array",
"items": {
"$ref": "#/definitions/api.ClusterServer"
}
},
"stats": {
"$ref": "#/definitions/api.ClusterStats"
}
}
},
"api.ClusterNode": {
"type": "object",
"properties": {
@ -2344,28 +2260,21 @@
"id": {
"type": "string"
},
"last_ping": {
"type": "integer"
},
"last_update": {
"type": "integer"
},
"latency_ms": {
"description": "milliseconds",
"type": "number"
},
"state": {
"type": "string"
}
}
},
"api.ClusterNodeConfig": {
"type": "object",
"properties": {
"address": {
"type": "string"
},
"password": {
"type": "string"
},
"username": {
"type": "string"
}
}
},
"api.ClusterNodeFiles": {
"type": "object",
"additionalProperties": {
@ -2375,6 +2284,37 @@
}
}
},
"api.ClusterServer": {
"type": "object",
"properties": {
"address": {
"type": "string"
},
"id": {
"type": "string"
},
"leader": {
"type": "boolean"
},
"voter": {
"type": "boolean"
}
}
},
"api.ClusterStats": {
"type": "object",
"properties": {
"last_contact_ms": {
"type": "number"
},
"num_peers": {
"type": "integer"
},
"state": {
"type": "string"
}
}
},
"api.Command": {
"type": "object",
"required": [
@ -2484,6 +2424,32 @@
}
}
},
"cluster": {
"type": "object",
"properties": {
"address": {
"type": "string"
},
"bootstrap": {
"type": "boolean"
},
"debug": {
"type": "boolean"
},
"enable": {
"type": "boolean"
},
"peers": {
"type": "array",
"items": {
"type": "string"
}
},
"recover": {
"type": "boolean"
}
}
},
"created_at": {
"description": "When this config has been persisted",
"type": "string"
@ -3327,6 +3293,10 @@
},
"type": {
"type": "string"
},
"updated_at": {
"type": "integer",
"format": "int64"
}
}
},
@ -3641,18 +3611,7 @@
"format": "uint64"
},
"framerate": {
"type": "object",
"properties": {
"avg": {
"type": "number"
},
"max": {
"type": "number"
},
"min": {
"type": "number"
}
}
"$ref": "#/definitions/api.ProgressIOFramerate"
},
"height": {
"type": "integer",
@ -3710,6 +3669,20 @@
}
}
},
"api.ProgressIOFramerate": {
"type": "object",
"properties": {
"avg": {
"type": "number"
},
"max": {
"type": "number"
},
"min": {
"type": "number"
}
}
},
"api.RTMPChannel": {
"type": "object",
"properties": {
@ -4286,6 +4259,32 @@
}
}
},
"cluster": {
"type": "object",
"properties": {
"address": {
"type": "string"
},
"bootstrap": {
"type": "boolean"
},
"debug": {
"type": "boolean"
},
"enable": {
"type": "boolean"
},
"peers": {
"type": "array",
"items": {
"type": "string"
}
},
"recover": {
"type": "boolean"
}
}
},
"created_at": {
"description": "When this config has been persisted",
"type": "string"

View File

@ -62,32 +62,65 @@ definitions:
version:
$ref: '#/definitions/api.Version'
type: object
api.ClusterAbout:
properties:
address:
type: string
cluster_api_address:
type: string
core_api_address:
type: string
id:
type: string
nodes:
items:
$ref: '#/definitions/api.ClusterServer'
type: array
stats:
$ref: '#/definitions/api.ClusterStats'
type: object
api.ClusterNode:
properties:
address:
type: string
id:
type: string
last_ping:
type: integer
last_update:
type: integer
latency_ms:
description: milliseconds
type: number
state:
type: string
type: object
api.ClusterNodeConfig:
properties:
address:
type: string
password:
type: string
username:
type: string
type: object
api.ClusterNodeFiles:
additionalProperties:
items:
type: string
type: array
type: object
api.ClusterServer:
properties:
address:
type: string
id:
type: string
leader:
type: boolean
voter:
type: boolean
type: object
api.ClusterStats:
properties:
last_contact_ms:
type: number
num_peers:
type: integer
state:
type: string
type: object
api.Command:
properties:
command:
@ -159,6 +192,23 @@ definitions:
read_only:
type: boolean
type: object
cluster:
properties:
address:
type: string
bootstrap:
type: boolean
debug:
type: boolean
enable:
type: boolean
peers:
items:
type: string
type: array
recover:
type: boolean
type: object
created_at:
description: When this config has been persisted
type: string
@ -727,6 +777,9 @@ definitions:
$ref: '#/definitions/api.ProcessState'
type:
type: string
updated_at:
format: int64
type: integer
type: object
api.ProcessConfig:
properties:
@ -940,14 +993,7 @@ definitions:
format: uint64
type: integer
framerate:
properties:
avg:
type: number
max:
type: number
min:
type: number
type: object
$ref: '#/definitions/api.ProgressIOFramerate'
height:
format: uint64
type: integer
@ -989,6 +1035,15 @@ definitions:
format: uint64
type: integer
type: object
api.ProgressIOFramerate:
properties:
avg:
type: number
max:
type: number
min:
type: number
type: object
api.RTMPChannel:
properties:
name:
@ -1441,6 +1496,23 @@ definitions:
read_only:
type: boolean
type: object
cluster:
properties:
address:
type: string
bootstrap:
type: boolean
debug:
type: boolean
enable:
type: boolean
peers:
items:
type: string
type: array
recover:
type: boolean
type: object
created_at:
description: When this config has been persisted
type: string
@ -2092,6 +2164,24 @@ paths:
operationId: cluster-3-get-cluster
produces:
- application/json
responses:
"200":
description: OK
schema:
$ref: '#/definitions/api.ClusterAbout'
"404":
description: Not Found
schema:
$ref: '#/definitions/api.Error'
security:
- ApiKeyAuth: []
summary: List of nodes in the cluster
/api/v3/cluster/proxy:
get:
description: List of proxy nodes in the cluster
operationId: cluster-3-get-proxy-nodes
produces:
- application/json
responses:
"200":
description: OK
@ -2105,65 +2195,11 @@ paths:
$ref: '#/definitions/api.Error'
security:
- ApiKeyAuth: []
summary: List of nodes in the cluster
/api/v3/cluster/node:
post:
consumes:
- application/json
description: Add a new node to the cluster
operationId: cluster-3-add-node
parameters:
- description: Node config
in: body
name: config
required: true
schema:
$ref: '#/definitions/api.ClusterNodeConfig'
produces:
- application/json
responses:
"200":
description: OK
schema:
type: string
"400":
description: Bad Request
schema:
$ref: '#/definitions/api.Error'
security:
- ApiKeyAuth: []
summary: Add a new node
/api/v3/cluster/node/{id}:
delete:
description: Delete a node by its ID
operationId: cluster-3-delete-node
parameters:
- description: Node ID
in: path
name: id
required: true
type: string
produces:
- application/json
responses:
"200":
description: OK
schema:
type: string
"404":
description: Not Found
schema:
$ref: '#/definitions/api.Error'
"500":
description: Internal Server Error
schema:
$ref: '#/definitions/api.Error'
security:
- ApiKeyAuth: []
summary: Delete a node by its ID
summary: List of proxy nodes in the cluster
/api/v3/cluster/proxy/node/{id}:
get:
description: List a node by its ID
operationId: cluster-3-get-node
description: List a proxy node by its ID
operationId: cluster-3-get-proxy-node
parameters:
- description: Node ID
in: path
@ -2183,46 +2219,11 @@ paths:
$ref: '#/definitions/api.Error'
security:
- ApiKeyAuth: []
summary: List a node by its ID
put:
consumes:
- application/json
description: Replaces an existing node and returns the new node ID
operationId: cluster-3-update-node
parameters:
- description: Node ID
in: path
name: id
required: true
type: string
- description: Node config
in: body
name: config
required: true
schema:
$ref: '#/definitions/api.ClusterNodeConfig'
produces:
- application/json
responses:
"200":
description: OK
schema:
type: string
"400":
description: Bad Request
schema:
$ref: '#/definitions/api.Error'
"404":
description: Not Found
schema:
$ref: '#/definitions/api.Error'
security:
- ApiKeyAuth: []
summary: Replaces an existing node
/api/v3/cluster/node/{id}/proxy:
summary: List a proxy node by its ID
/api/v3/cluster/proxy/node/{id}/files:
get:
description: List the files of a node by its ID
operationId: cluster-3-get-node-proxy
description: List the files of a proxy node by its ID
operationId: cluster-3-get-proxy-node-files
parameters:
- description: Node ID
in: path
@ -2242,7 +2243,7 @@ paths:
$ref: '#/definitions/api.Error'
security:
- ApiKeyAuth: []
summary: List the files of a node by its ID
summary: List the files of a proxy node by its ID
/api/v3/config:
get:
description: Retrieve the currently active Restreamer configuration

View File

@ -7,10 +7,34 @@ type ClusterNodeConfig struct {
}
type ClusterNode struct {
Address string `json:"address"`
ID string `json:"id"`
LastUpdate int64 `json:"last_update"`
State string `json:"state"`
Address string `json:"address"`
ID string `json:"id"`
LastPing int64 `json:"last_ping"`
LastUpdate int64 `json:"last_update"`
Latency float64 `json:"latency_ms"` // milliseconds
State string `json:"state"`
}
type ClusterNodeFiles map[string][]string
type ClusterServer struct {
ID string `json:"id"`
Address string `json:"address"`
Voter bool `json:"voter"`
Leader bool `json:"leader"`
}
type ClusterStats struct {
State string `json:"state"`
LastContact float64 `json:"last_contact_ms"`
NumPeers uint64 `json:"num_peers"`
}
type ClusterAbout struct {
ID string `json:"id"`
Address string `json:"address"`
ClusterAPIAddress string `json:"cluster_api_address"`
CoreAPIAddress string `json:"core_api_address"`
Nodes []ClusterServer `json:"nodes"`
Stats ClusterStats `json:"stats"`
}

View File

@ -16,19 +16,19 @@ type Filesystem interface {
type filesystem struct {
fs.Filesystem
name string
cluster cluster.ClusterReader
name string
proxy cluster.ProxyReader
}
func NewClusterFS(name string, fs fs.Filesystem, cluster cluster.Cluster) Filesystem {
if cluster == nil {
func NewClusterFS(name string, fs fs.Filesystem, proxy cluster.ProxyReader) Filesystem {
if proxy == nil {
return fs
}
f := &filesystem{
Filesystem: fs,
name: name,
cluster: cluster,
proxy: proxy,
}
return f
@ -41,7 +41,7 @@ func (fs *filesystem) Open(path string) fs.File {
}
// Check if the file is available in the cluster
data, err := fs.cluster.GetFile(fs.name + ":" + path)
data, err := fs.proxy.GetFile(fs.name + ":" + path)
if err != nil {
return nil
}

View File

@ -1,37 +1,44 @@
package api
import (
"net/http"
"regexp"
"sort"
"strings"
"github.com/datarhei/core/v16/cluster"
"github.com/datarhei/core/v16/http/api"
"github.com/datarhei/core/v16/http/handler/util"
"github.com/labstack/echo/v4"
)
// The ClusterHandler type provides handler functions for manipulating the cluster config.
type ClusterHandler struct {
cluster cluster.Cluster
proxy cluster.ProxyReader
prefix *regexp.Regexp
}
/*
// NewCluster return a new ClusterHandler type. You have to provide a cluster.
func NewCluster(cluster cluster.Cluster) *ClusterHandler {
return &ClusterHandler{
cluster: cluster,
proxy: cluster.ProxyReader(),
prefix: regexp.MustCompile(`^[a-z]+:`),
}
}
// GetCluster returns the list of nodes in the cluster
// @Summary List of nodes in the cluster
// @Description List of nodes in the cluster
// @ID cluster-3-get-cluster
// GetProxyNodes returns the list of proxy nodes in the cluster
// @Summary List of proxy nodes in the cluster
// @Description List of proxy nodes in the cluster
// @ID cluster-3-get-proxy-nodes
// @Produce json
// @Success 200 {array} api.ClusterNode
// @Failure 404 {object} api.Error
// @Security ApiKeyAuth
// @Router /api/v3/cluster [get]
func (h *ClusterHandler) GetCluster(c echo.Context) error {
nodes := h.cluster.ListNodesX()
// @Router /api/v3/cluster/proxy [get]
func (h *ClusterHandler) GetProxyNodes(c echo.Context) error {
nodes := h.proxy.ListNodes()
list := []api.ClusterNode{}
@ -40,7 +47,9 @@ func (h *ClusterHandler) GetCluster(c echo.Context) error {
n := api.ClusterNode{
Address: node.Address(),
ID: state.ID,
LastPing: state.LastPing.Unix(),
LastUpdate: state.LastUpdate.Unix(),
Latency: state.Latency.Seconds() * 1000,
State: state.State,
}
@ -50,6 +59,108 @@ func (h *ClusterHandler) GetCluster(c echo.Context) error {
return c.JSON(http.StatusOK, list)
}
// GetProxyNode returns the proxy node with the given ID
// @Summary List a proxy node by its ID
// @Description List a proxy node by its ID
// @ID cluster-3-get-proxy-node
// @Produce json
// @Param id path string true "Node ID"
// @Success 200 {object} api.ClusterNode
// @Failure 404 {object} api.Error
// @Security ApiKeyAuth
// @Router /api/v3/cluster/proxy/node/{id} [get]
func (h *ClusterHandler) GetProxyNode(c echo.Context) error {
id := util.PathParam(c, "id")
peer, err := h.proxy.GetNode(id)
if err != nil {
return api.Err(http.StatusNotFound, "Node not found", "%s", err)
}
state := peer.State()
node := api.ClusterNode{
Address: peer.Address(),
ID: state.ID,
LastUpdate: state.LastUpdate.Unix(),
State: state.State,
}
return c.JSON(http.StatusOK, node)
}
// GetProxyNodeFiles returns the files from the proxy node with the given ID
// @Summary List the files of a proxy node by its ID
// @Description List the files of a proxy node by its ID
// @ID cluster-3-get-proxy-node-files
// @Produce json
// @Param id path string true "Node ID"
// @Success 200 {object} api.ClusterNodeFiles
// @Failure 404 {object} api.Error
// @Security ApiKeyAuth
// @Router /api/v3/cluster/proxy/node/{id}/files [get]
func (h *ClusterHandler) GetProxyNodeFiles(c echo.Context) error {
id := util.PathParam(c, "id")
peer, err := h.proxy.GetNode(id)
if err != nil {
return api.Err(http.StatusNotFound, "Node not found", "%s", err)
}
files := api.ClusterNodeFiles{}
state := peer.State()
sort.Strings(state.Files)
for _, path := range state.Files {
prefix := strings.TrimSuffix(h.prefix.FindString(path), ":")
path = h.prefix.ReplaceAllString(path, "")
files[prefix] = append(files[prefix], path)
}
return c.JSON(http.StatusOK, files)
}
// GetCluster returns the list of nodes in the cluster
// @Summary List of nodes in the cluster
// @Description List of nodes in the cluster
// @ID cluster-3-get-cluster
// @Produce json
// @Success 200 {object} api.ClusterAbout
// @Failure 404 {object} api.Error
// @Security ApiKeyAuth
// @Router /api/v3/cluster [get]
func (h *ClusterHandler) About(c echo.Context) error {
state, _ := h.cluster.About()
about := api.ClusterAbout{
ID: state.ID,
Address: state.Address,
ClusterAPIAddress: state.ClusterAPIAddress,
CoreAPIAddress: state.CoreAPIAddress,
Nodes: []api.ClusterServer{},
Stats: api.ClusterStats{
State: state.Stats.State,
LastContact: state.Stats.LastContact.Seconds() * 1000,
NumPeers: state.Stats.NumPeers,
},
}
for _, n := range state.Nodes {
about.Nodes = append(about.Nodes, api.ClusterServer{
ID: n.ID,
Address: n.Address,
Voter: n.Voter,
Leader: n.Leader,
})
}
return c.JSON(http.StatusOK, about)
}
/*
// AddNode adds a new node
// @Summary Add a new node
// @Description Add a new node to the cluster

View File

@ -195,7 +195,7 @@ func NewServer(config Config) (Server, error) {
corsPrefixes[httpfs.Mountpoint] = config.Cors.Origins
if httpfs.Filesystem.Type() == "disk" || httpfs.Filesystem.Type() == "mem" {
httpfs.Filesystem = fs.NewClusterFS(httpfs.Filesystem.Name(), httpfs.Filesystem, config.Cluster)
httpfs.Filesystem = fs.NewClusterFS(httpfs.Filesystem.Name(), httpfs.Filesystem, config.Cluster.ProxyReader())
}
filesystem := &filesystem{
@ -314,7 +314,7 @@ func NewServer(config Config) (Server, error) {
})
if config.Cluster != nil {
//s.v3handler.cluster = api.NewCluster(config.Cluster)
s.v3handler.cluster = api.NewCluster(config.Cluster)
}
if middleware, err := mwcors.NewWithConfig(mwcors.Config{
@ -656,19 +656,20 @@ func (s *server) setRoutesV3(v3 *echo.Group) {
}
// v3 Cluster
/*
if s.v3handler.cluster != nil {
v3.GET("/cluster", s.v3handler.cluster.GetCluster)
v3.GET("/cluster/node/:id", s.v3handler.cluster.GetNode)
v3.GET("/cluster/node/:id/proxy", s.v3handler.cluster.GetNodeProxy)
if s.v3handler.cluster != nil {
v3.GET("/cluster", s.v3handler.cluster.About)
v3.GET("/cluster/proxy", s.v3handler.cluster.GetProxyNodes)
v3.GET("/cluster/proxy/node/:id", s.v3handler.cluster.GetProxyNode)
v3.GET("/cluster/proxy/node/:id/files", s.v3handler.cluster.GetProxyNodeFiles)
/*
if !s.readOnly {
v3.POST("/cluster/node", s.v3handler.cluster.AddNode)
v3.PUT("/cluster/node/:id", s.v3handler.cluster.UpdateNode)
v3.DELETE("/cluster/node/:id", s.v3handler.cluster.DeleteNode)
}
}
*/
*/
}
// v3 Log
v3.GET("/log", s.v3handler.log.Log)

View File

@ -58,7 +58,7 @@ type Config struct {
// with methods like tls.Config.SetSessionTicketKeys.
TLSConfig *tls.Config
Cluster cluster.ClusterReader
Proxy cluster.ProxyReader
}
// Server represents a RTMP server
@ -93,7 +93,7 @@ type server struct {
channels map[string]*channel
lock sync.RWMutex
cluster cluster.ClusterReader
proxy cluster.ProxyReader
}
// New creates a new RTMP server according to the given config
@ -111,15 +111,15 @@ func New(config Config) (Server, error) {
token: config.Token,
logger: config.Logger,
collector: config.Collector,
cluster: config.Cluster,
proxy: config.Proxy,
}
if s.collector == nil {
s.collector = session.NewNullCollector()
}
if s.cluster == nil {
s.cluster = cluster.NewDummyClusterReader()
if s.proxy == nil {
s.proxy = cluster.NewNullProxyReader()
}
s.server = &rtmp.Server{
@ -254,7 +254,7 @@ func (s *server) handlePlay(conn *rtmp.Conn) {
if ch == nil {
// Check in the cluster for that stream
url, err := s.cluster.GetURL("rtmp:" + conn.URL.Path)
url, err := s.proxy.GetURL("rtmp:" + conn.URL.Path)
if err != nil {
s.log("PLAY", "NOTFOUND", conn.URL.Path, "", client)
return

View File

@ -40,7 +40,7 @@ type Config struct {
SRTLogTopics []string
Cluster cluster.ClusterReader
Proxy cluster.ProxyReader
}
// Server represents a SRT server
@ -77,7 +77,7 @@ type server struct {
srtlog map[string]*ring.Ring // Per logtopic a dedicated ring buffer
srtlogLock sync.RWMutex
cluster cluster.ClusterReader
proxy cluster.ProxyReader
}
func New(config Config) (Server, error) {
@ -87,15 +87,15 @@ func New(config Config) (Server, error) {
passphrase: config.Passphrase,
collector: config.Collector,
logger: config.Logger,
cluster: config.Cluster,
proxy: config.Proxy,
}
if s.collector == nil {
s.collector = session.NewNullCollector()
}
if s.cluster == nil {
s.cluster = cluster.NewDummyClusterReader()
if s.proxy == nil {
s.proxy = cluster.NewNullProxyReader()
}
if s.logger == nil {
@ -438,7 +438,7 @@ func (s *server) handleSubscribe(conn srt.Conn) {
if ch == nil {
// Check in the cluster for the stream and proxy it
srturl, err := s.cluster.GetURL("srt:" + si.resource)
srturl, err := s.proxy.GetURL("srt:" + si.resource)
if err != nil {
s.log("SUBSCRIBE", "NOTFOUND", si.resource, "no publisher for this resource found", client)
return