Skip to content

gh-92206: Improve scoping of sqlite3 bind param functions #92250

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
250 changes: 249 additions & 1 deletion Modules/_sqlite/cursor.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,18 @@
*/

#include "cursor.h"
#include "microprotocols.h"
#include "module.h"
#include "util.h"

typedef enum {
TYPE_LONG,
TYPE_FLOAT,
TYPE_UNICODE,
TYPE_BUFFER,
TYPE_UNKNOWN
} parameter_type;

#define clinic_state() (pysqlite_get_state_by_type(Py_TYPE(self)))
#include "clinic/cursor.c.h"
#undef clinic_state
Expand Down Expand Up @@ -506,6 +515,245 @@ stmt_step(sqlite3_stmt *statement)
return rc;
}

static int
bind_param(pysqlite_Statement *self, int pos, PyObject *parameter)
{
int rc = SQLITE_OK;
const char *string;
Py_ssize_t buflen;
parameter_type paramtype;

if (parameter == Py_None) {
rc = sqlite3_bind_null(self->st, pos);
goto final;
}

if (PyLong_CheckExact(parameter)) {
paramtype = TYPE_LONG;
} else if (PyFloat_CheckExact(parameter)) {
paramtype = TYPE_FLOAT;
} else if (PyUnicode_CheckExact(parameter)) {
paramtype = TYPE_UNICODE;
} else if (PyLong_Check(parameter)) {
paramtype = TYPE_LONG;
} else if (PyFloat_Check(parameter)) {
paramtype = TYPE_FLOAT;
} else if (PyUnicode_Check(parameter)) {
paramtype = TYPE_UNICODE;
} else if (PyObject_CheckBuffer(parameter)) {
paramtype = TYPE_BUFFER;
} else {
paramtype = TYPE_UNKNOWN;
}

switch (paramtype) {
case TYPE_LONG: {
sqlite_int64 value = _pysqlite_long_as_int64(parameter);
if (value == -1 && PyErr_Occurred())
rc = -1;
else
rc = sqlite3_bind_int64(self->st, pos, value);
break;
}
case TYPE_FLOAT: {
double value = PyFloat_AsDouble(parameter);
if (value == -1 && PyErr_Occurred()) {
rc = -1;
}
else {
rc = sqlite3_bind_double(self->st, pos, value);
}
break;
}
case TYPE_UNICODE:
string = PyUnicode_AsUTF8AndSize(parameter, &buflen);
if (string == NULL)
return -1;
if (buflen > INT_MAX) {
PyErr_SetString(PyExc_OverflowError,
"string longer than INT_MAX bytes");
return -1;
}
rc = sqlite3_bind_text(self->st, pos, string, (int)buflen, SQLITE_TRANSIENT);
break;
case TYPE_BUFFER: {
Py_buffer view;
if (PyObject_GetBuffer(parameter, &view, PyBUF_SIMPLE) != 0) {
return -1;
}
if (view.len > INT_MAX) {
PyErr_SetString(PyExc_OverflowError,
"BLOB longer than INT_MAX bytes");
PyBuffer_Release(&view);
return -1;
}
rc = sqlite3_bind_blob(self->st, pos, view.buf, (int)view.len, SQLITE_TRANSIENT);
PyBuffer_Release(&view);
break;
}
case TYPE_UNKNOWN:
rc = -1;
}

final:
return rc;
}

/* returns 0 if the object is one of Python's internal ones that don't need to be adapted */
static inline int
need_adapt(pysqlite_state *state, PyObject *obj)
{
if (state->BaseTypeAdapted) {
return 1;
}

if (PyLong_CheckExact(obj) || PyFloat_CheckExact(obj)
|| PyUnicode_CheckExact(obj) || PyByteArray_CheckExact(obj)) {
return 0;
} else {
return 1;
}
}

