Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions examples/cdp_mode/ReadMe.md
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,7 @@ sb.cdp.tile_windows(windows=None, max_columns=0)
sb.cdp.grant_permissions(permissions, origin=None)
sb.cdp.grant_all_permissions()
sb.cdp.reset_permissions()
sb.cdp.get_all_urls(absolute=True)
sb.cdp.get_all_cookies(*args, **kwargs)
sb.cdp.set_all_cookies(*args, **kwargs)
sb.cdp.save_cookies(*args, **kwargs)
Expand Down Expand Up @@ -490,6 +491,11 @@ sb.cdp.enter_mfa_code(selector, totp_key=None, timeout=None)
sb.cdp.activate_messenger()
sb.cdp.set_messenger_theme(theme="default", location="default")
sb.cdp.post_message(message, duration=None, pause=True, style="info")
sb.cdp.download_file(file_url)
sb.cdp.save_file_as(file_url, new_file_name)
sb.cdp.assert_downloaded_file(file, timeout=None)
sb.cdp.get_path_of_downloaded_file(file)
sb.cdp.set_download_path(path)
sb.cdp.set_locale(locale)
sb.cdp.set_local_storage_item(key, value)
sb.cdp.set_session_storage_item(key, value)
Expand Down Expand Up @@ -547,8 +553,8 @@ sb.cdp.assert_url_contains(substring)
sb.cdp.assert_text(text, selector="html", timeout=None)
sb.cdp.assert_exact_text(text, selector="html", timeout=None)
sb.cdp.assert_text_not_visible(text, selector="body", timeout=None)
sb.cdp.assert_true()
sb.cdp.assert_false()
sb.cdp.assert_true(expression, msg=None)
sb.cdp.assert_false(expression, msg=None)
sb.cdp.assert_equal(first, second)
sb.cdp.assert_not_equal(first, second)
sb.cdp.assert_in(first, second)
Expand Down
17 changes: 17 additions & 0 deletions examples/cdp_mode/playwright/raw_google_sync.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from playwright.sync_api import sync_playwright
from seleniumbase import sb_cdp

sb = sb_cdp.Chrome()
endpoint_url = sb.get_endpoint_url()

with sync_playwright() as p:
browser = p.chromium.connect_over_cdp(endpoint_url)
page = browser.contexts[0].pages[0]
page.goto("https://google.com/ncr")
sb.click_if_visible('button:contains("Accept all")')
page.type('[name="q"]', "SeleniumBase GitHub page")
sb.click('[value="Google Search"]')
sb.sleep(4) # The "AI Overview" sometimes loads
print(page.title())
sb.save_as_pdf("google_page.pdf", folder="./downloaded_files/")
print("PDF saved to ./downloaded_files/google_page.pdf")
83 changes: 83 additions & 0 deletions examples/cdp_mode/raw_cdp_downloads.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
import math
from seleniumbase import sb_cdp

sb = sb_cdp.Chrome()

""" Part 1: Using sb.download_file(file_url) """

sb.goto("about:blank")
words_file = "wordle_words.txt"
words_link = (
"https://seleniumbase.github.io/cdn/txt/%s" % words_file
)
sb.download_file(words_link)
sb.assert_downloaded_file(words_file)
words_path = sb.get_path_of_downloaded_file(words_file)
with open(words_path, "r") as f:
words_data = f.read()
print("%s | Download = %s bytes." % (words_file, len(words_data)))
sb.assert_true(len(words_data) > 100) # Verify file not empty
text = '"oasis","carom","cubit"'
sb.assert_in(text, words_data) # Verify file has expected data

""" Part 2: Using click-initiated downloads """

sb.goto("https://pypi.org/project/sbvirtualdisplay/#files")
sb.assert_element("span#pip-command")
sb.assert_text("Download files", "div#files h2.page-title")
sb.assert_text("Download files", "a#files-tab")
pkg_header = sb.get_text("h1.package-header__name").strip()
pkg_name = pkg_header.replace(" ", "-")
whl_file = pkg_name + "-py3-none-any.whl"
tar_gz_file = pkg_name + ".tar.gz"

# Click the links to download the files into: "./downloaded_files/"
whl_selector = 'div#files a[href$="%s"]' % whl_file
tar_selector = 'div#files a[href$="%s"]' % tar_gz_file
sb.click(whl_selector) # Download the "whl" file
sb.sleep(0.1)
sb.click(tar_selector) # Download the "tar" file

