##// END OF EJS Templates
worker: support parallelization of functions with return values...
Valentin Gatien-Baron -
r42655:5ca136bb default
parent child Browse files
Show More
@@ -1,368 +1,393
1 # worker.py - master-slave parallelism support
1 # worker.py - master-slave parallelism support
2 #
2 #
3 # Copyright 2013 Facebook, Inc.
3 # Copyright 2013 Facebook, Inc.
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import errno
10 import errno
11 import os
11 import os
12 import signal
12 import signal
13 import sys
13 import sys
14 import threading
14 import threading
15 import time
15 import time
16
16
17 try:
17 try:
18 import selectors
18 import selectors
19 selectors.BaseSelector
19 selectors.BaseSelector
20 except ImportError:
20 except ImportError:
21 from .thirdparty import selectors2 as selectors
21 from .thirdparty import selectors2 as selectors
22
22
23 from .i18n import _
23 from .i18n import _
24 from . import (
24 from . import (
25 encoding,
25 encoding,
26 error,
26 error,
27 pycompat,
27 pycompat,
28 scmutil,
28 scmutil,
29 util,
29 util,
30 )
30 )
31
31
def countcpus():
    '''try to count the number of CPUs on the system

    Falls back to 1 when no platform-specific source is available.'''

    # posix: sysconf reports the number of online processors
    try:
        ncpus = int(os.sysconf(r'SC_NPROCESSORS_ONLN'))
    except (AttributeError, ValueError):
        pass
    else:
        if ncpus > 0:
            return ncpus

    # windows: the processor count lives in the environment
    try:
        ncpus = int(encoding.environ['NUMBER_OF_PROCESSORS'])
    except (KeyError, ValueError):
        pass
    else:
        if ncpus > 0:
            return ncpus

    return 1
52
52
53 def _numworkers(ui):
53 def _numworkers(ui):
54 s = ui.config('worker', 'numcpus')
54 s = ui.config('worker', 'numcpus')
55 if s:
55 if s:
56 try:
56 try:
57 n = int(s)
57 n = int(s)
58 if n >= 1:
58 if n >= 1:
59 return n
59 return n
60 except ValueError:
60 except ValueError:
61 raise error.Abort(_('number of cpus must be an integer'))
61 raise error.Abort(_('number of cpus must be an integer'))
62 return min(max(countcpus(), 4), 32)
62 return min(max(countcpus(), 4), 32)
63
63
if not (pycompat.isposix or pycompat.iswindows):
    # Unsupported platform: make startup effectively infinitely expensive
    # so worthwhile() never picks the parallel path.
    _STARTUP_COST = 1e30
    _DISALLOW_THREAD_UNSAFE = False
else:
    _STARTUP_COST = 0.01
    # The Windows worker is thread based. If tasks are CPU bound, threads
    # in the presence of the GIL result in excessive context switching and
    # this overhead can slow down execution.
    _DISALLOW_THREAD_UNSAFE = pycompat.iswindows
73
73
def worthwhile(ui, costperop, nops, threadsafe=True):
    '''try to determine whether the benefit of multiple processes can
    outweigh the cost of starting them'''

    if _DISALLOW_THREAD_UNSAFE and not threadsafe:
        return False

    serialcost = costperop * nops
    nworkers = _numworkers(ui)
    # Estimated parallel cost: worker startup plus the work divided
    # evenly among the workers.
    parallelcost = _STARTUP_COST * nworkers + serialcost / nworkers
    return serialcost - parallelcost >= 0.15
