##// END OF EJS Templates
use HMAC digest to sign messages instead of cleartext key...
MinRK -
Show More
@@ -30,6 +30,7 b' from zmq.devices import ProcessMonitoredQueue'
30 30 from zmq.log.handlers import PUBHandler
31 31 from zmq.utils import jsonapi as json
32 32
33 from IPython.config.application import boolean_flag
33 34 from IPython.core.newapplication import ProfileDir
34 35
35 36 from IPython.parallel.apps.baseapp import (
@@ -98,7 +99,10 b' flags.update({'
98 99 'reuse existing json connection files')
99 100 })
100 101
101 flags.update()
102 flags.update(boolean_flag('secure', 'IPControllerApp.secure',
103 "Use HMAC digests for authentication of messages.",
104 "Don't authenticate messages."
105 ))
102 106
103 107 class IPControllerApp(BaseParallelApplication):
104 108
@@ -109,18 +113,18 b' class IPControllerApp(BaseParallelApplication):'
109 113
110 114 # change default to True
111 115 auto_create = Bool(True, config=True,
112 help="""Whether to create profile dir if it doesn't exist""")
116 help="""Whether to create profile dir if it doesn't exist.""")
113 117
114 118 reuse_files = Bool(False, config=True,
115 help='Whether to reuse existing json connection files [default: False]'
119 help='Whether to reuse existing json connection files.'
116 120 )
117 121 secure = Bool(True, config=True,
118 help='Whether to use exec_keys for extra authentication [default: True]'
122 help='Whether to use HMAC digests for extra message authentication.'
119 123 )
120 124 ssh_server = Unicode(u'', config=True,
121 125 help="""ssh url for clients to use when connecting to the Controller
122 126 processes. It should be of the form: [user@]server[:port]. The
123 Controller\'s listening addresses must be accessible from the ssh server""",
127 Controller's listening addresses must be accessible from the ssh server""",
124 128 )
125 129 location = Unicode(u'', config=True,
126 130 help="""The external IP or domain name of the Controller, used for disambiguating
@@ -468,20 +468,23 b' class Hub(LoggingFactory):'
468 468
469 469 def dispatch_query(self, msg):
470 470 """Route registration requests and queries from clients."""
471 idents, msg = self.session.feed_identities(msg)
471 try:
472 idents, msg = self.session.feed_identities(msg)
473 except ValueError:
474 idents = []
472 475 if not idents:
473 476 self.log.error("Bad Query Message: %r"%msg)
474 477 return
475 478 client_id = idents[0]
476 479 try:
477 480 msg = self.session.unpack_message(msg, content=True)
478 except:
481 except Exception:
479 482 content = error.wrap_exception()
480 483 self.log.error("Bad Query Message: %r"%msg, exc_info=True)
481 484 self.session.send(self.query, "hub_error", ident=client_id,
482 485 content=content)
483 486 return
484
487 print( idents, msg)
485 488 # print client_id, header, parent, content
486 489 #switch on message type:
487 490 msg_type = msg['msg_type']
@@ -1123,9 +1126,10 b' class Hub(LoggingFactory):'
1123 1126 return finish(error.wrap_exception())
1124 1127
1125 1128 # clear the existing records
1129 now = datetime.now()
1126 1130 rec = empty_record()
1127 1131 map(rec.pop, ['msg_id', 'header', 'content', 'buffers', 'submitted'])
1128 rec['resubmitted'] = datetime.now()
1132 rec['resubmitted'] = now
1129 1133 rec['queue'] = 'task'
1130 1134 rec['client_uuid'] = client_id[0]
1131 1135 try:
@@ -1137,8 +1141,11 b' class Hub(LoggingFactory):'
1137 1141 reply = error.wrap_exception()
1138 1142 else:
1139 1143 # send the messages
1144 now_s = now.strftime(util.ISO8601)
1140 1145 for rec in records:
1141 1146 header = rec['header']
1147 # include resubmitted in header to prevent digest collision
1148 header['resubmitted'] = now_s
1142 1149 msg = self.session.msg(header['msg_type'])
1143 1150 msg['content'] = rec['content']
1144 1151 msg['header'] = header
@@ -196,17 +196,27 b' class TaskScheduler(SessionFactory):'
196 196
197 197 def dispatch_notification(self, msg):
198 198 """dispatch register/unregister events."""
199 idents,msg = self.session.feed_identities(msg)
200 msg = self.session.unpack_message(msg)
199 try:
200 idents,msg = self.session.feed_identities(msg)
201 except ValueError:
202 self.log.warn("task::Invalid Message: %r"%msg)
203 return
204 try:
205 msg = self.session.unpack_message(msg)
206 except ValueError:
207 self.log.warn("task::Unauthorized message from: %r"%idents)
208 return
209
201 210 msg_type = msg['msg_type']
211
202 212 handler = self._notification_handlers.get(msg_type, None)
203 213 if handler is None:
204 raise Exception("Unhandled message type: %s"%msg_type)
214 self.log.error("Unhandled message type: %r"%msg_type)
205 215 else:
206 216 try:
207 217 handler(str(msg['content']['queue']))
208 218 except KeyError:
209 self.log.error("task::Invalid notification msg: %s"%msg)
219 self.log.error("task::Invalid notification msg: %r"%msg)
210 220
211 221 @logged
212 222 def _register_engine(self, uid):
@@ -262,8 +272,7 b' class TaskScheduler(SessionFactory):'
262 272
263 273 raw_msg = lost[msg_id][0]
264 274 idents,msg = self.session.feed_identities(raw_msg, copy=False)
265 msg = self.session.unpack_message(msg, copy=False, content=False)
266 parent = msg['header']
275 parent = self.session.unpack(msg[1].bytes)
267 276 idents = [engine, idents[0]]
268 277
269 278 # build fake error reply
@@ -296,7 +305,7 b' class TaskScheduler(SessionFactory):'
296 305 self.log.error("task::Invaid task msg: %r"%raw_msg, exc_info=True)
297 306 return
298 307
299
308
300 309 # send to monitor
301 310 self.mon_stream.send_multipart(['intask']+raw_msg, copy=False)
302 311
@@ -377,8 +386,7 b' class TaskScheduler(SessionFactory):'
377 386
378 387 # FIXME: unpacking a message I've already unpacked, but didn't save:
379 388 idents,msg = self.session.feed_identities(raw_msg, copy=False)
380 msg = self.session.unpack_message(msg, copy=False, content=False)
381 header = msg['header']
389 header = self.session.unpack(msg[1].bytes)
382 390
383 391 try:
384 392 raise why()
@@ -8,7 +8,11 b''
8 8 # the file COPYING, distributed as part of this software.
9 9 #-----------------------------------------------------------------------------
10 10
11 #-----------------------------------------------------------------------------
12 # Imports
13 #-----------------------------------------------------------------------------
11 14
15 import hmac
12 16 import os
13 17 import pprint
14 18 import uuid
@@ -27,10 +31,13 b' from zmq.eventloop.zmqstream import ZMQStream'
27 31
28 32 from IPython.config.configurable import Configurable
29 33 from IPython.utils.importstring import import_item
30 from IPython.utils.traitlets import CStr, Unicode, Bool, Any
34 from IPython.utils.traitlets import CStr, Unicode, Bool, Any, Instance, Set
31 35
32 36 from .util import ISO8601
33 37
38 #-----------------------------------------------------------------------------
39 # utility functions
40 #-----------------------------------------------------------------------------
34 41
35 42 def squash_unicode(obj):
36 43 """coerce unicode back to bytestrings."""
@@ -52,6 +59,10 b' def _date_default(obj):'
52 59 else:
53 60 raise TypeError("%r is not JSON serializable"%obj)
54 61
62 #-----------------------------------------------------------------------------
63 # globals and defaults
64 #-----------------------------------------------------------------------------
65
55 66 _default_key = 'on_unknown' if jsonapi.jsonmod.__name__ == 'jsonlib' else 'default'
56 67 json_packer = lambda obj: jsonapi.dumps(obj, **{_default_key:_date_default})
57 68 json_unpacker = lambda s: squash_unicode(jsonapi.loads(s))
@@ -65,6 +76,10 b' default_unpacker = json_unpacker'
65 76
66 77 DELIM="<IDS|MSG>"
67 78
79 #-----------------------------------------------------------------------------
80 # Classes
81 #-----------------------------------------------------------------------------
82
68 83 class Message(object):
69 84 """A simple message object that maps dict keys to attributes.
70 85
@@ -152,11 +167,21 b' class StreamSession(Configurable):'
152 167 help="""The UUID identifying this session.""")
153 168 def _session_default(self):
154 169 return bytes(uuid.uuid4())
155 username = Unicode(os.environ.get('USER','username'),config=True,
170 username = Unicode(os.environ.get('USER','username'), config=True,
156 171 help="""Username for the Session. Default is your system username.""")
172
173 # message signature related traits:
157 174 key = CStr('', config=True,
158 175 help="""execution key, for extra authentication.""")
159
176 def _key_changed(self, name, old, new):
177 if new:
178 self.auth = hmac.HMAC(new)
179 else:
180 self.auth = None
181 auth = Instance(hmac.HMAC)
182 counters = Instance('collections.defaultdict', (int,))
183 digest_history = Set()
184
160 185 keyfile = Unicode('', config=True,
161 186 help="""path to file containing execution key.""")
162 187 def _keyfile_changed(self, name, old, new):
@@ -202,7 +227,15 b' class StreamSession(Configurable):'
202 227 return True
203 228 header = extract_header(msg_or_header)
204 229 return header.get('key', '') == self.key
205
230
231 def sign(self, msg):
232 """Sign a message with HMAC digest. If no auth, return b''."""
233 if self.auth is None:
234 return b''
235 h = self.auth.copy()
236 for m in msg:
237 h.update(m)
238 return h.hexdigest()
206 239
207 240 def serialize(self, msg, ident=None):
208 241 content = msg.get('content', {})
@@ -218,7 +251,12 b' class StreamSession(Configurable):'
218 251 content = content.encode('utf8')
219 252 else:
220 253 raise TypeError("Content incorrect type: %s"%type(content))
221
254
255 real_message = [self.pack(msg['header']),
256 self.pack(msg['parent_header']),
257 content
258 ]
259
222 260 to_send = []
223 261
224 262 if isinstance(ident, list):
@@ -227,11 +265,11 b' class StreamSession(Configurable):'
227 265 elif ident is not None:
228 266 to_send.append(ident)
229 267 to_send.append(DELIM)
230 if self.key:
231 to_send.append(self.key)
232 to_send.append(self.pack(msg['header']))
233 to_send.append(self.pack(msg['parent_header']))
234 to_send.append(content)
268
269 signature = self.sign(real_message)
270 to_send.append(signature)
271
272 to_send.extend(real_message)
235 273
236 274 return to_send
237 275
@@ -323,9 +361,9 b' class StreamSession(Configurable):'
323 361 ident = [ident]
324 362 if ident is not None:
325 363 to_send.extend(ident)
364
326 365 to_send.append(DELIM)
327 if self.key:
328 to_send.append(self.key)
366 to_send.append(self.sign(msg))
329 367 to_send.extend(msg)
330 368 stream.send_multipart(msg, flags, copy=copy)
331 369
@@ -372,23 +410,19 b' class StreamSession(Configurable):'
372 410 msg will be a list of bytes or Messages, unchanged from input
373 411 msg should be unpackable via self.unpack_message at this point.
374 412 """
375 ikey = int(self.key != '')
376 minlen = 3 + ikey
377 msg = list(msg)
378 idents = []
379 while len(msg) > minlen:
380 if copy:
381 s = msg[0]
382 else:
383 s = msg[0].bytes
384 if s == DELIM:
385 msg.pop(0)
386 break
387 else:
388 idents.append(s)
389 msg.pop(0)
390
391 return idents, msg
413 if copy:
414 idx = msg.index(DELIM)
415 return msg[:idx], msg[idx+1:]
416 else:
417 failed = True
418 for idx,m in enumerate(msg):
419 if m.bytes == DELIM:
420 failed = False
421 break
422 if failed:
423 raise ValueError("DELIM not in msg")
424 idents, msg = msg[:idx], msg[idx+1:]
425 return [m.bytes for m in idents], msg
392 426
393 427 def unpack_message(self, msg, content=True, copy=True):
394 428 """Return a message object from the format
@@ -406,28 +440,31 b' class StreamSession(Configurable):'
406 440 or the non-copying Message object in each place (False)
407 441
408 442 """
409 ikey = int(self.key != '')
410 minlen = 3 + ikey
443 minlen = 4
411 444 message = {}
412 445 if not copy:
413 446 for i in range(minlen):
414 447 msg[i] = msg[i].bytes
415 if ikey:
416 if not self.key == msg[0]:
417 raise KeyError("Invalid Session Key: %s"%msg[0])
448 if self.auth is not None:
449 signature = msg[0]
450 if signature in self.digest_history:
451 raise ValueError("Duplicate Signature: %r"%signature)
452 self.digest_history.add(signature)
453 check = self.sign(msg[1:4])
454 if not signature == check:
455 raise ValueError("Invalid Signature: %r"%signature)
418 456 if not len(msg) >= minlen:
419 457 raise TypeError("malformed message, must have at least %i elements"%minlen)
420 message['header'] = self.unpack(msg[ikey+0])
458 message['header'] = self.unpack(msg[1])
421 459 message['msg_type'] = message['header']['msg_type']
422 message['parent_header'] = self.unpack(msg[ikey+1])
460 message['parent_header'] = self.unpack(msg[2])
423 461 if content:
424 message['content'] = self.unpack(msg[ikey+2])
462 message['content'] = self.unpack(msg[3])
425 463 else:
426 message['content'] = msg[ikey+2]
464 message['content'] = msg[3]
427 465
428 message['buffers'] = msg[ikey+3:]# [ m.buffer for m in msg[3:] ]
466 message['buffers'] = msg[4:]
429 467 return message
430
431 468
432 469 def test_msg2obj():
433 470 am = dict(x=1)
@@ -15,8 +15,8 b" computing. This feature brings up the important question of IPython's security"
15 15 model. This document gives details about this model and how it is implemented
16 16 in IPython's architecture.
17 17
18 Processs and network topology
19 =============================
18 Process and network topology
19 ============================
20 20
21 21 To enable parallel computing, IPython has a number of different processes that
22 22 run. These processes are discussed at length in the IPython documentation and
@@ -36,15 +36,9 b' are summarized here:'
36 36 interactive Python process that is used to coordinate the
37 37 engines to get a parallel computation done.
38 38
39 Collectively, these processes are called the IPython *kernel*, and the hub and schedulers
39 Collectively, these processes are called the IPython *cluster*, and the hub and schedulers
40 40 together are referred to as the *controller*.
41 41
42 .. note::
43
44 Are these really still referred to as the Kernel? It doesn't seem so to me. 'cluster'
45 seems more accurate.
46
47 -MinRK
48 42
49 43 These processes communicate over any transport supported by ZeroMQ (tcp,pgm,infiniband,ipc)
50 44 with a well defined topology. The IPython hub and schedulers listen on sockets. Upon
@@ -118,20 +112,23 b' controller were on loopback on the connecting machine.'
118 112 Authentication
119 113 --------------
120 114
121 To protect users of shared machines, an execution key is used to authenticate all messages.
115 To protect users of shared machines, [HMAC]_ digests are used to sign messages, using a
116 shared key.
122 117
123 118 The Session object that handles the message protocol uses a unique key to verify valid
124 119 messages. This can be any value specified by the user, but the default behavior is a
125 pseudo-random 128-bit number, as generated by `uuid.uuid4()`. This key is checked on every
126 message everywhere it is unpacked (Controller, Engine, and Client) to ensure that it came
127 from an authentic user, and no messages that do not contain this key are acted upon in any
128 way.
129
130 There is exactly one key per cluster - it must be the same everywhere. Typically, the
131 controller creates this key, and stores it in the private connection files
120 pseudo-random 128-bit number, as generated by `uuid.uuid4()`. This key is used to
121 initialize an HMAC object, which digests all messages, and includes that digest as a
122 signature and part of the message. Every message that is unpacked (on Controller, Engine,
123 and Client) will also be digested by the receiver, ensuring that the sender's key is the
124 same as the receiver's. No messages that do not carry a valid signature are acted upon in any
125 way. The key itself is never sent over the network.
126
127 There is exactly one shared key per cluster - it must be the same everywhere. Typically,
128 the controller creates this key, and stores it in the private connection files
132 129 `ipython-{engine|client}.json`. These files are typically stored in the
133 `~/.ipython/cluster_<profile>/security` directory, and are maintained as readable only by
134 the owner, just as is common practice with a user's keys in their `.ssh` directory.
130 `~/.ipython/profile_<name>/security` directory, and are maintained as readable only by the
131 owner, just as is common practice with a user's keys in their `.ssh` directory.
135 132
136 133 .. warning::
137 134
@@ -171,13 +168,15 b' It is highly unlikely that an execution key could be guessed by an attacker'
171 168 in a brute force guessing attack. A given instance of the IPython controller
172 169 only runs for a relatively short amount of time (on the order of hours). Thus
173 170 an attacker would have only a limited amount of time to test a search space of
174 size 2**128.
171 size 2**128. For added security, users can have arbitrarily long keys.
175 172
176 173 .. warning::
177 174
178 If the attacker has gained enough access to intercept loopback connections on
179 *either* the controller or client, then the key is easily deduced from network
180 traffic.
175 If the attacker has gained enough access to intercept loopback connections on *either* the
176 controller or client, then a captured message can be re-sent (replayed). To protect against this,
177 recipients only allow each signature once, and consider duplicates invalid. However,
178 the duplicate message could be sent to *another* recipient using the same key,
179 and it would be considered valid.
181 180
182 181
183 182 Unauthorized engines
@@ -322,3 +321,4 b' channel is established.'
322 321
323 322 .. [OpenSSH] <http://www.openssh.com/>
324 323 .. [Paramiko] <http://www.lag.net/paramiko/>
324 .. [HMAC] <http://tools.ietf.org/html/rfc2104.html>
General Comments 0
You need to be logged in to leave comments. Login now