Add experimental challenge forwarder

This commit is contained in:
Ingo Oppermann 2023-06-26 22:16:32 +02:00
parent 7fc58454e4
commit b78a2368b3
No known key found for this signature in database
GPG Key ID: 2AB32426E9DD229E
3 changed files with 111 additions and 38 deletions

View File

@ -5,6 +5,8 @@ import (
"crypto/tls"
"fmt"
"net/http"
"net/http/httputil"
"net/url"
"strings"
"sync"
"time"
@ -124,3 +126,32 @@ func (m *manager) AcquireCertificates(ctx context.Context, listenAddress string,
return certerr
}
func ProxyHTTPChallenge(ctx context.Context, listenAddress string, target *url.URL) error {
proxy := httputil.NewSingleHostReverseProxy(target)
// Start temporary http server on configured port
tempserver := &http.Server{
Addr: listenAddress,
Handler: proxy,
ReadTimeout: 10 * time.Second,
WriteTimeout: 10 * time.Second,
MaxHeaderBytes: 1 << 20,
}
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
tempserver.ListenAndServe()
wg.Done()
}()
<-ctx.Done()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
tempserver.Shutdown(ctx)
cancel()
return nil
}

View File

@ -330,7 +330,7 @@ func New(ctx context.Context, config Config) (Cluster, error) {
c.logger.Info().Log("Waiting for a leader to be elected ...")
for {
leader := c.raft.Leader()
_, leader := c.raft.Leader()
if len(leader) != 0 {
break
}
@ -369,55 +369,91 @@ func New(ctx context.Context, config Config) (Cluster, error) {
c.logger.Info().Log("Cluster is operational")
if c.isTLSRequired && c.IsRaftLeader() {
names, err := c.getClusterHostnames()
if err != nil {
c.Shutdown()
return nil, fmt.Errorf("failed to assemble list of all configured hostnames: %w", err)
}
if c.isTLSRequired {
if c.IsRaftLeader() {
// Acquire certificates
names, err := c.getClusterHostnames()
if err != nil {
c.Shutdown()
return nil, fmt.Errorf("failed to assemble list of all configured hostnames: %w", err)
}
kvs, err := NewClusterKVS(c)
if err != nil {
c.Shutdown()
return nil, fmt.Errorf("cluster KVS: %w", err)
}
kvs, err := NewClusterKVS(c)
if err != nil {
c.Shutdown()
return nil, fmt.Errorf("cluster KVS: %w", err)
}
storage, err := NewClusterStorage(kvs, "core-cluster-certificates")
if err != nil {
c.Shutdown()
return nil, fmt.Errorf("certificate store: %w", err)
}
storage, err := NewClusterStorage(kvs, "core-cluster-certificates")
if err != nil {
c.Shutdown()
return nil, fmt.Errorf("certificate store: %w", err)
}
manager, err := autocert.New(autocert.Config{
Storage: storage,
DefaultHostname: names[0],
EmailAddress: c.config.TLS.Email,
IsProduction: false,
Logger: c.logger.WithComponent("Let's Encrypt"),
})
if err != nil {
c.Shutdown()
return nil, fmt.Errorf("certificate manager: %w", err)
}
manager, err := autocert.New(autocert.Config{
Storage: storage,
DefaultHostname: names[0],
EmailAddress: c.config.TLS.Email,
IsProduction: false,
Logger: c.logger.WithComponent("Let's Encrypt"),
})
if err != nil {
c.Shutdown()
return nil, fmt.Errorf("certificate manager: %w", err)
}
c.certManager = manager
c.certManager = manager
err = manager.AcquireCertificates(ctx, c.config.Address, names)
if err != nil {
c.Shutdown()
return nil, fmt.Errorf("failed to acquire certificates: %w", err)
err = manager.AcquireCertificates(ctx, c.config.Address, names)
if err != nil {
c.Shutdown()
return nil, fmt.Errorf("failed to acquire certificates: %w", err)
}
}
}
if !c.IsRaftLeader() {
tempctx, cancel := context.WithCancel(context.Background())
if c.isTLSRequired {
// All followers forward any HTTP requests to the leader such that it can respond to the HTTP challenge
leaderAddress, _ := c.raft.Leader()
leader, err := c.CoreAPIAddress(leaderAddress)
if err != nil {
cancel()
c.Shutdown()
return nil, fmt.Errorf("unable to find leader address: %w", err)
}
url, err := url.Parse(leader)
if err != nil {
cancel()
return nil, fmt.Errorf("unable to parse leader address: %w", err)
}
url.Scheme = "http"
url.Path = "/"
url.User = nil
url.RawQuery = ""
go func() {
c.logger.Info().WithField("leader", url.String()).Log("Forwarding ACME challenges to leader")
autocert.ProxyHTTPChallenge(tempctx, c.config.Address, url)
c.logger.Info().WithField("leader", url.String()).Log("Stopped forwarding ACME challenges to leader")
}()
}
for {
// Ask leader if it is ready
err := c.IsReady("")
if err == nil {
cancel()
break
}
select {
case <-ctx.Done():
cancel()
c.Shutdown()
return nil, fmt.Errorf("starting cluster has been aborted: %w", ctx.Err())
default:

View File

@ -24,8 +24,14 @@ import (
type Raft interface {
Shutdown()
// IsLeader returns whether this node is the leader.
IsLeader() bool
Leader() string
// Leader returns the address and ID of the current leader.
Leader() (string, string)
// Servers returns the list of servers in the cluster.
Servers() ([]Server, error)
Stats() Stats
Apply([]byte) error
@ -156,10 +162,10 @@ func (r *raft) IsLeader() bool {
return r.isLeader
}
func (r *raft) Leader() string {
_, leaderID := r.raft.LeaderWithID()
func (r *raft) Leader() (string, string) {
leaderAddress, leaderID := r.raft.LeaderWithID()
return string(leaderID)
return string(leaderAddress), string(leaderID)
}
func (r *raft) Servers() ([]Server, error) {
@ -168,7 +174,7 @@ func (r *raft) Servers() ([]Server, error) {
return nil, fmt.Errorf("failed to get raft configuration: %w", err)
}
leaderID := r.Leader()
_, leaderID := r.Leader()
servers := []Server{}