From d0d18017efb97557bb7d47f9fd96695e8c5b7cb2 Mon Sep 17 00:00:00 2001 From: hcwinsemius Date: Fri, 14 Feb 2025 15:38:11 +0100 Subject: [PATCH] 202 prepare water level interpretation (#203) * small mod in cli elements to allow for return of rvec and tvec * Add skeleton for WaterLevel class and corresponding tests #202 * Rename WaterLevel to CrossSection, add geometric functionalities #202 * CrossSection API draft #202 * docs for CrossSection base methods #202 * Add support for using s-coordinates in cross-section methods #202 * version bump and changelog. #202 Resolved issues with cartopy missing in tests * cartopy dependency check added in plot tests * improve test coverage #202 --- CHANGELOG.md | 14 ++ pyorc/__init__.py | 5 +- pyorc/api/__init__.py | 12 +- pyorc/api/cross_section.py | 439 ++++++++++++++++++++++++++++++++++++ pyorc/api/plot.py | 5 +- pyorc/cli/cli_elements.py | 237 +++++++++---------- pyorc/cli/cli_utils.py | 161 +++++++------ pyorc/cv.py | 11 + pyorc/plot_helpers.py | 36 +++ tests/test_cross_section.py | 400 ++++++++++++++++++++++++++++++++ tests/test_plot_helpers.py | 29 +++ tests/test_transect.py | 9 + tests/test_velocimetry.py | 14 +- 13 files changed, 1170 insertions(+), 202 deletions(-) create mode 100644 pyorc/api/cross_section.py create mode 100644 pyorc/plot_helpers.py create mode 100644 tests/test_cross_section.py create mode 100644 tests/test_plot_helpers.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 4cadcbc..f85316d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,17 @@ +## [0.7.2] = 2025-02-14 +### Added +- New class CrossSection. This is to prepare for water level estimation functionalities. + It provides several geometrical operations along a cross section. This is documented yet. + and may change significantly in the near future. + +### Changed +- `cli.cli_utils.get_gcps_optimized_fit` now export rotation and translation vectors also, for + later use in front end functionalities (e.g. show pose of camera interactively). +### Deprecated +### Removed +### Fixed +### Security + ## [0.7.1] - 2024-12-13 ### Added ### Changed diff --git a/pyorc/__init__.py b/pyorc/__init__.py index fb3bb59..308b1bb 100644 --- a/pyorc/__init__.py +++ b/pyorc/__init__.py @@ -1,8 +1,8 @@ """pyorc: free and open-source image-based surface velocity and discharge.""" -__version__ = "0.7.1" +__version__ = "0.7.2" -from .api import CameraConfig, Frames, Transect, Velocimetry, Video, get_camera_config, load_camera_config # noqa +from .api import CameraConfig, Frames, Transect, Velocimetry, Video, CrossSection, get_camera_config, load_camera_config # noqa from .project import * # noqa from . import cli, service # noqa @@ -14,6 +14,7 @@ "Frames", "Velocimetry", "Transect", + "CrossSection", "service", "cli", ] diff --git a/pyorc/api/__init__.py b/pyorc/api/__init__.py index 2fe3cc4..fc7a578 100644 --- a/pyorc/api/__init__.py +++ b/pyorc/api/__init__.py @@ -1,8 +1,11 @@ -from .cameraconfig import CameraConfig, load_camera_config, get_camera_config -from .video import Video +"""API for pyorc.""" + +from .cameraconfig import CameraConfig, get_camera_config, load_camera_config +from .cross_section import CrossSection from .frames import Frames -from .velocimetry import Velocimetry from .transect import Transect +from .velocimetry import Velocimetry +from .video import Video __all__ = [ "CameraConfig", @@ -11,5 +14,6 @@ "Video", "Frames", "Velocimetry", - "Transect" + "CrossSection", + "Transect", ] diff --git a/pyorc/api/cross_section.py b/pyorc/api/cross_section.py new file mode 100644 index 0000000..0f52714 --- /dev/null +++ b/pyorc/api/cross_section.py @@ -0,0 +1,439 @@ +"""WaterLevel module for pyorc.""" + +from __future__ import annotations + +from typing import List, Literal, Optional, Tuple, Union + +import geopandas as gpd +import matplotlib as mpl +import matplotlib.pyplot as plt +import numpy as np +from scipy.interpolate import interp1d +from shapely import affinity, geometry +from shapely.ops import polygonize + +MODES = Literal["camera", "geographic", "cross_section"] + + +class CrossSection: + """Water Level functionality.""" + + def __str__(self): + return str(self) + + def __repr__(self): + return self + + def __init__(self, camera_config, cross_section: Union[gpd.GeoDataFrame, List[List]]): + """Initialize WaterLevel class for establishing estimated water levels.""" + # if cross_section is a GeoDataFrame, check if it has a CRS, if yes, convert coordinates to crs of CameraConfig + if isinstance(cross_section, gpd.GeoDataFrame): + if cross_section.crs is not None and camera_config.crs is not None: + cross_section.to_crs(camera_config.crs, inplace=True) + elif cross_section.crs is not None or camera_config.crs is not None: + raise ValueError("if a CRS is used, then both camera_config and cross_section must have a CRS.") + g = cross_section.geometry + x, y, z = g.x, g.y, g.z + else: + x, y, z = list(map(list, zip(*cross_section, strict=False))) + + x_diff = np.concatenate((np.array([0]), np.diff(x))) + y_diff = np.concatenate((np.array([0]), np.diff(y))) + z_diff = np.concatenate((np.array([0]), np.diff(z))) + # estimate distance from left to right bank + s = np.cumsum((x_diff**2 + y_diff**2) ** 0.5) + + # estimate length coordinates + l = np.cumsum(np.sqrt(x_diff**2 + y_diff**2 + z_diff**2)) + + self.x = x # x-coordinates in local projection or crs + self.y = y # y-coordinates in local projection or crs + self.z = z # z-coordinates in local projection or crs + self.s = s # horizontal distance from left to right bank (only used for interpolation funcs). + self.l = l # length distance (following horizontal and vertical) over cross section from left to right + self.camera_config = camera_config + + @property + def interp_x(self) -> interp1d: + """Linear interpolation function for x-coordinates.""" + return interp1d(self.s, self.x, kind="linear", fill_value="extrapolate") + + @property + def interp_y(self) -> interp1d: + """Linear interpolation function for x-coordinates.""" + return interp1d(self.s, self.y, kind="linear", fill_value="extrapolate") + + @property + def interp_z(self) -> interp1d: + """Linear interpolation function for x-coordinates.""" + return interp1d(self.s, self.z, kind="linear", fill_value="extrapolate") + + @property + def interp_s_from_l(self) -> interp1d: + """Linear interpolation function for s-coordinates, from left to right bank, interpolated from length.""" + return interp1d(self.l, self.s, kind="linear", fill_value="extrapolate") + + + @property + def cs_points(self) -> List[geometry.Point]: + """Cross section as list of shapely.geometry.Point.""" + return [geometry.Point(_x, _y, _z) for _x, _y, _z in zip(self.x, self.y, self.z, strict=False)] + + @property + def cs_points_yz(self) -> List[geometry.Point]: + """Cross section as list of shapely.geometry.Point.""" + return [geometry.Point(_s, _z) for _s, _z in zip(self.s, self.z, strict=False)] + + @property + def cs_linestring(self) -> geometry.LineString: + """Cross section as shapely.geometry.Linestring.""" + return geometry.LineString(self.cs_points) + + @property + def cs_linestring_yz(self) -> geometry.LineString: + """Cross section perpendicular to flow direction (YZ) as shapely.geometry.Linestring.""" + return geometry.LineString(self.cs_points_yz) + + @property + def cs_angle(self): + """Average angle orientation that cross section makes in geographical space. + + Zero means left to right direction, positive means counter-clockwise, negative means clockwise. + """ + point1_xy = self.cs_points[0].coords[0][:-1] + point2_xy = self.cs_points[-1].coords[0][:-1] + diff_xy = np.array(point2_xy) - np.array(point1_xy) + return np.arctan2(diff_xy[1], diff_xy[0]) + + def get_cs_waterlevel(self, h: float, yz=False) -> geometry.LineString: + """Retrieve LineString of water surface at cross section at a given water level. + + Parameters + ---------- + h : float + water level [m] + yz : bool, optional + If set, return water level line in y-z projection, by default False. + + Returns + ------- + geometry.LineString + horizontal line at water level (2d if yz=True, 3d if yz=False) + + """ + # get water level in camera config vertical datum + z = self.camera_config.h_to_z(h) + if yz: + return geometry.LineString(zip(self.s, [z] * len(self.s), strict=False)) + return geometry.LineString(zip(self.x, self.y, [z] * len(self.x), strict=False)) + + def get_csl_point(self, h: Optional[float] = None, s: Optional[float] = None, camera: bool = False) -> List[geometry.Point]: + """Retrieve list of points, where cross section (cs) touches the land (l). + + Multiple points may be found. + + Parameters + ---------- + h : float, optional + water level [m], if provided, s must not be provided. + s : float + coordinate of distance from left to right bank [m], if provided, h must not be provided. + camera : bool, optional + If set, return 2D projected points, by default False. + + Returns + ------- + List[shapely.geometry.Point] + List of points, where water line touches land, can be only one or two points. + + """ + if h is not None and s is not None: + raise ValueError("Only one of h or s can be provided.") + if h is None and s is None: + raise ValueError("One of h or s must be provided.") + # get water level in camera config vertical datum + if s is not None: + if s < 0 or s > self.s[-1]: + raise ValueError( + "Value of s is lower (higher) than the minimum (maximum) value found in the cross section" + ) + cross = [geometry.Point(self.interp_x(s), self.interp_y(s), self.interp_z(s))] + else: + print(h) + print(s) + z = self.camera_config.h_to_z(h) + if z > np.array(self.z).max() or z < np.array(self.z).min(): + raise ValueError( + "Value of water level is lower (higher) than the minimum (maximum) value found in the cross section" + ) + cs_waterlevel = self.get_cs_waterlevel(h, yz=True) + # get crossings and turn into list of geometry.Point + cross_yz = cs_waterlevel.intersection(self.cs_linestring_yz) + + # make cross_yz iterable + if isinstance(cross_yz, geometry.Point): + cross_yz = [cross_yz] + elif isinstance(cross_yz, geometry.MultiPoint): + cross_yz = list(cross_yz.geoms) + else: + raise ValueError( + "Cross section is not crossed by water level. Check if water level is within the cross section." + ) + # find xyz real-world coordinates and turn in to POINT Z list + cross = [geometry.Point(self.interp_x(c.xy[0]), self.interp_y(c.xy[0]), c.xy[1]) for c in cross_yz] + + if camera: + coords = [[p.x, p.y, p.z] for p in cross] + coords_proj = self.camera_config.project_points(coords, swap_y_coords=True) + cross = [geometry.Point(p[0], p[1]) for p in coords_proj] + return cross + + def get_csl_line(self, h: Optional[float] = None, s: Optional[float] = None, length=0.5, offset=0.0, camera: bool = False) -> List[geometry.LineString]: + """Retrieve waterlines over the cross section, perpendicular to the orientation of the cross section. + + Returns a 2D LineString if camera is True, 3D if False + + Parameters + ---------- + h : float + water level [m] + length : float, optional + length of the waterline [m], by default 0.5 + offset : float, optional + perpendicular offset of the waterline from the cross section [m], by default 0.0 + camera : bool, optional + If set, return 2D projected lines, by default False. + + Returns + ------- + List[shapely.geometry.LineString] + List of lines perpendicular to cross section orientation, can be only one or two lines. + + """ + csl_points = self.get_csl_point(h=h, s=s) + z = csl_points[0].z + # z = self.camera_config.h_to_z(h) + # acquire angle perpendicular to cross section + angle_perp = self.cs_angle + np.pi / 2 + # retrieve xyz coordinates of cross-section land crossings + + # move points + csl_points = [ + affinity.translate(p, xoff=np.cos(angle_perp) * offset, yoff=np.sin(angle_perp) * offset) + for p in csl_points + ] + + # make horizontally oriented lines of the required length (these are only xy at this stage) + csl_lines = [ + geometry.LineString([geometry.Point(p.x - length / 2, p.y), geometry.Point(p.x + length / 2, p.y)]) + for p in csl_points + ] + # rotate in counter-clockwise perpendicular direction to the orientation of the cross section itself + csl_lines = [ + affinity.rotate(l, angle_perp, origin=p, use_radians=True) + for l, p in zip(csl_lines, csl_points, strict=False) + ] + + if camera: + # transform to 2D projected lines, make list of lists of coordinates + coords_lines = [[[_x, _y, z] for _x, _y in l.coords] for l in csl_lines] + # project list of lists + coords_lines_proj = [self.camera_config.project_points(cl, swap_y_coords=True) for cl in coords_lines] + # turn list of lists into list of LineString + csl_lines = [geometry.LineString([geometry.Point(_x, _y) for _x, _y in l]) for l in coords_lines_proj] + else: + # add a z-coordinate and return + csl_lines = [geometry.LineString([geometry.Point(_x, _y, z) for _x, _y in l.coords]) for l in csl_lines] + return csl_lines + + def get_csl_pol( + self, + h: Optional[float] = None, + s: Optional[float] = None, + length: float = 0.5, + padding: Tuple[float, float] = (-0.5, 0.5), + offset: float = 0.0, + camera: bool = False, + ) -> List[geometry.Polygon]: + """Get horizontal polygon from cross-section land line towards water or towards land. + + Returns a 2D Polygon if camera is True, 3D if False + + Parameters + ---------- + h : float + water level [m] + length : float, optional + length of the waterline [m], by default 0.5 + padding : Tuple[float, float], optional + amount if distance [m] to extend the polygon beyond the waterline, by default (-0.5, 0.5) + offset : float, optional + perpendicular offset of the waterline from the cross section [m], by default 0.0 + camera : bool, optional + If set, return 2D projected polygons, by default False. + + Returns + ------- + List[shapely.geometry.LineString] + List of lines perpendicular to cross section orientation, can be only one or two lines. + + + """ + # retrieve water line(s) + csl = self.get_csl_line(h=h, s=s, length=length, offset=offset) + if len(padding) != 2: + raise ValueError(f"padding must contain two values (provided: {len(padding)}") + if padding[1] <= padding[0]: + raise ValueError("First value of padding must be smaller than second") + csl_pol_bounds = [ + [ + affinity.translate( + line, xoff=np.cos(self.cs_angle) * padding[0], yoff=np.sin(self.cs_angle) * padding[0] + ), + affinity.translate( + line, xoff=np.cos(self.cs_angle) * padding[1], yoff=np.sin(self.cs_angle) * padding[1] + ), + ] + for line in csl + ] + csl_pol_coords = [ + list(lines[0].coords) + list(lines[1].coords[::-1]) + [lines[0].coords[0]] for lines in csl_pol_bounds + ] + if camera: + csl_pol_coords = [ + self.camera_config.project_points(coords, swap_y_coords=True) for coords in csl_pol_coords + ] + polygons = [geometry.Polygon(coords) for coords in csl_pol_coords] + return polygons + + def get_planar_surface( + self, h: float, length: float = 0.5, offset: float = 0.0, camera: bool = False + ) -> geometry.Polygon: + """Retrieve a planar water surface for a given water level, as a geometry.Polygon. + + Returns a 2D Polygon if camera is True, 3D if False + + Parameters + ---------- + h : float + water level [m] + length : float, optional + length of the waterline [m], by default 0.5 + offset : float, optional + perpendicular offset of the waterline from the cross section [m], by default 0.0 + camera : bool, optional + If set, return 2D projected polygon, by default False. + + Returns + ------- + shapely.geometry.Polygon + rectangular horizontal polygon representing the planar water surface (2d if camera=True, + 3d if camera=False). + + + """ + wls = self.get_csl_line(h=h, offset=offset, length=length, camera=camera) + if len(wls) != 2: + raise ValueError("Amount of water line crossings must be 2 for a planar surface estimate.") + return geometry.Polygon(list(wls[0].coords) + list(wls[1].coords[::-1])) + + def get_wetted_surface_yz(self, h: float) -> geometry.Polygon: + """Retrieve a wetted surface for a given water level, as a geometry.Polygon. + + Parameters + ---------- + h : float + water level [m] + + Returns + ------- + geometry.Polygon + Wetted surface as a polygon, in Y-Z projection. + + """ + wl = self.get_cs_waterlevel(h=h, yz=True) + # create polygon by making a union + pol = list(polygonize(wl.union(self.cs_linestring_yz))) + if len(pol) == 0: + raise ValueError("Water level is not crossed by cross section and therefore undefined.") + elif len(pol) > 1: + raise ValueError("Water level is crossed by multiple polygons.") + else: + pol = pol[0] + return pol + + def get_wetted_surface(self, h: float, camera: bool = False) -> geometry.Polygon: + """Retrieve a wetted surface for a given water level, as a geometry.Polygon. + + Parameters + ---------- + h : float + water level [m] + camera : bool, optional + If set, return 2D projected polygon, by default False. + + Returns + ------- + geometry.Polygon + Wetted surface as a polygon (2d if camera=True, 3d if camera=False). + + + """ + pol = self.get_wetted_surface_yz(h=h) + coords = [[self.interp_x(p[0]), self.interp_y(p[0]), p[1]] for p in pol.exterior.coords] + if camera: + coords_proj = self.camera_config.project_points(coords, swap_y_coords=True) + return geometry.Polygon(coords_proj) + else: + return geometry.Polygon(coords) + + def plot(self, mode=MODES, h: Optional[float] = None, ax: Optional[mpl.axes.Axes] = None) -> mpl.axes.Axes: + """Plot the situation. + + Plotting can be done geographic (planar), from cross section perspective in the camera perspective or 3D. + """ + if not ax: + if mode == "3d": + f, ax = plt.subplots(1, 1, projection="3d") + else: + f, ax = plt.subplots(1, 1) + return ax + + def plot_cs(self, ax=None, camera=False, **kwargs): + """Plot the cross section.""" + if not ax: + ax = plt.axes() + if ax.name == "3d" and not camera: + # map 3d coordinates to x, y, z + x, y, z = zip(*[(c[0], c[1], c[2]) for c in self.cs_linestring.coords], strict=False) + p = ax.plot(x, y, z, **kwargs) + else: + if camera: + # extract pixel locations + pix = self.camera_config.project_points(list(map(list, self.cs_linestring.coords)), swap_y_coords=True) + # map to x and y arrays + x, y = zip(*[(c[0], c[1]) for c in pix], strict=False) + else: + x, y = zip(*[(c[0], c[1]) for c in self.cs_linestring_yz.coords], strict=False) + p = ax.plot(x, y, **kwargs) + return p + + def detect_wl(self, img: np.ndarray, step: float = 0.05): + """Attempt to detect the water line level along the cross section, using a provided pre-treated image.""" + if len(img.shape) == 3: + # flatten image first + img = img.mean(axis=2) + assert ( + img.shape[0] == self.camera_config.height + ), f"Image height {img.shape[0]} is not the same as camera_config height {self.camera_config.height}" + assert ( + img.shape[1] == self.camera_config.width + ), f"Image width {img.shape[1]} is not the same as camera_config width {self.camera_config.width}" + for _ in np.arange(self.z.min(), self.z.max(), step): + # TODO implement detection + pass + + # TODO: do an optimization + z = None + # finally, return water level: + return self.camera_config.h_to_z(z) diff --git a/pyorc/api/plot.py b/pyorc/api/plot.py index 2c44f9d..11c7b1a 100644 --- a/pyorc/api/plot.py +++ b/pyorc/api/plot.py @@ -182,11 +182,12 @@ def get_plot_method( ax.add_patch( plt.Polygon( points, - alpha=0.3, + alpha=0.15, linewidth=2.0, facecolor=LINE_COLOR, # "#00FF88", path_effects=path_effects, edgecolor="w", + zorder=1, ) ) @@ -202,6 +203,7 @@ def get_plot_method( color="w", alpha=0.5, linewidth=2.0, + zorder=1, # path_effects=path_effects ) ax.plot( @@ -211,6 +213,7 @@ def get_plot_method( linewidth=4.0, # path_effects=path_effects, alpha=0.5, + zorder=1, **kwargs_line, ) else: diff --git a/pyorc/cli/cli_elements.py b/pyorc/cli/cli_elements.py index 67d5cfb..3efd48a 100644 --- a/pyorc/cli/cli_elements.py +++ b/pyorc/cli/cli_elements.py @@ -1,38 +1,40 @@ +"""pyorc.cli Interactive plotting elements.""" + try: import cartopy.crs as ccrs import cartopy.io.img_tiles as cimgt + use_cartopy = True -except: +except Exception: use_cartopy = False import logging - import click import matplotlib.pyplot as plt import numpy as np from matplotlib import patheffects from matplotlib.backend_bases import MouseButton -from matplotlib.widgets import Button from matplotlib.patches import Polygon +from matplotlib.widgets import Button from mpl_toolkits.axes_grid1 import Divider, Size -from .. import helpers -from . import cli_utils +from pyorc import helpers +from pyorc.cli import cli_utils path_effects = [ patheffects.Stroke(linewidth=2, foreground="w"), patheffects.Normal(), ] -corner_labels = [ - "upstream-left", - "downstream-left", - "downstream-right", - "upstream-right" -] +corner_labels = ["upstream-left", "downstream-left", "downstream-right", "upstream-right"] + + class BaseSelect: + """Base interactive plot.""" + def __init__(self, img, dst=None, crs=None, buffer=0.0002, zoom_level=19, logger=logging): + """Set up base interactive plot for camera configs.""" self.logger = logger self.height, self.width = img.shape[0:2] if use_cartopy: @@ -49,12 +51,12 @@ def __init__(self, img, dst=None, crs=None, buffer=0.0002, zoom_level=19, logger ymax = np.array(dst)[:, 1].max() extent = [xmin - buffer, xmax + buffer, ymin - buffer, ymax + buffer] if self.crs is not None: - tiler = getattr(cimgt, "GoogleTiles")(style="satellite") - ax_geo = fig.add_axes([0., 0., 1, 1], projection=tiler.crs) + tiler = cimgt.GoogleTiles(style="satellite") + ax_geo = fig.add_axes([0.0, 0.0, 1, 1], projection=tiler.crs) ax_geo.set_extent(extent, crs=ccrs.PlateCarree()) ax_geo.add_image(tiler, zoom_level, zorder=1) else: - ax_geo = fig.add_axes([0., 0., 1, 1]) + ax_geo = fig.add_axes([0.0, 0.0, 1, 1]) ax_geo.set_aspect("equal") plt.tick_params(left=False, right=False, labelleft=False, labelbottom=False, bottom=False) ax_geo.set_visible(False) @@ -66,13 +68,7 @@ def __init__(self, img, dst=None, crs=None, buffer=0.0002, zoom_level=19, logger # fig, ax = plt.subplots() ax.imshow(img) ax.set_title("Left: add point, right: remove point, close: store in .src") - kwargs = dict( - color="w", - markeredgecolor="k", - markersize=10, - zorder=3, - label="Control points" - ) + kwargs = dict(color="w", markeredgecolor="k", markersize=10, zorder=3, label="Control points") kwargs_text = dict( xytext=(6, 6), textcoords="offset points", @@ -87,16 +83,9 @@ def __init__(self, img, dst=None, crs=None, buffer=0.0002, zoom_level=19, logger kwargs["transform"] = ccrs.PlateCarree() transform = ccrs.PlateCarree()._as_mpl_transform(ax_geo) kwargs_text["xycoords"] = transform - self.p_geo = ax_geo.plot( - *list(zip(*dst))[0:2], "o", - **kwargs - ) + self.p_geo = ax_geo.plot(*list(zip(*dst, strict=False))[0:2], "o", **kwargs) for n, _pt in enumerate(dst): - pt = ax_geo.annotate( - n + 1, - xy = _pt[0:2], - **kwargs_text - ) + _ = ax_geo.annotate(n + 1, xy=_pt[0:2], **kwargs_text) self.fig = fig self.ax_geo = ax_geo self.ax = ax # add axes @@ -113,6 +102,7 @@ def __init__(self, img, dst=None, crs=None, buffer=0.0002, zoom_level=19, logger self.dst = dst def add_buttons(self): + """Add interactive buttons for switching axes.""" h = [Size.Fixed(0.5), Size.Fixed(0.6)] v = [Size.Fixed(0.95), Size.Fixed(0.3)] v2 = [Size.Fixed(1.45), Size.Fixed(0.3)] @@ -122,52 +112,50 @@ def add_buttons(self): divider2 = Divider(self.fig, (0, 0, 1, 1), h, v2, aspect=False) divider1 = Divider(self.fig, (0, 0, 1, 1), h, v3, aspect=False) if self.ax_geo is not None: - self.ax_button1 = self.fig.add_axes( - divider1.get_position(), - axes_locator=divider1.new_locator(nx=1, ny=1) - ) - self.button1 = Button(self.ax_button1, 'Camera') + self.ax_button1 = self.fig.add_axes(divider1.get_position(), axes_locator=divider1.new_locator(nx=1, ny=1)) + self.button1 = Button(self.ax_button1, "Camera") self.button1.on_clicked(self.switch_to_ax) - self.ax_button2 = self.fig.add_axes( - divider2.get_position(), - axes_locator=divider2.new_locator(nx=1, ny=1) - ) - self.button2 = Button(self.ax_button2, 'Map') + self.ax_button2 = self.fig.add_axes(divider2.get_position(), axes_locator=divider2.new_locator(nx=1, ny=1)) + self.button2 = Button(self.ax_button2, "Map") self.button2.on_clicked(self.switch_to_ax_geo) - self.ax_button3 = self.fig.add_axes( - divider3.get_position(), - axes_locator=divider3.new_locator(nx=1, ny=1) - ) - self.button3 = Button(self.ax_button3, 'Done') + self.ax_button3 = self.fig.add_axes(divider3.get_position(), axes_locator=divider3.new_locator(nx=1, ny=1)) + self.button3 = Button(self.ax_button3, "Done") self.button3.on_clicked(self.close_window) self.button3.set_active(False) self.button3.label.set_color("gray") def close_window(self, event): + """Define close event of entire window.""" # close window plt.close(self.fig) # exectute close event self.on_close(event) def on_close(self, event): + """Define close event for axes.""" self.ax.figure.canvas.mpl_disconnect(self.press_event) self.ax.figure.canvas.mpl_disconnect(self.release_event) self.ax.figure.canvas.mpl_disconnect(self.close_event) # check if the amount of src points and dst points is equal. If not return error check_length = len(self.src) == self.required_clicks - if not(check_length): - raise click.UsageError(f"Aborting because you have not supplied all {self.required_clicks} ground control points. Only {len(self.src)} points supplied") + if not (check_length): + raise click.UsageError( + f"Aborting because you have not supplied all {self.required_clicks} ground control points. " + f"Only {len(self.src)} points supplied" + ) def on_press(self, event): + """Detect press during click event.""" self.press = True self.move = False def on_move(self, event): - # needed to detect movement and decide if a click if not a drag + """Detect movement during event.""" if self.press: self.move = True def on_click(self, event): + """Define general click event.""" # only if ax is visible and click was within the window if self.ax.get_visible() and event.inaxes == self.ax: if event.button is MouseButton.RIGHT: @@ -186,10 +174,11 @@ def on_click(self, event): self.ax.figure.canvas.draw() def on_left_click(self, event): + """Define left-click event.""" if event.xdata is not None: self.logger.debug(f"Storing coordinate x: {event.xdata} y: {event.ydata} to src") self.src.append([int(np.round(event.xdata)), int(np.round(event.ydata))]) - self.p.set_data(*list(zip(*self.src))) + self.p.set_data(*list(zip(*self.src, strict=False))) pt = self.ax.annotate( len(self.src), xytext=(6, 6), @@ -204,12 +193,14 @@ def on_left_click(self, event): self.pts_t.append(pt) def on_release(self, event): + """Define release click event.""" if self.press and not self.move: self.on_click(event) self.press = False self.move = False def on_right_click(self, event): + """Define right-click event.""" # remove the last added point if len(self.pts_t) > 0: self.logger.debug("Removing last generated point") @@ -218,11 +209,12 @@ def on_right_click(self, event): if len(self.src) > 0: del self.src[-1] if len(self.src) > 0: - self.p.set_data(*list(zip(*self.src))) + self.p.set_data(*list(zip(*self.src, strict=False))) else: self.p.set_data([], []) def switch_to_ax(self, event): + """Switch to camera axes.""" self.fig.canvas.manager.toolbar.home() plt.sca(self.ax) self.ax.set_visible(True) @@ -232,6 +224,7 @@ def switch_to_ax(self, event): # Define function for switching to ax2 def switch_to_ax_geo(self, event): + """Switch to geographic axes.""" self.fig.canvas.manager.toolbar.home() plt.sca(self.ax_geo) self.ax.set_visible(False) @@ -241,11 +234,10 @@ def switch_to_ax_geo(self, event): class AoiSelect(BaseSelect): - """ - Selector tool to provide source GCP coordinates to pyOpenRiverCam - """ + """Selector tool to provide source GCP coordinates to pyOpenRiverCam.""" def __init__(self, img, src, dst, camera_config, logger=logging): + """Set up interactive plot for defining region of interest (AOI).""" if hasattr(camera_config, "crs"): crs = camera_config.crs else: @@ -253,14 +245,8 @@ def __init__(self, img, src, dst, camera_config, logger=logging): super(AoiSelect, self).__init__(img, dst, crs=crs, logger=logger) # make empty plot self.camera_config = camera_config - self.p_gcps, = self.ax.plot( - *list(zip(*src)), - "o", - color="w", - markeredgecolor="k", - markersize=10, - zorder=3, - label="GCPs" + (self.p_gcps,) = self.ax.plot( + *list(zip(*src, strict=False)), "o", color="w", markeredgecolor="k", markersize=10, zorder=3, label="GCPs" ) self.pts_t_gcps = [ self.ax.annotate( @@ -273,11 +259,12 @@ def __init__(self, img, src, dst, camera_config, logger=logging): patheffects.Stroke(linewidth=3, foreground="w"), patheffects.Normal(), ], - ) for n, xy in enumerate(src) + ) + for n, xy in enumerate(src) ] # self.pts_t.append(pt) - self.p, = self.ax.plot([], [], "o", markersize=10, color="c", markeredgecolor="w", zorder=3) + (self.p,) = self.ax.plot([], [], "o", markersize=10, color="c", markeredgecolor="w", zorder=3) kwargs = dict( markersize=10, color="c", @@ -286,15 +273,13 @@ def __init__(self, img, src, dst, camera_config, logger=logging): ) if hasattr(self.camera_config, "crs") and use_cartopy: kwargs["transform"] = ccrs.PlateCarree() - self.p_geo, = self.ax_geo.plot( - [], [], "o", - **kwargs - ) + (self.p_geo,) = self.ax_geo.plot([], [], "o", **kwargs) # plot an empty polygon pol = Polygon(np.zeros((0, 2)), edgecolor="w", alpha=0.5, linewidth=2) if hasattr(self.camera_config, "crs") and use_cartopy: - pol_geo = Polygon(np.zeros((0, 2)), edgecolor="w", alpha=0.5, linewidth=2, transform=ccrs.PlateCarree(), - zorder=3) + pol_geo = Polygon( + np.zeros((0, 2)), edgecolor="w", alpha=0.5, linewidth=2, transform=ccrs.PlateCarree(), zorder=3 + ) else: pol_geo = Polygon(np.zeros((0, 2)), edgecolor="w", alpha=0.5, linewidth=2, zorder=3) self.p_bbox_geo = self.ax_geo.add_patch(pol_geo) @@ -308,7 +293,7 @@ def __init__(self, img, src, dst, camera_config, logger=logging): "downstream:\nUpstream-left\nDownstream-left\nDownstream-right\nUpstream-right", size=12, va="top", - path_effects=path_effects + path_effects=path_effects, ) # # TODO: if no crs provided, then provide a normal axes with equal lengths on x and y axis @@ -318,10 +303,11 @@ def __init__(self, img, src, dst, camera_config, logger=logging): plt.show(block=True) def on_left_click(self, event): + """Define left-click event.""" if event.xdata is not None: self.logger.debug(f"Storing coordinate x: {event.xdata} y: {event.ydata} to src") self.src.append([int(np.round(event.xdata)), int(np.round(event.ydata))]) - self.p.set_data(*list(zip(*self.src))) + self.p.set_data(*list(zip(*self.src, strict=False))) pt = self.ax.annotate( corner_labels[len(self.src) - 1], xytext=(6, 6), @@ -338,22 +324,35 @@ def on_left_click(self, event): if len(self.src) == self.required_clicks: try: self.camera_config.set_bbox_from_corners(self.src) - bbox_cam = list(zip(*self.camera_config.get_bbox(camera=True, expand_exterior=True, within_image=True).exterior.xy)) - bbox_geo = list(zip(*self.camera_config.get_bbox(expand_exterior=False, within_image=True).exterior.xy)) - if hasattr(self.camera_config, "crs") and use_cartopy: - bbox_geo = helpers.xyz_transform( - bbox_geo, - crs_from=self.camera_config.crs, - crs_to=4326 + bbox_cam = list( + zip( + *self.camera_config.get_bbox( + camera=True, expand_exterior=True, within_image=True + ).exterior.xy, + strict=False, + ) + ) + bbox_geo = list( + zip( + *self.camera_config.get_bbox(expand_exterior=False, within_image=True).exterior.xy, + strict=False, ) + ) + if hasattr(self.camera_config, "crs") and use_cartopy: + bbox_geo = helpers.xyz_transform(bbox_geo, crs_from=self.camera_config.crs, crs_to=4326) self.p_bbox.set_xy(bbox_cam) self.p_bbox_geo.set_xy(bbox_geo) self.ax.figure.canvas.draw() - except: - self.title.set_text("Could not resolve bounding box with the set coordinates.\nThe coordinates are likely measured with too low accuracy.\nMeasure with cm accuracy.") + except Exception: + self.title.set_text( + "Could not resolve bounding box with the set coordinates.\nThe coordinates are likely " + "measured with too low accuracy.\nMeasure with cm accuracy." + ) + def on_click(self, event): + """Define general click event.""" super(AoiSelect, self).on_click(event) - if not(len(self.src) == self.required_clicks): + if not (len(self.src) == self.required_clicks): # remove plot if present self.p_bbox.set_xy(np.zeros((0, 2))) self.p_bbox_geo.set_xy(np.zeros((0, 2))) @@ -361,36 +360,22 @@ def on_click(self, event): class GcpSelect(BaseSelect): - """ - Selector tool to provide source GCP coordinates to pyOpenRiverCam - """ + """Selector tool to provide source GCP coordinates to pyOpenRiverCam.""" def __init__(self, img, dst, crs=None, lens_position=None, logger=logging): + """Set up interactive GCP selection plot.""" super(GcpSelect, self).__init__(img, dst, crs=crs, logger=logger) # make empty plot - self.p, = self.ax.plot([], [], "o", color="w", markeredgecolor="k", markersize=10, zorder=3, label="Clicked control points") - kwargs = dict( - color="c", - markeredgecolor="w", - zorder=4, - markersize=10, - label="Selected control points" + (self.p,) = self.ax.plot( + [], [], "o", color="w", markeredgecolor="k", markersize=10, zorder=3, label="Clicked control points" ) + kwargs = dict(color="c", markeredgecolor="w", zorder=4, markersize=10, label="Selected control points") if crs is not None and use_cartopy: kwargs["transform"] = ccrs.PlateCarree() - self.p_geo_selected, = self.ax_geo.plot( - [], [], "o", - **kwargs - ) + (self.p_geo_selected,) = self.ax_geo.plot([], [], "o", **kwargs) if len(dst) >= 4: # plot an empty set of crosses for the fitted gcp row columns after optimization of perspective - self.p_fit, = self.ax.plot( - [], [], "+", - markersize=10, - color="r", - zorder=4, - label="Fitted control_points" - ) + (self.p_fit,) = self.ax.plot([], [], "+", markersize=10, color="r", zorder=4, label="Fitted control_points") else: self.p_fit = None @@ -401,7 +386,7 @@ def __init__(self, img, dst, crs=None, lens_position=None, logger=logging): yloc, "Select location of control points in the right order (check locations on map view)", size=12, - path_effects=path_effects + path_effects=path_effects, ) self.ax_geo.legend() self.ax.legend() @@ -416,60 +401,60 @@ def __init__(self, img, dst, crs=None, lens_position=None, logger=logging): self.dist_coeffs = None def on_left_click(self, event): + """Define left-click event.""" super(GcpSelect, self).on_left_click(event) # figure out if the fitted control points must be computed and plotted if self.p_fit is not None: if len(self.src) == self.required_clicks: self.title.set_text("Fitting pose and camera parameters...") self.ax.figure.canvas.draw() - src_fit, dst_fit, camera_matrix, dist_coeffs, err = cli_utils.get_gcps_optimized_fit( - self.src, - self.dst_crs, - self.height, - self.width, - c=2., - lens_position=self.lens_position + src_fit, dst_fit, camera_matrix, dist_coeffs, rvec, tvec, err = cli_utils.get_gcps_optimized_fit( + self.src, self.dst_crs, self.height, self.width, c=2.0, lens_position=self.lens_position ) - self.p_fit.set_data(*list(zip(*src_fit))) + self.p_fit.set_data(*list(zip(*src_fit, strict=False))) self.camera_matrix = camera_matrix self.dist_coeffs = dist_coeffs - new_text = 'Pose and camera parameters fitted (see "+"), average x, y distance error: {:1.3f} m.'.format(err) + new_text = ( + 'Pose and camera parameters fitted (see "+"), average x, y distance error: {:1.3f} m.'.format(err) + ) if err > 0.1: - new_text += '\nWarning: error is larger than 0.1 meter. Are you sure that the coordinates are measured accurately?' + new_text += ( + "\nWarning: error is larger than 0.1 meter. Are you sure that the coordinates are " + "measured accurately?" + ) self.title.set_text(new_text) self.ax.figure.canvas.draw() else: self.p_fit.set_data([], []) def on_right_click(self, event): + """Define right-click event.""" super(GcpSelect, self).on_right_click(event) if self.p_fit is not None: if len(self.src) < self.required_clicks: self.p_fit.set_data([], []) def on_click(self, event): + """Define general click event.""" super(GcpSelect, self).on_click(event) # update selected dst points - dst_sel = self.dst[:len(self.src)] + dst_sel = self.dst[: len(self.src)] if len(dst_sel) > 0: - self.p_geo_selected.set_data(*list(zip(*dst_sel))[0:2]) + self.p_geo_selected.set_data(*list(zip(*dst_sel, strict=False))[0:2]) else: self.p_geo_selected.set_data([], []) class StabilizeSelect(BaseSelect): + """Stabilization plot.""" + def __init__(self, img, logger=logging): + """Set up stabilization interactive plot.""" super(StabilizeSelect, self).__init__(img, logger=logger) # make empty plot pol = Polygon(np.zeros((0, 2)), edgecolor="w", alpha=0.5, linewidth=2) self.p = self.ax.add_patch(pol) - kwargs = dict( - color="c", - markeredgecolor="w", - zorder=4, - markersize=10, - label="Selected control points" - ) + # kwargs = dict(color="c", markeredgecolor="w", zorder=4, markersize=10, label="Selected control points") xloc = self.ax.get_xlim()[0] + 50 yloc = self.ax.get_ylim()[-1] + 50 self.title = self.ax.text( @@ -478,14 +463,14 @@ def __init__(self, img, logger=logging): "Select a polygon of at least 4 points that encompasses at least the water surface. Areas outside will be " "treated as stable areas for stabilization.", size=12, - path_effects=path_effects + path_effects=path_effects, ) # self.ax.legend() # add dst coords in the intended CRS self.required_clicks = 4 # minimum 4 points needed for a satisfactory ROI - def on_click(self, event): + """Define general click event.""" # only if ax is visible and click was within the window if self.ax.get_visible() and event.inaxes == self.ax: if event.button is MouseButton.RIGHT: @@ -502,8 +487,8 @@ def on_click(self, event): self.ax.figure.canvas.draw() - def on_right_click(self, event): + """Define right-click event.""" if len(self.pts_t) > 0: self.pts_t[-1].remove() del self.pts_t[-1] @@ -516,6 +501,7 @@ def on_right_click(self, event): self.ax.figure.canvas.draw() def on_left_click(self, event): + """Define left-click event.""" if event.xdata is not None: self.logger.debug(f"Storing coordinate x: {event.xdata} y: {event.ydata} to src") self.src.append([int(np.round(event.xdata)), int(np.round(event.ydata))]) @@ -536,6 +522,7 @@ def on_left_click(self, event): self.ax.figure.canvas.draw() def on_close(self, event): + """Define on-close event handler.""" # overrule the amount of required clicks self.required_clicks = len(self.src) super(StabilizeSelect, self).on_close(event) diff --git a/pyorc/cli/cli_utils.py b/pyorc/cli/cli_utils.py index be113bc..56fd368 100644 --- a/pyorc/cli/cli_utils.py +++ b/pyorc/cli/cli_utils.py @@ -1,31 +1,35 @@ -import click -import cv2 -import geopandas as gpd +"""pyorc.cli utilities and interactive views.""" + import hashlib import json import logging +import os + +import click +import cv2 +import geopandas as gpd import matplotlib.pyplot as plt import numpy as np -import os import yaml from shapely.geometry import Point import pyorc.api -from .. import Video, CameraConfig, cv, load_camera_config, helpers -from .cli_elements import GcpSelect, AoiSelect, StabilizeSelect +from pyorc import CameraConfig, Video, cv, helpers, load_camera_config +from pyorc.cli.cli_elements import AoiSelect, GcpSelect, StabilizeSelect def get_corners_interactive( - fn, - gcps, - crs=None, - crs_gcps=None, - frame_sample=0., - camera_matrix=None, - dist_coeffs=None, - rotation=None, - logger=logging, + fn, + gcps, + crs=None, + crs_gcps=None, + frame_sample=0.0, + camera_matrix=None, + dist_coeffs=None, + rotation=None, + logger=logging, ): + """Select AOI corners interactively using first selected video frame.""" vid = Video(fn, start_frame=frame_sample, end_frame=frame_sample + 1, rotation=rotation) # get first frame frame = vid.get_frame(0, method="rgb") @@ -53,15 +57,9 @@ def get_corners_interactive( def get_gcps_interactive( - fn, - dst, - crs=None, - crs_gcps=None, - frame_sample=0, - lens_position=None, - rotation=None, - logger=logging + fn, dst, crs=None, crs_gcps=None, frame_sample=0, lens_position=None, rotation=None, logger=logging ): + """Select GCP points in interactive display using first selected video frame.""" vid = Video(fn, start_frame=frame_sample, end_frame=frame_sample + 1, rotation=rotation) # get first frame frame = vid.get_frame(0, method="rgb") @@ -72,12 +70,8 @@ def get_gcps_interactive( return selector.src, selector.camera_matrix, selector.dist_coeffs -def get_stabilize_pol( - fn, - frame_sample=0, - rotation=None, - logger=logging -): +def get_stabilize_pol(fn, frame_sample=0, rotation=None, logger=logging): + """Select stabilization region in interactive display on first selected frame of provided video file.""" vid = Video(fn, start_frame=frame_sample, end_frame=frame_sample + 1, rotation=rotation) frame = vid.get_frame(0, method="rgb") selector = StabilizeSelect(frame, logger=logger) @@ -86,6 +80,7 @@ def get_stabilize_pol( def get_file_hash(fn): + """Turn file content into a SHA-256 hash for cross-comparison.""" hash256 = hashlib.sha256() with open(fn, "rb") as f: # Read and update hash string value in blocks of 4K @@ -95,7 +90,8 @@ def get_file_hash(fn): return hash256 -def get_gcps_optimized_fit(src, dst, height, width, c=2., lens_position=None): +def get_gcps_optimized_fit(src, dst, height, width, c=2.0, lens_position=None): + """Fit intrinsic and extrinsic parameters on provided set of src and dst points.""" # optimize cam matrix and dist coeffs with provided control points if np.array(dst).shape == (4, 2): _dst = np.c_[np.array(dst), np.zeros(4)] @@ -120,11 +116,12 @@ def get_gcps_optimized_fit(src, dst, height, width, c=2., lens_position=None): src_est, jacobian = cv2.projectPoints(_dst, rvec, tvec, camera_matrix, np.array(dist_coeffs)) src_est = np.array([list(point[0]) for point in src_est]) dst_est = cv.unproject_points(_src, _dst[:, -1], rvec, tvec, camera_matrix, dist_coeffs) - dst_est = np.array(dst_est)[:, 0:len(coord_mean)] + coord_mean - return src_est, dst_est, camera_matrix, dist_coeffs, err + dst_est = np.array(dst_est)[:, 0 : len(coord_mean)] + coord_mean + return src_est, dst_est, camera_matrix, dist_coeffs, rvec, tvec, err def parse_json(ctx, param, value): + """Parse json file.""" if value is None: return None if os.path.isfile(value): @@ -141,43 +138,52 @@ def parse_json(ctx, param, value): def parse_corners(ctx, param, value): + """Parse corner [x, y] coordinates into a list of coordinates.""" if value is None: return None # check if list contains lists of 2 values corners = json.loads(value) - assert(len(corners) == 4), "--corners must contain a list of lists with exactly 4 points" + assert len(corners) == 4, "--corners must contain a list of lists with exactly 4 points" for n, val in enumerate(corners): - assert(isinstance(val, list)), f"--corners value {n} is not a list {val}" - assert(len(val) == 2), f"--corners value {n} must contain row, column coordinate but consists of {len(val)} numbers" + assert isinstance(val, list), f"--corners value {n} is not a list {val}" + assert ( + len(val) == 2 + ), f"--corners value {n} must contain row, column coordinate but consists of {len(val)} numbers" return [[int(x), int(y)] for x, y in corners] def validate_file(ctx, param, value): + """Validate existence of file.""" if value is not None: - if not(os.path.isfile(value)): + if not (os.path.isfile(value)): raise click.FileError(f"{value}") return value def validate_dir(ctx, param, value): - if not(os.path.isdir(value)): + """Validate existence of directory and create if needed.""" + if not (os.path.isdir(value)): os.makedirs(value) return value def validate_rotation(ctx, param, value): + """Validate rotation value within list of possible values.""" if value is not None: - if not(value in [90, 180, 270, None]): - raise click.UsageError(f"Rotation value must be either 90, 180 or 270") + if value not in [90, 180, 270, None]: + raise click.UsageError("Rotation value must be either 90, 180 or 270") return value def parse_camconfig(ctx, param, camconfig_file): - """ - Read and validate cam config file + """Read and validate cam config file. Parameters ---------- + ctx : click context + context + param : Any + parameter (not used) camconfig_file : str, file containing camera configuration @@ -195,11 +201,14 @@ def parse_camconfig(ctx, param, camconfig_file): def parse_recipe(ctx, param, recipe_file): - """ - Read and validate entire recipe from top to bottom, add compulsory classes where needed + """Read and validate entire recipe from top to bottom, add compulsory classes where needed. Parameters ---------- + ctx : click context + context + param : Any + parameter (not used) recipe_file : str, file containing .yml formatted recipe @@ -207,6 +216,7 @@ def parse_recipe(ctx, param, recipe_file): ------- recipe : dict, dictionary with entire recipe for running processing + """ with open(recipe_file, "r") as f: body = f.read() @@ -216,26 +226,32 @@ def parse_recipe(ctx, param, recipe_file): def parse_src(ctx, param, value): + """Parse source (column, row) coordinates.""" value = parse_json(ctx, param, value) if value is not None: # check if at least 4 of 2 - assert(len(value)>=4), "--src must contain a list of lists [column, row] with at least 4 points" + assert len(value) >= 4, "--src must contain a list of lists [column, row] with at least 4 points" for n, val in enumerate(value): - assert(isinstance(val, list)), f"--src value {n} is not a list {val}" - assert(len(val) == 2), f"--src value {n} must contain row, column coordinate but consists of {len(val)} numbers" + assert isinstance(val, list), f"--src value {n} is not a list {val}" + assert ( + len(val) == 2 + ), f"--src value {n} must contain row, column coordinate but consists of {len(val)} numbers" return value + def parse_dst(ctx, param, value): + """Parse destination (real-world) coordinates.""" value = parse_json(ctx, param, value) value = validate_dst(value) return value def parse_str_num(ctx, param, value): + """Parse strings to numbers.""" if value is not None: try: float(value) - except: + except Exception: return value if value.isnumeric(): return int(value) @@ -244,8 +260,9 @@ def parse_str_num(ctx, param, value): def read_shape(fn=None, geojson=None): + """Read shapefile.""" if fn is None and geojson is None: - raise click.UsageError(f"Either fn or geojson must be provided") + raise click.UsageError("Either fn or geojson must be provided") if geojson: if "crs" in geojson: crs = geojson["crs"]["properties"]["name"] @@ -255,18 +272,19 @@ def read_shape(fn=None, geojson=None): else: gdf = gpd.read_file(fn) # check if all geometries are points - assert(all([isinstance(geom, Point) for geom in gdf.geometry])), f'shapefile may only contain geometries of type ' \ - f'"Point"' + assert all([isinstance(geom, Point) for geom in gdf.geometry]), ( + "shapefile may only contain geometries of type " '"Point"' + ) # use the first point to check if points are 2d or 3d if gdf.geometry[0].has_z: coords = [[p.x, p.y, p.z] for p in gdf.geometry] else: coords = [[p.x, p.y] for p in gdf.geometry] - if not(hasattr(gdf, "crs")): - click.echo(f"shapefile or geojson does not contain CRS, assuming CRS is the same as camera config CRS") + if not (hasattr(gdf, "crs")): + click.echo("shapefile or geojson does not contain CRS, assuming CRS is the same as camera config CRS") crs = None elif gdf.crs is None: - click.echo(f"shapefile or geojson does not contain CRS, assuming CRS is the same as camera config CRS") + click.echo("shapefile or geojson does not contain CRS, assuming CRS is the same as camera config CRS") crs = None else: crs = gdf.crs.to_wkt() @@ -274,30 +292,40 @@ def read_shape(fn=None, geojson=None): def validate_dst(value): + """Validate destination (real-world) coordinates.""" if value is not None: if len(value) in [2, 4]: # assume [x, y] pairs are provided len_points = 2 elif len(value) < 6: - raise click.UsageError(f"--dst must contain exactly 2 or 4 with [x, y], or at least 6 with [x, y, z] points, contains {len(value)}.") + raise click.UsageError( + f"--dst must contain exactly 2 or 4 with [x, y], or at least 6 with [x, y, z] points, " + f"contains {len(value)}." + ) else: len_points = 3 for n, val in enumerate(value): - assert(isinstance(val, list)), f"--dst value {n} is not a list {val}" - assert(len(val) == len_points), f"--dst value {n} must contain 3 coordinates (x, y, z) but consists of {len(val)} numbers, value is {val}" + assert isinstance(val, list), f"--dst value {n} is not a list {val}" + assert len(val) == len_points, ( + f"--dst value {n} must contain 3 coordinates (x, y, z) " + f"but consists of {len(val)} numbers, value is {val}" + ) return value def validate_recipe(recipe): + """Validate recipe.""" valid_classes = ["video", "frames", "velocimetry", "mask", "transect", "plot"] # allowed classes required_classes = ["video", "frames", "velocimetry"] # mandatory classes (if not present, these are added) check_args = { "video": "video", "frames": "frames", # "velocimetry": "frames" - } # check if arguments to underlying methods called by recipe section are available and get valid arguments - skip_checks = ["plot"] # these sections are not checked for valid inputs - process_methods = ["write"] # methods that are specifically needed within process steps and not part of pyorc class methods + } # check if arguments to underlying methods called by recipe section are available and get valid arguments + # skip_checks = ["plot"] # these sections are not checked for valid inputs + process_methods = [ + "write" + ] # methods that are specifically needed within process steps and not part of pyorc class methods for k in recipe: # main section if k not in valid_classes: raise ValueError(f"key '{k}' is not allowed, must be one of {valid_classes}") @@ -310,21 +338,26 @@ def validate_recipe(recipe): if m not in process_methods and k in check_args: # get the subclass that is called within the section of interest cls = getattr(pyorc.api, check_args[k].capitalize()) - if (not hasattr(cls, m)): + if not hasattr(cls, m): raise ValueError(f"Class '{check_args[k].capitalize()}' does not have a method or property '{m}'") method = getattr(cls, m) # find valid args to method - if hasattr(method, "__call__"): + if callable(method): if k in check_args: # check if kwargs is contained in method if "kwargs" in method.__code__.co_varnames: valid_args = None else: - valid_args = method.__code__.co_varnames[:method.__code__.co_argcount] # get input args to method + valid_args = method.__code__.co_varnames[ + : method.__code__.co_argcount + ] # get input args to method if valid_args: for arg in recipe[k][m]: - if not arg in valid_args: - raise ValueError(f"Method '{check_args[k].capitalize()}.{m}' does not have input argument '{arg}', must be one of {valid_args}") + if arg not in valid_args: + raise ValueError( + f"Method '{check_args[k].capitalize()}.{m}' does not have input " + f"argument '{arg}', must be one of {valid_args}" + ) # add empty dicts for missing but compulsory classes for _c in required_classes: if _c not in recipe: diff --git a/pyorc/cv.py b/pyorc/cv.py index 979dd6b..a8ec806 100644 --- a/pyorc/cv.py +++ b/pyorc/cv.py @@ -1077,6 +1077,17 @@ def get_aoi(dst_corners, resolution=None): return bbox +def get_polygon_pixels(img, pol, reverse_y=True): + """Get pixel intensities within a polygon.""" + polygon_coords = list(pol.exterior.coords) + mask = np.zeros_like(img, dtype=np.uint8) + cv2.fillPoly(mask, np.array([polygon_coords], np.int32), color=255) + if reverse_y: + return np.flipud(img)[mask==255] + return img[mask==255] + + + def undistort_img(img, camera_matrix, dist_coeffs): """Correct lens distortion of image based on lens characteristics. diff --git a/pyorc/plot_helpers.py b/pyorc/plot_helpers.py new file mode 100644 index 0000000..6e30c48 --- /dev/null +++ b/pyorc/plot_helpers.py @@ -0,0 +1,36 @@ +"""pyorc plot helpers functions.""" + +import matplotlib.pyplot as plt +from mpl_toolkits.mplot3d.art3d import Poly3DCollection + + +def plot_3d_polygon(polygon, ax=None, **kwargs): + """Plot a shapely.geometry.Polygon on matplotlib 3d ax.""" + x, y, z = zip(*polygon.exterior.coords, strict=False) + verts = [list(zip(x, y, z, strict=False))] + + if ax is None: + ax = plt.axes(projection="3d") + poly = Poly3DCollection(verts, **kwargs) + # p = ax.plot_trisurf(x, y, z, **kwargs) + p = ax.add_collection3d(poly) + return p + + +def plot_polygon(polygon, ax=None, **kwargs): + """Plot a shapely.geometry.Polygon on matplotlib ax.""" + # x, y = zip(*polygon.exterior.coords) + if ax is None: + ax = plt.axes() + patch = plt.Polygon(polygon.exterior.coords, **kwargs) + p = ax.add_patch(patch) + return p + + +def plot_line(line, ax=None, **kwargs): + """Plot a shapely.geometry.LineString on matplotlib ax.""" + if ax is None: + ax = plt.axes() + x, y = zip(*line.coords, strict=False) + p = ax.plot(x, y, **kwargs) + return p diff --git a/tests/test_cross_section.py b/tests/test_cross_section.py new file mode 100644 index 0000000..6bf04fd --- /dev/null +++ b/tests/test_cross_section.py @@ -0,0 +1,400 @@ +"""Tests for water level functionalities.""" + +import geopandas as gpd +import matplotlib.pyplot as plt +import numpy as np +import pytest +from pyproj import CRS +from shapely import geometry, wkt + +from pyorc import CameraConfig, CrossSection, plot_helpers + + +@pytest.fixture() +def zs(): + return [ + 152.754, + 152.436, + 152.124, + 151.65, + 151.171, + 150.959, + 150.689, + 150.215, + 150.227, + 150.204, + 150.148, + 150.181, + 150.114, + 150.14, + 150.096, + 150.207, + 150.474, + 150.684, + 150.931, + 151.136, + 151.558, + 151.943, + 152.711, + 153.016, + ] + + +@pytest.fixture() +def xs(): + return [ + 5.913483043333334, + 5.91350165, + 5.913509225, + 5.913517873333333, + 5.913526728333333, + 5.913537678333333, + 5.913544631666667, + 5.913551016666665, + 5.91356275, + 5.913577963333334, + 5.913591855, + 5.913605991666667, + 5.91362158, + 5.91362959, + 5.913639568333333, + 5.913647405, + 5.913650936666666, + 5.91365698, + 5.913666071666667, + 5.913672016666667, + 5.913678495, + 5.91368494, + 5.913693873333334, + 5.913725518333333, + ] + + +@pytest.fixture() +def ys(): + return [ + 50.807081403333335, + 50.80708851833334, + 50.80709163333333, + 50.807093645, + 50.807096580000014, + 50.807099555, + 50.807102958333346, + 50.80710621, + 50.80710916, + 50.807112763333336, + 50.80711691833334, + 50.807121985, + 50.80712629833334, + 50.807129086666656, + 50.807132803333324, + 50.80713549666667, + 50.807136676666666, + 50.807138608333325, + 50.80714141666667, + 50.80714368666667, + 50.80714608333333, + 50.80714834333333, + 50.80715788, + 50.807162983333335, + ] + + +@pytest.fixture() +def crs(): + """Sample CRS for Netherlands.""" + return CRS.from_user_input(28992) + + +@pytest.fixture() +def gdf(xs, ys, zs): + """Sample cross section GeoDataFrame for Geul river (real data).""" + geometry = gpd.points_from_xy(xs, ys, zs) + crs = CRS.from_user_input(4326) # latlon + return gpd.GeoDataFrame({"id": np.arange(len(xs))}, geometry=geometry, crs=crs) + + +@pytest.fixture() +def xyz(gdf, crs): + """Sample cross section xyz list for Geul river (real data). Must be returned in CameraConfig set up.""" + gdf.to_crs(crs, inplace=True) + g = gdf.geometry + x, y, z = g.x, g.y, g.z + return list(map(list, zip(x, y, z, strict=False))) + + +@pytest.fixture() +def camera_config(): + camera_config = { + "height": 1080, + "width": 1920, + "crs": CRS.from_user_input(28992), + "resolution": 0.01, + "gcps": { + "src": [[158, 314], [418, 245], [655, 162], [948, 98], [1587, 321], [1465, 747]], + "dst": [ + [192102.50255553858, 313157.5882846481, 150.831], + [192101.3882378415, 313160.1101843005, 150.717], + [192099.77023223988, 313163.2868999007, 150.807], + [192096.8922817797, 313169.2557434712, 150.621], + [192105.2958125107, 313172.0257530752, 150.616], + [192110.35620407888, 313162.5371485311, 150.758], + ], + "h_ref": 92.45, + "z_0": 150.49, + }, + "window_size": 64, + "is_nadir": False, + "camera_matrix": [[1750.3084716796875, 0.0, 960.0], [0.0, 1750.3084716796875, 540.0], [0.0, 0.0, 1.0]], + "dist_coeffs": [[-0.48456448702008914], [0.44089348828121366], [0.0], [0.0], [0.0]], + "bbox": wkt.loads( + "POLYGON ((192102.55970673775 313154.1397356759, 192098.0727491934 313163.2664060433, 192108.81475944887" + " 313168.5475153654, 192113.3017169932 313159.420844998, 192102.55970673775 313154.1397356759))" + ), + } + return CameraConfig(**camera_config) + + +@pytest.fixture() +def cs(xyz, camera_config): + return CrossSection(camera_config=camera_config, cross_section=xyz) + + +def test_init_water_level(xyz, camera_config): + cs = CrossSection(camera_config=camera_config, cross_section=xyz) + assert isinstance(cs, CrossSection) + + +def test_init_water_level_from_gdf(gdf, camera_config): + cs = CrossSection(camera_config=camera_config, cross_section=gdf) + assert isinstance(cs, CrossSection) + + # get coordinates + + +def test_get_csl_point(cs): + h1 = 92.5 + h2 = 93.0 + # both should get two points back + cross1 = cs.get_csl_point(h=h1) + cross2 = cs.get_csl_point(h=h2) + # ax = plt.axes(projection="3d") + # + # for cross in cross1: + # ax.plot(*cross.coords[0], "bo", label="cross 1") + # for cross in cross2: + # ax.plot(*cross.coords[0], "ro", label="cross 2") + # cs.plot_cs(ax=ax, marker=".", color="c") + # ax.legend() + # plt.show() + + +def test_get_csl_point_camera(cs): + h1 = 92.5 + h2 = 93.0 + # both should get two points back + cross1 = cs.get_csl_point(h=h1, camera=True) + cross2 = cs.get_csl_point(h=h2, camera=True) + # ax = plt.axes() + # + # for cross in cross1: + # ax.plot(*cross.coords[0], "bo") + # for cross in cross2: + # ax.plot(*cross.coords[0], "ro") + # cs.plot_cs(ax=ax, camera=True) + # ax.axis("equal") + # ax.set_xlim([0, cs.camera_config.width]) + # ax.set_ylim([0, cs.camera_config.height]) + # plt.show() + + +def test_get_csl_point_s(cs): + s1 = 5.0 + s2 = 8.0 + # both should get two points back + cross1 = cs.get_csl_point(s=s1) + cross2 = cs.get_csl_point(s=s2) + # ax = plt.axes(projection="3d") + # + # for cross in cross1: + # ax.plot(*cross.coords[0], "bo", label="cross 1") + # for cross in cross2: + # ax.plot(*cross.coords[0], "ro", label="cross 2") + # cs.plot_cs(ax=ax, marker=".", color="c") + # ax.legend() + # plt.show() + + + +def test_get_csl_line(cs): + h1 = 92.5 + h2 = 93.0 + + cross1 = cs.get_csl_line(h=h1, offset=0.0, length=4) + cross2 = cs.get_csl_line(h=h2, offset=0.0, length=4) + assert len(cross1) == 2 + assert len(cross2) == 2 + + # ax = plt.axes(projection="3d") + # for cross in cross1: + # ax.plot(*cross.xy, cs.camera_config.h_to_z(h1), "b-o", label="cross 1") + # for cross in cross2: + # ax.plot(*cross.xy, cs.camera_config.h_to_z(h2), "r-o", label="cross 2") + # cs.plot_cs(ax=ax, marker=".", color="c") + # ax.axis("equal") + # ax.legend() + # plt.show() + +def test_get_csl_line_s(cs): + s1 = 5.0 + s2 = 8.0 + + cross1 = cs.get_csl_line(s=s1, offset=0.0, length=4) + cross2 = cs.get_csl_line(s=s2, offset=0.0, length=4) + assert len(cross1) == 1 + assert len(cross2) == 1 + + # ax = plt.axes(projection="3d") + # for cross in cross1: + # ax.plot(*cross.xy, cross.coords[0][-1], "b-o", label="cross 1") + # for cross in cross2: + # ax.plot(*cross.xy, cross.coords[0][-1], "r-o", label="cross 2") + # cs.plot_cs(ax=ax, marker=".", color="c") + # ax.axis("equal") + # ax.legend() + # plt.show() + + +def test_get_csl_camera(cs): + h1 = 92.5 + + cross1 = cs.get_csl_line(h=h1, offset=2.0, camera=True) + cross2 = cs.get_csl_line(h=h1, offset=0.0, camera=True) + assert len(cross1) == 2 + assert len(cross2) == 2 + # ax = plt.axes() + # for cross in cross1: + # ax.plot(*cross.xy, "b-o", label="cross 1") + # for cross in cross2: + # ax.plot(*cross.xy, "r-o", label="cross 2") + # cs.plot_cs(ax=ax, camera=True, marker=".", color="c") + # ax.axis("equal") + # ax.set_xlim([0, cs.camera_config.width]) + # ax.set_ylim([0, cs.camera_config.height]) + # ax.legend() + # plt.show() + + +def test_get_csl_line_above_first_bank(cs): + h = 94.9 # this level is above one of the banks and should return only one line + cross = cs.get_csl_line(h=h) + # check length + assert len(cross) == 1 + + +def test_get_csl_pol(cs): + h1 = 92.5 + pols1 = cs.get_csl_pol(h=h1, offset=0.0, padding=(-2, 0), length=4.0) + pols2 = cs.get_csl_pol(h=h1, offset=0.0, padding=(0, 2), length=4.0) + # + # ax = plt.axes(projection="3d") + # cs.plot_cs(ax=ax, marker=".", color="c", label="cross section") + # p1_1, p_1_2 = [plot_helpers.plot_3d_polygon(pol, ax=ax, alpha=0.3, label="h=92.5", color="r") for pol in pols1] + # p2_1, p_2_2 = [plot_helpers.plot_3d_polygon(pol, ax=ax, alpha=0.3, label="h=93.0", color="g") for pol in pols2] + # ax.axis("equal") + # ax.legend() + # plt.show() + +def test_get_csl_pol(cs): + s1 = 5.0 + pols1 = cs.get_csl_pol(s=s1, offset=0.0, padding=(-2, 0), length=4.0) + pols2 = cs.get_csl_pol(s=s1, offset=0.0, padding=(0, 2), length=4.0) + + # ax = plt.axes(projection="3d") + # cs.plot_cs(ax=ax, marker=".", color="c", label="cross section") + # p1 = [plot_helpers.plot_3d_polygon(pol, ax=ax, alpha=0.3, label="h=92.5", color="r") for pol in pols1] + # p2 = [plot_helpers.plot_3d_polygon(pol, ax=ax, alpha=0.3, label="h=93.0", color="g") for pol in pols2] + # ax.axis("equal") + # ax.legend() + # plt.show() + + +def test_get_csl_pol_camera(cs): + h1 = 92.5 + pols1 = cs.get_csl_pol(h=h1, offset=0.0, padding=(-2, 0), length=4.0, camera=True) + pols2 = cs.get_csl_pol(h=h1, offset=0.0, padding=(0, 2), length=4.0, camera=True) + + # ax = plt.axes() + # cs.plot_cs(ax=ax, marker=".", color="c", label="cross section", camera=True) + # p1_1, p_1_2 = [plot_helpers.plot_polygon(pol, ax=ax, alpha=0.3, label="h=92.5", color="r") for pol in pols1] + # p2_1, p_2_2 = [plot_helpers.plot_polygon(pol, ax=ax, alpha=0.3, label="h=93.0", color="g") for pol in pols2] + # ax.axis("equal") + # ax.set_xlabel("Camera pixel column [-]") + # ax.set_ylabel("Camera pixel row [-]") + # ax.set_xlim([0, cs.camera_config.width]) + # ax.set_ylim([0, cs.camera_config.height]) + # ax.legend() + # plt.show() + + +def test_get_planar_surface(cs): + h1 = 92.5 + h2 = 93.0 + h3 = 94.9 + pol1 = cs.get_planar_surface(h=h1, length=10.0) + pol2 = cs.get_planar_surface(h=h2, length=5, offset=2.5) + with pytest.raises(ValueError, match="must be 2 for"): + cs.get_planar_surface(h=h3) + # ax = plt.axes(projection="3d") + # cs.plot_cs(ax=ax, marker=".", color="c", label="cross section") + # + # _ = plot_helpers.plot_3d_polygon(pol1, ax=ax, alpha=0.3, label="h=92.5", color="r") + # _ = plot_helpers.plot_3d_polygon(pol2, ax=ax, alpha=0.3, label="h=93.0", color="g") + # ax.axis("equal") + # ax.legend() + # plt.show() + + +def test_get_planar_surface_camera(cs): + h1 = 92.5 + h2 = 93.0 + pol1 = cs.get_planar_surface(h=h1, length=10.0, camera=True) + pol2 = cs.get_planar_surface(h=h2, length=5, offset=2.5, camera=True) + # ax = plt.axes() + # _ = plot_helpers.plot_polygon(pol1, ax=ax, alpha=0.3, label="h=92.5", color="r") + # _ = plot_helpers.plot_polygon(pol2, ax=ax, alpha=0.3, label="h=93.0", color="g") + # ax.axis("equal") + # cs.plot_cs(ax=ax, camera=True, marker=".", color="c", label="cross section") + # ax.set_xlim([0, cs.camera_config.width]) + # ax.set_ylim([0, cs.camera_config.height]) + # ax.legend() + # plt.show() + + +def test_get_wetted_surface(cs): + h1 = 92.5 + h2 = 93.0 + h3 = 94.9 + pol1 = cs.get_wetted_surface(h=h1) + pol2 = cs.get_wetted_surface(h=h2) + assert isinstance(pol1, geometry.Polygon) + assert pol1.has_z + assert pol2.has_z + + with pytest.raises(ValueError, match="Water level is not crossed"): + cs.get_wetted_surface(h=h3) + # + # ax = plt.axes(projection="3d") + # cs.plot_cs(ax=ax, marker=".", color="c", label="cross section") + # _ = plot_helpers.plot_3d_polygon(pol1, ax=ax, alpha=0.3, label="h=92.5", color="r") + # _ = plot_helpers.plot_3d_polygon(pol2, ax=ax, alpha=0.3, label="h=93.0", color="g") + # ax.axis("equal") + # ax.legend() + # plt.show() + +def test_plot_cs(cs): + ax = plt.axes() + cs.plot_cs(ax=ax, camera=True, marker=".", color="c", label="cross section") + ax.set_xlim([0, cs.camera_config.width]) + ax.set_ylim([0, cs.camera_config.height]) + assert ax.has_data() \ No newline at end of file diff --git a/tests/test_plot_helpers.py b/tests/test_plot_helpers.py new file mode 100644 index 0000000..4cc8384 --- /dev/null +++ b/tests/test_plot_helpers.py @@ -0,0 +1,29 @@ +import matplotlib.pyplot as plt +import numpy as np +import pytest +from mpl_toolkits.mplot3d.art3d import Poly3DCollection +from pyorc.plot_helpers import plot_3d_polygon +from shapely.geometry import Polygon + + +@pytest.fixture +def sample_polygon(): + # Create a simple 3D polygon + return Polygon([(0, 0, 0), (1, 0, 1), (1, 1, 2), (0, 1, 0), (0, 0, 0)]) + + +def test_plot_3d_polygon_creates_polycollection(sample_polygon): + ax = plt.figure().add_subplot(projection="3d") + result = plot_3d_polygon(sample_polygon, ax=ax) + assert isinstance(result, Poly3DCollection), "Returned object is not a Poly3DCollection" + + +def test_plot_3d_polygon_no_ax(sample_polygon): + result = plot_3d_polygon(sample_polygon) + assert isinstance(result, Poly3DCollection), "Returned object is not a Poly3DCollection when ax is None" + + +def test_plot_3d_polygon_custom_kwargs(sample_polygon): + ax = plt.figure().add_subplot(projection="3d") + result = plot_3d_polygon(sample_polygon, ax=ax, alpha=0.5, edgecolor='r') + assert np.allclose(result.get_edgecolor()[0], np.array([1.0, 0.0, 0.0, 0.5])), "Edge color not set correctly" diff --git a/tests/test_transect.py b/tests/test_transect.py index 22281d0..fc109f3 100644 --- a/tests/test_transect.py +++ b/tests/test_transect.py @@ -1,6 +1,13 @@ import numpy as np import pytest +try: + import cartopy # Try to import cartopy + + CARTOPY_AVAILABLE = True +except ImportError: + CARTOPY_AVAILABLE = False + def test_get_river_flow(piv_transect): # fill method is already tested in get_q, so choose default only @@ -48,5 +55,7 @@ def test_get_wetted_perspective(piv_transect): ], ) def test_plot(piv_transect, mode, method): + if mode == "geographical" and not CARTOPY_AVAILABLE: + pytest.importorskip("cartopy", "Cartopy is required for geographical plotting") piv_transect.transect.get_q() piv_transect.isel(quantile=2).transect.plot(method=method, mode=mode, add_text=True) diff --git a/tests/test_velocimetry.py b/tests/test_velocimetry.py index 2730bd6..bc3c4a0 100644 --- a/tests/test_velocimetry.py +++ b/tests/test_velocimetry.py @@ -2,6 +2,12 @@ import numpy as np import pytest +try: + import cartopy # Try to import cartopy + + CARTOPY_AVAILABLE = True +except ImportError: + CARTOPY_AVAILABLE = False @pytest.mark.parametrize(("distance", "nr_points"), [(None, 36), (0.1, 50), (0.3, 17)]) def test_get_transect(piv, cross_section, distance, nr_points): @@ -23,13 +29,9 @@ def test_get_transect(piv, cross_section, distance, nr_points): ], ) def test_plot(piv, mode, method): + if mode == "geographical" and not CARTOPY_AVAILABLE: + pytest.importorskip("cartopy", "Cartopy is required for geographical plotting") plot = True - if mode == "geographical": - try: - pass - except ImportError: - print("Cartopy is missing, skipping cartopy dependent test") - plot = False if method == "streamplot": if mode != "local": # skipping the test, because streamplot only works local