85
85
def worker(ui, costperarg, func, staticargs, args, hasretval=False,
           threadsafe=True):
    '''run a function, possibly in parallel in multiple worker
    processes.

    returns a progress iterator

    costperarg - cost of a single task

    func - function to run. It is expected to return a progress iterator.

    staticargs - arguments to pass to every invocation of the function

    args - arguments to split into chunks, to pass to individual
    workers

    hasretval - when True, func and the current function return a progress
    iterator then a list (encoded as an iterator that yields many (False, ..)
    then a (True, list)). The resulting list is in the natural order.

    threadsafe - whether work items are thread safe and can be executed using
    a thread-based worker. Should be disabled for CPU heavy tasks that don't
    release the GIL.
    '''
    if (ui.configbool('worker', 'enabled')
        and worthwhile(ui, costperarg, len(args), threadsafe=threadsafe)):
        return _platformworker(ui, func, staticargs, args, hasretval)
    # Not worth parallelizing (or disabled): run everything in-process.
    return func(*staticargs + (args,))
109
114
def _posixworker(ui, func, staticargs, args, hasretval):
    '''Fork-based implementation of worker() for POSIX platforms.

    Forks one child per chunk of args; each child streams pickled
    (chunk-index, result) pairs back over its own pipe. The parent
    multiplexes the pipes with a selector and re-yields results. When
    hasretval is true, each chunk's (True, value) item is collected and
    a single final (True, combined-list) is yielded, in chunk order.
    '''
    workers = _numworkers(ui)
    oldhandler = signal.getsignal(signal.SIGINT)
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    # problem[0] holds the first non-zero child exit status seen
    pids, problem = set(), [0]
    def killworkers():
        # unregister SIGCHLD handler as all children will be killed. This
        # function shouldn't be interrupted by another SIGCHLD; otherwise pids
        # could be updated while iterating, which would cause inconsistency.
        signal.signal(signal.SIGCHLD, oldchldhandler)
        # if one worker bails, there's no good reason to wait for the rest
        for p in pids:
            try:
                os.kill(p, signal.SIGTERM)
            except OSError as err:
                if err.errno != errno.ESRCH:
                    raise
    def waitforworkers(blocking=True):
        # reap children; non-blocking mode is used from the SIGCHLD handler
        for pid in pids.copy():
            p = st = 0
            while True:
                try:
                    p, st = os.waitpid(pid, (0 if blocking else os.WNOHANG))
                    break
                except OSError as e:
                    if e.errno == errno.EINTR:
                        continue
                    elif e.errno == errno.ECHILD:
                        # child would already be reaped, but pids yet been
                        # updated (maybe interrupted just after waitpid)
                        pids.discard(pid)
                        break
                    else:
                        raise
            if not p:
                # skip subsequent steps, because child process should
                # be still running in this case
                continue
            pids.discard(p)
            st = _exitstatus(st)
            if st and not problem[0]:
                problem[0] = st
    def sigchldhandler(signum, frame):
        waitforworkers(blocking=False)
        if problem[0]:
            killworkers()
    oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler)
    ui.flush()
    parentpid = os.getpid()
    pipes = []
    retvals = []
    for i, pargs in enumerate(partition(args, workers)):
        # Every worker gets its own pipe to send results on, so we don't have to
        # implement atomic writes larger than PIPE_BUF. Each forked process has
        # its own pipe's descriptors in the local variables, and the parent
        # process has the full list of pipe descriptors (and it doesn't really
        # care what order they're in).
        rfd, wfd = os.pipe()
        pipes.append((rfd, wfd))
        retvals.append(None)
        # make sure we use os._exit in all worker code paths. otherwise the
        # worker may do some clean-ups which could cause surprises like
        # deadlock. see sshpeer.cleanup for example.
        # override error handling *before* fork. this is necessary because
        # exception (signal) may arrive after fork, before "pid =" assignment
        # completes, and other exception handler (dispatch.py) can lead to
        # unexpected code path without os._exit.
        ret = -1
        try:
            pid = os.fork()
            if pid == 0:
                signal.signal(signal.SIGINT, oldhandler)
                signal.signal(signal.SIGCHLD, oldchldhandler)

                def workerfunc():
                    # close the other children's pipe ends; keep only wfd
                    for r, w in pipes[:-1]:
                        os.close(r)
                        os.close(w)
                    os.close(rfd)
                    # tag each result with this child's chunk index so the
                    # parent can reassemble return values in natural order
                    for result in func(*(staticargs + (pargs,))):
                        os.write(wfd, util.pickle.dumps((i, result)))
                    return 0

                ret = scmutil.callcatch(ui, workerfunc)
        except: # parent re-raises, child never returns
            if os.getpid() == parentpid:
                raise
            exctype = sys.exc_info()[0]
            force = not issubclass(exctype, KeyboardInterrupt)
            ui.traceback(force=force)
        finally:
            if os.getpid() != parentpid:
                try:
                    ui.flush()
                except: # never returns, no re-raises
                    pass
                finally:
                    os._exit(ret & 255)
        pids.add(pid)
    selector = selectors.DefaultSelector()
    for rfd, wfd in pipes:
        os.close(wfd)
        selector.register(os.fdopen(rfd, r'rb', 0), selectors.EVENT_READ)
    def cleanup():
        signal.signal(signal.SIGINT, oldhandler)
        waitforworkers()
        signal.signal(signal.SIGCHLD, oldchldhandler)
        selector.close()
        return problem[0]
    try:
        openpipes = len(pipes)
        while openpipes > 0:
            for key, events in selector.select():
                try:
                    i, res = util.pickle.load(key.fileobj)
                    if hasretval and res[0]:
                        # (True, value): the chunk's return value, held back
                        # until all workers finish
                        retvals[i] = res[1]
                    else:
                        yield res
                except EOFError:
                    # child closed its pipe end (exited)
                    selector.unregister(key.fileobj)
                    key.fileobj.close()
                    openpipes -= 1
                except IOError as e:
                    if e.errno == errno.EINTR:
                        continue
                    raise
    except: # re-raises
        killworkers()
        cleanup()
        raise
    status = cleanup()
    if status:
        if status < 0:
            # a child died of a signal: replay it on ourselves
            os.kill(os.getpid(), -status)
        sys.exit(status)
    if hasretval:
        yield True, sum(retvals, [])
