Tuesday 23 February 2010

Adsorption: scalable graph-based everything

Graph-based machine learning algorithms have a reputation for scaling badly, so I've enjoyed reading two papers describing a simple graph algorithm that does scale easily and can be applied to a wide range of problems.  "Video suggestion and discovery for YouTube: taking random walks through the view graph" by Shumeet Baluja and seven colleagues from Google, and "Ranking and semi-supervised classification on large scale graphs using map-reduce" by Delip Rao and David Yarowsky of Johns Hopkins University, both show that Label Propagation (which the Google paper calls Adsorption) has a simple map-reduce implementation that is closely related to PageRank.

Imagine a graph in which you have labels only for some nodes, but where the edges and their associated weights show the connections between related nodes and their relative strengths.  Label Propagation allows you to infer labels for the remaining unlabelled nodes, or additional labels for those already labelled.  The Google paper considers a graph in which nodes represent videos and users, and edges between them represent views on YouTube.  User nodes are labelled with the videos they have viewed.  We can generate recommendations for a user by propagating additional labels from other users connected to them by many short paths, i.e. users who have viewed some of the same videos.

The basic algorithm is so simple that, using the elegant dumbo python wrapper for Hadoop, we can write the whole thing in a few lines of code.

Let's suppose we have a log of fruit consumption here at Last.fm HQ:

norman  orange  1
norman  orange  1
mark    apple   1
klaas   orange  1
mark    banana  1
mark    apple   1
mark    apple   1
norman  banana  1
klaas   pear    1
ricky   banana  1
olivier cherry  1
norman  orange  1
klaas   cherry  1
olivier banana  1



First of all let's create a graph from this logfile.  We map the log entries:

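# Node-type tags.  The values match the node tuples in the sample output further down.
USER, ITEM, DUMMY_USER = 1, 2, 3

def parse_log(value):
    # each log line is "user<TAB>item<TAB>count"
    user,item,count = value.split("\t")
    return user,item,int(count)
  
def map_log(key,value):
    user,item,count = parse_log(value)
    # emit every edge in both directions so the graph is undirected
    yield ((USER,user),(ITEM,item)),count
    yield ((ITEM,item),(USER,user)),count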
 
  
and sum the counts with a dumbo.sumreducer to create the edges.  Next we map the edges, keying each one by its source node:

def map_edges(key,value):
    yield key[0],(key[1],value)
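
To see what the first job produces without a Hadoop cluster, here is a plain-Python sketch that runs the same mapper over the fruit log and sums the counts in memory.  The `sum_counts` helper is a hypothetical stand-in for dumbo's sumreducer, the log is assumed to be tab-separated, and the values of `USER` and `ITEM` are read off the node tuples in the sample output later in the post.

```python
from collections import defaultdict

USER, ITEM = 1, 2  # node-type tags (values as they appear in the sample output)

LOG = """norman orange 1
norman orange 1
mark apple 1
klaas orange 1
mark banana 1
mark apple 1
mark apple 1
norman banana 1
klaas pear 1
ricky banana 1
olivier cherry 1
norman orange 1
klaas cherry 1
olivier banana 1"""

# Assumption: the real logfile is tab-separated, as parse_log expects.
LOG_LINES = ["\t".join(line.split()) for line in LOG.splitlines()]

def parse_log(value):
    user, item, count = value.split("\t")
    return user, item, int(count)

def map_log(key, value):
    user, item, count = parse_log(value)
    yield ((USER, user), (ITEM, item)), count
    yield ((ITEM, item), (USER, user)), count

def sum_counts(pairs):
    # hypothetical in-memory stand-in for the sum reducer
    totals = defaultdict(int)
    for key, count in pairs:
        totals[key] += count
    return dict(totals)

edges = sum_counts(kv for line in LOG_LINES for kv in map_log(None, line))

# Weighted edges leaving Norman's user node
for (src, dst), weight in sorted(edges.items()):
    if src == (USER, "norman"):
        print(src, dst, weight)
```

Norman ate three oranges and one banana, so his two outgoing edges carry weights 3 and 1, and each has a mirror edge from the fruit back to him.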
 
  
In the reducer we output adjacency lists, also adding a shadow or dummy node for each user, to which the observed label distributions are clamped.  We use the dumbo getpath decorator to output the adjacency lists and label distributions into different directories:

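# Tunable constants.  Example values only; pick your own.
INJECTION_WEIGHT = 1.0
MAX_LABELS_PER_NODE = 10

@opt("getpath", "yes")
def reduce_edges(key,values):
    values = list(values)
    yield ("adjacency_lists",key),values
    if key[0] == USER:
        # the dummy node has a single edge to the real user node...
        yield ("adjacency_lists",(DUMMY_USER,key[1])),[(key,INJECTION_WEIGHT)]
        # ...and carries the user's observed label distribution
        label_dist = normalise_dist(values,MAX_LABELS_PER_NODE)
        yield ("label_dists",(DUMMY_USER,key[1])),label_dist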
     
      
Here's the function we use to normalise and prune label distributions, and the dumbo runner for this job:

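from operator import itemgetter

def normalise_dist(dist,max_labels):
    # keep only the max_labels heaviest labels, then rescale to sum to 1
    dist = sorted(dist,key=itemgetter(1),reverse=True)[:max_labels]
    # float() avoids integer division when the weights are raw counts
    norm = float(sum(weight for label,weight in dist))
    return [(label,weight/norm) for label,weight in dist]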

def runner(job):
    job.additer(map_log,sumreducer)
    job.additer(map_edges,reduce_edges)
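
As a quick check of the normalise-and-prune step, here is a small standalone sketch applied to Norman's edges (three oranges, one banana).  The `max_labels` values passed in are arbitrary choices for illustration, and `ITEM = 2` is taken from the node tuples in the sample output.

```python
from operator import itemgetter

ITEM = 2  # node-type tag for items (value as it appears in the sample output)

def normalise_dist(dist, max_labels):
    # keep the max_labels heaviest labels, then rescale so the weights sum to 1
    dist = sorted(dist, key=itemgetter(1), reverse=True)[:max_labels]
    norm = float(sum(weight for label, weight in dist))
    return [(label, weight / norm) for label, weight in dist]

norman_edges = [((ITEM, "banana"), 1), ((ITEM, "orange"), 3)]

seed = normalise_dist(norman_edges, max_labels=10)
print(seed)    # [((2, 'orange'), 0.75), ((2, 'banana'), 0.25)]

pruned = normalise_dist(norman_edges, max_labels=1)
print(pruned)  # [((2, 'orange'), 1.0)]
```

With room for both labels, the result is the 0.75 / 0.25 split that gets clamped to Norman's dummy node; pruning to a single label keeps only the heaviest one and renormalises it to 1.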



Now we're ready to propagate the label distributions.  We do this iteratively, on each iteration sending the distribution at each node to each of its neighbours.  First we use some dumbo magic to join distributions to adjacency lists.  We just need to write a reducer:

class Reducer(JoinReducer):
    def primary(self,key,values):
        self.label_dist = values.next()
    def secondary(self,key,values):
        yield key,(self.label_dist,values.next())

      
Then we transmit the distribution from each node to its neighbours:

def map_propagate(key,value):
    label_dist,adjacency_list = value
    for node,weight in adjacency_list:
        yield node, [(label,prob*weight) for label,prob in label_dist]


Finally we sum and normalise the incoming distributions at each receiving node:

from collections import defaultdict

def reduce_sum_normalise(key,values):
    # add up the weighted distributions received from all neighbours
    dist = defaultdict(float)
    for d in values:
        for label,prob in d:
            dist[label] += float(prob)
    dist = normalise_dist(dist.items(),MAX_LABELS_PER_NODE)
    yield key,dist


Here's the runner for this job:

def runner(job):
    multimapper = MultiMapper()
    multimapper.add("label_dists", primary(identitymapper))
    multimapper.add("adjacency_lists", secondary(identitymapper))
    job.additer(multimapper,Reducer)
    job.additer(map_propagate,reduce_sum_normalise)
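
If you'd like to experiment without Hadoop, the whole pipeline can be imitated on one machine.  The following is only a rough sketch under several assumptions: `build_graph`, `propagate` and `run` are hypothetical names, the values of `INJECTION_WEIGHT` and `MAX_LABELS_PER_NODE` are guesses, and the dummy-node distributions are re-applied after every iteration to keep them clamped (the dumbo jobs above don't show how that is done).

```python
from collections import defaultdict

USER, ITEM, DUMMY_USER = 1, 2, 3   # tags as they appear in the sample output
INJECTION_WEIGHT = 1.0             # assumed value
MAX_LABELS_PER_NODE = 10           # assumed value

LOG = [("norman", "orange"), ("norman", "orange"), ("mark", "apple"),
       ("klaas", "orange"), ("mark", "banana"), ("mark", "apple"),
       ("mark", "apple"), ("norman", "banana"), ("klaas", "pear"),
       ("ricky", "banana"), ("olivier", "cherry"), ("norman", "orange"),
       ("klaas", "cherry"), ("olivier", "banana")]

def normalise_dist(dist, max_labels):
    dist = sorted(dist, key=lambda lw: lw[1], reverse=True)[:max_labels]
    norm = float(sum(weight for _, weight in dist))
    return [(label, weight / norm) for label, weight in dist]

def build_graph(log):
    # sum the counts for each edge, in both directions
    counts = defaultdict(int)
    for user, item in log:
        counts[(USER, user), (ITEM, item)] += 1
        counts[(ITEM, item), (USER, user)] += 1
    adjacency = defaultdict(list)
    for (src, dst), count in counts.items():
        adjacency[src].append((dst, count))
    # add a dummy node per user, holding the observed distribution
    seeds = {}
    for node in list(adjacency):
        if node[0] == USER:
            dummy = (DUMMY_USER, node[1])
            seeds[dummy] = normalise_dist(adjacency[node], MAX_LABELS_PER_NODE)
            adjacency[dummy] = [(node, INJECTION_WEIGHT)]
    return dict(adjacency), seeds

def propagate(adjacency, label_dists):
    # every node that has a distribution sends it, weighted, to its neighbours
    incoming = defaultdict(lambda: defaultdict(float))
    for node, dist in label_dists.items():
        for neighbour, weight in adjacency.get(node, []):
            for label, prob in dist:
                incoming[neighbour][label] += prob * weight
    # each receiving node sums and normalises what arrived
    return {node: normalise_dist(d.items(), MAX_LABELS_PER_NODE)
            for node, d in incoming.items()}

def run(log, iterations):
    adjacency, seeds = build_graph(log)
    dists = dict(seeds)
    for _ in range(iterations):
        dists = propagate(adjacency, dists)
        dists.update(seeds)  # assumption: re-clamp the dummy nodes each round
    return dists

dists = run(LOG, iterations=5)
for label, prob in dists[(USER, "norman")]:
    print(label, round(prob, 3))
```

The exact numbers depend on the assumed constants and on how many iterations you run, so don't expect them to match the figures quoted in this post, but fruit Norman has never eaten does show up in his distribution after three or more iterations.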


And that's it: a massively scalable graph-based recommender in under 50 lines of python.  Well ok, we still need to interpret the output.  We started with an observed distribution at the dummy node for Norman like this:

(3, 'norman')   [((2, 'orange'), 0.75), ((2, 'banana'), 0.25)]


and after a few iterations our recommender has inferred this at his node:

(1, 'norman')   [((2, 'orange'), 0.45735654387297453), ((2, 'banana'), 0.28646536729600702), ((2, 'cherry'), 0.11646859085648147), ((2, 'apple'), 0.074826725260416671), ((2, 'pear'), 0.064882772714120379)]


So next lunchtime I'll suggest he tries some cherries.

1 comment:

  1. Hi

    This all looks great and I'd like to try this algorithm out. I'm quite new to python and a bit confused by certain bits above: at the moment I'm getting

    NameError: global name 'USER' is not defined

    Thanks

    John
