##// END OF EJS Templates
wireproto: use maybecapturestdio() for push responses (API)...
Gregory Szorc -
r36084:caca3ac2 default
parent child Browse files
Show More
@@ -34,27 +34,26 b' httpoldcallstream = None'
34 def putlfile(repo, proto, sha):
34 def putlfile(repo, proto, sha):
35 '''Server command for putting a largefile into a repository's local store
35 '''Server command for putting a largefile into a repository's local store
36 and into the user cache.'''
36 and into the user cache.'''
37 proto.redirect()
37 with proto.mayberedirectstdio() as output:
38
38 path = lfutil.storepath(repo, sha)
39 path = lfutil.storepath(repo, sha)
39 util.makedirs(os.path.dirname(path))
40 util.makedirs(os.path.dirname(path))
40 tmpfp = util.atomictempfile(path, createmode=repo.store.createmode)
41 tmpfp = util.atomictempfile(path, createmode=repo.store.createmode)
42
41
43 try:
42 try:
44 proto.getfile(tmpfp)
43 proto.getfile(tmpfp)
45 tmpfp._fp.seek(0)
44 tmpfp._fp.seek(0)
46 if sha != lfutil.hexsha1(tmpfp._fp):
45 if sha != lfutil.hexsha1(tmpfp._fp):
47 raise IOError(0, _('largefile contents do not match hash'))
46 raise IOError(0, _('largefile contents do not match hash'))
48 tmpfp.close()
47 tmpfp.close()
49 lfutil.linktousercache(repo, sha)
48 lfutil.linktousercache(repo, sha)
50 except IOError as e:
49 except IOError as e:
51 repo.ui.warn(_('largefiles: failed to put %s into store: %s\n') %
50 repo.ui.warn(_('largefiles: failed to put %s into store: %s\n') %
52 (sha, e.strerror))
51 (sha, e.strerror))
53 return wireproto.pushres(1)
52 return wireproto.pushres(1, output.getvalue() if output else '')
54 finally:
53 finally:
55 tmpfp.discard()
54 tmpfp.discard()
56
55
57 return wireproto.pushres(0)
56 return wireproto.pushres(0, output.getvalue() if output else '')
58
57
59 def getlfile(repo, proto, sha):
58 def getlfile(repo, proto, sha):
60 '''Server command for retrieving a largefile from the repository-local
59 '''Server command for retrieving a largefile from the repository-local
@@ -510,16 +510,18 b' class pushres(object):'
510
510
511 The call was successful and returned an integer contained in `self.res`.
511 The call was successful and returned an integer contained in `self.res`.
512 """
512 """
513 def __init__(self, res):
513 def __init__(self, res, output):
514 self.res = res
514 self.res = res
515 self.output = output
515
516
516 class pusherr(object):
517 class pusherr(object):
517 """wireproto reply: failure
518 """wireproto reply: failure
518
519
519 The call failed. The `self.res` attribute contains the error message.
520 The call failed. The `self.res` attribute contains the error message.
520 """
521 """
521 def __init__(self, res):
522 def __init__(self, res, output):
522 self.res = res
523 self.res = res
524 self.output = output
523
525
524 class ooberror(object):
526 class ooberror(object):
525 """wireproto reply: failure of a batch of operation
527 """wireproto reply: failure of a batch of operation
@@ -997,97 +999,98 b' def stream(repo, proto):'
997 def unbundle(repo, proto, heads):
999 def unbundle(repo, proto, heads):
998 their_heads = decodelist(heads)
1000 their_heads = decodelist(heads)
999
1001
1000 try:
1002 with proto.mayberedirectstdio() as output:
1001 proto.redirect()
1002
1003 exchange.check_heads(repo, their_heads, 'preparing changes')
1004
1005 # write bundle data to temporary file because it can be big
1006 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
1007 fp = os.fdopen(fd, pycompat.sysstr('wb+'))
1008 r = 0
1009 try:
1003 try:
1010 proto.getfile(fp)
1004 exchange.check_heads(repo, their_heads, 'preparing changes')
1011 fp.seek(0)
1012 gen = exchange.readbundle(repo.ui, fp, None)
1013 if (isinstance(gen, changegroupmod.cg1unpacker)
1014 and not bundle1allowed(repo, 'push')):
1015 if proto.name == 'http':
1016 # need to special case http because stderr do not get to
1017 # the http client on failed push so we need to abuse some
1018 # other error type to make sure the message get to the
1019 # user.
1020 return ooberror(bundle2required)
1021 raise error.Abort(bundle2requiredmain,
1022 hint=bundle2requiredhint)
1023
1005
1024 r = exchange.unbundle(repo, gen, their_heads, 'serve',
1006 # write bundle data to temporary file because it can be big
1025 proto._client())
1007 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
1026 if util.safehasattr(r, 'addpart'):
1008 fp = os.fdopen(fd, pycompat.sysstr('wb+'))
1027 # The return looks streamable, we are in the bundle2 case and
1009 r = 0
1028 # should return a stream.
1029 return streamres_legacy(gen=r.getchunks())
1030 return pushres(r)
1031
1032 finally:
1033 fp.close()
1034 os.unlink(tempname)
1035
1036 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
1037 # handle non-bundle2 case first
1038 if not getattr(exc, 'duringunbundle2', False):
1039 try:
1010 try:
1040 raise
1011 proto.getfile(fp)
1041 except error.Abort:
1012 fp.seek(0)
1042 # The old code we moved used util.stderr directly.
1013 gen = exchange.readbundle(repo.ui, fp, None)
1043 # We did not change it to minimise code change.
1014 if (isinstance(gen, changegroupmod.cg1unpacker)
1044 # This need to be moved to something proper.
1015 and not bundle1allowed(repo, 'push')):
1045 # Feel free to do it.
1016 if proto.name == 'http':
1046 util.stderr.write("abort: %s\n" % exc)
1017 # need to special case http because stderr do not get to
1047 if exc.hint is not None:
1018 # the http client on failed push so we need to abuse
1048 util.stderr.write("(%s)\n" % exc.hint)
1019 # some other error type to make sure the message get to
1049 return pushres(0)
1020 # the user.
1050 except error.PushRaced:
1021 return ooberror(bundle2required)
1051 return pusherr(str(exc))
1022 raise error.Abort(bundle2requiredmain,
1023 hint=bundle2requiredhint)
1052
1024
1053 bundler = bundle2.bundle20(repo.ui)
1025 r = exchange.unbundle(repo, gen, their_heads, 'serve',
1054 for out in getattr(exc, '_bundle2salvagedoutput', ()):
1026 proto._client())
1055 bundler.addpart(out)
1027 if util.safehasattr(r, 'addpart'):
1056 try:
1028 # The return looks streamable, we are in the bundle2 case
1057 try:
1029 # and should return a stream.
1058 raise
1030 return streamres_legacy(gen=r.getchunks())
1059 except error.PushkeyFailed as exc:
1031 return pushres(r, output.getvalue() if output else '')
1060 # check client caps
1032
1061 remotecaps = getattr(exc, '_replycaps', None)
1033 finally:
1062 if (remotecaps is not None
1034 fp.close()
1063 and 'pushkey' not in remotecaps.get('error', ())):
1035 os.unlink(tempname)
1064 # no support remote side, fallback to Abort handler.
1036
1037 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
1038 # handle non-bundle2 case first
1039 if not getattr(exc, 'duringunbundle2', False):
1040 try:
1065 raise
1041 raise
1066 part = bundler.newpart('error:pushkey')
1042 except error.Abort:
1067 part.addparam('in-reply-to', exc.partid)
1043 # The old code we moved used util.stderr directly.
1068 if exc.namespace is not None:
1044 # We did not change it to minimise code change.
1069 part.addparam('namespace', exc.namespace, mandatory=False)
1045 # This need to be moved to something proper.
1070 if exc.key is not None:
1046 # Feel free to do it.
1071 part.addparam('key', exc.key, mandatory=False)
1047 util.stderr.write("abort: %s\n" % exc)
1072 if exc.new is not None:
1048 if exc.hint is not None:
1073 part.addparam('new', exc.new, mandatory=False)
1049 util.stderr.write("(%s)\n" % exc.hint)
1074 if exc.old is not None:
1050 return pushres(0, output.getvalue() if output else '')
1075 part.addparam('old', exc.old, mandatory=False)
1051 except error.PushRaced:
1076 if exc.ret is not None:
1052 return pusherr(str(exc),
1077 part.addparam('ret', exc.ret, mandatory=False)
1053 output.getvalue() if output else '')
1078 except error.BundleValueError as exc:
1054
1079 errpart = bundler.newpart('error:unsupportedcontent')
1055 bundler = bundle2.bundle20(repo.ui)
1080 if exc.parttype is not None:
1056 for out in getattr(exc, '_bundle2salvagedoutput', ()):
1081 errpart.addparam('parttype', exc.parttype)
1057 bundler.addpart(out)
1082 if exc.params:
1058 try:
1083 errpart.addparam('params', '\0'.join(exc.params))
1059 try:
1084 except error.Abort as exc:
1060 raise
1085 manargs = [('message', str(exc))]
1061 except error.PushkeyFailed as exc:
1086 advargs = []
1062 # check client caps
1087 if exc.hint is not None:
1063 remotecaps = getattr(exc, '_replycaps', None)
1088 advargs.append(('hint', exc.hint))
1064 if (remotecaps is not None
1089 bundler.addpart(bundle2.bundlepart('error:abort',
1065 and 'pushkey' not in remotecaps.get('error', ())):
1090 manargs, advargs))
1066 # no support remote side, fallback to Abort handler.
1091 except error.PushRaced as exc:
1067 raise
1092 bundler.newpart('error:pushraced', [('message', str(exc))])
1068 part = bundler.newpart('error:pushkey')
1093 return streamres_legacy(gen=bundler.getchunks())
1069 part.addparam('in-reply-to', exc.partid)
1070 if exc.namespace is not None:
1071 part.addparam('namespace', exc.namespace,
1072 mandatory=False)
1073 if exc.key is not None:
1074 part.addparam('key', exc.key, mandatory=False)
1075 if exc.new is not None:
1076 part.addparam('new', exc.new, mandatory=False)
1077 if exc.old is not None:
1078 part.addparam('old', exc.old, mandatory=False)
1079 if exc.ret is not None:
1080 part.addparam('ret', exc.ret, mandatory=False)
1081 except error.BundleValueError as exc:
1082 errpart = bundler.newpart('error:unsupportedcontent')
1083 if exc.parttype is not None:
1084 errpart.addparam('parttype', exc.parttype)
1085 if exc.params:
1086 errpart.addparam('params', '\0'.join(exc.params))
1087 except error.Abort as exc:
1088 manargs = [('message', str(exc))]
1089 advargs = []
1090 if exc.hint is not None:
1091 advargs.append(('hint', exc.hint))
1092 bundler.addpart(bundle2.bundlepart('error:abort',
1093 manargs, advargs))
1094 except error.PushRaced as exc:
1095 bundler.newpart('error:pushraced', [('message', str(exc))])
1096 return streamres_legacy(gen=bundler.getchunks())
@@ -320,15 +320,13 b' def _callhttp(repo, req, proto, cmd):'
320 req.respond(HTTP_OK, mediatype)
320 req.respond(HTTP_OK, mediatype)
321 return gen
321 return gen
322 elif isinstance(rsp, wireproto.pushres):
322 elif isinstance(rsp, wireproto.pushres):
323 val = proto.restore()
323 rsp = '%d\n%s' % (rsp.res, rsp.output)
324 rsp = '%d\n%s' % (rsp.res, val)
325 req.respond(HTTP_OK, HGTYPE, body=rsp)
324 req.respond(HTTP_OK, HGTYPE, body=rsp)
326 return []
325 return []
327 elif isinstance(rsp, wireproto.pusherr):
326 elif isinstance(rsp, wireproto.pusherr):
328 # This is the httplib workaround documented in _handlehttperror().
327 # This is the httplib workaround documented in _handlehttperror().
329 req.drain()
328 req.drain()
330
329
331 proto.restore()
332 rsp = '0\n%s\n' % rsp.res
330 rsp = '0\n%s\n' % rsp.res
333 req.respond(HTTP_OK, HGTYPE, body=rsp)
331 req.respond(HTTP_OK, HGTYPE, body=rsp)
334 return []
332 return []
General Comments 0
You need to be logged in to leave comments. Login now