Skip to content

Commit

Permalink
Allow any BatchObserver in BlockPublisher
Browse files Browse the repository at this point in the history
The type of argument should support non-python executors.

Note that this change requires moving batch observers into state so they can be
cloned, since you cannot require `Clone` on a trait object.

Signed-off-by: Adam Ludvik <ludvik@bitwise.io>
  • Loading branch information
Adam Ludvik committed Oct 3, 2018
1 parent 3dd270f commit d5b2bfe
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 20 deletions.
25 changes: 11 additions & 14 deletions validator/src/journal/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,13 @@ pub enum BlockPublisherError {
UnknownBlock(String),
}

pub trait BatchObserver: Send + Sync {
fn notify_batch_pending(&self, batch: &Batch);
}

pub struct BlockPublisherState {
pub transaction_executor: Box<ExecutionPlatform>,
pub batch_observers: Vec<Box<BatchObserver>>,
pub chain_head: Option<Block>,
pub candidate_block: Option<CandidateBlock>,
pub pending_batches: PendingBatchesPool,
Expand All @@ -92,11 +97,13 @@ pub struct BlockPublisherState {
impl BlockPublisherState {
pub fn new(
transaction_executor: Box<ExecutionPlatform>,
batch_observers: Vec<Box<BatchObserver>>,
chain_head: Option<Block>,
candidate_block: Option<CandidateBlock>,
pending_batches: PendingBatchesPool,
) -> Self {
BlockPublisherState {
batch_observers,
transaction_executor,
chain_head,
candidate_block,
Expand All @@ -114,7 +121,6 @@ pub struct SyncBlockPublisher {
pub state: Arc<RwLock<BlockPublisherState>>,

block_manager: BlockManager,
batch_observers: Vec<PyObject>,
batch_injector_factory: PyObject,
batch_committed: PyObject,
transaction_committed: PyObject,
Expand All @@ -139,11 +145,6 @@ impl Clone for SyncBlockPublisher {
SyncBlockPublisher {
state,
block_manager: self.block_manager.clone(),
batch_observers: self
.batch_observers
.iter()
.map(|i| i.clone_ref(py))
.collect(),
batch_injector_factory: self.batch_injector_factory.clone_ref(py),
batch_committed: self.batch_committed.clone_ref(py),
transaction_committed: self.transaction_committed.clone_ref(py),
Expand Down Expand Up @@ -453,12 +454,8 @@ impl SyncBlockPublisher {

pub fn on_batch_received(&self, batch: Batch) {
let mut state = self.state.write().expect("Lock should not be poisoned");
for observer in &self.batch_observers {
let gil = Python::acquire_gil();
let py = gil.python();
observer
.call_method(py, "notify_batch_pending", (batch.clone(),), None)
.expect("BatchObserver has no method notify_batch_pending");
for observer in &state.batch_observers {
observer.notify_batch_pending(&batch);
}
let permission_check = {
let gil = Python::acquire_gil();
Expand Down Expand Up @@ -515,11 +512,12 @@ impl BlockPublisher {
data_dir: PyObject,
config_dir: PyObject,
permission_verifier: PyObject,
batch_observers: Vec<PyObject>,
batch_observers: Vec<Box<BatchObserver>>,
batch_injector_factory: PyObject,
) -> Self {
let state = Arc::new(RwLock::new(BlockPublisherState::new(
transaction_executor,
batch_observers,
chain_head,
None,
PendingBatchesPool::new(NUM_PUBLISH_COUNT_SAMPLES, INITIAL_PUBLISH_COUNT),
Expand All @@ -537,7 +535,6 @@ impl BlockPublisher {
data_dir,
config_dir,
permission_verifier,
batch_observers,
batch_injector_factory,
exit: Arc::new(Exit::new()),
};
Expand Down
37 changes: 31 additions & 6 deletions validator/src/journal/publisher_ffi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use execution::py_executor::PyExecutor;
use ffi::py_import_class;
use journal::block_manager::BlockManager;
use journal::publisher::{
BlockPublisher, FinalizeBlockError, IncomingBatchSender, InitializeBlockError,
BatchObserver, BlockPublisher, FinalizeBlockError, IncomingBatchSender, InitializeBlockError,
};

use state::state_view_factory::StateViewFactory;
Expand Down Expand Up @@ -119,11 +119,16 @@ pub unsafe extern "C" fn block_publisher_new(
.extract(py)
.expect("Got chain head that wasn't a BlockWrapper")
};
let batch_observers: Vec<PyObject> = batch_observers
.extract::<PyList>(py)
.unwrap()
.iter(py)
.collect();

let batch_observers = if let Ok(py_list) = batch_observers.extract::<PyList>(py) {
let mut res: Vec<Box<BatchObserver>> = Vec::with_capacity(py_list.len(py));
py_list
.iter(py)
.for_each(|pyobj| res.push(Box::new(PyBatchObserver::new(pyobj))));
res
} else {
return ErrorCode::InvalidInput;
};

let batch_publisher = PY_BATCH_PUBLISHER_CLASS
.call(py, (identity_signer.clone_ref(py), batch_sender), None)
Expand Down Expand Up @@ -394,3 +399,23 @@ pub unsafe extern "C" fn block_publisher_cancel_block(publisher: *mut c_void) ->
Err(_) => ErrorCode::BlockNotInitialized,
}
}

struct PyBatchObserver {
py_batch_observer: PyObject,
}

impl PyBatchObserver {
fn new(py_batch_observer: PyObject) -> Self {
PyBatchObserver { py_batch_observer }
}
}

impl BatchObserver for PyBatchObserver {
fn notify_batch_pending(&self, batch: &Batch) {
let gil = Python::acquire_gil();
let py = gil.python();
self.py_batch_observer
.call_method(py, "notify_batch_pending", (batch,), None)
.expect("BatchObserver has no method notify_batch_pending");
}
}

0 comments on commit d5b2bfe

Please sign in to comment.