# uk_bin_collection/uk_bin_collection/councils/LondonBoroughOfRichmondUponThames.py
"""Bin-collection scraper for the London Borough of Richmond upon Thames.

Replaces the previous Selenium-driven implementation: the council's
"My Property" page is static HTML, so a plain ``requests`` GET plus
regex parsing of the waste section is sufficient.

NOTE(review): several regex literals below were reconstructed from the
surrounding comments after the original patch text lost its HTML tag
literals in transit (``<h4>``, ``<ul>``, ``<li>``, ``<p>``, ``<br>``,
section anchors). Confirm each pattern against the live page markup.
"""

import re
import html as html_unescape  # module aliased so local variables may be named ``html``
from datetime import datetime
from urllib.parse import urlparse, parse_qs

import requests

from uk_bin_collection.uk_bin_collection.common import date_format
from uk_bin_collection.uk_bin_collection.get_bin_data import AbstractGetBinDataClass


class CouncilClass(AbstractGetBinDataClass):
    """
    Richmond upon Thames – parse the static My Property page.
    No Selenium. No BeautifulSoup. Just requests + regex tailored to the current markup.
    """

    def parse_data(self, page: str, **kwargs) -> dict:
        """Fetch and parse the My Property page for a single property.

        Args:
            page: Fallback URL when ``kwargs['url']`` is absent.
            **kwargs: ``url`` (page URL, ideally already carrying ``?pid=``),
                ``pid`` (property id), ``paon`` (house-number field, which may
                be abused to carry the PID — see ``_pid_from_paon``).

        Returns:
            ``{"bins": [{"type": ..., "collectionDate": ...}, ...]}``

        Raises:
            ValueError: no PID could be determined from any source.
            RuntimeError: the page was fetched but no bins were parsed.
        """
        base_url = kwargs.get("url") or page
        pid_arg = kwargs.get("pid")
        paon = kwargs.get("paon")

        # Work out the final URL, but DO NOT append a #my_waste fragment.
        # Fix: the parsed query-string PID is now actually used (previously it
        # was computed and dropped in favour of a loose "pid=" substring test
        # that could also match a fragment or path segment).
        pid_from_url = self._pid_from_url(base_url)
        pid_from_paon = self._pid_from_paon(paon)

        if pid_from_url:
            # URL already carries ?pid=... in its query string — use as-is.
            target_url = base_url
        elif pid_arg or pid_from_paon:
            pid = pid_arg or pid_from_paon
            sep = "&" if "?" in (base_url or "") else "?"
            target_url = f"{base_url}{sep}pid={pid}"
        else:
            raise ValueError(
                "Richmond: supply a URL that already has ?pid=... OR put PID in the House Number field."
            )

        html = self._fetch_html(target_url)
        bindata = self._parse_html_for_waste(html)
        if not bindata["bins"]:
            raise RuntimeError("Richmond: no bins found in page HTML.")
        return bindata

    # ----------------- HTTP -----------------

    def _fetch_html(self, url: str) -> str:
        """GET *url* with a desktop User-Agent and return the response body.

        A browser UA is sent because the council site serves different (or no)
        content to obvious bots. Raises ``requests.HTTPError`` on non-2xx.
        """
        headers = {
            "User-Agent": (
                "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
                "(KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36"
            )
        }
        resp = requests.get(url, headers=headers, timeout=30)
        resp.raise_for_status()
        return resp.text

    # ----------------- parsing (regex) -----------------

    def _parse_html_for_waste(self, html: str) -> dict:
        """Extract bin types and next collection dates from the page HTML.

        The waste section lists each service as an ``<h4>`` heading followed by
        either a ``<ul>`` of dates or a ``<p>`` message. Returns the standard
        ``{"bins": [...]}`` structure (empty list when the section is absent).
        """
        # Isolate the waste block so stray <h4>s elsewhere don't match.
        waste_block = self._extract_waste_block(html)
        if not waste_block:
            return {"bins": []}

        bins = []

        # Find all <h4>...</h4> headings (one per bin service) in that block.
        for h_match in re.finditer(
            r"<h4[^>]*>(.*?)</h4>", waste_block, flags=re.I | re.S
        ):
            bin_name = self._clean(h_match.group(1))
            if not bin_name:
                continue

            # Slice from the end of this </h4> to either the next <h4> or the
            # end of the block — that slice holds this service's dates.
            start = h_match.end()
            next_h = re.search(r"<h4[^>]*>", waste_block[start:], flags=re.I)
            if next_h:
                section = waste_block[start : start + next_h.start()]
            else:
                section = waste_block[start:]

            # Preferred layout: a <ul> whose <li> items are the dates.
            date_lines = []
            ul_match = re.search(r"<ul[^>]*>(.*?)</ul>", section, flags=re.I | re.S)
            if ul_match:
                ul_inner = ul_match.group(1)
                for li in re.findall(
                    r"<li[^>]*>(.*?)</li>", ul_inner, flags=re.I | re.S
                ):
                    text = self._clean(li)
                    if text:
                        date_lines.append(text)

            # Fallback layout: a single <p>...</p> (date or status message).
            if not date_lines:
                p_match = re.search(r"<p[^>]*>(.*?)</p>", section, flags=re.I | re.S)
                if p_match:
                    text = self._clean(p_match.group(1))
                    if text:
                        date_lines.append(text)

            col_date = self._first_date_or_message(date_lines)
            if col_date:
                bins.append(
                    {
                        "type": bin_name,
                        "collectionDate": col_date,
                    }
                )

        return {"bins": bins}

    def _extract_waste_block(self, html: str) -> str | None:
        """Return the HTML between the ``my_waste`` anchor and the next section.

        NOTE(review): anchors reconstructed from the original comment
        ("grab from my_waste ... my-councillors as fallback") — confirm the
        exact id/attribute markup against the live page.
        """
        m = re.search(
            r'id="my_waste"[^>]*>(.+?)(?:<[^>]*id="my-councillors"|\Z)',
            html,
            flags=re.I | re.S,
        )
        return m.group(1) if m else None

    # ----------------- helpers -----------------

    def _pid_from_url(self, url) -> str | None:
        """Return the ``pid`` query-string parameter of *url*, if any."""
        if not url:
            return None
        try:
            q = parse_qs(urlparse(url).query)
            return q.get("pid", [None])[0]
        except Exception:
            # Malformed URL — treat as "no PID" rather than crashing.
            return None

    def _pid_from_paon(self, paon) -> str | None:
        """Allow the PID to be smuggled in via the "house number" field.

        A genuine house number is short; Richmond PIDs are long numeric
        strings, so only 10–14 digit values are accepted as PIDs.
        """
        if paon and str(paon).isdigit() and 10 <= len(str(paon)) <= 14:
            return str(paon)
        return None

    def _clean(self, s: str) -> str:
        """Strip tags, unescape entities and collapse whitespace in *s*."""
        # Turn <br> (and self-closing variants) into spaces first so the
        # surrounding words don't run together when tags are removed.
        s = re.sub(r"<br\s*/?>", " ", s, flags=re.I)
        # Strip any other simple tags.
        s = re.sub(r"<[^>]+>", "", s)
        s = html_unescape.unescape(s)
        return " ".join(s.split())

    def _first_date_or_message(self, lines) -> str | None:
        """Return the first parseable date in *lines*, formatted per
        ``date_format`` — or, failing that, a verbatim "no collection/contract/
        subscription" status message. ``None`` when nothing useful is found.
        """
        # Match "Thursday 23 October 2025" or "23 October 2025".
        date_rx = re.compile(
            r"(?:(Monday|Tuesday|Wednesday|Thursday|Friday|Saturday|Sunday)\s+)?"
            r"(\d{1,2}\s+[A-Za-z]+\s+\d{4})"
        )
        for line in lines:
            m = date_rx.search(line)
            if m:
                ds = m.group(0)
                # Choose the strptime format based on whether a weekday name
                # was captured.
                fmt = "%A %d %B %Y" if m.group(1) else "%d %B %Y"
                dt = datetime.strptime(ds, fmt)
                return dt.strftime(date_format)

            lower = line.lower()
            if (
                "no collection" in lower
                or "no contract" in lower
                or "no subscription" in lower
            ):
                return line
        return None