##// END OF EJS Templates
Fix iopubwatcher example script.
Blake Griffith -
Show More
@@ -1,83 +1,77
1 1 """A script for watching all traffic on the IOPub channel (stdout/stderr/pyerr) of engines.
2 2
3 3 This connects to the default cluster, or you can pass the path to your ipcontroller-client.json
4 4
5 5 Try running this script, and then running a few jobs that print (and call sys.stdout.flush),
6 6 and you will see the print statements as they arrive, notably not waiting for the results
7 7 to finish.
8 8
9 9 You can use the zeromq SUBSCRIBE mechanism to only receive information from specific engines,
10 10 and easily filter by message type.
11 11
12 12 Authors
13 13 -------
14 14 * MinRK
15 15 """
16 16
17 import os
18 17 import sys
19 18 import json
20 19 import zmq
21 20
22 21 from IPython.kernel.zmq.session import Session
23 from IPython.parallel.util import disambiguate_url
24 22 from IPython.utils.py3compat import str_to_bytes
25 23 from IPython.utils.path import get_security_file
26 24
27 25 def main(connection_file):
28 26 """watch iopub channel, and print messages"""
29
27
30 28 ctx = zmq.Context.instance()
31
29
32 30 with open(connection_file) as f:
33 31 cfg = json.loads(f.read())
34
35 location = cfg['location']
36 reg_url = cfg['url']
37 session = Session(key=str_to_bytes(cfg['exec_key']))
38
39 query = ctx.socket(zmq.DEALER)
40 query.connect(disambiguate_url(cfg['url'], location))
41 session.send(query, "connection_request")
42 idents,msg = session.recv(query, mode=0)
43 c = msg['content']
44 iopub_url = disambiguate_url(c['iopub'], location)
32
33 reg_url = cfg['interface']
34 iopub_port = cfg['iopub']
35 iopub_url = "%s:%s"%(reg_url, iopub_port)
36
37 session = Session(key=str_to_bytes(cfg['key']))
45 38 sub = ctx.socket(zmq.SUB)
39
46 40 # This will subscribe to all messages:
47 41 sub.setsockopt(zmq.SUBSCRIBE, b'')
48 42 # replace with b'' with b'engine.1.stdout' to subscribe only to engine 1's stdout
49 43 # 0MQ subscriptions are simple 'foo*' matches, so 'engine.1.' subscribes
50 44 # to everything from engine 1, but there is no way to subscribe to
51 45 # just stdout from everyone.
52 46 # multiple calls to subscribe will add subscriptions, e.g. to subscribe to
53 47 # engine 1's stderr and engine 2's stdout:
54 48 # sub.setsockopt(zmq.SUBSCRIBE, b'engine.1.stderr')
55 49 # sub.setsockopt(zmq.SUBSCRIBE, b'engine.2.stdout')
56 50 sub.connect(iopub_url)
57 51 while True:
58 52 try:
59 53 idents,msg = session.recv(sub, mode=0)
60 54 except KeyboardInterrupt:
61 55 return
62 56 # ident always length 1 here
63 57 topic = idents[0]
64 58 if msg['msg_type'] == 'stream':
65 59 # stdout/stderr
66 60 # stream names are in msg['content']['name'], if you want to handle
67 61 # them differently
68 62 print("%s: %s" % (topic, msg['content']['data']))
69 63 elif msg['msg_type'] == 'pyerr':
70 64 # Python traceback
71 65 c = msg['content']
72 66 print(topic + ':')
73 67 for line in c['traceback']:
74 68 # indent lines
75 69 print(' ' + line)
76 70
77 71 if __name__ == '__main__':
78 72 if len(sys.argv) > 1:
79 73 cf = sys.argv[1]
80 74 else:
81 75 # This gets the security file for the default profile:
82 76 cf = get_security_file('ipcontroller-client.json')
83 77 main(cf)
General Comments 0
You need to be logged in to leave comments. Login now