Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 58 additions & 8 deletions sros2/sros2/_utilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,34 @@
_KEYSTORE_DIR_ENV = 'ROS_SECURITY_KEYSTORE'


def create_signed_cert(
keystore_ca_cert_path: pathlib.Path,
keystore_ca_key_path: pathlib.Path,
identity: str,
cert_path: pathlib.Path,
key_path: pathlib.Path,
**kwargs):
# Load the CA cert and key from disk
ca_cert = load_cert(keystore_ca_cert_path)

with open(keystore_ca_key_path, 'rb') as f:
ca_key = serialization.load_pem_private_key(f.read(), None, cryptography_backend())

ca_pub_key = ca_cert.public_key()
# Calculate the key ID from the issuer's public key
key_id = x509.SubjectKeyIdentifier.from_public_key(ca_pub_key).digest

cert, private_key = build_key_and_cert(
x509.Name([x509.NameAttribute(x509.oid.NameOID.COMMON_NAME, identity)]),
issuer_name=ca_cert.subject,
ca_key=ca_key,
key_id=key_id,
**kwargs)

write_key(private_key, key_path)
write_cert(cert, cert_path, chain_ca=[ca_cert]) # Store the full chain aswell


def create_symlink(*, src: pathlib.Path, dst: pathlib.Path):
if dst.exists():
# Don't do more work than we need to
Expand Down Expand Up @@ -66,7 +94,8 @@ def create_smime_signed_file(cert_path, key_path, unsigned_file_path, signed_fil
f.write(_sign_bytes(cert, private_key, content))


def build_key_and_cert(subject_name, *, ca=False, ca_key=None, issuer_name=''):
def build_key_and_cert(subject_name, *, ca=False, ca_key=None, issuer_name='',
key_id=None, path_length=None, duration_days=3650):
if not issuer_name:
issuer_name = subject_name

Expand All @@ -75,11 +104,11 @@ def build_key_and_cert(subject_name, *, ca=False, ca_key=None, issuer_name=''):
if not ca_key:
ca_key = private_key

if ca:
extension = x509.BasicConstraints(ca=True, path_length=1)
else:
extension = x509.BasicConstraints(ca=False, path_length=None)
# Verify path_length is >= 1 if creating a CA
if ca and not path_length:
path_length = 1

extension = x509.BasicConstraints(ca=ca, path_length=path_length)
utcnow = datetime.datetime.utcnow()
builder = x509.CertificateBuilder(
).issuer_name(
Expand All @@ -92,15 +121,30 @@ def build_key_and_cert(subject_name, *, ca=False, ca_key=None, issuer_name=''):
# https://github.com/ros2/ci/pull/436#issuecomment-624874296
utcnow - datetime.timedelta(days=1)
).not_valid_after(
# TODO: This should not be hard-coded
utcnow + datetime.timedelta(days=3650)
utcnow + datetime.timedelta(days=duration_days)
).public_key(
private_key.public_key()
).subject_name(
subject_name
).add_extension(
extension, critical=ca
)
# Add extension for when it's not a self signed certificate
if issuer_name != subject_name and ca:
builder = builder.add_extension(
x509.KeyUsage(
key_agreement=False,
digital_signature=True,
key_encipherment=False,
key_cert_sign=True,
crl_sign=False,
content_commitment=False,
data_encipherment=False,
encipher_only=False,
decipher_only=False
),
critical=True
)
cert = builder.sign(ca_key, hashes.SHA256(), cryptography_backend())

return (cert, private_key)
Expand All @@ -121,9 +165,15 @@ def write_key(
encryption_algorithm=encryption_algorithm))


def write_cert(cert, cert_path: pathlib.Path, *, encoding=serialization.Encoding.PEM):
def write_cert(cert, cert_path: pathlib.Path, *,
chain_ca=None,
encoding=serialization.Encoding.PEM):
with open(cert_path, 'wb') as f:
f.write(cert.public_bytes(encoding=encoding))
# Write the full chain with the certificate
if chain_ca:
for ca in chain_ca:
f.write(ca.public_bytes(encoding=encoding))


def load_cert(cert_path: pathlib.Path):
Expand Down
9 changes: 7 additions & 2 deletions sros2/sros2/keystore/_enclave.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ def create_enclave(keystore_path: pathlib.Path, identity: str) -> None:
keystore_identity_ca_key_path = _keystore.get_keystore_private_dir(
keystore_path).joinpath('identity_ca.key.pem')

# The root CA that signed the identity_ca.cert.pem
root_ca = _keystore.get_keystore_public_dir(
keystore_path).joinpath('ca.cert.pem')
# Only create certs/keys if they don't already exist
cert_path = key_dir.joinpath('cert.pem')
key_path = key_dir.joinpath('key.pem')
Expand All @@ -75,7 +78,8 @@ def create_enclave(keystore_path: pathlib.Path, identity: str) -> None:
keystore_identity_ca_key_path,
identity,
cert_path,
key_path
key_path,
root_ca=root_ca
)

