Skip to content

Commit

Permalink
Add some async job infrastructure.
Browse files Browse the repository at this point in the history
  • Loading branch information
jneem committed Jun 28, 2021
1 parent 02f8295 commit 6c215d2
Show file tree
Hide file tree
Showing 6 changed files with 133 additions and 42 deletions.
34 changes: 10 additions & 24 deletions helix-term/src/application.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::{
args::Args,
compositor::Compositor,
config::Config,
job::Jobs,
keymap::Keymaps,
ui::{self, Spinner},
};
Expand All @@ -31,13 +32,6 @@ use crossterm::{

use futures_util::{future, stream::FuturesUnordered};

type BoxFuture<T> = Pin<Box<dyn Future<Output = T> + Send>>;
pub type LspCallback =
BoxFuture<Result<Box<dyn FnOnce(&mut Editor, &mut Compositor) + Send>, anyhow::Error>>;

pub type LspCallbacks = FuturesUnordered<LspCallback>;
pub type LspCallbackWrapper = Box<dyn FnOnce(&mut Editor, &mut Compositor) + Send>;

pub struct Application {
compositor: Compositor,
editor: Editor,
Expand All @@ -48,7 +42,7 @@ pub struct Application {
theme_loader: Arc<theme::Loader>,
syn_loader: Arc<syntax::Loader>,

callbacks: LspCallbacks,
jobs: Jobs,
lsp_progress: LspProgressMap,
}

Expand Down Expand Up @@ -120,7 +114,7 @@ impl Application {
theme_loader,
syn_loader,

callbacks: FuturesUnordered::new(),
jobs: Jobs::new(),
lsp_progress: LspProgressMap::new(),
};

Expand All @@ -130,11 +124,11 @@ impl Application {
fn render(&mut self) {
let editor = &mut self.editor;
let compositor = &mut self.compositor;
let callbacks = &mut self.callbacks;
let jobs = &mut self.jobs;

let mut cx = crate::compositor::Context {
editor,
callbacks,
jobs,
scroll: None,
};

Expand All @@ -148,6 +142,7 @@ impl Application {

loop {
if self.editor.should_close() {
self.jobs.finish();
break;
}

Expand All @@ -172,27 +167,18 @@ impl Application {
}
self.render();
}
Some(callback) = &mut self.callbacks.next() => {
self.handle_language_server_callback(callback)
Some(callback) = self.jobs.next() => {
self.jobs.handle_callback(&mut self.editor, &mut self.compositor, callback);
self.render();
}
}
}
}
pub fn handle_language_server_callback(
&mut self,
callback: Result<LspCallbackWrapper, anyhow::Error>,
) {
if let Ok(callback) = callback {
// TODO: handle Err()
callback(&mut self.editor, &mut self.compositor);
self.render();
}
}

pub fn handle_terminal_events(&mut self, event: Option<Result<Event, crossterm::ErrorKind>>) {
let mut cx = crate::compositor::Context {
editor: &mut self.editor,
callbacks: &mut self.callbacks,
jobs: &mut self.jobs,
scroll: None,
};
// Handle key events
Expand Down
32 changes: 18 additions & 14 deletions helix-term/src/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ use crate::{
ui::{self, Completion, Picker, Popup, Prompt, PromptEvent},
};

use crate::application::{LspCallback, LspCallbackWrapper, LspCallbacks};
use futures_util::FutureExt;
use crate::job::{self, Job, JobFuture, Jobs};
use futures_util::{FutureExt, TryFutureExt};
use std::{fmt, future::Future, path::Display, str::FromStr};

use std::{
Expand All @@ -54,7 +54,7 @@ pub struct Context<'a> {

pub callback: Option<crate::compositor::Callback>,
pub on_next_key_callback: Option<Box<dyn FnOnce(&mut Context, KeyEvent)>>,
pub callbacks: &'a mut LspCallbacks,
pub jobs: &'a mut Jobs,
}

impl<'a> Context<'a> {
Expand Down Expand Up @@ -85,13 +85,13 @@ impl<'a> Context<'a> {
let callback = Box::pin(async move {
let json = call.await?;
let response = serde_json::from_value(json)?;
let call: LspCallbackWrapper =
let call: job::Callback =
Box::new(move |editor: &mut Editor, compositor: &mut Compositor| {
callback(editor, compositor, response)
});
Ok(call)
});
self.callbacks.push(callback);
self.jobs.callback(callback);
}

/// Returns 1 if no explicit count was provided
Expand Down Expand Up @@ -1110,7 +1110,7 @@ mod cmd {
path: Option<P>,
) -> Result<tokio::task::JoinHandle<Result<(), anyhow::Error>>, anyhow::Error> {
use anyhow::anyhow;
let callbacks = &mut cx.callbacks;
let jobs = &mut cx.jobs;
let (view, doc) = current!(cx.editor);

if let Some(path) = path {
Expand All @@ -1129,7 +1129,7 @@ mod cmd {
doc.format().map(|fmt| {
let shared = fmt.shared();
let callback = make_format_callback(doc.id(), doc.version(), true, shared.clone());
callbacks.push(callback);
jobs.callback(callback);
shared
})
} else {
Expand All @@ -1139,8 +1139,12 @@ mod cmd {
}

fn write(cx: &mut compositor::Context, args: &[&str], event: PromptEvent) {
if let Err(e) = write_impl(cx, args.first()) {
cx.editor.set_error(e.to_string());
match write_impl(cx, args.first()) {
Err(e) => cx.editor.set_error(e.to_string()),
Ok(handle) => {
cx.jobs
.add(Job::new(handle.unwrap_or_else(|e| Err(e.into()))).wait_before_exiting());
}
};
}

Expand All @@ -1153,7 +1157,7 @@ mod cmd {

if let Some(format) = doc.format() {
let callback = make_format_callback(doc.id(), doc.version(), false, format);
cx.callbacks.push(callback);
cx.jobs.callback(callback);
}
}

Expand Down Expand Up @@ -1879,10 +1883,10 @@ fn make_format_callback(
doc_version: i32,
set_unmodified: bool,
format: impl Future<Output = helix_lsp::util::LspFormatting> + Send + 'static,
) -> LspCallback {
Box::pin(async move {
) -> impl Future<Output = anyhow::Result<job::Callback>> {
async move {
let format = format.await;
let call: LspCallbackWrapper =
let call: job::Callback =
Box::new(move |editor: &mut Editor, compositor: &mut Compositor| {
let view_id = view!(editor).id;
if let Some(doc) = editor.document_mut(doc_id) {
Expand All @@ -1898,7 +1902,7 @@ fn make_format_callback(
}
});
Ok(call)
})
}
}

enum Open {
Expand Down
4 changes: 2 additions & 2 deletions helix-term/src/compositor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ pub enum EventResult {

use helix_view::Editor;

use crate::application::LspCallbacks;
use crate::job::Jobs;

pub struct Context<'a> {
pub editor: &'a mut Editor,
pub scroll: Option<usize>,
pub callbacks: &'a mut LspCallbacks,
pub jobs: &'a mut Jobs,
}

pub trait Component: Any + AnyComponent {
Expand Down
100 changes: 100 additions & 0 deletions helix-term/src/job.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
use helix_view::Editor;

use crate::compositor::Compositor;

use futures_util::future::{self, BoxFuture, Future, FutureExt};
use futures_util::stream::{self, FuturesUnordered, Select, StreamExt};

pub type Callback = Box<dyn FnOnce(&mut Editor, &mut Compositor) + Send>;
pub type JobFuture = BoxFuture<'static, anyhow::Result<Option<Callback>>>;

pub struct Job {
pub future: BoxFuture<'static, anyhow::Result<Option<Callback>>>,
/// Do we need to wait for this job to finish before exiting?
pub wait: bool,
}

#[derive(Default)]
pub struct Jobs {
futures: FuturesUnordered<JobFuture>,
/// These are the ones that need to complete before we exit.
wait_futures: FuturesUnordered<JobFuture>,
}

impl Job {
pub fn new<F: Future<Output = anyhow::Result<()>> + Send + 'static>(f: F) -> Job {
Job {
future: f.map(|r| r.map(|()| None)).boxed(),
wait: false,
}
}

pub fn with_callback<F: Future<Output = anyhow::Result<Callback>> + Send + 'static>(
f: F,
) -> Job {
Job {
future: f.map(|r| r.map(|x| Some(x))).boxed(),
wait: false,
}
}

pub fn wait_before_exiting(mut self) -> Job {
self.wait = true;
self
}
}

impl Jobs {
pub fn new() -> Jobs {
Jobs::default()
}

pub fn spawn<F: Future<Output = anyhow::Result<()>> + Send + 'static>(&mut self, f: F) {
self.add(Job::new(f));
}

pub fn callback<F: Future<Output = anyhow::Result<Callback>> + Send + 'static>(
&mut self,
f: F,
) {
self.add(Job::with_callback(f));
}

pub fn handle_callback(
&mut self,
editor: &mut Editor,
compositor: &mut Compositor,
call: anyhow::Result<Option<Callback>>,
) {
match call {
Ok(None) => {}
Ok(Some(call)) => {
call(editor, compositor);
}
Err(e) => {
editor.set_error(format!("Async job failed: {}", e));
}
}
}

pub fn next<'a>(
&'a mut self,
) -> impl Future<Output = Option<anyhow::Result<Option<Callback>>>> + 'a {
future::select(self.futures.next(), self.wait_futures.next())
.map(|either| either.factor_first().0)
}

pub fn add(&mut self, j: Job) {
if j.wait {
self.wait_futures.push(j.future);
} else {
self.futures.push(j.future);
}
}

/// Blocks until all the jobs that need to be waited on are done.
pub fn finish(&mut self) {
let wait_futures = std::mem::take(&mut self.wait_futures);
helix_lsp::block_on(wait_futures.for_each(|_| future::ready(())));
}
}
1 change: 1 addition & 0 deletions helix-term/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,6 @@ pub mod args;
pub mod commands;
pub mod compositor;
pub mod config;
pub mod job;
pub mod keymap;
pub mod ui;
4 changes: 2 additions & 2 deletions helix-term/src/ui/editor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -621,7 +621,7 @@ impl Component for EditorView {
count: None,
callback: None,
on_next_key_callback: None,
callbacks: cx.callbacks,
jobs: cx.jobs,
};

if let Some(on_next_key) = self.on_next_key.take() {
Expand All @@ -639,7 +639,7 @@ impl Component for EditorView {
// use a fake context here
let mut cx = Context {
editor: cxt.editor,
callbacks: cxt.callbacks,
jobs: cxt.jobs,
scroll: None,
};
let res = completion.handle_event(event, &mut cx);
Expand Down

0 comments on commit 6c215d2

Please sign in to comment.