#!/usr/bin/env python
# -*- coding: utf-8 -*-
# File: tunnel.py
#
# Copyright 2021 Vincent Schouten
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to
# deal in the Software without restriction, including without limitation the
# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
# sell copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
# DEALINGS IN THE SOFTWARE.
#
"""
Main code for tunnel.
.. _Google Python Style Guide:
http://google.github.io/styleguide/pyguide.html
NOTE: The Tunnel classes are responsible to purge the stream (ie. index in stream is at COMMAND_PROMPT)
"""
# from abc import ABC, abstractmethod
import threading
from time import sleep
import pexpect
from .logging import LoggerMixin
__author__ = '''Vincent Schouten <powermole@protonmail.com>'''
__docformat__ = '''google'''
__date__ = '''10-05-2019'''
__copyright__ = '''Copyright 2021, Vincent Schouten'''
__credits__ = ["Vincent Schouten"]
__license__ = '''MIT'''
__maintainer__ = '''Vincent Schouten'''
__email__ = '''<powermole@protonmail.com>'''
__status__ = '''Development''' # "Prototype", "Development", "Production".
# Constant for Pexpect. This prompt is default for Fedora and CentOS.
COMMAND_PROMPT = '[#$] '
[docs]class Tunnel(LoggerMixin):
"""Establishes a connection to the target destination host via one or more intermediaries.
Be aware, the child's buffer needs to be purged periodically. This can be done by invoking
periodically_purge_buffer(). As verbose mode is enabled for SSH (the child process), it
will slowly fill up the buffer, so this has to be taken care of. But don't invoke this
method before having start()'ed BootstrapAgent.
"""
def __init__(self, path_ssh_cfg, mode, all_host_addr, group_ports, # pylint: disable=too-many-arguments
forward_connections=None):
"""Initializes the Tunnel object.
Args:
path_ssh_cfg (str): Path to the SSH config file that is generated by write_ssh_config_file().
mode (str): Contains any of these values: TOR|FOR|PLAIN.
all_host_addr (list): IP addresses of all hosts (eg. gateway/intermediary and destination hosts).
group_ports (dict): Port names with port numbers.
forward_connections (str): Formatted as "-Lport:host:hostport".
"""
super().__init__()
self.path_ssh_cfg = path_ssh_cfg
self.mode = mode
self.all_host_addr = all_host_addr
self.last_host_addr = all_host_addr[-1]
self.group_ports = group_ports
self.forward_connections = forward_connections
self.inbound_address_socks = 'localhost'
self.child = None
self.thread = None
self.terminate = False
self.authenticated_hosts = []
def __str__(self):
return 'Tunnel'
def _generate_ssh_runtime_param(self):
var_param = None
# block below composes _mode specific_ forwarding strings
if self.mode == 'FOR':
var_param = f'{self.forward_connections} '
elif self.mode == 'TOR':
var_param = f'-L{self.group_ports["local_port_proxy"]}:{self.inbound_address_socks}:' \
f'{self.group_ports["remote_port_proxy"]} '
elif self.mode == 'PLAIN':
var_param = '' # no *additional* ports will be forwarded
if len(self.all_host_addr) == 2:
# the result will be in this format: 'host1 host2' --> '10.10.1.72 10.10.2.92'
order_of_hosts = f'{self.all_host_addr[0]} {self.all_host_addr[1]}'
else:
# the result will be something in this format: 'host1,host2 host3' --> '10.10.1.72,10.10.2.92 10.10.3.52'
order_of_hosts = ''
for i, host in enumerate(self.all_host_addr):
if i == 0:
order_of_hosts += f'{host}'
elif i < len(self.all_host_addr) - 1:
order_of_hosts += f',{host}'
else:
order_of_hosts += f' {host}' # is this branch necessary?
# block below composes _base_ forwarding strings
runtime_param = f'ssh -v -F {self.path_ssh_cfg} ' \
f'-L{self.group_ports["local_port_agent"]}:localhost:' \
f'{self.group_ports["remote_port_agent"]} ' \
f'-L{self.group_ports["local_port_heartbeat"]}:localhost:' \
f'{self.group_ports["remote_port_heartbeat"]} ' \
f'-L{self.group_ports["local_port_command"]}:localhost:' \
f'{self.group_ports["remote_port_command"]} ' \
f'-L{self.group_ports["local_port_transfer"]}:localhost:' \
f'{self.group_ports["remote_port_transfer"]} '
runtime_param += var_param
runtime_param += f'-J {order_of_hosts}'
self._logger.debug(runtime_param)
return runtime_param
[docs] def start(self, debug=None):
"""Establishes an SSH tunnel.
It determines along the way if the authentication process is successful.
In addition, this method and mines for 'Authenticated' keywords, so
we can keep track which hosts have been connected through.
SSH is here a 'child application'.
Args:
debug(basestring): if True, TIMEOUT will not be raised and may block indefinitely. Use only for debugging
purposes to capture the output of the child, which is essentially, hidden 'under the
hood', and write it to a file.
"""
result = False
try:
arguments = {"command": self._generate_ssh_runtime_param(), "env": {"TERM": "dumb"}, "encoding": 'utf-8'}
if debug:
arguments.update({"timeout": 10})
self.child = pexpect.spawn(**arguments)
# setecho() doesn't seem to have effect.
# doc says: Not supported on platforms where isatty() returns False.
# perhaps related to the recursive shells (SSH spawns a new shell in the current shell)
self.child.setecho(False)
self._logger.debug('going through the stream to match patterns: %s', self.all_host_addr)
for hostname in self.all_host_addr:
# according to the documentation, "If you wish to read up to the end of the child's output
# without generating an EOF exception then use the expect(pexpect.EOF) method."
# but apparently this doesn't work in a shell within a shell (SSH spawns a new shell)
index = self.child.expect(
[f'Authenticated to {hostname}', 'Last failed login:', 'Last login:', 'socket error',
'not accessible', 'fingerprint', 'open failed: connect failed:', pexpect.TIMEOUT])
result = False # reset var as this var could be set True in a previous iteration, we want fresh start
if index == 0:
self._logger.info('authenticated to %s', hostname) # logger level is "info" to inform user
self.authenticated_hosts.append(hostname)
result = True
elif index == 1:
self._logger.debug('there were failed login attempts')
result = True
elif index == 2:
self._logger.debug('there were no failed login attempts')
result = True
elif index == 3:
self._logger.error('socket error. probable cause: SSH service on proxy or target machine disabled')
break
elif index == 4:
self._logger.error('the identity file is not accessible')
break
elif index == 5:
self._logger.warning('warning: hostname automatically added to list of known hosts')
self.child.sendline('yes') # security issue
elif index == 6:
self._logger.error('SSH could not connect to %s', hostname)
break
elif index == 7:
self._logger.error('TIMEOUT exception was thrown. SSH could probably not connect to %s', hostname)
break
else:
self._logger.error('unknown state reached')
self.child.expect(COMMAND_PROMPT)
except pexpect.exceptions.ExceptionPexpect:
self._logger.error('EOF is read; SSH has exited abnormally')
self.child.terminate()
if not result:
self._logger.error('debug information: %s', str(self.child))
self.child.terminate()
return result
[docs] def stop(self):
"""Closes the SSH connection essentially by terminating the program SSH."""
self.terminate = True
if self.child.isalive():
self._logger.debug('SSH is alive, terminating')
self.child.terminate()
self._logger.debug('SSH terminated')
return True
[docs] def debug(self):
"""Captures the output of the child (warning: BLOCKING)."""
with open('~/mylog.txt', 'a', encoding='utf-8') as file:
fout = file
self.child.logfile = fout
try:
self.child.readlines()
except pexpect.ExceptionPexpect:
pass
[docs] def periodically_purge_buffer(self):
"""Purges the child's (SSH) output buffer due to buffer limitations."""
self.thread = threading.Thread(target=self._run_purger)
self.thread.start()
def _run_purger(self):
while not self.terminate:
try:
self.child.expect([pexpect.TIMEOUT], timeout=0.2)
sleep(2)
except pexpect.exceptions.EOF:
pass