Show More
@@ -808,9 +808,11 b' class ITaskController(cs.IControllerBase):' | |||||
808 | return the number of tasks with each status. |
|
808 | return the number of tasks with each status. | |
809 | """ |
|
809 | """ | |
810 |
|
810 | |||
811 | def clear(): |
|
811 | def clear(taskids=None): | |
812 | """ |
|
812 | """ | |
813 |
Clear |
|
813 | Clear previously run tasks from the task controller. | |
|
814 | ||||
|
815 | If no ids specified, clear all. | |||
814 |
|
816 | |||
815 | This is needed because the task controller keep all task results |
|
817 | This is needed because the task controller keep all task results | |
816 | in memory. This can be a problem is there are many completed |
|
818 | in memory. This can be a problem is there are many completed | |
@@ -1053,9 +1055,13 b' class TaskController(cs.ControllerAdapterBase):' | |||||
1053 | if taskid in self.abortPending: |
|
1055 | if taskid in self.abortPending: | |
1054 | self._doAbort(taskid) |
|
1056 | self._doAbort(taskid) | |
1055 | aborted = True |
|
1057 | aborted = True | |
1056 |
|
||||
1057 | if not aborted: |
|
1058 | if not aborted: | |
1058 | if not success: |
|
1059 | if not success: | |
|
1060 | if isinstance(result,error.TaskRejectError): | |||
|
1061 | log.msg("Task %i dependencies unmet by worker %i"%(taskid, workerid)) | |||
|
1062 | ||||
|
1063 | return | |||
|
1064 | ||||
1059 | log.msg("Task %i failed on worker %i"% (taskid, workerid)) |
|
1065 | log.msg("Task %i failed on worker %i"% (taskid, workerid)) | |
1060 | if task.retries > 0: # resubmit |
|
1066 | if task.retries > 0: # resubmit | |
1061 | task.retries -= 1 |
|
1067 | task.retries -= 1 | |
@@ -1100,7 +1106,7 b' class TaskController(cs.ControllerAdapterBase):' | |||||
1100 | self.scheduler.add_worker(self.workers[workerid]) |
|
1106 | self.scheduler.add_worker(self.workers[workerid]) | |
1101 | self.distributeTasks() |
|
1107 | self.distributeTasks() | |
1102 |
|
1108 | |||
1103 | def clear(self): |
|
1109 | def clear(self,taskids=None): | |
1104 | """ |
|
1110 | """ | |
1105 | Clear all previously run tasks from the task controller. |
|
1111 | Clear all previously run tasks from the task controller. | |
1106 |
|
1112 | |||
@@ -1109,8 +1115,28 b' class TaskController(cs.ControllerAdapterBase):' | |||||
1109 | tasks. Users should call this periodically to clean out these |
|
1115 | tasks. Users should call this periodically to clean out these | |
1110 | cached task results. |
|
1116 | cached task results. | |
1111 | """ |
|
1117 | """ | |
1112 |
self.finishedResults |
|
1118 | before = len(self.finishedResults) | |
1113 | return defer.succeed(None) |
|
1119 | failed = [] | |
|
1120 | if taskids is None: | |||
|
1121 | log.msg("Clearing all results") | |||
|
1122 | self.finishedResults = {} | |||
|
1123 | else: | |||
|
1124 | if isinstance(taskids, int): | |||
|
1125 | taskids = [taskids] | |||
|
1126 | if len(taskids) > 0: | |||
|
1127 | log.msg("Clearing results: %i et al."%(taskids[0])) | |||
|
1128 | for i in taskids: | |||
|
1129 | if self.finishedResults.has_key(i): | |||
|
1130 | self.finishedResults.pop(i) | |||
|
1131 | else: | |||
|
1132 | failed.append(i) | |||
|
1133 | after = len(self.finishedResults) | |||
|
1134 | log.msg("Cleared %i results"%(before-after)) | |||
|
1135 | if failed: | |||
|
1136 | fails = ", ".join(map(str, failed)) | |||
|
1137 | return defer.fail(KeyError("Cleared %i results, but no tasks found for ids: %s"%(before-after, fails))) | |||
|
1138 | else: | |||
|
1139 | return defer.succeed(before-after) | |||
1114 |
|
1140 | |||
1115 |
|
1141 | |||
1116 | components.registerAdapter(TaskController, cs.IControllerBase, ITaskController) |
|
1142 | components.registerAdapter(TaskController, cs.IControllerBase, ITaskController) |
@@ -134,7 +134,7 b' class BlockingTaskClient(object):' | |||||
134 | """ |
|
134 | """ | |
135 | return blockingCallFromThread(self.task_controller.queue_status, verbose) |
|
135 | return blockingCallFromThread(self.task_controller.queue_status, verbose) | |
136 |
|
136 | |||
137 | def clear(self): |
|
137 | def clear(self,taskids=None): | |
138 | """ |
|
138 | """ | |
139 | Clear all previously run tasks from the task controller. |
|
139 | Clear all previously run tasks from the task controller. | |
140 |
|
140 | |||
@@ -143,7 +143,7 b' class BlockingTaskClient(object):' | |||||
143 | tasks. Users should call this periodically to clean out these |
|
143 | tasks. Users should call this periodically to clean out these | |
144 | cached task results. |
|
144 | cached task results. | |
145 | """ |
|
145 | """ | |
146 | return blockingCallFromThread(self.task_controller.clear) |
|
146 | return blockingCallFromThread(self.task_controller.clear, taskids) | |
147 |
|
147 | |||
148 | def map(self, func, *sequences): |
|
148 | def map(self, func, *sequences): | |
149 | """ |
|
149 | """ |
@@ -72,7 +72,7 b' class IFCTaskController(Interface):' | |||||
72 | def remote_queue_status(verbose): |
|
72 | def remote_queue_status(verbose): | |
73 | """""" |
|
73 | """""" | |
74 |
|
74 | |||
75 | def remote_clear(): |
|
75 | def remote_clear(taskids=None): | |
76 | """""" |
|
76 | """""" | |
77 |
|
77 | |||
78 |
|
78 | |||
@@ -147,8 +147,11 b' class FCTaskControllerFromTaskController(Referenceable):' | |||||
147 | d.addErrback(self.packageFailure) |
|
147 | d.addErrback(self.packageFailure) | |
148 | return d |
|
148 | return d | |
149 |
|
149 | |||
150 | def remote_clear(self): |
|
150 | def remote_clear(self,taskids=None): | |
151 |
|
|
151 | d = self.taskController.clear(taskids) | |
|
152 | d.addCallback(self.packageSuccess) | |||
|
153 | d.addErrback(self.packageFailure) | |||
|
154 | return d | |||
152 |
|
155 | |||
153 | def remote_get_client_name(self): |
|
156 | def remote_get_client_name(self): | |
154 | return 'IPython.kernel.taskfc.FCTaskClient' |
|
157 | return 'IPython.kernel.taskfc.FCTaskClient' | |
@@ -279,16 +282,24 b' class FCTaskClient(object):' | |||||
279 | d.addCallback(self.unpackage) |
|
282 | d.addCallback(self.unpackage) | |
280 | return d |
|
283 | return d | |
281 |
|
284 | |||
282 | def clear(self): |
|
285 | def clear(self,taskids=None): | |
283 | """ |
|
286 | """ | |
284 |
Clear |
|
287 | Clear previously run tasks from the task controller. | |
|
288 | :Parameters: | |||
|
289 | taskids : list, tuple, None | |||
|
290 | A sequence of taskids whose results we should drop. | |||
|
291 | if None: clear all results | |||
|
292 | ||||
|
293 | :Returns: | |||
|
294 | An int, the number of tasks cleared | |||
285 |
|
295 | |||
286 | This is needed because the task controller keep all task results |
|
296 | This is needed because the task controller keep all task results | |
287 | in memory. This can be a problem is there are many completed |
|
297 | in memory. This can be a problem is there are many completed | |
288 | tasks. Users should call this periodically to clean out these |
|
298 | tasks. Users should call this periodically to clean out these | |
289 | cached task results. |
|
299 | cached task results. | |
290 | """ |
|
300 | """ | |
291 | d = self.remote_reference.callRemote('clear') |
|
301 | d = self.remote_reference.callRemote('clear', taskids) | |
|
302 | d.addCallback(self.unpackage) | |||
292 | return d |
|
303 | return d | |
293 |
|
304 | |||
294 | def adapt_to_blocking_client(self): |
|
305 | def adapt_to_blocking_client(self): |
General Comments 0
You need to be logged in to leave comments.
Login now