11# -*- coding: utf-8 -*-
2- import argparse
32import asyncio
43
5- from cloudproof_py .cover_crypt import (
6- Attribute ,
7- CoverCrypt ,
8- Policy ,
9- PolicyAxis ,
10- UserSecretKey ,
11- )
12- from cloudproof_py .kms import KmsClient
4+ from cloudproof_py .cover_crypt import Attribute
5+ from cloudproof_py .cover_crypt import CoverCrypt
6+ from cloudproof_py .cover_crypt import Policy
7+ from cloudproof_py .cover_crypt import PolicyAxis
8+ from cloudproof_py .cover_crypt import UserSecretKey
139
1410
1511async def main (use_kms : bool = True ):
16- """Usage example of Cover Crypt"""
12+ """Usage example of Cover Crypt
13+ Keys generation, encryption and decryption are done locally."""
1714
1815 # Creating Policy
1916 policy = Policy ()
@@ -37,17 +34,6 @@ async def main(use_kms: bool = True):
3734 )
3835 )
3936
40- # Example storing keys in Cosmian KMS
41- if use_kms :
42- await kms_example (policy )
43-
44- # Example storing keys in memory
45- else :
46- offline_example (policy )
47-
48-
49- def offline_example (policy : Policy ):
50- """Keys generation, encryption and decryption are done locally."""
5137 # Generating master keys
5238 cover_crypt = CoverCrypt ()
5339 master_private_key , public_key = cover_crypt .generate_master_keys (policy )
@@ -129,25 +115,22 @@ def offline_example(policy: Policy):
129115 )
130116 assert protected_fin_plaintext == protected_fin_data
131117
132- # Rotating Attributes
118+ # Rekey
133119
134120 # make a copy of the current user key
135121 old_confidential_mkg_user_key = UserSecretKey .from_bytes (
136122 confidential_mkg_user_key .to_bytes ()
137123 )
138124
139- # rotate MKG attribute
140- policy .rotate (Attribute ("Department" , "MKG" ))
141-
142- # update master keys
143- cover_crypt .update_master_keys (policy , master_private_key , public_key )
125+ # Rekey MKG attribute
126+ cover_crypt .rekey_master_keys (
127+ "Department::MKG" , policy , master_private_key , public_key
128+ )
144129
145130 # update user key
146131 cover_crypt .refresh_user_secret_key (
147132 confidential_mkg_user_key ,
148- "Department::MKG && Security Level::Confidential" ,
149133 master_private_key ,
150- policy ,
151134 keep_old_accesses = True ,
152135 )
153136
@@ -185,25 +168,19 @@ def offline_example(policy: Policy):
185168
186169 # decrypting the "new" `confidential marketing` message with the old key fails
187170 try :
188- new_confidential_mkg_plaintext , _ = cover_crypt .decrypt (
189- old_confidential_mkg_user_key , confidential_mkg_ciphertext
190- )
171+ cover_crypt .decrypt (old_confidential_mkg_user_key , confidential_mkg_ciphertext )
191172 except Exception as e :
192173 # ==> the user is not be able to decrypt
193174 print ("Expected error:" , e )
194175
195- # Clearing old rotations
176+ # Prune: remove old keys for the MKG attribute
196177
197- policy .clear_old_attribute_values (Attribute ("Department" , "MKG" ))
198- # old rotations for this attribute will be definitely removed from the master keys
199- cover_crypt .update_master_keys (policy , master_private_key , public_key )
178+ cover_crypt .prune_master_secret_key ("Department::MKG" , policy , master_private_key )
200179
201180 # update user key
202181 cover_crypt .refresh_user_secret_key (
203182 confidential_mkg_user_key ,
204- "Department::MKG && Security Level::Confidential" ,
205183 master_private_key ,
206- policy ,
207184 keep_old_accesses = True , # will not keep removed rotations
208185 )
209186
@@ -250,6 +227,15 @@ def offline_example(policy: Policy):
250227 )
251228 assert protected_rd_plaintext == protected_rd_data
252229
230+ # Rename attribute "Department::MKG" to "Department::Marketing"
231+ policy .rename_attribute (Attribute ("Department" , "MKG" ), "Marketing" )
232+
233+ # Encryption and decryption work the same even with previously generated keys and ciphers
234+ confidential_mkg_plaintext , _ = cover_crypt .decrypt (
235+ confidential_mkg_user_key , confidential_mkg_ciphertext
236+ )
237+ assert confidential_mkg_plaintext == confidential_mkg_data
238+
253239 # Removing access to an attribute
254240 # 1 - Keep decryption access to ciphertext from old attributes but remove the right to encrypt new data
255241
@@ -261,9 +247,7 @@ def offline_example(policy: Policy):
261247 cover_crypt .update_master_keys (policy , master_private_key , public_key )
262248 cover_crypt .refresh_user_secret_key (
263249 confidential_rd_fin_user_key ,
264- "(Department::R&D || Department::FIN) && Security Level::Confidential" ,
265250 master_private_key ,
266- policy ,
267251 keep_old_accesses = True ,
268252 )
269253
@@ -296,14 +280,13 @@ def offline_example(policy: Policy):
296280
297281 # after updating the keys, removed attributes can no longer be used to encrypt or decrypt
298282 cover_crypt .update_master_keys (policy , master_private_key , public_key )
283+ cover_crypt .refresh_user_secret_key (
284+ confidential_rd_fin_user_key ,
285+ master_private_key ,
286+ keep_old_accesses = True ,
287+ )
299288 try :
300- cover_crypt .refresh_user_secret_key (
301- confidential_rd_fin_user_key ,
302- "(Department::R&D || Department::FIN) && Security Level::Confidential" , # `Department::R&D` can no longer be used here
303- master_private_key ,
304- policy ,
305- keep_old_accesses = True ,
306- )
289+ cover_crypt .decrypt (confidential_rd_fin_user_key , protected_rd_ciphertext )
307290 except Exception as e :
308291 print ("Expected error:" , e )
309292
@@ -316,146 +299,16 @@ def offline_example(policy: Policy):
316299 # updating the keys will remove all access to previous ciphertext encrypted for `Security Level`
317300 cover_crypt .update_master_keys (policy , master_private_key , public_key )
318301 try :
319- cover_crypt .refresh_user_secret_key (
320- confidential_rd_fin_user_key ,
321- "Department::FIN && Security Level::Confidential" , # `Security Level` can no longer be used here
302+ cover_crypt .generate_user_secret_key (
322303 master_private_key ,
304+ "Department::FIN && Security Level::Confidential" , # `Security Level` can no longer be used here
323305 policy ,
324- keep_old_accesses = True ,
325- )
326- except Exception as e :
327- print ("Expected error:" , e )
328-
329-
330- async def kms_example (policy : Policy ):
331- """Keys generation, encryption and decryption are processed by an external KMS."""
332- # Generating master keys
333- kms_client = KmsClient (server_url = "http://localhost:9998" , api_key = "" )
334- (
335- public_key_uid ,
336- private_key_uid ,
337- ) = await kms_client .create_cover_crypt_master_key_pair (policy )
338-
339- # Copy the keys locally for backup
340- _ = await kms_client .retrieve_cover_crypt_public_master_key (public_key_uid )
341- _ = await kms_client .retrieve_cover_crypt_private_master_key (private_key_uid )
342-
343- # Messages encryption
344- protected_mkg_data = b"protected_mkg_message"
345- protected_mkg_ciphertext = await kms_client .cover_crypt_encryption (
346- "Department::MKG && Security Level::Protected" ,
347- protected_mkg_data ,
348- public_key_uid ,
349- )
350-
351- top_secret_mkg_data = b"top_secret_mkg_message"
352- top_secret_mkg_ciphertext = await kms_client .cover_crypt_encryption (
353- "Department::MKG && Security Level::Top Secret" ,
354- top_secret_mkg_data ,
355- public_key_uid ,
356- )
357-
358- protected_fin_data = b"protected_fin_message"
359- protected_fin_ciphertext = await kms_client .cover_crypt_encryption (
360- "Department::FIN && Security Level::Protected" ,
361- protected_fin_data ,
362- public_key_uid ,
363- )
364-
365- # Generating user keys
366- confidential_mkg_user_uid = await kms_client .create_cover_crypt_user_decryption_key (
367- "Department::MKG && Security Level::Confidential" ,
368- private_key_uid ,
369- )
370-
371- topSecret_mkg_fin_user_uid = (
372- await kms_client .create_cover_crypt_user_decryption_key (
373- "(Department::MKG || Department::FIN) && Security Level::Top Secret" ,
374- private_key_uid ,
375- )
376- )
377-
378- # Decryption with the right access policy
379- protected_mkg_plaintext , _ = await kms_client .cover_crypt_decryption (
380- protected_mkg_ciphertext , confidential_mkg_user_uid
381- )
382- assert protected_mkg_plaintext == protected_mkg_data
383-
384- # Decryption without the right access will fail
385- try :
386- # will throw
387- await kms_client .cover_crypt_decryption (
388- top_secret_mkg_ciphertext , confidential_mkg_user_uid
389306 )
390307 except Exception as e :
391- # ==> the user is not be able to decrypt
392308 print ("Expected error:" , e )
393309
394- try :
395- # will throw
396- await kms_client .cover_crypt_decryption (
397- protected_fin_ciphertext , confidential_mkg_user_uid
398- )
399- except Exception as e :
400- # ==> the user is not be able to decrypt
401- print ("Expected error:" , e )
402-
403- # User with Top Secret access can decrypt messages
404- # of all Security Level within the right Department
405-
406- protected_mkg_plaintext2 , _ = await kms_client .cover_crypt_decryption (
407- protected_mkg_ciphertext , topSecret_mkg_fin_user_uid
408- )
409- assert protected_mkg_plaintext2 == protected_mkg_data
410-
411- topSecret_mkg_plaintext , _ = await kms_client .cover_crypt_decryption (
412- top_secret_mkg_ciphertext , topSecret_mkg_fin_user_uid
413- )
414- assert topSecret_mkg_plaintext == top_secret_mkg_data
415-
416- protected_fin_plaintext , _ = await kms_client .cover_crypt_decryption (
417- protected_fin_ciphertext , topSecret_mkg_fin_user_uid
418- )
419- assert protected_fin_plaintext == protected_fin_data
420-
421- # Rotating Attributes
422-
423- # rotate MKG attribute
424- # all active keys will be rekeyed automatically
425- await kms_client .rotate_cover_crypt_attributes (["Department::MKG" ], private_key_uid )
426-
427- # New confidential marketing message
428-
429- confidential_mkg_data = b"confidential_secret_mkg_message"
430- confidential_mkg_ciphertext = await kms_client .cover_crypt_encryption (
431- "Department::MKG && Security Level::Confidential" ,
432- confidential_mkg_data ,
433- public_key_uid ,
434- )
435-
436- # Decrypting the messages with the rekeyed key
437-
438- # decrypting the "old" `protected marketing` message
439- old_protected_mkg_plaintext , _ = await kms_client .cover_crypt_decryption (
440- protected_mkg_ciphertext , confidential_mkg_user_uid
441- )
442- assert old_protected_mkg_plaintext == protected_mkg_data
443-
444- # decrypting the "new" `confidential marketing` message
445- new_confidential_mkg_plaintext , _ = await kms_client .cover_crypt_decryption (
446- confidential_mkg_ciphertext , confidential_mkg_user_uid
447- )
448- assert new_confidential_mkg_plaintext == confidential_mkg_data
449-
450310
451311if __name__ == "__main__" :
452- parser = argparse .ArgumentParser (description = "CoverCrypt example." )
453- parser .add_argument (
454- "--kms" , action = "store_true" , help = "Use a local KMS to store CoverCrypt keys"
455- )
456-
457- args = parser .parse_args ()
458-
459312 loop = asyncio .new_event_loop ()
460- loop .run_until_complete (main (bool ( args . kms ) ))
313+ loop .run_until_complete (main ())
461314 loop .close ()
0 commit comments