From 75abe3f7a5fa57fd5ce4a792c4413a06151f0e13 Mon Sep 17 00:00:00 2001 From: Kris Maglione Date: Thu, 21 May 2009 14:38:38 -0400 Subject: [PATCH] Add ridiculously overwrought asynchronous 9P client. --- alternative_wmiircs/python/pygmi/__init__.py | 2 +- alternative_wmiircs/python/pygmi/fs.py | 20 +- .../python/pyxp/asyncclient.py | 206 ++++++++++++++++++ alternative_wmiircs/python/pyxp/client.py | 100 +++++---- alternative_wmiircs/python/pyxp/fcall.py | 2 +- alternative_wmiircs/python/pyxp/fields.py | 22 +- alternative_wmiircs/python/pyxp/mux.py | 132 ++++++----- alternative_wmiircs/python/wmiirc | 6 +- 8 files changed, 374 insertions(+), 116 deletions(-) create mode 100644 alternative_wmiircs/python/pyxp/asyncclient.py diff --git a/alternative_wmiircs/python/pygmi/__init__.py b/alternative_wmiircs/python/pygmi/__init__.py index ac9cc4c2..8f75fd67 100644 --- a/alternative_wmiircs/python/pygmi/__init__.py +++ b/alternative_wmiircs/python/pygmi/__init__.py @@ -1,7 +1,7 @@ import os import sys -from pyxp import Client +from pyxp.asyncclient import Client if 'WMII_ADDRESS' in os.environ: client = Client(os.environ['WMII_ADDRESS']) diff --git a/alternative_wmiircs/python/pygmi/fs.py b/alternative_wmiircs/python/pygmi/fs.py index a843ed16..4a93c768 100644 --- a/alternative_wmiircs/python/pygmi/fs.py +++ b/alternative_wmiircs/python/pygmi/fs.py @@ -16,7 +16,7 @@ class Ctl(object): pass def ctl(self, msg): - client.write(self.ctl_path, msg) + client.awrite(self.ctl_path, msg) def __getitem__(self, key): for line in self.ctl_lines(): @@ -58,7 +58,7 @@ class Ctl(object): lines = tuple(client.readlines(self.ctl_path)) if self.ctl_hasid: lines = lines[1:] - return lines[:-1] + return lines _id = None @property @@ -111,7 +111,7 @@ class Dir(Ctl): def __set__(self, dir, val): if not self.writable: raise NotImplementedError('File %s is not writable' % self.name) - return client.write('%s/%s' % (dir.path, self.name), val) + return client.awrite('%s/%s' % (dir.path, self.name), val) @property def ctl_path(self): @@ -341,13 +341,9 @@ class Button(object): def create(self, colors=None, label=None): with client.create(self.path, OWRITE) as f: - if colors or label: - f.write(self.getval(colors, label)) + f.write(self.getval(colors, label)) def remove(self): - try: - client.remove(self.path) - except Exception: - pass + client.aremove(self.path) def getval(self, colors=None, label=None): if colors is None: @@ -358,7 +354,7 @@ class Button(object): colors = property( lambda self: tuple(map(Color, client.read(self.path).split(' ')[:3])), - lambda self, val: client.write(self.path, self.getval(colors=val))) + lambda self, val: client.awrite(self.path, self.getval(colors=val))) label = property( lambda self: client.read(self.path).split(' ', 3)[3], @@ -480,7 +476,7 @@ class Rules(collections.MutableMapping): assert '/' not in k and '\n' not in v lines.append('/%s/ -> %s' % (k, v)) lines.append('') - client.write(self.path, '\n'.join(lines)) + client.awrite(self.path, '\n'.join(lines)) def iteritems(self): for line in client.readlines(self.path): @@ -520,7 +516,7 @@ class Tags(object): self.add(t.id) for b in wmii.lbuttons: if b.name not in self.tags: - b.remove() + b.aremove() self.focus(Tag('sel').id) self.mru = [self.sel.id] diff --git a/alternative_wmiircs/python/pyxp/asyncclient.py b/alternative_wmiircs/python/pyxp/asyncclient.py new file mode 100644 index 00000000..2a08664e --- /dev/null +++ b/alternative_wmiircs/python/pyxp/asyncclient.py @@ -0,0 +1,206 @@ +from pyxp import client, fcall +from pyxp.client import * + +def awithfile(*oargs, **okwargs): + def wrapper(fn): + def next(self, path, *args, **kwargs): + def next(file, exc, tb): + fn(self, (file, exc, tb), *args, **kwargs) + self.aopen(path, next, *oargs, **okwargs) + return next + return wrapper +def wrap_callback(fn, file): + file.called = 0 + def callback(data, exc, tb): + file.called += 1 + file.close() + if callable(fn): + fn(data, exc, tb) + return callback + +class Client(client.Client): + ROOT_FID = 0 + + def awalk(self, path, async, fail=None): + ctxt = dict(path=path, fid=self.getfid(), ofid=ROOT_FID) + def next(resp=None, exc=None, tb=None): + if exc and ctxt['ofid'] != ROOT_FID: + self.aclunk(ctxt['fid']) + if not ctxt['path'] and resp or exc: + if exc and fail: + return self.respond(fail, None, exc, tb) + return self.respond(async, ctxt['fid'], exc, tb) + wname = ctxt['path'][:fcall.MAX_WELEM] + ofid = ctxt['ofid'] + ctxt['path'] = ctxt['path'][fcall.MAX_WELEM:] + if resp: + ctxt['ofid'] = ctxt['fid'] + self.dorpc(fcall.Twalk(fid=ofid, + newfid=ctxt['fid'], + wname=wname), + next) + next() + + def _open(self, path, mode, open): + resp = None + + with self.walk(path) as nfid: + fid = nfid + resp = self.dorpc(open(fid)) + + def cleanup(): + self.aclunk(fid) + file = File(self, '/'.join(path), resp, fid, mode, cleanup) + self.files[fid] = file + + return file + + def _aopen(self, path, mode, open, callback): + resp = None + def next(fid, exc, tb): + def next(resp, exc, tb): + def cleanup(): + self.clunk(fid) + file = File(self, '/'.join(path), resp, fid, mode, cleanup) + self.files[fid] = file + self.respond(callback, file) + self.dorpc(open(fid), next, callback) + self.awalk(path, next, callback) + + def aopen(self, path, callback=True, mode=OREAD): + assert callable(callback) + path = self.splitpath(path) + def open(fid): + return fcall.Topen(fid=fid, mode=mode) + return self._aopen(path, mode, open, callback) + + def acreate(self, path, callback=True, mode=OREAD, perm=0): + path = self.splitpath(path) + name = path.pop() + def open(fid): + return fcall.Tcreate(fid=fid, mode=mode, name=name, perm=perm) + if not callable(callback): + def callback(resp, exc, tb): + if resp: + resp.close() + return self._aopen(path, mode, open, async) + + def aremove(self, path, callback=True): + path = self.splitpath(path) + def next(fid, exc, tb): + self.dorpc(fcall.Tremove(fid=fid), callback) + self.awalk(path, next, callback) + + def astat(self, path, callback): + path = self.splitpath(path) + def next(fid, exc, tb): + def next(resp, exc, tb): + callback(resp.stat, exc, tb) + self.dorpc(fcall.Tstat(fid=fid), next, callback) + + @awithfile() + def aread(self, (file, exc, tb), callback, *args, **kwargs): + if exc: + callback(file, exc, tb) + else: + file.aread(wrap_callback(callback, file), *args, **kwargs) + @awithfile(mode=OWRITE) + def awrite(self, (file, exc, tb), data, callback=True, *args, **kwargs): + if exc: + self.respond(callback, file, exc, tb) + else: + file.awrite(data, wrap_callback(callback, file), *args, **kwargs) + @awithfile() + def areadlines(self, (file, exc, tb), fn): + def callback(resp): + if resp is None: + file.close() + if fn(resp) is False: + file.close() + return False + if exc: + callback(None) + else: + file.sreadlines(callback) + +class File(client.File): + @staticmethod + def respond(callback, data, exc=None, tb=None): + if callable(callback): + callback(data, exc, tb) + + def stat(self, callback): + def next(resp, exc, tb): + callback(resp.stat, exc, tb) + resp = self.dorpc(fcall.Tstat(), next, callback) + + def aread(self, callback, count=None, offset=None, buf=''): + ctxt = dict(res=[], count=self.iounit, offset=self.offset) + if count is not None: + ctxt['count'] = count + if offset is not None: + ctxt['offset'] = offset + def next(resp=None, exc=None, tb=None): + if resp and resp.data: + ctxt['res'].append(resp.data) + ctxt['offset'] += len(resp.data) + if ctxt['count'] == 0: + if offset is None: + self.offset = ctxt['offset'] + return callback(''.join(ctxt['res']), exc, tb) + + n = min(ctxt['count'], self.iounit) + ctxt['count'] -= n + + self.dorpc(fcall.Tread(offset=ctxt['offset'], count=n), + next, callback) + next() + + def areadlines(self, callback): + ctxt = dict(last=None) + def next(data, exc, tb): + res = True + if data: + lines = data.split('\n') + if ctxt['last']: + lines[0] = ctxt['last'] + lines[0] + for i in range(0, len(lines) - 1): + res = callback(lines[i]) + if res is False: + break + ctxt['last'] = lines[-1] + if res is not False: + self.aread(next) + else: + if ctxt['last']: + callback(ctxt['last']) + callback(None) + self.aread(next) + + def awrite(self, data, callback, offset=None): + ctxt = dict(offset=self.offset, off=0) + if offset is not None: + ctxt['offset'] = offset + def next(resp=None, exc=None, tb=None): + if resp: + ctxt['off'] += resp.count + ctxt['offset'] += resp.count + if ctxt['off'] < len(data): + n = min(len(data), self.iounit) + + self.dorpc(fcall.Twrite(offset=ctxt['offset'], + data=data[ctxt['off']:ctxt['off']+n]), + next, callback) + else: + if offset is None: + self.offset = ctxt['offset'] + self.respond(callback, ctxt['off'], exc, tb) + next() + + def aremove(self, callback=True): + def next(resp, exc, tb): + self.close() + self.respond(resp and True, exc, tb) + self.dorpc(fcall.Tremove(), next) + +# vim:se sts=4 sw=4 et: diff --git a/alternative_wmiircs/python/pyxp/client.py b/alternative_wmiircs/python/pyxp/client.py index f019d853..8215a31c 100644 --- a/alternative_wmiircs/python/pyxp/client.py +++ b/alternative_wmiircs/python/pyxp/client.py @@ -1,5 +1,4 @@ -# Copyright (C) 2007 Kris Maglione -# See PERMISSIONS +# Copyright (C) 2009 Kris Maglione import operator import os @@ -44,6 +43,12 @@ class RPCError(Exception): class Client(object): ROOT_FID = 0 + @staticmethod + def respond(callback, data, exc=None, tb=None): + if callable(callback): + callback(data, exc, tb) + + def __enter__(self): return self def __exit__(self, *args): @@ -89,31 +94,50 @@ class Client(object): self.mux.fd.close() self.mux = None - def dorpc(self, req): - resp = self.mux.rpc(req) - if isinstance(resp, fcall.Rerror): - raise RPCError, "RPC returned error (%s): %s" % ( - req.__class__.__name__, resp.ename) - if req.type != resp.type ^ 1: - raise ProtocolException, "Missmatched RPC message types: %s => %s" % ( - req.__class__.__name__, resp.__class__.__name__) - return resp + def dorpc(self, req, callback=None, error=None): + def doresp(resp): + if isinstance(resp, fcall.Rerror): + raise RPCError, "%s[%d] RPC returned error: %s" % ( + req.__class__.__name__, resp.tag, resp.ename) + if req.type != resp.type ^ 1: + raise ProtocolException, "Missmatched RPC message types: %s => %s" % ( + req.__class__.__name__, resp.__class__.__name__) + return resp + def next(mux, resp): + try: + res = doresp(resp) + except Exception, e: + if error: + self.respond(error, None, e, None) + else: + self.respond(callback, None, e, None) + else: + self.respond(callback, res) + if not callback: + return doresp(self.mux.rpc(req)) + self.mux.rpc(req, next) def splitpath(self, path): return [v for v in path.split('/') if v != ''] def getfid(self): with self.lock: - if len(self.fids): + if self.fids: return self.fids.pop() - else: - self.lastfid += 1 - return self.lastfid + self.lastfid += 1 + return self.lastfid def putfid(self, fid): with self.lock: self.files.pop(fid) self.fids.append(fid) + def aclunk(self, fid, callback=None): + def next(resp, exc, tb): + if resp: + self.putfid(fid) + self.respond(callback, resp, exc, tb) + self.dorpc(fcall.Tclunk(fid=fid), next) + def clunk(self, fid): try: self.dorpc(fcall.Tclunk(fid=fid)) @@ -133,9 +157,9 @@ class Client(object): @apply class Res: - def __enter__(self): + def __enter__(res): return fid - def __exit__(self, exc_type, exc_value, traceback): + def __exit__(res, exc_type, exc_value, traceback): if exc_type: self.clunk(fid) return Res @@ -148,20 +172,20 @@ class Client(object): resp = self.dorpc(open(fid)) def cleanup(): - self.clunk(fid) - file = File(self, resp, fid, mode, cleanup) + self.aclunk(fid) + file = File(self, '/'.join(path), resp, fid, mode, cleanup) self.files[fid] = file return file - def open(self, path, mode = OREAD): + def open(self, path, mode=OREAD): path = self.splitpath(path) def open(fid): return fcall.Topen(fid=fid, mode=mode) return self._open(path, mode, open) - def create(self, path, mode = OREAD, perm = 0): + def create(self, path, mode=OREAD, perm=0): path = self.splitpath(path) name = path.pop() @@ -209,22 +233,23 @@ class File(object): def __exit__(self, *args): self.close() - def __init__(self, client, fcall, fid, mode, cleanup): + def __init__(self, client, path, fcall, fid, mode, cleanup): self.lock = RLock() self.client = client + self.path = path self.fid = fid self.cleanup = cleanup self.mode = mode self.iounit = fcall.iounit self.qid = fcall.qid + self.closed = False self.offset = 0 - self.fd = None - def dorpc(self, fcall): + def dorpc(self, fcall, async=None, error=None): if hasattr(fcall, 'fid'): fcall.fid = self.fid - return self.client.dorpc(fcall) + return self.client.dorpc(fcall, async, error) def stat(self): resp = self.dorpc(fcall.Tstat()) @@ -262,6 +287,9 @@ class File(object): if not data: break lines = data.split('\n') + if last: + lines[0] = last + lines[0] + last = None for i in range(0, len(lines) - 1): yield lines[i] last = lines[-1] @@ -300,15 +328,13 @@ class File(object): yield s def close(self): - try: - if self.fd: - self.fd_close() - finally: - self.cleanup() - self.tg = None - self.fid = None - self.client = None - self.qid = None + assert not self.closed + self.closed = True + self.cleanup() + self.tg = None + self.fid = None + self.client = None + self.qid = None def remove(self): try: @@ -319,10 +345,4 @@ class File(object): except Exception: pass - def fd_close(self): - try: - self.fd.close() - finally: - self.fd = None - # vim:se sts=4 sw=4 et: diff --git a/alternative_wmiircs/python/pyxp/fcall.py b/alternative_wmiircs/python/pyxp/fcall.py index 15f4601f..8e3c264d 100644 --- a/alternative_wmiircs/python/pyxp/fcall.py +++ b/alternative_wmiircs/python/pyxp/fcall.py @@ -61,7 +61,7 @@ class Rattach(Fcall): class Terror(Fcall): def __init__(self): - raise Error("Illegal 9P tag 'Terror' encountered") + raise Exception("Illegal 9P tag 'Terror' encountered") class Rerror(Fcall): ename = String() diff --git a/alternative_wmiircs/python/pyxp/fields.py b/alternative_wmiircs/python/pyxp/fields.py index 9fce5f81..e6e0311f 100644 --- a/alternative_wmiircs/python/pyxp/fields.py +++ b/alternative_wmiircs/python/pyxp/fields.py @@ -74,13 +74,9 @@ class Date(Int): def marshall(self, val): return self.encode(int(val.strftime('%s'))) -# To do: use unicode strings, ensure UTF-8. -# Not a problem in Python 3K, but there the other -# data blobs are. -class String(Int): +class Data(Int): def __init__(self, size=2): - super(String, self).__init__(size) - + super(Data, self).__init__(size) def unmarshall(self, data, offset): n = self.decode(data, offset) offset += self.size @@ -89,8 +85,18 @@ class String(Int): def marshall(self, val): return [self.encode(len(val)), val] -class Data(String): - pass +# Note: Py3K strings are Unicode by default. They can't store binary +# data. +class String(Data): + def unmarshall(self, data, offset): + off, val = super(String, self).unmarshall(data, offset) + return off, val.decode('UTF-8') + def marshall(self, val): + if isinstance(val, str): + str.decode('UTF-8') + else: + val = val.encode('UTF-8') + return super(String, self).marshall(val) class Array(Int): def __init__(self, size, spec): diff --git a/alternative_wmiircs/python/pyxp/mux.py b/alternative_wmiircs/python/pyxp/mux.py index 42272df3..553f9741 100644 --- a/alternative_wmiircs/python/pyxp/mux.py +++ b/alternative_wmiircs/python/pyxp/mux.py @@ -14,6 +14,9 @@ # The above copyright notice and this permission notice shall be # included in all copies or substantial portions of the Software. +import sys +import traceback + from pyxp import fields from pyxp.dial import dial from threading import * @@ -22,13 +25,14 @@ Condition = Condition().__class__ __all__ = 'Mux', class Mux(object): - def __init__(self, con, process, mintag=0, maxtag=1<<16 - 1): + def __init__(self, con, process, flush=None, mintag=0, maxtag=1<<16 - 1): self.queue = set() self.lock = RLock() self.rendez = Condition(self.lock) self.outlock = RLock() self.inlock = RLock() self.process = process + self.flush = flush self.wait = {} self.free = set(range(mintag, maxtag)) self.mintag = mintag @@ -42,60 +46,78 @@ class Mux(object): if self.fd is None: raise Exception("No connection") - def rpc(self, dat): - r = self.newrpc(dat) - + def mux(self, rpc): try: + rpc.waiting = True self.lock.acquire() - while self.muxer and self.muxer != r and r.data is None: - r.wait() + while self.muxer and self.muxer != rpc and rpc.data is None: + rpc.wait() - if r.data is None: - if self.muxer and self.muxer != r: - self.fail() - self.muxer = r + if rpc.data is None: + assert not self.muxer or self.muxer is rpc + self.muxer = rpc self.lock.release() try: - while r.data is None: + while rpc.data is None: data = self.recv() if data is None: self.lock.acquire() - self.queue.remove(r) + self.queue.remove(rpc) raise Exception("unexpected eof") self.dispatch(data) - self.lock.acquire() finally: + self.lock.acquire() self.electmuxer() - self.puttag(r) except Exception, e: - import sys - import traceback traceback.print_exc(sys.stdout) - print e + if self.flush: + self.flush(self, rpc.data) + raise e finally: if self.lock._is_owned(): self.lock.release() - return r.data + + if rpc.async: + if callable(rpc.async): + rpc.async(self, rpc.data) + else: + return rpc.data + + def rpc(self, dat, async=None): + rpc = self.newrpc(dat, async) + if async: + with self.lock: + if self.muxer is None: + self.electmuxer() + else: + return self.mux(rpc) def electmuxer(self): + async = None for rpc in self.queue: - if self.muxer != rpc and rpc.async == False: - self.muxer = rpc - rpc.notify() - return + if self.muxer != rpc: + if rpc.async: + async = rpc + else: + self.muxer = rpc + rpc.notify() + return self.muxer = None + if async: + self.muxer = async + Thread(target=self.mux, args=(async,)).start() def dispatch(self, dat): - tag = dat.tag - self.mintag - r = None + tag = dat.tag + rpc = None with self.lock: - r = self.wait.get(tag, None) - if r is None or r not in self.queue: - print "bad rpc tag: %u (no one waiting on it)" % dat.tag + rpc = self.wait.get(tag, None) + if rpc is None or rpc not in self.queue: + #print "bad rpc tag: %u (no one waiting on it)" % dat.tag return - self.queue.remove(r) - r.data = dat - r.notify() + self.puttag(rpc) + self.queue.remove(rpc) + rpc.dispatch(dat) def gettag(self, r): tag = 0 @@ -111,16 +133,13 @@ class Mux(object): self.wait[tag] = r r.tag = tag - r.data.tag = r.tag - r.data = None + r.orig.tag = r.tag return r.tag - def puttag(self, r): - t = r.tag - if self.wait.get(t, None) != r: - self.fail() - del self.wait[t] - self.free.add(t) + def puttag(self, rpc): + if rpc.tag in self.wait: + del self.wait[rpc.tag] + self.free.add(rpc.tag) self.rendez.notify() def send(self, dat): @@ -128,17 +147,20 @@ class Mux(object): n = self.fd.send(data) return n == len(data) def recv(self): - data = self.fd.recv(4) - if data: - len = fields.Int.decoders[4](data, 0) - data += self.fd.recv(len - 4) - return self.process(data) + try: + with self.inlock: + data = self.fd.recv(4) + if data: + len = fields.Int.decoders[4](data, 0) + data += self.fd.recv(len - 4) + return self.process(data) + except Exception, e: + traceback.print_exc(sys.stdout) + print repr(data) + return None - def fail(): - raise Exception() - - def newrpc(self, dat): - rpc = Rpc(self, dat) + def newrpc(self, dat, async=None): + rpc = Rpc(self, dat, async) tag = None with self.lock: @@ -153,11 +175,19 @@ class Mux(object): self.puttag(rpc) class Rpc(Condition): - def __init__(self, mux, data): + def __init__(self, mux, data, async=None): super(Rpc, self).__init__(mux.lock) self.mux = mux + self.orig = data + self.data = None + self.waiting = False + self.async = async + + def dispatch(self, data=None): self.data = data - self.waiting = True - self.async = False + if not self.async or self.waiting: + self.notify() + elif callable(self.async): + Thread(target=self.async, args=(self.mux, data)).start() # vim:se sts=4 sw=4 et: diff --git a/alternative_wmiircs/python/wmiirc b/alternative_wmiircs/python/wmiirc index 77055ad0..661687b1 100644 --- a/alternative_wmiircs/python/wmiirc +++ b/alternative_wmiircs/python/wmiirc @@ -65,7 +65,7 @@ def unresponsive_client(client): # End Configuration -client.write('/event', 'Start wmiirc') +client.awrite('/event', 'Start wmiirc') tags = Tags() bind_events({ @@ -105,7 +105,7 @@ class Actions(events.Actions): def exec_(self, args=''): wmii['exec'] = args def exit(self, args=''): - client.write('/event', 'Quit') + client.awrite('/event', 'Quit') program_menu = Menu(histfile='%s/history.prog' % confpath[0], nhist=5000, action=curry(call, 'wmiir', 'setsid', @@ -140,7 +140,7 @@ class Notice(Button): self.label = '' def write(self, notice): - client.write('/event', 'Notice %s' % notice.replace('\n', ' ')) + client.awrite('/event', 'Notice %s' % notice.replace('\n', ' ')) def show(self, notice): if self.timer: