From b78a2368b3e20d522477471bd9ba3c2d15430181 Mon Sep 17 00:00:00 2001 From: Ingo Oppermann Date: Mon, 26 Jun 2023 22:16:32 +0200 Subject: [PATCH] Add experimental challenge forwarder --- autocert/autocert.go | 31 +++++++++++++ cluster/cluster.go | 102 +++++++++++++++++++++++++++++-------------- cluster/raft/raft.go | 16 ++++--- 3 files changed, 111 insertions(+), 38 deletions(-) diff --git a/autocert/autocert.go b/autocert/autocert.go index 87dac54b..1ab8fe8c 100644 --- a/autocert/autocert.go +++ b/autocert/autocert.go @@ -5,6 +5,8 @@ import ( "crypto/tls" "fmt" "net/http" + "net/http/httputil" + "net/url" "strings" "sync" "time" @@ -124,3 +126,32 @@ func (m *manager) AcquireCertificates(ctx context.Context, listenAddress string, return certerr } + +func ProxyHTTPChallenge(ctx context.Context, listenAddress string, target *url.URL) error { + proxy := httputil.NewSingleHostReverseProxy(target) + + // Start temporary http server on configured port + tempserver := &http.Server{ + Addr: listenAddress, + Handler: proxy, + ReadTimeout: 10 * time.Second, + WriteTimeout: 10 * time.Second, + MaxHeaderBytes: 1 << 20, + } + + wg := sync.WaitGroup{} + wg.Add(1) + + go func() { + tempserver.ListenAndServe() + wg.Done() + }() + + <-ctx.Done() + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + tempserver.Shutdown(ctx) + cancel() + + return nil +} diff --git a/cluster/cluster.go b/cluster/cluster.go index ff72703c..02400087 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -330,7 +330,7 @@ func New(ctx context.Context, config Config) (Cluster, error) { c.logger.Info().Log("Waiting for a leader to be elected ...") for { - leader := c.raft.Leader() + _, leader := c.raft.Leader() if len(leader) != 0 { break } @@ -369,55 +369,91 @@ func New(ctx context.Context, config Config) (Cluster, error) { c.logger.Info().Log("Cluster is operational") - if c.isTLSRequired && c.IsRaftLeader() { - names, err := c.getClusterHostnames() - if err != nil { - c.Shutdown() - return nil, fmt.Errorf("failed to assemble list of all configured hostnames: %w", err) - } + if c.isTLSRequired { + if c.IsRaftLeader() { + // Acquire certificates + names, err := c.getClusterHostnames() + if err != nil { + c.Shutdown() + return nil, fmt.Errorf("failed to assemble list of all configured hostnames: %w", err) + } - kvs, err := NewClusterKVS(c) - if err != nil { - c.Shutdown() - return nil, fmt.Errorf("cluster KVS: %w", err) - } + kvs, err := NewClusterKVS(c) + if err != nil { + c.Shutdown() + return nil, fmt.Errorf("cluster KVS: %w", err) + } - storage, err := NewClusterStorage(kvs, "core-cluster-certificates") - if err != nil { - c.Shutdown() - return nil, fmt.Errorf("certificate store: %w", err) - } + storage, err := NewClusterStorage(kvs, "core-cluster-certificates") + if err != nil { + c.Shutdown() + return nil, fmt.Errorf("certificate store: %w", err) + } - manager, err := autocert.New(autocert.Config{ - Storage: storage, - DefaultHostname: names[0], - EmailAddress: c.config.TLS.Email, - IsProduction: false, - Logger: c.logger.WithComponent("Let's Encrypt"), - }) - if err != nil { - c.Shutdown() - return nil, fmt.Errorf("certificate manager: %w", err) - } + manager, err := autocert.New(autocert.Config{ + Storage: storage, + DefaultHostname: names[0], + EmailAddress: c.config.TLS.Email, + IsProduction: false, + Logger: c.logger.WithComponent("Let's Encrypt"), + }) + if err != nil { + c.Shutdown() + return nil, fmt.Errorf("certificate manager: %w", err) + } - c.certManager = manager + c.certManager = manager - err = manager.AcquireCertificates(ctx, c.config.Address, names) - if err != nil { - c.Shutdown() - return nil, fmt.Errorf("failed to acquire certificates: %w", err) + err = manager.AcquireCertificates(ctx, c.config.Address, names) + if err != nil { + c.Shutdown() + return nil, fmt.Errorf("failed to acquire certificates: %w", err) + } } } if !c.IsRaftLeader() { + tempctx, cancel := context.WithCancel(context.Background()) + + if c.isTLSRequired { + // All followers forward any HTTP requests to the leader such that it can respond to the HTTP challenge + leaderAddress, _ := c.raft.Leader() + leader, err := c.CoreAPIAddress(leaderAddress) + if err != nil { + cancel() + c.Shutdown() + return nil, fmt.Errorf("unable to find leader address: %w", err) + } + + url, err := url.Parse(leader) + if err != nil { + cancel() + return nil, fmt.Errorf("unable to parse leader address: %w", err) + } + + url.Scheme = "http" + url.Path = "/" + url.User = nil + url.RawQuery = "" + + go func() { + c.logger.Info().WithField("leader", url.String()).Log("Forwarding ACME challenges to leader") + autocert.ProxyHTTPChallenge(tempctx, c.config.Address, url) + c.logger.Info().WithField("leader", url.String()).Log("Stopped forwarding ACME challenges to leader") + }() + } + for { + // Ask leader if it is ready err := c.IsReady("") if err == nil { + cancel() break } select { case <-ctx.Done(): + cancel() c.Shutdown() return nil, fmt.Errorf("starting cluster has been aborted: %w", ctx.Err()) default: diff --git a/cluster/raft/raft.go b/cluster/raft/raft.go index eed1bba3..36380f9d 100644 --- a/cluster/raft/raft.go +++ b/cluster/raft/raft.go @@ -24,8 +24,14 @@ import ( type Raft interface { Shutdown() + + // IsLeader returns whether this node is the leader. IsLeader() bool - Leader() string + + // Leader returns the address and ID of the current leader. + Leader() (string, string) + + // Servers returns the list of servers in the cluster. Servers() ([]Server, error) Stats() Stats Apply([]byte) error @@ -156,10 +162,10 @@ func (r *raft) IsLeader() bool { return r.isLeader } -func (r *raft) Leader() string { - _, leaderID := r.raft.LeaderWithID() +func (r *raft) Leader() (string, string) { + leaderAddress, leaderID := r.raft.LeaderWithID() - return string(leaderID) + return string(leaderAddress), string(leaderID) } func (r *raft) Servers() ([]Server, error) { @@ -168,7 +174,7 @@ func (r *raft) Servers() ([]Server, error) { return nil, fmt.Errorf("failed to get raft configuration: %w", err) } - leaderID := r.Leader() + _, leaderID := r.Leader() servers := []Server{}