# -*- coding:utf-8; tab-width:4; mode:python -*-
import os
import time
import select
from threading import Thread
import logging
import random
random.seed()
try:
import Queue
except ImportError:
import queue as Queue
from .log import NullHandler
logger = logging.getLogger('commodity.thread')
logger.setLevel(logging.DEBUG)
logger.addHandler(NullHandler())
logger.propagate = False
from .pattern import memoized
from .type_ import checked_type
all = ['ReaderThread',
'ThreadFunc',
'start_new_thread',
'SimpleThreadPool',
'WorkerGroup']
[docs]def ReaderThread(fin, fout):
if fout is None:
return DummyReaderThread()
return ActualReaderThread(fin, fout)
[docs]class DummyReaderThread(object):
def flush(self):
pass
[docs]class ActualReaderThread(Thread):
"""
A thread that read from an actual file handler and write to an
object that just has the write() method.
"""
class Timeout(Exception):
pass
def __init__(self, fdin, fdout):
super(ActualReaderThread, self).__init__()
self.fdin = fdin
self.fdout = fdout
assert not fdout.closed, fdout
self.start()
time.sleep(0.01)
def _read_in(self):
ready = select.select([self.fdin], [], [], 0.2)[0]
if not ready:
raise self.Timeout
return os.read(self.fdin.fileno(), 2048)
def run(self):
while 1:
try:
data = self._read_in()
if not data:
break
self.fdout.write(data)
except self.Timeout:
continue
def flush(self):
Thread.join(self, 2)
# print self, self.isAlive()
self.fdout.write(self.fdin.read())
self.fdout.flush()
[docs]class ThreadFunc(Thread):
"""
Execute given function in a new thread. It provides return value (or exception) as
object atribbutes.
>>> import math
>>> tf = ThreadFunc(math.sin, 8)
>>> tf.join()
>>> tf.result
0.9893582466233818
>>> tf = ThreadFunc(math.sqrt, -1)
>>> tf.join()
>>> tf.exception
>>> ValueError('math domain error')
"""
def __init__(self, target, *args, **kargs):
self.target = target
self.args = args
self.kargs = kargs
super(ThreadFunc, self).__init__()
self.result = self.exception = None
self.start()
time.sleep(0.01)
def run(self):
try:
self.result = self.target(*self.args, **self.kargs)
except Exception as e:
self.exception = e
[docs]def start_new_thread(target, *args, **kargs):
"""
Execute given function in a new thread. It returns a :class:`ThreadFunc` object.
"""
return ThreadFunc(target, *args, **kargs)
[docs]class SimpleThreadPool:
"""A generic and simple thread pool. Function return value are received by means of
callbacks.
>>> import math
>>>
>>> result = []
>>> pool = SimpleThreadPool(4)
>>> pool.add(math.pow, (1, 2), callback=result.append)
>>> pool.join()
Also implements a parallel :py:func:`map` for an argument sequence for a function executing each
on a different thread:
>>> pool = SimpleThreadPool(4)
>>> pool.map(math.sqrt, ((2, 4, 5, 9)))
(1.4142135623730951, 2.0, 2.23606797749979, 3.0)
>>> pool.map(math.pow, [2, 3, 3], [2, 2, 4])
(4, 9, 81)
"""
def __init__(self, num_workers):
self.tasks = Queue.Queue()
self.threads = [SimpleThreadPool.Worker(self.tasks) for x in range(num_workers)]
def add(self, func, args=(), kargs={}, callback=lambda x: x):
assert callable(func)
self.tasks.put((func, args, kargs, callback))
def map(self, func, *sequences):
if len(sequences) == 1:
it = zip(sequences[0])
else:
it = zip(*sequences)
holders = []
for args in it:
holder = SimpleThreadPool.Holder()
holders.append(holder)
self.add(func, args, {}, holder)
self.join()
return tuple(x.value for x in holders)
def join(self):
self.tasks.join()
class Worker(Thread):
def __init__(self, tasks):
Thread.__init__(self)
self.tasks = tasks
self.daemon = True
self.start()
def run(self):
while 1:
try:
self.run_next()
except Exception as e:
logger.critical("Worker: %s", e)
print(e)
def run_next(self):
func, args, kargs, callback = self.tasks.get()
logger.debug("thread %s taken %s", self, func)
result = func(*args, **kargs)
self.tasks.task_done()
callback(result)
class Holder:
def __init__(self):
self.value = None
def __call__(self, arg):
self.value = arg
[docs]class WorkerGroup(object):
"""
A group of dedicated workers.
>>> import math
>>>
>>> results = []
>>> group = WorkerGroup(4)
>>> w1 = group.get_worker("some unique value")
>>> w1.add(math.square, (9,), callback=results.append)
>>> group.join()
>>>
>>> print results
[81]
"""
def __init__(self, num_workers):
self.workers = [WorkerGroup.Worker(Queue.Queue()) for x in range(num_workers)]
@memoized
def get_worker(self, id_):
return random.choice(self.workers)
def join(self):
for w in self.workers:
w.tasks.join()
class Worker(SimpleThreadPool.Worker):
def add(self, func, args=(), kargs={}, callback=lambda x: x):
assert callable(func)
self.tasks.put((func, args, kargs, callback))