Skip to content
This repository was archived by the owner on May 9, 2020. It is now read-only.

Commit 8468559

Browse files
kamilbednarzfpedrini
authored andcommitted
Reworked AES implementation - use ctypes instead of m2crypto
1 parent 9863d29 commit 8468559

File tree

3 files changed

+160
-1
lines changed

3 files changed

+160
-1
lines changed

chef/aes.py

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
import os
2+
3+
from ctypes import *
4+
from rsa import load_crypto_lib, SSLError
5+
6+
_eay = load_crypto_lib()
7+
8+
c_int_p = POINTER(c_int)
9+
10+
# void EVP_CIPHER_CTX_init(EVP_CIPHER_CTX *a);
11+
EVP_CIPHER_CTX_init = _eay.EVP_CIPHER_CTX_init
12+
EVP_CIPHER_CTX_init.argtypes = [c_void_p]
13+
EVP_CIPHER_CTX_init.restype = None
14+
15+
#int EVP_CipherUpdate(EVP_CIPHER_CTX *ctx, unsigned char *out,
16+
# int *outl, unsigned char *in, int inl);
17+
EVP_CipherUpdate = _eay.EVP_CipherUpdate
18+
EVP_CipherUpdate.argtypes = [c_void_p, c_char_p, c_int_p, c_char_p, c_int]
19+
EVP_CipherUpdate.restype = c_int
20+
21+
#int EVP_CipherFinal(EVP_CIPHER_CTX *ctx, unsigned char *out,
22+
# int *outl);
23+
EVP_CipherFinal = _eay.EVP_CipherFinal
24+
EVP_CipherFinal.argtypes = [c_void_p, c_char_p, c_int_p]
25+
EVP_CipherFinal.restype = c_int
26+
27+
#EVP_CIPHER *EVP_aes_256_cbc(void);
28+
EVP_aes_256_cbc = _eay.EVP_aes_256_cbc
29+
EVP_aes_256_cbc.argtypes = []
30+
EVP_aes_256_cbc.restype = c_void_p
31+
32+
#EVP_MD *EVP_sha1(void);
33+
EVP_sha1 = _eay.EVP_sha1
34+
EVP_sha1.argtypes = []
35+
EVP_sha1.restype = c_void_p
36+
37+
#int EVP_CipherInit(EVP_CIPHER_CTX *ctx, const EVP_CIPHER *type,
38+
# unsigned char *key, unsigned char *iv, int enc);
39+
EVP_CipherInit = _eay.EVP_CipherInit
40+
EVP_CipherInit.argtypes = [c_void_p, c_void_p, c_char_p, c_char_p, c_int]
41+
EVP_CipherInit.restype = c_int
42+
43+
#int EVP_CIPHER_CTX_set_padding(EVP_CIPHER_CTX *x, int padding);
44+
EVP_CIPHER_CTX_set_padding = _eay.EVP_CIPHER_CTX_set_padding
45+
EVP_CIPHER_CTX_set_padding.argtypes = [c_void_p, c_int]
46+
EVP_CIPHER_CTX_set_padding.restype = c_int
47+
48+
# Structures required for ctypes
49+
50+
EVP_MAX_IV_LENGTH = 16
51+
EVP_MAX_BLOCK_LENGTH = 32
52+
AES_BLOCK_SIZE = 16
53+
54+
class EVP_CIPHER(Structure):
55+
_fields_ = [
56+
("nid", c_int),
57+
("block_size", c_int),
58+
("key_len", c_int),
59+
("iv_len", c_int),
60+
("flags", c_ulong),
61+
("init", c_voidp),
62+
("do_cipher", c_voidp),
63+
("cleanup", c_voidp),
64+
("set_asn1_parameters", c_voidp),
65+
("get_asn1_parameters", c_voidp),
66+
("ctrl", c_voidp),
67+
("app_data", c_voidp)
68+
]
69+
70+
class EVP_CIPHER_CTX(Structure):
71+
_fields_ = [
72+
("cipher", POINTER(EVP_CIPHER)),
73+
("engine", c_voidp),
74+
("encrypt", c_int),
75+
("buflen", c_int),
76+
("oiv", c_ubyte * EVP_MAX_IV_LENGTH),
77+
("iv", c_ubyte * EVP_MAX_IV_LENGTH),
78+
("buf", c_ubyte * EVP_MAX_BLOCK_LENGTH),
79+
("num", c_int),
80+
("app_data", c_voidp),
81+
("key_len", c_int),
82+
("flags", c_ulong),
83+
("cipher_data", c_voidp),
84+
("final_used", c_int),
85+
("block_mask", c_int),
86+
("final", c_ubyte * EVP_MAX_BLOCK_LENGTH) ]
87+
88+
89+
class AES256Cipher():
90+
def __init__(self, key, iv, salt='12345678'):
91+
self.key_data = create_string_buffer(key)
92+
self.iv = create_string_buffer(iv)
93+
self.encryptor = self.decryptor = None
94+
self.salt = create_string_buffer(salt)
95+
96+
self.encryptor = EVP_CIPHER_CTX()
97+
EVP_CIPHER_CTX_init(byref(self.encryptor))
98+
EVP_CipherInit(byref(self.encryptor), EVP_aes_256_cbc(), self.key_data, self.iv, c_int(1))
99+
EVP_CIPHER_CTX_set_padding(byref(self.encryptor), c_int(1))
100+
101+
self.decryptor = EVP_CIPHER_CTX()
102+
EVP_CIPHER_CTX_init(byref(self.decryptor))
103+
EVP_CipherInit(byref(self.decryptor), EVP_aes_256_cbc(), self.key_data, self.iv, c_int(0))
104+
EVP_CIPHER_CTX_set_padding(byref(self.decryptor), c_int(1))
105+
106+
def encrypt(self, data):
107+
length = c_int(len(data))
108+
buf_length = c_int(length.value + AES_BLOCK_SIZE)
109+
buf = create_string_buffer(buf_length.value)
110+
111+
final_buf = create_string_buffer(AES_BLOCK_SIZE)
112+
final_length = c_int(0)
113+
114+
EVP_CipherUpdate(byref(self.encryptor), buf, byref(buf_length), create_string_buffer(data), length)
115+
EVP_CipherFinal(byref(self.encryptor), final_buf, byref(final_length))
116+
117+
return string_at(buf, buf_length) + string_at(final_buf, final_length)
118+
119+
def decrypt(self, data):
120+
length = c_int(len(data))
121+
buf_length = c_int(length.value + AES_BLOCK_SIZE)
122+
buf = create_string_buffer(buf_length.value)
123+
124+
final_buf = create_string_buffer(AES_BLOCK_SIZE)
125+
final_length = c_int(0)
126+
127+
EVP_CipherUpdate(byref(self.decryptor), buf, byref(buf_length), create_string_buffer(data), length)
128+
EVP_CipherFinal(byref(self.decryptor), final_buf, byref(final_length))
129+
130+
return string_at(buf, buf_length) + string_at(final_buf, final_length)
131+
132+
133+
134+

