Update python client for latest accumulo-proxy and accumulo:2.1.1 #5
base: main
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,7 @@ | ||
| FROM python:3.8.17-slim | ||
|
|
||
| LABEL org.opencontainers.image.source=https://github.com/NationalSecurityAgency/accumulo-python3 | ||
|
|
||
| COPY . /app | ||
|
|
||
| RUN pip install /app |
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -12,7 +12,12 @@ Library features include: | |||||
| import accumulo | ||||||
| from accumulo import Mutation, RangePrefix, ScanOptions | ||||||
|
|
||||||
| connector = accumulo.AccumuloProxyConnectionContext().create_connector('user', 'secret') | ||||||
| # The accumulo proxy now authenticates with accumulo by itself. | ||||||
| # This client then authenticates with the proxy using a configured 'sharedSecret', | ||||||
| # shared between the client and proxy | ||||||
| sharedSecret = "sharedSecret" | ||||||
|
|
||||||
| connector = accumulo.AccumuloProxyConnectionContext().create_connector(sharedSecret) | ||||||
|
|
||||||
| # Create the table 'tmp' if it does not already exist. | ||||||
| if not connector.table_exists('tmp'): | ||||||
|
|
@@ -32,7 +37,7 @@ with connector.create_scanner('tmp', ScanOptions(range=RangePrefix('User.1'))) a | |||||
| print(r.row, r.cf, r.value_bytes) | ||||||
| ``` | ||||||
|
|
||||||
| __Note__. This library is a work in progress. It has been tested with Accumulo 1.9 and Python 3.8. | ||||||
| __Note__. This library is a work in progress. It has been tested with Accumulo 2.1.1 and Python 3.8. | ||||||
|
|
||||||
| ## Installation | ||||||
|
|
||||||
|
|
@@ -53,6 +58,14 @@ development. | |||||
| pip install -e . | ||||||
| ``` | ||||||
|
|
||||||
| ### Docker Image | ||||||
|
|
||||||
| Alternatively, build a docker image | ||||||
|
|
||||||
| ``` | ||||||
| docker build . -t accumulo-proxy-client | ||||||
| ``` | ||||||
|
|
||||||
| ## Background | ||||||
|
|
||||||
| Native integration with Accumulo is powered by [Apache Thrift](https://thrift.apache.org/). This library embeds | ||||||
|
|
@@ -98,8 +111,7 @@ higher-level functionality in this library. Use the `client` property of an `Acc | |||||
| access these bindings. | ||||||
|
|
||||||
| ```python | ||||||
| login = proxy_connection.client.login('user', {'password': 'secret'}) | ||||||
| proxy_connection.client.changeUserAuthorizations(login, 'user', [b'ADMIN']) | ||||||
| proxy_connection.client.changeUserAuthorizations('sharedSecret', 'user', [b'ADMIN']) | ||||||
| ``` | ||||||
|
|
||||||
| ### Creating a blocking connector | ||||||
|
|
@@ -111,9 +123,9 @@ Use the `AccumuloProxyConnectionContext` class to create a blocking connector in | |||||
|
|
||||||
| ```python | ||||||
| from accumulo import AccumuloProxyConnectionContext | ||||||
|
|
||||||
| sharedSecret = "sharedSecret" | ||||||
| context = AccumuloProxyConnectionContext(proxy_connection) | ||||||
| connector = context.create_connector('user', 'secret') | ||||||
| connector = context.create_connector(sharedSecret) | ||||||
|
The underlying proxy uses a configuration file to set up an AccumuloClient object. Proxy client code no longer needs to create a connector, and the newer proxy code has no mechanism for doing so. So I'm not sure this `create_connector` call makes sense anymore. |
||||||
| ``` | ||||||
|
|
||||||
| ### Perform some basic table operations | ||||||
|
|
@@ -243,23 +255,23 @@ with connector.create_scanner( | |||||
| 'tmp', | ||||||
| ScanOptions( | ||||||
| # Binary and non-binary arguments are accepted | ||||||
| range=Range(start_key=Key('sk', b'cf')) | ||||||
| range=Range(start_key=Key('aStartKey', b'cf')) | ||||||
| ) | ||||||
| ) as scanner: | ||||||
| pass | ||||||
|
|
||||||
| with connector.create_scanner( | ||||||
| 'tmp', | ||||||
| ScanOptions( | ||||||
| range=Range(end_key=Key('ek', b'cf'), is_end_key_inclusive=True) | ||||||
| range=Range(end_key=Key('endKey', b'cf'), is_end_key_inclusive=True) | ||||||
| ) | ||||||
| ) as scanner: | ||||||
| pass | ||||||
|
|
||||||
| with connector.create_scanner( | ||||||
| 'tmp', | ||||||
| ScanOptions( | ||||||
| range=Range(start_key=Key('sk', b'cf'), end_key=Key('ek', 'cf', 'cq')) | ||||||
| range=Range(start_key=Key('aStartKey', b'cf'), end_key=Key('endKey', 'cf', 'cq')) | ||||||
| ) | ||||||
| ) as scanner: | ||||||
| pass | ||||||
|
|
@@ -280,15 +292,15 @@ with connector.create_batch_scanner( | |||||
|
|
||||||
| ### Use an iterator | ||||||
|
|
||||||
| `ScanOptions` and `BatchScanOptions` both support an `iterator_settings` keyword argument. | ||||||
| `ScanOptions` and `BatchScanOptions` both support an `iterators` keyword argument. | ||||||
|
|
||||||
| ```python | ||||||
| from accumulo import IteratorSetting | ||||||
|
|
||||||
| with connector.create_scanner( | ||||||
| 'tmp', | ||||||
| ScanOptions( | ||||||
| iterator_settings=[ | ||||||
| iterators=[ | ||||||
| IteratorSetting(priority=30, name='iter', iterator_class='my.iterator', properties={}) | ||||||
| ] | ||||||
| ) | ||||||
|
|
@@ -311,7 +323,7 @@ connector, we will use the `AccumuloProxyConnectionPoolContextAsync` class. | |||||
| ```python | ||||||
| from accumulo import AccumuloProxyConnectionPoolContextAsync | ||||||
|
|
||||||
| async_conn = await AccumuloProxyConnectionPoolContextAsync().create_connector('user', 'secret') | ||||||
| async_conn = await AccumuloProxyConnectionPoolContextAsync().create_connector('sharedSecret') | ||||||
| ``` | ||||||
|
|
||||||
| Unlike the blocking connector, the non-blocking connector uses a pool of proxy connection objects, and uses a | ||||||
|
|
@@ -335,7 +347,7 @@ executor = AsyncAccumuloConnectorPoolExecutor( | |||||
| ) | ||||||
| # A default executor is created if one is not provided. | ||||||
| context = AccumuloProxyConnectionPoolContextAsync(executor) | ||||||
| async_conn = await AccumuloProxyConnectionPoolContextAsync().create_connector('user', 'secret') | ||||||
| async_conn = await AccumuloProxyConnectionPoolContextAsync().create_connector('sharedSecret') | ||||||
| ``` | ||||||
|
|
||||||
| #### Using writers | ||||||
|
|
@@ -370,5 +382,5 @@ the binding function from a proxy client instance. | |||||
|
|
||||||
| ```python | ||||||
| # executor.run(gettern_fn, *args) | ||||||
| await executor.run(lambda c: c.tableExists, login, 'tmp') | ||||||
| await executor.run(lambda c: c.tableExists, 'sharedSecret', 'tmp') | ||||||
|
In these examples, the shared secret is being provided as a string, but probably should show an example of passing it as a variable, the same way the
Suggested change
|
||||||
| ``` | ||||||
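In the spirit of the review comment above about passing the shared secret as a variable rather than a string literal, here is a minimal sketch of one way to do it. The environment variable name `ACCUMULO_PROXY_SHARED_SECRET` and the helper `load_shared_secret` are hypothetical and are not part of this library or the proxy. Only `create_connector(shared_secret)` and `executor.run(...)` come from the README in this PR, and they appear here only in comments.

```python
import os

# Hypothetical variable name, chosen for illustration only.
SECRET_ENV_VAR = "ACCUMULO_PROXY_SHARED_SECRET"


def load_shared_secret(env=None) -> str:
    """Return the proxy shared secret from the environment, failing loudly if unset."""
    env = os.environ if env is None else env
    secret = env.get(SECRET_ENV_VAR)
    if not secret:
        raise RuntimeError(f"{SECRET_ENV_VAR} is not set")
    return secret


# Intended usage with the calls shown in the README (requires the accumulo package):
#   shared_secret = load_shared_secret()
#   connector = accumulo.AccumuloProxyConnectionContext().create_connector(shared_secret)
#   await executor.run(lambda c: c.tableExists, shared_secret, 'tmp')
```

Reading the value once and passing the variable around keeps the secret out of the example source and makes each call site read the same way.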
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -133,50 +133,50 @@ def context(self): | |
|
|
||
| class AsyncAccumuloConnector(AccumuloConnectorBase): | ||
|
|
||
| def __init__(self, proxy_connection_pool_executor: AsyncAccumuloConnectorPoolExecutor, login: bytes): | ||
| super().__init__(login) | ||
| def __init__(self, proxy_connection_pool_executor: AsyncAccumuloConnectorPoolExecutor, shared_secret: bytes): | ||
| super().__init__(shared_secret) | ||
| self.proxy_connection_pool_executor = proxy_connection_pool_executor | ||
|
|
||
| async def create_scanner(self, table: str, opts: Optional[ScanOptions] = None): | ||
| if opts is None: | ||
| opts = ScanOptions() | ||
| opts = TTypeFactory.scan_options(opts) | ||
| resource_id = await self.proxy_connection_pool_executor.run(AccumuloProxyClientFunctionGetters.create_scanner, | ||
| self.login, table, opts) | ||
| self.shared_secret, table, opts) | ||
| return AsyncAccumuloScanner(self.proxy_connection_pool_executor, resource_id) | ||
|
|
||
| async def create_batch_scanner(self, table: str, opts: Optional[BatchScanOptions] = None): | ||
| if opts is None: | ||
| opts = BatchScanOptions() | ||
| opts = TTypeFactory.batch_scan_options(opts) | ||
| resource_id = await self.proxy_connection_pool_executor.run( | ||
| AccumuloProxyClientFunctionGetters.create_batch_scanner, self.login, table, opts) | ||
| AccumuloProxyClientFunctionGetters.create_batch_scanner, self.shared_secret, table, opts) | ||
| return AsyncAccumuloScanner(self.proxy_connection_pool_executor, resource_id) | ||
|
|
||
| async def create_writer(self, table: str, opts: Optional[WriterOptions] = None): | ||
| if opts is None: | ||
| opts = WriterOptions() | ||
| opts = TTypeFactory.writer_options(opts) | ||
| resource_id = await self.proxy_connection_pool_executor.run(AccumuloProxyClientFunctionGetters.create_writer, | ||
| self.login, table, opts) | ||
| self.shared_secret, table, opts) | ||
| return AsyncAccumuloWriter(self.proxy_connection_pool_executor, resource_id) | ||
|
|
||
| async def change_user_authorizations(self, user: str, auths: Types.T_AUTHORIZATION_SET): | ||
| await self.proxy_connection_pool_executor.run(AccumuloProxyClientFunctionGetters.change_user_authorizations, | ||
| self.login, user, auths) | ||
| self.shared_secret, user, auths) | ||
|
|
||
| async def get_user_authorizations(self, user: str) -> Types.T_AUTHORIZATION_SET: | ||
| auths = await self.proxy_connection_pool_executor.run( | ||
| AccumuloProxyClientFunctionGetters.get_user_authorizations, user) | ||
| return AuthorizationSet(auths) | ||
|
|
||
| async def create_table(self, table: str, version_iter: bool = True, time_type: Types.T_TIME_TYPE = TimeType.MILLIS): | ||
| await self.proxy_connection_pool_executor.run(AccumuloProxyClientFunctionGetters.create_table, self.login, | ||
| await self.proxy_connection_pool_executor.run(AccumuloProxyClientFunctionGetters.create_table, self.shared_secret, | ||
| table, version_iter, time_type) | ||
|
|
||
| async def table_exists(self, table: str) -> bool: | ||
| return await self.proxy_connection_pool_executor.run(AccumuloProxyClientFunctionGetters.table_exists, | ||
| self.login, table) | ||
| self.shared_secret, table) | ||
|
|
||
|
|
||
| class AsyncAccumuloConnectorResource: | ||
|
|
@@ -237,18 +237,12 @@ def __init__(self, proxy_connection_pool_executor: Optional[AsyncAccumuloConnect | |
| proxy_connection_pool_executor = AsyncAccumuloConnectorPoolExecutor() | ||
| self.proxy_connection_pool_executor = proxy_connection_pool_executor | ||
|
|
||
| async def create_connector(self, user: str, password: str) -> AsyncAccumuloConnector: | ||
| login = await self.proxy_connection_pool_executor.run(AccumuloProxyClientFunctionGetters.login, user, | ||
| {'password': password}) | ||
| return AsyncAccumuloConnector(self.proxy_connection_pool_executor, login) | ||
| async def create_connector(self, shared_secret: bytes) -> AsyncAccumuloConnector: | ||
|
With Accumulo 2.x, we deprecated the "Connector" APIs, and that terminology, in favor of the more familiar "Client" terminology (and APIs). I don't know whether it makes sense to keep calling this Python library's methods and types "connector", or to rename them to match the corresponding Accumulo types. Then again, the word "client" is very overloaded here: the proxy is a client to Accumulo and has an AccumuloClient object, but it is also a Thrift service; there is the generated Thrift client code that this library calls; and then there is client code that uses this library. It is a bit confusing. Perhaps this library could choose a different name that helps disambiguate them, at least for users of this library? |
||
| return AsyncAccumuloConnector(self.proxy_connection_pool_executor, shared_secret) | ||
|
|
||
|
|
||
| class AccumuloProxyClientFunctionGetters: | ||
|
|
||
| @staticmethod | ||
| def login(c: AccumuloProxy.Client): | ||
| return c.login | ||
|
|
||
| @staticmethod | ||
| def create_scanner(c: AccumuloProxy.Client): | ||
| return c.createScanner | ||
|
|
||
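The getter functions above are passed to `executor.run(getter_fn, *args)`, which looks up the bound method on a proxy client and calls it with the shared secret as the first argument. The sketch below illustrates that calling convention only. `StubProxyClient` and `SketchExecutor` are hypothetical stand-ins, and the single-client thread pool is an assumption, not how `AsyncAccumuloConnectorPoolExecutor` is necessarily implemented.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


class StubProxyClient:
    """Stand-in for the generated Thrift client; for illustration only."""

    def __init__(self):
        self.tables = {"tmp"}

    def tableExists(self, shared_secret, table):
        # A real proxy would validate shared_secret; the stub ignores it.
        return table in self.tables


class SketchExecutor:
    """Hypothetical, simplified executor: one client, calls run in a thread pool."""

    def __init__(self, client):
        self._client = client
        self._pool = ThreadPoolExecutor(max_workers=1)

    async def run(self, getter_fn, *args):
        # getter_fn selects the bound method; args are forwarded to it unchanged.
        fn = getter_fn(self._client)
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(self._pool, fn, *args)


async def main():
    executor = SketchExecutor(StubProxyClient())
    shared_secret = "sharedSecret"
    return await executor.run(lambda c: c.tableExists, shared_secret, "tmp")
```

Because the getter receives the client and returns a method, the executor can choose which pooled client to use before the call is bound, which is presumably why the library passes getters instead of bound methods.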
While there may have been testing done with Accumulo 2.1.1, the accumulo-proxy has not yet had a released version that works with 2.1. The sharedSecret behavior, for example, only exists in the unreleased SNAPSHOT version.
I think this notice could be more clear that the accumulo-proxy this project relies on may itself still be under development, but I'm not sure exactly how to phrase that.
Looking at the open issues against the proxy: is there anything holding up a release that works with 2.1? I'm not sure when the last release was, and I don't see any tags at all since the proxy was forked to its own repository. FWIW, we've been running with this apache/accumulo-proxy#81 against various Accumulo 2.1.X clusters and things seem to be working as expected.