Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
199 changes: 199 additions & 0 deletions scripts/webdav_flow_probe.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
from __future__ import annotations

import argparse
import sys
import traceback
from collections.abc import Callable, Iterable
from dataclasses import dataclass

from portkeydrop.protocols import ConnectionInfo, Protocol, RemoteFile, WebDAVClient


@dataclass(frozen=True)
class WebDAVProbeTarget:
name: str
host: str
username: str
password: str
port: int = 443
paths: tuple[str, ...] = ()


@dataclass(frozen=True)
class ProbeStep:
label: str
ok: bool
message: str


@dataclass(frozen=True)
class ProbeResult:
target: WebDAVProbeTarget
steps: tuple[ProbeStep, ...]

@property
def ok(self) -> bool:
return all(step.ok for step in self.steps)


ClientFactory = Callable[[ConnectionInfo], WebDAVClient]


DEFAULT_TARGETS: tuple[WebDAVProbeTarget, ...] = (
WebDAVProbeTarget(
name="dlp-public",
host="https://www.dlp-test.com/webdav/",
username=r"www.dlp-test.com\WebDAV",
password="WebDAV",
),
WebDAVProbeTarget(
name="dlp-private",
host="https://www.dlp-test.com/webdav_private/",
username=r"www.dlp-test.com\WebDAV",
password="WebDAV",
),
)


def _format_listing(files: list[RemoteFile]) -> str:
if not files:
return "0 items"
dirs = sum(1 for item in files if item.is_dir)
file_count = len(files) - dirs
sample = ", ".join(item.name for item in files[:5])
return f"{len(files)} items ({dirs} dirs, {file_count} files): {sample}"


def _error_message(exc: Exception) -> str:
return f"{type(exc).__name__}: {exc}"


def run_target(
target: WebDAVProbeTarget,
*,
client_factory: ClientFactory = WebDAVClient,
) -> ProbeResult:
info = ConnectionInfo(
protocol=Protocol.WEBDAV,
host=target.host,
port=target.port,
username=target.username,
password=target.password,
timeout=30,
)
client = client_factory(info)
steps: list[ProbeStep] = []

try:
client.connect()
steps.append(ProbeStep("connect", True, "connected"))

files = client.list_dir()
steps.append(ProbeStep("refresh /", True, _format_listing(files)))

for path in _unique_paths(target.paths):
try:
client.chdir(path)
steps.append(ProbeStep(f"open {path}", True, f"cwd={client.cwd}"))
except Exception as exc:
steps.append(ProbeStep(f"open {path}", False, _error_message(exc)))
continue

try:
files = client.list_dir()
steps.append(ProbeStep(f"refresh {client.cwd}", True, _format_listing(files)))
except Exception as exc:
steps.append(ProbeStep(f"refresh {client.cwd}", False, _error_message(exc)))
except Exception as exc:
steps.append(ProbeStep("connect", False, _error_message(exc)))
steps.append(ProbeStep("traceback", False, traceback.format_exc().strip()))
finally:
try:
client.disconnect()
except Exception as exc:
steps.append(ProbeStep("disconnect", False, _error_message(exc)))

return ProbeResult(target=target, steps=tuple(steps))


def _unique_paths(paths: Iterable[str]) -> tuple[str, ...]:
seen: set[str] = set()
unique: list[str] = []
for path in paths:
normalized = path.strip()
if not normalized or normalized in seen:
continue
seen.add(normalized)
unique.append(normalized)
return tuple(unique)


def _select_targets(
names: Iterable[str], extra_paths: tuple[str, ...]
) -> tuple[WebDAVProbeTarget, ...]:
requested = set(names)
if not requested or "all" in requested:
selected = DEFAULT_TARGETS
else:
by_name = {target.name: target for target in DEFAULT_TARGETS}
unknown = requested - set(by_name)
if unknown:
raise SystemExit(f"Unknown target(s): {', '.join(sorted(unknown))}")
selected = tuple(by_name[name] for name in requested)
if not extra_paths:
return selected
return tuple(
WebDAVProbeTarget(
name=target.name,
host=target.host,
username=target.username,
password=target.password,
port=target.port,
paths=(*target.paths, *extra_paths),
)
for target in selected
)


def format_result(result: ProbeResult) -> str:
lines = [f"{result.target.name} {result.target.host}"]
for step in result.steps:
marker = "PASS" if step.ok else "FAIL"
lines.append(f" [{marker}] {step.label}: {step.message}")
return "\n".join(lines)


