Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

resolve RUSTSEC-2023-0086 #2004

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,125 changes: 467 additions & 658 deletions Cargo.lock

Large diffs are not rendered by default.

11 changes: 5 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ restate-worker = { path = "crates/worker" }
ahash = "0.8.5"
anyhow = "1.0.68"
arc-swap = "1.6"
arrow = { version = "52.0.0", default-features = false }
arrow-flight = { version = "52.0.0" }
arrow = { version = "53.1.0", default-features = false }
arrow-flight = { version = "53.1.0" }
assert2 = "0.3.11"
async-channel = "2.1.1"
async-trait = "0.1.73"
Expand All @@ -83,7 +83,7 @@ bitflags = { version = "2.6.0" }
bytes = { version = "1.7", features = ["serde"] }
bytes-utils = "0.1.3"
bytestring = { version = "1.2", features = ["serde"] }
chrono = { version = "0.4.31", default-features = false, features = ["clock"] }
chrono = { version = "0.4.38", default-features = false, features = ["clock"] }
comfy-table = { version = "7.1" }
chrono-humanize = { version = "0.2.3" }
clap = { version = "4", default-features = false }
Expand All @@ -92,13 +92,13 @@ cling = { version = "0.1", default-features = false, features = ["derive"] }
criterion = "0.5"
crossterm = { version = "0.27.0" }
dashmap = { version = "6" }
datafusion = { version = "40.0.0", default-features = false, features = [
datafusion = { version = "42.0.0", default-features = false, features = [
"crypto_expressions",
"encoding_expressions",
"regex_expressions",
"unicode_expressions",
] }
datafusion-expr = { version = "40.0.0" }
datafusion-expr = { version = "42.0.0" }
derive_builder = "0.20.0"
derive_more = { version = "1", features = ["full"] }
dialoguer = { version = "0.11.0" }
Expand Down Expand Up @@ -186,7 +186,6 @@ tonic = { version = "0.12.3", default-features = false }
tonic-reflection = { version = "0.12.3" }
tonic-health = { version = "0.12.3" }
tonic-build = { version = "0.12.3" }
tonic-0-11 = { package = "tonic", version = "0.11.0", default-features = false }
tower = "0.4"
tower-http = { version = "0.5.2", default-features = false }
tracing = "0.1"
Expand Down
4 changes: 2 additions & 2 deletions cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ restate-types = { workspace = true }

anyhow = { workspace = true }
arc-swap = { workspace = true }
arrow = { version = "51.0.0", features = ["ipc", "prettyprint", "json"] }
arrow_convert = { version = "0.6.6" }
arrow = { version = "53.1.0", features = ["ipc", "prettyprint", "json"] }
arrow_convert = { version = "0.7.2" }
axum = { workspace = true, default-features = false, features = ["http1", "http2", "query", "tokio"] }
bytes = { workspace = true }
base62 = { version = "2.0.2" }
Expand Down
1 change: 0 additions & 1 deletion crates/admin/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ serde_with = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
tonic = { workspace = true, features = ["transport", "codegen", "prost", "gzip"] }
tonic-0-11 = { workspace = true }
tower = { workspace = true, features = ["load-shed", "limit"] }
tracing = { workspace = true }

Expand Down
42 changes: 1 addition & 41 deletions crates/admin/src/storage_query/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
// by the Apache License, Version 2.0.

use std::pin::Pin;
use std::str::FromStr;
use std::sync::Arc;
use std::task::{Context, Poll};

Expand All @@ -36,8 +35,6 @@ use restate_core::network::protobuf::node_svc::StorageQueryRequest;
use schemars::JsonSchema;
use serde::Deserialize;
use serde_with::serde_as;
use tonic::metadata::{KeyAndValueRef, MetadataMap};
use tonic::Status;

use super::error::StorageQueryError;
use crate::state::QueryServiceState;
Expand Down Expand Up @@ -81,7 +78,7 @@ pub async fn query(
data_body: response.data,
..FlightData::default()
})
.map_err(|status| FlightError::from(tonic_status_012_to_011(status))),
.map_err(FlightError::from),
);

