Showing posts with label graph algorithm. Show all posts

Thursday, 18 April 2013

Contributing to GraphChi

GraphChi is a framework for processing large graphs efficiently on a single machine, developed by Aapo Kyrölä of CMU as a spin-off from the impressive GraphLab distributed graph processing project.  Both GraphLab and GraphChi come with a really handy Collaborative Filtering Toolbox, developed at CMU by Danny Bickson, which implements numerous recent algorithms.

GraphChi looks like a great project so I decided to try to contribute to it and took the chance to implement an algorithm that I'd been wanting to investigate for a while: Collaborative Less-is-More Filtering, developed by Yue Shi at TU Delft, which won a best paper award at RecSys last year.  CLiMF optimises for the Mean Reciprocal Rank of correctly predicted items, i.e. it's designed to promote accuracy and diversity in recommendations at the same time.  Although it's really intended for binary preference data like follow or friend relations, it's easy to implement a threshold on ratings that automatically binarises them during learning, so CLiMF can also be used with ratings datasets.
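To make the objective concrete, here is a minimal sketch of how Mean Reciprocal Rank could be computed once ratings have been binarised with a threshold.  The function names, the threshold of 4 and the toy data are all illustrative assumptions; this is not the CLiMF learning algorithm itself, nor the code used in GraphChi.

```python
def binarise(ratings, threshold):
    """Treat ratings at or above the threshold as relevant (1), others as 0."""
    return dict((item, int(r >= threshold)) for item, r in ratings.items())

def reciprocal_rank(ranked_items, relevant):
    """1/rank of the first relevant item in a ranked list, or 0.0 if none."""
    for rank, item in enumerate(ranked_items, start=1):
        if relevant.get(item, 0):
            return 1.0 / rank
    return 0.0

def mean_reciprocal_rank(recommendations, relevance):
    """Average the reciprocal rank over all users."""
    total = sum(reciprocal_rank(recommendations[u], relevance[u])
                for u in recommendations)
    return total / len(recommendations)

# Hypothetical toy data: explicit ratings and a ranked list per user.
ratings = {"alice": {"a": 5, "b": 2, "c": 4},
           "bob": {"a": 1, "b": 3, "c": 5}}
relevance = dict((u, binarise(r, threshold=4)) for u, r in ratings.items())
recommendations = {"alice": ["b", "a", "c"], "bob": ["c", "a", "b"]}
mrr = mean_reciprocal_rank(recommendations, relevance)
```

Only the position of the first relevant item counts for each user, which is why optimising this measure rewards getting a few good items to the very top of the list.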

Danny made contributing to the toolbox really easy and CLiMF is now available in GraphChi, and documented alongside the other algorithms.  I also wrote a simple Python implementation which works fine for small datasets and which was useful for reference.

You can get the latest version of GraphChi and the collaborative filtering toolbox from here.

Thursday, 21 March 2013

Hadoop and beyond: power tools for data mining

Last week Dell Zhang kindly invited me to give a guest lecture to Birkbeck and UCL students on his Cloud Computing course.  It was fun to show them some of the tools that make Hadoop development easier and more maintainable, and also some of the problems for which Hadoop is not a magic bullet.

I took the chance to evangelise about some of my favourite tools and frameworks, and to spend a bit of time getting to know some new ones.  I especially enjoyed taking a look at Scalding, Spark and GraphChi.

Here are the slides:

 

Tuesday, 6 April 2010

Graph processing for really big data

MapReduce implementations of graph algorithms like PageRank and adsorption scale to millions of nodes on a cluster of around 50 machines, but if you want to process billions (or even tens of millions, depending on your algorithm) then you need a different framework.  Google uses Pregel, about which they've said little except that it was inspired by the Bulk Synchronous Parallel model for parallel programming.

So the announcement of a BSP package for Hadoop in the Apache HAMA project could be an interesting one to watch.  There's even a BSP hello world, although getting further may be hard work with the current level of documentation.

MapReduce algorithm design

Data-Intensive Text Processing with MapReduce is a new book by Jimmy Lin and Chris Dyer of the University of Maryland.  It's due for publication later this year but a full draft is already available as a pdf.  The book shows how you can implement a variety of useful algorithms on a MapReduce cluster, including graph algorithms such as breadth first search and PageRank, and parameter estimation for latent variable models, with a detailed explanation of how to do this for Hidden Markov Models, and even a sketch for Conditional Random Fields.  Although the book is a practical manual, the algorithms are given in simple pseudo-code rather than Java classes intended for use on a Hadoop cluster.  This has huge advantages for readability, and makes it much easier for the authors to draw out some generally applicable design patterns for MapReduce algorithms.

The order inversion pattern is a nice trick that lets a reducer see intermediate results before it processes the data that generated them.  Lin and Dyer illustrate this with the example of computing relative frequencies for co-occurring word pairs e.g. what are the relative frequencies of words occurring within a small window of the word "dog"?  The mapper counts word pairs in the corpus, so its output looks like
((dog, cat), 125)
((dog, foot), 246)
...
But it also keeps a running total of all the word pairs containing "dog", outputting this as
((dog,*), 5348)
Using a suitable partitioner, so that all (dog,...) pairs get sent to the same reducer, and choosing the "*" token so that it occurs before any word in the sort order, the reducer sees the total ((dog,*), 5348) first, followed by all the other counts, and can trivially store the total and then output relative frequencies.  The benefit of the pattern is that it avoids an extra MapReduce iteration without creating any additional scalability bottleneck.
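The pattern can be tried out in a single process.  The sketch below is an assumption-laden stand-in rather than Hadoop code: the `shuffle` function plays the role of the framework's partition, sort and sum steps, the function names are made up for illustration, and the mapper emits a 1 per pair instead of keeping a running total.

```python
from collections import defaultdict

def map_pairs(pairs):
    """Emit each co-occurring pair plus a marginal count under the '*' key."""
    for left, right in pairs:
        yield (left, right), 1
        yield (left, "*"), 1

def shuffle(mapped):
    """Stand-in for the framework: sum per key, then sort the keys.
    '*' sorts before any lowercase word, so (w, '*') comes first."""
    totals = defaultdict(int)
    for key, count in mapped:
        totals[key] += count
    return sorted(totals.items())

def reduce_relative_frequencies(sorted_counts):
    """The marginal arrives first: store it, then divide the counts that follow."""
    marginal = None
    for (left, right), count in sorted_counts:
        if right == "*":
            marginal = count
        else:
            yield (left, right), count / float(marginal)

# Hypothetical toy corpus of co-occurring word pairs.
pairs = [("dog", "cat"), ("dog", "foot"), ("dog", "cat"), ("dog", "bone")]
freqs = dict(reduce_relative_frequencies(shuffle(map_pairs(pairs))))
```

Note that the reducer only ever holds one number in memory, the current marginal, however many distinct neighbours a word has.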

Other patterns explained in the book include the pairs and stripes approaches to produce large sparse matrix mapper output, in-mapper combining to limit the amount of mapper output written to disk (a common scalability bottleneck in MapReduce), and value-to-key conversion for relational joins of large datasets.
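As an illustration of in-mapper combining, here is a rough sketch using word counting.  The class and method names (`CombiningMapper`, `map`, `close`) are hypothetical and do not correspond to any particular framework's API; the point is only that the mapper aggregates in memory and emits once at the end, instead of emitting a record per token.

```python
from collections import Counter

def map_plain(lines):
    """Plain mapper: one output record for every token seen."""
    for line in lines:
        for word in line.split():
            yield word, 1

class CombiningMapper(object):
    """In-mapper combining: keep partial counts in memory across input records."""
    def __init__(self):
        self.counts = Counter()

    def map(self, line):
        # Aggregate locally instead of emitting anything yet.
        self.counts.update(line.split())

    def close(self):
        # Emit one record per distinct word once all input has been seen.
        for word, count in self.counts.items():
            yield word, count

lines = ["a b a", "b a c"]
plain = list(map_plain(lines))

mapper = CombiningMapper()
for line in lines:
    mapper.map(line)
combined = dict(mapper.close())
```

Here the plain mapper emits six records and the combining mapper three.  The trade-off is that the in-memory table has to fit in the mapper's memory, so a real implementation would probably flush it periodically.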

All in all, this book is a great complement to Tom White's Hadoop book.  An extra plus is that the pseudo code can be translated with virtually no effort into dumbo code for Hadoop.  I can see Data-Intensive Text Processing and dumbo becoming a standard way of teaching MapReduce quickly and excitingly in the classroom.



Tuesday, 23 February 2010

Adsorption: scalable graph-based everything

Graph-based machine learning algorithms have a reputation for scaling badly, so I've enjoyed reading two papers describing a simple graph algorithm that does scale easily and which can be applied to a large range of problems.  Video suggestion and discovery for YouTube: taking random walks through the view graph by Shumeet Baluja and seven colleagues from Google, and Ranking and semi-supervised classification on large scale graphs using map-reduce by Delip Rao and David Yarowsky of Johns Hopkins University, both show how Label Propagation has a simple implementation in the map-reduce framework which is closely related to PageRank.

Imagine a graph in which you have labels only for some nodes, but where the edges and their associated weights show the connections between related nodes and their relative strengths.  Label Propagation allows you to infer labels for the remaining unlabelled nodes, or additional labels for those already labelled.  The Google paper considers a graph in which nodes represent videos and users, and edges between them represent views on YouTube.  User nodes are labelled with the videos they have viewed.  We can generate recommendations for a user by propagating additional labels from other users connected to them by many short paths i.e. users who have viewed some of the same videos.

The basic algorithm is so simple that, using the elegant dumbo python wrapper for Hadoop, we can write the whole thing in a few lines of code.

Let's suppose we have a log of fruit consumption here at Last.fm HQ:

norman  orange  1
norman  orange  1
mark    apple   1
klaas   orange  1
mark    banana  1
mark    apple   1
mark    apple   1
norman  banana  1
klaas   pear    1
ricky   banana  1
olivier cherry  1
norman  orange  1
klaas   cherry  1
olivier banana  1



First of all let's create a graph from this logfile.  We map the log entries:

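# node-type tags; these values match the output shown at the end of this post
USER, ITEM, DUMMY_USER = 1, 2, 3

def parse_log(value):
    # each log line is "user<TAB>item<TAB>count"
    user,item,count = value.split("\t")
    return user,item,int(count)

def map_log(key,value):
    user,item,count = parse_log(value)
    # emit the edge in both directions so the graph is undirected
    yield ((USER,user),(ITEM,item)),count
    yield ((ITEM,item),(USER,user)),count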
 
  
and sum the counts with a dumbo.sumreducer to create the edges.  Next we map the edges

def map_edges(key,value):
    yield key[0],(key[1],value)
 
  
In the reducer we output adjacency lists, also adding a shadow or dummy node for each user, to which the observed label distributions are clamped.  We use the dumbo getpath decorator to output the adjacency lists and label distributions into different directories:

@opt("getpath", "yes")
def reduce_edges(key,values):
    values = list(values)
    yield ("adjacency_lists",key),values
    if key[0] == USER:
        # shadow node: its only edge points back to the real user node
        yield ("adjacency_lists",(DUMMY_USER,key[1])),[(key,INJECTION_WEIGHT)]
        # clamp the observed (normalised, pruned) distribution to the shadow node
        dist = normalise_dist(values,MAX_LABELS_PER_NODE)
        yield ("label_dists",(DUMMY_USER,key[1])),dist
     
      
Here's the function we use to normalise and prune label distributions, and the dumbo runner for this job:

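from operator import itemgetter

def normalise_dist(dist,max_labels):
    # keep only the max_labels heaviest labels
    dist = sorted(dist,key=itemgetter(1),reverse=True)[:max_labels]
    # float() avoids integer division when the weights are raw counts
    norm = float(sum(weight for label,weight in dist))
    return [(label,weight/norm) for label,weight in dist]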

def runner(job):
    job.additer(map_log,sumreducer)
    job.additer(map_edges,reduce_edges)
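As a sanity check, the graph-building step can be reproduced in plain Python without Hadoop or dumbo.  This is only a local sketch: `build_adjacency` is a hypothetical helper standing in for the two map-reduce iterations, the `USER` and `ITEM` values are taken from the output shown later in the post, and `max_labels=10` is an arbitrary assumption since the real value of `MAX_LABELS_PER_NODE` isn't given here.

```python
from collections import defaultdict
from operator import itemgetter

USER, ITEM = 1, 2  # node-type tags, as they appear in the output of the post

# The fruit log from above, with the constant count of 1 left implicit.
LOG = [
    ("norman", "orange"), ("norman", "orange"), ("mark", "apple"),
    ("klaas", "orange"), ("mark", "banana"), ("mark", "apple"),
    ("mark", "apple"), ("norman", "banana"), ("klaas", "pear"),
    ("ricky", "banana"), ("olivier", "cherry"), ("norman", "orange"),
    ("klaas", "cherry"), ("olivier", "banana"),
]

def build_adjacency(log):
    """Sum counts into weighted edges in both directions (local stand-in)."""
    adjacency = defaultdict(lambda: defaultdict(int))
    for user, item in log:
        adjacency[(USER, user)][(ITEM, item)] += 1
        adjacency[(ITEM, item)][(USER, user)] += 1
    return adjacency

def normalise_dist(dist, max_labels):
    """Keep the heaviest labels and rescale their weights to sum to 1."""
    dist = sorted(dist, key=itemgetter(1), reverse=True)[:max_labels]
    norm = float(sum(weight for _, weight in dist))
    return [(label, weight / norm) for label, weight in dist]

adjacency = build_adjacency(LOG)
observed = normalise_dist(adjacency[(USER, "norman")].items(), max_labels=10)
```

For Norman, who ate three oranges and one banana, this gives weights of 0.75 and 0.25, which is the distribution that gets clamped to his dummy node.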



Now we're ready to propagate the label distributions.  We do this iteratively, on each iteration sending the distribution at each node to each of its neighbours.  First we use some dumbo magic to join distributions to adjacency lists.  We just need to write a reducer:

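class Reducer(JoinReducer):
    def primary(self,key,values):
        # the label distribution for this node arrives first
        self.label_dist = next(values)
    def secondary(self,key,values):
        # then its adjacency list; emit the two joined together
        yield key,(self.label_dist,next(values))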

      
Then we transmit the distribution from each node to its neighbours:

def map_propagate(key,value):
    label_dist,adjacency_list = value
    for node,weight in adjacency_list:
        yield node, [(label,prob*weight) for label,prob in label_dist]


Finally we sum and normalise the incoming distributions at each receiving node:

from collections import defaultdict

def reduce_sum_normalise(key,values):
    # accumulate the weighted probability received for each label
    dist = defaultdict(float)
    for d in values:
        for label,prob in d:
            dist[label] += float(prob)
    dist = normalise_dist(dist.items(),MAX_LABELS_PER_NODE)
    yield key,dist


Here's the runner for this job:

def runner(job):
    multimapper = MultiMapper()
    multimapper.add("label_dists", primary(identitymapper))
    multimapper.add("adjacency_lists", secondary(identitymapper))
    job.additer(multimapper,Reducer)
    job.additer(map_propagate,reduce_sum_normalise)
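To see what a single iteration does, here is a rough in-memory equivalent of the propagate and sum-and-normalise steps.  It is a sketch under simplifying assumptions: the `propagate` function and the three-node toy graph are made up for illustration, dictionaries stand in for the joined map-reduce inputs, and pruning to a maximum number of labels is left out.

```python
from collections import defaultdict

def propagate(label_dists, adjacency_lists):
    """One iteration: each node holding a distribution sends it, scaled by
    edge weight, to its neighbours; receivers sum and normalise."""
    incoming = defaultdict(lambda: defaultdict(float))
    for node, dist in label_dists.items():
        for neighbour, weight in adjacency_lists.get(node, []):
            for label, prob in dist:
                incoming[neighbour][label] += prob * weight
    result = {}
    for node, dist in incoming.items():
        norm = sum(dist.values())
        result[node] = sorted(((label, prob / norm) for label, prob in dist.items()),
                              key=lambda pair: pair[1], reverse=True)
    return result

# Hypothetical toy graph: two labelled users connected to one unlabelled item.
adjacency_lists = {
    "u1": [("x", 2)],
    "u2": [("x", 1)],
    "x": [("u1", 2), ("u2", 1)],
}
label_dists = {"u1": [("A", 1.0)], "u2": [("B", 1.0)]}
result = propagate(label_dists, adjacency_lists)
```

After this one step only node `x` holds a distribution, weighted two to one in favour of label `A` because of the heavier edge from `u1`.  Labels reach nodes further away on later iterations.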


And that's it: a massively scalable graph-based recommender in under 50 lines of python.  Well ok, we still need to interpret the output.  We started with an observed distribution at the dummy node for Norman like this:

(3, 'norman')   [((2, 'orange'), 0.75), ((2, 'banana'), 0.25)]


and after a few iterations our recommender has inferred this at his node:

(1, 'norman')   [((2, 'orange'), 0.45735654387297453), ((2, 'banana'), 0.28646536729600702), ((2, 'cherry'), 0.11646859085648147), ((2, 'apple'), 0.074826725260416671), ((2, 'pear'), 0.064882772714120379)]


So next lunchtime I'll suggest he tries some cherries.