trigger.acl — ACL parsing library

Warning

Much of the functionality in this library WILL NOT work without Redis installed and WITH_ACLS set to True in your settings.py. If you have ACL support disabled, proceed at your own risk!

Trigger’s ACL parser.

This library contains various modules that allow for parsing, manipulation, and management of network access control lists (ACLs). It will parse a complete ACL and return an ACL object that can be easily translated to any supported vendor syntax.

trigger.acl.literals(d)

Longest match of all the strings that are keys of ‘d’.

trigger.acl.parse(input_data)

Parse a complete ACL and return an ACL object. This should be the only external interface to the parser.

>>> from trigger.acl import parse
>>> aclobj = parse("access-list 123 permit tcp any host 10.20.30.40 eq 80")
>>> aclobj.terms
[<Term: None>]
Parameters:input_data – An ACL policy as a string or file-like object.
trigger.acl.S(prod)

Wrap your grammar token in this to call your helper function with a list of each parsed subtag, instead of the raw text. This is useful for performing modifiers.

Parameters:prod – The parser product.
class trigger.acl.ACL(name=None, terms=None, format=None, family=None, interface_specific=False)

An abstract access-list object intended to be created by the parse() function.

name_terms()

Assign names to all unnamed terms.

output(format=None, *largs, **kwargs)

Output the ACL data in the specified format.

output_ios(replace=False)

Output the ACL in IOS traditional format.

Parameters:replace – If set the ACL is preceded by a no access-list line.
output_ios_brocade(replace=False, receive_acl=False)

Output the ACL in Brocade-flavored IOS format.

The difference between this and “traditional” IOS are:

  • Stripping of comments
  • Appending of ip rebind-acl or ip rebind-receive-acl line
Parameters:
  • replace – If set the ACL is preceded by a no access-list line.
  • receive_acl – If set the ACL is suffixed with a ip rebind-receive-acl' instead of ``ip rebind-acl.
output_ios_named(replace=False)

Output the ACL in IOS named format.

Parameters:replace – If set the ACL is preceded by a no access-list line.
output_iosxr(replace=False)

Output the ACL in IOS XR format.

Parameters:replace – If set the ACL is preceded by a no ipv4 access-list line.
output_junos(replace=False, family=None)

Output the ACL in JunOS format.

Parameters:
  • replace – If set the ACL is wrapped in a firewall { replace: ... } section.
  • family – If set, the value is used to wrap the ACL in a family inet { ...} section.
strip_comments()

Strips all comments from ACL header and all terms.

class trigger.acl.ACLProcessor
class trigger.acl.Comment(data)

Container for inline comments.

output_ios()

Output the Comment to IOS traditional format.

output_ios_named()

Output the Comment to IOS named format.

output_iosxr()

Output the Comment to IOS XR format.

output_junos()

Output the Comment to JunOS format.

class trigger.acl.Matches(d=None, **kwargs)

Container class for Term.match object used for membership tests on access checks.

ios_address_str(addrs)

Convert a list of addresses to IOS-style stupid strings.

Parameters:addrs – List of IP address objects.
ios_port_str(ports)

Convert a list of tuples back to ranges, then to strings.

Parameters:ports – A list of port tuples, e.g. [(0,65535), (1,2)].
junos_str(pair)

Convert a 2-tuple into a hyphenated string, e.g. a range of ports. If not a tuple, tries to treat it as IPs or failing that, casts it to a string.

Parameters:pair – The 2-tuple to convert.
output_ios()

Return a string of IOS ACL bodies.

output_junos()

Return a list that can form the from { ... } clause of the term.

class trigger.acl.Policer(name, data)

Container class for policer policy definitions. This is a dummy class for now, that just passes it through as a string.

class trigger.acl.PolicerGroup(format=None)

Container for Policer objects. Juniper only.

class trigger.acl.Protocol(arg)

A protocol object used for access membership tests in Term objects. Acts like an integer, but stringify into a name if possible.

class trigger.acl.RangeList(data=None)

