Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(typegraph): send rpc message in chunks in the TS typegraph client #904

Merged
merged 8 commits into from
Nov 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/typegraphs/metagen/rs/fdk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ impl Router {
}

pub fn init(&self, args: InitArgs) -> Result<InitResponse, InitError> {
static MT_VERSION: &str = "0.5.0-rc.3";
static MT_VERSION: &str = "0.5.0-rc.4";
if args.metatype_version != MT_VERSION {
return Err(InitError::VersionMismatch(MT_VERSION.into()));
}
Expand Down
7 changes: 4 additions & 3 deletions src/meta-cli/src/cli/gen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ impl InputResolver for MetagenCtx {
.join(path)
.canonicalize()
.wrap_err("unable to canonicalize typegraph path, make sure it exists")?;
let raw = load_tg_at(config, path, name.as_deref()).await?;
let raw = load_tg_at(config, path, name.as_deref(), &self.dir).await?;
GeneratorInputResolved::TypegraphFromTypegate { raw }
}
GeneratorInputOrder::LoadFdkTemplate {
Expand All @@ -177,6 +177,7 @@ async fn load_tg_at(
config: Arc<Config>,
path: PathBuf,
name: Option<&str>,
dir: &Path,
) -> anyhow::Result<Box<Typegraph>> {
let console = ConsoleActor::new(Arc::clone(&config)).start();

Expand All @@ -185,7 +186,7 @@ async fn load_tg_at(
config.clone(),
SerializeActionGenerator::new(
config_dir.clone(),
config_dir, // TODO cwd
dir.into(),
config
.prisma_migrations_base_dir(PathOption::Absolute)
.into(),
Expand All @@ -200,7 +201,7 @@ async fn load_tg_at(
let mut tgs = report.into_typegraphs()?;

if tgs.is_empty() {
bail!("not typegraphs loaded from path at {path:?}")
bail!("no typegraphs loaded from path at {path:?}")
}
let tg = if let Some(tg_name) = name {
if let Some(idx) = tgs.iter().position(|tg| tg.name().unwrap() == tg_name) {
Expand Down
186 changes: 120 additions & 66 deletions src/meta-cli/src/deploy/actors/task_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,6 @@ mod message {
pub(super) struct Exit;
}

#[derive(Clone, Copy)]
enum OutputLevel {
Debug,
Info,
Warning,
Error,
}

#[derive(Serialize, Deserialize, Debug)]
enum JsonRpcVersion {
#[serde(rename = "2.0")]
Expand Down Expand Up @@ -76,8 +68,8 @@ pub(super) struct TaskIoActor<A: TaskAction + 'static> {
action: A,
task: Addr<TaskActor<A>>,
console: Addr<ConsoleActor>,
latest_level: OutputLevel,
results: Vec<ActionResult<A>>,
rpc_message_buffer: String,
}

impl<A: TaskAction + 'static> TaskIoActor<A> {
Expand All @@ -102,8 +94,8 @@ impl<A: TaskAction + 'static> TaskIoActor<A> {
action,
task,
console: console.clone(),
latest_level: OutputLevel::Info,
results: vec![],
rpc_message_buffer: String::new(),
};

let self_addr = ctx.address().downgrade();
Expand Down Expand Up @@ -155,6 +147,25 @@ impl<A: TaskAction + 'static> Actor for TaskIoActor<A> {
}
}

#[derive(Deserialize, Debug)]
struct RpcNotificationMessage {
#[allow(dead_code)]
jsonrpc: JsonRpcVersion,
#[serde(flatten)]
notification: RpcNotification,
}

#[derive(Deserialize, Debug)]
#[serde(tag = "method", content = "params")]
enum RpcNotification {
Debug { message: String },
Info { message: String },
Warning { message: String },
Error { message: String },
Success { data: serde_json::Value },
Failure { data: serde_json::Value },
}

impl<A: TaskAction + 'static> Handler<message::OutputLine> for TaskIoActor<A> {
type Result = ();

Expand All @@ -166,57 +177,28 @@ impl<A: TaskAction + 'static> Handler<message::OutputLine> for TaskIoActor<A> {
Some((prefix, tail)) => {
trace!("prefix: {prefix}");
match prefix {
"debug" => {
console.debug(format!("{scope} {tail}"));
self.latest_level = OutputLevel::Debug;
"jsonrpc^" => {
self.rpc_message_buffer.push_str(tail);
}
"info" => {
console.info(format!("{scope} {tail}"));
self.latest_level = OutputLevel::Info;
}
"warning" => {
console.warning(format!("{scope} {tail}"));
self.latest_level = OutputLevel::Warning;
}
"error" => {
console.error(format!("{scope} {tail}"));
self.latest_level = OutputLevel::Error;
}
"success" => {
match serde_json::from_str(tail) {
Ok(data) => self.results.push(Ok(data)),
Err(err) => {
console.error(format!("{scope} failed to process message: {err}"));
// TODO fail task?
}
}
}
"failure" => {
match serde_json::from_str(tail) {
Ok(data) => {
self.results.push(Err(data));
}
Err(err) => {
console.error(format!("{scope} failed to process message: {err}"));
// TODO fail task?
}
}
}
"jsonrpc" => {
match serde_json::from_str(tail) {
Ok(req) => self.handle_rpc_request(req, ctx.address(), ctx),
Err(err) => {
console.error(format!("{scope} failed to process message: {err}"));
// TODO fail task?
}
}
"jsonrpc$" => {
self.rpc_message_buffer.push_str(tail);
let message = std::mem::take(&mut self.rpc_message_buffer);
self.handle_rpc_message(&message, ctx);
}

_ => self.handle_continuation(&line),
_ => {
// a log message that were not outputted with the log library
// on the typegraph client
// --> as a debug message
console.debug(format!("{scope}$>{line}"));
}
}
}
None => {
self.handle_continuation(&line);
// a log message that were not outputted with the log library
// on the typegraph client
// --> as a debug message
console.debug(format!("{scope}$>{line}"));
}
}
}
Expand All @@ -228,23 +210,95 @@ impl<A: TaskAction + 'static> TaskIoActor<A> {
format!("[{path}]", path = path.yellow())
}

// process as continuation to previous output
fn handle_continuation(&self, line: &str) {
fn handle_rpc_message(&mut self, message: &str, ctx: &mut Context<Self>) {
let console = &self.console;
let scope = self.get_console_scope();
let message: serde_json::Value = match serde_json::from_str(message) {
Ok(value) => value,
Err(err) => {
self.console
.error(format!("{scope} failed to parse JSON-RPC message: {err}"));
// TODO cancel task?
return;
}
};

if message.get("id").is_some() {
// JSON-RPC request
match serde_json::from_value(message) {
Ok(req) => self.handle_rpc_request(req, ctx.address(), ctx),
Err(err) => {
console.error(format!(
"{scope} failed to validate JSON-RPC request: {err}"
));
// TODO cancel task?
}
}
} else {
// JSON-RPC notification
match serde_json::from_value::<RpcNotificationMessage>(message)
.map(|msg| msg.notification)
{
Ok(notification) => self.handle_rpc_notification(notification),
Err(err) => {
console.error(format!(
"{scope} failed to validate JSON-RPC notification: {err}"
));
// TODO cancel task?
}
};
}
}

match self.latest_level {
OutputLevel::Debug => {
console.debug(format!("{scope}>{line}"));
fn handle_rpc_notification(&mut self, notification: RpcNotification) {
let console = &self.console;
let scope = self.get_console_scope();
match notification {
RpcNotification::Debug { message } => {
for line in message.lines() {
console.debug(format!("{scope} {line}"));
}
}
RpcNotification::Info { message } => {
for line in message.lines() {
console.info(format!("{scope} {line}"));
}
}
RpcNotification::Warning { message } => {
for line in message.lines() {
console.warning(format!("{scope} {line}"));
}
}
OutputLevel::Info => {
console.info(format!("{scope}>{line}"));
RpcNotification::Error { message } => {
for line in message.lines() {
console.error(format!("{scope} {line}"));
}
}
OutputLevel::Warning => {
console.warning(format!("{scope}>{line}"));
RpcNotification::Success { data } => {
let data = match serde_json::from_value(data) {
Ok(data) => data,
Err(err) => {
console.error(format!(
"{scope} failed to validate JSON-RPC notification (success): {err}"
));
// TODO cancel task?
return;
}
};
self.results.push(Ok(data));
}
OutputLevel::Error => {
console.error(format!("{scope}>{line}"));
RpcNotification::Failure { data } => {
let data = match serde_json::from_value(data) {
Ok(data) => data,
Err(err) => {
console.error(format!(
"{scope} failed to validate JSON-RPC notification (failure): {err}"
));
// TODO cancel task?
return;
}
};
self.results.push(Err(data));
}
}
}
Expand Down
10 changes: 9 additions & 1 deletion src/meta-cli/src/deploy/actors/task_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use crate::{config::Config, interlude::*};
use colored::OwoColorize;
use futures::channel::oneshot;
use indexmap::IndexMap;
use pathdiff::diff_paths;
use signal_handler::set_stop_recipient;
use std::collections::VecDeque;
use std::sync::atomic::{AtomicUsize, Ordering};
Expand Down Expand Up @@ -205,9 +206,16 @@ impl<A: TaskAction + 'static> TaskManagerInit<A> {
) -> Option<Addr<WatcherActor<A>>> {
match &self.task_source {
TaskSource::Static(paths) => {
let working_dir = self
.action_generator
.get_shared_config()
.working_dir
.clone();
for path in paths {
let relative_path = diff_paths(path, &working_dir);
addr.do_send(AddTask {
task_ref: task_generator.generate(path.clone().into(), 0),
task_ref: task_generator
.generate(relative_path.unwrap_or_else(|| path.clone()).into(), 0),
reason: TaskReason::User,
});
}
Expand Down
Loading
Loading