15
15
#include " common/common/utility.h"
16
16
17
17
#include " extensions/filters/network/mongo_proxy/codec_impl.h"
18
+ #include " extensions/filters/network/well_known_names.h"
19
+
20
+ #include " absl/strings/str_split.h"
18
21
19
22
namespace Envoy {
20
23
namespace Extensions {
21
24
namespace NetworkFilters {
22
25
namespace MongoProxy {
23
26
27
+ class DynamicMetadataKeys {
28
+ public:
29
+ const std::string MessagesField{" messages" };
30
+ const std::string OperationField{" operation" };
31
+ const std::string OperationInsert{" OP_INSERT" };
32
+ const std::string OperationQuery{" OP_QUERY" };
33
+ const std::string ResourceField{" resource" };
34
+ };
35
+
36
+ typedef ConstSingleton<DynamicMetadataKeys> DynamicMetadataKeysSingleton;
37
+
24
38
AccessLog::AccessLog (const std::string& file_name, Envoy::AccessLog::AccessLogManager& log_manager,
25
39
TimeSource& time_source)
26
40
: time_source_(time_source) {
@@ -44,10 +58,12 @@ ProxyFilter::ProxyFilter(const std::string& stat_prefix, Stats::Scope& scope,
44
58
Runtime::Loader& runtime, AccessLogSharedPtr access_log,
45
59
const FaultConfigSharedPtr& fault_config,
46
60
const Network::DrainDecision& drain_decision,
47
- Runtime::RandomGenerator& generator, Event::TimeSystem& time_system)
61
+ Runtime::RandomGenerator& generator, Event::TimeSystem& time_system,
62
+ bool emit_dynamic_metadata)
48
63
: stat_prefix_(stat_prefix), scope_(scope), stats_(generateStats(stat_prefix, scope)),
49
64
runtime_ (runtime), drain_decision_(drain_decision), generator_(generator),
50
- access_log_(access_log), fault_config_(fault_config), time_system_(time_system) {
65
+ access_log_(access_log), fault_config_(fault_config), time_system_(time_system),
66
+ emit_dynamic_metadata_(emit_dynamic_metadata) {
51
67
if (!runtime_.snapshot ().featureEnabled (MongoRuntimeConfig::get ().ConnectionLoggingEnabled ,
52
68
100 )) {
53
69
// If we are not logging at the connection level, just release the shared pointer so that we
@@ -58,6 +74,23 @@ ProxyFilter::ProxyFilter(const std::string& stat_prefix, Stats::Scope& scope,
58
74
59
75
ProxyFilter::~ProxyFilter () { ASSERT (!delay_timer_); }
60
76
77
+ void ProxyFilter::setDynamicMetadata (std::string operation, std::string resource) {
78
+ ProtobufWkt::Struct metadata (
79
+ (*read_callbacks_->connection ()
80
+ .streamInfo ()
81
+ .dynamicMetadata ()
82
+ .mutable_filter_metadata ())[NetworkFilterNames::get ().MongoProxy ]);
83
+ auto & fields = *metadata.mutable_fields ();
84
+ auto & list = *fields[DynamicMetadataKeysSingleton::get ().MessagesField ].mutable_list_value ();
85
+ auto & message = *list.add_values ()->mutable_struct_value ()->mutable_fields ();
86
+
87
+ message[DynamicMetadataKeysSingleton::get ().OperationField ].set_string_value (operation);
88
+ message[DynamicMetadataKeysSingleton::get ().ResourceField ].set_string_value (resource);
89
+
90
+ read_callbacks_->connection ().streamInfo ().setDynamicMetadata (
91
+ NetworkFilterNames::get ().MongoProxy , metadata);
92
+ }
93
+
61
94
void ProxyFilter::decodeGetMore (GetMoreMessagePtr&& message) {
62
95
tryInjectDelay ();
63
96
@@ -69,6 +102,11 @@ void ProxyFilter::decodeGetMore(GetMoreMessagePtr&& message) {
69
102
void ProxyFilter::decodeInsert (InsertMessagePtr&& message) {
70
103
tryInjectDelay ();
71
104
105
+ if (emit_dynamic_metadata_) {
106
+ setDynamicMetadata (DynamicMetadataKeysSingleton::get ().OperationInsert ,
107
+ message->fullCollectionName ());
108
+ }
109
+
72
110
stats_.op_insert_ .inc ();
73
111
logMessage (*message, true );
74
112
ENVOY_LOG (debug, " decoded INSERT: {}" , message->toString (true ));
@@ -85,6 +123,11 @@ void ProxyFilter::decodeKillCursors(KillCursorsMessagePtr&& message) {
85
123
void ProxyFilter::decodeQuery (QueryMessagePtr&& message) {
86
124
tryInjectDelay ();
87
125
126
+ if (emit_dynamic_metadata_) {
127
+ setDynamicMetadata (DynamicMetadataKeysSingleton::get ().OperationQuery ,
128
+ message->fullCollectionName ());
129
+ }
130
+
88
131
stats_.op_query_ .inc ();
89
132
logMessage (*message, true );
90
133
ENVOY_LOG (debug, " decoded QUERY: {}" , message->toString (true ));
@@ -255,6 +298,15 @@ void ProxyFilter::doDecode(Buffer::Instance& buffer) {
255
298
return ;
256
299
}
257
300
301
+ // Clear dynamic metadata
302
+ if (emit_dynamic_metadata_) {
303
+ auto & metadata = (*read_callbacks_->connection ()
304
+ .streamInfo ()
305
+ .dynamicMetadata ()
306
+ .mutable_filter_metadata ())[NetworkFilterNames::get ().MongoProxy ];
307
+ metadata.mutable_fields ()->clear ();
308
+ }
309
+
258
310
if (!decoder_) {
259
311
decoder_ = createDecoder (*this );
260
312
}
0 commit comments