# cvs.py: CVS conversion code inspired by hg-cvs-import and git-cvsimport # # Copyright 2005-2009 Olivia Mackall and others # # This software may be used and distributed according to the terms of the # GNU General Public License version 2 or any later version. from __future__ import annotations import errno import os import re import socket from mercurial.i18n import _ from mercurial import ( encoding, error, util, ) from mercurial.utils import ( dateutil, procutil, ) from . import ( common, cvsps, ) stringio = util.stringio checktool = common.checktool commit = common.commit converter_source = common.converter_source makedatetimestamp = common.makedatetimestamp NoRepo = common.NoRepo class convert_cvs(converter_source): def __init__(self, ui, repotype, path, revs=None): super(convert_cvs, self).__init__(ui, repotype, path, revs=revs) cvs = os.path.join(path, b"CVS") if not os.path.exists(cvs): raise NoRepo(_(b"%s does not look like a CVS checkout") % path) checktool(b'cvs') self.changeset = None self.files = {} self.tags = {} self.lastbranch = {} self.socket = None self.cvsroot = util.readfile(os.path.join(cvs, b"Root"))[:-1] self.cvsrepo = util.readfile(os.path.join(cvs, b"Repository"))[:-1] self.encoding = encoding.encoding self._connect() def _parse(self): if self.changeset is not None: return self.changeset = {} maxrev = 0 if self.revs: if len(self.revs) > 1: raise error.Abort( _( b'cvs source does not support specifying ' b'multiple revs' ) ) # TODO: handle tags try: # patchset number? maxrev = int(self.revs[0]) except ValueError: raise error.Abort( _(b'revision %s is not a patchset number') % self.revs[0] ) d = encoding.getcwd() try: os.chdir(self.path) cache = b'update' if not self.ui.configbool(b'convert', b'cvsps.cache'): cache = None db = cvsps.createlog(self.ui, cache=cache) db = cvsps.createchangeset( self.ui, db, fuzz=int(self.ui.config(b'convert', b'cvsps.fuzz')), mergeto=self.ui.config(b'convert', b'cvsps.mergeto'), mergefrom=self.ui.config(b'convert', b'cvsps.mergefrom'), ) for cs in db: if maxrev and cs.id > maxrev: break id = b"%d" % cs.id cs.author = self.recode(cs.author) self.lastbranch[cs.branch] = id cs.comment = self.recode(cs.comment) if self.ui.configbool(b'convert', b'localtimezone'): cs.date = makedatetimestamp(cs.date[0]) date = dateutil.datestr(cs.date, b'%Y-%m-%d %H:%M:%S %1%2') self.tags.update(dict.fromkeys(cs.tags, id)) files = {} for f in cs.entries: files[f.file] = b"%s%s" % ( b'.'.join([(b"%d" % x) for x in f.revision]), [b'', b'(DEAD)'][f.dead], ) # add current commit to set c = commit( author=cs.author, date=date, parents=[(b"%d" % p.id) for p in cs.parents], desc=cs.comment, branch=cs.branch or b'', ) self.changeset[id] = c self.files[id] = files self.heads = self.lastbranch.values() finally: os.chdir(d) def _connect(self): root = self.cvsroot conntype = None user, host = None, None cmd = [b'cvs', b'server'] self.ui.status(_(b"connecting to %s\n") % root) if root.startswith(b":pserver:"): root = root[9:] m = re.match( br'(?:(.*?)(?::(.*?))?@)?([^:/]*)(?::(\d*))?(.*)', root ) if m: conntype = b"pserver" user, passw, serv, port, root = m.groups() if not user: user = b"anonymous" if not port: port = 2401 else: port = int(port) format0 = b":pserver:%s@%s:%s" % (user, serv, root) format1 = b":pserver:%s@%s:%d%s" % (user, serv, port, root) if not passw: passw = b"A" cvspass = os.path.expanduser(b"~/.cvspass") try: for line in util.readfile(cvspass).splitlines(): part1, part2 = line.split(b' ', 1) # /1 :pserver:user@example.com:2401/cvsroot/foo # Ah 0: data = fp.read(min(count, chunksize)) if not data: raise error.Abort( _(b"%d bytes missing from remote file") % count ) count -= len(data) output.write(data) return output.getvalue() self._parse() if rev.endswith(b"(DEAD)"): return None, None args = (b"-N -P -kk -r %s --" % rev).split() args.append(self.cvsrepo + b'/' + name) for x in args: self.writep.write(b"Argument %s\n" % x) self.writep.write(b"Directory .\n%s\nco\n" % self.realroot) self.writep.flush() data = b"" mode = None while True: line = self.readp.readline() if line.startswith(b"Created ") or line.startswith(b"Updated "): self.readp.readline() # path self.readp.readline() # entries mode = self.readp.readline()[:-1] count = int(self.readp.readline()[:-1]) data = chunkedread(self.readp, count) elif line.startswith(b" "): data += line[1:] elif line.startswith(b"M "): pass elif line.startswith(b"Mbinary "): count = int(self.readp.readline()[:-1]) data = chunkedread(self.readp, count) else: if line == b"ok\n": if mode is None: raise error.Abort(_(b'malformed response from CVS')) return (data, b"x" in mode and b"x" or b"") elif line.startswith(b"E "): self.ui.warn(_(b"cvs server: %s\n") % line[2:]) elif line.startswith(b"Remove"): self.readp.readline() else: raise error.Abort(_(b"unknown CVS response: %s") % line) def getchanges(self, rev, full): if full: raise error.Abort(_(b"convert from cvs does not support --full")) self._parse() return sorted(self.files[rev].items()), {}, set() def getcommit(self, rev): self._parse() return self.changeset[rev] def gettags(self): self._parse() return self.tags def getchangedfiles(self, rev, i): self._parse() return sorted(self.files[rev])