Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * copy.c
4 : * Implements the COPY utility command
5 : *
6 : * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : *
10 : * IDENTIFICATION
11 : * src/backend/commands/copy.c
12 : *
13 : *-------------------------------------------------------------------------
14 : */
15 : #include "postgres.h"
16 :
17 : #include <ctype.h>
18 : #include <unistd.h>
19 : #include <sys/stat.h>
20 : #include <netinet/in.h>
21 : #include <arpa/inet.h>
22 :
23 : #include "access/heapam.h"
24 : #include "access/htup_details.h"
25 : #include "access/sysattr.h"
26 : #include "access/xact.h"
27 : #include "access/xlog.h"
28 : #include "catalog/pg_type.h"
29 : #include "commands/copy.h"
30 : #include "commands/defrem.h"
31 : #include "commands/trigger.h"
32 : #include "executor/executor.h"
33 : #include "libpq/libpq.h"
34 : #include "libpq/pqformat.h"
35 : #include "mb/pg_wchar.h"
36 : #include "miscadmin.h"
37 : #include "optimizer/clauses.h"
38 : #include "optimizer/planner.h"
39 : #include "nodes/makefuncs.h"
40 : #include "parser/parse_relation.h"
41 : #include "rewrite/rewriteHandler.h"
42 : #include "storage/fd.h"
43 : #include "tcop/tcopprot.h"
44 : #include "utils/builtins.h"
45 : #include "utils/lsyscache.h"
46 : #include "utils/memutils.h"
47 : #include "utils/portal.h"
48 : #include "utils/rel.h"
49 : #include "utils/rls.h"
50 : #include "utils/snapmgr.h"
51 :
52 :
/* True when c is one of the characters '0' through '7'. */
#define ISOCTAL(c)	((c) >= '0' && (c) <= '7')
/* Numeric value of the digit character c. */
#define OCTVALUE(c)	((c) - '0')
55 :
/*
 * The kinds of data source/destination that the lowest-level COPY I/O
 * routines have to distinguish.
 */
typedef enum CopyDest
{
	COPY_FILE = 0,				/* a file, or a piped program */
	COPY_OLD_FE = 1,			/* the frontend, using the 2.0 protocol */
	COPY_NEW_FE = 2,			/* the frontend, using the 3.0 protocol */
	COPY_CALLBACK = 3			/* a callback function */
} CopyDest;
67 :
/*
 * Which end-of-line terminator the input uses.
 */
typedef enum EolType
{
	EOL_UNKNOWN = 0,
	EOL_NL = 1,
	EOL_CR = 2,
	EOL_CRNL = 3
} EolType;
78 :
79 : /*
80 : * This struct contains all the state variables used throughout a COPY
81 : * operation. For simplicity, we use the same struct for all variants of COPY,
82 : * even though some fields are used in only some cases.
83 : *
84 : * Multi-byte encodings: all supported client-side encodings encode multi-byte
85 : * characters by having the first byte's high bit set. Subsequent bytes of the
86 : * character can have the high bit not set. When scanning data in such an
87 : * encoding to look for a match to a single-byte (ie ASCII) character, we must
88 : * use the full pg_encoding_mblen() machinery to skip over multibyte
89 : * characters, else we might find a false match to a trailing byte. In
90 : * supported server encodings, there is no possibility of a false match, and
91 : * it's faster to make useless comparisons to trailing bytes than it is to
92 : * invoke pg_encoding_mblen() to skip over them. encoding_embeds_ascii is TRUE
93 : * when we have to do it the hard way.
94 : */
95 : typedef struct CopyStateData
96 : {
97 : /* low-level state data */
98 : CopyDest copy_dest; /* type of copy source/destination */
99 : FILE *copy_file; /* used if copy_dest == COPY_FILE */
100 : StringInfo fe_msgbuf; /* used for all dests during COPY TO, only for
101 : * dest == COPY_NEW_FE in COPY FROM */
102 : bool fe_eof; /* true if detected end of copy data */
103 : EolType eol_type; /* EOL type of input */
104 : int file_encoding; /* file or remote side's character encoding */
105 : bool need_transcoding; /* file encoding diff from server? */
106 : bool encoding_embeds_ascii; /* ASCII can be non-first byte? */
107 :
108 : /* parameters from the COPY command */
109 : Relation rel; /* relation to copy to or from */
110 : QueryDesc *queryDesc; /* executable query to copy from */
111 : List *attnumlist; /* integer list of attnums to copy */
112 : char *filename; /* filename, or NULL for STDIN/STDOUT */
113 : bool is_program; /* is 'filename' a program to popen? */
114 : copy_data_source_cb data_source_cb; /* function for reading data */
115 : bool binary; /* binary format? */
116 : bool oids; /* include OIDs? */
117 : bool freeze; /* freeze rows on loading? */
118 : bool csv_mode; /* Comma Separated Value format? */
119 : bool header_line; /* CSV header line? */
120 : char *null_print; /* NULL marker string (server encoding!) */
121 : int null_print_len; /* length of same */
122 : char *null_print_client; /* same converted to file encoding */
123 : char *delim; /* column delimiter (must be 1 byte) */
124 : char *quote; /* CSV quote char (must be 1 byte) */
125 : char *escape; /* CSV escape char (must be 1 byte) */
126 : List *force_quote; /* list of column names */
127 : bool force_quote_all; /* FORCE_QUOTE *? */
128 : bool *force_quote_flags; /* per-column CSV FQ flags */
129 : List *force_notnull; /* list of column names */
130 : bool *force_notnull_flags; /* per-column CSV FNN flags */
131 : List *force_null; /* list of column names */
132 : bool *force_null_flags; /* per-column CSV FN flags */
133 : bool convert_selectively; /* do selective binary conversion? */
134 : List *convert_select; /* list of column names (can be NIL) */
135 : bool *convert_select_flags; /* per-column CSV/TEXT CS flags */
136 :
137 : /* these are just for error messages, see CopyFromErrorCallback */
138 : const char *cur_relname; /* table name for error messages */
139 : int cur_lineno; /* line number for error messages */
140 : const char *cur_attname; /* current att for error messages */
141 : const char *cur_attval; /* current att value for error messages */
142 :
143 : /*
144 : * Working state for COPY TO/FROM
145 : */
146 : MemoryContext copycontext; /* per-copy execution context */
147 :
148 : /*
149 : * Working state for COPY TO
150 : */
151 : FmgrInfo *out_functions; /* lookup info for output functions */
152 : MemoryContext rowcontext; /* per-row evaluation context */
153 :
154 : /*
155 : * Working state for COPY FROM
156 : */
157 : AttrNumber num_defaults;
158 : bool file_has_oids;
159 : FmgrInfo oid_in_function;
160 : Oid oid_typioparam;
161 : FmgrInfo *in_functions; /* array of input functions for each attrs */
162 : Oid *typioparams; /* array of element types for in_functions */
163 : int *defmap; /* array of default att numbers */
164 : ExprState **defexprs; /* array of default att expressions */
165 : bool volatile_defexprs; /* is any of defexprs volatile? */
166 : List *range_table;
167 :
168 : PartitionDispatch *partition_dispatch_info;
169 : int num_dispatch; /* Number of entries in the above array */
170 : int num_partitions; /* Number of members in the following arrays */
171 : ResultRelInfo *partitions; /* Per partition result relation */
172 : TupleConversionMap **partition_tupconv_maps;
173 : TupleTableSlot *partition_tuple_slot;
174 : TransitionCaptureState *transition_capture;
175 : TupleConversionMap **transition_tupconv_maps;
176 :
177 : /*
178 : * These variables are used to reduce overhead in textual COPY FROM.
179 : *
180 : * attribute_buf holds the separated, de-escaped text for each field of
181 : * the current line. The CopyReadAttributes functions return arrays of
182 : * pointers into this buffer. We avoid palloc/pfree overhead by re-using
183 : * the buffer on each cycle.
184 : */
185 : StringInfoData attribute_buf;
186 :
187 : /* field raw data pointers found by COPY FROM */
188 :
189 : int max_fields;
190 : char **raw_fields;
191 :
192 : /*
193 : * Similarly, line_buf holds the whole input line being processed. The
194 : * input cycle is first to read the whole line into line_buf, convert it
195 : * to server encoding there, and then extract the individual attribute
196 : * fields into attribute_buf. line_buf is preserved unmodified so that we
197 : * can display it in error messages if appropriate.
198 : */
199 : StringInfoData line_buf;
200 : bool line_buf_converted; /* converted to server encoding? */
201 : bool line_buf_valid; /* contains the row being processed? */
202 :
203 : /*
204 : * Finally, raw_buf holds raw data read from the data source (file or
205 : * client connection). CopyReadLine parses this data sufficiently to
206 : * locate line boundaries, then transfers the data to line_buf and
207 : * converts it. Note: we guarantee that there is a \0 at
208 : * raw_buf[raw_buf_len].
209 : */
210 : #define RAW_BUF_SIZE 65536 /* we palloc RAW_BUF_SIZE+1 bytes */
211 : char *raw_buf;
212 : int raw_buf_index; /* next byte to process */
213 : int raw_buf_len; /* total # of bytes stored */
214 : } CopyStateData;
215 :
216 : /* DestReceiver for COPY (query) TO */
217 : typedef struct
218 : {
219 : DestReceiver pub; /* publicly-known function pointers */
220 : CopyState cstate; /* CopyStateData for the command */
221 : uint64 processed; /* # of tuples processed */
222 : } DR_copy;
223 :
224 :
225 : /*
226 : * These macros centralize code used to process line_buf and raw_buf buffers.
227 : * They are macros because they often do continue/break control and to avoid
228 : * function call overhead in tight COPY loops.
229 : *
230 : * We must use "if (1)" because the usual "do {...} while(0)" wrapper would
231 : * prevent the continue/break processing from working. We end the "if (1)"
232 : * with "else ((void) 0)" to ensure the "if" does not unintentionally match
233 : * any "else" in the calling code, and to avoid any compiler warnings about
234 : * empty statements. See http://www.cit.gu.edu.au/~anthony/info/C/C.macros.
235 : */
236 :
/*
 * When fewer than (extralen + 1) bytes remain and EOF has not been hit,
 * step back to the character fetched at the top of the loop (so it stays in
 * the buffer even after several bytes of read-ahead), request more data and
 * restart the enclosing loop.
 */
#define IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(extralen) \
if (1) \
{ \
	if (raw_buf_ptr + (extralen) >= copy_buf_len && !hit_eof) \
	{ \
		need_data = true; \
		raw_buf_ptr = prev_raw_ptr;		/* undo fetch */ \
		continue; \
	} \
} else ((void) 0)
251 :
/*
 * When fewer than (extralen + 1) bytes remain and EOF has been hit, swallow
 * whatever is left in the buffer, set result and leave the enclosing loop.
 */
#define IF_NEED_REFILL_AND_EOF_BREAK(extralen) \
if (1) \
{ \
	if (raw_buf_ptr + (extralen) >= copy_buf_len && hit_eof) \
	{ \
		/* backslash just before EOF, treat as data char */ \
		result = true; \
		if (extralen) \
			raw_buf_ptr = copy_buf_len;	/* consume the partial character */ \
		break; \
	} \
} else ((void) 0)
265 :
/*
 * Move the bytes already accepted (those between cstate->raw_buf_index and
 * raw_buf_ptr) over to line_buf.  This has to happen so that raw_buf is
 * certain to have some free room.
 */
#define REFILL_LINEBUF \
if (1) \
{ \
	if (raw_buf_ptr > cstate->raw_buf_index) \
	{ \
		appendBinaryStringInfo(&cstate->line_buf, \
							   cstate->raw_buf + cstate->raw_buf_index, \
							   raw_buf_ptr - cstate->raw_buf_index); \
		cstate->raw_buf_index = raw_buf_ptr; \
	} \
} else ((void) 0)
281 :
/*
 * Throw away any read-ahead, leaving raw_buf_ptr just past the character at
 * prev_raw_ptr, and jump to the not_end_of_copy label.
 */
