Show More
@@ -46,7 +46,7 b' We support a subset of mongodb operators:' | |||||
46 | # the file COPYING, distributed as part of this software. |
|
46 | # the file COPYING, distributed as part of this software. | |
47 | #----------------------------------------------------------------------------- |
|
47 | #----------------------------------------------------------------------------- | |
48 |
|
48 | |||
49 |
|
49 | from copy import copy | ||
50 | from datetime import datetime |
|
50 | from datetime import datetime | |
51 |
|
51 | |||
52 | from IPython.config.configurable import LoggingConfigurable |
|
52 | from IPython.config.configurable import LoggingConfigurable | |
@@ -120,7 +120,7 b' class DictDB(BaseDB):' | |||||
120 |
|
120 | |||
121 | for rec in self._records.itervalues(): |
|
121 | for rec in self._records.itervalues(): | |
122 | if self._match_one(rec, tests): |
|
122 | if self._match_one(rec, tests): | |
123 | matches.append(rec) |
|
123 | matches.append(copy(rec)) | |
124 | return matches |
|
124 | return matches | |
125 |
|
125 | |||
126 | def _extract_subdict(self, rec, keys): |
|
126 | def _extract_subdict(self, rec, keys): | |
@@ -139,9 +139,9 b' class DictDB(BaseDB):' | |||||
139 |
|
139 | |||
140 | def get_record(self, msg_id): |
|
140 | def get_record(self, msg_id): | |
141 | """Get a specific Task Record, by msg_id.""" |
|
141 | """Get a specific Task Record, by msg_id.""" | |
142 |
if not self._records |
|
142 | if not msg_id in self._records: | |
143 | raise KeyError("No such msg_id %r"%(msg_id)) |
|
143 | raise KeyError("No such msg_id %r"%(msg_id)) | |
144 | return self._records[msg_id] |
|
144 | return copy(self._records[msg_id]) | |
145 |
|
145 | |||
146 | def update_record(self, msg_id, rec): |
|
146 | def update_record(self, msg_id, rec): | |
147 | """Update the data in an existing record.""" |
|
147 | """Update the data in an existing record.""" |
@@ -25,7 +25,7 b' from zmq.eventloop import ioloop, zmqstream' | |||||
25 | from IPython.config.configurable import LoggingConfigurable |
|
25 | from IPython.config.configurable import LoggingConfigurable | |
26 | from IPython.utils.traitlets import Set, Instance, CFloat, Integer |
|
26 | from IPython.utils.traitlets import Set, Instance, CFloat, Integer | |
27 |
|
27 | |||
28 | from IPython.parallel.util import asbytes |
|
28 | from IPython.parallel.util import asbytes, log_errors | |
29 |
|
29 | |||
30 | class Heart(object): |
|
30 | class Heart(object): | |
31 | """A basic heart object for responding to a HeartMonitor. |
|
31 | """A basic heart object for responding to a HeartMonitor. | |
@@ -148,6 +148,7 b' class HeartMonitor(LoggingConfigurable):' | |||||
148 | self.hearts.remove(heart) |
|
148 | self.hearts.remove(heart) | |
149 |
|
149 | |||
150 |
|
150 | |||
|
151 | @log_errors | |||
151 | def handle_pong(self, msg): |
|
152 | def handle_pong(self, msg): | |
152 | "a heart just beat" |
|
153 | "a heart just beat" | |
153 | current = asbytes(str(self.lifetime)) |
|
154 | current = asbytes(str(self.lifetime)) |
@@ -454,6 +454,7 b' class Hub(SessionFactory):' | |||||
454 | #----------------------------------------------------------------------------- |
|
454 | #----------------------------------------------------------------------------- | |
455 |
|
455 | |||
456 |
|
456 | |||
|
457 | @util.log_errors | |||
457 | def dispatch_monitor_traffic(self, msg): |
|
458 | def dispatch_monitor_traffic(self, msg): | |
458 | """all ME and Task queue messages come through here, as well as |
|
459 | """all ME and Task queue messages come through here, as well as | |
459 | IOPub traffic.""" |
|
460 | IOPub traffic.""" | |
@@ -473,6 +474,7 b' class Hub(SessionFactory):' | |||||
473 | self.log.error("Invalid monitor topic: %r", switch) |
|
474 | self.log.error("Invalid monitor topic: %r", switch) | |
474 |
|
475 | |||
475 |
|
476 | |||
|
477 | @util.log_errors | |||
476 | def dispatch_query(self, msg): |
|
478 | def dispatch_query(self, msg): | |
477 | """Route registration requests and queries from clients.""" |
|
479 | """Route registration requests and queries from clients.""" | |
478 | try: |
|
480 | try: |
@@ -43,7 +43,7 b' from IPython.config.application import Application' | |||||
43 | from IPython.config.loader import Config |
|
43 | from IPython.config.loader import Config | |
44 | from IPython.utils.traitlets import Instance, Dict, List, Set, Integer, Enum, CBytes |
|
44 | from IPython.utils.traitlets import Instance, Dict, List, Set, Integer, Enum, CBytes | |
45 |
|
45 | |||
46 | from IPython.parallel import error |
|
46 | from IPython.parallel import error, util | |
47 | from IPython.parallel.factory import SessionFactory |
|
47 | from IPython.parallel.factory import SessionFactory | |
48 | from IPython.parallel.util import connect_logger, local_logger, asbytes |
|
48 | from IPython.parallel.util import connect_logger, local_logger, asbytes | |
49 |
|
49 | |||
@@ -240,6 +240,8 b' class TaskScheduler(SessionFactory):' | |||||
240 | # [Un]Registration Handling |
|
240 | # [Un]Registration Handling | |
241 | #----------------------------------------------------------------------- |
|
241 | #----------------------------------------------------------------------- | |
242 |
|
242 | |||
|
243 | ||||
|
244 | @util.log_errors | |||
243 | def dispatch_notification(self, msg): |
|
245 | def dispatch_notification(self, msg): | |
244 | """dispatch register/unregister events.""" |
|
246 | """dispatch register/unregister events.""" | |
245 | try: |
|
247 | try: | |
@@ -343,6 +345,9 b' class TaskScheduler(SessionFactory):' | |||||
343 | #----------------------------------------------------------------------- |
|
345 | #----------------------------------------------------------------------- | |
344 | # Job Submission |
|
346 | # Job Submission | |
345 | #----------------------------------------------------------------------- |
|
347 | #----------------------------------------------------------------------- | |
|
348 | ||||
|
349 | ||||
|
350 | @util.log_errors | |||
346 | def dispatch_submission(self, raw_msg): |
|
351 | def dispatch_submission(self, raw_msg): | |
347 | """Dispatch job submission to appropriate handlers.""" |
|
352 | """Dispatch job submission to appropriate handlers.""" | |
348 | # ensure targets up to date: |
|
353 | # ensure targets up to date: | |
@@ -560,6 +565,9 b' class TaskScheduler(SessionFactory):' | |||||
560 | #----------------------------------------------------------------------- |
|
565 | #----------------------------------------------------------------------- | |
561 | # Result Handling |
|
566 | # Result Handling | |
562 | #----------------------------------------------------------------------- |
|
567 | #----------------------------------------------------------------------- | |
|
568 | ||||
|
569 | ||||
|
570 | @util.log_errors | |||
563 | def dispatch_result(self, raw_msg): |
|
571 | def dispatch_result(self, raw_msg): | |
564 | """dispatch method for result replies""" |
|
572 | """dispatch method for result replies""" | |
565 | try: |
|
573 | try: |
@@ -238,6 +238,18 b' class TestClient(ClusterTestCase):' | |||||
238 | for rec in found: |
|
238 | for rec in found: | |
239 | self.assertTrue('msg_id' in rec.keys()) |
|
239 | self.assertTrue('msg_id' in rec.keys()) | |
240 |
|
240 | |||
|
241 | def test_db_query_get_result(self): | |||
|
242 | """pop in db_query shouldn't pop from result itself""" | |||
|
243 | self.client[:].apply_sync(lambda : 1) | |||
|
244 | found = self.client.db_query({'msg_id': {'$ne' : ''}}) | |||
|
245 | rc2 = clientmod.Client(profile='iptest') | |||
|
246 | # If this bug is not fixed, this call will hang: | |||
|
247 | ar = rc2.get_result(self.client.history[-1]) | |||
|
248 | ar.wait(2) | |||
|
249 | self.assertTrue(ar.ready()) | |||
|
250 | ar.get() | |||
|
251 | rc2.close() | |||
|
252 | ||||
241 | def test_db_query_in(self): |
|
253 | def test_db_query_in(self): | |
242 | """test db query with '$in','$nin' operators""" |
|
254 | """test db query with '$in','$nin' operators""" | |
243 | hist = self.client.hub_history() |
|
255 | hist = self.client.hub_history() |
@@ -182,6 +182,36 b' class TestDictBackend(TestCase):' | |||||
182 | query = {'msg_id' : {'$ne' : None}} |
|
182 | query = {'msg_id' : {'$ne' : None}} | |
183 | recs = self.db.find_records(query) |
|
183 | recs = self.db.find_records(query) | |
184 | self.assertTrue(len(recs) >= 10) |
|
184 | self.assertTrue(len(recs) >= 10) | |
|
185 | ||||
|
186 | def test_pop_safe_get(self): | |||
|
187 | """editing query results shouldn't affect record [get]""" | |||
|
188 | msg_id = self.db.get_history()[-1] | |||
|
189 | rec = self.db.get_record(msg_id) | |||
|
190 | rec.pop('buffers') | |||
|
191 | rec['garbage'] = 'hello' | |||
|
192 | rec2 = self.db.get_record(msg_id) | |||
|
193 | self.assertTrue('buffers' in rec2) | |||
|
194 | self.assertFalse('garbage' in rec2) | |||
|
195 | ||||
|
196 | def test_pop_safe_find(self): | |||
|
197 | """editing query results shouldn't affect record [find]""" | |||
|
198 | msg_id = self.db.get_history()[-1] | |||
|
199 | rec = self.db.find_records({'msg_id' : msg_id})[0] | |||
|
200 | rec.pop('buffers') | |||
|
201 | rec['garbage'] = 'hello' | |||
|
202 | rec2 = self.db.find_records({'msg_id' : msg_id})[0] | |||
|
203 | self.assertTrue('buffers' in rec2) | |||
|
204 | self.assertFalse('garbage' in rec2) | |||
|
205 | ||||
|
206 | def test_pop_safe_find_keys(self): | |||
|
207 | """editing query results shouldn't affect record [find+keys]""" | |||
|
208 | msg_id = self.db.get_history()[-1] | |||
|
209 | rec = self.db.find_records({'msg_id' : msg_id}, keys=['buffers'])[0] | |||
|
210 | rec.pop('buffers') | |||
|
211 | rec['garbage'] = 'hello' | |||
|
212 | rec2 = self.db.find_records({'msg_id' : msg_id})[0] | |||
|
213 | self.assertTrue('buffers' in rec2) | |||
|
214 | self.assertFalse('garbage' in rec2) | |||
185 |
|
215 | |||
186 |
|
216 | |||
187 | class TestSQLiteBackend(TestDictBackend): |
|
217 | class TestSQLiteBackend(TestDictBackend): |
@@ -39,6 +39,8 b' except:' | |||||
39 | import zmq |
|
39 | import zmq | |
40 | from zmq.log import handlers |
|
40 | from zmq.log import handlers | |
41 |
|
41 | |||
|
42 | from IPython.external.decorator import decorator | |||
|
43 | ||||
42 | # IPython imports |
|
44 | # IPython imports | |
43 | from IPython.config.application import Application |
|
45 | from IPython.config.application import Application | |
44 | from IPython.utils import py3compat |
|
46 | from IPython.utils import py3compat | |
@@ -106,6 +108,19 b' class ReverseDict(dict):' | |||||
106 | # Functions |
|
108 | # Functions | |
107 | #----------------------------------------------------------------------------- |
|
109 | #----------------------------------------------------------------------------- | |
108 |
|
110 | |||
|
111 | @decorator | |||
|
112 | def log_errors(f, self, *args, **kwargs): | |||
|
113 | """decorator to log unhandled exceptions raised in a method. | |||
|
114 | ||||
|
115 | For use wrapping on_recv callbacks, so that exceptions | |||
|
116 | do not cause the stream to be closed. | |||
|
117 | """ | |||
|
118 | try: | |||
|
119 | return f(self, *args, **kwargs) | |||
|
120 | except Exception: | |||
|
121 | self.log.error("Uncaught exception in %r" % f, exc_info=True) | |||
|
122 | ||||
|
123 | ||||
109 | def asbytes(s): |
|
124 | def asbytes(s): | |
110 | """ensure that an object is ascii bytes""" |
|
125 | """ensure that an object is ascii bytes""" | |
111 | if isinstance(s, unicode): |
|
126 | if isinstance(s, unicode): |
General Comments 0
You need to be logged in to leave comments.
Login now