Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * nodeHash.c
4 : * Routines to hash relations for hashjoin
5 : *
6 : * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : *
10 : * IDENTIFICATION
11 : * src/backend/executor/nodeHash.c
12 : *
13 : *-------------------------------------------------------------------------
14 : */
15 : /*
16 : * INTERFACE ROUTINES
17 : * MultiExecHash - generate an in-memory hash table of the relation
18 : * ExecInitHash - initialize node and subnodes
19 : * ExecEndHash - shutdown node and subnodes
20 : */
21 :
22 : #include "postgres.h"
23 :
24 : #include <math.h>
25 : #include <limits.h>
26 :
27 : #include "access/htup_details.h"
28 : #include "catalog/pg_statistic.h"
29 : #include "commands/tablespace.h"
30 : #include "executor/execdebug.h"
31 : #include "executor/hashjoin.h"
32 : #include "executor/nodeHash.h"
33 : #include "executor/nodeHashjoin.h"
34 : #include "miscadmin.h"
35 : #include "utils/dynahash.h"
36 : #include "utils/memutils.h"
37 : #include "utils/lsyscache.h"
38 : #include "utils/syscache.h"
39 :
40 :
41 : static void ExecHashIncreaseNumBatches(HashJoinTable hashtable);
42 : static void ExecHashIncreaseNumBuckets(HashJoinTable hashtable);
43 : static void ExecHashBuildSkewHash(HashJoinTable hashtable, Hash *node,
44 : int mcvsToUse);
45 : static void ExecHashSkewTableInsert(HashJoinTable hashtable,
46 : TupleTableSlot *slot,
47 : uint32 hashvalue,
48 : int bucketNumber);
49 : static void ExecHashRemoveNextSkewBucket(HashJoinTable hashtable);
50 :
51 : static void *dense_alloc(HashJoinTable hashtable, Size size);
52 :
53 : /* ----------------------------------------------------------------
54 : * ExecHash
55 : *
56 : * stub for pro forma compliance
57 : * ----------------------------------------------------------------
58 : */
59 : static TupleTableSlot *
60 0 : ExecHash(PlanState *pstate)
61 : {
62 0 : elog(ERROR, "Hash node does not support ExecProcNode call convention");
63 : return NULL;
64 : }
65 :
66 : /* ----------------------------------------------------------------
67 : * MultiExecHash
68 : *
69 : * build hash table for hashjoin, doing partitioning if more
70 : * than one batch is required.
71 : * ----------------------------------------------------------------
72 : */
73 : Node *
74 1007 : MultiExecHash(HashState *node)
75 : {
76 : PlanState *outerNode;
77 : List *hashkeys;
78 : HashJoinTable hashtable;
79 : TupleTableSlot *slot;
80 : ExprContext *econtext;
81 : uint32 hashvalue;
82 :
83 : /* must provide our own instrumentation support */
84 1007 : if (node->ps.instrument)
85 0 : InstrStartNode(node->ps.instrument);
86 :
87 : /*
88 : * get state info from node
89 : */
90 1007 : outerNode = outerPlanState(node);
91 1007 : hashtable = node->hashtable;
92 :
93 : /*
94 : * set expression context
95 : */
96 1007 : hashkeys = node->hashkeys;
97 1007 : econtext = node->ps.ps_ExprContext;
98 :
99 : /*
100 : * get all inner tuples and insert into the hash table (or temp files)
101 : */
102 : for (;;)
103 : {
104 277443 : slot = ExecProcNode(outerNode);
105 277443 : if (TupIsNull(slot))
106 : break;
107 : /* We have to compute the hash value */
108 276436 : econtext->ecxt_innertuple = slot;
109 276436 : if (ExecHashGetHashValue(hashtable, econtext, hashkeys,
110 276436 : false, hashtable->keepNulls,
111 : &hashvalue))
112 : {
113 : int bucketNumber;
114 :
115 276436 : bucketNumber = ExecHashGetSkewBucket(hashtable, hashvalue);
116 276436 : if (bucketNumber != INVALID_SKEW_BUCKET_NO)
117 : {
118 : /* It's a skew tuple, so put it into that hash table */
119 77 : ExecHashSkewTableInsert(hashtable, slot, hashvalue,
120 : bucketNumber);
121 77 : hashtable->skewTuples += 1;
122 : }
123 : else
124 : {
125 : /* Not subject to skew optimization, so insert normally */
126 276359 : ExecHashTableInsert(hashtable, slot, hashvalue);
127 : }
128 276436 : hashtable->totalTuples += 1;
129 : }
130 276436 : }
131 :
132 : /* resize the hash table if needed (NTUP_PER_BUCKET exceeded) */
133 1007 : if (hashtable->nbuckets != hashtable->nbuckets_optimal)
134 1 : ExecHashIncreaseNumBuckets(hashtable);
135 :
136 : /* Account for the buckets in spaceUsed (reported in EXPLAIN ANALYZE) */
137 1007 : hashtable->spaceUsed += hashtable->nbuckets * sizeof(HashJoinTuple);
138 1007 : if (hashtable->spaceUsed > hashtable->spacePeak)
139 1006 : hashtable->spacePeak = hashtable->spaceUsed;
140 :
141 : /* must provide our own instrumentation support */
142 1007 : if (node->ps.instrument)
143 0 : InstrStopNode(node->ps.instrument, hashtable->totalTuples);
144 :
145 : /*
146 : * We do not return the hash table directly because it's not a subtype of
147 : * Node, and so would violate the MultiExecProcNode API. Instead, our
148 : * parent Hashjoin node is expected to know how to fish it out of our node
149 : * state. Ugly but not really worth cleaning up, since Hashjoin knows
150 : * quite a bit more about Hash besides that.
151 : */
152 1007 : return NULL;
153 : }
154 :
155 : /* ----------------------------------------------------------------
156 : * ExecInitHash
157 : *
158 : * Init routine for Hash node
159 : * ----------------------------------------------------------------
160 : */
161 : HashState *
162 1361 : ExecInitHash(Hash *node, EState *estate, int eflags)
163 : {
164 : HashState *hashstate;
165 :
166 : /* check for unsupported flags */
167 1361 : Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
168 :
169 : /*
170 : * create state structure
171 : */
172 1361 : hashstate = makeNode(HashState);
173 1361 : hashstate->ps.plan = (Plan *) node;
174 1361 : hashstate->ps.state = estate;
175 1361 : hashstate->ps.ExecProcNode = ExecHash;
176 1361 : hashstate->hashtable = NULL;
177 1361 : hashstate->hashkeys = NIL; /* will be set by parent HashJoin */
178 :
179 : /*
180 : * Miscellaneous initialization
181 : *
182 : * create expression context for node
183 : */
184 1361 : ExecAssignExprContext(estate, &hashstate->ps);
185 :
186 : /*
187 : * initialize our result slot
188 : */
189 1361 : ExecInitResultTupleSlot(estate, &hashstate->ps);
190 :
191 : /*
192 : * initialize child expressions
193 : */
194 1361 : hashstate->ps.qual =
195 1361 : ExecInitQual(node->plan.qual, (PlanState *) hashstate);
196 :
197 : /*
198 : * initialize child nodes
199 : */
200 1361 : outerPlanState(hashstate) = ExecInitNode(outerPlan(node), estate, eflags);
201 :
202 : /*
203 : * initialize tuple type. no need to initialize projection info because
204 : * this node doesn't do projections
205 : */
206 1361 : ExecAssignResultTypeFromTL(&hashstate->ps);
207 1361 : hashstate->ps.ps_ProjInfo = NULL;
208 :
209 1361 : return hashstate;
210 : }
211 :
212 : /* ---------------------------------------------------------------
213 : * ExecEndHash
214 : *
215 : * clean up routine for Hash node
216 : * ----------------------------------------------------------------
217 : */
218 : void
219 1354 : ExecEndHash(HashState *node)
220 : {
221 : PlanState *outerPlan;
222 :
223 : /*
224 : * free exprcontext
225 : */
226 1354 : ExecFreeExprContext(&node->ps);
227 :
228 : /*
229 : * shut down the subplan
230 : */
231 1354 : outerPlan = outerPlanState(node);
232 1354 : ExecEndNode(outerPlan);
233 1354 : }
234 :
235 :
236 : /* ----------------------------------------------------------------
237 : * ExecHashTableCreate
238 : *
239 : * create an empty hashtable data structure for hashjoin.
240 : * ----------------------------------------------------------------
241 : */
242 : HashJoinTable
243 1007 : ExecHashTableCreate(Hash *node, List *hashOperators, bool keepNulls)
244 : {
245 : HashJoinTable hashtable;
246 : Plan *outerNode;
247 : int nbuckets;
248 : int nbatch;
249 : int num_skew_mcvs;
250 : int log2_nbuckets;
251 : int nkeys;
252 : int i;
253 : ListCell *ho;
254 : MemoryContext oldcxt;
255 :
256 : /*
257 : * Get information about the size of the relation to be hashed (it's the
258 : * "outer" subtree of this node, but the inner relation of the hashjoin).
259 : * Compute the appropriate size of the hash table.
260 : */
261 1007 : outerNode = outerPlan(node);
262 :
263 1007 : ExecChooseHashTableSize(outerNode->plan_rows, outerNode->plan_width,
264 1007 : OidIsValid(node->skewTable),
265 : &nbuckets, &nbatch, &num_skew_mcvs);
266 :
267 : /* nbuckets must be a power of 2 */
268 1007 : log2_nbuckets = my_log2(nbuckets);
269 1007 : Assert(nbuckets == (1 << log2_nbuckets));
270 :
271 : /*
272 : * Initialize the hash table control block.
273 : *
274 : * The hashtable control block is just palloc'd from the executor's
275 : * per-query memory context.
276 : */
277 1007 : hashtable = (HashJoinTable) palloc(sizeof(HashJoinTableData));
278 1007 : hashtable->nbuckets = nbuckets;
279 1007 : hashtable->nbuckets_original = nbuckets;
280 1007 : hashtable->nbuckets_optimal = nbuckets;
281 1007 : hashtable->log2_nbuckets = log2_nbuckets;
282 1007 : hashtable->log2_nbuckets_optimal = log2_nbuckets;
283 1007 : hashtable->buckets = NULL;
284 1007 : hashtable->keepNulls = keepNulls;
285 1007 : hashtable->skewEnabled = false;
286 1007 : hashtable->skewBucket = NULL;
287 1007 : hashtable->skewBucketLen = 0;
288 1007 : hashtable->nSkewBuckets = 0;
289 1007 : hashtable->skewBucketNums = NULL;
290 1007 : hashtable->nbatch = nbatch;
291 1007 : hashtable->curbatch = 0;
292 1007 : hashtable->nbatch_original = nbatch;
293 1007 : hashtable->nbatch_outstart = nbatch;
294 1007 : hashtable->growEnabled = true;
295 1007 : hashtable->totalTuples = 0;
296 1007 : hashtable->skewTuples = 0;
297 1007 : hashtable->innerBatchFile = NULL;
298 1007 : hashtable->outerBatchFile = NULL;
299 1007 : hashtable->spaceUsed = 0;
300 1007 : hashtable->spacePeak = 0;
301 1007 : hashtable->spaceAllowed = work_mem * 1024L;
302 1007 : hashtable->spaceUsedSkew = 0;
303 1007 : hashtable->spaceAllowedSkew =
304 1007 : hashtable->spaceAllowed * SKEW_WORK_MEM_PERCENT / 100;
305 1007 : hashtable->chunks = NULL;
306 :
307 : #ifdef HJDEBUG
308 : printf("Hashjoin %p: initial nbatch = %d, nbuckets = %d\n",
309 : hashtable, nbatch, nbuckets);
310 : #endif
311 :
312 : /*
313 : * Get info about the hash functions to be used for each hash key. Also
314 : * remember whether the join operators are strict.
315 : */
316 1007 : nkeys = list_length(hashOperators);
317 1007 : hashtable->outer_hashfunctions =
318 1007 : (FmgrInfo *) palloc(nkeys * sizeof(FmgrInfo));
319 1007 : hashtable->inner_hashfunctions =
320 1007 : (FmgrInfo *) palloc(nkeys * sizeof(FmgrInfo));
321 1007 : hashtable->hashStrict = (bool *) palloc(nkeys * sizeof(bool));
322 1007 : i = 0;
323 2072 : foreach(ho, hashOperators)
324 : {
325 1065 : Oid hashop = lfirst_oid(ho);
326 : Oid left_hashfn;
327 : Oid right_hashfn;
328 :
329 1065 : if (!get_op_hash_functions(hashop, &left_hashfn, &right_hashfn))
330 0 : elog(ERROR, "could not find hash function for hash operator %u",
331 : hashop);
332 1065 : fmgr_info(left_hashfn, &hashtable->outer_hashfunctions[i]);
333 1065 : fmgr_info(right_hashfn, &hashtable->inner_hashfunctions[i]);
334 1065 : hashtable->hashStrict[i] = op_strict(hashop);
335 1065 : i++;
336 : }
337 :
338 : /*
339 : * Create temporary memory contexts in which to keep the hashtable working
340 : * storage. See notes in executor/hashjoin.h.
341 : */
342 1007 : hashtable->hashCxt = AllocSetContextCreate(CurrentMemoryContext,
343 : "HashTableContext",
344 : ALLOCSET_DEFAULT_SIZES);
345 :
346 1007 : hashtable->batchCxt = AllocSetContextCreate(hashtable->hashCxt,
347 : "HashBatchContext",
348 : ALLOCSET_DEFAULT_SIZES);
349 :
350 : /* Allocate data that will live for the life of the hashjoin */
351 :
352 1007 : oldcxt = MemoryContextSwitchTo(hashtable->hashCxt);
353 :
354 1007 : if (nbatch > 1)
355 : {
356 : /*
357 : * allocate and initialize the file arrays in hashCxt
358 : */
359 1 : hashtable->innerBatchFile = (BufFile **)
360 1 : palloc0(nbatch * sizeof(BufFile *));
361 1 : hashtable->outerBatchFile = (BufFile **)
362 1 : palloc0(nbatch * sizeof(BufFile *));
363 : /* The files will not be opened until needed... */
364 : /* ... but make sure we have temp tablespaces established for them */
365 1 : PrepareTempTablespaces();
366 : }
367 :
368 : /*
369 : * Prepare context for the first-scan space allocations; allocate the
370 : * hashbucket array therein, and set each bucket "empty".
371 : */
372 1007 : MemoryContextSwitchTo(hashtable->batchCxt);
373 :
374 1007 : hashtable->buckets = (HashJoinTuple *)
375 1007 : palloc0(nbuckets * sizeof(HashJoinTuple));
376 :
377 : /*
378 : * Set up for skew optimization, if possible and there's a need for more
379 : * than one batch. (In a one-batch join, there's no point in it.)
380 : */
381 1007 : if (nbatch > 1)
382 1 : ExecHashBuildSkewHash(hashtable, node, num_skew_mcvs);
383 :
384 1007 : MemoryContextSwitchTo(oldcxt);
385 :
386 1007 : return hashtable;
387 : }
388 :
389 :
390 : /*
391 : * Compute appropriate size for hashtable given the estimated size of the
392 : * relation to be hashed (number of rows and average row width).
393 : *
394 : * This is exported so that the planner's costsize.c can use it.
395 : */
396 :
397 : /* Target bucket loading (tuples per bucket) */
398 : #define NTUP_PER_BUCKET 1
399 :
400 : void
401 13387 : ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
402 : int *numbuckets,
403 : int *numbatches,
404 : int *num_skew_mcvs)
405 : {
406 : int tupsize;
407 : double inner_rel_bytes;
408 : long bucket_bytes;
409 : long hash_table_bytes;
410 : long skew_table_bytes;
411 : long max_pointers;
412 : long mppow2;
413 13387 : int nbatch = 1;
414 : int nbuckets;
415 : double dbuckets;
416 :
417 : /* Force a plausible relation size if no info */
418 13387 : if (ntuples <= 0.0)
419 1 : ntuples = 1000.0;
420 :
421 : /*
422 : * Estimate tupsize based on footprint of tuple in hashtable... note this
423 : * does not allow for any palloc overhead. The manipulations of spaceUsed
424 : * don't count palloc overhead either.
425 : */
426 13387 : tupsize = HJTUPLE_OVERHEAD +
427 13387 : MAXALIGN(SizeofMinimalTupleHeader) +
428 13387 : MAXALIGN(tupwidth);
429 13387 : inner_rel_bytes = ntuples * tupsize;
430 :
431 : /*
432 : * Target in-memory hashtable size is work_mem kilobytes.
433 : */
434 13387 : hash_table_bytes = work_mem * 1024L;
435 :
436 : /*
437 : * If skew optimization is possible, estimate the number of skew buckets
438 : * that will fit in the memory allowed, and decrement the assumed space
439 : * available for the main hash table accordingly.
440 : *
441 : * We make the optimistic assumption that each skew bucket will contain
442 : * one inner-relation tuple. If that turns out to be low, we will recover
443 : * at runtime by reducing the number of skew buckets.
444 : *
445 : * hashtable->skewBucket will have up to 8 times as many HashSkewBucket
446 : * pointers as the number of MCVs we allow, since ExecHashBuildSkewHash
447 : * will round up to the next power of 2 and then multiply by 4 to reduce
448 : * collisions.
449 : */
450 13387 : if (useskew)
451 : {
452 13272 : skew_table_bytes = hash_table_bytes * SKEW_WORK_MEM_PERCENT / 100;
453 :
454 : /*----------
455 : * Divisor is:
456 : * size of a hash tuple +
457 : * worst-case size of skewBucket[] per MCV +
458 : * size of skewBucketNums[] entry +
459 : * size of skew bucket struct itself
460 : *----------
461 : */
462 26544 : *num_skew_mcvs = skew_table_bytes / (tupsize +
463 : (8 * sizeof(HashSkewBucket *)) +
464 13272 : sizeof(int) +
465 : SKEW_BUCKET_OVERHEAD);
466 13272 : if (*num_skew_mcvs > 0)
467 13272 : hash_table_bytes -= skew_table_bytes;
468 : }
469 : else
470 115 : *num_skew_mcvs = 0;
471 :
472 : /*
473 : * Set nbuckets to achieve an average bucket load of NTUP_PER_BUCKET when
474 : * memory is filled, assuming a single batch; but limit the value so that
475 : * the pointer arrays we'll try to allocate do not exceed work_mem nor
476 : * MaxAllocSize.
477 : *
478 : * Note that both nbuckets and nbatch must be powers of 2 to make
479 : * ExecHashGetBucketAndBatch fast.
480 : */
481 13387 : max_pointers = (work_mem * 1024L) / sizeof(HashJoinTuple);
482 13387 : max_pointers = Min(max_pointers, MaxAllocSize / sizeof(HashJoinTuple));
483 : /* If max_pointers isn't a power of 2, must round it down to one */
484 13387 : mppow2 = 1L << my_log2(max_pointers);
485 13387 : if (max_pointers != mppow2)
486 0 : max_pointers = mppow2 / 2;
487 :
488 : /* Also ensure we avoid integer overflow in nbatch and nbuckets */
489 : /* (this step is redundant given the current value of MaxAllocSize) */
490 13387 : max_pointers = Min(max_pointers, INT_MAX / 2);
491 :
492 13387 : dbuckets = ceil(ntuples / NTUP_PER_BUCKET);
493 13387 : dbuckets = Min(dbuckets, max_pointers);
494 13387 : nbuckets = (int) dbuckets;
495 : /* don't let nbuckets be really small, though ... */
496 13387 : nbuckets = Max(nbuckets, 1024);
497 : /* ... and force it to be a power of 2. */
498 13387 : nbuckets = 1 << my_log2(nbuckets);
499 :
500 : /*
501 : * If there's not enough space to store the projected number of tuples and
502 : * the required bucket headers, we will need multiple batches.
503 : */
504 13387 : bucket_bytes = sizeof(HashJoinTuple) * nbuckets;
505 13387 : if (inner_rel_bytes + bucket_bytes > hash_table_bytes)
506 : {
507 : /* We'll need multiple batches */
508 : long lbuckets;
509 : double dbatch;
510 : int minbatch;
511 : long bucket_size;
512 :
513 : /*
514 : * Estimate the number of buckets we'll want to have when work_mem is
515 : * entirely full. Each bucket will contain a bucket pointer plus
516 : * NTUP_PER_BUCKET tuples, whose projected size already includes
517 : * overhead for the hash code, pointer to the next tuple, etc.
518 : */
519 122 : bucket_size = (tupsize * NTUP_PER_BUCKET + sizeof(HashJoinTuple));
520 122 : lbuckets = 1L << my_log2(hash_table_bytes / bucket_size);
521 122 : lbuckets = Min(lbuckets, max_pointers);
522 122 : nbuckets = (int) lbuckets;
523 122 : nbuckets = 1 << my_log2(nbuckets);
524 122 : bucket_bytes = nbuckets * sizeof(HashJoinTuple);
525 :
526 : /*
527 : * Buckets are simple pointers to hashjoin tuples, while tupsize
528 : * includes the pointer, hash code, and MinimalTupleData. So buckets
529 : * should never really exceed 25% of work_mem (even for
530 : * NTUP_PER_BUCKET=1); except maybe for work_mem values that are not
531 : * 2^N bytes, where we might get more because of doubling. So let's
532 : * look for 50% here.
533 : */
534 122 : Assert(bucket_bytes <= hash_table_bytes / 2);
535 :
536 : /* Calculate required number of batches. */
537 122 : dbatch = ceil(inner_rel_bytes / (hash_table_bytes - bucket_bytes));
538 122 : dbatch = Min(dbatch, max_pointers);
539 122 : minbatch = (int) dbatch;
540 122 : nbatch = 2;
541 729 : while (nbatch < minbatch)
542 485 : nbatch <<= 1;
543 : }
544 :
545 13387 : Assert(nbuckets > 0);
546 13387 : Assert(nbatch > 0);
547 :
548 13387 : *numbuckets = nbuckets;
549 13387 : *numbatches = nbatch;
550 13387 : }
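       :
       : /*
       :  * Worked example (illustrative figures, with an assumed tupsize of 64
       :  * bytes): given work_mem = 4096kB, ntuples = 100000, useskew = true,
       :  * and 8-byte pointers:
       :  *
       :  *   hash_table_bytes = 4194304 - 2% skew reserve = 4110418
       :  *   inner_rel_bytes  = 100000 * 64 = 6400000
       :  *   nbuckets         = 1 << my_log2(100000) = 131072 (single-batch guess)
       :  *   bucket_bytes     = 131072 * 8 = 1048576
       :  *
       :  * 6400000 + 1048576 exceeds 4110418, so we take the multi-batch path:
       :  *
       :  *   bucket_size = 64 * NTUP_PER_BUCKET + 8 = 72
       :  *   nbuckets    = 1 << my_log2(4110418 / 72) = 65536
       :  *   dbatch      = ceil(6400000 / (4110418 - 65536 * 8)) = 2, so nbatch = 2
       :  */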
551 :
552 :
553 : /* ----------------------------------------------------------------
554 : * ExecHashTableDestroy
555 : *
556 : * destroy a hash table
557 : * ----------------------------------------------------------------
558 : */
559 : void
560 1000 : ExecHashTableDestroy(HashJoinTable hashtable)
561 : {
562 : int i;
563 :
564 : /*
565 : * Make sure all the temp files are closed. We skip batch 0, since it
566 : * can't have any temp files (and the arrays might not even exist if
567 : * nbatch is only 1).
568 : */
569 1007 : for (i = 1; i < hashtable->nbatch; i++)
570 : {
571 7 : if (hashtable->innerBatchFile[i])
572 0 : BufFileClose(hashtable->innerBatchFile[i]);
573 7 : if (hashtable->outerBatchFile[i])
574 0 : BufFileClose(hashtable->outerBatchFile[i]);
575 : }
576 :
577 : /* Release working memory (batchCxt is a child, so it goes away too) */
578 1000 : MemoryContextDelete(hashtable->hashCxt);
579 :
580 : /* And drop the control block */
581 1000 : pfree(hashtable);
582 1000 : }
583 :
584 : /*
585 : * ExecHashIncreaseNumBatches
586 : * increase the original number of batches in order to reduce
587 : * current memory consumption
588 : */
589 : static void
590 2 : ExecHashIncreaseNumBatches(HashJoinTable hashtable)
591 : {
592 2 : int oldnbatch = hashtable->nbatch;
593 2 : int curbatch = hashtable->curbatch;
594 : int nbatch;
595 : MemoryContext oldcxt;
596 : long ninmemory;
597 : long nfreed;
598 : HashMemoryChunk oldchunks;
599 :
600 : /* do nothing if we've decided to shut off growth */
601 2 : if (!hashtable->growEnabled)
602 0 : return;
603 :
604 : /* safety check to avoid overflow */
605 2 : if (oldnbatch > Min(INT_MAX / 2, MaxAllocSize / (sizeof(void *) * 2)))
606 0 : return;
607 :
608 2 : nbatch = oldnbatch * 2;
609 2 : Assert(nbatch > 1);
610 :
611 : #ifdef HJDEBUG
612 : printf("Hashjoin %p: increasing nbatch to %d because space = %zu\n",
613 : hashtable, nbatch, hashtable->spaceUsed);
614 : #endif
615 :
616 2 : oldcxt = MemoryContextSwitchTo(hashtable->hashCxt);
617 :
618 2 : if (hashtable->innerBatchFile == NULL)
619 : {
620 : /* we had no file arrays before */
621 0 : hashtable->innerBatchFile = (BufFile **)
622 0 : palloc0(nbatch * sizeof(BufFile *));
623 0 : hashtable->outerBatchFile = (BufFile **)
624 0 : palloc0(nbatch * sizeof(BufFile *));
625 : /* time to establish the temp tablespaces, too */
626 0 : PrepareTempTablespaces();
627 : }
628 : else
629 : {
630 : /* enlarge arrays and zero out added entries */
631 2 : hashtable->innerBatchFile = (BufFile **)
632 2 : repalloc(hashtable->innerBatchFile, nbatch * sizeof(BufFile *));
633 2 : hashtable->outerBatchFile = (BufFile **)
634 2 : repalloc(hashtable->outerBatchFile, nbatch * sizeof(BufFile *));
635 2 : MemSet(hashtable->innerBatchFile + oldnbatch, 0,
636 : (nbatch - oldnbatch) * sizeof(BufFile *));
637 2 : MemSet(hashtable->outerBatchFile + oldnbatch, 0,
638 : (nbatch - oldnbatch) * sizeof(BufFile *));
639 : }
640 :
641 2 : MemoryContextSwitchTo(oldcxt);
642 :
643 2 : hashtable->nbatch = nbatch;
644 :
645 : /*
646 : * Scan through the existing hash table entries and dump out any that are
647 : * no longer belong to the current batch.
648 : */
649 2 : ninmemory = nfreed = 0;
650 :
651 : /* If we know we need to resize nbuckets, we can do it while rebatching. */
652 2 : if (hashtable->nbuckets_optimal != hashtable->nbuckets)
653 : {
654 : /* we never decrease the number of buckets */
655 0 : Assert(hashtable->nbuckets_optimal > hashtable->nbuckets);
656 :
657 0 : hashtable->nbuckets = hashtable->nbuckets_optimal;
658 0 : hashtable->log2_nbuckets = hashtable->log2_nbuckets_optimal;
659 :
660 0 : hashtable->buckets = repalloc(hashtable->buckets,
661 0 : sizeof(HashJoinTuple) * hashtable->nbuckets);
662 : }
663 :
664 : /*
665 : * We will scan through the chunks directly, so that we can reset the
666 : * buckets now and not have to keep track of which tuples in the buckets have
667 : * already been processed. We will free the old chunks as we go.
668 : */
669 2 : memset(hashtable->buckets, 0, sizeof(HashJoinTuple) * hashtable->nbuckets);
670 2 : oldchunks = hashtable->chunks;
671 2 : hashtable->chunks = NULL;
672 :
673 : /* so, let's scan through the old chunks, and all tuples in each chunk */
674 8 : while (oldchunks != NULL)
675 : {
676 4 : HashMemoryChunk nextchunk = oldchunks->next;
677 :
678 : /* position within the buffer (up to oldchunks->used) */
679 4 : size_t idx = 0;
680 :
681 : /* process all tuples stored in this chunk (and then free it) */
682 4007 : while (idx < oldchunks->used)
683 : {
684 3999 : HashJoinTuple hashTuple = (HashJoinTuple) (oldchunks->data + idx);
685 3999 : MinimalTuple tuple = HJTUPLE_MINTUPLE(hashTuple);
686 3999 : int hashTupleSize = (HJTUPLE_OVERHEAD + tuple->t_len);
687 : int bucketno;
688 : int batchno;
689 :
690 3999 : ninmemory++;
691 3999 : ExecHashGetBucketAndBatch(hashtable, hashTuple->hashvalue,
692 : &bucketno, &batchno);
693 :
694 3999 : if (batchno == curbatch)
695 : {
696 : /* keep tuple in memory - copy it into the new chunk */
697 : HashJoinTuple copyTuple;
698 :
699 2004 : copyTuple = (HashJoinTuple) dense_alloc(hashtable, hashTupleSize);
700 2004 : memcpy(copyTuple, hashTuple, hashTupleSize);
701 :
702 : /* and add it back to the appropriate bucket */
703 2004 : copyTuple->next = hashtable->buckets[bucketno];
704 2004 : hashtable->buckets[bucketno] = copyTuple;
705 : }
706 : else
707 : {
708 : /* dump it out */
709 1995 : Assert(batchno > curbatch);
710 3990 : ExecHashJoinSaveTuple(HJTUPLE_MINTUPLE(hashTuple),
711 : hashTuple->hashvalue,
712 3990 : &hashtable->innerBatchFile[batchno]);
713 :
714 1995 : hashtable->spaceUsed -= hashTupleSize;
715 1995 : nfreed++;
716 : }
717 :
718 : /* next tuple in this chunk */
719 3999 : idx += MAXALIGN(hashTupleSize);
720 :
721 : /* allow this loop to be cancellable */
722 3999 : CHECK_FOR_INTERRUPTS();
723 : }
724 :
725 : /* we're done with this chunk - free it and proceed to the next one */
726 4 : pfree(oldchunks);
727 4 : oldchunks = nextchunk;
728 : }
729 :
730 : #ifdef HJDEBUG
731 : printf("Hashjoin %p: freed %ld of %ld tuples, space now %zu\n",
732 : hashtable, nfreed, ninmemory, hashtable->spaceUsed);
733 : #endif
734 :
735 : /*
736 : * If we dumped out either all or none of the tuples in the table, disable
737 : * further expansion of nbatch. This situation implies that we have
738 : * enough tuples of identical hashvalues to overflow spaceAllowed.
739 : * Increasing nbatch will not fix it since there's no way to subdivide the
740 : * group any more finely. We have to just gut it out and hope the server
741 : * has enough RAM.
742 : */
743 2 : if (nfreed == 0 || nfreed == ninmemory)
744 : {
745 0 : hashtable->growEnabled = false;
746 : #ifdef HJDEBUG
747 : printf("Hashjoin %p: disabling further increase of nbatch\n",
748 : hashtable);
749 : #endif
750 : }
751 : }
752 :
753 : /*
754 : * ExecHashIncreaseNumBuckets
755 : * increase the original number of buckets in order to reduce
756 : * number of tuples per bucket
757 : */
758 : static void
759 1 : ExecHashIncreaseNumBuckets(HashJoinTable hashtable)
760 : {
761 : HashMemoryChunk chunk;
762 :
763 : /* do nothing if not an increase (it's called increase for a reason) */
764 1 : if (hashtable->nbuckets >= hashtable->nbuckets_optimal)
765 1 : return;
766 :
767 : #ifdef HJDEBUG
768 : printf("Hashjoin %p: increasing nbuckets %d => %d\n",
769 : hashtable, hashtable->nbuckets, hashtable->nbuckets_optimal);
770 : #endif
771 :
772 1 : hashtable->nbuckets = hashtable->nbuckets_optimal;
773 1 : hashtable->log2_nbuckets = hashtable->log2_nbuckets_optimal;
774 :
775 1 : Assert(hashtable->nbuckets > 1);
776 1 : Assert(hashtable->nbuckets <= (INT_MAX / 2));
777 1 : Assert(hashtable->nbuckets == (1 << hashtable->log2_nbuckets));
778 :
779 : /*
780 : * Just reallocate the proper number of buckets - we don't need to walk
781 : * through them - we can walk the dense-allocated chunks (just like in
782 : * ExecHashIncreaseNumBatches, but without all the copying into new
783 : * chunks)
784 : */
785 1 : hashtable->buckets =
786 1 : (HashJoinTuple *) repalloc(hashtable->buckets,
787 1 : hashtable->nbuckets * sizeof(HashJoinTuple));
788 :
789 1 : memset(hashtable->buckets, 0, hashtable->nbuckets * sizeof(HashJoinTuple));
790 :
791 : /* scan through all tuples in all chunks to rebuild the hash table */
792 3 : for (chunk = hashtable->chunks; chunk != NULL; chunk = chunk->next)
793 : {
794 : /* process all tuples stored in this chunk */
795 2 : size_t idx = 0;
796 :
797 2064 : while (idx < chunk->used)
798 : {
799 2060 : HashJoinTuple hashTuple = (HashJoinTuple) (chunk->data + idx);
800 : int bucketno;
801 : int batchno;
802 :
803 2060 : ExecHashGetBucketAndBatch(hashtable, hashTuple->hashvalue,
804 : &bucketno, &batchno);
805 :
806 : /* add the tuple to the proper bucket */
807 2060 : hashTuple->next = hashtable->buckets[bucketno];
808 2060 : hashtable->buckets[bucketno] = hashTuple;
809 :
810 : /* advance index past the tuple */
811 2060 : idx += MAXALIGN(HJTUPLE_OVERHEAD +
812 : HJTUPLE_MINTUPLE(hashTuple)->t_len);
813 : }
814 :
815 : /* allow this loop to be cancellable */
816 2 : CHECK_FOR_INTERRUPTS();
817 : }
818 : }
819 :
820 :
821 : /*
822 : * ExecHashTableInsert
823 : * insert a tuple into the hash table depending on the hash value
824 : * it may just go to a temp file for later batches
825 : *
826 : * Note: the passed TupleTableSlot may contain a regular, minimal, or virtual
827 : * tuple; the minimal case in particular is certain to happen while reloading
828 : * tuples from batch files. We could save some cycles in the regular-tuple
829 : * case by not forcing the slot contents into minimal form; not clear if it's
830 : * worth the messiness required.
831 : */
832 : void
833 288690 : ExecHashTableInsert(HashJoinTable hashtable,
834 : TupleTableSlot *slot,
835 : uint32 hashvalue)
836 : {
837 288690 : MinimalTuple tuple = ExecFetchSlotMinimalTuple(slot);
838 : int bucketno;
839 : int batchno;
840 :
841 288690 : ExecHashGetBucketAndBatch(hashtable, hashvalue,
842 : &bucketno, &batchno);
843 :
844 : /*
845 : * decide whether to put the tuple in the hash table or a temp file
846 : */
847 288690 : if (batchno == hashtable->curbatch)
848 : {
849 : /*
850 : * put the tuple in hash table
851 : */
852 : HashJoinTuple hashTuple;
853 : int hashTupleSize;
854 278385 : double ntuples = (hashtable->totalTuples - hashtable->skewTuples);
855 :
856 : /* Create the HashJoinTuple */
857 278385 : hashTupleSize = HJTUPLE_OVERHEAD + tuple->t_len;
858 278385 : hashTuple = (HashJoinTuple) dense_alloc(hashtable, hashTupleSize);
859 :
860 278385 : hashTuple->hashvalue = hashvalue;
861 278385 : memcpy(HJTUPLE_MINTUPLE(hashTuple), tuple, tuple->t_len);
862 :
863 : /*
864 : * We always reset the tuple-matched flag on insertion. This is okay
865 : * even when reloading a tuple from a batch file, since the tuple
866 : * could not possibly have been matched to an outer tuple before it
867 : * went into the batch file.
868 : */
869 278385 : HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(hashTuple));
870 :
871 : /* Push it onto the front of the bucket's list */
872 278385 : hashTuple->next = hashtable->buckets[bucketno];
873 278385 : hashtable->buckets[bucketno] = hashTuple;
874 :
875 : /*
876 : * Increase the (optimal) number of buckets if we just exceeded the
877 : * NTUP_PER_BUCKET threshold, but only when there's still a single
878 : * batch.
879 : */
880 544821 : if (hashtable->nbatch == 1 &&
881 266436 : ntuples > (hashtable->nbuckets_optimal * NTUP_PER_BUCKET))
882 : {
883 : /* Guard against integer overflow and alloc size overflow */
884 4 : if (hashtable->nbuckets_optimal <= INT_MAX / 2 &&
885 2 : hashtable->nbuckets_optimal * 2 <= MaxAllocSize / sizeof(HashJoinTuple))
886 : {
887 2 : hashtable->nbuckets_optimal *= 2;
888 2 : hashtable->log2_nbuckets_optimal += 1;
889 : }
890 : }
891 :
892 : /* Account for space used, and back off if we've used too much */
893 278385 : hashtable->spaceUsed += hashTupleSize;
894 278385 : if (hashtable->spaceUsed > hashtable->spacePeak)
895 268404 : hashtable->spacePeak = hashtable->spaceUsed;
896 835155 : if (hashtable->spaceUsed +
897 278385 : hashtable->nbuckets_optimal * sizeof(HashJoinTuple)
898 278385 : > hashtable->spaceAllowed)
899 2 : ExecHashIncreaseNumBatches(hashtable);
900 : }
901 : else
902 : {
903 : /*
904 : * put the tuple into a temp file for later batches
905 : */
906 10305 : Assert(batchno > hashtable->curbatch);
907 20610 : ExecHashJoinSaveTuple(tuple,
908 : hashvalue,
909 20610 : &hashtable->innerBatchFile[batchno]);
910 : }
911 288690 : }
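       :
       : /*
       :  * For example (illustrative numbers): in a single-batch join with
       :  * nbuckets_optimal = 1024 and NTUP_PER_BUCKET = 1, once the running
       :  * count of non-skew tuples exceeds 1024, nbuckets_optimal doubles to
       :  * 2048 (log2_nbuckets_optimal becomes 11).  The bucket array itself is
       :  * not resized here; MultiExecHash calls ExecHashIncreaseNumBuckets
       :  * after the build pass completes.
       :  */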
912 :
913 : /*
914 : * ExecHashGetHashValue
915 : * Compute the hash value for a tuple
916 : *
917 : * The tuple to be tested must be in either econtext->ecxt_outertuple or
918 : * econtext->ecxt_innertuple. Vars in the hashkeys expressions should have
919 : * varno either OUTER_VAR or INNER_VAR.
920 : *
921 : * A TRUE result means the tuple's hash value has been successfully computed
922 : * and stored at *hashvalue. A FALSE result means the tuple cannot match
923 : * because it contains a null attribute, and hence it should be discarded
924 : * immediately. (If keep_nulls is true then FALSE is never returned.)
925 : */
926 : bool
927 676180 : ExecHashGetHashValue(HashJoinTable hashtable,
928 : ExprContext *econtext,
929 : List *hashkeys,
930 : bool outer_tuple,
931 : bool keep_nulls,
932 : uint32 *hashvalue)
933 : {
934 676180 : uint32 hashkey = 0;
935 : FmgrInfo *hashfunctions;
936 : ListCell *hk;
937 676180 : int i = 0;
938 : MemoryContext oldContext;
939 :
940 : /*
941 : * We reset the eval context each time to reclaim any memory leaked in the
942 : * hashkey expressions.
943 : */
944 676180 : ResetExprContext(econtext);
945 :
946 676180 : oldContext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
947 :
948 676180 : if (outer_tuple)
949 399744 : hashfunctions = hashtable->outer_hashfunctions;
950 : else
951 276436 : hashfunctions = hashtable->inner_hashfunctions;
952 :
953 1509749 : foreach(hk, hashkeys)
954 : {
955 833684 : ExprState *keyexpr = (ExprState *) lfirst(hk);
956 : Datum keyval;
957 : bool isNull;
958 :
959 : /* rotate hashkey left 1 bit at each step */
960 833684 : hashkey = (hashkey << 1) | ((hashkey & 0x80000000) ? 1 : 0);
961 :
962 : /*
963 : * Get the join attribute value of the tuple
964 : */
965 833684 : keyval = ExecEvalExpr(keyexpr, econtext, &isNull);
966 :
967 : /*
968 : * If the attribute is NULL, and the join operator is strict, then
969 : * this tuple cannot pass the join qual so we can reject it
970 : * immediately (unless we're scanning the outside of an outer join, in
971 : * which case we must not reject it). Otherwise we act like the
972 : * hashcode of NULL is zero (this will support operators that act like
973 : * IS NOT DISTINCT, though not any more-random behavior). We treat
974 : * the hash support function as strict even if the operator is not.
975 : *
976 : * Note: currently, all hashjoinable operators must be strict since
977 : * the hash index AM assumes that. However, it takes so little extra
978 : * code here to allow non-strict that we may as well do it.
979 : */
980 833684 : if (isNull)
981 : {
982 128 : if (hashtable->hashStrict[i] && !keep_nulls)
983 : {
984 115 : MemoryContextSwitchTo(oldContext);
985 115 : return false; /* cannot match */
986 : }
987 : /* else, leave hashkey unmodified, equivalent to hashcode 0 */
988 : }
989 : else
990 : {
991 : /* Compute the hash function */
992 : uint32 hkey;
993 :
994 833556 : hkey = DatumGetUInt32(FunctionCall1(&hashfunctions[i], keyval));
995 833556 : hashkey ^= hkey;
996 : }
997 :
998 833569 : i++;
999 : }
1000 :
1001 676065 : MemoryContextSwitchTo(oldContext);
1002 :
1003 676065 : *hashvalue = hashkey;
1004 676065 : return true;
1005 : }
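       :
       : /*
       :  * For a two-key join, the loop above computes rotl1(h1) ^ h2, where h1
       :  * and h2 are the per-key hashes and rotl1 is a one-bit left rotation.
       :  * E.g. h1 = 0x80000001 rotates to 0x00000003; with h2 = 0x0000000F the
       :  * combined hashkey is 0x0000000C.  The rotation makes the combination
       :  * order-sensitive, so the key lists (a, b) and (b, a) normally hash
       :  * differently.
       :  */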
1006 :
1007 : /*
1008 : * ExecHashGetBucketAndBatch
1009 : * Determine the bucket number and batch number for a hash value
1010 : *
1011 : * Note: on-the-fly increases of nbatch must not change the bucket number
1012 : * for a given hash code (since we don't move tuples to different hash
1013 : * chains), and must only cause the batch number to remain the same or
1014 : * increase. Our algorithm is
1015 : * bucketno = hashvalue MOD nbuckets
1016 : * batchno = (hashvalue DIV nbuckets) MOD nbatch
1017 : * where nbuckets and nbatch are both expected to be powers of 2, so we can
1018 : * do the computations by shifting and masking. (This assumes that all hash
1019 : * functions are good about randomizing all their output bits, else we are
1020 : * likely to have very skewed bucket or batch occupancy.)
1021 : *
1022 : * nbuckets and log2_nbuckets may change while nbatch == 1 because of dynamic
1023 : * bucket count growth. Once we start batching, the value is fixed and does
1024 : * not change over the course of the join (making it possible to compute batch
1025 : * number the way we do here).
1026 : *
1027 : * nbatch is always a power of 2; we increase it only by doubling it. This
1028 : * effectively adds one more bit to the top of the batchno.
1029 : */
1030 : void
1031 702795 : ExecHashGetBucketAndBatch(HashJoinTable hashtable,
1032 : uint32 hashvalue,
1033 : int *bucketno,
1034 : int *batchno)
1035 : {
1036 702795 : uint32 nbuckets = (uint32) hashtable->nbuckets;
1037 702795 : uint32 nbatch = (uint32) hashtable->nbatch;
1038 :
1039 702795 : if (nbatch > 1)
1040 : {
1041 : /* we can do MOD by masking, DIV by shifting */
1042 44670 : *bucketno = hashvalue & (nbuckets - 1);
1043 44670 : *batchno = (hashvalue >> hashtable->log2_nbuckets) & (nbatch - 1);
1044 : }
1045 : else
1046 : {
1047 658125 : *bucketno = hashvalue & (nbuckets - 1);
1048 658125 : *batchno = 0;
1049 : }
1050 702795 : }
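       :
       : /*
       :  * Worked example: with nbuckets = 1024 (log2_nbuckets = 10), nbatch = 4,
       :  * and hashvalue = 0xABCD:
       :  *
       :  *   bucketno = 0xABCD & 0x3FF       = 973
       :  *   batchno  = (0xABCD >> 10) & 0x3 = 42 & 3 = 2
       :  *
       :  * If nbatch doubles to 8, batchno is recomputed as 42 & 7 = 2: this
       :  * tuple keeps its batch, while a tuple whose hash has bit 12 set moves
       :  * up by 4 (e.g. from batch 2 to batch 6).  That is the "same or
       :  * increase" property noted above.
       :  */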
1051 :
1052 : /*
1053 : * ExecScanHashBucket
1054 : * scan a hash bucket for matches to the current outer tuple
1055 : *
1056 : * The current outer tuple must be stored in econtext->ecxt_outertuple.
1057 : *
1058 : * On success, the inner tuple is stored into hjstate->hj_CurTuple and
1059 : * econtext->ecxt_innertuple, using hjstate->hj_HashTupleSlot as the slot
1060 : * for the latter.
1061 : */
1062 : bool
1063 558875 : ExecScanHashBucket(HashJoinState *hjstate,
1064 : ExprContext *econtext)
1065 : {
1066 558875 : ExprState *hjclauses = hjstate->hashclauses;
1067 558875 : HashJoinTable hashtable = hjstate->hj_HashTable;
1068 558875 : HashJoinTuple hashTuple = hjstate->hj_CurTuple;
1069 558875 : uint32 hashvalue = hjstate->hj_CurHashValue;
1070 :
1071 : /*
1072 : * hj_CurTuple is the address of the tuple last returned from the current
1073 : * bucket, or NULL if it's time to start scanning a new bucket.
1074 : *
1075 : * If the tuple hashed to a skew bucket then scan the skew bucket
1076 : * otherwise scan the standard hashtable bucket.
1077 : */
1078 558875 : if (hashTuple != NULL)
1079 159246 : hashTuple = hashTuple->next;
1080 399629 : else if (hjstate->hj_CurSkewBucketNo != INVALID_SKEW_BUCKET_NO)
1081 200 : hashTuple = hashtable->skewBucket[hjstate->hj_CurSkewBucketNo]->tuples;
1082 : else
1083 399429 : hashTuple = hashtable->buckets[hjstate->hj_CurBucketNo];
1084 :
1085 1217396 : while (hashTuple != NULL)
1086 : {
1087 342051 : if (hashTuple->hashvalue == hashvalue)
1088 : {
1089 : TupleTableSlot *inntuple;
1090 :
1091 : /* insert hashtable's tuple into exec slot so ExecQual sees it */
1092 242405 : inntuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple),
1093 : hjstate->hj_HashTupleSlot,
1094 : false); /* do not pfree */
1095 242405 : econtext->ecxt_innertuple = inntuple;
1096 :
1097 : /* reset temp memory each time to avoid leaks from qual expr */
1098 242405 : ResetExprContext(econtext);
1099 :
1100 242405 : if (ExecQual(hjclauses, econtext))
1101 : {
1102 242405 : hjstate->hj_CurTuple = hashTuple;
1103 242405 : return true;
1104 : }
1105 : }
1106 :
1107 99646 : hashTuple = hashTuple->next;
1108 : }
1109 :
1110 : /*
1111 : * no match
1112 : */
1113 316470 : return false;
1114 : }
1115 :
1116 : /*
1117 : * ExecPrepHashTableForUnmatched
1118 : * set up for a series of ExecScanHashTableForUnmatched calls
1119 : */
1120 : void
1121 173 : ExecPrepHashTableForUnmatched(HashJoinState *hjstate)
1122 : {
1123 : /*----------
1124 : * During this scan we use the HashJoinState fields as follows:
1125 : *
1126 : * hj_CurBucketNo: next regular bucket to scan
1127 : * hj_CurSkewBucketNo: next skew bucket (an index into skewBucketNums)
1128 : * hj_CurTuple: last tuple returned, or NULL to start next bucket
1129 : *----------
1130 : */
1131 173 : hjstate->hj_CurBucketNo = 0;
1132 173 : hjstate->hj_CurSkewBucketNo = 0;
1133 173 : hjstate->hj_CurTuple = NULL;
1134 173 : }
1135 :
1136 : /*
1137 : * ExecScanHashTableForUnmatched
1138 : * scan the hash table for unmatched inner tuples
1139 : *
1140 : * On success, the inner tuple is stored into hjstate->hj_CurTuple and
1141 : * econtext->ecxt_innertuple, using hjstate->hj_HashTupleSlot as the slot
1142 : * for the latter.
1143 : */
1144 : bool
1145 771 : ExecScanHashTableForUnmatched(HashJoinState *hjstate, ExprContext *econtext)
1146 : {
1147 771 : HashJoinTable hashtable = hjstate->hj_HashTable;
1148 771 : HashJoinTuple hashTuple = hjstate->hj_CurTuple;
1149 :
1150 : for (;;)
1151 : {
1152 : /*
1153 : * hj_CurTuple is the address of the tuple last returned from the
1154 : * current bucket, or NULL if it's time to start scanning a new
1155 : * bucket.
1156 : */
1157 176899 : if (hashTuple != NULL)
1158 598 : hashTuple = hashTuple->next;
1159 176301 : else if (hjstate->hj_CurBucketNo < hashtable->nbuckets)
1160 : {
1161 176129 : hashTuple = hashtable->buckets[hjstate->hj_CurBucketNo];
1162 176129 : hjstate->hj_CurBucketNo++;
1163 : }
1164 172 : else if (hjstate->hj_CurSkewBucketNo < hashtable->nSkewBuckets)
1165 : {
1166 0 : int j = hashtable->skewBucketNums[hjstate->hj_CurSkewBucketNo];
1167 :
1168 0 : hashTuple = hashtable->skewBucket[j]->tuples;
1169 0 : hjstate->hj_CurSkewBucketNo++;
1170 : }
1171 : else
1172 172 : break; /* finished all buckets */
1173 :
1174 355553 : while (hashTuple != NULL)
1175 : {
1176 2698 : if (!HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(hashTuple)))
1177 : {
1178 : TupleTableSlot *inntuple;
1179 :
1180 : /* insert hashtable's tuple into exec slot */
1181 599 : inntuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple),
1182 : hjstate->hj_HashTupleSlot,
1183 : false); /* do not pfree */
1184 599 : econtext->ecxt_innertuple = inntuple;
1185 :
1186 : /*
1187 : * Reset temp memory each time; although this function doesn't
1188 : * do any qual eval, the caller will, so let's keep it
1189 : * parallel to ExecScanHashBucket.
1190 : */
1191 599 : ResetExprContext(econtext);
1192 :
1193 599 : hjstate->hj_CurTuple = hashTuple;
1194 599 : return true;
1195 : }
1196 :
1197 2099 : hashTuple = hashTuple->next;
1198 : }
1199 :
1200 : /* allow this loop to be cancellable */
1201 176128 : CHECK_FOR_INTERRUPTS();
1202 176128 : }
1203 :
1204 : /*
1205 : * no more unmatched tuples
1206 : */
1207 172 : return false;
1208 : }
1209 :
1210 : /*
1211 : * ExecHashTableReset
1212 : *
1213 : * reset hash table header for new batch
1214 : */
1215 : void
1216 7 : ExecHashTableReset(HashJoinTable hashtable)
1217 : {
1218 : MemoryContext oldcxt;
1219 7 : int nbuckets = hashtable->nbuckets;
1220 :
1221 : /*
1222 : * Release all the hash buckets and tuples acquired in the prior pass, and
1223 : * reinitialize the context for a new pass.
1224 : */
1225 7 : MemoryContextReset(hashtable->batchCxt);
1226 7 : oldcxt = MemoryContextSwitchTo(hashtable->batchCxt);
1227 :
1228 : /* Reallocate and reinitialize the hash bucket headers. */
1229 7 : hashtable->buckets = (HashJoinTuple *)
1230 7 : palloc0(nbuckets * sizeof(HashJoinTuple));
1231 :
1232 7 : hashtable->spaceUsed = 0;
1233 :
1234 7 : MemoryContextSwitchTo(oldcxt);
1235 :
1236 : /* Forget the chunks (the memory was freed by the context reset above). */
1237 7 : hashtable->chunks = NULL;
1238 7 : }
1239 :
1240 : /*
1241 : * ExecHashTableResetMatchFlags
1242 : * Clear all the HeapTupleHeaderHasMatch flags in the table
1243 : */
1244 : void
1245 4 : ExecHashTableResetMatchFlags(HashJoinTable hashtable)
1246 : {
1247 : HashJoinTuple tuple;
1248 : int i;
1249 :
1250 : /* Reset all flags in the main table ... */
1251 4100 : for (i = 0; i < hashtable->nbuckets; i++)
1252 : {
1253 4116 : for (tuple = hashtable->buckets[i]; tuple != NULL; tuple = tuple->next)
1254 20 : HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(tuple));
1255 : }
1256 :
1257 : /* ... and the same for the skew buckets, if any */
1258 4 : for (i = 0; i < hashtable->nSkewBuckets; i++)
1259 : {
1260 0 : int j = hashtable->skewBucketNums[i];
1261 0 : HashSkewBucket *skewBucket = hashtable->skewBucket[j];
1262 :
1263 0 : for (tuple = skewBucket->tuples; tuple != NULL; tuple = tuple->next)
1264 0 : HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(tuple));
1265 : }
1266 4 : }
1267 :
1268 :
1269 : void
1270 128 : ExecReScanHash(HashState *node)
1271 : {
1272 : /*
1273 : * If chgParam of the subnode is not null, then the plan will be
1274 : * re-scanned by the first ExecProcNode; otherwise we re-scan it here.
1275 : */
1276 128 : if (node->ps.lefttree->chgParam == NULL)
1277 0 : ExecReScan(node->ps.lefttree);
1278 128 : }
1279 :
1280 :
1281 : /*
1282 : * ExecHashBuildSkewHash
1283 : *
1284 : * Set up for skew optimization if we can identify the most common values
1285 : * (MCVs) of the outer relation's join key. We make a skew hash bucket
1286 : * for the hash value of each MCV, up to the number of slots allowed
1287 : * based on available memory.
1288 : */
1289 : static void
1290 1 : ExecHashBuildSkewHash(HashJoinTable hashtable, Hash *node, int mcvsToUse)
1291 : {
1292 : HeapTupleData *statsTuple;
1293 : AttStatsSlot sslot;
1294 :
1295 : /* Do nothing if planner didn't identify the outer relation's join key */
1296 1 : if (!OidIsValid(node->skewTable))
1297 0 : return;
1298 : /* Also, do nothing if we don't have room for at least one skew bucket */
1299 1 : if (mcvsToUse <= 0)
1300 0 : return;
1301 :
1302 : /*
1303 : * Try to find the MCV statistics for the outer relation's join key.
1304 : */
1305 1 : statsTuple = SearchSysCache3(STATRELATTINH,
1306 : ObjectIdGetDatum(node->skewTable),
1307 : Int16GetDatum(node->skewColumn),
1308 : BoolGetDatum(node->skewInherit));
1309 1 : if (!HeapTupleIsValid(statsTuple))
1310 0 : return;
1311 :
1312 1 : if (get_attstatsslot(&sslot, statsTuple,
1313 : STATISTIC_KIND_MCV, InvalidOid,
1314 : ATTSTATSSLOT_VALUES | ATTSTATSSLOT_NUMBERS))
1315 : {
1316 : double frac;
1317 : int nbuckets;
1318 : FmgrInfo *hashfunctions;
1319 : int i;
1320 :
1321 1 : if (mcvsToUse > sslot.nvalues)
1322 0 : mcvsToUse = sslot.nvalues;
1323 :
1324 : /*
1325 : * Calculate the expected fraction of outer relation that will
1326 : * participate in the skew optimization. If this isn't at least
1327 : * SKEW_MIN_OUTER_FRACTION, don't use skew optimization.
1328 : */
1329 1 : frac = 0;
1330 20 : for (i = 0; i < mcvsToUse; i++)
1331 19 : frac += sslot.numbers[i];
1332 1 : if (frac < SKEW_MIN_OUTER_FRACTION)
1333 : {
1334 0 : free_attstatsslot(&sslot);
1335 0 : ReleaseSysCache(statsTuple);
1336 0 : return;
1337 : }
1338 :
1339 : /*
1340 : * Okay, set up the skew hashtable.
1341 : *
1342 : * skewBucket[] is an open addressing hashtable with a power of 2 size
1343 : * that is greater than the number of MCV values. (This ensures there
1344 : * will be at least one null entry, so searches will always
1345 : * terminate.)
1346 : *
1347 : * Note: this code could fail if mcvsToUse exceeds INT_MAX/8 or
1348 : * MaxAllocSize/sizeof(void *)/8, but that is not currently possible
1349 : * since we limit pg_statistic entries to much less than that.
1350 : */
1351 1 : nbuckets = 2;
1352 6 : while (nbuckets <= mcvsToUse)
1353 4 : nbuckets <<= 1;
1354 : /* use two more bits just to help avoid collisions */
1355 1 : nbuckets <<= 2;
1356 :
1357 1 : hashtable->skewEnabled = true;
1358 1 : hashtable->skewBucketLen = nbuckets;
1359 :
1360 : /*
1361 : * We allocate the bucket memory in the hashtable's batch context. It
1362 : * is only needed during the first batch, and this ensures it will be
1363 : * automatically removed once the first batch is done.
1364 : */
1365 1 : hashtable->skewBucket = (HashSkewBucket **)
1366 1 : MemoryContextAllocZero(hashtable->batchCxt,
1367 : nbuckets * sizeof(HashSkewBucket *));
1368 1 : hashtable->skewBucketNums = (int *)
1369 1 : MemoryContextAllocZero(hashtable->batchCxt,
1370 : mcvsToUse * sizeof(int));
1371 :
1372 2 : hashtable->spaceUsed += nbuckets * sizeof(HashSkewBucket *)
1373 1 : + mcvsToUse * sizeof(int);
1374 2 : hashtable->spaceUsedSkew += nbuckets * sizeof(HashSkewBucket *)
1375 1 : + mcvsToUse * sizeof(int);
1376 1 : if (hashtable->spaceUsed > hashtable->spacePeak)
1377 1 : hashtable->spacePeak = hashtable->spaceUsed;
1378 :
1379 : /*
1380 : * Create a skew bucket for each MCV hash value.
1381 : *
1382 : * Note: it is very important that we create the buckets in order of
1383 : * decreasing MCV frequency. If we have to remove some buckets, they
1384 : * must be removed in reverse order of creation (see notes in
1385 : * ExecHashRemoveNextSkewBucket) and we want the least common MCVs to
1386 : * be removed first.
1387 : */
1388 1 : hashfunctions = hashtable->outer_hashfunctions;
1389 :
1390 20 : for (i = 0; i < mcvsToUse; i++)
1391 : {
1392 : uint32 hashvalue;
1393 : int bucket;
1394 :
1395 19 : hashvalue = DatumGetUInt32(FunctionCall1(&hashfunctions[0],
1396 : sslot.values[i]));
1397 :
1398 : /*
1399 : * While we have not hit a hole in the hashtable and have not hit
1400 : * the desired bucket, we have collided with some previous hash
1401 : * value, so try the next bucket location. NB: this code must
1402 : * match ExecHashGetSkewBucket.
1403 : */
1404 19 : bucket = hashvalue & (nbuckets - 1);
1405 38 : while (hashtable->skewBucket[bucket] != NULL &&
1406 0 : hashtable->skewBucket[bucket]->hashvalue != hashvalue)
1407 0 : bucket = (bucket + 1) & (nbuckets - 1);
1408 :
1409 : /*
1410 : * If we found an existing bucket with the same hashvalue, leave
1411 : * it alone. It's okay for two MCVs to share a hashvalue.
1412 : */
1413 19 : if (hashtable->skewBucket[bucket] != NULL)
1414 0 : continue;
1415 :
1416 : /* Okay, create a new skew bucket for this hashvalue. */
1417 38 : hashtable->skewBucket[bucket] = (HashSkewBucket *)
1418 19 : MemoryContextAlloc(hashtable->batchCxt,
1419 : sizeof(HashSkewBucket));
1420 19 : hashtable->skewBucket[bucket]->hashvalue = hashvalue;
1421 19 : hashtable->skewBucket[bucket]->tuples = NULL;
1422 19 : hashtable->skewBucketNums[hashtable->nSkewBuckets] = bucket;
1423 19 : hashtable->nSkewBuckets++;
1424 19 : hashtable->spaceUsed += SKEW_BUCKET_OVERHEAD;
1425 19 : hashtable->spaceUsedSkew += SKEW_BUCKET_OVERHEAD;
1426 19 : if (hashtable->spaceUsed > hashtable->spacePeak)
1427 19 : hashtable->spacePeak = hashtable->spaceUsed;
1428 : }
1429 :
1430 1 : free_attstatsslot(&sslot);
1431 : }
1432 :
1433 1 : ReleaseSysCache(statsTuple);
1434 : }
1435 :
1436 : /*
1437 : * ExecHashGetSkewBucket
1438 : *
1439 : * Returns the index of the skew bucket for this hashvalue,
1440 : * or INVALID_SKEW_BUCKET_NO if the hashvalue is not
1441 : * associated with any active skew bucket.
1442 : */
1443 : int
1444 684465 : ExecHashGetSkewBucket(HashJoinTable hashtable, uint32 hashvalue)
1445 : {
1446 : int bucket;
1447 :
1448 : /*
1449 : * Always return INVALID_SKEW_BUCKET_NO if not doing skew optimization (in
1450 : * particular, this happens after the initial batch is done).
1451 : */
1452 684465 : if (!hashtable->skewEnabled)
1453 664465 : return INVALID_SKEW_BUCKET_NO;
1454 :
1455 : /*
1456 : * Since skewBucketLen is a power of 2, we can do a modulo by ANDing.
1457 : */
1458 20000 : bucket = hashvalue & (hashtable->skewBucketLen - 1);
1459 :
1460 : /*
1461 : * While we have not hit a hole in the hashtable and have not hit the
1462 : * desired bucket, we have collided with some other hash value, so try the
1463 : * next bucket location.
1464 : */
1465 42119 : while (hashtable->skewBucket[bucket] != NULL &&
1466 1198 : hashtable->skewBucket[bucket]->hashvalue != hashvalue)
1467 921 : bucket = (bucket + 1) & (hashtable->skewBucketLen - 1);
1468 :
1469 : /*
1470 : * Found the desired bucket?
1471 : */
1472 20000 : if (hashtable->skewBucket[bucket] != NULL)
1473 277 : return bucket;
1474 :
1475 : /*
1476 : * There must not be any hashtable entry for this hash value.
1477 : */
1478 19723 : return INVALID_SKEW_BUCKET_NO;
1479 : }
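       :
       : /*
       :  * Probing example: with skewBucketLen = 8, hashvalue = 13 maps to slot
       :  * 13 & 7 = 5.  If slot 5 holds a bucket for a different hashvalue we
       :  * try 6, then 7, then wrap to 0, stopping at the first NULL slot (no
       :  * match) or at a bucket whose hashvalue equals ours.  Since the table
       :  * always contains at least one NULL entry, the loop must terminate.
       :  */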
1480 :
1481 : /*
1482 : * ExecHashSkewTableInsert
1483 : *
1484 : * Insert a tuple into the skew hashtable.
1485 : *
1486 : * This should generally match up with the current-batch case in
1487 : * ExecHashTableInsert.
1488 : */
1489 : static void
1490 77 : ExecHashSkewTableInsert(HashJoinTable hashtable,
1491 : TupleTableSlot *slot,
1492 : uint32 hashvalue,
1493 : int bucketNumber)
1494 : {
1495 77 : MinimalTuple tuple = ExecFetchSlotMinimalTuple(slot);
1496 : HashJoinTuple hashTuple;
1497 : int hashTupleSize;
1498 :
1499 : /* Create the HashJoinTuple */
1500 77 : hashTupleSize = HJTUPLE_OVERHEAD + tuple->t_len;
1501 77 : hashTuple = (HashJoinTuple) MemoryContextAlloc(hashtable->batchCxt,
1502 : hashTupleSize);
1503 77 : hashTuple->hashvalue = hashvalue;
1504 77 : memcpy(HJTUPLE_MINTUPLE(hashTuple), tuple, tuple->t_len);
1505 77 : HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(hashTuple));
1506 :
1507 : /* Push it onto the front of the skew bucket's list */
1508 77 : hashTuple->next = hashtable->skewBucket[bucketNumber]->tuples;
1509 77 : hashtable->skewBucket[bucketNumber]->tuples = hashTuple;
1510 :
1511 : /* Account for space used, and back off if we've used too much */
1512 77 : hashtable->spaceUsed += hashTupleSize;
1513 77 : hashtable->spaceUsedSkew += hashTupleSize;
1514 77 : if (hashtable->spaceUsed > hashtable->spacePeak)
1515 54 : hashtable->spacePeak = hashtable->spaceUsed;
1516 171 : while (hashtable->spaceUsedSkew > hashtable->spaceAllowedSkew)
1517 17 : ExecHashRemoveNextSkewBucket(hashtable);
1518 :
1519 : /* Check we are not over the total spaceAllowed, either */
1520 77 : if (hashtable->spaceUsed > hashtable->spaceAllowed)
1521 0 : ExecHashIncreaseNumBatches(hashtable);
1522 77 : }
1523 :
1524 : /*
1525 : * ExecHashRemoveNextSkewBucket
1526 : *
1527 : * Remove the least valuable skew bucket by pushing its tuples into
1528 : * the main hash table.
1529 : */
1530 : static void
1531 17 : ExecHashRemoveNextSkewBucket(HashJoinTable hashtable)
1532 : {
1533 : int bucketToRemove;
1534 : HashSkewBucket *bucket;
1535 : uint32 hashvalue;
1536 : int bucketno;
1537 : int batchno;
1538 : HashJoinTuple hashTuple;
1539 :
1540 : /* Locate the bucket to remove */
1541 17 : bucketToRemove = hashtable->skewBucketNums[hashtable->nSkewBuckets - 1];
1542 17 : bucket = hashtable->skewBucket[bucketToRemove];
1543 :
1544 : /*
1545 : * Calculate which bucket and batch the tuples belong to in the main
1546 : * hashtable. They all have the same hash value, so it's the same for all
1547 : * of them. Also note that it's not possible for nbatch to increase while
1548 : * we are processing the tuples.
1549 : */
1550 17 : hashvalue = bucket->hashvalue;
1551 17 : ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno, &batchno);
1552 :
1553 : /* Process all tuples in the bucket */
1554 17 : hashTuple = bucket->tuples;
1555 91 : while (hashTuple != NULL)
1556 : {
1557 57 : HashJoinTuple nextHashTuple = hashTuple->next;
1558 : MinimalTuple tuple;
1559 : Size tupleSize;
1560 :
1561 : /*
1562 : * This code must agree with ExecHashTableInsert. We do not use
1563 : * ExecHashTableInsert directly as ExecHashTableInsert expects a
1564 : * TupleTableSlot while we already have HashJoinTuples.
1565 : */
1566 57 : tuple = HJTUPLE_MINTUPLE(hashTuple);
1567 57 : tupleSize = HJTUPLE_OVERHEAD + tuple->t_len;
1568 :
1569 : /* Decide whether to put the tuple in the hash table or a temp file */
1570 57 : if (batchno == hashtable->curbatch)
1571 : {
1572 : /* Move the tuple to the main hash table */
1573 : HashJoinTuple copyTuple;
1574 :
1575 : /*
1576 : * We must copy the tuple into the dense storage, else it will not
1577 : * be found by, eg, ExecHashIncreaseNumBatches.
1578 : */
1579 26 : copyTuple = (HashJoinTuple) dense_alloc(hashtable, tupleSize);
1580 26 : memcpy(copyTuple, hashTuple, tupleSize);
1581 26 : pfree(hashTuple);
1582 :
1583 26 : copyTuple->next = hashtable->buckets[bucketno];
1584 26 : hashtable->buckets[bucketno] = copyTuple;
1585 :
1586 : /* We have reduced skew space, but overall space doesn't change */
1587 26 : hashtable->spaceUsedSkew -= tupleSize;
1588 : }
1589 : else
1590 : {
1591 : /* Put the tuple into a temp file for later batches */
1592 31 : Assert(batchno > hashtable->curbatch);
1593 62 : ExecHashJoinSaveTuple(tuple, hashvalue,
1594 62 : &hashtable->innerBatchFile[batchno]);
1595 31 : pfree(hashTuple);
1596 31 : hashtable->spaceUsed -= tupleSize;
1597 31 : hashtable->spaceUsedSkew -= tupleSize;
1598 : }
1599 :
1600 57 : hashTuple = nextHashTuple;
1601 :
1602 : /* allow this loop to be cancellable */
1603 57 : CHECK_FOR_INTERRUPTS();
1604 : }
1605 :
1606 : /*
1607 : * Free the bucket struct itself and reset the hashtable entry to NULL.
1608 : *
1609 : * NOTE: this is not nearly as simple as it looks on the surface, because
1610 : * of the possibility of collisions in the hashtable. Suppose that hash
1611 : * values A and B collide at a particular hashtable entry, and that A was
1612 : * entered first so B gets shifted to a different table entry. If we were
1613 : * to remove A first then ExecHashGetSkewBucket would mistakenly start
1614 : * reporting that B is not in the hashtable, because it would hit the NULL
1615 : * before finding B. However, we always remove entries in the reverse
1616 : * order of creation, so this failure cannot happen.
1617 : */
1618 17 : hashtable->skewBucket[bucketToRemove] = NULL;
1619 17 : hashtable->nSkewBuckets--;
1620 17 : pfree(bucket);
1621 17 : hashtable->spaceUsed -= SKEW_BUCKET_OVERHEAD;
1622 17 : hashtable->spaceUsedSkew -= SKEW_BUCKET_OVERHEAD;
1623 :
1624 : /*
1625 : * If we have removed all skew buckets then give up on skew optimization.
1626 : * Release the arrays since they aren't useful any more.
1627 : */
1628 17 : if (hashtable->nSkewBuckets == 0)
1629 : {
1630 0 : hashtable->skewEnabled = false;
1631 0 : pfree(hashtable->skewBucket);
1632 0 : pfree(hashtable->skewBucketNums);
1633 0 : hashtable->skewBucket = NULL;
1634 0 : hashtable->skewBucketNums = NULL;
1635 0 : hashtable->spaceUsed -= hashtable->spaceUsedSkew;
1636 0 : hashtable->spaceUsedSkew = 0;
1637 : }
1638 17 : }
1639 :
1640 : /*
1641 : * Allocate 'size' bytes from the currently active HashMemoryChunk
1642 : */
1643 : static void *
1644 280415 : dense_alloc(HashJoinTable hashtable, Size size)
1645 : {
1646 : HashMemoryChunk newChunk;
1647 : char *ptr;
1648 :
1649 : /* just in case the size is not already aligned properly */
1650 280415 : size = MAXALIGN(size);
1651 :
1652 : /*
1653 : * If the tuple size is larger than 1/4 of chunk size, allocate a separate
1654 : * chunk.
1655 : */
1656 280415 : if (size > HASH_CHUNK_THRESHOLD)
1657 : {
1658 : /* allocate new chunk and put it at the beginning of the list */
1659 0 : newChunk = (HashMemoryChunk) MemoryContextAlloc(hashtable->batchCxt,
1660 : offsetof(HashMemoryChunkData, data) + size);
1661 0 : newChunk->maxlen = size;
1662 0 : newChunk->used = 0;
1663 0 : newChunk->ntuples = 0;
1664 :
1665 : /*
1666 : * Add this chunk to the list after the first existing chunk, so that
1667 : * we don't lose the remaining space in the "current" chunk.
1668 : */
1669 0 : if (hashtable->chunks != NULL)
1670 : {
1671 0 : newChunk->next = hashtable->chunks->next;
1672 0 : hashtable->chunks->next = newChunk;
1673 : }
1674 : else
1675 : {
1676 0 : newChunk->next = hashtable->chunks;
1677 0 : hashtable->chunks = newChunk;
1678 : }
1679 :
1680 0 : newChunk->used += size;
1681 0 : newChunk->ntuples += 1;
1682 :
1683 0 : return newChunk->data;
1684 : }
1685 :
1686 : /*
1687 : * See if we have enough space for it in the current chunk (if any). If
1688 : * not, allocate a fresh chunk.
1689 : */
1690 559979 : if ((hashtable->chunks == NULL) ||
1691 279564 : (hashtable->chunks->maxlen - hashtable->chunks->used) < size)
1692 : {
1693 : /* allocate new chunk and put it at the beginning of the list */
1694 1340 : newChunk = (HashMemoryChunk) MemoryContextAlloc(hashtable->batchCxt,
1695 : offsetof(HashMemoryChunkData, data) + HASH_CHUNK_SIZE);
1696 :
1697 1340 : newChunk->maxlen = HASH_CHUNK_SIZE;
1698 1340 : newChunk->used = size;
1699 1340 : newChunk->ntuples = 1;
1700 :
1701 1340 : newChunk->next = hashtable->chunks;
1702 1340 : hashtable->chunks = newChunk;
1703 :
1704 1340 : return newChunk->data;
1705 : }
1706 :
1707 : /* There is enough space in the current chunk, let's add the tuple */
1708 279075 : ptr = hashtable->chunks->data + hashtable->chunks->used;
1709 279075 : hashtable->chunks->used += size;
1710 279075 : hashtable->chunks->ntuples += 1;
1711 :
1712 : /* return pointer to the start of the tuple memory */
1713 279075 : return ptr;
1714 : }
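       :
       : /*
       :  * Sizing note (constants from hashjoin.h): HASH_CHUNK_SIZE is 32kB and
       :  * HASH_CHUNK_THRESHOLD is one quarter of that, 8kB.  So a stream of
       :  * 64-byte tuples packs a bit over 500 tuples into each chunk with a
       :  * single palloc, while a lone 10kB tuple takes the separate-chunk path
       :  * above and is linked in behind the current chunk, preserving the
       :  * current chunk's remaining free space.
       :  */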