From 6c4982851a7ce1585fb89adc2747c8f848183d1b Mon Sep 17 00:00:00 2001 From: "Marc G. Fournier" Date: Sun, 30 Aug 1998 21:05:27 +0000 Subject: [PATCH] From: Massimo Dal Zotto --- src/backend/commands/async.c | 325 ++++++++++++++++++++++++----------- src/backend/tcop/postgres.c | 55 ++++-- src/man/create_sequence.l | 19 +- src/man/listen.l | 13 +- 4 files changed, 295 insertions(+), 117 deletions(-) diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c index 75d0e9d4a0..03e5a4ca04 100644 --- a/src/backend/commands/async.c +++ b/src/backend/commands/async.c @@ -7,7 +7,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/commands/async.c,v 1.37 1998/08/19 02:01:39 momjian Exp $ + * $Header: /cvsroot/pgsql/src/backend/commands/async.c,v 1.38 1998/08/30 21:04:43 scrappy Exp $ * *------------------------------------------------------------------------- */ @@ -34,29 +34,7 @@ * -- jw, 12/28/93 * */ -/* - * The following is the old model which does not work. - */ -/* - * Model is: - * 1. Multiple backends on same machine. - * - * 2. Query on one backend sends stuff over an asynchronous portal by - * appending to a relation, and then doing an async. notification - * (which takes place after commit) to all listeners on this relation. - * - * 3. Async. notification results in all backends listening on relation - * to be woken up, by a process signal kill(SIGUSR2), with name of relation - * passed in shared memory. - * - * 4. Each backend notifies its respective frontend over the comm - * channel using the out-of-band channel. - * - * 5. Each frontend receives this notification and processes accordingly. - * - * #4,#5 are changing soon with pending rewrite of portal/protocol. - * - */ + #include #include #include @@ -82,17 +60,28 @@ #include "tcop/dest.h" #include "utils/mcxt.h" #include "utils/syscache.h" +#include +#include + +#define NotifyUnlock pg_options[OPT_NOTIFYUNLOCK] +#define NotifyHack pg_options[OPT_NOTIFYHACK] + +extern TransactionState CurrentTransactionState; +extern CommandDest whereToSendOutput; + +GlobalMemory notifyContext = NULL; static int notifyFrontEndPending = 0; static int notifyIssued = 0; static Dllist *pendingNotifies = NULL; - static int AsyncExistsPendingNotify(char *); static void ClearPendingNotify(void); static void Async_NotifyFrontEnd(void); +static void Async_NotifyFrontEnd_Aux(void); void Async_Unlisten(char *relname, int pid); static void Async_UnlistenOnExit(int code, char *relname); +static void Async_UnlistenAll(void); /* *-------------------------------------------------------------- @@ -116,33 +105,36 @@ static void Async_UnlistenOnExit(int code, char *relname); void Async_NotifyHandler(SIGNAL_ARGS) { - extern TransactionState CurrentTransactionState; + TPRINTF(TRACE_NOTIFY, "Async_NotifyHandler"); if ((CurrentTransactionState->state == TRANS_DEFAULT) && (CurrentTransactionState->blockState == TRANS_DEFAULT)) { - -#ifdef ASYNC_DEBUG - elog(DEBUG, "Waking up sleeping backend process"); -#endif + TPRINTF(TRACE_NOTIFY, "Async_NotifyHandler: " + "waking up sleeping backend process"); + PS_SET_STATUS("async_notify"); Async_NotifyFrontEnd(); - + PS_SET_STATUS("idle"); } else { -#ifdef ASYNC_DEBUG - elog(DEBUG, "Process is in the middle of another transaction, state = %d, block state = %d", - CurrentTransactionState->state, - CurrentTransactionState->blockState); -#endif + TPRINTF(TRACE_NOTIFY, "Async_NotifyHandler: " + "process in middle of transaction, state=%d, blockstate=%d", + CurrentTransactionState->state, + CurrentTransactionState->blockState); notifyFrontEndPending = 1; + TPRINTF(TRACE_NOTIFY, "Async_NotifyHandler: notify frontend pending"); } + + TPRINTF(TRACE_NOTIFY, "Async_NotifyHandler: done"); } /* *-------------------------------------------------------------- * Async_Notify -- * + * This is executed by the SQL notify command. + * * Adds the relation to the list of pending notifies. * All notification happens at end of commit. * ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ @@ -151,7 +143,6 @@ Async_NotifyHandler(SIGNAL_ARGS) * then each backend notifies its corresponding front end at * the end of commit. * - * This correspond to 'notify ' command * -- jw, 12/28/93 * * Results: @@ -180,9 +171,7 @@ Async_Notify(char *relname) char *notifyName; -#ifdef ASYNC_DEBUG - elog(DEBUG, "Async_Notify: %s", relname); -#endif + TPRINTF(TRACE_NOTIFY, "Async_Notify: %s", relname); if (!pendingNotifies) pendingNotifies = DLNewList(); @@ -217,18 +206,32 @@ Async_Notify(char *relname) { rTuple = heap_modifytuple(lTuple, lRel, value, nulls, repl); heap_replace(lRel, &lTuple->t_ctid, rTuple); + /* notify is really issued only if a tuple has been changed */ + notifyIssued = 1; } } heap_endscan(sRel); - RelationUnsetLockForWrite(lRel); + + /* + * Note: if the write lock is unset we can get multiple tuples + * with same oid if other backends notify the same relation. + * Use this option at your own risk. + */ + if (NotifyUnlock) { + RelationUnsetLockForWrite(lRel); + } + heap_close(lRel); - notifyIssued = 1; + + TPRINTF(TRACE_NOTIFY, "Async_Notify: done %s", relname); } /* *-------------------------------------------------------------- * Async_NotifyAtCommit -- * + * This is called at transaction commit. + * * Signal our corresponding frontend process on relations that * were notified. Signal all other backend process that * are listening also. @@ -265,14 +268,12 @@ Async_NotifyAtCommit() if ((CurrentTransactionState->state == TRANS_DEFAULT) && (CurrentTransactionState->blockState == TRANS_DEFAULT)) { - if (notifyIssued) - { /* 'notify ' issued by us */ + { + /* 'notify ' issued by us */ notifyIssued = 0; StartTransactionCommand(); -#ifdef ASYNC_DEBUG - elog(DEBUG, "Async_NotifyAtCommit."); -#endif + TPRINTF(TRACE_NOTIFY, "Async_NotifyAtCommit"); ScanKeyEntryInitialize(&key, 0, Anum_pg_listener_notify, F_INT4EQ, @@ -294,16 +295,15 @@ Async_NotifyAtCommit() if (MyProcPid == DatumGetInt32(d)) { -#ifdef ASYNC_DEBUG - elog(DEBUG, "Notifying self, setting notifyFronEndPending to 1"); -#endif notifyFrontEndPending = 1; + TPRINTF(TRACE_NOTIFY, + "Async_NotifyAtCommit: notifying self"); } else { -#ifdef ASYNC_DEBUG - elog(DEBUG, "Notifying others"); -#endif + TPRINTF(TRACE_NOTIFY, + "Async_NotifyAtCommit: notifying pid %d", + DatumGetInt32(d)); #ifdef HAVE_KILL if (kill(DatumGetInt32(d), SIGUSR2) < 0) { @@ -315,19 +315,35 @@ Async_NotifyAtCommit() } } heap_endscan(sRel); - RelationUnsetLockForWrite(lRel); heap_close(lRel); + /* + * Notify the frontend inside the current transaction while + * we still have a valid write lock on pg_listeners. This + * avoid waiting until all other backends have finished + * with pg_listener. + */ + if (notifyFrontEndPending) { + /* The aux version is called inside transaction */ + Async_NotifyFrontEnd_Aux(); + } + + TPRINTF(TRACE_NOTIFY, "Async_NotifyAtCommit: done"); CommitTransactionCommand(); - ClearPendingNotify(); } - - if (notifyFrontEndPending) - { /* we need to notify the frontend of all - * pending notifies. */ - notifyFrontEndPending = 1; - Async_NotifyFrontEnd(); + else + { + /* + * No notifies issued by us. If notifyFrontEndPending has been set + * by Async_NotifyHandler notify the frontend of pending notifies + * from other backends. + */ + if (notifyFrontEndPending) { + Async_NotifyFrontEnd(); + } } + + ClearPendingNotify(); } } @@ -335,6 +351,8 @@ Async_NotifyAtCommit() *-------------------------------------------------------------- * Async_NotifyAtAbort -- * + * This is called at transaction commit. + * * Gets rid of pending notifies. List elements are automatically * freed through memory context. * @@ -350,20 +368,19 @@ Async_NotifyAtCommit() void Async_NotifyAtAbort() { - extern TransactionState CurrentTransactionState; - - if (notifyIssued) + if (pendingNotifies) { ClearPendingNotify(); - notifyIssued = 0; - if (pendingNotifies) DLFreeList(pendingNotifies); + } pendingNotifies = DLNewList(); + notifyIssued = 0; if ((CurrentTransactionState->state == TRANS_DEFAULT) && (CurrentTransactionState->blockState == TRANS_DEFAULT)) { + /* don't forget to notify front end */ if (notifyFrontEndPending) - { /* don't forget to notify front end */ + { Async_NotifyFrontEnd(); } } @@ -373,11 +390,11 @@ Async_NotifyAtAbort() *-------------------------------------------------------------- * Async_Listen -- * + * This is executed by the SQL listen command. + * * Register a backend (identified by its Unix PID) as listening * on the specified relation. * - * This corresponds to the 'listen ' command in SQL - * * One listener per relation, pg_listener relation is keyed * on (relname,pid) to provide multiple listeners in future. * @@ -406,9 +423,13 @@ Async_Listen(char *relname, int pid) char *relnamei; TupleDesc tupDesc; -#ifdef ASYNC_DEBUG - elog(DEBUG, "Async_Listen: %s", relname); -#endif + if (whereToSendOutput != Remote) { + elog(NOTICE, "Async_Listen: " + "listen not available on interactive sessions"); + return; + } + + TPRINTF(TRACE_NOTIFY, "Async_Listen: %s", relname); for (i = 0; i < Natts_pg_listener; i++) { nulls[i] = ' '; @@ -438,6 +459,10 @@ Async_Listen(char *relname, int pid) if (pid == MyProcPid) alreadyListener = 1; } + if (alreadyListener) { + /* No need to scan the rest of the table */ + break; + } } heap_endscan(scan); @@ -445,15 +470,14 @@ Async_Listen(char *relname, int pid) { elog(NOTICE, "Async_Listen: We are already listening on %s", relname); + RelationUnsetLockForWrite(lDesc); + heap_close(lDesc); return; } tupDesc = lDesc->rd_att; - newtup = heap_formtuple(tupDesc, - values, - nulls); + newtup = heap_formtuple(tupDesc, values, nulls); heap_insert(lDesc, newtup); - pfree(newtup); /* @@ -477,12 +501,11 @@ Async_Listen(char *relname, int pid) *-------------------------------------------------------------- * Async_Unlisten -- * + * This is executed by the SQL unlisten command. + * * Remove the backend from the list of listening backends * for the specified relation. * - * This would correspond to the 'unlisten ' - * command, but there isn't one yet. - * * Results: * pg_listeners is updated. * @@ -497,20 +520,81 @@ Async_Unlisten(char *relname, int pid) Relation lDesc; HeapTuple lTuple; - lTuple = SearchSysCacheTuple(LISTENREL, - PointerGetDatum(relname), + /* Handle specially the `unlisten "*"' command */ + if ((!relname) || (*relname == '\0') || (strcmp(relname,"*")==0)) { + Async_UnlistenAll(); + return; + } + + TPRINTF(TRACE_NOTIFY, "Async_Unlisten %s", relname); + + lTuple = SearchSysCacheTuple(LISTENREL, PointerGetDatum(relname), Int32GetDatum(pid), 0, 0); - lDesc = heap_openr(ListenerRelationName); - RelationSetLockForWrite(lDesc); - if (lTuple != NULL) + { + lDesc = heap_openr(ListenerRelationName); + RelationSetLockForWrite(lDesc); heap_delete(lDesc, &lTuple->t_ctid); - - RelationUnsetLockForWrite(lDesc); - heap_close(lDesc); + RelationUnsetLockForWrite(lDesc); + heap_close(lDesc); + } } +/* + *-------------------------------------------------------------- + * Async_UnlistenAll -- + * + * Unlisten all relations for this backend. + * + * Results: + * pg_listeners is updated. + * + * Side effects: + * XXX + * + *-------------------------------------------------------------- + */ +static void +Async_UnlistenAll() +{ + HeapTuple lTuple; + Relation lRel; + HeapScanDesc sRel; + TupleDesc tdesc; + ScanKeyData key[1]; + + TPRINTF(TRACE_NOTIFY, "Async_UnlistenAll"); + ScanKeyEntryInitialize(&key[0], 0, + Anum_pg_listener_pid, + F_INT4EQ, + Int32GetDatum(MyProcPid)); + lRel = heap_openr(ListenerRelationName); + RelationSetLockForWrite(lRel); + tdesc = RelationGetTupleDescriptor(lRel); + sRel = heap_beginscan(lRel, 0, SnapshotNow, 1, key); + + while (HeapTupleIsValid(lTuple = heap_getnext(sRel, 0))) + { + heap_delete(lRel, &lTuple->t_ctid); + } + heap_endscan(sRel); + RelationUnsetLockForWrite(lRel); + heap_close(lRel); + TPRINTF(TRACE_NOTIFY, "Async_UnlistenAll: done"); +} + +/* + * -------------------------------------------------------------- + * Async_UnlistenOnExit -- + * + * This is called at backend exit for each registered listen. + * + * Results: + * XXX + * + * -------------------------------------------------------------- + */ static void Async_UnlistenOnExit(int code, /* from exitpg */ char *relname) @@ -522,6 +606,25 @@ Async_UnlistenOnExit(int code, /* from exitpg */ * -------------------------------------------------------------- * Async_NotifyFrontEnd -- * + * This is called outside transactions. The real work is done + * by Async_NotifyFrontEnd_Aux(). + * + * -------------------------------------------------------------- + */ +static void +Async_NotifyFrontEnd() +{ + StartTransactionCommand(); + Async_NotifyFrontEnd_Aux(); + CommitTransactionCommand(); +} + +/* + * -------------------------------------------------------------- + * Async_NotifyFrontEnd_Aux -- + * + * This must be called inside a transaction block. + * * Perform an asynchronous notification to front end over * portal comm channel. The name of the relation which contains the * data is sent to the front end. @@ -534,12 +637,9 @@ Async_UnlistenOnExit(int code, /* from exitpg */ * * -------------------------------------------------------------- */ -GlobalMemory notifyContext = NULL; - static void -Async_NotifyFrontEnd() +Async_NotifyFrontEnd_Aux() { - extern CommandDest whereToSendOutput; HeapTuple lTuple, rTuple; Relation lRel; @@ -552,12 +652,15 @@ Async_NotifyFrontEnd() nulls[3]; bool isnull; - notifyFrontEndPending = 0; +#define MAX_DONE 64 -#ifdef ASYNC_DEBUG - elog(DEBUG, "Async_NotifyFrontEnd: notifying front end."); -#endif + char *done[MAX_DONE]; + int ndone = 0; + int i; + + notifyFrontEndPending = 0; + TPRINTF(TRACE_NOTIFY, "Async_NotifyFrontEnd"); StartTransactionCommand(); ScanKeyEntryInitialize(&key[0], 0, Anum_pg_listener_notify, @@ -580,11 +683,35 @@ Async_NotifyFrontEnd() while (HeapTupleIsValid(lTuple = heap_getnext(sRel, 0))) { - d = heap_getattr(lTuple, Anum_pg_listener_relname, tdesc, &isnull); + d = heap_getattr(lTuple, Anum_pg_listener_relname, tdesc, + &isnull); + + /* + * This hack deletes duplicate tuples which can be left + * in the table if the NotifyUnlock option is set. + * I'm further investigating this. -- dz + */ + if (NotifyHack) { + for (i=0; idata, done[i]) == 0) { + TPRINTF(TRACE_NOTIFY, + "Async_NotifyFrontEnd: duplicate %s", + DatumGetName(d)->data); + heap_delete(lRel, &lTuple->t_ctid); + continue; + } + } + if (ndone < MAX_DONE) { + done[ndone++] = pstrdup(DatumGetName(d)->data); + } + } + rTuple = heap_modifytuple(lTuple, lRel, value, nulls, repl); heap_replace(lRel, &lTuple->t_ctid, rTuple); /* notifying the front end */ + TPRINTF(TRACE_NOTIFY, "Async_NotifyFrontEnd: notifying %s", + DatumGetName(d)->data); if (whereToSendOutput == Remote) { @@ -593,12 +720,12 @@ Async_NotifyFrontEnd() pq_putstr(DatumGetName(d)->data); pq_flush(); } - else - elog(NOTICE, "Async_NotifyFrontEnd: no asynchronous notification to frontend on interactive sessions"); } heap_endscan(sRel); + RelationUnsetLockForWrite(lRel); heap_close(lRel); - CommitTransactionCommand(); + + TPRINTF(TRACE_NOTIFY, "Async_NotifyFrontEnd: done"); } static int diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c index 5ad49c8ac4..6366d18717 100644 --- a/src/backend/tcop/postgres.c +++ b/src/backend/tcop/postgres.c @@ -7,7 +7,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/tcop/postgres.c,v 1.86 1998/08/25 21:34:04 scrappy Exp $ + * $Header: /cvsroot/pgsql/src/backend/tcop/postgres.c,v 1.87 1998/08/30 21:05:27 scrappy Exp $ * * NOTES * this is the "main" module of the postgres backend and @@ -443,11 +443,41 @@ pg_parse_and_plan(char *query_string, /* string to execute */ querytree = querytree_list->qtrees[i]; - if (DebugPrintQuery == true) + if (DebugPrintQuery) { - printf("\n---- \tquery is:\n%s\n", query_string); - printf("\n"); - fflush(stdout); + if (DebugPrintQuery > 3) { + /* Print the query string as is if query debug level > 3 */ + TPRINTF(TRACE_QUERY, "query: %s",query_string); + } else { + /* Print condensed query string to fit in one log line */ + char buff[8192+1]; + char c, + *s, + *d; + int n, + is_space=1; + + for (s=query_string,d=buff,n=0; (c=*s) && (n<8192); s++) { + switch (c) { + case '\r': + case '\n': + case '\t': + c = ' '; + /* fall through */ + case ' ': + if (is_space) continue; + is_space = 1; + break; + default: + is_space = 0; + break; + } + *d++ = c; + n++; + } + *d = '\0'; + TPRINTF(TRACE_QUERY, "query: %s",buff); + } } /* don't rewrite utilites */ @@ -457,11 +487,10 @@ pg_parse_and_plan(char *query_string, /* string to execute */ continue; } - if (DebugPrintParse == true) + if (DebugPrintParse) { - printf("\n---- \tparser outputs :\n"); + TPRINTF(TRACE_PARSE, "parser outputs:"); nodeDisplay(querytree); - printf("\n"); } /* rewrite queries (retrieve, append, delete, replace) */ @@ -906,9 +935,11 @@ PostgresMain(int argc, char *argv[], int real_argc, char *real_argv[]) char firstchar; char parser_input[MAX_PARSE_BUFFER]; char *userName; - char *remote_info; - char *remote_host; - unsigned short remote_port = 0; + + /* Used if verbose is set, must be initialized */ + char *remote_info = "interactive"; + char *remote_host = ""; + unsigned short remote_port = 0; char *DBDate = NULL; extern int optind; @@ -1490,7 +1521,7 @@ PostgresMain(int argc, char *argv[], int real_argc, char *real_argv[]) if (!IsUnderPostmaster) { puts("\nPOSTGRES backend interactive interface"); - puts("$Revision: 1.86 $ $Date: 1998/08/25 21:34:04 $"); + puts("$Revision: 1.87 $ $Date: 1998/08/30 21:05:27 $"); } /* ---------------- diff --git a/src/man/create_sequence.l b/src/man/create_sequence.l index 0a695fa6bf..588e70f394 100644 --- a/src/man/create_sequence.l +++ b/src/man/create_sequence.l @@ -1,6 +1,6 @@ .\" This is -*-nroff-*- .\" XXX standard disclaimer belongs here.... -.\" $Header: /cvsroot/pgsql/src/man/Attic/create_sequence.l,v 1.5 1998/07/14 01:45:25 momjian Exp $ +.\" $Header: /cvsroot/pgsql/src/man/Attic/create_sequence.l,v 1.6 1998/08/30 21:03:19 scrappy Exp $ .TH "CREATE SEQUENCE" SQL 07/13/98 PostgreSQL PostgreSQL .SH NAME create sequence - create a new sequence number generator @@ -82,6 +82,14 @@ given sequence in the current backend session. Also beware that it does not give the last number ever allocated, only the last one allocated by this backend. .PP +The function +.BR setval +('sequence_name', value) +may be used to set the current value of the specified sequence. +The next call to +.BR nextval +will return the given value + the sequence increment. +.PP Use a query like .nf SELECT * FROM ; @@ -134,6 +142,15 @@ select nextval ('seq'); -- Use sequence in insert -- insert into table _table_ values (nextval ('seq'),...); +.nf +-- +-- Set the sequence value after a copy in +-- +create function table_id_max() returns int4 + as 'select max(id) from _table_' + language 'sql'; +copy _table_ from 'input_file'; +select setval('seq', table_id_max()); .fi .SH "SEE ALSO" drop_sequence(l). diff --git a/src/man/listen.l b/src/man/listen.l index 49801408f7..165fe7ab02 100644 --- a/src/man/listen.l +++ b/src/man/listen.l @@ -1,6 +1,6 @@ .\" This is -*-nroff-*- .\" XXX standard disclaimer belongs here.... -.\" $Header: /cvsroot/pgsql/src/man/Attic/listen.l,v 1.7 1998/07/09 03:29:09 scrappy Exp $ +.\" $Header: /cvsroot/pgsql/src/man/Attic/listen.l,v 1.8 1998/08/30 21:03:20 scrappy Exp $ .TH "LISTEN" SQL 03/12/94 PostgreSQL PostgreSQL .SH NAME listen - listen for notification on a relation @@ -27,16 +27,19 @@ in order to find out the name of the class to which a given notification corresponds. If this code is not included in the application, the event notification will be queued and never be processed. +.PP +Note that +.IR class_name +needs not to be a valid class name but can be any ascii string up to 32 +characters long. It must however be eclosed in double-quotes if it is +not valid as class name. .SH "SEE ALSO" create_rule(l), notify(l), select(l), +unlisten(l), libpq. .SH BUGS -There is no way to un-\c -.BR listen -except to drop the connection (i.e., restart the backend server). -.PP The .IR psql(1) command does not poll for asynchronous events. -- 2.40.0