-
Notifications
You must be signed in to change notification settings - Fork 5
/
pg_rewrite.h
318 lines (268 loc) · 8.34 KB
/
pg_rewrite.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
/*------------------------------------------------------------
*
* pg_rewrite.h
* Tools for maintenance that requires table rewriting.
*
* Copyright (c) 2023, Cybertec PostgreSQL International GmbH
*
*------------------------------------------------------------
*/
#include <sys/time.h>
#include "c.h"
#include "postgres.h"
#include "fmgr.h"
#include "miscadmin.h"
#include "access/genam.h"
#include "access/heapam.h"
#include "access/relscan.h"
#include "access/xlog_internal.h"
#include "access/xact.h"
#include "catalog/pg_class.h"
#include "nodes/execnodes.h"
#include "postmaster/bgworker.h"
#include "replication/logical.h"
#include "replication/origin.h"
#include "utils/inval.h"
#include "utils/resowner.h"
#include "utils/snapmgr.h"
/*
 * Kind of a single decoded change; stored in ConcurrentChange.kind.
 *
 * The numeric values are spelled out explicitly. They are the same values the
 * compiler would assign implicitly.
 */
typedef enum
{
	CHANGE_INSERT = 0,
	/* Presumably the old and the new tuple version of one UPDATE. */
	CHANGE_UPDATE_OLD = 1,
	CHANGE_UPDATE_NEW = 2,
	CHANGE_DELETE = 3
} ConcurrentChangeKind;
/*
 * One decoded change: its kind plus the tuple it affects.
 */
typedef struct ConcurrentChange
{
	/* Which kind of change this is, one of ConcurrentChangeKind. */
	ConcurrentChangeKind kind;

	/*
	 * Header of the tuple the change refers to.
	 *
	 * The tuple body is placed right behind the ConcurrentChange structure.
	 * Because ConcurrentChange can be stored as bytea, the reader has to make
	 * sure that the tuple is correctly aligned and that tuple->t_data is
	 * fixed before the tuple is used.
	 */
	HeapTupleData tup_data;
} ConcurrentChange;
/*
 * Working state of the decoding output: identifies the relation being
 * decoded and holds the storage the decoded changes are collected in.
 */
typedef struct DecodingOutputState
{
	/* OID of the relation whose changes are being decoded. */
	Oid relid;

	/*
	 * Storage of the decoded changes. Excessive batches are avoided where
	 * possible, but the changes may still have to go to disk; the tuplestore
	 * handles that transparently.
	 */
	Tuplestorestate *tstore;

	/* Number of changes tstore contains at the moment. */
	double nchanges;

	/*
	 * Descriptor used to store the serialized ConcurrentChange structure
	 * (bytea). The tuple cannot be stored directly: tuplestore only supports
	 * minimal tuples, while the OID system column may need to be transferred
	 * from the output plugin. The change kind has to be transferred as well,
	 * so a single structure is preferable to 2 tuplestores used "in
	 * parallel".
	 */
	TupleDesc tupdesc_change;

	/* Tuple descriptor needed to update indexes. */
	TupleDesc tupdesc;

	/* Slot through which data is retrieved from tstore. */
	TupleTableSlot *tsslot;

	/*
	 * Replication origin of the initial load. WAL records carrying this
	 * origin were created by the initial load and should not be decoded.
	 */
	RepOriginId rorigin;

	/*
	 * Resource owner kept with this state.
	 * TODO(review): document what exactly it owns.
	 */
	ResourceOwner resowner;
} DecodingOutputState;
/*
 * The WAL segment being decoded. This is only a declaration; the variable is
 * defined elsewhere.
 */
extern XLogSegNo part_current_segment;
/*
 * Presumably the callback PostgreSQL invokes when the shared library is
 * loaded (the usual _PG_init convention) - verify against the definition.
 */
extern void _PG_init(void);
/*
 * A subset of the pg_class fields, accompanied by the necessary information
 * on attributes. An instance stands either for the source relation or for a
 * composite type that an attribute of the source relation has.
 */
typedef struct PgClassCatInfo
{
	/* pg_class(oid) */
	Oid relid;

	/* pg_class(xmin) */
	TransactionId xmin;

	/* Array of pg_attribute(xmin). Dropped columns are included, too. */
	TransactionId *attr_xmins;

	/*
	 * Presumably pg_class(relnatts), i.e. the number of attr_xmins elements -
	 * TODO confirm.
	 */
	int16 relnatts;
} PgClassCatInfo;
/*
 * Description of one index of the source relation. It is used to build the
 * corresponding index on the transient relation. pg_class(xmin) is included
 * as well so that the pg_index fields need not be retrieved repeatedly; the
 * same structure is passed to check_catalog_changes().
 */
typedef struct IndexCatInfo
{
	/* pg_index(indexrelid) */
	Oid oid;

	/* pg_class(relname) */
	NameData relname;

	/* pg_class(reltablespace) */
	Oid reltablespace;

	/* pg_index(xmin) */
	TransactionId xmin;

	/* pg_class(xmin) of the index itself (not of the parent relation) */
	TransactionId pg_class_xmin;
} IndexCatInfo;
/*
 * Attributes of the source relation can be of a composite type. Changes of
 * such types have to be checked for, and this structure carries the
 * information needed for one type.
 */
typedef struct TypeCatInfo
{
	/* pg_type(oid) */
	Oid oid;

	/* pg_type(xmin) */
	TransactionId xmin;

	/* The pg_class entry whose oid equals pg_type(typrelid) of this type. */
	PgClassCatInfo rel;
} TypeCatInfo;
/*
 * Everything needed to find out whether an "incompatible" catalog change
 * took place. If it did, processing of the current table cannot be
 * completed.
 */
typedef struct CatalogState
{
	/* The relation that is checked for changes. */
	PgClassCatInfo rel;

	/* Copy of the source relation's pg_class tuple. */
	Form_pg_class form_class;

	/* Copy of the pg_class tuple descriptor of the source relation. */
	TupleDesc desc_class;

	/* Per-index information: number of indexes and their descriptions. */
	int relninds;
	IndexCatInfo *indexes;

	/* Composite types used by attributes of the source relation. */
	TypeCatInfo *comptypes;

	/* Allocated size of the comptypes array. */
	int ncomptypes_max;

	/* Number of comptypes elements actually in use. */
	int ncomptypes;

	/*
	 * True if at least one index has a wrong value of indisvalid, indisready
	 * or indislive.
	 */
	bool invalid_index;

	/* True if the table has a primary key index. */
	bool have_pk_index;
} CatalogState;
/*
 * Entry of the hash table that caches partition-specific information. The
 * partition OID is the hash key.
 */
typedef struct PartitionEntry
{
	/* OID of the partition; the hash key. */
	Oid part_oid;

	Relation ident_index;

	/*
	 * Slot used to retrieve tuples from the identity index. Partitions are
	 * only allowed to have exactly the same attributes as the parent table,
	 * so a single slot shared by all partitions should work. Separate slots
	 * nevertheless seem cleaner.
	 */
	TupleTableSlot *ind_slot;

	/* Should make insertions into the partition more efficient. */
	BulkInsertState bistate;

	/* Entry status, used by simplehash. */
	char status;
} PartitionEntry;
/*
 * Instantiate the simplehash.h template for PartitionEntry. With the
 * "partitions" prefix the generated hash table type is partitions_hash, which
 * the function prototypes in this header use.
 */
#define SH_PREFIX partitions
#define SH_ELEMENT_TYPE PartitionEntry
#define SH_KEY_TYPE Oid
/* The key is the part_oid field of PartitionEntry. */
#define SH_KEY part_oid
/* The key value itself is used as the hash value. */
#define SH_HASH_KEY(tb, key) (key)
/* Keys are compared with plain ==. */
#define SH_EQUAL(tb, a, b) ((a) == (b))
/* The generated functions are static inline in each including file. */
#define SH_SCOPE static inline
/* Request both the declarations and the definitions from the template. */
#define SH_DECLARE
#define SH_DEFINE
#include "lib/simplehash.h"
/*
 * Counters used to track the progress of a task.
 */
typedef struct TaskProgress
{
	/* Number of tuples inserted during the initial load. */
	int64 ins_initial;

	/*
	 * Number of tuples inserted, updated and deleted after the initial load,
	 * i.e. during the catch-up phase.
	 */
	int64 ins;
	int64 upd;
	int64 del;
} TaskProgress;
/*
 * Description of a task in the new implementation, which delegates the
 * execution to a background worker (as opposed to the PG executor).
 *
 * The structure is located in the shared memory and it is the way arguments
 * are passed to the worker.
 */
