Skip to content

Commit

Permalink
Merge branch 'master' of github.com:mozilla-services/syncstorage-rs i…
Browse files Browse the repository at this point in the history
…nto chore/update-202009
  • Loading branch information
jrconlin committed Sep 8, 2020
2 parents 164b642 + b197107 commit 994c2b5
Show file tree
Hide file tree
Showing 21 changed files with 529 additions and 336 deletions.
193 changes: 104 additions & 89 deletions CHANGELOG.md

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "syncstorage"
version = "0.5.5"
version = "0.5.8"
license = "MPL-2.0"
authors = [
"Ben Bangert <ben@groovie.org>",
Expand All @@ -19,7 +19,7 @@ actix-http = "1"
actix-web = "2"
actix-rt = "1"
actix-cors = "0.2"
async-trait = "0.1.36"
async-trait = "0.1.40"
base64 = "0.12"
bb8 = "0.4.1"
bytes = "0.5"
Expand Down
17 changes: 17 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,23 @@ We use [env_logger](https://crates.io/crates/env_logger): set the `RUST_LOG` env

`make test` - open the Makefile to adjust your `SYNC_DATABASE_URL` as needed.

#### Debugging unit test state

In some cases, it is useful to inspect the mysql state of a failed test. By
default, we use the diesel test_transaction functionality to ensure test data
is not committed to the database. Therefore, there is an environment variable
which can be used to turn off test_transaction.

SYNC_DATABASE_USE_TEST_TRANSACTIONS=false cargo test [testname]

Note that you will almost certainly want to pass a single test name. When running
the entire test suite, data from previous tests will cause future tests to fail.

To reset the database state between test runs, drop and recreate the database
in the mysql client:

drop database syncstorage_rs; create database syncstorage_rs; use syncstorage_rs;

### End-to-End tests

Functional tests live in [server-syncstorage](https://github.com/mozilla-services/server-syncstorage/) and can be run against a local server, e.g.:
Expand Down
3 changes: 0 additions & 3 deletions src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,6 @@ pub const FIRST_CUSTOM_COLLECTION_ID: i32 = 101;
/// Rough guesstimate of the maximum reasonable life span of a batch
pub const BATCH_LIFETIME: i64 = 2 * 60 * 60 * 1000; // 2 hours, in milliseconds

/// DbPools' worker ThreadPool size
pub const DB_THREAD_POOL_SIZE: usize = 50;

type DbFuture<'a, T> = LocalBoxFuture<'a, Result<T, ApiError>>;

#[async_trait(?Send)]
Expand Down
21 changes: 18 additions & 3 deletions src/db/mysql/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@ use diesel::{
use super::models::{MysqlDb, Result};
#[cfg(test)]
use super::test::TestTransactionCustomizer;
use crate::db::{error::DbError, results, Db, DbPool, STD_COLLS};
use crate::db::{
error::DbError,
results::{self, PoolState},
Db, DbPool, STD_COLLS,
};
use crate::error::{ApiError, ApiResult};
use crate::server::metrics::Metrics;
use crate::settings::Settings;
Expand Down Expand Up @@ -105,8 +109,10 @@ impl DbPool for MysqlDbPool {
}

impl fmt::Debug for MysqlDbPool {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "MysqlDbPool {{ coll_cache: {:?} }}", self.coll_cache)
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("MysqlDbPool")
.field("coll_cache", &self.coll_cache)
.finish()
}
}

Expand Down Expand Up @@ -175,3 +181,12 @@ impl Default for CollectionCache {
}
}
}

impl From<diesel::r2d2::State> for PoolState {
fn from(state: diesel::r2d2::State) -> PoolState {
PoolState {
connections: state.connections,
idle_connections: state.idle_connections,
}
}
}
27 changes: 0 additions & 27 deletions src/db/results.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,33 +76,6 @@ pub struct PoolState {
pub idle_connections: u32,
}

impl From<diesel::r2d2::State> for PoolState {
fn from(state: diesel::r2d2::State) -> PoolState {
PoolState {
connections: state.connections,
idle_connections: state.idle_connections,
}
}
}

impl From<bb8::State> for PoolState {
fn from(state: bb8::State) -> PoolState {
PoolState {
connections: state.connections,
idle_connections: state.idle_connections,
}
}
}

impl From<deadpool::Status> for PoolState {
fn from(status: deadpool::Status) -> PoolState {
PoolState {
connections: status.size as u32,
idle_connections: status.available as u32,
}
}
}

#[cfg(test)]
pub type GetCollectionId = i32;

Expand Down
119 changes: 35 additions & 84 deletions src/db/spanner/manager/bb8.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,25 @@
use std::marker::PhantomData;
use std::{fmt, sync::Arc};

use actix_web::web::block;
use async_trait::async_trait;
use bb8::ManageConnection;
use googleapis_raw::spanner::v1::{
spanner::{CreateSessionRequest, GetSessionRequest, Session},
spanner_grpc::SpannerClient,
};
use grpcio::{
CallOption, ChannelBuilder, ChannelCredentials, EnvBuilder, Environment, MetadataBuilder,
};
use bb8::{ManageConnection, PooledConnection};
use grpcio::{EnvBuilder, Environment};

use crate::{
db::error::{DbError, DbErrorKind},
db::{
error::{DbError, DbErrorKind},
results::PoolState,
},
server::metrics::Metrics,
settings::Settings,
};

