Source code for Acquire.Identity._aclrules


from enum import Enum as _Enum

__all__ = ["ACLRules", "ACLUserRules", "ACLGroupRules"]


class ACLRuleOperation(_Enum):
    """The operations that can be used to combine a pair of ACL rules"""
    MAX = "max"  # combine using '+' (most permissive)
    MIN = "min"  # combine using '*' (least permissive)
    SUB = "sub"  # combine using '-' (subtract the second from the first)
    SET = "set"  # break - set first matching fully-resolved rule

    @staticmethod
    def from_data(data):
        """Return the operation whose serialised value is 'data'"""
        return ACLRuleOperation(data)

    def to_data(self):
        """Return the json-serialisable value of this operation"""
        return self.value

    def combine(self, acl1, acl2):
        """Return the result of combining 'acl1' with 'acl2' using this
           operation. If either rule is None then the other one is
           returned unchanged
        """
        # nothing to combine if one of the rules is missing
        if acl1 is None:
            return acl2

        if acl2 is None:
            return acl1

        if self is ACLRuleOperation.MAX:
            return acl1 + acl2

        if self is ACLRuleOperation.MIN:
            return acl1 * acl2

        if self is ACLRuleOperation.SUB:
            return acl1 - acl2

        if self is ACLRuleOperation.SET:
            # the first rule wins
            return acl1

        return None


def _save_rule(rule):
    """Return a json-serialisable [classname, classdata] pair for the
       passed rule. The class name is what allows the matching
       type to be chosen again when the data is loaded
    """
    classname = rule.__class__.__name__
    classdata = rule.to_data()
    return [classname, classdata]


def _load_rule(rule):
    """Return the rule loaded from the json-deserialised data.

       'rule' must be a [classname, classdata] pair, as produced by
       _save_rule. A TypeError is raised if the pair cannot be unpacked
       or if the class name is not one of the known rule types
    """
    try:
        classname = rule[0]
        classdata = rule[1]
    except (LookupError, TypeError) as e:
        # only translate "wrong shape" failures - anything else
        # (e.g. KeyboardInterrupt) must not be swallowed
        raise TypeError("Expected [classname, classdata]") from e

    if classname == "ACLRules":
        return ACLRules.from_data(classdata)
    elif classname == "ACLUserRules":
        return ACLUserRules.from_data(classdata)
    elif classname == "ACLGroupRules":
        return ACLGroupRules.from_data(classdata)
    elif classname == "ACLRule":
        # imported here, matching the function-level imports used
        # throughout this module
        from Acquire.Identity import ACLRule as _ACLRule
        return _ACLRule.from_data(classdata)
    else:
        raise TypeError("Unrecognised type '%s'" % classname)


class ACLGroupRules:
    """This class holds rules that apply to individual groups. The
       rules are stored in a dictionary keyed by group_guid
    """
    def __init__(self):
        """Construct an empty set of group rules. Rules are added
           afterwards via 'add_group_rule'
        """
        self._group_rules = {}

    def __eq__(self, other):
        if isinstance(other, self.__class__):
            return self.__dict__ == other.__dict__
        else:
            return False

    def __str__(self):
        s = ["%s => %s" % (group, rule)
             for group, rule in self._group_rules.items()]
        return "Group{%s}" % ", ".join(s)

    def __repr__(self):
        return self.__str__()

    def resolve(self, identifiers=None, must_resolve=True,
                upstream=None, unresolved=False):
        """Resolve the rule for the groups named in 'identifiers'
           (the "group_guids" list plus the single "group_guid",
           where present). The rules of all matching groups are added
           together.

           If no group matches then this returns None, unless
           'must_resolve' is True, in which case the result of
           resolving the inherit rule is returned
        """
        # take a copy so that the caller's list is never modified
        try:
            group_guids = list(identifiers["group_guids"])
        except (LookupError, TypeError):
            group_guids = []

        try:
            group_guids.append(identifiers["group_guid"])
        except (LookupError, TypeError):
            # no single group_guid supplied (or no identifiers at all)
            pass

        resolved = None

        for group_guid in group_guids:
            if group_guid not in self._group_rules:
                continue

            rule = self._group_rules[group_guid]

            # NOTE(review): the value returned by rule.resolve() is
            # discarded and the stored rule is combined as-is - confirm
            # whether the resolved value should be used instead
            rule.resolve(must_resolve=must_resolve,
                         identifiers=identifiers,
                         upstream=upstream,
                         unresolved=unresolved)

            if resolved is None:
                resolved = rule
            else:
                resolved = resolved + rule

        if resolved is not None:
            return resolved

        if not must_resolve:
            return None

        from Acquire.Identity import ACLRule as _ACLRule
        return _ACLRule.inherit().resolve(must_resolve=True,
                                          identifiers=identifiers,
                                          upstream=upstream,
                                          unresolved=unresolved)

    def add_group_rule(self, group_guid, rule):
        """Add a rule for the group with passed 'group_guid'. This
           replaces any rule already held for that group
        """
        self._group_rules[group_guid] = rule

    def to_data(self):
        """Return a json-serialisable representation of these rules"""
        data = {}

        for group_guid, rule in self._group_rules.items():
            data[group_guid] = _save_rule(rule)

        return data

    @staticmethod
    def from_data(data):
        """Return the rules constructed from the passed json-deserialised
           object. An empty set of rules is returned if 'data' is
           None or empty
        """
        rules = ACLGroupRules()

        if data is not None and len(data) > 0:
            for group_guid, rule in data.items():
                rules.add_group_rule(group_guid, _load_rule(rule))

        return rules