// create a stream without LargeUtf8 or LargeBinary columns as JS doesn't support these yet
Expand Down Expand Up @@ -270,40 +267,3 @@ impl Stream for ConvertRecordBatchStream {
}
}
}

// todo: Remove once arrow-flight works with tonic 0.12
// Converts a tonic 0.12 `Status` into its tonic 0.11 equivalent so it can be
// handed to arrow-flight, which still links against tonic 0.11.
fn tonic_status_012_to_011(status: Status) -> tonic_0_11::Status {
// gRPC status codes are stable small integers, so a numeric round-trip
// between the two tonic versions is lossless.
let code = tonic_0_11::Code::from(status.code() as i32);
let message = status.message().to_owned();
// Details are an opaque byte payload; copy them verbatim.
let details = Bytes::copy_from_slice(status.details());
// Metadata entries need a per-entry re-parse; see the helper below for the
// cases in which entries are dropped.
let metadata = tonic_metadata_map_012_to_011(status.metadata());
tonic_0_11::Status::with_details_and_metadata(code, message, details, metadata)
}

// todo: Remove once arrow-flight works with tonic 0.12
// Rebuilds a tonic 0.12 `MetadataMap` as a tonic 0.11 map. Entries whose key
// (or, for ASCII entries, value) fails to re-parse under the 0.11 types are
// silently dropped rather than propagated as an error.
fn tonic_metadata_map_012_to_011(metadata_map: &MetadataMap) -> tonic_0_11::metadata::MetadataMap {
let mut resulting_metadata_map =
tonic_0_11::metadata::MetadataMap::with_capacity(metadata_map.len());
for key_value in metadata_map.iter() {
match key_value {
KeyAndValueRef::Ascii(key, value) => {
// ignore metadata map entries if conversion fails
// NOTE(review): `to_str().unwrap_or("")` replaces a non-printable
// value with an EMPTY value instead of skipping the entry, which is
// subtly different from "ignore on failure" — confirm intended.
if let Ok(value) =
tonic_0_11::metadata::MetadataValue::from_str(value.to_str().unwrap_or(""))
{
if let Ok(key) = tonic_0_11::metadata::MetadataKey::from_str(key.as_str()) {
resulting_metadata_map.insert(key, value);
}
}
}
KeyAndValueRef::Binary(key, value) => {
// Binary values are arbitrary bytes, so only key parsing can fail.
if let Ok(key) = tonic_0_11::metadata::MetadataKey::from_bytes(key.as_ref()) {
let value = tonic_0_11::metadata::MetadataValue::from_bytes(value.as_ref());
resulting_metadata_map.insert_bin(key, value);
}
}
}
}

resulting_metadata_map
}
2 changes: 1 addition & 1 deletion crates/errors/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ include_doc = ["termimad"]
[dependencies]
codederror = { workspace = true }
paste = { workspace = true }
termimad = { version = "0.23", optional = true }
termimad = { version = "0.30.0", optional = true }
tracing = { workspace = true }

[dev-dependencies]
Expand Down
1 change: 0 additions & 1 deletion crates/node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ tokio-stream = { workspace = true }
tokio-util = { workspace = true }
tonic = { workspace = true }
tonic-reflection = { workspace = true }
tonic-0-11 = { workspace = true }
tower = { workspace = true }
tower-http = { workspace = true, features = ["trace"] }
tracing = { workspace = true }
Expand Down
41 changes: 2 additions & 39 deletions crates/node/src/network_server/handler/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@

