Skip to content

Commit

Permalink
added job history for job scheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
dungnmaster committed Nov 3, 2023
1 parent 04fab31 commit cbb87e7
Show file tree
Hide file tree
Showing 13 changed files with 417 additions and 32 deletions.
52 changes: 51 additions & 1 deletion evadb/catalog/catalog_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
FunctionMetadataCatalogEntry,
IndexCatalogEntry,
JobCatalogEntry,
JobHistoryCatalogEntry,
TableCatalogEntry,
drop_all_tables_except_catalog,
init_db,
Expand All @@ -64,6 +65,7 @@
)
from evadb.catalog.services.index_catalog_service import IndexCatalogService
from evadb.catalog.services.job_catalog_service import JobCatalogService
from evadb.catalog.services.job_history_catalog_service import JobHistoryCatalogService
from evadb.catalog.services.table_catalog_service import TableCatalogService
from evadb.catalog.sql_config import IDENTIFIER_COLUMN, SQLConfig
from evadb.expression.function_expression import FunctionExpression
Expand All @@ -89,6 +91,9 @@ def __init__(self, db_uri: str):
self._sql_config.session
)
self._job_catalog_service = JobCatalogService(self._sql_config.session)
self._job_history_catalog_service = JobHistoryCatalogService(
self._sql_config.session
)
self._table_catalog_service = TableCatalogService(self._sql_config.session)
self._column_service = ColumnCatalogService(self._sql_config.session)
self._function_service = FunctionCatalogService(self._sql_config.session)
Expand Down Expand Up @@ -231,7 +236,7 @@ def insert_job_catalog_entry(
active: bool,
next_schedule_run: datetime,
) -> JobCatalogEntry:
"""A new entry is persisted in the job catalog."
"""A new entry is persisted in the job catalog.
Args:
name: job name
Expand Down Expand Up @@ -305,6 +310,51 @@ def update_job_catalog_entry(
job_name, next_scheduled_run, active
)

"Job history catalog services"

def insert_job_history_catalog_entry(
self,
job_id: str,
job_name: str,
execution_start_time: datetime,
execution_end_time: datetime,
) -> JobCatalogEntry:
"""A new entry is persisted in the job history catalog.
Args:
job_id: job id for the execution entry
job_name: job name for the execution entry
execution_start_time: job execution start time
execution_end_time: job execution end time
"""
job_history_entry = self._job_history_catalog_service.insert_entry(
job_id, job_name, execution_start_time, execution_end_time
)

return job_history_entry

def get_job_history_by_job_id(self, job_id: int) -> list[JobHistoryCatalogEntry]:
"""Returns all the entries present for this job_id on in the history.
Args:
job_id: the id of job whose history should be fetched
"""
return self._job_history_catalog_service.get_entry_by_job_id(job_id)

def update_job_history_end_time(
self, job_id: int, execution_start_time: datetime, execution_end_time: datetime
) -> list[JobHistoryCatalogEntry]:
"""Updates the execution_end_time for this job history matching job_id and execution_start_time.
Args:
job_id: id of the job whose history entry which should be updated
execution_start_time: the start time for the job history entry
execution_end_time: the end time for the job history entry
"""
return self._job_history_catalog_service.update_entry_end_time(
job_id, execution_start_time, execution_end_time
)

"Table catalog services"

def insert_table_catalog_entry(
Expand Down
2 changes: 2 additions & 0 deletions evadb/catalog/models/job_catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import json

from sqlalchemy import Boolean, Column, DateTime, Index, Integer, String
from sqlalchemy.orm import relationship

from evadb.catalog.models.base_model import BaseModel
from evadb.catalog.models.utils import JobCatalogEntry
Expand Down Expand Up @@ -56,6 +57,7 @@ class JobCatalog(BaseModel):
)

_next_run_index = Index("_next_run_index", _next_scheduled_run)
_job_history_catalog = relationship("JobHistoryCatalog", cascade="all, delete")

def __init__(
self,
Expand Down
59 changes: 59 additions & 0 deletions evadb/catalog/models/job_history_catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,62 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime

from sqlalchemy import Column, DateTime, ForeignKey, Integer, String, UniqueConstraint

from evadb.catalog.models.base_model import BaseModel
from evadb.catalog.models.utils import JobHistoryCatalogEntry


class JobHistoryCatalog(BaseModel):
"""The `JobHistoryCatalog` stores the execution history of jobs .
`_row_id:` an autogenerated unique identifier.
`_job_id:` job id.
`_job_name:` job name.
`_execution_start_time:` start time of this run
`_execution_end_time:` end time for this run
`_created_at:` entry creation time
`_updated_at:` entry last update time
"""

__tablename__ = "job_history_catalog"

_job_id = Column(
"job_id", Integer, ForeignKey("job_catalog._row_id", ondelete="CASCADE")
)
_job_name = Column("job_name", String(100))
_execution_start_time = Column("execution_start_time", DateTime)
_execution_end_time = Column("execution_end_time", DateTime)
_created_at = Column("created_at", DateTime, default=datetime.datetime.now)
_updated_at = Column(
"updated_at",
DateTime,
default=datetime.datetime.now,
onupdate=datetime.datetime.now,
)

__table_args__ = (UniqueConstraint("job_id", "execution_start_time"), {})

def __init__(
self,
job_id: int,
job_name: str,
execution_start_time: datetime,
execution_end_time: datetime,
):
self._job_id = job_id
self._job_name = job_name
self._execution_start_time = execution_start_time
self._execution_end_time = execution_end_time

def as_dataclass(self) -> "JobHistoryCatalogEntry":
return JobHistoryCatalogEntry(
row_id=self._row_id,
job_id=self._job_id,
job_name=self._job_name,
execution_start_time=self._execution_start_time,
execution_end_time=self._execution_end_time,
created_at=self._created_at,
updated_at=self._updated_at,
)
24 changes: 24 additions & 0 deletions evadb/catalog/models/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -305,3 +305,27 @@ def display_format(self):
"created_at": self.created_at,
"updated_at": self.updated_at,
}


