plpython: Add SPI cursor support
authorPeter Eisentraut <peter_e@gmx.net>
Mon, 5 Dec 2011 17:52:15 +0000 (19:52 +0200)
committerPeter Eisentraut <peter_e@gmx.net>
Mon, 5 Dec 2011 17:52:15 +0000 (19:52 +0200)
Add a function plpy.cursor that is similar to plpy.execute but uses an
SPI cursor to avoid fetching the entire result set into memory.

Jan Urbański, reviewed by Steve Singer

doc/src/sgml/plpython.sgml
src/pl/plpython/expected/plpython_spi.out
src/pl/plpython/expected/plpython_subtransaction.out
src/pl/plpython/expected/plpython_subtransaction_0.out
src/pl/plpython/expected/plpython_subtransaction_5.out
src/pl/plpython/expected/plpython_test.out
src/pl/plpython/plpython.c
src/pl/plpython/sql/plpython_spi.sql
src/pl/plpython/sql/plpython_subtransaction.sql

index eda2bbf34c539a23866b61632712ce8c5b168a30..618f8d055e919ba6ec4dae651fcbe1608e80b107 100644 (file)
@@ -891,6 +891,15 @@ $$ LANGUAGE plpythonu;
    can be modified.
   </para>
 
+  <para>
+   Note that calling <literal>plpy.execute</literal> will cause the entire
+   result set to be read into memory.  Only use that function when you are sure
+   that the result set will be relatively small.  If you don't want to risk
+   excessive memory usage when fetching large results,
+   use <literal>plpy.cursor</literal> rather
+   than <literal>plpy.execute</literal>.
+  </para>
+
   <para>
    For example:
 <programlisting>
@@ -958,6 +967,78 @@ $$ LANGUAGE plpythonu;
 
   </sect2>
 
+  <sect2>
+   <title>Accessing Data with Cursors</title>
+
+   <para>
+    The <literal>plpy.cursor</literal> function accepts the same arguments
+    as <literal>plpy.execute</literal> (except for <literal>limit</literal>)
+    and returns a cursor object, which allows you to process large result sets
+    in smaller chunks.  As with <literal>plpy.execute</literal>, either a query
+    string or a plan object along with a list of arguments can be used.  The
+    cursor object provides a <literal>fetch</literal> method that accepts an
+    integer parameter and returns a result object.  Each time you
+    call <literal>fetch</literal>, the returned object will contain the next
+    batch of rows, never larger than the parameter value.  Once all rows are
+    exhausted, <literal>fetch</literal> starts returning an empty result
+    object.  Cursor objects also provide an
+    <ulink url="http://docs.python.org/library/stdtypes.html#iterator-types">iterator
+    interface</ulink>, yielding one row at a time until all rows are exhausted.
+    Data fetched that way is not returned as result objects, but rather as
+    dictionaries, each dictionary corresponding to a single result row.
+   </para>
+
+   <para>
+    Cursors are automatically disposed of.  But if you want to explicitly
+    release all resources held by a cursor, use the <literal>close</literal>
+    method.  Once closed, a cursor cannot be fetched from anymore.
+   </para>
+
+   <tip>
+    <para>
+      Do not confuse objects created by <literal>plpy.cursor</literal> with
+      DB-API cursors as defined by
+      the <ulink url="http://www.python.org/dev/peps/pep-0249/">Python Database
+      API specification</ulink>.  They don't have anything in common except for
+      the name.
+    </para>
+   </tip>
+
+   <para>
+    An example of two ways of processing data from a large table is:
+<programlisting>
+CREATE FUNCTION count_odd_iterator() RETURNS integer AS $$
+odd = 0
+for row in plpy.cursor("select num from largetable"):
+    if row['num'] % 2:
+         odd += 1
+return odd
+$$ LANGUAGE plpythonu;
+
+CREATE FUNCTION count_odd_fetch(batch_size integer) RETURNS integer AS $$
+odd = 0
+cursor = plpy.cursor("select num from largetable")
+while True:
+    rows = cursor.fetch(batch_size)
+    if not rows:
+        break
+    for row in rows:
+        if row['num'] % 2:
+            odd += 1
+return odd
+$$ LANGUAGE plpythonu;
+
+CREATE FUNCTION count_odd_prepared() RETURNS integer AS $$
+odd = 0
+plan = plpy.prepare("select num from largetable where num % $1 &lt;&gt; 0", ["integer"])
+rows = list(plpy.cursor(plan, [2]))
+
+return len(rows)
+$$ LANGUAGE plpythonu;
+</programlisting>
+   </para>
+  </sect2>
+
   <sect2 id="plpython-trapping">
    <title>Trapping Errors</title>
 
index 7f4ae5ca9972b95d247a85b2dd7def4e2a42e432..3b4d7a30105c6aa00eea135cdd332d78ba45cb4b 100644 (file)
@@ -133,3 +133,154 @@ CONTEXT:  PL/Python function "result_nrows_test"
                  2
 (1 row)
 
