Python example using Spark's aggregate function.

Aggregate Function

aggregate(zeroValue, seqOp, combOp)

Aggregate lets you take an RDD and produce a single value whose type can be different from the type of the elements stored in the original RDD.

It does this with three parameters. The zeroValue is the initial value, in the format of the result. The seqOp is a function that takes a result object and an individual element of the RDD, and merges that element into the result object.

The combOp merges two result objects together.

Consider an example. We want to take a list of records about people, sum up their ages, and count them. For this example, each element of the RDD will be a dictionary in the format {name: NAME, age: AGE, gender: GENDER}, and the result type will be a tuple of the form (Sum of Ages, Count).
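To make the two types concrete, here is a small plain-Python sketch (no Spark required) that spells out the element type and the result type with type hints, plus a sequential reference answer to check the Spark result against. The names Person, Result, SeqOp, CombOp and sum_and_count are illustrative only; they are not part of the Spark API.

```python
from typing import Callable, List, Tuple, TypedDict


class Person(TypedDict):
    # One element of the RDD: a single person record
    name: str
    age: int
    gender: str


# The result type: (sum of ages, count of people)
Result = Tuple[int, int]

# seqOp folds one Person into a Result; combOp merges two Results
SeqOp = Callable[[Result, Person], Result]
CombOp = Callable[[Result, Result], Result]


def sum_and_count(people: List[Person]) -> Result:
    """Sequential reference answer, computed without Spark."""
    return (sum(p["age"] for p in people), len(people))


people: List[Person] = [
    {"name": "Bob", "age": 45, "gender": "M"},
    {"name": "Gloria", "age": 43, "gender": "F"},
    {"name": "Albert", "age": 28, "gender": "M"},
    {"name": "Laura", "age": 33, "gender": "F"},
    {"name": "Simone", "age": 18, "gender": "T"},
]
print(sum_and_count(people))  # (167, 5)
```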

Let's first generate peopleRdd with 5 people:

In []:
# Assumes sc is an existing SparkContext (e.g. provided by the notebook)
people = [
    {'name': 'Bob', 'age': 45, 'gender': 'M'},
    {'name': 'Gloria', 'age': 43, 'gender': 'F'},
    {'name': 'Albert', 'age': 28, 'gender': 'M'},
    {'name': 'Laura', 'age': 33, 'gender': 'F'},
    {'name': 'Simone', 'age': 18, 'gender': 'T'},
]
peopleRdd = sc.parallelize(people)
peopleRdd.count()  # number of records, without collecting them to the driver

Now we need to create the seqOp. It takes a value of the result type and one element of the RDD, and merges the element into that result. Put another way: add the person's age to the first element of the result tuple, and add 1 to the second element.
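To see what seqOp does on its own, this plain-Python sketch applies it by hand to two records and then folds a hypothetical two-record partition with functools.reduce. Folding from zeroValue like this is, roughly, what happens to the records inside one partition.

```python
from functools import reduce

# Same seqOp as in the notebook: x is the running (sum, count) tuple,
# y is a single person record from the RDD
seqOp = lambda x, y: (x[0] + y["age"], x[1] + 1)

bob = {"name": "Bob", "age": 45, "gender": "M"}
gloria = {"name": "Gloria", "age": 43, "gender": "F"}

step1 = seqOp((0, 0), bob)    # (45, 1)
step2 = seqOp(step1, gloria)  # (88, 2)

# Folding a whole partition is a left fold that starts from zeroValue
partition_result = reduce(seqOp, [bob, gloria], (0, 0))
print(step1, step2, partition_result)
```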

In [25]:
# x is the running (sum of ages, count) tuple; y is one person dict
seqOp = (lambda x, y: (x[0] + y['age'], x[1] + 1))

Now we write an operation to merge two result tuples.
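Likewise, combOp can be tried in isolation. In this sketch the two input tuples are hypothetical partial results, assuming the five people were split into [Bob, Gloria] and [Albert, Laura, Simone]; merging them should give the same answer as the full aggregate.

```python
# Same combOp as in the notebook: both arguments are (sum, count) tuples
combOp = lambda x, y: (x[0] + y[0], x[1] + y[1])

# Hypothetical partial results from two partitions:
# [Bob, Gloria] -> (88, 2) and [Albert, Laura, Simone] -> (79, 3)
left = (88, 2)
right = (79, 3)

merged = combOp(left, right)
print(merged)  # (167, 5)

# Merge order should not matter, since partitions can finish in any order
assert combOp(left, right) == combOp(right, left)
```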

In [26]:
# x and y are both (sum of ages, count) tuples
combOp = (lambda x, y: (x[0] + y[0], x[1] + y[1]))

Now run aggregate, starting from a zeroValue of (0, 0):

In [27]:
peopleRdd.aggregate((0,0), seqOp, combOp)
Out[27]:
(167, 5)

And here is the result. So why is this so convoluted? The combOp seems unnecessary, but in the MapReduce world of Spark you need that separate operation. Remember that these functions run in parallel: peopleRdd is split into partitions, and depending on its source and how the data was converted to an RDD, each row could even be on its own partition. The seqOp only knows how to merge a single record into a result, so something else is needed to merge the partial results from different partitions.
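One way to see why seqOp alone is not enough: once each partition has produced a (sum, count) tuple, seqOp cannot merge two of those tuples, because it expects a person record as its second argument. A quick plain-Python check, using hypothetical partial results:

```python
seqOp = lambda x, y: (x[0] + y["age"], x[1] + 1)
combOp = lambda x, y: (x[0] + y[0], x[1] + y[1])

# Two hypothetical per-partition results, each a (sum, count) tuple
part_a, part_b = (88, 2), (79, 3)

seqop_failed = False
try:
    # seqOp indexes its second argument with "age", which fails on a tuple
    seqOp(part_a, part_b)
except TypeError as err:
    seqop_failed = True
    print("seqOp failed:", err)

# combOp is written for exactly this case: merging two results
print(combOp(part_a, part_b))  # (167, 5)
```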

So let's back up and define a few terms.

partition - A partition is one slice of the RDD's data; together, the partitions make up the whole RDD. If our RDD had 100,000 records, we could have as many as 100,000 partitions or only 1 partition, depending on how we created the RDD.

task - A small unit of work that operates on a single partition. A single task runs on only one machine at a time and operates on only one partition at a time.
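The two definitions can be illustrated with a small single-machine sketch. It assumes a hypothetical split_into_partitions helper (Spark's own slicing in parallelize may differ) and uses a thread pool to stand in for running one task per partition; only the age field of each record is kept for brevity. Whatever the number of partitions, each task yields one (sum, count) tuple for its partition.

```python
from concurrent.futures import ThreadPoolExecutor
from functools import reduce

seqOp = lambda x, y: (x[0] + y["age"], x[1] + 1)
zero = (0, 0)


def split_into_partitions(records, num_partitions):
    """Hypothetical helper: cut records into contiguous, nearly equal slices.

    Spark's parallelize() has its own slicing logic; this only illustrates
    that the same data can be split into 1 partition or many.
    """
    n = len(records)
    return [
        records[i * n // num_partitions:(i + 1) * n // num_partitions]
        for i in range(num_partitions)
    ]


def run_task(partition):
    """One 'task': fold seqOp over a single partition, starting from zero."""
    return reduce(seqOp, partition, zero)


people = [{"age": a} for a in (45, 43, 28, 33, 18)]
results = {}

for num_partitions in (1, 2, 5):
    partitions = split_into_partitions(people, num_partitions)
    # Hand each partition to its own worker, loosely mimicking one task per partition
    with ThreadPoolExecutor(max_workers=num_partitions) as pool:
        results[num_partitions] = list(pool.map(run_task, partitions))
    print(num_partitions, results[num_partitions])
```

In PySpark itself, the partition count can be requested with sc.parallelize(people, numSlices) and checked with peopleRdd.getNumPartitions().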

For the aggregate function, the seqOp runs once for every record in a partition, starting from zeroValue. This produces one result object per partition. The combOp is then used to merge all of those per-partition result objects into the final result.
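Putting it together, here is a hypothetical local_aggregate function that models aggregate on one machine. It follows the semantics described in the Spark API documentation, where zeroValue is the initial value both for each partition's seqOp fold and for combining the partition results with combOp. The second call shows why zeroValue should be neutral: a non-zero starting sum gets added once per partition plus once more in the final merge.

```python
from functools import reduce

seqOp = lambda x, y: (x[0] + y["age"], x[1] + 1)
combOp = lambda x, y: (x[0] + y[0], x[1] + y[1])


def local_aggregate(partitions, zero_value, seq_op, comb_op):
    """Hypothetical single-machine model of RDD.aggregate.

    Each partition is folded with seq_op starting from zero_value, then the
    per-partition results are merged with comb_op, again starting from
    zero_value.
    """
    per_partition = [reduce(seq_op, part, zero_value) for part in partitions]
    return reduce(comb_op, per_partition, zero_value)


partitions = [
    [{"age": 45}, {"age": 43}],
    [{"age": 28}, {"age": 33}, {"age": 18}],
]

print(local_aggregate(partitions, (0, 0), seqOp, combOp))   # (167, 5)

# A non-neutral zeroValue is counted once per partition plus once in the
# final merge: 2 partitions + 1 merge = 3 extra copies of 10
print(local_aggregate(partitions, (10, 0), seqOp, combOp))  # (197, 5)
```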