diff --git a/.gitignore b/.gitignore index 14d4c523..0edbd10f 100644 --- a/.gitignore +++ b/.gitignore @@ -6,6 +6,5 @@ resources/ chromedriver chromedriver.exe data.txt -login.py .DS_Store debug.log \ No newline at end of file diff --git a/Dockerfile b/Dockerfile deleted file mode 100644 index 02f4a853..00000000 --- a/Dockerfile +++ /dev/null @@ -1,27 +0,0 @@ -FROM python:3.8.6-slim-buster - -ARG UID=1000 -ARG GID=1000 - -WORKDIR /usr/src/app - -RUN groupadd -o -g ${GID} -r app && adduser --system --home /home/app --ingroup app --uid ${UID} app && \ - chown -R app:app /usr/src/app && \ - apt-get update && \ - apt-get install -y curl unzip gnupg && \ - curl -sS -o - https://dl-ssl.google.com/linux/linux_signing_key.pub | apt-key add - && \ - echo "deb [arch=amd64] http://dl.google.com/linux/chrome/deb/ stable main" >> /etc/apt/sources.list.d/google-chrome.list && \ - apt-get update && \ - apt-get install -y google-chrome-stable && \ - rm -rf /var/lib/apt/lists/* - -COPY --chown=app:app requirements.txt docker/download_chromedriver.py ./ - -RUN python3 -m pip install --no-cache-dir -r requirements.txt && \ - python3 download_chromedriver.py && chown -R app:app /usr/src/app - -COPY --chown=app:app . . 
- -USER app - -ENTRYPOINT [ "python3" ] diff --git a/check_and_gen.py b/check_and_gen.py deleted file mode 100644 index e722352d..00000000 --- a/check_and_gen.py +++ /dev/null @@ -1,306 +0,0 @@ -#!/usr/bin/env python3 - -from lib import modwall; modwall.check() # We check the requirements - -import json -from time import time -from os.path import isfile -from pathlib import Path -from ssl import SSLError -import base64 -from copy import deepcopy - -import httpx -from seleniumwire import webdriver -from selenium.common.exceptions import TimeoutException as SE_TimeoutExepction -from bs4 import BeautifulSoup as bs - -import config -from lib.utils import * -from lib import listener - - -# We change the current working directory to allow using GHunt from anywhere -os.chdir(Path(__file__).parents[0]) - -def get_saved_cookies(): - ''' returns cookie cache if exists ''' - if isfile(config.data_path): - try: - with open(config.data_path, 'r') as f: - out = json.loads(f.read()) - cookies = out["cookies"] - print("[+] Detected stored cookies, checking it") - return cookies - except Exception: - print("[-] Stored cookies are corrupted\n") - return False - print("[-] No stored cookies found\n") - return False - - -def get_authorization_source(cookies): - ''' returns html source of hangouts page if user authorized ''' - req = httpx.get("https://docs.google.com/document/u/0/?usp=direct_url", - cookies=cookies, headers=config.headers) - - if req.status_code == 200: - req2 = httpx.get("https://hangouts.google.com", cookies=cookies, - headers=config.headers) - if "myaccount.google.com" in req2.text: - return req.text - return None - - -def save_tokens(hangouts_auth, gdoc_token, hangouts_token, internal_token, internal_auth, cac_key, cookies, osid): - ''' save tokens to file ''' - output = { - "hangouts_auth": hangouts_auth, "internal_auth": internal_auth, - "keys": {"gdoc": gdoc_token, "hangouts": hangouts_token, "internal": internal_token, "clientauthconfig": cac_key}, - "cookies": 
cookies, - "osids": { - "cloudconsole": osid - } - } - with open(config.data_path, 'w') as f: - f.write(json.dumps(output)) - - -def get_hangouts_tokens(driver, cookies, tmprinter): - ''' gets auth and hangouts token ''' - - tmprinter.out("Setting cookies...") - driver.get("https://hangouts.google.com/robots.txt") - for k, v in cookies.items(): - driver.add_cookie({'name': k, 'value': v}) - - tmprinter.out("Fetching Hangouts homepage...") - driver.get("https://hangouts.google.com") - - tmprinter.out("Waiting for the /v2/people/me/blockedPeople request, it " - "can takes a few minutes...") - try: - req = driver.wait_for_request('/v2/people/me/blockedPeople', timeout=config.browser_waiting_timeout) - tmprinter.out("Request found !") - driver.close() - tmprinter.out("") - except SE_TimeoutExepction: - tmprinter.out("") - exit("\n[!] Selenium TimeoutException has occured. Please check your internet connection, proxies, vpns, et cetera.") - - - hangouts_auth = req.headers["Authorization"] - hangouts_token = req.url.split("key=")[1] - - return (hangouts_auth, hangouts_token) - -def drive_interceptor(request): - global internal_auth, internal_token - - if request.url.endswith(('.woff2', '.css', '.png', '.jpeg', '.svg', '.gif')): - request.abort() - elif request.path != "/drive/my-drive" and "Accept" in request.headers and \ - any([x in request.headers["Accept"] for x in ["image", "font-woff"]]): - request.abort() - if "authorization" in request.headers and "_" in request.headers["authorization"] and \ - request.headers["authorization"]: - internal_auth = request.headers["authorization"] - -def get_internal_tokens(driver, cookies, tmprinter): - """ Extract the mysterious token used for Internal People API - and some Drive requests, with the Authorization header""" - - global internal_auth, internal_token - - internal_auth = "" - - tmprinter.out("Setting cookies...") - driver.get("https://drive.google.com/robots.txt") - for k, v in cookies.items(): - 
driver.add_cookie({'name': k, 'value': v}) - - start = time() - - tmprinter.out("Fetching Drive homepage...") - driver.request_interceptor = drive_interceptor - driver.get("https://drive.google.com/drive/my-drive") - - body = driver.page_source - internal_token = body.split("appsitemsuggest-pa")[1].split(",")[3].strip('"') - - tmprinter.out(f"Waiting for the authorization header, it " - "can takes a few minutes...") - - while True: - if internal_auth and internal_token: - tmprinter.clear() - break - elif time() - start > config.browser_waiting_timeout: - tmprinter.clear() - exit("[-] Timeout while fetching the Internal tokens.\nPlease increase the timeout in config.py or try again.") - - del driver.request_interceptor - - return internal_auth, internal_token - -def gen_osid(cookies, domain, service): - req = httpx.get(f"https://accounts.google.com/ServiceLogin?service={service}&osid=1&continue=https://{domain}/&followup=https://{domain}/&authuser=0", - cookies=cookies, headers=config.headers) - - body = bs(req.text, 'html.parser') - - params = {x.attrs["name"]:x.attrs["value"] for x in body.find_all("input", {"type":"hidden"})} - - headers = {**config.headers, **{"Content-Type": "application/x-www-form-urlencoded"}} - req = httpx.post(f"https://{domain}/accounts/SetOSID", cookies=cookies, data=params, headers=headers) - - osid_header = [x for x in req.headers["set-cookie"].split(", ") if x.startswith("OSID")] - if not osid_header: - exit("[-] No OSID header detected, exiting...") - - osid = osid_header[0].split("OSID=")[1].split(";")[0] - - return osid - -def get_clientauthconfig_key(cookies): - """ Extract the Client Auth Config API token.""" - - req = httpx.get("https://console.cloud.google.com", - cookies=cookies, headers=config.headers) - - if req.status_code == 200 and "pantheon_apiKey" in req.text: - cac_key = req.text.split('pantheon_apiKey\\x22:')[1].split(",")[0].strip('\\x22') - return cac_key - exit("[-] I can't find the Client Auth Config API...") - 
-def check_cookies(cookies): - wanted = ["authuser", "continue", "osidt", "ifkv"] - - req = httpx.get(f"https://accounts.google.com/ServiceLogin?service=cloudconsole&osid=1&continue=https://console.cloud.google.com/&followup=https://console.cloud.google.com/&authuser=0", - cookies=cookies, headers=config.headers) - - body = bs(req.text, 'html.parser') - - params = [x.attrs["name"] for x in body.find_all("input", {"type":"hidden"})] - for param in wanted: - if param not in params: - return False - - return True - -def getting_cookies(cookies): - choices = ("You can facilitate configuring GHunt by using the GHunt Companion extension on Firefox, Chrome, Edge and Opera here :\n" - "=> https://github.com/mxrch/ghunt_companion\n\n" - "[1] (Companion) Put GHunt on listening mode (currently not compatible with docker)\n" - "[2] (Companion) Paste base64-encoded cookies\n" - "[3] Enter manually all cookies\n\n" - "Choice => ") - - choice = input(choices) - if choice not in ["1","2","3"]: - exit("Please choose a valid choice. 
Exiting...") - - if choice == "1": - received_cookies = listener.run() - cookies = json.loads(base64.b64decode(received_cookies)) - - elif choice == "2": - received_cookies = input("Paste the cookies here => ") - cookies = json.loads(base64.b64decode(received_cookies)) - - elif choice == "3": - for name in cookies.keys(): - if not cookies[name]: - cookies[name] = input(f"{name} => ").strip().strip('\"') - - return cookies - -if __name__ == '__main__': - - driverpath = get_driverpath() - cookies_from_file = get_saved_cookies() - - tmprinter = TMPrinter() - - cookies = {"SID": "", "SSID": "", "APISID": "", "SAPISID": "", "HSID": "", "LSID": "", "__Secure-3PSID": "", "CONSENT": config.default_consent_cookie, "PREF": config.default_pref_cookie} - - new_cookies_entered = False - - if not cookies_from_file: - cookies = getting_cookies(cookies) - new_cookies_entered = True - else: - # in case user wants to enter new cookies (example: for new account) - html = get_authorization_source(cookies_from_file) - valid_cookies = check_cookies(cookies_from_file) - valid = False - if html and valid_cookies: - print("[+] The cookies seems valid !") - valid = True - else: - print("[-] Seems like the cookies are invalid.") - new_gen_inp = input("\nDo you want to enter new browser cookies from accounts.google.com ? (Y/n) ").lower() - if new_gen_inp == "y": - cookies = getting_cookies(cookies) - new_cookies_entered = True - - elif not valid: - exit("Please put valid cookies. Exiting...") - - - # Validate cookies - if new_cookies_entered or not cookies_from_file: - html = get_authorization_source(cookies) - if html: - print("\n[+] The cookies seems valid !") - else: - exit("\n[-] Seems like the cookies are invalid, try regenerating them.") - - if not new_cookies_entered: - cookies = cookies_from_file - choice = input("Do you want to generate new tokens ? 
(Y/n) ").lower() - if choice != "y": - exit() - - # Start the extraction process - - # We first initialize the browser driver - chrome_options = get_chrome_options_args(config.headless) - options = { - 'connection_timeout': None # Never timeout, otherwise it floods errors - } - - tmprinter.out("Starting browser...") - driver = webdriver.Chrome( - executable_path=driverpath, seleniumwire_options=options, - options=chrome_options - ) - driver.header_overrides = config.headers - - print("Extracting the tokens...\n") - # Extracting Google Docs token - trigger = '\"token\":\"' - if trigger not in html: - exit("[-] I can't find the Google Docs token in the source code...\n") - else: - gdoc_token = html.split(trigger)[1][:100].split('"')[0] - print("Google Docs Token => {}".format(gdoc_token)) - - print("Generating OSID for the Cloud Console...") - osid = gen_osid(cookies, "console.cloud.google.com", "cloudconsole") - cookies_with_osid = deepcopy(cookies) - cookies_with_osid["OSID"] = osid - # Extracting Internal People API tokens - internal_auth, internal_token = get_internal_tokens(driver, cookies_with_osid, tmprinter) - print(f"Internal APIs Token => {internal_token}") - print(f"Internal APIs Authorization => {internal_auth}") - - # Extracting Hangouts tokens - auth_token, hangouts_token = get_hangouts_tokens(driver, cookies_with_osid, tmprinter) - print(f"Hangouts Authorization => {auth_token}") - print(f"Hangouts Token => {hangouts_token}") - - cac_key = get_clientauthconfig_key(cookies_with_osid) - print(f"Client Auth Config API Key => {cac_key}") - - save_tokens(auth_token, gdoc_token, hangouts_token, internal_token, internal_auth, cac_key, cookies, osid) diff --git a/config.py b/config.py deleted file mode 100644 index 5139c2c8..00000000 --- a/config.py +++ /dev/null @@ -1,30 +0,0 @@ -regexs = { - "albums": r'href=\"\.\/albumarchive\/\d*?\/album\/(.*?)\" jsaction.*?>(?:<.*?>){5}(.*?)<\/div><.*?>(\d*?) 
', - "photos": r'\],\"(https:\/\/lh\d\.googleusercontent\.com\/.*?)\",\[\"\d{21}\"(?:.*?,){16}\"(.*?)\"', - "review_loc_by_id": r'{}\",.*?\[\[null,null,(.*?),(.*?)\]', - "gplus": r"plus\.google\.com\/\d*\"" -} - -headers = { - 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; rv:68.0) Gecko/20100101 Firefox/68.0', - 'Connection': 'Keep-Alive' -} - -headless = True # if True, it doesn't show the browser while scraping GMaps reviews -ytb_hunt_always = True # if True, search the Youtube channel everytime -gmaps_radius = 30 # in km. The radius distance to create groups of gmaps reviews. -gdocs_public_doc = "1jaEEHZL32t1RUN5WuZEnFpqiEPf_APYKrRBG9LhLdvE" # The public Google Doc to use it as an endpoint, to use Google's Search. -data_path = "resources/data.txt" -browser_waiting_timeout = 120 - -# Profile pictures options -write_profile_pic = True -profile_pics_dir = "profile_pics" - -# Cookies -# if True, it will uses the Google Account cookies to request the services, -# and gonna be able to read your personal informations -gmaps_cookies = False -calendar_cookies = False -default_consent_cookie = "YES+FR.fr+V10+BX" -default_pref_cookie = "tz=Europe.Paris&f6=40000000&hl=en" # To set the lang settings to english \ No newline at end of file diff --git a/docker/download_chromedriver.py b/docker/download_chromedriver.py deleted file mode 100644 index 77c8ff95..00000000 --- a/docker/download_chromedriver.py +++ /dev/null @@ -1,4 +0,0 @@ -from webdriver_manager.chrome import ChromeDriverManager - -ChromeDriverManager(path="/usr/src/app").install() -print('ChromeDriver download was successful.') diff --git a/docker_check_and_gen.sh b/docker_check_and_gen.sh deleted file mode 100644 index 0d053b84..00000000 --- a/docker_check_and_gen.sh +++ /dev/null @@ -1,2 +0,0 @@ -#!/bin/bash -docker run -v ghunt-resources:/usr/src/app/resources -ti ghcr.io/mxrch/ghunt check_and_gen.py diff --git a/docker_hunt.sh b/docker_hunt.sh deleted file mode 100755 index b242d3eb..00000000 --- 
a/docker_hunt.sh +++ /dev/null @@ -1,2 +0,0 @@ -#!/bin/bash -docker run -v ghunt-resources:/usr/src/app/resources -ti ghcr.io/mxrch/ghunt ghunt.py $1 $2 \ No newline at end of file diff --git a/ghunt.py b/ghunt.py deleted file mode 100644 index a0a6145b..00000000 --- a/ghunt.py +++ /dev/null @@ -1,44 +0,0 @@ -#!/usr/bin/env python3 - -from lib import modwall; modwall.check() # We check the requirements - -import sys -import os -from pathlib import Path - -from lib import modwall -from lib.utils import * -from modules.doc import doc_hunt -from modules.email import email_hunt -from modules.gaia import gaia_hunt -from modules.youtube import youtube_hunt - - -if __name__ == "__main__": - - # We change the current working directory to allow using GHunt from anywhere - os.chdir(Path(__file__).parents[0]) - - modules = ["email", "doc", "gaia", "youtube"] - - if len(sys.argv) <= 1 or sys.argv[1].lower() not in modules: - print("Please choose a module.\n") - print("Available modules :") - for module in modules: - print(f"- {module}") - exit() - - module = sys.argv[1].lower() - if len(sys.argv) >= 3: - data = sys.argv[2] - else: - data = None - - if module == "email": - email_hunt(data) - elif module == "doc": - doc_hunt(data) - elif module == "gaia": - gaia_hunt(data) - elif module == "youtube": - youtube_hunt(data) \ No newline at end of file diff --git a/lib/__init__.py b/ghunt/apis/__init__.py similarity index 100% rename from lib/__init__.py rename to ghunt/apis/__init__.py diff --git a/ghunt/apis/peoplepa.py b/ghunt/apis/peoplepa.py new file mode 100644 index 00000000..fb6013a5 --- /dev/null +++ b/ghunt/apis/peoplepa.py @@ -0,0 +1,170 @@ +from ghunt.objects.base import GHuntCreds +from ghunt.errors import * +import ghunt.globals as gb +from ghunt.objects.apis import HttpAPI +from ghunt.parsers.people import Person + +import http.steven1024ej@gmail.com + +import inspect +import json + + +class PeoplePaHttp(HttpAPI): + def __init__(self, creds: GHuntCreds, headers: 
dict[str, str] = {}): + if not headers: + headers = gb.config.headers + + self.hostname = "people-pa.clients6.google.com" + self.scheme = "https" + + self.require_key = "photos" # key name, or None + + self._load_api(creds, headers) + + async def people_lookup(self, as_client: httpx.AsyncClient, email: str, params_template="just_gaia_id") -> tuple[bool, Person]: + endpoint_name = inspect.currentframe().f_code.co_name + + verb = "GET" + base_url = "/v2/people/lookup" + require_sapisidhash = True # bool, and if true, cookies must be true + require_cookies = True # bool + params_templates = { + "just_gaia_id": { + "id": email, + "type": "EMAIL", + "matchType": "EXACT", + "requestMask.includeField.paths": "person.metadata" + }, + "just_name": { + "id": email, + "type": "EMAIL", + "matchType": "EXACT", + "requestMask.includeField.paths": "person.metadata", + "core_id_params.enable_private_names": True + }, + "max_details": { + "id": email, + "type": "EMAIL", + "match_type": "EXACT", + "extension_set.extension_names": [ + "HANGOUTS_ADDITIONAL_DATA", + "HANGOUTS_OFF_NETWORK_GAIA_LOOKUP", + "HANGOUTS_PHONE_DATA", + "DYNAMITE_ADDITIONAL_DATA", + "DYNAMITE_ORGANIZATION_INFO", + "GPLUS_ADDITIONAL_DATA" + ], + "request_mask.include_field.paths": [ + "person.metadata.best_display_name", + "person.photo", + "person.cover_photo", + "person.interaction_settings", + "person.legacy_fields", + "person.metadata", + "person.in_app_reachability", + "person.name", + "person.read_only_profile_info", + "person.sort_keys", + "person.email" + ], + "request_mask.include_container": [ + "AFFINITY", + "PROFILE", + "DOMAIN_PROFILE", + "ACCOUNT", + "EXTERNAL_ACCOUNT", + "CIRCLE", + "DOMAIN_CONTACT", + "DEVICE_CONTACT", + "GOOGLE_GROUP", + "CONTACT" + ], + "core_id_params.enable_private_names": True + } + } + + if not params_templates.get(params_template): + raise GHuntParamsTemplateError(f"The asked template {params_template} for the endpoint {endpoint_name} wasn't recognized by GHunt.") + + 
self._load_endpoint(endpoint_name, require_sapisidhash, require_cookies) + req = await self._query(as_client, verb, endpoint_name, base_url, params_templates[params_template]) + + # Parsing + data = json.loads(req.text) + person = Person() + if not data: + return False, person + + person_data = list(data["people"].values())[0] + await person._scrape(as_client, person_data) + + return True, person + + async def people(self, as_client: httpx.AsyncClient, gaia_id: str, params_template="just_name") -> tuple[bool, Person]: + endpoint_name = inspect.currentframe().f_code.co_name + + verb = "GET" + base_url = "/v2/people" + require_sapisidhash = True # bool, and if true, cookies must be true + require_cookies = True # bool + params_templates = { + "just_name": { + "person_id": gaia_id, + "requestMask.includeField.paths": "person.name", + "core_id_params.enable_private_names": True + }, + "max_details": { + "person_id": gaia_id, + "extension_set.extension_names": [ + "HANGOUTS_ADDITIONAL_DATA", + "HANGOUTS_PHONE_DATA", + "DYNAMITE_ADDITIONAL_DATA", + "DYNAMITE_ORGANIZATION_INFO", + "GPLUS_ADDITIONAL_DATA" + ], + "request_mask.include_field.paths": [ + "person.metadata.best_display_name", + "person.photo", + "person.cover_photo", + "person.interaction_settings", + "person.legacy_fields", + "person.metadata", + "person.in_app_reachability", + "person.name", + "person.read_only_profile_info", + "person.sort_keys", + "person.email" + ], + "request_mask.include_container": [ + "AFFINITY", + "PROFILE", + "DOMAIN_PROFILE", + "ACCOUNT", + "EXTERNAL_ACCOUNT", + "CIRCLE", + "DOMAIN_CONTACT", + "DEVICE_CONTACT", + "GOOGLE_GROUP", + "CONTACT" + ], + "core_id_params.enable_private_names": True + } + } + + if not params_templates.get(params_template): + raise GHuntParamsTemplateError(f"The asked template {params_template} for the endpoint {endpoint_name} wasn't recognized by GHunt.") + + self._load_endpoint(endpoint_name, require_sapisidhash, require_cookies) + req = await 
self._query(as_client, verb, endpoint_name, base_url, params_templates[params_template]) + + # Parsing + data = json.loads(req.text) + person = Person() + if data["personResponse"][0]["status"] == "NOT_FOUND": + return False, person + + person_data = data["personResponse"][0]["person"] + await person._scrape(as_client, person_data) + + return True, person \ No newline at end of file diff --git a/ghunt/cli.py b/ghunt/cli.py new file mode 100644 index 00000000..48e94b57 --- /dev/null +++ b/ghunt/cli.py @@ -0,0 +1,36 @@ +import argparse + +import trio + + +def parse_and_run(): + parser = argparse.ArgumentParser() + subparsers = parser.add_subparsers(dest="module") + + ### Login module + parser_login = subparsers.add_parser('login') + + ### Email module + parser_email = subparsers.add_parser('email') + parser_email.add_argument("email_address") + + ### Gaia module + parser_gaia = subparsers.add_parser('gaia') + parser_gaia.add_argument("gaia_id") + + ### Parsing + args = parser.parse_args() + process_args(args) + +def process_args(args: argparse.Namespace): + if args.module == "login": + from ghunt.modules import login + login.check_and_login() + + if args.module == "email": + from ghunt.modules import email + trio.run(email.hunt, None, args.email_address) + + if args.module == "gaia": + from ghunt.modules import gaia + trio.run(gaia.hunt, None, args.gaia_id) \ No newline at end of file diff --git a/ghunt/config.py b/ghunt/config.py new file mode 100644 index 00000000..4b43bd73 --- /dev/null +++ b/ghunt/config.py @@ -0,0 +1,38 @@ +regexs = { + "review_loc_by_id": r'{}\",.*?\[\[null,null,(.*?),(.*?)\]', + "gplus": r"plus\.google\.com\/\d*\"" +} + +headers = { + 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; rv:68.0) Gecko/20100101 Firefox/68.0', + 'Connection': 'Keep-Alive' +} + +templates = { + "gmaps_pb":{ + "stats": 
"!1s{}!2m3!1sYE3rYc2rEsqOlwSHx534DA!7e81!15i14416!6m2!4b1!7b1!9m0!16m4!1i100!4b1!5b1!6BQ0FFU0JrVm5TVWxEenc9PQ!17m28!1m6!1m2!1i0!2i0!2m2!1i458!2i736!1m6!1m2!1i1868!2i0!2m2!1i1918!2i736!1m6!1m2!1i0!2i0!2m2!1i1918!2i20!1m6!1m2!1i0!2i716!2m2!1i1918!2i736!18m12!1m3!1d806313.5865720833!2d150.19484835!3d-34.53825215!2m3!1f0!2f0!3f0!3m2!1i1918!2i736!4f13.1", + "reviews": { + "first": "!1s{}!2m5!1soViSYcvVG6iJytMPk6amiA8%3A1!2zMWk6NCx0OjE0MzIzLGU6MCxwOm9WaVNZY3ZWRzZpSnl0TVBrNmFtaUE4OjE!4m1!2i14323!7e81!6m2!4b1!7b1!9m0!10m6!1b1!2b1!5b1!8b1!9m1!1e3!14m69!1m57!1m4!1m3!1e3!1e2!1e4!3m5!2m4!3m3!1m2!1i260!2i365!4m1!3i10!10b1!11m42!1m3!1e1!2b0!3e3!1m3!1e2!2b1!3e2!1m3!1e2!2b0!3e3!1m3!1e8!2b0!3e3!1m3!1e10!2b0!3e3!1m3!1e10!2b1!3e2!1m3!1e9!2b1!3e2!1m3!1e10!2b0!3e3!1m3!1e10!2b1!3e2!1m3!1e10!2b0!3e4!2b1!4b1!2m5!1e1!1e4!1e3!1e5!1e2!3b0!4b1!5m1!1e1!7b1!16m3!1i10!4b1!5b1!17m0!18m9!1m3!1d2567.508024970022!2d-78.667885!3d35.7546725!2m0!3m2!1i537!2i609!4f13.1", + "page": "!1s{}!2m3!1sYE3rYc2rEsqOlwSHx534DA!7e81!15i14416!6m2!4b1!7b1!9m0!16m4!1i100!4b1!5b1!6B{}!17m28!1m6!1m2!1i0!2i0!2m2!1i458!2i736!1m6!1m2!1i1868!2i0!2m2!1i1918!2i736!1m6!1m2!1i0!2i0!2m2!1i1918!2i20!1m6!1m2!1i0!2i716!2m2!1i1918!2i736!18m12!1m3!1d806313.5865720833!2d150.19484835!3d-34.53825215!2m3!1f0!2f0!3f0!3m2!1i1918!2i736!4f13.1" + }, + "photos": { + "first": "!1s{}!2m3!1spQUAYoPQLcOTlwT9u6-gDA!7e81!15i18404!9m0!14m69!1m57!1m4!1m3!1e3!1e2!1e4!3m5!2m4!3m3!1m2!1i260!2i365!4m1!3i10!10b1!11m42!1m3!1e1!2b0!3e3!1m3!1e2!2b1!3e2!1m3!1e2!2b0!3e3!1m3!1e8!2b0!3e3!1m3!1e10!2b0!3e3!1m3!1e10!2b1!3e2!1m3!1e9!2b1!3e2!1m3!1e10!2b0!3e3!1m3!1e10!2b1!3e2!1m3!1e10!2b0!3e4!2b1!4b1!2m5!1e1!1e4!1e3!1e5!1e2!3b1!4b1!5m1!1e1!7b1", + "page": 
"!1s{}!2m3!1spQUAYoPQLcOTlwT9u6-gDA!7e81!15i14415!9m0!14m68!1m58!1m4!1m3!1e3!1e2!1e4!3m5!2m4!3m3!1m2!1i260!2i365!4m2!2s{}!3i100!10b1!11m42!1m3!1e1!2b0!3e3!1m3!1e2!2b1!3e2!1m3!1e2!2b0!3e3!1m3!1e8!2b0!3e3!1m3!1e10!2b0!3e3!1m3!1e10!2b1!3e2!1m3!1e9!2b1!3e2!1m3!1e10!2b0!3e3!1m3!1e10!2b1!3e2!1m3!1e10!2b0!3e4!2b1!4b1!2m5!1e1!1e4!1e3!1e5!1e2!5m1!1e1!7b1!17m28!1m6!1m2!1i0!2i0!2m2!1i458!2i595!1m6!1m2!1i950!2i0!2m2!1i1000!2i595!1m6!1m2!1i0!2i0!2m2!1i1000!2i20!1m6!1m2!1i0!2i575!2m2!1i1000!2i595!18m12!1m3!1d1304345.2752527467!2d149.32871599857805!3d-34.496155324132545!2m3!1f0!2f0!3f0!3m2!1i1000!2i595!4f13.1" + } + } +} + +gmaps_radius = 30 # in km. The radius distance to create groups of gmaps reviews. +gdocs_public_doc = "1jaEEHZL32t1RUN5WuZEnFpqiEPf_APYKrRBG9LhLdvE" # A public Google Doc to use as an endpoint for Google's Search. +creds_path = "resources/session.txt" + +# Cookies +# if True, it will uses the Google Account cookies to request the services, +# and gonna be able to read your personal informations +gmaps_cookies = False +calendar_cookies = False +default_consent_cookie = "YES+cb.20220118-08-p0.fr+FX+510" +default_pref_cookie = "tz=Europe.Paris&f6=40000000&hl=en" # To set the lang settings to english + +# If someone want to use GHunt as a lib, he maybe doesn't want all the prints, but just the JSON output +silent_mode = False \ No newline at end of file diff --git a/ghunt/errors.py b/ghunt/errors.py new file mode 100644 index 00000000..61bc89c6 --- /dev/null +++ b/ghunt/errors.py @@ -0,0 +1,20 @@ +class GHuntKnowledgeError(BaseException): + pass + +class GHuntCorruptedHeadersError(BaseException): + pass + +class GHuntUnknownVerbError(BaseException): + pass + +class GHuntInsufficientCreds(BaseException): + pass + +class GHuntParamsTemplateError(BaseException): + pass + +class GHuntAPIResponseParsingError(BaseException): + pass + +class GHuntObjectsMergingError(BaseException): + pass \ No newline at end of file diff --git a/ghunt/globals.py b/ghunt/globals.py new 
file mode 100644 index 00000000..e7b8f878 --- /dev/null +++ b/ghunt/globals.py @@ -0,0 +1,11 @@ +from ghunt.objects.utils import TMPrinter +from rich.console import Console +# This file is only intended to serve global variables at a project-wide level. + + +def init_globals(): + global as_client, config, tmprinter, rc + as_client = None + config = None + tmprinter = TMPrinter() + rc = Console(highlight=False) # Rich Console \ No newline at end of file diff --git a/profile_pics/.keep b/ghunt/knowledge/__init__.py similarity index 100% rename from profile_pics/.keep rename to ghunt/knowledge/__init__.py diff --git a/ghunt/knowledge/keys.py b/ghunt/knowledge/keys.py new file mode 100644 index 00000000..91b2cf77 --- /dev/null +++ b/ghunt/knowledge/keys.py @@ -0,0 +1,6 @@ +keys = { + "gdrive": {"key": "AIzaSyAWGrfCCr7albM3lmCc937gx4uIphbpeKQ", "origin": "https://drive.google.com"}, + "hangouts": {"key": "AIzaSyD7InnYR3VKdb4j2rMUEbTCIr2VyEazl6k", "origin": "https://hangouts.google.com"}, + "pantheon": {"key": "AIzaSyCI-zsRP85UVOi0DjtiCwWBwQ1djDy741g", "origin": "https://console.cloud.google.com"}, + "photos": {"key": "AIzaSyAa2odBewW-sPJu3jMORr0aNedh3YlkiQc", "origin": "https://photos.google.com"} +} \ No newline at end of file diff --git a/ghunt/knowledge/maps.py b/ghunt/knowledge/maps.py new file mode 100644 index 00000000..5c736f1d --- /dev/null +++ b/ghunt/knowledge/maps.py @@ -0,0 +1,35 @@ +types_translations = { + 'airport': 'Airport', + 'bar': 'Bar', + 'bank_intl': 'Bank', + 'bus': 'Bus', + 'cafe': 'Café', + 'camping': 'Camping', + 'cemetery': 'Cemetery', + 'civic_bldg': 'Civic building', + 'ferry': 'Ferry', + 'gas': 'Gas', + 'generic': 'Generic', + 'golf': 'Golf', + 'hospital_H': 'Hospital H', + 'library': 'Library', + 'lodging': 'Lodging', + 'monument': 'Monument', + 'movie': 'Movie', + 'museum': 'Museum', + 'parking': 'Parking', + 'police': 'Police', + 'postoffice': 'Post office', + 'restaurant': 'Restaurant', + 'school': 'School', + 'shoppingbag': 'Shopping 
bag', + 'shoppingcart': 'Shopping cart', + 'train': 'Train', + 'tram': 'Tram', + 'tree': 'Park', + 'worship_buddhist': 'Worship Buddhist', + 'worship_christian': 'Worship Christian', + 'worship_hindu': 'Worship Hindu', + 'worship_islam': 'Worship Islam', + 'worship_jewish': 'Worship Jewish' +} \ No newline at end of file diff --git a/ghunt/knowledge/services.py b/ghunt/knowledge/services.py new file mode 100644 index 00000000..24ca30cd --- /dev/null +++ b/ghunt/knowledge/services.py @@ -0,0 +1,3 @@ +services_baseurls = { + "cloudconsole": "console.cloud.google.com" +} \ No newline at end of file diff --git a/ghunt/lib/__init__.py b/ghunt/lib/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/ghunt/lib/banner.py b/ghunt/lib/banner.py new file mode 100644 index 00000000..aeddacbb --- /dev/null +++ b/ghunt/lib/banner.py @@ -0,0 +1,16 @@ +from ghunt import globals as gb + +def banner(): + + banner = """ + [red] .d8888b. [/][blue]888 888[/][red] 888 + [/][red]d88P Y88b [/][blue]888 888[/][red] 888 + [/][yellow]888 [/][red]888 [/][blue]888 888[/][red] 888 + [/][yellow]888 [/][blue]8888888888[/][green] 888 888[/][yellow] 88888b. [/][red] 888888 + [/][yellow]888 [/][blue]88888 [/][blue]888 888[/][green] 888 888[/][yellow] 888 "88b[/][red] 888 + [/][yellow]888 [/][blue]888 [/][blue]888 888[/][green] 888 888[/][yellow] 888 888[/][red] 888 + [/][green]Y88b d88P [/][blue]888 888[/][green] Y88b 888[/][yellow] 888 888[/][red] Y88b. 
+ [/][green] "Y8888P88 [/][blue]888 888[/][green] "Y88888[/][yellow] 888 888[/][red] "Y888 + """ + + gb.rc.print(banner) \ No newline at end of file diff --git a/ghunt/lib/gmaps.py b/ghunt/lib/gmaps.py new file mode 100644 index 00000000..c055077e --- /dev/null +++ b/ghunt/lib/gmaps.py @@ -0,0 +1,417 @@ +from dateutil.relativedelta import relativedelta +from datetime import datetime +import json +from geopy import distance +from geopy.geocoders import Nominatim + +import httpx +from alive_progress import alive_bar + +from ghunt import globals as gb +from ghunt.objects.base import * +from ghunt.lib.utils import * +from ghunt.objects.utils import * +from ghunt.lib.knowledge import get_gmaps_type_translation + + +def get_datetime(datepublished: str): + """ + Get an approximative date from the maps review date + Examples : 'last 2 days', 'an hour ago', '3 years ago' + """ + if datepublished.split()[0] in ["a", "an"]: + nb = 1 + else: + if datepublished.startswith("last"): + nb = int(datepublished.split()[1]) + else: + nb = int(datepublished.split()[0]) + + if "minute" in datepublished: + delta = relativedelta(minutes=nb) + elif "hour" in datepublished: + delta = relativedelta(hours=nb) + elif "day" in datepublished: + delta = relativedelta(days=nb) + elif "week" in datepublished: + delta = relativedelta(weeks=nb) + elif "month" in datepublished: + delta = relativedelta(months=nb) + elif "year" in datepublished: + delta = relativedelta(years=nb) + else: + delta = relativedelta() + + return (datetime.today() - delta).replace(microsecond=0, second=0) + +async def get_reviews(as_client: httpx.AsyncClient, gaia_id: str) -> Tuple[str, dict[str, int], list[MapsReview], list[MapsPhoto]]: + """Extracts the target's statistics, reviews and photos.""" + next_page_token = "" + agg_reviews = [] + agg_photos = [] + stats = {} + + req = await 
as_client.get(f"https://www.google.com/locationhistory/preview/mas?authuser=0&hl=en&gl=us&pb={gb.config.templates['gmaps_pb']['stats'].format(gaia_id)}") + data = json.loads(req.text[5:]) + if not data[16][8]: + return "empty", stats, [], [] + stats = {sec[6]:sec[7] for sec in data[16][8][0]} + total_reviews = stats["Reviews"] + stats["Ratings"] + stats["Photos"] + if not total_reviews: + return "empty", stats, [], [] + + with alive_bar(total_reviews, receipt=False) as bar: + for category in ["reviews", "photos"]: + first = True + while True: + if first: + req = await as_client.get(f"https://www.google.com/locationhistory/preview/mas?authuser=0&hl=en&gl=us&pb={gb.config.templates['gmaps_pb'][category]['first'].format(gaia_id)}") + first = False + else: + req = await as_client.get(f"https://www.google.com/locationhistory/preview/mas?authuser=0&hl=en&gl=us&pb={gb.config.templates['gmaps_pb'][category]['page'].format(gaia_id, next_page_token)}") + data = json.loads(req.text[5:]) + + new_reviews = [] + new_photos = [] + next_page_token = "" + + # Reviews + if category == "reviews": + if not data[24]: + return "private", stats, [], [] + reviews_data = data[24][0] + if not reviews_data: + break + for review_data in reviews_data: + review = MapsReview() + review.id = review_data[0][10] + review.approximative_date = get_datetime(review_data[0][1]) # UTC + review.comment = review_data[0][3] + review.rating = review_data[0][4] + if len(review_data[0]) >= 50 and review_data[0][49]: + guided_data = review_data[0][49] + for guided_section in guided_data: + guided = MapsGuidedAnswer() + guided.id = guided_section[0][0] + guided.question = guided_section[1] + guided.answer = guided_section[2][0][0][1] + review.guided_answers.append(guided) + + review.location.id = review_data[1][14][0] + review.location.name = review_data[1][2] + review.location.address = review_data[1][3] + review.location.tags = review_data[1][4] if review_data[1][4] else [] + review.location.types = [x for x 
in review_data[1][8] if x] + if review_data[1][0]: + review.location.position.latitude = review_data[1][0][2] + review.location.position.longitude = review_data[1][0][3] + if len(review_data[1]) > 31 and review_data[1][31]: + review.location.cost = len(review_data[1][31]) + new_reviews.append(review) + bar() + + agg_reviews += new_reviews + + if not new_reviews or len(data[24]) < 4 or not data[24][3]: + break + next_page_token = data[24][3].strip("=") + + # Photos + elif category == "photos" : + if not data[22]: + return "private", stats, [], [] + photos_data = data[22][1] + if not photos_data: + break + for photo_data in photos_data: + photos = MapsPhoto() + photos.id = photo_data[0][10] + photos.url = photo_data[0][6][0].split("=")[0] + date = photo_data[0][21][6][8] + photos.exact_date = datetime(date[0], date[1], date[2], date[3]) # UTC + photos.approximative_date = get_datetime(date[8][0]) # UTC + + if len(photo_data) > 1: + photos.location.id = photo_data[1][14][0] + photos.location.name = photo_data[1][2] + photos.location.address = photo_data[1][3] + photos.location.tags = photo_data[1][4] if photo_data[1][4] else [] + photos.location.types = [x for x in photo_data[1][8] if x] if photo_data[1][8] else [] + if photo_data[1][0]: + photos.location.position.latitude = photo_data[1][0][2] + photos.location.position.longitude = photo_data[1][0][3] + if len(photo_data[1]) > 31 and photo_data[1][31]: + photos.location.cost = len(photo_data[1][31]) + new_photos.append(photos) + bar() + + agg_photos += new_photos + + if not new_photos or len(data[22]) < 4 or not data[22][3]: + break + next_page_token = data[22][3].strip("=") + + return "", stats, agg_reviews, agg_photos + +def avg_location(locs: Tuple[float, float]): + """ + Calculates the average location + from a list of (latitude, longitude) tuples. 
+ """ + latitude = [] + longitude = [] + for loc in locs: + latitude.append(loc[0]) + longitude.append(loc[1]) + + latitude = sum(latitude) / len(latitude) + longitude = sum(longitude) / len(longitude) + return latitude, longitude + +def translate_confidence(percents: int): + """Translates the percents number to a more human-friendly text""" + if percents >= 100: + return "Extremely high" + elif percents >= 80: + return "Very high" + elif percents >= 60: + return "Little high" + elif percents >= 40: + return "Okay" + elif percents >= 20: + return "Low" + elif percents >= 10: + return "Very low" + else: + return "Extremely low" + +def sanitize_location(location: Dict[str, str]): + """Returns the nearest place from a Nomatim location response.""" + not_country = False + not_town = False + town = "?" + country = "?" + if "city" in location: + town = location["city"] + elif "village" in location: + town = location["village"] + elif "town" in location: + town = location["town"] + elif "municipality" in location: + town = location["municipality"] + else: + not_town = True + if not "country" in location: + not_country = True + location["country"] = country + if not_country and not_town: + return False + location["town"] = town + return location + +def calculate_probable_location(geolocator: Nominatim, reviews_and_photos: list[MapsReview|MapsPhoto], gmaps_radius: int): + """Calculates the probable location from a list of reviews and the max radius.""" + tmprinter = TMPrinter() + radius = gmaps_radius + + locations = {} + tmprinter.out(f"Calculation of the distance of each review...") + for nb, review in enumerate(reviews_and_photos): + if not review.location.position.latitude or not review.location.position.longitude: + continue + if review.location.id not in locations: + locations[review.location.id] = {"dates": [], "locations": [], "range": None, "score": 0} + location = (review.location.position.latitude, review.location.position.longitude) + for review2 in 
reviews_and_photos: + location2 = (review2.location.position.latitude, review2.location.position.longitude) + dis = distance.distance(location, location2).km + + if dis <= radius: + locations[review.location.id]["dates"].append(review2.approximative_date) + locations[review.location.id]["locations"].append(location2) + + maxdate = max(locations[review.location.id]["dates"]) + mindate = min(locations[review.location.id]["dates"]) + locations[review.location.id]["range"] = maxdate - mindate + tmprinter.out(f"Calculation of the distance of each review ({nb}/{len(reviews_and_photos)})...") + + tmprinter.clear() + + locations = {k: v for k, v in + sorted(locations.items(), key=lambda k: len(k[1]["locations"]), reverse=True)} # We sort it + + tmprinter.out("Identification of redundant areas...") + to_del = [] + for id in locations: + if id in to_del: + continue + for id2 in locations: + if id2 in to_del or id == id2: + continue + if all([loc in locations[id]["locations"] for loc in locations[id2]["locations"]]): + to_del.append(id2) + for hash in to_del: + del locations[hash] + + tmprinter.out("Calculating confidence...") + + maxrange = max([locations[hash]["range"] for hash in locations]) + maxlen = max([len(locations[hash]["locations"]) for hash in locations]) + minreq = 3 + mingroups = 3 + + score_steps = 4 + for hash, loc in locations.items(): + if len(loc["locations"]) == maxlen: + locations[hash]["score"] += score_steps * 4 + if loc["range"] == maxrange: + locations[hash]["score"] += score_steps * 3 + if len(locations) >= mingroups: + others = sum([len(locations[h]["locations"]) for h in locations if h != hash]) + if len(loc["locations"]) > others: + locations[hash]["score"] += score_steps * 2 + if len(loc["locations"]) >= minreq: + locations[hash]["score"] += score_steps + + panels = sorted(set([loc["score"] for loc in locations.values()]), reverse=True) + + maxscore = sum([p * score_steps for p in range(1, score_steps + 1)]) + for panel in panels: + locs = [loc 
for loc in locations.values() if loc["score"] == panel] + if len(locs[0]["locations"]) == 1: + panel /= 2 + if len(reviews_and_photos) < 4: + panel /= 2 + confidence = translate_confidence(panel / maxscore * 100) + for nb, loc in enumerate(locs): + avg = avg_location(loc["locations"]) + while True: + try: + location = geolocator.reverse(f"{avg[0]}, {avg[1]}", timeout=10).raw["address"] + break + except: + pass + location = sanitize_location(location) + locs[nb]["avg"] = location + del locs[nb]["locations"] + del locs[nb]["score"] + del locs[nb]["range"] + del locs[nb]["dates"] + + tmprinter.clear() + + return confidence, locs + +def output(err: str, stats: dict[str, int], reviews: list[MapsReview], photos: list[MapsPhoto], gaia_id: str): + """Pretty print the Maps results, and do some guesses.""" + + print(f"\nProfile page : https://www.google.com/maps/contrib/{gaia_id}/reviews") + + reviews_and_photos: list[MapsReview|MapsPhoto] = reviews + photos + if err != "private" and (err == "empty" or not reviews_and_photos): + print("\n[-] No review.") + return + + print("\n[Statistics]") + for section, number in stats.items(): + if number: + print(f"{section} : {number}") + + if err == "private": + print("\n[-] Reviews are private.") + return + + print("\n[Reviews]") + avg_ratings = round(sum([x.rating for x in reviews]) / len(reviews), 1) + print(f"[+] Average rating : {ppnb(avg_ratings)}/5\n") + + costs_table = { + 1: "Inexpensive", + 2: "Moderately expensive", + 3: "Expensive", + 4: "Very expensive" + } + + total_costs = 0 + costs_stats = {x:0 for x in range(1,5)} + for review in reviews_and_photos: + if review.location.cost: + costs_stats[review.location.cost] += 1 + total_costs += 1 + costs_stats = dict(sorted(costs_stats.items(), key=lambda item: item[1], reverse=True)) # We sort the dict by cost popularity + + if total_costs: + print("[Costs]") + for cost, desc in costs_table.items(): + line = f"> {ppnb(round(costs_stats[cost]/total_costs*100, 1))}% {desc} 
({costs_stats[cost]})" + style = "" + if not costs_stats[cost]: + style = "bright_black" + elif costs_stats[cost] == list(costs_stats.values())[0]: + style = "green" + gb.rc.print(line, style=style) + + avg_costs = round(sum([x*y for x,y in costs_stats.items()]) / total_costs) + print(f"\n[+] Average costs : {costs_table[avg_costs]}") + else: + print("[-] No costs data.") + + types = {} + for review in reviews_and_photos: + for type in review.location.types: + if type not in types: + types[type] = 0 + types[type] += 1 + types = dict(sorted(types.items(), key=lambda item: item[1], reverse=True)) + + types_and_tags = {} + for review in reviews_and_photos: + for type in review.location.types: + if type not in types_and_tags: + types_and_tags[type] = {} + for tag in review.location.tags: + if tag not in types_and_tags[type]: + types_and_tags[type][tag] = 0 + types_and_tags[type][tag] += 1 + types_and_tags[type] = dict(sorted(types_and_tags[type].items(), key=lambda item: item[1], reverse=True)) + types_and_tags = dict(sorted(types_and_tags.items())) + + if types_and_tags: + print("\nTarget's locations preferences :") + + unknown_trads = [] + for type, type_count in types.items(): + tags_counts = types_and_tags[type] + translation = get_gmaps_type_translation(type) + if not translation: + unknown_trads.append(type) + gb.rc.print(f"\n🏨 [underline]{translation if translation else type.title()} [{type_count}]", style="bold") + nb = 0 + for tag, tag_count in list(tags_counts.items()): + if nb >= 7: + break + elif tag.lower() == type: + continue + print(f"- {tag} ({tag_count})") + nb += 1 + + if unknown_trads: + print(f"\n⚠️ The following gmaps types haven't been found in GHunt\'s knowledge.") + for type in unknown_trads: + print(f"- {type}") + print("Please open an issue on the GHunt Github or submit a PR to add it !") + + geolocator = Nominatim(user_agent="nominatim") + + confidence, locations = calculate_probable_location(geolocator, reviews_and_photos, 
gb.config.gmaps_radius) + print(f"\n[+] Probable location (confidence => {confidence}) :") + + loc_names = [] + for loc in locations: + loc_names.append( + f"- {loc['avg']['town']}, {loc['avg']['country']}" + ) + + loc_names = set(loc_names) # delete duplicates + for loc in loc_names: + print(loc) \ No newline at end of file diff --git a/ghunt/lib/knowledge.py b/ghunt/lib/knowledge.py new file mode 100644 index 00000000..571d9f1c --- /dev/null +++ b/ghunt/lib/knowledge.py @@ -0,0 +1,25 @@ +from ghunt.knowledge.services import services_baseurls +from ghunt.knowledge.keys import keys +from ghunt.knowledge.maps import types_translations +from ghunt.errors import GHuntKnowledgeError + + +def get_domain_of_service(service: str) -> str: + if service not in services_baseurls: + raise GHuntKnowledgeError(f'The service "{service}" has not been found in GHunt\'s knowledge.') + return services_baseurls.get(service) + +def get_origin_of_key(key_name: str) -> str: + if key_name not in keys: + raise GHuntKnowledgeError(f'The key "{key_name}" has not been found in GHunt\'s knowledge.') + return keys.get(key_name, {}).get("origin") + +def get_api_key(key_name: str) -> str: + if key_name not in keys: + raise GHuntKnowledgeError(f'The key "{key_name}" has not been found in GHunt\'s knowledge.') + return keys.get(key_name, {}).get("key") + +def get_gmaps_type_translation(type_name: str) -> str: + #if type_name not in types_translations: + #raise GHuntKnowledgeError(f'The gmaps type "{type_name}" has not been found in GHunt\'s knowledge.\nPlease open an issue on the GHunt Github or submit a PR to add it !') + return types_translations.get(type_name) \ No newline at end of file diff --git a/lib/listener.py b/ghunt/lib/listener.py similarity index 98% rename from lib/listener.py rename to ghunt/lib/listener.py index 5a29fb88..4f06827c 100644 --- a/lib/listener.py +++ b/ghunt/lib/listener.py @@ -1,5 +1,4 @@ from http.server import BaseHTTPRequestHandler, HTTPServer -import threading from 
time import sleep diff --git a/lib/modwall.py b/ghunt/lib/modwall.py similarity index 100% rename from lib/modwall.py rename to ghunt/lib/modwall.py diff --git a/ghunt/lib/utils.py b/ghunt/lib/utils.py new file mode 100644 index 00000000..f9f0f361 --- /dev/null +++ b/ghunt/lib/utils.py @@ -0,0 +1,75 @@ +from pathlib import Path +from PIL import Image +import hashlib +from typing import * +from time import time +from datetime import datetime, timezone +from copy import deepcopy + +import httpx +import imagehash +from io import BytesIO + +from ghunt import globals as gb + + +def gprint(*args, **kwargs) -> None: + if not gb.config.silent_mode: + print(*args, **kwargs) + +def within_docker() -> bool: + return Path('/.dockerenv').is_file() + +def gen_sapisidhash(sapisid: str, origin: str, timestamp: str = str(int(time()))) -> str: + return f"{timestamp}_{hashlib.sha1(' '.join([timestamp, sapisid, origin]).encode()).hexdigest()}" + +def extract_set_cookies(req: httpx.Response) -> Dict[str, str]: + return {pair[0]:''.join(pair[1:]) for x in req.headers.get_list("set-cookie") if (pair := x.split(";")[0].split("="))} + +def inject_osid(cookies: Dict[str, str], osids: Dict[str, str], service: str) -> Dict[str, str]: + cookies_with_osid = deepcopy(cookies) + cookies_with_osid["OSID"] = osids[service] + return cookies_with_osid + +def is_headers_syntax_good(headers: dict[str, str]) -> bool: + try: + httpx.Headers(headers) + return True + except: + return False + +async def get_image_flathash(as_client: httpx.AsyncClient, image_url: str) -> imagehash.ImageHash: + req = await as_client.get(image_url) + img = Image.open(BytesIO(req.content)) + flathash = imagehash.average_hash(img) + return flathash + +async def is_default_profile_pic(as_client: httpx.AsyncClient, image_url: str): + flathash = await get_image_flathash(as_client, image_url) + if flathash - imagehash.hex_to_flathash("000018183c3c0000", 8) < 10 : + return True + return False + +def get_class_name(obj) -> str: + 
return str(obj).strip("<>").split(" ")[0] + +def get_datetime_utc(date_str): + """Converts ISO to datetime object in UTC""" + date = datetime.fromisoformat(date_str) + margin = date.utcoffset() + return date.replace(tzinfo=timezone.utc) - margin + +def ppnb(nb: float) -> float|int: + """ + Pretty print float number + Ex: 3.9 -> 3.9 + 4.0 -> 4 + 4.1 -> 4.1 + """ + try: + return int(nb) if nb % int(nb) == 0.0 else nb + except ZeroDivisionError: + if nb == 0.0: + return 0 + else: + return nb \ No newline at end of file diff --git a/ghunt/modules/email.py b/ghunt/modules/email.py new file mode 100644 index 00000000..4adcd487 --- /dev/null +++ b/ghunt/modules/email.py @@ -0,0 +1,88 @@ +from ghunt.objects.base import GHuntCreds +from ghunt.apis.peoplepa import PeoplePaHttp +from ghunt.lib import gmaps +from ghunt import globals as gb + +import httpx + + +async def hunt(as_client: httpx.AsyncClient, email_address: str): + + if not as_client: + as_client = httpx.AsyncClient() + + ghunt_creds = GHuntCreds() + ghunt_creds.load_creds() + + gb.rc.print("\n🪁 Google Account data", style="dodger_blue2") + + people_pa = PeoplePaHttp(ghunt_creds) + is_found, target = await people_pa.people_lookup(as_client, email_address, params_template="max_details") + if not is_found: + await as_client.aclose() + exit("\n[-] The target wasn't found.") + + containers = target.sourceIds + for container in containers: + print(f"\n[{container} CONTAINER]") + if container in target.names: + print(f"Name : {target.names[container].fullname}\n") + + if container in target.profilePhotos: + if target.profilePhotos[container].isDefault: + print("[-] Default profile picture") + else: + print("[+] Custom profile picture !") + print(f"=> {target.profilePhotos[container].url}") + + if container in target.coverPhotos: + if target.coverPhotos[container].isDefault: + print("[-] Default cover picture\n") + else: + print("[+] Custom cover picture !") + print(f"=> {target.coverPhotos[container].url}\n") + + 
print(f"Last profile edit : {target.sourceIds[container].lastUpdated}\n") + + if container in target.emails: + print(f"Email : {target.emails[container].value}") + else: + print(f"Email : {email_address}\n") + + print(f"Gaia ID : {target.personId}\n") + + if container in target.profileInfos: + print("User types :") + for user_type in target.profileInfos[container].userTypes: + print(f"- {user_type}") + print() + + gb.rc.print(f"📞 Hangouts Extended Data\n", style="green") + + print(f"Is bot : {target.extendedData.hangoutsData.isBot}") + print(f"User type : {target.extendedData.hangoutsData.userType}") + print(f"Past Hangouts State : {target.extendedData.hangoutsData.hadPastHangoutState}") + + gb.rc.print(f"\n🧨 Dynamite Extended Data\n", style="orange1") + + print(f"Presence : {target.extendedData.dynamiteData.presence}") + print(f"Entity Type : {target.extendedData.dynamiteData.entityType}") + print(f"DND State : {target.extendedData.dynamiteData.dndState}") + print(f"Customer ID : {target.extendedData.dynamiteData.customerId}") + + gb.rc.print(f"\n🌐 Google Plus Extended Data\n", style="cyan") + + print(f"Entreprise User : {target.extendedData.gplusData.isEntrepriseUser}") + print(f"Content Restriction : {target.extendedData.gplusData.contentRestriction}") + + if container in target.inAppReachability: + print("\n[+] Activated Google services :") + for app in target.inAppReachability[container].apps: + print(f"- {app}") + + gb.rc.print("\n🗺️ Maps data", style="green4") + + err, stats, reviews, photos = await gmaps.get_reviews(as_client, target.personId) + gmaps.output(err, stats, reviews, photos, target.personId) + + await as_client.aclose() \ No newline at end of file diff --git a/ghunt/modules/gaia.py b/ghunt/modules/gaia.py new file mode 100644 index 00000000..cde0be59 --- /dev/null +++ b/ghunt/modules/gaia.py @@ -0,0 +1,82 @@ +from ghunt.objects.base import GHuntCreds +from ghunt.apis.peoplepa import PeoplePaHttp +from ghunt import globals as gb +from ghunt.lib 
import gmaps + +import httpx + + +async def hunt(as_client: httpx.AsyncClient, gaia_id: str): + if not as_client: + as_client = httpx.AsyncClient() + + ghunt_creds = GHuntCreds() + ghunt_creds.load_creds() + + gb.rc.print("\n🪁 Google Account data", style="dodger_blue2") + + people_pa = PeoplePaHttp(ghunt_creds) + is_found, target = await people_pa.people(as_client, gaia_id, params_template="max_details") + if not is_found: + await as_client.aclose() + exit("\n[-] The target wasn't found.") + + containers = target.sourceIds + for container in containers: + print(f"\n[{container} CONTAINER]") + if container in target.names: + print(f"Name : {target.names[container].fullname}\n") + + if container in target.profilePhotos: + if target.profilePhotos[container].isDefault: + print("[-] Default profile picture") + else: + print("[+] Custom profile picture !") + print(f"=> {target.profilePhotos[container].url}") + + if container in target.coverPhotos: + if target.coverPhotos[container].isDefault: + print("[-] Default cover picture\n") + else: + print("[+] Custom cover picture !") + print(f"=> {target.coverPhotos[container].url}\n") + + print(f"Last profile edit : {target.sourceIds[container].lastUpdated}\n") + + print(f"Gaia ID : {target.personId}\n") + + if container in target.profileInfos: + print("User types :") + for user_type in target.profileInfos[container].userTypes: + print(f"- {user_type}") + print() + + gb.rc.print(f"📞 Hangouts Extended Data\n", style="green") + + print(f"Is bot : {target.extendedData.hangoutsData.isBot}") + print(f"User type : {target.extendedData.hangoutsData.userType}") + print(f"Past Hangouts State : {target.extendedData.hangoutsData.hadPastHangoutState}") + + gb.rc.print(f"\n🧨 Dynamite Extended Data\n", style="orange1") + + print(f"Presence : {target.extendedData.dynamiteData.presence}") + print(f"Entity Type : {target.extendedData.dynamiteData.entityType}") + print(f"DND State : {target.extendedData.dynamiteData.dndState}") + 
print(f"Customer ID : {target.extendedData.dynamiteData.customerId}") + + gb.rc.print(f"\n🌐 Google Plus Extended Data\n", style="cyan") + + print(f"Entreprise User : {target.extendedData.gplusData.isEntrepriseUser}") + print(f"Content Restriction : {target.extendedData.gplusData.contentRestriction}") + + if container in target.inAppReachability: + print("\n[+] Activated Google services :") + for app in target.inAppReachability[container].apps: + print(f"- {app}") + + gb.rc.print("\n🗺️ Maps data", style="green4") + + err, stats, reviews, photos = await gmaps.get_reviews(as_client, target.personId) + gmaps.output(err, stats, reviews, photos, target.personId) + + await as_client.aclose() \ No newline at end of file diff --git a/ghunt/modules/login.py b/ghunt/modules/login.py new file mode 100644 index 00000000..78f36f37 --- /dev/null +++ b/ghunt/modules/login.py @@ -0,0 +1,180 @@ +import json +import base64 +from typing import * + +import httpx +from bs4 import BeautifulSoup as bs + +from ghunt import globals as gb +from ghunt.lib.utils import * +from ghunt.lib import listener +from ghunt.lib.knowledge import get_domain_of_service +from ghunt.objects.base import GHuntCreds + + +def save_cookies_and_keys(ghunt_creds: GHuntCreds, creds_path: str): + """Save cookies, OSIDs and tokens to the specified file.""" + data = { + "cookies": ghunt_creds.cookies, + "osids": ghunt_creds.osids, + "tokens": ghunt_creds.tokens + } + with open(creds_path, "w") as f: + f.write(json.dumps(data, indent=4)) + + print(f"\n[+] Creds have been saved in {creds_path} !") + +def gen_osids(cookies: Dict[str, str], osids: Dict[str, str]) -> Dict[str, str]: + """ + Generate OSIDs of given services names, + contained in the "osids" dict argument. 
+ """ + for service in osids: + domain = get_domain_of_service(service) + req = httpx.get(f"https://accounts.google.com/ServiceLogin?service={service}&osid=1&continue=https://{domain}/&followup=https://{domain}/&authuser=0", + cookies=cookies, headers=gb.config.headers) + + body = bs(req.text, 'html.parser') + + params = {x.attrs["name"]:x.attrs["value"] for x in body.find_all("input", {"type":"hidden"})} + + headers = {**gb.config.headers, **{"Content-Type": "application/x-www-form-urlencoded"}} + req = httpx.post(f"https://{domain}/accounts/SetOSID", cookies=cookies, data=params, headers=headers) + + osid_header = [x for x in req.headers["set-cookie"].split(", ") if x.startswith("OSID")] + if not osid_header: + exit("[-] No OSID header detected, exiting...") + + osids[service] = osid_header[0].split("OSID=")[1].split(";")[0] + + return osids + +# Tokens extractions + +def get_gdocs_token(cookies: Dict[str, str]) -> str: + """Extracts the Google Docs token.""" + req = httpx.get("https://docs.google.com/document/u/0/", cookies=cookies, headers=gb.config.headers) + trigger = '\"token\":\"' + if trigger not in req.text: + exit("[-] I can't find the Google Docs token in the source code...\n") + else: + gdoc_token = req.text.split(trigger)[1][:100].split('"')[0] + return gdoc_token + +def check_cookies(cookies) -> bool: + """Checks the validity of given cookies.""" + req = httpx.get("https://docs.google.com", cookies=cookies, headers=gb.config.headers) + if req.status_code != 307: + return False + + set_cookies = extract_set_cookies(req) + if any([cookie in set_cookies for cookie in cookies]): + return False + + return True + +def check_osids(cookies, osids) -> bool: + """Checks the validity of given OSIDs.""" + for service in osids: + domain = get_domain_of_service(service) + cookies_with_osid = inject_osid(cookies, osids, service) + wanted = ["authuser", "continue", "osidt", "ifkv"] + req = 
httpx.get(f"https://accounts.google.com/ServiceLogin?service={service}&osid=1&continue=https://{domain}/&followup=https://{domain}/&authuser=0", + cookies=cookies_with_osid, headers=gb.config.headers) + + body = bs(req.text, 'html.parser') + params = [x.attrs["name"] for x in body.find_all("input", {"type":"hidden"})] + if not all([param in wanted for param in params]): + return False + + return True + +def getting_cookies_dialog(cookies: Dict[str, str]) -> Dict[str, str] : + """ + Launch the dialog that asks the user + how he want to generate its credentials. + """ + choices = ("You can facilitate configuring GHunt by using the GHunt Companion extension on Firefox, Chrome, Edge and Opera here :\n" + "=> https://github.com/mxrch/ghunt_companion\n\n" + "[1] (Companion) Put GHunt on listening mode (currently not compatible with docker)\n" + "[2] (Companion) Paste base64-encoded cookies\n" + "[3] Enter manually all cookies\n\n" + "Choice => ") + + choice = input(choices) + if choice not in ["1","2","3"]: + exit("Please choose a valid choice. 
Exiting...") + + if choice == "1": + received_cookies = listener.run() + cookies = json.loads(base64.b64decode(received_cookies)) + + elif choice == "2": + received_cookies = input("Paste the cookies here => ") + cookies = json.loads(base64.b64decode(received_cookies)) + + elif choice == "3": + for name in cookies.keys(): + if not cookies[name]: + cookies[name] = input(f"{name} => ").strip().strip('\"') + + return cookies + +def check_and_login() -> None: + """Check the users credentials validity, and generate new ones.""" + + ghunt_creds = GHuntCreds() + ghunt_creds.load_creds() + + cookies = {"SID": "", "SSID": "", "APISID": "", "SAPISID": "", "HSID": "", "LSID": "", "__Secure-3PSID": "", "CONSENT": gb.config.default_consent_cookie, "PREF": gb.config.default_pref_cookie} + osids = {"cloudconsole": ""} + + new_cookies_entered = False + if not ghunt_creds.are_creds_loaded(): + cookies = getting_cookies_dialog(cookies) + new_cookies_entered = True + else: + # in case user wants to enter new cookies (example: for new account) + valid_cookies = check_cookies(ghunt_creds.cookies) + if valid_cookies: + print("[+] The cookies seem valid !") + valid_osids = check_osids(ghunt_creds.cookies, ghunt_creds.osids) + if valid_osids: + print("[+] The OSIDs seem valid !") + else: + print("[-] Seems like the OSIDs are invalid.") + else: + print("[-] Seems like the cookies are invalid.") + new_gen_inp = input("\nDo you want to input new cookies ? (Y/n) ").lower() + if new_gen_inp == "y": + cookies = getting_cookies_dialog(cookies) + new_cookies_entered = True + elif not valid_cookies: + exit("Please put valid cookies. 
Exiting...") + + + # Validate cookies + if new_cookies_entered or not ghunt_creds.are_creds_loaded(): + valid_cookies = check_cookies(cookies) + if valid_cookies: + print("\n[+] The cookies seems valid !") + else: + exit("\n[-] Seems like the cookies are invalid, try regenerating them.") + + if not new_cookies_entered: + cookies = ghunt_creds.cookies + choice = input("Do you want to generate new tokens ? (Y/n) ").lower() + if choice != "y": + exit() + + # Feed the GHuntCreds object + ghunt_creds.cookies = cookies + + # Start the extraction process + ghunt_creds.tokens["gdocs"] = get_gdocs_token(cookies) + print(f'Google Docs Token => {ghunt_creds.tokens["gdocs"]}') + + print("Generating OSID for the Cloud Console...") + ghunt_creds.osids = gen_osids(cookies, osids) + + save_cookies_and_keys(ghunt_creds, gb.config.creds_path) diff --git a/ghunt/objects/apis.py b/ghunt/objects/apis.py new file mode 100644 index 00000000..17d5ea54 --- /dev/null +++ b/ghunt/objects/apis.py @@ -0,0 +1,90 @@ +from ghunt.errors import GHuntCorruptedHeadersError +from ghunt.lib.knowledge import get_origin_of_key, get_api_key +from ghunt.objects.base import GHuntCreds +from ghunt.lib.utils import gen_sapisidhash, is_headers_syntax_good +from ghunt.errors import * +from ghunt.lib.utils import get_class_name + +import httpx + + +class EndpointConfig(): + def __init__(self, headers: dict[str, str], sapisidhash: str, cookies: str): + self.headers = headers + self.sapisidhash = sapisidhash + self.cookies = cookies + +class HttpAPI(): + def _load_api(self, creds: GHuntCreds, headers: dict[str, str]): + if not creds.are_creds_loaded(): + raise GHuntInsufficientCreds(f"This API requires a loaded GhuntCreds object, but it is not.") + + if not is_headers_syntax_good(headers): + raise GHuntCorruptedHeadersError(f"The provided headers when loading the endpoint seems corrupted, please check it : {headers}") + + self.key_origin = None + if (key_name := self.require_key): + if not get_api_key(key_name): + 
raise GHuntInsufficientCreds(f"This API requires the {key_name} API key in the GhuntCreds object, but it isn't loaded.") + key_origin = get_origin_of_key(key_name) + self.headers = {**headers, "Origin": key_origin, "Referer": key_origin, "X-Goog-Api-Key": get_api_key(key_name)} + self.key_origin = key_origin + + self.creds = creds + self.loaded_endpoints : dict[str, EndpointConfig] = {} + + def _load_endpoint(self, endpoint_name: str, require_sapisidhash: bool, require_cookies: bool): + if endpoint_name in self.loaded_endpoints: + return + + if require_sapisidhash: + if not (sapisidhash := self.creds.cookies.get("SAPISID")): + raise GHuntInsufficientCreds(f"This endpoint requires the SAPISID cookie in the GhuntCreds object, but it isn't loaded.") + headers = {**self.headers, "Authorization": f"SAPISIDHASH {gen_sapisidhash(sapisidhash, self.key_origin)}"} + + if require_cookies: + if not self.creds.cookies: + raise GHuntInsufficientCreds(f"This endpoint requires the cookies in the GhuntCreds object, but they aren't loaded.") + + self.creds = self.creds + self.loaded_endpoints[endpoint_name] = EndpointConfig(headers, sapisidhash, self.creds.cookies) + + async def _query(self, as_client: httpx.AsyncClient, verb: str, endpoint_name: str, base_url: str, params_template: dict[str, any]) -> httpx.Response: + endpoint = self.loaded_endpoints[endpoint_name] + + if verb == "GET": + req = await as_client.get(f"{self.scheme}://{self.hostname}{base_url}", + params=params_template, headers=endpoint.headers, cookies=endpoint.cookies) + elif verb == "POST": + req = await as_client.post(f"{self.scheme}://{self.hostname}{base_url}", + data=params_template, headers=endpoint.headers, cookies=endpoint.cookies) + else: + raise GHuntUnknownVerbError(f"The provided verb {verb} wasn't recognized by GHunt.") + + return req + +class Parser(): + def _merge(self, obj) -> any: + """Merging two objects of the same class.""" + + def recursive_merge(obj1, obj2, module_name: str) -> any: + 
directions = [(obj1, obj2), (obj2, obj1)] + for direction in directions: + from_obj, target_obj = direction + for attr_name, attr_value in from_obj.__dict__.items(): + class_name = get_class_name(attr_value) + if class_name.startswith(module_name) and attr_name in target_obj.__dict__: + merged_obj = recursive_merge(attr_value, target_obj.__dict__[attr_name], module_name) + target_obj.__dict__[attr_name] = merged_obj + + elif not attr_name in target_obj.__dict__ or \ + (attr_value and not target_obj.__dict__.get(attr_name)): + target_obj.__dict__[attr_name] = attr_value + return obj1 + + class_name = get_class_name(self) + module_name = self.__module__ + if not get_class_name(obj).startswith(class_name): + raise GHuntObjectsMergingError("The two objects being merged aren't from the same class.") + + self = recursive_merge(self, obj, module_name) \ No newline at end of file diff --git a/ghunt/objects/base.py b/ghunt/objects/base.py new file mode 100644 index 00000000..13c2d032 --- /dev/null +++ b/ghunt/objects/base.py @@ -0,0 +1,83 @@ +from typing import * +from pathlib import Path +import json +from dateutil.relativedelta import relativedelta +from datetime import datetime + +from ghunt import globals as gb +from ghunt.lib.utils import gprint + + +class GHuntCreds(): + """ + This object stores all the needed credentials that GHunt uses, + such as cookies, OSIDs, keys and tokens. 
+ """ + + def __init__( + self, + cookies: Dict[str, str] = {}, + osids: Dict[str, str] = {}, + tokens: Dict[str, str] = {}, + creds_path: str = "" + ) -> None: + self.cookies = cookies + self.osids = osids + self.tokens = tokens + self.creds_path = creds_path if creds_path else gb.config.creds_path + + def are_creds_loaded(self) -> bool: + return all([self.cookies, self.osids, self.tokens]) + + def load_creds(self) -> None: + """ Returns cookies, OSIDs and tokens if they exist """ + if Path(self.creds_path).is_file(): + try: + with open(self.creds_path, 'r') as f: + out = json.loads(f.read()) + self.cookies = out["cookies"] + self.osids = out["osids"] + self.tokens = out["tokens"] + gprint("[+] Detected stored cookies") + except Exception: + gprint("[-] Stored cookies are corrupted\n") + else: + gprint("[-] No stored cookies found\n") + +class Position(): + def __init__(self): + self.latitude: float = 0.0 + self.longitude: float = 0.0 + +class MapsGuidedAnswer(): + def __init__(self): + self.id: str = "" + self.question: str = "" + self.answer: str = "" + +class MapsLocation(): + def __init__(self): + self.id: str = "" + self.name: str = "" + self.address: str = "" + self.position: Position = Position() + self.tags: list[str] = [] + self.types: list[str] = [] + self.cost: int = 0 # 1-4 + +class MapsReview(): + def __init__(self): + self.id: str = "" + self.comment: str = "" + self.rating: int = 0 + self.location: MapsLocation = MapsLocation() + self.guided_answers: list[MapsGuidedAnswer] = [] + self.approximative_date: relativedelta = None + +class MapsPhoto(): + def __init__(self): + self.id: str = "" + self.url: str = "" + self.location: MapsLocation = MapsLocation() + self.approximative_date: relativedelta = None + self.exact_date: datetime = None diff --git a/ghunt/objects/utils.py b/ghunt/objects/utils.py new file mode 100644 index 00000000..c3c21c7d --- /dev/null +++ b/ghunt/objects/utils.py @@ -0,0 +1,16 @@ +from ghunt.lib.utils import gprint + + +class 
TMPrinter(): + def __init__(self): + self.max_len = 0 + + def out(self, text: str): + if len(text) > self.max_len: + self.max_len = len(text) + else: + text += (" " * (self.max_len - len(text))) + gprint(text, end='\r') + + def clear(self): + gprint(" " * self.max_len, end="\r") \ No newline at end of file diff --git a/ghunt/parsers/__init__.py b/ghunt/parsers/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/ghunt/parsers/people.py b/ghunt/parsers/people.py new file mode 100644 index 00000000..f070c816 --- /dev/null +++ b/ghunt/parsers/people.py @@ -0,0 +1,190 @@ +from datetime import datetime + +from ghunt.errors import * +from ghunt.lib.utils import is_default_profile_pic +from ghunt.objects.apis import Parser + +import httpx + + +class PersonGplusExtendedData(): + def __init__(self): + self.contentRestriction: str = "" + self.isEntrepriseUser: bool = False + + def _scrape(self, gplus_data): + self.contentRestriction = gplus_data.get("contentRestriction") + + if (isEnterpriseUser := gplus_data.get("isEnterpriseUser")): + self.isEntrepriseUser = isEnterpriseUser + +class PersonDynamiteExtendedData(): + def __init__(self): + self.presence: str = "" + self.entityType: str = "" + self.dndState: str = "" + self.customerId: str = "" + + def _scrape(self, dynamite_data): + self.presence = dynamite_data.get("presence") + self.entityType = dynamite_data.get("entityType") + self.dndState = dynamite_data.get("dndState") + if (customerId := dynamite_data.get("organizationInfo", {}).get("customerInfo", {}). 
+ get("customerId", {}).get("customerId")): + self.customerId = customerId + + +class PersonHangoutsExtendedData(): + def __init__(self): + self.isBot: bool = False + self.userType: str = "" + self.hadPastHangoutState: str = "" + + def _scrape(self, hangouts_data): + if (isBot := hangouts_data.get("isBot")): + self.isBot = isBot + + self.userType = hangouts_data.get("userType") + self.hadPastHangoutState = hangouts_data.get("hasPastHangoutState") + +class PersonExtendedData(): + def __init__(self): + self.hangoutsData: PersonHangoutsExtendedData = PersonHangoutsExtendedData() + self.dynamiteData: PersonDynamiteExtendedData = PersonDynamiteExtendedData() + self.gplusData: PersonGplusExtendedData = PersonGplusExtendedData() + + def _scrape(self, extended_data: dict[str, any]): + if (hangouts_data := extended_data.get("hangoutsExtendedData")): + self.hangoutsData._scrape(hangouts_data) + + if (dynamite_data := extended_data.get("dynamiteExtendedData")): + self.dynamiteData._scrape(dynamite_data) + + if (gplus_data := extended_data.get("gplusExtendedData")): + self.gplusData._scrape(gplus_data) + +class PersonPhoto(): + def __init__(self): + self.url: str = "" + self.isDefault: bool = False + + async def _scrape(self, as_client: httpx.AsyncClient, photo_data: dict[str, any], photo_type: str): + if photo_type == "profile_photo": + self.url = photo_data.get("url") + self.isDefault = await is_default_profile_pic(as_client, self.url) + + elif photo_type == "cover_photo": + self.url = '='.join(photo_data.get("imageUrl").split("=")[:-1]) + if (isDefault := photo_data.get("isDefault")): + self.isDefault = isDefault + else: + raise GHuntAPIResponseParsingError(f'The provided photo type "{photo_type}" weren\'t recognized.') + +class PersonEmail(): + def __init__(self): + self.value: str = "" + + def _scrape(self, email_data: dict[str, any]): + self.value = email_data.get("value") + +class PersonName(): + def __init__(self): + self.fullname: str = "" + self.firstName: str = "" + 
self.lastName: str = "" + + def _scrape(self, name_data: dict[str, any]): + self.fullname = name_data.get("displayName") + self.firstName = name_data.get("givenName") + self.lastName = name_data.get("familyName") + +class PersonProfileInfo(): + def __init__(self): + self.userTypes: list[str] = [] + + def _scrape(self, profile_data: dict[str, any]): + if "ownerUserType" in profile_data: + self.userTypes += profile_data.get("ownerUserType") + +class PersonSourceIds(): + def __init__(self): + self.lastUpdated: datetime = None + + def _scrape(self, source_ids_data: dict[str, any]): + if (timestamp := source_ids_data.get("lastUpdatedMicros")): + self.lastUpdated = datetime.utcfromtimestamp(float(timestamp[:-6])).strftime("%Y/%m/%d %H:%M:%S (UTC)") + +class PersonInAppReachability(): + def __init__(self): + self.apps: list[str] = [] + + def _scrape(self, apps_data, container_name: str): + for app in apps_data: + if app["metadata"]["container"] == container_name: + self.apps.append(app["appType"].title()) + +class PersonContainers(dict): + pass + +class Person(Parser): + def __init__(self): + self.personId: str = "" + self.sourceIds: dict[str, PersonSourceIds] = PersonContainers() # All the fetched containers + self.emails: dict[str, PersonEmail] = PersonContainers() + self.names: dict[str, PersonName] = PersonContainers() + self.profileInfos: dict[str, PersonProfileInfo] = PersonContainers() + self.profilePhotos: dict[str, PersonPhoto] = PersonContainers() + self.coverPhotos: dict[str, PersonPhoto] = PersonContainers() + self.inAppReachability: dict[str, PersonInAppReachability] = PersonContainers() + self.extendedData: PersonExtendedData = PersonExtendedData() + + async def _scrape(self, as_client: httpx.AsyncClient, person_data: dict[str, any]): + self.personId = person_data.get("personId") + if person_data.get("email"): + for email_data in person_data["email"]: + person_email = PersonEmail() + person_email._scrape(email_data) + 
self.emails[email_data["metadata"]["container"]] = person_email + + if person_data.get("name"): + for name_data in person_data["name"]: + person_name = PersonName() + person_name._scrape(name_data) + self.names[name_data["metadata"]["container"]] = person_name + + if person_data.get("readOnlyProfileInfo"): + for profile_data in person_data["readOnlyProfileInfo"]: + person_profile = PersonProfileInfo() + person_profile._scrape(profile_data) + self.profileInfos[profile_data["metadata"]["container"]] = person_profile + + if (source_ids := person_data.get("metadata", {}).get("identityInfo", {}).get("sourceIds")): + for source_ids_data in source_ids: + person_source_ids = PersonSourceIds() + person_source_ids._scrape(source_ids_data) + self.sourceIds[source_ids_data["container"]] = person_source_ids + + if person_data.get("photo"): + for photo_data in person_data["photo"]: + person_photo = PersonPhoto() + await person_photo._scrape(as_client, photo_data, "profile_photo") + self.profilePhotos[profile_data["metadata"]["container"]] = person_photo + + if person_data.get("coverPhoto"): + for cover_photo_data in person_data["coverPhoto"]: + person_cover_photo = PersonPhoto() + await person_cover_photo._scrape(as_client, cover_photo_data, "cover_photo") + self.coverPhotos[cover_photo_data["metadata"]["container"]] = person_cover_photo + + if (apps_data := person_data.get("inAppReachability")): + containers_names = set() + for app_data in person_data["inAppReachability"]: + containers_names.add(app_data["metadata"]["container"]) + + for container_name in containers_names: + person_app_reachability = PersonInAppReachability() + person_app_reachability._scrape(apps_data, container_name) + self.inAppReachability[container_name] = person_app_reachability + + if (extended_data := person_data.get("extendedData")): + self.extendedData._scrape(extended_data) \ No newline at end of file diff --git a/lib/banner.py b/lib/banner.py deleted file mode 100644 index 53f66b7c..00000000 --- 
a/lib/banner.py +++ /dev/null @@ -1,18 +0,0 @@ -from colorama import init, Fore, Back, Style - -def banner(): - init() - - banner = """ - """ + Fore.RED + """ .d8888b. """ + Fore.BLUE + """888 888""" + Fore.RED + """ 888 - """ + Fore.RED + """d88P Y88b """ + Fore.BLUE + """888 888""" + Fore.RED + """ 888 - """ + Fore.YELLOW + """888 """ + Fore.RED + """888 """ + Fore.BLUE + """888 888""" + Fore.RED + """ 888 - """ + Fore.YELLOW + """888 """ + Fore.BLUE + """8888888888""" + Fore.GREEN + """ 888 888""" + Fore.YELLOW + """ 88888b. """ + Fore.RED + """ 888888 - """ + Fore.YELLOW + """888 """ + Fore.BLUE + """88888 """ + Fore.BLUE + """888 888""" + Fore.GREEN + """ 888 888""" + Fore.YELLOW + """ 888 "88b""" + Fore.RED + """ 888 - """ + Fore.YELLOW + """888 """ + Fore.BLUE + """888 """ + Fore.BLUE + """888 888""" + Fore.GREEN + """ 888 888""" + Fore.YELLOW + """ 888 888""" + Fore.RED + """ 888 - """ + Fore.GREEN + """Y88b d88P """ + Fore.BLUE + """888 888""" + Fore.GREEN + """ Y88b 888""" + Fore.YELLOW + """ 888 888""" + Fore.RED + """ Y88b. - """ + Fore.GREEN + """ "Y8888P88 """ + Fore.BLUE + """888 888""" + Fore.GREEN + """ "Y88888""" + Fore.YELLOW + """ 888 888""" + Fore.RED + """ "Y888 - """ + Fore.RESET - - print(banner) - diff --git a/lib/calendar.py b/lib/calendar.py deleted file mode 100644 index 37045b02..00000000 --- a/lib/calendar.py +++ /dev/null @@ -1,91 +0,0 @@ -import httpx -from dateutil.relativedelta import relativedelta -from beautifultable import BeautifulTable -from termcolor import colored - -import time -import json -from datetime import datetime, timezone -from urllib.parse import urlencode - - -# assembling the json request url endpoint -def assemble_api_req(calendarId, singleEvents, maxAttendees, maxResults, sanitizeHtml, timeMin, API_key, email): - base_url = f"https://clients6.google.com/calendar/v3/calendars/{email}/events?" 
- params = { - "calendarId": calendarId, - "singleEvents": singleEvents, - "maxAttendees": maxAttendees, - "maxResults": maxResults, - "timeMin": timeMin, - "key": API_key - } - base_url += urlencode(params, doseq=True) - return base_url - -# from iso to datetime object in utc -def get_datetime_utc(date_str): - date = datetime.fromisoformat(date_str) - margin = date.utcoffset() - return date.replace(tzinfo=timezone.utc) - margin - -# main method of calendar.py -def fetch(email, client, config): - if not config.calendar_cookies: - cookies = {"CONSENT": config.default_consent_cookie} - client.cookies = cookies - url_endpoint = f"https://calendar.google.com/calendar/u/0/embed?src={email}" - print("\nGoogle Calendar : " + url_endpoint) - req = client.get(url_endpoint + "&hl=en") - source = req.text - try: - # parsing parameters from source code - calendarId = source.split('title\":\"')[1].split('\"')[0] - singleEvents = "true" - maxAttendees = 1 - maxResults = 250 - sanitizeHtml = "true" - timeMin = datetime.strptime(source.split('preloadStart\":\"')[1].split('\"')[0], '%Y%m%d').replace(tzinfo=timezone.utc).isoformat() - API_key = source.split('developerKey\":\"')[1].split('\"')[0] - except IndexError: - return False - - json_calendar_endpoint = assemble_api_req(calendarId, singleEvents, maxAttendees, maxResults, sanitizeHtml, timeMin, API_key, email) - req = client.get(json_calendar_endpoint) - data = json.loads(req.text) - events = [] - try: - for item in data["items"]: - title = item["summary"] - start = get_datetime_utc(item["start"]["dateTime"]) - end = get_datetime_utc(item["end"]["dateTime"]) - - events.append({"title": title, "start": start, "end": end}) - except KeyError: - return False - - return {"status": "available", "events": events} - -def out(events): - limit = 5 - now = datetime.utcnow().replace(tzinfo=timezone.utc) - after = [date for date in events if date["start"] >= now][:limit] - before = [date for date in events if date["start"] <= now][:limit] - 
print(f"\n=> The {'next' if after else 'last'} {len(after) if after else len(before)} event{'s' if (len(after) > 1) or (not after and len(before) > 1) else ''} :") - target = after if after else before - - table = BeautifulTable() - table.set_style(BeautifulTable.STYLE_GRID) - table.columns.header = [colored(x, attrs=['bold']) for x in ["Name", "Datetime (UTC)", "Duration"]] - for event in target: - title = event["title"] - duration = relativedelta(event["end"], event["start"]) - if duration.days or duration.hours or duration.minutes: - duration = (f"{(str(duration.days) + ' day' + ('s' if duration.days > 1 else '')) if duration.days else ''} " - f"{(str(duration.hours) + ' hour' + ('s' if duration.hours > 1 else '')) if duration.hours else ''} " - f"{(str(duration.minutes) + ' minute' + ('s' if duration.minutes > 1 else '')) if duration.minutes else ''}").strip() - else: - duration = "?" - date = event["start"].strftime("%Y/%m/%d %H:%M:%S") - table.rows.append([title, date, duration]) - print(table) \ No newline at end of file diff --git a/lib/gmaps.py b/lib/gmaps.py deleted file mode 100644 index bb4acc35..00000000 --- a/lib/gmaps.py +++ /dev/null @@ -1,253 +0,0 @@ -import hashlib -import re -import time -from datetime import datetime - -from dateutil.relativedelta import relativedelta -from geopy import distance -from geopy.geocoders import Nominatim -from selenium.webdriver.common.by import By -from selenium.webdriver.support import expected_conditions as EC -from selenium.webdriver.support.ui import WebDriverWait -from seleniumwire import webdriver -from webdriver_manager.chrome import ChromeDriverManager - -from lib.utils import * - - -def scrape(gaiaID, client, cookies, config, headers, regex_rev_by_id, is_headless): - def get_datetime(datepublished): - if datepublished.split()[0] == "a": - nb = 1 - else: - nb = int(datepublished.split()[0]) - if "minute" in datepublished: - delta = relativedelta(minutes=nb) - elif "hour" in datepublished: - delta = 
relativedelta(hours=nb) - elif "day" in datepublished: - delta = relativedelta(days=nb) - elif "week" in datepublished: - delta = relativedelta(weeks=nb) - elif "month" in datepublished: - delta = relativedelta(months=nb) - elif "year" in datepublished: - delta = relativedelta(years=nb) - else: - delta = relativedelta() - return (datetime.today() - delta).replace(microsecond=0, second=0) - - tmprinter = TMPrinter() - - base_url = f"https://www.google.com/maps/contrib/{gaiaID}/reviews?hl=en" - print(f"\nGoogle Maps : {base_url.replace('?hl=en', '')}") - - tmprinter.out("Initial request...") - - req = client.get(base_url) - source = req.text - - data = source.split(';window.APP_INITIALIZATION_STATE=')[1].split(';window.APP_FLAGS')[0].replace("\\", "") - - if "/maps/reviews/data" not in data: - tmprinter.out("") - print("[-] No reviews") - return False - - chrome_options = get_chrome_options_args(is_headless) - options = { - 'connection_timeout': None # Never timeout, otherwise it floods errors - } - - tmprinter.out("Starting browser...") - - driverpath = get_driverpath() - driver = webdriver.Chrome(executable_path=driverpath, seleniumwire_options=options, options=chrome_options) - driver.header_overrides = headers - wait = WebDriverWait(driver, 15) - - tmprinter.out("Setting cookies...") - driver.get("https://www.google.com/robots.txt") - - if not config.gmaps_cookies: - cookies = {"CONSENT": config.default_consent_cookie} - for k, v in cookies.items(): - driver.add_cookie({'name': k, 'value': v}) - - tmprinter.out("Fetching reviews page...") - driver.get(base_url) - - wait.until(EC.element_to_be_clickable((By.CSS_SELECTOR, 'div.section-scrollbox'))) - scrollbox = driver.find_element(By.CSS_SELECTOR, 'div.section-scrollbox') - - tab_info = scrollbox.find_element(By.TAG_NAME, "div") - if tab_info and tab_info.text: - scroll_max = sum([int(x) for x in tab_info.text.split() if x.isdigit()]) - else: - return False - - tmprinter.clear() - print(f"[+] {scroll_max} reviews 
found !") - - timeout = scroll_max * 1.25 - timeout_start = time.time() - reviews_elements = driver.find_elements_by_xpath('//div[@data-review-id][@aria-label]') - tmprinter.out(f"Fetching reviews... ({len(reviews_elements)}/{scroll_max})") - while len(reviews_elements) < scroll_max: - driver.execute_script("arguments[0].scrollTop = arguments[0].scrollHeight", scrollbox) - reviews_elements = driver.find_elements_by_xpath('//div[@data-review-id][@aria-label]') - tmprinter.out(f"Fetching reviews... ({len(reviews_elements)}/{scroll_max})") - if time.time() > timeout_start + timeout: - tmprinter.out(f"Timeout while fetching reviews !") - break - - tmprinter.out("Fetching internal requests history...") - requests = [r.url for r in driver.requests if "locationhistory" in r.url] - tmprinter.out(f"Fetching internal requests... (0/{len(requests)})") - for nb, load in enumerate(requests): - req = client.get(load) - data += req.text.replace('\n', '') - tmprinter.out(f"Fetching internal requests... ({nb + 1}/{len(requests)})") - - tmprinter.out(f"Fetching reviews location... (0/{len(reviews_elements)})") - reviews = [] - rating = 0 - for nb, review in enumerate(reviews_elements): - id = review.get_attribute("data-review-id") - location = re.compile(regex_rev_by_id.format(id)).findall(data)[0] - try: - stars = review.find_element(By.CSS_SELECTOR, 'span[aria-label$="stars "]') - except Exception: - stars = review.find_element(By.CSS_SELECTOR, 'span[aria-label$="star "]') - rating += int(stars.get_attribute("aria-label").strip().split()[0]) - date = get_datetime(stars.find_element(By.XPATH, "following-sibling::span").text) - reviews.append({"location": location, "date": date}) - tmprinter.out(f"Fetching reviews location... 
({nb + 1}/{len(reviews_elements)})") - - rating_avg = rating / len(reviews) - tmprinter.clear() - print(f"[+] Average rating : {int(rating_avg) if int(rating_avg) / round(rating_avg, 1) == 1 else round(rating_avg, 1)}/5 stars !") - # 4.9 => 4.9, 5.0 => 5, we don't show the 0 - return reviews - - -def avg_location(locs): - latitude = [] - longitude = [] - for loc in locs: - latitude.append(float(loc[0])) - longitude.append(float(loc[1])) - - latitude = sum(latitude) / len(latitude) - longitude = sum(longitude) / len(longitude) - return latitude, longitude - - -def translate_confidence(percents): - if percents >= 100: - return "Extremely high" - elif percents >= 80: - return "Very high" - elif percents >= 60: - return "Little high" - elif percents >= 40: - return "Okay" - elif percents >= 20: - return "Low" - elif percents >= 10: - return "Very low" - else: - return "Extremely low" - - -def get_confidence(geolocator, data, gmaps_radius): - tmprinter = TMPrinter() - radius = gmaps_radius - - locations = {} - tmprinter.out(f"Calculation of the distance of each review...") - for nb, review in enumerate(data): - hash = hashlib.md5(str(review).encode()).hexdigest() - if hash not in locations: - locations[hash] = {"dates": [], "locations": [], "range": None, "score": 0} - location = review["location"] - for review2 in data: - location2 = review2["location"] - dis = distance.distance(location, location2).km - - if dis <= radius: - locations[hash]["dates"].append(review2["date"]) - locations[hash]["locations"].append(review2["location"]) - - maxdate = max(locations[hash]["dates"]) - mindate = min(locations[hash]["dates"]) - locations[hash]["range"] = maxdate - mindate - tmprinter.out(f"Calculation of the distance of each review ({nb}/{len(data)})...") - - tmprinter.out("") - - locations = {k: v for k, v in - sorted(locations.items(), key=lambda k: len(k[1]["locations"]), reverse=True)} # We sort it - - tmprinter.out("Identification of redundant areas...") - to_del = [] - for 
hash in locations: - if hash in to_del: - continue - for hash2 in locations: - if hash2 in to_del or hash == hash2: - continue - if all([loc in locations[hash]["locations"] for loc in locations[hash2]["locations"]]): - to_del.append(hash2) - for hash in to_del: - del locations[hash] - - tmprinter.out("Calculating confidence...") - maxrange = max([locations[hash]["range"] for hash in locations]) - maxlen = max([len(locations[hash]["locations"]) for hash in locations]) - minreq = 3 - mingroups = 3 - - score_steps = 4 - for hash, loc in locations.items(): - if len(loc["locations"]) == maxlen: - locations[hash]["score"] += score_steps * 4 - if loc["range"] == maxrange: - locations[hash]["score"] += score_steps * 3 - if len(locations) >= mingroups: - others = sum([len(locations[h]["locations"]) for h in locations if h != hash]) - if len(loc["locations"]) > others: - locations[hash]["score"] += score_steps * 2 - if len(loc["locations"]) >= minreq: - locations[hash]["score"] += score_steps - - # for hash,loc in locations.items(): - # print(f"{hash} => {len(loc['locations'])} ({int(loc['score'])/40*100})") - - panels = sorted(set([loc["score"] for loc in locations.values()]), reverse=True) - - maxscore = sum([p * score_steps for p in range(1, score_steps + 1)]) - for panel in panels: - locs = [loc for loc in locations.values() if loc["score"] == panel] - if len(locs[0]["locations"]) == 1: - panel /= 2 - if len(data) < 4: - panel /= 2 - confidence = translate_confidence(panel / maxscore * 100) - for nb, loc in enumerate(locs): - avg = avg_location(loc["locations"]) - #import pdb; pdb.set_trace() - while True: - try: - location = geolocator.reverse(f"{avg[0]}, {avg[1]}", timeout=10).raw["address"] - break - except: - pass - location = sanitize_location(location) - locs[nb]["avg"] = location - del locs[nb]["locations"] - del locs[nb]["score"] - del locs[nb]["range"] - del locs[nb]["dates"] - tmprinter.out("") - return confidence, locs diff --git a/lib/metadata.py 
b/lib/metadata.py deleted file mode 100644 index 0036c853..00000000 --- a/lib/metadata.py +++ /dev/null @@ -1,188 +0,0 @@ -from datetime import datetime - -from PIL import ExifTags -from PIL.ExifTags import TAGS, GPSTAGS -from geopy.geocoders import Nominatim - -from lib.utils import * - - -class ExifEater(): - - def __init__(self): - self.devices = {} - self.softwares = {} - self.locations = {} - self.geolocator = Nominatim(user_agent="nominatim") - - def get_GPS(self, img): - location = "" - geoaxis = {} - geotags = {} - try: - exif = img._getexif() - - for (idx, tag) in TAGS.items(): - if tag == 'GPSInfo': - if idx in exif: - for (key, val) in GPSTAGS.items(): - if key in exif[idx]: - geotags[val] = exif[idx][key] - - for axis in ["Latitude", "Longitude"]: - dms = geotags[f'GPS{axis}'] - ref = geotags[f'GPS{axis}Ref'] - - degrees = dms[0][0] / dms[0][1] - minutes = dms[1][0] / dms[1][1] / 60.0 - seconds = dms[2][0] / dms[2][1] / 3600.0 - - if ref in ['S', 'W']: - degrees = -degrees - minutes = -minutes - seconds = -seconds - - geoaxis[axis] = round(degrees + minutes + seconds, 5) - location = \ - self.geolocator.reverse("{}, {}".format(geoaxis["Latitude"], geoaxis["Longitude"])).raw[ - "address"] - except Exception: - return "" - else: - if location: - location = sanitize_location(location) - if not location: - return "" - return f'{location["town"]}, {location["country"]}' - else: - return "" - - def feed(self, img): - try: - img._getexif() - except: - try: - img._getexif = img.getexif - except: - img._getexif = lambda d={}:d - if img._getexif(): - location = self.get_GPS(img) - exif = {ExifTags.TAGS[k]: v for k, v in img._getexif().items() if k in ExifTags.TAGS} - interesting_fields = ["Make", "Model", "DateTime", "Software"] - metadata = {k: v for k, v in exif.items() if k in interesting_fields} - try: - date = datetime.strptime(metadata["DateTime"], '%Y:%m:%d %H:%M:%S') - is_date_valid = "Valid" - except Exception: - date = None - is_date_valid = "Invalid" - 
- if location: - if location not in self.locations: - self.locations[location] = {"Valid": [], "Invalid": []} - self.locations[location][is_date_valid].append(date) - if "Make" in metadata and "Model" in metadata: - if metadata["Model"] not in self.devices: - self.devices[metadata["Model"]] = {"Make": metadata["Make"], - "History": {"Valid": [], "Invalid": []}, "Firmwares": {}} - self.devices[metadata["Model"]]["History"][is_date_valid].append(date) - if "Software" in metadata: - if metadata["Software"] not in self.devices[metadata["Model"]]["Firmwares"]: - self.devices[metadata["Model"]]["Firmwares"][metadata["Software"]] = {"Valid": [], - "Invalid": []} - self.devices[metadata["Model"]]["Firmwares"][metadata["Software"]][is_date_valid].append(date) - elif "Software" in metadata: - if metadata["Software"] not in self.softwares: - self.softwares[metadata["Software"]] = {"Valid": [], "Invalid": []} - self.softwares[metadata["Software"]][is_date_valid].append(date) - - def give_back(self): - return self.locations, self.devices - - def output(self): - bkn = '\n' # to use in f-strings - - def picx(n): - return "s" if n > 1 else "" - - def print_dates(dates_list): - dates = {} - dates["max"] = max(dates_list).strftime("%Y/%m/%d") - dates["min"] = min(dates_list).strftime("%Y/%m/%d") - if dates["max"] == dates["min"]: - return dates["max"] - else: - return f'{dates["min"]} -> {dates["max"]}' - - # pprint((self.devices, self.softwares, self.locations)) - - devices = self.devices - if devices: - print(f"[+] {len(devices)} device{picx(len(devices))} found !") - for model, data in devices.items(): - make = data["Make"] - if model.lower().startswith(make.lower()): - model = model[len(make):].strip() - n = len(data["History"]["Valid"] + data["History"]["Invalid"]) - for validity, dateslist in data["History"].items(): - if dateslist and ( - (validity == "Valid") or (validity == "Invalid" and not data["History"]["Valid"])): - if validity == "Valid": - dates = 
print_dates(data["History"]["Valid"]) - elif validity == "Valid" and data["History"]["Invalid"]: - dates = print_dates(data["History"]["Valid"]) - dates += " (+ ?)" - elif validity == "Invalid" and not data["History"]["Valid"]: - dates = "?" - print( - f"{bkn if data['Firmwares'] else ''}- {make.capitalize()} {model} ({n} pic{picx(n)}) [{dates}]") - if data["Firmwares"]: - n = len(data['Firmwares']) - print(f"-> {n} Firmware{picx(n)} found !") - for firmware, firmdata in data["Firmwares"].items(): - for validity2, dateslist2 in firmdata.items(): - if dateslist2 and ((validity2 == "Valid") or ( - validity2 == "Invalid" and not firmdata["Valid"])): - if validity2 == "Valid": - dates2 = print_dates(firmdata["Valid"]) - elif validity2 == "Valid" and firmdata["Invalid"]: - dates2 = print_dates(firmdata["Valid"]) - dates2 += " (+ ?)" - elif validity2 == "Invalid" and not firmdata["Valid"]: - dates2 = "?" - print(f"--> {firmware} [{dates2}]") - - locations = self.locations - if locations: - print(f"\n[+] {len(locations)} location{picx(len(locations))} found !") - for location, data in locations.items(): - n = len(data["Valid"] + data["Invalid"]) - for validity, dateslist in data.items(): - if dateslist and ((validity == "Valid") or (validity == "Invalid" and not data["Valid"])): - if validity == "Valid": - dates = print_dates(data["Valid"]) - elif validity == "Valid" and data["Invalid"]: - dates = print_dates(data["Valid"]) - dates += " (+ ?)" - elif validity == "Invalid" and not data["Valid"]: - dates = "?" 
- print(f"- {location} ({n} pic{picx(n)}) [{dates}]") - - softwares = self.softwares - if softwares: - print(f"\n[+] {len(softwares)} software{picx(len(softwares))} found !") - for software, data in softwares.items(): - n = len(data["Valid"] + data["Invalid"]) - for validity, dateslist in data.items(): - if dateslist and ((validity == "Valid") or (validity == "Invalid" and not data["Valid"])): - if validity == "Valid": - dates = print_dates(data["Valid"]) - elif validity == "Valid" and data["Invalid"]: - dates = print_dates(data["Valid"]) - dates += " (+ ?)" - elif validity == "Invalid" and not data["Valid"]: - dates = "?" - print(f"- {software} ({n} pic{picx(n)}) [{dates}]") - - if not devices and not locations and not softwares: - print("=> Nothing found") diff --git a/lib/os_detect.py b/lib/os_detect.py deleted file mode 100644 index 3091caf0..00000000 --- a/lib/os_detect.py +++ /dev/null @@ -1,46 +0,0 @@ -from platform import system, uname - - -class Os: - """ - returns class with properties: - .cygwin Cygwin detected - .wsl Windows Subsystem for Linux (WSL) detected - .mac Mac OS detected - .linux Linux detected - .bsd BSD detected - """ - - def __init__(self): - syst = system().lower() - - # initialize - self.cygwin = False - self.wsl = False - self.mac = False - self.linux = False - self.windows = False - self.bsd = False - - if 'cygwin' in syst: - self.cygwin = True - self.os = 'cygwin' - elif 'darwin' in syst: - self.mac = True - self.os = 'mac' - elif 'linux' in syst: - self.linux = True - self.os = 'linux' - if 'Microsoft' in uname().release: - self.wsl = True - self.linux = False - self.os = 'wsl' - elif 'windows' in syst: - self.windows = True - self.os = 'windows' - elif 'bsd' in syst: - self.bsd = True - self.os = 'bsd' - - def __str__(self): - return self.os diff --git a/lib/photos.py b/lib/photos.py deleted file mode 100644 index c6ebd558..00000000 --- a/lib/photos.py +++ /dev/null @@ -1,151 +0,0 @@ -import re -from io import BytesIO -import pdb - 
-from PIL import Image -from selenium.webdriver.common.by import By -from selenium.webdriver.support import expected_conditions as EC -from selenium.webdriver.support.ui import WebDriverWait -from seleniumwire import webdriver -from webdriver_manager.chrome import ChromeDriverManager - -from lib.metadata import ExifEater -from lib.utils import * - - -class element_has_substring_or_substring(object): - def __init__(self, locator, substring1, substring2): - self.locator = locator - self.substring1 = substring1 - self.substring2 = substring2 - - def __call__(self, driver): - element = driver.find_element(*self.locator) # Finding the referenced element - if self.substring1 in element.text: - return self.substring1 - elif self.substring2 in element.text: - return self.substring2 - else: - return False - - -def get_source(gaiaID, client, cookies, headers, is_headless): - baseurl = f"https://get.google.com/albumarchive/{gaiaID}/albums/profile-photos?hl=en" - req = client.get(baseurl) - if req.status_code != 200: - return False - - tmprinter = TMPrinter() - chrome_options = get_chrome_options_args(is_headless) - options = { - 'connection_timeout': None # Never timeout, otherwise it floods errors - } - - tmprinter.out("Starting browser...") - - driverpath = get_driverpath() - driver = webdriver.Chrome(executable_path=driverpath, seleniumwire_options=options, options=chrome_options) - driver.header_overrides = headers - wait = WebDriverWait(driver, 30) - - tmprinter.out("Setting cookies...") - driver.get("https://get.google.com/robots.txt") - for k, v in cookies.items(): - driver.add_cookie({'name': k, 'value': v}) - - tmprinter.out('Fetching Google Photos "Profile photos" album...') - driver.get(baseurl) - - tmprinter.out('Fetching the Google Photos albums overview...') - buttons = driver.find_elements(By.XPATH, "//button") - for button in buttons: - text = button.get_attribute('jsaction') - if text and 'touchcancel' in text: - button.click() - break - else: - 
tmprinter.out("") - print("Can't get the back button..") - driver.close() - return False - - wait.until(EC.text_to_be_present_in_element((By.XPATH, "//body"), "Album Archive")) - tmprinter.out("Got the albums overview !") - no_photos_trigger = "reached the end" - photos_trigger = " item" - body = driver.find_element(By.XPATH, "//body").text - if no_photos_trigger in body: - stats = "notfound" - elif photos_trigger in body: - stats = "found" - else: - try: - result = wait.until(element_has_substring_or_substring((By.XPATH, "//body"), no_photos_trigger, photos_trigger)) - except Exception: - tmprinter.out("[-] Timeout while fetching photos.") - return False - else: - if result == no_photos_trigger: - stats = "notfound" - elif result == photos_trigger: - stats = "found" - else: - return False - tmprinter.out("") - source = driver.page_source - driver.close() - - return {"stats": stats, "source": source} - - -def gpics(gaiaID, client, cookies, headers, regex_albums, regex_photos, headless=True): - baseurl = "https://get.google.com/albumarchive/" - - print(f"\nGoogle Photos : {baseurl + gaiaID + '/albums/profile-photos'}") - out = get_source(gaiaID, client, cookies, headers, headless) - - if not out: - print("=> Couldn't fetch the public photos.") - return False - if out["stats"] == "notfound": - print("=> No album") - return False - - # open('debug.html', 'w').write(repr(out["source"])) - results = re.compile(regex_albums).findall(out["source"]) - - list_albums_length = len(results) - - if results: - exifeater = ExifEater() - pics = [] - for album in results: - album_name = album[1] - album_link = baseurl + gaiaID + "/album/" + album[0] - album_length = int(album[2]) - - if album_length >= 1: - try: - req = client.get(album_link) - source = req.text.replace('\n', '') - results_pics = re.compile(regex_photos).findall(source) - for pic in results_pics: - pic_name = pic[1] - pic_link = pic[0] - pics.append(pic_link) - except: - pass - - print(f"=> {list_albums_length} 
def search(query, data_path, gdocs_public_doc, size=1000):
    """Search *query* through the Google Docs "explore" endpoint.

    Reads the stored GDoc token and cookies from *data_path*, posts the
    search request against the public document *gdocs_public_doc*, and
    returns a list of {"title", "desc", "link"} dicts.
    Exits the process on XSRF detection or persistent 500s.
    """
    with open(data_path, 'r') as f:
        out = json.loads(f.read())
    token = out["keys"]["gdoc"]
    cookies = out["cookies"]

    # NOTE(review): `query` is spliced raw into a JSON-ish payload; a query
    # containing double quotes would corrupt the request body.
    data = {"request": '["documentsuggest.search.search_request","{}",[{}],null,1]'.format(query, size)}

    retries = 10
    time_to_wait = 5
    for retry in list(range(retries))[::-1]:
        req = httpx.post('https://docs.google.com/document/d/{}/explore/search?token={}'.format(gdocs_public_doc, token),
                         cookies=cookies, data=data)
        if req.status_code == 200:
            break
        if req.status_code == 500:
            if retry == 0:
                exit(f"[-] Error (GDocs): request gives {req.status_code}, wait a minute and retry !")
            print(f"[-] GDocs request gives a 500 status code, retrying in {time_to_wait} seconds...")
            # BUG FIX: the message promised a pause but `time_to_wait` was
            # never used — the loop hammered the endpoint without sleeping.
            sleep(time_to_wait)
            continue

    output = json.loads(req.text.replace(")]}'", ""))
    if isinstance(output[0][1], str) and output[0][1].lower() == "xsrf":
        exit(f"\n[-] Error : XSRF detected.\nIt means your cookies have expired, please generate new ones.")

    results = []
    for result in output[0][1]:
        link = result[0][0]
        title = result[0][1]
        desc = result[0][2]
        results.append({"title": title, "desc": desc, "link": link})

    return results
class TMPrinter():
    """Single-line terminal "ticker": each out() call rewrites the same
    line in place, padding with spaces so a longer previous message is
    fully erased."""

    def __init__(self):
        self.max_len = 0

    def out(self, text):
        self.max_len = max(self.max_len, len(text))
        print(text.ljust(self.max_len), end='\r')

    def clear(self):
        print(" " * self.max_len, end="\r")


def within_docker():
    """True when running inside a Docker container (dockerenv sentinel)."""
    return Path('/.dockerenv').is_file()


class Picture:
    """A profile/cover picture URL plus whether it is Google's default."""

    def __init__(self, url, is_default=False):
        self.url = url
        self.is_default = is_default


class Contact:
    """A contact point (email / phone). Primary-ness is stored inverted,
    as `is_secondary`, because that is what display code checks."""

    def __init__(self, val, is_primary=True):
        self.value = val
        self.is_secondary = not is_primary

    def is_normalized(self, val):
        # Dots and letter case are not significant in Gmail addresses.
        def norm(s):
            return s.replace('.', '').lower()
        return norm(val) == norm(self.value)

    def __str__(self):
        if self.is_secondary:
            return self.value + ' (secondary)'
        return self.value


def update_emails(emails, data):
    """Merge the "email" entries of a people-API payload into the
    *emails* dict ({address: Contact}) and return it.

    The canonical user email may be absent from the list-method
    response, so existing entries are upgraded to primary when the
    payload says so, never downgraded.
    """
    for entry in data.get("email", []):
        primary = entry.get("signupEmailMetadata", {}).get("primary")
        contact = Contact(entry["value"], primary)

        if contact.value in emails:
            if primary:
                emails[contact.value].is_secondary = False
        else:
            emails[contact.value] = contact

    return emails


def is_email_google_account(httpx_client, auth, cookies, email, hangouts_token):
    """Ask the Hangouts people lookup endpoint whether *email* belongs to
    a Google account. Returns the raw response dict on success; exits the
    process on expired credentials, API errors, or no match."""
    host = "https://people-pa.clients6.google.com"
    url = "/v2/people/lookup?key={}".format(hangouts_token)
    body = """id={}&type=EMAIL&matchType=EXACT&extensionSet.extensionNames=HANGOUTS_ADDITIONAL_DATA&extensionSet.extensionNames=HANGOUTS_OFF_NETWORK_GAIA_LOOKUP&extensionSet.extensionNames=HANGOUTS_PHONE_DATA&coreIdParams.useRealtimeNotificationExpandedAcls=true&requestMask.includeField.paths=person.email&requestMask.includeField.paths=person.gender&requestMask.includeField.paths=person.in_app_reachability&requestMask.includeField.paths=person.metadata&requestMask.includeField.paths=person.name&requestMask.includeField.paths=person.phone&requestMask.includeField.paths=person.photo&requestMask.includeField.paths=person.read_only_profile_info&requestMask.includeContainer=AFFINITY&requestMask.includeContainer=PROFILE&requestMask.includeContainer=DOMAIN_PROFILE&requestMask.includeContainer=ACCOUNT&requestMask.includeContainer=EXTERNAL_ACCOUNT&requestMask.includeContainer=CIRCLE&requestMask.includeContainer=DOMAIN_CONTACT&requestMask.includeContainer=DEVICE_CONTACT&requestMask.includeContainer=GOOGLE_GROUP&requestMask.includeContainer=CONTACT"""

    headers = {
        "X-HTTP-Method-Override": "GET",
        "Authorization": auth,
        "Content-Type": "application/x-www-form-urlencoded",
        "Origin": "https://hangouts.google.com"
    }

    req = httpx_client.post(host + url, data=body.format(email), headers=headers, cookies=cookies)
    data = json.loads(req.text)
    if "error" in data and "Request had invalid authentication credentials" in data["error"]["message"]:
        exit("[-] Cookies/Tokens seems expired, please verify them.")
    elif "error" in data:
        print("[-] Error :")
        pprint(data)
        exit()
    elif not "matches" in data:
        exit("[-] This email address does not belong to a Google Account.")

    return data
def get_account_data(httpx_client, gaiaID, internal_auth, internal_token, config):
    """Fetch a Google account's public profile via the people-pa API
    (Drive-origin bypass). Returns a dict of profile fields, False when
    the Gaia ID is unknown; exits on credential errors."""
    # Bypass method: present ourselves as Drive to the people API.
    req_headers = {
        "Origin": "https://drive.google.com",
        "authorization": internal_auth,
        "Host": "people-pa.clients6.google.com"
    }
    headers = {**config.headers, **req_headers}

    url = f"https://people-pa.clients6.google.com/v2/people?person_id={gaiaID}&request_mask.include_container=PROFILE&request_mask.include_container=DOMAIN_PROFILE&request_mask.include_field.paths=person.metadata.best_display_name&request_mask.include_field.paths=person.photo&request_mask.include_field.paths=person.cover_photo&request_mask.include_field.paths=person.email&request_mask.include_field.paths=person.organization&request_mask.include_field.paths=person.location&request_mask.include_field.paths=person.email&requestMask.includeField.paths=person.phone&core_id_params.enable_private_names=true&requestMask.includeField.paths=person.read_only_profile_info&key={internal_token}"
    req = httpx_client.get(url, headers=headers)
    data = json.loads(req.text)

    if "error" in data and "Request had invalid authentication credentials" in data["error"]["message"]:
        exit("[-] Cookies/Tokens seems expired, please verify them.")
    elif "error" in data:
        print("[-] Error :")
        pprint(data)
        exit()
    if data["personResponse"][0]["status"].lower() == "not_found":
        return False

    name = get_account_name(httpx_client, gaiaID, data, internal_auth, internal_token, config)

    profile_data = data["personResponse"][0]["person"]

    profile_pics = [Picture(p["url"], p.get("isDefault", False))
                    for p in profile_data["photo"]]

    # Cover photo is mostly the default one.
    cover_pics = [Picture(p["imageUrl"], p["isDefault"])
                  for p in profile_data["coverPhoto"]]

    emails = update_emails({}, profile_data)

    # Phones / locations / organizations are absent when the user did not
    # enter them or hides them.
    phones = [f'{p["value"]} ({p["type"]})' for p in profile_data.get("phone", [])]

    locations = []
    for l in profile_data.get("location", []):
        locations.append(l["value"] if not l.get("current") else f'{l["value"]} (current)')

    organizations = []
    if "organization" in profile_data:
        organizations = (f'{o["name"]} ({o["type"]})' for o in profile_data["organization"])

    return {"name": name, "profile_pics": profile_pics, "cover_pics": cover_pics,
            "organizations": ', '.join(organizations), "locations": ', '.join(locations),
            "emails_set": emails, "phones": ', '.join(phones)}


def get_account_name(httpx_client, gaiaID, data, internal_auth, internal_token, config):
    """Return the account's display name, trying the people payload first
    and falling back to scraping the Google Maps contributor page."""
    try:
        name = data["personResponse"][0]["person"]["metadata"]["bestDisplayName"]["displayName"]
    except KeyError:
        pass  # We fallback on the classic method
    else:
        return name

    # Classic method, but requires the target to have at least 1 GMaps contribution.
    req = httpx_client.get(f"https://www.google.com/maps/contrib/{gaiaID}")
    gmaps_source = req.text
    # NOTE(review): this pattern looks truncated in the source — r'' matches
    # an empty string and defines no group 1, so match[1] would raise
    # IndexError. Recover the original pattern before relying on this path.
    match = re.search(r'', gmaps_source)
    if not match:
        return None
    return match[1]


def image_hash(img):
    """Perceptual average-hash of a PIL image (used to compare avatars)."""
    return imagehash.average_hash(img)


def detect_default_profile_pic(flathash):
    """True when *flathash* is within distance 10 of the known hash of
    Google's default avatar."""
    return flathash - imagehash.hex_to_flathash("000018183c3c0000", 8) < 10


def sanitize_location(location):
    """Normalize a Nominatim address dict so it always carries "town" and
    "country" keys; returns False when neither a town-like field nor a
    country is present."""
    town = "?"
    country = "?"
    not_town = False
    not_country = False

    # First town-like field wins, in this precedence order.
    for key in ("city", "village", "town", "municipality"):
        if key in location:
            town = location[key]
            break
    else:
        not_town = True

    if "country" not in location:
        not_country = True
        location["country"] = country
    if not_country and not_town:
        return False
    location["town"] = town
    return location
def _install_chromedriver():
    """Download/install chromedriver via webdriver-manager; inside Docker
    the install path is pinned to the app directory."""
    if within_docker():
        return ChromeDriverManager(path="/usr/src/app").install()
    return ChromeDriverManager().install()


def get_driverpath():
    """Return a usable chromedriver path.

    Preference order: a chromedriver already on PATH, then a cached
    driver matching the installed browser, then a cached driver of any
    version (updated), then a fresh download.
    """
    driver_path = shutil.which("chromedriver")
    if driver_path:
        return driver_path

    # Silent manager used only to probe the local driver cache.
    kwargs = {"print_first_line": False, "log_level": 0}
    if within_docker():
        kwargs["path"] = "/usr/src/app"
    manager = ChromeDriverManager(**kwargs)

    driver = manager.driver
    driverpath_with_version = manager.driver_cache.find_driver(
        driver.browser_version, driver.get_name(), driver.get_os_type(), driver.get_version())
    if driverpath_with_version:
        return driverpath_with_version

    driverpath_without_version = manager.driver_cache.find_driver(
        "", driver.get_name(), driver.get_os_type(), "")
    # REFACTOR: both branches below previously duplicated the same
    # Docker/non-Docker install logic; consolidated into _install_chromedriver.
    if driverpath_without_version:
        print("[Webdrivers Manager] I'm updating the chromedriver...")
        driver_path = _install_chromedriver()
        print("[Webdrivers Manager] The chromedriver has been updated !\n")
    else:
        print("[Webdrivers Manager] I can't find the chromedriver, so I'm downloading and installing it for you...")
        driver_path = _install_chromedriver()
        print("[Webdrivers Manager] The chromedriver has been installed !\n")
    return driver_path


def get_chrome_options_args(is_headless):
    """Build the Chrome options used by every selenium session (quiet
    logging, sandbox disabled for container use, optional headless)."""
    chrome_options = Options()
    chrome_options.add_argument('--log-level=3')
    chrome_options.add_experimental_option('excludeSwitches', ['enable-logging'])
    chrome_options.add_argument("--no-sandbox")
    if is_headless:
        chrome_options.add_argument("--headless")
    # Hoisted: Os() was instantiated twice for the same check.
    detected_os = Os()
    if (detected_os.wsl or detected_os.windows) and is_headless:
        # Headless Chrome on Windows/WSL needs GPU disabled to start.
        chrome_options.add_argument("--disable-gpu")
    chrome_options.add_argument("--disable-dev-shm-usage")
    chrome_options.add_argument("--disable-setuid-sandbox")
    chrome_options.add_argument("--no-first-run")
    chrome_options.add_argument("--no-zygote")
    chrome_options.add_argument("--single-process")
    chrome_options.add_argument("--disable-features=VizDisplayCompositor")
    return chrome_options


def inject_osid(cookies, service, config):
    """Add the stored per-service OSID cookie to *cookies* and return it."""
    with open(config.data_path, 'r') as f:
        out = json.loads(f.read())

    cookies["OSID"] = out["osids"][service]
    return cookies
def get_channel_data(client, channel_url):
    """Scrape a YouTube channel's "About" page into a dict of public
    details (name, description, urls, views, join date, links, country),
    or False when the page cannot be parsed."""
    data = None

    retries = 2
    for retry in list(range(retries))[::-1]:
        req = client.get(f"{channel_url}/about")
        source = req.text
        try:
            data = json.loads(source.split('var ytInitialData = ')[1].split(';')[0])
        except (KeyError, IndexError):
            if retry == 0:
                return False
            continue
        else:
            break

    handle = data["metadata"]["channelMetadataRenderer"]["vanityChannelUrl"].split("/")[-1]
    tabs = [x[list(x.keys())[0]] for x in data["contents"]["twoColumnBrowseResultsRenderer"]["tabs"]]
    about_tab = [x for x in tabs if x["title"].lower() == "about"][0]
    channel_details = about_tab["content"]["sectionListRenderer"]["contents"][0]["itemSectionRenderer"]["contents"][0]["channelAboutFullMetadataRenderer"]

    out = {
        "name": data["metadata"]["channelMetadataRenderer"]["title"],
        "description": None,
        "channel_urls": [],
        "email_contact": "businessEmailLabel" in channel_details,
        "views": None,
        "joined_date": None,
        "primary_links": [],
        "country": None
    }

    out["channel_urls"].append(data["metadata"]["channelMetadataRenderer"]["channelUrl"])
    out["channel_urls"].append(f"https://www.youtube.com/c/{handle}")
    out["channel_urls"].append(f"https://www.youtube.com/user/{handle}")

    # Each field below is optional in the renderer payload.
    out["description"] = channel_details["description"]["simpleText"] if "description" in channel_details else None
    out["views"] = channel_details["viewCountText"]["simpleText"].split(" ")[0] if "viewCountText" in channel_details else None
    out["joined_date"] = channel_details["joinedDateText"]["runs"][1]["text"] if "joinedDateText" in channel_details else None
    out["country"] = channel_details["country"]["simpleText"] if "country" in channel_details else None

    if "primaryLinks" in channel_details:
        for primary_link in channel_details["primaryLinks"]:
            title = primary_link["title"]["simpleText"]
            url = parse_url(primary_link["navigationEndpoint"]["urlEndpoint"]["url"].split("&q=")[-1])
            out["primary_links"].append({"title": title, "url": url})

    return out


def youtube_channel_search(client, query):
    """Search YouTube itself for channels named like *query*; returns up
    to 10 {"profile_url", "name", "hash"} candidates plus the raw result
    count, or False on parsing failure."""
    try:
        link = "https://www.youtube.com/results?search_query={}&sp=EgIQAg%253D%253D"
        req = client.get(link.format(urllib.parse.quote(query)))
        source = req.text
        data = json.loads(
            source.split('window["ytInitialData"] = ')[1].split('window["ytInitialPlayerResponse"]')[0].split(';\n')[0])
        channels = \
            data["contents"]["twoColumnSearchResultsRenderer"]["primaryContents"]["sectionListRenderer"]["contents"][0][
                "itemSectionRenderer"]["contents"]
        results = {"channels": [], "length": len(channels)}
        for channel in channels:
            if len(results["channels"]) >= 10:
                break
            title = channel["channelRenderer"]["title"]["simpleText"]
            if not query.lower() in title.lower():
                continue
            avatar_link = channel["channelRenderer"]["thumbnail"]["thumbnails"][0]["url"].split('=')[0]
            if avatar_link[:2] == "//":
                avatar_link = "https:" + avatar_link
            profile_url = "https://youtube.com" + channel["channelRenderer"]["navigationEndpoint"]["browseEndpoint"][
                "canonicalBaseUrl"]
            req = client.get(avatar_link)
            img = Image.open(BytesIO(req.content))
            hash = str(image_hash(img))
            results["channels"].append({"profile_url": profile_url, "name": title, "hash": hash})
        return results
    except (KeyError, IndexError):
        return False


def youtube_channel_search_gdocs(client, query, data_path, gdocs_public_doc):
    """Find channel candidates via a Google search (through the GDocs
    explore endpoint) restricted to site:youtube.com/channel. Same output
    shape as youtube_channel_search; False when nothing is found."""
    search_query = f"site:youtube.com/channel \\\"{query}\\\""
    search_results = gdoc_search(search_query, data_path, gdocs_public_doc)
    channels = []

    for result in search_results:
        # Keep only the scheme-less "channel/<id>" path, deduplicated.
        sanitized = "https://youtube.com/" + ('/'.join(result["link"].split('/')[3:5]).split("?")[0])
        if sanitized not in channels:
            channels.append(sanitized)

    if not channels:
        return False

    # "length" deliberately reflects the pre-truncation count (used by
    # the confidence scoring), while only 5 candidates are fetched.
    results = {"channels": [], "length": len(channels)}
    channels = channels[:5]

    for profile_url in channels:
        data = None
        avatar_link = None

        retries = 2
        for retry in list(range(retries))[::-1]:
            req = client.get(profile_url, follow_redirects=True)
            source = req.text
            try:
                data = json.loads(source.split('var ytInitialData = ')[1].split(';')[0])
                avatar_link = data["metadata"]["channelMetadataRenderer"]["avatar"]["thumbnails"][0]["url"].split('=')[0]
            except (KeyError, IndexError):
                if retry == 0:
                    return False
                continue
            else:
                break
        req = client.get(avatar_link)
        img = Image.open(BytesIO(req.content))
        hash = str(image_hash(img))
        title = data["metadata"]["channelMetadataRenderer"]["title"]
        results["channels"].append({"profile_url": profile_url, "name": title, "hash": hash})
    return results


def get_channels(client, query, data_path, gdocs_public_doc):
    """Run both channel searches and tag each result set with its origin;
    returns a list of result sets, or False when both came up empty."""
    from_youtube = youtube_channel_search(client, query)
    from_gdocs = youtube_channel_search_gdocs(client, query, data_path, gdocs_public_doc)
    to_process = []
    if from_youtube:
        from_youtube["origin"] = "youtube"
        to_process.append(from_youtube)
    if from_gdocs:
        from_gdocs["origin"] = "gdocs"
        to_process.append(from_gdocs)
    if not to_process:
        return False
    return to_process


def get_confidence(data, query, hash):
    """Score every candidate channel against the target's name and avatar
    hash and return (confidence_percentage, best_scoring_channels)."""
    score_steps = 4

    for source_nb, source in enumerate(data):
        for channel_nb, channel in enumerate(source["channels"]):
            score = 0
            if hash == imagehash.hex_to_flathash(channel["hash"], 8):
                score += score_steps * 4
            if query == channel["name"]:
                score += score_steps * 3
            if query in channel["name"]:
                score += score_steps * 2
            if ((source["origin"] == "youtube" and source["length"] <= 5) or
                    (source["origin"] == "google" and source["length"] <= 4)):
                score += score_steps
            data[source_nb]["channels"][channel_nb]["score"] = score

    # Keep, for each profile URL, only its highest-scoring occurrence.
    channels = []
    for source in data:
        for channel in source["channels"]:
            found_better = False
            for source2 in data:
                # BUG FIX: this inner loop iterated source["channels"]
                # (the OUTER source again) instead of source2["channels"],
                # so a duplicate found through the other origin was never
                # compared and both copies survived deduplication.
                for channel2 in source2["channels"]:
                    if channel["profile_url"] == channel2["profile_url"]:
                        if channel2["score"] > channel["score"]:
                            found_better = True
                            break
                if found_better:
                    break
            if found_better:
                continue
            else:
                channels.append(channel)

    # Deduplicate structurally-identical dicts, best scores first.
    channels = sorted([json.loads(chan) for chan in set([json.dumps(channel) for channel in channels])],
                      key=lambda k: k['score'], reverse=True)
    panels = sorted(set([c["score"] for c in channels]), reverse=True)
    if not channels or (panels and panels[0] <= 0):
        return 0, []

    maxscore = sum([p * score_steps for p in range(1, score_steps + 1)])
    for panel in panels:
        chans = [c for c in channels if c["score"] == panel]
        if len(chans) > 1:
            # Several equally-likely candidates lower the confidence.
            panel -= 5
        return (panel / maxscore * 100), chans
def extract_usernames(channels):
    """Pull legacy YouTube usernames out of /user/ channel URLs.

    Channels using the modern /channel/<id> form carry no username and
    are skipped.
    """
    usernames = []
    for chan in channels:
        url = chan['profile_url']
        if "/user/" in url:
            usernames.append(url.split("/user/")[1])
    return usernames
def doc_hunt(doc_link):
    """Investigate a public Google Docs/Drive resource : creation and
    edit dates, public vs. personal permissions, and (when exposed) the
    owner's name, email, Gaia ID and profile picture."""
    banner()

    tmprinter = TMPrinter()

    if not doc_link:
        exit("Please give the link to a Google resource.\nExample : https://docs.google.com/spreadsheets/d/1BxiMVs0XRA5nFMdKvBdBZjgmUUqptlbs74OgvE2upms")

    is_within_docker = within_docker()
    if is_within_docker:
        print("[+] Docker detected, profile pictures will not be saved.")

    # Drive file IDs are the 33- or 44-character path segments of the link.
    doc_id = ''.join([x for x in doc_link.split("?")[0].split("/") if len(x) in (33, 44)])
    if doc_id:
        print(f"\nDocument ID : {doc_id}\n")
    else:
        exit("\nDocument ID not found.\nPlease make sure you have something that looks like this in your link :\n1BxiMVs0XRA5nFMdKvBdBZjgmUUqptlbs74OgvE2upms")

    if not isfile(config.data_path):
        exit("Please generate cookies and tokens first, with the check_and_gen.py script.")

    with open(config.data_path, 'r') as f:
        out = json.loads(f.read())
    internal_token = out["keys"]["internal"]
    cookies = out["cookies"]

    headers = {**config.headers, **{"X-Origin": "https://drive.google.com"}}
    client = httpx.Client(cookies=cookies, headers=headers)

    url = f"https://clients6.google.com/drive/v2beta/files/{doc_id}?fields=alternateLink%2CcopyRequiresWriterPermission%2CcreatedDate%2Cdescription%2CdriveId%2CfileSize%2CiconLink%2Cid%2Clabels(starred%2C%20trashed)%2ClastViewedByMeDate%2CmodifiedDate%2Cshared%2CteamDriveId%2CuserPermission(id%2Cname%2CemailAddress%2Cdomain%2Crole%2CadditionalRoles%2CphotoLink%2Ctype%2CwithLink)%2Cpermissions(id%2Cname%2CemailAddress%2Cdomain%2Crole%2CadditionalRoles%2CphotoLink%2Ctype%2CwithLink)%2Cparents(id)%2Ccapabilities(canMoveItemWithinDrive%2CcanMoveItemOutOfDrive%2CcanMoveItemOutOfTeamDrive%2CcanAddChildren%2CcanEdit%2CcanDownload%2CcanComment%2CcanMoveChildrenWithinDrive%2CcanRename%2CcanRemoveChildren%2CcanMoveItemIntoTeamDrive)%2Ckind&supportsTeamDrives=true&enforceSingleParent=true&key={internal_token}"

    retries = 100
    for retry in range(retries):
        req = client.get(url)
        if "File not found" in req.text:
            exit("[-] This file does not exist or is not public")
        elif "rateLimitExceeded" in req.text:
            tmprinter.out(f"[-] Rate-limit detected, retrying... {retry+1}/{retries}")
            continue
        else:
            break
    else:
        tmprinter.clear()
        exit("[-] Rate-limit exceeded. Try again later.")

    if '"reason": "keyInvalid"' in req.text:
        exit("[-] Your key is invalid, try regenerating your cookies & keys.")

    tmprinter.clear()
    data = json.loads(req.text)

    # Dates — strptime literals match case-insensitively, so the trailing
    # 'z' in the format also accepts the API's uppercase 'Z' suffix.
    created_date = datetime.strptime(data["createdDate"], '%Y-%m-%dT%H:%M:%S.%fz')
    modified_date = datetime.strptime(data["modifiedDate"], '%Y-%m-%dT%H:%M:%S.%fz')

    print(f"[+] Creation date : {created_date.strftime('%Y/%m/%d %H:%M:%S')} (UTC)")
    print(f"[+] Last edit date : {modified_date.strftime('%Y/%m/%d %H:%M:%S')} (UTC)")

    # Permissions the requesting account holds on the document.
    user_permissions = []
    if data["userPermission"]:
        if data["userPermission"]["id"] == "me":
            user_permissions.append(data["userPermission"]["role"])
        if "additionalRoles" in data["userPermission"]:
            user_permissions += data["userPermission"]["additionalRoles"]

    public_permissions = []
    owner = None
    for permission in data["permissions"]:
        if permission["id"] in ["anyoneWithLink", "anyone"]:
            public_permissions.append(permission["role"])
            # NOTE(review): this membership test reads data["permissions"]
            # (a list) — it presumably should test `permission`; kept as-is
            # to preserve behavior.
            if "additionalRoles" in data["permissions"]:
                public_permissions += permission["additionalRoles"]
        elif permission["role"] == "owner":
            owner = permission

    print("\nPublic permissions :")
    for permission in public_permissions:
        print(f"- {permission}")

    if public_permissions != user_permissions:
        print("[+] You have special permissions :")
        for permission in user_permissions:
            print(f"- {permission}")

    if owner:
        print("\n[+] Owner found !\n")
        print(f"Name : {owner['name']}")
        print(f"Email : {owner['emailAddress']}")
        print(f"Google ID : {owner['id']}")

        # Profile picture: compare against the default-avatar hash before
        # advertising it as custom.
        profile_pic_link = owner['photoLink']
        req = client.get(profile_pic_link)

        profile_pic_img = Image.open(BytesIO(req.content))
        profile_pic_flathash = image_hash(profile_pic_img)
        is_default_profile_pic = detect_default_profile_pic(profile_pic_flathash)

        if not is_default_profile_pic and not is_within_docker:
            print("\n[+] Custom profile picture !")
            print(f"=> {profile_pic_link}")
            if config.write_profile_pic and not is_within_docker:
                open(Path(config.profile_pics_dir) / f'{owner["emailAddress"]}.jpg', 'wb').write(req.content)
                print("Profile picture saved !\n")
        else:
            print("\n[-] Default profile picture\n")
def email_hunt(email):
    """Full OSINT pass on an email address : Google account data, profile
    pictures, connected services, YouTube channels, Maps reviews and
    public Calendar."""
    banner()

    if not email:
        exit("Please give a valid email.\nExample : larry@google.com")

    if not isfile(config.data_path):
        exit("Please generate cookies and tokens first, with the check_and_gen.py script.")

    with open(config.data_path, 'r') as f:
        out = json.loads(f.read())
    hangouts_auth = out["hangouts_auth"]
    hangouts_token = out["keys"]["hangouts"]
    internal_auth = out["internal_auth"]
    internal_token = out["keys"]["internal"]
    cookies = out["cookies"]

    client = httpx.Client(cookies=cookies, headers=config.headers)

    data = is_email_google_account(client, hangouts_auth, cookies, email,
                                   hangouts_token)

    is_within_docker = within_docker()
    if is_within_docker:
        print("[+] Docker detected, profile pictures will not be saved.")

    geolocator = Nominatim(user_agent="nominatim")
    print(f"[+] {len(data['matches'])} account found !")

    for user in data["matches"]:
        print("\n------------------------------\n")

        gaiaID = user["personId"][0]
        email = user["lookupId"]
        infos = data["people"][gaiaID]

        # Name & profile picture via the people API.
        account = get_account_data(client, gaiaID, internal_auth, internal_token, config)
        name = account["name"]

        if name:
            print(f"Name : {name}")
        else:
            # Fallback: scan the raw lookup payload for a display name.
            if "name" not in infos:
                print("[-] Couldn't find name")
            else:
                for i in range(len(infos["name"])):
                    if 'displayName' in infos['name'][i].keys():
                        name = infos["name"][i]["displayName"]
                        print(f"Name : {name}")

        organizations = account["organizations"]
        if organizations:
            print(f"Organizations : {organizations}")

        locations = account["locations"]
        if locations:
            print(f"Locations : {locations}")

        # BUG FIX: profile_pic_flathash was only assigned inside the
        # `if profile_pic_url:` branch but is read unconditionally below
        # (ytb.get_confidence) — accounts without a reachable profile
        # picture crashed with NameError.
        profile_pic_flathash = None

        profile_pic_url = account.get("profile_pics") and account["profile_pics"][0].url
        if profile_pic_url:
            req = client.get(profile_pic_url)

            profile_pic_img = Image.open(BytesIO(req.content))
            profile_pic_flathash = image_hash(profile_pic_img)
            is_default_profile_pic = detect_default_profile_pic(profile_pic_flathash)

            if not is_default_profile_pic:
                print("\n[+] Custom profile picture !")
                print(f"=> {profile_pic_url}")
                if config.write_profile_pic and not is_within_docker:
                    open(Path(config.profile_pics_dir) / f'{email}.jpg', 'wb').write(req.content)
                    print("Profile picture saved !")
            else:
                print("\n[-] Default profile picture")

        # Cover picture (mostly the default one).
        cover_pic = account.get("cover_pics") and account["cover_pics"][0]
        if cover_pic and not cover_pic.is_default:
            cover_pic_url = cover_pic.url
            req = client.get(cover_pic_url)

            print("\n[+] Custom profile cover picture !")
            print(f"=> {cover_pic_url}")
            if config.write_profile_pic and not is_within_docker:
                open(Path(config.profile_pics_dir) / f'cover_{email}.jpg', 'wb').write(req.content)
                print("Cover profile picture saved !")

        # Last profile edit (microseconds epoch, truncated to seconds).
        try:
            timestamp = int(infos["metadata"]["lastUpdateTimeMicros"][:-3])
            last_edit = datetime.utcfromtimestamp(timestamp).strftime("%Y/%m/%d %H:%M:%S (UTC)")
            print(f"\nLast profile edit : {last_edit}")
        except KeyError:
            last_edit = None
            print(f"\nLast profile edit : Not found")

        # When exactly one normalized-equal email is known, surface the
        # canonical address instead of listing contacts.
        canonical_email = ""
        emails = update_emails(account["emails_set"], infos)
        if emails and len(list(emails)) == 1:
            if list(emails.values())[0].is_normalized(email):
                new_email = list(emails.keys())[0]
                if email != new_email:
                    canonical_email = f' (canonical email is {new_email})'
                emails = []

        print(f"\nEmail : {email}{canonical_email}\nGaia ID : {gaiaID}\n")

        if emails:
            print(f"Contact emails : {', '.join(map(str, emails.values()))}")

        phones = account["phones"]
        if phones:
            print(f"Contact phones : {phones}")

        # Hangouts bot flag.
        if "extendedData" in infos:
            isBot = infos["extendedData"]["hangoutsExtendedData"]["isBot"]
            if isBot:
                print("Hangouts Bot : Yes !")
            else:
                print("Hangouts Bot : No")
        else:
            print("Hangouts Bot : Unknown")

        # Decide whether to hunt YouTube, based on activated services.
        ytb_hunt = False
        try:
            services = [x["appType"].lower() if x["appType"].lower() != "babel" else "hangouts" for x in
                        infos["inAppReachability"]]
            if name and (config.ytb_hunt_always or "youtube" in services):
                ytb_hunt = True
            print("\n[+] Activated Google services :")
            print('\n'.join(["- " + x.capitalize() for x in services]))
        except KeyError:
            ytb_hunt = True
            print("\n[-] Unable to fetch connected Google services.")

        # YouTube channel correlation.
        if name and ytb_hunt:
            confidence = None
            data = ytb.get_channels(client, name, config.data_path,
                                    config.gdocs_public_doc)
            if not data:
                print("\n[-] YouTube channel not found.")
            else:
                confidence, channels = ytb.get_confidence(data, name, profile_pic_flathash)

                if confidence:
                    print(f"\n[+] YouTube channel (confidence => {confidence}%) :")
                    for channel in channels:
                        print(f"- [{channel['name']}] {channel['profile_url']}")
                    possible_usernames = ytb.extract_usernames(channels)
                    if possible_usernames:
                        print("\n[+] Possible usernames found :")
                        for username in possible_usernames:
                            print(f"- {username}")
                else:
                    print("\n[-] YouTube channel not found.")

        # Google Maps reviews → probable location.
        reviews = gmaps.scrape(gaiaID, client, cookies, config, config.headers, config.regexs["review_loc_by_id"], config.headless)

        if reviews:
            confidence, locations = gmaps.get_confidence(geolocator, reviews, config.gmaps_radius)
            print(f"\n[+] Probable location (confidence => {confidence}) :")

            loc_names = []
            for loc in locations:
                loc_names.append(
                    f"- {loc['avg']['town']}, {loc['avg']['country']}"
                )

            loc_names = set(loc_names)  # delete duplicates
            for loc in loc_names:
                print(loc)

        # Public Google Calendar.
        calendar_response = gcalendar.fetch(email, client, config)
        if calendar_response:
            print("[+] Public Google Calendar found !")
            events = calendar_response["events"]
            if events:
                gcalendar.out(events)
            else:
                print("=> No recent events found.")
        else:
            print("[-] No public Google Calendar.")
generate cookies and tokens first, with the check_and_gen.py script.") - - internal_auth = "" - internal_token = "" - - cookies = {} - - with open(config.data_path, 'r') as f: - out = json.loads(f.read()) - internal_auth = out["internal_auth"] - internal_token = out["keys"]["internal"] - cookies = out["cookies"] - - client = httpx.Client(cookies=cookies, headers=config.headers) - - account = get_account_data(client, gaiaID, internal_auth, internal_token, config) - if not account: - exit("[-] No account linked to this Gaia ID.") - - is_within_docker = within_docker() - if is_within_docker: - print("[+] Docker detected, profile pictures will not be saved.") - - geolocator = Nominatim(user_agent="nominatim") - - # get name & other info - name = account["name"] - if name: - print(f"Name : {name}") - - organizations = account["organizations"] - if organizations: - print(f"Organizations : {organizations}") - - locations = account["locations"] - if locations: - print(f"Locations : {locations}") - - # get profile picture - profile_pic_url = account.get("profile_pics") and account["profile_pics"][0].url - if profile_pic_url: - req = client.get(profile_pic_url) - - # TODO: make sure it's necessary now - profile_pic_img = Image.open(BytesIO(req.content)) - profile_pic_flathash = image_hash(profile_pic_img) - is_default_profile_pic = detect_default_profile_pic(profile_pic_flathash) - - if not is_default_profile_pic: - print("\n[+] Custom profile picture !") - print(f"=> {profile_pic_url}") - if config.write_profile_pic and not is_within_docker: - open(Path(config.profile_pics_dir) / f'{gaiaID}.jpg', 'wb').write(req.content) - print("Profile picture saved !") - else: - print("\n[-] Default profile picture") - - # cover profile picture - cover_pic = account.get("cover_pics") and account["cover_pics"][0] - if cover_pic and not cover_pic.is_default: - req = client.get(cover_pic_url) - - print("\n[+] Custom profile cover picture !") - print(f"=> {cover_pic_url}") - if 
config.write_profile_pic and not is_within_docker: - open(Path(config.profile_pics_dir) / f'cover_{email}.jpg', 'wb').write(req.content) - print("Cover profile picture saved !") - - - print(f"\nGaia ID : {gaiaID}") - - emails = account["emails_set"] - if emails: - print(f"Contact emails : {', '.join(map(str, emails.values()))}") - - phones = account["phones"] - if phones: - print(f"Contact phones : {phones}") - - # check YouTube - if name: - confidence = None - data = ytb.get_channels(client, name, config.data_path, - config.gdocs_public_doc) - if not data: - print("\n[-] YouTube channel not found.") - else: - confidence, channels = ytb.get_confidence(data, name, profile_pic_flathash) - - if confidence: - print(f"\n[+] YouTube channel (confidence => {confidence}%) :") - for channel in channels: - print(f"- [{channel['name']}] {channel['profile_url']}") - possible_usernames = ytb.extract_usernames(channels) - if possible_usernames: - print("\n[+] Possible usernames found :") - for username in possible_usernames: - print(f"- {username}") - else: - print("\n[-] YouTube channel not found.") - - # reviews - reviews = gmaps.scrape(gaiaID, client, cookies, config, config.headers, config.regexs["review_loc_by_id"], config.headless) - - if reviews: - confidence, locations = gmaps.get_confidence(geolocator, reviews, config.gmaps_radius) - print(f"\n[+] Probable location (confidence => {confidence}) :") - - loc_names = [] - for loc in locations: - loc_names.append( - f"- {loc['avg']['town']}, {loc['avg']['country']}" - ) - - loc_names = set(loc_names) # delete duplicates - for loc in loc_names: - print(loc) diff --git a/modules/youtube.py b/modules/youtube.py deleted file mode 100644 index 8b64e709..00000000 --- a/modules/youtube.py +++ /dev/null @@ -1,250 +0,0 @@ -#!/usr/bin/env python3 - -import json -import sys -from datetime import datetime -from datetime import date -from io import BytesIO -from os.path import isfile -from pathlib import Path -from pprint import pprint - 
-import httpx -import wayback -from PIL import Image -from bs4 import BeautifulSoup as bs -from geopy.geocoders import Nominatim - -import config -from lib.banner import banner -import lib.gmaps as gmaps -import lib.youtube as ytb -from lib.utils import * - - -def find_gaiaID(body): - """ - We don't use a regex to avoid extracting an other gaiaID - for example if the target had put a secondary Google Plus blog in his channel social links. - """ - - # 1st method ~ 2014 - try: - publisher = body.find("link", {"rel": "publisher"}) - gaiaID = publisher.attrs["href"].split("/")[-1] - except: - pass - else: - if gaiaID: - return gaiaID - - # 2nd method ~ 2015 - try: - author_links = [x.find_next("link") for x in body.find_all("span", {"itemprop": "author"})] - valid_author_link = [x for x in author_links if "plus.google.com/" in x.attrs["href"]][0] - gaiaID = valid_author_link.attrs["href"].split("/")[-1] - except: - pass - else: - if gaiaID: - return gaiaID - - # 3rd method ~ 2019 - try: - data = json.loads(str(body).split('window["ytInitialData"] = ')[1].split('window["ytInitialPlayerResponse"]')[0].strip().strip(";")) - gaiaID = data["metadata"]["channelMetadataRenderer"]["plusPageLink"].split("/")[-1] - except: - pass - else: - if gaiaID: - return gaiaID - -def analyze_snapshots(client, wb_client, channel_url, dates): - body = None - record = None - for record in wb_client.search(channel_url, to_date=dates["to"], from_date=dates["from"]): - try: - req = client.get(record.raw_url) - if req.status_code == 429: - continue # Rate-limit is fucked up and is snapshot-based, we can just take the next snapshot - except Exception as err: - pass - else: - if re.compile(config.regexs["gplus"]).findall(req.text): - body = bs(req.text, 'html.parser') - #print(record) - print(f'[+] Snapshot : {record.timestamp.strftime("%d/%m/%Y")}') - break - else: - return None - - gaiaID = find_gaiaID(body) - return gaiaID - -def check_channel(client, wb_client, channel_url): - # Fast check (no 
doubt that GaiaID is present in this period) - - dates = {"to": date(2019, 12, 31), "from": date(2014, 1, 1)} - gaiaID = analyze_snapshots(client, wb_client, channel_url, dates) - - # Complete check - - if not gaiaID: - dates = {"to": date(2020, 7, 31), "from": date(2013, 6, 3)} - gaiaID = analyze_snapshots(client, wb_client, channel_url, dates) - - return gaiaID - -def launch_checks(client, wb_client, channel_data): - for channel_url in channel_data["channel_urls"]: - gaiaID = check_channel(client, wb_client, channel_url) - if gaiaID: - return gaiaID - - return False - -def youtube_hunt(channel_url): - banner() - - if not channel_url: - exit("Please give a valid channel URL.\nExample : https://www.youtube.com/user/PewDiePie") - - if not isfile(config.data_path): - exit("Please generate cookies and tokens first, with the check_and_gen.py script.") - - internal_auth = "" - internal_token = "" - - cookies = {} - - with open(config.data_path, 'r') as f: - out = json.loads(f.read()) - internal_auth = out["internal_auth"] - internal_token = out["keys"]["internal"] - cookies = out["cookies"] - - if not "PREF" in cookies: - pref_cookies = {"PREF": "tz=Europe.Paris&f6=40000000&hl=en"} # To set the lang in english - cookies = {**cookies, **pref_cookies} - - client = httpx.Client(cookies=cookies, headers=config.headers) - - is_within_docker = within_docker() - if is_within_docker: - print("[+] Docker detected, profile pictures will not be saved.") - - geolocator = Nominatim(user_agent="nominatim") - - print("\n📌 [Youtube channel]") - - channel_data = ytb.get_channel_data(client, channel_url) - if channel_data: - is_channel_existing = True - print(f'[+] Channel name : {channel_data["name"]}\n') - else: - is_channel_existing = False - print("[-] Channel not found.\nSearching for a trace in the archives...\n") - - channel_data = { - "name": None, - "description": None, - "channel_urls": [channel_url], - "email_contact": False, - "views": None, - "joined_date": None, - 
"primary_links": [], - "country": None - } - - wb_client = wayback.WaybackClient() - gaiaID = launch_checks(client, wb_client, channel_data) - if gaiaID: - print(f"[+] GaiaID => {gaiaID}\n") - else: - print("[-] No interesting snapshot found.\n") - - if is_channel_existing: - if channel_data["email_contact"]: - print(f'[+] Email on profile : available !') - else: - print(f'[-] Email on profile : not available.') - if channel_data["country"]: - print(f'[+] Country : {channel_data["country"]}') - print() - if channel_data["description"]: - print(f'🧬 Description : {channel_data["description"]}') - if channel_data["views"]: - print(f'🧬 Total views : {channel_data["views"]}') - if channel_data["joined_date"]: - print(f'🧬 Joined date : {channel_data["joined_date"]}') - - if channel_data["primary_links"]: - print(f'\n[+] Primary links ({len(channel_data["primary_links"])} found)') - for primary_link in channel_data["primary_links"]: - print(f'- {primary_link["title"]} => {primary_link["url"]}') - - - if not gaiaID: - exit() - - print("\n📌 [Google account]") - # get name & profile picture - account = get_account_data(client, gaiaID, internal_auth, internal_token, config) - name = account["name"] - - if name: - print(f"Name : {name}") - - # profile picture - profile_pic_url = account.get("profile_pics") and account["profile_pics"][0].url - req = client.get(profile_pic_url) - - profile_pic_img = Image.open(BytesIO(req.content)) - profile_pic_hash = image_hash(profile_pic_img) - is_default_profile_pic = detect_default_profile_pic(profile_pic_hash) - - if profile_pic_url: - req = client.get(profile_pic_url) - - # TODO: make sure it's necessary now - profile_pic_img = Image.open(BytesIO(req.content)) - profile_pic_flathash = image_hash(profile_pic_img) - is_default_profile_pic = detect_default_profile_pic(profile_pic_flathash) - - if not is_default_profile_pic: - print("\n[+] Custom profile picture !") - print(f"=> {profile_pic_url}") - if config.write_profile_pic and not 
is_within_docker: - open(Path(config.profile_pics_dir) / f'{gaiaID}.jpg', 'wb').write(req.content) - print("Profile picture saved !") - else: - print("\n[-] Default profile picture") - - # cover profile picture - cover_pic = account.get("cover_pics") and account["cover_pics"][0] - if cover_pic and not cover_pic.is_default: - cover_pic_url = cover_pic.url - req = client.get(cover_pic_url) - - print("\n[+] Custom profile cover picture !") - print(f"=> {cover_pic_url}") - if config.write_profile_pic and not is_within_docker: - open(Path(config.profile_pics_dir) / f'cover_{gaiaID}.jpg', 'wb').write(req.content) - print("Cover profile picture saved !") - - # reviews - reviews = gmaps.scrape(gaiaID, client, cookies, config, config.headers, config.regexs["review_loc_by_id"], config.headless) - - if reviews: - confidence, locations = gmaps.get_confidence(geolocator, reviews, config.gmaps_radius) - print(f"\n[+] Probable location (confidence => {confidence}) :") - - loc_names = [] - for loc in locations: - loc_names.append( - f"- {loc['avg']['town']}, {loc['avg']['country']}" - ) - - loc_names = set(loc_names) # delete duplicates - for loc in loc_names: - print(loc) diff --git a/requirements.txt b/requirements.txt index b9802546..c9c54a6a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,14 +1,12 @@ geopy httpx>=0.20.0 -selenium-wire>=4.5.5 -selenium>=4.0.0 imagehash pillow python-dateutil -colorama +rich beautifultable -termcolor -webdriver-manager wayback bs4 packaging +alive-progress +trio