Source code for cea.utilities.workerstream

"""
This file implements ``WorkerStream`` for capturing stdout and stderr.
"""
import queue
import sys


[docs]class WorkerStream(object): """File-like object for wrapping the output of the scripts into connection messages"""
[docs] def __init__(self, name, connection): self.name = name # 'stdout' or 'stderr' self.connection = connection
[docs] def __repr__(self): return "WorkerStream({name})".format(name=self.name)
[docs] def close(self): self.connection.close()
[docs] def write(self, str): self.connection.send((self.name, str))
[docs] def isatty(self): return False
[docs] def flush(self): pass
[docs]class HttpWorkerStream(object):
[docs] def __init__(self, name, jobid, url): self.name = name self.jobid = jobid self.url = url
# def write(self, str): # async requests.post("url + jobid", str)
[docs]class QueueWorkerStream(object): """File-like object for wrapping the output of the scripts with queues - to be created in child process"""
[docs] def __init__(self, name, q): self.name = name # 'stdout' or 'stderr' self.q = q
[docs] def __repr__(self): return "QueueWorkerStream({name})".format(name=self.name)
[docs] def close(self): pass
[docs] def write(self, str): self.q.put((self.name, str))
[docs] def isatty(self): return False
[docs] def flush(self): pass
[docs]def stream_from_queue(q): """Stream the contents from the queue to STDOUT / STDERR - to be called from parent process""" try: stream, msg = q.get(block=True, timeout=0.1) if stream == 'stdout': sys.stdout.write(msg) elif stream == 'stderr': sys.stderr.write(msg) except queue.Empty: pass