diff --git a/setup.py b/setup.py index ae32d11..6c48fc9 100644 --- a/setup.py +++ b/setup.py @@ -6,7 +6,7 @@ if sys.version_info < (3, 7): raise RuntimeError("skyflow requires Python 3.7+") -current_version = '1.15.6.dev0+fcfb86f' +current_version = '1.15.6' setup( name='skyflow', diff --git a/skyflow/vault/_client.py b/skyflow/vault/_client.py index e21dcba..96d8623 100644 --- a/skyflow/vault/_client.py +++ b/skyflow/vault/_client.py @@ -5,6 +5,7 @@ import types import requests import asyncio +from requests.adapters import HTTPAdapter from skyflow.vault._insert import getInsertRequestBody, processResponse, convertResponse from skyflow.vault._update import sendUpdateRequests, createUpdateResponseBody from skyflow.vault._config import Configuration, ConnectionConfig, DeleteOptions, DetokenizeOptions, GetOptions, InsertOptions, UpdateOptions, QueryOptions @@ -36,49 +37,71 @@ def __init__(self, config: Configuration): raise SkyflowError(SkyflowErrorCodes.INVALID_INPUT, SkyflowErrorMessages.TOKEN_PROVIDER_ERROR.value % ( str(type(config.tokenProvider))), interface=interface) + self._create_session() self.vaultID = config.vaultID self.vaultURL = config.vaultURL.rstrip('/') self.tokenProvider = config.tokenProvider self.storedToken = '' log_info(InfoMessages.CLIENT_INITIALIZED.value, interface=interface) + + def _create_session(self): + self.session = requests.Session() + adapter = HTTPAdapter(pool_connections=1, pool_maxsize=25, pool_block=True) + self.session.mount("https://", adapter) + + def __del__(self): + if (self.session is not None): + log_info(InfoMessages.CLOSING_SESSION.value, interface=InterfaceName.CLIENT.value) + self.session.close() + self.session = None + + def _get_session(self): + if (self.session is None): + self._create_session() + return self.session def insert(self, records: dict, options: InsertOptions = InsertOptions()): + max_retries = 1 interface = InterfaceName.INSERT.value log_info(InfoMessages.INSERT_TRIGGERED.value, interface=interface) self._checkConfig(interface) - jsonBody = getInsertRequestBody(records, options) requestURL = self._get_complete_vault_url() - self.storedToken = tokenProviderWrapper( - self.storedToken, self.tokenProvider, interface) - headers = { - "Authorization": "Bearer " + self.storedToken, - "sky-metadata": json.dumps(getMetrics()) - } - max_retries = 3 - # Use for-loop for retry logic, avoid code repetition - for attempt in range(max_retries+1): + + for attempt in range(max_retries + 1): try: - # If jsonBody is a dict, use json=, else use data= - response = requests.post(requestURL, data=jsonBody, headers=headers) + self.storedToken = tokenProviderWrapper( + self.storedToken, self.tokenProvider, interface) + headers = { + "Authorization": "Bearer " + self.storedToken, + "sky-metadata": json.dumps(getMetrics()), + } + response = self._get_session().post( + requestURL, + data=jsonBody, + headers=headers, + ) processedResponse = processResponse(response) result, partial = convertResponse(records, processedResponse, options) if partial: log_error(SkyflowErrorMessages.BATCH_INSERT_PARTIAL_SUCCESS.value, interface) - raise SkyflowError(SkyflowErrorCodes.PARTIAL_SUCCESS, SkyflowErrorMessages.BATCH_INSERT_PARTIAL_SUCCESS.value, result, interface=interface) - if 'records' not in result: + elif 'records' not in result: log_error(SkyflowErrorMessages.BATCH_INSERT_FAILURE.value, interface) - raise SkyflowError(SkyflowErrorCodes.SERVER_ERROR, SkyflowErrorMessages.BATCH_INSERT_FAILURE.value, result, interface=interface) - log_info(InfoMessages.INSERT_DATA_SUCCESS.value, interface) + else: + log_info(InfoMessages.INSERT_DATA_SUCCESS.value, interface) return result - except Exception as err: + except requests.exceptions.ConnectionError as err: if attempt < max_retries: - continue - else: - if isinstance(err, SkyflowError): - raise err - else: - raise SkyflowError(SkyflowErrorCodes.SERVER_ERROR, f"Error occurred: {err}", interface=interface) + continue + raise SkyflowError( + SkyflowErrorCodes.SERVER_ERROR, + SkyflowErrorMessages.NETWORK_ERROR.value % str(err), + interface=interface + ) + except SkyflowError as err: + if err.code != SkyflowErrorCodes.SERVER_ERROR or attempt >= max_retries: + raise err + continue def detokenize(self, records: dict, options: DetokenizeOptions = DetokenizeOptions()): interface = InterfaceName.DETOKENIZE.value @@ -292,4 +315,4 @@ def delete(self, records: dict, options: DeleteOptions = DeleteOptions()): else: log_info(InfoMessages.DELETE_DATA_SUCCESS.value, interface) - return result + return result \ No newline at end of file diff --git a/skyflow/version.py b/skyflow/version.py index ce8f9ca..491b079 100644 --- a/skyflow/version.py +++ b/skyflow/version.py @@ -1 +1 @@ -SDK_VERSION = '1.15.6.dev0+fcfb86f' \ No newline at end of file +SDK_VERSION = '1.15.6' \ No newline at end of file