chef/encrypted_data_bag_item.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,8 @@ def __init__(self, key, data):
5858
def encrypt(self):
5959
if self.encrypted_data is None:
6060
data = json.dumps({'json_wrapper': self.data})
61-
self.encrypted_data = self.encryptor.update(data) + self.encryptor.final()
61+
update_data = self.encryptor.update(data)
62+
self.encrypted_data = update_data + self.encryptor.final()
6263
del self.encryptor
6364
return self.encrypted_data
6465

chef/tests/test_aes.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
from chef.tests import ChefTestCase, TEST_ROOT
2+
from chef.aes import AES256Cipher
3+
from chef.rsa import SSLError
4+
5+
import base64
6+
import os
7+
import hashlib
8+
import json
9+
10+
class AES256CipherTestCase(ChefTestCase):
11+
def setUp(self):
12+
super(AES256CipherTestCase, self).setUp()
13+
key = hashlib.sha256(open(os.path.join(TEST_ROOT, 'encryption_key')).read()).digest()
14+
iv = base64.standard_b64decode('GLVikZLxG0SWYnb68Pr8Ag==\n')
15+
self.cipher = AES256Cipher(key, iv)
16+
17+
def test_encrypt(self):
18+
encrypted_value = self.cipher.encrypt('{"json_wrapper":"secr3t c0d3"}')
19+
self.assertEquals(base64.standard_b64encode(encrypted_value).strip(), "Ym5T8umtSd0wgjDYq1ZDK5dAh6OjgrTxlloGNf2xYhg=")
20+
21+
22+
def test_decrypt(self):
23+
decrypted_value = self.cipher.decrypt(base64.standard_b64decode('Ym5T8umtSd0wgjDYq1ZDK5dAh6OjgrTxlloGNf2xYhg=\n'))
24+
self.assertEquals(decrypted_value, '{"json_wrapper":"secr3t c0d3"}')

0 commit comments

Comments
 (0)