Allow mounting multiple S3 storages

This commit is contained in:
Ingo Oppermann 2022-08-23 17:37:33 +03:00
parent 20fbb9b7bc
commit 0cfe07de85
No known key found for this signature in database
GPG Key ID: 2AB32426E9DD229E
4 changed files with 204 additions and 53 deletions

View File

@ -65,7 +65,7 @@ type api struct {
ffmpeg ffmpeg.FFmpeg
diskfs fs.Filesystem
memfs fs.Filesystem
s3fs fs.Filesystem
s3fs map[string]fs.Filesystem
rtmpserver rtmp.Server
srtserver srt.Server
metrics monitor.HistoryMonitor
@ -115,6 +115,7 @@ var ErrConfigReload = fmt.Errorf("configuration reload")
func New(configpath string, logwriter io.Writer) (API, error) {
a := &api{
state: "idle",
s3fs: map[string]fs.Filesystem{},
}
a.config.path = configpath
@ -375,7 +376,7 @@ func (a *api) start() error {
Logger: a.log.logger.core.WithComponent("FS"),
})
if err != nil {
return err
return fmt.Errorf("disk filesystem: %w", err)
}
a.diskfs = diskfs
@ -410,10 +411,10 @@ func (a *api) start() error {
a.memfs.Resize(cfg.Storage.Memory.Size * 1024 * 1024)
}
if cfg.Storage.S3.Enable {
for _, s3 := range cfg.Storage.S3 {
baseS3FS := url.URL{
Scheme: "http",
Path: "/s3",
Path: s3.Mountpoint,
}
host, port, _ := gonet.SplitHostPort(cfg.Address)
@ -423,25 +424,29 @@ func (a *api) start() error {
baseS3FS.Host = cfg.Address
}
if cfg.Storage.S3.Auth.Enable {
baseS3FS.User = url.UserPassword(cfg.Storage.S3.Auth.Username, cfg.Storage.S3.Auth.Password)
if s3.Auth.Enable {
baseS3FS.User = url.UserPassword(s3.Auth.Username, s3.Auth.Password)
}
s3fs, err := fs.NewS3Filesystem(fs.S3Config{
Base: baseS3FS.String(),
Endpoint: cfg.Storage.S3.Endpoint,
AccessKeyID: cfg.Storage.S3.AccessKeyID,
SecretAccessKey: cfg.Storage.S3.SecretAccessKey,
Region: cfg.Storage.S3.Region,
Bucket: cfg.Storage.S3.Bucket,
UseSSL: cfg.Storage.S3.UseSSL,
Endpoint: s3.Endpoint,
AccessKeyID: s3.AccessKeyID,
SecretAccessKey: s3.SecretAccessKey,
Region: s3.Region,
Bucket: s3.Bucket,
UseSSL: s3.UseSSL,
Logger: a.log.logger.core.WithComponent("FS"),
})
if err != nil {
return err
return fmt.Errorf("s3 filesystem (%s): %w", s3.Name, err)
}
a.s3fs = s3fs
if _, ok := a.s3fs[s3.Name]; ok {
return fmt.Errorf("the name '%s' for a filesystem is already in use", s3.Name)
}
a.s3fs[s3.Name] = s3fs
}
var portrange net.Portranger
@ -449,18 +454,18 @@ func (a *api) start() error {
if cfg.Playout.Enable {
portrange, err = net.NewPortrange(cfg.Playout.MinPort, cfg.Playout.MaxPort)
if err != nil {
return err
return fmt.Errorf("playout port range: %w", err)
}
}
validatorIn, err := ffmpeg.NewValidator(cfg.FFmpeg.Access.Input.Allow, cfg.FFmpeg.Access.Input.Block)
if err != nil {
return err
return fmt.Errorf("input address validator: %w", err)
}
validatorOut, err := ffmpeg.NewValidator(cfg.FFmpeg.Access.Output.Allow, cfg.FFmpeg.Access.Output.Block)
if err != nil {
return err
return fmt.Errorf("output address validator: %w", err)
}
ffmpeg, err := ffmpeg.New(ffmpeg.Config{
@ -474,7 +479,7 @@ func (a *api) start() error {
Collector: a.sessions.Collector("ffmpeg"),
})
if err != nil {
return err
return fmt.Errorf("unable to create ffmpeg: %w", err)
}
a.ffmpeg = ffmpeg
@ -488,8 +493,8 @@ func (a *api) start() error {
a.replacer.RegisterTemplate("fs:diskfs", a.diskfs.Base())
a.replacer.RegisterTemplate("fs:memfs", a.memfs.Base())
if a.s3fs != nil {
a.replacer.RegisterTemplate("fs:s3fs", a.s3fs.Base())
for name, s3 := range a.s3fs {
a.replacer.RegisterTemplate("fs:"+name, s3.Base())
}
host, port, _ := gonet.SplitHostPort(cfg.RTMP.Address)
@ -595,8 +600,8 @@ func (a *api) start() error {
metrics.Register(monitor.NewDiskCollector(a.diskfs.Base()))
metrics.Register(monitor.NewFilesystemCollector("diskfs", diskfs))
metrics.Register(monitor.NewFilesystemCollector("memfs", a.memfs))
if a.s3fs != nil {
metrics.Register(monitor.NewFilesystemCollector("s3fs", a.s3fs))
for name, fs := range a.s3fs {
metrics.Register(monitor.NewFilesystemCollector(name, fs))
}
metrics.Register(monitor.NewRestreamCollector(a.restream))
metrics.Register(monitor.NewFFmpegCollector(a.ffmpeg))
@ -865,18 +870,18 @@ func (a *api) start() error {
},
}
if a.s3fs != nil {
for _, s3 := range cfg.Storage.S3 {
filesystems = append(filesystems, httpfs.FS{
Name: "s3fs",
Mountpoint: "/s3",
Name: s3.Name,
Mountpoint: s3.Mountpoint,
AllowWrite: true,
EnableAuth: cfg.Storage.S3.Auth.Enable,
Username: cfg.Storage.S3.Auth.Username,
Password: cfg.Storage.S3.Auth.Password,
EnableAuth: s3.Auth.Enable,
Username: s3.Auth.Username,
Password: s3.Auth.Password,
DefaultFile: "",
DefaultContentType: "application/data",
Gzip: true,
Filesystem: a.s3fs,
Filesystem: a.s3fs[s3.Name],
Cache: a.cache,
})
}

View File

@ -49,6 +49,22 @@ type Auth0Tenant struct {
Users []string `json:"users"`
}
// S3Storage describes a single S3-compatible storage that is mounted into
// the HTTP filesystem tree at Mountpoint. Name must be unique across all
// configured storages because it is used as the filesystem identifier.
type S3Storage struct {
	Name       string `json:"name"`       // unique identifier for this storage
	Mountpoint string `json:"mountpoint"` // path under which the bucket is served
	Auth       struct {
		Enable   bool   `json:"enable"`
		Username string `json:"username"`
		Password string `json:"password"`
	} `json:"auth"` // optional basic auth guarding the mountpoint
	Endpoint        string `json:"endpoint"`          // S3 host, optionally with port
	AccessKeyID     string `json:"access_key_id"`     // S3 credentials
	SecretAccessKey string `json:"secret_access_key"` // S3 credentials
	Bucket          string `json:"bucket"`
	Region          string `json:"region"`
	UseSSL          bool   `json:"use_ssl"` // talk to the endpoint via HTTPS
}
// DataVersion carries the version number of the stored configuration data.
type DataVersion struct {
	Version int64 `json:"version"`
}
@ -200,16 +216,7 @@ func (d *Config) init() {
d.val(newBoolValue(&d.Storage.Memory.Purge, false), "storage.memory.purge", "CORE_STORAGE_MEMORY_PURGE", nil, "Automatically remove the oldest files if /memfs is full", false, false)
// Storage (S3)
d.val(newBoolValue(&d.Storage.S3.Enable, true), "storage.s3.enable", "CORE_STORAGE_S3_ENABLE", nil, "Enable S3 storage", false, false)
d.val(newBoolValue(&d.Storage.S3.Auth.Enable, true), "storage.s3.auth.enable", "CORE_STORAGE_S3_AUTH_ENABLE", nil, "Enable basic auth for PUT,POST, and DELETE on /s3", false, false)
d.val(newStringValue(&d.Storage.S3.Auth.Username, "admin"), "storage.s3.auth.username", "CORE_STORAGE_S3_AUTH_USERNAME", nil, "Username for Basic-Auth of /s3", false, false)
d.val(newStringValue(&d.Storage.S3.Auth.Password, rand.StringAlphanumeric(18)), "storage.s3.auth.password", "CORE_STORAGE_S3_AUTH_PASSWORD", nil, "Password for Basic-Auth of /s3", false, true)
d.val(newStringValue(&d.Storage.S3.Endpoint, ""), "storage.s3.endpoint", "CORE_STORAGE_S3_ENDPOINT", nil, "S3 host", false, false)
d.val(newStringValue(&d.Storage.S3.AccessKeyID, ""), "storage.s3.acces_key_id", "CORE_STORAGE_S3_ACCESS_KEY_ID", nil, "S3 access key ID", false, false)
d.val(newStringValue(&d.Storage.S3.SecretAccessKey, ""), "storage.s3.secret_access_key", "CORE_STORAGE_S3_SECRET_ACCESS_KEY", nil, "S3 secret access key", false, true)
d.val(newStringValue(&d.Storage.S3.Bucket, ""), "storage.s3.bucket", "CORE_STORAGE_S3_BUCKET", nil, "Bucket name, will be created if it doesn't exists", false, false)
d.val(newStringValue(&d.Storage.S3.Region, ""), "storage.s3.region", "CORE_STORAGE_S3_REGION", nil, "S3 region", false, false)
d.val(newBoolValue(&d.Storage.S3.UseSSL, true), "storage.s3.use_ssl", "CORE_STORAGE_S3_USE_SSL", nil, "Enable SSL for communication (recommended)", false, false)
d.val(newS3StorageListValue(&d.Storage.S3, []S3Storage{}, "|"), "storage.s3", "CORE_STORAGE_S3", nil, "List of S3 storage URLS", false, false)
// Storage (CORS)
d.val(newCORSOriginsValue(&d.Storage.CORS.Origins, []string{"*"}, ","), "storage.cors.origins", "CORE_STORAGE_CORS_ORIGINS", nil, "Allowed CORS origins for /memfs and /data", false, false)

View File

@ -81,20 +81,7 @@ type Data struct {
Size int64 `json:"max_size_mbytes"`
Purge bool `json:"purge"`
} `json:"memory"`
S3 struct {
Enable bool `json:"enable"`
Auth struct {
Enable bool `json:"enable"`
Username string `json:"username"`
Password string `json:"password"`
} `json:"auth"`
Endpoint string `json:"endpoint"`
AccessKeyID string `json:"access_key_id"`
SecretAccessKey string `json:"secret_access_key"`
Bucket string `json:"bucket"`
Region string `json:"region"`
UseSSL bool `json:"use_ssl"`
} `json:"s3"`
S3 []S3Storage `json:"s3"`
CORS struct {
Origins []string `json:"origins"`
} `json:"cors"`

View File

@ -15,6 +15,8 @@ import (
"time"
"github.com/datarhei/core/v16/http/cors"
"golang.org/x/net/publicsuffix"
)
type value interface {
@ -223,6 +225,156 @@ func (s *tenantListValue) IsEmpty() bool {
return len(*s.p) == 0
}
// s3StorageListValue is a flag value holding a list of S3 storages. Each list
// element is encoded as a URL of the form
// https://access_key_id:secret_access_key@region.endpoint/bucket?name=aaa&mountpoint=/abc&username=xxx&password=yyy
// and the elements are joined with the configured separator.
type s3StorageListValue struct {
	p         *[]S3Storage // backing list, shared with the config struct
	separator string       // separator between encoded storage URLs
}
// newS3StorageListValue binds the list pointed to by p to a new flag value,
// initializes it with val, and remembers the separator that joins and splits
// the textual representation.
func newS3StorageListValue(p *[]S3Storage, val []S3Storage, separator string) *s3StorageListValue {
	*p = val

	return &s3StorageListValue{
		p:         p,
		separator: separator,
	}
}
// Set parses a separator-joined list of S3 storage URLs of the form
//
//	https://access_key_id:secret_access_key@region.endpoint/bucket?name=aaa&mountpoint=/abc&username=xxx&password=yyy
//
// and replaces the bound list with the parsed storages. The URL scheme
// decides UseSSL, the userinfo carries the S3 credentials, the hostname
// labels in front of the effective TLD+1 become the region, and the path is
// the bucket name.
func (s *s3StorageListValue) Set(val string) error {
	list := []S3Storage{}

	for _, elm := range strings.Split(val, s.separator) {
		u, err := url.Parse(elm)
		if err != nil {
			return fmt.Errorf("invalid S3 storage URL (%s): %w", elm, err)
		}

		// Parse the query string once instead of re-parsing it on every Get.
		query := u.Query()

		t := S3Storage{
			Name:        query.Get("name"),
			Mountpoint:  query.Get("mountpoint"),
			AccessKeyID: u.User.Username(),
		}

		hostname := u.Hostname()
		port := u.Port()

		// Split the host into region and endpoint: anything in front of the
		// effective TLD+1 (e.g. "eu-west-1." of "eu-west-1.example.com") is
		// treated as the region.
		domain, err := publicsuffix.EffectiveTLDPlusOne(hostname)
		if err != nil {
			return fmt.Errorf("invalid eTLD (%s): %w", hostname, err)
		}

		t.Endpoint = domain
		if len(port) != 0 {
			t.Endpoint += ":" + port
		}

		region := strings.TrimSuffix(hostname, domain)
		if len(region) != 0 {
			t.Region = strings.TrimSuffix(region, ".")
		}

		if secret, ok := u.User.Password(); ok {
			t.SecretAccessKey = secret
		}

		t.Bucket = strings.TrimPrefix(u.Path, "/")

		if u.Scheme == "https" {
			t.UseSSL = true
		}

		if query.Has("username") || query.Has("password") {
			t.Auth.Enable = true
			t.Auth.Username = query.Get("username")
			// Bug fix: this previously assigned the "password" query value to
			// t.Auth.Username, clobbering the username and leaving the
			// password empty.
			t.Auth.Password = query.Get("password")
		}

		list = append(list, t)
	}

	*s.p = list

	return nil
}
// String renders the list in the same URL form that Set accepts, joined by
// the separator. The secret access key is masked with "---" so the value can
// be printed without leaking credentials.
func (s *s3StorageListValue) String() string {
	if s.IsEmpty() {
		return "(empty)"
	}

	encoded := make([]string, 0, len(*s.p))

	for _, t := range *s.p {
		scheme := "http"
		if t.UseSSL {
			scheme = "https"
		}

		host := t.Endpoint
		if len(t.Region) != 0 {
			host = t.Region + "." + host
		}

		path := ""
		if len(t.Bucket) != 0 {
			path = "/" + t.Bucket
		}

		q := url.Values{}
		q.Set("name", t.Name)
		q.Set("mountpoint", t.Mountpoint)

		if t.Auth.Enable {
			if len(t.Auth.Username) != 0 {
				q.Set("username", t.Auth.Username)
			}
			if len(t.Auth.Password) != 0 {
				q.Set("password", t.Auth.Password)
			}
		}

		u := url.URL{
			Scheme:   scheme,
			User:     url.UserPassword(t.AccessKeyID, "---"),
			Host:     host,
			Path:     path,
			RawQuery: q.Encode(),
		}

		encoded = append(encoded, u.String())
	}

	return strings.Join(encoded, s.separator)
}
// Validate checks that every storage in the list carries the fields required
// to mount it: a name, a mountpoint, and — if basic auth is enabled — at
// least one of username or password.
func (s *s3StorageListValue) Validate() error {
	for i, storage := range *s.p {
		switch {
		case len(storage.Name) == 0:
			return fmt.Errorf("the name for s3 storage %d is missing", i)
		case len(storage.Mountpoint) == 0:
			return fmt.Errorf("the mountpoint for s3 storage %d is missing", i)
		case storage.Auth.Enable && len(storage.Auth.Username) == 0 && len(storage.Auth.Password) == 0:
			return fmt.Errorf("auth is enabled, but no username and password are set for s3 storage %d", i)
		}
	}

	return nil
}
// IsEmpty reports whether no S3 storages are configured.
func (s *s3StorageListValue) IsEmpty() bool {
	return len(*s.p) == 0
}
// map of strings to strings
type stringMapStringValue struct {