Skip to content

Commit

Permalink
Azure blob storage connection tweaks (#2252)
Browse files Browse the repository at this point in the history
  • Loading branch information
lferran authored Jun 18, 2024
1 parent 216fc45 commit 1fe9d0a
Show file tree
Hide file tree
Showing 14 changed files with 178 additions and 95 deletions.
43 changes: 30 additions & 13 deletions nucliadb/src/nucliadb/ingest/processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,12 @@
from nucliadb_protos.resources_pb2 import FieldFile as FieldFilePB
from nucliadb_telemetry import metrics
from nucliadb_utils.exceptions import LimitsExceededError, SendToProcessError
from nucliadb_utils.settings import FileBackendConfig, nuclia_settings, storage_settings
from nucliadb_utils.settings import (
FileBackendConfig,
is_onprem_nucliadb,
nuclia_settings,
storage_settings,
)
from nucliadb_utils.storages.storage import Storage
from nucliadb_utils.utilities import Utility, set_utility

Expand Down Expand Up @@ -129,22 +134,34 @@ async def start_processing_engine():
set_utility(Utility.PROCESSING, processing_engine)


def to_processing_driver_type(file_backend_driver: FileBackendConfig) -> int:
class ProcessingDriverType(Enum):
    """Nuclia-internal identifier of the blob storage driver.

    Sent to the processing engine (inside the file JWT's ``driver`` claim)
    so it stores the blobs of processed metadata in the right bucket folder.
    """

    # XXX IMPORTANT XXX: Make sure the values are in sync with
    # the ones defined in nuclia/learning/processing repository
    GCS = 0  # Google Cloud Storage
    S3 = 1  # S3-compatible storage
    LOCAL = 2  # Local storage; also the fallback and the value used for on-prem installs

def to_processing_driver_type(file_backend_driver: FileBackendConfig) -> ProcessingDriverType:
"""
Outputs a nuclia-internal backend driver identifier that is used by processing
to store the blobs of processed metadata in the right bucket folder.
"""
if file_backend_driver == FileBackendConfig.GCS:
return 0
elif file_backend_driver == FileBackendConfig.S3:
return 1
elif file_backend_driver == FileBackendConfig.LOCAL:
return 2
else:
if is_onprem_nucliadb():
# On-prem installations are always regarded as local storage from the processing perspective,
# as Nuclia processing engine will not have direct access to the storage.
return ProcessingDriverType.LOCAL

try:
return {
FileBackendConfig.GCS: ProcessingDriverType.GCS,
FileBackendConfig.S3: ProcessingDriverType.S3,
}[file_backend_driver]
except KeyError:
logger.error(
f"Not a valid file backend driver to processing, fallback to local: {file_backend_driver}"
)
return 2
return ProcessingDriverType.LOCAL


class ProcessingEngine:
Expand Down Expand Up @@ -180,7 +197,7 @@ def __init__(

self.nuclia_jwt_key = nuclia_jwt_key
self.days_to_keep = days_to_keep
self.driver = to_processing_driver_type(driver)
self.driver: ProcessingDriverType = to_processing_driver_type(driver)
self._exit_stack = AsyncExitStack()

async def initialize(self):
Expand All @@ -203,7 +220,7 @@ def generate_file_token_from_cloudfile(self, cf: CloudFile) -> str:
"iat": now,
"md5": cf.md5,
"source": 1, # To indicate that this files comes internally
"driver": self.driver,
"driver": self.driver.value,
"jti": uuid.uuid4().hex,
"bucket_name": cf.bucket_name,
"filename": cf.filename,
Expand All @@ -227,7 +244,7 @@ def generate_file_token_from_fieldfile(self, file: FieldFilePB) -> str:
"iat": now,
"md5": file.file.md5,
"source": 1, # To indicate that this files comes internally
"driver": self.driver,
"driver": self.driver.value,
"jti": uuid.uuid4().hex,
"bucket_name": file.file.bucket_name,
"filename": file.file.filename,
Expand Down
6 changes: 4 additions & 2 deletions nucliadb/src/nucliadb/search/predict.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,6 @@ async def check_response(self, resp: aiohttp.ClientResponse, expected_status: in
if resp.status == 402:
data = await resp.json()
raise LimitsExceededError(402, data["detail"])

try:
data = await resp.json()
try:
Expand All @@ -256,7 +255,10 @@ async def check_response(self, resp: aiohttp.ClientResponse, expected_status: in
aiohttp.client_exceptions.ContentTypeError,
):
detail = await resp.text()
logger.error(f"Predict API error at {resp.url}: {detail}")
if str(resp.status).startswith("5"):
logger.error(f"Predict API error at {resp.url}: {detail}")
else:
logger.info(f"Predict API error at {resp.url}: {detail}")
raise ProxiedPredictAPIError(status=resp.status, detail=detail)

@backoff.on_exception(
Expand Down
30 changes: 9 additions & 21 deletions nucliadb/src/nucliadb/search/search/chat/prompt.py
Original file line number Diff line number Diff line change
Expand Up @@ -379,9 +379,7 @@ async def build(

context = ccontext.output
context_images = ccontext.images
context_order = {
text_block_id: order for order, text_block_id in enumerate(context.keys())
}
context_order = {text_block_id: order for order, text_block_id in enumerate(context.keys())}
return context, context_order, context_images

async def _build_context_images(self, context: CappedPromptContext) -> None:
Expand All @@ -401,27 +399,17 @@ async def _build_context_images(self, context: CappedPromptContext) -> None:

for paragraph in self.ordered_paragraphs:
if paragraph.page_with_visual and paragraph.position:
if (
gather_pages
and paragraph.position.page_number
and len(context.images) < page_count
):
if gather_pages and paragraph.position.page_number and len(context.images) < page_count:
field = "/".join(paragraph.id.split("/")[:3])
page = paragraph.position.page_number
page_id = f"{field}/{page}"
if page_id not in context.images:
context.images[page_id] = await get_page_image(
self.kbid, paragraph.id, page
)
context.images[page_id] = await get_page_image(self.kbid, paragraph.id, page)
# Only send tables if enabled by strategy, by default, send paragraph images
send_images = (
gather_tables and paragraph.is_a_table
) or not paragraph.is_a_table
send_images = (gather_tables and paragraph.is_a_table) or not paragraph.is_a_table
if send_images and paragraph.reference and paragraph.reference != "":
image = paragraph.reference
context.images[paragraph.id] = await get_paragraph_image(
self.kbid, paragraph.id, image
)
context.images[paragraph.id] = await get_paragraph_image(self.kbid, paragraph.id, image)

async def _build_context(self, context: CappedPromptContext) -> None:
if self.strategies is None or len(self.strategies) == 0:
Expand Down Expand Up @@ -473,9 +461,7 @@ class ExtraCharsParagraph:
paragraphs: List[Tuple[FindParagraph, str]]


async def get_extra_chars(
kbid: str, ordered_paragraphs: list[FindParagraph], distance: int
):
async def get_extra_chars(kbid: str, ordered_paragraphs: list[FindParagraph], distance: int):
etcache = paragraphs.ExtractedTextCache()
resources: Dict[str, ExtraCharsParagraph] = {}
for paragraph in ordered_paragraphs:
Expand Down Expand Up @@ -531,7 +517,9 @@ async def get_extra_chars(
paragraph.text = ""

if first_paragraph is not None:
first_paragraph.text = f"DOCUMENT: {title_text} \n SUMMARY: {summary_text} \n RESOURCE CONTENT: {text}"
first_paragraph.text = (
f"DOCUMENT: {title_text} \n SUMMARY: {summary_text} \n RESOURCE CONTENT: {text}"
)


def _clean_paragraph_text(paragraph: FindParagraph) -> str:
Expand Down
9 changes: 6 additions & 3 deletions nucliadb/src/nucliadb/writer/tus/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,14 @@ async def initialize():
DRIVER = TusStorageDriver(backend=storage_backend, manager=storage_manager)

elif storage_settings.file_backend == FileBackendConfig.AZURE:
if storage_settings.azure_connection_string is None:
raise ConfigurationError("AZURE_CONNECTION_STRING env variable not configured")
if storage_settings.azure_account_url is None:
raise ConfigurationError("AZURE_ACCOUNT_URL env variable not configured")

storage_backend = AzureBlobStore()
await storage_backend.initialize(storage_settings.azure_connection_string)
await storage_backend.initialize(
storage_settings.azure_account_url,
connection_string=storage_settings.azure_connection_string,
)
storage_manager = AzureFileStorageManager(storage_backend)

DRIVER = TusStorageDriver(backend=storage_backend, manager=storage_manager)
Expand Down
6 changes: 4 additions & 2 deletions nucliadb/src/nucliadb/writer/tus/azure.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
#
from __future__ import annotations

from typing import Optional

from nucliadb.writer import logger
from nucliadb.writer.tus.dm import FileDataManager
from nucliadb.writer.tus.storage import BlobStore, FileStorageManager
Expand All @@ -39,10 +41,10 @@ async def finalize(self):
logger.exception("Error closing AzureBlobStore")
self._object_store = None

async def initialize(self, connection_string: str):
async def initialize(self, account_url: str, connection_string: Optional[str] = None):
self.bucket = "nucliadb-{kbid}"
self.source = CloudFile.Source.AZURE
self._object_store = AzureObjectStore(connection_string)
self._object_store = AzureObjectStore(account_url, connection_string=connection_string)
await self._object_store.initialize()

@property
Expand Down
1 change: 1 addition & 0 deletions nucliadb/tests/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -675,6 +675,7 @@ def local_storage_settings(tmpdir):
def azure_storage_settings(azurite: AzuriteFixture):
    """Storage-settings overrides that point NucliaDB at the Azurite fixture.

    The connection string is supplied alongside the account URL because
    Azurite does not support Azure's default-credential authentication.
    """
    overrides = dict(
        file_backend=FileBackendConfig.AZURE,
        azure_account_url=azurite.account_url,
        azure_connection_string=azurite.connection_string,
    )
    return overrides

Expand Down
13 changes: 7 additions & 6 deletions nucliadb_node/src/settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,9 @@ pub fn build_object_store_driver(settings: &EnvSettings) -> Arc<dyn ObjectStore>
Arc::new(builder.build().unwrap())
}
ObjectStoreType::AZURE => {
let builder =
MicrosoftAzureBuilder::new().with_allow_http(true).with_url(settings.azure_url.clone().unwrap());
let builder = MicrosoftAzureBuilder::new()
.with_allow_http(true)
.with_url(settings.azure_account_url.clone().unwrap());
Arc::new(builder.build().unwrap())
}
// Any other type is not supported for now
Expand Down Expand Up @@ -250,7 +251,7 @@ pub struct EnvSettings {
pub s3_region_name: String,
pub s3_indexing_bucket: String,
pub s3_endpoint: Option<String>,
pub azure_url: Option<String>,
pub azure_account_url: Option<String>,
}

impl EnvSettings {
Expand Down Expand Up @@ -331,7 +332,7 @@ impl Default for EnvSettings {
s3_region_name: Default::default(),
s3_indexing_bucket: Default::default(),
s3_endpoint: None,
azure_url: Default::default(),
azure_account_url: Default::default(),
}
}
}
Expand Down Expand Up @@ -366,8 +367,8 @@ mod tests {
let settings = from_pairs(&[("FILE_BACKEND", "s3")]).unwrap();
assert_eq!(settings.file_backend, super::ObjectStoreType::S3);

let azure_url = "https://myaccount.blob.core.windows.net/mycontainer/myblob";
let settings = from_pairs(&[("FILE_BACKEND", "azure"), ("azure_url", azure_url)]).unwrap();
let azure_account_url = "https://myaccount.blob.core.windows.net/mycontainer/myblob";
let settings = from_pairs(&[("FILE_BACKEND", "azure"), ("azure_account_url", azure_account_url)]).unwrap();
assert_eq!(settings.file_backend, super::ObjectStoreType::AZURE);

let settings = from_pairs(&[("FILE_BACKEND", "unknown")]).unwrap();
Expand Down
1 change: 1 addition & 0 deletions nucliadb_utils/requirements-storages.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@ aiofiles>=0.8.0
backoff>=1.11.1
google-auth>=2.4.1
azure-storage-blob>=12.20.0
azure-identity>=1.16.1
9 changes: 7 additions & 2 deletions nucliadb_utils/src/nucliadb_utils/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,11 +114,16 @@ class StorageSettings(BaseSettings):
description="Number of days that uploaded files are kept in Nulia's processing engine",
)

azure_connection_string: Optional[str] = Field(
azure_account_url: Optional[str] = Field(
default=None,
description="Azure Storage connection string: https://docs.microsoft.com/en-us/azure/storage/common/storage-configure-connection-string", # noqa
description="Azure Account URL. The driver implementation uses Azure's default credential authentication method: https://learn.microsoft.com/en-us/python/api/azure-identity/azure.identity.defaultazurecredential?view=azure-python", # noqa
examples=["https://<storageaccountname>.blob.core.windows.net"],
)

# For testing purposes: Azurite docker image requires a connection string as it
# doesn't support Azure's default credential authentication method
azure_connection_string: Optional[str] = None


storage_settings = StorageSettings()

Expand Down
17 changes: 13 additions & 4 deletions nucliadb_utils/src/nucliadb_utils/storages/azure.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from typing import AsyncGenerator, AsyncIterator, Optional, Union

from azure.core.exceptions import ResourceExistsError, ResourceNotFoundError
from azure.identity import DefaultAzureCredential
from azure.storage.blob import BlobProperties, BlobType, ContentSettings
from azure.storage.blob.aio import BlobServiceClient

Expand Down Expand Up @@ -161,11 +162,12 @@ class AzureStorage(Storage):

def __init__(
self,
connection_string: str,
account_url: str,
deadletter_bucket: str = "deadletter",
indexing_bucket: str = "indexing",
connection_string: Optional[str] = None,
):
self.object_store = AzureObjectStore(connection_string)
self.object_store = AzureObjectStore(account_url, connection_string=connection_string)
self.deadletter_bucket = deadletter_bucket
self.indexing_bucket = indexing_bucket

Expand Down Expand Up @@ -215,7 +217,8 @@ async def iterate_objects(self, bucket: str, prefix: str) -> AsyncGenerator[Obje


class AzureObjectStore(ObjectStore):
def __init__(self, connection_string: str):
def __init__(self, account_url: str, connection_string: Optional[str] = None):
self.account_url = account_url
self.connection_string = connection_string
self._service_client: Optional[BlobServiceClient] = None

Expand All @@ -226,7 +229,13 @@ def service_client(self) -> BlobServiceClient:
return self._service_client

async def initialize(self):
self._service_client = BlobServiceClient.from_connection_string(self.connection_string)
if self.connection_string:
# For testing purposes
self._service_client = BlobServiceClient.from_connection_string(self.connection_string)
else:
self._service_client = BlobServiceClient(
self.account_url, credential=DefaultAzureCredential()
)

async def finalize(self):
try:
Expand Down
3 changes: 3 additions & 0 deletions nucliadb_utils/src/nucliadb_utils/tests/azure.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ class AzuriteFixture:
port: int
container: BaseImage
connection_string: str
account_url: str


def get_connection_string(host, port) -> str:
Expand All @@ -99,6 +100,7 @@ def azurite() -> Generator[AzuriteFixture, None, None]:
port=port,
container=container.container_obj,
connection_string=get_connection_string(host, port),
account_url=f"http://{host}:{port}/devstoreaccount1",
)
finally:
container.stop()
Expand All @@ -107,6 +109,7 @@ def azurite() -> Generator[AzuriteFixture, None, None]:
@pytest.fixture(scope="function")
async def azure_storage(azurite):
storage = AzureStorage(
account_url=azurite.account_url,
connection_string=azurite.connection_string,
)
MAIN[Utility.STORAGE] = storage
Expand Down
5 changes: 3 additions & 2 deletions nucliadb_utils/src/nucliadb_utils/utilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,11 @@ async def get_storage(
if storage_settings.file_backend == FileBackendConfig.AZURE:
from nucliadb_utils.storages.azure import AzureStorage

if storage_settings.azure_connection_string is None:
raise ConfigurationError("AZURE_CONNECTION_STRING env var not configured")
if storage_settings.azure_account_url is None:
raise ConfigurationError("AZURE_ACCOUNT_URL env variable not configured")

azureutil = AzureStorage(
account_url=storage_settings.azure_account_url,
connection_string=storage_settings.azure_connection_string,
)

Expand Down
Loading

3 comments on commit 1fe9d0a

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Benchmark

Benchmark suite Current: 1fe9d0a Previous: 0d03d9f Ratio
tests/search/unit/search/test_fetch.py::test_highligh_error 3028.048205574699 iter/sec (stddev: 0.000004720072004695991) 2841.0684406726436 iter/sec (stddev: 0.000004954958228416619) 0.94

This comment was automatically generated by a workflow using github-action-benchmark.

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Benchmark

Benchmark suite Current: 1fe9d0a Previous: 0d03d9f Ratio
tests/search/unit/search/test_fetch.py::test_highligh_error 3040.7022947736336 iter/sec (stddev: 0.0000021570122078951675) 2841.0684406726436 iter/sec (stddev: 0.000004954958228416619) 0.93

This comment was automatically generated by a workflow using github-action-benchmark.

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Benchmark

Benchmark suite Current: 1fe9d0a Previous: 0d03d9f Ratio
tests/search/unit/search/test_fetch.py::test_highligh_error 3129.1131912328515 iter/sec (stddev: 0.00000447998368105043) 2841.0684406726436 iter/sec (stddev: 0.000004954958228416619) 0.91

This comment was automatically generated by a workflow using github-action-benchmark.

Please sign in to comment.