+-- cursor objects
+CREATE FUNCTION simple_cursor_test() RETURNS int AS $$
+res = plpy.cursor("select fname, lname from users")
+does = 0
+for row in res:
+    if row['lname'] == 'doe':
+        does += 1
+return does
+$$ LANGUAGE plpythonu;
+CREATE FUNCTION double_cursor_close() RETURNS int AS $$
+res = plpy.cursor("select fname, lname from users")
+res.close()
+res.close()
+$$ LANGUAGE plpythonu;
+CREATE FUNCTION cursor_fetch() RETURNS int AS $$
+res = plpy.cursor("select fname, lname from users")
+assert len(res.fetch(3)) == 3
+assert len(res.fetch(3)) == 1
+assert len(res.fetch(3)) == 0
+assert len(res.fetch(3)) == 0
+try:
+    # use next() or __next__(), the method name changed in
+    # http://www.python.org/dev/peps/pep-3114/
+    try:
+        res.next()
+    except AttributeError:
+        res.__next__()
+except StopIteration:
+    pass
+else:
+    assert False, "StopIteration not raised"
+$$ LANGUAGE plpythonu;
+CREATE FUNCTION cursor_mix_next_and_fetch() RETURNS int AS $$
+res = plpy.cursor("select fname, lname from users order by fname")
+assert len(res.fetch(2)) == 2
+
+item = None
+try:
+    item = res.next()
+except AttributeError:
+    item = res.__next__()
+assert item['fname'] == 'rick'
+
+assert len(res.fetch(2)) == 1
+$$ LANGUAGE plpythonu;
+CREATE FUNCTION fetch_after_close() RETURNS int AS $$
+res = plpy.cursor("select fname, lname from users")
+res.close()
+try:
+    res.fetch(1)
+except ValueError:
+    pass
+else:
+    assert False, "ValueError not raised"
+$$ LANGUAGE plpythonu;
+CREATE FUNCTION next_after_close() RETURNS int AS $$
+res = plpy.cursor("select fname, lname from users")
+res.close()
+try:
+    try:
+        res.next()
+    except AttributeError:
+        res.__next__()
+except ValueError:
+    pass
+else:
+    assert False, "ValueError not raised"
+$$ LANGUAGE plpythonu;
+CREATE FUNCTION cursor_fetch_next_empty() RETURNS int AS $$
+res = plpy.cursor("select fname, lname from users where false")
+assert len(res.fetch(1)) == 0
+try:
+    try:
+        res.next()
+    except AttributeError:
+        res.__next__()
+except StopIteration:
+    pass
+else:
+    assert False, "StopIteration not raised"
+$$ LANGUAGE plpythonu;
+CREATE FUNCTION cursor_plan() RETURNS SETOF text AS $$
+plan = plpy.prepare(
+    "select fname, lname from users where fname like $1 || '%' order by fname",
+    ["text"])
+for row in plpy.cursor(plan, ["w"]):
+    yield row['fname']
+for row in plpy.cursor(plan, ["j"]):
+    yield row['fname']
+$$ LANGUAGE plpythonu;
+CREATE FUNCTION cursor_plan_wrong_args() RETURNS SETOF text AS $$
+plan = plpy.prepare("select fname, lname from users where fname like $1 || '%'",
+                    ["text"])
+c = plpy.cursor(plan, ["a", "b"])
+$$ LANGUAGE plpythonu;
+SELECT simple_cursor_test();
+ simple_cursor_test 
+--------------------
+                  3
+(1 row)
+
+SELECT double_cursor_close();
+ double_cursor_close 
+---------------------
+                    
+(1 row)
+
+SELECT cursor_fetch();
+ cursor_fetch 
+--------------
+             
+(1 row)
+
+SELECT cursor_mix_next_and_fetch();
+ cursor_mix_next_and_fetch 
+---------------------------
+                          
+(1 row)
+
+SELECT fetch_after_close();
+ fetch_after_close 
+-------------------
+                  
+(1 row)
+
+SELECT next_after_close();
+ next_after_close 
+------------------
+                 
+(1 row)
+
+SELECT cursor_fetch_next_empty();
+ cursor_fetch_next_empty 
+-------------------------
+                        
+(1 row)
+
+SELECT cursor_plan();
+ cursor_plan 
+-------------
+ willem
+ jane
+ john
+(3 rows)
+
+SELECT cursor_plan_wrong_args();
+ERROR:  TypeError: Expected sequence of 1 argument, got 2: ['a', 'b']
+CONTEXT:  Traceback (most recent call last):
+  PL/Python function "cursor_plan_wrong_args", line 4, in <module>
+    c = plpy.cursor(plan, ["a", "b"])
+PL/Python function "cursor_plan_wrong_args"
index 515b0bb734443e21b507ab8c35ff61a12008cca6..c2c22f0ae41cf7bb0015dc07884ff2182e5cf914 100644 (file)
@@ -409,3 +409,69 @@ SELECT * FROM subtransaction_tbl;
 (1 row)
 
 DROP TABLE subtransaction_tbl;
+-- cursor/subtransactions interactions
+CREATE FUNCTION cursor_in_subxact() RETURNS int AS $$
+with plpy.subtransaction():
+    cur = plpy.cursor("select * from generate_series(1, 20) as gen(i)")
+    cur.fetch(10)
+fetched = cur.fetch(10);
+return int(fetched[5]["i"])
+$$ LANGUAGE plpythonu;
+CREATE FUNCTION cursor_aborted_subxact() RETURNS int AS $$
+try:
+    with plpy.subtransaction():
+        cur = plpy.cursor("select * from generate_series(1, 20) as gen(i)")
+        cur.fetch(10);
+        plpy.execute("select no_such_function()")
+except plpy.SPIError:
+    fetched = cur.fetch(10)
+    return int(fetched[5]["i"])
+return 0 # not reached
+$$ LANGUAGE plpythonu;
+CREATE FUNCTION cursor_plan_aborted_subxact() RETURNS int AS $$
+try:
+    with plpy.subtransaction():
+        plpy.execute('create temporary table tmp(i) '
+                     'as select generate_series(1, 10)')
+        plan = plpy.prepare("select i from tmp")
+        cur = plpy.cursor(plan)
+        plpy.execute("select no_such_function()")
+except plpy.SPIError:
+    fetched = cur.fetch(5)
+    return fetched[2]["i"]
+return 0 # not reached
+$$ LANGUAGE plpythonu;
+CREATE FUNCTION cursor_close_aborted_subxact() RETURNS boolean AS $$
+try:
+    with plpy.subtransaction():
+        cur = plpy.cursor('select 1')
+        plpy.execute("select no_such_function()")
+except plpy.SPIError:
+    cur.close()
+    return True
+return False # not reached
+$$ LANGUAGE plpythonu;
+SELECT cursor_in_subxact();
+ cursor_in_subxact 
+-------------------
+                16
+(1 row)
+
+SELECT cursor_aborted_subxact();
+ERROR:  ValueError: iterating a cursor in an aborted subtransaction
+CONTEXT:  Traceback (most recent call last):
+  PL/Python function "cursor_aborted_subxact", line 8, in <module>
+    fetched = cur.fetch(10)
+PL/Python function "cursor_aborted_subxact"
+SELECT cursor_plan_aborted_subxact();
+ERROR:  ValueError: iterating a cursor in an aborted subtransaction
+CONTEXT:  Traceback (most recent call last):
+  PL/Python function "cursor_plan_aborted_subxact", line 10, in <module>
+    fetched = cur.fetch(5)
+PL/Python function "cursor_plan_aborted_subxact"
+SELECT cursor_close_aborted_subxact();
+ERROR:  ValueError: closing a cursor in an aborted subtransaction
+CONTEXT:  Traceback (most recent call last):
+  PL/Python function "cursor_close_aborted_subxact", line 7, in <module>
+    cur.close()
+PL/Python function "cursor_close_aborted_subxact"
index 4017c41edc92606831b1cc15843835b2ed11efe0..ece0134a946af1c233800e14c75ab4877060e277 100644 (file)
@@ -382,3 +382,73 @@ SELECT * FROM subtransaction_tbl;
 (0 rows)
 
 DROP TABLE subtransaction_tbl;
+-- cursor/subtransactions interactions
+CREATE FUNCTION cursor_in_subxact() RETURNS int AS $$
+with plpy.subtransaction():
+    cur = plpy.cursor("select * from generate_series(1, 20) as gen(i)")
+    cur.fetch(10)
+fetched = cur.fetch(10);
+return int(fetched[5]["i"])
+$$ LANGUAGE plpythonu;
+ERROR:  could not compile PL/Python function "cursor_in_subxact"
+DETAIL:  SyntaxError: invalid syntax (line 3)
+CREATE FUNCTION cursor_aborted_subxact() RETURNS int AS $$
+try:
+    with plpy.subtransaction():
+        cur = plpy.cursor("select * from generate_series(1, 20) as gen(i)")
+        cur.fetch(10);
+        plpy.execute("select no_such_function()")
+except plpy.SPIError:
+    fetched = cur.fetch(10)
+    return int(fetched[5]["i"])
+return 0 # not reached
+$$ LANGUAGE plpythonu;
+ERROR:  could not compile PL/Python function "cursor_aborted_subxact"
+DETAIL:  SyntaxError: invalid syntax (line 4)
+CREATE FUNCTION cursor_plan_aborted_subxact() RETURNS int AS $$
+try:
+    with plpy.subtransaction():
+        plpy.execute('create temporary table tmp(i) '
+                     'as select generate_series(1, 10)')
+        plan = plpy.prepare("select i from tmp")
+        cur = plpy.cursor(plan)
+        plpy.execute("select no_such_function()")
+except plpy.SPIError:
+    fetched = cur.fetch(5)
+    return fetched[2]["i"]
+return 0 # not reached
+$$ LANGUAGE plpythonu;
+ERROR:  could not compile PL/Python function "cursor_plan_aborted_subxact"
+DETAIL:  SyntaxError: invalid syntax (line 4)
+CREATE FUNCTION cursor_close_aborted_subxact() RETURNS boolean AS $$
+try:
+    with plpy.subtransaction():
+        cur = plpy.cursor('select 1')
+        plpy.execute("select no_such_function()")
+except plpy.SPIError:
+    cur.close()
+    return True
+return False # not reached
+$$ LANGUAGE plpythonu;
+ERROR:  could not compile PL/Python function "cursor_close_aborted_subxact"
+DETAIL:  SyntaxError: invalid syntax (line 4)
+SELECT cursor_in_subxact();
+ERROR:  function cursor_in_subxact() does not exist
+LINE 1: SELECT cursor_in_subxact();
+               ^
+HINT:  No function matches the given name and argument types. You might need to add explicit type casts.
+SELECT cursor_aborted_subxact();
+ERROR:  function cursor_aborted_subxact() does not exist
+LINE 1: SELECT cursor_aborted_subxact();
+               ^
+HINT:  No function matches the given name and argument types. You might need to add explicit type casts.
+SELECT cursor_plan_aborted_subxact();
+ERROR:  function cursor_plan_aborted_subxact() does not exist
+LINE 1: SELECT cursor_plan_aborted_subxact();
+               ^
+HINT:  No function matches the given name and argument types. You might need to add explicit type casts.
+SELECT cursor_close_aborted_subxact();
+ERROR:  function cursor_close_aborted_subxact() does not exist
+LINE 1: SELECT cursor_close_aborted_subxact();
+               ^
+HINT:  No function matches the given name and argument types. You might need to add explicit type casts.
index 9216151b94e5b56760d4ab814181cbad7e66422a..66de2394990b6ae7f304998e0da6a1173c36cd60 100644 (file)
@@ -382,3 +382,73 @@ SELECT * FROM subtransaction_tbl;
 (0 rows)
 
 DROP TABLE subtransaction_tbl;
+-- cursor/subtransactions interactions
+CREATE FUNCTION cursor_in_subxact() RETURNS int AS $$
+with plpy.subtransaction():
+    cur = plpy.cursor("select * from generate_series(1, 20) as gen(i)")
+    cur.fetch(10)
+fetched = cur.fetch(10);
+return int(fetched[5]["i"])
+$$ LANGUAGE plpythonu;
+ERROR:  could not compile PL/Python function "cursor_in_subxact"
+DETAIL:  SyntaxError: invalid syntax (<string>, line 3)
+CREATE FUNCTION cursor_aborted_subxact() RETURNS int AS $$
+try:
+    with plpy.subtransaction():
+        cur = plpy.cursor("select * from generate_series(1, 20) as gen(i)")
+        cur.fetch(10);
+        plpy.execute("select no_such_function()")
+except plpy.SPIError:
+    fetched = cur.fetch(10)
+    return int(fetched[5]["i"])
+return 0 # not reached
+$$ LANGUAGE plpythonu;
+ERROR:  could not compile PL/Python function "cursor_aborted_subxact"
+DETAIL:  SyntaxError: invalid syntax (<string>, line 4)
+CREATE FUNCTION cursor_plan_aborted_subxact() RETURNS int AS $$
+try:
+    with plpy.subtransaction():
+        plpy.execute('create temporary table tmp(i) '
+                     'as select generate_series(1, 10)')
+        plan = plpy.prepare("select i from tmp")
+        cur = plpy.cursor(plan)
+        plpy.execute("select no_such_function()")
+except plpy.SPIError:
+    fetched = cur.fetch(5)
+    return fetched[2]["i"]
+return 0 # not reached
+$$ LANGUAGE plpythonu;
+ERROR:  could not compile PL/Python function "cursor_plan_aborted_subxact"
+DETAIL:  SyntaxError: invalid syntax (<string>, line 4)
+CREATE FUNCTION cursor_close_aborted_subxact() RETURNS boolean AS $$
+try:
+    with plpy.subtransaction():
+        cur = plpy.cursor('select 1')
+        plpy.execute("select no_such_function()")
+except plpy.SPIError:
+    cur.close()
+    return True
+return False # not reached
+$$ LANGUAGE plpythonu;
+ERROR:  could not compile PL/Python function "cursor_close_aborted_subxact"
+DETAIL:  SyntaxError: invalid syntax (<string>, line 4)
+SELECT cursor_in_subxact();
+ERROR:  function cursor_in_subxact() does not exist
+LINE 1: SELECT cursor_in_subxact();
+               ^
+HINT:  No function matches the given name and argument types. You might need to add explicit type casts.
+SELECT cursor_aborted_subxact();
+ERROR:  function cursor_aborted_subxact() does not exist
+LINE 1: SELECT cursor_aborted_subxact();
+               ^
+HINT:  No function matches the given name and argument types. You might need to add explicit type casts.
+SELECT cursor_plan_aborted_subxact();
+ERROR:  function cursor_plan_aborted_subxact() does not exist
+LINE 1: SELECT cursor_plan_aborted_subxact();
+               ^
+HINT:  No function matches the given name and argument types. You might need to add explicit type casts.
+SELECT cursor_close_aborted_subxact();
+ERROR:  function cursor_close_aborted_subxact() does not exist
+LINE 1: SELECT cursor_close_aborted_subxact();
+               ^
+HINT:  No function matches the given name and argument types. You might need to add explicit type casts.
index f2dda66532e525f1ae79a9d92beb4bdde6cd51bd..a884fc0e27feb677e2e7a38bc569e9f6e5b887bb 100644 (file)
@@ -43,9 +43,9 @@ contents.sort()
 return ", ".join(contents)
 $$ LANGUAGE plpythonu;
 select module_contents();
