captive.whump.shanti-portal/plugins/iptables.py

153 lines
4.1 KiB
Python
Raw Normal View History

# Add an iptables rule
2016-12-08 13:25:45 +00:00
# This actually runs a command, so you can either define an iptables
# command or a script. See the plugins.cfg for the options that are
# replaced into the command line.
2016-04-16 19:31:11 +00:00
import re
2016-04-17 09:21:37 +00:00
import socket
2016-04-16 19:31:11 +00:00
from io import BytesIO
from logging import getLogger, DEBUG, WARN, INFO
2016-04-17 09:21:37 +00:00
try:
2016-04-18 19:33:35 +00:00
from configparser import RawConfigParser
2016-04-17 09:21:37 +00:00
except ImportError:
from ConfigParser import RawConfigParser
from portal import logHandler, logFormatter
2016-04-16 19:31:11 +00:00
# Try to import arping for mac_from_ip()
2016-12-08 13:25:45 +00:00
use_arping = True
try:
from sh import arping
except ImportError:
2016-12-08 13:25:45 +00:00
use_arping = False
pass
2016-04-16 19:31:11 +00:00
# By default run iptables through sudo, so the worker process must run with
# a user that is allowed to execute iptables commands with NOPASSWD.
from sh import sudo, ErrorReturnCode
def run(arg):
2016-04-16 19:31:11 +00:00
# Some info from the plugin dispatcher.
environ = arg['environ']
2016-04-17 09:21:37 +00:00
plugin_config = arg['config']
config = RawConfigParser(defaults=plugin_config)
config.add_section('iptables')
config._sections['iptables'] = plugin_config
2016-04-16 19:31:11 +00:00
# Setup plugin logging
l = getLogger('plugin_iptables')
l.addHandler(logHandler)
if config.getboolean('iptables', 'debug'):
l.setLevel(DEBUG)
l.debug('debug logging enabled')
2017-03-03 00:03:56 +00:00
# Get client IP from webapp, try HTTP_X_FORWARDED_FOR and fallback on
# REMOTE_ADDR.
2016-04-16 22:22:13 +00:00
client_ip = environ.get(
'HTTP_X_FORWARDED_FOR',
environ.get('REMOTE_ADDR')
)
2016-04-16 19:31:11 +00:00
client_mac = None
error_msg = None
iptables_failed = False
2016-12-08 13:25:45 +00:00
# Verify client IP
2016-04-17 09:21:37 +00:00
try:
socket.inet_aton(client_ip)
except socket.error:
2017-11-16 22:55:53 +00:00
l.error('Client ip:{ip} is invalid'.format(
ip=repr(client_ip)
))
2016-04-17 09:21:37 +00:00
return {
'error': str(e),
'failed': True
}
2016-12-08 13:25:45 +00:00
# Attempt to get client HW address with arping
if use_arping:
try:
client_mac = mac_from_ip(
l,
config.get('iptables', 'arping'),
client_ip
)
except Exception as e:
2017-11-15 22:11:51 +00:00
l.info('Failed to get client HW address: {error}'.format(
2016-12-08 13:25:45 +00:00
error=str(e)
))
error_msg = str(e)
pass
2016-04-16 19:31:11 +00:00
if client_ip:
iptables_cmd = config.get('iptables', 'iptables_cmd').format(
ip_address=client_ip,
mac_address=client_mac
2016-04-16 19:31:11 +00:00
)
output = BytesIO()
error = BytesIO()
try:
# The two arguments must not contain spaces of course.
rc = sudo(tuple(iptables_cmd.split(' ')), _out=output, _err=error)
2016-04-16 19:31:11 +00:00
except ErrorReturnCode:
error.seek(0)
error_msg = error.read()
l.warn('{cmd}: exited badly: {error}'.format(
cmd=('iptables', iptables_cmd),
2016-04-16 19:31:11 +00:00
error=error_msg
))
iptables_failed = True
2017-11-15 22:11:51 +00:00
raise
2016-04-16 19:31:11 +00:00
except Exception as e:
l.warn('{cmd}: failed: {error}'.format(
cmd=('iptables', iptables_cmd),
2016-04-16 19:31:11 +00:00
error=str(e)
))
error_msg = str(e)
iptables_failed = True
2017-11-15 22:11:51 +00:00
raise
2016-04-16 19:31:11 +00:00
2016-12-08 15:15:14 +00:00
if rc.exit_code == 0:
2017-11-16 22:55:53 +00:00
l.debug('Created iptables rule for client:{ip}'.format(
ip=client_ip
))
2016-12-08 15:15:14 +00:00
2016-04-16 19:31:11 +00:00
# If all else fails, error! This will be shown to end users.
return {
'error': error_msg,
'failed': iptables_failed
}
# Try to get the mac address from the IP. Requires arping installed.
2016-04-16 19:31:11 +00:00
def mac_from_ip(l, arping_args, ip):
# Add IP to args
arping_args = arping_args.format(
ip_address=ip
)
# Make args into tuple
arping_args = tuple(arping_args.split(' '))
# Save output as buffer object
output = BytesIO()
# Run arping
arping(arping_args, _out=output)
output.seek(0)
# Parse HW address from output
for _line in output:
line = _line.decode('utf-8')
2016-04-16 19:31:11 +00:00
line_start = 'Unicast reply from {ip} '.format(
ip=ip
)
if line.startswith(line_start):
m = re.search('(([0-9A-Fa-f]{2}[:]){5}([0-9A-Fa-f]{2}))', line)
if m: return m.group(0)