#define NO_END_OF_COPY_GOTO \
if (1) \
{ \
	raw_buf_ptr = prev_raw_ptr + 1; \
	goto not_end_of_copy; \
} else ((void) 0)
289 :
/* The 11 signature bytes: "PGCOPY", newline, octal 377, CR, newline, NUL. */
static const char BinarySignature[11] = {
	'P', 'G', 'C', 'O', 'P', 'Y', '\n', '\377', '\r', '\n', '\0'
};
291 :
292 :
293 : /* non-export function prototypes */
294 : static CopyState BeginCopy(ParseState *pstate, bool is_from, Relation rel,
295 : RawStmt *raw_query, Oid queryRelId, List *attnamelist,
296 : List *options);
297 : static void EndCopy(CopyState cstate);
298 : static void ClosePipeToProgram(CopyState cstate);
299 : static CopyState BeginCopyTo(ParseState *pstate, Relation rel, RawStmt *query,
300 : Oid queryRelId, const char *filename, bool is_program,
301 : List *attnamelist, List *options);
302 : static void EndCopyTo(CopyState cstate);
303 : static uint64 DoCopyTo(CopyState cstate);
304 : static uint64 CopyTo(CopyState cstate);
305 : static void CopyOneRowTo(CopyState cstate, Oid tupleOid,
306 : Datum *values, bool *nulls);
307 : static void CopyFromInsertBatch(CopyState cstate, EState *estate,
308 : CommandId mycid, int hi_options,
309 : ResultRelInfo *resultRelInfo, TupleTableSlot *myslot,
310 : BulkInsertState bistate,
311 : int nBufferedTuples, HeapTuple *bufferedTuples,
312 : int firstBufferedLineNo);
313 : static bool CopyReadLine(CopyState cstate);
314 : static bool CopyReadLineText(CopyState cstate);
315 : static int CopyReadAttributesText(CopyState cstate);
316 : static int CopyReadAttributesCSV(CopyState cstate);
317 : static Datum CopyReadBinaryAttribute(CopyState cstate,
318 : int column_no, FmgrInfo *flinfo,
319 : Oid typioparam, int32 typmod,
320 : bool *isnull);
321 : static void CopyAttributeOutText(CopyState cstate, char *string);
322 : static void CopyAttributeOutCSV(CopyState cstate, char *string,
323 : bool use_quote, bool single_attr);
324 : static List *CopyGetAttnums(TupleDesc tupDesc, Relation rel,
325 : List *attnamelist);
326 : static char *limit_printout_length(const char *str);
327 :
328 : /* Low-level communications functions */
329 : static void SendCopyBegin(CopyState cstate);
330 : static void ReceiveCopyBegin(CopyState cstate);
331 : static void SendCopyEnd(CopyState cstate);
332 : static void CopySendData(CopyState cstate, const void *databuf, int datasize);
333 : static void CopySendString(CopyState cstate, const char *str);
334 : static void CopySendChar(CopyState cstate, char c);
335 : static void CopySendEndOfRow(CopyState cstate);
336 : static int CopyGetData(CopyState cstate, void *databuf,
337 : int minread, int maxread);
338 : static void CopySendInt32(CopyState cstate, int32 val);
339 : static bool CopyGetInt32(CopyState cstate, int32 *val);
340 : static void CopySendInt16(CopyState cstate, int16 val);
341 : static bool CopyGetInt16(CopyState cstate, int16 *val);
342 :
343 :
344 : /*
345 : * Send copy start/stop messages for frontend copies. These have changed
346 : * in past protocol redesigns.
347 : */
348 : static void
349 64 : SendCopyBegin(CopyState cstate)
350 : {
351 64 : if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3)
352 : {
353 : /* new way */
354 : StringInfoData buf;
355 64 : int natts = list_length(cstate->attnumlist);
356 64 : int16 format = (cstate->binary ? 1 : 0);
357 : int i;
358 :
359 64 : pq_beginmessage(&buf, 'H');
360 64 : pq_sendbyte(&buf, format); /* overall format */
361 64 : pq_sendint(&buf, natts, 2);
362 184 : for (i = 0; i < natts; i++)
363 120 : pq_sendint(&buf, format, 2); /* per-column formats */
364 64 : pq_endmessage(&buf);
365 64 : cstate->copy_dest = COPY_NEW_FE;
366 : }
367 : else
368 : {
369 : /* old way */
370 0 : if (cstate->binary)
371 0 : ereport(ERROR,
372 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
373 : errmsg("COPY BINARY is not supported to stdout or from stdin")));
374 0 : pq_putemptymessage('H');
375 : /* grottiness needed for old COPY OUT protocol */
376 0 : pq_startcopyout();
377 0 : cstate->copy_dest = COPY_OLD_FE;
378 : }
379 64 : }
380 :
381 : static void
382 77 : ReceiveCopyBegin(CopyState cstate)
383 : {
384 77 : if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3)
385 : {
386 : /* new way */
387 : StringInfoData buf;
388 77 : int natts = list_length(cstate->attnumlist);
389 77 : int16 format = (cstate->binary ? 1 : 0);
390 : int i;
391 :
392 77 : pq_beginmessage(&buf, 'G');
393 77 : pq_sendbyte(&buf, format); /* overall format */
394 77 : pq_sendint(&buf, natts, 2);
395 263 : for (i = 0; i < natts; i++)
396 186 : pq_sendint(&buf, format, 2); /* per-column formats */
397 77 : pq_endmessage(&buf);
398 77 : cstate->copy_dest = COPY_NEW_FE;
399 77 : cstate->fe_msgbuf = makeStringInfo();
400 : }
401 : else
402 : {
403 : /* old way */
404 0 : if (cstate->binary)
405 0 : ereport(ERROR,
406 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
407 : errmsg("COPY BINARY is not supported to stdout or from stdin")));
408 0 : pq_putemptymessage('G');
409 : /* any error in old protocol will make us lose sync */
410 0 : pq_startmsgread();
411 0 : cstate->copy_dest = COPY_OLD_FE;
412 : }
413 : /* We *must* flush here to ensure FE knows it can send. */
414 77 : pq_flush();
415 77 : }
416 :
417 : static void
418 64 : SendCopyEnd(CopyState cstate)
419 : {
420 64 : if (cstate->copy_dest == COPY_NEW_FE)
421 : {
422 : /* Shouldn't have any unsent data */
423 64 : Assert(cstate->fe_msgbuf->len == 0);
424 : /* Send Copy Done message */
425 64 : pq_putemptymessage('c');
426 : }
427 : else
428 : {
429 0 : CopySendData(cstate, "\\.", 2);
430 : /* Need to flush out the trailer (this also appends a newline) */
431 0 : CopySendEndOfRow(cstate);
432 0 : pq_endcopyout(false);
433 : }
434 64 : }
435 :
436 : /*----------
437 : * CopySendData sends output data to the destination (file or frontend)
438 : * CopySendString does the same for null-terminated strings
439 : * CopySendChar does the same for single characters
440 : * CopySendEndOfRow does the appropriate thing at end of each data row
441 : * (data is not actually flushed except by CopySendEndOfRow)
442 : *
443 : * NB: no data conversion is applied by these functions
444 : *----------
445 : */
446 : static void
447 32570 : CopySendData(CopyState cstate, const void *databuf, int datasize)
448 : {
449 32570 : appendBinaryStringInfo(cstate->fe_msgbuf, databuf, datasize);
450 32570 : }
451 :
452 : static void
453 74 : CopySendString(CopyState cstate, const char *str)
454 : {
455 74 : appendBinaryStringInfo(cstate->fe_msgbuf, str, strlen(str));
456 74 : }
457 :
458 : static void
459 32732 : CopySendChar(CopyState cstate, char c)
460 : {
461 32732 : appendStringInfoCharMacro(cstate->fe_msgbuf, c);
462 32732 : }
463 :
464 : static void
465 2266 : CopySendEndOfRow(CopyState cstate)
466 : {
467 2266 : StringInfo fe_msgbuf = cstate->fe_msgbuf;
468 :
469 2266 : switch (cstate->copy_dest)
470 : {
471 : case COPY_FILE:
472 2012 : if (!cstate->binary)
473 : {
474 : /* Default line termination depends on platform */
475 : #ifndef WIN32
476 2008 : CopySendChar(cstate, '\n');
477 : #else
478 : CopySendString(cstate, "\r\n");
479 : #endif
480 : }
481 :
482 2012 : if (fwrite(fe_msgbuf->data, fe_msgbuf->len, 1,
483 2012 : cstate->copy_file) != 1 ||
484 2012 : ferror(cstate->copy_file))
485 : {
486 0 : if (cstate->is_program)
487 : {
488 0 : if (errno == EPIPE)
489 : {
490 : /*
491 : * The pipe will be closed automatically on error at
492 : * the end of transaction, but we might get a better
493 : * error message from the subprocess' exit code than
494 : * just "Broken Pipe"
495 : */
496 0 : ClosePipeToProgram(cstate);
497 :
498 : /*
499 : * If ClosePipeToProgram() didn't throw an error, the
500 : * program terminated normally, but closed the pipe
501 : * first. Restore errno, and throw an error.
502 : */
503 0 : errno = EPIPE;
504 : }
505 0 : ereport(ERROR,
506 : (errcode_for_file_access(),
507 : errmsg("could not write to COPY program: %m")));
508 : }
509 : else
510 0 : ereport(ERROR,
511 : (errcode_for_file_access(),
512 : errmsg("could not write to COPY file: %m")));
513 : }
514 2012 : break;
515 : case COPY_OLD_FE:
516 : /* The FE/BE protocol uses \n as newline for all platforms */
517 0 : if (!cstate->binary)
518 0 : CopySendChar(cstate, '\n');
519 :
520 0 : if (pq_putbytes(fe_msgbuf->data, fe_msgbuf->len))
521 : {
522 : /* no hope of recovering connection sync, so FATAL */
523 0 : ereport(FATAL,
524 : (errcode(ERRCODE_CONNECTION_FAILURE),
525 : errmsg("connection lost during COPY to stdout")));
526 : }
527 0 : break;
528 : case COPY_NEW_FE:
529 : /* The FE/BE protocol uses \n as newline for all platforms */
530 254 : if (!cstate->binary)
531 254 : CopySendChar(cstate, '\n');
532 :
533 : /* Dump the accumulated row as one CopyData message */
534 254 : (void) pq_putmessage('d', fe_msgbuf->data, fe_msgbuf->len);
535 254 : break;
536 : case COPY_CALLBACK:
537 0 : Assert(false); /* Not yet supported. */
538 : break;
539 : }
540 :
541 2266 : resetStringInfo(fe_msgbuf);
542 2266 : }
543 :
544 : /*
545 : * CopyGetData reads data from the source (file or frontend)
546 : *
547 : * We attempt to read at least minread, and at most maxread, bytes from
548 : * the source. The actual number of bytes read is returned; if this is
549 : * less than minread, EOF was detected.
550 : *
551 : * Note: when copying from the frontend, we expect a proper EOF mark per
552 : * protocol; if the frontend simply drops the connection, we raise error.
553 : * It seems unwise to allow the COPY IN to complete normally in that case.
554 : *
555 : * NB: no data conversion is applied here.
556 : */
557 : static int
558 479 : CopyGetData(CopyState cstate, void *databuf, int minread, int maxread)
559 : {
560 479 : int bytesread = 0;
561 :
562 479 : switch (cstate->copy_dest)
563 : {
564 : case COPY_FILE:
565 144 : bytesread = fread(databuf, 1, maxread, cstate->copy_file);
566 144 : if (ferror(cstate->copy_file))
567 0 : ereport(ERROR,
568 : (errcode_for_file_access(),
569 : errmsg("could not read from COPY file: %m")));
570 144 : break;
571 : case COPY_OLD_FE:
572 :
573 : /*
574 : * We cannot read more than minread bytes (which in practice is 1)
575 : * because old protocol doesn't have any clear way of separating
576 : * the COPY stream from following data. This is slow, but not any
577 : * slower than the code path was originally, and we don't care
578 : * much anymore about the performance of old protocol.
579 : */
580 0 : if (pq_getbytes((char *) databuf, minread))
581 : {
582 : /* Only a \. terminator is legal EOF in old protocol */
583 0 : ereport(ERROR,
584 : (errcode(ERRCODE_CONNECTION_FAILURE),
585 : errmsg("unexpected EOF on client connection with an open transaction")));
586 : }
587 0 : bytesread = minread;
588 0 : break;
589 : case COPY_NEW_FE:
590 883 : while (maxread > 0 && bytesread < minread && !cstate->fe_eof)
591 : {
592 : int avail;
593 :
594 761 : while (cstate->fe_msgbuf->cursor >= cstate->fe_msgbuf->len)
595 : {
596 : /* Try to receive another message */
597 : int mtype;
598 :
599 : readmessage:
600 274 : HOLD_CANCEL_INTERRUPTS();
601 274 : pq_startmsgread();
602 274 : mtype = pq_getbyte();
603 274 : if (mtype == EOF)
604 0 : ereport(ERROR,
605 : (errcode(ERRCODE_CONNECTION_FAILURE),
606 : errmsg("unexpected EOF on client connection with an open transaction")));
607 274 : if (pq_getmessage(cstate->fe_msgbuf, 0))
608 0 : ereport(ERROR,
609 : (errcode(ERRCODE_CONNECTION_FAILURE),
610 : errmsg("unexpected EOF on client connection with an open transaction")));
611 274 : RESUME_CANCEL_INTERRUPTS();
612 274 : switch (mtype)
613 : {
614 : case 'd': /* CopyData */
615 213 : break;
616 : case 'c': /* CopyDone */
617 : /* COPY IN correctly terminated by frontend */
618 61 : cstate->fe_eof = true;
619 61 : return bytesread;
620 : case 'f': /* CopyFail */
621 0 : ereport(ERROR,
622 : (errcode(ERRCODE_QUERY_CANCELED),
623 : errmsg("COPY from stdin failed: %s",
624 : pq_getmsgstring(cstate->fe_msgbuf))));
625 : break;
626 : case 'H': /* Flush */
627 : case 'S': /* Sync */
628 :
629 : /*
630 : * Ignore Flush/Sync for the convenience of client
631 : * libraries (such as libpq) that may send those
632 : * without noticing that the command they just
633 : * sent was COPY.
634 : */
635 0 : goto readmessage;
636 : default:
637 0 : ereport(ERROR,
638 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
639 : errmsg("unexpected message type 0x%02X during COPY from stdin",
640 : mtype)));
641 : break;
642 : }
643 : }
644 213 : avail = cstate->fe_msgbuf->len - cstate->fe_msgbuf->cursor;
645 213 : if (avail > maxread)
646 0 : avail = maxread;
647 213 : pq_copymsgbytes(cstate->fe_msgbuf, databuf, avail);
648 213 : databuf = (void *) ((char *) databuf + avail);
649 213 : maxread -= avail;
650 213 : bytesread += avail;
651 : }
652 274 : break;
653 : case COPY_CALLBACK:
654 0 : bytesread = cstate->data_source_cb(databuf, minread, maxread);
655 0 : break;
656 : }
657 :
658 418 : return bytesread;
659 : }
660 :
661 :
662 : /*
663 : * These functions do apply some data conversion
664 : */
665 :
666 : /*
667 : * CopySendInt32 sends an int32 in network byte order
668 : */
669 : static void
670 23 : CopySendInt32(CopyState cstate, int32 val)
671 : {
672 : uint32 buf;
673 :
674 23 : buf = htonl((uint32) val);
675 23 : CopySendData(cstate, &buf, sizeof(buf));
676 23 : }
677 :
678 : /*
679 : * CopyGetInt32 reads an int32 that appears in network byte order
680 : *
681 : * Returns true if OK, false if EOF
682 : */
683 : static bool
684 23 : CopyGetInt32(CopyState cstate, int32 *val)
685 : {
686 : uint32 buf;
687 :
688 23 : if (CopyGetData(cstate, &buf, sizeof(buf), sizeof(buf)) != sizeof(buf))
689 : {
690 0 : *val = 0; /* suppress compiler warning */
691 0 : return false;
692 : }
693 23 : *val = (int32) ntohl(buf);
694 23 : return true;
695 : }
696 :
697 : /*
698 : * CopySendInt16 sends an int16 in network byte order
699 : */
700 : static void
701 4 : CopySendInt16(CopyState cstate, int16 val)
702 : {
703 : uint16 buf;
704 :
705 4 : buf = htons((uint16) val);
706 4 : CopySendData(cstate, &buf, sizeof(buf));
707 4 : }
708 :
709 : /*
710 : * CopyGetInt16 reads an int16 that appears in network byte order
711 : */
712 : static bool
713 4 : CopyGetInt16(CopyState cstate, int16 *val)
714 : {
715 : uint16 buf;
716 :
717 4 : if (CopyGetData(cstate, &buf, sizeof(buf), sizeof(buf)) != sizeof(buf))
718 : {
719 0 : *val = 0; /* suppress compiler warning */
720 0 : return false;
721 : }
722 4 : *val = (int16) ntohs(buf);
723 4 : return true;
724 : }
725 :
726 :
727 : /*
728 : * CopyLoadRawBuf loads some more data into raw_buf
729 : *
730 : * Returns TRUE if able to obtain at least one more byte, else FALSE.
731 : *
732 : * If raw_buf_index < raw_buf_len, the unprocessed bytes are transferred
733 : * down to the start of the buffer and then we load more data after that.
734 : * This case is used only when a frontend multibyte character crosses a
735 : * bufferload boundary.
736 : */
737 : static bool
738 434 : CopyLoadRawBuf(CopyState cstate)
739 : {
740 : int nbytes;
741 : int inbytes;
742 :
743 434 : if (cstate->raw_buf_index < cstate->raw_buf_len)
744 : {
745 : /* Copy down the unprocessed data */
746 0 : nbytes = cstate->raw_buf_len - cstate->raw_buf_index;
747 0 : memmove(cstate->raw_buf, cstate->raw_buf + cstate->raw_buf_index,
748 : nbytes);
749 : }
750 : else
751 434 : nbytes = 0; /* no data need be saved */
752 :
753 434 : inbytes = CopyGetData(cstate, cstate->raw_buf + nbytes,
754 : 1, RAW_BUF_SIZE - nbytes);
755 434 : nbytes += inbytes;
756 434 : cstate->raw_buf[nbytes] = '\0';
757 434 : cstate->raw_buf_index = 0;
758 434 : cstate->raw_buf_len = nbytes;
759 434 : return (inbytes > 0);
760 : }
761 :
762 :
763 : /*
764 : * DoCopy executes the SQL COPY statement
765 : *
766 : * Either unload or reload contents of table <relation>, depending on <from>.
767 : * (<from> = TRUE means we are inserting into the table.) In the "TO" case
768 : * we also support copying the output of an arbitrary SELECT, INSERT, UPDATE
769 : * or DELETE query.
770 : *
771 : * If <pipe> is false, transfer is between the table and the file named
772 : * <filename>. Otherwise, transfer is between the table and our regular
773 : * input/output stream. The latter could be either stdin/stdout or a
774 : * socket, depending on whether we're running under Postmaster control.
775 : *
776 : * Do not allow a Postgres user without superuser privilege to read from
777 : * or write to a file.
778 : *
779 : * Do not allow the copy if user doesn't have proper permission to access
780 : * the table or the specifically requested columns.
781 : */
782 : void
783 226 : DoCopy(ParseState *pstate, const CopyStmt *stmt,
784 : int stmt_location, int stmt_len,
785 : uint64 *processed)
786 : {
787 : CopyState cstate;
788 226 : bool is_from = stmt->is_from;
789 226 : bool pipe = (stmt->filename == NULL);
790 : Relation rel;
791 : Oid relid;
792 226 : RawStmt *query = NULL;
793 :
794 : /* Disallow COPY to/from file or program except to superusers. */
795 226 : if (!pipe && !superuser())
796 : {
797 0 : if (stmt->is_program)
798 0 : ereport(ERROR,
799 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
800 : errmsg("must be superuser to COPY to or from an external program"),
801 : errhint("Anyone can COPY to stdout or from stdin. "
802 : "psql's \\copy command also works for anyone.")));
803 : else
804 0 : ereport(ERROR,
805 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
806 : errmsg("must be superuser to COPY to or from a file"),
807 : errhint("Anyone can COPY to stdout or from stdin. "
808 : "psql's \\copy command also works for anyone.")));
809 : }
810 :
811 226 : if (stmt->relation)
812 : {
813 : TupleDesc tupDesc;
814 : List *attnums;
815 : ListCell *cur;
816 : RangeTblEntry *rte;
817 :
818 182 : Assert(!stmt->query);
819 :
820 : /* Open and lock the relation, using the appropriate lock type. */
821 182 : rel = heap_openrv(stmt->relation,
822 : (is_from ? RowExclusiveLock : AccessShareLock));
823 :
824 182 : relid = RelationGetRelid(rel);
825 :
826 182 : rte = addRangeTableEntryForRelation(pstate, rel, NULL, false, false);
827 182 : rte->requiredPerms = (is_from ? ACL_INSERT : ACL_SELECT);
828 :
829 182 : tupDesc = RelationGetDescr(rel);
830 182 : attnums = CopyGetAttnums(tupDesc, rel, stmt->attlist);
831 697 : foreach(cur, attnums)
832 : {
833 521 : int attno = lfirst_int(cur) -
834 : FirstLowInvalidHeapAttributeNumber;
835 :
836 521 : if (is_from)
837 372 : rte->insertedCols = bms_add_member(rte->insertedCols, attno);
838 : else
839 149 : rte->selectedCols = bms_add_member(rte->selectedCols, attno);
840 : }
841 176 : ExecCheckRTPerms(pstate->p_rtable, true);
842 :
843 : /*
844 : * Permission check for row security policies.
845 : *
846 : * check_enable_rls will ereport(ERROR) if the user has requested
847 : * something invalid and will otherwise indicate if we should enable
848 : * RLS (returns RLS_ENABLED) or not for this COPY statement.
849 : *
850 : * If the relation has a row security policy and we are to apply it
851 : * then perform a "query" copy and allow the normal query processing
852 : * to handle the policies.
853 : *
854 : * If RLS is not enabled for this, then just fall through to the
855 : * normal non-filtering relation handling.
856 : */
857 164 : if (check_enable_rls(rte->relid, InvalidOid, false) == RLS_ENABLED)
858 : {
859 : SelectStmt *select;
860 : ColumnRef *cr;
861 : ResTarget *target;
862 : RangeVar *from;
863 9 : List *targetList = NIL;
864 :
865 9 : if (is_from)
866 1 : ereport(ERROR,
867 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
868 : errmsg("COPY FROM not supported with row-level security"),
869 : errhint("Use INSERT statements instead.")));
870 :
871 : /*
872 : * Build target list
873 : *
874 : * If no columns are specified in the attribute list of the COPY
875 : * command, then the target list is 'all' columns. Therefore, '*'
876 : * should be used as the target list for the resulting SELECT
877 : * statement.
878 : *
879 : * In the case that columns are specified in the attribute list,
880 : * create a ColumnRef and ResTarget for each column and add them
881 : * to the target list for the resulting SELECT statement.
882 : */
883 8 : if (!stmt->attlist)
884 : {
885 2 : cr = makeNode(ColumnRef);
886 2 : cr->fields = list_make1(makeNode(A_Star));
887 2 : cr->location = -1;
888 :
889 2 : target = makeNode(ResTarget);
890 2 : target->name = NULL;
891 2 : target->indirection = NIL;
892 2 : target->val = (Node *) cr;
893 2 : target->location = -1;
894 :
895 2 : targetList = list_make1(target);
896 : }
897 : else
898 : {
899 : ListCell *lc;
900 :
901 17 : foreach(lc, stmt->attlist)
902 : {
903 : /*
904 : * Build the ColumnRef for each column. The ColumnRef
905 : * 'fields' property is a String 'Value' node (see
906 : * nodes/value.h) that corresponds to the column name
907 : * respectively.
908 : */
909 11 : cr = makeNode(ColumnRef);
910 11 : cr->fields = list_make1(lfirst(lc));
911 11 : cr->location = -1;
912 :
913 : /* Build the ResTarget and add the ColumnRef to it. */
914 11 : target = makeNode(ResTarget);
915 11 : target->name = NULL;
916 11 : target->indirection = NIL;
917 11 : target->val = (Node *) cr;
918 11 : target->location = -1;
919 :
920 : /* Add each column to the SELECT statement's target list */
921 11 : targetList = lappend(targetList, target);
922 : }
923 : }
924 :
925 : /*
926 : * Build RangeVar for from clause, fully qualified based on the
927 : * relation which we have opened and locked.
928 : */
929 8 : from = makeRangeVar(get_namespace_name(RelationGetNamespace(rel)),
930 8 : pstrdup(RelationGetRelationName(rel)),
931 : -1);
932 :
933 : /* Build query */
934 8 : select = makeNode(SelectStmt);
935 8 : select->targetList = targetList;
936 8 : select->fromClause = list_make1(from);
937 :
938 8 : query = makeNode(RawStmt);
939 8 : query->stmt = (Node *) select;
940 8 : query->stmt_location = stmt_location;
941 8 : query->stmt_len = stmt_len;
942 :
943 : /*
944 : * Close the relation for now, but keep the lock on it to prevent
945 : * changes between now and when we start the query-based COPY.
946 : *
947 : * We'll reopen it later as part of the query-based COPY.
948 : */
949 8 : heap_close(rel, NoLock);
950 8 : rel = NULL;
951 : }
952 : }
953 : else
954 : {
955 44 : Assert(stmt->query);
956 :
957 44 : query = makeNode(RawStmt);
958 44 : query->stmt = stmt->query;
959 44 : query->stmt_location = stmt_location;
960 44 : query->stmt_len = stmt_len;
961 :
962 44 : relid = InvalidOid;
963 44 : rel = NULL;
964 : }
965 :
966 205 : if (is_from)
967 : {
968 113 : Assert(rel);
969 :
970 : /* check read-only transaction and parallel mode */
971 113 : if (XactReadOnly && !rel->rd_islocaltemp)
972 0 : PreventCommandIfReadOnly("COPY FROM");
973 113 : PreventCommandIfParallelMode("COPY FROM");
974 :
975 113 : cstate = BeginCopyFrom(pstate, rel, stmt->filename, stmt->is_program,
976 : NULL, stmt->attlist, stmt->options);
977 110 : *processed = CopyFrom(cstate); /* copy from file to database */
978 93 : EndCopyFrom(cstate);
979 : }
980 : else
981 : {
982 276 : cstate = BeginCopyTo(pstate, rel, query, relid,
983 184 : stmt->filename, stmt->is_program,
984 : stmt->attlist, stmt->options);
985 69 : *processed = DoCopyTo(cstate); /* copy from database to file */
986 69 : EndCopyTo(cstate);
987 : }
988 :
989 : /*
990 : * Close the relation. If reading, we can release the AccessShareLock we
991 : * got; if writing, we should hold the lock until end of transaction to
992 : * ensure that updates will be committed before lock is released.
993 : */
994 162 : if (rel != NULL)
995 130 : heap_close(rel, (is_from ? NoLock : AccessShareLock));
996 162 : }
997 :
998 : /*
999 : * Process the statement option list for COPY.
1000 : *
1001 : * Scan the options list (a list of DefElem) and transpose the information
1002 : * into cstate, applying appropriate error checking.
1003 : *
1004 : * cstate is assumed to be filled with zeroes initially.
1005 : *
1006 : * This is exported so that external users of the COPY API can sanity-check
1007 : * a list of options. In that usage, cstate should be passed as NULL
1008 : * (since external users don't know sizeof(CopyStateData)) and the collected
1009 : * data is just leaked until CurrentMemoryContext is reset.
1010 : *
1011 : * Note that additional checking, such as whether column names listed in FORCE
1012 : * QUOTE actually exist, has to be applied later. This just checks for
1013 : * self-consistency of the options list.
1014 : */
1015 : void
1016 203 : ProcessCopyOptions(ParseState *pstate,
1017 : CopyState cstate,
1018 : bool is_from,
1019 : List *options)
1020 : {
1021 203 : bool format_specified = false;
1022 : ListCell *option;
1023 :
1024 : /* Support external use for option sanity checking */
1025 203 : if (cstate == NULL)
1026 0 : cstate = (CopyStateData *) palloc0(sizeof(CopyStateData));
1027 :
1028 203 : cstate->file_encoding = -1;
1029 :
1030 : /* Extract options from the statement node tree */
1031 312 : foreach(option, options)
1032 : {
1033 109 : DefElem *defel = lfirst_node(DefElem, option);
1034 :
1035 109 : if (strcmp(defel->defname, "format") == 0)
1036 : {
1037 39 : char *fmt = defGetString(defel);
1038 :
1039 39 : if (format_specified)
1040 0 : ereport(ERROR,
1041 : (errcode(ERRCODE_SYNTAX_ERROR),
1042 : errmsg("conflicting or redundant options"),
1043 : parser_errposition(pstate, defel->location)));
1044 39 : format_specified = true;
1045 39 : if (strcmp(fmt, "text") == 0)
1046 : /* default format */ ;
1047 39 : else if (strcmp(fmt, "csv") == 0)
1048 37 : cstate->csv_mode = true;
1049 2 : else if (strcmp(fmt, "binary") == 0)
1050 2 : cstate->binary = true;
1051 : else
1052 0 : ereport(ERROR,
1053 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1054 : errmsg("COPY format \"%s\" not recognized", fmt),
1055 : parser_errposition(pstate, defel->location)));
1056 : }
1057 70 : else if (strcmp(defel->defname, "oids") == 0)
1058 : {
1059 6 : if (cstate->oids)
1060 0 : ereport(ERROR,
1061 : (errcode(ERRCODE_SYNTAX_ERROR),
1062 : errmsg("conflicting or redundant options"),
1063 : parser_errposition(pstate, defel->location)));
1064 6 : cstate->oids = defGetBoolean(defel);
1065 : }
1066 64 : else if (strcmp(defel->defname, "freeze") == 0)
1067 : {
1068 7 : if (cstate->freeze)
1069 0 : ereport(ERROR,
1070 : (errcode(ERRCODE_SYNTAX_ERROR),
1071 : errmsg("conflicting or redundant options"),
1072 : parser_errposition(pstate, defel->location)));
1073 7 : cstate->freeze = defGetBoolean(defel);
1074 : }
1075 57 : else if (strcmp(defel->defname, "delimiter") == 0)
1076 : {
1077 19 : if (cstate->delim)
1078 0 : ereport(ERROR,
1079 : (errcode(ERRCODE_SYNTAX_ERROR),
1080 : errmsg("conflicting or redundant options"),
1081 : parser_errposition(pstate, defel->location)));
1082 19 : cstate->delim = defGetString(defel);
1083 : }
1084 38 : else if (strcmp(defel->defname, "null") == 0)
1085 : {
1086 8 : if (cstate->null_print)
1087 0 : ereport(ERROR,
1088 : (errcode(ERRCODE_SYNTAX_ERROR),
1089 : errmsg("conflicting or redundant options"),
1090 : parser_errposition(pstate, defel->location)));
1091 8 : cstate->null_print = defGetString(defel);
1092 : }
1093 30 : else if (strcmp(defel->defname, "header") == 0)
1094 : {
1095 3 : if (cstate->header_line)
1096 0 : ereport(ERROR,
1097 : (errcode(ERRCODE_SYNTAX_ERROR),
1098 : errmsg("conflicting or redundant options"),
1099 : parser_errposition(pstate, defel->location)));
1100 3 : cstate->header_line = defGetBoolean(defel);
1101 : }
1102 27 : else if (strcmp(defel->defname, "quote") == 0)
1103 : {
1104 5 : if (cstate->quote)
1105 0 : ereport(ERROR,
1106 : (errcode(ERRCODE_SYNTAX_ERROR),
1107 : errmsg("conflicting or redundant options"),
1108 : parser_errposition(pstate, defel->location)));
1109 5 : cstate->quote = defGetString(defel);
1110 : }
1111 22 : else if (strcmp(defel->defname, "escape") == 0)
1112 : {
1113 5 : if (cstate->escape)
1114 0 : ereport(ERROR,
1115 : (errcode(ERRCODE_SYNTAX_ERROR),
1116 : errmsg("conflicting or redundant options"),
1117 : parser_errposition(pstate, defel->location)));
1118 5 : cstate->escape = defGetString(defel);
1119 : }
1120 17 : else if (strcmp(defel->defname, "force_quote") == 0)
1121 : {
1122 7 : if (cstate->force_quote || cstate->force_quote_all)
1123 0 : ereport(ERROR,
1124 : (errcode(ERRCODE_SYNTAX_ERROR),
1125 : errmsg("conflicting or redundant options"),
1126 : parser_errposition(pstate, defel->location)));
1127 7 : if (defel->arg && IsA(defel->arg, A_Star))
1128 3 : cstate->force_quote_all = true;
1129 4 : else if (defel->arg && IsA(defel->arg, List))
1130 4 : cstate->force_quote = castNode(List, defel->arg);
1131 : else
1132 0 : ereport(ERROR,
1133 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1134 : errmsg("argument to option \"%s\" must be a list of column names",
1135 : defel->defname),
1136 : parser_errposition(pstate, defel->location)));
1137 : }
1138 10 : else if (strcmp(defel->defname, "force_not_null") == 0)
1139 : {
1140 4 : if (cstate->force_notnull)
1141 0 : ereport(ERROR,
1142 : (errcode(ERRCODE_SYNTAX_ERROR),
1143 : errmsg("conflicting or redundant options"),
1144 : parser_errposition(pstate, defel->location)));
1145 4 : if (defel->arg && IsA(defel->arg, List))
1146 4 : cstate->force_notnull = castNode(List, defel->arg);
1147 : else
1148 0 : ereport(ERROR,
1149 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1150 : errmsg("argument to option \"%s\" must be a list of column names",
1151 : defel->defname),
1152 : parser_errposition(pstate, defel->location)));
1153 : }
1154 6 : else if (strcmp(defel->defname, "force_null") == 0)
1155 : {
1156 4 : if (cstate->force_null)
1157 0 : ereport(ERROR,
1158 : (errcode(ERRCODE_SYNTAX_ERROR),
1159 : errmsg("conflicting or redundant options")));
1160 4 : if (defel->arg && IsA(defel->arg, List))
1161 4 : cstate->force_null = castNode(List, defel->arg);
1162 : else
1163 0 : ereport(ERROR,
1164 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1165 : errmsg("argument to option \"%s\" must be a list of column names",
1166 : defel->defname),
1167 : parser_errposition(pstate, defel->location)));
1168 : }
1169 2 : else if (strcmp(defel->defname, "convert_selectively") == 0)
1170 : {
1171 : /*
1172 : * Undocumented, not-accessible-from-SQL option: convert only the
1173 : * named columns to binary form, storing the rest as NULLs. It's
1174 : * allowed for the column list to be NIL.
1175 : */
1176 0 : if (cstate->convert_selectively)
1177 0 : ereport(ERROR,
1178 : (errcode(ERRCODE_SYNTAX_ERROR),
1179 : errmsg("conflicting or redundant options"),
1180 : parser_errposition(pstate, defel->location)));
1181 0 : cstate->convert_selectively = true;
1182 0 : if (defel->arg == NULL || IsA(defel->arg, List))
1183 0 : cstate->convert_select = castNode(List, defel->arg);
1184 : else
1185 0 : ereport(ERROR,
1186 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1187 : errmsg("argument to option \"%s\" must be a list of column names",
1188 : defel->defname),
1189 : parser_errposition(pstate, defel->location)));
1190 : }
1191 2 : else if (strcmp(defel->defname, "encoding") == 0)
1192 : {
1193 2 : if (cstate->file_encoding >= 0)
1194 0 : ereport(ERROR,
1195 : (errcode(ERRCODE_SYNTAX_ERROR),
1196 : errmsg("conflicting or redundant options"),
1197 : parser_errposition(pstate, defel->location)));
1198 2 : cstate->file_encoding = pg_char_to_encoding(defGetString(defel));
1199 2 : if (cstate->file_encoding < 0)
1200 0 : ereport(ERROR,
1201 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1202 : errmsg("argument to option \"%s\" must be a valid encoding name",
1203 : defel->defname),
1204 : parser_errposition(pstate, defel->location)));
1205 : }
1206 : else
1207 0 : ereport(ERROR,
1208 : (errcode(ERRCODE_SYNTAX_ERROR),
1209 : errmsg("option \"%s\" not recognized",
1210 : defel->defname),
1211 : parser_errposition(pstate, defel->location)));
1212 : }
1213 :
1214 : /*
1215 : * Check for incompatible options (must do these two before inserting
1216 : * defaults)
1217 : */
1218 203 : if (cstate->binary && cstate->delim)
1219 0 : ereport(ERROR,
1220 : (errcode(ERRCODE_SYNTAX_ERROR),
1221 : errmsg("cannot specify DELIMITER in BINARY mode")));
1222 :
1223 203 : if (cstate->binary && cstate->null_print)
1224 0 : ereport(ERROR,
1225 : (errcode(ERRCODE_SYNTAX_ERROR),
1226 : errmsg("cannot specify NULL in BINARY mode")));
1227 :
1228 : /* Set defaults for omitted options */
1229 203 : if (!cstate->delim)
1230 184 : cstate->delim = cstate->csv_mode ? "," : "\t";
1231 :
1232 203 : if (!cstate->null_print)
1233 195 : cstate->null_print = cstate->csv_mode ? "" : "\\N";
1234 203 : cstate->null_print_len = strlen(cstate->null_print);
1235 :
1236 203 : if (cstate->csv_mode)
1237 : {
1238 37 : if (!cstate->quote)
1239 32 : cstate->quote = "\"";
1240 37 : if (!cstate->escape)
1241 32 : cstate->escape = cstate->quote;
1242 : }
1243 :
1244 : /* Only single-byte delimiter strings are supported. */
1245 203 : if (strlen(cstate->delim) != 1)
1246 0 : ereport(ERROR,
1247 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1248 : errmsg("COPY delimiter must be a single one-byte character")));
1249 :
1250 : /* Disallow end-of-line characters */
1251 406 : if (strchr(cstate->delim, '\r') != NULL ||
1252 203 : strchr(cstate->delim, '\n') != NULL)
1253 0 : ereport(ERROR,
1254 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1255 : errmsg("COPY delimiter cannot be newline or carriage return")));
1256 :
1257 406 : if (strchr(cstate->null_print, '\r') != NULL ||
1258 203 : strchr(cstate->null_print, '\n') != NULL)
1259 0 : ereport(ERROR,
1260 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1261 : errmsg("COPY null representation cannot use newline or carriage return")));
1262 :
1263 : /*
1264 : * Disallow unsafe delimiter characters in non-CSV mode. We can't allow
1265 : * backslash because it would be ambiguous. We can't allow the other
1266 : * cases because data characters matching the delimiter must be
1267 : * backslashed, and certain backslash combinations are interpreted
1268 : * non-literally by COPY IN. Disallowing all lower case ASCII letters is
1269 : * more than strictly necessary, but seems best for consistency and
1270 : * future-proofing. Likewise we disallow all digits though only octal
1271 : * digits are actually dangerous.
1272 : */
1273 369 : if (!cstate->csv_mode &&
1274 166 : strchr("\\.abcdefghijklmnopqrstuvwxyz0123456789",
1275 166 : cstate->delim[0]) != NULL)
1276 0 : ereport(ERROR,
1277 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1278 : errmsg("COPY delimiter cannot be \"%s\"", cstate->delim)));
1279 :
1280 : /* Check header */
1281 203 : if (!cstate->csv_mode && cstate->header_line)
1282 0 : ereport(ERROR,
1283 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1284 : errmsg("COPY HEADER available only in CSV mode")));
1285 :
1286 : /* Check quote */
1287 203 : if (!cstate->csv_mode && cstate->quote != NULL)
1288 0 : ereport(ERROR,
1289 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1290 : errmsg("COPY quote available only in CSV mode")));
1291 :
1292 203 : if (cstate->csv_mode && strlen(cstate->quote) != 1)
1293 0 : ereport(ERROR,
1294 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1295 : errmsg("COPY quote must be a single one-byte character")));
1296 :
1297 203 : if (cstate->csv_mode && cstate->delim[0] == cstate->quote[0])
1298 0 : ereport(ERROR,
1299 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1300 : errmsg("COPY delimiter and quote must be different")));
1301 :
1302 : /* Check escape */
1303 203 : if (!cstate->csv_mode && cstate->escape != NULL)
1304 0 : ereport(ERROR,
1305 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1306 : errmsg("COPY escape available only in CSV mode")));
1307 :
1308 203 : if (cstate->csv_mode && strlen(cstate->escape) != 1)
1309 0 : ereport(ERROR,
1310 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1311 : errmsg("COPY escape must be a single one-byte character")));
1312 :
1313 : /* Check force_quote */
1314 203 : if (!cstate->csv_mode && (cstate->force_quote || cstate->force_quote_all))
1315 0 : ereport(ERROR,
1316 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1317 : errmsg("COPY force quote available only in CSV mode")));
1318 203 : if ((cstate->force_quote || cstate->force_quote_all) && is_from)
1319 0 : ereport(ERROR,
1320 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1321 : errmsg("COPY force quote only available using COPY TO")));
1322 :
1323 : /* Check force_notnull */
1324 203 : if (!cstate->csv_mode && cstate->force_notnull != NIL)
1325 0 : ereport(ERROR,
1326 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1327 : errmsg("COPY force not null available only in CSV mode")));
1328 203 : if (cstate->force_notnull != NIL && !is_from)
1329 0 : ereport(ERROR,
1330 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1331 : errmsg("COPY force not null only available using COPY FROM")));
1332 :
1333 : /* Check force_null */
1334 203 : if (!cstate->csv_mode && cstate->force_null != NIL)
1335 0 : ereport(ERROR,
1336 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1337 : errmsg("COPY force null available only in CSV mode")));
1338 :
1339 203 : if (cstate->force_null != NIL && !is_from)
1340 0 : ereport(ERROR,
1341 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1342 : errmsg("COPY force null only available using COPY FROM")));
1343 :
1344 : /* Don't allow the delimiter to appear in the null string. */
1345 203 : if (strchr(cstate->null_print, cstate->delim[0]) != NULL)
1346 0 : ereport(ERROR,
1347 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1348 : errmsg("COPY delimiter must not appear in the NULL specification")));
1349 :
1350 : /* Don't allow the CSV quote char to appear in the null string. */
1351 240 : if (cstate->csv_mode &&
1352 37 : strchr(cstate->null_print, cstate->quote[0]) != NULL)
1353 0 : ereport(ERROR,
1354 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1355 : errmsg("CSV quote character must not appear in the NULL specification")));
1356 203 : }
1357 :
1358 : /*
1359 : * Common setup routines used by BeginCopyFrom and BeginCopyTo.
1360 : *
1361 : * Iff <binary>, unload or reload in the binary format, as opposed to the
1362 : * more wasteful but more robust and portable text format.
1363 : *
1364 : * Iff <oids>, unload or reload the format that includes OID information.
1365 : * On input, we accept OIDs whether or not the table has an OID column,
1366 : * but silently drop them if it does not. On output, we report an error
1367 : * if the user asks for OIDs in a table that has none (not providing an
1368 : * OID column might seem friendlier, but could seriously confuse programs).
1369 : *
1370 : * If in the text format, delimit columns with delimiter <delim> and print
1371 : * NULL values as <null_print>.
1372 : */
1373 : static CopyState
1374 203 : BeginCopy(ParseState *pstate,
1375 : bool is_from,
1376 : Relation rel,
1377 : RawStmt *raw_query,
1378 : Oid queryRelId,
1379 : List *attnamelist,
1380 : List *options)
1381 : {
1382 : CopyState cstate;
1383 : TupleDesc tupDesc;
1384 : int num_phys_attrs;
1385 : MemoryContext oldcontext;
1386 :
1387 : /* Allocate workspace and zero all fields */
1388 203 : cstate = (CopyStateData *) palloc0(sizeof(CopyStateData));
1389 :
1390 : /*
1391 : * We allocate everything used by a cstate in a new memory context. This
1392 : * avoids memory leaks during repeated use of COPY in a query.
1393 : */
1394 203 : cstate->copycontext = AllocSetContextCreate(CurrentMemoryContext,
1395 : "COPY",
1396 : ALLOCSET_DEFAULT_SIZES);
1397 :
1398 203 : oldcontext = MemoryContextSwitchTo(cstate->copycontext);
1399 :
1400 : /* Extract options from the statement node tree */
1401 203 : ProcessCopyOptions(pstate, cstate, is_from, options);
1402 :
1403 : /* Process the source/target relation or query */
1404 203 : if (rel)
1405 : {
1406 151 : Assert(!raw_query);
1407 :
1408 151 : cstate->rel = rel;
1409 :
1410 151 : tupDesc = RelationGetDescr(cstate->rel);
1411 :
1412 : /* Don't allow COPY w/ OIDs to or from a table without them */
1413 151 : if (cstate->oids && !cstate->rel->rd_rel->relhasoids)
1414 2 : ereport(ERROR,
1415 : (errcode(ERRCODE_UNDEFINED_COLUMN),
1416 : errmsg("table \"%s\" does not have OIDs",
1417 : RelationGetRelationName(cstate->rel))));
1418 : }
1419 : else
1420 : {
1421 : List *rewritten;
1422 : Query *query;
1423 : PlannedStmt *plan;
1424 : DestReceiver *dest;
1425 :
1426 52 : Assert(!is_from);
1427 52 : cstate->rel = NULL;
1428 :
1429 : /* Don't allow COPY w/ OIDs from a query */
1430 52 : if (cstate->oids)
1431 0 : ereport(ERROR,
1432 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1433 : errmsg("COPY (query) WITH OIDS is not supported")));
1434 :
1435 : /*
1436 : * Run parse analysis and rewrite. Note this also acquires sufficient
1437 : * locks on the source table(s).
1438 : *
1439 : * Because the parser and planner tend to scribble on their input, we
1440 : * make a preliminary copy of the source querytree. This prevents
1441 : * problems in the case that the COPY is in a portal or plpgsql
1442 : * function and is executed repeatedly. (See also the same hack in
1443 : * DECLARE CURSOR and PREPARE.) XXX FIXME someday.
1444 : */
1445 52 : rewritten = pg_analyze_and_rewrite(copyObject(raw_query),
1446 : pstate->p_sourcetext, NULL, 0,
1447 : NULL);
1448 :
1449 : /* check that we got back something we can work with */
1450 50 : if (rewritten == NIL)
1451 : {
1452 3 : ereport(ERROR,
1453 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1454 : errmsg("DO INSTEAD NOTHING rules are not supported for COPY")));
1455 : }
1456 47 : else if (list_length(rewritten) > 1)
1457 : {
1458 : ListCell *lc;
1459 :
1460 : /* examine queries to determine which error message to issue */
1461 17 : foreach(lc, rewritten)
1462 : {
1463 14 : Query *q = lfirst_node(Query, lc);
1464 :
1465 14 : if (q->querySource == QSRC_QUAL_INSTEAD_RULE)
1466 3 : ereport(ERROR,
1467 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1468 : errmsg("conditional DO INSTEAD rules are not supported for COPY")));
1469 11 : if (q->querySource == QSRC_NON_INSTEAD_RULE)
1470 3 : ereport(ERROR,
1471 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1472 : errmsg("DO ALSO rules are not supported for the COPY")));
1473 : }
1474 :
1475 3 : ereport(ERROR,
1476 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1477 : errmsg("multi-statement DO INSTEAD rules are not supported for COPY")));
1478 : }
1479 :
1480 38 : query = linitial_node(Query, rewritten);
1481 :
1482 : /* The grammar allows SELECT INTO, but we don't support that */
1483 40 : if (query->utilityStmt != NULL &&
1484 2 : IsA(query->utilityStmt, CreateTableAsStmt))
1485 2 : ereport(ERROR,
1486 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1487 : errmsg("COPY (SELECT INTO) is not supported")));
1488 :
1489 36 : Assert(query->utilityStmt == NULL);
1490 :
1491 : /*
1492 : * Similarly the grammar doesn't enforce the presence of a RETURNING
1493 : * clause, but this is required here.
1494 : */
1495 48 : if (query->commandType != CMD_SELECT &&
1496 12 : query->returningList == NIL)
1497 : {
1498 3 : Assert(query->commandType == CMD_INSERT ||
1499 : query->commandType == CMD_UPDATE ||
1500 : query->commandType == CMD_DELETE);
1501 :
1502 3 : ereport(ERROR,
1503 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1504 : errmsg("COPY query must have a RETURNING clause")));
1505 : }
1506 :
1507 : /* plan the query */
1508 33 : plan = pg_plan_query(query, CURSOR_OPT_PARALLEL_OK, NULL);
1509 :
1510 : /*
1511 : * With row level security and a user using "COPY relation TO", we
1512 : * have to convert the "COPY relation TO" to a query-based COPY (eg:
1513 : * "COPY (SELECT * FROM relation) TO"), to allow the rewriter to add
1514 : * in any RLS clauses.
1515 : *
1516 : * When this happens, we are passed in the relid of the originally
1517 : * found relation (which we have locked). As the planner will look up
1518 : * the relation again, we double-check here to make sure it found the
1519 : * same one that we have locked.
1520 : */
1521 33 : if (queryRelId != InvalidOid)
1522 : {
1523 : /*
1524 : * Note that with RLS involved there may be multiple relations,
1525 : * and while the one we need is almost certainly first, we don't
1526 : * make any guarantees of that in the planner, so check the whole
1527 : * list and make sure we find the original relation.
1528 : */
1529 8 : if (!list_member_oid(plan->relationOids, queryRelId))
1530 0 : ereport(ERROR,
1531 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1532 : errmsg("relation referenced by COPY statement has changed")));
1533 : }
1534 :
1535 : /*
1536 : * Use a snapshot with an updated command ID to ensure this query sees
1537 : * results of any previously executed queries.
1538 : */
1539 33 : PushCopiedSnapshot(GetActiveSnapshot());
1540 33 : UpdateActiveSnapshotCommandId();
1541 :
1542 : /* Create dest receiver for COPY OUT */
1543 33 : dest = CreateDestReceiver(DestCopyOut);
1544 33 : ((DR_copy *) dest)->cstate = cstate;
1545 :
1546 : /* Create a QueryDesc requesting no output */
1547 33 : cstate->queryDesc = CreateQueryDesc(plan, pstate->p_sourcetext,
1548 : GetActiveSnapshot(),
1549 : InvalidSnapshot,
1550 : dest, NULL, NULL, 0);
1551 :
1552 : /*
1553 : * Call ExecutorStart to prepare the plan for execution.
1554 : *
1555 : * ExecutorStart computes a result tupdesc for us
1556 : */
1557 33 : ExecutorStart(cstate->queryDesc, 0);
1558 :
1559 32 : tupDesc = cstate->queryDesc->tupDesc;
1560 : }
1561 :
1562 : /* Generate or convert list of attributes to process */
1563 181 : cstate->attnumlist = CopyGetAttnums(tupDesc, cstate->rel, attnamelist);
1564 :
1565 181 : num_phys_attrs = tupDesc->natts;
1566 :
1567 : /* Convert FORCE_QUOTE name list to per-column flags, check validity */
1568 181 : cstate->force_quote_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
1569 181 : if (cstate->force_quote_all)
1570 : {
1571 : int i;
1572 :
1573 9 : for (i = 0; i < num_phys_attrs; i++)
1574 6 : cstate->force_quote_flags[i] = true;
1575 : }
1576 178 : else if (cstate->force_quote)
1577 : {
1578 : List *attnums;
1579 : ListCell *cur;
1580 :
1581 4 : attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->force_quote);
1582 :
1583 8 : foreach(cur, attnums)
1584 : {
1585 4 : int attnum = lfirst_int(cur);
1586 4 : Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
1587 :
1588 4 : if (!list_member_int(cstate->attnumlist, attnum))
1589 0 : ereport(ERROR,
1590 : (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1591 : errmsg("FORCE_QUOTE column \"%s\" not referenced by COPY",
1592 : NameStr(attr->attname))));
1593 4 : cstate->force_quote_flags[attnum - 1] = true;
1594 : }
1595 : }
1596 :
1597 : /* Convert FORCE_NOT_NULL name list to per-column flags, check validity */
1598 181 : cstate->force_notnull_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
1599 181 : if (cstate->force_notnull)
1600 : {
1601 : List *attnums;
1602 : ListCell *cur;
1603 :
1604 4 : attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->force_notnull);
1605 :
1606 8 : foreach(cur, attnums)
1607 : {
1608 5 : int attnum = lfirst_int(cur);
1609 5 : Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
1610 :
1611 5 : if (!list_member_int(cstate->attnumlist, attnum))
1612 1 : ereport(ERROR,
1613 : (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1614 : errmsg("FORCE_NOT_NULL column \"%s\" not referenced by COPY",
1615 : NameStr(attr->attname))));
1616 4 : cstate->force_notnull_flags[attnum - 1] = true;
1617 : }
1618 : }
1619 :
1620 : /* Convert FORCE_NULL name list to per-column flags, check validity */
1621 180 : cstate->force_null_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
1622 180 : if (cstate->force_null)
1623 : {
1624 : List *attnums;
1625 : ListCell *cur;
1626 :
1627 4 : attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->force_null);
1628 :
1629 8 : foreach(cur, attnums)
1630 : {
1631 5 : int attnum = lfirst_int(cur);
1632 5 : Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
1633 :
1634 5 : if (!list_member_int(cstate->attnumlist, attnum))
1635 1 : ereport(ERROR,
1636 : (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1637 : errmsg("FORCE_NULL column \"%s\" not referenced by COPY",
1638 : NameStr(attr->attname))));
1639 4 : cstate->force_null_flags[attnum - 1] = true;
1640 : }
1641 : }
1642 :
1643 : /* Convert convert_selectively name list to per-column flags */
1644 179 : if (cstate->convert_selectively)
1645 : {
1646 : List *attnums;
1647 : ListCell *cur;
1648 :
1649 0 : cstate->convert_select_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
1650 :
1651 0 : attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->convert_select);
1652 :
1653 0 : foreach(cur, attnums)
1654 : {
1655 0 : int attnum = lfirst_int(cur);
1656 0 : Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
1657 :
1658 0 : if (!list_member_int(cstate->attnumlist, attnum))
1659 0 : ereport(ERROR,
1660 : (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1661 : errmsg_internal("selected column \"%s\" not referenced by COPY",
1662 : NameStr(attr->attname))));
1663 0 : cstate->convert_select_flags[attnum - 1] = true;
1664 : }
1665 : }
1666 :
1667 : /* Use client encoding when ENCODING option is not specified. */
1668 179 : if (cstate->file_encoding < 0)
1669 177 : cstate->file_encoding = pg_get_client_encoding();
1670 :
1671 : /*
1672 : * Set up encoding conversion info. Even if the file and server encodings
1673 : * are the same, we must apply pg_any_to_server() to validate data in
1674 : * multibyte encodings.
1675 : */
1676 179 : cstate->need_transcoding =
1677 356 : (cstate->file_encoding != GetDatabaseEncoding() ||
1678 177 : pg_database_encoding_max_length() > 1);
1679 : /* See Multibyte encoding comment above */
1680 179 : cstate->encoding_embeds_ascii = PG_ENCODING_IS_CLIENT_ONLY(cstate->file_encoding);
1681 :
1682 179 : cstate->copy_dest = COPY_FILE; /* default */
1683 :
1684 179 : MemoryContextSwitchTo(oldcontext);
1685 :
1686 179 : return cstate;
1687 : }
1688 :
1689 : /*
1690 : * Closes the pipe to an external program, checking the pclose() return code.
1691 : */
1692 : static void
1693 0 : ClosePipeToProgram(CopyState cstate)
1694 : {
1695 : int pclose_rc;
1696 :
1697 0 : Assert(cstate->is_program);
1698 :
1699 0 : pclose_rc = ClosePipeStream(cstate->copy_file);
1700 0 : if (pclose_rc == -1)
1701 0 : ereport(ERROR,
1702 : (errcode_for_file_access(),
1703 : errmsg("could not close pipe to external command: %m")));
1704 0 : else if (pclose_rc != 0)
1705 0 : ereport(ERROR,
1706 : (errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
1707 : errmsg("program \"%s\" failed",
1708 : cstate->filename),
1709 : errdetail_internal("%s", wait_result_to_str(pclose_rc))));
1710 0 : }
1711 :
1712 : /*
1713 : * Release resources allocated in a cstate for COPY TO/FROM.
1714 : */
1715 : static void
1716 162 : EndCopy(CopyState cstate)
1717 : {
1718 162 : if (cstate->is_program)
1719 : {
1720 0 : ClosePipeToProgram(cstate);
1721 : }
1722 : else
1723 : {
1724 162 : if (cstate->filename != NULL && FreeFile(cstate->copy_file))
1725 0 : ereport(ERROR,
1726 : (errcode_for_file_access(),
1727 : errmsg("could not close file \"%s\": %m",
1728 : cstate->filename)));
1729 : }
1730 :
1731 162 : MemoryContextDelete(cstate->copycontext);
1732 162 : pfree(cstate);
1733 162 : }
1734 :
1735 : /*
1736 : * Setup CopyState to read tuples from a table or a query for COPY TO.
1737 : */
1738 : static CopyState
1739 92 : BeginCopyTo(ParseState *pstate,
1740 : Relation rel,
1741 : RawStmt *query,
1742 : Oid queryRelId,
1743 : const char *filename,
1744 : bool is_program,
1745 : List *attnamelist,
1746 : List *options)
1747 : {
1748 : CopyState cstate;
1749 92 : bool pipe = (filename == NULL);
1750 : MemoryContext oldcontext;
1751 :
1752 92 : if (rel != NULL && rel->rd_rel->relkind != RELKIND_RELATION)
1753 : {
1754 2 : if (rel->rd_rel->relkind == RELKIND_VIEW)
1755 2 : ereport(ERROR,
1756 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1757 : errmsg("cannot copy from view \"%s\"",
1758 : RelationGetRelationName(rel)),
1759 : errhint("Try the COPY (SELECT ...) TO variant.")));
1760 0 : else if (rel->rd_rel->relkind == RELKIND_MATVIEW)
1761 0 : ereport(ERROR,
1762 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1763 : errmsg("cannot copy from materialized view \"%s\"",
1764 : RelationGetRelationName(rel)),
1765 : errhint("Try the COPY (SELECT ...) TO variant.")));
1766 0 : else if (rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
1767 0 : ereport(ERROR,
1768 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1769 : errmsg("cannot copy from foreign table \"%s\"",
1770 : RelationGetRelationName(rel)),
1771 : errhint("Try the COPY (SELECT ...) TO variant.")));
1772 0 : else if (rel->rd_rel->relkind == RELKIND_SEQUENCE)
1773 0 : ereport(ERROR,
1774 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1775 : errmsg("cannot copy from sequence \"%s\"",
1776 : RelationGetRelationName(rel))));
1777 0 : else if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
1778 0 : ereport(ERROR,
1779 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1780 : errmsg("cannot copy from partitioned table \"%s\"",
1781 : RelationGetRelationName(rel)),
1782 : errhint("Try the COPY (SELECT ...) TO variant.")));
1783 : else
1784 0 : ereport(ERROR,
1785 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1786 : errmsg("cannot copy from non-table relation \"%s\"",
1787 : RelationGetRelationName(rel))));
1788 : }
1789 :
1790 90 : cstate = BeginCopy(pstate, false, rel, query, queryRelId, attnamelist,
1791 : options);
1792 69 : oldcontext = MemoryContextSwitchTo(cstate->copycontext);
1793 :
1794 69 : if (pipe)
1795 : {
1796 64 : Assert(!is_program); /* the grammar does not allow this */
1797 64 : if (whereToSendOutput != DestRemote)
1798 0 : cstate->copy_file = stdout;
1799 : }
1800 : else
1801 : {
1802 5 : cstate->filename = pstrdup(filename);
1803 5 : cstate->is_program = is_program;
1804 :
1805 5 : if (is_program)
1806 : {
1807 0 : cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_W);
1808 0 : if (cstate->copy_file == NULL)
1809 0 : ereport(ERROR,
1810 : (errcode_for_file_access(),
1811 : errmsg("could not execute command \"%s\": %m",
1812 : cstate->filename)));
1813 : }
1814 : else
1815 : {
1816 : mode_t oumask; /* Pre-existing umask value */
1817 : struct stat st;
1818 :
1819 : /*
1820 : * Prevent write to relative path ... too easy to shoot oneself in
1821 : * the foot by overwriting a database file ...
1822 : */
1823 5 : if (!is_absolute_path(filename))
1824 0 : ereport(ERROR,
1825 : (errcode(ERRCODE_INVALID_NAME),
1826 : errmsg("relative path not allowed for COPY to file")));
1827 :
1828 5 : oumask = umask(S_IWGRP | S_IWOTH);
1829 5 : cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_W);
1830 5 : umask(oumask);
1831 5 : if (cstate->copy_file == NULL)
1832 : {
1833 : /* copy errno because ereport subfunctions might change it */
1834 0 : int save_errno = errno;
1835 :
1836 0 : ereport(ERROR,
1837 : (errcode_for_file_access(),
1838 : errmsg("could not open file \"%s\" for writing: %m",
1839 : cstate->filename),
1840 : (save_errno == ENOENT || save_errno == EACCES) ?
1841 : errhint("COPY TO instructs the PostgreSQL server process to write a file. "
1842 : "You may want a client-side facility such as psql's \\copy.") : 0));
1843 : }
1844 :
1845 5 : if (fstat(fileno(cstate->copy_file), &st))
1846 0 : ereport(ERROR,
1847 : (errcode_for_file_access(),
1848 : errmsg("could not stat file \"%s\": %m",
1849 : cstate->filename)));
1850 :
1851 5 : if (S_ISDIR(st.st_mode))
1852 0 : ereport(ERROR,
1853 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1854 : errmsg("\"%s\" is a directory", cstate->filename)));
1855 : }
1856 : }
1857 :
1858 69 : MemoryContextSwitchTo(oldcontext);
1859 :
1860 69 : return cstate;
1861 : }
1862 :
1863 : /*
1864 : * This intermediate routine exists mainly to localize the effects of setjmp
1865 : * so we don't need to plaster a lot of variables with "volatile".
1866 : */
1867 : static uint64
1868 69 : DoCopyTo(CopyState cstate)
1869 : {
1870 69 : bool pipe = (cstate->filename == NULL);
1871 69 : bool fe_copy = (pipe && whereToSendOutput == DestRemote);
1872 : uint64 processed;
1873 :
1874 69 : PG_TRY();
1875 : {
1876 69 : if (fe_copy)
1877 64 : SendCopyBegin(cstate);
1878 :
1879 69 : processed = CopyTo(cstate);
1880 :
1881 69 : if (fe_copy)
1882 64 : SendCopyEnd(cstate);
1883 : }
1884 0 : PG_CATCH();
1885 : {
1886 : /*
1887 : * Make sure we turn off old-style COPY OUT mode upon error. It is
1888 : * okay to do this in all cases, since it does nothing if the mode is
1889 : * not on.
1890 : */
1891 0 : pq_endcopyout(true);
1892 0 : PG_RE_THROW();
1893 : }
1894 69 : PG_END_TRY();
1895 :
1896 69 : return processed;
1897 : }
1898 :
1899 : /*
1900 : * Clean up storage and release resources for COPY TO.
1901 : */
1902 : static void
1903 69 : EndCopyTo(CopyState cstate)
1904 : {
1905 69 : if (cstate->queryDesc != NULL)
1906 : {
1907 : /* Close down the query and free resources. */
1908 32 : ExecutorFinish(cstate->queryDesc);
1909 32 : ExecutorEnd(cstate->queryDesc);
1910 32 : FreeQueryDesc(cstate->queryDesc);
1911 32 : PopActiveSnapshot();
1912 : }
1913 :
1914 : /* Clean up storage */
1915 69 : EndCopy(cstate);
1916 69 : }
1917 :
1918 : /*
1919 : * Copy from relation or query TO file.
1920 : */
1921 : static uint64
1922 69 : CopyTo(CopyState cstate)
1923 : {
1924 : TupleDesc tupDesc;
1925 : int num_phys_attrs;
1926 : ListCell *cur;
1927 : uint64 processed;
1928 :
1929 69 : if (cstate->rel)
1930 37 : tupDesc = RelationGetDescr(cstate->rel);
1931 : else
1932 32 : tupDesc = cstate->queryDesc->tupDesc;
1933 69 : num_phys_attrs = tupDesc->natts;
1934 69 : cstate->null_print_client = cstate->null_print; /* default */
1935 :
1936 : /* We use fe_msgbuf as a per-row buffer regardless of copy_dest */
1937 69 : cstate->fe_msgbuf = makeStringInfo();
1938 :
1939 : /* Get info about the columns we need to process. */
1940 69 : cstate->out_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
1941 234 : foreach(cur, cstate->attnumlist)
1942 : {
1943 165 : int attnum = lfirst_int(cur);
1944 : Oid out_func_oid;
1945 : bool isvarlena;
1946 165 : Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
1947 :
1948 165 : if (cstate->binary)
1949 7 : getTypeBinaryOutputInfo(attr->atttypid,
1950 : &out_func_oid,
1951 : &isvarlena);
1952 : else
1953 158 : getTypeOutputInfo(attr->atttypid,
1954 : &out_func_oid,
1955 : &isvarlena);
1956 165 : fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]);
1957 : }
1958 :
1959 : /*
1960 : * Create a temporary memory context that we can reset once per row to
1961 : * recover palloc'd memory. This avoids any problems with leaks inside
1962 : * datatype output routines, and should be faster than retail pfree's
1963 : * anyway. (We don't need a whole econtext as CopyFrom does.)
1964 : */
1965 69 : cstate->rowcontext = AllocSetContextCreate(CurrentMemoryContext,
1966 : "COPY TO",
1967 : ALLOCSET_DEFAULT_SIZES);
1968 :
1969 69 : if (cstate->binary)
1970 : {
1971 : /* Generate header for a binary copy */
1972 : int32 tmp;
1973 :
1974 : /* Signature */
1975 1 : CopySendData(cstate, BinarySignature, 11);
1976 : /* Flags field */
1977 1 : tmp = 0;
1978 1 : if (cstate->oids)
1979 0 : tmp |= (1 << 16);
1980 1 : CopySendInt32(cstate, tmp);
1981 : /* No header extension */
1982 1 : tmp = 0;
1983 1 : CopySendInt32(cstate, tmp);
1984 : }
1985 : else
1986 : {
1987 : /*
1988 : * For non-binary copy, we need to convert null_print to file
1989 : * encoding, because it will be sent directly with CopySendString.
1990 : */
1991 68 : if (cstate->need_transcoding)
1992 68 : cstate->null_print_client = pg_server_to_any(cstate->null_print,
1993 : cstate->null_print_len,
1994 : cstate->file_encoding);
1995 :
1996 : /* if a header has been requested send the line */
1997 68 : if (cstate->header_line)
1998 : {
1999 2 : bool hdr_delim = false;
2000 :
2001 6 : foreach(cur, cstate->attnumlist)
2002 : {
2003 4 : int attnum = lfirst_int(cur);
2004 : char *colname;
2005 :
2006 4 : if (hdr_delim)
2007 2 : CopySendChar(cstate, cstate->delim[0]);
2008 4 : hdr_delim = true;
2009 :
2010 4 : colname = NameStr(TupleDescAttr(tupDesc, attnum - 1)->attname);
2011 :
2012 4 : CopyAttributeOutCSV(cstate, colname, false,
2013 4 : list_length(cstate->attnumlist) == 1);
2014 : }
2015 :
2016 2 : CopySendEndOfRow(cstate);
2017 : }
2018 : }
2019 :
2020 69 : if (cstate->rel)
2021 : {
2022 : Datum *values;
2023 : bool *nulls;
2024 : HeapScanDesc scandesc;
2025 : HeapTuple tuple;
2026 :
2027 37 : values = (Datum *) palloc(num_phys_attrs * sizeof(Datum));
2028 37 : nulls = (bool *) palloc(num_phys_attrs * sizeof(bool));
2029 :
2030 37 : scandesc = heap_beginscan(cstate->rel, GetActiveSnapshot(), 0, NULL);
2031 :
2032 37 : processed = 0;
2033 2240 : while ((tuple = heap_getnext(scandesc, ForwardScanDirection)) != NULL)
2034 : {
2035 2166 : CHECK_FOR_INTERRUPTS();
2036 :
2037 : /* Deconstruct the tuple ... faster than repeated heap_getattr */
2038 2166 : heap_deform_tuple(tuple, tupDesc, values, nulls);
2039 :
2040 : /* Format and send the data */
2041 2166 : CopyOneRowTo(cstate, HeapTupleGetOid(tuple), values, nulls);
2042 2166 : processed++;
2043 : }
2044 :
2045 37 : heap_endscan(scandesc);
2046 :
2047 37 : pfree(values);
2048 37 : pfree(nulls);
2049 : }
2050 : else
2051 : {
2052 : /* run the plan --- the dest receiver will send tuples */
2053 32 : ExecutorRun(cstate->queryDesc, ForwardScanDirection, 0L, true);
2054 32 : processed = ((DR_copy *) cstate->queryDesc->dest)->processed;
2055 : }
2056 :
2057 69 : if (cstate->binary)
2058 : {
2059 : /* Generate trailer for a binary copy */
2060 1 : CopySendInt16(cstate, -1);
2061 : /* Need to flush out the trailer */
2062 1 : CopySendEndOfRow(cstate);
2063 : }
2064 :
2065 69 : MemoryContextDelete(cstate->rowcontext);
2066 :
2067 69 : return processed;
2068 : }
2069 :
2070 : /*
2071 : * Emit one row during CopyTo().
2072 : */
2073 : static void
2074 2263 : CopyOneRowTo(CopyState cstate, Oid tupleOid, Datum *values, bool *nulls)
2075 : {
2076 2263 : bool need_delim = false;
2077 2263 : FmgrInfo *out_functions = cstate->out_functions;
2078 : MemoryContext oldcontext;
2079 : ListCell *cur;
2080 : char *string;
2081 :
2082 2263 : MemoryContextReset(cstate->rowcontext);
2083 2263 : oldcontext = MemoryContextSwitchTo(cstate->rowcontext);
2084 :
2085 2263 : if (cstate->binary)
2086 : {
2087 : /* Binary per-tuple header */
2088 3 : CopySendInt16(cstate, list_length(cstate->attnumlist));
2089 : /* Send OID if wanted --- note attnumlist doesn't include it */
2090 3 : if (cstate->oids)
2091 : {
2092 : /* Hack --- assume Oid is same size as int32 */
2093 0 : CopySendInt32(cstate, sizeof(int32));
2094 0 : CopySendInt32(cstate, tupleOid);
2095 : }
2096 : }
2097 : else
2098 : {
2099 : /* Text format has no per-tuple header, but send OID if wanted */
2100 : /* Assume digits don't need any quoting or encoding conversion */
2101 2260 : if (cstate->oids)
2102 : {
2103 0 : string = DatumGetCString(DirectFunctionCall1(oidout,
2104 : ObjectIdGetDatum(tupleOid)));
2105 0 : CopySendString(cstate, string);
2106 0 : need_delim = true;
2107 : }
2108 : }
2109 :
2110 34870 : foreach(cur, cstate->attnumlist)
2111 : {
2112 32607 : int attnum = lfirst_int(cur);
2113 32607 : Datum value = values[attnum - 1];
2114 32607 : bool isnull = nulls[attnum - 1];
2115 :
2116 32607 : if (!cstate->binary)
2117 : {
2118 32586 : if (need_delim)
2119 30326 : CopySendChar(cstate, cstate->delim[0]);
2120 32586 : need_delim = true;
2121 : }
2122 :
2123 32607 : if (isnull)
2124 : {
2125 34 : if (!cstate->binary)
2126 29 : CopySendString(cstate, cstate->null_print_client);
2127 : else
2128 5 : CopySendInt32(cstate, -1);
2129 : }
2130 : else
2131 : {
2132 32573 : if (!cstate->binary)
2133 : {
2134 32557 : string = OutputFunctionCall(&out_functions[attnum - 1],
2135 : value);
2136 32557 : if (cstate->csv_mode)
2137 190 : CopyAttributeOutCSV(cstate, string,
2138 95 : cstate->force_quote_flags[attnum - 1],
2139 95 : list_length(cstate->attnumlist) == 1);
2140 : else
2141 32462 : CopyAttributeOutText(cstate, string);
2142 : }
2143 : else
2144 : {
2145 : bytea *outputbytes;
2146 :
2147 16 : outputbytes = SendFunctionCall(&out_functions[attnum - 1],
2148 : value);
2149 16 : CopySendInt32(cstate, VARSIZE(outputbytes) - VARHDRSZ);
2150 16 : CopySendData(cstate, VARDATA(outputbytes),
2151 16 : VARSIZE(outputbytes) - VARHDRSZ);
2152 : }
2153 : }
2154 : }
2155 :
2156 2263 : CopySendEndOfRow(cstate);
2157 :
2158 2263 : MemoryContextSwitchTo(oldcontext);
2159 2263 : }
2160 :
2161 :
2162 : /*
2163 : * error context callback for COPY FROM
2164 : *
2165 : * The argument for the error context must be CopyState.
2166 : */
2167 : void
2168 18 : CopyFromErrorCallback(void *arg)
2169 : {
2170 18 : CopyState cstate = (CopyState) arg;
2171 :
2172 18 : if (cstate->binary)
2173 : {
2174 : /* can't usefully display the data */
2175 0 : if (cstate->cur_attname)
2176 0 : errcontext("COPY %s, line %d, column %s",
2177 : cstate->cur_relname, cstate->cur_lineno,
2178 : cstate->cur_attname);
2179 : else
2180 0 : errcontext("COPY %s, line %d",
2181 : cstate->cur_relname, cstate->cur_lineno);
2182 : }
2183 : else
2184 : {
2185 18 : if (cstate->cur_attname && cstate->cur_attval)
2186 3 : {
2187 : /* error is relevant to a particular column */
2188 : char *attval;
2189 :
2190 3 : attval = limit_printout_length(cstate->cur_attval);
2191 3 : errcontext("COPY %s, line %d, column %s: \"%s\"",
2192 : cstate->cur_relname, cstate->cur_lineno,
2193 : cstate->cur_attname, attval);
2194 3 : pfree(attval);
2195 : }
2196 15 : else if (cstate->cur_attname)
2197 : {
2198 : /* error is relevant to a particular column, value is NULL */
2199 1 : errcontext("COPY %s, line %d, column %s: null input",
2200 : cstate->cur_relname, cstate->cur_lineno,
2201 : cstate->cur_attname);
2202 : }
2203 : else
2204 : {
2205 : /*
2206 : * Error is relevant to a particular line.
2207 : *
2208 : * If line_buf still contains the correct line, and it's already
2209 : * transcoded, print it. If it's still in a foreign encoding, it's
2210 : * quite likely that the error is precisely a failure to do
2211 : * encoding conversion (ie, bad data). We dare not try to convert
2212 : * it, and at present there's no way to regurgitate it without
2213 : * conversion. So we have to punt and just report the line number.
2214 : */
2215 28 : if (cstate->line_buf_valid &&
2216 14 : (cstate->line_buf_converted || !cstate->need_transcoding))
2217 14 : {
2218 : char *lineval;
2219 :
2220 14 : lineval = limit_printout_length(cstate->line_buf.data);
2221 14 : errcontext("COPY %s, line %d: \"%s\"",
2222 : cstate->cur_relname, cstate->cur_lineno, lineval);
2223 14 : pfree(lineval);
2224 : }
2225 : else
2226 : {
2227 0 : errcontext("COPY %s, line %d",
2228 : cstate->cur_relname, cstate->cur_lineno);
2229 : }
2230 : }
2231 : }
2232 18 : }
2233 :
/*
 * Bound the amount of COPY data quoted in a message.
 *
 * Using the sprintf "precision" limit would look simpler, but some glibc
 * versions make vsnprintf fail outright (return -1) when asked to truncate a
 * string holding byte sequences that are invalid in the current encoding.
 * So the truncation is done here instead.  The result is always a freshly
 * palloc'd string: a plain copy when the input is short enough, otherwise an
 * encoding-aware prefix followed by "...".
 */
