Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 24 additions & 12 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,18 +1,27 @@
.PHONY: all build-sidecar build-controller gen-certs
COMPONENTS := sidecar controller loadbalancer

all: build-sidecar build-controller gen-certs
.PHONY: all gen-certs test test-race
.DEFAULT_GOAL := all
all: $(addprefix build-,$(COMPONENTS))

build-sidecar:
@echo "Building WebSocket Proxy Sidecar..."
./scripts/build-sidecar.sh

build-controller:
@echo "Building WebSocket Operator Controller..."
./scripts/build-controller.sh

build-loadbalancer:
@echo "Building WebSocket Operator LoadBalancer..."
./scripts/build-loadbalancer.sh
build-%:
@echo "Building WebSocket $*"
COMPONENT="$*" ./scripts/build.sh

push-%:
@echo "Build and Push Image $*"
COMPONENT="$*" PUSH="true" ./scripts/build.sh

push: $(addprefix push-,$(COMPONENTS))
build: $(addprefix build-,$(COMPONENTS))

run-%:
go run "./cmd/$*/"

# Build the local test-server image and push it to the registry.
# Fixes: `docker build` requires a build-context argument (here `.`),
# and `docker push` takes no `-t` flag — it is invoked as `docker push IMAGE`.
test-server-push:
	docker build -f deployments/local/Dockerfile -t lukas8219/websocket-operator-test-server:latest .
	docker push lukas8219/websocket-operator-test-server:latest

gen-certs:
@echo "Generating TLS certificates..."
Expand All @@ -22,6 +31,9 @@ test:
@echo "Running tests..."
go test -v ./...

integration:
go test -v ./integration_tests/...

