Source code for Acquire.Storage._aclrules


from enum import Enum as _Enum

# Names exported by a star-import of this module. ACLRuleOperation is
# defined below but is not part of this list.
__all__ = ["ACLRules", "ACLUserRules", "ACLGroupRules",
           "create_aclrules"]


class ACLRuleOperation(_Enum):
    """The operation used to merge two ACL rules into one.

    The enum value is the string written by to_data() and read back
    by from_data().
    """
    MAX = "max"  # combine with '+' (most permissive)
    MIN = "min"  # combine with '*' (least permissive)
    SUB = "sub"  # combine with '-' (subtract the second rule)
    SET = "set"  # keep the first rule and ignore the second

    def to_data(self):
        """Return the json-serialisable string for this operation"""
        return self.value

    def combine(self, acl1, acl2):
        """Return 'acl1' combined with 'acl2' using this operation.

        If either argument is None then the other one is returned
        unchanged (so None is returned when both are None).
        """
        # a missing operand means there is nothing to combine
        if acl1 is None:
            return acl2

        if acl2 is None:
            return acl1

        if self is ACLRuleOperation.SET:
            return acl1

        if self is ACLRuleOperation.MAX:
            return acl1 + acl2

        if self is ACLRuleOperation.MIN:
            return acl1 * acl2

        if self is ACLRuleOperation.SUB:
            return acl1 - acl2

        # fall-through kept for any member not handled above
        return None

    @staticmethod
    def from_data(data):
        """Return the operation whose value is the passed string"""
        return ACLRuleOperation(data)


def create_aclrules(**kwargs):
    """Create an ACLRules object from the passed set of keyword
       arguments. The recognised keywords are;

        aclrules: an existing ACLRules to start from. Anything that
                  is not an ACLRules is wrapped as ACLRules(rule=...)
        aclrule, user_guid: add 'aclrule' as the rule for that user
        aclrule, group_guid: add 'aclrule' as the rule for that group
                  (only used if 'user_guid' is not also passed)
        aclrule: on its own this is only used if it is itself
                  an ACLRules object, in which case it is appended
        default: set the default rule used if nothing else matches

       If no rules have been created from the arguments then an
       empty ACLRules() is returned.
    """
    aclrules = None

    if "aclrules" in kwargs:
        aclrules = kwargs["aclrules"]

        if not isinstance(aclrules, ACLRules):
            aclrules = ACLRules(rule=aclrules)

        if "default" in kwargs:
            aclrules.set_default_rule(kwargs["default"])

    if "aclrule" in kwargs:
        from Acquire.Storage import ACLRule as _ACLRule
        aclrule = kwargs["aclrule"]

        if "user_guid" in kwargs:
            if aclrules is None:
                # rules that name a specific user deny everyone else
                aclrules = ACLRules(default_rule=_ACLRule.denied())

            user_rules = ACLUserRules()
            user_rules.add_user_rule(kwargs["user_guid"], aclrule)
            aclrules.append(user_rules)
        elif "group_guid" in kwargs:
            if aclrules is None:
                # rules that name a specific group deny everyone else
                aclrules = ACLRules(default_rule=_ACLRule.denied())

            group_rules = ACLGroupRules()
            group_rules.add_group_rule(kwargs["group_guid"], aclrule)
            aclrules.append(group_rules)
        elif isinstance(aclrule, ACLRules):
            if aclrules is None:
                # previously this dereferenced None and raised
                # an AttributeError
                aclrules = ACLRules(rule=aclrule)
            else:
                aclrules.append(aclrule)

    if "default" in kwargs:
        if aclrules is None:
            aclrules = ACLRules(default_rule=kwargs["default"])
        else:
            # ACLRules has no 'set_default' function - the setter
            # is called 'set_default_rule'
            aclrules.set_default_rule(kwargs["default"])

    if aclrules is None:
        aclrules = ACLRules()

    return aclrules