A type which stores ordered sets, with efficient handling of ranges. It can also store non-incrementable terms as an sorted set without collapsing into ranges.

This is currently used to just store match conditions (e.g. protocols, ports), but could be fleshed out into a general-purpose class. One thing to think about is how/whether to handle a list of tuples as distinct from a list of ranges. Should we just store them as xrange objects? Should the object appear as discrete elements by default, for example in len(), with the collapsed view as a method, or should we keep it as it is now? All the current uses of this class are in this file and have unit tests, so when we decided what the semantics of the generalized module ought to be, we can make it so without worry.

expanded()

Return a list with all ranges converted to discrete elements.

class trigger.acl.Remark(data)

IOS extended ACL “remark” lines automatically become comments when converting to other formats of ACL.

output_ios_named()

Output the Remark to IOS named format.

class trigger.acl.Term(name=None, action='accept', match=None, modifiers=None, inactive=False, isglobal=False, extra=None)

An individual term from which an ACL is made

output(format, *largs, **kwargs)

Output the term to the specified format

Parameters:format – The desired output format.
output_ios(prefix=None, acl_name=None)

Output term to IOS traditional format.

Parameters:
  • prefix – Prefix to use, default: ‘access-list’
  • acl_name – Name of access-list to display
output_ios_named(prefix='', *args, **kwargs)

Output term to IOS named format.

output_iosxr(prefix='', *args, **kwargs)

Output term to IOS XR format.

output_junos(*args, **kwargs)

Convert the term to JunOS format.

set_action_or_modifier(action)

Add or replace a modifier, or set the primary action. This method exists for the convenience of parsers.

class trigger.acl.TermList

Container class for Term objects within an ACL object.

class trigger.acl.TIP(data, **kwargs)

Class based on IPy.IP, but with extensions for Trigger.

Currently, only the only extension is the ability to negate a network block. Only used internally within the parser, as it’s not complete (doesn’t interact well with IPy.IP objects). Does not handle IPv6 yet.

trigger.acl.autoacl

This module controls when ACLs get auto-applied to network devices, in addition to what is specified in acls.db.

This is primarily used by AclsDB to populate the implicit ACL-to-device mappings.

No changes should be made to this module. You must specify the path to the autoacl logic inside of settings.py as AUTOACL_FILE. This will be exported as autoacl so that the module path for the autoacl() function will still be trigger.autoacl.autoacl().

This trickery allows us to keep the business-logic for how ACLs are mapped to devices out of the Trigger packaging.

If you do not specify a location for AUTOACL_FILE or the module cannot be loaded, then a default autoacl() function ill be used.

trigger.acl.autoacl.autoacl(dev, explicit_acls=None)

Given a NetDevice object, returns a set of implicit (auto) ACLs. We require a device object so that we don’t have circular dependencies between netdevices and autoacl.

This function MUST return a set() of acl names or you will break the ACL associations. An empty set is fine, but it must be a set!

Parameters:
  • dev – A NetDevice object.
  • explicit_acls – A set containing names of ACLs. Default: set()
>>> dev = nd.find('test1-abc')
>>> dev.vendor
<Vendor: Juniper>
>>> autoacl(dev)
set(['juniper-router-protect', 'juniper-router.policer'])

NOTE: If the default function is returned it does nothing with the arguments and always returns an empty set.

trigger.acl.db

Redis-based replacement of the legacy acls.db file. This is used for interfacing with the explicit and automatic ACL-to-device mappings.

>>> from trigger.netdevices import NetDevices
>>> from trigger.acl.db import AclsDB
>>> nd = NetDevices()
>>> dev = nd.find('test1-abc')
>>> a = AclsDB()
>>> a.get_acl_set(dev)
set(['juniper-router.policer', 'juniper-router-protect', 'abc123'])
>>> a.get_acl_set(dev, 'explicit')
set(['abc123'])
>>> a.get_acl_set(dev, 'implicit')
set(['juniper-router.policer', 'juniper-router-protect'])
>>> a.get_acl_dict(dev)
{'all': set(['abc123', 'juniper-router-protect', 'juniper-router.policer']),
 'explicit': set(['abc123']),
  'implicit': set(['juniper-router-protect', 'juniper-router.policer'])}
