##// END OF EJS Templates
Fix iopubwatcher example script.
Blake Griffith -
Show More
@@ -1,83 +1,77 b''
1 """A script for watching all traffic on the IOPub channel (stdout/stderr/pyerr) of engines.
1 """A script for watching all traffic on the IOPub channel (stdout/stderr/pyerr) of engines.
2
2
3 This connects to the default cluster, or you can pass the path to your ipcontroller-client.json
3 This connects to the default cluster, or you can pass the path to your ipcontroller-client.json
4
4
5 Try running this script, and then running a few jobs that print (and call sys.stdout.flush),
5 Try running this script, and then running a few jobs that print (and call sys.stdout.flush),
6 and you will see the print statements as they arrive, notably not waiting for the results
6 and you will see the print statements as they arrive, notably not waiting for the results
7 to finish.
7 to finish.
8
8
9 You can use the zeromq SUBSCRIBE mechanism to only receive information from specific engines,
9 You can use the zeromq SUBSCRIBE mechanism to only receive information from specific engines,
10 and easily filter by message type.
10 and easily filter by message type.
11
11
12 Authors
12 Authors
13 -------
13 -------
14 * MinRK
14 * MinRK
15 """
15 """
16
16
17 import os
18 import sys
17 import sys
19 import json
18 import json
20 import zmq
19 import zmq
21
20
22 from IPython.kernel.zmq.session import Session
21 from IPython.kernel.zmq.session import Session
23 from IPython.parallel.util import disambiguate_url
24 from IPython.utils.py3compat import str_to_bytes
22 from IPython.utils.py3compat import str_to_bytes
25 from IPython.utils.path import get_security_file
23 from IPython.utils.path import get_security_file
26
24
27 def main(connection_file):
25 def main(connection_file):
28 """watch iopub channel, and print messages"""
26 """watch iopub channel, and print messages"""
29
27
30 ctx = zmq.Context.instance()
28 ctx = zmq.Context.instance()
31
29
32 with open(connection_file) as f:
30 with open(connection_file) as f:
33 cfg = json.loads(f.read())
31 cfg = json.loads(f.read())
34
32
35 location = cfg['location']
33 reg_url = cfg['interface']
36 reg_url = cfg['url']
34 iopub_port = cfg['iopub']
37 session = Session(key=str_to_bytes(cfg['exec_key']))
35 iopub_url = "%s:%s"%(reg_url, iopub_port)
38
36
39 query = ctx.socket(zmq.DEALER)
37 session = Session(key=str_to_bytes(cfg['key']))
40 query.connect(disambiguate_url(cfg['url'], location))
41 session.send(query, "connection_request")
42 idents,msg = session.recv(query, mode=0)
43 c = msg['content']
44 iopub_url = disambiguate_url(c['iopub'], location)
45 sub = ctx.socket(zmq.SUB)
38 sub = ctx.socket(zmq.SUB)
39
46 # This will subscribe to all messages:
40 # This will subscribe to all messages:
47 sub.setsockopt(zmq.SUBSCRIBE, b'')
41 sub.setsockopt(zmq.SUBSCRIBE, b'')
48 # replace with b'' with b'engine.1.stdout' to subscribe only to engine 1's stdout
42 # replace with b'' with b'engine.1.stdout' to subscribe only to engine 1's stdout
49 # 0MQ subscriptions are simple 'foo*' matches, so 'engine.1.' subscribes
43 # 0MQ subscriptions are simple 'foo*' matches, so 'engine.1.' subscribes
50 # to everything from engine 1, but there is no way to subscribe to
44 # to everything from engine 1, but there is no way to subscribe to
51 # just stdout from everyone.
45 # just stdout from everyone.
52 # multiple calls to subscribe will add subscriptions, e.g. to subscribe to
46 # multiple calls to subscribe will add subscriptions, e.g. to subscribe to
53 # engine 1's stderr and engine 2's stdout:
47 # engine 1's stderr and engine 2's stdout:
54 # sub.setsockopt(zmq.SUBSCRIBE, b'engine.1.stderr')
48 # sub.setsockopt(zmq.SUBSCRIBE, b'engine.1.stderr')
55 # sub.setsockopt(zmq.SUBSCRIBE, b'engine.2.stdout')
49 # sub.setsockopt(zmq.SUBSCRIBE, b'engine.2.stdout')
56 sub.connect(iopub_url)
50 sub.connect(iopub_url)
57 while True:
51 while True:
58 try:
52 try:
59 idents,msg = session.recv(sub, mode=0)
53 idents,msg = session.recv(sub, mode=0)
60 except KeyboardInterrupt:
54 except KeyboardInterrupt:
61 return
55 return
62 # ident always length 1 here
56 # ident always length 1 here
63 topic = idents[0]
57 topic = idents[0]
64 if msg['msg_type'] == 'stream':
58 if msg['msg_type'] == 'stream':
65 # stdout/stderr
59 # stdout/stderr
66 # stream names are in msg['content']['name'], if you want to handle
60 # stream names are in msg['content']['name'], if you want to handle
67 # them differently
61 # them differently
68 print("%s: %s" % (topic, msg['content']['data']))
62 print("%s: %s" % (topic, msg['content']['data']))
69 elif msg['msg_type'] == 'pyerr':
63 elif msg['msg_type'] == 'pyerr':
70 # Python traceback
64 # Python traceback
71 c = msg['content']
65 c = msg['content']
72 print(topic + ':')
66 print(topic + ':')
73 for line in c['traceback']:
67 for line in c['traceback']:
74 # indent lines
68 # indent lines
75 print(' ' + line)
69 print(' ' + line)
76
70
77 if __name__ == '__main__':
71 if __name__ == '__main__':
78 if len(sys.argv) > 1:
72 if len(sys.argv) > 1:
79 cf = sys.argv[1]
73 cf = sys.argv[1]
80 else:
74 else:
81 # This gets the security file for the default profile:
75 # This gets the security file for the default profile:
82 cf = get_security_file('ipcontroller-client.json')
76 cf = get_security_file('ipcontroller-client.json')
83 main(cf)
77 main(cf)
General Comments 0
You need to be logged in to leave comments. Login now