-                                                                           module_contents                                                                            
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------
- Error, Fatal, SPIError, debug, error, execute, fatal, info, log, notice, prepare, quote_ident, quote_literal, quote_nullable, spiexceptions, subtransaction, warning
+                                                                               module_contents                                                                                
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
+ Error, Fatal, SPIError, cursor, debug, error, execute, fatal, info, log, notice, prepare, quote_ident, quote_literal, quote_nullable, spiexceptions, subtransaction, warning
 (1 row)
 
 CREATE FUNCTION elog_test() RETURNS void
index afd5dfce83a9c33e3d84536cf7fc81c1e615b77a..29e0ac7c45466fbf17bb8cbee6327a1c66d964a4 100644 (file)
@@ -134,6 +134,11 @@ typedef int Py_ssize_t;
        PyObject_HEAD_INIT(type) size,
 #endif
 
+/* Python 3 removed the Py_TPFLAGS_HAVE_ITER flag */
+#if PY_MAJOR_VERSION >= 3
+#define Py_TPFLAGS_HAVE_ITER 0
+#endif
+
 /* define our text domain for translations */
 #undef TEXTDOMAIN
 #define TEXTDOMAIN PG_TEXTDOMAIN("plpython")
@@ -310,6 +315,14 @@ typedef struct PLySubtransactionObject
    bool        exited;
 } PLySubtransactionObject;
 
+typedef struct PLyCursorObject
+{
+   PyObject_HEAD
+   char        *portalname;
+   PLyTypeInfo result;
+   bool        closed;
+} PLyCursorObject;
+
 /* A list of all known exceptions, generated from backend/utils/errcodes.txt */
 typedef struct ExceptionMap
 {
@@ -486,6 +499,10 @@ static char PLy_subtransaction_doc[] = {
    "PostgreSQL subtransaction context manager"
 };
 
+static char PLy_cursor_doc[] = {
+   "Wrapper around a PostgreSQL cursor"
+};
+
 
 /*
  * the function definitions
@@ -2963,6 +2980,14 @@ static void PLy_subtransaction_dealloc(PyObject *);
 static PyObject *PLy_subtransaction_enter(PyObject *, PyObject *);
 static PyObject *PLy_subtransaction_exit(PyObject *, PyObject *);
 
+static PyObject *PLy_cursor(PyObject *self, PyObject *unused);
+static PyObject *PLy_cursor_query(const char *query);
+static PyObject *PLy_cursor_plan(PyObject *ob, PyObject *args);
+static void PLy_cursor_dealloc(PyObject *arg);
+static PyObject *PLy_cursor_iternext(PyObject *self);
+static PyObject *PLy_cursor_fetch(PyObject *self, PyObject *args);
+static PyObject *PLy_cursor_close(PyObject *self, PyObject *unused);
+
 
 static PyMethodDef PLy_plan_methods[] = {
    {"status", PLy_plan_status, METH_VARARGS, NULL},
@@ -3099,6 +3124,47 @@ static PyTypeObject PLy_SubtransactionType = {
    PLy_subtransaction_methods, /* tp_tpmethods */
 };
 
+static PyMethodDef PLy_cursor_methods[] = {
+   {"fetch", PLy_cursor_fetch, METH_VARARGS, NULL},
+   {"close", PLy_cursor_close, METH_NOARGS, NULL},
+   {NULL, NULL, 0, NULL}
+};
+
+static PyTypeObject PLy_CursorType = {
+   PyVarObject_HEAD_INIT(NULL, 0)
+   "PLyCursor",        /* tp_name */
+   sizeof(PLyCursorObject),    /* tp_size */
+   0,                          /* tp_itemsize */
+
+   /*
+    * methods
+    */
+   PLy_cursor_dealloc,         /* tp_dealloc */
+   0,                          /* tp_print */
+   0,                          /* tp_getattr */
+   0,                          /* tp_setattr */
+   0,                          /* tp_compare */
+   0,                          /* tp_repr */
+   0,                          /* tp_as_number */
+   0,                          /* tp_as_sequence */
+   0,                          /* tp_as_mapping */
+   0,                          /* tp_hash */
+   0,                          /* tp_call */
+   0,                          /* tp_str */
+   0,                          /* tp_getattro */
+   0,                          /* tp_setattro */
+   0,                          /* tp_as_buffer */
+   Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE | Py_TPFLAGS_HAVE_ITER,    /* tp_flags */
+   PLy_cursor_doc,             /* tp_doc */
+   0,                          /* tp_traverse */
+   0,                          /* tp_clear */
+   0,                          /* tp_richcompare */
+   0,                          /* tp_weaklistoffset */
+   PyObject_SelfIter,          /* tp_iter */
+   PLy_cursor_iternext,        /* tp_iternext */
+   PLy_cursor_methods,         /* tp_tpmethods */
+};
+
 static PyMethodDef PLy_methods[] = {
    /*
     * logging methods
@@ -3133,6 +3199,11 @@ static PyMethodDef PLy_methods[] = {
     */
    {"subtransaction", PLy_subtransaction, METH_NOARGS, NULL},
 
+   /*
+    * create a cursor
+    */
+   {"cursor", PLy_cursor, METH_VARARGS, NULL},
+
    {NULL, NULL, 0, NULL}
 };
 
@@ -3833,6 +3904,575 @@ PLy_spi_execute_fetch_result(SPITupleTable *tuptable, int rows, int status)
    return (PyObject *) result;
 }
 
+/*
+ * c = plpy.cursor("select * from largetable")
+ * c = plpy.cursor(plan, [])
+ */
+static PyObject *
+PLy_cursor(PyObject *self, PyObject *args)
+{
+   char       *query;
+   PyObject   *plan;
+   PyObject   *planargs = NULL;
+
+   if (PyArg_ParseTuple(args, "s", &query))
+       return PLy_cursor_query(query);
+
+   PyErr_Clear();
+
+   if (PyArg_ParseTuple(args, "O|O", &plan, &planargs))
+       return PLy_cursor_plan(plan, planargs);
+
+   PLy_exception_set(PLy_exc_error, "plpy.cursor expected a query or a plan");
+   return NULL;
+}
+
+
+static PyObject *
+PLy_cursor_query(const char *query)
+{
+   PLyCursorObject *cursor;
+   volatile MemoryContext oldcontext;
+   volatile ResourceOwner oldowner;
+
+   if ((cursor = PyObject_New(PLyCursorObject, &PLy_CursorType)) == NULL)
+       return NULL;
+   cursor->portalname = NULL;
+   cursor->closed = false;
+   PLy_typeinfo_init(&cursor->result);
+
+   oldcontext = CurrentMemoryContext;
+   oldowner = CurrentResourceOwner;
+
+   BeginInternalSubTransaction(NULL);
+   MemoryContextSwitchTo(oldcontext);
+
+   PG_TRY();
+   {
+       SPIPlanPtr  plan;
+       Portal      portal;
+
+       pg_verifymbstr(query, strlen(query), false);
+
+       plan = SPI_prepare(query, 0, NULL);
+       if (plan == NULL)
+           elog(ERROR, "SPI_prepare failed: %s",
+                SPI_result_code_string(SPI_result));
+
+       portal = SPI_cursor_open(NULL, plan, NULL, NULL,
+                                PLy_curr_procedure->fn_readonly);
+       SPI_freeplan(plan);
+
+       if (portal == NULL)
+           elog(ERROR, "SPI_cursor_open() failed:%s",
+                SPI_result_code_string(SPI_result));
+
+       cursor->portalname = PLy_strdup(portal->name);
+
+       /* Commit the inner transaction, return to outer xact context */
+       ReleaseCurrentSubTransaction();
+       MemoryContextSwitchTo(oldcontext);
+       CurrentResourceOwner = oldowner;
+
+       /*
+        * AtEOSubXact_SPI() should not have popped any SPI context, but just
+        * in case it did, make sure we remain connected.
+        */
+       SPI_restore_connection();
+   }
+   PG_CATCH();
+   {
+       ErrorData  *edata;
+       PLyExceptionEntry *entry;
+       PyObject   *exc;
+
+       /* Save error info */
+       MemoryContextSwitchTo(oldcontext);
+       edata = CopyErrorData();
+       FlushErrorState();
+
+       /* Abort the inner transaction */
+       RollbackAndReleaseCurrentSubTransaction();
+       MemoryContextSwitchTo(oldcontext);
+       CurrentResourceOwner = oldowner;
+
+       Py_DECREF(cursor);
+
+       /*
+        * If AtEOSubXact_SPI() popped any SPI context of the subxact, it will
+        * have left us in a disconnected state.  We need this hack to return
+        * to connected state.
+        */
+       SPI_restore_connection();
+
+       /* Look up the correct exception */
+       entry = hash_search(PLy_spi_exceptions, &(edata->sqlerrcode),
+                           HASH_FIND, NULL);
+       /* We really should find it, but just in case have a fallback */
+       Assert(entry != NULL);
+       exc = entry ? entry->exc : PLy_exc_spi_error;
+       /* Make Python raise the exception */
+       PLy_spi_exception_set(exc, edata);
+       return NULL;
+   }
+   PG_END_TRY();
+
+   Assert(cursor->portalname != NULL);
+   return (PyObject *) cursor;
+}
+
+static PyObject *
+PLy_cursor_plan(PyObject *ob, PyObject *args)
+{
+   PLyCursorObject *cursor;
+   volatile int nargs;
+   int         i;
+   PLyPlanObject *plan;
+   volatile MemoryContext oldcontext;
+   volatile ResourceOwner oldowner;
+
+   if (args)
+   {
+       if (!PySequence_Check(args) || PyString_Check(args) || PyUnicode_Check(args))
+       {
+           PLy_exception_set(PyExc_TypeError, "plpy.cursor takes a sequence as its second argument");
+           return NULL;
+       }
+       nargs = PySequence_Length(args);
+   }
+   else
+       nargs = 0;
+
+   plan = (PLyPlanObject *) ob;
+
+   if (nargs != plan->nargs)
+   {
+       char       *sv;
+       PyObject   *so = PyObject_Str(args);
+
+       if (!so)
+           PLy_elog(ERROR, "could not execute plan");
+       sv = PyString_AsString(so);
+       PLy_exception_set_plural(PyExc_TypeError,
+                                "Expected sequence of %d argument, got %d: %s",
+                                "Expected sequence of %d arguments, got %d: %s",
+                                plan->nargs,
+                                plan->nargs, nargs, sv);
+       Py_DECREF(so);
+
+       return NULL;
+   }
+
+   if ((cursor = PyObject_New(PLyCursorObject, &PLy_CursorType)) == NULL)
+       return NULL;
+   cursor->portalname = NULL;
+   cursor->closed = false;
+   PLy_typeinfo_init(&cursor->result);
+
+   oldcontext = CurrentMemoryContext;
+   oldowner = CurrentResourceOwner;
+
+   BeginInternalSubTransaction(NULL);
+   MemoryContextSwitchTo(oldcontext);
+
+   PG_TRY();
+   {
+       Portal      portal;
+       char       *volatile nulls;
+       volatile int j;
+
+       if (nargs > 0)
+           nulls = palloc(nargs * sizeof(char));
+       else
+           nulls = NULL;
+
+       for (j = 0; j < nargs; j++)
+       {
+           PyObject   *elem;
+
+           elem = PySequence_GetItem(args, j);
+           if (elem != Py_None)
+           {
+               PG_TRY();
+               {
+                   plan->values[j] =
+                       plan->args[j].out.d.func(&(plan->args[j].out.d),
+                                                -1,
+                                                elem);
+               }
+               PG_CATCH();
+               {
+                   Py_DECREF(elem);
+                   PG_RE_THROW();
+               }
+               PG_END_TRY();
+
+               Py_DECREF(elem);
+               nulls[j] = ' ';
+           }
+           else
+           {
+               Py_DECREF(elem);
+               plan->values[j] =
+                   InputFunctionCall(&(plan->args[j].out.d.typfunc),
+                                     NULL,
+                                     plan->args[j].out.d.typioparam,
+                                     -1);
+               nulls[j] = 'n';
+           }
+       }
+
+       portal = SPI_cursor_open(NULL, plan->plan, plan->values, nulls,
+                                PLy_curr_procedure->fn_readonly);
+       if (portal == NULL)
+           elog(ERROR, "SPI_cursor_open() failed:%s",
+                SPI_result_code_string(SPI_result));
+
+       cursor->portalname = PLy_strdup(portal->name);
+
+       /* Commit the inner transaction, return to outer xact context */
+       ReleaseCurrentSubTransaction();
+       MemoryContextSwitchTo(oldcontext);
+       CurrentResourceOwner = oldowner;
+
+       /*
+        * AtEOSubXact_SPI() should not have popped any SPI context, but just
+        * in case it did, make sure we remain connected.
+        */
+       SPI_restore_connection();
+   }
+   PG_CATCH();
+   {
+       int         k;
+       ErrorData  *edata;
+       PLyExceptionEntry *entry;
+       PyObject   *exc;
+
+       /* Save error info */
+       MemoryContextSwitchTo(oldcontext);
+       edata = CopyErrorData();
+       FlushErrorState();
+
+       /* cleanup plan->values array */
+       for (k = 0; k < nargs; k++)
+       {
+           if (!plan->args[k].out.d.typbyval &&
+               (plan->values[k] != PointerGetDatum(NULL)))
+           {
+               pfree(DatumGetPointer(plan->values[k]));
+               plan->values[k] = PointerGetDatum(NULL);
+           }
+       }
+
+       /* Abort the inner transaction */
+       RollbackAndReleaseCurrentSubTransaction();
+       MemoryContextSwitchTo(oldcontext);
+       CurrentResourceOwner = oldowner;
+
+       Py_DECREF(cursor);
+
+       /*
+        * If AtEOSubXact_SPI() popped any SPI context of the subxact, it will
+        * have left us in a disconnected state.  We need this hack to return
+        * to connected state.
+        */
+       SPI_restore_connection();
+
+       /* Look up the correct exception */
+       entry = hash_search(PLy_spi_exceptions, &(edata->sqlerrcode),
+                           HASH_FIND, NULL);
+       /* We really should find it, but just in case have a fallback */
+       Assert(entry != NULL);
+       exc = entry ? entry->exc : PLy_exc_spi_error;
+       /* Make Python raise the exception */
+       PLy_spi_exception_set(exc, edata);
+       return NULL;
+   }
+   PG_END_TRY();
+
+   for (i = 0; i < nargs; i++)
+   {
+       if (!plan->args[i].out.d.typbyval &&
+           (plan->values[i] != PointerGetDatum(NULL)))
+       {
+           pfree(DatumGetPointer(plan->values[i]));
+           plan->values[i] = PointerGetDatum(NULL);
+       }
+   }
+
+   Assert(cursor->portalname != NULL);
+   return (PyObject *) cursor;
+}
+
+static void
+PLy_cursor_dealloc(PyObject *arg)
+{
+   PLyCursorObject *cursor;
+   Portal          portal;
+
+   cursor = (PLyCursorObject *) arg;
+
+   if (!cursor->closed)
+   {
+       portal = GetPortalByName(cursor->portalname);
+
+       if (PortalIsValid(portal))
+           SPI_cursor_close(portal);
+   }
+
+   PLy_free(cursor->portalname);
+   cursor->portalname = NULL;
+
+   PLy_typeinfo_dealloc(&cursor->result);
+   arg->ob_type->tp_free(arg);
+}
+
+static PyObject *
+PLy_cursor_iternext(PyObject *self)
+{
+   PLyCursorObject *cursor;
+   PyObject        *ret;
+   volatile MemoryContext oldcontext;
+   volatile ResourceOwner oldowner;
+   Portal          portal;
+
+   cursor = (PLyCursorObject *) self;
+
+   if (cursor->closed)
+   {
+       PLy_exception_set(PyExc_ValueError, "iterating a closed cursor");
+       return NULL;
+   }
+
+   portal = GetPortalByName(cursor->portalname);
+   if (!PortalIsValid(portal))
+   {
+       PLy_exception_set(PyExc_ValueError,
+                         "iterating a cursor in an aborted subtransaction");
+       return NULL;
+   }
+
+   oldcontext = CurrentMemoryContext;
+   oldowner = CurrentResourceOwner;
+
+   BeginInternalSubTransaction(NULL);
+   MemoryContextSwitchTo(oldcontext);
+
+   PG_TRY();
+   {
+       SPI_cursor_fetch(portal, true, 1);
+       if (SPI_processed == 0)
+       {
+           PyErr_SetNone(PyExc_StopIteration);
+           ret = NULL;
+       }
+       else
+       {
+           if (cursor->result.is_rowtype != 1)
+               PLy_input_tuple_funcs(&cursor->result, SPI_tuptable->tupdesc);
+
+           ret = PLyDict_FromTuple(&cursor->result, SPI_tuptable->vals[0],
+                                   SPI_tuptable->tupdesc);
+       }
+
+       SPI_freetuptable(SPI_tuptable);
+
+       /* Commit the inner transaction, return to outer xact context */
+       ReleaseCurrentSubTransaction();
+       MemoryContextSwitchTo(oldcontext);
+       CurrentResourceOwner = oldowner;
+
+       /*
+        * AtEOSubXact_SPI() should not have popped any SPI context, but just
+        * in case it did, make sure we remain connected.
+        */
+       SPI_restore_connection();
+   }
+   PG_CATCH();
+   {
+       ErrorData  *edata;
+       PLyExceptionEntry *entry;
+       PyObject   *exc;
+
+       /* Save error info */
+       MemoryContextSwitchTo(oldcontext);
+       edata = CopyErrorData();
+       FlushErrorState();
+
+       /* Abort the inner transaction */
+       RollbackAndReleaseCurrentSubTransaction();
+       MemoryContextSwitchTo(oldcontext);
+       CurrentResourceOwner = oldowner;
+
+       SPI_freetuptable(SPI_tuptable);
+
+       /*
+        * If AtEOSubXact_SPI() popped any SPI context of the subxact, it will
+        * have left us in a disconnected state.  We need this hack to return
+        * to connected state.
+        */
+       SPI_restore_connection();
+
+       /* Look up the correct exception */
+       entry = hash_search(PLy_spi_exceptions, &edata->sqlerrcode,
+                           HASH_FIND, NULL);
+       /* We really should find it, but just in case have a fallback */
+       Assert(entry != NULL);
+       exc = entry ? entry->exc : PLy_exc_spi_error;
+       /* Make Python raise the exception */
+       PLy_spi_exception_set(exc, edata);
+       return NULL;
+   }
+   PG_END_TRY();
+
+   return ret;
+}
+
+static PyObject *
+PLy_cursor_fetch(PyObject *self, PyObject *args)
+{
+   PLyCursorObject *cursor;
+   int             count;
+   PLyResultObject *ret;
+   volatile MemoryContext oldcontext;
+   volatile ResourceOwner oldowner;
+   Portal          portal;
+
+   if (!PyArg_ParseTuple(args, "i", &count))
+       return NULL;
+
+   cursor = (PLyCursorObject *) self;
+
+   if (cursor->closed)
+   {
+       PLy_exception_set(PyExc_ValueError, "fetch from a closed cursor");
+       return NULL;
+   }
+
+   portal = GetPortalByName(cursor->portalname);
+   if (!PortalIsValid(portal))
+   {
+       PLy_exception_set(PyExc_ValueError,
+                         "iterating a cursor in an aborted subtransaction");
+       return NULL;
+   }
+
+   ret = (PLyResultObject *) PLy_result_new();
+   if (ret == NULL)
+       return NULL;
+
+   oldcontext = CurrentMemoryContext;
+   oldowner = CurrentResourceOwner;
+
+   BeginInternalSubTransaction(NULL);
+   MemoryContextSwitchTo(oldcontext);
+
+   PG_TRY();
+   {
+       SPI_cursor_fetch(portal, true, count);
+
+       if (cursor->result.is_rowtype != 1)
+           PLy_input_tuple_funcs(&cursor->result, SPI_tuptable->tupdesc);
+
+       Py_DECREF(ret->status);
+       ret->status = PyInt_FromLong(SPI_OK_FETCH);
+
+       Py_DECREF(ret->nrows);
+       ret->nrows = PyInt_FromLong(SPI_processed);
+
+       if (SPI_processed != 0)
+       {
+           int i;
+
+           Py_DECREF(ret->rows);
+           ret->rows = PyList_New(SPI_processed);
+
+           for (i = 0; i < SPI_processed; i++)
+           {
+               PyObject   *row = PLyDict_FromTuple(&cursor->result,
+                                                   SPI_tuptable->vals[i],
+                                                   SPI_tuptable->tupdesc);
+               PyList_SetItem(ret->rows, i, row);
+           }
+       }
+
+       SPI_freetuptable(SPI_tuptable);
+
+       /* Commit the inner transaction, return to outer xact context */
+       ReleaseCurrentSubTransaction();
+       MemoryContextSwitchTo(oldcontext);
+       CurrentResourceOwner = oldowner;
+
+       /*
+        * AtEOSubXact_SPI() should not have popped any SPI context, but just
+        * in case it did, make sure we remain connected.
+        */
+       SPI_restore_connection();
+   }
+   PG_CATCH();
+   {
+       ErrorData  *edata;
+       PLyExceptionEntry *entry;
+       PyObject   *exc;
+
+       /* Save error info */
+       MemoryContextSwitchTo(oldcontext);
+       edata = CopyErrorData();
+       FlushErrorState();
+
+       /* Abort the inner transaction */
+       RollbackAndReleaseCurrentSubTransaction();
+       MemoryContextSwitchTo(oldcontext);
+       CurrentResourceOwner = oldowner;
+
+       SPI_freetuptable(SPI_tuptable);
+
+       /*
+        * If AtEOSubXact_SPI() popped any SPI context of the subxact, it will
+        * have left us in a disconnected state.  We need this hack to return
+        * to connected state.
+        */
+       SPI_restore_connection();
+
+       /* Look up the correct exception */
+       entry = hash_search(PLy_spi_exceptions, &edata->sqlerrcode,
+                           HASH_FIND, NULL);
+       /* We really should find it, but just in case have a fallback */
+       Assert(entry != NULL);
+       exc = entry ? entry->exc : PLy_exc_spi_error;
+       /* Make Python raise the exception */
+       PLy_spi_exception_set(exc, edata);
+       return NULL;
+   }
+   PG_END_TRY();
+
+   return (PyObject *) ret;
+}
+
+static PyObject *
+PLy_cursor_close(PyObject *self, PyObject *unused)
+{
+   PLyCursorObject *cursor = (PLyCursorObject *) self;
+
+   if (!cursor->closed)
+   {
+       Portal portal = GetPortalByName(cursor->portalname);
+
+       if (!PortalIsValid(portal))
+       {
+           PLy_exception_set(PyExc_ValueError,
+                             "closing a cursor in an aborted subtransaction");
+           return NULL;
+       }
+
+       SPI_cursor_close(portal);
+       cursor->closed = true;
+   }
+
+   Py_INCREF(Py_None);
+   return Py_None;
+}
+
 /* s = plpy.subtransaction() */
 static PyObject *
 PLy_subtransaction(PyObject *self, PyObject *unused)
@@ -4184,6 +4824,8 @@ PLy_init_plpy(void)
        elog(ERROR, "could not initialize PLy_ResultType");
    if (PyType_Ready(&PLy_SubtransactionType) < 0)
        elog(ERROR, "could not initialize PLy_SubtransactionType");
+   if (PyType_Ready(&PLy_CursorType) < 0)
+       elog(ERROR, "could not initialize PLy_CursorType");
 
 #if PY_MAJOR_VERSION >= 3
    PyModule_Create(&PLy_module);
index 7f8f6a33d26c09cac84ba01827126a59de0176c0..874b31e6df6bbe2a7cecaf857aa15e53e7162467 100644 (file)
@@ -105,3 +105,119 @@ else:
 $$ LANGUAGE plpythonu;
 
 SELECT result_nrows_test();
+
+
+-- cursor objects
+
+CREATE FUNCTION simple_cursor_test() RETURNS int AS $$
+res = plpy.cursor("select fname, lname from users")
+does = 0
+for row in res:
+    if row['lname'] == 'doe':
+        does += 1
+return does
+$$ LANGUAGE plpythonu;
+
+CREATE FUNCTION double_cursor_close() RETURNS int AS $$
+res = plpy.cursor("select fname, lname from users")
+res.close()
+res.close()
+$$ LANGUAGE plpythonu;
+
+CREATE FUNCTION cursor_fetch() RETURNS int AS $$
+res = plpy.cursor("select fname, lname from users")
+assert len(res.fetch(3)) == 3
+assert len(res.fetch(3)) == 1
+assert len(res.fetch(3)) == 0
+assert len(res.fetch(3)) == 0
+try:
+    # use next() or __next__(), the method name changed in
+    # http://www.python.org/dev/peps/pep-3114/
+    try:
+        res.next()
+    except AttributeError:
+        res.__next__()
+except StopIteration:
+    pass
+else:
+    assert False, "StopIteration not raised"
+$$ LANGUAGE plpythonu;
+
+CREATE FUNCTION cursor_mix_next_and_fetch() RETURNS int AS $$
+res = plpy.cursor("select fname, lname from users order by fname")
+assert len(res.fetch(2)) == 2
+
+item = None
+try:
+    item = res.next()
+except AttributeError:
+    item = res.__next__()
+assert item['fname'] == 'rick'
+
+assert len(res.fetch(2)) == 1
+$$ LANGUAGE plpythonu;
+
+CREATE FUNCTION fetch_after_close() RETURNS int AS $$
+res = plpy.cursor("select fname, lname from users")
+res.close()
+try:
+    res.fetch(1)
+except ValueError:
+    pass
+else:
+    assert False, "ValueError not raised"
+$$ LANGUAGE plpythonu;
+
+CREATE FUNCTION next_after_close() RETURNS int AS $$
+res = plpy.cursor("select fname, lname from users")
+res.close()
+try:
+    try:
+        res.next()
+    except AttributeError:
+        res.__next__()
+except ValueError:
+    pass
+else:
+    assert False, "ValueError not raised"
+$$ LANGUAGE plpythonu;
+
+CREATE FUNCTION cursor_fetch_next_empty() RETURNS int AS $$
+res = plpy.cursor("select fname, lname from users where false")
+assert len(res.fetch(1)) == 0
+try:
+    try:
+        res.next()
+    except AttributeError:
+        res.__next__()
+except StopIteration:
+    pass
+else:
+    assert False, "StopIteration not raised"
+$$ LANGUAGE plpythonu;
+
+CREATE FUNCTION cursor_plan() RETURNS SETOF text AS $$
+plan = plpy.prepare(
+    "select fname, lname from users where fname like $1 || '%' order by fname",
+    ["text"])
+for row in plpy.cursor(plan, ["w"]):
+    yield row['fname']
+for row in plpy.cursor(plan, ["j"]):
+    yield row['fname']
+$$ LANGUAGE plpythonu;
+
+CREATE FUNCTION cursor_plan_wrong_args() RETURNS SETOF text AS $$
+plan = plpy.prepare("select fname, lname from users where fname like $1 || '%'",
+                    ["text"])
+c = plpy.cursor(plan, ["a", "b"])
+$$ LANGUAGE plpythonu;
+
+SELECT simple_cursor_test();
+SELECT double_cursor_close();
+SELECT cursor_fetch();
+SELECT cursor_mix_next_and_fetch();
+SELECT fetch_after_close();
+SELECT next_after_close();
+SELECT cursor_fetch_next_empty();
+SELECT cursor_plan();
+SELECT cursor_plan_wrong_args();
index a19cad5104e92e7d9c76ea102760b09befa76ad2..9ad6377c7cd0c74c8cf710a73ced27a848015104 100644 (file)
@@ -242,3 +242,55 @@ SELECT pk_violation_inside_subtransaction();
 SELECT * FROM subtransaction_tbl;
 
 DROP TABLE subtransaction_tbl;
