-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpcloud_dl.py
More file actions
197 lines (159 loc) · 6.53 KB
/
pcloud_dl.py
File metadata and controls
197 lines (159 loc) · 6.53 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
import argparse
import json
import os
import re
from datetime import datetime
import requests
from tqdm import tqdm
class PCloudDownloader:
    """Client for downloading files from a public pCloud share link.

    Auto-detects the EU vs. US pCloud cluster from the link hostname and
    extracts the share code from the URL's ``code=`` query parameter.
    """

    # Seconds to wait for any HTTP request before aborting.
    # FIX: the original only applied a timeout in scan_link(); the API and
    # download requests had none and could hang indefinitely.
    REQUEST_TIMEOUT = 15

    def __init__(self, url):
        """Prepare a browser-like session and parse cluster/code from *url*."""
        self.url = url
        self.session = requests.Session()
        self.session.headers.update(
            {
                "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
                "Referer": "https://e.pcloud.link/",
            }
        )
        # Auto-detect cluster: EU shares live on e.pcloud.link / eapi.pcloud.com.
        self.is_eu = "e.pcloud.link" in url or "eapi.pcloud.com" in url
        self.api_host = "eapi.pcloud.com" if self.is_eu else "api.pcloud.com"
        # Extract the share code; None if the URL has no code= parameter.
        match = re.search(r"code=([a-zA-Z0-9]+)", url)
        self.code = match.group(1) if match else None

    def scan_link(self):
        """Extract share metadata from the pCloud landing page.

        Returns:
            tuple: ``(folder_name, files)`` where *files* is a list of
            metadata dicts (a single shared file is wrapped in a one-item
            list). Returns ``(None, [])`` on any failure.
        """
        if not self.code:
            return None, []
        try:
            response = self.session.get(self.url, timeout=self.REQUEST_TIMEOUT)
            if response.status_code != 200:
                return None, []
            # The landing page embeds the share metadata as a JS object literal.
            pattern = r"var publinkData = (\{.*?\});"
            match = re.search(pattern, response.text, re.DOTALL)
            if not match:
                return None, []
            data = json.loads(match.group(1))
            meta = data.get("metadata", {})
            folder_name = meta.get("name", "pCloud_Shared")
            # Folders expose 'contents'; a single-file share is the metadata itself.
            files = meta.get("contents", [])
            if not files and not meta.get("isfolder"):
                files = [meta]
            return folder_name, files
        # Narrowed from bare Exception: network failures and malformed JSON
        # (JSONDecodeError is a ValueError subclass) are the expected errors.
        except (requests.RequestException, ValueError) as e:
            print(f"[!] Scan error: {e}")
            return None, []

    def get_download_url(self, fileid):
        """Request a temporary direct-download link for *fileid*.

        Returns:
            str | None: the download URL, or None if the API refuses,
            the request fails, or the response payload is malformed.
        """
        api_url = f"https://{self.api_host}/getpublinkdownload?code={self.code}&fileid={fileid}"
        try:
            # FIX: added timeout (original call could block forever).
            res = self.session.get(api_url, timeout=self.REQUEST_TIMEOUT).json()
            if res.get("result") == 0:
                return f"https://{res['hosts'][0]}{res['path']}"
        # Narrowed from a bare `except Exception: pass`: only the failures
        # that legitimately mean "no URL available" are swallowed —
        # network errors, bad JSON (ValueError), or a payload missing
        # 'hosts'/'path' (KeyError/IndexError).
        except (requests.RequestException, ValueError, KeyError, IndexError):
            pass
        return None

    def download_stream(self, file_info, target_path):
        """Stream one file to *target_path* with a tqdm progress bar.

        Args:
            file_info: metadata dict with at least 'fileid' and 'name'.
            target_path: filesystem path to write to.

        Returns:
            bool: True on success, False if no download URL was obtained.

        Raises:
            requests.HTTPError: for a non-2xx response on the download URL.
        """
        url = self.get_download_url(file_info["fileid"])
        if not url:
            return False
        # FIX: added timeout here too (applies to connect/read, not total time).
        with self.session.get(url, stream=True, timeout=self.REQUEST_TIMEOUT) as r:
            r.raise_for_status()
            # content-length may be absent; tqdm treats total=0 as unknown size.
            total_size = int(r.headers.get("content-length", 0))
            with (
                open(target_path, "wb") as f,
                tqdm(
                    desc=file_info["name"],
                    total=total_size,
                    unit="B",
                    unit_scale=True,
                    unit_divisor=1024,
                    leave=True,
                ) as bar,
            ):
                # 1 MiB chunks keep syscall overhead low without big buffers.
                for chunk in r.iter_content(chunk_size=1024 * 1024):
                    if chunk:
                        f.write(chunk)
                        bar.update(len(chunk))
        return True
