Source code for pycape.cape

"""The Cape Python client.

The :class:`Cape` client uses websockets to connect to Cape enclaves that are hosting a
user's deployed functions. Before being able to run functions from the Cape client,
users must have gone through the process of developing a Cape function in Python,
deploying it from the CLI, and generating a personal access token.

All public async methods in the :class:`Cape` client interface can be used in either
synchronous or asynchronous contexts via asyncio.

**Usage**

::

    cape = Cape()

    f = cape.function("user/pythagorean")  # refer to function by name
    t = cape.token("user.token")           # load function owner's PAT
    cape.connect(f, t)

    c1 = cape.invoke(3, 4, use_serdio=True)
    print(c1)  # 5

    c2 = cape.invoke(5, 12, use_serdio=True)
    print(c2)  # 13

    cape.close()  # release the enclave connection

    # similar invocation, but async
    c3 = asyncio.run(
        cape.run(f, t, 8, 15, use_serdio=True)
    )
    print(c3)  # 17

"""
import base64
import json
import logging
import os
import pathlib
import random
import ssl
import urllib
from typing import Any
from typing import Dict
from typing import List
from typing import Optional
from typing import Union

import requests
import synchronicity
import websockets

import serdio
from pycape import _attestation as attest
from pycape import _config as cape_config
from pycape import _enclave_encrypt as enclave_encrypt
from pycape import cape_encrypt
from pycape import function_ref as fref
from pycape import token as tkn

logging.basicConfig(format="%(message)s")
_logger = logging.getLogger("pycape")
_synchronizer = synchronicity.Synchronizer(multiwrap_warning=True)