use arrow_flight::encode::FlightDataEncoderBuilder;
use arrow_flight::error::FlightError;
use bytes::Bytes;
use futures::stream::BoxStream;
use futures::TryStreamExt;
use restate_core::network::protobuf::node_svc::node_svc_server::NodeSvc;
Expand All @@ -21,10 +20,8 @@ use restate_core::network::{ConnectionManager, GrpcConnector};
use restate_core::{metadata, TaskCenter};
use restate_types::protobuf::common::NodeStatus;
use restate_types::protobuf::node::Message;
use std::str::FromStr;
use tokio_stream::StreamExt;
use tonic::metadata::{MetadataKey, MetadataMap, MetadataValue};
use tonic::{Code, Request, Response, Status, Streaming};
use tonic::{Request, Response, Status, Streaming};

use crate::network_server::WorkerDependencies;

Expand Down Expand Up @@ -133,43 +130,9 @@ fn flight_error_to_tonic_status(err: FlightError) -> Status {
match err {
FlightError::Arrow(e) => Status::internal(e.to_string()),
FlightError::NotYetImplemented(e) => Status::internal(e),
FlightError::Tonic(status) => tonic_status_010_to_012(status),
FlightError::Tonic(status) => status,
FlightError::ProtocolError(e) => Status::internal(e),
FlightError::DecodeError(e) => Status::internal(e),
FlightError::ExternalError(e) => Status::internal(e.to_string()),
}
}

// todo: Remove once arrow-flight works with tonic 0.12
// Converts a tonic 0.11 `Status` (as produced by arrow-flight) into the
// tonic 0.12 `Status` used by the rest of this crate.
// NOTE(review): the `_010_` in the name does not match the tonic 0.11 input
// type — looks like a leftover; consider renaming for consistency with the
// `_012_to_011` helpers elsewhere.
fn tonic_status_010_to_012(status: tonic_0_11::Status) -> Status {
// Status codes are version-stable integers; numeric conversion is lossless.
let code = Code::from(status.code() as i32);
let message = status.message().to_owned();
// Opaque details payload is copied byte-for-byte.
let details = Bytes::copy_from_slice(status.details());
let metadata = tonic_metadata_map_010_to_012(status.metadata());
Status::with_details_and_metadata(code, message, details, metadata)
}

// todo: Remove once arrow-flight works with tonic 0.12
// Rebuilds a tonic 0.11 `MetadataMap` as a tonic 0.12 map. Entries whose key
// (or, for ASCII entries, value) fails to re-parse under the 0.12 types are
// silently dropped rather than surfaced as an error.
fn tonic_metadata_map_010_to_012(metadata_map: &tonic_0_11::metadata::MetadataMap) -> MetadataMap {
let mut resulting_metadata_map = MetadataMap::with_capacity(metadata_map.len());
for key_value in metadata_map.iter() {
match key_value {
tonic_0_11::metadata::KeyAndValueRef::Ascii(key, value) => {
// ignore metadata map entries if conversion fails
// NOTE(review): `to_str().unwrap_or("")` turns an unconvertible
// value into an EMPTY value instead of skipping the entry —
// confirm that is the intended "ignore" behavior.
if let Ok(value) = MetadataValue::from_str(value.to_str().unwrap_or("")) {
if let Ok(key) = MetadataKey::from_str(key.as_str()) {
resulting_metadata_map.insert(key, value);
}
}
}
tonic_0_11::metadata::KeyAndValueRef::Binary(key, value) => {
// Binary values are arbitrary bytes, so only key parsing can fail.
if let Ok(key) = MetadataKey::from_bytes(key.as_ref()) {
let value = MetadataValue::from_bytes(value.as_ref());
resulting_metadata_map.insert_bin(key, value);
}
}
}
}

resulting_metadata_map
}
2 changes: 1 addition & 1 deletion crates/storage-query-datafusion/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ ahash = { workspace = true }
async-trait = { workspace = true }
bytes = { workspace = true }
bytestring = { workspace = true }
chrono = { version = "0.4.26", default-features = false, features = ["clock"] }
chrono = { workspace = true }
codederror = { workspace = true }
datafusion = { workspace = true }
derive_more = { workspace = true }
Expand Down
20 changes: 14 additions & 6 deletions crates/storage-query-datafusion/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@ use std::sync::Arc;
use async_trait::async_trait;
use codederror::CodedError;
use datafusion::error::DataFusionError;
use datafusion::execution::context::{SQLOptions, SessionState};
use datafusion::execution::context::SQLOptions;
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion::execution::SessionStateBuilder;
use datafusion::physical_optimizer::PhysicalOptimizerRule;
use datafusion::physical_plan::SendableRecordBatchStream;
use datafusion::prelude::{SessionConfig, SessionContext};