static char *
limit_printout_length(const char *str)
{
#define MAX_COPY_DATA_DISPLAY 100

	int			slen = strlen(str);
	int			cliplen;
	char	   *clipped;

	if (slen > MAX_COPY_DATA_DISPLAY)
	{
		/* Cut on a character boundary appropriate for the encoding */
		cliplen = pg_mbcliplen(str, slen, MAX_COPY_DATA_DISPLAY);

		/* Room for the prefix, the three dots and the terminator */
		clipped = (char *) palloc(cliplen + 4);
		memcpy(clipped, str, cliplen);
		memcpy(clipped + cliplen, "...", 4);

		return clipped;
	}

	/* Short enough already: hand back an unmodified copy */
	return pstrdup(str);
}
2268 :
2269 : /*
2270 : * Copy FROM file to relation.
2271 : */
2272 : uint64
2273 110 : CopyFrom(CopyState cstate)
2274 : {
2275 : HeapTuple tuple;
2276 : TupleDesc tupDesc;
2277 : Datum *values;
2278 : bool *nulls;
2279 : ResultRelInfo *resultRelInfo;
2280 110 : ResultRelInfo *saved_resultRelInfo = NULL;
2281 110 : EState *estate = CreateExecutorState(); /* for ExecConstraints() */
2282 : ExprContext *econtext;
2283 : TupleTableSlot *myslot;
2284 110 : MemoryContext oldcontext = CurrentMemoryContext;
2285 :
2286 : ErrorContextCallback errcallback;
2287 110 : CommandId mycid = GetCurrentCommandId(true);
2288 110 : int hi_options = 0; /* start with default heap_insert options */
2289 : BulkInsertState bistate;
2290 110 : uint64 processed = 0;
2291 : bool useHeapMultiInsert;
2292 110 : int nBufferedTuples = 0;
2293 110 : int prev_leaf_part_index = -1;
2294 :
2295 : #define MAX_BUFFERED_TUPLES 1000
2296 110 : HeapTuple *bufferedTuples = NULL; /* initialize to silence warning */
2297 110 : Size bufferedTuplesSize = 0;
2298 110 : int firstBufferedLineNo = 0;
2299 :
2300 110 : Assert(cstate->rel);
2301 :
2302 : /*
2303 : * The target must be a plain relation or have an INSTEAD OF INSERT row
2304 : * trigger. (Currently, such triggers are only allowed on views, so we
2305 : * only hint about them in the view case.)
2306 : */
2307 116 : if (cstate->rel->rd_rel->relkind != RELKIND_RELATION &&
2308 8 : cstate->rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE &&
2309 3 : !(cstate->rel->trigdesc &&
2310 1 : cstate->rel->trigdesc->trig_insert_instead_row))
2311 : {
2312 1 : if (cstate->rel->rd_rel->relkind == RELKIND_VIEW)
2313 1 : ereport(ERROR,
2314 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
2315 : errmsg("cannot copy to view \"%s\"",
2316 : RelationGetRelationName(cstate->rel)),
2317 : errhint("To enable copying to a view, provide an INSTEAD OF INSERT trigger.")));
2318 0 : else if (cstate->rel->rd_rel->relkind == RELKIND_MATVIEW)
2319 0 : ereport(ERROR,
2320 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
2321 : errmsg("cannot copy to materialized view \"%s\"",
2322 : RelationGetRelationName(cstate->rel))));
2323 0 : else if (cstate->rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
2324 0 : ereport(ERROR,
2325 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
2326 : errmsg("cannot copy to foreign table \"%s\"",
2327 : RelationGetRelationName(cstate->rel))));
2328 0 : else if (cstate->rel->rd_rel->relkind == RELKIND_SEQUENCE)
2329 0 : ereport(ERROR,
2330 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
2331 : errmsg("cannot copy to sequence \"%s\"",
2332 : RelationGetRelationName(cstate->rel))));
2333 : else
2334 0 : ereport(ERROR,
2335 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
2336 : errmsg("cannot copy to non-table relation \"%s\"",
2337 : RelationGetRelationName(cstate->rel))));
2338 : }
2339 :
2340 109 : tupDesc = RelationGetDescr(cstate->rel);
2341 :
2342 : /*----------
2343 : * Check to see if we can avoid writing WAL
2344 : *
2345 : * If archive logging/streaming is not enabled *and* either
2346 : * - table was created in same transaction as this COPY
2347 : * - data is being written to relfilenode created in this transaction
2348 : * then we can skip writing WAL. It's safe because if the transaction
2349 : * doesn't commit, we'll discard the table (or the new relfilenode file).
2350 : * If it does commit, we'll have done the heap_sync at the bottom of this
2351 : * routine first.
2352 : *
2353 : * As mentioned in comments in utils/rel.h, the in-same-transaction test
2354 : * is not always set correctly, since in rare cases rd_newRelfilenodeSubid
2355 : * can be cleared before the end of the transaction. The exact case is
2356 : * when a relation sets a new relfilenode twice in same transaction, yet
2357 : * the second one fails in an aborted subtransaction, e.g.
2358 : *
2359 : * BEGIN;
2360 : * TRUNCATE t;
2361 : * SAVEPOINT save;
2362 : * TRUNCATE t;
2363 : * ROLLBACK TO save;
2364 : * COPY ...
2365 : *
2366 : * Also, if the target file is new-in-transaction, we assume that checking
2367 : * FSM for free space is a waste of time, even if we must use WAL because
2368 : * of archiving. This could possibly be wrong, but it's unlikely.
2369 : *
2370 : * The comments for heap_insert and RelationGetBufferForTuple specify that
2371 : * skipping WAL logging is only safe if we ensure that our tuples do not
2372 : * go into pages containing tuples from any other transactions --- but this
2373 : * must be the case if we have a new table or new relfilenode, so we need
2374 : * no additional work to enforce that.
2375 : *----------
2376 : */
2377 : /* createSubid is creation check, newRelfilenodeSubid is truncation check */
2378 217 : if (cstate->rel->rd_createSubid != InvalidSubTransactionId ||
2379 108 : cstate->rel->rd_newRelfilenodeSubid != InvalidSubTransactionId)
2380 : {
2381 8 : hi_options |= HEAP_INSERT_SKIP_FSM;
2382 8 : if (!XLogIsNeeded())
2383 0 : hi_options |= HEAP_INSERT_SKIP_WAL;
2384 : }
2385 :
2386 : /*
2387 : * Optimize if new relfilenode was created in this subxact or one of its
2388 : * committed children and we won't see those rows later as part of an
2389 : * earlier scan or command. This ensures that if this subtransaction
2390 : * aborts then the frozen rows won't be visible after xact cleanup. Note
2391 : * that the stronger test of exactly which subtransaction created it is
2392 : * crucial for correctness of this optimization.
2393 : */
2394 109 : if (cstate->freeze)
2395 : {
2396 7 : if (!ThereAreNoPriorRegisteredSnapshots() || !ThereAreNoReadyPortals())
2397 0 : ereport(ERROR,
2398 : (errcode(ERRCODE_INVALID_TRANSACTION_STATE),
2399 : errmsg("cannot perform FREEZE because of prior transaction activity")));
2400 :
2401 14 : if (cstate->rel->rd_createSubid != GetCurrentSubTransactionId() &&
2402 7 : cstate->rel->rd_newRelfilenodeSubid != GetCurrentSubTransactionId())
2403 3 : ereport(ERROR,
2404 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2405 : errmsg("cannot perform FREEZE because the table was not created or truncated in the current subtransaction")));
2406 :
2407 4 : hi_options |= HEAP_INSERT_FROZEN;
2408 : }
2409 :
2410 : /*
2411 : * We need a ResultRelInfo so we can use the regular executor's
2412 : * index-entry-making machinery. (There used to be a huge amount of code
2413 : * here that basically duplicated execUtils.c ...)
2414 : */
2415 106 : resultRelInfo = makeNode(ResultRelInfo);
2416 106 : InitResultRelInfo(resultRelInfo,
2417 : cstate->rel,
2418 : 1, /* dummy rangetable index */
2419 : NULL,
2420 : 0);
2421 :
2422 106 : ExecOpenIndices(resultRelInfo, false);
2423 :
2424 106 : estate->es_result_relations = resultRelInfo;
2425 106 : estate->es_num_result_relations = 1;
2426 106 : estate->es_result_relation_info = resultRelInfo;
2427 106 : estate->es_range_table = cstate->range_table;
2428 :
2429 : /* Set up a tuple slot too */
2430 106 : myslot = ExecInitExtraTupleSlot(estate);
2431 106 : ExecSetSlotDescriptor(myslot, tupDesc);
2432 : /* Triggers might need a slot as well */
2433 106 : estate->es_trig_tuple_slot = ExecInitExtraTupleSlot(estate);
2434 :
2435 : /*
2436 : * If there are any triggers with transition tables on the named relation,
2437 : * we need to be prepared to capture transition tuples.
2438 : */
2439 106 : cstate->transition_capture =
2440 106 : MakeTransitionCaptureState(cstate->rel->trigdesc);
2441 :
2442 : /*
2443 : * If the named relation is a partitioned table, initialize state for
2444 : * CopyFrom tuple routing.
2445 : */
2446 106 : if (cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
2447 : {
2448 : PartitionDispatch *partition_dispatch_info;
2449 : ResultRelInfo *partitions;
2450 : TupleConversionMap **partition_tupconv_maps;
2451 : TupleTableSlot *partition_tuple_slot;
2452 : int num_parted,
2453 : num_partitions;
2454 :
2455 4 : ExecSetupPartitionTupleRouting(cstate->rel,
2456 : 1,
2457 : estate,
2458 : &partition_dispatch_info,
2459 : &partitions,
2460 : &partition_tupconv_maps,
2461 : &partition_tuple_slot,
2462 : &num_parted, &num_partitions);
2463 4 : cstate->partition_dispatch_info = partition_dispatch_info;
2464 4 : cstate->num_dispatch = num_parted;
2465 4 : cstate->partitions = partitions;
2466 4 : cstate->num_partitions = num_partitions;
2467 4 : cstate->partition_tupconv_maps = partition_tupconv_maps;
2468 4 : cstate->partition_tuple_slot = partition_tuple_slot;
2469 :
2470 : /*
2471 : * If we are capturing transition tuples, they may need to be
2472 : * converted from partition format back to partitioned table format
2473 : * (this is only ever necessary if a BEFORE trigger modifies the
2474 : * tuple).
2475 : */
2476 4 : if (cstate->transition_capture != NULL)
2477 : {
2478 : int i;
2479 :
2480 3 : cstate->transition_tupconv_maps = (TupleConversionMap **)
2481 3 : palloc0(sizeof(TupleConversionMap *) * cstate->num_partitions);
2482 12 : for (i = 0; i < cstate->num_partitions; ++i)
2483 : {
2484 18 : cstate->transition_tupconv_maps[i] =
2485 9 : convert_tuples_by_name(RelationGetDescr(cstate->partitions[i].ri_RelationDesc),
2486 9 : RelationGetDescr(cstate->rel),
2487 : gettext_noop("could not convert row type"));
2488 : }
2489 : }
2490 : }
2491 :
2492 : /*
2493 : * It's more efficient to prepare a bunch of tuples for insertion, and
2494 : * insert them in one heap_multi_insert() call, than call heap_insert()
2495 : * separately for every tuple. However, we can't do that if there are
2496 : * BEFORE/INSTEAD OF triggers, or we need to evaluate volatile default
2497 : * expressions. Such triggers or expressions might query the table we're
2498 : * inserting to, and act differently if the tuples that have already been
2499 : * processed and prepared for insertion are not there. We also can't do
2500 : * it if the table is partitioned.
2501 : */
2502 127 : if ((resultRelInfo->ri_TrigDesc != NULL &&
2503 30 : (resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
2504 102 : resultRelInfo->ri_TrigDesc->trig_insert_instead_row)) ||
2505 182 : cstate->partition_dispatch_info != NULL ||
2506 89 : cstate->volatile_defexprs)
2507 : {
2508 17 : useHeapMultiInsert = false;
2509 : }
2510 : else
2511 : {
2512 89 : useHeapMultiInsert = true;
2513 89 : bufferedTuples = palloc(MAX_BUFFERED_TUPLES * sizeof(HeapTuple));
2514 : }
2515 :
2516 : /* Prepare to catch AFTER triggers. */
2517 106 : AfterTriggerBeginQuery();
2518 :
2519 : /*
2520 : * Check BEFORE STATEMENT insertion triggers. It's debatable whether we
2521 : * should do this for COPY, since it's not really an "INSERT" statement as
2522 : * such. However, executing these triggers maintains consistency with the
2523 : * EACH ROW triggers that we already fire on COPY.
2524 : */
2525 106 : ExecBSInsertTriggers(estate, resultRelInfo);
2526 :
2527 106 : values = (Datum *) palloc(tupDesc->natts * sizeof(Datum));
2528 106 : nulls = (bool *) palloc(tupDesc->natts * sizeof(bool));
2529 :
2530 106 : bistate = GetBulkInsertState();
2531 106 : econtext = GetPerTupleExprContext(estate);
2532 :
2533 : /* Set up callback to identify error line number */
2534 106 : errcallback.callback = CopyFromErrorCallback;
2535 106 : errcallback.arg = (void *) cstate;
2536 106 : errcallback.previous = error_context_stack;
2537 106 : error_context_stack = &errcallback;
2538 :
2539 : for (;;)
2540 : {
2541 : TupleTableSlot *slot;
2542 : bool skip_tuple;
2543 108225 : Oid loaded_oid = InvalidOid;
2544 :
2545 108225 : CHECK_FOR_INTERRUPTS();
2546 :
2547 108225 : if (nBufferedTuples == 0)
2548 : {
2549 : /*
2550 : * Reset the per-tuple exprcontext. We can only do this if the
2551 : * tuple buffer is empty. (Calling the context the per-tuple
2552 : * memory context is a bit of a misnomer now.)
2553 : */
2554 308 : ResetPerTupleExprContext(estate);
2555 : }
2556 :
2557 : /* Switch into its memory context */
2558 108225 : MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
2559 :
2560 108225 : if (!NextCopyFrom(cstate, econtext, values, nulls, &loaded_oid))
2561 93 : break;
2562 :
2563 : /* And now we can form the input tuple. */
2564 108124 : tuple = heap_form_tuple(tupDesc, values, nulls);
2565 :
2566 108124 : if (loaded_oid != InvalidOid)
2567 14 : HeapTupleSetOid(tuple, loaded_oid);
2568 :
2569 : /*
2570 : * Constraints might reference the tableoid column, so initialize
2571 : * t_tableOid before evaluating them.
2572 : */
2573 108124 : tuple->t_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
2574 :
2575 : /* Triggers and stuff need to be invoked in query context. */
2576 108124 : MemoryContextSwitchTo(oldcontext);
2577 :
2578 : /* Place tuple in tuple slot --- but slot shouldn't free it */
2579 108124 : slot = myslot;
2580 108124 : ExecStoreTuple(tuple, slot, InvalidBuffer, false);
2581 :
2582 : /* Determine the partition to heap_insert the tuple into */
2583 108124 : if (cstate->partition_dispatch_info)
2584 : {
2585 : int leaf_part_index;
2586 : TupleConversionMap *map;
2587 :
2588 : /*
2589 : * Away we go ... If we end up not finding a partition after all,
2590 : * ExecFindPartition() does not return and errors out instead.
2591 : * Otherwise, the returned value is to be used as an index into
2592 : * arrays mt_partitions[] and mt_partition_tupconv_maps[] that
2593 : * will get us the ResultRelInfo and TupleConversionMap for the
2594 : * partition, respectively.
2595 : */
2596 11 : leaf_part_index = ExecFindPartition(resultRelInfo,
2597 : cstate->partition_dispatch_info,
2598 : slot,
2599 : estate);
2600 11 : Assert(leaf_part_index >= 0 &&
2601 : leaf_part_index < cstate->num_partitions);
2602 :
2603 : /*
2604 : * If this tuple is mapped to a partition that is not same as the
2605 : * previous one, we'd better make the bulk insert mechanism gets a
2606 : * new buffer.
2607 : */
2608 11 : if (prev_leaf_part_index != leaf_part_index)
2609 : {
2610 11 : ReleaseBulkInsertStatePin(bistate);
2611 11 : prev_leaf_part_index = leaf_part_index;
2612 : }
2613 :
2614 : /*
2615 : * Save the old ResultRelInfo and switch to the one corresponding
2616 : * to the selected partition.
2617 : */
2618 11 : saved_resultRelInfo = resultRelInfo;
2619 11 : resultRelInfo = cstate->partitions + leaf_part_index;
2620 :
2621 : /* We do not yet have a way to insert into a foreign partition */
2622 11 : if (resultRelInfo->ri_FdwRoutine)
2623 0 : ereport(ERROR,
2624 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2625 : errmsg("cannot route inserted tuples to a foreign table")));
2626 :
2627 : /*
2628 : * For ExecInsertIndexTuples() to work on the partition's indexes
2629 : */
2630 11 : estate->es_result_relation_info = resultRelInfo;
2631 :
2632 : /*
2633 : * If we're capturing transition tuples, we might need to convert
2634 : * from the partition rowtype to parent rowtype.
2635 : */
2636 11 : if (cstate->transition_capture != NULL)
2637 : {
2638 13 : if (resultRelInfo->ri_TrigDesc &&
2639 7 : (resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
2640 3 : resultRelInfo->ri_TrigDesc->trig_insert_instead_row))
2641 : {
2642 : /*
2643 : * If there are any BEFORE or INSTEAD triggers on the
2644 : * partition, we'll have to be ready to convert their
2645 : * result back to tuplestore format.
2646 : */
2647 1 : cstate->transition_capture->tcs_original_insert_tuple = NULL;
2648 2 : cstate->transition_capture->tcs_map =
2649 1 : cstate->transition_tupconv_maps[leaf_part_index];
2650 : }
2651 : else
2652 : {
2653 : /*
2654 : * Otherwise, just remember the original unconverted
2655 : * tuple, to avoid a needless round trip conversion.
2656 : */
2657 8 : cstate->transition_capture->tcs_original_insert_tuple = tuple;
2658 8 : cstate->transition_capture->tcs_map = NULL;
2659 : }
2660 : }
2661 :
2662 : /*
2663 : * We might need to convert from the parent rowtype to the
2664 : * partition rowtype.
2665 : */
2666 11 : map = cstate->partition_tupconv_maps[leaf_part_index];
2667 11 : if (map)
2668 : {
2669 6 : Relation partrel = resultRelInfo->ri_RelationDesc;
2670 :
2671 6 : tuple = do_convert_tuple(tuple, map);
2672 :
2673 : /*
2674 : * We must use the partition's tuple descriptor from this
2675 : * point on. Use a dedicated slot from this point on until
2676 : * we're finished dealing with the partition.
2677 : */
2678 6 : slot = cstate->partition_tuple_slot;
2679 6 : Assert(slot != NULL);
2680 6 : ExecSetSlotDescriptor(slot, RelationGetDescr(partrel));
2681 6 : ExecStoreTuple(tuple, slot, InvalidBuffer, true);
2682 : }
2683 :
2684 11 : tuple->t_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
2685 : }
2686 :
2687 108124 : skip_tuple = false;
2688 :
2689 : /* BEFORE ROW INSERT Triggers */
2690 108164 : if (resultRelInfo->ri_TrigDesc &&
2691 40 : resultRelInfo->ri_TrigDesc->trig_insert_before_row)
2692 : {
2693 28 : slot = ExecBRInsertTriggers(estate, resultRelInfo, slot);
2694 :
2695 28 : if (slot == NULL) /* "do nothing" */
2696 0 : skip_tuple = true;
2697 : else /* trigger might have changed tuple */
2698 28 : tuple = ExecMaterializeSlot(slot);
2699 : }
2700 :
2701 108124 : if (!skip_tuple)
2702 : {
2703 108164 : if (resultRelInfo->ri_TrigDesc &&
2704 40 : resultRelInfo->ri_TrigDesc->trig_insert_instead_row)
2705 : {
2706 : /* Pass the data to the INSTEAD ROW INSERT trigger */
2707 1 : ExecIRInsertTriggers(estate, resultRelInfo, slot);
2708 : }
2709 : else
2710 : {
2711 : /*
2712 : * We always check the partition constraint, including when
2713 : * the tuple got here via tuple-routing. However we don't
2714 : * need to in the latter case if no BR trigger is defined on
2715 : * the partition. Note that a BR trigger might modify the
2716 : * tuple such that the partition constraint is no longer
2717 : * satisfied, so we need to check in that case.
2718 : */
2719 108123 : bool check_partition_constr =
2720 108123 : (resultRelInfo->ri_PartitionCheck != NIL);
2721 :
2722 108134 : if (saved_resultRelInfo != NULL &&
2723 16 : !(resultRelInfo->ri_TrigDesc &&
2724 5 : resultRelInfo->ri_TrigDesc->trig_insert_before_row))
2725 9 : check_partition_constr = false;
2726 :
2727 : /* Check the constraints of the tuple */
2728 108123 : if (cstate->rel->rd_att->constr || check_partition_constr)
2729 43 : ExecConstraints(resultRelInfo, slot, estate);
2730 :
2731 108118 : if (useHeapMultiInsert)
2732 : {
2733 : /* Add this tuple to the tuple buffer */
2734 108081 : if (nBufferedTuples == 0)
2735 240 : firstBufferedLineNo = cstate->cur_lineno;
2736 108081 : bufferedTuples[nBufferedTuples++] = tuple;
2737 108081 : bufferedTuplesSize += tuple->t_len;
2738 :
2739 : /*
2740 : * If the buffer filled up, flush it. Also flush if the
2741 : * total size of all the tuples in the buffer becomes
2742 : * large, to avoid using large amounts of memory for the
2743 : * buffer when the tuples are exceptionally wide.
2744 : */
2745 108081 : if (nBufferedTuples == MAX_BUFFERED_TUPLES ||
2746 : bufferedTuplesSize > 65535)
2747 : {
2748 164 : CopyFromInsertBatch(cstate, estate, mycid, hi_options,
2749 : resultRelInfo, myslot, bistate,
2750 : nBufferedTuples, bufferedTuples,
2751 : firstBufferedLineNo);
2752 164 : nBufferedTuples = 0;
2753 164 : bufferedTuplesSize = 0;
2754 : }
2755 : }
2756 : else
2757 : {
2758 37 : List *recheckIndexes = NIL;
2759 :
2760 : /* OK, store the tuple and create index entries for it */
2761 37 : heap_insert(resultRelInfo->ri_RelationDesc, tuple, mycid,
2762 : hi_options, bistate);
2763 :
2764 37 : if (resultRelInfo->ri_NumIndices > 0)
2765 0 : recheckIndexes = ExecInsertIndexTuples(slot,
2766 : &(tuple->t_self),
2767 : estate,
2768 : false,
2769 : NULL,
2770 : NIL);
2771 :
2772 : /* AFTER ROW INSERT Triggers */
2773 37 : ExecARInsertTriggers(estate, resultRelInfo, tuple,
2774 : recheckIndexes, cstate->transition_capture);
2775 :
2776 37 : list_free(recheckIndexes);
2777 : }
2778 : }
2779 :
2780 : /*
2781 : * We count only tuples not suppressed by a BEFORE INSERT trigger;
2782 : * this is the same definition used by execMain.c for counting
2783 : * tuples inserted by an INSERT command.
2784 : */
2785 108119 : processed++;
2786 :
2787 108119 : if (saved_resultRelInfo)
2788 : {
2789 11 : resultRelInfo = saved_resultRelInfo;
2790 11 : estate->es_result_relation_info = resultRelInfo;
2791 : }
2792 : }
2793 108119 : }
2794 :
2795 : /* Flush any remaining buffered tuples */
2796 93 : if (nBufferedTuples > 0)
2797 74 : CopyFromInsertBatch(cstate, estate, mycid, hi_options,
2798 : resultRelInfo, myslot, bistate,
2799 : nBufferedTuples, bufferedTuples,
2800 : firstBufferedLineNo);
2801 :
2802 : /* Done, clean up */
2803 93 : error_context_stack = errcallback.previous;
2804 :
2805 93 : FreeBulkInsertState(bistate);
2806 :
2807 93 : MemoryContextSwitchTo(oldcontext);
2808 :
2809 : /*
2810 : * In the old protocol, tell pqcomm that we can process normal protocol
2811 : * messages again.
2812 : */
2813 93 : if (cstate->copy_dest == COPY_OLD_FE)
2814 0 : pq_endmsgread();
2815 :
2816 : /* Execute AFTER STATEMENT insertion triggers */
2817 93 : ExecASInsertTriggers(estate, resultRelInfo, cstate->transition_capture);
2818 :
2819 : /* Handle queued AFTER triggers */
2820 93 : AfterTriggerEndQuery(estate);
2821 :
2822 93 : pfree(values);
2823 93 : pfree(nulls);
2824 :
2825 93 : ExecResetTupleTable(estate->es_tupleTable, false);
2826 :
2827 93 : ExecCloseIndices(resultRelInfo);
2828 :
2829 : /* Close all the partitioned tables, leaf partitions, and their indices */
2830 93 : if (cstate->partition_dispatch_info)
2831 : {
2832 : int i;
2833 :
2834 : /*
2835 : * Remember cstate->partition_dispatch_info[0] corresponds to the root
2836 : * partitioned table, which we must not try to close, because it is
2837 : * the main target table of COPY that will be closed eventually by
2838 : * DoCopy(). Also, tupslot is NULL for the root partitioned table.
2839 : */
2840 4 : for (i = 1; i < cstate->num_dispatch; i++)
2841 : {
2842 0 : PartitionDispatch pd = cstate->partition_dispatch_info[i];
2843 :
2844 0 : heap_close(pd->reldesc, NoLock);
2845 0 : ExecDropSingleTupleTableSlot(pd->tupslot);
2846 : }
2847 15 : for (i = 0; i < cstate->num_partitions; i++)
2848 : {
2849 11 : ResultRelInfo *resultRelInfo = cstate->partitions + i;
2850 :
2851 11 : ExecCloseIndices(resultRelInfo);
2852 11 : heap_close(resultRelInfo->ri_RelationDesc, NoLock);
2853 : }
2854 :
2855 : /* Release the standalone partition tuple descriptor */
2856 4 : ExecDropSingleTupleTableSlot(cstate->partition_tuple_slot);
2857 : }
2858 :
2859 : /* Close any trigger target relations */
2860 93 : ExecCleanUpTriggerState(estate);
2861 :
2862 93 : FreeExecutorState(estate);
2863 :
2864 : /*
2865 : * If we skipped writing WAL, then we need to sync the heap (but not
2866 : * indexes since those use WAL anyway)
2867 : */
2868 93 : if (hi_options & HEAP_INSERT_SKIP_WAL)
2869 0 : heap_sync(cstate->rel);
2870 :
2871 93 : return processed;
2872 : }
2873 :
2874 : /*
2875 : * A subroutine of CopyFrom, to write the current batch of buffered heap
2876 : * tuples to the heap. Also updates indexes and runs AFTER ROW INSERT
2877 : * triggers.
     : *
     : * The nBufferedTuples entries of bufferedTuples are handed to a single
     : * heap_multi_insert call on cstate->rel; mycid, hi_options and bistate
     : * are passed through to that call unchanged.
     : *
     : * myslot is used as scratch space: when the result relation has indexes,
     : * each buffered tuple is stored in it before ExecInsertIndexTuples runs.
     : *
     : * firstBufferedLineNo is the input line number of bufferedTuples[0].
     : * While tuple i is being processed, cstate->cur_lineno is set to
     : * firstBufferedLineNo + i; the value cur_lineno had on entry is put
     : * back before returning.
     : *
     : * cstate->line_buf_valid is cleared on entry and is not set again by
     : * this function.
2878 : */
2879 : static void
2880 238 : CopyFromInsertBatch(CopyState cstate, EState *estate, CommandId mycid,
2881 : int hi_options, ResultRelInfo *resultRelInfo,
2882 : TupleTableSlot *myslot, BulkInsertState bistate,
2883 : int nBufferedTuples, HeapTuple *bufferedTuples,
2884 : int firstBufferedLineNo)
2885 : {
2886 : MemoryContext oldcontext;
2887 : int i;
2888 : int save_cur_lineno;
2889 :
2890 : /*
2891 : * Print error context information correctly, if one of the operations
2892 : * below fails.
2893 : */
2894 238 : cstate->line_buf_valid = false;
2895 238 : save_cur_lineno = cstate->cur_lineno;
2896 :
2897 : /*
2898 : * heap_multi_insert leaks memory, so switch to short-lived memory context
2899 : * before calling it.
2900 : */
2901 238 : oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
2902 238 : heap_multi_insert(cstate->rel,
2903 : bufferedTuples,
2904 : nBufferedTuples,
2905 : mycid,
2906 : hi_options,
2907 : bistate);
2908 238 : MemoryContextSwitchTo(oldcontext);
2909 :
2910 : /*
2911 : * If there are any indexes, update them for all the inserted tuples, and
2912 : * run AFTER ROW INSERT triggers.
2913 : */
2914 238 : if (resultRelInfo->ri_NumIndices > 0)
2915 : {
2916 2022 : for (i = 0; i < nBufferedTuples; i++)
2917 : {
2918 : List *recheckIndexes;
2919 :
2920 2008 : cstate->cur_lineno = firstBufferedLineNo + i;
2921 2008 : ExecStoreTuple(bufferedTuples[i], myslot, InvalidBuffer, false);
2922 2008 : recheckIndexes =
2923 2008 : ExecInsertIndexTuples(myslot, &(bufferedTuples[i]->t_self),
2924 : estate, false, NULL, NIL);
2925 4016 : ExecARInsertTriggers(estate, resultRelInfo,
2926 2008 : bufferedTuples[i],
2927 : recheckIndexes, cstate->transition_capture);
2928 2008 : list_free(recheckIndexes);
2929 : }
2930 : }
2931 :
2932 : /*
2933 : * There are no indexes, but see if we need to run AFTER ROW INSERT
2934 : * triggers anyway (an after-row trigger or a new-table transition exists).
2935 : */
2936 227 : else if (resultRelInfo->ri_TrigDesc != NULL &&
2937 5 : (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
2938 2 : resultRelInfo->ri_TrigDesc->trig_insert_new_table))
2939 : {
2940 7 : for (i = 0; i < nBufferedTuples; i++)
2941 : {
2942 5 : cstate->cur_lineno = firstBufferedLineNo + i;
2943 10 : ExecARInsertTriggers(estate, resultRelInfo,
2944 5 : bufferedTuples[i],
2945 : NIL, cstate->transition_capture);
2946 : }
2947 : }
2948 :
2949 : /* reset cur_lineno to where we were */
2950 238 : cstate->cur_lineno = save_cur_lineno;
2951 238 : }
2952 :
2953 : /*
2954 : * Setup to read tuples from a file for COPY FROM.
2955 : *
     : * 'pstate': Passed on to BeginCopy. When non-NULL, its p_rtable is saved
     : * in the returned state as range_table.
2956 : * 'rel': Used as a template for the tuples
2957 : * 'filename': Name of server-local file to read, or the command to run
     : * when 'is_program' is true. NULL means input does not come from a file
     : * or program.
     : * 'is_program': If true, 'filename' is opened with OpenPipeStream rather
     : * than AllocateFile. Asserted to be false when 'filename' is NULL and
     : * no callback is given.
     : * 'data_source_cb': If non-NULL, copy_dest is set to COPY_CALLBACK, the
     : * callback is stored in the state, and no file or program is opened.
2958 : * 'attnamelist': List of char *, columns to include. NIL selects all cols.
2959 : * 'options': List of DefElem. See copy_opt_item in gram.y for selections.
2960 : *
     : * With neither a callback nor a filename, ReceiveCopyBegin is called when
     : * whereToSendOutput is DestRemote; otherwise stdin is used as copy_file.
     : *
     : * For binary input the file header (11-byte signature, flags word and
     : * header extension) is read and checked here, before returning. For
     : * non-binary input the raw_fields workspace is allocated instead.
     : *
     : * Everything after BeginCopy runs with cstate->copycontext as the current
     : * memory context; the previous context is restored before returning.
     : *
     : * Reports ERROR if the command cannot be executed, the file cannot be
     : * opened or stat-ed, the path is a directory, or the binary header is
     : * not acceptable.
     : *
2961 : * Returns a CopyState, to be passed to NextCopyFrom and related functions.
2962 : */
2963 : CopyState
2964 113 : BeginCopyFrom(ParseState *pstate,
2965 : Relation rel,
2966 : const char *filename,
2967 : bool is_program,
2968 : copy_data_source_cb data_source_cb,
2969 : List *attnamelist,
2970 : List *options)
2971 : {
2972 : CopyState cstate;
2973 113 : bool pipe = (filename == NULL);
2974 : TupleDesc tupDesc;
2975 : AttrNumber num_phys_attrs,
2976 : num_defaults;
2977 : FmgrInfo *in_functions;
2978 : Oid *typioparams;
2979 : int attnum;
2980 : Oid in_func_oid;
2981 : int *defmap;
2982 : ExprState **defexprs;
2983 : MemoryContext oldcontext;
2984 : bool volatile_defexprs;
2985 :
2986 113 : cstate = BeginCopy(pstate, true, rel, NULL, InvalidOid, attnamelist, options);
2987 110 : oldcontext = MemoryContextSwitchTo(cstate->copycontext);
2988 :
2989 : /* Initialize state variables */
2990 110 : cstate->fe_eof = false;
2991 110 : cstate->eol_type = EOL_UNKNOWN;
2992 110 : cstate->cur_relname = RelationGetRelationName(cstate->rel);
2993 110 : cstate->cur_lineno = 0;
2994 110 : cstate->cur_attname = NULL;
2995 110 : cstate->cur_attval = NULL;
2996 :
2997 : /* Set up variables to avoid per-attribute overhead. */
2998 110 : initStringInfo(&cstate->attribute_buf);
2999 110 : initStringInfo(&cstate->line_buf);
3000 110 : cstate->line_buf_converted = false;
3001 110 : cstate->raw_buf = (char *) palloc(RAW_BUF_SIZE + 1);
3002 110 : cstate->raw_buf_index = cstate->raw_buf_len = 0;
3003 :
3004 : /* Assign range table, we'll need it in CopyFrom. */
3005 110 : if (pstate)
3006 110 : cstate->range_table = pstate->p_rtable;
3007 :
3008 110 : tupDesc = RelationGetDescr(cstate->rel);
3009 110 : num_phys_attrs = tupDesc->natts;
3010 110 : num_defaults = 0;
3011 110 : volatile_defexprs = false;
3012 :
3013 : /*
3014 : * Pick up the required catalog information for each attribute in the
3015 : * relation, including the input function, the element type (to pass to
3016 : * the input function), and info about defaults and constraints. (Which
3017 : * input function we use depends on text/binary format choice.)
     : *
     : * in_functions and typioparams are indexed by attnum - 1; entries for
     : * dropped attributes are left unset. defmap and defexprs are filled
     : * densely, one entry per default found (num_defaults in total).
3018 : */
3019 110 : in_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
3020 110 : typioparams = (Oid *) palloc(num_phys_attrs * sizeof(Oid));
3021 110 : defmap = (int *) palloc(num_phys_attrs * sizeof(int));
3022 110 : defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
3023 :
3024 494 : for (attnum = 1; attnum <= num_phys_attrs; attnum++)
3025 : {
3026 384 : Form_pg_attribute att = TupleDescAttr(tupDesc, attnum - 1);
3027 :
3028 : /* We don't need info for dropped attributes */
3029 384 : if (att->attisdropped)
3030 4 : continue;
3031 :
3032 : /* Fetch the input function and typioparam info */
3033 380 : if (cstate->binary)
3034 7 : getTypeBinaryInputInfo(att->atttypid,
3035 7 : &in_func_oid, &typioparams[attnum - 1]);
3036 : else
3037 373 : getTypeInputInfo(att->atttypid,
3038 373 : &in_func_oid, &typioparams[attnum - 1]);
3039 380 : fmgr_info(in_func_oid, &in_functions[attnum - 1]);
3040 :
3041 : /* Get default info if needed */
3042 380 : if (!list_member_int(cstate->attnumlist, attnum))
3043 : {
3044 : /* attribute is NOT to be copied from input */
3045 : /* use default value if one exists */
3046 30 : Expr *defexpr = (Expr *) build_column_default(cstate->rel,
3047 : attnum);
3048 :
3049 30 : if (defexpr != NULL)
3050 : {
3051 : /* Run the expression through planner */
3052 12 : defexpr = expression_planner(defexpr);
3053 :
3054 : /* Initialize executable expression in copycontext */
3055 12 : defexprs[num_defaults] = ExecInitExpr(defexpr, NULL);
3056 12 : defmap[num_defaults] = attnum - 1;
3057 12 : num_defaults++;
3058 :
3059 : /*
3060 : * If a default expression looks at the table being loaded,
3061 : * then it could give the wrong answer when using
3062 : * multi-insert. Since database access can be dynamic this is
3063 : * hard to test for exactly, so we use the much wider test of
3064 : * whether the default expression is volatile. We allow for
3065 : * the special case of when the default expression is the
3066 : * nextval() of a sequence which in this specific case is
3067 : * known to be safe for use with the multi-insert
3068 : * optimization. Hence we use this special case function
3069 : * checker rather than the standard check for
3070 : * contain_volatile_functions().
     : *
     : * Once one volatile default has been seen the flag stays set
     : * and later defaults are not examined.
3071 : */
3072 12 : if (!volatile_defexprs)
3073 12 : volatile_defexprs = contain_volatile_functions_not_nextval((Node *) defexpr);
3074 : }
3075 : }
3076 : }
3077 :
3078 : /* We keep those variables in cstate. */
3079 110 : cstate->in_functions = in_functions;
3080 110 : cstate->typioparams = typioparams;
3081 110 : cstate->defmap = defmap;
3082 110 : cstate->defexprs = defexprs;
3083 110 : cstate->volatile_defexprs = volatile_defexprs;
3084 110 : cstate->num_defaults = num_defaults;
3085 110 : cstate->is_program = is_program;
3086 :
3087 110 : if (data_source_cb)
3088 : {
3089 0 : cstate->copy_dest = COPY_CALLBACK;
3090 0 : cstate->data_source_cb = data_source_cb;
3091 : }
3092 110 : else if (pipe)
3093 : {
3094 77 : Assert(!is_program); /* the grammar does not allow this */
3095 77 : if (whereToSendOutput == DestRemote)
3096 77 : ReceiveCopyBegin(cstate);
3097 : else
3098 0 : cstate->copy_file = stdin;
3099 : }
3100 : else
3101 : {
3102 33 : cstate->filename = pstrdup(filename);
3103 :
3104 33 : if (cstate->is_program)
3105 : {
3106 0 : cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_R);
3107 0 : if (cstate->copy_file == NULL)
3108 0 : ereport(ERROR,
3109 : (errcode_for_file_access(),
3110 : errmsg("could not execute command \"%s\": %m",
3111 : cstate->filename)));
3112 : }
3113 : else
3114 : {
3115 : struct stat st;
3116 :
3117 33 : cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R);
3118 33 : if (cstate->copy_file == NULL)
3119 : {
3120 : /* copy errno because ereport subfunctions might change it */
3121 0 : int save_errno = errno;
3122 :
3123 0 : ereport(ERROR,
3124 : (errcode_for_file_access(),
3125 : errmsg("could not open file \"%s\" for reading: %m",
3126 : cstate->filename),
3127 : (save_errno == ENOENT || save_errno == EACCES) ?
3128 : errhint("COPY FROM instructs the PostgreSQL server process to read a file. "
3129 : "You may want a client-side facility such as psql's \\copy.") : 0));
3130 : }
3131 :
3132 33 : if (fstat(fileno(cstate->copy_file), &st))
3133 0 : ereport(ERROR,
3134 : (errcode_for_file_access(),
3135 : errmsg("could not stat file \"%s\": %m",
3136 : cstate->filename)));
3137 :
3138 33 : if (S_ISDIR(st.st_mode))
3139 0 : ereport(ERROR,
3140 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
3141 : errmsg("\"%s\" is a directory", cstate->filename)));
3142 : }
3143 : }
3144 :
3145 110 : if (!cstate->binary)
3146 : {
3147 : /* no header is read for non-binary input; must rely on user to tell us... */
3148 109 : cstate->file_has_oids = cstate->oids;
3149 : }
3150 : else
3151 : {
3152 : /* Read and verify binary header */
3153 : char readSig[11];
3154 : int32 tmp;
3155 :
3156 : /* Signature */
3157 2 : if (CopyGetData(cstate, readSig, 11, 11) != 11 ||
3158 1 : memcmp(readSig, BinarySignature, 11) != 0)
3159 0 : ereport(ERROR,
3160 : (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3161 : errmsg("COPY file signature not recognized")));
3162 : /* Flags field: bit 16 says whether OIDs are present; any other bit at or above 16 is rejected */
3163 1 : if (!CopyGetInt32(cstate, &tmp))
3164 0 : ereport(ERROR,
3165 : (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3166 : errmsg("invalid COPY file header (missing flags)")));
3167 1 : cstate->file_has_oids = (tmp & (1 << 16)) != 0;
3168 1 : tmp &= ~(1 << 16);
3169 1 : if ((tmp >> 16) != 0)
3170 0 : ereport(ERROR,
3171 : (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3172 : errmsg("unrecognized critical flags in COPY file header")));
3173 : /* Header extension length (a negative value is rejected) */
3174 2 : if (!CopyGetInt32(cstate, &tmp) ||
3175 1 : tmp < 0)
3176 0 : ereport(ERROR,
3177 : (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3178 : errmsg("invalid COPY file header (missing length)")));
3179 : /* Skip extension header, if present, one byte at a time using readSig as scratch */
3180 2 : while (tmp-- > 0)
3181 : {
3182 0 : if (CopyGetData(cstate, readSig, 1, 1) != 1)
3183 0 : ereport(ERROR,
3184 : (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3185 : errmsg("invalid COPY file header (wrong length)")));
3186 : }
3187 : }
3188 :
3189 110 : if (cstate->file_has_oids && cstate->binary)
3190 : {
3191 0 : getTypeBinaryInputInfo(OIDOID,
3192 : &in_func_oid, &cstate->oid_typioparam);
3193 0 : fmgr_info(in_func_oid, &cstate->oid_in_function);
3194 : }
3195 :
3196 : /* create workspace for CopyReadAttributes results (one extra entry when the file carries OIDs) */
3197 110 : if (!cstate->binary)
3198 : {
3199 109 : AttrNumber attr_count = list_length(cstate->attnumlist);
3200 109 : int nfields = cstate->file_has_oids ? (attr_count + 1) : attr_count;
3201 :
3202 109 : cstate->max_fields = nfields;
3203 109 : cstate->raw_fields = (char **) palloc(nfields * sizeof(char *));
3204 : }
3205 :
3206 110 : MemoryContextSwitchTo(oldcontext);
3207 :
3208 110 : return cstate;
3209 : }
3210 :
3211 : /*
3212 : * Read raw fields in the next line for COPY FROM in text or csv mode.
3213 : * Return false if no more lines.
3214 : *
3215 : * An internal temporary buffer is returned via 'fields'. It is valid until
3216 : * the next call of the function. Since the function returns all raw fields
3217 : * in the input file, 'nfields' could be different from the number of columns
3218 : * in the relation.
3219 : *
     : * On success 'fields' is set to cstate->raw_fields and 'nfields' to the
     : * count returned by CopyReadAttributesCSV or CopyReadAttributesText
     : * (chosen by csv_mode). Neither output is touched when false is returned.
     : *
     : * If cur_lineno is still 0 and header_line is set, one line is read and
     : * thrown away before the data line. cur_lineno is incremented for every
     : * line read, the discarded header line included.
     : *
     : * Must not be called for binary input (asserted).
     : *
3220 : * NOTE: the force_not_null option is not applied to the returned fields.
3221 : */
3222 : bool
3223 108221 : NextCopyFromRawFields(CopyState cstate, char ***fields, int *nfields)
3224 : {
3225 : int fldct;
3226 : bool done;
3227 :
3228 : /* only available for text or csv input */
3229 108221 : Assert(!cstate->binary);
3230 :
3231 : /* on input just throw the header line away */
3232 108221 : if (cstate->cur_lineno == 0 && cstate->header_line)
3233 : {
3234 1 : cstate->cur_lineno++;
3235 1 : if (CopyReadLine(cstate))
3236 0 : return false; /* done */
3237 : }
3238 :
3239 108221 : cstate->cur_lineno++;
3240 :
3241 : /* Actually read the line into memory here */
3242 108221 : done = CopyReadLine(cstate);
3243 :
3244 : /*
3245 : * EOF at start of line means we're done. If we see EOF after some
3246 : * characters, we act as though it was newline followed by EOF, ie,
3247 : * process the line and then exit loop on next iteration.
3248 : */
3249 108221 : if (done && cstate->line_buf.len == 0)
3250 92 : return false;
3251 :
3252 : /* Parse the line into de-escaped field values */
3253 108129 : if (cstate->csv_mode)
3254 32 : fldct = CopyReadAttributesCSV(cstate);
3255 : else
3256 108097 : fldct = CopyReadAttributesText(cstate);
3257 :
3258 108129 : *fields = cstate->raw_fields;
3259 108129 : *nfields = fldct;
3260 108129 : return true;
3261 : }
3262 :
3263 : /*
3264 : * Read next tuple from file for COPY FROM. Return false if no more tuples.
3265 : *
3266 : * 'econtext' is used to evaluate the default expression for each column not
3267 : * read from the file. It can be NULL when no default values are used, i.e.
3268 : * when all columns are read from the file.
3269 : *
3270 : * 'values' and 'nulls' arrays must be the same length as columns of the
3271 : * relation passed to BeginCopyFrom. This function fills the arrays.
3272 : * Oid of the tuple is returned with 'tupleOid' separately.
     : *
     : * 'tupleOid' may be NULL. An OID is stored through it only when
     : * cstate->oids is set and the pointer is non-NULL; an OID field present
     : * in the input is still consumed otherwise.
     : *
     : * Both arrays are reset (values to zero, nulls to true) before any input
     : * is read, so they are overwritten even when false is returned.
     : *
     : * False is returned in text/csv mode when NextCopyFromRawFields reports
     : * no more lines, and in binary mode when the field count cannot be read
     : * or is the -1 end-of-data marker.
     : *
     : * Malformed input (too many or too few fields, a missing, null or invalid
     : * OID, a wrong binary field count, data after the binary EOF marker) is
     : * reported with ERROR.
3273 : */
3274 : bool
3275 108225 : NextCopyFrom(CopyState cstate, ExprContext *econtext,
3276 : Datum *values, bool *nulls, Oid *tupleOid)
3277 : {
3278 : TupleDesc tupDesc;
3279 : AttrNumber num_phys_attrs,
3280 : attr_count,
3281 108225 : num_defaults = cstate->num_defaults;
3282 108225 : FmgrInfo *in_functions = cstate->in_functions;
3283 108225 : Oid *typioparams = cstate->typioparams;
3284 : int i;
3285 : int nfields;
3286 : bool isnull;
3287 108225 : bool file_has_oids = cstate->file_has_oids;
3288 108225 : int *defmap = cstate->defmap;
3289 108225 : ExprState **defexprs = cstate->defexprs;
3290 :
3291 108225 : tupDesc = RelationGetDescr(cstate->rel);
3292 108225 : num_phys_attrs = tupDesc->natts;
3293 108225 : attr_count = list_length(cstate->attnumlist);
3294 108225 : nfields = file_has_oids ? (attr_count + 1) : attr_count;
3295 :
3296 : /* Initialize all values for row to NULL */
3297 108225 : MemSet(values, 0, num_phys_attrs * sizeof(Datum));
3298 108225 : MemSet(nulls, true, num_phys_attrs * sizeof(bool));
3299 :
3300 108225 : if (!cstate->binary)
3301 : {
3302 : char **field_strings;
3303 : ListCell *cur;
3304 : int fldct;
3305 : int fieldno;
3306 : char *string;
3307 :
3308 : /* read raw fields in the next line */
3309 108221 : if (!NextCopyFromRawFields(cstate, &field_strings, &fldct))
3310 92 : return false;
3311 :
3312 : /* check for overflowing fields (not checked when no fields are expected) */
3313 108129 : if (nfields > 0 && fldct > nfields)
3314 2 : ereport(ERROR,
3315 : (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3316 : errmsg("extra data after last expected column")));
3317 :
3318 108127 : fieldno = 0;
3319 :
3320 : /* Read the OID field if present; it is always the first field on the line */
3321 108127 : if (file_has_oids)
3322 : {
3323 14 : if (fieldno >= fldct)
3324 0 : ereport(ERROR,
3325 : (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3326 : errmsg("missing data for OID column")));
3327 14 : string = field_strings[fieldno++];
3328 :
3329 14 : if (string == NULL)
3330 0 : ereport(ERROR,
3331 : (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3332 : errmsg("null OID in COPY data")));
3333 14 : else if (cstate->oids && tupleOid != NULL)
3334 : {
3335 14 : cstate->cur_attname = "oid";
3336 14 : cstate->cur_attval = string;
3337 14 : *tupleOid = DatumGetObjectId(DirectFunctionCall1(oidin,
3338 : CStringGetDatum(string)));
3339 14 : if (*tupleOid == InvalidOid)
3340 0 : ereport(ERROR,
3341 : (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3342 : errmsg("invalid OID in COPY data")));
3343 14 : cstate->cur_attname = NULL;
3344 14 : cstate->cur_attval = NULL;
3345 : }
3346 : }
3347 :
3348 : /* Loop to read the user attributes on the line. */
3349 524962 : foreach(cur, cstate->attnumlist)
3350 : {
3351 416841 : int attnum = lfirst_int(cur);
3352 416841 : int m = attnum - 1;
3353 416841 : Form_pg_attribute att = TupleDescAttr(tupDesc, m);
3354 :
3355 416841 : if (fieldno >= fldct)
3356 2 : ereport(ERROR,
3357 : (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3358 : errmsg("missing data for column \"%s\"",
3359 : NameStr(att->attname))));
3360 416839 : string = field_strings[fieldno++];
3361 :
3362 416839 : if (cstate->convert_select_flags &&
3363 0 : !cstate->convert_select_flags[m])
3364 : {
3365 : /* ignore input field, leaving column as NULL */
3366 0 : continue;
3367 : }
3368 :
3369 416839 : if (cstate->csv_mode)
3370 : {
3371 64 : if (string == NULL &&
3372 3 : cstate->force_notnull_flags[m])
3373 : {
3374 : /*
3375 : * FORCE_NOT_NULL option is set and column is NULL -
3376 : * convert it to the NULL string.
3377 : */
3378 2 : string = cstate->null_print;
3379 : }
3380 59 : else if (string != NULL && cstate->force_null_flags[m]
3381 2 : && strcmp(string, cstate->null_print) == 0)
3382 : {
3383 : /*
3384 : * FORCE_NULL option is set and column matches the NULL
3385 : * string. It must have been quoted, or otherwise the
3386 : * string would already have been set to NULL. Convert it
3387 : * to NULL as specified.
3388 : */
3389 2 : string = NULL;
3390 : }
3391 : }
3392 :
3393 416839 : cstate->cur_attname = NameStr(att->attname);
3394 416839 : cstate->cur_attval = string;
3395 833678 : values[m] = InputFunctionCall(&in_functions[m],
3396 : string,
3397 416839 : typioparams[m],
3398 : att->atttypmod);
3399 416835 : if (string != NULL)
3400 416514 : nulls[m] = false;
3401 416835 : cstate->cur_attname = NULL;
3402 416835 : cstate->cur_attval = NULL;
3403 : }
3404 :
3405 108121 : Assert(fieldno == nfields);
3406 : }
3407 : else
3408 : {
3409 : /* binary */
3410 : int16 fld_count;
3411 : ListCell *cur;
3412 :
3413 4 : cstate->cur_lineno++;
3414 :
3415 4 : if (!CopyGetInt16(cstate, &fld_count))
3416 : {
3417 : /* EOF detected (end of file, or protocol-level EOF) */
3418 1 : return false;
3419 : }
3420 :
3421 4 : if (fld_count == -1)
3422 : {
3423 : /*
3424 : * Received EOF marker. In a V3-protocol copy, wait for the
3425 : * protocol-level EOF, and complain if it doesn't come
3426 : * immediately. This ensures that we correctly handle CopyFail,
3427 : * if client chooses to send that now.
3428 : *
3429 : * Note that we MUST NOT try to read more data in an old-protocol
3430 : * copy, since there is no protocol-level EOF marker then. We
3431 : * could go either way for copy from file, but choose to throw
3432 : * error if there's data after the EOF marker, for consistency
3433 : * with the new-protocol case.
3434 : */
3435 : char dummy;
3436 :
3437 2 : if (cstate->copy_dest != COPY_OLD_FE &&
3438 1 : CopyGetData(cstate, &dummy, 1, 1) > 0)
3439 0 : ereport(ERROR,
3440 : (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3441 : errmsg("received copy data after EOF marker")));
3442 1 : return false;
3443 : }
3444 :
3445 3 : if (fld_count != attr_count)
3446 0 : ereport(ERROR,
3447 : (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3448 : errmsg("row field count is %d, expected %d",
3449 : (int) fld_count, attr_count)));
3450 :
3451 3 : if (file_has_oids)
3452 : {
3453 : Oid loaded_oid;
3454 :
3455 0 : cstate->cur_attname = "oid";
3456 0 : loaded_oid =
3457 0 : DatumGetObjectId(CopyReadBinaryAttribute(cstate,
3458 : 0,
3459 : &cstate->oid_in_function,
3460 : cstate->oid_typioparam,
3461 : -1,
3462 : &isnull));
3463 0 : if (isnull || loaded_oid == InvalidOid)
3464 0 : ereport(ERROR,
3465 : (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3466 : errmsg("invalid OID in COPY data")));
3467 0 : cstate->cur_attname = NULL;
3468 0 : if (cstate->oids && tupleOid != NULL)
3469 0 : *tupleOid = loaded_oid;
3470 : }
3471 :
3472 3 : i = 0;
3473 24 : foreach(cur, cstate->attnumlist)
3474 : {
3475 21 : int attnum = lfirst_int(cur);
3476 21 : int m = attnum - 1;
3477 21 : Form_pg_attribute att = TupleDescAttr(tupDesc, m);
3478 :
3479 21 : cstate->cur_attname = NameStr(att->attname);
3480 21 : i++;
3481 63 : values[m] = CopyReadBinaryAttribute(cstate,
3482 : i,
3483 21 : &in_functions[m],
3484 21 : typioparams[m],
3485 : att->atttypmod,
3486 : &nulls[m]);
3487 21 : cstate->cur_attname = NULL;
3488 : }
3489 : }
3490 :
3491 : /*
3492 : * Now compute and insert any defaults available for the columns not
3493 : * provided by the input data. Anything not processed here or above will
3494 : * remain NULL.
3495 : */
3496 108144 : for (i = 0; i < num_defaults; i++)
3497 : {
3498 : /*
3499 : * The caller must supply econtext and have switched into the
3500 : * per-tuple memory context in it.
3501 : */
3502 20 : Assert(econtext != NULL);
3503 20 : Assert(CurrentMemoryContext == econtext->ecxt_per_tuple_memory);
3504 :
3505 40 : values[defmap[i]] = ExecEvalExpr(defexprs[i], econtext,
3506 20 : &nulls[defmap[i]]);
3507 : }
3508 :
3509 108124 : return true;
3510 : }
3511 :
3512 : /*
3513 : * Clean up storage and release resources for COPY FROM.
     : *
     : * This is a thin wrapper: the only action taken is the call to EndCopy.
     : * NOTE(review): cstate is presumably unusable afterwards - confirm
     : * against EndCopy.
3514 : */
3515 : void
3516 93 : EndCopyFrom(CopyState cstate)
3517 : {
3518 : /* No COPY FROM related resources except memory. */
3519 :
3520 93 : EndCopy(cstate);
3521 93 : }
3522 :
3523 : /*
3524 : * Read the next input line and stash it in line_buf, with conversion to
3525 : * server encoding.
3526 : *
3527 : * Result is true if read was terminated by EOF, false if terminated
3528 : * by newline. The terminating newline or EOF marker is not included
3529 : * in the final value of line_buf.
     : *
     : * line_buf is reset on entry and line_buf_valid is set to true.
     : * line_buf_converted is false while the line is being read and
     : * transcoded, and is set to true just before returning.
     : *
     : * Encoding conversion is attempted only when need_transcoding is set.
3530 : */
3531 : static bool
3532 108222 : CopyReadLine(CopyState cstate)
3533 : {
3534 : bool result;
3535 :
3536 108222 : resetStringInfo(&cstate->line_buf);
3537 108222 : cstate->line_buf_valid = true;
3538 :
3539 : /* Mark that encoding conversion hasn't occurred yet */
3540 108222 : cstate->line_buf_converted = false;
3541 :
3542 : /* Parse data and transfer into line_buf */
3543 108222 : result = CopyReadLineText(cstate);
3544 :
3545 108222 : if (result)
3546 : {
3547 : /*
3548 : * Reached EOF. In protocol version 3, we should ignore anything
3549 : * after \. up to the protocol end of copy data. (XXX maybe better
3550 : * not to treat \. as special?)
     : *
     : * The loop marks everything in raw_buf as consumed and keeps
     : * loading until CopyLoadRawBuf returns false.
3551 : */
3552 92 : if (cstate->copy_dest == COPY_NEW_FE)
3553 : {
3554 : do
3555 : {
3556 61 : cstate->raw_buf_index = cstate->raw_buf_len;
3557 61 : } while (CopyLoadRawBuf(cstate));
3558 : }
3559 : }
3560 : else
3561 : {
3562 : /*
3563 : * If we didn't hit EOF, then we must have transferred the EOL marker
3564 : * to line_buf along with the data. Get rid of it.
     : *
     : * One byte is trimmed for EOL_NL and EOL_CR, two for EOL_CRNL, and
     : * the string is re-terminated at the new length.
3565 : */
3566 108130 : switch (cstate->eol_type)
3567 : {
3568 : case EOL_NL:
3569 108130 : Assert(cstate->line_buf.len >= 1);
3570 108130 : Assert(cstate->line_buf.data[cstate->line_buf.len - 1] == '\n');
3571 108130 : cstate->line_buf.len--;
3572 108130 : cstate->line_buf.data[cstate->line_buf.len] = '\0';
3573 108130 : break;
3574 : case EOL_CR:
3575 0 : Assert(cstate->line_buf.len >= 1);
3576 0 : Assert(cstate->line_buf.data[cstate->line_buf.len - 1] == '\r');
3577 0 : cstate->line_buf.len--;
3578 0 : cstate->line_buf.data[cstate->line_buf.len] = '\0';
3579 0 : break;
3580 : case EOL_CRNL:
3581 0 : Assert(cstate->line_buf.len >= 2);
3582 0 : Assert(cstate->line_buf.data[cstate->line_buf.len - 2] == '\r');
3583 0 : Assert(cstate->line_buf.data[cstate->line_buf.len - 1] == '\n');
3584 0 : cstate->line_buf.len -= 2;
3585 0 : cstate->line_buf.data[cstate->line_buf.len] = '\0';
3586 0 : break;
3587 : case EOL_UNKNOWN:
3588 : /* shouldn't get here */
3589 0 : Assert(false);
3590 : break;
3591 : }
3592 : }
3593 :
3594 : /* Done reading the line. Convert it to server encoding. */
3595 108222 : if (cstate->need_transcoding)
3596 : {
3597 : char *cvt;
3598 :
3599 108222 : cvt = pg_any_to_server(cstate->line_buf.data,
3600 : cstate->line_buf.len,
3601 : cstate->file_encoding);
3602 108222 : if (cvt != cstate->line_buf.data)
3603 : {
3604 : /* transfer converted data back to line_buf, then free the converted copy */
3605 0 : resetStringInfo(&cstate->line_buf);
3606 0 : appendBinaryStringInfo(&cstate->line_buf, cvt, strlen(cvt));
3607 0 : pfree(cvt);
3608 : }
3609 : }
3610 :
3611 : /* Now it's safe to use the buffer in error messages */
3612 108222 : cstate->line_buf_converted = true;
3613 :
3614 108222 : return result;
3615 : }
3616 :
3617 : /*
3618 : * CopyReadLineText - inner loop of CopyReadLine for text mode
     : *
     : * Scans cstate->raw_buf, starting at raw_buf_index, for the end of the next
     : * input line, calling CopyLoadRawBuf() whenever the buffered data runs out.
     : * In CSV mode, \r and \n inside a quoted field do not terminate the line.
     : *
     : * Returns false when the loop stopped at a line terminator (\r, \n or
     : * \r\n).  Returns true when the input was exhausted or the end-of-copy
     : * marker (\.) was recognized.
     : *
     : * Input whose newline style is inconsistent, or whose end-of-copy marker
     : * is malformed, is reported with ereport(ERROR) using
     : * ERRCODE_BAD_COPY_FILE_FORMAT.
     : *
     : * NOTE(review): REFILL_LINEBUF, IF_NEED_REFILL_AND_NOT_EOF_CONTINUE,
     : * IF_NEED_REFILL_AND_EOF_BREAK and NO_END_OF_COPY_GOTO are macros defined
     : * elsewhere in this file.  Judging by their names and their use here they
     : * read and modify this function's local variables and can continue, break
     : * or jump to the not_end_of_copy label; check their definitions before
     : * renaming locals or reordering statements.
3619 : */
3620 : static bool
3621 108222 : CopyReadLineText(CopyState cstate)
3622 : {
3623 : char *copy_raw_buf;
3624 : int raw_buf_ptr;
3625 : int copy_buf_len;
3626 108222 : bool need_data = false;
3627 108222 : bool hit_eof = false; /* set once CopyLoadRawBuf() returns false */
3628 108222 : bool result = false; /* return value; true means EOF or \. seen */
3629 : char mblen_str[2]; /* lead byte plus NUL, for pg_encoding_mblen() */
3630 :
3631 : /* CSV variables */
3632 108222 : bool first_char_in_line = true;
3633 108222 : bool in_quote = false,
3634 108222 : last_was_esc = false;
3635 108222 : char quotec = '\0';
3636 108222 : char escapec = '\0';
3637 :
3638 108222 : if (cstate->csv_mode)
3639 : {
3640 47 : quotec = cstate->quote[0];
3641 47 : escapec = cstate->escape[0];
3642 : /* ignore special escape processing if it's the same as quotec */
3643 47 : if (quotec == escapec)
3644 42 : escapec = '\0';
3645 : }
3646 :
3647 108222 : mblen_str[1] = '\0';
3648 :
3649 : /*
3650 : * The objective of this loop is to transfer the entire next input line
3651 : * into line_buf. Hence, we only care for detecting newlines (\r and/or
3652 : * \n) and the end-of-copy marker (\.).
3653 : *
3654 : * In CSV mode, \r and \n inside a quoted field are just part of the data
3655 : * value and are put in line_buf. We keep just enough state to know if we
3656 : * are currently in a quoted field or not.
3657 : *
3658 : * These four characters, and the CSV escape and quote characters, are
3659 : * assumed the same in frontend and backend encodings.
3660 : *
3661 : * For speed, we try to move data from raw_buf to line_buf in chunks
3662 : * rather than one character at a time. raw_buf_ptr points to the next
3663 : * character to examine; any characters from raw_buf_index to raw_buf_ptr
3664 : * have been determined to be part of the line, but not yet transferred to
3665 : * line_buf.
3666 : *
3667 : * For a little extra speed within the loop, we copy raw_buf and
3668 : * raw_buf_len into local variables.
3669 : */
3670 108222 : copy_raw_buf = cstate->raw_buf;
3671 108222 : raw_buf_ptr = cstate->raw_buf_index;
3672 108222 : copy_buf_len = cstate->raw_buf_len;
3673 :
3674 : for (;;)
3675 : {
3676 : int prev_raw_ptr;
3677 : char c;
3678 :
3679 : /*
3680 : * Load more data if needed. Ideally we would just force four bytes
3681 : * of read-ahead and avoid the many calls to
3682 : * IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(), but the COPY_OLD_FE protocol
3683 : * does not allow us to read too far ahead or we might read into the
3684 : * next data, so we read ahead only as far as we know we can. One
3685 : * optimization would be to read ahead four bytes here if
3686 : * cstate->copy_dest != COPY_OLD_FE, but it hardly seems worth it,
3687 : * considering the size of the buffer.
3688 : */
3689 2957371 : if (raw_buf_ptr >= copy_buf_len || need_data)
3690 : {
3691 373 : REFILL_LINEBUF;
3692 :
3693 : /*
3694 : * Try to read some more data. This will certainly reset
3695 : * raw_buf_index to zero, and raw_buf_ptr must go with it.
3696 : */
3697 373 : if (!CopyLoadRawBuf(cstate))
3698 92 : hit_eof = true;
3699 373 : raw_buf_ptr = 0;
3700 373 : copy_buf_len = cstate->raw_buf_len;
3701 :
3702 : /*
3703 : * If we are completely out of data, break out of the loop,
3704 : * reporting EOF.
3705 : */
3706 373 : if (copy_buf_len <= 0)
3707 : {
3708 92 : result = true;
3709 92 : break;
3710 : }
3711 281 : need_data = false;
3712 : }
3713 :
3714 : /* OK to fetch a character */
3715 2957279 : prev_raw_ptr = raw_buf_ptr;
3716 2957279 : c = copy_raw_buf[raw_buf_ptr++];
3717 :
3718 2957279 : if (cstate->csv_mode)
3719 : {
3720 : /*
3721 : * If character is '\\' or '\r', we may need to look ahead below.
3722 : * Force fetch of the next character if we don't already have it.
3723 : * We need to do this before changing CSV state, in case one of
3724 : * these characters is also the quote or escape character.
3725 : *
3726 : * Note: old-protocol does not like forced prefetch, but it's OK
3727 : * here since we cannot validly be at EOF.
3728 : */
3729 335 : if (c == '\\' || c == '\r')
3730 : {
3731 24 : IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0);
3732 : }
3733 :
3734 : /*
3735 : * Dealing with quotes and escapes here is mildly tricky. If the
3736 : * quote char is also the escape char, there's no problem - we
3737 : * just use the char as a toggle. If they are different, we need
3738 : * to ensure that we only take account of an escape inside a
3739 : * quoted field and immediately preceding a quote char, and not
3740 : * the second in an escape-escape sequence.
3741 : */
3742 335 : if (in_quote && c == escapec)
3743 8 : last_was_esc = !last_was_esc;
3744 335 : if (c == quotec && !last_was_esc)
3745 26 : in_quote = !in_quote;
3746 335 : if (c != escapec)
3747 326 : last_was_esc = false;
3748 :
3749 : /*
3750 : * Updating the line count for embedded CR and/or LF chars is
3751 : * necessarily a little fragile - this test is probably about the
3752 : * best we can do. (XXX it's arguable whether we should do this
3753 : * at all --- is cur_lineno a physical or logical count?)
3754 : */
3755 335 : if (in_quote && c == (cstate->eol_type == EOL_NL ? '\n' : '\r'))
3756 6 : cstate->cur_lineno++;
3757 : }
3758 :
3759 : /* Process \r */
3760 2957279 : if (c == '\r' && (!cstate->csv_mode || !in_quote))
3761 : {
3762 : /* Check for \r\n on first line, _and_ handle \r\n. */
3763 0 : if (cstate->eol_type == EOL_UNKNOWN ||
3764 0 : cstate->eol_type == EOL_CRNL)
3765 : {
3766 : /*
3767 : * If need more data, go back to loop top to load it.
3768 : *
3769 : * Note that if we are at EOF, c will wind up as '\0' because
3770 : * of the guaranteed pad of raw_buf.
3771 : */
3772 0 : IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0);
3773 :
3774 : /* get next char */
3775 0 : c = copy_raw_buf[raw_buf_ptr];
3776 :
3777 0 : if (c == '\n')
3778 : {
3779 0 : raw_buf_ptr++; /* eat newline */
3780 0 : cstate->eol_type = EOL_CRNL; /* in case not set yet */
3781 : }
3782 : else
3783 : {
3784 : /* found \r, but no \n */
3785 0 : if (cstate->eol_type == EOL_CRNL)
3786 0 : ereport(ERROR,
3787 : (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3788 : !cstate->csv_mode ?
3789 : errmsg("literal carriage return found in data") :
3790 : errmsg("unquoted carriage return found in data"),
3791 : !cstate->csv_mode ?
3792 : errhint("Use \"\\r\" to represent carriage return.") :
3793 : errhint("Use quoted CSV field to represent carriage return.")));
3794 :
3795 : /*
3796 : * if we got here, it is the first line and we didn't find
3797 : * \n, so don't consume the peeked character
3798 : */
3799 0 : cstate->eol_type = EOL_CR;
3800 : }
3801 : }
3802 0 : else if (cstate->eol_type == EOL_NL)
3803 0 : ereport(ERROR,
3804 : (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3805 : !cstate->csv_mode ?
3806 : errmsg("literal carriage return found in data") :
3807 : errmsg("unquoted carriage return found in data"),
3808 : !cstate->csv_mode ?
3809 : errhint("Use \"\\r\" to represent carriage return.") :
3810 : errhint("Use quoted CSV field to represent carriage return.")));
3811 : /* If reach here, we have found the line terminator */
3812 0 : break;
3813 : }
3814 :
3815 : /* Process \n */
3816 2957279 : if (c == '\n' && (!cstate->csv_mode || !in_quote))
3817 : {
3818 108130 : if (cstate->eol_type == EOL_CR || cstate->eol_type == EOL_CRNL)
3819 0 : ereport(ERROR,
3820 : (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3821 : !cstate->csv_mode ?
3822 : errmsg("literal newline found in data") :
3823 : errmsg("unquoted newline found in data"),
3824 : !cstate->csv_mode ?
3825 : errhint("Use \"\\n\" to represent newline.") :
3826 : errhint("Use quoted CSV field to represent newline.")));
3827 108130 : cstate->eol_type = EOL_NL; /* in case not set yet */
3828 : /* If reach here, we have found the line terminator */
3829 108130 : break;
3830 : }
3831 :
3832 : /*
3833 : * In CSV mode, we only recognize \. alone on a line. This is because
3834 : * \. is a valid CSV data value.
3835 : */
3836 2849149 : if (c == '\\' && (!cstate->csv_mode || first_char_in_line))
3837 : {
3838 : char c2;
3839 :
3840 838 : IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0);
3841 838 : IF_NEED_REFILL_AND_EOF_BREAK(0);
3842 :
3843 : /* -----
3844 : * get next character
3845 : * Note: we do not change c so if it isn't \., we can fall
3846 : * through and continue processing for file encoding.
3847 : * -----
3848 : */
3849 838 : c2 = copy_raw_buf[raw_buf_ptr];
3850 :
3851 838 : if (c2 == '.')
3852 : {
3853 1 : raw_buf_ptr++; /* consume the '.' */
3854 :
3855 : /*
3856 : * Note: if we loop back for more data here, it does not
3857 : * matter that the CSV state change checks are re-executed; we
3858 : * will come back here with no important state changed.
3859 : */
3860 1 : if (cstate->eol_type == EOL_CRNL)
3861 : {
3862 : /* Get the next character */
3863 0 : IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0);
3864 : /* if hit_eof, c2 will become '\0' */
3865 0 : c2 = copy_raw_buf[raw_buf_ptr++];
3866 :
3867 0 : if (c2 == '\n')
3868 : {
3869 0 : if (!cstate->csv_mode)
3870 0 : ereport(ERROR,
3871 : (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3872 : errmsg("end-of-copy marker does not match previous newline style")));
3873 : else
3874 0 : NO_END_OF_COPY_GOTO;
3875 : }
3876 0 : else if (c2 != '\r')
3877 : {
3878 0 : if (!cstate->csv_mode)
3879 0 : ereport(ERROR,
3880 : (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3881 : errmsg("end-of-copy marker corrupt")));
3882 : else
3883 0 : NO_END_OF_COPY_GOTO;
3884 : }
3885 : }
3886 :
3887 : /* Get the next character */
3888 1 : IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0);
3889 : /* if hit_eof, c2 will become '\0' */
3890 1 : c2 = copy_raw_buf[raw_buf_ptr++];
3891 :
3892 1 : if (c2 != '\r' && c2 != '\n')
3893 : {
3894 1 : if (!cstate->csv_mode)
3895 0 : ereport(ERROR,
3896 : (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3897 : errmsg("end-of-copy marker corrupt")));
3898 : else
3899 1 : NO_END_OF_COPY_GOTO;
3900 : }
3901 :
3902 0 : if ((cstate->eol_type == EOL_NL && c2 != '\n') ||
3903 0 : (cstate->eol_type == EOL_CRNL && c2 != '\n') ||
3904 0 : (cstate->eol_type == EOL_CR && c2 != '\r'))
3905 : {
3906 0 : ereport(ERROR,
3907 : (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
3908 : errmsg("end-of-copy marker does not match previous newline style")));
3909 : }
3910 :
3911 : /*
3912 : * Transfer only the data before the \. into line_buf, then
3913 : * discard the data and the \. sequence.
3914 : */
3915 0 : if (prev_raw_ptr > cstate->raw_buf_index)
3916 0 : appendBinaryStringInfo(&cstate->line_buf,
3917 0 : cstate->raw_buf + cstate->raw_buf_index,
3918 0 : prev_raw_ptr - cstate->raw_buf_index);
3919 0 : cstate->raw_buf_index = raw_buf_ptr;
3920 0 : result = true; /* report EOF */
3921 0 : break;
3922 : }
3923 837 : else if (!cstate->csv_mode)
3924 :
3925 : /*
3926 : * If we are here, it means we found a backslash followed by
3927 : * something other than a period. In non-CSV mode, anything
3928 : * after a backslash is special, so we skip over that second
3929 : * character too. If we didn't do that, \\. would be
3930 : * considered an end-of-copy marker, while in non-CSV mode it
3931 : * is a literal backslash followed by a period. In CSV mode,
3932 : * backslashes are not special, so we want to process the
3933 : * character after the backslash just like a normal character,
3934 : * so we don't increment in those cases.
3935 : */
3936 837 : raw_buf_ptr++;
3937 : }
3938 :
3939 : /*
3940 : * This label is for CSV cases where \. appears at the start of a
3941 : * line, but there is more text after it, meaning it was a data value.
3942 : * We are more strict for \. in CSV mode because \. could be a data
3943 : * value, while in non-CSV mode, \. cannot be a data value.
3944 : */
3945 : not_end_of_copy:
3946 :
3947 : /*
3948 : * Process all bytes of a multi-byte character as a group.
3949 : *
3950 : * We only support multi-byte sequences where the first byte has the
3951 : * high-bit set, so as an optimization we can avoid this block
3952 : * entirely if it is not set.
3953 : */
3954 2849149 : if (cstate->encoding_embeds_ascii && IS_HIGHBIT_SET(c))
3955 : {
3956 : int mblen;
3957 :
3958 0 : mblen_str[0] = c;
3959 : /* All our encodings only read the first byte to get the length */
3960 0 : mblen = pg_encoding_mblen(cstate->file_encoding, mblen_str);
3961 0 : IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(mblen - 1);
3962 0 : IF_NEED_REFILL_AND_EOF_BREAK(mblen - 1);
3963 0 : raw_buf_ptr += mblen - 1;
3964 : }
3965 2849149 : first_char_in_line = false;
3966 2849149 : } /* end of outer loop */
3967 :
3968 : /*
3969 : * Transfer any still-uncopied data to line_buf.
3970 : */
3971 108222 : REFILL_LINEBUF;
3972 :
3973 108222 : return result;
3974 : }
3975 :
/*
 * GetDecimalFromHex - numeric value of a single hexadecimal digit
 *
 * '0'..'9' yield 0..9.  Any other character is lower-cased and measured as
 * an offset from 'a', plus 10, so 'a'/'A' yield 10 and 'f'/'F' yield 15.
 * No validation is done here; the callers in this file test the character
 * with isxdigit() before calling.
 */
