##// END OF EJS Templates
Merge pull request #6646 from ngoldbaum/iopubfix...
Min RK -
r18181:9fa3ea2b merge
parent child Browse files
Show More
@@ -1,77 +1,77
1 """A script for watching all traffic on the IOPub channel (stdout/stderr/pyerr) of engines.
1 """A script for watching all traffic on the IOPub channel (stdout/stderr/pyerr) of engines.
2
2
3 This connects to the default cluster, or you can pass the path to your ipcontroller-client.json
3 This connects to the default cluster, or you can pass the path to your ipcontroller-client.json
4
4
5 Try running this script, and then running a few jobs that print (and call sys.stdout.flush),
5 Try running this script, and then running a few jobs that print (and call sys.stdout.flush),
6 and you will see the print statements as they arrive, notably not waiting for the results
6 and you will see the print statements as they arrive, notably not waiting for the results
7 to finish.
7 to finish.
8
8
9 You can use the zeromq SUBSCRIBE mechanism to only receive information from specific engines,
9 You can use the zeromq SUBSCRIBE mechanism to only receive information from specific engines,
10 and easily filter by message type.
10 and easily filter by message type.
11
11
12 Authors
12 Authors
13 -------
13 -------
14 * MinRK
14 * MinRK
15 """
15 """
16
16
17 import sys
17 import sys
18 import json
18 import json
19 import zmq
19 import zmq
20
20
21 from IPython.kernel.zmq.session import Session
21 from IPython.kernel.zmq.session import Session
22 from IPython.utils.py3compat import str_to_bytes
22 from IPython.utils.py3compat import str_to_bytes
23 from IPython.utils.path import get_security_file
23 from IPython.utils.path import get_security_file
24
24
25 def main(connection_file):
25 def main(connection_file):
26 """watch iopub channel, and print messages"""
26 """watch iopub channel, and print messages"""
27
27
28 ctx = zmq.Context.instance()
28 ctx = zmq.Context.instance()
29
29
30 with open(connection_file) as f:
30 with open(connection_file) as f:
31 cfg = json.loads(f.read())
31 cfg = json.loads(f.read())
32
32
33 reg_url = cfg['interface']
33 reg_url = cfg['interface']
34 iopub_port = cfg['iopub']
34 iopub_port = cfg['iopub']
35 iopub_url = "%s:%s"%(reg_url, iopub_port)
35 iopub_url = "%s:%s"%(reg_url, iopub_port)
36
36
37 session = Session(key=str_to_bytes(cfg['key']))
37 session = Session(key=str_to_bytes(cfg['key']))
38 sub = ctx.socket(zmq.SUB)
38 sub = ctx.socket(zmq.SUB)
39
39
40 # This will subscribe to all messages:
40 # This will subscribe to all messages:
41 sub.setsockopt(zmq.SUBSCRIBE, b'')
41 sub.setsockopt(zmq.SUBSCRIBE, b'')
42 # replace with b'' with b'engine.1.stdout' to subscribe only to engine 1's stdout
42 # replace with b'' with b'engine.1.stdout' to subscribe only to engine 1's stdout
43 # 0MQ subscriptions are simple 'foo*' matches, so 'engine.1.' subscribes
43 # 0MQ subscriptions are simple 'foo*' matches, so 'engine.1.' subscribes
44 # to everything from engine 1, but there is no way to subscribe to
44 # to everything from engine 1, but there is no way to subscribe to
45 # just stdout from everyone.
45 # just stdout from everyone.
46 # multiple calls to subscribe will add subscriptions, e.g. to subscribe to
46 # multiple calls to subscribe will add subscriptions, e.g. to subscribe to
47 # engine 1's stderr and engine 2's stdout:
47 # engine 1's stderr and engine 2's stdout:
48 # sub.setsockopt(zmq.SUBSCRIBE, b'engine.1.stderr')
48 # sub.setsockopt(zmq.SUBSCRIBE, b'engine.1.stderr')
49 # sub.setsockopt(zmq.SUBSCRIBE, b'engine.2.stdout')
49 # sub.setsockopt(zmq.SUBSCRIBE, b'engine.2.stdout')
50 sub.connect(iopub_url)
50 sub.connect(iopub_url)
51 while True:
51 while True:
52 try:
52 try:
53 idents,msg = session.recv(sub, mode=0)
53 idents,msg = session.recv(sub, mode=0)
54 except KeyboardInterrupt:
54 except KeyboardInterrupt:
55 return
55 return
56 # ident always length 1 here
56 # ident always length 1 here
57 topic = idents[0]
57 topic = idents[0]
58 if msg['msg_type'] == 'stream':
58 if msg['msg_type'] == 'stream':
59 # stdout/stderr
59 # stdout/stderr
60 # stream names are in msg['content']['name'], if you want to handle
60 # stream names are in msg['content']['name'], if you want to handle
61 # them differently
61 # them differently
62 print("%s: %s" % (topic, msg['content']['data']))
62 print("%s: %s" % (topic, msg['content']['text']))
63 elif msg['msg_type'] == 'pyerr':
63 elif msg['msg_type'] == 'pyerr':
64 # Python traceback
64 # Python traceback
65 c = msg['content']
65 c = msg['content']
66 print(topic + ':')
66 print(topic + ':')
67 for line in c['traceback']:
67 for line in c['traceback']:
68 # indent lines
68 # indent lines
69 print(' ' + line)
69 print(' ' + line)
70
70
71 if __name__ == '__main__':
71 if __name__ == '__main__':
72 if len(sys.argv) > 1:
72 if len(sys.argv) > 1:
73 cf = sys.argv[1]
73 cf = sys.argv[1]
74 else:
74 else:
75 # This gets the security file for the default profile:
75 # This gets the security file for the default profile:
76 cf = get_security_file('ipcontroller-client.json')
76 cf = get_security_file('ipcontroller-client.json')
77 main(cf)
77 main(cf)
General Comments 0
You need to be logged in to leave comments. Login now