diff --git a/ai_flow/endpoint/server/server.py b/ai_flow/endpoint/server/server.py
index 6bb4c9151..2161f2cd5 100644
--- a/ai_flow/endpoint/server/server.py
+++ b/ai_flow/endpoint/server/server.py
@@ -41,6 +41,7 @@
from ai_flow.model_center.service.service import ModelCenterService
from ai_flow.protobuf.high_availability_pb2_grpc import add_HighAvailabilityManagerServicer_to_server
from ai_flow.scheduler_service.service.service import SchedulerService, SchedulerServiceConfig
+from ai_flow.settings import AIFLOW_HOME
from ai_flow.store.db.base_model import base
from ai_flow.store.db.db_util import extract_db_engine_from_uri, create_db_store
from ai_flow.store.mongo_store import MongoStoreConnManager
@@ -76,7 +77,8 @@ def __init__(self,
ha_manager=None,
ha_server_uri=None,
ha_storage=None,
- ttl_ms: int = 10000):
+ ttl_ms: int = 10000,
+ base_log_folder: str = AIFLOW_HOME):
self.store_uri = store_uri
self.db_type = DBType.value_of(extract_db_engine_from_uri(store_uri))
self.executor = Executor(futures.ThreadPoolExecutor(max_workers=10))
@@ -99,7 +101,7 @@ def __init__(self,
metric_service_pb2_grpc.add_MetricServiceServicer_to_server(MetricService(db_uri=store_uri), self.server)
if start_scheduler_service:
- self._add_scheduler_service(scheduler_service_config, store_uri, notification_server_uri)
+ self._add_scheduler_service(scheduler_service_config, store_uri, notification_server_uri, base_log_folder)
if enabled_ha:
self._add_ha_service(ha_manager, ha_server_uri, ha_storage, store_uri, ttl_ms)
@@ -108,10 +110,10 @@ def __init__(self,
self._stop = threading.Event()
- def _add_scheduler_service(self, scheduler_service_config, db_uri, notification_server_uri):
+ def _add_scheduler_service(self, scheduler_service_config, db_uri, notification_server_uri, base_log_folder):
logging.info("start scheduler service.")
real_config = SchedulerServiceConfig(scheduler_service_config)
- self.scheduler_service = SchedulerService(real_config, db_uri, notification_server_uri)
+ self.scheduler_service = SchedulerService(real_config, db_uri, notification_server_uri, base_log_folder)
scheduling_service_pb2_grpc.add_SchedulingServiceServicer_to_server(self.scheduler_service,
self.server)
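
For reference, a minimal sketch of how the new `base_log_folder` argument could be passed when constructing the server directly. The URIs and paths below are placeholders, and the keyword arguments are assumed from the surrounding constructor code; omitting `base_log_folder` keeps the previous behavior of logging under `AIFLOW_HOME`.

```python
# Illustrative only: argument values are assumptions, not part of this change.
from ai_flow.endpoint.server.server import AIFlowServer

server = AIFlowServer(store_uri='sqlite:///aiflow.db',            # assumed metadata store
                      notification_server_uri='localhost:50052',  # assumed notification server
                      base_log_folder='/var/log/aiflow')          # new parameter in this change
server.run(is_block=True)
```
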
diff --git a/ai_flow/endpoint/server/server_config.py b/ai_flow/endpoint/server/server_config.py
index dca7f44bc..1274213c6 100644
--- a/ai_flow/endpoint/server/server_config.py
+++ b/ai_flow/endpoint/server/server_config.py
@@ -16,7 +16,6 @@
# specific language governing permissions and limitations
# under the License.
#
-import os
from ai_flow.common.configuration import AIFlowConfiguration
from enum import Enum
from typing import Text
@@ -135,3 +134,6 @@ def get_wait_for_server_started_timeout(self):
return self.get('wait_for_server_started_timeout')
else:
return 5.0
+
+ def get_base_log_folder(self):
+ return self.get('base_log_folder')
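
A minimal sketch of the new accessor in use; the config file path is an assumption, and `load_from_file` is the same loader used by `start_web_server_by_config_file_path` in web_server.py below.

```python
from ai_flow.endpoint.server.server_config import AIFlowServerConfig

config = AIFlowServerConfig()
config.load_from_file('/path/to/aiflow_server.yaml')  # assumed config location
# Returns the configured 'base_log_folder', or None when the key is absent;
# callers (server_runner.py, web_server.py) then fall back to AIFLOW_HOME.
base_log_folder = config.get_base_log_folder()
```
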
diff --git a/ai_flow/endpoint/server/server_runner.py b/ai_flow/endpoint/server/server_runner.py
index 4e63cfbfe..b3236a25a 100644
--- a/ai_flow/endpoint/server/server_runner.py
+++ b/ai_flow/endpoint/server/server_runner.py
@@ -23,6 +23,7 @@
from ai_flow.endpoint.server.server import AIFlowServer
from ai_flow.endpoint.server.server_config import AIFlowServerConfig
from ai_flow.client.ai_flow_client import get_ai_flow_client
+from ai_flow.settings import AIFLOW_HOME
from ai_flow.util.net_utils import get_ip_addr
import logging
@@ -78,7 +79,10 @@ def start(self,
scheduler_service_config=self.server_config.get_scheduler_service_config(),
enabled_ha=self.server_config.get_enable_ha(),
ha_server_uri=get_ip_addr() + ":" + str(self.server_config.get_server_port()),
- ttl_ms=self.server_config.get_ha_ttl_ms())
+ ttl_ms=self.server_config.get_ha_ttl_ms(),
+ base_log_folder=self.server_config.get_base_log_folder()
+ if self.server_config.get_base_log_folder()
+ else AIFLOW_HOME)
self.server.run(is_block=is_block)
if not is_block:
self._wait_for_server_available(timeout=self.server_config.get_wait_for_server_started_timeout())
diff --git a/ai_flow/frontend/src/views/metadata/JobExecution.vue b/ai_flow/frontend/src/views/metadata/JobExecution.vue
index 9fda3e4e3..2c0fc3997 100644
--- a/ai_flow/frontend/src/views/metadata/JobExecution.vue
+++ b/ai_flow/frontend/src/views/metadata/JobExecution.vue
@@ -45,6 +45,11 @@ limitations under the License. -->
{{ text }}
+        <span slot="action" slot-scope="text, record">
+          <template>
+            <a @click="handleLog(record)">Log</a>
+          </template>
+        </span>
@@ -114,6 +119,12 @@ const columns = [
title: 'End Date',
dataIndex: '_end_date',
customRender: (t) => formateDate(new Date(parseInt(t)), 'YYYY-MM-dd hh:mm')
+ },
+ {
+ title: 'Action',
+ dataIndex: 'action',
+ width: '150px',
+ scopedSlots: { customRender: 'action' }
}
]
@@ -145,6 +156,13 @@ export default {
this.getAIFlowVersion()
},
methods: {
+ handleLog (record) {
+ if (record._job_type === 'bash' || record._job_type === 'python' || record._job_type === 'flink') {
+ window.open(`/job-execution/log?workflow_execution_id=${encodeURIComponent(this.queryParam.workflow_execution_id)}&job_name=${encodeURIComponent(record._job_name)}&job_type=${encodeURIComponent(record._job_type)}&job_execution_id=${encodeURIComponent(record._job_execution_id)}`, '_blank')
+ } else {
+        alert(`Viewing logs for ${record._job_type} jobs is not supported.`)
+ }
+ },
resetSearchForm () {
this.queryParam = {
date: moment(new Date())
diff --git a/ai_flow/frontend/web_server.py b/ai_flow/frontend/web_server.py
index 060a3f89b..6275cd283 100644
--- a/ai_flow/frontend/web_server.py
+++ b/ai_flow/frontend/web_server.py
@@ -17,10 +17,13 @@
# under the License.
#
import getopt
+import io
import json
import logging
+import os
import sys
-from logging.config import dictConfig
+import time
+import zipfile
from ai_flow.ai_graph.ai_graph import AIGraph
from ai_flow.ai_graph.ai_node import AINode, ReadDatasetNode, WriteDatasetNode
@@ -29,14 +32,16 @@
from ai_flow.meta.workflow_meta import WorkflowMeta
from ai_flow.plugin_interface.scheduler_interface import Scheduler, SchedulerFactory
from ai_flow.scheduler_service.service.config import SchedulerServiceConfig
+from ai_flow.settings import AIFLOW_HOME
from ai_flow.store.abstract_store import Filters, AbstractStore, Orders
from ai_flow.store.db.db_util import create_db_store
from ai_flow.util.json_utils import loads, Jsonable, dumps
from ai_flow.version import __version__
from ai_flow.workflow.control_edge import ControlEdge
from django.core.paginator import Paginator
-from flask import Flask, request, render_template
+from flask import Flask, request, render_template, Response, make_response, redirect
from flask_cors import CORS
+from logging.config import dictConfig
from typing import List, Dict
from werkzeug.local import LocalProxy
@@ -65,6 +70,7 @@ def config_logging():
store: AbstractStore = None
scheduler: Scheduler = None
airflow: str = None
+config: AIFlowServerConfig = None
logger = logging.getLogger(__name__)
@@ -319,6 +325,18 @@ def json_pagination_response(page_no: int, total_count: int, data: List):
return json.dumps({'pageNo': page_no, 'totalCount': total_count, 'data': data})
+def make_zip(source_dir, target_file):
+    if os.path.exists(os.path.join(source_dir, target_file)):
+        os.remove(os.path.join(source_dir, target_file))
+ zip_file = zipfile.ZipFile(os.path.join(source_dir, target_file), 'w')
+ for parent, dirs, files in os.walk(source_dir):
+ for file in files:
+ if file != target_file:
+ file_path = os.path.join(parent, file)
+ zip_file.write(file_path, file_path[len(os.path.dirname(source_dir)):].strip(os.path.sep))
+ zip_file.close()
+
+
@app.route('/')
def index():
return render_template('index.html')
@@ -404,10 +422,62 @@ def workflow_execution_metadata():
def job_execution_metadata():
workflow_execution_id = request.args.get('workflow_execution_id')
job_execution_list = scheduler.list_job_executions(workflow_execution_id) if workflow_execution_id else None
+ page_job_executions = Paginator(job_execution_list, int(request.args.get('pageSize'))).get_page(
+ int(request.args.get('pageNo'))).object_list if job_execution_list else None
+ job_execution_objects = []
+ if page_job_executions:
+ workflow_info = page_job_executions[0].workflow_execution.workflow_info
+ workflow_graph = extract_graph(store.get_workflow_by_name(workflow_info.namespace,
+ workflow_info.workflow_name))
+ job_types = {}
+ for node in workflow_graph.nodes.values():
+ job_types.update({node.config.job_name: node.config.job_type})
+ for page_job_execution in page_job_executions:
+ job_execution_object = page_job_execution.__dict__
+ job_execution_object.update({'_job_type': job_types.get(page_job_execution.job_name)})
+ job_execution_objects.append(job_execution_object)
return pagination_response(page_no=int(request.args.get('pageNo')),
- total_count=len(job_execution_list) if job_execution_list else 0,
- data=Paginator(job_execution_list, int(request.args.get('pageSize'))).get_page(
- int(request.args.get('pageNo'))).object_list if job_execution_list else [])
+ total_count=len(job_execution_list) if job_execution_list else 0,
+ data=job_execution_objects)
+
+
+@app.route('/job-execution/log')
+def job_execution_log():
+ job_type = request.args.get('job_type')
+ job_execution_id = request.args.get('job_execution_id')
+ workflow_execution_id = request.args.get('workflow_execution_id')
+ job_executions = scheduler.get_job_executions(request.args.get('job_name'),
+ workflow_execution_id)
+ log_job_execution = None
+ for job_execution in job_executions:
+ if job_execution.job_execution_id == job_execution_id:
+ log_job_execution = job_execution
+ break
+ if log_job_execution:
+ if job_type == 'bash':
+ return redirect('{}/graph?dag_id={}'.format(airflow, workflow_execution_id.split('|')[0]))
+ else:
+ base_log_folder = config.get_base_log_folder()
+ log_dir = os.path.join(base_log_folder if base_log_folder else AIFLOW_HOME, 'logs',
+ log_job_execution.workflow_execution.workflow_info.namespace,
+ log_job_execution.workflow_execution.workflow_info.workflow_name,
+ log_job_execution.job_name,
+ log_job_execution.job_execution_id)
+ if os.path.exists(log_dir):
+ log_file = 'logs.zip'
+ make_zip(log_dir, log_file)
+ file_obj = io.BytesIO()
+ with zipfile.ZipFile(file_obj, 'w') as zip_file:
+ zip_info = zipfile.ZipInfo(log_file)
+ zip_info.date_time = time.localtime(time.time())[:6]
+ zip_info.compress_type = zipfile.ZIP_DEFLATED
+ with open(os.path.join(log_dir, log_file), 'rb') as fd:
+ zip_file.writestr(zip_info, fd.read())
+ file_obj.seek(0)
+ return Response(file_obj.getvalue(),
+ mimetype='application/zip',
+ headers={'Content-Disposition': 'attachment;filename={}'.format(log_file)})
+ return make_response('No log found.')
@app.route('/dataset')
@@ -505,6 +575,7 @@ def main(argv):
def start_web_server_by_config_file_path(config_file_path):
+ global config
config = AIFlowServerConfig()
config.load_from_file(config_file_path)
store_uri = config.get_db_uri()
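
As a usage illustration, a hedged client-side sketch of the new `/job-execution/log` route; the host, port, and parameter values are placeholders, only the query parameter names come from the handler above.

```python
import requests

resp = requests.get('http://localhost:8000/job-execution/log',          # assumed web server address
                    params={'workflow_execution_id': 'my_workflow|1',   # dag_id is taken from the part before '|'
                            'job_name': 'my_job',
                            'job_type': 'python',
                            'job_execution_id': '1'})
if resp.headers.get('Content-Type', '').startswith('application/zip'):
    with open('logs.zip', 'wb') as f:   # zipped logs for python/flink jobs
        f.write(resp.content)
else:
    # e.g. 'No log found.', or the Airflow page for bash jobs (redirect followed)
    print(resp.text)
```
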
diff --git a/ai_flow/runtime/job_runtime_env.py b/ai_flow/runtime/job_runtime_env.py
index 549b9d352..5b7205890 100644
--- a/ai_flow/runtime/job_runtime_env.py
+++ b/ai_flow/runtime/job_runtime_env.py
@@ -16,8 +16,11 @@
# under the License.
import os
from typing import Text
+
+from ai_flow.context.project_context import ProjectContext
from ai_flow.plugin_interface.scheduler_interface import JobExecutionInfo
from ai_flow.project.project_config import ProjectConfig
+from ai_flow.settings import AIFLOW_HOME
from ai_flow.workflow.workflow_config import WorkflowConfig, load_workflow_config
from ai_flow.util import serialization_utils
@@ -34,11 +37,14 @@ class JobRuntimeEnv(object):
def __init__(self,
working_dir: Text,
- job_execution_info: JobExecutionInfo = None):
+ job_execution_info: JobExecutionInfo = None,
+ project_context: ProjectContext = None,
+ base_log_folder: Text = None):
self._working_dir: Text = working_dir
self._job_execution_info: JobExecutionInfo = job_execution_info
self._workflow_config: WorkflowConfig = None
- self._project_config: ProjectConfig = None
+ self._project_config: ProjectConfig = project_context.project_config if project_context else None
+ self._base_log_folder: Text = base_log_folder if base_log_folder else AIFLOW_HOME
@property
def working_dir(self) -> Text:
@@ -75,7 +81,11 @@ def log_dir(self) -> Text:
"""
return: The directory where job logs are stored.
"""
- return os.path.join(self._working_dir, 'logs')
+ return os.path.join(self._base_log_folder, 'logs',
+ self.project_config.get_project_name(),
+ self.job_execution_info.workflow_execution.workflow_info.workflow_name,
+ self.job_execution_info.job_name,
+ self.job_execution_info.job_execution_id)
@property
def resource_dir(self) -> Text:
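
For clarity, a small sketch of the directory layout the new `log_dir` property produces; the project, workflow, and job names are assumed example values.

```python
import os

base_log_folder = '/opt/aiflow'           # value passed to JobRuntimeEnv (defaults to AIFLOW_HOME)
log_dir = os.path.join(base_log_folder, 'logs',
                       'demo_project',    # project_config.get_project_name()
                       'demo_workflow',   # workflow_execution.workflow_info.workflow_name
                       'demo_job',        # job_execution_info.job_name
                       '1')               # job_execution_info.job_execution_id
print(log_dir)  # /opt/aiflow/logs/demo_project/demo_workflow/demo_job/1
```
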
diff --git a/ai_flow/runtime/job_runtime_util.py b/ai_flow/runtime/job_runtime_util.py
index 965e579bb..7c7a0346e 100644
--- a/ai_flow/runtime/job_runtime_util.py
+++ b/ai_flow/runtime/job_runtime_util.py
@@ -26,7 +26,8 @@ def prepare_job_runtime_env(workflow_generated_dir,
workflow_name,
project_context: ProjectContext,
job_execution_info: JobExecutionInfo,
- root_working_dir=None) -> JobRuntimeEnv:
+ root_working_dir=None,
+ base_log_folder=None) -> JobRuntimeEnv:
"""
Prepare the operating environment for the ai flow job(ai_flow.workflow.job.Job)
:param workflow_generated_dir: The generated directory of workflow.
@@ -34,6 +35,7 @@ def prepare_job_runtime_env(workflow_generated_dir,
:param project_context: The context of the project which the job belongs.
:param job_execution_info: The information of the execution of the job.
:param root_working_dir: The working directory of the execution of the job(ai_flow.workflow.job.Job).
+    :param base_log_folder: The base directory under which job logs are stored.
:return: ai_flow.runtime.job_runtime_env.JobRuntimeEnv object.
"""
working_dir = os.path.join(root_working_dir,
@@ -41,10 +43,14 @@ def prepare_job_runtime_env(workflow_generated_dir,
job_execution_info.job_name,
str(time.strftime("%Y%m%d%H%M%S", time.localtime())))
job_runtime_env: JobRuntimeEnv = JobRuntimeEnv(working_dir=working_dir,
- job_execution_info=job_execution_info)
+ job_execution_info=job_execution_info,
+ project_context=project_context,
+ base_log_folder=base_log_folder)
if not os.path.exists(working_dir):
- os.makedirs(job_runtime_env.log_dir)
+ os.makedirs(working_dir)
job_runtime_env.save_job_execution_info()
+ if not os.path.exists(job_runtime_env.log_dir):
+ os.makedirs(job_runtime_env.log_dir)
if not os.path.exists(os.path.dirname(job_runtime_env.workflow_dir)):
os.makedirs(os.path.dirname(job_runtime_env.workflow_dir))
if not os.path.exists(job_runtime_env.workflow_dir):
diff --git a/ai_flow/scheduler_service/service/service.py b/ai_flow/scheduler_service/service/service.py
index 3f61f2c01..45e54cb1a 100644
--- a/ai_flow/scheduler_service/service/service.py
+++ b/ai_flow/scheduler_service/service/service.py
@@ -50,8 +50,10 @@ class SchedulerService(SchedulingServiceServicer):
def __init__(self,
scheduler_service_config: SchedulerServiceConfig,
db_uri,
- notification_server_uri):
+ notification_server_uri,
+ base_log_folder):
self._scheduler_service_config = scheduler_service_config
+ scheduler_service_config.scheduler().scheduler_config().update({'base_log_folder': base_log_folder})
self._scheduler: Scheduler \
= SchedulerFactory.create_scheduler(scheduler_service_config.scheduler().scheduler_class(),
scheduler_service_config.scheduler().scheduler_config())
diff --git a/ai_flow_plugins/scheduler_plugins/airflow/ai_flow_operator.py b/ai_flow_plugins/scheduler_plugins/airflow/ai_flow_operator.py
index 7f600394d..1c50e84ff 100644
--- a/ai_flow_plugins/scheduler_plugins/airflow/ai_flow_operator.py
+++ b/ai_flow_plugins/scheduler_plugins/airflow/ai_flow_operator.py
@@ -86,6 +86,7 @@ def __init__(
job: Job,
workflow: Workflow,
resource_dir: Text = None,
+ base_log_folder: Text = None,
**kwargs,
) -> None:
super().__init__(**kwargs)
@@ -98,6 +99,7 @@ def __init__(
self.job_handle: JobHandle = None
self.job_runtime_env: JobRuntimeEnv = None
self.resource_dir = resource_dir
+ self.base_log_folder = base_log_folder
def context_to_job_info(self, project_name: Text, context: Any) -> JobExecutionInfo:
"""
@@ -158,7 +160,8 @@ def pre_execute(self, context: Any):
workflow_name=self.workflow.workflow_name,
project_context=project_context,
job_execution_info=job_execution_info,
- root_working_dir=root_working_dir)
+ root_working_dir=root_working_dir,
+ base_log_folder=self.base_log_folder)
def execute(self, context: Any):
self.log.info("context:" + str(context))
diff --git a/ai_flow_plugins/scheduler_plugins/airflow/airflow_scheduler.py b/ai_flow_plugins/scheduler_plugins/airflow/airflow_scheduler.py
index 6113783e4..a61e0fbaa 100644
--- a/ai_flow_plugins/scheduler_plugins/airflow/airflow_scheduler.py
+++ b/ai_flow_plugins/scheduler_plugins/airflow/airflow_scheduler.py
@@ -133,6 +133,7 @@ def submit_workflow(self, workflow: Workflow, context_extractor, project_context
code_text = self.dag_generator.generate(workflow=workflow,
project_name=project_context.project_name,
resource_dir=self.config.get('resource_dir'),
+ base_log_folder=self.config.get('base_log_folder'),
context_extractor=context_extractor)
self._check_configurable_path('airflow_deploy_path')
airflow_file_path = self._write_to_deploy_path(code_text, dag_id + ".py",
diff --git a/ai_flow_plugins/scheduler_plugins/airflow/dag_generator.py b/ai_flow_plugins/scheduler_plugins/airflow/dag_generator.py
index f6f76c5bc..2a7b942ee 100644
--- a/ai_flow_plugins/scheduler_plugins/airflow/dag_generator.py
+++ b/ai_flow_plugins/scheduler_plugins/airflow/dag_generator.py
@@ -88,18 +88,19 @@ class DAGGenerator(object):
def __init__(self):
self.op_count = -1
- def generate_op_code(self, job, resource_dir):
+ def generate_op_code(self, job, resource_dir, base_log_folder):
self.op_count += 1
OP_DEFINE = """
job_json_{0} = '''{1}'''
job_{0} = json_utils.loads(job_json_{0})
-op_{0} = AIFlowOperator(task_id='{2}', job=job_{0}, workflow=workflow, dag=dag, resource_dir='{3}'"""
+op_{0} = AIFlowOperator(task_id='{2}', job=job_{0}, workflow=workflow, dag=dag, resource_dir='{3}', base_log_folder='{4}'"""
if 'airflow_args' in job.job_config.properties and len(job.job_config.properties.get('airflow_args')) > 0:
op_code = OP_DEFINE.format(self.op_count,
json_utils.dumps(job),
job.job_name,
- resource_dir)
+ resource_dir,
+ base_log_folder)
airflow_args = job.job_config.properties.get('airflow_args')
end_code = ''
for k, v in airflow_args.items():
@@ -111,7 +112,8 @@ def generate_op_code(self, job, resource_dir):
return 'op_{}'.format(self.op_count), OP_DEFINE.format(self.op_count,
json_utils.dumps(job),
job.job_name,
- resource_dir) + ')\n'
+ resource_dir,
+ base_log_folder) + ')\n'
def generate_upstream(self, op_1, op_2):
return DAGTemplate.UPSTREAM_OP.format(op_1, op_2)
@@ -136,6 +138,7 @@ def generate(self,
workflow: Workflow,
project_name: Text,
resource_dir: Text,
+ base_log_folder: Text,
context_extractor: ContextExtractor = BroadcastAllContextExtractor()) -> Text:
code_text = DAGTemplate.AIRFLOW_IMPORT
code_text += import_job_plugins_text(workflow)
@@ -166,7 +169,7 @@ def dict_code_text(data: Dict) -> Text:
task_map = {}
for name, job in workflow.jobs.items():
- op_name, code = self.generate_op_code(job, resource_dir)
+ op_name, code = self.generate_op_code(job, resource_dir, base_log_folder)
code_text += code
task_map[job.job_name] = op_name
if name in workflow.workflow_config.job_periodic_config_dict:
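
To show the effect of the template change, a rough sketch of the text `generate_op_code` now emits for a job without extra airflow_args; the job JSON and paths are placeholders for what the real generator fills in.

```python
# Stand-in for the OP_DEFINE expansion above; values are assumptions for illustration.
OP_DEFINE = """
job_json_{0} = '''{1}'''
job_{0} = json_utils.loads(job_json_{0})
op_{0} = AIFlowOperator(task_id='{2}', job=job_{0}, workflow=workflow, dag=dag, resource_dir='{3}', base_log_folder='{4}'"""

print(OP_DEFINE.format(0, '{"job_name": "job_1"}',   # json_utils.dumps(job) in the real generator
                       'job_1', '/tmp/resource', '/opt/aiflow') + ')\n')
```
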
diff --git a/ai_flow_plugins/tests/scheduler_plugins/airflow/mock_scheduler_plugin.py b/ai_flow_plugins/tests/scheduler_plugins/airflow/mock_scheduler_plugin.py
index f1a8f005a..671f1b010 100644
--- a/ai_flow_plugins/tests/scheduler_plugins/airflow/mock_scheduler_plugin.py
+++ b/ai_flow_plugins/tests/scheduler_plugins/airflow/mock_scheduler_plugin.py
@@ -42,7 +42,8 @@ def stop_workflow_execution_by_context(self, workflow_name: Text, context: Text)
def submit_workflow(self, workflow: Workflow, context_extractor, project_context: ProjectContext) -> WorkflowInfo:
code_text = self.dag_generator.generate(workflow=workflow,
project_name=project_context.project_name,
- resource_dir='/tmp')
+ resource_dir='/tmp',
+ base_log_folder='/tmp')
return WorkflowInfo(workflow_name=workflow.workflow_name, properties={'code': code_text})
def delete_workflow(self, project_name: Text, workflow_name: Text) -> WorkflowInfo: