granicus.if.org Git - postgresql/blob - src/backend/executor/execCurrent.c
Support UPDATE/DELETE WHERE CURRENT OF cursor_name, per SQL standard.
[postgresql] / src / backend / executor / execCurrent.c
1 /*-------------------------------------------------------------------------
2  *
3  * execCurrent.c
4  *        executor support for WHERE CURRENT OF cursor
5  *
6  * Portions Copyright (c) 1996-2007, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *      $PostgreSQL: pgsql/src/backend/executor/execCurrent.c,v 1.1 2007/06/11 01:16:22 tgl Exp $
10  *
11  *-------------------------------------------------------------------------
12  */
13 #include "postgres.h"
14
15 #include "executor/executor.h"
16 #include "utils/lsyscache.h"
17 #include "utils/portal.h"
18
19
20 static ScanState *search_plan_tree(PlanState *node, Oid table_oid);
21
22
23 /*
24  * execCurrentOf
25  *
26  * Given the name of a cursor and the OID of a table, determine which row
27  * of the table is currently being scanned by the cursor, and return its
28  * TID into *current_tid.
29  *
30  * Returns TRUE if a row was identified.  Returns FALSE if the cursor is valid
31  * for the table but is not currently scanning a row of the table (this is a
32  * legal situation in inheritance cases).  Raises error if cursor is not a
33  * valid updatable scan of the specified table.
34  */
35 bool
36 execCurrentOf(char *cursor_name, Oid table_oid,
37                           ItemPointer current_tid)
38 {
39         char       *table_name;
40         Portal          portal;
41         QueryDesc *queryDesc;
42         ScanState  *scanstate;
43         HeapTuple tup;
44
45         /* Fetch table name for possible use in error messages */
46         table_name = get_rel_name(table_oid);
47         if (table_name == NULL)
48                 elog(ERROR, "cache lookup failed for relation %u", table_oid);
49
50         /* Find the cursor's portal */
51         portal = GetPortalByName(cursor_name);
52         if (!PortalIsValid(portal))
53                 ereport(ERROR,
54                                 (errcode(ERRCODE_UNDEFINED_CURSOR),
55                                  errmsg("cursor \"%s\" does not exist", cursor_name)));
56
57         /*
58          * We have to watch out for non-SELECT queries as well as held cursors,
59          * both of which may have null queryDesc.
60          */
61         if (portal->strategy != PORTAL_ONE_SELECT)
62                 ereport(ERROR,
63                                 (errcode(ERRCODE_INVALID_CURSOR_STATE),
64                                  errmsg("cursor \"%s\" is not a SELECT query",
65                                                 cursor_name)));
66         queryDesc = PortalGetQueryDesc(portal);
67         if (queryDesc == NULL)
68                 ereport(ERROR,
69                                 (errcode(ERRCODE_INVALID_CURSOR_STATE),
70                                  errmsg("cursor \"%s\" is held from a previous transaction",
71                                                 cursor_name)));
72
73         /*
74          * Dig through the cursor's plan to find the scan node.  Fail if it's
75          * not there or buried underneath aggregation.
76          */
77         scanstate = search_plan_tree(ExecGetActivePlanTree(queryDesc),
78                                                                  table_oid);
79         if (!scanstate)
80                 ereport(ERROR,
81                                 (errcode(ERRCODE_INVALID_CURSOR_STATE),
82                                  errmsg("cursor \"%s\" is not a simply updatable scan of table \"%s\"",
83                                                 cursor_name, table_name)));
84
85         /*
86          * The cursor must have a current result row: per the SQL spec, it's
87          * an error if not.  We test this at the top level, rather than at
88          * the scan node level, because in inheritance cases any one table
89          * scan could easily not be on a row.  We want to return false, not
90          * raise error, if the passed-in table OID is for one of the inactive
91          * scans.
92          */
93         if (portal->atStart || portal->atEnd)
94                 ereport(ERROR,
95                                 (errcode(ERRCODE_INVALID_CURSOR_STATE),
96                                  errmsg("cursor \"%s\" is not positioned on a row",
97                                                 cursor_name)));
98
99         /* Now OK to return false if we found an inactive scan */
100         if (TupIsNull(scanstate->ss_ScanTupleSlot))
101                 return false;
102
103         tup = scanstate->ss_ScanTupleSlot->tts_tuple;
104         if (tup == NULL)
105                 elog(ERROR, "CURRENT OF applied to non-materialized tuple");
106         Assert(tup->t_tableOid == table_oid);
107
108         *current_tid = tup->t_self;
109
110         return true;
111 }
112
113 /*
114  * search_plan_tree
115  *
116  * Search through a PlanState tree for a scan node on the specified table.
117  * Return NULL if not found or multiple candidates.
118  */
119 static ScanState *
120 search_plan_tree(PlanState *node, Oid table_oid)
121 {
122         if (node == NULL)
123                 return NULL;
124         switch (nodeTag(node))
125         {
126                         /*
127                          * scan nodes can all be treated alike
128                          */
129                 case T_SeqScanState:
130                 case T_IndexScanState:
131                 case T_BitmapHeapScanState:
132                 case T_TidScanState:
133                 {
134                         ScanState *sstate = (ScanState *) node;
135
136                         if (RelationGetRelid(sstate->ss_currentRelation) == table_oid)
137                                 return sstate;
138                         break;
139                 }
140
141                         /*
142                          * For Append, we must look through the members; watch out for
143                          * multiple matches (possible if it was from UNION ALL)
144                          */
145                 case T_AppendState:
146                 {
147                         AppendState *astate = (AppendState *) node;
148                         ScanState *result = NULL;
149                         int             i;
150
151                         for (i = 0; i < astate->as_nplans; i++)
152                         {
153                                 ScanState *elem = search_plan_tree(astate->appendplans[i],
154                                                                                                    table_oid);
155
156                                 if (!elem)
157                                         continue;
158                                 if (result)
159                                         return NULL;                            /* multiple matches */
160                                 result = elem;
161                         }
162                         return result;
163                 }
164
165                         /*
166                          * Result and Limit can be descended through (these are safe
167                          * because they always return their input's current row)
168                          */
169                 case T_ResultState:
170                 case T_LimitState:
171                         return search_plan_tree(node->lefttree, table_oid);
172
173                         /*
174                          * SubqueryScan too, but it keeps the child in a different place
175                          */
176                 case T_SubqueryScanState:
177                         return search_plan_tree(((SubqueryScanState *) node)->subplan,
178                                                                         table_oid);
179
180                 default:
181                         /* Otherwise, assume we can't descend through it */
182                         break;
183         }
184         return NULL;
185 }