diff --git a/scripts/webdav_flow_probe.py b/scripts/webdav_flow_probe.py new file mode 100644 index 0000000..f630d27 --- /dev/null +++ b/scripts/webdav_flow_probe.py @@ -0,0 +1,199 @@ +from __future__ import annotations + +import argparse +import sys +import traceback +from collections.abc import Callable, Iterable +from dataclasses import dataclass + +from portkeydrop.protocols import ConnectionInfo, Protocol, RemoteFile, WebDAVClient + + +@dataclass(frozen=True) +class WebDAVProbeTarget: + name: str + host: str + username: str + password: str + port: int = 443 + paths: tuple[str, ...] = () + + +@dataclass(frozen=True) +class ProbeStep: + label: str + ok: bool + message: str + + +@dataclass(frozen=True) +class ProbeResult: + target: WebDAVProbeTarget + steps: tuple[ProbeStep, ...] + + @property + def ok(self) -> bool: + return all(step.ok for step in self.steps) + + +ClientFactory = Callable[[ConnectionInfo], WebDAVClient] + + +DEFAULT_TARGETS: tuple[WebDAVProbeTarget, ...] = ( + WebDAVProbeTarget( + name="dlp-public", + host="https://www.dlp-test.com/webdav/", + username=r"www.dlp-test.com\WebDAV", + password="WebDAV", + ), + WebDAVProbeTarget( + name="dlp-private", + host="https://www.dlp-test.com/webdav_private/", + username=r"www.dlp-test.com\WebDAV", + password="WebDAV", + ), +) + + +def _format_listing(files: list[RemoteFile]) -> str: + if not files: + return "0 items" + dirs = sum(1 for item in files if item.is_dir) + file_count = len(files) - dirs + sample = ", ".join(item.name for item in files[:5]) + return f"{len(files)} items ({dirs} dirs, {file_count} files): {sample}" + + +def _error_message(exc: Exception) -> str: + return f"{type(exc).__name__}: {exc}" + + +def run_target( + target: WebDAVProbeTarget, + *, + client_factory: ClientFactory = WebDAVClient, +) -> ProbeResult: + info = ConnectionInfo( + protocol=Protocol.WEBDAV, + host=target.host, + port=target.port, + username=target.username, + password=target.password, + timeout=30, + ) + client = client_factory(info) + steps: list[ProbeStep] = [] + + try: + client.connect() + steps.append(ProbeStep("connect", True, "connected")) + + files = client.list_dir() + steps.append(ProbeStep("refresh /", True, _format_listing(files))) + + for path in _unique_paths(target.paths): + try: + client.chdir(path) + steps.append(ProbeStep(f"open {path}", True, f"cwd={client.cwd}")) + except Exception as exc: + steps.append(ProbeStep(f"open {path}", False, _error_message(exc))) + continue + + try: + files = client.list_dir() + steps.append(ProbeStep(f"refresh {client.cwd}", True, _format_listing(files))) + except Exception as exc: + steps.append(ProbeStep(f"refresh {client.cwd}", False, _error_message(exc))) + except Exception as exc: + steps.append(ProbeStep("connect", False, _error_message(exc))) + steps.append(ProbeStep("traceback", False, traceback.format_exc().strip())) + finally: + try: + client.disconnect() + except Exception as exc: + steps.append(ProbeStep("disconnect", False, _error_message(exc))) + + return ProbeResult(target=target, steps=tuple(steps)) + + +def _unique_paths(paths: Iterable[str]) -> tuple[str, ...]: + seen: set[str] = set() + unique: list[str] = [] + for path in paths: + normalized = path.strip() + if not normalized or normalized in seen: + continue + seen.add(normalized) + unique.append(normalized) + return tuple(unique) + + +def _select_targets( + names: Iterable[str], extra_paths: tuple[str, ...] +) -> tuple[WebDAVProbeTarget, ...]: + requested = set(names) + if not requested or "all" in requested: + selected = DEFAULT_TARGETS + else: + by_name = {target.name: target for target in DEFAULT_TARGETS} + unknown = requested - set(by_name) + if unknown: + raise SystemExit(f"Unknown target(s): {', '.join(sorted(unknown))}") + selected = tuple(by_name[name] for name in requested) + if not extra_paths: + return selected + return tuple( + WebDAVProbeTarget( + name=target.name, + host=target.host, + username=target.username, + password=target.password, + port=target.port, + paths=(*target.paths, *extra_paths), + ) + for target in selected + ) + + +def format_result(result: ProbeResult) -> str: + lines = [f"{result.target.name} {result.target.host}"] + for step in result.steps: + marker = "PASS" if step.ok else "FAIL" + lines.append(f" [{marker}] {step.label}: {step.message}") + return "\n".join(lines) + + +def build_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser( + description=( + "Probe public WebDAV servers through PortkeyDrop's WebDAVClient navigation flow." + ) + ) + parser.add_argument( + "--target", + action="append", + default=[], + help="Target name to probe: dlp-public, dlp-private, or all. May be repeated.", + ) + parser.add_argument( + "--path", + action="append", + default=[], + help="Extra remote directory path to open with the same chdir/list flow PKD uses.", + ) + return parser + + +def main(argv: list[str] | None = None) -> int: + args = build_parser().parse_args(argv) + targets = _select_targets(args.target, tuple(args.path)) + results = [run_target(target) for target in targets] + for index, result in enumerate(results): + if index: + print() + print(format_result(result)) + return 0 if all(result.ok for result in results) else 1 + + +if __name__ == "__main__": + raise SystemExit(main(sys.argv[1:])) diff --git a/src/portkeydrop/protocols.py b/src/portkeydrop/protocols.py index 9a98a23..b042516 100644 --- a/src/portkeydrop/protocols.py +++ b/src/portkeydrop/protocols.py @@ -486,6 +486,34 @@ def _build_hostname(self) -> str: netloc = f"{userinfo}@{netloc}" return urlunsplit((parts.scheme or "https", netloc, parts.path, parts.query, "")) + def _url_base_path(self) -> str: + raw_host = self._info.host.strip() + if "://" not in raw_host: + scheme = "http" if self._info.effective_port == 80 else "https" + raw_host = f"{scheme}://{raw_host}" + return urlsplit(raw_host).path.rstrip("/") + + def _to_app_path(self, path: str) -> str: + if not path: + return "/" + if not path.startswith("/"): + path = f"/{path}" + base_path = self._url_base_path() + if ( + base_path + and base_path != "/" + and (path == base_path or path.startswith(f"{base_path}/")) + ): + path = path[len(base_path) :] or "/" + return path if path.startswith("/") else f"/{path}" + + @staticmethod + def _same_collection(left: str, right: str) -> bool: + def normalize(path: str) -> str: + return path.rstrip("/") or "/" + + return normalize(left) == normalize(right) + def _resolve_path(self, path: str = ".") -> str: if not path or path == ".": return self._cwd @@ -534,17 +562,22 @@ def _remote_file_from_info(self, info: dict, fallback_path: str = "") -> RemoteF modified = self._parse_modified( info.get("modified") or info.get("modified_at") or info.get("lastmodified") ) - if path and not path.startswith("/"): - path = f"/{path}" + path = self._to_app_path(path) return RemoteFile(name=name, path=path, size=size, is_dir=is_dir, modified=modified) def list_dir(self, path: str = ".") -> list[RemoteFile]: client = self._ensure_connected() target = self._resolve_path(path) items = client.list(target, get_info=True) - return [self._remote_file_from_info(item) for item in items if isinstance(item, dict)] + files = [self._remote_file_from_info(item) for item in items if isinstance(item, dict)] + return [ + item for item in files if not (item.is_dir and self._same_collection(item.path, target)) + ] def chdir(self, path: str) -> str: + if self._same_collection(self._resolve_path(path), "/"): + self._cwd = "/" + return self._cwd remote = self.stat(self._resolve_path(path)) if not remote.is_dir: raise NotADirectoryError(path) diff --git a/tests/test_protocols.py b/tests/test_protocols.py index 5f9e762..bf5a082 100644 --- a/tests/test_protocols.py +++ b/tests/test_protocols.py @@ -325,6 +325,52 @@ def test_list_dir_maps_webdav_items(self, monkeypatch): assert files[1].size == 12 assert files[1].is_dir is False + def test_list_dir_strips_webdav_url_base_path_and_skips_current_collection(self, monkeypatch): + webdav = MagicMock() + webdav.list.return_value = [ + { + "path": "/webdav/", + "name": "webdav", + "isdir": True, + }, + { + "path": "/webdav/docs/", + "name": "docs", + "isdir": True, + }, + { + "path": "/webdav/readme.txt", + "name": "readme.txt", + "isdir": False, + "size": "12", + }, + ] + self._install_fake_webdav(monkeypatch, webdav) + client = WebDAVClient( + ConnectionInfo(protocol=Protocol.WEBDAV, host="https://dav.example.com/webdav/") + ) + client.connect() + + files = client.list_dir("/") + + assert files == [ + RemoteFile(name="docs", path="/docs/", size=0, is_dir=True), + RemoteFile(name="readme.txt", path="/readme.txt", size=12, is_dir=False), + ] + + def test_chdir_webdav_root_does_not_require_stat(self, monkeypatch): + webdav = MagicMock() + webdav.list.return_value = [] + self._install_fake_webdav(monkeypatch, webdav) + client = WebDAVClient( + ConnectionInfo(protocol=Protocol.WEBDAV, host="https://dav.example.com/webdav/") + ) + client.connect() + + assert client.chdir("/") == "/" + assert client.cwd == "/" + webdav.info.assert_not_called() + def test_stat_maps_webdav_info(self, monkeypatch): webdav = MagicMock() webdav.list.return_value = [] diff --git a/tests/test_webdav_flow_probe.py b/tests/test_webdav_flow_probe.py new file mode 100644 index 0000000..05283d3 --- /dev/null +++ b/tests/test_webdav_flow_probe.py @@ -0,0 +1,98 @@ +from __future__ import annotations + +from scripts.webdav_flow_probe import DEFAULT_TARGETS, WebDAVProbeTarget, run_target +from portkeydrop.protocols import Protocol, RemoteFile + + +class FakeWebDAVClient: + instances: list["FakeWebDAVClient"] = [] + + def __init__(self, info) -> None: + self.info = info + self.connected = False + self.cwd = "/" + self.calls: list[tuple[str, str | None]] = [] + FakeWebDAVClient.instances.append(self) + + def connect(self) -> None: + self.calls.append(("connect", None)) + self.connected = True + + def disconnect(self) -> None: + self.calls.append(("disconnect", None)) + self.connected = False + + def list_dir(self, path: str = ".") -> list[RemoteFile]: + self.calls.append(("list_dir", path)) + if self.cwd == "/docs/": + return [RemoteFile(name="guide.txt", path="/docs/guide.txt", size=5)] + return [ + RemoteFile(name="docs", path="/docs/", is_dir=True), + RemoteFile(name="readme.txt", path="/readme.txt", size=12), + ] + + def chdir(self, path: str) -> str: + self.calls.append(("chdir", path)) + if path != "/docs/": + raise NotADirectoryError(path) + self.cwd = "/docs/" + return self.cwd + + +def test_default_targets_cover_dlp_public_and_private() -> None: + assert [target.name for target in DEFAULT_TARGETS] == ["dlp-public", "dlp-private"] + assert DEFAULT_TARGETS[0].host == "https://www.dlp-test.com/webdav/" + assert DEFAULT_TARGETS[1].host == "https://www.dlp-test.com/webdav_private/" + assert DEFAULT_TARGETS[0].username == r"www.dlp-test.com\WebDAV" + assert DEFAULT_TARGETS[0].password == "WebDAV" + + +def test_run_target_uses_portkeydrop_webdav_navigation_flow() -> None: + FakeWebDAVClient.instances.clear() + target = WebDAVProbeTarget( + name="fake", + host="https://dav.example.test/root/", + username="alice", + password="secret", + port=443, + paths=("/docs/",), + ) + + result = run_target(target, client_factory=FakeWebDAVClient) + + client = FakeWebDAVClient.instances[0] + assert client.info.protocol is Protocol.WEBDAV + assert client.info.host == "https://dav.example.test/root/" + assert client.info.username == "alice" + assert client.info.password == "secret" + assert client.calls == [ + ("connect", None), + ("list_dir", "."), + ("chdir", "/docs/"), + ("list_dir", "."), + ("disconnect", None), + ] + assert result.ok + assert [step.label for step in result.steps] == [ + "connect", + "refresh /", + "open /docs/", + "refresh /docs/", + ] + + +def test_run_target_reports_directory_navigation_failures() -> None: + FakeWebDAVClient.instances.clear() + target = WebDAVProbeTarget( + name="fake", + host="https://dav.example.test/", + username="alice", + password="secret", + paths=("/missing/",), + ) + + result = run_target(target, client_factory=FakeWebDAVClient) + + assert not result.ok + assert result.steps[-1].label == "open /missing/" + assert "NotADirectoryError" in result.steps[-1].message