def run_cmd(command, cwd=None):
    p = subprocess.Popen(command, cwd=cwd, shell=False,
                         stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    outs, errs = p.communicate()
    rc = p.returncode
    outs = outs.decode('utf-8')
    errs = errs.decode('utf-8')
    return (rc, (outs, errs))
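For reference, a runnable sketch of calling the function above (the function is repeated so the snippet is self-contained; ['echo', 'hello'] is just a placeholder command, and note that communicate() blocks, so nothing is shown until the child exits):

```python
import subprocess

def run_cmd(command, cwd=None):
    # Blocks until the command finishes; output is only available afterwards.
    p = subprocess.Popen(command, cwd=cwd, shell=False,
                         stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    outs, errs = p.communicate()
    return p.returncode, (outs.decode('utf-8'), errs.decode('utf-8'))

rc, (outs, errs) = run_cmd(['echo', 'hello'])
print(rc)    # 0
print(outs)  # hello
```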

Thanks to @unutbu, and special thanks to @J.F.Sebastian. The final function:

#!/usr/bin/python3
# -*- coding: utf-8 -*-
import sys
from queue import Queue
from subprocess import PIPE, Popen
from threading import Thread
def read_output(pipe, funcs):
    for line in iter(pipe.readline, b''):
        for func in funcs:
            func(line.decode('utf-8'))
    pipe.close()
def write_output(get):
    for line in iter(get, None):
        sys.stdout.write(line)
def run_cmd(command, cwd=None, passthrough=True):
    outs, errs = None, None
    proc = Popen(
        command,
        cwd=cwd,
        shell=False,
        close_fds=True,
        stdout=PIPE,
        stderr=PIPE,
        bufsize=1
    )
    if passthrough:
        outs, errs = [], []
        q = Queue()
        stdout_thread = Thread(
            target=read_output, args=(proc.stdout, [q.put, outs.append])
        )
        stderr_thread = Thread(
            target=read_output, args=(proc.stderr, [q.put, errs.append])
        )
        writer_thread = Thread(
            target=write_output, args=(q.get,)
        )
        for t in (stdout_thread, stderr_thread, writer_thread):
            t.daemon = True
            t.start()
        proc.wait()
        for t in (stdout_thread, stderr_thread):
            t.join()
        q.put(None)
        outs = ' '.join(outs)
        errs = ' '.join(errs)
    else:
        outs, errs = proc.communicate()
        outs = '' if outs is None else outs.decode('utf-8')
        errs = '' if errs is None else errs.decode('utf-8')
    rc = proc.returncode
    return (rc, (outs, errs))
                The code example does store outs and errs and returns them... To print to the terminal, simply if outs: print outs if errs: print errs
– bnlucas
                Jun 19, 2013 at 11:43
                @bnlucas Thanks, but as I stated in the first point: the output should be printed in REAL TIME to the terminal, as if without piping.
– Łukasz Zdun
                Jun 19, 2013 at 12:00
                If you need Python 3 code; add python-3.x tag (i see python3 in the shebang). Your code as written will leave reading threads hanging. In Python 3 '' is a Unicode literal, but pipe.readline() returns bytes by default ('' != b"" on Python 3). If you fix it then the writer thread won't end, because nothing puts "" into the queue.
– jfs
                Jun 19, 2013 at 16:02
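The sentinel mismatch jfs points out is easy to demonstrate without a subprocess; a minimal sketch using io.BytesIO as a stand-in for a binary pipe:

```python
import io

# A BytesIO stands in for a binary subprocess pipe: readline() returns
# bytes, and b'' at EOF.
pipe = io.BytesIO(b'line1\nline2\n')

# Correct sentinel for a binary stream: b''
lines = list(iter(pipe.readline, b''))
print(lines)  # [b'line1\n', b'line2\n']

pipe.seek(0)
# Wrong sentinel: on Python 3, b'' != '', so iteration never stops;
# after EOF the iterator just keeps yielding b''.
it = iter(pipe.readline, '')
first_four = [next(it) for _ in range(4)]
print(first_four)  # [b'line1\n', b'line2\n', b'', b'']
```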

To capture and display at the same time both stdout and stderr from a child process line by line in a single thread, you could use asynchronous I/O:

#!/usr/bin/env python3
import asyncio
import os
import sys
from asyncio.subprocess import PIPE
@asyncio.coroutine
def read_stream_and_display(stream, display):
    """Read from stream line by line until EOF, display, and capture the lines."""
    output = []
    while True:
        line = yield from stream.readline()
        if not line:
            break
        output.append(line)
        display(line)  # assume it doesn't block
    return b''.join(output)

@asyncio.coroutine
def read_and_display(*cmd):
    """Capture cmd's stdout, stderr while displaying them as they arrive
    (line by line)."""
    # start process
    process = yield from asyncio.create_subprocess_exec(*cmd,
            stdout=PIPE, stderr=PIPE)
    # read child's stdout/stderr concurrently (capture and display)
    try:
        stdout, stderr = yield from asyncio.gather(
            read_stream_and_display(process.stdout, sys.stdout.buffer.write),
            read_stream_and_display(process.stderr, sys.stderr.buffer.write))
    except Exception:
        process.kill()
        raise
    finally:
        # wait for the process to exit
        rc = yield from process.wait()
    return rc, stdout, stderr
# run the event loop
if os.name == 'nt':
    loop = asyncio.ProactorEventLoop() # for subprocess' pipes on Windows
    asyncio.set_event_loop(loop)
else:
    loop = asyncio.get_event_loop()
rc, *output = loop.run_until_complete(read_and_display(*cmd))
loop.close()
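Note that the @asyncio.coroutine decorator and yield from syntax shown above were deprecated in Python 3.8 and removed in 3.11; on modern Python the same approach is written with async def / await and asyncio.run. A sketch of the equivalent code (the 'echo' command is just an illustration):

```python
import asyncio
import sys
from asyncio.subprocess import PIPE

async def read_stream_and_display(stream, display):
    """Read line by line until EOF, display each line, and capture them."""
    output = []
    while True:
        line = await stream.readline()
        if not line:
            break
        output.append(line)
        display(line)  # assume it doesn't block
    return b''.join(output)

async def read_and_display(*cmd):
    """Capture cmd's stdout/stderr while displaying them as they arrive."""
    process = await asyncio.create_subprocess_exec(*cmd, stdout=PIPE, stderr=PIPE)
    try:
        stdout, stderr = await asyncio.gather(
            read_stream_and_display(process.stdout, sys.stdout.buffer.write),
            read_stream_and_display(process.stderr, sys.stderr.buffer.write))
    except Exception:
        process.kill()
        raise
    finally:
        rc = await process.wait()
    return rc, stdout, stderr

# asyncio.run creates and closes the event loop for us
rc, out, err = asyncio.run(read_and_display('echo', 'hello'))
```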
                @kinORnirvana: asyncio works only on Python 3.3+. There is trollius, a Python 2 clone, but it is deprecated.
– jfs
                Apr 27, 2016 at 17:09
                Note that once the loop is closed doing get_event_loop will get you the same closed loop which cannot be re-used as is (event loop is closed message). I ended up doing asyncio.set_event_loop(asyncio.new_event_loop()) to get a fresh event loop.
– Adversus
                Oct 20, 2017 at 14:52
                I was running this code in a Jupyter notebook. I was getting an AttributeError because sys.stdout.buffer no longer existed. This helped clear it up: docs.python.org/3/library/sys.html#sys.stderr When in a Jupyter notebook I used sys.stdout.write in lieu of sys.stdout.buffer.write, and the output appeared in the notebook logging output.
– dmmfll
                Jan 28, 2018 at 21:48

You could spawn threads to read the stdout and stderr pipes, write to a common queue, and append to lists. Then use a third thread to print items from the queue.

import time
import Queue
import sys
import threading
import subprocess
PIPE = subprocess.PIPE
def read_output(pipe, funcs):
    for line in iter(pipe.readline, ''):
        for func in funcs:
            func(line)
            # time.sleep(1)
    pipe.close()
def write_output(get):
    for line in iter(get, None):
        sys.stdout.write(line)
process = subprocess.Popen(
    ['random_print.py'], stdout=PIPE, stderr=PIPE, close_fds=True, bufsize=1)
q = Queue.Queue()
out, err = [], []
tout = threading.Thread(
    target=read_output, args=(process.stdout, [q.put, out.append]))
terr = threading.Thread(
    target=read_output, args=(process.stderr, [q.put, err.append]))
twrite = threading.Thread(target=write_output, args=(q.get,))
for t in (tout, terr, twrite):
    t.daemon = True
    t.start()
process.wait()
for t in (tout, terr):
    t.join()
q.put(None)
print(out)
print(err)

The reason for using the third thread -- instead of letting the first two threads both print directly to the terminal -- is to prevent both print statements from occurring concurrently, which can sometimes result in garbled text.
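The pattern of serializing writes through a single queue consumer can be shown on its own, independent of subprocess (the producer/consumer names here are illustrative):

```python
import queue
import threading

q = queue.Queue()
lines = []

def producer(name):
    # Each producer plays the role of one pipe-reading thread.
    for i in range(3):
        q.put('%s:%d\n' % (name, i))

def consumer():
    # Single consumer: items leave the queue one at a time, so no two
    # "writes" can interleave mid-line.
    for item in iter(q.get, None):
        lines.append(item)

producers = [threading.Thread(target=producer, args=(n,)) for n in ('out', 'err')]
writer = threading.Thread(target=consumer)
for t in producers + [writer]:
    t.start()
for t in producers:
    t.join()
q.put(None)  # sentinel tells the consumer to stop
writer.join()
print(len(lines))  # 6
```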

The above calls random_print.py, which prints to stdout and stderr at random:

import sys
import time
import random
for i in range(50):
    f = random.choice([sys.stdout,sys.stderr])
    f.write(str(i)+'\n')
    f.flush()
    time.sleep(0.1)

Here is an alternative solution for Unix-like systems, using select.select:

import collections
import errno
import select
import fcntl
import os
import time
import Queue
import sys
import threading
import subprocess
PIPE = subprocess.PIPE
def make_async(fd):
    # https://stackoverflow.com/a/7730201/190597
    '''add the O_NONBLOCK flag to a file descriptor'''
    fcntl.fcntl(
        fd, fcntl.F_SETFL, fcntl.fcntl(fd, fcntl.F_GETFL) | os.O_NONBLOCK)
def read_async(fd):
    # https://stackoverflow.com/a/7730201/190597
    '''read some data from a file descriptor, ignoring EAGAIN errors'''
    # time.sleep(1)
    try:
        return fd.read()
    except IOError, e:
        if e.errno != errno.EAGAIN:
            raise e
        else:
            return ''
def write_output(fds, outmap):
    for fd in fds:
        line = read_async(fd)
        sys.stdout.write(line)
        outmap[fd.fileno()].append(line)
process = subprocess.Popen(
    ['random_print.py'], stdout=PIPE, stderr=PIPE, close_fds=True)
make_async(process.stdout)
make_async(process.stderr)
outmap = collections.defaultdict(list)
while True:
    rlist, wlist, xlist = select.select([process.stdout, process.stderr], [], [])
    write_output(rlist, outmap)
    if process.poll() is not None:
        write_output([process.stdout, process.stderr], outmap)
        break
fileno = {'stdout': process.stdout.fileno(),
          'stderr': process.stderr.fileno()}
print(outmap[fileno['stdout']])
print(outmap[fileno['stderr']])

This solution uses code and ideas from Adam Rosenfield's post, here.
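On Python 3 the same readiness loop can be written with the higher-level selectors module instead of raw select.select and fcntl; a sketch for Unix-like systems (the child command here is just an illustration that writes one line to each stream):

```python
import selectors
import subprocess
import sys

# Illustrative child process: one line to stdout, one to stderr.
proc = subprocess.Popen(
    [sys.executable, '-c',
     "import sys; sys.stdout.write('to out\\n'); sys.stderr.write('to err\\n')"],
    stdout=subprocess.PIPE, stderr=subprocess.PIPE)

sel = selectors.DefaultSelector()
sel.register(proc.stdout, selectors.EVENT_READ, 'stdout')
sel.register(proc.stderr, selectors.EVENT_READ, 'stderr')

captured = {'stdout': [], 'stderr': []}
while sel.get_map():  # loop until both pipes hit EOF and are unregistered
    for key, _ in sel.select():
        data = key.fileobj.read1(4096)  # read what's available, at most once
        if not data:                    # EOF on this pipe
            sel.unregister(key.fileobj)
            continue
        captured[key.data].append(data)
proc.wait()
out = b''.join(captured['stdout'])
err = b''.join(captured['stderr'])
```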

                you could add q.put(None) after process.wait() and exit the 3rd thread on None, e.g. for line in iter(get, None):. Also pipe.close() is missing.
– jfs
                Jun 19, 2013 at 13:07
                @J.F.Sebastian: Thanks for the corrections. Suppose read_output for some reason does not keep pace with the output being written to the pipe. (I try to simulate that with a time.sleep(1) above.) When the time.sleep(1) is uncommented, out and err fail to collect all the output before process.wait() completes. Do you know a way to guarantee that out and err get all the output?
– unutbu
                Jun 19, 2013 at 13:41
                t{err,out}.join() before put(None). Btw, to get lines in "real time", bufsize=1 might help (ignoring the block-buffering issue).
– jfs
                Jun 19, 2013 at 14:16

To stream live output (stdout and stderr) of a subprocess to the terminal, as well as to variables, you can spawn two threads to handle the streams concurrently.

Adapted from my more detailed answer:

import logging
from collections import deque
from concurrent.futures import ThreadPoolExecutor
from functools import partial
from subprocess import PIPE, CalledProcessError, CompletedProcess, Popen
def stream_command(
    args,
    stdout_handler=logging.info,
    stderr_handler=logging.error,
    check=True,
    text=True,
    stdout=PIPE,
    stderr=PIPE,
    **kwargs,
):
    """Mimic subprocess.run, while processing the command output in real time."""
    with Popen(args, text=text, stdout=stdout, stderr=stderr, **kwargs) as process:
        with ThreadPoolExecutor(2) as pool:  # two threads to handle the streams
            exhaust = partial(pool.submit, partial(deque, maxlen=0))
            exhaust(stdout_handler(line[:-1]) for line in process.stdout)
            exhaust(stderr_handler(line[:-1]) for line in process.stderr)
    retcode = process.poll()
    if check and retcode:
        raise CalledProcessError(retcode, process.args)
    return CompletedProcess(process.args, retcode)

Call with custom handlers:

outs, errs = [], []
def stdout_handler(line):
    outs.append(line)
    print(line)
def stderr_handler(line):
    errs.append(line)
    print(line)
stream_command(
    ["echo", "test"],
    stdout_handler=stdout_handler,
    stderr_handler=stderr_handler,
)
# test
print(outs)
# ['test']
        
