task fails, update the 'message' information in the dataset for cr #562

Merged · 1 commit · Jan 16, 2024
apiserver/graph/generated/generated.go — 12 changes: 1 addition & 11 deletions
(Generated file; diff not rendered by default.)

apiserver/graph/generated/models_gen.go — 1 change: 0 additions & 1 deletion
(Generated file; diff not rendered by default.)

apiserver/graph/schema/dataprocessing.graphqls — 1 change: 0 additions & 1 deletion

@@ -49,7 +49,6 @@ input AddDataProcessInput {
   post_data_set_name: String!
   post_data_set_version: String!
   data_process_config_info: [DataProcessConfigItem!]
-  bucket_name: String!
   version_data_set_name: String!
   namespace: String!
   creator: String!
@@ -54,13 +54,14 @@ async def add(request):
         "post_data_set_name": "dataset1",
         "post_data_set_version": "v2",
         "version_data_set_name": "dataset1-v2",
-        "bucket_name": "system-tce",
         "file_names": [
             {
                 "name": "数据处理文件_小T.pdf"
             }
         ],
-        "data_process_config_info": []
+        "data_process_config_info": [],
+        "creator": "",
+        "namespace": "abc"
     }
     """
     res = data_process_service.add(
@@ -52,7 +52,7 @@ def text_manipulate(
     req_json is a dictionary object.
     """

-    bucket_name = req_json['bucket_name']
+    namespace = req_json['namespace']
     support_type = req_json['data_process_config_info']
     file_names = req_json['file_names']

@@ -72,9 +72,10 @@

     # update the dataset status
     update_dataset = _update_dateset_status(
-        bucket_name=req_json['bucket_name'],
+        namespace=req_json['namespace'],
         version_data_set_name=req_json['version_data_set_name'],
         reason='processing',
+        message='Data processing in progress',
         task_id=id,
         log_id=log_id,
         creator=req_json.get('creator'),
@@ -130,7 +131,7 @@ def text_manipulate(
             # Download the file to local storage
             minio_store_client.download(
                 minio_client,
-                bucket_name=bucket_name,
+                bucket_name=namespace,
                 folder_prefix=folder_prefix,
                 file_name=file_name
             )
@@ -328,17 +329,18 @@ def text_manipulate(
         minio_store_client.upload_files_to_minio_with_tags(
             minio_client=minio_client,
             local_folder=file_path + 'final',
-            minio_bucket=bucket_name,
+            minio_bucket=namespace,
             minio_prefix=folder_prefix,
             support_type=support_type,
             data_volumes_file=data_volumes_file
         )

     # update the dataset status
     update_dataset = _update_dateset_status(
-        bucket_name=req_json['bucket_name'],
+        namespace=req_json['namespace'],
         version_data_set_name=req_json['version_data_set_name'],
         reason=task_status,
+        message=error_msg,
         task_id=id,
         log_id=log_id,
         creator=req_json.get('creator'),
@@ -417,9 +419,10 @@ def text_manipulate_retry(

     # Update the dataset status
     update_dataset = _update_dateset_status(
-        bucket_name=task_info_dict.get('namespace'),
+        namespace=task_info_dict.get('namespace'),
         version_data_set_name=task_info_dict.get('pre_version_data_set_name'),
         reason='processing',
+        message='Data processing in progress',
         task_id=task_id,
         log_id=log_id,
         creator=creator,
@@ -515,17 +518,18 @@ def text_manipulate_retry(
         minio_store_client.upload_files_to_minio_with_tags(
             minio_client=minio_client,
             local_folder=file_path + 'final',
-            minio_bucket=task_info_dict.get('bucket_name'),
+            minio_bucket=task_info_dict.get('namespace'),
             minio_prefix=folder_prefix,
             support_type=task_info_dict.get('data_process_config_info'),
             data_volumes_file=data_volumes_file
         )

     # Update the dataset status
     update_dataset = _update_dateset_status(
-        bucket_name=task_info_dict.get('namespace'),
+        namespace=task_info_dict.get('namespace'),
         version_data_set_name=task_info_dict.get('pre_version_data_set_name'),
         reason=task_status,
+        message=error_msg,
         task_id=task_id,
         log_id=log_id,
         creator=creator,
@@ -597,9 +601,10 @@ def _remove_local_file(file_name):
     }

 def _update_dateset_status(
-    bucket_name,
+    namespace,
     version_data_set_name,
     reason,
+    message,
     task_id,
     log_id,
     creator,
@@ -608,21 +613,22 @@ def _update_dateset_status(
     logger.debug(''.join([
         f"{log_tag_const.MINIO_STORE_PROCESS} update dataset status \n",
         f"task_id: {task_id}\n",
-        f"bucket_name: {bucket_name}\n",
+        f"namespace: {namespace}\n",
         f"version_data_set_name: {version_data_set_name}\n",
         f"reason: {reason}"
     ]))
     update_dataset = dataset_cr.update_dataset_k8s_cr(
-        bucket_name=bucket_name,
+        namespace=namespace,
         version_data_set_name=version_data_set_name,
-        reason=reason
+        reason=reason,
+        message=message
     )

     if update_dataset['status'] != 200:
         logger.error(''.join([
             f"{log_tag_const.MINIO_STORE_PROCESS} update dataset status \n",
             f"task_id: {task_id}\n",
-            f"bucket_name: {bucket_name}\n",
+            f"namespace: {namespace}\n",
             f"version_data_set_name: {version_data_set_name}\n",
             f"reason: {reason}"
         ]))
@@ -1019,7 +1025,7 @@ def _text_manipulate_retry_for_document(
         # Download the file to local storage
         minio_store_client.download(
             minio_client,
-            bucket_name=task_info.get('bucket_name'),
+            bucket_name=task_info.get('namespace'),
             folder_prefix=folder_prefix,
             file_name=file_name
         )
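Taken together, the failure path now carries the task's error text into the dataset CR instead of only a bare reason. A minimal runnable sketch of the call as it looks after this change — req_json, task_status, error_msg, task_id, and log_id follow the names in the diff, but the values below are illustrative and _update_dateset_status is stubbed so the snippet runs standalone:

# Hypothetical stand-in for _update_dateset_status, only to make the sketch runnable.
def _update_dateset_status(**kwargs):
    print(kwargs)
    return {'status': 200}

req_json = {
    'namespace': 'abc',                      # illustrative values
    'version_data_set_name': 'dataset1-v2',
    'creator': '',
}
task_status = 'process_fail'                 # assumed failure status value
error_msg = 'QA split failed on page 3'      # assumed error detail

_update_dateset_status(
    namespace=req_json['namespace'],
    version_data_set_name=req_json['version_data_set_name'],
    reason=task_status,
    message=error_msg,                       # new: surfaced on the CR condition
    task_id='task-0001',                     # illustrative ids
    log_id='log-0001',
    creator=req_json.get('creator'),
)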
@@ -120,7 +120,6 @@ def add(
         'file_type': req_json['file_type'],
         'status': 'processing',
         'namespace': req_json['namespace'],
-        'bucket_name': req_json['bucket_name'],
         'pre_data_set_name': req_json['pre_data_set_name'],
         'pre_data_set_version': req_json['pre_data_set_version'],
         'pre_version_data_set_name': req_json['version_data_set_name'],
@@ -144,7 +143,6 @@
             file_type,
             status,
             namespace,
-            bucket_name,
             pre_data_set_name,
             pre_data_set_version,
             file_names,
@@ -166,7 +164,6 @@
             %(file_type)s,
             %(status)s,
             %(namespace)s,
-            %(bucket_name)s,
             %(pre_data_set_name)s,
             %(pre_data_set_version)s,
             %(file_names)s,
@@ -246,7 +243,6 @@ def info_by_id(
             dpt.data_process_config_info,
             dpt.start_datetime,
             dpt.end_datetime,
-            dpt.bucket_name,
             dpt.namespace,
             dpt.pre_version_data_set_name,
             dpt.create_user,
@@ -457,16 +457,21 @@ def query_question_answer_list(

     sql = """
         select
-            id,
-            task_id,
-            document_id,
-            document_chunk_id,
-            file_name,
-            question,
-            answer
-        from public.data_process_task_question_answer_clean
+            dptqa.id,
+            dptqa.task_id,
+            dptqa.document_id,
+            dptqa.document_chunk_id,
+            dptqa.file_name,
+            dptqa.question,
+            dptqa.answer,
+            dptdc.content,
+            dptdc.page_number
+        from public.data_process_task_question_answer dptqa
+        left join public.data_process_task_document_chunk dptdc
+        on
+            dptdc.id = dptqa.document_chunk_id
         where
-            document_id = %(document_id)s
+            dptqa.document_id = %(document_id)s
     """.strip()

     res = postgresql_pool_client.execute_query(pool, sql, params)
@@ -132,11 +132,14 @@ def text_manipulate(
         pool=conn_pool
     )

-    qa_data_dict = [['q', 'a']]
+    qa_data_dict = [['q', 'a', 'file_name', 'page_number', 'chunk_content']]
     for item in qa_list.get('data'):
         qa_data_dict.append([
             item.get('question'),
-            item.get('answer')
+            item.get('answer'),
+            item.get('file_name'),
+            item.get('page_number'),
+            item.get('content')
         ])

     # Save the csv file.
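With the join above feeding this loop, the exported CSV gains three provenance columns alongside each question/answer pair. A small self-contained sketch of the new layout — the column names follow the diff, while the sample record and output path are illustrative, not from the PR:

import csv

# Header row now includes provenance columns for each QA pair.
qa_data_dict = [['q', 'a', 'file_name', 'page_number', 'chunk_content']]

# Illustrative record shaped like one entry of qa_list.get('data').
sample = {
    'question': 'What does Arcadia provide?',
    'answer': 'A cloud-native AGI platform.',
    'file_name': 'intro.pdf',
    'page_number': 3,
    'content': 'Arcadia provides ...',
}
qa_data_dict.append([
    sample.get('question'),
    sample.get('answer'),
    sample.get('file_name'),
    sample.get('page_number'),
    sample.get('content'),
])

# Write the rows out exactly as the service does for its CSV export.
with open('qa.csv', 'w', newline='', encoding='utf-8') as f:
    csv.writer(f).writerows(qa_data_dict)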
data-processing/data_manipulation/kube/dataset_cr.py — 23 changes: 13 additions & 10 deletions
@@ -21,21 +21,22 @@
 logger = logging.getLogger(__name__)

 def update_dataset_k8s_cr(
-    bucket_name,
+    namespace,
     version_data_set_name,
-    reason
+    reason,
+    message
 ):
     """ Update the condition info for the dataset.

-    bucket_name: bucket name;
+    namespace: namespace;
     version_data_set_name: version dataset name;
     reason: the update reason;
     """
     try:
         kube = client.KubeEnv()

         one_cr_datasets = kube.get_versioneddatasets_status(
-            bucket_name,
+            namespace,
             version_data_set_name
         )

@@ -56,18 +57,20 @@ def update_dataset_k8s_cr(
                 'lastTransitionTime': now_utc_str,
                 'reason': reason,
                 'status': "True",
-                "type": "DataProcessing"
+                "type": "DataProcessing",
+                "message": message
             })
         else:
             conditions[found_index] = {
                 'lastTransitionTime': now_utc_str,
                 'reason': reason,
                 'status': "True",
-                "type": "DataProcessing"
+                "type": "DataProcessing",
+                "message": message
             }

         kube.patch_versioneddatasets_status(
-            bucket_name,
+            namespace,
             version_data_set_name,
             {
                 'status': {
@@ -90,20 +93,20 @@
         }

 def get_dataset_status_k8s_cr(
-    bucket_name,
+    namespace,
     version_data_set_name
 ):
     """ get the condition info for the dataset.

-    bucket_name: bucket name;
+    namespace: namespace;
     version_data_set_name: version dataset name;
     """
     try:
         dataset_status = None
         kube = client.KubeEnv()

         one_cr_datasets = kube.get_versioneddatasets_status(
-            bucket_name,
+            namespace,
             version_data_set_name
         )

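The net effect on the VersionedDataset CR is that its DataProcessing condition now carries a human-readable message next to the reason, so a failed task can explain itself. A sketch of the condition entry this code assembles — the field names follow the diff, while the reason/message values and the timestamp format are assumptions for illustration:

from datetime import datetime, timezone

# Condition entry shaped like the one built in update_dataset_k8s_cr.
# The timestamp format is an assumption; reason/message are example values.
now_utc_str = datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')
condition = {
    'lastTransitionTime': now_utc_str,
    'reason': 'process_fail',                # e.g. the task status on failure
    'status': "True",
    'type': "DataProcessing",
    'message': 'QA split failed on page 3'   # new: detail surfaced on the CR
}
print(condition)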
@@ -65,7 +65,7 @@ def add(
         "post_data_set_name": "dataset1",
         "post_data_set_version": "v2",
         "version_data_set_name": "dataset1-v2",
-        "bucket_name": "system-tce",
+        "namespace": "system-tce",
         "file_names": [
             {
                 "name": "数据处理文件_小T.pdf"
data-processing/db-scripts/init-database-schema.sql — 2 changes: 0 additions & 2 deletions

@@ -25,12 +25,10 @@
     update_user character varying(32) COLLATE pg_catalog."default",
     update_program character varying(64) COLLATE pg_catalog."default",
     namespace character varying(64) COLLATE pg_catalog."default",
-    bucket_name character varying(64) COLLATE pg_catalog."default",
     current_log_id character varying(32) COLLATE pg_catalog."default",
     CONSTRAINT data_process_task_pkey PRIMARY KEY (id)
 );

-COMMENT ON COLUMN public.data_process_task.bucket_name IS 'bucket name';
 COMMENT ON COLUMN public.data_process_task.current_log_id IS '当前日志Id';
 COMMENT ON COLUMN public.data_process_task.pre_version_data_set_name IS '处理前数据集版本信息';

deploy/charts/arcadia/Chart.yaml — 2 changes: 1 addition & 1 deletion

@@ -2,7 +2,7 @@ apiVersion: v2
 name: arcadia
 description: A Helm chart(KubeBB Component) for KubeAGI Arcadia
 type: application
-version: 0.2.10
+version: 0.2.11
 appVersion: "0.1.0"

 keywords:
deploy/charts/arcadia/templates/pg-init-data-configmap.yaml — 2 changes: 0 additions & 2 deletions

@@ -29,12 +29,10 @@ data:
       update_user character varying(32) COLLATE pg_catalog."default",
       update_program character varying(64) COLLATE pg_catalog."default",
       namespace character varying(64) COLLATE pg_catalog."default",
-      bucket_name character varying(64) COLLATE pg_catalog."default",
       current_log_id character varying(32) COLLATE pg_catalog."default",
       CONSTRAINT data_process_task_pkey PRIMARY KEY (id)
     );

-    COMMENT ON COLUMN public.data_process_task.bucket_name IS 'bucket name';
     COMMENT ON COLUMN public.data_process_task.current_log_id IS '当前日志Id';
     COMMENT ON COLUMN public.data_process_task.pre_version_data_set_name IS '处理前数据集版本信息';
