
Parallel aggregator

Posted: Mon Aug 20, 2018 5:14 pm
by perspicax
Is pipelining possible when an Aggregator stage is used? For instance, I have a job using an Aggregator that sums ColD grouped by three input columns, ColA, ColB and ColC. Before the Aggregator starts spitting rows out, it waits for all the input rows to be read.

Is there a way to make the Aggregator stage produce output before all the input data has been read?

Posted: Mon Aug 20, 2018 7:02 pm
by chulett
Sort the data to support the aggregation and then assert that it is sorted in the stage. It will bust you if you lie, be forewarned. :wink:

Posted: Mon Aug 20, 2018 9:13 pm
by ray.wurlod
What Craig said.

The reason this "pipelines" the data is that the Aggregator stage does not have to build a table in memory to accumulate all the results. It can process a single grouping key at a time, handling all the rows for that group (since the data are sorted by the grouping key(s)), then immediately transfer that group's result to the output link and free the memory that was being used to hold it.

Thus Sort mode makes minimal use of memory compared to Hash mode. Of course, some of that memory will be consumed sorting the data upstream of the Aggregator stage.
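The streaming behaviour Ray describes can be sketched outside DataStage. Below is a minimal Python illustration (not DataStage code; the function name and row layout are invented for the example) of summing ColD per (ColA, ColB, ColC) group over pre-sorted input, holding only one group in memory at a time:

```python
# Illustrative sketch of Sort-mode aggregation: the input must already be
# sorted by the grouping keys; each group's result is emitted (and its
# memory freed) as soon as the group ends, instead of tabling everything.
from itertools import groupby
from operator import itemgetter

def sorted_aggregate(rows, key_cols, sum_col):
    """Yield (group_key, sum) per group from rows sorted by key_cols."""
    keyfunc = itemgetter(*key_cols)
    for key, group in groupby(rows, key=keyfunc):
        yield key, sum(r[sum_col] for r in group)

# Column names taken from the original post; data values are made up.
rows = [
    {"ColA": 1, "ColB": "x", "ColC": "p", "ColD": 10},
    {"ColA": 1, "ColB": "x", "ColC": "p", "ColD": 5},
    {"ColA": 2, "ColB": "y", "ColC": "q", "ColD": 7},
]
results = list(sorted_aggregate(rows, ("ColA", "ColB", "ColC"), "ColD"))
# results == [((1, "x", "p"), 15), ((2, "y", "q"), 7)]
```

Note that if the rows were *not* sorted, `groupby` would emit a fragment per run of equal keys and the sums would be wrong, which is the analogue of "it will bust you if you lie" about the sort order.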

Posted: Tue Aug 21, 2018 1:58 pm
by perspicax
Thanks. It worked. Earlier, the sort order was different from the order of the grouping key columns. I selected partition type 'Same'. I am not sure what it means. 'Auto' and 'Entire' didn't work. What is the recommended partition type?

Posted: Tue Aug 21, 2018 6:40 pm
by chulett
I'm not sure there's a "recommended" so much as a default one, but others can address that. The partitioning types need to be something you become very familiar with unless you only ever run your jobs on a single node. The moment you open that up to more than one node, how the data is partitioned can make or break things. Simplest case, it runs longer than it should. Worst case, your output data is wrong or incomplete. Or the job goes boom. :wink:

Perhaps this will help.

Posted: Tue Aug 21, 2018 9:39 pm
by ray.wurlod
Same is recommended iff the upstream stage:
  • is executing in parallel mode
  • is executing on the same nodes
  • is partitioned using the correct algorithm
In the case of an Aggregator stage I would typically use a key-based partitioning algorithm (e.g. Hash or Modulus) on the first grouping key, and sort on all of the grouping keys in their correct order.
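The point of key-based partitioning here can be sketched in plain Python (again, an illustration only, not DataStage internals; node count and column names are assumed for the example): hashing on the first grouping key guarantees that all rows of any one group land in the same partition, so each node can then sort and aggregate its partition independently.

```python
# Illustrative sketch of hash partitioning on a grouping key followed by a
# per-partition sort on all grouping keys, as suggested for the Aggregator.
def hash_partition(rows, key_col, n_nodes):
    """Route each row to a partition by hashing one key column."""
    partitions = [[] for _ in range(n_nodes)]
    for row in rows:
        partitions[hash(row[key_col]) % n_nodes].append(row)
    return partitions

# Made-up rows using grouping columns mentioned in the thread.
rows = [
    {"Year": 2018, "Month": 1, "ColD": 3},
    {"Year": 2018, "Month": 2, "ColD": 4},
    {"Year": 2017, "Month": 1, "ColD": 5},
]
parts = hash_partition(rows, "Year", n_nodes=2)

# Each node then sorts its own partition on ALL grouping keys, in order.
for p in parts:
    p.sort(key=lambda r: (r["Year"], r["Month"]))
# Rows sharing a Year are guaranteed to be in the same partition.
```

One caveat visible even in this toy version: if the first grouping key has very few distinct values (as with a Year column when only one year is processed), hash partitioning on it alone will skew all the work onto one node.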

Posted: Mon Aug 27, 2018 10:41 am
by perspicax
Yes, the job is executing on 6 nodes with all the stages running on the same nodes. In this example, the upstream stage is a Transformer stage. The input stage is an Oracle stage with partitioned read enabled (with the default rowid range).

The grouping is performed on Year (kind of redundant, because we only process the current year and the target inserts into the current year partition only), Month, id1, id2...Id10. That forms the grain of the target table. So Year is the first grouping key, and the sort is in the same order as the grouping keys.

I did get a significant performance improvement with 'Same' partitioning, aggregating 26M rows in less than 4 minutes. But when I specify 'Hash' or 'Modulus', I do not see the pipelining and the job takes a long time to complete.

Posted: Mon Aug 27, 2018 7:58 pm
by ray.wurlod
How is the upstream Transformer stage partitioned?

Posted: Mon Aug 27, 2018 10:15 pm
by perspicax
The Transformer is set to Auto. So if 'Hash' or 'Modulus' is selected in the Aggregator stage, should the same partition type be set in the Transformer? I will change the settings and report back.

Thanks

Posted: Tue Aug 28, 2018 2:09 am
by ray.wurlod
That would depend on what partitioning is used upstream of the Transformer stage. Your answer suggests an unfamiliarity with what partitioning does and achieves. Is there any explicit partitioning upstream in your job design?

Posted: Tue Aug 28, 2018 6:26 am
by chulett
chulett wrote:Perhaps this will help.
Did you click on the included link? Partitioning is core, key knowledge required for success using the tool. IMHO.