Skip to content

Commit

Permalink
refactor node methods (#76)
Browse files Browse the repository at this point in the history
  • Loading branch information
ramfox authored Nov 21, 2023
1 parent 4de231c commit 2c6b54e
Show file tree
Hide file tree
Showing 5 changed files with 271 additions and 208 deletions.
61 changes: 61 additions & 0 deletions src/author.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
use std::{str::FromStr, sync::Arc};

use futures::TryStreamExt;

use crate::{block_on, IrohError, IrohNode};

impl IrohNode {
/// Create a new author.
pub fn author_create(&self) -> Result<Arc<AuthorId>, IrohError> {
block_on(&self.async_runtime, async {
let author = self
.sync_client
.authors
.create()
.await
.map_err(IrohError::author)?;

Ok(Arc::new(AuthorId(author)))
})
}

/// List all the AuthorIds that exist on this node.
pub fn author_list(&self) -> Result<Vec<Arc<AuthorId>>, IrohError> {
block_on(&self.async_runtime, async {
let authors = self
.sync_client
.authors
.list()
.await
.map_err(IrohError::author)?
.map_ok(|id| Arc::new(AuthorId(id)))
.try_collect::<Vec<_>>()
.await
.map_err(IrohError::author)?;
Ok(authors)
})
}
}

/// Identifier for an [`Author`]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AuthorId(pub(crate) iroh::sync::AuthorId);

impl std::fmt::Display for AuthorId {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "{}", self.0)
}
}

impl AuthorId {
/// Get an [`AuthorId`] from a String
pub fn from_string(str: String) -> Result<Self, IrohError> {
let author = iroh::sync::AuthorId::from_str(&str).map_err(IrohError::author)?;
Ok(AuthorId(author))
}

/// Returns true when both AuthorId's have the same value
pub fn equal(&self, other: Arc<AuthorId>) -> bool {
*self == *other
}
}
161 changes: 133 additions & 28 deletions src/doc.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
use std::str::FromStr;
use std::sync::Arc;
use std::time::SystemTime;
use std::{str::FromStr, sync::Arc, time::SystemTime};

use futures::{StreamExt, TryStreamExt};
use iroh::{
Expand All @@ -9,11 +7,75 @@ use iroh::{
rpc_protocol::{ProviderRequest, ProviderResponse},
};

pub use iroh::sync::CapabilityKind;

use quic_rpc::transport::flume::FlumeConnection;

use crate::{block_on, Hash, IrohError, PublicKey, SocketAddr, SocketAddrType};
use crate::{block_on, AuthorId, Hash, IrohError, IrohNode, PublicKey, SocketAddr, SocketAddrType};

pub use iroh::sync::CapabilityKind;
impl IrohNode {
/// Create a new doc.
pub fn doc_create(&self) -> Result<Arc<Doc>, IrohError> {
block_on(&self.async_runtime, async {
let doc = self
.sync_client
.docs
.create()
.await
.map_err(IrohError::doc)?;

Ok(Arc::new(Doc {
inner: doc,
rt: self.async_runtime.clone(),
}))
})
}

/// Join and sync with an already existing document.
pub fn doc_join(&self, ticket: Arc<DocTicket>) -> Result<Arc<Doc>, IrohError> {
block_on(&self.async_runtime, async {
let doc = self
.sync_client
.docs
.import(ticket.0.clone())
.await
.map_err(IrohError::doc)?;

Ok(Arc::new(Doc {
inner: doc,
rt: self.async_runtime.clone(),
}))
})
}
/// List all the docs we have access to on this node.
pub fn doc_list(&self) -> Result<Vec<NamespaceAndCapability>, IrohError> {
block_on(&self.async_runtime, async {
let docs = self
.sync_client
.docs
.list()
.await
.map_err(IrohError::doc)?
.map_ok(|(namespace, capability)| NamespaceAndCapability {
namespace: Arc::new(namespace.into()),
capability,
})
.try_collect::<Vec<_>>()
.await
.map_err(IrohError::doc)?;

Ok(docs)
})
}
}

/// The NamespaceId and CapabilityKind (read/write) of the doc
pub struct NamespaceAndCapability {
/// The NamespaceId of the doc
pub namespace: Arc<NamespaceId>,
/// The capability you have for the doc (read/write)
pub capability: CapabilityKind,
}

/// A representation of a mutable, synchronizable key-value store.
pub struct Doc {
Expand Down Expand Up @@ -402,29 +464,6 @@ impl Entry {
}
}

/// Identifier for an [`Author`]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AuthorId(pub(crate) iroh::sync::AuthorId);

impl std::fmt::Display for AuthorId {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "{}", self.0)
}
}

