diff --git a/cluster/cluster.go b/cluster/cluster.go index e1bc64c6..679d74cb 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -156,6 +156,8 @@ type cluster struct { nodesLock sync.RWMutex ready bool + + limiter net.IPLimiter } var ErrDegraded = errors.New("cluster is currently degraded") @@ -184,12 +186,18 @@ func New(ctx context.Context, config Config) (Cluster, error) { config: config.CoreConfig, nodes: map[string]*clusterNode{}, + + limiter: config.IPLimiter, } if c.config == nil { return nil, fmt.Errorf("the core config must be provided") } + if c.limiter == nil { + c.limiter = net.NewNullIPLimiter() + } + c.isTLSRequired = c.config.TLS.Enable && c.config.TLS.Auto host, port, err := gonet.SplitHostPort(c.config.Address) @@ -253,10 +261,8 @@ func New(ctx context.Context, config Config) (Cluster, error) { c.api = api nodeproxy, err := proxy.NewProxy(proxy.ProxyConfig{ - ID: c.id, - Name: c.name, - IPLimiter: config.IPLimiter, - Logger: c.logger.WithField("logname", "proxy"), + ID: c.id, + Logger: c.logger.WithField("logname", "proxy"), }) if err != nil { c.Shutdown() @@ -872,19 +878,24 @@ func (c *cluster) trackNodeChanges() { logger.Warn().WithError(err).Log("Discovering cluster API address") } - cnode := NewClusterNode(id, address) + node := NewClusterNode(id, address) - if err := verifyClusterVersion(cnode.Version()); err != nil { + if err := verifyClusterVersion(node.Version()); err != nil { logger.Warn().Log("Version mismatch. Cluster will end up in degraded mode") } - if _, err := c.proxy.AddNode(id, cnode.Proxy()); err != nil { + if _, err := c.proxy.AddNode(id, node.Proxy()); err != nil { logger.Warn().WithError(err).Log("Adding node") - cnode.Stop() + node.Stop() continue } - c.nodes[id] = cnode + c.nodes[id] = node + + ips := node.IPs() + for _, ip := range ips { + c.limiter.AddBlock(ip) + } } else { delete(removeNodes, id) } @@ -898,6 +909,12 @@ func (c *cluster) trackNodeChanges() { c.proxy.RemoveNode(id) node.Stop() + + ips := node.IPs() + for _, ip := range ips { + c.limiter.RemoveBlock(ip) + } + delete(c.nodes, id) } diff --git a/cluster/node.go b/cluster/node.go index a4bd0e73..712268da 100644 --- a/cluster/node.go +++ b/cluster/node.go @@ -3,6 +3,7 @@ package cluster import ( "context" "fmt" + "net" "net/http" "sync" "time" @@ -17,6 +18,7 @@ type clusterNode struct { id string address string + ips []string version string lastContact time.Time lastContactErr error @@ -44,6 +46,12 @@ func NewClusterNode(id, address string) *clusterNode { }, } + if host, _, err := net.SplitHostPort(address); err == nil { + if addrs, err := net.LookupHost(host); err == nil { + n.ips = addrs + } + } + if version, err := n.client.Version(); err == nil { n.version = version } @@ -128,6 +136,10 @@ func (n *clusterNode) Version() string { return n.version } +func (n *clusterNode) IPs() []string { + return n.ips +} + func (n *clusterNode) Status() (string, error) { n.pingLock.RLock() defer n.pingLock.RUnlock() diff --git a/cluster/proxy/node.go b/cluster/proxy/node.go index 9e67fd34..afc754a0 100644 --- a/cluster/proxy/node.go +++ b/cluster/proxy/node.go @@ -45,7 +45,6 @@ type Node interface { } type NodeReader interface { - IPs() []string Ping() (time.Duration, error) About() NodeAbout Version() NodeVersion @@ -106,7 +105,6 @@ const ( type node struct { id string address string - ips []string peer client.RestClient peerErr error @@ -227,11 +225,6 @@ func (n *node) connect(ctx context.Context) error { return fmt.Errorf("invalid address (%s): %w", u.Host, err) } - addrs, err := net.LookupHost(nodehost) - if err != nil { - return fmt.Errorf("lookup failed for %s: %w", nodehost, err) - } - peer, err := client.New(client.Config{ Address: u.String(), Client: &http.Client{ @@ -297,8 +290,6 @@ func (n *node) connect(ctx context.Context) error { n.srtAddress.RawQuery = v.Encode() } - n.ips = addrs - n.peer = peer n.peerWg.Add(2) @@ -638,10 +629,6 @@ func (n *node) Version() NodeVersion { return version } -func (n *node) IPs() []string { - return n.ips -} - func (n *node) Files() NodeFiles { id := n.About().ID diff --git a/cluster/proxy/proxy.go b/cluster/proxy/proxy.go index e472cfbc..371e300a 100644 --- a/cluster/proxy/proxy.go +++ b/cluster/proxy/proxy.go @@ -10,7 +10,6 @@ import ( "time" "github.com/datarhei/core/v16/log" - "github.com/datarhei/core/v16/net" "github.com/datarhei/core/v16/restream/app" clientapi "github.com/datarhei/core-client-go/v16/api" @@ -119,24 +118,19 @@ func (p *proxyReader) GetFileInfo(prefix, path string) (int64, time.Time, error) } type ProxyConfig struct { - ID string // ID of the node - Name string // Name of the node + ID string // ID of the node - IPLimiter net.IPLimiter - Logger log.Logger + Logger log.Logger } type proxy struct { - id string - name string + id string nodes map[string]Node // List of known nodes idfiles map[string][]string // Map from nodeid to list of files idupdate map[string]time.Time // Map from nodeid to time of last update fileid map[string]string // Map from file name to nodeid - limiter net.IPLimiter - updates chan NodeFiles lock sync.RWMutex @@ -152,20 +146,14 @@ var ErrNodeNotFound = errors.New("node not found") func NewProxy(config ProxyConfig) (Proxy, error) { p := &proxy{ id: config.ID, - name: config.Name, nodes: map[string]Node{}, idfiles: map[string][]string{}, idupdate: map[string]time.Time{}, fileid: map[string]string{}, - limiter: config.IPLimiter, updates: make(chan NodeFiles, 64), logger: config.Logger, } - if p.limiter == nil { - p.limiter = net.NewNullIPLimiter() - } - if p.logger == nil { p.logger = log.New("") } @@ -281,17 +269,6 @@ func (p *proxy) AddNode(id string, node Node) (string, error) { n.StopFiles() delete(p.nodes, id) - - ips := node.IPs() - - for _, ip := range ips { - p.limiter.RemoveBlock(ip) - } - } - - ips := node.IPs() - for _, ip := range ips { - p.limiter.AddBlock(ip) } p.nodes[id] = node @@ -320,12 +297,6 @@ func (p *proxy) RemoveNode(id string) error { delete(p.nodes, id) - ips := node.IPs() - - for _, ip := range ips { - p.limiter.RemoveBlock(ip) - } - p.logger.Info().WithFields(log.Fields{ "id": id, }).Log("Removed node") diff --git a/http/middleware/session/HTTP.go b/http/middleware/session/HTTP.go index 07712661..59d88474 100644 --- a/http/middleware/session/HTTP.go +++ b/http/middleware/session/HTTP.go @@ -48,6 +48,10 @@ func NewHTTPWithConfig(config HTTPConfig) echo.MiddlewareFunc { return next(c) } + if !config.Collector.IsCollectableIP(c.RealIP()) { + return next(c) + } + ctxuser := util.DefaultContext(c, "user", "") req := c.Request()