From 0cfe07de8515a4ebee6482ce53c521a6158ab600 Mon Sep 17 00:00:00 2001 From: Ingo Oppermann Date: Tue, 23 Aug 2022 17:37:33 +0300 Subject: [PATCH] Allow to mount multiple S3 storages --- app/api/api.go | 63 +++++++++++--------- config/config.go | 27 +++++---- config/data.go | 15 +---- config/types.go | 152 +++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 204 insertions(+), 53 deletions(-) diff --git a/app/api/api.go b/app/api/api.go index 876074da..7f3c7573 100644 --- a/app/api/api.go +++ b/app/api/api.go @@ -65,7 +65,7 @@ type api struct { ffmpeg ffmpeg.FFmpeg diskfs fs.Filesystem memfs fs.Filesystem - s3fs fs.Filesystem + s3fs map[string]fs.Filesystem rtmpserver rtmp.Server srtserver srt.Server metrics monitor.HistoryMonitor @@ -115,6 +115,7 @@ var ErrConfigReload = fmt.Errorf("configuration reload") func New(configpath string, logwriter io.Writer) (API, error) { a := &api{ state: "idle", + s3fs: map[string]fs.Filesystem{}, } a.config.path = configpath @@ -375,7 +376,7 @@ func (a *api) start() error { Logger: a.log.logger.core.WithComponent("FS"), }) if err != nil { - return err + return fmt.Errorf("disk filesystem: %w", err) } a.diskfs = diskfs @@ -410,10 +411,10 @@ func (a *api) start() error { a.memfs.Resize(cfg.Storage.Memory.Size * 1024 * 1024) } - if cfg.Storage.S3.Enable { + for _, s3 := range cfg.Storage.S3 { baseS3FS := url.URL{ Scheme: "http", - Path: "/s3", + Path: s3.Mountpoint, } host, port, _ := gonet.SplitHostPort(cfg.Address) @@ -423,25 +424,29 @@ func (a *api) start() error { baseS3FS.Host = cfg.Address } - if cfg.Storage.S3.Auth.Enable { - baseS3FS.User = url.UserPassword(cfg.Storage.S3.Auth.Username, cfg.Storage.S3.Auth.Password) + if s3.Auth.Enable { + baseS3FS.User = url.UserPassword(s3.Auth.Username, s3.Auth.Password) } s3fs, err := fs.NewS3Filesystem(fs.S3Config{ Base: baseS3FS.String(), - Endpoint: cfg.Storage.S3.Endpoint, - AccessKeyID: cfg.Storage.S3.AccessKeyID, - SecretAccessKey: cfg.Storage.S3.SecretAccessKey, - Region: cfg.Storage.S3.Region, - Bucket: cfg.Storage.S3.Bucket, - UseSSL: cfg.Storage.S3.UseSSL, + Endpoint: s3.Endpoint, + AccessKeyID: s3.AccessKeyID, + SecretAccessKey: s3.SecretAccessKey, + Region: s3.Region, + Bucket: s3.Bucket, + UseSSL: s3.UseSSL, Logger: a.log.logger.core.WithComponent("FS"), }) if err != nil { - return err + return fmt.Errorf("s3 filesystem (%s): %w", s3.Name, err) } - a.s3fs = s3fs + if _, ok := a.s3fs[s3.Name]; ok { + return fmt.Errorf("the name '%s' for a filesystem is already in use", s3.Name) + } + + a.s3fs[s3.Name] = s3fs } var portrange net.Portranger @@ -449,18 +454,18 @@ func (a *api) start() error { if cfg.Playout.Enable { portrange, err = net.NewPortrange(cfg.Playout.MinPort, cfg.Playout.MaxPort) if err != nil { - return err + return fmt.Errorf("playout port range: %w", err) } } validatorIn, err := ffmpeg.NewValidator(cfg.FFmpeg.Access.Input.Allow, cfg.FFmpeg.Access.Input.Block) if err != nil { - return err + return fmt.Errorf("input address validator: %w", err) } validatorOut, err := ffmpeg.NewValidator(cfg.FFmpeg.Access.Output.Allow, cfg.FFmpeg.Access.Output.Block) if err != nil { - return err + return fmt.Errorf("output address validator: %w", err) } ffmpeg, err := ffmpeg.New(ffmpeg.Config{ @@ -474,7 +479,7 @@ func (a *api) start() error { Collector: a.sessions.Collector("ffmpeg"), }) if err != nil { - return err + return fmt.Errorf("unable to create ffmpeg: %w", err) } a.ffmpeg = ffmpeg @@ -488,8 +493,8 @@ func (a *api) start() error { a.replacer.RegisterTemplate("fs:diskfs", a.diskfs.Base()) a.replacer.RegisterTemplate("fs:memfs", a.memfs.Base()) - if a.s3fs != nil { - a.replacer.RegisterTemplate("fs:s3fs", a.s3fs.Base()) + for name, s3 := range a.s3fs { + a.replacer.RegisterTemplate("fs:"+name, s3.Base()) } host, port, _ := gonet.SplitHostPort(cfg.RTMP.Address) @@ -595,8 +600,8 @@ func (a *api) start() error { metrics.Register(monitor.NewDiskCollector(a.diskfs.Base())) metrics.Register(monitor.NewFilesystemCollector("diskfs", diskfs)) metrics.Register(monitor.NewFilesystemCollector("memfs", a.memfs)) - if a.s3fs != nil { - metrics.Register(monitor.NewFilesystemCollector("s3fs", a.s3fs)) + for name, fs := range a.s3fs { + metrics.Register(monitor.NewFilesystemCollector(name, fs)) } metrics.Register(monitor.NewRestreamCollector(a.restream)) metrics.Register(monitor.NewFFmpegCollector(a.ffmpeg)) @@ -865,18 +870,18 @@ func (a *api) start() error { }, } - if a.s3fs != nil { + for _, s3 := range cfg.Storage.S3 { filesystems = append(filesystems, httpfs.FS{ - Name: "s3fs", - Mountpoint: "/s3", + Name: s3.Name, + Mountpoint: s3.Mountpoint, AllowWrite: true, - EnableAuth: cfg.Storage.S3.Auth.Enable, - Username: cfg.Storage.S3.Auth.Username, - Password: cfg.Storage.S3.Auth.Password, + EnableAuth: s3.Auth.Enable, + Username: s3.Auth.Username, + Password: s3.Auth.Password, DefaultFile: "", DefaultContentType: "application/data", Gzip: true, - Filesystem: a.s3fs, + Filesystem: a.s3fs[s3.Name], Cache: a.cache, }) } diff --git a/config/config.go b/config/config.go index 810e24e5..774504ad 100644 --- a/config/config.go +++ b/config/config.go @@ -49,6 +49,22 @@ type Auth0Tenant struct { Users []string `json:"users"` } +type S3Storage struct { + Name string `json:"name"` + Mountpoint string `json:"mountpoint"` + Auth struct { + Enable bool `json:"enable"` + Username string `json:"username"` + Password string `json:"password"` + } `json:"auth"` + Endpoint string `json:"endpoint"` + AccessKeyID string `json:"access_key_id"` + SecretAccessKey string `json:"secret_access_key"` + Bucket string `json:"bucket"` + Region string `json:"region"` + UseSSL bool `json:"use_ssl"` +} + type DataVersion struct { Version int64 `json:"version"` } @@ -200,16 +216,7 @@ func (d *Config) init() { d.val(newBoolValue(&d.Storage.Memory.Purge, false), "storage.memory.purge", "CORE_STORAGE_MEMORY_PURGE", nil, "Automatically remove the oldest files if /memfs is full", false, false) // Storage (S3) - d.val(newBoolValue(&d.Storage.S3.Enable, true), "storage.s3.enable", "CORE_STORAGE_S3_ENABLE", nil, "Enable S3 storage", false, false) - d.val(newBoolValue(&d.Storage.S3.Auth.Enable, true), "storage.s3.auth.enable", "CORE_STORAGE_S3_AUTH_ENABLE", nil, "Enable basic auth for PUT,POST, and DELETE on /s3", false, false) - d.val(newStringValue(&d.Storage.S3.Auth.Username, "admin"), "storage.s3.auth.username", "CORE_STORAGE_S3_AUTH_USERNAME", nil, "Username for Basic-Auth of /s3", false, false) - d.val(newStringValue(&d.Storage.S3.Auth.Password, rand.StringAlphanumeric(18)), "storage.s3.auth.password", "CORE_STORAGE_S3_AUTH_PASSWORD", nil, "Password for Basic-Auth of /s3", false, true) - d.val(newStringValue(&d.Storage.S3.Endpoint, ""), "storage.s3.endpoint", "CORE_STORAGE_S3_ENDPOINT", nil, "S3 host", false, false) - d.val(newStringValue(&d.Storage.S3.AccessKeyID, ""), "storage.s3.acces_key_id", "CORE_STORAGE_S3_ACCESS_KEY_ID", nil, "S3 access key ID", false, false) - d.val(newStringValue(&d.Storage.S3.SecretAccessKey, ""), "storage.s3.secret_access_key", "CORE_STORAGE_S3_SECRET_ACCESS_KEY", nil, "S3 secret access key", false, true) - d.val(newStringValue(&d.Storage.S3.Bucket, ""), "storage.s3.bucket", "CORE_STORAGE_S3_BUCKET", nil, "Bucket name, will be created if it doesn't exists", false, false) - d.val(newStringValue(&d.Storage.S3.Region, ""), "storage.s3.region", "CORE_STORAGE_S3_REGION", nil, "S3 region", false, false) - d.val(newBoolValue(&d.Storage.S3.UseSSL, true), "storage.s3.use_ssl", "CORE_STORAGE_S3_USE_SSL", nil, "Enable SSL for communication (recommended)", false, false) + d.val(newS3StorageListValue(&d.Storage.S3, []S3Storage{}, "|"), "storage.s3", "CORE_STORAGE_S3", nil, "List of S3 storage URLS", false, false) // Storage (CORS) d.val(newCORSOriginsValue(&d.Storage.CORS.Origins, []string{"*"}, ","), "storage.cors.origins", "CORE_STORAGE_CORS_ORIGINS", nil, "Allowed CORS origins for /memfs and /data", false, false) diff --git a/config/data.go b/config/data.go index fe052017..2bdd3203 100644 --- a/config/data.go +++ b/config/data.go @@ -81,20 +81,7 @@ type Data struct { Size int64 `json:"max_size_mbytes"` Purge bool `json:"purge"` } `json:"memory"` - S3 struct { - Enable bool `json:"enable"` - Auth struct { - Enable bool `json:"enable"` - Username string `json:"username"` - Password string `json:"password"` - } `json:"auth"` - Endpoint string `json:"endpoint"` - AccessKeyID string `json:"access_key_id"` - SecretAccessKey string `json:"secret_access_key"` - Bucket string `json:"bucket"` - Region string `json:"region"` - UseSSL bool `json:"use_ssl"` - } `json:"s3"` + S3 []S3Storage `json:"s3"` CORS struct { Origins []string `json:"origins"` } `json:"cors"` diff --git a/config/types.go b/config/types.go index 3b5532ec..2897d593 100644 --- a/config/types.go +++ b/config/types.go @@ -15,6 +15,8 @@ import ( "time" "github.com/datarhei/core/v16/http/cors" + + "golang.org/x/net/publicsuffix" ) type value interface { @@ -223,6 +225,156 @@ func (s *tenantListValue) IsEmpty() bool { return len(*s.p) == 0 } +// array of s3 storages +// https://access_key_id:secret_access_id@region.endpoint/bucket?name=aaa&mount=/abc&username=xxx&password=yyy + +type s3StorageListValue struct { + p *[]S3Storage + separator string +} + +func newS3StorageListValue(p *[]S3Storage, val []S3Storage, separator string) *s3StorageListValue { + v := &s3StorageListValue{ + p: p, + separator: separator, + } + + *p = val + return v +} + +func (s *s3StorageListValue) Set(val string) error { + list := []S3Storage{} + + for _, elm := range strings.Split(val, s.separator) { + u, err := url.Parse(elm) + if err != nil { + return fmt.Errorf("invalid S3 storage URL (%s): %w", elm, err) + } + + t := S3Storage{ + Name: u.Query().Get("name"), + Mountpoint: u.Query().Get("mountpoint"), + AccessKeyID: u.User.Username(), + } + + hostname := u.Hostname() + port := u.Port() + + domain, err := publicsuffix.EffectiveTLDPlusOne(hostname) + if err != nil { + return fmt.Errorf("invalid eTLD (%s): %w", hostname, err) + } + + t.Endpoint = domain + if len(port) != 0 { + t.Endpoint += ":" + port + } + + region := strings.TrimSuffix(hostname, domain) + if len(region) != 0 { + t.Region = strings.TrimSuffix(region, ".") + } + + secret, ok := u.User.Password() + if ok { + t.SecretAccessKey = secret + } + + t.Bucket = strings.TrimPrefix(u.Path, "/") + + if u.Scheme == "https" { + t.UseSSL = true + } + + if u.Query().Has("username") || u.Query().Has("password") { + t.Auth.Enable = true + t.Auth.Username = u.Query().Get("username") + t.Auth.Username = u.Query().Get("password") + } + + list = append(list, t) + } + + *s.p = list + + return nil +} + +func (s *s3StorageListValue) String() string { + if s.IsEmpty() { + return "(empty)" + } + + list := []string{} + + for _, t := range *s.p { + u := url.URL{} + + if t.UseSSL { + u.Scheme = "https" + } else { + u.Scheme = "http" + } + + u.User = url.UserPassword(t.AccessKeyID, "---") + + u.Host = t.Endpoint + + if len(t.Region) != 0 { + u.Host = t.Region + "." + u.Host + } + + if len(t.Bucket) != 0 { + u.Path = "/" + t.Bucket + } + + v := url.Values{} + v.Set("name", t.Name) + v.Set("mountpoint", t.Mountpoint) + + if t.Auth.Enable { + if len(t.Auth.Username) != 0 { + v.Set("username", t.Auth.Username) + } + + if len(t.Auth.Password) != 0 { + v.Set("password", t.Auth.Password) + } + } + + u.RawQuery = v.Encode() + + list = append(list, u.String()) + } + + return strings.Join(list, s.separator) +} + +func (s *s3StorageListValue) Validate() error { + for i, t := range *s.p { + if len(t.Name) == 0 { + return fmt.Errorf("the name for s3 storage %d is missing", i) + } + + if len(t.Mountpoint) == 0 { + return fmt.Errorf("the mountpoint for s3 storage %d is missing", i) + } + + if t.Auth.Enable { + if len(t.Auth.Username) == 0 && len(t.Auth.Password) == 0 { + return fmt.Errorf("auth is enabled, but no username and password are set for s3 storage %d", i) + } + } + } + + return nil +} + +func (s *s3StorageListValue) IsEmpty() bool { + return len(*s.p) == 0 +} + // map of strings to strings type stringMapStringValue struct {