From 6ea518e10a7f28acdedff278dd5a93f6ffa4ec7a Mon Sep 17 00:00:00 2001 From: HoverHell Date: Thu, 17 Apr 2014 14:02:03 +0400 Subject: [PATCH 01/11] setup.py minor improvements --- setup.py | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/setup.py b/setup.py index 126e755..f9e386b 100644 --- a/setup.py +++ b/setup.py @@ -1,8 +1,21 @@ -from distutils.core import setup +#!/usr/bin/env python +# coding: utf8 + +try: + from setuptools import setup +except ImportError: + from distutils.core import setup + import os from psshlib import version -long_description = """PSSH (Parallel SSH) provides parallel versions of OpenSSH and related tools, including pssh, pscp, prsync, pnuke, and pslurp. The project includes psshlib which can be used within custom applications.""" +long_description = ( + "PSSH (Parallel SSH) provides parallel versions of OpenSSH and" + " related tools, including pssh, pscp, prsync, pnuke, and pslurp." + " The project includes psshlib which can be used within custom" + " applications." +) + setup( name = "pssh", From d262c87ef9d71e6d27dbfacf5764a54304520e14 Mon Sep 17 00:00:00 2001 From: HoverHell Date: Thu, 17 Apr 2014 14:02:39 +0400 Subject: [PATCH 02/11] annotate each line in the output --- .gitignore | 1 + psshlib/task.py | 40 +++++++++++++++++++++++++++++++++++++--- 2 files changed, 38 insertions(+), 3 deletions(-) diff --git a/.gitignore b/.gitignore index 0d20b64..7fdea58 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ *.pyc +*.egg-info diff --git a/psshlib/task.py b/psshlib/task.py index f805256..b6e221f 100644 --- a/psshlib/task.py +++ b/psshlib/task.py @@ -12,6 +12,7 @@ from psshlib import color BUFFER_SIZE = 1 << 16 +ANNOTATE_LINES = True try: bytes @@ -192,9 +193,12 @@ def handle_stdout(self, fd, iomap): if self.outfile: self.writer.write(self.outfile, buf) if self.print_out: - sys.stdout.write('%s: %s' % (self.host, buf)) - if buf[-1] != '\n': - sys.stdout.write('\n') + if ANNOTATE_LINES: + self.print_annotated_lines(buf) + else: + sys.stdout.write('%s: %s' % (self.host, buf)) + if buf[-1] != '\n': + sys.stdout.write('\n') else: self.close_stdout(iomap) except (OSError, IOError): @@ -221,6 +225,8 @@ def handle_stderr(self, fd, iomap): self.errorbuffer += buf if self.errfile: self.writer.write(self.errfile, buf) + if self.print_out: + self.print_annotated_lines(buf, atype='err') else: self.close_stderr(iomap) except (OSError, IOError): @@ -238,6 +244,34 @@ def close_stderr(self, iomap): self.writer.close(self.errfile) self.errfile = None + def print_annotated_lines(self, buf, atype='out'): + lines_are_unfinished = (buf[-1] != '\n') + lines = buf.splitlines() ## .split('\n') + outs = [] + formats = { + 'err': '%(host)s ' + color.B(color.r('=>')) + ' %(line)s', + 'out': '%(host)s ' + color.b(color.g('->')) + ' %(line)s', + '': '%(host)s ' + color.B(color.b('->')) + ' %(line)s', + #'eco': '%(host)s =: %(line)s', # exit code OK + #'ece': '%(host)s =: %(line)s', # exit code error + } + sformat = formats.get(atype) or formats[''] + for i, line in enumerate(lines): + outline = sformat % dict(line=line, host=self.host) + if i == len(lines) - 1: ## last line + if lines_are_unfinished: + ## Sort-of-disambiguate + outline = outline + color.b(color.y('\\')) + outs.append(outline) + out = '\n'.join(outs) + '\n' + if atype == 'err': + outbuf = sys.stderr + else: + outbuf = sys.stdout + outbuf.write(out) + outbuf.flush() + return out + def log_exception(self, e): """Saves a record of the most recent exception for error reporting.""" if self.verbose: From 71cd4ccff33b38bd44dc32eda413c2d097db3d2e Mon Sep 17 00:00:00 2001 From: HoverHell Date: Sat, 19 Apr 2014 10:30:01 +0400 Subject: [PATCH 03/11] put 'annotate_lines' in options as enabled by default --- bin/pssh | 3 +++ psshlib/task.py | 11 ++++++++--- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/bin/pssh b/bin/pssh index 5b6c2a5..5060205 100755 --- a/bin/pssh +++ b/bin/pssh @@ -42,6 +42,9 @@ def option_parser(): help='read from standard input and send as input to ssh') parser.add_option('-P', '--print', dest='print_out', action='store_true', help='print output as we get it') + parser.add_option('--no-annotate-each-line', + dest='no_annotate_lines', action='store_true', + help="Don't prefix each line of the `-P`rinted output with the host info") return parser diff --git a/psshlib/task.py b/psshlib/task.py index b6e221f..585875b 100644 --- a/psshlib/task.py +++ b/psshlib/task.py @@ -12,7 +12,7 @@ from psshlib import color BUFFER_SIZE = 1 << 16 -ANNOTATE_LINES = True + try: bytes @@ -65,6 +65,10 @@ def __init__(self, host, port, user, cmd, opts, stdin=None): self.inline = bool(opts.inline) except AttributeError: self.inline = False + try: + self.annotate_lines = not bool(opts.no_annotate_lines) + except AttributeError: + self.annotate_lines = True try: self.inline_stdout = bool(opts.inline_stdout) except AttributeError: @@ -193,7 +197,7 @@ def handle_stdout(self, fd, iomap): if self.outfile: self.writer.write(self.outfile, buf) if self.print_out: - if ANNOTATE_LINES: + if self.annotate_lines: self.print_annotated_lines(buf) else: sys.stdout.write('%s: %s' % (self.host, buf)) @@ -226,7 +230,8 @@ def handle_stderr(self, fd, iomap): if self.errfile: self.writer.write(self.errfile, buf) if self.print_out: - self.print_annotated_lines(buf, atype='err') + if self.annotate_lines: + self.print_annotated_lines(buf, atype='err') else: self.close_stderr(iomap) except (OSError, IOError): From d87a0d199a9ac3d6075b13eb9caeedab925f84bf Mon Sep 17 00:00:00 2001 From: HoverHell Date: Sun, 21 Jun 2015 20:14:52 +0300 Subject: [PATCH 04/11] annotated output: line-buffered --- bin/pssh | 3 +++ psshlib/task.py | 35 ++++++++++++++++++++++++++++------- 2 files changed, 31 insertions(+), 7 deletions(-) diff --git a/bin/pssh b/bin/pssh index 5060205..4f1cfc1 100755 --- a/bin/pssh +++ b/bin/pssh @@ -45,6 +45,9 @@ def option_parser(): parser.add_option('--no-annotate-each-line', dest='no_annotate_lines', action='store_true', help="Don't prefix each line of the `-P`rinted output with the host info") + parser.add_option('--no-buffer-each-line', + dest='no_buffer_lines', action='store_true', + help="Don't use line-buffering i.e. allow breaking the lines") return parser diff --git a/psshlib/task.py b/psshlib/task.py index 585875b..5e6dcfa 100644 --- a/psshlib/task.py +++ b/psshlib/task.py @@ -47,6 +47,7 @@ def __init__(self, host, port, user, cmd, opts, stdin=None): self.inputbuffer = stdin self.byteswritten = 0 self.outputbuffer = bytes() + self.fd_to_buffer = {} self.errorbuffer = bytes() self.stdin = None @@ -69,6 +70,10 @@ def __init__(self, host, port, user, cmd, opts, stdin=None): self.annotate_lines = not bool(opts.no_annotate_lines) except AttributeError: self.annotate_lines = True + try: + self.buffer_lines = not bool(opts.no_buffer_lines) + except AttributeError: + self.buffer_lines = True try: self.inline_stdout = bool(opts.inline_stdout) except AttributeError: @@ -199,6 +204,7 @@ def handle_stdout(self, fd, iomap): if self.print_out: if self.annotate_lines: self.print_annotated_lines(buf) + self.print_annotated_lines(buf, fd=fd) else: sys.stdout.write('%s: %s' % (self.host, buf)) if buf[-1] != '\n': @@ -249,23 +255,38 @@ def close_stderr(self, iomap): self.writer.close(self.errfile) self.errfile = None - def print_annotated_lines(self, buf, atype='out'): + def print_annotated_lines(self, buf, atype='out', fd=None): lines_are_unfinished = (buf[-1] != '\n') - lines = buf.splitlines() ## .split('\n') + if fd is not None: + buf_pre = self.fd_to_buffer.get(fd, bytes()) + self.fd_to_buffer[fd] = bytes() + if buf_pre: + buf = buf_pre + buf + + lines = buf.splitlines() # .split('\n') + + if lines_are_unfinished and self.buffer_lines: + self.fd_to_buffer[fd] = lines[-1] + lines = lines[:-1] + lines_are_unfinished = False + + if not lines: + return '' + outs = [] formats = { 'err': '%(host)s ' + color.B(color.r('=>')) + ' %(line)s', 'out': '%(host)s ' + color.b(color.g('->')) + ' %(line)s', '': '%(host)s ' + color.B(color.b('->')) + ' %(line)s', - #'eco': '%(host)s =: %(line)s', # exit code OK - #'ece': '%(host)s =: %(line)s', # exit code error + # 'eco': '%(host)s =: %(line)s', # exit code OK + # 'ece': '%(host)s =: %(line)s', # exit code error } sformat = formats.get(atype) or formats[''] for i, line in enumerate(lines): outline = sformat % dict(line=line, host=self.host) - if i == len(lines) - 1: ## last line + if i == len(lines) - 1: # last line if lines_are_unfinished: - ## Sort-of-disambiguate + # Sort-of-disambiguate outline = outline + color.b(color.y('\\')) outs.append(outline) out = '\n'.join(outs) + '\n' @@ -282,7 +303,7 @@ def log_exception(self, e): if self.verbose: exc_type, exc_value, exc_traceback = sys.exc_info() exc = ("Exception: %s, %s, %s" % - (exc_type, exc_value, traceback.format_tb(exc_traceback))) + (exc_type, exc_value, traceback.format_tb(exc_traceback))) else: exc = str(e) self.failures.append(exc) From fcc256b272479931319a8277ed1fc3b3d565df92 Mon Sep 17 00:00:00 2001 From: HoverHell Date: Tue, 23 Jun 2015 11:10:07 +0300 Subject: [PATCH 05/11] Fix of the line-buffering --- psshlib/task.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/psshlib/task.py b/psshlib/task.py index 5e6dcfa..a78bf89 100644 --- a/psshlib/task.py +++ b/psshlib/task.py @@ -203,7 +203,6 @@ def handle_stdout(self, fd, iomap): self.writer.write(self.outfile, buf) if self.print_out: if self.annotate_lines: - self.print_annotated_lines(buf) self.print_annotated_lines(buf, fd=fd) else: sys.stdout.write('%s: %s' % (self.host, buf)) @@ -237,7 +236,7 @@ def handle_stderr(self, fd, iomap): self.writer.write(self.errfile, buf) if self.print_out: if self.annotate_lines: - self.print_annotated_lines(buf, atype='err') + self.print_annotated_lines(buf, fd=fd, atype='err') else: self.close_stderr(iomap) except (OSError, IOError): @@ -265,7 +264,7 @@ def print_annotated_lines(self, buf, atype='out', fd=None): lines = buf.splitlines() # .split('\n') - if lines_are_unfinished and self.buffer_lines: + if fd is not None and lines_are_unfinished and self.buffer_lines: self.fd_to_buffer[fd] = lines[-1] lines = lines[:-1] lines_are_unfinished = False From 78b34cf8d55deacbc6f301e4fa3beb55d14a345b Mon Sep 17 00:00:00 2001 From: HoverHell Date: Mon, 21 Sep 2015 17:04:27 +0300 Subject: [PATCH 06/11] Fix: another fix of the line-buffering: flush the buffer at the end --- psshlib/task.py | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/psshlib/task.py b/psshlib/task.py index a78bf89..34d26c1 100644 --- a/psshlib/task.py +++ b/psshlib/task.py @@ -209,6 +209,9 @@ def handle_stdout(self, fd, iomap): if buf[-1] != '\n': sys.stdout.write('\n') else: + if self.annotate_lines: + # Flush the remaining buffer + self.print_annotated_lines('', fd=fd, force_finish=True) self.close_stdout(iomap) except (OSError, IOError): _, e, _ = sys.exc_info() @@ -238,6 +241,9 @@ def handle_stderr(self, fd, iomap): if self.annotate_lines: self.print_annotated_lines(buf, fd=fd, atype='err') else: + if self.annotate_lines: + # Flush the remaining buffer + self.print_annotated_lines('', fd=fd, atype='err', force_finish=True) self.close_stderr(iomap) except (OSError, IOError): _, e, _ = sys.exc_info() @@ -254,18 +260,20 @@ def close_stderr(self, iomap): self.writer.close(self.errfile) self.errfile = None - def print_annotated_lines(self, buf, atype='out', fd=None): - lines_are_unfinished = (buf[-1] != '\n') + def print_annotated_lines(self, buf, atype='out', fd=None, force_finish=False): if fd is not None: - buf_pre = self.fd_to_buffer.get(fd, bytes()) - self.fd_to_buffer[fd] = bytes() + buf_pre = self.fd_to_buffer.get((fd, atype), bytes()) + self.fd_to_buffer[(fd, atype)] = bytes() if buf_pre: buf = buf_pre + buf + lines_are_unfinished = buf and (buf[-1] != '\n') lines = buf.splitlines() # .split('\n') - if fd is not None and lines_are_unfinished and self.buffer_lines: - self.fd_to_buffer[fd] = lines[-1] + # Quite a few conditions for putting stuff into the buffer + if (fd is not None and lines_are_unfinished and + self.buffer_lines and not force_finish): + self.fd_to_buffer[(fd, atype)] = lines[-1] lines = lines[:-1] lines_are_unfinished = False From 36a7131ae850fba79a761beb1d1d8292fa8aecc5 Mon Sep 17 00:00:00 2001 From: HoverHell Date: Tue, 6 Oct 2015 15:10:00 +0300 Subject: [PATCH 07/11] TASK: write the success/failure reporting to stderr --- psshlib/task.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/psshlib/task.py b/psshlib/task.py index 34d26c1..6a2c93d 100644 --- a/psshlib/task.py +++ b/psshlib/task.py @@ -318,7 +318,7 @@ def log_exception(self, e): def report(self, n): """Pretty prints a status report after the Task completes.""" error = ', '.join(self.failures) - tstamp = time.asctime().split()[3] # Current time + tstamp = time.asctime().split()[3] # Current time if color.has_colors(sys.stdout): progress = color.c("[%s]" % color.B(n)) success = color.g("[%s]" % color.B("SUCCESS")) @@ -332,9 +332,11 @@ def report(self, n): stderr = "Stderr: " host = self.pretty_host if self.failures: - print(' '.join((progress, tstamp, failure, host, error))) + msg = ' '.join((progress, tstamp, failure, host, error)) else: - print(' '.join((progress, tstamp, success, host))) + msg = ' '.join((progress, tstamp, success, host)) + sys.stderr.write(msg + '\n') + sys.stderr.flush() # NOTE: The extra flushes are to ensure that the data is output in # the correct order with the C implementation of io. if self.outputbuffer: From 1b0d011dff8d6c018d8c265e8ba4ed4a88f42903 Mon Sep 17 00:00:00 2001 From: HoverHell Date: Mon, 30 Jul 2018 14:29:02 +0300 Subject: [PATCH 08/11] py3 support --- psshlib/manager.py | 2 + psshlib/task.py | 142 ++++++++++++++++++++++++--------------------- 2 files changed, 78 insertions(+), 66 deletions(-) diff --git a/psshlib/manager.py b/psshlib/manager.py index 7dbf4e3..db35e76 100644 --- a/psshlib/manager.py +++ b/psshlib/manager.py @@ -2,6 +2,7 @@ from errno import EINTR import os +import fcntl import select import signal import sys @@ -209,6 +210,7 @@ def __init__(self): # Setup the wakeup file descriptor to avoid hanging on lost signals. wakeup_readfd, wakeup_writefd = os.pipe() + fcntl.fcntl(wakeup_writefd, fcntl.F_SETFL, os.O_NONBLOCK) self.register_read(wakeup_readfd, self.wakeup_handler) # TODO: remove test when we stop supporting Python <2.5 if hasattr(signal, 'set_wakeup_fd'): diff --git a/psshlib/task.py b/psshlib/task.py index 6a2c93d..50fc3ab 100644 --- a/psshlib/task.py +++ b/psshlib/task.py @@ -20,16 +20,31 @@ bytes = str +DEFAULT_ENCODING = 'utf-8' +OUTPUT_FORMATS = { + 'err': '%(host)s ' + color.B(color.r('=>')) + ' %(line)s', + 'out': '%(host)s ' + color.b(color.g('->')) + ' %(line)s', + '': '%(host)s ' + color.B(color.b('->')) + ' %(line)s', + # 'eco': '%(host)s =: %(line)s', # exit code OK + # 'ece': '%(host)s =: %(line)s', # exit code error +} +OUTPUT_FORMATS = {key: val.encode(DEFAULT_ENCODING) for key, val in OUTPUT_FORMATS.items()} +UNTERMINATED_LINE_MARK = color.b(color.y('\\')).encode(DEFAULT_ENCODING) + + class Task(object): - """Starts a process and manages its input and output. + ''' + Starts a process and manages its input and output. Upon completion, the `exitstatus` attribute is set to the exit status of the process. - """ + ''' + def __init__(self, host, port, user, cmd, opts, stdin=None): self.exitstatus = None self.host = host + self.host_b = host.encode(DEFAULT_ENCODING) self.pretty_host = host self.port = port self.cmd = cmd @@ -55,6 +70,12 @@ def __init__(self, host, port, user, cmd, opts, stdin=None): self.stderr = None self.outfile = None self.errfile = None + try: + self.outstream = sys.stdout.buffer + self.errstream = sys.stderr.buffer + except AttributeError: # PY2 + self.outstream = sys.stdout + self.errstream = sys.stderr # Set options. self.verbose = opts.verbose @@ -80,7 +101,7 @@ def __init__(self, host, port, user, cmd, opts, stdin=None): self.inline_stdout = False def start(self, nodenum, iomap, writer, askpass_socket=None): - """Starts the process and registers files with the IOMap.""" + ''' Starts the process and registers files with the IOMap. ''' self.writer = writer if writer: @@ -119,7 +140,7 @@ def start(self, nodenum, iomap, writer, askpass_socket=None): iomap.register_read(self.stderr.fileno(), self.handle_stderr) def _kill(self): - """Signals the process to terminate.""" + ''' Signals the process to terminate. ''' if self.proc: try: os.kill(-self.proc.pid, signal.SIGKILL) @@ -129,27 +150,27 @@ def _kill(self): self.killed = True def timedout(self): - """Kills the process and registers a timeout error.""" + ''' Kills the process and registers a timeout error. ''' if not self.killed: self._kill() self.failures.append('Timed out') def interrupted(self): - """Kills the process and registers an keyboard interrupt error.""" + ''' Kills the process and registers an keyboard interrupt error. ''' if not self.killed: self._kill() self.failures.append('Interrupted') def cancel(self): - """Stops a task that has not started.""" + ''' Stops a task that has not started. ''' self.failures.append('Cancelled') def elapsed(self): - """Finds the time in seconds since the process was started.""" + ''' Finds the time in seconds since the process was started. ''' return time.time() - self.timestamp def running(self): - """Finds if the process has terminated and saves the return code.""" + ''' Finds if the process has terminated and saves the return code. ''' if self.stdin or self.stdout or self.stderr: return True if self.proc: @@ -172,7 +193,7 @@ def running(self): return False def handle_stdin(self, fd, iomap): - """Called when the process's standard input is ready for writing.""" + ''' Called when the process's standard input is ready for writing. ''' try: start = self.byteswritten if start < len(self.inputbuffer): @@ -180,11 +201,11 @@ def handle_stdin(self, fd, iomap): self.byteswritten = start + os.write(fd, chunk) else: self.close_stdin(iomap) - except (OSError, IOError): - _, e, _ = sys.exc_info() - if e.errno != EINTR: + except (OSError, IOError) as exc: + ei = sys.exc_info() + if exc.errno != EINTR: self.close_stdin(iomap) - self.log_exception(e) + self.log_exception(exc, ei=ei) def close_stdin(self, iomap): if self.stdin: @@ -193,7 +214,7 @@ def close_stdin(self, iomap): self.stdin = None def handle_stdout(self, fd, iomap): - """Called when the process's standard output is ready for reading.""" + ''' Called when the process's standard output is ready for reading. ''' try: buf = os.read(fd, BUFFER_SIZE) if buf: @@ -205,19 +226,19 @@ def handle_stdout(self, fd, iomap): if self.annotate_lines: self.print_annotated_lines(buf, fd=fd) else: - sys.stdout.write('%s: %s' % (self.host, buf)) + self.outstream.write(b'%s: %s' % (self.host, buf)) if buf[-1] != '\n': - sys.stdout.write('\n') + self.outstream.write(b'\n') else: if self.annotate_lines: # Flush the remaining buffer - self.print_annotated_lines('', fd=fd, force_finish=True) + self.print_annotated_lines(b'', fd=fd, force_finish=True) self.close_stdout(iomap) - except (OSError, IOError): - _, e, _ = sys.exc_info() - if e.errno != EINTR: + except (OSError, IOError) as exc: + ei = sys.exc_info() + if exc.errno != EINTR: self.close_stdout(iomap) - self.log_exception(e) + self.log_exception(exc) def close_stdout(self, iomap): if self.stdout: @@ -229,7 +250,7 @@ def close_stdout(self, iomap): self.outfile = None def handle_stderr(self, fd, iomap): - """Called when the process's standard error is ready for reading.""" + ''' Called when the process's standard error is ready for reading. ''' try: buf = os.read(fd, BUFFER_SIZE) if buf: @@ -245,11 +266,11 @@ def handle_stderr(self, fd, iomap): # Flush the remaining buffer self.print_annotated_lines('', fd=fd, atype='err', force_finish=True) self.close_stderr(iomap) - except (OSError, IOError): - _, e, _ = sys.exc_info() - if e.errno != EINTR: - self.close_stderr(iomap) - self.log_exception(e) + except (OSError, IOError) as exc: + ei = sys.exc_info() + if exc.errno != EINTR: + self.close_stdin(iomap) + self.log_exception(exc, ei=ei) def close_stderr(self, iomap): if self.stderr: @@ -267,7 +288,7 @@ def print_annotated_lines(self, buf, atype='out', fd=None, force_finish=False): if buf_pre: buf = buf_pre + buf - lines_are_unfinished = buf and (buf[-1] != '\n') + lines_are_unfinished = buf and (buf[-1:] != b'\n') lines = buf.splitlines() # .split('\n') # Quite a few conditions for putting stuff into the buffer @@ -281,45 +302,41 @@ def print_annotated_lines(self, buf, atype='out', fd=None, force_finish=False): return '' outs = [] - formats = { - 'err': '%(host)s ' + color.B(color.r('=>')) + ' %(line)s', - 'out': '%(host)s ' + color.b(color.g('->')) + ' %(line)s', - '': '%(host)s ' + color.B(color.b('->')) + ' %(line)s', - # 'eco': '%(host)s =: %(line)s', # exit code OK - # 'ece': '%(host)s =: %(line)s', # exit code error - } - sformat = formats.get(atype) or formats[''] - for i, line in enumerate(lines): - outline = sformat % dict(line=line, host=self.host) - if i == len(lines) - 1: # last line + sformat = OUTPUT_FORMATS.get(atype) or OUTPUT_FORMATS[''] + for idx, line in enumerate(lines): + outline = sformat % {b'line': line, b'host': self.host_b} + if idx == len(lines) - 1: # last line if lines_are_unfinished: # Sort-of-disambiguate - outline = outline + color.b(color.y('\\')) + outline = outline + UNTERMINATED_LINE_MARK outs.append(outline) - out = '\n'.join(outs) + '\n' + out = b'\n'.join(outs) + b'\n' if atype == 'err': - outbuf = sys.stderr + outbuf = self.errstream else: - outbuf = sys.stdout + outbuf = self.outstream outbuf.write(out) outbuf.flush() return out - def log_exception(self, e): - """Saves a record of the most recent exception for error reporting.""" + def log_exception(self, exc, ei=None): + ''' Saves a record of the most recent exception for error reporting. ''' if self.verbose: - exc_type, exc_value, exc_traceback = sys.exc_info() - exc = ("Exception: %s, %s, %s" % - (exc_type, exc_value, traceback.format_tb(exc_traceback))) + if ei is None: + ei = sys.exc_info() + exc_type, exc_value, exc_traceback = ei + exc = ( + 'Exception: %s, %s, %s' % + (exc_type, exc_value, traceback.format_tb(exc_traceback))) else: - exc = str(e) + exc = str(exc) self.failures.append(exc) def report(self, n): """Pretty prints a status report after the Task completes.""" error = ', '.join(self.failures) tstamp = time.asctime().split()[3] # Current time - if color.has_colors(sys.stdout): + if color.has_colors(self.outstream): progress = color.c("[%s]" % color.B(n)) success = color.g("[%s]" % color.B("SUCCESS")) failure = color.r("[%s]" % color.B("FAILURE")) @@ -335,23 +352,16 @@ def report(self, n): msg = ' '.join((progress, tstamp, failure, host, error)) else: msg = ' '.join((progress, tstamp, success, host)) - sys.stderr.write(msg + '\n') - sys.stderr.flush() + self.errstream.write(msg.encode(DEFAULT_ENCODING) + b'\n') + self.errstream.flush() # NOTE: The extra flushes are to ensure that the data is output in # the correct order with the C implementation of io. if self.outputbuffer: - sys.stdout.flush() - try: - sys.stdout.buffer.write(self.outputbuffer) - sys.stdout.flush() - except AttributeError: - sys.stdout.write(self.outputbuffer) + self.outstream.flush() + self.outstream.write(self.outputbuffer) + self.outstream.flush() if self.errorbuffer: - sys.stdout.write(stderr) + self.outstream.write(stderr) # Flush the TextIOWrapper before writing to the binary buffer. - sys.stdout.flush() - try: - sys.stdout.buffer.write(self.errorbuffer) - except AttributeError: - sys.stdout.write(self.errorbuffer) - + self.outstream.flush() + self.outstream.write(self.errorbuffer) From e89edd90cb2454ac153f2aeafaaaa83648575f30 Mon Sep 17 00:00:00 2001 From: HoverHell Date: Sun, 24 Mar 2019 18:37:18 +0300 Subject: [PATCH 09/11] readme format fix --- README.md | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 38fadbb..86d25ed 100644 --- a/README.md +++ b/README.md @@ -1,9 +1,14 @@ -#Parallel SSH +Parallel SSH +============ + This is a fork of the wonderful project at http://code.google.com/p/parallel-ssh/ A goal of this fork is to keep up to date with the original repository. Ideally we would set up a trigger so updates from the main project will populate to this repo automatically. Another goal is to submit patches to the original project so we can contribute back. -###Changes -* Adding `padb`. This is a parallel implementation of the Android Device Bridge command to allow for easier management of multiple Android devices via USB or adb over tcp. + +Changes +======= + + * Adding `padb`. This is a parallel implementation of the Android Device Bridge command to allow for easier management of multiple Android devices via USB or adb over tcp. From f667845ff32f0767e48a0452b250ab9c609e797a Mon Sep 17 00:00:00 2001 From: hoverhell Date: Wed, 4 Sep 2019 15:15:32 +0300 Subject: [PATCH 10/11] Option to unpack hostnames into multiple addresses (minimal version) --- bin/pssh | 2 +- psshlib/cli.py | 2 ++ psshlib/psshutil.py | 31 ++++++++++++++++++++++++++++--- 3 files changed, 31 insertions(+), 4 deletions(-) diff --git a/bin/pssh b/bin/pssh index 4f1cfc1..5315926 100755 --- a/bin/pssh +++ b/bin/pssh @@ -120,5 +120,5 @@ if __name__ == "__main__": sys.exit(1) if opts.host_strings: for s in opts.host_strings: - hosts.extend(psshutil.parse_host_string(s, default_user=opts.user)) + hosts.extend(psshutil.parse_host_string(s, default_user=opts.user, all_addresses=opts.all_addresses)) do_pssh(hosts, cmdline, opts) diff --git a/psshlib/cli.py b/psshlib/cli.py index c342cde..88babd0 100644 --- a/psshlib/cli.py +++ b/psshlib/cli.py @@ -31,6 +31,8 @@ def common_parser(): parser.add_option('-H', '--host', dest='host_strings', action='append', metavar='HOST_STRING', help='additional host entries ("[user@]host[:port]")') + parser.add_option('-a', '--all-addresses', dest='all_addresses', action='store_true', + help='Unpack all addresses in the specified hostnames as hosts') parser.add_option('-l', '--user', dest='user', help='username (OPTIONAL)') parser.add_option('-p', '--par', dest='par', type='int', diff --git a/psshlib/psshutil.py b/psshlib/psshutil.py index 6c67f66..0f1d8ef 100644 --- a/psshlib/psshutil.py +++ b/psshlib/psshutil.py @@ -2,6 +2,7 @@ # Copyright (c) 2003-2008, Brent N. Chun import fcntl +import socket import string import sys @@ -72,8 +73,9 @@ def parse_host_entry(line, default_user, default_port): return host, port, user -def parse_host_string(host_string, default_user=None, default_port=None): - """Parses a whitespace-delimited string of "[user@]host[:port]" entries. +def parse_host_string(host_string, default_user=None, default_port=None, all_addresses=False): + """ + Parses a whitespace-delimited string of "[user@]host[:port]" entries. Returns a list of (host, port, user) triples. """ @@ -81,6 +83,12 @@ def parse_host_string(host_string, default_user=None, default_port=None): entries = host_string.split() for entry in entries: hosts.append(parse_host(entry, default_user, default_port)) + if all_addresses: + hosts_full = [] + for host, port, user in hosts: + for host_address in get_host_addresses(host, port=port): + hosts_full.append((host_address, port, user)) + hosts = hosts_full return hosts @@ -99,8 +107,25 @@ def parse_host(host, default_user=None, default_port=None): return (host, port, user) +def get_host_addresses(host, port=0, **kwargs): + results = socket.getaddrinfo( + host, + port, + # family= + socket.AF_UNSPEC, + # type= + socket.SOCK_STREAM, + # proto= + socket.IPPROTO_TCP, + # flags= + socket.AI_ADDRCONFIG, + ) + return [item[4][0] for item in results] + + def set_cloexec(filelike): - """Sets the underlying filedescriptor to automatically close on exec. + """ + Sets the underlying filedescriptor to automatically close on exec. If set_cloexec is called for all open files, then subprocess.Popen does not require the close_fds option. From c65fe78e42ebed7dc716897f34d93a0baecf6dab Mon Sep 17 00:00:00 2001 From: hoverhell Date: Tue, 3 Dec 2019 13:36:05 +0300 Subject: [PATCH 11/11] no-annotate-each-line: use the default line-buffering logic for it, highly useful e.g. for ndjson output --- psshlib/task.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/psshlib/task.py b/psshlib/task.py index 50fc3ab..aef6cee 100644 --- a/psshlib/task.py +++ b/psshlib/task.py @@ -223,10 +223,11 @@ def handle_stdout(self, fd, iomap): if self.outfile: self.writer.write(self.outfile, buf) if self.print_out: - if self.annotate_lines: + if True: self.print_annotated_lines(buf, fd=fd) - else: - self.outstream.write(b'%s: %s' % (self.host, buf)) + else: # This is nearly uselss because it intermixes the hosts' data. + # self.outstream.write(b'%s: %s' % (self.host, buf)) + self.outstream.write(buf) if buf[-1] != '\n': self.outstream.write(b'\n') else: @@ -304,7 +305,10 @@ def print_annotated_lines(self, buf, atype='out', fd=None, force_finish=False): outs = [] sformat = OUTPUT_FORMATS.get(atype) or OUTPUT_FORMATS[''] for idx, line in enumerate(lines): - outline = sformat % {b'line': line, b'host': self.host_b} + if self.annotate_lines: + outline = sformat % {b'line': line, b'host': self.host_b} + else: + outline = line if idx == len(lines) - 1: # last line if lines_are_unfinished: # Sort-of-disambiguate