iopubwatcher.py
77 lines
| 2.6 KiB
| text/x-python
|
PythonLexer
MinRK
|
"""A script for watching all traffic on the IOPub channel (stdout/stderr/pyerr) of engines.

This connects to the default cluster, or you can pass the path to your ipcontroller-client.json

Try running this script, and then running a few jobs that print (and call sys.stdout.flush),
and you will see the print statements as they arrive, notably not waiting for the results
to finish.

You can use the zeromq SUBSCRIBE mechanism to only receive information from specific engines,
and easily filter by message type.

Authors
-------
* MinRK
"""
import sys | ||||
import json | ||||
import zmq | ||||
Martin Spacek
|
r9455 | from IPython.kernel.zmq.session import Session | ||
MinRK
|
r5169 | from IPython.utils.py3compat import str_to_bytes | ||
from IPython.utils.path import get_security_file | ||||
def main(connection_file):
    """Watch the IOPub channel and print stream output and tracebacks.

    Parameters
    ----------
    connection_file : str
        Path to a JSON connection file. The ``interface``, ``iopub`` and
        ``key`` entries are read from it.

    Returns
    -------
    None
        Loops until a KeyboardInterrupt arrives while waiting for a message,
        then closes the SUB socket and returns.
    """
    ctx = zmq.Context.instance()

    with open(connection_file) as f:
        cfg = json.load(f)

    reg_url = cfg['interface']
    iopub_port = cfg['iopub']
    iopub_url = "%s:%s" % (reg_url, iopub_port)

    session = Session(key=str_to_bytes(cfg['key']))
    sub = ctx.socket(zmq.SUB)

    # This will subscribe to all messages:
    sub.setsockopt(zmq.SUBSCRIBE, b'')
    # replace b'' with b'engine.1.stdout' to subscribe only to engine 1's stdout
    # 0MQ subscriptions are simple 'foo*' matches, so 'engine.1.' subscribes
    # to everything from engine 1, but there is no way to subscribe to
    # just stdout from everyone.
    # multiple calls to subscribe will add subscriptions, e.g. to subscribe to
    # engine 1's stderr and engine 2's stdout:
    # sub.setsockopt(zmq.SUBSCRIBE, b'engine.1.stderr')
    # sub.setsockopt(zmq.SUBSCRIBE, b'engine.2.stdout')
    sub.connect(iopub_url)

    try:
        while True:
            try:
                idents, msg = session.recv(sub, mode=0)
            except KeyboardInterrupt:
                return
            # ident always length 1 here
            topic = idents[0]
            # The topic arrives as a raw bytes frame; on Python 3 it must be
            # decoded before it can be concatenated with / formatted as text.
            if not isinstance(topic, str):
                topic = topic.decode('utf8', 'replace')
            msg_type = msg['msg_type']
            if msg_type == 'stream':
                # stdout/stderr
                # stream names are in msg['content']['name'], if you want to handle
                # them differently
                print("%s: %s" % (topic, msg['content']['text']))
            elif msg_type in ('pyerr', 'error'):
                # Python traceback ('pyerr' was renamed to 'error' in
                # version 5 of the message spec; accept both names)
                c = msg['content']
                print(topic + ':')
                for line in c['traceback']:
                    # indent lines
                    print('    ' + line)
    finally:
        # Release the subscription socket however the loop is left.
        sub.close()
MinRK
|
r5169 | |||
if __name__ == '__main__':
    # An explicit path on the command line takes precedence; otherwise
    # look up the security file for the default profile.
    cf = (
        sys.argv[1]
        if len(sys.argv) > 1
        else get_security_file('ipcontroller-client.json')
    )
    main(cf)