##// END OF EJS Templates
added optional taskids argument to TaskClient.clear()...
Fernando Perez -
Show More
@@ -808,9 +808,11 b' class ITaskController(cs.IControllerBase):'
808 return the number of tasks with each status.
808 return the number of tasks with each status.
809 """
809 """
810
810
811 def clear():
811 def clear(taskids=None):
812 """
812 """
813 Clear all previously run tasks from the task controller.
813 Clear previously run tasks from the task controller.
814
815 If no ids specified, clear all.
814
816
815 This is needed because the task controller keep all task results
817 This is needed because the task controller keep all task results
816 in memory. This can be a problem is there are many completed
818 in memory. This can be a problem is there are many completed
@@ -1053,9 +1055,13 b' class TaskController(cs.ControllerAdapterBase):'
1053 if taskid in self.abortPending:
1055 if taskid in self.abortPending:
1054 self._doAbort(taskid)
1056 self._doAbort(taskid)
1055 aborted = True
1057 aborted = True
1056
1057 if not aborted:
1058 if not aborted:
1058 if not success:
1059 if not success:
1060 if isinstance(result,error.TaskRejectError):
1061 log.msg("Task %i dependencies unmet by worker %i"%(taskid, workerid))
1062
1063 return
1064
1059 log.msg("Task %i failed on worker %i"% (taskid, workerid))
1065 log.msg("Task %i failed on worker %i"% (taskid, workerid))
1060 if task.retries > 0: # resubmit
1066 if task.retries > 0: # resubmit
1061 task.retries -= 1
1067 task.retries -= 1
@@ -1100,7 +1106,7 b' class TaskController(cs.ControllerAdapterBase):'
1100 self.scheduler.add_worker(self.workers[workerid])
1106 self.scheduler.add_worker(self.workers[workerid])
1101 self.distributeTasks()
1107 self.distributeTasks()
1102
1108
1103 def clear(self):
1109 def clear(self,taskids=None):
1104 """
1110 """
1105 Clear all previously run tasks from the task controller.
1111 Clear all previously run tasks from the task controller.
1106
1112
@@ -1109,8 +1115,28 b' class TaskController(cs.ControllerAdapterBase):'
1109 tasks. Users should call this periodically to clean out these
1115 tasks. Users should call this periodically to clean out these
1110 cached task results.
1116 cached task results.
1111 """
1117 """
1112 self.finishedResults = {}
1118 before = len(self.finishedResults)
1113 return defer.succeed(None)
1119 failed = []
1120 if taskids is None:
1121 log.msg("Clearing all results")
1122 self.finishedResults = {}
1123 else:
1124 if isinstance(taskids, int):
1125 taskids = [taskids]
1126 if len(taskids) > 0:
1127 log.msg("Clearing results: %i et al."%(taskids[0]))
1128 for i in taskids:
1129 if self.finishedResults.has_key(i):
1130 self.finishedResults.pop(i)
1131 else:
1132 failed.append(i)
1133 after = len(self.finishedResults)
1134 log.msg("Cleared %i results"%(before-after))
1135 if failed:
1136 fails = ", ".join(map(str, failed))
1137 return defer.fail(KeyError("Cleared %i results, but no tasks found for ids: %s"%(before-after, fails)))
1138 else:
1139 return defer.succeed(before-after)
1114
1140
1115
1141
1116 components.registerAdapter(TaskController, cs.IControllerBase, ITaskController)
1142 components.registerAdapter(TaskController, cs.IControllerBase, ITaskController)
@@ -134,7 +134,7 b' class BlockingTaskClient(object):'
134 """
134 """
135 return blockingCallFromThread(self.task_controller.queue_status, verbose)
135 return blockingCallFromThread(self.task_controller.queue_status, verbose)
136
136
137 def clear(self):
137 def clear(self,taskids=None):
138 """
138 """
139 Clear all previously run tasks from the task controller.
139 Clear all previously run tasks from the task controller.
140
140
@@ -143,7 +143,7 b' class BlockingTaskClient(object):'
143 tasks. Users should call this periodically to clean out these
143 tasks. Users should call this periodically to clean out these
144 cached task results.
144 cached task results.
145 """
145 """
146 return blockingCallFromThread(self.task_controller.clear)
146 return blockingCallFromThread(self.task_controller.clear, taskids)
147
147
148 def map(self, func, *sequences):
148 def map(self, func, *sequences):
149 """
149 """
@@ -72,7 +72,7 b' class IFCTaskController(Interface):'
72 def remote_queue_status(verbose):
72 def remote_queue_status(verbose):
73 """"""
73 """"""
74
74
75 def remote_clear():
75 def remote_clear(taskids=None):
76 """"""
76 """"""
77
77
78
78
@@ -147,8 +147,11 b' class FCTaskControllerFromTaskController(Referenceable):'
147 d.addErrback(self.packageFailure)
147 d.addErrback(self.packageFailure)
148 return d
148 return d
149
149
150 def remote_clear(self):
150 def remote_clear(self,taskids=None):
151 return self.taskController.clear()
151 d = self.taskController.clear(taskids)
152 d.addCallback(self.packageSuccess)
153 d.addErrback(self.packageFailure)
154 return d
152
155
153 def remote_get_client_name(self):
156 def remote_get_client_name(self):
154 return 'IPython.kernel.taskfc.FCTaskClient'
157 return 'IPython.kernel.taskfc.FCTaskClient'
@@ -279,16 +282,24 b' class FCTaskClient(object):'
279 d.addCallback(self.unpackage)
282 d.addCallback(self.unpackage)
280 return d
283 return d
281
284
282 def clear(self):
285 def clear(self,taskids=None):
283 """
286 """
284 Clear all previously run tasks from the task controller.
287 Clear previously run tasks from the task controller.
288 :Parameters:
289 taskids : list, tuple, None
290 A sequence of taskids whose results we should drop.
291 if None: clear all results
292
293 :Returns:
294 An int, the number of tasks cleared
285
295
286 This is needed because the task controller keep all task results
296 This is needed because the task controller keep all task results
287 in memory. This can be a problem is there are many completed
297 in memory. This can be a problem is there are many completed
288 tasks. Users should call this periodically to clean out these
298 tasks. Users should call this periodically to clean out these
289 cached task results.
299 cached task results.
290 """
300 """
291 d = self.remote_reference.callRemote('clear')
301 d = self.remote_reference.callRemote('clear', taskids)
302 d.addCallback(self.unpackage)
292 return d
303 return d
293
304
294 def adapt_to_blocking_client(self):
305 def adapt_to_blocking_client(self):
General Comments 0
You need to be logged in to leave comments. Login now