From fcfb86fcb6e0a514ce7a0907bee5aa3e8a40b644 Mon Sep 17 00:00:00 2001 From: skyflow-shravan <121150537+skyflow-shravan@users.noreply.github.com> Date: Mon, 22 Sep 2025 18:52:23 +0530 Subject: [PATCH 1/2] SK-2131 add retry for errors (#202) --- skyflow/vault/_client.py | 69 ++++++++++++++-------------------------- 1 file changed, 23 insertions(+), 46 deletions(-) diff --git a/skyflow/vault/_client.py b/skyflow/vault/_client.py index 9797950..e21dcba 100644 --- a/skyflow/vault/_client.py +++ b/skyflow/vault/_client.py @@ -5,7 +5,6 @@ import types import requests import asyncio -from requests.adapters import HTTPAdapter from skyflow.vault._insert import getInsertRequestBody, processResponse, convertResponse from skyflow.vault._update import sendUpdateRequests, createUpdateResponseBody from skyflow.vault._config import Configuration, ConnectionConfig, DeleteOptions, DetokenizeOptions, GetOptions, InsertOptions, UpdateOptions, QueryOptions @@ -37,71 +36,49 @@ def __init__(self, config: Configuration): raise SkyflowError(SkyflowErrorCodes.INVALID_INPUT, SkyflowErrorMessages.TOKEN_PROVIDER_ERROR.value % ( str(type(config.tokenProvider))), interface=interface) - self._create_session() self.vaultID = config.vaultID self.vaultURL = config.vaultURL.rstrip('/') self.tokenProvider = config.tokenProvider self.storedToken = '' log_info(InfoMessages.CLIENT_INITIALIZED.value, interface=interface) - - def _create_session(self): - self.session = requests.Session() - adapter = HTTPAdapter(pool_connections=1, pool_maxsize=25, pool_block=True) - self.session.mount("https://", adapter) - - def __del__(self): - if (self.session is not None): - log_info(InfoMessages.CLOSING_SESSION.value, interface=InterfaceName.CLIENT.value) - self.session.close() - self.session = None - - def _get_session(self): - if (self.session is None): - self._create_session() - return self.session def insert(self, records: dict, options: InsertOptions = InsertOptions()): - max_retries = 1 interface = InterfaceName.INSERT.value log_info(InfoMessages.INSERT_TRIGGERED.value, interface=interface) self._checkConfig(interface) + jsonBody = getInsertRequestBody(records, options) requestURL = self._get_complete_vault_url() - - for attempt in range(max_retries + 1): + self.storedToken = tokenProviderWrapper( + self.storedToken, self.tokenProvider, interface) + headers = { + "Authorization": "Bearer " + self.storedToken, + "sky-metadata": json.dumps(getMetrics()) + } + max_retries = 3 + # Use for-loop for retry logic, avoid code repetition + for attempt in range(max_retries+1): try: - self.storedToken = tokenProviderWrapper( - self.storedToken, self.tokenProvider, interface) - headers = { - "Authorization": "Bearer " + self.storedToken, - "sky-metadata": json.dumps(getMetrics()), - } - response = self._get_session().post( - requestURL, - data=jsonBody, - headers=headers, - ) + # If jsonBody is a dict, use json=, else use data= + response = requests.post(requestURL, data=jsonBody, headers=headers) processedResponse = processResponse(response) result, partial = convertResponse(records, processedResponse, options) if partial: log_error(SkyflowErrorMessages.BATCH_INSERT_PARTIAL_SUCCESS.value, interface) - elif 'records' not in result: + raise SkyflowError(SkyflowErrorCodes.PARTIAL_SUCCESS, SkyflowErrorMessages.BATCH_INSERT_PARTIAL_SUCCESS.value, result, interface=interface) + if 'records' not in result: log_error(SkyflowErrorMessages.BATCH_INSERT_FAILURE.value, interface) - else: - log_info(InfoMessages.INSERT_DATA_SUCCESS.value, interface) + raise SkyflowError(SkyflowErrorCodes.SERVER_ERROR, SkyflowErrorMessages.BATCH_INSERT_FAILURE.value, result, interface=interface) + log_info(InfoMessages.INSERT_DATA_SUCCESS.value, interface) return result - except requests.exceptions.ConnectionError as err: + except Exception as err: if attempt < max_retries: - continue - raise SkyflowError( - SkyflowErrorCodes.SERVER_ERROR, - SkyflowErrorMessages.NETWORK_ERROR.value % str(err), - interface=interface - ) - except SkyflowError as err: - if err.code != SkyflowErrorCodes.SERVER_ERROR or attempt >= max_retries: - raise err - continue + continue + else: + if isinstance(err, SkyflowError): + raise err + else: + raise SkyflowError(SkyflowErrorCodes.SERVER_ERROR, f"Error occurred: {err}", interface=interface) def detokenize(self, records: dict, options: DetokenizeOptions = DetokenizeOptions()): interface = InterfaceName.DETOKENIZE.value From dc823e666030395973ff997ca9f3866cc5a624e1 Mon Sep 17 00:00:00 2001 From: skyflow-shravan Date: Mon, 22 Sep 2025 14:40:00 +0000 Subject: [PATCH 2/2] [AUTOMATED] Private Release 1.15.6.dev0+fcfb86f --- setup.py | 2 +- skyflow/version.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/setup.py b/setup.py index 6c48fc9..ae32d11 100644 --- a/setup.py +++ b/setup.py @@ -6,7 +6,7 @@ if sys.version_info < (3, 7): raise RuntimeError("skyflow requires Python 3.7+") -current_version = '1.15.6' +current_version = '1.15.6.dev0+fcfb86f' setup( name='skyflow', diff --git a/skyflow/version.py b/skyflow/version.py index 491b079..ce8f9ca 100644 --- a/skyflow/version.py +++ b/skyflow/version.py @@ -1 +1 @@ -SDK_VERSION = '1.15.6' \ No newline at end of file +SDK_VERSION = '1.15.6.dev0+fcfb86f' \ No newline at end of file