self.assertEqual(ev, ev)
self.assertNotEqual(ev, other)
+ bignum = sys.maxsize * 2 + 1
+ ev = select.kevent(bignum, 1, 2, 3, sys.maxsize, bignum)
+ self.assertEqual(ev.ident, bignum)
+ self.assertEqual(ev.filter, 1)
+ self.assertEqual(ev.flags, 2)
+ self.assertEqual(ev.fflags, 3)
+ self.assertEqual(ev.data, sys.maxsize)
+ self.assertEqual(ev.udata, bignum)
+ self.assertEqual(ev, ev)
+ self.assertNotEqual(ev, other)
+
def test_queue_event(self):
serverSocket = socket.socket()
serverSocket.bind(('127.0.0.1', 0))
#define kqueue_queue_Check(op) (PyObject_TypeCheck((op), &kqueue_queue_Type))
+#if (SIZEOF_UINTPTR_T != SIZEOF_VOID_P)
+# error uintptr_t does not match void *!
+#elif (SIZEOF_UINTPTR_T == SIZEOF_LONG_LONG)
+# define T_UINTPTRT T_ULONGLONG
+# define T_INTPTRT T_LONGLONG
+# define PyLong_AsUintptr_t PyLong_AsUnsignedLongLong
+# define UINTPTRT_FMT_UNIT "K"
+# define INTPTRT_FMT_UNIT "L"
+#elif (SIZEOF_UINTPTR_T == SIZEOF_LONG)
+# define T_UINTPTRT T_ULONG
+# define T_INTPTRT T_LONG
+# define PyLong_AsUintptr_t PyLong_AsUnsignedLong
+# define UINTPTRT_FMT_UNIT "k"
+# define INTPTRT_FMT_UNIT "l"
+#elif (SIZEOF_UINTPTR_T == SIZEOF_INT)
+# define T_UINTPTRT T_UINT
+# define T_INTPTRT T_INT
+# define PyLong_AsUintptr_t PyLong_AsUnsignedLong
+# define UINTPTRT_FMT_UNIT "I"
+# define INTPTRT_FMT_UNIT "i"
+#else
+# error uintptr_t does not match int, long, or long long!
+#endif
+
/* Unfortunately, we can't store python objects in udata, because
* kevents in the kernel can be removed without warning, which would
* forever lose the refcount on the object stored with it.
#define KQ_OFF(x) offsetof(kqueue_event_Object, x)
static struct PyMemberDef kqueue_event_members[] = {
- {"ident", T_UINT, KQ_OFF(e.ident)},
+ {"ident", T_UINTPTRT, KQ_OFF(e.ident)},
{"filter", T_SHORT, KQ_OFF(e.filter)},
{"flags", T_USHORT, KQ_OFF(e.flags)},
{"fflags", T_UINT, KQ_OFF(e.fflags)},
- {"data", T_INT, KQ_OFF(e.data)},
- {"udata", T_INT, KQ_OFF(e.udata)},
+ {"data", T_INTPTRT, KQ_OFF(e.data)},
+ {"udata", T_UINTPTRT, KQ_OFF(e.udata)},
{NULL} /* Sentinel */
};
#undef KQ_OFF
char buf[1024];
PyOS_snprintf(
buf, sizeof(buf),
- "<select.kevent ident=%lu filter=%d flags=0x%x fflags=0x%x "
- "data=0x%lx udata=%p>",
- (unsigned long)(s->e.ident), s->e.filter, s->e.flags,
- s->e.fflags, (long)(s->e.data), s->e.udata);
- return PyBytes_FromString(buf);
+ "<select.kevent ident=%zu filter=%d flags=0x%x fflags=0x%x "
+ "data=0x%zd udata=%p>",
+ (size_t)(s->e.ident), s->e.filter, s->e.flags,
+ s->e.fflags, (Py_ssize_t)(s->e.data), s->e.udata);
+ return PyUnicode_FromString(buf);
}
static int
PyObject *pfd;
static char *kwlist[] = {"ident", "filter", "flags", "fflags",
"data", "udata", NULL};
+ static char *fmt = "O|hhi" INTPTRT_FMT_UNIT UINTPTRT_FMT_UNIT ":kevent";
EV_SET(&(self->e), 0, EVFILT_READ, EV_ADD, 0, 0, 0); /* defaults */
- if (!PyArg_ParseTupleAndKeywords(args, kwds, "O|hhiii:kevent", kwlist,
+ if (!PyArg_ParseTupleAndKeywords(args, kwds, fmt, kwlist,
&pfd, &(self->e.filter), &(self->e.flags),
&(self->e.fflags), &(self->e.data), &(self->e.udata))) {
return -1;
}
- self->e.ident = PyObject_AsFileDescriptor(pfd);
- if (self->e.ident == -1) {
+ if (PyLong_Check(pfd)) {
+ self->e.ident = PyLong_AsUintptr_t(pfd);
+ }
+ else {
+ self->e.ident = PyObject_AsFileDescriptor(pfd);
+ }
+ if (PyErr_Occurred()) {
return -1;
}
return 0;
kqueue_event_richcompare(kqueue_event_Object *s, kqueue_event_Object *o,
int op)
{
- int result = 0;
+ Py_intptr_t result = 0;
if (!kqueue_event_Check(o)) {
if (op == Py_EQ || op == Py_NE) {
result = (result > 0);
break;
}
- return PyBool_FromLong(result);
+ return PyBool_FromLong((long)result);
}
static PyTypeObject kqueue_event_Type = {