class ACLUserRules:
    """This class holds rules that apply to individual users. The
       rules are stored in a dictionary keyed by user_guid
    """
    def __init__(self):
        """Construct an empty set of user rules. Rules are added
           afterwards via 'add_user_rule'
        """
        self._user_rules = {}

    def __eq__(self, other):
        if isinstance(other, self.__class__):
            return self.__dict__ == other.__dict__
        else:
            return False

    def __str__(self):
        s = ["%s => %s" % (user, rule)
             for user, rule in self._user_rules.items()]
        return "User{%s}" % ", ".join(s)

    def __repr__(self):
        # same behaviour as ACLGroupRules.__repr__
        return self.__str__()

    def resolve(self, must_resolve=True, identifiers=None,
                upstream=None, unresolved=False):
        """Resolve the rule for the users named in 'identifiers'
           (the "user_guids" list plus the single "user_guid",
           where present). The rules of all matching users are added
           together.

           If no user matches then this returns None, unless
           'must_resolve' is True, in which case the result of
           resolving the inherit rule is returned
        """
        # take a copy so that the caller's list is never modified
        try:
            user_guids = list(identifiers["user_guids"])
        except (LookupError, TypeError):
            user_guids = []

        try:
            user_guids.append(identifiers["user_guid"])
        except (LookupError, TypeError):
            # no single user_guid supplied (or no identifiers at all)
            pass

        resolved = None

        for user_guid in user_guids:
            if user_guid not in self._user_rules:
                continue

            rule = self._user_rules[user_guid]

            # NOTE(review): the value returned by rule.resolve() is
            # discarded and the stored rule is combined as-is - confirm
            # whether the resolved value should be used instead
            rule.resolve(must_resolve=must_resolve,
                         identifiers=identifiers,
                         upstream=upstream,
                         unresolved=unresolved)

            if resolved is None:
                resolved = rule
            else:
                resolved = resolved + rule

        if resolved is not None:
            return resolved

        if not must_resolve:
            return None

        from Acquire.Identity import ACLRule as _ACLRule
        return _ACLRule.inherit().resolve(must_resolve=True,
                                          identifiers=identifiers,
                                          upstream=upstream,
                                          unresolved=unresolved)

    def add_user_rule(self, user_guid, rule):
        """Add a rule for the user with passed 'user_guid'. This
           replaces any rule already held for that user
        """
        self._user_rules[user_guid] = rule

    @staticmethod
    def _create(aclrule, user_guid, user_guids):
        """Return the ACLUserRules that assigns 'aclrule' to 'user_guid'
           and to every entry of 'user_guids' (either may be None)
        """
        rule = ACLUserRules()

        if user_guid is not None:
            rule.add_user_rule(user_guid, aclrule)

        if user_guids is not None:
            for guid in user_guids:
                rule.add_user_rule(guid, aclrule)

        return rule

    @staticmethod
    def owner(user_guid=None, user_guids=None):
        """Simple shorthand to create the rule that the specified
           user(s) are the owner of the resource
        """
        from Acquire.Identity import ACLRule as _ACLRule
        return ACLUserRules._create(aclrule=_ACLRule.owner(),
                                    user_guid=user_guid,
                                    user_guids=user_guids)

    @staticmethod
    def executer(user_guid=None, user_guids=None):
        """Simple shorthand to create the rule that the specified
           user(s) are the executer of the resource
        """
        from Acquire.Identity import ACLRule as _ACLRule
        return ACLUserRules._create(aclrule=_ACLRule.executer(),
                                    user_guid=user_guid,
                                    user_guids=user_guids)

    @staticmethod
    def writer(user_guid=None, user_guids=None):
        """Simple shorthand to create the rule that the specified
           user(s) are the writer of the resource
        """
        from Acquire.Identity import ACLRule as _ACLRule
        return ACLUserRules._create(aclrule=_ACLRule.writer(),
                                    user_guid=user_guid,
                                    user_guids=user_guids)

    @staticmethod
    def reader(user_guid=None, user_guids=None):
        """Simple shorthand to create the rule that the specified
           user(s) are the reader of the resource
        """
        from Acquire.Identity import ACLRule as _ACLRule
        return ACLUserRules._create(aclrule=_ACLRule.reader(),
                                    user_guid=user_guid,
                                    user_guids=user_guids)

    def to_data(self):
        """Return a json-serialisable representation of these rules"""
        data = {}

        for user_guid, rule in self._user_rules.items():
            data[user_guid] = _save_rule(rule)

        return data

    @staticmethod
    def from_data(data):
        """Return the rules constructed from the passed json-deserialised
           object. An empty set of rules is returned if 'data' is
           None or empty
        """
        rules = ACLUserRules()

        if data is not None and len(data) > 0:
            for user_guid, rule in data.items():
                rules.add_user_rule(user_guid, _load_rule(rule))

        return rules