typedef struct WorkerTask
{
	/* Connection info: the database and the role. */
	Oid dbid;
	Oid roleid;

	/* Both set and cleared by the worker that performs the task. */
	pid_t pid;

	/* See the comments of pg_rewrite_exit_if_requested(). */
	bool exit_requested;

	/* Only valid if the dbid is valid. */
	TaskProgress progress;

	/*
	 * Protects setting / clearing of the fields above. Once dbid is set, the
	 * task belongs to the backend that set it, so the other fields may be
	 * assigned w/o the lock.
	 */
	slock_t mutex;

	/* The tables to work on. */
	NameData relschema_src;
	NameData relname_src;
	NameData relname_src_new;
	NameData relschema_dst;
	NameData relname_dst;

	/* Space in which the worker sends an error message to the backend. */
#define MAX_ERR_MSG_LEN 1024
	char msg[MAX_ERR_MSG_LEN];

	/* Value of the rewrite.wait_after_load GUC, for test purposes. */
	int wait_after_load;

	/* Value of the rewrite.check_constraints GUC, for test purposes. */
	bool check_constraints;
} WorkerTask;
/*
 * Presumably the number of WorkerTask slots available in the shared memory -
 * TODO confirm against the code that allocates them.
 */
#define MAX_TASKS 8
/* Each backend stores here the pointer to its task in the shared memory. */
extern WorkerTask *MyWorkerTask;
/*
 * Looks like the main function of the background worker (exported, takes a
 * single Datum argument) - see the definition for the meaning of main_arg.
 */
extern PGDLLEXPORT void rewrite_worker_main(Datum main_arg);
/*
 * Documented at its definition; the exit_requested field of WorkerTask refers
 * to those comments.
 */
extern void pg_rewrite_exit_if_requested(void);
/*
 * Functions implemented in the extension's source files. This header only
 * declares them, so the notes below are limited to what the signatures show.
 *
 * Use function names distinct from those in pg_squeeze, in case both
 * extensions are installed.
 */

/*
 * Checks the catalog information in "state" for changes. "lock_held" is the
 * lock mode the caller holds.
 * NOTE(review): how a detected change is reported is not visible here.
 */
extern void pg_rewrite_check_catalog_changes(CatalogState *state,
	LOCKMODE lock_held);

/*
 * Processes concurrent changes up to "end_of_wal" with "rel_dst" as the
 * destination relation.
 * NOTE(review): the meaning of the bool result and of "must_complete" is
 * defined by the implementation - confirm there.
 */
extern bool pg_rewrite_process_concurrent_changes(EState *estate,
	ModifyTableState *mtstate,
	struct PartitionTupleRouting *proute,
	LogicalDecodingContext *ctx,
	XLogRecPtr end_of_wal,
	CatalogState *cat_state,
	Relation rel_dst,
	ScanKey ident_key,
	int ident_key_nentries,
	LOCKMODE lock_held,
	partitions_hash *partitions,
	TupleConversionMap *conv_map,
	struct timeval *must_complete);

/*
 * Decodes concurrent changes up to "end_of_wal" using the decoding context
 * "ctx".
 * NOTE(review): the meaning of the bool result and of "must_complete" is
 * defined by the implementation - confirm there.
 */
extern bool pg_rewrite_decode_concurrent_changes(LogicalDecodingContext *ctx,
	XLogRecPtr end_of_wal,
	struct timeval *must_complete);

/*
 * Converts "tuple" for the destination table using "conv_map".
 * NOTE(review): behavior for a NULL conv_map and ownership of the result are
 * not visible here.
 */
extern HeapTuple convert_tuple_for_dest_table(HeapTuple tuple,
	TupleConversionMap *conv_map);

/*
 * Initialization function of the logical decoding output plugin; fills in
 * the callbacks in "cb".
 */
extern void _PG_output_plugin_init(OutputPluginCallbacks *cb);

/*
 * Returns the bulk insert state for the partition "part_oid", using the
 * "partitions" hash table.
 * NOTE(review): whether a missing entry is created on demand is not visible
 * here.
 */
extern BulkInsertState get_partition_insert_state(partitions_hash *partitions,
	Oid part_oid);