Fix proper cancellation of event emitting, refactor cluster events

Ingo Oppermann 2025-12-05 16:30:33 +01:00
parent 36f156e4ed
commit b5b16a6f9a
No known key found for this signature in database
GPG Key ID: 2AB32426E9DD229E
16 changed files with 592 additions and 216 deletions

View File

@ -106,7 +106,7 @@ type api struct {
log struct {
writer io.Writer
buffer log.BufferWriter
events log.ChannelWriter
events *log.ChannelWriter
logger struct {
core log.Logger
main log.Logger

View File

@ -13,6 +13,7 @@ import (
"time"
"github.com/datarhei/core/v16/config"
"github.com/datarhei/core/v16/event"
"github.com/datarhei/core/v16/http/api"
"github.com/datarhei/core/v16/http/client"
"github.com/datarhei/core/v16/log"
@ -99,6 +100,11 @@ type Core struct {
media map[string]*Media
mediaLock sync.RWMutex
events struct {
log *event.PubSub
process *event.PubSub
}
logger log.Logger
}
@ -111,6 +117,9 @@ func NewCore(id string, logger log.Logger) *Core {
media: map[string]*Media{},
}
core.events.log = event.NewPubSub()
core.events.process = event.NewPubSub()
if core.logger == nil {
core.logger = log.New("")
}
@ -174,6 +183,9 @@ func (n *Core) Stop() {
n.cancel()
n.cancel = nil
n.events.process.Close()
n.events.log.Close()
n.disconnect()
}
@ -321,6 +333,9 @@ func (n *Core) connect() error {
go n.mediaEvents(ctx, "rtmp")
go n.mediaEvents(ctx, "srt")
go n.logEvents(ctx)
go n.processEvents(ctx)
n.lock.Unlock()
return nil
@ -933,14 +948,114 @@ func (n *Core) ClusterProcessList() ([]Process, error) {
return processes, nil
}
func (n *Core) Events(ctx context.Context, filters api.LogEventFilters) (<-chan api.LogEvent, error) {
n.lock.RLock()
client := n.client
n.lock.RUnlock()
func (n *Core) logEvents(ctx context.Context) {
defer func() {
n.logger.Warn().WithField("source", "log").Log("Disconnected from event source")
}()
if client == nil {
return nil, ErrNoPeer
for {
select {
case <-ctx.Done():
return
default:
}
n.lock.RLock()
client := n.client
n.lock.RUnlock()
if client == nil {
n.logger.Error().WithField("source", "log").Log("Failed to connect to event source, client not connected")
time.Sleep(5 * time.Second)
continue
}
ch, err := client.LogEvents(ctx, api.LogEventFilters{})
if err != nil {
n.logger.Error().WithField("source", "log").WithError(err).Log("Failed to connect to event source")
time.Sleep(5 * time.Second)
continue
}
n.logger.Info().WithField("source", "log").Log("Connected to event source")
innerloop:
for {
select {
case <-ctx.Done():
return
case e, ok := <-ch:
if !ok {
break innerloop
}
e.CoreID = n.id
n.events.log.Publish(e.Marshal())
}
}
n.logger.Info().WithField("source", "process").Log("Reconnecting to event source")
time.Sleep(5 * time.Second)
}
return client.Events(ctx, filters)
}
func (n *Core) LogEventsSource() event.EventSource {
return n.events.log.EventSource()
}
func (n *Core) processEvents(ctx context.Context) {
defer func() {
n.logger.Warn().WithField("source", "process").Log("Disconnected from event source")
}()
for {
select {
case <-ctx.Done():
return
default:
}
n.lock.RLock()
client := n.client
n.lock.RUnlock()
if client == nil {
n.logger.Error().WithField("source", "process").Log("Failed to connect to event source, client not connected")
time.Sleep(5 * time.Second)
continue
}
ch, err := client.ProcessEvents(ctx, api.ProcessEventFilters{})
if err != nil {
n.logger.Error().WithField("source", "process").WithError(err).Log("Failed to connect to event source")
time.Sleep(5 * time.Second)
continue
}
n.logger.Info().WithField("source", "process").Log("Connected to event source")
innerloop:
for {
select {
case <-ctx.Done():
return
case e, ok := <-ch:
if !ok {
break innerloop
}
e.CoreID = n.id
n.events.process.Publish(e.Marshal())
}
}
n.logger.Info().WithField("source", "process").Log("Reconnecting to event source")
time.Sleep(5 * time.Second)
}
}
func (n *Core) ProcessEventsSource() event.EventSource {
return n.events.process.EventSource()
}
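The two loops above share one shape: check for cancellation, (re)connect, drain the stream until it closes, then back off and retry. A condensed sketch of that pattern follows; Client, Stream, and publish are placeholder names for this illustration, not types from this repository:

package sketch

import (
	"context"
	"time"
)

type Event struct{}

// Client stands in for the node's API client; Stream for LogEvents/ProcessEvents.
type Client interface {
	Stream(ctx context.Context) (<-chan Event, error)
}

// consume retries every 5 seconds until ctx is cancelled. A closed stream
// channel breaks the inner loop and falls through to the reconnect path.
func consume(ctx context.Context, c Client, publish func(Event)) {
	for {
		select {
		case <-ctx.Done():
			return
		default:
		}

		ch, err := c.Stream(ctx)
		if err != nil {
			time.Sleep(5 * time.Second)
			continue
		}

	inner:
		for {
			select {
			case <-ctx.Done():
				return
			case e, ok := <-ch:
				if !ok {
					break inner // remote closed the stream, reconnect
				}
				publish(e)
			}
		}

		time.Sleep(5 * time.Second)
	}
}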

View File

@ -1,7 +1,6 @@
package node
import (
"context"
"errors"
"fmt"
"io"
@ -11,6 +10,7 @@ import (
"sync"
"time"
"github.com/datarhei/core/v16/event"
"github.com/datarhei/core/v16/http/api"
"github.com/datarhei/core/v16/http/client"
"github.com/datarhei/core/v16/log"
@ -34,6 +34,11 @@ type Manager struct {
cache *Cache[string]
logger log.Logger
events struct {
log *event.PubSub
process *event.PubSub
}
}
var ErrNodeNotFound = errors.New("node not found")
@ -50,6 +55,9 @@ func NewManager(config ManagerConfig) (*Manager, error) {
p.logger = log.New("")
}
p.events.log = event.NewPubSub()
p.events.process = event.NewPubSub()
return p, nil
}
@ -66,6 +74,9 @@ func (p *Manager) NodeAdd(id string, node *Node) (string, error) {
p.nodes[id] = node
p.events.log.Consume(node.Core().LogEventsSource(), nil)
p.events.process.Consume(node.Core().ProcessEventsSource(), nil)
p.logger.Info().WithFields(log.Fields{
"address": about.Address,
"name": about.Name,
@ -655,24 +666,14 @@ func (p *Manager) ProcessValidateConfig(nodeid string, config *app.Config) error
return node.Core().ProcessValidateConfig(config)
}
func (p *Manager) LogEvents(ctx context.Context, filters api.LogEventFilters) (<-chan api.LogEvent, error) {
eventChan := make(chan api.LogEvent, 128)
func (p *Manager) LogEvents() (<-chan event.Event, event.CancelFunc, error) {
ch, cancel := p.events.log.Subscribe()
p.lock.RLock()
for _, n := range p.nodes {
go func(node *Node, e chan<- api.LogEvent) {
eventChan, err := node.Core().Events(ctx, filters)
if err != nil {
return
}
for event := range eventChan {
event.CoreID = node.id
e <- event
}
}(n, eventChan)
}
p.lock.RUnlock()
return eventChan, nil
return ch, cancel, nil
}
func (p *Manager) ProcessEvents() (<-chan event.Event, event.CancelFunc, error) {
ch, cancel := p.events.process.Subscribe()
return ch, cancel, nil
}
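With this change the Manager hands out a subscription on its shared PubSub instead of spawning per-node forwarding goroutines for every caller. A minimal consumer of the (<-chan event.Event, event.CancelFunc, error) contract, a sketch using only the PubSub API visible in this diff:

package main

import (
	"fmt"
	"time"

	"github.com/datarhei/core/v16/event"
)

func main() {
	ps := event.NewPubSub()
	defer ps.Close()

	ch, cancel := ps.Subscribe()
	defer cancel() // release the subscription, as the handlers do with defer cancel()

	ps.Publish(event.NewLogEvent(time.Now(), "info", "demo", "", "hello", nil))

	if e, ok := (<-ch).(*event.LogEvent); ok {
		fmt.Println(e.Message) // "hello"
	}
}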

View File

@ -18,6 +18,16 @@ type EventSource interface {
Events() (<-chan Event, CancelFunc, error)
}
type eventSource struct {
pubsub *PubSub
}
func (s *eventSource) Events() (<-chan Event, CancelFunc, error) {
ch, cancel := s.pubsub.Subscribe()
return ch, cancel, nil
}
type PubSub struct {
publisher chan Event
publisherClosed bool
@ -44,6 +54,10 @@ func NewPubSub() *PubSub {
return w
}
func (w *PubSub) EventSource() EventSource {
return &eventSource{pubsub: w}
}
func (w *PubSub) Consume(s EventSource, rewrite func(e Event) Event) {
ch, cancel, err := s.Events()
if err != nil {
@ -111,7 +125,7 @@ func (w *PubSub) Close() {
}
func (w *PubSub) Subscribe() (<-chan Event, CancelFunc) {
l := make(chan Event, 1024)
l := make(chan Event, 128)
var id string = ""
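EventSource exposes a PubSub read-only, and Consume is the other half: it lets one PubSub fan in another source, which is how the Manager aggregates per-node events (p.events.log.Consume(node.Core().LogEventsSource(), nil) above). A sketch of the pair, under the assumption that Consume forwards in the background as the NodeAdd call site suggests:

package main

import (
	"fmt"
	"time"

	"github.com/datarhei/core/v16/event"
)

func main() {
	upstream := event.NewPubSub()  // e.g. a node's Core events
	aggregate := event.NewPubSub() // e.g. the Manager's shared bus
	defer upstream.Close()
	defer aggregate.Close()

	// Subscribe the aggregate to the upstream via its read-only EventSource.
	// The second argument could rewrite events in flight; nil passes them through.
	aggregate.Consume(upstream.EventSource(), nil)

	ch, cancel := aggregate.Subscribe()
	defer cancel()

	upstream.Publish(event.NewLogEvent(time.Now(), "info", "demo", "", "fan-in", nil))

	if e, ok := (<-ch).(*event.LogEvent); ok {
		fmt.Println(e.Message) // "fan-in"
	}
}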

event/log.go Normal file
View File

@ -0,0 +1,42 @@
package event
import (
"maps"
"time"
)
type LogEvent struct {
Time time.Time
Level string
Component string
Caller string
Message string
CoreID string
Data map[string]any
}
func (e *LogEvent) Clone() Event {
evt := &LogEvent{
Time: e.Time,
Level: e.Level,
Component: e.Component,
Caller: e.Caller,
Message: e.Message,
CoreID: e.CoreID,
Data: maps.Clone(e.Data),
}
return evt
}
func NewLogEvent(ts time.Time, level, component, caller, message string, data map[string]any) *LogEvent {
return &LogEvent{
Time: ts,
Level: level,
Component: component,
Caller: caller,
Message: message,
Data: maps.Clone(data),
}
}
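Both the constructor and Clone copy Data through maps.Clone, so every subscriber gets an independent map. A quick illustration of the defensive copy (the key and values are made up):

package main

import (
	"fmt"
	"time"

	"github.com/datarhei/core/v16/event"
)

func main() {
	e := event.NewLogEvent(time.Now(), "info", "demo", "", "msg", map[string]any{"k": "v"})

	c := e.Clone().(*event.LogEvent)
	c.Data["k"] = "mutated" // only the clone changes, its map was copied

	fmt.Println(e.Data["k"]) // still "v"
}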

View File

@ -11,6 +11,7 @@ type ProcessEvent struct {
Line string
Progress *ProcessProgress
Timestamp time.Time
CoreID string
}
func (e *ProcessEvent) Clone() Event {
@ -20,6 +21,7 @@ func (e *ProcessEvent) Clone() Event {
Type: e.Type,
Line: e.Line,
Timestamp: e.Timestamp,
CoreID: e.CoreID,
}
if e.Progress != nil {

View File

@ -4,15 +4,15 @@ import (
"fmt"
"regexp"
"strings"
"time"
"github.com/datarhei/core/v16/encoding/json"
"github.com/datarhei/core/v16/event"
"github.com/datarhei/core/v16/log"
)
type LogEvent struct {
Timestamp int64 `json:"ts" format:"int64"`
Level int `json:"level"`
Level string `json:"level"`
Component string `json:"event"`
Message string `json:"message"`
Caller string `json:"caller"`
@ -21,16 +21,22 @@ type LogEvent struct {
Data map[string]string `json:"data"`
}
func (e *LogEvent) Unmarshal(le *log.Event) {
e.Timestamp = le.Time.Unix()
e.Level = int(le.Level)
e.Component = strings.ToLower(le.Component)
e.Message = le.Message
e.Caller = le.Caller
func (e *LogEvent) Unmarshal(le event.Event) bool {
evt, ok := le.(*event.LogEvent)
if !ok {
return false
}
e.Timestamp = evt.Time.Unix()
e.Level = evt.Level
e.Component = strings.ToLower(evt.Component)
e.Message = evt.Message
e.Caller = evt.Caller
e.CoreID = evt.CoreID
e.Data = make(map[string]string)
for k, v := range le.Data {
for k, v := range evt.Data {
var value string
switch val := v.(type) {
@ -52,6 +58,26 @@ func (e *LogEvent) Unmarshal(le *log.Event) {
e.Data[k] = value
}
return true
}
func (e *LogEvent) Marshal() event.Event {
evt := &event.LogEvent{
Time: time.Unix(e.Timestamp, 0), // Timestamp holds Unix seconds (see Unmarshal above)
Level: e.Level,
Component: e.Component,
Caller: e.Caller,
Message: e.Message,
CoreID: e.CoreID,
Data: map[string]any{},
}
for k, v := range e.Data {
evt.Data[k] = v
}
return evt
}
func (e *LogEvent) Filter(ef *LogEventFilter) bool {
@ -62,8 +88,7 @@ func (e *LogEvent) Filter(ef *LogEventFilter) bool {
}
if ef.reLevel != nil {
level := log.Level(e.Level).String()
if !ef.reLevel.MatchString(level) {
if !ef.reLevel.MatchString(e.Level) {
return false
}
}
@ -191,6 +216,7 @@ type ProcessEvent struct {
Type string `json:"type"`
Line string `json:"line,omitempty"`
Progress *ProcessProgress `json:"progress,omitempty"`
CoreID string `json:"core_id,omitempty"`
Timestamp int64 `json:"ts"`
}
@ -200,6 +226,22 @@ type ProcessProgressInput struct {
AVstream ProcessProgressInputAVstream `json:"avstream"`
}
func (p *ProcessProgressInput) Marshal() event.ProcessProgressInput {
o := event.ProcessProgressInput{}
if x, err := p.Bitrate.Float64(); err == nil {
o.Bitrate = x
}
if x, err := p.FPS.Float64(); err == nil {
o.FPS = x
}
o.AVstream = p.AVstream.Marshal()
return o
}
type ProcessProgressInputAVstream struct {
Looping bool `json:"looping"`
Enc uint64 `json:"enc"`
@ -208,11 +250,37 @@ type ProcessProgressInputAVstream struct {
Time uint64 `json:"time"`
}
func (p *ProcessProgressInputAVstream) Marshal() event.ProcessProgressInputAVstream {
o := event.ProcessProgressInputAVstream{
Looping: p.Looping,
Enc: p.Enc,
Drop: p.Drop,
Dup: p.Dup,
Time: p.Time,
}
return o
}
type ProcessProgressOutput struct {
Bitrate json.Number `json:"bitrate" swaggertype:"number" jsonschema:"type=number"`
FPS json.Number `json:"fps" swaggertype:"number" jsonschema:"type=number"`
}
func (p *ProcessProgressOutput) Marshal() event.ProcessProgressOutput {
o := event.ProcessProgressOutput{}
if x, err := p.Bitrate.Float64(); err == nil {
o.Bitrate = x
}
if x, err := p.FPS.Float64(); err == nil {
o.FPS = x
}
return o
}
type ProcessProgress struct {
Input []ProcessProgressInput `json:"input"`
Output []ProcessProgressOutput `json:"output"`
@ -244,6 +312,24 @@ func (p *ProcessProgress) Unmarshal(e *event.ProcessProgress) {
p.Time = json.ToNumber(e.Time)
}
func (p *ProcessProgress) Marshal() *event.ProcessProgress {
e := &event.ProcessProgress{}
if x, err := p.Time.Float64(); err == nil {
e.Time = x
}
for _, input := range p.Input {
e.Input = append(e.Input, input.Marshal())
}
for _, output := range p.Output {
e.Output = append(e.Output, output.Marshal())
}
return e
}
func (p *ProcessEvent) Unmarshal(e event.Event) bool {
evt, ok := e.(*event.ProcessEvent)
if !ok {
@ -261,10 +347,29 @@ func (p *ProcessEvent) Unmarshal(e event.Event) bool {
p.Progress.Unmarshal(evt.Progress)
}
p.Timestamp = evt.Timestamp.UnixMilli()
p.CoreID = evt.CoreID
return true
}
func (p *ProcessEvent) Marshal() event.Event {
evt := &event.ProcessEvent{
ProcessID: p.ProcessID,
Domain: p.Domain,
Type: p.Type,
Line: p.Line,
Progress: nil,
Timestamp: time.UnixMilli(p.Timestamp),
CoreID: p.CoreID,
}
if p.Progress != nil {
evt.Progress = p.Progress.Marshal()
}
return evt
}
func (e *ProcessEvent) Filter(ef *ProcessEventFilter) bool {
if ef.reProcessID != nil {
if !ef.reProcessID.MatchString(e.ProcessID) {
@ -284,6 +389,12 @@ func (e *ProcessEvent) Filter(ef *ProcessEventFilter) bool {
}
}
if ef.reCoreID != nil {
if !ef.reCoreID.MatchString(e.CoreID) {
return false
}
}
return true
}
@ -291,10 +402,12 @@ type ProcessEventFilter struct {
ProcessID string `json:"pid"`
Domain string `json:"domain"`
Type string `json:"type"`
CoreID string `json:"core_id"`
reProcessID *regexp.Regexp
reDomain *regexp.Regexp
reType *regexp.Regexp
reCoreID *regexp.Regexp
}
type ProcessEventFilters struct {
@ -329,5 +442,14 @@ func (ef *ProcessEventFilter) Compile() error {
ef.reType = r
}
if len(ef.CoreID) != 0 {
r, err := regexp.Compile("(?i)" + ef.CoreID)
if err != nil {
return err
}
ef.reCoreID = r
}
return nil
}
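The new CoreID filter behaves like the existing ones: compiled with a "(?i)" prefix and matched as a regular expression. A usage sketch against the new field (the values are made up):

package main

import (
	"fmt"

	"github.com/datarhei/core/v16/http/api"
)

func main() {
	f := api.ProcessEventFilter{CoreID: "node-1"}
	if err := f.Compile(); err != nil {
		panic(err) // one of the filter fields is not a valid regexp
	}

	e := api.ProcessEvent{CoreID: "NODE-1", Type: "state"}
	fmt.Println(e.Filter(&f)) // true: "(?i)" makes the match case-insensitive
}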

View File

@ -9,7 +9,7 @@ import (
func TestEventFilter(t *testing.T) {
event := LogEvent{
Timestamp: 1234,
Level: 3,
Level: "info",
Component: "foobar",
Message: "none",
Data: map[string]string{
@ -83,7 +83,7 @@ func TestEventFilter(t *testing.T) {
func TestEventFilterDataKey(t *testing.T) {
event := LogEvent{
Timestamp: 1234,
Level: 3,
Level: "info",
Component: "foobar",
Message: "none",
Data: map[string]string{
@ -137,7 +137,7 @@ func TestEventFilterDataKey(t *testing.T) {
func BenchmarkEventFilters(b *testing.B) {
event := LogEvent{
Timestamp: 1234,
Level: 3,
Level: "info",
Component: "foobar",
Message: "none",
Data: map[string]string{
@ -159,7 +159,7 @@ func BenchmarkEventFilters(b *testing.B) {
res := event.Filter(&levelfilter)
require.True(b, res)
for i := 0; i < b.N; i++ {
for b.Loop() {
event.Filter(&levelfilter)
}
}

View File

@ -57,8 +57,9 @@ type RestClient interface {
FilesystemDeleteFile(storage, path string) error // DELETE /v3/fs/{storage}/{path}
FilesystemAddFile(storage, path string, data io.Reader) error // PUT /v3/fs/{storage}/{path}
Events(ctx context.Context, filters api.LogEventFilters) (<-chan api.LogEvent, error) // POST /v3/events
MediaEvents(ctx context.Context, storage, pattern string) (<-chan api.MediaEvent, error) // GET /v3/events/media/{storage}
LogEvents(ctx context.Context, filters api.LogEventFilters) (<-chan api.LogEvent, error) // POST /v3/events
MediaEvents(ctx context.Context, storage, pattern string) (<-chan api.MediaEvent, error) // GET /v3/events/media/{storage}
ProcessEvents(ctx context.Context, filters api.ProcessEventFilters) (<-chan api.ProcessEvent, error) // POST /v3/events/process
ProcessList(opts ProcessListOptions) ([]api.Process, error) // GET /v3/process
ProcessAdd(p *app.Config, metadata map[string]any) error // POST /v3/process

View File

@ -11,7 +11,7 @@ import (
"github.com/datarhei/core/v16/mem"
)
func (r *restclient) Events(ctx context.Context, filters api.LogEventFilters) (<-chan api.LogEvent, error) {
func (r *restclient) LogEvents(ctx context.Context, filters api.LogEventFilters) (<-chan api.LogEvent, error) {
buf := mem.Get()
defer mem.Put(buf)
@ -104,3 +104,51 @@ func (r *restclient) MediaEvents(ctx context.Context, storage, pattern string) (
return channel, nil
}
func (r *restclient) ProcessEvents(ctx context.Context, filters api.ProcessEventFilters) (<-chan api.ProcessEvent, error) {
buf := mem.Get()
defer mem.Put(buf)
e := json.NewEncoder(buf)
e.Encode(filters)
header := make(http.Header)
header.Set("Accept", "application/x-json-stream")
stream, err := r.stream(ctx, "POST", "/v3/events/process", nil, header, "application/json", buf.Reader())
if err != nil {
return nil, err
}
channel := make(chan api.ProcessEvent, 128)
go func(stream io.ReadCloser, ch chan<- api.ProcessEvent) {
defer stream.Close()
defer close(channel)
decoder := json.NewDecoder(stream)
for decoder.More() {
var event api.ProcessEvent
if err := decoder.Decode(&event); err == io.EOF {
return
} else if err != nil {
event.Type = "error"
event.Line = err.Error()
}
// Don't emit keepalives
if event.Type == "keepalive" {
continue
}
ch <- event
if event.Type == "" || event.Type == "error" {
return
}
}
}(stream, channel)
return channel, nil
}
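On the consuming side, the new client method streams until the server closes the connection or an error is injected into the channel. A sketch that drains it, assuming an already-constructed client.RestClient:

package sketch

import (
	"context"
	"fmt"

	"github.com/datarhei/core/v16/http/api"
	"github.com/datarhei/core/v16/http/client"
)

// drain reads process events until the channel is closed by the client
// (EOF, a decode error, or an empty/"error" event type), as implemented above.
func drain(ctx context.Context, c client.RestClient) error {
	ch, err := c.ProcessEvents(ctx, api.ProcessEventFilters{})
	if err != nil {
		return err
	}

	for e := range ch {
		fmt.Printf("%s %s %s\n", e.CoreID, e.ProcessID, e.Type)
	}

	return nil
}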

View File

@ -1,8 +1,9 @@
package api
import (
"context"
"fmt"
"net/http"
goslices "slices"
"strings"
"time"
@ -13,10 +14,10 @@ import (
"github.com/labstack/echo/v4"
)
// Events returns a stream of log event
// LogEvents returns a stream of log events
// @Summary Stream of log events
// @Description Stream of log events of what's happening on each node in the cluster
// @ID cluster-3-events
// @ID cluster-3-events-log
// @Tags v16.?.?
// @Accept json
// @Produce text/event-stream
@ -25,7 +26,7 @@ import (
// @Success 200 {object} api.LogEvent
// @Security ApiKeyAuth
// @Router /api/v3/cluster/events [post]
func (h *ClusterHandler) Events(c echo.Context) error {
func (h *ClusterHandler) LogEvents(c echo.Context) error {
filters := api.LogEventFilters{}
if err := util.ShouldBindJSON(c, &filters); err != nil {
@ -64,19 +65,15 @@ func (h *ClusterHandler) Events(c echo.Context) error {
res.Header().Set(echo.HeaderConnection, "close")
res.WriteHeader(http.StatusOK)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
evts, err := h.proxy.LogEvents(ctx, filters)
evts, cancel, err := h.proxy.LogEvents()
if err != nil {
return api.Err(http.StatusInternalServerError, "", "%s", err.Error())
}
defer cancel()
enc := json.NewEncoder(res)
enc.SetIndent("", "")
done := make(chan error, 1)
filterEvent := func(event *api.LogEvent) bool {
if len(filter) == 0 {
return true
@ -90,27 +87,33 @@ func (h *ClusterHandler) Events(c echo.Context) error {
return event.Filter(f)
}
event := api.LogEvent{}
if contentType == "text/event-stream" {
res.Write([]byte(":keepalive\n\n"))
res.Flush()
for {
select {
case err := <-done:
return err
case <-reqctx.Done():
done <- nil
return nil
case <-ticker.C:
res.Write([]byte(":keepalive\n\n"))
res.Flush()
case event := <-evts:
case e, ok := <-evts:
if !ok {
return fmt.Errorf("channel closed")
}
event.Unmarshal(e)
if !filterEvent(&event) {
continue
}
res.Write([]byte("event: " + event.Component + "\ndata: "))
if err := enc.Encode(event); err != nil {
done <- err
return err
}
res.Write([]byte("\n"))
res.Flush()
@ -122,23 +125,122 @@ func (h *ClusterHandler) Events(c echo.Context) error {
for {
select {
case err := <-done:
return err
case <-reqctx.Done():
done <- nil
return nil
case <-ticker.C:
res.Write([]byte("{\"event\": \"keepalive\"}\n"))
res.Flush()
case event := <-evts:
case e, ok := <-evts:
if !ok {
return fmt.Errorf("channel closed")
}
event.Unmarshal(e)
if !filterEvent(&event) {
continue
}
if err := enc.Encode(event); err != nil {
done <- err
return err
}
res.Flush()
}
}
}
}
// ProcessEvents returns a stream of process events
// @Summary Stream of process events
// @Description Stream of process events of what's happening on each node in the cluster
// @ID cluster-3-events-process
// @Tags v16.?.?
// @Accept json
// @Produce json-stream
// @Param filters body api.ProcessEventFilters false "Event filters"
// @Success 200 {object} api.ProcessEvent
// @Security ApiKeyAuth
// @Router /api/v3/cluster/events/process [post]
func (h *ClusterHandler) ProcessEvents(c echo.Context) error {
filters := api.ProcessEventFilters{}
if err := util.ShouldBindJSON(c, &filters); err != nil {
return api.Err(http.StatusBadRequest, "", "invalid JSON: %s", err.Error())
}
filter := []*api.ProcessEventFilter{}
for _, f := range filters.Filters {
f := f
if err := f.Compile(); err != nil {
return api.Err(http.StatusBadRequest, "", "invalid filter: %s", err.Error())
}
filter = append(filter, &f)
}
keepaliveTicker := time.NewTicker(5 * time.Second)
defer keepaliveTicker.Stop()
req := c.Request()
reqctx := req.Context()
contentType := "application/x-json-stream"
evts, cancel, err := h.proxy.ProcessEvents()
if err != nil {
return api.Err(http.StatusNotImplemented, "", "events are not implemented for this server")
}
defer cancel()
res := c.Response()
res.Header().Set(echo.HeaderContentType, contentType+"; charset=UTF-8")
res.Header().Set(echo.HeaderCacheControl, "no-store")
res.Header().Set(echo.HeaderConnection, "close")
res.WriteHeader(http.StatusOK)
enc := json.NewEncoder(res)
enc.SetIndent("", "")
filterEvent := func(event *api.ProcessEvent) bool {
if len(filter) == 0 {
return true
}
return goslices.ContainsFunc(filter, event.Filter)
}
event := api.ProcessEvent{}
for {
select {
case <-reqctx.Done():
return nil
case <-keepaliveTicker.C:
_, err := res.Write([]byte("{\"type\":\"keepalive\"}\n"))
if err != nil {
return err
}
res.Flush()
case e, ok := <-evts:
if !ok {
return fmt.Errorf("channel closed")
}
if !event.Unmarshal(e) {
continue
}
if !filterEvent(&event) {
continue
}
if err := enc.Encode(event); err != nil {
return err
}
res.Flush()
}
}
}

View File

@ -1,6 +1,7 @@
package api
import (
"fmt"
"net/http"
goslices "slices"
"strings"
@ -12,7 +13,6 @@ import (
"github.com/datarhei/core/v16/glob"
"github.com/datarhei/core/v16/http/api"
"github.com/datarhei/core/v16/http/handler/util"
"github.com/datarhei/core/v16/log"
"github.com/datarhei/core/v16/slices"
"github.com/labstack/echo/v4"
@ -20,16 +20,15 @@ import (
// The EventsHandler type provides handler functions for retrieving events.
type EventsHandler struct {
logs log.ChannelWriter
logs event.EventSource
media map[string]event.MediaSource
process event.EventSource
lock sync.Mutex
}
// NewEvents returns a new EventsHandler type
func NewEvents(logs log.ChannelWriter) *EventsHandler {
func NewEvents() *EventsHandler {
return &EventsHandler{
logs: logs,
media: map[string]event.MediaSource{},
}
}
@ -56,6 +55,17 @@ func (h *EventsHandler) SetProcessSource(source event.EventSource) {
h.process = source
}
func (h *EventsHandler) SetLogSource(source event.EventSource) {
if source == nil {
return
}
h.lock.Lock()
defer h.lock.Unlock()
h.logs = source
}
// LogEvents returns a stream of events
// @Summary Stream of log events
// @Description Stream of log events of what's happening in the core
@ -107,14 +117,15 @@ func (h *EventsHandler) LogEvents(c echo.Context) error {
res.Header().Set(echo.HeaderConnection, "close")
res.WriteHeader(http.StatusOK)
evts, cancel := h.logs.Subscribe()
evts, cancel, err := h.logs.Events()
if err != nil {
return api.Err(http.StatusNotImplemented, "", "events are not implemented for this server")
}
defer cancel()
enc := json.NewEncoder(res)
enc.SetIndent("", "")
done := make(chan error, 1)
filterEvent := func(event *api.LogEvent) bool {
if len(filter) == 0 {
return true
@ -136,15 +147,17 @@ func (h *EventsHandler) LogEvents(c echo.Context) error {
for {
select {
case err := <-done:
return err
case <-reqctx.Done():
done <- nil
return nil
case <-ticker.C:
res.Write([]byte(":keepalive\n\n"))
res.Flush()
case e := <-evts:
event.Unmarshal(&e)
case e, ok := <-evts:
if !ok {
return fmt.Errorf("channel closed")
}
event.Unmarshal(e)
if !filterEvent(&event) {
continue
@ -152,7 +165,7 @@ func (h *EventsHandler) LogEvents(c echo.Context) error {
res.Write([]byte("event: " + event.Component + "\ndata: "))
if err := enc.Encode(event); err != nil {
done <- err
return err
}
res.Write([]byte("\n"))
res.Flush()
@ -164,22 +177,24 @@ func (h *EventsHandler) LogEvents(c echo.Context) error {
for {
select {
case err := <-done:
return err
case <-reqctx.Done():
done <- nil
return nil
case <-ticker.C:
res.Write([]byte("{\"event\": \"keepalive\"}\n"))
res.Flush()
case e := <-evts:
event.Unmarshal(&e)
case e, ok := <-evts:
if !ok {
return fmt.Errorf("channel closed")
}
event.Unmarshal(e)
if !filterEvent(&event) {
continue
}
if err := enc.Encode(event); err != nil {
done <- err
return err
}
res.Flush()
}
@ -248,8 +263,6 @@ func (h *EventsHandler) MediaEvents(c echo.Context) error {
enc := json.NewEncoder(res)
enc.SetIndent("", "")
done := make(chan error, 1)
createList := func() api.MediaEvent {
list := mediaSource.MediaList()
@ -277,7 +290,7 @@ func (h *EventsHandler) MediaEvents(c echo.Context) error {
}
if err := enc.Encode(createList()); err != nil {
done <- err
return err
}
res.Flush()
@ -285,19 +298,21 @@ func (h *EventsHandler) MediaEvents(c echo.Context) error {
for {
select {
case err := <-done:
return err
case <-reqctx.Done():
done <- nil
return nil
case <-keepaliveTicker.C:
res.Write([]byte("{\"action\":\"keepalive\"}\n"))
res.Flush()
case <-listTicker.C:
if err := enc.Encode(createList()); err != nil {
done <- err
return err
}
res.Flush()
case e := <-evts:
case e, ok := <-evts:
if !ok {
return fmt.Errorf("channel closed")
}
if !event.Unmarshal(e) {
continue
}
@ -309,7 +324,7 @@ func (h *EventsHandler) MediaEvents(c echo.Context) error {
}
if err := enc.Encode(event); err != nil {
done <- err
return err
}
res.Flush()
}
@ -370,8 +385,6 @@ func (h *EventsHandler) ProcessEvents(c echo.Context) error {
enc := json.NewEncoder(res)
enc.SetIndent("", "")
done := make(chan error, 1)
filterEvent := func(event *api.ProcessEvent) bool {
if len(filter) == 0 {
return true
@ -384,14 +397,19 @@ func (h *EventsHandler) ProcessEvents(c echo.Context) error {
for {
select {
case err := <-done:
return err
case <-reqctx.Done():
done <- nil
return nil
case <-keepaliveTicker.C:
res.Write([]byte("{\"type\":\"keepalive\"}\n"))
_, err := res.Write([]byte("{\"type\":\"keepalive\"}\n"))
if err != nil {
return err
}
res.Flush()
case e := <-evts:
case e, ok := <-evts:
if !ok {
return fmt.Errorf("channel closed")
}
if !event.Unmarshal(e) {
continue
}
@ -401,7 +419,7 @@ func (h *EventsHandler) ProcessEvents(c echo.Context) error {
}
if err := enc.Encode(event); err != nil {
done <- err
return err
}
res.Flush()
}

View File

@ -38,6 +38,7 @@ import (
"github.com/datarhei/core/v16/cluster"
cfgstore "github.com/datarhei/core/v16/config/store"
"github.com/datarhei/core/v16/event"
"github.com/datarhei/core/v16/http/cache"
"github.com/datarhei/core/v16/http/errorhandler"
"github.com/datarhei/core/v16/http/fs"
@ -84,7 +85,7 @@ var ListenAndServe = http.ListenAndServe
type Config struct {
Logger log.Logger
LogBuffer log.BufferWriter
LogEvents log.ChannelWriter
LogEvents event.EventSource
Restream restream.Restreamer
Metrics monitor.HistoryReader
Prometheus prometheus.Reader
@ -279,10 +280,9 @@ func NewServer(config Config) (serverhandler.Server, error) {
config.LogBuffer,
)
s.v3handler.events = api.NewEvents(
config.LogEvents,
)
s.v3handler.events = api.NewEvents()
s.v3handler.events.SetLogSource(config.LogEvents)
for name, fs := range s.filesystems {
s.v3handler.events.AddMediaSource(name, fs.Filesystem)
}
@ -773,7 +773,9 @@ func (s *server) setRoutesV3(v3 *echo.Group) {
v3.GET("/cluster/fs/:storage", s.v3handler.cluster.FilesystemListFiles)
v3.POST("/cluster/events", s.v3handler.cluster.Events)
v3.POST("/cluster/events", s.v3handler.cluster.LogEvents)
v3.POST("/cluster/events/log", s.v3handler.cluster.LogEvents)
v3.POST("/cluster/events/process", s.v3handler.cluster.ProcessEvents)
if !s.readOnly {
v3.PUT("/cluster/transfer/:id", s.v3handler.cluster.TransferLeadership)

View File

@ -3,6 +3,7 @@ package log
import (
"fmt"
"maps"
"reflect"
"runtime"
"runtime/debug"
@ -285,11 +286,6 @@ func (e *Event) Log(format string, args ...interface{}) {
}
func (e *Event) clone() *Event {
data := make(Fields, len(e.Data))
for k, v := range e.Data {
data[k] = v
}
return &Event{
Time: e.Time,
Caller: e.Caller,
@ -298,7 +294,7 @@ func (e *Event) clone() *Event {
Component: e.Component,
Message: e.Message,
err: e.err,
Data: data,
Data: maps.Clone(e.Data),
}
}

View File

@ -2,15 +2,13 @@ package log
import (
"container/ring"
"context"
"fmt"
"io"
"os"
"regexp"
"strings"
"sync"
"github.com/lithammer/shortuuid/v4"
"github.com/datarhei/core/v16/event"
"github.com/mattn/go-isatty"
)
@ -318,114 +316,28 @@ func (w *bufferWriter) Events() []*Event {
return lines
}
type ChannelWriter interface {
Writer
Subscribe() (<-chan Event, func())
type ChannelWriter struct {
pubsub *event.PubSub
}
type channelWriter struct {
publisher chan Event
publisherClosed bool
publisherLock sync.Mutex
ctx context.Context
cancel context.CancelFunc
subscriber map[string]chan Event
subscriberLock sync.Mutex
}
func NewChannelWriter() ChannelWriter {
w := &channelWriter{
publisher: make(chan Event, 1024),
publisherClosed: false,
subscriber: make(map[string]chan Event),
func NewChannelWriter() *ChannelWriter {
w := &ChannelWriter{
pubsub: event.NewPubSub(),
}
w.ctx, w.cancel = context.WithCancel(context.Background())
go w.broadcast()
return w
}
func (w *channelWriter) Write(e *Event) error {
event := e.clone()
event.logger = nil
w.publisherLock.Lock()
defer w.publisherLock.Unlock()
if w.publisherClosed {
return fmt.Errorf("writer is closed")
}
select {
case w.publisher <- *e:
default:
return fmt.Errorf("publisher queue full")
}
return nil
func (w *ChannelWriter) Write(e *Event) error {
return w.pubsub.Publish(event.NewLogEvent(e.Time, e.Level.String(), e.Component, e.Caller, e.Message, e.Data))
}
func (w *channelWriter) Close() {
w.cancel()
w.publisherLock.Lock()
close(w.publisher)
w.publisherClosed = true
w.publisherLock.Unlock()
w.subscriberLock.Lock()
for _, c := range w.subscriber {
close(c)
}
w.subscriber = make(map[string]chan Event)
w.subscriberLock.Unlock()
func (w *ChannelWriter) Close() {
w.pubsub.Close()
}
func (w *channelWriter) Subscribe() (<-chan Event, func()) {
l := make(chan Event, 1024)
func (w *ChannelWriter) Events() (<-chan event.Event, event.CancelFunc, error) {
ch, cancel := w.pubsub.Subscribe()
var id string = ""
w.subscriberLock.Lock()
for {
id = shortuuid.New()
if _, ok := w.subscriber[id]; !ok {
w.subscriber[id] = l
break
}
}
w.subscriberLock.Unlock()
unsubscribe := func() {
w.subscriberLock.Lock()
delete(w.subscriber, id)
w.subscriberLock.Unlock()
}
return l, unsubscribe
}
func (w *channelWriter) broadcast() {
for {
select {
case <-w.ctx.Done():
return
case e := <-w.publisher:
w.subscriberLock.Lock()
for _, c := range w.subscriber {
pp := e.clone()
select {
case c <- *pp:
default:
}
}
w.subscriberLock.Unlock()
}
}
return ch, cancel, nil
}
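After this rewrite, ChannelWriter is a thin log.Writer front end over event.PubSub, and its Events method satisfies event.EventSource, which is what allows it to be handed to the HTTP server as the Config.LogEvents field changed earlier in this commit. A wiring sketch:

package sketch

import (
	"github.com/datarhei/core/v16/event"
	"github.com/datarhei/core/v16/log"
)

// Compile-time check: *log.ChannelWriter now satisfies event.EventSource,
// so it can be passed straight into the server's LogEvents field.
var _ event.EventSource = (*log.ChannelWriter)(nil)

// tail consumes the structured log events published by the writer.
func tail(w *log.ChannelWriter) {
	ch, cancel, _ := w.Events() // err is always nil in this implementation
	defer cancel()

	for e := range ch {
		if le, ok := e.(*event.LogEvent); ok {
			_ = le.Message // each Write above is published as an *event.LogEvent
		}
	}
}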

View File

@ -226,6 +226,7 @@ func (r *restream) Stop() {
go func(t *task) {
defer wg.Done()
t.Kill()
t.parser.Destroy()
}(t)
return true