240
253
241 def _posixexitstatus(code):
254 def _posixexitstatus(code):
242 '''convert a posix exit status into the same form returned by
255 '''convert a posix exit status into the same form returned by
243 os.spawnv
256 os.spawnv
244
257
245 returns None if the process was stopped instead of exiting'''
258 returns None if the process was stopped instead of exiting'''
246 if os.WIFEXITED(code):
259 if os.WIFEXITED(code):
247 return os.WEXITSTATUS(code)
260 return os.WEXITSTATUS(code)
248 elif os.WIFSIGNALED(code):
261 elif os.WIFSIGNALED(code):
249 return -os.WTERMSIG(code)
262 return -os.WTERMSIG(code)
250
263
def _windowsworker(ui, func, staticargs, args, hasretval):
    '''Thread-based implementation of worker() used on Windows.

    Feeds (chunk-index, chunk) tasks to a pool of daemon threads through
    a queue; each thread runs func on its chunk and puts (chunk-index,
    result) pairs on a result queue, which this generator drains. When
    hasretval is true, each chunk's (True, value) item is collected and
    a single final (True, combined-list) is yielded, in chunk order.
    '''
    class Worker(threading.Thread):
        def __init__(self, taskqueue, resultqueue, func, staticargs, *args,
                     **kwargs):
            threading.Thread.__init__(self, *args, **kwargs)
            self._taskqueue = taskqueue
            self._resultqueue = resultqueue
            self._func = func
            self._staticargs = staticargs
            # set by interrupt(); checked cooperatively in run()
            self._interrupted = False
            self.daemon = True
            self.exception = None

        def interrupt(self):
            self._interrupted = True

        def run(self):
            try:
                while not self._taskqueue.empty():
                    try:
                        i, args = self._taskqueue.get_nowait()
                        for res in self._func(*self._staticargs + (args,)):
                            self._resultqueue.put((i, res))
                            # threading doesn't provide a native way to
                            # interrupt execution. handle it manually at every
                            # iteration.
                            if self._interrupted:
                                return
                    except pycompat.queue.Empty:
                        break
            except Exception as e:
                # store the exception such that the main thread can resurface
                # it as if the func was running without workers.
                self.exception = e
                raise

    threads = []
    def trykillworkers():
        # Allow up to 1 second to clean worker threads nicely
        cleanupend = time.time() + 1
        for t in threads:
            t.interrupt()
        for t in threads:
            remainingtime = cleanupend - time.time()
            t.join(remainingtime)
            if t.is_alive():
                # pass over the workers joining failure. it is more
                # important to surface the initial exception than the
                # fact that one of workers may be processing a large
                # task and does not get to handle the interruption.
                ui.warn(_("failed to kill worker threads while "
                          "handling an exception\n"))
                return

    workers = _numworkers(ui)
    resultqueue = pycompat.queue.Queue()
    taskqueue = pycompat.queue.Queue()
    retvals = []
    # partition work to more pieces than workers to minimize the chance
    # of uneven distribution of large tasks between the workers
    for pargs in enumerate(partition(args, workers * 20)):
        # pargs is a (chunk-index, chunk) pair; Worker.run unpacks it
        retvals.append(None)
        taskqueue.put(pargs)
    for _i in range(workers):
        t = Worker(taskqueue, resultqueue, func, staticargs)
        threads.append(t)
        t.start()
    try:
        while len(threads) > 0:
            while not resultqueue.empty():
                (i, res) = resultqueue.get()
                if hasretval and res[0]:
                    # (True, value): the chunk's return value, held back
                    # until all threads finish
                    retvals[i] = res[1]
                else:
                    yield res
            threads[0].join(0.05)
            finishedthreads = [_t for _t in threads if not _t.is_alive()]
            for t in finishedthreads:
                if t.exception is not None:
                    raise t.exception
                threads.remove(t)
    except (Exception, KeyboardInterrupt): # re-raises
        trykillworkers()
        raise
    # drain any results produced after the last in-loop check
    while not resultqueue.empty():
        (i, res) = resultqueue.get()
        if hasretval and res[0]:
            retvals[i] = res[1]
        else:
            yield res
    if hasretval:
        yield True, sum(retvals, [])
