Merge pull request #1556 from minrk/safe_query...
Fernando Perez
r6448:816e3fab merge
@@ -46,7 +46,7 @@ We support a subset of mongodb operators:
 # the file COPYING, distributed as part of this software.
 #-----------------------------------------------------------------------------
 
-
+from copy import copy
 from datetime import datetime
 
 from IPython.config.configurable import LoggingConfigurable
@@ -120,7 +120,7 @@ class DictDB(BaseDB):
 
         for rec in self._records.itervalues():
             if self._match_one(rec, tests):
-                matches.append(rec)
+                matches.append(copy(rec))
         return matches
 
     def _extract_subdict(self, rec, keys):
@@ -139,9 +139,9 @@ class DictDB(BaseDB):
 
     def get_record(self, msg_id):
         """Get a specific Task Record, by msg_id."""
-        if not self._records.has_key(msg_id):
+        if not msg_id in self._records:
             raise KeyError("No such msg_id %r"%(msg_id))
-        return self._records[msg_id]
+        return copy(self._records[msg_id])
 
     def update_record(self, msg_id, rec):
         """Update the data in an existing record."""
@@ -25,7 +25,7 @@ from zmq.eventloop import ioloop, zmqstream
 from IPython.config.configurable import LoggingConfigurable
 from IPython.utils.traitlets import Set, Instance, CFloat, Integer
 
-from IPython.parallel.util import asbytes
+from IPython.parallel.util import asbytes, log_errors
 
 class Heart(object):
     """A basic heart object for responding to a HeartMonitor.
@@ -148,6 +148,7 @@ class HeartMonitor(LoggingConfigurable):
         self.hearts.remove(heart)
 
 
+    @log_errors
     def handle_pong(self, msg):
         "a heart just beat"
         current = asbytes(str(self.lifetime))
@@ -454,6 +454,7 @@ class Hub(SessionFactory):
     #-----------------------------------------------------------------------------
 
 
+    @util.log_errors
     def dispatch_monitor_traffic(self, msg):
         """all ME and Task queue messages come through here, as well as
         IOPub traffic."""
@@ -473,6 +474,7 @@ class Hub(SessionFactory):
             self.log.error("Invalid monitor topic: %r", switch)
 
 
+    @util.log_errors
     def dispatch_query(self, msg):
         """Route registration requests and queries from clients."""
         try:
@@ -43,7 +43,7 @@ from IPython.config.application import Application
 from IPython.config.loader import Config
 from IPython.utils.traitlets import Instance, Dict, List, Set, Integer, Enum, CBytes
 
-from IPython.parallel import error
+from IPython.parallel import error, util
 from IPython.parallel.factory import SessionFactory
 from IPython.parallel.util import connect_logger, local_logger, asbytes
 
@@ -240,6 +240,8 @@ class TaskScheduler(SessionFactory):
     # [Un]Registration Handling
     #-----------------------------------------------------------------------
 
+
+    @util.log_errors
     def dispatch_notification(self, msg):
         """dispatch register/unregister events."""
         try:
@@ -343,6 +345,9 @@ class TaskScheduler(SessionFactory):
     #-----------------------------------------------------------------------
     # Job Submission
     #-----------------------------------------------------------------------
+
+
+    @util.log_errors
     def dispatch_submission(self, raw_msg):
         """Dispatch job submission to appropriate handlers."""
         # ensure targets up to date:
@@ -560,6 +565,9 @@ class TaskScheduler(SessionFactory):
     #-----------------------------------------------------------------------
     # Result Handling
     #-----------------------------------------------------------------------
+
+
+    @util.log_errors
     def dispatch_result(self, raw_msg):
         """dispatch method for result replies"""
         try:
@@ -238,6 +238,18 @@ class TestClient(ClusterTestCase):
         for rec in found:
             self.assertTrue('msg_id' in rec.keys())
 
+    def test_db_query_get_result(self):
+        """pop in db_query shouldn't pop from result itself"""
+        self.client[:].apply_sync(lambda : 1)
+        found = self.client.db_query({'msg_id': {'$ne' : ''}})
+        rc2 = clientmod.Client(profile='iptest')
+        # If this bug is not fixed, this call will hang:
+        ar = rc2.get_result(self.client.history[-1])
+        ar.wait(2)
+        self.assertTrue(ar.ready())
+        ar.get()
+        rc2.close()
+
     def test_db_query_in(self):
         """test db query with '$in','$nin' operators"""
         hist = self.client.hub_history()
@@ -182,6 +182,36 @@ class TestDictBackend(TestCase):
         query = {'msg_id' : {'$ne' : None}}
         recs = self.db.find_records(query)
         self.assertTrue(len(recs) >= 10)
+
+    def test_pop_safe_get(self):
+        """editing query results shouldn't affect record [get]"""
+        msg_id = self.db.get_history()[-1]
+        rec = self.db.get_record(msg_id)
+        rec.pop('buffers')
+        rec['garbage'] = 'hello'
+        rec2 = self.db.get_record(msg_id)
+        self.assertTrue('buffers' in rec2)
+        self.assertFalse('garbage' in rec2)
+
+    def test_pop_safe_find(self):
+        """editing query results shouldn't affect record [find]"""
+        msg_id = self.db.get_history()[-1]
+        rec = self.db.find_records({'msg_id' : msg_id})[0]
+        rec.pop('buffers')
+        rec['garbage'] = 'hello'
+        rec2 = self.db.find_records({'msg_id' : msg_id})[0]
+        self.assertTrue('buffers' in rec2)
+        self.assertFalse('garbage' in rec2)
+
+    def test_pop_safe_find_keys(self):
+        """editing query results shouldn't affect record [find+keys]"""
+        msg_id = self.db.get_history()[-1]
+        rec = self.db.find_records({'msg_id' : msg_id}, keys=['buffers'])[0]
+        rec.pop('buffers')
+        rec['garbage'] = 'hello'
+        rec2 = self.db.find_records({'msg_id' : msg_id})[0]
+        self.assertTrue('buffers' in rec2)
+        self.assertFalse('garbage' in rec2)
 
 
 class TestSQLiteBackend(TestDictBackend):
@@ -39,6 +39,8 @@ except:
 import zmq
 from zmq.log import handlers
 
+from IPython.external.decorator import decorator
+
 # IPython imports
 from IPython.config.application import Application
 from IPython.utils import py3compat
@@ -106,6 +108,19 @@ class ReverseDict(dict):
 # Functions
 #-----------------------------------------------------------------------------
 
+@decorator
+def log_errors(f, self, *args, **kwargs):
+    """decorator to log unhandled exceptions raised in a method.
+
+    For use wrapping on_recv callbacks, so that exceptions
+    do not cause the stream to be closed.
+    """
+    try:
+        return f(self, *args, **kwargs)
+    except Exception:
+        self.log.error("Uncaught exception in %r" % f, exc_info=True)
+
+
 def asbytes(s):
     """ensure that an object is ascii bytes"""
     if isinstance(s, unicode):
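The log_errors decorator defined here is what the @log_errors / @util.log_errors lines in the heartmonitor, hub, and scheduler hunks above refer to. Per its docstring, it is meant for wrapping on_recv callbacks so that an unhandled exception is logged against the offending message instead of escaping and closing the stream. The sketch below is a rough, dependency-free approximation of the same idea: it uses functools.wraps in place of IPython.external.decorator, and the Dispatcher class is a made-up stand-in for the real consumers (Hub, TaskScheduler, HeartMonitor), which all carry a .log attribute.

    import functools
    import logging

    def log_errors(f):
        """Rough equivalent of the decorator above: log the exception, don't raise."""
        @functools.wraps(f)
        def wrapper(self, *args, **kwargs):
            try:
                return f(self, *args, **kwargs)
            except Exception:
                self.log.error("Uncaught exception in %r" % f, exc_info=True)
        return wrapper

    class Dispatcher(object):
        """Hypothetical stand-in for Hub / TaskScheduler / HeartMonitor."""
        def __init__(self):
            self.log = logging.getLogger("dispatcher")

        @log_errors
        def dispatch(self, msg):
            # simulate a malformed message blowing up mid-handler
            raise ValueError("malformed message: %r" % (msg,))

    if __name__ == "__main__":
        logging.basicConfig()
        Dispatcher().dispatch({'bad': 'msg'})  # traceback is logged, nothing is raised

The real implementation uses IPython's bundled decorator module, which also preserves the wrapped function's signature; functools.wraps in this sketch only copies metadata such as the name and docstring.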