self.assertRaises(TypeError, sqlite.register_adapter, {}, None)
def CheckSetIsolationLevel(self):
- """
- See issue 3312.
- """
+ # See issue 27881.
+ class CustomStr(str):
+ def upper(self):
+ return None
+ def __del__(self):
+ con.isolation_level = ""
+
con = sqlite.connect(":memory:")
- setattr(con, "isolation_level", "\xe9")
+ con.isolation_level = None
+ for level in "", "DEFERRED", "IMMEDIATE", "EXCLUSIVE":
+ with self.subTest(level=level):
+ con.isolation_level = level
+ con.isolation_level = level.lower()
+ con.isolation_level = level.capitalize()
+ con.isolation_level = CustomStr(level)
+
+ # setting isolation_level failure should not alter previous state
+ con.isolation_level = None
+ con.isolation_level = "DEFERRED"
+ pairs = [
+ (1, TypeError), (b'', TypeError), ("abc", ValueError),
+ ("IMMEDIATE\0EXCLUSIVE", ValueError), ("\xe9", ValueError),
+ ]
+ for value, exc in pairs:
+ with self.subTest(level=value):
+ with self.assertRaises(exc):
+ con.isolation_level = value
+ self.assertEqual(con.isolation_level, "DEFERRED")
def CheckCursorConstructorCallCheck(self):
"""
_Py_IDENTIFIER(cursor);
+static const char * const begin_statements[] = {
+ "BEGIN ",
+ "BEGIN DEFERRED",
+ "BEGIN IMMEDIATE",
+ "BEGIN EXCLUSIVE",
+ NULL
+};
+
static int pysqlite_connection_set_isolation_level(pysqlite_Connection* self, PyObject* isolation_level);
static void _pysqlite_drop_unused_cursor_references(pysqlite_Connection* self);
Py_END_ALLOW_THREADS
}
- if (self->begin_statement) {
- PyMem_Free(self->begin_statement);
- }
Py_XDECREF(self->isolation_level);
Py_XDECREF(self->function_pinboard);
Py_XDECREF(self->row_factory);
static int pysqlite_connection_set_isolation_level(pysqlite_Connection* self, PyObject* isolation_level)
{
- PyObject* res;
- PyObject* begin_statement;
- static PyObject* begin_word;
-
- Py_XDECREF(self->isolation_level);
-
- if (self->begin_statement) {
- PyMem_Free(self->begin_statement);
- self->begin_statement = NULL;
- }
-
if (isolation_level == Py_None) {
- Py_INCREF(Py_None);
- self->isolation_level = Py_None;
-
- res = pysqlite_connection_commit(self, NULL);
+ PyObject *res = pysqlite_connection_commit(self, NULL);
if (!res) {
return -1;
}
Py_DECREF(res);
+ self->begin_statement = NULL;
self->inTransaction = 0;
} else {
- const char *statement;
- Py_ssize_t size;
-
- Py_INCREF(isolation_level);
- self->isolation_level = isolation_level;
-
- if (!begin_word) {
- begin_word = PyUnicode_FromString("BEGIN ");
- if (!begin_word) return -1;
- }
- begin_statement = PyUnicode_Concat(begin_word, isolation_level);
- if (!begin_statement) {
+ const char * const *candidate;
+ PyObject *uppercase_level;
+ _Py_IDENTIFIER(upper);
+
+ if (!PyUnicode_Check(isolation_level)) {
+ PyErr_Format(PyExc_TypeError,
+ "isolation_level must be a string or None, not %.100s",
+ Py_TYPE(isolation_level)->tp_name);
return -1;
}
- statement = _PyUnicode_AsStringAndSize(begin_statement, &size);
- if (!statement) {
- Py_DECREF(begin_statement);
+ uppercase_level = _PyObject_CallMethodIdObjArgs(
+ (PyObject *)&PyUnicode_Type, &PyId_upper,
+ isolation_level, NULL);
+ if (!uppercase_level) {
return -1;
}
- self->begin_statement = PyMem_Malloc(size + 2);
- if (!self->begin_statement) {
- Py_DECREF(begin_statement);
+ for (candidate = begin_statements; *candidate; candidate++) {
+ if (!PyUnicode_CompareWithASCIIString(uppercase_level, *candidate + 6))
+ break;
+ }
+ Py_DECREF(uppercase_level);
+ if (!*candidate) {
+ PyErr_SetString(PyExc_ValueError,
+ "invalid value for isolation_level");
return -1;
}
-
- strcpy(self->begin_statement, statement);
- Py_DECREF(begin_statement);
+ self->begin_statement = *candidate;
}
+ Py_INCREF(isolation_level);
+ Py_XSETREF(self->isolation_level, isolation_level);
return 0;
}