##// END OF EJS Templates
ipcluster logging adjustments...
MinRK -
Show More
@@ -327,7 +327,9 b' class IPClusterEngines(BaseParallelApplication):'
327 self.early_shutdown = 0
327 self.early_shutdown = 0
328
328
329 def start_engines(self):
329 def start_engines(self):
330 self.log.info("Starting %i engines"%self.n)
330 # Some EngineSetLaunchers ignore `n` and use their own engine count, such as SSH:
331 n = getattr(self.engine_launcher, 'engine_count', self.n)
332 self.log.info("Starting %s Engines with %s", n, self.engine_launcher_class)
331 self.engine_launcher.start(self.n)
333 self.engine_launcher.start(self.n)
332 self.engine_launcher.on_stop(self.engines_stopped_early)
334 self.engine_launcher.on_stop(self.engines_stopped_early)
333 if self.early_shutdown:
335 if self.early_shutdown:
@@ -497,6 +499,7 b' class IPClusterStart(IPClusterEngines):'
497 pass
499 pass
498
500
499 def start_controller(self):
501 def start_controller(self):
502 self.log.info("Starting Controller with %s", self.controller_launcher_class)
500 self.controller_launcher.start()
503 self.controller_launcher.start()
501
504
502 def stop_controller(self):
505 def stop_controller(self):
@@ -186,7 +186,7 b' class BaseLauncher(LoggingConfigurable):'
186 a pass-through so it can be used as a callback.
186 a pass-through so it can be used as a callback.
187 """
187 """
188
188
189 self.log.info('Process %r started: %r' % (self.args[0], data))
189 self.log.debug('Process %r started: %r', self.args[0], data)
190 self.start_data = data
190 self.start_data = data
191 self.state = 'running'
191 self.state = 'running'
192 return data
192 return data
@@ -197,7 +197,7 b' class BaseLauncher(LoggingConfigurable):'
197 This logs the process stopping and sets the state to 'after'. Call
197 This logs the process stopping and sets the state to 'after'. Call
198 this to trigger callbacks registered via :meth:`on_stop`."""
198 this to trigger callbacks registered via :meth:`on_stop`."""
199
199
200 self.log.info('Process %r stopped: %r' % (self.args[0], data))
200 self.log.debug('Process %r stopped: %r', self.args[0], data)
201 self.stop_data = data
201 self.stop_data = data
202 self.state = 'after'
202 self.state = 'after'
203 for i in range(len(self.stop_callbacks)):
203 for i in range(len(self.stop_callbacks)):
@@ -271,6 +271,7 b' class LocalProcessLauncher(BaseLauncher):'
271 return self.cmd_and_args
271 return self.cmd_and_args
272
272
273 def start(self):
273 def start(self):
274 self.log.debug("Starting %s: %r", self.__class__.__name__, self.args)
274 if self.state == 'before':
275 if self.state == 'before':
275 self.process = Popen(self.args,
276 self.process = Popen(self.args,
276 stdout=PIPE,stderr=PIPE,stdin=PIPE,
277 stdout=PIPE,stderr=PIPE,stdin=PIPE,
@@ -322,7 +323,7 b' class LocalProcessLauncher(BaseLauncher):'
322 line = self.process.stdout.readline()
323 line = self.process.stdout.readline()
323 # a stopped process will be readable but return empty strings
324 # a stopped process will be readable but return empty strings
324 if line:
325 if line:
325 self.log.info(line[:-1])
326 self.log.debug(line[:-1])
326 else:
327 else:
327 self.poll()
328 self.poll()
328
329
@@ -333,7 +334,7 b' class LocalProcessLauncher(BaseLauncher):'
333 line = self.process.stderr.readline()
334 line = self.process.stderr.readline()
334 # a stopped process will be readable but return empty strings
335 # a stopped process will be readable but return empty strings
335 if line:
336 if line:
336 self.log.error(line[:-1])
337 self.log.debug(line[:-1])
337 else:
338 else:
338 self.poll()
339 self.poll()
339
340
@@ -354,7 +355,6 b' class LocalControllerLauncher(LocalProcessLauncher, ControllerMixin):'
354
355
355 def start(self):
356 def start(self):
356 """Start the controller by profile_dir."""
357 """Start the controller by profile_dir."""
357 self.log.info("Starting LocalControllerLauncher: %r" % self.args)
358 return super(LocalControllerLauncher, self).start()
358 return super(LocalControllerLauncher, self).start()
359
359
360
360
@@ -401,8 +401,6 b' class LocalEngineSetLauncher(LocalEngineLauncher):'
401 el.engine_args = copy.deepcopy(self.engine_args)
401 el.engine_args = copy.deepcopy(self.engine_args)
402 el.on_stop(self._notice_engine_stopped)
402 el.on_stop(self._notice_engine_stopped)
403 d = el.start()
403 d = el.start()
404 if i==0:
405 self.log.info("Starting LocalEngineSetLauncher: %r" % el.args)
406 self.launchers[i] = el
404 self.launchers[i] = el
407 dlist.append(d)
405 dlist.append(d)
408 self.notify_start(dlist)
406 self.notify_start(dlist)
@@ -499,7 +497,6 b' class MPIControllerLauncher(MPILauncher, ControllerMixin):'
499
497
500 def start(self):
498 def start(self):
501 """Start the controller by profile_dir."""
499 """Start the controller by profile_dir."""
502 self.log.info("Starting MPIControllerLauncher: %r", self.args)
503 return super(MPIControllerLauncher, self).start(1)
500 return super(MPIControllerLauncher, self).start(1)
504
501
505
502
@@ -520,7 +517,6 b' class MPIEngineSetLauncher(MPILauncher, EngineMixin):'
520 def start(self, n):
517 def start(self, n):
521 """Start n engines by profile or profile_dir."""
518 """Start n engines by profile or profile_dir."""
522 self.n = n
519 self.n = n
523 self.log.info('Starting MPIEngineSetLauncher: %r', self.args)
524 return super(MPIEngineSetLauncher, self).start(n)
520 return super(MPIEngineSetLauncher, self).start(n)
525
521
526 # deprecated MPIExec names
522 # deprecated MPIExec names
@@ -640,7 +636,17 b' class SSHEngineSetLauncher(LocalEngineSetLauncher):'
640 engines = Dict(config=True,
636 engines = Dict(config=True,
641 help="""dict of engines to launch. This is a dict by hostname of ints,
637 help="""dict of engines to launch. This is a dict by hostname of ints,
642 corresponding to the number of engines to start on that host.""")
638 corresponding to the number of engines to start on that host.""")
643
639
640 @property
641 def engine_count(self):
642 """determine engine count from `engines` dict"""
643 count = 0
644 for n in self.engines.itervalues():
645 if isinstance(n, (tuple,list)):
646 n,args = n
647 count += n
648 return count
649
644 def start(self, n):
650 def start(self, n):
645 """Start engines by profile or profile_dir.
651 """Start engines by profile or profile_dir.
646 `n` is ignored, and the `engines` config property is used instead.
652 `n` is ignored, and the `engines` config property is used instead.
@@ -669,8 +675,6 b' class SSHEngineSetLauncher(LocalEngineSetLauncher):'
669 el.engine_args = args
675 el.engine_args = args
670 el.on_stop(self._notice_engine_stopped)
676 el.on_stop(self._notice_engine_stopped)
671 d = el.start(user=user, hostname=host)
677 d = el.start(user=user, hostname=host)
672 if i==0:
673 self.log.info("Starting SSHEngineSetLauncher: %r" % el.args)
674 self.launchers[ "%s/%i" % (host,i) ] = el
678 self.launchers[ "%s/%i" % (host,i) ] = el
675 dlist.append(d)
679 dlist.append(d)
676 self.notify_start(dlist)
680 self.notify_start(dlist)
@@ -745,7 +749,7 b' class WindowsHPCLauncher(BaseLauncher):'
745 '/jobfile:%s' % self.job_file,
749 '/jobfile:%s' % self.job_file,
746 '/scheduler:%s' % self.scheduler
750 '/scheduler:%s' % self.scheduler
747 ]
751 ]
748 self.log.info("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
752 self.log.debug("Starting Win HPC Job: %s" % (self.job_cmd + ' ' + ' '.join(args),))
749
753
750 output = check_output([self.job_cmd]+args,
754 output = check_output([self.job_cmd]+args,
751 env=os.environ,
755 env=os.environ,
@@ -969,7 +973,7 b' class BatchSystemLauncher(BaseLauncher):'
969 self.batch_template = u'\n'.join([firstline, self.queue_template, rest])
973 self.batch_template = u'\n'.join([firstline, self.queue_template, rest])
970
974
971 script_as_string = self.formatter.format(self.batch_template, **self.context)
975 script_as_string = self.formatter.format(self.batch_template, **self.context)
972 self.log.info('Writing instantiated batch script: %s' % self.batch_file)
976 self.log.info('Writing batch script: %s', self.batch_file)
973
977
974 with open(self.batch_file, 'w') as f:
978 with open(self.batch_file, 'w') as f:
975 f.write(script_as_string)
979 f.write(script_as_string)
@@ -977,6 +981,7 b' class BatchSystemLauncher(BaseLauncher):'
977
981
978 def start(self, n):
982 def start(self, n):
979 """Start n copies of the process using a batch system."""
983 """Start n copies of the process using a batch system."""
984 self.log.debug("Starting %s: %r", self.__class__.__name__, self.args)
980 # Here we save profile_dir in the context so they
985 # Here we save profile_dir in the context so they
981 # can be used in the batch script template as {profile_dir}
986 # can be used in the batch script template as {profile_dir}
982 self.write_batch_script(n)
987 self.write_batch_script(n)
@@ -1023,7 +1028,6 b' class PBSControllerLauncher(PBSLauncher, BatchClusterAppMixin):'
1023
1028
1024 def start(self):
1029 def start(self):
1025 """Start the controller by profile or profile_dir."""
1030 """Start the controller by profile or profile_dir."""
1026 self.log.info("Starting PBSControllerLauncher: %r" % self.args)
1027 return super(PBSControllerLauncher, self).start(1)
1031 return super(PBSControllerLauncher, self).start(1)
1028
1032
1029
1033
@@ -1039,7 +1043,6 b' class PBSEngineSetLauncher(PBSLauncher, BatchClusterAppMixin):'
1039
1043
1040 def start(self, n):
1044 def start(self, n):
1041 """Start n engines by profile or profile_dir."""
1045 """Start n engines by profile or profile_dir."""
1042 self.log.info('Starting %i engines with PBSEngineSetLauncher: %r' % (n, self.args))
1043 return super(PBSEngineSetLauncher, self).start(n)
1046 return super(PBSEngineSetLauncher, self).start(n)
1044
1047
1045 #SGE is very similar to PBS
1048 #SGE is very similar to PBS
@@ -1064,7 +1067,6 b' class SGEControllerLauncher(SGELauncher, BatchClusterAppMixin):'
1064
1067
1065 def start(self):
1068 def start(self):
1066 """Start the controller by profile or profile_dir."""
1069 """Start the controller by profile or profile_dir."""
1067 self.log.info("Starting SGEControllerLauncher: %r" % self.args)
1068 return super(SGEControllerLauncher, self).start(1)
1070 return super(SGEControllerLauncher, self).start(1)
1069
1071
1070 class SGEEngineSetLauncher(SGELauncher, BatchClusterAppMixin):
1072 class SGEEngineSetLauncher(SGELauncher, BatchClusterAppMixin):
@@ -1079,7 +1081,6 b' class SGEEngineSetLauncher(SGELauncher, BatchClusterAppMixin):'
1079
1081
1080 def start(self, n):
1082 def start(self, n):
1081 """Start n engines by profile or profile_dir."""
1083 """Start n engines by profile or profile_dir."""
1082 self.log.info('Starting %i engines with SGEEngineSetLauncher: %r' % (n, self.args))
1083 return super(SGEEngineSetLauncher, self).start(n)
1084 return super(SGEEngineSetLauncher, self).start(n)
1084
1085
1085
1086
@@ -1112,6 +1113,7 b' class LSFLauncher(BatchSystemLauncher):'
1112 self.write_batch_script(n)
1113 self.write_batch_script(n)
1113 #output = check_output(self.args, env=os.environ)
1114 #output = check_output(self.args, env=os.environ)
1114 piped_cmd = self.args[0]+'<\"'+self.args[1]+'\"'
1115 piped_cmd = self.args[0]+'<\"'+self.args[1]+'\"'
1116 self.log.debug("Starting %s: %s", self.__class__.__name__, piped_cmd)
1115 p = Popen(piped_cmd, shell=True,env=os.environ,stdout=PIPE)
1117 p = Popen(piped_cmd, shell=True,env=os.environ,stdout=PIPE)
1116 output,err = p.communicate()
1118 output,err = p.communicate()
1117 job_id = self.parse_job_id(output)
1119 job_id = self.parse_job_id(output)
@@ -1133,7 +1135,6 b' class LSFControllerLauncher(LSFLauncher, BatchClusterAppMixin):'
1133
1135
1134 def start(self):
1136 def start(self):
1135 """Start the controller by profile or profile_dir."""
1137 """Start the controller by profile or profile_dir."""
1136 self.log.info("Starting LSFControllerLauncher: %r" % self.args)
1137 return super(LSFControllerLauncher, self).start(1)
1138 return super(LSFControllerLauncher, self).start(1)
1138
1139
1139
1140
@@ -1149,7 +1150,6 b' class LSFEngineSetLauncher(LSFLauncher, BatchClusterAppMixin):'
1149
1150
1150 def start(self, n):
1151 def start(self, n):
1151 """Start n engines by profile or profile_dir."""
1152 """Start n engines by profile or profile_dir."""
1152 self.log.info('Starting %i engines with LSFEngineSetLauncher: %r' % (n, self.args))
1153 return super(LSFEngineSetLauncher, self).start(n)
1153 return super(LSFEngineSetLauncher, self).start(n)
1154
1154
1155
1155
@@ -1174,7 +1174,6 b' class IPClusterLauncher(LocalProcessLauncher):'
1174 ['--n=%i'%self.ipcluster_n] + self.ipcluster_args
1174 ['--n=%i'%self.ipcluster_n] + self.ipcluster_args
1175
1175
1176 def start(self):
1176 def start(self):
1177 self.log.info("Starting ipcluster: %r" % self.args)
1178 return super(IPClusterLauncher, self).start()
1177 return super(IPClusterLauncher, self).start()
1179
1178
1180 #-----------------------------------------------------------------------------
1179 #-----------------------------------------------------------------------------
General Comments 0
You need to be logged in to leave comments. Login now