def _is_inherit(aclrule):
    """Return True only if the passed rule is an ACLRule that is equal
       to the plain inherit-all rule, and False for anything else
    """
    from Acquire.Identity import ACLRule as _ACLRule

    # anything that is not an ACLRule cannot be a simple inherit
    if not isinstance(aclrule, _ACLRule):
        return False

    return bool(aclrule == _ACLRule.inherit())
class ACLRules:
    """This class holds a combination of ACL rules. These are parsed
       in order to get the ACL for a resource. By default, this
       is a simple inherit rule (meaning that it will inherit
       whatever comes from upstream)
    """
    def __init__(self, rule=None, rules=None, default_rule=None,
                 default_operation=ACLRuleOperation.MAX):
        """Construct, optionally starting with a default ACLRule
           for all users.

           'rule' is prepended to the rules, then each entry of 'rules'
           is appended. An entry of 'rules' is either a bare rule
           (combined using 'default_operation') or an
           (operation, rule) pair
        """
        # with no default rule (or an inherit default rule) this
        # starts life as a simple inherit
        self._is_simple_inherit = (default_rule is None or
                                   _is_inherit(default_rule))

        self._default_rule = default_rule
        self._rules = []

        self.set_default_operation(default_operation)

        if rule is not None:
            self.prepend(rule)

        if rules is not None:
            for entry in rules:
                try:
                    aclrule = entry[1]
                    oper = entry[0]
                except (LookupError, TypeError):
                    # not an (operation, rule) pair - a bare rule
                    aclrule = entry
                    oper = default_operation

                self.append(aclrule=aclrule, operation=oper)

    def __eq__(self, other):
        if isinstance(other, self.__class__):
            return self.__dict__ == other.__dict__
        else:
            return False

    def __str__(self):
        if self._is_simple_inherit:
            return "inherit"

        s = []

        for entry in self._rules:
            if isinstance(entry, tuple):
                # format the pair explicitly, as "%s" % (a, b)
                # raises a TypeError
                s.append("%s %s" % (entry[0], entry[1]))
            else:
                s.append("%s" % (entry,))

        if self._default_rule is not None:
            s.append("DEFAULT %s" % (self._default_rule,))

        return "ACLRules{\n%s\n}" % "\n".join(s)

    @staticmethod
    def _create(rule, user_guid=None, group_guid=None,
                user_guids=None, group_guids=None,
                default_rule=None):
        """Create and return the ACLRules that applies 'rule' to
           everyone specified (or to everyone, if this is not specified).

           If users or groups are specified then 'default_rule' (which
           defaults to denied) applies to everybody else. A TypeError
           is raised if 'default_rule' is not an ACLRule
        """
        from Acquire.Identity import ACLRule as _ACLRule

        if default_rule is not None:
            if not isinstance(default_rule, _ACLRule):
                raise TypeError("The default_rule must be type ACLRule")

        # copy the lists so that the caller's lists are never modified
        user_guids = [] if user_guids is None else list(user_guids)
        group_guids = [] if group_guids is None else list(group_guids)

        if user_guid is not None:
            user_guids.append(user_guid)

        if group_guid is not None:
            group_guids.append(group_guid)

        if len(user_guids) == 0 and len(group_guids) == 0:
            if isinstance(rule, ACLRules):
                # this is a copy constructor
                return rule
            else:
                # no users or groups are specified - rule applies
                # to everyone
                return ACLRules(default_rule=rule)

        if default_rule is None:
            default_rule = _ACLRule.denied()

        rules = []

        if len(group_guids) > 0:
            group_rules = ACLGroupRules()

            # the rule must be given to every group, not just to the
            # one passed via 'group_guid'
            for guid in group_guids:
                group_rules.add_group_rule(guid, rule)

            rules.append(group_rules)

        if len(user_guids) > 0:
            user_rules = ACLUserRules()

            # the rule must be given to every user, not just to the
            # one passed via 'user_guid'
            for guid in user_guids:
                user_rules.add_user_rule(guid, rule)

            rules.append(user_rules)

        return ACLRules(rules=rules, default_rule=default_rule)

    @staticmethod
    def create(rule, user_guid=None, user_guids=None,
               group_guid=None, group_guids=None,
               default_rule=None):
        """Create and return the ACLRules that gives 'rule' to
           everyone specified (or to everyone, if this is not specified)
        """
        return ACLRules._create(user_guid=user_guid, user_guids=user_guids,
                                group_guid=group_guid,
                                group_guids=group_guids,
                                default_rule=default_rule,
                                rule=rule)

    @staticmethod
    def owner(user_guid=None, user_guids=None,
              group_guid=None, group_guids=None,
              default_rule=None):
        """Create and return the ACLRules that gives ownership to
           everyone specified (or to everyone, if this is not specified)
        """
        from Acquire.Identity import ACLRule as _ACLRule
        return ACLRules._create(user_guid=user_guid, user_guids=user_guids,
                                group_guid=group_guid,
                                group_guids=group_guids,
                                default_rule=default_rule,
                                rule=_ACLRule.owner())

    @staticmethod
    def reader(user_guid=None, user_guids=None,
               group_guid=None, group_guids=None,
               default_rule=None):
        """Create and return the ACLRules that gives readership to
           everyone specified (or to everyone, if this is not specified)
        """
        from Acquire.Identity import ACLRule as _ACLRule
        return ACLRules._create(user_guid=user_guid, user_guids=user_guids,
                                group_guid=group_guid,
                                group_guids=group_guids,
                                default_rule=default_rule,
                                rule=_ACLRule.reader())

    @staticmethod
    def writer(user_guid=None, user_guids=None,
               group_guid=None, group_guids=None,
               default_rule=None):
        """Create and return the ACLRules that gives writership to
           everyone specified (or to everyone, if this is not specified)
        """
        from Acquire.Identity import ACLRule as _ACLRule
        return ACLRules._create(user_guid=user_guid, user_guids=user_guids,
                                group_guid=group_guid,
                                group_guids=group_guids,
                                default_rule=default_rule,
                                rule=_ACLRule.writer())

    @staticmethod
    def inherit(user_guid=None, user_guids=None,
                group_guid=None, group_guids=None,
                default_rule=None):
        """Create and return the ACLRules that sets inherit to
           everyone specified (or to everyone, if this is not specified)
        """
        from Acquire.Identity import ACLRule as _ACLRule
        return ACLRules._create(user_guid=user_guid, user_guids=user_guids,
                                group_guid=group_guid,
                                group_guids=group_guids,
                                default_rule=default_rule,
                                rule=_ACLRule.inherit())

    def is_simple_inherit(self):
        """Return whether or not this set of rules is a simple
           'inherit all'
        """
        return self._is_simple_inherit

    def set_default_operation(self, default_operation):
        """Set the default operation used to combine together rules.
           Raises a TypeError if this is not an ACLRuleOperation
        """
        if not isinstance(default_operation, ACLRuleOperation):
            raise TypeError(
                "The default operation must be type ACLRuleOperation")

        self._default_operation = default_operation

    def set_default_rule(self, aclrule):
        """Set the default rule that is used if nothing else matches.

           If these rules are currently a simple inherit, then setting
           a non-inherit default rule also resets the default operation
           to MAX and clears the list of rules
        """
        if self._is_simple_inherit:
            if _is_inherit(aclrule):
                # still a simple inherit - nothing changes
                return
            else:
                self._is_simple_inherit = False
                self._default_rule = aclrule
                self._default_operation = ACLRuleOperation.MAX
                self._rules = []
        else:
            self._default_rule = aclrule

    def append(self, aclrule, operation=None, ensure_owner=False):
        """Append a rule onto the set of rules. This will resolve
           any conflicts in the rules. If 'ensure_owner' is True, then
           this will ensure that there is at least one user who
           has unambiguous ownership of the resource controlled by
           these ACL rules (note that 'insert' does not yet
           implement 'ensure_owner')
        """
        try:
            idx = len(self._rules)
        except (AttributeError, TypeError):
            # NOTE(review): fallback index retained from the original
            # code - self._rules is always a list after __init__
            idx = 2

        self.insert(idx=idx, aclrule=aclrule, operation=operation,
                    ensure_owner=ensure_owner)

    def prepend(self, aclrule, operation=None, ensure_owner=False):
        """Prepend a rule onto the set of rules"""
        self.insert(idx=0, aclrule=aclrule, operation=operation,
                    ensure_owner=ensure_owner)

    def insert(self, idx, aclrule, operation=None, ensure_owner=False):
        """Insert the passed rule at index 'idx', specifying the
           operation used to combine this rule with what has gone
           before (defaults to self._default_operation).

           Inserting an inherit rule into a simple inherit does nothing.
           Inserting anything else into a simple inherit first replaces
           the rules with a single inherit rule. Raises a TypeError if
           'operation' is not an ACLRuleOperation
        """
        if operation is not None:
            if not isinstance(operation, ACLRuleOperation):
                raise TypeError(
                    "The ACL operation must be type ACLRuleOperation")

        if self._is_simple_inherit:
            if _is_inherit(aclrule):
                return
            else:
                from Acquire.Identity import ACLRule as _ACLRule
                self._is_simple_inherit = False
                self._default_rule = None
                self._rules = [_ACLRule.inherit()]

        # rules with an explicit operation are stored as a pair,
        # everything else is stored bare
        if operation is not None:
            self._rules.insert(idx, (operation, aclrule))
        else:
            self._rules.insert(idx, aclrule)

        if ensure_owner:
            # TODO: need to write code to ensure there is at least
            # one owner
            pass

    def rules(self):
        """Return the list of ACL rules that will be applied
           in order (including the default rule, if set)
        """
        if self._is_simple_inherit:
            from Acquire.Identity import ACLRule as _ACLRule
            return [(self._default_operation, _ACLRule.inherit())]

        # shallow copy, so that the caller cannot change our list
        r = list(self._rules)

        if self._default_rule is not None:
            r.append((self._default_operation, self._default_rule))

        return r

    def resolve(self, must_resolve=True, identifiers=None,
                upstream=None, unresolved=False):
        """Resolve the rule based on the passed identifiers. This will
           resolve the rules in order until the final ACLRule has been
           generated. If 'must_resolve' is True, then
           this is guaranteed to return a fully-resolved simple ACLRule.
           Anything unresolved is looked up from 'upstream', or set
           equal to 'unresolved'.

           Returns the denied rule if nothing matches, and raises a
           PermissionError if the result is not an ACLRule
        """
        from Acquire.Identity import ACLRule as _ACLRule

        if self._is_simple_inherit:
            return _ACLRule.inherit().resolve(must_resolve=must_resolve,
                                              identifiers=identifiers,
                                              upstream=upstream,
                                              unresolved=unresolved)

        result = None
        must_break = False

        for entry in self._rules:
            if isinstance(entry, tuple):
                op = entry[0]
                aclrule = entry[1]
            else:
                op = self._default_operation
                aclrule = entry

            # resolve the rule...
            resolved = aclrule.resolve(must_resolve=False,
                                       identifiers=identifiers,
                                       upstream=upstream,
                                       unresolved=unresolved)

            if resolved is None:
                # this rule does not apply to these identifiers
                continue

            if op is ACLRuleOperation.SET:
                # take the first matching rule - this also skips
                # the default rule
                result = resolved
                must_break = True
                break
            elif result is None:
                result = resolved
            else:
                result = op.combine(result, resolved)

        if (not must_break) and (self._default_rule is not None):
            resolved = self._default_rule.resolve(must_resolve=False,
                                                  identifiers=identifiers,
                                                  upstream=upstream,
                                                  unresolved=unresolved)

            if result is None:
                result = resolved
            else:
                result = self._default_operation.combine(result, resolved)

        # should now have a fully resolved ACLRule...
        if result is None:
            return _ACLRule.denied()

        if not isinstance(result, _ACLRule):
            raise PermissionError(
                "Did not fully resolve the ACLRule - got %s" % str(result))

        if not result.is_fully_resolved():
            # we have not been able to generate a fully-resolved ACL
            result = result.resolve(must_resolve=True,
                                    identifiers=identifiers,
                                    upstream=upstream,
                                    unresolved=unresolved)

        return result

    @staticmethod
    def from_data(data):
        """Construct these rules from the passed json-deserialised
           data (either the string "inherit" or a dictionary).
           Empty or None data gives the default (simple inherit) rules
        """
        if isinstance(data, str) and data == "inherit":
            return ACLRules()

        if data is None or len(data) == 0:
            return ACLRules()

        if "default_rule" in data:
            default_rule = _load_rule(data["default_rule"])
        else:
            default_rule = None

        if "default_operation" in data:
            default_operation = \
                ACLRuleOperation.from_data(data["default_operation"])
        else:
            default_operation = ACLRuleOperation.MAX

        if "rules" in data:
            rules = []

            for rule in data["rules"]:
                if isinstance(rule, dict):
                    # saved together with its operation
                    aclrule = rule["rule"]
                    oper = rule["operation"]
                    rules.append((ACLRuleOperation.from_data(oper),
                                  _load_rule(aclrule)))
                else:
                    # a bare rule - no explicit operation
                    rules.append((None, _load_rule(rule)))
        else:
            rules = None

        return ACLRules(default_rule=default_rule,
                        default_operation=default_operation,
                        rules=rules)

    def to_data(self):
        """Return a json-serialisable representation of these rules.
           This is the string "inherit" for a simple inherit, else
           a dictionary. Rules that have an explicit operation are
           saved together with that operation, so that it is
           restored by 'from_data'
        """
        if self._is_simple_inherit:
            return "inherit"

        data = {}

        rules = []

        for entry in self._rules:
            if isinstance(entry, tuple) and entry[0] is not None:
                # keep the operation, even if this is the only rule
                # (e.g. a single SET rule changes how the default
                # rule is applied)
                rules.append({"operation": entry[0].to_data(),
                              "rule": _save_rule(entry[1])})
            elif isinstance(entry, tuple):
                rules.append(_save_rule(entry[1]))
            else:
                rules.append(_save_rule(entry))

        if len(rules) > 0:
            data["rules"] = rules

        if self._default_rule is not None:
            data["default_rule"] = _save_rule(self._default_rule)

        if self._default_operation is not ACLRuleOperation.MAX:
            data["default_operation"] = self._default_operation.to_data()

        return data