trigger.acl — ACL parsing library

Trigger’s ACL parser.

This library contains various modules that allow for parsing, manipulation, and management of network access control lists (ACLs). It will parse a complete ACL and return an ACL object that can be easily translated to any supported vendor syntax.

trigger.acl.parse(input_data)

Parse a complete ACL and return an ACL object. This should be the only external interface to the parser.

Parameters:input_data – An ACL policy as a string or file-like object.
class trigger.acl.ACL(name=None, terms=None, format=None, family=None)

An abstract access-list object intended to be created by the parse() function.

name_terms()

Assign names to all unnamed terms.

output(format=None, *largs, **kwargs)

Output the ACL data in the specified format.

output_ios(replace=False)

Output the ACL in IOS traditional format.

Parameters:replace – If set the ACL is preceded by a no access-list line.
output_ios_brocade(replace=False, receive_acl=False)

Output the ACL in Brocade-flavored IOS format.

The differences between this and “traditional” IOS are:

  • Stripping of comments
  • Appending of ip rebind-acl or ip rebind-receive-acl line
Parameters:
  • replace – If set, the ACL is preceded by a no access-list line.
  • receive_acl – If set, the ACL is suffixed with an ip rebind-receive-acl line instead of ip rebind-acl.
output_ios_named(replace=False)

Output the ACL in IOS named format.

Parameters:replace – If set the ACL is preceded by a no access-list line.
output_iosxr(replace=False)

Output the ACL in IOS XR format.

Parameters:replace – If set the ACL is preceded by a no ipv4 access-list line.
output_junos(replace=False, family=None)

Output the ACL in JunOS format.

Parameters:
  • replace – If set the ACL is wrapped in a firewall { replace: ... } section.
  • family – If set, the value is used to wrap the ACL in a family inet { ...} section.
strip_comments()

Strips all comments from ACL header and all terms.

trigger.acl.autoacl

This module controls when ACLs get auto-applied to network devices, in addition to what is specified in acls.db.

This is primarily used by AclsDB to populate the implicit ACL-to-device mappings.

No changes should be made to this module. You must specify the path to the autoacl logic inside of settings.py as AUTOACL_FILE. This will be exported as autoacl so that the module path for the autoacl() function will still be trigger.acl.autoacl.autoacl().

This trickery allows us to keep the business-logic for how ACLs are mapped to devices out of the Trigger packaging.

If you do not specify a location for AUTOACL_FILE or the module cannot be loaded, then a default autoacl() function will be used.

trigger.acl.autoacl.autoacl(dev, explicit_acls=None)

Given a NetDevice object, returns a set of implicit (auto) ACLs. We require a device object so that we don’t have circular dependencies between netdevices and autoacl.

This function MUST return a set() of acl names or you will break the ACL associations. An empty set is fine, but it must be a set!

Parameters:
  • dev – A NetDevice object.
  • explicit_acls – A set containing names of ACLs. Default: set()
>>> dev = nd.find('test1-abc')
>>> dev.vendor
<Vendor: Juniper>
>>> autoacl(dev)
set(['juniper-router-protect', 'juniper-router.policer'])

NOTE: If the default function is used, it ignores its arguments and always returns an empty set.
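
As an illustration of the contract described above, here is a minimal sketch of what a site-specific autoacl() might look like. It is not Trigger's own code: FakeDevice is a hypothetical stand-in for a NetDevice, its vendor attribute is assumed to be a plain string, and the single Juniper rule is invented for the example. The one point taken from the documentation is that the function must always return a set.

```python
from collections import namedtuple

# Hypothetical stand-in for a NetDevice; only the attributes used below.
# In this sketch `vendor` is a plain string (an assumption).
FakeDevice = namedtuple("FakeDevice", ["name", "vendor"])


def autoacl(dev, explicit_acls=None):
    """Return the set of implicit (auto) ACL names for ``dev``."""
    # Accepted to mirror the documented signature; unused in this sketch.
    if explicit_acls is None:
        explicit_acls = set()

    acls = set()

    # Illustrative rule only: Juniper devices get the protection ACLs.
    if str(dev.vendor).lower() == "juniper":
        acls.add("juniper-router-protect")
        acls.add("juniper-router.policer")

    # Always return a set, even when no rule matched.
    return acls
```

A device that matches no rule gets an empty set back, which the documentation states is acceptable.
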

trigger.acl.db

Redis-based replacement of the legacy acls.db file. This is used for interfacing with the explicit and automatic ACL-to-device mappings.

>>> from trigger.netdevices import NetDevices
>>> from trigger.acl.db import AclsDB
>>> nd = NetDevices()
>>> dev = nd.find('test1-abc')
>>> a = AclsDB()
>>> a.get_acl_set(dev)
set(['juniper-router.policer', 'juniper-router-protect', 'abc123'])
>>> a.get_acl_set(dev, 'explicit')
set(['abc123'])
>>> a.get_acl_set(dev, 'implicit')
set(['juniper-router.policer', 'juniper-router-protect'])
>>> a.get_acl_dict(dev)
{'all': set(['abc123', 'juniper-router-protect', 'juniper-router.policer']),
 'explicit': set(['abc123']),
  'implicit': set(['juniper-router-protect', 'juniper-router.policer'])}
trigger.acl.db.get_matching_acls(wanted, exact=True, match_acl=True, match_device=False, nd=None)

Return a sorted list of the names of devices that have at least one of the wanted ACLs, and the ACLs that matched on each. If exact is False, names are matched by prefix (startswith) instead of by equality.

To get a list of devices, matching the ACLs specified:

>>> adb.get_matching_acls(['abc123'])
[('fw1-xyz.net.aol.com', ['abc123']), ('test1-abc.net.aol.com', ['abc123'])]

To get a list of ACLs matching the devices specified using an exact match (the default), set match_device=True:

>>> adb.get_matching_acls(['test1-abc'], match_device=True)
[]
>>> adb.get_matching_acls(['test1-abc.net.aol.com'], match_device=True)
[('test1-abc.net.aol.com', ['abc123', 'juniper-router-protect',
'juniper-router.policer'])]

To get a list of ACLs matching the devices specified using a partial match, pass exact=False. Note how it returns all devices starting with ‘test1-abc’:

>>> adb.get_matching_acls(['test1-abc'], match_device=True, exact=False)
[('test1-abc.net.aol.com', ['abc123', 'juniper-router-protect',
'juniper-router.policer'])]
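
The difference between exact and prefix matching shown in the examples above can be sketched in isolation. This is a simplified illustration, not the library's implementation: match_names is a hypothetical helper, and the device list is taken from the sample output above.

```python
def match_names(wanted, candidates, exact=True):
    """Return the sorted candidates matched by any name in ``wanted``."""
    matched = set()
    for name in candidates:
        for want in wanted:
            # Exact mode compares whole names; otherwise match by prefix.
            hit = (name == want) if exact else name.startswith(want)
            if hit:
                matched.add(name)
    return sorted(matched)


devices = ["fw1-xyz.net.aol.com", "test1-abc.net.aol.com"]
print(match_names(["test1-abc"], devices))               # []
print(match_names(["test1-abc"], devices, exact=False))  # ['test1-abc.net.aol.com']
```

The short hostname only matches once exact=False is given, which mirrors the empty result for 'test1-abc' in the exact-match example.
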
trigger.acl.db.get_all_acls(nd=None)

Returns a dict keyed by ACL name, whose values are sets of NetDevice objects to which each ACL is applied.

@nd can be your own NetDevices object if one is not supplied already

>>> all_acls = get_all_acls()
>>> all_acls['abc123']
set([<NetDevice: test1-abc.net.aol.com>, <NetDevice: fw1-xyz.net.aol.com>])
trigger.acl.db.get_bulk_acls(nd=None)

Returns a set of acls with an applied count over settings.AUTOLOAD_BULK_THRESH.
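
The threshold rule can be sketched as follows. This is a minimal illustration under stated assumptions: bulk_acls_over is a hypothetical helper, the input is a plain dict shaped like the return value of get_all_acls() (ACL name mapped to a set of devices), and "over" is read as strictly greater than the threshold.

```python
def bulk_acls_over(acl_to_devices, threshold):
    """Return the names of ACLs applied to more than ``threshold`` devices."""
    return {
        acl
        for acl, devices in acl_to_devices.items()
        if len(devices) > threshold  # strict comparison is an assumption
    }


applied = {
    "abc123": {"test1-abc", "fw1-xyz"},
    "juniper-router-protect": {"test1-abc"},
}
print(bulk_acls_over(applied, threshold=1))  # {'abc123'}
```
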

trigger.acl.db.populate_bulk_acls(nd=None)

Given a NetDevices instance, adds a bulk_acls attribute to NetDevice objects.

class trigger.acl.db.AclsDB

Container for ACL operations.

add/remove operations are for explicit associations only.

add_acl(device, acl)

Add explicit acl to device

>>> dev = nd.find('test1-mtc')
>>> a.add_acl(dev, 'abc123')
'added acl abc123 to test1-mtc.net.aol.com'
get_acl_dict(device)

Returns a dict of acl mappings for a @device, which is expected to be a NetDevice object.

>>> a.get_acl_dict(dev)
{'all': set(['115j', 'protectRE', 'protectRE.policer', 'test-bluej',
'testgreenj', 'testops_blockmj']),
'explicit': set(['test-bluej', 'testgreenj', 'testops_blockmj']),
'implicit': set(['115j', 'protectRE', 'protectRE.policer'])}
get_acl_set(device, acl_set='all')

Return an acl set matching @acl_set for a given device. Match can be one of [‘all’, ‘explicit’, ‘implicit’]. Defaults to ‘all’.

>>> a.get_acl_set(dev)
set(['testops_blockmj', 'testgreenj', '115j', 'protectRE',
'protectRE.policer', 'test-bluej'])
>>> a.get_acl_set(dev, 'explicit')
set(['testops_blockmj', 'test-bluej', 'testgreenj'])
>>> a.get_acl_set(dev, 'implicit')
set(['protectRE', 'protectRE.policer', '115j'])
remove_acl(device, acl)

Remove explicit acl from device.

>>> a.remove_acl(dev, 'abc123')
'removed acl abc123 from test1-mtc.net.aol.com'

trigger.acl.parser

Parse and manipulate network access control lists.

This library doesn’t completely follow the border of the valid/invalid ACL set, which is determined by multiple vendors and not completely documented by any of them. We could asymptotically approach that with an enormous amount of testing, although it would require a ‘flavor’ flag (vendor, router model, software version) for full support. The realistic goal is to catch all the errors that we see in practice, and to accept all the ACLs that we use in practice, rather than to try to reject every invalid ACL and accept every valid ACL.

trigger.acl.parser.parse(input_data)

Parse a complete ACL and return an ACL object. This should be the only external interface to the parser.

Parameters:input_data – An ACL policy as a string or file-like object.
class trigger.acl.parser.Comment(data)

Container for inline comments.

output_ios()

Output the Comment to IOS traditional format.

output_ios_named()

Output the Comment to IOS named format.

output_iosxr()

Output the Comment to IOS XR format.

output_junos()

Output the Comment to JunOS format.

class trigger.acl.parser.Term(name=None, action='accept', match=None, modifiers=None, inactive=False, isglobal=False, extra=None)

An individual term from which an ACL is made

output(format, *largs, **kwargs)

Output the term to the specified format

Parameters:format – The desired output format.
output_ios(prefix=None, acl_name=None)

Output term to IOS traditional format.

Parameters:
  • prefix – Prefix to use, default: ‘access-list’
  • acl_name – Name of access-list to display
output_ios_named(prefix='', *args, **kwargs)

Output term to IOS named format.

output_iosxr(prefix='', *args, **kwargs)

Output term to IOS XR format.

output_junos(*args, **kwargs)

Convert the term to JunOS format.

set_action_or_modifier(action)

Add or replace a modifier, or set the primary action. This method exists for the convenience of parsers.

class trigger.acl.parser.Protocol(arg)

A protocol object used for access membership tests in Term objects. Acts like an integer, but stringifies into a name if possible.
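
The "acts like an integer, prints as a name" behavior can be illustrated with a small int subclass. This is a sketch of the idea only: ProtocolSketch is a hypothetical class, and whether the real Protocol subclasses int or how it looks up names is not stated in this document. The three number-to-name pairs are the standard IANA assignments.

```python
class ProtocolSketch(int):
    """An integer that renders as a protocol name when one is known."""

    # Small illustrative table; a real table would be much larger.
    NAMES = {1: "icmp", 6: "tcp", 17: "udp"}

    def __str__(self):
        # Fall back to the plain number when no name is known.
        return self.NAMES.get(int(self), int.__repr__(self))


tcp = ProtocolSketch(6)
print(tcp == 6)                  # True
print(tcp in (6, 17))            # True
print(str(tcp))                  # tcp
print(str(ProtocolSketch(250)))  # 250
```

Because the object compares equal to a plain integer, membership tests against lists of protocol numbers keep working, while output stays readable.
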

class trigger.acl.parser.ACL(name=None, terms=None, format=None, family=None)

An abstract access-list object intended to be created by the parse() function.

name_terms()

Assign names to all unnamed terms.

output(format=None, *largs, **kwargs)

Output the ACL data in the specified format.

output_ios(replace=False)

Output the ACL in IOS traditional format.

Parameters:replace – If set the ACL is preceded by a no access-list line.
output_ios_brocade(replace=False, receive_acl=False)

Output the ACL in Brocade-flavored IOS format.

The differences between this and “traditional” IOS are:

  • Stripping of comments
  • Appending of ip rebind-acl or ip rebind-receive-acl line
Parameters:
  • replace – If set, the ACL is preceded by a no access-list line.
  • receive_acl – If set, the ACL is suffixed with an ip rebind-receive-acl line instead of ip rebind-acl.
output_ios_named(replace=False)

Output the ACL in IOS named format.

Parameters:replace – If set the ACL is preceded by a no access-list line.
output_iosxr(replace=False)

Output the ACL in IOS XR format.

Parameters:replace – If set the ACL is preceded by a no ipv4 access-list line.
output_junos(replace=False, family=None)

Output the ACL in JunOS format.

Parameters:
  • replace – If set the ACL is wrapped in a firewall { replace: ... } section.
  • family – If set, the value is used to wrap the ACL in a family inet { ...} section.
strip_comments()

Strips all comments from ACL header and all terms.

trigger.acl.parser.literals(d)

Longest match of all the strings that are keys of ‘d’.

trigger.acl.parser.IP(arg)

Wrapper for IPy.IP to intercept exception text and make it more user-friendly.

class trigger.acl.parser.Policer(name, data)

Container class for policer policy definitions. This is a dummy class for now, that just passes it through as a string.

class trigger.acl.parser.PolicerGroup(format=None)

Container for Policer objects. Juniper only.

trigger.acl.parser.S(prod)

Wrap your grammar token in this to call your helper function with a list of each parsed subtag, instead of the raw text. This is useful for performing modifiers.

Parameters:prod – The parser product.

trigger.acl.queue

Database interface for automated ACL queue. Used primarily by the load_acl and acl commands for manipulating the work queue.

>>> from trigger.acl.queue import Queue
>>> q = Queue()
>>> q.list()
(('dc1-abc.net.aol.com', 'datacenter-protect'), ('dc2-abc.net.aol.com',
'datacenter-protect'))
class trigger.acl.queue.Queue(verbose=True)

Interacts with firewalls database to insert/remove items into the queue. You may optionally suppress informational messages by passing verbose=False to the constructor.

Parameters:verbose (Boolean) – Toggle verbosity
complete(device, acls)

Integrated queue only.

Mark a device and associated ACLs as complete by updating loaded to the current timestamp. Migrated from clear_load_queue() in load_acl.

delete(acl, routers=None, escalation=False)

Delete an ACL from the firewall database queue.

Attempts to delete from integrated queue. If ACL test fails, then item is deleted from manual queue.

insert(acl, routers, escalation=False)

Insert an ACL and associated devices into the ACL load queue.

Attempts to insert into integrated queue. If ACL test fails, then item is inserted into manual queue.

list(queue='integrated', escalation=False)

List items in the queue; defaults to the integrated queue.

Valid queue arguments are ‘integrated’ or ‘manual’.

remove(acl, routers, escalation=False)

Integrated queue only.

Mark an ACL and associated devices as “removed” (loaded=0). Intended for use when performing manual actions on the load queue when troubleshooting or addressing errors with automated loads. This leaves the items in the database but removes them from the active queue.

trigger.acl.tools

Various tools for use in scripts or other modules. Heavy lifting from tools that have matured over time has been moved into this module.

trigger.acl.tools.create_trigger_term(source_ips=[], dest_ips=[], source_ports=[], dest_ports=[], protocols=[], action=['accept'], name='generated_term')

Constructs & returns a Term object from constituent parts.

trigger.acl.tools.create_access(terms_to_check, new_term)

Breaks a new_term up into separate constituent parts so that they can be compared in a check_access test.

Returns a list of terms that should be inserted.

trigger.acl.tools.check_access(terms_to_check, new_term, quiet=True, format='junos', acl_name=None)

Determine whether access is permitted by a given ACL (list of terms).

Tests a new term against a list of terms. Return True if access in new term is permitted, or False if not.

Optionally displays the terms that apply and what edits are needed.

Parameters:
  • terms_to_check – A list of Term objects to check
  • new_term – The Term object used for the access test
  • quiet – Toggle whether output is displayed
  • format – The ACL format to use for output display
  • acl_name – The ACL name to use for output display
class trigger.acl.tools.ACLScript(acl=None, mode='insert', cmd='acl_script', show_mods=True, no_worklog=False, no_changes=False)

Interface to generating or modifying access-lists. Intended for use in creating command-line utilities using the ACL API.

trigger.acl.tools.process_bulk_loads(work, max_hits=1, force_bulk=False)

Formerly “process --ones”.

Processes work dict and determines tuple of (prefix, site) for each device. Stores tuple as a dict key in prefix_hits. If prefix_hits[(prefix, site)] is greater than max_hits, remove all further matching devices from work dict.

By default if a device has no acls flagged as bulk_acls, it is not removed from the work dict.

Example:
  • Device ‘foo1-xyz.example.com’ returns (‘foo’, ‘xyz’) as tuple.
  • This is stored as prefix_hits[(‘foo’, ‘xyz’)] = 1
  • All further devices matching that tuple increment the hits for that tuple
  • Any device whose hit counter exceeds max_hits is removed from the work dict

You may override max_hits to increase the num. of devices on which to load a bulk acl. You may pass force_bulk=True to treat all loads as bulk loads.
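
The hit-counting described above can be sketched as a standalone function. This is a simplified illustration, not the library's code: limit_by_prefix_site and HOST_RE are hypothetical names, the hostname pattern is an assumption based on the 'foo1-xyz.example.com' example, work is assumed to map hostnames to lists of ACL names, and the bulk_acls check and force_bulk option are left out.

```python
import re

# Assumed hostname shape: <prefix><digits>-<site>.<domain>
HOST_RE = re.compile(r"^([a-z]+)\d+-([a-z]+)\.", re.IGNORECASE)


def limit_by_prefix_site(work, max_hits=1):
    """Keep at most ``max_hits`` devices per (prefix, site) tuple."""
    prefix_hits = {}
    kept = {}
    for hostname, acls in work.items():
        match = HOST_RE.match(hostname)
        if match is None:
            # Hostname does not fit the assumed shape; leave it in place.
            kept[hostname] = acls
            continue
        key = (match.group(1), match.group(2))
        prefix_hits[key] = prefix_hits.get(key, 0) + 1
        # Devices beyond the limit for this tuple are dropped.
        if prefix_hits[key] <= max_hits:
            kept[hostname] = acls
    return kept


work = {
    "foo1-xyz.example.com": ["abc123"],
    "foo2-xyz.example.com": ["abc123"],
    "bar1-xyz.example.com": ["abc123"],
}
print(sorted(limit_by_prefix_site(work)))
# ['bar1-xyz.example.com', 'foo1-xyz.example.com']
```

With the default max_hits=1, the second 'foo' device at site 'xyz' is dropped; raising max_hits to 2 keeps all three.
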

trigger.acl.tools.get_bulk_acls()

Returns a dict of acls with an applied count over settings.AUTOLOAD_BULK_THRESH

trigger.acl.tools.get_comment_matches(aclobj, requests)

Given an ACL object and a list of ticket numbers return a list of matching comments.

trigger.acl.tools.write_tmpacl(acl, process_name='_tmpacl')

Write a temporary file to disk from a Trigger acl.ACL object & return the filename

trigger.acl.tools.diff_files(old, new)

Return a unified diff between two files
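
For readers unfamiliar with unified diffs, here is a minimal sketch of how one can be produced with the standard library's difflib. It is not necessarily how diff_files() is implemented; unified_diff_files is a hypothetical name, and returning the diff as a single string is an assumption.

```python
import difflib


def unified_diff_files(old, new):
    """Return a unified diff of two text files as a single string."""
    with open(old) as handle:
        old_lines = handle.readlines()
    with open(new) as handle:
        new_lines = handle.readlines()
    # unified_diff yields lines lazily; join them into one string.
    return "".join(
        difflib.unified_diff(old_lines, new_lines, fromfile=old, tofile=new)
    )
```

Identical files produce an empty string, which makes the result easy to test for "no changes".
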

trigger.acl.tools.worklog(title, diff, log_string='updated by express-gen')

Save a diff to the ACL worklog

trigger.acl.tools.insert_term_into_acl(new_term, aclobj, debug=False)

Return a new ACL object with the new_term added in the proper place based on the aclobj. Intended to recursively append to an interim ACL object based on a list of Term objects.

It’s safe to assume that this function is incomplete pending better documentation and examples.

Parameters:
  • new_term – The Term object to use for comparison against aclobj
  • aclobj – The original ACL object to use for creation of new_acl

Example:

import copy

from trigger.acl import parse
from trigger.acl.tools import insert_term_into_acl

# terms_to_be_added is a list of Term objects to be added in the
# "right place" in new_acl, based on the contents of aclobj
original_acl = parse(open('acl.original'))
new_acl = copy.deepcopy(original_acl)  # Dupe the original
for term in terms_to_be_added:
    new_acl = insert_term_into_acl(term, new_acl)
trigger.acl.tools.create_new_acl(old_file, terms_to_be_added)

Given a list of Term objects call insert_term_into_acl() to determine what needs to be added in based on the contents of old_file. Returns a new ACL object.