diff --git a/M3U_link_scanner/README.md b/M3U_link_scanner/README.md new file mode 100644 index 0000000000..b0d5fc5796 --- /dev/null +++ b/M3U_link_scanner/README.md @@ -0,0 +1,147 @@ +# IPTV Link Tester - Comprehensive Edition + +A robust Python tool for testing and validating IPTV M3U stream links using multiple verification methods. + +## Features + +- **Multiple Testing Methods**: Uses 5 different approaches to verify stream availability + - HTTP HEAD requests + - HTTP GET with partial content + - HTTP streaming test + - Socket connection verification + - FFmpeg/FFprobe analysis (optional) + +- **Retry Logic**: Tests each link 5 times per method for reliability +- **Detailed Reporting**: Shows real-time progress and success rates +- **Automatic Sorting**: Separates working and broken links into different files +- **Success Rate Tracking**: Records the percentage of successful tests for each link + +## Requirements + +- Python 3.6 or higher +- FFmpeg/FFprobe (optional, but recommended for better accuracy) + +## Installation + +1. Clone or download this repository + +2. Install required Python packages: +```bash +pip install -r requirements.txt +``` + +3. (Optional) Install FFmpeg for enhanced testing: + - **Windows**: Download from [ffmpeg.org](https://ffmpeg.org/download.html) + - **macOS**: `brew install ffmpeg` + - **Linux**: `sudo apt-get install ffmpeg` or `sudo yum install ffmpeg` + +## Usage + +1. Create a file named `iptv_links.txt` in the same directory as the script + +2. Add your IPTV links (one per line): +``` +http://example.com:8080/live/stream1/index.m3u8 +http://example.com:8080/live/stream2/index.m3u8 +http://example.com:8080/live/stream3/index.m3u8 +``` + +3. Run the script: +```bash +python iptv_tester.py +``` + +4. Wait for the testing to complete. The script will: + - Test each link thoroughly with multiple methods + - Show real-time progress and results + - Save working links to `working_links.txt` + - Save broken links to `broken_links.txt` + +## Configuration + +You can customize the testing parameters by modifying the `IPTVLinkTester` class initialization: + +```python +tester = IPTVLinkTester( + input_file='iptv_links.txt', # Input file with links + working_file='working_links.txt', # Output file for working links + broken_file='broken_links.txt' # Output file for broken links +) + +# Modify these attributes: +tester.timeout = 20 # Timeout per request (seconds) +tester.attempts = 5 # Number of attempts per test method +``` + +## Output Files + +### working_links.txt +Contains all links that passed at least one test, along with their success rate: +``` +http://example.com/stream1.m3u8 # Success rate: 85.0% +http://example.com/stream2.m3u8 # Success rate: 92.0% +``` + +### broken_links.txt +Contains links that failed all tests: +``` +http://example.com/dead_stream.m3u8 # All tests failed +``` + +## Testing Process + +For each link, the script performs: +- 5 attempts × 5 test methods = 25 total tests per link +- Automatic delays between attempts to avoid rate limiting +- A link is marked as "working" if ANY test succeeds + +### Test Methods Explained + +1. **HTTP HEAD Request**: Quick check if the server responds +2. **HTTP GET Partial**: Downloads first 1KB to verify data availability +3. **HTTP Streaming**: Attempts to stream multiple chunks of data +4. **Socket Connection**: Verifies basic network connectivity +5. **FFmpeg Probe**: Uses FFmpeg to analyze stream metadata (if available) + +## Estimated Time + +- Testing time depends on: + - Number of links + - Network speed + - Server response times + +- Approximate calculation: `(Number of links × 5 methods × 5 attempts × 3 seconds) / 60 minutes` +- Example: 10 links ≈ 12-15 minutes + +## Troubleshooting + +### All links showing as broken +- Check your internet connection +- Verify the links are valid and currently active +- Some IPTV providers may block automated testing +- Try reducing the number of attempts or increasing timeout + +### Script running very slowly +- This is normal due to thorough testing +- You can reduce `attempts` or `timeout` for faster results (less accurate) +- Some servers may have rate limiting + +### FFmpeg tests always fail +- FFmpeg is not installed or not in PATH +- This is optional; other methods can still validate links + +## Notes + +- Some IPTV providers implement anti-scraping measures +- Links may work for real players but fail automated tests +- False positives/negatives are possible +- Always respect the terms of service of IPTV providers +- This tool is for personal use and legitimate testing only + +## License + +Free to use for personal and educational purposes. + +## Disclaimer + +This tool is intended for testing your own IPTV subscriptions and legally obtained streams. Users are responsible for ensuring they have the right to access and test the streams they provide to this tool. diff --git a/M3U_link_scanner/iptv_tester.py b/M3U_link_scanner/iptv_tester.py new file mode 100644 index 0000000000..22e6250aeb --- /dev/null +++ b/M3U_link_scanner/iptv_tester.py @@ -0,0 +1,535 @@ +""" +IPTV Link Tester - Comprehensive Edition +Tests M3U/IPTV links using multiple verification methods +""" + +import shutil +import requests +import time +from urllib.parse import urlparse +import socket +import subprocess + + +class IPTVLinkTester: + """Test IPTV/M3U links for availability using multiple methods""" + + # HTTP status codes that indicate a working link + SUCCESS_STATUS_CODES = {200, 206, 301, 302, 307, 308} + STREAMING_STATUS_CODES = {200, 206} + + def __init__( + self, + input_file='iptv_links.txt', + working_file='working_links.txt', + broken_file='broken_links.txt' + ): + """ + Initialize the IPTV link tester + + Args: + input_file: Path to file containing IPTV links (one per line) + working_file: Path to output file for working links + broken_file: Path to output file for broken links + """ + self.input_file = input_file + self.working_file = working_file + self.broken_file = broken_file + self.timeout = 20 + self.attempts = 5 + self.headers = { + 'User-Agent': ( + 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) ' + 'AppleWebKit/537.36' + ), + 'Accept': '*/*', + 'Accept-Encoding': 'identity;q=1, *;q=0', + 'Range': 'bytes=0-', + } + + def test_http_head(self, url): + """ + Test using HTTP HEAD request + + Args: + url: The URL to test + + Returns: + bool: True if the link responds with a success status code + """ + try: + response = requests.head( + url, + headers=self.headers, + timeout=self.timeout, + allow_redirects=True + ) + return response.status_code in self.SUCCESS_STATUS_CODES + except requests.Timeout: + return False + except requests.ConnectionError: + return False + except requests.RequestException: + return False + except Exception as e: + print(f" Unexpected error in HTTP HEAD: " + f"{type(e).__name__}") + return False + + def test_http_get_partial(self, url): + """ + Test using HTTP GET with range request for first 1KB + + Args: + url: The URL to test + + Returns: + bool: True if data can be retrieved + """ + try: + headers = self.headers.copy() + headers['Range'] = 'bytes=0-1024' + response = requests.get( + url, + headers=headers, + timeout=self.timeout, + stream=True, + allow_redirects=True + ) + + if response.status_code in self.STREAMING_STATUS_CODES: + # Try to read a small chunk + chunk = next(response.iter_content(1024), None) + return chunk is not None and len(chunk) > 0 + return False + except requests.Timeout: + return False + except requests.ConnectionError: + return False + except requests.RequestException: + return False + except Exception as e: + print(f" Unexpected error in HTTP GET Partial: " + f"{type(e).__name__}") + return False + + def test_http_streaming(self, url): + """ + Test streaming capability by reading multiple chunks + + Args: + url: The URL to test + + Returns: + bool: True if multiple chunks can be streamed + """ + try: + response = requests.get( + url, + headers=self.headers, + timeout=self.timeout, + stream=True, + allow_redirects=True + ) + + if response.status_code in self.STREAMING_STATUS_CODES: + # Try to read multiple chunks + chunk_count = 0 + for chunk in response.iter_content(chunk_size=8192): + if chunk: + chunk_count += 1 + if chunk_count >= 3: + # Successfully read 3 chunks + return True + return chunk_count > 0 + return False + except requests.Timeout: + return False + except requests.ConnectionError: + return False + except requests.RequestException: + return False + except Exception as e: + print(f" Unexpected error in HTTP Streaming: " + f"{type(e).__name__}") + return False + + def test_socket_connection(self, url): + """ + Test basic socket connection to the host + + Args: + url: The URL to test + + Returns: + bool: True if socket connection succeeds + """ + try: + parsed = urlparse(url) + host = parsed.hostname + port = parsed.port or ( + 443 if parsed.scheme == 'https' else 80 + ) + + if not host: + return False + + # Use context manager for proper resource cleanup + with socket.socket( + socket.AF_INET, + socket.SOCK_STREAM + ) as sock: + sock.settimeout(self.timeout) + result = sock.connect_ex((host, port)) + return result == 0 + except socket.timeout: + return False + except socket.gaierror: + # DNS resolution failed + return False + except socket.error: + return False + except Exception as e: + print(f" Unexpected error in Socket Connection: " + f"{type(e).__name__}") + return False + + def test_with_ffmpeg(self, url): + """ + Test using ffmpeg/ffprobe if available + + Args: + url: The URL to test + + Returns: + bool: True if ffprobe can read the stream + """ + # Check if ffprobe is available in PATH + ffprobe_path = shutil.which('ffprobe') + if not ffprobe_path: + return False + + try: + result = subprocess.run( + [ + ffprobe_path, + '-v', 'error', + '-show_entries', 'format=duration', + '-of', 'default=noprint_wrappers=1:nokey=1', + url + ], + capture_output=True, + timeout=self.timeout, + text=True, + check=False + ) + return result.returncode == 0 + except subprocess.TimeoutExpired: + return False + except FileNotFoundError: + # ffprobe not installed + return False + except PermissionError: + # No permission to execute ffprobe + return False + except Exception as e: + print(f" Unexpected error in FFmpeg: " + f"{type(e).__name__}") + return False + + def _run_test_attempts(self, method_name, test_func, url): + """ + Run multiple test attempts for a single test method + + Args: + method_name: Name of the test method for display + test_func: The test function to call + url: The URL to test + + Returns: + list: List of boolean results for each attempt + """ + print(f"\n{method_name}:") + method_results = [] + + for attempt in range(self.attempts): + attempt_num = attempt + 1 + print(f" Attempt {attempt_num}/{self.attempts}...", + end=' ', flush=True) + try: + result = test_func(url) + method_results.append(result) + print('✓ PASS' if result else '✗ FAIL') + + # Delay between attempts to avoid rate limiting + time.sleep(2 if result else 3) + except KeyboardInterrupt: + print("\n\nTesting interrupted by user") + raise + except Exception as e: + print(f"✗ ERROR: {str(e)[:50]}") + method_results.append(False) + time.sleep(3) + + if method_results: + success_count = sum(method_results) + success_rate = (success_count / len(method_results)) * 100 + print(f" Success rate: {success_rate:.1f}%") + else: + print(" Success rate: 0.0%") + + return method_results + + @staticmethod + def _display_test_header(url, link_number, total_links): + """Display test header information""" + separator = '=' * 70 + print(f"\n{separator}") + print(f"Testing link {link_number}/{total_links}") + truncated_url = url[:80] + if len(url) > 80: + truncated_url += '...' + print(f"URL: {truncated_url}") + print(separator) + + @staticmethod + def _display_test_results( + successful_tests, + total_tests, + success_percentage + ): + """Display test results summary""" + separator = '─' * 70 + print(f"\n{separator}") + print(f"OVERALL RESULT: {successful_tests}/{total_tests} " + f"tests passed ({success_percentage:.1f}%)") + + # If ANY test passed even once, consider it potentially working + is_working = successful_tests > 0 + + if is_working: + status = (f"✓ WORKING (at least {successful_tests} " + f"test(s) succeeded)") + else: + status = "✗ BROKEN (all tests failed)" + + print(f"Status: {status}") + print(f"{'=' * 70}\n") + + return is_working + + def test_link_comprehensive(self, url, link_number, total_links): + """ + Perform comprehensive testing on a single link + + Args: + url: The URL to test + link_number: Current link number being tested + total_links: Total number of links to test + + Returns: + tuple: (is_working, success_percentage) + """ + IPTVLinkTester._display_test_header( + url, + link_number, + total_links + ) + + test_methods = [ + ("HTTP HEAD Request", self.test_http_head), + ("HTTP GET Partial", self.test_http_get_partial), + ("HTTP Streaming", self.test_http_streaming), + ("Socket Connection", self.test_socket_connection), + ("FFmpeg Probe", self.test_with_ffmpeg) + ] + + # Collect all test results + all_results = [] + for method_name, test_func in test_methods: + method_results = self._run_test_attempts( + method_name, + test_func, + url + ) + all_results.extend(method_results) + + # Calculate overall statistics + total_tests = len(all_results) + successful_tests = sum(all_results) + + if total_tests > 0: + success_percentage = (successful_tests / total_tests) * 100 + else: + success_percentage = 0.0 + + # Display results + is_working = IPTVLinkTester._display_test_results( + successful_tests, + total_tests, + success_percentage + ) + + return is_working, success_percentage + + def _read_links_from_file(self): + """ + Read links from input file + + Returns: + list: List of links, or None if error occurred + """ + try: + with open(self.input_file, 'r', encoding='utf-8') as f: + links = [line.strip() for line in f if line.strip()] + return links + except FileNotFoundError: + print(f"Error: {self.input_file} not found!") + print(f"Please create {self.input_file} with one IPTV " + f"link per line.") + return None + except PermissionError: + print(f"Error: Permission denied reading " + f"{self.input_file}") + return None + except UnicodeDecodeError: + print(f"Error: Unable to decode {self.input_file}. " + f"Please ensure it's UTF-8 encoded.") + return None + except IOError as e: + print(f"Error reading {self.input_file}: {e}") + return None + + @staticmethod + def _write_results_to_file(filename, links): + """ + Write results to output file + + Args: + filename: Output filename + links: List of formatted link strings to write + + Returns: + bool: True if successful, False otherwise + """ + try: + with open(filename, 'w', encoding='utf-8') as f: + f.writelines(links) + return True + except PermissionError: + print(f"Error: Permission denied writing to {filename}") + return False + except IOError as e: + print(f"Error writing to {filename}: {e}") + return False + + def _display_testing_plan(self, link_count): + """Display the testing plan information""" + estimated_time = (link_count * self.attempts * 5 * 3) / 60 + print(f"Found {link_count} links to test") + print(f"Each link will be tested {self.attempts} times " + f"per method") + print(f"Timeout per request: {self.timeout} seconds") + print(f"Estimated time: {estimated_time:.1f} minutes\n") + + def _display_final_summary( + self, + total_links, + working_count, + broken_count + ): + """Display final testing summary""" + separator = '=' * 70 + print(f"\n{separator}") + print("TESTING COMPLETE!") + print(separator) + print(f"Total links tested: {total_links}") + print(f"Working links: {working_count} " + f"(saved to {self.working_file})") + print(f"Broken links: {broken_count} " + f"(saved to {self.broken_file})") + print(separator) + + def process_links(self): + """ + Process all links from input file and categorize as + working or broken + """ + # Read all links from input file + links = self._read_links_from_file() + if links is None: + return + + if not links: + print("No links found in the input file!") + return + + # Display testing plan + self._display_testing_plan(len(links)) + + working_links = [] + broken_links = [] + + # Test each link + try: + for idx, link in enumerate(links, 1): + is_working, success_rate = self.test_link_comprehensive( + link, idx, len(links) + ) + + result_line = ( + f"{link} # Success rate: {success_rate:.1f}%\n" + ) + if is_working: + working_links.append(result_line) + else: + broken_links.append( + f"{link} # All tests failed\n" + ) + + # Delay between links to avoid rate limiting + if idx < len(links): + print("Waiting 5 seconds before next link...\n") + time.sleep(5) + except KeyboardInterrupt: + print("\n\nTesting interrupted by user. " + "Saving partial results...") + + # Write results to files + if not IPTVLinkTester._write_results_to_file( + self.working_file, + working_links + ): + return + + if not IPTVLinkTester._write_results_to_file( + self.broken_file, + broken_links + ): + return + + # Display summary + self._display_final_summary( + len(links), + len(working_links), + len(broken_links) + ) + + +def main(): + """Main entry point for the IPTV link tester""" + print("IPTV Link Tester - Comprehensive Edition") + print("=" * 70) + + try: + tester = IPTVLinkTester() + tester.process_links() + except KeyboardInterrupt: + print("\n\nProgram terminated by user") + except Exception as e: + print(f"\n\nUnexpected error: {type(e).__name__}: {e}") + print("Please report this issue with the full error message") + + +if __name__ == "__main__": + main() diff --git a/M3U_link_scanner/requirements.txt b/M3U_link_scanner/requirements.txt new file mode 100644 index 0000000000..23b02303ae --- /dev/null +++ b/M3U_link_scanner/requirements.txt @@ -0,0 +1,2 @@ +requests>=2.31.0 +urllib3>=2.0.0