Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 12 additions & 12 deletions pkg/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,11 +262,11 @@ func (s *Proxy) handleWebsocket(w http.ResponseWriter, req *http.Request) {
}

proxy.kubernetesAPIServerProxy.rwLock.Lock()
proxy.kubernetesAPIServerProxy.httpClient = append(proxy.kubernetesAPIServerProxy.httpClient, &http.Client{Transport: k8sTransport})
proxy.kubernetesAPIServerProxy.Clients = append(proxy.kubernetesAPIServerProxy.Clients, &ApiClient{client: &http.Client{Transport: k8sTransport}, bearerToken: c.BearerToken})
proxy.kubernetesAPIServerProxy.rwLock.Unlock()

proxy.kubesphereAPIServerProxy.rwLock.Lock()
proxy.kubesphereAPIServerProxy.httpClient = append(proxy.kubesphereAPIServerProxy.httpClient, &http.Client{Transport: ksTransport})
proxy.kubesphereAPIServerProxy.Clients = append(proxy.kubesphereAPIServerProxy.Clients, &ApiClient{client: &http.Client{Transport: ksTransport}, bearerToken: c.BearerToken})
proxy.kubesphereAPIServerProxy.rwLock.Unlock()
}

Expand All @@ -283,11 +283,11 @@ func (s *Proxy) handleWebsocket(w http.ResponseWriter, req *http.Request) {

proxy.kubernetesAPIServerProxy.rwLock.Lock()
defer proxy.kubernetesAPIServerProxy.rwLock.Unlock()
k8sLen := len(proxy.kubernetesAPIServerProxy.httpClient)
k8sLen := len(proxy.kubernetesAPIServerProxy.Clients)

proxy.kubesphereAPIServerProxy.rwLock.Lock()
defer proxy.kubesphereAPIServerProxy.rwLock.Unlock()
ksLen := len(proxy.kubesphereAPIServerProxy.httpClient)
ksLen := len(proxy.kubesphereAPIServerProxy.Clients)

// httpClientLength <= 1 means there is not enough agent connection
// we need to delete the key, call cancel(), update cluster status
Expand All @@ -300,18 +300,18 @@ func (s *Proxy) handleWebsocket(w http.ResponseWriter, req *http.Request) {
return s.Update(client, false)
})
} else {
for i, v := range proxy.kubernetesAPIServerProxy.httpClient {
if v.Transport == k8sTransport {
proxy.kubernetesAPIServerProxy.httpClient = append(proxy.kubernetesAPIServerProxy.httpClient[:i],
proxy.kubernetesAPIServerProxy.httpClient[i+1:]...)
for i, v := range proxy.kubernetesAPIServerProxy.Clients {
if v.client.Transport == k8sTransport {
proxy.kubernetesAPIServerProxy.Clients = append(proxy.kubernetesAPIServerProxy.Clients[:i],
proxy.kubernetesAPIServerProxy.Clients[i+1:]...)
break
}
}

for i, v := range proxy.kubesphereAPIServerProxy.httpClient {
if v.Transport == ksTransport {
proxy.kubesphereAPIServerProxy.httpClient = append(proxy.kubesphereAPIServerProxy.httpClient[:i],
proxy.kubesphereAPIServerProxy.httpClient[i+1:]...)
for i, v := range proxy.kubesphereAPIServerProxy.Clients {
if v.client.Transport == ksTransport {
proxy.kubesphereAPIServerProxy.Clients = append(proxy.kubesphereAPIServerProxy.Clients[:i],
proxy.kubesphereAPIServerProxy.Clients[i+1:]...)
break
}
}
Expand Down
36 changes: 20 additions & 16 deletions pkg/proxy/proxy_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@ import (
"kubesphere.io/tower/pkg/utils"
)

// Each kubernetesClient has a bearerToken. The bearerToken has an expiration date.
type ApiClient struct {
client *http.Client
bearerToken []byte
}

type Server struct {
// Server name used to identify
name string
Expand All @@ -35,17 +41,14 @@ type Server struct {
//
server *http.Server

// http client to do the real proxy
httpClient []*http.Client
// kubernetes client to do the real proxy
Clients []*ApiClient

// RWMutex to implement safe operation while read or update httpClient Slice
rwLock sync.RWMutex

// Whether to use bearer token, if false, need to pass TLS client certificates
useBearerToken bool

// Bearer token to do oauth
bearerToken []byte
}

func newProxyServer(name, host, scheme string, port uint16, useBearerToken bool, transport *http.Transport, servertlsConfig *tls.Config, bearerToken []byte) (*Server, error) {
Expand All @@ -60,11 +63,13 @@ func newProxyServer(name, host, scheme string, port uint16, useBearerToken bool,
scheme: scheme,
port: port,
server: server,
httpClient: []*http.Client{
{Transport: transport},
Clients: []*ApiClient{
{
client: &http.Client{Transport: transport},
bearerToken: bearerToken,
},
},
useBearerToken: useBearerToken,
bearerToken: bearerToken,
}, nil
}

Expand Down Expand Up @@ -168,17 +173,16 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, req *http.Request) {
u.Host = s.host
u.Scheme = s.scheme

if s.useBearerToken && len(s.bearerToken) > 0 {
req = utilnet.CloneRequest(req)
req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", s.bearerToken))
}

// we choose one httpClient randomly
rand.Seed(time.Now().UnixNano())
s.rwLock.RLock()
index := rand.Intn(len(s.httpClient))
klog.V(5).Infof("server %s current agent connection length %d, random slice index %d", s.name, len(s.httpClient), index)
httpProxy := k8sproxy.NewUpgradeAwareHandler(&u, s.httpClient[index].Transport, false, false, s)
index := rand.Intn(len(s.Clients))
if s.useBearerToken && len(s.Clients[index].bearerToken) > 0 {
req = utilnet.CloneRequest(req)
req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", s.Clients[index].bearerToken))
}
klog.V(5).Infof("server %s current agent connection length %d, random slice index %d", s.name, len(s.Clients), index)
httpProxy := k8sproxy.NewUpgradeAwareHandler(&u, s.Clients[index].client.Transport, false, false, s)
s.rwLock.RUnlock()
httpProxy.ServeHTTP(w, req)
}
Expand Down