Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions ch_tools/chadmin/internal/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import os
import re
import shutil
import threading
from enum import Enum
from itertools import islice
from typing import Any, Iterable, Iterator, Optional
Expand All @@ -19,6 +20,24 @@
DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S"


class RaisingThread(threading.Thread):
    """Thread subclass that propagates exceptions to the joining thread.

    A plain ``threading.Thread`` hands exceptions from ``run()`` to
    ``threading.excepthook`` and the joiner never sees them. This variant
    records the exception instead and re-raises it from ``join()``.
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        super().__init__(*args, **kwargs)
        # Exception captured during run(); re-raised on join().
        self._exc: Optional[BaseException] = None

    def run(self) -> None:
        # Deliberately broad: even SystemExit / KeyboardInterrupt raised in
        # the worker must be observable by the thread that joins us.
        try:
            super().run()
        except BaseException as captured:
            self._exc = captured

    def join(self, timeout: Optional[float] = None) -> None:
        """Wait for the thread, then re-raise any exception it stored."""
        super().join(timeout)
        exc = self._exc
        if exc is not None:
            raise exc


class Scope(str, Enum):
"""
Define a query scope.
Expand Down
20 changes: 16 additions & 4 deletions ch_tools/common/commands/object_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
import re
import subprocess
import tarfile
import threading
import uuid
from collections import defaultdict
from contextlib import contextmanager
Expand Down Expand Up @@ -36,6 +35,7 @@
)
from ch_tools.chadmin.internal.utils import (
DATETIME_FORMAT,
RaisingThread,
Scope,
assert_equal_table_schema_on_cluster,
chunked,
Expand Down Expand Up @@ -525,6 +525,12 @@ def _generate_blobs_from_tar_files() -> Iterator[S3ObjectLocalInfo]:
with open(pipe_path, "rb") as pipe:
with tarfile.open(fileobj=pipe, mode="r|*") as tar:
for member in tar:
# Old backups may have revision.txt file in tar, it should not be parsed
# frozen_metadata.txt should not be uploaded to backup, ignore it just in case
if member.name.endswith("revision.txt") or member.name.endswith(
"frozen_metadata.txt"
):
continue
file = tar.extractfile(member)
if file:
data = file.read().decode("utf-8")
Expand Down Expand Up @@ -568,7 +574,7 @@ def _generate_blobs_from_tar_files() -> Iterator[S3ObjectLocalInfo]:
raise RuntimeError(f"Pipe at {pipe_path} already exists")

with _missing_backups_named_pipe(pipe_path):
parse_thread = threading.Thread(
parse_thread = RaisingThread(
target=_insert_blobs_from_tar, args=(pipe_path,), daemon=True
)
parse_thread.start()
Expand All @@ -588,14 +594,20 @@ def _generate_blobs_from_tar_files() -> Iterator[S3ObjectLocalInfo]:
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
_, stderr = proc.communicate(timeout)
_, stderr = proc.communicate(timeout=timeout)
if proc.returncode:
assert proc.stderr
raise RuntimeError(
f"Downloading cloud storage metadata command has failed: retcode {proc.returncode}, stderr: {stderr.decode('utf-8')}"
)

parse_thread.join(timeout)
try:
parse_thread.join(timeout)
except Exception as e:
raise RuntimeError(
f"Error from parsing cloud storage metadata thread: {e}"
) from e

if parse_thread.is_alive():
raise RuntimeError(
"Downloading cloud storage metadata command has failed: Timeout exceeded, metadata reading thread is probably locked"
Expand Down
Loading