static int
GetDecimalFromHex(char hex)
{
	unsigned char uc = (unsigned char) hex;

	return isdigit(uc) ? (hex - '0') : (tolower(uc) - 'a' + 10);
}
3987 :
3988 : /*
3989 : * Parse the current line into separate attributes (fields),
3990 : * performing de-escaping as needed.
3991 : *
3992 : * The input is in line_buf. We use attribute_buf to hold the result
3993 : * strings. cstate->raw_fields[k] is set to point to the k'th attribute
3994 : * string, or NULL when the input matches the null marker string.
3995 : * This array is expanded as necessary.
3996 : *
3997 : * (Note that the caller cannot check for nulls since the returned
3998 : * string would be the post-de-escaping equivalent, which may look
3999 : * the same as some valid data string.)
4000 : *
4001 : * delim is the column delimiter string (must be just one byte for now).
4002 : * null_print is the null marker string. Note that this is compared to
4003 : * the pre-de-escaped input string.
4004 : *
4005 : * The return value is the number of fields actually read.
4006 : */
4007 : static int
4008 108097 : CopyReadAttributesText(CopyState cstate)
4009 : {
4010 108097 : char delimc = cstate->delim[0];
4011 : int fieldno;
4012 : char *output_ptr;
4013 : char *cur_ptr;
4014 : char *line_end_ptr;
4015 :
4016 : /*
4017 : * We need a special case for zero-column tables: check that the input
4018 : * line is empty, and return.
4019 : */
4020 108097 : if (cstate->max_fields <= 0)
4021 : {
4022 0 : if (cstate->line_buf.len != 0)
4023 0 : ereport(ERROR,
4024 : (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
4025 : errmsg("extra data after last expected column")));
4026 0 : return 0;
4027 : }
4028 :
4029 108097 : resetStringInfo(&cstate->attribute_buf);
4030 :
4031 : /*
4032 : * The de-escaped attributes will certainly not be longer than the input
4033 : * data line, so we can just force attribute_buf to be large enough and
4034 : * then transfer data without any checks for enough space. We need to do
4035 : * it this way because enlarging attribute_buf mid-stream would invalidate
4036 : * pointers already stored into cstate->raw_fields[].
4037 : */
4038 108097 : if (cstate->attribute_buf.maxlen <= cstate->line_buf.len)
4039 0 : enlargeStringInfo(&cstate->attribute_buf, cstate->line_buf.len);
4040 108097 : output_ptr = cstate->attribute_buf.data;
4041 :
4042 : /* set pointer variables for loop */
4043 108097 : cur_ptr = cstate->line_buf.data;
4044 108097 : line_end_ptr = cstate->line_buf.data + cstate->line_buf.len;
4045 :
4046 : /* Outer loop iterates over fields */
4047 108097 : fieldno = 0;
4048 : for (;;)
4049 : {
4050 416802 : bool found_delim = false;
4051 : char *start_ptr;
4052 : char *end_ptr;
4053 : int input_len;
4054 416802 : bool saw_non_ascii = false;
4055 :
4056 : /* Make sure there is enough space for the next value */
4057 416802 : if (fieldno >= cstate->max_fields)
4058 : {
4059 2 : cstate->max_fields *= 2;
4060 2 : cstate->raw_fields =
4061 2 : repalloc(cstate->raw_fields, cstate->max_fields * sizeof(char *));
4062 : }
4063 :
4064 : /* Remember start of field on both input and output sides */
4065 416802 : start_ptr = cur_ptr;
4066 416802 : cstate->raw_fields[fieldno] = output_ptr;
4067 :
4068 : /*
4069 : * Scan data for field.
4070 : *
4071 : * Note that in this loop, we are scanning to locate the end of field
4072 : * and also speculatively performing de-escaping. Once we find the
4073 : * end-of-field, we can match the raw field contents against the null
4074 : * marker string. Only after that comparison fails do we know that
4075 : * de-escaping is actually the right thing to do; therefore we *must
4076 : * not* throw any syntax errors before we've done the null-marker
4077 : * check.
4078 : */
4079 : for (;;)
4080 : {
4081 : char c;
4082 :
4083 2956944 : end_ptr = cur_ptr;
4084 2956944 : if (cur_ptr >= line_end_ptr)
4085 108097 : break;
4086 2848847 : c = *cur_ptr++;
4087 2848847 : if (c == delimc)
4088 : {
4089 308705 : found_delim = true;
4090 308705 : break;
4091 : }
4092 2540142 : if (c == '\\')
4093 : {
4094 837 : if (cur_ptr >= line_end_ptr)
4095 0 : break;
4096 837 : c = *cur_ptr++;
4097 837 : switch (c)
4098 : {
4099 : case '0':
4100 : case '1':
4101 : case '2':
4102 : case '3':
4103 : case '4':
4104 : case '5':
4105 : case '6':
4106 : case '7':
4107 : {
4108 : /* handle \013 */
4109 : int val;
4110 :
4111 2 : val = OCTVALUE(c);
4112 2 : if (cur_ptr < line_end_ptr)
4113 : {
4114 1 : c = *cur_ptr;
4115 1 : if (ISOCTAL(c))
4116 : {
4117 0 : cur_ptr++;
4118 0 : val = (val << 3) + OCTVALUE(c);
4119 0 : if (cur_ptr < line_end_ptr)
4120 : {
4121 0 : c = *cur_ptr;
4122 0 : if (ISOCTAL(c))
4123 : {
4124 0 : cur_ptr++;
4125 0 : val = (val << 3) + OCTVALUE(c);
4126 : }
4127 : }
4128 : }
4129 : }
4130 2 : c = val & 0377;
4131 2 : if (c == '\0' || IS_HIGHBIT_SET(c))
4132 2 : saw_non_ascii = true;
4133 : }
4134 2 : break;
4135 : case 'x':
4136 : /* Handle \x3F */
4137 2 : if (cur_ptr < line_end_ptr)
4138 : {
4139 1 : char hexchar = *cur_ptr;
4140 :
4141 1 : if (isxdigit((unsigned char) hexchar))
4142 : {
4143 0 : int val = GetDecimalFromHex(hexchar);
4144 :
4145 0 : cur_ptr++;
4146 0 : if (cur_ptr < line_end_ptr)
4147 : {
4148 0 : hexchar = *cur_ptr;
4149 0 : if (isxdigit((unsigned char) hexchar))
4150 : {
4151 0 : cur_ptr++;
4152 0 : val = (val << 4) + GetDecimalFromHex(hexchar);
4153 : }
4154 : }
4155 0 : c = val & 0xff;
4156 0 : if (c == '\0' || IS_HIGHBIT_SET(c))
4157 0 : saw_non_ascii = true;
4158 : }
4159 : }
4160 2 : break;
4161 : case 'b':
4162 0 : c = '\b';
4163 0 : break;
4164 : case 'f':
4165 0 : c = '\f';
4166 0 : break;
4167 : case 'n':
4168 508 : c = '\n';
4169 508 : break;
4170 : case 'r':
4171 0 : c = '\r';
4172 0 : break;
4173 : case 't':
4174 0 : c = '\t';
4175 0 : break;
4176 : case 'v':
4177 0 : c = '\v';
4178 0 : break;
4179 :
4180 : /*
4181 : * in all other cases, take the char after '\'
4182 : * literally
4183 : */
4184 : }
4185 : }
4186 :
4187 : /* Add c to output string */
4188 2540142 : *output_ptr++ = c;
4189 2540142 : }
4190 :
4191 : /* Check whether raw input matched null marker */
4192 416802 : input_len = end_ptr - start_ptr;
4193 457348 : if (input_len == cstate->null_print_len &&
4194 40546 : strncmp(start_ptr, cstate->null_print, input_len) == 0)
4195 319 : cstate->raw_fields[fieldno] = NULL;
4196 : else
4197 : {
4198 : /*
4199 : * At this point we know the field is supposed to contain data.
4200 : *
4201 : * If we de-escaped any non-7-bit-ASCII chars, make sure the
4202 : * resulting string is valid data for the db encoding.
4203 : */
4204 416483 : if (saw_non_ascii)
4205 : {
4206 0 : char *fld = cstate->raw_fields[fieldno];
4207 :
4208 0 : pg_verifymbstr(fld, output_ptr - fld, false);
4209 : }
4210 : }
4211 :
4212 : /* Terminate attribute value in output area */
4213 416802 : *output_ptr++ = '\0';
4214 :
4215 416802 : fieldno++;
4216 : /* Done if we hit EOL instead of a delim */
4217 416802 : if (!found_delim)
4218 108097 : break;
4219 308705 : }
4220 :
4221 : /* Clean up state of attribute_buf */
4222 108097 : output_ptr--;
4223 108097 : Assert(*output_ptr == '\0');
4224 108097 : cstate->attribute_buf.len = (output_ptr - cstate->attribute_buf.data);
4225 :
4226 108097 : return fieldno;
4227 : }
4228 :
4229 : /*
4230 : * Parse the current line into separate attributes (fields),
4231 : * performing de-escaping as needed. This has exactly the same API as
4232 : * CopyReadAttributesText, except we parse the fields according to
4233 : * "standard" (i.e. common) CSV usage.
4234 : */
4235 : static int
4236 32 : CopyReadAttributesCSV(CopyState cstate)
4237 : {
4238 32 : char delimc = cstate->delim[0];
4239 32 : char quotec = cstate->quote[0];
4240 32 : char escapec = cstate->escape[0];
4241 : int fieldno;
4242 : char *output_ptr;
4243 : char *cur_ptr;
4244 : char *line_end_ptr;
4245 :
4246 : /*
4247 : * We need a special case for zero-column tables: check that the input
4248 : * line is empty, and return.
4249 : */
4250 32 : if (cstate->max_fields <= 0)
4251 : {
4252 0 : if (cstate->line_buf.len != 0)
4253 0 : ereport(ERROR,
4254 : (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
4255 : errmsg("extra data after last expected column")));
4256 0 : return 0;
4257 : }
4258 :
4259 32 : resetStringInfo(&cstate->attribute_buf);
4260 :
4261 : /*
4262 : * The de-escaped attributes will certainly not be longer than the input
4263 : * data line, so we can just force attribute_buf to be large enough and
4264 : * then transfer data without any checks for enough space. We need to do
4265 : * it this way because enlarging attribute_buf mid-stream would invalidate
4266 : * pointers already stored into cstate->raw_fields[].
4267 : */
4268 32 : if (cstate->attribute_buf.maxlen <= cstate->line_buf.len)
4269 0 : enlargeStringInfo(&cstate->attribute_buf, cstate->line_buf.len);
4270 32 : output_ptr = cstate->attribute_buf.data;
4271 :
4272 : /* set pointer variables for loop */
4273 32 : cur_ptr = cstate->line_buf.data;
4274 32 : line_end_ptr = cstate->line_buf.data + cstate->line_buf.len;
4275 :
4276 : /* Outer loop iterates over fields */
4277 32 : fieldno = 0;
4278 : for (;;)
4279 : {
4280 61 : bool found_delim = false;
4281 61 : bool saw_quote = false;
4282 : char *start_ptr;
4283 : char *end_ptr;
4284 : int input_len;
4285 :
4286 : /* Make sure there is enough space for the next value */
4287 61 : if (fieldno >= cstate->max_fields)
4288 : {
4289 0 : cstate->max_fields *= 2;
4290 0 : cstate->raw_fields =
4291 0 : repalloc(cstate->raw_fields, cstate->max_fields * sizeof(char *));
4292 : }
4293 :
4294 : /* Remember start of field on both input and output sides */
4295 61 : start_ptr = cur_ptr;
4296 61 : cstate->raw_fields[fieldno] = output_ptr;
4297 :
4298 : /*
4299 : * Scan data for field,
4300 : *
4301 : * The loop starts in "not quote" mode and then toggles between that
4302 : * and "in quote" mode. The loop exits normally if it is in "not
4303 : * quote" mode and a delimiter or line end is seen.
4304 : */
4305 : for (;;)
4306 : {
4307 : char c;
4308 :
4309 : /* Not in quote */
4310 : for (;;)
4311 : {
4312 160 : end_ptr = cur_ptr;
4313 160 : if (cur_ptr >= line_end_ptr)
4314 32 : goto endfield;
4315 128 : c = *cur_ptr++;
4316 : /* unquoted field delimiter */
4317 128 : if (c == delimc)
4318 : {
4319 29 : found_delim = true;
4320 29 : goto endfield;
4321 : }
4322 : /* start of quoted field (or part of field) */
4323 99 : if (c == quotec)
4324 : {
4325 13 : saw_quote = true;
4326 13 : break;
4327 : }
4328 : /* Add c to output string */
4329 86 : *output_ptr++ = c;
4330 86 : }
4331 :
4332 : /* In quote */
4333 : for (;;)
4334 : {
4335 107 : end_ptr = cur_ptr;
4336 107 : if (cur_ptr >= line_end_ptr)
4337 0 : ereport(ERROR,
4338 : (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
4339 : errmsg("unterminated CSV quoted field")));
4340 :
4341 107 : c = *cur_ptr++;
4342 :
4343 : /* escape within a quoted field */
4344 107 : if (c == escapec)
4345 : {
4346 : /*
4347 : * peek at the next char if available, and escape it if it
4348 : * is an escape char or a quote char
4349 : */
4350 13 : if (cur_ptr < line_end_ptr)
4351 : {
4352 9 : char nextc = *cur_ptr;
4353 :
4354 9 : if (nextc == escapec || nextc == quotec)
4355 : {
4356 4 : *output_ptr++ = nextc;
4357 4 : cur_ptr++;
4358 4 : continue;
4359 : }
4360 : }
4361 : }
4362 :
4363 : /*
4364 : * end of quoted field. Must do this test after testing for
4365 : * escape in case quote char and escape char are the same
4366 : * (which is the common case).
4367 : */
4368 103 : if (c == quotec)
4369 13 : break;
4370 :
4371 : /* Add c to output string */
4372 90 : *output_ptr++ = c;
4373 94 : }
4374 13 : }
4375 : endfield:
4376 :
4377 : /* Terminate attribute value in output area */
4378 61 : *output_ptr++ = '\0';
4379 :
4380 : /* Check whether raw input matched null marker */
4381 61 : input_len = end_ptr - start_ptr;
4382 64 : if (!saw_quote && input_len == cstate->null_print_len &&
4383 3 : strncmp(start_ptr, cstate->null_print, input_len) == 0)
4384 3 : cstate->raw_fields[fieldno] = NULL;
4385 :
4386 61 : fieldno++;
4387 : /* Done if we hit EOL instead of a delim */
4388 61 : if (!found_delim)
4389 32 : break;
4390 29 : }
4391 :
4392 : /* Clean up state of attribute_buf */
4393 32 : output_ptr--;
4394 32 : Assert(*output_ptr == '\0');
4395 32 : cstate->attribute_buf.len = (output_ptr - cstate->attribute_buf.data);
4396 :
4397 32 : return fieldno;
4398 : }
4399 :
4400 :
4401 : /*
4402 : * Read a binary attribute
4403 : */
4404 : static Datum
4405 21 : CopyReadBinaryAttribute(CopyState cstate,
4406 : int column_no, FmgrInfo *flinfo,
4407 : Oid typioparam, int32 typmod,
4408 : bool *isnull)
4409 : {
4410 : int32 fld_size;
4411 : Datum result;
4412 :
4413 21 : if (!CopyGetInt32(cstate, &fld_size))
4414 0 : ereport(ERROR,
4415 : (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
4416 : errmsg("unexpected EOF in COPY data")));
4417 21 : if (fld_size == -1)
4418 : {
4419 5 : *isnull = true;
4420 5 : return ReceiveFunctionCall(flinfo, NULL, typioparam, typmod);
4421 : }
4422 16 : if (fld_size < 0)
4423 0 : ereport(ERROR,
4424 : (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
4425 : errmsg("invalid field size")));
4426 :
4427 : /* reset attribute_buf to empty, and load raw data in it */
4428 16 : resetStringInfo(&cstate->attribute_buf);
4429 :
4430 16 : enlargeStringInfo(&cstate->attribute_buf, fld_size);
4431 32 : if (CopyGetData(cstate, cstate->attribute_buf.data,
4432 16 : fld_size, fld_size) != fld_size)
4433 0 : ereport(ERROR,
4434 : (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
4435 : errmsg("unexpected EOF in COPY data")));
4436 :
4437 16 : cstate->attribute_buf.len = fld_size;
4438 16 : cstate->attribute_buf.data[fld_size] = '\0';
4439 :
4440 : /* Call the column type's binary input converter */
4441 16 : result = ReceiveFunctionCall(flinfo, &cstate->attribute_buf,
4442 : typioparam, typmod);
4443 :
4444 : /* Trouble if it didn't eat the whole buffer */
4445 16 : if (cstate->attribute_buf.cursor != cstate->attribute_buf.len)
4446 0 : ereport(ERROR,
4447 : (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
4448 : errmsg("incorrect binary data format")));
4449 :
4450 16 : *isnull = false;
4451 16 : return result;
4452 : }
4453 :
4454 : /*
4455 : * Send text representation of one attribute, with conversion and escaping
4456 : */
4457 : #define DUMPSOFAR() \
4458 : do { \
4459 : if (ptr > start) \
4460 : CopySendData(cstate, start, ptr - start); \
4461 : } while (0)
4462 :
4463 : static void
4464 32462 : CopyAttributeOutText(CopyState cstate, char *string)
4465 : {
4466 : char *ptr;
4467 : char *start;
4468 : char c;
4469 32462 : char delimc = cstate->delim[0];
4470 :
4471 32462 : if (cstate->need_transcoding)
4472 32462 : ptr = pg_server_to_any(string, strlen(string), cstate->file_encoding);
4473 : else
4474 0 : ptr = string;
4475 :
4476 : /*
4477 : * We have to grovel through the string searching for control characters
4478 : * and instances of the delimiter character. In most cases, though, these
4479 : * are infrequent. To avoid overhead from calling CopySendData once per
4480 : * character, we dump out all characters between escaped characters in a
4481 : * single call. The loop invariant is that the data from "start" to "ptr"
4482 : * can be sent literally, but hasn't yet been.
4483 : *
4484 : * We can skip pg_encoding_mblen() overhead when encoding is safe, because
4485 : * in valid backend encodings, extra bytes of a multibyte character never
4486 : * look like ASCII. This loop is sufficiently performance-critical that
4487 : * it's worth making two copies of it to get the IS_HIGHBIT_SET() test out
4488 : * of the normal safe-encoding path.
4489 : */
4490 32462 : if (cstate->encoding_embeds_ascii)
4491 : {
4492 0 : start = ptr;
4493 0 : while ((c = *ptr) != '\0')
4494 : {
4495 0 : if ((unsigned char) c < (unsigned char) 0x20)
4496 : {
4497 : /*
4498 : * \r and \n must be escaped, the others are traditional. We
4499 : * prefer to dump these using the C-like notation, rather than
4500 : * a backslash and the literal character, because it makes the
4501 : * dump file a bit more proof against Microsoftish data
4502 : * mangling.
4503 : */
4504 0 : switch (c)
4505 : {
4506 : case '\b':
4507 0 : c = 'b';
4508 0 : break;
4509 : case '\f':
4510 0 : c = 'f';
4511 0 : break;
4512 : case '\n':
4513 0 : c = 'n';
4514 0 : break;
4515 : case '\r':
4516 0 : c = 'r';
4517 0 : break;
4518 : case '\t':
4519 0 : c = 't';
4520 0 : break;
4521 : case '\v':
4522 0 : c = 'v';
4523 0 : break;
4524 : default:
4525 : /* If it's the delimiter, must backslash it */
4526 0 : if (c == delimc)
4527 0 : break;
4528 : /* All ASCII control chars are length 1 */
4529 0 : ptr++;
4530 0 : continue; /* fall to end of loop */
4531 : }
4532 : /* if we get here, we need to convert the control char */
4533 0 : DUMPSOFAR();
4534 0 : CopySendChar(cstate, '\\');
4535 0 : CopySendChar(cstate, c);
4536 0 : start = ++ptr; /* do not include char in next run */
4537 : }
4538 0 : else if (c == '\\' || c == delimc)
4539 : {
4540 0 : DUMPSOFAR();
4541 0 : CopySendChar(cstate, '\\');
4542 0 : start = ptr++; /* we include char in next run */
4543 : }
4544 0 : else if (IS_HIGHBIT_SET(c))
4545 0 : ptr += pg_encoding_mblen(cstate->file_encoding, ptr);
4546 : else
4547 0 : ptr++;
4548 : }
4549 : }
4550 : else
4551 : {
4552 32462 : start = ptr;
4553 153348 : while ((c = *ptr) != '\0')
4554 : {
4555 88424 : if ((unsigned char) c < (unsigned char) 0x20)
4556 : {
4557 : /*
4558 : * \r and \n must be escaped, the others are traditional. We
4559 : * prefer to dump these using the C-like notation, rather than
4560 : * a backslash and the literal character, because it makes the
4561 : * dump file a bit more proof against Microsoftish data
4562 : * mangling.
4563 : */
4564 0 : switch (c)
4565 : {
4566 : case '\b':
4567 0 : c = 'b';
4568 0 : break;
4569 : case '\f':
4570 0 : c = 'f';
4571 0 : break;
4572 : case '\n':
4573 0 : c = 'n';
4574 0 : break;
4575 : case '\r':
4576 0 : c = 'r';
4577 0 : break;
4578 : case '\t':
4579 0 : c = 't';
4580 0 : break;
4581 : case '\v':
4582 0 : c = 'v';
4583 0 : break;
4584 : default:
4585 : /* If it's the delimiter, must backslash it */
4586 0 : if (c == delimc)
4587 0 : break;
4588 : /* All ASCII control chars are length 1 */
4589 0 : ptr++;
4590 0 : continue; /* fall to end of loop */
4591 : }
4592 : /* if we get here, we need to convert the control char */
4593 0 : DUMPSOFAR();
4594 0 : CopySendChar(cstate, '\\');
4595 0 : CopySendChar(cstate, c);
4596 0 : start = ++ptr; /* do not include char in next run */
4597 : }
4598 88424 : else if (c == '\\' || c == delimc)
4599 : {
4600 8 : DUMPSOFAR();
4601 8 : CopySendChar(cstate, '\\');
4602 8 : start = ptr++; /* we include char in next run */
4603 : }
4604 : else
4605 88416 : ptr++;
4606 : }
4607 : }
4608 :
4609 32462 : DUMPSOFAR();
4610 32462 : }
4611 :
4612 : /*
4613 : * Send text representation of one attribute, with conversion and
4614 : * CSV-style escaping
4615 : */
4616 : static void
4617 99 : CopyAttributeOutCSV(CopyState cstate, char *string,
4618 : bool use_quote, bool single_attr)
4619 : {
4620 : char *ptr;
4621 : char *start;
4622 : char c;
4623 99 : char delimc = cstate->delim[0];
4624 99 : char quotec = cstate->quote[0];
4625 99 : char escapec = cstate->escape[0];
4626 :
4627 : /* force quoting if it matches null_print (before conversion!) */
4628 99 : if (!use_quote && strcmp(string, cstate->null_print) == 0)
4629 9 : use_quote = true;
4630 :
4631 99 : if (cstate->need_transcoding)
4632 99 : ptr = pg_server_to_any(string, strlen(string), cstate->file_encoding);
4633 : else
4634 0 : ptr = string;
4635 :
4636 : /*
4637 : * Make a preliminary pass to discover if it needs quoting
4638 : */
4639 99 : if (!use_quote)
4640 : {
4641 : /*
4642 : * Because '\.' can be a data value, quote it if it appears alone on a
4643 : * line so it is not interpreted as the end-of-data marker.
4644 : */
4645 68 : if (single_attr && strcmp(ptr, "\\.") == 0)
4646 1 : use_quote = true;
4647 : else
4648 : {
4649 67 : char *tptr = ptr;
4650 :
4651 419 : while ((c = *tptr) != '\0')
4652 : {
4653 307 : if (c == delimc || c == quotec || c == '\n' || c == '\r')
4654 : {
4655 22 : use_quote = true;
4656 22 : break;
4657 : }
4658 285 : if (IS_HIGHBIT_SET(c) && cstate->encoding_embeds_ascii)
4659 0 : tptr += pg_encoding_mblen(cstate->file_encoding, tptr);
4660 : else
4661 285 : tptr++;
4662 : }
4663 : }
4664 : }
4665 :
4666 99 : if (use_quote)
4667 : {
4668 54 : CopySendChar(cstate, quotec);
4669 :
4670 : /*
4671 : * We adopt the same optimization strategy as in CopyAttributeOutText
4672 : */
4673 54 : start = ptr;
4674 477 : while ((c = *ptr) != '\0')
4675 : {
4676 369 : if (c == quotec || c == escapec)
4677 : {
4678 26 : DUMPSOFAR();
4679 26 : CopySendChar(cstate, escapec);
4680 26 : start = ptr; /* we include char in next run */
4681 : }
4682 369 : if (IS_HIGHBIT_SET(c) && cstate->encoding_embeds_ascii)
4683 0 : ptr += pg_encoding_mblen(cstate->file_encoding, ptr);
4684 : else
4685 369 : ptr++;
4686 : }
4687 54 : DUMPSOFAR();
4688 :
4689 54 : CopySendChar(cstate, quotec);
4690 : }
4691 : else
4692 : {
4693 : /* If it doesn't need quoting, we can just dump it as-is */
4694 45 : CopySendString(cstate, ptr);
4695 : }
4696 99 : }
4697 :
4698 : /*
4699 : * CopyGetAttnums - build an integer list of attnums to be copied
4700 : *
4701 : * The input attnamelist is either the user-specified column list,
4702 : * or NIL if there was none (in which case we want all the non-dropped
4703 : * columns).
4704 : *
4705 : * rel can be NULL ... it's only used for error reports.
4706 : */
4707 : static List *
4708 375 : CopyGetAttnums(TupleDesc tupDesc, Relation rel, List *attnamelist)
4709 : {
4710 375 : List *attnums = NIL;
4711 :
4712 375 : if (attnamelist == NIL)
4713 : {
4714 : /* Generate default column list */
4715 271 : int attr_count = tupDesc->natts;
4716 : int i;
4717 :
4718 1130 : for (i = 0; i < attr_count; i++)
4719 : {
4720 859 : if (TupleDescAttr(tupDesc, i)->attisdropped)
4721 8 : continue;
4722 851 : attnums = lappend_int(attnums, i + 1);
4723 : }
4724 : }
4725 : else
4726 : {
4727 : /* Validate the user-supplied list and extract attnums */
4728 : ListCell *l;
4729 :
4730 312 : foreach(l, attnamelist)
4731 : {
4732 214 : char *name = strVal(lfirst(l));
4733 : int attnum;
4734 : int i;
4735 :
4736 : /* Lookup column name */
4737 214 : attnum = InvalidAttrNumber;
4738 526 : for (i = 0; i < tupDesc->natts; i++)
4739 : {
4740 521 : Form_pg_attribute att = TupleDescAttr(tupDesc, i);
4741 :
4742 521 : if (att->attisdropped)
4743 8 : continue;
4744 513 : if (namestrcmp(&(att->attname), name) == 0)
4745 : {
4746 209 : attnum = att->attnum;
4747 209 : break;
4748 : }
4749 : }
4750 214 : if (attnum == InvalidAttrNumber)
4751 : {
4752 5 : if (rel != NULL)
4753 5 : ereport(ERROR,
4754 : (errcode(ERRCODE_UNDEFINED_COLUMN),
4755 : errmsg("column \"%s\" of relation \"%s\" does not exist",
4756 : name, RelationGetRelationName(rel))));
4757 : else
4758 0 : ereport(ERROR,
4759 : (errcode(ERRCODE_UNDEFINED_COLUMN),
4760 : errmsg("column \"%s\" does not exist",
4761 : name)));
4762 : }
4763 : /* Check for duplicates */
4764 209 : if (list_member_int(attnums, attnum))
4765 1 : ereport(ERROR,
4766 : (errcode(ERRCODE_DUPLICATE_COLUMN),
4767 : errmsg("column \"%s\" specified more than once",
4768 : name)));
4769 208 : attnums = lappend_int(attnums, attnum);
4770 : }
4771 : }
4772 :
4773 369 : return attnums;
4774 : }
4775 :
4776 :
4777 : /*
4778 : * copy_dest_startup --- executor startup
4779 : */
4780 : static void
4781 32 : copy_dest_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
4782 : {
4783 : /* no-op */
4784 32 : }
4785 :
4786 : /*
4787 : * copy_dest_receive --- receive one tuple
4788 : */
4789 : static bool
4790 97 : copy_dest_receive(TupleTableSlot *slot, DestReceiver *self)
4791 : {
4792 97 : DR_copy *myState = (DR_copy *) self;
4793 97 : CopyState cstate = myState->cstate;
4794 :
4795 : /* Make sure the tuple is fully deconstructed */
4796 97 : slot_getallattrs(slot);
4797 :
4798 : /* And send the data */
4799 97 : CopyOneRowTo(cstate, InvalidOid, slot->tts_values, slot->tts_isnull);
4800 97 : myState->processed++;
4801 :
4802 97 : return true;
4803 : }
4804 :
4805 : /*
4806 : * copy_dest_shutdown --- executor end
4807 : */
4808 : static void
4809 32 : copy_dest_shutdown(DestReceiver *self)
4810 : {
4811 : /* no-op */
4812 32 : }
4813 :
4814 : /*
4815 : * copy_dest_destroy --- release DestReceiver object
4816 : */
4817 : static void
4818 0 : copy_dest_destroy(DestReceiver *self)
4819 : {
4820 0 : pfree(self);
4821 0 : }
4822 :
4823 : /*
4824 : * CreateCopyDestReceiver -- create a suitable DestReceiver object
4825 : */
4826 : DestReceiver *
4827 33 : CreateCopyDestReceiver(void)
4828 : {
4829 33 : DR_copy *self = (DR_copy *) palloc(sizeof(DR_copy));
4830 :
4831 33 : self->pub.receiveSlot = copy_dest_receive;
4832 33 : self->pub.rStartup = copy_dest_startup;
4833 33 : self->pub.rShutdown = copy_dest_shutdown;
4834 33 : self->pub.rDestroy = copy_dest_destroy;
4835 33 : self->pub.mydest = DestCopyOut;
4836 :
4837 33 : self->cstate = NULL; /* will be set later */
4838 33 : self->processed = 0;
4839 :
4840 33 : return (DestReceiver *) self;
4841 : }
|