def _print_scan(folder_name, files):
    """Print a human-readable listing of the share's folders and files."""
    files_only = [f for f in files if not f.get("isfolder", False)]
    folders_only = [f for f in files if f.get("isfolder", False)]
    print(f"\n[+] Link: {folder_name}")
    print(
        f"[+] Total Items: {len(files)} ({len(files_only)} files, {len(folders_only)} folders)"
    )
    print("-" * 50)
    # Show folders first, then files with sizes.
    if folders_only:
        print("\n📁 Folders:")
        for f in folders_only:
            print(f"  - {f['name']} (folder)")
    if files_only:
        print("\n📄 Files:")
        for f in files_only:
            size_mb = f.get("size", 0) / 1e6
            print(f"  - {f['name']} ({size_mb:.2f} MB)")
    if not files_only:
        print("\n⚠️  No downloadable files found (only folders or no content)")


def _do_download(args, downloader, folder_name, files):
    """Resolve the target directory, filter the file list, and download."""
    # Only direct files (not folders) with a known size are downloadable.
    files_only = [f for f in files if not f.get("isfolder", False) and "size" in f]
    if not files_only:
        print("[-] No downloadable files found in this link")
        return
    # Resolve target directory.
    if args.subpath:
        target_dir = os.path.join(args.output, args.subpath)
    else:
        date_prefix = datetime.now().strftime("%Y-%m-%d")
        # SECURITY FIX: folder_name comes from remote metadata; basename()
        # strips any path separators so it cannot escape the output dir.
        safe_folder = os.path.basename(folder_name) or "pCloud_Shared"
        target_dir = os.path.join(args.output, f"pcloud/shared/{date_prefix}_{safe_folder}")
    os.makedirs(target_dir, exist_ok=True)
    print(f"[*] Target Directory: {target_dir}")
    # Keyword filter, case-insensitive.
    to_download = files_only
    if args.select:
        to_download = [
            f for f in files_only if any(k.lower() in f["name"].lower() for k in args.select)
        ]
        if not to_download:
            print("[-] No files match your selection criteria")
            return
    print(f"[*] Downloading {len(to_download)} files...")
    for f in to_download:
        # SECURITY FIX: remote-supplied name could contain "../" — keep only
        # the final path component to prevent path traversal.
        path = os.path.join(target_dir, os.path.basename(f["name"]))
        try:
            # FIX: the original ignored download_stream's False return,
            # so "no download URL" failures were silent.
            if not downloader.download_stream(f, path):
                print(f"[!] Could not get download URL for {f['name']}")
        except Exception as e:
            # Top-level per-file boundary: report and continue with the rest.
            print(f"[!] Error downloading {f['name']}: {e}")


def main():
    """CLI entry point: parse arguments, scan the link, then dispatch."""
    parser = argparse.ArgumentParser(description="pCloud Public Link CLI Downloader")
    subparsers = parser.add_subparsers(dest="command", help="Commands")
    # Command: scan
    scan_p = subparsers.add_parser("scan", help="Verify link and list files")
    scan_p.add_argument("url", help="pCloud public link URL")
    # Command: download
    dl_p = subparsers.add_parser("download", help="Download files from link")
    dl_p.add_argument("url", help="pCloud public link URL")
    dl_p.add_argument("-o", "--output", default=".", help="Output directory")
    dl_p.add_argument("-s", "--subpath", help="Relative subpath (e.g. pcloud/shared/myfiles)")
    dl_p.add_argument("--select", nargs="+", help="Only download files containing these keywords")
    args = parser.parse_args()
    if not args.command:
        parser.print_help()
        return
    downloader = PCloudDownloader(args.url)
    folder_name, files = downloader.scan_link()
    if not folder_name:
        print("[-] Failed to scan link. Is it valid/public?")
        return
    if args.command == "scan":
        _print_scan(folder_name, files)
    elif args.command == "download":
        _do_download(args, downloader, folder_name, files)
# Script entry point: run the CLI only when executed directly (not on import).
if __name__ == "__main__":
    main()