message = dns.message.from_wire(data)
return message
+ @classmethod
+ def sendTCPQueries(cls, queries, timeout=2.0):
+ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ if timeout:
+ sock.settimeout(timeout)
+
+ sock.connect(("127.0.0.1", cls._recursorPort))
+ data = []
+ try:
+ for query in queries:
+ wire = query.to_wire()
+ sock.send(struct.pack("!H", len(wire)))
+ sock.send(wire)
+ for i in range(len(queries)):
+ try:
+ datalen = sock.recv(2)
+ if datalen:
+ (datalen,) = struct.unpack("!H", datalen)
+ data.append(sock.recv(datalen))
+ except socket.timeout as e:
+ continue
+ except socket.error as e:
+ print("Network error: %s" % (str(e)))
+ data = None
+ finally:
+ sock.close()
+
+ messages = []
+ for d in data:
+ messages.append(dns.message.from_wire(d))
+ return messages
+
def setUp(self):
# This function is called before every tests
- return
+ super(RecursorTest, self).setUp()
## Functions for comparisons
def assertMessageHasFlags(self, msg, flags, ednsflags=[]):