##// END OF EJS Templates
speed up AsyncResult._wait_for_outputs (don't waste 0.01 seconds)...
Anton Akhmerov -
Show More
@@ -1,708 +1,708 b''
1 1 """AsyncResult objects for the client
2 2
3 3 Authors:
4 4
5 5 * MinRK
6 6 """
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2010-2011 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 from __future__ import print_function
19 19
20 20 import sys
21 21 import time
22 22 from datetime import datetime
23 23
24 24 from zmq import MessageTracker
25 25
26 26 from IPython.core.display import clear_output, display, display_pretty
27 27 from IPython.external.decorator import decorator
28 28 from IPython.parallel import error
29 29
30 30 #-----------------------------------------------------------------------------
31 31 # Functions
32 32 #-----------------------------------------------------------------------------
33 33
34 34 def _total_seconds(td):
35 35 """timedelta.total_seconds was added in 2.7"""
36 36 try:
37 37 # Python >= 2.7
38 38 return td.total_seconds()
39 39 except AttributeError:
40 40 # Python 2.6
41 41 return 1e-6 * (td.microseconds + (td.seconds + td.days * 24 * 3600) * 10**6)
42 42
def _raw_text(s):
    """Display string `s` as raw (unformatted) text via IPython's pretty display."""
    display_pretty(s, raw=True)
45 45
46 46 #-----------------------------------------------------------------------------
47 47 # Classes
48 48 #-----------------------------------------------------------------------------
49 49
# global empty tracker that's always done:
# used as the default for AsyncResults whose send needs no tracking
finished_tracker = MessageTracker()
52 52
@decorator
def check_ready(f, self, *args, **kwargs):
    """Call spin() to sync state prior to calling the method."""
    # non-blocking wait pulls in any already-finished results
    self.wait(0)
    if not self._ready:
        raise error.TimeoutError("result not ready")
    return f(self, *args, **kwargs)
60 60
61 61 class AsyncResult(object):
62 62 """Class for representing results of non-blocking calls.
63 63
64 64 Provides the same interface as :py:class:`multiprocessing.pool.AsyncResult`.
65 65 """
66 66
67 67 msg_ids = None
68 68 _targets = None
69 69 _tracker = None
70 70 _single_result = False
71 71
    def __init__(self, client, msg_ids, fname='unknown', targets=None, tracker=None):
        """Create an AsyncResult tracking `msg_ids` submitted via `client`.

        Parameters
        ----------
        client : Client
            The parallel Client through which the request was submitted.
        msg_ids : str or list of str
            Message id(s) of the request(s) this object tracks.
        fname : str
            Name of the requested function, used in repr and error messages.
        tracker : MessageTracker or None
            zmq send tracker; None means the send is already complete.
        """
        if isinstance(msg_ids, basestring):
            # always a list
            msg_ids = [msg_ids]
        if tracker is None:
            # default to always done
            tracker = finished_tracker
        self._client = client
        self.msg_ids = msg_ids
        self._fname=fname
        self._targets = targets
        self._tracker = tracker
        self._ready = False
        self._outputs_ready = False
        self._success = None
        self._metadata = [ self._client.metadata.get(id) for id in self.msg_ids ]
        # one msg_id with a scalar target means get() returns a single value,
        # not a one-element list
        if len(msg_ids) == 1:
            self._single_result = not isinstance(targets, (list, tuple))
        else:
            self._single_result = False
92 92
93 93 def __repr__(self):
94 94 if self._ready:
95 95 return "<%s: finished>"%(self.__class__.__name__)
96 96 else:
97 97 return "<%s: %s>"%(self.__class__.__name__,self._fname)
98 98
99 99
100 100 def _reconstruct_result(self, res):
101 101 """Reconstruct our result from actual result list (always a list)
102 102
103 103 Override me in subclasses for turning a list of results
104 104 into the expected form.
105 105 """
106 106 if self._single_result:
107 107 return res[0]
108 108 else:
109 109 return res
110 110
111 111 def get(self, timeout=-1):
112 112 """Return the result when it arrives.
113 113
114 114 If `timeout` is not ``None`` and the result does not arrive within
115 115 `timeout` seconds then ``TimeoutError`` is raised. If the
116 116 remote call raised an exception then that exception will be reraised
117 117 by get() inside a `RemoteError`.
118 118 """
119 119 if not self.ready():
120 120 self.wait(timeout)
121 121
122 122 if self._ready:
123 123 if self._success:
124 124 return self._result
125 125 else:
126 126 raise self._exception
127 127 else:
128 128 raise error.TimeoutError("Result not ready.")
129 129
    def _check_ready(self):
        """Raise TimeoutError unless the result has arrived."""
        if not self.ready():
            raise error.TimeoutError("Result not ready.")
133 133
    def ready(self):
        """Return whether the call has completed."""
        if not self._ready:
            # non-blocking sync of result state
            self.wait(0)
        elif not self._outputs_ready:
            # result arrived, but iopub outputs may still be in flight
            self._wait_for_outputs(0)

        return self._ready
142 142
    def wait(self, timeout=-1):
        """Wait until the result is available or until `timeout` seconds pass.

        This method always returns None.

        On completion, unpacks the results, collecting remote exceptions,
        and records success/failure in self._success / self._exception.
        """
        if self._ready:
            # result already in; only outputs may still be pending
            self._wait_for_outputs(timeout)
            return
        self._ready = self._client.wait(self.msg_ids, timeout)
        if self._ready:
            try:
                results = map(self._client.results.get, self.msg_ids)
                self._result = results
                if self._single_result:
                    # re-raise a single remote exception directly
                    r = results[0]
                    if isinstance(r, Exception):
                        raise r
                else:
                    results = error.collect_exceptions(results, self._fname)
                    self._result = self._reconstruct_result(results)
            except Exception as e:
                self._exception = e
                self._success = False
            else:
                self._success = True
            finally:
                # whatever happened, give the output messages a chance to arrive,
                # but never block forever on them
                if timeout is None or timeout < 0:
                    # cutoff infinite wait at 10s
                    timeout = 10
                self._wait_for_outputs(timeout)
173 173
174 174
175 175 def successful(self):
176 176 """Return whether the call completed without raising an exception.
177 177
178 178 Will raise ``AssertionError`` if the result is not ready.
179 179 """
180 180 assert self.ready()
181 181 return self._success
182 182
183 183 #----------------------------------------------------------------
184 184 # Extra methods not in mp.pool.AsyncResult
185 185 #----------------------------------------------------------------
186 186
187 187 def get_dict(self, timeout=-1):
188 188 """Get the results as a dict, keyed by engine_id.
189 189
190 190 timeout behavior is described in `get()`.
191 191 """
192 192
193 193 results = self.get(timeout)
194 194 engine_ids = [ md['engine_id'] for md in self._metadata ]
195 195 bycount = sorted(engine_ids, key=lambda k: engine_ids.count(k))
196 196 maxcount = bycount.count(bycount[-1])
197 197 if maxcount > 1:
198 198 raise ValueError("Cannot build dict, %i jobs ran on engine #%i"%(
199 199 maxcount, bycount[-1]))
200 200
201 201 return dict(zip(engine_ids,results))
202 202
    @property
    def result(self):
        """result property wrapper for `get(timeout=-1)`."""
        return self.get()

    # abbreviated alias:
    r = result
210 210
211 211 @property
212 212 def metadata(self):
213 213 """property for accessing execution metadata."""
214 214 if self._single_result:
215 215 return self._metadata[0]
216 216 else:
217 217 return self._metadata
218 218
    @property
    def result_dict(self):
        """result property as a dict."""
        return self.get_dict()
223 223
    def __dict__(self):
        # NOTE(review): defining __dict__ as a method is unusual — it shadows
        # the class-level instance-dict descriptor; preserved as-is.
        return self.get_dict(0)
226 226
227 227 def abort(self):
228 228 """abort my tasks."""
229 229 assert not self.ready(), "Can't abort, I am already done!"
230 230 return self._client.abort(self.msg_ids, targets=self._targets, block=True)
231 231
    @property
    def sent(self):
        """check whether my messages have been sent."""
        return self._tracker.done
236 236
    def wait_for_send(self, timeout=-1):
        """wait for pyzmq send to complete.

        This is necessary when sending arrays that you intend to edit in-place.
        `timeout` is in seconds, and will raise TimeoutError if it is reached
        before the send completes.
        """
        return self._tracker.wait(timeout)
245 245
246 246 #-------------------------------------
247 247 # dict-access
248 248 #-------------------------------------
249 249
    def __getitem__(self, key):
        """getitem returns result value(s) if keyed by int/slice, or metadata if key is str.
        """
        if isinstance(key, int):
            # single result by position; requires completion
            self._check_ready()
            return error.collect_exceptions([self._result[key]], self._fname)[0]
        elif isinstance(key, slice):
            self._check_ready()
            return error.collect_exceptions(self._result[key], self._fname)
        elif isinstance(key, basestring):
            # metadata proxy *does not* require that results are done
            self.wait(0)
            values = [ md[key] for md in self._metadata ]
            if self._single_result:
                return values[0]
            else:
                return values
        else:
            raise TypeError("Invalid key type %r, must be 'int','slice', or 'str'"%type(key))
269 269
    def __getattr__(self, key):
        """getattr maps to getitem for convenient attr access to metadata."""
        try:
            return self.__getitem__(key)
        except (error.TimeoutError, KeyError):
            # present missing metadata as a normal attribute error
            raise AttributeError("%r object has no attribute %r"%(
                self.__class__.__name__, key))
277 277
    # asynchronous iterator:
    def __iter__(self):
        """Yield results as they arrive (submission order)."""
        if self._single_result:
            raise TypeError("AsyncResults with a single result are not iterable.")
        try:
            # fast path: everything already done
            rlist = self.get(0)
        except error.TimeoutError:
            # wait for each result individually
            for msg_id in self.msg_ids:
                ar = AsyncResult(self._client, msg_id, self._fname)
                yield ar.get()
        else:
            # already done
            for r in rlist:
                yield r
293 293
    def __len__(self):
        """Number of tasks (message ids) this result tracks."""
        return len(self.msg_ids)
296 296
297 297 #-------------------------------------
298 298 # Sugar methods and attributes
299 299 #-------------------------------------
300 300
    def timedelta(self, start, end, start_key=min, end_key=max):
        """compute the difference between two sets of timestamps

        The default behavior is to use the earliest of the first
        and the latest of the second list, but this can be changed
        by passing a different start_key or end_key.

        Parameters
        ----------

        start : one or more datetime objects (e.g. ar.submitted)
        end : one or more datetime objects (e.g. ar.received)
        start_key : callable
            Function to call on `start` to extract the relevant
            entry [default: min]
        end_key : callable
            Function to call on `end` to extract the relevant
            entry [default: max]

        Returns
        -------

        dt : float
            The time elapsed (in seconds) between the two selected timestamps.
        """
        if not isinstance(start, datetime):
            # handle single_result AsyncResults, where ar.stamp is single object,
            # not a list
            start = start_key(start)
        if not isinstance(end, datetime):
            # handle single_result AsyncResults, where ar.stamp is single object,
            # not a list
            end = end_key(end)
        return _total_seconds(end - start)
335 335
    @property
    def progress(self):
        """the number of tasks which have been completed at this point.

        Fractional progress would be given by 1.0 * ar.progress / len(ar)
        """
        self.wait(0)
        # finished = total minus those still outstanding on the client
        return len(self) - len(set(self.msg_ids).intersection(self._client.outstanding))
344 344
    @property
    def elapsed(self):
        """elapsed time since initial submission"""
        if self.ready():
            return self.wall_time

        # find the earliest 'submitted' stamp among our tasks
        now = submitted = datetime.now()
        for msg_id in self.msg_ids:
            if msg_id in self._client.metadata:
                stamp = self._client.metadata[msg_id]['submitted']
                if stamp and stamp < submitted:
                    submitted = stamp
        return _total_seconds(now-submitted)
358 358
359 359 @property
360 360 @check_ready
361 361 def serial_time(self):
362 362 """serial computation time of a parallel calculation
363 363
364 364 Computed as the sum of (completed-started) of each task
365 365 """
366 366 t = 0
367 367 for md in self._metadata:
368 368 t += _total_seconds(md['completed'] - md['started'])
369 369 return t
370 370
    @property
    @check_ready
    def wall_time(self):
        """actual computation time of a parallel calculation

        Computed as the time between the latest `received` stamp
        and the earliest `submitted`.

        Only reliable if Client was spinning/waiting when the task finished, because
        the `received` timestamp is created when a result is pulled off of the zmq queue,
        which happens as a result of `client.spin()`.

        For similar comparison of other timestamp pairs, check out AsyncResult.timedelta.

        """
        return self.timedelta(self.submitted, self.received)
387 387
    def wait_interactive(self, interval=1., timeout=-1):
        """interactive wait, printing progress at regular intervals

        `interval` is the polling period in seconds; `timeout` < 0 or None
        means wait indefinitely.
        """
        if timeout is None:
            timeout = -1
        N = len(self)
        tic = time.time()
        while not self.ready() and (timeout < 0 or time.time() - tic <= timeout):
            self.wait(interval)
            clear_output()
            print("%4i/%i tasks finished after %4i s" % (self.progress, N, self.elapsed), end="")
            sys.stdout.flush()
        print()
        print("done")
401 401
    def _republish_displaypub(self, content, eid):
        """republish individual displaypub content dicts

        `content` is a displaypub message dict; `eid` is the engine id,
        recorded in the republished metadata.
        """
        try:
            ip = get_ipython()
        except NameError:
            # displaypub is meaningless outside IPython
            return
        md = content['metadata'] or {}
        md['engine'] = eid
        ip.display_pub.publish(content['source'], content['data'], md)
412 412
413 413 def _display_stream(self, text, prefix='', file=None):
414 414 if not text:
415 415 # nothing to display
416 416 return
417 417 if file is None:
418 418 file = sys.stdout
419 419 end = '' if text.endswith('\n') else '\n'
420 420
421 421 multiline = text.count('\n') > int(text.endswith('\n'))
422 422 if prefix and multiline and not text.startswith('\n'):
423 423 prefix = prefix + '\n'
424 424 print("%s%s" % (prefix, text), file=file, end=end)
425 425
426 426
    def _display_single_result(self):
        """Display stdout/stderr/outputs/pyout for a single-result call."""
        self._display_stream(self.stdout)
        self._display_stream(self.stderr, file=sys.stderr)

        try:
            get_ipython()
        except NameError:
            # displaypub is meaningless outside IPython
            return

        for output in self.outputs:
            self._republish_displaypub(output, self.engine_id)

        if self.pyout is not None:
            display(self.get())
442 442
443 443 def _wait_for_outputs(self, timeout=-1):
444 444 """wait for the 'status=idle' message that indicates we have all outputs
445 445 """
446 446 if self._outputs_ready or not self._success:
447 447 # don't wait on errors
448 448 return
449 449
450 450 # cast None to -1 for infinite timeout
451 451 if timeout is None:
452 452 timeout = -1
453 453
454 454 tic = time.time()
455 while True:
455 456 self._client._flush_iopub(self._client._iopub_socket)
456 self._outputs_ready = all(md['outputs_ready'] for md in self._metadata)
457 while not self._outputs_ready:
458 time.sleep(0.01)
459 self._client._flush_iopub(self._client._iopub_socket)
460 self._outputs_ready = all(md['outputs_ready'] for md in self._metadata)
461 if timeout >= 0 and time.time() > tic + timeout:
457 self._outputs_ready = all(md['outputs_ready']
458 for md in self._metadata)
459 if self._outputs_ready or \
460 (timeout >= 0 and time.time() > tic + timeout):
462 461 break
462 time.sleep(0.01)
463 463
464 464 @check_ready
465 465 def display_outputs(self, groupby="type"):
466 466 """republish the outputs of the computation
467 467
468 468 Parameters
469 469 ----------
470 470
471 471 groupby : str [default: type]
472 472 if 'type':
473 473 Group outputs by type (show all stdout, then all stderr, etc.):
474 474
475 475 [stdout:1] foo
476 476 [stdout:2] foo
477 477 [stderr:1] bar
478 478 [stderr:2] bar
479 479 if 'engine':
480 480 Display outputs for each engine before moving on to the next:
481 481
482 482 [stdout:1] foo
483 483 [stderr:1] bar
484 484 [stdout:2] foo
485 485 [stderr:2] bar
486 486
487 487 if 'order':
488 488 Like 'type', but further collate individual displaypub
489 489 outputs. This is meant for cases of each command producing
490 490 several plots, and you would like to see all of the first
491 491 plots together, then all of the second plots, and so on.
492 492 """
493 493 if self._single_result:
494 494 self._display_single_result()
495 495 return
496 496
497 497 stdouts = self.stdout
498 498 stderrs = self.stderr
499 499 pyouts = self.pyout
500 500 output_lists = self.outputs
501 501 results = self.get()
502 502
503 503 targets = self.engine_id
504 504
505 505 if groupby == "engine":
506 506 for eid,stdout,stderr,outputs,r,pyout in zip(
507 507 targets, stdouts, stderrs, output_lists, results, pyouts
508 508 ):
509 509 self._display_stream(stdout, '[stdout:%i] ' % eid)
510 510 self._display_stream(stderr, '[stderr:%i] ' % eid, file=sys.stderr)
511 511
512 512 try:
513 513 get_ipython()
514 514 except NameError:
515 515 # displaypub is meaningless outside IPython
516 516 return
517 517
518 518 if outputs or pyout is not None:
519 519 _raw_text('[output:%i]' % eid)
520 520
521 521 for output in outputs:
522 522 self._republish_displaypub(output, eid)
523 523
524 524 if pyout is not None:
525 525 display(r)
526 526
527 527 elif groupby in ('type', 'order'):
528 528 # republish stdout:
529 529 for eid,stdout in zip(targets, stdouts):
530 530 self._display_stream(stdout, '[stdout:%i] ' % eid)
531 531
532 532 # republish stderr:
533 533 for eid,stderr in zip(targets, stderrs):
534 534 self._display_stream(stderr, '[stderr:%i] ' % eid, file=sys.stderr)
535 535
536 536 try:
537 537 get_ipython()
538 538 except NameError:
539 539 # displaypub is meaningless outside IPython
540 540 return
541 541
542 542 if groupby == 'order':
543 543 output_dict = dict((eid, outputs) for eid,outputs in zip(targets, output_lists))
544 544 N = max(len(outputs) for outputs in output_lists)
545 545 for i in range(N):
546 546 for eid in targets:
547 547 outputs = output_dict[eid]
548 548 if len(outputs) >= N:
549 549 _raw_text('[output:%i]' % eid)
550 550 self._republish_displaypub(outputs[i], eid)
551 551 else:
552 552 # republish displaypub output
553 553 for eid,outputs in zip(targets, output_lists):
554 554 if outputs:
555 555 _raw_text('[output:%i]' % eid)
556 556 for output in outputs:
557 557 self._republish_displaypub(output, eid)
558 558
559 559 # finally, add pyout:
560 560 for eid,r,pyout in zip(targets, results, pyouts):
561 561 if pyout is not None:
562 562 display(r)
563 563
564 564 else:
565 565 raise ValueError("groupby must be one of 'type', 'engine', 'collate', not %r" % groupby)
566 566
567 567
568 568
569 569
570 570 class AsyncMapResult(AsyncResult):
571 571 """Class for representing results of non-blocking gathers.
572 572
573 573 This will properly reconstruct the gather.
574 574
575 575 This class is iterable at any time, and will wait on results as they come.
576 576
577 577 If ordered=False, then the first results to arrive will come first, otherwise
578 578 results will be yielded in the order they were submitted.
579 579
580 580 """
581 581
    def __init__(self, client, msg_ids, mapObject, fname='', ordered=True):
        """Like AsyncResult, plus the map object used to rejoin partitions.

        `ordered` selects submission-order vs first-come iteration.
        """
        AsyncResult.__init__(self, client, msg_ids, fname=fname)
        self._mapObject = mapObject
        # a map always gathers a list, never a scalar
        self._single_result = False
        self.ordered = ordered
587 587
    def _reconstruct_result(self, res):
        """Perform the gather on the actual results."""
        return self._mapObject.joinPartitions(res)
591 591
592 592 # asynchronous iterator:
593 593 def __iter__(self):
594 594 it = self._ordered_iter if self.ordered else self._unordered_iter
595 595 for r in it():
596 596 yield r
597 597
    # asynchronous ordered iterator:
    def _ordered_iter(self):
        """iterator for results *as they arrive*, preserving submission order."""
        try:
            # fast path: everything already done
            rlist = self.get(0)
        except error.TimeoutError:
            # wait for each result individually
            for msg_id in self.msg_ids:
                ar = AsyncResult(self._client, msg_id, self._fname)
                rlist = ar.get()
                try:
                    for r in rlist:
                        yield r
                except TypeError:
                    # flattened, not a list
                    # this could get broken by flattened data that returns iterables
                    # but most calls to map do not expose the `flatten` argument
                    yield rlist
        else:
            # already done
            for r in rlist:
                yield r
620 620
    # asynchronous unordered iterator:
    def _unordered_iter(self):
        """iterator for results *as they arrive*, on FCFS basis, ignoring submission order."""
        try:
            # fast path: everything already done
            rlist = self.get(0)
        except error.TimeoutError:
            pending = set(self.msg_ids)
            while pending:
                try:
                    # short poll, so newly-finished jobs are noticed promptly
                    self._client.wait(pending, 1e-3)
                except error.TimeoutError:
                    # ignore timeout error, because that only means
                    # *some* jobs are outstanding
                    pass
                # update ready set with those no longer outstanding:
                ready = pending.difference(self._client.outstanding)
                # update pending to exclude those that are finished
                pending = pending.difference(ready)
                while ready:
                    msg_id = ready.pop()
                    ar = AsyncResult(self._client, msg_id, self._fname)
                    rlist = ar.get()
                    try:
                        for r in rlist:
                            yield r
                    except TypeError:
                        # flattened, not a list
                        # this could get broken by flattened data that returns iterables
                        # but most calls to map do not expose the `flatten` argument
                        yield rlist
        else:
            # already done
            for r in rlist:
                yield r
655 655
656 656
657 657 class AsyncHubResult(AsyncResult):
658 658 """Class to wrap pending results that must be requested from the Hub.
659 659
    Note that waiting/polling on these objects requires polling the Hub over the network,
661 661 so use `AsyncHubResult.wait()` sparingly.
662 662 """
663 663
    def _wait_for_outputs(self, timeout=-1):
        """no-op, because HubResults are never incomplete"""
        self._outputs_ready = True
667 667
    def wait(self, timeout=-1):
        """wait for result to complete.

        First waits locally for any still-outstanding messages, then polls
        the Hub for the rest (0.1s period, bounded by `timeout`).
        """
        start = time.time()
        if self._ready:
            return
        # split ids into those the local client is still waiting on...
        local_ids = filter(lambda msg_id: msg_id in self._client.outstanding, self.msg_ids)
        local_ready = self._client.wait(local_ids, timeout)
        if local_ready:
            # ...and those whose results must be fetched from the Hub
            remote_ids = filter(lambda msg_id: msg_id not in self._client.results, self.msg_ids)
            if not remote_ids:
                self._ready = True
            else:
                rdict = self._client.result_status(remote_ids, status_only=False)
                pending = rdict['pending']
                # poll the Hub until nothing is pending or we time out
                while pending and (timeout < 0 or time.time() < start+timeout):
                    rdict = self._client.result_status(remote_ids, status_only=False)
                    pending = rdict['pending']
                    if pending:
                        time.sleep(0.1)
                if not pending:
                    self._ready = True
        if self._ready:
            try:
                results = map(self._client.results.get, self.msg_ids)
                self._result = results
                if self._single_result:
                    # re-raise a single remote exception directly
                    r = results[0]
                    if isinstance(r, Exception):
                        raise r
                else:
                    results = error.collect_exceptions(results, self._fname)
                    self._result = self._reconstruct_result(results)
            except Exception as e:
                self._exception = e
                self._success = False
            else:
                self._success = True
            finally:
                # metadata arrives with the Hub results; refresh it either way
                self._metadata = map(self._client.metadata.get, self.msg_ids)
707 707
708 708 __all__ = ['AsyncResult', 'AsyncMapResult', 'AsyncHubResult']
General Comments 0
You need to be logged in to leave comments. Login now