Don't collect sessions coming from other nodes in the cluster

This commit is contained in:
Ingo Oppermann 2023-06-27 21:11:29 +02:00
parent 89379b2acd
commit 57c1e50d60
No known key found for this signature in database
GPG Key ID: 2AB32426E9DD229E
5 changed files with 45 additions and 54 deletions

View File

@ -156,6 +156,8 @@ type cluster struct {
nodesLock sync.RWMutex
ready bool
limiter net.IPLimiter
}
var ErrDegraded = errors.New("cluster is currently degraded")
@ -184,12 +186,18 @@ func New(ctx context.Context, config Config) (Cluster, error) {
config: config.CoreConfig,
nodes: map[string]*clusterNode{},
limiter: config.IPLimiter,
}
if c.config == nil {
return nil, fmt.Errorf("the core config must be provided")
}
if c.limiter == nil {
c.limiter = net.NewNullIPLimiter()
}
c.isTLSRequired = c.config.TLS.Enable && c.config.TLS.Auto
host, port, err := gonet.SplitHostPort(c.config.Address)
@ -253,10 +261,8 @@ func New(ctx context.Context, config Config) (Cluster, error) {
c.api = api
nodeproxy, err := proxy.NewProxy(proxy.ProxyConfig{
ID: c.id,
Name: c.name,
IPLimiter: config.IPLimiter,
Logger: c.logger.WithField("logname", "proxy"),
ID: c.id,
Logger: c.logger.WithField("logname", "proxy"),
})
if err != nil {
c.Shutdown()
@ -872,19 +878,24 @@ func (c *cluster) trackNodeChanges() {
logger.Warn().WithError(err).Log("Discovering cluster API address")
}
cnode := NewClusterNode(id, address)
node := NewClusterNode(id, address)
if err := verifyClusterVersion(cnode.Version()); err != nil {
if err := verifyClusterVersion(node.Version()); err != nil {
logger.Warn().Log("Version mismatch. Cluster will end up in degraded mode")
}
if _, err := c.proxy.AddNode(id, cnode.Proxy()); err != nil {
if _, err := c.proxy.AddNode(id, node.Proxy()); err != nil {
logger.Warn().WithError(err).Log("Adding node")
cnode.Stop()
node.Stop()
continue
}
c.nodes[id] = cnode
c.nodes[id] = node
ips := node.IPs()
for _, ip := range ips {
c.limiter.AddBlock(ip)
}
} else {
delete(removeNodes, id)
}
@ -898,6 +909,12 @@ func (c *cluster) trackNodeChanges() {
c.proxy.RemoveNode(id)
node.Stop()
ips := node.IPs()
for _, ip := range ips {
c.limiter.RemoveBlock(ip)
}
delete(c.nodes, id)
}

View File

@ -3,6 +3,7 @@ package cluster
import (
"context"
"fmt"
"net"
"net/http"
"sync"
"time"
@ -17,6 +18,7 @@ type clusterNode struct {
id string
address string
ips []string
version string
lastContact time.Time
lastContactErr error
@ -44,6 +46,12 @@ func NewClusterNode(id, address string) *clusterNode {
},
}
if host, _, err := net.SplitHostPort(address); err == nil {
if addrs, err := net.LookupHost(host); err == nil {
n.ips = addrs
}
}
if version, err := n.client.Version(); err == nil {
n.version = version
}
@ -128,6 +136,10 @@ func (n *clusterNode) Version() string {
return n.version
}
func (n *clusterNode) IPs() []string {
return n.ips
}
func (n *clusterNode) Status() (string, error) {
n.pingLock.RLock()
defer n.pingLock.RUnlock()

View File

@ -45,7 +45,6 @@ type Node interface {
}
type NodeReader interface {
IPs() []string
Ping() (time.Duration, error)
About() NodeAbout
Version() NodeVersion
@ -106,7 +105,6 @@ const (
type node struct {
id string
address string
ips []string
peer client.RestClient
peerErr error
@ -227,11 +225,6 @@ func (n *node) connect(ctx context.Context) error {
return fmt.Errorf("invalid address (%s): %w", u.Host, err)
}
addrs, err := net.LookupHost(nodehost)
if err != nil {
return fmt.Errorf("lookup failed for %s: %w", nodehost, err)
}
peer, err := client.New(client.Config{
Address: u.String(),
Client: &http.Client{
@ -297,8 +290,6 @@ func (n *node) connect(ctx context.Context) error {
n.srtAddress.RawQuery = v.Encode()
}
n.ips = addrs
n.peer = peer
n.peerWg.Add(2)
@ -638,10 +629,6 @@ func (n *node) Version() NodeVersion {
return version
}
func (n *node) IPs() []string {
return n.ips
}
func (n *node) Files() NodeFiles {
id := n.About().ID

View File

@ -10,7 +10,6 @@ import (
"time"
"github.com/datarhei/core/v16/log"
"github.com/datarhei/core/v16/net"
"github.com/datarhei/core/v16/restream/app"
clientapi "github.com/datarhei/core-client-go/v16/api"
@ -119,24 +118,19 @@ func (p *proxyReader) GetFileInfo(prefix, path string) (int64, time.Time, error)
}
type ProxyConfig struct {
ID string // ID of the node
Name string // Name of the node
ID string // ID of the node
IPLimiter net.IPLimiter
Logger log.Logger
Logger log.Logger
}
type proxy struct {
id string
name string
id string
nodes map[string]Node // List of known nodes
idfiles map[string][]string // Map from nodeid to list of files
idupdate map[string]time.Time // Map from nodeid to time of last update
fileid map[string]string // Map from file name to nodeid
limiter net.IPLimiter
updates chan NodeFiles
lock sync.RWMutex
@ -152,20 +146,14 @@ var ErrNodeNotFound = errors.New("node not found")
func NewProxy(config ProxyConfig) (Proxy, error) {
p := &proxy{
id: config.ID,
name: config.Name,
nodes: map[string]Node{},
idfiles: map[string][]string{},
idupdate: map[string]time.Time{},
fileid: map[string]string{},
limiter: config.IPLimiter,
updates: make(chan NodeFiles, 64),
logger: config.Logger,
}
if p.limiter == nil {
p.limiter = net.NewNullIPLimiter()
}
if p.logger == nil {
p.logger = log.New("")
}
@ -281,17 +269,6 @@ func (p *proxy) AddNode(id string, node Node) (string, error) {
n.StopFiles()
delete(p.nodes, id)
ips := node.IPs()
for _, ip := range ips {
p.limiter.RemoveBlock(ip)
}
}
ips := node.IPs()
for _, ip := range ips {
p.limiter.AddBlock(ip)
}
p.nodes[id] = node
@ -320,12 +297,6 @@ func (p *proxy) RemoveNode(id string) error {
delete(p.nodes, id)
ips := node.IPs()
for _, ip := range ips {
p.limiter.RemoveBlock(ip)
}
p.logger.Info().WithFields(log.Fields{
"id": id,
}).Log("Removed node")

View File

@ -48,6 +48,10 @@ func NewHTTPWithConfig(config HTTPConfig) echo.MiddlewareFunc {
return next(c)
}
if !config.Collector.IsCollectableIP(c.RealIP()) {
return next(c)
}
ctxuser := util.DefaultContext(c, "user", "")
req := c.Request()