The Hadoop cluster is made up of a resource manager, a name node, and six data nodes. The cluster has the following hardware resources available:
Node Type | Total Available | RAM | Storage |
---|---|---|---|
Resource Manager | 1 | n/a | n/a |
Name Node | 1 | n/a | n/a |
Data Node | 6 | 128GB | 3 x 400GB SSD |
The HDFS file system comprises 7.71TB of total storage. All nodes are connected over an FDR InfiniBand network for high-speed internode communication.
The Hadoop cluster runs the Hortonworks Hadoop software distribution, which provides a reliable base of software for a wide range of data analytics purposes.
The cluster provides traditional MapReduce in addition to Spark, Hive, and Flume, and uses the YARN resource scheduler.
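To see what is running under the YARN scheduler after logging in, the standard YARN client commands can be used (a minimal sketch, assuming the Hortonworks client tools are on the default path):
$ yarn application -list
$ yarn node -list
The first command lists the applications currently known to the ResourceManager; the second lists the data nodes registered with YARN.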
Users connect to the cluster using standard SSH and authenticate with their Midas username and password.
$ ssh user@namenode.hpc.odu.edu
Users may only connect to the Hadoop cluster from the campus network, via the VPN, or from Turing or Wahab.
To view data in the HDFS file system, use the hdfs dfs executable followed by a command. For example, to list the contents of the current directory:
$ hdfs dfs -ls
Found 32 items
drwx------ - user hdfs 0 2016-04-18 08:00 .Trash
drwx------ - user hdfs 0 2016-05-03 14:16 .staging
drwxr-xr-x - user hdfs 0 2016-05-23 14:58 tweets
Here is a list of commands that can be used in HDFS:
Command | Function |
---|---|
hdfs dfs -mkdir <directory name> | creates a directory in HDFS |
hdfs dfs -cat <filename> | lists the contents of the specified file |
hdfs dfs -put <filename> | copies the specified file to HDFS |
hdfs dfs -get <filename> | copies the specified file from HDFS to the local directory |
hdfs dfs -cp <source filename> <destination filename> | copies the source file to the destination file in HDFS |
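For example, a short session using these commands might look like the following (the tweets directory matches the listing shown earlier, but the file name tweets.json is hypothetical and used only for illustration):
$ hdfs dfs -mkdir tweets                # create a directory in HDFS
$ hdfs dfs -put tweets.json tweets      # copy a local file into that directory
$ hdfs dfs -ls tweets                   # list the directory contents
$ hdfs dfs -cat tweets/tweets.json      # print the file contents
$ hdfs dfs -get tweets/tweets.json .    # copy the file back to the local directory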
Below is an example of a word count program that uses the MapReduce functionality on the Hadoop cluster:
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

  // Mapper: splits each input line into words and emits a (word, 1) pair per word
  public static class TokenizerMapper
       extends Mapper<Object, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }

  // Reducer (also used as the combiner): sums the counts for each word
  public static class IntSumReducer
       extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    // args[0] is the HDFS input path, args[1] is the HDFS output directory
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
A MapReduce program has several stages: a map stage, an intermediate combine/sort/shuffle stage, and a reduce stage.
In the example above, the mapper class TokenizerMapper takes the input data and maps each word to a (key, value) pair. The (key, value) pairs are then combined and sorted, and the IntSumReducer class counts the occurrences of each word.
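As a concrete illustration, assume file01 contains the single line "Hello World Bye World" (an assumption, but one consistent with the job counters and output shown below). The data then flows through the stages as follows:
map:     Hello World Bye World  ->  (Hello,1) (World,1) (Bye,1) (World,1)
combine: (Hello,1) (World,1) (Bye,1) (World,1)  ->  (Bye,1) (Hello,1) (World,2)
reduce:  (Bye,1) (Hello,1) (World,2)  ->  Bye 1, Hello 1, World 2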
To run the example, copy the input file into your HDFS home directory (/user/<username>), then compile the program and package the classes into a jar:
$ hdfs dfs -put file01
$ hadoop com.sun.tools.javac.Main WordCount.java
$ jar cf wc.jar WordCount*.class
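If the compile step cannot find the Java compiler classes, the upstream Apache MapReduce tutorial that this example is based on exports the following environment variables first; the paths shown are the tutorial's defaults and may differ on this cluster:
$ export JAVA_HOME=/usr/java/default
$ export PATH=${JAVA_HOME}/bin:${PATH}
$ export HADOOP_CLASSPATH=${JAVA_HOME}/lib/tools.jar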
Finally, submit the job to the cluster (note that the output directory, wcout here, must not already exist in HDFS):
$ hadoop jar wc.jar WordCount file01 wcout
WARNING: Use "yarn jar" to launch YARN applications.
16/05/24 11:52:55 INFO impl.TimelineClientImpl: Timeline service address: http://rm.ib.cluster:8188/ws/v1/timeline/
16/05/24 11:52:55 INFO client.RMProxy: Connecting to ResourceManager at rm.ib.cluster/172.25.30.157:8050
16/05/24 11:52:56 INFO hdfs.DFSClient: Created HDFS_DELEGATION_TOKEN token 930 for user on 172.25.30.156:8020
16/05/24 11:52:56 INFO security.TokenCache: Got dt for hdfs://namenode.ib.cluster:8020; Kind: HDFS_DELEGATION_TOKEN, Service: 172.25.30.156:8020, Ident: (HDFS_DELEGATION_TOKEN token 930 for user)
16/05/24 11:52:56 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
16/05/24 11:52:56 INFO input.FileInputFormat: Total input paths to process : 1
16/05/24 11:52:56 INFO mapreduce.JobSubmitter: number of splits:1
16/05/24 11:52:56 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1461083135942_0013
16/05/24 11:52:56 INFO mapreduce.JobSubmitter: Kind: HDFS_DELEGATION_TOKEN, Service: 172.25.30.156:8020, Ident: (HDFS_DELEGATION_TOKEN token 930 for user)
16/05/24 11:52:57 INFO impl.YarnClientImpl: Submitted application application_1461083135942_0013
16/05/24 11:52:57 INFO mapreduce.Job: The url to track the job: http://rm.ib.cluster:8088/proxy/application_1461083135942_0013/
16/05/24 11:52:57 INFO mapreduce.Job: Running job: job_1461083135942_0013
16/05/24 11:53:04 INFO mapreduce.Job: Job job_1461083135942_0013 running in uber mode : false
16/05/24 11:53:04 INFO mapreduce.Job: map 0% reduce 0%
16/05/24 11:53:11 INFO mapreduce.Job: map 100% reduce 0%
16/05/24 11:53:16 INFO mapreduce.Job: map 100% reduce 100%
16/05/24 11:53:16 INFO mapreduce.Job: Job job_1461083135942_0013 completed successfully
16/05/24 11:53:16 INFO mapreduce.Job: Counters: 49
File System Counters
FILE: Number of bytes read=40
FILE: Number of bytes written=272941
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=137
HDFS: Number of bytes written=22
HDFS: Number of read operations=6
HDFS: Number of large read operations=0
HDFS: Number of write operations=2
Job Counters
Launched map tasks=1
Launched reduce tasks=1
Rack-local map tasks=1
Total time spent by all maps in occupied slots (ms)=5045
Total time spent by all reduces in occupied slots (ms)=4596
Total time spent by all map tasks (ms)=5045
Total time spent by all reduce tasks (ms)=2298
Total vcore-seconds taken by all map tasks=5045
Total vcore-seconds taken by all reduce tasks=2298
Total megabyte-seconds taken by all map tasks=67159040
Total megabyte-seconds taken by all reduce tasks=61181952
Map-Reduce Framework
Map input records=1
Map output records=4
Map output bytes=38
Map output materialized bytes=40
Input split bytes=115
Combine input records=4
Combine output records=3
Reduce input groups=3
Reduce shuffle bytes=40
Reduce input records=3
Reduce output records=3
Spilled Records=6
Shuffled Maps =1
Failed Shuffles=0
Merged Map outputs=1
GC time elapsed (ms)=110
CPU time spent (ms)=2800
Physical memory (bytes) snapshot=2827038720
Virtual memory (bytes) snapshot=38793658368
Total committed heap usage (bytes)=3375366144
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=22
File Output Format Counters
Bytes Written=22
The output of the job is located in wcout:
$ hdfs dfs -cat wcout/*
Bye 1
Hello 1
World 2
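To copy the job output from HDFS back to the local working directory, the hdfs dfs -get command from the table above can be used; this copies the whole wcout directory, including the part-r-00000 file that MapReduce writes the reducer output to:
$ hdfs dfs -get wcout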