trigger.acl.db.get_matching_acls(wanted, exact=True, match_acl=True, match_device=False, nd=None)

Return a sorted list of the names of devices that have at least one of the wanted ACLs, and the ACLs that matched on each. Without ‘exact’, match ACL name by startswith.

To get a list of devices, matching the ACLs specified:

>>> adb.get_matching_acls(['abc123'])
[('fw1-xyz.net.aol.com', ['abc123']), ('test1-abc.net.aol.com', ['abc123'])]

To get a list of ACLS matching the devices specified using an explicit match (default) by setting match_device=True:

>>> adb.get_matching_acls(['test1-abc'], match_device=True)
[]
>>> adb.get_matching_acls(['test1-abc.net.aol.com'], match_device=True)
[('test1-abc.net.aol.com', ['abc123', 'juniper-router-protect',
'juniper-router.policer'])]

To get a list of ACLS matching the devices specified using a partial match. Not how it returns all devices starting with ‘test1-mtc’:

>>> adb.get_matching_acls(['test1-abc'], match_device=True, exact=False)
[('test1-abc.net.aol.com', ['abc123', 'juniper-router-protect',
'juniper-router.policer'])]
trigger.acl.db.get_all_acls(nd=None)

Returns a dict keyed by acl names whose containing a set of NetDevices objects to which each acl is applied.

@nd can be your own NetDevices object if one is not supplied already

>>> all_acls = get_all_acls()
>>> all_acls['abc123']
set([<NetDevice: test1-abc.net.aol.com>, <NetDevice: fw1-xyz.net.aol.com>])
trigger.acl.db.get_bulk_acls(nd=None)

Returns a set of acls with an applied count over settings.AUTOLOAD_BULK_THRESH.

trigger.acl.db.populate_bulk_acls(nd=None)

Given a NetDevices instance, Adds bulk_acls attribute to NetDevice objects.

class trigger.acl.db.AclsDB

Container for ACL operations.

add/remove operations are for explicit associations only.

add_acl(device, acl)

Add explicit acl to device

>>> dev = nd.find('test1-mtc')
>>> a.add_acl(dev, 'acb123')
'added acl abc123 to test1-mtc.net.aol.com'
get_acl_dict(device)

Returns a dict of acl mappings for a @device, which is expected to be a NetDevice object.

>>> a.get_acl_dict(dev)
{'all': set(['115j', 'protectRE', 'protectRE.policer', 'test-bluej',
'testgreenj', 'testops_blockmj']),
'explicit': set(['test-bluej', 'testgreenj', 'testops_blockmj']),
'implicit': set(['115j', 'protectRE', 'protectRE.policer'])}
get_acl_set(device, acl_set='all')

Return an acl set matching @acl_set for a given device. Match can be one of [‘all’, ‘explicit’, ‘implicit’]. Defaults to ‘all’.

>>> a.get_acl_set(dev)
set(['testops_blockmj', 'testgreenj', '115j', 'protectRE',
'protectRE.policer', 'test-bluej'])
>>> a.get_acl_set(dev, 'explicit')
set(['testops_blockmj', 'test-bluej', 'testgreenj'])
>>> a.get_acl_set(dev, 'implicit')
set(['protectRE', 'protectRE.policer', '115j'])
remove_acl(device, acl)

Remove explicit acl from device.

>>> a.remove_acl(dev, 'acb123')
'removed acl abc123 from test1-mtc.net.aol.com'

trigger.acl.parser

Parse and manipulate network access control lists.

This library doesn’t completely follow the border of the valid/invalid ACL set, which is determined by multiple vendors and not completely documented by any of them. We could asymptotically approach that with an enormous amount of testing, although it would require a ‘flavor’ flag (vendor, router model, software version) for full support. The realistic goal is to catch all the errors that we see in practice, and to accept all the ACLs that we use in practice, rather than to try to reject every invalid ACL and accept every valid ACL.

