Source code for Acquire.Service._get_services


import base64 as _base64

from cachetools import cached as _cached
from cachetools import LRUCache as _LRUCache

_cache_local_serviceinfo = _LRUCache(maxsize=5)
_cache_local_serviceinfos = _LRUCache(maxsize=5)
_cache_remote_serviceinfo = _LRUCache(maxsize=20)

__all__ = ["get_trusted_service", "get_trusted_services",
           "get_remote_service", "get_checked_remote_service",
           "clear_services_cache"]


[docs]def clear_services_cache(): """Clear the caches of loaded services""" _cache_local_serviceinfo.clear() _cache_remote_serviceinfo.clear()
# Cached as the list of all trusted remote service information # will not change too often
[docs]@_cached(_cache_local_serviceinfos) def get_trusted_services(): """Return a dictionary of all trusted services indexed by their type """ from Acquire.Service import is_running_service as _is_running_service if _is_running_service(): from Acquire.Service import get_this_service as _get_this_service from Acquire.Service import Service as _Service from Acquire.Service import get_service_account_bucket as \ _get_service_account_bucket from Acquire.ObjectStore import ObjectStore as _ObjectStore from Acquire.ObjectStore import url_to_encoded as \ _url_to_encoded # we already trust ourselves service = _get_this_service() trusted_services = {} trusted_services[service.service_type()] = [service] bucket = _get_service_account_bucket() uidkey = "_trusted/uid/" datas = _ObjectStore.get_all_objects(bucket, uidkey) for data in datas: remote_service = _Service.from_data(data) if remote_service.should_refresh_keys(): # need to update the keys in our copy of the service remote_service.refresh_keys() key = "%s/%s" % (uidkey, remote_service.uid()) _ObjectStore.set_object_from_json(bucket, key, remote_service.to_data()) if remote_service.service_type() in datas: datas[remote_service.service_type()].append(remote_service) else: datas[remote_service.service_type()] = [remote_service] return datas else: # this is running on the client from Acquire.Client import Wallet as _Wallet return _Wallet.get_services()
# Cached as the remote service information will not change too often
[docs]@_cached(_cache_local_serviceinfo) def get_trusted_service(service_url=None, service_uid=None): """Return the trusted service info for the service with specified service_url or service_uid""" from Acquire.Service import is_running_service as _is_running_service if _is_running_service(): from Acquire.Service import get_this_service as _get_this_service from Acquire.Service import Service as _Service from Acquire.Service import get_service_account_bucket as \ _get_service_account_bucket from Acquire.ObjectStore import ObjectStore as _ObjectStore from Acquire.ObjectStore import url_to_encoded as \ _url_to_encoded service = _get_this_service() if service.canonical_url() == service_url: # we trust ourselves :-) return service bucket = _get_service_account_bucket() uidkey = None if service_uid is not None: uidkey = "_trusted/uid/%s" % service_uid data = _ObjectStore.get_object_from_json(bucket, uidkey) elif service_url is not None: urlkey = "_trusted/url/%s" % _url_to_encoded(service_url) uidkey = _ObjectStore.get_string_object(bucket, urlkey) if uidkey is not None: data = _ObjectStore.get_object_from_json(bucket, uidkey) else: data = None if data is None: from Acquire.Service import ServiceAccountError if service_uid is not None: raise ServiceAccountError( "We do not trust the service with UID '%s'" % service_uid) else: raise ServiceAccountError( "We do not trust the service at URL '%s'" % service_url) remote_service = _Service.from_data(data) if remote_service.should_refresh_keys(): # need to update the keys in our copy of the service remote_service.refresh_keys() if uidkey is not None: _ObjectStore.set_object_from_json(bucket, uidkey, remote_service.to_data()) return remote_service else: # this is running on the client from Acquire.Client import Service as _Service return _Service(service_url=service_url, service_uid=service_uid)
# Cached to stop us sending too many requests for info to remote services
[docs]@_cached(_cache_remote_serviceinfo) def get_remote_service(service_url): """This function returns the service info for the service at 'service_url' """ from Acquire.Crypto import get_private_key as _get_private_key from Acquire.Service import call_function as _call_function from Acquire.Service import Service as _Service key = _get_private_key("function") try: response = _call_function(service_url, response_key=key) except Exception as e: from Acquire.Service import ServiceError raise ServiceError("Cannot get information about '%s': %s" % (service_url, str(e))) from Acquire.Crypto import SignatureVerificationError \ as _SignatureVerificationError try: return _Service.from_data(response["service_info"], verify_data=True) except _SignatureVerificationError: raise except Exception as e: from Acquire.Service import ServiceError raise ServiceError( "Cannot extract service info for '%s' from '%s': %s" % (service_url, str(response), str(e)))
[docs]def get_checked_remote_service(service_url, public_cert): """This function returns the service info for the service at 'service_url'. This checks that the info has been signed correctly by the passed public certificate """ from Acquire.Crypto import get_private_key as _get_private_key from Acquire.Service import call_function as _call_function from Acquire.Service import Service as _Service key = _get_private_key("function") try: response = _call_function(service_url, response_key=key, public_cert=public_cert) except Exception as e: from Acquire.Service import ServiceError raise ServiceError("Cannot get information about '%s': %s" % (service_url, str(e))) from Acquire.Crypto import SignatureVerificationError \ as _SignatureVerificationError try: return _Service.from_data(response["service_info"], verify_data=True) except _SignatureVerificationError: raise except Exception as e: from Acquire.Service import ServiceError raise ServiceError( "Cannot extract service info for '%s' from '%s': %s" & (service_url, str(response), str(e)))