-
Notifications
You must be signed in to change notification settings - Fork 3.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Issue #1824 Kafka: Support access logging for Kafka requests/responses #1870
Conversation
pkg/proxy/kafka.go
Outdated
} | ||
} | ||
|
||
func (r *kafkaRedirect) getDestinationInfo(dstIPPort string) accesslog.EndpointInfo { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
receiver name r should be consistent with previous receiver name k for kafkaRedirect
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
pkg/proxy/kafka.go
Outdated
|
||
// egressDestinationInfo returns the destination EndpointInfo for a flow | ||
// leaving the proxy at egress. | ||
func (r *kafkaRedirect) egressDestinationInfo(ipstr string, info *accesslog.EndpointInfo) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
receiver name r should be consistent with previous receiver name k for kafkaRedirect
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
pkg/proxy/kafka.go
Outdated
// - info.Identity | ||
// - info.Labels | ||
// - info.LabelsSHA256 | ||
func (r *kafkaRedirect) fillReservedIdentity(info *accesslog.EndpointInfo, id policy.NumericIdentity) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
receiver name r should be consistent with previous receiver name k for kafkaRedirect
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
pkg/proxy/kafka.go
Outdated
} | ||
} | ||
|
||
func (r *kafkaRedirect) getSourceInfo(remoteAddr string, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
receiver name r should be consistent with previous receiver name k for kafkaRedirect
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
pkg/proxy/kafka.go
Outdated
// ingress policies are set, the ingress policy cannot determine the source | ||
// endpoint info based on ip address, as the ip address would be that of the | ||
// egress proxy i.e host. | ||
func (r *kafkaRedirect) getInfoFromConsumable(ipstr string, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
receiver name r should be consistent with previous receiver name k for kafkaRedirect
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
pkg/proxy/accesslog/log.go
Outdated
@@ -44,6 +43,58 @@ const ( | |||
FieldFilePath = "file-path" | |||
) | |||
|
|||
const ( | |||
FieldKafkaApiKey = "kafkaApiKey" | |||
FieldKafkaApiVersion = "kafkaApiVersion" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
const FieldKafkaApiVersion should be FieldKafkaAPIVersion
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
pkg/proxy/accesslog/log.go
Outdated
@@ -44,6 +43,58 @@ const ( | |||
FieldFilePath = "file-path" | |||
) | |||
|
|||
const ( | |||
FieldKafkaApiKey = "kafkaApiKey" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
const FieldKafkaApiKey should be FieldKafkaAPIKey
exported const FieldKafkaApiKey should have comment (or a comment on this block) or be unexported
pkg/kafka/request.go
Outdated
@@ -54,6 +66,69 @@ func (req *RequestMessage) String() string { | |||
req.kind, req.version, len(req.rawMsg), string(b)) | |||
} | |||
|
|||
func (req *RequestMessage) GetTopics() []string { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
exported method RequestMessage.GetTopics should have comment or be unexported
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
pkg/kafka/request.go
Outdated
return req.version | ||
} | ||
|
||
func (req *RequestMessage) GetCorrelationID() int32 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
exported method RequestMessage.GetCorrelationID should have comment or be unexported
pkg/kafka/error.go
Outdated
package kafka | ||
|
||
var ( | ||
ErrUnknown = -1 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
exported var ErrUnknown should have comment or be unexported
Changes to |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some random style comments from me, nothing blocking (and you can ignore them too)
ErrTopicAuthorizationFailed = 29 | ||
ErrGroupAuthorizationFailed = 30 | ||
ErrClusterAuthorizationFailed = 31 | ||
ErrInvalidTimeStamp = 32 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not too important. Assuming these are our internal errors (and not mapped to an external set of Error codes) you can use iota
to avoid numbering them (of course, you already numbered them so I'm kinda suggesting you do double work :/)
https://golang.org/ref/spec#Iota
const (
ErrUnknown = -1
ErrNone = iota
ErrOffsetOutOfRange
ErrInvalidMessage
)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oh, I see they're actual Kafka errors, ignore me.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added a comment to make it more clear.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
pkg/kafka/request.go
Outdated
return int32(binary.BigEndian.Uint32(req.rawMsg[8:12])) | ||
} | ||
|
||
func (req *RequestMessage) getVersion() int16 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This might be less confusing if named extractVersion or something different than the exported GetVersion function.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was responsible for this naming but I agree. extract
or read
is better here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed to extractVersion
pkg/proxy/accesslog/log.go
Outdated
FieldFilePath: logPath, | ||
}).Debug("Skipping writing to access log (write buffer nil)") | ||
return | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You could move this whole block out of the switch so it isn't repeated (I realise it was here to begin with, but since you're shuffling things around anyway).
pkg/proxy/accesslog/log.go
Outdated
FieldProtocol: l.HttpRequest.Proto, | ||
FieldHeader: l.HttpRequest.Header, | ||
FieldFilePath: logPath, | ||
}).Debug("Logging HTTP L7 flow record") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you prefer, you can create a logrus object outside the switch with fields common to each switch case, then use that. It would ensure some consistency as we add more cases, but it's not a big deal.
scopedLog := log.WithFields(log.Fields{
FieldType: typ,
FieldVerdict: verdict,
FieldCode: code,
FieldFilePath: logPath,
})
switch L7Type {
case L7TypeHTTP:
scopedLog.WithFields(log.Fields{
case specific fields
}).Debug("blah")
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was a merge rebase fallout., I think... will fix it
pkg/proxy/accesslog/log.go
Outdated
} | ||
switch L7type { | ||
case L7TypeHTTP: | ||
{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are these {}
for variable scoping? I've never seen it before but it certainly works :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's probably there to make C programmers feel more at home
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we need some semicolons? :P
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will remove this sorry.. it is the C programmer talking!
pkg/proxy/kafka.go
Outdated
log.Warn("Missing security identity in source endpoint info") | ||
} | ||
|
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you didn't want to have so much indentation, you can use a switch here:
switch {
case !r.ingress:
blah
case err == nil && srcIdentity != 0:
bloh
case err == nil:
bleeh
}
pkg/kafka/request.go
Outdated
} | ||
|
||
func (req *RequestMessage) GetCorrelationID() int32 { | ||
return int32(binary.BigEndian.Uint32(req.rawMsg[8:12])) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The Kafka parser only guarantees that the first 8 bytes have been read into rawMsg
. You need to check len()
of rawMsg
in ReadRequest()
to be at least 12 bytes to avoid out of bounds accesses.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Adding
if len(req.rawMsg) < 12 { return nil, fmt.Errorf("unable to read full request") }
in ReadRequest
pkg/kafka/request.go
Outdated
@@ -54,6 +66,69 @@ func (req *RequestMessage) String() string { | |||
req.kind, req.version, len(req.rawMsg), string(b)) | |||
} | |||
|
|||
func (req *RequestMessage) GetTopics() []string { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
pkg/kafka/request.go
Outdated
@@ -121,11 +192,5 @@ func ReadRequest(reader io.Reader) (*RequestMessage, error) { | |||
req.request, err = proto.ReadOffsetFetchReq(buf) | |||
} | |||
|
|||
if err != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why are you removing this error check?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
put back
pkg/proxy/accesslog/log.go
Outdated
@@ -44,6 +43,58 @@ const ( | |||
FieldFilePath = "file-path" | |||
) | |||
|
|||
const ( | |||
FieldKafkaApiKey = "kafkaApiKey" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Continue in the above const()
section?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wanted to keep it separate for "Kafka log record messages" as opposed the shared Log record fields for L7 messages.
@@ -154,7 +152,6 @@ func (n *EndpointSelector) Matches(lblsToMatch k8sLbls.Labels) bool { | |||
// FIXME: Omit this error or throw it to the caller? | |||
// We are doing the verification in the ParseEndpointSelector but | |||
// don't make sure the user can modify the current labels. | |||
log.WithError(err).WithField(logfields.EndpointLabelSelector, logfields.Repr(n)).Error("unable to match label selector in selector") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we remove this in a separate PR with proper explanation why it is safe? If we remove the error the FIXME needs to go as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will rebase this, this was a fallout of the circular dependency I was getting... @raybejjani has fixed this, I should be able to undo this part.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done..
pkg/proxy/kafka.go
Outdated
} | ||
} | ||
|
||
func (r *kafkaRedirect) getSourceInfo(remoteAddr string, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we share this function with HTTP? It looks identical
pkg/proxy/kafka.go
Outdated
// - info.Identity | ||
// - info.Labels | ||
// - info.LabelsSHA256 | ||
func (r *kafkaRedirect) fillReservedIdentity(info *accesslog.EndpointInfo, id policy.NumericIdentity) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same here
pkg/proxy/kafka.go
Outdated
@@ -185,14 +373,29 @@ func (k *kafkaRedirect) handleRequest(pair *connectionPair, req *kafka.RequestMe | |||
fieldID: pair.String(), | |||
"source": addr.String(), | |||
}).WithError(err).Error("Unable lookup original destination") | |||
record.Info = fmt.Sprintf("Invalid message format: %s", err) | |||
accesslog.Log(record, accesslog.TypeRequest, accesslog.VerdictError, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
error should be: unable to lookup original destination
pkg/proxy/kafka.go
Outdated
@@ -205,6 +408,11 @@ func (k *kafkaRedirect) handleRequest(pair *connectionPair, req *kafka.RequestMe | |||
return | |||
} | |||
|
|||
// log valid request | |||
record.Timestamp = time.Now().UTC().Format(time.RFC3339Nano) | |||
accesslog.Log(record, accesslog.TypeRequest, accesslog.VerdictForwarded, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is too early, dialing the original destination can still fail here
pkg/proxy/kafka.go
Outdated
|
||
// log valid response | ||
record.Timestamp = time.Now().UTC().Format(time.RFC3339Nano) | ||
accesslog.Log(record, accesslog.TypeResponse, accesslog.VerdictForwarded, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unlike with HTTP, the responses are parsed as well so we can log them there. See handleResponseConnection()
@manalibhutiyani could you resolve the conflicts? |
@michi-covalent : yes, I will address the review comments and rebase the branch. |
63fec2b
to
728f58d
Compare
pkg/proxy/accesslog/log.go
Outdated
@@ -45,6 +45,58 @@ const ( | |||
FieldFilePath = logfields.Path | |||
) | |||
|
|||
const ( | |||
FieldKafkaApiKey = "kafkaApiKey" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
const FieldKafkaApiKey should be FieldKafkaAPIKey
exported const FieldKafkaApiKey should have comment (or a comment on this block) or be unexported
pkg/proxy/accesslog/log.go
Outdated
} | ||
|
||
/* | ||
* Log multiple entries for multiple Kafka topics in a single |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add a FIXME
tag here as well
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not really.a FIXME. The GH issue is to point out why we are logging multiple access logs for a single Kafka request
pkg/proxy/accesslog/record.go
Outdated
// Internal | ||
Request *http.Request `json:"-"` | ||
// Internal HTTP request | ||
HttpRequest *http.Request `json:"-"` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pkg/proxy/kafka.go
Outdated
@@ -47,6 +52,19 @@ type kafkaRedirect struct { | |||
socket *proxySocket | |||
} | |||
|
|||
/* | |||
id string |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is this? leftovers?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed..
pkg/proxy/kafka.go
Outdated
// leaving the proxy at egress. | ||
func (r *kafkaRedirect) egressDestinationInfo(ipstr string, info *accesslog.EndpointInfo) { | ||
ip := net.ParseIP(ipstr) | ||
if ip != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To avoid indentation write if ip == nil { return }
Manali: @aanm goFmt automatically formats this to indented values for me.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@manalibhutiyani if you do if ip == nil { return }
the following part of the code won't have an extra unnecessary indentation.
pkg/proxy/kafka.go
Outdated
c := addressing.DeriveCiliumIPv4(ip) | ||
ep := endpointmanager.LookupIPv4(c.String()) | ||
if ep != nil { | ||
info.ID = uint64(ep.ID) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you're missing endpoint.RLock()
and endpoint.RUnlock()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was missing from the beginning (before Kafka). Added.
pkg/proxy/kafka.go
Outdated
func (r *kafkaRedirect) getDestinationInfo(dstIPPort string) accesslog.EndpointInfo { | ||
info := accesslog.EndpointInfo{} | ||
ipstr, port, err := net.SplitHostPort(dstIPPort) | ||
if err == nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If dstIPPort
returns an error, why continue the execution of this function?
Add debug log to show the error in case we ever it this.
To avoid indentation write if ip == nil { return }
Manali: @aanm Not sure why I cannot comment here, but we need to log even if we fail to Destination Info. This just means the destination info will not be present in access log.
pkg/proxy/kafka.go
Outdated
ipstr, port, err := net.SplitHostPort(dstIPPort) | ||
if err == nil { | ||
p, err := strconv.ParseUint(port, 10, 16) | ||
if err == nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same here
@aanm :answered above
pkg/policy/api/rule.go
Outdated
// KafkaMaxTopicVal is the maximum value of supported API Keys in KafkaAPIKeyMap | ||
// KafkaReverseAPIKeyMap | ||
const ( | ||
KafkaMaxApiKeyVal = 33 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
const KafkaMaxApiKeyVal should be KafkaMaxAPIKeyVal
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
pkg/policy/api/rule.go
Outdated
// KafkaReverseApiKeyMap is the map of all allowed kafka API keys | ||
// with the key values. | ||
// Reference: https://kafka.apache.org/protocol#protocol_api_keys | ||
var KafkaReverseApiKeyMap = map[int16]string{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
var KafkaReverseApiKeyMap should be KafkaReverseAPIKeyMap
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
97cf641
to
c9c8aa2
Compare
pkg/kafka/request.go
Outdated
@@ -99,7 +173,10 @@ func ReadRequest(reader io.Reader) (*RequestMessage, error) { | |||
return nil, err | |||
} | |||
|
|||
req.version = req.getVersion() | |||
if len(req.rawMsg) < 12 { | |||
return nil, fmt.Errorf("unable to read full request") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This needs a more descriptive error message, something like:
unexpected end of request (length < 12 bytes)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
pkg/kafka/request.go
Outdated
|
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This style change seems unwanted
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed.
pkg/proxy/accesslog/log.go
Outdated
if apiKey < 0 || apiKey > api.KafkaMaxAPIKeyVal { | ||
log.WithFields(log.Fields{ | ||
FieldFilePath: logPath, | ||
}).Error("Invalid Kafka Api Key") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why are we not logging this? If it was observed on the wire then we should log it in the access log. I understand that this is protecting the map access. What I would do here instead is create a function which checks whether there is a hit in the map and if no hit is found, encode the apiKey as number in string representation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I personally think that since this has already gone through canAccess
by the time we actually log it, the apiKey is already checked for correct values, so this failure will not be hit Imo, please correct me if I am wrong. Let me re-organise and see how I can change the code here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see that it can be hit before canAccess
. Ignore my earlier comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
changed as per discussion offline.
pkg/proxy/accesslog/log.go
Outdated
} | ||
|
||
/* | ||
* Log multiple entries for multiple Kafka topics in a single |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You are in C land here ;-)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
changed the comment style. :)
|
||
// APIKey for Kafka message | ||
// Reference: https://kafka.apache.org/protocol#protocol_api_keys | ||
APIKey int16 | ||
APIKey string | ||
|
||
// CorrelationID is a user-supplied integer value that will be passed | ||
// back with the response | ||
CorrelationID int32 | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sure
pkg/kafka/request.go
Outdated
@@ -54,6 +68,70 @@ func (req *RequestMessage) String() string { | |||
req.kind, req.version, len(req.rawMsg), string(b)) | |||
} | |||
|
|||
// GetTopics returns the Kafka request list of topics | |||
func (req *RequestMessage) GetTopics() []string { | |||
switch val := req.request.(type) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think req.request
can be nil here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added a check before dereferencing.
if req.request == nil { return nil }
pkg/proxy/accesslog/log.go
Outdated
func apiKeyToString(apiKey int16) string { | ||
if key, ok := api.KafkaReverseAPIKeyMap[apiKey]; ok { | ||
return key | ||
} else { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if block ends with a return statement, so drop this else and outdent its block (move short variable declaration to its own line if necessary)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
pkg/proxy/kafka.go
Outdated
|
||
// egressDestinationInfo returns the destination EndpointInfo for a flow | ||
// leaving the proxy at egress. | ||
func (k *kafkaRedirect) egressDestinationInfo(ipstr string, info *accesslog.EndpointInfo) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This function can be removed, EgressDestinationInfo()
in proxy.go
can be made private and can be called directly. The *kafkaRedirect
context is not required
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
pkg/proxy/kafka.go
Outdated
} | ||
|
||
// getDestinationInfo returns the destination EndpointInfo. | ||
func (k *kafkaRedirect) getDestinationInfo(dstIPPort string) accesslog.EndpointInfo { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
getDestinationInfo needs redirect information from individual proxy redirects
pkg/proxy/kafka.go
Outdated
addr := pair.rx.conn.RemoteAddr() | ||
if addr == nil { | ||
scopedLog.Warn("RemoteAddr() is nil") | ||
record.Info = fmt.Sprint("RemoteAddr() is nil") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can fill record first and then print the warning like this:
scopedLog.Warn(record.Info)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
pkg/proxy/kafka.go
Outdated
@@ -174,15 +231,35 @@ func (k *kafkaRedirect) handleRequest(pair *connectionPair, req *kafka.RequestMe | |||
srcIdentity, dstIPPort, err := k.conf.lookupNewDest(addr.String(), k.conf.listenPort) | |||
if err != nil { | |||
log.WithField("source", addr.String()).WithError(err).Error("Unable lookup original destination") | |||
record.Info = fmt.Sprintf("Invalid message format: %s", err) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Invalid message format
doesn't make sense as error here. Use the Unable to lookup original destination error
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
pkg/proxy/oxyproxy.go
Outdated
} | ||
// fillReservedIdentity resolves the labels of the specified identity if known | ||
// locally and fills in the info member fields. | ||
func (r *OxyRedirect) fillReservedIdentity(info *accesslog.EndpointInfo, id policy.NumericIdentity) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This function can be removed and the proxy.go
version can be called directly
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
pkg/proxy/oxyproxy.go
Outdated
return info | ||
// egressDestinationInfo returns the destination EndpointInfo for a flow | ||
// leaving the proxy at egress. | ||
func (r *OxyRedirect) egressDestinationInfo(ipstr string, info *accesslog.EndpointInfo) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
pkg/proxy/proxy.go
Outdated
@@ -142,6 +151,181 @@ func (p *Proxy) allocatePort() (uint16, error) { | |||
|
|||
var gcOnce sync.Once | |||
|
|||
// LocalEndpointInfo fills the access log with the local endpoint info. | |||
func LocalEndpointInfo(info *accesslog.EndpointInfo, source ProxySource, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
All of these functions can be marked private
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LocalEndpointInfo needs redirect info for individual Kafka redirect as well as Oxyproxy,
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's still weird to have proxy.go
call an interface which requires proxy.go
functionality itself again.
Looking at LocalEndpointInfo()
, it requires ProxySource
and the ID of the endpoint it is proxying for. The endpoint ID can be retrieved via the source using source.GetID()
Instead of writing into kafka and back into proxy, I'd rather add getSource()
to Redirect interface
which then just has to return the source
func (m *proxySourceMocker) RLock() { m.RLock() } | ||
func (m *proxySourceMocker) RUnlock() { m.RUnlock() } | ||
func (m *proxySourceMocker) RLock() { m.RWMutex.RLock() } | ||
func (m *proxySourceMocker) RUnlock() { m.RWMutex.RUnlock() } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This change seems unnecessary.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As per @aanm this was needed for one of the tests failing. ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@tgraf it is necessary, otherwise the RLock() was a recursive function. :)
pkg/proxy/kafka.go
Outdated
@@ -159,30 +160,82 @@ func (k *kafkaRedirect) canAccess(req *kafka.RequestMessage, numIdentity policy. | |||
return req.MatchesRule(rules.Kafka) | |||
} | |||
|
|||
func (r *kafkaRedirect) getSource() ProxySource { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
receiver name r should be consistent with previous receiver name k for kafkaRedirect
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
pkg/proxy/proxy.go
Outdated
} | ||
|
||
// GetDestinationInfo returns the destination EndpointInfo. | ||
func GetDestinationInfo(dstIPPort string, r Redirect, ingress bool) accesslog.EndpointInfo { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This can still be made private
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sure.. my concepts on private functions in a golang package are a bit shaky :) Will do
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
pkg/proxy/proxy.go
Outdated
|
||
// GetSourceInfo resolves source information | ||
// using source identity and fills the access log. | ||
func GetSourceInfo(RemoteAddr string, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This can still be made private
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sure. my concepts on private functions in a golang package are a bit shaky :) Will do
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
1c13523
to
649e337
Compare
…enforcement. Signed-off by: Manali Bhutiyani manali@covalent.io
…logging code Addresses #1824 Signed-off by: Manali Bhutiyani manali@covalent.io
…ests Addresses #1824 Signed-off by: Manali Bhutiyani manali@covalent.io
Addresses #1824 Signed-off by: Manali Bhutiyani manali@covalent.io
… to access log Addresses #1824 Signed-off by: Manali Bhutiyani manali@covalent.io
…vate Addresses #1824 Signed-off by: Manali Bhutiyani manali@covalent.io
Addresses #1824 Signed-off by: Manali Bhutiyani manali@covalent.io
Addresses #1824 Signed-off by: Manali Bhutiyani manali@covalent.io
649e337
to
7faa7cb
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@manalibhutiyani Thanks for the fixups!
@manalibhutiyani The remaining changes can be fixed on a following up PR |
This is the first pass of supporting access.logging for Kafka policy-enforcement.
Addresses #1824
Signed-off by: Manali Bhutiyani manali@covalent.io