Imagine a graph in which you have labels only for some nodes, but where the edges and their associated weights show the connections between related nodes and their relative strengths. Label Propagation allows you to infer labels for the remaining unlabelled nodes, or additional labels for those already labelled. The Google paper considers a graph in which nodes represent videos and users, and edges between them represent views on YouTube. User nodes are labelled with the videos they have viewed. We can generate recommendations for a user by propagating additional labels from other users connected to them by many short paths i.e. users who have viewed some of the same videos.

The basic algorithm is so simple that, using the elegant dumbo python wrapper for Hadoop, we can write the whole thing in a few lines of code.

Let's suppose we have a log of fruit consumption here at Last.fm HQ:

` norman orange 1`

norman orange 1

mark apple 1

klaas orange 1

mark banana 1

mark apple 1

mark apple 1

norman banana 1

klaas pear 1

ricky banana 1

olivier cherry 1

norman orange 1

klaas cherry 1

olivier banana 1

First of all let's create a graph from this logfile. We map the log entries:

`def parse_log(value):`

user,item,count = value.split("\t")

return user,item,int(count)

def map_log(key,value):

user,item,count = parse_log(value)

yield ((USER,user),(ITEM,item)),count

yield ((ITEM,item),(USER,user)),count

and sum the counts with a dumbo.sumreducer to create the edges. Next we map the edges

`def map_edges(key,value):`

yield key[0],(key[1],value)

In the reducer we output adjacency lists, also adding a shadow or dummy node for each user, to which the observed label distributions are clamped. We use the dumbo

`getpath`

decorator to output the adjacency lists and label distributions into different directories:

`@opt("getpath", "yes")`

def reduce_edges(key,values):

values = list(values)

yield ("adjacency_lists",key),values

if key[0] == USER:

yield ("adjacency_lists",(DUMMY_USER,key[1])),[(key,INJECTION_WEIGHT)]

dist = normalise_dist(values,MAX_LABELS_PER_NODE)

yield ("label_dists",(DUMMY_USER,key[1])),label_dist

Here's the function we use to normalise and prune label distributions, and the dumbo runner for this job:

`def normalise_dist(dist,max_labels):`

dist = sorted(dist,key=itemgetter(1),reverse=True)[:max_labels]

norm = sum(weight for label,weight in dist)

return [(label,weight/norm) for label,weight in dist]

def runner(job):

job.additer(map_log,sumreducer)

job.additer(map_edges,reduce_edges)

Now we're ready to propagate the label distributions. We do this iteratively, on each iteration sending the distribution at each node to each of its neighbours. First we use some dumbo magic to join distributions to adjacency lists. We just need to write a reducer:

`class Reducer(JoinReducer):`

def primary(self,key,values):

self.label_dist = values.next()

def secondary(self,key,values):

yield key,(self.label_dist,values.next())

Then we transmit the distribution from each node to its neighbours:

`def map_propagate(key,value):`

label_dist,adjacency_list = value

for node,weight in adjacency_list:

yield node, [(label,prob*weight) for label,prob in label_dist]

Finally we sum and normalise the incoming distributions at each receiving node:

`def reduce_sum_normalise(key,values):`

dist = defaultdict(lambda:0)

for d in values:

for label,prob in d:

dist[label] += float(prob)

dist = normalise_dist(dist.items(),MAX_LABELS_PER_NODE)

yield key,dist

Here's the runner for this job:

`def runner(job):`

multimapper = MultiMapper()

multimapper.add("label_dists", primary(identitymapper))

multimapper.add("adjacency_lists", secondary(identitymapper))

job.additer(multimapper,Reducer)

job.additer(map_propagate,reduce_sum_normalise)

And that's it: a massively scalable graph-based recommender in under 50 lines of python. Well ok, we still need to interpret the ouput. We started with an observed distribution at the dummy node for Norman like this:

`(3, 'norman') [((2, 'orange'), 0.75), ((2, 'banana'), 0.25)]`

and after a few iterations our recommender has inferred this at his node:

`(1, 'norman') [((2, 'orange'), 0.45735654387297453), ((2, 'banana'), 0.28646536729600702), ((2, 'cherry'), 0.11646859085648147), ((2, 'apple'), 0.074826725260416671), ((2, 'pear'), 0.064882772714120379)]`

So next lunchtime I'll suggest he tries some cherries.