diff --git a/docs/references.md b/docs/references.md index 04a0f7d..2954795 100644 --- a/docs/references.md +++ b/docs/references.md @@ -3,7 +3,7 @@ Pure python asyncio Etcd client. -### _class_ etcetra.client.EtcdClient(addr: etcetra.types.HostPortPair, credentials: Optional[etcetra.types.EtcdCredential] = None, secure: bool = False, encoding: str = 'utf-8') +### _class_ etcetra.client.EtcdClient(addr: HostPortPair, credentials: Optional[EtcdCredential] = None, secure: bool = False, encoding: str = 'utf-8') Wrapper class of underlying actual Etcd API implementations (KV, Watch, Txn, …). In most cases, user can perform most of the jobs by creating EtcdClient object. @@ -64,10 +64,36 @@ Acquired lock will automatically released when user exits with context. -### _class_ etcetra.client.EtcdCommunicator(channel: grpc.aio._base_channel.Channel, encoding: str = 'utf-8') +### _class_ etcetra.client.EtcdCommunicator(channel: Channel, encoding: str = 'utf-8') Performs actual API calls to Etcd cluster and returns result. +#### create_lease_keepalive_task(id: int, interval: float) +Creates asyncio Task which sends Keepalive request to given lease ID. + + +* **Parameters** + + + * **id** – Lease ID to send Keepalive request. + + + * **interval** – Interval to send Keepalive request. + + + +* **Returns** + + **task** + + + +* **Return type** + + asyncio.Task + + + #### _async_ delete(key: str, prev_kv: bool = False, encoding: Optional[str] = None) Deletes the given key the key-value store. A delete request increments the revision of the key-value store @@ -179,6 +205,109 @@ and generates a delete event in the event history for every deleted key. +#### _async_ election_campaign(name: str, lease_id: int, value: Optional[str] = None, encoding: Optional[str] = None) +Campaign waits to acquire leadership in an election, +returning a LeaderKey representing the leadership if successful. +The LeaderKey can then be used to issue new values on the election, +transactionally guard API requests on leadership still being held, +and resign from the election. + + +* **Parameters** + + + * **name** – Name is the election’s identifier for the campaign. + + + * **lease_id** – LeaseID is the ID of the lease attached to leadership of the election. + If the lease expires or is revoked before resigning leadership, + then the leadership is transferred to the next campaigner, if any. + + + * **value** – Value is the initial proclaimed value set when the campaigner wins the election. + + + +* **Returns** + + **leader** – Leader describes the resources used for holding leadereship of the election. + + + +* **Return type** + + etcetra.types.LeaderKey + + + +#### _async_ election_leader(name: str, encoding: Optional[str] = None) +Returns the current election proclamation, if any. + + +* **Parameters** + + **name** – Name is the election identifier for the leadership information. + + + +* **Returns** + + LeaderKey is the key-value pair representing the latest leader update + + + +* **Return type** + + leader_key + + + +#### _async_ election_observe(name: str, encoding: Optional[str] = None) +Observe streams election proclamations in-order as made by the election’s elected leaders. + + +* **Parameters** + + **name** – Name is the election identifier for the leadership information. + + + +* **Returns** + + **event** – A KeyValue object containing event information. + + + +* **Return type** + + AsyncIterator[KeyValue] + + + +#### _async_ election_proclaim(leader: LeaderKey, value: str, encoding: Optional[str] = None) +Proclaim updates the leader’s posted value with a new value. + + +* **Parameters** + + + * **leader** – Leader is the leadership hold on the election. + + + * **value** – Value is an update meant to overwrite the leader’s current value. + + + +#### _async_ election_resign(leader: LeaderKey, encoding: Optional[str] = None) +Resign releases election leadership so other campaigners may acquire leadership on the election. + + +* **Parameters** + + **leader** – Leader is the leadership to relinquish by resignation. + + + #### _async_ get(key: str, max_create_revision: Optional[str] = None, max_mod_revision: Optional[str] = None, min_create_revision: Optional[str] = None, min_mod_revision: Optional[str] = None, revision: Optional[str] = None, encoding: Optional[str] = None) Gets value associated with given key from the key-value store. @@ -229,7 +358,7 @@ Gets value associated with given key from the key-value store. -#### _async_ get_prefix(key: str, max_create_revision: Optional[str] = None, max_mod_revision: Optional[str] = None, min_create_revision: Optional[str] = None, min_mod_revision: Optional[str] = None, revision: Optional[str] = None, sort_order: etcetra.types.RangeRequestSortOrder = RangeRequestSortOrder.NONE, sort_target: etcetra.types.RangeRequestSortTarget = RangeRequestSortTarget.KEY, encoding: Optional[str] = None) +#### _async_ get_prefix(key: str, max_create_revision: Optional[str] = None, max_mod_revision: Optional[str] = None, min_create_revision: Optional[str] = None, min_mod_revision: Optional[str] = None, revision: Optional[str] = None, sort_order: RangeRequestSortOrder = RangeRequestSortOrder.NONE, sort_target: RangeRequestSortTarget = RangeRequestSortTarget.KEY, encoding: Optional[str] = None) Gets the key-value in dictionary from the key-value store with given key prefix. i.e. get_prefix(‘/sorna/local’) call looks up all keys which has /sorna/local prefix. @@ -286,7 +415,7 @@ i.e. get_prefix(‘/sorna/local’) call looks up all keys which has /sorna/loca -#### _async_ get_range(key: str, range_end: str, limit: Optional[str] = None, max_create_revision: Optional[str] = None, max_mod_revision: Optional[str] = None, min_create_revision: Optional[str] = None, min_mod_revision: Optional[str] = None, revision: Optional[str] = None, sort_order: etcetra.types.RangeRequestSortOrder = RangeRequestSortOrder.NONE, sort_target: etcetra.types.RangeRequestSortTarget = RangeRequestSortTarget.KEY, encoding: Optional[str] = None) +#### _async_ get_range(key: str, range_end: str, limit: Optional[str] = None, max_create_revision: Optional[str] = None, max_mod_revision: Optional[str] = None, min_create_revision: Optional[str] = None, min_mod_revision: Optional[str] = None, revision: Optional[str] = None, sort_order: RangeRequestSortOrder = RangeRequestSortOrder.NONE, sort_target: RangeRequestSortTarget = RangeRequestSortTarget.KEY, encoding: Optional[str] = None) Gets the key-value in dictionary from the key-value store with keys in [key, range_end) range. @@ -374,7 +503,7 @@ Each expired key generates a delete event in the event history. -#### _async_ keys_prefix(key: str, max_create_revision: Optional[str] = None, max_mod_revision: Optional[str] = None, min_create_revision: Optional[str] = None, min_mod_revision: Optional[str] = None, revision: Optional[str] = None, sort_order: etcetra.types.RangeRequestSortOrder = RangeRequestSortOrder.NONE, sort_target: etcetra.types.RangeRequestSortTarget = RangeRequestSortTarget.KEY, encoding: Optional[str] = None) +#### _async_ keys_prefix(key: str, max_create_revision: Optional[str] = None, max_mod_revision: Optional[str] = None, min_create_revision: Optional[str] = None, min_mod_revision: Optional[str] = None, revision: Optional[str] = None, sort_order: RangeRequestSortOrder = RangeRequestSortOrder.NONE, sort_target: RangeRequestSortTarget = RangeRequestSortTarget.KEY, encoding: Optional[str] = None) Gets the keys which has given prefix from the key-value store. @@ -430,7 +559,7 @@ Gets the keys which has given prefix from the key-value store. -#### _async_ keys_range(key: str, range_end: str, limit: Optional[str] = None, max_create_revision: Optional[str] = None, max_mod_revision: Optional[str] = None, min_create_revision: Optional[str] = None, min_mod_revision: Optional[str] = None, revision: Optional[str] = None, serializable: bool = True, sort_order: etcetra.types.RangeRequestSortOrder = RangeRequestSortOrder.NONE, sort_target: etcetra.types.RangeRequestSortTarget = RangeRequestSortTarget.KEY, encoding: Optional[str] = None) +#### _async_ keys_range(key: str, range_end: str, limit: Optional[str] = None, max_create_revision: Optional[str] = None, max_mod_revision: Optional[str] = None, min_create_revision: Optional[str] = None, min_mod_revision: Optional[str] = None, revision: Optional[str] = None, serializable: bool = True, sort_order: RangeRequestSortOrder = RangeRequestSortOrder.NONE, sort_target: RangeRequestSortTarget = RangeRequestSortTarget.KEY, encoding: Optional[str] = None) Gets the keys in the range from the key-value store. @@ -489,32 +618,6 @@ Gets the keys in the range from the key-value store. -#### _async_ lease_keepalive(id: int, interval: float) -Creates asyncio Task which sends Keepalive request to given lease ID. - - -* **Parameters** - - - * **id** – Lease ID to send Keepalive request. - - - * **interval** – Interval to send Keepalive request. - - - -* **Returns** - - **task** - - - -* **Return type** - - asyncio.Task - - - #### _async_ put(key: str, value: Optional[str], lease: Optional[int] = None, prev_kv: bool = False, encoding: Optional[str] = None) Puts given key into the key-value store. @@ -566,7 +669,7 @@ Revokes a lease. All keys attached to the lease will expire and be deleted. -#### _async_ txn(txn_builder: Callable[[etcetra.client.EtcdTransactionAction], None], encoding: Optional[str] = None) +#### _async_ txn(txn_builder: Callable[[EtcdTransactionAction], None], encoding: Optional[str] = None) A shorthand helper for Txn, with no compare arguments. This can be helpful when user just wants to execute transaction without any conditions. @@ -608,7 +711,7 @@ any conditions. -#### _async_ txn_compare(compares: List[rpc_pb2.Compare], txn_builder: Callable[[etcetra.client.EtcdTransactionAction, etcetra.client.EtcdTransactionAction], None], encoding: Optional[str] = None) +#### _async_ txn_compare(compares: List[Compare], txn_builder: Callable[[EtcdTransactionAction, EtcdTransactionAction], None], encoding: Optional[str] = None) Processes multiple requests in a single transaction. A txn request increments the revision of the key-value store and generates events with the same revision for every completed request. @@ -667,7 +770,7 @@ It is not allowed to modify the same key several times within one txn. -#### watch(key: str, ready_event: Optional[asyncio.locks.Event] = None, filters: Optional[List[etcetra.types.WatchCreateRequestFilterType]] = None, prev_kv: bool = False, progress_notify: bool = False, start_revision: Optional[int] = None, watch_id: Optional[int] = None, encoding: Optional[str] = None) +#### watch(key: str, ready_event: Optional[Event] = None, filters: Optional[List[WatchCreateRequestFilterType]] = None, prev_kv: bool = False, progress_notify: bool = False, start_revision: Optional[int] = None, watch_id: Optional[int] = None, encoding: Optional[str] = None) Async iterator which watches for events happening or that have happened. Both input and output are streams; the input stream is for creating and canceling watchers and the output stream sends events. @@ -729,7 +832,7 @@ The entire event history can be watched starting from the last compaction revisi -#### watch_prefix(key: str, ready_event: Optional[asyncio.locks.Event] = None, filters: Optional[List[etcetra.types.WatchCreateRequestFilterType]] = None, prev_kv: bool = False, progress_notify: bool = True, start_revision: Optional[int] = None, watch_id: Optional[int] = None, encoding: Optional[str] = None) +#### watch_prefix(key: str, ready_event: Optional[Event] = None, filters: Optional[List[WatchCreateRequestFilterType]] = None, prev_kv: bool = False, progress_notify: bool = True, start_revision: Optional[int] = None, watch_id: Optional[int] = None, encoding: Optional[str] = None) Watches for events happening or that have happened along keys with given prefix. Both input and output are streams; the input stream is for creating and canceling watchers and the output stream sends events. @@ -801,7 +904,7 @@ A delete request increments the revision of the key-value store and generates a delete event in the event history for every deleted key. -#### get(key: str, limit: Optional[str] = None, max_create_revision: Optional[str] = None, max_mod_revision: Optional[str] = None, min_create_revision: Optional[str] = None, min_mod_revision: Optional[str] = None, revision: Optional[str] = None, serializable: bool = True, sort_order: etcetra.types.RangeRequestSortOrder = RangeRequestSortOrder.NONE, sort_target: etcetra.types.RangeRequestSortTarget = RangeRequestSortTarget.KEY, encoding: Optional[str] = None) +#### get(key: str, limit: Optional[str] = None, max_create_revision: Optional[str] = None, max_mod_revision: Optional[str] = None, min_create_revision: Optional[str] = None, min_mod_revision: Optional[str] = None, revision: Optional[str] = None, serializable: bool = True, sort_order: RangeRequestSortOrder = RangeRequestSortOrder.NONE, sort_target: RangeRequestSortTarget = RangeRequestSortTarget.KEY, encoding: Optional[str] = None) Gets the keys in the range from the key-value store. @@ -822,7 +925,7 @@ An enumeration. ### _class_ etcetra.types.CompareKey(key: 'str', target_lease: 'Optional[int]' = None, mod_revision: 'Optional[int]' = None, range_end: 'Optional[str]' = None, target_version: 'Optional[int]' = None, encoding: 'str' = 'utf-8') ### etcetra.types.DeleteRangeRequestType() -alias of `rpc_pb2.DeleteRangeRequest` +alias of `DeleteRangeRequest` ### _class_ etcetra.types.EtcdCredential(username: 'str', password: 'str') @@ -832,7 +935,7 @@ alias of `rpc_pb2.DeleteRangeRequest` ### _class_ etcetra.types.HostPortPair(host: 'str', port: 'int') ### etcetra.types.PutRequestType() -alias of `rpc_pb2.PutRequest` +alias of `PutRequest` ### _class_ etcetra.types.RangeRequestSortOrder(value) @@ -844,7 +947,7 @@ An enumeration. ### etcetra.types.RangeRequestType() -alias of `rpc_pb2.RangeRequest` +alias of `RangeRequest` ### _class_ etcetra.types.TxnReturnType(values, success) diff --git a/scripts/compile_protobuf.py b/scripts/compile_protobuf.py index e8d6860..a10c657 100755 --- a/scripts/compile_protobuf.py +++ b/scripts/compile_protobuf.py @@ -164,7 +164,7 @@ def main(version: str, repo_path: Optional[Path] = None): parser.add_argument('version', type=str, help='target etcd version') parser.add_argument( '--repository-path', type=str, - help='git repository folder path of etcd source code to use. Ff not supplied, ' + help='git repository folder path of etcd source code to use. If not supplied, ' 'this script will clone fresh repo on temporary directory and remove it upon exit.') args = parser.parse_args() diff --git a/src/etcetra/client.py b/src/etcetra/client.py index 8fab1db..67a5e5a 100644 --- a/src/etcetra/client.py +++ b/src/etcetra/client.py @@ -32,12 +32,13 @@ from grpc.aio._typing import RequestType, RequestIterableType, ResponseType, ResponseIterableType from .grpc_api import rpc_pb2, rpc_pb2_grpc +from .grpc_api import v3election_pb2, v3election_pb2_grpc from .grpc_api import v3lock_pb2, v3lock_pb2_grpc from .types import ( DeleteRangeRequestType, EtcdCredential, EtcdLockOption, HostPortPair, - PutRequestType, RangeRequestSortOrder, RangeRequestSortTarget, RangeRequestType, - TransactionRequest, TxnReturnType, TxnReturnValues, WatchCreateRequestFilterType, - WatchEvent, WatchEventType, + KeyValue, LeaderKey, PutRequestType, RangeRequestSortOrder, RangeRequestSortTarget, + RangeRequestType, TransactionRequest, TxnReturnType, TxnReturnValues, + WatchCreateRequestFilterType, WatchEvent, WatchEventType, ) __all__ = ( 'EtcdClient', @@ -1033,6 +1034,137 @@ async def keys_range( ) return [x.key.decode(encoding) for x in response.kvs] + async def election_campaign( + self, + name: str, + lease_id: int, + value: Optional[str] = None, + encoding: Optional[str] = None, + ) -> LeaderKey: + """ + Campaign waits to acquire leadership in an election, + returning a LeaderKey representing the leadership if successful. + The LeaderKey can then be used to issue new values on the election, + transactionally guard API requests on leadership still being held, + and resign from the election. + + Parameters + --------- + name + Name is the election’s identifier for the campaign. + lease_id + LeaseID is the ID of the lease attached to leadership of the election. + If the lease expires or is revoked before resigning leadership, + then the leadership is transferred to the next campaigner, if any. + value: + Value is the initial proclaimed value set when the campaigner wins the election. + + Returns + ------- + leader: etcetra.types.LeaderKey + Leader describes the resources used for holding leadereship of the election. + """ + encoding = encoding or self.encoding + name_bytes = name.encode(encoding) + value_bytes: Optional[bytes] = ( + value.encode(encoding) if value is not None + else None + ) + + stub = v3election_pb2_grpc.ElectionStub(self.channel) + request = v3election_pb2.CampaignRequest(name=name_bytes, lease=lease_id, value=value_bytes) + response = await stub.Campaign(request) + return LeaderKey.parse(response.leader, encoding=encoding) + + async def election_resign(self, leader: LeaderKey, encoding: Optional[str] = None) -> None: + """ + Resign releases election leadership so other campaigners may acquire leadership on the election. + + Parameters + --------- + leader + Leader is the leadership to relinquish by resignation. + """ + encoding = encoding or self.encoding + leader_proto: v3election_pb2.LeaderKey = leader.proto(encoding=encoding) + + stub = v3election_pb2_grpc.ElectionStub(self.channel) + await stub.Resign(v3election_pb2.ResignRequest(leader=leader_proto)) + + async def election_proclaim( + self, + leader: LeaderKey, + value: str, + encoding: Optional[str] = None, + ) -> None: + """ + Proclaim updates the leader’s posted value with a new value. + + Parameters + --------- + leader + Leader is the leadership hold on the election. + value + Value is an update meant to overwrite the leader’s current value. + """ + encoding = encoding or self.encoding + leader_proto: v3election_pb2.LeaderKey = leader.proto(encoding=encoding) + value_bytes = value.encode(encoding) + + stub = v3election_pb2_grpc.ElectionStub(self.channel) + await stub.Proclaim(v3election_pb2.ProclaimRequest(leader=leader_proto, value=value_bytes)) + + async def election_leader(self, name: str, encoding: Optional[str] = None) -> LeaderKey: + """ + Returns the current election proclamation, if any. + + Parameters + --------- + name + Name is the election identifier for the leadership information. + + Returns + ------- + leader_key + LeaderKey is the key-value pair representing the latest leader update + """ + encoding = encoding or self.encoding + name_bytes = name.encode(encoding) + + stub = v3election_pb2_grpc.ElectionStub(self.channel) + response = await stub.Leader(v3election_pb2.LeaderRequest(name=name_bytes)) + return LeaderKey( + name=name, + key=response.kv.key.decode(encoding), + rev=response.kv.mod_revision, + lease=response.kv.lease, + ) + + async def election_observe( + self, + name: str, + encoding: Optional[str] = None, + ) -> AsyncIterator[KeyValue]: + """ + Observe streams election proclamations in-order as made by the election’s elected leaders. + + Parameters + --------- + name + Name is the election identifier for the leadership information. + + Returns + ------- + event: AsyncIterator[KeyValue] + A `KeyValue` object containing event information. + """ + encoding = encoding or self.encoding + name_bytes = name.encode(encoding) + + stub = v3election_pb2_grpc.ElectionStub(self.channel) + async for response in stub.Observe(v3election_pb2.LeaderRequest(name=name_bytes)): + yield KeyValue.parse(response.kv, encoding=self.encoding) + async def grant_lease(self, ttl: int, id: Optional[int] = None) -> int: """ Creates a lease which expires if the server does not receive a keepAlive diff --git a/src/etcetra/types.py b/src/etcetra/types.py index fd04284..5f0c50c 100644 --- a/src/etcetra/types.py +++ b/src/etcetra/types.py @@ -6,7 +6,7 @@ if TYPE_CHECKING: from typing_extensions import TypeAlias -from etcetra.grpc_api import rpc_pb2 +from etcetra.grpc_api import kv_pb2, rpc_pb2, v3election_pb2 __all__ = ( 'RangeRequestSortOrder', @@ -188,3 +188,59 @@ class EtcdLockOption: lock_name: str timeout: Optional[float] ttl: Optional[int] + + +@dataclass +class LeaderKey: + name: str + key: str + rev: int + lease: int + + @classmethod + def parse(cls, key: v3election_pb2.LeaderKey, encoding: str = "utf-8") -> "LeaderKey": + return cls( + name=key.name.decode(encoding), + key=key.key.decode(encoding), + rev=key.rev, + lease=key.lease, + ) + + def proto(self, encoding: str = "utf-8") -> v3election_pb2.LeaderKey: + return v3election_pb2.LeaderKey( + name=self.name.encode(encoding), + key=self.key.encode(encoding), + rev=self.rev, + lease=self.lease, + ) + + +@dataclass +class KeyValue: + key: str + create_revision: int + mod_revision: int + version: int + value: str + lease: int + + @classmethod + def parse(cls, kv: kv_pb2.KeyValue, encoding: str = "utf-8") -> "KeyValue": + return cls( + key=kv.key.decode(encoding), + create_revision=kv.create_revision, + mod_revision=kv.mod_revision, + version=kv.version, + value=kv.value.decode(encoding), + lease=kv.lease, + ) + + def proto(self, encoding: str = "utf-8") -> kv_pb2.KeyValue: + return kv_pb2.KeyValue( + key=self.key.encode(encoding), + create_revision=self.create_revision, + mod_revision=self.mod_revision, + version=self.version, + value=self.value.encode(encoding), + lease=self.lease, + ) diff --git a/tests/conftest.py b/tests/conftest.py index f7906ef..cf3635a 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,4 +1,6 @@ import os +import uuid + import pytest from etcetra import EtcdClient, HostPortPair @@ -21,3 +23,8 @@ async def etcd(etcd_addr): async with etcd.connect() as communicator: await communicator.delete_prefix('/test') del etcd + + +@pytest.fixture(scope="module") +def election_id() -> str: + return f"test/{uuid.uuid4()}" diff --git a/tests/test_election.py b/tests/test_election.py new file mode 100644 index 0000000..8188fce --- /dev/null +++ b/tests/test_election.py @@ -0,0 +1,77 @@ +import asyncio +import uuid +from multiprocessing import Queue, queues +from typing import Optional + +import pytest + +from etcetra import EtcdClient +from etcetra.types import LeaderKey + + +@pytest.mark.asyncio +async def test_election_service(etcd: EtcdClient, election_id: str): + + async def _campaign_task(value: str) -> LeaderKey: + async with etcd.connect() as communicator: + lease_id = await communicator.grant_lease(ttl=60 * 60) + return await communicator.election_campaign( + name=election_id, + lease_id=lease_id, + value=value, + ) + + async def _resign_task(leader: LeaderKey) -> None: + async with etcd.connect() as communicator: + await communicator.election_resign(leader) + + async def _leader_task() -> LeaderKey: + async with etcd.connect() as communicator: + return await communicator.election_leader(name=election_id) + + async def _proclaim_task(leader: LeaderKey, value: str) -> None: + async with etcd.connect() as communicator: + await communicator.election_proclaim(leader, value) + + async def _observe_task(election_id: str, queue: Optional[queues.Queue] = None) -> None: + async with etcd.connect() as communicator: + async for kv in communicator.election_observe(name=election_id): + if queue: + queue.put(kv.value) + + # Campaign + random_value = str(uuid.uuid4()) + leader_key = await _campaign_task(value=random_value) + + # Leader + current_leader_key = await _leader_task() + assert current_leader_key.lease == leader_key.lease + + election_event_queue: queues.Queue = Queue() + observe_task = asyncio.create_task(_observe_task(election_id, queue=election_event_queue)) + await asyncio.sleep(3.0) + + # Proclaim + next_random_value = str(uuid.uuid4()) + await _proclaim_task(leader=leader_key, value=next_random_value) + + # Observe + initial_value = election_event_queue.get() + assert initial_value == random_value + proclaimed_value = election_event_queue.get() + assert proclaimed_value == next_random_value + + observe_task.cancel() + + # Resign + new_random_value = str(uuid.uuid4()) + new_campaign_task = _campaign_task(value=new_random_value) + await _resign_task(leader=leader_key) + new_leader_key = await asyncio.wait_for(new_campaign_task, timeout=None) + current_leader_key = await _leader_task() + assert current_leader_key.lease == new_leader_key.lease + + # Cleanup granted leases + async with etcd.connect() as communicator: + await communicator.revoke_lease(leader_key.lease) + await communicator.revoke_lease(new_leader_key.lease)