# Verify that the downloaded files appear in the [Downloads Folder]
# (This only guarantees that the exact file name is in the folder.)
# (This does not guarantee that the downloaded files are complete.)
# (Later, we'll check that the files were downloaded successfully.)
sb.assert_downloaded_file(whl_file)
sb.assert_downloaded_file(tar_gz_file)

sb.sleep(1) # Add more time to make sure downloads have completed

# Get the actual size of the downloaded files (in bytes)
whl_path = sb.get_path_of_downloaded_file(whl_file)
with open(whl_path, "rb") as f:
whl_file_bytes = len(f.read())
print("%s | Download = %s bytes." % (whl_file, whl_file_bytes))
tar_gz_path = sb.get_path_of_downloaded_file(tar_gz_file)
with open(tar_gz_path, "rb") as f:
tar_gz_file_bytes = len(f.read())
print("%s | Download = %s bytes." % (tar_gz_file, tar_gz_file_bytes))

# Check to make sure the downloaded files are not empty or too small
sb.assert_true(whl_file_bytes > 5000)
sb.assert_true(tar_gz_file_bytes > 5000)

# Get file sizes in kB to compare actual values with displayed values
whl_file_kb = whl_file_bytes / 1000.0
whl_line_fi = sb.get_text('a[href$=".whl"]').strip()
whl_line = sb.get_text('div.file:contains("%s")' % whl_line_fi)
whl_display_kb = float(whl_line.split("(")[1].split(" ")[0])
tar_gz_file_kb = tar_gz_file_bytes / 1000.0
tar_gz_line_fi = sb.get_text('a[href$=".tar.gz"]').strip()
tar_gz_line = sb.get_text('div.file:contains("%s")' % tar_gz_line_fi)
tar_gz_display_kb = float(tar_gz_line.split("(")[1].split(" ")[0])

# Verify downloaded files are the correct size (account for rounding)
sb.assert_true(
abs(math.floor(whl_file_kb) - math.floor(whl_display_kb)) < 2
)
sb.assert_true(
abs(math.floor(tar_gz_file_kb) - math.floor(tar_gz_display_kb)) < 2
)

# Finally quit the browser
sb.quit()
2 changes: 1 addition & 1 deletion examples/test_download_files.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ def test_download_chromedriver_notes(self):
notes_data = f.read()
self.assert_true(len(notes_data) > 100) # Verify file not empty
text = "Switching to nested frame fails with chrome/chromedriver 100"
self.assert_true(text in notes_data) # Verify file has expected data
self.assert_in(text, notes_data) # Verify file has expected data

def test_download_files_from_pypi(self):
self.goto("https://pypi.org/project/sbvirtualdisplay/#files")
Expand Down
10 changes: 8 additions & 2 deletions help_docs/cdp_mode_methods.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ sb.cdp.tile_windows(windows=None, max_columns=0)
sb.cdp.grant_permissions(permissions, origin=None)
sb.cdp.grant_all_permissions()
sb.cdp.reset_permissions()
sb.cdp.get_all_urls(absolute=True)
sb.cdp.get_all_cookies(*args, **kwargs)
sb.cdp.set_all_cookies(*args, **kwargs)
sb.cdp.save_cookies(*args, **kwargs)
Expand Down Expand Up @@ -127,6 +128,11 @@ sb.cdp.enter_mfa_code(selector, totp_key=None, timeout=None)
sb.cdp.activate_messenger()
sb.cdp.set_messenger_theme(theme="default", location="default")
sb.cdp.post_message(message, duration=None, pause=True, style="info")
sb.cdp.download_file(file_url)
sb.cdp.save_file_as(file_url, new_file_name)
sb.cdp.assert_downloaded_file(file, timeout=None)
sb.cdp.get_path_of_downloaded_file(file)
sb.cdp.set_download_path(path)
sb.cdp.set_locale(locale)
sb.cdp.set_local_storage_item(key, value)
sb.cdp.set_session_storage_item(key, value)
Expand Down Expand Up @@ -184,8 +190,8 @@ sb.cdp.assert_url_contains(substring)
sb.cdp.assert_text(text, selector="html", timeout=None)
sb.cdp.assert_exact_text(text, selector="html", timeout=None)
sb.cdp.assert_text_not_visible(text, selector="body", timeout=None)
sb.cdp.assert_true()
sb.cdp.assert_false()
sb.cdp.assert_true(expression, msg=None)
sb.cdp.assert_false(expression, msg=None)
sb.cdp.assert_equal(first, second)
sb.cdp.assert_not_equal(first, second)
sb.cdp.assert_in(first, second)
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ exceptiongroup>=1.3.1
websockets~=15.0.1;python_version<"3.10"
websockets>=16.0;python_version>="3.10"
filelock~=3.19.1;python_version<"3.10"
filelock>=3.29.1;python_version>="3.10"
filelock>=3.29.3;python_version>="3.10"
fasteners>=0.20
mycdp>=1.3.7
pynose>=1.5.5
Expand Down
2 changes: 1 addition & 1 deletion seleniumbase/__version__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
# seleniumbase package
__version__ = "4.49.8"
__version__ = "4.49.9"
6 changes: 6 additions & 0 deletions seleniumbase/core/browser_launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -793,6 +793,7 @@ def uc_open_with_cdp_mode(driver, url=None, **kwargs):
cdp.grant_permissions = CDPM.grant_permissions
cdp.grant_all_permissions = CDPM.grant_all_permissions
cdp.reset_permissions = CDPM.reset_permissions
cdp.get_all_urls = CDPM.get_all_urls
cdp.get_all_cookies = CDPM.get_all_cookies
cdp.set_all_cookies = CDPM.set_all_cookies
cdp.save_cookies = CDPM.save_cookies
Expand Down Expand Up @@ -831,6 +832,11 @@ def uc_open_with_cdp_mode(driver, url=None, **kwargs):
cdp.activate_messenger = CDPM.activate_messenger
cdp.set_messenger_theme = CDPM.set_messenger_theme
cdp.post_message = CDPM.post_message
cdp.download_file = CDPM.download_file
cdp.save_file_as = CDPM.save_file_as
cdp.assert_downloaded_file = CDPM.assert_downloaded_file
cdp.get_path_of_downloaded_file = CDPM.get_path_of_downloaded_file
cdp.set_download_path = CDPM.set_download_path
cdp.set_locale = CDPM.set_locale
cdp.set_local_storage_item = CDPM.set_local_storage_item
cdp.set_session_storage_item = CDPM.set_session_storage_item
Expand Down
104 changes: 100 additions & 4 deletions seleniumbase/core/sb_cdp.py
Original file line number Diff line number Diff line change
Expand Up @@ -752,6 +752,18 @@ def reset_permissions(self):
driver = driver.cdp_base
return self.loop.run_until_complete(driver.reset_permissions())