[docs]class Cape: """A websocket client for interacting with enclaves hosting Cape functions. This is the main interface for interacting with Cape functions from Python. See module-level documentation :mod:`pycape.cape` for usage example. Args: url: The Cape platform's websocket URL, which is responsible for forwarding client requests to the proper enclave instances. If None, tries to load value from the ``CAPE_ENCLAVE_HOST`` environment variable. If no such variable value is supplied, defaults to ``"https://app.capeprivacy.com"``. verbose: Boolean controlling verbose logging for the ``"pycape"`` logger. If True, sets log-level to ``DEBUG``. """ def __init__( self, url: Optional[str] = None, verbose: bool = False, ): self._url = url or cape_config.ENCLAVE_HOST self._root_cert = None self._ctx = None if verbose: _logger.setLevel(logging.DEBUG)
[docs] @_synchronizer async def close(self): """Closes the current enclave connection.""" if self._ctx is not None: await self._ctx.close() self._ctx = None
[docs] @_synchronizer async def connect( self, function_ref: Union[str, os.PathLike, fref.FunctionRef], token: Union[str, os.PathLike, tkn.Token], pcrs: Optional[Dict[str, List[str]]] = None, ): """Connects to the enclave hosting the function denoted by ``function_ref``. Note that this method creates a stateful websocket connection, which is a necessary precondition for callers of :meth:`~Cape.invoke`. When using the default Cape host, the enclave will terminate this websocket connection after 60s of inactivity. Care should be taken to close the websocket connection with :meth:`~Cape.close` once all invocations have finished. Args: function_ref: Reference to a Cape deployed function. Must be convertible to a :class:`~function_ref.FunctionRef`. See :meth:`Cape.function` for a description of recognized values. token: Personal Access Token scoped for the given Cape function. Must be convertible to :class:`~token.Token`, see :meth:`Cape.token` for a description of recognized values. pcrs: An optional dictionary of PCR indexes to a list of expected or allowed PCRs. Raises: RuntimeError: if the websocket response or the enclave attestation doc is malformed, or if the enclave fails to return a function checksum matching our own. Exception: if the enclave threw an error while trying to fulfill the connection request. """ function_ref = self.function(function_ref) token = self.token(token) await self._request_connection(function_ref, token, pcrs)
[docs] @_synchronizer async def encrypt( self, input: bytes, *, username: Optional[str] = None, key: Optional[bytes] = None, key_path: Optional[Union[str, os.PathLike]] = None, ) -> bytes: """Encrypts inputs to Cape functions in Cape's encryption format. The encrypted value can be used as input to Cape handlers by other callers of :meth:`~Cape.invoke` or :meth:`~Cape.run` without giving them plaintext access to it. The core encryption functionality uses envelope encryption; the value is AES-encrypted with an ephemeral AES key, which is itself encrypted with the Cape user's assigned RSA public key. The corresponding RSA private key is only accessible from within a Cape enclave, which guarantees secrecy of the encrypted value. See the Cape encrypt docs for further detail. Args: input: Input bytes to encrypt. username: A Github username corresponding to a Cape user who's public key you want to use for the encryption. See :meth:`Cape.key` for details. key: Optional bytes for the Cape key. If None, will delegate to calling :meth:`Cape.key` w/ the given ``key_path`` to retrieve the user's Cape key. key_path: Optional path to a locally-cached Cape key. See :meth:`Cape.key` for details. Returns: Tagged ciphertext representing a base64-encoded Cape encryption of the ``input``. Raises: ValueError: if Cape key is not a properly-formatted RSA public key. RuntimeError: if the enclave attestation doc does not contain a Cape key, if the websocket response or the attestation doc is malformed. Exception: if the enclave threw an error while trying to fulfill the connection request. """ cape_key = key or await self.key(username=username, key_path=key_path) ctxt = cape_encrypt.encrypt(input, cape_key) # cape-encrypted ctxt must be b64-encoded and tagged ctxt = base64.b64encode(ctxt) return b"cape:" + ctxt
[docs] def function( self, identifier: Union[str, os.PathLike, fref.FunctionRef], *, checksum: Optional[str] = None, ) -> fref.FunctionRef: """Convenience function for creating a :class:`~.function_ref.FunctionRef`. The ``identifier`` parameter is interepreted according to the following priority: - Filepath to a :class:`~.function_ref.FunctionRef` JSON. See :meth:`~.function_ref.FunctionRef.from_json` for expected JSON structure. - String representing a function ID - String of the form "{username}/{fn_name}" representing a function name. - A :class:`~function_ref.FunctionRef`. If its checksum is missing and a ``checksum`` argument is given, it will be added to the returned value. Args: identifier: A string identifier that can be converted into a :class:`~.function_ref.FunctionRef`. See above for options. checksum: keyword-only argument for the function checksum. Ignored if ``identifier`` points to a JSON. """ if isinstance(identifier, pathlib.Path): return fref.FunctionRef.from_json(identifier) if isinstance(identifier, str): identifier_as_path = pathlib.Path(identifier) if identifier_as_path.exists(): return fref.FunctionRef.from_json(identifier_as_path) # not a path, try to interpret as function name if len(identifier.split("/")) == 2: return fref.FunctionRef(id=None, name=identifier, checksum=checksum) # not a function name, try to interpret as function id elif len(identifier) == 22: return fref.FunctionRef(id=identifier, name=None, checksum=checksum) if isinstance(identifier, fref.FunctionRef): if checksum is None: return identifier elif identifier.checksum is None: return fref.FunctionRef( id=identifier.id, name=identifier.full_name, checksum=checksum ) else: if checksum == identifier.checksum: return identifier raise ValueError( "Checksum mismatch: given `checksum` argument conflicts with " "given FunctionRef's checksum." ) raise ValueError("Unrecognized form of `identifier` argument: {identifier}.")
[docs] @_synchronizer @_synchronizer.asynccontextmanager async def function_context( self, function_ref: Union[str, os.PathLike, fref.FunctionRef], token: Union[str, os.PathLike, tkn.Token], pcrs: Optional[Dict[str, List[str]]] = None, ): """Creates a context manager for a given ``function_ref``'s enclave connection. Note that this context manager accomplishes the same functionality as :meth:`~Cape.connect`, except that it will also automatically :meth:`~Cape.close` the connection when exiting the context. **Usage** :: cape = Cape(url="https://app.capeprivacy.com") f = cape.function("function.json) t = cape.token("pycape-dev.token") with cape.function_context(f, t): c1 = cape.invoke(3, 4, use_serdio=True) print(c1) # 5 c2 = cape.invoke(5, 12, use_serdio=True) print(c2) # 13 # websocket connection is automatically closed Args: function_ref: A function ID or :class:`~.function_ref.FunctionRef` representing a deployed Cape function. Raises: RuntimeError: if the websocket response or the enclave attestation doc is malformed, or if the enclave fails to return a function checksum matching our own. Exception: if the enclave threw an error while trying to fulfill the connection request. """ try: yield await self.connect(function_ref, token, pcrs) finally: await self.close()
[docs] @_synchronizer async def invoke( self, *args: Any, serde_hooks=None, use_serdio: bool = False, **kwargs: Any ) -> Any: """Invokes a function call from the currently connected websocket. This method assumes that the client is currently maintaining an open websocket connection to an enclave hosting a particular Cape function. Care should be taken to ensure that the function_red that spawned the connection is the correct one. The connection should be closed with :meth:`~Cape.close` once the caller is finished with its invocations. Args: *args: Arguments to pass to the connected Cape function. If ``use_serdio=False``, we expect a single argument of type ``bytes``. Otherwise, these arguments should match the positional arguments of the undecorated Cape handler, and they will be auto-serialized by Serdio before being sent in the request. serde_hooks: An optional pair of serdio encoder/decoder hooks convertible to :class:`serdio.SerdeHookBundle`. The hooks are necessary if the ``args`` / ``kwargs`` have any user-defined types that can't be handled by vanilla Serdio. See :func:`serdio.bundle_serde_hooks` for supported types. use_serdio: Boolean controlling whether or not the inputs should be auto-serialized by serdio. kwargs: Keyword arguments to be passed to the connected Cape function. These are treated the same way as the ``args`` are. Returns: If ``use_serdio=True``, returns the auto-deserialized result of calling the connected Cape function on the given ``args`` / ``kwargs``. If ``use_serdio=False``, returns the output of the Cape function as raw bytes. Raises: RuntimeError: if serialized inputs could not be HPKE-encrypted, or if websocket response is malformed. """ if serde_hooks is not None: serde_hooks = serdio.bundle_serde_hooks(serde_hooks) return await self._request_invocation(serde_hooks, use_serdio, *args, **kwargs)
[docs] @_synchronizer async def key( self, *, username: Optional[str] = None, key_path: Optional[Union[str, os.PathLike]] = None, pcrs: Optional[Dict[str, List[str]]] = None, ) -> bytes: """Load a Cape key from disk or download and persist an enclave-generated one. If no username or key_path is provided, will try to load the currently logged-in CLI user's key from a local cache. Args: username: An optional string representing the Github username of a Cape user. The resulting public key will be associated with their account, and data encrypted with this key will be available inside functions that user has deployed. key_path: The path to the Cape key file. If the file already exists, the key will be read from disk and returned. Otherwise, a Cape key will be requested from the Cape platform and written to this location. If None, the default path is ``"$HOME/.config/cape/capekey.pub.der"``, or alternatively whatever path is specified by expanding the env variables ``CAPE_LOCAL_CONFIG_DIR / CAPE_LOCAL_CAPE_KEY_FILENAME``. pcrs: A dictionary of PCR indexes to a list of potential values. Returns: Bytes containing the Cape key. The key is also cached on disk for later use. Raises: RuntimeError: if the enclave attestation doc does not contain a Cape key, if the websocket response or the attestation doc is malformed. Exception: if the enclave threw an error while trying to fulfill the connection request. """ if username is not None and key_path is not None: raise ValueError("User provided both 'username' and 'key_path' arguments.") if key_path is not None: key_path = pathlib.Path(key_path) else: config_dir = pathlib.Path(cape_config.LOCAL_CONFIG_DIR) if username is not None: # look for locally-cached user key key_qualifier = config_dir / "encryption_keys" / username else: # try to load the current CLI user's capekey key_qualifier = config_dir key_path = key_qualifier / cape_config.LOCAL_CAPE_KEY_FILENAME if key_path.exists(): with open(key_path, "rb") as f: cape_key = f.read() return cape_key if username is not None: cape_key = await self._request_key_with_username(username, pcrs=pcrs) await _persist_cape_key(cape_key, key_path) return cape_key raise ValueError( "Cannot find a Cape key in the local cache. Either specify a username or " "log into the Cape CLI and run `cape key` to locally cache your own " "account's Cape key." )
[docs] @_synchronizer async def run( self, function_ref: Union[str, os.PathLike, fref.FunctionRef], token: Union[str, os.PathLike, tkn.Token], *args: Any, pcrs: Optional[Dict[str, List[str]]] = None, serde_hooks=None, use_serdio: bool = False, **kwargs: Any, ) -> Any: """Single-shot version of connect + invoke + close. This method takes care of establishing a websocket connection via :meth:`~Cape.connect`, invoking it via :meth:`~Cape.invoke`, and then finally closing the connection with :meth:`~Cape.close`. This method should be preferred when the caller doesn't need to invoke a Cape function more than once. Args: function_ref: A value convertible to a :class:`~.function_ref.FunctionRef`, representing a deployed Cape function. See :meth:`Cape.function` for recognized values. *args: Arguments to pass to the connected Cape function. If ``use_serdio=False``, we expect a single argument of type ``bytes``. Otherwise, these arguments should match the positional arguments of the undecorated Cape handler, and they will be auto-serialized by Serdio before being sent in the request. serde_hooks: An optional pair of serdio encoder/decoder hooks convertible to :class:`serdio.SerdeHookBundle`. The hooks are necessary if the ``args`` / ``kwargs`` have any user-defined types that can't be handled by vanilla Serdio. See :func:`serdio.bundle_serde_hooks` for supported types. use_serdio: Boolean controlling whether or not the inputs should be auto-serialized by serdio. kwargs: Keyword arguments to be passed to the connected Cape function. These are treated the same way as the ``args`` are. Returns: If ``use_serdio=True``, returns the auto-deserialized result of calling the connected Cape function on the given ``args`` / ``kwargs``. If ``use_serdio=False``, returns the output of the Cape function as raw bytes. Raises: RuntimeError: if serialized inputs could not be HPKE-encrypted, or if websocket response is malformed. """ if serde_hooks is not None: serde_hooks = serdio.bundle_serde_hooks(serde_hooks) async with self.function_context(function_ref, token, pcrs): result = await self.invoke( *args, serde_hooks=serde_hooks, use_serdio=use_serdio, **kwargs ) return result
[docs] def token(self, token: Union[str, os.PathLike, tkn.Token]) -> tkn.Token: """Create or load a :class:`~token.Token`. Args: token: Filepath to a token file, or the raw token string itself. Returns: A :class:`~token.Token` that can be used to access users' deployed Cape functions. Raises: TypeError: if the ``token`` argument type is unrecognized. """ token_out = None if isinstance(token, pathlib.Path): tokenfile = token return tkn.Token.from_disk(tokenfile) if isinstance(token, str): # str could be a filename if len(token) <= 255: token_as_path = pathlib.Path(token) token_out = _try_load_token_file(token_as_path) return token_out or tkn.Token(token) if isinstance(token, tkn.Token): return token raise TypeError(f"Expected token to be PathLike or str, found {type(token)}")
async def _request_connection(self, function_ref, token, pcrs=None): if function_ref.id is not None: fn_endpoint = f"{self._url}/v1/run/{function_ref.id}" elif function_ref.full_name is not None: fn_endpoint = f"{self._url}/v1/run/{function_ref.user}/{function_ref.name}" self._root_cert = self._root_cert or attest.download_root_cert() self._ctx = _EnclaveContext( endpoint=fn_endpoint, auth_protocol="cape.runtime", auth_token=token.raw, root_cert=self._root_cert, ) attestation_doc = await self._ctx.bootstrap(pcrs) user_data = attestation_doc.get("user_data") checksum = function_ref.checksum if checksum is not None and user_data is None: # Close the connection explicitly before throwing exception await self._ctx.close() raise RuntimeError( f"No function checksum received from enclave, expected{checksum}." ) user_data_dict = json.loads(user_data) received_checksum = user_data_dict.get("func_checksum") if checksum is not None: # Checksum is hex encoded, we manipulate it to string for comparison received_checksum = str(base64.b64decode(received_checksum).hex()) if str(checksum) != str(received_checksum): # Close the connection explicitly before throwing exception await self._ctx.close() raise RuntimeError( "Returned checksum did not match provided, " f"got: {received_checksum}, want: {checksum}." ) return async def _request_invocation(self, serde_hooks, use_serdio, *args, **kwargs): # If multiple args and/or kwargs are supplied to the Cape function through # Cape.run or Cape.invoke, before serialization, we pack them # into a dictionary with the following keys: # {"cape_fn_args": <tuple_args>, "cape_fn_kwargs": <dict_kwargs>}. single_input = _maybe_get_single_input(args, kwargs) if single_input is not None: inputs = single_input elif single_input is None and not use_serdio: raise ValueError( "Expected a single input of type 'bytes' when use_serdio=False.\n" "Found:" f"\t- args: {args}" f"\t- kwargs: {kwargs}" ) if serde_hooks is not None: encoder_hook, decoder_hook = serde_hooks.unbundle() use_serdio = True else: encoder_hook, decoder_hook = None, None if use_serdio: inputs = serdio.serialize(*args, encoder=encoder_hook, **kwargs) if not isinstance(inputs, bytes): raise TypeError( f"The input type is: {type(inputs)}. Provide input as bytes or " "set use_serdio=True for PyCape to serialize your input " "with Serdio." ) result = await self._ctx.invoke(inputs) if use_serdio: result = serdio.deserialize(result, decoder=decoder_hook) return result async def _request_key_with_username( self, username: str, pcrs: Optional[Dict[str, List[str]]] = None, ) -> bytes: user_key_endpoint = f"{self._url}/v1/user/{username}/key" response = requests.get(user_key_endpoint).json() adoc_blob = response.get("attestation_document", None) if adoc_blob is None: raise RuntimeError( f"Bad response from '/v1/user/{username}/key' route, expected " f"attestation_document key-value: {response}." ) self._root_cert = self._root_cert or attest.download_root_cert() doc_bytes = base64.b64decode(adoc_blob) attestation_doc = attest.load_attestation_document(doc_bytes) not_before = attest.get_certificate_not_before(attestation_doc["certificate"]) attestation_doc = attest.parse_attestation( doc_bytes, self._root_cert, checkDate=not_before ) if pcrs is not None: attest.verify_pcrs(pcrs, attestation_doc) user_data = attestation_doc.get("user_data") user_data_dict = json.loads(user_data) cape_key = user_data_dict.get("key") if cape_key is None: raise RuntimeError( "Enclave response did not include a Cape key in attestation user data." ) return base64.b64decode(cape_key) async def _request_key_with_token( self, token: str, pcrs: Optional[Dict[str, List[str]]] = None, ) -> bytes: key_endpoint = f"{self._url}/v1/key" self._root_cert = self._root_cert or attest.download_root_cert() key_ctx = _EnclaveContext( key_endpoint, auth_protocol="cape.function", auth_token=token, root_cert=self._root_cert, ) attestation_doc = await key_ctx.bootstrap(pcrs) await key_ctx.close() # we have the attestation doc, no longer any need for ctx user_data = attestation_doc.get("user_data") user_data_dict = json.loads(user_data) cape_key = user_data_dict.get("key") if cape_key is None: raise RuntimeError( "Enclave response did not include a Cape key in attestation user data." ) return base64.b64decode(cape_key)
class _EnclaveContext: """A context managing a connection to a particular enclave instance.""" def __init__(self, endpoint, auth_protocol, auth_token, root_cert): self._endpoint = _transform_url(endpoint) self._auth_token = auth_token self._auth_protocol = auth_protocol self._root_cert = root_cert ssl_ctx = ssl.create_default_context() if cape_config.DEV_DISABLE_SSL: ssl_ctx.check_hostname = False ssl_ctx.verify_mode = ssl.CERT_NONE self._ssl_ctx = ssl_ctx # state to be explicitly created/destroyed by callers via bootstrap/close self._websocket = None self._public_key = None async def authenticate(self, nonce): request = _create_connection_request(nonce) _logger.debug("\n> Sending authentication request...") await self._websocket.send(request) _logger.debug("* Waiting for attestation document...") msg = await self._websocket.recv() _logger.debug("< Auth completed. Received attestation document.") return _parse_wss_response(msg) async def bootstrap(self, pcrs: Optional[Dict[str, List[str]]] = None): _logger.debug(f"* Dialing {self._endpoint}") self._websocket = await websockets.connect( self._endpoint, ssl=self._ssl_ctx, subprotocols=[self._auth_protocol, self._auth_token], max_size=None, ) _logger.debug("* Websocket connection established") nonce = _generate_nonce() auth_response = await self.authenticate(nonce) attestation_doc = attest.parse_attestation( auth_response, self._root_cert, nonce=nonce ) self._public_key = attestation_doc["public_key"] if pcrs is not None: attest.verify_pcrs(pcrs, attestation_doc) return attestation_doc async def close(self): if self._websocket is not None: await self._websocket.close() self._websocket = None self._public_key = None async def invoke(self, inputs: bytes) -> bytes: input_ciphertext = enclave_encrypt.encrypt(self._public_key, inputs) _logger.debug("> Sending encrypted inputs") try: await self._websocket.send(input_ciphertext) except websockets.exceptions.ConnectionClosedOK: await self.close() raise RuntimeError( "Enclave websocket connection was closed, likely due to timeout error. " "Please invoke your function more frequently to keep the connection " "alive for more than 60 seconds." ) invoke_response = await self._websocket.recv() _logger.debug("< Received function results") return _parse_wss_response(invoke_response) def _generate_nonce(length=16): """ Generates a string of digits between 0 and 9 of a given length """ nonce = "".join([str(random.randint(0, 9)) for i in range(length)]) _logger.debug(f"* Generated nonce: {nonce}") return nonce.encode() def _create_connection_request(nonce): """ Returns a json string with nonce """ request = {"message": {"nonce": base64.b64encode(nonce).decode()}} return json.dumps(request) def _parse_wss_response(response): """ Returns the inner message field received in a WebSocket message from enclave """ response = json.loads(response) if "error" in response: raise Exception(response["error"]) response_msg = _handle_expected_field( response, "message", fallback_err="Missing 'message' field in websocket response.", ) inner_msg = _handle_expected_field( response_msg, "message", fallback_err=( "Malformed websocket response contents: missing inner 'message' field." ), ) return base64.b64decode(inner_msg) def _handle_expected_field(dictionary, field, *, fallback_err=None): """ Returns value of a provided key from dictionary, optionally raising a custom RuntimeError if it's missing. """ v = dictionary.get(field, None) if v is None: if fallback_err is not None: _logger.error(fallback_err) raise RuntimeError(fallback_err) raise RuntimeError(f"Dictionary {dictionary} missing key {field}.") return v def _maybe_get_single_input(args, kwargs): single_arg = len(args) == 1 and len(kwargs) == 0 single_kwarg = len(args) == 0 and len(kwargs) == 1 if single_arg: return args[0] elif single_kwarg: return kwargs.items()[0][1] async def _persist_cape_key(cape_key: str, key_path: pathlib.Path): key_path.parent.mkdir(parents=True, exist_ok=True) with open(key_path, "wb") as f: f.write(cape_key) def _transform_url(url): url = urllib.parse.urlparse(url) if url.scheme == "https": return url.geturl().replace("https://", "wss://") elif url.scheme == "http": return url.geturl().replace("http://", "ws://") return url.geturl() def _try_load_token_file(token_file: pathlib.Path): if token_file.exists(): with open(token_file, "r") as f: token_output = f.read() return token_output