Recording what happened while getting a Hadoop version running and testing the first Java, Python and bash scripts as Hadoop standalone and streaming runs.
Installing & running Hadoop on OS X 10.6.8
Downloaded the Hadoop 2.7.1 binary. I have Java 1.6.0_65. When running the basic first test I get this error:
$bin/hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.1.jar grep input test/output 'dfs[a-z.]+'
Exception in thread "main" java.lang.UnsupportedClassVersionError: org/apache/hadoop/util/RunJar : Unsupported major.minor version 51.0
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClassCond(ClassLoader.java:637)
at java.lang.ClassLoader.defineClass(ClassLoader.java:621)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:141)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:283)
at java.net.URLClassLoader.access$000(URLClassLoader.java:58)
at java.net.URLClassLoader$1.run(URLClassLoader.java:197)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
at java.lang.ClassLoader.loadClass(ClassLoader.java:306)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301)
at java.lang.ClassLoader.loadClass(ClassLoader.java:247)
The UnsupportedClassVersionError means these Hadoop classes were compiled for Java 7 (class file version 51.0), which Java 1.6 cannot load. The latest JDK (as of Thu Nov. 12 2015), jdk-8u65-nb-8_1-macosx-x64.dmg, requires OS X 10.8 or higher. I downloaded JDK 7u80, but it requires OS X 10.7.3 or higher.
Downloaded the Hadoop 2.5.2 binary, installed it and... it worked. At least for the basic test so far:
bin/hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.5.2.jar grep test/input test/output 'dfs[a-z.]+'
Downloaded the Hadoop 2.6.2 binary, installed it, and it worked as well (again, only the basic test above so far). This is the latest release of the 2.6 branch (Oct 2015); 2.7.1 was released in Jul 2015. 2.6.2 is the version used in the examples described here.
HCN: NOAA's US Historical Climatology Network data. Each line of a .dly file is one station-month, fixed-width: station ID in columns 1-11, year in 12-15, month in 16-17, element in 18-21 (e.g. TMAX, in tenths of a degree C), followed by the daily groups of 8 characters starting at column 22, each holding a 5-character value plus measurement/quality/source flags.
A plain bash/awk scan will be the timing reference to compare against, simply because it's the fastest; Hadoop in standalone mode can't beat it.
$time ./max_temp_hcn.sh > hcn-max
real 0m48.068s
user 0m35.809s
sys 0m8.038s
$sort -n hcn-max | head
1853 348 USC00381310185308TMAX 348
1868 161 USC00144559186810TMAX 161
1869 306 USC00144559186908TMAX 306
1874 256 USW00013724187406TMAX 256
1875 306 USW00013724187507TMAX 306
1876 294 USW00013724187607TMAX 294
1876 306 USW00094728187609TMAX 306
1877
1877
1877 289 USW00013724187709TMAX 289
Now getting the max per year across all stations with ./max_temp_hcn-all.sh:
#!/bin/bash
# For each year keep the largest TMAX (cols: year 12-15, value 22-26, q-flag 28).
cat input/data/NCDC/ghcnd_hcn/*.dly | awk '/TMAX/ {
    year = substr($0,12,4); temp = substr($0,22,5)+0; q = substr($0,28,1);
    # skip missing values (-9999) and records with a non-blank quality flag
    if (temp != -9999 && q ~ /[[:space:]]/ && temp > max[year]) {
        max[year] = temp; l[year] = substr($0,1,28)
    }
} END { for (yr in max) print yr, max[yr], l[yr] }'
$time ./max_temp_hcn-all.sh > hcn-max-all
real 0m29.214s
user 0m27.698s
sys 0m3.862s
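Before moving to Hadoop: the same scan as a standalone Java program is essentially what the MapReduce job below has to do. A sketch under my reading of the record layout (the class name is mine, not part of the original setup):
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.Map;

// Standalone sketch: read .dly records on stdin, track the max TMAX per year.
public class MaxTempScan {
    public static void main(String[] args) throws Exception {
        Map<String, Integer> max = new HashMap<String, Integer>();
        BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
        String line;
        while ((line = in.readLine()) != null) {
            if (!"TMAX".equals(line.substring(17, 21))) continue;
            String year = line.substring(11, 15);
            int temp = Integer.parseInt(line.substring(21, 26).trim());
            char q = line.charAt(27);                 // quality flag
            if (temp == -9999 || q != ' ') continue;  // missing or flagged value
            Integer cur = max.get(year);
            if (cur == null || temp > cur) max.put(year, temp);
        }
        for (Map.Entry<String, Integer> e : max.entrySet())
            System.out.println(e.getKey() + "\t" + e.getValue());
    }
}
It would run the same way as the awk version: cat input/data/NCDC/ghcnd_hcn/*.dly | java MaxTempScan.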
$hadoop MaxTemperature input/data/NCDC/ghcnd_hcn/USC00011084.dly output_hcn
Exception in thread "main" java.lang.NoClassDefFoundError: MaxTemperature
Caused by: java.lang.ClassNotFoundException: MaxTemperature
at java.net.URLClassLoader$1.run(URLClassLoader.java:202)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
at java.lang.ClassLoader.loadClass(ClassLoader.java:306)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301)
at java.lang.ClassLoader.loadClass(ClassLoader.java:247)
The class needed to be compiled first. Compare with the minimal HelloWorld example:
$cat HelloWorld.java
public class HelloWorld {
    public static void main(String args[]) {
        System.out.println("Hello World!");
    }
}
$javac -classpath . HelloWorld.java
$hadoop HelloWorld . out
Hello World!
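For reference, the MaxTemperature driver follows the standard pattern (a sketch along the lines of the classic example; my actual MaxTemperature.java may differ in detail):
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MaxTemperature {
    public static void main(String[] args) throws Exception {
        Job job = new Job();  // deprecated constructor, hence the javac note below
        job.setJarByClass(MaxTemperature.class);
        job.setJobName("Max temperature");
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapperClass(MaxTemperatureMapper.class);
        job.setReducerClass(MaxTemperatureReducer.class);
        // declared output types; these must match what the classes actually emit
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}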
So let's compile MaxTemperature and proceed:
msantos@MBP-2[19:49]:~/System/HADOOP/current/test/classes$javac -classpath `hadoop classpath` MaxTemperature.java
Note: MaxTemperature.java uses or overrides a deprecated API.
Note: Recompile with -Xlint:deprecation for details.
msantos@MBP-2[19:53]:~/System/HADOOP/current/test$hadoop MaxTemperature input/data/NCDC/ghcnd_hcn/USC00011084.dly output_hcn
15/11/12 19:53:15 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/11/12 19:53:15 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
15/11/12 19:53:15 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
15/11/12 19:53:16 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
15/11/12 19:53:16 WARN mapreduce.JobResourceUploader: No job jar file set. User classes may not be found. See Job or Job#setJar(String).
15/11/12 19:53:16 INFO input.FileInputFormat: Total input paths to process : 1
15/11/12 19:53:16 INFO mapreduce.JobSubmitter: number of splits:1
15/11/12 19:53:16 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local1442023825_0001
15/11/12 19:53:17 INFO mapred.LocalJobRunner: OutputCommitter set in config null
15/11/12 19:53:17 INFO mapred.LocalJobRunner: OutputCommitter is org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
15/11/12 19:53:17 INFO mapreduce.Job: The url to track the job: http://localhost:8080/
15/11/12 19:53:17 INFO mapreduce.Job: Running job: job_local1442023825_0001
15/11/12 19:53:17 INFO mapred.LocalJobRunner: Starting task: attempt_local1442023825_0001_m_000000_0
15/11/12 19:53:17 INFO mapred.LocalJobRunner: Waiting for map tasks
15/11/12 19:53:17 INFO util.ProcfsBasedProcessTree: ProcfsBasedProcessTree currently is supported only on Linux.
15/11/12 19:53:17 INFO mapred.Task: Using ResourceCalculatorProcessTree : null
15/11/12 19:53:17 INFO mapred.MapTask: Processing split: file:/Users/msantos/System/HADOOP/hadoop-2.6.2/test/input/data/NCDC/ghcnd_hcn/USC00011084.dly:0+1700190
15/11/12 19:53:17 INFO mapred.MapTask: (EQUATOR) 0 kvi 26214396(104857584)
15/11/12 19:53:17 INFO mapred.MapTask: mapreduce.task.io.sort.mb: 100
15/11/12 19:53:17 INFO mapred.MapTask: soft limit at 83886080
15/11/12 19:53:17 INFO mapred.MapTask: bufstart = 0; bufvoid = 104857600
15/11/12 19:53:17 INFO mapred.MapTask: kvstart = 26214396; length = 6553600
15/11/12 19:53:17 INFO mapred.MapTask: Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer
15/11/12 19:53:18 INFO mapred.LocalJobRunner:
15/11/12 19:53:18 INFO mapred.MapTask: Starting flush of map output
15/11/12 19:53:18 INFO mapred.Task: Task:attempt_local1442023825_0001_m_000000_0 is done. And is in the process of committing
15/11/12 19:53:18 INFO mapred.LocalJobRunner: map
15/11/12 19:53:18 INFO mapred.Task: Task 'attempt_local1442023825_0001_m_000000_0' done.
15/11/12 19:53:18 INFO mapred.LocalJobRunner: Finishing task: attempt_local1442023825_0001_m_000000_0
15/11/12 19:53:18 INFO mapred.LocalJobRunner: map task executor complete.
15/11/12 19:53:18 INFO mapred.LocalJobRunner: Waiting for reduce tasks
15/11/12 19:53:18 INFO mapred.LocalJobRunner: Starting task: attempt_local1442023825_0001_r_000000_0
15/11/12 19:53:18 INFO util.ProcfsBasedProcessTree: ProcfsBasedProcessTree currently is supported only on Linux.
15/11/12 19:53:18 INFO mapred.Task: Using ResourceCalculatorProcessTree : null
15/11/12 19:53:18 INFO mapred.ReduceTask: Using ShuffleConsumerPlugin: org.apache.hadoop.mapreduce.task.reduce.Shuffle@669a4cb
15/11/12 19:53:18 INFO reduce.MergeManagerImpl: MergerManager: memoryLimit=372781856, maxSingleShuffleLimit=93195464, mergeThreshold=246036032, ioSortFactor=10, memToMemMergeOutputsThreshold=10
15/11/12 19:53:18 INFO reduce.EventFetcher: attempt_local1442023825_0001_r_000000_0 Thread started: EventFetcher for fetching Map Completion Events
15/11/12 19:53:18 INFO reduce.LocalFetcher: localfetcher#1 about to shuffle output of map attempt_local1442023825_0001_m_000000_0 decomp: 2 len: 6 to MEMORY
15/11/12 19:53:18 INFO reduce.InMemoryMapOutput: Read 2 bytes from map-output for attempt_local1442023825_0001_m_000000_0
15/11/12 19:53:18 INFO reduce.MergeManagerImpl: closeInMemoryFile -> map-output of size: 2, inMemoryMapOutputs.size() -> 1, commitMemory -> 0, usedMemory ->2
15/11/12 19:53:18 INFO reduce.EventFetcher: EventFetcher is interrupted.. Returning
15/11/12 19:53:18 INFO mapred.LocalJobRunner: 1 / 1 copied.
15/11/12 19:53:18 INFO reduce.MergeManagerImpl: finalMerge called with 1 in-memory map-outputs and 0 on-disk map-outputs
15/11/12 19:53:18 INFO mapred.Merger: Merging 1 sorted segments
15/11/12 19:53:18 INFO mapred.Merger: Down to the last merge-pass, with 0 segments left of total size: 0 bytes
15/11/12 19:53:18 INFO reduce.MergeManagerImpl: Merged 1 segments, 2 bytes to disk to satisfy reduce memory limit
15/11/12 19:53:18 INFO reduce.MergeManagerImpl: Merging 1 files, 6 bytes from disk
15/11/12 19:53:18 INFO reduce.MergeManagerImpl: Merging 0 segments, 0 bytes from memory into reduce
15/11/12 19:53:18 INFO mapred.Merger: Merging 1 sorted segments
15/11/12 19:53:18 INFO mapred.Merger: Down to the last merge-pass, with 0 segments left of total size: 0 bytes
15/11/12 19:53:18 INFO mapred.LocalJobRunner: 1 / 1 copied.
15/11/12 19:53:18 INFO Configuration.deprecation: mapred.skip.on is deprecated. Instead, use mapreduce.job.skiprecords
15/11/12 19:53:18 INFO mapred.Task: Task:attempt_local1442023825_0001_r_000000_0 is done. And is in the process of committing
15/11/12 19:53:18 INFO mapred.LocalJobRunner: 1 / 1 copied.
15/11/12 19:53:18 INFO mapred.Task: Task attempt_local1442023825_0001_r_000000_0 is allowed to commit now
15/11/12 19:53:18 INFO mapreduce.Job: Job job_local1442023825_0001 running in uber mode : false
15/11/12 19:53:18 INFO mapreduce.Job: map 100% reduce 0%
15/11/12 19:53:18 INFO output.FileOutputCommitter: Saved output of task 'attempt_local1442023825_0001_r_000000_0' to file:/Users/msantos/System/HADOOP/hadoop-2.6.2/test/output_hcn/_temporary/0/task_local1442023825_0001_r_000000
15/11/12 19:53:18 INFO mapred.LocalJobRunner: reduce > reduce
15/11/12 19:53:18 INFO mapred.Task: Task 'attempt_local1442023825_0001_r_000000_0' done.
15/11/12 19:53:18 INFO mapred.LocalJobRunner: Finishing task: attempt_local1442023825_0001_r_000000_0
15/11/12 19:53:18 INFO mapred.LocalJobRunner: reduce task executor complete.
15/11/12 19:53:19 INFO mapreduce.Job: map 100% reduce 100%
15/11/12 19:53:19 INFO mapreduce.Job: Job job_local1442023825_0001 completed successfully
15/11/12 19:53:19 INFO mapreduce.Job: Counters: 30
File System Counters
FILE: Number of bytes read=3400854
FILE: Number of bytes written=508728
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
Map-Reduce Framework
Map input records=6297
Map output records=0
Map output bytes=0
Map output materialized bytes=6
Input split bytes=158
Combine input records=0
Combine output records=0
Reduce input groups=0
Reduce shuffle bytes=6
Reduce input records=0
Reduce output records=0
Spilled Records=0
Shuffled Maps =1
Failed Shuffles=0
Merged Map outputs=1
GC time elapsed (ms)=11
Total committed heap usage (bytes)=395960320
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=1700190
File Output Format Counters
Bytes Written=8
msantos@MBP-2[19:53]:~/System/HADOOP/current/test$
However there is NO OUTPUT from the reduce task: "Reduce output records=0". Something is obviously wrong with the Java code. What is it really doing? Surely it's not reading the records correctly, i.e., not finding the temperature and the TMAX string.
Debugging goal: just read a couple of strings from each record and have the reduce task print them out, with no processing. This changes the types of the map & reduce classes. Their generic signatures take four type parameters: the first two are the input key and value types, the last two the output key and value types. It turns out, however, that this alone is not enough for Hadoop to run the job. The Java compilation works, but at runtime the Hadoop API complains that map is emitting an unexpected type: the main driver class must declare the right types for the mapper and the reducer as well!! Here is the error message:
15/11/13 12:18:23 WARN mapred.LocalJobRunner: job_local858928967_0001
java.lang.Exception: java.io.IOException: Type mismatch in value from map: expected org.apache.hadoop.io.IntWritable, received org.apache.hadoop.io.Text
at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462)
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:522)
Caused by: java.io.IOException: Type mismatch in value from map: expected org.apache.hadoop.io.IntWritable, received org.apache.hadoop.io.Text
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:1074)
at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:712)
at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.write(WrappedMapper.java:112)
at MaxTemperatureMapper.map(MaxTemperatureMapper.java:33)
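Roughly, the corrected debug version of the mapper looks like this (a sketch; the exact substrings taken are my assumption):
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>: the debug version emits Text
// values, so the fourth type parameter changes from IntWritable to Text.
public class MaxTemperatureMapper
        extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        // no processing: just pass two raw substrings through
        context.write(new Text(line.substring(11, 15)),   // year
                      new Text(line.substring(17, 21)));  // element, e.g. TMAX
    }
}
The driver then has to declare the matching type, job.setOutputValueClass(Text.class) instead of IntWritable.class, and the reducer's signature changes accordingly.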
Once corrected, compiled and run we get:
msantos@MBP-2[12:21]:~/System/HADOOP/current/test$!hadoop
hadoop MaxTemperature input/data/NCDC/ghcnd_hcn/USC00011084.dly output_hcn
15/11/13 12:21:46 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/11/13 12:21:46 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
15/11/13 12:21:46 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
15/11/13 12:21:47 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
15/11/13 12:21:47 WARN mapreduce.JobResourceUploader: No job jar file set. User classes may not be found. See Job or Job#setJar(String).
15/11/13 12:21:47 INFO input.FileInputFormat: Total input paths to process : 1
15/11/13 12:21:47 INFO mapreduce.JobSubmitter: number of splits:1
15/11/13 12:21:48 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local1636203858_0001
15/11/13 12:21:48 INFO mapred.LocalJobRunner: OutputCommitter set in config null
15/11/13 12:21:48 INFO mapred.LocalJobRunner: OutputCommitter is org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
15/11/13 12:21:48 INFO mapreduce.Job: The url to track the job: http://localhost:8080/
15/11/13 12:21:48 INFO mapreduce.Job: Running job: job_local1636203858_0001
15/11/13 12:21:48 INFO mapred.LocalJobRunner: Starting task: attempt_local1636203858_0001_m_000000_0
15/11/13 12:21:48 INFO mapred.LocalJobRunner: Waiting for map tasks
15/11/13 12:21:49 INFO util.ProcfsBasedProcessTree: ProcfsBasedProcessTree currently is supported only on Linux.
15/11/13 12:21:49 INFO mapred.Task: Using ResourceCalculatorProcessTree : null
15/11/13 12:21:49 INFO mapred.MapTask: Processing split: file:/Users/msantos/System/HADOOP/hadoop-2.6.2/test/input/data/NCDC/ghcnd_hcn/USC00011084.dly:0+1700190
15/11/13 12:21:49 INFO mapred.MapTask: (EQUATOR) 0 kvi 26214396(104857584)
15/11/13 12:21:49 INFO mapred.MapTask: mapreduce.task.io.sort.mb: 100
15/11/13 12:21:49 INFO mapred.MapTask: soft limit at 83886080
15/11/13 12:21:49 INFO mapred.MapTask: bufstart = 0; bufvoid = 104857600
15/11/13 12:21:49 INFO mapred.MapTask: kvstart = 26214396; length = 6553600
15/11/13 12:21:49 INFO mapred.MapTask: Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer
15/11/13 12:21:49 INFO mapreduce.Job: Job job_local1636203858_0001 running in uber mode : false
15/11/13 12:21:49 INFO mapreduce.Job: map 0% reduce 0%
15/11/13 12:21:49 INFO mapred.LocalJobRunner:
15/11/13 12:21:50 INFO mapred.MapTask: Starting flush of map output
15/11/13 12:21:50 INFO mapred.MapTask: Spilling map output
15/11/13 12:21:50 INFO mapred.MapTask: bufstart = 0; bufend = 69267; bufvoid = 104857600
15/11/13 12:21:50 INFO mapred.MapTask: kvstart = 26214396(104857584); kvend = 26189212(104756848); length = 25185/6553600
15/11/13 12:21:50 INFO mapred.MapTask: Finished spill 0
15/11/13 12:21:50 INFO mapred.Task: Task:attempt_local1636203858_0001_m_000000_0 is done. And is in the process of committing
15/11/13 12:21:50 INFO mapred.LocalJobRunner: map
15/11/13 12:21:50 INFO mapred.Task: Task 'attempt_local1636203858_0001_m_000000_0' done.
15/11/13 12:21:50 INFO mapred.LocalJobRunner: Finishing task: attempt_local1636203858_0001_m_000000_0
15/11/13 12:21:50 INFO mapred.LocalJobRunner: map task executor complete.
15/11/13 12:21:50 INFO mapred.LocalJobRunner: Waiting for reduce tasks
15/11/13 12:21:50 INFO mapred.LocalJobRunner: Starting task: attempt_local1636203858_0001_r_000000_0
15/11/13 12:21:50 INFO util.ProcfsBasedProcessTree: ProcfsBasedProcessTree currently is supported only on Linux.
15/11/13 12:21:50 INFO mapred.Task: Using ResourceCalculatorProcessTree : null
15/11/13 12:21:50 INFO mapred.ReduceTask: Using ShuffleConsumerPlugin: org.apache.hadoop.mapreduce.task.reduce.Shuffle@3e68cd79
15/11/13 12:21:50 INFO reduce.MergeManagerImpl: MergerManager: memoryLimit=372781856, maxSingleShuffleLimit=93195464, mergeThreshold=246036032, ioSortFactor=10, memToMemMergeOutputsThreshold=10
15/11/13 12:21:50 INFO reduce.EventFetcher: attempt_local1636203858_0001_r_000000_0 Thread started: EventFetcher for fetching Map Completion Events
15/11/13 12:21:50 INFO reduce.LocalFetcher: localfetcher#1 about to shuffle output of map attempt_local1636203858_0001_m_000000_0 decomp: 81863 len: 81867 to MEMORY
15/11/13 12:21:50 INFO reduce.InMemoryMapOutput: Read 81863 bytes from map-output for attempt_local1636203858_0001_m_000000_0
15/11/13 12:21:50 INFO reduce.MergeManagerImpl: closeInMemoryFile -> map-output of size: 81863, inMemoryMapOutputs.size() -> 1, commitMemory -> 0, usedMemory ->81863
15/11/13 12:21:50 INFO reduce.EventFetcher: EventFetcher is interrupted.. Returning
15/11/13 12:21:50 INFO mapred.LocalJobRunner: 1 / 1 copied.
15/11/13 12:21:50 INFO reduce.MergeManagerImpl: finalMerge called with 1 in-memory map-outputs and 0 on-disk map-outputs
15/11/13 12:21:50 INFO mapred.Merger: Merging 1 sorted segments
15/11/13 12:21:50 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 81856 bytes
15/11/13 12:21:50 INFO reduce.MergeManagerImpl: Merged 1 segments, 81863 bytes to disk to satisfy reduce memory limit
15/11/13 12:21:50 INFO reduce.MergeManagerImpl: Merging 1 files, 81867 bytes from disk
15/11/13 12:21:50 INFO reduce.MergeManagerImpl: Merging 0 segments, 0 bytes from memory into reduce
15/11/13 12:21:50 INFO mapred.Merger: Merging 1 sorted segments
15/11/13 12:21:50 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 81856 bytes
15/11/13 12:21:50 INFO mapred.LocalJobRunner: 1 / 1 copied.
15/11/13 12:21:50 INFO Configuration.deprecation: mapred.skip.on is deprecated. Instead, use mapreduce.job.skiprecords
15/11/13 12:21:50 INFO mapreduce.Job: map 100% reduce 0%
15/11/13 12:21:51 INFO mapred.Task: Task:attempt_local1636203858_0001_r_000000_0 is done. And is in the process of committing
15/11/13 12:21:51 INFO mapred.LocalJobRunner: 1 / 1 copied.
15/11/13 12:21:51 INFO mapred.Task: Task attempt_local1636203858_0001_r_000000_0 is allowed to commit now
15/11/13 12:21:51 INFO output.FileOutputCommitter: Saved output of task 'attempt_local1636203858_0001_r_000000_0' to file:/Users/msantos/System/HADOOP/hadoop-2.6.2/test/output_hcn/_temporary/0/task_local1636203858_0001_r_000000
15/11/13 12:21:51 INFO mapred.LocalJobRunner: reduce > reduce
15/11/13 12:21:51 INFO mapred.Task: Task 'attempt_local1636203858_0001_r_000000_0' done.
15/11/13 12:21:51 INFO mapred.LocalJobRunner: Finishing task: attempt_local1636203858_0001_r_000000_0
15/11/13 12:21:51 INFO mapred.LocalJobRunner: reduce task executor complete.
15/11/13 12:21:51 INFO mapreduce.Job: map 100% reduce 100%
15/11/13 12:21:51 INFO mapreduce.Job: Job job_local1636203858_0001 completed successfully
15/11/13 12:21:51 INFO mapreduce.Job: Counters: 30
File System Counters
FILE: Number of bytes read=3564576
FILE: Number of bytes written=824090
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
Map-Reduce Framework
Map input records=6297
Map output records=6297
Map output bytes=69267
Map output materialized bytes=81867
Input split bytes=158
Combine input records=0
Combine output records=0
Reduce input groups=176
Reduce shuffle bytes=81867
Reduce input records=6297
Reduce output records=6297
Spilled Records=12594
Shuffled Maps =1
Failed Shuffles=0
Merged Map outputs=1
GC time elapsed (ms)=13
Total committed heap usage (bytes)=395960320
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=1700190
File Output Format Counters
Bytes Written=69819
msantos@MBP-2[12:21]:~/System/HADOOP/current/test$l output_hcn/
total 136
-rw-r--r-- 1 msantos staff 0 13 Nov 12:21 _SUCCESS
-rw-r--r-- 1 msantos staff 69267 13 Nov 12:21 part-r-00000
Now there is output! "Reduce output records=6297", and accordingly part-r-00000 in the output directory is no longer empty.
POSTPONED
Let's check the values of TMAX for 1926 as measured by station NCDC/ghcnd_hcn/USC00011084.dly
$less ghcnd_hcn/USC00011084.dly|grep "1926.*TMAX" | cut -c 1-26| less
USC00011084192601TMAX-9999
USC00011084192602TMAX 194
USC00011084192603TMAX 217
USC00011084192604TMAX 222
USC00011084192605TMAX 306
USC00011084192606TMAX 317
USC00011084192607TMAX 367
USC00011084192608TMAX 322
USC00011084192609TMAX 356
USC00011084192610TMAX 367
USC00011084192611TMAX 222
USC00011084192612TMAX 222
and we see the max temp is 36.7 °C. With Python:
$cat ../input/data/NCDC/ghcnd_hcn/USC00011084.dly | ../classes/max_temperature_map.py | ../classes/max_temperature_reduce.py | less
1926 367
1927 367
1928 367
1930 361
1931 367
1932 339
1933 367
1934 350
1935 350
1936 339
1937 383
1938 344
1939 339
1940 350
1941 328
1942 333
1943 356
1944 350
1945 356
Here are the mapper and reducer. The mapper:
#!/usr/bin/env python
import re
import sys

# slices (0-indexed): 11:15 year, 17:21 element, 21:26 value, 27:28 quality flag
for line in sys.stdin:
    val = line.strip()
    element = val[17:21]
    if element == "TMAX":
        (year, temp, q) = (val[11:15], val[21:26], val[27:28])
        # skip missing values and anything with a non-blank quality flag
        if temp != "-9999" and q == " ":
            print "%s\t%s" % (year, temp)
and the reducer:
#!/usr/bin/env python
import sys

# Input arrives sorted by key, so a key change means the previous group is done.
(last_key, max_val) = (None, 0)
for line in sys.stdin:
    (key, val) = line.strip().split("\t")
    if last_key and last_key != key:
        print "%s\t%s" % (last_key, max_val)
        (last_key, max_val) = (key, int(val))
    else:
        (last_key, max_val) = (key, max(max_val, int(val)))
if last_key:
    print "%s\t%s" % (last_key, max_val)
Now using Hadoop streaming with the Python scripts (streaming sorts the map output by key before the reduce, which is exactly what the reducer's key-change logic relies on):
OUTDIR=output-py
hadoop jar $HADOOP_INSTALL/share/hadoop/tools/lib/hadoop-streaming-*.jar \
-input input/data/NCDC/ghcnd_hcn/USC00011084.dly \
-output $OUTDIR \
-mapper classes/max_temperature_map.py \
-reducer classes/max_temperature_reduce.py \
>& log-py
we get
msantos@MBP-2[15:20]:~/System/HADOOP/current/test$head output-py/part-00000
1926 367
1927 367
1928 367
1930 361
1931 367
1932 339
1933 367
1934 350
1935 350
1936 339
Python streaming on ALL station files (same command as before, with the whole directory input/data/NCDC/ghcnd_hcn as input):
msantos@MBP-2[15:33]:~/System/HADOOP/current/test$time ./run-MaxTemperature-py
Reduce output records=145
real 12m25.303s
user 8m36.415s
sys 4m41.120s
By not checking the quality flag of the measurement, I initially included values like this one for 1967:
$cat ghcnd_hcn/USC00244364.dly |grep USC00244364196701TMAX
USC00244364196701TMAX 1544 X0-9999 -9999 -9999 -9999 -9999 -9999 -9999 -9999 -9999 -9999 -9999 -9999 -9999 -9999 -9999 -9999 -9999 -9999 -9999 -9999 -9999 -9999 -9999 -9999 -9999 -9999 -9999 -9999 -9999 -9999
That is, a max temp of 154.4 °C!! Obviously this is a failed measurement, and the quality flag correspondingly shows a non-blank character, namely 'X'. Filtering on the quality flag gives the final result set:
$time ./run-MaxTemperature-py
Reduce output records=145
real 11m44.270s
user 8m34.876s
sys 4m45.707s
Streaming with a single Python mapper that reads all the files and calculates the max temp for each year itself, with /bin/cat as the reducer. The Python map:
#!/usr/bin/env python
import re
import sys

# Keep the running max per year in memory; emit everything at the end.
max = {}
for line in sys.stdin:
    val = line.strip()
    element = val[17:21]
    if element == "TMAX":
        (year, temp, q) = (val[11:15], val[21:26], val[27:28])
        if temp != "-9999" and q == " ":
            if not max.has_key(year) or temp > max[year]:
                max[year] = temp
for yr in max.keys():
    print "%s\t%s" % (yr, max[yr])
The Hadoop command & execution:
$cat ./run-MaxTemperature-py-single
#!/bin/bash
# all/single-file mode: -input input/data/NCDC/ghcnd_hcn \ #/USC00011084.dly \
OUTDIR=output-py-all-single
[ -d $OUTDIR ] && rm -Rf $OUTDIR
hadoop jar $HADOOP_INSTALL/share/hadoop/tools/lib/hadoop-streaming-*.jar \
-input input/data/NCDC/ghcnd_hcn \
-output $OUTDIR \
-mapper classes/max_temperature_single.py \
-reducer /bin/cat \
>& log-py-all-single
grep "Reduce output" log-py-all-single
$time ./run-MaxTemperature-py-single
real 22m22.339s
user 22m2.756s
sys 0m39.069s
I made the mistake of not first testing the script directly on a few sample files. After 22m it turned out it had just loaded everything into memory (600+ MiB) and crashed with this error:
Traceback (most recent call last):
File "/Users/msantos/System/HADOOP/hadoop-2.6.2/test/./classes/max_temperature_single.py", line 15, in <module>
if temp>max[year]:
KeyError: '1889'
Once corrected (with the has_key guard shown in the listing above) we get:
msantos@MBP-2[18:55]:~/System/HADOOP/current/test$time ./run-MaxTemperature-py-single
Reduce output records=1413236
real 11m57.894s
user 7m58.080s
sys 4m52.835s
However, directly from the command line we get
msantos@MBP-2[19:25]:~/System/HADOOP/current/test$time cat input/data/NCDC/ghcnd_hcn/US*.dly | classes/max_temperature_single.py >& o-single
real 0m30.382s
user 0m25.770s
sys 0m3.815s
Basically, under Hadoop the Python map/reduce pair and the single-step mapper take about the same time to process all 1218 station files: around 12 min.
If we run the Python directly from the command line on all station files it takes far less, ~30 s, which is more in line with the awk script we tried at the beginning.
Hadoop incurs an overhead that doesn't pay off unless we use many processors. We need to confirm this explicitly.