diff --git a/IPython/parallel/apps/ipcontrollerapp.py b/IPython/parallel/apps/ipcontrollerapp.py index e2fa88b..81a7c2d 100755 --- a/IPython/parallel/apps/ipcontrollerapp.py +++ b/IPython/parallel/apps/ipcontrollerapp.py @@ -30,6 +30,7 @@ from zmq.devices import ProcessMonitoredQueue from zmq.log.handlers import PUBHandler from zmq.utils import jsonapi as json +from IPython.config.application import boolean_flag from IPython.core.newapplication import ProfileDir from IPython.parallel.apps.baseapp import ( @@ -98,7 +99,10 @@ flags.update({ 'reuse existing json connection files') }) -flags.update() +flags.update(boolean_flag('secure', 'IPControllerApp.secure', + "Use HMAC digests for authentication of messages.", + "Don't authenticate messages." +)) class IPControllerApp(BaseParallelApplication): @@ -109,18 +113,18 @@ class IPControllerApp(BaseParallelApplication): # change default to True auto_create = Bool(True, config=True, - help="""Whether to create profile dir if it doesn't exist""") + help="""Whether to create profile dir if it doesn't exist.""") reuse_files = Bool(False, config=True, - help='Whether to reuse existing json connection files [default: False]' + help='Whether to reuse existing json connection files.' ) secure = Bool(True, config=True, - help='Whether to use exec_keys for extra authentication [default: True]' + help='Whether to use HMAC digests for extra message authentication.' ) ssh_server = Unicode(u'', config=True, help="""ssh url for clients to use when connecting to the Controller processes. It should be of the form: [user@]server[:port]. The - Controller\'s listening addresses must be accessible from the ssh server""", + Controller's listening addresses must be accessible from the ssh server""", ) location = Unicode(u'', config=True, help="""The external IP or domain name of the Controller, used for disambiguating diff --git a/IPython/parallel/controller/hub.py b/IPython/parallel/controller/hub.py index 24a5773..ea30b8d 100755 --- a/IPython/parallel/controller/hub.py +++ b/IPython/parallel/controller/hub.py @@ -468,20 +468,23 @@ class Hub(LoggingFactory): def dispatch_query(self, msg): """Route registration requests and queries from clients.""" - idents, msg = self.session.feed_identities(msg) + try: + idents, msg = self.session.feed_identities(msg) + except ValueError: + idents = [] if not idents: self.log.error("Bad Query Message: %r"%msg) return client_id = idents[0] try: msg = self.session.unpack_message(msg, content=True) - except: + except Exception: content = error.wrap_exception() self.log.error("Bad Query Message: %r"%msg, exc_info=True) self.session.send(self.query, "hub_error", ident=client_id, content=content) return - + print( idents, msg) # print client_id, header, parent, content #switch on message type: msg_type = msg['msg_type'] @@ -1123,9 +1126,10 @@ class Hub(LoggingFactory): return finish(error.wrap_exception()) # clear the existing records + now = datetime.now() rec = empty_record() map(rec.pop, ['msg_id', 'header', 'content', 'buffers', 'submitted']) - rec['resubmitted'] = datetime.now() + rec['resubmitted'] = now rec['queue'] = 'task' rec['client_uuid'] = client_id[0] try: @@ -1137,8 +1141,11 @@ class Hub(LoggingFactory): reply = error.wrap_exception() else: # send the messages + now_s = now.strftime(util.ISO8601) for rec in records: header = rec['header'] + # include resubmitted in header to prevent digest collision + header['resubmitted'] = now_s msg = self.session.msg(header['msg_type']) msg['content'] = rec['content'] msg['header'] = header diff --git a/IPython/parallel/controller/scheduler.py b/IPython/parallel/controller/scheduler.py index 28f0d42..1141934 100644 --- a/IPython/parallel/controller/scheduler.py +++ b/IPython/parallel/controller/scheduler.py @@ -196,17 +196,27 @@ class TaskScheduler(SessionFactory): def dispatch_notification(self, msg): """dispatch register/unregister events.""" - idents,msg = self.session.feed_identities(msg) - msg = self.session.unpack_message(msg) + try: + idents,msg = self.session.feed_identities(msg) + except ValueError: + self.log.warn("task::Invalid Message: %r"%msg) + return + try: + msg = self.session.unpack_message(msg) + except ValueError: + self.log.warn("task::Unauthorized message from: %r"%idents) + return + msg_type = msg['msg_type'] + handler = self._notification_handlers.get(msg_type, None) if handler is None: - raise Exception("Unhandled message type: %s"%msg_type) + self.log.error("Unhandled message type: %r"%msg_type) else: try: handler(str(msg['content']['queue'])) except KeyError: - self.log.error("task::Invalid notification msg: %s"%msg) + self.log.error("task::Invalid notification msg: %r"%msg) @logged def _register_engine(self, uid): @@ -262,8 +272,7 @@ class TaskScheduler(SessionFactory): raw_msg = lost[msg_id][0] idents,msg = self.session.feed_identities(raw_msg, copy=False) - msg = self.session.unpack_message(msg, copy=False, content=False) - parent = msg['header'] + parent = self.session.unpack(msg[1].bytes) idents = [engine, idents[0]] # build fake error reply @@ -296,7 +305,7 @@ class TaskScheduler(SessionFactory): self.log.error("task::Invaid task msg: %r"%raw_msg, exc_info=True) return - + # send to monitor self.mon_stream.send_multipart(['intask']+raw_msg, copy=False) @@ -377,8 +386,7 @@ class TaskScheduler(SessionFactory): # FIXME: unpacking a message I've already unpacked, but didn't save: idents,msg = self.session.feed_identities(raw_msg, copy=False) - msg = self.session.unpack_message(msg, copy=False, content=False) - header = msg['header'] + header = self.session.unpack(msg[1].bytes) try: raise why() diff --git a/IPython/parallel/streamsession.py b/IPython/parallel/streamsession.py index 96e4955..fbff1ac 100644 --- a/IPython/parallel/streamsession.py +++ b/IPython/parallel/streamsession.py @@ -8,7 +8,11 @@ # the file COPYING, distributed as part of this software. #----------------------------------------------------------------------------- +#----------------------------------------------------------------------------- +# Imports +#----------------------------------------------------------------------------- +import hmac import os import pprint import uuid @@ -27,10 +31,13 @@ from zmq.eventloop.zmqstream import ZMQStream from IPython.config.configurable import Configurable from IPython.utils.importstring import import_item -from IPython.utils.traitlets import CStr, Unicode, Bool, Any +from IPython.utils.traitlets import CStr, Unicode, Bool, Any, Instance, Set from .util import ISO8601 +#----------------------------------------------------------------------------- +# utility functions +#----------------------------------------------------------------------------- def squash_unicode(obj): """coerce unicode back to bytestrings.""" @@ -52,6 +59,10 @@ def _date_default(obj): else: raise TypeError("%r is not JSON serializable"%obj) +#----------------------------------------------------------------------------- +# globals and defaults +#----------------------------------------------------------------------------- + _default_key = 'on_unknown' if jsonapi.jsonmod.__name__ == 'jsonlib' else 'default' json_packer = lambda obj: jsonapi.dumps(obj, **{_default_key:_date_default}) json_unpacker = lambda s: squash_unicode(jsonapi.loads(s)) @@ -65,6 +76,10 @@ default_unpacker = json_unpacker DELIM="" +#----------------------------------------------------------------------------- +# Classes +#----------------------------------------------------------------------------- + class Message(object): """A simple message object that maps dict keys to attributes. @@ -152,11 +167,21 @@ class StreamSession(Configurable): help="""The UUID identifying this session.""") def _session_default(self): return bytes(uuid.uuid4()) - username = Unicode(os.environ.get('USER','username'),config=True, + username = Unicode(os.environ.get('USER','username'), config=True, help="""Username for the Session. Default is your system username.""") + + # message signature related traits: key = CStr('', config=True, help="""execution key, for extra authentication.""") - + def _key_changed(self, name, old, new): + if new: + self.auth = hmac.HMAC(new) + else: + self.auth = None + auth = Instance(hmac.HMAC) + counters = Instance('collections.defaultdict', (int,)) + digest_history = Set() + keyfile = Unicode('', config=True, help="""path to file containing execution key.""") def _keyfile_changed(self, name, old, new): @@ -202,7 +227,15 @@ class StreamSession(Configurable): return True header = extract_header(msg_or_header) return header.get('key', '') == self.key - + + def sign(self, msg): + """Sign a message with HMAC digest. If no auth, return b''.""" + if self.auth is None: + return b'' + h = self.auth.copy() + for m in msg: + h.update(m) + return h.hexdigest() def serialize(self, msg, ident=None): content = msg.get('content', {}) @@ -218,7 +251,12 @@ class StreamSession(Configurable): content = content.encode('utf8') else: raise TypeError("Content incorrect type: %s"%type(content)) - + + real_message = [self.pack(msg['header']), + self.pack(msg['parent_header']), + content + ] + to_send = [] if isinstance(ident, list): @@ -227,11 +265,11 @@ class StreamSession(Configurable): elif ident is not None: to_send.append(ident) to_send.append(DELIM) - if self.key: - to_send.append(self.key) - to_send.append(self.pack(msg['header'])) - to_send.append(self.pack(msg['parent_header'])) - to_send.append(content) + + signature = self.sign(real_message) + to_send.append(signature) + + to_send.extend(real_message) return to_send @@ -323,9 +361,9 @@ class StreamSession(Configurable): ident = [ident] if ident is not None: to_send.extend(ident) + to_send.append(DELIM) - if self.key: - to_send.append(self.key) + to_send.append(self.sign(msg)) to_send.extend(msg) stream.send_multipart(msg, flags, copy=copy) @@ -372,23 +410,19 @@ class StreamSession(Configurable): msg will be a list of bytes or Messages, unchanged from input msg should be unpackable via self.unpack_message at this point. """ - ikey = int(self.key != '') - minlen = 3 + ikey - msg = list(msg) - idents = [] - while len(msg) > minlen: - if copy: - s = msg[0] - else: - s = msg[0].bytes - if s == DELIM: - msg.pop(0) - break - else: - idents.append(s) - msg.pop(0) - - return idents, msg + if copy: + idx = msg.index(DELIM) + return msg[:idx], msg[idx+1:] + else: + failed = True + for idx,m in enumerate(msg): + if m.bytes == DELIM: + failed = False + break + if failed: + raise ValueError("DELIM not in msg") + idents, msg = msg[:idx], msg[idx+1:] + return [m.bytes for m in idents], msg def unpack_message(self, msg, content=True, copy=True): """Return a message object from the format @@ -406,28 +440,31 @@ class StreamSession(Configurable): or the non-copying Message object in each place (False) """ - ikey = int(self.key != '') - minlen = 3 + ikey + minlen = 4 message = {} if not copy: for i in range(minlen): msg[i] = msg[i].bytes - if ikey: - if not self.key == msg[0]: - raise KeyError("Invalid Session Key: %s"%msg[0]) + if self.auth is not None: + signature = msg[0] + if signature in self.digest_history: + raise ValueError("Duplicate Signature: %r"%signature) + self.digest_history.add(signature) + check = self.sign(msg[1:4]) + if not signature == check: + raise ValueError("Invalid Signature: %r"%signature) if not len(msg) >= minlen: raise TypeError("malformed message, must have at least %i elements"%minlen) - message['header'] = self.unpack(msg[ikey+0]) + message['header'] = self.unpack(msg[1]) message['msg_type'] = message['header']['msg_type'] - message['parent_header'] = self.unpack(msg[ikey+1]) + message['parent_header'] = self.unpack(msg[2]) if content: - message['content'] = self.unpack(msg[ikey+2]) + message['content'] = self.unpack(msg[3]) else: - message['content'] = msg[ikey+2] + message['content'] = msg[3] - message['buffers'] = msg[ikey+3:]# [ m.buffer for m in msg[3:] ] + message['buffers'] = msg[4:] return message - def test_msg2obj(): am = dict(x=1) diff --git a/docs/source/parallel/parallel_security.txt b/docs/source/parallel/parallel_security.txt index 7f65a96..ea8b773 100644 --- a/docs/source/parallel/parallel_security.txt +++ b/docs/source/parallel/parallel_security.txt @@ -15,8 +15,8 @@ computing. This feature brings up the important question of IPython's security model. This document gives details about this model and how it is implemented in IPython's architecture. -Processs and network topology -============================= +Process and network topology +============================ To enable parallel computing, IPython has a number of different processes that run. These processes are discussed at length in the IPython documentation and @@ -36,15 +36,9 @@ are summarized here: interactive Python process that is used to coordinate the engines to get a parallel computation done. -Collectively, these processes are called the IPython *kernel*, and the hub and schedulers +Collectively, these processes are called the IPython *cluster*, and the hub and schedulers together are referred to as the *controller*. -.. note:: - - Are these really still referred to as the Kernel? It doesn't seem so to me. 'cluster' - seems more accurate. - - -MinRK These processes communicate over any transport supported by ZeroMQ (tcp,pgm,infiniband,ipc) with a well defined topology. The IPython hub and schedulers listen on sockets. Upon @@ -118,20 +112,23 @@ controller were on loopback on the connecting machine. Authentication -------------- -To protect users of shared machines, an execution key is used to authenticate all messages. +To protect users of shared machines, [HMAC]_ digests are used to sign messages, using a +shared key. The Session object that handles the message protocol uses a unique key to verify valid messages. This can be any value specified by the user, but the default behavior is a -pseudo-random 128-bit number, as generated by `uuid.uuid4()`. This key is checked on every -message everywhere it is unpacked (Controller, Engine, and Client) to ensure that it came -from an authentic user, and no messages that do not contain this key are acted upon in any -way. - -There is exactly one key per cluster - it must be the same everywhere. Typically, the -controller creates this key, and stores it in the private connection files +pseudo-random 128-bit number, as generated by `uuid.uuid4()`. This key is used to +initialize an HMAC object, which digests all messages, and includes that digest as a +signature and part of the message. Every message that is unpacked (on Controller, Engine, +and Client) will also be digested by the receiver, ensuring that the sender's key is the +same as the receiver's. No messages that do not contain this key are acted upon in any +way. The key itself is never sent over the network. + +There is exactly one shared key per cluster - it must be the same everywhere. Typically, +the controller creates this key, and stores it in the private connection files `ipython-{engine|client}.json`. These files are typically stored in the -`~/.ipython/cluster_/security` directory, and are maintained as readable only by -the owner, just as is common practice with a user's keys in their `.ssh` directory. +`~/.ipython/profile_/security` directory, and are maintained as readable only by the +owner, just as is common practice with a user's keys in their `.ssh` directory. .. warning:: @@ -171,13 +168,15 @@ It is highly unlikely that an execution key could be guessed by an attacker in a brute force guessing attack. A given instance of the IPython controller only runs for a relatively short amount of time (on the order of hours). Thus an attacker would have only a limited amount of time to test a search space of -size 2**128. +size 2**128. For added security, users can have arbitrarily long keys. .. warning:: - If the attacker has gained enough access to intercept loopback connections on - *either* the controller or client, then the key is easily deduced from network - traffic. + If the attacker has gained enough access to intercept loopback connections on *either* the + controller or client, then a duplicate message can be sent. To protect against this, + recipients only allow each signature once, and consider duplicates invalid. However, + the duplicate message could be sent to *another* recipient using the same key, + and it would be considered valid. Unauthorized engines @@ -322,3 +321,4 @@ channel is established. .. [OpenSSH] .. [Paramiko] +.. [HMAC]