Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * worker_internal.h
4 : * Internal headers shared by logical replication workers.
5 : *
6 : * Portions Copyright (c) 2016-2017, PostgreSQL Global Development Group
7 : *
8 : * src/include/replication/worker_internal.h
9 : *
10 : *-------------------------------------------------------------------------
11 : */
12 : #ifndef WORKER_INTERNAL_H
13 : #define WORKER_INTERNAL_H
14 :
15 : #include <signal.h>
16 :
17 : #include "access/xlogdefs.h"
18 : #include "catalog/pg_subscription.h"
19 : #include "datatype/timestamp.h"
20 : #include "storage/lock.h"
21 :
22 : typedef struct LogicalRepWorker
23 : {
24 : /* Time at which this worker was launched. */
25 : TimestampTz launch_time;
26 :
27 : /* Indicates if this slot is used or free. */
28 : bool in_use;
29 :
30 : /* Increased everytime the slot is taken by new worker. */
31 : uint16 generation;
32 :
33 : /* Pointer to proc array. NULL if not running. */
34 : PGPROC *proc;
35 :
36 : /* Database id to connect to. */
37 : Oid dbid;
38 :
39 : /* User to use for connection (will be same as owner of subscription). */
40 : Oid userid;
41 :
42 : /* Subscription id for the worker. */
43 : Oid subid;
44 :
45 : /* Used for initial table synchronization. */
46 : Oid relid;
47 : char relstate;
48 : XLogRecPtr relstate_lsn;
49 : slock_t relmutex;
50 :
51 : /* Stats. */
52 : XLogRecPtr last_lsn;
53 : TimestampTz last_send_time;
54 : TimestampTz last_recv_time;
55 : XLogRecPtr reply_lsn;
56 : TimestampTz reply_time;
57 : } LogicalRepWorker;
58 :
59 : /* Main memory context for apply worker. Permanent during worker lifetime. */
60 : extern MemoryContext ApplyContext;
61 :
62 : /* libpqreceiver connection */
63 : extern struct WalReceiverConn *wrconn;
64 :
65 : /* Worker and subscription objects. */
66 : extern Subscription *MySubscription;
67 : extern LogicalRepWorker *MyLogicalRepWorker;
68 :
69 : extern bool in_remote_transaction;
70 :
71 : extern void logicalrep_worker_attach(int slot);
72 : extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
73 : bool only_running);
74 : extern List *logicalrep_workers_find(Oid subid, bool only_running);
75 : extern void logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname,
76 : Oid userid, Oid relid);
77 : extern void logicalrep_worker_stop(Oid subid, Oid relid);
78 : extern void logicalrep_worker_stop_at_commit(Oid subid, Oid relid);
79 : extern void logicalrep_worker_wakeup(Oid subid, Oid relid);
80 : extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker);
81 :
82 : extern int logicalrep_sync_worker_count(Oid subid);
83 :
84 : extern char *LogicalRepSyncTableStart(XLogRecPtr *origin_startpos);
85 : void process_syncing_tables(XLogRecPtr current_lsn);
86 : void invalidate_syncing_table_states(Datum arg, int cacheid,
87 : uint32 hashvalue);
88 :
89 : static inline bool
90 0 : am_tablesync_worker(void)
91 : {
92 0 : return OidIsValid(MyLogicalRepWorker->relid);
93 : }
94 :
95 : #endif /* WORKER_INTERNAL_H */
|