feat: example for ftp secured env #76
**Walkthrough**

Adds a new FTP pull-mode demo script, a pull-mode transcription server module, a requirements file for dependencies, and a comprehensive test suite. The server watches remote storage, downloads audio, simulates Gladia transcription, and uploads JSON results. The demo simulates listing, filtering, and parallel processing of mock audio files.

**Changes**
**Sequence Diagram(s)**

```mermaid
sequenceDiagram
    autonumber
    actor Scheduler as Watcher Loop
    participant Storage as Remote Storage (FTP/SFTP)
    participant Worker as Worker Task
    participant Gladia as Gladia API (simulated)
    participant FS as Local FS
    Scheduler->>Storage: List remote files in REMOTE_DIRECTORY
    Storage-->>Scheduler: Audio files + existing .json
    Scheduler->>Scheduler: Filter unprocessed audio (no matching .json)
    loop For each unprocessed file (<= MAX_PARALLELISM)
        Scheduler->>Worker: Submit remote_audio_path
        Worker->>Storage: Download audio to local temp
        Worker->>Gladia: Upload audio (returns audio_url)
        Gladia-->>Worker: audio_url
        Worker->>Gladia: Request transcription (returns id)
        Gladia-->>Worker: transcription_id
        loop Poll until done (every POLLING_INTERVAL_SECONDS)
            Worker->>Gladia: Poll transcription_id
            Gladia-->>Worker: Pending | Result
        end
        Worker->>FS: Write transcript.json locally
        Worker->>Storage: Upload transcript.json next to audio
        Worker->>FS: Cleanup temp files
        Worker-->>Scheduler: Completion / error
    end
```
```mermaid
sequenceDiagram
    autonumber
    actor User as Demo Main()
    participant MockFTP as Simulated FTP
    participant Exec as ThreadPoolExecutor
    User->>MockFTP: simulate_ftp_server()
    MockFTP-->>User: List of audio + existing .json
    User->>User: Filter audio & unprocessed
    alt Unprocessed exists
        User->>Exec: Submit simulate_processing(audio_file) (up to 2)
        loop Futures complete
            Exec-->>User: Result or Exception
        end
    else None
        User->>User: Print summary only
    end
```
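The "filter unprocessed audio" step in both diagrams boils down to a set comparison; a minimal standalone sketch (the helper name `find_unprocessed` is illustrative, not from the PR):

```python
def find_unprocessed(all_files, supported=(".mp3", ".wav", ".m4a")):
    """Return audio files that have no sibling .json transcript yet."""
    audio_files = {f for f in all_files if f.lower().endswith(supported)}
    json_files = {f for f in all_files if f.lower().endswith(".json")}
    return sorted(
        a for a in audio_files
        if a.rsplit(".", 1)[0] + ".json" not in json_files
    )

listing = ["/in/a.mp3", "/in/a.json", "/in/b.wav", "/in/notes.txt"]
print(find_unprocessed(listing))  # ['/in/b.wav']
```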
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
Pre-merge checks (2 passed, 1 inconclusive)
❌ Failed checks (1 inconclusive)
✅ Passed checks (2 passed)
Tip: 👮 Agentic pre-merge checks are now available in preview! Pro plan users can now enable pre-merge checks in their settings to enforce checklists before merging PRs.

Example:

```yaml
reviews:
  pre_merge_checks:
    custom_checks:
      - name: "Undocumented Breaking Changes"
        mode: "warning"
        instructions: |
          Pass/fail criteria: All breaking changes to public APIs, CLI flags, environment
          variables, configuration keys, database schemas, or HTTP/GraphQL endpoints must be
          documented in the "Breaking Change" section of the PR description and in CHANGELOG.md.
          Exclude purely internal or private changes (e.g., code not exported from package entry
          points or explicitly marked as internal).
```
Actionable comments posted: 7
🧹 Nitpick comments (16)
integrations-examples/ftp/pull_mode/requirements.txt (1)
13-17: Keep test-only deps separate (optional)
Consider a dedicated requirements-test.txt or extras to avoid installing pytest deps in runtime environments.
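A minimal split might look like this (file names follow the common convention; the pytest pin is illustrative):

```
# requirements.txt — runtime only
requests>=2.31.0
paramiko>=3.0.0
ftputil>=5.0.0

# requirements-test.txt — CI / local development
-r requirements.txt
pytest>=7.0.0
```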
integrations-examples/ftp/pull_mode/demo.py (4)
1-1: Shebang vs. permissions
Either remove the shebang or mark the file executable (chmod +x) to avoid EXE001.

```diff
-#!/usr/bin/env python3
```
7-13: Prune unused imports
sys, json, tempfile, shutil are unused.

```diff
-import os
-import sys
+import os
 import time
-import json
-import tempfile
-import shutil
 from concurrent.futures import ThreadPoolExecutor, as_completed
```
44-44: Remove f-prefix from literal strings
These f-strings have no placeholders (ruff F541).

```diff
-    print(f" ⬆️ Uploading to Gladia API...")
+    print(" ⬆️ Uploading to Gladia API...")
-    print(f" 🔄 Starting transcription job...")
+    print(" 🔄 Starting transcription job...")
-    print(f" ✅ Transcription complete!")
+    print(" ✅ Transcription complete!")
-    print(f"\n📊 Analysis:")
+    print("\n📊 Analysis:")
-    print(f"\n🚀 Starting parallel processing with 2 workers...")
+    print("\n🚀 Starting parallel processing with 2 workers...")
-    print(f"\n📈 Summary:")
+    print("\n📈 Summary:")
-    print(f"\n📁 New JSON files created:")
+    print("\n📁 New JSON files created:")
```

Also applies to: 48-48, 58-58, 89-89, 98-98, 120-120, 124-124
110-115: Narrow exception or log with traceback (demo)
Catching bare Exception is fine for a demo, but consider narrowing or adding traceback for easier debugging.

```diff
-    except Exception as e:
-        print(f"\n❌ Error processing {futures[future]}: {e}")
+    except Exception as e:
+        import traceback
+        print(f"\n❌ Error processing {futures[future]}: {e}")
+        traceback.print_exc()
```

integrations-examples/ftp/pull_mode/test_pull_from_ftp.py (5)
6-17: Remove unused imports
time, Future, BytesIO, uuid are unused.

```diff
 import os
 import sys
 import json
 import tempfile
 import shutil
-import time
 from unittest import TestCase, TestLoader, TextTestRunner, mock
 from unittest.mock import Mock, MagicMock, patch, call, mock_open
-from concurrent.futures import Future
-from io import BytesIO
-import uuid
```
43-45: Drop unused test fixture
self.mock_module is never used.

```diff
-        # Create mock for the module
-        self.mock_module = MagicMock()
```
55-58: Ensure fresh import under patched env
Avoid cross-test contamination by re-importing the module after clearing sys.modules.

```diff
         with patch.dict(os.environ, self.env_vars):
-            # Mock the module import to test configuration
-            import pull_from_ftp
+            if 'pull_from_ftp' in sys.modules:
+                del sys.modules['pull_from_ftp']
+            import pull_from_ftp
```
107-123: Use the patched sleep to assert polling behavior
Assert that polling waited (and silence ARG002 for mock_sleep).

```diff
     def test_gladia_poll_for_result(self, mock_sleep):
@@
         result = pull_from_ftp.gladia_poll_for_result("test-id")
@@
         self.assertIn("full_transcript", result["transcription"])
+        self.assertGreaterEqual(mock_sleep.call_count, 1)
```
1-1: Shebang vs. permissions
Same as demo: either remove the shebang or make the file executable.

```diff
-#!/usr/bin/env python3
```

integrations-examples/ftp/pull_mode/pull_from_ftp.py (6)
1-7: Header/file-name mismatch
Top-of-file comment says "transcription_server.py" but the file path is "pull_from_ftp.py". Align to avoid confusion.

```diff
-# transcription_server.py
+# pull_from_ftp.py
```
34-36: Harden SUPPORTED_EXTENSIONS parsing
Avoid empty items and double dots when users set values like ".mp3" or provide empty strings.

```diff
-SUPPORTED_EXTENSIONS = tuple(f".{ext.strip().lower()}" for ext in os.getenv("SUPPORTED_EXTENSIONS", "mp3, wav, m4a").split(','))
+SUPPORTED_EXTENSIONS = tuple(
+    f".{ext.strip().lstrip('.').lower()}"
+    for ext in os.getenv("SUPPORTED_EXTENSIONS", "mp3, wav, m4a").split(',')
+    if ext.strip()
+)
```
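The hardened expression can be exercised on its own; `parse_supported_extensions` is a hypothetical wrapper around the same generator, shown here only to make the behavior testable:

```python
def parse_supported_extensions(raw):
    """Normalize a comma-separated env value into a tuple of '.ext' suffixes."""
    return tuple(
        f".{ext.strip().lstrip('.').lower()}"
        for ext in raw.split(",")
        if ext.strip()
    )

# Tolerates leading dots, stray spaces, empty items, and mixed case:
print(parse_supported_extensions("mp3, .WAV,, m4a"))  # ('.mp3', '.wav', '.m4a')
```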
40-41: Prefer a dedicated configuration error type (and shorter message)
Improves signal and satisfies linters complaining about long exception messages.

```diff
-if not all([GLADIA_API_KEY, STORAGE_HOST]):
-    raise ValueError("Error: GLADIA_API_KEY and STORAGE_HOST must be set as environment variables.")
+if not all([GLADIA_API_KEY, STORAGE_HOST]):
+    raise ConfigError("GLADIA_API_KEY and STORAGE_HOST must be set.")
```

Add once near imports:

```python
class ConfigError(Exception):
    pass
```
158-162: Use POSIX join for remote paths
Avoid OS-dependent separators without manual replace().

```diff
+import posixpath
@@
-    remote_json_path = os.path.join(remote_dir, json_name).replace("\\", "/")
+    remote_json_path = posixpath.join(remote_dir, json_name)
```
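A quick check of why `posixpath` is the right tool here (`ntpath`, the Windows-style joiner, is shown only for contrast; the file name is illustrative):

```python
import ntpath
import posixpath

# posixpath always uses "/" regardless of the host OS, so remote FTP/SFTP
# paths stay correct even when the watcher runs on Windows.
remote_dir = "/data/incoming"
print(posixpath.join(remote_dir, "meeting.json"))  # /data/incoming/meeting.json

# ntpath would introduce a backslash separator instead:
print(ntpath.join(remote_dir, "meeting.json"))     # /data/incoming\meeting.json
```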
184-187: Avoid blind `except Exception`
Catching everything hides actionable errors and trips BLE001. Narrow to expected failures (I/O, network), and log with traceback.

Example (when real libs are used):

```python
except (OSError, TimeoutError, requests.RequestException) as e:
    logger.exception("Processing failed for %s", original_remote_path)
```

Also applies to: 243-245
141-146: Atomic remote writes and idempotency (operational)
When implementing real storage, upload JSON to a temp name then atomic-rename, and consider a ".inprogress"/marker scheme to avoid double-processing and to make the workflow idempotent across restarts.
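A local-filesystem analogue of the temp-name-then-rename idea, relying on `os.replace` being atomic within one filesystem (for SFTP the counterpart would be an upload under a temp name followed by a server-side rename; exact calls depend on the client library):

```python
import json
import os
import tempfile

def save_json_atomically(payload, final_path):
    """Write JSON to a temp file in the target directory, then atomically swap it in."""
    directory = os.path.dirname(final_path) or "."
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".inprogress")
    try:
        with os.fdopen(fd, "w") as f:
            json.dump(payload, f, indent=4)
        os.replace(tmp_path, final_path)  # readers never observe a partial file
    except BaseException:
        # On any failure, don't leave the half-written temp file behind
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise
```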
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
- integrations-examples/ftp/pull_mode/demo.py (1 hunks)
- integrations-examples/ftp/pull_mode/pull_from_ftp.py (1 hunks)
- integrations-examples/ftp/pull_mode/requirements.txt (1 hunks)
- integrations-examples/ftp/pull_mode/test_pull_from_ftp.py (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
integrations-examples/ftp/pull_mode/test_pull_from_ftp.py (1)
integrations-examples/ftp/pull_mode/pull_from_ftp.py (8)
gladia_upload_audio (49-63), gladia_request_transcription (65-78), gladia_poll_for_result (80-114), list_remote_files (119-131), download_file_from_storage (133-139), save_file_to_storage (141-145), process_audio_file (150-186), worker_task (188-204)
🪛 Ruff (0.12.2)
integrations-examples/ftp/pull_mode/test_pull_from_ftp.py
1-1: Shebang is present but file is not executable
(EXE001)
108-108: Unused method argument: mock_sleep
(ARG002)
175-175: Unused method argument: mock_file
(ARG002)
304-304: Unused method argument: mock_download
(ARG002)
304-304: Unused method argument: mock_remove
(ARG002)
319-321: try-except-pass detected, consider logging the exception
(S110)
319-319: Do not catch blind exception: Exception
(BLE001)
integrations-examples/ftp/pull_mode/demo.py
1-1: Shebang is present but file is not executable
(EXE001)
44-44: f-string without any placeholders
Remove extraneous f prefix
(F541)
48-48: f-string without any placeholders
Remove extraneous f prefix
(F541)
58-58: f-string without any placeholders
Remove extraneous f prefix
(F541)
89-89: f-string without any placeholders
Remove extraneous f prefix
(F541)
98-98: f-string without any placeholders
Remove extraneous f prefix
(F541)
113-113: Do not catch blind exception: Exception
(BLE001)
120-120: f-string without any placeholders
Remove extraneous f prefix
(F541)
124-124: f-string without any placeholders
Remove extraneous f prefix
(F541)
integrations-examples/ftp/pull_mode/pull_from_ftp.py
41-41: Avoid specifying long messages outside the exception class
(TRY003)
184-184: Do not catch blind exception: Exception
(BLE001)
243-243: Do not catch blind exception: Exception
(BLE001)
🔇 Additional comments (1)
integrations-examples/ftp/pull_mode/pull_from_ftp.py (1)
206-213: Overall structure looks good for a demo
Clear separation of concerns and a straightforward ThreadPool-based watcher. With the fixes above, this is a solid starting point.
```python
print(f"Polling for result of {transcription_id}...")
while True:
    # headers = {"x-gladia-key": GLADIA_API_KEY}
    # response = requests.get(f"https://api.gladia.io/v2/audio/transcription/{transcription_id}", headers=headers)
    # response.raise_for_status()
    # data = response.json()

    # Pseudocode result for demonstration (simulates waiting):
    if not hasattr(gladia_poll_for_result, "poll_count"):
        gladia_poll_for_result.poll_count = 0
    gladia_poll_for_result.poll_count += 1

    if gladia_poll_for_result.poll_count < 3:
        print(f"-> Status: processing. Waiting {POLLING_INTERVAL_SECONDS}s...")
        time.sleep(POLLING_INTERVAL_SECONDS)
        continue
    else:
        print("-> Status: done. Result received.")
        gladia_poll_for_result.poll_count = 0  # Reset for the next file
        # Dummy final data
        data = {
            "status": "done",
            "result": {
                "transcription": {
                    "full_transcript": "This is the final transcribed text from the audio."
                },
                "metadata": {"audio_url": "https://..."},
            }
        }
        return data["result"]
```
Non-thread-safe shared state in polling (cross-file interference)
Using a function attribute as a counter makes concurrent polls interfere and produce wrong timing. Use local state.
```diff
 def gladia_poll_for_result(transcription_id):
@@
-    while True:
-        # headers = {"x-gladia-key": GLADIA_API_KEY}
-        # response = requests.get(f"https://api.gladia.io/v2/audio/transcription/{transcription_id}", headers=headers)
-        # response.raise_for_status()
-        # data = response.json()
-
-        # Pseudocode result for demonstration (simulates waiting):
-        if not hasattr(gladia_poll_for_result, "poll_count"):
-            gladia_poll_for_result.poll_count = 0
-        gladia_poll_for_result.poll_count += 1
-
-        if gladia_poll_for_result.poll_count < 3:
-            print(f"-> Status: processing. Waiting {POLLING_INTERVAL_SECONDS}s...")
-            time.sleep(POLLING_INTERVAL_SECONDS)
-            continue
-        else:
-            print("-> Status: done. Result received.")
-            gladia_poll_for_result.poll_count = 0  # Reset for the next file
-            # Dummy final data
-            data = {
-                "status": "done",
-                "result": {
-                    "transcription": {
-                        "full_transcript": "This is the final transcribed text from the audio."
-                    },
-                    "metadata": {"audio_url": "https://..."},
-                }
-            }
-            return data["result"]
+    attempts = 0
+    max_attempts = 3  # demo
+    while attempts < max_attempts:
+        print(f"-> Status: processing. Waiting {POLLING_INTERVAL_SECONDS}s...")
+        time.sleep(POLLING_INTERVAL_SECONDS)
+        attempts += 1
+    print("-> Status: done. Result received.")
+    data = {
+        "status": "done",
+        "result": {
+            "transcription": {"full_transcript": "This is the final transcribed text from the audio."},
+            "metadata": {"audio_url": "https://..."},
+        },
+    }
+    return data["result"]
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```python
def gladia_poll_for_result(transcription_id):
    print(f"Polling for result of {transcription_id}...")
    attempts = 0
    max_attempts = 3  # demo
    while attempts < max_attempts:
        print(f"-> Status: processing. Waiting {POLLING_INTERVAL_SECONDS}s...")
        time.sleep(POLLING_INTERVAL_SECONDS)
        attempts += 1
    print("-> Status: done. Result received.")
    data = {
        "status": "done",
        "result": {
            "transcription": {"full_transcript": "This is the final transcribed text from the audio."},
            "metadata": {"audio_url": "https://..."},
        },
    }
    return data["result"]
```
🤖 Prompt for AI Agents
integrations-examples/ftp/pull_mode/pull_from_ftp.py around lines 85-115: the
polling routine uses a function attribute (gladia_poll_for_result.poll_count) as
shared mutable state which is not thread-safe and can cause cross-file or
concurrent poll interference; replace this with a local counter (or
timestamp/loop variable) defined inside the function before the while loop,
increment that local variable on each iteration, use it to decide when to stop
polling and remove any global/function-attribute resets—this ensures each poll
invocation has independent state and is safe for concurrent use.
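A quick way to convince yourself that local counters are concurrency-safe (a simplified model of the poll loop with the sleeps and network calls removed):

```python
from concurrent.futures import ThreadPoolExecutor

def poll_simulated(max_attempts=3):
    attempts = 0  # local to this call: concurrent polls cannot interfere
    while attempts < max_attempts:
        attempts += 1
    return attempts

with ThreadPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(lambda _: poll_simulated(), range(8)))

print(results)  # every call completes its full quota: [3, 3, 3, 3, 3, 3, 3, 3]
```

With the function-attribute counter, concurrent calls would share and race on `poll_count`; with a local variable each invocation counts independently.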
```python
def download_file_from_storage(remote_path, local_path):
    """Downloads a single file from the remote server to a local path."""
    print(f"Downloading {remote_path} to {local_path}...")
    # Real implementation would use sftp.get() or ftp.retrbinary()
    with open(local_path, "w") as f:
        f.write("dummy audio data")  # Simulate file content
    print("-> Download complete.")
```
🛠️ Refactor suggestion
Write audio as binary; ensure parent directory exists
Prevents encoding issues and file-not-found on nested temp paths.
```diff
 def download_file_from_storage(remote_path, local_path):
@@
-    # Real implementation would use sftp.get() or ftp.retrbinary()
-    with open(local_path, "w") as f:
-        f.write("dummy audio data")  # Simulate file content
+    # Real implementation would use sftp.get() or ftp.retrbinary()
+    os.makedirs(os.path.dirname(local_path), exist_ok=True)
+    with open(local_path, "wb") as f:
+        f.write(b"dummy audio data")  # Simulate file content
```

Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In integrations-examples/ftp/pull_mode/pull_from_ftp.py around lines 133 to 140,
the helper writes simulated audio as text and assumes the parent directory
exists; change it to ensure the parent directory for local_path exists (use
os.path.dirname and os.makedirs(..., exist_ok=True) if non-empty) and open the
file in binary mode ("wb") writing bytes (e.g., b"dummy audio data") so nested
temp paths don't fail and no encoding issues occur.
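The suggested fix can be verified with a throwaway directory; the nested path below is only for illustration:

```python
import os
import tempfile

def download_file_from_storage(remote_path, local_path):
    """Simulated download: create parent dirs and write bytes, per the suggestion."""
    os.makedirs(os.path.dirname(local_path) or ".", exist_ok=True)
    with open(local_path, "wb") as f:
        f.write(b"dummy audio data")

base = tempfile.mkdtemp()
target = os.path.join(base, "nested", "temp", "audio.mp3")
download_file_from_storage("/remote/audio.mp3", target)
print(os.path.getsize(target))  # 16 bytes, written with no encoding surprises
```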
```python
    try:
        # 1. Upload the local audio copy to Gladia and start transcription
        audio_url = gladia_upload_audio(local_temp_path)
        transcription_id = gladia_request_transcription(audio_url)

        # 2. Poll until the final result is ready
        final_result = gladia_poll_for_result(transcription_id)

        # 3. Save the transcription JSON to a temporary local file
        temp_json_path = f"{local_temp_path}.json"
        with open(temp_json_path, "w") as f:
            json.dump(final_result, f, indent=4)

        # 4. Upload the JSON to the same remote directory as the audio
        save_file_to_storage(temp_json_path, remote_json_path)

        # 5. Cleanup local temp JSON file
        os.remove(temp_json_path)
        print(f"--- [Thread] Successfully processed and archived JSON for {base_name} ---")
        print(f"    JSON -> {remote_json_path}")

    except Exception as e:
        print(f"!!! [Thread] CRITICAL ERROR processing {original_remote_path}: {e}")
        # Add logic here for error handling (e.g., move to a failed directory)
```
🛠️ Refactor suggestion
Ensure local JSON is always cleaned up
If upload fails, the temp JSON currently leaks; add a finally cleanup and compute temp path outside try.
```diff
-    try:
+    temp_json_path = f"{local_temp_path}.json"
+    try:
         # 1. Upload the local audio copy to Gladia and start transcription
         audio_url = gladia_upload_audio(local_temp_path)
         transcription_id = gladia_request_transcription(audio_url)
@@
-        # 3. Save the transcription JSON to a temporary local file
-        temp_json_path = f"{local_temp_path}.json"
+        # 3. Save the transcription JSON to a temporary local file
         with open(temp_json_path, "w") as f:
             json.dump(final_result, f, indent=4)
@@
-        # 5. Cleanup local temp JSON file
-        os.remove(temp_json_path)
         print(f"--- [Thread] Successfully processed and archived JSON for {base_name} ---")
         print(f"    JSON -> {remote_json_path}")
     except Exception as e:
         print(f"!!! [Thread] CRITICAL ERROR processing {original_remote_path}: {e}")
         # Add logic here for error handling (e.g., move to a failed directory)
+    finally:
+        # Always attempt to remove the local temp JSON
+        if os.path.exists(temp_json_path):
+            try:
+                os.remove(temp_json_path)
+            except OSError:
+                pass
```

📝 Committable suggestion
```python
    # Precompute temp JSON path so we can always clean it up
    temp_json_path = f"{local_temp_path}.json"
    try:
        # 1. Upload the local audio copy to Gladia and start transcription
        audio_url = gladia_upload_audio(local_temp_path)
        transcription_id = gladia_request_transcription(audio_url)

        # 2. Poll until the final result is ready
        final_result = gladia_poll_for_result(transcription_id)

        # 3. Save the transcription JSON to a temporary local file
        with open(temp_json_path, "w") as f:
            json.dump(final_result, f, indent=4)

        # 4. Upload the JSON to the same remote directory as the audio
        save_file_to_storage(temp_json_path, remote_json_path)

        print(f"--- [Thread] Successfully processed and archived JSON for {base_name} ---")
        print(f"    JSON -> {remote_json_path}")
    except Exception as e:
        print(f"!!! [Thread] CRITICAL ERROR processing {original_remote_path}: {e}")
        # Add logic here for error handling (e.g., move to a failed directory)
    finally:
        # Always attempt to remove the local temp JSON
        if os.path.exists(temp_json_path):
            try:
                os.remove(temp_json_path)
            except OSError:
                pass
```
🧰 Tools
🪛 Ruff (0.12.2)
184-184: Do not catch blind exception: Exception
(BLE001)
🤖 Prompt for AI Agents
In integrations-examples/ftp/pull_mode/pull_from_ftp.py around lines 163 to 187,
the temporary JSON file is created inside the try block and not guaranteed to be
removed on upload failure which leaks temp files; compute temp_json_path before
entering the try, move creation/writing/upload steps into the try, and add a
finally block that checks for existence of temp_json_path and attempts to remove
it (safely catching and logging any OSError) so the temp file is always cleaned
up regardless of success or failure.
```python
    temp_dir = "temp_downloads"
    # Create a unique local filename to prevent collisions between threads
    unique_filename = f"{uuid.uuid4()}-{os.path.basename(remote_audio_path)}"
    local_temp_path = os.path.join(temp_dir, unique_filename)
```
🛠️ Refactor suggestion
Create temp directory in worker too
Makes the worker robust when used standalone (e.g., in tests).
```diff
 def worker_task(remote_audio_path):
@@
-    temp_dir = "temp_downloads"
+    temp_dir = "temp_downloads"
+    os.makedirs(temp_dir, exist_ok=True)
```

📝 Committable suggestion
```python
    temp_dir = "temp_downloads"
    os.makedirs(temp_dir, exist_ok=True)
    # Create a unique local filename to prevent collisions between threads
    unique_filename = f"{uuid.uuid4()}-{os.path.basename(remote_audio_path)}"
    local_temp_path = os.path.join(temp_dir, unique_filename)
```
🤖 Prompt for AI Agents
In integrations-examples/ftp/pull_mode/pull_from_ftp.py around lines 192 to 196,
the code assumes the temp_dir "temp_downloads" exists before writing files; make
the worker robust by ensuring the directory is created when running standalone
(e.g., tests). Add a call to create the directory before constructing/writing
the local_temp_path (use os.makedirs(temp_dir, exist_ok=True) or equivalent) so
the worker won’t fail if the folder is missing.
```python
    with ThreadPoolExecutor(max_workers=MAX_PARALLELISM) as executor:
        # This loop runs indefinitely to check for new files.
        while True:
            try:
                print(f"\nChecking remote path '{REMOTE_DIRECTORY}' for new files...")
                all_files = list_remote_files(REMOTE_DIRECTORY)

                audio_files = {f for f in all_files if f.lower().endswith(SUPPORTED_EXTENSIONS)}
                json_files = {f for f in all_files if f.lower().endswith('.json')}

                unprocessed_audio = []
                for audio_path in audio_files:
                    expected_json_path = os.path.splitext(audio_path)[0] + ".json"
                    if expected_json_path not in json_files:
                        unprocessed_audio.append(audio_path)

                if not unprocessed_audio:
                    print("No new audio files to process.")
                else:
                    for remote_path in unprocessed_audio:
                        if remote_path not in processing_files:
                            print(f"Found new file, submitting to processing queue: {remote_path}")
                            processing_files.add(remote_path)
                            future = executor.submit(worker_task, remote_path)
                            # When the task is done (successfully or not), remove it from the set.
                            future.add_done_callback(lambda f: processing_files.remove(f.result()))
```
Don't call f.result() in callback; also guard against duplicates
Calling result() re-raises task exceptions and prevents cleanup; discard by captured path instead. This also avoids KeyError on remove().
```diff
-    future.add_done_callback(lambda f: processing_files.remove(f.result()))
+    future.add_done_callback(lambda f, p=remote_path: processing_files.discard(p))
```

Optional: if you want to be extra safe about concurrent set mutations, wrap add/discard in a lock. I can provide a small wrapper if needed.
📝 Committable suggestion
```python
    with ThreadPoolExecutor(max_workers=MAX_PARALLELISM) as executor:
        # This loop runs indefinitely to check for new files.
        while True:
            try:
                print(f"\nChecking remote path '{REMOTE_DIRECTORY}' for new files...")
                all_files = list_remote_files(REMOTE_DIRECTORY)

                audio_files = {f for f in all_files if f.lower().endswith(SUPPORTED_EXTENSIONS)}
                json_files = {f for f in all_files if f.lower().endswith('.json')}

                unprocessed_audio = []
                for audio_path in audio_files:
                    expected_json_path = os.path.splitext(audio_path)[0] + ".json"
                    if expected_json_path not in json_files:
                        unprocessed_audio.append(audio_path)

                if not unprocessed_audio:
                    print("No new audio files to process.")
                else:
                    for remote_path in unprocessed_audio:
                        if remote_path not in processing_files:
                            print(f"Found new file, submitting to processing queue: {remote_path}")
                            processing_files.add(remote_path)
                            future = executor.submit(worker_task, remote_path)
                            # When the task is done (successfully or not), remove it from the set.
                            future.add_done_callback(lambda f, p=remote_path: processing_files.discard(p))
```
🤖 Prompt for AI Agents
In integrations-examples/ftp/pull_mode/pull_from_ftp.py around lines 216 to 242,
the done-callback calls f.result() which will re-raise exceptions and block
cleanup and uses remove() which can KeyError; change the callback to capture the
remote_path when submitting the task (e.g. bind it via a lambda param or
functools.partial) and in the callback discard that captured path from
processing_files (use set.discard to avoid KeyError). Optionally wrap
processing_files.add/discard in a threading.Lock if you want stronger protection
against concurrent mutations.
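If the lock-wrapper route is taken, a minimal sketch could look like this (`ConcurrentSet` is a hypothetical name, not part of the PR):

```python
import threading

class ConcurrentSet:
    """Minimal thread-safe wrapper around a set."""

    def __init__(self):
        self._items = set()
        self._lock = threading.Lock()

    def add(self, item):
        with self._lock:
            self._items.add(item)

    def discard(self, item):
        # discard (unlike remove) never raises KeyError
        with self._lock:
            self._items.discard(item)

    def __contains__(self, item):
        with self._lock:
            return item in self._items
```

With such a wrapper, `processing_files.add` in the watcher and `discard` in the done-callback stay safe even if the set is touched from multiple threads.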
```
paramiko>=3.0.0   # For SFTP support
pysftp>=0.2.9     # Alternative SFTP client
ftputil>=5.0.0    # For FTP support
```
🛠️ Refactor suggestion
Drop pysftp (unmaintained); rely on Paramiko SFTP only
pysftp has long been unmaintained and is a frequent source of security/compat issues. Paramiko already provides solid SFTP; keeping both increases surface area with no gain.
Apply:
```diff
 requests>=2.31.0  # For Gladia API interactions
 paramiko>=3.0.0   # For SFTP support
-pysftp>=0.2.9     # Alternative SFTP client
 ftputil>=5.0.0    # For FTP support
```

📝 Committable suggestion
```
requests>=2.31.0  # For Gladia API interactions
paramiko>=3.0.0   # For SFTP support
ftputil>=5.0.0    # For FTP support
```
🤖 Prompt for AI Agents
In integrations-examples/ftp/pull_mode/requirements.txt around lines 3 to 5,
remove the unmaintained pysftp dependency and its comment so the file relies on
paramiko for SFTP support only; update or keep the remaining comments as needed
(e.g., keep paramiko>=3.0.0 for SFTP and ftputil>=5.0.0 for FTP) and ensure no
other references to pysftp remain in the example code or docs.
```python
    @patch('os.remove')
    @patch('pull_from_ftp.process_audio_file')
    @patch('pull_from_ftp.download_file_from_storage')
    def test_error_handling_in_worker(self, mock_download, mock_process, mock_remove):
        """Test error handling in the worker task."""
        import pull_from_ftp

        # Make processing raise an exception
        mock_process.side_effect = Exception("Processing failed")

        remote_path = "/remote/test.mp3"

        # Worker task should still complete even with exception
        # The finally block ensures cleanup happens
        try:
            result = pull_from_ftp.worker_task(remote_path)
            # Should still return the path even on error
            self.assertEqual(result, remote_path)
        except Exception:
            # The pseudocode doesn't handle the exception, so it might propagate
            pass
```
Make error-path test deterministic; assert cleanup and raise contract
As written, the test passes whether or not an exception is raised — the try/except/pass swallows everything. Pick a contract; assuming worker_task propagates errors, assert that it raises and that cleanup ran.
- @patch('os.remove')
- @patch('pull_from_ftp.process_audio_file')
- @patch('pull_from_ftp.download_file_from_storage')
- def test_error_handling_in_worker(self, mock_download, mock_process, mock_remove):
+ @patch('os.path.exists')
+ @patch('os.remove')
+ @patch('pull_from_ftp.process_audio_file')
+ @patch('pull_from_ftp.download_file_from_storage')
+ def test_error_handling_in_worker(self, mock_download, mock_process, mock_remove, mock_exists):
      """Test error handling in the worker task."""
      import pull_from_ftp
@@
-     # Worker task should still complete even with exception
-     # The finally block ensures cleanup happens
-     try:
-         result = pull_from_ftp.worker_task(remote_path)
-         # Should still return the path even on error
-         self.assertEqual(result, remote_path)
-     except Exception:
-         # The pseudocode doesn't handle the exception, so it might propagate
-         pass
+     # Ensure cleanup branch is taken
+     mock_exists.return_value = True
+     with self.assertRaises(Exception):
+         pull_from_ftp.worker_task(remote_path)
+     mock_remove.assert_called()

If you prefer worker_task to swallow errors and return the path, I can flip the assertion and provide a matching worker_task change.
@patch('os.path.exists')
@patch('os.remove')
@patch('pull_from_ftp.process_audio_file')
@patch('pull_from_ftp.download_file_from_storage')
def test_error_handling_in_worker(self, mock_download, mock_process, mock_remove, mock_exists):
    """Test error handling in the worker task."""
    import pull_from_ftp

    # Make processing raise an exception
    mock_process.side_effect = Exception("Processing failed")

    remote_path = "/remote/test.mp3"

    # Ensure cleanup branch is taken
    mock_exists.return_value = True
    with self.assertRaises(Exception):
        pull_from_ftp.worker_task(remote_path)
    mock_remove.assert_called()
🧰 Tools
🪛 Ruff (0.12.2)
304-304: Unused method argument: mock_download (ARG002)
304-304: Unused method argument: mock_remove (ARG002)
319-321: try-except-pass detected, consider logging the exception (S110)
319-319: Do not catch blind exception: Exception (BLE001)
🤖 Prompt for AI Agents
integrations-examples/ftp/pull_mode/test_pull_from_ftp.py around lines 301-321:
the test is non-deterministic because it swallows exceptions with
try/except/pass; decide the contract and make assertions deterministic—if
worker_task should propagate errors, replace the try/except with an assertion
that calling worker_task raises the expected Exception (use assertRaises or
pytest.raises) and assert that cleanup functions were called (e.g.,
mock_remove.assert_called_with(remote_path) and any download cleanup invoked);
alternatively, if worker_task should swallow errors and return the path, change
the test to assert the return value equals remote_path and assert cleanup calls
occurred, or update worker_task to catch exceptions, perform cleanup, and return
the path to match the test.
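Assuming the propagate-errors contract, the worker that satisfies the deterministic test looks roughly like the sketch below. This is illustrative, not the module's actual code: the real worker_task hard-codes its download and processing helpers, which are injected here as parameters only so the cleanup behavior is easy to exercise.

```python
import os
import tempfile


def worker_task(remote_audio_path, download, process):
    """Download, process, and always clean up the temp file; errors propagate."""
    local_path = os.path.join(tempfile.gettempdir(),
                              os.path.basename(remote_audio_path))
    download(remote_audio_path, local_path)
    try:
        process(local_path)
        return remote_audio_path
    finally:
        # Cleanup runs whether process() returned or raised; the exception,
        # if any, still propagates to the caller after this block.
        if os.path.exists(local_path):
            os.remove(local_path)
```

With this shape, `assertRaises` plus `mock_remove.assert_called()` pins down both halves of the contract: the error surfaces to the scheduler, and no temp file is left behind.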
Summary by CodeRabbit