Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 23 additions & 15 deletions gpMgmt/bin/gpcheckperf
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,17 @@ try:
from gppylib.util import ssh_utils
from gppylib.commands import unix
from gppylib.commands.base import *
from gppylib import gpsubprocess
except ImportError as e:
sys.exit('Error: unable to import module: ' + str(e))

if sys.version_info[0] == 3:
import io
StringIO = io.StringIO
else:
import StringIO
StringIO = BytesIO = StringIO.StringIO

GPHOME = os.getenv('GPHOME')
if GPHOME is None:
sys.exit('Please set GPHOME environment variable')
Expand Down Expand Up @@ -107,10 +115,10 @@ def gpssh(cmd, verbose):

if GV.opt['-v']:
print('[Info]', strcmd(c))
p = subprocess.Popen(c, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
p = gpsubprocess.Popen(c, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
out = p.stdout.read(-1)
rc = p.wait()
return not rc, out.decode('utf-8')
return not rc, out


def gpscp(src, dst):
Expand All @@ -128,8 +136,8 @@ def gpscp(src, dst):
c.append(dst)
if GV.opt['-v']:
print('[Info]', strcmd(c))
p = subprocess.Popen(c, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
out = p.stdout.read(-1).decode('utf-8')
p = gpsubprocess.Popen(c, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
out = p.stdout.read(-1)
rc = p.wait()
if rc:
sys.exit('[Error] command failed: gpscp {0} =:{1} with output: {2}'.format(src, dst, out))
Expand All @@ -139,7 +147,7 @@ def run_cmd(cmd, peer):
"""
To create subprocess on specified host with given command
"""
proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
proc = gpsubprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
proc.x_cmd = cmd
proc.x_peer = peer
return proc
Expand All @@ -166,7 +174,7 @@ def gpsync(src, dst):

for p in proc:
for line in p.stdout.readlines():
print('[Out {0}] {1}' .format(p.x_peer, line.decode('utf-8')))
print('[Out {0}] {1}' .format(p.x_peer, line))
rc = p.wait()
if rc:
sys.exit('[Error] command failed for host:{0} cmd:{1} with status:{2} error: "{3}"'
Expand Down Expand Up @@ -352,8 +360,8 @@ def run(cmd):
try:
if GV.opt['-v']:
print('[Info]', cmd)
p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=True)
out = p.stdout.read(-1).decode('utf-8')
p = gpsubprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=True)
out = p.stdout.read(-1)
ok = p.wait()
except KeyboardInterrupt:
raise
Expand Down Expand Up @@ -471,7 +479,7 @@ def copyExecOver(fname):

def parseMultiDDResult(out):
# parse output of "time -p"
out = io.StringIO(out)
out = StringIO(out)
result = {}
bytes = 0
for line in out:
Expand Down Expand Up @@ -547,7 +555,7 @@ def runStreamTest():
(ok, out) = gpssh(cmd, GV.opt['-V'])
if not ok:
sys.exit('[Error] command failed: %s with output: %s' % (cmd, out))
out = io.StringIO(out)
out = StringIO(out)
result = {}
for line in out:
i = line.find(']')
Expand Down Expand Up @@ -604,7 +612,7 @@ def spawnNetperfTestBetween(x, y, netperf_path, netserver_port, sec=5):
try:
if GV.opt['-v']:
print('[Info]', strcmd(c))
proc = subprocess.Popen(c, stdout=subprocess.PIPE)
proc = gpsubprocess.Popen(c, stdout=subprocess.PIPE)
except KeyboardInterrupt:
killProc(proc)
raise
Expand All @@ -621,7 +629,7 @@ def reapNetperfTest(proc, x, y):
if proc:
try:
rc = proc.wait()
out = proc.stdout.read(-1).decode('utf-8')
out = proc.stdout.read(-1)
ok = not killProc(proc)
proc = None
except KeyboardInterrupt as ki:
Expand All @@ -643,7 +651,7 @@ def reapNetperfTest(proc, x, y):
print('[Warning] netperf failed on %s -> %s' % (x, y))
return []

for line in io.StringIO(out):
for line in StringIO(out):
line = line.split()
if len(line) != 5:
continue
Expand Down Expand Up @@ -827,8 +835,8 @@ def get_host_map(hostlist):
proc = None
try:
if GV.opt['-v']: print('[Info]', strcmd(cmd))
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
out = proc.stdout.read(-1).decode('utf-8')
proc = gpsubprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
out = proc.stdout.read(-1)
rc = proc.wait()
if rc:
raise Exception('ssh error with the following command:\n%s with output: %s' % (cmd, out))
Expand Down
11 changes: 6 additions & 5 deletions gpMgmt/bin/gpload.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@

try:
from gppylib.gpversion import GpVersion
from gppylib import gpsubprocess
except ImportError:
sys.stderr.write("gpload can't import gpversion, will run in GPDB6 compatibility mode.\n")
withGpVersion = False
Expand Down Expand Up @@ -776,7 +777,7 @@ def run(self):
while 1:
# Windows select does not support select on non-file fd's, so we can use the lock fix. Deadlock is possible here.
# We need to look into the Python windows module to see if there is another way to do this in Windows.
line = self.fd.readline().decode('utf-8')
line = self.fd.readline()
if not line:
break
self.gpload.log(self.gpload.DEBUG, 'gpfdist: ' + line.strip('\n'))
Expand All @@ -789,7 +790,7 @@ def run(self):
)
if retList[0] == [self.fd]:
self.theLock.acquire()
line = self.fd.readline().decode('utf-8')
line = self.fd.readline()
self.theLock.release()
else:
continue
Expand Down Expand Up @@ -1710,7 +1711,7 @@ def start_gpfdists(self):
cmd += ' '.join(popenList)
needshell = True

a = subprocess.Popen(cmd, stdout=subprocess.PIPE,
a = gpsubprocess.Popen(cmd, stdout=subprocess.PIPE,
Comment thread
Stolb27 marked this conversation as resolved.
stderr=subprocess.PIPE,
close_fds=cfds, shell=needshell)
self.subprocesses.append(a)
Expand All @@ -1731,7 +1732,7 @@ def start_gpfdists(self):

while 1:
readLock.acquire()
line = a.stdout.readline().decode('utf-8')
line = a.stdout.readline()
readLock.release()
if not line:
self.log(self.ERROR,'failed to start gpfdist: ' +
Expand Down Expand Up @@ -3072,7 +3073,7 @@ def stop_gpfdists(self):
if platform.system() in ['Windows', 'Microsoft']:
# win32 API is better but hard for us
# to install, so we use the crude method
subprocess.Popen("taskkill /F /T /PID %i" % a.pid,
gpsubprocess.Popen("taskkill /F /T /PID %i" % a.pid,
shell=True, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)

Expand Down
5 changes: 3 additions & 2 deletions gpMgmt/bin/gpload_test/gpload/TEST.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
except:
import subprocess
from pygresql import pg
from gppylib import gpsubprocess

"""
Global Values
Expand Down Expand Up @@ -149,8 +150,8 @@ def run(cmd):
function, so you can theoretically pass any value that is
valid for the second parameter of open().
"""
p = subprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE)
out = p.communicate()[0].decode('utf-8')
p = gpsubprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE)
out = p.communicate()[0]
ret = []
ret.append(out)
rc = False if p.wait() else True
Expand Down
7 changes: 4 additions & 3 deletions gpMgmt/bin/gpload_test/gpload2/TEST.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
except:
import subprocess
from pygresql import pg
from gppylib import gpsubprocess

def get_port_from_conf():
file = os.environ.get('MASTER_DATA_DIRECTORY')+'/postgresql.conf'
Expand Down Expand Up @@ -305,8 +306,8 @@ def run(cmd):
function, so you can theoretically pass any value that is
valid for the second parameter of open().
"""
p = subprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE)
out = p.communicate()[0].decode('utf-8')
p = gpsubprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE)
out = p.communicate()[0]
ret = []
ret.append(out)
rc = False if p.wait() else True
Expand Down Expand Up @@ -382,7 +383,7 @@ def modify_sql_file(num):

def copy_data(source='',target=''):
cmd = 'cp '+ mkpath('data/' + source) + ' ' + mkpath(target)
p = subprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE)
p = gpsubprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE)
return p.communicate()

hostNameAddrs = get_ip(HOST)
Expand Down
5 changes: 3 additions & 2 deletions gpMgmt/bin/gpload_test/gpload2/TEST_REMOTE.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import subprocess32 as subprocess
except:
import subprocess
from gppylib import gpsubprocess
from shutil import copyfile
from pygresql import pg

Expand Down Expand Up @@ -208,8 +209,8 @@ def run(cmd):
function, so you can theoretically pass any value that is
valid for the second parameter of open().
"""
p = subprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE)
out = p.communicate()[0].decode('utf-8')
p = gpsubprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE)
out = p.communicate()[0]
ret = []
ret.append(out)
rc = False if p.wait() else True
Expand Down
9 changes: 4 additions & 5 deletions gpMgmt/bin/gpload_test/gpload2/TEST_local_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
string_types = basestring

# from gppylib.commands.gp import get_coordinatordatadir

from gppylib import gpsubprocess
try:
import subprocess32 as subprocess
except ImportError:
Expand Down Expand Up @@ -86,8 +86,8 @@ def run(cmd):
function, so you can theoretically pass any value that is
valid for the second parameter of open().
"""
p = subprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE)
out = p.communicate()[0].decode('utf-8')
p = gpsubprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE)
out = p.communicate()[0]
ret = []
ret.append(out)
rc = False if p.wait() else True
Expand Down Expand Up @@ -449,9 +449,8 @@ def modify_sql_file(num):

def copy_data(source='',target=''):
cmd = 'cp '+ mkpath('data/' + source) + ' ' + mkpath(target)
p = subprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE)
p = gpsubprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE)
_, err = p.communicate()
err = err.decode('utf-8')
if err:
sys.stderr.write(str(err))
sys.exit(2)
Expand Down
9 changes: 4 additions & 5 deletions gpMgmt/bin/gpmemreport
Original file line number Diff line number Diff line change
Expand Up @@ -378,23 +378,22 @@ def parsePSOutput(psLines):


def getNextSample(file, start, end):
line = file.readline().decode('utf-8')
line = file.readline()
if not line:
return None
while line == '\n':
line = file.readline().decode('utf-8')
line = file.readline()
print(line)
if not line.startswith('>>>'):
raise Exception("Processing file failed - did not find timestamp where expected")
timestamp = datetime.strptime(line, timestamp_string)

process_this_listing = (start is None or timestamp >= start) and (end is None or timestamp <= end)

header = file.readline().decode('utf-8').split()
header = file.readline().split()
n_cols = len(header)
rows = []
for line in file:
line = line.decode('utf-8')
if line == '\n':
break
if process_this_listing:
Expand Down Expand Up @@ -428,7 +427,7 @@ if __name__ == '__main__':
(options, args) = parseCmdLine()
start = datetime.strptime(options.start_time, datetime_format) if options.start_time else None
end = datetime.strptime(options.end_time, datetime_format) if options.end_time else None
f = gzip.open(args[0], 'r')
f = gzip.open(args[0], 'rt')
try:
sample = getNextSample(f, start, end)
while sample:
Expand Down
7 changes: 4 additions & 3 deletions gpMgmt/bin/gpmemwatcher
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import time
from optparse import OptionParser
from datetime import datetime
from gppylib.commands import gp
from gppylib import gpsubprocess

pidfile = 'reswatch.pid'
ps_file = 'ps.out.gz'
Expand Down Expand Up @@ -94,11 +95,11 @@ def cleanupLogFiles(signum, frame):
def runPSProcess(sleepInt=60):
ps_cmd = 'ps -ewwopid,ppid,rss,vsz,pmem,pcpu,time,etime,start_time,wchan,stat,psr,args '
try:
outfile = gzip.open(ps_file, 'w')
outfile = gzip.open(ps_file, 'wt')
open_files.append(outfile)
while True:
outfile.write(datetime.now().strftime("\n\n>>>%y:%m:%d:%H:%M:%S<<<\n").encode('utf-8'))
cmd = subprocess.Popen(ps_cmd, shell=True, stdout=subprocess.PIPE)
outfile.write(datetime.now().strftime("\n\n>>>%y:%m:%d:%H:%M:%S<<<\n"))
cmd = gpsubprocess.Popen(ps_cmd, shell=True, stdout=subprocess.PIPE)
outfile.writelines(cmd.stdout)
outfile.flush()
time.sleep(sleepInt)
Expand Down
6 changes: 3 additions & 3 deletions gpMgmt/bin/gppylib/commands/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -460,11 +460,11 @@ def execute(self, cmd, wait=True):
cmd.pid = self.proc.pid
if wait:
(rc, stdout_value, stderr_value) = self.proc.communicate2(input=self.stdin)
assert isinstance(stdout_value, bytes)
assert isinstance(stderr_value, bytes)
assert isinstance(stdout_value, str)
assert isinstance(stderr_value, str)
self.completed = True
cmd.set_results(CommandResult(
rc, stdout_value.decode('utf-8'), stderr_value.decode('utf-8'), self.completed, self.halt))
rc, stdout_value, stderr_value, self.completed, self.halt))

def cancel(self):
if self.proc:
Expand Down
3 changes: 2 additions & 1 deletion gpMgmt/bin/gppylib/commands/gp.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from .unix import *
from gppylib import pgconf
from gppylib.utils import writeLinesToFile, createFromSingleHostFile, shellEscape
from gppylib import gpsubprocess


logger = get_default_logger()
Expand Down Expand Up @@ -1679,7 +1680,7 @@ def list_addrs(hostname=None, include_loopback=False):
else:
args = cmd

result = subprocess.check_output(args).decode('utf-8')
result = gpsubprocess.check_output(args)
return result.split('START_CMD_OUTPUT\n')[1].splitlines()

if __name__ == '__main__':
Expand Down
Loading
Loading