>>> from trigger.acl import parse
>>> aclobj = parse("access-list 123 permit tcp any host 10.20.30.40 eq 80")
>>> aclobj.terms
[<Term: None>]
trigger.acl.parser.literals(d)

Longest match of all the strings that are keys of ‘d’.

trigger.acl.parser.parse(input_data)

Parse a complete ACL and return an ACL object. This should be the only external interface to the parser.

>>> from trigger.acl import parse
>>> aclobj = parse("access-list 123 permit tcp any host 10.20.30.40 eq 80")
>>> aclobj.terms
[<Term: None>]
Parameters:input_data – An ACL policy as a string or file-like object.
trigger.acl.parser.S(prod)

Wrap your grammar token in this to call your helper function with a list of each parsed subtag, instead of the raw text. This is useful for performing modifiers.

Parameters:prod – The parser product.
class trigger.acl.parser.ACL(name=None, terms=None, format=None, family=None, interface_specific=False)

An abstract access-list object intended to be created by the parse() function.

name_terms()

Assign names to all unnamed terms.

output(format=None, *largs, **kwargs)

Output the ACL data in the specified format.

output_ios(replace=False)

Output the ACL in IOS traditional format.

Parameters:replace – If set the ACL is preceded by a no access-list line.
output_ios_brocade(replace=False, receive_acl=False)

Output the ACL in Brocade-flavored IOS format.

The difference between this and “traditional” IOS are:

  • Stripping of comments
  • Appending of ip rebind-acl or ip rebind-receive-acl line
Parameters:
  • replace – If set the ACL is preceded by a no access-list line.
  • receive_acl – If set the ACL is suffixed with a ip rebind-receive-acl' instead of ``ip rebind-acl.
output_ios_named(replace=False)

Output the ACL in IOS named format.

Parameters:replace – If set the ACL is preceded by a no access-list line.
output_iosxr(replace=False)

Output the ACL in IOS XR format.

Parameters:replace – If set the ACL is preceded by a no ipv4 access-list line.
output_junos(replace=False, family=None)

Output the ACL in JunOS format.

Parameters:
  • replace – If set the ACL is wrapped in a firewall { replace: ... } section.
  • family – If set, the value is used to wrap the ACL in a family inet { ...} section.
strip_comments()

Strips all comments from ACL header and all terms.

class trigger.acl.parser.ACLProcessor
class trigger.acl.parser.Comment(data)

Container for inline comments.

output_ios()

Output the Comment to IOS traditional format.

output_ios_named()

Output the Comment to IOS named format.

output_iosxr()

Output the Comment to IOS XR format.

output_junos()

Output the Comment to JunOS format.

class trigger.acl.parser.Matches(d=None, **kwargs)

Container class for Term.match object used for membership tests on access checks.

ios_address_str(addrs)

Convert a list of addresses to IOS-style stupid strings.

Parameters:addrs – List of IP address objects.
ios_port_str(ports)

Convert a list of tuples back to ranges, then to strings.

Parameters:ports – A list of port tuples, e.g. [(0,65535), (1,2)].
junos_str(pair)

Convert a 2-tuple into a hyphenated string, e.g. a range of ports. If not a tuple, tries to treat it as IPs or failing that, casts it to a string.

Parameters:pair – The 2-tuple to convert.
output_ios()

Return a string of IOS ACL bodies.

output_junos()

Return a list that can form the from { ... } clause of the term.

class trigger.acl.parser.Policer(name, data)

Container class for policer policy definitions. This is a dummy class for now, that just passes it through as a string.

class trigger.acl.parser.PolicerGroup(format=None)

Container for Policer objects. Juniper only.

class trigger.acl.parser.Protocol(arg)

A protocol object used for access membership tests in Term objects. Acts like an integer, but stringify into a name if possible.

class trigger.acl.parser.RangeList(data=None)

A type which stores ordered sets, with efficient handling of ranges. It can also store non-incrementable terms as an sorted set without collapsing into ranges.

