Package zeroinstall :: Package support :: Module tasks
[frames | no frames]

Source Code for Module zeroinstall.support.tasks

  1  """The tasks module provides a simple light-weight alternative to threads. 
  2   
  3  When you have a long-running job you will want to run it in the background, 
  4  while the user does other things. There are four ways to do this: 
  5   
  6   - Use a new thread for each task. 
  7   - Use callbacks from an idle handler. 
  8   - Use a recursive mainloop. 
  9   - Use this module. 
 10   
 11  Using threads causes a number of problems. Some builds of pygtk/python don't 
 12  support them, they introduce race conditions, often lead to many subtle 
 13  bugs, and they require lots of resources (you probably wouldn't want 10,000 
 14  threads running at once). In particular, two threads can run at exactly the 
 15  same time (perhaps on different processors), so you have to be really careful 
 16  that they don't both try to update the same variable at the same time. This 
 17  requires lots of messy locking, which is hard to get right. 
 18   
 19  Callbacks work within a single thread. For example, you open a dialog box and 
 20  then tell the system to call one function if it's closed, and another if the 
 21  user clicks OK, etc. The function that opened the box then returns, and the 
 22  system calls one of the given callback functions later. Callbacks only 
 23  execute one at a time, so you don't have to worry about race conditions. 
 24  However, they are often very awkward to program with, because you have to 
 25  save state somewhere and then pass it to the functions when they're called. 
 26   
 27  A recursive mainloop only works with nested tasks (you can create a 
 28  sub-task, but the main task can't continue until the sub-task has 
 29  finished). We use these for, eg, rox.alert() boxes since you don't 
 30  normally want to do anything else until the box is closed, but it is not 
 31  appropriate for long-running jobs. 
 32   
 33  Tasks use python's generator API to provide a more pleasant interface to 
 34  callbacks. See the Task class (below) for more information. 
 35  """ 
 36   
import functools
import sys
from logging import info, warn

import gobject
 40   
# The list of Blockers whose event has happened, in the order they were
# triggered. Blocker.trigger() appends to the end of this list and
# _handle_run_queue() removes entries from the front.
_run_queue = []
 44   
45 -def check(blockers, reporter = None):
46 """See if any of the blockers have pending exceptions. 47 @param reporter: invoke this function on each error 48 If reporter is None, raise the first and log the rest.""" 49 ex = None 50 if isinstance(blockers, Blocker): 51 blockers = (blockers,) 52 for b in blockers: 53 if b.exception: 54 b.exception_read = True 55 if reporter: 56 reporter(*b.exception) 57 elif ex is None: 58 ex = b.exception 59 else: 60 warn("Multiple exceptions waiting; skipping %s", b.exception[0]) 61 if ex: 62 raise ex[0], None, ex[1]
63
class Blocker:
    """An event that Tasks can wait for.

    A Blocker starts life with 'happened = False'. Tasks can ask to be
    suspended until 'happened = True'. The value is changed by a call to
    trigger(), and never changes back.

    Example::

        kettle_boiled = tasks.Blocker("kettle boiled")

        def make_tea():
            print "Get cup"
            print "Add tea leaves"
            yield kettle_boiled
            print "Pour water into cup"
            print "Brew..."
            yield tasks.TimeoutBlocker(120, "brewing")
            print "Add milk"
            print "Ready!"

        tasks.Task(make_tea(), "make tea")

    Then elsewhere, later::

        print "Kettle boiled!"
        kettle_boiled.trigger()

    You can also yield a list of Blockers. Your function will resume
    after any one of them is triggered. Use blocker.happened to find out
    which one(s). Yielding a Blocker that has already happened is the same
    as yielding None (gives any other Tasks a chance to run, and then
    continues).
    """

    # Class-level default; trigger() stores the per-instance value.
    exception = None

    def __init__(self, name):
        """@param name: label returned by str() and used in log messages"""
        self.name = name
        self._zero_lib_tasks = set()    # Tasks waiting on this blocker
        self.happened = False           # False until event triggered

    def trigger(self, exception = None):
        """The event has happened. Note that this cannot be undone;
        instead, create a new Blocker to handle the next occurrence
        of the event. Calling trigger() a second time does nothing.
        @param exception: exception to raise in waiting tasks
        @type exception: (Exception, traceback)"""
        if self.happened:
            # Already triggered
            return

        self.happened = True
        self.exception = exception
        self.exception_read = False

        # _schedule() asserts that the queue is empty, so it has to be
        # called before this blocker is appended.
        queue_was_empty = not _run_queue
        if queue_was_empty:
            _schedule()
        _run_queue.append(self)

        if not exception:
            return
        assert isinstance(exception, tuple), exception
        if not self._zero_lib_tasks:
            info("Exception from '%s', but nothing is waiting for it", self)

    def __del__(self):
        # Complain if an error was stored here but check() never saw it
        unread = self.exception and not self.exception_read
        if unread:
            warn("Blocker %s garbage collected without having it's exception read: %s", self, self.exception)

    def add_task(self, task):
        """Called by the scheduler when a Task yields this Blocker.
        If you override this method, be sure to still call this
        method with Blocker.add_task(self, task)!"""
        self._zero_lib_tasks.add(task)

    def remove_task(self, task):
        """Called by the scheduler when a Task that was waiting for
        this blocker is resumed. Raises KeyError if the task was not
        waiting on this blocker."""
        self._zero_lib_tasks.remove(task)

    def __repr__(self):
        return "<Blocker:%s>" % (self,)

    def __str__(self):
        return self.name