static void
bind_parameters(pysqlite_state *state, pysqlite_Statement *self,
PyObject *parameters)
{
PyObject* current_param;
PyObject* adapted;
const char* binding_name;
int i;
int rc;
int num_params_needed;
Py_ssize_t num_params;

Py_BEGIN_ALLOW_THREADS
num_params_needed = sqlite3_bind_parameter_count(self->st);
Py_END_ALLOW_THREADS

if (PyTuple_CheckExact(parameters) || PyList_CheckExact(parameters) || (!PyDict_Check(parameters) && PySequence_Check(parameters))) {
/* parameters passed as sequence */
if (PyTuple_CheckExact(parameters)) {
num_params = PyTuple_GET_SIZE(parameters);
} else if (PyList_CheckExact(parameters)) {
num_params = PyList_GET_SIZE(parameters);
} else {
num_params = PySequence_Size(parameters);
if (num_params == -1) {
return;
}
}
if (num_params != num_params_needed) {
PyErr_Format(state->ProgrammingError,
"Incorrect number of bindings supplied. The current "
"statement uses %d, and there are %zd supplied.",
num_params_needed, num_params);
return;
}
for (i = 0; i < num_params; i++) {
if (PyTuple_CheckExact(parameters)) {
PyObject *item = PyTuple_GET_ITEM(parameters, i);
current_param = Py_NewRef(item);
} else if (PyList_CheckExact(parameters)) {
PyObject *item = PyList_GetItem(parameters, i);
current_param = Py_XNewRef(item);
} else {
current_param = PySequence_GetItem(parameters, i);
}
if (!current_param) {
return;
}

if (!need_adapt(state, current_param)) {
adapted = current_param;
} else {
PyObject *protocol = (PyObject *)state->PrepareProtocolType;
adapted = pysqlite_microprotocols_adapt(state, current_param,
protocol,
current_param);
Py_DECREF(current_param);
if (!adapted) {
return;
}
}

rc = bind_param(self, i + 1, adapted);
Py_DECREF(adapted);

if (rc != SQLITE_OK) {
if (!PyErr_Occurred()) {
PyErr_Format(state->InterfaceError,
"Error binding parameter %d - "
"probably unsupported type.", i);
}
return;
}
}
} else if (PyDict_Check(parameters)) {
/* parameters passed as dictionary */
for (i = 1; i <= num_params_needed; i++) {
PyObject *binding_name_obj;
Py_BEGIN_ALLOW_THREADS
binding_name = sqlite3_bind_parameter_name(self->st, i);
Py_END_ALLOW_THREADS
if (!binding_name) {
PyErr_Format(state->ProgrammingError,
"Binding %d has no name, but you supplied a "
"dictionary (which has only names).", i);
return;
}

binding_name++; /* skip first char (the colon) */
binding_name_obj = PyUnicode_FromString(binding_name);
if (!binding_name_obj) {
return;
}
if (PyDict_CheckExact(parameters)) {
PyObject *item = PyDict_GetItemWithError(parameters, binding_name_obj);
current_param = Py_XNewRef(item);
} else {
current_param = PyObject_GetItem(parameters, binding_name_obj);
}
Py_DECREF(binding_name_obj);
if (!current_param) {
if (!PyErr_Occurred() || PyErr_ExceptionMatches(PyExc_LookupError)) {
PyErr_Format(state->ProgrammingError,
"You did not supply a value for binding "
"parameter :%s.", binding_name);
}
return;
}

if (!need_adapt(state, current_param)) {
adapted = current_param;
} else {
PyObject *protocol = (PyObject *)state->PrepareProtocolType;
adapted = pysqlite_microprotocols_adapt(state, current_param,
protocol,
current_param);
Py_DECREF(current_param);
if (!adapted) {
return;
}
}

rc = bind_param(self, i, adapted);
Py_DECREF(adapted);

if (rc != SQLITE_OK) {
if (!PyErr_Occurred()) {
PyErr_Format(state->InterfaceError,
"Error binding parameter :%s - "
"probably unsupported type.", binding_name);
}
return;
}
}
} else {
PyErr_SetString(PyExc_ValueError, "parameters are of unsupported type");
}
}

PyObject *
_pysqlite_query_execute(pysqlite_Cursor* self, int multiple, PyObject* operation, PyObject* second_argument)
{
Expand Down Expand Up @@ -617,7 +865,7 @@ _pysqlite_query_execute(pysqlite_Cursor* self, int multiple, PyObject* operation

pysqlite_statement_mark_dirty(self->statement);

pysqlite_statement_bind_parameters(state, self->statement, parameters);
bind_parameters(state, self->statement, parameters);
if (PyErr_Occurred()) {
goto error;
}
Expand Down
Loading