class SocketIO:
+ nextseq = 0
+
def __init__(self, sock, objtable=None, debugging=None):
self.mainthread = threading.currentThread()
if debugging is not None:
return ("EXCEPTION", (mod, name, args, tb))
def remotecall(self, oid, methodname, args, kwargs):
- self.debug("remotecall:")
+ self.debug("calling asynccall via remotecall")
seq = self.asynccall(oid, methodname, args, kwargs)
return self.asyncreturn(seq)
def asynccall(self, oid, methodname, args, kwargs):
request = ("call", (oid, methodname, args, kwargs))
- seq = self.putrequest(request)
- self.debug(("asyncall:%d:" % seq), oid, methodname, args, kwargs)
+ seq = self.newseq()
+ self.debug(("asynccall:%d:" % seq), oid, methodname, args, kwargs)
+ self.putmessage((seq, request))
return seq
def asyncreturn(self, seq):
+ self.debug("asyncreturn:%d:call getresponse(): " % seq)
response = self.getresponse(seq)
- self.debug(("asyncreturn:%d:" % seq), response)
+ self.debug(("asyncreturn:%d:response: " % seq), response)
return self.decoderesponse(response)
def decoderesponse(self, response):
if how == "OK":
return what
if how == "EXCEPTION":
- self.debug("decoderesponse: Internal EXCEPTION:", what)
+ self.debug("decoderesponse: EXCEPTION:", what)
mod, name, args, tb = what
self.traceback = tb
if mod: # not string exception
raise SystemError, (how, what)
def mainloop(self):
+ """Listen on socket until I/O not ready or EOF
+
+ pollpacket() will loop looking for seq number None, which never
+ comes. The loop will exit when self.ioready() returns 0.
+
+ """
try:
self.getresponse(None)
except EOFError:
return obj
def _getresponse(self, myseq):
+ self.debug("_getresponse:myseq:", myseq)
if threading.currentThread() is self.mainthread:
- # Main thread: does all reading of requests and responses
+ # Main thread: does all reading of requests or responses
+ # Loop here until there is message traffic on the socket
while 1:
response = self.pollresponse(myseq, None)
if response is not None:
del self.responses[myseq]
del self.cvars[myseq]
self.statelock.release()
- return response
-
- def putrequest(self, request):
- seq = self.newseq()
- self.putmessage((seq, request))
- return seq
-
- nextseq = 0
+ return response # might be None
def newseq(self):
self.nextseq = seq = self.nextseq + 2
return seq
def putmessage(self, message):
- ##self.debug("putmessage: ", message)
+ self.debug("putmessage:%d:" % message[0])
try:
s = pickle.dumps(message)
except:
return message
def pollresponse(self, myseq, wait=0.0):
- # Loop while there's no more buffered input or until specific response
+ """Handle messages received on the socket.
+
+ Some messages received may be asynchronous 'call' commands, and
+ some may be responses intended for other threads.
+
+ Loop until message with myseq sequence number is received. Save others
+ in self.responses and notify the owning thread, except that 'call'
+ commands are handed off to localcall() and the response sent back
+ across the link with the appropriate sequence number.
+
+ """
while 1:
message = self.pollmessage(wait)
- if message is None:
+ if message is None: # socket not ready
return None
wait = 0.0
seq, resq = message
+ self.debug("pollresponse:%d:myseq:%s" % (seq, myseq))
if resq[0] == "call":
- self.debug("call_localcall:%d:" % seq)
+ self.debug("pollresponse:%d:call_localcall" % seq)
response = self.localcall(resq)
+ self.debug("pollresponse:%d:response:%s" % (seq, response))
self.putmessage((seq, response))
continue
elif seq == myseq:
def accept(self):
working_sock, address = self.listening_sock.accept()
if self.debugging:
- print>>sys.__stderr__, "** Connection request from ", address
+ print>>sys.__stderr__, "****** Connection request from ", address
if address[0] == '127.0.0.1':
SocketIO.__init__(self, working_sock)
else: