Skip to content

Commit

Permalink
Merge pull request hyperledger-archives#1888 from aludvik/block-publi…
Browse files Browse the repository at this point in the history
…sher-cleanup-1

Block Publisher FFI Cleanup
  • Loading branch information
Adam M Ludvik authored Oct 4, 2018
2 parents 32bdaa1 + d5b2bfe commit ca0e8e1
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 64 deletions.
12 changes: 12 additions & 0 deletions validator/src/ffi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
* ------------------------------------------------------------------------------
*/

use cpython::{PyObject, Python};

#[no_mangle]
pub unsafe extern "C" fn ffi_reclaim_string(s_ptr: *mut u8, s_len: usize, s_cap: usize) -> isize {
String::from_raw_parts(s_ptr, s_len, s_cap);
Expand All @@ -32,3 +34,13 @@ pub unsafe extern "C" fn ffi_reclaim_vec(

0
}

pub fn py_import_class(module: &str, class: &str) -> PyObject {
let gil = Python::acquire_gil();
let python = gil.python();
python
.import(module)
.expect(&format!("Unable to import '{}'", module))
.get(python, class)
.expect(&format!("Unable to import {} from '{}'", class, module))
}
54 changes: 23 additions & 31 deletions validator/src/journal/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use std::thread;
use std::time::Duration;

use execution::execution_platform::ExecutionPlatform;
use execution::py_executor::PyExecutor;
use ffi::py_import_class;
use journal::block_manager::BlockManager;
use journal::candidate_block::{CandidateBlock, CandidateBlockError};
use journal::chain_commit_state::TransactionCommitCache;
Expand All @@ -48,6 +48,13 @@ lazy_static! {
metrics::get_collector("sawtooth_validator.publisher");
}

lazy_static! {
static ref PY_BLOCK_HEADER_CLASS: PyObject =
py_import_class("sawtooth_validator.protobuf.block_pb2", "BlockHeader");
static ref PY_BLOCK_BUILDER_CLASS: PyObject =
py_import_class("sawtooth_validator.journal.block_builder", "BlockBuilder");
}

#[derive(Debug)]
pub enum InitializeBlockError {
BlockInProgress,
Expand Down Expand Up @@ -75,8 +82,13 @@ pub enum BlockPublisherError {
UnknownBlock(String),
}

pub trait BatchObserver: Send + Sync {
fn notify_batch_pending(&self, batch: &Batch);
}

pub struct BlockPublisherState {
pub transaction_executor: Box<ExecutionPlatform>,
pub batch_observers: Vec<Box<BatchObserver>>,
pub chain_head: Option<Block>,
pub candidate_block: Option<CandidateBlock>,
pub pending_batches: PendingBatchesPool,
Expand All @@ -85,11 +97,13 @@ pub struct BlockPublisherState {
impl BlockPublisherState {
pub fn new(
transaction_executor: Box<ExecutionPlatform>,
batch_observers: Vec<Box<BatchObserver>>,
chain_head: Option<Block>,
candidate_block: Option<CandidateBlock>,
pending_batches: PendingBatchesPool,
) -> Self {
BlockPublisherState {
batch_observers,
transaction_executor,
chain_head,
candidate_block,
Expand All @@ -107,10 +121,7 @@ pub struct SyncBlockPublisher {
pub state: Arc<RwLock<BlockPublisherState>>,

block_manager: BlockManager,
batch_observers: Vec<PyObject>,
batch_injector_factory: PyObject,
block_header_class: PyObject,
block_builder_class: PyObject,
batch_committed: PyObject,
transaction_committed: PyObject,
state_view_factory: StateViewFactory,
Expand All @@ -134,14 +145,7 @@ impl Clone for SyncBlockPublisher {
SyncBlockPublisher {
state,
block_manager: self.block_manager.clone(),
batch_observers: self
.batch_observers
.iter()
.map(|i| i.clone_ref(py))
.collect(),
batch_injector_factory: self.batch_injector_factory.clone_ref(py),
block_header_class: self.block_header_class.clone_ref(py),
block_builder_class: self.block_builder_class.clone_ref(py),
batch_committed: self.batch_committed.clone_ref(py),
transaction_committed: self.transaction_committed.clone_ref(py),
state_view_factory: self.state_view_factory.clone(),
Expand Down Expand Up @@ -262,13 +266,11 @@ impl SyncBlockPublisher {
kwargs
.set_item(py, "signer_public_key", &public_key)
.unwrap();
let block_header = self
.block_header_class
let block_header = PY_BLOCK_HEADER_CLASS
.call(py, NoArgs, Some(&kwargs))
.expect("BlockHeader could not be constructed");

let block_builder = self
.block_builder_class
let block_builder = PY_BLOCK_BUILDER_CLASS
.call(py, (block_header,), None)
.expect("BlockBuilder could not be constructed");

Expand Down Expand Up @@ -452,12 +454,8 @@ impl SyncBlockPublisher {

pub fn on_batch_received(&self, batch: Batch) {
let mut state = self.state.write().expect("Lock should not be poisoned");
for observer in &self.batch_observers {
let gil = Python::acquire_gil();
let py = gil.python();
observer
.call_method(py, "notify_batch_pending", (batch.clone(),), None)
.expect("BatchObserver has no method notify_batch_pending");
for observer in &state.batch_observers {
observer.notify_batch_pending(&batch);
}
let permission_check = {
let gil = Python::acquire_gil();
Expand Down Expand Up @@ -503,7 +501,7 @@ impl BlockPublisher {
#![allow(too_many_arguments)]
pub fn new(
block_manager: BlockManager,
transaction_executor: PyObject,
transaction_executor: Box<ExecutionPlatform>,
batch_committed: PyObject,
transaction_committed: PyObject,
state_view_factory: StateViewFactory,
Expand All @@ -514,15 +512,12 @@ impl BlockPublisher {
data_dir: PyObject,
config_dir: PyObject,
permission_verifier: PyObject,
batch_observers: Vec<PyObject>,
batch_observers: Vec<Box<BatchObserver>>,
batch_injector_factory: PyObject,
block_header_class: PyObject,
block_builder_class: PyObject,
) -> Self {
let tep = Box::new(PyExecutor::new(transaction_executor).unwrap());

let state = Arc::new(RwLock::new(BlockPublisherState::new(
tep,
transaction_executor,
batch_observers,
chain_head,
None,
PendingBatchesPool::new(NUM_PUBLISH_COUNT_SAMPLES, INITIAL_PUBLISH_COUNT),
Expand All @@ -540,10 +535,7 @@ impl BlockPublisher {
data_dir,
config_dir,
permission_verifier,
batch_observers,
batch_injector_factory,
block_header_class,
block_builder_class,
exit: Arc::new(Exit::new()),
};

Expand Down
82 changes: 49 additions & 33 deletions validator/src/journal/publisher_ffi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,26 @@ use std::mem;
use std::os::raw::{c_char, c_void};
use std::slice;

use cpython::{PyClone, PyList, PyObject, Python};
use cpython::{ObjectProtocol, PyClone, PyList, PyObject, Python};

use batch::Batch;
use block::Block;
use execution::py_executor::PyExecutor;
use ffi::py_import_class;
use journal::block_manager::BlockManager;
use journal::publisher::{
BlockPublisher, FinalizeBlockError, IncomingBatchSender, InitializeBlockError,
BatchObserver, BlockPublisher, FinalizeBlockError, IncomingBatchSender, InitializeBlockError,
};

use state::state_view_factory::StateViewFactory;

lazy_static! {
static ref PY_BATCH_PUBLISHER_CLASS: PyObject = py_import_class(
"sawtooth_validator.journal.consensus.batch_publisher",
"BatchPublisher"
);
}

#[repr(u32)]
#[derive(Debug)]
pub enum ErrorCode {
Expand Down Expand Up @@ -110,38 +119,27 @@ pub unsafe extern "C" fn block_publisher_new(
.extract(py)
.expect("Got chain head that wasn't a BlockWrapper")
};
let batch_observers: Vec<PyObject> = batch_observers
.extract::<PyList>(py)
.unwrap()
.iter(py)
.collect();

let batch_publisher_mod = py
.import("sawtooth_validator.journal.consensus.batch_publisher")
.expect("Unable to import 'sawtooth_validator.journal.consensus.batch_publisher'");
let batch_publisher = batch_publisher_mod
.call(
py,
"BatchPublisher",
(identity_signer.clone_ref(py), batch_sender),
None,
).expect("Unable to create BatchPublisher");

let block_header_class = py
.import("sawtooth_validator.protobuf.block_pb2")
.expect("Unable to import 'sawtooth_validator.protobuf.block_pb2'")
.get(py, "BlockHeader")
.expect("Unable to import BlockHeader from 'sawtooth_validator.protobuf.block_pb2'");

let block_builder_class = py
.import("sawtooth_validator.journal.block_builder")
.expect("Unable to import 'sawtooth_validator.journal.block_builder'")
.get(py, "BlockBuilder")
.expect("Unable to import BlockBuilder from 'sawtooth_validator.journal.block_builder'");

let batch_observers = if let Ok(py_list) = batch_observers.extract::<PyList>(py) {
let mut res: Vec<Box<BatchObserver>> = Vec::with_capacity(py_list.len(py));
py_list
.iter(py)
.for_each(|pyobj| res.push(Box::new(PyBatchObserver::new(pyobj))));
res
} else {
return ErrorCode::InvalidInput;
};

let batch_publisher = PY_BATCH_PUBLISHER_CLASS
.call(py, (identity_signer.clone_ref(py), batch_sender), None)
.expect("Unable to create BatchPublisher");

let publisher = BlockPublisher::new(
block_manager,
transaction_executor,
Box::new(
PyExecutor::new(transaction_executor)
.expect("Failed to create python transaction executor"),
),
batch_committed,
transaction_committed,
state_view_factory.clone(),
Expand All @@ -154,8 +152,6 @@ pub unsafe extern "C" fn block_publisher_new(
permission_verifier,
batch_observers,
batch_injector_factory,
block_header_class,
block_builder_class,
);

*block_publisher_ptr = Box::into_raw(Box::new(publisher)) as *const c_void;
Expand Down Expand Up @@ -403,3 +399,23 @@ pub unsafe extern "C" fn block_publisher_cancel_block(publisher: *mut c_void) ->
Err(_) => ErrorCode::BlockNotInitialized,
}
}

struct PyBatchObserver {
py_batch_observer: PyObject,
}

impl PyBatchObserver {
fn new(py_batch_observer: PyObject) -> Self {
PyBatchObserver { py_batch_observer }
}
}

impl BatchObserver for PyBatchObserver {
fn notify_batch_pending(&self, batch: &Batch) {
let gil = Python::acquire_gil();
let py = gil.python();
self.py_batch_observer
.call_method(py, "notify_batch_pending", (batch,), None)
.expect("BatchObserver has no method notify_batch_pending");
}
}

0 comments on commit ca0e8e1

Please sign in to comment.