From 7e415fcf2a03f69ce94ab9061ed9ffe17dd2a6f9 Mon Sep 17 00:00:00 2001 From: Md Abu Salehin Date: Fri, 5 Dec 2025 01:32:27 +0600 Subject: [PATCH 01/11] Add IPTV link testing functionality This script tests IPTV links for functionality using various methods including HTTP requests and socket connections. It logs working and broken links to separate files. --- M3U_link_scanner/main.py | 214 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 214 insertions(+) create mode 100644 M3U_link_scanner/main.py diff --git a/M3U_link_scanner/main.py b/M3U_link_scanner/main.py new file mode 100644 index 0000000000..d40b1b46da --- /dev/null +++ b/M3U_link_scanner/main.py @@ -0,0 +1,214 @@ +import requests +import time +import concurrent.futures +from urllib.parse import urlparse +import socket +import subprocess +import os + +class IPTVLinkTester: + def __init__(self, input_file='iptv_links.txt', working_file='working_links.txt', broken_file='broken_links.txt'): + self.input_file = input_file + self.working_file = working_file + self.broken_file = broken_file + self.timeout = 20 + self.attempts = 5 + self.headers = { + 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36', + 'Accept': '*/*', + 'Accept-Encoding': 'identity;q=1, *;q=0', + 'Range': 'bytes=0-', + } + + def test_http_head(self, url): + """Test using HTTP HEAD request""" + try: + response = requests.head(url, headers=self.headers, timeout=self.timeout, allow_redirects=True) + return response.status_code in [200, 206, 301, 302, 307, 308] + except: + return False + + def test_http_get_partial(self, url): + """Test using HTTP GET with range request""" + try: + headers = self.headers.copy() + headers['Range'] = 'bytes=0-1024' + response = requests.get(url, headers=headers, timeout=self.timeout, stream=True, allow_redirects=True) + if response.status_code in [200, 206]: + # Try to read a small chunk + chunk = next(response.iter_content(1024), None) + return chunk is not None and len(chunk) > 0 + return False + except: + return False + + def test_http_streaming(self, url): + """Test streaming capability""" + try: + response = requests.get(url, headers=self.headers, timeout=self.timeout, stream=True, allow_redirects=True) + if response.status_code in [200, 206]: + # Try to read multiple chunks + chunk_count = 0 + for chunk in response.iter_content(chunk_size=8192): + if chunk: + chunk_count += 1 + if chunk_count >= 3: # Successfully read 3 chunks + return True + return chunk_count > 0 + return False + except: + return False + + def test_socket_connection(self, url): + """Test basic socket connection""" + try: + parsed = urlparse(url) + host = parsed.hostname + port = parsed.port or (443 if parsed.scheme == 'https' else 80) + + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.settimeout(self.timeout) + result = sock.connect_ex((host, port)) + sock.close() + return result == 0 + except: + return False + + def test_with_ffmpeg(self, url): + """Test using ffmpeg/ffprobe if available""" + try: + # Check if ffprobe is available + result = subprocess.run( + ['ffprobe', '-v', 'error', '-show_entries', + 'format=duration', '-of', 'default=noprint_wrappers=1:nokey=1', url], + capture_output=True, + timeout=self.timeout, + text=True + ) + return result.returncode == 0 + except (subprocess.TimeoutExpired, FileNotFoundError, Exception): + return False + + def test_link_comprehensive(self, url, link_number, total_links): + """Perform comprehensive testing on a single link""" + print(f"\n{'='*70}") + print(f"Testing link {link_number}/{total_links}") + print(f"URL: {url[:80]}{'...' if len(url) > 80 else ''}") + print(f"{'='*70}") + + results = [] + test_methods = [ + ("HTTP HEAD Request", self.test_http_head), + ("HTTP GET Partial", self.test_http_get_partial), + ("HTTP Streaming", self.test_http_streaming), + ("Socket Connection", self.test_socket_connection), + ("FFmpeg Probe", self.test_with_ffmpeg) + ] + + # Perform multiple attempts for each test method + for method_name, test_func in test_methods: + print(f"\n{method_name}:") + method_results = [] + + for attempt in range(self.attempts): + print(f" Attempt {attempt + 1}/{self.attempts}...", end=' ', flush=True) + try: + result = test_func(url) + method_results.append(result) + print(f"{'✓ PASS' if result else '✗ FAIL'}") + + if result: + time.sleep(2) # Short delay on success + else: + time.sleep(3) # Slightly longer delay on failure + + except Exception as e: + print(f"✗ ERROR: {str(e)[:50]}") + method_results.append(False) + time.sleep(3) + + success_rate = sum(method_results) / len(method_results) * 100 + print(f" Success rate: {success_rate:.1f}%") + results.extend(method_results) + + # Determine if link is working + total_tests = len(results) + successful_tests = sum(results) + success_percentage = (successful_tests / total_tests) * 100 + + print(f"\n{'─'*70}") + print(f"OVERALL RESULT: {successful_tests}/{total_tests} tests passed ({success_percentage:.1f}%)") + + # If ANY test passed even once, consider it potentially working + is_working = successful_tests > 0 + + if is_working: + print(f"Status: ✓ WORKING (at least {successful_tests} test(s) succeeded)") + else: + print(f"Status: ✗ BROKEN (all tests failed)") + + print(f"{'='*70}\n") + + return is_working, success_percentage + + def process_links(self): + """Process all links from input file""" + # Read all links + try: + with open(self.input_file, 'r', encoding='utf-8') as f: + links = [line.strip() for line in f if line.strip()] + except FileNotFoundError: + print(f"Error: {self.input_file} not found!") + return + + if not links: + print("No links found in the input file!") + return + + print(f"Found {len(links)} links to test") + print(f"Each link will be tested {self.attempts} times per method") + print(f"Timeout per request: {self.timeout} seconds") + print(f"This will take approximately {len(links) * self.attempts * 5 * 3 / 60:.1f} minutes\n") + + working_links = [] + broken_links = [] + + # Test each link + for idx, link in enumerate(links, 1): + is_working, success_rate = self.test_link_comprehensive(link, idx, len(links)) + + if is_working: + working_links.append(f"{link} # Success rate: {success_rate:.1f}%\n") + else: + broken_links.append(f"{link} # All tests failed\n") + + # Small delay between links + if idx < len(links): + print("Waiting 5 seconds before next link...\n") + time.sleep(5) + + # Write results + with open(self.working_file, 'w', encoding='utf-8') as f: + f.writelines(working_links) + + with open(self.broken_file, 'w', encoding='utf-8') as f: + f.writelines(broken_links) + + # Summary + print("\n" + "="*70) + print("TESTING COMPLETE!") + print("="*70) + print(f"Total links tested: {len(links)}") + print(f"Working links: {len(working_links)} (saved to {self.working_file})") + print(f"Broken links: {len(broken_links)} (saved to {self.broken_file})") + print("="*70) + +def main(): + print("IPTV Link Tester - Comprehensive Edition") + print("="*70) + + tester = IPTVLinkTester() + tester.process_links() + +if __name__ == "__main__": + main() From fc8ea3cf3f4df541cde9553836367058873fc623 Mon Sep 17 00:00:00 2001 From: Md Abu Salehin Date: Fri, 5 Dec 2025 01:34:26 +0600 Subject: [PATCH 02/11] Add README for IPTV Link Tester tool Added comprehensive documentation for the IPTV Link Tester, detailing features, installation, usage, configuration, output files, testing process, troubleshooting, and licensing. --- M3U_link_scanner/README.md | 147 +++++++++++++++++++++++++++++++++++++ 1 file changed, 147 insertions(+) create mode 100644 M3U_link_scanner/README.md diff --git a/M3U_link_scanner/README.md b/M3U_link_scanner/README.md new file mode 100644 index 0000000000..b0d5fc5796 --- /dev/null +++ b/M3U_link_scanner/README.md @@ -0,0 +1,147 @@ +# IPTV Link Tester - Comprehensive Edition + +A robust Python tool for testing and validating IPTV M3U stream links using multiple verification methods. + +## Features + +- **Multiple Testing Methods**: Uses 5 different approaches to verify stream availability + - HTTP HEAD requests + - HTTP GET with partial content + - HTTP streaming test + - Socket connection verification + - FFmpeg/FFprobe analysis (optional) + +- **Retry Logic**: Tests each link 5 times per method for reliability +- **Detailed Reporting**: Shows real-time progress and success rates +- **Automatic Sorting**: Separates working and broken links into different files +- **Success Rate Tracking**: Records the percentage of successful tests for each link + +## Requirements + +- Python 3.6 or higher +- FFmpeg/FFprobe (optional, but recommended for better accuracy) + +## Installation + +1. Clone or download this repository + +2. Install required Python packages: +```bash +pip install -r requirements.txt +``` + +3. (Optional) Install FFmpeg for enhanced testing: + - **Windows**: Download from [ffmpeg.org](https://ffmpeg.org/download.html) + - **macOS**: `brew install ffmpeg` + - **Linux**: `sudo apt-get install ffmpeg` or `sudo yum install ffmpeg` + +## Usage + +1. Create a file named `iptv_links.txt` in the same directory as the script + +2. Add your IPTV links (one per line): +``` +http://example.com:8080/live/stream1/index.m3u8 +http://example.com:8080/live/stream2/index.m3u8 +http://example.com:8080/live/stream3/index.m3u8 +``` + +3. Run the script: +```bash +python iptv_tester.py +``` + +4. Wait for the testing to complete. The script will: + - Test each link thoroughly with multiple methods + - Show real-time progress and results + - Save working links to `working_links.txt` + - Save broken links to `broken_links.txt` + +## Configuration + +You can customize the testing parameters by modifying the `IPTVLinkTester` class initialization: + +```python +tester = IPTVLinkTester( + input_file='iptv_links.txt', # Input file with links + working_file='working_links.txt', # Output file for working links + broken_file='broken_links.txt' # Output file for broken links +) + +# Modify these attributes: +tester.timeout = 20 # Timeout per request (seconds) +tester.attempts = 5 # Number of attempts per test method +``` + +## Output Files + +### working_links.txt +Contains all links that passed at least one test, along with their success rate: +``` +http://example.com/stream1.m3u8 # Success rate: 85.0% +http://example.com/stream2.m3u8 # Success rate: 92.0% +``` + +### broken_links.txt +Contains links that failed all tests: +``` +http://example.com/dead_stream.m3u8 # All tests failed +``` + +## Testing Process + +For each link, the script performs: +- 5 attempts × 5 test methods = 25 total tests per link +- Automatic delays between attempts to avoid rate limiting +- A link is marked as "working" if ANY test succeeds + +### Test Methods Explained + +1. **HTTP HEAD Request**: Quick check if the server responds +2. **HTTP GET Partial**: Downloads first 1KB to verify data availability +3. **HTTP Streaming**: Attempts to stream multiple chunks of data +4. **Socket Connection**: Verifies basic network connectivity +5. **FFmpeg Probe**: Uses FFmpeg to analyze stream metadata (if available) + +## Estimated Time + +- Testing time depends on: + - Number of links + - Network speed + - Server response times + +- Approximate calculation: `(Number of links × 5 methods × 5 attempts × 3 seconds) / 60 minutes` +- Example: 10 links ≈ 12-15 minutes + +## Troubleshooting + +### All links showing as broken +- Check your internet connection +- Verify the links are valid and currently active +- Some IPTV providers may block automated testing +- Try reducing the number of attempts or increasing timeout + +### Script running very slowly +- This is normal due to thorough testing +- You can reduce `attempts` or `timeout` for faster results (less accurate) +- Some servers may have rate limiting + +### FFmpeg tests always fail +- FFmpeg is not installed or not in PATH +- This is optional; other methods can still validate links + +## Notes + +- Some IPTV providers implement anti-scraping measures +- Links may work for real players but fail automated tests +- False positives/negatives are possible +- Always respect the terms of service of IPTV providers +- This tool is for personal use and legitimate testing only + +## License + +Free to use for personal and educational purposes. + +## Disclaimer + +This tool is intended for testing your own IPTV subscriptions and legally obtained streams. Users are responsible for ensuring they have the right to access and test the streams they provide to this tool. From 00fe24bb668dd6ca794ab1e671e139a6e2ee93ba Mon Sep 17 00:00:00 2001 From: Md Abu Salehin Date: Fri, 5 Dec 2025 01:35:08 +0600 Subject: [PATCH 03/11] Add requirements for requests and urllib3 --- M3U_link_scanner/requirements.txt | 2 ++ 1 file changed, 2 insertions(+) create mode 100644 M3U_link_scanner/requirements.txt diff --git a/M3U_link_scanner/requirements.txt b/M3U_link_scanner/requirements.txt new file mode 100644 index 0000000000..23b02303ae --- /dev/null +++ b/M3U_link_scanner/requirements.txt @@ -0,0 +1,2 @@ +requests>=2.31.0 +urllib3>=2.0.0 From 97905910ac69676703f1b8d3bed7d45045b4e7b1 Mon Sep 17 00:00:00 2001 From: Md Abu Salehin Date: Fri, 5 Dec 2025 01:39:10 +0600 Subject: [PATCH 04/11] Add iptv_tester.py to M3U_link_scanner Added the iptv link scanners script by python --- M3U_link_scanner/{main.py => iptv_tester.py} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename M3U_link_scanner/{main.py => iptv_tester.py} (100%) diff --git a/M3U_link_scanner/main.py b/M3U_link_scanner/iptv_tester.py similarity index 100% rename from M3U_link_scanner/main.py rename to M3U_link_scanner/iptv_tester.py From 7965da39505234af633b349696487c838562e4f9 Mon Sep 17 00:00:00 2001 From: Md Abu Salehin Date: Fri, 5 Dec 2025 09:35:46 +0600 Subject: [PATCH 05/11] Refactor IPTV link tester for improved structure Refactor IPTV link testing methods for improved error handling and resource management. Consolidate repeated code into helper functions and enhance overall readability. Pr #3271 --- M3U_link_scanner/iptv_tester.py | 296 +++++++++++++++++++++++--------- 1 file changed, 215 insertions(+), 81 deletions(-) diff --git a/M3U_link_scanner/iptv_tester.py b/M3U_link_scanner/iptv_tester.py index d40b1b46da..511ee6f480 100644 --- a/M3U_link_scanner/iptv_tester.py +++ b/M3U_link_scanner/iptv_tester.py @@ -1,13 +1,33 @@ +""" +IPTV Link Tester - Comprehensive Edition +Tests M3U/IPTV links using multiple verification methods +""" + import requests import time -import concurrent.futures from urllib.parse import urlparse import socket import subprocess -import os + class IPTVLinkTester: - def __init__(self, input_file='iptv_links.txt', working_file='working_links.txt', broken_file='broken_links.txt'): + """Test IPTV/M3U links for availability using multiple methods""" + + # HTTP status codes that indicate a working link + SUCCESS_STATUS_CODES = {200, 206, 301, 302, 307, 308} + STREAMING_STATUS_CODES = {200, 206} + + def __init__(self, input_file='iptv_links.txt', + working_file='working_links.txt', + broken_file='broken_links.txt'): + """ + Initialize the IPTV link tester + + Args: + input_file: Path to file containing IPTV links (one per line) + working_file: Path to output file for working links + broken_file: Path to output file for broken links + """ self.input_file = input_file self.working_file = working_file self.broken_file = broken_file @@ -21,32 +41,77 @@ def __init__(self, input_file='iptv_links.txt', working_file='working_links.txt' } def test_http_head(self, url): - """Test using HTTP HEAD request""" + """ + Test using HTTP HEAD request + + Args: + url: The URL to test + + Returns: + bool: True if the link responds with a success status code + """ try: - response = requests.head(url, headers=self.headers, timeout=self.timeout, allow_redirects=True) - return response.status_code in [200, 206, 301, 302, 307, 308] - except: + response = requests.head( + url, + headers=self.headers, + timeout=self.timeout, + allow_redirects=True + ) + return response.status_code in self.SUCCESS_STATUS_CODES + except (requests.RequestException, requests.Timeout, + requests.ConnectionError) as e: return False def test_http_get_partial(self, url): - """Test using HTTP GET with range request""" + """ + Test using HTTP GET with range request for first 1KB + + Args: + url: The URL to test + + Returns: + bool: True if data can be retrieved + """ try: headers = self.headers.copy() headers['Range'] = 'bytes=0-1024' - response = requests.get(url, headers=headers, timeout=self.timeout, stream=True, allow_redirects=True) - if response.status_code in [200, 206]: + response = requests.get( + url, + headers=headers, + timeout=self.timeout, + stream=True, + allow_redirects=True + ) + + if response.status_code in self.STREAMING_STATUS_CODES: # Try to read a small chunk chunk = next(response.iter_content(1024), None) return chunk is not None and len(chunk) > 0 return False - except: + except (requests.RequestException, requests.Timeout, + requests.ConnectionError): return False def test_http_streaming(self, url): - """Test streaming capability""" + """ + Test streaming capability by reading multiple chunks + + Args: + url: The URL to test + + Returns: + bool: True if multiple chunks can be streamed + """ try: - response = requests.get(url, headers=self.headers, timeout=self.timeout, stream=True, allow_redirects=True) - if response.status_code in [200, 206]: + response = requests.get( + url, + headers=self.headers, + timeout=self.timeout, + stream=True, + allow_redirects=True + ) + + if response.status_code in self.STREAMING_STATUS_CODES: # Try to read multiple chunks chunk_count = 0 for chunk in response.iter_content(chunk_size=8192): @@ -56,47 +121,120 @@ def test_http_streaming(self, url): return True return chunk_count > 0 return False - except: + except (requests.RequestException, requests.Timeout, + requests.ConnectionError): return False def test_socket_connection(self, url): - """Test basic socket connection""" + """ + Test basic socket connection to the host + + Args: + url: The URL to test + + Returns: + bool: True if socket connection succeeds + """ try: parsed = urlparse(url) host = parsed.hostname port = parsed.port or (443 if parsed.scheme == 'https' else 80) - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.settimeout(self.timeout) - result = sock.connect_ex((host, port)) - sock.close() - return result == 0 - except: + if not host: + return False + + # Use context manager for proper resource cleanup + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: + sock.settimeout(self.timeout) + result = sock.connect_ex((host, port)) + return result == 0 + + except (socket.error, socket.timeout, OSError): return False def test_with_ffmpeg(self, url): - """Test using ffmpeg/ffprobe if available""" + """ + Test using ffmpeg/ffprobe if available + + Args: + url: The URL to test + + Returns: + bool: True if ffprobe can read the stream + """ try: - # Check if ffprobe is available result = subprocess.run( ['ffprobe', '-v', 'error', '-show_entries', 'format=duration', '-of', 'default=noprint_wrappers=1:nokey=1', url], capture_output=True, timeout=self.timeout, - text=True + text=True, + check=False ) return result.returncode == 0 - except (subprocess.TimeoutExpired, FileNotFoundError, Exception): + except subprocess.TimeoutExpired: + return False + except FileNotFoundError: + # ffprobe not installed + return False + except Exception as e: + # Catch any other unexpected errors + print(f" Unexpected error in ffprobe: {type(e).__name__}") return False + def _run_test_attempts(self, method_name, test_func, url): + """ + Run multiple test attempts for a single test method + + Args: + method_name: Name of the test method for display + test_func: The test function to call + url: The URL to test + + Returns: + list: List of boolean results for each attempt + """ + print(f"\n{method_name}:") + method_results = [] + + for attempt in range(self.attempts): + print(f" Attempt {attempt + 1}/{self.attempts}...", end=' ', flush=True) + try: + result = test_func(url) + method_results.append(result) + print('✓ PASS' if result else '✗ FAIL') + + # Delay between attempts to avoid rate limiting + time.sleep(2 if result else 3) + + except Exception as e: + print(f"✗ ERROR: {str(e)[:50]}") + method_results.append(False) + time.sleep(3) + + success_rate = (sum(method_results) / len(method_results)) * 100 + print(f" Success rate: {success_rate:.1f}%") + + return method_results + def test_link_comprehensive(self, url, link_number, total_links): - """Perform comprehensive testing on a single link""" - print(f"\n{'='*70}") + """ + Perform comprehensive testing on a single link + + Args: + url: The URL to test + link_number: Current link number being tested + total_links: Total number of links to test + + Returns: + tuple: (is_working, success_percentage) + """ + separator = '=' * 70 + print(f"\n{separator}") print(f"Testing link {link_number}/{total_links}") print(f"URL: {url[:80]}{'...' if len(url) > 80 else ''}") - print(f"{'='*70}") + print(separator) - results = [] test_methods = [ ("HTTP HEAD Request", self.test_http_head), ("HTTP GET Partial", self.test_http_get_partial), @@ -105,110 +243,106 @@ def test_link_comprehensive(self, url, link_number, total_links): ("FFmpeg Probe", self.test_with_ffmpeg) ] - # Perform multiple attempts for each test method + # Collect all test results + all_results = [] for method_name, test_func in test_methods: - print(f"\n{method_name}:") - method_results = [] - - for attempt in range(self.attempts): - print(f" Attempt {attempt + 1}/{self.attempts}...", end=' ', flush=True) - try: - result = test_func(url) - method_results.append(result) - print(f"{'✓ PASS' if result else '✗ FAIL'}") - - if result: - time.sleep(2) # Short delay on success - else: - time.sleep(3) # Slightly longer delay on failure - - except Exception as e: - print(f"✗ ERROR: {str(e)[:50]}") - method_results.append(False) - time.sleep(3) - - success_rate = sum(method_results) / len(method_results) * 100 - print(f" Success rate: {success_rate:.1f}%") - results.extend(method_results) + method_results = self._run_test_attempts(method_name, test_func, url) + all_results.extend(method_results) - # Determine if link is working - total_tests = len(results) - successful_tests = sum(results) + # Calculate overall statistics + total_tests = len(all_results) + successful_tests = sum(all_results) success_percentage = (successful_tests / total_tests) * 100 - print(f"\n{'─'*70}") - print(f"OVERALL RESULT: {successful_tests}/{total_tests} tests passed ({success_percentage:.1f}%)") + # Display results + print(f"\n{'─' * 70}") + print(f"OVERALL RESULT: {successful_tests}/{total_tests} tests passed " + f"({success_percentage:.1f}%)") # If ANY test passed even once, consider it potentially working is_working = successful_tests > 0 - if is_working: - print(f"Status: ✓ WORKING (at least {successful_tests} test(s) succeeded)") - else: - print(f"Status: ✗ BROKEN (all tests failed)") - - print(f"{'='*70}\n") + status = (f"✓ WORKING (at least {successful_tests} test(s) succeeded)" + if is_working else "✗ BROKEN (all tests failed)") + print(f"Status: {status}") + print(f"{separator}\n") return is_working, success_percentage def process_links(self): - """Process all links from input file""" - # Read all links + """Process all links from input file and categorize as working or broken""" + # Read all links from input file try: with open(self.input_file, 'r', encoding='utf-8') as f: links = [line.strip() for line in f if line.strip()] except FileNotFoundError: print(f"Error: {self.input_file} not found!") + print(f"Please create {self.input_file} with one IPTV link per line.") + return + except IOError as e: + print(f"Error reading {self.input_file}: {e}") return if not links: print("No links found in the input file!") return + # Display testing plan + estimated_time = (len(links) * self.attempts * 5 * 3) / 60 print(f"Found {len(links)} links to test") print(f"Each link will be tested {self.attempts} times per method") print(f"Timeout per request: {self.timeout} seconds") - print(f"This will take approximately {len(links) * self.attempts * 5 * 3 / 60:.1f} minutes\n") + print(f"Estimated time: {estimated_time:.1f} minutes\n") working_links = [] broken_links = [] # Test each link for idx, link in enumerate(links, 1): - is_working, success_rate = self.test_link_comprehensive(link, idx, len(links)) + is_working, success_rate = self.test_link_comprehensive( + link, idx, len(links) + ) if is_working: working_links.append(f"{link} # Success rate: {success_rate:.1f}%\n") else: broken_links.append(f"{link} # All tests failed\n") - # Small delay between links + # Delay between links to avoid rate limiting if idx < len(links): print("Waiting 5 seconds before next link...\n") time.sleep(5) - # Write results - with open(self.working_file, 'w', encoding='utf-8') as f: - f.writelines(working_links) - - with open(self.broken_file, 'w', encoding='utf-8') as f: - f.writelines(broken_links) + # Write results to files + try: + with open(self.working_file, 'w', encoding='utf-8') as f: + f.writelines(working_links) + + with open(self.broken_file, 'w', encoding='utf-8') as f: + f.writelines(broken_links) + except IOError as e: + print(f"Error writing output files: {e}") + return - # Summary - print("\n" + "="*70) + # Display summary + separator = '=' * 70 + print(f"\n{separator}") print("TESTING COMPLETE!") - print("="*70) + print(separator) print(f"Total links tested: {len(links)}") print(f"Working links: {len(working_links)} (saved to {self.working_file})") print(f"Broken links: {len(broken_links)} (saved to {self.broken_file})") - print("="*70) + print(separator) + def main(): + """Main entry point for the IPTV link tester""" print("IPTV Link Tester - Comprehensive Edition") - print("="*70) + print("=" * 70) tester = IPTVLinkTester() tester.process_links() + if __name__ == "__main__": main() From ff57359889096f26e4d087dfb2003d04ec86f906 Mon Sep 17 00:00:00 2001 From: Md Abu Salehin Date: Fri, 5 Dec 2025 09:49:16 +0600 Subject: [PATCH 06/11] Refactor exception handling in iptv_tester.py Refactor exception handling in various methods to improve clarity and maintainability. Added specific exception handling for requests and socket operations, while logging unexpected errors. --- M3U_link_scanner/iptv_tester.py | 134 ++++++++++++++++++++++++-------- 1 file changed, 100 insertions(+), 34 deletions(-) diff --git a/M3U_link_scanner/iptv_tester.py b/M3U_link_scanner/iptv_tester.py index 511ee6f480..7585dfc3aa 100644 --- a/M3U_link_scanner/iptv_tester.py +++ b/M3U_link_scanner/iptv_tester.py @@ -58,8 +58,14 @@ def test_http_head(self, url): allow_redirects=True ) return response.status_code in self.SUCCESS_STATUS_CODES - except (requests.RequestException, requests.Timeout, - requests.ConnectionError) as e: + except requests.Timeout: + return False + except requests.ConnectionError: + return False + except requests.RequestException: + return False + except Exception as e: + print(f" Unexpected error in HTTP HEAD: {type(e).__name__}") return False def test_http_get_partial(self, url): @@ -88,8 +94,14 @@ def test_http_get_partial(self, url): chunk = next(response.iter_content(1024), None) return chunk is not None and len(chunk) > 0 return False - except (requests.RequestException, requests.Timeout, - requests.ConnectionError): + except requests.Timeout: + return False + except requests.ConnectionError: + return False + except requests.RequestException: + return False + except Exception as e: + print(f" Unexpected error in HTTP GET Partial: {type(e).__name__}") return False def test_http_streaming(self, url): @@ -121,8 +133,14 @@ def test_http_streaming(self, url): return True return chunk_count > 0 return False - except (requests.RequestException, requests.Timeout, - requests.ConnectionError): + except requests.Timeout: + return False + except requests.ConnectionError: + return False + except requests.RequestException: + return False + except Exception as e: + print(f" Unexpected error in HTTP Streaming: {type(e).__name__}") return False def test_socket_connection(self, url): @@ -148,8 +166,17 @@ def test_socket_connection(self, url): sock.settimeout(self.timeout) result = sock.connect_ex((host, port)) return result == 0 - - except (socket.error, socket.timeout, OSError): + except socket.timeout: + return False + except socket.gaierror: + # DNS resolution failed + return False + except socket.error: + return False + except OSError: + return False + except Exception as e: + print(f" Unexpected error in Socket Connection: {type(e).__name__}") return False def test_with_ffmpeg(self, url): @@ -177,9 +204,11 @@ def test_with_ffmpeg(self, url): except FileNotFoundError: # ffprobe not installed return False + except PermissionError: + # No permission to execute ffprobe + return False except Exception as e: - # Catch any other unexpected errors - print(f" Unexpected error in ffprobe: {type(e).__name__}") + print(f" Unexpected error in FFmpeg: {type(e).__name__}") return False def _run_test_attempts(self, method_name, test_func, url): @@ -206,14 +235,19 @@ def _run_test_attempts(self, method_name, test_func, url): # Delay between attempts to avoid rate limiting time.sleep(2 if result else 3) - + except KeyboardInterrupt: + print("\n\nTesting interrupted by user") + raise except Exception as e: print(f"✗ ERROR: {str(e)[:50]}") method_results.append(False) time.sleep(3) - success_rate = (sum(method_results) / len(method_results)) * 100 - print(f" Success rate: {success_rate:.1f}%") + if method_results: + success_rate = (sum(method_results) / len(method_results)) * 100 + print(f" Success rate: {success_rate:.1f}%") + else: + print(" Success rate: 0.0%") return method_results @@ -252,7 +286,11 @@ def test_link_comprehensive(self, url, link_number, total_links): # Calculate overall statistics total_tests = len(all_results) successful_tests = sum(all_results) - success_percentage = (successful_tests / total_tests) * 100 + + if total_tests > 0: + success_percentage = (successful_tests / total_tests) * 100 + else: + success_percentage = 0.0 # Display results print(f"\n{'─' * 70}") @@ -262,8 +300,11 @@ def test_link_comprehensive(self, url, link_number, total_links): # If ANY test passed even once, consider it potentially working is_working = successful_tests > 0 - status = (f"✓ WORKING (at least {successful_tests} test(s) succeeded)" - if is_working else "✗ BROKEN (all tests failed)") + if is_working: + status = f"✓ WORKING (at least {successful_tests} test(s) succeeded)" + else: + status = "✗ BROKEN (all tests failed)" + print(f"Status: {status}") print(f"{separator}\n") @@ -279,6 +320,12 @@ def process_links(self): print(f"Error: {self.input_file} not found!") print(f"Please create {self.input_file} with one IPTV link per line.") return + except PermissionError: + print(f"Error: Permission denied reading {self.input_file}") + return + except UnicodeDecodeError: + print(f"Error: Unable to decode {self.input_file}. Please ensure it's UTF-8 encoded.") + return except IOError as e: print(f"Error reading {self.input_file}: {e}") return @@ -298,30 +345,43 @@ def process_links(self): broken_links = [] # Test each link - for idx, link in enumerate(links, 1): - is_working, success_rate = self.test_link_comprehensive( - link, idx, len(links) - ) - - if is_working: - working_links.append(f"{link} # Success rate: {success_rate:.1f}%\n") - else: - broken_links.append(f"{link} # All tests failed\n") - - # Delay between links to avoid rate limiting - if idx < len(links): - print("Waiting 5 seconds before next link...\n") - time.sleep(5) + try: + for idx, link in enumerate(links, 1): + is_working, success_rate = self.test_link_comprehensive( + link, idx, len(links) + ) + + if is_working: + working_links.append(f"{link} # Success rate: {success_rate:.1f}%\n") + else: + broken_links.append(f"{link} # All tests failed\n") + + # Delay between links to avoid rate limiting + if idx < len(links): + print("Waiting 5 seconds before next link...\n") + time.sleep(5) + except KeyboardInterrupt: + print("\n\nTesting interrupted by user. Saving partial results...") # Write results to files try: with open(self.working_file, 'w', encoding='utf-8') as f: f.writelines(working_links) - + except PermissionError: + print(f"Error: Permission denied writing to {self.working_file}") + return + except IOError as e: + print(f"Error writing to {self.working_file}: {e}") + return + + try: with open(self.broken_file, 'w', encoding='utf-8') as f: f.writelines(broken_links) + except PermissionError: + print(f"Error: Permission denied writing to {self.broken_file}") + return except IOError as e: - print(f"Error writing output files: {e}") + print(f"Error writing to {self.broken_file}: {e}") return # Display summary @@ -340,8 +400,14 @@ def main(): print("IPTV Link Tester - Comprehensive Edition") print("=" * 70) - tester = IPTVLinkTester() - tester.process_links() + try: + tester = IPTVLinkTester() + tester.process_links() + except KeyboardInterrupt: + print("\n\nProgram terminated by user") + except Exception as e: + print(f"\n\nUnexpected error: {type(e).__name__}: {e}") + print("Please report this issue with the full error message") if __name__ == "__main__": From 3c84549d50f12194566368a080093141aba9067e Mon Sep 17 00:00:00 2001 From: Md Abu Salehin Date: Fri, 5 Dec 2025 10:04:00 +0600 Subject: [PATCH 07/11] Refactor IPTV link tester for clarity and structure Refactor IPTV link tester for improved readability and structure. Added helper methods for reading and writing files, and enhanced error handling. --- M3U_link_scanner/iptv_tester.py | 377 +++++++++++++++++++++----------- 1 file changed, 246 insertions(+), 131 deletions(-) diff --git a/M3U_link_scanner/iptv_tester.py b/M3U_link_scanner/iptv_tester.py index 7585dfc3aa..73cc08e8f5 100644 --- a/M3U_link_scanner/iptv_tester.py +++ b/M3U_link_scanner/iptv_tester.py @@ -3,6 +3,7 @@ Tests M3U/IPTV links using multiple verification methods """ +import shutil import requests import time from urllib.parse import urlparse @@ -12,17 +13,20 @@ class IPTVLinkTester: """Test IPTV/M3U links for availability using multiple methods""" - + # HTTP status codes that indicate a working link SUCCESS_STATUS_CODES = {200, 206, 301, 302, 307, 308} STREAMING_STATUS_CODES = {200, 206} - - def __init__(self, input_file='iptv_links.txt', - working_file='working_links.txt', - broken_file='broken_links.txt'): + + def __init__( + self, + input_file='iptv_links.txt', + working_file='working_links.txt', + broken_file='broken_links.txt' + ): """ Initialize the IPTV link tester - + Args: input_file: Path to file containing IPTV links (one per line) working_file: Path to output file for working links @@ -34,7 +38,10 @@ def __init__(self, input_file='iptv_links.txt', self.timeout = 20 self.attempts = 5 self.headers = { - 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36', + 'User-Agent': ( + 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) ' + 'AppleWebKit/537.36' + ), 'Accept': '*/*', 'Accept-Encoding': 'identity;q=1, *;q=0', 'Range': 'bytes=0-', @@ -43,18 +50,18 @@ def __init__(self, input_file='iptv_links.txt', def test_http_head(self, url): """ Test using HTTP HEAD request - + Args: url: The URL to test - + Returns: bool: True if the link responds with a success status code """ try: response = requests.head( - url, - headers=self.headers, - timeout=self.timeout, + url, + headers=self.headers, + timeout=self.timeout, allow_redirects=True ) return response.status_code in self.SUCCESS_STATUS_CODES @@ -65,16 +72,17 @@ def test_http_head(self, url): except requests.RequestException: return False except Exception as e: - print(f" Unexpected error in HTTP HEAD: {type(e).__name__}") + print(f" Unexpected error in HTTP HEAD: " + f"{type(e).__name__}") return False def test_http_get_partial(self, url): """ Test using HTTP GET with range request for first 1KB - + Args: url: The URL to test - + Returns: bool: True if data can be retrieved """ @@ -82,13 +90,13 @@ def test_http_get_partial(self, url): headers = self.headers.copy() headers['Range'] = 'bytes=0-1024' response = requests.get( - url, - headers=headers, - timeout=self.timeout, - stream=True, + url, + headers=headers, + timeout=self.timeout, + stream=True, allow_redirects=True ) - + if response.status_code in self.STREAMING_STATUS_CODES: # Try to read a small chunk chunk = next(response.iter_content(1024), None) @@ -101,35 +109,37 @@ def test_http_get_partial(self, url): except requests.RequestException: return False except Exception as e: - print(f" Unexpected error in HTTP GET Partial: {type(e).__name__}") + print(f" Unexpected error in HTTP GET Partial: " + f"{type(e).__name__}") return False def test_http_streaming(self, url): """ Test streaming capability by reading multiple chunks - + Args: url: The URL to test - + Returns: bool: True if multiple chunks can be streamed """ try: response = requests.get( - url, - headers=self.headers, - timeout=self.timeout, - stream=True, + url, + headers=self.headers, + timeout=self.timeout, + stream=True, allow_redirects=True ) - + if response.status_code in self.STREAMING_STATUS_CODES: # Try to read multiple chunks chunk_count = 0 for chunk in response.iter_content(chunk_size=8192): if chunk: chunk_count += 1 - if chunk_count >= 3: # Successfully read 3 chunks + if chunk_count >= 3: + # Successfully read 3 chunks return True return chunk_count > 0 return False @@ -140,29 +150,35 @@ def test_http_streaming(self, url): except requests.RequestException: return False except Exception as e: - print(f" Unexpected error in HTTP Streaming: {type(e).__name__}") + print(f" Unexpected error in HTTP Streaming: " + f"{type(e).__name__}") return False def test_socket_connection(self, url): """ Test basic socket connection to the host - + Args: url: The URL to test - + Returns: bool: True if socket connection succeeds """ try: parsed = urlparse(url) host = parsed.hostname - port = parsed.port or (443 if parsed.scheme == 'https' else 80) - + port = parsed.port or ( + 443 if parsed.scheme == 'https' else 80 + ) + if not host: return False - + # Use context manager for proper resource cleanup - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: + with socket.socket( + socket.AF_INET, + socket.SOCK_STREAM + ) as sock: sock.settimeout(self.timeout) result = sock.connect_ex((host, port)) return result == 0 @@ -173,26 +189,35 @@ def test_socket_connection(self, url): return False except socket.error: return False - except OSError: - return False except Exception as e: - print(f" Unexpected error in Socket Connection: {type(e).__name__}") + print(f" Unexpected error in Socket Connection: " + f"{type(e).__name__}") return False def test_with_ffmpeg(self, url): """ Test using ffmpeg/ffprobe if available - + Args: url: The URL to test - + Returns: bool: True if ffprobe can read the stream """ + # Check if ffprobe is available in PATH + ffprobe_path = shutil.which('ffprobe') + if not ffprobe_path: + return False + try: result = subprocess.run( - ['ffprobe', '-v', 'error', '-show_entries', - 'format=duration', '-of', 'default=noprint_wrappers=1:nokey=1', url], + [ + ffprobe_path, + '-v', 'error', + '-show_entries', 'format=duration', + '-of', 'default=noprint_wrappers=1:nokey=1', + url + ], capture_output=True, timeout=self.timeout, text=True, @@ -208,31 +233,34 @@ def test_with_ffmpeg(self, url): # No permission to execute ffprobe return False except Exception as e: - print(f" Unexpected error in FFmpeg: {type(e).__name__}") + print(f" Unexpected error in FFmpeg: " + f"{type(e).__name__}") return False def _run_test_attempts(self, method_name, test_func, url): """ Run multiple test attempts for a single test method - + Args: method_name: Name of the test method for display test_func: The test function to call url: The URL to test - + Returns: list: List of boolean results for each attempt """ print(f"\n{method_name}:") method_results = [] - + for attempt in range(self.attempts): - print(f" Attempt {attempt + 1}/{self.attempts}...", end=' ', flush=True) + attempt_num = attempt + 1 + print(f" Attempt {attempt_num}/{self.attempts}...", + end=' ', flush=True) try: result = test_func(url) method_results.append(result) print('✓ PASS' if result else '✗ FAIL') - + # Delay between attempts to avoid rate limiting time.sleep(2 if result else 3) except KeyboardInterrupt: @@ -242,33 +270,67 @@ def _run_test_attempts(self, method_name, test_func, url): print(f"✗ ERROR: {str(e)[:50]}") method_results.append(False) time.sleep(3) - + if method_results: - success_rate = (sum(method_results) / len(method_results)) * 100 + success_count = sum(method_results) + success_rate = (success_count / len(method_results)) * 100 print(f" Success rate: {success_rate:.1f}%") else: print(" Success rate: 0.0%") - + return method_results + def _display_test_header(self, url, link_number, total_links): + """Display test header information""" + separator = '=' * 70 + print(f"\n{separator}") + print(f"Testing link {link_number}/{total_links}") + truncated_url = url[:80] + if len(url) > 80: + truncated_url += '...' + print(f"URL: {truncated_url}") + print(separator) + + def _display_test_results( + self, + successful_tests, + total_tests, + success_percentage + ): + """Display test results summary""" + separator = '─' * 70 + print(f"\n{separator}") + print(f"OVERALL RESULT: {successful_tests}/{total_tests} " + f"tests passed ({success_percentage:.1f}%)") + + # If ANY test passed even once, consider it potentially working + is_working = successful_tests > 0 + + if is_working: + status = (f"✓ WORKING (at least {successful_tests} " + f"test(s) succeeded)") + else: + status = "✗ BROKEN (all tests failed)" + + print(f"Status: {status}") + print(f"{'=' * 70}\n") + + return is_working + def test_link_comprehensive(self, url, link_number, total_links): """ Perform comprehensive testing on a single link - + Args: url: The URL to test link_number: Current link number being tested total_links: Total number of links to test - + Returns: tuple: (is_working, success_percentage) """ - separator = '=' * 70 - print(f"\n{separator}") - print(f"Testing link {link_number}/{total_links}") - print(f"URL: {url[:80]}{'...' if len(url) > 80 else ''}") - print(separator) - + self._display_test_header(url, link_number, total_links) + test_methods = [ ("HTTP HEAD Request", self.test_http_head), ("HTTP GET Partial", self.test_http_get_partial), @@ -276,130 +338,183 @@ def test_link_comprehensive(self, url, link_number, total_links): ("Socket Connection", self.test_socket_connection), ("FFmpeg Probe", self.test_with_ffmpeg) ] - + # Collect all test results all_results = [] for method_name, test_func in test_methods: - method_results = self._run_test_attempts(method_name, test_func, url) + method_results = self._run_test_attempts( + method_name, + test_func, + url + ) all_results.extend(method_results) - + # Calculate overall statistics total_tests = len(all_results) successful_tests = sum(all_results) - + if total_tests > 0: success_percentage = (successful_tests / total_tests) * 100 else: success_percentage = 0.0 - + # Display results - print(f"\n{'─' * 70}") - print(f"OVERALL RESULT: {successful_tests}/{total_tests} tests passed " - f"({success_percentage:.1f}%)") - - # If ANY test passed even once, consider it potentially working - is_working = successful_tests > 0 - - if is_working: - status = f"✓ WORKING (at least {successful_tests} test(s) succeeded)" - else: - status = "✗ BROKEN (all tests failed)" - - print(f"Status: {status}") - print(f"{separator}\n") - + is_working = self._display_test_results( + successful_tests, + total_tests, + success_percentage + ) + return is_working, success_percentage - def process_links(self): - """Process all links from input file and categorize as working or broken""" - # Read all links from input file + def _read_links_from_file(self): + """ + Read links from input file + + Returns: + list: List of links, or None if error occurred + """ try: with open(self.input_file, 'r', encoding='utf-8') as f: links = [line.strip() for line in f if line.strip()] + return links except FileNotFoundError: print(f"Error: {self.input_file} not found!") - print(f"Please create {self.input_file} with one IPTV link per line.") - return + print(f"Please create {self.input_file} with one IPTV " + f"link per line.") + return None except PermissionError: - print(f"Error: Permission denied reading {self.input_file}") - return + print(f"Error: Permission denied reading " + f"{self.input_file}") + return None except UnicodeDecodeError: - print(f"Error: Unable to decode {self.input_file}. Please ensure it's UTF-8 encoded.") - return + print(f"Error: Unable to decode {self.input_file}. " + f"Please ensure it's UTF-8 encoded.") + return None except IOError as e: print(f"Error reading {self.input_file}: {e}") + return None + + def _write_results_to_file(self, filename, links): + """ + Write results to output file + + Args: + filename: Output filename + links: List of formatted link strings to write + + Returns: + bool: True if successful, False otherwise + """ + try: + with open(filename, 'w', encoding='utf-8') as f: + f.writelines(links) + return True + except PermissionError: + print(f"Error: Permission denied writing to {filename}") + return False + except IOError as e: + print(f"Error writing to {filename}: {e}") + return False + + def _display_testing_plan(self, link_count): + """Display the testing plan information""" + estimated_time = (link_count * self.attempts * 5 * 3) / 60 + print(f"Found {link_count} links to test") + print(f"Each link will be tested {self.attempts} times " + f"per method") + print(f"Timeout per request: {self.timeout} seconds") + print(f"Estimated time: {estimated_time:.1f} minutes\n") + + def _display_final_summary( + self, + total_links, + working_count, + broken_count + ): + """Display final testing summary""" + separator = '=' * 70 + print(f"\n{separator}") + print("TESTING COMPLETE!") + print(separator) + print(f"Total links tested: {total_links}") + print(f"Working links: {working_count} " + f"(saved to {self.working_file})") + print(f"Broken links: {broken_count} " + f"(saved to {self.broken_file})") + print(separator) + + def process_links(self): + """ + Process all links from input file and categorize as + working or broken + """ + # Read all links from input file + links = self._read_links_from_file() + if links is None: return - + if not links: print("No links found in the input file!") return - + # Display testing plan - estimated_time = (len(links) * self.attempts * 5 * 3) / 60 - print(f"Found {len(links)} links to test") - print(f"Each link will be tested {self.attempts} times per method") - print(f"Timeout per request: {self.timeout} seconds") - print(f"Estimated time: {estimated_time:.1f} minutes\n") - + self._display_testing_plan(len(links)) + working_links = [] broken_links = [] - + # Test each link try: for idx, link in enumerate(links, 1): is_working, success_rate = self.test_link_comprehensive( link, idx, len(links) ) - + + result_line = ( + f"{link} # Success rate: {success_rate:.1f}%\n" + ) if is_working: - working_links.append(f"{link} # Success rate: {success_rate:.1f}%\n") + working_links.append(result_line) else: - broken_links.append(f"{link} # All tests failed\n") - + broken_links.append( + f"{link} # All tests failed\n" + ) + # Delay between links to avoid rate limiting if idx < len(links): print("Waiting 5 seconds before next link...\n") time.sleep(5) except KeyboardInterrupt: - print("\n\nTesting interrupted by user. Saving partial results...") - + print("\n\nTesting interrupted by user. " + "Saving partial results...") + # Write results to files - try: - with open(self.working_file, 'w', encoding='utf-8') as f: - f.writelines(working_links) - except PermissionError: - print(f"Error: Permission denied writing to {self.working_file}") - return - except IOError as e: - print(f"Error writing to {self.working_file}: {e}") - return - - try: - with open(self.broken_file, 'w', encoding='utf-8') as f: - f.writelines(broken_links) - except PermissionError: - print(f"Error: Permission denied writing to {self.broken_file}") + if not self._write_results_to_file( + self.working_file, + working_links + ): return - except IOError as e: - print(f"Error writing to {self.broken_file}: {e}") + + if not self._write_results_to_file( + self.broken_file, + broken_links + ): return - + # Display summary - separator = '=' * 70 - print(f"\n{separator}") - print("TESTING COMPLETE!") - print(separator) - print(f"Total links tested: {len(links)}") - print(f"Working links: {len(working_links)} (saved to {self.working_file})") - print(f"Broken links: {len(broken_links)} (saved to {self.broken_file})") - print(separator) + self._display_final_summary( + len(links), + len(working_links), + len(broken_links) + ) def main(): """Main entry point for the IPTV link tester""" print("IPTV Link Tester - Comprehensive Edition") print("=" * 70) - + try: tester = IPTVLinkTester() tester.process_links() From d4126aa9b72f7a7dfa6996bd41cc819de6b40c3e Mon Sep 17 00:00:00 2001 From: Md Abu Salehin Date: Fri, 5 Dec 2025 10:09:30 +0600 Subject: [PATCH 08/11] Change _display_test_header to a static method --- M3U_link_scanner/iptv_tester.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/M3U_link_scanner/iptv_tester.py b/M3U_link_scanner/iptv_tester.py index 73cc08e8f5..df9bee954c 100644 --- a/M3U_link_scanner/iptv_tester.py +++ b/M3U_link_scanner/iptv_tester.py @@ -280,7 +280,8 @@ def _run_test_attempts(self, method_name, test_func, url): return method_results - def _display_test_header(self, url, link_number, total_links): + @staticmethod + def _display_test_header(url, link_number, total_links): """Display test header information""" separator = '=' * 70 print(f"\n{separator}") From 5013d7a018be8233897299e735bf0afa68175a3c Mon Sep 17 00:00:00 2001 From: Md Abu Salehin Date: Fri, 5 Dec 2025 10:16:06 +0600 Subject: [PATCH 09/11] Update iptv_tester.py --- M3U_link_scanner/iptv_tester.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/M3U_link_scanner/iptv_tester.py b/M3U_link_scanner/iptv_tester.py index df9bee954c..81e8de5348 100644 --- a/M3U_link_scanner/iptv_tester.py +++ b/M3U_link_scanner/iptv_tester.py @@ -291,7 +291,8 @@ def _display_test_header(url, link_number, total_links): truncated_url += '...' print(f"URL: {truncated_url}") print(separator) - + + @staticmethod def _display_test_results( self, successful_tests, @@ -367,7 +368,8 @@ def test_link_comprehensive(self, url, link_number, total_links): ) return is_working, success_percentage - + + @staticmethod def _read_links_from_file(self): """ Read links from input file From 371fed29269199b8fa5933b7bbdcea576bc7dc2c Mon Sep 17 00:00:00 2001 From: Md Abu Salehin Date: Fri, 5 Dec 2025 10:25:54 +0600 Subject: [PATCH 10/11] Refactor methods to static in iptv_tester.py --- M3U_link_scanner/iptv_tester.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/M3U_link_scanner/iptv_tester.py b/M3U_link_scanner/iptv_tester.py index 81e8de5348..8580c45252 100644 --- a/M3U_link_scanner/iptv_tester.py +++ b/M3U_link_scanner/iptv_tester.py @@ -291,10 +291,9 @@ def _display_test_header(url, link_number, total_links): truncated_url += '...' print(f"URL: {truncated_url}") print(separator) - + @staticmethod def _display_test_results( - self, successful_tests, total_tests, success_percentage @@ -368,8 +367,7 @@ def test_link_comprehensive(self, url, link_number, total_links): ) return is_working, success_percentage - - @staticmethod + def _read_links_from_file(self): """ Read links from input file @@ -398,7 +396,8 @@ def _read_links_from_file(self): print(f"Error reading {self.input_file}: {e}") return None - def _write_results_to_file(self, filename, links): + @staticmethod + def _write_results_to_file(filename, links): """ Write results to output file From 248edd4f6a0b1e84127f09913614b80387880c47 Mon Sep 17 00:00:00 2001 From: Md Abu Salehin Date: Fri, 5 Dec 2025 10:32:08 +0600 Subject: [PATCH 11/11] Refactor IPTVLinkTester method calls for clarity --- M3U_link_scanner/iptv_tester.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/M3U_link_scanner/iptv_tester.py b/M3U_link_scanner/iptv_tester.py index 8580c45252..22e6250aeb 100644 --- a/M3U_link_scanner/iptv_tester.py +++ b/M3U_link_scanner/iptv_tester.py @@ -309,7 +309,7 @@ def _display_test_results( if is_working: status = (f"✓ WORKING (at least {successful_tests} " - f"test(s) succeeded)") + f"test(s) succeeded)") else: status = "✗ BROKEN (all tests failed)" @@ -330,7 +330,11 @@ def test_link_comprehensive(self, url, link_number, total_links): Returns: tuple: (is_working, success_percentage) """ - self._display_test_header(url, link_number, total_links) + IPTVLinkTester._display_test_header( + url, + link_number, + total_links + ) test_methods = [ ("HTTP HEAD Request", self.test_http_head), @@ -360,7 +364,7 @@ def test_link_comprehensive(self, url, link_number, total_links): success_percentage = 0.0 # Display results - is_working = self._display_test_results( + is_working = IPTVLinkTester._display_test_results( successful_tests, total_tests, success_percentage @@ -492,13 +496,13 @@ def process_links(self): "Saving partial results...") # Write results to files - if not self._write_results_to_file( + if not IPTVLinkTester._write_results_to_file( self.working_file, working_links ): return - if not self._write_results_to_file( + if not IPTVLinkTester._write_results_to_file( self.broken_file, broken_links ):