144
class IdleBlocker(Blocker):
    """A Blocker that fires as soon as something waits on it.

    It stays untriggered until a task starts waiting on it, then
    immediately triggers. An instance of this class is used internally
    when a Task yields None."""

    def add_task(self, task):
        """Register the task as usual, then trigger straight away so that
        the task is queued to run again."""
        Blocker.add_task(self, task)
        self.trigger()
153
class TimeoutBlocker(Blocker):
    """Triggers after a set number of seconds."""

    def __init__(self, timeout, name):
        """Trigger after 'timeout' seconds (may be a fraction).
        @param timeout: delay in seconds
        @param name: passed on to Blocker.__init__"""
        Blocker.__init__(self, name)
        # The delay is handed to gobject as a whole number of milliseconds
        delay_ms = long(timeout * 1000)
        gobject.timeout_add(delay_ms, self._timeout)

    def _timeout(self):
        # Timer callback. Falls off the end and so returns None, which
        # presumably tells gobject not to call it again - confirm
        # against the gobject.timeout_add documentation.
        self.trigger()
163
def _io_callback(src, cond, blocker):
    """IO watch callback registered by InputBlocker and OutputBlocker.

    @param src: unused
    @param cond: unused
    @param blocker: the Blocker to trigger
    @return: always False (presumably telling gobject to drop the watch
    after its first firing - confirm against gobject.io_add_watch)"""
    keep_watching = False
    blocker.trigger()
    return keep_watching
167
class InputBlocker(Blocker):
    """Triggers when os.read(stream) would not block."""

    _tag = None         # id of the active gobject IO watch, if any
    _stream = None      # the stream being watched

    def __init__(self, stream, name):
        """@param stream: the stream to watch for input
        @param name: passed on to Blocker.__init__"""
        Blocker.__init__(self, name)
        self._stream = stream

    def add_task(self, task):
        """Register the task, installing the IO watch if this is the
        first one to need it."""
        Blocker.add_task(self, task)
        if self._tag is not None:
            # A watch is already installed
            return
        wake_on = gobject.IO_IN | gobject.IO_HUP
        self._tag = gobject.io_add_watch(self._stream, wake_on, _io_callback, self)

    def remove_task(self, task):
        """Unregister the task, removing the IO watch once no task is
        left waiting."""
        Blocker.remove_task(self, task)
        if self._zero_lib_tasks:
            # Somebody else is still waiting on us
            return
        gobject.source_remove(self._tag)
        self._tag = None
187
class OutputBlocker(Blocker):
    """Triggers when os.write(stream) would not block."""

    _tag = None         # id of the active gobject IO watch, if any
    _stream = None      # the stream being watched

    def __init__(self, stream, name):
        """@param stream: the stream to watch for writability
        @param name: passed on to Blocker.__init__"""
        Blocker.__init__(self, name)
        self._stream = stream

    def add_task(self, task):
        """Register the task, installing the IO watch if this is the
        first one to need it."""
        Blocker.add_task(self, task)
        if self._tag is not None:
            # A watch is already installed
            return
        wake_on = gobject.IO_OUT | gobject.IO_HUP
        self._tag = gobject.io_add_watch(self._stream, wake_on, _io_callback, self)

    def remove_task(self, task):
        """Unregister the task, removing the IO watch once no task is
        left waiting."""
        Blocker.remove_task(self, task)
        if self._zero_lib_tasks:
            # Somebody else is still waiting on us
            return
        gobject.source_remove(self._tag)
        self._tag = None
