WIP: add verfiers, fix bugs, implement middleware, support rtmp

This commit is contained in:
Ingo Oppermann 2023-02-09 21:33:45 +01:00
parent 11e55fc2c7
commit 312f65d110
No known key found for this signature in database
GPG Key ID: 2AB32426E9DD229E
11 changed files with 957 additions and 353 deletions

View File

@ -23,7 +23,6 @@ import (
"github.com/datarhei/core/v16/http"
"github.com/datarhei/core/v16/http/cache"
httpfs "github.com/datarhei/core/v16/http/fs"
"github.com/datarhei/core/v16/http/jwt"
"github.com/datarhei/core/v16/http/router"
"github.com/datarhei/core/v16/iam"
"github.com/datarhei/core/v16/io/fs"
@ -81,7 +80,6 @@ type api struct {
cache cache.Cacher
mainserver *gohttp.Server
sidecarserver *gohttp.Server
httpjwt jwt.JWT
update update.Checker
replacer replace.Replacer
iam iam.IAM
@ -395,12 +393,6 @@ func (a *api) start() error {
},
Auth0: iam.UserAuthAPIAuth0{
Enable: cfg.API.Auth.Auth0.Enable,
User: cfg.API.Auth.Auth0.Tenants[0].Users[0],
Tenant: iam.Auth0Tenant{
Domain: cfg.API.Auth.Auth0.Tenants[0].Domain,
Audience: cfg.API.Auth.Auth0.Tenants[0].Audience,
ClientID: cfg.API.Auth.Auth0.Tenants[0].ClientID,
},
},
},
Services: iam.UserAuthServices{
@ -413,6 +405,15 @@ func (a *api) start() error {
},
}
if cfg.API.Auth.Auth0.Enable {
superuser.Auth.API.Auth0.User = cfg.API.Auth.Auth0.Tenants[0].Users[0]
superuser.Auth.API.Auth0.Tenant = iam.Auth0Tenant{
Domain: cfg.API.Auth.Auth0.Tenants[0].Domain,
Audience: cfg.API.Auth.Auth0.Tenants[0].Audience,
ClientID: cfg.API.Auth.Auth0.Tenants[0].ClientID,
}
}
fs, err := fs.NewRootedDiskFilesystem(fs.RootedDiskConfig{
Root: filepath.Join(cfg.DB.Dir, "iam"),
})
@ -420,7 +421,18 @@ func (a *api) start() error {
return err
}
iam, err := iam.NewIAM(fs, superuser)
secret := rand.String(32)
if len(cfg.API.Auth.JWT.Secret) != 0 {
secret = cfg.API.Auth.Username + cfg.API.Auth.Password + cfg.API.Auth.JWT.Secret
}
fmt.Printf("superuser: %+v\n", superuser)
iam, err := iam.NewIAM(iam.Config{
FS: fs,
Superuser: superuser,
JWTSecret: secret,
})
if err != nil {
return fmt.Errorf("iam: %w", err)
}
@ -657,49 +669,49 @@ func (a *api) start() error {
}
a.restream = restream
/*
var httpjwt jwt.JWT
var httpjwt jwt.JWT
if cfg.API.Auth.Enable {
secret := rand.String(32)
if len(cfg.API.Auth.JWT.Secret) != 0 {
secret = cfg.API.Auth.Username + cfg.API.Auth.Password + cfg.API.Auth.JWT.Secret
}
var err error
httpjwt, err = jwt.New(jwt.Config{
Realm: app.Name,
Secret: secret,
SkipLocalhost: cfg.API.Auth.DisableLocalhost,
})
if err != nil {
return fmt.Errorf("unable to create JWT provider: %w", err)
}
if validator, err := jwt.NewLocalValidator(a.iam); err == nil {
if err := httpjwt.AddValidator(app.Name, validator); err != nil {
return fmt.Errorf("unable to add local JWT validator: %w", err)
if cfg.API.Auth.Enable {
secret := rand.String(32)
if len(cfg.API.Auth.JWT.Secret) != 0 {
secret = cfg.API.Auth.Username + cfg.API.Auth.Password + cfg.API.Auth.JWT.Secret
}
} else {
return fmt.Errorf("unable to create local JWT validator: %w", err)
}
if cfg.API.Auth.Auth0.Enable {
for _, t := range cfg.API.Auth.Auth0.Tenants {
if validator, err := jwt.NewAuth0Validator(a.iam); err == nil {
if err := httpjwt.AddValidator("https://"+t.Domain+"/", validator); err != nil {
return fmt.Errorf("unable to add Auth0 JWT validator: %w", err)
var err error
httpjwt, err = jwt.New(jwt.Config{
Realm: app.Name,
Secret: secret,
SkipLocalhost: cfg.API.Auth.DisableLocalhost,
})
if err != nil {
return fmt.Errorf("unable to create JWT provider: %w", err)
}
if validator, err := jwt.NewLocalValidator(a.iam); err == nil {
if err := httpjwt.AddValidator(app.Name, validator); err != nil {
return fmt.Errorf("unable to add local JWT validator: %w", err)
}
} else {
return fmt.Errorf("unable to create local JWT validator: %w", err)
}
if cfg.API.Auth.Auth0.Enable {
for _, t := range cfg.API.Auth.Auth0.Tenants {
if validator, err := jwt.NewAuth0Validator(a.iam); err == nil {
if err := httpjwt.AddValidator("https://"+t.Domain+"/", validator); err != nil {
return fmt.Errorf("unable to add Auth0 JWT validator: %w", err)
}
} else {
return fmt.Errorf("unable to create Auth0 JWT validator: %w", err)
}
} else {
return fmt.Errorf("unable to create Auth0 JWT validator: %w", err)
}
}
}
}
a.httpjwt = httpjwt
a.httpjwt = httpjwt
*/
metrics, err := monitor.NewHistory(monitor.HistoryConfig{
Enable: cfg.Metrics.Enable,
Timerange: time.Duration(cfg.Metrics.Range) * time.Second,
@ -1056,7 +1068,6 @@ func (a *api) start() error {
},
RTMP: a.rtmpserver,
SRT: a.srtserver,
JWT: a.httpjwt,
Config: a.config.store,
Sessions: a.sessions,
Router: router,
@ -1345,11 +1356,6 @@ func (a *api) stop() {
a.iam.Close()
}
// Stop JWT authentication
if a.httpjwt != nil {
a.httpjwt.ClearValidators()
}
if a.update != nil {
a.update.Stop()
a.update = nil

View File

@ -35,6 +35,18 @@ func NewAbout(restream restream.Restreamer, auths []string) *AboutHandler {
// @Security ApiKeyAuth
// @Router /api [get]
func (p *AboutHandler) About(c echo.Context) error {
user, _ := c.Get("user").(string)
if user == "$anon" {
return c.JSON(http.StatusOK, api.MinimalAbout{
App: app.Name,
Auths: p.auths,
Version: api.VersionMinimal{
Number: app.Version.MajorString(),
},
})
}
createdAt := p.restream.CreatedAt()
about := api.About{

53
http/handler/api/jwt.go Normal file
View File

@ -0,0 +1,53 @@
package api
import (
"net/http"
"github.com/datarhei/core/v16/http/api"
"github.com/datarhei/core/v16/iam"
"github.com/labstack/echo/v4"
)
type JWTHandler struct {
iam iam.IAM
}
func NewJWT(iam iam.IAM) *JWTHandler {
return &JWTHandler{
iam: iam,
}
}
func (j *JWTHandler) Login(c echo.Context) error {
subject, ok := c.Get("user").(string)
if !ok {
return api.Err(http.StatusForbidden, "Invalid token")
}
at, rt, err := j.iam.CreateJWT(subject)
if err != nil {
return api.Err(http.StatusInternalServerError, "Failed to create JWT", "%s", err)
}
return c.JSON(http.StatusOK, api.JWT{
AccessToken: at,
RefreshToken: rt,
})
}
func (j *JWTHandler) Refresh(c echo.Context) error {
subject, ok := c.Get("user").(string)
if !ok {
return api.Err(http.StatusForbidden, "Invalid token")
}
at, _, err := j.iam.CreateJWT(subject)
if err != nil {
return api.Err(http.StatusInternalServerError, "Failed to create JWT", "%s", err)
}
return c.JSON(http.StatusOK, api.JWTRefresh{
AccessToken: at,
})
}

View File

@ -4,11 +4,13 @@ import (
"errors"
"fmt"
"net/http"
"sync"
"strings"
"time"
"github.com/datarhei/core/v16/app"
"github.com/datarhei/core/v16/http/api"
"github.com/datarhei/core/v16/http/handler/util"
"github.com/datarhei/core/v16/iam"
jwtgo "github.com/golang-jwt/jwt/v4"
"github.com/google/uuid"
@ -21,19 +23,13 @@ type Config struct {
Realm string
Secret string
SkipLocalhost bool
IAM iam.IAM
}
// JWT provides access to a JWT provider
type JWT interface {
AddValidator(iss string, issuer Validator) error
ClearValidators()
Validators() []string
// Middleware returns an echo middleware
AccessMiddleware() echo.MiddlewareFunc
RefreshMiddleware() echo.MiddlewareFunc
Middleware() echo.MiddlewareFunc
// LoginHandler is an echo route handler for retrieving a JWT
LoginHandler(c echo.Context) error
@ -43,19 +39,14 @@ type JWT interface {
}
type jwt struct {
realm string
skipLocalhost bool
secret []byte
accessValidFor time.Duration
accessConfig middleware.JWTConfig
accessMiddleware echo.MiddlewareFunc
refreshValidFor time.Duration
refreshConfig middleware.JWTConfig
refreshMiddleware echo.MiddlewareFunc
// Validators is a map of all recognized issuers to their specific validators. The key is the value of
// the "iss" field in the claims. Somewhat required because otherwise the token cannot be verified.
validators map[string]Validator
lock sync.RWMutex
realm string
skipLocalhost bool
secret []byte
accessValidFor time.Duration
refreshValidFor time.Duration
config middleware.JWTConfig
middleware echo.MiddlewareFunc
iam iam.IAM
}
// New returns a new JWT provider
@ -84,7 +75,7 @@ func New(config Config) (JWT, error) {
return false
}
j.accessConfig = middleware.JWTConfig{
j.config = middleware.JWTConfig{
Skipper: skipperFunc,
SigningMethod: middleware.AlgorithmHS256,
ContextKey: "user",
@ -103,104 +94,53 @@ func New(config Config) (JWT, error) {
}
c.Set("user", subject)
},
ParseTokenFunc: j.parseToken("access"),
}
j.refreshConfig = middleware.JWTConfig{
Skipper: skipperFunc,
SigningMethod: middleware.AlgorithmHS256,
ContextKey: "user",
TokenLookup: "header:" + echo.HeaderAuthorization,
AuthScheme: "Bearer",
Claims: jwtgo.MapClaims{},
ErrorHandlerWithContext: j.ErrorHandler,
ParseTokenFunc: j.parseToken("refresh"),
var usefor string
if claims, ok := token.Claims.(jwtgo.MapClaims); ok {
if sub, ok := claims["usefor"]; ok {
usefor = sub.(string)
}
}
c.Set("usefor", usefor)
},
ParseTokenFunc: j.parseToken,
}
return j, nil
}
func (j *jwt) parseToken(use string) func(auth string, c echo.Context) (interface{}, error) {
func (j *jwt) parseToken(auth string, c echo.Context) (interface{}, error) {
keyFunc := func(*jwtgo.Token) (interface{}, error) { return j.secret, nil }
return func(auth string, c echo.Context) (interface{}, error) {
var token *jwtgo.Token
var err error
var token *jwtgo.Token
var err error
token, err = jwtgo.Parse(auth, keyFunc)
if err != nil {
return nil, err
}
if !token.Valid {
return nil, errors.New("invalid token")
}
if _, ok := token.Claims.(jwtgo.MapClaims)["usefor"]; !ok {
return nil, fmt.Errorf("usefor claim is required")
}
claimuse := token.Claims.(jwtgo.MapClaims)["usefor"].(string)
if claimuse != use {
return nil, fmt.Errorf("invalid token claim")
}
return token, nil
}
}
func (j *jwt) Validators() []string {
j.lock.RLock()
defer j.lock.RUnlock()
values := []string{}
for _, v := range j.validators {
values = append(values, v.String())
token, err = jwtgo.Parse(auth, keyFunc)
if err != nil {
return nil, err
}
return values
}
func (j *jwt) AddValidator(iss string, issuer Validator) error {
j.lock.Lock()
defer j.lock.Unlock()
if j.validators == nil {
j.validators = make(map[string]Validator)
if !token.Valid {
return nil, errors.New("invalid token")
}
if _, ok := j.validators[iss]; ok {
return fmt.Errorf("a validator for %s is already registered", iss)
if _, ok := token.Claims.(jwtgo.MapClaims)["sub"]; !ok {
return nil, fmt.Errorf("sub claim is required")
}
j.validators[iss] = issuer
return nil
}
func (j *jwt) ClearValidators() {
j.lock.Lock()
defer j.lock.Unlock()
if j.validators == nil {
return
if _, ok := token.Claims.(jwtgo.MapClaims)["usefor"]; !ok {
return nil, fmt.Errorf("usefor claim is required")
}
for _, v := range j.validators {
v.Cancel()
}
j.validators = nil
return token, nil
}
func (j *jwt) ErrorHandler(err error, c echo.Context) error {
if c.Request().URL.Path == "/api" {
return c.JSON(http.StatusOK, api.MinimalAbout{
App: app.Name,
Auths: j.Validators(),
Auths: []string{},
Version: api.VersionMinimal{
Number: app.Version.MajorString(),
},
@ -210,20 +150,12 @@ func (j *jwt) ErrorHandler(err error, c echo.Context) error {
return api.Err(http.StatusUnauthorized, "Missing or invalid JWT token")
}
func (j *jwt) AccessMiddleware() echo.MiddlewareFunc {
if j.accessMiddleware == nil {
j.accessMiddleware = middleware.JWTWithConfig(j.accessConfig)
func (j *jwt) Middleware() echo.MiddlewareFunc {
if j.middleware == nil {
j.middleware = middleware.JWTWithConfig(j.config)
}
return j.accessMiddleware
}
func (j *jwt) RefreshMiddleware() echo.MiddlewareFunc {
if j.refreshMiddleware == nil {
j.refreshMiddleware = middleware.JWTWithConfig(j.refreshConfig)
}
return j.refreshMiddleware
return j.middleware
}
// LoginHandler returns an access token and a refresh token
@ -239,18 +171,7 @@ func (j *jwt) RefreshMiddleware() echo.MiddlewareFunc {
// @Security Auth0KeyAuth
// @Router /api/login [post]
func (j *jwt) LoginHandler(c echo.Context) error {
var ok bool
var subject string
var err error
j.lock.RLock()
for _, validator := range j.validators {
ok, subject, err = validator.Validate(c)
if ok {
break
}
}
j.lock.RUnlock()
ok, subject, err := j.validateLogin(c)
if ok {
if err != nil {
@ -273,6 +194,79 @@ func (j *jwt) LoginHandler(c echo.Context) error {
})
}
func (j *jwt) validateLogin(c echo.Context) (bool, string, error) {
ok, subject, err := j.validateUserpassLogin(c)
if ok {
return ok, subject, err
}
return j.validateAuth0Login(c)
}
func (j *jwt) validateUserpassLogin(c echo.Context) (bool, string, error) {
var login api.Login
if err := util.ShouldBindJSON(c, &login); err != nil {
return false, "", nil
}
identity, err := j.iam.GetIdentity(login.Username)
if err != nil {
return true, "", fmt.Errorf("invalid username or password")
}
if !identity.VerifyAPIPassword(login.Password) {
return true, "", fmt.Errorf("invalid username or password")
}
return true, identity.Name(), nil
}
func (j *jwt) validateAuth0Login(c echo.Context) (bool, string, error) {
// Look for an Auth header
values := c.Request().Header.Values("Authorization")
prefix := "Bearer "
auth := ""
for _, value := range values {
if !strings.HasPrefix(value, prefix) {
continue
}
auth = value[len(prefix):]
break
}
if len(auth) == 0 {
return false, "", nil
}
p := &jwtgo.Parser{}
token, _, err := p.ParseUnverified(auth, jwtgo.MapClaims{})
if err != nil {
return false, "", nil
}
var subject string
if claims, ok := token.Claims.(jwtgo.MapClaims); ok {
if sub, ok := claims["sub"]; ok {
subject = sub.(string)
}
}
identity, err := j.iam.GetIdentityByAuth0(subject)
if err != nil {
return true, "", fmt.Errorf("invalid token")
}
if !identity.VerifyAPIAuth0(auth) {
return true, "", fmt.Errorf("invalid token")
}
return true, identity.Name(), nil
}
// RefreshHandler returns a new refresh token
// @Summary Retrieve a new access token
// @Description Retrieve a new access token by providing the refresh token
@ -283,12 +277,19 @@ func (j *jwt) LoginHandler(c echo.Context) error {
// @Security ApiRefreshKeyAuth
// @Router /api/login/refresh [get]
func (j *jwt) RefreshHandler(c echo.Context) error {
token, ok := c.Get("user").(*jwtgo.Token)
subject, ok := c.Get("user").(string)
if !ok {
return api.Err(http.StatusForbidden, "Invalid token")
}
subject := token.Claims.(jwtgo.MapClaims)["sub"].(string)
usefor, ok := c.Get("usefor").(string)
if !ok {
return api.Err(http.StatusForbidden, "Invalid token")
}
if usefor != "refresh" {
return api.Err(http.StatusForbidden, "Invalid token")
}
at, _, err := j.createToken(subject)
if err != nil {

View File

@ -1,10 +1,20 @@
package iam
import (
"encoding/base64"
"fmt"
"net/http"
"path/filepath"
"sort"
"strings"
"time"
"github.com/datarhei/core/v16/http/api"
"github.com/datarhei/core/v16/http/handler/util"
"github.com/datarhei/core/v16/iam"
"github.com/datarhei/core/v16/log"
jwtgo "github.com/golang-jwt/jwt/v4"
"github.com/labstack/echo/v4"
"github.com/labstack/echo/v4/middleware"
)
@ -13,11 +23,19 @@ type Config struct {
// Skipper defines a function to skip middleware.
Skipper middleware.Skipper
IAM iam.IAM
Logger log.Logger
}
var DefaultConfig = Config{
Skipper: middleware.DefaultSkipper,
IAM: nil,
Logger: nil,
}
type iammiddleware struct {
iam iam.IAM
mounts []string
logger log.Logger
}
func New() echo.MiddlewareFunc {
@ -29,32 +47,340 @@ func NewWithConfig(config Config) echo.MiddlewareFunc {
config.Skipper = DefaultConfig.Skipper
}
mw := iammiddleware{
iam: config.IAM,
mounts: []string{"/", "/memfs"},
logger: config.Logger,
}
// Sort the mounts from longest to shortest
sort.Slice(mw.mounts, func(i, j int) bool {
if len(mw.mounts[i]) > len(mw.mounts[j]) {
return true
}
return false
})
return func(next echo.HandlerFunc) echo.HandlerFunc {
return func(c echo.Context) error {
if config.Skipper(c) {
c.Set("user", "$anon")
return next(c)
}
if config.IAM == nil {
return next(c)
return api.Err(http.StatusForbidden, "Forbidden", "IAM is not provided")
}
user := c.Get("user").(string)
if len(user) == 0 {
user = "$anon"
var identity iam.IdentityVerifier = nil
var err error
resource := c.Request().URL.Path
var domain string
if resource == "/api" || strings.HasPrefix(resource, "/api/") {
if resource == "/api/login" {
identity, err = mw.findIdentityFromUserpass(c)
if err != nil {
time.Sleep(5 * time.Second)
return api.Err(http.StatusForbidden, "Forbidden", "%s", err)
}
if identity == nil {
identity, err = mw.findIdentityFromAuth0(c)
if err != nil {
time.Sleep(5 * time.Second)
return api.Err(http.StatusForbidden, "Forbidden", "%s", err)
}
}
} else {
identity, err = mw.findIdentityFromJWT(c)
if err != nil {
return api.Err(http.StatusForbidden, "Forbidden", "%s", err)
}
if identity != nil {
if resource == "/api/login/refresh" {
usefor, _ := c.Get("usefor").(string)
if usefor != "refresh" {
time.Sleep(5 * time.Second)
return api.Err(http.StatusForbidden, "Forbidden", "invalid token")
}
} else {
usefor, _ := c.Get("usefor").(string)
if usefor != "access" {
time.Sleep(5 * time.Second)
return api.Err(http.StatusForbidden, "Forbidden", "invalid token")
}
}
}
}
domain = c.QueryParam("group")
resource = "api:" + resource
} else {
identity, err = mw.findIdentityFromBasicAuth(c)
if err != nil {
return api.Err(http.StatusForbidden, "Bad request", "%s", err)
}
domain = mw.findDomainFromFilesystem(resource)
resource = "fs:" + resource
}
domain := c.QueryParam("group")
username := "$anon"
if identity != nil {
username = identity.Name()
}
c.Set("user", username)
if identity != nil && identity.IsSuperuser() {
username = "$superuser"
}
if len(domain) == 0 {
domain = "$none"
}
resource := c.Request().URL.Path
action := c.Request().Method
if !config.IAM.Enforce(user, domain, resource, action) {
return echo.NewHTTPError(http.StatusForbidden)
l := mw.logger.Debug().WithFields(log.Fields{
"subject": username,
"domain": domain,
"resource": resource,
"action": action,
})
if ok, rule := config.IAM.Enforce(username, domain, resource, action); !ok {
l.Log("access denied")
return api.Err(http.StatusForbidden, "Forbidden", "access denied")
} else {
l.Log(rule)
}
return next(c)
}
}
}
func (m *iammiddleware) findIdentityFromBasicAuth(c echo.Context) (iam.IdentityVerifier, error) {
basic := "basic"
auth := c.Request().Header.Get(echo.HeaderAuthorization)
l := len(basic)
if len(auth) == 0 {
return nil, nil
}
var username string
var password string
if len(auth) > l+1 && strings.EqualFold(auth[:l], basic) {
// Invalid base64 shouldn't be treated as error
// instead should be treated as invalid client input
b, err := base64.StdEncoding.DecodeString(auth[l+1:])
if err != nil {
return nil, err
}
cred := string(b)
for i := 0; i < len(cred); i++ {
if cred[i] == ':' {
username, password = cred[:i], cred[i+1:]
break
}
}
}
identity, err := m.iam.GetIdentity(username)
if err != nil {
m.logger.Debug().WithFields(log.Fields{
"path": c.Request().URL.Path,
"method": c.Request().Method,
}).WithError(err).Log("identity not found")
return nil, fmt.Errorf("invalid username or password")
}
if ok, err := identity.VerifyAPIPassword(password); !ok {
m.logger.Debug().WithFields(log.Fields{
"path": c.Request().URL.Path,
"method": c.Request().Method,
}).WithError(err).Log("wrong password")
return nil, fmt.Errorf("invalid username or password")
}
return identity, nil
}
func (m *iammiddleware) findIdentityFromJWT(c echo.Context) (iam.IdentityVerifier, error) {
// Look for an Auth header
values := c.Request().Header.Values("Authorization")
prefix := "Bearer "
auth := ""
for _, value := range values {
if !strings.HasPrefix(value, prefix) {
continue
}
auth = value[len(prefix):]
break
}
if len(auth) == 0 {
return nil, nil
}
p := &jwtgo.Parser{}
token, _, err := p.ParseUnverified(auth, jwtgo.MapClaims{})
if err != nil {
m.logger.Debug().WithFields(log.Fields{
"path": c.Request().URL.Path,
"method": c.Request().Method,
}).WithError(err).Log("identity not found")
return nil, err
}
var subject string
if claims, ok := token.Claims.(jwtgo.MapClaims); ok {
if sub, ok := claims["sub"]; ok {
subject = sub.(string)
}
}
var usefor string
if claims, ok := token.Claims.(jwtgo.MapClaims); ok {
if sub, ok := claims["usefor"]; ok {
usefor = sub.(string)
}
}
identity, err := m.iam.GetIdentity(subject)
if err != nil {
m.logger.Debug().WithFields(log.Fields{
"path": c.Request().URL.Path,
"method": c.Request().Method,
}).WithError(err).Log("identity not found")
return nil, fmt.Errorf("invalid token")
}
if ok, err := identity.VerifyJWT(auth); !ok {
m.logger.Debug().WithFields(log.Fields{
"path": c.Request().URL.Path,
"method": c.Request().Method,
}).WithError(err).Log("identity not found")
return nil, fmt.Errorf("invalid token")
}
c.Set("usefor", usefor)
return identity, nil
}
func (m *iammiddleware) findIdentityFromUserpass(c echo.Context) (iam.IdentityVerifier, error) {
var login api.Login
if err := util.ShouldBindJSON(c, &login); err != nil {
return nil, nil
}
identity, err := m.iam.GetIdentity(login.Username)
if err != nil {
m.logger.Debug().WithFields(log.Fields{
"path": c.Request().URL.Path,
"method": c.Request().Method,
}).WithError(err).Log("identity not found")
return nil, fmt.Errorf("invalid username or password")
}
if ok, err := identity.VerifyAPIPassword(login.Password); !ok {
m.logger.Debug().WithFields(log.Fields{
"path": c.Request().URL.Path,
"method": c.Request().Method,
}).WithError(err).Log("identity not found")
return nil, fmt.Errorf("invalid username or password")
}
return identity, nil
}
func (m *iammiddleware) findIdentityFromAuth0(c echo.Context) (iam.IdentityVerifier, error) {
// Look for an Auth header
values := c.Request().Header.Values("Authorization")
prefix := "Bearer "
auth := ""
for _, value := range values {
if !strings.HasPrefix(value, prefix) {
continue
}
auth = value[len(prefix):]
break
}
if len(auth) == 0 {
return nil, nil
}
p := &jwtgo.Parser{}
token, _, err := p.ParseUnverified(auth, jwtgo.MapClaims{})
if err != nil {
m.logger.Debug().WithFields(log.Fields{
"path": c.Request().URL.Path,
"method": c.Request().Method,
}).WithError(err).Log("identity not found")
return nil, nil
}
var subject string
if claims, ok := token.Claims.(jwtgo.MapClaims); ok {
if sub, ok := claims["sub"]; ok {
subject = sub.(string)
}
}
identity, err := m.iam.GetIdentityByAuth0(subject)
if err != nil {
m.logger.Debug().WithFields(log.Fields{
"path": c.Request().URL.Path,
"method": c.Request().Method,
}).WithError(err).Log("identity not found")
return nil, fmt.Errorf("invalid token")
}
if ok, err := identity.VerifyAPIAuth0(auth); !ok {
m.logger.Debug().WithFields(log.Fields{
"path": c.Request().URL.Path,
"method": c.Request().Method,
}).WithError(err).Log("identity not found")
return nil, fmt.Errorf("invalid token")
}
return identity, nil
}
func (m *iammiddleware) findDomainFromFilesystem(path string) string {
path = filepath.Clean(path)
// Longest prefix search. The slice is assumed to be sorted accordingly.
// Assume path is /memfs/foobar/file.txt
// The longest prefix that matches is /memfs/
// Remove it from the path and split it into components: foobar file.txt
// Check if foobar a known domain. If yes, return it. If not, return empty domain.
for _, mount := range m.mounts {
prefix := filepath.Join(mount, "/")
if strings.HasPrefix(path, prefix) {
elements := strings.Split(strings.TrimPrefix(path, prefix), "/")
if m.iam.IsDomain(elements[0]) {
return elements[0]
}
}
}
return ""
}

View File

@ -40,7 +40,6 @@ import (
"github.com/datarhei/core/v16/http/graph/resolver"
"github.com/datarhei/core/v16/http/handler"
api "github.com/datarhei/core/v16/http/handler/api"
"github.com/datarhei/core/v16/http/jwt"
"github.com/datarhei/core/v16/http/router"
"github.com/datarhei/core/v16/http/validator"
"github.com/datarhei/core/v16/iam"
@ -57,6 +56,7 @@ import (
mwcors "github.com/datarhei/core/v16/http/middleware/cors"
mwgzip "github.com/datarhei/core/v16/http/middleware/gzip"
mwhlsrewrite "github.com/datarhei/core/v16/http/middleware/hlsrewrite"
mwiam "github.com/datarhei/core/v16/http/middleware/iam"
mwiplimit "github.com/datarhei/core/v16/http/middleware/iplimit"
mwlog "github.com/datarhei/core/v16/http/middleware/log"
mwmime "github.com/datarhei/core/v16/http/middleware/mime"
@ -87,7 +87,6 @@ type Config struct {
Cors CorsConfig
RTMP rtmp.Server
SRT srt.Server
JWT jwt.JWT
Config cfgstore.Store
Cache cache.Cacher
Sessions session.RegistryReader
@ -113,7 +112,7 @@ type server struct {
profiling *handler.ProfilingHandler
ping *handler.PingHandler
graph *api.GraphHandler
jwt jwt.JWT
jwt *api.JWTHandler
}
v3handler struct {
@ -137,6 +136,7 @@ type server struct {
cache echo.MiddlewareFunc
session echo.MiddlewareFunc
hlsrewrite echo.MiddlewareFunc
iam echo.MiddlewareFunc
}
gzip struct {
@ -210,20 +210,20 @@ func NewServer(config Config) (Server, error) {
}
if config.Logger == nil {
s.logger = log.New("HTTP")
s.logger = log.New("")
}
if config.JWT == nil {
s.handler.about = api.NewAbout(
config.Restream,
[]string{},
)
} else {
s.handler.about = api.NewAbout(
config.Restream,
config.JWT.Validators(),
)
}
s.middleware.iam = mwiam.NewWithConfig(mwiam.Config{
IAM: config.IAM,
Logger: s.logger.WithComponent("IAM"),
})
s.handler.about = api.NewAbout(
config.Restream,
config.IAM.Validators(),
)
s.handler.jwt = api.NewJWT(config.IAM)
s.v3handler.log = api.NewLog(
config.LogBuffer,
@ -275,12 +275,6 @@ func NewServer(config Config) (Server, error) {
)
}
if config.JWT != nil {
s.handler.jwt = config.JWT
s.middleware.accessJWT = config.JWT.AccessMiddleware()
s.middleware.refreshJWT = config.JWT.RefreshMiddleware()
}
if config.Sessions == nil {
config.Sessions, _ = session.New(session.Config{})
}
@ -354,6 +348,8 @@ func NewServer(config Config) (Server, error) {
s.router.Use(s.middleware.cors)
}
s.router.Use(s.middleware.iam)
// Add static routes
if path, target := config.Router.StaticRoute(); len(target) != 0 {
group := s.router.Group(path)
@ -416,14 +412,9 @@ func (s *server) setRoutes() {
api.Use(s.middleware.iplimit)
}
if s.middleware.accessJWT != nil {
// Enable JWT auth
api.Use(s.middleware.accessJWT)
// The login endpoint should not be blocked by auth
s.router.POST("/api/login", s.handler.jwt.LoginHandler)
s.router.GET("/api/login/refresh", s.handler.jwt.RefreshHandler, s.middleware.refreshJWT)
}
// The login endpoint should not be blocked by auth
s.router.POST("/api/login", s.handler.jwt.Login)
s.router.GET("/api/login/refresh", s.handler.jwt.Refresh)
api.GET("", s.handler.about.About)
@ -467,23 +458,9 @@ func (s *server) setRoutes() {
fs.HEAD("", filesystem.handler.GetFile)
if filesystem.AllowWrite {
if filesystem.EnableAuth {
authmw := middleware.BasicAuth(func(username, password string, c echo.Context) (bool, error) {
if username == filesystem.Username && password == filesystem.Password {
return true, nil
}
return false, nil
})
fs.POST("", filesystem.handler.PutFile, authmw)
fs.PUT("", filesystem.handler.PutFile, authmw)
fs.DELETE("", filesystem.handler.DeleteFile, authmw)
} else {
fs.POST("", filesystem.handler.PutFile)
fs.PUT("", filesystem.handler.PutFile)
fs.DELETE("", filesystem.handler.DeleteFile)
}
fs.POST("", filesystem.handler.PutFile)
fs.PUT("", filesystem.handler.PutFile)
fs.DELETE("", filesystem.handler.DeleteFile)
}
}
@ -522,10 +499,6 @@ func (s *server) setRoutes() {
// APIv3 router group
v3 := api.Group("/v3")
if s.handler.jwt != nil {
v3.Use(s.middleware.accessJWT)
}
v3.Use(gzipMiddleware)
s.setRoutesV3(v3)

View File

@ -1,6 +1,8 @@
package iam
import (
"strings"
"github.com/datarhei/core/v16/io/fs"
"github.com/casbin/casbin/v2"
@ -8,7 +10,7 @@ import (
)
type AccessEnforcer interface {
Enforce(name, domain, resource, action string) bool
Enforce(name, domain, resource, action string) (bool, string)
}
type AccessManager interface {
@ -51,8 +53,8 @@ func NewAccessManager(fs fs.Filesystem) (AccessManager, error) {
}
func (am *access) AddPolicy() {}
func (am *access) Enforce(name, domain, resource, action string) bool {
ok, _, _ := am.enforcer.EnforceEx(name, domain, resource, action)
func (am *access) Enforce(name, domain, resource, action string) (bool, string) {
ok, rule, _ := am.enforcer.EnforceEx(name, domain, resource, action)
return ok
return ok, strings.Join(rule, ", ")
}

View File

@ -23,7 +23,7 @@ type adapter struct {
}
func newAdapter(fs fs.Filesystem, filePath string) persist.Adapter {
return &adapter{filePath: filePath}
return &adapter{filePath: filePath, fs: fs}
}
// Adapter
@ -111,6 +111,8 @@ func (a *adapter) importPolicy(model model.Model, rule []string) error {
copiedRule := make([]string, len(rule))
copy(copiedRule, rule)
fmt.Printf("%+v\n", copiedRule)
ok, err := model.HasPolicyEx(copiedRule[0], copiedRule[0], copiedRule[1:])
if err != nil {
return err

View File

@ -3,10 +3,16 @@ package iam
import "github.com/datarhei/core/v16/io/fs"
type IAM interface {
Enforce(user, domain, resource, action string) bool
Enforce(user, domain, resource, action string) (bool, string)
IsDomain(domain string) bool
Validators() []string
GetIdentity(name string) (IdentityVerifier, error)
GetIdentityByAuth0(name string) (IdentityVerifier, error)
GetDefaultIdentity() (IdentityVerifier, error)
CreateJWT(name string) (string, string, error)
Close()
}
@ -16,13 +22,23 @@ type iam struct {
am AccessManager
}
func NewIAM(fs fs.Filesystem, superuser User) (IAM, error) {
im, err := NewIdentityManager(fs, superuser)
type Config struct {
FS fs.Filesystem
Superuser User
JWTSecret string
}
func NewIAM(config Config) (IAM, error) {
im, err := NewIdentityManager(IdentityConfig{
FS: config.FS,
Superuser: config.Superuser,
JWTSecret: config.JWTSecret,
})
if err != nil {
return nil, err
}
am, err := NewAccessManager(fs)
am, err := NewAccessManager(config.FS)
if err != nil {
return nil, err
}
@ -42,7 +58,7 @@ func (i *iam) Close() {
return
}
func (i *iam) Enforce(user, domain, resource, action string) bool {
func (i *iam) Enforce(user, domain, resource, action string) (bool, string) {
return i.am.Enforce(user, domain, resource, action)
}
@ -53,3 +69,19 @@ func (i *iam) GetIdentity(name string) (IdentityVerifier, error) {
func (i *iam) GetIdentityByAuth0(name string) (IdentityVerifier, error) {
return i.im.GetVerifierByAuth0(name)
}
func (i *iam) GetDefaultIdentity() (IdentityVerifier, error) {
return i.im.GetDefaultVerifier()
}
func (i *iam) CreateJWT(name string) (string, string, error) {
return i.im.CreateJWT(name)
}
func (i *iam) IsDomain(domain string) bool {
return false
}
func (i *iam) Validators() []string {
return i.im.Validators()
}

View File

@ -6,9 +6,11 @@ import (
"os"
"regexp"
"sync"
"time"
"github.com/datarhei/core/v16/iam/jwks"
"github.com/datarhei/core/v16/io/fs"
"github.com/google/uuid"
jwtgo "github.com/golang-jwt/jwt/v4"
)
@ -86,7 +88,8 @@ func (u *User) marshalIdentity() *identity {
type identity struct {
user User
tenant *auth0Tenant
tenant *auth0Tenant
jwtKeyFunc func(*jwtgo.Token) (interface{}, error)
valid bool
@ -97,37 +100,37 @@ func (i *identity) Name() string {
return i.user.Name
}
func (i *identity) VerifyAPIPassword(password string) bool {
func (i *identity) VerifyAPIPassword(password string) (bool, error) {
i.lock.RLock()
defer i.lock.RUnlock()
if !i.isValid() {
return false
return false, fmt.Errorf("invalid identity")
}
if !i.user.Auth.API.Userpass.Enable {
return false
return false, fmt.Errorf("authentication method disabled")
}
return i.user.Auth.API.Userpass.Password == password
return i.user.Auth.API.Userpass.Password == password, nil
}
func (i *identity) VerifyAPIAuth0(jwt string) bool {
func (i *identity) VerifyAPIAuth0(jwt string) (bool, error) {
i.lock.RLock()
defer i.lock.RUnlock()
if !i.isValid() {
return false
return false, fmt.Errorf("invalid identity")
}
if !i.user.Auth.API.Auth0.Enable {
return false
return false, fmt.Errorf("authentication method disabled")
}
p := &jwtgo.Parser{}
token, _, err := p.ParseUnverified(jwt, jwtgo.MapClaims{})
if err != nil {
return false
return false, err
}
var subject string
@ -138,7 +141,7 @@ func (i *identity) VerifyAPIAuth0(jwt string) bool {
}
if subject != i.user.Auth.API.Auth0.User {
return false
return false, fmt.Errorf("wrong subject")
}
var issuer string
@ -149,19 +152,19 @@ func (i *identity) VerifyAPIAuth0(jwt string) bool {
}
if issuer != i.tenant.issuer {
return false
return false, fmt.Errorf("wrong issuer")
}
token, err = jwtgo.Parse(jwt, i.auth0KeyFunc)
if err != nil {
return false
return false, err
}
if !token.Valid {
return false
return false, fmt.Errorf("invalid token")
}
return true
return true, nil
}
func (i *identity) auth0KeyFunc(token *jwtgo.Token) (interface{}, error) {
@ -214,30 +217,64 @@ func (i *identity) auth0KeyFunc(token *jwtgo.Token) (interface{}, error) {
return publicKey, nil
}
func (i *identity) VerifyServiceBasicAuth(password string) bool {
func (i *identity) VerifyJWT(jwt string) (bool, error) {
p := &jwtgo.Parser{}
token, _, err := p.ParseUnverified(jwt, jwtgo.MapClaims{})
if err != nil {
return false, err
}
var issuer string
if claims, ok := token.Claims.(jwtgo.MapClaims); ok {
if sub, ok := claims["iss"]; ok {
issuer = sub.(string)
}
}
if issuer != "datarhei-core" {
return false, fmt.Errorf("wrong issuer")
}
if token.Method.Alg() != "HS256" {
return false, fmt.Errorf("invalid hashing algorithm")
}
token, err = jwtgo.Parse(jwt, i.jwtKeyFunc)
if err != nil {
return false, err
}
if !token.Valid {
return false, fmt.Errorf("invalid token")
}
return true, nil
}
func (i *identity) VerifyServiceBasicAuth(password string) (bool, error) {
i.lock.RLock()
defer i.lock.RUnlock()
if !i.isValid() {
return false
return false, fmt.Errorf("invalid identity")
}
if !i.user.Auth.Services.Basic.Enable {
return false
return false, fmt.Errorf("authentication method disabled")
}
return i.user.Auth.Services.Basic.Password == password
return i.user.Auth.Services.Basic.Password == password, nil
}
func (i *identity) VerifyServiceToken(token string) bool {
func (i *identity) VerifyServiceToken(token string) (bool, error) {
i.lock.RLock()
defer i.lock.RUnlock()
if !i.isValid() {
return false
return false, fmt.Errorf("invalid identity")
}
return i.user.Auth.Services.Token == token
return i.user.Auth.Services.Token == token, nil
}
func (i *identity) isValid() bool {
@ -254,11 +291,13 @@ func (i *identity) IsSuperuser() bool {
type IdentityVerifier interface {
Name() string
VerifyAPIPassword(password string) bool
VerifyAPIAuth0(jwt string) bool
VerifyJWT(jwt string) (bool, error)
VerifyServiceBasicAuth(password string) bool
VerifyServiceToken(token string) bool
VerifyAPIPassword(password string) (bool, error)
VerifyAPIAuth0(jwt string) (bool, error)
VerifyServiceBasicAuth(password string) (bool, error)
VerifyServiceToken(token string) (bool, error)
IsSuperuser() bool
}
@ -269,56 +308,20 @@ type IdentityManager interface {
Get(name string) (User, error)
GetVerifier(name string) (IdentityVerifier, error)
GetVerifierByAuth0(name string) (IdentityVerifier, error)
GetDefaultVerifier() (IdentityVerifier, error)
Rename(oldname, newname string) error
Update(name string, identity User) error
Validators() []string
CreateJWT(name string) (string, string, error)
Save() error
Close()
}
type Auth0Tenant struct {
Domain string
Audience string
ClientID string
}
func (t *Auth0Tenant) key() string {
return t.Domain + t.Audience
}
type auth0Tenant struct {
domain string
issuer string
audience string
clientIDs []string
certs jwks.JWKS
}
func newAuth0Tenant(tenant Auth0Tenant) (*auth0Tenant, error) {
t := &auth0Tenant{
domain: tenant.Domain,
issuer: "https://" + tenant.Domain + "/",
audience: tenant.Audience,
clientIDs: []string{tenant.ClientID},
certs: nil,
}
url := t.issuer + "/.well-known/jwks.json"
certs, err := jwks.NewFromURL(url, jwks.Config{})
if err != nil {
return nil, err
}
t.certs = certs
return t, nil
}
func (a *auth0Tenant) Cancel() {
a.certs.Cancel()
}
type identityManager struct {
root *identity
identities map[string]*identity
tenants map[string]*auth0Tenant
@ -327,16 +330,25 @@ type identityManager struct {
fs fs.Filesystem
filePath string
jwtSecret []byte
lock sync.RWMutex
}
func NewIdentityManager(fs fs.Filesystem, superuser User) (IdentityManager, error) {
type IdentityConfig struct {
FS fs.Filesystem
Superuser User
JWTSecret string
}
func NewIdentityManager(config IdentityConfig) (IdentityManager, error) {
im := &identityManager{
identities: map[string]*identity{},
tenants: map[string]*auth0Tenant{},
auth0UserIdentityMap: map[string]string{},
fs: fs,
fs: config.FS,
filePath: "./users.json",
jwtSecret: []byte(config.JWTSecret),
}
err := im.load(im.filePath)
@ -344,13 +356,14 @@ func NewIdentityManager(fs fs.Filesystem, superuser User) (IdentityManager, erro
return nil, err
}
if len(im.identities) == 0 {
superuser.Superuser = true
im.Create(superuser)
im.save(im.filePath)
config.Superuser.Superuser = true
identity, err := im.create(config.Superuser)
if err != nil {
return nil, err
}
im.root = identity
return im, nil
}
@ -361,6 +374,7 @@ func (im *identityManager) Close() {
im.fs = nil
im.auth0UserIdentityMap = map[string]string{}
im.identities = map[string]*identity{}
im.root = nil
for _, t := range im.tenants {
t.Cancel()
@ -384,11 +398,22 @@ func (im *identityManager) Create(u User) error {
return fmt.Errorf("identity already exists")
}
identity, err := im.create(u)
if err != nil {
return err
}
im.identities[identity.user.Name] = identity
return nil
}
func (im *identityManager) create(u User) (*identity, error) {
identity := u.marshalIdentity()
if identity.user.Auth.API.Auth0.Enable {
if _, ok := im.auth0UserIdentityMap[identity.user.Auth.API.Auth0.User]; ok {
return fmt.Errorf("the Auth0 user has already an identity")
return nil, fmt.Errorf("the Auth0 user has already an identity")
}
auth0Key := identity.user.Auth.API.Auth0.Tenant.key()
@ -396,17 +421,17 @@ func (im *identityManager) Create(u User) error {
if _, ok := im.tenants[auth0Key]; !ok {
tenant, err := newAuth0Tenant(identity.user.Auth.API.Auth0.Tenant)
if err != nil {
return err
return nil, err
}
im.tenants[auth0Key] = tenant
identity.tenant = tenant
}
}
im.identities[identity.user.Name] = identity
identity.valid = true
return nil
return identity, nil
}
func (im *identityManager) Update(name string, identity User) error {
@ -432,11 +457,21 @@ func (im *identityManager) Remove(name string) error {
}
func (im *identityManager) getIdentity(name string) (*identity, error) {
identity, ok := im.identities[name]
if !ok {
var identity *identity = nil
if im.root.user.Name == name {
identity = im.root
} else {
identity, _ = im.identities[name]
}
if identity == nil {
return nil, fmt.Errorf("not found")
}
identity.jwtKeyFunc = func(*jwtgo.Token) (interface{}, error) { return im.jwtSecret, nil }
return identity, nil
}
@ -471,6 +506,10 @@ func (im *identityManager) GetVerifierByAuth0(name string) (IdentityVerifier, er
return im.getIdentity(name)
}
func (im *identityManager) GetDefaultVerifier() (IdentityVerifier, error) {
return im.root, nil
}
func (im *identityManager) Rename(oldname, newname string) error {
im.lock.Lock()
defer im.lock.Unlock()
@ -554,3 +593,102 @@ func (im *identityManager) save(filePath string) error {
return err
}
func (im *identityManager) Validators() []string {
validators := []string{"localjwt"}
im.lock.RLock()
defer im.lock.RUnlock()
for _, t := range im.tenants {
for _, clientid := range t.clientIDs {
validators = append(validators, fmt.Sprintf("auth0 domain=%s audience=%s clientid=%s", t.domain, t.audience, clientid))
}
}
return validators
}
func (im *identityManager) CreateJWT(name string) (string, string, error) {
now := time.Now()
accessExpires := now.Add(time.Minute * 10)
refreshExpires := now.Add(time.Hour * 24)
// Create access token
accessToken := jwtgo.NewWithClaims(jwtgo.SigningMethodHS256, jwtgo.MapClaims{
"iss": "datarhei-core",
"sub": name,
"usefor": "access",
"iat": now.Unix(),
"exp": accessExpires.Unix(),
"exi": uint64(accessExpires.Sub(now).Seconds()),
"jti": uuid.New().String(),
})
// Generate encoded access token
at, err := accessToken.SignedString(im.jwtSecret)
if err != nil {
return "", "", err
}
// Create refresh token
refreshToken := jwtgo.NewWithClaims(jwtgo.SigningMethodHS256, jwtgo.MapClaims{
"iss": "datarhei-core",
"sub": name,
"usefor": "refresh",
"iat": now.Unix(),
"exp": refreshExpires.Unix(),
"exi": uint64(refreshExpires.Sub(now).Seconds()),
"jti": uuid.New().String(),
})
// Generate encoded refresh token
rt, err := refreshToken.SignedString(im.jwtSecret)
if err != nil {
return "", "", err
}
return at, rt, nil
}
type Auth0Tenant struct {
Domain string
Audience string
ClientID string
}
func (t *Auth0Tenant) key() string {
return t.Domain + t.Audience
}
type auth0Tenant struct {
domain string
issuer string
audience string
clientIDs []string
certs jwks.JWKS
}
func newAuth0Tenant(tenant Auth0Tenant) (*auth0Tenant, error) {
t := &auth0Tenant{
domain: tenant.Domain,
issuer: "https://" + tenant.Domain + "/",
audience: tenant.Audience,
clientIDs: []string{tenant.ClientID},
certs: nil,
}
url := t.issuer + "/.well-known/jwks.json"
certs, err := jwks.NewFromURL(url, jwks.Config{})
if err != nil {
return nil, err
}
t.certs = certs
return t, nil
}
func (a *auth0Tenant) Cancel() {
a.certs.Cancel()
}

View File

@ -231,6 +231,8 @@ type server struct {
// access to the map.
channels map[string]*channel
lock sync.RWMutex
iam iam.IAM
}
// New creates a new RTMP server according to the given config
@ -248,6 +250,7 @@ func New(config Config) (Server, error) {
token: config.Token,
logger: config.Logger,
collector: config.Collector,
iam: config.IAM,
}
if s.collector == nil {
@ -360,23 +363,31 @@ func (s *server) handlePlay(conn *rtmp.Conn) {
defer conn.Close()
playPath := conn.URL.Path
playPath, token := getToken(conn.URL)
// Check the token in the URL if one is required
if len(s.token) != 0 {
path, token := getToken(conn.URL)
identity, err := s.findIdentityFromStreamKey(token)
if err != nil {
s.logger.Debug().WithError(err).Log("no valid identity found")
s.log("PLAY", "FORBIDDEN", playPath, "invalid streamkey ("+token+")", client)
return
}
if len(token) == 0 {
s.log("PLAY", "FORBIDDEN", path, "no streamkey provided", client)
return
}
domain := s.findDomainFromPlaypath(playPath)
resource := "rtmp:" + playPath
if s.token != token {
s.log("PLAY", "FORBIDDEN", path, "invalid streamkey ("+token+")", client)
return
}
l := s.logger.Debug().WithFields(log.Fields{
"name": identity.Name(),
"domain": domain,
"resource": resource,
"action": "PLAY",
})
playPath = path
if ok, rule := s.iam.Enforce(identity.Name(), domain, resource, "PLAY"); !ok {
l.Log("access denied")
s.log("PLAY", "FORBIDDEN", playPath, "invalid streamkey ("+token+")", client)
return
} else {
l.Log(rule)
}
/*
@ -446,23 +457,7 @@ func (s *server) handlePublish(conn *rtmp.Conn) {
defer conn.Close()
playPath := conn.URL.Path
if len(s.token) != 0 {
path, token := getToken(conn.URL)
if len(token) == 0 {
s.log("PLAY", "FORBIDDEN", path, "no streamkey provided", client)
return
}
if s.token != token {
s.log("PLAY", "FORBIDDEN", path, "invalid streamkey ("+token+")", client)
return
}
playPath = path
}
playPath, token := getToken(conn.URL)
// Check the app patch
if !strings.HasPrefix(playPath, s.app) {
@ -470,6 +465,31 @@ func (s *server) handlePublish(conn *rtmp.Conn) {
return
}
identity, err := s.findIdentityFromStreamKey(token)
if err != nil {
s.logger.Debug().WithError(err).Log("no valid identity found")
s.log("PUBLISH", "FORBIDDEN", playPath, "invalid streamkey ("+token+")", client)
return
}
domain := s.findDomainFromPlaypath(playPath)
resource := "rtmp:" + playPath
l := s.logger.Debug().WithFields(log.Fields{
"name": identity.Name(),
"domain": domain,
"resource": resource,
"action": "PUBLISH",
})
if ok, rule := s.iam.Enforce(identity.Name(), domain, "rtmp:"+playPath, "PUBLISH"); !ok {
l.Log("access denied")
s.log("PUBLISH", "FORBIDDEN", playPath, "invalid streamkey ("+token+")", client)
return
} else {
l.Log(rule)
}
// Check the stream if it contains any valid/known streams
streams, _ := conn.Streams()
@ -530,3 +550,42 @@ func (s *server) handlePublish(conn *rtmp.Conn) {
s.log("PUBLISH", "STOP", playPath, "", client)
}
func (s *server) findIdentityFromStreamKey(key string) (iam.IdentityVerifier, error) {
var identity iam.IdentityVerifier
var err error
elements := strings.Split(key, ":")
if len(elements) == 1 {
identity, err = s.iam.GetDefaultIdentity()
} else {
identity, err = s.iam.GetIdentity(elements[0])
}
if err != nil {
return nil, fmt.Errorf("invalid token: %w", err)
}
if ok, err := identity.VerifyServiceToken(elements[1]); !ok {
return nil, fmt.Errorf("invalid token: %w", err)
}
return identity, nil
}
func (s *server) findDomainFromPlaypath(path string) string {
path = strings.TrimPrefix(path, filepath.Join(s.app, "/"))
elements := strings.Split(path, "/")
if len(elements) == 1 {
return ""
}
domain := elements[0]
if s.iam.IsDomain(domain) {
return domain
}
return ""
}