[python] Rephrase async client request chains as coroutines.

This commit is contained in:
Kris Maglione 2010-07-09 17:48:46 -04:00
parent f4d2ca3261
commit 71891f6f00
4 changed files with 148 additions and 147 deletions

View File

@ -2,196 +2,194 @@ from pyxp import client, fcall
from pyxp.client import * from pyxp.client import *
from functools import wraps from functools import wraps
def awithfile(*oargs, **okwargs): def send(iter, val, default=None):
def wrapper(fn): try:
@wraps(fn) return iter.send(val)
def next(self, path, *args, **kwargs): except StopIteration:
def next(file, exc, tb): return default
fn(self, (file, exc, tb), *args, **kwargs)
self.aopen(path, next, *oargs, **okwargs) def awithfile(fn):
return next @wraps(fn)
def wrapper(self, path, *args, **kwargs):
gen = fn(self, *args, **kwargs)
callback, fail, mode = next(gen)
def cont(file):
send(gen, file)
self.aopen(path, cont, fail=fail or callback, mode=mode)
return wrapper return wrapper
def wrap_callback(fn, file): def requestchain(fn):
def callback(data, exc, tb): @wraps(fn)
file.close() def wrapper(self, *args, **kwargs):
Client.respond(fn, data, exc, tb) gen = fn(self, *args, **kwargs)
return callback callback, fail = next(gen)
def cont(val):
data = gen.send(val)
if isinstance(data, fcall.Fcall):
self._dorpc(data, cont, fail or callback)
else:
Client.respond(callback, data)
cont(None)
return wrapper
class Client(client.Client): class Client(client.Client):
ROOT_FID = 0 ROOT_FID = 0
def _awalk(self, path, callback, fail=None): def _awalk(fn):
path = self._splitpath(path) @wraps(fn)
ctxt = dict(path=path, fid=self._getfid(), ofid=ROOT_FID) @requestchain
def wrapper(self, *args, **kwargs):
gen = fn(self, *args, **kwargs)
path, callback, fail = next(gen)
def next(resp=None, exc=None, tb=None): path = self._splitpath(path)
if exc and ctxt['ofid'] != ROOT_FID: fid = self._getfid()
self._aclunk(ctxt['fid']) ofid = ROOT_FID
ctxt['fid'] = None
if not ctxt['path'] and resp or exc: def fail_(resp, exc, tb):
return self.respond(fail if exc and fail else callback, if ofid != ROOT_FID:
ctxt['fid'], exc, tb) self._aclunk(fid)
self.respond(fail or callback, resp, exc, tb)
yield callback, fail_
wname = ctxt['path'][:fcall.MAX_WELEM] while path:
ofid = ctxt['ofid'] wname = path[:fcall.MAX_WELEM]
ctxt['path'] = ctxt['path'][fcall.MAX_WELEM:] path = path[fcall.MAX_WELEM:]
if resp:
ctxt['ofid'] = ctxt['fid']
self._dorpc(fcall.Twalk(fid=ofid, newfid=ctxt['fid'], wname=wname), resp = yield fcall.Twalk(fid=ofid, newfid=fid, wname=wname)
next) ofid = fid
next()
resp = fid
while resp is not None:
resp = yield send(gen, resp)
return wrapper
_file = property(lambda self: File) _file = property(lambda self: File)
@_awalk
def _aopen(self, path, mode, fcall, callback, fail=None, origpath=None): def _aopen(self, path, mode, fcall, callback, fail=None, origpath=None):
path = self._splitpath(path) path = self._splitpath(path)
def next(fid, exc, tb): fcall.fid = yield path, callback, fail
def next(resp, exc, tb): resp = yield fcall
file = self._file(self, origpath or '/'.join(path), resp, fid, mode, yield self._file(self, origpath or '/'.join(path), resp, fcall.fid, mode,
cleanup=lambda: self._clunk(fid)) cleanup=lambda: self._clunk(fcall.fid))
self.respond(callback, file)
fcall.fid = fid
self._dorpc(fcall, next, fail or callback)
self._awalk(path, next, fail or callback)
def aopen(self, path, callback=True, fail=None, mode=OREAD): def aopen(self, path, callback=True, fail=None, mode=OREAD):
assert callable(callback) assert callable(callback)
return self._aopen(path, mode, fcall.Topen(mode=mode), self._aopen(path, mode, fcall.Topen(mode=mode), callback, fail)
callback, fail)
def acreate(self, path, callback=True, fail=None, mode=OREAD, perm=0): def acreate(self, path, callback=True, fail=None, mode=OREAD, perm=0):
path = self._splitpath(path) path = self._splitpath(path)
name = path.pop() name = path.pop()
if not callable(callback): self._aopen(path, mode,
callback = lambda resp: resp and resp.close() fcall.Tcreate(mode=mode, name=name, perm=perm),
callback if callable(callback) else lambda resp: resp and resp.close(),
return self._aopen(path, mode, fcall.Tcreate(mode=mode, name=name, perm=perm), fail, origpath='/'.join(path + [name]))
callback, fail, origpath='/'.join(path + [name]))
@_awalk
def aremove(self, path, callback=True, fail=None): def aremove(self, path, callback=True, fail=None):
def next(fid): yield fcall.Tremove(fid=(yield path, callback, fail))
self._dorpc(fcall.Tremove(fid=fid), callback, fail)
self._awalk(path, next, fail or callback)
@_awalk
def astat(self, path, callback, fail=None): def astat(self, path, callback, fail=None):
def next(fid): resp = yield fcall.Tstat(fid=(yield path, callback, fail))
def next(resp): yield resp.stat
self.respond(callback, resp.stat)
self._dorpc(fcall.Tstat(fid=fid), next, fail or callback)
self._awalk(self, next, fail or callback)
@awithfile() @awithfile
def aread(self, (file, exc, tb), callback, *args, **kwargs): def aread(self, callback, fail=None, count=None, offset=None, buf=''):
if exc: file = yield callback, fail, OREAD
self.respond(callback, file, exc, tb) file.aread(callback, fail, count, offset, buf)
else:
file.aread(wrap_callback(callback, file), *args, **kwargs)
@awithfile(mode=OWRITE) @awithfile
def awrite(self, (file, exc, tb), data, callback=True, *args, **kwargs): def awrite(self, data, callback=True, fail=None, offset=None):
if exc: file = yield callback, fail, OWRITE
self.respond(callback, file, exc, tb) file.awrite(data, callback, fail, offset)
else:
file.awrite(data, wrap_callback(callback, file), *args, **kwargs)
@awithfile() @awithfile
def areadlines(self, (file, exc, tb), fn): def areadlines(self, callback):
def callback(resp): file = yield callback, fail, OREAD
if resp is None: file.areadlines(callback)
file.close()
if fn(resp) is False:
file.close()
return False
if exc:
callback(None)
else:
file.sreadlines(callback)
class File(client.File): class File(client.File):
def stat(self, callback): @requestchain
def next(resp, exc, tb): def stat(self, callback, fail=None):
Client.respond(callback, resp.stat, exc, tb) yield callback, fail
resp = self._dorpc(fcall.Tstat(), next, callback) resp = yield fcall.Tstat()
yield resp.stat
@requestchain
def aread(self, callback, fail=None, count=None, offset=None, buf=''): def aread(self, callback, fail=None, count=None, offset=None, buf=''):
ctxt = dict(res=[], count=self.iounit, offset=self.offset) yield callback, fail
if count is not None: setoffset = offset is None
ctxt['count'] = count if count is None:
if offset is not None: count = self.iounit
ctxt['offset'] = offset if offset is None:
offset = self.offset
def next(resp=None, exc=None, tb=None): res = []
if resp and resp.data: while count > 0:
ctxt['res'].append(resp.data) n = min(count, self.iounit)
ctxt['offset'] += len(resp.data) count -= n
resp = yield fcall.Tread(offset=offset, count=n)
res.append(resp.data)
offset += len(resp.data)
if len(resp.data) == 0:
break
if ctxt['count'] == 0: if setoffset:
if offset is None: self.offset = offset
self.offset = ctxt['offset'] yield ''.join(res)
return Client.respond(callback, ''.join(ctxt['res']), exc, tb)
n = min(ctxt['count'], self.iounit)
ctxt['count'] -= n
self._dorpc(fcall.Tread(offset=ctxt['offset'], count=n),
next, fail or callback)
next()
def areadlines(self, callback): def areadlines(self, callback):
ctxt = dict(last=None) class ctxt:
def next(data, exc, tb): last = None
def cont(data, exc, tb):
res = True res = True
if data: if data:
lines = data.split('\n') lines = data.split('\n')
if ctxt['last']: if ctxt.last:
lines[0] = ctxt['last'] + lines[0] lines[0] = ctxt.last + lines[0]
for i in range(0, len(lines) - 1): for i in range(0, len(lines) - 1):
res = callback(lines[i]) res = callback(lines[i])
if res is False: if res is False:
break return
ctxt['last'] = lines[-1] ctxt.last = lines[-1]
if res is not False: self.aread(cont)
self.aread(next)
else: else:
if ctxt['last']: if ctxt.last:
callback(ctxt['last']) callback(ctxt.last)
callback(None) callback(None)
self.aread(next) self.aread(cont)
@requestchain
def awrite(self, data, callback=True, fail=None, offset=None): def awrite(self, data, callback=True, fail=None, offset=None):
ctxt = dict(offset=self.offset, off=0) yield callback, fail
if offset is not None: setoffset = offset is None
ctxt['offset'] = offset if offset is None:
offset = self.offset
def next(resp=None, exc=None, tb=None): off = 0
if resp: while off < len(data):
ctxt['off'] += resp.count n = min(len(data), self.iounit)
ctxt['offset'] += resp.count resp = yield fcall.Twrite(offset=offset, data=data[off:off+n])
if ctxt['off'] < len(data) or not (exc or resp): off += resp.count
n = min(len(data), self.iounit) offset += resp.count
self._dorpc(fcall.Twrite(offset=ctxt['offset'], if setoffset:
data=data[ctxt['off']:ctxt['off']+n]), self.offset = offset
next, fail or callback) yield off
else:
if offset is None:
self.offset = ctxt['offset']
Client.respond(callback, ctxt['off'], exc, tb)
next()
@requestchain
def aremove(self, callback=True, fail=None): def aremove(self, callback=True, fail=None):
def next(resp, exc, tb): yield callback, fail
self.close() yield fcall.Tremove()
if exc and fail: self.close()
Client.respond(fail, resp and True, exc, tb) yield True
else:
Client.respond(callback, resp and True, exc, tb)
self._dorpc(fcall.Tremove(), next)
# vim:se sts=4 sw=4 et: # vim:se sts=4 sw=4 et:

