Wednesday, January 26, 2011

Distributing Hadoop

As mentioned in the previous article, Hadoop Basics, the value of Hadoop lies in running it distributed across many machines.

In this article I will show how to configure Hadoop for distributed processing.
I'll demonstrate it with just two machines, but the steps are the same for more, as one of the main strengths of Hadoop is its ability to scale easily.

1. First, download Hadoop 0.21.0 on both machines from http://mirror.lividpenguin.com/pub/apache//hadoop/core/hadoop-0.21.0/hadoop-0.21.0.tar.gz and uncompress the file.
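
On each machine this could look something like the following (the download location is up to you):

wget http://mirror.lividpenguin.com/pub/apache//hadoop/core/hadoop-0.21.0/hadoop-0.21.0.tar.gz
tar -xzf hadoop-0.21.0.tar.gz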

2. We now have two independent Hadoop installations, but we want them to run as a cluster, so some configuration is needed.
A distributed Hadoop installation runs five different daemons that communicate with each other. The daemons are:

NameNode: The main controller of HDFS. It takes care of how files are broken into blocks, which nodes store each block, and the general bookkeeping of the distributed filesystem.

DataNode: Serves the HDFS read and write requests on each individual slave node, communicating and coordinating with the NameNode.

Secondary NameNode: Takes snapshots of the NameNode's metadata for possible recovery.

JobTracker: In charge of coordinating job submissions and assigning tasks to the different nodes.

TaskTracker: Present on each processing node, TaskTrackers execute the tasks submitted by the JobTracker, communicating with it constantly.

All communication between the Hadoop nodes is done through SSH. We will designate a master node (which will contain the NameNode and JobTracker) and two slave nodes. The master node must be able to reach the slave nodes through SSH using the same username. (I'm using my username, cscarioni, connecting without a passphrase by means of public/private key authentication.)
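
Setting up passphrase-less SSH from the master to each machine could look roughly like this (the user and host names are the ones used in this article; adapt them to your setup):

ssh-keygen -t rsa -P ""
ssh-copy-id cscarioni@master-hadoop
ssh-copy-id cscarioni@carlo-netbook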

Since we are using two machines, our architecture will look like this:
Machine 1 (Master): NameNode, JobTracker, Secondary NameNode, TaskTracker, DataNode
Machine 2 (Slave): TaskTracker, DataNode

We go to our master Hadoop installation and enter the conf directory.

In core-site.xml we specify the NameNode information. We put the following:

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>fs.default.name</name>
<value>hdfs://master-hadoop:9000</value>
</property>
</configuration>


In mapred-site.xml we specify where the JobTracker daemon runs:

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>mapred.job.tracker</name>
<value>master-hadoop:9001</value>
</property>
</configuration>

In hdfs-site.xml we specify the replication factor of the cluster, in our case 2:

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>dfs.replication</name>
<value>2</value>
</property>
</configuration>

The masters and slaves files, as their names suggest, contain the names of the master and slave nodes. We have to modify them to include our nodes. (I defined the following host names in the hosts file of both machines; a sketch of those entries appears after the file contents below.)

So in the masters we put

master-hadoop


And in the slaves we put

master-hadoop
carlo-netbook
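
A sketch of the corresponding /etc/hosts entries on both machines might look like this (the IP addresses are placeholders; use your real ones):

192.168.1.10    master-hadoop
192.168.1.11    carlo-netbook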

Now we change hadoop-env.sh, uncommenting the JAVA_HOME line and pointing it to our Java installation.
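
The uncommented line would look something like this (the exact path depends on where your JDK is installed):

export JAVA_HOME=/usr/lib/jvm/java-6-sun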


These are all the files we need. We now distribute (copy) these files to both machines.
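
One way to copy them is with scp from the master, assuming Hadoop was uncompressed into the same directory on both machines (the paths below are just an example):

scp conf/* cscarioni@carlo-netbook:/home/cscarioni/hadoop-0.21.0/conf/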


We now go to the bin directory of the master node's installation and execute ./hadoop namenode -format to format HDFS.

Then, in the same directory, we execute: ./start-all.sh.
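
To check that the daemons came up, you can run the JDK's jps tool on each machine; the master should list NameNode, SecondaryNameNode, JobTracker, TaskTracker and DataNode, while the slave should only list TaskTracker and DataNode:

jps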

That's it, Hadoop is running. We now need to put some files into HDFS and submit a MapReduce job to the cluster.

For this example I'll use a custom-made file in which each line contains either the word GOD or the word Devil. I created the file with the following Groovy script:

def a  = new File("/tmp/biblia.txt")
random = new Random()
a.withWriter{
    for (i in (0..5000000)){
        if(random.nextInt(2)){
            it << "GOD\n"
            }else{
            it << "Devil\n"
        }
    }
}

From the master's Hadoop bin directory, copy the file from the local filesystem into HDFS with:

./hadoop fs -put /home/cscarioni/downloads/bible.txt bible.txt

To check that the file has been created, run:

./hadoop fs -ls

I get the following output:

-rw-r--r-- 2 cscarioni supergroup 4445256 2011-01-24 18:25 /user/cscarioni/bible.txt

Now we create our MapReduce program (it simply counts how many times the words GOD and Devil appear in the file):



import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class GodVsDevils
{
    public static class WordMapper extends Mapper<LongWritable, Text, Text, LongWritable>
    {
        private LongWritable word = new LongWritable();
        private Text theKey = new Text();
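        // Emit (God, 1) or (Devil, 1) for each matching input line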
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
        {
            String who =value.toString();
            word.set(1);
            if(who.equals("GOD"))
            {
                theKey.set("God");
                context.write(theKey, word);
            }
            else if(who.equals("Devil"))
            {
                theKey.set("Devil");
                context.write(theKey, word);
            }
        }
    }
    public static class AllTranslationsReducer extends Reducer<Text, LongWritable, Text, LongWritable>
    {
        private LongWritable result = new LongWritable();
        // Sum all the counts emitted for this key (God or Devil)
        public void reduce(Text key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException
        {
            long count = 0;
            for (LongWritable val : values)
            {
                count += val.get();
            }
            result.set(count);
            context.write(key, result);
        }
    }
    public static void main(String[] args) throws Exception
    {
        Configuration conf = new Configuration();
        // Parse generic options such as -fs and -jt so they are applied to the job configuration
        new GenericOptionsParser(conf, args);
        Job job = new Job(conf, "GodDevils");
        job.setJarByClass(GodVsDevils.class);
        job.setMapperClass(WordMapper.class);
        job.setReducerClass(AllTranslationsReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        FileInputFormat.addInputPath(job, new Path("/user/cscarioni"));
        FileOutputFormat.setOutputPath(job, new Path("output"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

We compile it and jar it.
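
A rough sketch of compiling and packaging (the Hadoop jar names and locations below are assumptions; adjust them to your installation) could be:

javac -classpath ../hadoop-common-0.21.0.jar:../hadoop-mapred-0.21.0.jar GodVsDevils.java
jar cf god.jar GodVsDevils*.class

With the jar built, we execute the following on the master node: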

./hadoop jar god.jar GodVsDevils -fs master-hadoop:9000 -jt master-hadoop:9001

This will run our MapReduce job on the Hadoop cluster.
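
Once the job finishes, the counts can be inspected directly from HDFS with something like (output is the directory set in the program):

./hadoop fs -cat output/part-r-*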