pub const SPANNER_ADDRESS: &str = "spanner.googleapis.com:443";
use super::session::{create_spanner_session, recycle_spanner_session, SpannerSession};

pub struct SpannerConnectionManager<T> {
#[allow(dead_code)]
pub type Conn<'a> = PooledConnection<'a, SpannerSessionManager<SpannerSession>>;

pub struct SpannerSessionManager<T> {
database_name: String,
/// The gRPC environment
env: Arc<Environment>,
Expand All @@ -29,30 +28,30 @@ pub struct SpannerConnectionManager<T> {
phantom: PhantomData<T>,
}

impl<_T> fmt::Debug for SpannerConnectionManager<_T> {
impl<_T> fmt::Debug for SpannerSessionManager<_T> {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("SpannerConnectionManager")
fmt.debug_struct("bb8::SpannerSessionManager")
.field("database_name", &self.database_name)
.field("test_transactions", &self.test_transactions)
.finish()
}
}

impl<T> SpannerConnectionManager<T> {
impl<T> SpannerSessionManager<T> {
#[allow(dead_code)]
pub fn new(settings: &Settings, metrics: &Metrics) -> Result<Self, DbError> {
let url = &settings.database_url;
if !url.starts_with("spanner://") {
Err(DbErrorKind::InvalidUrl(url.to_owned()))?;
}
let database_name = url["spanner://".len()..].to_owned();
let database_name = settings
.spanner_database_name()
.ok_or_else(|| DbErrorKind::InvalidUrl(settings.database_url.to_owned()))?
.to_owned();
let env = Arc::new(EnvBuilder::new().build());

#[cfg(not(test))]
let test_transactions = false;
#[cfg(test)]
let test_transactions = settings.database_use_test_transactions;

Ok(SpannerConnectionManager::<T> {
Ok(SpannerSessionManager::<T> {
database_name,
env,
metrics: metrics.clone(),
Expand All @@ -62,67 +61,23 @@ impl<T> SpannerConnectionManager<T> {
}
}

pub struct SpannerSession {
pub client: SpannerClient,
pub session: Session,

pub(in crate::db::spanner) use_test_transactions: bool,
}

#[async_trait]
impl<T: std::marker::Send + std::marker::Sync + 'static> ManageConnection
for SpannerConnectionManager<T>
{
impl<T: Send + Sync + 'static> ManageConnection for SpannerSessionManager<T> {
type Connection = SpannerSession;
type Error = DbError;

async fn connect(&self) -> Result<Self::Connection, Self::Error> {
let env = self.env.clone();
let mut metrics = self.metrics.clone();
// XXX: issue732: Could google_default_credentials (or
// ChannelBuilder::secure_connect) block?!
let chan = block(move || -> Result<grpcio::Channel, grpcio::Error> {
metrics.start_timer("storage.pool.grpc_auth", None);
// Requires
// GOOGLE_APPLICATION_CREDENTIALS=/path/to/service-account.json
let creds = ChannelCredentials::google_default_credentials()?;
Ok(ChannelBuilder::new(env)
.max_send_message_len(100 << 20)
.max_receive_message_len(100 << 20)
.secure_connect(SPANNER_ADDRESS, creds))
})
create_spanner_session(
Arc::clone(&self.env),
self.metrics.clone(),
&self.database_name,
self.test_transactions,
)
.await
.map_err(|e| match e {
actix_web::error::BlockingError::Error(e) => e.into(),
actix_web::error::BlockingError::Canceled => {
DbError::internal("web::block Manager operation canceled")
}
})?;
let client = SpannerClient::new(chan);

// Connect to the instance and create a Spanner session.
let session = create_session(&client, &self.database_name).await?;

Ok(SpannerSession {
client,
session,
use_test_transactions: self.test_transactions,
})
}

async fn is_valid(&self, mut conn: Self::Connection) -> Result<Self::Connection, Self::Error> {
let mut req = GetSessionRequest::new();
req.set_name(conn.session.get_name().to_owned());
if let Err(e) = conn.client.get_session_async(&req)?.await {
match e {
grpcio::Error::RpcFailure(ref status)
if status.status == grpcio::RpcStatusCode::NOT_FOUND =>
{
conn.session = create_session(&conn.client, &self.database_name).await?;
}
_ => return Err(e.into()),
}
}
recycle_spanner_session(&mut conn, &self.database_name).await?;
Ok(conn)
}

Expand All @@ -131,15 +86,11 @@ impl<T: std::marker::Send + std::marker::Sync + 'static> ManageConnection
}
}

pub async fn create_session(
client: &SpannerClient,
database_name: &str,
) -> Result<Session, grpcio::Error> {
let mut req = CreateSessionRequest::new();
req.database = database_name.to_owned();
let mut meta = MetadataBuilder::new();
meta.add_str("google-cloud-resource-prefix", database_name)?;
meta.add_str("x-goog-api-client", "gcp-grpc-rs")?;
let opt = CallOption::default().headers(meta.build());
client.create_session_async_opt(&req, opt)?.await
impl From<bb8::State> for PoolState {
fn from(state: bb8::State) -> PoolState {
PoolState {
connections: state.connections,
idle_connections: state.idle_connections,
}
}
}
Loading

0 comments on commit 994c2b5

Please sign in to comment.