From c4dfdbe635886f672a6d7dfe46811935beb2194e Mon Sep 17 00:00:00 2001 From: Ingo Oppermann Date: Fri, 11 Apr 2025 16:58:01 +0200 Subject: [PATCH] Report more suitable errors --- docs/docs.go | 114 ++++++++++++++++++++++++++++ docs/swagger.json | 114 ++++++++++++++++++++++++++++ docs/swagger.yaml | 76 +++++++++++++++++++ http/client/client.go | 13 ++-- http/handler/api/cluster_process.go | 3 + http/handler/api/process.go | 74 ++++++++++++------ http/handler/api/process_test.go | 2 +- restream/core.go | 48 +++++------- restream/errors.go | 9 +++ restream/task.go | 5 -- 10 files changed, 397 insertions(+), 61 deletions(-) create mode 100644 restream/errors.go diff --git a/docs/docs.go b/docs/docs.go index 2dbbec65..9b1452f1 100644 --- a/docs/docs.go +++ b/docs/docs.go @@ -3283,11 +3283,23 @@ const docTemplate = `{ "$ref": "#/definitions/api.Error" } }, + "403": { + "description": "Forbidden", + "schema": { + "$ref": "#/definitions/api.Error" + } + }, "404": { "description": "Not Found", "schema": { "$ref": "#/definitions/api.Error" } + }, + "409": { + "description": "Conflict", + "schema": { + "$ref": "#/definitions/api.Error" + } } } }, @@ -3332,6 +3344,24 @@ const docTemplate = `{ "schema": { "$ref": "#/definitions/api.Error" } + }, + "403": { + "description": "Forbidden", + "schema": { + "$ref": "#/definitions/api.Error" + } + }, + "404": { + "description": "Not Found", + "schema": { + "$ref": "#/definitions/api.Error" + } + }, + "409": { + "description": "Conflict", + "schema": { + "$ref": "#/definitions/api.Error" + } } } } @@ -3534,6 +3564,18 @@ const docTemplate = `{ "schema": { "$ref": "#/definitions/api.Error" } + }, + "404": { + "description": "Not Found", + "schema": { + "$ref": "#/definitions/api.Error" + } + }, + "409": { + "description": "Conflict", + "schema": { + "$ref": "#/definitions/api.Error" + } } } } @@ -3634,6 +3676,12 @@ const docTemplate = `{ "$ref": "#/definitions/api.Process" } }, + "400": { + "description": "Bad Request", + "schema": { + "$ref": "#/definitions/api.Error" + } + }, "403": { "description": "Forbidden", "schema": { @@ -3645,6 +3693,12 @@ const docTemplate = `{ "schema": { "$ref": "#/definitions/api.Error" } + }, + "409": { + "description": "Conflict", + "schema": { + "$ref": "#/definitions/api.Error" + } } } }, @@ -3714,6 +3768,12 @@ const docTemplate = `{ "schema": { "$ref": "#/definitions/api.Error" } + }, + "409": { + "description": "Conflict", + "schema": { + "$ref": "#/definitions/api.Error" + } } } }, @@ -3754,6 +3814,12 @@ const docTemplate = `{ "type": "string" } }, + "400": { + "description": "Bad Request", + "schema": { + "$ref": "#/definitions/api.Error" + } + }, "403": { "description": "Forbidden", "schema": { @@ -3765,6 +3831,12 @@ const docTemplate = `{ "schema": { "$ref": "#/definitions/api.Error" } + }, + "409": { + "description": "Conflict", + "schema": { + "$ref": "#/definitions/api.Error" + } } } } @@ -3836,6 +3908,12 @@ const docTemplate = `{ "schema": { "$ref": "#/definitions/api.Error" } + }, + "409": { + "description": "Conflict", + "schema": { + "$ref": "#/definitions/api.Error" + } } } } @@ -3895,6 +3973,12 @@ const docTemplate = `{ "schema": { "$ref": "#/definitions/api.Error" } + }, + "409": { + "description": "Conflict", + "schema": { + "$ref": "#/definitions/api.Error" + } } } } @@ -3959,6 +4043,12 @@ const docTemplate = `{ "schema": { "$ref": "#/definitions/api.Error" } + }, + "409": { + "description": "Conflict", + "schema": { + "$ref": "#/definitions/api.Error" + } } } }, @@ -4028,6 +4118,12 @@ const docTemplate = `{ "schema": { "$ref": "#/definitions/api.Error" } + }, + "409": { + "description": "Conflict", + "schema": { + "$ref": "#/definitions/api.Error" + } } } } @@ -4522,6 +4618,12 @@ const docTemplate = `{ "schema": { "$ref": "#/definitions/api.Error" } + }, + "409": { + "description": "Conflict", + "schema": { + "$ref": "#/definitions/api.Error" + } } } }, @@ -4591,6 +4693,12 @@ const docTemplate = `{ "schema": { "$ref": "#/definitions/api.Error" } + }, + "409": { + "description": "Conflict", + "schema": { + "$ref": "#/definitions/api.Error" + } } } } @@ -4650,6 +4758,12 @@ const docTemplate = `{ "schema": { "$ref": "#/definitions/api.Error" } + }, + "409": { + "description": "Conflict", + "schema": { + "$ref": "#/definitions/api.Error" + } } } } diff --git a/docs/swagger.json b/docs/swagger.json index 0a6205ad..88611c5e 100644 --- a/docs/swagger.json +++ b/docs/swagger.json @@ -3276,11 +3276,23 @@ "$ref": "#/definitions/api.Error" } }, + "403": { + "description": "Forbidden", + "schema": { + "$ref": "#/definitions/api.Error" + } + }, "404": { "description": "Not Found", "schema": { "$ref": "#/definitions/api.Error" } + }, + "409": { + "description": "Conflict", + "schema": { + "$ref": "#/definitions/api.Error" + } } } }, @@ -3325,6 +3337,24 @@ "schema": { "$ref": "#/definitions/api.Error" } + }, + "403": { + "description": "Forbidden", + "schema": { + "$ref": "#/definitions/api.Error" + } + }, + "404": { + "description": "Not Found", + "schema": { + "$ref": "#/definitions/api.Error" + } + }, + "409": { + "description": "Conflict", + "schema": { + "$ref": "#/definitions/api.Error" + } } } } @@ -3527,6 +3557,18 @@ "schema": { "$ref": "#/definitions/api.Error" } + }, + "404": { + "description": "Not Found", + "schema": { + "$ref": "#/definitions/api.Error" + } + }, + "409": { + "description": "Conflict", + "schema": { + "$ref": "#/definitions/api.Error" + } } } } @@ -3627,6 +3669,12 @@ "$ref": "#/definitions/api.Process" } }, + "400": { + "description": "Bad Request", + "schema": { + "$ref": "#/definitions/api.Error" + } + }, "403": { "description": "Forbidden", "schema": { @@ -3638,6 +3686,12 @@ "schema": { "$ref": "#/definitions/api.Error" } + }, + "409": { + "description": "Conflict", + "schema": { + "$ref": "#/definitions/api.Error" + } } } }, @@ -3707,6 +3761,12 @@ "schema": { "$ref": "#/definitions/api.Error" } + }, + "409": { + "description": "Conflict", + "schema": { + "$ref": "#/definitions/api.Error" + } } } }, @@ -3747,6 +3807,12 @@ "type": "string" } }, + "400": { + "description": "Bad Request", + "schema": { + "$ref": "#/definitions/api.Error" + } + }, "403": { "description": "Forbidden", "schema": { @@ -3758,6 +3824,12 @@ "schema": { "$ref": "#/definitions/api.Error" } + }, + "409": { + "description": "Conflict", + "schema": { + "$ref": "#/definitions/api.Error" + } } } } @@ -3829,6 +3901,12 @@ "schema": { "$ref": "#/definitions/api.Error" } + }, + "409": { + "description": "Conflict", + "schema": { + "$ref": "#/definitions/api.Error" + } } } } @@ -3888,6 +3966,12 @@ "schema": { "$ref": "#/definitions/api.Error" } + }, + "409": { + "description": "Conflict", + "schema": { + "$ref": "#/definitions/api.Error" + } } } } @@ -3952,6 +4036,12 @@ "schema": { "$ref": "#/definitions/api.Error" } + }, + "409": { + "description": "Conflict", + "schema": { + "$ref": "#/definitions/api.Error" + } } } }, @@ -4021,6 +4111,12 @@ "schema": { "$ref": "#/definitions/api.Error" } + }, + "409": { + "description": "Conflict", + "schema": { + "$ref": "#/definitions/api.Error" + } } } } @@ -4515,6 +4611,12 @@ "schema": { "$ref": "#/definitions/api.Error" } + }, + "409": { + "description": "Conflict", + "schema": { + "$ref": "#/definitions/api.Error" + } } } }, @@ -4584,6 +4686,12 @@ "schema": { "$ref": "#/definitions/api.Error" } + }, + "409": { + "description": "Conflict", + "schema": { + "$ref": "#/definitions/api.Error" + } } } } @@ -4643,6 +4751,12 @@ "schema": { "$ref": "#/definitions/api.Error" } + }, + "409": { + "description": "Conflict", + "schema": { + "$ref": "#/definitions/api.Error" + } } } } diff --git a/docs/swagger.yaml b/docs/swagger.yaml index 4f62d80e..95498e94 100644 --- a/docs/swagger.yaml +++ b/docs/swagger.yaml @@ -4971,10 +4971,18 @@ paths: description: Bad Request schema: $ref: '#/definitions/api.Error' + "403": + description: Forbidden + schema: + $ref: '#/definitions/api.Error' "404": description: Not Found schema: $ref: '#/definitions/api.Error' + "409": + description: Conflict + schema: + $ref: '#/definitions/api.Error' security: - ApiKeyAuth: [] summary: Retrieve JSON metadata from a key @@ -5006,6 +5014,18 @@ paths: description: Bad Request schema: $ref: '#/definitions/api.Error' + "403": + description: Forbidden + schema: + $ref: '#/definitions/api.Error' + "404": + description: Not Found + schema: + $ref: '#/definitions/api.Error' + "409": + description: Conflict + schema: + $ref: '#/definitions/api.Error' security: - ApiKeyAuth: [] summary: Add JSON metadata under the given key @@ -5144,6 +5164,14 @@ paths: description: Forbidden schema: $ref: '#/definitions/api.Error' + "404": + description: Not Found + schema: + $ref: '#/definitions/api.Error' + "409": + description: Conflict + schema: + $ref: '#/definitions/api.Error' security: - ApiKeyAuth: [] summary: Add a new process @@ -5170,6 +5198,10 @@ paths: description: OK schema: type: string + "400": + description: Bad Request + schema: + $ref: '#/definitions/api.Error' "403": description: Forbidden schema: @@ -5178,6 +5210,10 @@ paths: description: Not Found schema: $ref: '#/definitions/api.Error' + "409": + description: Conflict + schema: + $ref: '#/definitions/api.Error' security: - ApiKeyAuth: [] summary: Delete a process by its ID @@ -5209,6 +5245,10 @@ paths: description: OK schema: $ref: '#/definitions/api.Process' + "400": + description: Bad Request + schema: + $ref: '#/definitions/api.Error' "403": description: Forbidden schema: @@ -5217,6 +5257,10 @@ paths: description: Not Found schema: $ref: '#/definitions/api.Error' + "409": + description: Conflict + schema: + $ref: '#/definitions/api.Error' security: - ApiKeyAuth: [] summary: List a process by its ID @@ -5262,6 +5306,10 @@ paths: description: Not Found schema: $ref: '#/definitions/api.Error' + "409": + description: Conflict + schema: + $ref: '#/definitions/api.Error' security: - ApiKeyAuth: [] summary: Replace an existing process @@ -5308,6 +5356,10 @@ paths: description: Not Found schema: $ref: '#/definitions/api.Error' + "409": + description: Conflict + schema: + $ref: '#/definitions/api.Error' security: - ApiKeyAuth: [] summary: Issue a command to a process @@ -5347,6 +5399,10 @@ paths: description: Not Found schema: $ref: '#/definitions/api.Error' + "409": + description: Conflict + schema: + $ref: '#/definitions/api.Error' security: - ApiKeyAuth: [] summary: Get the configuration of a process @@ -5390,6 +5446,10 @@ paths: description: Not Found schema: $ref: '#/definitions/api.Error' + "409": + description: Conflict + schema: + $ref: '#/definitions/api.Error' security: - ApiKeyAuth: [] summary: Retrieve JSON metadata stored with a process under a key @@ -5439,6 +5499,10 @@ paths: description: Not Found schema: $ref: '#/definitions/api.Error' + "409": + description: Conflict + schema: + $ref: '#/definitions/api.Error' security: - ApiKeyAuth: [] summary: Add JSON metadata with a process under the given key @@ -5770,6 +5834,10 @@ paths: description: Not Found schema: $ref: '#/definitions/api.Error' + "409": + description: Conflict + schema: + $ref: '#/definitions/api.Error' security: - ApiKeyAuth: [] summary: Get the logs of a process @@ -5815,6 +5883,10 @@ paths: description: Not Found schema: $ref: '#/definitions/api.Error' + "409": + description: Conflict + schema: + $ref: '#/definitions/api.Error' security: - ApiKeyAuth: [] summary: Set the report history a process @@ -5853,6 +5925,10 @@ paths: description: Not Found schema: $ref: '#/definitions/api.Error' + "409": + description: Conflict + schema: + $ref: '#/definitions/api.Error' security: - ApiKeyAuth: [] summary: Get the state of a process diff --git a/http/client/client.go b/http/client/client.go index d877076b..a7dfcd65 100644 --- a/http/client/client.go +++ b/http/client/client.go @@ -844,7 +844,7 @@ func (r *restclient) request(req *http.Request) (int, io.ReadCloser, error) { func (r *restclient) stream(ctx context.Context, method, path string, query *url.Values, header http.Header, contentType string, data io.Reader) (io.ReadCloser, error) { if err := r.checkVersion(method, r.prefix+path); err != nil { - return nil, err + return nil, api.Err(http.StatusNotImplemented, "", "%s", err.Error()) } u := r.address + r.prefix + path @@ -854,7 +854,7 @@ func (r *restclient) stream(ctx context.Context, method, path string, query *url req, err := http.NewRequestWithContext(ctx, method, u, data) if err != nil { - return nil, err + return nil, api.Err(http.StatusInternalServerError, "", "create request: %s", err.Error()) } if header != nil { @@ -871,7 +871,7 @@ func (r *restclient) stream(ctx context.Context, method, path string, query *url if r.accessToken.IsExpired() { if err := r.refresh(); err != nil { if err := r.login(); err != nil { - return nil, err + return nil, api.Err(http.StatusUnauthorized, "", "%s", err.Error()) } } } @@ -881,7 +881,7 @@ func (r *restclient) stream(ctx context.Context, method, path string, query *url status, body, err := r.request(req) if err != nil { - return nil, err + return nil, api.Err(http.StatusInternalServerError, "", "request failed: %s", err.Error()) } if status < 200 || status >= 300 { @@ -921,12 +921,15 @@ func (r *restclient) call(method, path string, query *url.Values, header http.He body, err := r.stream(ctx, method, path, query, header, contentType, data) if err != nil { - return nil, fmt.Errorf("%s %s: %w", method, path, err) + return nil, err } defer body.Close() x, err := io.ReadAll(body) + if err != nil { + err = api.Err(http.StatusInternalServerError, "", "read body: %s", err.Error()) + } return x, err } diff --git a/http/handler/api/cluster_process.go b/http/handler/api/cluster_process.go index 11b3894d..f470d243 100644 --- a/http/handler/api/cluster_process.go +++ b/http/handler/api/cluster_process.go @@ -413,6 +413,9 @@ func (h *ClusterHandler) ProcessSetCommand(c echo.Context) error { } if err := h.cluster.ProcessSetCommand("", pid, command.Command); err != nil { + if cerr, ok := err.(api.Error); ok { + return api.Err(cerr.Code, "", "comm failed: %s", cerr.Error()) + } return api.Err(http.StatusNotFound, "", "command failed: %s", err.Error()) } diff --git a/http/handler/api/process.go b/http/handler/api/process.go index 70d381da..bb3af127 100644 --- a/http/handler/api/process.go +++ b/http/handler/api/process.go @@ -1,6 +1,7 @@ package api import ( + "errors" "fmt" "net/http" "sort" @@ -44,6 +45,8 @@ func NewProcess(restream restream.Restreamer, iam iam.IAM) *ProcessHandler { // @Success 200 {object} api.ProcessConfig // @Failure 400 {object} api.Error // @Failure 403 {object} api.Error +// @Failure 404 {object} api.Error +// @Failure 409 {object} api.Error // @Security ApiKeyAuth // @Router /api/v3/process [post] func (h *ProcessHandler) Add(c echo.Context) error { @@ -82,7 +85,7 @@ func (h *ProcessHandler) Add(c echo.Context) error { config, metadata := process.Marshal() if err := h.restream.AddProcess(config); err != nil { - return api.Err(http.StatusBadRequest, "", "invalid process config: %s", err.Error()) + return h.apiErrorFromError(err) } tid := app.ProcessID{ @@ -203,8 +206,10 @@ func (h *ProcessHandler) GetAll(c echo.Context) error { // @Param domain query string false "Domain to act on" // @Param filter query string false "Comma separated list of fields (config, state, report, metadata) to be part of the output. If empty, all fields will be part of the output" // @Success 200 {object} api.Process +// @Failure 400 {object} api.Error // @Failure 403 {object} api.Error // @Failure 404 {object} api.Error +// @Failure 409 {object} api.Error // @Security ApiKeyAuth // @Router /api/v3/process/{id} [get] func (h *ProcessHandler) Get(c echo.Context) error { @@ -224,7 +229,7 @@ func (h *ProcessHandler) Get(c echo.Context) error { p, err := h.getProcess(tid, newFilter(filter)) if err != nil { - return api.Err(http.StatusNotFound, "", "unknown process ID: %s", err.Error()) + return h.apiErrorFromError(err) } return c.JSON(http.StatusOK, p) @@ -239,8 +244,10 @@ func (h *ProcessHandler) Get(c echo.Context) error { // @Param id path string true "Process ID" // @Param domain query string false "Domain to act on" // @Success 200 {string} string +// @Failure 400 {object} api.Error // @Failure 403 {object} api.Error // @Failure 404 {object} api.Error +// @Failure 409 {object} api.Error // @Security ApiKeyAuth // @Router /api/v3/process/{id} [delete] func (h *ProcessHandler) Delete(c echo.Context) error { @@ -261,11 +268,11 @@ func (h *ProcessHandler) Delete(c echo.Context) error { } if err := h.restream.StopProcess(tid); err != nil { - return api.Err(http.StatusNotFound, "", "unknown process ID: %s", err.Error()) + return h.apiErrorFromError(err) } if err := h.restream.DeleteProcess(tid); err != nil { - return api.Err(http.StatusInternalServerError, "", "process can't be deleted: %s", err.Error()) + return h.apiErrorFromError(err) } return c.JSON(http.StatusOK, "OK") @@ -285,6 +292,7 @@ func (h *ProcessHandler) Delete(c echo.Context) error { // @Failure 400 {object} api.Error // @Failure 403 {object} api.Error // @Failure 404 {object} api.Error +// @Failure 409 {object} api.Error // @Security ApiKeyAuth // @Router /api/v3/process/{id} [put] func (h *ProcessHandler) Update(c echo.Context) error { @@ -334,11 +342,7 @@ func (h *ProcessHandler) Update(c echo.Context) error { config, metadata := process.Marshal() if err := h.restream.UpdateProcess(tid, config); err != nil { - if err == restream.ErrUnknownProcess { - return api.Err(http.StatusNotFound, "", "process not found: %s", id) - } - - return api.Err(http.StatusBadRequest, "", "process can't be updated: %s", err.Error()) + return h.apiErrorFromError(err) } tid = app.ProcessID{ @@ -367,6 +371,7 @@ func (h *ProcessHandler) Update(c echo.Context) error { // @Failure 400 {object} api.Error // @Failure 403 {object} api.Error // @Failure 404 {object} api.Error +// @Failure 409 {object} api.Error // @Security ApiKeyAuth // @Router /api/v3/process/{id}/command [put] func (h *ProcessHandler) Command(c echo.Context) error { @@ -403,7 +408,7 @@ func (h *ProcessHandler) Command(c echo.Context) error { } if err != nil { - return api.Err(http.StatusBadRequest, "", "command failed: %s", err.Error()) + return h.apiErrorFromError(err) } return c.JSON(http.StatusOK, "OK") @@ -421,6 +426,7 @@ func (h *ProcessHandler) Command(c echo.Context) error { // @Failure 400 {object} api.Error // @Failure 403 {object} api.Error // @Failure 404 {object} api.Error +// @Failure 409 {object} api.Error // @Security ApiKeyAuth // @Router /api/v3/process/{id}/config [get] func (h *ProcessHandler) GetConfig(c echo.Context) error { @@ -439,7 +445,7 @@ func (h *ProcessHandler) GetConfig(c echo.Context) error { p, err := h.restream.GetProcess(tid) if err != nil { - return api.Err(http.StatusNotFound, "", "unknown process ID: %s", err.Error()) + return h.apiErrorFromError(err) } config := api.ProcessConfig{} @@ -460,6 +466,7 @@ func (h *ProcessHandler) GetConfig(c echo.Context) error { // @Failure 400 {object} api.Error // @Failure 403 {object} api.Error // @Failure 404 {object} api.Error +// @Failure 409 {object} api.Error // @Security ApiKeyAuth // @Router /api/v3/process/{id}/state [get] func (h *ProcessHandler) GetState(c echo.Context) error { @@ -478,7 +485,7 @@ func (h *ProcessHandler) GetState(c echo.Context) error { s, err := h.restream.GetProcessState(tid) if err != nil { - return api.Err(http.StatusNotFound, "", "unknown process ID: %s", err.Error()) + return h.apiErrorFromError(err) } state := api.ProcessState{} @@ -501,6 +508,7 @@ func (h *ProcessHandler) GetState(c echo.Context) error { // @Failure 400 {object} api.Error // @Failure 403 {object} api.Error // @Failure 404 {object} api.Error +// @Failure 409 {object} api.Error // @Security ApiKeyAuth // @Router /api/v3/process/{id}/report [get] func (h *ProcessHandler) GetReport(c echo.Context) error { @@ -540,7 +548,7 @@ func (h *ProcessHandler) GetReport(c echo.Context) error { l, err := h.restream.GetProcessReport(tid) if err != nil { - return api.Err(http.StatusNotFound, "", "unknown process ID: %s", err.Error()) + return h.apiErrorFromError(err) } report := api.ProcessReport{} @@ -615,6 +623,7 @@ func (h *ProcessHandler) GetReport(c echo.Context) error { // @Failure 400 {object} api.Error // @Failure 403 {object} api.Error // @Failure 404 {object} api.Error +// @Failure 409 {object} api.Error // @Security ApiKeyAuth // @Router /api/v3/process/{id}/report [put] func (h *ProcessHandler) SetReport(c echo.Context) error { @@ -642,7 +651,7 @@ func (h *ProcessHandler) SetReport(c echo.Context) error { appreport := report.Marshal() if err := h.restream.SetProcessReport(tid, &appreport); err != nil { - return api.Err(http.StatusNotFound, "", "unknown process ID: %s", err.Error()) + return h.apiErrorFromError(err) } return c.JSON(http.StatusOK, "OK") @@ -841,6 +850,7 @@ func (h *ProcessHandler) ReloadSkills(c echo.Context) error { // @Failure 400 {object} api.Error // @Failure 403 {object} api.Error // @Failure 404 {object} api.Error +// @Failure 409 {object} api.Error // @Security ApiKeyAuth // @Router /api/v3/process/{id}/metadata/{key} [get] func (h *ProcessHandler) GetProcessMetadata(c echo.Context) error { @@ -860,11 +870,7 @@ func (h *ProcessHandler) GetProcessMetadata(c echo.Context) error { data, err := h.restream.GetProcessMetadata(tid, key) if err != nil { - if err == restream.ErrMetadataKeyNotFound { - return api.Err(http.StatusNotFound, "", "unknown key: %s", err.Error()) - } - - return api.Err(http.StatusNotFound, "", "unknown process ID: %s", err.Error()) + return h.apiErrorFromError(err) } return c.JSON(http.StatusOK, data) @@ -884,6 +890,7 @@ func (h *ProcessHandler) GetProcessMetadata(c echo.Context) error { // @Failure 400 {object} api.Error // @Failure 403 {object} api.Error // @Failure 404 {object} api.Error +// @Failure 409 {object} api.Error // @Security ApiKeyAuth // @Router /api/v3/process/{id}/metadata/{key} [put] func (h *ProcessHandler) SetProcessMetadata(c echo.Context) error { @@ -912,7 +919,7 @@ func (h *ProcessHandler) SetProcessMetadata(c echo.Context) error { } if err := h.restream.SetProcessMetadata(tid, key, data); err != nil { - return api.Err(http.StatusNotFound, "", "unknown process ID: %s", err.Error()) + return h.apiErrorFromError(err) } return c.JSON(http.StatusOK, data) @@ -926,8 +933,10 @@ func (h *ProcessHandler) SetProcessMetadata(c echo.Context) error { // @Produce json // @Param key path string true "Key for data store" // @Success 200 {object} api.Metadata -// @Failure 404 {object} api.Error // @Failure 400 {object} api.Error +// @Failure 403 {object} api.Error +// @Failure 404 {object} api.Error +// @Failure 409 {object} api.Error // @Security ApiKeyAuth // @Router /api/v3/metadata/{key} [get] func (h *ProcessHandler) GetMetadata(c echo.Context) error { @@ -935,7 +944,7 @@ func (h *ProcessHandler) GetMetadata(c echo.Context) error { data, err := h.restream.GetMetadata(key) if err != nil { - return api.Err(http.StatusNotFound, "", "metadata not found: %s", err.Error()) + return h.apiErrorFromError(err) } return c.JSON(http.StatusOK, data) @@ -951,6 +960,9 @@ func (h *ProcessHandler) GetMetadata(c echo.Context) error { // @Param data body api.Metadata true "Arbitrary JSON data" // @Success 200 {object} api.Metadata // @Failure 400 {object} api.Error +// @Failure 403 {object} api.Error +// @Failure 404 {object} api.Error +// @Failure 409 {object} api.Error // @Security ApiKeyAuth // @Router /api/v3/metadata/{key} [put] func (h *ProcessHandler) SetMetadata(c echo.Context) error { @@ -967,7 +979,7 @@ func (h *ProcessHandler) SetMetadata(c echo.Context) error { } if err := h.restream.SetMetadata(key, data); err != nil { - return api.Err(http.StatusBadRequest, "", "invalid metadata: %s", err.Error()) + return h.apiErrorFromError(err) } return c.JSON(http.StatusOK, data) @@ -1083,3 +1095,19 @@ func (h *ProcessHandler) getProcess(id app.ProcessID, filter filter) (api.Proces return info, nil } + +func (h *ProcessHandler) apiErrorFromError(err error) error { + if errors.Is(err, restream.ErrUnknownProcess) { + return api.Err(http.StatusNotFound, "", "%s", err.Error()) + } else if errors.Is(err, restream.ErrProcessExists) { + return api.Err(http.StatusConflict, "", "%s", err.Error()) + } else if errors.Is(err, restream.ErrInvalidProcessConfig) { + return api.Err(http.StatusBadRequest, "", "%s", err.Error()) + } else if errors.Is(err, restream.ErrMetadataKeyNotFound) { + return api.Err(http.StatusNotFound, "", "%s", err.Error()) + } else if errors.Is(err, restream.ErrMetadataKeyRequired) { + return api.Err(http.StatusBadRequest, "", "%s", err.Error()) + } + + return api.Err(http.StatusBadRequest, "", "%s", err.Error()) +} diff --git a/http/handler/api/process_test.go b/http/handler/api/process_test.go index ff607405..a70238e7 100644 --- a/http/handler/api/process_test.go +++ b/http/handler/api/process_test.go @@ -642,7 +642,7 @@ func TestProcessCommandNotFound(t *testing.T) { require.NoError(t, err) command := mock.Read(t, "./fixtures/commandStart.json") - mock.Request(t, http.StatusBadRequest, router, "PUT", "/test/command", command) + mock.Request(t, http.StatusNotFound, router, "PUT", "/test/command", command) } func TestProcessCommandInvalid(t *testing.T) { diff --git a/restream/core.go b/restream/core.go index 12fd9799..e4105ca3 100644 --- a/restream/core.go +++ b/restream/core.go @@ -2,7 +2,6 @@ package restream import ( "context" - "errors" "fmt" "path/filepath" "regexp" @@ -527,11 +526,6 @@ func (r *restream) CreatedAt() time.Time { return r.createdAt } -var ErrUnknownProcess = errors.New("unknown process") -var ErrUnknownProcessGroup = errors.New("unknown process group") -var ErrProcessExists = errors.New("process already exists") -var ErrForbidden = errors.New("forbidden") - func (r *restream) AddProcess(config *app.Config) error { t, err := r.createTask(config) if err != nil { @@ -566,7 +560,7 @@ func (r *restream) createTask(config *app.Config) (*task, error) { id := strings.TrimSpace(config.ID) if len(id) == 0 { - return nil, fmt.Errorf("an empty ID is not allowed") + return nil, fmt.Errorf("an empty ID is not allowed: %w", ErrInvalidProcessConfig) } config.FFVersion = "^" + r.ffmpeg.Skills().FFmpeg.Version @@ -821,7 +815,7 @@ func (r *restream) unsetPlayoutPorts(t *task) { // otherwise nil and whether there is a disk filesystem involved. func validateConfig(config *app.Config, fss []rfs.Filesystem, ffmpeg ffmpeg.FFmpeg) (bool, error) { if len(config.Input) == 0 { - return false, fmt.Errorf("at least one input must be defined for the process '%s'", config.ID) + return false, fmt.Errorf("at least one input must be defined for the process '%s': %w", config.ID, ErrInvalidProcessConfig) } var err error @@ -832,11 +826,11 @@ func validateConfig(config *app.Config, fss []rfs.Filesystem, ffmpeg ffmpeg.FFmp io.ID = strings.TrimSpace(io.ID) if len(io.ID) == 0 { - return false, fmt.Errorf("empty input IDs are not allowed (process '%s')", config.ID) + return false, fmt.Errorf("empty input IDs are not allowed (process '%s': %w)", config.ID, ErrInvalidProcessConfig) } if _, found := ids[io.ID]; found { - return false, fmt.Errorf("the input ID '%s' is already in use for the process `%s`", io.ID, config.ID) + return false, fmt.Errorf("the input ID '%s' is already in use for the process '%s': %w", io.ID, config.ID, ErrInvalidProcessConfig) } ids[io.ID] = true @@ -844,7 +838,7 @@ func validateConfig(config *app.Config, fss []rfs.Filesystem, ffmpeg ffmpeg.FFmp io.Address = strings.TrimSpace(io.Address) if len(io.Address) == 0 { - return false, fmt.Errorf("the address for input '#%s:%s' must not be empty", config.ID, io.ID) + return false, fmt.Errorf("the address for input '#%s:%s' must not be empty: %w", config.ID, io.ID, ErrInvalidProcessConfig) } maxFails := 0 @@ -866,7 +860,7 @@ func validateConfig(config *app.Config, fss []rfs.Filesystem, ffmpeg ffmpeg.FFmp } if len(config.Output) == 0 { - return false, fmt.Errorf("at least one output must be defined for the process '#%s'", config.ID) + return false, fmt.Errorf("at least one output must be defined for the process '#%s': %w", config.ID, ErrInvalidProcessConfig) } ids = map[string]bool{} @@ -876,11 +870,11 @@ func validateConfig(config *app.Config, fss []rfs.Filesystem, ffmpeg ffmpeg.FFmp io.ID = strings.TrimSpace(io.ID) if len(io.ID) == 0 { - return false, fmt.Errorf("empty output IDs are not allowed (process '%s')", config.ID) + return false, fmt.Errorf("empty output IDs are not allowed (process '%s'): %w", config.ID, ErrInvalidProcessConfig) } if _, found := ids[io.ID]; found { - return false, fmt.Errorf("the output ID '%s' is already in use for the process `%s`", io.ID, config.ID) + return false, fmt.Errorf("the output ID '%s' is already in use for the process '%s': %w", io.ID, config.ID, ErrInvalidProcessConfig) } ids[io.ID] = true @@ -888,7 +882,7 @@ func validateConfig(config *app.Config, fss []rfs.Filesystem, ffmpeg ffmpeg.FFmp io.Address = strings.TrimSpace(io.Address) if len(io.Address) == 0 { - return false, fmt.Errorf("the address for output '#%s:%s' must not be empty", config.ID, io.ID) + return false, fmt.Errorf("the address for output '#%s:%s' must not be empty: %w", config.ID, io.ID, ErrInvalidProcessConfig) } maxFails := 0 @@ -932,7 +926,7 @@ func validateInputAddress(address, _ string, ffmpeg ffmpeg.FFmpeg) (string, erro } if !ffmpeg.ValidateInputAddress(address) { - return address, fmt.Errorf("address is not allowed") + return address, fmt.Errorf("address is not allowed: %w", ErrInvalidProcessConfig) } return address, nil @@ -976,7 +970,7 @@ func validateOutputAddress(address, basedir string, ffmpeg ffmpeg.FFmpeg) (strin } if !ffmpeg.ValidateOutputAddress(address) { - return address, false, fmt.Errorf("address is not allowed") + return address, false, fmt.Errorf("address is not allowed: %w", ErrInvalidProcessConfig) } return address, false, nil @@ -994,18 +988,18 @@ func validateOutputAddress(address, basedir string, ffmpeg ffmpeg.FFmpeg) (strin if strings.HasPrefix(address, "/dev/") { if !ffmpeg.ValidateOutputAddress("file:" + address) { - return address, false, fmt.Errorf("address is not allowed") + return address, false, fmt.Errorf("address is not allowed: %w", ErrInvalidProcessConfig) } return "file:" + address, false, nil } if !strings.HasPrefix(address, basedir) { - return address, false, fmt.Errorf("%s is not inside of %s", address, basedir) + return address, false, fmt.Errorf("%s is not inside of %s: %w", address, basedir, ErrInvalidProcessConfig) } if !ffmpeg.ValidateOutputAddress("file:" + address) { - return address, false, fmt.Errorf("address is not allowed") + return address, false, fmt.Errorf("address is not allowed: %w", ErrInvalidProcessConfig) } return "file:" + address, true, nil @@ -1041,7 +1035,7 @@ func (r *restream) resolveAddress(tasks *Storage, id, address string) (string, e } if matches["id"] == id { - return address, fmt.Errorf("self-reference is not allowed (%s)", address) + return address, fmt.Errorf("self-reference is not allowed (%s): %w", address, ErrInvalidProcessConfig) } var t *task = nil @@ -1059,7 +1053,7 @@ func (r *restream) resolveAddress(tasks *Storage, id, address string) (string, e }) if t == nil { - return address, fmt.Errorf("unknown process '%s' in domain '%s' (%s)", matches["id"], matches["domain"], address) + return address, fmt.Errorf("unknown process '%s' in domain '%s' (%s): %w", matches["id"], matches["domain"], address, ErrInvalidProcessConfig) } defer t.Release(ttoken) @@ -1118,12 +1112,12 @@ func (r *restream) resolveAddress(tasks *Storage, id, address string) (string, e return r.rewrite.RewriteAddress(addresses[0], t.config.Owner, rewrite.READ), nil } - return address, fmt.Errorf("the process '%s' in group '%s' has no outputs with the ID '%s' (%s)", matches["id"], matches["group"], matches["output"], address) + return address, fmt.Errorf("the process '%s' in group '%s' has no outputs with the ID '%s' (%s): %w", matches["id"], matches["group"], matches["output"], address, ErrInvalidProcessConfig) } func parseAddressReference(address string) (map[string]string, error) { if len(address) == 0 { - return nil, fmt.Errorf("empty address") + return nil, fmt.Errorf("empty address: %w", ErrInvalidProcessConfig) } if address[0] != '#' { @@ -1161,7 +1155,7 @@ func parseAddressReference(address string) (map[string]string, error) { } if idEnd < 0 { - return nil, fmt.Errorf("invalid format (%s)", address) + return nil, fmt.Errorf("invalid format (%s): %w", address, ErrInvalidProcessConfig) } results["id"] = address[1:idEnd] @@ -1629,7 +1623,7 @@ func (r *restream) GetPlayout(id app.ProcessID, inputid string) (string, error) defer task.Release(token) if !task.IsValid() { - return "", fmt.Errorf("invalid process definition") + return "", ErrInvalidProcessConfig } port, ok := task.playout[inputid] @@ -1674,7 +1668,7 @@ func (r *restream) SetMetadata(key string, data interface{}) error { defer r.lock.Unlock() if len(key) == 0 { - return fmt.Errorf("a key for storing the data has to be provided") + return ErrMetadataKeyRequired } if r.metadata == nil { diff --git a/restream/errors.go b/restream/errors.go new file mode 100644 index 00000000..d43d32b4 --- /dev/null +++ b/restream/errors.go @@ -0,0 +1,9 @@ +package restream + +import "errors" + +var ErrUnknownProcess = errors.New("unknown process") +var ErrProcessExists = errors.New("process already exists") +var ErrInvalidProcessConfig = errors.New("invalid process config") +var ErrMetadataKeyNotFound = errors.New("unknown metadata key") +var ErrMetadataKeyRequired = errors.New("a key for storing metadata is required") diff --git a/restream/task.go b/restream/task.go index f0ae7a78..cda0da05 100644 --- a/restream/task.go +++ b/restream/task.go @@ -1,7 +1,6 @@ package restream import ( - "errors" "maps" "time" @@ -15,10 +14,6 @@ import ( "github.com/puzpuzpuz/xsync/v3" ) -var ErrInvalidProcessConfig = errors.New("invalid process config") -var ErrMetadataKeyNotFound = errors.New("unknown metadata key") -var ErrMetadataKeyRequired = errors.New("a key for storing metadata is required") - type task struct { valid bool id string // ID of the task/process