cls._UDPResponder.start()
def checkResponseContent(self, rawResponse, value):
- self.assertEquals(len(rawResponse), self._udpTruncationThreshold)
- response = dns.message.from_wire(rawResponse)
+ response = dns.message.from_wire(rawResponse)
- self.assertRcodeEqual(response, dns.rcode.NOERROR)
- self.assertFalse(response.flags & dns.flags.TC)
+ self.assertEquals(len(rawResponse), self._udpTruncationThreshold)
+ self.assertRcodeEqual(response, dns.rcode.NOERROR)
- for record in response.answer:
- self.assertEquals(record.rdtype, dns.rdatatype.TXT)
- for part in record:
- for string in part.strings:
- self.assertTrue(len(string) == 255 or len(string) == 16)
- for c in string:
- self.assertEquals(c, value)
+ self.assertMessageHasFlags(response, ['QR', 'RD', 'RA'])
+
+ for record in response.answer:
+ self.assertEquals(record.rdtype, dns.rdatatype.TXT)
+ for part in record:
+ for string in part.strings:
+ self.assertTrue(len(string) == 255 or len(string) == 5)
+ for c in string:
+ self.assertEquals(c, value)
+
+ def checkTruncatedResponse(self, message):
+ self.assertMessageHasFlags(message, ['QR', 'RD', 'RA', 'TC'])
def testLargeAnswer(self):
# why the same query 10 times, do you ask? because if we are reading from
raw = self.sendUDPQuery(query, decode=False)
self.checkResponseContent(raw, 'Z')
+ def testLargeAnswerTruncate(self):
+ """
+ Check that we get a TC answer
+ """
+ query = dns.message.make_query('BBBB.large-answer.example.', 'TXT', 'IN', use_edns=True, payload=4096)
+ for _ in range(10):
+ response = self.sendUDPQuery(query)
+ self.checkTruncatedResponse(response)
+
+ query = dns.message.make_query('CCCC.large-answer.example.', 'TXT', 'IN', use_edns=True, payload=4096)
+ for _ in range(10):
+ response = self.sendUDPQuery(query)
+ self.checkTruncatedResponse(response)
+
class UDPLargeResponder(DatagramProtocol):
def datagramReceived(self, datagram, address):
if request.question[0].name == dns.name.from_text('AAAA.large-answer.example.'):
value = 'A'
- else:
+ final_count = 5
+ elif request.question[0].name == dns.name.from_text('ZZZZ.large-answer.example.'):
value = 'Z'
+ final_count = 5
+ elif request.question[0].name == dns.name.from_text('BBBB.large-answer.example.'):
+ value = 'B'
+ final_count = 6
+ elif request.question[0].name == dns.name.from_text('CCCC.large-answer.example.'):
+ value = 'C'
+ final_count = 6
answer = dns.rrset.from_text(request.question[0].name, 0, dns.rdataclass.IN, 'TXT', value*255)
for _ in range(6):
response.answer.append(answer)
- answer = dns.rrset.from_text(request.question[0].name, 0, dns.rdataclass.IN, 'TXT', value*16)
+ answer = dns.rrset.from_text(request.question[0].name, 0, dns.rdataclass.IN, 'TXT', value*final_count)
response.answer.append(answer)
self.transport.write(response.to_wire(max_size=65535), address)