Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable image backend and color channel format to be selectable #1246

Merged
merged 3 commits into from
Jan 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
(<https://github.com/openvinotoolkit/datumaro/pull/1174>)
- Add ImportError to catch GitPython import error
(<https://github.com/openvinotoolkit/datumaro/pull/1174>)
- Enable image backend and color channel format to be selectable
(<https://github.com/openvinotoolkit/datumaro/pull/1246>)

### Bug fixes
- Modify the draw function in the visualizer not to raise an error for unsupported annotation types.
Expand Down
119 changes: 70 additions & 49 deletions src/datumaro/util/image.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
# Copyright (C) 2019-2023 Intel Corporation
# Copyright (C) 2019-2024 Intel Corporation
#
# SPDX-License-Identifier: MIT
from __future__ import annotations

import importlib
import os
import os.path as osp
import shlex
Expand All @@ -26,20 +25,21 @@
DTypeLike = Any


class _IMAGE_BACKENDS(Enum):
class ImageBackend(Enum):
cv2 = auto()
PIL = auto()


_IMAGE_BACKEND: ContextVar[_IMAGE_BACKENDS] = ContextVar("_IMAGE_BACKENDS")
IMAGE_BACKEND: ContextVar[ImageBackend] = ContextVar("IMAGE_BACKEND")
_image_loading_errors = (FileNotFoundError,)
try:
importlib.import_module("cv2")
_IMAGE_BACKEND.set(_IMAGE_BACKENDS.cv2)
import cv2

IMAGE_BACKEND.set(ImageBackend.cv2)
except ModuleNotFoundError:
import PIL

_IMAGE_BACKEND.set(_IMAGE_BACKENDS.PIL)
IMAGE_BACKEND.set(ImageBackend.PIL)

Check warning on line 42 in src/datumaro/util/image.py

View check run for this annotation

Codecov / codecov/patch

src/datumaro/util/image.py#L42

Added line #L42 was not covered by tests
_image_loading_errors = (*_image_loading_errors, PIL.UnidentifiedImageError)

from datumaro.util.image_cache import ImageCache
Expand All @@ -49,63 +49,91 @@
from PIL.Image import Image as PILImage


class ImageColorScale(Enum):
"""Image color scale
class ImageColorChannel(Enum):
"""Image color channel

- UNCHANGED: Use the original image's scale (default)
- COLOR: Use 3 channels (it can ignore the alpha channel or convert the gray scale image to BGR)
- UNCHANGED: Use the original image's channel (default)
- COLOR_BGR: Use BGR 3 channels (it can ignore the alpha channel or convert the gray scale image)
- COLOR_RGB: Use RGB 3 channels (it can ignore the alpha channel or convert the gray scale image)
"""

UNCHANGED = 0
COLOR = 1
COLOR_BGR = 1
COLOR_RGB = 2

def convert_cv2(self, img: np.ndarray) -> np.ndarray:
import cv2
def decode_by_cv2(self, image_bytes: bytes) -> np.ndarray:
"""Convert image color channel for OpenCV image (np.ndarray)."""
image_buffer = np.frombuffer(image_bytes, dtype=np.uint8)

if self == ImageColorChannel.UNCHANGED:
return cv2.imdecode(image_buffer, cv2.IMREAD_UNCHANGED)

img = cv2.imdecode(image_buffer, cv2.IMREAD_COLOR)

if self == ImageColorChannel.COLOR_BGR:
return img

if self == ImageColorChannel.COLOR_RGB:
return cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

raise ValueError

Check warning on line 79 in src/datumaro/util/image.py

View check run for this annotation

Codecov / codecov/patch

src/datumaro/util/image.py#L79

Added line #L79 was not covered by tests

def decode_by_pil(self, image_bytes: bytes) -> PILImage:
"""Convert image color channel for PIL Image."""
from PIL import Image

if self == ImageColorScale.COLOR:
return cv2.imdecode(img, cv2.IMREAD_COLOR)
img = Image.open(BytesIO(image_bytes))

return cv2.imdecode(img, cv2.IMREAD_UNCHANGED)
if self == ImageColorChannel.UNCHANGED:
return img

def convert_pil(self, img: PILImage) -> PILImage:
if self == ImageColorScale.COLOR:
if self == ImageColorChannel.COLOR_BGR:
return Image.fromarray(np.flip(np.asarray(img.convert("RGB")), -1))

if self == ImageColorChannel.COLOR_RGB:
return img.convert("RGB")

return img
raise ValueError

Check warning on line 96 in src/datumaro/util/image.py

View check run for this annotation

Codecov / codecov/patch

src/datumaro/util/image.py#L96

Added line #L96 was not covered by tests


IMAGE_COLOR_SCALE: ContextVar[ImageColorScale] = ContextVar(
"IMAGE_COLOR_SCALE", default=ImageColorScale.UNCHANGED
IMAGE_COLOR_CHANNEL: ContextVar[ImageColorChannel] = ContextVar(
"IMAGE_COLOR_CHANNEL", default=ImageColorChannel.UNCHANGED
)


@contextmanager
def decode_image_context(color_scale: ImageColorScale):
"""Change Datumaro image decoding color scale.
def decode_image_context(image_backend: ImageBackend, image_color_channel: ImageColorChannel):
"""Change Datumaro image color channel while decoding.

For model training, it is recommended to use this context manager
to load images in the BGR 3-channel format. For example,

.. code-block:: python

import datumaro as dm
with decode_image_context(ImageColorScale.COLOR):
with decode_image_context(image_backend=ImageBackend.cv2, image_color_channel=ImageColorScale.COLOR):
item: dm.DatasetItem
img_data = item.media_as(dm.Image).data
assert img_data.shape[-1] == 3 # It should be a 3-channel image
"""
curr_ctx = IMAGE_COLOR_SCALE.get()
IMAGE_COLOR_SCALE.set(color_scale)

curr_ctx = (IMAGE_BACKEND.get(), IMAGE_COLOR_CHANNEL.get())

IMAGE_BACKEND.set(image_backend)
IMAGE_COLOR_CHANNEL.set(image_color_channel)

yield
IMAGE_COLOR_SCALE.set(curr_ctx)

IMAGE_BACKEND.set(curr_ctx[0])
IMAGE_COLOR_CHANNEL.set(curr_ctx[1])


def load_image(path: str, dtype: DTypeLike = np.uint8, crypter: Crypter = NULL_CRYPTER):
"""
Reads an image in the HWC Grayscale/BGR(A) [0; 255] format (default dtype is uint8).
"""

if _IMAGE_BACKEND.get() == _IMAGE_BACKENDS.cv2:
if IMAGE_BACKEND.get() == ImageBackend.cv2:
# cv2.imread does not support paths that are not representable
# in the locale encoding on Windows, so we read the image bytes
# ourselves.
Expand All @@ -114,13 +142,13 @@
image_bytes = crypter.decrypt(f.read())

return decode_image(image_bytes, dtype=dtype)
elif _IMAGE_BACKEND.get() == _IMAGE_BACKENDS.PIL:
elif IMAGE_BACKEND.get() == ImageBackend.PIL:
with open(path, "rb") as f:
image_bytes = crypter.decrypt(f.read())

return decode_image(image_bytes, dtype=dtype)

raise NotImplementedError(_IMAGE_BACKEND)
raise NotImplementedError(IMAGE_BACKEND)


def copyto_image(
Expand Down Expand Up @@ -175,10 +203,10 @@
# NOTE: OpenCV documentation says "If the image format is not supported,
# the image will be converted to 8-bit unsigned and saved that way".
# Conversion from np.int32 to np.uint8 is not working properly
backend = _IMAGE_BACKEND.get()
backend = IMAGE_BACKEND.get()
if dtype == np.int32:
backend = _IMAGE_BACKENDS.PIL
if backend == _IMAGE_BACKENDS.cv2:
backend = ImageBackend.PIL
if backend == ImageBackend.cv2:
# cv2.imwrite does not support paths that are not representable
# in the locale encoding on Windows, so we write the image bytes
# ourselves.
Expand All @@ -190,7 +218,7 @@
f.write(crypter.encrypt(image_bytes))
else:
dst.write(crypter.encrypt(image_bytes))
elif backend == _IMAGE_BACKENDS.PIL:
elif backend == ImageBackend.PIL:
from PIL import Image

if ext.startswith("."):
Expand All @@ -217,7 +245,7 @@
if not kwargs:
kwargs = {}

if _IMAGE_BACKEND.get() == _IMAGE_BACKENDS.cv2:
if IMAGE_BACKEND.get() == ImageBackend.cv2:
import cv2

params = []
Expand All @@ -233,7 +261,7 @@
if not success:
raise Exception("Failed to encode image to '%s' format" % (ext))
return result.tobytes()
elif _IMAGE_BACKEND.get() == _IMAGE_BACKENDS.PIL:
elif IMAGE_BACKEND.get() == ImageBackend.PIL:
from PIL import Image

if ext.startswith("."):
Expand All @@ -256,21 +284,14 @@


def decode_image(image_bytes: bytes, dtype: DTypeLike = np.uint8) -> np.ndarray:
ctx_color_scale = IMAGE_COLOR_SCALE.get()
ctx_color_scale = IMAGE_COLOR_CHANNEL.get()

if _IMAGE_BACKEND.get() == _IMAGE_BACKENDS.cv2:
image = np.frombuffer(image_bytes, dtype=np.uint8)
image = ctx_color_scale.convert_cv2(image)
if IMAGE_BACKEND.get() == ImageBackend.cv2:
image = ctx_color_scale.decode_by_cv2(image_bytes)
image = image.astype(dtype)
elif _IMAGE_BACKEND.get() == _IMAGE_BACKENDS.PIL:
from PIL import Image

image = Image.open(BytesIO(image_bytes))
image = ctx_color_scale.convert_pil(image)
elif IMAGE_BACKEND.get() == ImageBackend.PIL:
image = ctx_color_scale.decode_by_pil(image_bytes)
image = np.asarray(image, dtype=dtype)
if len(image.shape) == 3 and image.shape[2] in {3, 4}:
image = np.array(image) # Release read-only
image[:, :, :3] = image[:, :, 2::-1] # RGB to BGR
else:
raise NotImplementedError()

Expand Down
10 changes: 5 additions & 5 deletions tests/unit/test_crypter.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

from datumaro.components.crypter import NULL_CRYPTER, Crypter
from datumaro.components.media import Image
from datumaro.util.image import _IMAGE_BACKEND, _IMAGE_BACKENDS
from datumaro.util.image import IMAGE_BACKEND, ImageBackend


@pytest.fixture(scope="class")
Expand All @@ -36,12 +36,12 @@ def fxt_encrypted_image_file(test_dir, fxt_image_file, fxt_crypter):


class CrypterTest:
@pytest.fixture(scope="class", params=[_IMAGE_BACKENDS.cv2, _IMAGE_BACKENDS.PIL], autouse=True)
@pytest.fixture(scope="class", params=[ImageBackend.cv2, ImageBackend.PIL], autouse=True)
def fxt_image_backend(self, request):
curr_backend = _IMAGE_BACKEND.get()
_IMAGE_BACKEND.set(request.param)
curr_backend = IMAGE_BACKEND.get()
IMAGE_BACKEND.set(request.param)
yield
_IMAGE_BACKEND.set(curr_backend)
IMAGE_BACKEND.set(curr_backend)

def test_load_encrypted_image(self, fxt_image_file, fxt_encrypted_image_file, fxt_crypter):
img = Image.from_file(path=fxt_image_file)
Expand Down
Loading
Loading