Skip to content

Commit

Permalink
Revert "Job scheduler implementation (#1308)"
Browse files Browse the repository at this point in the history
This reverts commit 1fbb74f.
  • Loading branch information
xzdandy authored Nov 14, 2023
1 parent 1fbb74f commit fe44197
Show file tree
Hide file tree
Showing 25 changed files with 10 additions and 1,452 deletions.
140 changes: 0 additions & 140 deletions evadb/catalog/catalog_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
import shutil
from pathlib import Path
from typing import Any, List
Expand Down Expand Up @@ -40,8 +39,6 @@
FunctionIOCatalogEntry,
FunctionMetadataCatalogEntry,
IndexCatalogEntry,
JobCatalogEntry,
JobHistoryCatalogEntry,
TableCatalogEntry,
drop_all_tables_except_catalog,
init_db,
Expand All @@ -64,8 +61,6 @@
FunctionMetadataCatalogService,
)
from evadb.catalog.services.index_catalog_service import IndexCatalogService
from evadb.catalog.services.job_catalog_service import JobCatalogService
from evadb.catalog.services.job_history_catalog_service import JobHistoryCatalogService
from evadb.catalog.services.table_catalog_service import TableCatalogService
from evadb.catalog.sql_config import IDENTIFIER_COLUMN, SQLConfig
from evadb.expression.function_expression import FunctionExpression
Expand All @@ -90,10 +85,6 @@ def __init__(self, db_uri: str):
self._config_catalog_service = ConfigurationCatalogService(
self._sql_config.session
)
self._job_catalog_service = JobCatalogService(self._sql_config.session)
self._job_history_catalog_service = JobHistoryCatalogService(
self._sql_config.session
)
self._table_catalog_service = TableCatalogService(self._sql_config.session)
self._column_service = ColumnCatalogService(self._sql_config.session)
self._function_service = FunctionCatalogService(self._sql_config.session)
Expand Down Expand Up @@ -224,137 +215,6 @@ def check_native_table_exists(self, table_name: str, database_name: str):

return True

"Job catalog services"

def insert_job_catalog_entry(
self,
name: str,
queries: str,
start_time: datetime,
end_time: datetime,
repeat_interval: int,
active: bool,
next_schedule_run: datetime,
) -> JobCatalogEntry:
"""A new entry is persisted in the job catalog.
Args:
name: job name
queries: job's queries
start_time: job start time
end_time: job end time
repeat_interval: job repeat interval
active: job status
next_schedule_run: next run time as per schedule
"""
job_entry = self._job_catalog_service.insert_entry(
name,
queries,
start_time,
end_time,
repeat_interval,
active,
next_schedule_run,
)

return job_entry

def get_job_catalog_entry(self, job_name: str) -> JobCatalogEntry:
"""
Returns the job catalog entry for the given database_name
Arguments:
job_name (str): name of the job
Returns:
JobCatalogEntry
"""

table_entry = self._job_catalog_service.get_entry_by_name(job_name)

return table_entry

def drop_job_catalog_entry(self, job_entry: JobCatalogEntry) -> bool:
"""
This method deletes the job from catalog.
Arguments:
job_entry: job catalog entry to remove
Returns:
True if successfully deleted else False
"""
return self._job_catalog_service.delete_entry(job_entry)

def get_next_executable_job(self, only_past_jobs: bool = False) -> JobCatalogEntry:
"""Get the oldest job that is ready to be triggered by trigger time
Arguments:
only_past_jobs: boolean flag to denote if only jobs with trigger time in
past should be considered
Returns:
Returns the first job to be triggered
"""
return self._job_catalog_service.get_next_executable_job(only_past_jobs)

def update_job_catalog_entry(
self, job_name: str, next_scheduled_run: datetime, active: bool
):
"""Update the next_scheduled_run and active column as per the provided values
Arguments:
job_name (str): job which should be updated
next_run_time (datetime): the next trigger time for the job
active (bool): the active status for the job
"""
self._job_catalog_service.update_next_scheduled_run(
job_name, next_scheduled_run, active
)

"Job history catalog services"

def insert_job_history_catalog_entry(
self,
job_id: str,
job_name: str,
execution_start_time: datetime,
execution_end_time: datetime,
) -> JobCatalogEntry:
"""A new entry is persisted in the job history catalog.
Args:
job_id: job id for the execution entry
job_name: job name for the execution entry
execution_start_time: job execution start time
execution_end_time: job execution end time
"""
job_history_entry = self._job_history_catalog_service.insert_entry(
job_id, job_name, execution_start_time, execution_end_time
)

return job_history_entry

def get_job_history_by_job_id(self, job_id: int) -> list[JobHistoryCatalogEntry]:
"""Returns all the entries present for this job_id on in the history.
Args:
job_id: the id of job whose history should be fetched
"""
return self._job_history_catalog_service.get_entry_by_job_id(job_id)

def update_job_history_end_time(
self, job_id: int, execution_start_time: datetime, execution_end_time: datetime
) -> list[JobHistoryCatalogEntry]:
"""Updates the execution_end_time for this job history matching job_id and execution_start_time.
Args:
job_id: id of the job whose history entry which should be updated
execution_start_time: the start time for the job history entry
execution_end_time: the end time for the job history entry
"""
return self._job_history_catalog_service.update_entry_end_time(
job_id, execution_start_time, execution_end_time
)

"Table catalog services"

def insert_table_catalog_entry(
Expand Down
92 changes: 0 additions & 92 deletions evadb/catalog/models/job_catalog.py

This file was deleted.

73 changes: 0 additions & 73 deletions evadb/catalog/models/job_history_catalog.py

This file was deleted.

Loading

0 comments on commit fe44197

Please sign in to comment.