supports log view of job execution for frontend
SteNicholas committed Jan 20, 2022
1 parent 6e5a6dd commit 93988ba
Showing 12 changed files with 148 additions and 25 deletions.
10 changes: 6 additions & 4 deletions ai_flow/endpoint/server/server.py
@@ -41,6 +41,7 @@
from ai_flow.model_center.service.service import ModelCenterService
from ai_flow.protobuf.high_availability_pb2_grpc import add_HighAvailabilityManagerServicer_to_server
from ai_flow.scheduler_service.service.service import SchedulerService, SchedulerServiceConfig
+ from ai_flow.settings import AIFLOW_HOME
from ai_flow.store.db.base_model import base
from ai_flow.store.db.db_util import extract_db_engine_from_uri, create_db_store
from ai_flow.store.mongo_store import MongoStoreConnManager
@@ -76,7 +77,8 @@ def __init__(self,
ha_manager=None,
ha_server_uri=None,
ha_storage=None,
- ttl_ms: int = 10000):
+ ttl_ms: int = 10000,
+ base_log_folder: str = AIFLOW_HOME):
self.store_uri = store_uri
self.db_type = DBType.value_of(extract_db_engine_from_uri(store_uri))
self.executor = Executor(futures.ThreadPoolExecutor(max_workers=10))
@@ -99,7 +101,7 @@ def __init__(self,
metric_service_pb2_grpc.add_MetricServiceServicer_to_server(MetricService(db_uri=store_uri), self.server)

if start_scheduler_service:
- self._add_scheduler_service(scheduler_service_config, store_uri, notification_server_uri)
+ self._add_scheduler_service(scheduler_service_config, store_uri, notification_server_uri, base_log_folder)

if enabled_ha:
self._add_ha_service(ha_manager, ha_server_uri, ha_storage, store_uri, ttl_ms)
@@ -108,10 +110,10 @@

self._stop = threading.Event()

- def _add_scheduler_service(self, scheduler_service_config, db_uri, notification_server_uri):
+ def _add_scheduler_service(self, scheduler_service_config, db_uri, notification_server_uri, base_log_folder):
logging.info("start scheduler service.")
real_config = SchedulerServiceConfig(scheduler_service_config)
- self.scheduler_service = SchedulerService(real_config, db_uri, notification_server_uri)
+ self.scheduler_service = SchedulerService(real_config, db_uri, notification_server_uri, base_log_folder)
scheduling_service_pb2_grpc.add_SchedulingServiceServicer_to_server(self.scheduler_service,
self.server)

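How the new parameter reaches the server at construction time — a minimal sketch; only `store_uri`, `notification_server_uri`, and `base_log_folder` are taken from this diff, and the remaining constructor arguments and both URI values are illustrative assumptions:

```python
# Hedged sketch: constructor arguments beyond those visible in this diff are
# omitted, and both URIs are placeholders.
from ai_flow.endpoint.server.server import AIFlowServer
from ai_flow.settings import AIFLOW_HOME

server = AIFlowServer(store_uri='sqlite:///aiflow.db',
                      notification_server_uri='localhost:50052',
                      base_log_folder=AIFLOW_HOME)  # new parameter; defaults to AIFLOW_HOME
```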
4 changes: 3 additions & 1 deletion ai_flow/endpoint/server/server_config.py
@@ -16,7 +16,6 @@
# specific language governing permissions and limitations
# under the License.
#
- import os
from ai_flow.common.configuration import AIFlowConfiguration
from enum import Enum
from typing import Text
@@ -135,3 +134,6 @@ def get_wait_for_server_started_timeout(self):
return self.get('wait_for_server_started_timeout')
else:
return 5.0

+     def get_base_log_folder(self):
+         return self.get('base_log_folder')
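`get_base_log_folder` returns `None` when the key is missing from the server config, so callers fall back to `AIFLOW_HOME`, as `server_runner.py` (next file) does. A minimal sketch of that resolution; the config file path is a placeholder:

```python
# Mirrors the fallback performed in server_runner.py; the path is illustrative.
from ai_flow.endpoint.server.server_config import AIFlowServerConfig
from ai_flow.settings import AIFLOW_HOME

config = AIFlowServerConfig()
config.load_from_file('/opt/aiflow/aiflow_server.yaml')  # placeholder path
base_log_folder = config.get_base_log_folder() or AIFLOW_HOME
```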
6 changes: 5 additions & 1 deletion ai_flow/endpoint/server/server_runner.py
@@ -23,6 +23,7 @@
from ai_flow.endpoint.server.server import AIFlowServer
from ai_flow.endpoint.server.server_config import AIFlowServerConfig
from ai_flow.client.ai_flow_client import get_ai_flow_client
+ from ai_flow.settings import AIFLOW_HOME
from ai_flow.util.net_utils import get_ip_addr
import logging

@@ -78,7 +79,10 @@ def start(self,
scheduler_service_config=self.server_config.get_scheduler_service_config(),
enabled_ha=self.server_config.get_enable_ha(),
ha_server_uri=get_ip_addr() + ":" + str(self.server_config.get_server_port()),
- ttl_ms=self.server_config.get_ha_ttl_ms())
+ ttl_ms=self.server_config.get_ha_ttl_ms(),
+ base_log_folder=self.server_config.get_base_log_folder()
+ if self.server_config.get_base_log_folder()
+ else AIFLOW_HOME)
self.server.run(is_block=is_block)
if not is_block:
self._wait_for_server_available(timeout=self.server_config.get_wait_for_server_started_timeout())
18 changes: 18 additions & 0 deletions ai_flow/frontend/src/views/metadata/JobExecution.vue
@@ -45,6 +45,11 @@ limitations under the License. -->
<span slot="_execution_label" slot-scope="text">
<ellipsis :length="32" tooltip>{{ text }}</ellipsis>
</span>
<span slot="action" slot-scope="text, record">
<template>
<a @click="handleLog(record)">Log</a>
</template>
</span>
</s-table>
</a-card>
<a-card :bordered="false">
@@ -114,6 +119,12 @@ const columns = [
title: 'End Date',
dataIndex: '_end_date',
customRender: (t) => formateDate(new Date(parseInt(t)), 'YYYY-MM-dd hh:mm')
},
+ {
+   title: 'Action',
+   dataIndex: 'action',
+   width: '150px',
+   scopedSlots: { customRender: 'action' }
+ }
]

@@ -145,6 +156,13 @@ export default {
this.getAIFlowVersion()
},
methods: {
+ handleLog (record) {
+   if (record._job_type === 'bash' || record._job_type === 'python' || record._job_type === 'flink') {
+     window.open(`/job-execution/log?workflow_execution_id=${encodeURIComponent(this.queryParam.workflow_execution_id)}&job_name=${encodeURIComponent(record._job_name)}&job_type=${encodeURIComponent(record._job_type)}&job_execution_id=${encodeURIComponent(record._job_execution_id)}`, '_blank')
+   } else {
+     alert(`Viewing logs of ${record._job_type} type of job is not supported.`)
+   }
+ },
resetSearchForm () {
this.queryParam = {
date: moment(new Date())
81 changes: 76 additions & 5 deletions ai_flow/frontend/web_server.py
@@ -17,10 +17,13 @@
# under the License.
#
import getopt
+ import io
import json
import logging
import os
import sys
- from logging.config import dictConfig
+ import time
+ import zipfile

from ai_flow.ai_graph.ai_graph import AIGraph
from ai_flow.ai_graph.ai_node import AINode, ReadDatasetNode, WriteDatasetNode
@@ -29,14 +32,16 @@
from ai_flow.meta.workflow_meta import WorkflowMeta
from ai_flow.plugin_interface.scheduler_interface import Scheduler, SchedulerFactory
from ai_flow.scheduler_service.service.config import SchedulerServiceConfig
+ from ai_flow.settings import AIFLOW_HOME
from ai_flow.store.abstract_store import Filters, AbstractStore, Orders
from ai_flow.store.db.db_util import create_db_store
from ai_flow.util.json_utils import loads, Jsonable, dumps
from ai_flow.version import __version__
from ai_flow.workflow.control_edge import ControlEdge
from django.core.paginator import Paginator
- from flask import Flask, request, render_template
+ from flask import Flask, request, render_template, Response, make_response, redirect
from flask_cors import CORS
+ from logging.config import dictConfig
from typing import List, Dict
from werkzeug.local import LocalProxy

@@ -65,6 +70,7 @@ def config_logging():
store: AbstractStore = None
scheduler: Scheduler = None
airflow: str = None
+ config: AIFlowServerConfig = None
logger = logging.getLogger(__name__)


@@ -319,6 +325,18 @@ def json_pagination_response(page_no: int, total_count: int, data: List):
return json.dumps({'pageNo': page_no, 'totalCount': total_count, 'data': data})


+ def make_zip(source_dir, target_file):
+     if os.path.exists(target_file):
+         os.remove(target_file)
+     zip_file = zipfile.ZipFile(os.path.join(source_dir, target_file), 'w')
+     for parent, dirs, files in os.walk(source_dir):
+         for file in files:
+             if file != target_file:
+                 file_path = os.path.join(parent, file)
+                 zip_file.write(file_path, file_path[len(os.path.dirname(source_dir)):].strip(os.path.sep))
+     zip_file.close()
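A usage sketch for `make_zip`; paths are placeholders. The archive is written inside `source_dir`, and entry names are made relative to the parent of `source_dir`. Note that the initial exists/remove check resolves `target_file` against the current working directory, while the archive itself is created under `source_dir`, so a stale archive there is merely skipped during zipping rather than deleted first.

```python
# Hedged usage sketch; the directory layout is illustrative.
log_dir = '/opt/aiflow/logs/my_project/my_workflow/my_job/1_1'
make_zip(log_dir, 'logs.zip')
# Produces /opt/aiflow/logs/my_project/my_workflow/my_job/1_1/logs.zip holding
# every file under log_dir except the archive itself, with entry names such as
# '1_1/task.log'.
```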


@app.route('/')
def index():
return render_template('index.html')
@@ -404,10 +422,62 @@ def workflow_execution_metadata():
def job_execution_metadata():
    workflow_execution_id = request.args.get('workflow_execution_id')
    job_execution_list = scheduler.list_job_executions(workflow_execution_id) if workflow_execution_id else None
+   page_job_executions = Paginator(job_execution_list, int(request.args.get('pageSize'))).get_page(
+       int(request.args.get('pageNo'))).object_list if job_execution_list else None
+   job_execution_objects = []
+   if page_job_executions:
+       workflow_info = page_job_executions[0].workflow_execution.workflow_info
+       workflow_graph = extract_graph(store.get_workflow_by_name(workflow_info.namespace,
+                                                                 workflow_info.workflow_name))
+       job_types = {}
+       for node in workflow_graph.nodes.values():
+           job_types.update({node.config.job_name: node.config.job_type})
+       for page_job_execution in page_job_executions:
+           job_execution_object = page_job_execution.__dict__
+           job_execution_object.update({'_job_type': job_types.get(page_job_execution.job_name)})
+           job_execution_objects.append(job_execution_object)
    return pagination_response(page_no=int(request.args.get('pageNo')),
-                              total_count=len(job_execution_list) if job_execution_list else 0,
-                              data=Paginator(job_execution_list, int(request.args.get('pageSize'))).get_page(
-                                  int(request.args.get('pageNo'))).object_list if job_execution_list else [])
+                              total_count=len(job_execution_list) if job_execution_list else 0,
+                              data=job_execution_objects)


+ @app.route('/job-execution/log')
+ def job_execution_log():
+     job_type = request.args.get('job_type')
+     job_execution_id = request.args.get('job_execution_id')
+     workflow_execution_id = request.args.get('workflow_execution_id')
+     job_executions = scheduler.get_job_executions(request.args.get('job_name'),
+                                                   workflow_execution_id)
+     log_job_execution = None
+     for job_execution in job_executions:
+         if job_execution.job_execution_id == job_execution_id:
+             log_job_execution = job_execution
+             break
+     if log_job_execution:
+         if job_type == 'bash':
+             return redirect('{}/graph?dag_id={}'.format(airflow, workflow_execution_id.split('|')[0]))
+         else:
+             base_log_folder = config.get_base_log_folder()
+             log_dir = os.path.join(base_log_folder if base_log_folder else AIFLOW_HOME, 'logs',
+                                    log_job_execution.workflow_execution.workflow_info.namespace,
+                                    log_job_execution.workflow_execution.workflow_info.workflow_name,
+                                    log_job_execution.job_name,
+                                    log_job_execution.job_execution_id)
+             if os.path.exists(log_dir):
+                 log_file = 'logs.zip'
+                 make_zip(log_dir, log_file)
+                 file_obj = io.BytesIO()
+                 with zipfile.ZipFile(file_obj, 'w') as zip_file:
+                     zip_info = zipfile.ZipInfo(log_file)
+                     zip_info.date_time = time.localtime(time.time())[:6]
+                     zip_info.compress_type = zipfile.ZIP_DEFLATED
+                     with open(os.path.join(log_dir, log_file), 'rb') as fd:
+                         zip_file.writestr(zip_info, fd.read())
+                 file_obj.seek(0)
+                 return Response(file_obj.getvalue(),
+                                 mimetype='application/zip',
+                                 headers={'Content-Disposition': 'attachment;filename={}'.format(log_file)})
+     return make_response('No log found.')
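A hedged client-side sketch of the contract this endpoint shares with the frontend's `handleLog`: non-bash job types are answered with a zip attachment, while bash jobs redirect to the Airflow graph view. Host, port, and all identifier values below are placeholders, and the `'<dag_id>|...'` shape of `workflow_execution_id` is assumed from the `split('|')[0]` above.

```python
# Hedged sketch; identifiers and the server address are placeholders.
import requests

params = {
    'workflow_execution_id': 'workflow_1|1',
    'job_name': 'train',
    'job_type': 'python',
    'job_execution_id': '1_1',
}
resp = requests.get('http://localhost:8000/job-execution/log', params=params)
if resp.headers.get('Content-Type', '').startswith('application/zip'):
    with open('logs.zip', 'wb') as f:
        f.write(resp.content)  # archive built by the handler above
else:
    print(resp.text)  # 'No log found.', or the Airflow page after the redirect
```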


@app.route('/dataset')
@@ -505,6 +575,7 @@ def main(argv):


def start_web_server_by_config_file_path(config_file_path):
+ global config
config = AIFlowServerConfig()
config.load_from_file(config_file_path)
store_uri = config.get_db_uri()
16 changes: 13 additions & 3 deletions ai_flow/runtime/job_runtime_env.py
@@ -16,8 +16,11 @@
# under the License.
import os
from typing import Text

+ from ai_flow.context.project_context import ProjectContext
from ai_flow.plugin_interface.scheduler_interface import JobExecutionInfo
from ai_flow.project.project_config import ProjectConfig
+ from ai_flow.settings import AIFLOW_HOME
from ai_flow.workflow.workflow_config import WorkflowConfig, load_workflow_config

from ai_flow.util import serialization_utils
@@ -34,11 +37,14 @@ class JobRuntimeEnv(object):

def __init__(self,
working_dir: Text,
- job_execution_info: JobExecutionInfo = None):
+ job_execution_info: JobExecutionInfo = None,
+ project_context: ProjectContext = None,
+ base_log_folder: Text = None):
self._working_dir: Text = working_dir
self._job_execution_info: JobExecutionInfo = job_execution_info
self._workflow_config: WorkflowConfig = None
- self._project_config: ProjectConfig = None
+ self._project_config: ProjectConfig = project_context.project_config if project_context else None
+ self._base_log_folder: Text = base_log_folder if base_log_folder else AIFLOW_HOME

@property
def working_dir(self) -> Text:
@@ -75,7 +81,11 @@ def log_dir(self) -> Text:
"""
return: The directory where job logs are stored.
"""
- return os.path.join(self._working_dir, 'logs')
+ return os.path.join(self._base_log_folder, 'logs',
+                     self.project_config.get_project_name(),
+                     self.job_execution_info.workflow_execution.workflow_info.workflow_name,
+                     self.job_execution_info.job_name,
+                     self.job_execution_info.job_execution_id)

@property
def resource_dir(self) -> Text:
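To make the new layout concrete, a sketch of the path `log_dir` now composes; all names are placeholders. Note that `web_server.py` above rebuilds the same path using `workflow_info.namespace` in place of the project name, so the commit evidently expects the two to match.

```python
# Illustrative only: mirrors the os.path.join in log_dir with placeholder values.
import os

log_dir = os.path.join('/opt/aiflow', 'logs',  # base_log_folder
                       'my_project',           # project_config.get_project_name()
                       'my_workflow',          # workflow_info.workflow_name
                       'my_job',               # job_execution_info.job_name
                       '1_1')                  # job_execution_info.job_execution_id
print(log_dir)  # /opt/aiflow/logs/my_project/my_workflow/my_job/1_1
```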
12 changes: 9 additions & 3 deletions ai_flow/runtime/job_runtime_util.py
@@ -26,25 +26,31 @@ def prepare_job_runtime_env(workflow_generated_dir,
workflow_name,
project_context: ProjectContext,
job_execution_info: JobExecutionInfo,
- root_working_dir=None) -> JobRuntimeEnv:
+ root_working_dir=None,
+ base_log_folder=None) -> JobRuntimeEnv:
"""
Prepare the operating environment for the ai flow job(ai_flow.workflow.job.Job)
:param workflow_generated_dir: The generated directory of workflow.
:param workflow_name: The name of the workflow(ai_flow.workflow.workflow.Workflow).
:param project_context: The context of the project which the job belongs.
:param job_execution_info: The information of the execution of the job.
:param root_working_dir: The working directory of the execution of the job(ai_flow.workflow.job.Job).
+ :param base_log_folder: The base folder of the logs.
:return: ai_flow.runtime.job_runtime_env.JobRuntimeEnv object.
"""
working_dir = os.path.join(root_working_dir,
workflow_name,
job_execution_info.job_name,
str(time.strftime("%Y%m%d%H%M%S", time.localtime())))
job_runtime_env: JobRuntimeEnv = JobRuntimeEnv(working_dir=working_dir,
- job_execution_info=job_execution_info)
+ job_execution_info=job_execution_info,
+ project_context=project_context,
+ base_log_folder=base_log_folder)
if not os.path.exists(working_dir):
- os.makedirs(job_runtime_env.log_dir)
+ os.makedirs(working_dir)
job_runtime_env.save_job_execution_info()
+ if not os.path.exists(job_runtime_env.log_dir):
+     os.makedirs(job_runtime_env.log_dir)
if not os.path.exists(os.path.dirname(job_runtime_env.workflow_dir)):
os.makedirs(os.path.dirname(job_runtime_env.workflow_dir))
if not os.path.exists(job_runtime_env.workflow_dir):
4 changes: 3 additions & 1 deletion ai_flow/scheduler_service/service/service.py
@@ -50,8 +50,10 @@ class SchedulerService(SchedulingServiceServicer):
def __init__(self,
scheduler_service_config: SchedulerServiceConfig,
db_uri,
- notification_server_uri):
+ notification_server_uri,
+ base_log_folder):
self._scheduler_service_config = scheduler_service_config
+ scheduler_service_config.scheduler().scheduler_config().update({'base_log_folder': base_log_folder})
self._scheduler: Scheduler \
= SchedulerFactory.create_scheduler(scheduler_service_config.scheduler().scheduler_class(),
scheduler_service_config.scheduler().scheduler_config())
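The service injects `base_log_folder` into the scheduler's config dict, which is how the Airflow DAG generator at the end of this commit can read it back via `self.config.get('base_log_folder')`. A minimal sketch of that round trip; the dict literal stands in for whatever `scheduler_config()` actually returns:

```python
# Minimal sketch of the round trip; the contents of the config dict are illustrative.
scheduler_config = {'airflow_deploy_path': '/opt/airflow/dags'}
scheduler_config.update({'base_log_folder': '/opt/aiflow'})

# Later, in the DAG generator:
assert scheduler_config.get('base_log_folder') == '/opt/aiflow'
```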
Original file line number Diff line number Diff line change
@@ -86,6 +86,7 @@ def __init__(
job: Job,
workflow: Workflow,
resource_dir: Text = None,
+ base_log_folder: Text = None,
**kwargs,
) -> None:
super().__init__(**kwargs)
@@ -98,6 +99,7 @@
self.job_handle: JobHandle = None
self.job_runtime_env: JobRuntimeEnv = None
self.resource_dir = resource_dir
+ self.base_log_folder = base_log_folder

def context_to_job_info(self, project_name: Text, context: Any) -> JobExecutionInfo:
"""
@@ -158,7 +160,8 @@ def pre_execute(self, context: Any):
workflow_name=self.workflow.workflow_name,
project_context=project_context,
job_execution_info=job_execution_info,
- root_working_dir=root_working_dir)
+ root_working_dir=root_working_dir,
+ base_log_folder=self.base_log_folder)

def execute(self, context: Any):
self.log.info("context:" + str(context))
Original file line number Diff line number Diff line change
@@ -133,6 +133,7 @@ def submit_workflow(self, workflow: Workflow, context_extractor, project_context
code_text = self.dag_generator.generate(workflow=workflow,
project_name=project_context.project_name,
resource_dir=self.config.get('resource_dir'),
+ base_log_folder=self.config.get('base_log_folder'),
context_extractor=context_extractor)
self._check_configurable_path('airflow_deploy_path')
airflow_file_path = self._write_to_deploy_path(code_text, dag_id + ".py",
