Create infrastructure for moving-aggregate optimization.

Until now, when executing an aggregate function as a window function
within a window with moving frame start (that is, any frame start mode
except UNBOUNDED PRECEDING), we had to recalculate the aggregate from
scratch each time the frame head moved.  This patch allows an aggregate
definition to include an alternate "moving aggregate" implementation
that includes an inverse transition function for removing rows from
the aggregate's running state.  As long as this can be done successfully,
runtime is proportional to the total number of input rows, rather than
to the number of input rows times the average frame length.

This commit includes the core infrastructure, documentation, and regression
tests using user-defined aggregates.  Follow-on commits will update some
of the built-in aggregates to use this feature.

David Rowley and Florian Pflug, reviewed by Dean Rasheed; additional
hacking by me
This commit is contained in:
Tom Lane 2014-04-12 11:58:53 -04:00
parent 3c41b812c5
commit a9d9acbf21
20 changed files with 2239 additions and 301 deletions

View File

@ -386,6 +386,24 @@
<entry><literal><link linkend="catalog-pg-proc"><structname>pg_proc</structname></link>.oid</literal></entry>
<entry>Final function (zero if none)</entry>
</row>
<row>
<entry><structfield>aggmtransfn</structfield></entry>
<entry><type>regproc</type></entry>
<entry><literal><link linkend="catalog-pg-proc"><structname>pg_proc</structname></link>.oid</literal></entry>
<entry>Forward transition function for moving-aggregate mode (zero if none)</entry>
</row>
<row>
<entry><structfield>aggminvtransfn</structfield></entry>
<entry><type>regproc</type></entry>
<entry><literal><link linkend="catalog-pg-proc"><structname>pg_proc</structname></link>.oid</literal></entry>
<entry>Inverse transition function for moving-aggregate mode (zero if none)</entry>
</row>
<row>
<entry><structfield>aggmfinalfn</structfield></entry>
<entry><type>regproc</type></entry>
<entry><literal><link linkend="catalog-pg-proc"><structname>pg_proc</structname></link>.oid</literal></entry>
<entry>Final function for moving-aggregate mode (zero if none)</entry>
</row>
<row>
<entry><structfield>aggsortop</structfield></entry>
<entry><type>oid</type></entry>
@ -405,6 +423,20 @@
<entry>Approximate average size (in bytes) of the transition state
data, or zero to use a default estimate</entry>
</row>
<row>
<entry><structfield>aggmtranstype</structfield></entry>
<entry><type>oid</type></entry>
<entry><literal><link linkend="catalog-pg-type"><structname>pg_type</structname></link>.oid</literal></entry>
<entry>Data type of the aggregate function's internal transition (state)
data for moving-aggregate mode (zero if none)</entry>
</row>
<row>
<entry><structfield>aggmtransspace</structfield></entry>
<entry><type>int4</type></entry>
<entry></entry>
<entry>Approximate average size (in bytes) of the transition state data
for moving-aggregate mode, or zero to use a default estimate</entry>
</row>
<row>
<entry><structfield>agginitval</structfield></entry>
<entry><type>text</type></entry>
@ -416,6 +448,17 @@
value starts out null.
</entry>
</row>
<row>
<entry><structfield>aggminitval</structfield></entry>
<entry><type>text</type></entry>
<entry></entry>
<entry>
The initial value of the transition state for moving-aggregate mode.
This is a text field containing the initial value in its external
string representation. If this field is null, the transition state
value starts out null.
</entry>
</row>
</tbody>
</tgroup>
</table>

View File

