-#!/usr/bin/env python3
+#!/usr/bin/env python
""" Representation for connd state as returned by tell-connd --status
(c) Intra2net AG 2015
"""
-# Version History
-# 16/01/15 Christian Herdtweck: started creation
-
+from __future__ import print_function
import subprocess
from re import match as regexp
from os import EX_OK
# constants
-default_tell_connd_binary = '/usr/intranator/bin/tell-connd'
-timeout = 1
+DEFAULT_TELL_CONND_BINARY = '/usr/intranator/bin/tell-connd'
+TIMEOUT = 1
ONLINE_STATE_ALWAYS_ONLINE = 'always online'
ONLINE_STATE_ALWAYS_OFFLINE = 'always offline'
ONLINE_STATE_DIAL_ON_COMMAND = 'dial on command'
ONLINE_STATE_DIAL_ON_DEMAND = 'dial on demand'
-SUBSYS_DNS = 'dns'
-SUBSYS_DYNDNS = 'dyndns'
-SUBSYS_MAIL = 'mail'
-SUBSYS_NTP = 'ntp'
-SUBSYS_SOCKS = 'socks'
-SUBSYS_VPN = 'vpn'
-SUBSYS_WEBPROXY = 'webproxy'
+SUBSYS_DNS = 'dns'
+SUBSYS_DYNDNS = 'dyndns'
+SUBSYS_MAIL = 'mail'
+SUBSYS_NTP = 'ntp'
+SUBSYS_SOCKS = 'socks'
+SUBSYS_VPN = 'vpn'
+SUBSYS_WEBPROXY = 'webproxy'
SUBSYS_PINGCHECK = 'pingcheck'
-SUBSYS_IPONLINE = 'iponline'
-ALL_SUBSYS = (SUBSYS_DNS, SUBSYS_DYNDNS, SUBSYS_MAIL, SUBSYS_NTP, \
- SUBSYS_SOCKS, SUBSYS_VPN, SUBSYS_WEBPROXY, SUBSYS_PINGCHECK, \
+SUBSYS_IPONLINE = 'iponline'
+ALL_SUBSYS = (SUBSYS_DNS, SUBSYS_DYNDNS, SUBSYS_MAIL, SUBSYS_NTP,
+ SUBSYS_SOCKS, SUBSYS_VPN, SUBSYS_WEBPROXY, SUBSYS_PINGCHECK,
SUBSYS_IPONLINE)
-class ConndState:
+ALL_STATES = (ONLINE_STATE_DIAL_ON_DEMAND, ONLINE_STATE_DIAL_ON_COMMAND,
+ ONLINE_STATE_ALWAYS_OFFLINE, ONLINE_STATE_ALWAYS_ONLINE)
+
+
+class ConndState(object):
""" representation of connd's status as returned by tell-connd --status """
online_mode = None
actions = None
online_ips = None
connected_vpns = None
+ log_level = None
+ log_file = None
def __str__(self):
- return '[ConndState: {0} (default {1}), {2} conn\'s, {3} ips, {4} vpns ]'.format(\
- self.online_mode, self.default_provider, len(self.connections), \
- len(self.online_ips), len(self.connected_vpns))
+ return \
+ '[ConndState: {0} (default {1}), {2} conn\'s, {3} ips, {4} vpns ]'\
+ .format(self.online_mode, self.default_provider,
+ len(self.connections), len(self.online_ips),
+ len(self.connected_vpns))
def complete_str(self):
+ """ return a string representating the complete state """
+
# general
- parts = ['ConndState: online mode = "{0}" (default provider: {1})\n'.format(\
- self.online_mode, self.default_provider), ]
+ parts = [
+ 'ConndState: online mode = "{0}" (default provider: {1})\n'
+ .format(self.online_mode, self.default_provider), ]
# subsys
- # ' connctns:
+ # ' connctns: (repeated here for correct aligning)
parts.append(' subsys: online: ')
if self.subsys_online:
for subsys in self.subsys_online:
else:
parts.append('None\n')
for name, info, actions in self.connections[1:]:
- # ' connctns:
- parts.append(' {0}: {1}, {2}\n'.format(name, info, actions))
+ # ' connctns: (repeated here for correct aligning)
+ parts.append(' {0}: {1}, {2}\n'.format(name, info,
+ actions))
# actions
- # ' connctns:
+ # ' connctns: (repeated here for correct aligning)
parts.append(' actions: ')
if self.actions:
parts.append(self.actions[0] + '\n')
else:
parts.append('None\n')
for action in self.actions[1:]:
- # ' connctns:
+ # ' connctns: (repeated here for correct aligning)
parts.append(' {0}\n'.format(action))
# online IPs
- # ' connctns:
+ # ' connctns: (repeated here for correct aligning)
parts.append(' IPs: ')
if self.online_ips:
parts.append(self.online_ips[0])
- for ip in self.online_ips[1:]:
- parts.append(', {0}'.format(ip))
+ for curr_ip in self.online_ips[1:]:
+ parts.append(', {0}'.format(curr_ip))
else:
parts.append('None')
parts.append('\n')
# VPNs
- # ' connctns:
+ # ' connctns: (repeated here for correct aligning)
parts.append(' VPNs: ')
if self.connected_vpns:
parts.append(self.connected_vpns[0])
parts.append('None')
parts.append('\n')
+ # log level and target:
+ # ' connctns: (repeated here for correct aligning)
+ parts.append(' Log: level {0}'.format(self.log_level))
+ if self.log_file:
+ parts.append(' to {0}'.format(self.log_file))
+ parts.append('\n')
return ''.join(parts)
- #end: ConndState.complete_str
-
+ # end: ConndState.complete_str
@staticmethod
- def run_tell_connd(tell_connd_binary=default_tell_connd_binary):
+ def run_tell_connd(tell_connd_binary=DEFAULT_TELL_CONND_BINARY):
""" run tell-connd --status, return output iterator and return code
catches all it can, so should usually return (output, return_code)
if return_code != 0, output's first line(s) is error message
"""
try:
- output = subprocess.check_output([tell_connd_binary, '--status'], \
- stderr=subprocess.STDOUT, universal_newlines=True, shell=False, timeout=timeout)
+ output = subprocess.check_output(
+ [tell_connd_binary, '--status'], stderr=subprocess.STDOUT,
+ universal_newlines=True, shell=False, timeout=TIMEOUT)
return EX_OK, output.splitlines()
- except subprocess.CalledProcessError as cpe: # non-zero return status
- output = ['tell-connd exited with status {0}'.format(cpe.returncode), ]
- output.extend( cpe.output.splitlines() )
+ except subprocess.CalledProcessError as cpe: # non-zero return status
+ output = [
+ 'tell-connd exited with status {0}'.format(cpe.returncode), ]
+ output.extend(cpe.output.splitlines())
return cpe.returncode, output
- except subprocess.TimeoutExpired as te:
- output = ['tell-connd timed out after {0}s. Returning -1'.format(te.timeout), ]
- output.extend( te.output.splitlines() )
- return -1, output
- except Exception as e:
- output = [str(e),]
+ # not python-2-compatible:
+ # except subprocess.TimeoutExpired as texp:
+ # output = [
+ # 'tell-connd timed out after {0}s. Returning -1'.format(
+ # texp.timeout), ]
+ # output.extend(te.output.splitlines())
+ # return -1, output
+ except Exception as exp:
+ output = [str(exp), ]
return -1, output
- #end: ConndState.run_tell_connd
-
+ # end: ConndState.run_tell_connd
@staticmethod
- def get_state(tell_connd_binary=default_tell_connd_binary):
+ def get_state(tell_connd_binary=DEFAULT_TELL_CONND_BINARY):
""" get actual state from tell-connd --status
- returns (err_code, output_lines) if something goes wrong running binary;
- raises assertion if output from tell-connd does not match expected format
+ returns (err_code, output_lines) if something goes wrong running
+ binary; raises assertion if output from tell-connd does not match
+ expected format
"""
state = ConndState()
# first section
line = next(output).strip()
state.online_mode = regexp('online mode\s*:\s*(.+)$', line).groups()[0]
- assert( state.online_mode in (ONLINE_STATE_DIAL_ON_DEMAND, ONLINE_STATE_DIAL_ON_COMMAND, \
- ONLINE_STATE_ALWAYS_OFFLINE, ONLINE_STATE_ALWAYS_ONLINE) )
+ assert state.online_mode in ALL_STATES, \
+ 'unexpected online mode: {0}'.format(state.online_mode)
+
line = next(output).strip()
- state.default_provider = regexp('default provider\s*:\s*(.*)$', \
+ state.default_provider = regexp('default provider\s*:\s*(.*)$',
line).groups()[0]
if len(state.default_provider) == 0:
state.default_provider = None
line = next(output).strip()
- assert( len(line) == 0 )
+ assert len(line) == 0, 'expected empty line, but got {0}'.format(line)
# subsys
line = next(output).strip()
- assert( line == 'subsys' )
+ assert line == 'subsys', 'expected subsys but got {0}'.format(line)
line = next(output).strip()
- state.subsys_online = regexp( 'online\s*:\s*(.*)$', line).groups()[0].split()
+ state.subsys_online = regexp('online\s*:\s*(.*)$', line)\
+ .groups()[0].split()
for subsys in state.subsys_online:
- assert(subsys in ALL_SUBSYS)
+ assert subsys in ALL_SUBSYS, \
+ 'unexpected subsys: {0}'.format(subsys)
line = next(output).strip()
- state.subsys_offline = regexp( 'offline\s*:\s*(.*)$', line).groups()[0].split()
+ state.subsys_offline = regexp('offline\s*:\s*(.*)$', line)\
+ .groups()[0].split()
for subsys in state.subsys_offline:
- assert(subsys in ALL_SUBSYS)
+ assert subsys in ALL_SUBSYS, \
+ 'unexpected subsys: {0}'.format(subsys)
line = next(output).strip()
- state.subsys_disabled = regexp('disabled\s*:\s*(.*)$', line).groups()[0].split()
+ state.subsys_disabled = regexp('disabled\s*:\s*(.*)$', line)\
+ .groups()[0].split()
for subsys in state.subsys_disabled:
- assert(subsys in ALL_SUBSYS)
+ assert subsys in ALL_SUBSYS, \
+ 'unexpected subsys: {0}'.format(subsys)
line = next(output).strip()
- assert( len(line) == 0 )
+ assert len(line) == 0, 'expected empty line, but got {0}'.format(line)
# connection map
state.connections = []
line = next(output).strip()
- assert( line == 'connection map:' )
+ assert line == 'connection map:', \
+ 'expected connection map but got {0}'.format(line)
expect_new = True
for line in output:
line = line.strip()
if expect_new:
if line == 'end of connection map':
break
- conn_name, conn_info = regexp('\[\s*(.+)\s*\]\s*:\s*\(\s*(.*)\s*\)', line).groups()
+ conn_name, conn_info = regexp(
+ '\[\s*(.+)\s*\]\s*:\s*\(\s*(.*)\s*\)', line).groups()
expect_new = False
else:
- conn_actions = regexp('actions\s*:\s*\[\s*(.+)\s*\]', line).groups()
- state.connections.append( (conn_name, conn_info, conn_actions) )
+ conn_actions = regexp('actions\s*:\s*\[\s*(.+)\s*\]', line)\
+ .groups()
+ state.connections.append((conn_name, conn_info, conn_actions))
expect_new = True
- #end: for lines
- assert( expect_new )
+ # end: for lines
+ assert expect_new
line = next(output).strip()
- assert( len(line) == 0 )
+ assert len(line) == 0, 'expected empty line, but got {0}'.format(line)
# actions
line = next(output).strip()
if len(state.actions) == 1 and state.actions[0].strip() == '-':
state.actions = []
line = next(output).strip()
- assert( len(line) == 0 )
+ assert len(line) == 0, 'expected empty line, but got {0}'.format(line)
# online IPs
line = next(output).strip()
- state.online_ips = regexp('list of online ips\s*:\s*(.*)', \
- line).groups()[0].split()
- if len(state.online_ips) == 1 and state.online_ips[0].strip() == 'NONE':
+ state.online_ips = regexp('list of online ips\s*:\s*(.*)', line)\
+ .groups()[0].split()
+ if len(state.online_ips) == 1 \
+ and state.online_ips[0].strip() == 'NONE':
state.online_ips = []
line = next(output).strip()
- assert( len(line) == 0 )
+ assert len(line) == 0, 'expected empty line, but got {0}'.format(line)
# VPNs
state.connected_vpns = []
line = next(output).strip()
- assert( line == 'vpns connected:' )
+ assert line == 'vpns connected:', \
+ 'expected vpns connected, got {0}'.format(line)
for line in output:
line = line.strip()
if len(line) == 0:
break
else:
state.connected_vpns.append(line)
- #end: for lines
+ # end: for lines
+ line = next(output).strip()
+ assert len(line) == 0, 'expected empty line, but got {0}'.format(line)
+
+ # log level
+ line = next(output).strip()
+ state.log_level, state.log_file = \
+ regexp('Logging with level (.+)(?:\s+to\s+(.+))?', line).groups()
# done
line = next(output).strip()
- assert( len(line) == 0 )
+ assert len(line) == 0, 'expected empty line, but got {0}'.format(line)
line = next(output).strip()
- assert( line == 'Done.' )
+ assert line == 'Done.', 'expect Done but got {0}'.format(line)
return state
- #end: ConndState.get_state
+ # end: ConndState.get_state
-#end: class ConndState
+# end: class ConndState
def test():
+ """ get state and print it """
state = ConndState.get_state()
print(state)
print(state.complete_str())
+
def main():
- """ Main function, called when running file as script; runs test()
- """
+ """ Main function, called when running file as script; runs test() """
test()
-#end: function main
+# end: function main
if __name__ == '__main__':
main()
-