This is currently used to just store match conditions (e.g. protocols, ports), but could be fleshed out into a general-purpose class. One thing to think about is how/whether to handle a list of tuples as distinct from a list of ranges. Should we just store them as xrange objects? Should the object appear as discrete elements by default, for example in len(), with the collapsed view as a method, or should we keep it as it is now? All the current uses of this class are in this file and have unit tests, so when we decided what the semantics of the generalized module ought to be, we can make it so without worry.

expanded()

Return a list with all ranges converted to discrete elements.

class trigger.acl.parser.Remark(data)

IOS extended ACL “remark” lines automatically become comments when converting to other formats of ACL.

output_ios_named()

Output the Remark to IOS named format.

class trigger.acl.parser.Term(name=None, action='accept', match=None, modifiers=None, inactive=False, isglobal=False, extra=None)

An individual term from which an ACL is made

output(format, *largs, **kwargs)

Output the term to the specified format

Parameters:format – The desired output format.
output_ios(prefix=None, acl_name=None)

Output term to IOS traditional format.

Parameters:
  • prefix – Prefix to use, default: ‘access-list’
  • acl_name – Name of access-list to display
output_ios_named(prefix='', *args, **kwargs)

Output term to IOS named format.

output_iosxr(prefix='', *args, **kwargs)

Output term to IOS XR format.

output_junos(*args, **kwargs)

Convert the term to JunOS format.

set_action_or_modifier(action)

Add or replace a modifier, or set the primary action. This method exists for the convenience of parsers.

class trigger.acl.parser.TermList

Container class for Term objects within an ACL object.

class trigger.acl.parser.TIP(data, **kwargs)

Class based on IPy.IP, but with extensions for Trigger.

Currently, only the only extension is the ability to negate a network block. Only used internally within the parser, as it’s not complete (doesn’t interact well with IPy.IP objects). Does not handle IPv6 yet.

trigger.acl.queue

