Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhance and refactor URIs resolving logic #11

Merged
merged 20 commits into from
Feb 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ jobs:

- name: Run the Python tests
shell: bash -l {0}
run: pytest
run: pytest --capture=no

# This test requires the conda-forge::icub-models,
# robotology::ergocub-software and
Expand All @@ -53,3 +53,13 @@ jobs:
resolve-robotics-uri-py package://ergoCub/robots/ergoCubSN000/model.urdf
resolve-robotics-uri-py package://moveit_resources_panda_description/urdf/panda.urdf
! resolve-robotics-uri-py package://this/file/does/not/exist
- name: Check command line helper (Ubuntu and macOS)
if: startsWith(matrix.os, 'ubuntu') || startsWith(matrix.os, 'macos')
shell: bash -l {0}
run: |
mkdir -p /tmp/folder/ && touch "/tmp/folder/file with spaces.txt"
mkdir -p /tmp/folder/ && touch "/tmp/folder/file_without_spaces.txt"
resolve-robotics-uri-py "file:///tmp/folder/file_without_spaces.txt"
resolve-robotics-uri-py "file:///tmp/folder/file with spaces.txt"
resolve-robotics-uri-py "file:/tmp/folder/file_without_spaces.txt"
resolve-robotics-uri-py "file:/tmp/folder/file with spaces.txt"
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,4 @@ all =

[tool:pytest]
addopts = -rsxX -v --strict-markers
testpaths = tests
testpaths = test
213 changes: 165 additions & 48 deletions src/resolve_robotics_uri_py/resolve_robotics_uri_py.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,80 +2,197 @@
import os
import pathlib
import sys
from typing import List
import warnings
from typing import Iterable

# =====================
# URI resolving helpers
# =====================

# Supported URI schemes
SupportedSchemes = {"file", "package", "model"}

# Environment variables in the search path.
#
# * https://github.com/robotology/idyntree/issues/291
# * https://github.com/gazebosim/sdformat/issues/1234
#
# AMENT_PREFIX_PATH is the only "special" as we need to add
# "share" after each value, see https://github.com/stack-of-tasks/pinocchio/issues/1520
#
# This list specify the origin of each env variable:
#
# * AMENT_PREFIX_PATH: Used in ROS2
# * GAZEBO_MODEL_PATH: Used in Gazebo Classic
# * GZ_SIM_RESOURCE_PATH: Used in Gazebo Sim >= 7
# * IGN_GAZEBO_RESOURCE_PATH: Used in Ignition Gazebo <= 7
# * ROS_PACKAGE_PATH: Used in ROS1
# * SDF_PATH: Used in sdformat
#
SupportedEnvVars = {
"AMENT_PREFIX_PATH",
"GAZEBO_MODEL_PATH",
"GZ_SIM_RESOURCE_PATH",
"IGN_GAZEBO_RESOURCE_PATH",
"ROS_PACKAGE_PATH",
"SDF_PATH",
}