331
356
# Pick the platform implementation once at import time. The POSIX worker
# additionally needs _exitstatus to decode os.waitpid() status words.
if pycompat.iswindows:
    _platformworker = _windowsworker
else:
    _platformworker = _posixworker
    _exitstatus = _posixexitstatus
337
362
def partition(lst, nslices):
    '''partition a list into N slices of roughly equal size

    The current strategy takes every Nth element from the input. If
    we ever write workers that need to preserve grouping in input
    we should consider allowing callers to specify a partition strategy.

    mpm is not a fan of this partitioning strategy when files are involved.
    In his words:

        Single-threaded Mercurial makes a point of creating and visiting
        files in a fixed order (alphabetical). When creating files in order,
        a typical filesystem is likely to allocate them on nearby regions on
        disk. Thus, when revisiting in the same order, locality is maximized
        and various forms of OS and disk-level caching and read-ahead get a
        chance to work.

        This effect can be quite significant on spinning disks. I discovered it
        circa Mercurial v0.4 when revlogs were named by hashes of filenames.
        Tarring a repo and copying it to another disk effectively randomized
        the revlog ordering on disk by sorting the revlogs by hash and suddenly
        performance of my kernel checkout benchmark dropped by ~10x because the
        "working set" of sectors visited no longer fit in the drive's cache and
        the workload switched from streaming to random I/O.

        What we should really be doing is have workers read filenames from a
        ordered queue. This preserves locality and also keeps any worker from
        getting more than one file out of balance.
    '''
    # slice k gets elements k, k+N, k+2N, ... of the input
    offset = 0
    while offset < nslices:
        yield lst[offset::nslices]
        offset += 1
General Comments 0
You need to be logged in to leave comments. Login now