+
+-- cursor/subtransactions interactions
+
+CREATE FUNCTION cursor_in_subxact() RETURNS int AS $$
+with plpy.subtransaction():
+    cur = plpy.cursor("select * from generate_series(1, 20) as gen(i)")
+    cur.fetch(10)
+fetched = cur.fetch(10);
+return int(fetched[5]["i"])
+$$ LANGUAGE plpythonu;
+
+CREATE FUNCTION cursor_aborted_subxact() RETURNS int AS $$
+try:
+    with plpy.subtransaction():
+        cur = plpy.cursor("select * from generate_series(1, 20) as gen(i)")
+        cur.fetch(10);
+        plpy.execute("select no_such_function()")
+except plpy.SPIError:
+    fetched = cur.fetch(10)
+    return int(fetched[5]["i"])
+return 0 # not reached
+$$ LANGUAGE plpythonu;
+
+CREATE FUNCTION cursor_plan_aborted_subxact() RETURNS int AS $$
+try:
+    with plpy.subtransaction():
+        plpy.execute('create temporary table tmp(i) '
+                     'as select generate_series(1, 10)')
+        plan = plpy.prepare("select i from tmp")
+        cur = plpy.cursor(plan)
+        plpy.execute("select no_such_function()")
+except plpy.SPIError:
+    fetched = cur.fetch(5)
+    return fetched[2]["i"]
+return 0 # not reached
+$$ LANGUAGE plpythonu;
+
+CREATE FUNCTION cursor_close_aborted_subxact() RETURNS boolean AS $$
+try:
+    with plpy.subtransaction():
+        cur = plpy.cursor('select 1')
+        plpy.execute("select no_such_function()")
+except plpy.SPIError:
+    cur.close()
+    return True
+return False # not reached
+$$ LANGUAGE plpythonu;
+
+SELECT cursor_in_subxact();
+SELECT cursor_aborted_subxact();
+SELECT cursor_plan_aborted_subxact();
+SELECT cursor_close_aborted_subxact();