# Function inspired from https://github.com/ami-iit/robot-log-visualizer/pull/51
def get_search_paths_from_envs(env_list):
return [
def get_search_paths_from_envs(env_list: Iterable[str]) -> list[pathlib.Path]:
# Read the searched paths from all the environment variables
search_paths = [
pathlib.Path(f) if (env != "AMENT_PREFIX_PATH") else pathlib.Path(f) / "share"
for env in env_list if os.getenv(env) is not None
for env in env_list
if os.getenv(env) is not None
for f in os.getenv(env).split(os.pathsep)
]

def pathlist_list_to_string(path_list):
return ' '.join(str(path) for path in path_list)
# Resolve and remove duplicate paths
search_paths = list({path.resolve() for path in search_paths})

# Keep only existing paths
existing_search_paths = [path for path in search_paths if path.is_dir()]

# Notify the user of non-existing paths
if len(set(search_paths) - set(existing_search_paths)) > 0:
msg = "resolve-robotics-uri-py: Ignoring non-existing paths from env vars: {}."
warnings.warn(
msg.format(
pathlist_list_to_string(set(search_paths) - set(existing_search_paths))
)
)

return existing_search_paths


def pathlist_list_to_string(path_list: Iterable[str | pathlib.Path]) -> str:
return " ".join(str(path) for path in path_list)


# ===================
# URI resolving logic
# ===================


def resolve_robotics_uri(uri: str) -> pathlib.Path:
# List of environment variables to consider, see:
# * https://github.com/robotology/idyntree/issues/291
# * https://github.com/gazebosim/sdformat/issues/1234
# AMENT_PREFIX_PATH is the only "special" as we need to add
# "share" after each value, see https://github.com/stack-of-tasks/pinocchio/issues/1520
# This list specify the origin of each env variable:
# * GAZEBO_MODEL_PATH: Used in Gazebo Classic
# * ROS_PACKAGE_PATH: Used in ROS1
# * AMENT_PREFIX_PATH: Used in ROS2
# * SDF_PATH: Used in sdformat
# * IGN_GAZEBO_RESOURCE_PATH: Used in Ignition Gazebo <= 7
# * GZ_SIM_RESOURCE_PATH: Used in Gazebo Sim >= 7
env_list = ["GAZEBO_MODEL_PATH", "ROS_PACKAGE_PATH", "AMENT_PREFIX_PATH", "SDF_PATH", "IGN_GAZEBO_RESOURCE_PATH", "GZ_SIM_RESOURCE_PATH"]

# Preliminary step: if there is no scheme, we just consider this a path and we return it as it is
if "://" not in uri:
return pathlib.Path(uri)
"""
Resolve a robotics URI to an absolute filename.

Args:
uri: The URI to resolve.

Returns:
The absolute filename corresponding to the URI.

Raises:
FileNotFoundError: If no file corresponding to the URI is found.
"""

# If the URI has no scheme, use by default file:// which maps the resolved input
# path to a URI with empty authority
if not any(uri.startswith(scheme) for scheme in SupportedSchemes):
uri = f"file://{pathlib.Path(uri).resolve()}"

# ================================================
# Process file:/ separately from the other schemes
# ================================================

# This is the file URI scheme as per RFC8089:
# https://datatracker.ietf.org/doc/html/rfc8089

if uri.startswith("file:"):
# Strip the scheme from the URI
uri = uri.replace(f"file://", "")
uri = uri.replace(f"file:", "")

# Create the file path, resolving symlinks and '..'
uri_file_path = pathlib.Path(uri).resolve()

# Check that the file exists
if not uri_file_path.is_file():
msg = "resolve-robotics-uri-py: No file corresponding to URI '{}' found"
raise FileNotFoundError(msg.format(uri))

return uri_file_path.resolve()

# =========================
# Process the other schemes
# =========================

# Get scheme from URI
from urllib.parse import urlparse

# Parse the URI
parsed_uri = urlparse(uri)

# We only support at the moment:
# file:// scheme: to pass a file path directly
# package:// : ROS-style package URI
# model:// : SDF-style model URI
if parsed_uri.scheme not in ["file", "package", "model"]:
raise FileNotFoundError(f"Passed URI \"{uri}\" use non-supported scheme {parsed_uri.scheme}")
# We only support the following URI schemes at the moment:
#
# * file:/ to pass an absolute file path directly
# * model:// SDF-style model URI
# * package:// ROS-style package URI
#
if parsed_uri.scheme not in SupportedSchemes:
msg = "resolve-robotics-uri-py: Passed URI '{}' use non-supported scheme '{}'"
raise FileNotFoundError(msg.format(uri, parsed_uri.scheme))

# Strip the scheme from the URI
uri_path = uri
uri_path = uri_path.replace(f"{parsed_uri.scheme}://", "")

# List of matching resources found
model_filenames = []

if parsed_uri.scheme == "file":
model_filenames.append(uri.replace("file:/", ""))
# Search the resource in the path from the env variables
for folder in set(get_search_paths_from_envs(SupportedEnvVars)):

# Join the folder from environment variable and the URI path
candidate_file_name = folder / uri_path

# Expand or resolve the file path (symlinks and ..)
candidate_file_name = candidate_file_name.resolve()

if not candidate_file_name.is_file():
continue

if parsed_uri.scheme == "package" or parsed_uri.scheme == "model":
uri_path = uri.replace(f"{parsed_uri.scheme}://","")
for folder in get_search_paths_from_envs(env_list):
candidate_file_name = folder / pathlib.Path(uri_path)
if (candidate_file_name.is_file()):
if candidate_file_name not in model_filenames:
model_filenames.append(candidate_file_name)
# Skip if the file is already in the list
if candidate_file_name not in model_filenames:
model_filenames.append(candidate_file_name)

if model_filenames:
if (len(model_filenames) > 1):
warnings.warn(f"resolve-robotics-uri-py: Multiple files ({pathlist_list_to_string(model_filenames)}) found for uri \"{uri}\", returning the first one.")
return pathlib.Path(model_filenames[0])
if len(model_filenames) == 0:
msg = "resolve-robotics-uri-py: No file corresponding to URI '{}' found"
raise FileNotFoundError(msg.format(uri))

if len(model_filenames) > 1:
msg = "resolve-robotics-uri-py: "
msg += "Multiple files ({}) found for URI '{}', returning the first one."
warnings.warn(msg.format(pathlist_list_to_string(model_filenames), uri))

if len(model_filenames) >= 1:
assert model_filenames[0].exists()
return pathlib.Path(model_filenames[0]).resolve()

# If no file was found raise error
raise FileNotFoundError(f"resolve-robotics-uri-py: No file corresponding to uri \"{uri}\" found")

def main():
parser = argparse.ArgumentParser(description="Utility resolve a robotics URI (file://, model://, package://) to an absolute filename.")
parser.add_argument("uri", metavar="uri", type=str, help="URI to resolve")
parser = argparse.ArgumentParser(
description="Utility resolve a robotics URI ({}) to an absolute filename.".format(
", ".join(f"{scheme}://" for scheme in SupportedSchemes)
)
)
parser.add_argument("uri", metavar="URI", type=str, help="URI to resolve")

args = parser.parse_args()
result = resolve_robotics_uri(args.uri)

print(result)
try:
result = resolve_robotics_uri(args.uri)
except FileNotFoundError as e:
print(e, file=sys.stderr)
sys.exit(1)

print(result, file=sys.stdout)
sys.exit(0)


if __name__ == "__main__":
main()
Loading
Loading