
Sidecar supports MarkDoneAll/UnlinkAll and retries on failed requests #198

Draft
yizhuoliang wants to merge 1 commit into master from sidecar-effcnt-rlybl

Conversation

@yizhuoliang
Collaborator

NOTE: Only examples/mllm.py has been tested so far; more tests are needed.

@gemini-code-assist

Summary of Changes

Hello @yizhuoliang, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly enhances the sidecar service by introducing batch operations for marking data as done and unlinking shared memory, streamlining resource management. A critical improvement is the integration of a comprehensive gRPC retry mechanism with exponential backoff across various client and server interactions, making the system more resilient to transient communication failures. These changes aim to improve both efficiency and reliability of the sidecar's data handling capabilities.

Highlights

  • New Batch Operations for Sidecar: Introduced MarkDoneAll and UnlinkAll RPCs in the sidecar protocol, allowing for efficient batch processing of data cleanup and shared memory unlinking for a given data ID.
  • gRPC Retry Mechanism: Implemented a robust gRPC retry mechanism with exponential backoff for various client and server-side calls across the sidecar service, enhancing resilience against transient network issues or temporary service unavailability.
  • Automated Resource Cleanup: Integrated the new mark_done_all functionality into the geri/engine/client.py to automatically free all associated sidecar buffers after streaming generation, improving resource management.
  • Improved aiohttp Response Handling: Refined the aiohttp client response closing logic in omni.py to prevent errors when the response might already be closed.
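The retry mechanism described above follows a standard exponential backoff schedule. As a minimal sketch, here is how the per-attempt delay grows; the constant values below are hypothetical placeholders for illustration, since the PR defines its own `GRPC_RETRY_*` values in the sidecar code:

```python
# Hypothetical constants for illustration only; the PR's actual values
# are defined in the sidecar source.
GRPC_RETRY_MAX_ATTEMPTS = 5
GRPC_RETRY_INITIAL_BACKOFF_SECONDS = 0.1
GRPC_RETRY_BACKOFF_MULTIPLIER = 2.0

# Delay waited before each retry; the final attempt either succeeds
# or re-raises, so no delay follows it.
delays = [
    GRPC_RETRY_INITIAL_BACKOFF_SECONDS * GRPC_RETRY_BACKOFF_MULTIPLIER**attempt
    for attempt in range(GRPC_RETRY_MAX_ATTEMPTS - 1)
]
print(delays)  # [0.1, 0.2, 0.4, 0.8]
```

With a multiplier of 2.0, each transient failure doubles the wait, which keeps the first retry fast while backing off quickly under sustained unavailability.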




@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request introduces MarkDoneAll and UnlinkAll RPCs to the sidecar service, which is a great addition for cleaning up all chunks of a request at once. It also adds retry logic with exponential backoff for gRPC requests, which improves the resilience of the system.

My main feedback is about the implementation of the retry logic:

  1. Code Duplication: The retry logic is duplicated in many places across multiple files for both synchronous and asynchronous calls. This makes the code difficult to maintain and prone to inconsistencies. I've left a detailed comment with a suggestion to refactor this into a reusable helper function.
  2. Bug in Synchronous Retries: Several of the new synchronous retry loops are missing a call to time.sleep(). This will cause them to busy-wait and spin on the CPU during transient failures, which should be fixed. I've marked these as critical.

Once these points are addressed, the PR will be in great shape.

Comment on lines +132 to +138

```python
logger.warning(
    "Register retry %d for sidecar rank %d due to %s",
    attempt + 1,
    self.sidecar_rank,
    code.name,
)
continue
```

critical

The retry logic for this synchronous gRPC call is missing a sleep with exponential backoff. Without it, the loop will spin without waiting, causing high CPU usage on transient failures. You should add time.sleep(backoff_delay) and also log the backoff duration.

Suggested change

```diff
-logger.warning(
-    "Register retry %d for sidecar rank %d due to %s",
-    attempt + 1,
-    self.sidecar_rank,
-    code.name,
-)
-continue
+backoff_delay = GRPC_RETRY_INITIAL_BACKOFF_SECONDS * (GRPC_RETRY_BACKOFF_MULTIPLIER**attempt)
+logger.warning(
+    "Register retry %d for sidecar rank %d due to %s, waiting %.3fs",
+    attempt + 1,
+    self.sidecar_rank,
+    code.name,
+    backoff_delay,
+)
+time.sleep(backoff_delay)
+continue
```

Comment on lines +295 to +303

```python
logger.warning(
    "Send retry %d for shard %d chunk %d in req %s due to %s",
    attempt + 1,
    self.shard_rank,
    chunk_id,
    id,
    code.name,
)
continue
```

critical

The retry logic for this synchronous gRPC call is missing a sleep with exponential backoff. Without it, the loop will spin without waiting, causing high CPU usage on transient failures. You should add time.sleep(backoff_delay) and also log the backoff duration.

Suggested change

```diff
-logger.warning(
-    "Send retry %d for shard %d chunk %d in req %s due to %s",
-    attempt + 1,
-    self.shard_rank,
-    chunk_id,
-    id,
-    code.name,
-)
-continue
+backoff_delay = GRPC_RETRY_INITIAL_BACKOFF_SECONDS * (GRPC_RETRY_BACKOFF_MULTIPLIER**attempt)
+logger.warning(
+    "Send retry %d for shard %d chunk %d in req %s due to %s, waiting %.3fs",
+    attempt + 1,
+    self.shard_rank,
+    chunk_id,
+    id,
+    code.name,
+    backoff_delay,
+)
+time.sleep(backoff_delay)
+continue
```

Comment on lines +378 to +384

```python
logger.warning(
    "CloseStream retry %d for stream %s due to %s",
    attempt + 1,
    id,
    code.name,
)
continue
```

critical

The retry logic for this synchronous gRPC call is missing a sleep with exponential backoff. Without it, the loop will spin without waiting, causing high CPU usage on transient failures. You should add time.sleep(backoff_delay) and also log the backoff duration.

Suggested change

```diff
-logger.warning(
-    "CloseStream retry %d for stream %s due to %s",
-    attempt + 1,
-    id,
-    code.name,
-)
-continue
+backoff_delay = GRPC_RETRY_INITIAL_BACKOFF_SECONDS * (GRPC_RETRY_BACKOFF_MULTIPLIER**attempt)
+logger.warning(
+    "CloseStream retry %d for stream %s due to %s, waiting %.3fs",
+    attempt + 1,
+    id,
+    code.name,
+    backoff_delay,
+)
+time.sleep(backoff_delay)
+continue
```

Comment on lines +402 to +430

```python
res = None
for attempt in range(GRPC_RETRY_MAX_ATTEMPTS):
    try:
        res = await stub.Unlink(unlink_req)
        break
    except grpc.RpcError as e:
        code = e.code()
        if (
            code in (grpc.StatusCode.CANCELLED, grpc.StatusCode.UNAVAILABLE)
            and attempt < GRPC_RETRY_MAX_ATTEMPTS - 1
        ):
            backoff_delay = GRPC_RETRY_INITIAL_BACKOFF_SECONDS * (GRPC_RETRY_BACKOFF_MULTIPLIER**attempt)
            logger.warning(
                "Unlink retry %d for req %s chunk %d in rank %d due to %s, waiting %.3fs",
                attempt + 1,
                mark_done_req.id,
                mark_done_req.chunk_id,
                chunk_state.intra_node_rank,
                code.name,
                backoff_delay,
            )
            await asyncio.sleep(backoff_delay)
            continue
        raise
if res is None:
    await context.abort(
        grpc.StatusCode.INTERNAL,
        f"Failed to unlink for id {mark_done_req.id}: no response received",
    )
```

high

This retry logic with exponential backoff is duplicated in many places across the codebase (e.g., mark_done_all in this file, and in sender.py, server.py, api.py). This makes the code harder to maintain and prone to errors (like the missing time.sleep in some sync versions).

Consider extracting this logic into a reusable helper function for both asynchronous and synchronous gRPC calls. This would centralize the retry mechanism, improve readability, and ensure consistency.

For example, you could create an async helper like this:

```python
async def grpc_retry_async(stub_call, request, log_message_prefix: str):
    response = None
    for attempt in range(GRPC_RETRY_MAX_ATTEMPTS):
        try:
            response = await stub_call(request)
            break
        except grpc.RpcError as e:
            code = e.code()
            if (
                code in (grpc.StatusCode.CANCELLED, grpc.StatusCode.UNAVAILABLE)
                and attempt < GRPC_RETRY_MAX_ATTEMPTS - 1
            ):
                backoff_delay = GRPC_RETRY_INITIAL_BACKOFF_SECONDS * (GRPC_RETRY_BACKOFF_MULTIPLIER**attempt)
                logger.warning(
                    f"{log_message_prefix} retry %d due to %s, waiting %.3fs",
                    attempt + 1,
                    code.name,
                    backoff_delay,
                )
                await asyncio.sleep(backoff_delay)
                continue
            raise
    return response
```

And then use it like this:

```python
log_prefix = f"Unlink for req {mark_done_req.id} chunk {mark_done_req.chunk_id} in rank {chunk_state.intra_node_rank}"
res = await grpc_retry_async(stub.Unlink, unlink_req, log_prefix)
if res is None:
    await context.abort(
        grpc.StatusCode.INTERNAL,
        f"Failed to unlink for id {mark_done_req.id}: no response received",
    )
```

A similar helper grpc_retry_sync could be created for synchronous calls.
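As a hypothetical illustration of that synchronous counterpart (not code from the PR), the sketch below parameterizes the retryable exception types so it runs without a gRPC dependency; the real helper would instead catch `grpc.RpcError` and check `e.code()` against the retryable status codes, and the `GRPC_RETRY_*` constants here are placeholders for the project's own values:

```python
import logging
import time

logger = logging.getLogger(__name__)

# Hypothetical constants mirroring the names used in the PR.
GRPC_RETRY_MAX_ATTEMPTS = 4
GRPC_RETRY_INITIAL_BACKOFF_SECONDS = 0.01
GRPC_RETRY_BACKOFF_MULTIPLIER = 2.0


def grpc_retry_sync(stub_call, request, log_message_prefix, retryable=(ConnectionError,)):
    """Call stub_call(request), retrying retryable errors with exponential backoff.

    Returns the first successful response; re-raises the error if the
    final attempt also fails.
    """
    response = None
    for attempt in range(GRPC_RETRY_MAX_ATTEMPTS):
        try:
            response = stub_call(request)
            break
        except retryable:
            if attempt < GRPC_RETRY_MAX_ATTEMPTS - 1:
                backoff_delay = GRPC_RETRY_INITIAL_BACKOFF_SECONDS * (
                    GRPC_RETRY_BACKOFF_MULTIPLIER**attempt
                )
                logger.warning(
                    "%s retry %d, waiting %.3fs",
                    log_message_prefix,
                    attempt + 1,
                    backoff_delay,
                )
                # This sleep is exactly what the flagged sync loops are missing.
                time.sleep(backoff_delay)
                continue
            raise
    return response
```

Centralizing the loop this way means the missing-`time.sleep` bug can only be fixed (or introduced) in one place.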

Collaborator


@yizhuoliang Please deduplicate this part.

Comment on lines +505 to +533

```python
res = None
for attempt in range(GRPC_RETRY_MAX_ATTEMPTS):
    try:
        res = await stub.UnlinkAll(unlink_all_req)
        break
    except grpc.RpcError as e:
        code = e.code()
        if (
            code in (grpc.StatusCode.CANCELLED, grpc.StatusCode.UNAVAILABLE)
            and attempt < GRPC_RETRY_MAX_ATTEMPTS - 1
        ):
            backoff_delay = GRPC_RETRY_INITIAL_BACKOFF_SECONDS * (GRPC_RETRY_BACKOFF_MULTIPLIER**attempt)
            logger.warning(
                "UnlinkAll retry %d for req %s in rank %d due to %s, waiting %.3fs",
                attempt + 1,
                mark_done_all_req.id,
                rank,
                code.name,
                backoff_delay,
            )
            await asyncio.sleep(backoff_delay)
            continue
        raise

if res is None:
    await context.abort(
        grpc.StatusCode.INTERNAL,
        f"Failed to unlink all for id {mark_done_all_req.id} in rank {rank}: no response received",
    )
```

medium

This is another instance of duplicated retry logic. To improve maintainability and prevent inconsistencies, please see my other comment in this file on the mark_done method about refactoring this into a reusable helper function.

Comment on lines +361 to +387

```python
res = None
for attempt in range(GRPC_RETRY_MAX_ATTEMPTS):
    try:
        res = await stub.PrepareReceive(req)
        break
    except grpc.RpcError as e:
        code = e.code()
        if (
            code in (grpc.StatusCode.CANCELLED, grpc.StatusCode.UNAVAILABLE)
            and attempt < GRPC_RETRY_MAX_ATTEMPTS - 1
        ):
            backoff_delay = GRPC_RETRY_INITIAL_BACKOFF_SECONDS * (GRPC_RETRY_BACKOFF_MULTIPLIER**attempt)
            logger.warning(
                "PrepareReceive retry %d for req %s chunk %d to rank %d due to %s, waiting %.3fs",
                attempt + 1,
                request.id,
                request.chunk_id,
                dst_rank,
                code.name,
                backoff_delay,
            )
            await asyncio.sleep(backoff_delay)
            continue
        raise
if res is None or res.status != common_pb2.Status.STATUS_OK:
    logger.error("Failed to prepare receive")
    return sidecar_pb2.SendResponse(status=common_pb2.Status.STATUS_ERROR)
```

medium

This is another instance of duplicated retry logic. To improve maintainability and prevent inconsistencies, please see my comment in python/cornserve/services/sidecar/receiver.py on the mark_done method about refactoring this into a reusable helper function.

Comment on lines +430 to +454

```python
response = None
for attempt in range(GRPC_RETRY_MAX_ATTEMPTS):
    try:
        response = await self.aio_stub.Receive(request)
        break
    except grpc.RpcError as e:
        code = e.code()
        if (
            code in (grpc.StatusCode.CANCELLED, grpc.StatusCode.UNAVAILABLE)
            and attempt < GRPC_RETRY_MAX_ATTEMPTS - 1
        ):
            backoff_delay = GRPC_RETRY_INITIAL_BACKOFF_SECONDS * (GRPC_RETRY_BACKOFF_MULTIPLIER**attempt)
            logger.warning(
                "Receive retry %d for chunk %d in req %s due to %s, waiting %.3fs",
                attempt + 1,
                chunk_id,
                id,
                code.name,
                backoff_delay,
            )
            await asyncio.sleep(backoff_delay)
            continue
        raise
if response is None:
    raise RuntimeError(f"Failed to receive data with id {id}: no response received")
```

medium

This is another instance of duplicated retry logic. To improve maintainability and prevent inconsistencies, please see my comment in python/cornserve/services/sidecar/receiver.py on the mark_done method about refactoring this into a reusable helper function.

@majunze2001
Collaborator

Please test with omni.py, which will exercise all of the newly introduced paths.

