Upgrade core client

This commit is contained in:
Ingo Oppermann 2024-04-15 14:59:31 +02:00
parent 18bf51d334
commit 3a31ce6f0e
No known key found for this signature in database
GPG Key ID: 2AB32426E9DD229E
16 changed files with 232 additions and 57 deletions

2
go.mod
View File

@ -9,7 +9,7 @@ require (
github.com/atrox/haikunatorgo/v2 v2.0.1
github.com/caddyserver/certmagic v0.19.2
github.com/casbin/casbin/v2 v2.77.2
github.com/datarhei/core-client-go/v16 v16.11.1-0.20230926123431-2fdbec157292
github.com/datarhei/core-client-go/v16 v16.11.1-0.20240415125433-2e78b4319e8e
github.com/datarhei/gosrt v0.5.4
github.com/datarhei/joy4 v0.0.0-20230505074825-fde05957445a
github.com/fujiwara/shapeio v1.0.0

4
go.sum
View File

@ -46,8 +46,8 @@ github.com/circonus-labs/circonusllhist v0.1.3/go.mod h1:kMXHVDlOchFAehlya5ePtbp
github.com/cpuguy83/go-md2man/v2 v2.0.2 h1:p1EgwI/C7NhT0JmVkwCD2ZBK8j4aeHQX2pMHHBfMQ6w=
github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/datarhei/core-client-go/v16 v16.11.1-0.20230926123431-2fdbec157292 h1:/GV5wClf40U23jhwMIyoq0hgyCqmN2kRWPyRZe6Lf+Y=
github.com/datarhei/core-client-go/v16 v16.11.1-0.20230926123431-2fdbec157292/go.mod h1:3eKfwhPKoW7faTn+luShRVNMqcIskvlIKjRJ7ShjyL8=
github.com/datarhei/core-client-go/v16 v16.11.1-0.20240415125433-2e78b4319e8e h1:3O75rKCLZLe8323DZCYOL9+eyyWxw50/qxHctDdBTss=
github.com/datarhei/core-client-go/v16 v16.11.1-0.20240415125433-2e78b4319e8e/go.mod h1:3eKfwhPKoW7faTn+luShRVNMqcIskvlIKjRJ7ShjyL8=
github.com/datarhei/gosrt v0.5.4 h1:dE3mmSB+n1GeviGM8xQAW3+UD3mKeFmd84iefDul5Vs=
github.com/datarhei/gosrt v0.5.4/go.mod h1:MiUCwCG+LzFMzLM/kTA+3wiTtlnkVvGbW/F0XzyhtG8=
github.com/datarhei/joy4 v0.0.0-20230505074825-fde05957445a h1:Tf4DSHY1xruBglr+yYP5Wct7czM86GKMYgbXH8a7OFo=

View File

@ -324,7 +324,7 @@ func (h *ClusterHandler) GetProcess(c echo.Context) error {
})
if len(procs) == 0 {
// Check the store in the store for an undeployed process
// Check the store in the cluster for an undeployed process
p, err := h.cluster.GetProcess(app.NewProcessID(id, domain))
if err != nil {
return api.Err(http.StatusNotFound, "", "Unknown process ID: %s", id)

View File

@ -27,6 +27,7 @@ type ClusterNodeCore struct {
Error string `json:"error"`
LastContact float64 `json:"last_contact_ms"` // milliseconds
Latency float64 `json:"latency_ms"` // milliseconds
Version string `json:"version"`
}
type ClusterNodeResources struct {
@ -48,11 +49,13 @@ type ClusterRaft struct {
LogIndex uint64 `json:"log_index"`
}
type ClusterAboutLeader struct {
ID string `json:"id"`
Address string `json:"address"`
ElectedSince uint64 `json:"elected_seconds"`
}
type ClusterAbout struct {
ID string `json:"id"`
Name string `json:"name"`
Leader bool `json:"leader"`
Address string `json:"address"`
Raft ClusterRaft `json:"raft"`
Nodes []ClusterNode `json:"nodes"`
Version string `json:"version"`
@ -60,6 +63,22 @@ type ClusterAbout struct {
DegradedErr string `json:"degraded_error"`
}
type ClusterAboutV1 struct {
ID string `json:"id"`
Name string `json:"name"`
Leader bool `json:"leader"`
Address string `json:"address"`
ClusterAbout
}
type ClusterAboutV2 struct {
ID string `json:"id"`
Domains []string `json:"public_domains"`
Leader ClusterAboutLeader `json:"leader"`
Status string `json:"status"`
ClusterAbout
}
type ClusterNodeFiles struct {
LastUpdate int64 `json:"last_update"` // unix timestamp
Files map[string][]string `json:"files"`

View File

@ -5,6 +5,7 @@ type Event struct {
Level int `json:"level"`
Component string `json:"event"`
Message string `json:"message"`
Caller string `json:"caller"`
Data map[string]string `json:"data"`
}

View File

@ -21,7 +21,7 @@ type ProcessConfigIO struct {
ID string `json:"id"`
Address string `json:"address" validate:"required" jsonschema:"minLength=1"`
Options []string `json:"options"`
Cleanup []ProcessConfigIOCleanup `json:"cleanup,omitempty"`
Cleanup []ProcessConfigIOCleanup `json:"cleanup"`
}
type ProcessConfigIOCleanup struct {

View File

@ -45,6 +45,7 @@ type ProgressIO struct {
// Progress represents the progress of an ffmpeg process
type Progress struct {
Started bool `json:"started"`
Input []ProgressIO `json:"inputs"`
Output []ProgressIO `json:"outputs"`
Mapping StreamMapping `json:"mapping"`

View File

@ -65,12 +65,12 @@ type RestClient interface {
MemFSDeleteFile(path string) error // DELETE /v3/fs/mem/{path}
MemFSAddFile(path string, data io.Reader) error // PUT /v3/fs/mem/{path}
FilesystemList(name, pattern, sort, order string) ([]api.FileInfo, error) // GET /v3/fs/{name}
FilesystemHasFile(name, path string) bool // HEAD /v3/fs/{name}/{path}
FilesystemGetFile(name, path string) (io.ReadCloser, error) // GET /v3/fs/{name}/{path}
FilesystemGetFileOffset(name, path string, offset int64) (io.ReadCloser, error) // GET /v3/fs/{name}/{path}
FilesystemDeleteFile(name, path string) error // DELETE /v3/fs/{name}/{path}
FilesystemAddFile(name, path string, data io.Reader) error // PUT /v3/fs/{name}/{path}
FilesystemList(storage, pattern, sort, order string) ([]api.FileInfo, error) // GET /v3/fs/{storage}
FilesystemHasFile(storage, path string) bool // HEAD /v3/fs/{storage}/{path}
FilesystemGetFile(storage, path string) (io.ReadCloser, error) // GET /v3/fs/{storage}/{path}
FilesystemGetFileOffset(storage, path string, offset int64) (io.ReadCloser, error) // GET /v3/fs/{storage}/{path}
FilesystemDeleteFile(storage, path string) error // DELETE /v3/fs/{storage}/{path}
FilesystemAddFile(storage, path string, data io.Reader) error // PUT /v3/fs/{storage}/{path}
Log() ([]api.LogEvent, error) // GET /v3/log
Events(ctx context.Context, filters api.EventFilters) (<-chan api.Event, error) // POST /v3/events
@ -95,6 +95,8 @@ type RestClient interface {
ProcessMetadata(id ProcessID, key string) (api.Metadata, error) // GET /v3/process/{id}/metadata/{key}
ProcessMetadataSet(id ProcessID, key string, metadata api.Metadata) error // PUT /v3/process/{id}/metadata/{key}
PlayoutStatus(id ProcessID, inputID string) (api.PlayoutStatus, error) // GET /v3/process/{id}/playout/{inputid}/status
IdentitiesList() ([]api.IAMUser, error) // GET /v3/iam/user
Identity(name string) (api.IAMUser, error) // GET /v3/iam/user/{name}
IdentityAdd(u api.IAMUser) error // POST /v3/iam/user
@ -102,17 +104,21 @@ type RestClient interface {
IdentitySetPolicies(name string, p []api.IAMPolicy) error // PUT /v3/iam/user/{name}/policy
IdentityDelete(name string) error // DELETE /v3/iam/user/{name}
Cluster() (api.ClusterAbout, error) // GET /v3/cluster
ClusterHealthy() (bool, error) // GET /v3/cluster/healthy
ClusterSnapshot() (io.ReadCloser, error) // GET /v3/cluster/snapshot
ClusterLeave() error // PUT /v3/cluster/leave
ClusterTransferLeadership(id string) error // PUT /v3/cluster/transfer/{id}
Cluster() (*api.ClusterAboutV1, *api.ClusterAboutV2, error) // GET /v3/cluster
ClusterHealthy() (bool, error) // GET /v3/cluster/healthy
ClusterSnapshot() (io.ReadCloser, error) // GET /v3/cluster/snapshot
ClusterLeave() error // PUT /v3/cluster/leave
ClusterTransferLeadership(id string) error // PUT /v3/cluster/transfer/{id}
ClusterNodeList() ([]api.ClusterNode, error) // GET /v3/cluster/node
ClusterNode(id string) (api.ClusterNode, error) // GET /v3/cluster/node/{id}
ClusterNodeFiles(id string) (api.ClusterNodeFiles, error) // GET /v3/cluster/node/{id}/files
ClusterNodeProcessList(id string, opts ProcessListOptions) ([]api.Process, error) // GET /v3/cluster/node/{id}/process
ClusterNodeVersion(id string) (api.Version, error) // GET /v3/cluster/node/{id}/version
ClusterNodeList() ([]api.ClusterNode, error) // GET /v3/cluster/node
ClusterNode(id string) (api.ClusterNode, error) // GET /v3/cluster/node/{id}
ClusterNodeFiles(id string) (api.ClusterNodeFiles, error) // GET /v3/cluster/node/{id}/files
ClusterNodeProcessList(id string, opts ProcessListOptions) ([]api.Process, error) // GET /v3/cluster/node/{id}/process
ClusterNodeVersion(id string) (api.Version, error) // GET /v3/cluster/node/{id}/version
ClusterNodeFilesystemList(id, storage, pattern, sort, order string) ([]api.FileInfo, error) // GET /v3/cluster/node/{id}/fs/{storage}
ClusterNodeFilesystemDeleteFile(id, storage, path string) error // DELETE /v3/cluster/node/{id}/fs/{storage}/{path}
ClusterNodeFilesystemPutFile(id, storage, path string, data io.Reader) error // PUT /v3/cluster/node/{id}/fs/{storage}/{path}
ClusterNodeFilesystemGetFile(id, storage, path string) (io.ReadCloser, error) // GET /v3/cluster/node/{id}/fs/{storage}/{path}
ClusterDBProcessList() ([]api.Process, error) // GET /v3/cluster/db/process
ClusterDBProcess(id ProcessID) (api.Process, error) // GET /v3/cluster/db/process/{id}
@ -123,6 +129,8 @@ type RestClient interface {
ClusterDBKeyValues() (api.ClusterKVS, error) // GET /v3/cluster/db/kv
ClusterDBProcessMap() (api.ClusterProcessMap, error) // GET /v3/cluster/db/map/process
ClusterFilesystemList(name, pattern, sort, order string) ([]api.FileInfo, error) // GET /v3/cluster/fs/{storage}
ClusterProcessList(opts ProcessListOptions) ([]api.Process, error) // GET /v3/cluster/process
ClusterProcess(id ProcessID, filter []string) (api.Process, error) // POST /v3/cluster/process
ClusterProcessAdd(p api.ProcessConfig) error // GET /v3/cluster/process/{id}
@ -144,6 +152,7 @@ type RestClient interface {
RTMPChannels() ([]api.RTMPChannel, error) // GET /v3/rtmp
SRTChannels() ([]api.SRTChannel, error) // GET /v3/srt
SRTChannelsRaw() ([]byte, error) // GET /v3/srt
Sessions(collectors []string) (api.SessionsSummary, error) // GET /v3/session
SessionsActive(collectors []string) (api.SessionsActive, error) // GET /v3/session/active
@ -377,6 +386,10 @@ func New(config Config) (RestClient, error) {
path: mustNewGlob("/v3/cluster/db/kv"),
constraint: mustNewConstraint("^16.14.0"),
},
{
path: mustNewGlob("/v3/cluster/fs/*"),
constraint: mustNewConstraint("^16.14.0"),
},
{
path: mustNewGlob("/v3/cluster/process"),
constraint: mustNewConstraint("^16.14.0"),
@ -424,6 +437,13 @@ func New(config Config) (RestClient, error) {
{
path: mustNewGlob("/v3/cluster/db/map/process"),
constraint: mustNewConstraint("^16.14.0"),
}, {
path: mustNewGlob("/v3/cluster/node/*/fs/*"),
constraint: mustNewConstraint("^16.14.0"),
},
{
path: mustNewGlob("/v3/cluster/node/*/fs/*/**"),
constraint: mustNewConstraint("^16.14.0"),
},
},
"POST": {
@ -497,6 +517,10 @@ func New(config Config) (RestClient, error) {
path: mustNewGlob("/v3/cluster/transfer/*"),
constraint: mustNewConstraint("^16.14.0"),
},
{
path: mustNewGlob("/v3/cluster/node/*/fs/*/**"),
constraint: mustNewConstraint("^16.14.0"),
},
},
"DELETE": {
{
@ -504,11 +528,15 @@ func New(config Config) (RestClient, error) {
constraint: mustNewConstraint("^16.14.0"),
},
{
path: mustNewGlob("/v3/cluster/process/{id}"),
path: mustNewGlob("/v3/cluster/process/*"),
constraint: mustNewConstraint("^16.14.0"),
},
{
path: mustNewGlob("/v3/cluster/iam/user/{name}"),
path: mustNewGlob("/v3/cluster/iam/user/*"),
constraint: mustNewConstraint("^16.14.0"),
},
{
path: mustNewGlob("/v3/cluster/node/*/fs/*/**"),
constraint: mustNewConstraint("^16.14.0"),
},
},
@ -769,10 +797,6 @@ func (r *restclient) refresh() error {
return err
}
if err != nil {
return err
}
if status != 200 {
return fmt.Errorf("invalid refresh token")
}

View File

@ -3,23 +3,53 @@ package coreclient
import (
"context"
"encoding/json"
"fmt"
"io"
"net/url"
"strings"
"github.com/datarhei/core-client-go/v16/api"
)
func (r *restclient) Cluster() (api.ClusterAbout, error) {
var about api.ClusterAbout
func (r *restclient) Cluster() (*api.ClusterAboutV1, *api.ClusterAboutV2, error) {
data, err := r.call("GET", "/v3/cluster", nil, nil, "", nil)
if err != nil {
return about, err
return nil, nil, err
}
err = json.Unmarshal(data, &about)
var aboutV1 *api.ClusterAboutV1
var aboutV2 *api.ClusterAboutV2
return about, err
type version struct {
Version string `json:"version"`
}
v := version{}
err = json.Unmarshal(data, &v)
if err != nil {
return nil, nil, err
}
if strings.HasPrefix(v.Version, "1.") {
aboutV1 = &api.ClusterAboutV1{}
err = json.Unmarshal(data, aboutV1)
if err != nil {
return nil, nil, err
}
} else if strings.HasPrefix(v.Version, "2.") {
aboutV2 = &api.ClusterAboutV2{}
err = json.Unmarshal(data, aboutV2)
if err != nil {
return nil, nil, err
}
} else {
err = fmt.Errorf("unsupported version (%s)", v.Version)
}
return aboutV1, aboutV2, err
}
func (r *restclient) ClusterHealthy() (bool, error) {

View File

@ -0,0 +1,26 @@
package coreclient
import (
"encoding/json"
"net/url"
"github.com/datarhei/core-client-go/v16/api"
)
func (r *restclient) ClusterFilesystemList(storage, pattern, sort, order string) ([]api.FileInfo, error) {
var files []api.FileInfo
query := &url.Values{}
query.Set("glob", pattern)
query.Set("sort", sort)
query.Set("order", order)
data, err := r.call("GET", "/v3/cluster/fs/"+url.PathEscape(storage), query, nil, "", nil)
if err != nil {
return files, err
}
err = json.Unmarshal(data, &files)
return files, err
}

View File

@ -1,8 +1,11 @@
package coreclient
import (
"context"
"encoding/json"
"io"
"net/url"
"path/filepath"
"github.com/datarhei/core-client-go/v16/api"
)
@ -46,6 +49,52 @@ func (r *restclient) ClusterNodeFiles(id string) (api.ClusterNodeFiles, error) {
return files, err
}
func (r *restclient) ClusterNodeFilesystemList(id, storage, pattern, sort, order string) ([]api.FileInfo, error) {
var files []api.FileInfo
query := &url.Values{}
query.Set("glob", pattern)
query.Set("sort", sort)
query.Set("order", order)
data, err := r.call("GET", "/v3/cluster/node/"+url.PathEscape(id)+"/fs/"+url.PathEscape(storage), query, nil, "", nil)
if err != nil {
return files, err
}
err = json.Unmarshal(data, &files)
return files, err
}
func (r *restclient) ClusterNodeFilesystemPutFile(id, storage, path string, data io.Reader) error {
if !filepath.IsAbs(path) {
path = "/" + path
}
_, err := r.call("PUT", "/v3/cluster/node/"+url.PathEscape(id)+"/fs/"+url.PathEscape(storage)+path, nil, nil, "", data)
return err
}
func (r *restclient) ClusterNodeFilesystemGetFile(id, storage, path string) (io.ReadCloser, error) {
if !filepath.IsAbs(path) {
path = "/" + path
}
return r.stream(context.Background(), "GET", "/v3/cluster/node/"+url.PathEscape(id)+"/fs/"+url.PathEscape(storage)+path, nil, nil, "", nil)
}
func (r *restclient) ClusterNodeFilesystemDeleteFile(id, storage, path string) error {
if !filepath.IsAbs(path) {
path = "/" + path
}
_, err := r.call("DELETE", "/v3/cluster/node/"+url.PathEscape(id)+"/fs/"+url.PathEscape(storage)+path, nil, nil, "", nil)
return err
}
func (r *restclient) ClusterNodeProcessList(id string, opts ProcessListOptions) ([]api.Process, error) {
var processes []api.Process

View File

@ -32,7 +32,7 @@ func (r *restclient) Events(ctx context.Context, filters api.EventFilters) (<-ch
decoder := json.NewDecoder(stream)
for {
for decoder.More() {
var event api.Event
if err := decoder.Decode(&event); err == io.EOF {
return
@ -46,14 +46,9 @@ func (r *restclient) Events(ctx context.Context, filters api.EventFilters) (<-ch
continue
}
select {
case ch <- event:
default:
// Abort if channel is not drained
return
}
ch <- event
if event.Component == "error" {
if event.Component == "" || event.Component == "error" {
return
}
}

View File

@ -23,7 +23,7 @@ const (
ORDER_DESC = "desc"
)
func (r *restclient) FilesystemList(name, pattern, sort, order string) ([]api.FileInfo, error) {
func (r *restclient) FilesystemList(storage, pattern, sort, order string) ([]api.FileInfo, error) {
var files []api.FileInfo
query := &url.Values{}
@ -31,7 +31,7 @@ func (r *restclient) FilesystemList(name, pattern, sort, order string) ([]api.Fi
query.Set("sort", sort)
query.Set("order", order)
data, err := r.call("GET", "/v3/fs/"+url.PathEscape(name), query, nil, "", nil)
data, err := r.call("GET", "/v3/fs/"+url.PathEscape(storage), query, nil, "", nil)
if err != nil {
return files, err
}
@ -51,8 +51,8 @@ func (r *restclient) FilesystemHasFile(name, path string) bool {
return err == nil
}
func (r *restclient) FilesystemGetFile(name, path string) (io.ReadCloser, error) {
return r.FilesystemGetFileOffset(name, path, 0)
func (r *restclient) FilesystemGetFile(storage, path string) (io.ReadCloser, error) {
return r.FilesystemGetFileOffset(storage, path, 0)
}
type ContextReadCloser struct {
@ -72,7 +72,7 @@ func (r *ContextReadCloser) Close() error {
return r.ReadCloser.Close()
}
func (r *restclient) FilesystemGetFileOffset(name, path string, offset int64) (io.ReadCloser, error) {
func (r *restclient) FilesystemGetFileOffset(storage, path string, offset int64) (io.ReadCloser, error) {
if !filepath.IsAbs(path) {
path = "/" + path
}
@ -84,25 +84,25 @@ func (r *restclient) FilesystemGetFileOffset(name, path string, offset int64) (i
header.Set("Range", "bytes="+strconv.FormatInt(offset, 10)+"-")
}
return r.stream(context.Background(), "GET", "/v3/fs/"+url.PathEscape(name)+path, nil, header, "", nil)
return r.stream(context.Background(), "GET", "/v3/fs/"+url.PathEscape(storage)+path, nil, header, "", nil)
}
func (r *restclient) FilesystemDeleteFile(name, path string) error {
func (r *restclient) FilesystemDeleteFile(storage, path string) error {
if !filepath.IsAbs(path) {
path = "/" + path
}
_, err := r.call("DELETE", "/v3/fs/"+url.PathEscape(name)+path, nil, nil, "", nil)
_, err := r.call("DELETE", "/v3/fs/"+url.PathEscape(storage)+path, nil, nil, "", nil)
return err
}
func (r *restclient) FilesystemAddFile(name, path string, data io.Reader) error {
func (r *restclient) FilesystemAddFile(storage, path string, data io.Reader) error {
if !filepath.IsAbs(path) {
path = "/" + path
}
_, err := r.call("PUT", "/v3/fs/"+url.PathEscape(name)+path, nil, nil, "application/data", data)
_, err := r.call("PUT", "/v3/fs/"+url.PathEscape(storage)+path, nil, nil, "application/data", data)
return err
}

View File

@ -0,0 +1,26 @@
package coreclient
import (
"encoding/json"
"net/url"
"github.com/datarhei/core-client-go/v16/api"
)
func (r *restclient) PlayoutStatus(id ProcessID, inputID string) (api.PlayoutStatus, error) {
var status api.PlayoutStatus
path := "/v3/process/" + url.PathEscape(id.ID) + "/playout/" + url.PathEscape(inputID) + "/status"
values := &url.Values{}
values.Set("domain", id.Domain)
data, err := r.call("GET", path, values, nil, "", nil)
if err != nil {
return status, err
}
err = json.Unmarshal(data, &status)
return status, err
}

View File

@ -9,7 +9,7 @@ import (
func (r *restclient) SRTChannels() ([]api.SRTChannel, error) {
var m []api.SRTChannel
data, err := r.call("GET", "/v3/srt", nil, nil, "", nil)
data, err := r.SRTChannelsRaw()
if err != nil {
return nil, err
}
@ -18,3 +18,7 @@ func (r *restclient) SRTChannels() ([]api.SRTChannel, error) {
return m, err
}
func (r *restclient) SRTChannelsRaw() ([]byte, error) {
return r.call("GET", "/v3/srt", nil, nil, "", nil)
}

2
vendor/modules.txt vendored
View File

@ -78,7 +78,7 @@ github.com/cespare/xxhash/v2
# github.com/cpuguy83/go-md2man/v2 v2.0.2
## explicit; go 1.11
github.com/cpuguy83/go-md2man/v2/md2man
# github.com/datarhei/core-client-go/v16 v16.11.1-0.20230926123431-2fdbec157292
# github.com/datarhei/core-client-go/v16 v16.11.1-0.20240415125433-2e78b4319e8e
## explicit; go 1.18
github.com/datarhei/core-client-go/v16
github.com/datarhei/core-client-go/v16/api