def _save_rule(rule): """Return a json-serialisable object for the passed rule""" return [rule.__class__.__name__, rule.to_data()] def _load_rule(rule): """Return the rule loaded from the json-deserialised data""" try: classname = rule[0] classdata = rule[1] except: raise TypeError("Expected [classname, classdata]") if classname == "ACLRules": return ACLRules.from_data(classdata) elif classname == "ACLUserRules": return ACLUserRules.from_data(classdata) elif classname == "ACLGroupRules": return ACLGroupRules.from_data(classdata) elif classname == "ACLRule": from Acquire.Storage import ACLRule as _ACLRule return _ACLRule.from_data(classdata) else: raise TypeError("Unrecognised type '%s'" % classname)
class ACLGroupRules:
    """This class holds the rules that apply to individual groups,
       indexed by the group's guid
    """

    def __init__(self):
        """Construct an empty set of group rules"""
        self._group_rules = {}

    def __eq__(self, other):
        if not isinstance(other, self.__class__):
            return False

        return self.__dict__ == other.__dict__

    def __str__(self):
        parts = ["%s => %s" % (group, rule)
                 for group, rule in self._group_rules.items()]
        return "Group{%s}" % ", ".join(parts)

    def resolve(self, must_resolve=True, **kwargs):
        """Resolve the rule for the group with the 'group_guid' passed
           in kwargs. If there is no rule for this group then this
           returns None, unless 'must_resolve' is true, in which
           case the result of resolving an inherit rule is returned
        """
        group_guid = kwargs.get("group_guid")

        if group_guid in self._group_rules:
            matched = self._group_rules[group_guid]
            return matched.resolve(must_resolve=must_resolve, **kwargs)

        if not must_resolve:
            return None

        from Acquire.Storage import ACLRule as _ACLRule
        return _ACLRule.inherit().resolve(must_resolve=True, **kwargs)

    def add_group_rule(self, group_guid, rule):
        """Add a rule for the group with passed 'group_guid'. This
           replaces any rule already held for that group
        """
        self._group_rules[group_guid] = rule

    def to_data(self):
        """Return a json-serialisable representation of these rules"""
        return {group_guid: _save_rule(rule)
                for group_guid, rule in self._group_rules.items()}

    @staticmethod
    def from_data(data):
        """Return the rules constructed from the passed
           json-deserialised object. Empty rules are returned
           if 'data' is None or empty
        """
        loaded = ACLGroupRules()

        if data is None or len(data) == 0:
            return loaded

        for group_guid, rule in data.items():
            loaded.add_group_rule(group_guid, _load_rule(rule))

        return loaded
