From d6500f29aa11fdedad8c0f45e66efd8ad1d7f9e2 Mon Sep 17 00:00:00 2001 From: Douglas Raillard Date: Mon, 1 Jul 2024 10:18:26 +0100 Subject: [PATCH 1/2] external/devlib.manifest.yaml: Add fix_signals PR for dogfooding --- external/devlib.manifest.yaml | 15 +++++++++++---- external/subtrees.conf | 2 +- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/external/devlib.manifest.yaml b/external/devlib.manifest.yaml index ce1123083..249405c3b 100644 --- a/external/devlib.manifest.yaml +++ b/external/devlib.manifest.yaml @@ -7,15 +7,22 @@ rebase-conf: rr-cache: ./rr-cache base: - remote: github + remote: upstream ref: master topics: - - remote: github + remote: douglas base: master - tip: async + tip: fix_nest_asyncio + + - + remote: douglas + base: master + tip: fix_signals remotes: - github: + upstream: + url: https://github.com/ARM-Software/devlib.git + douglas: url: https://github.com/douglas-raillard-arm/devlib.git diff --git a/external/subtrees.conf b/external/subtrees.conf index 6bbe1587e..69006f922 100644 --- a/external/subtrees.conf +++ b/external/subtrees.conf @@ -5,7 +5,7 @@ path = external/devlib url = https://github.com/douglas-raillard-arm/devlib.git # Dogfooding on our PR -ref = fix_nest_asyncio +ref = lisa # # See external/devlib.manifest.yaml for instructions on how to build this -- GitLab From 156d3c06f6d67a4236ef2c9bb845750147caadd5 Mon Sep 17 00:00:00 2001 From: Douglas Raillard Date: Mon, 1 Jul 2024 10:20:03 +0100 Subject: [PATCH 2/2] Squashed 'external/devlib/' changes from ab193ed41..e1407127e e1407127e ftrace: Add write-to-disk mode 81674b046 connection: Support all signals in BackgroundCommand.send_signal() 1c1e860e9 utils/asyn: Ensure the async generators inside context managers are fully consumed on a single event loop 9cab4713c utils/asyn: Replace nest_asyncio with greenlet 7e0e7442c Revert "utils/asyn: Replace nest_asyncio with greenback" c9178d20a utils/asyn: Replace nest_asyncio with greenback 22df0fb7c utils/asyn: Factor out the calls to asyncio.run 02f030936 target: Allow reuse of a connection once the owning thread is terminated a5d2411e9 tests: Add tests for nested async support REVERT: ab193ed41 utils/asyn: Ensure the async generators inside context managers are fully consumed on a single event loop REVERT: b2c7bb9b0 utils/asyn: Replace nest_asyncio with greenlet REVERT: 5ff1d543a Revert "utils/asyn: Replace nest_asyncio with greenback" REVERT: 4cf7d1856 utils/asyn: Replace nest_asyncio with greenback REVERT: 03521493d utils/asyn: Factor out the calls to asyncio.run REVERT: 5aa850ec9 target: Allow reuse of a connection once the owning thread is terminated REVERT: a9a2d75c4 tests: Add tests for nested async support git-subtree-dir: external/devlib git-subtree-split: e1407127e6ce4485dbf26d03e5850758d7334971 --- devlib/bin/scripts/devlib-signal-target | 20 ++ devlib/collector/ftrace.py | 73 +++++-- devlib/connection.py | 147 ++++++++++---- devlib/host.py | 27 ++- devlib/target.py | 11 +- devlib/utils/android.py | 37 ++-- devlib/utils/ssh.py | 244 ++++++++++++------------ 7 files changed, 354 insertions(+), 205 deletions(-) create mode 100644 devlib/bin/scripts/devlib-signal-target diff --git a/devlib/bin/scripts/devlib-signal-target b/devlib/bin/scripts/devlib-signal-target new file mode 100644 index 000000000..26ac6fe84 --- /dev/null +++ b/devlib/bin/scripts/devlib-signal-target @@ -0,0 +1,20 @@ +( + # If there is no data dir, it means we are not running as a background + # command so we just do nothing + if [ -e "$_DEVLIB_BG_CMD_DATA_DIR" ]; then + pid_file="$_DEVLIB_BG_CMD_DATA_DIR/pid" + # Atomically check if the PID file already exist and make the write + # fail if it already does. This way we don't have any race condition + # with the Python API, as there is either no PID or the same PID for + # the duration of the command + set -o noclobber + if ! printf "%u\n" $$ > "$pid_file"; then + echo "$0 was already called for this command" >&2 + exit 1 + fi + fi +) || exit $? + +# Use exec so that the PID of the command we run is the same as the current $$ +# PID that we just registered +exec "$@" diff --git a/devlib/collector/ftrace.py b/devlib/collector/ftrace.py index d35615401..df8ec2162 100644 --- a/devlib/collector/ftrace.py +++ b/devlib/collector/ftrace.py @@ -21,6 +21,7 @@ import subprocess import sys import contextlib from shlex import quote +import signal from devlib.collector import (CollectorBase, CollectorOutput, CollectorOutputEntry) @@ -71,6 +72,7 @@ class FtraceCollector(CollectorBase): report_on_target=False, trace_clock='local', saved_cmdlines_nr=4096, + mode='write-to-memory', ): super(FtraceCollector, self).__init__(target) self.events = events if events is not None else DEFAULT_EVENTS @@ -98,6 +100,8 @@ class FtraceCollector(CollectorBase): self.trace_clock = trace_clock self.saved_cmdlines_nr = saved_cmdlines_nr self._reset_needed = True + self.mode = mode + self._bg_cmd = None # pylint: disable=bad-whitespace # Setup tracing paths @@ -276,18 +280,33 @@ class FtraceCollector(CollectorBase): with contextlib.suppress(TargetStableError): self.target.write_value('/proc/sys/kernel/kptr_restrict', 0) - self.target.execute( - '{} start -B devlib {buffer_size} {cmdlines_size} {clock} {events} {tracer} {functions}'.format( - self.target_binary, - events=self.event_string, - tracer=tracer_string, - functions=tracecmd_functions, - buffer_size='-b {}'.format(self.buffer_size) if self.buffer_size is not None else '', - clock='-C {}'.format(self.trace_clock) if self.trace_clock else '', - cmdlines_size='--cmdlines-size {}'.format(self.saved_cmdlines_nr) if self.saved_cmdlines_nr is not None else '', - ), - as_root=True, + params = '-B devlib {buffer_size} {cmdlines_size} {clock} {events} {tracer} {functions}'.format( + events=self.event_string, + tracer=tracer_string, + functions=tracecmd_functions, + buffer_size='-b {}'.format(self.buffer_size) if self.buffer_size is not None else '', + clock='-C {}'.format(self.trace_clock) if self.trace_clock else '', + cmdlines_size='--cmdlines-size {}'.format(self.saved_cmdlines_nr) if self.saved_cmdlines_nr is not None else '', ) + + mode = self.mode + if mode == 'write-to-disk': + bg_cmd = self.target.background( + # cd into the working_directory first to workaround this issue: + # https://lore.kernel.org/linux-trace-devel/20240119162743.1a107fa9@gandalf.local.home/ + f'cd {self.target.working_directory} && devlib-signal-target {self.target_binary} record -o {quote(self.target_output_file)} {params}', + as_root=True, + ) + assert self._bg_cmd is None + self._bg_cmd = bg_cmd.__enter__() + elif mode == 'write-to-memory': + self.target.execute( + f'{self.target_binary} start {params}', + as_root=True, + ) + else: + raise ValueError(f'Unknown mode {mode}') + if self.automark: self.mark_start() if 'cpufreq' in self.target.modules: @@ -322,8 +341,21 @@ class FtraceCollector(CollectorBase): self.stop_time = time.time() if self.automark: self.mark_stop() - self.target.execute('{} stop -B devlib'.format(self.target_binary), - timeout=TIMEOUT, as_root=True) + + mode = self.mode + if mode == 'write-to-disk': + bg_cmd = self._bg_cmd + self._bg_cmd = None + assert bg_cmd is not None + bg_cmd.send_signal(signal.SIGINT) + bg_cmd.communicate() + bg_cmd.__exit__(None, None, None) + elif mode == 'write-to-memory': + self.target.execute('{} stop -B devlib'.format(self.target_binary), + timeout=TIMEOUT, as_root=True) + else: + raise ValueError(f'Unknown mode {mode}') + self._reset_needed = True def set_output(self, output_path): @@ -334,9 +366,18 @@ class FtraceCollector(CollectorBase): def get_data(self): if self.output_path is None: raise RuntimeError("Output path was not set.") - self.target.execute('{0} extract -B devlib -o {1}; chmod 666 {1}'.format(self.target_binary, - self.target_output_file), - timeout=TIMEOUT, as_root=True) + + busybox = quote(self.target.busybox) + + mode = self.mode + if mode == 'write-to-disk': + # Interrupting trace-cmd record will make it create the file + pass + elif mode == 'write-to-memory': + cmd = f'{self.target_binary} extract -B devlib -o {self.target_output_file} && {busybox} chmod 666 {self.target_output_file}' + self.target.execute(cmd, timeout=TIMEOUT, as_root=True) + else: + raise ValueError(f'Unknown mode {mode}') # The size of trace.dat will depend on how long trace-cmd was running. # Therefore timout for the pull command must also be adjusted diff --git a/devlib/connection.py b/devlib/connection.py index 460997580..99055a3c1 100644 --- a/devlib/connection.py +++ b/devlib/connection.py @@ -17,6 +17,7 @@ from abc import ABC, abstractmethod from contextlib import contextmanager, nullcontext from shlex import quote import os +from pathlib import Path import signal import subprocess import threading @@ -25,14 +26,11 @@ import logging import select import fcntl -from devlib.utils.misc import InitCheckpoint +from devlib.utils.misc import InitCheckpoint, memoized _KILL_TIMEOUT = 3 -def _kill_pgid_cmd(pgid, sig, busybox): - return '{} kill -{} -{}'.format(busybox, sig.value, pgid) - def _popen_communicate(bg, popen, input, timeout): try: stdout, stderr = popen.communicate(input=input, timeout=timeout) @@ -130,8 +128,11 @@ class BackgroundCommand(ABC): semantic as :class:`subprocess.Popen`. """ - def __init__(self, conn): + def __init__(self, conn, data_dir, cmd, as_root): self.conn = conn + self._data_dir = data_dir + self.as_root = as_root + self.cmd = cmd # Poll currently opened background commands on that connection to make # them deregister themselves if they are completed. This avoids @@ -147,15 +148,65 @@ class BackgroundCommand(ABC): conn._current_bg_cmds.add(self) + @classmethod + def from_factory(cls, conn, cmd, as_root, make_init_kwargs): + cmd, data_dir = cls._with_data_dir(conn, cmd) + return cls( + conn=conn, + data_dir=data_dir, + cmd=cmd, + as_root=as_root, + **make_init_kwargs(cmd), + ) + def _deregister(self): try: self.conn._current_bg_cmds.remove(self) except KeyError: pass - @abstractmethod - def _send_signal(self, sig): - pass + @property + def _pid_file(self): + return str(Path(self._data_dir, 'pid')) + + @property + @memoized + def _targeted_pid(self): + """ + PID of the process pointed at by ``devlib-signal-target`` command. + """ + path = quote(self._pid_file) + busybox = quote(self.conn.busybox) + + def execute(cmd): + return self.conn.execute(cmd, as_root=self.as_root) + + while self.poll() is None: + try: + pid = execute(f'{busybox} cat {path}') + except subprocess.CalledProcessError: + time.sleep(0.01) + else: + if pid.endswith('\n'): + return int(pid.strip()) + else: + # We got a partial write in the PID file + continue + + raise ValueError(f'The background commmand did not use devlib-signal-target wrapper to designate which command should be the target of signals') + + @classmethod + def _with_data_dir(cls, conn, cmd): + busybox = quote(conn.busybox) + data_dir = conn.execute(f'{busybox} mktemp -d').strip() + cmd = f'_DEVLIB_BG_CMD_DATA_DIR={data_dir} exec {busybox} sh -c {quote(cmd)}' + return cmd, data_dir + + def _cleanup_data_dir(self): + path = quote(self._data_dir) + busybox = quote(self.conn.busybox) + cmd = f'{busybox} rm -r {path} || true' + self.conn.execute(cmd, as_root=self.as_root) def send_signal(self, sig): """ @@ -165,8 +216,29 @@ class BackgroundCommand(ABC): :param signal: Signal to send. :type signal: signal.Signals """ + + def execute(cmd): + return self.conn.execute(cmd, as_root=self.as_root) + + def send(sig): + busybox = quote(self.conn.busybox) + # If the command has already completed, we don't want to send a + # signal to another process that might have gotten that PID in the + # meantime. + if self.poll() is None: + if sig in (signal.SIGTERM, signal.SIGQUIT, signal.SIGKILL): + # Use -PGID to target a process group rather than just the + # process itself. This will work in any condition and will + # not require cooperation from the command. + execute(f'{busybox} kill -{sig.value} -{self.pid}') + else: + # Other signals require cooperation from the shell command + # so that it points to a specific process using + # devlib-signal-target + pid = self._targeted_pid + execute(f'{busybox} kill -{sig.value} {pid}') try: - return self._send_signal(sig) + return send(sig) finally: # Deregister if the command has finished self.poll() @@ -287,6 +359,7 @@ class BackgroundCommand(ABC): return self._close() finally: self._deregister() + self._cleanup_data_dir() def __enter__(self): return self @@ -300,13 +373,15 @@ class PopenBackgroundCommand(BackgroundCommand): :class:`subprocess.Popen`-based background command. """ - def __init__(self, conn, popen): - super().__init__(conn=conn) + def __init__(self, conn, data_dir, cmd, as_root, popen): + super().__init__( + conn=conn, + data_dir=data_dir, + cmd=cmd, + as_root=as_root, + ) self.popen = popen - def _send_signal(self, sig): - return os.killpg(self.popen.pid, sig) - @property def stdin(self): return self.popen.stdin @@ -354,26 +429,20 @@ class ParamikoBackgroundCommand(BackgroundCommand): """ :mod:`paramiko`-based background command. """ - def __init__(self, conn, chan, pid, as_root, cmd, stdin, stdout, stderr, redirect_thread): - super().__init__(conn=conn) + def __init__(self, conn, data_dir, cmd, as_root, chan, pid, stdin, stdout, stderr, redirect_thread): + super().__init__( + conn=conn, + data_dir=data_dir, + cmd=cmd, + as_root=as_root, + ) + self.chan = chan - self.as_root = as_root self._pid = pid self._stdin = stdin self._stdout = stdout self._stderr = stderr self.redirect_thread = redirect_thread - self.cmd = cmd - - def _send_signal(self, sig): - # If the command has already completed, we don't want to send a signal - # to another process that might have gotten that PID in the meantime. - if self.poll() is not None: - return - # Use -PGID to target a process group rather than just the process - # itself - cmd = _kill_pgid_cmd(self.pid, sig, self.conn.busybox) - self.conn.execute(cmd, as_root=self.as_root) @property def pid(self): @@ -517,18 +586,16 @@ class AdbBackgroundCommand(BackgroundCommand): ``adb``-based background command. """ - def __init__(self, conn, adb_popen, pid, as_root): - super().__init__(conn=conn) - self.as_root = as_root + def __init__(self, conn, data_dir, cmd, as_root, adb_popen, pid): + super().__init__( + conn=conn, + data_dir=data_dir, + cmd=cmd, + as_root=as_root, + ) self.adb_popen = adb_popen self._pid = pid - def _send_signal(self, sig): - self.conn.execute( - _kill_pgid_cmd(self.pid, sig, self.conn.busybox), - as_root=self.as_root, - ) - @property def stdin(self): return self.adb_popen.stdin @@ -638,7 +705,7 @@ class TransferHandleBase(ABC): class PopenTransferHandle(TransferHandleBase): - def __init__(self, bg_cmd, dest, direction, *args, **kwargs): + def __init__(self, popen, dest, direction, *args, **kwargs): super().__init__(*args, **kwargs) if direction == 'push': @@ -650,7 +717,7 @@ class PopenTransferHandle(TransferHandleBase): self.sample_size = lambda: sample_size(dest) - self.bg_cmd = bg_cmd + self.popen = popen self.last_sample = 0 @staticmethod @@ -671,7 +738,7 @@ class PopenTransferHandle(TransferHandleBase): return int(out.split()[0]) def cancel(self): - self.bg_cmd.cancel() + self.popen.terminate() def isactive(self): try: diff --git a/devlib/host.py b/devlib/host.py index f202fccaa..70d7943aa 100644 --- a/devlib/host.py +++ b/devlib/host.py @@ -145,16 +145,25 @@ class LocalConnection(ConnectionBase): def preexec_fn(): os.setpgrp() - popen = subprocess.Popen( - command, - stdout=stdout, - stderr=stderr, - stdin=subprocess.PIPE, - shell=True, - preexec_fn=preexec_fn, + def make_init_kwargs(command): + popen = subprocess.Popen( + command, + stdout=stdout, + stderr=stderr, + stdin=subprocess.PIPE, + shell=True, + preexec_fn=preexec_fn, + ) + return dict( + popen=popen, + ) + + return PopenBackgroundCommand.from_factory( + conn=self, + cmd=command, + as_root=as_root, + make_init_kwargs=make_init_kwargs, ) - bg_cmd = PopenBackgroundCommand(self, popen) - return bg_cmd def _close(self): pass diff --git a/devlib/target.py b/devlib/target.py index 96958b5f8..099427e75 100644 --- a/devlib/target.py +++ b/devlib/target.py @@ -281,7 +281,7 @@ class Target(object): @property def shutils(self): if self._shutils is None: - self._setup_shutils() + self._setup_scripts() return self._shutils def is_running(self, comm): @@ -588,7 +588,7 @@ class Target(object): @asyn.asyncf async def setup(self, executables=None): - await self._setup_shutils.asyn() + await self._setup_scripts.asyn() for host_exe in (executables or []): # pylint: disable=superfluous-parens await self.install.asyn(host_exe) @@ -1559,8 +1559,9 @@ fi # internal methods @asyn.asyncf - async def _setup_shutils(self): - shutils_ifile = os.path.join(PACKAGE_BIN_DIRECTORY, 'scripts', 'shutils.in') + async def _setup_scripts(self): + scripts = os.path.join(PACKAGE_BIN_DIRECTORY, 'scripts') + shutils_ifile = os.path.join(scripts, 'shutils.in') with open(shutils_ifile) as fh: lines = fh.readlines() with tempfile.TemporaryDirectory() as folder: @@ -1571,6 +1572,8 @@ fi ofile.write(line) self._shutils = await self.install.asyn(shutils_ofile) + await self.install.asyn(os.path.join(scripts, 'devlib-signal-target')) + @asyn.asyncf @call_conn async def _execute_util(self, command, timeout=None, check_exit_code=True, as_root=False): diff --git a/devlib/utils/android.py b/devlib/utils/android.py index 81d70c28c..bfc889284 100755 --- a/devlib/utils/android.py +++ b/devlib/utils/android.py @@ -40,7 +40,7 @@ from shlex import quote from devlib.exception import TargetTransientError, TargetStableError, HostError, TargetTransientCalledProcessError, TargetStableCalledProcessError, AdbRootError from devlib.utils.misc import check_output, which, ABI_MAP, redirect_streams, get_subprocess -from devlib.connection import ConnectionBase, AdbBackgroundCommand, PopenBackgroundCommand, PopenTransferHandle +from devlib.connection import ConnectionBase, AdbBackgroundCommand, PopenTransferHandle logger = logging.getLogger('android') @@ -340,7 +340,7 @@ class AdbConnection(ConnectionBase): if timeout: adb_command(self.device, command, timeout=timeout, adb_server=self.adb_server, adb_port=self.adb_port) else: - bg_cmd = adb_command_background( + popen = adb_command_popen( device=self.device, conn=self, command=command, @@ -350,12 +350,12 @@ class AdbConnection(ConnectionBase): handle = PopenTransferHandle( manager=self.transfer_manager, - bg_cmd=bg_cmd, + popen=popen, dest=dest, direction=action ) - with bg_cmd, self.transfer_manager.manage(sources, dest, action, handle): - bg_cmd.communicate() + with popen, self.transfer_manager.manage(sources, dest, action, handle): + popen.communicate() # pylint: disable=unused-argument def execute(self, command, timeout=None, check_exit_code=False, @@ -386,12 +386,18 @@ class AdbConnection(ConnectionBase): return bg_cmd def _background(self, command, stdout, stderr, as_root): - adb_shell, pid = adb_background_shell(self, command, stdout, stderr, as_root) - bg_cmd = AdbBackgroundCommand( + def make_init_kwargs(command): + adb_popen, pid = adb_background_shell(self, command, stdout, stderr, as_root) + return dict( + adb_popen=adb_popen, + pid=pid, + ) + + bg_cmd = AdbBackgroundCommand.from_factory( conn=self, - adb_popen=adb_shell, - pid=pid, - as_root=as_root + cmd=command, + as_root=as_root, + make_init_kwargs=make_init_kwargs, ) return bg_cmd @@ -748,12 +754,11 @@ def adb_command(device, command, timeout=None, adb_server=None, adb_port=None): return output -def adb_command_background(device, conn, command, adb_server=None, adb_port=None): - full_command = get_adb_command(device, command, adb_server, adb_port) - logger.debug(full_command) - popen = get_subprocess(full_command, shell=True) - cmd = PopenBackgroundCommand(conn=conn, popen=popen) - return cmd +def adb_command_popen(device, conn, command, adb_server=None, adb_port=None): + command = get_adb_command(device, command, adb_server, adb_port) + logger.debug(command) + popen = get_subprocess(command, shell=True) + return popen def grant_app_permissions(target, package): diff --git a/devlib/utils/ssh.py b/devlib/utils/ssh.py index bc130226e..bd02934d5 100644 --- a/devlib/utils/ssh.py +++ b/devlib/utils/ssh.py @@ -586,138 +586,142 @@ class SshConnection(SshConnectionBase): return self._background(command, stdout, stderr, as_root) def _background(self, command, stdout, stderr, as_root): - orig_command = command - stdout, stderr, command = redirect_streams(stdout, stderr, command) - - command = "printf '%s\n' $$; exec sh -c {}".format(quote(command)) - channel = self._make_channel() - - def executor(cmd, timeout): - channel.exec_command(cmd) - # Read are not buffered so we will always get the data as soon as - # they arrive - return ( - channel.makefile_stdin('w', 0), - channel.makefile(), - channel.makefile_stderr(), - ) + def make_init_kwargs(command): + _stdout, _stderr, _command = redirect_streams(stdout, stderr, command) + + _command = "printf '%s\n' $$; exec sh -c {}".format(quote(_command)) + channel = self._make_channel() + + def executor(cmd, timeout): + channel.exec_command(cmd) + # Read are not buffered so we will always get the data as soon as + # they arrive + return ( + channel.makefile_stdin('w', 0), + channel.makefile(), + channel.makefile_stderr(), + ) - stdin, stdout_in, stderr_in = self._execute_command( - command, - as_root=as_root, - log=False, - timeout=None, - executor=executor, - ) - pid = stdout_in.readline() - if not pid: - stderr = stderr_in.read() - if channel.exit_status_ready(): - ret = channel.recv_exit_status() - else: - ret = 126 - raise subprocess.CalledProcessError( - ret, - command, - b'', - stderr, + stdin, stdout_in, stderr_in = self._execute_command( + _command, + as_root=as_root, + log=False, + timeout=None, + executor=executor, ) - pid = int(pid) - - def create_out_stream(stream_in, stream_out): - """ - Create a pair of file-like objects. The first one is used to read - data and the second one to write. - """ - - if stream_out == subprocess.DEVNULL: - r, w = None, None - # When asked for a pipe, we just give the file-like object as the - # reading end and no writing end, since paramiko already writes to - # it - elif stream_out == subprocess.PIPE: - r, w = os.pipe() - r = os.fdopen(r, 'rb') - w = os.fdopen(w, 'wb') - # Turn a file descriptor into a file-like object - elif isinstance(stream_out, int) and stream_out >= 0: - r = os.fdopen(stream_in, 'rb') - w = os.fdopen(stream_out, 'wb') - # file-like object - else: - r = stream_in - w = stream_out + pid = stdout_in.readline() + if not pid: + _stderr = stderr_in.read() + if channel.exit_status_ready(): + ret = channel.recv_exit_status() + else: + ret = 126 + raise subprocess.CalledProcessError( + ret, + _command, + b'', + _stderr, + ) + pid = int(pid) + + def create_out_stream(stream_in, stream_out): + """ + Create a pair of file-like objects. The first one is used to read + data and the second one to write. + """ + + if stream_out == subprocess.DEVNULL: + r, w = None, None + # When asked for a pipe, we just give the file-like object as the + # reading end and no writing end, since paramiko already writes to + # it + elif stream_out == subprocess.PIPE: + r, w = os.pipe() + r = os.fdopen(r, 'rb') + w = os.fdopen(w, 'wb') + # Turn a file descriptor into a file-like object + elif isinstance(stream_out, int) and stream_out >= 0: + r = os.fdopen(stream_in, 'rb') + w = os.fdopen(stream_out, 'wb') + # file-like object + else: + r = stream_in + w = stream_out - return (r, w) + return (r, w) - out_streams = { - name: create_out_stream(stream_in, stream_out) - for stream_in, stream_out, name in ( - (stdout_in, stdout, 'stdout'), - (stderr_in, stderr, 'stderr'), - ) - } + out_streams = { + name: create_out_stream(stream_in, stream_out) + for stream_in, stream_out, name in ( + (stdout_in, _stdout, 'stdout'), + (stderr_in, _stderr, 'stderr'), + ) + } + + def redirect_thread_f(stdout_in, stderr_in, out_streams, select_timeout): + def callback(out_streams, name, chunk): + try: + r, w = out_streams[name] + except KeyError: + return out_streams + + try: + w.write(chunk) + # Write failed + except ValueError: + # Since that stream is now closed, stop trying to write to it + del out_streams[name] + # If that was the last open stream, we raise an + # exception so the thread can terminate. + if not out_streams: + raise - def redirect_thread_f(stdout_in, stderr_in, out_streams, select_timeout): - def callback(out_streams, name, chunk): - try: - r, w = out_streams[name] - except KeyError: return out_streams try: - w.write(chunk) - # Write failed + _read_paramiko_streams(stdout_in, stderr_in, select_timeout, callback, copy.copy(out_streams)) + # The streams closed while we were writing to it, the job is done here except ValueError: - # Since that stream is now closed, stop trying to write to it - del out_streams[name] - # If that was the last open stream, we raise an - # exception so the thread can terminate. - if not out_streams: - raise - - return out_streams - - try: - _read_paramiko_streams(stdout_in, stderr_in, select_timeout, callback, copy.copy(out_streams)) - # The streams closed while we were writing to it, the job is done here - except ValueError: - pass - - # Make sure the writing end are closed proper since we are not - # going to write anything anymore - for r, w in out_streams.values(): - w.flush() - if r is not w and w is not None: - w.close() - - # If there is anything we need to redirect to, spawn a thread taking - # care of that - select_timeout = 1 - thread_out_streams = { - name: (r, w) - for name, (r, w) in out_streams.items() - if w is not None - } - redirect_thread = threading.Thread( - target=redirect_thread_f, - args=(stdout_in, stderr_in, thread_out_streams, select_timeout), - # The thread will die when the main thread dies - daemon=True, - ) - redirect_thread.start() + pass + + # Make sure the writing end are closed proper since we are not + # going to write anything anymore + for r, w in out_streams.values(): + w.flush() + if r is not w and w is not None: + w.close() + + # If there is anything we need to redirect to, spawn a thread taking + # care of that + select_timeout = 1 + thread_out_streams = { + name: (r, w) + for name, (r, w) in out_streams.items() + if w is not None + } + redirect_thread = threading.Thread( + target=redirect_thread_f, + args=(stdout_in, stderr_in, thread_out_streams, select_timeout), + # The thread will die when the main thread dies + daemon=True, + ) + redirect_thread.start() + + return dict( + chan=channel, + pid=pid, + stdin=stdin, + # We give the reading end to the consumer of the data + stdout=out_streams['stdout'][0], + stderr=out_streams['stderr'][0], + redirect_thread=redirect_thread, + ) - return ParamikoBackgroundCommand( + return ParamikoBackgroundCommand.from_factory( conn=self, + cmd=command, as_root=as_root, - chan=channel, - pid=pid, - stdin=stdin, - # We give the reading end to the consumer of the data - stdout=out_streams['stdout'][0], - stderr=out_streams['stderr'][0], - redirect_thread=redirect_thread, - cmd=orig_command, + make_init_kwargs=make_init_kwargs, ) def _close(self): -- GitLab