Database interface for automated ACL queue. Used primarily by load_acl and acl` commands for manipulating the work queue.

>>> from trigger.acl.queue import Queue
>>> q = Queue()
>>> q.list()
(('dc1-abc.net.aol.com', 'datacenter-protect'), ('dc2-abc.net.aol.com',
'datacenter-protect'))
class trigger.acl.queue.Queue(verbose=True)

Interacts with firewalls database to insert/remove items into the queue.

Parameters:verbose (Boolean) – Toggle verbosity
complete(device, acls)

Mark a device and its ACLs as complete using current timestamp.

(Integrated queue only.)

Parameters:
  • device – Device names
  • acls – List of ACL names
create_task(queue, *args, **kwargs)

Create a task in the specified queue.

Parameters:queue – Name of the queue whose object you want
delete(acl, routers=None, escalation=False)

Delete an ACL from the firewall database queue.

Attempts to delete from integrated queue. If ACL test fails or if routers are not specified, the item is deleted from manual queue.

Parameters:
  • acl – ACL name
  • routers – List of device names. If this is ommitted, the manual queue is used.
  • escalation – Whether this is an escalated task
get_model(queue)

Given a queue name, return its DB model.

Parameters:queue – Name of the queue whose object you want
insert(acl, routers, escalation=False)

Insert an ACL and associated devices into the ACL load queue.

Attempts to insert into integrated queue. If ACL test fails, then item is inserted into manual queue.

Parameters:
  • acl – ACL name
  • routers – List of device names
  • escalation – Whether this is an escalated task
list(queue='integrated', escalation=False, q_names=('integrated', 'manual'))

List items in the specified queue, defauls to integrated queue.

Parameters:
  • queue – Name of the queue to list
  • escalation – Whether this is an escalated task
  • q_names – (Optional) List of valid queue names
remove(acl, routers, escalation=False)

Integrated queue only.

Mark an ACL and associated devices as “removed” (loaded=0). Intended for use when performing manual actions on the load queue when troubleshooting or addressing errors with automated loads. This leaves the items in the database but removes them from the active queue.

Parameters:
  • acl – ACL name
  • routers – List of device names
  • escalation – Whether this is an escalated task
vprint(msg)

Print something if verbose instance variable is set.

Parameters:msg – The string to print

trigger.acl.tools

Various tools for use in scripts or other modules. Heavy lifting from tools that have matured over time have been moved into this module.

trigger.acl.tools.create_trigger_term(source_ips=[], dest_ips=[], source_ports=[], dest_ports=[], protocols=[], action=['accept'], name='generated_term')

Constructs & returns a Term object from constituent parts.

trigger.acl.tools.create_access(terms_to_check, new_term)

Breaks a new_term up into separate constituent parts so that they can be compared in a check_access test.

Returns a list of terms that should be inserted.

trigger.acl.tools.check_access(terms_to_check, new_term, quiet=True, format='junos', acl_name=None)

Determine whether access is permitted by a given ACL (list of terms).

Tests a new term against a list of terms. Return True if access in new term is permitted, or False if not.

Optionally displays the terms that apply and what edits are needed.

Parameters:
  • terms_to_check – A list of Term objects to check
  • new_term – The Term object used for the access test
  • quiet – Toggle whether output is displayed
  • format – The ACL format to use for output display
  • acl_name – The ACL name to use for output display
class trigger.acl.tools.ACLScript(acl=None, mode='insert', cmd='acl_script', show_mods=True, no_worklog=False, no_changes=False)

Interface to generating or modifying access-lists. Intended for use in creating command-line utilities using the ACL API.

trigger.acl.tools.process_bulk_loads(work, max_hits=1, force_bulk=False)

Formerly “process –ones”.

Processes work dict and determines tuple of (prefix, site) for each device. Stores tuple as a dict key in prefix_hits. If prefix_hits[(prefix, site)] is greater than max_hits, remove all further matching devices from work dict.

By default if a device has no acls flagged as bulk_acls, it is not removed from the work dict.

Example:
  • Device ‘foo1-xyz.example.com’ returns (‘foo’, ‘xyz’) as tuple.
  • This is stored as prefix_hits[(‘foo’, ‘xyz’)] = 1
  • All further devices matching that tuple increment the hits for that tuple
  • Any devices matching hit counter exceeds max_hits is removed from work dict

You may override max_hits to increase the num. of devices on which to load a bulk acl. You may pass force_bulk=True to treat all loads as bulk loads.

trigger.acl.tools.get_bulk_acls()

Returns a dict of acls with an applied count over settings.AUTOLOAD_BULK_THRESH

trigger.acl.tools.get_comment_matches(aclobj, requests)

Given an ACL object and a list of ticket numbers return a list of matching comments.

trigger.acl.tools.write_tmpacl(acl, process_name='_tmpacl')

Write a temporary file to disk from an Trigger acl.ACL object & return the filename

trigger.acl.tools.diff_files(old, new)

Return a unified diff between two files

trigger.acl.tools.worklog(title, diff, log_string='updated by express-gen')

Save a diff to the ACL worklog

trigger.acl.tools.insert_term_into_acl(new_term, aclobj, debug=False)

Return a new ACL object with the new_term added in the proper place based on the aclobj. Intended to recursively append to an interim ACL object based on a list of Term objects.

It’s safe to assume that this function is incomplete pending better documentation and examples.

Parameters:
  • new_term – The Term object to use for comparison against aclobj
  • aclobj – The original ACL object to use for creation of new_acl

Example:

import copy
# terms_to_be_added is a list of Term objects that is to be added in
# the "right place" into new_acl based on the contents of aclobj
original_acl = parse(open('acl.original'))
new_acl = copy.deepcopy(original_acl) # Dupe the original
for term in terms_to_be_added:
    new_acl = generate_new_acl(term, new_acl)
trigger.acl.tools.create_new_acl(old_file, terms_to_be_added)

Given a list of Term objects call insert_term_into_acl() to determine what needs to be added in based on the contents of old_file. Returns a new ACL object.