diff --git a/IPython/kernel/task.py b/IPython/kernel/task.py index 924d052..f9e6064 100644 --- a/IPython/kernel/task.py +++ b/IPython/kernel/task.py @@ -808,9 +808,11 @@ class ITaskController(cs.IControllerBase): return the number of tasks with each status. """ - def clear(): + def clear(taskids=None): """ - Clear all previously run tasks from the task controller. + Clear previously run tasks from the task controller. + + If no ids specified, clear all. This is needed because the task controller keep all task results in memory. This can be a problem is there are many completed @@ -1053,9 +1055,13 @@ class TaskController(cs.ControllerAdapterBase): if taskid in self.abortPending: self._doAbort(taskid) aborted = True - if not aborted: if not success: + if isinstance(result,error.TaskRejectError): + log.msg("Task %i dependencies unmet by worker %i"%(taskid, workerid)) + + return + log.msg("Task %i failed on worker %i"% (taskid, workerid)) if task.retries > 0: # resubmit task.retries -= 1 @@ -1100,7 +1106,7 @@ class TaskController(cs.ControllerAdapterBase): self.scheduler.add_worker(self.workers[workerid]) self.distributeTasks() - def clear(self): + def clear(self,taskids=None): """ Clear all previously run tasks from the task controller. @@ -1109,8 +1115,28 @@ class TaskController(cs.ControllerAdapterBase): tasks. Users should call this periodically to clean out these cached task results. """ - self.finishedResults = {} - return defer.succeed(None) + before = len(self.finishedResults) + failed = [] + if taskids is None: + log.msg("Clearing all results") + self.finishedResults = {} + else: + if isinstance(taskids, int): + taskids = [taskids] + if len(taskids) > 0: + log.msg("Clearing results: %i et al."%(taskids[0])) + for i in taskids: + if self.finishedResults.has_key(i): + self.finishedResults.pop(i) + else: + failed.append(i) + after = len(self.finishedResults) + log.msg("Cleared %i results"%(before-after)) + if failed: + fails = ", ".join(map(str, failed)) + return defer.fail(KeyError("Cleared %i results, but no tasks found for ids: %s"%(before-after, fails))) + else: + return defer.succeed(before-after) components.registerAdapter(TaskController, cs.IControllerBase, ITaskController) diff --git a/IPython/kernel/taskclient.py b/IPython/kernel/taskclient.py index 69225fb..f98d9dd 100644 --- a/IPython/kernel/taskclient.py +++ b/IPython/kernel/taskclient.py @@ -134,7 +134,7 @@ class BlockingTaskClient(object): """ return blockingCallFromThread(self.task_controller.queue_status, verbose) - def clear(self): + def clear(self,taskids=None): """ Clear all previously run tasks from the task controller. @@ -143,7 +143,7 @@ class BlockingTaskClient(object): tasks. Users should call this periodically to clean out these cached task results. """ - return blockingCallFromThread(self.task_controller.clear) + return blockingCallFromThread(self.task_controller.clear, taskids) def map(self, func, *sequences): """ diff --git a/IPython/kernel/taskfc.py b/IPython/kernel/taskfc.py index c559f64..e308f9b 100644 --- a/IPython/kernel/taskfc.py +++ b/IPython/kernel/taskfc.py @@ -72,7 +72,7 @@ class IFCTaskController(Interface): def remote_queue_status(verbose): """""" - def remote_clear(): + def remote_clear(taskids=None): """""" @@ -147,8 +147,11 @@ class FCTaskControllerFromTaskController(Referenceable): d.addErrback(self.packageFailure) return d - def remote_clear(self): - return self.taskController.clear() + def remote_clear(self,taskids=None): + d = self.taskController.clear(taskids) + d.addCallback(self.packageSuccess) + d.addErrback(self.packageFailure) + return d def remote_get_client_name(self): return 'IPython.kernel.taskfc.FCTaskClient' @@ -279,16 +282,24 @@ class FCTaskClient(object): d.addCallback(self.unpackage) return d - def clear(self): + def clear(self,taskids=None): """ - Clear all previously run tasks from the task controller. + Clear previously run tasks from the task controller. + :Parameters: + taskids : list, tuple, None + A sequence of taskids whose results we should drop. + if None: clear all results + + :Returns: + An int, the number of tasks cleared This is needed because the task controller keep all task results in memory. This can be a problem is there are many completed tasks. Users should call this periodically to clean out these cached task results. """ - d = self.remote_reference.callRemote('clear') + d = self.remote_reference.callRemote('clear', taskids) + d.addCallback(self.unpackage) return d def adapt_to_blocking_client(self):