Show More
@@ -808,9 +808,11 b' class ITaskController(cs.IControllerBase):' | |||
|
808 | 808 | return the number of tasks with each status. |
|
809 | 809 | """ |
|
810 | 810 | |
|
811 | def clear(): | |
|
811 | def clear(taskids=None): | |
|
812 | 812 | """ |
|
813 |
Clear |
|
|
813 | Clear previously run tasks from the task controller. | |
|
814 | ||
|
815 | If no ids specified, clear all. | |
|
814 | 816 | |
|
815 | 817 | This is needed because the task controller keep all task results |
|
816 | 818 | in memory. This can be a problem is there are many completed |
@@ -1053,9 +1055,13 b' class TaskController(cs.ControllerAdapterBase):' | |||
|
1053 | 1055 | if taskid in self.abortPending: |
|
1054 | 1056 | self._doAbort(taskid) |
|
1055 | 1057 | aborted = True |
|
1056 | ||
|
1057 | 1058 | if not aborted: |
|
1058 | 1059 | if not success: |
|
1060 | if isinstance(result,error.TaskRejectError): | |
|
1061 | log.msg("Task %i dependencies unmet by worker %i"%(taskid, workerid)) | |
|
1062 | ||
|
1063 | return | |
|
1064 | ||
|
1059 | 1065 | log.msg("Task %i failed on worker %i"% (taskid, workerid)) |
|
1060 | 1066 | if task.retries > 0: # resubmit |
|
1061 | 1067 | task.retries -= 1 |
@@ -1100,7 +1106,7 b' class TaskController(cs.ControllerAdapterBase):' | |||
|
1100 | 1106 | self.scheduler.add_worker(self.workers[workerid]) |
|
1101 | 1107 | self.distributeTasks() |
|
1102 | 1108 | |
|
1103 | def clear(self): | |
|
1109 | def clear(self,taskids=None): | |
|
1104 | 1110 | """ |
|
1105 | 1111 | Clear all previously run tasks from the task controller. |
|
1106 | 1112 | |
@@ -1109,8 +1115,28 b' class TaskController(cs.ControllerAdapterBase):' | |||
|
1109 | 1115 | tasks. Users should call this periodically to clean out these |
|
1110 | 1116 | cached task results. |
|
1111 | 1117 | """ |
|
1112 |
self.finishedResults |
|
|
1113 | return defer.succeed(None) | |
|
1118 | before = len(self.finishedResults) | |
|
1119 | failed = [] | |
|
1120 | if taskids is None: | |
|
1121 | log.msg("Clearing all results") | |
|
1122 | self.finishedResults = {} | |
|
1123 | else: | |
|
1124 | if isinstance(taskids, int): | |
|
1125 | taskids = [taskids] | |
|
1126 | if len(taskids) > 0: | |
|
1127 | log.msg("Clearing results: %i et al."%(taskids[0])) | |
|
1128 | for i in taskids: | |
|
1129 | if self.finishedResults.has_key(i): | |
|
1130 | self.finishedResults.pop(i) | |
|
1131 | else: | |
|
1132 | failed.append(i) | |
|
1133 | after = len(self.finishedResults) | |
|
1134 | log.msg("Cleared %i results"%(before-after)) | |
|
1135 | if failed: | |
|
1136 | fails = ", ".join(map(str, failed)) | |
|
1137 | return defer.fail(KeyError("Cleared %i results, but no tasks found for ids: %s"%(before-after, fails))) | |
|
1138 | else: | |
|
1139 | return defer.succeed(before-after) | |
|
1114 | 1140 | |
|
1115 | 1141 | |
|
1116 | 1142 | components.registerAdapter(TaskController, cs.IControllerBase, ITaskController) |
@@ -134,7 +134,7 b' class BlockingTaskClient(object):' | |||
|
134 | 134 | """ |
|
135 | 135 | return blockingCallFromThread(self.task_controller.queue_status, verbose) |
|
136 | 136 | |
|
137 | def clear(self): | |
|
137 | def clear(self,taskids=None): | |
|
138 | 138 | """ |
|
139 | 139 | Clear all previously run tasks from the task controller. |
|
140 | 140 | |
@@ -143,7 +143,7 b' class BlockingTaskClient(object):' | |||
|
143 | 143 | tasks. Users should call this periodically to clean out these |
|
144 | 144 | cached task results. |
|
145 | 145 | """ |
|
146 | return blockingCallFromThread(self.task_controller.clear) | |
|
146 | return blockingCallFromThread(self.task_controller.clear, taskids) | |
|
147 | 147 | |
|
148 | 148 | def map(self, func, *sequences): |
|
149 | 149 | """ |
@@ -72,7 +72,7 b' class IFCTaskController(Interface):' | |||
|
72 | 72 | def remote_queue_status(verbose): |
|
73 | 73 | """""" |
|
74 | 74 | |
|
75 | def remote_clear(): | |
|
75 | def remote_clear(taskids=None): | |
|
76 | 76 | """""" |
|
77 | 77 | |
|
78 | 78 | |
@@ -147,8 +147,11 b' class FCTaskControllerFromTaskController(Referenceable):' | |||
|
147 | 147 | d.addErrback(self.packageFailure) |
|
148 | 148 | return d |
|
149 | 149 | |
|
150 | def remote_clear(self): | |
|
151 |
|
|
|
150 | def remote_clear(self,taskids=None): | |
|
151 | d = self.taskController.clear(taskids) | |
|
152 | d.addCallback(self.packageSuccess) | |
|
153 | d.addErrback(self.packageFailure) | |
|
154 | return d | |
|
152 | 155 | |
|
153 | 156 | def remote_get_client_name(self): |
|
154 | 157 | return 'IPython.kernel.taskfc.FCTaskClient' |
@@ -279,16 +282,24 b' class FCTaskClient(object):' | |||
|
279 | 282 | d.addCallback(self.unpackage) |
|
280 | 283 | return d |
|
281 | 284 | |
|
282 | def clear(self): | |
|
285 | def clear(self,taskids=None): | |
|
283 | 286 | """ |
|
284 |
Clear |
|
|
287 | Clear previously run tasks from the task controller. | |
|
288 | :Parameters: | |
|
289 | taskids : list, tuple, None | |
|
290 | A sequence of taskids whose results we should drop. | |
|
291 | if None: clear all results | |
|
292 | ||
|
293 | :Returns: | |
|
294 | An int, the number of tasks cleared | |
|
285 | 295 | |
|
286 | 296 | This is needed because the task controller keep all task results |
|
287 | 297 | in memory. This can be a problem is there are many completed |
|
288 | 298 | tasks. Users should call this periodically to clean out these |
|
289 | 299 | cached task results. |
|
290 | 300 | """ |
|
291 | d = self.remote_reference.callRemote('clear') | |
|
301 | d = self.remote_reference.callRemote('clear', taskids) | |
|
302 | d.addCallback(self.unpackage) | |
|
292 | 303 | return d |
|
293 | 304 | |
|
294 | 305 | def adapt_to_blocking_client(self): |
General Comments 0
You need to be logged in to leave comments.
Login now