self.nntp_implementation = None
try:
resp, caps = self.capabilities()
- except NNTPPermanentError:
+ except (NNTPPermanentError, NNTPTemporaryError):
# Server doesn't support capabilities
self._caps = {}
else:
resp = self._shortcmd('authinfo pass ' + password)
if not resp.startswith('281'):
raise NNTPPermanentError(resp)
+ # Capabilities might have changed after login
+ self._caps = None
+ self.getcapabilities()
# Attempt to send mode reader if it was requested after login.
if self.readermode_afterauth:
self._setreadermode()
self.allow_posting = True
self._readline = readline
self._push_data = push_data
+ self._logged_in = False
+ self._user_sent = False
# Our welcome
self.handle_welcome()
self.push_lit(self.sample_body)
self.push_lit(".")
+ def handle_AUTHINFO(self, cred_type, data):
+ if self._logged_in:
+ self.push_lit('502 Already Logged In')
+ elif cred_type == 'user':
+ if self._user_sent:
+ self.push_lit('482 User Credential Already Sent')
+ else:
+ self.push_lit('381 Password Required')
+ self._user_sent = True
+ elif cred_type == 'pass':
+ self.push_lit('281 Login Successful')
+ self._logged_in = True
+ else:
+ raise Exception('Unknown cred type {}'.format(cred_type))
+
class NNTPv2Handler(NNTPv1Handler):
"""A handler for RFC 3977 (NNTP "v2")"""
def handle_CAPABILITIES(self):
- self.push_lit("""\
+ fmt = """\
101 Capability list:
VERSION 2 3
- IMPLEMENTATION INN 2.5.1
- AUTHINFO USER
+ IMPLEMENTATION INN 2.5.1{}
HDR
LIST ACTIVE ACTIVE.TIMES DISTRIB.PATS HEADERS NEWSGROUPS OVERVIEW.FMT
OVER
POST
READER
- .""")
+ ."""
+
+ if not self._logged_in:
+ self.push_lit(fmt.format('\n AUTHINFO USER'))
+ else:
+ self.push_lit(fmt.format(''))
def handle_OVER(self, message_spec=None):
return self.handle_XOVER(message_spec)
+class CapsAfterLoginNNTPv2Handler(NNTPv2Handler):
+ """A handler that allows CAPABILITIES only after login"""
+
+ def handle_CAPABILITIES(self):
+ if not self._logged_in:
+ self.push_lit('480 You must log in.')
+ else:
+ super().handle_CAPABILITIES()
+
+
class NNTPv1v2TestsMixin:
def setUp(self):
def test_welcome(self):
self.assertEqual(self.server.welcome, self.handler.welcome)
+ def test_authinfo(self):
+ if self.nntp_version == 2:
+ self.assertIn('AUTHINFO', self.server._caps)
+ self.server.login('testuser', 'testpw')
+ # if AUTHINFO is gone from _caps we also know that getcapabilities()
+ # has been called after login as it should
+ self.assertNotIn('AUTHINFO', self.server._caps)
+
def test_date(self):
resp, date = self.server.date()
self.assertEqual(resp, "111 20100914001155")
self.assertEqual(self.server.nntp_implementation, 'INN 2.5.1')
+class CapsAfterLoginNNTPv2Tests(MockedNNTPTestsMixin, unittest.TestCase):
+ """Tests a probably NNTP v2 server with capabilities only after login."""
+
+ nntp_version = 2
+ handler_class = CapsAfterLoginNNTPv2Handler
+
+ def test_caps_only_after_login(self):
+ self.assertEqual(self.server._caps, {})
+ self.server.login('testuser', 'testpw')
+ self.assertIn('VERSION', self.server._caps)
+
+
class MiscTests(unittest.TestCase):
def test_decode_header(self):
def test_main():
- tests = [MiscTests, NNTPv1Tests, NNTPv2Tests, NetworkedNNTPTests]
+ tests = [MiscTests, NNTPv1Tests, NNTPv2Tests, CapsAfterLoginNNTPv2Tests,
+ NetworkedNNTPTests]
if _have_ssl:
tests.append(NetworkedNNTP_SSLTests)
support.run_unittest(*tests)