Friday, March 22, 2013

Recommendations with Apache Mahout

Recommendation?

Have you ever been recommended a friend on Facebook? Or visited a shopping portal that shows items recommended for you, or seen an item you might be interested in on Amazon? If so, you've benefited from recommendation systems.
On such sites you often see personalized recommendations phrased something like, “If you liked that item, you might also like this one...” These sites use recommendations to drive users to the other things they offer in an intelligent, meaningful way, tailored specifically to the user and the user’s preferences.

Recommendation systems apply knowledge discovery techniques to the problem of making recommendations that are personalized for each user. Recommendation systems are one way we can use algorithms to help us sort through the masses of information to find the “good stuff” in a very managed way.

From an algorithmic standpoint, the recommendation systems we’ll talk about today are considered in the k-nearest neighbor family of problems (another type would be a SVD-based recommender). We want to predict the estimated preference of a user towards an item they have never seen before. We also want to generate a ranked (by preference score) list of items the user might be most interested in. Two well-known styles of recommendation algorithms are item-based recommenders and user-based recommenders. Both types rely on the concept of a similarity function/metric (ex: Euclidean distance, log likelihood), whether it is for users or items.
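To make the idea of a similarity function concrete, here is a minimal Python sketch of a Euclidean-distance similarity between two users and a similarity-weighted preference estimate. The function names, the 1 / (1 + distance) scaling and the sample ratings are assumptions made for illustration; this is not Mahout's implementation.

```python
import math


def euclidean_similarity(a, b):
    """Similarity in (0, 1] between two {item: rating} dicts.

    Uses the Euclidean distance over the items both users rated and
    returns 0.0 when they have no rated item in common.
    """
    common = set(a) & set(b)
    if not common:
        return 0.0
    distance = math.sqrt(sum((a[i] - b[i]) ** 2 for i in common))
    return 1.0 / (1.0 + distance)


def predict(user, item, ratings):
    """Estimate a user's preference for an unseen item as the
    similarity-weighted average of the other users' ratings for it."""
    weighted_sum = 0.0
    weight_total = 0.0
    for other, their_ratings in ratings.items():
        if other == user or item not in their_ratings:
            continue
        weight = euclidean_similarity(ratings[user], their_ratings)
        weighted_sum += weight * their_ratings[item]
        weight_total += weight
    return weighted_sum / weight_total if weight_total else None


# Hypothetical sample data: user -> {movie: rating}
ratings = {
    "alice": {"m1": 5.0, "m2": 3.0},
    "bob": {"m1": 5.0, "m2": 3.0, "m3": 4.0},
    "carol": {"m1": 1.0, "m2": 5.0, "m3": 2.0},
}

print(predict("alice", "m3", ratings))
```

Because alice's ratings match bob's exactly, bob's rating of m3 dominates the estimate, which is the behaviour a user-based recommender is after.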

Overview of a recommendation engine

The main purpose of a recommendation engine is to make inferences on existing data to show relationships between objects and entities. Objects can be many things, including users, items and products (in short, user-related data). Relationships provide a degree of likeness or belonging between objects. For example, relationships can represent ratings of how much a user likes an item, or indicate whether a user bookmarked a particular page.

To make a recommendation, a recommendation engine performs several data mining steps. It begins with input data that represents the objects and their relationships: object identifiers plus the relationships to other objects.



Consider the ratings users give to items. Using this input data, a recommendation engine computes similarities between objects. Computing these similarities can take a great deal of time, depending on the size of the data and the particular algorithm. Mahout can run on a distributed framework such as Apache Hadoop to parallelize the computation. There are different algorithms for computing similarities. Finally, using the similarity information, the recommendation engine can answer recommendation requests based on the parameters requested.
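The steps described above (input triples, item-to-item similarity, then recommendations) can be sketched on a single machine in a few lines of Python. Cosine similarity is used here only as one possible measure, and all names and sample data are hypothetical; the Mahout job distributes this kind of computation over MapReduce and is not reproduced here.

```python
import math
from collections import defaultdict


def item_vectors(triples):
    """Turn (user, item, rating) triples into {item: {user: rating}}."""
    vectors = defaultdict(dict)
    for user, item, rating in triples:
        vectors[item][user] = rating
    return dict(vectors)


def cosine(u, v):
    """Cosine similarity between two sparse vectors stored as dicts."""
    dot = sum(u[k] * v[k] for k in set(u) & set(v))
    norm = math.sqrt(sum(x * x for x in u.values())) * math.sqrt(
        sum(x * x for x in v.values())
    )
    return dot / norm if norm else 0.0


def recommend(user, triples, top_n=3):
    """Rank the items the user has not rated by a similarity-weighted
    average of the ratings the user gave to other items."""
    vectors = item_vectors(triples)
    seen = {item: rating for u, item, rating in triples if u == user}
    scores = {}
    for candidate in vectors:
        if candidate in seen:
            continue
        sims = {i: cosine(vectors[candidate], vectors[i]) for i in seen}
        total = sum(sims.values())
        if total > 0:
            scores[candidate] = sum(sims[i] * seen[i] for i in seen) / total
    return sorted(scores.items(), key=lambda kv: kv[1], reverse=True)[:top_n]


# Hypothetical input: (userID, itemID, rating)
triples = [
    (1, "A", 5), (1, "B", 1),
    (2, "A", 5), (2, "C", 5),
    (3, "B", 5), (3, "D", 5),
]

print(recommend(1, triples))
```

Item C is rated together with A, which user 1 liked, so it ranks above D, which is only related to the item user 1 disliked.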

For Example:
GroupLens Movie Data

The input data for this demo is based on 1M anonymous ratings of approximately 4000 movies made by 6,040 MovieLens users, which you can download from the www.grouplens.org site. The zip file contains four files:

movies.dat (movie ids with title and category)
ratings.dat (ratings of movies)
README
users.dat (user information)

The ratings file is most interesting to us since it’s the main input to our recommendation job. Each line has the format:
Ratings.dat description

UserID::MovieID::Rating::Timestamp

So let’s adjust our input file to match what we need to run our job. First download the file from the GroupLens site and unzip it locally.

Next run the command:
        tr -s ':' ',' < ratings.dat | cut -f1-3 -d, > ratings.csv

This produces the csv output format we’ll use in the next section when we run our “Itembased Collaborative Filtering” job.
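If tr and cut are not available, the same conversion is easy to sketch in Python. The function names are hypothetical and the sample line is only an illustration of the UserID::MovieID::Rating::Timestamp layout described above.

```python
def ratings_line_to_csv(line):
    """Convert 'UserID::MovieID::Rating::Timestamp' to 'UserID,MovieID,Rating'."""
    user_id, movie_id, rating, _timestamp = line.strip().split("::")
    return ",".join((user_id, movie_id, rating))


def convert(lines):
    """Convert every non-empty line, dropping the timestamp column."""
    return [ratings_line_to_csv(line) for line in lines if line.strip()]


# Illustrative sample line in the ratings.dat layout
print(ratings_line_to_csv("1::1193::5::978300760"))
```

To convert a whole file, read ratings.dat line by line, pass the lines to convert() and write the result to ratings.csv.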

Next, copy the CSV file into HDFS:

        hadoop fs -put [my_local_file] [user_file_location_in_hdfs]

Then create a user.txt file that lists the IDs of the users we want to show recommendations to, one userID per line, and put it on HDFS under the users directory.
With our user list in HDFS we can now run the Mahout recommendation job with a command of the form:
     
       mahout recommenditembased --input [input-hdfs-path] --output [output-hdfs-path] --tempDir [tmp-hdfs-path] --usersFile [user_file_location_in_hdfs]

This runs for a while (a chain of 10 MapReduce jobs) and then writes the item recommendations to HDFS, where we can take a look at them. To print the output of the RecommenderJob, use the command:

         hadoop fs -cat [output-hdfs-path]/part-r-00000

The output shows each user listed in user.txt together with the items recommended for that user.
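For further processing it helps to parse that output. The sketch below assumes that each output line has the form userID, a tab, then a bracketed list of itemID:score pairs; check this layout against your own part-r-00000 file before relying on it. The function name and the sample line are hypothetical.

```python
def parse_recommendation_line(line):
    """Parse 'userID<TAB>[itemID:score,itemID:score,...]' into
    (user_id, [(item_id, score), ...]). The layout is an assumption."""
    user_id, _, items = line.strip().partition("\t")
    items = items.strip("[]")
    recommendations = []
    if items:
        for pair in items.split(","):
            item_id, score = pair.split(":")
            recommendations.append((int(item_id), float(score)))
    return int(user_id), recommendations


# Hypothetical output line
print(parse_recommendation_line("3\t[1566:5.0,1036:4.5]"))
```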

For more details:
http://blog.cloudera.com/blog/2011/11/recommendation-with-apache-mahout-in-cdh3/

Thursday, March 14, 2013

Apache Hadoop HttpFS : A service that provides HTTP access to HDFS.

HttpFS :  Introduction

Apache Hadoop HttpFS is a service that provides HTTP access to HDFS.
HttpFS provides a REST HTTP gateway that supports HDFS operations such as read and write. It can be used to transfer data between clusters running different versions of Hadoop, and to access data in HDFS using HTTP utilities.

HttpFS was inspired by Hadoop HDFS proxy and can be seen as a full rewrite of it.
Hadoop HDFS proxy provides only a subset of file system operations (read only), whereas HttpFS supports all file system operations.

HttpFS uses a clean HTTP REST API making its use with HTTP tools more intuitive.


About security, HttpFS supports Hadoop pseudo-authentication, HTTP SPNEGO Kerberos, and additional authentication mechanisms via a plugin API. HttpFS also supports Hadoop proxy user functionality. 


HttpFS :  Installation

Prerequisites for installing HttpFS are:

  • Java 6+
  • Maven 3+

 Installing HttpFS

      HttpFS is distributed in the hadoop-httpfs package. To install it, use your preferred package manager. Install the package on the system that will run the HttpFS server.

   $ sudo yum install hadoop-httpfs       # on a Red Hat-compatible system
   $ sudo zypper install hadoop-httpfs    # on a SLES system
   $ sudo apt-get install hadoop-httpfs   # on an Ubuntu or Debian system

Or, if you have an HttpFS tarball, you can simply untar it:

   $ tar xzf httpfs-2.0.3-alpha.tar.gz

Now you are ready to configure HttpFS.

Configure HttpFS

     HttpFS reads the HDFS configuration from the core-site.xml and hdfs-site.xml files. With a package install these files are in /etc/hadoop/conf/; with a tarball install HttpFS assumes by default that they are in the HttpFS configuration directory. If necessary, edit those files to configure the HDFS that HttpFS will use.

Configure Hadoop

Edit Hadoop's core-site.xml and define the Unix user that will run the HttpFS server as a proxyuser. For example:

     <property>
        <name>hadoop.proxyuser.myhttpfsuser.hosts</name>
        <value>httpfs-host.foo.com</value>
     </property>
     <property>
        <name>hadoop.proxyuser.myhttpfsuser.groups</name>
        <value>*</value>
     </property>
 
Note: Replace "myhttpfsuser" with the Unix user that runs the HttpFS server, and "httpfs-host.foo.com" with the host name of the HttpFS server.
IMPORTANT: You need to restart Hadoop for the proxyuser configuration to become active.

Starting/Stopping the HttpFS Server

 To start or stop HttpFS, use HttpFS's bin/httpfs.sh script. For example:

       httpfs-2.0.3-alpha $ bin/httpfs.sh start   # start the server
       httpfs-2.0.3-alpha $ bin/httpfs.sh stop    # stop the server

Test HttpFS is working

Use a tool such as curl to access HDFS via HttpFS. For example, to obtain the home directory of the user ubantu, use a command such as this:

$ curl -i "http://<MyHttpFSHostName>:14000/webhdfs/v1?op=gethomedirectory&user.name=ubantu"
       HTTP/1.1 200 OK
       Server: Apache-Coyote/1.1
       Set-Cookie: hadoop.auth="u=ubantu&p=ubantu&t=simple&e=4558977754545&s=wtFFgaGHHJFGffasWXK68rc/0xI="; Version=1; Path=/
       Content-Type: application/json
       Transfer-Encoding: chunked
       Date: Wed, 28 Mar 2012 13:35:55 GMT

       {"Path":"\/user\/ubantu"}
 

 See the WebHDFS REST API web page for complete documentation of the API.
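When scripting against HttpFS it is convenient to build these URLs in code instead of by hand. The helper below is a minimal sketch: the function name and defaults are hypothetical, the host name is a placeholder, and the operation names (GETHOMEDIRECTORY, OPEN) are taken from the WebHDFS REST API. It only builds the URL; send it with curl or any HTTP library.

```python
from urllib.parse import quote, urlencode


def webhdfs_url(host, path="", op="GETHOMEDIRECTORY", user="ubantu",
                port=14000, **params):
    """Build an HttpFS URL under the /webhdfs/v1 prefix.

    path is an absolute HDFS path (or "" for none); extra keyword
    arguments become additional query parameters.
    """
    query = {"op": op, "user.name": user}
    query.update(params)
    return "http://{}:{}/webhdfs/v1{}?{}".format(
        host, port, quote(path), urlencode(query)
    )


# Placeholder host name, as configured in core-site.xml above
print(webhdfs_url("httpfs-host.foo.com"))
print(webhdfs_url("httpfs-host.foo.com", "/user/ubantu/test.txt", op="OPEN"))
```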

Friday, March 8, 2013

Hoop : Hadoop HDFS over HTTP/s

Hoop provides access to all Hadoop Distributed File System (HDFS) operations (read and write) over HTTP/S.

Hoop is a server that provides a REST HTTP gateway supporting all HDFS file system operations (read and write). It can be used to transfer data between clusters running different versions of Hadoop. The Hoop server acts as a gateway and is the only system that is allowed to cross the firewall into the cluster, so it can be used to access data in HDFS from outside the cluster.
Hoop can be used to access data in HDFS using HTTP utilities (such as curl) and HTTP libraries from languages other than Java (for example Perl). It also provides a Hadoop HDFS FileSystem implementation that enables access to HDFS over HTTP using the hadoop command line tool as well as the Hadoop FileSystem Java API.

So we can easily integrate HDFS, through Hoop, with an ordinary Java application running on Apache Tomcat or any other application server. It is a very practical way to store a large amount and variety of data (in short, big data) on the cluster file system (HDFS).

Hoop has server and client components:
  • The Hoop server component is a REST HTTP gateway to HDFS supporting all file system operations. It can be accessed using standard HTTP tools, HTTP libraries from different programming languages (e.g. Perl, JavaScript) as well as using the Hoop client.
  • The Hoop client component is an implementation of the Hadoop FileSystem client that allows using the familiar Hadoop filesystem API to access HDFS data through a Hoop server.

Accessing HDFS via Hoop using linux command

To get the home directory:

$ curl -i "http://my_server:14333?op=homedir&user.name=ubuntu"

To read a file:

$ curl -i "http://my_server:14333/user/ubuntu/test.txt?user.name=ubuntu"

To write a file:

$ curl -i -X POST "http://my_server:14333/user/ubuntu/test.txt?op=create" \
     --data-binary @data.txt --header "content-type: application/octet-stream"
 

Hoop is distributed under the Apache License 2.0.
The source code is available at http://github.com/cloudera/hoop.
Instructions on how to build, install and configure the Hoop server, along with the rest of the documentation, are available at http://cloudera.github.com/hoop.

Thursday, March 7, 2013

BigData : The data growth is 100 times bigger than population.


Approximately 490,000 babies are born and over 150,000 people die every day worldwide. Data, however, grows much faster than that:

1. About 175 million people log into Facebook every day, with more than 250 million photos uploaded and 2.7 billion likes and comments posted per day.
2. Twitter has confirmed that over 250,000,000 tweets are posted on the network every single day.
3. Over 800 million unique users visit YouTube each month, over 4 billion hours of video are watched each month, and 72 hours of video are uploaded to YouTube every minute.
4. In 2011, YouTube had more than 1 trillion views, or around 140 views for every person on Earth.

That is the present. What about the future? Data is growing 100 times faster than the human population.

More recently, multiple analysts have estimated that data will grow 800% over the next five years. Computerworld states that unstructured information might account for more than 70% to 80% of all data in organizations.

Volume, variety and velocity are the three major pillars of Big Data, and much of the data behind them is unstructured.
Unstructured Data: data that either does not have a data model or does not fit into relational tables. Unstructured data is typically text-heavy, but may also contain pictures, videos, songs and even tabular data.

However, unstructured content is largely created by humans: inconsistent, emotional, careless, opinionated, lazy, driven, over-worked, always unique, humans. Appreciating this difference in the origins of the data that we seek to analyze is the first step to producing actionable insight and business advantage.

This is where Hadoop comes in as one of the best solutions for Big Data.

Hadoop:
Apache Hadoop is an open source framework that allows for the distributed processing of large data sets (Big Data) across clusters of computers using simple programming models.

Hadoop changes the economics and the dynamics of large scale computing. Its impact can be boiled down to four salient characteristics.

Eighty percent of the world’s data is unstructured, and most businesses don’t even attempt to use this data to their advantage. Imagine if you could afford to keep all the data generated by your business? Imagine if you had a way to analyze that data?

For more details about hadoop : http://hadoop.apache.org/

Tuesday, March 5, 2013

Big Data spans three dimensions : Volume, Velocity and Variety.


• Volume:
Enterprises are awash with ever-growing data of all types, easily amassing terabytes, even petabytes, of information.
       1. Input data to big data systems could be chatter from social networks, web server logs, traffic flow sensors, satellite imagery, broadcast audio streams, banking transactions, MP3s of rock music, the content of web pages, scans of government documents, GPS trails, telemetry from automobiles, financial market data, and so on.
       2. 100 billion Facebook requests and 200 million tweets per day.

• Velocity:
Sometimes a minute is too late. For time-sensitive processes such as catching fraud, tracking financial markets or reading traffic flow sensors, big data must be used as it streams into your enterprise in order to maximize its value.
       1. Scrutinize 5 million trade events created each day to identify potential fraud.
       2. Analyze thousands of millions of daily call detail records in real time.

• Variety:
Big data is any type of data: structured data (RDBMS) and unstructured data such as text, sensor data, audio, video, click streams, log files and more. New insights are found when analyzing these data types together.
       1. Monitor hundreds of live video feeds from surveillance cameras to target points of interest.
       2. Exploit the 80% data growth in images, video and documents to improve customer satisfaction.



 "Big data is not only about size; it is a bright new way to find insights in new and emerging types of data and content, to make your solution more robust and powerful, and to answer questions that were previously considered beyond your reach. Only the sky is the limit!"

HBase : A HDFS Database

HBase: A Simple Introduction

Apache HBase is the Hadoop database, a distributed, scalable, big data store.

HBase is a type of "NoSQL" database. "NoSQL" is a general term meaning that the database isn't an RDBMS which supports SQL as its primary access language, but there are many types of NoSQL databases: Berkeley DB is an example of a local NoSQL database, whereas HBase is very much a distributed database. Technically speaking, HBase is really more a "Data Store" than "Data Base" because it lacks many of the features you find in an RDBMS, such as typed columns, secondary indexes, triggers, and advanced query languages.

HBase: Features
  • Strongly consistent reads/writes: HBase is not an "eventually consistent" Data Store. This makes it very suitable for tasks such as high-speed counter aggregation.
  • Automatic sharding: HBase tables are distributed on the cluster via regions, and regions are automatically split and re-distributed as your data grows.
  • Automatic Region Server failover
  • Hadoop/HDFS Integration: HBase supports HDFS out of the box as its distributed file system.
  • MapReduce: HBase supports massively parallelized processing via MapReduce for using HBase as both source and sink.
  • Java Client API: HBase supports an easy to use Java API for programmatic access.
  • Thrift/REST API: HBase also supports Thrift and REST for non-Java front-ends.
  • Block Cache and Bloom Filters: HBase supports a Block Cache and Bloom Filters for high volume query optimization.
  • Operational Management: HBase provides built-in web pages for operational insight as well as JMX metrics.

HBase: Data Model

(Table, Rowkey, Family, Column, Timestamp) → Value
Think of tags: values can be of any length, there are no predefined names, types or widths, and column names carry information, just like tags.

HBase: Architecture

1.      Master
HMaster is the implementation of the Master Server. The Master server is responsible for monitoring all RegionServer instances in the cluster, and is the interface for all metadata changes. In a distributed cluster, the Master typically runs on the NameNode. If run in a multi-Master environment, all Masters compete to run the cluster. If the active Master loses its lease in ZooKeeper (or the Master shuts down), then the remaining Masters jostle to take over the Master role.

2.      Region Servers
HRegionServer is the RegionServer implementation. It is responsible for serving and managing regions. In a distributed cluster, a RegionServer runs on a DataNode. Regions are the basic element of availability and distribution for tables, and are comprised of a Store per Column Family. The hierarchy of objects is as follows:
Table       (HBase table)
Region      (Regions for the table)
Store       (Store per Column Family for each Region for the table)
MemStore    (MemStore for each Store for each Region for the table)
StoreFile   (StoreFiles for each Store for each Region for the table)
Block       (Blocks within a StoreFile within a Store for each Region for the table)

     ·  StoreFile (HFile):
StoreFiles are where your data lives. To see the usage of the HFile tool, which prints a text version of StoreFile content, run:
        $ ${HBASE_HOME}/bin/hbase org.apache.hadoop.hbase.io.hfile.HFile

     ·  MemStore:
The MemStore holds in-memory modifications to the Store. Modifications are Key-Values. When asked to flush, the current MemStore is moved to a snapshot and is cleared. HBase continues to serve edits out of the new MemStore and the backing snapshot until the flusher reports that the flush succeeded. At this point the snapshot is let go.

     ·  Write Ahead Log (WAL):
Each RegionServer adds updates (Puts, Deletes) to its write-ahead log (WAL) first, and then to the MemStore for the affected Store. This ensures that HBase has durable writes. Without the WAL, there is the possibility of data loss in the case of a RegionServer failure before each MemStore is flushed and new StoreFiles are written. HLog is the HBase WAL implementation, and there is one HLog instance per RegionServer. The WAL is in HDFS in /hbase/.logs/ with subdirectories per region.

HBase: Bulk Loading

HBase includes several methods of loading data into tables. The most straightforward method is to either use the TableOutputFormat class from a MapReduce job, or use the normal client APIs; however, these are not always the most efficient methods. The bulk load feature uses a MapReduce job to output table data in HBase's internal data format, and then directly loads the generated StoreFiles into a running cluster. Using bulk load will use less CPU and network resources than simply using the HBase API.

Bulk Load Architecture

The HBase bulk load process consists of two main steps.
1.      Preparing data via a MapReduce job
The first step of a bulk load is to generate HBase data files (StoreFiles) from a MapReduce job using HFileOutputFormat. This output format writes out data in HBase's internal storage format so that it can later be loaded very efficiently into the cluster.
In order to function efficiently, HFileOutputFormat must be configured such that each output HFile fits within a single region. In order to do this, jobs whose output will be bulk loaded into HBase use Hadoop's TotalOrderPartitioner class to partition the map output into disjoint ranges of the key space, corresponding to the key ranges of the regions in the table.

2.      Completing the data load
After the data has been prepared using HFileOutputFormat, it is loaded into the cluster using completebulkload. This command line tool iterates through the prepared data files, and for each one determines the region the file belongs to. It then contacts the appropriate Region Server which adopts the HFile, moving it into its storage directory and making the data available to clients.

When Would I Use Apache HBase?

Use Apache HBase when you need random, real-time read/write access to your Big Data. This project's goal is the hosting of very large tables -- billions of rows X millions of columns -- atop clusters of commodity hardware. Apache HBase is an open-source, distributed, versioned, column-oriented store modelled after Google's Bigtable: A Distributed Storage System for Structured Data by Chang et al. Just as Bigtable leverages the distributed data storage provided by the Google File System, Apache HBase provides Bigtable-like capabilities on top of Hadoop and HDFS.

What Is The Difference Between HBase and Hadoop/HDFS?

HDFS is a distributed file system that is well suited for the storage of large files. Its documentation states that it is not, however, a general purpose file system, and does not provide fast individual record lookups in files. HBase, on the other hand, is built on top of HDFS and provides fast record lookups (and updates) for large tables. This can sometimes be a point of conceptual confusion. HBase internally puts your data in indexed "StoreFiles" that exist on HDFS for high-speed lookups. See the Data Model and Architecture chapters of the Apache HBase Reference Guide for more information on how HBase achieves its goals.
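One way to picture how keyed, versioned lookups differ from reading a plain file is a toy in-memory model of the HBase data model, in which a (Rowkey, Family, Column, Timestamp) coordinate maps to a value and rows are kept in key order. The class below is only an illustration with hypothetical names; it is not the HBase client API and says nothing about how StoreFiles are laid out on disk.

```python
from collections import defaultdict


class ToyTable:
    """Toy model of one table: (row, family, column, timestamp) -> value."""

    def __init__(self):
        # (row, family, column) -> {timestamp: value}
        self._cells = defaultdict(dict)

    def put(self, row, family, column, value, timestamp):
        """Store one version of a cell."""
        self._cells[(row, family, column)][timestamp] = value

    def get(self, row, family, column, timestamp=None):
        """Return the newest version, or the newest one at or before
        the given timestamp. Returns None if no version qualifies."""
        versions = self._cells.get((row, family, column), {})
        eligible = [t for t in versions if timestamp is None or t <= timestamp]
        return versions[max(eligible)] if eligible else None

    def scan(self, start_row, stop_row):
        """Yield (row, family, column, latest value) in key order for
        start_row <= row < stop_row."""
        for key in sorted(self._cells):
            if start_row <= key[0] < stop_row:
                yield key + (self.get(*key),)


table = ToyTable()
table.put("row1", "cf", "name", "first", timestamp=1)
table.put("row1", "cf", "name", "second", timestamp=2)
print(table.get("row1", "cf", "name"))
print(list(table.scan("row0", "row2")))
```

Column names are not declared anywhere in advance, which matches the "think of tags" description of the data model.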


Hadoop : Hive In Action

Hive: A Simple Introduction

Hive is built on Hadoop and uses Hadoop’s storage(HDFS) and execution(MapReduce) modules. In terms of storage Hive can use any file system supported by Hadoop, although HDFS is by far the most common. If you’re not going to be using HDFS you probably want a more in-depth introduction than what I’m going to give here. Hive Queries are translated to a graph of Hadoop MapReduce jobs that get executed on your Hadoop grid. As you can see Pig and Hive use the same methods of execution abstraction. The big difference comes in the language used to access the data.
Figure 1 : Hive Structure diagram.

Hive Query Language (HQL) is based on SQL, and you’ll find many of the familiar constructs such as SHOW, DESCRIBE, SELECT, USE and JOIN. The full definition of the language is in the Hive Language Manual. As in an RDBMS, in Hive you will find Databases that contain one or more Tables that contain some Data defined by a Schema.
Hive also supports User Defined Functions (UDFs) and Serialization/Deserialization functions (SerDes). UDFs allow programmers to write functions to abstract common tasks in Hive. They also allow you to seamlessly connect the query language in Hive with functional, procedural or scripting languages. SerDes allow for the management of arbitrarily structured or unstructured data in Hive. One particularly useful and popular SerDe is the now built-in one for Avro.
Hive uses a metadata store to keep the data warehouse information that links Hive and the raw data. Data loaded into Hive can be maintained in an external location or copied into the Hive warehouse. In this way you can create a self-contained data warehouse, or share the infrastructure with MapReduce and Pig programmers.
Out of the box Hive uses a Derby database for the metadata store. For scalability and performance you’ll want to switch to use an RDBMS such as MySQL as your Hive system matures.

Getting Started

The first thing you’ll need to do is install and get Hive up and running. The Getting Started Guide is your best resource for this.
Once Hive is installed you’ll want to create and use a database:

1.      Create Database

hive> CREATE DATABASE demo;
hive> USE demo;

Next create a table with a schema:

2.      Create Table

hive> CREATE TABLE demoTable (foo INT, bar STRING);
hive> DESCRIBE demoTable;

Now let’s load some data into Hive from the local file system. Assuming there’s an input file where each row has an integer and a string:

3.      Load data to table

hive> LOAD DATA LOCAL INPATH './resources/input.txt' OVERWRITE INTO TABLE demoTable;

4.      Get data from table

hive> SELECT * FROM demoTable;
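Because demoTable was created without a ROW FORMAT clause, Hive expects the text file to use its default field delimiter, Ctrl-A (\x01), with one row per line. The following sketch writes an input file in that layout; the function names and sample rows are hypothetical, and the path simply mirrors the one used in the LOAD DATA statement.

```python
FIELD_DELIMITER = "\x01"  # Hive's default field separator for text tables


def to_hive_text(rows):
    """Render (foo, bar) rows as Ctrl-A separated lines, one row per line."""
    return "".join(
        "{}{}{}\n".format(int(foo), FIELD_DELIMITER, bar) for foo, bar in rows
    )


def write_input(path, rows):
    """Write the rows to a local file that LOAD DATA LOCAL INPATH can read."""
    with open(path, "w", encoding="utf-8") as handle:
        handle.write(to_hive_text(rows))


# Hypothetical sample rows matching the schema (foo INT, bar STRING)
sample_rows = [(1, "one"), (2, "two")]
print(repr(to_hive_text(sample_rows)))
```

Calling write_input("./resources/input.txt", sample_rows) would then produce a file that should show up as two rows in the SELECT above, assuming the resources directory exists.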

5.      Create Table : reference  an external data-store

Note that when data is LOAD’ed like we do above the data is copied and moved into Hive’s warehouse. This is all well and good if you want your Hive queries to be self-contained and not affect the rest of the Hadoop analysts who are using MapReduce and/or Pig. Dropping the table from Hive will not affect the original data.
I find one of the most powerful features of the Hadoop ecosystem is that it allows programmers to access the same data through different interfaces. If you want Hive to simply reference an external data store, you can create your tables in the following way (LOCATION must name the directory that holds the data files, not a single file):

hive> CREATE EXTERNAL TABLE demoTable (foo INT, bar STRING) STORED AS TEXTFILE LOCATION '/path/to/resource/';

This links your external file to the Hive meta-store. Manipulations on the data in Hive will be reflected in the shared data, and changes in the data will be reflected in Hive. If you drop the table only the Hive meta-store is affected.

Pretty quickly you’ll get into the situation where you want to define the structure of the data and maintain that schema separately. This is where Avro comes in. Let’s load some data using Avro instead of the default text file format:

Create an Avro schema and save it to the file system (the namespace and name are not important):
{
  "namespace": "org.demo.schema",
  "name": "demo_schema",
  "type": "record",
  "fields": [
    { "name": "foo", "type": "int" },
    { "name": "bar", "type": "string" }
  ]
}

Then create a table that uses the schema:

hive> CREATE TABLE demoTableAvro
PARTITIONED BY (ds STRING)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
STORED AS
INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
TBLPROPERTIES ('avro.schema.url'='file:///path/to/schema/demoSchema.avsc');

Using any of these three approaches you can create tables and load data into them. Programmers can now write queries against this data warehouse in a way that matches their existing workflow and experience.

Conclusion

Hive is a great way to share your Hadoop infrastructure with programmers who are the most comfortable with Query Languages. By combining MapReduce, Pig and Hive you can create a full-featured data warehouse and analytics system. Next up you’ll want to look at workflow automation tools and ways to store and serve your results. Look for this in upcoming posts.

Introduction to MapReduce : Hadoop Programming Component

MapReduce: A Simple Introduction


MapReduce is a framework for processing parallelizable problems across a cluster (a large network of synchronized nodes connected together) or a grid (if the nodes are shared across geographically and administratively distributed systems, and use more heterogeneous hardware). Computational processing can occur on data stored either in a file system (unstructured), for example HDFS (Hadoop Distributed File System), or in a database (structured), for example any RDBMS. MapReduce can take advantage of data locality, processing data on or near the storage assets to reduce data transmission.
Figure 1 : MapReduce Flow Structure.

MapReduce: Logical view

The Map and Reduce functions of MapReduce are both defined with respect to data structured in (key, value) pairs. Map takes one pair of data with a type in one data domain, and returns a list of pairs in a different domain:
Map (k1,v1) → list(k2,v2)
The Map function is applied in parallel to every pair in the input dataset. This produces a list of pairs for each call. After that, the MapReduce framework collects all pairs with the same key from all lists and groups them together, creating one group for each key.
The Reduce function is then applied in parallel to each group, which in turn produces a collection of values in the same domain:
Reduce(k2, list (v2)) → list(v3)

As an example of the utility of map: suppose you had a function toUpper(str) which returns an uppercase version of its input string. You could use this function with map to turn a list of strings into a list of uppercase strings. Note that we are not modifying the input string here: we are returning a new string that will form part of a new output list.
Each Reduce call typically produces either one value v3 or an empty return, though one call is allowed to return more than one value. The returns of all calls are collected as the desired result list. Thus the MapReduce framework transforms a list of (key, value) pairs into a list of values. This behaviour is different from the typical functional programming map and reduce combination, which accepts a list of arbitrary values and returns one single value that combines all the values returned by map.
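The logical view above fits in a few lines of single-process Python. This is only a sketch of the Map (k1,v1) → list(k2,v2) and Reduce (k2, list(v2)) → list(v3) signatures with hypothetical function names; it has none of the distribution, fault tolerance or storage handling of Hadoop.

```python
from collections import defaultdict


def to_upper_all(strings):
    """Plain functional map: returns a new list, inputs are not modified."""
    return [s.upper() for s in strings]


def map_reduce(records, map_fn, reduce_fn):
    """Apply map_fn to every (k1, v1) pair, group the emitted (k2, v2)
    pairs by key, then apply reduce_fn to each group in key order."""
    groups = defaultdict(list)
    for k1, v1 in records:
        for k2, v2 in map_fn(k1, v1):
            groups[k2].append(v2)
    results = []
    for k2 in sorted(groups):
        results.extend(reduce_fn(k2, groups[k2]))
    return results


def word_count_map(offset, line):
    """Emit (word, 1) for every word in the line."""
    return [(word, 1) for word in line.split()]


def word_count_reduce(word, counts):
    """Sum the counts collected for one word."""
    return [(word, sum(counts))]


# Hypothetical input: (byte offset, line of text)
lines = [(0, "to be or not"), (13, "to be")]
print(to_upper_all(["map", "reduce"]))
print(map_reduce(lines, word_count_map, word_count_reduce))
```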


Figure 3: MapReduce Key-Value pair example.

Dataflow

The frozen part of the MapReduce framework is a large distributed sort. The hot spots, which the application defines, are:
  1. an input reader
  2. a Map function
  3. a partition function
  4. a compare function
  5. a Reduce function
  6. an output writer

1.      Input reader

The input reader divides the input into appropriate size 'splits' (in practice typically 16 MB to 128 MB) and the framework assigns one split to each Map function. The input reader reads data from stable storage (typically a distributed file system) and generates key/value pairs.
A common example will read a directory full of text files and return each line as a record.

2.      Map function

The Map function takes a series of key/value pairs, processes each, and generates zero or more output key/value pairs. The input and output types of the map can be (and often are) different from each other.
If the application is doing a word count, the map function would break the line into words and output a key/value pair for each word. Each output pair would contain the word as the key and the number of instances of that word in the line as the value.

3.      Partition function

Each Map function output is allocated to a particular reducer by the application's partition function for sharding purposes. The partition function is given the key and the number of reducers and returns the index of the desired reducer.
A typical default is to hash the key and use the hash value modulo the number of reducers. It is important to pick a partition function that gives an approximately uniform distribution of data per shard for load-balancing purposes, otherwise the MapReduce operation can be held up waiting for slow reducers (reducers assigned more than their share of data) to finish.
Between the map and reduce stages, the data is shuffled (parallel-sorted / exchanged between nodes) in order to move the data from the map node that produced it to the shard in which it will be reduced. The shuffle can sometimes take longer than the computation time depending on network bandwidth, CPU speeds, data produced and time taken by map and reduce computations.
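A hash-modulo partition function like the one just described can be sketched as follows. The sketch uses zlib.crc32 as a stand-in hash because Python's built-in hash() for strings changes between runs; Hadoop's default partitioner applies the same idea to the key's Java hashCode. The function names are hypothetical.

```python
import zlib


def partition(key, num_reducers):
    """Return the reducer index for a key: stable hash modulo reducer count."""
    return zlib.crc32(key.encode("utf-8")) % num_reducers


def shuffle(pairs, num_reducers):
    """Route every (key, value) pair to the shard chosen by partition()."""
    shards = [[] for _ in range(num_reducers)]
    for key, value in pairs:
        shards[partition(key, num_reducers)].append((key, value))
    return shards


# Hypothetical map output
pairs = [("to", 1), ("be", 1), ("to", 1), ("or", 1)]
for index, shard in enumerate(shuffle(pairs, 3)):
    print(index, shard)
```

Every pair with the same key lands in the same shard, which is what lets a single Reduce call see all values for that key.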

4.      Comparison function

The input for each Reduce is pulled from the machine where the Map ran and sorted using the application's comparison function.

5.      Reduce function

The framework calls the application's Reduce function once for each unique key in the sorted order. The Reduce can iterate through the values that are associated with that key and produce zero or more outputs.
In the word count example, the Reduce function takes the input values, sums them and generates a single output of the word and the final sum.

6.      Output writer

The Output Writer writes the output of the Reduce to the stable storage, usually a distributed file system.

Example 1: MapReduce All Phases





                                                   Figure 4: MapReduce All Phases.

Example 2: The Programming View

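The prototypical MapReduce example counts the appearances of each word in a set of documents.
The Mapper:
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Mapper;
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> { // class name is illustrative
       private static final IntWritable one = new IntWritable(1); // constant count emitted per token
       private final Text word = new Text();                      // reused output key
       public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
              String line = value.toString();
              StringTokenizer tokenizer = new StringTokenizer(line);
              while (tokenizer.hasMoreTokens()) {
                     word.set(tokenizer.nextToken());
                     context.write(word, one);
              }
       }
}

The Reducer:
import java.io.IOException;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Reducer;
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> { // class name is illustrative
       public void reduce(Text key, Iterable<IntWritable> values, Context context)
                     throws IOException, InterruptedException {
              int sum = 0; // total occurrences of this word
              for (IntWritable val : values) {
                     sum += val.get();
              }
              context.write(key, new IntWritable(sum));
       }
}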

Here, each document is split into words, and each word is counted by the map function, using the word as the result key. The framework puts together all the pairs with the same key and feeds them to the same call to reduce, thus this function just needs to sum all of its input values to find the total appearances of that word.

Uses:

MapReduce is useful in a wide range of applications, including distributed pattern-based searching, distributed sorting, web link-graph reversal, term-vector per host, web access log stats, inverted index construction, document clustering, machine learning, and statistical machine translation. Moreover, the MapReduce model has been adapted to several computing environments like multi-core and many-core systems, desktop grids, volunteer computing environments, dynamic cloud environments, and mobile environments.

Limitations

1.         For maximum parallelism, you need the Maps and Reduces to be stateless, to not depend on any data generated in the same MapReduce job. You cannot control the order in which the maps run, or the reductions.
2.         It is very inefficient if you are repeating similar searches again and again. A database with an index will always be faster than running an MR job over unindexed data. However, if that index needs to be regenerated whenever data is added, and data is being added continually, MR jobs may have an edge. That inefficiency can be measured in both CPU time and power consumed.
3.         In the Hadoop implementation Reduce operations do not take place until all the Maps are complete (or have failed and been skipped). As a result, you do not get any data back until the entire mapping has finished.
4.         There is a general assumption that the output of the reduce is smaller than the input to the Map. That is, you are taking a large datasource and generating smaller final values.

