Skip to content

Commit

Permalink
Add eom
Browse files Browse the repository at this point in the history
  • Loading branch information
s5u13b committed Sep 20, 2024
1 parent 21f9efc commit db00eb8
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 8 deletions.
13 changes: 8 additions & 5 deletions llumnix/llumlet/llumlet.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,10 @@ def migrate_out(self, dst_instance_name: str) -> List[str]:
return []
migrated_request_list = []
for migrate_out_request in migrate_out_requests:
migrated_request_list.extend(self._migrate_out_one_request(migrate_out_request, dst_instance_name))
migrated_request = self._migrate_out_one_request(migrate_out_request, dst_instance_name)
migrated_request_list.extend(migrated_request)
if len(migrated_request) == 0 and migrate_out_request.eom:
break
return migrated_request_list

def _migrate_out_one_request(self, migrate_out_request, dst_instance_name: str):
Expand All @@ -115,7 +118,7 @@ def _migrate_out_one_request(self, migrate_out_request, dst_instance_name: str):
migrate_in_ray_actor = ray.get_actor(dst_instance_name, namespace='llumnix')
dst_instance_id = dst_instance_name[len("instance_"):]
logger.info("{}->{} begin migrate out".format(self.instance_id, dst_instance_id))
migrated_request_list = []
migrated_request = []
assert migrate_out_request.status in [RequestStatus.WAITING, RequestStatus.RUNNING], "Only migrate out waiting/running request"
if migrate_out_request.status == RequestStatus.RUNNING:
status = self.migration_coordinator.migrate_out_running_request(migrate_in_ray_actor, migrate_out_request)
Expand All @@ -125,14 +128,14 @@ def _migrate_out_one_request(self, migrate_out_request, dst_instance_name: str):
ray.get(migrate_in_ray_actor.execute_engine_method.remote("commit_dst_request", migrate_out_request))
if migrate_out_request.status == RequestStatus.RUNNING:
self.backend_engine.free_src_request(migrate_out_request)
migrated_request_list.append(migrate_out_request.request_id)
migrated_request.append(migrate_out_request.request_id)
self.backend_engine.remove_migrating_out_request_last_stage(migrate_out_request)
elif status == MigrationStatus.FINISHED_SRC_ABORTED:
migrate_out_request.reset_migration_args()
ray.get(migrate_in_ray_actor.execute_migration_method.remote("free_dst_pre_alloc_cache", migrate_out_request.request_id))
t1 = time.time()
logger.info("{}->{} migrate done, migrate request {}, migration status: {}, len: {} blocks, cost: {} ms" \
.format(self.instance_id, dst_instance_id, migrated_request_list, status, \
.format(self.instance_id, dst_instance_id, migrated_request, status, \
sum(migrate_out_request.stage_num_blocks_list), (t1 - t0)*1000))
except ray.exceptions.RayActorError:
logger.info("[migrate_out] instance {} is dead".format(dst_instance_name[len("instance_"):]))
Expand All @@ -142,7 +145,7 @@ def _migrate_out_one_request(self, migrate_out_request, dst_instance_name: str):
logger.error("unexpected exception occurs: {}".format(e))
logger.error("exception traceback: {}".format(traceback.format_exc()))
raise
return migrated_request_list
return migrated_request

def get_instance_info(self) -> InstanceInfo:
return self.backend_engine.engine.instance_info
Expand Down
3 changes: 2 additions & 1 deletion llumnix/llumlet/request.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@ def __init__(self, request_id: str, server_info: ServerInfo) -> None:
self.last_preemption_time = None
self.stage_timestamps = []
self.stage_num_blocks_list = []

self.waiting_migrating = False
# end-of-migration
self.eom = False

def reset_migration_args(self):
self.last_preemption_time = None
Expand Down
4 changes: 2 additions & 2 deletions tests/llumlet/test_local_migration_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def finished(self) -> bool:

@property
def arrival_time(self) -> float:
self._arrival_time
return self._arrival_time

@property
def status(self) -> RequestStatus:
Expand Down Expand Up @@ -89,7 +89,7 @@ def test_scheduler_policy():
engine.add_request_waiting(request_id="5", length=2, arrival_time=1.0)
scheduler.request_migration_policy = "FCW"
assert scheduler.get_migrate_out_requests()[0].request_id == "5"

scheduler.request_migration_policy = "FCWSR"
assert scheduler.get_migrate_out_requests()[0].request_id == "5"
assert scheduler.get_migrate_out_requests()[1].request_id == "0"
Expand Down

0 comments on commit db00eb8

Please sign in to comment.