Source code for Acquire.Registry._registry


__all__ = ["Registry"]

_registry_key = "registry"


def _inc_uid(vals):
    import copy as _copy
    vals = _copy.copy(vals)

    for j in range(0, len(vals)):
        i = len(vals) - j - 1
        vals[i] += 1

        if i % 2 == 1:
            if vals[i] < 10:
                return vals
            else:
                vals[i] = 0
        else:
            if vals[i] < 52:
                return vals
            else:
                vals[i] = 0

    # we only get here if we have overflowed. In this
    # case, add an extra pair of digits
    vals = [0] * (len(vals) + 2)

    return vals


def _to_uid(vals):
    import string as _string
    parts = []
    for i in range(0, len(vals)):
        x = vals[i]
        if i % 2 == 1:
            if x < 0 or x > 9:
                raise ValueError(x)
            else:
                parts.append(str(x))
        else:
            if x < 0 or x > 51:
                raise ValueError(x)
            elif x < 26:
                parts.append(_string.ascii_lowercase[x])
            else:
                parts.append(_string.ascii_uppercase[x-26])

    s = "".join(parts)
    return ".".join([s[i:i+2] for i in range(0, len(s), 2)])


def _generate_service_uid(bucket, registry_uid):
    """Function to generate a new service_uid on this registry.

       The UIDs have the form a0-a0, when "a" is any letter from [a-zA-Z]
       and "0" is any number from [0-9]. This give 520 possible values
       for each part either side of the hyphen.

       The part on the left of the hypen is the root UID, which
       matches the root of the service_uid of the registry service
       that registered this service (the service_uid of a registry
       service has the UID root-root).

       If more than 520 values are needed, then either side of the
       ID can be extended by additional pairs of a0 digits, using
       a "." to separate pairs, e.g.

       the service_uid for registry b4-b4 that comes after
       b4-Z9.Z9.Z9 is b4-a0.a0.a0.a0

       similarly, the registry after Z9 is A0-A0.

       This means that

       a0.a0-a0.a0.a0.a0

       would be a perfectly valid ID. We would only need IDs of this
       length if we have ~270k registry services, and this service_uid
       came from a service that had registered ~73 trillion services...

       The registry root Z9, with registry Z9-Z9 is reserved for
       the temporary registry created during testing
    """
    from Acquire.ObjectStore import ObjectStore as _ObjectStore
    from Acquire.ObjectStore import Mutex as _Mutex

    root = registry_uid.split("-")[0]

    key = "%s/last_service_uid" % _registry_key

    mutex = _Mutex(key=key)

    try:
        last_vals = _ObjectStore.get_object_from_json(bucket=bucket,
                                                      key=key)
        last_vals = _inc_uid(last_vals)
    except:
        last_vals = [0, 0]

    service_uid = "%s-%s" % (root, _to_uid(last_vals))

    while service_uid == registry_uid:
        last_vals = _inc_uid(last_vals)
        service_uid = "%s-%s" % (root, _to_uid(last_vals))

    _ObjectStore.set_object_from_json(bucket=bucket, key=key, data=last_vals)
    mutex.unlock()

    return service_uid