impl AuthorId {
/// Get an [`AuthorId`] from a String
pub fn from_string(str: String) -> Result<Self, IrohError> {
let author = iroh::sync::AuthorId::from_str(&str).map_err(IrohError::author)?;
Ok(AuthorId(author))
}

/// Returns true when both AuthorId's have the same value
pub fn equal(&self, other: Arc<AuthorId>) -> bool {
*self == *other
}
}

/// An identifier for a Doc
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NamespaceId(pub(crate) iroh::sync::NamespaceId);
Expand Down Expand Up @@ -1206,6 +1245,72 @@ mod tests {
use rand::RngCore;
use std::io::Write;

#[test]
fn test_doc_create() {
let path = tempfile::tempdir().unwrap();
let node = IrohNode::new(path.path().to_string_lossy().into_owned()).unwrap();
let node_id = node.node_id();
println!("id: {}", node_id);
let doc = node.doc_create().unwrap();
let doc_id = doc.id();
println!("doc_id: {}", doc_id);

let doc_ticket = doc.share(crate::doc::ShareMode::Write).unwrap();
let doc_ticket_string = doc_ticket.to_string();
let dock_ticket_back = DocTicket::from_string(doc_ticket_string.clone()).unwrap();
assert_eq!(doc_ticket.0.to_string(), dock_ticket_back.0.to_string());
println!("doc_ticket: {}", doc_ticket_string);
node.doc_join(doc_ticket).unwrap();
}

#[test]
fn test_basic_sync() {
// create node_0
let iroh_dir_0 = tempfile::tempdir().unwrap();
let node_0 = IrohNode::new(iroh_dir_0.path().to_string_lossy().into_owned()).unwrap();

// create node_1
let iroh_dir_1 = tempfile::tempdir().unwrap();
let node_1 = IrohNode::new(iroh_dir_1.path().to_string_lossy().into_owned()).unwrap();

// create doc on node_0
let doc_0 = node_0.doc_create().unwrap();
let ticket = doc_0.share(ShareMode::Write).unwrap();

// subscribe to sync events
let (found_s, found_r) = std::sync::mpsc::channel();
struct Callback {
found_s: std::sync::mpsc::Sender<Result<Hash, IrohError>>,
}
impl SubscribeCallback for Callback {
fn event(&self, event: Arc<LiveEvent>) -> Result<(), IrohError> {
match *event {
LiveEvent::ContentReady { ref hash } => {
self.found_s
.send(Ok(hash.clone()))
.map_err(IrohError::doc)?;
}
_ => {}
}
Ok(())
}
}
let cb = Callback { found_s };
doc_0.subscribe(Box::new(cb)).unwrap();

// join the same doc from node_1
let doc_1 = node_1.doc_join(ticket).unwrap();

// create author on node_1
let author = node_1.author_create().unwrap();
doc_1
.set_bytes(author, b"hello".to_vec(), b"world".to_vec())
.unwrap();
let hash = found_r.recv().unwrap().unwrap();
let val = node_1.blobs_read_to_bytes(hash.into()).unwrap();
assert_eq!(b"world".to_vec(), val);
}

#[test]
fn test_node_addr() {
//
Expand Down
23 changes: 20 additions & 3 deletions src/iroh.udl
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,26 @@ interface IrohNode {
void blobs_delete_blob(Hash hash);
};

/// Information about a direct address.
interface DirectAddrInfo {
/// Get the reported address
SocketAddr addr();
/// Get the reported latency, if it exists
duration? latency();
/// Get the last control message received by this node
LatencyAndControlMsg? last_control();
/// Get how long ago the last payload message was received for this node
duration? last_payload();
};

/// The latency and type of the control message
dictionary LatencyAndControlMsg {
/// The latency of the control message
duration latency;
/// The type of control message, represented as a string
string control_msg;
};

/// A representation of a mutable, synchronizable key-value store.
interface Doc {
/// Get the document id of this doc.
Expand Down Expand Up @@ -687,9 +707,6 @@ interface ConnectionType{
None();
};

/// Information about a direct address.
interface DirectAddrInfo {};

/// Type of SocketAddr
enum SocketAddrType {
/// Ipv4 SocketAddr
Expand Down
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
mod author;
mod blob;
mod doc;
mod error;
Expand All @@ -6,6 +7,7 @@ mod net;
mod node;
mod tag;

pub use self::author::*;
pub use self::blob::*;
pub use self::doc::*;
pub use self::error::IrohError;
Expand Down
Loading

0 comments on commit 2c6b54e

Please sign in to comment.