From cc220f55edcbf3fc91ba6bd8853b2e9f79635067 Mon Sep 17 00:00:00 2001 From: wangxinbiao <1412146116@qq.com> Date: Thu, 14 Dec 2023 14:14:45 +0800 Subject: [PATCH] fix:optimization of QA splitting --- data-processing/Dockerfile | 26 +- data-processing/README.md | 123 ++++ .../data_manipulation/common/config.py | 273 ++----- .../data_manipulation/common/log_tag_const.py | 2 + data-processing/data_manipulation/config.yml | 36 - .../data_store_process/minio_store_process.py | 28 +- .../data_process_db_operate.py | 2 +- .../file_handle/common_handle.py | 666 ++++++++++++++++++ .../file_handle/pdf_handle.py | 525 +------------- .../file_handle/word_handle.py | 78 ++ .../data_manipulation/kube/client.py | 38 +- .../kube/custom_resources.py | 2 + .../data_manipulation/kube/dataset_cr.py | 53 +- .../data_manipulation/kube/minio_cr.py | 78 ++ .../data_manipulation/kube/model_cr.py | 133 ++++ .../data_manipulation/kube/postgresql_cr.py | 52 ++ .../llm_api_service/qa_provider_open_ai.py | 85 ++- .../qa_provider_zhi_pu_ai_online.py | 85 ++- .../llm_prompt_template/open_ai_prompt.py | 2 +- .../llm_prompt_template/zhi_pu_ai_prompt.py | 2 +- data-processing/data_manipulation/server.py | 34 - .../service/data_process_service.py | 93 ++- .../transform/text/clean_transform.py | 14 +- .../data_manipulation/utils/docx_utils.py | 31 + data-processing/database/base.sql | 2 - data-processing/requirements.txt | 2 + 26 files changed, 1555 insertions(+), 910 deletions(-) create mode 100644 data-processing/README.md delete mode 100644 data-processing/data_manipulation/config.yml create mode 100644 data-processing/data_manipulation/file_handle/common_handle.py create mode 100644 data-processing/data_manipulation/file_handle/word_handle.py create mode 100644 data-processing/data_manipulation/kube/minio_cr.py create mode 100644 data-processing/data_manipulation/kube/model_cr.py create mode 100644 data-processing/data_manipulation/kube/postgresql_cr.py create mode 100644 data-processing/data_manipulation/utils/docx_utils.py diff --git a/data-processing/Dockerfile b/data-processing/Dockerfile index 86ad0ae59..f995bbb19 100644 --- a/data-processing/Dockerfile +++ b/data-processing/Dockerfile @@ -10,35 +10,13 @@ RUN export DEBIAN_FRONTEND=noninteractive \ && ln -fs /usr/share/zoneinfo/Asia/Shanghai /etc/localtime \ && dpkg-reconfigure --frontend noninteractive tzdata \ && apt-get install -y python3-distutils curl python3-pip \ - && apt-get install -y wget + && apt-get install -y wget \ + && apt-get install -y opencc RUN wget https://github.com/explosion/spacy-models/releases/download/zh_core_web_sm-3.5.0/zh_core_web_sm-3.5.0-py3-none-any.whl -O /tmp/zh_core_web_sm-3.5.0-py3-none-any.whl \ && pip3 install /tmp/zh_core_web_sm-3.5.0-py3-none-any.whl -i https://pypi.org/simple \ && rm /tmp/zh_core_web_sm-3.5.0-py3-none-any.whl - -ENV MINIO_ACCESSKEY=minio_accesskey -ENV MINIO_SECRETKEY=minio_secretkey -ENV MINIO_API_URL=localhost:9000 -ENV MINIO_SECURE=False -ENV MINIO_DATASET_PREFIX=dataset - -ENV LLM_USE_TYPE=xxxxx -ENV LLM_QA_RETRY_COUNT=xxxxx -ENV OPEN_AI_DEFAULT_KEY=xxxxx -ENV OPEN_AI_DEFAULT_BASE_URL=xxxxx -ENV OPEN_AI_DEFAULT_MODEL=xxxxx -ENV ZHIPUAI_API_KEY=xxxxx - -ENV KNOWLEDGE_CHUNK_SIZE=500 -ENV KNOWLEDGE_CHUNK_OVERLAP=50 - -ENV PG_HOST=localhost -ENV PG_PORT=5432 -ENV PG_USER=postgres -ENV PG_PASSWORD=xxxxx -ENV PG_DATABASE=data_process - EXPOSE 28888 ADD . 
/arcadia_app/
diff --git a/data-processing/README.md b/data-processing/README.md
new file mode 100644
index 000000000..e6e7aace9
--- /dev/null
+++ b/data-processing/README.md
@@ -0,0 +1,123 @@
+# Data Processing
+
+## Current Version Main Features
+
+Data Processing is used for processing data through MinIO, databases, Web APIs, and other sources. The supported data types include:
+- txt
+- json
+- doc
+- html
+- excel
+- csv
+- pdf
+- markdown
+- ppt
+
+### Current Text Type Processing
+
+Text processing currently includes: cleaning abnormal data, filtering, de-duplication, and anonymization.
+
+## Design
+
+![Design](../assets/data_process.drawio.png)
+
+## Local Development
+### Software Requirements
+
+Before setting up the local data-process environment, please make sure the following software is installed:
+
+- Python 3.10.x
+
+### Environment Setup
+
+Install the Python dependencies listed in the requirements.txt file.
+
+### Running
+
+Run the server.py file in the data_manipulation directory.
+
+# isort
+isort is a tool for sorting imports alphabetically within your Python code. It helps maintain a consistent and clean import order.
+
+## Install
+```shell
+pip install isort
+```
+
+## isort a file
+```shell
+isort server.py
+```
+
+## isort a directory
+```shell
+isort data_manipulation
+```
+
+
+# config.yml
+## dev phase
+An example config.yml looks like the following:
+```yaml
+minio:
+  access_key: '${MINIO_ACCESSKEY: hpU4SCmj5jixxx}'
+  secret_key: '${MINIO_SECRETKEY: xxx}'
+  api_url: '${MINIO_API_URL: 172.22.96.136.nip.io}'
+  secure: '${MINIO_SECURE: True}'
+  dataset_prefix: '${MINIO_DATASET_PREFIX: dataset}'
+
+llm:
+  qa_retry_count: '${LLM_QA_RETRY_COUNT: 100}'
+
+knowledge:
+  chunk_size: '${KNOWLEDGE_CHUNK_SIZE: 500}'
+  chunk_overlap: '${KNOWLEDGE_CHUNK_OVERLAP: 50}'
+
+backendPg:
+  host: '${PG_HOST: localhost}'
+  port: '${PG_PORT: 5432}'
+  user: '${PG_USER: postgres}'
+  password: '${PG_PASSWORD: 123456}'
+  database: '${PG_DATABASE: arcadia}'
+
+kubernetes:
+  default_config: '${DEFAULT_CONFIG: arcadia-config}'
+  pod_namespace: '${POD_NAMESPACE: arcadia}'
+```
+
+\${MINIO_ACCESSKEY: hpU4SCmj5jixxx}
+
+MINIO_ACCESSKEY is the environment variable name.
+
+hpU4SCmj5jixxx is the default value used if the environment variable is not set.
+
+
+## release phase
+An example config.yml looks like the following:
+```yaml
+minio:
+  access_key: 'hpU4SCmj5jixxx'
+  secret_key: 'minio_sk'
+  api_url: '172.22.96.136.nip.io'
+  secure: 'True'
+  dataset_prefix: 'dataset'
+
+llm:
+  qa_retry_count: '100'
+
+knowledge:
+  chunk_size: '500'
+  chunk_overlap: '50'
+
+backendPg:
+  host: 'localhost'
+  port: '5432'
+  user: 'admin'
+  password: '123456'
+  database: 'arcadia'
+
+kubernetes:
+  default_config: 'arcadia-config'
+  pod_namespace: 'arcadia'
+```
+In Kubernetes, you can use a ConfigMap to provide the /arcadia_app/data_manipulation/config.yml file.
diff --git a/data-processing/data_manipulation/common/config.py b/data-processing/data_manipulation/common/config.py
index 4951efda0..58024538d 100644
--- a/data-processing/data_manipulation/common/config.py
+++ b/data-processing/data_manipulation/common/config.py
@@ -15,10 +15,16 @@
 
 import logging
 import os
+from pathlib import Path
 import traceback
-
 import yaml
+
 from utils.class_utils import Singleton
+from kube import (
+    minio_cr,
+    model_cr,
+    postgresql_cr
+)
 
 from . 
import log_tag_const @@ -28,245 +34,76 @@ class Config(metaclass=Singleton): """Configuration class to store the env values.""" - def __init__(self, yaml_file_path='config.yml'): + def __init__(self): logger.debug(f"{log_tag_const.CONFIG} start to load config file.") - yaml_data = self.__get_default_yaml_data() - try: - with open(yaml_file_path, 'r') as file: - # load yaml data - yaml_data = yaml.safe_load(file) - except Exception as ex: - logger.error(''.join([ - f"{log_tag_const.CONFIG} There is an error when load the config " - f"(file path = {yaml_file_path}). \n" - f"{traceback.format_exc()}" - ])) - logger.debug(''.join([ - f"{log_tag_const.CONFIG} The content is config.\n", - f"{yaml_data}\n", - ])) - - self.__set_property_value(yaml_data) - - + self.__set_property_value() - def __get_default_yaml_data(self): - """Get the default yaml data.""" - return { - 'minio': {}, - 'zhipuai': {}, - 'llm': { - 'open_ai': {} - }, - 'knowledge': {}, - 'backendPg': {} - } - def __set_property_value(self, yaml_data): + def __set_property_value(self): """设置属性的值""" - # minio access key - self.minio_access_key = self.__get_value_by_key_in_yaml( - yaml_data, - parent_key='minio', - key='access_key', - env_name='MINIO_ACCESSKEY', - default_value='hpU4SCmj5jiU7IP5' + # kubernetes + # namespace + k8s_pod_namespace = os.getenv('POD_NAMESPACE', 'arcadia') + self.k8s_pod_namespace = k8s_pod_namespace + # config + k8s_default_config = os.getenv('DEFAULT_CONFIG', 'arcadia-config') + self.k8s_default_config = k8s_default_config + + + minio_config = minio_cr.get_minio_config_in_k8s_configmap( + namespace=k8s_pod_namespace, + config_map_name=k8s_default_config ) + if minio_config is None: + minio_config = {} + + # minio access key + self.minio_access_key = minio_config.get('minio_access_key') # minio secret key - self.minio_secret_key = self.__get_value_by_key_in_yaml( - yaml_data, - parent_key='minio', - key='secret_key', - env_name='MINIO_SECRETKEY', - default_value='7AUewBESqvKijdnNskm8nU6emTZ3rG8F' - ) + self.minio_secret_key = minio_config.get('minio_secret_key') # minio api url - self.minio_api_url = self.__get_value_by_key_in_yaml( - yaml_data, - parent_key='minio', - key='api_url', - env_name='MINIO_API_URL', - default_value='kubeagi-minio.172.22.96.136.nip.io' - ) + self.minio_api_url = minio_config.get('minio_api_url') # minio secure # if use HTTP, secure = False; # if use HTTPS, secure = True; - self.minio_secure = self.__get_value_by_key_in_yaml( - yaml_data, - parent_key='minio', - key='secure', - env_name='MINIO_SECURE', - default_value=True - ) + self.minio_secure = minio_config.get('minio_secure') # minio data set prefix - self.minio_dataset_prefix = self.__get_value_by_key_in_yaml( - yaml_data, - parent_key='minio', - key='dataset_prefix', - env_name='MINIO_DATASET_PREFIX', - default_value='dataset' - ) + self.minio_dataset_prefix = 'dataset' - # zhi pu ai - # api key - self.zhipuai_api_key = self.__get_value_by_key_in_yaml( - yaml_data, - parent_key='zhipuai', - key='api_key', - env_name='ZHIPUAI_API_KEY', - default_value='871772ac03fcb9db9d4ce7b1e6eea210.VZZVy0mCox0WrzQI' - ) - - # llm - # use type such as zhipuai_online or open_ai - self.llm_use_type = self.__get_value_by_key_in_yaml( - yaml_data, - parent_key='llm', - key='use_type', - env_name='LLM_USE_TYPE', - default_value='zhipuai_online' - ) - - self.llm_qa_retry_count = self.__get_value_by_key_in_yaml( - yaml_data, - parent_key='llm', - key='qa_retry_count', - env_name='LLM_QA_RETRY_COUNT', - default_value=100 - ) + llm_qa_retry_count = 
model_cr.get_llm_qa_retry_count_in_k8s_configmap( + namespace=k8s_pod_namespace, + config_map_name=k8s_default_config + ) + + if llm_qa_retry_count is None: + llm_qa_retry_count = 5 - # open ai - # key - self.open_ai_default_key = self.__get_value_by_key_in_yaml( - yaml_data, - parent_key='open_ai', - key='key', - env_name='OPEN_AI_DEFAULT_KEY', - default_value='happy' - ) - # base url - self.open_ai_default_base_url = self.__get_value_by_key_in_yaml( - yaml_data, - parent_key='open_ai', - key='base_url', - env_name='OPEN_AI_DEFAULT_BASE_URL', - default_value='http://arcadia-fastchat.172.22.96.167.nip.io/v1' - ) - # default model - self.open_ai_default_model = self.__get_value_by_key_in_yaml( - yaml_data, - parent_key='open_ai', - key='model', - env_name='OPEN_AI_DEFAULT_MODEL', - default_value='baichuan2-7b-worker-baichuan-sample-playground' - ) + self.llm_qa_retry_count = int(llm_qa_retry_count) # knowledge # chunk size - self.knowledge_chunk_size = self.__get_value_by_key_in_yaml( - yaml_data, - parent_key='knowledge', - key='chunk_size', - env_name='KNOWLEDGE_CHUNK_SIZE', - default_value=500 - ) + self.knowledge_chunk_size = 500 # chunk overlap - self.knowledge_chunk_overlap = self.__get_value_by_key_in_yaml( - yaml_data, - parent_key='knowledge', - key='chunk_overlap', - env_name='KNOWLEDGE_CHUNK_OVERLAP', - default_value=50 - ) - + self.knowledge_chunk_overlap = 50 + # backend PostgreSQL + postgresql_config = postgresql_cr.get_postgresql_config_in_k8s_configmap( + namespace=k8s_pod_namespace, + config_map_name=k8s_default_config + ) + if postgresql_config is None: + postgresql_config = {} + # host - self.pg_host = self.__get_value_by_key_in_yaml( - yaml_data, - parent_key='backendPg', - key='host', - env_name='PG_HOST', - default_value='localhost' - ) + self.pg_host = postgresql_config.get('host') # port - self.pg_port = self.__get_value_by_key_in_yaml( - yaml_data, - parent_key='backendPg', - key='port', - env_name='PG_HOST', - default_value=5432 - ) + self.pg_port = postgresql_config.get('port') # user - self.pg_user = self.__get_value_by_key_in_yaml( - yaml_data, - parent_key='backendPg', - key='user', - env_name='PG_USER', - default_value='postgres' - ) - # password - self.pg_password = self.__get_value_by_key_in_yaml( - yaml_data, - parent_key='backendPg', - key='password', - env_name='PG_PASSWORD', - default_value='123456' - ) + self.pg_user = postgresql_config.get('user') + # password + self.pg_password = postgresql_config.get('password') # database name - self.pg_database = self.__get_value_by_key_in_yaml( - yaml_data, - parent_key='backendPg', - key='database', - env_name='PG_DATABASE', - default_value='arcadia' - ) - - - def __get_value_by_key_in_yaml( - self, - config_json, - parent_key, - key, - env_name, - default_value - ): - """Get the value by key int the yaml file. - - Parameters - ---------- - config_json - the config json - parent_key - the parent key. - env_name - the environment variable name. - default_value: - the default value. 
- """ - value = config_json[parent_key].get(key) - if value is None: - value = os.getenv(env_name, default_value) - else: - if value.startswith('${'): - values_in_yaml = value.split(': ') - if len(values_in_yaml) == 2: - env_name_in_yaml = values_in_yaml[0].strip()[2:] - default_value_in_yaml = values_in_yaml[1].strip()[:-1] - - value = os.getenv(env_name_in_yaml, default_value_in_yaml) - - return value + self.pg_database = postgresql_config.get('database') - config = Config() - - - - - - - - - - diff --git a/data-processing/data_manipulation/common/log_tag_const.py b/data-processing/data_manipulation/common/log_tag_const.py index 7ddb22966..ba80ad2cb 100644 --- a/data-processing/data_manipulation/common/log_tag_const.py +++ b/data-processing/data_manipulation/common/log_tag_const.py @@ -24,8 +24,10 @@ DATA_PROCESS_DETAIL = "Data Process Detail" +COMMON_HANDLE = "Common Handle" PDF_HANDLE = "PDF Handle" CSV_HANDLE = "CSV Handle" +WORD_HANDLE = "Word Handle" QA_SPLIT = "Question Answer Split" CLEAN_TRANSFORM = "Clean Transform" diff --git a/data-processing/data_manipulation/config.yml b/data-processing/data_manipulation/config.yml deleted file mode 100644 index a224b8128..000000000 --- a/data-processing/data_manipulation/config.yml +++ /dev/null @@ -1,36 +0,0 @@ -minio: - access_key: '${MINIO_ACCESSKEY: hpU4SCmj5jiU7IP5}' - secret_key: '${MINIO_SECRETKEY: 7AUewBESqvKijdnNskm8nU6emTZ3rG8F}' - api_url: '${MINIO_API_URL: kubeagi-minio.172.22.96.136.nip.io}' - secure: '${MINIO_SECURE: True}' - dataset_prefix: '${MINIO_DATASET_PREFIX: dataset}' - -zhipuai: - api_key: '${ZHIPUAI_API_KEY: 871772ac03fcb9db9d4ce7b1e6eea210.VZZVy0mCox0WrzQI}' - -llm: - use_type: '${LLM_USE_TYPE: open_ai}' # zhi_pu_online or open_ai - qa_retry_count: '${LLM_QA_RETRY_COUNT: 100}' - -open_ai: - key: '${OPEN_AI_DEFAULT_KEY: fake}' - base_url: '${OPEN_AI_DEFAULT_BASE_URL: http://arcadia-fastchat.172.22.96.167.nip.io/v1}' - model: '${OPEN_AI_DEFAULT_MODEL_NAME: 3d407a8b-90ab-43c4-9fc0-87e533368570}' - - - -knowledge: - chunk_size: '${KNOWLEDGE_CHUNK_SIZE: 500}' - chunk_overlap: '${KNOWLEDGE_CHUNK_OVERLAP: 50}' - -backendPg: - host: '${PG_HOST: localhost}' - port: '${PG_PORT: 5432}' - user: '${PG_USER: postgres}' - password: '${PG_PASSWORD: 123456}' - database: '${PG_DATABASE: data_process}' - - - - - diff --git a/data-processing/data_manipulation/data_store_process/minio_store_process.py b/data-processing/data_manipulation/data_store_process/minio_store_process.py index 1cee6ca5f..ac00e3942 100644 --- a/data-processing/data_manipulation/data_store_process/minio_store_process.py +++ b/data-processing/data_manipulation/data_store_process/minio_store_process.py @@ -22,7 +22,7 @@ from common.config import config from data_store_clients import minio_store_client from database_operate import data_process_db_operate -from file_handle import csv_handle, pdf_handle +from file_handle import csv_handle, pdf_handle, word_handle from kube import dataset_cr from utils import file_utils @@ -65,10 +65,13 @@ def text_manipulate( ) # 文件处理 + task_status = 'process_complete' # 存放每个文件对应的数据量 data_volumes_file = [] for item in file_names: + result = [] + file_name = item['name'] file_extension = file_name.split('.')[-1].lower() if file_extension in ['csv']: @@ -89,6 +92,23 @@ def text_manipulate( task_id=id, create_user=req_json['creator'] ) + + elif file_extension in ['docx']: + # 处理.docx文件 + result = word_handle.docx_text_manipulate( + chunk_size=req_json.get('chunk_size'), + chunk_overlap=req_json.get('chunk_overlap'), + file_name=file_name, 
+                support_type=support_type,
+                conn_pool=pool,
+                task_id=id,
+                create_user=req_json['creator']
+            )
+
+        if result.get('status') != 200:
+            # the task failed
+            task_status = 'process_fail'
+            break
 
         data_volumes_file.append(result['data'])
 
@@ -113,8 +133,8 @@
     # update the task status in the database
     update_params = {
         'id': id,
-        'status': 'process_complete',
-        'create_user': req_json['creator']
+        'status': task_status,
+        'user': req_json['creator']
     }
     data_process_db_operate.update_status_by_id(
         update_params,
@@ -125,7 +145,7 @@
     dataset_cr.update_dataset_k8s_cr(
         bucket_name=req_json['bucket_name'],
         version_data_set_name=req_json['version_data_set_name'],
-        reason='process_complete'
+        reason=task_status
     )
 
     return {
diff --git a/data-processing/data_manipulation/database_operate/data_process_db_operate.py b/data-processing/data_manipulation/database_operate/data_process_db_operate.py
index 0ebee14ae..bc5de39d7 100644
--- a/data-processing/data_manipulation/database_operate/data_process_db_operate.py
+++ b/data-processing/data_manipulation/database_operate/data_process_db_operate.py
@@ -183,7 +183,7 @@
 ):
     """Update the status with id"""
     now = date_time_utils.now_str()
-    user = req_json['create_user']
+    user = req_json['user']
     program = '修改任务状态'
 
     params = {
diff --git a/data-processing/data_manipulation/file_handle/common_handle.py b/data-processing/data_manipulation/file_handle/common_handle.py
new file mode 100644
index 000000000..14697f00b
--- /dev/null
+++ b/data-processing/data_manipulation/file_handle/common_handle.py
@@ -0,0 +1,666 @@
+# Copyright 2023 KubeAGI.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import logging
+import os
+import traceback
+import base64
+
+import pandas as pd
+import ulid
+from common import log_tag_const
+from common.config import config
+from database_operate import data_process_detail_db_operate
+from langchain.text_splitter import SpacyTextSplitter
+from llm_api_service.qa_provider_open_ai import QAProviderOpenAI
+from llm_api_service.qa_provider_zhi_pu_ai_online import QAProviderZhiPuAIOnline
+from transform.text import clean_transform, privacy_transform
+from utils import csv_utils, file_utils, docx_utils
+from kube import model_cr
+
+logger = logging.getLogger(__name__)
+
+
+def text_manipulate(
+    file_name,
+    content,
+    support_type,
+    conn_pool,
+    task_id,
+    create_user,
+    chunk_size,
+    chunk_overlap
+):
+    """Manipulate the text content.
+
+    file_name: file name;
+    content: text content;
+    support_type: support type;
+    conn_pool: database connection pool;
+    task_id: data process task id;
+    create_user: creator of the task;
+    chunk_size: chunk size;
+    chunk_overlap: chunk overlap;
+    """
+
+    logger.debug(f"{log_tag_const.COMMON_HANDLE} Start to manipulate the text")
+
+    try:
+        support_type_map = _convert_support_type_to_map(support_type)
+
+        # Clean the data such as removing invisible characters. 
+        clean_result = _data_clean(
+            support_type_map=support_type_map,
+            file_name=file_name,
+            data=content,
+            conn_pool=conn_pool,
+            task_id=task_id,
+            create_user=create_user
+        )
+
+        if clean_result['status'] == 200:
+            content = clean_result['data']
+
+        # Remove the privacy info such as removing email.
+        clean_result = _remove_privacy_info(
+            support_type_map=support_type_map,
+            file_name=file_name,
+            data=content,
+            conn_pool=conn_pool,
+            task_id=task_id,
+            create_user=create_user
+        )
+
+        if clean_result['status'] == 200:
+            content = clean_result['data']
+
+
+        # data volume
+        object_count = 0
+        object_name = ''
+        if support_type_map.get('qa_split'):
+            logger.debug(f"{log_tag_const.QA_SPLIT} Start to split QA.")
+            qa_list_dict = support_type_map.get('qa_split')
+            llm_config = qa_list_dict.get('llm_config')
+
+            qa_response = _generate_qa_list(
+                chunk_size=chunk_size,
+                chunk_overlap=chunk_overlap,
+                data=content,
+                name=llm_config.get('name'),
+                namespace=llm_config.get('namespace'),
+                model=llm_config.get('model')
+            )
+
+            if qa_response.get('status') != 200:
+                return qa_response
+
+            logger.debug(f"{log_tag_const.QA_SPLIT} The QA data is: \n{qa_response}\n")
+
+            # start to insert qa data
+            qa_data = qa_response.get('data')
+            for i in range(len(qa_data)):
+                if i == 0:
+                    continue
+                qa_insert_item = {
+                    'id': ulid.ulid(),
+                    'task_id': task_id,
+                    'file_name': file_name,
+                    'question': qa_data[i][0],
+                    'answer': qa_data[i][1],
+                    'create_user': create_user
+                }
+
+                data_process_detail_db_operate.insert_question_answer_info(
+                    qa_insert_item,
+                    pool=conn_pool
+                )
+
+            # Save the csv file.
+            file_name_without_extension = file_name.rsplit('.', 1)[0] + '_final'
+            csv_utils.save_csv(
+                file_name=file_name_without_extension + '.csv',
+                phase_value='final',
+                data=qa_data
+            )
+
+            object_name = file_name_without_extension + '.csv'
+            # subtract 1 to exclude the header row
+            object_count = len(qa_data) - 1
+
+            logger.debug(f"{log_tag_const.QA_SPLIT} Finish splitting QA.")
+
+        logger.debug(f"{log_tag_const.COMMON_HANDLE} Finish manipulating the text")
+        return {
+            'status': 200,
+            'message': '',
+            'data': {
+                'object_name': object_name,
+                'object_count': object_count
+            }
+        }
+    except Exception as ex:
+        logger.error(''.join([
+            f"{log_tag_const.COMMON_HANDLE} There is an error when manipulating ",
+            f"the text in the common handler. \n{traceback.format_exc()}"
+        ]))
+        logger.debug(f"{log_tag_const.COMMON_HANDLE} Finish manipulating the text")
+        return {
+            'status': 400,
+            'message': str(ex),
+            'data': traceback.format_exc()
+        }
+
+def _data_clean(
+    support_type_map,
+    data,
+    task_id,
+    file_name,
+    create_user,
+    conn_pool
+):
+    """Clean the data. 
+ + support_type_map: example + { + "qa_split": { + "type": "qa_split", + "name": "xx", + "namespace": "xx" + }, + "remove_invisible_characters": { + "type": "remove_invisible_characters" + }, + "space_standardization": { + "type": "space_standardization" + }, + "remove_email": { + "type": "remove_email" + } + } + data: data; + file_name: file name; + conn_pool: database connection pool; + task_id: data process task id; + """ + # remove invisible characters + if support_type_map.get('remove_invisible_characters'): + result = clean_transform.remove_invisible_characters( + text=data + ) + if result['status'] == 200: + clean_data = result['data']['clean_data'] + if len(clean_data) > 0: + for item in clean_data: + task_detail_item = { + 'id': ulid.ulid(), + 'task_id': task_id, + 'file_name': file_name, + 'transform_type': 'remove_invisible_characters', + 'pre_content': item['pre_content'], + 'post_content': item['post_content'], + 'create_user': create_user + } + data_process_detail_db_operate.insert_transform_info( + task_detail_item, + pool=conn_pool + ) + data = result['data']['text'] + + + # process for space standardization + if support_type_map.get('space_standardization'): + result = clean_transform.space_standardization( + text=data + ) + if result['status'] == 200: + clean_data = result['data']['clean_data'] + if len(clean_data) > 0: + for item in clean_data: + task_detail_item = { + 'id': ulid.ulid(), + 'task_id': task_id, + 'file_name': file_name, + 'transform_type': 'space_standardization', + 'pre_content': item['pre_content'], + 'post_content': item['post_content'], + 'create_user': create_user + } + data_process_detail_db_operate.insert_transform_info( + task_detail_item, + pool=conn_pool + ) + data = result['data']['text'] + + + # process for remove garbled text + if support_type_map.get('remove_garbled_text'): + result = clean_transform.remove_garbled_text( + text=data + ) + if result['status'] == 200: + if result['data']['found'] > 0: + task_detail_item = { + 'id': ulid.ulid(), + 'task_id': task_id, + 'file_name': file_name, + 'transform_type': 'remove_garbled_text', + 'pre_content': data, + 'post_content': result['data']['text'], + 'create_user': create_user + } + data_process_detail_db_operate.insert_transform_info( + task_detail_item, + pool=conn_pool + ) + data = result['data']['text'] + + + # process for Traditional Chinese to Simplified Chinese + if support_type_map.get('traditional_to_simplified'): + result = clean_transform.traditional_to_simplified( + text=data + ) + if result['status'] == 200: + if result['data']['found'] > 0: + task_detail_item = { + 'id': ulid.ulid(), + 'task_id': task_id, + 'file_name': file_name, + 'transform_type': 'traditional_to_simplified', + 'pre_content': data, + 'post_content': result['data']['text'], + 'create_user': create_user + } + data_process_detail_db_operate.insert_transform_info( + task_detail_item, + pool=conn_pool + ) + data = result['data']['text'] + + + # process for clean html code in text samples + if support_type_map.get('remove_html_tag'): + result = clean_transform.remove_html_tag( + text=data + ) + if result['status'] == 200: + if result['data']['found'] > 0: + task_detail_item = { + 'id': ulid.ulid(), + 'task_id': task_id, + 'file_name': file_name, + 'transform_type': 'remove_html_tag', + 'pre_content': data, + 'post_content': result['data']['text'], + 'create_user': create_user + } + data_process_detail_db_operate.insert_transform_info( + task_detail_item, + pool=conn_pool + ) + data = result['data']['text'] + + + # 
process for remove emojis + if support_type_map.get('remove_emojis'): + result = clean_transform.remove_emojis( + text=data + ) + if result['status'] == 200: + clean_data = result['data']['clean_data'] + if len(clean_data) > 0: + for item in clean_data: + task_detail_item = { + 'id': ulid.ulid(), + 'task_id': task_id, + 'file_name': file_name, + 'transform_type': 'remove_emojis', + 'pre_content': item['pre_content'], + 'post_content': item['post_content'], + 'create_user': create_user + } + data_process_detail_db_operate.insert_transform_info( + task_detail_item, + pool=conn_pool + ) + data = result['data']['text'] + + return { + 'status': 200, + 'message': '', + 'data': data + } + + +def _remove_privacy_info( + support_type_map, + data, + task_id, + file_name, + create_user, + conn_pool +): + """"Remove the privacy info such as removing email. + + support_type_map: example + { + "qa_split": { + "type": "qa_split", + "name": "xx", + "namespace": "xx" + }, + "remove_invisible_characters": { + "type": "remove_invisible_characters" + }, + "space_standardization": { + "type": "space_standardization" + }, + "remove_email": { + "type": "remove_email" + } + } + data: data; + file_name: file name; + conn_pool: database connection pool; + task_id: data process task id; + """ + # remove email + if support_type_map.get('remove_email'): + result = privacy_transform.remove_email( + text=data + ) + if result['status'] == 200: + clean_data = result['data']['clean_data'] + if len(clean_data) > 0: + for item in clean_data: + task_detail_item = { + 'id': ulid.ulid(), + 'task_id': task_id, + 'file_name': file_name, + 'transform_type': 'remove_email', + 'pre_content': item['pre_content'], + 'post_content': item['post_content'], + 'create_user': create_user + } + data_process_detail_db_operate.insert_transform_info( + task_detail_item, + pool=conn_pool + ) + data = result['data']['text'] + + + # remove ip addresses + if support_type_map.get('remove_ip_address'): + result = privacy_transform.remove_ip_address( + text=data + ) + if result['status'] == 200: + clean_data = result['data']['clean_data'] + if len(clean_data) > 0: + for item in clean_data: + task_detail_item = { + 'id': ulid.ulid(), + 'task_id': task_id, + 'file_name': file_name, + 'transform_type': 'remove_ip_address', + 'pre_content': item['pre_content'], + 'post_content': item['post_content'], + 'create_user': create_user + } + data_process_detail_db_operate.insert_transform_info( + task_detail_item, + pool=conn_pool + ) + data = result['data']['text'] + + # remove number + if support_type_map.get('remove_number'): + # remove phone + result = privacy_transform.remove_phone( + text=data + ) + if result['status'] == 200: + clean_data = result['data']['clean_data'] + if len(clean_data) > 0: + for item in clean_data: + task_detail_item = { + 'id': ulid.ulid(), + 'task_id': task_id, + 'file_name': file_name, + 'transform_type': 'remove_number', + 'pre_content': item['pre_content'], + 'post_content': item['post_content'], + 'create_user': create_user + } + data_process_detail_db_operate.insert_transform_info( + task_detail_item, + pool=conn_pool + ) + data = result['data']['text'] + + # remove id card + result = privacy_transform.remove_id_card( + text=data + ) + if result['status'] == 200: + clean_data = result['data']['clean_data'] + if len(clean_data) > 0: + for item in clean_data: + task_detail_item = { + 'id': ulid.ulid(), + 'task_id': task_id, + 'file_name': file_name, + 'transform_type': 'remove_number', + 'pre_content': item['pre_content'], + 
'post_content': item['post_content'],
+                        'create_user': create_user
+                    }
+                    data_process_detail_db_operate.insert_transform_info(
+                        task_detail_item,
+                        pool=conn_pool
+                    )
+            data = result['data']['text']
+
+        # remove weixin
+        result = privacy_transform.remove_weixin(
+            text=data
+        )
+        if result['status'] == 200:
+            clean_data = result['data']['clean_data']
+            if len(clean_data) > 0:
+                for item in clean_data:
+                    task_detail_item = {
+                        'id': ulid.ulid(),
+                        'task_id': task_id,
+                        'file_name': file_name,
+                        'transform_type': 'remove_number',
+                        'pre_content': item['pre_content'],
+                        'post_content': item['post_content'],
+                        'create_user': create_user
+                    }
+                    data_process_detail_db_operate.insert_transform_info(
+                        task_detail_item,
+                        pool=conn_pool
+                    )
+            data = result['data']['text']
+
+        # remove bank card
+        result = privacy_transform.remove_bank_card(
+            text=data
+        )
+        if result['status'] == 200:
+            clean_data = result['data']['clean_data']
+            if len(clean_data) > 0:
+                for item in clean_data:
+                    task_detail_item = {
+                        'id': ulid.ulid(),
+                        'task_id': task_id,
+                        'file_name': file_name,
+                        'transform_type': 'remove_number',
+                        'pre_content': item['pre_content'],
+                        'post_content': item['post_content'],
+                        'create_user': create_user
+                    }
+                    data_process_detail_db_operate.insert_transform_info(
+                        task_detail_item,
+                        pool=conn_pool
+                    )
+            data = result['data']['text']
+
+    return {
+        'status': 200,
+        'message': '',
+        'data': data
+    }
+
+
+
+def _generate_qa_list(
+    chunk_size,
+    chunk_overlap,
+    data,
+    name,
+    namespace,
+    model
+):
+    """Generate the Question and Answer list.
+
+    chunk_size: chunk size;
+    chunk_overlap: chunk overlap;
+    data: the text used to generate QA;
+    name: llms cr name;
+    namespace: llms cr namespace;
+    model: model id or model version;
+    """
+    # Split the text.
+    if chunk_size is None:
+        chunk_size = config.knowledge_chunk_size
+
+    if chunk_overlap is None:
+        chunk_overlap = config.knowledge_chunk_overlap
+
+    text_splitter = SpacyTextSplitter(
+        separator="\n\n",
+        pipeline="zh_core_web_sm",
+        chunk_size=int(chunk_size),
+        chunk_overlap=int(chunk_overlap)
+    )
+    texts = text_splitter.split_text(data)
+
+    logger.debug(''.join([
+        f"original text is: \n{data}\n",
+        f"split text is: \n{texts}\n"
+    ]))
+
+    # model info in the llms cr
+    llm_spec_info = model_cr.get_spec_for_llms_k8s_cr(
+        name=name,
+        namespace=namespace
+    )
+
+    # Generate the QA list. 
+    qa_list = [['q', 'a']]
+    if llm_spec_info.get('data').get('provider').get('worker'):
+        # get the base url from the configmap
+        base_url = model_cr.get_worker_base_url_k8s_configmap(
+            name=config.k8s_default_config,
+            namespace=config.k8s_pod_namespace
+        )
+        logger.debug(''.join([
+            f"Generate the QA list \n",
+            f"name: {name}\n",
+            f"namespace: {namespace}\n",
+            f"model: {model}\n",
+            f"base_url: {base_url}\n"
+        ]))
+
+        # generate the QA list
+        qa_provider = QAProviderOpenAI(
+            api_key='fake',
+            base_url=base_url,
+            model=model
+        )
+        for item in texts:
+            text = item.replace("\n", "")
+            data = qa_provider.generate_qa_list(text)
+
+            if data.get('status') != 200:
+                return data
+
+            qa_list.extend(data.get('data'))
+    else:
+        endpoint = llm_spec_info.get('data').get('provider').get('endpoint')
+        base_url = endpoint.get('url')
+        secret_name = endpoint.get('authSecret').get('name')
+
+        # get the api key from the secret
+        secret_info = model_cr.get_secret_info(
+            name=secret_name,
+            namespace=namespace
+        )
+        api_key = secret_info.get('apiKey')
+
+        llm_type = llm_spec_info.get('data').get('type')
+        if llm_type == 'zhipuai':
+
+            zhipuai_api_key = base64.b64decode(api_key).decode('utf-8')
+            qa_provider = QAProviderZhiPuAIOnline(api_key=zhipuai_api_key)
+
+            # generate the QA list
+            for item in texts:
+                text = item.replace("\n", "")
+                data = qa_provider.generate_qa_list(text)
+                if data.get('status') != 200:
+                    return data
+
+                qa_list.extend(data.get('data'))
+        else:
+            return {
+                'status': 1000,
+                'message': '暂时不支持该类型的模型',
+                'data': ''
+            }
+
+    return {
+        'status': 200,
+        'message': '',
+        'data': qa_list
+    }
+
+
+def _convert_support_type_to_map(support_type):
+    """Convert support type to map.
+
+    support_type: support type list
+    example
+    [
+        {
+            "type": "qa_split"
+        },
+        {
+            "type": "remove_invisible_characters"
+        },
+        {
+            "type": "space_standardization"
+        },
+        {
+            "type": "remove_email"
+        }
+    ]
+    """
+    result = {}
+    for item in support_type:
+        result[item['type']] = item
+
+    return result
\ No newline at end of file
diff --git a/data-processing/data_manipulation/file_handle/pdf_handle.py b/data-processing/data_manipulation/file_handle/pdf_handle.py
index 423d248b6..ba19959d9 100644
--- a/data-processing/data_manipulation/file_handle/pdf_handle.py
+++ b/data-processing/data_manipulation/file_handle/pdf_handle.py
@@ -14,20 +14,11 @@
 
 
 import logging
-import os
 import traceback
 
-import pandas as pd
-import ulid
 from common import log_tag_const
-from common.config import config
-from database_operate import data_process_detail_db_operate
-from langchain.text_splitter import SpacyTextSplitter
-from llm_api_service.qa_provider_open_ai import QAProviderOpenAI
-from llm_api_service.qa_provider_zhi_pu_ai_online import \
-    QAProviderZhiPuAIOnline
-from transform.text import clean_transform, privacy_transform
-from utils import csv_utils, file_utils, pdf_utils
+from file_handle import common_handle
+from utils import file_utils, pdf_utils
 
 logger = logging.getLogger(__name__)
 
@@ -62,93 +53,18 @@
         content = pdf_utils.get_content(file_path)
         logger.debug(f"{log_tag_const.PDF_HANDLE} The pdf content is\n {content}")
 
-        support_type_map = _convert_support_type_to_map(support_type)
-
-        # step 2
-        # Clean the data such as removing invisible characters. 
- clean_result = _data_clean( - support_type_map=support_type_map, - file_name=file_name, - data=content, - conn_pool=conn_pool, - task_id=task_id, - create_user=create_user - ) - - if clean_result['status'] == 200: - content = clean_result['data'] - - # step 3 - # Remove the privacy info such as removing email. - clean_result = _remove_privacy_info( - support_type_map=support_type_map, + response = common_handle.text_manipulate( file_name=file_name, - data=content, + content=content, + support_type=support_type, conn_pool=conn_pool, task_id=task_id, - create_user=create_user + create_user=create_user, + chunk_size=chunk_size, + chunk_overlap=chunk_overlap ) - if clean_result['status'] == 200: - content = clean_result['data'] - - - - # 数据量 - object_count = 0 - object_name = '' - if support_type_map.get('qa_split'): - logger.debug(f"{log_tag_const.QA_SPLIT} Start to split QA.") - - qa_data = _generate_qa_list( - chunk_size=chunk_size, - chunk_overlap=chunk_overlap, - data=content - ) - - logger.debug(f"{log_tag_const.QA_SPLIT} The QA data is: \n{qa_data}\n") - - # start to insert qa data - for i in range(len(qa_data)): - if i == 0: - continue - qa_insert_item = { - 'id': ulid.ulid(), - 'task_id': task_id, - 'file_name': file_name, - 'question': qa_data[i][0], - 'answer': qa_data[i][1], - 'create_user': create_user - } - - data_process_detail_db_operate.insert_question_answer_info( - qa_insert_item, - pool=conn_pool - ) - - # Save the csv file. - file_name_without_extension = file_name.rsplit('.', 1)[0] + '_final' - csv_utils.save_csv( - file_name=file_name_without_extension + '.csv', - phase_value='final', - data=qa_data - ) - - object_name = file_name_without_extension + '.csv' - # 减 1 是为了去除表头 - object_count = len(qa_data) - 1 - - logger.debug(f"{log_tag_const.QA_SPLIT} Finish splitting QA.") - - logger.debug(f"{log_tag_const.PDF_HANDLE} Finish manipulating the text in pdf") - return { - 'status': 200, - 'message': '', - 'data': { - 'object_name': object_name, - 'object_count': object_count - } - } + return response except Exception as ex: logger.error(''.join([ f"{log_tag_const.PDF_HANDLE} There is an error when manipulate ", @@ -160,426 +76,3 @@ def text_manipulate( 'message': str(ex), 'data': traceback.format_exc() } - -def _data_clean( - support_type_map, - data, - task_id, - file_name, - create_user, - conn_pool -): - """Clean the data. 
- - support_type_map: example - { - "qa_split": 1, - "remove_invisible_characters": 1, - "space_standardization": 1, - "remove_email": 1 - } - data: data; - file_name: file name; - conn_pool: database connection pool; - task_id: data process task id; - """ - # remove invisible characters - if support_type_map.get('remove_invisible_characters'): - result = clean_transform.remove_invisible_characters( - text=data - ) - if result['status'] == 200: - clean_data = result['data']['clean_data'] - if len(clean_data) > 0: - for item in clean_data: - task_detail_item = { - 'id': ulid.ulid(), - 'task_id': task_id, - 'file_name': file_name, - 'transform_type': 'remove_invisible_characters', - 'pre_content': item['pre_content'], - 'post_content': item['post_content'], - 'create_user': create_user - } - data_process_detail_db_operate.insert_transform_info( - task_detail_item, - pool=conn_pool - ) - data = result['data']['text'] - - - # process for space standardization - if support_type_map.get('space_standardization'): - result = clean_transform.space_standardization( - text=data - ) - if result['status'] == 200: - clean_data = result['data']['clean_data'] - if len(clean_data) > 0: - for item in clean_data: - task_detail_item = { - 'id': ulid.ulid(), - 'task_id': task_id, - 'file_name': file_name, - 'transform_type': 'space_standardization', - 'pre_content': item['pre_content'], - 'post_content': item['post_content'], - 'create_user': create_user - } - data_process_detail_db_operate.insert_transform_info( - task_detail_item, - pool=conn_pool - ) - data = result['data']['text'] - - - # process for remove garbled text - if support_type_map.get('remove_garbled_text'): - result = clean_transform.remove_garbled_text( - text=data - ) - if result['status'] == 200: - if result['data']['found'] > 0: - task_detail_item = { - 'id': ulid.ulid(), - 'task_id': task_id, - 'file_name': file_name, - 'transform_type': 'remove_garbled_text', - 'pre_content': data, - 'post_content': result['data']['text'], - 'create_user': create_user - } - data_process_detail_db_operate.insert_transform_info( - task_detail_item, - pool=conn_pool - ) - data = result['data']['text'] - - - # process for Traditional Chinese to Simplified Chinese - if support_type_map.get('traditional_to_simplified'): - result = clean_transform.traditional_to_simplified( - text=data - ) - if result['status'] == 200: - if result['data']['found'] > 0: - task_detail_item = { - 'id': ulid.ulid(), - 'task_id': task_id, - 'file_name': file_name, - 'transform_type': 'traditional_to_simplified', - 'pre_content': data, - 'post_content': result['data']['text'], - 'create_user': create_user - } - data_process_detail_db_operate.insert_transform_info( - task_detail_item, - pool=conn_pool - ) - data = result['data']['text'] - - - # process for clean html code in text samples - if support_type_map.get('remove_html_tag'): - result = clean_transform.remove_html_tag( - text=data - ) - if result['status'] == 200: - if result['data']['found'] > 0: - task_detail_item = { - 'id': ulid.ulid(), - 'task_id': task_id, - 'file_name': file_name, - 'transform_type': 'remove_html_tag', - 'pre_content': data, - 'post_content': result['data']['text'], - 'create_user': create_user - } - data_process_detail_db_operate.insert_transform_info( - task_detail_item, - pool=conn_pool - ) - data = result['data']['text'] - - - # process for remove emojis - if support_type_map.get('remove_emojis'): - result = clean_transform.remove_emojis( - text=data - ) - if result['status'] == 200: - clean_data = 
result['data']['clean_data'] - if len(clean_data) > 0: - for item in clean_data: - task_detail_item = { - 'id': ulid.ulid(), - 'task_id': task_id, - 'file_name': file_name, - 'transform_type': 'remove_emojis', - 'pre_content': item['pre_content'], - 'post_content': item['post_content'], - 'create_user': create_user - } - data_process_detail_db_operate.insert_transform_info( - task_detail_item, - pool=conn_pool - ) - data = result['data']['text'] - - return { - 'status': 200, - 'message': '', - 'data': data - } - - -def _remove_privacy_info( - support_type_map, - data, - task_id, - file_name, - create_user, - conn_pool -): - """"Remove the privacy info such as removing email. - - support_type_map: example - { - "qa_split": 1, - "remove_invisible_characters": 1, - "space_standardization": 1, - "remove_email": 1 - } - data: data; - file_name: file name; - conn_pool: database connection pool; - task_id: data process task id; - """ - # remove email - if support_type_map.get('remove_email'): - result = privacy_transform.remove_email( - text=data - ) - if result['status'] == 200: - clean_data = result['data']['clean_data'] - if len(clean_data) > 0: - for item in clean_data: - task_detail_item = { - 'id': ulid.ulid(), - 'task_id': task_id, - 'file_name': file_name, - 'transform_type': 'remove_email', - 'pre_content': item['pre_content'], - 'post_content': item['post_content'], - 'create_user': create_user - } - data_process_detail_db_operate.insert_transform_info( - task_detail_item, - pool=conn_pool - ) - data = result['data']['text'] - - - # remove ip addresses - if support_type_map.get('remove_ip_address'): - result = privacy_transform.remove_ip_address( - text=data - ) - if result['status'] == 200: - clean_data = result['data']['clean_data'] - if len(clean_data) > 0: - for item in clean_data: - task_detail_item = { - 'id': ulid.ulid(), - 'task_id': task_id, - 'file_name': file_name, - 'transform_type': 'remove_ip_address', - 'pre_content': item['pre_content'], - 'post_content': item['post_content'], - 'create_user': create_user - } - data_process_detail_db_operate.insert_transform_info( - task_detail_item, - pool=conn_pool - ) - data = result['data']['text'] - - # remove number - if support_type_map.get('remove_number'): - # remove phone - result = privacy_transform.remove_phone( - text=data - ) - if result['status'] == 200: - clean_data = result['data']['clean_data'] - if len(clean_data) > 0: - for item in clean_data: - task_detail_item = { - 'id': ulid.ulid(), - 'task_id': task_id, - 'file_name': file_name, - 'transform_type': 'remove_number', - 'pre_content': item['pre_content'], - 'post_content': item['post_content'], - 'create_user': create_user - } - data_process_detail_db_operate.insert_transform_info( - task_detail_item, - pool=conn_pool - ) - data = result['data']['text'] - - # remove id card - result = privacy_transform.remove_id_card( - text=data - ) - if result['status'] == 200: - clean_data = result['data']['clean_data'] - if len(clean_data) > 0: - for item in clean_data: - task_detail_item = { - 'id': ulid.ulid(), - 'task_id': task_id, - 'file_name': file_name, - 'transform_type': 'remove_number', - 'pre_content': item['pre_content'], - 'post_content': item['post_content'], - 'create_user': create_user - } - data_process_detail_db_operate.insert_transform_info( - task_detail_item, - pool=conn_pool - ) - data = result['data']['text'] - - # remove weixin - result = privacy_transform.remove_weixin( - text=data - ) - if result['status'] == 200: - clean_data = 
result['data']['clean_data'] - if len(clean_data) > 0: - for item in clean_data: - task_detail_item = { - 'id': ulid.ulid(), - 'task_id': task_id, - 'file_name': file_name, - 'transform_type': 'remove_number', - 'pre_content': item['pre_content'], - 'post_content': item['post_content'], - 'create_user': create_user - } - data_process_detail_db_operate.insert_transform_info( - task_detail_item, - pool=conn_pool - ) - data = result['data']['text'] - - # remove bank card - result = privacy_transform.remove_bank_card( - text=data - ) - if result['status'] == 200: - clean_data = result['data']['clean_data'] - if len(clean_data) > 0: - for item in clean_data: - task_detail_item = { - 'id': ulid.ulid(), - 'task_id': task_id, - 'file_name': file_name, - 'transform_type': 'remove_number', - 'pre_content': item['pre_content'], - 'post_content': item['post_content'], - 'create_user': create_user - } - data_process_detail_db_operate.insert_transform_info( - task_detail_item, - pool=conn_pool - ) - data = result['data']['text'] - - return { - 'status': 200, - 'message': '', - 'data': data - } - - - -def _generate_qa_list( - chunk_size, - chunk_overlap, - data -): - """Generate the Question and Answer list. - - chunk_size: chunck size; - chunk_overlap: chunk overlap; - data: the text used to generate QA; - """ - # step 1 - # Split the text. - if chunk_size is None: - chunk_size = config.knowledge_chunk_size - - if chunk_overlap is None: - chunk_overlap = config.knowledge_chunk_overlap - - text_splitter = SpacyTextSplitter( - separator="\n\n", - pipeline="zh_core_web_sm", - chunk_size=int(chunk_size), - chunk_overlap=int(chunk_overlap) - ) - texts = text_splitter.split_text(data) - - logger.debug(''.join([ - f"original text is: \n{data}\n", - f"splitted text is: \n{texts}\n" - ])) - - - # step 2 - # Generate the QA list. - qa_list = [['q', 'a']] - if config.llm_use_type == 'open_ai': - qa_provider = QAProviderOpenAI() - for item in texts: - text = item.replace("\n", "") - data = qa_provider.generate_qa_list(text) - qa_list.extend(data) - elif config.llm_use_type == 'zhi_pu_online': - qa_provider = QAProviderZhiPuAIOnline() - for item in texts: - text = item.replace("\n", "") - data = qa_provider.generate_qa_list(text) - qa_list.extend(data) - - return qa_list - - -def _convert_support_type_to_map(supprt_type): - """Convert support type to map. - - support_type: support type list - example - [ - { - "type": "qa_split" - }, - { - "type": "remove_invisible_characters" - }, - { - "type": "space_standardization" - }, - { - "type": "remove_email" - } - ] - """ - result = {} - for item in supprt_type: - result[item['type']] = 1 - - return result \ No newline at end of file diff --git a/data-processing/data_manipulation/file_handle/word_handle.py b/data-processing/data_manipulation/file_handle/word_handle.py new file mode 100644 index 000000000..2e5fbdd86 --- /dev/null +++ b/data-processing/data_manipulation/file_handle/word_handle.py @@ -0,0 +1,78 @@ +# Copyright 2023 KubeAGI. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import logging
+import traceback
+
+from common import log_tag_const
+from file_handle import common_handle
+from utils import file_utils, docx_utils
+
+logger = logging.getLogger(__name__)
+
+
+def docx_text_manipulate(
+    file_name,
+    support_type,
+    conn_pool,
+    task_id,
+    create_user,
+    chunk_size,
+    chunk_overlap
+):
+    """Manipulate the text content from a word file.
+
+    file_name: file name;
+    support_type: support type;
+    conn_pool: database connection pool;
+    task_id: data process task id;
+    create_user: creator of the task;
+    chunk_size: chunk size;
+    chunk_overlap: chunk overlap;
+    """
+
+    logger.debug(f"{log_tag_const.WORD_HANDLE} Start to manipulate the text in word")
+
+    try:
+        word_file_path = file_utils.get_temp_file_path()
+        file_path = word_file_path + 'original/' + file_name
+
+        # step 1
+        # Get the content from the word file.
+        content = docx_utils.get_content(file_path)
+        logger.debug(f"{log_tag_const.WORD_HANDLE} The word content is\n {content}")
+
+        response = common_handle.text_manipulate(
+            file_name=file_name,
+            content=content,
+            support_type=support_type,
+            conn_pool=conn_pool,
+            task_id=task_id,
+            create_user=create_user,
+            chunk_size=chunk_size,
+            chunk_overlap=chunk_overlap
+        )
+
+        return response
+    except Exception as ex:
+        logger.error(''.join([
+            f"{log_tag_const.WORD_HANDLE} There is an error when manipulating ",
+            f"the text in the word handler. \n{traceback.format_exc()}"
+        ]))
+        logger.debug(f"{log_tag_const.WORD_HANDLE} Finish manipulating the text in word")
+        return {
+            'status': 400,
+            'message': str(ex),
+            'data': traceback.format_exc()
+        }
diff --git a/data-processing/data_manipulation/kube/client.py b/data-processing/data_manipulation/kube/client.py
index f7503d46b..921c2ad31 100644
--- a/data-processing/data_manipulation/kube/client.py
+++ b/data-processing/data_manipulation/kube/client.py
@@ -19,11 +19,12 @@
 
 from common import log_tag_const
 from kubernetes import client, config
-from kubernetes.client import CustomObjectsApi
+from kubernetes.client import CustomObjectsApi, CoreV1Api
 
 from .custom_resources import (arcadia_resource_datasets,
                                arcadia_resource_datasources,
-                               arcadia_resource_versioneddatasets)
+                               arcadia_resource_versioneddatasets,
+                               arcadia_resource_models)
 
 logger = logging.getLogger(__name__)
 
@@ -119,3 +120,36 @@
             name,
             status
         )
+
+    def get_versionedmodels_status(self, namespace: str, name: str):
+        return CustomObjectsApi().get_namespaced_custom_object_status(
+            arcadia_resource_models.get_group(),
+            arcadia_resource_models.get_version(),
+            namespace,
+            arcadia_resource_models.get_name(),
+            name
+        )
+
+    def read_namespaced_config_map(self, namespace: str, name: str):
+        return CoreV1Api().read_namespaced_config_map(
+            namespace=namespace,
+            name=name
+        )
+
+    def get_secret_info(self, namespace: str, name: str):
+        """Get the secret info."""
+        data = CoreV1Api().read_namespaced_secret(
+            namespace=namespace,
+            name=name
+        )
+        return data.data
+
+    def get_datasource_object(self, namespace: str, name: str):
+        """Get the Datasource object."""
+        return CustomObjectsApi().get_namespaced_custom_object(
+            group=arcadia_resource_datasources.get_group(),
+            version=arcadia_resource_datasources.get_version(),
+            namespace=namespace,
+            plural=arcadia_resource_datasources.get_name(),
+            name=name
+        )
\ No newline at end of file
diff --git a/data-processing/data_manipulation/kube/custom_resources.py 
b/data-processing/data_manipulation/kube/custom_resources.py
index f82e5d132..6a25d8a4c 100644
--- a/data-processing/data_manipulation/kube/custom_resources.py
+++ b/data-processing/data_manipulation/kube/custom_resources.py
@@ -40,6 +40,8 @@
 arcadia_resource_datasources = CustomResource(arcadia_group, "datasources")
 # CRD Dataset
 arcadia_resource_datasets = CustomResource(arcadia_group, "datasets")
+# CRD LLM
+arcadia_resource_models = CustomResource(arcadia_group, "llms")
 # CRD Versioneddataset
 arcadia_resource_versioneddatasets = CustomResource(
     arcadia_group, "versioneddatasets")
diff --git a/data-processing/data_manipulation/kube/dataset_cr.py b/data-processing/data_manipulation/kube/dataset_cr.py
index 01dd7bc11..b38381e41 100644
--- a/data-processing/data_manipulation/kube/dataset_cr.py
+++ b/data-processing/data_manipulation/kube/dataset_cr.py
@@ -70,9 +70,9 @@
             bucket_name,
             version_data_set_name,
             {
-            'status': {
-                'conditions': conditions
-            }
+                'status': {
+                    'conditions': conditions
+                }
             }
         )
 
@@ -87,4 +87,49 @@
         'status': 400,
         'message': '更新数据集状态失败',
         'data': ''
-    }
\ No newline at end of file
+    }
+
+def get_dataset_status_k8s_cr(
+    bucket_name,
+    version_data_set_name
+):
+    """Get the condition info for the dataset.
+
+    bucket_name: bucket name;
+    version_data_set_name: version dataset name;
+    """
+    try:
+        dataset_status = None
+        kube = client.KubeEnv()
+
+        one_cr_datasets = kube.get_versioneddatasets_status(
+            bucket_name,
+            version_data_set_name
+        )
+
+        conditions = one_cr_datasets['status']['conditions']
+
+        found_index = None
+        for i in range(len(conditions)):
+            item = conditions[i]
+            if item['type'] == 'DataProcessing':
+                found_index = i
+                break
+
+        if found_index is not None:
+            dataset_status = conditions[found_index].get('reason')
+
+        return {
+            'status': 200,
+            'message': '获取数据集状态成功',
+            'data': dataset_status
+        }
+    except Exception as ex:
+        logger.error(str(ex))
+        return {
+            'status': 400,
+            'message': '获取数据集状态失败',
+            'data': ''
+        }
diff --git a/data-processing/data_manipulation/kube/minio_cr.py b/data-processing/data_manipulation/kube/minio_cr.py
new file mode 100644
index 000000000..f6d8ef7be
--- /dev/null
+++ b/data-processing/data_manipulation/kube/minio_cr.py
@@ -0,0 +1,78 @@
+# Copyright 2023 KubeAGI.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import base64
+import logging
+import traceback
+import yaml
+
+from . import client
+
+logger = logging.getLogger(__name__)
+
+
+def get_minio_config_in_k8s_configmap(
+    namespace,
+    config_map_name
+):
+    """Get the MinIO config info in the configmap. 
+
+    namespace: namespace;
+    config_map_name: config map name
+    """
+    try:
+        kube = client.KubeEnv()
+
+        config_map = kube.read_namespaced_config_map(
+            namespace=namespace,
+            name=config_map_name
+        )
+
+        config = config_map.data.get('config')
+
+        json_data = yaml.safe_load(config)
+
+        datasource = json_data['systemDatasource']
+
+        minio_cr_object = kube.get_datasource_object(
+            namespace=datasource['namespace'],
+            name=datasource['name']
+        )
+
+        minio_api_url = minio_cr_object['spec']['endpoint']['url']
+
+        minio_secure = True
+        insecure_str = str(minio_cr_object['spec']['endpoint']['insecure'])
+        if insecure_str.lower() == 'true':
+            minio_secure = False
+
+
+        secret_info = kube.get_secret_info(
+            namespace=namespace,
+            name=minio_cr_object['spec']['endpoint']['authSecret']['name']
+        )
+
+        return {
+            'minio_api_url': minio_api_url,
+            'minio_secure': minio_secure,
+            'minio_access_key': base64.b64decode(secret_info['rootUser']).decode('utf-8'),
+            'minio_secret_key': base64.b64decode(secret_info['rootPassword']).decode('utf-8')
+        }
+    except Exception as ex:
+        logger.error(''.join([
+            f"Cannot get the MinIO config info. The error is: \n",
+            f"{traceback.format_exc()}\n"
+        ]))
+
+        return None
\ No newline at end of file
diff --git a/data-processing/data_manipulation/kube/model_cr.py b/data-processing/data_manipulation/kube/model_cr.py
new file mode 100644
index 000000000..192741008
--- /dev/null
+++ b/data-processing/data_manipulation/kube/model_cr.py
@@ -0,0 +1,133 @@
+# Copyright 2023 KubeAGI.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import traceback
+import yaml
+
+from . import client
+
+logger = logging.getLogger(__name__)
+
+def get_spec_for_llms_k8s_cr(
+    name,
+    namespace
+):
+    """Get the spec for the llms k8s cr.
+
+    name: model name;
+    namespace: namespace;
+    """
+    try:
+        kube = client.KubeEnv()
+
+        one_cr_llm = kube.get_versionedmodels_status(
+            namespace=namespace,
+            name=name
+        )
+
+        provider = one_cr_llm['spec']
+
+        return {
+            'status': 200,
+            'message': '获取llms中的provider成功',
+            'data': provider
+        }
+    except Exception as ex:
+        logger.error(str(ex))
+        return {
+            'status': 400,
+            'message': '获取llms中的provider失败',
+            'data': ''
+        }
+
+
+def get_worker_base_url_k8s_configmap(
+    name,
+    namespace
+):
+    """Get the base url from the configmap.
+
+    name: config map name;
+    namespace: namespace;
+    """
+    try:
+        kube = client.KubeEnv()
+
+        config_map = kube.read_namespaced_config_map(
+            name=name,
+            namespace=namespace
+        )
+
+        config = config_map.data.get('config')
+
+        json_data = yaml.safe_load(config)
+        external_api_server = json_data.get('gateway', {}).get('apiServer')
+
+        return external_api_server
+    except Exception as ex:
+        logger.error(str(ex))
+        return None
+
+
+def get_secret_info(
+    name,
+    namespace
+):
+    """Get the secret info by name and namespace. 
+
+    name: secret name;
+    namespace: namespace;
+    """
+    try:
+        kube = client.KubeEnv()
+
+        return kube.get_secret_info(
+            namespace=namespace,
+            name=name
+        )
+    except Exception as ex:
+        logger.error(str(ex))
+        return None
+
+def get_llm_qa_retry_count_in_k8s_configmap(
+    namespace,
+    config_map_name
+):
+    """Get the llm QA retry count from the configmap.
+
+    namespace: namespace;
+    config_map_name: config map name
+    """
+    try:
+        kube = client.KubeEnv()
+
+        config_map = kube.read_namespaced_config_map(
+            namespace=namespace,
+            name=config_map_name
+        )
+
+        config = config_map.data.get('dataprocess')
+
+        json_data = yaml.safe_load(config)
+
+        return json_data['llm']['qa_retry_count']
+    except Exception as ex:
+        logger.error(''.join([
+            "Cannot get the llm QA retry count. The error is: \n",
+            f"{traceback.format_exc()}\n"
+        ]))
+
+        return None
diff --git a/data-processing/data_manipulation/kube/postgresql_cr.py b/data-processing/data_manipulation/kube/postgresql_cr.py
new file mode 100644
index 000000000..426748cb7
--- /dev/null
+++ b/data-processing/data_manipulation/kube/postgresql_cr.py
@@ -0,0 +1,52 @@
+# Copyright 2023 KubeAGI.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import traceback
+
+import yaml
+
+from . import client
+
+logger = logging.getLogger(__name__)
+
+
+def get_postgresql_config_in_k8s_configmap(
+    namespace,
+    config_map_name
+):
+    """Get the PostgreSQL config info from the configmap.
+
+    namespace: namespace;
+    config_map_name: config map name
+    """
+    try:
+        kube = client.KubeEnv()
+
+        config_map = kube.read_namespaced_config_map(
+            namespace=namespace,
+            name=config_map_name
+        )
+
+        config = config_map.data.get('dataprocess')
+
+        json_data = yaml.safe_load(config)
+
+        return json_data['postgresql']
+    except Exception as ex:
+        logger.error(''.join([
+            "Cannot get the PostgreSQL config info.
The error is: \n",
+            f"{traceback.format_exc()}\n"
+        ]))
+
+        return None
\ No newline at end of file
diff --git a/data-processing/data_manipulation/llm_api_service/qa_provider_open_ai.py b/data-processing/data_manipulation/llm_api_service/qa_provider_open_ai.py
index 3785d8a47..3cfa7ceef 100644
--- a/data-processing/data_manipulation/llm_api_service/qa_provider_open_ai.py
+++ b/data-processing/data_manipulation/llm_api_service/qa_provider_open_ai.py
@@ -20,9 +20,12 @@
 from common import log_tag_const
 from common.config import config
 from langchain.chains import LLMChain
-from langchain.llms import OpenAI
-from langchain.prompts import PromptTemplate
+from langchain.chat_models import ChatOpenAI
+from langchain.prompts.chat import (
+    ChatPromptTemplate,
+    HumanMessagePromptTemplate,
+)
 from llm_prompt_template import open_ai_prompt
 
 from .base_qa_provider import BaseQAProvider
@@ -34,21 +37,16 @@ class QAProviderOpenAI(BaseQAProvider):
 
     def __init__(
         self,
-        api_key=None,
-        base_url=None,
-        model=None
+        api_key,
+        base_url,
+        model
     ):
-        if api_key is None:
-            api_key = config.open_ai_default_key
-        if base_url is None:
-            base_url = config.open_ai_default_base_url
-        if model is None:
-            model = config.open_ai_default_model
-
-        self.llm = OpenAI(
+        # TODO: temperature and top_p/top_k should be configured later
+        self.llm = ChatOpenAI(
             openai_api_key=api_key,
             base_url=base_url,
-            model=model
+            model=model,
+            temperature=0.8
         )
 
     def generate_qa_list(
@@ -68,40 +66,59 @@ def generate_qa_list(
         if prompt_template is None:
             prompt_template = open_ai_prompt.get_default_prompt_template()
 
-        prompt = PromptTemplate(
-            template=prompt_template,
-            input_variables=["text"]
-        )
+        human_message_prompt = HumanMessagePromptTemplate.from_template(prompt_template)
+        prompt = ChatPromptTemplate.from_messages([human_message_prompt])
 
         llm_chain = LLMChain(
             prompt=prompt,
             llm=self.llm
         )
 
         result = []
+        status = 200
+        message = ''
         invoke_count = 0
         while True:
             try:
-                response = llm_chain.run(text)
-                result = self.__get_qa_list_from_response(response)
-                if len(result) > 0 or invoke_count > int(config.llm_qa_retry_count):
-                    logger.debug(''.join([
-                        f"{log_tag_const.OPEN_AI} The QA list is \n",
-                        f"\n{result}\n"
+                if invoke_count >= int(config.llm_qa_retry_count):
+                    logger.error(''.join([
+                        f"{log_tag_const.OPEN_AI} Cannot access the OpenAI service.\n",
+                        f"The last error is: \n{message}\n"
                     ]))
+
+                    status = 1000
+                    message = message or 'exceeded the retry limit for the QA list request'
+                    break
                 else:
-                    logger.warn('failed to get QA list, wait for 2 seconds and retry')
-                    time.sleep(5) # sleep 5 seconds
-                    invoke_count += 1
+                    response = llm_chain.run(text=text)
+                    result = self.__get_qa_list_from_response(response)
+                    if len(result) > 0:
+                        break
+                    elif invoke_count >= int(config.llm_qa_retry_count):
+                        logger.error(''.join([
+                            f"{log_tag_const.OPEN_AI} Cannot get a QA list from the OpenAI service.\n",
+                            "The response could not be parsed into QA pairs.\n"
+                        ]))
+
+                        status = 1000
+                        message = 'failed to parse a QA list from the model response'
+
+                        break
+                    else:
+                        logger.warning('failed to get QA list, wait for 10 seconds and retry')
+                        time.sleep(10)  # sleep 10 seconds
+                        invoke_count += 1
             except Exception as ex:
-                result = []
-                logger.error(''.join([
-                    f"{log_tag_const.OPEN_AI} Cannot access the open ai service.\n",
-                    f"The tracing error is: \n{traceback.format_exc()}\n"
-                ]))
-                time.sleep(5)
+                message = str(ex)
+                logger.warning(f'{log_tag_const.OPEN_AI} request failed: {ex}; retry in 10 seconds')
+                time.sleep(10)
+                invoke_count += 1
 
-        return result
+        return {
+            'status': status,
+            'message': message,
+            'data': result
+        }
 
     def __get_qa_list_from_response(
diff --git
a/data-processing/data_manipulation/llm_api_service/qa_provider_zhi_pu_ai_online.py b/data-processing/data_manipulation/llm_api_service/qa_provider_zhi_pu_ai_online.py
index bea67f492..c6982f84a 100644
--- a/data-processing/data_manipulation/llm_api_service/qa_provider_zhi_pu_ai_online.py
+++ b/data-processing/data_manipulation/llm_api_service/qa_provider_zhi_pu_ai_online.py
@@ -16,6 +16,7 @@
 import logging
 import re
+import time
 import traceback
 
 import zhipuai
 from common import log_tag_const
@@ -50,7 +51,6 @@ def generate_qa_list(
         prompt_template the prompt template
         """
-        print('xx', 'text', text)
         if prompt_template is None:
             prompt_template = zhi_pu_ai_prompt.get_default_prompt_template()
@@ -59,28 +59,71 @@ def generate_qa_list(
         )
 
         result = []
-        try:
-            response = zhipuai.model_api.invoke(
-                model="chatglm_6b",
-                prompt=[{"role": "user", "content": content}],
-                top_p=0.7,
-                temperature=0.9,
-            )
-            if response['success']:
-                result = self.__format_response_to_qa_list(response)
-            else:
-                logger.error(''.join([
-                    f"{log_tag_const.ZHI_PU_AI} Cannot access the ZhiPuAI service.\n",
-                    f"The error is: \n{response['msg']}\n"
-                ]))
-        except Exception as ex:
-            result = []
-            logger.error(''.join([
-                f"{log_tag_const.ZHI_PU_AI} Cannot access the ZhiPuAI service.\n",
-                f"The tracing error is: \n{traceback.format_exc()}\n"
+        status = 200
+        message = ''
+        invoke_count = 0
+        while True:
+            logger.debug(''.join([
+                f"{log_tag_const.ZHI_PU_AI} The prompt content is:\n",
+                f"{content}\n"
             ]))
-
-        return result
+            try:
+                if invoke_count >= int(config.llm_qa_retry_count):
+                    logger.error(''.join([
+                        f"{log_tag_const.ZHI_PU_AI} Cannot access the ZhiPuAI service.\n",
+                        f"The last error is: \n{message}\n"
+                    ]))
+
+                    status = 1000
+                    message = message or 'exceeded the retry limit for the QA list request'
+
+                    break
+                else:
+                    # TODO: temperature and top_p/top_k should be configured later
+                    response = zhipuai.model_api.invoke(
+                        model="chatglm_6b",
+                        prompt=[{"role": "user", "content": content}],
+                        top_p=0.7,
+                        temperature=0.9,
+                    )
+                    if response['success']:
+                        result = self.__format_response_to_qa_list(response)
+                        if len(result) > 0:
+                            break
+                        elif invoke_count >= int(config.llm_qa_retry_count):
+                            logger.error(''.join([
+                                f"{log_tag_const.ZHI_PU_AI} Cannot get a QA list from the ZhiPuAI service.\n",
+                                "The response could not be parsed into QA pairs.\n"
+                            ]))
+
+                            status = 1000
+                            message = 'failed to parse a QA list from the model response'
+
+                            break
+                        else:
+                            logger.warning('failed to get QA list, wait for 10 seconds and retry')
+                            time.sleep(10)  # sleep 10 seconds
+                            invoke_count += 1
+                    else:
+                        logger.error(''.join([
+                            f"{log_tag_const.ZHI_PU_AI} Cannot access the ZhiPuAI service.\n",
+                            f"The error is: \n{response['msg']}\n"
+                        ]))
+                        message = response['msg']
+                        logger.warning('zhipuai request failed, wait for 10 seconds and retry')
+                        time.sleep(10)  # sleep 10 seconds
+                        invoke_count += 1
+            except Exception as ex:
+                message = str(ex)
+                logger.warning(f'{log_tag_const.ZHI_PU_AI} request failed: {ex}; retry in 10 seconds')
+                time.sleep(10)
+                invoke_count += 1
+
+        return {
+            'status': status,
+            'message': message,
+            'data': result
+        }
 
     def __format_response_to_qa_list(self, response):
diff --git a/data-processing/data_manipulation/llm_prompt_template/open_ai_prompt.py b/data-processing/data_manipulation/llm_prompt_template/open_ai_prompt.py
index 7e09b99a1..d5a383672 100644
--- a/data-processing/data_manipulation/llm_prompt_template/open_ai_prompt.py
+++ b/data-processing/data_manipulation/llm_prompt_template/open_ai_prompt.py
@@ -17,7 +17,7 @@ def get_default_prompt_template():
     prompt_template = """
         {text}
 
-        请将上述内容按照问答的方式,提出不超过 25 个问题,并给出每个问题的答案,每个问题必须有 Q 和对应的 A,并严格按照以下方式展示:
Q1: 问题。\n A1: 答案。\n Q2: 问题 \n 2: 答案\n 注意,尽可能多的提出问题,但是 Q 不要重复,也不要出现只有 Q 没有 A 的情况。
+        请将上述内容按照问答的方式,提出不超过 25 个问题,并给出每个问题的答案,每个问题必须有 Q 和对应的 A,并严格按照以下方式展示: Q1: 问题。\n A1: 答案。\n Q2: 问题。\n A2: 答案。\n 注意,尽可能多地提出问题,但是 Q 不要重复,也不要出现只有 Q 没有 A 的情况。
     """
 
     return prompt_template
\ No newline at end of file
diff --git a/data-processing/data_manipulation/llm_prompt_template/zhi_pu_ai_prompt.py b/data-processing/data_manipulation/llm_prompt_template/zhi_pu_ai_prompt.py
index 7e09b99a1..d5a383672 100644
--- a/data-processing/data_manipulation/llm_prompt_template/zhi_pu_ai_prompt.py
+++ b/data-processing/data_manipulation/llm_prompt_template/zhi_pu_ai_prompt.py
@@ -17,7 +17,7 @@ def get_default_prompt_template():
     prompt_template = """
         {text}
 
-        请将上述内容按照问答的方式,提出不超过 25 个问题,并给出每个问题的答案,每个问题必须有 Q 和对应的 A,并严格按照以下方式展示: Q1: 问题。\n A1: 答案。\n Q2: 问题 \n 2: 答案\n 注意,尽可能多的提出问题,但是 Q 不要重复,也不要出现只有 Q 没有 A 的情况。
+        请将上述内容按照问答的方式,提出不超过 25 个问题,并给出每个问题的答案,每个问题必须有 Q 和对应的 A,并严格按照以下方式展示: Q1: 问题。\n A1: 答案。\n Q2: 问题。\n A2: 答案。\n 注意,尽可能多地提出问题,但是 Q 不要重复,也不要出现只有 Q 没有 A 的情况。
     """
 
     return prompt_template
\ No newline at end of file
diff --git a/data-processing/data_manipulation/server.py b/data-processing/data_manipulation/server.py
index dac6e5959..c69b2e781 100644
--- a/data-processing/data_manipulation/server.py
+++ b/data-processing/data_manipulation/server.py
@@ -74,40 +74,6 @@ async def shutdown_web_server(app, loop):
 
 app.blueprint(data_process_controller.data_process)
 
-
-@app.route('test-config', methods=['POST'])
-async def test_config(request):
-    from common.config import config
-
-    data = {
-        'minio_access_key': config.minio_access_key,
-        'minio_secret_key': config.minio_secret_key,
-        'minio_api_url': config.minio_api_url,
-        'minio_secure': config.minio_secure,
-        'minio_dataset_prefix': config.minio_dataset_prefix,
-        'zhipuai_api_key': config.zhipuai_api_key,
-        'llm_use_type': config.llm_use_type,
-        'open_ai_default_key': config.open_ai_default_key,
-        'open_ai_default_base_url': config.open_ai_default_base_url,
-        'open_ai_default_model': config.open_ai_default_model,
-        'knowledge_chunk_size': config.knowledge_chunk_size,
-        'knowledge_chunk_overlap': config.knowledge_chunk_overlap,
-        'pg_host': config.pg_host,
-        'pg_port': config.pg_port,
-        'pg_user': config.pg_user,
-        'pg_password': config.pg_password,
-        'pg_database': config.pg_database
-
-    }
-
-    return json({
-        'status': 200,
-        'message': '',
-        'data': data
-    })
-
-
-
 def _create_database_connection():
     """Create a database connection."""
     return psycopg2.connect(
diff --git a/data-processing/data_manipulation/service/data_process_service.py b/data-processing/data_manipulation/service/data_process_service.py
index aaf61a3d6..972302c94 100644
--- a/data-processing/data_manipulation/service/data_process_service.py
+++ b/data-processing/data_manipulation/service/data_process_service.py
@@ -294,8 +294,10 @@ def _set_basic_info_for_config_map_for_result(
     # data clean
     if process_cofig_map.get('remove_invisible_characters') or \
        process_cofig_map.get('space_standardization') or \
+       process_cofig_map.get('remove_garbled_text') or \
       process_cofig_map.get('traditional_to_simplified') or \
-       process_cofig_map.get('space_standremove_html_tagardization'):
+       process_cofig_map.get('remove_html_tag') or \
+       process_cofig_map.get('remove_emojis'):
         if from_result.get('clean') is None:
             from_result['clean'] = {
                 'name': 'clean',
                 'description': '数据异常清洗',
                 'status': 'succeed',
                 'children': []
             }
 
     # remove privacy
-    if process_cofig_map.get('remove_email'):
+    if process_cofig_map.get('remove_email') or \
+       process_cofig_map.get('remove_ip_address') or \
+       process_cofig_map.get('remove_number'):
         if from_result.get('privacy') is None:
             from_result['privacy'] = {
                 'name': 'privacy',
                 'description': '数据隐私处理',
                 'status': 'succeed',
                 'children': []
-            }
+            }
 
 
 def _set_children_info_for_config_map_for_result(
@@ -369,6 +373,62 @@ def _set_children_info_for_config_map_for_result(
             )
         })
 
+    # remove garbled text
+    if process_cofig_map.get('remove_garbled_text'):
+        from_result['clean']['children'].append({
+            'name': 'remove_garbled_text',
+            'enable': 'true',
+            'zh_name': '去除乱码',
+            'description': '去除乱码和无意义的unicode',
+            'preview': _get_transform_preview_list(
+                task_id=task_id,
+                transform_type='remove_garbled_text',
+                conn_pool=conn_pool
+            )
+        })
+
+    # traditional to simplified
+    if process_cofig_map.get('traditional_to_simplified'):
+        from_result['clean']['children'].append({
+            'name': 'traditional_to_simplified',
+            'enable': 'true',
+            'zh_name': '繁转简',
+            'description': '繁体转简体,如“不經意,妳的笑容”清洗成“不经意,你的笑容”',
+            'preview': _get_transform_preview_list(
+                task_id=task_id,
+                transform_type='traditional_to_simplified',
+                conn_pool=conn_pool
+            )
+        })
+
+    # remove html tag
+    if process_cofig_map.get('remove_html_tag'):
+        from_result['clean']['children'].append({
+            'name': 'remove_html_tag',
+            'enable': 'true',
+            'zh_name': '去除网页标识符',
+            'description': '移除文档中的html标签, 如<html>, <div>, <p>
等', + 'preview': _get_transform_preview_list( + task_id=task_id, + transform_type='remove_html_tag', + conn_pool=conn_pool + ) + }) + + # remove emojis + if process_cofig_map.get('remove_emojis'): + from_result['clean']['children'].append({ + 'name': 'remove_emojis', + 'enable': 'true', + 'zh_name': '去除表情', + 'description': '去除文档中的表情,如‘🐰’, ‘🧑🏼’等', + 'preview': _get_transform_preview_list( + task_id=task_id, + transform_type='remove_emojis', + conn_pool=conn_pool + ) + }) + # remove email if process_cofig_map.get('remove_email'): from_result['privacy']['children'].append({ @@ -383,6 +443,33 @@ def _set_children_info_for_config_map_for_result( ) }) + # remove ip address + if process_cofig_map.get('remove_ip_address'): + from_result['privacy']['children'].append({ + 'name': 'remove_ip_address', + 'enable': 'true', + 'zh_name': '去除IP地址', + 'description': '去除IPv4 或者 IPv6 地址', + 'preview': _get_transform_preview_list( + task_id=task_id, + transform_type='remove_ip_address', + conn_pool=conn_pool + ) + }) + + # remove number + if process_cofig_map.get('remove_number'): + from_result['privacy']['children'].append({ + 'name': 'remove_number', + 'enable': 'true', + 'zh_name': '去除数字', + 'description': '去除数字和字母数字标识符,如电话号码、信用卡号、十六进制散列等,同时跳过年份和简单数字的实例', + 'preview': _get_transform_preview_list( + task_id=task_id, + transform_type='remove_number', + conn_pool=conn_pool + ) + }) def _get_transform_preview_list( diff --git a/data-processing/data_manipulation/transform/text/clean_transform.py b/data-processing/data_manipulation/transform/text/clean_transform.py index e661e2dbf..793f5c0f6 100644 --- a/data-processing/data_manipulation/transform/text/clean_transform.py +++ b/data-processing/data_manipulation/transform/text/clean_transform.py @@ -37,8 +37,8 @@ def remove_invisible_characters(text): “一户一表、水表出户、抄表到户”是指一个家庭用户安装一个计量水表,计量水表安装在住宅的公共部位,供水企业抄表到户,按户计量收费。 """ try: - pattern = r'[\x00-\x1F\x7F-\x9F\xAD\r\n\t\b\x0B\x1C\x1D\x1E]' - find_pattern = r'[^,。!?,.!?]*[\x00-\x1F\x7F-\x9F\xAD\r\n\t\b\x0B\x1C\x1D\x1E][^,。!?,.!?]*' + pattern = r'[\x00-\x1F\x7F-\x9F\xAD\r\t\b\x0B\x1C\x1D\x1E]' + find_pattern = r'[^,。!?,.!?]*[\x00-\x1F\x7F-\x9F\xAD\r\t\b\x0B\x1C\x1D\x1E][^,。!?,.!?]*' clean_text = re.sub(pattern, '', text) @@ -246,11 +246,11 @@ def remove_emojis(text): clean_text = re.sub(pattern, '', text) - clean_data = _find_clean_data({ - 'text': text, - 'pattern': pattern, - 'find_pattern': find_pattern - }) + clean_data = _find_clean_data( + text=text, + pattern=pattern, + find_pattern=find_pattern + ) return { 'status': 200, diff --git a/data-processing/data_manipulation/utils/docx_utils.py b/data-processing/data_manipulation/utils/docx_utils.py new file mode 100644 index 000000000..18929dbdb --- /dev/null +++ b/data-processing/data_manipulation/utils/docx_utils.py @@ -0,0 +1,31 @@ +# Copyright 2023 KubeAGI. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import docx + +def get_content( + file_path +): + """Get the content from a word docx file. 
+
+    file_path: file path;
+    """
+    doc = docx.Document(file_path)
+
+    content = ""
+    # Concatenate the text of every paragraph in document order.
+    for para in doc.paragraphs:
+        content += para.text
+
+    return content
diff --git a/data-processing/database/base.sql b/data-processing/database/base.sql
index 5cf262d52..0cfb45126 100644
--- a/data-processing/database/base.sql
+++ b/data-processing/database/base.sql
@@ -26,8 +26,6 @@ CREATE TABLE IF NOT EXISTS public.data_process_task
     CONSTRAINT data_process_task_pkey PRIMARY KEY (id)
 )
 
-TABLESPACE pg_default;
-
 -- Table: public.data_process_task_detail
 -- DROP TABLE IF EXISTS public.data_process_task_detail;
diff --git a/data-processing/requirements.txt b/data-processing/requirements.txt
index a22e3b2fc..a310da981 100644
--- a/data-processing/requirements.txt
+++ b/data-processing/requirements.txt
@@ -20,3 +20,5 @@ pyyaml==6.0.1
 opencc==0.2
 opencc-python-reimplemented==0.1.7
 selectolax==0.3.17
+openai==1.3.7
+python-docx==1.1.0
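
The new kube helpers resolve MinIO and PostgreSQL settings from the cluster at runtime. Below is a minimal sketch of how a caller might combine them, assuming the `POD_NAMESPACE` and `DEFAULT_CONFIG` environment variables and the return shapes of the helpers above; the wiring itself is illustrative, not part of this patch.

```python
import os

from kube import minio_cr, postgresql_cr

# Namespace and configmap names are assumptions based on the defaults
# used elsewhere in this patch.
namespace = os.environ.get('POD_NAMESPACE', 'arcadia')
config_map_name = os.environ.get('DEFAULT_CONFIG', 'arcadia-config')

# Returns a dict with minio_api_url / minio_secure / minio_access_key /
# minio_secret_key, or None when the lookup fails.
minio_config = minio_cr.get_minio_config_in_k8s_configmap(
    namespace=namespace,
    config_map_name=config_map_name
)

# Returns the 'postgresql' section of the 'dataprocess' configmap entry,
# or None when the lookup fails.
postgresql_config = postgresql_cr.get_postgresql_config_in_k8s_configmap(
    namespace=namespace,
    config_map_name=config_map_name
)

if minio_config is None or postgresql_config is None:
    raise RuntimeError('failed to load the data-processing config from the cluster')
```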
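`generate_qa_list` now returns a status/message/data envelope instead of a bare list, so callers can tell an empty QA list apart from an exhausted retry loop. A sketch of a caller follows; `split_text_to_qa` is a hypothetical helper, and the shape of the items in `data` is whatever `__get_qa_list_from_response` produces.

```python
from llm_api_service.qa_provider_open_ai import QAProviderOpenAI

def split_text_to_qa(text, api_key, base_url, model):
    """Hypothetical caller; not part of this patch."""
    provider = QAProviderOpenAI(
        api_key=api_key,
        base_url=base_url,
        model=model
    )

    response = provider.generate_qa_list(text)

    # status 1000 signals that the retry loop gave up; message carries
    # the last error text collected by the provider.
    if response['status'] != 200:
        raise RuntimeError(f"QA generation failed: {response['message']}")

    return response['data']
```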
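The prompt templates pin the model output to a strict `Q1:`/`A1:` layout, which is what makes the response machine-parseable. The patch does not show `__get_qa_list_from_response` or `__format_response_to_qa_list`, so the regex below is only an assumed equivalent of that parsing step.

```python
import re

# Accepts both ASCII and fullwidth colons, since the template is Chinese.
QA_PATTERN = re.compile(
    r'Q\d+[::]\s*(?P<q>.+?)\s*A\d+[::]\s*(?P<a>.+?)(?=\s*Q\d+[::]|\Z)',
    re.S
)

def parse_qa_pairs(response_text):
    """Assumed parser for the 'Q1: ... A1: ...' layout; illustrative only."""
    return [
        (match.group('q').strip(), match.group('a').strip())
        for match in QA_PATTERN.finditer(response_text)
    ]
```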
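`docx_utils.get_content` flattens a .docx file into one string, which the QA pipeline then has to re-chunk before prompting. A sketch of that downstream step using langchain's `RecursiveCharacterTextSplitter`, with defaults mirroring the `KNOWLEDGE_CHUNK_SIZE`/`KNOWLEDGE_CHUNK_OVERLAP` values from the example config; the `chunk_docx` wrapper is illustrative, not part of this patch.

```python
from langchain.text_splitter import RecursiveCharacterTextSplitter

from utils import docx_utils

def chunk_docx(file_path, chunk_size=500, chunk_overlap=50):
    """Illustrative wrapper: read a docx file and split it into chunks."""
    content = docx_utils.get_content(file_path)

    splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap
    )
    return splitter.split_text(content)
```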