Source code for cape_encrypt.cape_encrypt

"""
Cape Encrypt functionality for use within functions deployed to run in a Cape Enclave.

This provides the ability to be able to encrypt or decrypt information solely within a
Cape Enclave using the Cape Key associated with the function owner. Functions can then
encrypt data generated during the execution of the function or have fine grained
control over decrypting inputs.
"""
import base64
import json
import socket


[docs]class ExecutionError(Exception): """Server reports an error."""
[docs]class ConnectionError(Exception): """An issue arose in the communication with the server."""
def _call(req: str) -> bytes: """RPC through unix domain socket to Cape Enclave to request operations with the Cape Key. """ buffer_size = 10485760 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) sock.connect("../rpc.sock") sock.sendall(req) response = sock.recv(buffer_size) if response == "": raise ConnectionError("unexpected connection error, invalid response") return response
[docs]def encrypt(plaintext: bytes) -> bytes: """Encrypt a plaintext with a Cape Key within a Cape Enclave. This function is intended only for use within a function deployed in a Cape Enclave. It uses envelope encryption. The plaintext is first AES-encrypted with anephemeral AES key, and then this key is itself encrypted with the Cape Key associated with the Cape account that owns the function. Args: plaintext: bytes to encrypt. Returns: Bytes representing the base64 encoded encryption of the ``plaintext``. The bytes are a concatenation of the AES-ciphertext of the ``plaintext``, an AES nonce, and the RSA-ciphertext of the AES key prefixed by ``b"cape:"`` Raises: TypeError: if the input is not of the correct type ValueError: if the input is empty ConnectionError: if an error is thrown from the socket connection ExecutionError: if a server error is reported during the remote encryption process """ if not isinstance(plaintext, (bytes, bytearray)): raise TypeError("input is required to be valid bytes") if plaintext == b"": raise ValueError("input is empty") b64plaintext = base64.standard_b64encode(plaintext).decode("utf-8") response = _call( json.dumps( {"id": 1, "method": "CapeEncryptRPC.Encrypt", "params": [b64plaintext]} ).encode() ) payload = json.loads(response) if payload["error"] is not None: raise ExecutionError(payload["error"]) return bytes(payload["result"], "utf-8")
[docs]def decrypt(ciphertext: bytes) -> bytes: """Decrypt a plaintext with a Cape Key within a Cape Enclave. This function is intended only for use within a function deployed in a Cape Enclave. This function utilizes the Cape Key associated to the function's owner to decrypt previously Cape Encrypted input. Args: b64ciphertext: Base64 encoded bytes of a previously Cape Encrypted plaintext, prefixed with ``b"cape:"`` Returns: Bytes represeting the plaintext result of the decrypted ciphertext Raises: TypeError: if the input is not of the correct type ValueError: if the input is formatted incorrectly or empty ConnectionError: if an error is thrown from the socket connection ExecutionError: if a server error is reported during the remote encryption process """ prefix = b"cape:" if not isinstance(ciphertext, (bytes, bytearray)): raise TypeError("input is required to be valid bytes") if not bytes.startswith(ciphertext, prefix): raise ValueError( "input must be a valid Cape encrypted value prefixed with 'cape:'" ) if len(bytes.removeprefix(ciphertext, prefix)) == 0: raise ValueError("input is empty") response = _call( json.dumps( { "id": 1, "method": "CapeEncryptRPC.Decrypt", "params": [ciphertext.decode("utf-8")], } ).encode() ) payload = json.loads(response) if payload["error"] is not None: raise ExecutionError(payload["error"]) return base64.standard_b64decode(payload["result"])