def get_all_urls(self, absolute=True):
"""
Convenience function that returns all links (a,link,img,script,meta).
:param absolute:
Try to build all the links in absolute form
instead of "as is", often relative.
:return: List of URLs.
"""
return self.loop.run_until_complete(
self.page.get_all_urls(absolute=absolute)
)

def get_all_cookies(self, *args, **kwargs):
driver = self.driver
if hasattr(driver, "cdp_base"):
Expand Down Expand Up @@ -1725,6 +1737,80 @@ def post_message(self, message, duration=None, pause=True, style="info"):
duration = float(duration) + 0.15
time.sleep(float(duration))

def download_file(self, file_url):
"""Download a file from a URL.
The default download location is: "./downloaded_files/"."""
self.loop.run_until_complete(
self.page.download_file(file_url)
)

def save_file_as(self, file_url, new_file_name):
"""Download a file from a URL and rename it.
The default download location is: "./downloaded_files/"."""
self.loop.run_until_complete(
self.page.download_file(file_url, new_file_name)
)

def assert_downloaded_file(self, file, timeout=None):
"""Asserts that the file exists in SeleniumBase's [Downloads Folder].
For browser click-initiated downloads, SeleniumBase will override
the system [Downloads Folder] to be "./downloaded_files/".
@Params
file - The filename of the downloaded file.
timeout - The time (seconds) to wait for the download to complete.
browser - If True, uses the path set by click-initiated downloads."""
downloads_folder = constants.Files.DOWNLOADS_FOLDER
abs_path = os.path.abspath(".")
downloads_path = os.path.join(abs_path, downloads_folder)
if not timeout:
timeout = settings.LARGE_TIMEOUT
start_ms = time.time() * 1000.0
stop_ms = start_ms + (timeout * 1000.0)
downloaded_file_path = os.path.join(downloads_path, file)
found = False
for x in range(int(timeout)):
shared_utils.check_if_time_limit_exceeded()
try:
self.assert_true(
os.path.exists(downloaded_file_path),
"File [%s] was not found in the downloads folder [%s]!"
% (file, downloads_path),
)
found = True
break
except Exception:
now_ms = time.time() * 1000.0
if now_ms >= stop_ms:
break
time.sleep(1)
if not found and not os.path.exists(downloaded_file_path):
plural = "s"
if timeout == 1:
plural = ""
message = (
"File {%s} was not found in the downloads folder {%s} "
"after %s second%s! (Or the download didn't complete!)"
% (file, downloads_path, timeout, plural)
)
from seleniumbase.common.exceptions import NoSuchFileException
raise NoSuchFileException(message)