Expand Down Expand Up @@ -188,7 +190,10 @@ impl QueryContext {
//
// build the state
//
let mut state = SessionState::new_with_config_rt(session_config, runtime);
let mut state_builder = SessionStateBuilder::new()
.with_config(session_config)
.with_runtime_env(runtime)
.with_default_features();

// Rewrite the logical plan, to transparently add a 'partition_key' column to Join's
// To tables that have a partition key in their schema.
Expand All @@ -200,7 +205,7 @@ impl QueryContext {
// 'SELECT b.service_key FROM sys_invocation_status a JOIN state b on a.target_service_key = b.service_key AND a.partition_key = b.partition_key'
//
// This would be used by the SymmetricHashJoin as a watermark.
state.add_analyzer_rule(Arc::new(
state_builder = state_builder.with_analyzer_rule(Arc::new(
analyzer::UseSymmetricHashJoinWhenPartitionKeyIsPresent::new(),
));

Expand All @@ -219,10 +224,13 @@ impl QueryContext {
// A far more involved but potentially more robust solution would be wrap the SymmetricHashJoin in a ProjectionExec
// If this would become an issue for any reason, then we can explore that alternative.
//
let mut physical_optimizers = state.physical_optimizers().to_vec();
physical_optimizers.insert(0, Arc::new(physical_optimizer::JoinRewrite::new()));
let physical_optimizers: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>> =
vec![Arc::new(physical_optimizer::JoinRewrite::new())];

state_builder = state_builder.with_physical_optimizer_rules(physical_optimizers);

let state = state_builder.build();

state = state.with_physical_optimizer_rules(physical_optimizers);
let ctx = SessionContext::new_with_state(state);

let sql_options = SQLOptions::new()
Expand Down
6 changes: 3 additions & 3 deletions crates/storage-query-datafusion/src/table_providers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use async_trait::async_trait;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::common::DataFusionError;
use datafusion::datasource::{TableProvider, TableType};
use datafusion::execution::context::{SessionState, TaskContext};
use datafusion::execution::context::TaskContext;
use datafusion::logical_expr::{Expr, TableProviderFilterPushDown};
use datafusion::physical_expr::EquivalenceProperties;
use datafusion::physical_plan::{
Expand Down Expand Up @@ -75,7 +75,7 @@ where

async fn scan(
&self,
_state: &SessionState,
_state: &(dyn datafusion::catalog::Session),
projection: Option<&Vec<usize>>,
_filters: &[Expr],
_limit: Option<usize>,
Expand Down Expand Up @@ -239,7 +239,7 @@ impl TableProvider for GenericTableProvider {

async fn scan(
&self,
_state: &SessionState,
_state: &(dyn datafusion::catalog::Session),
projection: Option<&Vec<usize>>,
filters: &[Expr],
limit: Option<usize>,
Expand Down
1 change: 0 additions & 1 deletion deny.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ version = 2
yanked = "deny"
ignore = [
{ id = "RUSTSEC-2024-0370", reason = "crate is unmaintained. This needs `arrow_convert` to use an alternative to `err-derive`" },
{ id = "RUSTSEC-2023-0086", reason = "lexical-core pending Arrow update https://github.com/restatedev/restate/issues/1966" }
]


Expand Down
Loading