##// END OF EJS Templates
use HMAC digest to sign messages instead of cleartext key...
MinRK -
Show More
@@ -30,6 +30,7 b' from zmq.devices import ProcessMonitoredQueue'
30 from zmq.log.handlers import PUBHandler
30 from zmq.log.handlers import PUBHandler
31 from zmq.utils import jsonapi as json
31 from zmq.utils import jsonapi as json
32
32
33 from IPython.config.application import boolean_flag
33 from IPython.core.newapplication import ProfileDir
34 from IPython.core.newapplication import ProfileDir
34
35
35 from IPython.parallel.apps.baseapp import (
36 from IPython.parallel.apps.baseapp import (
@@ -98,7 +99,10 b' flags.update({'
98 'reuse existing json connection files')
99 'reuse existing json connection files')
99 })
100 })
100
101
101 flags.update()
102 flags.update(boolean_flag('secure', 'IPControllerApp.secure',
103 "Use HMAC digests for authentication of messages.",
104 "Don't authenticate messages."
105 ))
102
106
103 class IPControllerApp(BaseParallelApplication):
107 class IPControllerApp(BaseParallelApplication):
104
108
@@ -109,18 +113,18 b' class IPControllerApp(BaseParallelApplication):'
109
113
110 # change default to True
114 # change default to True
111 auto_create = Bool(True, config=True,
115 auto_create = Bool(True, config=True,
112 help="""Whether to create profile dir if it doesn't exist""")
116 help="""Whether to create profile dir if it doesn't exist.""")
113
117
114 reuse_files = Bool(False, config=True,
118 reuse_files = Bool(False, config=True,
115 help='Whether to reuse existing json connection files [default: False]'
119 help='Whether to reuse existing json connection files.'
116 )
120 )
117 secure = Bool(True, config=True,
121 secure = Bool(True, config=True,
118 help='Whether to use exec_keys for extra authentication [default: True]'
122 help='Whether to use HMAC digests for extra message authentication.'
119 )
123 )
120 ssh_server = Unicode(u'', config=True,
124 ssh_server = Unicode(u'', config=True,
121 help="""ssh url for clients to use when connecting to the Controller
125 help="""ssh url for clients to use when connecting to the Controller
122 processes. It should be of the form: [user@]server[:port]. The
126 processes. It should be of the form: [user@]server[:port]. The
123 Controller\'s listening addresses must be accessible from the ssh server""",
127 Controller's listening addresses must be accessible from the ssh server""",
124 )
128 )
125 location = Unicode(u'', config=True,
129 location = Unicode(u'', config=True,
126 help="""The external IP or domain name of the Controller, used for disambiguating
130 help="""The external IP or domain name of the Controller, used for disambiguating
@@ -468,20 +468,23 b' class Hub(LoggingFactory):'
468
468
469 def dispatch_query(self, msg):
469 def dispatch_query(self, msg):
470 """Route registration requests and queries from clients."""
470 """Route registration requests and queries from clients."""
471 idents, msg = self.session.feed_identities(msg)
471 try:
472 idents, msg = self.session.feed_identities(msg)
473 except ValueError:
474 idents = []
472 if not idents:
475 if not idents:
473 self.log.error("Bad Query Message: %r"%msg)
476 self.log.error("Bad Query Message: %r"%msg)
474 return
477 return
475 client_id = idents[0]
478 client_id = idents[0]
476 try:
479 try:
477 msg = self.session.unpack_message(msg, content=True)
480 msg = self.session.unpack_message(msg, content=True)
478 except:
481 except Exception:
479 content = error.wrap_exception()
482 content = error.wrap_exception()
480 self.log.error("Bad Query Message: %r"%msg, exc_info=True)
483 self.log.error("Bad Query Message: %r"%msg, exc_info=True)
481 self.session.send(self.query, "hub_error", ident=client_id,
484 self.session.send(self.query, "hub_error", ident=client_id,
482 content=content)
485 content=content)
483 return
486 return
484
487 print( idents, msg)
485 # print client_id, header, parent, content
488 # print client_id, header, parent, content
486 #switch on message type:
489 #switch on message type:
487 msg_type = msg['msg_type']
490 msg_type = msg['msg_type']
@@ -1123,9 +1126,10 b' class Hub(LoggingFactory):'
1123 return finish(error.wrap_exception())
1126 return finish(error.wrap_exception())
1124
1127
1125 # clear the existing records
1128 # clear the existing records
1129 now = datetime.now()
1126 rec = empty_record()
1130 rec = empty_record()
1127 map(rec.pop, ['msg_id', 'header', 'content', 'buffers', 'submitted'])
1131 map(rec.pop, ['msg_id', 'header', 'content', 'buffers', 'submitted'])
1128 rec['resubmitted'] = datetime.now()
1132 rec['resubmitted'] = now
1129 rec['queue'] = 'task'
1133 rec['queue'] = 'task'
1130 rec['client_uuid'] = client_id[0]
1134 rec['client_uuid'] = client_id[0]
1131 try:
1135 try:
@@ -1137,8 +1141,11 b' class Hub(LoggingFactory):'
1137 reply = error.wrap_exception()
1141 reply = error.wrap_exception()
1138 else:
1142 else:
1139 # send the messages
1143 # send the messages
1144 now_s = now.strftime(util.ISO8601)
1140 for rec in records:
1145 for rec in records:
1141 header = rec['header']
1146 header = rec['header']
1147 # include resubmitted in header to prevent digest collision
1148 header['resubmitted'] = now_s
1142 msg = self.session.msg(header['msg_type'])
1149 msg = self.session.msg(header['msg_type'])
1143 msg['content'] = rec['content']
1150 msg['content'] = rec['content']
1144 msg['header'] = header
1151 msg['header'] = header
@@ -196,17 +196,27 b' class TaskScheduler(SessionFactory):'
196
196
197 def dispatch_notification(self, msg):
197 def dispatch_notification(self, msg):
198 """dispatch register/unregister events."""
198 """dispatch register/unregister events."""
199 idents,msg = self.session.feed_identities(msg)
199 try:
200 msg = self.session.unpack_message(msg)
200 idents,msg = self.session.feed_identities(msg)
201 except ValueError:
202 self.log.warn("task::Invalid Message: %r"%msg)
203 return
204 try:
205 msg = self.session.unpack_message(msg)
206 except ValueError:
207 self.log.warn("task::Unauthorized message from: %r"%idents)
208 return
209
201 msg_type = msg['msg_type']
210 msg_type = msg['msg_type']
211
202 handler = self._notification_handlers.get(msg_type, None)
212 handler = self._notification_handlers.get(msg_type, None)
203 if handler is None:
213 if handler is None:
204 raise Exception("Unhandled message type: %s"%msg_type)
214 self.log.error("Unhandled message type: %r"%msg_type)
205 else:
215 else:
206 try:
216 try:
207 handler(str(msg['content']['queue']))
217 handler(str(msg['content']['queue']))
208 except KeyError:
218 except KeyError:
209 self.log.error("task::Invalid notification msg: %s"%msg)
219 self.log.error("task::Invalid notification msg: %r"%msg)
210
220
211 @logged
221 @logged
212 def _register_engine(self, uid):
222 def _register_engine(self, uid):
@@ -262,8 +272,7 b' class TaskScheduler(SessionFactory):'
262
272
263 raw_msg = lost[msg_id][0]
273 raw_msg = lost[msg_id][0]
264 idents,msg = self.session.feed_identities(raw_msg, copy=False)
274 idents,msg = self.session.feed_identities(raw_msg, copy=False)
265 msg = self.session.unpack_message(msg, copy=False, content=False)
275 parent = self.session.unpack(msg[1].bytes)
266 parent = msg['header']
267 idents = [engine, idents[0]]
276 idents = [engine, idents[0]]
268
277
269 # build fake error reply
278 # build fake error reply
@@ -296,7 +305,7 b' class TaskScheduler(SessionFactory):'
296 self.log.error("task::Invaid task msg: %r"%raw_msg, exc_info=True)
305 self.log.error("task::Invaid task msg: %r"%raw_msg, exc_info=True)
297 return
306 return
298
307
299
308
300 # send to monitor
309 # send to monitor
301 self.mon_stream.send_multipart(['intask']+raw_msg, copy=False)
310 self.mon_stream.send_multipart(['intask']+raw_msg, copy=False)
302
311
@@ -377,8 +386,7 b' class TaskScheduler(SessionFactory):'
377
386
378 # FIXME: unpacking a message I've already unpacked, but didn't save:
387 # FIXME: unpacking a message I've already unpacked, but didn't save:
379 idents,msg = self.session.feed_identities(raw_msg, copy=False)
388 idents,msg = self.session.feed_identities(raw_msg, copy=False)
380 msg = self.session.unpack_message(msg, copy=False, content=False)
389 header = self.session.unpack(msg[1].bytes)
381 header = msg['header']
382
390
383 try:
391 try:
384 raise why()
392 raise why()
@@ -8,7 +8,11 b''
8 # the file COPYING, distributed as part of this software.
8 # the file COPYING, distributed as part of this software.
9 #-----------------------------------------------------------------------------
9 #-----------------------------------------------------------------------------
10
10
11 #-----------------------------------------------------------------------------
12 # Imports
13 #-----------------------------------------------------------------------------
11
14
15 import hmac
12 import os
16 import os
13 import pprint
17 import pprint
14 import uuid
18 import uuid
@@ -27,10 +31,13 b' from zmq.eventloop.zmqstream import ZMQStream'
27
31
28 from IPython.config.configurable import Configurable
32 from IPython.config.configurable import Configurable
29 from IPython.utils.importstring import import_item
33 from IPython.utils.importstring import import_item
30 from IPython.utils.traitlets import CStr, Unicode, Bool, Any
34 from IPython.utils.traitlets import CStr, Unicode, Bool, Any, Instance, Set
31
35
32 from .util import ISO8601
36 from .util import ISO8601
33
37
38 #-----------------------------------------------------------------------------
39 # utility functions
40 #-----------------------------------------------------------------------------
34
41
35 def squash_unicode(obj):
42 def squash_unicode(obj):
36 """coerce unicode back to bytestrings."""
43 """coerce unicode back to bytestrings."""
@@ -52,6 +59,10 b' def _date_default(obj):'
52 else:
59 else:
53 raise TypeError("%r is not JSON serializable"%obj)
60 raise TypeError("%r is not JSON serializable"%obj)
54
61
62 #-----------------------------------------------------------------------------
63 # globals and defaults
64 #-----------------------------------------------------------------------------
65
55 _default_key = 'on_unknown' if jsonapi.jsonmod.__name__ == 'jsonlib' else 'default'
66 _default_key = 'on_unknown' if jsonapi.jsonmod.__name__ == 'jsonlib' else 'default'
56 json_packer = lambda obj: jsonapi.dumps(obj, **{_default_key:_date_default})
67 json_packer = lambda obj: jsonapi.dumps(obj, **{_default_key:_date_default})
57 json_unpacker = lambda s: squash_unicode(jsonapi.loads(s))
68 json_unpacker = lambda s: squash_unicode(jsonapi.loads(s))
@@ -65,6 +76,10 b' default_unpacker = json_unpacker'
65
76
66 DELIM="<IDS|MSG>"
77 DELIM="<IDS|MSG>"
67
78
79 #-----------------------------------------------------------------------------
80 # Classes
81 #-----------------------------------------------------------------------------
82
68 class Message(object):
83 class Message(object):
69 """A simple message object that maps dict keys to attributes.
84 """A simple message object that maps dict keys to attributes.
70
85
@@ -152,11 +167,21 b' class StreamSession(Configurable):'
152 help="""The UUID identifying this session.""")
167 help="""The UUID identifying this session.""")
153 def _session_default(self):
168 def _session_default(self):
154 return bytes(uuid.uuid4())
169 return bytes(uuid.uuid4())
155 username = Unicode(os.environ.get('USER','username'),config=True,
170 username = Unicode(os.environ.get('USER','username'), config=True,
156 help="""Username for the Session. Default is your system username.""")
171 help="""Username for the Session. Default is your system username.""")
172
173 # message signature related traits:
157 key = CStr('', config=True,
174 key = CStr('', config=True,
158 help="""execution key, for extra authentication.""")
175 help="""execution key, for extra authentication.""")
159
176 def _key_changed(self, name, old, new):
177 if new:
178 self.auth = hmac.HMAC(new)
179 else:
180 self.auth = None
181 auth = Instance(hmac.HMAC)
182 counters = Instance('collections.defaultdict', (int,))
183 digest_history = Set()
184
160 keyfile = Unicode('', config=True,
185 keyfile = Unicode('', config=True,
161 help="""path to file containing execution key.""")
186 help="""path to file containing execution key.""")
162 def _keyfile_changed(self, name, old, new):
187 def _keyfile_changed(self, name, old, new):
@@ -202,7 +227,15 b' class StreamSession(Configurable):'
202 return True
227 return True
203 header = extract_header(msg_or_header)
228 header = extract_header(msg_or_header)
204 return header.get('key', '') == self.key
229 return header.get('key', '') == self.key
205
230
231 def sign(self, msg):
232 """Sign a message with HMAC digest. If no auth, return b''."""
233 if self.auth is None:
234 return b''
235 h = self.auth.copy()
236 for m in msg:
237 h.update(m)
238 return h.hexdigest()
206
239
207 def serialize(self, msg, ident=None):
240 def serialize(self, msg, ident=None):
208 content = msg.get('content', {})
241 content = msg.get('content', {})
@@ -218,7 +251,12 b' class StreamSession(Configurable):'
218 content = content.encode('utf8')
251 content = content.encode('utf8')
219 else:
252 else:
220 raise TypeError("Content incorrect type: %s"%type(content))
253 raise TypeError("Content incorrect type: %s"%type(content))
221
254
255 real_message = [self.pack(msg['header']),
256 self.pack(msg['parent_header']),
257 content
258 ]
259
222 to_send = []
260 to_send = []
223
261
224 if isinstance(ident, list):
262 if isinstance(ident, list):
@@ -227,11 +265,11 b' class StreamSession(Configurable):'
227 elif ident is not None:
265 elif ident is not None:
228 to_send.append(ident)
266 to_send.append(ident)
229 to_send.append(DELIM)
267 to_send.append(DELIM)
230 if self.key:
268
231 to_send.append(self.key)
269 signature = self.sign(real_message)
232 to_send.append(self.pack(msg['header']))
270 to_send.append(signature)
233 to_send.append(self.pack(msg['parent_header']))
271
234 to_send.append(content)
272 to_send.extend(real_message)
235
273
236 return to_send
274 return to_send
237
275
@@ -323,9 +361,9 b' class StreamSession(Configurable):'
323 ident = [ident]
361 ident = [ident]
324 if ident is not None:
362 if ident is not None:
325 to_send.extend(ident)
363 to_send.extend(ident)
364
326 to_send.append(DELIM)
365 to_send.append(DELIM)
327 if self.key:
366 to_send.append(self.sign(msg))
328 to_send.append(self.key)
329 to_send.extend(msg)
367 to_send.extend(msg)
330 stream.send_multipart(msg, flags, copy=copy)
368 stream.send_multipart(msg, flags, copy=copy)
331
369
@@ -372,23 +410,19 b' class StreamSession(Configurable):'
372 msg will be a list of bytes or Messages, unchanged from input
410 msg will be a list of bytes or Messages, unchanged from input
373 msg should be unpackable via self.unpack_message at this point.
411 msg should be unpackable via self.unpack_message at this point.
374 """
412 """
375 ikey = int(self.key != '')
413 if copy:
376 minlen = 3 + ikey
414 idx = msg.index(DELIM)
377 msg = list(msg)
415 return msg[:idx], msg[idx+1:]
378 idents = []
416 else:
379 while len(msg) > minlen:
417 failed = True
380 if copy:
418 for idx,m in enumerate(msg):
381 s = msg[0]
419 if m.bytes == DELIM:
382 else:
420 failed = False
383 s = msg[0].bytes
421 break
384 if s == DELIM:
422 if failed:
385 msg.pop(0)
423 raise ValueError("DELIM not in msg")
386 break
424 idents, msg = msg[:idx], msg[idx+1:]
387 else:
425 return [m.bytes for m in idents], msg
388 idents.append(s)
389 msg.pop(0)
390
391 return idents, msg
392
426
393 def unpack_message(self, msg, content=True, copy=True):
427 def unpack_message(self, msg, content=True, copy=True):
394 """Return a message object from the format
428 """Return a message object from the format
@@ -406,28 +440,31 b' class StreamSession(Configurable):'
406 or the non-copying Message object in each place (False)
440 or the non-copying Message object in each place (False)
407
441
408 """
442 """
409 ikey = int(self.key != '')
443 minlen = 4
410 minlen = 3 + ikey
411 message = {}
444 message = {}
412 if not copy:
445 if not copy:
413 for i in range(minlen):
446 for i in range(minlen):
414 msg[i] = msg[i].bytes
447 msg[i] = msg[i].bytes
415 if ikey:
448 if self.auth is not None:
416 if not self.key == msg[0]:
449 signature = msg[0]
417 raise KeyError("Invalid Session Key: %s"%msg[0])
450 if signature in self.digest_history:
451 raise ValueError("Duplicate Signature: %r"%signature)
452 self.digest_history.add(signature)
453 check = self.sign(msg[1:4])
454 if not signature == check:
455 raise ValueError("Invalid Signature: %r"%signature)
418 if not len(msg) >= minlen:
456 if not len(msg) >= minlen:
419 raise TypeError("malformed message, must have at least %i elements"%minlen)
457 raise TypeError("malformed message, must have at least %i elements"%minlen)
420 message['header'] = self.unpack(msg[ikey+0])
458 message['header'] = self.unpack(msg[1])
421 message['msg_type'] = message['header']['msg_type']
459 message['msg_type'] = message['header']['msg_type']
422 message['parent_header'] = self.unpack(msg[ikey+1])
460 message['parent_header'] = self.unpack(msg[2])
423 if content:
461 if content:
424 message['content'] = self.unpack(msg[ikey+2])
462 message['content'] = self.unpack(msg[3])
425 else:
463 else:
426 message['content'] = msg[ikey+2]
464 message['content'] = msg[3]
427
465
428 message['buffers'] = msg[ikey+3:]# [ m.buffer for m in msg[3:] ]
466 message['buffers'] = msg[4:]
429 return message
467 return message
430
431
468
432 def test_msg2obj():
469 def test_msg2obj():
433 am = dict(x=1)
470 am = dict(x=1)
@@ -15,8 +15,8 b" computing. This feature brings up the important question of IPython's security"
15 model. This document gives details about this model and how it is implemented
15 model. This document gives details about this model and how it is implemented
16 in IPython's architecture.
16 in IPython's architecture.
17
17
18 Processs and network topology
18 Process and network topology
19 =============================
19 ============================
20
20
21 To enable parallel computing, IPython has a number of different processes that
21 To enable parallel computing, IPython has a number of different processes that
22 run. These processes are discussed at length in the IPython documentation and
22 run. These processes are discussed at length in the IPython documentation and
@@ -36,15 +36,9 b' are summarized here:'
36 interactive Python process that is used to coordinate the
36 interactive Python process that is used to coordinate the
37 engines to get a parallel computation done.
37 engines to get a parallel computation done.
38
38
39 Collectively, these processes are called the IPython *kernel*, and the hub and schedulers
39 Collectively, these processes are called the IPython *cluster*, and the hub and schedulers
40 together are referred to as the *controller*.
40 together are referred to as the *controller*.
41
41
42 .. note::
43
44 Are these really still referred to as the Kernel? It doesn't seem so to me. 'cluster'
45 seems more accurate.
46
47 -MinRK
48
42
49 These processes communicate over any transport supported by ZeroMQ (tcp,pgm,infiniband,ipc)
43 These processes communicate over any transport supported by ZeroMQ (tcp,pgm,infiniband,ipc)
50 with a well defined topology. The IPython hub and schedulers listen on sockets. Upon
44 with a well defined topology. The IPython hub and schedulers listen on sockets. Upon
@@ -118,20 +112,23 b' controller were on loopback on the connecting machine.'
118 Authentication
112 Authentication
119 --------------
113 --------------
120
114
121 To protect users of shared machines, an execution key is used to authenticate all messages.
115 To protect users of shared machines, [HMAC]_ digests are used to sign messages, using a
116 shared key.
122
117
123 The Session object that handles the message protocol uses a unique key to verify valid
118 The Session object that handles the message protocol uses a unique key to verify valid
124 messages. This can be any value specified by the user, but the default behavior is a
119 messages. This can be any value specified by the user, but the default behavior is a
125 pseudo-random 128-bit number, as generated by `uuid.uuid4()`. This key is checked on every
120 pseudo-random 128-bit number, as generated by `uuid.uuid4()`. This key is used to
126 message everywhere it is unpacked (Controller, Engine, and Client) to ensure that it came
121 initialize an HMAC object, which digests all messages, and includes that digest as a
127 from an authentic user, and no messages that do not contain this key are acted upon in any
122 signature and part of the message. Every message that is unpacked (on Controller, Engine,
128 way.
123 and Client) will also be digested by the receiver, ensuring that the sender's key is the
129
124 same as the receiver's. No messages that do not contain this key are acted upon in any
130 There is exactly one key per cluster - it must be the same everywhere. Typically, the
125 way. The key itself is never sent over the network.
131 controller creates this key, and stores it in the private connection files
126
127 There is exactly one shared key per cluster - it must be the same everywhere. Typically,
128 the controller creates this key, and stores it in the private connection files
132 `ipython-{engine|client}.json`. These files are typically stored in the
129 `ipython-{engine|client}.json`. These files are typically stored in the
133 `~/.ipython/cluster_<profile>/security` directory, and are maintained as readable only by
130 `~/.ipython/profile_<name>/security` directory, and are maintained as readable only by the
134 the owner, just as is common practice with a user's keys in their `.ssh` directory.
131 owner, just as is common practice with a user's keys in their `.ssh` directory.
135
132
136 .. warning::
133 .. warning::
137
134
@@ -171,13 +168,15 b' It is highly unlikely that an execution key could be guessed by an attacker'
171 in a brute force guessing attack. A given instance of the IPython controller
168 in a brute force guessing attack. A given instance of the IPython controller
172 only runs for a relatively short amount of time (on the order of hours). Thus
169 only runs for a relatively short amount of time (on the order of hours). Thus
173 an attacker would have only a limited amount of time to test a search space of
170 an attacker would have only a limited amount of time to test a search space of
174 size 2**128.
171 size 2**128. For added security, users can have arbitrarily long keys.
175
172
176 .. warning::
173 .. warning::
177
174
178 If the attacker has gained enough access to intercept loopback connections on
175 If the attacker has gained enough access to intercept loopback connections on *either* the
179 *either* the controller or client, then the key is easily deduced from network
176 controller or client, then a duplicate message can be sent. To protect against this,
180 traffic.
177 recipients only allow each signature once, and consider duplicates invalid. However,
178 the duplicate message could be sent to *another* recipient using the same key,
179 and it would be considered valid.
181
180
182
181
183 Unauthorized engines
182 Unauthorized engines
@@ -322,3 +321,4 b' channel is established.'
322
321
323 .. [OpenSSH] <http://www.openssh.com/>
322 .. [OpenSSH] <http://www.openssh.com/>
324 .. [Paramiko] <http://www.lag.net/paramiko/>
323 .. [Paramiko] <http://www.lag.net/paramiko/>
324 .. [HMAC] <http://tools.ietf.org/html/rfc2104.html>
General Comments 0
You need to be logged in to leave comments. Login now