Show More
@@ -30,6 +30,7 b' from zmq.devices import ProcessMonitoredQueue' | |||
|
30 | 30 | from zmq.log.handlers import PUBHandler |
|
31 | 31 | from zmq.utils import jsonapi as json |
|
32 | 32 | |
|
33 | from IPython.config.application import boolean_flag | |
|
33 | 34 | from IPython.core.newapplication import ProfileDir |
|
34 | 35 | |
|
35 | 36 | from IPython.parallel.apps.baseapp import ( |
@@ -98,7 +99,10 b' flags.update({' | |||
|
98 | 99 | 'reuse existing json connection files') |
|
99 | 100 | }) |
|
100 | 101 | |
|
101 | flags.update() | |
|
102 | flags.update(boolean_flag('secure', 'IPControllerApp.secure', | |
|
103 | "Use HMAC digests for authentication of messages.", | |
|
104 | "Don't authenticate messages." | |
|
105 | )) | |
|
102 | 106 | |
|
103 | 107 | class IPControllerApp(BaseParallelApplication): |
|
104 | 108 | |
@@ -109,18 +113,18 b' class IPControllerApp(BaseParallelApplication):' | |||
|
109 | 113 | |
|
110 | 114 | # change default to True |
|
111 | 115 | auto_create = Bool(True, config=True, |
|
112 | help="""Whether to create profile dir if it doesn't exist""") | |
|
116 | help="""Whether to create profile dir if it doesn't exist.""") | |
|
113 | 117 | |
|
114 | 118 | reuse_files = Bool(False, config=True, |
|
115 |
help='Whether to reuse existing json connection files |
|
|
119 | help='Whether to reuse existing json connection files.' | |
|
116 | 120 | ) |
|
117 | 121 | secure = Bool(True, config=True, |
|
118 |
help='Whether to use |
|
|
122 | help='Whether to use HMAC digests for extra message authentication.' | |
|
119 | 123 | ) |
|
120 | 124 | ssh_server = Unicode(u'', config=True, |
|
121 | 125 | help="""ssh url for clients to use when connecting to the Controller |
|
122 | 126 | processes. It should be of the form: [user@]server[:port]. The |
|
123 |
Controller |
|
|
127 | Controller's listening addresses must be accessible from the ssh server""", | |
|
124 | 128 | ) |
|
125 | 129 | location = Unicode(u'', config=True, |
|
126 | 130 | help="""The external IP or domain name of the Controller, used for disambiguating |
@@ -468,20 +468,23 b' class Hub(LoggingFactory):' | |||
|
468 | 468 | |
|
469 | 469 | def dispatch_query(self, msg): |
|
470 | 470 | """Route registration requests and queries from clients.""" |
|
471 | idents, msg = self.session.feed_identities(msg) | |
|
471 | try: | |
|
472 | idents, msg = self.session.feed_identities(msg) | |
|
473 | except ValueError: | |
|
474 | idents = [] | |
|
472 | 475 | if not idents: |
|
473 | 476 | self.log.error("Bad Query Message: %r"%msg) |
|
474 | 477 | return |
|
475 | 478 | client_id = idents[0] |
|
476 | 479 | try: |
|
477 | 480 | msg = self.session.unpack_message(msg, content=True) |
|
478 | except: | |
|
481 | except Exception: | |
|
479 | 482 | content = error.wrap_exception() |
|
480 | 483 | self.log.error("Bad Query Message: %r"%msg, exc_info=True) |
|
481 | 484 | self.session.send(self.query, "hub_error", ident=client_id, |
|
482 | 485 | content=content) |
|
483 | 486 | return |
|
484 | ||
|
487 | print( idents, msg) | |
|
485 | 488 | # print client_id, header, parent, content |
|
486 | 489 | #switch on message type: |
|
487 | 490 | msg_type = msg['msg_type'] |
@@ -1123,9 +1126,10 b' class Hub(LoggingFactory):' | |||
|
1123 | 1126 | return finish(error.wrap_exception()) |
|
1124 | 1127 | |
|
1125 | 1128 | # clear the existing records |
|
1129 | now = datetime.now() | |
|
1126 | 1130 | rec = empty_record() |
|
1127 | 1131 | map(rec.pop, ['msg_id', 'header', 'content', 'buffers', 'submitted']) |
|
1128 |
rec['resubmitted'] = |
|
|
1132 | rec['resubmitted'] = now | |
|
1129 | 1133 | rec['queue'] = 'task' |
|
1130 | 1134 | rec['client_uuid'] = client_id[0] |
|
1131 | 1135 | try: |
@@ -1137,8 +1141,11 b' class Hub(LoggingFactory):' | |||
|
1137 | 1141 | reply = error.wrap_exception() |
|
1138 | 1142 | else: |
|
1139 | 1143 | # send the messages |
|
1144 | now_s = now.strftime(util.ISO8601) | |
|
1140 | 1145 | for rec in records: |
|
1141 | 1146 | header = rec['header'] |
|
1147 | # include resubmitted in header to prevent digest collision | |
|
1148 | header['resubmitted'] = now_s | |
|
1142 | 1149 | msg = self.session.msg(header['msg_type']) |
|
1143 | 1150 | msg['content'] = rec['content'] |
|
1144 | 1151 | msg['header'] = header |
@@ -196,17 +196,27 b' class TaskScheduler(SessionFactory):' | |||
|
196 | 196 | |
|
197 | 197 | def dispatch_notification(self, msg): |
|
198 | 198 | """dispatch register/unregister events.""" |
|
199 | idents,msg = self.session.feed_identities(msg) | |
|
200 |
msg = self.session. |
|
|
199 | try: | |
|
200 | idents,msg = self.session.feed_identities(msg) | |
|
201 | except ValueError: | |
|
202 | self.log.warn("task::Invalid Message: %r"%msg) | |
|
203 | return | |
|
204 | try: | |
|
205 | msg = self.session.unpack_message(msg) | |
|
206 | except ValueError: | |
|
207 | self.log.warn("task::Unauthorized message from: %r"%idents) | |
|
208 | return | |
|
209 | ||
|
201 | 210 | msg_type = msg['msg_type'] |
|
211 | ||
|
202 | 212 | handler = self._notification_handlers.get(msg_type, None) |
|
203 | 213 | if handler is None: |
|
204 |
|
|
|
214 | self.log.error("Unhandled message type: %r"%msg_type) | |
|
205 | 215 | else: |
|
206 | 216 | try: |
|
207 | 217 | handler(str(msg['content']['queue'])) |
|
208 | 218 | except KeyError: |
|
209 |
self.log.error("task::Invalid notification msg: % |
|
|
219 | self.log.error("task::Invalid notification msg: %r"%msg) | |
|
210 | 220 | |
|
211 | 221 | @logged |
|
212 | 222 | def _register_engine(self, uid): |
@@ -262,8 +272,7 b' class TaskScheduler(SessionFactory):' | |||
|
262 | 272 | |
|
263 | 273 | raw_msg = lost[msg_id][0] |
|
264 | 274 | idents,msg = self.session.feed_identities(raw_msg, copy=False) |
|
265 |
|
|
|
266 | parent = msg['header'] | |
|
275 | parent = self.session.unpack(msg[1].bytes) | |
|
267 | 276 | idents = [engine, idents[0]] |
|
268 | 277 | |
|
269 | 278 | # build fake error reply |
@@ -296,7 +305,7 b' class TaskScheduler(SessionFactory):' | |||
|
296 | 305 | self.log.error("task::Invaid task msg: %r"%raw_msg, exc_info=True) |
|
297 | 306 | return |
|
298 | 307 | |
|
299 | ||
|
308 | ||
|
300 | 309 | # send to monitor |
|
301 | 310 | self.mon_stream.send_multipart(['intask']+raw_msg, copy=False) |
|
302 | 311 | |
@@ -377,8 +386,7 b' class TaskScheduler(SessionFactory):' | |||
|
377 | 386 | |
|
378 | 387 | # FIXME: unpacking a message I've already unpacked, but didn't save: |
|
379 | 388 | idents,msg = self.session.feed_identities(raw_msg, copy=False) |
|
380 |
|
|
|
381 | header = msg['header'] | |
|
389 | header = self.session.unpack(msg[1].bytes) | |
|
382 | 390 | |
|
383 | 391 | try: |
|
384 | 392 | raise why() |
@@ -8,7 +8,11 b'' | |||
|
8 | 8 | # the file COPYING, distributed as part of this software. |
|
9 | 9 | #----------------------------------------------------------------------------- |
|
10 | 10 | |
|
11 | #----------------------------------------------------------------------------- | |
|
12 | # Imports | |
|
13 | #----------------------------------------------------------------------------- | |
|
11 | 14 | |
|
15 | import hmac | |
|
12 | 16 | import os |
|
13 | 17 | import pprint |
|
14 | 18 | import uuid |
@@ -27,10 +31,13 b' from zmq.eventloop.zmqstream import ZMQStream' | |||
|
27 | 31 | |
|
28 | 32 | from IPython.config.configurable import Configurable |
|
29 | 33 | from IPython.utils.importstring import import_item |
|
30 | from IPython.utils.traitlets import CStr, Unicode, Bool, Any | |
|
34 | from IPython.utils.traitlets import CStr, Unicode, Bool, Any, Instance, Set | |
|
31 | 35 | |
|
32 | 36 | from .util import ISO8601 |
|
33 | 37 | |
|
38 | #----------------------------------------------------------------------------- | |
|
39 | # utility functions | |
|
40 | #----------------------------------------------------------------------------- | |
|
34 | 41 | |
|
35 | 42 | def squash_unicode(obj): |
|
36 | 43 | """coerce unicode back to bytestrings.""" |
@@ -52,6 +59,10 b' def _date_default(obj):' | |||
|
52 | 59 | else: |
|
53 | 60 | raise TypeError("%r is not JSON serializable"%obj) |
|
54 | 61 | |
|
62 | #----------------------------------------------------------------------------- | |
|
63 | # globals and defaults | |
|
64 | #----------------------------------------------------------------------------- | |
|
65 | ||
|
55 | 66 | _default_key = 'on_unknown' if jsonapi.jsonmod.__name__ == 'jsonlib' else 'default' |
|
56 | 67 | json_packer = lambda obj: jsonapi.dumps(obj, **{_default_key:_date_default}) |
|
57 | 68 | json_unpacker = lambda s: squash_unicode(jsonapi.loads(s)) |
@@ -65,6 +76,10 b' default_unpacker = json_unpacker' | |||
|
65 | 76 | |
|
66 | 77 | DELIM="<IDS|MSG>" |
|
67 | 78 | |
|
79 | #----------------------------------------------------------------------------- | |
|
80 | # Classes | |
|
81 | #----------------------------------------------------------------------------- | |
|
82 | ||
|
68 | 83 | class Message(object): |
|
69 | 84 | """A simple message object that maps dict keys to attributes. |
|
70 | 85 | |
@@ -152,11 +167,21 b' class StreamSession(Configurable):' | |||
|
152 | 167 | help="""The UUID identifying this session.""") |
|
153 | 168 | def _session_default(self): |
|
154 | 169 | return bytes(uuid.uuid4()) |
|
155 | username = Unicode(os.environ.get('USER','username'),config=True, | |
|
170 | username = Unicode(os.environ.get('USER','username'), config=True, | |
|
156 | 171 | help="""Username for the Session. Default is your system username.""") |
|
172 | ||
|
173 | # message signature related traits: | |
|
157 | 174 | key = CStr('', config=True, |
|
158 | 175 | help="""execution key, for extra authentication.""") |
|
159 | ||
|
176 | def _key_changed(self, name, old, new): | |
|
177 | if new: | |
|
178 | self.auth = hmac.HMAC(new) | |
|
179 | else: | |
|
180 | self.auth = None | |
|
181 | auth = Instance(hmac.HMAC) | |
|
182 | counters = Instance('collections.defaultdict', (int,)) | |
|
183 | digest_history = Set() | |
|
184 | ||
|
160 | 185 | keyfile = Unicode('', config=True, |
|
161 | 186 | help="""path to file containing execution key.""") |
|
162 | 187 | def _keyfile_changed(self, name, old, new): |
@@ -202,7 +227,15 b' class StreamSession(Configurable):' | |||
|
202 | 227 | return True |
|
203 | 228 | header = extract_header(msg_or_header) |
|
204 | 229 | return header.get('key', '') == self.key |
|
205 | ||
|
230 | ||
|
231 | def sign(self, msg): | |
|
232 | """Sign a message with HMAC digest. If no auth, return b''.""" | |
|
233 | if self.auth is None: | |
|
234 | return b'' | |
|
235 | h = self.auth.copy() | |
|
236 | for m in msg: | |
|
237 | h.update(m) | |
|
238 | return h.hexdigest() | |
|
206 | 239 | |
|
207 | 240 | def serialize(self, msg, ident=None): |
|
208 | 241 | content = msg.get('content', {}) |
@@ -218,7 +251,12 b' class StreamSession(Configurable):' | |||
|
218 | 251 | content = content.encode('utf8') |
|
219 | 252 | else: |
|
220 | 253 | raise TypeError("Content incorrect type: %s"%type(content)) |
|
221 | ||
|
254 | ||
|
255 | real_message = [self.pack(msg['header']), | |
|
256 | self.pack(msg['parent_header']), | |
|
257 | content | |
|
258 | ] | |
|
259 | ||
|
222 | 260 | to_send = [] |
|
223 | 261 | |
|
224 | 262 | if isinstance(ident, list): |
@@ -227,11 +265,11 b' class StreamSession(Configurable):' | |||
|
227 | 265 | elif ident is not None: |
|
228 | 266 | to_send.append(ident) |
|
229 | 267 | to_send.append(DELIM) |
|
230 | if self.key: | |
|
231 | to_send.append(self.key) | |
|
232 |
to_send.append(s |
|
|
233 | to_send.append(self.pack(msg['parent_header'])) | |
|
234 |
to_send. |
|
|
268 | ||
|
269 | signature = self.sign(real_message) | |
|
270 | to_send.append(signature) | |
|
271 | ||
|
272 | to_send.extend(real_message) | |
|
235 | 273 | |
|
236 | 274 | return to_send |
|
237 | 275 | |
@@ -323,9 +361,9 b' class StreamSession(Configurable):' | |||
|
323 | 361 | ident = [ident] |
|
324 | 362 | if ident is not None: |
|
325 | 363 | to_send.extend(ident) |
|
364 | ||
|
326 | 365 | to_send.append(DELIM) |
|
327 | if self.key: | |
|
328 | to_send.append(self.key) | |
|
366 | to_send.append(self.sign(msg)) | |
|
329 | 367 | to_send.extend(msg) |
|
330 | 368 | stream.send_multipart(msg, flags, copy=copy) |
|
331 | 369 | |
@@ -372,23 +410,19 b' class StreamSession(Configurable):' | |||
|
372 | 410 | msg will be a list of bytes or Messages, unchanged from input |
|
373 | 411 | msg should be unpackable via self.unpack_message at this point. |
|
374 | 412 | """ |
|
375 | ikey = int(self.key != '') | |
|
376 | minlen = 3 + ikey | |
|
377 | msg = list(msg) | |
|
378 |
|
|
|
379 | while len(msg) > minlen: | |
|
380 | if copy: | |
|
381 |
|
|
|
382 |
else |
|
|
383 |
|
|
|
384 |
if |
|
|
385 | msg.pop(0) | |
|
386 | break | |
|
387 | else: | |
|
388 | idents.append(s) | |
|
389 | msg.pop(0) | |
|
390 | ||
|
391 | return idents, msg | |
|
413 | if copy: | |
|
414 | idx = msg.index(DELIM) | |
|
415 | return msg[:idx], msg[idx+1:] | |
|
416 | else: | |
|
417 | failed = True | |
|
418 | for idx,m in enumerate(msg): | |
|
419 | if m.bytes == DELIM: | |
|
420 | failed = False | |
|
421 | break | |
|
422 | if failed: | |
|
423 | raise ValueError("DELIM not in msg") | |
|
424 | idents, msg = msg[:idx], msg[idx+1:] | |
|
425 | return [m.bytes for m in idents], msg | |
|
392 | 426 | |
|
393 | 427 | def unpack_message(self, msg, content=True, copy=True): |
|
394 | 428 | """Return a message object from the format |
@@ -406,28 +440,31 b' class StreamSession(Configurable):' | |||
|
406 | 440 | or the non-copying Message object in each place (False) |
|
407 | 441 | |
|
408 | 442 | """ |
|
409 | ikey = int(self.key != '') | |
|
410 | minlen = 3 + ikey | |
|
443 | minlen = 4 | |
|
411 | 444 | message = {} |
|
412 | 445 | if not copy: |
|
413 | 446 | for i in range(minlen): |
|
414 | 447 | msg[i] = msg[i].bytes |
|
415 | if ikey: | |
|
416 |
|
|
|
417 | raise KeyError("Invalid Session Key: %s"%msg[0]) | |
|
448 | if self.auth is not None: | |
|
449 | signature = msg[0] | |
|
450 | if signature in self.digest_history: | |
|
451 | raise ValueError("Duplicate Signature: %r"%signature) | |
|
452 | self.digest_history.add(signature) | |
|
453 | check = self.sign(msg[1:4]) | |
|
454 | if not signature == check: | |
|
455 | raise ValueError("Invalid Signature: %r"%signature) | |
|
418 | 456 | if not len(msg) >= minlen: |
|
419 | 457 | raise TypeError("malformed message, must have at least %i elements"%minlen) |
|
420 |
message['header'] = self.unpack(msg[ |
|
|
458 | message['header'] = self.unpack(msg[1]) | |
|
421 | 459 | message['msg_type'] = message['header']['msg_type'] |
|
422 |
message['parent_header'] = self.unpack(msg[ |
|
|
460 | message['parent_header'] = self.unpack(msg[2]) | |
|
423 | 461 | if content: |
|
424 |
message['content'] = self.unpack(msg[ |
|
|
462 | message['content'] = self.unpack(msg[3]) | |
|
425 | 463 | else: |
|
426 |
message['content'] = msg[ |
|
|
464 | message['content'] = msg[3] | |
|
427 | 465 | |
|
428 |
message['buffers'] = msg[ |
|
|
466 | message['buffers'] = msg[4:] | |
|
429 | 467 | return message |
|
430 | ||
|
431 | 468 | |
|
432 | 469 | def test_msg2obj(): |
|
433 | 470 | am = dict(x=1) |
@@ -15,8 +15,8 b" computing. This feature brings up the important question of IPython's security" | |||
|
15 | 15 | model. This document gives details about this model and how it is implemented |
|
16 | 16 | in IPython's architecture. |
|
17 | 17 | |
|
18 |
Process |
|
|
19 |
============================ |
|
|
18 | Process and network topology | |
|
19 | ============================ | |
|
20 | 20 | |
|
21 | 21 | To enable parallel computing, IPython has a number of different processes that |
|
22 | 22 | run. These processes are discussed at length in the IPython documentation and |
@@ -36,15 +36,9 b' are summarized here:' | |||
|
36 | 36 | interactive Python process that is used to coordinate the |
|
37 | 37 | engines to get a parallel computation done. |
|
38 | 38 | |
|
39 |
Collectively, these processes are called the IPython * |
|
|
39 | Collectively, these processes are called the IPython *cluster*, and the hub and schedulers | |
|
40 | 40 | together are referred to as the *controller*. |
|
41 | 41 | |
|
42 | .. note:: | |
|
43 | ||
|
44 | Are these really still referred to as the Kernel? It doesn't seem so to me. 'cluster' | |
|
45 | seems more accurate. | |
|
46 | ||
|
47 | -MinRK | |
|
48 | 42 | |
|
49 | 43 | These processes communicate over any transport supported by ZeroMQ (tcp, pgm, infiniband, ipc) |
|
50 | 44 | with a well defined topology. The IPython hub and schedulers listen on sockets. Upon |
@@ -118,20 +112,23 b' controller were on loopback on the connecting machine.' | |||
|
118 | 112 | Authentication |
|
119 | 113 | -------------- |
|
120 | 114 | |
|
121 |
To protect users of shared machines, |
|
|
115 | To protect users of shared machines, [HMAC]_ digests are used to sign messages, using a | |
|
116 | shared key. | |
|
122 | 117 | |
|
123 | 118 | The Session object that handles the message protocol uses a unique key to verify valid |
|
124 | 119 | messages. This can be any value specified by the user, but the default behavior is a |
|
125 |
pseudo-random 128-bit number, as generated by `uuid.uuid4()`. This key is |
|
|
126 | message everywhere it is unpacked (Controller, Engine, and Client) to ensure that it came | |
|
127 | from an authentic user, and no messages that do not contain this key are acted upon in any | |
|
128 | way. | |
|
129 | ||
|
130 | There is exactly one key per cluster - it must be the same everywhere. Typically, the | |
|
131 | controller creates this key, and stores it in the private connection files | |
|
120 | pseudo-random 128-bit number, as generated by `uuid.uuid4()`. This key is used to | |
|
121 | initialize an HMAC object, which digests all messages, and includes that digest as a | |
|
122 | signature and part of the message. Every message that is unpacked (on Controller, Engine, | |
|
123 | and Client) will also be digested by the receiver, ensuring that the sender's key is the | |
|
124 | same as the receiver's. No messages that are not signed with this key are acted upon in any | |
|
125 | way. The key itself is never sent over the network. | |
|
126 | ||
|
127 | There is exactly one shared key per cluster - it must be the same everywhere. Typically, | |
|
128 | the controller creates this key, and stores it in the private connection files | |
|
132 | 129 | `ipython-{engine|client}.json`. These files are typically stored in the |
|
133 |
`~/.ipython/ |
|
|
134 |
|
|
|
130 | `~/.ipython/profile_<name>/security` directory, and are maintained as readable only by the | |
|
131 | owner, just as is common practice with a user's keys in their `.ssh` directory. | |
|
135 | 132 | |
|
136 | 133 | .. warning:: |
|
137 | 134 | |
@@ -171,13 +168,15 b' It is highly unlikely that an execution key could be guessed by an attacker' | |||
|
171 | 168 | in a brute force guessing attack. A given instance of the IPython controller |
|
172 | 169 | only runs for a relatively short amount of time (on the order of hours). Thus |
|
173 | 170 | an attacker would have only a limited amount of time to test a search space of |
|
174 | size 2**128. | |
|
171 | size 2**128. For added security, users can have arbitrarily long keys. | |
|
175 | 172 | |
|
176 | 173 | .. warning:: |
|
177 | 174 | |
|
178 | If the attacker has gained enough access to intercept loopback connections on | |
|
179 | *either* the controller or client, then the key is easily deduced from network | |
|
180 | traffic. | |
|
175 | If the attacker has gained enough access to intercept loopback connections on *either* the | |
|
176 | controller or client, then a duplicate message can be sent. To protect against this, | |
|
177 | recipients only allow each signature once, and consider duplicates invalid. However, | |
|
178 | the duplicate message could be sent to *another* recipient using the same key, | |
|
179 | and it would be considered valid. | |
|
181 | 180 | |
|
182 | 181 | |
|
183 | 182 | Unauthorized engines |
@@ -322,3 +321,4 b' channel is established.' | |||
|
322 | 321 | |
|
323 | 322 | .. [OpenSSH] <http://www.openssh.com/> |
|
324 | 323 | .. [Paramiko] <http://www.lag.net/paramiko/> |
|
324 | .. [HMAC] <http://tools.ietf.org/html/rfc2104.html> |
General Comments 0
You need to be logged in to leave comments.
Login now