Skip to content

Commit

Permalink
Support log view of job executions for the frontend
Browse files Browse the repository at this point in the history
  • Loading branch information
SteNicholas committed Jan 11, 2022
1 parent 6e5a6dd commit 382d2fb
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 9 deletions.
18 changes: 18 additions & 0 deletions ai_flow/frontend/src/views/metadata/JobExecution.vue
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ limitations under the License. -->
<span slot="_execution_label" slot-scope="text">
<ellipsis :length="32" tooltip>{{ text }}</ellipsis>
</span>
<span slot="action" slot-scope="text, record">
<template>
<a @click="handleLog(record)">Log</a>
</template>
</span>
</s-table>
</a-card>
<a-card :bordered="false">
Expand Down Expand Up @@ -114,6 +119,12 @@ const columns = [
title: 'End Date',
dataIndex: '_end_date',
customRender: (t) => formateDate(new Date(parseInt(t)), 'YYYY-MM-dd hh:mm')
},
{
title: 'Action',
dataIndex: 'action',
width: '150px',
scopedSlots: { customRender: 'action' }
}
]

Expand Down Expand Up @@ -145,6 +156,13 @@ export default {
this.getAIFlowVersion()
},
methods: {
handleLog (record) {
if (record._job_type === 'bash' || record._job_type === 'python' || record._job_type === 'flink') {
window.open(`/job-execution/log?workflow_execution_id=${encodeURIComponent(this.queryParam.workflow_execution_id)}&job_name=${encodeURIComponent(record._job_name)}&job_type=${encodeURIComponent(record._job_type)}&job_execution_id=${encodeURIComponent(record._job_execution_id)}`, '_blank')
} else {
alert(`Viewing logs of ${record._job_type} type of job is not supported.`)
}
},
resetSearchForm () {
this.queryParam = {
date: moment(new Date())
Expand Down
77 changes: 73 additions & 4 deletions ai_flow/frontend/web_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,13 @@
# under the License.
#
import getopt
import io
import json
import logging
import os
import sys
import time
import zipfile
from logging.config import dictConfig

from ai_flow.ai_graph.ai_graph import AIGraph
Expand All @@ -29,13 +33,14 @@
from ai_flow.meta.workflow_meta import WorkflowMeta
from ai_flow.plugin_interface.scheduler_interface import Scheduler, SchedulerFactory
from ai_flow.scheduler_service.service.config import SchedulerServiceConfig
from ai_flow.settings import AIFLOW_HOME
from ai_flow.store.abstract_store import Filters, AbstractStore, Orders
from ai_flow.store.db.db_util import create_db_store
from ai_flow.util.json_utils import loads, Jsonable, dumps
from ai_flow.version import __version__
from ai_flow.workflow.control_edge import ControlEdge
from django.core.paginator import Paginator
from flask import Flask, request, render_template
from flask import Flask, request, render_template, Response, make_response, redirect
from flask_cors import CORS
from typing import List, Dict
from werkzeug.local import LocalProxy
Expand Down Expand Up @@ -319,6 +324,18 @@ def json_pagination_response(page_no: int, total_count: int, data: List):
return json.dumps({'pageNo': page_no, 'totalCount': total_count, 'data': data})


def make_zip(source_dir, target_file):
    """Archive every file under ``source_dir`` into ``source_dir/target_file``.

    Entries are stored relative to the parent of ``source_dir``, so the
    archive contains one top-level folder named after ``source_dir``'s
    basename. Any file whose name equals ``target_file`` is skipped so the
    archive never includes itself.

    :param source_dir: directory whose files are archived recursively.
    :param target_file: file name of the zip created inside ``source_dir``.
    """
    target_path = os.path.join(source_dir, target_file)
    # Bug fix: the stale-archive check must use the path the archive is
    # actually written to, not ``target_file`` relative to the CWD.
    if os.path.exists(target_path):
        os.remove(target_path)
    prefix_len = len(os.path.dirname(source_dir))
    # Context manager guarantees the archive is closed even if a write fails.
    with zipfile.ZipFile(target_path, 'w') as zip_file:
        for parent, _dirs, files in os.walk(source_dir):
            for file in files:
                if file == target_file:
                    continue  # never zip the archive into itself
                file_path = os.path.join(parent, file)
                zip_file.write(file_path, file_path[prefix_len:].strip(os.path.sep))


@app.route('/')
def index():
return render_template('index.html')
Expand Down Expand Up @@ -404,10 +421,62 @@ def workflow_execution_metadata():
def job_execution_metadata():
    """Paginated job-execution listing for the frontend table.

    Query params: ``workflow_execution_id``, ``pageNo`` (1-based), ``pageSize``.
    Each row is the job execution's ``__dict__`` augmented with ``_job_type``
    (looked up from the workflow graph) so the frontend can decide whether a
    log view is supported for that job.
    """
    workflow_execution_id = request.args.get('workflow_execution_id')
    page_no = int(request.args.get('pageNo'))
    page_size = int(request.args.get('pageSize'))
    job_execution_list = scheduler.list_job_executions(workflow_execution_id) if workflow_execution_id else None
    page_job_executions = Paginator(job_execution_list, page_size).get_page(
        page_no).object_list if job_execution_list else None
    job_execution_objects = []
    if page_job_executions:
        workflow_info = page_job_executions[0].workflow_execution.workflow_info
        workflow_graph = extract_graph(store.get_workflow_by_name(workflow_info.namespace,
                                                                  workflow_info.workflow_name))
        # Build the job-name -> job-type map once instead of per row.
        job_types = {node.config.job_name: node.config.job_type
                     for node in workflow_graph.nodes.values()}
        for page_job_execution in page_job_executions:
            job_execution_object = page_job_execution.__dict__
            # NOTE(review): this writes into the execution object's own
            # __dict__; looks acceptable because the objects are request-scoped.
            job_execution_object['_job_type'] = job_types.get(page_job_execution.job_name)
            job_execution_objects.append(job_execution_object)
    return pagination_response(page_no=page_no,
                               total_count=len(job_execution_list) if job_execution_list else 0,
                               data=job_execution_objects)


@app.route('/job-execution/log')
def job_execution_log():
    """Serve the logs of one job execution.

    For ``bash`` jobs, redirect to the Airflow graph view of the owning DAG.
    For other supported job types, zip the job execution's log directory and
    send the archive as a download. Falls through to a plain 'No log found.'
    response when the execution or its log directory cannot be located.
    """
    job_type = request.args.get('job_type')
    job_execution_id = request.args.get('job_execution_id')
    workflow_execution_id = request.args.get('workflow_execution_id')
    job_executions = scheduler.get_job_executions(request.args.get('job_name'),
                                                  workflow_execution_id)
    # Locate the specific execution among all executions of this job name.
    log_job_execution = None
    for job_execution in job_executions:
        if job_execution.job_execution_id == job_execution_id:
            log_job_execution = job_execution
            break
    if log_job_execution:
        if job_type == 'bash':
            # Bash job logs live in Airflow; the DAG id is the prefix of the
            # workflow execution id (before the first '|').
            return redirect('{}/graph?dag_id={}'.format(airflow, workflow_execution_id.split('|')[0]))
        log_dir = os.path.join(AIFLOW_HOME, 'logs',
                               log_job_execution.workflow_execution.workflow_info.namespace,
                               log_job_execution.workflow_execution.workflow_info.workflow_name,
                               log_job_execution.workflow_execution.workflow_execution_id,
                               log_job_execution.job_name,
                               log_job_execution.job_execution_id)
        if os.path.exists(log_dir):
            log_file = 'logs.zip'
            make_zip(log_dir, log_file)
            # Bug fix: serve the archive built by make_zip directly. The old
            # code wrapped that zip inside a second in-memory zip, so clients
            # downloaded a logs.zip that merely contained another logs.zip.
            with open(os.path.join(log_dir, log_file), 'rb') as fd:
                archive_bytes = fd.read()
            return Response(archive_bytes,
                            mimetype='application/zip',
                            headers={'Content-Disposition': 'attachment;filename={}'.format(log_file)})
    return make_response('No log found.')


@app.route('/dataset')
Expand Down
15 changes: 12 additions & 3 deletions ai_flow/runtime/job_runtime_env.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@
# under the License.
import os
from typing import Text

from ai_flow.context.project_context import ProjectContext
from ai_flow.plugin_interface.scheduler_interface import JobExecutionInfo
from ai_flow.project.project_config import ProjectConfig
from ai_flow.settings import AIFLOW_HOME
from ai_flow.workflow.workflow_config import WorkflowConfig, load_workflow_config

from ai_flow.util import serialization_utils
Expand All @@ -34,11 +37,12 @@ class JobRuntimeEnv(object):

def __init__(self,
             working_dir: Text,
             job_execution_info: JobExecutionInfo = None,
             project_context: ProjectContext = None):
    """Capture the job's runtime locations and metadata.

    :param working_dir: root directory of this job execution's workspace.
    :param job_execution_info: metadata describing the job execution.
    :param project_context: optional project context; when given, its
        project config is adopted as this environment's project config.
    """
    self._working_dir: Text = working_dir
    self._job_execution_info: JobExecutionInfo = job_execution_info
    self._workflow_config: WorkflowConfig = None
    if project_context:
        self._project_config: ProjectConfig = project_context.project_config
    else:
        self._project_config: ProjectConfig = None

@property
def working_dir(self) -> Text:
@property
def log_dir(self) -> Text:
    """
    return: The directory where job logs are stored.

    Logs live under AIFLOW_HOME (not the per-run working dir) so the
    web server can locate them after the job finishes:
    AIFLOW_HOME/logs/<project>/<workflow>/<workflow_execution_id>/<job_name>/<job_execution_id>
    """
    # NOTE(review): the diff pasted here retained both the old return
    # (working_dir-based) and the new one; only the new path is kept.
    return os.path.join(AIFLOW_HOME, 'logs',
                        self.project_config.get_project_name(),
                        self.job_execution_info.workflow_execution.workflow_info.workflow_name,
                        self.job_execution_info.workflow_execution.workflow_execution_id,
                        self.job_execution_info.job_name,
                        self.job_execution_info.job_execution_id)

@property
def resource_dir(self) -> Text:
Expand Down
7 changes: 5 additions & 2 deletions ai_flow/runtime/job_runtime_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,13 @@ def prepare_job_runtime_env(workflow_generated_dir,
job_execution_info.job_name,
str(time.strftime("%Y%m%d%H%M%S", time.localtime())))
job_runtime_env: JobRuntimeEnv = JobRuntimeEnv(working_dir=working_dir,
job_execution_info=job_execution_info)
job_execution_info=job_execution_info,
project_context=project_context)
if not os.path.exists(working_dir):
os.makedirs(job_runtime_env.log_dir)
os.makedirs(working_dir)
job_runtime_env.save_job_execution_info()
if not os.path.exists(job_runtime_env.log_dir):
os.makedirs(job_runtime_env.log_dir)
if not os.path.exists(os.path.dirname(job_runtime_env.workflow_dir)):
os.makedirs(os.path.dirname(job_runtime_env.workflow_dir))
if not os.path.exists(job_runtime_env.workflow_dir):
Expand Down

0 comments on commit 382d2fb

Please sign in to comment.