@dataclass(unsafe_hash=True)
class JobHistoryCatalogEntry:
"""Dataclass representing an entry in the `JobHistoryCatalog`."""

job_id: int
job_name: str
execution_start_time: datetime
execution_end_time: datetime
created_at: datetime
updated_at: datetime
row_id: int = None

def display_format(self):
return {
"row_id": self.row_id,
"job_id": self.job_name,
"job_name": self.job_name,
"execution_start_time": self.execution_start_time,
"execution_end_time": self.execution_end_time,
"created_at": self.created_at,
"updated_at": self.updated_at,
}
6 changes: 3 additions & 3 deletions evadb/catalog/services/job_catalog_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,9 @@ def get_all_overdue_jobs(self) -> list:
self.model._active == true(),
)
)
).all()
entry = [row.as_dataclass() for row in entries]
return entry
).scalars().all()
entries = [row.as_dataclass() for row in entries]
return entries

def get_next_executable_job(self, only_past_jobs: bool) -> JobCatalogEntry:
"""Get the oldest job that is ready to be triggered by trigger time
Expand Down
92 changes: 92 additions & 0 deletions evadb/catalog/services/job_history_catalog_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
# coding=utf-8
# Copyright 2018-2023 EvaDB
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import datetime
import json

from sqlalchemy import and_, true
from sqlalchemy.orm import Session
from sqlalchemy.sql.expression import select

from evadb.catalog.models.job_history_catalog import JobHistoryCatalog
from evadb.catalog.models.utils import JobHistoryCatalogEntry
from evadb.catalog.services.base_service import BaseService
from evadb.utils.errors import CatalogError
from evadb.utils.logging_manager import logger


class JobHistoryCatalogService(BaseService):
def __init__(self, db_session: Session):
super().__init__(JobHistoryCatalog, db_session)

def insert_entry(
self,
job_id: str,
job_name: str,
execution_start_time: datetime,
execution_end_time: datetime,
) -> JobHistoryCatalogEntry:
try:
job_history_catalog_obj = self.model(
job_id=job_id,
job_name=job_name,
execution_start_time=execution_start_time,
execution_end_time=execution_end_time,
)
job_history_catalog_obj = job_history_catalog_obj.save(self.session)

except Exception as e:
logger.exception(
f"Failed to insert entry into job history catalog with exception {str(e)}"
)
raise CatalogError(e)

return job_history_catalog_obj.as_dataclass()

def get_entry_by_job_id(self, job_id: int) -> list[JobHistoryCatalogEntry]:
"""
Get all the job history catalog entry with given job id.
Arguments:
job_id (int): Job id
Returns:
list[JobHistoryCatalogEntry]: all history catalog entries for given job id
"""
entries = self.session.execute(
select(self.model).filter(self.model._job_id == job_id)
).scalars().all()
entries = [row.as_dataclass() for row in entries]
return entries

def update_entry_end_time(
self, job_id: int, execution_start_time: datetime, execution_end_time: datetime
):
"""Update the execution_end_time of the entry as per the provided values
Arguments:
job_id (int): id of the job whose history entry which should be updated
execution_start_time (datetime): the start time for the job history entry
execution_end_time (datetime): the end time for the job history entry
Returns:
void
"""
job_history_entry = (
self.session.query(self.model).filter(
and_(self.model._job_id == job_id, self.model._execution_start_time == execution_start_time)
).first()
)
if job_history_entry:
job_history_entry._execution_end_time = execution_end_time
self.session.commit()
1 change: 1 addition & 0 deletions evadb/catalog/sql_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
"function_cost_catalog",
"function_metadata_catalog",
"job_catalog",
"job_history_catalog",
]
# Add all keywords that are restricted by EvaDB

Expand Down
4 changes: 2 additions & 2 deletions evadb/executor/create_job_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def _get_repeat_time_interval_seconds(
self, repeat_interval: int, repeat_period: str
) -> int:
unit_to_seconds = {
"second": 1,
"seconds": 1,
"minute": 60,
"minutes": 60,
"min": 60,
Expand All @@ -71,7 +71,7 @@ def _get_repeat_time_interval_seconds(
}
assert (repeat_period is None) or (
repeat_period in unit_to_seconds
), "repeat period should be one of these values: minute | minutes | min | hour | hours | day | days | week | weeks | month | months"
), "repeat period should be one of these values: seconds | minute | minutes | min | hour | hours | day | days | week | weeks | month | months"

repeat_interval = 1 if repeat_interval is None else repeat_interval
return repeat_interval * unit_to_seconds.get(repeat_period, 0)
Expand Down
Loading

0 comments on commit cbb87e7

Please sign in to comment.