class ACLUserRules:
    """This class holds the rules that apply to individual users,
       indexed by the user's guid
    """

    def __init__(self):
        """Construct an empty set of user rules"""
        self._user_rules = {}

    def __eq__(self, other):
        if not isinstance(other, self.__class__):
            return False

        return self.__dict__ == other.__dict__

    def __str__(self):
        parts = ["%s => %s" % (user, rule)
                 for user, rule in self._user_rules.items()]
        return "User{%s}" % ", ".join(parts)

    def resolve(self, must_resolve=True, **kwargs):
        """Resolve the rule for the user with the 'user_guid' passed
           in kwargs. If there is no rule for this user then this
           returns None, unless 'must_resolve' is true, in which
           case the result of resolving an inherit rule is returned
        """
        user_guid = kwargs.get("user_guid")

        if user_guid in self._user_rules:
            matched = self._user_rules[user_guid]
            return matched.resolve(must_resolve=must_resolve, **kwargs)

        if not must_resolve:
            return None

        from Acquire.Storage import ACLRule as _ACLRule
        return _ACLRule.inherit().resolve(must_resolve=True, **kwargs)

    def add_user_rule(self, user_guid, rule):
        """Add a rule for the user with passed 'user_guid'. This
           replaces any rule already held for that user
        """
        self._user_rules[user_guid] = rule

    @staticmethod
    def _create(aclrule, user_guid, user_guids):
        """Return new rules that give 'aclrule' to 'user_guid' (if
           not None) and to every guid in 'user_guids' (if not None)
        """
        created = ACLUserRules()

        if user_guid is not None:
            created.add_user_rule(user_guid, aclrule)

        if user_guids is not None:
            for guid in user_guids:
                created.add_user_rule(guid, aclrule)

        return created

    @staticmethod
    def owner(user_guid=None, user_guids=None):
        """Simple shorthand to create the rule that the specified
           user(s) are the owner of the resource
        """
        from Acquire.Storage import ACLRule as _ACLRule
        owner_rule = _ACLRule.owner()
        return ACLUserRules._create(aclrule=owner_rule,
                                    user_guid=user_guid,
                                    user_guids=user_guids)

    @staticmethod
    def executer(user_guid=None, user_guids=None):
        """Simple shorthand to create the rule that the specified
           user(s) are the executer of the resource
        """
        from Acquire.Storage import ACLRule as _ACLRule
        executer_rule = _ACLRule.executer()
        return ACLUserRules._create(aclrule=executer_rule,
                                    user_guid=user_guid,
                                    user_guids=user_guids)

    @staticmethod
    def writer(user_guid=None, user_guids=None):
        """Simple shorthand to create the rule that the specified
           user(s) are the writer of the resource
        """
        from Acquire.Storage import ACLRule as _ACLRule
        writer_rule = _ACLRule.writer()
        return ACLUserRules._create(aclrule=writer_rule,
                                    user_guid=user_guid,
                                    user_guids=user_guids)

    @staticmethod
    def reader(user_guid=None, user_guids=None):
        """Simple shorthand to create the rule that the specified
           user(s) are the reader of the resource
        """
        from Acquire.Storage import ACLRule as _ACLRule
        reader_rule = _ACLRule.reader()
        return ACLUserRules._create(aclrule=reader_rule,
                                    user_guid=user_guid,
                                    user_guids=user_guids)

    def to_data(self):
        """Return a json-serialisable representation of these rules"""
        return {user_guid: _save_rule(rule)
                for user_guid, rule in self._user_rules.items()}

    @staticmethod
    def from_data(data):
        """Return the rules constructed from the passed
           json-deserialised object. Empty rules are returned
           if 'data' is None or empty
        """
        loaded = ACLUserRules()

        if data is None or len(data) == 0:
            return loaded

        for user_guid, rule in data.items():
            loaded.add_user_rule(user_guid, _load_rule(rule))

        return loaded
def _is_inherit(aclrule):
    """Return True only if the passed rule is an ACLRule that compares
       equal to ACLRule.inherit(). Anything else returns False
    """
    from Acquire.Storage import ACLRule as _ACLRule

    if not isinstance(aclrule, _ACLRule):
        return False

    return bool(aclrule == _ACLRule.inherit())
