#!/usr/bin/env python # The software in this package is distributed under the GNU General # Public License version 2 (with a special exception described below). # # A copy of GNU General Public License (GPL) is included in this distribution, # in the file COPYING.GPL. # # As a special exception, if other files instantiate templates or use macros # or inline functions from this file, or you compile this file and link it # with other works to produce a work based on this file, this file # does not by itself cause the resulting work to be covered # by the GNU General Public License. # # However the source code for this file must still be made available # in accordance with section (3) of the GNU General Public License. # # This exception does not invalidate any other reasons why a work based # on this file might be covered by the GNU General Public License. # # Copyright (c) 2016-2018 Intra2net AG """ Representation for connd state as returned by "tell-connd --status". Copyright: Intra2net AG """ import subprocess from re import match as regexp from os import EX_OK # constants DEFAULT_TELL_CONND_BINARY = '/usr/intranator/bin/tell-connd' TIMEOUT = 10 ONLINE_MODE_ALWAYS_ONLINE = 'always online' ONLINE_MODE_ALWAYS_OFFLINE = 'always offline' ONLINE_MODE_DIAL_ON_COMMAND = 'dial on command' ONLINE_MODE_DIAL_ON_DEMAND = 'dial on demand' SUBSYS_CONND2 = 'connd2' SUBSYS_DNS = 'dns' SUBSYS_DYNDNS = 'dyndns' SUBSYS_MAIL = 'mail' SUBSYS_NTP = 'ntp' SUBSYS_SOCKS = 'socks' SUBSYS_VPN = 'vpn' SUBSYS_WEBPROXY = 'webproxy' SUBSYS_PINGCHECK = 'pingcheck' SUBSYS_IPONLINE = 'iponline' ALL_SUBSYS = (SUBSYS_CONND2, SUBSYS_DNS, SUBSYS_DYNDNS, SUBSYS_MAIL, SUBSYS_NTP, SUBSYS_SOCKS, SUBSYS_VPN, SUBSYS_WEBPROXY, SUBSYS_PINGCHECK, SUBSYS_IPONLINE) ALL_MODES = (ONLINE_MODE_DIAL_ON_DEMAND, ONLINE_MODE_DIAL_ON_COMMAND, ONLINE_MODE_ALWAYS_OFFLINE, ONLINE_MODE_ALWAYS_ONLINE) class ConndState(object): """Representation of connd's status as returned by tell-connd --status.""" online_mode = None default_provider = None subsys_online = None subsys_offline = None subsys_disabled = None connections = None actions = None online_ips = None connected_vpns = None log_level = None log_file = None def __str__(self): return \ '[ConndState: {0} (default {1}), {2} conn\'s, {3} ips, {4} vpns ]' \ .format(self.online_mode, self.default_provider, len(self.connections), len(self.online_ips), len(self.connected_vpns)) def complete_str(self): """Return a string representating the complete state.""" # general parts = [ 'ConndState: online mode = "{0}" (default provider: {1})\n' .format(self.online_mode, self.default_provider), ] # subsys # ' connctns: (repeated here for correct aligning) parts.append(' subsys: online: ') if self.subsys_online: for subsys in self.subsys_online: parts.append(subsys + ' ') else: parts.append('None ') parts.append('; offline: ') if self.subsys_offline: for subsys in self.subsys_offline: parts.append(subsys + ' ') else: parts.append('None ') parts.append('; disabled: ') if self.subsys_disabled: for subsys in self.subsys_disabled: parts.append(subsys + ' ') else: parts.append('None') parts.append('\n') # connections parts.append(' conns: ') if self.connections: name, info, actions = self.connections[0] parts.append('{0}: {1}, {2}\n'.format(name, info, actions)) else: parts.append('None\n') for name, info, actions in self.connections[1:]: # ' connctns: (repeated here for correct aligning) parts.append(' {0}: {1}, {2}\n'.format(name, info, actions)) # actions # ' connctns: (repeated here for correct aligning) parts.append(' actions: ') if self.actions: parts.append(self.actions[0] + '\n') else: parts.append('None\n') for action in self.actions[1:]: # ' connctns: (repeated here for correct aligning) parts.append(' {0}\n'.format(action)) # online IPs # ' connctns: (repeated here for correct aligning) parts.append(' IPs: ') if self.online_ips: parts.append(self.online_ips[0]) for curr_ip in self.online_ips[1:]: parts.append(', {0}'.format(curr_ip)) else: parts.append('None') parts.append('\n') # VPNs # ' connctns: (repeated here for correct aligning) parts.append(' VPNs: ') if self.connected_vpns: parts.append(self.connected_vpns[0]) for vpn in self.connected_vpns[1:]: parts.append(', {0}'.format(vpn)) else: parts.append('None') parts.append('\n') # log level and target: # ' connctns: (repeated here for correct aligning) parts.append(' Log: level {0}'.format(self.log_level)) if self.log_file: parts.append(' to {0}'.format(self.log_file)) parts.append('\n') return ''.join(parts) @staticmethod def run_tell_connd(tell_connd_binary=DEFAULT_TELL_CONND_BINARY, *args): """ Run "tell-connd --status", return output iterator and return code. Catches all it can, so should usually return (output, return_code) where output = [line1, line2, ...] If return_code != 0, output's first line(s) is error message. .. todo:: Use reST parameter description here. """ try: cmd_parts = [tell_connd_binary, ] cmd_parts.extend(*args) output = subprocess.check_output(cmd_parts, stderr=subprocess.STDOUT, universal_newlines=True, shell=False, timeout=TIMEOUT) return EX_OK, output.splitlines() except subprocess.CalledProcessError as cpe: # non-zero return status output = [ 'tell-connd exited with status {0}'.format(cpe.returncode), ] output.extend(cpe.output.splitlines()) return cpe.returncode, output except subprocess.TimeoutExpired as texp: output = [f'tell-connd timed out after {texp.timeout}s. Returning -1', ] output.extend(texp.output.splitlines()) return -1, output except Exception as exp: output = [str(exp), ] return -1, output @staticmethod def get_state(tell_connd_binary=DEFAULT_TELL_CONND_BINARY): """ Get actual state from "tell-connd --status". Returns (err_code, output_lines) if something goes wrong running binary; raises assertion if output from tell-connd does not match expected format. .. todo:: Use reST parameter description here. """ state = ConndState() err_code, all_lines = ConndState.run_tell_connd(tell_connd_binary, ['--status', ]) if err_code != EX_OK: return err_code, all_lines output = iter(all_lines) # first section line = next(output).strip() state.online_mode = regexp(r'online mode\s*:\s*(.+)$', line).groups()[0] assert state.online_mode in ALL_MODES, \ 'unexpected online mode: {0}'.format(state.online_mode) line = next(output).strip() state.default_provider = regexp(r'default provider\s*:\s*(.*)$', line).groups()[0] if len(state.default_provider) == 0: state.default_provider = None line = next(output).strip() assert len(line) == 0, 'expected empty line, but got {0}'.format(line) # subsys line = next(output).strip() assert line == 'subsys', 'expected subsys but got {0}'.format(line) line = next(output).strip() state.subsys_online = regexp(r'online\s*:\s*(.*)$', line) \ .groups()[0].split() for subsys in state.subsys_online: assert subsys in ALL_SUBSYS, \ 'unexpected subsys: {0}'.format(subsys) line = next(output).strip() state.subsys_offline = regexp(r'offline\s*:\s*(.*)$', line) \ .groups()[0].split() for subsys in state.subsys_offline: assert subsys in ALL_SUBSYS, \ 'unexpected subsys: {0}'.format(subsys) line = next(output).strip() state.subsys_disabled = regexp(r'disabled\s*:\s*(.*)$', line) \ .groups()[0].split() for subsys in state.subsys_disabled: assert subsys in ALL_SUBSYS, \ 'unexpected subsys: {0}'.format(subsys) line = next(output).strip() assert len(line) == 0, 'expected empty line, but got {0}'.format(line) # connection map state.connections = [] line = next(output).strip() assert line == 'connection map:', \ 'expected connection map but got {0}'.format(line) expect_new = True conn_name = None conn_info = None for line in output: line = line.strip() if len(line) == 0: continue if expect_new: if line == 'end of connection map': break conn_name, conn_info = regexp( r'\[\s*(.+)\s*\]\s*:\s*\(\s*(.*)\s*\)', line).groups() expect_new = False else: conn_actions = regexp(r'actions\s*:\s*\[\s*(.+)\s*\]', line) \ .groups() assert conn_name is not None and conn_info is not None, \ 'error parsing connection maps' state.connections.append((conn_name, conn_info, conn_actions)) expect_new = True conn_name = None conn_info = None assert expect_new line = next(output).strip() assert len(line) == 0, 'expected empty line, but got {0}'.format(line) # actions line = next(output).strip() state.actions = regexp(r'actions\s*:\s*(.*)', line).groups()[0].split() if len(state.actions) == 1 and state.actions[0].strip() == '-': state.actions = [] line = next(output).strip() assert len(line) == 0, 'expected empty line, but got {0}'.format(line) # online IPs line = next(output).strip() state.online_ips = regexp(r'list of online ips\s*:\s*(.*)', line) \ .groups()[0].split() if len(state.online_ips) == 1 \ and state.online_ips[0].strip() == 'NONE': state.online_ips = [] line = next(output).strip() assert len(line) == 0, 'expected empty line, but got {0}'.format(line) # VPNs state.connected_vpns = [] line = next(output).strip() assert line == 'vpns connected:', \ 'expected vpns connected, got {0}'.format(line) for line in output: line = line.strip() if len(line) == 0: continue elif line == 'end of list of connected vpns': break else: state.connected_vpns.append(line) line = next(output).strip() assert len(line) == 0, 'expected empty line, but got {0}'.format(line) # log level line = next(output).strip() state.log_level, state.log_file = \ regexp(r'Logging with level (.+)(?:\s+to\s+(.+))?', line).groups() # done line = next(output).strip() assert len(line) == 0, 'expected empty line, but got {0}'.format(line) line = next(output).strip() assert line == 'Done.', 'expect Done but got {0}'.format(line) return state @staticmethod def set_online_mode(state, provider=None, tell_connd_binary=DEFAULT_TELL_CONND_BINARY): """ Change online state with optional provider. Provider is silently ignored for ONLINE_MODE_ALWAYS_OFFLINE and otherwise required. Returns result of :py:func:`run_tell_connd`: (error_code, output_lines). """ # check args need_provider = True if state == ONLINE_MODE_DIAL_ON_DEMAND: args = ['--dial-on-demand', provider] elif state == ONLINE_MODE_DIAL_ON_COMMAND: args = ['--dial-on-command', provider] elif state == ONLINE_MODE_ALWAYS_ONLINE: args = ['--online', provider] elif state == ONLINE_MODE_ALWAYS_OFFLINE: args = ['--offline', ] need_provider = False else: raise ValueError('unknown state: {0}!'.format(state)) if need_provider and not provider: raise ValueError('Given state {0} requires a provider!'.format( state)) # run binary return ConndState.run_tell_connd(tell_connd_binary, args) def test(): """Get state and print it.""" state = ConndState.get_state() if not isinstance(state, ConndState): err_code, output_lines = state print('tell-connd failed with error code {0} and output:'.format( err_code)) for line in output_lines: print('tell-connd: {0}'.format(line)) print('(end of tell-connd output)') else: print(state) print(state.complete_str()) def main(): """Main function, called when running file as script; runs test().""" test() if __name__ == '__main__': main()