
Fixes for reading from named pipe#456

Open
kirillgarbar wants to merge 5 commits into yandex:main from kirillgarbar:fix-old-backups-pipe

Conversation

@kirillgarbar kirillgarbar commented Mar 23, 2026

Summary by Sourcery

Bug Fixes:

  • Prevent potential deadlocks when reading backup metadata from a named pipe by replacing a raw thread with a future-based worker and enforcing timeouts on both the subprocess and the reader.
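The pattern described in the fix can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the PR's actual code: `fetch_backup_metadata`, `read_metadata`, and the `TIMEOUT` value are made up for the example; the real worker is `_insert_blobs_from_tar` and the real timeout comes from configuration.

```python
import concurrent.futures
import subprocess

# Rough sketch of the future-based worker described above. The key
# points are that the pipe reader runs as a Future, and that both
# subprocess.communicate and Future.result are bounded by a timeout.

TIMEOUT = 60  # seconds; illustrative, the real value is configured


def read_metadata(pipe_path):
    # Stand-in for the tar-parsing worker. Opening a FIFO for reading
    # blocks until a writer opens the other end, which is why an
    # unbounded join on this thread could deadlock.
    with open(pipe_path, "rb") as pipe:
        return pipe.read()


def fetch_backup_metadata(pipe_path, command):
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
        # Start the reader before launching the writer process.
        future = executor.submit(read_metadata, pipe_path)
        proc = subprocess.Popen(
            command, stdout=subprocess.PIPE, stderr=subprocess.PIPE
        )
        _, stderr = proc.communicate(timeout=TIMEOUT)
        if proc.returncode != 0:
            raise RuntimeError(f"download command failed: {stderr.decode()}")
        try:
            return future.result(timeout=TIMEOUT)
        except concurrent.futures.TimeoutError:
            raise RuntimeError("metadata reading thread is probably locked")
```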

sourcery-ai bot commented Mar 23, 2026

Reviewer's Guide

Refactors the logic that downloads and parses missing cloud storage backups to use a ThreadPoolExecutor-based worker instead of manually managed threads, and ensures subprocess and parsing timeouts are correctly applied when reading from a named pipe.

Sequence diagram for downloading and parsing missing cloud storage backups via named pipe

```mermaid
sequenceDiagram
    participant GenerateBlobs as _generate_blobs_from_tar_files
    participant Executor as ThreadPoolExecutor
    participant Parser as _insert_blobs_from_tar
    participant Subprocess as ch-backup process
    participant Pipe as named_pipe

    GenerateBlobs->>GenerateBlobs: missing_backups = get_missing_chs3_backups(disk_conf.name)
    GenerateBlobs->>Executor: create with max_workers = 1

    loop for each backup in missing_backups
        GenerateBlobs->>GenerateBlobs: read config (pipe_path, timeout)
        GenerateBlobs->>GenerateBlobs: check os.path.exists(pipe_path)
        alt pipe exists
            GenerateBlobs-->>GenerateBlobs: raise RuntimeError
        else pipe does not exist
            GenerateBlobs->>Pipe: create via _missing_backups_named_pipe

            GenerateBlobs->>Executor: submit(_insert_blobs_from_tar, pipe_path)
            Executor-->>GenerateBlobs: Future

            GenerateBlobs->>Subprocess: Popen(ch-backup get-cloud-storage-metadata ... --local-path pipe_path backup)
            Subprocess->>Pipe: write tar metadata stream
            Subprocess->>Subprocess: communicate(timeout=timeout)
            Subprocess-->>GenerateBlobs: (stdout, stderr)

            alt subprocess returncode != 0
                GenerateBlobs-->>GenerateBlobs: raise RuntimeError with stderr
            else subprocess success
                GenerateBlobs->>Executor: wait on Future.result(timeout)
                alt parser completes before timeout
                    Parser->>Pipe: read and parse tar contents
                    Parser-->>Executor: parsing done
                    Executor-->>GenerateBlobs: result
                else parser times out
                    Executor-->>GenerateBlobs: raise TimeoutError
                    GenerateBlobs-->>GenerateBlobs: raise RuntimeError for locked metadata reading
                end
            end

            GenerateBlobs->>Pipe: close and remove via context manager
        end
    end
```

File-Level Changes

Change: Replace manual parsing-thread management with a ThreadPoolExecutor that processes tar data from the named pipe, enforcing proper timeouts for both subprocess execution and parsing.

Details:
  • Wrap processing of missing backups in a ThreadPoolExecutor with max_workers=1 to run the tar parsing function asynchronously for each backup.
  • Replace explicit Thread creation and join calls with executor.submit to start _insert_blobs_from_tar and future.result(timeout) to wait for completion with a timeout.
  • Ensure subprocess.Popen.communicate is called with the configured timeout keyword argument when running ch-backup to download metadata into the named pipe.
  • Preserve and rethrow failures by checking the subprocess return code and raising RuntimeError with stderr details if the download command fails.
  • Replace custom timeout handling logic around the parse thread with catching TimeoutError from the future to raise a clear RuntimeError when metadata reading blocks.

Files: ch_tools/common/commands/object_storage.py
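The `_missing_backups_named_pipe` helper referenced in the diagram presumably wraps FIFO creation and cleanup in a context manager. A minimal sketch of that idea follows; the name `named_pipe` and the exact error handling are assumptions, not the actual implementation.

```python
import contextlib
import os

# Hypothetical sketch of a helper like _missing_backups_named_pipe:
# create a FIFO, hand its path to the caller, and remove it afterwards
# even if downloading or parsing fails.


@contextlib.contextmanager
def named_pipe(pipe_path):
    if os.path.exists(pipe_path):
        # Mirrors the RuntimeError branch in the sequence diagram: a
        # leftover pipe from a previous run is treated as an error.
        raise RuntimeError(f"Named pipe {pipe_path} already exists")
    os.mkfifo(pipe_path)
    try:
        yield pipe_path
    finally:
        os.remove(pipe_path)
```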

@sourcery-ai sourcery-ai bot left a comment

Hey - I've found 2 issues, and left some high level feedback:

  • The future.result(timeout) call raises concurrent.futures.TimeoutError, not the built-in TimeoutError, so the current except TimeoutError: block will never trigger; catch concurrent.futures.TimeoutError explicitly instead.
  • Using a ThreadPoolExecutor(max_workers=1) that lives for the entire loop but only ever runs a single short-lived task per iteration adds complexity compared to the original threading.Thread; consider whether a simple thread with a timeout or a per-iteration executor is clearer for this use case.
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- The `future.result(timeout)` call raises `concurrent.futures.TimeoutError`, not the built-in `TimeoutError`, so the current `except TimeoutError:` block will never trigger; catch `concurrent.futures.TimeoutError` explicitly instead.
- Using a `ThreadPoolExecutor(max_workers=1)` that lives for the entire loop but only ever runs a single short-lived task per iteration adds complexity compared to the original `threading.Thread`; consider whether a simple thread with a timeout or a per-iteration executor is clearer for this use case.

## Individual Comments

### Comment 1
<location path="ch_tools/common/commands/object_storage.py" line_range="548-557" />
<code_context>
+    with ThreadPoolExecutor(max_workers=1) as executor:
</code_context>
<issue_to_address>
**issue (bug_risk):** Potential deadlock when `future.result` times out but executor shutdown waits for the still-running task.

Previously this ran in a daemon thread, so a hung `_insert_blobs_from_tar` wouldn’t block process exit. `ThreadPoolExecutor` workers are non-daemon and the context manager calls `shutdown(wait=True)` by default. If `future.result(timeout=timeout)` times out, the task continues running and the `with ThreadPoolExecutor(...)` exit can block forever. To avoid this, either explicitly use `executor.shutdown(wait=False)` on timeout, manage the executor outside the loop to control shutdown, or revert to a daemon `Thread` so a timeout can’t cause the process to hang.
</issue_to_address>
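One possible mitigation along the lines suggested above, sketched under the assumption that the executor is managed manually rather than via `with` (the name `wait_for_reader` and the error message are illustrative). Note that even `shutdown(wait=False)` does not make the worker a daemon thread, so a truly stuck reader can still delay interpreter exit; it only prevents this call site from blocking.

```python
import concurrent.futures

# Sketch of the suggested fix: avoid the context manager's implicit
# shutdown(wait=True) so a timed-out worker cannot block here forever.


def wait_for_reader(task, timeout):
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
    future = executor.submit(task)
    try:
        result = future.result(timeout=timeout)
    except concurrent.futures.TimeoutError:
        # Abandon the stuck worker: shutdown(wait=False) returns
        # immediately instead of joining the still-running thread.
        executor.shutdown(wait=False)
        raise RuntimeError("metadata reading thread is probably locked")
    executor.shutdown(wait=True)
    return result
```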

### Comment 2
<location path="ch_tools/common/commands/object_storage.py" line_range="587-589" />
<code_context>
-                raise RuntimeError(
-                    "Downloading cloud storage metadata command has failed: Timeout exceeded, metadata reading thread is probably locked"
-                )
+                try:
+                    future.result(timeout)
+                except TimeoutError:
+                    raise RuntimeError(
+                        "Downloading cloud storage metadata command has failed: Timeout exceeded, metadata reading thread is probably locked"
</code_context>
<issue_to_address>
**issue (bug_risk):** Catching the wrong TimeoutError type from `future.result`.

`future.result(timeout=timeout)` raises `concurrent.futures.TimeoutError`, not the built-in `TimeoutError`, so this `except TimeoutError:` block will never run and the timeout will propagate unhandled. Please catch `concurrent.futures.TimeoutError` instead, either via an explicit import alias or by using the fully qualified name.
</issue_to_address>
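A minimal reproduction of the distinction: on Python 3.11+ `concurrent.futures.TimeoutError` became an alias of the built-in `TimeoutError`, but on earlier versions it subclasses `Exception`, so the explicit form below is safe on every version.

```python
import concurrent.futures
import time

# Demonstrates catching the timeout from Future.result explicitly.
# Before Python 3.11, concurrent.futures.TimeoutError is NOT the
# built-in TimeoutError, so a bare `except TimeoutError:` misses it.

timed_out = False
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(time.sleep, 0.5)
    try:
        future.result(timeout=0.05)
    except concurrent.futures.TimeoutError:
        timed_out = True
```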

@aalexfvk

Merge main pls
