feat(clp-package): Add support for extracting JSON streams from archives stored on S3. (#678)

Co-authored-by: haiqi96 <14502009+haiqi96@users.noreply.github.com>
gibber9809 and haiqi96 authored Jan 20, 2025
1 parent f0b846d commit ba63a76
Showing 3 changed files with 147 additions and 61 deletions.
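At a high level, the change lets the stream-extraction (JSON) path operate on archives stored in S3 instead of only on the local filesystem: the query worker addresses the target archive with a virtual-hosted-style S3 URL and passes AWS credentials to clp-s through environment variables. As a rough sketch of the URL shape only — the production helper is generate_s3_virtual_hosted_style_url from clp_py_utils.s3_utils, and its validation and edge-case handling may differ — virtual-hosted-style addressing looks like this:

# Sketch only: approximates the virtual-hosted-style URL used to address an
# archive object in S3; the real helper is
# clp_py_utils.s3_utils.generate_s3_virtual_hosted_style_url.
def sketch_virtual_hosted_style_url(region_code: str, bucket: str, key: str) -> str:
    if not (region_code and bucket and key):
        raise ValueError("region_code, bucket, and key must be non-empty")
    # Layout: https://<bucket>.s3.<region>.amazonaws.com/<key>
    return f"https://{bucket}.s3.{region_code}.amazonaws.com/{key}"

# With hypothetical values:
#   sketch_virtual_hosted_style_url("us-east-2", "clp-archives", "prefix/archive-id")
#   -> "https://clp-archives.s3.us-east-2.amazonaws.com/prefix/archive-id"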
@@ -5,7 +5,7 @@
import sys
from typing import Optional

from clp_py_utils.clp_config import CLPConfig, StorageType
from clp_py_utils.clp_config import CLPConfig, StorageEngine, StorageType

from clp_package_utils.general import (
CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH,
@@ -82,8 +82,12 @@ def handle_extract_file_cmd(
return -1

storage_type = clp_config.archive_output.storage.type
if StorageType.FS != storage_type:
logger.error(f"File extraction is not supported for archive storage type: {storage_type}.")
storage_engine = clp_config.package.storage_engine
if StorageType.FS != storage_type or StorageEngine.CLP != storage_engine:
logger.error(
f"File extraction is not supported for archive storage type `{storage_type}` with"
f" storage engine `{storage_engine}`."
)
return -1

container_name = generate_container_name(str(JobType.FILE_EXTRACTION))
@@ -162,9 +166,11 @@ def handle_extract_stream_cmd(
return -1

storage_type = clp_config.archive_output.storage.type
if StorageType.FS != storage_type:
storage_engine = clp_config.package.storage_engine
if StorageType.S3 == storage_type and StorageEngine.CLP == storage_engine:
logger.error(
f"Stream extraction is not supported for archive storage type: {storage_type}."
f"Stream extraction is not supported for archive storage type `{storage_type}` with"
f" storage engine `{storage_engine}`."
)
return -1
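Taken together, the checks above tighten the package-side validation: file extraction is accepted only for filesystem-stored archives under the clp storage engine, while stream extraction is now rejected only for the single unsupported combination, clp archives stored on S3. A minimal sketch of that support matrix, using the same enums as the script (the real handlers also log an error and return -1 on the unsupported cases):

# Sketch of the support checks implied by the hunks above.
from clp_py_utils.clp_config import StorageEngine, StorageType

def file_extraction_supported(storage_type: StorageType, storage_engine: StorageEngine) -> bool:
    # File extraction: filesystem storage with the clp engine only.
    return StorageType.FS == storage_type and StorageEngine.CLP == storage_engine

def stream_extraction_supported(storage_type: StorageType, storage_engine: StorageEngine) -> bool:
    # Stream extraction: everything except clp (IR) archives stored on S3.
    return not (StorageType.S3 == storage_type and StorageEngine.CLP == storage_engine)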

@@ -2,13 +2,13 @@
import json
import os
from pathlib import Path
from typing import Any, Dict, List, Optional
from typing import Any, Dict, List, Optional, Tuple

from celery.app.task import Task
from celery.utils.log import get_task_logger
from clp_py_utils.clp_config import Database, S3Config, StorageEngine, StorageType, WorkerConfig
from clp_py_utils.clp_logging import set_logging_level
from clp_py_utils.s3_utils import s3_put
from clp_py_utils.s3_utils import generate_s3_virtual_hosted_style_url, s3_put
from clp_py_utils.sql_adapter import SQL_Adapter
from job_orchestration.executor.query.celery import app
from job_orchestration.executor.query.utils import (
@@ -23,65 +23,153 @@
logger = get_task_logger(__name__)


def make_command(
def _make_clp_command_and_env_vars(
clp_home: Path,
worker_config: WorkerConfig,
archive_id: str,
job_config: dict,
results_cache_uri: str,
print_stream_stats: bool,
) -> Optional[List[str]]:
storage_engine = worker_config.package.storage_engine
) -> Tuple[Optional[List[str]], Optional[Dict[str, str]]]:
storage_type = worker_config.archive_output.storage.type
archives_dir = worker_config.archive_output.get_directory()
stream_output_dir = worker_config.stream_output.get_directory()
stream_collection_name = worker_config.stream_collection_name

if StorageEngine.CLP == storage_engine:
logger.info("Starting IR extraction")
extract_ir_config = ExtractIrJobConfig.parse_obj(job_config)
if not extract_ir_config.file_split_id:
logger.error("file_split_id not supplied")
return None
command = [
str(clp_home / "bin" / "clo"),
"i",
str(archives_dir / archive_id),
extract_ir_config.file_split_id,
if StorageType.S3 == storage_type:
logger.error(
f"IR extraction is not supported for storage type '{storage_type}' while using the"
f" '{worker_config.package.storage_engine}' storage engine."
)
return None, None

logger.info("Starting IR extraction")
extract_ir_config = ExtractIrJobConfig.parse_obj(job_config)
if not extract_ir_config.file_split_id:
logger.error("file_split_id not supplied")
return None, None
command = [
str(clp_home / "bin" / "clo"),
"i",
str(archives_dir / archive_id),
extract_ir_config.file_split_id,
str(stream_output_dir),
results_cache_uri,
stream_collection_name,
]
if extract_ir_config.target_uncompressed_size is not None:
command.append("--target-size")
command.append(str(extract_ir_config.target_uncompressed_size))
if print_stream_stats:
command.append("--print-ir-stats")
return command, None


def _make_clp_s_command_and_env_vars(
clp_home: Path,
worker_config: WorkerConfig,
archive_id: str,
job_config: dict,
results_cache_uri: str,
print_stream_stats: bool,
) -> Tuple[Optional[List[str]], Optional[Dict[str, str]]]:
storage_type = worker_config.archive_output.storage.type
stream_output_dir = worker_config.stream_output.get_directory()
stream_collection_name = worker_config.stream_collection_name

logger.info("Starting JSON extraction")
extract_json_config = ExtractJsonJobConfig.parse_obj(job_config)
command = [
str(clp_home / "bin" / "clp-s"),
"x",
]

if StorageType.S3 == storage_type:
s3_config = worker_config.archive_output.storage.s3_config
try:
s3_url = generate_s3_virtual_hosted_style_url(
s3_config.region_code, s3_config.bucket, f"{s3_config.key_prefix}{archive_id}"
)
except ValueError as ex:
logger.error(f"Encountered error while generating S3 url: {ex}")
return None, None
# fmt: off
command.extend((
s3_url,
str(stream_output_dir),
results_cache_uri,
stream_collection_name,
]
if extract_ir_config.target_uncompressed_size is not None:
command.append("--target-size")
command.append(str(extract_ir_config.target_uncompressed_size))
if print_stream_stats:
command.append("--print-ir-stats")
elif StorageEngine.CLP_S == storage_engine:
logger.info("Starting JSON extraction")
extract_json_config = ExtractJsonJobConfig.parse_obj(job_config)
command = [
str(clp_home / "bin" / "clp-s"),
"x",
str(archives_dir),
"--auth",
"s3",
))
# fmt: on
aws_access_key_id, aws_secret_access_key = s3_config.get_credentials()
if aws_access_key_id is None or aws_secret_access_key is None:
logger.error("Missing credentials for accessing archives on S3")
return None, None
env_vars = {
**os.environ,
"AWS_ACCESS_KEY_ID": aws_access_key_id,
"AWS_SECRET_ACCESS_KEY": aws_secret_access_key,
}
else:
# fmt: off
command.extend((
str(worker_config.archive_output.get_directory()),
str(stream_output_dir),
"--ordered",
"--archive-id",
archive_id,
"--mongodb-uri",
))
# fmt: on
env_vars = None

# fmt: off
command.extend((
"--ordered",
"--mongodb-uri",
results_cache_uri,
"--mongodb-collection",
stream_collection_name,
))
# fmt: on

if extract_json_config.target_chunk_size is not None:
command.append("--target-ordered-chunk-size")
command.append(str(extract_json_config.target_chunk_size))
if print_stream_stats:
command.append("--print-ordered-chunk-stats")
return command, env_vars


def _make_command_and_env_vars(
clp_home: Path,
worker_config: WorkerConfig,
archive_id: str,
job_config: dict,
results_cache_uri: str,
print_stream_stats: bool,
) -> Tuple[Optional[List[str]], Optional[Dict[str, str]]]:
storage_engine = worker_config.package.storage_engine
if StorageEngine.CLP == storage_engine:
command, env_vars = _make_clp_command_and_env_vars(
clp_home,
worker_config,
archive_id,
job_config,
results_cache_uri,
print_stream_stats,
)
elif StorageEngine.CLP_S == storage_engine:
command, env_vars = _make_clp_s_command_and_env_vars(
clp_home,
worker_config,
archive_id,
job_config,
results_cache_uri,
"--mongodb-collection",
stream_collection_name,
]
if extract_json_config.target_chunk_size is not None:
command.append("--target-ordered-chunk-size")
command.append(str(extract_json_config.target_chunk_size))
if print_stream_stats:
command.append("--print-ordered-chunk-stats")
print_stream_stats,
)
else:
logger.error(f"Unsupported storage engine {storage_engine}")
return None

return command
return None, None
return command, env_vars


@app.task(bind=True)
@@ -117,14 +205,6 @@ def extract_stream(
start_time=start_time,
)

if worker_config.archive_output.storage.type == StorageType.S3:
logger.error(f"Stream extraction is not supported for the S3 storage type")
return report_task_failure(
sql_adapter=sql_adapter,
task_id=task_id,
start_time=start_time,
)

# Make task_command
clp_home = Path(os.getenv("CLP_HOME"))

@@ -136,7 +216,7 @@
s3_config = storage_config.s3_config
enable_s3_upload = True

task_command = make_command(
task_command, core_clp_env_vars = _make_command_and_env_vars(
clp_home=clp_home,
worker_config=worker_config,
archive_id=archive_id,
@@ -157,7 +237,7 @@
logger=logger,
clp_logs_dir=clp_logs_dir,
task_command=task_command,
env_vars=None,
env_vars=core_clp_env_vars,
task_name=task_name,
job_id=job_id,
task_id=task_id,
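For reference, the S3 branch of the new _make_clp_s_command_and_env_vars differs from the filesystem branch in two ways visible in the hunks above: the archive is addressed by a virtual-hosted-style URL plus `--auth s3` rather than by the local archives directory, and AWS credentials are handed to the clp-s subprocess through environment variables. A condensed sketch of that branch with placeholder values (error handling and the remaining shared flags, such as the MongoDB options, are omitted):

# Condensed sketch of the S3 branch; bucket, region, and archive ID are placeholders,
# and "clp-s" stands in for str(clp_home / "bin" / "clp-s").
import os

s3_url = "https://clp-archives.s3.us-east-2.amazonaws.com/prefix/archive-id"  # from generate_s3_virtual_hosted_style_url
command = ["clp-s", "x", s3_url, "--auth", "s3"]  # shared flags (--ordered, MongoDB URI/collection) are appended later
aws_access_key_id, aws_secret_access_key = ("AKIA...", "...")  # really from s3_config.get_credentials()
env_vars = {
    **os.environ,
    "AWS_ACCESS_KEY_ID": aws_access_key_id,
    "AWS_SECRET_ACCESS_KEY": aws_secret_access_key,
}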
@@ -77,8 +77,8 @@ def _make_core_clp_s_command_and_env_vars(
return None, None
env_vars = {
**os.environ,
"AWS_ACCESS_KEY_ID": s3_config.credentials.access_key_id,
"AWS_SECRET_ACCESS_KEY": s3_config.credentials.secret_access_key,
"AWS_ACCESS_KEY_ID": aws_access_key_id,
"AWS_SECRET_ACCESS_KEY": aws_secret_access_key,
}
else:
# fmt: off
