#!/usr/bin/env python3
# SPDX-License-Identifier: AGPL-3.0-or-later
"""
Path Resolver Module
Handles resolution of package:// URIs and relative paths in URDF files.
Supports ROS package paths, environment-based search, and configurable
package maps.
Copyright (c) 2025 Mohamed Aboelnasr
"""
import json
import logging
import os
from pathlib import Path
from typing import Callable, Dict, List, Optional, Union
from urllib.parse import urlparse
from urllib.request import url2pathname
logger = logging.getLogger(__name__)
[docs]
class PackageResolver:
"""
Resolve package:// URIs and relative paths for URDF resources.
Supports multiple resolution strategies:
1. Explicit package map (package_name -> path)
2. ROS package paths (via rospack or ament_index)
3. Environment-based search paths
4. Base path relative resolution
Example:
>>> resolver = PackageResolver()
>>> resolver.add_package("ur_description", "/opt/ros/melodic/share/ur_description")
>>> resolved = resolver.resolve("package://ur_description/meshes/ur5/visual/base.dae")
"/opt/ros/melodic/share/ur_description/meshes/ur5/visual/base.dae"
"""
def __init__(
self,
base_path: Optional[Path] = None,
package_map: Optional[Dict[str, Union[str, Path]]] = None,
search_paths: Optional[List[Union[str, Path]]] = None,
use_ros: bool = True,
) -> None:
"""
Initialize PackageResolver.
Args:
base_path: Base directory for relative path resolution
package_map: Dictionary mapping package names to paths
search_paths: Additional directories to search for packages
use_ros: Whether to use ROS package discovery (if available)
"""
self.base_path = Path(base_path) if base_path else None
self._package_map: Dict[str, Path] = {}
self._search_paths: List[Path] = []
self._use_ros = use_ros
# Add initial package map
if package_map:
for name, path in package_map.items():
self.add_package(name, path)
# Add search paths
if search_paths:
for path in search_paths:
self.add_search_path(path)
# Add paths from environment
self._init_from_environment()
def _init_from_environment(self) -> None:
"""Initialize from environment variables."""
# ROS package paths — only when ROS discovery is enabled, otherwise
# use_ros=False would still leak host ROS packages via search paths.
if self._use_ros:
ros_package_path = os.environ.get("ROS_PACKAGE_PATH", "")
if ros_package_path:
for path_str in ros_package_path.split(os.pathsep):
path = Path(path_str)
if path.exists():
self._search_paths.append(path)
ament_prefix_path = os.environ.get("AMENT_PREFIX_PATH", "")
if ament_prefix_path:
for prefix in ament_prefix_path.split(os.pathsep):
share_path = Path(prefix) / "share"
if share_path.exists():
self._search_paths.append(share_path)
# Custom ManipulaPy package path
manipulapy_package_path = os.environ.get("MANIPULAPY_PACKAGE_PATH", "")
if manipulapy_package_path:
for path_str in manipulapy_package_path.split(os.pathsep):
path = Path(path_str)
if path.exists():
self._search_paths.append(path)
# Optional explicit package map (JSON file path or JSON string)
package_map_env = os.environ.get("MANIPULAPY_PACKAGE_MAP", "")
if package_map_env:
map_data = None
map_path = Path(package_map_env)
if map_path.exists():
try:
map_data = json.loads(map_path.read_text(encoding="utf-8"))
except json.JSONDecodeError as exc:
logger.warning(
f"MANIPULAPY_PACKAGE_MAP file is not valid JSON: {exc}"
)
else:
try:
map_data = json.loads(package_map_env)
except json.JSONDecodeError:
logger.warning(
"MANIPULAPY_PACKAGE_MAP must be a JSON file path or JSON string."
)
if isinstance(map_data, dict):
for name, path in map_data.items():
self.add_package(name, path)
elif map_data is not None:
logger.warning(
"MANIPULAPY_PACKAGE_MAP must be a JSON object mapping package name to path."
)
[docs]
def add_package(self, name: str, path: Union[str, Path]) -> None:
"""
Add a package to the resolver.
Args:
name: Package name
path: Path to package root directory
"""
path = Path(path)
if path.exists():
self._package_map[name] = path
logger.debug(f"Added package '{name}' at {path}")
else:
logger.warning(f"Package path does not exist: {path}")
[docs]
def add_search_path(self, path: Union[str, Path]) -> None:
"""
Add a search path for package discovery.
Args:
path: Directory to search for packages
"""
path = Path(path)
if path.exists() and path not in self._search_paths:
self._search_paths.append(path)
logger.debug(f"Added search path: {path}")
[docs]
def resolve(self, uri: str) -> str:
"""
Resolve a URI to an absolute file path.
Handles:
- package://package_name/path/to/file
- file:///absolute/path
- Relative paths
- Absolute paths
Args:
uri: URI or path to resolve
Returns:
Resolved absolute path (or original if unresolvable)
"""
if not uri:
return uri
# Handle package:// URIs
if uri.startswith("package://"):
return self._resolve_package_uri(uri)
# Handle file:// URIs (use url2pathname so file:///C:/... works on Windows)
if uri.startswith("file://"):
return url2pathname(urlparse(uri).path)
# Handle absolute paths
if Path(uri).is_absolute():
return uri
# Handle relative paths
return self._resolve_relative_path(uri)
def _resolve_package_uri(self, uri: str) -> Optional[str]:
"""Resolve package://pkg/path with ambiguity detection.
Strategy 1 (explicit ``add_package`` mapping) short-circuits — the
documented escape hatch must always win. Strategies 2-5 (search paths,
ROS lookup, base path, ancestor heuristic) collect candidates whose
canonical (symlink-resolved) paths are deduped before the ambiguity
check, so symlinked or case-insensitive workspaces don't trigger false
ambiguity.
"""
if not uri.startswith("package://"):
return uri
rest = uri[len("package://") :]
if not rest:
logger.warning(f"Malformed package URI {uri!r}: missing package name")
return uri
parts = rest.split("/", 1)
if len(parts) < 2 or not parts[0] or not parts[1]:
logger.warning(
f"Malformed package URI {uri!r}: expected 'package://<name>/<path>'"
)
return uri
package_name, relative_path = parts[0], parts[1]
rel_parts = Path(relative_path).parts
if ".." in rel_parts or Path(relative_path).is_absolute():
logger.warning(
f"Refusing to resolve {uri!r}: relative path contains traversal "
"or is absolute"
)
return uri
# Strategy 1: explicit package map (highest precedence — short-circuits
# ambiguity detection so add_package() remains a working escape hatch).
# If the caller pinned this package and the file is missing under the
# pinned root, do NOT fall through to other strategies: silently
# returning a different package would defeat the explicit override.
if package_name in self._package_map:
cand = Path(self._package_map[package_name]) / relative_path
if cand.exists():
return str(cand)
logger.warning(
"Explicit package mapping for %r did not contain %r under %r; "
"refusing to fall back to auto-discovery to honor the override.",
package_name,
relative_path,
str(self._package_map[package_name]),
)
return uri
candidates: List[Path] = []
# Strategy 2: search paths — try both the package-rooted and the flat
# forms (regression: prior code only tried search_path/pkg/relative).
for search_path in self._search_paths:
c1 = Path(search_path) / package_name / relative_path
if c1.exists():
candidates.append(c1)
c2 = Path(search_path) / relative_path
if c2.exists():
candidates.append(c2)
# Strategy 3: ROS package discovery (only when use_ros is enabled).
if self._use_ros:
ros_pkg_root = self._find_ros_package(package_name)
if ros_pkg_root is not None:
c = ros_pkg_root / relative_path
if c.exists():
candidates.append(c)
ros_paths = os.environ.get("ROS_PACKAGE_PATH", "").split(os.pathsep)
for ros_path in ros_paths:
if not ros_path:
continue
c = Path(ros_path) / package_name / relative_path
if c.exists():
candidates.append(c)
# Strategy 4: base path fallback
if self.base_path:
c = Path(self.base_path) / relative_path
if c.exists():
candidates.append(c)
# Strategy 5: ancestor heuristic
for ancestor in [
self.base_path,
self.base_path.parent,
self.base_path.parent.parent,
]:
cand_a = ancestor / package_name / relative_path
if cand_a.exists():
candidates.append(cand_a)
cand_b = ancestor / relative_path
if cand_b.exists():
candidates.append(cand_b)
# Dedup by canonical (symlink/case-resolved) path before ambiguity check.
seen_canonical = set()
unique_paths: List[str] = []
for cand in candidates:
try:
canonical = cand.resolve(strict=True)
except (OSError, RuntimeError):
canonical = cand.absolute()
key = str(canonical)
if key in seen_canonical:
continue
seen_canonical.add(key)
unique_paths.append(str(cand))
if not unique_paths:
logger.warning(f"Package URI {uri!r} could not be resolved (no candidates)")
return uri
if len(unique_paths) == 1:
return unique_paths[0]
logger.warning(
f"Multiple package paths matched for {uri!r}: {sorted(unique_paths)}. "
"Refusing to auto-resolve to avoid the wrong choice. Add explicit "
"package mapping with resolver.add_package(name, path)."
)
return uri
def _resolve_relative_path(self, path: str) -> str:
"""
Resolve a relative path.
Args:
path: Relative path
Returns:
Resolved absolute path
"""
# Try base path first
if self.base_path:
candidate = self.base_path / path
if candidate.exists():
return str(candidate)
# Try search paths
for search_path in self._search_paths:
candidate = search_path / path
if candidate.exists():
return str(candidate)
# Return as-is if not found
return path
def _find_ros_package(self, package_name: str) -> Optional[Path]:
"""
Find a ROS package using rospack or ament_index.
Args:
package_name: Name of the package
Returns:
Path to package or None if not found
"""
# Try ament_index_python (ROS 2)
try:
from ament_index_python.packages import get_package_share_directory
return Path(get_package_share_directory(package_name))
except (ImportError, Exception):
pass
# Try rospkg (ROS 1)
try:
import rospkg
rospack = rospkg.RosPack()
return Path(rospack.get_path(package_name))
except (ImportError, Exception):
pass
# Try catkin_find (ROS 1)
try:
import subprocess
result = subprocess.run(
["catkin_find", package_name],
capture_output=True,
text=True,
timeout=5,
)
if result.returncode == 0 and result.stdout.strip():
return Path(result.stdout.strip().split("\n")[0])
except (FileNotFoundError, subprocess.TimeoutExpired, Exception):
pass
return None
[docs]
def create_handler(self) -> Callable[[str], str]:
"""
Create a filename handler function for the parser.
Returns:
Function that resolves URIs to paths
"""
return self.resolve
[docs]
def list_packages(self) -> Dict[str, Path]:
"""
List all known packages.
Returns:
Dictionary of package names to paths
"""
return dict(self._package_map)
[docs]
def list_search_paths(self) -> List[Path]:
"""
List all search paths.
Returns:
List of search paths
"""
return list(self._search_paths)
[docs]
@classmethod
def for_urdf(cls, urdf_path: Union[str, Path]) -> "PackageResolver":
"""
Create a resolver configured for a specific URDF file.
Automatically sets base_path to the URDF's directory and
adds common relative package locations.
Args:
urdf_path: Path to the URDF file
Returns:
Configured PackageResolver
"""
urdf_path = Path(urdf_path).resolve()
base_path = urdf_path.parent
resolver = cls(base_path=base_path)
# Add parent directories as potential package roots
# Common structures:
# - package/urdf/robot.urdf -> package is 2 levels up
# - package/robots/model.urdf -> package is 2 levels up
for parent in [base_path.parent, base_path.parent.parent]:
if parent.exists():
resolver.add_search_path(parent)
# If it looks like a package directory, add it
if (parent / "package.xml").exists():
resolver.add_package(parent.name, parent)
# Scan more ancestor levels for deeply nested URDFs
for depth in range(1, 4):
if depth < len(urdf_path.parents):
parent = urdf_path.parents[depth]
if parent.exists():
resolver.add_search_path(parent)
if (parent / "package.xml").exists():
resolver.add_package(parent.name, parent)
return resolver