Add /v3/cluster/events endpoint to gather events from all nodes

This commit is contained in:
Ingo Oppermann 2024-08-22 13:40:38 +02:00
parent 9947ba822b
commit bebef61e55
No known key found for this signature in database
GPG Key ID: 2AB32426E9DD229E
6 changed files with 228 additions and 37 deletions

View File

@ -164,8 +164,9 @@ func (n *Core) connect() error {
Address: u.String(),
Client: &http.Client{
Transport: tr,
Timeout: 5 * time.Second,
Timeout: 0,
},
Timeout: 5 * time.Second,
})
if err != nil {
return fmt.Errorf("creating client failed (%s): %w", address, err)
@ -267,7 +268,6 @@ type CoreVersion struct {
}
func (n *Core) About() (CoreAbout, error) {
n.lock.RLock()
client := n.client
n.lock.RUnlock()
@ -808,3 +808,15 @@ func (n *Core) ClusterProcessList() ([]Process, error) {
return processes, nil
}
func (n *Core) Events(ctx context.Context, filters api.EventFilters) (<-chan api.Event, error) {
n.lock.RLock()
client := n.client
n.lock.RUnlock()
if client == nil {
return nil, ErrNoPeer
}
return client.Events(ctx, filters)
}

View File

@ -1,6 +1,7 @@
package node
import (
"context"
"errors"
"fmt"
"io"
@ -625,3 +626,25 @@ func (p *Manager) ProcessProbeConfig(nodeid string, config *app.Config) (api.Pro
return node.Core().ProcessProbeConfig(config)
}
func (p *Manager) Events(ctx context.Context, filters api.EventFilters) (<-chan api.Event, error) {
eventChan := make(chan api.Event, 128)
p.lock.RLock()
for _, n := range p.nodes {
go func(node *Node, e chan<- api.Event) {
eventChan, err := node.Core().Events(ctx, filters)
if err != nil {
return
}
for event := range eventChan {
event.CoreID = node.id
e <- event
}
}(n, eventChan)
}
p.lock.RUnlock()
return eventChan, nil
}

View File

@ -15,6 +15,7 @@ type Event struct {
Component string `json:"event"`
Message string `json:"message"`
Caller string `json:"caller"`
CoreID string `json:"core_id,omitempty"`
Data map[string]string `json:"data"`
}
@ -66,12 +67,18 @@ func (e *Event) Filter(ef *EventFilter) bool {
}
}
if ef.reCaller != nil {
if len(e.Caller) != 0 && ef.reCaller != nil {
if !ef.reCaller.MatchString(e.Caller) {
return false
}
}
if len(e.CoreID) != 0 && ef.reCoreID != nil {
if !ef.reCoreID.MatchString(e.CoreID) {
return false
}
}
for k, r := range ef.reData {
v, ok := e.Data[k]
if !ok {
@ -91,11 +98,13 @@ type EventFilter struct {
Message string `json:"message"`
Level string `json:"level"`
Caller string `json:"caller"`
CoreID string `json:"core_id"`
Data map[string]string `json:"data"`
reMessage *regexp.Regexp
reLevel *regexp.Regexp
reCaller *regexp.Regexp
reCoreID *regexp.Regexp
reData map[string]*regexp.Regexp
}
@ -131,6 +140,15 @@ func (ef *EventFilter) Compile() error {
ef.reCaller = r
}
if len(ef.CoreID) != 0 {
r, err := regexp.Compile("(?i)" + ef.CoreID)
if err != nil {
return err
}
ef.reCoreID = r
}
ef.reData = make(map[string]*regexp.Regexp)
for k, v := range ef.Data {

View File

@ -167,8 +167,13 @@ type Config struct {
// Auth0Token is a valid Auth0 token to authorize access to the API.
Auth0Token string
// Client is a HTTPClient that will be used for the API calls. Optional.
// Client is a HTTPClient that will be used for the API calls. Optional. Don't
// set a timeout in the client if you want to use the timeout in this config.
Client HTTPClient
// Timeout is the timeout for the whole connection. Don't set a timeout in
// the optional HTTPClient as it will override this timeout.
Timeout time.Duration
}
type apiconstraint struct {
@ -178,16 +183,17 @@ type apiconstraint struct {
// restclient implements the RestClient interface.
type restclient struct {
address string
prefix string
accessToken Token
refreshToken Token
username string
password string
auth0Token string
client HTTPClient
about api.About
aboutLock sync.RWMutex
address string
prefix string
accessToken Token
refreshToken Token
username string
password string
auth0Token string
client HTTPClient
clientTimeout time.Duration
about api.About
aboutLock sync.RWMutex
version struct {
connectedCore *semver.Version
@ -199,12 +205,13 @@ type restclient struct {
// in case of an error.
func New(config Config) (RestClient, error) {
r := &restclient{
address: config.Address,
prefix: "/api",
username: config.Username,
password: config.Password,
auth0Token: config.Auth0Token,
client: config.Client,
address: config.Address,
prefix: "/api",
username: config.Username,
password: config.Password,
auth0Token: config.Auth0Token,
client: config.Client,
clientTimeout: config.Timeout,
}
if len(config.AccessToken) != 0 {
@ -806,26 +813,11 @@ func (r *restclient) info() (api.About, error) {
}
func (r *restclient) request(req *http.Request) (int, io.ReadCloser, error) {
/*
fmt.Printf("%s %s\n", req.Method, req.URL)
for key, value := range req.Header {
for _, v := range value {
fmt.Printf("%s: %s\n", key, v)
}
}
fmt.Printf("\n")
*/
resp, err := r.client.Do(req)
if err != nil {
return -1, nil, err
}
/*
for key, value := range resp.Header {
for _, v := range value {
fmt.Printf("%s: %s\n", key, v)
}
}
*/
reader := resp.Body
contentEncoding := resp.Header.Get("Content-Encoding")
@ -923,7 +915,7 @@ func (r *restclient) stream(ctx context.Context, method, path string, query *url
}
func (r *restclient) call(method, path string, query *url.Values, header http.Header, contentType string, data io.Reader) ([]byte, error) {
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
ctx, cancel := context.WithTimeout(context.Background(), r.clientTimeout)
defer cancel()
body, err := r.stream(ctx, method, path, query, header, contentType, data)

View File

@ -0,0 +1,144 @@
package api
import (
"context"
"net/http"
"strings"
"time"
"github.com/datarhei/core/v16/encoding/json"
"github.com/datarhei/core/v16/http/api"
"github.com/datarhei/core/v16/http/handler/util"
"github.com/labstack/echo/v4"
)
// Events returns a stream of event
// @Summary Stream of events
// @Description Stream of events of whats happening on each node in the cluster
// @ID cluster-3-events
// @Tags v16.?.?
// @Accept json
// @Produce text/event-stream
// @Produce json-stream
// @Param filters body api.EventFilters false "Event filters"
// @Success 200 {object} api.Event
// @Security ApiKeyAuth
// @Router /api/v3/cluster/events [post]
func (h *ClusterHandler) Events(c echo.Context) error {
filters := api.EventFilters{}
if err := util.ShouldBindJSON(c, &filters); err != nil {
return api.Err(http.StatusBadRequest, "", "invalid JSON: %s", err.Error())
}
filter := map[string]*api.EventFilter{}
for _, f := range filters.Filters {
f := f
if err := f.Compile(); err != nil {
return api.Err(http.StatusBadRequest, "", "invalid filter: %s: %s", f.Component, err.Error())
}
component := strings.ToLower(f.Component)
filter[component] = &f
}
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
req := c.Request()
reqctx := req.Context()
contentType := "text/event-stream"
accept := req.Header.Get(echo.HeaderAccept)
if strings.Contains(accept, "application/x-json-stream") {
contentType = "application/x-json-stream"
}
res := c.Response()
res.Header().Set(echo.HeaderContentType, contentType+"; charset=UTF-8")
res.Header().Set(echo.HeaderCacheControl, "no-store")
res.Header().Set(echo.HeaderConnection, "close")
res.WriteHeader(http.StatusOK)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
evts, err := h.proxy.Events(ctx, filters)
if err != nil {
return api.Err(http.StatusInternalServerError, "", "%s", err.Error())
}
enc := json.NewEncoder(res)
enc.SetIndent("", "")
done := make(chan error, 1)
filterEvent := func(event *api.Event) bool {
if len(filter) == 0 {
return true
}
f, ok := filter[event.Component]
if !ok {
return false
}
return event.Filter(f)
}
if contentType == "text/event-stream" {
res.Write([]byte(":keepalive\n\n"))
res.Flush()
for {
select {
case err := <-done:
return err
case <-reqctx.Done():
done <- nil
case <-ticker.C:
res.Write([]byte(":keepalive\n\n"))
res.Flush()
case event := <-evts:
if !filterEvent(&event) {
continue
}
res.Write([]byte("event: " + event.Component + "\ndata: "))
if err := enc.Encode(event); err != nil {
done <- err
}
res.Write([]byte("\n"))
res.Flush()
}
}
} else {
res.Write([]byte("{\"event\": \"keepalive\"}\n"))
res.Flush()
for {
select {
case err := <-done:
return err
case <-reqctx.Done():
done <- nil
case <-ticker.C:
res.Write([]byte("{\"event\": \"keepalive\"}\n"))
res.Flush()
case event := <-evts:
if !filterEvent(&event) {
continue
}
if err := enc.Encode(event); err != nil {
done <- err
}
res.Flush()
}
}
}
}

View File

@ -762,6 +762,8 @@ func (s *server) setRoutesV3(v3 *echo.Group) {
v3.GET("/cluster/fs/:storage", s.v3handler.cluster.FilesystemListFiles)
v3.POST("/cluster/events", s.v3handler.cluster.Events)
if !s.readOnly {
v3.PUT("/cluster/transfer/:id", s.v3handler.cluster.TransferLeadership)
v3.PUT("/cluster/leave", s.v3handler.cluster.Leave)