diff --git a/Makefile b/Makefile index 3e8e87621..7e8c36531 100644 --- a/Makefile +++ b/Makefile @@ -60,7 +60,8 @@ bin: .PHONY: build build: bin/proxy-agent bin/proxy-server bin/proxy-test-client bin/http-test-server -bin/proxy-agent: proto/agent/agent.pb.go konnectivity-client/proto/client/client.pb.go bin cmd/agent/main.go +agent_targets := $(wildcard cmd/agent/*.go pkg/agent/*.go pkg/util/*.go) +bin/proxy-agent: proto/agent/agent.pb.go konnectivity-client/proto/client/client.pb.go bin $(agent_targets) GO111MODULE=on go build -o bin/proxy-agent cmd/agent/main.go bin/proxy-test-client: konnectivity-client/proto/client/client.pb.go bin cmd/client/main.go @@ -69,7 +70,8 @@ bin/proxy-test-client: konnectivity-client/proto/client/client.pb.go bin cmd/cli bin/http-test-server: bin cmd/test-server/main.go GO111MODULE=on go build -o bin/http-test-server cmd/test-server/main.go -bin/proxy-server: proto/agent/agent.pb.go konnectivity-client/proto/client/client.pb.go bin cmd/server/main.go +server_targets := $(wildcard cmd/server/*.go pkg/server/*.go pkg/util/*.go) +bin/proxy-server: proto/agent/agent.pb.go konnectivity-client/proto/client/client.pb.go bin $(server_targets) GO111MODULE=on go build -o bin/proxy-server cmd/server/main.go ## -------------------------------------- diff --git a/cmd/client/main.go b/cmd/client/main.go index d01f31df4..b325360bd 100644 --- a/cmd/client/main.go +++ b/cmd/client/main.go @@ -250,6 +250,7 @@ func (c *Client) run(o *GrpcProxyClientOptions) error { time.Sleep(wait) } } + client.CloseIdleConnections() return nil } diff --git a/cmd/server/main.go b/cmd/server/main.go index fe9df1b14..7ef70e665 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -358,7 +358,7 @@ func (p *Proxy) run(o *ProxyRunOptions) error { KubernetesClient: k8sClient, AuthenticationAudience: o.authenticationAudience, } - server := server.NewProxyServer(o.serverID, int(o.serverCount), authOpt) + server := server.NewProxyServer(o.serverID, int(o.serverCount), authOpt, o.keepaliveTime) klog.V(1).Infoln("Starting master server for client connections.") masterStop, err := p.runMasterServer(ctx, o, server) if err != nil { diff --git a/cmd/test-server/main.go b/cmd/test-server/main.go index 78f7fc6b3..bf1b6bba2 100644 --- a/cmd/test-server/main.go +++ b/cmd/test-server/main.go @@ -144,7 +144,7 @@ func SetupSignalHandler() (stopCh <-chan struct{}) { } func returnSuccess(w http.ResponseWriter, req *http.Request) { - fmt.Fprintf(w, "\n\n \n Success\n \n \n

The success test page!

\n \n") + fmt.Fprintf(w, "\n\n \n Success on Jura\n \n \n

The success test page!

\n \n") } func returnError(w http.ResponseWriter, req *http.Request) { diff --git a/go.mod b/go.mod index bcc3e1c5e..f55b0fe5c 100644 --- a/go.mod +++ b/go.mod @@ -3,21 +3,20 @@ module sigs.k8s.io/apiserver-network-proxy go 1.12 require ( - github.com/golang/mock v1.4.0 + github.com/golang/mock v1.4.4 github.com/golang/protobuf v1.4.2 github.com/google/uuid v1.1.1 github.com/inconshreveable/mousetrap v1.0.0 // indirect github.com/prometheus/client_golang v1.7.1 github.com/spf13/cobra v0.0.3 github.com/spf13/pflag v1.0.5 - golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3 // indirect - golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135 // indirect + golang.org/x/net v0.0.0-20191004110552-13f9640d40b9 google.golang.org/grpc v1.27.0 - honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc // indirect k8s.io/api v0.17.1 k8s.io/apimachinery v0.17.1 k8s.io/client-go v0.17.1 k8s.io/klog/v2 v2.0.0 + rsc.io/quote/v3 v3.1.0 // indirect sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.0.0 ) diff --git a/go.sum b/go.sum index ff3fa9fb9..9dd351d28 100644 --- a/go.sum +++ b/go.sum @@ -62,6 +62,8 @@ github.com/golang/mock v1.2.0 h1:28o5sBqPkBsMGnC6b4MvE2TzSr5/AT4c/1fLqVGIwlk= github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/mock v1.4.0 h1:Rd1kQnQu0Hq3qvJppYSG0HtP+f5LPPUiDswTLiEegLg= github.com/golang/mock v1.4.0/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt3cw= +github.com/golang/mock v1.4.4 h1:l75CXGRSwbaYNpl/Z2X1XIIAMSCquvXgpVZDhwEIJsc= +github.com/golang/mock v1.4.4/go.mod h1:l3mdAwkq5BuhzHwde/uurv3sEJeZMXNpwsxVWU71h+4= github.com/golang/protobuf v0.0.0-20161109072736-4bd1920723d7/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= diff --git a/konnectivity-client/go.mod b/konnectivity-client/go.mod index 8269b083e..4d432a0c2 100644 --- a/konnectivity-client/go.mod +++ b/konnectivity-client/go.mod @@ -4,6 +4,9 @@ go 1.13 require ( github.com/golang/protobuf v1.4.0 + golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3 // indirect + golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135 // indirect google.golang.org/grpc v1.27.0 + honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc // indirect k8s.io/klog/v2 v2.0.0 ) diff --git a/konnectivity-client/proto/client/client.pb.go b/konnectivity-client/proto/client/client.pb.go index 107f2e546..fcc061630 100644 --- a/konnectivity-client/proto/client/client.pb.go +++ b/konnectivity-client/proto/client/client.pb.go @@ -16,16 +16,15 @@ limitations under the License. // Code generated by protoc-gen-go. DO NOT EDIT. // source: konnectivity-client/proto/client/client.proto -package client +package client // import "sigs.k8s.io/apiserver-network-proxy/konnectivity-client/proto/client" + +import proto "github.com/golang/protobuf/proto" +import fmt "fmt" +import math "math" import ( - context "context" - fmt "fmt" - proto "github.com/golang/protobuf/proto" + context "golang.org/x/net/context" grpc "google.golang.org/grpc" - codes "google.golang.org/grpc/codes" - status "google.golang.org/grpc/status" - math "math" ) // Reference imports to suppress errors if they are not otherwise used. @@ -37,7 +36,7 @@ var _ = math.Inf // is compatible with the proto package it is being compiled against. // A compilation error at this line likely means your copy of the // proto package needs to be updated. -const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package +const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package type PacketType int32 @@ -56,7 +55,6 @@ var PacketType_name = map[int32]string{ 3: "CLOSE_RSP", 4: "DATA", } - var PacketType_value = map[string]int32{ "DIAL_REQ": 0, "DIAL_RSP": 1, @@ -68,9 +66,8 @@ var PacketType_value = map[string]int32{ func (x PacketType) String() string { return proto.EnumName(PacketType_name, int32(x)) } - func (PacketType) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_fec4258d9ecd175d, []int{0} + return fileDescriptor_client_500d940c91b4a21e, []int{0} } type Error int32 @@ -82,7 +79,6 @@ const ( var Error_name = map[int32]string{ 0: "EOF", } - var Error_value = map[string]int32{ "EOF": 0, } @@ -90,9 +86,8 @@ var Error_value = map[string]int32{ func (x Error) String() string { return proto.EnumName(Error_name, int32(x)) } - func (Error) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_fec4258d9ecd175d, []int{1} + return fileDescriptor_client_500d940c91b4a21e, []int{1} } type Packet struct { @@ -113,17 +108,16 @@ func (m *Packet) Reset() { *m = Packet{} } func (m *Packet) String() string { return proto.CompactTextString(m) } func (*Packet) ProtoMessage() {} func (*Packet) Descriptor() ([]byte, []int) { - return fileDescriptor_fec4258d9ecd175d, []int{0} + return fileDescriptor_client_500d940c91b4a21e, []int{0} } - func (m *Packet) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_Packet.Unmarshal(m, b) } func (m *Packet) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { return xxx_messageInfo_Packet.Marshal(b, m, deterministic) } -func (m *Packet) XXX_Merge(src proto.Message) { - xxx_messageInfo_Packet.Merge(m, src) +func (dst *Packet) XXX_Merge(src proto.Message) { + xxx_messageInfo_Packet.Merge(dst, src) } func (m *Packet) XXX_Size() int { return xxx_messageInfo_Packet.Size(m) @@ -217,9 +211,9 @@ func (m *Packet) GetCloseResponse() *CloseResponse { return nil } -// XXX_OneofWrappers is for the internal use of the proto package. -func (*Packet) XXX_OneofWrappers() []interface{} { - return []interface{}{ +// XXX_OneofFuncs is for the internal use of the proto package. +func (*Packet) XXX_OneofFuncs() (func(msg proto.Message, b *proto.Buffer) error, func(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error), func(msg proto.Message) (n int), []interface{}) { + return _Packet_OneofMarshaler, _Packet_OneofUnmarshaler, _Packet_OneofSizer, []interface{}{ (*Packet_DialRequest)(nil), (*Packet_DialResponse)(nil), (*Packet_Data)(nil), @@ -228,6 +222,126 @@ func (*Packet) XXX_OneofWrappers() []interface{} { } } +func _Packet_OneofMarshaler(msg proto.Message, b *proto.Buffer) error { + m := msg.(*Packet) + // payload + switch x := m.Payload.(type) { + case *Packet_DialRequest: + b.EncodeVarint(2<<3 | proto.WireBytes) + if err := b.EncodeMessage(x.DialRequest); err != nil { + return err + } + case *Packet_DialResponse: + b.EncodeVarint(3<<3 | proto.WireBytes) + if err := b.EncodeMessage(x.DialResponse); err != nil { + return err + } + case *Packet_Data: + b.EncodeVarint(4<<3 | proto.WireBytes) + if err := b.EncodeMessage(x.Data); err != nil { + return err + } + case *Packet_CloseRequest: + b.EncodeVarint(5<<3 | proto.WireBytes) + if err := b.EncodeMessage(x.CloseRequest); err != nil { + return err + } + case *Packet_CloseResponse: + b.EncodeVarint(6<<3 | proto.WireBytes) + if err := b.EncodeMessage(x.CloseResponse); err != nil { + return err + } + case nil: + default: + return fmt.Errorf("Packet.Payload has unexpected type %T", x) + } + return nil +} + +func _Packet_OneofUnmarshaler(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error) { + m := msg.(*Packet) + switch tag { + case 2: // payload.dialRequest + if wire != proto.WireBytes { + return true, proto.ErrInternalBadWireType + } + msg := new(DialRequest) + err := b.DecodeMessage(msg) + m.Payload = &Packet_DialRequest{msg} + return true, err + case 3: // payload.dialResponse + if wire != proto.WireBytes { + return true, proto.ErrInternalBadWireType + } + msg := new(DialResponse) + err := b.DecodeMessage(msg) + m.Payload = &Packet_DialResponse{msg} + return true, err + case 4: // payload.data + if wire != proto.WireBytes { + return true, proto.ErrInternalBadWireType + } + msg := new(Data) + err := b.DecodeMessage(msg) + m.Payload = &Packet_Data{msg} + return true, err + case 5: // payload.closeRequest + if wire != proto.WireBytes { + return true, proto.ErrInternalBadWireType + } + msg := new(CloseRequest) + err := b.DecodeMessage(msg) + m.Payload = &Packet_CloseRequest{msg} + return true, err + case 6: // payload.closeResponse + if wire != proto.WireBytes { + return true, proto.ErrInternalBadWireType + } + msg := new(CloseResponse) + err := b.DecodeMessage(msg) + m.Payload = &Packet_CloseResponse{msg} + return true, err + default: + return false, nil + } +} + +func _Packet_OneofSizer(msg proto.Message) (n int) { + m := msg.(*Packet) + // payload + switch x := m.Payload.(type) { + case *Packet_DialRequest: + s := proto.Size(x.DialRequest) + n += 1 // tag and wire + n += proto.SizeVarint(uint64(s)) + n += s + case *Packet_DialResponse: + s := proto.Size(x.DialResponse) + n += 1 // tag and wire + n += proto.SizeVarint(uint64(s)) + n += s + case *Packet_Data: + s := proto.Size(x.Data) + n += 1 // tag and wire + n += proto.SizeVarint(uint64(s)) + n += s + case *Packet_CloseRequest: + s := proto.Size(x.CloseRequest) + n += 1 // tag and wire + n += proto.SizeVarint(uint64(s)) + n += s + case *Packet_CloseResponse: + s := proto.Size(x.CloseResponse) + n += 1 // tag and wire + n += proto.SizeVarint(uint64(s)) + n += s + case nil: + default: + panic(fmt.Sprintf("proto: unexpected type %T in oneof", x)) + } + return n +} + type DialRequest struct { // tcp or udp? Protocol string `protobuf:"bytes,1,opt,name=protocol,proto3" json:"protocol,omitempty"` @@ -244,17 +358,16 @@ func (m *DialRequest) Reset() { *m = DialRequest{} } func (m *DialRequest) String() string { return proto.CompactTextString(m) } func (*DialRequest) ProtoMessage() {} func (*DialRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_fec4258d9ecd175d, []int{1} + return fileDescriptor_client_500d940c91b4a21e, []int{1} } - func (m *DialRequest) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_DialRequest.Unmarshal(m, b) } func (m *DialRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { return xxx_messageInfo_DialRequest.Marshal(b, m, deterministic) } -func (m *DialRequest) XXX_Merge(src proto.Message) { - xxx_messageInfo_DialRequest.Merge(m, src) +func (dst *DialRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_DialRequest.Merge(dst, src) } func (m *DialRequest) XXX_Size() int { return xxx_messageInfo_DialRequest.Size(m) @@ -302,17 +415,16 @@ func (m *DialResponse) Reset() { *m = DialResponse{} } func (m *DialResponse) String() string { return proto.CompactTextString(m) } func (*DialResponse) ProtoMessage() {} func (*DialResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_fec4258d9ecd175d, []int{2} + return fileDescriptor_client_500d940c91b4a21e, []int{2} } - func (m *DialResponse) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_DialResponse.Unmarshal(m, b) } func (m *DialResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { return xxx_messageInfo_DialResponse.Marshal(b, m, deterministic) } -func (m *DialResponse) XXX_Merge(src proto.Message) { - xxx_messageInfo_DialResponse.Merge(m, src) +func (dst *DialResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_DialResponse.Merge(dst, src) } func (m *DialResponse) XXX_Size() int { return xxx_messageInfo_DialResponse.Size(m) @@ -356,17 +468,16 @@ func (m *CloseRequest) Reset() { *m = CloseRequest{} } func (m *CloseRequest) String() string { return proto.CompactTextString(m) } func (*CloseRequest) ProtoMessage() {} func (*CloseRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_fec4258d9ecd175d, []int{3} + return fileDescriptor_client_500d940c91b4a21e, []int{3} } - func (m *CloseRequest) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_CloseRequest.Unmarshal(m, b) } func (m *CloseRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { return xxx_messageInfo_CloseRequest.Marshal(b, m, deterministic) } -func (m *CloseRequest) XXX_Merge(src proto.Message) { - xxx_messageInfo_CloseRequest.Merge(m, src) +func (dst *CloseRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_CloseRequest.Merge(dst, src) } func (m *CloseRequest) XXX_Size() int { return xxx_messageInfo_CloseRequest.Size(m) @@ -398,17 +509,16 @@ func (m *CloseResponse) Reset() { *m = CloseResponse{} } func (m *CloseResponse) String() string { return proto.CompactTextString(m) } func (*CloseResponse) ProtoMessage() {} func (*CloseResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_fec4258d9ecd175d, []int{4} + return fileDescriptor_client_500d940c91b4a21e, []int{4} } - func (m *CloseResponse) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_CloseResponse.Unmarshal(m, b) } func (m *CloseResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { return xxx_messageInfo_CloseResponse.Marshal(b, m, deterministic) } -func (m *CloseResponse) XXX_Merge(src proto.Message) { - xxx_messageInfo_CloseResponse.Merge(m, src) +func (dst *CloseResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_CloseResponse.Merge(dst, src) } func (m *CloseResponse) XXX_Size() int { return xxx_messageInfo_CloseResponse.Size(m) @@ -449,17 +559,16 @@ func (m *Data) Reset() { *m = Data{} } func (m *Data) String() string { return proto.CompactTextString(m) } func (*Data) ProtoMessage() {} func (*Data) Descriptor() ([]byte, []int) { - return fileDescriptor_fec4258d9ecd175d, []int{5} + return fileDescriptor_client_500d940c91b4a21e, []int{5} } - func (m *Data) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_Data.Unmarshal(m, b) } func (m *Data) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { return xxx_messageInfo_Data.Marshal(b, m, deterministic) } -func (m *Data) XXX_Merge(src proto.Message) { - xxx_messageInfo_Data.Merge(m, src) +func (dst *Data) XXX_Merge(src proto.Message) { + xxx_messageInfo_Data.Merge(dst, src) } func (m *Data) XXX_Size() int { return xxx_messageInfo_Data.Size(m) @@ -492,52 +601,14 @@ func (m *Data) GetData() []byte { } func init() { - proto.RegisterEnum("PacketType", PacketType_name, PacketType_value) - proto.RegisterEnum("Error", Error_name, Error_value) proto.RegisterType((*Packet)(nil), "Packet") proto.RegisterType((*DialRequest)(nil), "DialRequest") proto.RegisterType((*DialResponse)(nil), "DialResponse") proto.RegisterType((*CloseRequest)(nil), "CloseRequest") proto.RegisterType((*CloseResponse)(nil), "CloseResponse") proto.RegisterType((*Data)(nil), "Data") -} - -func init() { - proto.RegisterFile("konnectivity-client/proto/client/client.proto", fileDescriptor_fec4258d9ecd175d) -} - -var fileDescriptor_fec4258d9ecd175d = []byte{ - // 472 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x53, 0xd1, 0x6e, 0x9b, 0x30, - 0x14, 0x85, 0x00, 0x49, 0xb8, 0x21, 0x15, 0xb2, 0xa6, 0x09, 0x75, 0x93, 0x5a, 0xf1, 0x14, 0x55, - 0x0b, 0x54, 0xa9, 0x34, 0xed, 0x35, 0x0d, 0xa9, 0x52, 0xa9, 0x5a, 0x99, 0xd3, 0xa7, 0xee, 0x61, - 0xf2, 0xc0, 0x9a, 0x50, 0x18, 0x66, 0xb6, 0x97, 0x8d, 0x0f, 0xda, 0x7f, 0x4e, 0x18, 0x52, 0xc8, - 0xa4, 0x6d, 0x52, 0x9f, 0xe0, 0x1c, 0xdf, 0x73, 0x7c, 0x7d, 0xae, 0x0d, 0xf3, 0x1d, 0x2b, 0x0a, - 0x9a, 0xc8, 0x6c, 0x9f, 0xc9, 0x6a, 0x9e, 0xe4, 0x19, 0x2d, 0x64, 0x58, 0x72, 0x26, 0x59, 0xd8, - 0x82, 0xe6, 0x13, 0x28, 0xce, 0xff, 0x35, 0x80, 0x61, 0x4c, 0x92, 0x1d, 0x95, 0xe8, 0x0c, 0x4c, - 0x59, 0x95, 0xd4, 0xd3, 0xcf, 0xf5, 0xd9, 0xc9, 0x62, 0x12, 0x34, 0xf4, 0x43, 0x55, 0x52, 0xac, - 0x16, 0xd0, 0x25, 0x4c, 0xd2, 0x8c, 0xe4, 0x98, 0x7e, 0xfb, 0x4e, 0x85, 0xf4, 0x06, 0xe7, 0xfa, - 0x6c, 0xb2, 0x70, 0x82, 0xa8, 0xe3, 0x36, 0x1a, 0xee, 0x97, 0xa0, 0x2b, 0x70, 0x1a, 0x28, 0x4a, - 0x56, 0x08, 0xea, 0x19, 0x4a, 0x32, 0x6d, 0x25, 0x0d, 0xb9, 0xd1, 0xf0, 0x51, 0x11, 0x7a, 0x05, - 0x66, 0x4a, 0x24, 0xf1, 0x4c, 0x55, 0x6c, 0x05, 0x11, 0x91, 0x64, 0xa3, 0x61, 0x45, 0xd6, 0x8e, - 0x49, 0xce, 0x04, 0x3d, 0x34, 0x61, 0xb5, 0x8e, 0xab, 0x1e, 0x59, 0x3b, 0xf6, 0x8b, 0xd0, 0x5b, - 0x98, 0xb6, 0xb8, 0xed, 0x63, 0xa8, 0x54, 0x27, 0x07, 0xd5, 0x53, 0x23, 0xc7, 0x65, 0xd7, 0x36, - 0x8c, 0x4a, 0x52, 0xe5, 0x8c, 0xa4, 0xfe, 0x47, 0x98, 0xf4, 0xce, 0x89, 0x4e, 0x61, 0xac, 0xf2, - 0x4b, 0x58, 0xae, 0xf2, 0xb2, 0xf1, 0x13, 0x46, 0x1e, 0x8c, 0x48, 0x9a, 0x72, 0x2a, 0x84, 0x8a, - 0xc8, 0xc6, 0x07, 0x88, 0x5e, 0xc2, 0x90, 0x93, 0x22, 0x65, 0x5f, 0x55, 0x10, 0x06, 0x6e, 0x91, - 0xff, 0x08, 0x4e, 0x3f, 0x11, 0xf4, 0x02, 0x2c, 0xca, 0x39, 0xe3, 0xad, 0x75, 0x03, 0xd0, 0x6b, - 0xb0, 0x93, 0x66, 0xb6, 0xb7, 0x91, 0x72, 0x36, 0x70, 0x47, 0xfc, 0xd5, 0xfb, 0x0d, 0x38, 0xfd, - 0x6c, 0x8e, 0x5d, 0xf4, 0x3f, 0x5c, 0xfc, 0x15, 0x4c, 0x8f, 0x32, 0x79, 0x4e, 0x2b, 0xfe, 0x7b, - 0x30, 0xeb, 0x99, 0xfd, 0x7b, 0xab, 0xce, 0x79, 0xd0, 0x77, 0x46, 0xed, 0xf0, 0xeb, 0x43, 0x38, - 0xcd, 0xcc, 0x2f, 0x62, 0x80, 0xee, 0x2e, 0x22, 0x07, 0xc6, 0xd1, 0xed, 0xf2, 0xee, 0x13, 0x5e, - 0x7f, 0x70, 0xb5, 0x0e, 0x6d, 0x63, 0x57, 0x47, 0x53, 0xb0, 0x57, 0x77, 0xf7, 0xdb, 0xb5, 0x5a, - 0x1c, 0xf4, 0xe0, 0x36, 0x76, 0x0d, 0x34, 0x06, 0x33, 0x5a, 0x3e, 0x2c, 0x5d, 0xf3, 0xc2, 0x05, - 0x6b, 0xad, 0xb6, 0x1b, 0x81, 0xb1, 0xbe, 0xbf, 0x71, 0xb5, 0x45, 0x08, 0x4e, 0xcc, 0xd9, 0xcf, - 0x6a, 0x4b, 0xf9, 0x3e, 0x4b, 0x28, 0x3a, 0x03, 0x4b, 0x61, 0x34, 0x6a, 0xdf, 0xc1, 0xe9, 0xe1, - 0xc7, 0xd7, 0x66, 0xfa, 0xa5, 0x7e, 0x7d, 0xf3, 0x18, 0x89, 0xec, 0x8b, 0x08, 0x76, 0xef, 0x44, - 0x90, 0xb1, 0x90, 0x94, 0x99, 0xa0, 0x7c, 0x4f, 0xf9, 0xbc, 0xa0, 0xf2, 0x07, 0xe3, 0xbb, 0x79, - 0x59, 0xcb, 0xc3, 0xff, 0xbd, 0xc6, 0xcf, 0x43, 0x85, 0xae, 0x7e, 0x07, 0x00, 0x00, 0xff, 0xff, - 0x64, 0xe0, 0x62, 0xbe, 0xb8, 0x03, 0x00, 0x00, + proto.RegisterEnum("PacketType", PacketType_name, PacketType_value) + proto.RegisterEnum("Error", Error_name, Error_value) } // Reference imports to suppress errors if they are not otherwise used. @@ -599,14 +670,6 @@ type ProxyServiceServer interface { Proxy(ProxyService_ProxyServer) error } -// UnimplementedProxyServiceServer can be embedded to have forward compatible implementations. -type UnimplementedProxyServiceServer struct { -} - -func (*UnimplementedProxyServiceServer) Proxy(srv ProxyService_ProxyServer) error { - return status.Errorf(codes.Unimplemented, "method Proxy not implemented") -} - func RegisterProxyServiceServer(s *grpc.Server, srv ProxyServiceServer) { s.RegisterService(&_ProxyService_serviceDesc, srv) } @@ -651,3 +714,41 @@ var _ProxyService_serviceDesc = grpc.ServiceDesc{ }, Metadata: "konnectivity-client/proto/client/client.proto", } + +func init() { + proto.RegisterFile("konnectivity-client/proto/client/client.proto", fileDescriptor_client_500d940c91b4a21e) +} + +var fileDescriptor_client_500d940c91b4a21e = []byte{ + // 472 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x53, 0xd1, 0x6e, 0x9b, 0x30, + 0x14, 0x85, 0x00, 0x49, 0xb8, 0x21, 0x15, 0xb2, 0xa6, 0x09, 0x75, 0x93, 0x5a, 0xf1, 0x14, 0x55, + 0x0b, 0x54, 0xa9, 0x34, 0xed, 0x35, 0x0d, 0xa9, 0x52, 0xa9, 0x5a, 0x99, 0xd3, 0xa7, 0xee, 0x61, + 0xf2, 0xc0, 0x9a, 0x50, 0x18, 0x66, 0xb6, 0x97, 0x8d, 0x0f, 0xda, 0x7f, 0x4e, 0x18, 0x52, 0xc8, + 0xa4, 0x6d, 0x52, 0x9f, 0xe0, 0x1c, 0xdf, 0x73, 0x7c, 0x7d, 0xae, 0x0d, 0xf3, 0x1d, 0x2b, 0x0a, + 0x9a, 0xc8, 0x6c, 0x9f, 0xc9, 0x6a, 0x9e, 0xe4, 0x19, 0x2d, 0x64, 0x58, 0x72, 0x26, 0x59, 0xd8, + 0x82, 0xe6, 0x13, 0x28, 0xce, 0xff, 0x35, 0x80, 0x61, 0x4c, 0x92, 0x1d, 0x95, 0xe8, 0x0c, 0x4c, + 0x59, 0x95, 0xd4, 0xd3, 0xcf, 0xf5, 0xd9, 0xc9, 0x62, 0x12, 0x34, 0xf4, 0x43, 0x55, 0x52, 0xac, + 0x16, 0xd0, 0x25, 0x4c, 0xd2, 0x8c, 0xe4, 0x98, 0x7e, 0xfb, 0x4e, 0x85, 0xf4, 0x06, 0xe7, 0xfa, + 0x6c, 0xb2, 0x70, 0x82, 0xa8, 0xe3, 0x36, 0x1a, 0xee, 0x97, 0xa0, 0x2b, 0x70, 0x1a, 0x28, 0x4a, + 0x56, 0x08, 0xea, 0x19, 0x4a, 0x32, 0x6d, 0x25, 0x0d, 0xb9, 0xd1, 0xf0, 0x51, 0x11, 0x7a, 0x05, + 0x66, 0x4a, 0x24, 0xf1, 0x4c, 0x55, 0x6c, 0x05, 0x11, 0x91, 0x64, 0xa3, 0x61, 0x45, 0xd6, 0x8e, + 0x49, 0xce, 0x04, 0x3d, 0x34, 0x61, 0xb5, 0x8e, 0xab, 0x1e, 0x59, 0x3b, 0xf6, 0x8b, 0xd0, 0x5b, + 0x98, 0xb6, 0xb8, 0xed, 0x63, 0xa8, 0x54, 0x27, 0x07, 0xd5, 0x53, 0x23, 0xc7, 0x65, 0xd7, 0x36, + 0x8c, 0x4a, 0x52, 0xe5, 0x8c, 0xa4, 0xfe, 0x47, 0x98, 0xf4, 0xce, 0x89, 0x4e, 0x61, 0xac, 0xf2, + 0x4b, 0x58, 0xae, 0xf2, 0xb2, 0xf1, 0x13, 0x46, 0x1e, 0x8c, 0x48, 0x9a, 0x72, 0x2a, 0x84, 0x8a, + 0xc8, 0xc6, 0x07, 0x88, 0x5e, 0xc2, 0x90, 0x93, 0x22, 0x65, 0x5f, 0x55, 0x10, 0x06, 0x6e, 0x91, + 0xff, 0x08, 0x4e, 0x3f, 0x11, 0xf4, 0x02, 0x2c, 0xca, 0x39, 0xe3, 0xad, 0x75, 0x03, 0xd0, 0x6b, + 0xb0, 0x93, 0x66, 0xb6, 0xb7, 0x91, 0x72, 0x36, 0x70, 0x47, 0xfc, 0xd5, 0xfb, 0x0d, 0x38, 0xfd, + 0x6c, 0x8e, 0x5d, 0xf4, 0x3f, 0x5c, 0xfc, 0x15, 0x4c, 0x8f, 0x32, 0x79, 0x4e, 0x2b, 0xfe, 0x7b, + 0x30, 0xeb, 0x99, 0xfd, 0x7b, 0xab, 0xce, 0x79, 0xd0, 0x77, 0x46, 0xed, 0xf0, 0xeb, 0x43, 0x38, + 0xcd, 0xcc, 0x2f, 0x62, 0x80, 0xee, 0x2e, 0x22, 0x07, 0xc6, 0xd1, 0xed, 0xf2, 0xee, 0x13, 0x5e, + 0x7f, 0x70, 0xb5, 0x0e, 0x6d, 0x63, 0x57, 0x47, 0x53, 0xb0, 0x57, 0x77, 0xf7, 0xdb, 0xb5, 0x5a, + 0x1c, 0xf4, 0xe0, 0x36, 0x76, 0x0d, 0x34, 0x06, 0x33, 0x5a, 0x3e, 0x2c, 0x5d, 0xf3, 0xc2, 0x05, + 0x6b, 0xad, 0xb6, 0x1b, 0x81, 0xb1, 0xbe, 0xbf, 0x71, 0xb5, 0x45, 0x08, 0x4e, 0xcc, 0xd9, 0xcf, + 0x6a, 0x4b, 0xf9, 0x3e, 0x4b, 0x28, 0x3a, 0x03, 0x4b, 0x61, 0x34, 0x6a, 0xdf, 0xc1, 0xe9, 0xe1, + 0xc7, 0xd7, 0x66, 0xfa, 0xa5, 0x7e, 0x7d, 0xf3, 0x18, 0x89, 0xec, 0x8b, 0x08, 0x76, 0xef, 0x44, + 0x90, 0xb1, 0x90, 0x94, 0x99, 0xa0, 0x7c, 0x4f, 0xf9, 0xbc, 0xa0, 0xf2, 0x07, 0xe3, 0xbb, 0x79, + 0x59, 0xcb, 0xc3, 0xff, 0xbd, 0xc6, 0xcf, 0x43, 0x85, 0xae, 0x7e, 0x07, 0x00, 0x00, 0xff, 0xff, + 0x64, 0xe0, 0x62, 0xbe, 0xb8, 0x03, 0x00, 0x00, +} diff --git a/pkg/agent/client.go b/pkg/agent/client.go index 3d380b91c..73dc13ee4 100644 --- a/pkg/agent/client.go +++ b/pkg/agent/client.go @@ -130,27 +130,32 @@ func newAgentClient(address, agentID string, cs *ClientSet, opts ...grpc.DialOpt func (a *AgentClient) Connect() (int, error) { conn, err := grpc.Dial(a.address, a.opts...) if err != nil { + klog.V(2).InfoS("WRF: Failed to dial", "address", a.address) return 0, err } ctx := metadata.AppendToOutgoingContext(context.Background(), header.AgentID, a.agentID) if a.serviceAccountTokenPath != "" { if ctx, err = a.initializeAuthContext(ctx); err != nil { conn.Close() + klog.V(2).InfoS("WRF: Failed to init auth", "address", a.address) return 0, err } } stream, err := agent.NewAgentServiceClient(conn).Connect(ctx) if err != nil { + klog.V(2).InfoS("WRF: Failed to connect stream", "address", a.address) conn.Close() return 0, err } serverID, err := serverID(stream) if err != nil { + klog.V(2).InfoS("WRF: Failed to get server ID", "address", a.address) conn.Close() return 0, err } serverCount, err := serverCount(stream) if err != nil { + klog.V(2).InfoS("WRF: Failed to get server count", "address", a.address) conn.Close() return 0, err } diff --git a/pkg/server/backend_manager.go b/pkg/server/backend_manager.go index d16976908..226581b40 100644 --- a/pkg/server/backend_manager.go +++ b/pkg/server/backend_manager.go @@ -18,6 +18,7 @@ package server import ( "context" + "io" "math/rand" "sync" "time" @@ -27,6 +28,8 @@ import ( "sigs.k8s.io/apiserver-network-proxy/proto/agent" ) +var BOT time.Time = time.Unix(0, 0) + type Backend interface { Send(p *client.Packet) error Context() context.Context @@ -40,6 +43,8 @@ type backend struct { // write it using channel. Let's worry about performance later. mu sync.Mutex // mu protects conn conn agent.AgentService_ConnectServer + taintExpires time.Time + agentID string } func (b *backend) Send(p *client.Packet) error { @@ -53,8 +58,12 @@ func (b *backend) Context() context.Context { return b.conn.Context() } -func newBackend(conn agent.AgentService_ConnectServer) *backend { - return &backend{conn: conn} +func (b *backend) GetAgentID() string { + return b.agentID +} + +func newBackend(conn agent.AgentService_ConnectServer, agentID string) *backend { + return &backend{conn: conn, taintExpires: BOT, agentID: agentID} } // BackendStorage is an interface to manage the storage of the backend @@ -62,6 +71,8 @@ func newBackend(conn agent.AgentService_ConnectServer) *backend { type BackendStorage interface { // AddBackend adds a backend. AddBackend(agentID string, conn agent.AgentService_ConnectServer) Backend + // TaintBackend indicates an error occurred on a backend and allows to BackendManager to act based on the error. + TaintBackend(agentID string, err error) // RemoveBackend removes a backend. RemoveBackend(agentID string, conn agent.AgentService_ConnectServer) // NumBackends returns the number of backends. @@ -76,7 +87,7 @@ type BackendManager interface { // context instead of a request-scoped context, as the backend manager will // pick a backend for every tunnel session and each tunnel session may // contains multiple requests. - Backend(ctx context.Context) (Backend, error) + Backend(ctx context.Context) (Backend, string, error) BackendStorage } @@ -87,7 +98,7 @@ type DefaultBackendManager struct { *DefaultBackendStorage } -func (dbm *DefaultBackendManager) Backend(_ context.Context) (Backend, error) { +func (dbm *DefaultBackendManager) Backend(_ context.Context) (Backend, string, error) { return dbm.DefaultBackendStorage.GetRandomBackend() } @@ -99,24 +110,36 @@ type DefaultBackendStorage struct { // traffic, because backends[agentID][1:] are more likely to be closed // by the agent to deduplicate connections to the same server. backends map[string][]*backend - // agentID is tracked in this slice to enable randomly picking an - // agentID in the Backend() method. There is no reliable way to - // randomly pick a key from a map (in this case, the backends) in - // Golang. + // agentIDs is a slice of agentIDs to enable randomly picking one + // in the Backend() method. There is no reliable way to randomly + // pick a key from a map (in this case, the backends) in Golang. agentIDs []string + // untaintedIDs is a slice of agentIDs which are not currently + // marked as tainted to enable randomly picking a known good ID + // in the Backend() method. There is no reliable way to randomly + // pick a key from a map (in this case, the backends) in Golang. + // Consider switching this to a map[string]bool to get set behavior + // A little nasty as length where true is yuck. + untaintedIDs []string + // untaintedIDsmu protects untaintedIDs. To prevent deadlocks, + // we use order locking. You should be already holding mu, prior + // to obtaining untaintedIDsmu. + untaintedIDsmu sync.RWMutex random *rand.Rand + keepaliveTime time.Duration } // NewDefaultBackendManager returns a DefaultBackendManager. -func NewDefaultBackendManager() *DefaultBackendManager { - return &DefaultBackendManager{DefaultBackendStorage: NewDefaultBackendStorage()} +func NewDefaultBackendManager(keepaliveTime time.Duration) *DefaultBackendManager { + return &DefaultBackendManager{DefaultBackendStorage: NewDefaultBackendStorage(keepaliveTime)} } // NewDefaultBackendStorage returns a DefaultBackendStorage -func NewDefaultBackendStorage() *DefaultBackendStorage { +func NewDefaultBackendStorage(keepaliveTime time.Duration) *DefaultBackendStorage { return &DefaultBackendStorage{ - backends: make(map[string][]*backend), - random: rand.New(rand.NewSource(time.Now().UnixNano())), + backends: make(map[string][]*backend), + random: rand.New(rand.NewSource(time.Now().UnixNano())), + keepaliveTime: keepaliveTime, } } @@ -126,7 +149,7 @@ func (s *DefaultBackendStorage) AddBackend(agentID string, conn agent.AgentServi s.mu.Lock() defer s.mu.Unlock() _, ok := s.backends[agentID] - addedBackend := newBackend(conn) + addedBackend := newBackend(conn, agentID) if ok { for _, v := range s.backends[agentID] { if v.conn == conn { @@ -139,9 +162,31 @@ func (s *DefaultBackendStorage) AddBackend(agentID string, conn agent.AgentServi } s.backends[agentID] = []*backend{addedBackend} s.agentIDs = append(s.agentIDs, agentID) + s.appendUntaintedSafe(agentID) return addedBackend } +// TaintBackend will find the matching tunnel and taint it for the configured duration. +func (s *DefaultBackendStorage)TaintBackend(agentID string, err error) { + klog.V(2).InfoS("Tainting connection for agent", "agentID", agentID, "error", err) + if err == io.EOF { + // No point in tainting something which is going away cleanly. + } + s.mu.Lock() + defer s.mu.Unlock() + backends, ok := s.backends[agentID] + if !ok { + klog.V(4).InfoS("Cannot taint agent in backends", "agentID", agentID) + return + } + // GetBackend always returns conn 0 for an agentID so assuming this is the correct conn. + s.removeUntaintedSafe(agentID) + backends[0].taintExpires = time.Now().Add(s.keepaliveTime) + klog.V(5).InfoS("Setting taint expiry time", + "agentID", agentID, + "taintExpires", backends[0].taintExpires) +} + // RemoveBackend removes a backend. func (s *DefaultBackendStorage) RemoveBackend(agentID string, conn agent.AgentService_ConnectServer) { klog.V(2).InfoS("Remove connection for agent", "connection", conn, "agentID", agentID) @@ -164,13 +209,14 @@ func (s *DefaultBackendStorage) RemoveBackend(agentID string, conn agent.AgentSe } if len(s.backends[agentID]) == 0 { delete(s.backends, agentID) - for i := range s.agentIDs { - if s.agentIDs[i] == agentID { + for i, id := range s.agentIDs { + if id == agentID { s.agentIDs[i] = s.agentIDs[len(s.agentIDs)-1] s.agentIDs = s.agentIDs[:len(s.agentIDs)-1] break } } + s.removeUntaintedSafe(agentID) } if !found { klog.V(1).InfoS("Cannot find connection for agent in backends", "connection", conn, "agentID", agentID) @@ -193,15 +239,72 @@ func (e *ErrNotFound) Error() string { } // GetRandomBackend returns a random backend. -func (s *DefaultBackendStorage) GetRandomBackend() (Backend, error) { +func (s *DefaultBackendStorage) GetRandomBackend() (Backend, string, error) { s.mu.RLock() defer s.mu.RUnlock() if len(s.backends) == 0 { - return nil, &ErrNotFound{} + return nil, "", &ErrNotFound{} } + // First try find a connection from the complete list. + // This is how we give tainted backends a chance to untaint. agentID := s.agentIDs[s.random.Intn(len(s.agentIDs))] + now := time.Now() + taintExpires := s.backends[agentID][0].taintExpires + untaintedLen := s.getUntaintedLengthSafe() + klog.V(5).InfoS("Checking backend for taint", + "agentID", agentID, + "taintExpires", taintExpires, + "now", now, + "untaintedLen", untaintedLen, + "BOT", BOT) + if taintExpires.After(now) && untaintedLen > 0 { + // Got a still tainted tunnel and there are untainted tunnels. + // No luck on complete list. Now attempting to get from the untainted list. + agentID = s.getUntaintedAgentIDSafe(s.random.Intn(untaintedLen)) + klog.V(5).InfoS("Found taint", + "newAgentID", agentID) + } else if taintExpires != BOT && taintExpires.Before(now) { + // The taint has expired and the tunnel has not yet been cleaned up. + // Given that add the tunnel back to the untainted list. + // This is a write operation so must occur under a write lock. + s.appendUntaintedSafe(agentID) + klog.V(5).InfoS("Removed taint", + "backend", s.backends[agentID][0]) + } klog.V(4).InfoS("Pick agent as backend", "agentID", agentID) + // always return the first connection to an agent, because the agent // will close later connections if there are multiple. - return s.backends[agentID][0], nil + return s.backends[agentID][0], agentID, nil +} + +func (s *DefaultBackendStorage) getUntaintedLengthSafe() int { + s.untaintedIDsmu.RLock() + defer s.untaintedIDsmu.RUnlock() + return len(s.untaintedIDs) +} + +func (s *DefaultBackendStorage) getUntaintedAgentIDSafe(index int) string { + s.untaintedIDsmu.RLock() + defer s.untaintedIDsmu.RUnlock() + return s.untaintedIDs[index] } + +func (s *DefaultBackendStorage) appendUntaintedSafe(agentID string) { + s.untaintedIDsmu.Lock() + defer s.untaintedIDsmu.Unlock() + s.untaintedIDs = append(s.untaintedIDs, agentID) + s.backends[agentID][0].taintExpires = BOT +} + +func (s *DefaultBackendStorage) removeUntaintedSafe(agentID string) { + s.untaintedIDsmu.Lock() + defer s.untaintedIDsmu.Unlock() + for i, id := range s.untaintedIDs { + if id == agentID { + s.untaintedIDs[i] = s.untaintedIDs[len(s.untaintedIDs)-1] + s.untaintedIDs = s.untaintedIDs[:len(s.untaintedIDs)-1] + break + } + } +} \ No newline at end of file diff --git a/pkg/server/backend_manager_test.go b/pkg/server/backend_manager_test.go index 839ffa695..00b1d54cc 100644 --- a/pkg/server/backend_manager_test.go +++ b/pkg/server/backend_manager_test.go @@ -19,6 +19,7 @@ package server import ( "reflect" "testing" + "time" "sigs.k8s.io/apiserver-network-proxy/proto/agent" ) @@ -34,7 +35,7 @@ func TestAddRemoveBackends(t *testing.T) { conn22 := new(fakeAgentService_ConnectServer) conn3 := new(fakeAgentService_ConnectServer) - p := NewDefaultBackendManager() + p := NewDefaultBackendManager(time.Hour) p.AddBackend("agent1", conn1) p.RemoveBackend("agent1", conn1) @@ -47,7 +48,7 @@ func TestAddRemoveBackends(t *testing.T) { t.Errorf("expected %v, got %v", e, a) } - p = NewDefaultBackendManager() + p = NewDefaultBackendManager(time.Hour) p.AddBackend("agent1", conn1) p.AddBackend("agent1", conn12) // Adding the same connection again should be a no-op. @@ -61,8 +62,8 @@ func TestAddRemoveBackends(t *testing.T) { // This is invalid. agent1 doesn't have conn3. This should be a no-op. p.RemoveBackend("agent1", conn3) expectedBackends = map[string][]*backend{ - "agent1": []*backend{newBackend(conn12)}, - "agent3": []*backend{newBackend(conn3)}, + "agent1": []*backend{newBackend(conn12, "agent1")}, + "agent3": []*backend{newBackend(conn3, "agent3")}, } expectedAgentIDs = []string{"agent1", "agent3"} if e, a := expectedBackends, p.backends; !reflect.DeepEqual(e, a) { diff --git a/pkg/server/server.go b/pkg/server/server.go index 5a5394cf8..c42f37917 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -37,6 +37,15 @@ import ( "sigs.k8s.io/apiserver-network-proxy/proto/header" ) +type status string + +const( + Startup status = "Startup" + Running = "Running" + Closing = "Closing" + Unknown = "Unknown" +) + // ProxyClientConnection... type ProxyClientConnection struct { Mode string @@ -141,6 +150,43 @@ var _ agent.AgentServiceServer = &ProxyServer{} var _ client.ProxyServiceServer = &ProxyServer{} +type frontendStream struct { + client.ProxyService_ProxyServer + // agentID indicates the backend that the frontend has been connected to. + agentID string + status status + // mu protects agentID and status, no point in RWLock as there should be only 1 reader + mu sync.Mutex +} + +func (fs *frontendStream) getAgentID() (string, error) { + fs.mu.Lock() + defer fs.mu.Unlock() + if fs.agentID == "" { + return "", &ErrNotFound{} + } + return fs.agentID, nil +} + +func (fs *frontendStream) setAgentID(agentID string) { + fs.mu.Lock() + defer fs.mu.Unlock() + fs.agentID = agentID +} + +func (fs *frontendStream) getStatus() status { + fs.mu.Lock() + defer fs.mu.Unlock() + return fs.status +} + +func (fs *frontendStream) setStatus(status status) { + fs.mu.Lock() + defer fs.mu.Unlock() + fs.status = status +} + + func (s *ProxyServer) addFrontend(agentID string, connID int64, p *ProxyClientConnection) { klog.V(2).InfoS("Register frontend for agent", "frontend", p, "agentID", agentID, "connectionID", connID) s.fmu.Lock() @@ -202,8 +248,8 @@ func (s *ProxyServer) getFrontendsForBackendConn(agentID string, backend Backend } // NewProxyServer creates a new ProxyServer instance -func NewProxyServer(serverID string, serverCount int, agentAuthenticationOptions *AgentTokenAuthenticationOptions) *ProxyServer { - bm := NewDefaultBackendManager() +func NewProxyServer(serverID string, serverCount int, agentAuthenticationOptions *AgentTokenAuthenticationOptions, keepaliveTime time.Duration) *ProxyServer { + bm := NewDefaultBackendManager(keepaliveTime) return &ProxyServer{ frontends: make(map[string](map[int64]*ProxyClientConnection)), PendingDial: NewPendingDialManager(), @@ -227,7 +273,8 @@ func (s *ProxyServer) Proxy(stream client.ProxyService_ProxyServer) error { recvCh := make(chan *client.Packet, 10) stopCh := make(chan error) - go s.serveRecvFrontend(stream, recvCh) + frontStream := frontendStream{ProxyService_ProxyServer: stream, status: Startup} + go s.serveRecvFrontend(&frontStream, recvCh) defer func() { close(recvCh) @@ -238,11 +285,24 @@ func (s *ProxyServer) Proxy(stream client.ProxyService_ProxyServer) error { for { in, err := stream.Recv() if err == io.EOF { + klog.V(5).Infoln("Received EOF from stream") close(stopCh) return } if err != nil { - klog.ErrorS(err, "Stream read from frontend failure") + if agentID, lookupErr := frontStream.getAgentID(); lookupErr == nil { + if frontStream.getStatus() != Closing { + klog.ErrorS(err, "Stream read from frontend failure", + "agentID", agentID) + s.BackendManager.TaintBackend(agentID, err) + } + } else { + if frontStream.getStatus() != Closing { + klog.ErrorS(err, "Stream read from frontend failure", + "frontStream", frontStream) + } + klog.ErrorS(lookupErr, "Failed to lookup agentID on stream", "frontStream", frontStream) + } close(stopCh) return } @@ -254,7 +314,7 @@ func (s *ProxyServer) Proxy(stream client.ProxyService_ProxyServer) error { return <-stopCh } -func (s *ProxyServer) serveRecvFrontend(stream client.ProxyService_ProxyServer, recvCh <-chan *client.Packet) { +func (s *ProxyServer) serveRecvFrontend(stream *frontendStream, recvCh <-chan *client.Packet) { klog.V(4).Infoln("start serving frontend stream") var firstConnID int64 @@ -271,11 +331,16 @@ func (s *ProxyServer) serveRecvFrontend(stream client.ProxyService_ProxyServer, // the address, then we can send the Dial_REQ to the // same agent. That way we save the agent from creating // a new connection to the address. - backend, err = s.BackendManager.Backend(context.TODO()) + var agentID string + backend, agentID, err = s.BackendManager.Backend(context.TODO()) if err != nil { klog.ErrorS(err, "Failed to get a backend") continue } + stream.setAgentID(agentID) + klog.V(5).InfoS("Set agentID on frontend stream", + "agentID", agentID, + "frontStream", stream) s.PendingDial.Add( pkt.GetDialRequest().Random, &ProxyClientConnection{ @@ -288,9 +353,12 @@ func (s *ProxyServer) serveRecvFrontend(stream client.ProxyService_ProxyServer, if err := backend.Send(pkt); err != nil { klog.ErrorS(err, "DIAL_REQ to Backend failed") } + stream.setStatus(Running) klog.V(5).Infoln("DIAL_REQ sent to backend") // got this. but backend didn't receive anything. case client.PacketType_CLOSE_REQ: + klog.V(5).InfoS("Received CLOSE_REQ") + stream.setStatus(Closing) connID := pkt.GetCloseRequest().ConnectID klog.V(5).InfoS("Received CLOSE_REQ", "connectionID", connID) if backend == nil { @@ -304,6 +372,7 @@ func (s *ProxyServer) serveRecvFrontend(stream client.ProxyService_ProxyServer, klog.V(5).Infoln("CLOSE_REQ sent to backend") case client.PacketType_DATA: + klog.V(5).InfoS("Received DATA") connID := pkt.GetData().ConnectID data := pkt.GetData().Data klog.V(5).InfoS("Received data from connection", "bytes", len(data), "connectionID", connID) @@ -325,6 +394,7 @@ func (s *ProxyServer) serveRecvFrontend(stream client.ProxyService_ProxyServer, klog.V(5).Infoln("DATA sent to Backend") default: + stream.setStatus(Unknown) klog.V(5).InfoS("Ignore packet coming from frontend", "type", pkt.Type) } } @@ -380,7 +450,7 @@ func (s *ProxyServer) validateAuthToken(token string) error { } r, err := s.AgentAuthenticationOptions.KubernetesClient.AuthenticationV1().TokenReviews().Create(trReq) if err != nil { - return fmt.Errorf("Failed to authenticate request. err:%v", err) + return fmt.Errorf("failed to authenticate request. err:%v", err) } if r.Status.Error != "" { @@ -417,12 +487,12 @@ func (s *ProxyServer) validateAuthToken(token string) error { func (s *ProxyServer) authenticateAgentViaToken(ctx context.Context) error { md, ok := metadata.FromIncomingContext(ctx) if !ok { - return fmt.Errorf("Failed to retrieve metadata from context") + return fmt.Errorf("failed to retrieve metadata from context") } authContext := md.Get(header.AuthenticationTokenContextKey) if len(authContext) == 0 { - return fmt.Errorf("Authentication context was not found in metadata") + return fmt.Errorf("authentication context was not found in metadata") } if len(authContext) > 1 { @@ -434,7 +504,7 @@ func (s *ProxyServer) authenticateAgentViaToken(ctx context.Context) error { } if err := s.validateAuthToken(strings.TrimPrefix(authContext[0], header.AuthenticationTokenContextSchemePrefix)); err != nil { - return fmt.Errorf("Failed to validate authentication token, err:%v", err) + return fmt.Errorf("failed to validate authentication token, err:%v", err) } klog.V(2).Infoln("Client successfully authenticated via token") @@ -476,11 +546,12 @@ func (s *ProxyServer) Connect(stream agent.AgentService_ConnectServer) error { for { in, err := stream.Recv() if err == io.EOF { + klog.ErrorS(err, "stream closed failure") close(stopCh) return } if err != nil { - klog.ErrorS(err, "stream read failure") + klog.ErrorS(err, "stream read failure", "error type", fmt.Sprintf("%T", err)) close(stopCh) return } diff --git a/pkg/server/server_test.go b/pkg/server/server_test.go index 379b4dc1a..8d1df2628 100644 --- a/pkg/server/server_test.go +++ b/pkg/server/server_test.go @@ -22,6 +22,7 @@ import ( "io" "reflect" "testing" + "time" "github.com/golang/mock/gomock" "google.golang.org/grpc/metadata" @@ -164,7 +165,7 @@ func TestAgentTokenAuthenticationErrorsToken(t *testing.T) { KubernetesClient: kcs, AgentNamespace: tc.wantNamespace, AgentServiceAccount: tc.wantServiceAccount, - }) + }, time.Hour) err := p.Connect(conn) if tc.wantError { @@ -187,7 +188,7 @@ func TestAddRemoveFrontends(t *testing.T) { agent2ConnID2 := new(ProxyClientConnection) agent3ConnID1 := new(ProxyClientConnection) - p := NewProxyServer("", 1, nil) + p := NewProxyServer("", 1, nil, time.Hour) p.addFrontend("agent1", int64(1), agent1ConnID1) p.removeFrontend("agent1", int64(1)) expectedFrontends := make(map[string]map[int64]*ProxyClientConnection) @@ -195,7 +196,7 @@ func TestAddRemoveFrontends(t *testing.T) { t.Errorf("expected %v, got %v", e, a) } - p = NewProxyServer("", 1, nil) + p = NewProxyServer("", 1, nil, time.Hour) p.addFrontend("agent1", int64(1), agent1ConnID1) p.addFrontend("agent1", int64(2), agent1ConnID2) p.addFrontend("agent2", int64(1), agent2ConnID1) diff --git a/pkg/server/tunnel.go b/pkg/server/tunnel.go index bd1be77c0..8e7bd4a48 100644 --- a/pkg/server/tunnel.go +++ b/pkg/server/tunnel.go @@ -69,7 +69,7 @@ func (t *Tunnel) ServeHTTP(w http.ResponseWriter, r *http.Request) { }, } klog.V(4).InfoS("Set pending", "random", random, "value", w) - backend, err := t.Server.BackendManager.Backend(context.TODO()) + backend, agentID, err := t.Server.BackendManager.Backend(context.TODO()) if err != nil { http.Error(w, fmt.Sprintf("currently no tunnels available: %v", err), http.StatusInternalServerError) return @@ -79,6 +79,7 @@ func (t *Tunnel) ServeHTTP(w http.ResponseWriter, r *http.Request) { Mode: "http-connect", HTTP: conn, connected: connected, + agentID: agentID, start: time.Now(), backend: backend, } @@ -108,7 +109,12 @@ func (t *Tunnel) ServeHTTP(w http.ResponseWriter, r *http.Request) { pkt := make([]byte, 1<<12) connID := connection.connectID - agentID := connection.agentID + newAgentID := connection.agentID + if agentID != newAgentID { + klog.V(1).InfoS("agent ID mismatch setting up tunnel", + "original agentID", agentID, + "new agentID", newAgentID) + } var acc int for { diff --git a/proto/agent/agent.pb.go b/proto/agent/agent.pb.go index 20ca92280..5fd14f3c3 100644 --- a/proto/agent/agent.pb.go +++ b/proto/agent/agent.pb.go @@ -16,17 +16,16 @@ limitations under the License. // Code generated by protoc-gen-go. DO NOT EDIT. // source: proto/agent/agent.proto -package agent +package agent // import "sigs.k8s.io/apiserver-network-proxy/proto/agent" + +import proto "github.com/golang/protobuf/proto" +import fmt "fmt" +import math "math" +import client "sigs.k8s.io/apiserver-network-proxy/konnectivity-client/proto/client" import ( - context "context" - fmt "fmt" - proto "github.com/golang/protobuf/proto" + context "golang.org/x/net/context" grpc "google.golang.org/grpc" - codes "google.golang.org/grpc/codes" - status "google.golang.org/grpc/status" - math "math" - client "sigs.k8s.io/apiserver-network-proxy/konnectivity-client/proto/client" ) // Reference imports to suppress errors if they are not otherwise used. @@ -38,23 +37,7 @@ var _ = math.Inf // is compatible with the proto package it is being compiled against. // A compilation error at this line likely means your copy of the // proto package needs to be updated. -const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package - -func init() { proto.RegisterFile("proto/agent/agent.proto", fileDescriptor_656b6c96a18ce683) } - -var fileDescriptor_656b6c96a18ce683 = []byte{ - // 155 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x12, 0x2f, 0x28, 0xca, 0x2f, - 0xc9, 0xd7, 0x4f, 0x4c, 0x4f, 0xcd, 0x2b, 0x81, 0x90, 0x7a, 0x60, 0x11, 0x29, 0xdd, 0xec, 0xfc, - 0xbc, 0xbc, 0xd4, 0xe4, 0x92, 0xcc, 0xb2, 0xcc, 0x92, 0x4a, 0xdd, 0xe4, 0x9c, 0x4c, 0x90, 0x02, - 0x88, 0x62, 0x28, 0x07, 0x42, 0x41, 0x94, 0x1b, 0x19, 0x72, 0xf1, 0x38, 0x82, 0x74, 0x07, 0xa7, - 0x16, 0x95, 0x65, 0x26, 0xa7, 0x0a, 0x29, 0x72, 0xb1, 0x3b, 0x43, 0x0c, 0x10, 0x62, 0xd7, 0x0b, - 0x48, 0x4c, 0xce, 0x4e, 0x2d, 0x91, 0x82, 0x31, 0x94, 0x18, 0x34, 0x18, 0x0d, 0x18, 0x9d, 0x0c, - 0xa3, 0xf4, 0x8b, 0x33, 0xd3, 0x8b, 0xf5, 0xb2, 0x2d, 0x8a, 0xf5, 0x32, 0xf3, 0xf5, 0x13, 0x0b, - 0x32, 0x8b, 0x53, 0x8b, 0xca, 0x52, 0x8b, 0x74, 0xf3, 0x52, 0x4b, 0xca, 0xf3, 0x8b, 0xb2, 0x75, - 0x0b, 0x8a, 0xf2, 0x2b, 0x2a, 0xf5, 0x91, 0x1c, 0x98, 0xc4, 0x06, 0xe6, 0x18, 0x03, 0x02, 0x00, - 0x00, 0xff, 0xff, 0x85, 0x25, 0x82, 0x73, 0xb6, 0x00, 0x00, 0x00, -} +const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package // Reference imports to suppress errors if they are not otherwise used. var _ context.Context @@ -117,14 +100,6 @@ type AgentServiceServer interface { Connect(AgentService_ConnectServer) error } -// UnimplementedAgentServiceServer can be embedded to have forward compatible implementations. -type UnimplementedAgentServiceServer struct { -} - -func (*UnimplementedAgentServiceServer) Connect(srv AgentService_ConnectServer) error { - return status.Errorf(codes.Unimplemented, "method Connect not implemented") -} - func RegisterAgentServiceServer(s *grpc.Server, srv AgentServiceServer) { s.RegisterService(&_AgentService_serviceDesc, srv) } @@ -169,3 +144,19 @@ var _AgentService_serviceDesc = grpc.ServiceDesc{ }, Metadata: "proto/agent/agent.proto", } + +func init() { proto.RegisterFile("proto/agent/agent.proto", fileDescriptor_agent_3136cbbd0a3147a0) } + +var fileDescriptor_agent_3136cbbd0a3147a0 = []byte{ + // 155 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x12, 0x2f, 0x28, 0xca, 0x2f, + 0xc9, 0xd7, 0x4f, 0x4c, 0x4f, 0xcd, 0x2b, 0x81, 0x90, 0x7a, 0x60, 0x11, 0x29, 0xdd, 0xec, 0xfc, + 0xbc, 0xbc, 0xd4, 0xe4, 0x92, 0xcc, 0xb2, 0xcc, 0x92, 0x4a, 0xdd, 0xe4, 0x9c, 0x4c, 0x90, 0x02, + 0x88, 0x62, 0x28, 0x07, 0x42, 0x41, 0x94, 0x1b, 0x19, 0x72, 0xf1, 0x38, 0x82, 0x74, 0x07, 0xa7, + 0x16, 0x95, 0x65, 0x26, 0xa7, 0x0a, 0x29, 0x72, 0xb1, 0x3b, 0x43, 0x0c, 0x10, 0x62, 0xd7, 0x0b, + 0x48, 0x4c, 0xce, 0x4e, 0x2d, 0x91, 0x82, 0x31, 0x94, 0x18, 0x34, 0x18, 0x0d, 0x18, 0x9d, 0x0c, + 0xa3, 0xf4, 0x8b, 0x33, 0xd3, 0x8b, 0xf5, 0xb2, 0x2d, 0x8a, 0xf5, 0x32, 0xf3, 0xf5, 0x13, 0x0b, + 0x32, 0x8b, 0x53, 0x8b, 0xca, 0x52, 0x8b, 0x74, 0xf3, 0x52, 0x4b, 0xca, 0xf3, 0x8b, 0xb2, 0x75, + 0x0b, 0x8a, 0xf2, 0x2b, 0x2a, 0xf5, 0x91, 0x1c, 0x98, 0xc4, 0x06, 0xe6, 0x18, 0x03, 0x02, 0x00, + 0x00, 0xff, 0xff, 0x85, 0x25, 0x82, 0x73, 0xb6, 0x00, 0x00, 0x00, +} diff --git a/tests/concurrent_client_request_test.go b/tests/concurrent_client_request_test.go index cfc2fabfb..99300a5d5 100644 --- a/tests/concurrent_client_request_test.go +++ b/tests/concurrent_client_request_test.go @@ -60,6 +60,10 @@ type singleTimeManager struct { used map[string]struct{} } +func (s *singleTimeManager) TaintBackend(agentID string, err error) { + panic("implement me") +} + func (s *singleTimeManager) AddBackend(agentID string, conn agent.AgentService_ConnectServer) server.Backend { s.mu.Lock() defer s.mu.Unlock() @@ -80,16 +84,16 @@ func (s *singleTimeManager) RemoveBackend(agentID string, conn agent.AgentServic delete(s.backends, agentID) } -func (s *singleTimeManager) Backend(_ context.Context) (server.Backend, error) { +func (s *singleTimeManager) Backend(_ context.Context) (server.Backend, string, error) { s.mu.Lock() defer s.mu.Unlock() for k, v := range s.backends { if _, ok := s.used[k]; !ok { s.used[k] = struct{}{} - return v, nil + return v, "", nil } } - return nil, fmt.Errorf("cannot find backend to a new agent") + return nil, "", fmt.Errorf("cannot find backend to a new agent") } func (s *singleTimeManager) GetBackend(agentID string) server.Backend { @@ -118,7 +122,7 @@ func TestConcurrentClientRequest(t *testing.T) { t.Fatal(err) } defer cleanup() - ps.BackendManager = newSingleTimeGetter(server.NewDefaultBackendManager()) + ps.BackendManager = newSingleTimeGetter(server.NewDefaultBackendManager(time.Hour)) stopCh := make(chan struct{}) defer close(stopCh) diff --git a/tests/proxy_test.go b/tests/proxy_test.go index 9017c0d44..27708f33b 100644 --- a/tests/proxy_test.go +++ b/tests/proxy_test.go @@ -274,7 +274,7 @@ func runGRPCProxyServerWithServerCount(serverCount int) (proxy, *server.ProxySer var err error var lis, lis2 net.Listener - server := server.NewProxyServer(uuid.New().String(), serverCount, &server.AgentTokenAuthenticationOptions{}) + server := server.NewProxyServer(uuid.New().String(), serverCount, &server.AgentTokenAuthenticationOptions{}, time.Hour) grpcServer := grpc.NewServer() agentServer := grpc.NewServer() cleanup := func() { @@ -312,7 +312,7 @@ func runGRPCProxyServerWithServerCount(serverCount int) (proxy, *server.ProxySer func runHTTPConnProxyServer() (proxy, func(), error) { var proxy proxy - s := server.NewProxyServer(uuid.New().String(), 0, &server.AgentTokenAuthenticationOptions{}) + s := server.NewProxyServer(uuid.New().String(), 0, &server.AgentTokenAuthenticationOptions{}, time.Hour) agentServer := grpc.NewServer() agentproto.RegisterAgentServiceServer(agentServer, s)