diff --git a/.gitignore b/.gitignore index 0d20b64..7fdea58 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ *.pyc +*.egg-info diff --git a/README.md b/README.md index 38fadbb..86d25ed 100644 --- a/README.md +++ b/README.md @@ -1,9 +1,14 @@ -#Parallel SSH +Parallel SSH +============ + This is a fork of the wonderful project at http://code.google.com/p/parallel-ssh/ A goal of this fork is to keep up to date with the original repository. Ideally we would set up a trigger so updates from the main project will populate to this repo automatically. Another goal is to submit patches to the original project so we can contribute back. -###Changes -* Adding `padb`. This is a parallel implementation of the Android Device Bridge command to allow for easier management of multiple Android devices via USB or adb over tcp. + +Changes +======= + + * Adding `padb`. This is a parallel implementation of the Android Device Bridge command to allow for easier management of multiple Android devices via USB or adb over tcp. diff --git a/bin/pssh b/bin/pssh index 5b6c2a5..5315926 100755 --- a/bin/pssh +++ b/bin/pssh @@ -42,6 +42,12 @@ def option_parser(): help='read from standard input and send as input to ssh') parser.add_option('-P', '--print', dest='print_out', action='store_true', help='print output as we get it') + parser.add_option('--no-annotate-each-line', + dest='no_annotate_lines', action='store_true', + help="Don't prefix each line of the `-P`rinted output with the host info") + parser.add_option('--no-buffer-each-line', + dest='no_buffer_lines', action='store_true', + help="Don't use line-buffering i.e. allow breaking the lines") return parser @@ -114,5 +120,5 @@ if __name__ == "__main__": sys.exit(1) if opts.host_strings: for s in opts.host_strings: - hosts.extend(psshutil.parse_host_string(s, default_user=opts.user)) + hosts.extend(psshutil.parse_host_string(s, default_user=opts.user, all_addresses=opts.all_addresses)) do_pssh(hosts, cmdline, opts) diff --git a/psshlib/cli.py b/psshlib/cli.py index c342cde..88babd0 100644 --- a/psshlib/cli.py +++ b/psshlib/cli.py @@ -31,6 +31,8 @@ def common_parser(): parser.add_option('-H', '--host', dest='host_strings', action='append', metavar='HOST_STRING', help='additional host entries ("[user@]host[:port]")') + parser.add_option('-a', '--all-addresses', dest='all_addresses', action='store_true', + help='Unpack all addresses in the specified hostnames as hosts') parser.add_option('-l', '--user', dest='user', help='username (OPTIONAL)') parser.add_option('-p', '--par', dest='par', type='int', diff --git a/psshlib/manager.py b/psshlib/manager.py index 7dbf4e3..db35e76 100644 --- a/psshlib/manager.py +++ b/psshlib/manager.py @@ -2,6 +2,7 @@ from errno import EINTR import os +import fcntl import select import signal import sys @@ -209,6 +210,7 @@ def __init__(self): # Setup the wakeup file descriptor to avoid hanging on lost signals. wakeup_readfd, wakeup_writefd = os.pipe() + fcntl.fcntl(wakeup_writefd, fcntl.F_SETFL, os.O_NONBLOCK) self.register_read(wakeup_readfd, self.wakeup_handler) # TODO: remove test when we stop supporting Python <2.5 if hasattr(signal, 'set_wakeup_fd'): diff --git a/psshlib/psshutil.py b/psshlib/psshutil.py index 6c67f66..0f1d8ef 100644 --- a/psshlib/psshutil.py +++ b/psshlib/psshutil.py @@ -2,6 +2,7 @@ # Copyright (c) 2003-2008, Brent N. Chun import fcntl +import socket import string import sys @@ -72,8 +73,9 @@ def parse_host_entry(line, default_user, default_port): return host, port, user -def parse_host_string(host_string, default_user=None, default_port=None): - """Parses a whitespace-delimited string of "[user@]host[:port]" entries. +def parse_host_string(host_string, default_user=None, default_port=None, all_addresses=False): + """ + Parses a whitespace-delimited string of "[user@]host[:port]" entries. Returns a list of (host, port, user) triples. """ @@ -81,6 +83,12 @@ def parse_host_string(host_string, default_user=None, default_port=None): entries = host_string.split() for entry in entries: hosts.append(parse_host(entry, default_user, default_port)) + if all_addresses: + hosts_full = [] + for host, port, user in hosts: + for host_address in get_host_addresses(host, port=port): + hosts_full.append((host_address, port, user)) + hosts = hosts_full return hosts @@ -99,8 +107,25 @@ def parse_host(host, default_user=None, default_port=None): return (host, port, user) +def get_host_addresses(host, port=0, **kwargs): + results = socket.getaddrinfo( + host, + port, + # family= + socket.AF_UNSPEC, + # type= + socket.SOCK_STREAM, + # proto= + socket.IPPROTO_TCP, + # flags= + socket.AI_ADDRCONFIG, + ) + return [item[4][0] for item in results] + + def set_cloexec(filelike): - """Sets the underlying filedescriptor to automatically close on exec. + """ + Sets the underlying filedescriptor to automatically close on exec. If set_cloexec is called for all open files, then subprocess.Popen does not require the close_fds option. diff --git a/psshlib/task.py b/psshlib/task.py index f805256..aef6cee 100644 --- a/psshlib/task.py +++ b/psshlib/task.py @@ -13,22 +13,38 @@ BUFFER_SIZE = 1 << 16 + try: bytes except NameError: bytes = str +DEFAULT_ENCODING = 'utf-8' +OUTPUT_FORMATS = { + 'err': '%(host)s ' + color.B(color.r('=>')) + ' %(line)s', + 'out': '%(host)s ' + color.b(color.g('->')) + ' %(line)s', + '': '%(host)s ' + color.B(color.b('->')) + ' %(line)s', + # 'eco': '%(host)s =: %(line)s', # exit code OK + # 'ece': '%(host)s =: %(line)s', # exit code error +} +OUTPUT_FORMATS = {key: val.encode(DEFAULT_ENCODING) for key, val in OUTPUT_FORMATS.items()} +UNTERMINATED_LINE_MARK = color.b(color.y('\\')).encode(DEFAULT_ENCODING) + + class Task(object): - """Starts a process and manages its input and output. + ''' + Starts a process and manages its input and output. Upon completion, the `exitstatus` attribute is set to the exit status of the process. - """ + ''' + def __init__(self, host, port, user, cmd, opts, stdin=None): self.exitstatus = None self.host = host + self.host_b = host.encode(DEFAULT_ENCODING) self.pretty_host = host self.port = port self.cmd = cmd @@ -46,6 +62,7 @@ def __init__(self, host, port, user, cmd, opts, stdin=None): self.inputbuffer = stdin self.byteswritten = 0 self.outputbuffer = bytes() + self.fd_to_buffer = {} self.errorbuffer = bytes() self.stdin = None @@ -53,6 +70,12 @@ def __init__(self, host, port, user, cmd, opts, stdin=None): self.stderr = None self.outfile = None self.errfile = None + try: + self.outstream = sys.stdout.buffer + self.errstream = sys.stderr.buffer + except AttributeError: # PY2 + self.outstream = sys.stdout + self.errstream = sys.stderr # Set options. self.verbose = opts.verbose @@ -64,13 +87,21 @@ def __init__(self, host, port, user, cmd, opts, stdin=None): self.inline = bool(opts.inline) except AttributeError: self.inline = False + try: + self.annotate_lines = not bool(opts.no_annotate_lines) + except AttributeError: + self.annotate_lines = True + try: + self.buffer_lines = not bool(opts.no_buffer_lines) + except AttributeError: + self.buffer_lines = True try: self.inline_stdout = bool(opts.inline_stdout) except AttributeError: self.inline_stdout = False def start(self, nodenum, iomap, writer, askpass_socket=None): - """Starts the process and registers files with the IOMap.""" + ''' Starts the process and registers files with the IOMap. ''' self.writer = writer if writer: @@ -109,7 +140,7 @@ def start(self, nodenum, iomap, writer, askpass_socket=None): iomap.register_read(self.stderr.fileno(), self.handle_stderr) def _kill(self): - """Signals the process to terminate.""" + ''' Signals the process to terminate. ''' if self.proc: try: os.kill(-self.proc.pid, signal.SIGKILL) @@ -119,27 +150,27 @@ def _kill(self): self.killed = True def timedout(self): - """Kills the process and registers a timeout error.""" + ''' Kills the process and registers a timeout error. ''' if not self.killed: self._kill() self.failures.append('Timed out') def interrupted(self): - """Kills the process and registers an keyboard interrupt error.""" + ''' Kills the process and registers an keyboard interrupt error. ''' if not self.killed: self._kill() self.failures.append('Interrupted') def cancel(self): - """Stops a task that has not started.""" + ''' Stops a task that has not started. ''' self.failures.append('Cancelled') def elapsed(self): - """Finds the time in seconds since the process was started.""" + ''' Finds the time in seconds since the process was started. ''' return time.time() - self.timestamp def running(self): - """Finds if the process has terminated and saves the return code.""" + ''' Finds if the process has terminated and saves the return code. ''' if self.stdin or self.stdout or self.stderr: return True if self.proc: @@ -162,7 +193,7 @@ def running(self): return False def handle_stdin(self, fd, iomap): - """Called when the process's standard input is ready for writing.""" + ''' Called when the process's standard input is ready for writing. ''' try: start = self.byteswritten if start < len(self.inputbuffer): @@ -170,11 +201,11 @@ def handle_stdin(self, fd, iomap): self.byteswritten = start + os.write(fd, chunk) else: self.close_stdin(iomap) - except (OSError, IOError): - _, e, _ = sys.exc_info() - if e.errno != EINTR: + except (OSError, IOError) as exc: + ei = sys.exc_info() + if exc.errno != EINTR: self.close_stdin(iomap) - self.log_exception(e) + self.log_exception(exc, ei=ei) def close_stdin(self, iomap): if self.stdin: @@ -183,7 +214,7 @@ def close_stdin(self, iomap): self.stdin = None def handle_stdout(self, fd, iomap): - """Called when the process's standard output is ready for reading.""" + ''' Called when the process's standard output is ready for reading. ''' try: buf = os.read(fd, BUFFER_SIZE) if buf: @@ -192,16 +223,23 @@ def handle_stdout(self, fd, iomap): if self.outfile: self.writer.write(self.outfile, buf) if self.print_out: - sys.stdout.write('%s: %s' % (self.host, buf)) - if buf[-1] != '\n': - sys.stdout.write('\n') + if True: + self.print_annotated_lines(buf, fd=fd) + else: # This is nearly uselss because it intermixes the hosts' data. + # self.outstream.write(b'%s: %s' % (self.host, buf)) + self.outstream.write(buf) + if buf[-1] != '\n': + self.outstream.write(b'\n') else: + if self.annotate_lines: + # Flush the remaining buffer + self.print_annotated_lines(b'', fd=fd, force_finish=True) self.close_stdout(iomap) - except (OSError, IOError): - _, e, _ = sys.exc_info() - if e.errno != EINTR: + except (OSError, IOError) as exc: + ei = sys.exc_info() + if exc.errno != EINTR: self.close_stdout(iomap) - self.log_exception(e) + self.log_exception(exc) def close_stdout(self, iomap): if self.stdout: @@ -213,7 +251,7 @@ def close_stdout(self, iomap): self.outfile = None def handle_stderr(self, fd, iomap): - """Called when the process's standard error is ready for reading.""" + ''' Called when the process's standard error is ready for reading. ''' try: buf = os.read(fd, BUFFER_SIZE) if buf: @@ -221,13 +259,19 @@ def handle_stderr(self, fd, iomap): self.errorbuffer += buf if self.errfile: self.writer.write(self.errfile, buf) + if self.print_out: + if self.annotate_lines: + self.print_annotated_lines(buf, fd=fd, atype='err') else: + if self.annotate_lines: + # Flush the remaining buffer + self.print_annotated_lines('', fd=fd, atype='err', force_finish=True) self.close_stderr(iomap) - except (OSError, IOError): - _, e, _ = sys.exc_info() - if e.errno != EINTR: - self.close_stderr(iomap) - self.log_exception(e) + except (OSError, IOError) as exc: + ei = sys.exc_info() + if exc.errno != EINTR: + self.close_stdin(iomap) + self.log_exception(exc, ei=ei) def close_stderr(self, iomap): if self.stderr: @@ -238,21 +282,65 @@ def close_stderr(self, iomap): self.writer.close(self.errfile) self.errfile = None - def log_exception(self, e): - """Saves a record of the most recent exception for error reporting.""" + def print_annotated_lines(self, buf, atype='out', fd=None, force_finish=False): + if fd is not None: + buf_pre = self.fd_to_buffer.get((fd, atype), bytes()) + self.fd_to_buffer[(fd, atype)] = bytes() + if buf_pre: + buf = buf_pre + buf + + lines_are_unfinished = buf and (buf[-1:] != b'\n') + lines = buf.splitlines() # .split('\n') + + # Quite a few conditions for putting stuff into the buffer + if (fd is not None and lines_are_unfinished and + self.buffer_lines and not force_finish): + self.fd_to_buffer[(fd, atype)] = lines[-1] + lines = lines[:-1] + lines_are_unfinished = False + + if not lines: + return '' + + outs = [] + sformat = OUTPUT_FORMATS.get(atype) or OUTPUT_FORMATS[''] + for idx, line in enumerate(lines): + if self.annotate_lines: + outline = sformat % {b'line': line, b'host': self.host_b} + else: + outline = line + if idx == len(lines) - 1: # last line + if lines_are_unfinished: + # Sort-of-disambiguate + outline = outline + UNTERMINATED_LINE_MARK + outs.append(outline) + out = b'\n'.join(outs) + b'\n' + if atype == 'err': + outbuf = self.errstream + else: + outbuf = self.outstream + outbuf.write(out) + outbuf.flush() + return out + + def log_exception(self, exc, ei=None): + ''' Saves a record of the most recent exception for error reporting. ''' if self.verbose: - exc_type, exc_value, exc_traceback = sys.exc_info() - exc = ("Exception: %s, %s, %s" % - (exc_type, exc_value, traceback.format_tb(exc_traceback))) + if ei is None: + ei = sys.exc_info() + exc_type, exc_value, exc_traceback = ei + exc = ( + 'Exception: %s, %s, %s' % + (exc_type, exc_value, traceback.format_tb(exc_traceback))) else: - exc = str(e) + exc = str(exc) self.failures.append(exc) def report(self, n): """Pretty prints a status report after the Task completes.""" error = ', '.join(self.failures) - tstamp = time.asctime().split()[3] # Current time - if color.has_colors(sys.stdout): + tstamp = time.asctime().split()[3] # Current time + if color.has_colors(self.outstream): progress = color.c("[%s]" % color.B(n)) success = color.g("[%s]" % color.B("SUCCESS")) failure = color.r("[%s]" % color.B("FAILURE")) @@ -265,24 +353,19 @@ def report(self, n): stderr = "Stderr: " host = self.pretty_host if self.failures: - print(' '.join((progress, tstamp, failure, host, error))) + msg = ' '.join((progress, tstamp, failure, host, error)) else: - print(' '.join((progress, tstamp, success, host))) + msg = ' '.join((progress, tstamp, success, host)) + self.errstream.write(msg.encode(DEFAULT_ENCODING) + b'\n') + self.errstream.flush() # NOTE: The extra flushes are to ensure that the data is output in # the correct order with the C implementation of io. if self.outputbuffer: - sys.stdout.flush() - try: - sys.stdout.buffer.write(self.outputbuffer) - sys.stdout.flush() - except AttributeError: - sys.stdout.write(self.outputbuffer) + self.outstream.flush() + self.outstream.write(self.outputbuffer) + self.outstream.flush() if self.errorbuffer: - sys.stdout.write(stderr) + self.outstream.write(stderr) # Flush the TextIOWrapper before writing to the binary buffer. - sys.stdout.flush() - try: - sys.stdout.buffer.write(self.errorbuffer) - except AttributeError: - sys.stdout.write(self.errorbuffer) - + self.outstream.flush() + self.outstream.write(self.errorbuffer) diff --git a/setup.py b/setup.py index 126e755..f9e386b 100644 --- a/setup.py +++ b/setup.py @@ -1,8 +1,21 @@ -from distutils.core import setup +#!/usr/bin/env python +# coding: utf8 + +try: + from setuptools import setup +except ImportError: + from distutils.core import setup + import os from psshlib import version -long_description = """PSSH (Parallel SSH) provides parallel versions of OpenSSH and related tools, including pssh, pscp, prsync, pnuke, and pslurp. The project includes psshlib which can be used within custom applications.""" +long_description = ( + "PSSH (Parallel SSH) provides parallel versions of OpenSSH and" + " related tools, including pssh, pscp, prsync, pnuke, and pslurp." + " The project includes psshlib which can be used within custom" + " applications." +) + setup( name = "pssh",