Source code for fwOper.acg


# ----------------------------------------------------------------------------------------
from nettoolkit import *
from collections import OrderedDict
from copy import deepcopy

from .member import *
from .entity import *
from .static import *
from .fwObj import *

# ----------------------------------------------------------------------------------------
# Local Functions
# ----------------------------------------------------------------------------------------

def _object_group_list(config_list):
	"""extracts obect groups from provided configuration list ie.config_list 
	returns object groups (OBJ)s in a list

	Args:
		config_list (list): full configuration list

	Returns:
		list: object-groups list
	"""
	obj_grp_list = []
	obj_group_started = False
	for line in config_list:
		spaces = STR.indention(line)
		if line.startswith("object-group"): 
			obj_group_started = True
			obj_grp_list.append(line.rstrip())
			continue
		if obj_group_started and spaces > 0:
			obj_grp_list.append(line.rstrip())
			continue
		if obj_group_started:
			break
	return obj_grp_list


[docs]def get_member_obj(member_type, member, objs): """convert and provided string member to member object aka: Network, OBJ, Ports based on its member-type provided. objs: requires for recursive lookup for OBJ (if any) Args: member_type (str): type of member member (str): string repr of Network, OBJ, Ports etc objs (OBJS): collection of Object-groups (OBJS object) Raises: Exception: InvalidMemberType Returns: [Network, Ports, None]: Based on member type returns member object """ member_type_map = { 'port-object': port_member, 'network-object': network_member, 'icmp-object': None, # TBD 'group-object': None, # TBD 'protocol-object': None, # TBD # ... add more as need } if member_type not in member_type_map: raise Exception(f"InvalidMemberTypeDefined-{member_type} for member-{member}") return member_type_map[member_type](member, objs)
# ---------------------------------------------------------------------------------------- # Collection of Object Group(s) objects # ----------------------------------------------------------------------------------------
[docs]class OBJS(Plurals): """collection of object groups Args: Plurals (Plurals): Inherits - group of items properties definitions """ def __init__(self, config_list): super().__init__() self.what = "object-groups" self.obj_grps_list = _object_group_list(config_list) self._set_obj_grp_basics() self.set_objects()
[docs] def changes(self, change): """collate the delta changes recorded in all object-groups and provide delta for that change ( "ADDS", "REMOVALS") Args: change (str): type of change for which change output requested ( "ADDS", "REMOVALS" ) Returns: str: delta changes """ return super().changes('object-group', change)
# ~~~~~~~~~~~~~~~~~~ CALLABLE ~~~~~~~~~~~~~~~~~~
[docs] def get_matching_obj_grps(self, requests): """matches provided (request members) in all object-groups available on device and returns dictionary of object-group names, where object-group matches same members in it. Args: requests (tuple, list, set): list/set/tuple with members of dict, containing 'source', 'destination', 'ports' as keys. Raises: Exception: Invalid Request type Returns: dict: include all three, src, dest, port """ candidates = {'source': [], 'destination': [], 'ports': []} group_names = {} if not isinstance(requests, (tuple, list, set)): raise Exception(f"NotValidRequestProvided-{requests}") for request in requests: for loc, member in candidates.items(): if request[loc] in ANY: continue member.append(request[loc]) for loc, member in candidates.items(): obj_grps_list = self.matching_obj_grps(member) if obj_grps_list: group_names[loc] = obj_grps_list return group_names
[docs] def matching_obj_grps(self, member): """matches provided [members] in all object-groups available on device and returns list of object-group names, where object-group matches same members in it. Args: member (list, set, tuple): list/set/tuple with members Returns: list: singular object """ if isinstance(member, str): return [obj for name, obj in self if member in obj] elif isinstance(member, (tuple, list, set)): g = [] for name, obj in self: match = False for m in member: match = m in obj if not match: break if match and len(obj) == len(member): g.append(obj) return g
# ~~~~~~~~~~~~~~~~~~~ INTERNALS ~~~~~~~~~~~~~~~~~~~ def _set_obj_grp_basics(self): """set basic information of each object-group. Returns: dict: representation dictionary of self. """ obj_grp_name = None for obj_grps_line in self.obj_grps_list: spaces = STR.indention(obj_grps_line) if spaces == 0: spl_obj_grps_line = obj_grps_line.split() obj_grp_type = spl_obj_grps_line[1] obj_grp_name = spl_obj_grps_line[2] if obj_grp_name not in self._repr_dic: self._repr_dic[obj_grp_name] = {} self._repr_dic[obj_grp_name]['type'] = obj_grp_type self._repr_dic[obj_grp_name]['candiates_list'] = [] try: obj_grp_svc_filter = spl_obj_grps_line[3] self._repr_dic[obj_grp_name]['svc_filter'] = obj_grp_svc_filter except: self._repr_dic[obj_grp_name]['svc_filter'] = "" else: self._repr_dic[obj_grp_name]['candiates_list'].append(obj_grps_line) return self._repr_dic
[docs] def set_objects(self): """set extended information of each object-group. """ h = 0 for obj_grp_name, obj_grp_details in self._repr_dic.items(): obj_grp = OBJ(obj_grp_name, h) obj_grp.set_instance_primary_details(obj_grp_details) obj_grp.parent = self obj_grp.parse() self._repr_dic[obj_grp_name] = obj_grp h += 1
# ---------------------------------------------------------------------------------------- # Object Group Detail # ----------------------------------------------------------------------------------------
[docs]class OBJ(Singulars): """Individual group object Args: Singulars (Singulars): Inherits - individual object properties definitions Raises: Exception: IncorrectIteminItemType Exception: InvalidGroupMemberType Exception: NoValidCandidate Returns: OBJ: a single object-group object """ def __init__(self, obj_grp_name, _hash): """Individual object-group object initialization Args: obj_grp_name (str): name of an object-group _hash (int): hashes for object-group """ super().__init__(obj_grp_name) self.name = obj_grp_name self.description = "" self.removals = {} self.adds = {} self._hash = _hash def __eq__(self, obj): return (((self>obj) is None) and ((obj>self) is None) # and (self.description == obj.description) # tantative ) def __len__(self): return self._len_of_members() def __contains__(self, member): return self._contains(member) def __iadd__(self, n): self._add(n) return self def __isub__(self, n): self._delete(n) return self def __gt__(self, obj): diffs = self._missing(obj) obj_grp = self._blank_copy_of_self() obj_grp._repr_dic = diffs if diffs: return obj_grp def __lt__(self, obj): diffs = obj._missing(self) obj_grp = self._blank_copy_of_self() obj_grp._repr_dic = diffs if diffs: return obj_grp def __add__(self, attribs): newobj = deepcopy(self) newobj += attribs return newobj def __sub__(self, attribs): newobj = deepcopy(self) newobj._delete(attribs) return newobj # ~~~~~~~~~~~~~~~~~~~ EXTERNAL CALLABLES ~~~~~~~~~~~~~~~~~~~
[docs] def str(self): """String representation of full object-group Returns: str: object-group """ s = self._group_head() s += self._group_description() s += self._to_str(self._repr_dic, header=False) return s
# object-group additions / removals
[docs] def add(self, *arg): return self._add(*arg)
[docs] def delete(self, *arg): return self._delete(*arg)
# String representation of object-group additions / removals
[docs] def add_str(self, header=True): return self._to_str(self.adds)
[docs] def del_str(self, header=False): return self._to_str(self.removals)
[docs] def over(self, acls): """ returns dictionary of acls with acl/line/attribute if object group present in any acls Args: acls (ACLS): dictionary of all acls (ACLS) Returns: dict: dictionary of acls with acl/line/attribute """ d = {} for i, acl in acls: d.update(self._within(acl)) return d
[docs] def has(self, obj): """returns object group if self contains provided object-group. Args: obj (OBJ): object-group object to check within Returns: (OBJ, None, False): object-group object if self within it else None """ if 'group-object' not in self.keys(): return None for sgrp in self['group-object']: if obj is self.parent[sgrp]: return self else: return self.parent[sgrp].has(obj) return False
# ---------------- Operate on a Copy --------------- def _blank_copy_of_self(self): """create and return a copy of original instance Returns: OBJ: copy of self """ obj_grp = OBJ(self._name, self._hash*1) obj_grp.set_instance_primary_details(self.grp_details) return obj_grp # ~~~~~~~~~~~~~~~~~~~ INTERNALS / SUPPORTIVE ~~~~~~~~~~~~~~~~~~~ def _within(self, acl): """if self object group is within an acl, provides line number and attributes details with matching object group name Args: acl (ACL): access-list object to check within Returns: dict: dictionary of acls with line/attributes for matching self. """ d = {} acl_name = acl.name for i, line in acl: for attr in GROUP_VALID_FIELDS: if isinstance(line, ACL_REMARK): continue if not isinstance(line[attr], OBJ): continue selfmatch = line[attr] is self parentmatch = line[attr].has(self) if not (selfmatch or parentmatch): continue if not d.get(acl_name): d[acl_name] = {} if not d[acl_name].get(i): d[acl_name][i] = {} d[acl_name][i][attr] = self if selfmatch else parentmatch return d def _len_of_members(self): """supporting len() : count of total members Returns: int: count of total members """ l = 0 for v in self._repr_dic.values(): l += len(v) return l def _contains(self, member): """supporting - [x in, not in instance]: Args: member (str, int, list, set, tuple, Network, Ports): member(s) to check in self Returns: Bool: boolean return if matches member in self. """ if isinstance(member, (str, int, Network, Ports)): member_type = self._get_member_type(member) member_obj = get_member_obj(member_type, member, self.parent) if not self._repr_dic.get(member_type): return None for _ in self[member_type]: if isinstance(_, OBJ) and _._contains(member): return _ else: pass if member_obj == _: return _ elif isinstance(member, (list,set,tuple)): for m in member: if m not in self: return False return True else: # print(type(member)) return None def _add(self, item): """supporting inst.add(member) : method for setting key/value for instance Args: item (str, int, list, set, tuple): item/attribute to be added to object-group Raises: Exception: IncorrectIteminItemType Returns: str: Delta of addition """ if isinstance(item, (tuple, set, list)): s = '' for _ in item: s += self._add(_) return s elif isinstance(item, (str, int)): item_type = self._get_member_type(item) updated_item = self._get_item_object(item_type, item) return self._obj_add(item_type, updated_item) else: item_type = self._get_member_type(item) raise Exception(f"IncorrectIteminItemType-{item_type}/{item}") def _delete(self, item): """supporting inst.delete(member) : method for removing key/value for instance Args: item (str, int, list, set, tuple): item/attribute to be removed from object-group Raises: Exception: IncorrectIteminItemType Returns: str: Delta of deletion """ if isinstance(item, (tuple, set, list)): s = '' for _ in item: s += str(self._delete(_)) return s elif isinstance(item, (str, int)): item_type = self._get_member_type(item) updated_item = self._get_item_object(item_type, item) return self._obj_delete(item_type, updated_item) else: item_type = self._get_member_type(item) raise Exception(f"IncorrectIteminItemType-{item_type}/{item}") def _missing(self, obj): """supporting in comparision between to instances (a > b, a < b): compare and return differences in dictionary. Args: obj (OBJ): another object of an object-group to match with Returns: dict: differences dictionary """ diffs = {} if not isinstance(obj, OBJ): return self t = self.obj_grp_type == obj.obj_grp_type s = self.obj_grp_svc_filter == obj.obj_grp_svc_filter if not t or not s: return diffs for self_k, self_v in self._repr_dic.items(): if not obj._repr_dic.get(self_k): obj._repr_dic[self_k] = None obj_v = obj[self_k] if obj_v is None: diffs[self_k] = self_v else: found = self_v == obj_v if found: continue diffs[self_k] = self_v.difference(obj_v) return diffs # ----------------- String repr / supportive --------------- # def _group_head(self): """return String representation of object-group header/name line Returns: str: object-group header/name """ return (f"object-group {self.obj_grp_type} {self._name} {self.obj_grp_svc_filter}\n") def _group_description(self): """return String representation of object-group description line Returns: str: object-group description """ return (f" description {self.description}\n") def _to_str(self, dic, header=True): """return String representation of object-group ( add/remove actions ) Args: dic (dict): Delta dictionary generated during add/remove action header (bool, optional): add header line or not. Defaults to True Returns: str: object-group """ s = self._group_head() if header else "" m = '' negate = 'no ' if dic is self.removals else '' for _type, candidates in dic.items(): for c in candidates: # _c = self._get_candidate_str(c) m += f" {negate}{_type} {c}\n" s += m return s if m else m ## ----------- Other Supportive to Supportives --------- ## def _get_candidate_str(self, c): """supporting method to get the OBJ object name Args: c (OBJ): object-group object Returns: str: object-group name """ return c.name if isinstance(c, OBJ) else c def _get_item_object(self, item_type, item): """supporting method for retriving member-type/member pair for for a member Args: item_type (str): string of item type item (str): string of item Returns: various: item object( Network, Ports, etc. ) """ spl_line = [item_type] spl_line.extend(item.split()) updated_item = self._get_member(item_type, spl_line) return updated_item def _get_member(self, obj_type, spl_line): """supporting method for retriving member object for provided object-type Args: obj_type (str): string of item type spl_line (list): object group member splitted line Raises: Exception: InvalidGroupMemberType Returns: various: item object( Network, Ports, etc. ) """ if obj_type == 'network-object': member = network_group_member(spl_line, 1, self.parent) elif obj_type == 'port-object': member = port_group_member(spl_line, 1, self.parent) elif obj_type == 'icmp-object': member = icmp_group_member(spl_line) elif obj_type == 'protocol-object': member = protocol_group_member(spl_line) elif obj_type == 'group-object': member = group_object_member(spl_line, self.parent) else: raise Exception(f"InvalidGroupMemberType-Noticed-[{obj_type}]\n{spl_line}") return member def _obj_add(self, item_type, item): """supporting method for setting key/value for instance Args: item_type (str): string of item type item (str): item object( Network, Ports, etc. ) Returns: str: string of delta changes for item addition """ if not self.adds.get(item_type): self.adds[item_type] = set() if not self._repr_dic.get(item_type): self._repr_dic[item_type] = set() self._repr_dic[item_type].add(item) self.adds[item_type].add(item) return f" {item_type} {item}\n" # supporting method for removing key/value for instance def _obj_delete(self, item_type, item): """ # supporting method for removing key/value for instance Args: item_type (str): string of item type item (str): item object( Network, Ports, etc. ) Raises: Exception: NoValidCandidate Returns: str: string of delta changes for item deletion """ if not self.removals.get(item_type): self.removals[item_type] = set() try: self._repr_dic[item_type].remove(item) self.removals[item_type].add(item) if len(self._repr_dic[item_type]) == 0: del(self._repr_dic[item_type]) return f" no {item_type} {item}\n" except: print(f"NoValidCandidateFoundToRemove/OrAlreadyRemoved-\n{item_type}: {item}") def _get_member_type(self, member): """dynamic detection of member-type for given member Args: member (str): string representation of member Raises: Exception: InvalidMember Returns: str: string representation of member-type for provided member """ for k, v in MEMBERS_MEMBERTYPES.items(): if member in k: return v try: network_member(member, self.parent) return 'network-object' except: pass try: port_member(member, self.parent) return 'port-object' except: pass if isinstance(member, str) and member in self.parent: return 'group-object' raise Exception(f"InvalidMemberFound:{member}, unable to generate member type for it.") # ~~~~~~ CALLABLE ~~~~~~~~~~~~~~~~~~~ # USED WHILE INITIATLIZING / PARSER # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
[docs] def set_instance_primary_details(self, obj_grp_details): """set primary variable details of instance Args: obj_grp_details (dict): object-group primary/basic details (candidates-list, type, service-filter) """ self.obj_grp_lines_list = obj_grp_details['candiates_list'] self.obj_grp_type = obj_grp_details['type'] self.obj_grp_svc_filter = obj_grp_details['svc_filter']
[docs] def parse(self): """starts parsing object-group-config-lines-list and set extended variables of instance """ for line in self.obj_grp_lines_list: spl_line = line.lstrip().split() sub_obj_type = spl_line[0] if sub_obj_type == 'description': self.description = line[13:] continue member = self._get_member(spl_line[0], spl_line) if not self._repr_dic.get(spl_line[0]): self._repr_dic[spl_line[0]] = set() member = self._get_candidate_str(member) self._repr_dic[spl_line[0]].add(member)
# ~~~~~~~~~~~~~~~~~~~ PROPERTIES ~~~~~~~~~~~~~~~~~~~ @property def grp_details(self): """object group details in dictionary (helpful in generating copy) Returns: dict: object-group primary/basic details """ _grp_details = { 'type': self.obj_grp_type, 'svc_filter': self.obj_grp_svc_filter, 'candiates_list': [], } return _grp_details
# ---------------------------------------------------------------------------------------- # Main # ---------------------------------------------------------------------------------------- if __name__ == '__main__': pass # ----------------------------------------------------------------------------------------