##// END OF EJS Templates
Work in multiple places to improve state of the test suite....
Work in multiple places to improve state of the test suite. With these changes, on my system now all the test sub-suites pass except for the Twisted one (see https://bugs.launchpad.net/ipython/+bug/504515 for details on that one).

File last commit:

r2335:6ad607f9
r2398:f173ff8e
Show More
winhpcjob.py
318 lines | 11.3 KiB | text/x-python | PythonLexer
Brian Granger
Added components to manage jobs/tasks for Win HPC job scheduler.
r2326 #!/usr/bin/env python
# encoding: utf-8
"""
Job and task components for writing .xml files that the Windows HPC Server
2008 can use to start jobs.
"""
#-----------------------------------------------------------------------------
# Copyright (C) 2008-2009 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------
#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------
from __future__ import with_statement
import os
import re
bgranger
Initial version of Win HPC job scheduler support.
r2327 import uuid
Brian Granger
Added components to manage jobs/tasks for Win HPC job scheduler.
r2326
from xml.etree import ElementTree as ET
from xml.dom import minidom
from IPython.core.component import Component
from IPython.external import Itpl
from IPython.utils.traitlets import (
Str, Int, List, Unicode, Instance,
bgranger
Initial version of Win HPC job scheduler support.
r2327 Enum, Bool, CStr
Brian Granger
Added components to manage jobs/tasks for Win HPC job scheduler.
r2326 )
#-----------------------------------------------------------------------------
# Job and Task Component
#-----------------------------------------------------------------------------
def as_str(value):
    """Convert a trait value to the text used for an XML attribute.

    Booleans become the lowercase words ``'true'`` / ``'false'``; ints and
    floats are rendered with ``repr``.  Anything else -- including values
    that are already ``str`` -- is handed back unchanged, so the caller may
    receive a non-string (e.g. ``None``) for unsupported types.
    """
    # bool has to be checked ahead of the numeric branch because bool is a
    # subclass of int and would otherwise be rendered as 'True' / 'False'.
    if isinstance(value, bool):
        return 'true' if value else 'false'
    if isinstance(value, (int, float)):
        return repr(value)
    return value
def indent(elem, level=0):
    """Pretty-print an ElementTree element in place.

    Rewrites the whitespace-only ``text`` and ``tail`` of ``elem`` and all
    of its descendants so that the serialized XML puts each element on its
    own line, indented by nesting depth.  Text or tails that contain
    non-whitespace characters are left untouched.

    Parameters
    ----------
    elem : xml.etree.ElementTree.Element
        The element to re-indent; it is modified in place.
    level : int
        Nesting depth of ``elem`` (0 for the document root).
    """
    pad = "\n" + level*" "
    if not len(elem):
        # Leaf element: only its tail needs adjusting, and never for the
        # root itself (level 0).
        if level and not (elem.tail and elem.tail.strip()):
            elem.tail = pad
        return

    if not (elem.text and elem.text.strip()):
        elem.text = pad + " "
    if not (elem.tail and elem.tail.strip()):
        elem.tail = pad

    # Use a distinct loop variable instead of rebinding ``elem``, so it is
    # explicit that the final tail fix below applies to the last child.
    last_child = None
    for child in elem:
        indent(child, level+1)
        last_child = child

    # The last child's tail precedes the parent's closing tag, so it is
    # pulled back to the parent's indentation level.
    if not (last_child.tail and last_child.tail.strip()):
        last_child.tail = pad
Brian Granger
More work on the launchers and Win HPC support.
def find_username():
    r"""Return the current user's account name from the environment.

    Reads the ``USERDOMAIN`` and ``USERNAME`` environment variables.

    Returns
    -------
    str
        ``DOMAIN\username`` when a non-empty domain is available, otherwise
        the bare username (the empty string if ``USERNAME`` is unset).
    """
    username = os.environ.get('USERNAME', '')
    domain = os.environ.get('USERDOMAIN')
    # An empty USERDOMAIN is treated like a missing one; otherwise the
    # result would be a malformed name with a leading backslash.
    if not domain:
        return username
    return '%s\\%s' % (domain, username)
Brian Granger
Added components to manage jobs/tasks for Win HPC job scheduler.
class WinHPCJob(Component):
    """A job description that can be serialized to a Win HPC job XML file.

    The traits below become attributes of the ``<Job>`` element; the tasks
    added with :meth:`add_task` become children of its ``<Tasks>`` element.
    """

    # NOTE(review): ``config=True`` presumably marks a trait as settable
    # from the configuration system -- confirm against the traitlets
    # implementation in use.
    job_id = Str('')
    job_name = Str('MyJob', config=True)
    min_cores = Int(1, config=True)
    max_cores = Int(1, config=True)
    min_sockets = Int(1, config=True)
    max_sockets = Int(1, config=True)
    min_nodes = Int(1, config=True)
    max_nodes = Int(1, config=True)
    unit_type = Str("Core", config=True)
    auto_calculate_min = Bool(True, config=True)
    auto_calculate_max = Bool(True, config=True)
    run_until_canceled = Bool(False, config=True)
    is_exclusive = Bool(False, config=True)
    # The default is computed once, when the class body is executed.
    username = Str(find_username(), config=True)
    job_type = Str('Batch', config=True)
    priority = Enum(('Lowest','BelowNormal','Normal','AboveNormal','Highest'),
        default_value='Highest', config=True)
    requested_nodes = Str('', config=True)
    project = Str('IPython', config=True)
    xmlns = Str('http://schemas.microsoft.com/HPCS2008/scheduler/')
    version = Str("2.000")
    tasks = List([])

    @property
    def owner(self):
        """The job owner; always the same value as ``username``."""
        return self.username

    def _write_attr(self, root, attr, key):
        """Set XML attribute ``key`` on ``root`` from ``self.<attr>``.

        The value is converted with ``as_str``; falsy results (such as an
        empty string or ``None``) are skipped so no attribute is written.
        """
        s = as_str(getattr(self, attr, ''))
        if s:
            root.set(key, s)

    def as_element(self):
        """Build and return the ``<Job>`` element for this job.

        Attribute names carry a temporary ``_A_``, ``_B_``, ... prefix so
        that they come out in the order the MSFT XML parser expects;
        :meth:`tostring` strips these prefixes again.
        NOTE(review): this presumably relies on ElementTree emitting
        attributes sorted by name -- confirm for the Python version in use.
        """
        ordered_attrs = (
            ('version', '_A_Version'),
            ('job_name', '_B_Name'),
            ('unit_type', '_C_UnitType'),
            ('min_cores', '_D_MinCores'),
            ('max_cores', '_E_MaxCores'),
            ('min_sockets', '_F_MinSockets'),
            ('max_sockets', '_G_MaxSockets'),
            ('min_nodes', '_H_MinNodes'),
            ('max_nodes', '_I_MaxNodes'),
            ('run_until_canceled', '_J_RunUntilCanceled'),
            ('is_exclusive', '_K_IsExclusive'),
            ('username', '_L_UserName'),
            ('job_type', '_M_JobType'),
            ('priority', '_N_Priority'),
            ('requested_nodes', '_O_RequestedNodes'),
            ('auto_calculate_max', '_P_AutoCalculateMax'),
            ('auto_calculate_min', '_Q_AutoCalculateMin'),
            ('project', '_R_Project'),
            ('owner', '_S_Owner'),
            ('xmlns', '_T_xmlns'),
        )
        root = ET.Element('Job')
        for attr, key in ordered_attrs:
            self._write_attr(root, attr, key)

        # An empty <Dependencies> child is always emitted.
        ET.SubElement(root, "Dependencies")
        etasks = ET.SubElement(root, "Tasks")
        for t in self.tasks:
            etasks.append(t.as_element())
        return root

    def tostring(self):
        """Return the string representation of the job description XML."""
        root = self.as_element()
        indent(root)
        txt = ET.tostring(root, encoding="utf-8")
        # Now remove the tokens used to order the attributes.  The pattern
        # is anchored to attribute-name position (whitespace before, a name
        # followed by =" after) so that a "_X_" sequence inside an
        # attribute value -- a job name, path or command line -- is not
        # stripped as well.
        txt = re.sub(r'(?<=\s)_[A-Z]_(?=[A-Za-z]+=")', '', txt)
        txt = '<?xml version="1.0" encoding="utf-8"?>\n' + txt
        return txt

    def write(self, filename):
        """Write the XML job description to a file."""
        txt = self.tostring()
        with open(filename, 'w') as f:
            f.write(txt)

    def add_task(self, task):
        """Add a task to the job.

        Parameters
        ----------
        task : :class:`WinHPCTask`
            The task object to add.
        """
        self.tasks.append(task)
class WinHPCTask(Component):
    """A single task of a Win HPC job, serializable to a ``<Task>`` element."""

    task_id = Str('')
    task_name = Str('')
    version = Str("2.000")
    min_cores = Int(1, config=True)
    max_cores = Int(1, config=True)
    min_sockets = Int(1, config=True)
    max_sockets = Int(1, config=True)
    min_nodes = Int(1, config=True)
    max_nodes = Int(1, config=True)
    unit_type = Str("Core", config=True)
    command_line = CStr('', config=True)
    work_directory = CStr('', config=True)
    # NOTE: the trait name is misspelled ("rerunnaable"), but it is part of
    # the class's public interface; the XML attribute written for it is
    # spelled "IsRerunnable".
    is_rerunnaable = Bool(True, config=True)
    std_out_file_path = CStr('', config=True)
    std_err_file_path = CStr('', config=True)
    is_parametric = Bool(False, config=True)
    environment_variables = Instance(dict, args=(), config=True)

    def _write_attr(self, root, attr, key):
        """Copy ``self.<attr>`` onto ``root`` as XML attribute ``key``.

        Nothing is written when the ``as_str`` conversion is falsy.
        """
        text = as_str(getattr(self, attr, ''))
        if text:
            root.set(key, text)

    def as_element(self):
        """Build and return the ``<Task>`` element for this task.

        The ``_A_``, ``_B_``, ... prefixes on the attribute names are
        ordering tokens, as in the job element.
        """
        task_elem = ET.Element('Task')
        attr_table = (
            ('version', '_A_Version'),
            ('task_name', '_B_Name'),
            ('min_cores', '_C_MinCores'),
            ('max_cores', '_D_MaxCores'),
            ('min_sockets', '_E_MinSockets'),
            ('max_sockets', '_F_MaxSockets'),
            ('min_nodes', '_G_MinNodes'),
            ('max_nodes', '_H_MaxNodes'),
            ('command_line', '_I_CommandLine'),
            ('work_directory', '_J_WorkDirectory'),
            ('is_rerunnaable', '_K_IsRerunnable'),
            ('std_out_file_path', '_L_StdOutFilePath'),
            ('std_err_file_path', '_M_StdErrFilePath'),
            ('is_parametric', '_N_IsParametric'),
            ('unit_type', '_O_UnitType'),
        )
        for trait_name, xml_key in attr_table:
            self._write_attr(task_elem, trait_name, xml_key)
        task_elem.append(self.get_env_vars())
        return task_elem

    def get_env_vars(self):
        """Return an ``<EnvironmentVariables>`` element.

        It holds one ``<Variable>`` child, with ``<Name>`` and ``<Value>``
        sub-elements, per entry of ``environment_variables``.
        """
        container = ET.Element('EnvironmentVariables')
        for var_name, var_value in self.environment_variables.items():
            entry = ET.SubElement(container, "Variable")
            ET.SubElement(entry, "Name").text = var_name
            ET.SubElement(entry, "Value").text = var_value
        return container
bgranger
Initial version of Win HPC job scheduler support.
r2327
# By declaring these, we can configure the controller and engine separately!
Brian Granger
More work on the launchers and Win HPC support.
r2333
class IPControllerJob(WinHPCJob):
    """Job description used for the IPython controller.

    The traits are re-declared on this subclass (rather than inherited) so
    that they belong to this class in its own right.
    """

    job_name = Str('IPController', config=False)
    is_exclusive = Bool(False, config=True)
    username = Str(find_username(), config=True)
    priority = Enum(
        ('Lowest','BelowNormal','Normal','AboveNormal','Highest'),
        default_value='Highest',
        config=True,
    )
    requested_nodes = Str('', config=True)
    project = Str('IPython', config=True)
class IPEngineSetJob(WinHPCJob):
    """Job description used for a set of IPython engines.

    The traits are re-declared on this subclass (rather than inherited) so
    that they belong to this class in its own right.
    """

    job_name = Str('IPEngineSet', config=False)
    is_exclusive = Bool(False, config=True)
    username = Str(find_username(), config=True)
    priority = Enum(
        ('Lowest','BelowNormal','Normal','AboveNormal','Highest'),
        default_value='Highest',
        config=True,
    )
    requested_nodes = Str('', config=True)
    project = Str('IPython', config=True)
bgranger
Initial version of Win HPC job scheduler support.
class IPControllerTask(WinHPCTask):
    """Task that starts the IPython controller.

    The command line is assembled from ``controller_cmd`` and
    ``controller_args``; the stdout/stderr file paths are generated per
    instance in ``__init__``.
    """

    task_name = Str('IPController', config=True)
    controller_cmd = List(['ipcontroller.exe'], config=True)
    controller_args = List(['--log-to-file', '--log-level', '40'], config=True)

    # I don't want these to be configurable
    std_out_file_path = CStr('', config=False)
    std_err_file_path = CStr('', config=False)
    min_cores = Int(1, config=False)
    max_cores = Int(1, config=False)
    min_sockets = Int(1, config=False)
    max_sockets = Int(1, config=False)
    min_nodes = Int(1, config=False)
    max_nodes = Int(1, config=False)
    unit_type = Str("Core", config=False)
    work_directory = CStr('', config=False)

    def __init__(self, parent, name=None, config=None):
        """Initialize the task and give it uuid-tagged log file paths."""
        super(IPControllerTask, self).__init__(parent, name, config)
        # One uuid is shared by both files so the .out/.err pair matches.
        tag = uuid.uuid1()
        out_name = 'ipcontroller-%s.out' % tag
        err_name = 'ipcontroller-%s.err' % tag
        self.std_out_file_path = os.path.join('log', out_name)
        self.std_err_file_path = os.path.join('log', err_name)

    @property
    def command_line(self):
        """The controller command followed by its arguments, space-joined."""
        words = self.controller_cmd + self.controller_args
        return ' '.join(words)
class IPEngineTask(WinHPCTask):
    """Task that starts an IPython engine.

    The command line is assembled from ``engine_cmd`` and ``engine_args``;
    the stdout/stderr file paths are generated per instance in
    ``__init__``.
    """

    task_name = Str('IPEngine', config=True)
    engine_cmd = List(['ipengine.exe'], config=True)
    engine_args = List(['--log-to-file', '--log-level', '40'], config=True)

    # I don't want these to be configurable
    std_out_file_path = CStr('', config=False)
    std_err_file_path = CStr('', config=False)
    min_cores = Int(1, config=False)
    max_cores = Int(1, config=False)
    min_sockets = Int(1, config=False)
    max_sockets = Int(1, config=False)
    min_nodes = Int(1, config=False)
    max_nodes = Int(1, config=False)
    unit_type = Str("Core", config=False)
    work_directory = CStr('', config=False)

    def __init__(self, parent, name=None, config=None):
        """Initialize the task and give it uuid-tagged log file paths."""
        super(IPEngineTask,self).__init__(parent, name, config)
        # One uuid is shared by both files so the .out/.err pair matches.
        tag = uuid.uuid1()
        out_name = 'ipengine-%s.out' % tag
        err_name = 'ipengine-%s.err' % tag
        self.std_out_file_path = os.path.join('log', out_name)
        self.std_err_file_path = os.path.join('log', err_name)

    @property
    def command_line(self):
        """The engine command followed by its arguments, space-joined."""
        words = self.engine_cmd + self.engine_args
        return ' '.join(words)
Brian Granger
Added components to manage jobs/tasks for Win HPC job scheduler.
r2326 # j = WinHPCJob(None)
# j.job_name = 'IPCluster'
# j.username = 'GNET\\bgranger'
# j.requested_nodes = 'GREEN'
#
# t = WinHPCTask(None)
# t.task_name = 'Controller'
# t.command_line = r"\\blue\domainusers$\bgranger\Python\Python25\Scripts\ipcontroller.exe --log-to-file -p default --log-level 10"
# t.work_directory = r"\\blue\domainusers$\bgranger\.ipython\cluster_default"
# t.std_out_file_path = 'controller-out.txt'
# t.std_err_file_path = 'controller-err.txt'
# t.environment_variables['PYTHONPATH'] = r"\\blue\domainusers$\bgranger\Python\Python25\Lib\site-packages"
# j.add_task(t)