-
Notifications
You must be signed in to change notification settings - Fork 42
/
Copy pathutils.rs
295 lines (245 loc) · 8.76 KB
/
utils.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
use std::{
net::Ipv4Addr,
time::{SystemTime, UNIX_EPOCH},
};
use alloy::{
primitives::U256,
rpc::types::beacon::{BlsPublicKey, BlsSignature},
};
use axum::http::HeaderValue;
use blst::min_pk::{PublicKey, Signature};
use rand::{distributions::Alphanumeric, Rng};
use reqwest::header::HeaderMap;
use serde::{de::DeserializeOwned, Serialize};
use serde_json::Value;
use tracing::Level;
use tracing_appender::{non_blocking::WorkerGuard, rolling::Rotation};
use tracing_subscriber::{fmt::Layer, prelude::*, EnvFilter};
use crate::{
config::{load_optional_env_var, LogsSettings, LOGS_DIR_DEFAULT, PBS_MODULE_NAME},
pbs::HEADER_VERSION_VALUE,
types::Chain,
};
const MILLIS_PER_SECOND: u64 = 1_000;
pub fn timestamp_of_slot_start_sec(slot: u64, chain: Chain) -> u64 {
chain.genesis_time_sec() + slot * chain.slot_time_sec()
}
pub fn timestamp_of_slot_start_millis(slot: u64, chain: Chain) -> u64 {
timestamp_of_slot_start_sec(slot, chain) * MILLIS_PER_SECOND
}
pub fn ms_into_slot(slot: u64, chain: Chain) -> u64 {
let slot_start_ms = timestamp_of_slot_start_millis(slot, chain);
utcnow_ms().saturating_sub(slot_start_ms)
}
/// Seconds
pub fn utcnow_sec() -> u64 {
SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs()
}
/// Millis
pub fn utcnow_ms() -> u64 {
SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis() as u64
}
/// Micros
pub fn utcnow_us() -> u64 {
SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_micros() as u64
}
/// Nanos
pub fn utcnow_ns() -> u64 {
// safe until ~2554
SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_nanos() as u64
}
pub const WEI_PER_ETH: u64 = 1_000_000_000_000_000_000;
pub fn eth_to_wei(eth: f64) -> U256 {
U256::from((eth * WEI_PER_ETH as f64).floor())
}
// Serde
/// Test that the encoding and decoding works, returns the decoded struct
pub fn test_encode_decode<T: Serialize + DeserializeOwned>(d: &str) -> T {
let decoded = serde_json::from_str::<T>(d).expect("deserialize");
// re-encode to make sure that different formats are ignored
let encoded = serde_json::to_string(&decoded).unwrap();
let original_v: Value = serde_json::from_str(d).unwrap();
let encoded_v: Value = serde_json::from_str(&encoded).unwrap();
assert_eq!(original_v, encoded_v, "encode mismatch");
decoded
}
pub mod as_str {
use std::{fmt::Display, str::FromStr};
use serde::Deserialize;
pub fn serialize<S, T: Display>(data: T, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
serializer.collect_str(&data.to_string())
}
pub fn deserialize<'de, D, T, E>(deserializer: D) -> Result<T, D::Error>
where
D: serde::Deserializer<'de>,
T: FromStr<Err = E>,
E: Display,
{
let s = String::deserialize(deserializer)?;
T::from_str(&s).map_err(serde::de::Error::custom)
}
}
pub mod as_eth_str {
use alloy::primitives::{
utils::{format_ether, parse_ether},
U256,
};
use serde::Deserialize;
use super::eth_to_wei;
pub fn serialize<S>(data: &U256, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
let s = format_ether(*data);
serializer.serialize_str(&s)
}
pub fn deserialize<'de, D>(deserializer: D) -> Result<U256, D::Error>
where
D: serde::Deserializer<'de>,
{
#[derive(Deserialize)]
#[serde(untagged)]
enum StringOrF64 {
Str(String),
F64(f64),
}
let value = StringOrF64::deserialize(deserializer)?;
let wei = match value {
StringOrF64::Str(s) => {
parse_ether(&s).map_err(|_| serde::de::Error::custom("invalid eth amount"))?
}
StringOrF64::F64(f) => eth_to_wei(f),
};
Ok(wei)
}
}
pub const fn default_u64<const U: u64>() -> u64 {
U
}
pub const fn default_u16<const U: u16>() -> u16 {
U
}
pub const fn default_bool<const U: bool>() -> bool {
U
}
pub const fn default_host() -> Ipv4Addr {
Ipv4Addr::LOCALHOST
}
pub const fn default_u256() -> U256 {
U256::ZERO
}
// LOGGING
pub fn initialize_tracing_log(module_id: &str) -> eyre::Result<WorkerGuard> {
let settings = LogsSettings::from_env_config()?;
// Use file logs only if setting is set
let use_file_logs = settings.is_some();
let settings = settings.unwrap_or_default();
// Log level for stdout
let stdout_log_level = if let Some(log_level) = load_optional_env_var("RUST_LOG") {
log_level.parse::<Level>().expect("invalid RUST_LOG value")
} else {
settings.log_level.parse::<Level>().expect("invalid log_level value in settings")
};
let stdout_filter = format_crates_filter(Level::INFO.as_str(), stdout_log_level.as_str());
if use_file_logs {
// Log all events to a rolling log file.
let mut builder =
tracing_appender::rolling::Builder::new().filename_prefix(module_id.to_lowercase());
if let Some(value) = settings.max_log_files {
builder = builder.max_log_files(value);
}
let file_appender = builder
.rotation(Rotation::DAILY)
.build(LOGS_DIR_DEFAULT)
.expect("failed building rolling file appender");
let (writer, guard) = tracing_appender::non_blocking(file_appender);
// at least debug for file logs
let file_log_level = stdout_log_level.max(Level::DEBUG);
let file_log_filter = format_crates_filter(Level::INFO.as_str(), file_log_level.as_str());
let stdout_layer =
tracing_subscriber::fmt::layer().with_target(false).with_filter(stdout_filter);
let file_layer = Layer::new()
.json()
.with_current_span(false)
.with_span_list(true)
.with_writer(writer)
.with_filter(file_log_filter);
tracing_subscriber::registry().with(stdout_layer.and_then(file_layer)).init();
Ok(guard)
} else {
let (writer, guard) = tracing_appender::non_blocking(std::io::stdout());
let stdout_layer = tracing_subscriber::fmt::layer()
.with_target(false)
.with_writer(writer)
.with_filter(stdout_filter);
tracing_subscriber::registry().with(stdout_layer).init();
Ok(guard)
}
}
pub fn initialize_pbs_tracing_log() -> eyre::Result<WorkerGuard> {
initialize_tracing_log(PBS_MODULE_NAME)
}
// all commit boost crates
// TODO: this can probably done without unwrap
fn format_crates_filter(default_level: &str, crates_level: &str) -> EnvFilter {
let s = format!(
"{default_level},cb_signer={crates_level},cb_pbs={crates_level},cb_common={crates_level},cb_metrics={crates_level}",
);
s.parse().unwrap()
}
pub fn print_logo() {
println!(
r#" ______ _ __ ____ __
/ ____/___ ____ ___ ____ ___ (_) /_ / __ )____ ____ _____/ /_
/ / / __ \/ __ `__ \/ __ `__ \/ / __/ / __ / __ \/ __ \/ ___/ __/
/ /___/ /_/ / / / / / / / / / / / / /_ / /_/ / /_/ / /_/ (__ ) /_
\____/\____/_/ /_/ /_/_/ /_/ /_/_/\__/ /_____/\____/\____/____/\__/
"#
)
}
// Crypto conversions
pub fn alloy_pubkey_to_blst(pubkey: &BlsPublicKey) -> Result<PublicKey, blst::BLST_ERROR> {
PublicKey::key_validate(&pubkey.0)
}
pub fn alloy_sig_to_blst(signature: &BlsSignature) -> Result<Signature, blst::BLST_ERROR> {
Signature::from_bytes(&signature.0)
}
pub fn blst_pubkey_to_alloy(pubkey: &PublicKey) -> BlsPublicKey {
BlsPublicKey::from_slice(&pubkey.to_bytes())
}
/// Generates a random string
pub fn random_jwt() -> String {
rand::thread_rng().sample_iter(&Alphanumeric).take(32).map(char::from).collect()
}
/// Returns the user agent from the request headers or an empty string if not
/// present
pub fn get_user_agent(req_headers: &HeaderMap) -> String {
req_headers
.get(reqwest::header::USER_AGENT)
.and_then(|ua| ua.to_str().ok().map(|s| s.to_string()))
.unwrap_or_default()
}
/// Adds the commit boost version to the existing user agent
pub fn get_user_agent_with_version(req_headers: &HeaderMap) -> eyre::Result<HeaderValue> {
let ua = get_user_agent(req_headers);
Ok(HeaderValue::from_str(&format!("commit-boost/{HEADER_VERSION_VALUE} {}", ua))?)
}
#[cfg(unix)]
pub async fn wait_for_signal() -> eyre::Result<()> {
use tokio::signal::unix::{signal, SignalKind};
let mut sigint = signal(SignalKind::interrupt())?;
let mut sigterm = signal(SignalKind::terminate())?;
tokio::select! {
_ = sigint.recv() => {}
_ = sigterm.recv() => {}
}
Ok(())
}
#[cfg(windows)]
pub async fn wait_for_signal() -> eyre::Result<()> {
tokio::signal::ctrl_c().await?;
Ok(())
}