test-race:
@echo "Running tests with race detector..."
go test -race -v ./...
2 changes: 1 addition & 1 deletion cmd/loadbalancer/connection/downstream_proxier.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func (p *WSProxier) ProxyDownstreamToUpstream() (net.Conn, error) {
upstreamCancelChan := p.tracker.UpstreamCancelChan()
downstreamConn := p.tracker.DownstreamConn()

proxiedConn, _, _, err := p.dialer.Dial(context.Background(), "ws://"+host)
proxiedConn, _, _, err := p.Dial(context.Background(), "ws://"+host)
if err != nil {
p.tracker.Error("Failed to dial upstream", "error", err)
return nil, err
Expand Down
18 changes: 18 additions & 0 deletions cmd/loadbalancer/connection/proxier.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"github.com/gobwas/ws"
)

const MAX_RETRIES = 5

// Proxier manages bidirectional proxying of connections
type Proxier interface {
ProxyUpstreamToDownstream()
Expand All @@ -33,6 +35,22 @@ func NewWSProxier(tracker *Tracker, dialer WSDialer) *WSProxier {
}
}

// Dial opens a WebSocket connection to urlstr, delegating to
// dialWithRetry so transient dial failures are retried.
func (w *WSProxier) Dial(ctx context.Context, urlstr string) (net.Conn, *bufio.Reader, ws.Handshake, error) {
	const firstAttempt = 0
	return w.dialWithRetry(nil, firstAttempt, ctx, urlstr)
}

// dialWithRetry attempts to dial urlstr, retrying on failure up to
// MAX_RETRIES attempts. lastError carries the error from the previous
// attempt so the final failure reports the most recent cause.
//
// TODO remove this. I've added this because while testing, it seems like the
// Kubernetes reconcile was faster than the API was ready to receive requests
func (w *WSProxier) dialWithRetry(lastError error, retry uint, ctx context.Context, urlstr string) (net.Conn, *bufio.Reader, ws.Handshake, error) {
	if retry == MAX_RETRIES {
		return nil, nil, ws.Handshake{}, lastError
	}
	// Stop retrying as soon as the caller's context is cancelled or expired
	// rather than burning the remaining attempts on a dead context.
	if ctxErr := ctx.Err(); ctxErr != nil {
		return nil, nil, ws.Handshake{}, ctxErr
	}
	// Renamed locals: `error` shadowed the builtin type and `ws` shadowed
	// the imported package (which this function also references).
	conn, reader, hs, err := w.dialer.Dial(ctx, urlstr)
	if err != nil {
		return w.dialWithRetry(err, retry+1, ctx, urlstr)
	}
	return conn, reader, hs, nil
}

func (p *WSProxier) Close() {
upstreamConn := p.tracker.UpstreamConn()
downstreamConn := p.tracker.DownstreamConn()
Expand Down
26 changes: 14 additions & 12 deletions cmd/loadbalancer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,26 @@ package main
import (
"flag"
"lukas8219/websocket-operator/cmd/loadbalancer/server"
"lukas8219/websocket-operator/internal/consistent_hashing"
"lukas8219/websocket-operator/internal/logger"
"lukas8219/websocket-operator/internal/route"
)

var (
router route.RouterImpl
"lukas8219/websocket-operator/internal/peer_discovery"
"lukas8219/websocket-operator/internal/resolver"
)

func main() {
port := flag.String("port", "3000", "Port to listen on")
mode := flag.String("mode", "kubernetes", "Mode to use")
debug := flag.Bool("debug", false, "Debug mode")
port := flag.String("port", "8080", "Port to listen on")
// mode := flag.String("mode", "kubernetes", "Mode to use")
debug := flag.Bool("debug", true, "Debug mode")
flag.Parse()
logger.SetupLogger(*debug)
router = route.NewRouter(route.RouterConfig{Mode: route.RouterConfigMode(*mode)})
router.InitializeHosts()
peerDiscovery := peer_discovery.NewKubernetes("default", "ws-proxy-headless")
resolver := resolver.New(
peerDiscovery,
consistent_hashing.NewJumpHash(peerDiscovery),
)
go resolver.Init()
server.StartServer(server.ServerConfig{
Router: router,
Port: *port,
Resolver: &resolver,
Port: *port,
})
}
17 changes: 8 additions & 9 deletions cmd/loadbalancer/server/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,20 @@ package server
import (
"log/slog"
"lukas8219/websocket-operator/cmd/loadbalancer/connection"
"lukas8219/websocket-operator/internal/route"
"lukas8219/websocket-operator/internal/resolver"
"net/http"
"os"

"github.com/gobwas/ws"
)

func createHandler(router route.RouterImpl, connections map[string]*connection.Connection) http.HandlerFunc {
// createHandler builds the load balancer's HTTP entry point: every request
// is forwarded to handleConnection together with the shared resolver and
// the live connection registry.
// NOTE(review): `*rslv` copies the Resolver value on every request — confirm
// Resolver is safe to copy (no mutex/state fields) or pass the pointer down.
func createHandler(rslv *resolver.Resolver, connections map[string]*connection.Connection) http.HandlerFunc {
	handler := func(writer http.ResponseWriter, request *http.Request) {
		handleConnection(*rslv, connections, writer, request)
	}
	return handler
}

func handleConnection(router route.RouterImpl, connections map[string]*connection.Connection, w http.ResponseWriter, r *http.Request) {
func handleConnection(rslv resolver.Resolver, connections map[string]*connection.Connection, w http.ResponseWriter, r *http.Request) {
user := r.Header.Get("ws-user-id")
if user == "" {
slog.Error("No user id provided")
Expand All @@ -26,9 +26,8 @@ func handleConnection(router route.RouterImpl, connections map[string]*connectio
//TODO: we should only accept `NewConnection` already with client connection and host set.`
//As only the `connection` pkg should alter it`.

host := router.Route(user)
slog.With("user", user).Debug("New connection")
if host == "" {
host, err := rslv.Lookup([]byte(user))
if err != nil {
slog.Error("No host found for user")
w.WriteHeader(http.StatusBadRequest)
return
Expand All @@ -38,7 +37,7 @@ func handleConnection(router route.RouterImpl, connections map[string]*connectio
upgrader := ws.HTTPUpgrader{
Header: http.Header{
"x-ws-operator-proxy-instance": []string{os.Getenv("HOSTNAME")},
"x-ws-operator-upstream-host": []string{host},
"x-ws-operator-upstream-host": []string{host.Hostname()},
},
}
downstreamConn, _, _, err := upgrader.Upgrade(r, w)
Expand All @@ -49,7 +48,7 @@ func handleConnection(router route.RouterImpl, connections map[string]*connectio
return
}

proxiedConnection := connection.NewConnection(user, host, downstreamConn.RemoteAddr().String(), downstreamConn)
proxiedConnection := connection.NewConnection(user, host.SocketAddres(), downstreamConn.RemoteAddr().String(), downstreamConn)
connections[user] = proxiedConnection

proxiedConnection.Debug("New connection")
Expand Down
66 changes: 27 additions & 39 deletions cmd/loadbalancer/server/rebalance.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,51 +3,39 @@ package server
import (
"log/slog"
"lukas8219/websocket-operator/cmd/loadbalancer/connection"
"lukas8219/websocket-operator/internal/route"
rslv "lukas8219/websocket-operator/internal/resolver"
"time"
)

func handleRebalanceLoop(router route.RouterImpl, connections map[string]*connection.Connection) {
slog.Debug("Starting rebalance loop")
for {
select {
case hosts := <-router.RebalanceRequests():
slog.Debug("Received message to rebalance", "hosts", hosts)
upstreamHostsToConnectionTracker := make(map[string]*connection.Connection, len(connections))
slog.Debug("Flat mapping ConnectionTracker to upstreamHosts", "connections", connections)
for _, connectionTracker := range connections {
upstreamHostsToConnectionTracker[connectionTracker.User()] = connectionTracker
func handleRebalanceLoop(resolver rslv.Resolver, connections map[string]*connection.Connection) {
slog.Info("Starting rebalance loop")
for _ = range resolver.VersionUpgradeChannel() {
upstreamHostsToConnectionTracker := make(map[string]*connection.Connection, len(connections))
for user, connectionTracker := range connections {
newHost, err := resolver.Lookup([]byte(user))
if err != nil {
panic(err) //TODO
}
for _, affectedHost := range hosts {
recipientId := affectedHost[0]
newHost := affectedHost[1]
connectionTracker := upstreamHostsToConnectionTracker[recipientId]
if connectionTracker == nil {
slog.Debug("No connection tracker found", "user", recipientId)
continue
}
oldHost := connectionTracker.UpstreamHost()
if connectionTracker.UpstreamHost() == newHost {
connectionTracker.Debug("No need to rebalance")
continue
}
connectionTracker.Debug("Waiting for upstream to cancel", "oldHost", oldHost)
connectionTracker.SwitchUpstreamHost(newHost)

select {
case <-connectionTracker.UpstreamCancelChan():
connectionTracker.Debug("Successfully received cancellation signal")
case <-time.After(5 * time.Second):
connectionTracker.Error("Timeout waiting for upstream cancellation, proceeding anyway")
}
previousHost := connectionTracker.UpstreamHost()
if previousHost == newHost.SocketAddres() {
connectionTracker.Debug("No need to rebalance")
continue
}
connectionTracker.Debug("Waiting for upstream to cancel", "previous", previousHost)
connectionTracker.SwitchUpstreamHost(newHost.SocketAddres())

connections[recipientId] = connectionTracker
//TODO: gut feeling here. either we move rebalance to the connection pkg or we re-design stuff
//connectionTracker.UpstreamContext, connectionTracker.CancelUpstream = context.WithCancel(context.Background())
connectionTracker.Info("Rebalancing connection from", "old", oldHost, "new", newHost)
//TODO: stopping down -> up could cause issues if this is mid read/write
go connectionTracker.Handle()
select {
case <-connectionTracker.UpstreamCancelChan():
connectionTracker.Debug("Successfully received cancellation signal")
case <-time.After(5 * time.Second):
connectionTracker.Error("Timeout waiting for upstream cancellation, proceeding anyway")
}
//TODO: gut feeling here. either we move rebalance to the connection pkg or we re-design stuff
//connectionTracker.UpstreamContext, connectionTracker.CancelUpstream = context.WithCancel(context.Background())
connectionTracker.Info("Rebalancing connection from", "previous", previousHost, "new", newHost)
//TODO: stopping down -> up could cause issues if this is mid read/write
go connectionTracker.Handle()
upstreamHostsToConnectionTracker[connectionTracker.User()] = connectionTracker
}
}
}
38 changes: 16 additions & 22 deletions cmd/loadbalancer/server/rebalance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ import (
"context"
"io"
"log"
"log/slog"
"lukas8219/websocket-operator/cmd/loadbalancer/connection"
"lukas8219/websocket-operator/internal/consistent_hashing"
"lukas8219/websocket-operator/internal/peer_discovery"
"lukas8219/websocket-operator/internal/resolver"
"net"
"sync"
"testing"
Expand All @@ -15,22 +17,6 @@ import (
"github.com/gobwas/ws"
)

type MockRouter struct {
rebalanceChan chan [][2]string
*slog.Logger
}

func (m *MockRouter) RebalanceRequests() <-chan [][2]string {
return m.rebalanceChan
}

func (m *MockRouter) Route(string) string { return "" }
func (m *MockRouter) Add([]string) {}
func (m *MockRouter) GetAllUpstreamHosts() []string {
return []string{}
}
func (m *MockRouter) InitializeHosts() error { return nil }

type NetConnectionMock struct {
net.Conn
remoteAddr net.Addr
Expand Down Expand Up @@ -118,12 +104,15 @@ func NewMockConnection(user, upstreamHost string, downstreamConn net.Conn, wsDia
}

func TestHandleRebalanceLoop(t *testing.T) {
mockRouter := &MockRouter{
rebalanceChan: make(chan [][2]string, 1),
}
memoryDiscoveryBackend := peer_discovery.NewInMemoryPeerDiscovery()
mockResolver := resolver.New(
memoryDiscoveryBackend,
consistent_hashing.NewJumpHash(memoryDiscoveryBackend),
)
go mockResolver.Init()
connections := make(map[string]*connection.Connection)

go handleRebalanceLoop(mockRouter, connections)
go handleRebalanceLoop(mockResolver, connections)

t.Run("Sucessfully rebalanced", func(t *testing.T) {
mockDownstreamConn := &NetConnectionMock{
Expand All @@ -139,7 +128,12 @@ func TestHandleRebalanceLoop(t *testing.T) {

mockConn.Tracker.UpstreamCancelChan() <- 1
time.Sleep(100 * time.Millisecond)
mockRouter.rebalanceChan <- [][2]string{{mockConn.Tracker.User(), "new-host:3000"}}
t.Log("Before atomic")
memoryDiscoveryBackend.AtomicOperation(
[]peer_discovery.Peer{peer_discovery.NewPeer("new-host", 3000)},
[]peer_discovery.Peer{peer_discovery.NewPeer("old-host", 3000)},
)
t.Log("After atomic")
time.Sleep(100 * time.Millisecond)

if mockConn.UpstreamHost() != "new-host:3000" {
Expand Down
20 changes: 14 additions & 6 deletions cmd/loadbalancer/server/server.go
Original file line number Diff line number Diff line change
@@ -1,22 +1,30 @@
package server

import (
"encoding/json"
"log/slog"
"lukas8219/websocket-operator/cmd/loadbalancer/connection"
"lukas8219/websocket-operator/internal/route"
"lukas8219/websocket-operator/internal/resolver"
"net/http"
)

type ServerConfig struct {
Router route.RouterImpl
Port string
Resolver *resolver.Resolver
Port string
}

func StartServer(config ServerConfig) {
slog.Info("Starting load balancer server", "port", config.Port)
router := config.Router
connections := make(map[string]*connection.Connection) //TODO: This could be a broadcast instead of a single recipient/connection
go handleRebalanceLoop(router, connections)

go handleRebalanceLoop(*config.Resolver, connections)
//TODO how to properly test this - aka not having a server running at all
http.ListenAndServe("0.0.0.0:"+config.Port, createHandler(router, connections))
go http.ListenAndServe("0.0.0.0:"+config.Port, createHandler(config.Resolver, connections))

http.ListenAndServe("0.0.0.0:8081", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
hosts, _ := (*config.Resolver).CurrentHosts()
content, _ := json.Marshal(hosts)
w.WriteHeader(200)
w.Write(content)
}))
}
Loading
Loading