Replace nth() calls in inner loops with traversal of the list via

lnext, to eliminate O(N^2) behavior with lots of indexquals.
This commit is contained in:
Tom Lane 2000-02-05 23:19:44 +00:00
parent 78296c2797
commit 60be6da731
1 changed files with 32 additions and 28 deletions

View File

@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/executor/nodeIndexscan.c,v 1.45 2000/01/26 05:56:23 momjian Exp $
* $Header: /cvsroot/pgsql/src/backend/executor/nodeIndexscan.c,v 1.46 2000/02/05 23:19:44 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@ -124,7 +124,7 @@ IndexNext(IndexScan *node)
if (estate->es_evTuple != NULL &&
estate->es_evTuple[node->scan.scanrelid - 1] != NULL)
{
int iptr;
List *qual;
ExecClearTuple(slot);
if (estate->es_evTupleNull[node->scan.scanrelid - 1])
@ -134,20 +134,23 @@ IndexNext(IndexScan *node)
slot->val = estate->es_evTuple[node->scan.scanrelid - 1];
slot->ttc_shouldFree = false;
for (iptr = 0; iptr < numIndices; iptr++)
scanstate->cstate.cs_ExprContext->ecxt_scantuple = slot;
/* Does the tuple meet any of the OR'd indxqual conditions? */
foreach(qual, node->indxqualorig)
{
scanstate->cstate.cs_ExprContext->ecxt_scantuple = slot;
if (ExecQual(nth(iptr, node->indxqualorig),
if (ExecQual((List *) lfirst(qual),
scanstate->cstate.cs_ExprContext,
false))
break;
}
if (iptr == numIndices) /* would not be returned by indices */
if (qual == NIL) /* would not be returned by indices */
slot->val = NULL;
/* Flag for the next call that no more tuples */
estate->es_evTupleNull[node->scan.scanrelid - 1] = true;
return (slot);
return slot;
}
tuple = &(indexstate->iss_htup);
@ -189,14 +192,14 @@ IndexNext(IndexScan *node)
{
bool prev_matches = false;
int prev_index;
List *qual;
/* ----------------
/*
* store the scanned tuple in the scan tuple slot of
* the scan state. Eventually we will only do this and not
* return a tuple. Note: we pass 'false' because tuples
* returned by amgetnext are pointers onto disk pages and
* must not be pfree()'d.
* ----------------
*/
ExecStoreTuple(tuple, /* tuple to store */
slot, /* slot to store in */
@ -211,28 +214,29 @@ IndexNext(IndexScan *node)
ReleaseBuffer(buffer);
/*
* We must check to see if the current tuple would have
* been matched by an earlier index, so we don't double
* report it. We do this by passing the tuple through
* ExecQual and look for failure with all previous
* qualifications.
* We must check to see if the current tuple was already
* matched by an earlier index, so we don't double-report it.
* We do this by passing the tuple through ExecQual and
* checking for failure with all previous qualifications.
*/
scanstate->cstate.cs_ExprContext->ecxt_scantuple = slot;
qual = node->indxqualorig;
for (prev_index = 0; prev_index < indexstate->iss_IndexPtr;
prev_index++)
{
scanstate->cstate.cs_ExprContext->ecxt_scantuple = slot;
if (ExecQual(nth(prev_index, node->indxqualorig),
if (ExecQual((List *) lfirst(qual),
scanstate->cstate.cs_ExprContext,
false))
{
prev_matches = true;
break;
}
qual = lnext(qual);
}
if (!prev_matches)
return slot;
else
ExecClearTuple(slot);
return slot; /* OK to return tuple */
/* Duplicate tuple, so drop it and loop back for another */
ExecClearTuple(slot);
}
}
if (indexNumber < numIndices)
@ -329,7 +333,6 @@ ExecIndexReScan(IndexScan *node, ExprContext *exprCtxt, Plan *parent)
scanDescs = indexstate->iss_ScanDescs;
scanKeys = indexstate->iss_ScanKeys;
runtimeKeyInfo = (Pointer *) indexstate->iss_RuntimeKeyInfo;
indxqual = node->indxqual;
numScanKeys = indexstate->iss_NumScanKeys;
indexstate->iss_IndexPtr = -1;
if (ScanDirectionIsBackward(node->indxorderdir))
@ -352,9 +355,11 @@ ExecIndexReScan(IndexScan *node, ExprContext *exprCtxt, Plan *parent)
/*
* get the index qualifications and recalculate the appropriate values
*/
indxqual = node->indxqual;
for (i = 0; i < numIndices; i++)
{
qual = nth(i, indxqual);
qual = lfirst(indxqual);
indxqual = lnext(indxqual);
n_keys = numScanKeys[i];
scan_keys = (ScanKey) scanKeys[i];
@ -672,7 +677,6 @@ ExecInitIndexScan(IndexScan *node, EState *estate, Plan *parent)
* ----------------
*/
indxid = node->indxid;
indxqual = node->indxqual;
numIndices = length(indxid);
indexPtr = -1;
@ -701,6 +705,7 @@ ExecInitIndexScan(IndexScan *node, EState *estate, Plan *parent)
* build the index scan keys from the index qualification
* ----------------
*/
indxqual = node->indxqual;
for (i = 0; i < numIndices; i++)
{
int j;
@ -709,7 +714,8 @@ ExecInitIndexScan(IndexScan *node, EState *estate, Plan *parent)
ScanKey scan_keys;
int *run_keys;
qual = nth(i, indxqual);
qual = lfirst(indxqual);
indxqual = lnext(indxqual);
n_keys = length(qual);
scan_keys = (n_keys <= 0) ? NULL :
(ScanKey) palloc(n_keys * sizeof(ScanKeyData));
@ -743,8 +749,8 @@ ExecInitIndexScan(IndexScan *node, EState *estate, Plan *parent)
clause = nth(j, qual);
op = (Oper *) clause->oper;
if (!IsA(op, Oper))
elog(ERROR, "ExecInitIndexScan: op not an Oper!");
if (!IsA(clause, Expr) || !IsA(op, Oper))
elog(ERROR, "ExecInitIndexScan: indxqual not an opclause!");
opid = op->opid;
@ -1066,9 +1072,7 @@ ExecInitIndexScan(IndexScan *node, EState *estate, Plan *parent)
*/
for (i = 0; i < numIndices; i++)
{
Oid indexOid;
indexOid = (Oid) nthi(i, indxid);
Oid indexOid = (Oid) nthi(i, indxid);
if (indexOid != 0)
{