import Queue import threading import time tasks_queue = Queue.Queue() tasks_queue_items = [] tasks_lock = threading.Lock() class Task(): ST_PENDING = 0 ST_INPROGRESS = 1 ST_COMPLETE = 3 ST_ERROR = 2 def __init__(self, fn, args=[], kwargs={}, desc=''): self.id = id(self) self.fn = fn self.args = args self.kwargs = kwargs self.desc = desc self.status = self.ST_PENDING self.scheduled_at = 0 self.error = None def run(self): self.status = self.ST_INPROGRESS try: res = self.fn(*self.args, **self.kwargs) self.status = self.ST_COMPLETE except BaseException as e: res = False self.status = self.ST_ERROR self.error = e return res def schedule(task): with tasks_lock: task.scheduled_at = time.time() tasks_queue_items[0:0] = [task] tasks_queue.put(task) def tasks_worker(): while True: try: task = tasks_queue.get(timeout=5) if not task: break task.run() tasks_queue.task_done() except Queue.Empty: pass finally: with tasks_lock: tasks_queue_items.sort(key=lambda x: -x.scheduled_at) time.sleep(0.5) worker = threading.Thread(target=tasks_worker) def start(): worker.start() def stop(): tasks_queue.put(False) worker.join()