Use pattern to match placeholders for filesystems

This commit is contained in:
Ingo Oppermann 2022-08-24 12:31:04 +03:00
parent e74149eed2
commit 3a6281295c
No known key found for this signature in database
GPG Key ID: 2AB32426E9DD229E
2 changed files with 17 additions and 15 deletions

View File

@ -652,16 +652,17 @@ A command is defined as:
Currently supported placeholders are: Currently supported placeholders are:
| Placeholder | Description | Location | | Placeholder | Description | Location |
| ------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------ | ----------------------------------------------------------------------------------------------------------------------- | | ------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------ | ----------------------------------------------------------------------------------------------------------------------- |
| `{diskfs}` | Will be replaced by the provided `CORE_STORAGE_DISK_DIR`. | `options`, `input.address`, `input.options`, `output.address`, `output.options` | | `{diskfs}` or `{fs:disk}` | Will be replaced by the provided `CORE_STORAGE_DISK_DIR`. | `options`, `input.address`, `input.options`, `output.address`, `output.options` |
| `{memfs}` | Will be replace by the base URL of the MemFS. | `input.address`, `input.options`, `output.address`, `output.options` | | `{memfs}` or `{fs:mem}` | Will be replaced by the base address of the MemFS. | `input.address`, `input.options`, `output.address`, `output.options` |
| `{processid}` | Will be replaced by the ID of the process. | `input.id`, `input.address`, `input.options`, `output.id`, `output.address`, `output.options`, `output.cleanup.pattern` | | `{fs:*}` | Will be replaces by the base address of the respective filesystem. | See `{memfs}` |
| `{reference}` | Will be replaced by the reference of the process | `input.id`, `input.address`, `input.options`, `output.id`, `output.address`, `output.options`, `output.cleanup.pattern` | | `{processid}` | Will be replaced by the ID of the process. | `input.id`, `input.address`, `input.options`, `output.id`, `output.address`, `output.options`, `output.cleanup.pattern` |
| `{inputid}` | Will be replaced by the ID of the input. | `input.address`, `input.options` | | `{reference}` | Will be replaced by the reference of the process | `input.id`, `input.address`, `input.options`, `output.id`, `output.address`, `output.options`, `output.cleanup.pattern` |
| `{outputid}` | Will be replaced by the ID of the output. | `output.address`, `output.options`, `output.cleanup.pattern` | | `{inputid}` | Will be replaced by the ID of the input. | `input.address`, `input.options` |
| `{rtmp}` | Will be replaced by the internal address of the RTMP server. Requires parameter `name` (name of the stream). | `input.address`, `output.address` | | `{outputid}` | Will be replaced by the ID of the output. | `output.address`, `output.options`, `output.cleanup.pattern` |
| `{srt}` | Will be replaced by the internal address of the SRT server. Requires parameter `name` (name of the stream) and `mode` (either `publish` or `request`). | `input.address`, `output.address` | | `{rtmp}` | Will be replaced by the internal address of the RTMP server. Requires parameter `name` (name of the stream). | `input.address`, `output.address` |
| `{srt}` | Will be replaced by the internal address of the SRT server. Requires parameter `name` (name of the stream) and `mode` (either `publish` or `request`). | `input.address`, `output.address` |
Before replacing the placeholders in the process config, all references (see below) will be resolved. Before replacing the placeholders in the process config, all references (see below) will be resolved.

View File

@ -59,6 +59,7 @@ type Config struct {
ID string ID string
Name string Name string
Store store.Store Store store.Store
Filesystems []fs.Filesystem
DiskFS fs.Filesystem DiskFS fs.Filesystem
MemFS fs.Filesystem MemFS fs.Filesystem
Replace replace.Replacer Replace replace.Replacer
@ -188,7 +189,7 @@ func (r *restream) Start() {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
r.fs.stopObserver = cancel r.fs.stopObserver = cancel
go r.observe(ctx, 10*time.Second) go r.observe(ctx, r.fs.diskfs, 10*time.Second)
r.stopOnce = sync.Once{} r.stopOnce = sync.Once{}
}) })
@ -219,7 +220,7 @@ func (r *restream) Stop() {
}) })
} }
func (r *restream) observe(ctx context.Context, interval time.Duration) { func (r *restream) observe(ctx context.Context, fs fs.Filesystem, interval time.Duration) {
ticker := time.NewTicker(interval) ticker := time.NewTicker(interval)
defer ticker.Stop() defer ticker.Stop()
@ -228,14 +229,14 @@ func (r *restream) observe(ctx context.Context, interval time.Duration) {
case <-ctx.Done(): case <-ctx.Done():
return return
case <-ticker.C: case <-ticker.C:
size, limit := r.fs.diskfs.Size() size, limit := fs.Size()
isFull := false isFull := false
if limit > 0 && size >= limit { if limit > 0 && size >= limit {
isFull = true isFull = true
} }
if isFull { if isFull {
// Stop all tasks that write to disk // Stop all tasks that write to this filesystem
r.lock.Lock() r.lock.Lock()
for id, t := range r.tasks { for id, t := range r.tasks {
if !t.valid { if !t.valid {
@ -250,7 +251,7 @@ func (r *restream) observe(ctx context.Context, interval time.Duration) {
continue continue
} }
r.logger.Warn().Log("Shutting down because disk is full") r.logger.Warn().Log("Shutting down because filesystem is full")
r.stopProcess(id) r.stopProcess(id)
} }
r.lock.Unlock() r.lock.Unlock()