Update core client package

This commit is contained in:
Ingo Oppermann 2023-07-21 10:31:33 +02:00
parent ff2b7fe054
commit a193e93b94
No known key found for this signature in database
GPG Key ID: 2AB32426E9DD229E
9 changed files with 139 additions and 9 deletions

2
go.mod
View File

@ -9,7 +9,7 @@ require (
github.com/atrox/haikunatorgo/v2 v2.0.1
github.com/caddyserver/certmagic v0.19.0
github.com/casbin/casbin/v2 v2.72.0
github.com/datarhei/core-client-go/v16 v16.11.1-0.20230717195052-016daa63407b
github.com/datarhei/core-client-go/v16 v16.11.1-0.20230721082756-e22b6cdac41f
github.com/datarhei/gosrt v0.5.2
github.com/datarhei/joy4 v0.0.0-20230505074825-fde05957445a
github.com/fujiwara/shapeio v1.0.0

2
go.sum
View File

@ -52,6 +52,8 @@ github.com/datarhei/core-client-go/v16 v16.11.1-0.20230717141633-8f0e5ce4c68c h1
github.com/datarhei/core-client-go/v16 v16.11.1-0.20230717141633-8f0e5ce4c68c/go.mod h1:3eKfwhPKoW7faTn+luShRVNMqcIskvlIKjRJ7ShjyL8=
github.com/datarhei/core-client-go/v16 v16.11.1-0.20230717195052-016daa63407b h1:s20UH93emEYhorqjy36B7x4aT9Ne8rPzvoQJ8c7ACz4=
github.com/datarhei/core-client-go/v16 v16.11.1-0.20230717195052-016daa63407b/go.mod h1:3eKfwhPKoW7faTn+luShRVNMqcIskvlIKjRJ7ShjyL8=
github.com/datarhei/core-client-go/v16 v16.11.1-0.20230721082756-e22b6cdac41f h1:qjb9P4HynN71QOwit5fdH/7lYeEaS9eVuH8+CDQu+xI=
github.com/datarhei/core-client-go/v16 v16.11.1-0.20230721082756-e22b6cdac41f/go.mod h1:3eKfwhPKoW7faTn+luShRVNMqcIskvlIKjRJ7ShjyL8=
github.com/datarhei/gosrt v0.5.2 h1:eagqZwEIiGPNJW0rLep3gwceObyaZ17+iKRc+l4VEpc=
github.com/datarhei/gosrt v0.5.2/go.mod h1:0308GQhAu5hxe2KYdbss901aKceSSKXnwCr8Vs++eiw=
github.com/datarhei/joy4 v0.0.0-20230505074825-fde05957445a h1:Tf4DSHY1xruBglr+yYP5Wct7czM86GKMYgbXH8a7OFo=

View File

@ -0,0 +1,21 @@
package api
type Event struct {
Timestamp int64 `json:"ts" format:"int64"`
Level int `json:"level"`
Component string `json:"event"`
Message string `json:"message"`
Data map[string]string `json:"data"`
}
type EventFilter struct {
Component string `json:"event"`
Message string `json:"message"`
Level string `json:"level"`
Data map[string]string `json:"data"`
}
type EventFilters struct {
Filters []EventFilter `json:"filters"`
}

View File

@ -2,6 +2,7 @@ package coreclient
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
@ -71,7 +72,8 @@ type RestClient interface {
FilesystemDeleteFile(name, path string) error // DELETE /v3/fs/{name}/{path}
FilesystemAddFile(name, path string, data io.Reader) error // PUT /v3/fs/{name}/{path}
Log() ([]api.LogEvent, error) // GET /v3/log
Log() ([]api.LogEvent, error) // GET /v3/log
Events(ctx context.Context, filters api.EventFilters) (<-chan api.Event, error) // POST /v3/events
Metadata(key string) (api.Metadata, error) // GET /v3/metadata/{key}
MetadataSet(key string, metadata api.Metadata) error // PUT /v3/metadata/{key}
@ -306,7 +308,7 @@ func New(config Config) (RestClient, error) {
if r.client == nil {
r.client = &http.Client{
Timeout: 15 * time.Second,
Timeout: 0,
}
}
@ -435,6 +437,10 @@ func New(config Config) (RestClient, error) {
path: mustNewGlob("/v3/cluster/iam/user"),
constraint: mustNewConstraint("^16.14.0"),
},
{
path: mustNewGlob("/v3/events"),
constraint: mustNewConstraint("^16.14.0"),
},
},
"PUT": {
{
@ -810,7 +816,7 @@ func (r *restclient) request(req *http.Request) (int, io.ReadCloser, error) {
return resp.StatusCode, resp.Body, nil
}
func (r *restclient) stream(method, path string, query *url.Values, header http.Header, contentType string, data io.Reader) (io.ReadCloser, error) {
func (r *restclient) stream(ctx context.Context, method, path string, query *url.Values, header http.Header, contentType string, data io.Reader) (io.ReadCloser, error) {
if err := r.checkVersion(method, r.prefix+path); err != nil {
return nil, err
}
@ -820,7 +826,7 @@ func (r *restclient) stream(method, path string, query *url.Values, header http.
u += "?" + query.Encode()
}
req, err := http.NewRequest(method, u, data)
req, err := http.NewRequestWithContext(ctx, method, u, data)
if err != nil {
return nil, err
}
@ -882,7 +888,10 @@ func (r *restclient) stream(method, path string, query *url.Values, header http.
}
func (r *restclient) call(method, path string, query *url.Values, header http.Header, contentType string, data io.Reader) ([]byte, error) {
body, err := r.stream(method, path, query, header, contentType, data)
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()
body, err := r.stream(ctx, method, path, query, header, contentType, data)
if err != nil {
return nil, err
}

View File

@ -1,6 +1,7 @@
package coreclient
import (
"context"
"encoding/json"
"io"
"net/url"
@ -35,7 +36,7 @@ func (r *restclient) ClusterHealthy() (bool, error) {
}
func (r *restclient) ClusterSnapshot() (io.ReadCloser, error) {
return r.stream("GET", "/v3/cluster/snapshot", nil, nil, "", nil)
return r.stream(context.Background(), "GET", "/v3/cluster/snapshot", nil, nil, "", nil)
}
func (r *restclient) ClusterLeave() error {

63
vendor/github.com/datarhei/core-client-go/v16/event.go generated vendored Normal file
View File

@ -0,0 +1,63 @@
package coreclient
import (
"bytes"
"context"
"encoding/json"
"io"
"net/http"
"github.com/datarhei/core-client-go/v16/api"
)
func (r *restclient) Events(ctx context.Context, filters api.EventFilters) (<-chan api.Event, error) {
var buf bytes.Buffer
e := json.NewEncoder(&buf)
e.Encode(filters)
header := make(http.Header)
header.Set("Accept", "application/x-json-stream")
stream, err := r.stream(ctx, "POST", "/v3/events", nil, header, "application/json", &buf)
if err != nil {
return nil, err
}
channel := make(chan api.Event, 128)
go func(stream io.ReadCloser, ch chan<- api.Event) {
defer stream.Close()
defer close(channel)
decoder := json.NewDecoder(stream)
for {
var event api.Event
if err := decoder.Decode(&event); err == io.EOF {
return
} else if err != nil {
event.Component = "error"
event.Message = err.Error()
}
// Don't emit keepalives
if event.Component == "keepalive" {
continue
}
select {
case ch <- event:
default:
// Abort if channel is not drained
return
}
if event.Component == "error" {
return
}
}
}(stream, channel)
return channel, nil
}

View File

@ -1,6 +1,7 @@
package coreclient
import (
"context"
"encoding/json"
"io"
"net/http"
@ -54,6 +55,23 @@ func (r *restclient) FilesystemGetFile(name, path string) (io.ReadCloser, error)
return r.FilesystemGetFileOffset(name, path, 0)
}
type ContextReadCloser struct {
io.ReadCloser
cancel context.CancelFunc
}
func NewContextReadCloser(r io.ReadCloser, cancel context.CancelFunc) *ContextReadCloser {
return &ContextReadCloser{
ReadCloser: r,
cancel: cancel,
}
}
func (r *ContextReadCloser) Close() error {
r.cancel()
return r.ReadCloser.Close()
}
func (r *restclient) FilesystemGetFileOffset(name, path string, offset int64) (io.ReadCloser, error) {
if !filepath.IsAbs(path) {
path = "/" + path
@ -66,7 +84,7 @@ func (r *restclient) FilesystemGetFileOffset(name, path string, offset int64) (i
header.Set("Range", "bytes="+strconv.FormatInt(offset, 10)+"-")
}
return r.stream("GET", "/v3/fs/"+url.PathEscape(name)+path, nil, header, "", nil)
return r.stream(context.Background(), "GET", "/v3/fs/"+url.PathEscape(name)+path, nil, header, "", nil)
}
func (r *restclient) FilesystemDeleteFile(name, path string) error {

View File

@ -78,6 +78,22 @@ func (r *restclient) processList(where string, opts ProcessListOptions) ([]api.P
err = json.Unmarshal(data, &processes)
for i, p := range processes {
if p.Config == nil {
p.Config = &api.ProcessConfig{}
}
if p.State == nil {
p.State = &api.ProcessState{}
}
if p.Report == nil {
p.Report = &api.ProcessReport{}
}
processes[i] = p
}
return processes, err
}

2
vendor/modules.txt vendored
View File

@ -78,7 +78,7 @@ github.com/cespare/xxhash/v2
# github.com/cpuguy83/go-md2man/v2 v2.0.2
## explicit; go 1.11
github.com/cpuguy83/go-md2man/v2/md2man
# github.com/datarhei/core-client-go/v16 v16.11.1-0.20230717195052-016daa63407b
# github.com/datarhei/core-client-go/v16 v16.11.1-0.20230721082756-e22b6cdac41f
## explicit; go 1.18
github.com/datarhei/core-client-go/v16
github.com/datarhei/core-client-go/v16/api