# create a wildcard permissions file for this node which can be overridden
Expand Down Expand Up @@ -132,7 +136,8 @@ def _create_key_and_cert(
keystore_ca_key_path: pathlib.Path,
identity: str,
cert_path: pathlib.Path,
key_path: pathlib.Path):
key_path: pathlib.Path,
root_ca: pathlib.Path):
# Load the CA cert and key from disk
ca_cert = _utilities.load_cert(keystore_ca_cert_path)

Expand Down
42 changes: 32 additions & 10 deletions sros2/sros2/keystore/_keystore.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
_DEFAULT_COMMON_NAME = 'sros2CA'


def create_keystore(keystore_path: pathlib.Path) -> None:
def create_keystore(keystore_path: pathlib.Path, split_CA=False) -> None:
if is_valid_keystore(keystore_path):
raise sros2.errors.KeystoreExistsError(keystore_path)

Expand Down Expand Up @@ -63,13 +63,34 @@ def create_keystore(keystore_path: pathlib.Path) -> None:

# Create new CA if one doesn't already exist
if not all(x.is_file() for x in required_files):
_create_ca_key_cert(keystore_ca_key_path, keystore_ca_cert_path)

for path in (keystore_permissions_ca_cert_path, keystore_identity_ca_cert_path):
_utilities.create_symlink(src=pathlib.Path('ca.cert.pem'), dst=path)

for path in (keystore_permissions_ca_key_path, keystore_identity_ca_key_path):
_utilities.create_symlink(src=pathlib.Path('ca.key.pem'), dst=path)
if split_CA:
_create_ca_key_cert(keystore_ca_key_path, keystore_ca_cert_path, path_length=2)
# Create independent Permissions and Identity CA
_utilities.create_signed_cert(keystore_ca_cert_path,
keystore_ca_key_path,
'IdentityCA',
keystore_identity_ca_cert_path,
keystore_identity_ca_key_path,
ca=True,
path_length=1,
duration_days=5)
_utilities.create_signed_cert(keystore_ca_cert_path,
keystore_ca_key_path,
'PermissionsCA',
keystore_permissions_ca_cert_path,
keystore_permissions_ca_key_path,
ca=True,
path_length=2,
duration_days=5)
else:
_create_ca_key_cert(keystore_ca_key_path, keystore_ca_cert_path)
# Use the root CA as Permissions and Identity CA
for path in (keystore_permissions_ca_cert_path, keystore_identity_ca_cert_path):
_utilities.create_symlink(src=pathlib.Path('ca.cert.pem'), dst=path)

for path in (keystore_permissions_ca_key_path, keystore_identity_ca_key_path):
_utilities.create_symlink(src=pathlib.Path('ca.key.pem'), dst=path)

# Create governance file if it doesn't already exist
gov_path = keystore_path.joinpath(_KS_ENCLAVES, 'governance.xml')
Expand Down Expand Up @@ -108,10 +129,11 @@ def get_keystore_private_dir(keystore_path: pathlib.Path) -> pathlib.Path:
return keystore_path.joinpath(_KS_PRIVATE)


def _create_ca_key_cert(ca_key_out_path, ca_cert_out_path):
def _create_ca_key_cert(ca_key_out_path, ca_cert_out_path,
name=_DEFAULT_COMMON_NAME, path_length=1):
cert, private_key = _utilities.build_key_and_cert(
x509.Name([x509.NameAttribute(x509.oid.NameOID.COMMON_NAME, _DEFAULT_COMMON_NAME)]),
ca=True)
x509.Name([x509.NameAttribute(x509.oid.NameOID.COMMON_NAME, name)]),
ca=True, path_length=path_length)

_utilities.write_key(private_key, ca_key_out_path)
_utilities.write_cert(cert, ca_cert_out_path)
Expand Down
5 changes: 4 additions & 1 deletion sros2/sros2/verb/create_keystore.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,14 @@ class CreateKeystoreVerb(VerbExtension):

def add_arguments(self, parser, cli_name) -> None:
arg = parser.add_argument('ROOT', type=pathlib.Path, help='root path of keystore')
arg = parser.add_argument('--split-CA', action='store_true', default=False,
help='splits the Certificate Authority structure to \
use multiple CAs instead of a single self-signed root CA')
arg.completer = DirectoriesCompleter()

def main(self, *, args) -> int:
try:
sros2.keystore.create_keystore(args.ROOT)
sros2.keystore.create_keystore(args.ROOT, args.split_CA)
except sros2.errors.SROS2Error as e:
print(f'Unable to create keystore: {str(e)}', file=sys.stderr)
return 1
Expand Down