import os
import sys
import logging
-import struct
try: # Python 2
import SocketServer as socketserver
except ImportError: # Python 3
# The strings that indicate the test framework is checking our aliveness
-VERIFIED_REQ = b"verifiedserver"
-VERIFIED_RSP = b"WE ROOLZ: {pid}"
+VERIFIED_REQ = "verifiedserver"
+VERIFIED_RSP = "WE ROOLZ: {pid}"
def telnetserver(options):
if options.pidfile:
pid = os.getpid()
with open(options.pidfile, "w") as f:
- f.write(b"{0}".format(pid))
+ f.write(str(pid))
local_bind = (HOST, options.port)
log.info("Listening on %s", local_bind)
data = neg.recv(1024)
log.debug("Incoming data: %r", data)
- if VERIFIED_REQ in data:
+ if VERIFIED_REQ.encode('ascii') in data:
log.debug("Received verification request from test framework")
- response_data = VERIFIED_RSP.format(pid=os.getpid())
+ response = VERIFIED_RSP.format(pid=os.getpid())
+ response_data = response.encode('ascii')
else:
log.debug("Received normal request - echoing back")
response_data = data.strip()
# TCP failed to give us any data. Break out.
break
- for byte in data:
- byte_int = self.byte_to_int(byte)
-
+ for byte_int in bytearray(data):
if self.state == self.NO_NEG:
- self.no_neg(byte, byte_int, buffer)
+ self.no_neg(byte_int, buffer)
elif self.state == self.START_NEG:
self.start_neg(byte_int)
elif self.state in [self.WILL, self.WONT, self.DO, self.DONT]:
return buffer
- def byte_to_int(self, byte):
- return struct.unpack(b'B', byte)[0]
-
- def no_neg(self, byte, byte_int, buffer):
+ def no_neg(self, byte_int, buffer):
# Not negotiating anything thus far. Check to see if we
# should.
if byte_int == NegTokens.IAC:
self.state = self.START_NEG
else:
# Just append the incoming byte to the buffer
- buffer.append(byte)
+ buffer.append(byte_int)
def start_neg(self, byte_int):
# In a negotiation.
self.state)
self.state = self.NO_NEG
- def send_message(self, message):
- packed_message = self.pack(message)
- self.tcp.sendall(packed_message)
-
- def pack(self, arr):
- return struct.pack(b'{0}B'.format(len(arr)), *arr)
+ def send_message(self, message_ints):
+ self.tcp.sendall(bytearray(message_ints))
def send_iac(self, arr):
message = [NegTokens.IAC]