Skip to content

Commit

Permalink
Merge pull request #387 from HSF/dev
Browse files Browse the repository at this point in the history
fix to make sure built request can be handled
  • Loading branch information
wguanicedew authored Jan 31, 2025
2 parents c070114 + f1578d6 commit ddfbebf
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 20 deletions.
41 changes: 27 additions & 14 deletions main/lib/idds/agents/clerk/clerk.py
Original file line number Diff line number Diff line change
Expand Up @@ -1004,7 +1004,7 @@ def handle_build_request(self, req):
self.logger.warn(log_pre + "Handle build request error result: %s" % str(ret_req))
return ret_req

def update_request(self, req):
def update_request(self, req, origin_req=None):
new_tf_ids, update_tf_ids = [], []
try:
log_pre = self.get_log_prefix(req)
Expand All @@ -1027,13 +1027,19 @@ def update_request(self, req):
else:
new_conditions = []

if origin_req:
origin_status = origin_req['status']
else:
origin_status = None

retry = True
retry_num = 0
while retry:
retry = False
retry_num += 1
try:
_, new_tf_ids, update_tf_ids = core_requests.update_request_with_transforms(req['request_id'], req['parameters'],
origin_status=origin_status,
new_transforms=new_transforms,
update_transforms=update_transforms,
new_conditions=new_conditions)
Expand Down Expand Up @@ -1067,8 +1073,14 @@ def update_request(self, req):
req_parameters['update_retries'] = req['parameters']['update_retries']
if 'errors' in req['parameters']:
req_parameters['errors'] = req['parameters']['errors']

if origin_req:
origin_status = origin_req['status']
else:
origin_status = None

self.logger.warn(log_pre + "Update request in exception: %s" % str(req_parameters))
core_requests.update_request_with_transforms(req['request_id'], req_parameters)
core_requests.update_request_with_transforms(req['request_id'], req_parameters, origin_status=origin_status)
except Exception as ex:
self.logger.error(ex)
self.logger.error(traceback.format_exc())
Expand All @@ -1093,7 +1105,7 @@ def process_new_request(self, event):
ret = self.handle_new_generic_request(req)
else:
ret = self.handle_new_request(req)
new_tf_ids, update_tf_ids = self.update_request(ret)
new_tf_ids, update_tf_ids = self.update_request(ret, origin_req=req)
for tf_id in new_tf_ids:
self.logger.info(log_pre + "NewTransformEvent(transform_id: %s)" % str(tf_id))
event = NewTransformEvent(publisher_id=self.id, transform_id=tf_id)
Expand Down Expand Up @@ -1437,14 +1449,15 @@ def process_update_request(self, event):
req = self.get_request(request_id=event._request_id, locking=True)
if not req:
self.logger.error("Cannot find request for event: %s" % str(event))
pro_ret = ReturnCode.Locked.value
# pro_ret = ReturnCode.Locked.value
pro_ret = ReturnCode.Ok.value
else:
log_pre = self.get_log_prefix(req)
if req['request_type'] in [RequestType.iWorkflow, RequestType.iWorkflowLocal]:
ret = self.handle_update_irequest(req, event=event)
else:
ret = self.handle_update_request(req, event=event)
new_tf_ids, update_tf_ids = self.update_request(ret)
new_tf_ids, update_tf_ids = self.update_request(ret, origin_req=req)
for tf_id in new_tf_ids:
self.logger.info(log_pre + "NewTransformEvent(transform_id: %s)" % tf_id)
event = NewTransformEvent(publisher_id=self.id, transform_id=tf_id, content=event._content)
Expand Down Expand Up @@ -1542,18 +1555,18 @@ def process_abort_request(self, event):
if req['errors'] and 'msg' in req['errors']:
ret['parameters']['errors']['msg'] = req['errors']['msg']
self.logger.info(log_pre + "process_abort_request result: %s" % str(ret))
self.update_request(ret)
self.update_request(ret, origin_req=req)
self.handle_command(event, cmd_status=CommandStatus.Failed, errors="Request is already terminated. Cannot be aborted")
elif req['request_type'] in [RequestType.iWorkflow, RequestType.iWorkflowLocal]:
ret = self.handle_close_irequest(req, event=event)
self.update_request(ret)
self.update_request(ret, origin_req=req)