View File

@ -45,8 +45,10 @@ class Client(object):
@staticmethod @staticmethod
def respond(callback, data, exc=None, tb=None): def respond(callback, data, exc=None, tb=None):
if callable(callback): if hasattr(callback, 'func_code'):
callback(*(data, exc, tb)[0:callback.func_code.co_argcount]) callback(*(data, exc, tb)[0:callback.func_code.co_argcount])
elif callable(callback):
callback(data)
def __enter__(self): def __enter__(self):
return self return self

View File

@ -23,18 +23,20 @@ class Int(Field):
def encoder(cls, n): def encoder(cls, n):
if n not in cls.encoders: if n not in cls.encoders:
exec ('def enc(n):\n' + exec ('def enc(n):\n' +
' assert n == n & 0x%s, "Arithmetic overflow"\n' % ('ff' * n) + ' assert n == n & 0x%s, "Arithmetic overflow"\n' +
' return "".join((' + ','.join( ' return ''.join((%s,))'
'chr((n >> %d) & 0xff)' % (i * 8) ) % ('ff' * n,
for i in range(0, n)) + ',))\n') ','.join('chr((n >> %d) & 0xff)' % (i * 8)
for i in range(0, n)))
cls.encoders[n] = enc cls.encoders[n] = enc
return cls.encoders[n] return cls.encoders[n]
@classmethod @classmethod
def decoder(cls, n): def decoder(cls, n):
if n not in cls.decoders: if n not in cls.decoders:
cls.decoders[n] = eval('lambda data, offset: ' + '|'.join( cls.decoders[n] = eval('lambda data, offset: ' +
'ord(data[offset + %d]) << %d' % (i, i * 8) '|'.join('ord(data[offset + %d]) << %d' % (i, i * 8)
for i in range(0, n))) for i in range(0, n)))
return cls.decoders[n] return cls.decoders[n]
def __init__(self, size): def __init__(self, size):

View File

@ -51,9 +51,8 @@ class Message(object):
vals = {} vals = {}
start = offset start = offset
for field in cls.fields: for field in cls.fields:
size, val = field.unmarshall(data, offset) size, vals[field.name] = field.unmarshall(data, offset)
offset += size offset += size
vals[field.name] = val
return offset - start, cls(**vals) return offset - start, cls(**vals)
def marshall(self): def marshall(self):
res = [] res = []