*/
#include "postgres.h"
+#include <limits.h>
+
#include "libpq/auth.h"
#include "port.h"
#include "utils/guc.h"
*/
#include "postgres.h"
+#include <limits.h>
+
#include "btree_gist.h"
#include "btree_utils_num.h"
#include "utils/builtins.h"
#include "postgres.h"
+#include <float.h>
+
#include "access/gist_private.h"
#include "access/hash.h"
#include "access/htup_details.h"
#include "postgres.h"
#include <ctype.h>
+#include <math.h>
#include <time.h>
#include <fcntl.h>
#include <sys/stat.h>
*/
#include "postgres.h"
+#include <limits.h>
+
#include "access/xact.h"
#include "catalog/pg_type.h"
#include "commands/createas.h"
*/
#include "postgres.h"
+#include <math.h>
+
#include "access/relscan.h"
#include "access/transam.h"
#include "executor/execdebug.h"
static int64 throttling_counter;
/* The minimum time required to transfer throttling_sample bytes. */
-static int64 elapsed_min_unit;
+static TimeOffset elapsed_min_unit;
/* The last check of the transfer rate. */
-static int64 throttled_last;
+static TimestampTz throttled_last;
/*
* The contents of these directories are removed or recreated during server
throttling_counter = 0;
/* The 'real data' starts now (header was ignored). */
- throttled_last = GetCurrentIntegerTimestamp();
+ throttled_last = GetCurrentTimestamp();
}
else
{
static void
throttle(size_t increment)
{
- int64 elapsed,
+ TimeOffset elapsed,
elapsed_min,
sleep;
int wait_result;
return;
/* Time elapsed since the last measurement (and possible wake up). */
- elapsed = GetCurrentIntegerTimestamp() - throttled_last;
+ elapsed = GetCurrentTimestamp() - throttled_last;
/* How much should have elapsed at minimum? */
elapsed_min = elapsed_min_unit * (throttling_counter / throttling_sample);
sleep = elapsed_min - elapsed;
* Time interval for the remaining amount and possible next increments
* starts now.
*/
- throttled_last = GetCurrentIntegerTimestamp();
+ throttled_last = GetCurrentTimestamp();
}
{
XLogRecPtr start_lsn;
XLogRecPtr end_lsn;
- TimestampTz send_time;
+ TimestampTz send_time;
start_lsn = pq_getmsgint64(&s);
end_lsn = pq_getmsgint64(&s);
- send_time =
- IntegerTimestampToTimestampTz(pq_getmsgint64(&s));
+ send_time = pq_getmsgint64(&s);
if (last_received < start_lsn)
last_received = start_lsn;
}
else if (c == 'k')
{
- XLogRecPtr endpos;
- TimestampTz timestamp;
- bool reply_requested;
+ XLogRecPtr endpos;
+ TimestampTz timestamp;
+ bool reply_requested;
endpos = pq_getmsgint64(&s);
- timestamp =
- IntegerTimestampToTimestampTz(pq_getmsgint64(&s));
+ timestamp = pq_getmsgint64(&s);
reply_requested = pq_getmsgbyte(&s);
send_feedback(endpos, reply_requested, false);
/* read the fields */
dataStart = pq_getmsgint64(&incoming_message);
walEnd = pq_getmsgint64(&incoming_message);
- sendTime = IntegerTimestampToTimestampTz(
- pq_getmsgint64(&incoming_message));
+ sendTime = pq_getmsgint64(&incoming_message);
ProcessWalSndrMessage(walEnd, sendTime);
buf += hdrlen;
/* read the fields */
walEnd = pq_getmsgint64(&incoming_message);
- sendTime = IntegerTimestampToTimestampTz(
- pq_getmsgint64(&incoming_message));
+ sendTime = pq_getmsgint64(&incoming_message);
replyRequested = pq_getmsgbyte(&incoming_message);
ProcessWalSndrMessage(walEnd, sendTime);
pq_sendint64(&reply_message, writePtr);
pq_sendint64(&reply_message, flushPtr);
pq_sendint64(&reply_message, applyPtr);
- pq_sendint64(&reply_message, GetCurrentIntegerTimestamp());
+ pq_sendint64(&reply_message, GetCurrentTimestamp());
pq_sendbyte(&reply_message, requestReply ? 1 : 0);
/* Send it */
/* Construct the message and send it. */
resetStringInfo(&reply_message);
pq_sendbyte(&reply_message, 'h');
- pq_sendint64(&reply_message, GetCurrentIntegerTimestamp());
+ pq_sendint64(&reply_message, GetCurrentTimestamp());
pq_sendint(&reply_message, xmin, 4);
pq_sendint(&reply_message, nextEpoch, 4);
walrcv_send(wrconn, reply_message.data, reply_message.len);
dest = CreateDestReceiver(DestRemoteSimple);
MemSet(nulls, false, sizeof(nulls));
- /*
+ /*----------
* Need a tuple descriptor representing four columns:
* - first field: the slot name
* - second field: LSN at which we became consistent
* - third field: exported snapshot's name
* - fourth field: output plugin
+ *----------
*/
tupdesc = CreateTemplateTupleDesc(4, false);
TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_name",
* several releases by streaming physical replication.
*/
resetStringInfo(&tmpbuf);
- pq_sendint64(&tmpbuf, GetCurrentIntegerTimestamp());
+ pq_sendint64(&tmpbuf, GetCurrentTimestamp());
memcpy(&ctx->out->data[1 + sizeof(int64) + sizeof(int64)],
tmpbuf.data, sizeof(int64));
* Fill the send timestamp last, so that it is taken as late as possible.
*/
resetStringInfo(&tmpbuf);
- pq_sendint64(&tmpbuf, GetCurrentIntegerTimestamp());
+ pq_sendint64(&tmpbuf, GetCurrentTimestamp());
memcpy(&output_message.data[1 + sizeof(int64) + sizeof(int64)],
tmpbuf.data, sizeof(int64));
resetStringInfo(&output_message);
pq_sendbyte(&output_message, 'k');
pq_sendint64(&output_message, sentPtr);
- pq_sendint64(&output_message, GetCurrentIntegerTimestamp());
+ pq_sendint64(&output_message, GetCurrentTimestamp());
pq_sendbyte(&output_message, requestReply ? 1 : 0);
/* ... and send it wrapped in CopyData */
#include "postgres.h"
+#include <limits.h>
+
#include "access/xact.h"
#include "commands/prepare.h"
#include "executor/tstoreReceiver.h"
#include <ctype.h>
#include <float.h>
#include <limits.h>
+#include <math.h>
#include <time.h>
#include <sys/time.h>
*/
#include "postgres.h"
+#include <limits.h>
+
#include "access/htup_details.h"
#include "access/xact.h"
#include "catalog/catalog.h"
* only allowed to move forward.
*/
slock_t mutex_current; /* protect current_timestamp */
- int64 current_timestamp; /* latest snapshot timestamp */
+ TimestampTz current_timestamp; /* latest snapshot timestamp */
slock_t mutex_latest_xmin; /* protect latest_xmin and
* next_map_update */
TransactionId latest_xmin; /* latest snapshot xmin */
- int64 next_map_update; /* latest snapshot valid up to */
+ TimestampTz next_map_update; /* latest snapshot valid up to */
slock_t mutex_threshold; /* protect threshold fields */
- int64 threshold_timestamp; /* earlier snapshot is old */
+ TimestampTz threshold_timestamp; /* earlier snapshot is old */
TransactionId threshold_xid; /* earlier xid may be gone */
/*
* Persistence is not needed.
*/
int head_offset; /* subscript of oldest tracked time */
- int64 head_timestamp; /* time corresponding to head xid */
+ TimestampTz head_timestamp; /* time corresponding to head xid */
int count_used; /* how many slots are in use */
TransactionId xid_by_minute[FLEXIBLE_ARRAY_MEMBER];
} OldSnapshotControlData;
static List *exportedSnapshots = NIL;
/* Prototypes for local functions */
-static int64 AlignTimestampToMinuteBoundary(int64 ts);
+static TimestampTz AlignTimestampToMinuteBoundary(TimestampTz ts);
static Snapshot CopySnapshot(Snapshot snapshot);
static void FreeSnapshot(Snapshot snapshot);
static void SnapshotResetXmin(void);
bool suboverflowed;
bool takenDuringRecovery;
CommandId curcid;
- int64 whenTaken;
+ TimestampTz whenTaken;
XLogRecPtr lsn;
} SerializedSnapshotData;
/*
- * Return an int64 timestamp which is exactly on a minute boundary.
+ * Return a timestamp that is exactly on a minute boundary.
*
* If the argument is already aligned, return that value, otherwise move to
* the next minute boundary following the given time.
*/
-static int64
-AlignTimestampToMinuteBoundary(int64 ts)
+static TimestampTz
+AlignTimestampToMinuteBoundary(TimestampTz ts)
{
- int64 retval = ts + (USECS_PER_MINUTE - 1);
+ TimestampTz retval = ts + (USECS_PER_MINUTE - 1);
return retval - (retval % USECS_PER_MINUTE);
}
/*
- * Get current timestamp for snapshots as int64 that never moves backward.
+ * Get current timestamp for snapshots
+ *
+ * This is basically GetCurrentTimestamp(), but with a guarantee that
+ * the result never moves backward.
*/
-int64
+TimestampTz
GetSnapshotCurrentTimestamp(void)
{
- int64 now = GetCurrentIntegerTimestamp();
+ TimestampTz now = GetCurrentTimestamp();
/*
* Don't let time move backward; if it hasn't advanced, use the old value.
* XXX: So far, we never trust that a 64-bit value can be read atomically; if
* that ever changes, we could get rid of the spinlock here.
*/
-int64
+TimestampTz
GetOldSnapshotThresholdTimestamp(void)
{
- int64 threshold_timestamp;
+ TimestampTz threshold_timestamp;
SpinLockAcquire(&oldSnapshotControl->mutex_threshold);
threshold_timestamp = oldSnapshotControl->threshold_timestamp;
}
static void
-SetOldSnapshotThresholdTimestamp(int64 ts, TransactionId xlimit)
+SetOldSnapshotThresholdTimestamp(TimestampTz ts, TransactionId xlimit)
{
SpinLockAcquire(&oldSnapshotControl->mutex_threshold);
oldSnapshotControl->threshold_timestamp = ts;
&& old_snapshot_threshold >= 0
&& RelationAllowsEarlyPruning(relation))
{
- int64 ts = GetSnapshotCurrentTimestamp();
+ TimestampTz ts = GetSnapshotCurrentTimestamp();
TransactionId xlimit = recentXmin;
TransactionId latest_xmin;
- int64 update_ts;
+ TimestampTz update_ts;
bool same_ts_as_threshold = false;
SpinLockAcquire(&oldSnapshotControl->mutex_latest_xmin);
* Take care of the circular buffer that maps time to xid.
*/
void
-MaintainOldSnapshotTimeMapping(int64 whenTaken, TransactionId xmin)
+MaintainOldSnapshotTimeMapping(TimestampTz whenTaken, TransactionId xmin)
{
- int64 ts;
+ TimestampTz ts;
TransactionId latest_xmin;
- int64 update_ts;
+ TimestampTz update_ts;
bool map_update_required = false;
/* Never call this function when old snapshot checking is disabled. */
static volatile sig_atomic_t time_to_abort = false;
static volatile sig_atomic_t output_reopen = false;
static bool output_isfile;
-static int64 output_last_fsync = -1;
+static TimestampTz output_last_fsync = -1;
static bool output_needs_fsync = false;
static XLogRecPtr output_written_lsn = InvalidXLogRecPtr;
static XLogRecPtr output_fsync_lsn = InvalidXLogRecPtr;
* Send a Standby Status Update message to server.
*/
static bool
-sendFeedback(PGconn *conn, int64 now, bool force, bool replyRequested)
+sendFeedback(PGconn *conn, TimestampTz now, bool force, bool replyRequested)
{
static XLogRecPtr last_written_lsn = InvalidXLogRecPtr;
static XLogRecPtr last_fsync_lsn = InvalidXLogRecPtr;
}
static bool
-OutputFsync(int64 now)
+OutputFsync(TimestampTz now)
{
output_last_fsync = now;
{
PGresult *res;
char *copybuf = NULL;
- int64 last_status = -1;
+ TimestampTz last_status = -1;
int i;
PQExpBuffer query;
int r;
int bytes_left;
int bytes_written;
- int64 now;
+ TimestampTz now;
int hdr_len;
XLogRecPtr cur_record_lsn = InvalidXLogRecPtr;
* response back to the client.
*/
fd_set input_mask;
- int64 message_target = 0;
- int64 fsync_target = 0;
+ TimestampTz message_target = 0;
+ TimestampTz fsync_target = 0;
struct timeval timeout;
struct timeval *timeoutptr = NULL;
/* Now compute when to wakeup. */
if (message_target > 0 || fsync_target > 0)
{
- int64 targettime;
+ TimestampTz targettime;
long secs;
int usecs;
if (outfd != -1 && strcmp(outfile, "-") != 0)
{
- int64 t = feGetCurrentTimestamp();
+ TimestampTz t = feGetCurrentTimestamp();
/* no need to jump to error on failure here, we're finishing anyway */
OutputFsync(t);
static int CopyStreamPoll(PGconn *conn, long timeout_ms);
static int CopyStreamReceive(PGconn *conn, long timeout, char **buffer);
static bool ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf,
- int len, XLogRecPtr blockpos, int64 *last_status);
+ int len, XLogRecPtr blockpos, TimestampTz *last_status);
static bool ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
XLogRecPtr *blockpos);
static PGresult *HandleEndOfCopyStream(PGconn *conn, StreamCtl *stream, char *copybuf,
XLogRecPtr blockpos, XLogRecPtr *stoppos);
static bool CheckCopyStreamStop(PGconn *conn, StreamCtl *stream, XLogRecPtr blockpos,
XLogRecPtr *stoppos);
-static long CalculateCopyStreamSleeptime(int64 now, int standby_message_timeout,
- int64 last_status);
+static long CalculateCopyStreamSleeptime(TimestampTz now, int standby_message_timeout,
+ TimestampTz last_status);
static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos,
uint32 *timeline);
* Send a Standby Status Update message to server.
*/
static bool
-sendFeedback(PGconn *conn, XLogRecPtr blockpos, int64 now, bool replyRequested)
+sendFeedback(PGconn *conn, XLogRecPtr blockpos, TimestampTz now, bool replyRequested)
{
char replybuf[1 + 8 + 8 + 8 + 8 + 1];
int len = 0;
XLogRecPtr *stoppos)
{
char *copybuf = NULL;
- int64 last_status = -1;
+ TimestampTz last_status = -1;
XLogRecPtr blockpos = stream->startpos;
still_sending = true;
while (1)
{
int r;
- int64 now;
+ TimestampTz now;
long sleeptime;
/*
*/
static bool
ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
- XLogRecPtr blockpos, int64 *last_status)
+ XLogRecPtr blockpos, TimestampTz *last_status)
{
int pos;
bool replyRequested;
- int64 now;
+ TimestampTz now;
/*
* Parse the keepalive message, enclosed in the CopyData message. We just
* Calculate how long send/receive loops should sleep
*/
static long
-CalculateCopyStreamSleeptime(int64 now, int standby_message_timeout,
- int64 last_status)
+CalculateCopyStreamSleeptime(TimestampTz now, int standby_message_timeout,
+ TimestampTz last_status)
{
- int64 status_targettime = 0;
+ TimestampTz status_targettime = 0;
long sleeptime;
if (standby_message_timeout && still_sending)
/*
* Frontend version of GetCurrentTimestamp(), since we are not linked with
- * backend code. The replication protocol always uses integer timestamps,
- * regardless of the server setting.
+ * backend code.
*/
-int64
+TimestampTz
feGetCurrentTimestamp(void)
{
- int64 result;
+ TimestampTz result;
struct timeval tp;
gettimeofday(&tp, NULL);
- result = (int64) tp.tv_sec -
+ result = (TimestampTz) tp.tv_sec -
((POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY);
-
result = (result * USECS_PER_SEC) + tp.tv_usec;
return result;
* backend code.
*/
void
-feTimestampDifference(int64 start_time, int64 stop_time,
+feTimestampDifference(TimestampTz start_time, TimestampTz stop_time,
long *secs, int *microsecs)
{
- int64 diff = stop_time - start_time;
+ TimestampTz diff = stop_time - start_time;
if (diff <= 0)
{
* linked with backend code.
*/
bool
-feTimestampDifferenceExceeds(int64 start_time,
- int64 stop_time,
+feTimestampDifferenceExceeds(TimestampTz start_time,
+ TimestampTz stop_time,
int msec)
{
- int64 diff = stop_time - start_time;
+ TimestampTz diff = stop_time - start_time;
return (diff >= msec * INT64CONST(1000));
}
#include "libpq-fe.h"
#include "access/xlogdefs.h"
+#include "datatype/timestamp.h"
extern const char *progname;
extern char *connection_string;
TimeLineID *starttli,
XLogRecPtr *startpos,
char **db_name);
-extern int64 feGetCurrentTimestamp(void);
-extern void feTimestampDifference(int64 start_time, int64 stop_time,
+extern TimestampTz feGetCurrentTimestamp(void);
+extern void feTimestampDifference(TimestampTz start_time, TimestampTz stop_time,
long *secs, int *microsecs);
-extern bool feTimestampDifferenceExceeds(int64 start_time, int64 stop_time,
+extern bool feTimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time,
int msec);
extern void fe_sendint64(int64 i, char *buf);
extern int64 fe_recvint64(char *buf);
#ifndef DATATYPE_TIMESTAMP_H
#define DATATYPE_TIMESTAMP_H
-#include <math.h>
-#include <limits.h>
-#include <float.h>
-
/*
* Timestamp represents absolute time.
*
extern Size SnapMgrShmemSize(void);
extern void SnapMgrInit(void);
-extern int64 GetSnapshotCurrentTimestamp(void);
-extern int64 GetOldSnapshotThresholdTimestamp(void);
+extern TimestampTz GetSnapshotCurrentTimestamp(void);
+extern TimestampTz GetOldSnapshotThresholdTimestamp(void);
extern bool FirstSnapshotSet;
extern bool ThereAreNoPriorRegisteredSnapshots(void);
extern TransactionId TransactionIdLimitedForOldSnapshots(TransactionId recentXmin,
Relation relation);
-extern void MaintainOldSnapshotTimeMapping(int64 whenTaken, TransactionId xmin);
+extern void MaintainOldSnapshotTimeMapping(TimestampTz whenTaken,
+ TransactionId xmin);
extern char *ExportSnapshot(Snapshot snapshot);
#include "access/htup.h"
#include "access/xlogdefs.h"
+#include "datatype/timestamp.h"
#include "lib/pairingheap.h"
#include "storage/buf.h"
uint32 regd_count; /* refcount on RegisteredSnapshots */
pairingheap_node ph_node; /* link in the RegisteredSnapshots heap */
- int64 whenTaken; /* timestamp when snapshot was taken */
+ TimestampTz whenTaken; /* timestamp when snapshot was taken */
XLogRecPtr lsn; /* position in the WAL stream when taken */
} SnapshotData;
TimestampTz stop_time,
int msec);
-/*
- * Prototypes for functions to deal with integer timestamps, when the native
- * format is float timestamps.
- */
-#define GetCurrentIntegerTimestamp() GetCurrentTimestamp()
-#define IntegerTimestampToTimestampTz(timestamp) (timestamp)
-
extern TimestampTz time_t_to_timestamptz(pg_time_t tm);
extern pg_time_t timestamptz_to_time_t(TimestampTz t);