Commit

apply ruff format
hirosassa committed Feb 2, 2025
1 parent 47b9102 commit ada9ed8
Showing 4 changed files with 18 additions and 18 deletions.
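
The changes below match the pattern newer versions of ruff's formatter apply to long assert statements: the condition stays on the assert line and only the message is wrapped in parentheses. A minimal runnable sketch of that post-format shape, with an illustrative function and message that are not taken from gokart:

    def dump_length(values: list[str], limit: int = 10) -> None:
        # Post-format shape: condition stays on the `assert` line; the long message
        # is parenthesized on its own line when the statement exceeds the line length.
        assert len(values) <= limit, (
            f'requires at most {limit} values, but {len(values)} values were passed.'
        )

    dump_length(['a', 'b'])  # passes; dump_length(['x'] * 11) would raise AssertionError
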
10 changes: 6 additions & 4 deletions gokart/file_processor.py
@@ -166,9 +166,9 @@ def load(self, file):
             return pd.DataFrame()
 
     def dump(self, obj, file):
-        assert (
-            isinstance(obj, pd.DataFrame) or isinstance(obj, pd.Series) or isinstance(obj, dict)
-        ), f'requires pd.DataFrame or pd.Series or dict, but {type(obj)} is passed.'
+        assert isinstance(obj, pd.DataFrame) or isinstance(obj, pd.Series) or isinstance(obj, dict), (
+            f'requires pd.DataFrame or pd.Series or dict, but {type(obj)} is passed.'
+        )
         if isinstance(obj, dict):
             obj = pd.DataFrame.from_dict(obj)
         obj.to_json(file)
@@ -263,8 +263,10 @@ def dump(self, obj, file):
 
         if self._store_index_in_feather:
             index_column_name = f'{self.INDEX_COLUMN_PREFIX}{dump_obj.index.name}'
-            assert index_column_name not in dump_obj.columns, f'column name {index_column_name} already exists in dump_obj. \
+            assert index_column_name not in dump_obj.columns, (
+                f'column name {index_column_name} already exists in dump_obj. \
                 Consider not saving index by setting store_index_in_feather=False.'
+            )
             assert dump_obj.index.name != 'None', 'index name is "None", which is not allowed in gokart. Consider setting another index name.'
 
             dump_obj[index_column_name] = dump_obj.index

2 changes: 1 addition & 1 deletion gokart/slack/event_aggregator.py
@@ -28,7 +28,7 @@ def get_summary(self) -> str:
     def get_event_list(self) -> str:
         message = ''
         if len(self._failure_events) != 0:
-            failure_message = os.linesep.join([f"Task: {failure['task']}; Exception: {failure['exception']}" for failure in self._failure_events])
+            failure_message = os.linesep.join([f'Task: {failure["task"]}; Exception: {failure["exception"]}' for failure in self._failure_events])
             message += '---- Failure Tasks ----' + os.linesep + failure_message
         if len(self._success_events) != 0:
             success_message = os.linesep.join(self._success_events)

4 changes: 2 additions & 2 deletions gokart/task.py
@@ -64,7 +64,7 @@ class TaskOnKart(luigi.Task, Generic[T]):
     )
     serialized_task_definition_check: bool = luigi.BoolParameter(
         default=False,
-        description='If this is true, even if all outputs are present,' 'this task will be executed if any changes have been made to the code.',
+        description='If this is true, even if all outputs are present,this task will be executed if any changes have been made to the code.',
         significant=False,
     )
     delete_unnecessary_output_files: bool = luigi.BoolParameter(
@@ -581,5 +581,5 @@ def _make_representation(self, param_obj: luigi.Parameter, param_value):
         if isinstance(param_obj, TaskInstanceParameter):
             return f'{param_value.get_task_family()}({param_value.make_unique_id()})'
         if isinstance(param_obj, ListTaskInstanceParameter):
-            return f"[{', '.join(f'{v.get_task_family()}({v.make_unique_id()})' for v in param_value)}]"
+            return f'[{", ".join(f"{v.get_task_family()}({v.make_unique_id()})" for v in param_value)}]'
         return param_obj.serialize(param_value)
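
A note on the merged description strings above and in gokart/worker.py below: run-together words such as 'present,this' and 'workeron failure' are not introduced by the formatter. The adjacent literals in the original source already lacked a separating space, and Python's implicit string concatenation joins literals with no separator, so collapsing them onto one line only makes the existing missing space visible. A small illustrative snippet, not taken from the commit:

    # Adjacent string literals are concatenated with no separator at compile time,
    # so the runtime value is identical before and after the reformat.
    split_form = 'even if all outputs are present,' 'this task will be executed.'
    joined_form = 'even if all outputs are present,this task will be executed.'
    assert split_form == joined_form
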
20 changes: 9 additions & 11 deletions gokart/worker.py
@@ -218,7 +218,7 @@ def run(self) -> None:
                     status = DONE
                 else:
                     status = FAILED
-                    expl = 'Task is an external data dependency ' 'and data does not exist (yet?).'
+                    expl = 'Task is an external data dependency and data does not exist (yet?).'
             else:
                 with self._forward_attributes():
                     new_deps = self._run_get_new_deps()
@@ -332,10 +332,10 @@ class gokart_worker(luigi.Config):
     count_uniques = luigi.BoolParameter(
         default=False,
         config_path=dict(section='core', name='worker-count-uniques'),
-        description='worker-count-uniques means that we will keep a ' 'worker alive only if it has a unique pending task, as ' 'well as having keep-alive true',
+        description='worker-count-uniques means that we will keep a worker alive only if it has a unique pending task, as well as having keep-alive true',
     )
     count_last_scheduled = luigi.BoolParameter(
-        default=False, description='Keep a worker alive only if there are ' 'pending tasks which it was the last to ' 'schedule.'
+        default=False, description='Keep a worker alive only if there are pending tasks which it was the last to schedule.'
     )
     wait_interval = luigi.FloatParameter(default=1.0, config_path=dict(section='core', name='worker-wait-interval'))
     wait_jitter = luigi.FloatParameter(default=5.0)
@@ -348,18 +348,18 @@ class gokart_worker(luigi.Config):
     retry_external_tasks = luigi.BoolParameter(
         default=False,
         config_path=dict(section='core', name='retry-external-tasks'),
-        description='If true, incomplete external tasks will be ' 'retested for completion while Luigi is running.',
+        description='If true, incomplete external tasks will be retested for completion while Luigi is running.',
     )
-    send_failure_email = luigi.BoolParameter(default=True, description='If true, send e-mails directly from the worker' 'on failure')
-    no_install_shutdown_handler = luigi.BoolParameter(default=False, description='If true, the SIGUSR1 shutdown handler will' 'NOT be install on the worker')
-    check_unfulfilled_deps = luigi.BoolParameter(default=True, description='If true, check for completeness of ' 'dependencies before running a task')
+    send_failure_email = luigi.BoolParameter(default=True, description='If true, send e-mails directly from the workeron failure')
+    no_install_shutdown_handler = luigi.BoolParameter(default=False, description='If true, the SIGUSR1 shutdown handler willNOT be install on the worker')
+    check_unfulfilled_deps = luigi.BoolParameter(default=True, description='If true, check for completeness of dependencies before running a task')
     check_complete_on_run = luigi.BoolParameter(
         default=False,
         description='If true, only mark tasks as done after running if they are complete. '
         'Regardless of this setting, the worker will always check if external '
         'tasks are complete before marking them as done.',
     )
-    force_multiprocessing = luigi.BoolParameter(default=False, description='If true, use multiprocessing also when ' 'running with 1 worker')
+    force_multiprocessing = luigi.BoolParameter(default=False, description='If true, use multiprocessing also when running with 1 worker')
     task_process_context = luigi.OptionalParameter(
         default=None,
         description='If set to a fully qualified class name, the class will '
@@ -723,9 +723,7 @@ def _add(self, task: Task, is_complete: bool) -> Generator[Task, None, None]:
                 status = PENDING
                 runnable = self._config.retry_external_tasks
                 task.trigger_event(Event.DEPENDENCY_MISSING, task)
-                logger.warning(
-                    'Data for %s does not exist (yet?). The task is an ' 'external data dependency, so it cannot be run from' ' this luigi process.', task
-                )
+                logger.warning('Data for %s does not exist (yet?). The task is an external data dependency, so it cannot be run from this luigi process.', task)
 
             else:
                 try: