diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 0000000000..3ffde9e6d0 --- /dev/null +++ b/.gitmodules @@ -0,0 +1,3 @@ +[submodule "manager"] + path = manager + url = https://github.com/wzdnzd/proxy-manager.git diff --git a/manager b/manager new file mode 160000 index 0000000000..e3d93e51e7 --- /dev/null +++ b/manager @@ -0,0 +1 @@ +Subproject commit e3d93e51e762d269546e299490ff114031272e0b diff --git a/subscribe/airport.py b/subscribe/airport.py index b2b6d39b92..28182d53e6 100644 --- a/subscribe/airport.py +++ b/subscribe/airport.py @@ -533,14 +533,15 @@ def parse( with open(self.sub, "r", encoding="UTF8") as f: text = f.read() else: - headers = {"User-Agent": "Clash.Meta; Mihomo"} + client = f"{utils.USER_AGENT}; Clash.Meta; Mihomo; Shadowrocket;" + headers = {"User-Agent": client} trace = os.environ.get("TRACE_ENABLE", "false").lower() in ["true", "1"] text = utils.http_get( url=self.sub, headers=headers, retry=retry, - timeout=30, + timeout=120, trace=trace, interval=1, max_size=15 * 1024 * 1024, @@ -699,7 +700,7 @@ def parse( @staticmethod def check_protocol(link: str) -> bool: return re.match( - r"^(vmess|trojan|ss|ssr|vless|hysteria|hysteria2|tuic|snell|anytls)://[a-zA-Z0-9:.?+=@%&#_\-/]{10,}", + r"^(vmess|trojan|ss|ssr|vless|hysteria|hysteria2|tuic|snell|anytls|socks5|https?)://[a-zA-Z0-9:.?+=@%&#_\-/]{10,}", utils.trim(link).replace("\r", ""), flags=re.I, ) @@ -754,7 +755,7 @@ def clean_text(document: str) -> str: not is_b64encode and not is_json and not is_yaml - and all(AirPort.check_protocol(x) for x in text.split("\n") if x) + and all(AirPort.check_protocol(x) for x in text.split("\n") if x and not x.startswith("#")) ): text = base64.b64encode(text.encode(encoding="UTF8")).decode(encoding="UTF8") @@ -777,6 +778,7 @@ def clean_text(document: str) -> str: f"{artifact}.yaml", "clash", True, + True, ignore, ) if not success: diff --git a/subscribe/clash.py b/subscribe/clash.py index 193559f585..c684f07ab5 100644 --- a/subscribe/clash.py +++ b/subscribe/clash.py @@ -521,6 +521,20 @@ def verify(item: dict, mihomo: bool = True) -> bool: elif item["type"] == "vless": authentication = "uuid" + + # see: https://github.com/MetaCubeX/mihomo/blob/Alpha/transport/vless/encryption/factory.go#L12 + encryption = utils.trim(item.get("encryption", "")) + if encryption not in ["", "none"]: + parts = encryption.split(".") + + # Must be: mlkem768x25519plus..<...>.<...> (len >= 4) + if ( + len(parts) < 4 + or parts[0] != "mlkem768x25519plus" + or parts[1] not in ("native", "xorpub", "random") + ): + return False + network = utils.trim(item.get("network", "tcp")) # mihomo: https://wiki.metacubex.one/config/proxies/vless/#network @@ -575,8 +589,14 @@ def verify(item: dict, mihomo: bool = True) -> bool: short_id = str(short_id) else: return False + # if len(short_id) != 8 or not is_hex(short_id) or re.match(r"\d+e\d+", short_id, flags=re.I): + # return False - if len(short_id) != 8 or not is_hex(short_id) or re.match(r"\d+e\d+", short_id, flags=re.I): + try: + sib = bytes.fromhex(short_id) + if len(sib) > 8: + return False + except ValueError: return False reality_opts["short-id"] = QuotedStr(short_id) diff --git a/subscribe/crawl.py b/subscribe/crawl.py index 0c8cdc2bed..1e4f5276b1 100644 --- a/subscribe/crawl.py +++ b/subscribe/crawl.py @@ -1272,7 +1272,7 @@ def check_status( return False, connectable try: - headers = {"User-Agent": "clash.meta"} + headers = {"User-Agent": f"{utils.USER_AGENT}; Clash.Meta; Mihomo; Shadowrocket;"} request = urllib.request.Request(url=url, headers=headers) response = urllib.request.urlopen(request, timeout=10, context=utils.CTX) if response.getcode() != 200: diff --git a/subscribe/location.py b/subscribe/location.py index 1a00668397..4c7df6cbcb 100644 --- a/subscribe/location.py +++ b/subscribe/location.py @@ -3,6 +3,7 @@ # @Author : wzdnzd # @Time : 2024-07-12 +import html import json import math import os @@ -13,8 +14,11 @@ import sys import time import urllib +import urllib.parse +import urllib.request from collections import defaultdict from dataclasses import dataclass +from functools import partial import utils import yaml @@ -739,6 +743,23 @@ def make_proxy_request( logger.warning("No port provided for proxy") return False, {} + def _build_headers(url: str) -> dict: + result = urllib.parse.urlparse(url) + base = f"{result.scheme}://{result.netloc}" if result.scheme and result.netloc else "" + + headers = { + "User-Agent": utils.USER_AGENT, + "Accept": "application/json, text/plain, */*", + "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8", + "Cache-Control": "no-cache", + "Pragma": "no-cache", + "Connection": "close", + "Referer": f"{base}/" if base else url, + "Origin": base if base else url, + } + + return headers + # Configure the proxy for the request proxy_url = f"http://127.0.0.1:{port}" proxies_config = {"http": proxy_url, "https": proxy_url} @@ -746,16 +767,12 @@ def make_proxy_request( # Configure proxy handler proxy_handler = urllib.request.ProxyHandler(proxies_config) - # Build opener with proxy handler - opener = urllib.request.build_opener(proxy_handler) + # Build opener with proxy handler and custom SSL context. + # Using explicit Request(headers=...) is more stable than opener.addheaders for proxy HTTPS requests. + opener = urllib.request.build_opener(proxy_handler, urllib.request.HTTPSHandler(context=utils.CTX)) + default_headers = _build_headers(url) if headers and isinstance(headers, dict): - opener.addheaders = [(k, v) for k, v in headers.items() if k] - else: - opener.addheaders = [ - ("User-Agent", utils.USER_AGENT), - ("Accept", "application/json"), - ("Connection", "close"), - ] + default_headers.update({k: v for k, v in headers.items() if k and v is not None}) # Try to get response with retry and backoff attempt, success, data = 0, False, None @@ -767,7 +784,8 @@ def make_proxy_request( time.sleep(wait_time) # Make request - response = opener.open(url, timeout=timeout) + request = urllib.request.Request(url=url, headers=default_headers, method="GET") + response = opener.open(request, timeout=timeout) if response.getcode() == 200: content = response.read().decode("utf-8") data = json.loads(content) if deserialize else content @@ -802,10 +820,10 @@ def get_ipv4(port: int, max_retries: int = 5) -> str: # Online API services for IP location LOCATION_API_SERVICES = [ {"url": "https://ipinfo.io", "country_key": "country"}, + {"url": "https://api.ip2location.io", "country_key": "country_code"}, {"url": "https://ipapi.co/json/", "country_key": "country_code"}, {"url": "https://ipwho.is", "country_key": "country_code"}, - {"url": "https://freeipapi.com/api/json", "country_key": "countryCode"}, - {"url": "https://api.country.is", "country_key": "country"}, + {"url": "https://free.freeipapi.com/api/json", "country_key": "countryCode"}, {"url": "https://api.ip.sb/geoip", "country_key": "country_code"}, ] @@ -818,7 +836,7 @@ def random_delay(min_delay: float = 0.01, max_delay: float = 0.5): time.sleep(random.uniform(min_delay, max_delay)) -def check_residential(proxy: dict, port: int, api_key: str = "", use_ipinfo: bool = True) -> ProxyQueryResult: +def check_residential(proxy: dict, port: int, api_key: str = "", ip_library: str = "ip2location") -> ProxyQueryResult: """ Check if a proxy is residential by making a request through it @@ -826,39 +844,133 @@ def check_residential(proxy: dict, port: int, api_key: str = "", use_ipinfo: boo proxy: The proxy information dict port: The port of the proxy api_key: Optional API key for ipapi.is. Uses free tier if not provided - use_ipinfo: Whether to use ipinfo.io instead of ipapi.is, defaults to True + ip_library: IP query provider, supported: ip2location/iplark/ipinfo/ippure/ipapi (default: ip2location) Returns: ProxyQueryResult: Complete proxy query result """ - def _get_ipapi_url(key: str = "") -> str: - url, key = "https://api.ipapi.is", utils.trim(key) - if key: - url += f"?key={key}" - return url - - def _get_ipinfo_url(port: int, name: str) -> str: - # First, get the IP address - success, content = make_proxy_request( - port=port, - url="https://ipinfo.io/ip", - max_retries=2, - timeout=15, - deserialize=False, - ) - if not success or not content: - logger.warning(f"Failed to get IP from ipinfo.io for proxy {name}") - return "" + def _build_url(provider: str, port: int, name: str, api_key: str) -> str: + if provider == "ipinfo": + # First, get the IP address + success, content = make_proxy_request( + port=port, + url="https://ipinfo.io/ip", + max_retries=2, + timeout=15, + deserialize=False, + ) + if not success or not content: + logger.warning(f"Failed to get IP from ipinfo.io for proxy {name}") + return "" + + # Extract IP from response + ip = utils.trim(content) + if not ip: + logger.warning(f"Invalid IP address from ipinfo.io for proxy {name}") + return "" + + # Now get detailed information using the IP + return f"https://ipinfo.io/widget/demo/{ip}" + elif provider == "ipapi": + url, key = "https://api.ipapi.is", utils.trim(api_key) + if key: + url += f"?key={key}" + return url + elif provider == "ippure": + return "https://my.ippure.com/v1/info" + elif provider == "ip2location": + return "https://www.ip2location.com/demo" + + return "https://iplark.com/ipapi/public/ipinfo" + + def _get_providers(preferred: str) -> list[str]: + candidates = ["ip2location", "iplark", "ippure", "ipinfo", "ipapi"] + + library = utils.trim(preferred).lower() + if library not in candidates: + library = "ip2location" + + providers = [library] + for item in candidates: + if item not in providers: + providers.append(item) + + return providers + + def _parse_data(provider: str, response: dict) -> tuple[dict, str, str, str]: + data, country_code, company_type, asn_type = {}, "", "", "" + + if provider == "ipinfo": + data = response.get("data", {}) if isinstance(response, dict) else {} + country_code = data.get("country", "") + company_type = data.get("company", {}).get("type", "") + asn_type = data.get("asn", {}).get("type", "") + elif provider == "ipapi": + data = response if isinstance(response, dict) else {} + country_code = data.get("location", {}).get("country_code", "") + company_type = data.get("company", {}).get("type", "") + asn_type = data.get("asn", {}).get("type", "") + elif provider == "ippure": + data = response if isinstance(response, dict) else {} + country_code = data.get("countryCode", "") + + flag = data.get("isResidential", False) + if flag: + company_type, asn_type = "isp", "isp" + else: + company_type, asn_type = "hosting", "hosting" + elif provider == "ip2location": + data = response if isinstance(response, dict) else {} + country_code = data.get("country_code", "") - # Extract IP from response - ip = utils.trim(content) - if not ip: - logger.warning(f"Invalid IP address from ipinfo.io for proxy {name}") - return "" + usage_type = utils.trim(data.get("usage_type", "")).lower() + as_usage_type = utils.trim(data.get("as_info", {}).get("as_usage_type", "")).lower() + + check = lambda usage: usage.startswith("isp") or usage == "mob" + if check(usage_type) and check(as_usage_type): + company_type, asn_type = "isp", "isp" + else: + company_type, asn_type = "hosting", "hosting" + else: + data = response if isinstance(response, dict) else {} + country_code = data.get("country_code", "") + + node_type = utils.trim(data.get("type", "")).lower() + if node_type == "isp": + company_type, asn_type = "isp", "isp" + elif node_type == "business": + company_type, asn_type = "business", "business" + else: + company_type, asn_type = "hosting", "hosting" + + return data, utils.trim(country_code).upper(), utils.trim(company_type).lower(), utils.trim(asn_type).lower() + + def _extract_ip2location_data(content: str) -> dict: + """Extract JSON payload from ip2location demo HTML response.""" + if not content or not isinstance(content, str): + return {} - # Now get detailed information using the IP - return f"https://ipinfo.io/widget/demo/{ip}" + pattern = r']*class=["\'][^"\']*\blanguage-json\b[^"\']*["\'][^>]*>(.*?)\s*' + groups = re.findall(pattern, content, flags=re.I | re.S) + if not groups: + return {} + + for group in groups: + payload = utils.trim(group) + if not payload: + continue + + # Some syntax highlighters may inject tags into the JSON block. + payload = re.sub(r"<[^>]+>", "", payload, flags=re.I | re.S) + payload = html.unescape(payload) + + try: + return json.loads(payload) + except Exception: + continue + + return {} name = proxy.get("name", "") result = ProxyInfo(name=name) @@ -871,32 +983,58 @@ def _get_ipinfo_url(port: int, name: str) -> str: random_delay() try: - url = "" - if use_ipinfo: - url = _get_ipinfo_url(port=port, name=name) + providers = _get_providers(ip_library) + success, response, provider = False, None, "" - if not url: - url = _get_ipapi_url(key=api_key) - use_ipinfo = False + for idx, item in enumerate(providers): + url = _build_url(provider=item, port=port, name=name, api_key=api_key) + if not url: + continue - # Call API for IP information through the proxy - success, response = make_proxy_request(port=port, url=url, max_retries=2, timeout=12) + # Call API for IP information through the proxy + deserialize, headers = True, None + if item == "ip2location": + headers = {"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8"} + deserialize = False + + success, response = make_proxy_request( + port=port, + url=url, + max_retries=2, + timeout=12, + headers=headers, + deserialize=deserialize, + ) + + if success and item == "ip2location": + response = _extract_ip2location_data(response) + success = isinstance(response, dict) and bool(response) + if not success: + logger.warning(f"Failed to extract JSON payload from ip2location demo HTML for proxy {name}") + + if success: + provider = item + logger.debug(f"IP infor for proxy {name} successfully retrieved, provider: {provider}") + break + + if idx < len(providers) - 1: + fallback = providers[idx + 1] + logger.warning(f"Failed to query {url} for proxy {name}, provider={item}, trying fallback: {fallback}") + else: + logger.warning(f"Failed to query {url} for proxy {name}, provider={item}") # Parse data from response if success: try: - data = response.get("data", {}) if use_ipinfo else response - - # Extract country code from data - if use_ipinfo: - country_code = data.get("country", "") - else: - country_code = data.get("location", {}).get("country_code", "") + data, country_code, company_type, asn_type = _parse_data(provider, response) - result.country = ISO_TO_CHINESE.get(country_code, "") if country_code else "" + if country_code: + result.country = ISO_TO_CHINESE.get(country_code, "") - company_type = data.get("company", {}).get("type", "") - asn_type = data.get("asn", {}).get("type", "") + if not result.country: + result.country = utils.trim( + data.get("country_zh", "") or data.get("country", "") or data.get("country_name", "") + ) # Check if it's residential (both company and asn type should be "isp") if company_type == "isp" and asn_type == "isp": @@ -905,9 +1043,9 @@ def _get_ipinfo_url(port: int, name: str) -> str: result.ip_type = "business" except Exception as e: - logger.error(f"Error parsing {url} response for proxy {name}: {str(e)}") + logger.error(f"Error parsing response for proxy {name}: {str(e)}") else: - logger.warning(f"Failed to query {url} for proxy {name}") + logger.warning(f"Failed to query residential info for proxy {name} with providers: {providers}") # Determine if query was successful flag = result.country != "" or result.ip_type != "" @@ -1161,6 +1299,7 @@ def regularize( show_progress: bool = True, locate: bool = False, residential: bool = False, + ip_library: str = "", digits: int = 2, ) -> list[dict]: if not proxies or not isinstance(proxies, list): @@ -1181,7 +1320,7 @@ def regularize( # Use mihomo to check for residential proxies results = batch_query( proxies=proxies, - func=check_residential, + func=partial(check_residential, ip_library=ip_library), num_threads=num_threads, show_progress=show_progress, description="Checking residential", diff --git a/subscribe/process.py b/subscribe/process.py index cc98014986..3fdce485e3 100644 --- a/subscribe/process.py +++ b/subscribe/process.py @@ -62,6 +62,7 @@ def load_configs( only_check: bool = False, num_threads: int = 0, display: bool = True, + retry: int = 3, ) -> ProcessConfig: def parse_config(config: dict) -> None: tasks.extend(config.get("domains", [])) @@ -273,7 +274,7 @@ def verify(storage: dict, groups: dict) -> bool: url, ): headers = {"User-Agent": utils.USER_AGENT, "Referer": url} - content = utils.http_get(url=url, headers=headers) + content = utils.http_get(url=url, headers=headers, retry=max(retry, 1), timeout=120) if not content: logger.error(f"cannot fetch config from remote, url: {utils.hide(url=url)}") else: @@ -509,6 +510,7 @@ def aggregate(args: argparse.Namespace) -> None: clash_bin, subconverter_bin = executable.which_bin() display = not args.invisible + retry = min(max(1, args.retry), 10) # parse config server = utils.trim(args.server) or os.environ.get("SUBSCRIBE_CONF", "").strip() @@ -517,11 +519,11 @@ def aggregate(args: argparse.Namespace) -> None: only_check=args.check, num_threads=args.num, display=display, + retry=retry, ) storages = process_config.storage or {} pushtool = push.get_instance(config=push.PushConfig.from_dict(storages)) - retry = min(max(1, args.retry), 10) # generate tasks tasks, groups, sites = assign( @@ -637,6 +639,7 @@ def aggregate(args: argparse.Namespace) -> None: if regularize and isinstance(regularize, dict) and regularize.get("enable", False): locate = regularize.get("locate", False) residential = regularize.get("residential", False) + ip_library = regularize.get("library", "") try: bits = max(1, int(regularize.get("bits", 2))) except: @@ -648,6 +651,7 @@ def aggregate(args: argparse.Namespace) -> None: show_progress=display, locate=locate, residential=residential, + ip_library=ip_library, digits=bits, ) diff --git a/subscribe/scripts/fofa.py b/subscribe/scripts/fofa.py index 30e8ef3e94..d1bbdd4695 100644 --- a/subscribe/scripts/fofa.py +++ b/subscribe/scripts/fofa.py @@ -67,7 +67,7 @@ def extract_one(url: str) -> list[str]: regex = r"(?:https?://)?(?:[a-zA-Z0-9\u4e00-\u9fa5\-]+\.)+[a-zA-Z0-9\u4e00-\u9fa5\-]+(?:(?:(?:/index.php)?/api/v1/client/subscribe\?token=[a-zA-Z0-9]{16,32})|(?:/link/[a-zA-Z0-9]+\?(?:sub|mu|clash)=\d)|(?:/(?:s|sub)/[a-zA-Z0-9]{32}))" - headers = {"User-Agent": "Clash.Meta; Mihomo"} + headers = {"User-Agent": f"{utils.USER_AGENT}; Clash.Meta; Mihomo; Shadowrocket;"} subscriptions, content = [], "" count, retry = 0, 2 diff --git a/subscribe/subconverter.py b/subscribe/subconverter.py index fa4b552ad0..d7fa3bc07a 100644 --- a/subscribe/subconverter.py +++ b/subscribe/subconverter.py @@ -103,7 +103,7 @@ def generate_conf( lines.extend(["emoji=false", "add_emoji=false"]) if ignore_exclude: - lines.append("exclude=流量|过期|剩余|时间|Expire|Traffic") + lines.append("exclude=[到过]期|Expire|Traffic|剩余流量|时间|官网|产品|联系") lines.append("\n") content = "\n".join(lines) diff --git a/subscribe/utils.py b/subscribe/utils.py index 957731a715..75f8962acb 100644 --- a/subscribe/utils.py +++ b/subscribe/utils.py @@ -35,7 +35,7 @@ CTX.verify_mode = ssl.CERT_NONE USER_AGENT = ( - "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36" + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/145.0.0.0 Safari/537.36" ) diff --git a/tools/socks-checker.py b/tools/socks-checker.py index c7bbb26388..5e0e7608e5 100644 --- a/tools/socks-checker.py +++ b/tools/socks-checker.py @@ -11,13 +11,15 @@ import argparse import asyncio +import html import ipaddress +import json import re import sys from dataclasses import dataclass from datetime import datetime -from typing import Dict, List, Optional, Tuple -from urllib.parse import urlparse +from typing import Any, Callable, Dict, List, Optional, Tuple +from urllib.parse import quote, urlparse import aiohttp import yaml @@ -276,6 +278,32 @@ } +def country_flag_emoji(country_code: str) -> str: + if not country_code or len(country_code) != 2: + return "" + + code = country_code.upper() + if not code.isalpha(): + return "" + + return chr(0x1F1E6 + ord(code[0]) - ord("A")) + chr(0x1F1E6 + ord(code[1]) - ord("A")) + + +def country_name_zh(country_code: str) -> str: + if not country_code: + return "" + + return COUNTRY_NAME_ZH.get(country_code.upper(), "") + + +def short_company_name(value: str) -> str: + if not value: + return "UNKNOWN" + + parts = [part for part in re.split(r"[\s,\.\-_@;:]+", value.strip()) if part] + return parts[0].upper() if parts else "UNKNOWN" + + @dataclass class ProxyInfo: protocol: str @@ -309,6 +337,399 @@ def from_proxy(cls, proxy_info: ProxyInfo) -> "TestResult": ) +@dataclass +class IpLookupResult: + ip: Optional[str] + data: Optional[Dict] + error: Optional[str] = None + + +class IPLibrary: + name: str = "" + + async def lookup( + self, session: aiohttp.ClientSession, proxy_info: ProxyInfo, retries: int, timeout: int + ) -> IpLookupResult: + data, error = await self._fetch(session, proxy_info, retries, timeout) + if not data: + host = "" if not proxy_info else proxy_info.host + return IpLookupResult(None, None, error or f"Failed to get IP info from {self.name}, host: {host}") + + return self._verify(data, self.name) + + def build_remark(self, data: Dict, include_asn_name: bool) -> str: + raise NotImplementedError + + async def _fetch( + self, session: aiohttp.ClientSession, proxy_info: ProxyInfo, retries: int, timeout: int + ) -> Tuple[Optional[Dict], Optional[str]]: + raise NotImplementedError + + @staticmethod + def _build_headers(url: str) -> Dict[str, str]: + result = urlparse(url) + base = f"{result.scheme}://{result.netloc}" if result.scheme and result.netloc else "" + + return { + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/145.0.0.0 Safari/537.36", + "Accept": "application/json, text/plain, */*", + "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8", + "Cache-Control": "no-cache", + "Pragma": "no-cache", + "Connection": "close", + "Referer": f"{base}/" if base else url, + "Origin": base if base else url, + } + + async def _make_request( + self, + session: aiohttp.ClientSession, + url: str, + retries: int, + timeout: int, + headers: Optional[Dict[str, str]] = None, + deserialize: bool = True, + parser: Optional[Callable[[str], Any]] = None, + ) -> Tuple[Optional[Any], Optional[str]]: + default_headers = self._build_headers(url) + if headers and isinstance(headers, dict): + default_headers.update({k: v for k, v in headers.items() if k and v is not None}) + + error = None + for attempt in range(1, retries + 1): + try: + async with session.get( + url, + headers=default_headers, + timeout=aiohttp.ClientTimeout(total=timeout), + ) as response: + if response.status == 200: + content = await response.text() + + if parser is not None: + data = parser(content) + if data: + return data, None + + error = "Invalid response payload" + elif deserialize: + try: + data = json.loads(content) + except Exception: + data = None + + if isinstance(data, dict): + return data, None + + error = "Invalid JSON response" + else: + return content, None + + else: + error = f"HTTP {response.status}" + except asyncio.TimeoutError: + error = "Timeout" + except Exception as e: + error = str(e)[:100] + + if attempt < retries: + await asyncio.sleep(attempt) + + return None, error + + @staticmethod + def _verify(data: Dict, source: str) -> IpLookupResult: + address = (data.get("ip") or "").strip() + if not address: + return IpLookupResult(None, None, f"Invalid IP from {source}") + + try: + ipaddress.ip_address(address) + except ValueError: + return IpLookupResult(None, None, f"Invalid IP from {source}, ip: {address}") + + return IpLookupResult(address, data, None) + + @staticmethod + def _format_remark( + country_code: str, + country: str, + label: str, + include_asn_name: bool, + company_name: str, + detail: str = "", + ) -> str: + flag = country_flag_emoji(country_code) + base = f"{flag} {country}{label}".strip() + + if include_asn_name and company_name: + if detail: + return f"{base} [{company_name}::{detail}]".strip() + + return f"{base} [{company_name}]".strip() + + return base + + +class IPInfoLibrary(IPLibrary): + name = "ipinfo" + + def build_remark(self, data: Dict, include_asn_name: bool) -> str: + country_code = (data.get("country") or "").upper() + flag = country_flag_emoji(country_code) + + asn_info = data.get("asn", {}) or {} + company_info = data.get("company", {}) or {} + asn_type = (asn_info.get("type") or "").lower() + company_type = (company_info.get("type") or "").lower() + + asn_name = (asn_info.get("domain") or "").strip() + if not asn_name or re.match(r"^as\d+\.", asn_name, flags=re.I): + asn_name = (asn_info.get("name") or "").strip() + + company_name = short_company_name(asn_name) + + if asn_type == "isp" and company_type == "isp": + label = "家宽" + elif asn_type == "isp" or company_type == "isp": + label = "商宽" + elif asn_type == "edu" or company_type == "edu": + label = "教育" + else: + label = "" + + country = country_name_zh(country_code) or country_code or "未知" + base = f"{flag} {country}{label}".strip() + if include_asn_name and company_name: + return f"{base} [{company_name}]".strip() + + return base + + @staticmethod + def _is_ipv4(host: str) -> bool: + if not host: + return False + try: + return isinstance(ipaddress.ip_address(host), ipaddress.IPv4Address) + except ValueError: + return False + + async def _resolve_ip(self, session: aiohttp.ClientSession, host: str, retries: int, timeout: int) -> Optional[str]: + if self._is_ipv4(host): + return host + + url = "https://ipinfo.io/ip" + for attempt in range(1, retries + 1): + try: + async with session.get( + url, + headers=self._build_headers(url), + timeout=aiohttp.ClientTimeout(total=timeout), + ) as response: + if response.status == 200: + text = (await response.text()).strip() + try: + ipaddress.ip_address(text) + return text + except ValueError: + pass + except asyncio.TimeoutError: + pass + except Exception: + pass + + if attempt < retries: + await asyncio.sleep(attempt) + + return None + + async def _fetch( + self, session: aiohttp.ClientSession, proxy_info: ProxyInfo, retries: int, timeout: int + ) -> Tuple[Optional[Dict], Optional[str]]: + host = proxy_info.host if proxy_info else "" + address = await self._resolve_ip(session, host, retries, timeout) + if not address: + return None, f"Failed to get IP from ipinfo.io/ip, host: {host}" + + url = f"https://ipinfo.io/widget/demo/{address}" + data, error = await self._make_request(session, url, retries, timeout) + if not data: + return None, error or f"Failed to get IP info from ipinfo.io, ip: {address}" + + return data.get("data", data), None + + +class IPPureLibrary(IPLibrary): + name = "ippure" + + def build_remark(self, data: Dict, include_asn_name: bool) -> str: + residential = data.get("isResidential") + label = "家宽" if residential is True else "" + + country_code = (data.get("countryCode") or "").upper() + country = country_name_zh(country_code) or (data.get("country") or "未知") + + company_name = short_company_name(data.get("asOrganization") or "") + score = str(data.get("fraudScore")).zfill(3) if "fraudScore" in data else "NUL" + + return self._format_remark( + country_code=country_code, + country=country, + label=label, + include_asn_name=include_asn_name, + company_name=company_name, + detail=score, + ) + + async def _fetch( + self, session: aiohttp.ClientSession, _: ProxyInfo, retries: int, timeout: int + ) -> Tuple[Optional[Dict], Optional[str]]: + url = "https://my.ippure.com/v1/info" + return await self._make_request(session, url, retries, timeout) + + +class IP2LocationLibrary(IPLibrary): + name = "ip2location" + + def build_remark(self, data: Dict, include_asn_name: bool) -> str: + as_info = data.get("as_info") or {} + + usage_type = (data.get("usage_type") or "").strip().lower() + as_usage_type = ((as_info.get("as_usage_type") if isinstance(as_info, dict) else "") or "").strip().lower() + + check = lambda usage: usage.startswith("isp") or usage == "mob" + label = "家宽" if check(usage_type) and check(as_usage_type) else "" + + country_code = (data.get("country_code") or "").upper() + country = ( + country_name_zh(country_code) + or data.get("country_name") + or data.get("country", {}).get("name", "") + or "未知" + ) + + provider = (data.get("as", "") or data.get("isp", "") or "").strip() + if not provider and as_info and isinstance(as_info, dict): + provider = (as_info.get("as_name", "") or as_info.get("as_domain", "")).strip() + if not provider: + provider = data.get("domain", "").strip() or "" + + company_name = short_company_name(provider) + score = str(data.get("fraud_score")).zfill(3) if "fraud_score" in data else "NUL" + + return self._format_remark( + country_code=country_code, + country=country, + label=label, + include_asn_name=include_asn_name, + company_name=company_name, + detail=score, + ) + + @staticmethod + def _extract_data(content: str) -> Dict: + if not content or not isinstance(content, str): + return {} + + pattern = r']*class=["\'][^"\']*\blanguage-json\b[^"\']*["\'][^>]*>(.*?)\s*' + groups = re.findall(pattern, content, flags=re.I | re.S) + if not groups: + return {} + + for group in groups: + payload = group.strip() + if not payload: + continue + + payload = re.sub(r"<[^>]+>", "", payload, flags=re.I | re.S) + payload = html.unescape(payload) + + try: + data = json.loads(payload) + if isinstance(data, dict): + return data + except Exception: + continue + + return {} + + async def _fetch( + self, session: aiohttp.ClientSession, _: ProxyInfo, retries: int, timeout: int + ) -> Tuple[Optional[Dict], Optional[str]]: + url = "https://www.ip2location.com/demo" + headers = {"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8"} + data, error = await self._make_request( + session=session, + url=url, + retries=retries, + timeout=timeout, + headers=headers, + deserialize=False, + parser=self._extract_data, + ) + if not data: + return None, "Invalid HTML response" if error == "Invalid response payload" else error + + return data, None + + +class IPLarkLibrary(IPLibrary): + name = "iplark" + + def build_remark(self, data: Dict, include_asn_name: bool) -> str: + node_type = (data.get("type") or "").strip().lower() + if node_type == "isp": + label = "家宽" + elif node_type == "business": + label = "商宽" + elif node_type == "education": + label = "教育" + else: + label = "" + + country_code = (data.get("country_code") or "").upper() + country = country_name_zh(country_code) or (data.get("country_zh") or data.get("country") or "未知") + + # asn = str(data.get("asn") or "").strip() + # detail = f"AS{asn}" if asn else "NUL" + detail = "" + + company_name = short_company_name(data.get("organization") or "") + + return self._format_remark( + country_code=country_code, + country=country, + label=label, + include_asn_name=include_asn_name, + company_name=company_name, + detail=detail, + ) + + async def _fetch( + self, session: aiohttp.ClientSession, _: ProxyInfo, retries: int, timeout: int + ) -> Tuple[Optional[Dict], Optional[str]]: + url = "https://iplark.com/ipapi/public/ipinfo" + return await self._make_request(session, url, retries, timeout) + + +IP_LIBRARIES = { + "ip2location": IP2LocationLibrary, + "iplark": IPLarkLibrary, + "ipinfo": IPInfoLibrary, + "ippure": IPPureLibrary, +} + + +def get_ip_library(name: str) -> IPLibrary: + key = (name or "ip2location").strip().lower() + library = IP_LIBRARIES.get(key) + if not library: + supported = ", ".join(sorted(IP_LIBRARIES.keys())) + raise ValueError(f"Unsupported ip library: {name}. Supported: {supported}") + + return library() + + class ProxyChecker: def __init__( self, @@ -316,6 +737,7 @@ def __init__( format_pattern: Optional[str] = None, default_port: int = 1080, include_asn_name: bool = False, + ip_library: str = "ip2location", ): """ 初始化代理检测器 @@ -332,6 +754,7 @@ def __init__( self.format_pattern = format_pattern self.default_port = default_port self.include_asn_name = include_asn_name + self.ip_library = get_ip_library(ip_library) self.results: List[TestResult] = [] self.summary: Optional[Dict[str, float]] = None @@ -340,7 +763,7 @@ def parse_proxy(self, text: str, format_pattern: Optional[str] = None) -> Option 解析代理字符串,支持自定义格式 支持的格式占位符: - - {protocol}: 协议类型 (socks5/socks4/http等) + - {protocol}: 协议类型 (socks5/socks4/http/https等) - {username}: 用户名 - {password}: 密码 - {host}: 主机地址 @@ -382,10 +805,7 @@ def parse_proxy(self, text: str, format_pattern: Optional[str] = None) -> Option prefix = f"socks5://{prefix}" result = urlparse(prefix) - protocol = result.scheme or "socks5" - if protocol == "https": - protocol = "http" return ProxyInfo( protocol=protocol, @@ -472,9 +892,6 @@ def _parse_custom_format(self, text: str, format_pattern: str) -> Optional[Proxy elif placeholder == "host": host = value - if protocol == "https": - protocol = "http" - return ProxyInfo( protocol=protocol, username=username, @@ -490,33 +907,28 @@ async def test_proxy(self, proxy_info: ProxyInfo, retries: int = 3) -> TestResul Test a single proxy with retries. """ result = TestResult.from_proxy(proxy_info) - - # Build proxy URL - if proxy_info.username and proxy_info.password: - proxy_url = ( - f"{proxy_info.protocol}://{proxy_info.username}:{proxy_info.password}" - f"@{proxy_info.host}:{proxy_info.port}" - ) - else: - proxy_url = f"{proxy_info.protocol}://{proxy_info.host}:{proxy_info.port}" - start_time = datetime.now() try: - connector = ProxyConnector.from_url(proxy_url) - async with aiohttp.ClientSession(connector=connector) as session: - ip_address = await self._resolve_ip_with_proxy(session, proxy_info.host, retries) - if not ip_address: - result.error = "Failed to get IP from ipinfo.io/ip" - return result - - ip_data, ip_error = await self._fetch_ipinfo(session, ip_address, retries) - if not ip_data: - result.error = ip_error or "Failed to get IP info from ipinfo.io" + protocol = (proxy_info.protocol or "").lower() + if protocol in ("http", "https"): + proxy_url = self._build_proxy_url(proxy_info, include_auth=False) + proxy_auth = self._build_proxy_auth(proxy_info) + connector = aiohttp.TCPConnector(ssl=False) + session = aiohttp.ClientSession(connector=connector, proxy=proxy_url, proxy_auth=proxy_auth) + else: + proxy_url = self._build_proxy_url(proxy_info, include_auth=True) + connector = ProxyConnector.from_url(proxy_url) + session = aiohttp.ClientSession(connector=connector) + + async with session: + lookup = await self.ip_library.lookup(session, proxy_info, retries, self.timeout) + if not lookup.ip or not lookup.data: + result.error = lookup.error or f"Failed to get IP info from {self.ip_library.name}" return result - remark = self._build_remark_from_ipinfo(ip_data) + remark = self.ip_library.build_remark(lookup.data, self.include_asn_name) result.remark = remark - result.ip = ip_address + result.ip = lookup.ip result.status = "success" result.response_time = round((datetime.now() - start_time).total_seconds(), 2) result.error = None @@ -531,77 +943,6 @@ async def test_proxy(self, proxy_info: ProxyInfo, retries: int = 3) -> TestResul result.error = str(e)[:100] return result - def _is_ipv4(self, host: str) -> bool: - if not host: - return False - try: - return isinstance(ipaddress.ip_address(host), ipaddress.IPv4Address) - except ValueError: - return False - - async def _resolve_ip_with_proxy(self, session: aiohttp.ClientSession, host: str, retries: int) -> Optional[str]: - if self._is_ipv4(host): - return host - - url = "https://ipinfo.io/ip" - for attempt in range(1, retries + 1): - try: - async with session.get(url, timeout=aiohttp.ClientTimeout(total=self.timeout)) as response: - if response.status == 200: - text = (await response.text()).strip() - try: - ipaddress.ip_address(text) - return text - except ValueError: - pass - except asyncio.TimeoutError: - pass - except Exception: - pass - - if attempt < retries: - await asyncio.sleep(attempt) - - return None - - async def _fetch_ipinfo( - self, session: aiohttp.ClientSession, ip_address: str, retries: int - ) -> Tuple[Optional[Dict], Optional[str]]: - url = f"https://ipinfo.io/widget/demo/{ip_address}" - last_error = None - for attempt in range(1, retries + 1): - try: - async with session.get(url, timeout=aiohttp.ClientTimeout(total=self.timeout)) as response: - if response.status == 200: - data = await response.json() - if isinstance(data, dict): - return data.get("data", data), None - last_error = "Invalid JSON response" - else: - last_error = f"HTTP {response.status}" - except asyncio.TimeoutError: - last_error = "Timeout" - except Exception as e: - last_error = str(e)[:100] - - if attempt < retries: - await asyncio.sleep(attempt) - - return None, last_error - - def _country_flag_emoji(self, country_code: str) -> str: - if not country_code or len(country_code) != 2: - return "" - code = country_code.upper() - if not code.isalpha(): - return "" - return chr(0x1F1E6 + ord(code[0]) - ord("A")) + chr(0x1F1E6 + ord(code[1]) - ord("A")) - - def _country_name_zh(self, country_code: str) -> str: - if not country_code: - return "" - return COUNTRY_NAME_ZH.get(country_code.upper(), "") - def _format_standard(self, proxy_info: ProxyInfo, remark: str) -> str: auth = "" if proxy_info.username or proxy_info.password: @@ -611,58 +952,43 @@ def _format_standard(self, proxy_info: ProxyInfo, remark: str) -> str: return f"{base}#{remark}" return base + def _build_proxy_url(self, proxy_info: ProxyInfo, include_auth: bool) -> str: + auth = "" + if include_auth and (proxy_info.username or proxy_info.password): + username = quote(proxy_info.username or "", safe="") + password = quote(proxy_info.password or "", safe="") + auth = f"{username}:{password}@" + + return f"{proxy_info.protocol}://{auth}{proxy_info.host}:{proxy_info.port}" + + def _build_proxy_auth(self, proxy_info: ProxyInfo) -> Optional[aiohttp.BasicAuth]: + if not (proxy_info.username or proxy_info.password): + return None + + return aiohttp.BasicAuth(proxy_info.username or "", proxy_info.password or "") + def _yaml_quote(self, value: str) -> str: escaped = value.replace("\\", "\\\\").replace('"', '\\"') return f'"{escaped}"' def _format_yaml_line(self, result: TestResult) -> str: name = result.remark or result.host + protocol = (result.protocol or "").lower() + clash_type = "http" if protocol == "https" else protocol parts = [ f"name: {self._yaml_quote(name)}", f"server: {self._yaml_quote(result.host)}", f"port: {result.port}", - f"type: {self._yaml_quote(result.protocol)}", + f"type: {self._yaml_quote(clash_type)}", ] + if protocol == "https": + parts.append("tls: true") if result.username: parts.append(f"username: {self._yaml_quote(result.username)}") if result.password: parts.append(f"password: {self._yaml_quote(result.password)}") return " - {" + ", ".join(parts) + "}" - def _build_remark_from_ipinfo(self, ip_data: Dict) -> str: - country_code = (ip_data.get("country") or "").upper() - flag = self._country_flag_emoji(country_code) - country_display = self._country_name_zh(country_code) or "未知" - - asn_info = ip_data.get("asn", {}) or {} - company_info = ip_data.get("company", {}) or {} - asn_type = (asn_info.get("type") or "").lower() - company_type = (company_info.get("type") or "").lower() - - asn_name = (asn_info.get("domain") or "").strip() - if not asn_name: - asn_name = (asn_info.get("name") or "").strip() - - if asn_name: - parts = [p for p in re.split(r"[\s,\.]+", asn_name) if p] - company_name = parts[0].upper() if parts else "UNKNOWN" - else: - company_name = "UNKNOWN" - - if asn_type == "isp" and company_type == "isp": - label = "家宽" - elif asn_type == "isp" or company_type == "isp": - label = "商宽" - elif asn_type == "edu" or company_type == "edu": - label = "教育" - else: - label = "" - - base = f"{flag} {country_display}{label}".strip() - if self.include_asn_name and company_name: - return f"{base} [{company_name}]".strip() - return base - def _convert(self, input_file: str, output_file: str, output_format: str, digits: int = 2) -> None: proxies = read_proxies(input_file) if not proxies: @@ -765,7 +1091,7 @@ async def check_proxies( output_handle = None if not output_file: - output_file = f'{output_format}.{"txt" if output_format == "v2ray" else "yaml"}' + output_file = f'{output_format}-{self.ip_library}.{"txt" if output_format == "v2ray" else "yaml"}' output_handle = open(output_file, "w", encoding="utf-8") if output_format == "clash": @@ -797,12 +1123,22 @@ async def test_with_semaphore(proxy_info): # 实时输出结果 status_icon = "✓" if result.status == "success" else "✗" if result.status == "success": - print(f"{status_icon} {result.original[:60]}... | {result.response_time}s | IP: {result.ip}") + print( + f"{status_icon} {result.original[:60]}... | {result.response_time}s | Export IP: {result.ip}".encode( + "utf-8", errors="ignore" + ).decode( + "utf-8" + ) + ) if write_queue: line = self._format_yaml_line(result) if output_format == "clash" else result.proxy await write_queue.put(line + "\n") else: - print(f"{status_icon} {result.original[:60]}... | {result.error}") + print( + f"{status_icon} {result.original[:60]}... | {result.error}".encode( + "utf-8", errors="ignore" + ).decode("utf-8") + ) async with stats_lock: if result.status == "success": @@ -930,8 +1266,8 @@ def _build_proxy(entry: Dict) -> Optional[str]: return None protocol = str(entry.get("type") or "socks5").strip().lower() - if protocol == "https": - protocol = "http" + if protocol == "http" and entry.get("tls") is True: + protocol = "https" elif protocol == "socks": protocol = "socks5" @@ -1055,7 +1391,7 @@ async def main(): %(prog)s -f proxies.txt --input-format "socks5://{host}:{port}:{username}:{password}" 支持的格式占位符: - {protocol} - 协议类型 (socks5/socks4/http等) + {protocol} - 协议类型 (socks5/socks4/http/https等) {username} - 用户名 {password} - 密码 {host} - 主机地址/IP @@ -1089,6 +1425,14 @@ async def main(): help="在备注中追加 ASN 名称 (默认不追加)", ) + parser.add_argument( + "--ip-library", + dest="ip_library", + choices=sorted(IP_LIBRARIES.keys()), + default="ip2location", + help="IP地址数据库服务商: ip2location、iplark、ipinfo 或 ippure (默认: ip2location)", + ) + args = parser.parse_args() # 获取代理列表 @@ -1116,6 +1460,7 @@ async def main(): format_pattern=args.format_pattern, default_port=args.default_port, include_asn_name=args.include_asn_name, + ip_library=args.ip_library, ) await checker.check_proxies(