Skip to content

Commit

Permalink
adaptation current dataset
Browse files Browse the repository at this point in the history
  • Loading branch information
SunsetWolf committed Jan 7, 2025
1 parent b275bf3 commit 7666a9f
Showing 1 changed file with 126 additions and 110 deletions.
236 changes: 126 additions & 110 deletions scripts/check_data_health.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import enum
import logging
from loguru import logger
import os
from typing import List, Optional, Dict, Tuple

Expand Down Expand Up @@ -31,6 +31,7 @@ def __init__(
qlib_dir=None,
large_step_threshold_price=0.5,
large_step_threshold_volume=3,
missing_data_num=0,
):
assert csv_path or qlib_dir, "One of csv_path or qlib_dir should be provided."
assert not (
Expand All @@ -41,6 +42,7 @@ def __init__(
self.problems = {}
self.large_step_threshold_price = large_step_threshold_price
self.large_step_threshold_volume = large_step_threshold_volume
self.missing_data_num = missing_data_num

if csv_path:
assert os.path.isdir(csv_path), f"{csv_path} should be a directory."
Expand All @@ -56,133 +58,147 @@ def __init__(
def load_qlib_data(self):
    """Load day-frequency OHLCV(+factor) data for every instrument from the qlib provider.

    Populates ``self.data`` with one DataFrame per instrument, renaming qlib's
    ``$``-prefixed field names to plain column names used by the checks.
    """
    instruments = D.instruments(market="all")
    instrument_list = D.list_instruments(instruments=instruments, as_list=True)
    # $factor is loaded as well so check_missing_factor() can validate it.
    required_fields = ["$open", "$close", "$low", "$high", "$volume", "$factor"]
    for instrument in instrument_list:
        df = D.features([instrument], required_fields, freq="day")
        df.rename(
            columns={
                "$open": "open",
                "$close": "close",
                "$low": "low",
                "$high": "high",
                "$volume": "volume",
                "$factor": "factor",
            },
            inplace=True,
        )
        self.data[instrument] = df

def check_missing_data(self) -> Optional[pd.DataFrame]:
    """Check whether any OHLCV data is missing.

    An instrument is reported when the null count of any of its columns
    exceeds ``self.missing_data_num``.

    Returns:
        A DataFrame indexed by instrument with the per-column null counts for
        every offending instrument, or None when nothing is missing.
    """
    result_dict = {
        "instruments": [],
        "open": [],
        "high": [],
        "low": [],
        "close": [],
        "volume": [],
    }
    for filename, df in self.data.items():
        null_counts = df.isnull().sum()  # compute once per instrument
        missing_data_columns = null_counts[null_counts > self.missing_data_num].index.tolist()
        if len(missing_data_columns) > 0:
            result_dict["instruments"].append(filename)
            # columns exist because load_qlib_data() renames to these names
            for col in ("open", "high", "low", "close", "volume"):
                result_dict[col].append(null_counts[col])

    result_df = pd.DataFrame(result_dict).set_index("instruments")
    if not result_df.empty:
        return result_df
    logger.info("✅ There are no missing data.")
    return None

def check_large_step_changes(self) -> Optional[pd.DataFrame]:
    """Check for day-over-day relative changes above the configured thresholds.

    The volume column is compared against ``large_step_threshold_volume`` and
    the price columns against ``large_step_threshold_price``.

    Returns:
        A DataFrame indexed by instrument with one row per offending column,
        giving the first date the threshold was exceeded and the maximum
        relative change, or None when no column exceeds its threshold.
    """
    result_dict = {
        "instruments": [],
        "col_name": [],
        "date": [],
        "pct_change": [],
    }
    for filename, df in self.data.items():
        for col in ["open", "high", "low", "close", "volume"]:
            if col not in df.columns:
                continue
            # fill_method=None so NaN gaps are not forward-filled before diffing.
            pct_change = df[col].pct_change(fill_method=None).abs()
            threshold = (
                self.large_step_threshold_volume
                if col == "volume"
                else self.large_step_threshold_price
            )
            if pct_change.max() > threshold:
                large_steps = pct_change[pct_change > threshold]
                result_dict["instruments"].append(filename)
                result_dict["col_name"].append(col)
                # assumes an (instrument, datetime) MultiIndex as produced by
                # D.features(); only the first offending date is reported
                result_dict["date"].append(large_steps.index.to_list()[0][1].strftime("%Y-%m-%d"))
                result_dict["pct_change"].append(pct_change.max())

    result_df = pd.DataFrame(result_dict).set_index("instruments")
    if not result_df.empty:
        return result_df
    logger.info("✅ There are no large step changes in the OHLCV column above the threshold.")
    return None

def check_required_columns(self) -> Optional[pd.DataFrame]:
    """Check that every required OHLCV column is present.

    Returns:
        A DataFrame indexed by instrument naming the missing columns per
        instrument, or None when all required columns are present everywhere.
    """
    required_columns = ["open", "high", "low", "close", "volume"]
    result_dict = {
        "instruments": [],
        "missing_col": [],
    }
    for filename, df in self.data.items():
        missing_required_columns = [
            column for column in required_columns if column not in df.columns
        ]
        if missing_required_columns:
            result_dict["instruments"].append(filename)
            # join into one cell so exactly one row is emitted per instrument;
            # extending with the raw list would desynchronize the two columns
            # and make the DataFrame constructor raise
            result_dict["missing_col"].append(", ".join(missing_required_columns))

    result_df = pd.DataFrame(result_dict).set_index("instruments")
    if not result_df.empty:
        return result_df
    logger.info("✅ The columns (OHLCV) are complete and not missing.")
    return None

def check_missing_factor(self) -> Optional[pd.DataFrame]:
    """Check that the 'factor' column exists and holds data.

    A missing factor column disables the trading unit; an all-null factor
    column may cause the trading unit to be adjusted incorrectly.

    Returns:
        A DataFrame indexed by instrument flagging whether the column is
        missing and/or its data is missing, or None when every instrument has
        a usable factor column.
    """
    result_dict = {
        "instruments": [],
        "missing_factor_col": [],
        "missing_factor_data": [],
    }
    for filename, df in self.data.items():
        missing_col = "factor" not in df.columns
        # A missing column implies missing data; short-circuit so the column
        # access cannot raise KeyError when the column is absent.
        missing_data = missing_col or bool(df["factor"].isnull().all())
        if missing_data:
            # append all three lists together so they always stay the same length
            result_dict["instruments"].append(filename)
            result_dict["missing_factor_col"].append(missing_col)
            result_dict["missing_factor_data"].append(missing_data)

    result_df = pd.DataFrame(result_dict).set_index("instruments")
    if not result_df.empty:
        return result_df
    logger.info("✅ The `factor` column already exists and is not empty.")
    return None

def check_data(self):
    """Run every health check and print a summary report.

    Each check returns a DataFrame of offending instruments (or None); each
    non-None result is printed beneath a warning naming the problem class.
    """
    check_missing_data_result = self.check_missing_data()
    check_large_step_changes_result = self.check_large_step_changes()
    check_required_columns_result = self.check_required_columns()
    check_missing_factor_result = self.check_missing_factor()
    print(f"\nSummary of data health check ({len(self.data)} files checked):")
    print("-------------------------------------------------")
    # isinstance guards: a check returns None when it found no problem
    if isinstance(check_missing_data_result, pd.DataFrame):
        logger.warning("There is missing data.")
        print(check_missing_data_result)
    if isinstance(check_large_step_changes_result, pd.DataFrame):
        logger.warning("The OHLCV column has large step changes.")
        print(check_large_step_changes_result)
    if isinstance(check_required_columns_result, pd.DataFrame):
        logger.warning("Columns (OHLCV) are missing.")
        print(check_required_columns_result)
    if isinstance(check_missing_factor_result, pd.DataFrame):
        logger.warning("The factor column does not exist or is empty")
        print(check_missing_factor_result)

if __name__ == "__main__":
Expand Down

0 comments on commit 7666a9f

Please sign in to comment.