[Frontend] Supports log view of job execution for frontend #251

Merged 1 commit on Jan 20, 2022
10 changes: 6 additions & 4 deletions ai_flow/endpoint/server/server.py
@@ -41,6 +41,7 @@
from ai_flow.model_center.service.service import ModelCenterService
from ai_flow.protobuf.high_availability_pb2_grpc import add_HighAvailabilityManagerServicer_to_server
from ai_flow.scheduler_service.service.service import SchedulerService, SchedulerServiceConfig
+from ai_flow.settings import AIFLOW_HOME
from ai_flow.store.db.base_model import base
from ai_flow.store.db.db_util import extract_db_engine_from_uri, create_db_store
from ai_flow.store.mongo_store import MongoStoreConnManager
@@ -76,7 +77,8 @@ def __init__(self,
                 ha_manager=None,
                 ha_server_uri=None,
                 ha_storage=None,
-                 ttl_ms: int = 10000):
+                 ttl_ms: int = 10000,
+                 base_log_folder: str = AIFLOW_HOME):
        self.store_uri = store_uri
        self.db_type = DBType.value_of(extract_db_engine_from_uri(store_uri))
        self.executor = Executor(futures.ThreadPoolExecutor(max_workers=10))
@@ -99,7 +101,7 @@ def __init__(self,
        metric_service_pb2_grpc.add_MetricServiceServicer_to_server(MetricService(db_uri=store_uri), self.server)

        if start_scheduler_service:
-            self._add_scheduler_service(scheduler_service_config, store_uri, notification_server_uri)
+            self._add_scheduler_service(scheduler_service_config, store_uri, notification_server_uri, base_log_folder)

        if enabled_ha:
            self._add_ha_service(ha_manager, ha_server_uri, ha_storage, store_uri, ttl_ms)
@@ -108,10 +110,10 @@ def __init__(self,

        self._stop = threading.Event()

-    def _add_scheduler_service(self, scheduler_service_config, db_uri, notification_server_uri):
+    def _add_scheduler_service(self, scheduler_service_config, db_uri, notification_server_uri, base_log_folder):
        logging.info("start scheduler service.")
        real_config = SchedulerServiceConfig(scheduler_service_config)
-        self.scheduler_service = SchedulerService(real_config, db_uri, notification_server_uri)
+        self.scheduler_service = SchedulerService(real_config, db_uri, notification_server_uri, base_log_folder)
        scheduling_service_pb2_grpc.add_SchedulingServiceServicer_to_server(self.scheduler_service,
                                                                            self.server)
4 changes: 3 additions & 1 deletion ai_flow/endpoint/server/server_config.py
@@ -16,7 +16,6 @@
# specific language governing permissions and limitations
# under the License.
#
-import os
from ai_flow.common.configuration import AIFlowConfiguration
from enum import Enum
from typing import Text
@@ -135,3 +134,6 @@ def get_wait_for_server_started_timeout(self):
            return self.get('wait_for_server_started_timeout')
        else:
            return 5.0
+
+    def get_base_log_folder(self):
+        return self.get('base_log_folder')
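A note on usage: the getter above returns nothing when base_log_folder is absent from the server configuration file, so callers fall back to AIFLOW_HOME. A minimal sketch of reading the option (the YAML path is a placeholder, not from this PR):

    from ai_flow.endpoint.server.server_config import AIFlowServerConfig
    from ai_flow.settings import AIFLOW_HOME

    config = AIFlowServerConfig()
    config.load_from_file('/opt/aiflow/aiflow_server.yaml')  # placeholder path
    # Equivalent to the 'x if x else AIFLOW_HOME' fallback in server_runner.py below.
    log_root = config.get_base_log_folder() or AIFLOW_HOME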
6 changes: 5 additions & 1 deletion ai_flow/endpoint/server/server_runner.py
@@ -23,6 +23,7 @@
from ai_flow.endpoint.server.server import AIFlowServer
from ai_flow.endpoint.server.server_config import AIFlowServerConfig
from ai_flow.client.ai_flow_client import get_ai_flow_client
+from ai_flow.settings import AIFLOW_HOME
from ai_flow.util.net_utils import get_ip_addr
import logging

@@ -78,7 +79,10 @@ def start(self,
            scheduler_service_config=self.server_config.get_scheduler_service_config(),
            enabled_ha=self.server_config.get_enable_ha(),
            ha_server_uri=get_ip_addr() + ":" + str(self.server_config.get_server_port()),
-            ttl_ms=self.server_config.get_ha_ttl_ms())
+            ttl_ms=self.server_config.get_ha_ttl_ms(),
+            base_log_folder=self.server_config.get_base_log_folder()
+            if self.server_config.get_base_log_folder()
+            else AIFLOW_HOME)
        self.server.run(is_block=is_block)
        if not is_block:
            self._wait_for_server_available(timeout=self.server_config.get_wait_for_server_started_timeout())
18 changes: 18 additions & 0 deletions ai_flow/frontend/src/views/metadata/JobExecution.vue
@@ -45,6 +45,11 @@ limitations under the License. -->
      <span slot="_execution_label" slot-scope="text">
        <ellipsis :length="32" tooltip>{{ text }}</ellipsis>
      </span>
+      <span slot="action" slot-scope="text, record">
+        <template>
+          <a @click="handleLog(record)">Log</a>
+        </template>
+      </span>
    </s-table>
  </a-card>
  <a-card :bordered="false">
@@ -114,6 +119,12 @@ const columns = [
    title: 'End Date',
    dataIndex: '_end_date',
    customRender: (t) => formateDate(new Date(parseInt(t)), 'YYYY-MM-dd hh:mm')
+  },
+  {
+    title: 'Action',
+    dataIndex: 'action',
+    width: '150px',
+    scopedSlots: { customRender: 'action' }
  }
]

@@ -145,6 +156,13 @@ export default {
    this.getAIFlowVersion()
  },
  methods: {
+    handleLog (record) {
+      if (record._job_type === 'bash' || record._job_type === 'python' || record._job_type === 'flink') {
+        window.open(`/job-execution/log?workflow_execution_id=${encodeURIComponent(this.queryParam.workflow_execution_id)}&job_name=${encodeURIComponent(record._job_name)}&job_type=${encodeURIComponent(record._job_type)}&job_execution_id=${encodeURIComponent(record._job_execution_id)}`, '_blank')
+      } else {
+        alert(`Viewing logs of ${record._job_type} type of job is not supported.`)
+      }
+    },
    resetSearchForm () {
      this.queryParam = {
        date: moment(new Date())
81 changes: 76 additions & 5 deletions ai_flow/frontend/web_server.py
@@ -17,10 +17,13 @@
# under the License.
#
import getopt
+import io
import json
import logging
import os
import sys
-from logging.config import dictConfig
+import time
+import zipfile

from ai_flow.ai_graph.ai_graph import AIGraph
from ai_flow.ai_graph.ai_node import AINode, ReadDatasetNode, WriteDatasetNode
@@ -29,14 +32,16 @@
from ai_flow.meta.workflow_meta import WorkflowMeta
from ai_flow.plugin_interface.scheduler_interface import Scheduler, SchedulerFactory
from ai_flow.scheduler_service.service.config import SchedulerServiceConfig
+from ai_flow.settings import AIFLOW_HOME
from ai_flow.store.abstract_store import Filters, AbstractStore, Orders
from ai_flow.store.db.db_util import create_db_store
from ai_flow.util.json_utils import loads, Jsonable, dumps
from ai_flow.version import __version__
from ai_flow.workflow.control_edge import ControlEdge
from django.core.paginator import Paginator
-from flask import Flask, request, render_template
+from flask import Flask, request, render_template, Response, make_response, redirect
from flask_cors import CORS
+from logging.config import dictConfig
from typing import List, Dict
from werkzeug.local import LocalProxy
@@ -65,6 +70,7 @@ def config_logging():
store: AbstractStore = None
scheduler: Scheduler = None
airflow: str = None
+config: AIFlowServerConfig = None
logger = logging.getLogger(__name__)


@@ -319,6 +325,18 @@ def json_pagination_response(page_no: int, total_count: int, data: List):
    return json.dumps({'pageNo': page_no, 'totalCount': total_count, 'data': data})


+def make_zip(source_dir, target_file):
+    # Remove a stale archive from an earlier request before rebuilding it.
+    # (The existence check must use the joined path, since the archive is
+    # created inside source_dir, not the current working directory.)
+    target_path = os.path.join(source_dir, target_file)
+    if os.path.exists(target_path):
+        os.remove(target_path)
+    zip_file = zipfile.ZipFile(target_path, 'w')
+    for parent, dirs, files in os.walk(source_dir):
+        for file in files:
+            if file != target_file:  # never add the archive to itself
+                file_path = os.path.join(parent, file)
+                # Store entries relative to the parent of source_dir.
+                zip_file.write(file_path, file_path[len(os.path.dirname(source_dir)):].strip(os.path.sep))
+    zip_file.close()
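To make the archive-name expression in make_zip concrete, here is a worked example with placeholder paths:

    # source_dir = '/data/logs/run_1'
    # os.path.dirname(source_dir)   -> '/data/logs'  (length 10)
    # file_path  = '/data/logs/run_1/task.log'
    # file_path[10:]                -> '/run_1/task.log'
    # .strip(os.path.sep)           -> 'run_1/task.log'
    # Entries are stored relative to the parent of source_dir, so the archive
    # keeps the top-level folder name.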


@app.route('/')
def index():
    return render_template('index.html')
@@ -404,10 +422,62 @@ def workflow_execution_metadata():
def job_execution_metadata():
    workflow_execution_id = request.args.get('workflow_execution_id')
    job_execution_list = scheduler.list_job_executions(workflow_execution_id) if workflow_execution_id else None
+    page_job_executions = Paginator(job_execution_list, int(request.args.get('pageSize'))).get_page(
+        int(request.args.get('pageNo'))).object_list if job_execution_list else None
+    job_execution_objects = []
+    if page_job_executions:
+        workflow_info = page_job_executions[0].workflow_execution.workflow_info
+        workflow_graph = extract_graph(store.get_workflow_by_name(workflow_info.namespace,
+                                                                  workflow_info.workflow_name))
+        job_types = {}
+        for node in workflow_graph.nodes.values():
+            job_types.update({node.config.job_name: node.config.job_type})
+        for page_job_execution in page_job_executions:
+            job_execution_object = page_job_execution.__dict__
+            job_execution_object.update({'_job_type': job_types.get(page_job_execution.job_name)})
+            job_execution_objects.append(job_execution_object)
    return pagination_response(page_no=int(request.args.get('pageNo')),
-                               total_count=len(job_execution_list) if job_execution_list else 0,
-                               data=Paginator(job_execution_list, int(request.args.get('pageSize'))).get_page(
-                                   int(request.args.get('pageNo'))).object_list if job_execution_list else [])
+                               total_count=len(job_execution_list) if job_execution_list else 0,
+                               data=job_execution_objects)
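Assuming pagination_response serializes the same way as json_pagination_response shown earlier, the route now returns roughly the following shape (field names are taken from the frontend code above; values are placeholders):

    # {'pageNo': 1, 'totalCount': 2,
    #  'data': [{'_job_name': 'train', '_job_execution_id': '1', '_job_type': 'python', ...},
    #           {'_job_name': 'deploy', '_job_execution_id': '2', '_job_type': 'bash', ...}]}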


+@app.route('/job-execution/log')
+def job_execution_log():
+    job_type = request.args.get('job_type')
+    job_execution_id = request.args.get('job_execution_id')
+    workflow_execution_id = request.args.get('workflow_execution_id')
+    job_executions = scheduler.get_job_executions(request.args.get('job_name'),
+                                                  workflow_execution_id)
+    log_job_execution = None
+    for job_execution in job_executions:
+        if job_execution.job_execution_id == job_execution_id:
+            log_job_execution = job_execution
+            break
+    if log_job_execution:
+        if job_type == 'bash':
+            # Bash jobs run as Airflow tasks, so link to the Airflow graph view.
+            return redirect('{}/graph?dag_id={}'.format(airflow, workflow_execution_id.split('|')[0]))
+        else:
+            base_log_folder = config.get_base_log_folder()
+            log_dir = os.path.join(base_log_folder if base_log_folder else AIFLOW_HOME, 'logs',
+                                   log_job_execution.workflow_execution.workflow_info.namespace,
+                                   log_job_execution.workflow_execution.workflow_info.workflow_name,
+                                   log_job_execution.job_name,
+                                   log_job_execution.job_execution_id)
+            if os.path.exists(log_dir):
+                log_file = 'logs.zip'
+                # Zip the log directory on disk, then wrap that archive in an
+                # in-memory zip and stream it to the client.
+                make_zip(log_dir, log_file)
+                file_obj = io.BytesIO()
+                with zipfile.ZipFile(file_obj, 'w') as zip_file:
+                    zip_info = zipfile.ZipInfo(log_file)
+                    zip_info.date_time = time.localtime(time.time())[:6]
+                    zip_info.compress_type = zipfile.ZIP_DEFLATED
+                    with open(os.path.join(log_dir, log_file), 'rb') as fd:
+                        zip_file.writestr(zip_info, fd.read())
+                file_obj.seek(0)
+                return Response(file_obj.getvalue(),
+                                mimetype='application/zip',
+                                headers={'Content-Disposition': 'attachment;filename={}'.format(log_file)})
+    return make_response('No log found.')


@app.route('/dataset')
@@ -505,6 +575,7 @@ def main(argv):


def start_web_server_by_config_file_path(config_file_path):
+    global config
    config = AIFlowServerConfig()
    config.load_from_file(config_file_path)
    store_uri = config.get_db_uri()
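A minimal client sketch for the new /job-execution/log endpoint (the host, port, and all IDs below are placeholders; for 'bash' jobs the server redirects to the Airflow graph view instead of returning an archive):

    import requests

    resp = requests.get('http://localhost:8000/job-execution/log',
                        params={'workflow_execution_id': 'my_workflow|1',  # assumed '<dag_id>|<run_id>' style
                                'job_name': 'train',
                                'job_type': 'python',
                                'job_execution_id': '1'})
    if resp.headers.get('Content-Type', '').startswith('application/zip'):
        with open('logs.zip', 'wb') as f:
            f.write(resp.content)  # zipped logs for this job execution
    else:
        print(resp.text)  # e.g. 'No log found.'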
16 changes: 13 additions & 3 deletions ai_flow/runtime/job_runtime_env.py
@@ -16,8 +16,11 @@
# under the License.
import os
from typing import Text
+
+from ai_flow.context.project_context import ProjectContext
from ai_flow.plugin_interface.scheduler_interface import JobExecutionInfo
from ai_flow.project.project_config import ProjectConfig
+from ai_flow.settings import AIFLOW_HOME
from ai_flow.workflow.workflow_config import WorkflowConfig, load_workflow_config

from ai_flow.util import serialization_utils
@@ -34,11 +37,14 @@ class JobRuntimeEnv(object):

    def __init__(self,
                 working_dir: Text,
-                 job_execution_info: JobExecutionInfo = None):
+                 job_execution_info: JobExecutionInfo = None,
+                 project_context: ProjectContext = None,
+                 base_log_folder: Text = None):
        self._working_dir: Text = working_dir
        self._job_execution_info: JobExecutionInfo = job_execution_info
        self._workflow_config: WorkflowConfig = None
-        self._project_config: ProjectConfig = None
+        self._project_config: ProjectConfig = project_context.project_config if project_context else None
+        self._base_log_folder: Text = base_log_folder if base_log_folder else AIFLOW_HOME

    @property
    def working_dir(self) -> Text:
@@ -75,7 +81,11 @@ def log_dir(self) -> Text:
        """
        return: The directory where job logs are stored.
        """
-        return os.path.join(self._working_dir, 'logs')
+        return os.path.join(self._base_log_folder, 'logs',
+                            self.project_config.get_project_name(),
+                            self.job_execution_info.workflow_execution.workflow_info.workflow_name,
+                            self.job_execution_info.job_name,
+                            self.job_execution_info.job_execution_id)
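Logs therefore move out of the per-run working directory into a stable, configurable location. Assuming the project name matches the namespace used by web_server.py above, the resulting layout is:

    # <base_log_folder>/logs/<project_name>/<workflow_name>/<job_name>/<job_execution_id>
    # e.g. /opt/aiflow/logs/my_project/my_workflow/train/1   (placeholder values)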

    @property
    def resource_dir(self) -> Text:
12 changes: 9 additions & 3 deletions ai_flow/runtime/job_runtime_util.py
@@ -26,25 +26,31 @@ def prepare_job_runtime_env(workflow_generated_dir,
                            workflow_name,
                            project_context: ProjectContext,
                            job_execution_info: JobExecutionInfo,
-                            root_working_dir=None) -> JobRuntimeEnv:
+                            root_working_dir=None,
+                            base_log_folder=None) -> JobRuntimeEnv:
    """
    Prepare the operating environment for the ai flow job(ai_flow.workflow.job.Job)
    :param workflow_generated_dir: The generated directory of workflow.
    :param workflow_name: The name of the workflow(ai_flow.workflow.workflow.Workflow).
    :param project_context: The context of the project which the job belongs.
    :param job_execution_info: The information of the execution of the job.
    :param root_working_dir: The working directory of the execution of the job(ai_flow.workflow.job.Job).
+    :param base_log_folder: The base folder of the logs.
    :return: ai_flow.runtime.job_runtime_env.JobRuntimeEnv object.
    """
    working_dir = os.path.join(root_working_dir,
                               workflow_name,
                               job_execution_info.job_name,
                               str(time.strftime("%Y%m%d%H%M%S", time.localtime())))
    job_runtime_env: JobRuntimeEnv = JobRuntimeEnv(working_dir=working_dir,
-                                                    job_execution_info=job_execution_info)
+                                                    job_execution_info=job_execution_info,
+                                                    project_context=project_context,
+                                                    base_log_folder=base_log_folder)
    if not os.path.exists(working_dir):
-        os.makedirs(job_runtime_env.log_dir)
+        os.makedirs(working_dir)
    job_runtime_env.save_job_execution_info()
+    if not os.path.exists(job_runtime_env.log_dir):
+        os.makedirs(job_runtime_env.log_dir)
    if not os.path.exists(os.path.dirname(job_runtime_env.workflow_dir)):
        os.makedirs(os.path.dirname(job_runtime_env.workflow_dir))
    if not os.path.exists(job_runtime_env.workflow_dir):
4 changes: 3 additions & 1 deletion ai_flow/scheduler_service/service/service.py
@@ -50,8 +50,10 @@ class SchedulerService(SchedulingServiceServicer):
    def __init__(self,
                 scheduler_service_config: SchedulerServiceConfig,
                 db_uri,
-                 notification_server_uri):
+                 notification_server_uri,
+                 base_log_folder):
        self._scheduler_service_config = scheduler_service_config
+        scheduler_service_config.scheduler().scheduler_config().update({'base_log_folder': base_log_folder})
        self._scheduler: Scheduler \
            = SchedulerFactory.create_scheduler(scheduler_service_config.scheduler().scheduler_class(),
                                                scheduler_service_config.scheduler().scheduler_config())
@@ -86,6 +86,7 @@ def __init__(
            job: Job,
            workflow: Workflow,
            resource_dir: Text = None,
+            base_log_folder: Text = None,
            **kwargs,
    ) -> None:
        super().__init__(**kwargs)
@@ -98,6 +99,7 @@
        self.job_handle: JobHandle = None
        self.job_runtime_env: JobRuntimeEnv = None
        self.resource_dir = resource_dir
+        self.base_log_folder = base_log_folder

    def context_to_job_info(self, project_name: Text, context: Any) -> JobExecutionInfo:
        """
@@ -158,7 +160,8 @@ def pre_execute(self, context: Any):
            workflow_name=self.workflow.workflow_name,
            project_context=project_context,
            job_execution_info=job_execution_info,
-            root_working_dir=root_working_dir)
+            root_working_dir=root_working_dir,
+            base_log_folder=self.base_log_folder)

    def execute(self, context: Any):
        self.log.info("context:" + str(context))
@@ -133,6 +133,7 @@ def submit_workflow(self, workflow: Workflow, context_extractor, project_context
        code_text = self.dag_generator.generate(workflow=workflow,
                                                project_name=project_context.project_name,
                                                resource_dir=self.config.get('resource_dir'),
+                                                base_log_folder=self.config.get('base_log_folder'),
                                                context_extractor=context_extractor)
        self._check_configurable_path('airflow_deploy_path')
        airflow_file_path = self._write_to_deploy_path(code_text, dag_id + ".py",
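Putting the pieces together, a sketch of how the new setting travels through the code touched by this PR:

    # aiflow_server.yaml: base_log_folder
    #   -> AIFlowServerConfig.get_base_log_folder()               (server_config.py)
    #   -> AIFlowServerRunner.start(), defaulting to AIFLOW_HOME  (server_runner.py)
    #   -> AIFlowServer.__init__ -> _add_scheduler_service        (server.py)
    #   -> SchedulerService.__init__, which injects it into the
    #      scheduler plugin's config dict                         (scheduler_service/service.py)
    #   -> self.config.get('base_log_folder') at DAG generation   (hunk above)
    #   -> the operator's base_log_folder -> prepare_job_runtime_env
    #   -> JobRuntimeEnv.log_dir, the directory that web_server.py
    #      zips and serves at /job-execution/log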