[docs]class Registry: """This class holds the registry of all services registered by this service. Registries provided trusted actors who can supply public keys, URLs, and UIDs for all of the different services in the system. """ def __init__(self): """Constructor""" self._bucket = None from Acquire.Service import get_this_service as _get_this_service self._registry_uid = _get_this_service( need_private_access=False).uid()
[docs] def get_registry_uid(self, service_uid): """Return the UID of the registry that initially registered the service with UID 'service_uid'. """ parts = service_uid.split("-") return "%s-%s" % (parts[0], parts[0])
[docs] def get_bucket(self): if self._bucket: return self._bucket else: from Acquire.Service import get_service_account_bucket \ as _get_service_account_bucket self._bucket = _get_service_account_bucket() return self._bucket
[docs] def registry_root(self): """Return the root ID of this registry""" return self._registry_uid.split("-")[0]
[docs] def registry_uid(self): """Return the UID of this registry""" return self._registry_uid
[docs] def is_test_registry(self): """Return whether or not this is a test registry (only used while testing the code) """ return self._registry_uid == "Z9-Z9"
def _get_key_for_uid(self, service_uid): return "%s/uid/%s" % (_registry_key, service_uid) def _get_uid_from_key(self, key): return key.split("/")[-1] def _get_domain(self, url): """Return the domain name of the server in the passed url""" from urllib.parse import urlparse as _urlparse try: domain = _urlparse(url).netloc.split(":")[0] except: domain = None if domain is None or len(domain) == 0: if self.is_test_registry(): return "testing" else: raise ValueError("Cannot extract the domain from '%s'" % url) return domain def _get_root_key_for_domain(self, domain): from Acquire.ObjectStore import string_to_encoded \ as _string_to_encoded return "%s/domain/%s" % (_registry_key, _string_to_encoded(domain)) def _get_key_for_url(self, service_url): from Acquire.ObjectStore import string_to_encoded \ as _string_to_encoded return "%s/url/%s" % (_registry_key, _string_to_encoded(service_url))
[docs] def get_service_key(self, service_uid=None, service_url=None): """Return the key for the passed service in the object store""" if service_uid is not None: return self._get_key_for_uid(service_uid) else: bucket = self.get_bucket() key = self._get_key_for_url(service_url) try: from Acquire.ObjectStore import ObjectStore as _ObjectStore service_key = _ObjectStore.get_string_object(bucket=bucket, key=key) except: service_key = None return service_key
[docs] def challenge_service(self, service): """Send a challenge to the passed service, returning the actual service returned. This will only pass if our copy of the service matches us with the copy returned from the actual service. This verifies that there is a real service sitting at that URL, and that we have the right keys and certs """ from Acquire.Crypto import PrivateKey as _PrivateKey from Acquire.ObjectStore import bytes_to_string as _bytes_to_string from Acquire.Service import Service as _Service challenge = _PrivateKey.random_passphrase() pubkey = service.public_key() encrypted_challenge = pubkey.encrypt(challenge) args = {"challenge": _bytes_to_string(encrypted_challenge), "fingerprint": pubkey.fingerprint()} if service.uid().startswith("STAGE"): # we cannot call using the service directly, as it is # not yet fully constructed from Acquire.Service import get_this_service as _get_this_service from Acquire.Service import call_function as _call_function this_service = _get_this_service(need_private_access=True) result = _call_function(service_url=service.service_url(), function=None, args=args, args_key=service.public_key(), response_key=this_service.private_key(), public_cert=service.public_certificate()) else: result = service.call_function(function=None, args=args) if result["response"] != challenge: raise PermissionError( "Failure of the service %s to correctly respond " "to the challenge!" % service) return _Service.from_data(result["service_info"])
[docs] def get_service(self, service_uid=None, service_url=None): """Load and return the service with specified url or uid from the registry. This will consult with other registry services to find the matching service """ from Acquire.ObjectStore import ObjectStore as _ObjectStore from Acquire.Service import Service as _Service from Acquire.ObjectStore import string_to_encoded \ as _string_to_encoded from Acquire.Service import get_this_service as _get_this_service this_service = _get_this_service(need_private_access=False) if service_url is not None: from Acquire.Service import Service as _Service service_url = _Service.get_canonical_url(service_url) if this_service.uid() == service_uid: return this_service elif this_service.canonical_url() == service_url: return this_service bucket = self.get_bucket() service_key = self.get_service_key(service_uid=service_uid, service_url=service_url) service = None if service_key is not None: try: data = _ObjectStore.get_object_from_json(bucket=bucket, key=service_key) service = _Service.from_data(data) except: pass if service is not None: must_write = False if service.uid() == "STAGE1": # we need to directly ask the service for its info service = self.challenge_service(service) if service.uid() == "STAGE1": from Acquire.Service import MissingServiceError raise MissingServiceError( "Service %s|%s not available as it is still under " "construction!" % (service_uid, service)) # we can now move this service from pending to active uidkey = self._get_key_for_uid(service.uid()) domain = self._get_domain(service.service_url()) domainroot = self._get_root_key_for_domain(domain=domain) pending_key = "%s/pending/%s" % (domainroot, service.uid()) active_key = "%s/active/%s" % (domainroot, service.uid()) try: _ObjectStore.delete_object(bucket=bucket, key=pending_key) except: pass try: _ObjectStore.set_string_object(bucket=bucket, key=active_key, string_data=uidkey) except: pass must_write = True elif service.should_refresh_keys(): service.refresh_keys() must_write = True if must_write: data = service.to_data() _ObjectStore.set_object_from_json(bucket=bucket, key=service_key, data=data) return service # we only get here if we can't find the service on this registry. # In the future, we will use the initial part of the UID of # the service to ask its registering registry for its data. # For now, we just raise an error from Acquire.Service import MissingServiceError raise MissingServiceError( "No service available: service_url=%s service_uid=%s" % (service_url, service_uid))
[docs] def register_service(self, service, force_new_uid=False): """Register the passed service""" from Acquire.Service import Service as _Service from Acquire.ObjectStore import ObjectStore as _ObjectStore if not isinstance(service, _Service): raise TypeError("You can only register Service objects") if service.uid() != "STAGE1": raise PermissionError("You cannot register a service twice!") # first, stop a single domain monopolising resources... bucket = self.get_bucket() domain = self._get_domain(service.service_url()) domainroot = self._get_root_key_for_domain(domain=domain) try: pending_keys = _ObjectStore.get_all_object_names( bucket=bucket, prefix="%s/pending/" % domainroot) num_pending = len(pending_keys) except: num_pending = 0 if num_pending >= 4: raise PermissionError( "You cannot register a new service as you have reached " "the quota (4) for the number of pending services registered " "against the domain '%s'. Please get some of these services " "so that you can make them active." % domain) try: active_keys = _ObjectStore.get_all_object_names( bucket=bucket, prefix="%s/active/" % domainroot) num_active = len(active_keys) except: num_active = 0 if num_active + num_pending >= 16: raise PermissionError( "You cannot register a new service as you have reached " "the quota (16) for the number registered against the " "domain '%s'" % domain) # first, challenge the service to ensure that it exists # and our keys are correct service = self.challenge_service(service) if service.uid() != "STAGE1": raise PermissionError("You cannot register a service twice!") bucket = self.get_bucket() urlkey = self._get_key_for_url(service.canonical_url()) try: uidkey = _ObjectStore.get_string_object(bucket=bucket, key=urlkey) except: uidkey = None service_uid = None if uidkey is not None: # there is already a service registered at this domain. Since # we have successfully challenged the service, this must be # someone re-bootstrapping a service. It is safe to give them # back their UID if requested if not force_new_uid: service_uid = self._get_uid_from_key(uidkey) if service_uid is None: # how many services from this domain are still pending? service_uid = _generate_service_uid( bucket=self.get_bucket(), registry_uid=self.registry_uid()) # save this service to the object store uidkey = self._get_key_for_uid(service_uid) _ObjectStore.set_object_from_json(bucket=bucket, key=uidkey, data=service.to_data()) _ObjectStore.set_string_object(bucket=bucket, key=urlkey, string_data=uidkey) domainkey = self._get_root_key_for_domain(domain=domain) _ObjectStore.set_string_object( bucket=bucket, key="%s/pending/%s" % (domainkey, service_uid), string_data=uidkey) return service_uid
def _get_key_for_uid(self, service_uid): return "%s/uid/%s" % (_registry_key, service_uid) def _get_key_for_url(self, service_url): from Acquire.ObjectStore import string_to_encoded \ as _string_to_encoded return "%s/url/%s" % (_registry_key, _string_to_encoded(service_url))