# self.handle_command(event, cmd_status=CommandStatus.Failed, errors="Not support abortion for iWorkflow")
self.handle_command(event, cmd_status=CommandStatus.Processed, errors=None)
else:
ret = self.handle_abort_request(req, event)
self.logger.info(log_pre + "process_abort_request result: %s" % str(ret))
self.update_request(ret)
self.update_request(ret, origin_req=req)
to_abort_transform_id = None
if event and event._content and event._content['cmd_content'] and 'transform_id' in event._content['cmd_content']:
to_abort_transform_id = event._content['cmd_content']['transform_id']
Expand Down Expand Up @@ -1689,17 +1702,17 @@ def process_close_request(self, event):
if req['errors'] and 'msg' in req['errors']:
ret['parameters']['errors']['msg'] = req['errors']['msg']
self.logger.info(log_pre + "process_abort_request result: %s" % str(ret))
self.update_request(ret)
self.update_request(ret, origin_req=req)
self.handle_command(event, cmd_status=CommandStatus.Failed, errors="Request is already terminated. Cannot be closed")
else:
if req['request_type'] in [RequestType.iWorkflow, RequestType.iWorkflowLocal]:
ret = self.handle_close_irequest(req, event=event)
self.update_request(ret)
self.update_request(ret, origin_req=req)
else:
pass
ret = self.handle_abort_request(req, event)
self.logger.info(log_pre + "process_abort_request result: %s" % str(ret))
self.update_request(ret)
self.update_request(ret, origin_req=req)
to_abort_transform_id = None
if event and event._content and event._content['cmd_content'] and 'transform_id' in event._content['cmd_content']:
to_abort_transform_id = event._content['cmd_content']['transform_id']
Expand Down Expand Up @@ -1840,18 +1853,18 @@ def process_resume_request(self, event):
ret['parameters']['errors']['msg'] = req['errors']['msg']
self.logger.info(log_pre + "process_resume_request result: %s" % str(ret))

self.update_request(ret)
self.update_request(ret, origin_req=req)
self.handle_command(event, cmd_status=CommandStatus.Failed, errors="Request is already finished. Cannot be resumed")
elif req['request_type'] in [RequestType.iWorkflow, RequestType.iWorkflowLocal]:
ret = self.handle_resume_irequest(req)
self.update_request(ret)
self.update_request(ret, origin_req=req)
# self.handle_command(event, cmd_status=CommandStatus.Failed, errors="Not support to reusme for iWorkflow")
self.handle_command(event, cmd_status=CommandStatus.Processed, errors=None)
else:
ret = self.handle_resume_request(req)
self.logger.info(log_pre + "process_resume_request result: %s" % str(ret))

self.update_request(ret)
self.update_request(ret, origin_req=req)
wf = req['request_metadata']['workflow']
works = wf.get_all_works()
if works:
Expand Down
3 changes: 2 additions & 1 deletion main/lib/idds/core/requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,7 @@ def generate_collections(transform):

@transactional_session
def update_request_with_transforms(request_id, parameters,
origin_status=None,
new_transforms=None, update_transforms=None,
new_messages=None, update_messages=None,
new_conditions=None, session=None):
Expand Down Expand Up @@ -369,7 +370,7 @@ def update_request_with_transforms(request_id, parameters,
if new_conditions:
orm_conditions.add_conditions(new_conditions, session=session)

return orm_requests.update_request(request_id, parameters, session=session), new_tf_ids, update_tf_ids
return orm_requests.update_request(request_id, parameters, origin_status=origin_status, session=session), new_tf_ids, update_tf_ids


@transactional_session
Expand Down
22 changes: 17 additions & 5 deletions main/lib/idds/orm/requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -1131,7 +1131,7 @@ def get_requests_by_status_type(status, request_type=None, time_period=None, req


@transactional_session
def update_request(request_id, parameters, update_request_metadata=False, locking=False, session=None):
def update_request(request_id, parameters, update_request_metadata=False, locking=False, origin_status=None, session=None):
"""
update an request.
Expand Down Expand Up @@ -1180,17 +1180,29 @@ def update_request(request_id, parameters, update_request_metadata=False, lockin
parameters['_processing_metadata'] = parameters['processing_metadata']
del parameters['processing_metadata']

build_status = [RequestStatus.Built, RequestStatus.Built]
query = session.query(models.Request).filter_by(request_id=request_id)

if locking:
query = query.filter(models.Request.locking == RequestLocking.Idle)
query = query.with_for_update(skip_locked=True)
num_rows = query.update(parameters, synchronize_session=False)
return num_rows
else:
query = query.filter(not_(models.Request.status.in_(build_status)))
build_status = [RequestStatus.Built, RequestStatus.Built]
if origin_status and origin_status not in build_status:
query_no_built = query.filter(not_(models.Request.status.in_(build_status)))

num_rows = query.update(parameters, synchronize_session=False)
return num_rows
num_rows = query_no_built.update(parameters, synchronize_session=False)
if num_rows > 0:
return num_rows
else:
if 'status' in parameters:
parameters['status'] = origin_status
num_rows = query.update(parameters, synchronize_session=False)
return num_rows
else:
num_rows = query.update(parameters, synchronize_session=False)
return num_rows
except sqlalchemy.orm.exc.NoResultFound as error:
raise exceptions.NoObject('Request %s cannot be found: %s' % (request_id, error))
return 0
Expand Down

0 comments on commit ddfbebf

Please sign in to comment.