forked from google/distbench
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathprotocol_driver.h
146 lines (120 loc) · 5.04 KB
/
protocol_driver.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
// Copyright 2021 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef DISTBENCH_PROTOCOL_DRIVER_H_
#define DISTBENCH_PROTOCOL_DRIVER_H_
#include <string_view>
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/synchronization/notification.h"
#include "distbench.pb.h"
#include "grpc_wrapper.h"
namespace distbench {
struct ClientRpcState {
GenericRequest request;
GenericResponse response;
absl::Time prior_start_time = absl::InfinitePast();
absl::Time start_time = absl::InfinitePast();
absl::Time end_time;
bool success;
};
struct ServerRpcState {
const GenericRequest* request;
GenericResponse response;
bool have_dedicated_thread = false;
void SetSendResponseFunction(
std::function<void(void)> send_response_function);
void SendResponseIfSet() const;
void SetFreeStateFunction(std::function<void(void)> free_state_function);
void FreeStateIfSet() const;
private:
std::function<void(void)> send_response_function_;
std::function<void(void)> free_state_function_;
};
struct TransportStat {
std::string name;
int64_t value;
};
using RpcId = int64_t;
class SimpleClock {
public:
virtual ~SimpleClock() = default;
virtual absl::Time Now() = 0;
virtual bool MutexLockWhenWithDeadline(absl::Mutex* mu,
const absl::Condition& condition,
absl::Time deadline)
ABSL_EXCLUSIVE_LOCK_FUNCTION(mu) = 0;
};
class ProtocolDriverClient {
public:
virtual ~ProtocolDriverClient() {}
virtual absl::Status Initialize(const ProtocolDriverOptions& pd_opts) {
return absl::OkStatus();
}
// Client interface =========================================================
virtual void SetNumPeers(int num_peers) = 0;
// Allocate local resources that are needed to establish a connection
// E.g. an unconnected RoCE QueuePair. Returns opaque data. If no local
// resources are needed, this is a NOP.
virtual absl::StatusOr<std::string> Preconnect();
// Actually establish a conection, given the opaque data from the
// the responder. E.g. connect the local and remote RoCE queue pairs.
virtual absl::Status HandleConnect(std::string remote_connection_info,
int peer) = 0;
virtual void InitiateRpc(int peer_index, ClientRpcState* state,
std::function<void(void)> done_callback) = 0;
virtual void ChurnConnection(int peer) = 0;
virtual void ShutdownClient() = 0;
// Misc interface ===========================================================
virtual std::vector<TransportStat> GetTransportStats() = 0;
};
class ProtocolDriverServer {
public:
virtual ~ProtocolDriverServer() {}
virtual absl::Status Initialize(const ProtocolDriverOptions& pd_opts,
int* port) {
return absl::OkStatus();
}
// Server interface =========================================================
virtual void SetHandler(
std::function<std::function<void()>(ServerRpcState* state)> handler) = 0;
// Return the address of a running server that a client can connect to, or
// actually establish a single conection, given the opaque data from the
// initiator. E.g. allocate an unconnected RoCE queue pair, and connect it
// to the remote queue pair, and return the info about the newly allocated
// queue pair so that the initiator can connect the queue pairs on its end.
virtual absl::StatusOr<std::string> HandlePreConnect(
std::string_view remote_connection_info, int peer) = 0;
virtual void ShutdownServer() = 0;
// Handle the remote side responding with an RPC error by cleaning up
// the local resources associated with the opaque data.
virtual void HandleConnectFailure(std::string_view local_connection_info);
// Misc interface ===========================================================
virtual std::vector<TransportStat> GetTransportStats() = 0;
};
class ProtocolDriver : public ProtocolDriverClient,
public ProtocolDriverServer {
public:
virtual ~ProtocolDriver() {}
virtual absl::Status Initialize(const ProtocolDriverOptions& pd_opts,
int* port) = 0;
virtual std::vector<TransportStat> GetTransportStats() = 0;
// Misc interface ===========================================================
virtual SimpleClock& GetClock();
private:
// Hide the Initialize functions of the base classes:
using ProtocolDriverClient::Initialize;
using ProtocolDriverServer::Initialize;
};
} // namespace distbench
#endif // DISTBENCH_PROTOCOL_DRIVER_H_