# ----------------------------------------------------------------------------------------
from nettoolkit import *
from .entity import *
from .static import *
# ----------------------------------------------------------------------------------------
# Control Functions
# ----------------------------------------------------------------------------------------
[docs]def network_group_member(spl_line, idx, objectGroups=None):
"""returns Network group member object from given splitted line
Args:
spl_line (list): splitted line of an acl entry
idx (int): index position to start looking for network
objectGroups (OBJS, optional): object-groups object. Defaults to None.
Raises:
Exception: UndefinedEndPointType
Returns:
(Network, OBJ, None): Network group member object
"""
if spl_line[idx] == 'object-group':
try:
return objectGroups[spl_line[idx+1]] # <- OBJ
except:
return None
elif spl_line[idx] in ANY:
return Network(*DEFAULT_ROUTE)
else:
address = spl_line[idx+1] if spl_line[idx] == 'host' else spl_line[idx]
ao = addressing(address)
if type(ao) == IPv6:
return Network(address)
if type(ao) == IPv4:
try:
return Network(address, spl_line[idx+1])
except:
return Network(address)
raise Exception(f"UndefinedEndPointType: {spl_line}\n{idx}")
[docs]def port_group_member(spl_line, idx, objectGroups=None):
"""returns Port group member object from given splitted line
Args:
spl_line (list): splitted line of an acl entry
idx (int): index position to start looking for port(s)
objectGroups (OBJS, optional): object-groups object. Defaults to None.
Raises:
Exception: UndefinedPort/PortType
Returns:
(Ports, OBJ, None): Network group member object
"""
try: spl_line[idx]
except: return ''
if 4 < len(spl_line) <= 8:
pts = Ports("", "")
elif spl_line[idx] == 'eq':
pts = Ports(spl_line[idx], spl_line[idx+1])
elif spl_line[idx] =='range':
pts = Ports(spl_line[idx], spl_line[idx+1], spl_line[idx+2])
elif spl_line[idx] == 'object-group':
try:
pts = objectGroups[spl_line[idx+1]]
except:
pts = None
pass ### bypassed temporily
elif spl_line[idx] in ICMP:
pts = Ports(spl_line[idx], "")
elif spl_line[4] == 'icmp': ### Exceptional match for missing icmp ports
pts = Ports("", 'echo') # define port as echo in this case
elif spl_line[idx] == 'log':
return ''
else:
raise Exception(f"UndefinedPort/PortType: {spl_line} at index {idx}")
return pts
[docs]def icmp_group_member(spl_line):
"""returns icmp port group member object from given splitted line
Args:
spl_line (list): splitted line of an acl entry
Returns:
IcmpProtocol: IcmpProtocol member object
"""
pts = IcmpProtocol(spl_line[-1])
return pts
[docs]def protocol_group_member(spl_line):
"""returns protocol group member object from given splitted line
Args:
spl_line (list): splitted line of an acl entry
Returns:
NetworkProtocol: NetworkProtocol member object
"""
pts = NetworkProtocol(spl_line[-1])
return pts
[docs]def group_object_member(spl_line, objectGroups=None):
"""returns object-group object from given splitted line
Args:
spl_line ([type]): [description]
objectGroups ([type], optional): [description]. Defaults to None.
Returns:
OBJ: object-group OBJ member object
"""
try:
return objectGroups[spl_line[-1]]
except:
return None
[docs]def network_member(network, objs=None):
"""returns Network group member object for given network, objs will require if network has object-group.
Args:
network (str): ip-network string
objs (OBJS, optional): object-groups object. Defaults to None.
Raises:
Exception: InvalidNetwork
Returns:
Network: Network group member object
"""
if not isinstance(network, str): return network
# ----------------------------------------------------
network = network.strip()
# ----------------------------------------------------
if network in ANY: return Network(*DEFAULT_ROUTE)
# ----------------------------------------------------
spl_network = network.split("/")
net_obj = None
if len(spl_network) == 2:
net_obj = addressing(network)
if net_obj:
mask = int(spl_network[1])
return Network(spl_network[0], bin_mask(mask))
# ----------------------------------------------------
spl_network = network.split(" ")
if len(spl_network) == 2:
if spl_network[0] == 'object-group':
return objs[spl_network[1]]
mask = to_dec_mask(spl_network[1])
net = spl_network[0] +"/"+ str(mask)
net_obj = addressing(net)
if net_obj:
return Network(spl_network[0], spl_network[1])
# ----------------------------------------------------
else:
subnet = network + "/32"
net_obj = addressing(network)
if net_obj:
return Network(network, '255.255.255.255')
# ----------------------------------------------------
raise Exception(f"InvalidNetwork")
[docs]def port_member(port, objs):
"""returns Port group member object for given port, objs will require if port has object-group
Args:
port (str): port string
objs (OBJS): object-groups object
Raises:
Exception: InvalidPort
Returns:
Ports: Ports member object
"""
port = str(port).strip()
if port.startswith('eq '): port = port[3:].lstrip()
if port.startswith('range '): port = port[6:].lstrip()
if port in ICMP: return Ports("", port)
# ----------------------------------------------------
spl_port = port.split(" ")
if len(spl_port) == 2:
if spl_port[0] == 'object-group':
return objs[spl_port[1]]
return Ports('range', spl_port[0], spl_port[1])
dspl_port = port.split("-")
if len(dspl_port) == 2: return Ports('range', dspl_port[0], dspl_port[1])
elif len(dspl_port) == 1 and len(spl_port) == 1: return Ports('eq', port)
# ----------------------------------------------------
raise Exception(f"InvalidPort")
# ----------------------------------------------------------------------------------------
[docs]def get_match_dict(request_parameters, objs):
"""search for request parameters and return matching parameters dictionary.
(dictionary with attributes require to match in ACL)
Args:
request_parameters (dict): request paramters in dictionary
objs (OBJS): object-groups object
Returns:
dict: with filtered parameters only
"""
matching_parameters = ('remark', 'acl_type', 'action', 'protocol', 'source',
'destination', 'ports',)
network_member_parameters = ('source', 'destination')
port_member_parameters = ('ports',)
matching_dict = {}
for item in matching_parameters:
if item in network_member_parameters and item in request_parameters:
matching_dict[item] = network_member(request_parameters[item], objs)
elif item in port_member_parameters and item in request_parameters:
matching_dict[item] = port_member(request_parameters[item], objs)
elif item in request_parameters:
matching_dict[item] = request_parameters[item]
return matching_dict
# ----------------------------------------------------------------------------------------
# Other Functions
# ----------------------------------------------------------------------------------------
[docs]def get_port_name(n):
"""update and return well known port number for port name
Args:
n (int): port number
Returns:
str: well-known port name else port number
"""
return PORT_MAPPINGS[n] if PORT_MAPPINGS.get(n) else n
[docs]def update_ports_name(requests):
"""update and return well known port number for port name in given request port
Args:
requests (dict): acl attributes in request dictionary
Returns:
dict: updated request attributes
"""
for request in requests:
request['ports'] = get_port_name(request['ports'])
return requests
# ----------------------------------------------------------------------------------------
# Main
# ----------------------------------------------------------------------------------------
if __name__ == '__main__':
pass
# ----------------------------------------------------------------------------------------