@ -27,6 +27,12 @@ CREATE AGGREGATE <replaceable class="parameter">name</replaceable> ( [ <replacea
[ , SSPACE = <replaceable class="PARAMETER">state_data_size</replaceable> ]
[ , FINALFUNC = <replaceable class="PARAMETER">ffunc</replaceable> ]
[ , INITCOND = <replaceable class="PARAMETER">initial_condition</replaceable> ]
[ , MSFUNC = <replaceable class="PARAMETER">msfunc</replaceable> ]
[ , MINVFUNC = <replaceable class="PARAMETER">minvfunc</replaceable> ]
[ , MSTYPE = <replaceable class="PARAMETER">mstate_data_type</replaceable> ]
[ , MSSPACE = <replaceable class="PARAMETER">mstate_data_size</replaceable> ]
[ , MFINALFUNC = <replaceable class="PARAMETER">mffunc</replaceable> ]
[ , MINITCOND = <replaceable class="PARAMETER">minitial_condition</replaceable> ]
[ , SORTOP = <replaceable class="PARAMETER">sort_operator</replaceable> ]
)
@ -49,6 +55,12 @@ CREATE AGGREGATE <replaceable class="PARAMETER">name</replaceable> (
[ , SSPACE = <replaceable class="PARAMETER">state_data_size</replaceable> ]
[ , FINALFUNC = <replaceable class="PARAMETER">ffunc</replaceable> ]
[ , INITCOND = <replaceable class="PARAMETER">initial_condition</replaceable> ]
[ , MSFUNC = <replaceable class="PARAMETER">sfunc</replaceable> ]
[ , MINVFUNC = <replaceable class="PARAMETER">invfunc</replaceable> ]
[ , MSTYPE = <replaceable class="PARAMETER">state_data_type</replaceable> ]
[ , MSSPACE = <replaceable class="PARAMETER">state_data_size</replaceable> ]
[ , MFINALFUNC = <replaceable class="PARAMETER">ffunc</replaceable> ]
[ , MINITCOND = <replaceable class="PARAMETER">initial_condition</replaceable> ]
[ , SORTOP = <replaceable class="PARAMETER">sort_operator</replaceable> ]
)
</synopsis>
@ -84,7 +96,7 @@ CREATE AGGREGATE <replaceable class="PARAMETER">name</replaceable> (
</para>
<para>
An aggregate function is made from one or two ordinary
A simple aggregate function is made from one or two ordinary
functions:
a state transition function
<replaceable class="PARAMETER">sfunc</replaceable>,
@ -126,7 +138,7 @@ CREATE AGGREGATE <replaceable class="PARAMETER">name</replaceable> (
values are ignored (the function is not called and the previous state value
is retained). If the initial state value is null, then at the first row
with all-nonnull input values, the first argument value replaces the state
value, and the transition function is invoked at subsequent rows with
value, and the transition function is invoked at each subsequent row with
all-nonnull input values.
This is handy for implementing aggregates like <function>max</function>.
Note that this behavior is only available when
@ -154,6 +166,18 @@ CREATE AGGREGATE <replaceable class="PARAMETER">name</replaceable> (
input rows.
</para>
<para>
An aggregate can optionally support <firstterm>moving-aggregate mode</>,
as described in <xref linkend="xaggr-moving-aggregates">. This requires
specifying the <literal>MSFUNC</>, <literal>MINVFUNC</>,
and <literal>MSTYPE</> parameters, and optionally
the <literal>MSPACE</>, <literal>MFINALFUNC</>,
and <literal>MINITCOND</> parameters. Except for <literal>MINVFUNC</>,
these parameters work like the corresponding simple-aggregate parameters
without <literal>M</>; they define a separate implementation of the
aggregate that includes an inverse transition function.
</para>
<para>
The syntax with <literal>ORDER BY</literal> in the parameter list creates
a special type of aggregate called an <firstterm>ordered-set
@ -197,8 +221,8 @@ SELECT col FROM tab ORDER BY col USING sortop LIMIT 1;
<para>
To be able to create an aggregate function, you must
have <literal>USAGE</literal> privilege on the argument types, the state
type, and the return type, as well as <literal>EXECUTE</literal> privilege
on the transition and final functions.
type(s), and the return type, as well as <literal>EXECUTE</literal>
privilege on the transition and final functions.
</para>
</refsect1>
@ -359,6 +383,79 @@ SELECT col FROM tab ORDER BY col USING sortop LIMIT 1;
</listitem>
</varlistentry>
<varlistentry>
<term><replaceable class="PARAMETER">msfunc</replaceable></term>
<listitem>
<para>
The name of the forward state transition function to be called for each
input row in moving-aggregate mode. This is exactly like the regular
transition function, except that its first argument and result are of
type <replaceable>mstate_data_type</>, which might be different
from <replaceable>state_data_type</>.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><replaceable class="PARAMETER">minvfunc</replaceable></term>
<listitem>
<para>
The name of the inverse state transition function to be used in
moving-aggregate mode. This function has the same argument and
result types as <replaceable>msfunc</>, but it is used to remove
a value from the current aggregate state, rather than add a value to
it. The inverse transition function must have the same strictness
attribute as the forward state transition function.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><replaceable class="PARAMETER">mstate_data_type</replaceable></term>
<listitem>
<para>
The data type for the aggregate's state value, when using
moving-aggregate mode.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><replaceable class="PARAMETER">mstate_data_size</replaceable></term>
<listitem>
<para>
The approximate average size (in bytes) of the aggregate's state
value, when using moving-aggregate mode. This works the same as
<replaceable>state_data_size</>.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><replaceable class="PARAMETER">mffunc</replaceable></term>
<listitem>
<para>
The name of the final function called to compute the aggregate's
result after all input rows have been traversed, when using
moving-aggregate mode. This works the same as <replaceable>ffunc</>,
except that its input type is <replaceable>mstate_data_type</>.
The aggregate result type determined by <replaceable>mffunc</>
and <replaceable>mstate_data_type</> must match that determined by the
aggregate's regular implementation.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><replaceable class="PARAMETER">minitial_condition</replaceable></term>
<listitem>
<para>
The initial setting for the state value, when using moving-aggregate
mode. This works the same as <replaceable>initial_condition</>.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><replaceable class="PARAMETER">sort_operator</replaceable></term>
<listitem>
@ -397,6 +494,49 @@ SELECT col FROM tab ORDER BY col USING sortop LIMIT 1;
<refsect1>
<title>Notes</title>
<para>
If an aggregate supports moving-aggregate mode, it will improve
calculation efficiency when the aggregate is used as a window function
for a window with moving frame start (that is, a frame start mode other
than <literal>UNBOUNDED PRECEDING</>). Conceptually, the forward
transition function adds input values to the aggregate's state when
they enter the window frame from the bottom, and the inverse transition
function removes them again when they leave the frame at the top. So,
when values are removed, they are always removed in the same order they
were added. Whenever the inverse transition function is invoked, it will
thus receive the earliest added but not yet removed argument value(s).
The inverse transition function can assume that at least one row will
remain in the current state after it removes the oldest row. (When this
would not be the case, the window function mechanism simply starts a
fresh aggregation, rather than using the inverse transition function.)
</para>
<para>
The forward transition function for moving-aggregate mode is not
allowed to return NULL as the new state value. If the inverse
transition function returns NULL, this is taken as an indication that
the inverse function cannot reverse the state calculation for this
particular input, and so the aggregate calculation will be redone from
scratch for the current frame starting position. This convention
allows moving-aggregate mode to be used in situations where there are
some infrequent cases that are impractical to reverse out of the
running state value.
</para>
<para>
If no moving-aggregate implementation is supplied,
the aggregate can still be used with moving frames,
but <productname>PostgreSQL</productname> will recompute the whole
aggregation whenever the start of the frame moves.
Note that whether or not the aggregate supports moving-aggregate
mode, <productname>PostgreSQL</productname> can handle a moving frame
end without recalculation; this is done by continuing to add new values
to the aggregate's state. It is assumed that the final function does
not damage the aggregate's state value, so that the aggregation can be
continued even after an aggregate result value has been obtained for
one set of frame boundaries.
</para>
<para>
The syntax for ordered-set aggregates allows <literal>VARIADIC</>
to be specified for both the last direct parameter and the last
@ -415,6 +555,11 @@ SELECT col FROM tab ORDER BY col USING sortop LIMIT 1;
ones; any preceding parameters represent additional direct arguments
that are not constrained to match the aggregated arguments.
</para>
<para>
Currently, ordered-set aggregates do not need to support
moving-aggregate mode, since they cannot be used as window functions.
</para>
</refsect1>
<refsect1>

View File

@ -131,6 +131,161 @@ CREATE AGGREGATE avg (float8)
</para>
</note>
<para>
Aggregate function calls in SQL allow <literal>DISTINCT</>
and <literal>ORDER BY</> options that control which rows are fed
to the aggregate's transition function and in what order. These
options are implemented behind the scenes and are not the concern
of the aggregate's support functions.
</para>
<para>
For further details see the
<xref linkend="sql-createaggregate">
command.
</para>
<sect2 id="xaggr-moving-aggregates">
<title>Moving-Aggregate Mode</title>
<indexterm>
<primary>moving-aggregate mode</primary>
</indexterm>
<indexterm>
<primary>aggregate function</primary>
<secondary>moving aggregate</secondary>
</indexterm>
<para>
Aggregate functions can optionally support <firstterm>moving-aggregate
mode</>, which allows substantially faster execution of aggregate
functions within windows with moving frame starting points.
(See <xref linkend="tutorial-window">
and <xref linkend="syntax-window-functions"> for information about use of
aggregate functions as window functions.)
The basic idea is that in addition to a normal <quote>forward</>
transition function, the aggregate provides an <firstterm>inverse
transition function</>, which allows rows to be removed from the
aggregate's running state value when they exit the window frame.
For example a <function>sum</> aggregate, which uses addition as the
forward transition function, would use subtraction as the inverse
transition function. Without an inverse transition function, the window
function mechanism must recalculate the aggregate from scratch each time
the frame starting point moves, resulting in run time proportional to the
number of input rows times the average frame length. With an inverse
transition function, the run time is only proportional to the number of
input rows.
</para>
<para>
The inverse transition function is passed the current state value and the
aggregate input value(s) for the earliest row included in the current
state. It must reconstruct what the state value would have been if the
given input value had never been aggregated, but only the rows following
it. This sometimes requires that the forward transition function keep
more state than is needed for plain aggregation mode. Therefore, the
moving-aggregate mode uses a completely separate implementation from the
plain mode: it has its own state data type, its own forward transition
function, and its own final function if needed. These can be the same as
the plain mode's data type and functions, if there is no need for extra
state.
</para>
<para>
As an example, we could extend the <function>sum</> aggregate given above
to support moving-aggregate mode like this:
<programlisting>
CREATE AGGREGATE sum (complex)
(
sfunc = complex_add,
stype = complex,
initcond = '(0,0)',
msfunc = complex_add,
minvfunc = complex_sub,
mstype = complex,
minitcond = '(0,0)'
);
</programlisting>
The parameters whose names begin with <literal>m</> define the
moving-aggregate implementation. Except for the inverse transition
function <literal>minvfunc</>, they correspond to the plain-aggregate
parameters without <literal>m</>.
</para>
<para>
The forward transition function for moving-aggregate mode is not allowed
to return NULL as the new state value. If the inverse transition
function returns NULL, this is taken as an indication that the inverse
function cannot reverse the state calculation for this particular input,
and so the aggregate calculation will be redone from scratch for the
current frame starting position. This convention allows moving-aggregate
mode to be used in situations where there are some infrequent cases that
are impractical to reverse out of the running state value. The inverse
transition function can <quote>punt</> on these cases, and yet still come
out ahead so long as it can work for most cases. As an example, an
aggregate working with floating-point numbers might choose to punt when
a <literal>NaN</> (not a number) input has to be removed from the running
state value.
</para>
<para>
When writing moving-aggregate support functions, it is important to be
sure that the inverse transition function can reconstruct the correct
state value exactly. Otherwise there might be user-visible differences
in results depending on whether the moving-aggregate mode is used.
An example of an aggregate for which adding an inverse transition
function seems easy at first, yet where this requirement cannot be met
is <function>sum</> over <type>float4</> or <type>float8</> inputs. A
naive declaration of <function>sum(<type>float8</>)</function> could be
<programlisting>
CREATE AGGREGATE unsafe_sum (float8)
(
stype = float8,
sfunc = float8pl,
mstype = float8,
msfunc = float8pl,
minvfunc = float8mi
);
</programlisting>
This aggregate, however, can give wildly different results than it would
have without the inverse transition function. For example, consider
<programlisting>
SELECT
unsafe_sum(x) OVER (ORDER BY n ROWS BETWEEN CURRENT ROW AND 1 FOLLOWING)
FROM (VALUES (1, 1.0e20::float8),
(2, 1.0::float8)) AS v (n,x);
</programlisting>
This query returns <literal>0</> as its second result, rather than the
expected answer of <literal>1</>. The cause is the limited precision of
floating-point values: adding <literal>1</> to <literal>1e20</> results
in <literal>1e20</> again, and so subtracting <literal>1e20</> from that
yields <literal>0</>, not <literal>1</>. Note that this is a limitation
of floating-point arithmetic in general, not a limitation
of <productname>PostgreSQL</>.
</para>
</sect2>
<sect2 id="xaggr-polymorphic-aggregates">
<title>Polymorphic and Variadic Aggregates</title>
<indexterm>
<primary>aggregate function</primary>
<secondary>polymorphic</secondary>
</indexterm>
<indexterm>
<primary>aggregate function</primary>
<secondary>variadic</secondary>
</indexterm>
<para>
Aggregate functions can use polymorphic
state transition functions or final functions, so that the same functions
@ -189,8 +344,8 @@ SELECT attrelid::regclass, array_accum(atttypid::regtype)
by declaring its last argument as a <literal>VARIADIC</> array, in much
the same fashion as for regular functions; see
<xref linkend="xfunc-sql-variadic-functions">. The aggregate's transition
function must have the same array type as its last argument. The
transition function typically would also be marked <literal>VARIADIC</>,
function(s) must have the same array type as their last argument. The
transition function(s) typically would also be marked <literal>VARIADIC</>,
but this is not strictly required.
</para>
@ -220,13 +375,15 @@ SELECT myaggregate(a, b, c ORDER BY a) FROM ...
</para>
</note>
<para>
Aggregate function calls in SQL allow <literal>DISTINCT</>
and <literal>ORDER BY</> options that control which rows are fed
to the aggregate's transition function and in what order. These
options are implemented behind the scenes and are not the concern
of the aggregate's support functions.
</para>
</sect2>
<sect2 id="xaggr-ordered-set-aggregates">
<title>Ordered-Set Aggregates</title>
<indexterm>
<primary>aggregate function</primary>
<secondary>ordered set</secondary>
</indexterm>
<para>
The aggregates we have been describing so far are <quote>normal</>
@ -311,6 +468,21 @@ SELECT percentile_disc(0.5) WITHIN GROUP (ORDER BY income) FROM households;
returns anyelement</>.
</para>
<para>
Currently, ordered-set aggregates cannot be used as window functions,
and therefore there is no need for them to support moving-aggregate mode.
</para>
</sect2>
<sect2 id="xaggr-support-functions">
<title>Support Functions for Aggregates</title>
<indexterm>
<primary>aggregate function</primary>
<secondary>support functions for</secondary>
</indexterm>
<para>
A function written in C can detect that it is being called as an
aggregate transition or final function by calling
@ -341,9 +513,6 @@ if (AggCheckCallContext(fcinfo, NULL))
source code.
</para>
<para>
For further details see the
<xref linkend="sql-createaggregate">
command.
</para>
</sect2>
</sect1>

View File

@ -57,10 +57,16 @@ AggregateCreate(const char *aggName,
Oid variadicArgType,
List *aggtransfnName,
List *aggfinalfnName,
List *aggmtransfnName,
List *aggminvtransfnName,
List *aggmfinalfnName,
List *aggsortopName,
Oid aggTransType,
int32 aggTransSpace,
const char *agginitval)
Oid aggmTransType,
int32 aggmTransSpace,
const char *agginitval,
const char *aggminitval)
{
Relation aggdesc;
HeapTuple tup;
@ -69,14 +75,19 @@ AggregateCreate(const char *aggName,
Form_pg_proc proc;
Oid transfn;
Oid finalfn = InvalidOid; /* can be omitted */
Oid mtransfn = InvalidOid; /* can be omitted */
Oid minvtransfn = InvalidOid; /* can be omitted */
Oid mfinalfn = InvalidOid; /* can be omitted */
Oid sortop = InvalidOid; /* can be omitted */
Oid *aggArgTypes = parameterTypes->values;
bool hasPolyArg;
bool hasInternalArg;
bool mtransIsStrict = false;
Oid rettype;
Oid finaltype;
Oid fnArgs[FUNC_MAX_ARGS];
int nargs_transfn;
int nargs_finalfn;
Oid procOid;
TupleDesc tupDesc;
int i;
@ -128,6 +139,16 @@ AggregateCreate(const char *aggName,
errmsg("cannot determine transition data type"),
errdetail("An aggregate using a polymorphic transition type must have at least one polymorphic argument.")));
/*
* Likewise for moving-aggregate transtype, if any
*/
if (OidIsValid(aggmTransType) &&
IsPolymorphicType(aggmTransType) && !hasPolyArg)
ereport(ERROR,
(errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
errmsg("cannot determine transition data type"),
errdetail("An aggregate using a polymorphic transition type must have at least one polymorphic argument.")));
/*
* An ordered-set aggregate that is VARIADIC must be VARIADIC ANY. In
* principle we could support regular variadic types, but it would make
@ -234,32 +255,120 @@ AggregateCreate(const char *aggName,
(errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
errmsg("must not omit initial value when transition function is strict and transition type is not compatible with input type")));
}
ReleaseSysCache(tup);
/* handle moving-aggregate transfn, if supplied */
if (aggmtransfnName)
{
/*
* The arguments are the same as for the regular transfn, except that
* the transition data type might be different. So re-use the fnArgs
* values set up above, except for that one.
*/
Assert(OidIsValid(aggmTransType));
fnArgs[0] = aggmTransType;
mtransfn = lookup_agg_function(aggmtransfnName, nargs_transfn,
fnArgs, variadicArgType,
&rettype);
/* As above, return type must exactly match declared mtranstype. */
if (rettype != aggmTransType)
ereport(ERROR,
(errcode(ERRCODE_DATATYPE_MISMATCH),
errmsg("return type of transition function %s is not %s",
NameListToString(aggmtransfnName),
format_type_be(aggmTransType))));
tup = SearchSysCache1(PROCOID, ObjectIdGetDatum(mtransfn));
if (!HeapTupleIsValid(tup))
elog(ERROR, "cache lookup failed for function %u", mtransfn);
proc = (Form_pg_proc) GETSTRUCT(tup);
/*
* If the mtransfn is strict and the minitval is NULL, check first
* input type and mtranstype are binary-compatible.
*/
if (proc->proisstrict && aggminitval == NULL)
{
if (numArgs < 1 ||
!IsBinaryCoercible(aggArgTypes[0], aggmTransType))
ereport(ERROR,
(errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
errmsg("must not omit initial value when transition function is strict and transition type is not compatible with input type")));
}
/* Remember if mtransfn is strict; we may need this below */
mtransIsStrict = proc->proisstrict;
ReleaseSysCache(tup);
}
/* handle minvtransfn, if supplied */
if (aggminvtransfnName)
{
/*
* This must have the same number of arguments with the same types as
* the forward transition function, so just re-use the fnArgs data.
*/
Assert(aggmtransfnName);
minvtransfn = lookup_agg_function(aggminvtransfnName, nargs_transfn,
fnArgs, variadicArgType,
&rettype);
/* As above, return type must exactly match declared mtranstype. */
if (rettype != aggmTransType)
ereport(ERROR,
(errcode(ERRCODE_DATATYPE_MISMATCH),
errmsg("return type of inverse transition function %s is not %s",
NameListToString(aggminvtransfnName),
format_type_be(aggmTransType))));
tup = SearchSysCache1(PROCOID, ObjectIdGetDatum(minvtransfn));
if (!HeapTupleIsValid(tup))
elog(ERROR, "cache lookup failed for function %u", minvtransfn);
proc = (Form_pg_proc) GETSTRUCT(tup);
/*
* We require the strictness settings of the forward and inverse
* transition functions to agree. This saves having to handle
* assorted special cases at execution time.
*/
if (proc->proisstrict != mtransIsStrict)
ereport(ERROR,
(errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
errmsg("strictness of aggregate's forward and inverse transition functions must match")));
ReleaseSysCache(tup);
}
/*
* Set up fnArgs for looking up finalfn(s)
*
* For ordinary aggs, the finalfn just takes the transtype. For
* ordered-set aggs, it takes the transtype plus all args. (The
* aggregated args are useless at runtime, and are actually passed as
* NULLs, but we may need them in the function signature to allow
* resolution of a polymorphic agg's result type.)
*/
fnArgs[0] = aggTransType;
if (AGGKIND_IS_ORDERED_SET(aggKind))
{
nargs_finalfn = numArgs + 1;
memcpy(fnArgs + 1, aggArgTypes, numArgs * sizeof(Oid));
}
else
{
nargs_finalfn = 1;
/* variadic-ness of the aggregate doesn't affect finalfn */
variadicArgType = InvalidOid;
}
/* handle finalfn, if supplied */
if (aggfinalfnName)
{
int nargs_finalfn;
/*
* For ordinary aggs, the finalfn just takes the transtype. For
* ordered-set aggs, it takes the transtype plus all args. (The
* aggregated args are useless at runtime, and are actually passed as
* NULLs, but we may need them in the function signature to allow
* resolution of a polymorphic agg's result type.)
*/
fnArgs[0] = aggTransType;
if (AGGKIND_IS_ORDERED_SET(aggKind))
{
nargs_finalfn = numArgs + 1;
memcpy(fnArgs + 1, aggArgTypes, numArgs * sizeof(Oid));
}
else
{
nargs_finalfn = 1;
/* variadic-ness of the aggregate doesn't affect finalfn */
variadicArgType = InvalidOid;
}
finalfn = lookup_agg_function(aggfinalfnName, nargs_finalfn,
fnArgs, variadicArgType,
&finaltype);
@ -314,6 +423,49 @@ AggregateCreate(const char *aggName,
errmsg("unsafe use of pseudo-type \"internal\""),
errdetail("A function returning \"internal\" must have at least one \"internal\" argument.")));
/*
* If a moving-aggregate implementation is supplied, look up its finalfn
* if any, and check that the implied aggregate result type matches the
* plain implementation.
*/
if (OidIsValid(aggmTransType))
{
/* handle finalfn, if supplied */
if (aggmfinalfnName)
{
/*
* The arguments are the same as for the regular finalfn, except
* that the transition data type might be different. So re-use
* the fnArgs values set up above, except for that one.
*/
fnArgs[0] = aggmTransType;
mfinalfn = lookup_agg_function(aggmfinalfnName, nargs_finalfn,
fnArgs, variadicArgType,
&rettype);
/* As above, check strictness if it's an ordered-set agg */
if (AGGKIND_IS_ORDERED_SET(aggKind) && func_strict(mfinalfn))
ereport(ERROR,
(errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
errmsg("final function of an ordered-set aggregate must not be declared STRICT")));
}
else
{
/*
* If no finalfn, aggregate result type is type of the state value
*/
rettype = aggmTransType;
}
Assert(OidIsValid(rettype));
if (rettype != finaltype)
ereport(ERROR,
(errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
errmsg("moving-aggregate implementation returns type %s, but plain implementation returns type %s",
format_type_be(aggmTransType),
format_type_be(aggTransType))));
}
/* handle sortop, if supplied */
if (aggsortopName)
{
@ -340,6 +492,13 @@ AggregateCreate(const char *aggName,
if (aclresult != ACLCHECK_OK)
aclcheck_error_type(aclresult, aggTransType);
if (OidIsValid(aggmTransType))
{
aclresult = pg_type_aclcheck(aggmTransType, GetUserId(), ACL_USAGE);
if (aclresult != ACLCHECK_OK)
aclcheck_error_type(aclresult, aggmTransType);
}
aclresult = pg_type_aclcheck(finaltype, GetUserId(), ACL_USAGE);
if (aclresult != ACLCHECK_OK)
aclcheck_error_type(aclresult, finaltype);
@ -392,13 +551,22 @@ AggregateCreate(const char *aggName,
values[Anum_pg_aggregate_aggnumdirectargs - 1] = Int16GetDatum(numDirectArgs);
values[Anum_pg_aggregate_aggtransfn - 1] = ObjectIdGetDatum(transfn);
values[Anum_pg_aggregate_aggfinalfn - 1] = ObjectIdGetDatum(finalfn);
values[Anum_pg_aggregate_aggmtransfn - 1] = ObjectIdGetDatum(mtransfn);
values[Anum_pg_aggregate_aggminvtransfn - 1] = ObjectIdGetDatum(minvtransfn);
values[Anum_pg_aggregate_aggmfinalfn - 1] = ObjectIdGetDatum(mfinalfn);
values[Anum_pg_aggregate_aggsortop - 1] = ObjectIdGetDatum(sortop);
values[Anum_pg_aggregate_aggtranstype - 1] = ObjectIdGetDatum(aggTransType);
values[Anum_pg_aggregate_aggtransspace - 1] = Int32GetDatum(aggTransSpace);
values[Anum_pg_aggregate_aggmtranstype - 1] = ObjectIdGetDatum(aggmTransType);
values[Anum_pg_aggregate_aggmtransspace - 1] = Int32GetDatum(aggmTransSpace);
if (agginitval)
values[Anum_pg_aggregate_agginitval - 1] = CStringGetTextDatum(agginitval);
else
nulls[Anum_pg_aggregate_agginitval - 1] = true;
if (aggminitval)
values[Anum_pg_aggregate_aggminitval - 1] = CStringGetTextDatum(aggminitval);
else
nulls[Anum_pg_aggregate_aggminitval - 1] = true;
aggdesc = heap_open(AggregateRelationId, RowExclusiveLock);
tupDesc = aggdesc->rd_att;
@ -414,6 +582,7 @@ AggregateCreate(const char *aggName,
* Create dependencies for the aggregate (above and beyond those already
* made by ProcedureCreate). Note: we don't need an explicit dependency
* on aggTransType since we depend on it indirectly through transfn.
* Likewise for aggmTransType if any.
*/
myself.classId = ProcedureRelationId;
myself.objectId = procOid;
@ -434,6 +603,33 @@ AggregateCreate(const char *aggName,
recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
}
/* Depends on forward transition function, if any */
if (OidIsValid(mtransfn))
{
referenced.classId = ProcedureRelationId;
referenced.objectId = mtransfn;
referenced.objectSubId = 0;
recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
}
/* Depends on inverse transition function, if any */
if (OidIsValid(minvtransfn))
{
referenced.classId = ProcedureRelationId;
referenced.objectId = minvtransfn;
referenced.objectSubId = 0;
recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
}
/* Depends on final function, if any */
if (OidIsValid(mfinalfn))
{
referenced.classId = ProcedureRelationId;
referenced.objectId = mfinalfn;
referenced.objectSubId = 0;
recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
}
/* Depends on sort operator, if any */
if (OidIsValid(sortop))
{
@ -447,7 +643,12 @@ AggregateCreate(const char *aggName,
}
/*
* lookup_agg_function -- common code for finding both transfn and finalfn
* lookup_agg_function
* common code for finding transfn, invtransfn and finalfn
*
* Returns OID of function, and stores its return type into *rettype
*
* NB: must not scribble on input_types[], as we may re-use those
*/
static Oid
lookup_agg_function(List *fnName,

View File

@ -61,11 +61,17 @@ DefineAggregate(List *name, List *args, bool oldstyle, List *parameters,
char aggKind = AGGKIND_NORMAL;
List *transfuncName = NIL;
List *finalfuncName = NIL;
List *mtransfuncName = NIL;
List *minvtransfuncName = NIL;
List *mfinalfuncName = NIL;
List *sortoperatorName = NIL;
TypeName *baseType = NULL;
TypeName *transType = NULL;
TypeName *mtransType = NULL;
int32 transSpace = 0;
int32 mtransSpace = 0;
char *initval = NULL;
char *minitval = NULL;
int numArgs;
int numDirectArgs = 0;
oidvector *parameterTypes;
@ -75,7 +81,9 @@ DefineAggregate(List *name, List *args, bool oldstyle, List *parameters,
List *parameterDefaults;
Oid variadicArgType;
Oid transTypeId;
Oid mtransTypeId = InvalidOid;
char transTypeType;
char mtransTypeType = 0;
ListCell *pl;
/* Convert list of names to a name and namespace */
@ -114,6 +122,12 @@ DefineAggregate(List *name, List *args, bool oldstyle, List *parameters,
transfuncName = defGetQualifiedName(defel);
else if (pg_strcasecmp(defel->defname, "finalfunc") == 0)
finalfuncName = defGetQualifiedName(defel);
else if (pg_strcasecmp(defel->defname, "msfunc") == 0)
mtransfuncName = defGetQualifiedName(defel);
else if (pg_strcasecmp(defel->defname, "minvfunc") == 0)
minvtransfuncName = defGetQualifiedName(defel);
else if (pg_strcasecmp(defel->defname, "mfinalfunc") == 0)
mfinalfuncName = defGetQualifiedName(defel);
else if (pg_strcasecmp(defel->defname, "sortop") == 0)
sortoperatorName = defGetQualifiedName(defel);
else if (pg_strcasecmp(defel->defname, "basetype") == 0)
@ -135,10 +149,16 @@ DefineAggregate(List *name, List *args, bool oldstyle, List *parameters,
transType = defGetTypeName(defel);
else if (pg_strcasecmp(defel->defname, "sspace") == 0)
transSpace = defGetInt32(defel);
else if (pg_strcasecmp(defel->defname, "mstype") == 0)
mtransType = defGetTypeName(defel);
else if (pg_strcasecmp(defel->defname, "msspace") == 0)
mtransSpace = defGetInt32(defel);
else if (pg_strcasecmp(defel->defname, "initcond") == 0)
initval = defGetString(defel);
else if (pg_strcasecmp(defel->defname, "initcond1") == 0)
initval = defGetString(defel);
else if (pg_strcasecmp(defel->defname, "minitcond") == 0)
minitval = defGetString(defel);
else
ereport(WARNING,
(errcode(ERRCODE_SYNTAX_ERROR),
@ -158,6 +178,46 @@ DefineAggregate(List *name, List *args, bool oldstyle, List *parameters,
(errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
errmsg("aggregate sfunc must be specified")));
/*
* if mtransType is given, mtransfuncName and minvtransfuncName must be as
* well; if not, then none of the moving-aggregate options should have
* been given.
*/
if (mtransType != NULL)
{
if (mtransfuncName == NIL)
ereport(ERROR,
(errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
errmsg("aggregate msfunc must be specified when mstype is specified")));
if (minvtransfuncName == NIL)
ereport(ERROR,
(errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
errmsg("aggregate minvfunc must be specified when mstype is specified")));
}
else
{
if (mtransfuncName != NIL)
ereport(ERROR,
(errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
errmsg("aggregate msfunc must not be specified without mstype")));
if (minvtransfuncName != NIL)
ereport(ERROR,
(errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
errmsg("aggregate minvfunc must not be specified without mstype")));
if (mfinalfuncName != NIL)
ereport(ERROR,
(errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
errmsg("aggregate mfinalfunc must not be specified without mstype")));
if (mtransSpace != 0)
ereport(ERROR,
(errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
errmsg("aggregate msspace must not be specified without mstype")));
if (minitval != NULL)
ereport(ERROR,
(errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
errmsg("aggregate minitcond must not be specified without mstype")));
}
/*
* look up the aggregate's input datatype(s).
*/
@ -250,6 +310,27 @@ DefineAggregate(List *name, List *args, bool oldstyle, List *parameters,
format_type_be(transTypeId))));
}
/*
* If a moving-aggregate transtype is specified, look that up. Same
* restrictions as for transtype.
*/
if (mtransType)
{
mtransTypeId = typenameTypeId(NULL, mtransType);
mtransTypeType = get_typtype(mtransTypeId);
if (mtransTypeType == TYPTYPE_PSEUDO &&
!IsPolymorphicType(mtransTypeId))
{
if (mtransTypeId == INTERNALOID && superuser())
/* okay */ ;
else
ereport(ERROR,
(errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
errmsg("aggregate transition data type cannot be %s",
format_type_be(mtransTypeId))));
}
}
/*
* If we have an initval, and it's not for a pseudotype (particularly a
* polymorphic type), make sure it's acceptable to the type's input
@ -268,6 +349,18 @@ DefineAggregate(List *name, List *args, bool oldstyle, List *parameters,
(void) OidInputFunctionCall(typinput, initval, typioparam, -1);
}
/*
* Likewise for moving-aggregate initval.
*/
if (minitval && mtransTypeType != TYPTYPE_PSEUDO)
{
Oid typinput,
typioparam;
getTypeInputInfo(mtransTypeId, &typinput, &typioparam);
(void) OidInputFunctionCall(typinput, minitval, typioparam, -1);
}
/*
* Most of the argument-checking is done inside of AggregateCreate
*/
@ -284,8 +377,14 @@ DefineAggregate(List *name, List *args, bool oldstyle, List *parameters,
variadicArgType,
transfuncName, /* step function name */
finalfuncName, /* final function name */
mtransfuncName, /* fwd trans function name */
minvtransfuncName, /* inv trans function name */
mfinalfuncName, /* final function name */
sortoperatorName, /* sort operator name */
transTypeId, /* transition data type */
transSpace, /* transition space */
initval); /* initial condition */
mtransTypeId, /* transition data type */
mtransSpace, /* transition space */
initval, /* initial condition */
minitval); /* initial condition */
}

View File

@ -1798,8 +1798,10 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
aggref->aggtype,
aggref->inputcollid,
transfn_oid,
InvalidOid, /* invtrans is not needed here */
finalfn_oid,
&transfnexpr,
NULL,
&finalfnexpr);
/* set up infrastructure for calling the transfn and finalfn */
@ -1847,7 +1849,8 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
* type and transtype are the same (or at least binary-compatible), so
* that it's OK to use the first aggregated input value as the initial
* transValue. This should have been checked at agg definition time,
* but just in case...
* but we must check again in case the transfn's strictness property
* has been changed.
*/
if (peraggstate->transfn.fn_strict && peraggstate->initValueIsNull)
{
@ -2126,6 +2129,12 @@ ExecReScanAgg(AggState *node)
ExecReScan(node->ss.ps.lefttree);
}
/***********************************************************************
* API exposed to aggregate functions
***********************************************************************/
/*
* AggCheckCallContext - test if a SQL function is being called as an aggregate
*
@ -2152,7 +2161,7 @@ AggCheckCallContext(FunctionCallInfo fcinfo, MemoryContext *aggcontext)
if (fcinfo->context && IsA(fcinfo->context, WindowAggState))
{
if (aggcontext)
*aggcontext = ((WindowAggState *) fcinfo->context)->aggcontext;
*aggcontext = ((WindowAggState *) fcinfo->context)->curaggcontext;
return AGG_CONTEXT_WINDOW;
}

View File

@ -102,16 +102,18 @@ typedef struct WindowStatePerFuncData
*/
typedef struct WindowStatePerAggData
{
/* Oids of transfer functions */
/* Oids of transition functions */
Oid transfn_oid;
Oid invtransfn_oid; /* may be InvalidOid */
Oid finalfn_oid; /* may be InvalidOid */
/*
* fmgr lookup data for transfer functions --- only valid when
* fmgr lookup data for transition functions --- only valid when
* corresponding oid is not InvalidOid. Note in particular that fn_strict
* flags are kept here.
*/
FmgrInfo transfn;
FmgrInfo invtransfn;
FmgrInfo finalfn;
/*
@ -139,11 +141,17 @@ typedef struct WindowStatePerAggData
int wfuncno; /* index of associated PerFuncData */
/* Context holding transition value and possibly other subsidiary data */
MemoryContext aggcontext; /* may be private, or winstate->aggcontext */
/* Current transition value */
Datum transValue; /* current transition value */
bool transValueIsNull;
bool noTransValue; /* true if transValue not set yet */
int64 transValueCount; /* number of currently-aggregated rows */
/* Data local to eval_windowaggregates() */
bool restart; /* need to restart this agg in this cycle? */
} WindowStatePerAggData;
static void initialize_windowaggregate(WindowAggState *winstate,
@ -152,6 +160,9 @@ static void initialize_windowaggregate(WindowAggState *winstate,
static void advance_windowaggregate(WindowAggState *winstate,
WindowStatePerFunc perfuncstate,
WindowStatePerAgg peraggstate);
static bool advance_windowaggregate_base(WindowAggState *winstate,
WindowStatePerFunc perfuncstate,
WindowStatePerAgg peraggstate);
static void finalize_windowaggregate(WindowAggState *winstate,
WindowStatePerFunc perfuncstate,
WindowStatePerAgg peraggstate,
@ -193,18 +204,27 @@ initialize_windowaggregate(WindowAggState *winstate,
{
MemoryContext oldContext;
/*
* If we're using a private aggcontext, we may reset it here. But if the
* context is shared, we don't know which other aggregates may still need
* it, so we must leave it to the caller to reset at an appropriate time.
*/
if (peraggstate->aggcontext != winstate->aggcontext)
MemoryContextResetAndDeleteChildren(peraggstate->aggcontext);
if (peraggstate->initValueIsNull)
peraggstate->transValue = peraggstate->initValue;
else
{
oldContext = MemoryContextSwitchTo(winstate->aggcontext);
oldContext = MemoryContextSwitchTo(peraggstate->aggcontext);
peraggstate->transValue = datumCopy(peraggstate->initValue,
peraggstate->transtypeByVal,
peraggstate->transtypeLen);
MemoryContextSwitchTo(oldContext);
}
peraggstate->transValueIsNull = peraggstate->initValueIsNull;
peraggstate->noTransValue = peraggstate->initValueIsNull;
peraggstate->transValueCount = 0;
peraggstate->resultValue = (Datum) 0;
peraggstate->resultValueIsNull = true;
}
@ -258,7 +278,8 @@ advance_windowaggregate(WindowAggState *winstate,
{
/*
* For a strict transfn, nothing happens when there's a NULL input; we
* just keep the prior transValue.
* just keep the prior transValue. Note transValueCount doesn't
* change either.
*/
for (i = 1; i <= numArguments; i++)
{
@ -268,41 +289,47 @@ advance_windowaggregate(WindowAggState *winstate,
return;
}
}
if (peraggstate->noTransValue)
/*
* For strict transition functions with initial value NULL we use the
* first non-NULL input as the initial state. (We already checked
* that the agg's input type is binary-compatible with its transtype,
* so straight copy here is OK.)
*
* We must copy the datum into aggcontext if it is pass-by-ref. We do
* not need to pfree the old transValue, since it's NULL.
*/
if (peraggstate->transValueCount == 0 && peraggstate->transValueIsNull)
{
/*
* transValue has not been initialized. This is the first non-NULL
* input value. We use it as the initial value for transValue. (We
* already checked that the agg's input type is binary-compatible
* with its transtype, so straight copy here is OK.)
*
* We must copy the datum into aggcontext if it is pass-by-ref. We
* do not need to pfree the old transValue, since it's NULL.
*/
MemoryContextSwitchTo(winstate->aggcontext);
MemoryContextSwitchTo(peraggstate->aggcontext);
peraggstate->transValue = datumCopy(fcinfo->arg[1],
peraggstate->transtypeByVal,
peraggstate->transtypeLen);
peraggstate->transValueIsNull = false;
peraggstate->noTransValue = false;
peraggstate->transValueCount = 1;
MemoryContextSwitchTo(oldContext);
return;
}
if (peraggstate->transValueIsNull)
{
/*
* Don't call a strict function with NULL inputs. Note it is
* possible to get here despite the above tests, if the transfn is
* strict *and* returned a NULL on a prior cycle. If that happens
* we will propagate the NULL all the way to the end.
* strict *and* returned a NULL on a prior cycle. If that happens
* we will propagate the NULL all the way to the end. That can
* only happen if there's no inverse transition function, though,
* since we disallow transitions back to NULL when there is one.
*/
MemoryContextSwitchTo(oldContext);
Assert(!OidIsValid(peraggstate->invtransfn_oid));
return;
}
}
/*
* OK to call the transition function
* OK to call the transition function. Set winstate->curaggcontext while
* calling it, for possible use by AggCheckCallContext.
*/
InitFunctionCallInfoData(*fcinfo, &(peraggstate->transfn),
numArguments + 1,
@ -310,7 +337,26 @@ advance_windowaggregate(WindowAggState *winstate,
(void *) winstate, NULL);
fcinfo->arg[0] = peraggstate->transValue;
fcinfo->argnull[0] = peraggstate->transValueIsNull;
winstate->curaggcontext = peraggstate->aggcontext;
newVal = FunctionCallInvoke(fcinfo);
winstate->curaggcontext = NULL;
/*
* Moving-aggregate transition functions must not return NULL, see
* advance_windowaggregate_base().
*/
if (fcinfo->isnull && OidIsValid(peraggstate->invtransfn_oid))
ereport(ERROR,
(errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
errmsg("moving-aggregate transition function must not return NULL")));
/*
* We must track the number of rows included in transValue, since to
* remove the last input, advance_windowaggregate_base() musn't call the
* inverse transition function, but simply reset transValue back to its
* initial value.
*/
peraggstate->transValueCount++;
/*
* If pass-by-ref datatype, must copy the new value into aggcontext and
@ -322,7 +368,7 @@ advance_windowaggregate(WindowAggState *winstate,
{
if (!fcinfo->isnull)
{
MemoryContextSwitchTo(winstate->aggcontext);
MemoryContextSwitchTo(peraggstate->aggcontext);
newVal = datumCopy(newVal,
peraggstate->transtypeByVal,
peraggstate->transtypeLen);
@ -336,6 +382,162 @@ advance_windowaggregate(WindowAggState *winstate,
peraggstate->transValueIsNull = fcinfo->isnull;
}
/*
* advance_windowaggregate_base
* Remove the oldest tuple from an aggregation.
*
* This is very much like advance_windowaggregate, except that we will call
* the inverse transition function (which caller must have checked is
* available).
*
* Returns true if we successfully removed the current row from this
* aggregate, false if not (in the latter case, caller is responsible
* for cleaning up by restarting the aggregation).
*/
static bool
advance_windowaggregate_base(WindowAggState *winstate,
WindowStatePerFunc perfuncstate,
WindowStatePerAgg peraggstate)
{
WindowFuncExprState *wfuncstate = perfuncstate->wfuncstate;
int numArguments = perfuncstate->numArguments;
FunctionCallInfoData fcinfodata;
FunctionCallInfo fcinfo = &fcinfodata;
Datum newVal;
ListCell *arg;
int i;
MemoryContext oldContext;
ExprContext *econtext = winstate->tmpcontext;
ExprState *filter = wfuncstate->aggfilter;
oldContext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
/* Skip anything FILTERed out */
if (filter)
{
bool isnull;
Datum res = ExecEvalExpr(filter, econtext, &isnull, NULL);
if (isnull || !DatumGetBool(res))
{
MemoryContextSwitchTo(oldContext);
return true;
}
}
/* We start from 1, since the 0th arg will be the transition value */
i = 1;
foreach(arg, wfuncstate->args)
{
ExprState *argstate = (ExprState *) lfirst(arg);
fcinfo->arg[i] = ExecEvalExpr(argstate, econtext,
&fcinfo->argnull[i], NULL);
i++;
}
if (peraggstate->invtransfn.fn_strict)
{
/*
* For a strict (inv)transfn, nothing happens when there's a NULL
* input; we just keep the prior transValue. Note transValueCount
* doesn't change either.
*/
for (i = 1; i <= numArguments; i++)
{
if (fcinfo->argnull[i])
{
MemoryContextSwitchTo(oldContext);
return true;
}
}
}
/* There should still be an added but not yet removed value */
Assert(peraggstate->transValueCount > 0);
/*
* In moving-aggregate mode, the state must never be NULL, except possibly
* before any rows have been aggregated (which is surely not the case at
* this point). This restriction allows us to interpret a NULL result
* from the inverse function as meaning "sorry, can't do an inverse
* transition in this case". We already checked this in
* advance_windowaggregate, but just for safety, check again.
*/
if (peraggstate->transValueIsNull)
elog(ERROR, "aggregate transition value is NULL before inverse transition");
/*
* We mustn't use the inverse transition function to remove the last
* input. Doing so would yield a non-NULL state, whereas we should be in
* the initial state afterwards which may very well be NULL. So instead,
* we simply re-initialize the aggregate in this case.
*/
if (peraggstate->transValueCount == 1)
{
MemoryContextSwitchTo(oldContext);
initialize_windowaggregate(winstate,
&winstate->perfunc[peraggstate->wfuncno],
peraggstate);
return true;
}
/*
* OK to call the inverse transition function. Set
* winstate->curaggcontext while calling it, for possible use by
* AggCheckCallContext.
*/
InitFunctionCallInfoData(*fcinfo, &(peraggstate->invtransfn),
numArguments + 1,
perfuncstate->winCollation,
(void *) winstate, NULL);
fcinfo->arg[0] = peraggstate->transValue;
fcinfo->argnull[0] = peraggstate->transValueIsNull;
winstate->curaggcontext = peraggstate->aggcontext;
newVal = FunctionCallInvoke(fcinfo);
winstate->curaggcontext = NULL;
/*
* If the function returns NULL, report failure, forcing a restart.
*/
if (fcinfo->isnull)
{
MemoryContextSwitchTo(oldContext);
return false;
}
/* Update number of rows included in transValue */
peraggstate->transValueCount--;
/*
* If pass-by-ref datatype, must copy the new value into aggcontext and
* pfree the prior transValue. But if invtransfn returned a pointer to
* its first input, we don't need to do anything.
*
* Note: the checks for null values here will never fire, but it seems
* best to have this stanza look just like advance_windowaggregate.
*/
if (!peraggstate->transtypeByVal &&
DatumGetPointer(newVal) != DatumGetPointer(peraggstate->transValue))
{
if (!fcinfo->isnull)
{
MemoryContextSwitchTo(peraggstate->aggcontext);
newVal = datumCopy(newVal,
peraggstate->transtypeByVal,
peraggstate->transtypeLen);
}
if (!peraggstate->transValueIsNull)
pfree(DatumGetPointer(peraggstate->transValue));
}
MemoryContextSwitchTo(oldContext);
peraggstate->transValue = newVal;
peraggstate->transValueIsNull = fcinfo->isnull;
return true;
}
/*
* finalize_windowaggregate
* parallel to finalize_aggregate in nodeAgg.c
@ -370,7 +572,9 @@ finalize_windowaggregate(WindowAggState *winstate,
}
else
{
winstate->curaggcontext = peraggstate->aggcontext;
*result = FunctionCallInvoke(&fcinfo);
winstate->curaggcontext = NULL;
*isnull = fcinfo.isnull;
}
}
@ -396,7 +600,9 @@ finalize_windowaggregate(WindowAggState *winstate,
* eval_windowaggregates
* evaluate plain aggregates being used as window functions
*
* Much of this is duplicated from nodeAgg.c. But NOTE that we expect to be
* This differs from nodeAgg.c in two ways. First, if the window's frame
* start position moves, we use the inverse transition function (if it exists)
* to remove rows from the transition value. And second, we expect to be
* able to call aggregate final functions repeatedly after aggregating more
* data onto the same transition value. This is not a behavior required by
* nodeAgg.c.
@ -406,12 +612,15 @@ eval_windowaggregates(WindowAggState *winstate)
{
WindowStatePerAgg peraggstate;
int wfuncno,
numaggs;
int i;
numaggs,
numaggs_restart,
i;
int64 aggregatedupto_nonrestarted;
MemoryContext oldContext;
ExprContext *econtext;
WindowObject agg_winobj;
TupleTableSlot *agg_row_slot;
TupleTableSlot *temp_slot;
numaggs = winstate->numaggs;
if (numaggs == 0)
@ -421,6 +630,7 @@ eval_windowaggregates(WindowAggState *winstate)
econtext = winstate->ss.ps.ps_ExprContext;
agg_winobj = winstate->agg_winobj;
agg_row_slot = winstate->agg_row_slot;
temp_slot = winstate->temp_slot_1;
/*
* Currently, we support only a subset of the SQL-standard window framing
@ -438,9 +648,17 @@ eval_windowaggregates(WindowAggState *winstate)
* damage the running transition value, but we have the same assumption in
* nodeAgg.c too (when it rescans an existing hash table).
*
* For other frame start rules, we discard the aggregate state and re-run
* the aggregates whenever the frame head row moves. We can still
* optimize as above whenever successive rows share the same frame head.
* If the frame start does sometimes move, we can still optimize as above
* whenever successive rows share the same frame head, but if the frame
* head moves beyond the previous head we try to remove those rows using
* the aggregate's inverse transition function. This function restores
* the aggregate's current state to what it would be if the removed row
* had never been aggregated in the first place. Inverse transition
* functions may optionally return NULL, indicating that the function was
* unable to remove the tuple from aggregation. If this happens, or if
* the aggregate doesn't have an inverse transition function at all, we
* must perform the aggregation all over again for all tuples within the
* new frame boundaries.
*
* In many common cases, multiple rows share the same frame and hence the
* same aggregate value. (In particular, if there's no ORDER BY in a RANGE
@ -452,63 +670,31 @@ eval_windowaggregates(WindowAggState *winstate)
* 'aggregatedupto' keeps track of the first row that has not yet been
* accumulated into the aggregate transition values. Whenever we start a
* new peer group, we accumulate forward to the end of the peer group.
*
* TODO: Rerunning aggregates from the frame start can be pretty slow. For
* some aggregates like SUM and COUNT we could avoid that by implementing
* a "negative transition function" that would be called for each row as
* it exits the frame. We'd have to think about avoiding recalculation of
* volatile arguments of aggregate functions, too.
*/
/*
* First, update the frame head position.
*
* The frame head should never move backwards, and the code below wouldn't
* cope if it did, so for safety we complain if it does.
*/
update_frameheadpos(agg_winobj, winstate->temp_slot_1);
update_frameheadpos(agg_winobj, temp_slot);
if (winstate->frameheadpos < winstate->aggregatedbase)
elog(ERROR, "window frame head moved backward");
/*
* Initialize aggregates on first call for partition, or if the frame head
* position moved since last time.
* If the frame didn't change compared to the previous row, we can re-use
* the result values that were previously saved at the bottom of this
* function. Since we don't know the current frame's end yet, this is not
* possible to check for fully. But if the frame end mode is UNBOUNDED
* FOLLOWING or CURRENT ROW, and the current row lies within the previous
* row's frame, then the two frames' ends must coincide. Note that on the
* first row aggregatedbase == aggregatedupto, meaning this test must
* fail, so we don't need to check the "there was no previous row" case
* explicitly here.
*/
if (winstate->currentpos == 0 ||
winstate->frameheadpos != winstate->aggregatedbase)
{
/*
* Discard transient aggregate values
*/
MemoryContextResetAndDeleteChildren(winstate->aggcontext);
for (i = 0; i < numaggs; i++)
{
peraggstate = &winstate->peragg[i];
wfuncno = peraggstate->wfuncno;
initialize_windowaggregate(winstate,
&winstate->perfunc[wfuncno],
peraggstate);
}
/*
* If we created a mark pointer for aggregates, keep it pushed up to
* frame head, so that tuplestore can discard unnecessary rows.
*/
if (agg_winobj->markptr >= 0)
WinSetMarkPosition(agg_winobj, winstate->frameheadpos);
/*
* Initialize for loop below
*/
ExecClearTuple(agg_row_slot);
winstate->aggregatedbase = winstate->frameheadpos;
winstate->aggregatedupto = winstate->frameheadpos;
}
/*
* In UNBOUNDED_FOLLOWING mode, we don't have to recalculate aggregates
* except when the frame head moves. In END_CURRENT_ROW mode, we only
* have to recalculate when the frame head moves or currentpos has
* advanced past the place we'd aggregated up to. Check for these cases
* and if so, reuse the saved result values.
*/
if ((winstate->frameOptions & (FRAMEOPTION_END_UNBOUNDED_FOLLOWING |
if (winstate->aggregatedbase == winstate->frameheadpos &&
(winstate->frameOptions & (FRAMEOPTION_END_UNBOUNDED_FOLLOWING |
FRAMEOPTION_END_CURRENT_ROW)) &&
winstate->aggregatedbase <= winstate->currentpos &&
winstate->aggregatedupto > winstate->currentpos)
@ -523,6 +709,158 @@ eval_windowaggregates(WindowAggState *winstate)
return;
}
/*----------
* Initialize restart flags.
*
* We restart the aggregation:
* - if we're processing the first row in the partition, or
* - if the frame's head moved and we cannot use an inverse
* transition function, or
* - if the new frame doesn't overlap the old one
*
* Note that we don't strictly need to restart in the last case, but if
* we're going to remove all rows from the aggregation anyway, a restart
* surely is faster.
*----------
*/
numaggs_restart = 0;
for (i = 0; i < numaggs; i++)
{
peraggstate = &winstate->peragg[i];
if (winstate->currentpos == 0 ||
(winstate->aggregatedbase != winstate->frameheadpos &&
!OidIsValid(peraggstate->invtransfn_oid)) ||
winstate->aggregatedupto <= winstate->frameheadpos)
{
peraggstate->restart = true;
numaggs_restart++;
}
else
peraggstate->restart = false;
}
/*
* If we have any possibly-moving aggregates, attempt to advance
* aggregatedbase to match the frame's head by removing input rows that
* fell off the top of the frame from the aggregations. This can fail,
* i.e. advance_windowaggregate_base() can return false, in which case
* we'll restart that aggregate below.
*/
while (numaggs_restart < numaggs &&
winstate->aggregatedbase < winstate->frameheadpos)
{
/*
* Fetch the next tuple of those being removed. This should never fail
* as we should have been here before.
*/
if (!window_gettupleslot(agg_winobj, winstate->aggregatedbase,
temp_slot))
elog(ERROR, "could not re-fetch previously fetched frame row");
/* Set tuple context for evaluation of aggregate arguments */
winstate->tmpcontext->ecxt_outertuple = temp_slot;
/*
* Perform the inverse transition for each aggregate function in the
* window, unless it has already been marked as needing a restart.
*/
for (i = 0; i < numaggs; i++)
{
bool ok;
peraggstate = &winstate->peragg[i];
if (peraggstate->restart)
continue;
wfuncno = peraggstate->wfuncno;
ok = advance_windowaggregate_base(winstate,
&winstate->perfunc[wfuncno],
peraggstate);
if (!ok)
{
/* Inverse transition function has failed, must restart */
peraggstate->restart = true;
numaggs_restart++;
}
}
/* Reset per-input-tuple context after each tuple */
ResetExprContext(winstate->tmpcontext);
/* And advance the aggregated-row state */
winstate->aggregatedbase++;
ExecClearTuple(temp_slot);
}
/*
* If we successfully advanced the base rows of all the aggregates,
* aggregatedbase now equals frameheadpos; but if we failed for any, we
* must forcibly update aggregatedbase.
*/
winstate->aggregatedbase = winstate->frameheadpos;
/*
* If we created a mark pointer for aggregates, keep it pushed up to frame
* head, so that tuplestore can discard unnecessary rows.
*/
if (agg_winobj->markptr >= 0)
WinSetMarkPosition(agg_winobj, winstate->frameheadpos);
/*
* Now restart the aggregates that require it.
*
* We assume that aggregates using the shared context always restart if
* *any* aggregate restarts, and we may thus clean up the shared
* aggcontext if that is the case. Private aggcontexts are reset by
* initialize_windowaggregate() if their owning aggregate restarts. If we
* aren't restarting an aggregate, we need to free any previously saved
* result for it, else we'll leak memory.
*/
if (numaggs_restart > 0)
MemoryContextResetAndDeleteChildren(winstate->aggcontext);
for (i = 0; i < numaggs; i++)
{
peraggstate = &winstate->peragg[i];
/* Aggregates using the shared ctx must restart if *any* agg does */
Assert(peraggstate->aggcontext != winstate->aggcontext ||
numaggs_restart == 0 ||
peraggstate->restart);
if (peraggstate->restart)
{
wfuncno = peraggstate->wfuncno;
initialize_windowaggregate(winstate,
&winstate->perfunc[wfuncno],
peraggstate);
}
else if (!peraggstate->resultValueIsNull)
{
if (!peraggstate->resulttypeByVal)
pfree(DatumGetPointer(peraggstate->resultValue));
peraggstate->resultValue = (Datum) 0;
peraggstate->resultValueIsNull = true;
}
}
/*
* Non-restarted aggregates now contain the rows between aggregatedbase
* (i.e., frameheadpos) and aggregatedupto, while restarted aggregates
* contain no rows. If there are any restarted aggregates, we must thus
* begin aggregating anew at frameheadpos, otherwise we may simply
* continue at aggregatedupto. We must remember the old value of
* aggregatedupto to know how long to skip advancing non-restarted
* aggregates. If we modify aggregatedupto, we must also clear
* agg_row_slot, per the loop invariant below.
*/
aggregatedupto_nonrestarted = winstate->aggregatedupto;
if (numaggs_restart > 0 &&
winstate->aggregatedupto != winstate->frameheadpos)
{
winstate->aggregatedupto = winstate->frameheadpos;
ExecClearTuple(agg_row_slot);
}
/*
* Advance until we reach a row not in frame (or end of partition).
*
@ -551,6 +889,12 @@ eval_windowaggregates(WindowAggState *winstate)
for (i = 0; i < numaggs; i++)
{
peraggstate = &winstate->peragg[i];
/* Non-restarted aggs skip until aggregatedupto_nonrestarted */
if (!peraggstate->restart &&
winstate->aggregatedupto < aggregatedupto_nonrestarted)
continue;
wfuncno = peraggstate->wfuncno;
advance_windowaggregate(winstate,
&winstate->perfunc[wfuncno],
@ -565,6 +909,9 @@ eval_windowaggregates(WindowAggState *winstate)
ExecClearTuple(agg_row_slot);
}
/* The frame's end is not supposed to move backwards, ever */
Assert(aggregatedupto_nonrestarted <= winstate->aggregatedupto);
/*
* finalize aggregates and fill result/isnull fields.
*/
@ -589,28 +936,14 @@ eval_windowaggregates(WindowAggState *winstate)
* advance that the next row can't possibly share the same frame. Is
* it worth detecting that and skipping this code?
*/
if (!peraggstate->resulttypeByVal)
if (!peraggstate->resulttypeByVal && !*isnull)
{
/*
* clear old resultValue in order not to leak memory. (Note: the
* new result can't possibly be the same datum as old resultValue,
* because we never passed it to the trans function.)
*/
if (!peraggstate->resultValueIsNull)
pfree(DatumGetPointer(peraggstate->resultValue));
/*
* If pass-by-ref, copy it into our aggregate context.
*/
if (!*isnull)
{
oldContext = MemoryContextSwitchTo(winstate->aggcontext);
peraggstate->resultValue =
datumCopy(*result,
peraggstate->resulttypeByVal,
peraggstate->resulttypeLen);
MemoryContextSwitchTo(oldContext);
}
oldContext = MemoryContextSwitchTo(peraggstate->aggcontext);
peraggstate->resultValue =
datumCopy(*result,
peraggstate->resulttypeByVal,
peraggstate->resulttypeLen);
MemoryContextSwitchTo(oldContext);
}
else
{
@ -650,6 +983,8 @@ eval_windowfunction(WindowAggState *winstate, WindowStatePerFunc perfuncstate,
(void *) perfuncstate->winobj, NULL);
/* Just in case, make all the regular argument slots be null */
memset(fcinfo.argnull, true, perfuncstate->numArguments);
/* Window functions don't have a current aggregate context, either */
winstate->curaggcontext = NULL;
*result = FunctionCallInvoke(&fcinfo);
*isnull = fcinfo.isnull;
@ -870,6 +1205,11 @@ release_partition(WindowAggState *winstate)
*/
MemoryContextResetAndDeleteChildren(winstate->partcontext);
MemoryContextResetAndDeleteChildren(winstate->aggcontext);
for (i = 0; i < winstate->numaggs; i++)
{
if (winstate->peragg[i].aggcontext != winstate->aggcontext)
MemoryContextResetAndDeleteChildren(winstate->peragg[i].aggcontext);
}
if (winstate->buffer)
tuplestore_end(winstate->buffer);
@ -1450,7 +1790,12 @@ ExecInitWindowAgg(WindowAgg *node, EState *estate, int eflags)
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
/* Create mid-lived context for aggregate trans values etc */
/*
* Create mid-lived context for aggregate trans values etc.
*
* Note that moving aggregates each use their own private context, not
* this one.
*/
winstate->aggcontext =
AllocSetContextCreate(CurrentMemoryContext,
"WindowAgg_Aggregates",
@ -1657,12 +2002,10 @@ void
ExecEndWindowAgg(WindowAggState *node)
{
PlanState *outerPlan;
int i;
release_partition(node);
pfree(node->perfunc);
pfree(node->peragg);
ExecClearTuple(node->ss.ss_ScanTupleSlot);
ExecClearTuple(node->first_part_slot);
ExecClearTuple(node->agg_row_slot);
@ -1676,9 +2019,17 @@ ExecEndWindowAgg(WindowAggState *node)
node->ss.ps.ps_ExprContext = node->tmpcontext;
ExecFreeExprContext(&node->ss.ps);
for (i = 0; i < node->numaggs; i++)
{
if (node->peragg[i].aggcontext != node->aggcontext)
MemoryContextDelete(node->peragg[i].aggcontext);
}
MemoryContextDelete(node->partcontext);
MemoryContextDelete(node->aggcontext);
pfree(node->perfunc);
pfree(node->peragg);
outerPlan = outerPlanState(node);
ExecEndNode(outerPlan);
}
@ -1733,10 +2084,13 @@ initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc,
HeapTuple aggTuple;
Form_pg_aggregate aggform;
Oid aggtranstype;
AttrNumber initvalAttNo;
AclResult aclresult;
Oid transfn_oid,
invtransfn_oid,
finalfn_oid;
Expr *transfnexpr,
*invtransfnexpr,
*finalfnexpr;
Datum textInitVal;
int i;
@ -1756,14 +2110,40 @@ initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc,
wfunc->winfnoid);
aggform = (Form_pg_aggregate) GETSTRUCT(aggTuple);
/*
* Figure out whether we want to use the moving-aggregate implementation,
* and collect the right set of fields from the pg_attribute entry.
*
* If the frame head can't move, we don't need moving-aggregate code. Even
* if we'd like to use it, don't do so if the aggregate's arguments (and
* FILTER clause if any) contain any calls to volatile functions.
* Otherwise, the difference between restarting and not restarting the
* aggregation would be user-visible.
*/
if (OidIsValid(aggform->aggminvtransfn) &&
!(winstate->frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING) &&
!contain_volatile_functions((Node *) wfunc))
{
peraggstate->transfn_oid = transfn_oid = aggform->aggmtransfn;
peraggstate->invtransfn_oid = invtransfn_oid = aggform->aggminvtransfn;
peraggstate->finalfn_oid = finalfn_oid = aggform->aggmfinalfn;
aggtranstype = aggform->aggmtranstype;
initvalAttNo = Anum_pg_aggregate_aggminitval;
}
else
{
peraggstate->transfn_oid = transfn_oid = aggform->aggtransfn;
peraggstate->invtransfn_oid = invtransfn_oid = InvalidOid;
peraggstate->finalfn_oid = finalfn_oid = aggform->aggfinalfn;
aggtranstype = aggform->aggtranstype;
initvalAttNo = Anum_pg_aggregate_agginitval;
}
/*
* ExecInitWindowAgg already checked permission to call aggregate function
* ... but we still need to check the component functions
*/
peraggstate->transfn_oid = transfn_oid = aggform->aggtransfn;
peraggstate->finalfn_oid = finalfn_oid = aggform->aggfinalfn;
/* Check that aggregate owner has permission to call component fns */
{
HeapTuple procTuple;
@ -1783,6 +2163,17 @@ initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc,
aclcheck_error(aclresult, ACL_KIND_PROC,
get_func_name(transfn_oid));
InvokeFunctionExecuteHook(transfn_oid);
if (OidIsValid(invtransfn_oid))
{
aclresult = pg_proc_aclcheck(invtransfn_oid, aggOwner,
ACL_EXECUTE);
if (aclresult != ACLCHECK_OK)
aclcheck_error(aclresult, ACL_KIND_PROC,
get_func_name(invtransfn_oid));
InvokeFunctionExecuteHook(invtransfn_oid);
}
if (OidIsValid(finalfn_oid))
{
aclresult = pg_proc_aclcheck(finalfn_oid, aggOwner,
@ -1796,7 +2187,7 @@ initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc,
/* resolve actual type of transition state, if polymorphic */
aggtranstype = resolve_aggregate_transtype(wfunc->winfnoid,
aggform->aggtranstype,
aggtranstype,
inputTypes,
numArguments);
@ -1810,13 +2201,21 @@ initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc,
wfunc->wintype,
wfunc->inputcollid,
transfn_oid,
invtransfn_oid,
finalfn_oid,
&transfnexpr,
&invtransfnexpr,
&finalfnexpr);
fmgr_info(transfn_oid, &peraggstate->transfn);
fmgr_info_set_expr((Node *) transfnexpr, &peraggstate->transfn);
if (OidIsValid(invtransfn_oid))
{
fmgr_info(invtransfn_oid, &peraggstate->invtransfn);
fmgr_info_set_expr((Node *) invtransfnexpr, &peraggstate->invtransfn);
}
if (OidIsValid(finalfn_oid))
{
fmgr_info(finalfn_oid, &peraggstate->finalfn);
@ -1834,8 +2233,7 @@ initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc,
* initval is potentially null, so don't try to access it as a struct
* field. Must do it the hard way with SysCacheGetAttr.
*/
textInitVal = SysCacheGetAttr(AGGFNOID, aggTuple,
Anum_pg_aggregate_agginitval,
textInitVal = SysCacheGetAttr(AGGFNOID, aggTuple, initvalAttNo,
&peraggstate->initValueIsNull);
if (peraggstate->initValueIsNull)
@ -1848,7 +2246,8 @@ initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc,
* If the transfn is strict and the initval is NULL, make sure input type
* and transtype are the same (or at least binary-compatible), so that
* it's OK to use the first input value as the initial transValue. This
* should have been checked at agg definition time, but just in case...
* should have been checked at agg definition time, but we must check
* again in case the transfn's strictness property has been changed.
*/
if (peraggstate->transfn.fn_strict && peraggstate->initValueIsNull)
{
@ -1860,6 +2259,44 @@ initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc,
wfunc->winfnoid)));
}
/*
* Insist that forward and inverse transition functions have the same
* strictness setting. Allowing them to differ would require handling
* more special cases in advance_windowaggregate and
* advance_windowaggregate_base, for no discernible benefit. This should
* have been checked at agg definition time, but we must check again in
* case either function's strictness property has been changed.
*/
if (OidIsValid(invtransfn_oid) &&
peraggstate->transfn.fn_strict != peraggstate->invtransfn.fn_strict)
ereport(ERROR,
(errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
errmsg("strictness of aggregate's forward and inverse transition functions must match")));
/*
* Moving aggregates use their own aggcontext.
*
* This is necessary because they might restart at different times, so we
* might never be able to reset the shared context otherwise. We can't
* make it the aggregates' responsibility to clean up after themselves,
* because strict aggregates must be restarted whenever we remove their
* last non-NULL input, which the aggregate won't be aware is happening.
* Also, just pfree()ing the transValue upon restarting wouldn't help,
* since we'd miss any indirectly referenced data. We could, in theory,
* make the memory allocation rules for moving aggregates different than
* they have historically been for plain aggregates, but that seems grotty
* and likely to lead to memory leaks.
*/
if (OidIsValid(invtransfn_oid))
peraggstate->aggcontext =
AllocSetContextCreate(CurrentMemoryContext,
"WindowAgg_AggregatePrivate",
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
else
peraggstate->aggcontext = winstate->aggcontext;
ReleaseSysCache(aggTuple);
return peraggstate;

View File

@ -471,7 +471,11 @@ count_agg_clauses_walker(Node *node, count_agg_clauses_context *context)
Assert(aggref->agglevelsup == 0);
/* fetch info about aggregate from pg_aggregate */
/*
* Fetch info about aggregate from pg_aggregate. Note it's correct to
* ignore the moving-aggregate variant, since what we're concerned
* with here is aggregates not window functions.
*/
aggTuple = SearchSysCache1(AGGFNOID,
ObjectIdGetDatum(aggref->aggfnoid));
if (!HeapTupleIsValid(aggTuple))

View File

@ -1187,11 +1187,13 @@ resolve_aggregate_transtype(Oid aggfuncid,
* For an ordered-set aggregate, remember that agg_input_types describes
* the direct arguments followed by the aggregated arguments.
*
* transfn_oid and finalfn_oid identify the funcs to be called; the latter
* may be InvalidOid.
* transfn_oid, invtransfn_oid and finalfn_oid identify the funcs to be
* called; the latter two may be InvalidOid.
*
* Pointers to the constructed trees are returned into *transfnexpr and
* *finalfnexpr. The latter is set to NULL if there's no finalfn.
* Pointers to the constructed trees are returned into *transfnexpr,
* *invtransfnexpr and *finalfnexpr. If there is no invtransfn or finalfn,
* the respective pointers are set to NULL. Since use of the invtransfn is
* optional, NULL may be passed for invtransfnexpr.
*/
void
build_aggregate_fnexprs(Oid *agg_input_types,
@ -1203,8 +1205,10 @@ build_aggregate_fnexprs(Oid *agg_input_types,
Oid agg_result_type,
Oid agg_input_collation,
Oid transfn_oid,
Oid invtransfn_oid,
Oid finalfn_oid,
Expr **transfnexpr,
Expr **invtransfnexpr,
Expr **finalfnexpr)
{
Param *argp;
@ -1249,6 +1253,26 @@ build_aggregate_fnexprs(Oid *agg_input_types,
fexpr->funcvariadic = agg_variadic;
*transfnexpr = (Expr *) fexpr;
/*
* Build invtransfn expression if requested, with same args as transfn
*/
if (invtransfnexpr != NULL)
{
if (OidIsValid(invtransfn_oid))
{
fexpr = makeFuncExpr(invtransfn_oid,
agg_state_type,
args,
InvalidOid,
agg_input_collation,
COERCE_EXPLICIT_CALL);
fexpr->funcvariadic = agg_variadic;
*invtransfnexpr = (Expr *) fexpr;
}
else
*invtransfnexpr = NULL;
}
/* see if we have a final function */
if (!OidIsValid(finalfn_oid))
{

View File

@ -11548,20 +11548,32 @@ dumpAgg(Archive *fout, AggInfo *agginfo)
PGresult *res;
int i_aggtransfn;
int i_aggfinalfn;
int i_aggmtransfn;
int i_aggminvtransfn;
int i_aggmfinalfn;
int i_aggsortop;
int i_hypothetical;
int i_aggtranstype;
int i_aggtransspace;
int i_aggmtranstype;
int i_aggmtransspace;
int i_agginitval;
int i_aggminitval;
int i_convertok;
const char *aggtransfn;
const char *aggfinalfn;
const char *aggmtransfn;
const char *aggminvtransfn;
const char *aggmfinalfn;
const char *aggsortop;
char *aggsortconvop;
bool hypothetical;
const char *aggtranstype;
const char *aggtransspace;
const char *aggmtranstype;
const char *aggmtransspace;
const char *agginitval;
const char *aggminitval;
bool convertok;
/* Skip if not to be dumped */
@ -11582,9 +11594,12 @@ dumpAgg(Archive *fout, AggInfo *agginfo)
{
appendPQExpBuffer(query, "SELECT aggtransfn, "
"aggfinalfn, aggtranstype::pg_catalog.regtype, "
"aggmtransfn, aggminvtransfn, aggmfinalfn, "
"aggmtranstype::pg_catalog.regtype, "
"aggsortop::pg_catalog.regoperator, "
"(aggkind = 'h') as hypothetical, "
"aggtransspace, agginitval, "
"aggmtransspace, aggminitval, "
"'t'::boolean AS convertok, "
"pg_catalog.pg_get_function_arguments(p.oid) AS funcargs, "
"pg_catalog.pg_get_function_identity_arguments(p.oid) AS funciargs "
@ -11597,9 +11612,12 @@ dumpAgg(Archive *fout, AggInfo *agginfo)
{
appendPQExpBuffer(query, "SELECT aggtransfn, "
"aggfinalfn, aggtranstype::pg_catalog.regtype, "
"'-' AS aggmtransfn, '-' AS aggminvtransfn, "
"'-' AS aggmfinalfn, 0 AS aggmtranstype, "
"aggsortop::pg_catalog.regoperator, "
"false as hypothetical, "
"0 AS aggtransspace, agginitval, "
"0 AS aggmtransspace, NULL AS aggminitval, "
"'t'::boolean AS convertok, "
"pg_catalog.pg_get_function_arguments(p.oid) AS funcargs, "
"pg_catalog.pg_get_function_identity_arguments(p.oid) AS funciargs "
@ -11612,9 +11630,12 @@ dumpAgg(Archive *fout, AggInfo *agginfo)
{
appendPQExpBuffer(query, "SELECT aggtransfn, "
"aggfinalfn, aggtranstype::pg_catalog.regtype, "
"'-' AS aggmtransfn, '-' AS aggminvtransfn, "
"'-' AS aggmfinalfn, 0 AS aggmtranstype, "
"aggsortop::pg_catalog.regoperator, "
"false as hypothetical, "
"0 AS aggtransspace, agginitval, "
"0 AS aggmtransspace, NULL AS aggminitval, "
"'t'::boolean AS convertok "
"FROM pg_catalog.pg_aggregate a, pg_catalog.pg_proc p "
"WHERE a.aggfnoid = p.oid "
@ -11625,9 +11646,12 @@ dumpAgg(Archive *fout, AggInfo *agginfo)
{
appendPQExpBuffer(query, "SELECT aggtransfn, "
"aggfinalfn, aggtranstype::pg_catalog.regtype, "
"'-' AS aggmtransfn, '-' AS aggminvtransfn, "
"'-' AS aggmfinalfn, 0 AS aggmtranstype, "
"0 AS aggsortop, "
"'f'::boolean as hypothetical, "
"0 AS aggtransspace, agginitval, "
"0 AS aggmtransspace, NULL AS aggminitval, "
"'t'::boolean AS convertok "
"FROM pg_catalog.pg_aggregate a, pg_catalog.pg_proc p "
"WHERE a.aggfnoid = p.oid "
@ -11638,9 +11662,12 @@ dumpAgg(Archive *fout, AggInfo *agginfo)
{
appendPQExpBuffer(query, "SELECT aggtransfn, aggfinalfn, "
"format_type(aggtranstype, NULL) AS aggtranstype, "
"'-' AS aggmtransfn, '-' AS aggminvtransfn, "
"'-' AS aggmfinalfn, 0 AS aggmtranstype, "
"0 AS aggsortop, "
"'f'::boolean as hypothetical, "
"0 AS aggtransspace, agginitval, "
"0 AS aggmtransspace, NULL AS aggminitval, "
"'t'::boolean AS convertok "
"FROM pg_aggregate "
"WHERE oid = '%u'::oid",
@ -11651,9 +11678,12 @@ dumpAgg(Archive *fout, AggInfo *agginfo)
appendPQExpBuffer(query, "SELECT aggtransfn1 AS aggtransfn, "
"aggfinalfn, "
"(SELECT typname FROM pg_type WHERE oid = aggtranstype1) AS aggtranstype, "
"'-' AS aggmtransfn, '-' AS aggminvtransfn, "
"'-' AS aggmfinalfn, 0 AS aggmtranstype, "
"0 AS aggsortop, "
"'f'::boolean as hypothetical, "
"0 AS aggtransspace, agginitval1 AS agginitval, "
"0 AS aggmtransspace, NULL AS aggminitval, "
"(aggtransfn2 = 0 and aggtranstype2 = 0 and agginitval2 is null) AS convertok "
"FROM pg_aggregate "
"WHERE oid = '%u'::oid",
@ -11664,20 +11694,32 @@ dumpAgg(Archive *fout, AggInfo *agginfo)
i_aggtransfn = PQfnumber(res, "aggtransfn");
i_aggfinalfn = PQfnumber(res, "aggfinalfn");
i_aggmtransfn = PQfnumber(res, "aggmtransfn");
i_aggminvtransfn = PQfnumber(res, "aggminvtransfn");
i_aggmfinalfn = PQfnumber(res, "aggmfinalfn");
i_aggsortop = PQfnumber(res, "aggsortop");
i_hypothetical = PQfnumber(res, "hypothetical");
i_aggtranstype = PQfnumber(res, "aggtranstype");
i_aggtransspace = PQfnumber(res, "aggtransspace");
i_aggmtranstype = PQfnumber(res, "aggmtranstype");
i_aggmtransspace = PQfnumber(res, "aggmtransspace");
i_agginitval = PQfnumber(res, "agginitval");
i_aggminitval = PQfnumber(res, "aggminitval");
i_convertok = PQfnumber(res, "convertok");
aggtransfn = PQgetvalue(res, 0, i_aggtransfn);
aggfinalfn = PQgetvalue(res, 0, i_aggfinalfn);
aggmtransfn = PQgetvalue(res, 0, i_aggmtransfn);
aggminvtransfn = PQgetvalue(res, 0, i_aggminvtransfn);
aggmfinalfn = PQgetvalue(res, 0, i_aggmfinalfn);
aggsortop = PQgetvalue(res, 0, i_aggsortop);
hypothetical = (PQgetvalue(res, 0, i_hypothetical)[0] == 't');
aggtranstype = PQgetvalue(res, 0, i_aggtranstype);
aggtransspace = PQgetvalue(res, 0, i_aggtransspace);
aggmtranstype = PQgetvalue(res, 0, i_aggmtranstype);
aggmtransspace = PQgetvalue(res, 0, i_aggmtransspace);
agginitval = PQgetvalue(res, 0, i_agginitval);
aggminitval = PQgetvalue(res, 0, i_aggminitval);
convertok = (PQgetvalue(res, 0, i_convertok)[0] == 't');
if (fout->remoteVersion >= 80400)
@ -11751,6 +11793,32 @@ dumpAgg(Archive *fout, AggInfo *agginfo)
aggfinalfn);
}
if (strcmp(aggmtransfn, "-") != 0)
{
appendPQExpBuffer(details, ",\n MSFUNC = %s,\n MINVFUNC = %s,\n MSTYPE = %s",
aggmtransfn,
aggminvtransfn,
aggmtranstype);
}
if (strcmp(aggmtransspace, "0") != 0)
{
appendPQExpBuffer(details, ",\n MSSPACE = %s",
aggmtransspace);
}
if (!PQgetisnull(res, 0, i_aggminitval))
{
appendPQExpBufferStr(details, ",\n MINITCOND = ");
appendStringLiteralAH(details, aggminitval, fout);
}
if (strcmp(aggmfinalfn, "-") != 0)
{
appendPQExpBuffer(details, ",\n MFINALFUNC = %s",
aggmfinalfn);
}
aggsortconvop = convertOperatorReference(fout, aggsortop);
if (aggsortconvop)
{

View File

@ -53,6 +53,6 @@
*/
/* yyyymmddN */
#define CATALOG_VERSION_NO 201404082
#define CATALOG_VERSION_NO 201404121
#endif

View File

@ -32,10 +32,16 @@
* aggnumdirectargs number of arguments that are "direct" arguments
* aggtransfn transition function
* aggfinalfn final function (0 if none)
* aggmtransfn forward function for moving-aggregate mode (0 if none)
* aggminvtransfn inverse function for moving-aggregate mode (0 if none)
* aggmfinalfn final function for moving-aggregate mode (0 if none)
* aggsortop associated sort operator (0 if none)
* aggtranstype type of aggregate's transition (state) data
* aggtransspace estimated size of state data (0 for default estimate)
* aggmtranstype type of moving-aggregate state data (0 if none)
* aggmtransspace estimated size of moving-agg state (0 for default est)
* agginitval initial value for transition state (can be NULL)
* aggminitval initial value for moving-agg state (can be NULL)
* ----------------------------------------------------------------
*/
#define AggregateRelationId 2600
@ -47,12 +53,18 @@ CATALOG(pg_aggregate,2600) BKI_WITHOUT_OIDS
int16 aggnumdirectargs;
regproc aggtransfn;
regproc aggfinalfn;
regproc aggmtransfn;
regproc aggminvtransfn;
regproc aggmfinalfn;
Oid aggsortop;
Oid aggtranstype;
int32 aggtransspace;
Oid aggmtranstype;
int32 aggmtransspace;
#ifdef CATALOG_VARLEN /* variable-length fields start here */
text agginitval;
text aggminitval;
#endif
} FormData_pg_aggregate;
@ -68,16 +80,22 @@ typedef FormData_pg_aggregate *Form_pg_aggregate;
* ----------------
*/
#define Natts_pg_aggregate 9
#define Natts_pg_aggregate 15
#define Anum_pg_aggregate_aggfnoid 1
#define Anum_pg_aggregate_aggkind 2
#define Anum_pg_aggregate_aggnumdirectargs 3
#define Anum_pg_aggregate_aggtransfn 4
#define Anum_pg_aggregate_aggfinalfn 5
#define Anum_pg_aggregate_aggsortop 6
#define Anum_pg_aggregate_aggtranstype 7
#define Anum_pg_aggregate_aggtransspace 8
#define Anum_pg_aggregate_agginitval 9
#define Anum_pg_aggregate_aggmtransfn 6
#define Anum_pg_aggregate_aggminvtransfn 7
#define Anum_pg_aggregate_aggmfinalfn 8
#define Anum_pg_aggregate_aggsortop 9
#define Anum_pg_aggregate_aggtranstype 10
#define Anum_pg_aggregate_aggtransspace 11
#define Anum_pg_aggregate_aggmtranstype 12
#define Anum_pg_aggregate_aggmtransspace 13
#define Anum_pg_aggregate_agginitval 14
#define Anum_pg_aggregate_aggminitval 15
/*
* Symbolic values for aggkind column. We distinguish normal aggregates
@ -101,177 +119,177 @@ typedef FormData_pg_aggregate *Form_pg_aggregate;
*/
/* avg */
DATA(insert ( 2100 n 0 int8_avg_accum numeric_avg 0 2281 128 _null_ ));
DATA(insert ( 2101 n 0 int4_avg_accum int8_avg 0 1016 0 "{0,0}" ));
DATA(insert ( 2102 n 0 int2_avg_accum int8_avg 0 1016 0 "{0,0}" ));
DATA(insert ( 2103 n 0 numeric_avg_accum numeric_avg 0 2281 128 _null_ ));
DATA(insert ( 2104 n 0 float4_accum float8_avg 0 1022 0 "{0,0,0}" ));
DATA(insert ( 2105 n 0 float8_accum float8_avg 0 1022 0 "{0,0,0}" ));
DATA(insert ( 2106 n 0 interval_accum interval_avg 0 1187 0 "{0 second,0 second}" ));
DATA(insert ( 2100 n 0 int8_avg_accum numeric_avg - - - 0 2281 128 0 0 _null_ _null_ ));
DATA(insert ( 2101 n 0 int4_avg_accum int8_avg - - - 0 1016 0 0 0 "{0,0}" _null_ ));
DATA(insert ( 2102 n 0 int2_avg_accum int8_avg - - - 0 1016 0 0 0 "{0,0}" _null_ ));
DATA(insert ( 2103 n 0 numeric_avg_accum numeric_avg - - - 0 2281 128 0 0 _null_ _null_ ));
DATA(insert ( 2104 n 0 float4_accum float8_avg - - - 0 1022 0 0 0 "{0,0,0}" _null_ ));
DATA(insert ( 2105 n 0 float8_accum float8_avg - - - 0 1022 0 0 0 "{0,0,0}" _null_ ));
DATA(insert ( 2106 n 0 interval_accum interval_avg - - - 0 1187 0 0 0 "{0 second,0 second}" _null_ ));
/* sum */
DATA(insert ( 2107 n 0 int8_avg_accum numeric_sum 0 2281 128 _null_ ));
DATA(insert ( 2108 n 0 int4_sum - 0 20 0 _null_ ));
DATA(insert ( 2109 n 0 int2_sum - 0 20 0 _null_ ));
DATA(insert ( 2110 n 0 float4pl - 0 700 0 _null_ ));
DATA(insert ( 2111 n 0 float8pl - 0 701 0 _null_ ));
DATA(insert ( 2112 n 0 cash_pl - 0 790 0 _null_ ));
DATA(insert ( 2113 n 0 interval_pl - 0 1186 0 _null_ ));
DATA(insert ( 2114 n 0 numeric_avg_accum numeric_sum 0 2281 128 _null_ ));
DATA(insert ( 2107 n 0 int8_avg_accum numeric_sum - - - 0 2281 128 0 0 _null_ _null_ ));
DATA(insert ( 2108 n 0 int4_sum - - - - 0 20 0 0 0 _null_ _null_ ));
DATA(insert ( 2109 n 0 int2_sum - - - - 0 20 0 0 0 _null_ _null_ ));
DATA(insert ( 2110 n 0 float4pl - - - - 0 700 0 0 0 _null_ _null_ ));
DATA(insert ( 2111 n 0 float8pl - - - - 0 701 0 0 0 _null_ _null_ ));
DATA(insert ( 2112 n 0 cash_pl - - - - 0 790 0 0 0 _null_ _null_ ));
DATA(insert ( 2113 n 0 interval_pl - - - - 0 1186 0 0 0 _null_ _null_ ));
DATA(insert ( 2114 n 0 numeric_avg_accum numeric_sum - - - 0 2281 128 0 0 _null_ _null_ ));
/* max */
DATA(insert ( 2115 n 0 int8larger - 413 20 0 _null_ ));
DATA(insert ( 2116 n 0 int4larger - 521 23 0 _null_ ));
DATA(insert ( 2117 n 0 int2larger - 520 21 0 _null_ ));
DATA(insert ( 2118 n 0 oidlarger - 610 26 0 _null_ ));
DATA(insert ( 2119 n 0 float4larger - 623 700 0 _null_ ));
DATA(insert ( 2120 n 0 float8larger - 674 701 0 _null_ ));
DATA(insert ( 2121 n 0 int4larger - 563 702 0 _null_ ));
DATA(insert ( 2122 n 0 date_larger - 1097 1082 0 _null_ ));
DATA(insert ( 2123 n 0 time_larger - 1112 1083 0 _null_ ));
DATA(insert ( 2124 n 0 timetz_larger - 1554 1266 0 _null_ ));
DATA(insert ( 2125 n 0 cashlarger - 903 790 0 _null_ ));
DATA(insert ( 2126 n 0 timestamp_larger - 2064 1114 0 _null_ ));
DATA(insert ( 2127 n 0 timestamptz_larger - 1324 1184 0 _null_ ));
DATA(insert ( 2128 n 0 interval_larger - 1334 1186 0 _null_ ));
DATA(insert ( 2129 n 0 text_larger - 666 25 0 _null_ ));
DATA(insert ( 2130 n 0 numeric_larger - 1756 1700 0 _null_ ));
DATA(insert ( 2050 n 0 array_larger - 1073 2277 0 _null_ ));
DATA(insert ( 2244 n 0 bpchar_larger - 1060 1042 0 _null_ ));
DATA(insert ( 2797 n 0 tidlarger - 2800 27 0 _null_ ));
DATA(insert ( 3526 n 0 enum_larger - 3519 3500 0 _null_ ));
DATA(insert ( 2115 n 0 int8larger - - - - 413 20 0 0 0 _null_ _null_ ));
DATA(insert ( 2116 n 0 int4larger - - - - 521 23 0 0 0 _null_ _null_ ));
DATA(insert ( 2117 n 0 int2larger - - - - 520 21 0 0 0 _null_ _null_ ));
DATA(insert ( 2118 n 0 oidlarger - - - - 610 26 0 0 0 _null_ _null_ ));
DATA(insert ( 2119 n 0 float4larger - - - - 623 700 0 0 0 _null_ _null_ ));
DATA(insert ( 2120 n 0 float8larger - - - - 674 701 0 0 0 _null_ _null_ ));
DATA(insert ( 2121 n 0 int4larger - - - - 563 702 0 0 0 _null_ _null_ ));
DATA(insert ( 2122 n 0 date_larger - - - - 1097 1082 0 0 0 _null_ _null_ ));
DATA(insert ( 2123 n 0 time_larger - - - - 1112 1083 0 0 0 _null_ _null_ ));
DATA(insert ( 2124 n 0 timetz_larger - - - - 1554 1266 0 0 0 _null_ _null_ ));
DATA(insert ( 2125 n 0 cashlarger - - - - 903 790 0 0 0 _null_ _null_ ));
DATA(insert ( 2126 n 0 timestamp_larger - - - - 2064 1114 0 0 0 _null_ _null_ ));
DATA(insert ( 2127 n 0 timestamptz_larger - - - - 1324 1184 0 0 0 _null_ _null_ ));
DATA(insert ( 2128 n 0 interval_larger - - - - 1334 1186 0 0 0 _null_ _null_ ));
DATA(insert ( 2129 n 0 text_larger - - - - 666 25 0 0 0 _null_ _null_ ));
DATA(insert ( 2130 n 0 numeric_larger - - - - 1756 1700 0 0 0 _null_ _null_ ));
DATA(insert ( 2050 n 0 array_larger - - - - 1073 2277 0 0 0 _null_ _null_ ));
DATA(insert ( 2244 n 0 bpchar_larger - - - - 1060 1042 0 0 0 _null_ _null_ ));
DATA(insert ( 2797 n 0 tidlarger - - - - 2800 27 0 0 0 _null_ _null_ ));
DATA(insert ( 3526 n 0 enum_larger - - - - 3519 3500 0 0 0 _null_ _null_ ));
/* min */
DATA(insert ( 2131 n 0 int8smaller - 412 20 0 _null_ ));
DATA(insert ( 2132 n 0 int4smaller - 97 23 0 _null_ ));
DATA(insert ( 2133 n 0 int2smaller - 95 21 0 _null_ ));
DATA(insert ( 2134 n 0 oidsmaller - 609 26 0 _null_ ));
DATA(insert ( 2135 n 0 float4smaller - 622 700 0 _null_ ));
DATA(insert ( 2136 n 0 float8smaller - 672 701 0 _null_ ));
DATA(insert ( 2137 n 0 int4smaller - 562 702 0 _null_ ));
DATA(insert ( 2138 n 0 date_smaller - 1095 1082 0 _null_ ));
DATA(insert ( 2139 n 0 time_smaller - 1110 1083 0 _null_ ));
DATA(insert ( 2140 n 0 timetz_smaller - 1552 1266 0 _null_ ));
DATA(insert ( 2141 n 0 cashsmaller - 902 790 0 _null_ ));
DATA(insert ( 2142 n 0 timestamp_smaller - 2062 1114 0 _null_ ));
DATA(insert ( 2143 n 0 timestamptz_smaller - 1322 1184 0 _null_ ));
DATA(insert ( 2144 n 0 interval_smaller - 1332 1186 0 _null_ ));
DATA(insert ( 2145 n 0 text_smaller - 664 25 0 _null_ ));
DATA(insert ( 2146 n 0 numeric_smaller - 1754 1700 0 _null_ ));
DATA(insert ( 2051 n 0 array_smaller - 1072 2277 0 _null_ ));
DATA(insert ( 2245 n 0 bpchar_smaller - 1058 1042 0 _null_ ));
DATA(insert ( 2798 n 0 tidsmaller - 2799 27 0 _null_ ));
DATA(insert ( 3527 n 0 enum_smaller - 3518 3500 0 _null_ ));
DATA(insert ( 2131 n 0 int8smaller - - - - 412 20 0 0 0 _null_ _null_ ));
DATA(insert ( 2132 n 0 int4smaller - - - - 97 23 0 0 0 _null_ _null_ ));
DATA(insert ( 2133 n 0 int2smaller - - - - 95 21 0 0 0 _null_ _null_ ));
DATA(insert ( 2134 n 0 oidsmaller - - - - 609 26 0 0 0 _null_ _null_ ));
DATA(insert ( 2135 n 0 float4smaller - - - - 622 700 0 0 0 _null_ _null_ ));
DATA(insert ( 2136 n 0 float8smaller - - - - 672 701 0 0 0 _null_ _null_ ));
DATA(insert ( 2137 n 0 int4smaller - - - - 562 702 0 0 0 _null_ _null_ ));
DATA(insert ( 2138 n 0 date_smaller - - - - 1095 1082 0 0 0 _null_ _null_ ));
DATA(insert ( 2139 n 0 time_smaller - - - - 1110 1083 0 0 0 _null_ _null_ ));
DATA(insert ( 2140 n 0 timetz_smaller - - - - 1552 1266 0 0 0 _null_ _null_ ));
DATA(insert ( 2141 n 0 cashsmaller - - - - 902 790 0 0 0 _null_ _null_ ));
DATA(insert ( 2142 n 0 timestamp_smaller - - - - 2062 1114 0 0 0 _null_ _null_ ));
DATA(insert ( 2143 n 0 timestamptz_smaller - - - - 1322 1184 0 0 0 _null_ _null_ ));
DATA(insert ( 2144 n 0 interval_smaller - - - - 1332 1186 0 0 0 _null_ _null_ ));
DATA(insert ( 2145 n 0 text_smaller - - - - 664 25 0 0 0 _null_ _null_ ));
DATA(insert ( 2146 n 0 numeric_smaller - - - - 1754 1700 0 0 0 _null_ _null_ ));
DATA(insert ( 2051 n 0 array_smaller - - - - 1072 2277 0 0 0 _null_ _null_ ));
DATA(insert ( 2245 n 0 bpchar_smaller - - - - 1058 1042 0 0 0 _null_ _null_ ));
DATA(insert ( 2798 n 0 tidsmaller - - - - 2799 27 0 0 0 _null_ _null_ ));
DATA(insert ( 3527 n 0 enum_smaller - - - - 3518 3500 0 0 0 _null_ _null_ ));
/* count */
DATA(insert ( 2147 n 0 int8inc_any - 0 20 0 "0" ));
DATA(insert ( 2803 n 0 int8inc - 0 20 0 "0" ));
DATA(insert ( 2147 n 0 int8inc_any - - - - 0 20 0 0 0 "0" _null_ ));
DATA(insert ( 2803 n 0 int8inc - - - - 0 20 0 0 0 "0" _null_ ));
/* var_pop */
DATA(insert ( 2718 n 0 int8_accum numeric_var_pop 0 2281 128 _null_ ));
DATA(insert ( 2719 n 0 int4_accum numeric_var_pop 0 2281 128 _null_ ));
DATA(insert ( 2720 n 0 int2_accum numeric_var_pop 0 2281 128 _null_ ));
DATA(insert ( 2721 n 0 float4_accum float8_var_pop 0 1022 0 "{0,0,0}" ));
DATA(insert ( 2722 n 0 float8_accum float8_var_pop 0 1022 0 "{0,0,0}" ));
DATA(insert ( 2723 n 0 numeric_accum numeric_var_pop 0 2281 128 _null_ ));
DATA(insert ( 2718 n 0 int8_accum numeric_var_pop - - - 0 2281 128 0 0 _null_ _null_ ));
DATA(insert ( 2719 n 0 int4_accum numeric_var_pop - - - 0 2281 128 0 0 _null_ _null_ ));
DATA(insert ( 2720 n 0 int2_accum numeric_var_pop - - - 0 2281 128 0 0 _null_ _null_ ));
DATA(insert ( 2721 n 0 float4_accum float8_var_pop - - - 0 1022 0 0 0 "{0,0,0}" _null_ ));
DATA(insert ( 2722 n 0 float8_accum float8_var_pop - - - 0 1022 0 0 0 "{0,0,0}" _null_ ));
DATA(insert ( 2723 n 0 numeric_accum numeric_var_pop - - - 0 2281 128 0 0 _null_ _null_ ));
/* var_samp */
DATA(insert ( 2641 n 0 int8_accum numeric_var_samp 0 2281 128 _null_ ));
DATA(insert ( 2642 n 0 int4_accum numeric_var_samp 0 2281 128 _null_ ));
DATA(insert ( 2643 n 0 int2_accum numeric_var_samp 0 2281 128 _null_ ));
DATA(insert ( 2644 n 0 float4_accum float8_var_samp 0 1022 0 "{0,0,0}" ));
DATA(insert ( 2645 n 0 float8_accum float8_var_samp 0 1022 0 "{0,0,0}" ));
DATA(insert ( 2646 n 0 numeric_accum numeric_var_samp 0 2281 128 _null_ ));
DATA(insert ( 2641 n 0 int8_accum numeric_var_samp - - - 0 2281 128 0 0 _null_ _null_ ));
DATA(insert ( 2642 n 0 int4_accum numeric_var_samp - - - 0 2281 128 0 0 _null_ _null_ ));
DATA(insert ( 2643 n 0 int2_accum numeric_var_samp - - - 0 2281 128 0 0 _null_ _null_ ));
DATA(insert ( 2644 n 0 float4_accum float8_var_samp - - - 0 1022 0 0 0 "{0,0,0}" _null_ ));
DATA(insert ( 2645 n 0 float8_accum float8_var_samp - - - 0 1022 0 0 0 "{0,0,0}" _null_ ));
DATA(insert ( 2646 n 0 numeric_accum numeric_var_samp - - - 0 2281 128 0 0 _null_ _null_ ));
/* variance: historical Postgres syntax for var_samp */
DATA(insert ( 2148 n 0 int8_accum numeric_var_samp 0 2281 128 _null_ ));
DATA(insert ( 2149 n 0 int4_accum numeric_var_samp 0 2281 128 _null_ ));
DATA(insert ( 2150 n 0 int2_accum numeric_var_samp 0 2281 128 _null_ ));
DATA(insert ( 2151 n 0 float4_accum float8_var_samp 0 1022 0 "{0,0,0}" ));
DATA(insert ( 2152 n 0 float8_accum float8_var_samp 0 1022 0 "{0,0,0}" ));
DATA(insert ( 2153 n 0 numeric_accum numeric_var_samp 0 2281 128 _null_ ));
DATA(insert ( 2148 n 0 int8_accum numeric_var_samp - - - 0 2281 128 0 0 _null_ _null_ ));
DATA(insert ( 2149 n 0 int4_accum numeric_var_samp - - - 0 2281 128 0 0 _null_ _null_ ));
DATA(insert ( 2150 n 0 int2_accum numeric_var_samp - - - 0 2281 128 0 0 _null_ _null_ ));
DATA(insert ( 2151 n 0 float4_accum float8_var_samp - - - 0 1022 0 0 0 "{0,0,0}" _null_ ));
DATA(insert ( 2152 n 0 float8_accum float8_var_samp - - - 0 1022 0 0 0 "{0,0,0}" _null_ ));
DATA(insert ( 2153 n 0 numeric_accum numeric_var_samp - - - 0 2281 128 0 0 _null_ _null_ ));
/* stddev_pop */
DATA(insert ( 2724 n 0 int8_accum numeric_stddev_pop 0 2281 128 _null_ ));
DATA(insert ( 2725 n 0 int4_accum numeric_stddev_pop 0 2281 128 _null_ ));
DATA(insert ( 2726 n 0 int2_accum numeric_stddev_pop 0 2281 128 _null_ ));
DATA(insert ( 2727 n 0 float4_accum float8_stddev_pop 0 1022 0 "{0,0,0}" ));
DATA(insert ( 2728 n 0 float8_accum float8_stddev_pop 0 1022 0 "{0,0,0}" ));
DATA(insert ( 2729 n 0 numeric_accum numeric_stddev_pop 0 2281 128 _null_ ));
DATA(insert ( 2724 n 0 int8_accum numeric_stddev_pop - - - 0 2281 128 0 0 _null_ _null_ ));
DATA(insert ( 2725 n 0 int4_accum numeric_stddev_pop - - - 0 2281 128 0 0 _null_ _null_ ));
DATA(insert ( 2726 n 0 int2_accum numeric_stddev_pop - - - 0 2281 128 0 0 _null_ _null_ ));
DATA(insert ( 2727 n 0 float4_accum float8_stddev_pop - - - 0 1022 0 0 0 "{0,0,0}" _null_ ));
DATA(insert ( 2728 n 0 float8_accum float8_stddev_pop - - - 0 1022 0 0 0 "{0,0,0}" _null_ ));
DATA(insert ( 2729 n 0 numeric_accum numeric_stddev_pop - - - 0 2281 128 0 0 _null_ _null_ ));
/* stddev_samp */
DATA(insert ( 2712 n 0 int8_accum numeric_stddev_samp 0 2281 128 _null_ ));
DATA(insert ( 2713 n 0 int4_accum numeric_stddev_samp 0 2281 128 _null_ ));
DATA(insert ( 2714 n 0 int2_accum numeric_stddev_samp 0 2281 128 _null_ ));
DATA(insert ( 2715 n 0 float4_accum float8_stddev_samp 0 1022 0 "{0,0,0}" ));
DATA(insert ( 2716 n 0 float8_accum float8_stddev_samp 0 1022 0 "{0,0,0}" ));
DATA(insert ( 2717 n 0 numeric_accum numeric_stddev_samp 0 2281 128 _null_ ));
DATA(insert ( 2712 n 0 int8_accum numeric_stddev_samp - - - 0 2281 128 0 0 _null_ _null_ ));
DATA(insert ( 2713 n 0 int4_accum numeric_stddev_samp - - - 0 2281 128 0 0 _null_ _null_ ));
DATA(insert ( 2714 n 0 int2_accum numeric_stddev_samp - - - 0 2281 128 0 0 _null_ _null_ ));
DATA(insert ( 2715 n 0 float4_accum float8_stddev_samp - - - 0 1022 0 0 0 "{0,0,0}" _null_ ));
DATA(insert ( 2716 n 0 float8_accum float8_stddev_samp - - - 0 1022 0 0 0 "{0,0,0}" _null_ ));
DATA(insert ( 2717 n 0 numeric_accum numeric_stddev_samp - - - 0 2281 128 0 0 _null_ _null_ ));
/* stddev: historical Postgres syntax for stddev_samp */
DATA(insert ( 2154 n 0 int8_accum numeric_stddev_samp 0 2281 128 _null_ ));
DATA(insert ( 2155 n 0 int4_accum numeric_stddev_samp 0 2281 128 _null_ ));
DATA(insert ( 2156 n 0 int2_accum numeric_stddev_samp 0 2281 128 _null_ ));
DATA(insert ( 2157 n 0 float4_accum float8_stddev_samp 0 1022 0 "{0,0,0}" ));
DATA(insert ( 2158 n 0 float8_accum float8_stddev_samp 0 1022 0 "{0,0,0}" ));
DATA(insert ( 2159 n 0 numeric_accum numeric_stddev_samp 0 2281 128 _null_ ));
DATA(insert ( 2154 n 0 int8_accum numeric_stddev_samp - - - 0 2281 128 0 0 _null_ _null_ ));
DATA(insert ( 2155 n 0 int4_accum numeric_stddev_samp - - - 0 2281 128 0 0 _null_ _null_ ));
DATA(insert ( 2156 n 0 int2_accum numeric_stddev_samp - - - 0 2281 128 0 0 _null_ _null_ ));
DATA(insert ( 2157 n 0 float4_accum float8_stddev_samp - - - 0 1022 0 0 0 "{0,0,0}" _null_ ));
DATA(insert ( 2158 n 0 float8_accum float8_stddev_samp - - - 0 1022 0 0 0 "{0,0,0}" _null_ ));
DATA(insert ( 2159 n 0 numeric_accum numeric_stddev_samp - - - 0 2281 128 0 0 _null_ _null_ ));
/* SQL2003 binary regression aggregates */
DATA(insert ( 2818 n 0 int8inc_float8_float8 - 0 20 0 "0" ));
DATA(insert ( 2819 n 0 float8_regr_accum float8_regr_sxx 0 1022 0 "{0,0,0,0,0,0}" ));
DATA(insert ( 2820 n 0 float8_regr_accum float8_regr_syy 0 1022 0 "{0,0,0,0,0,0}" ));
DATA(insert ( 2821 n 0 float8_regr_accum float8_regr_sxy 0 1022 0 "{0,0,0,0,0,0}" ));
DATA(insert ( 2822 n 0 float8_regr_accum float8_regr_avgx 0 1022 0 "{0,0,0,0,0,0}" ));
DATA(insert ( 2823 n 0 float8_regr_accum float8_regr_avgy 0 1022 0 "{0,0,0,0,0,0}" ));
DATA(insert ( 2824 n 0 float8_regr_accum float8_regr_r2 0 1022 0 "{0,0,0,0,0,0}" ));
DATA(insert ( 2825 n 0 float8_regr_accum float8_regr_slope 0 1022 0 "{0,0,0,0,0,0}" ));
DATA(insert ( 2826 n 0 float8_regr_accum float8_regr_intercept 0 1022 0 "{0,0,0,0,0,0}" ));
DATA(insert ( 2827 n 0 float8_regr_accum float8_covar_pop 0 1022 0 "{0,0,0,0,0,0}" ));
DATA(insert ( 2828 n 0 float8_regr_accum float8_covar_samp 0 1022 0 "{0,0,0,0,0,0}" ));
DATA(insert ( 2829 n 0 float8_regr_accum float8_corr 0 1022 0 "{0,0,0,0,0,0}" ));
DATA(insert ( 2818 n 0 int8inc_float8_float8 - - - - 0 20 0 0 0 "0" _null_ ));
DATA(insert ( 2819 n 0 float8_regr_accum float8_regr_sxx - - - 0 1022 0 0 0 "{0,0,0,0,0,0}" _null_ ));
DATA(insert ( 2820 n 0 float8_regr_accum float8_regr_syy - - - 0 1022 0 0 0 "{0,0,0,0,0,0}" _null_ ));
DATA(insert ( 2821 n 0 float8_regr_accum float8_regr_sxy - - - 0 1022 0 0 0 "{0,0,0,0,0,0}" _null_ ));
DATA(insert ( 2822 n 0 float8_regr_accum float8_regr_avgx - - - 0 1022 0 0 0 "{0,0,0,0,0,0}" _null_ ));
DATA(insert ( 2823 n 0 float8_regr_accum float8_regr_avgy - - - 0 1022 0 0 0 "{0,0,0,0,0,0}" _null_ ));
DATA(insert ( 2824 n 0 float8_regr_accum float8_regr_r2 - - - 0 1022 0 0 0 "{0,0,0,0,0,0}" _null_ ));
DATA(insert ( 2825 n 0 float8_regr_accum float8_regr_slope - - - 0 1022 0 0 0 "{0,0,0,0,0,0}" _null_ ));
DATA(insert ( 2826 n 0 float8_regr_accum float8_regr_intercept - - - 0 1022 0 0 0 "{0,0,0,0,0,0}" _null_ ));
DATA(insert ( 2827 n 0 float8_regr_accum float8_covar_pop - - - 0 1022 0 0 0 "{0,0,0,0,0,0}" _null_ ));
DATA(insert ( 2828 n 0 float8_regr_accum float8_covar_samp - - - 0 1022 0 0 0 "{0,0,0,0,0,0}" _null_ ));
DATA(insert ( 2829 n 0 float8_regr_accum float8_corr - - - 0 1022 0 0 0 "{0,0,0,0,0,0}" _null_ ));
/* boolean-and and boolean-or */
DATA(insert ( 2517 n 0 booland_statefunc - 58 16 0 _null_ ));
DATA(insert ( 2518 n 0 boolor_statefunc - 59 16 0 _null_ ));
DATA(insert ( 2519 n 0 booland_statefunc - 58 16 0 _null_ ));
DATA(insert ( 2517 n 0 booland_statefunc - - - - 58 16 0 0 0 _null_ _null_ ));
DATA(insert ( 2518 n 0 boolor_statefunc - - - - 59 16 0 0 0 _null_ _null_ ));
DATA(insert ( 2519 n 0 booland_statefunc - - - - 58 16 0 0 0 _null_ _null_ ));
/* bitwise integer */
DATA(insert ( 2236 n 0 int2and - 0 21 0 _null_ ));
DATA(insert ( 2237 n 0 int2or - 0 21 0 _null_ ));
DATA(insert ( 2238 n 0 int4and - 0 23 0 _null_ ));
DATA(insert ( 2239 n 0 int4or - 0 23 0 _null_ ));
DATA(insert ( 2240 n 0 int8and - 0 20 0 _null_ ));
DATA(insert ( 2241 n 0 int8or - 0 20 0 _null_ ));
DATA(insert ( 2242 n 0 bitand - 0 1560 0 _null_ ));
DATA(insert ( 2243 n 0 bitor - 0 1560 0 _null_ ));
DATA(insert ( 2236 n 0 int2and - - - - 0 21 0 0 0 _null_ _null_ ));
DATA(insert ( 2237 n 0 int2or - - - - 0 21 0 0 0 _null_ _null_ ));
DATA(insert ( 2238 n 0 int4and - - - - 0 23 0 0 0 _null_ _null_ ));
DATA(insert ( 2239 n 0 int4or - - - - 0 23 0 0 0 _null_ _null_ ));
DATA(insert ( 2240 n 0 int8and - - - - 0 20 0 0 0 _null_ _null_ ));
DATA(insert ( 2241 n 0 int8or - - - - 0 20 0 0 0 _null_ _null_ ));
DATA(insert ( 2242 n 0 bitand - - - - 0 1560 0 0 0 _null_ _null_ ));
DATA(insert ( 2243 n 0 bitor - - - - 0 1560 0 0 0 _null_ _null_ ));
/* xml */
DATA(insert ( 2901 n 0 xmlconcat2 - 0 142 0 _null_ ));
DATA(insert ( 2901 n 0 xmlconcat2 - - - - 0 142 0 0 0 _null_ _null_ ));
/* array */
DATA(insert ( 2335 n 0 array_agg_transfn array_agg_finalfn 0 2281 0 _null_ ));
DATA(insert ( 2335 n 0 array_agg_transfn array_agg_finalfn - - - 0 2281 0 0 0 _null_ _null_ ));
/* text */
DATA(insert ( 3538 n 0 string_agg_transfn string_agg_finalfn 0 2281 0 _null_ ));
DATA(insert ( 3538 n 0 string_agg_transfn string_agg_finalfn - - - 0 2281 0 0 0 _null_ _null_ ));
/* bytea */
DATA(insert ( 3545 n 0 bytea_string_agg_transfn bytea_string_agg_finalfn 0 2281 0 _null_ ));
DATA(insert ( 3545 n 0 bytea_string_agg_transfn bytea_string_agg_finalfn - - - 0 2281 0 0 0 _null_ _null_ ));
/* json */
DATA(insert ( 3175 n 0 json_agg_transfn json_agg_finalfn 0 2281 0 _null_ ));
DATA(insert ( 3197 n 0 json_object_agg_transfn json_object_agg_finalfn 0 2281 0 _null_ ));
DATA(insert ( 3175 n 0 json_agg_transfn json_agg_finalfn - - - 0 2281 0 0 0 _null_ _null_ ));
DATA(insert ( 3197 n 0 json_object_agg_transfn json_object_agg_finalfn - - - 0 2281 0 0 0 _null_ _null_ ));
/* ordered-set and hypothetical-set aggregates */
DATA(insert ( 3972 o 1 ordered_set_transition percentile_disc_final 0 2281 0 _null_ ));
DATA(insert ( 3974 o 1 ordered_set_transition percentile_cont_float8_final 0 2281 0 _null_ ));
DATA(insert ( 3976 o 1 ordered_set_transition percentile_cont_interval_final 0 2281 0 _null_ ));
DATA(insert ( 3978 o 1 ordered_set_transition percentile_disc_multi_final 0 2281 0 _null_ ));
DATA(insert ( 3980 o 1 ordered_set_transition percentile_cont_float8_multi_final 0 2281 0 _null_ ));
DATA(insert ( 3982 o 1 ordered_set_transition percentile_cont_interval_multi_final 0 2281 0 _null_ ));
DATA(insert ( 3984 o 0 ordered_set_transition mode_final 0 2281 0 _null_ ));
DATA(insert ( 3986 h 1 ordered_set_transition_multi rank_final 0 2281 0 _null_ ));
DATA(insert ( 3988 h 1 ordered_set_transition_multi percent_rank_final 0 2281 0 _null_ ));
DATA(insert ( 3990 h 1 ordered_set_transition_multi cume_dist_final 0 2281 0 _null_ ));
DATA(insert ( 3992 h 1 ordered_set_transition_multi dense_rank_final 0 2281 0 _null_ ));
DATA(insert ( 3972 o 1 ordered_set_transition percentile_disc_final - - - 0 2281 0 0 0 _null_ _null_ ));
DATA(insert ( 3974 o 1 ordered_set_transition percentile_cont_float8_final - - - 0 2281 0 0 0 _null_ _null_ ));
DATA(insert ( 3976 o 1 ordered_set_transition percentile_cont_interval_final - - - 0 2281 0 0 0 _null_ _null_ ));
DATA(insert ( 3978 o 1 ordered_set_transition percentile_disc_multi_final - - - 0 2281 0 0 0 _null_ _null_ ));
DATA(insert ( 3980 o 1 ordered_set_transition percentile_cont_float8_multi_final - - - 0 2281 0 0 0 _null_ _null_ ));
DATA(insert ( 3982 o 1 ordered_set_transition percentile_cont_interval_multi_final - - - 0 2281 0 0 0 _null_ _null_ ));
DATA(insert ( 3984 o 0 ordered_set_transition mode_final - - - 0 2281 0 0 0 _null_ _null_ ));
DATA(insert ( 3986 h 1 ordered_set_transition_multi rank_final - - - 0 2281 0 0 0 _null_ _null_ ));
DATA(insert ( 3988 h 1 ordered_set_transition_multi percent_rank_final - - - 0 2281 0 0 0 _null_ _null_ ));
DATA(insert ( 3990 h 1 ordered_set_transition_multi cume_dist_final - - - 0 2281 0 0 0 _null_ _null_ ));
DATA(insert ( 3992 h 1 ordered_set_transition_multi dense_rank_final - - - 0 2281 0 0 0 _null_ _null_ ));
/*
@ -290,9 +308,15 @@ extern Oid AggregateCreate(const char *aggName,
Oid variadicArgType,
List *aggtransfnName,
List *aggfinalfnName,
List *aggmtransfnName,
List *aggminvtransfnName,
List *aggmfinalfnName,
List *aggsortopName,
Oid aggTransType,
int32 aggTransSpace,
const char *agginitval);
Oid aggmTransType,
int32 aggmTransSpace,
const char *agginitval,
const char *aggminitval);
#endif /* PG_AGGREGATE_H */

View File

@ -1762,7 +1762,8 @@ typedef struct WindowAggState
Datum endOffsetValue; /* result of endOffset evaluation */
MemoryContext partcontext; /* context for partition-lifespan data */
MemoryContext aggcontext; /* context for each aggregate data */
MemoryContext aggcontext; /* shared context for aggregate working data */
MemoryContext curaggcontext; /* current aggregate's working data */
ExprContext *tmpcontext; /* short-term evaluation context */
bool all_first; /* true if the scan is starting */

View File

@ -39,8 +39,10 @@ extern void build_aggregate_fnexprs(Oid *agg_input_types,
Oid agg_result_type,
Oid agg_input_collation,
Oid transfn_oid,
Oid invtransfn_oid,
Oid finalfn_oid,
Expr **transfnexpr,
Expr **invtransfnexpr,
Expr **finalfnexpr);
#endif /* PARSE_AGG_H */

View File

@ -90,3 +90,38 @@ alter aggregate my_rank(VARIADIC "any" ORDER BY VARIADIC "any")
public | test_rank | bigint | VARIADIC "any" ORDER BY VARIADIC "any" |
(2 rows)
-- moving-aggregate options
CREATE AGGREGATE sumdouble (float8)
(
stype = float8,
sfunc = float8pl,
mstype = float8,
msfunc = float8pl,
minvfunc = float8mi
);
-- invalid: nonstrict inverse with strict forward function
CREATE FUNCTION float8mi_n(float8, float8) RETURNS float8 AS
$$ SELECT $1 - $2; $$
LANGUAGE SQL;
CREATE AGGREGATE invalidsumdouble (float8)
(
stype = float8,
sfunc = float8pl,
mstype = float8,
msfunc = float8pl,
minvfunc = float8mi_n
);
ERROR: strictness of aggregate's forward and inverse transition functions must match
-- invalid: non-matching result types
CREATE FUNCTION float8mi_int(float8, float8) RETURNS int AS
$$ SELECT CAST($1 - $2 AS INT); $$
LANGUAGE SQL;
CREATE AGGREGATE wrongreturntype (float8)
(
stype = float8,
sfunc = float8pl,
mstype = float8,
msfunc = float8pl,
minvfunc = float8mi_int
);
ERROR: return type of inverse transition function float8mi_int is not double precision

View File

@ -735,7 +735,7 @@ WHERE aggfnoid = 0 OR aggtransfn = 0 OR
aggkind NOT IN ('n', 'o', 'h') OR
aggnumdirectargs < 0 OR
(aggkind = 'n' AND aggnumdirectargs > 0) OR
aggtranstype = 0 OR aggtransspace < 0;
aggtranstype = 0 OR aggtransspace < 0 OR aggmtransspace < 0;
ctid | aggfnoid
------+----------
(0 rows)
@ -827,6 +827,126 @@ WHERE a.aggfnoid = p.oid AND
----------+---------+-----+---------
(0 rows)
-- Check for inconsistent specifications of moving-aggregate columns.
SELECT ctid, aggfnoid::oid
FROM pg_aggregate as p1
WHERE aggmtranstype != 0 AND
(aggmtransfn = 0 OR aggminvtransfn = 0);
ctid | aggfnoid
------+----------
(0 rows)
SELECT ctid, aggfnoid::oid
FROM pg_aggregate as p1
WHERE aggmtranstype = 0 AND
(aggmtransfn != 0 OR aggminvtransfn != 0 OR aggmfinalfn != 0 OR
aggmtransspace != 0 OR aggminitval IS NOT NULL);
ctid | aggfnoid
------+----------
(0 rows)
-- If there is no mfinalfn then the output type must be the mtranstype.
SELECT a.aggfnoid::oid, p.proname
FROM pg_aggregate as a, pg_proc as p
WHERE a.aggfnoid = p.oid AND
a.aggmtransfn != 0 AND
a.aggmfinalfn = 0 AND p.prorettype != a.aggmtranstype;
aggfnoid | proname
----------+---------
(0 rows)
-- Cross-check mtransfn (if present) against its entry in pg_proc.
SELECT a.aggfnoid::oid, p.proname, ptr.oid, ptr.proname
FROM pg_aggregate AS a, pg_proc AS p, pg_proc AS ptr
WHERE a.aggfnoid = p.oid AND
a.aggmtransfn = ptr.oid AND
(ptr.proretset
OR NOT (ptr.pronargs =
CASE WHEN a.aggkind = 'n' THEN p.pronargs + 1
ELSE greatest(p.pronargs - a.aggnumdirectargs, 1) + 1 END)
OR NOT physically_coercible(ptr.prorettype, a.aggmtranstype)
OR NOT physically_coercible(a.aggmtranstype, ptr.proargtypes[0])
OR (p.pronargs > 0 AND
NOT physically_coercible(p.proargtypes[0], ptr.proargtypes[1]))
OR (p.pronargs > 1 AND
NOT physically_coercible(p.proargtypes[1], ptr.proargtypes[2]))
OR (p.pronargs > 2 AND
NOT physically_coercible(p.proargtypes[2], ptr.proargtypes[3]))
-- we could carry the check further, but 3 args is enough for now
);
aggfnoid | proname | oid | proname
----------+---------+-----+---------
(0 rows)
-- Cross-check minvtransfn (if present) against its entry in pg_proc.
SELECT a.aggfnoid::oid, p.proname, ptr.oid, ptr.proname
FROM pg_aggregate AS a, pg_proc AS p, pg_proc AS ptr
WHERE a.aggfnoid = p.oid AND
a.aggminvtransfn = ptr.oid AND
(ptr.proretset
OR NOT (ptr.pronargs =
CASE WHEN a.aggkind = 'n' THEN p.pronargs + 1
ELSE greatest(p.pronargs - a.aggnumdirectargs, 1) + 1 END)
OR NOT physically_coercible(ptr.prorettype, a.aggmtranstype)
OR NOT physically_coercible(a.aggmtranstype, ptr.proargtypes[0])
OR (p.pronargs > 0 AND
NOT physically_coercible(p.proargtypes[0], ptr.proargtypes[1]))
OR (p.pronargs > 1 AND
NOT physically_coercible(p.proargtypes[1], ptr.proargtypes[2]))
OR (p.pronargs > 2 AND
NOT physically_coercible(p.proargtypes[2], ptr.proargtypes[3]))
-- we could carry the check further, but 3 args is enough for now
);
aggfnoid | proname | oid | proname
----------+---------+-----+---------
(0 rows)
-- Cross-check mfinalfn (if present) against its entry in pg_proc.
SELECT a.aggfnoid::oid, p.proname, pfn.oid, pfn.proname
FROM pg_aggregate AS a, pg_proc AS p, pg_proc AS pfn
WHERE a.aggfnoid = p.oid AND
a.aggmfinalfn = pfn.oid AND
(pfn.proretset OR
NOT binary_coercible(pfn.prorettype, p.prorettype) OR
NOT binary_coercible(a.aggmtranstype, pfn.proargtypes[0]) OR
CASE WHEN a.aggkind = 'n' THEN pfn.pronargs != 1
ELSE pfn.pronargs != p.pronargs + 1
OR (p.pronargs > 0 AND
NOT binary_coercible(p.proargtypes[0], pfn.proargtypes[1]))
OR (p.pronargs > 1 AND
NOT binary_coercible(p.proargtypes[1], pfn.proargtypes[2]))
OR (p.pronargs > 2 AND
NOT binary_coercible(p.proargtypes[2], pfn.proargtypes[3]))
-- we could carry the check further, but 3 args is enough for now
END);
aggfnoid | proname | oid | proname
----------+---------+-----+---------
(0 rows)
-- If mtransfn is strict then either minitval should be non-NULL, or
-- input type should match mtranstype so that the first non-null input
-- can be assigned as the state value.
SELECT a.aggfnoid::oid, p.proname, ptr.oid, ptr.proname
FROM pg_aggregate AS a, pg_proc AS p, pg_proc AS ptr
WHERE a.aggfnoid = p.oid AND
a.aggmtransfn = ptr.oid AND ptr.proisstrict AND
a.aggminitval IS NULL AND
NOT binary_coercible(p.proargtypes[0], a.aggmtranstype);
aggfnoid | proname | oid | proname
----------+---------+-----+---------
(0 rows)
-- transfn and mtransfn should have same strictness setting.
SELECT a.aggfnoid::oid, p.proname, ptr.oid, ptr.proname, mptr.oid, mptr.proname
FROM pg_aggregate AS a, pg_proc AS p, pg_proc AS ptr, pg_proc AS mptr
WHERE a.aggfnoid = p.oid AND
a.aggtransfn = ptr.oid AND
a.aggmtransfn = mptr.oid AND
ptr.proisstrict != mptr.proisstrict;
aggfnoid | proname | oid | proname | oid | proname
----------+---------+-----+---------+-----+---------
(0 rows)
-- Cross-check aggsortop (if present) against pg_operator.
-- We expect to find entries for bool_and, bool_or, every, max, and min.
SELECT DISTINCT proname, oprname

View File

@ -1071,3 +1071,226 @@ SELECT nth_value_def(ten) OVER (PARTITION BY four), ten, four
1 | 3 | 3
(10 rows)
--
-- Test the basic moving-aggregate machinery
--
-- create aggregates that record the series of transform calls (these are
-- intentionally not true inverses)
CREATE FUNCTION logging_sfunc_nonstrict(text, anyelement) RETURNS text AS
$$ SELECT COALESCE($1, '') || '*' || quote_nullable($2) $$
LANGUAGE SQL IMMUTABLE;
CREATE FUNCTION logging_msfunc_nonstrict(text, anyelement) RETURNS text AS
$$ SELECT COALESCE($1, '') || '+' || quote_nullable($2) $$
LANGUAGE SQL IMMUTABLE;
CREATE FUNCTION logging_minvfunc_nonstrict(text, anyelement) RETURNS text AS
$$ SELECT $1 || '-' || quote_nullable($2) $$
LANGUAGE SQL IMMUTABLE;
CREATE AGGREGATE logging_agg_nonstrict (anyelement)
(
stype = text,
sfunc = logging_sfunc_nonstrict,
mstype = text,
msfunc = logging_msfunc_nonstrict,
minvfunc = logging_minvfunc_nonstrict
);
CREATE AGGREGATE logging_agg_nonstrict_initcond (anyelement)
(
stype = text,
sfunc = logging_sfunc_nonstrict,
mstype = text,
msfunc = logging_msfunc_nonstrict,
minvfunc = logging_minvfunc_nonstrict,
initcond = 'I',
minitcond = 'MI'
);
CREATE FUNCTION logging_sfunc_strict(text, anyelement) RETURNS text AS
$$ SELECT $1 || '*' || quote_nullable($2) $$
LANGUAGE SQL STRICT IMMUTABLE;
CREATE FUNCTION logging_msfunc_strict(text, anyelement) RETURNS text AS
$$ SELECT $1 || '+' || quote_nullable($2) $$
LANGUAGE SQL STRICT IMMUTABLE;
CREATE FUNCTION logging_minvfunc_strict(text, anyelement) RETURNS text AS
$$ SELECT $1 || '-' || quote_nullable($2) $$
LANGUAGE SQL STRICT IMMUTABLE;
CREATE AGGREGATE logging_agg_strict (text)
(
stype = text,
sfunc = logging_sfunc_strict,
mstype = text,
msfunc = logging_msfunc_strict,
minvfunc = logging_minvfunc_strict
);
CREATE AGGREGATE logging_agg_strict_initcond (anyelement)
(
stype = text,
sfunc = logging_sfunc_strict,
mstype = text,
msfunc = logging_msfunc_strict,
minvfunc = logging_minvfunc_strict,
initcond = 'I',
minitcond = 'MI'
);
-- test strict and non-strict cases
SELECT
p::text || ',' || i::text || ':' || COALESCE(v::text, 'NULL') AS row,
logging_agg_nonstrict(v) over wnd as nstrict,
logging_agg_nonstrict_initcond(v) over wnd as nstrict_init,
logging_agg_strict(v::text) over wnd as strict,
logging_agg_strict_initcond(v) over wnd as strict_init
FROM (VALUES
(1, 1, NULL),
(1, 2, 'a'),
(1, 3, 'b'),
(1, 4, NULL),
(1, 5, NULL),
(1, 6, 'c'),
(2, 1, NULL),
(2, 2, 'x'),
(3, 1, 'z')
) AS t(p, i, v)
WINDOW wnd AS (PARTITION BY P ORDER BY i ROWS BETWEEN 1 PRECEDING AND CURRENT ROW)
ORDER BY p, i;
row | nstrict | nstrict_init | strict | strict_init
----------+-----------------------------------------------+-------------------------------------------------+-----------+----------------
1,1:NULL | +NULL | MI+NULL | | MI
1,2:a | +NULL+'a' | MI+NULL+'a' | a | MI+'a'
1,3:b | +NULL+'a'-NULL+'b' | MI+NULL+'a'-NULL+'b' | a+'b' | MI+'a'+'b'
1,4:NULL | +NULL+'a'-NULL+'b'-'a'+NULL | MI+NULL+'a'-NULL+'b'-'a'+NULL | a+'b'-'a' | MI+'a'+'b'-'a'
1,5:NULL | +NULL+'a'-NULL+'b'-'a'+NULL-'b'+NULL | MI+NULL+'a'-NULL+'b'-'a'+NULL-'b'+NULL | | MI
1,6:c | +NULL+'a'-NULL+'b'-'a'+NULL-'b'+NULL-NULL+'c' | MI+NULL+'a'-NULL+'b'-'a'+NULL-'b'+NULL-NULL+'c' | c | MI+'c'
2,1:NULL | +NULL | MI+NULL | | MI
2,2:x | +NULL+'x' | MI+NULL+'x' | x | MI+'x'
3,1:z | +'z' | MI+'z' | z | MI+'z'
(9 rows)
-- and again, but with filter
SELECT
p::text || ',' || i::text || ':' ||
CASE WHEN f THEN COALESCE(v::text, 'NULL') ELSE '-' END as row,
logging_agg_nonstrict(v) filter(where f) over wnd as nstrict_filt,
logging_agg_nonstrict_initcond(v) filter(where f) over wnd as nstrict_init_filt,
logging_agg_strict(v::text) filter(where f) over wnd as strict_filt,
logging_agg_strict_initcond(v) filter(where f) over wnd as strict_init_filt
FROM (VALUES
(1, 1, true, NULL),
(1, 2, false, 'a'),
(1, 3, true, 'b'),
(1, 4, false, NULL),
(1, 5, false, NULL),
(1, 6, false, 'c'),
(2, 1, false, NULL),
(2, 2, true, 'x'),
(3, 1, true, 'z')
) AS t(p, i, f, v)
WINDOW wnd AS (PARTITION BY p ORDER BY i ROWS BETWEEN 1 PRECEDING AND CURRENT ROW)
ORDER BY p, i;
row | nstrict_filt | nstrict_init_filt | strict_filt | strict_init_filt
----------+--------------+-------------------+-------------+------------------
1,1:NULL | +NULL | MI+NULL | | MI
1,2:- | +NULL | MI+NULL | | MI
1,3:b | +'b' | MI+'b' | b | MI+'b'
1,4:- | +'b' | MI+'b' | b | MI+'b'
1,5:- | | MI | | MI
1,6:- | | MI | | MI
2,1:- | | MI | | MI
2,2:x | +'x' | MI+'x' | x | MI+'x'
3,1:z | +'z' | MI+'z' | z | MI+'z'
(9 rows)
-- test that volatile arguments disable moving-aggregate mode
SELECT
i::text || ':' || COALESCE(v::text, 'NULL') as row,
logging_agg_strict(v::text)
over wnd as inverse,
logging_agg_strict(v::text || CASE WHEN random() < 0 then '?' ELSE '' END)
over wnd as noinverse
FROM (VALUES
(1, 'a'),
(2, 'b'),
(3, 'c')
) AS t(i, v)
WINDOW wnd AS (ORDER BY i ROWS BETWEEN 1 PRECEDING AND CURRENT ROW)
ORDER BY i;
row | inverse | noinverse
-----+---------------+-----------
1:a | a | a
2:b | a+'b' | a*'b'
3:c | a+'b'-'a'+'c' | b*'c'
(3 rows)
SELECT
i::text || ':' || COALESCE(v::text, 'NULL') as row,
logging_agg_strict(v::text) filter(where true)
over wnd as inverse,
logging_agg_strict(v::text) filter(where random() >= 0)
over wnd as noinverse
FROM (VALUES
(1, 'a'),
(2, 'b'),
(3, 'c')
) AS t(i, v)
WINDOW wnd AS (ORDER BY i ROWS BETWEEN 1 PRECEDING AND CURRENT ROW)
ORDER BY i;
row | inverse | noinverse
-----+---------------+-----------
1:a | a | a
2:b | a+'b' | a*'b'
3:c | a+'b'-'a'+'c' | b*'c'
(3 rows)
-- test that non-overlapping windows don't use inverse transitions
SELECT
logging_agg_strict(v::text) OVER wnd
FROM (VALUES
(1, 'a'),
(2, 'b'),
(3, 'c')
) AS t(i, v)
WINDOW wnd AS (ORDER BY i ROWS BETWEEN CURRENT ROW AND CURRENT ROW)
ORDER BY i;
logging_agg_strict
--------------------
a
b
c
(3 rows)
-- test that returning NULL from the inverse transition functions
-- restarts the aggregation from scratch. The second aggregate is supposed
-- to test cases where only some aggregates restart, the third one checks
-- that one aggregate restarting doesn't cause others to restart.
CREATE FUNCTION sum_int_randrestart_minvfunc(int4, int4) RETURNS int4 AS
$$ SELECT CASE WHEN random() < 0.2 THEN NULL ELSE $1 - $2 END $$
LANGUAGE SQL STRICT;
CREATE AGGREGATE sum_int_randomrestart (int4)
(
stype = int4,
sfunc = int4pl,
mstype = int4,
msfunc = int4pl,
minvfunc = sum_int_randrestart_minvfunc
);
WITH
vs AS (
SELECT i, (random() * 100)::int4 AS v
FROM generate_series(1, 100) AS i
),
sum_following AS (
SELECT i, SUM(v) OVER
(ORDER BY i DESC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS s
FROM vs
)
SELECT DISTINCT
sum_following.s = sum_int_randomrestart(v) OVER fwd AS eq1,
-sum_following.s = sum_int_randomrestart(-v) OVER fwd AS eq2,
100*3+(vs.i-1)*3 = length(logging_agg_nonstrict(''::text) OVER fwd) AS eq3
FROM vs
JOIN sum_following ON sum_following.i = vs.i
WINDOW fwd AS (
ORDER BY vs.i ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING
);
eq1 | eq2 | eq3
-----+-----+-----
t | t | t
(1 row)

View File

@ -101,3 +101,44 @@ alter aggregate my_rank(VARIADIC "any" ORDER BY VARIADIC "any")
rename to test_rank;
\da test_*
-- moving-aggregate options
CREATE AGGREGATE sumdouble (float8)
(
stype = float8,
sfunc = float8pl,
mstype = float8,
msfunc = float8pl,
minvfunc = float8mi
);
-- invalid: nonstrict inverse with strict forward function
CREATE FUNCTION float8mi_n(float8, float8) RETURNS float8 AS
$$ SELECT $1 - $2; $$
LANGUAGE SQL;
CREATE AGGREGATE invalidsumdouble (float8)
(
stype = float8,
sfunc = float8pl,
mstype = float8,
msfunc = float8pl,
minvfunc = float8mi_n
);
-- invalid: non-matching result types
CREATE FUNCTION float8mi_int(float8, float8) RETURNS int AS
$$ SELECT CAST($1 - $2 AS INT); $$
LANGUAGE SQL;
CREATE AGGREGATE wrongreturntype (float8)
(
stype = float8,
sfunc = float8pl,
mstype = float8,
msfunc = float8pl,
minvfunc = float8mi_int
);

View File

@ -592,7 +592,7 @@ WHERE aggfnoid = 0 OR aggtransfn = 0 OR
aggkind NOT IN ('n', 'o', 'h') OR
aggnumdirectargs < 0 OR
(aggkind = 'n' AND aggnumdirectargs > 0) OR
aggtranstype = 0 OR aggtransspace < 0;
aggtranstype = 0 OR aggtransspace < 0 OR aggmtransspace < 0;
-- Make sure the matching pg_proc entry is sensible, too.
@ -668,6 +668,107 @@ WHERE a.aggfnoid = p.oid AND
a.agginitval IS NULL AND
NOT binary_coercible(p.proargtypes[0], a.aggtranstype);
-- Check for inconsistent specifications of moving-aggregate columns.
SELECT ctid, aggfnoid::oid
FROM pg_aggregate as p1
WHERE aggmtranstype != 0 AND
(aggmtransfn = 0 OR aggminvtransfn = 0);
SELECT ctid, aggfnoid::oid
FROM pg_aggregate as p1
WHERE aggmtranstype = 0 AND
(aggmtransfn != 0 OR aggminvtransfn != 0 OR aggmfinalfn != 0 OR
aggmtransspace != 0 OR aggminitval IS NOT NULL);
-- If there is no mfinalfn then the output type must be the mtranstype.
SELECT a.aggfnoid::oid, p.proname
FROM pg_aggregate as a, pg_proc as p
WHERE a.aggfnoid = p.oid AND
a.aggmtransfn != 0 AND
a.aggmfinalfn = 0 AND p.prorettype != a.aggmtranstype;
-- Cross-check mtransfn (if present) against its entry in pg_proc.
SELECT a.aggfnoid::oid, p.proname, ptr.oid, ptr.proname
FROM pg_aggregate AS a, pg_proc AS p, pg_proc AS ptr
WHERE a.aggfnoid = p.oid AND
a.aggmtransfn = ptr.oid AND
(ptr.proretset
OR NOT (ptr.pronargs =
CASE WHEN a.aggkind = 'n' THEN p.pronargs + 1
ELSE greatest(p.pronargs - a.aggnumdirectargs, 1) + 1 END)
OR NOT physically_coercible(ptr.prorettype, a.aggmtranstype)
OR NOT physically_coercible(a.aggmtranstype, ptr.proargtypes[0])
OR (p.pronargs > 0 AND
NOT physically_coercible(p.proargtypes[0], ptr.proargtypes[1]))
OR (p.pronargs > 1 AND
NOT physically_coercible(p.proargtypes[1], ptr.proargtypes[2]))
OR (p.pronargs > 2 AND
NOT physically_coercible(p.proargtypes[2], ptr.proargtypes[3]))
-- we could carry the check further, but 3 args is enough for now
);
-- Cross-check minvtransfn (if present) against its entry in pg_proc.
SELECT a.aggfnoid::oid, p.proname, ptr.oid, ptr.proname
FROM pg_aggregate AS a, pg_proc AS p, pg_proc AS ptr
WHERE a.aggfnoid = p.oid AND
a.aggminvtransfn = ptr.oid AND
(ptr.proretset
OR NOT (ptr.pronargs =
CASE WHEN a.aggkind = 'n' THEN p.pronargs + 1
ELSE greatest(p.pronargs - a.aggnumdirectargs, 1) + 1 END)
OR NOT physically_coercible(ptr.prorettype, a.aggmtranstype)
OR NOT physically_coercible(a.aggmtranstype, ptr.proargtypes[0])
OR (p.pronargs > 0 AND
NOT physically_coercible(p.proargtypes[0], ptr.proargtypes[1]))
OR (p.pronargs > 1 AND
NOT physically_coercible(p.proargtypes[1], ptr.proargtypes[2]))
OR (p.pronargs > 2 AND
NOT physically_coercible(p.proargtypes[2], ptr.proargtypes[3]))
-- we could carry the check further, but 3 args is enough for now
);
-- Cross-check mfinalfn (if present) against its entry in pg_proc.
SELECT a.aggfnoid::oid, p.proname, pfn.oid, pfn.proname
FROM pg_aggregate AS a, pg_proc AS p, pg_proc AS pfn
WHERE a.aggfnoid = p.oid AND
a.aggmfinalfn = pfn.oid AND
(pfn.proretset OR
NOT binary_coercible(pfn.prorettype, p.prorettype) OR
NOT binary_coercible(a.aggmtranstype, pfn.proargtypes[0]) OR
CASE WHEN a.aggkind = 'n' THEN pfn.pronargs != 1
ELSE pfn.pronargs != p.pronargs + 1
OR (p.pronargs > 0 AND
NOT binary_coercible(p.proargtypes[0], pfn.proargtypes[1]))
OR (p.pronargs > 1 AND
NOT binary_coercible(p.proargtypes[1], pfn.proargtypes[2]))
OR (p.pronargs > 2 AND
NOT binary_coercible(p.proargtypes[2], pfn.proargtypes[3]))
-- we could carry the check further, but 3 args is enough for now
END);
-- If mtransfn is strict then either minitval should be non-NULL, or
-- input type should match mtranstype so that the first non-null input
-- can be assigned as the state value.
SELECT a.aggfnoid::oid, p.proname, ptr.oid, ptr.proname
FROM pg_aggregate AS a, pg_proc AS p, pg_proc AS ptr
WHERE a.aggfnoid = p.oid AND
a.aggmtransfn = ptr.oid AND ptr.proisstrict AND
a.aggminitval IS NULL AND
NOT binary_coercible(p.proargtypes[0], a.aggmtranstype);
-- transfn and mtransfn should have same strictness setting.
SELECT a.aggfnoid::oid, p.proname, ptr.oid, ptr.proname, mptr.oid, mptr.proname
FROM pg_aggregate AS a, pg_proc AS p, pg_proc AS ptr, pg_proc AS mptr
WHERE a.aggfnoid = p.oid AND
a.aggtransfn = ptr.oid AND
a.aggmtransfn = mptr.oid AND
ptr.proisstrict != mptr.proisstrict;
-- Cross-check aggsortop (if present) against pg_operator.
-- We expect to find entries for bool_and, bool_or, every, max, and min.

View File

@ -284,3 +284,195 @@ SELECT nth_value_def(n := 2, val := ten) OVER (PARTITION BY four), ten, four
SELECT nth_value_def(ten) OVER (PARTITION BY four), ten, four
FROM (SELECT * FROM tenk1 WHERE unique2 < 10 ORDER BY four, ten) s;
--
-- Test the basic moving-aggregate machinery
--
-- create aggregates that record the series of transform calls (these are
-- intentionally not true inverses)
CREATE FUNCTION logging_sfunc_nonstrict(text, anyelement) RETURNS text AS
$$ SELECT COALESCE($1, '') || '*' || quote_nullable($2) $$
LANGUAGE SQL IMMUTABLE;
CREATE FUNCTION logging_msfunc_nonstrict(text, anyelement) RETURNS text AS
$$ SELECT COALESCE($1, '') || '+' || quote_nullable($2) $$
LANGUAGE SQL IMMUTABLE;
CREATE FUNCTION logging_minvfunc_nonstrict(text, anyelement) RETURNS text AS
$$ SELECT $1 || '-' || quote_nullable($2) $$
LANGUAGE SQL IMMUTABLE;
CREATE AGGREGATE logging_agg_nonstrict (anyelement)
(
stype = text,
sfunc = logging_sfunc_nonstrict,
mstype = text,
msfunc = logging_msfunc_nonstrict,
minvfunc = logging_minvfunc_nonstrict
);
CREATE AGGREGATE logging_agg_nonstrict_initcond (anyelement)
(
stype = text,
sfunc = logging_sfunc_nonstrict,
mstype = text,
msfunc = logging_msfunc_nonstrict,
minvfunc = logging_minvfunc_nonstrict,
initcond = 'I',
minitcond = 'MI'
);
CREATE FUNCTION logging_sfunc_strict(text, anyelement) RETURNS text AS
$$ SELECT $1 || '*' || quote_nullable($2) $$
LANGUAGE SQL STRICT IMMUTABLE;
CREATE FUNCTION logging_msfunc_strict(text, anyelement) RETURNS text AS
$$ SELECT $1 || '+' || quote_nullable($2) $$
LANGUAGE SQL STRICT IMMUTABLE;
CREATE FUNCTION logging_minvfunc_strict(text, anyelement) RETURNS text AS
$$ SELECT $1 || '-' || quote_nullable($2) $$
LANGUAGE SQL STRICT IMMUTABLE;
CREATE AGGREGATE logging_agg_strict (text)
(
stype = text,
sfunc = logging_sfunc_strict,
mstype = text,
msfunc = logging_msfunc_strict,
minvfunc = logging_minvfunc_strict
);
CREATE AGGREGATE logging_agg_strict_initcond (anyelement)
(
stype = text,
sfunc = logging_sfunc_strict,
mstype = text,
msfunc = logging_msfunc_strict,
minvfunc = logging_minvfunc_strict,
initcond = 'I',
minitcond = 'MI'
);
-- test strict and non-strict cases
SELECT
p::text || ',' || i::text || ':' || COALESCE(v::text, 'NULL') AS row,
logging_agg_nonstrict(v) over wnd as nstrict,
logging_agg_nonstrict_initcond(v) over wnd as nstrict_init,
logging_agg_strict(v::text) over wnd as strict,
logging_agg_strict_initcond(v) over wnd as strict_init
FROM (VALUES
(1, 1, NULL),
(1, 2, 'a'),
(1, 3, 'b'),
(1, 4, NULL),
(1, 5, NULL),
(1, 6, 'c'),
(2, 1, NULL),
(2, 2, 'x'),
(3, 1, 'z')
) AS t(p, i, v)
WINDOW wnd AS (PARTITION BY P ORDER BY i ROWS BETWEEN 1 PRECEDING AND CURRENT ROW)
ORDER BY p, i;
-- and again, but with filter
SELECT
p::text || ',' || i::text || ':' ||
CASE WHEN f THEN COALESCE(v::text, 'NULL') ELSE '-' END as row,
logging_agg_nonstrict(v) filter(where f) over wnd as nstrict_filt,
logging_agg_nonstrict_initcond(v) filter(where f) over wnd as nstrict_init_filt,
logging_agg_strict(v::text) filter(where f) over wnd as strict_filt,
logging_agg_strict_initcond(v) filter(where f) over wnd as strict_init_filt
FROM (VALUES
(1, 1, true, NULL),
(1, 2, false, 'a'),
(1, 3, true, 'b'),
(1, 4, false, NULL),
(1, 5, false, NULL),
(1, 6, false, 'c'),
(2, 1, false, NULL),
(2, 2, true, 'x'),
(3, 1, true, 'z')
) AS t(p, i, f, v)
WINDOW wnd AS (PARTITION BY p ORDER BY i ROWS BETWEEN 1 PRECEDING AND CURRENT ROW)
ORDER BY p, i;
-- test that volatile arguments disable moving-aggregate mode
SELECT
i::text || ':' || COALESCE(v::text, 'NULL') as row,
logging_agg_strict(v::text)
over wnd as inverse,
logging_agg_strict(v::text || CASE WHEN random() < 0 then '?' ELSE '' END)
over wnd as noinverse
FROM (VALUES
(1, 'a'),
(2, 'b'),
(3, 'c')
) AS t(i, v)
WINDOW wnd AS (ORDER BY i ROWS BETWEEN 1 PRECEDING AND CURRENT ROW)
ORDER BY i;
SELECT
i::text || ':' || COALESCE(v::text, 'NULL') as row,
logging_agg_strict(v::text) filter(where true)
over wnd as inverse,
logging_agg_strict(v::text) filter(where random() >= 0)
over wnd as noinverse
FROM (VALUES
(1, 'a'),
(2, 'b'),
(3, 'c')
) AS t(i, v)
WINDOW wnd AS (ORDER BY i ROWS BETWEEN 1 PRECEDING AND CURRENT ROW)
ORDER BY i;
-- test that non-overlapping windows don't use inverse transitions
SELECT
logging_agg_strict(v::text) OVER wnd
FROM (VALUES
(1, 'a'),
(2, 'b'),
(3, 'c')
) AS t(i, v)
WINDOW wnd AS (ORDER BY i ROWS BETWEEN CURRENT ROW AND CURRENT ROW)
ORDER BY i;
-- test that returning NULL from the inverse transition functions
-- restarts the aggregation from scratch. The second aggregate is supposed
-- to test cases where only some aggregates restart, the third one checks
-- that one aggregate restarting doesn't cause others to restart.
CREATE FUNCTION sum_int_randrestart_minvfunc(int4, int4) RETURNS int4 AS
$$ SELECT CASE WHEN random() < 0.2 THEN NULL ELSE $1 - $2 END $$
LANGUAGE SQL STRICT;
CREATE AGGREGATE sum_int_randomrestart (int4)
(
stype = int4,
sfunc = int4pl,
mstype = int4,
msfunc = int4pl,
minvfunc = sum_int_randrestart_minvfunc
);
WITH
vs AS (
SELECT i, (random() * 100)::int4 AS v
FROM generate_series(1, 100) AS i
),
sum_following AS (
SELECT i, SUM(v) OVER
(ORDER BY i DESC ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS s
FROM vs
)
SELECT DISTINCT
sum_following.s = sum_int_randomrestart(v) OVER fwd AS eq1,
-sum_following.s = sum_int_randomrestart(-v) OVER fwd AS eq2,
100*3+(vs.i-1)*3 = length(logging_agg_nonstrict(''::text) OVER fwd) AS eq3
FROM vs
JOIN sum_following ON sum_following.i = vs.i
WINDOW fwd AS (
ORDER BY vs.i ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING
);