"""
This file implements ``WorkerStream`` for capturing stdout and stderr.
"""
import queue
import sys
[docs]class WorkerStream(object):
"""File-like object for wrapping the output of the scripts into connection messages"""
[docs] def __init__(self, name, connection):
self.name = name # 'stdout' or 'stderr'
self.connection = connection
[docs] def __repr__(self):
return "WorkerStream({name})".format(name=self.name)
[docs] def close(self):
self.connection.close()
[docs] def write(self, str):
self.connection.send((self.name, str))
[docs] def isatty(self):
return False
[docs]class HttpWorkerStream(object):
[docs] def __init__(self, name, jobid, url):
self.name = name
self.jobid = jobid
self.url = url
# def write(self, str):
# async requests.post("url + jobid", str)
[docs]class QueueWorkerStream(object):
"""File-like object for wrapping the output of the scripts with queues - to be created in child process"""
[docs] def __init__(self, name, q):
self.name = name # 'stdout' or 'stderr'
self.q = q
[docs] def __repr__(self):
return "QueueWorkerStream({name})".format(name=self.name)
[docs] def write(self, str):
self.q.put((self.name, str))
[docs] def isatty(self):
return False
[docs]def stream_from_queue(q):
"""Stream the contents from the queue to STDOUT / STDERR - to be called from parent process"""
try:
stream, msg = q.get(block=True, timeout=0.1)
if stream == 'stdout':
sys.stdout.write(msg)
elif stream == 'stderr':
sys.stderr.write(msg)
except queue.Empty:
pass