From 5cc24cfe61606071c7d1115b55eb68cfc18ab9d2 Mon Sep 17 00:00:00 2001 From: Jeongseok Kang Date: Thu, 28 Jul 2022 17:01:17 +0900 Subject: [PATCH 01/12] feat: Add etcd election APIs --- src/etcetra/client.py | 112 ++++++++++++++++++++++++++++++++++++++++-- src/etcetra/types.py | 10 ++++ 2 files changed, 119 insertions(+), 3 deletions(-) diff --git a/src/etcetra/client.py b/src/etcetra/client.py index 8fab1db..6ccb7cd 100644 --- a/src/etcetra/client.py +++ b/src/etcetra/client.py @@ -31,13 +31,15 @@ ) from grpc.aio._typing import RequestType, RequestIterableType, ResponseType, ResponseIterableType +from .grpc_api import kv_pb2 from .grpc_api import rpc_pb2, rpc_pb2_grpc +from .grpc_api import v3election_pb2, v3election_pb2_grpc from .grpc_api import v3lock_pb2, v3lock_pb2_grpc from .types import ( DeleteRangeRequestType, EtcdCredential, EtcdLockOption, HostPortPair, - PutRequestType, RangeRequestSortOrder, RangeRequestSortTarget, RangeRequestType, - TransactionRequest, TxnReturnType, TxnReturnValues, WatchCreateRequestFilterType, - WatchEvent, WatchEventType, + Leader, PutRequestType, RangeRequestSortOrder, RangeRequestSortTarget, + RangeRequestType, TransactionRequest, TxnReturnType, TxnReturnValues, + WatchCreateRequestFilterType, WatchEvent, WatchEventType, ) __all__ = ( 'EtcdClient', @@ -1033,6 +1035,110 @@ async def keys_range( ) return [x.key.decode(encoding) for x in response.kvs] + async def campaign_election(self, name: bytes, lease_id: int, value: Optional[bytes] = None) -> v3election_pb2.LeaderKey: + """ + Campaign waits to acquire leadership in an election, + returning a LeaderKey representing the leadership if successful. + The LeaderKey can then be used to issue new values on the election, + transactionally guard API requests on leadership still being held, + and resign from the election. + + Parameters + --------- + name + Name is the election’s identifier for the campaign. + lease_id + LeaseID is the ID of the lease attached to leadership of the election. + If the lease expires or is revoked before resigning leadership, + then the leadership is transferred to the next campaigner, if any. + value: + Value is the initial proclaimed value set when the campaigner wins the election. + + Returns + ------- + leader: etcetra.grpc_api.v3election_pb2.LeaderKey + Leader describes the resources used for holding leadereship of the election. + """ + stub = v3election_pb2_grpc.ElectionStub(self.channel) + response = await stub.Campaign(v3election_pb2.CampaignRequest(name=name, lease=lease_id, value=value)) + return response.leader + + async def resign_election(self, leader: v3election_pb2.LeaderKey) -> None: + """ + Resign releases election leadership so other campaigners may acquire leadership on the election. + + Parameters + --------- + leader + Leader is the leadership to relinquish by resignation. + """ + stub = v3election_pb2_grpc.ElectionStub(self.channel) + await stub.Resign(v3election_pb2.ResignRequest(leader=leader)) + + async def proclaim_election(self, leader: v3election_pb2.LeaderKey, value: bytes) -> None: + """ + Proclaim updates the leader’s posted value with a new value. + + Parameters + --------- + leader + Leader is the leadership hold on the election. + value + Value is an update meant to overwrite the leader’s current value. + """ + stub = v3election_pb2_grpc.ElectionStub(self.channel) + await stub.Proclaim(v3election_pb2.ProclaimRequest(leader=leader, value=value)) + + async def get_election(self, name: bytes) -> Leader: + """ + Returns the current election proclamation, if any. + + Parameters + --------- + name + Name is the election identifier for the leadership information. + + Returns + ------- + kv + KV is the key-value pair representing the latest leader update + """ + stub = v3election_pb2_grpc.ElectionStub(self.channel) + response = await stub.Leader(v3election_pb2.LeaderRequest(name=name)) + return Leader( + key=response.kv.key, + create_revision=response.kv.create_revision, + mod_revision=response.kv.mod_revision, + version=response.kv.version, + value=response.kv.value, + lease=response.kv.lease, + ) + + async def observe_election(self, name: bytes) -> AsyncIterator[Leader]: + """ + Observe streams election proclamations in-order as made by the election’s elected leaders. + + Parameters + --------- + name + Name is the election identifier for the leadership information. + + Returns + ------- + event: AsyncIterator[Leader] + A `Leader` object containing event information. + """ + stub = v3election_pb2_grpc.ElectionStub(self.channel) + async for response in stub.Observe(v3election_pb2.LeaderRequest(name=name)): + yield Leader( + key=response.kv.key, + create_revision=response.kv.create_revision, + mod_revision=response.kv.mod_revision, + version=response.kv.version, + value=response.kv.value, + lease=response.kv.lease, + ) + async def grant_lease(self, ttl: int, id: Optional[int] = None) -> int: """ Creates a lease which expires if the server does not receive a keepAlive diff --git a/src/etcetra/types.py b/src/etcetra/types.py index fd04284..430e481 100644 --- a/src/etcetra/types.py +++ b/src/etcetra/types.py @@ -188,3 +188,13 @@ class EtcdLockOption: lock_name: str timeout: Optional[float] ttl: Optional[int] + + +@dataclass +class Leader: + key: bytes + create_revision: int + mod_revision: int + version: int + value: bytes + lease: int From 859fb9aa9f6fbdaec5ae37e081a35b1b87834080 Mon Sep 17 00:00:00 2001 From: Jeongseok Kang Date: Thu, 28 Jul 2022 17:03:23 +0900 Subject: [PATCH 02/12] fix: Delete unnecessary import (flake8) --- src/etcetra/client.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/etcetra/client.py b/src/etcetra/client.py index 6ccb7cd..4a03dce 100644 --- a/src/etcetra/client.py +++ b/src/etcetra/client.py @@ -31,7 +31,6 @@ ) from grpc.aio._typing import RequestType, RequestIterableType, ResponseType, ResponseIterableType -from .grpc_api import kv_pb2 from .grpc_api import rpc_pb2, rpc_pb2_grpc from .grpc_api import v3election_pb2, v3election_pb2_grpc from .grpc_api import v3lock_pb2, v3lock_pb2_grpc From 49013c20fbfa9c34d53c962dcfccb44d523b5ff0 Mon Sep 17 00:00:00 2001 From: Jeongseok Kang Date: Thu, 28 Jul 2022 17:05:20 +0900 Subject: [PATCH 03/12] fix: flake8 error --- src/etcetra/client.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/src/etcetra/client.py b/src/etcetra/client.py index 4a03dce..c7a15db 100644 --- a/src/etcetra/client.py +++ b/src/etcetra/client.py @@ -1034,7 +1034,12 @@ async def keys_range( ) return [x.key.decode(encoding) for x in response.kvs] - async def campaign_election(self, name: bytes, lease_id: int, value: Optional[bytes] = None) -> v3election_pb2.LeaderKey: + async def campaign_election( + self, + name: bytes, + lease_id: int, + value: Optional[bytes] = None, + ) -> v3election_pb2.LeaderKey: """ Campaign waits to acquire leadership in an election, returning a LeaderKey representing the leadership if successful. @@ -1059,7 +1064,8 @@ async def campaign_election(self, name: bytes, lease_id: int, value: Optional[by Leader describes the resources used for holding leadereship of the election. """ stub = v3election_pb2_grpc.ElectionStub(self.channel) - response = await stub.Campaign(v3election_pb2.CampaignRequest(name=name, lease=lease_id, value=value)) + request = v3election_pb2.CampaignRequest(name=name, lease=lease_id, value=value) + response = await stub.Campaign(request) return response.leader async def resign_election(self, leader: v3election_pb2.LeaderKey) -> None: From 73a9b260b8a0a636d128b569798081ce82b1b16d Mon Sep 17 00:00:00 2001 From: Jeongseok Kang Date: Thu, 28 Jul 2022 17:21:16 +0900 Subject: [PATCH 04/12] docs: Add sphinx-generated document --- docs/references.md | 149 ++++++++++++++++++++++++++++++------ scripts/compile_protobuf.py | 2 +- 2 files changed, 127 insertions(+), 24 deletions(-) diff --git a/docs/references.md b/docs/references.md index 04a0f7d..8b0366d 100644 --- a/docs/references.md +++ b/docs/references.md @@ -3,7 +3,7 @@ Pure python asyncio Etcd client. -### _class_ etcetra.client.EtcdClient(addr: etcetra.types.HostPortPair, credentials: Optional[etcetra.types.EtcdCredential] = None, secure: bool = False, encoding: str = 'utf-8') +### _class_ etcetra.client.EtcdClient(addr: HostPortPair, credentials: Optional[EtcdCredential] = None, secure: bool = False, encoding: str = 'utf-8') Wrapper class of underlying actual Etcd API implementations (KV, Watch, Txn, …). In most cases, user can perform most of the jobs by creating EtcdClient object. @@ -64,10 +64,71 @@ Acquired lock will automatically released when user exits with context. -### _class_ etcetra.client.EtcdCommunicator(channel: grpc.aio._base_channel.Channel, encoding: str = 'utf-8') +### _class_ etcetra.client.EtcdCommunicator(channel: Channel, encoding: str = 'utf-8') Performs actual API calls to Etcd cluster and returns result. +#### _async_ campaign_election(name: bytes, lease_id: int, value: Optional[bytes] = None) +Campaign waits to acquire leadership in an election, +returning a LeaderKey representing the leadership if successful. +The LeaderKey can then be used to issue new values on the election, +transactionally guard API requests on leadership still being held, +and resign from the election. + + +* **Parameters** + + + * **name** – Name is the election’s identifier for the campaign. + + + * **lease_id** – LeaseID is the ID of the lease attached to leadership of the election. + If the lease expires or is revoked before resigning leadership, + then the leadership is transferred to the next campaigner, if any. + + + * **value** – Value is the initial proclaimed value set when the campaigner wins the election. + + + +* **Returns** + + **leader** – Leader describes the resources used for holding leadereship of the election. + + + +* **Return type** + + etcetra.grpc_api.v3election_pb2.LeaderKey + + + +#### create_lease_keepalive_task(id: int, interval: float) +Creates asyncio Task which sends Keepalive request to given lease ID. + + +* **Parameters** + + + * **id** – Lease ID to send Keepalive request. + + + * **interval** – Interval to send Keepalive request. + + + +* **Returns** + + **task** + + + +* **Return type** + + asyncio.Task + + + #### _async_ delete(key: str, prev_kv: bool = False, encoding: Optional[str] = None) Deletes the given key the key-value store. A delete request increments the revision of the key-value store @@ -229,7 +290,29 @@ Gets value associated with given key from the key-value store. -#### _async_ get_prefix(key: str, max_create_revision: Optional[str] = None, max_mod_revision: Optional[str] = None, min_create_revision: Optional[str] = None, min_mod_revision: Optional[str] = None, revision: Optional[str] = None, sort_order: etcetra.types.RangeRequestSortOrder = RangeRequestSortOrder.NONE, sort_target: etcetra.types.RangeRequestSortTarget = RangeRequestSortTarget.KEY, encoding: Optional[str] = None) +#### _async_ get_election(name: bytes) +Returns the current election proclamation, if any. + + +* **Parameters** + + **name** – Name is the election identifier for the leadership information. + + + +* **Returns** + + KV is the key-value pair representing the latest leader update + + + +* **Return type** + + kv + + + +#### _async_ get_prefix(key: str, max_create_revision: Optional[str] = None, max_mod_revision: Optional[str] = None, min_create_revision: Optional[str] = None, min_mod_revision: Optional[str] = None, revision: Optional[str] = None, sort_order: RangeRequestSortOrder = RangeRequestSortOrder.NONE, sort_target: RangeRequestSortTarget = RangeRequestSortTarget.KEY, encoding: Optional[str] = None) Gets the key-value in dictionary from the key-value store with given key prefix. i.e. get_prefix(‘/sorna/local’) call looks up all keys which has /sorna/local prefix. @@ -286,7 +369,7 @@ i.e. get_prefix(‘/sorna/local’) call looks up all keys which has /sorna/loca -#### _async_ get_range(key: str, range_end: str, limit: Optional[str] = None, max_create_revision: Optional[str] = None, max_mod_revision: Optional[str] = None, min_create_revision: Optional[str] = None, min_mod_revision: Optional[str] = None, revision: Optional[str] = None, sort_order: etcetra.types.RangeRequestSortOrder = RangeRequestSortOrder.NONE, sort_target: etcetra.types.RangeRequestSortTarget = RangeRequestSortTarget.KEY, encoding: Optional[str] = None) +#### _async_ get_range(key: str, range_end: str, limit: Optional[str] = None, max_create_revision: Optional[str] = None, max_mod_revision: Optional[str] = None, min_create_revision: Optional[str] = None, min_mod_revision: Optional[str] = None, revision: Optional[str] = None, sort_order: RangeRequestSortOrder = RangeRequestSortOrder.NONE, sort_target: RangeRequestSortTarget = RangeRequestSortTarget.KEY, encoding: Optional[str] = None) Gets the key-value in dictionary from the key-value store with keys in [key, range_end) range. @@ -374,7 +457,7 @@ Each expired key generates a delete event in the event history. -#### _async_ keys_prefix(key: str, max_create_revision: Optional[str] = None, max_mod_revision: Optional[str] = None, min_create_revision: Optional[str] = None, min_mod_revision: Optional[str] = None, revision: Optional[str] = None, sort_order: etcetra.types.RangeRequestSortOrder = RangeRequestSortOrder.NONE, sort_target: etcetra.types.RangeRequestSortTarget = RangeRequestSortTarget.KEY, encoding: Optional[str] = None) +#### _async_ keys_prefix(key: str, max_create_revision: Optional[str] = None, max_mod_revision: Optional[str] = None, min_create_revision: Optional[str] = None, min_mod_revision: Optional[str] = None, revision: Optional[str] = None, sort_order: RangeRequestSortOrder = RangeRequestSortOrder.NONE, sort_target: RangeRequestSortTarget = RangeRequestSortTarget.KEY, encoding: Optional[str] = None) Gets the keys which has given prefix from the key-value store. @@ -430,7 +513,7 @@ Gets the keys which has given prefix from the key-value store. -#### _async_ keys_range(key: str, range_end: str, limit: Optional[str] = None, max_create_revision: Optional[str] = None, max_mod_revision: Optional[str] = None, min_create_revision: Optional[str] = None, min_mod_revision: Optional[str] = None, revision: Optional[str] = None, serializable: bool = True, sort_order: etcetra.types.RangeRequestSortOrder = RangeRequestSortOrder.NONE, sort_target: etcetra.types.RangeRequestSortTarget = RangeRequestSortTarget.KEY, encoding: Optional[str] = None) +#### _async_ keys_range(key: str, range_end: str, limit: Optional[str] = None, max_create_revision: Optional[str] = None, max_mod_revision: Optional[str] = None, min_create_revision: Optional[str] = None, min_mod_revision: Optional[str] = None, revision: Optional[str] = None, serializable: bool = True, sort_order: RangeRequestSortOrder = RangeRequestSortOrder.NONE, sort_target: RangeRequestSortTarget = RangeRequestSortTarget.KEY, encoding: Optional[str] = None) Gets the keys in the range from the key-value store. @@ -489,29 +572,39 @@ Gets the keys in the range from the key-value store. -#### _async_ lease_keepalive(id: int, interval: float) -Creates asyncio Task which sends Keepalive request to given lease ID. +#### _async_ observe_election(name: bytes) +Observe streams election proclamations in-order as made by the election’s elected leaders. * **Parameters** - - * **id** – Lease ID to send Keepalive request. - - - * **interval** – Interval to send Keepalive request. + **name** – Name is the election identifier for the leadership information. * **Returns** - **task** + **event** – A Leader object containing event information. * **Return type** - asyncio.Task + AsyncIterator[Leader] + + + +#### _async_ proclaim_election(leader: LeaderKey, value: bytes) +Proclaim updates the leader’s posted value with a new value. + + +* **Parameters** + + + * **leader** – Leader is the leadership hold on the election. + + + * **value** – Value is an update meant to overwrite the leader’s current value. @@ -556,6 +649,16 @@ Puts given key into the key-value store. +#### _async_ resign_election(leader: LeaderKey) +Resign releases election leadership so other campaigners may acquire leadership on the election. + + +* **Parameters** + + **leader** – Leader is the leadership to relinquish by resignation. + + + #### _async_ revoke_lease(id: int) Revokes a lease. All keys attached to the lease will expire and be deleted. @@ -566,7 +669,7 @@ Revokes a lease. All keys attached to the lease will expire and be deleted. -#### _async_ txn(txn_builder: Callable[[etcetra.client.EtcdTransactionAction], None], encoding: Optional[str] = None) +#### _async_ txn(txn_builder: Callable[[EtcdTransactionAction], None], encoding: Optional[str] = None) A shorthand helper for Txn, with no compare arguments. This can be helpful when user just wants to execute transaction without any conditions. @@ -608,7 +711,7 @@ any conditions. -#### _async_ txn_compare(compares: List[rpc_pb2.Compare], txn_builder: Callable[[etcetra.client.EtcdTransactionAction, etcetra.client.EtcdTransactionAction], None], encoding: Optional[str] = None) +#### _async_ txn_compare(compares: List[Compare], txn_builder: Callable[[EtcdTransactionAction, EtcdTransactionAction], None], encoding: Optional[str] = None) Processes multiple requests in a single transaction. A txn request increments the revision of the key-value store and generates events with the same revision for every completed request. @@ -667,7 +770,7 @@ It is not allowed to modify the same key several times within one txn. -#### watch(key: str, ready_event: Optional[asyncio.locks.Event] = None, filters: Optional[List[etcetra.types.WatchCreateRequestFilterType]] = None, prev_kv: bool = False, progress_notify: bool = False, start_revision: Optional[int] = None, watch_id: Optional[int] = None, encoding: Optional[str] = None) +#### watch(key: str, ready_event: Optional[Event] = None, filters: Optional[List[WatchCreateRequestFilterType]] = None, prev_kv: bool = False, progress_notify: bool = False, start_revision: Optional[int] = None, watch_id: Optional[int] = None, encoding: Optional[str] = None) Async iterator which watches for events happening or that have happened. Both input and output are streams; the input stream is for creating and canceling watchers and the output stream sends events. @@ -729,7 +832,7 @@ The entire event history can be watched starting from the last compaction revisi -#### watch_prefix(key: str, ready_event: Optional[asyncio.locks.Event] = None, filters: Optional[List[etcetra.types.WatchCreateRequestFilterType]] = None, prev_kv: bool = False, progress_notify: bool = True, start_revision: Optional[int] = None, watch_id: Optional[int] = None, encoding: Optional[str] = None) +#### watch_prefix(key: str, ready_event: Optional[Event] = None, filters: Optional[List[WatchCreateRequestFilterType]] = None, prev_kv: bool = False, progress_notify: bool = True, start_revision: Optional[int] = None, watch_id: Optional[int] = None, encoding: Optional[str] = None) Watches for events happening or that have happened along keys with given prefix. Both input and output are streams; the input stream is for creating and canceling watchers and the output stream sends events. @@ -801,7 +904,7 @@ A delete request increments the revision of the key-value store and generates a delete event in the event history for every deleted key. -#### get(key: str, limit: Optional[str] = None, max_create_revision: Optional[str] = None, max_mod_revision: Optional[str] = None, min_create_revision: Optional[str] = None, min_mod_revision: Optional[str] = None, revision: Optional[str] = None, serializable: bool = True, sort_order: etcetra.types.RangeRequestSortOrder = RangeRequestSortOrder.NONE, sort_target: etcetra.types.RangeRequestSortTarget = RangeRequestSortTarget.KEY, encoding: Optional[str] = None) +#### get(key: str, limit: Optional[str] = None, max_create_revision: Optional[str] = None, max_mod_revision: Optional[str] = None, min_create_revision: Optional[str] = None, min_mod_revision: Optional[str] = None, revision: Optional[str] = None, serializable: bool = True, sort_order: RangeRequestSortOrder = RangeRequestSortOrder.NONE, sort_target: RangeRequestSortTarget = RangeRequestSortTarget.KEY, encoding: Optional[str] = None) Gets the keys in the range from the key-value store. @@ -822,7 +925,7 @@ An enumeration. ### _class_ etcetra.types.CompareKey(key: 'str', target_lease: 'Optional[int]' = None, mod_revision: 'Optional[int]' = None, range_end: 'Optional[str]' = None, target_version: 'Optional[int]' = None, encoding: 'str' = 'utf-8') ### etcetra.types.DeleteRangeRequestType() -alias of `rpc_pb2.DeleteRangeRequest` +alias of `DeleteRangeRequest` ### _class_ etcetra.types.EtcdCredential(username: 'str', password: 'str') @@ -832,7 +935,7 @@ alias of `rpc_pb2.DeleteRangeRequest` ### _class_ etcetra.types.HostPortPair(host: 'str', port: 'int') ### etcetra.types.PutRequestType() -alias of `rpc_pb2.PutRequest` +alias of `PutRequest` ### _class_ etcetra.types.RangeRequestSortOrder(value) @@ -844,7 +947,7 @@ An enumeration. ### etcetra.types.RangeRequestType() -alias of `rpc_pb2.RangeRequest` +alias of `RangeRequest` ### _class_ etcetra.types.TxnReturnType(values, success) diff --git a/scripts/compile_protobuf.py b/scripts/compile_protobuf.py index e8d6860..a10c657 100755 --- a/scripts/compile_protobuf.py +++ b/scripts/compile_protobuf.py @@ -164,7 +164,7 @@ def main(version: str, repo_path: Optional[Path] = None): parser.add_argument('version', type=str, help='target etcd version') parser.add_argument( '--repository-path', type=str, - help='git repository folder path of etcd source code to use. Ff not supplied, ' + help='git repository folder path of etcd source code to use. If not supplied, ' 'this script will clone fresh repo on temporary directory and remove it upon exit.') args = parser.parse_args() From f4782c18692753a2655deadc5d5b8a78fef567e9 Mon Sep 17 00:00:00 2001 From: Jeongseok Kang Date: Fri, 29 Jul 2022 14:22:01 +0900 Subject: [PATCH 05/12] feat: Add testcase for election service methods --- src/etcetra/client.py | 41 ++++++++++-------------- src/etcetra/types.py | 14 ++------ tests/conftest.py | 7 ++++ tests/test_election.py | 72 ++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 98 insertions(+), 36 deletions(-) create mode 100644 tests/test_election.py diff --git a/src/etcetra/client.py b/src/etcetra/client.py index c7a15db..734e071 100644 --- a/src/etcetra/client.py +++ b/src/etcetra/client.py @@ -36,7 +36,7 @@ from .grpc_api import v3lock_pb2, v3lock_pb2_grpc from .types import ( DeleteRangeRequestType, EtcdCredential, EtcdLockOption, HostPortPair, - Leader, PutRequestType, RangeRequestSortOrder, RangeRequestSortTarget, + KeyValue, LeaderKey, PutRequestType, RangeRequestSortOrder, RangeRequestSortTarget, RangeRequestType, TransactionRequest, TxnReturnType, TxnReturnValues, WatchCreateRequestFilterType, WatchEvent, WatchEventType, ) @@ -1034,12 +1034,12 @@ async def keys_range( ) return [x.key.decode(encoding) for x in response.kvs] - async def campaign_election( + async def election_campaign( self, name: bytes, lease_id: int, value: Optional[bytes] = None, - ) -> v3election_pb2.LeaderKey: + ) -> LeaderKey: """ Campaign waits to acquire leadership in an election, returning a LeaderKey representing the leadership if successful. @@ -1060,7 +1060,7 @@ async def campaign_election( Returns ------- - leader: etcetra.grpc_api.v3election_pb2.LeaderKey + leader: etcetra.types.LeaderKey Leader describes the resources used for holding leadereship of the election. """ stub = v3election_pb2_grpc.ElectionStub(self.channel) @@ -1068,7 +1068,7 @@ async def campaign_election( response = await stub.Campaign(request) return response.leader - async def resign_election(self, leader: v3election_pb2.LeaderKey) -> None: + async def election_resign(self, leader: LeaderKey) -> None: """ Resign releases election leadership so other campaigners may acquire leadership on the election. @@ -1080,7 +1080,7 @@ async def resign_election(self, leader: v3election_pb2.LeaderKey) -> None: stub = v3election_pb2_grpc.ElectionStub(self.channel) await stub.Resign(v3election_pb2.ResignRequest(leader=leader)) - async def proclaim_election(self, leader: v3election_pb2.LeaderKey, value: bytes) -> None: + async def election_proclaim(self, leader: LeaderKey, value: bytes) -> None: """ Proclaim updates the leader’s posted value with a new value. @@ -1094,7 +1094,7 @@ async def proclaim_election(self, leader: v3election_pb2.LeaderKey, value: bytes stub = v3election_pb2_grpc.ElectionStub(self.channel) await stub.Proclaim(v3election_pb2.ProclaimRequest(leader=leader, value=value)) - async def get_election(self, name: bytes) -> Leader: + async def election_leader(self, name: bytes) -> LeaderKey: """ Returns the current election proclamation, if any. @@ -1105,21 +1105,19 @@ async def get_election(self, name: bytes) -> Leader: Returns ------- - kv - KV is the key-value pair representing the latest leader update + leader_key + LeaderKey is the key-value pair representing the latest leader update """ stub = v3election_pb2_grpc.ElectionStub(self.channel) response = await stub.Leader(v3election_pb2.LeaderRequest(name=name)) - return Leader( + return LeaderKey( + name=name, key=response.kv.key, - create_revision=response.kv.create_revision, - mod_revision=response.kv.mod_revision, - version=response.kv.version, - value=response.kv.value, + rev=response.kv.mod_revision, lease=response.kv.lease, ) - async def observe_election(self, name: bytes) -> AsyncIterator[Leader]: + async def election_observe(self, name: bytes) -> AsyncIterator[KeyValue]: """ Observe streams election proclamations in-order as made by the election’s elected leaders. @@ -1130,19 +1128,12 @@ async def observe_election(self, name: bytes) -> AsyncIterator[Leader]: Returns ------- - event: AsyncIterator[Leader] - A `Leader` object containing event information. + event: AsyncIterator[KeyValue] + A `KeyValue` object containing event information. """ stub = v3election_pb2_grpc.ElectionStub(self.channel) async for response in stub.Observe(v3election_pb2.LeaderRequest(name=name)): - yield Leader( - key=response.kv.key, - create_revision=response.kv.create_revision, - mod_revision=response.kv.mod_revision, - version=response.kv.version, - value=response.kv.value, - lease=response.kv.lease, - ) + yield response.kv async def grant_lease(self, ttl: int, id: Optional[int] = None) -> int: """ diff --git a/src/etcetra/types.py b/src/etcetra/types.py index 430e481..b2da531 100644 --- a/src/etcetra/types.py +++ b/src/etcetra/types.py @@ -6,7 +6,7 @@ if TYPE_CHECKING: from typing_extensions import TypeAlias -from etcetra.grpc_api import rpc_pb2 +from etcetra.grpc_api import kv_pb2, rpc_pb2, v3election_pb2 __all__ = ( 'RangeRequestSortOrder', @@ -70,6 +70,8 @@ class WatchEventType(enum.Enum): DELETE = 1 +KeyValue = kv_pb2.KeyValue +LeaderKey = v3election_pb2.LeaderKey PutRequestType = rpc_pb2.PutRequest RangeRequestType = rpc_pb2.RangeRequest DeleteRangeRequestType = rpc_pb2.DeleteRangeRequest @@ -188,13 +190,3 @@ class EtcdLockOption: lock_name: str timeout: Optional[float] ttl: Optional[int] - - -@dataclass -class Leader: - key: bytes - create_revision: int - mod_revision: int - version: int - value: bytes - lease: int diff --git a/tests/conftest.py b/tests/conftest.py index f7906ef..8b42f66 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,4 +1,6 @@ import os +import uuid + import pytest from etcetra import EtcdClient, HostPortPair @@ -21,3 +23,8 @@ async def etcd(etcd_addr): async with etcd.connect() as communicator: await communicator.delete_prefix('/test') del etcd + + +@pytest.fixture(scope="module") +def election_id() -> bytes: + return f"test/{uuid.uuid4()}".encode("utf-8") diff --git a/tests/test_election.py b/tests/test_election.py new file mode 100644 index 0000000..7dc5167 --- /dev/null +++ b/tests/test_election.py @@ -0,0 +1,72 @@ +import asyncio +import uuid +from multiprocessing import Queue +from typing import Optional + +import pytest + +from etcetra import EtcdClient +from etcetra.types import LeaderKey + + +@pytest.mark.asyncio +async def test_election_service(etcd: EtcdClient, election_id: bytes): + + async def _campaign_task(value: bytes) -> LeaderKey: + async with etcd.connect() as communicator: + lease_id = await communicator.grant_lease(ttl=60 * 60) + return await communicator.election_campaign( + name=election_id, + lease_id=lease_id, + value=value, + ) + + async def _resign_task(leader: LeaderKey) -> None: + async with etcd.connect() as communicator: + await communicator.election_resign(leader) + + async def _leader_task() -> LeaderKey: + async with etcd.connect() as communicator: + return await communicator.election_leader(name=election_id) + + async def _proclaim_task(leader: LeaderKey, value: bytes) -> None: + async with etcd.connect() as communicator: + await communicator.election_proclaim(leader, value) + + async def _observe_task(election_id: bytes, queue: Optional[Queue] = None) -> None: + async with etcd.connect() as communicator: + async for kv in communicator.election_observe(name=election_id): + if queue: + queue.put(kv.value) + + # Campaign + random_value = str(uuid.uuid4()).encode("utf-8") + leader_key = await _campaign_task(value=random_value) + + # Leader + current_leader_key = await _leader_task() + assert current_leader_key.lease == leader_key.lease + + queue = Queue() + observe_task = asyncio.create_task(_observe_task(election_id, queue=queue)) + await asyncio.sleep(3.0) + + # Proclaim + next_random_value = str(uuid.uuid4()).encode("utf-8") + await _proclaim_task(leader=leader_key, value=next_random_value) + + # Observe + initial_value = queue.get() + assert initial_value == random_value + proclaimed_value = queue.get() + assert proclaimed_value == next_random_value + + observe_task.cancel() + + # Resign + new_random_value = str(uuid.uuid4()).encode("utf-8") + new_campaign_task = _campaign_task(value=new_random_value) + await _resign_task(leader=leader_key) + new_leader_key = await asyncio.wait_for(new_campaign_task, timeout=None) + current_leader_key = await _leader_task() + assert current_leader_key.lease == new_leader_key.lease From 0f3086d7f5713e95f11372dcaec9382e67d15079 Mon Sep 17 00:00:00 2001 From: Jeongseok Kang Date: Fri, 29 Jul 2022 14:22:12 +0900 Subject: [PATCH 06/12] docs: Add sphinx-generated document --- docs/references.md | 206 ++++++++++++++++++++++----------------------- 1 file changed, 103 insertions(+), 103 deletions(-) diff --git a/docs/references.md b/docs/references.md index 8b0366d..0153bf2 100644 --- a/docs/references.md +++ b/docs/references.md @@ -68,41 +68,6 @@ Acquired lock will automatically released when user exits with context. Performs actual API calls to Etcd cluster and returns result. -#### _async_ campaign_election(name: bytes, lease_id: int, value: Optional[bytes] = None) -Campaign waits to acquire leadership in an election, -returning a LeaderKey representing the leadership if successful. -The LeaderKey can then be used to issue new values on the election, -transactionally guard API requests on leadership still being held, -and resign from the election. - - -* **Parameters** - - - * **name** – Name is the election’s identifier for the campaign. - - - * **lease_id** – LeaseID is the ID of the lease attached to leadership of the election. - If the lease expires or is revoked before resigning leadership, - then the leadership is transferred to the next campaigner, if any. - - - * **value** – Value is the initial proclaimed value set when the campaigner wins the election. - - - -* **Returns** - - **leader** – Leader describes the resources used for holding leadereship of the election. - - - -* **Return type** - - etcetra.grpc_api.v3election_pb2.LeaderKey - - - #### create_lease_keepalive_task(id: int, interval: float) Creates asyncio Task which sends Keepalive request to given lease ID. @@ -240,6 +205,109 @@ and generates a delete event in the event history for every deleted key. +#### _async_ election_campaign(name: bytes, lease_id: int, value: Optional[bytes] = None) +Campaign waits to acquire leadership in an election, +returning a LeaderKey representing the leadership if successful. +The LeaderKey can then be used to issue new values on the election, +transactionally guard API requests on leadership still being held, +and resign from the election. + + +* **Parameters** + + + * **name** – Name is the election’s identifier for the campaign. + + + * **lease_id** – LeaseID is the ID of the lease attached to leadership of the election. + If the lease expires or is revoked before resigning leadership, + then the leadership is transferred to the next campaigner, if any. + + + * **value** – Value is the initial proclaimed value set when the campaigner wins the election. + + + +* **Returns** + + **leader** – Leader describes the resources used for holding leadereship of the election. + + + +* **Return type** + + etcetra.types.LeaderKey + + + +#### _async_ election_leader(name: bytes) +Returns the current election proclamation, if any. + + +* **Parameters** + + **name** – Name is the election identifier for the leadership information. + + + +* **Returns** + + LeaderKey is the key-value pair representing the latest leader update + + + +* **Return type** + + leader_key + + + +#### _async_ election_observe(name: bytes) +Observe streams election proclamations in-order as made by the election’s elected leaders. + + +* **Parameters** + + **name** – Name is the election identifier for the leadership information. + + + +* **Returns** + + **event** – A KeyValue object containing event information. + + + +* **Return type** + + AsyncIterator[KeyValue] + + + +#### _async_ election_proclaim(leader: LeaderKey, value: bytes) +Proclaim updates the leader’s posted value with a new value. + + +* **Parameters** + + + * **leader** – Leader is the leadership hold on the election. + + + * **value** – Value is an update meant to overwrite the leader’s current value. + + + +#### _async_ election_resign(leader: LeaderKey) +Resign releases election leadership so other campaigners may acquire leadership on the election. + + +* **Parameters** + + **leader** – Leader is the leadership to relinquish by resignation. + + + #### _async_ get(key: str, max_create_revision: Optional[str] = None, max_mod_revision: Optional[str] = None, min_create_revision: Optional[str] = None, min_mod_revision: Optional[str] = None, revision: Optional[str] = None, encoding: Optional[str] = None) Gets value associated with given key from the key-value store. @@ -290,28 +358,6 @@ Gets value associated with given key from the key-value store. -#### _async_ get_election(name: bytes) -Returns the current election proclamation, if any. - - -* **Parameters** - - **name** – Name is the election identifier for the leadership information. - - - -* **Returns** - - KV is the key-value pair representing the latest leader update - - - -* **Return type** - - kv - - - #### _async_ get_prefix(key: str, max_create_revision: Optional[str] = None, max_mod_revision: Optional[str] = None, min_create_revision: Optional[str] = None, min_mod_revision: Optional[str] = None, revision: Optional[str] = None, sort_order: RangeRequestSortOrder = RangeRequestSortOrder.NONE, sort_target: RangeRequestSortTarget = RangeRequestSortTarget.KEY, encoding: Optional[str] = None) Gets the key-value in dictionary from the key-value store with given key prefix. i.e. get_prefix(‘/sorna/local’) call looks up all keys which has /sorna/local prefix. @@ -572,42 +618,6 @@ Gets the keys in the range from the key-value store. -#### _async_ observe_election(name: bytes) -Observe streams election proclamations in-order as made by the election’s elected leaders. - - -* **Parameters** - - **name** – Name is the election identifier for the leadership information. - - - -* **Returns** - - **event** – A Leader object containing event information. - - - -* **Return type** - - AsyncIterator[Leader] - - - -#### _async_ proclaim_election(leader: LeaderKey, value: bytes) -Proclaim updates the leader’s posted value with a new value. - - -* **Parameters** - - - * **leader** – Leader is the leadership hold on the election. - - - * **value** – Value is an update meant to overwrite the leader’s current value. - - - #### _async_ put(key: str, value: Optional[str], lease: Optional[int] = None, prev_kv: bool = False, encoding: Optional[str] = None) Puts given key into the key-value store. @@ -649,16 +659,6 @@ Puts given key into the key-value store. -#### _async_ resign_election(leader: LeaderKey) -Resign releases election leadership so other campaigners may acquire leadership on the election. - - -* **Parameters** - - **leader** – Leader is the leadership to relinquish by resignation. - - - #### _async_ revoke_lease(id: int) Revokes a lease. All keys attached to the lease will expire and be deleted. From 6eb9b006777bf57e3dcd6dba1c4c38faef7b607f Mon Sep 17 00:00:00 2001 From: Jeongseok Kang Date: Fri, 29 Jul 2022 14:26:19 +0900 Subject: [PATCH 07/12] fix: mypy error --- tests/test_election.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/test_election.py b/tests/test_election.py index 7dc5167..1dfb33b 100644 --- a/tests/test_election.py +++ b/tests/test_election.py @@ -1,6 +1,6 @@ import asyncio import uuid -from multiprocessing import Queue +from multiprocessing import Queue, queues from typing import Optional import pytest @@ -33,7 +33,7 @@ async def _proclaim_task(leader: LeaderKey, value: bytes) -> None: async with etcd.connect() as communicator: await communicator.election_proclaim(leader, value) - async def _observe_task(election_id: bytes, queue: Optional[Queue] = None) -> None: + async def _observe_task(election_id: bytes, queue: Optional[queues.Queue] = None) -> None: async with etcd.connect() as communicator: async for kv in communicator.election_observe(name=election_id): if queue: @@ -47,7 +47,7 @@ async def _observe_task(election_id: bytes, queue: Optional[Queue] = None) -> No current_leader_key = await _leader_task() assert current_leader_key.lease == leader_key.lease - queue = Queue() + queue: queues.Queue = Queue() observe_task = asyncio.create_task(_observe_task(election_id, queue=queue)) await asyncio.sleep(3.0) From 7d290a4fff37cf5267670416043a4e41b55e4f28 Mon Sep 17 00:00:00 2001 From: Jeongseok Kang Date: Fri, 29 Jul 2022 17:14:04 +0900 Subject: [PATCH 08/12] fix: Encapsulate encode/decode process with optional `encoding` argument --- src/etcetra/client.py | 48 ++++++++++++++++++++++++++-------- src/etcetra/types.py | 58 ++++++++++++++++++++++++++++++++++++++++-- tests/conftest.py | 4 +-- tests/test_election.py | 22 ++++++++-------- 4 files changed, 107 insertions(+), 25 deletions(-) diff --git a/src/etcetra/client.py b/src/etcetra/client.py index 734e071..7e36350 100644 --- a/src/etcetra/client.py +++ b/src/etcetra/client.py @@ -1036,9 +1036,10 @@ async def keys_range( async def election_campaign( self, - name: bytes, + name: str, lease_id: int, - value: Optional[bytes] = None, + value: Optional[str] = None, + encoding: Optional[str] = None, ) -> LeaderKey: """ Campaign waits to acquire leadership in an election, @@ -1063,12 +1064,17 @@ async def election_campaign( leader: etcetra.types.LeaderKey Leader describes the resources used for holding leadereship of the election. """ + encoding = encoding or self.encoding + name = name.encode(encoding) + if value: + value = value.encode(encoding) + stub = v3election_pb2_grpc.ElectionStub(self.channel) request = v3election_pb2.CampaignRequest(name=name, lease=lease_id, value=value) response = await stub.Campaign(request) - return response.leader + return LeaderKey.parse(response.leader, encoding=encoding) - async def election_resign(self, leader: LeaderKey) -> None: + async def election_resign(self, leader: LeaderKey, encoding: Optional[str] = None) -> None: """ Resign releases election leadership so other campaigners may acquire leadership on the election. @@ -1077,10 +1083,18 @@ async def election_resign(self, leader: LeaderKey) -> None: leader Leader is the leadership to relinquish by resignation. """ + encoding = encoding or self.encoding + leader = leader.proto(encoding=encoding) + stub = v3election_pb2_grpc.ElectionStub(self.channel) await stub.Resign(v3election_pb2.ResignRequest(leader=leader)) - async def election_proclaim(self, leader: LeaderKey, value: bytes) -> None: + async def election_proclaim( + self, + leader: LeaderKey, + value: str, + encoding: Optional[str] = None, + ) -> None: """ Proclaim updates the leader’s posted value with a new value. @@ -1091,10 +1105,14 @@ async def election_proclaim(self, leader: LeaderKey, value: bytes) -> None: value Value is an update meant to overwrite the leader’s current value. """ + encoding = encoding or self.encoding + leader = leader.proto(encoding=encoding) + value = value.encode(encoding) + stub = v3election_pb2_grpc.ElectionStub(self.channel) await stub.Proclaim(v3election_pb2.ProclaimRequest(leader=leader, value=value)) - async def election_leader(self, name: bytes) -> LeaderKey: + async def election_leader(self, name: str, encoding: Optional[str] = None) -> LeaderKey: """ Returns the current election proclamation, if any. @@ -1108,16 +1126,23 @@ async def election_leader(self, name: bytes) -> LeaderKey: leader_key LeaderKey is the key-value pair representing the latest leader update """ + encoding = encoding or self.encoding + name = name.encode(encoding) + stub = v3election_pb2_grpc.ElectionStub(self.channel) response = await stub.Leader(v3election_pb2.LeaderRequest(name=name)) return LeaderKey( - name=name, - key=response.kv.key, + name=name.decode(encoding), + key=response.kv.key.decode(encoding), rev=response.kv.mod_revision, lease=response.kv.lease, ) - async def election_observe(self, name: bytes) -> AsyncIterator[KeyValue]: + async def election_observe( + self, + name: str, + encoding: Optional[str] = None, + ) -> AsyncIterator[KeyValue]: """ Observe streams election proclamations in-order as made by the election’s elected leaders. @@ -1131,9 +1156,12 @@ async def election_observe(self, name: bytes) -> AsyncIterator[KeyValue]: event: AsyncIterator[KeyValue] A `KeyValue` object containing event information. """ + encoding = encoding or self.encoding + name = name.encode(encoding) + stub = v3election_pb2_grpc.ElectionStub(self.channel) async for response in stub.Observe(v3election_pb2.LeaderRequest(name=name)): - yield response.kv + yield KeyValue.parse(response.kv, encoding=self.encoding) async def grant_lease(self, ttl: int, id: Optional[int] = None) -> int: """ diff --git a/src/etcetra/types.py b/src/etcetra/types.py index b2da531..5f0c50c 100644 --- a/src/etcetra/types.py +++ b/src/etcetra/types.py @@ -70,8 +70,6 @@ class WatchEventType(enum.Enum): DELETE = 1 -KeyValue = kv_pb2.KeyValue -LeaderKey = v3election_pb2.LeaderKey PutRequestType = rpc_pb2.PutRequest RangeRequestType = rpc_pb2.RangeRequest DeleteRangeRequestType = rpc_pb2.DeleteRangeRequest @@ -190,3 +188,59 @@ class EtcdLockOption: lock_name: str timeout: Optional[float] ttl: Optional[int] + + +@dataclass +class LeaderKey: + name: str + key: str + rev: int + lease: int + + @classmethod + def parse(cls, key: v3election_pb2.LeaderKey, encoding: str = "utf-8") -> "LeaderKey": + return cls( + name=key.name.decode(encoding), + key=key.key.decode(encoding), + rev=key.rev, + lease=key.lease, + ) + + def proto(self, encoding: str = "utf-8") -> v3election_pb2.LeaderKey: + return v3election_pb2.LeaderKey( + name=self.name.encode(encoding), + key=self.key.encode(encoding), + rev=self.rev, + lease=self.lease, + ) + + +@dataclass +class KeyValue: + key: str + create_revision: int + mod_revision: int + version: int + value: str + lease: int + + @classmethod + def parse(cls, kv: kv_pb2.KeyValue, encoding: str = "utf-8") -> "KeyValue": + return cls( + key=kv.key.decode(encoding), + create_revision=kv.create_revision, + mod_revision=kv.mod_revision, + version=kv.version, + value=kv.value.decode(encoding), + lease=kv.lease, + ) + + def proto(self, encoding: str = "utf-8") -> kv_pb2.KeyValue: + return kv_pb2.KeyValue( + key=self.key.encode(encoding), + create_revision=self.create_revision, + mod_revision=self.mod_revision, + version=self.version, + value=self.value.encode(encoding), + lease=self.lease, + ) diff --git a/tests/conftest.py b/tests/conftest.py index 8b42f66..cf3635a 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -26,5 +26,5 @@ async def etcd(etcd_addr): @pytest.fixture(scope="module") -def election_id() -> bytes: - return f"test/{uuid.uuid4()}".encode("utf-8") +def election_id() -> str: + return f"test/{uuid.uuid4()}" diff --git a/tests/test_election.py b/tests/test_election.py index 1dfb33b..e2148a8 100644 --- a/tests/test_election.py +++ b/tests/test_election.py @@ -10,9 +10,9 @@ @pytest.mark.asyncio -async def test_election_service(etcd: EtcdClient, election_id: bytes): +async def test_election_service(etcd: EtcdClient, election_id: str): - async def _campaign_task(value: bytes) -> LeaderKey: + async def _campaign_task(value: str) -> LeaderKey: async with etcd.connect() as communicator: lease_id = await communicator.grant_lease(ttl=60 * 60) return await communicator.election_campaign( @@ -29,42 +29,42 @@ async def _leader_task() -> LeaderKey: async with etcd.connect() as communicator: return await communicator.election_leader(name=election_id) - async def _proclaim_task(leader: LeaderKey, value: bytes) -> None: + async def _proclaim_task(leader: LeaderKey, value: str) -> None: async with etcd.connect() as communicator: await communicator.election_proclaim(leader, value) - async def _observe_task(election_id: bytes, queue: Optional[queues.Queue] = None) -> None: + async def _observe_task(election_id: str, queue: Optional[queues.Queue] = None) -> None: async with etcd.connect() as communicator: async for kv in communicator.election_observe(name=election_id): if queue: queue.put(kv.value) # Campaign - random_value = str(uuid.uuid4()).encode("utf-8") + random_value = str(uuid.uuid4()) leader_key = await _campaign_task(value=random_value) # Leader current_leader_key = await _leader_task() assert current_leader_key.lease == leader_key.lease - queue: queues.Queue = Queue() - observe_task = asyncio.create_task(_observe_task(election_id, queue=queue)) + election_event_queue: queues.Queue = Queue() + observe_task = asyncio.create_task(_observe_task(election_id, queue=election_event_queue)) await asyncio.sleep(3.0) # Proclaim - next_random_value = str(uuid.uuid4()).encode("utf-8") + next_random_value = str(uuid.uuid4()) await _proclaim_task(leader=leader_key, value=next_random_value) # Observe - initial_value = queue.get() + initial_value = election_event_queue.get() assert initial_value == random_value - proclaimed_value = queue.get() + proclaimed_value = election_event_queue.get() assert proclaimed_value == next_random_value observe_task.cancel() # Resign - new_random_value = str(uuid.uuid4()).encode("utf-8") + new_random_value = str(uuid.uuid4()) new_campaign_task = _campaign_task(value=new_random_value) await _resign_task(leader=leader_key) new_leader_key = await asyncio.wait_for(new_campaign_task, timeout=None) From 204f67d7466361d4fdf5b1dee7339c9ff91e7829 Mon Sep 17 00:00:00 2001 From: Jeongseok Kang Date: Fri, 29 Jul 2022 17:24:59 +0900 Subject: [PATCH 09/12] fix: mypy error --- src/etcetra/client.py | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/src/etcetra/client.py b/src/etcetra/client.py index 7e36350..508ba0e 100644 --- a/src/etcetra/client.py +++ b/src/etcetra/client.py @@ -1065,12 +1065,12 @@ async def election_campaign( Leader describes the resources used for holding leadereship of the election. """ encoding = encoding or self.encoding - name = name.encode(encoding) - if value: - value = value.encode(encoding) + name_bytes = name.encode(encoding) + value_bytes: Optional[bytes] = value.encode(encoding) if value is not None \ + else None stub = v3election_pb2_grpc.ElectionStub(self.channel) - request = v3election_pb2.CampaignRequest(name=name, lease=lease_id, value=value) + request = v3election_pb2.CampaignRequest(name=name_bytes, lease=lease_id, value=value_bytes) response = await stub.Campaign(request) return LeaderKey.parse(response.leader, encoding=encoding) @@ -1107,10 +1107,10 @@ async def election_proclaim( """ encoding = encoding or self.encoding leader = leader.proto(encoding=encoding) - value = value.encode(encoding) + value_bytes = value.encode(encoding) stub = v3election_pb2_grpc.ElectionStub(self.channel) - await stub.Proclaim(v3election_pb2.ProclaimRequest(leader=leader, value=value)) + await stub.Proclaim(v3election_pb2.ProclaimRequest(leader=leader, value=value_bytes)) async def election_leader(self, name: str, encoding: Optional[str] = None) -> LeaderKey: """ @@ -1127,12 +1127,12 @@ async def election_leader(self, name: str, encoding: Optional[str] = None) -> Le LeaderKey is the key-value pair representing the latest leader update """ encoding = encoding or self.encoding - name = name.encode(encoding) + name_bytes = name.encode(encoding) stub = v3election_pb2_grpc.ElectionStub(self.channel) - response = await stub.Leader(v3election_pb2.LeaderRequest(name=name)) + response = await stub.Leader(v3election_pb2.LeaderRequest(name=name_bytes)) return LeaderKey( - name=name.decode(encoding), + name=name, key=response.kv.key.decode(encoding), rev=response.kv.mod_revision, lease=response.kv.lease, @@ -1157,10 +1157,10 @@ async def election_observe( A `KeyValue` object containing event information. """ encoding = encoding or self.encoding - name = name.encode(encoding) + name_bytes = name.encode(encoding) stub = v3election_pb2_grpc.ElectionStub(self.channel) - async for response in stub.Observe(v3election_pb2.LeaderRequest(name=name)): + async for response in stub.Observe(v3election_pb2.LeaderRequest(name=name_bytes)): yield KeyValue.parse(response.kv, encoding=self.encoding) async def grant_lease(self, ttl: int, id: Optional[int] = None) -> int: From 0405183cd5b56a416523496d52e5c2eddb84178c Mon Sep 17 00:00:00 2001 From: Jeongseok Kang Date: Fri, 29 Jul 2022 17:27:55 +0900 Subject: [PATCH 10/12] docs: Update sphinx-generated document --- docs/references.md | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/docs/references.md b/docs/references.md index 0153bf2..2954795 100644 --- a/docs/references.md +++ b/docs/references.md @@ -205,7 +205,7 @@ and generates a delete event in the event history for every deleted key. -#### _async_ election_campaign(name: bytes, lease_id: int, value: Optional[bytes] = None) +#### _async_ election_campaign(name: str, lease_id: int, value: Optional[str] = None, encoding: Optional[str] = None) Campaign waits to acquire leadership in an election, returning a LeaderKey representing the leadership if successful. The LeaderKey can then be used to issue new values on the election, @@ -240,7 +240,7 @@ and resign from the election. -#### _async_ election_leader(name: bytes) +#### _async_ election_leader(name: str, encoding: Optional[str] = None) Returns the current election proclamation, if any. @@ -262,7 +262,7 @@ Returns the current election proclamation, if any. -#### _async_ election_observe(name: bytes) +#### _async_ election_observe(name: str, encoding: Optional[str] = None) Observe streams election proclamations in-order as made by the election’s elected leaders. @@ -284,7 +284,7 @@ Observe streams election proclamations in-order as made by the election’s elec -#### _async_ election_proclaim(leader: LeaderKey, value: bytes) +#### _async_ election_proclaim(leader: LeaderKey, value: str, encoding: Optional[str] = None) Proclaim updates the leader’s posted value with a new value. @@ -298,7 +298,7 @@ Proclaim updates the leader’s posted value with a new value. -#### _async_ election_resign(leader: LeaderKey) +#### _async_ election_resign(leader: LeaderKey, encoding: Optional[str] = None) Resign releases election leadership so other campaigners may acquire leadership on the election. From 26d391b8e6fdb497520c39e0ef0f5bb4bee25752 Mon Sep 17 00:00:00 2001 From: Jeongseok Kang Date: Fri, 29 Jul 2022 21:37:08 +0900 Subject: [PATCH 11/12] fix: Cleanup granted leases after test is done --- tests/test_election.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/tests/test_election.py b/tests/test_election.py index e2148a8..8188fce 100644 --- a/tests/test_election.py +++ b/tests/test_election.py @@ -70,3 +70,8 @@ async def _observe_task(election_id: str, queue: Optional[queues.Queue] = None) new_leader_key = await asyncio.wait_for(new_campaign_task, timeout=None) current_leader_key = await _leader_task() assert current_leader_key.lease == new_leader_key.lease + + # Cleanup granted leases + async with etcd.connect() as communicator: + await communicator.revoke_lease(leader_key.lease) + await communicator.revoke_lease(new_leader_key.lease) From df090e54061d153942be4cdba72c7e2d6113b489 Mon Sep 17 00:00:00 2001 From: Jeongseok Kang Date: Mon, 1 Aug 2022 16:08:09 +0900 Subject: [PATCH 12/12] fix: Change variable name to prevent type conflict on reassignment --- src/etcetra/client.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/src/etcetra/client.py b/src/etcetra/client.py index 508ba0e..67a5e5a 100644 --- a/src/etcetra/client.py +++ b/src/etcetra/client.py @@ -1066,8 +1066,10 @@ async def election_campaign( """ encoding = encoding or self.encoding name_bytes = name.encode(encoding) - value_bytes: Optional[bytes] = value.encode(encoding) if value is not None \ - else None + value_bytes: Optional[bytes] = ( + value.encode(encoding) if value is not None + else None + ) stub = v3election_pb2_grpc.ElectionStub(self.channel) request = v3election_pb2.CampaignRequest(name=name_bytes, lease=lease_id, value=value_bytes) @@ -1084,10 +1086,10 @@ async def election_resign(self, leader: LeaderKey, encoding: Optional[str] = Non Leader is the leadership to relinquish by resignation. """ encoding = encoding or self.encoding - leader = leader.proto(encoding=encoding) + leader_proto: v3election_pb2.LeaderKey = leader.proto(encoding=encoding) stub = v3election_pb2_grpc.ElectionStub(self.channel) - await stub.Resign(v3election_pb2.ResignRequest(leader=leader)) + await stub.Resign(v3election_pb2.ResignRequest(leader=leader_proto)) async def election_proclaim( self, @@ -1106,11 +1108,11 @@ async def election_proclaim( Value is an update meant to overwrite the leader’s current value. """ encoding = encoding or self.encoding - leader = leader.proto(encoding=encoding) + leader_proto: v3election_pb2.LeaderKey = leader.proto(encoding=encoding) value_bytes = value.encode(encoding) stub = v3election_pb2_grpc.ElectionStub(self.channel) - await stub.Proclaim(v3election_pb2.ProclaimRequest(leader=leader, value=value_bytes)) + await stub.Proclaim(v3election_pb2.ProclaimRequest(leader=leader_proto, value=value_bytes)) async def election_leader(self, name: str, encoding: Optional[str] = None) -> LeaderKey: """