Source code for fwOper.acl

# ----------------------------------------------------------------------------------------
from nettoolkit import *
from collections import OrderedDict
from copy import deepcopy

from .acg import OBJ
from .member import *
from .entity import *
from .static import *
from .fwObj import *

# ----------------------------------------------------------------------------------------
# Local Functions
# ----------------------------------------------------------------------------------------

[docs]def access_list_list(config_list): """extracts access-lists from provided configuration list ie.config_list. Args: config_list (list): configuration list Returns: list: access-lists lines in a list """ return [line.rstrip() for line in config_list if line.startswith("access-list ")]
[docs]def update_obj_grp_str(item, what): """update the object group and host string in acl Args: item (dict): acl line item what (str): acl line attribte name ('source', 'destination', 'ports', 'protocol') Returns: str: string represenation of object group or host object in acl """ if isinstance(item[what], OBJ): return 'object-group ' + item[what].name if isinstance(item[what], Network) and item[what].host: return 'host ' + item[what].network else: return item[what]
[docs]def dummy_group(source_grp, item, values): """create a dummy object-group with provided items, by taking template as source group Args: source_grp (OBJ): source group (will be a template to create new dummy group) item (str): acl line attribte name ('source', 'destination', 'ports', 'protocol') values (str, set, tuple, list): set of value(s) Returns: OBJ: object-group object with provided item: values """ v = values if isinstance(values, (set, tuple, list)) else {values,} dg = OBJ('temporary', 1) if item in ('protocol',):rdk = 'protocol-object' if item in ('ports',):rdk = 'port-object' if item in ('source', 'destination'): rdk = 'network-object' ogd = {} ogd['candiates_list'] = '' ogd['type'] = source_grp.obj_grp_type ogd['svc_filter'] = source_grp.obj_grp_svc_filter dg.set_instance_primary_details(ogd) dg._repr_dic[rdk] = v return dg
# ---------------------------------------------------------------------------------------- # Access Lists Entries # ----------------------------------------------------------------------------------------
[docs]class ACLS(Plurals): """collection of ACL objects Args: Plurals (Plurals): Inherits - group of items properties definitions """ def __init__(self, config_list, objs=None): super().__init__() self.what = "access-lists" self.acls_list = access_list_list(config_list) self.set_acl_names() self.set_objects(objs)
[docs] def changes(self, change): """collate the delta changes recorded in all access-lists and provide delta for that change ( "ADDS", "REMOVALS") Args: change (str): type of change for which change output requested ( "ADDS", "REMOVALS" ) Returns: str: delta changes """ return super().changes('acl', change)
# ~~~~~~~~~~~~~~~~~~ CALLABLE ~~~~~~~~~~~~~~~~~~
[docs] def set_acl_names(self): """sets available access-lists names in _repr_dic (key) Returns: dict: _repr_dict """ for acl_line in self.acls_list: spl_acl_line = acl_line.split() acl_name = spl_acl_line[1] if acl_name not in self._repr_dic: self._repr_dic[acl_name] = [] self._repr_dic[acl_name].append(acl_line) return self._repr_dic
[docs] def set_objects(self, objs): """sets access-lists (ACL)s in _repr_dic (value) Args: objs (OBJS): object of dictionary of object-groups """ for acl_name, acl_lines_list in self._repr_dic.items(): acl = ACL(acl_name, acl_lines_list, objs) acl.parse(objs) self._repr_dic[acl_name] = acl
# ---------------------------------------------------------------------------------------- # Access List detail # ----------------------------------------------------------------------------------------
[docs]class ACL(Singulars): """Individual access-list object Args: Singulars (Singulars): Inherits - individual object properties definitions Raises: Exception: MissingMandatoryParameter Exception: exact match process error Returns: ACL: a single access-list object Yields: tuple: tuple of (line-number, line-attributes) """ end_point_identifiers_pos = { # static index points 0: 5, # src 1: 7, # dst 2: 9, # port } mandatory_item_values_for_str = ('acl_type', 'action', 'protocol', 'source', 'destination', 'ports', 'log_warning' ) def __init__(self, acl_name, acl_lines_list, objs): super().__init__(acl_name) self.name = acl_name self.acl_lines_list = acl_lines_list self.objs = objs self.adds = '' self.removals = '' self._sequence = False self._repr_dic = OrderedDict() @property def max(self): return max(self._repr_dic.keys()) @property def min(self): return min(self._repr_dic.keys()) @property ## Boolean: sequence number visibility def sequence(self): return self._sequence @sequence.setter def sequence(self, seq): self._sequence = seq def __iter__(self): for k, v in sorted(self._repr_dic.items()): yield (k, v) def __getitem__(self, item): if isinstance(item, slice): return ''.join([self[i] for i in range(*item.indices(len(self)))]) else: try: return self._to_str(item) except KeyError: return None def __delitem__(self, item): self.delete(item) def __add__(self, attribs): return self.copy_and_append(attribs) def __sub__(self, n): return self.copy_and_delete(n) def __iadd__(self, attribs): self.append(attribs) return self def __isub__(self, n): self.delete(n) return self def __eq__(self, obj): return self._is_equal(obj) def __gt__(self, obj): return self._compare(obj) def __lt__(self, obj): return obj._compare(self) def __contains__(self, item): return self.contains(item) # ~~~~~~~~~~~~~~~~~~~ EXTERNAL CALLABLES ~~~~~~~~~~~~~~~~~~~
[docs] def str(self): """String representation of full acl Returns: str: full acl """ s = '' for k, v in self: s += self[k] return s
[docs] def add_str(self): """String representation of acl recoded additions Returns: str: recorded acl changes (adds) """ return str(self.adds)
[docs] def del_str(self): """String representation of acl recoded deletions Returns: str: recorded acl changes (removals) """ return str(self.removals)
# //////// OPERATIONS /////////
[docs] def delete(self, attribs, stop=None, step=1): """delete a line in acl: can be use with standard delete command as well, del(acl_name[n]) Args: attribs (int, dict): int->deletes an entry by line number, dict->delete entry which matches attribute stop (int, optional): to delete a range of lines provide end sequence. Defaults to None. step (int, optional): to delete line numbers in multiple of. Defaults to 1. Returns: str: delta change(s) for the deletion of entry """ if isinstance(attribs, int): if stop and isinstance(stop, int): s = '' for i in reversed(range(attribs, stop, step)): s += self.delete(i) return s else: return self._delete_by_line(attribs) elif isinstance(attribs, dict): return self._delete_by_attribs(attribs) elif isinstance(attribs, slice): for i in reversed(range(*attribs.indices(len(self)))): self.delete(i) else: print(f"Incorrect input to delete {attribs}") return None
[docs] def insert(self, line_no, attribs): """insert a line in acl: can be use with standard way as well, aclname[line_no] = attribs display warning message - MatchingEntryAlreadyexistAtLine, if a match already exist in acl Args: line_no (int): line number at which entry to be inserted attribs (dict): line attributes Returns: str: delta change(s) for the insertion of entry """ mv = self.contains(attribs) if not mv: self._key_extend(line_no) return self._add(line_no, attribs) else: print(f"MatchingEntryAlreadyexistAtLine-{mv}")
[docs] def append(self, attribs): """append a line to acl display warning message - MatchingEntryAlreadyexistAtLine, if a match already exist in acl Args: attribs (dict): line attributes Returns: str: delta change(s) for the append of entry """ mv = self.contains(attribs) if not mv: return self._add(self.max+1, attribs) else: print(f"MatchingEntryAlreadyexistAtLine-{mv}")
[docs] def contains(self, item): """check matching attributes in acl object, and return set of matching acl line numbers for containing item (sparse match) Args: item (dict): line attributes Returns: set: set of matching acl line numbers (sparse match) """ matching_lines = set() item = self._update_group_members(item) for line_no, acl_details in self: if isinstance(acl_details, dict): for item_k, item_v in item.items(): if item_k == 'log_warning': continue if isinstance(acl_details[item_k], OBJ): if item_v not in acl_details[item_k]: break continue if item_v != acl_details[item_k]: break else: matching_lines.add(line_no) return matching_lines
[docs] def exact(self, item): """check matching attributes in acl object, and return set of matching acl line numbers for exact matches item only Args: item (dict): line attributes Raises: Exception: exact match process error Returns: set: set of matching acl line numbers (exact match) """ matching_lines = set() item = self._update_group_members(item) for line_no, acl_details in self: if isinstance(acl_details, dict): for item_k, item_v in item.items(): if item_k == 'log_warning': continue if isinstance(acl_details[item_k], OBJ): dg_rdk = dummy_group(acl_details[item_k], item_k, item_v) try: if dg_rdk != acl_details[item_k]: break except: raise Exception("exact match process error..") continue if item_v != acl_details[item_k]: break else: matching_lines.add(line_no) return matching_lines
[docs] def difference(self, obj): """difference between self and another ACL object elements Args: obj (ACL): another ACL object to compare differences Returns: dict: difference between self and another ACL object elements """ diffacl = ACL(self._name, None, self.grp) for self_k, self_v in self._repr_dic.items(): if isinstance(self_v, ACL_REMARK): continue _self_v = {k:v for k,v in self_v.items() if k != 'remark'} for obj_k, obj_v in obj._repr_dic.items(): if isinstance(obj_v, ACL_REMARK): continue _obj_v = {k:v for k,v in obj_v.items() if k != 'remark'} found = _self_v == _obj_v if found: break if not found: diffacl[self_k] = self_v return diffacl
[docs] def same_elements(self, obj): """compare self for similar elements with provided another ACL object. Args: obj (ACL): another ACL object to compare elements Returns: bool: if self and provided ACL has same elements or not """ return ( not self.difference(obj) and not obj.difference(self) )
# ---------------- Operate on a Copy ---------------
[docs] def copy_and_append(self, attribs): """create duplicate of self, append a new acl line in new object with provided attributes Args: attribs (dict): line attributes Returns: ACL: copy of ACL with attributes appended """ newobj = deepcopy(self) newobj.append(attribs) return newobj
[docs] def copy_and_delete(self, attribs): """create duplicate of self, delete a line in new acl for given line number/attributes Args: attribs (dict): line attributes Returns: ACL: copy of ACL with attributes/line removed """ newobj = deepcopy(self) newobj -= attribs return newobj
[docs] def copy_and_insert(self, line_no, attribs): """create duplicate of self, insert a new acl line in new acl object, with provided attributes at given line number and return new updated object. existing object remains untouched. Args: line_no (int): line number at which entry to be inserted attribs (dict): line attributes Returns: ACL: copy of ACL with attributes/line insert """ newacl = deepcopy(self) newacl.insert(line_no, attribs) return newacl
# ~~~~~~~~~~~~~~~~~~~ INTERNALS / SUPPORTIVE ~~~~~~~~~~~~~~~~~~~ def _delete_by_line(self, line_no): """Supportive : delete a line in acl Args: line_no (int): line number at which entry to be deleted Returns: str: delta change(s) for the deletion of entry """ removals = self._del_str(line_no) self.removals += removals self._key_delete(line_no) if self.max > line_no: self._key_deflate(line_no) return removals def _delete_by_attribs(self, attribs): """Supportive : delete a line in acl by attrib Args: attribs (dict): line attributes Returns: str: delta change(s) for the addition of entry """ mv = self.contains(attribs) s = '' for i in reversed(sorted(mv)): s += self._delete_by_line(i) return s def _is_equal(self, obj): """supportive: for standard way of comparing two ACL objects self and obj Args: obj (ACL): another ACL object to be compared with self Returns: bool: if both ACL objects (self and obj) are equal or not """ for k, v in obj: if k in self._repr_dic and self._repr_dic[k] == v: continue else: return False return True def _compare(self, obj): """supportive: compare and get difference between self and obj/ACL object Args: obj (ACL): another ACL object to be compared with self Returns: ACL: an ACL with differences between self and provided obj(ACL) object """ diffacl = ACL(self._name, None, self.grp) for self_k, self_v in self._repr_dic.items(): found = False for obj_k, obj_v in obj._repr_dic.items(): found = self_v == obj_v if found: break if not found: diffacl[self_k] = self_v return diffacl def _key_extend(self, n): """supportive: insert a new line at position n Args: n (int): sequence line number of an acl from where keys to be extended to insert a new line """ rvs_keys = list(reversed(self._repr_dic.keys())) for key in rvs_keys: if key >= n: self[key+1] = self._repr_dic[key] else: break def _key_delete(self, n): """supportive: deletes a line in ACL Args: n (int): sequence line number of an acl to be removed """ try: del(self._repr_dic[n]) except: print(f"NoDeletableEntryFoundForLine-{n}-orAlreadyRemoved") def _key_deflate(self, n): """supportive: rearranges next lines Args: n (int): sequence line number of an acl from where keys to be deflated """ last_used_key = self.max for key in range(n, last_used_key): self[key] = self._repr_dic[key+1] del(self._repr_dic[last_used_key]) def _add(self, line_no, attribs): """supportive : update member per std and insert entry to acl Args: line_no (int): line number attribs (dict): attributes to be set on line Returns: str: delta change of modification """ attribs = self._update_members(attribs) attribs = self._update_remarks(line_no, attribs) self[line_no] = attribs adds = self._to_str(line_no) self.adds += adds return adds def _update_members(self, attribs): """supportive : attributes update as per std source/destination/port/protocol Args: attribs (dict): attributes to be modified with members if any Returns: dict: updated attributes """ attribs = self._update_group_members(attribs) return attribs def _update_remarks(self, n, attribs): """supportive : remark attributes update, if not provided use previous line remark Args: n (int): line number attribs (dict): remark attributes to be modified if need any Returns: dict: updated attributes """ if not attribs.get('remark') : try: attribs['remark'] = self._repr_dic[n-1]['remark'] except: attribs['remark'] = str(self._repr_dic[n-1]) return attribs def _update_group_members(self, attribs): """supportive : source/destination/port attributes update as per std Args: attribs (dict): attributes Returns: dict: updated attributes """ sdps = {'source', 'destination', 'ports'} for sdp in sdps: self._update_an_attrib(attribs, sdp) return attribs def _update_an_attrib(self, attribs, sdp): """supportive : an attribute update as per std ( one of source/destination/port ) Args: attribs (dict): attributes sdp (str): attribute name ('source', 'destination' etc..) """ f = port_group_member if sdp == 'ports' else network_group_member if isinstance(attribs[sdp], (set, tuple, list)): # iterable attribute update self._update_set_attribs(attribs, sdp, f) else: # normal attribute update attribs[sdp] = f(str(attribs[sdp]).split(), idx=0, objectGroups=self.objs) def _update_set_attribs(self, attribs, sdp, f): """supportive : an attribute update per std (iterable attribute) Args: attribs (dict): attributes sdp (str): attribute name ('source', 'destination' etc..) f (function): function on which the action to carried up on """ s = set() for _sdp in attribs[sdp]: s.add(f(str(_sdp).split(), idx=0, objectGroups=self.objs)) attribs[sdp] = s # ----------------- String repr / supportive --------------- # def _to_str(self, n, sequence=True): """String representation of an acl-line (n) Args: n (int): line number sequence (bool, optional): require sequence number in output. Defaults to True. Returns: str: an acl line """ seq = self.sequence and sequence seq_no = f"line {n} " if seq else "" item = self._repr_dic[n] if isinstance(item, dict): for v in self.mandatory_item_values_for_str: if v not in item: item[v] = self._normalize(v) log_warning = " log warning" if item['log_warning'] else "" src = update_obj_grp_str(item, 'source') dst = update_obj_grp_str(item, 'destination') pt = update_obj_grp_str(item, 'ports') prt = update_obj_grp_str(item, 'protocol') s = (f"access-list {self._name} {seq_no}" f"{item['acl_type']} {item['action']} {prt} " f"{src} {dst} {pt}{log_warning}\n") else: s = f"access-list {self._name} {seq_no}{item}" return s def _del_str(self, n=0): """negating string of an acl-line (n), can be use with standard way as well usage: del(acl[n:n+x:step]) to delete acl entry(ies). get/find deleted entries using property acl.removals [remove full acl if n is not provided] Args: n (int, optional): line number. Defaults to 0. Returns: str: delta for negating acl line(s) """ s = '' if n and isinstance(n, int): return "no " + self._to_str(n, False) elif n and isinstance(n, (list, tuple, set)): for i in n: s += self._del_str(i) elif not n: for n, v in self: s += self._del_str(n) return s ## ----------- Other Supportive to Supportives --------- ## def _normalize(self, item): """return default attributes for the require item/attribute. Args: item (str): attribute name Raises: Exception: MissingMadatoryParameter Returns: various: standard value for item, defined in normalize_item_values """ normalize_item_values = { 'acl_type': 'extended', # 'action': 'permit', # 'protocol': 'tcp', # 'source': , # 'destination': , # 'ports': , 'log_warning': True, } if normalize_item_values.get(item): return normalize_item_values[item] else: raise Exception(f"MissingMandatoryParameter-{item}, NormalizationNotAvailableForMandatoryField") # ~~~~~~ CALLABLE ~~~~~~~~~~~~~~~~~~~ # USED WHILE INITIATLIZING / PARSER # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
[docs] def parse(self, objs): """parse access-list-lines-list and set _repr_dic objs requires for acl lines having object-group-names Args: objs (OBJS): object of object-groups (OBJS) """ remark = None for line_no, line in enumerate(self.acl_lines_list): test = line.startswith("access-list al_from_blue extended deny tcp any4 any4 object-group INET-TCP-DROP") spl_line = line.split() # remarks line if spl_line[2] == 'remark': remark = " ".join(spl_line[3:]) self._repr_dic[line_no] = ACL_REMARK(remark) continue # src /dst / ports idx_variance = 0 protocol_variance = 1 if spl_line[4] == "object-group" else 0 for k, v in self.end_point_identifiers_pos.items(): pv = v+protocol_variance+idx_variance if k == 0: source = network_group_member(spl_line, idx=pv, objectGroups=objs) if k == 1: destination = network_group_member(spl_line, idx=pv, objectGroups=objs) if k == 2: ports = port_group_member(spl_line, idx=pv, objectGroups=objs) try: if spl_line[pv] in ANY: idx_variance -= 1 except: pass # add rest statics and create ACL entry dict self._repr_dic[line_no] = { 'remark': remark, 'acl_type': spl_line[2], 'action': spl_line[3], 'protocol': spl_line[4+protocol_variance], 'source': source, 'destination': destination, 'ports': ports, 'log_warning': STR.found(line, 'log warnings'), }
# ---------------------------------------------------------------------------------------- # Main # ---------------------------------------------------------------------------------------- if __name__ == '__main__': pass # ----------------------------------------------------------------------------------------