"""The Cape Python client.
The :class:`Cape` client uses websockets to connect to Cape enclaves that are hosting a
user's deployed functions. Before being able to run functions from the Cape client,
users must have gone through the process of developing a Cape function in Python and
deploying it from the CLI and generating a function token.
The majority of the :class:`Cape` client interface can be used in either synchronous or
asynchronous contexts via asyncio.
**Usage**
::
cape = Cape()
f = FunctionRef.from_json("function.json")
cape.connect(f)
c1 = cape.invoke(3, 4, use_serdio=True)
print(c1) # 5
c2 = cape.invoke(5, 12, use_serdio=True)
print(c2) # 13
cape.close() # release the enclave connection
# similar invocation, but async
c3 = asyncio.run(
cape.run("9712r5dynf57l1rcns2", 8, 15, use_serdio=True)
)
print(c3) # 17
"""
import base64
import json
import logging
import os
import pathlib
import random
import ssl
import urllib
from typing import Any
from typing import Dict
from typing import List
from typing import Optional
from typing import Union
import synchronicity
import websockets
import serdio
from pycape import _attestation as attest
from pycape import _config as cape_config
from pycape import _enclave_encrypt as enclave_encrypt
from pycape import cape_encrypt
from pycape import function_ref as fref
logging.basicConfig(format="%(message)s")
_logger = logging.getLogger("pycape")
_synchronizer = synchronicity.Synchronizer(multiwrap_warning=True)
[docs]class Cape:
"""A websocket client for interacting with enclaves hosting Cape functions.
This is the main interface for interacting with Cape functions from Python.
See module-level documentation :mod:`pycape.cape` for usage example.
Args:
url: The Cape platform's websocket URL, which is responsible for forwarding
client requests to the proper enclave instances. If None, tries to load
value from the ``CAPE_ENCLAVE_HOST`` environment variable. If no such
variable value is supplied, defaults to ``"https://app.capeprivacy.com"``.
verbose: Boolean controlling verbose logging for the ``"pycape"`` logger.
If True, sets log-level to ``DEBUG``.
"""
def __init__(
self,
url: Optional[str] = None,
verbose: bool = False,
):
self._url = url or cape_config.ENCLAVE_HOST
self._root_cert = None
self._ctx = None
if verbose:
_logger.setLevel(logging.DEBUG)
[docs] @_synchronizer
async def close(self):
"""Closes the current enclave connection."""
if self._ctx is not None:
await self._ctx.close()
self._ctx = None
[docs] @_synchronizer
async def connect(
self,
function_ref: fref.FunctionRef,
pcrs: Optional[Dict[str, List[str]]] = None,
):
"""Connects to the enclave hosting the function denoted by ``function_ref``.
Note that this method creates a stateful websocket connection, which is a
necessary precondition for callers of :meth:`~Cape.invoke`. When using the
default Cape host, the enclave will terminate this websocket connection after
60s of inactivity. Care should be taken to close the websocket connection with
:meth:`~Cape.close` once all invocations have finished.
Args:
function_ref: A function ID string or :class:`~.function_ref.FunctionRef`
representing a deployed Cape function.
pcrs: A dictionary of PCR indexes to a list of potential values.
Raises:
RuntimeError: if the websocket response or the enclave attestation doc is
malformed, or if the enclave fails to return a function checksum
matching our own.
Exception: if the enclave threw an error while trying to fulfill the
connection request.
"""
function_ref = _check_function_ref(function_ref)
await self._request_connection(function_ref, pcrs)
[docs] @_synchronizer
async def encrypt(
self,
input: bytes,
token: str,
key: Optional[bytes] = None,
key_path: Optional[Union[str, os.PathLike]] = None,
) -> bytes:
"""Encrypts inputs to Cape functions in Cape's encryption format.
The encrypted value can be used as input to Cape handlers by other callers of
:meth:`~Cape.invoke` or :meth:`~Cape.run` without giving them plaintext access
to it. The core encryption functionality uses envelope encryption; the value is
AES-encrypted with an ephemeral AES key, which is itself encrypted with the Cape
user's assigned RSA public key. The corresponding RSA private key is only
accessible from within a Cape enclave, which guarantees secrecy of the encrypted
value. See the Cape encrypt docs for further detail.
Args:
input: Input bytes to encrypt.
token: A Cape token scoped for retrieving a user's Cape public key.
See :meth:`~Cape.key` for details.
key: Optional bytes for the Cape key. If None, will delegate to calling
:meth:`Cape.key` w/ the given ``key_path`` to retrieve the user's Cape
key.
key_path: Optional path to a locally-cached Cape key. Used to call
:meth:`Cape.key` when an explicit ``key`` argument is not provided.
Returns:
Tagged ciphertext representing a base64-encoded Cape encryption of the
``input``.
Raises:
ValueError: if Cape key is not a properly-formatted RSA public key.
RuntimeError: if the enclave attestation doc does not contain a Cape key,
if the websocket response or the attestation doc is malformed.
Exception: if the enclave threw an error while trying to fulfill the
connection request.
"""
cape_key = key or await self.key(token, key_path)
ctxt = cape_encrypt.encrypt(input, cape_key)
# cape-encrypted ctxt must be b64-encoded and tagged
ctxt = base64.b64encode(ctxt)
return b"cape:" + ctxt
[docs] @_synchronizer
@_synchronizer.asynccontextmanager
async def function_context(
self,
function_ref: fref.FunctionRef,
pcrs: Optional[Dict[str, List[str]]] = None,
):
"""Creates a context manager for a given ``function_ref``'s enclave connection.
Note that this context manager accomplishes the same functionality as
:meth:`~Cape.connect`, except that it will also automatically
:meth:`~Cape.close` the connection when exiting the context.
**Usage** ::
cape = Cape(url="https://app.capeprivacy.com")
f = FunctionRef.from_json("function.json")
with cape.function_context(f):
c1 = cape.invoke(3, 4, use_serdio=True)
print(c1) # 5
c2 = cape.invoke(5, 12, use_serdio=True)
print(c2) # 13
# websocket connection is automatically closed
Args:
function_ref: A function ID or :class:`~.function_ref.FunctionRef`
representing a deployed Cape function.
Raises:
RuntimeError: if the websocket response or the enclave attestation doc is
malformed, or if the enclave fails to return a function checksum
matching our own.
Exception: if the enclave threw an error while trying to fulfill the
connection request.
"""
try:
yield await self.connect(function_ref, pcrs)
finally:
await self.close()
[docs] @_synchronizer
async def invoke(
self, *args: Any, serde_hooks=None, use_serdio: bool = False, **kwargs: Any
) -> Any:
"""Invokes a function call from the currently connected websocket.
This method assumes that the client is currently maintaining an open websocket
connection to an enclave hosting a particular Cape function. Care should be
taken to ensure that the function_red that spawned the connection is the
correct one. The connection should be closed with :meth:`~Cape.close` once the
caller is finished with its invocations.
Args:
*args: Arguments to pass to the connected Cape function. If
``use_serdio=False``, we expect a single argument of type ``bytes``.
Otherwise, these arguments should match the positional arguments
of the undecorated Cape handler, and they will be auto-serialized by
Serdio before being sent in the request.
serde_hooks: An optional pair of serdio encoder/decoder hooks convertible
to :class:`serdio.SerdeHookBundle`. The hooks are necessary if the
``args`` / ``kwargs`` have any user-defined types that can't be handled
by vanilla Serdio. See :func:`serdio.bundle_serde_hooks` for supported
types.
use_serdio: Boolean controlling whether or not the inputs should be
auto-serialized by serdio.
kwargs: Keyword arguments to be passed to the connected Cape function.
These are treated the same way as the ``args`` are.
Returns:
If ``use_serdio=True``, returns the auto-deserialized result of calling the
connected Cape function on the given ``args`` / ``kwargs``.
If ``use_serdio=False``, returns the output of the Cape function as raw
bytes.
Raises:
RuntimeError: if serialized inputs could not be HPKE-encrypted, or if
websocket response is malformed.
"""
if serde_hooks is not None:
serde_hooks = serdio.bundle_serde_hooks(serde_hooks)
return await self._request_invocation(serde_hooks, use_serdio, *args, **kwargs)
[docs] @_synchronizer
async def key(
self,
token: str,
key_path: Optional[Union[str, os.PathLike]] = None,
pcrs: Optional[Dict[str, List[str]]] = None,
) -> bytes:
"""Load a Cape key from disk or download and persist an enclave-generated one.
Args:
token: A string representing a Cape authentication token. Usually retrieved
from a :class:`~.function_ref.FunctionRef` via :attr:`FunctionRef.token`
key_path: The path to the Cape key file. If the file already exists, the key
will be read from disk and returned. Otherwise, a Cape key will be
requested from the Cape platform and written to this location.
If None, the default path is ``"$HOME/.config/cape/capekey.pub.der"``,
or alternatively whatever path is specified by expanding the env
variables ``CAPE_LOCAL_CONFIG_DIR / CAPE_LOCAL_CAPE_KEY_FILENAME``.
pcrs: A dictionary of PCR indexes to a list of potential values.
Returns:
Bytes containing the Cape key. The key is also cached on disk for later
use.
Raises:
RuntimeError: if the enclave attestation doc does not contain a Cape key,
if the websocket response or the attestation doc is malformed.
Exception: if the enclave threw an error while trying to fulfill the
connection request.
"""
if key_path is None:
config_dir = pathlib.Path(cape_config.LOCAL_CONFIG_DIR)
key_path = (
config_dir
/ "encryption_keys"
/ token[-200:] # Shorten file name to avoid Errno 63 when saving file
/ cape_config.LOCAL_CAPE_KEY_FILENAME
)
else:
key_path = pathlib.Path(key_path)
if key_path.exists():
with open(key_path, "rb") as f:
cape_key = f.read()
else:
cape_key = await self._request_key(token, key_path, pcrs=pcrs)
return cape_key
[docs] @_synchronizer
async def run(
self,
function_ref: fref.FunctionRef,
*args: Any,
pcrs: Optional[Dict[str, List[str]]] = None,
serde_hooks=None,
use_serdio: bool = False,
**kwargs: Any,
) -> Any:
"""Single-shot version of connect + invoke + close.
This method takes care of establishing a websocket connection via
:meth:`~Cape.connect`, invoking it via :meth:`~Cape.invoke`, and then finally
closing the connection with :meth:`~Cape.close`. This method should be
preferred when the caller doesn't need to invoke a Cape function more than once.
Args:
function_ref: A :class:`~.function_ref.FunctionRef` representing
a deployed Cape function.
*args: Arguments to pass to the connected Cape function. If
``use_serdio=False``, we expect a single argument of type ``bytes``.
Otherwise, these arguments should match the positional arguments
of the undecorated Cape handler, and they will be auto-serialized by
Serdio before being sent in the request.
serde_hooks: An optional pair of serdio encoder/decoder hooks convertible
to :class:`serdio.SerdeHookBundle`. The hooks are necessary if the
``args`` / ``kwargs`` have any user-defined types that can't be handled
by vanilla Serdio. See :func:`serdio.bundle_serde_hooks` for supported
types.
use_serdio: Boolean controlling whether or not the inputs should be
auto-serialized by serdio.
kwargs: Keyword arguments to be passed to the connected Cape function.
These are treated the same way as the ``args`` are.
Returns:
If ``use_serdio=True``, returns the auto-deserialized result of calling the
connected Cape function on the given ``args`` / ``kwargs``.
If ``use_serdio=False``, returns the output of the Cape function as raw
bytes.
Raises:
RuntimeError: if serialized inputs could not be HPKE-encrypted, or if
websocket response is malformed.
"""
function_ref = _check_function_ref(function_ref)
if serde_hooks is not None:
serde_hooks = serdio.bundle_serde_hooks(serde_hooks)
async with self.function_context(function_ref, pcrs):
result = await self.invoke(
*args, serde_hooks=serde_hooks, use_serdio=use_serdio, **kwargs
)
return result
async def _request_connection(self, function_ref, pcrs=None):
fn_endpoint = f"{self._url}/v1/run/{function_ref.id}"
self._root_cert = self._root_cert or attest.download_root_cert()
self._ctx = _EnclaveContext(
endpoint=fn_endpoint,
auth_protocol="cape.function",
auth_token=function_ref.token,
root_cert=self._root_cert,
)
attestation_doc = await self._ctx.bootstrap(pcrs)
user_data = attestation_doc.get("user_data")
checksum = function_ref.checksum
if checksum is not None and user_data is None:
# Close the connection explicitly before throwing exception
await self._ctx.close()
raise RuntimeError(
f"No function checksum received from enclave, expected{checksum}."
)
user_data_dict = json.loads(user_data)
received_checksum = user_data_dict.get("func_checksum")
if checksum is not None:
# Checksum is hex encoded, we manipulate it to string for comparison
received_checksum = str(base64.b64decode(received_checksum).hex())
if str(checksum) != str(received_checksum):
# Close the connection explicitly before throwing exception
await self._ctx.close()
raise RuntimeError(
"Returned checksum did not match provided, "
f"got: {received_checksum}, want: {checksum}."
)
return
async def _request_invocation(self, serde_hooks, use_serdio, *args, **kwargs):
# If multiple args and/or kwargs are supplied to the Cape function through
# Cape.run or Cape.invoke, before serialization, we pack them
# into a dictionary with the following keys:
# {"cape_fn_args": <tuple_args>, "cape_fn_kwargs": <dict_kwargs>}.
single_input = _maybe_get_single_input(args, kwargs)
if single_input is not None:
inputs = single_input
elif single_input is None and not use_serdio:
raise ValueError(
"Expected a single input of type 'bytes' when use_serdio=False.\n"
"Found:"
f"\t- args: {args}"
f"\t- kwargs: {kwargs}"
)
if serde_hooks is not None:
encoder_hook, decoder_hook = serde_hooks.unbundle()
use_serdio = True
else:
encoder_hook, decoder_hook = None, None
if use_serdio:
inputs = serdio.serialize(*args, encoder=encoder_hook, **kwargs)
if not isinstance(inputs, bytes):
raise TypeError(
f"The input type is: {type(inputs)}. Provide input as bytes or "
"set use_serdio=True for PyCape to serialize your input "
"with Serdio."
)
result = await self._ctx.invoke(inputs)
if use_serdio:
result = serdio.deserialize(result, decoder=decoder_hook)
return result
async def _request_key(
self,
token: str,
key_path: pathlib.Path,
pcrs: Optional[Dict[str, List[str]]] = None,
) -> bytes:
key_endpoint = f"{self._url}/v1/key"
self._root_cert = self._root_cert or attest.download_root_cert()
key_ctx = _EnclaveContext(
key_endpoint,
auth_protocol="cape.function",
auth_token=token,
root_cert=self._root_cert,
)
attestation_doc = await key_ctx.bootstrap(pcrs)
await key_ctx.close() # we have the attestation doc, no longer any need for ctx
user_data = attestation_doc.get("user_data")
user_data_dict = json.loads(user_data)
cape_key = user_data_dict.get("key")
if cape_key is None:
raise RuntimeError(
"Enclave response did not include a Cape key in attestation user data."
)
cape_key = base64.b64decode(cape_key)
await _persist_cape_key(cape_key, key_path)
return cape_key
class _EnclaveContext:
"""A context managing a connection to a particular enclave instance."""
def __init__(self, endpoint, auth_protocol, auth_token, root_cert):
self._endpoint = _transform_url(endpoint)
self._auth_token = auth_token
self._auth_protocol = auth_protocol
self._root_cert = root_cert
ssl_ctx = ssl.create_default_context()
if cape_config.DEV_DISABLE_SSL:
ssl_ctx.check_hostname = False
ssl_ctx.verify_mode = ssl.CERT_NONE
self._ssl_ctx = ssl_ctx
# state to be explicitly created/destroyed by callers via bootstrap/close
self._websocket = None
self._public_key = None
async def authenticate(self):
nonce = _generate_nonce()
request = _create_connection_request(nonce)
_logger.debug("\n> Sending authentication request...")
await self._websocket.send(request)
_logger.debug("* Waiting for attestation document...")
msg = await self._websocket.recv()
_logger.debug("< Auth completed. Received attestation document.")
return _parse_wss_response(msg)
async def bootstrap(self, pcrs: Optional[Dict[str, List[str]]] = None):
_logger.debug(f"* Dialing {self._endpoint}")
self._websocket = await websockets.connect(
self._endpoint,
ssl=self._ssl_ctx,
subprotocols=[self._auth_protocol, self._auth_token],
max_size=None,
)
_logger.debug("* Websocket connection established")
auth_response = await self.authenticate()
attestation_doc = attest.parse_attestation(auth_response, self._root_cert)
self._public_key = attestation_doc["public_key"]
if pcrs is not None:
attest.verify_pcrs(pcrs, attestation_doc)
return attestation_doc
async def close(self):
if self._websocket is not None:
await self._websocket.close()
self._websocket = None
self._public_key = None
async def invoke(self, inputs: bytes) -> bytes:
input_ciphertext = enclave_encrypt.encrypt(self._public_key, inputs)
_logger.debug("> Sending encrypted inputs")
try:
await self._websocket.send(input_ciphertext)
except websockets.exceptions.ConnectionClosedOK:
await self.close()
raise RuntimeError(
"Enclave websocket connection was closed, likely due to timeout error. "
"Please invoke your function more frequently to keep the connection "
"alive for more than 60 seconds."
)
invoke_response = await self._websocket.recv()
_logger.debug("< Received function results")
return _parse_wss_response(invoke_response)
# TODO What should be the length?
def _generate_nonce(length=8):
"""
Generates a string of digits between 0 and 9 of a given length
"""
nonce = "".join([str(random.randint(0, 9)) for i in range(length)])
_logger.debug(f"* Generated nonce: {nonce}")
return nonce
def _create_connection_request(nonce):
"""
Returns a json string with nonce
"""
request = {"message": {"nonce": nonce}}
return json.dumps(request)
def _parse_wss_response(response):
"""
Returns the inner message field received in a WebSocket message from enclave
"""
response = json.loads(response)
if "error" in response:
raise Exception(response["error"])
response_msg = _handle_expected_field(
response,
"message",
fallback_err="Missing 'message' field in websocket response.",
)
inner_msg = _handle_expected_field(
response_msg,
"message",
fallback_err=(
"Malformed websocket response contents: missing inner 'message' field."
),
)
return base64.b64decode(inner_msg)
def _handle_expected_field(dictionary, field, *, fallback_err=None):
"""
Returns value of a provided key from dictionary, optionally raising
a custom RuntimeError if it's missing.
"""
v = dictionary.get(field, None)
if v is None:
if fallback_err is not None:
_logger.error(fallback_err)
raise RuntimeError(fallback_err)
raise RuntimeError(f"Dictionary {dictionary} missing key {field}.")
return v
def _check_function_ref(function_ref):
"""
Check if the FunctionRef object contains a function ID and token.
"""
if isinstance(function_ref, fref.FunctionRef):
if function_ref.id is None:
raise ValueError("The function ID in the provided FunctionRef is empty.")
if function_ref.token is None:
raise ValueError("The function token in the provided FunctionRef is empty.")
return function_ref
else:
raise TypeError(
"`function_ref` arg must be a FunctionRef object, "
f" found {type(function_ref)}."
)
def _maybe_get_single_input(args, kwargs):
single_arg = len(args) == 1 and len(kwargs) == 0
single_kwarg = len(args) == 0 and len(kwargs) == 1
if single_arg:
return args[0]
elif single_kwarg:
return kwargs.items()[0][1]
async def _persist_cape_key(cape_key: str, key_path: pathlib.Path):
key_path.parent.mkdir(parents=True, exist_ok=True)
with open(key_path, "wb") as f:
f.write(cape_key)
def _transform_url(url):
url = urllib.parse.urlparse(url)
if url.scheme == "https":
return url.geturl().replace("https://", "wss://")
elif url.scheme == "http":
return url.geturl().replace("http://", "ws://")
return url.geturl()