Skip to content

Commit

Permalink
ActionScheduler will now use ActionListener instead of tokio::watch (T…
Browse files Browse the repository at this point in the history
…raceMachina#1091)

This will enable the underlying scheduler to intercept the Drop
call allowing easier cleanups of actively listened actions.
  • Loading branch information
allada authored Jul 7, 2024
1 parent d70d31d commit cfc0cf6
Show file tree
Hide file tree
Showing 13 changed files with 445 additions and 253 deletions.
1 change: 1 addition & 0 deletions nativelink-scheduler/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ rust_library(
srcs = [
"src/action_scheduler.rs",
"src/cache_lookup_scheduler.rs",
"src/default_action_listener.rs",
"src/default_scheduler_factory.rs",
"src/grpc_scheduler.rs",
"src/lib.rs",
Expand Down
22 changes: 19 additions & 3 deletions nativelink-scheduler/src/action_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,32 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::pin::Pin;
use std::sync::Arc;

use async_trait::async_trait;
use futures::Future;
use nativelink_error::Error;
use nativelink_util::action_messages::{ActionInfo, ActionState, ClientOperationId};
use nativelink_util::metrics_utils::Registry;
use tokio::sync::watch;

use crate::platform_property_manager::PlatformPropertyManager;

/// ActionListener interface is responsible for interfacing with clients
/// that are interested in the state of an action.
pub trait ActionListener: Sync + Send + Unpin {
/// Returns the client operation id.
fn client_operation_id(&self) -> &ClientOperationId;

/// Returns the current action state.
fn action_state(&self) -> Arc<ActionState>;

/// Waits for the action state to change.
fn changed(
&mut self,
) -> Pin<Box<dyn Future<Output = Result<Arc<ActionState>, Error>> + Send + Sync + '_>>;
}

/// ActionScheduler interface is responsible for interactions between the scheduler
/// and action related operations.
#[async_trait]
Expand All @@ -37,13 +53,13 @@ pub trait ActionScheduler: Sync + Send + Unpin {
&self,
client_operation_id: ClientOperationId,
action_info: ActionInfo,
) -> Result<(ClientOperationId, watch::Receiver<Arc<ActionState>>), Error>;
) -> Result<Pin<Box<dyn ActionListener>>, Error>;

/// Find an existing action by its name.
async fn find_by_client_operation_id(
&self,
client_operation_id: &ClientOperationId,
) -> Result<Option<watch::Receiver<Arc<ActionState>>>, Error>;
) -> Result<Option<Pin<Box<dyn ActionListener>>>, Error>;

/// Cleans up the cache of recently completed actions.
async fn clean_recently_completed_actions(&self);
Expand Down
Loading

0 comments on commit cfc0cf6

Please sign in to comment.