|
|
#!/usr/bin/env python
|
|
|
# encoding: utf-8
|
|
|
"""
|
|
|
Job and task components for writing .xml files that the Windows HPC Server
|
|
|
2008 can use to start jobs.
|
|
|
"""
|
|
|
|
|
|
#-----------------------------------------------------------------------------
|
|
|
# Copyright (C) 2008-2009 The IPython Development Team
|
|
|
#
|
|
|
# Distributed under the terms of the BSD License. The full license is in
|
|
|
# the file COPYING, distributed as part of this software.
|
|
|
#-----------------------------------------------------------------------------
|
|
|
|
|
|
#-----------------------------------------------------------------------------
|
|
|
# Imports
|
|
|
#-----------------------------------------------------------------------------
|
|
|
|
|
|
from __future__ import with_statement
|
|
|
|
|
|
import os
|
|
|
import re
|
|
|
import uuid
|
|
|
|
|
|
from xml.etree import ElementTree as ET
|
|
|
|
|
|
from IPython.core.component import Component
|
|
|
from IPython.utils.traitlets import (
|
|
|
Str, Int, List, Instance,
|
|
|
Enum, Bool, CStr
|
|
|
)
|
|
|
|
|
|
#-----------------------------------------------------------------------------
|
|
|
# Job and Task Component
|
|
|
#-----------------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
def as_str(value):
|
|
|
if isinstance(value, str):
|
|
|
return value
|
|
|
elif isinstance(value, bool):
|
|
|
if value:
|
|
|
return 'true'
|
|
|
else:
|
|
|
return 'false'
|
|
|
elif isinstance(value, (int, float)):
|
|
|
return repr(value)
|
|
|
else:
|
|
|
return value
|
|
|
|
|
|
|
|
|
def indent(elem, level=0):
|
|
|
i = "\n" + level*" "
|
|
|
if len(elem):
|
|
|
if not elem.text or not elem.text.strip():
|
|
|
elem.text = i + " "
|
|
|
if not elem.tail or not elem.tail.strip():
|
|
|
elem.tail = i
|
|
|
for elem in elem:
|
|
|
indent(elem, level+1)
|
|
|
if not elem.tail or not elem.tail.strip():
|
|
|
elem.tail = i
|
|
|
else:
|
|
|
if level and (not elem.tail or not elem.tail.strip()):
|
|
|
elem.tail = i
|
|
|
|
|
|
|
|
|
def find_username():
|
|
|
domain = os.environ.get('USERDOMAIN')
|
|
|
username = os.environ.get('USERNAME','')
|
|
|
if domain is None:
|
|
|
return username
|
|
|
else:
|
|
|
return '%s\\%s' % (domain, username)
|
|
|
|
|
|
|
|
|
class WinHPCJob(Component):
|
|
|
|
|
|
job_id = Str('')
|
|
|
job_name = Str('MyJob', config=True)
|
|
|
min_cores = Int(1, config=True)
|
|
|
max_cores = Int(1, config=True)
|
|
|
min_sockets = Int(1, config=True)
|
|
|
max_sockets = Int(1, config=True)
|
|
|
min_nodes = Int(1, config=True)
|
|
|
max_nodes = Int(1, config=True)
|
|
|
unit_type = Str("Core", config=True)
|
|
|
auto_calculate_min = Bool(True, config=True)
|
|
|
auto_calculate_max = Bool(True, config=True)
|
|
|
run_until_canceled = Bool(False, config=True)
|
|
|
is_exclusive = Bool(False, config=True)
|
|
|
username = Str(find_username(), config=True)
|
|
|
job_type = Str('Batch', config=True)
|
|
|
priority = Enum(('Lowest','BelowNormal','Normal','AboveNormal','Highest'),
|
|
|
default_value='Highest', config=True)
|
|
|
requested_nodes = Str('', config=True)
|
|
|
project = Str('IPython', config=True)
|
|
|
xmlns = Str('http://schemas.microsoft.com/HPCS2008/scheduler/')
|
|
|
version = Str("2.000")
|
|
|
tasks = List([])
|
|
|
|
|
|
@property
|
|
|
def owner(self):
|
|
|
return self.username
|
|
|
|
|
|
def _write_attr(self, root, attr, key):
|
|
|
s = as_str(getattr(self, attr, ''))
|
|
|
if s:
|
|
|
root.set(key, s)
|
|
|
|
|
|
def as_element(self):
|
|
|
# We have to add _A_ type things to get the right order than
|
|
|
# the MSFT XML parser expects.
|
|
|
root = ET.Element('Job')
|
|
|
self._write_attr(root, 'version', '_A_Version')
|
|
|
self._write_attr(root, 'job_name', '_B_Name')
|
|
|
self._write_attr(root, 'unit_type', '_C_UnitType')
|
|
|
self._write_attr(root, 'min_cores', '_D_MinCores')
|
|
|
self._write_attr(root, 'max_cores', '_E_MaxCores')
|
|
|
self._write_attr(root, 'min_sockets', '_F_MinSockets')
|
|
|
self._write_attr(root, 'max_sockets', '_G_MaxSockets')
|
|
|
self._write_attr(root, 'min_nodes', '_H_MinNodes')
|
|
|
self._write_attr(root, 'max_nodes', '_I_MaxNodes')
|
|
|
self._write_attr(root, 'run_until_canceled', '_J_RunUntilCanceled')
|
|
|
self._write_attr(root, 'is_exclusive', '_K_IsExclusive')
|
|
|
self._write_attr(root, 'username', '_L_UserName')
|
|
|
self._write_attr(root, 'job_type', '_M_JobType')
|
|
|
self._write_attr(root, 'priority', '_N_Priority')
|
|
|
self._write_attr(root, 'requested_nodes', '_O_RequestedNodes')
|
|
|
self._write_attr(root, 'auto_calculate_max', '_P_AutoCalculateMax')
|
|
|
self._write_attr(root, 'auto_calculate_min', '_Q_AutoCalculateMin')
|
|
|
self._write_attr(root, 'project', '_R_Project')
|
|
|
self._write_attr(root, 'owner', '_S_Owner')
|
|
|
self._write_attr(root, 'xmlns', '_T_xmlns')
|
|
|
dependencies = ET.SubElement(root, "Dependencies")
|
|
|
etasks = ET.SubElement(root, "Tasks")
|
|
|
for t in self.tasks:
|
|
|
etasks.append(t.as_element())
|
|
|
return root
|
|
|
|
|
|
def tostring(self):
|
|
|
"""Return the string representation of the job description XML."""
|
|
|
root = self.as_element()
|
|
|
indent(root)
|
|
|
txt = ET.tostring(root, encoding="utf-8")
|
|
|
# Now remove the tokens used to order the attributes.
|
|
|
txt = re.sub(r'_[A-Z]_','',txt)
|
|
|
txt = '<?xml version="1.0" encoding="utf-8"?>\n' + txt
|
|
|
return txt
|
|
|
|
|
|
def write(self, filename):
|
|
|
"""Write the XML job description to a file."""
|
|
|
txt = self.tostring()
|
|
|
with open(filename, 'w') as f:
|
|
|
f.write(txt)
|
|
|
|
|
|
def add_task(self, task):
|
|
|
"""Add a task to the job.
|
|
|
|
|
|
Parameters
|
|
|
----------
|
|
|
task : :class:`WinHPCTask`
|
|
|
The task object to add.
|
|
|
"""
|
|
|
self.tasks.append(task)
|
|
|
|
|
|
|
|
|
class WinHPCTask(Component):
|
|
|
|
|
|
task_id = Str('')
|
|
|
task_name = Str('')
|
|
|
version = Str("2.000")
|
|
|
min_cores = Int(1, config=True)
|
|
|
max_cores = Int(1, config=True)
|
|
|
min_sockets = Int(1, config=True)
|
|
|
max_sockets = Int(1, config=True)
|
|
|
min_nodes = Int(1, config=True)
|
|
|
max_nodes = Int(1, config=True)
|
|
|
unit_type = Str("Core", config=True)
|
|
|
command_line = CStr('', config=True)
|
|
|
work_directory = CStr('', config=True)
|
|
|
is_rerunnaable = Bool(True, config=True)
|
|
|
std_out_file_path = CStr('', config=True)
|
|
|
std_err_file_path = CStr('', config=True)
|
|
|
is_parametric = Bool(False, config=True)
|
|
|
environment_variables = Instance(dict, args=(), config=True)
|
|
|
|
|
|
def _write_attr(self, root, attr, key):
|
|
|
s = as_str(getattr(self, attr, ''))
|
|
|
if s:
|
|
|
root.set(key, s)
|
|
|
|
|
|
def as_element(self):
|
|
|
root = ET.Element('Task')
|
|
|
self._write_attr(root, 'version', '_A_Version')
|
|
|
self._write_attr(root, 'task_name', '_B_Name')
|
|
|
self._write_attr(root, 'min_cores', '_C_MinCores')
|
|
|
self._write_attr(root, 'max_cores', '_D_MaxCores')
|
|
|
self._write_attr(root, 'min_sockets', '_E_MinSockets')
|
|
|
self._write_attr(root, 'max_sockets', '_F_MaxSockets')
|
|
|
self._write_attr(root, 'min_nodes', '_G_MinNodes')
|
|
|
self._write_attr(root, 'max_nodes', '_H_MaxNodes')
|
|
|
self._write_attr(root, 'command_line', '_I_CommandLine')
|
|
|
self._write_attr(root, 'work_directory', '_J_WorkDirectory')
|
|
|
self._write_attr(root, 'is_rerunnaable', '_K_IsRerunnable')
|
|
|
self._write_attr(root, 'std_out_file_path', '_L_StdOutFilePath')
|
|
|
self._write_attr(root, 'std_err_file_path', '_M_StdErrFilePath')
|
|
|
self._write_attr(root, 'is_parametric', '_N_IsParametric')
|
|
|
self._write_attr(root, 'unit_type', '_O_UnitType')
|
|
|
root.append(self.get_env_vars())
|
|
|
return root
|
|
|
|
|
|
def get_env_vars(self):
|
|
|
env_vars = ET.Element('EnvironmentVariables')
|
|
|
for k, v in self.environment_variables.items():
|
|
|
variable = ET.SubElement(env_vars, "Variable")
|
|
|
name = ET.SubElement(variable, "Name")
|
|
|
name.text = k
|
|
|
value = ET.SubElement(variable, "Value")
|
|
|
value.text = v
|
|
|
return env_vars
|
|
|
|
|
|
|
|
|
|
|
|
# By declaring these, we can configure the controller and engine separately!
|
|
|
|
|
|
class IPControllerJob(WinHPCJob):
|
|
|
job_name = Str('IPController', config=False)
|
|
|
is_exclusive = Bool(False, config=True)
|
|
|
username = Str(find_username(), config=True)
|
|
|
priority = Enum(('Lowest','BelowNormal','Normal','AboveNormal','Highest'),
|
|
|
default_value='Highest', config=True)
|
|
|
requested_nodes = Str('', config=True)
|
|
|
project = Str('IPython', config=True)
|
|
|
|
|
|
|
|
|
class IPEngineSetJob(WinHPCJob):
|
|
|
job_name = Str('IPEngineSet', config=False)
|
|
|
is_exclusive = Bool(False, config=True)
|
|
|
username = Str(find_username(), config=True)
|
|
|
priority = Enum(('Lowest','BelowNormal','Normal','AboveNormal','Highest'),
|
|
|
default_value='Highest', config=True)
|
|
|
requested_nodes = Str('', config=True)
|
|
|
project = Str('IPython', config=True)
|
|
|
|
|
|
|
|
|
class IPControllerTask(WinHPCTask):
|
|
|
|
|
|
task_name = Str('IPController', config=True)
|
|
|
controller_cmd = List(['ipcontroller.exe'], config=True)
|
|
|
controller_args = List(['--log-to-file', '--log-level', '40'], config=True)
|
|
|
# I don't want these to be configurable
|
|
|
std_out_file_path = CStr('', config=False)
|
|
|
std_err_file_path = CStr('', config=False)
|
|
|
min_cores = Int(1, config=False)
|
|
|
max_cores = Int(1, config=False)
|
|
|
min_sockets = Int(1, config=False)
|
|
|
max_sockets = Int(1, config=False)
|
|
|
min_nodes = Int(1, config=False)
|
|
|
max_nodes = Int(1, config=False)
|
|
|
unit_type = Str("Core", config=False)
|
|
|
work_directory = CStr('', config=False)
|
|
|
|
|
|
def __init__(self, parent, name=None, config=None):
|
|
|
super(IPControllerTask, self).__init__(parent, name, config)
|
|
|
the_uuid = uuid.uuid1()
|
|
|
self.std_out_file_path = os.path.join('log','ipcontroller-%s.out' % the_uuid)
|
|
|
self.std_err_file_path = os.path.join('log','ipcontroller-%s.err' % the_uuid)
|
|
|
|
|
|
@property
|
|
|
def command_line(self):
|
|
|
return ' '.join(self.controller_cmd + self.controller_args)
|
|
|
|
|
|
|
|
|
class IPEngineTask(WinHPCTask):
|
|
|
|
|
|
task_name = Str('IPEngine', config=True)
|
|
|
engine_cmd = List(['ipengine.exe'], config=True)
|
|
|
engine_args = List(['--log-to-file', '--log-level', '40'], config=True)
|
|
|
# I don't want these to be configurable
|
|
|
std_out_file_path = CStr('', config=False)
|
|
|
std_err_file_path = CStr('', config=False)
|
|
|
min_cores = Int(1, config=False)
|
|
|
max_cores = Int(1, config=False)
|
|
|
min_sockets = Int(1, config=False)
|
|
|
max_sockets = Int(1, config=False)
|
|
|
min_nodes = Int(1, config=False)
|
|
|
max_nodes = Int(1, config=False)
|
|
|
unit_type = Str("Core", config=False)
|
|
|
work_directory = CStr('', config=False)
|
|
|
|
|
|
def __init__(self, parent, name=None, config=None):
|
|
|
super(IPEngineTask,self).__init__(parent, name, config)
|
|
|
the_uuid = uuid.uuid1()
|
|
|
self.std_out_file_path = os.path.join('log','ipengine-%s.out' % the_uuid)
|
|
|
self.std_err_file_path = os.path.join('log','ipengine-%s.err' % the_uuid)
|
|
|
|
|
|
@property
|
|
|
def command_line(self):
|
|
|
return ' '.join(self.engine_cmd + self.engine_args)
|
|
|
|
|
|
|
|
|
# j = WinHPCJob(None)
|
|
|
# j.job_name = 'IPCluster'
|
|
|
# j.username = 'GNET\\bgranger'
|
|
|
# j.requested_nodes = 'GREEN'
|
|
|
#
|
|
|
# t = WinHPCTask(None)
|
|
|
# t.task_name = 'Controller'
|
|
|
# t.command_line = r"\\blue\domainusers$\bgranger\Python\Python25\Scripts\ipcontroller.exe --log-to-file -p default --log-level 10"
|
|
|
# t.work_directory = r"\\blue\domainusers$\bgranger\.ipython\cluster_default"
|
|
|
# t.std_out_file_path = 'controller-out.txt'
|
|
|
# t.std_err_file_path = 'controller-err.txt'
|
|
|
# t.environment_variables['PYTHONPATH'] = r"\\blue\domainusers$\bgranger\Python\Python25\Lib\site-packages"
|
|
|
# j.add_task(t)
|
|
|
|
|
|
|