##// END OF EJS Templates
added optional taskids argument to TaskClient.clear()...
Fernando Perez -
Show More
@@ -808,9 +808,11 b' class ITaskController(cs.IControllerBase):'
808 808 return the number of tasks with each status.
809 809 """
810 810
811 def clear():
811 def clear(taskids=None):
812 812 """
813 Clear all previously run tasks from the task controller.
813 Clear previously run tasks from the task controller.
814
815 If no ids specified, clear all.
814 816
815 817 This is needed because the task controller keep all task results
816 818 in memory. This can be a problem is there are many completed
@@ -1053,9 +1055,13 b' class TaskController(cs.ControllerAdapterBase):'
1053 1055 if taskid in self.abortPending:
1054 1056 self._doAbort(taskid)
1055 1057 aborted = True
1056
1057 1058 if not aborted:
1058 1059 if not success:
1060 if isinstance(result,error.TaskRejectError):
1061 log.msg("Task %i dependencies unmet by worker %i"%(taskid, workerid))
1062
1063 return
1064
1059 1065 log.msg("Task %i failed on worker %i"% (taskid, workerid))
1060 1066 if task.retries > 0: # resubmit
1061 1067 task.retries -= 1
@@ -1100,7 +1106,7 b' class TaskController(cs.ControllerAdapterBase):'
1100 1106 self.scheduler.add_worker(self.workers[workerid])
1101 1107 self.distributeTasks()
1102 1108
1103 def clear(self):
1109 def clear(self,taskids=None):
1104 1110 """
1105 1111 Clear all previously run tasks from the task controller.
1106 1112
@@ -1109,8 +1115,28 b' class TaskController(cs.ControllerAdapterBase):'
1109 1115 tasks. Users should call this periodically to clean out these
1110 1116 cached task results.
1111 1117 """
1112 self.finishedResults = {}
1113 return defer.succeed(None)
1118 before = len(self.finishedResults)
1119 failed = []
1120 if taskids is None:
1121 log.msg("Clearing all results")
1122 self.finishedResults = {}
1123 else:
1124 if isinstance(taskids, int):
1125 taskids = [taskids]
1126 if len(taskids) > 0:
1127 log.msg("Clearing results: %i et al."%(taskids[0]))
1128 for i in taskids:
1129 if self.finishedResults.has_key(i):
1130 self.finishedResults.pop(i)
1131 else:
1132 failed.append(i)
1133 after = len(self.finishedResults)
1134 log.msg("Cleared %i results"%(before-after))
1135 if failed:
1136 fails = ", ".join(map(str, failed))
1137 return defer.fail(KeyError("Cleared %i results, but no tasks found for ids: %s"%(before-after, fails)))
1138 else:
1139 return defer.succeed(before-after)
1114 1140
1115 1141
1116 1142 components.registerAdapter(TaskController, cs.IControllerBase, ITaskController)
@@ -134,7 +134,7 b' class BlockingTaskClient(object):'
134 134 """
135 135 return blockingCallFromThread(self.task_controller.queue_status, verbose)
136 136
137 def clear(self):
137 def clear(self,taskids=None):
138 138 """
139 139 Clear all previously run tasks from the task controller.
140 140
@@ -143,7 +143,7 b' class BlockingTaskClient(object):'
143 143 tasks. Users should call this periodically to clean out these
144 144 cached task results.
145 145 """
146 return blockingCallFromThread(self.task_controller.clear)
146 return blockingCallFromThread(self.task_controller.clear, taskids)
147 147
148 148 def map(self, func, *sequences):
149 149 """
@@ -72,7 +72,7 b' class IFCTaskController(Interface):'
72 72 def remote_queue_status(verbose):
73 73 """"""
74 74
75 def remote_clear():
75 def remote_clear(taskids=None):
76 76 """"""
77 77
78 78
@@ -147,8 +147,11 b' class FCTaskControllerFromTaskController(Referenceable):'
147 147 d.addErrback(self.packageFailure)
148 148 return d
149 149
150 def remote_clear(self):
151 return self.taskController.clear()
150 def remote_clear(self,taskids=None):
151 d = self.taskController.clear(taskids)
152 d.addCallback(self.packageSuccess)
153 d.addErrback(self.packageFailure)
154 return d
152 155
153 156 def remote_get_client_name(self):
154 157 return 'IPython.kernel.taskfc.FCTaskClient'
@@ -279,16 +282,24 b' class FCTaskClient(object):'
279 282 d.addCallback(self.unpackage)
280 283 return d
281 284
282 def clear(self):
285 def clear(self,taskids=None):
283 286 """
284 Clear all previously run tasks from the task controller.
287 Clear previously run tasks from the task controller.
288 :Parameters:
289 taskids : list, tuple, None
290 A sequence of taskids whose results we should drop.
291 if None: clear all results
292
293 :Returns:
294 An int, the number of tasks cleared
285 295
286 296 This is needed because the task controller keep all task results
287 297 in memory. This can be a problem is there are many completed
288 298 tasks. Users should call this periodically to clean out these
289 299 cached task results.
290 300 """
291 d = self.remote_reference.callRemote('clear')
301 d = self.remote_reference.callRemote('clear', taskids)
302 d.addCallback(self.unpackage)
292 303 return d
293 304
294 305 def adapt_to_blocking_client(self):
General Comments 0
You need to be logged in to leave comments. Login now