Show More
@@ -14,6 +14,7 b' from __future__ import print_function' | |||||
14 |
|
14 | |||
15 | import os |
|
15 | import os | |
16 | import time |
|
16 | import time | |
|
17 | from getpass import getpass | |||
17 | from pprint import pprint |
|
18 | from pprint import pprint | |
18 |
|
19 | |||
19 | import zmq |
|
20 | import zmq | |
@@ -86,7 +87,29 b' def remote(client, bound=False, block=None, targets=None):' | |||||
86 | #-------------------------------------------------------------------------- |
|
87 | #-------------------------------------------------------------------------- | |
87 |
|
88 | |||
88 | class RemoteFunction(object): |
|
89 | class RemoteFunction(object): | |
89 |
"""Turn an existing function into a remote function |
|
90 | """Turn an existing function into a remote function. | |
|
91 | ||||
|
92 | Parameters | |||
|
93 | ---------- | |||
|
94 | ||||
|
95 | client : Client instance | |||
|
96 | The client to be used to connect to engines | |||
|
97 | f : callable | |||
|
98 | The function to be wrapped into a remote function | |||
|
99 | bound : bool [default: False] | |||
|
100 | Whether the affect the remote namespace when called | |||
|
101 | block : bool [default: None] | |||
|
102 | Whether to wait for results or not. The default behavior is | |||
|
103 | to use the current `block` attribute of `client` | |||
|
104 | targets : valid target list [default: all] | |||
|
105 | The targets on which to execute. | |||
|
106 | """ | |||
|
107 | ||||
|
108 | client = None # the remote connection | |||
|
109 | func = None # the wrapped function | |||
|
110 | block = None # whether to block | |||
|
111 | bound = None # whether to affect the namespace | |||
|
112 | targets = None # where to execute | |||
90 |
|
113 | |||
91 | def __init__(self, client, f, bound=False, block=None, targets=None): |
|
114 | def __init__(self, client, f, bound=False, block=None, targets=None): | |
92 | self.client = client |
|
115 | self.client = client | |
@@ -106,6 +129,7 b' class AbortedTask(object):' | |||||
106 | self.msg_id = msg_id |
|
129 | self.msg_id = msg_id | |
107 |
|
130 | |||
108 | class ControllerError(Exception): |
|
131 | class ControllerError(Exception): | |
|
132 | """Exception Class for errors in the controller (not the Engine).""" | |||
109 | def __init__(self, etype, evalue, tb): |
|
133 | def __init__(self, etype, evalue, tb): | |
110 | self.etype = etype |
|
134 | self.etype = etype | |
111 | self.evalue = evalue |
|
135 | self.evalue = evalue | |
@@ -795,7 +819,7 b' class Client(object):' | |||||
795 | """Pull objects from `target`'s namespace by `keys`""" |
|
819 | """Pull objects from `target`'s namespace by `keys`""" | |
796 | if isinstance(keys, str): |
|
820 | if isinstance(keys, str): | |
797 | pass |
|
821 | pass | |
798 | elif isistance(keys, (list,tuple,set)): |
|
822 | elif isinstance(keys, (list,tuple,set)): | |
799 | for key in keys: |
|
823 | for key in keys: | |
800 | if not isinstance(key, str): |
|
824 | if not isinstance(key, str): | |
801 | raise TypeError |
|
825 | raise TypeError |
@@ -495,29 +495,30 b' class Controller(object):' | |||||
495 | client_id = idents[0] |
|
495 | client_id = idents[0] | |
496 |
|
496 | |||
497 | try: |
|
497 | try: | |
498 |
msg = self.session.unpack_message(msg, content=False |
|
498 | msg = self.session.unpack_message(msg, content=False) | |
499 | except: |
|
499 | except: | |
500 | logger.error("task::client %r sent invalid task message: %s"%( |
|
500 | logger.error("task::client %r sent invalid task message: %s"%( | |
501 | client_id, msg), exc_info=True) |
|
501 | client_id, msg), exc_info=True) | |
502 | return |
|
502 | return | |
503 | rec = init_record(msg) |
|
503 | record = init_record(msg) | |
504 | if MongoDB is not None and isinstance(self.db, MongoDB): |
|
504 | if MongoDB is not None and isinstance(self.db, MongoDB): | |
505 | record['buffers'] = map(Binary, record['buffers']) |
|
505 | record['buffers'] = map(Binary, record['buffers']) | |
506 | rec['client_uuid'] = client_id |
|
506 | record['client_uuid'] = client_id | |
507 | rec['queue'] = 'task' |
|
507 | record['queue'] = 'task' | |
508 | header = msg['header'] |
|
508 | header = msg['header'] | |
509 | msg_id = header['msg_id'] |
|
509 | msg_id = header['msg_id'] | |
510 | self.pending.add(msg_id) |
|
510 | self.pending.add(msg_id) | |
511 | self.db.add_record(msg_id, rec) |
|
511 | self.db.add_record(msg_id, record) | |
512 |
|
512 | |||
513 | def save_task_result(self, idents, msg): |
|
513 | def save_task_result(self, idents, msg): | |
514 | """save the result of a completed task.""" |
|
514 | """save the result of a completed task.""" | |
515 | client_id = idents[0] |
|
515 | client_id = idents[0] | |
516 | try: |
|
516 | try: | |
517 |
msg = self.session.unpack_message(msg, content=False |
|
517 | msg = self.session.unpack_message(msg, content=False) | |
518 | except: |
|
518 | except: | |
519 | logger.error("task::invalid task result message send to %r: %s"%( |
|
519 | logger.error("task::invalid task result message send to %r: %s"%( | |
520 | client_id, msg)) |
|
520 | client_id, msg)) | |
|
521 | raise | |||
521 | return |
|
522 | return | |
522 |
|
523 | |||
523 | parent = msg['parent_header'] |
|
524 | parent = msg['parent_header'] |
@@ -65,7 +65,7 b' def wrap_exception():' | |||||
65 | tb = traceback.format_exception(etype, evalue, tb) |
|
65 | tb = traceback.format_exception(etype, evalue, tb) | |
66 | exc_content = { |
|
66 | exc_content = { | |
67 | 'status' : 'error', |
|
67 | 'status' : 'error', | |
68 |
'traceback' : |
|
68 | 'traceback' : [ line.encode('utf8') for line in tb ], | |
69 | 'etype' : etype.encode('utf8'), |
|
69 | 'etype' : etype.encode('utf8'), | |
70 | 'evalue' : evalue.encode('utf8') |
|
70 | 'evalue' : evalue.encode('utf8') | |
71 | } |
|
71 | } |
General Comments 0
You need to be logged in to leave comments.
Login now