# The shared IdleBlocker that a Task waits on when it is first created and
# whenever it yields None. _handle_run_queue() rebinds this name to a fresh
# IdleBlocker each time the current one reaches the front of the run queue.
_idle_blocker = IdleBlocker("(idle)")
class Task:
    """Create a new Task when you have some long running function to
    run in the background, but which needs to do work in 'chunks'.
    Example::

        from zeroinstall.support import tasks

        def my_task(start):
            for x in range(start, start + 5):
                print "x =", x
                yield None

        tasks.Task(my_task(0), "first")
        tasks.Task(my_task(10), "second")
        mainloop()

    Yielding None gives up control of the processor to another Task,
    causing the sequence printed to be interleaved. You can also yield a
    Blocker (or a list of Blockers) if you want to wait for some
    particular event before resuming (see the Blocker class for details).

    The 'finished' attribute is a Blocker that is triggered when the
    iterator is exhausted, or when it raises an exception (in which case
    the exception and its traceback are stored on the blocker).
    """

    def __init__(self, iterator, name):
        """Call iterator.next() from a glib idle function. This function
        can yield Blocker() objects to suspend processing while waiting
        for events.
        @param iterator: the iterator (normally a generator) to run
        @param name: name for the 'finished' Blocker; used only for debugging"""
        # Use hasattr() so that a non-iterator fails with this message
        # instead of a bare AttributeError from the attribute lookup.
        assert hasattr(iterator, 'next'), "Object passed is not an iterator!"
        self.iterator = iterator
        self.finished = Blocker(name)
        # Block new task on the idle handler...
        _idle_blocker.add_task(self)
        self._zero_blockers = (_idle_blocker,)
        info("Scheduling new task: %s", self)

    def _resume(self):
        """Run the iterator up to its next yield and start waiting on
        whatever it yielded. Called by _handle_run_queue()."""
        # Remove from our blockers' queues
        for blocker in self._zero_blockers:
            blocker.remove_task(self)
        # Resume the task
        try:
            new_blockers = self.iterator.next()
        except StopIteration:
            # Task ended
            self.finished.trigger()
            return
        except SystemExit:
            raise
        except Exception as ex:
            # Task crashed; pass the error on to whoever checks 'finished'
            info("Exception from '%s': %s", self.finished.name, ex)
            tb = sys.exc_info()[2]
            self.finished.trigger(exception = (ex, tb))
            return
        if new_blockers is None:
            # Just give up control briefly
            new_blockers = (_idle_blocker,)
        else:
            if isinstance(new_blockers, Blocker):
                # Wrap a single yielded blocker into a list
                new_blockers = (new_blockers,)
            # Are we blocking on something that already happened?
            for blocker in new_blockers:
                assert hasattr(blocker, 'happened'), "Not a Blocker: %s from %s" % (blocker, self)
                if blocker.happened:
                    # Don't wait on it; just let the other tasks have a turn
                    new_blockers = (_idle_blocker,)
                    info("Task '%s' waiting on ready blocker %s!", self, blocker)
                    break
            else:
                # NOTE(review): an empty sequence also ends up here, and
                # then nothing is left to wake this task up again.
                info("Task '%s' stopping and waiting for '%s'", self, new_blockers)
        # Add to new blockers' queues
        for blocker in new_blockers:
            blocker.add_task(self)
        self._zero_blockers = new_blockers

    def __repr__(self):
        return "Task(%s)" % self.finished.name

    def __str__(self):
        return self.finished.name
# Must append to _run_queue right after calling this!
def _schedule():
    """Register _handle_run_queue() as a gobject idle callback.

    Only valid while the run queue is empty (this is asserted); the
    caller is expected to add its entry to _run_queue straight
    afterwards."""
    assert not _run_queue
    gobject.idle_add(_handle_run_queue)
296
def _handle_run_queue():
    """Idle callback: process the Blocker at the front of the run queue.

    Every task that was waiting on that blocker is resumed, and then the
    blocker is removed from the queue.

    @return: True if more blockers are still queued, False if the queue
    is now empty (presumably gobject keeps calling an idle handler for as
    long as it returns True - confirm against gobject.idle_add)"""
    global _idle_blocker
    assert _run_queue

    head = _run_queue[0]
    assert head.happened

    if head is _idle_blocker:
        # Since this blocker will never run again, create a
        # new one for future idling.
        _idle_blocker = IdleBlocker("(idle)")
    elif head._zero_lib_tasks:
        info("Running %s due to triggering of '%s'", head._zero_lib_tasks, head)
    else:
        info("Running %s", head)

    # Work from a snapshot: resuming a task removes it from the
    # blocker's own set of waiting tasks.
    waiting = frozenset(head._zero_lib_tasks)
    if waiting:
        head.noticed = True

    for task in waiting:
        # Run 'task'.
        task._resume()

    # Tasks resumed above may have appended to the queue, but only at
    # the end, so the blocker just handled is still at index 0.
    del _run_queue[0]

    return bool(_run_queue)
326
def named_async(name):
    """Decorator that turns a generator function into a function that runs the
    generator as a Task and returns the Task's finished blocker.

    The returned function keeps the generator function's name, docstring
    and other metadata.

    @param name: the name for the Task
    @return: the decorator to apply to the generator function"""
    def deco(fn):
        # wraps() copies __name__, __doc__ and __module__ from fn, so the
        # decorated function still documents itself properly.
        @functools.wraps(fn)
        def run(*args, **kwargs):
            return Task(fn(*args, **kwargs), name).finished
        return run
    return deco
338 -def async(fn):
339 """Decorator that turns a generator function into a function that runs the 340 generator as a Task and returns the Task's finished blocker.""" 341 def run(*args, **kwargs): 342 return Task(fn(*args, **kwargs), fn.__name__).finished
343 run.__name__ = fn.__name__ 344 return run 345