class ACLRules:
    """This class holds a combination of ACL rules. These are
       resolved in order to get the ACL for a resource. By default,
       this is a simple inherit rule (meaning that it will inherit
       whatever comes from upstream).

       Each entry in the list of rules is either a bare rule (which
       is combined using the default operation) or an
       (ACLRuleOperation, rule) tuple. An optional default rule
       is applied after all of the other rules
    """

    def __init__(self, rule=None, rules=None, default_rule=None,
                 default_operation=ACLRuleOperation.MAX):
        """Construct, optionally starting with a default ACLRule
           for all users.

           rule: a single rule that is prepended to the rules
           rules: an iterable of (operation, rule) pairs that are
                  appended in order (operation can be None)
           default_rule: the rule applied after all of the others
           default_operation: the ACLRuleOperation used for rules
                  that do not specify their own operation
        """
        # with no default rule (or an inherit default rule) this
        # starts as a simple inherit
        self._is_simple_inherit = (default_rule is None or
                                   _is_inherit(default_rule))

        self._default_rule = default_rule
        self._rules = []
        self.set_default_operation(default_operation)

        if rule is not None:
            self.prepend(rule)

        if rules is not None:
            for rule in rules:
                self.append(aclrule=rule[1], operation=rule[0])

    def __eq__(self, other):
        if isinstance(other, self.__class__):
            return self.__dict__ == other.__dict__
        else:
            return False

    def __str__(self):
        """Return "inherit" for a simple inherit, otherwise a string
           listing each rule (prefixed by its operation if it has
           one) followed by the default rule, if set
        """
        if self._is_simple_inherit:
            return "inherit"

        s = []

        # this used to loop over the undefined name 'rules' and to
        # return the list itself, so str() always raised an exception
        for rule in self._rules:
            if isinstance(rule, tuple):
                s.append("%s %s" % (rule[0].to_data(), rule[1]))
            else:
                s.append(str(rule))

        if self._default_rule is not None:
            s.append("DEFAULT %s" % self._default_rule)

        return "ACLRules{%s}" % ", ".join(s)

    def is_simple_inherit(self):
        """Return whether or not this set of rules is a simple
           'inherit all'
        """
        return self._is_simple_inherit

    def set_default_operation(self, default_operation):
        """Set the default operation used to combine together rules.
           Raises a TypeError if this is not an ACLRuleOperation
        """
        if not isinstance(default_operation, ACLRuleOperation):
            raise TypeError(
                "The default operation must be type ACLRuleOperation")

        self._default_operation = default_operation

    def set_default_rule(self, aclrule):
        """Set the default rule that is applied after all of the
           other rules.

           If this is currently a simple inherit then setting an
           inherit rule does nothing, while setting any other rule
           also resets the default operation to MAX and clears the
           list of rules
        """
        if self._is_simple_inherit:
            if _is_inherit(aclrule):
                return
            else:
                self._is_simple_inherit = False
                self._default_rule = aclrule
                self._default_operation = ACLRuleOperation.MAX
                self._rules = []
        else:
            self._default_rule = aclrule

    def append(self, aclrule, operation=None, ensure_owner=False):
        """Append a rule onto the set of rules. 'operation' and
           'ensure_owner' are passed unchanged to 'insert'
        """
        try:
            idx = len(self._rules)
        except Exception:
            # fallback index kept from the original code
            idx = 2

        self.insert(idx=idx, aclrule=aclrule, operation=operation,
                    ensure_owner=ensure_owner)

    def prepend(self, aclrule, operation=None, ensure_owner=False):
        """Prepend a rule onto the set of rules. 'operation' and
           'ensure_owner' are passed unchanged to 'insert'
        """
        self.insert(idx=0, aclrule=aclrule, operation=operation,
                    ensure_owner=ensure_owner)

    def insert(self, idx, aclrule, operation=None, ensure_owner=False):
        """Insert the passed rule at index 'idx', specifying the
           operation used to combine this rule with what has gone
           before (defaults to self._default_operation).

           If this is currently a simple inherit then inserting an
           inherit rule does nothing. Inserting any other rule first
           clears the default rule and replaces the list of rules
           with a single explicit inherit rule.

           Raises a TypeError if 'operation' is neither None nor
           an ACLRuleOperation. 'ensure_owner' is not yet implemented
        """
        if operation is not None:
            if not isinstance(operation, ACLRuleOperation):
                raise TypeError(
                    "The ACL operation must be type ACLRuleOperation")

        if self._is_simple_inherit:
            if _is_inherit(aclrule):
                return
            else:
                from Acquire.Storage import ACLRule as _ACLRule
                self._is_simple_inherit = False
                self._default_rule = None
                self._rules = [_ACLRule.inherit()]

        # rules with their own operation are stored as a tuple
        if operation is not None:
            self._rules.insert(idx, (operation, aclrule))
        else:
            self._rules.insert(idx, aclrule)

        if ensure_owner:
            # need to write code to ensure there is at least one owner
            pass

    def rules(self):
        """Return the list of ACL rules that will be applied in order
           (including the default rule, if set). The returned list
           is a copy, so changing it does not change these rules
        """
        if self._is_simple_inherit:
            from Acquire.Storage import ACLRule as _ACLRule
            return [(self._default_operation, _ACLRule.inherit())]
        else:
            r = list(self._rules)

            if self._default_rule is not None:
                r.append((self._default_operation, self._default_rule))

            return r

    def resolve(self, must_resolve=True, **kwargs):
        """Resolve the rule based on the passed kwargs. This will
           resolve the rules in order until the final ACLRule has
           been generated. A simple inherit forwards 'must_resolve'
           to the inherit rule. Otherwise the result is always
           resolved with must_resolve=True, and a denied rule is
           returned if nothing matched.

           Raises a PermissionError if the combined result is not
           an ACLRule
        """
        from Acquire.Storage import ACLRule as _ACLRule

        if self._is_simple_inherit:
            return _ACLRule.inherit().resolve(must_resolve=must_resolve,
                                              **kwargs)

        result = None
        must_break = False

        for rule in self._rules:
            if isinstance(rule, tuple):
                op = rule[0]
                rule = rule[1]
            else:
                op = self._default_operation

            # resolve the rule...
            rule = rule.resolve(must_resolve=False, **kwargs)

            if rule is not None:
                if op is ACLRuleOperation.SET:
                    # take the first matching rule (this also skips
                    # the default rule below)
                    result = rule
                    must_break = True
                    break
                elif result is None:
                    result = rule
                else:
                    result = op.combine(result, rule)

        if (not must_break) and (self._default_rule is not None):
            rule = self._default_rule.resolve(must_resolve=False, **kwargs)

            if result is None:
                result = rule
            else:
                result = self._default_operation.combine(result, rule)

        # should now have a fully resolved ACLRule...
        if result is None:
            return _ACLRule.denied()

        if not isinstance(result, _ACLRule):
            raise PermissionError(
                "Did not fully resolve the ACLRule - got %s" % str(result))

        if not result.is_fully_resolved():
            # we have not been able to generate a fully-resolved ACL
            result = result.resolve(must_resolve=True, **kwargs)

        return result

    @staticmethod
    def from_data(data):
        """Construct these rules from the passed json-deserialised
           object, as written by 'to_data'. A simple inherit is
           returned if data is "inherit", None or empty
        """
        if isinstance(data, str) and data == "inherit":
            return ACLRules()

        if data is None or len(data) == 0:
            return ACLRules()

        if "default_rule" in data:
            default_rule = _load_rule(data["default_rule"])
        else:
            default_rule = None

        if "default_operation" in data:
            default_operation = \
                ACLRuleOperation.from_data(data["default_operation"])
        else:
            default_operation = ACLRuleOperation.MAX

        if "rules" in data:
            operations = {op.value for op in ACLRuleOperation}
            rules = []

            for rule in data["rules"]:
                # json turns the (operation, rule) tuples written by
                # to_data into lists, so also accept a two-item list
                # whose first item is an operation name. This can't
                # be confused with a saved rule, as those start with
                # a class name
                has_operation = isinstance(rule, tuple) or (
                    isinstance(rule, list) and len(rule) == 2 and
                    isinstance(rule[0], str) and rule[0] in operations)

                if has_operation:
                    rules.append((ACLRuleOperation.from_data(rule[0]),
                                  _load_rule(rule[1])))
                else:
                    rules.append((None, _load_rule(rule)))
        else:
            rules = None

        return ACLRules(default_rule=default_rule,
                        default_operation=default_operation,
                        rules=rules)

    def to_data(self):
        """Return a json-serialisable representation of these rules.
           This is the string "inherit" for a simple inherit, else
           a dictionary holding whichever of "rules", "default_rule"
           and "default_operation" are needed
        """
        if self._is_simple_inherit:
            return "inherit"

        data = {}

        if len(self._rules) > 0:
            rules = []
            for rule in self._rules:
                if isinstance(rule, tuple):
                    rules.append((rule[0].to_data(), _save_rule(rule[1])))
                else:
                    rules.append(_save_rule(rule))

            data["rules"] = rules

        if self._default_rule is not None:
            data["default_rule"] = _save_rule(self._default_rule)

        # MAX is the default, so it doesn't need to be written
        if self._default_operation is not ACLRuleOperation.MAX:
            data["default_operation"] = self._default_operation.to_data()

        return data