def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(
description=(
"Probe public WebDAV servers through PortkeyDrop's WebDAVClient navigation flow."
)
)
parser.add_argument(
"--target",
action="append",
default=[],
help="Target name to probe: dlp-public, dlp-private, or all. May be repeated.",
)
parser.add_argument(
"--path",
action="append",
default=[],
help="Extra remote directory path to open with the same chdir/list flow PKD uses.",
)
return parser


def main(argv: list[str] | None = None) -> int:
args = build_parser().parse_args(argv)
targets = _select_targets(args.target, tuple(args.path))
results = [run_target(target) for target in targets]
for index, result in enumerate(results):
if index:
print()
print(format_result(result))
return 0 if all(result.ok for result in results) else 1


if __name__ == "__main__":
raise SystemExit(main(sys.argv[1:]))
39 changes: 36 additions & 3 deletions src/portkeydrop/protocols.py
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,34 @@ def _build_hostname(self) -> str:
netloc = f"{userinfo}@{netloc}"
return urlunsplit((parts.scheme or "https", netloc, parts.path, parts.query, ""))

def _url_base_path(self) -> str:
raw_host = self._info.host.strip()
if "://" not in raw_host:
scheme = "http" if self._info.effective_port == 80 else "https"
raw_host = f"{scheme}://{raw_host}"
return urlsplit(raw_host).path.rstrip("/")

def _to_app_path(self, path: str) -> str:
if not path:
return "/"
if not path.startswith("/"):
path = f"/{path}"
base_path = self._url_base_path()
if (
base_path
and base_path != "/"
and (path == base_path or path.startswith(f"{base_path}/"))
):
path = path[len(base_path) :] or "/"
return path if path.startswith("/") else f"/{path}"

@staticmethod
def _same_collection(left: str, right: str) -> bool:
def normalize(path: str) -> str:
return path.rstrip("/") or "/"

return normalize(left) == normalize(right)

def _resolve_path(self, path: str = ".") -> str:
if not path or path == ".":
return self._cwd
Expand Down Expand Up @@ -534,17 +562,22 @@ def _remote_file_from_info(self, info: dict, fallback_path: str = "") -> RemoteF
modified = self._parse_modified(
info.get("modified") or info.get("modified_at") or info.get("lastmodified")
)
if path and not path.startswith("/"):
path = f"/{path}"
path = self._to_app_path(path)
return RemoteFile(name=name, path=path, size=size, is_dir=is_dir, modified=modified)

def list_dir(self, path: str = ".") -> list[RemoteFile]:
client = self._ensure_connected()
target = self._resolve_path(path)
items = client.list(target, get_info=True)
return [self._remote_file_from_info(item) for item in items if isinstance(item, dict)]
files = [self._remote_file_from_info(item) for item in items if isinstance(item, dict)]
return [
item for item in files if not (item.is_dir and self._same_collection(item.path, target))
]

def chdir(self, path: str) -> str:
if self._same_collection(self._resolve_path(path), "/"):
self._cwd = "/"
return self._cwd
remote = self.stat(self._resolve_path(path))
if not remote.is_dir:
raise NotADirectoryError(path)
Expand Down
46 changes: 46 additions & 0 deletions tests/test_protocols.py
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,52 @@ def test_list_dir_maps_webdav_items(self, monkeypatch):
assert files[1].size == 12
assert files[1].is_dir is False

def test_list_dir_strips_webdav_url_base_path_and_skips_current_collection(self, monkeypatch):
webdav = MagicMock()
webdav.list.return_value = [
{
"path": "/webdav/",
"name": "webdav",
"isdir": True,
},
{
"path": "/webdav/docs/",
"name": "docs",
"isdir": True,
},
{
"path": "/webdav/readme.txt",
"name": "readme.txt",
"isdir": False,
"size": "12",
},
]
self._install_fake_webdav(monkeypatch, webdav)
client = WebDAVClient(
ConnectionInfo(protocol=Protocol.WEBDAV, host="https://dav.example.com/webdav/")
)
client.connect()

files = client.list_dir("/")

assert files == [
RemoteFile(name="docs", path="/docs/", size=0, is_dir=True),
RemoteFile(name="readme.txt", path="/readme.txt", size=12, is_dir=False),
]

def test_chdir_webdav_root_does_not_require_stat(self, monkeypatch):
webdav = MagicMock()
webdav.list.return_value = []
self._install_fake_webdav(monkeypatch, webdav)
client = WebDAVClient(
ConnectionInfo(protocol=Protocol.WEBDAV, host="https://dav.example.com/webdav/")
)
client.connect()

assert client.chdir("/") == "/"
assert client.cwd == "/"
webdav.info.assert_not_called()

def test_stat_maps_webdav_info(self, monkeypatch):
webdav = MagicMock()
webdav.list.return_value = []
Expand Down
Loading
Loading