def combinations(iterable, r):
pool = tuple(iterable)
- if pool:
- n = len(pool)
- vec = range(r)
- yield tuple(pool[i] for i in vec)
- while 1:
- for i in reversed(range(r)):
- if vec[i] == i + n-r:
- continue
- vec[i] += 1
- for j in range(i+1, r):
- vec[j] = vec[j-1] + 1
- yield tuple(pool[i] for i in vec)
+ n = len(pool)
+ assert 0 <= r <= n
+ vec = range(r)
+ yield tuple(pool[i] for i in vec)
+ while 1:
+ for i in reversed(range(r)):
+ if vec[i] != i + n - r:
break
- else:
- return
+ else:
+ return
+ vec[i] += 1
+ for j in range(i+1, r):
+ vec[j] = vec[j-1] + 1
+ yield tuple(pool[i] for i in vec)
.. versionadded:: 2.6
---------------------
-.. class:: Trace([count=1[, trace=1[, countfuncs=0[, countcallers=0[, ignoremods=()[, ignoredirs=()[, infile=None[, outfile=None]]]]]]]])
+.. class:: Trace([count=1[, trace=1[, countfuncs=0[, countcallers=0[, ignoremods=()[, ignoredirs=()[, infile=None[, outfile=None[, timing=False]]]]]]]]])
Create an object to trace execution of a single statement or expression. All
parameters are optional. *count* enables counting of line numbers. *trace*
*ignoremods* is a list of modules or packages to ignore. *ignoredirs* is a list
of directories whose modules or packages should be ignored. *infile* is the
file from which to read stored count information. *outfile* is a file in which
- to write updated count information.
+ to write updated count information. *timing* enables a timestamp relative
+ to when tracing was started to be displayed.
.. method:: Trace.run(cmd)
that are not available on your distro's package. You can easily compile the
latest version of Python from source.
-In the event Python doesn't come preinstalled and isn't in the repositories as
+In the event that Python doesn't come preinstalled and isn't in the repositories as
well, you can easily make packages for your own distro. Have a look at the
following links:
<http://www.python.org/download/releases/>`_ for many years.
With ongoing development of Python, some platforms that used to be supported
-earlier are not longer supported (due to the lack of users or developers).
+earlier are no longer supported (due to the lack of users or developers).
Check :pep:`11` for details on all unsupported platforms.
* DOS and Windows 3.x are deprecated since Python 2.0 and code specific to these
import unittest
from test import test_support
+from _testcapi import getargs_keywords
import warnings
warnings.filterwarnings("ignore",
raise ValueError
self.assertRaises(TypeError, getargs_tuple, 1, seq())
+class Keywords_TestCase(unittest.TestCase):
+ def test_positional_args(self):
+ # using all positional args
+ self.assertEquals(
+ getargs_keywords((1,2), 3, (4,(5,6)), (7,8,9), 10),
+ (1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
+ )
+ def test_mixed_args(self):
+ # positional and keyword args
+ self.assertEquals(
+ getargs_keywords((1,2), 3, (4,(5,6)), arg4=(7,8,9), arg5=10),
+ (1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
+ )
+ def test_keyword_args(self):
+ # all keywords
+ self.assertEquals(
+ getargs_keywords(arg1=(1,2), arg2=3, arg3=(4,(5,6)), arg4=(7,8,9), arg5=10),
+ (1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
+ )
+ def test_optional_args(self):
+ # missing optional keyword args, skipping tuples
+ self.assertEquals(
+ getargs_keywords(arg1=(1,2), arg2=3, arg5=10),
+ (1, 2, 3, -1, -1, -1, -1, -1, -1, 10)
+ )
+ def test_required_args(self):
+ # required arg missing
+ try:
+ getargs_keywords(arg1=(1,2))
+ except TypeError as err:
+ self.assertEquals(str(err), "Required argument 'arg2' (pos 2) not found")
+ else:
+ self.fail('TypeError should have been raised')
+ def test_too_many_args(self):
+ try:
+ getargs_keywords((1,2),3,(4,(5,6)),(7,8,9),10,111)
+ except TypeError as err:
+ self.assertEquals(str(err), "function takes at most 5 arguments (6 given)")
+ else:
+ self.fail('TypeError should have been raised')
+ def test_invalid_keyword(self):
+ # extraneous keyword arg
+ try:
+ getargs_keywords((1,2),3,arg5=10,arg666=666)
+ except TypeError as err:
+ self.assertEquals(str(err), "'arg666' is an invalid keyword argument for this function")
+ else:
+ self.fail('TypeError should have been raised')
def test_main():
- tests = [Signed_TestCase, Unsigned_TestCase, Tuple_TestCase]
+ tests = [Signed_TestCase, Unsigned_TestCase, Tuple_TestCase, Keywords_TestCase]
try:
from _testcapi import getargs_L, getargs_K
except ImportError:
'Convenience function for partially consuming a long of infinite iterable'
return list(islice(seq, n))
+def fact(n):
+ 'Factorial'
+ return reduce(operator.mul, range(1, n+1), 1)
+
class TestBasicOps(unittest.TestCase):
def test_chain(self):
self.assertEqual(list(chain('abc', 'def')), list('abcdef'))
self.assertEqual(take(4, chain('abc', 'def')), list('abcd'))
self.assertRaises(TypeError, chain, 2, 3)
+ def test_combinations(self):
+ self.assertRaises(TypeError, combinations, 'abc') # missing r argument
+ self.assertRaises(TypeError, combinations, 'abc', 2, 1) # too many arguments
+ self.assertRaises(ValueError, combinations, 'abc', -2) # r is negative
+ self.assertRaises(ValueError, combinations, 'abc', 32) # r is too big
+ self.assertEqual(list(combinations(range(4), 3)),
+ [(0,1,2), (0,1,3), (0,2,3), (1,2,3)])
+ for n in range(8):
+ values = [5*x-12 for x in range(n)]
+ for r in range(n+1):
+ result = list(combinations(values, r))
+ self.assertEqual(len(result), fact(n) / fact(r) / fact(n-r)) # right number of combs
+ self.assertEqual(len(result), len(set(result))) # no repeats
+ self.assertEqual(result, sorted(result)) # lexicographic order
+ for c in result:
+ self.assertEqual(len(c), r) # r-length combinations
+ self.assertEqual(len(set(c)), r) # no duplicate elements
+ self.assertEqual(list(c), sorted(c)) # keep original ordering
+ self.assert_(all(e in values for e in c)) # elements taken from input iterable
+
def test_count(self):
self.assertEqual(lzip('abc',count()), [('a', 0), ('b', 1), ('c', 2)])
self.assertEqual(lzip('abc',count(3)), [('a', 3), ('b', 4), ('c', 5)])
PORT = None
def server(evt, buf):
+ serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ serv.settimeout(1)
+ serv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ serv.bind(("", 0))
+ global PORT
+ PORT = serv.getsockname()[1]
+ serv.listen(5)
+ evt.set()
try:
- serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- serv.settimeout(3)
- serv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- serv.bind(("", 0))
- global PORT
- PORT = serv.getsockname()[1]
- serv.listen(5)
conn, addr = serv.accept()
except socket.timeout:
pass
buf = buf[sent:]
n -= 1
- time.sleep(0.01)
conn.close()
finally:
self.evt = threading.Event()
servargs = (self.evt, b"220 Hola mundo\n")
threading.Thread(target=server, args=servargs).start()
-
- # wait until server thread has assigned a port number
- n = 500
- while PORT is None and n > 0:
- time.sleep(0.01)
- n -= 1
-
- # wait a little longer (sometimes connections are refused
- # on slow machines without this additional wait)
- time.sleep(0.5)
+ self.evt.wait()
+ self.evt.clear()
def tearDown(self):
self.evt.wait()
smtp = smtplib.SMTP("%s:%s" % (HOST, PORT))
smtp.sock.close()
- def testNotConnected(self):
- # Test various operations on an unconnected SMTP object that
- # should raise exceptions (at present the attempt in SMTP.send
- # to reference the nonexistent 'sock' attribute of the SMTP object
- # causes an AttributeError)
- smtp = smtplib.SMTP()
- self.assertRaises(smtplib.SMTPServerDisconnected, smtp.ehlo)
- self.assertRaises(smtplib.SMTPServerDisconnected,
- smtp.send, 'test msg')
-
def testLocalHostName(self):
# check that supplied local_hostname is used
smtp = smtplib.SMTP(HOST, PORT, local_hostname="testhost")
self.assertEqual(smtp.local_hostname, "testhost")
smtp.sock.close()
- def testNonnumericPort(self):
- # check that non-numeric port raises socket.error
- self.assertRaises(socket.error, smtplib.SMTP,
- "localhost", "bogus")
- self.assertRaises(socket.error, smtplib.SMTP,
- "localhost:bogus")
-
def testTimeoutDefault(self):
# default
smtp = smtplib.SMTP(HOST, PORT)
serv = server_class(("", 0), ('nowhere', -1))
global PORT
PORT = serv.getsockname()[1]
+ serv_evt.set()
try:
if hasattr(select, 'poll'):
except socket.timeout:
pass
finally:
- # allow some time for the client to read the result
- time.sleep(0.5)
- serv.close()
+ if not client_evt.isSet():
+ # allow some time for the client to read the result
+ time.sleep(0.5)
+ serv.close()
asyncore.close_all()
PORT = None
- time.sleep(0.5)
serv_evt.set()
MSG_BEGIN = '---------- MESSAGE FOLLOWS ----------\n'
threading.Thread(target=debugging_server, args=serv_args).start()
# wait until server thread has assigned a port number
- n = 500
- while PORT is None and n > 0:
- time.sleep(0.01)
- n -= 1
-
- # wait a little longer (sometimes connections are refused
- # on slow machines without this additional wait)
- time.sleep(0.5)
+ self.serv_evt.wait()
+ self.serv_evt.clear()
def tearDown(self):
# indicate that the client is finished
self.assertEqual(self.output.getvalue(), mexpect)
+class NonConnectingTests(TestCase):
+
+ def testNotConnected(self):
+ # Test various operations on an unconnected SMTP object that
+ # should raise exceptions (at present the attempt in SMTP.send
+ # to reference the nonexistent 'sock' attribute of the SMTP object
+ # causes an AttributeError)
+ smtp = smtplib.SMTP()
+ self.assertRaises(smtplib.SMTPServerDisconnected, smtp.ehlo)
+ self.assertRaises(smtplib.SMTPServerDisconnected,
+ smtp.send, 'test msg')
+
+ def testNonnumericPort(self):
+ # check that non-numeric port raises socket.error
+ self.assertRaises(socket.error, smtplib.SMTP,
+ "localhost", "bogus")
+ self.assertRaises(socket.error, smtplib.SMTP,
+ "localhost:bogus")
+
+
# test response of client to a non-successful HELO message
class BadHELOServerTests(TestCase):
self.evt = threading.Event()
servargs = (self.evt, b"199 no hello for you!\n")
threading.Thread(target=server, args=servargs).start()
-
- # wait until server thread has assigned a port number
- n = 500
- while PORT is None and n > 0:
- time.sleep(0.01)
- n -= 1
-
- # wait a little longer (sometimes connections are refused
- # on slow machines without this additional wait)
- time.sleep(0.5)
+ self.evt.wait()
+ self.evt.clear()
def tearDown(self):
self.evt.wait()
threading.Thread(target=debugging_server, args=serv_args).start()
# wait until server thread has assigned a port number
- n = 500
- while PORT is None and n > 0:
- time.sleep(0.01)
- n -= 1
-
- # wait a little longer (sometimes connections are refused
- # on slow machines without this additional wait)
- time.sleep(0.5)
+ self.serv_evt.wait()
+ self.serv_evt.clear()
def tearDown(self):
# indicate that the client is finished
def test_main(verbose=None):
test_support.run_unittest(GeneralTests, DebuggingServerTests,
+ NonConnectingTests,
BadHELOServerTests, SMTPSimTests)
if __name__ == '__main__':
import re
import sys
import threading
+import time
import token
import tokenize
import types
with '>>>>>> '.
-s, --summary Write a brief summary on stdout for each file.
(Can only be used with --count or --report.)
+-g, --timing Prefix each line with the time since the program started.
+ Only used while tracing.
Filters, may be repeated multiple times:
--ignore-module=<mod> Ignore the given module(s) and its submodules
class Trace:
def __init__(self, count=1, trace=1, countfuncs=0, countcallers=0,
- ignoremods=(), ignoredirs=(), infile=None, outfile=None):
+ ignoremods=(), ignoredirs=(), infile=None, outfile=None,
+ timing=False):
"""
@param count true iff it should count number of times each
line is executed
@param infile file from which to read stored counts to be
added into the results
@param outfile file in which to write the results
+ @param timing true iff timing information be displayed
"""
self.infile = infile
self.outfile = outfile
self._calledfuncs = {}
self._callers = {}
self._caller_cache = {}
+ self.start_time = None
+ if timing:
+ self.start_time = time.time()
if countcallers:
self.globaltrace = self.globaltrace_trackcallers
elif countfuncs:
key = filename, lineno
self.counts[key] = self.counts.get(key, 0) + 1
+ if self.start_time:
+ print('%.2f' % (time.time() - self.start_time), end=' ')
bname = os.path.basename(filename)
print("%s(%d): %s" % (bname, lineno,
linecache.getline(filename, lineno)), end=' ')
filename = frame.f_code.co_filename
lineno = frame.f_lineno
+ if self.start_time:
+ print('%.2f' % (time.time() - self.start_time), end=' ')
bname = os.path.basename(filename)
print("%s(%d): %s" % (bname, lineno,
linecache.getline(filename, lineno)), end=' ')
if argv is None:
argv = sys.argv
try:
- opts, prog_argv = getopt.getopt(argv[1:], "tcrRf:d:msC:lT",
+ opts, prog_argv = getopt.getopt(argv[1:], "tcrRf:d:msC:lTg",
["help", "version", "trace", "count",
"report", "no-report", "summary",
"file=", "missing",
"ignore-module=", "ignore-dir=",
"coverdir=", "listfuncs",
- "trackcalls"])
+ "trackcalls", "timing"])
except getopt.error as msg:
sys.stderr.write("%s: %s\n" % (sys.argv[0], msg))
summary = 0
listfuncs = False
countcallers = False
+ timing = False
for opt, val in opts:
if opt == "--help":
listfuncs = True
continue
+ if opt == "-g" or opt == "--timing":
+ timing = True
+ continue
+
if opt == "-t" or opt == "--trace":
trace = 1
continue
t = Trace(count, trace, countfuncs=listfuncs,
countcallers=countcallers, ignoremods=ignore_modules,
ignoredirs=ignore_dirs, infile=counts_file,
- outfile=counts_file)
+ outfile=counts_file, timing=timing)
try:
fp = open(progname)
try:
return Py_BuildValue("iii", a, b, c);
}
+/* test PyArg_ParseTupleAndKeywords */
+static PyObject *getargs_keywords(PyObject *self, PyObject *args, PyObject *kwargs)
+{
+ static char *keywords[] = {"arg1","arg2","arg3","arg4","arg5", NULL};
+ static char *fmt="(ii)i|(i(ii))(iii)i";
+ int int_args[10]={-1, -1, -1, -1, -1, -1, -1, -1, -1, -1};
+
+ if (!PyArg_ParseTupleAndKeywords(args, kwargs, fmt, keywords,
+ &int_args[0], &int_args[1], &int_args[2], &int_args[3], &int_args[4],
+ &int_args[5], &int_args[6], &int_args[7], &int_args[8], &int_args[9]))
+ return NULL;
+ return Py_BuildValue("iiiiiiiiii",
+ int_args[0], int_args[1], int_args[2], int_args[3], int_args[4],
+ int_args[5], int_args[6], int_args[7], int_args[8], int_args[9]);
+}
+
/* Functions to call PyArg_ParseTuple with integer format codes,
and return the result.
*/
PyDoc_STR("This is a pretty normal docstring.")},
{"getargs_tuple", getargs_tuple, METH_VARARGS},
+ {"getargs_keywords", (PyCFunction)getargs_keywords,
+ METH_VARARGS|METH_KEYWORDS},
{"getargs_b", getargs_b, METH_VARARGS},
{"getargs_B", getargs_B, METH_VARARGS},
{"getargs_H", getargs_H, METH_VARARGS},
/* create productobject structure */
lz = (productobject *)type->tp_alloc(type, 0);
- if (lz == NULL) {
- Py_DECREF(pools);
+ if (lz == NULL)
goto error;
- }
lz->pools = pools;
lz->maxvec = maxvec;
};
+/* combinations object ************************************************************/
+
+typedef struct {
+ PyObject_HEAD
+ PyObject *pool; /* input converted to a tuple */
+ Py_ssize_t *indices; /* one index per result element */
+ PyObject *result; /* most recently returned result tuple */
+ Py_ssize_t r; /* size of result tuple */
+ int stopped; /* set to 1 when the combinations iterator is exhausted */
+} combinationsobject;
+
+static PyTypeObject combinations_type;
+
+static PyObject *
+combinations_new(PyTypeObject *type, PyObject *args, PyObject *kwds)
+{
+ combinationsobject *co;
+ Py_ssize_t n;
+ Py_ssize_t r;
+ PyObject *pool = NULL;
+ PyObject *iterable = NULL;
+ Py_ssize_t *indices = NULL;
+ Py_ssize_t i;
+ static char *kwargs[] = {"iterable", "r", NULL};
+
+ if (!PyArg_ParseTupleAndKeywords(args, kwds, "On:combinations", kwargs,
+ &iterable, &r))
+ return NULL;
+
+ pool = PySequence_Tuple(iterable);
+ if (pool == NULL)
+ goto error;
+ n = PyTuple_GET_SIZE(pool);
+ if (r < 0) {
+ PyErr_SetString(PyExc_ValueError, "r must be non-negative");
+ goto error;
+ }
+ if (r > n) {
+ PyErr_SetString(PyExc_ValueError, "r cannot be bigger than the iterable");
+ goto error;
+ }
+
+ indices = PyMem_Malloc(r * sizeof(Py_ssize_t));
+ if (indices == NULL) {
+ PyErr_NoMemory();
+ goto error;
+ }
+
+ for (i=0 ; i<r ; i++)
+ indices[i] = i;
+
+ /* create combinationsobject structure */
+ co = (combinationsobject *)type->tp_alloc(type, 0);
+ if (co == NULL)
+ goto error;
+
+ co->pool = pool;
+ co->indices = indices;
+ co->result = NULL;
+ co->r = r;
+ co->stopped = 0;
+
+ return (PyObject *)co;
+
+error:
+ if (indices != NULL)
+ PyMem_Free(indices);
+ Py_XDECREF(pool);
+ return NULL;
+}
+
+static void
+combinations_dealloc(combinationsobject *co)
+{
+ PyObject_GC_UnTrack(co);
+ Py_XDECREF(co->pool);
+ Py_XDECREF(co->result);
+ PyMem_Free(co->indices);
+ Py_TYPE(co)->tp_free(co);
+}
+
+static int
+combinations_traverse(combinationsobject *co, visitproc visit, void *arg)
+{
+ Py_VISIT(co->pool);
+ Py_VISIT(co->result);
+ return 0;
+}
+
+static PyObject *
+combinations_next(combinationsobject *co)
+{
+ PyObject *elem;
+ PyObject *oldelem;
+ PyObject *pool = co->pool;
+ Py_ssize_t *indices = co->indices;
+ PyObject *result = co->result;
+ Py_ssize_t n = PyTuple_GET_SIZE(pool);
+ Py_ssize_t r = co->r;
+ Py_ssize_t i, j, index;
+
+ if (co->stopped)
+ return NULL;
+
+ if (result == NULL) {
+ /* On the first pass, initialize result tuple using the indices */
+ result = PyTuple_New(r);
+ if (result == NULL)
+ goto empty;
+ co->result = result;
+ for (i=0; i<r ; i++) {
+ index = indices[i];
+ elem = PyTuple_GET_ITEM(pool, index);
+ Py_INCREF(elem);
+ PyTuple_SET_ITEM(result, i, elem);
+ }
+ } else {
+ /* Copy the previous result tuple or re-use it if available */
+ if (Py_REFCNT(result) > 1) {
+ PyObject *old_result = result;
+ result = PyTuple_New(r);
+ if (result == NULL)
+ goto empty;
+ co->result = result;
+ for (i=0; i<r ; i++) {
+ elem = PyTuple_GET_ITEM(old_result, i);
+ Py_INCREF(elem);
+ PyTuple_SET_ITEM(result, i, elem);
+ }
+ Py_DECREF(old_result);
+ }
+ /* Now, we've got the only copy so we can update it in-place
+ * CPython's empty tuple is a singleton and cached in
+ * PyTuple's freelist.
+ */
+ assert(r == 0 || Py_REFCNT(result) == 1);
+
+ /* Scan indices right-to-left until finding one that is not
+ at its maximum (i + n - r). */
+ for (i=r-1 ; i >= 0 && indices[i] == i+n-r ; i--)
+ ;
+
+ /* If i is negative, then the indices are all at
+ their maximum value and we're done. */
+ if (i < 0)
+ goto empty;
+
+ /* Increment the current index which we know is not at its
+ maximum. Then move back to the right setting each index
+ to its lowest possible value (one higher than the index
+ to its left -- this maintains the sort order invariant). */
+ indices[i]++;
+ for (j=i+1 ; j<r ; j++)
+ indices[j] = indices[j-1] + 1;
+
+ /* Update the result tuple for the new indices
+ starting with i, the leftmost index that changed */
+ for ( ; i<r ; i++) {
+ index = indices[i];
+ elem = PyTuple_GET_ITEM(pool, index);
+ Py_INCREF(elem);
+ oldelem = PyTuple_GET_ITEM(result, i);
+ PyTuple_SET_ITEM(result, i, elem);
+ Py_DECREF(oldelem);
+ }
+ }
+
+ Py_INCREF(result);
+ return result;
+
+empty:
+ co->stopped = 1;
+ return NULL;
+}
+
+PyDoc_STRVAR(combinations_doc,
+"combinations(iterables) --> combinations object\n\
+\n\
+Return successive r-length combinations of elements in the iterable.\n\n\
+combinations(range(4), 3) --> (0,1,2), (0,1,3), (0,2,3), (1,2,3)");
+
+static PyTypeObject combinations_type = {
+ PyVarObject_HEAD_INIT(NULL, 0)
+ "itertools.combinations", /* tp_name */
+ sizeof(combinationsobject), /* tp_basicsize */
+ 0, /* tp_itemsize */
+ /* methods */
+ (destructor)combinations_dealloc, /* tp_dealloc */
+ 0, /* tp_print */
+ 0, /* tp_getattr */
+ 0, /* tp_setattr */
+ 0, /* tp_compare */
+ 0, /* tp_repr */
+ 0, /* tp_as_number */
+ 0, /* tp_as_sequence */
+ 0, /* tp_as_mapping */
+ 0, /* tp_hash */
+ 0, /* tp_call */
+ 0, /* tp_str */
+ PyObject_GenericGetAttr, /* tp_getattro */
+ 0, /* tp_setattro */
+ 0, /* tp_as_buffer */
+ Py_TPFLAGS_DEFAULT | Py_TPFLAGS_HAVE_GC |
+ Py_TPFLAGS_BASETYPE, /* tp_flags */
+ combinations_doc, /* tp_doc */
+ (traverseproc)combinations_traverse, /* tp_traverse */
+ 0, /* tp_clear */
+ 0, /* tp_richcompare */
+ 0, /* tp_weaklistoffset */
+ PyObject_SelfIter, /* tp_iter */
+ (iternextfunc)combinations_next, /* tp_iternext */
+ 0, /* tp_methods */
+ 0, /* tp_members */
+ 0, /* tp_getset */
+ 0, /* tp_base */
+ 0, /* tp_dict */
+ 0, /* tp_descr_get */
+ 0, /* tp_descr_set */
+ 0, /* tp_dictoffset */
+ 0, /* tp_init */
+ 0, /* tp_alloc */
+ combinations_new, /* tp_new */
+ PyObject_GC_Del, /* tp_free */
+};
+
+
/* ifilter object ************************************************************/
typedef struct {
PyObject *m;
char *name;
PyTypeObject *typelist[] = {
+ &combinations_type,
&cycle_type,
&dropwhile_type,
&takewhile_type,
&count_type,
&izip_type,
&iziplongest_type,
- &product_type,
+ &product_type,
&repeat_type,
&groupby_type,
NULL
return retval;
}
+#define IS_END_OF_FORMAT(c) (c == '\0' || c == ';' || c == ':')
static int
vgetargskeywords(PyObject *args, PyObject *keywords, const char *format,
{
char msgbuf[512];
int levels[32];
- const char *fname, *message;
- int min, max;
- const char *formatsave;
+ const char *fname, *msg, *custom_msg, *keyword;
+ int min = INT_MAX;
int i, len, nargs, nkeywords;
- const char *msg;
- char **p;
- PyObject *freelist = NULL;
+ PyObject *freelist = NULL, *current_arg;
assert(args != NULL && PyTuple_Check(args));
assert(keywords == NULL || PyDict_Check(keywords));
assert(kwlist != NULL);
assert(p_va != NULL);
- /* Search the format:
- message <- error msg, if any (else NULL).
- fname <- routine name, if any (else NULL).
- min <- # of required arguments, or -1 if all are required.
- max <- most arguments (required + optional).
- Check that kwlist has a non-NULL entry for each arg.
- Raise error if a tuple arg spec is found.
- */
- fname = message = NULL;
- formatsave = format;
- p = kwlist;
- min = -1;
- max = 0;
- while ((i = *format++) != '\0') {
- if (isalpha(Py_CHARMASK(i)) && i != 'e') {
- max++;
- if (*p == NULL) {
- PyErr_SetString(PyExc_RuntimeError,
- "more argument specifiers than "
- "keyword list entries");
- return 0;
- }
- p++;
- }
- else if (i == '|')
- min = max;
- else if (i == ':') {
- fname = format;
- break;
- }
- else if (i == ';') {
- message = format;
- break;
- }
- else if (i == '(') {
- PyErr_SetString(PyExc_RuntimeError,
- "tuple found in format when using keyword "
- "arguments");
- return 0;
- }
- }
- format = formatsave;
- if (*p != NULL) {
- PyErr_SetString(PyExc_RuntimeError,
- "more keyword list entries than "
- "argument specifiers");
- return 0;
- }
- if (min < 0) {
- /* All arguments are required. */
- min = max;
+ /* grab the function name or custom error msg first (mutually exclusive) */
+ fname = strchr(format, ':');
+ if (fname) {
+ fname++;
+ custom_msg = NULL;
}
-
- nargs = PyTuple_GET_SIZE(args);
- nkeywords = keywords == NULL ? 0 : PyDict_Size(keywords);
-
- /* make sure there are no duplicate values for an argument;
- its not clear when to use the term "keyword argument vs.
- keyword parameter in messages */
- if (nkeywords > 0) {
- for (i = 0; i < nargs; i++) {
- const char *thiskw = kwlist[i];
- if (thiskw == NULL)
- break;
- if (PyDict_GetItemString(keywords, thiskw)) {
- PyErr_Format(PyExc_TypeError,
- "keyword parameter '%s' was given "
- "by position and by name",
- thiskw);
- return 0;
- }
- else if (PyErr_Occurred())
- return 0;
- }
+ else {
+ custom_msg = strchr(format,';');
+ if (custom_msg)
+ custom_msg++;
}
- /* required arguments missing from args can be supplied by keyword
- arguments; set len to the number of positional arguments, and,
- if that's less than the minimum required, add in the number of
- required arguments that are supplied by keywords */
- len = nargs;
- if (nkeywords > 0 && nargs < min) {
- for (i = nargs; i < min; i++) {
- if (PyDict_GetItemString(keywords, kwlist[i]))
- len++;
- else if (PyErr_Occurred())
- return 0;
- }
- }
+ /* scan kwlist and get greatest possible nbr of args */
+ for (len=0; kwlist[len]; len++)
+ continue;
- /* make sure we got an acceptable number of arguments; the message
- is a little confusing with keywords since keyword arguments
- which are supplied, but don't match the required arguments
- are not included in the "%d given" part of the message
- XXX and this isn't a bug!? */
- if (len < min || max < len) {
- if (message == NULL) {
- PyOS_snprintf(msgbuf, sizeof(msgbuf),
- "%.200s%s takes %s %d argument%s "
- "(%d given)",
- fname==NULL ? "function" : fname,
- fname==NULL ? "" : "()",
- min==max ? "exactly"
- : len < min ? "at least" : "at most",
- len < min ? min : max,
- (len < min ? min : max) == 1 ? "" : "s",
- len);
- message = msgbuf;
- }
- PyErr_SetString(PyExc_TypeError, message);
+ nargs = PyTuple_GET_SIZE(args);
+ nkeywords = (keywords == NULL) ? 0 : PyDict_Size(keywords);
+ if (nargs + nkeywords > len) {
+ PyErr_Format(PyExc_TypeError, "%s%s takes at most %d "
+ "argument%s (%d given)",
+ (fname == NULL) ? "function" : fname,
+ (fname == NULL) ? "" : "()",
+ len,
+ (len == 1) ? "" : "s",
+ nargs + nkeywords);
return 0;
}
- /* convert the positional arguments */
- for (i = 0; i < nargs; i++) {
- if (*format == '|')
+ /* convert tuple args and keyword args in same loop, using kwlist to drive process */
+ for (i = 0; i < len; i++) {
+ keyword = kwlist[i];
+ if (*format == '|') {
+ min = i;
format++;
- msg = convertitem(PyTuple_GET_ITEM(args, i), &format, p_va,
- flags, levels, msgbuf, sizeof(msgbuf),
- &freelist);
- if (msg) {
- seterror(i+1, msg, levels, fname, message);
+ }
+ if (IS_END_OF_FORMAT(*format)) {
+ PyErr_Format(PyExc_RuntimeError,
+ "More keyword list entries (%d) than "
+ "format specifiers (%d)", len, i);
return cleanreturn(0, freelist);
}
- }
-
- /* handle no keyword parameters in call */
- if (nkeywords == 0)
- return cleanreturn(1, freelist);
-
- /* convert the keyword arguments; this uses the format
- string where it was left after processing args */
- for (i = nargs; i < max; i++) {
- PyObject *item;
- if (*format == '|')
- format++;
- item = PyDict_GetItemString(keywords, kwlist[i]);
- if (item != NULL) {
- Py_INCREF(item);
- msg = convertitem(item, &format, p_va, flags, levels,
- msgbuf, sizeof(msgbuf), &freelist);
- Py_DECREF(item);
- if (msg) {
- seterror(i+1, msg, levels, fname, message);
+ current_arg = NULL;
+ if (nkeywords) {
+ current_arg = PyDict_GetItemString(keywords, keyword);
+ }
+ if (current_arg) {
+ --nkeywords;
+ if (i < nargs) {
+ /* arg present in tuple and in dict */
+ PyErr_Format(PyExc_TypeError,
+ "Argument given by name ('%s') "
+ "and position (%d)",
+ keyword, i+1);
return cleanreturn(0, freelist);
}
- --nkeywords;
- if (nkeywords == 0)
- break;
}
- else if (PyErr_Occurred())
+ else if (nkeywords && PyErr_Occurred())
return cleanreturn(0, freelist);
- else {
- msg = skipitem(&format, p_va, flags);
+ else if (i < nargs)
+ current_arg = PyTuple_GET_ITEM(args, i);
+
+ if (current_arg) {
+ msg = convertitem(current_arg, &format, p_va, flags,
+ levels, msgbuf, sizeof(msgbuf), &freelist);
if (msg) {
- levels[0] = 0;
- seterror(i+1, msg, levels, fname, message);
+ seterror(i+1, msg, levels, fname, custom_msg);
return cleanreturn(0, freelist);
}
+ continue;
+ }
+
+ if (i < min) {
+ PyErr_Format(PyExc_TypeError, "Required argument "
+ "'%s' (pos %d) not found",
+ keyword, i+1);
+ return cleanreturn(0, freelist);
+ }
+ /* current code reports success when all required args
+ * fulfilled and no keyword args left, with no further
+ * validation. XXX Maybe skip this in debug build ?
+ */
+ if (!nkeywords)
+ return cleanreturn(1, freelist);
+
+ /* We are into optional args, skip thru to any remaining
+ * keyword args */
+ msg = skipitem(&format, p_va, flags);
+ if (msg) {
+ PyErr_Format(PyExc_RuntimeError, "%s: '%s'", msg,
+ format);
+ return cleanreturn(0, freelist);
}
}
+ if (!IS_END_OF_FORMAT(*format)) {
+ PyErr_Format(PyExc_RuntimeError,
+ "more argument specifiers than keyword list entries "
+ "(remaining format:'%s')", format);
+ return cleanreturn(0, freelist);
+ }
+
/* make sure there are no extraneous keyword arguments */
if (nkeywords > 0) {
PyObject *key, *value;
return cleanreturn(0, freelist);
}
ks = PyUnicode_AsString(key);
- for (i = 0; i < max; i++) {
+ for (i = 0; i < len; i++) {
if (!strcmp(ks, kwlist[i])) {
match = 1;
break;
static char *
skipitem(const char **p_format, va_list *p_va, int flags)
{
- const char *format = *p_format;
+ const char *format = *p_format;
char c = *format++;
switch (c) {
break;
}
+ case '(': /* bypass tuple, not handled at all previously */
+ {
+ char *msg;
+ for (;;) {
+ if (*format==')')
+ break;
+ if (IS_END_OF_FORMAT(*format))
+ return "Unmatched left paren in format "
+ "string";
+ msg = skipitem(&format, p_va, flags);
+ if (msg)
+ return msg;
+ }
+ format++;
+ break;
+ }
+
+ case ')':
+ return "Unmatched right paren in format string";
+
default:
err:
return "impossible<bad format char>";
}
- /* The "(...)" format code for tuples is not handled here because
- * it is not allowed with keyword args. */
-
*p_format = format;
return NULL;
}