def get_path_of_downloaded_file(self, file):
"""This assumes the default location of SeleniumBase downloads,
which is the "./downloaded_files/" folder where scripts run."""
downloads_folder = constants.Files.DOWNLOADS_FOLDER
abs_path = os.path.abspath(".")
downloads_path = os.path.join(abs_path, downloads_folder)
return os.path.join(downloads_path, file)

def set_download_path(self, path):
"""Set a new download path for click-initiated downloads.
(For Pure CDP Mode sync format only! -> sb_cdp.)
The default download location is: "./downloaded_files/".
Convenience methods such as assert_downloaded_file(file)
will still expect the default location."""
self.loop.run_until_complete(self.page.set_download_path(path))

def set_locale(self, locale):
"""(Settings will take effect on the next page load)"""
self.loop.run_until_complete(self.page.set_locale(locale))
Expand Down Expand Up @@ -3233,13 +3319,23 @@ def assert_text_not_visible(self, text, selector="body", timeout=None):
)
return True

def assert_true(self, expression):
def assert_true(self, expression, msg=None):
if not expression:
raise AssertionError("%s is not true" % expression)
if not msg:
raise AssertionError("%s is not true" % expression)
else:
raise AssertionError(
"%s is not true. (%s)" % (expression, msg)
)

def assert_false(self, expression):
def assert_false(self, expression, msg=None):
if expression:
raise AssertionError("%s is not false" % expression)
if not msg:
raise AssertionError("%s is not false" % expression)
else:
raise AssertionError(
"%s is not false. (%s)" % (expression, msg)
)

def assert_equal(self, first, second):
if first != second:
Expand Down
2 changes: 2 additions & 0 deletions seleniumbase/fixtures/base_case.py
Original file line number Diff line number Diff line change
Expand Up @@ -5103,6 +5103,8 @@ def activate_cdp_mode(self, url=None, **kwargs):
self.find_element_by_text = self.cdp.find_element_by_text
if hasattr(self.cdp, "get_active_tab"):
self.get_active_tab = self.cdp.get_active_tab
if hasattr(self.cdp, "get_all_urls"):
self.get_all_urls = self.cdp.get_all_urls
if hasattr(self.cdp, "get_endpoint_url"):
self.get_endpoint_url = self.cdp.get_endpoint_url
if hasattr(self.cdp, "get_event_loop"):
Expand Down
6 changes: 6 additions & 0 deletions seleniumbase/undetected/cdp_driver/browser.py
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,7 @@ async def get(
filter(lambda item: item.type_ == "page", self.targets)
)
await connection.sleep(0.005)
_cdp_downloads_path = None
_cdp_timezone = None
_cdp_user_agent = ""
_cdp_locale = None
Expand All @@ -389,6 +390,8 @@ async def get(
_cdp_mobile_mode = None
_cdp_recorder = None
_cdp_ad_block = None
if getattr(sb_config, "_cdp_downloads_path", None):
_cdp_downloads_path = sb_config._cdp_downloads_path
if getattr(sb_config, "_cdp_timezone", None):
_cdp_timezone = sb_config._cdp_timezone
if getattr(sb_config, "_cdp_user_agent", None):
Expand All @@ -405,6 +408,8 @@ async def get(
_cdp_ad_block = sb_config.ad_block_on
if getattr(sb_config, "disable_csp", None):
_cdp_disable_csp = sb_config.disable_csp
if "downloads_path" in kwargs:
_cdp_downloads_path = kwargs["downloads_path"]
if "timezone" in kwargs:
_cdp_timezone = kwargs["timezone"]
elif "tzone" in kwargs:
Expand Down Expand Up @@ -439,6 +444,7 @@ async def get(
await connection.sleep(0.01)
await connection.send(cdp.network.enable())
await connection.sleep(0.01)
await connection.set_downloads_folder(_cdp_downloads_path)
if _cdp_timezone:
await connection.set_timezone(_cdp_timezone)
if _cdp_locale:
Expand Down
2 changes: 2 additions & 0 deletions seleniumbase/undetected/cdp_driver/cdp_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -744,6 +744,8 @@ async def start(
sb_config._cdp_user_agent = kwargs["user_agent"]
else:
sb_config._cdp_user_agent = None
if "downloads_path" in kwargs:
sb_config._cdp_downloads_path = kwargs["downloads_path"]
if "platform" in kwargs:
sb_config._cdp_platform = kwargs["platform"]
elif "plat" in kwargs:
Expand Down
Loading