# HG changeset patch
# User Daniel Atallah
# Date 2011-03-23 18:26:56
# Node ID ed97955e0c041aabc6886a6b99527dcbf402c0d6
# Parent 49b818fd26d803ef218e874b4a8c29f2d15fd372
convert/mtn: add support for using monotone's "automate stdio" when available

Currently the convert extension spawns a new mtn process for each operation.
For a large repository, this ends up being hundreds of thousands of processes.

The following enables usage of monotone's "automate stdio" functionality -
documented at:
http://www.monotone.ca/docs/Automation.html#index-mtn-automate-stdio-188

The effect is that (after determining that a new enough mtn executable is
available) a single long-running mtn process is used for all the operations,
using stdin/stdout to send commands and read output.

This has a pretty significant effect on the performance of some parts of the
conversion process.

diff --git a/hgext/convert/monotone.py b/hgext/convert/monotone.py
--- a/hgext/convert/monotone.py
+++ b/hgext/convert/monotone.py
@@ -19,6 +19,7 @@ class monotone_source(converter_source,
 
         self.ui = ui
         self.path = path
+        self.automatestdio = False
 
         norepo = NoRepo(_("%s does not look like a monotone repository")
                         % path)
@@ -73,9 +74,121 @@ class monotone_source(converter_source,
         self.rev = rev
 
     def mtnrun(self, *args, **kwargs):
+        if self.automatestdio:
+            return self.mtnrunstdio(*args, **kwargs)
+        else:
+            return self.mtnrunsingle(*args, **kwargs)
+
+    def mtnrunsingle(self, *args, **kwargs):
+        '''run a single automate command in its own mtn process'''
         kwargs['d'] = self.path
         return self.run0('automate', *args, **kwargs)
 
+    def mtnrunstdio(self, *args, **kwargs):
+        '''send a command to the automate stdio process, return its output
+
+        args are the command words; kwargs are its options (an option's
+        value is only sent when it is true).
+        '''
+        # Prepare the command in automate stdio format
+        command = []
+        for k, v in kwargs.iteritems():
+            command.append("%s:%s" % (len(k), k))
+            if v:
+                command.append("%s:%s" % (len(v), v))
+        if command:
+            command.insert(0, 'o')
+            command.append('e')
+
+        command.append('l')
+        for arg in args:
+            # append one token; "+=" would splice the string in char by char
+            command.append("%s:%s" % (len(arg), arg))
+        command.append('e')
+        command = ''.join(command)
+
+        self.ui.debug("mtn: sending '%s'\n" % command)
+        self.mtnwritefp.write(command)
+        self.mtnwritefp.flush()
+
+        return self.mtnstdioreadcommandoutput(command)
+
+    def mtnstdioreadpacket(self):
+        '''read one "<commandnbr>:<stream>:<size>:<payload>" packet
+
+        Returns (commandnbr, stream, length, payload); aborts on malformed
+        or truncated input.
+        '''
+        read = None
+        commandnbr = ''
+        while read != ':':
+            read = self.mtnreadfp.read(1)
+            if not read:
+                raise util.Abort(_('bad mtn packet - no end of commandnbr'))
+            commandnbr += read
+        commandnbr = commandnbr[:-1]
+
+        stream = self.mtnreadfp.read(1)
+        if stream not in 'mewptl':
+            # interpolate after translating so the message catalog can match
+            raise util.Abort(_('bad mtn packet - bad stream type %s') % stream)
+
+        read = self.mtnreadfp.read(1)
+        if read != ':':
+            raise util.Abort(_('bad mtn packet - no divider before size'))
+
+        read = None
+        lengthstr = ''
+        while read != ':':
+            read = self.mtnreadfp.read(1)
+            if not read:
+                raise util.Abort(_('bad mtn packet - no end of packet size'))
+            lengthstr += read
+        try:
+            length = long(lengthstr[:-1])
+        except ValueError:
+            # long() raises ValueError, not TypeError, for a non-numeric string
+            raise util.Abort(_('bad mtn packet - bad packet size %s')
+                % lengthstr)
+
+        read = self.mtnreadfp.read(length)
+        if len(read) != length:
+            raise util.Abort(_("bad mtn packet - unable to read full packet "
+                "read %s of %s") % (len(read), length))
+
+        return (commandnbr, stream, length, read)
+
+    def mtnstdioreadcommandoutput(self, command):
+        '''read packets until the end of the command, return its main output
+
+        Aborts if the command finishes with a status other than '0'.
+        '''
+        # a command's main stream may arrive split over several packets
+        retval = []
+        while True:
+            commandnbr, stream, length, output = self.mtnstdioreadpacket()
+            self.ui.debug('mtn: read packet %s:%s:%s\n' %
+                (commandnbr, stream, length))
+
+            if stream == 'l':
+                # End of command
+                if output != '0':
+                    raise util.Abort(_("mtn command '%s' returned %s") %
+                        (command, output))
+                break
+            elif stream in 'ew':
+                # Error, warning output
+                self.ui.warn(_('%s error:\n') % self.command)
+                self.ui.warn(output)
+            elif stream == 'p':
+                # Progress messages
+                self.ui.debug('mtn: ' + output)
+            elif stream == 'm':
+                # Main stream - command output
+                retval.append(output)
+
+        return ''.join(retval)
+
     def mtnloadmanifest(self, rev):
         if self.manifest_rev == rev:
             return
@@ -225,3 +338,45 @@ class monotone_source(converter_source,
         # This function is only needed to support --filemap
         # ... and we don't support that
         raise NotImplementedError()
+
+    def before(self):
+        '''switch to automate stdio if the mtn interface version allows it'''
+        # Check if we have a new enough version to use automate stdio
+        version = 0.0
+        try:
+            versionstr = self.mtnrunsingle("interface_version")
+            version = float(versionstr)
+        except Exception:
+            raise util.Abort(_("unable to determine mtn automate interface "
+                "version"))
+
+        if version >= 12.0:
+            self.automatestdio = True
+            self.ui.debug("mtn automate version %s - using automate stdio\n" %
+                version)
+
+            # launch the long-running automate stdio process
+            self.mtnwritefp, self.mtnreadfp = self._run2('automate', 'stdio',
+                '-d', self.path)
+            # read the headers
+            read = self.mtnreadfp.readline()
+            if read != 'format-version: 2\n':
+                raise util.Abort(_('mtn automate stdio header unexpected: %s')
+                    % read)
+            while read != '\n':
+                read = self.mtnreadfp.readline()
+                if not read:
+                    raise util.Abort(_("failed to reach end of mtn automate "
+                        "stdio headers"))
+        else:
+            self.ui.debug("mtn automate version %s - not using automate stdio "
+                "(automate >= 12.0 - mtn >= 0.46 is needed)\n" % version)
+
+    def after(self):
+        '''close the pipes to the automate stdio process, if one was started'''
+        if self.automatestdio:
+            self.mtnwritefp.close()
+            self.mtnwritefp = None
+            self.mtnreadfp.close()
+            self.mtnreadfp = None
+