Thursday, August 8, 2013

Friend Recommender In MapReduce

Hello guys! MapReduce has become a very popular framework for building data processing systems for applications that deal with huge amounts of data, in short #Bigdata. The main reason behind its popularity is scalability: by spreading the work across many machines, you can run fairly complex processing over a huge data set in a much shorter time than a traditional single-machine system, which may take hours to process the same data.

Here I want to discuss a very popular big data use case: friend recommendation (or, more generally, artifact recommendation).

Here is the problem:
How do we find the Nth-degree mutual friends from a given list of friendships? For example, if A is a direct friend of B and B is a direct friend of C, then C is a 2nd-degree mutual friend of A.
Below is the input (each line holds a user id and the id of one direct friend of that user):

5101,5102
5102,5104
5102,5105
5103,5106
5101,5106
5106,5107
5105,5107
5104,5102
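The MapReduce jobs below only work out the 2nd degree, but the general Nth-degree definition is just distance in the friendship graph. As a small in-memory sketch (plain Java, not MapReduce; `NthDegreeFriends`, `buildGraph` and `friendsAtDegree` are hypothetical names), a breadth-first search over the sample input finds the users whose shortest friendship path from a given user has exactly N hops. Treating "Nth degree" as "shortest distance exactly N" is an assumption, and the sketch stores friendships in sets, so the repeated 5102,5104 / 5104,5102 pair counts once.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical helper, not part of the original post: friends at exactly degree N.
class NthDegreeFriends {

    // The sample input from this post, one "user,friend" pair per line.
    static final String[] SAMPLE = {
        "5101,5102", "5102,5104", "5102,5105", "5103,5106",
        "5101,5106", "5106,5107", "5105,5107", "5104,5102"
    };

    // Builds an undirected adjacency map; the sets drop repeated friendships.
    static Map<Long, TreeSet<Long>> buildGraph(String[] lines) {
        Map<Long, TreeSet<Long>> graph = new TreeMap<>();
        for (String line : lines) {
            String[] parts = line.split(",");
            long a = Long.parseLong(parts[0].trim());
            long b = Long.parseLong(parts[1].trim());
            graph.computeIfAbsent(a, k -> new TreeSet<>()).add(b);
            graph.computeIfAbsent(b, k -> new TreeSet<>()).add(a);
        }
        return graph;
    }

    // Breadth-first search from start; returns users whose shortest distance is exactly n.
    static TreeSet<Long> friendsAtDegree(Map<Long, TreeSet<Long>> graph, long start, int n) {
        Map<Long, Integer> distance = new HashMap<>();
        Deque<Long> queue = new ArrayDeque<>();
        distance.put(start, 0);
        queue.add(start);
        TreeSet<Long> result = new TreeSet<>();
        while (!queue.isEmpty()) {
            long user = queue.poll();
            int d = distance.get(user);
            if (d == n) {
                result.add(user);
                continue; // users at degree n are not expanded further
            }
            for (long friend : graph.getOrDefault(user, new TreeSet<>())) {
                if (!distance.containsKey(friend)) {
                    distance.put(friend, d + 1);
                    queue.add(friend);
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Long, TreeSet<Long>> graph = buildGraph(SAMPLE);
        for (int n = 1; n <= 3; n++) {
            System.out.println("Degree " + n + " friends of 5101: " + friendsAtDegree(graph, 5101L, n));
        }
    }
}
```

For user 5101 this gives [5102, 5106] at degree 1 and [5103, 5104, 5105, 5107] at degree 2. The in-memory graph is fine for a sample this size; the MapReduce version exists for when the graph no longer fits on one machine.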

In the first phase, MapReduce groups the friends by user. Since friendship is mutual, the map phase emits every input pair in both directions (2×N records for N input lines), and the reduce phase collects those records into a single friend list per user.
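To make the 2×N step concrete, here is a hypothetical in-memory simulation of the first job (plain Java, no Hadoop; `GroupFriendsSimulation`, `map` and `shuffleAndReduce` are illustrative names). The map step emits each pair in both directions, and the grouping step stands in for Hadoop's shuffle plus the reducer. It keeps values in arrival order, whereas Hadoop does not guarantee the order of values within a key.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory simulation of the first job (no Hadoop involved).
class GroupFriendsSimulation {

    // The sample input from this post.
    static final String[] SAMPLE = {
        "5101,5102", "5102,5104", "5102,5105", "5103,5106",
        "5101,5106", "5106,5107", "5105,5107", "5104,5102"
    };

    // Map phase: every "a,b" line becomes two (key, value) records, one per direction.
    static List<String[]> map(String[] lines) {
        List<String[]> emitted = new ArrayList<>();
        for (String line : lines) {
            String[] parts = line.split(",");
            String a = parts[0].trim();
            String b = parts[1].trim();
            emitted.add(new String[] {b, a});
            emitted.add(new String[] {a, b});
        }
        return emitted;
    }

    // Shuffle + reduce: group values by key (keys sorted, as Hadoop sorts them),
    // keeping the values in the order they arrive.
    static Map<String, List<String>> shuffleAndReduce(List<String[]> records) {
        Map<String, List<String>> grouped = new TreeMap<>();
        for (String[] kv : records) {
            grouped.computeIfAbsent(kv[0], k -> new ArrayList<>()).add(kv[1]);
        }
        return grouped;
    }

    public static void main(String[] args) {
        List<String[]> records = map(SAMPLE);
        System.out.println(records.size() + " records emitted for " + SAMPLE.length + " input lines");
        for (Map.Entry<String, List<String>> e : shuffleAndReduce(records).entrySet()) {
            System.out.println(e.getKey() + "\t" + e.getValue());
        }
    }
}
```

For the eight sample lines it emits 16 records and ends with seven users; 5104 gets [5102, 5102] because the input lists that friendship twice.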

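Mapper:
// Imports for the job class that encloses Map and Reduce
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public static class Map extends Mapper<LongWritable, Text, Text, Text> {

  @Override
  public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
   // Input lines look like "5101,5102"; accept a comma or a tab as the separator
   String[] line = value.toString().split("[,\\t]");
   String fromUser = line[0].trim();

   if (line.length == 2) {
    String toUser = line[1].trim();
    // Friendship is mutual, so emit the pair in both directions
    context.write(new Text(toUser), new Text(fromUser));
    context.write(new Text(fromUser), new Text(toUser));
   } else {
    // A user without friends: emit an empty value (map output values must not be null)
    context.write(new Text(fromUser), new Text(""));
   }
  }
 }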
Reducer:
public static class Reduce extends Reducer<Text, Text, Text, Text> {
  @Override
  public void reduce(Text key, Iterable<Text> values, Context context)
    throws IOException, InterruptedException {

   ArrayList<String> userEntryList = new ArrayList<>();
   Iterator<Text> friends = values.iterator();

   // Collect every friend id emitted for this user. Hadoop reuses the Text object
   // between iterations, so copy each value into a String right away.
   while (friends.hasNext()) {
    String friend = friends.next().toString();
    if (!friend.isEmpty()) {
     userEntryList.add(friend);
    }
   }
   // Written as "user<TAB>[f1, f2, ...]", which the second job parses
   context.write(key, new Text(userEntryList.toString()));
  }
 }

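The first job writes one line per user: the user id, a tab, and that user's friend list. The listing below was produced without the 5105,5107 pair (as in the plain Java program at the end of this post), so 5105 and 5107 do not list each other here; the repeated 5104 and 5102 come from the input containing both 5102,5104 and 5104,5102.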
5101   [5102, 5106]
5102   [5104, 5105, 5101, 5104]
5103   [5106]
5104   [5102, 5102]
5105   [5102]
5106   [5107, 5103, 5101]
5107   [5106]

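Now we need to find the 2nd-degree friends, i.e. the friends of each friend.
In the map phase, each line of the first job's output (a user fromUser and their friend list) is turned into records of the form <toUser1, (r = toUser2, m = fromUser)> for every ordered pair of distinct users toUser1 and toUser2 in that list. Here toUser1 is the user who receives the recommendation, r is the recommended friend and m is the mutual friend. For example, if A is a friend of B and B is a friend of C, we can recommend C to A through the mutual friend B, so the record is <toUser1 = A, (r = toUser2 = C, m = fromUser = B)>. The mapper also emits <fromUser, (r = toUser, m = -1)> for each of the n direct friends, so the reducer can tell who is already a friend. That gives n(n-1) recommendation records plus n direct-friend records, n^2 records in total for a user with n friends. In the reduce phase, for each user (the key), we discard candidates who are already direct friends, collect the set of mutual friends for every other candidate, and rank the candidates by how many mutual friends they share with the user.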
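The n^2 count can be checked with a small hypothetical simulation of the second job's map step (plain Java, no Hadoop; `SecondDegreeMapSimulation` and its `Emitted` class are illustrative stand-ins for the (key, FriendCount) pairs). It emits the n direct-friend markers first and then the candidate pairs, so the order differs from the real mapper, but the set of records is the same.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of the second job's map step for one input line (no Hadoop).
class SecondDegreeMapSimulation {

    // One emitted record: key, candidate user, mutual friend (-1 means "already a direct friend").
    static final class Emitted {
        final long key;
        final long user;
        final long mutualFriend;

        Emitted(long key, long user, long mutualFriend) {
            this.key = key;
            this.user = user;
            this.mutualFriend = mutualFriend;
        }

        @Override
        public String toString() {
            return key + " -> (" + user + ", " + mutualFriend + ")";
        }
    }

    // Parses "user<TAB>[f1, f2, ...]" and emits the records the second mapper emits.
    static List<Emitted> map(String line) {
        String[] parts = line.split("\t");
        long fromUser = Long.parseLong(parts[0].trim());
        List<Long> toUsers = new ArrayList<>();
        for (String token : parts[1].replace("[", "").replace("]", "").split(",")) {
            toUsers.add(Long.parseLong(token.trim()));
        }

        List<Emitted> out = new ArrayList<>();
        // n records marking each listed user as a direct friend of fromUser
        for (long toUser : toUsers) {
            out.add(new Emitted(fromUser, toUser, -1L));
        }
        // n(n-1) records: every ordered pair of fromUser's friends, with fromUser as the mutual friend
        for (int i = 0; i < toUsers.size(); i++) {
            for (int j = i + 1; j < toUsers.size(); j++) {
                out.add(new Emitted(toUsers.get(i), toUsers.get(j), fromUser));
                out.add(new Emitted(toUsers.get(j), toUsers.get(i), fromUser));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Emitted> records = map("5106\t[5107, 5103, 5101]");
        records.forEach(System.out::println);
        System.out.println(records.size() + " records for n = 3");
    }
}
```

For the line 5106 [5107, 5103, 5101] (n = 3) it emits 3 + 3*2 = 9 records, e.g. 5107 -> (5103, 5106): recommend 5103 to 5107 through 5106.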

Because the emitted value is a pair of ids rather than one of Hadoop's built-in types, we create our own Writable data type:

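// Imports for the job class that encloses FriendCount, Map and Reduce.
// java.util.Map is deliberately not imported: the mapper class is itself named Map,
// so the reducer refers to java.util.Map by its full name.
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.StringTokenizer;
import java.util.TreeMap;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public static class FriendCount implements Writable {
  public Long user;          // the other user: a direct friend or a recommendation candidate
  public Long mutualFriend;  // the friend they share, or -1 if user is already a direct friend

  public FriendCount(Long user, Long mutualFriend) {
   this.user = user;
   this.mutualFriend = mutualFriend;
  }

  // Hadoop needs a no-argument constructor to create instances before calling readFields()
  public FriendCount() {
   this(-1L, -1L);
  }

  @Override
  public void write(DataOutput out) throws IOException {
   out.writeLong(user);
   out.writeLong(mutualFriend);
  }

  @Override
  public void readFields(DataInput in) throws IOException {
   // Read the fields in the same order write() wrote them
   user = in.readLong();
   mutualFriend = in.readLong();
  }

  @Override
  public String toString() {
   return " toUser: "
     + Long.toString(user) + " mutualFriend: " + Long.toString(mutualFriend);
  }
 }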
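Hadoop serializes a FriendCount with write() and rebuilds it by creating an instance through the no-argument constructor and then calling readFields(), so both methods must handle the fields in the same order. The sketch below checks that contract with a hypothetical Hadoop-free copy of the class (`PlainFriendCount`, not part of the original code), using only java.io streams.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical, Hadoop-free check of FriendCount's write/readFields round trip.
class FriendCountRoundTrip {

    // Same fields and serialization logic as FriendCount, minus the Writable interface.
    static class PlainFriendCount {
        long user = -1L;
        long mutualFriend = -1L;

        void write(DataOutput out) throws IOException {
            out.writeLong(user);
            out.writeLong(mutualFriend);
        }

        void readFields(DataInput in) throws IOException {
            // Fields must be read in exactly the order they were written
            user = in.readLong();
            mutualFriend = in.readLong();
        }
    }

    // Serializes one record to a byte array.
    static byte[] serialize(long user, long mutualFriend) {
        PlainFriendCount record = new PlainFriendCount();
        record.user = user;
        record.mutualFriend = mutualFriend;
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            record.write(out);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Rebuilds a record: default-constructed instance first, then readFields().
    static PlainFriendCount deserialize(byte[] data) {
        PlainFriendCount copy = new PlainFriendCount();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            copy.readFields(in);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return copy;
    }

    public static void main(String[] args) {
        byte[] data = serialize(5103L, 5106L);
        PlainFriendCount copy = deserialize(data);
        System.out.println(data.length + " bytes -> user " + copy.user + ", mutualFriend " + copy.mutualFriend);
    }
}
```

Each record takes 16 bytes (two 8-byte longs), and the -1 direct-friend marker survives the round trip unchanged.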

The Map and Reduce classes of the second job can then be implemented as follows:
public static class Map extends Mapper<LongWritable, Text, LongWritable, FriendCount> {

  @Override
  public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
   // Each input line is the first job's output: "user<TAB>[f1, f2, ...]"
   String[] line = value.toString().split("\\t");
   Long fromUser = Long.parseLong(line[0].trim());
   List<Long> toUsers = new ArrayList<Long>();

   if (line.length == 2) {
    StringTokenizer tokenizer = new StringTokenizer(line[1], ",");
    while (tokenizer.hasMoreTokens()) {
     String token = tokenizer.nextToken().replace("[", "").replace("]", "").trim();
     if (token.isEmpty()) {
      continue; // a user with an empty friend list: "[]"
     }
     Long toUser = Long.parseLong(token);
     if (toUsers.contains(toUser)) {
      continue; // skip repeated friends (e.g. 5104 listed twice for 5102), which would pair a user with itself
     }
     toUsers.add(toUser);
     // -1 marks toUser as a direct friend of fromUser
     context.write(new LongWritable(fromUser), new FriendCount(toUser, -1L));
    }

    // Any two friends of fromUser are candidates for each other, with fromUser as the mutual friend
    for (int i = 0; i < toUsers.size(); i++) {
     for (int j = i + 1; j < toUsers.size(); j++) {
      context.write(new LongWritable(toUsers.get(i)), new FriendCount(toUsers.get(j), fromUser));
      context.write(new LongWritable(toUsers.get(j)), new FriendCount(toUsers.get(i), fromUser));
     }
    }
   }
  }
 }

 public static class Reduce extends Reducer<LongWritable, FriendCount, LongWritable, Text> {
  @Override
  public void reduce(LongWritable key, Iterable<FriendCount> values, Context context)
    throws IOException, InterruptedException {

   // candidate -> set of mutual friends; a null value marks a candidate who is already a direct friend
   final java.util.Map<Long, Set<Long>> mutualFriends = new HashMap<Long, Set<Long>>();

   for (FriendCount val : values) {
    final boolean isAlreadyFriend = (val.mutualFriend == -1);
    final Long toUser = val.user;
    final Long mutualFriend = val.mutualFriend;

    if (toUser.equals(key.get())) {
     continue; // never recommend a user to themselves
    }
    if (isAlreadyFriend) {
     mutualFriends.put(toUser, null);
    } else if (!mutualFriends.containsKey(toUser)) {
     Set<Long> friends = new HashSet<Long>();
     friends.add(mutualFriend);
     mutualFriends.put(toUser, friends);
    } else if (mutualFriends.get(toUser) != null) {
     mutualFriends.get(toUser).add(mutualFriend);
    }
   }

   // Order candidates by number of mutual friends (descending), then by user id (ascending)
   java.util.SortedMap<Long, Set<Long>> sortedMutualFriends = new TreeMap<Long, Set<Long>>(new Comparator<Long>() {
    @Override
    public int compare(Long key1, Long key2) {
     int bySize = Integer.compare(mutualFriends.get(key2).size(), mutualFriends.get(key1).size());
     return bySize != 0 ? bySize : key1.compareTo(key2);
    }
   });

   for (java.util.Map.Entry<Long, Set<Long>> entry : mutualFriends.entrySet()) {
    if (entry.getValue() != null) {
     sortedMutualFriends.put(entry.getKey(), entry.getValue());
    }
   }

   // Each recommended friend followed by the mutual friends linking them, without repeats.
   // A LinkedHashSet keeps the ranking order (a plain HashSet would lose it).
   Set<Long> entrySet = new LinkedHashSet<Long>();
   for (java.util.Map.Entry<Long, Set<Long>> entry : sortedMutualFriends.entrySet()) {
    entrySet.add(entry.getKey());
    entrySet.addAll(entry.getValue());
   }

   StringBuilder output = new StringBuilder();
   for (Long user : entrySet) {
    if (output.length() > 0) {
     output.append('\t');
    }
    output.append(user);
   }

   context.write(key, new Text(output.toString()));
  }
 }
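The reducer's filtering and ranking can be tried outside Hadoop. This is a simplified sketch: the `rank` helper and its `long[][]` input of {candidate, mutualFriend} pairs are hypothetical stand-ins for the FriendCount values. It drops direct friends (mutualFriend == -1), counts distinct mutual friends per candidate, and orders candidates by that count, descending, then by id, as the reducer's comparator does. Unlike the reducer, it returns only the ranked candidates, not the mutual friends.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, Hadoop-free version of the second reducer's filtering and ranking.
class RecommendationRanking {

    // values: {candidate, mutualFriend} pairs received for one user;
    // mutualFriend == -1 marks a candidate who is already a direct friend.
    static List<Long> rank(long[][] values) {
        Map<Long, Set<Long>> mutual = new HashMap<>();
        Set<Long> direct = new HashSet<>();
        for (long[] v : values) {
            if (v[1] == -1L) {
                direct.add(v[0]);
            } else {
                mutual.computeIfAbsent(v[0], k -> new HashSet<>()).add(v[1]);
            }
        }

        List<Long> ranked = new ArrayList<>();
        for (Long candidate : mutual.keySet()) {
            if (!direct.contains(candidate)) {
                ranked.add(candidate);
            }
        }
        // More distinct mutual friends first; ties broken by the smaller user id
        ranked.sort((a, b) -> {
            int bySize = Integer.compare(mutual.get(b).size(), mutual.get(a).size());
            return bySize != 0 ? bySize : Long.compare(a, b);
        });
        return ranked;
    }

    public static void main(String[] args) {
        // The values key 5101 receives for the full eight-line sample input
        long[][] values = {
            {5102, -1}, {5106, -1}, {5104, 5102}, {5104, 5102},
            {5105, 5102}, {5103, 5106}, {5107, 5106}
        };
        System.out.println("Recommendations for 5101: " + rank(values));
    }
}
```

For 5101 every candidate has exactly one mutual friend, so the ids decide the order: [5103, 5104, 5105, 5107]. With the pairs {2, -1}, {3, 2}, {4, 2}, {4, 5}, {2, 7}, {3, 2} it returns [4, 3]: user 2 is a direct friend, 4 has two mutual friends, and the repeated {3, 2} pair counts once.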
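Final output: in each row below, the first id is the current user, followed by that user's direct friends and then the recommended (2nd-degree) friends. This listing comes from the plain Java program at the end of this post, which leaves out the 5105,5107 pair; the MapReduce reducer above instead writes the user id as the key, then a tab-separated list of recommended friends and the mutual friends that connect them.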
[5101, 5102, 5106, 5104, 5105, 5107, 5103]
[5102, 5104, 5105, 5101, 5104, 5106]
[5103, 5106, 5107, 5101]
[5104, 5102, 5102, 5105, 5101]
[5105, 5102, 5104, 5101]
[5106, 5107, 5103, 5101, 5102]
[5107, 5106, 5103, 5101]
You can implement the same logic as a simple Java program without the MapReduce framework. It works well for small inputs, but it does not scale like MapReduce because the whole friend list is held in one JVM's memory. Below is the plain Java code that finds the recommended friends; it might help you when designing the MapReduce version:
package com.java.amolfasale;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

@SuppressWarnings("serial")
public class FriendRecommendationWithoutMapReduce extends TreeMap<String, List<String>> {

 // Overloaded put: appends a friend to the user's list instead of replacing the list
 public void put(String key, String number) {
  List<String> current = get(key);
  if (current == null) {
   current = new ArrayList<String>();
   super.put(key, current);
  }
  current.add(number);
 }

 public static void main(String[] args) {

  FriendRecommendationWithoutMapReduce user = new FriendRecommendationWithoutMapReduce();
  // Put every friendship into the map
  user.put("5101", "5102");
  user.put("5102", "5104");
  user.put("5102", "5105");
  user.put("5103", "5106");
  user.put("5101", "5106");
  user.put("5106", "5107");
  user.put("5104", "5102");

  // Put the same pairs in reverse, since friendship is mutual
  user.put("5102", "5101");
  user.put("5104", "5102");
  user.put("5105", "5102");
  user.put("5106", "5103");
  user.put("5106", "5101");
  user.put("5107", "5106");
  user.put("5102", "5104");

  System.out.println("\n___________________Group By Friends__________________________\n");

  ArrayList<String> userEntryList = new ArrayList<>();

  // Print each user's direct friends (the grouping the first MapReduce job produces)
  for (Map.Entry<String, List<String>> e : user.entrySet()) {
   System.out.println(e.getKey() + "    " + e.getValue());
   userEntryList.add(e.getKey());
  }

  System.out.println("\n___________________Final Output__________________________\n");
  // For each user: the user id, their direct friends, then friends of friends not listed yet
  for (int i = 0; i < userEntryList.size(); i++) {
   List<String> output = new ArrayList<>();
   output.add(userEntryList.get(i));

   // Direct friends of user i
   List<String> friends = user.get(userEntryList.get(i));
   output.addAll(friends);

   // 2nd-degree friends: friends of each direct friend, skipping ids already in the output
   for (int j = 0; j < friends.size(); j++) {
    for (String candidate : user.get(friends.get(j))) {
     if (!output.contains(candidate)) {
      output.add(candidate);
     }
    }
   }
   System.out.println(output.toString());
  }
  System.out.println("\n___________________End Final Output__________________________\n");
 }
}
