Σε αυτό το σεμινάριο, θα μάθετε να χρησιμοποιείτε το Hadoop με παραδείγματα MapReduce. Τα δεδομένα εισαγωγής που χρησιμοποιούνται είναι SalesJan2009.csv. Περιέχει πληροφορίες σχετικά με τις πωλήσεις όπως όνομα προϊόντος, τιμή, τρόπος πληρωμής, πόλη, χώρα πελάτη κ.λπ. Ο στόχος είναι να μάθετε τον αριθμό των προϊόντων που πωλούνται σε κάθε χώρα.
Σε αυτό το σεμινάριο, θα μάθετε-
- Πρώτο πρόγραμμα HadRop MapReduce
- Επεξήγηση της κατηγορίας SalesMapper
- Επεξήγηση της κλάσης SalesCountryReducer
- Επεξήγηση της κλάσης SalesCountryDriver
Πρώτο πρόγραμμα HadRop MapReduce
Τώρα σε αυτό το σεμινάριο MapReduce, θα δημιουργήσουμε το πρώτο μας πρόγραμμα Java MapReduce:
Βεβαιωθείτε ότι έχετε εγκαταστήσει το Hadoop. Πριν ξεκινήσετε με την πραγματική διαδικασία, αλλάξτε τον χρήστη σε «hduser» (το αναγνωριστικό που χρησιμοποιείται κατά τη διαμόρφωση Hadoop, μπορείτε να μεταβείτε στο userid που χρησιμοποιήθηκε κατά τη διαμόρφωση προγραμματισμού Hadoop).
su - hduser_
Βήμα 1)
Δημιουργήστε έναν νέο κατάλογο με το όνομα MapReduceTutorial ως shwon στο παρακάτω παράδειγμα MapReduce
sudo mkdir MapReduceTutorial
Δώστε δικαιώματα
sudo chmod -R 777 MapReduceTutorial
SalesMapper.java
package SalesCountry;import java.io.IOException;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapred.*;public class SalesMapper extends MapReduceBase implements Mapper{private final static IntWritable one = new IntWritable(1);public void map(LongWritable key, Text value, OutputCollector output, Reporter reporter) throws IOException {String valueString = value.toString();String[] SingleCountryData = valueString.split(",");output.collect(new Text(SingleCountryData[7]), one);}}
SalesCountryReducer.java
package SalesCountry;import java.io.IOException;import java.util.*;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapred.*;public class SalesCountryReducer extends MapReduceBase implements Reducer{public void reduce(Text t_key, Iterator values, OutputCollector output, Reporter reporter) throws IOException {Text key = t_key;int frequencyForCountry = 0;while (values.hasNext()) {// replace type of value with the actual type of our valueIntWritable value = (IntWritable) values.next();frequencyForCountry += value.get();}output.collect(key, new IntWritable(frequencyForCountry));}}
SalesCountryDriver.java
package SalesCountry;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.*;import org.apache.hadoop.mapred.*;public class SalesCountryDriver {public static void main(String[] args) {JobClient my_client = new JobClient();// Create a configuration object for the jobJobConf job_conf = new JobConf(SalesCountryDriver.class);// Set a name of the Jobjob_conf.setJobName("SalePerCountry");// Specify data type of output key and valuejob_conf.setOutputKeyClass(Text.class);job_conf.setOutputValueClass(IntWritable.class);// Specify names of Mapper and Reducer Classjob_conf.setMapperClass(SalesCountry.SalesMapper.class);job_conf.setReducerClass(SalesCountry.SalesCountryReducer.class);// Specify formats of the data type of Input and outputjob_conf.setInputFormat(TextInputFormat.class);job_conf.setOutputFormat(TextOutputFormat.class);// Set input and output directories using command line arguments,//arg[0] = name of input directory on HDFS, and arg[1] = name of output directory to be created to store the output file.FileInputFormat.setInputPaths(job_conf, new Path(args[0]));FileOutputFormat.setOutputPath(job_conf, new Path(args[1]));my_client.setConf(job_conf);try {// Run the jobJobClient.runJob(job_conf);} catch (Exception e) {e.printStackTrace();}}}
Λήψη αρχείων εδώ
Ελέγξτε τα δικαιώματα αρχείων όλων αυτών των αρχείων
και εάν λείπουν δικαιώματα «ανάγνωσης», παραχωρήστε το ίδιο
Βήμα 2)
Εξαγωγή classpath όπως φαίνεται στο παρακάτω παράδειγμα Hadoop
export CLASSPATH="$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.2.0.jar:$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-common-2.2.0.jar:$HADOOP_HOME/share/hadoop/common/hadoop-common-2.2.0.jar:~/MapReduceTutorial/SalesCountry/*:$HADOOP_HOME/lib/*"
Βήμα 3)
Μεταγλώττιση αρχείων Java (αυτά τα αρχεία υπάρχουν στον κατάλογο Final-MapReduceHandsOn ). Τα αρχεία της κατηγορίας θα τοποθετηθούν στον κατάλογο πακέτων
javac -d . SalesMapper.java SalesCountryReducer.java SalesCountryDriver.java
Αυτή η προειδοποίηση μπορεί να αγνοηθεί με ασφάλεια.
Αυτή η συλλογή θα δημιουργήσει έναν κατάλογο σε έναν τρέχοντα κατάλογο που θα ονομάζεται με το όνομα πακέτου που καθορίζεται στο αρχείο προέλευσης java (δηλ. SalesCountry στην περίπτωσή μας) και θα βάλει όλα τα μεταγλωττισμένα αρχεία κλάσης σε αυτό.
Βήμα 4)
Δημιουργήστε ένα νέο αρχείο Manifest.txt
sudo gedit Manifest.txt
προσθέστε τις ακόλουθες γραμμές σε αυτό,
Main-Class: SalesCountry.SalesCountryDriver
SalesCountry.SalesCountryDriver είναι το όνομα της κύριας τάξης. Λάβετε υπόψη ότι πρέπει να πατήσετε το πλήκτρο enter στο τέλος αυτής της γραμμής.
Βήμα 5)
Δημιουργήστε ένα αρχείο Jar
jar cfm ProductSalePerCountry.jar Manifest.txt SalesCountry/*.class
Ελέγξτε ότι έχει δημιουργηθεί το αρχείο βάζου
Βήμα 6)
Ξεκινήστε το Hadoop
$HADOOP_HOME/sbin/start-dfs.sh
$HADOOP_HOME/sbin/start-yarn.sh
Βήμα 7)
Αντιγράψτε το αρχείο SalesJan2009.csv στο ~ / inputMapReduce
Τώρα χρησιμοποιήστε την παρακάτω εντολή για να αντιγράψετε ~ / inputMapReduce σε HDFS.
$HADOOP_HOME/bin/hdfs dfs -copyFromLocal ~/inputMapReduce /
Μπορούμε να αγνοήσουμε με ασφάλεια αυτήν την προειδοποίηση.
Επαληθεύστε εάν ένα αρχείο έχει αντιγραφεί ή όχι.
$HADOOP_HOME/bin/hdfs dfs -ls /inputMapReduce
Βήμα 8)
Εκτελέστε την εργασία MapReduce
$HADOOP_HOME/bin/hadoop jar ProductSalePerCountry.jar /inputMapReduce /mapreduce_output_sales
Αυτό θα δημιουργήσει έναν κατάλογο εξόδου με το όνομα mapreduce_output_sales στο HDFS. Τα περιεχόμενα αυτού του καταλόγου θα είναι ένα αρχείο που περιέχει πωλήσεις προϊόντων ανά χώρα.
Βήμα 9)
Το αποτέλεσμα μπορεί να φανεί μέσω διεπαφής εντολών ως,
$HADOOP_HOME/bin/hdfs dfs -cat /mapreduce_output_sales/part-00000
Τα αποτελέσματα μπορούν επίσης να δουν μέσω μιας διεπαφής Ιστού ως
Ανοίξτε το r σε ένα πρόγραμμα περιήγησης ιστού.
Τώρα επιλέξτε «Περιήγηση στο σύστημα αρχείων» και πλοηγηθείτε στο / mapreduce_output_sales
Ανοίξτε το μέρος-r-00000
Επεξήγηση της κατηγορίας SalesMapper
Σε αυτήν την ενότητα, θα κατανοήσουμε την εφαρμογή της κλάσης SalesMapper .
1. Ξεκινάμε καθορίζοντας ένα όνομα πακέτου για την τάξη μας. Το SalesCountry είναι ένα όνομα του πακέτου μας. Σημειώστε ότι η έξοδος της συλλογής, το SalesMapper.class θα μεταβεί σε έναν κατάλογο που ονομάζεται με αυτό το όνομα πακέτου: SalesCountry .
Μετά από αυτό, εισάγουμε πακέτα βιβλιοθηκών.
Παρακάτω στιγμιότυπο δείχνει μια υλοποίηση της SalesMapper ταξικές
Επεξήγηση δείγματος κώδικα:
1. Ορισμός κατηγορίας SalesMapper-
Δημόσια τάξη SalesMapper επεκτείνει το MapReduceBase υλοποιεί το Mapper
Κάθε τάξη χαρτογράφησης πρέπει να επεκταθεί από την κατηγορία MapReduceBase και πρέπει να εφαρμόσει τη διεπαφή Mapper .
2. Ορισμός της λειτουργίας «χάρτη »-
public void map(LongWritable key,Text value,OutputCollectoroutput,Reporter reporter) throws IOException
Το κύριο μέρος της κλάσης Mapper είναι μια μέθοδος «map ()» που δέχεται τέσσερα επιχειρήματα.
Σε κάθε μέθοδο κλήσης σε «χάρτη ()» , ένα ζεύγος κλειδιού-τιμής ( «κλειδί» και «τιμή» σε αυτόν τον κώδικα) μεταβιβάζεται.
Η μέθοδος 'map ()' ξεκινά διαχωρίζοντας το κείμενο εισαγωγής που λαμβάνεται ως επιχείρημα. Χρησιμοποιεί το tokenizer για να χωρίσει αυτές τις γραμμές σε λέξεις.
String valueString = value.toString();String[] SingleCountryData = valueString.split(",");
Εδώ, το "," χρησιμοποιείται ως οριοθέτης.
Μετά από αυτό, σχηματίζεται ένα ζεύγος χρησιμοποιώντας μια εγγραφή στον 7ο δείκτη του πίνακα «SingleCountryData» και μια τιμή «1» .
output.collect (νέο κείμενο (SingleCountryData [7]), ένα);
Επιλέγουμε εγγραφή στο 7ο ευρετήριο επειδή χρειαζόμαστε δεδομένα χώρας και βρίσκεται στο 7ο ευρετήριο στον πίνακα «SingleCountryData» .
Παρακαλείστε να σημειώσετε ότι τα δεδομένα εισόδου μας στην παρακάτω μορφή (όπου Χώρα βρίσκεται στο 7 ο δείκτης, με το 0 ως δείκτης εκκίνησης) -
Ημερομηνία συναλλαγής, Προϊόν, Τιμή, Τύπος_ Πληρωμής, Όνομα, Πόλη, Πολιτεία, Χώρα , Λογαριασμός_Δημιούργησε, Τελευταία_Login, Γεωγραφικό πλάτος, Μήκος
Η έξοδος του mapper είναι και πάλι ένα ζεύγος κλειδιού-τιμής που εξάγεται χρησιμοποιώντας τη μέθοδο «collect ()» του «OutputCollector» .
Επεξήγηση της κλάσης SalesCountryReducer
Σε αυτήν την ενότητα, θα κατανοήσουμε την εφαρμογή της κλάσης SalesCountryReducer .
1. Ξεκινάμε καθορίζοντας ένα όνομα του πακέτου για την τάξη μας. Το SalesCountry είναι ένα όνομα πακέτου εκτός. Λάβετε υπόψη ότι το αποτέλεσμα της συλλογής, το SalesCountryReducer.class θα μεταβεί σε έναν κατάλογο που ονομάζεται με αυτό το όνομα πακέτου: SalesCountry .
Μετά από αυτό, εισάγουμε πακέτα βιβλιοθηκών.
Παρακάτω στιγμιότυπο δείχνει μια υλοποίηση της SalesCountryReducer ταξικές
Επεξήγηση κώδικα:
1. Ορισμός κατηγορίας SalesCountryReducer-
δημόσια κλάση SalesCountryReducer επεκτείνει το MapReduceBase υλοποιεί Reducer
Εδώ, οι δύο πρώτοι τύποι δεδομένων, «Κείμενο» και «IntWritable» είναι τύπος δεδομένων εισόδου κλειδιού-τιμής στον μειωτή.
Η έξοδος του mapper έχει τη μορφή
Οι δύο τελευταίοι τύποι δεδομένων, «Κείμενο» και «IntWritable» είναι τύπος δεδομένων εξόδου που παράγεται από μειωτή με τη μορφή ζεύγους κλειδιού-τιμής.
Κάθε κλάση μειωτή πρέπει να επεκταθεί από την κατηγορία MapReduceBase και πρέπει να εφαρμόσει διεπαφή Reducer .
2. Ορισμός της λειτουργίας «μείωση»
public void reduce( Text t_key,Iteratorvalues,OutputCollector output,Reporter reporter) throws IOException {
Μια εισαγωγή στη μέθοδο μείωσης () είναι ένα κλειδί με μια λίστα πολλαπλών τιμών.
Για παράδειγμα, στην περίπτωσή μας, θα είναι-
<Ηνωμένα Αραβικά Εμιράτα, 1>, <Ηνωμένα Αραβικά Εμιράτα, 1>, <Ηνωμένα Αραβικά Εμιράτα, 1>, <Ηνωμένα Αραβικά Εμιράτα, 1>, <Ηνωμένα Αραβικά Εμιράτα, 1>, <Ηνωμένα Αραβικά Εμιράτα, 1>.
Αυτό δίνεται στον μειωτή ως <Ηνωμένα Αραβικά Εμιράτα, {1,1,1,1,1,1}>
Έτσι, για να αποδεχτείτε επιχειρήματα αυτής της φόρμας, χρησιμοποιούνται δύο πρώτοι τύποι δεδομένων, δηλαδή, Κείμενο και Iterator
Το επόμενο όρισμα είναι του τύπου OutputCollector
η μέθοδος μείωση () ξεκινά αντιγράφοντας την τιμή κλειδιού και αρχικοποιώντας τον αριθμό συχνότητας στο 0
Κλειδί κειμένου = t_key; int συχνότηταForCountry = 0;
Στη συνέχεια, χρησιμοποιώντας το βρόχο « while» , επαναλαμβάνουμε τη λίστα τιμών που σχετίζονται με το κλειδί και υπολογίζουμε την τελική συχνότητα αθροίζοντας όλες τις τιμές.
while (values.hasNext()) {// replace type of value with the actual type of our valueIntWritable value = (IntWritable) values.next();frequencyForCountry += value.get();}
Τώρα, προωθούμε το αποτέλεσμα στον συλλέκτη εξόδου με τη μορφή κλειδιού και αποκτήσαμε τον αριθμό συχνότητας .
Ο παρακάτω κώδικας κάνει αυτό-
output.collect(key, new IntWritable(frequencyForCountry));
Επεξήγηση της κλάσης SalesCountryDriver
Σε αυτήν την ενότητα, θα κατανοήσουμε την εφαρμογή της κλάσης SalesCountryDriver
1. Ξεκινάμε καθορίζοντας ένα όνομα πακέτου για την τάξη μας. Το SalesCountry είναι ένα όνομα πακέτου εκτός. Λάβετε υπόψη ότι η έξοδος της συλλογής, το SalesCountryDriver.class θα μεταβεί στον κατάλογο που ονομάζεται με αυτό το όνομα πακέτου: SalesCountry .
Ακολουθεί μια γραμμή που καθορίζει το όνομα του πακέτου και ακολουθεί ο κωδικός για την εισαγωγή πακέτων βιβλιοθήκης.
2. Ορίστε μια κλάση προγράμματος οδήγησης που θα δημιουργήσει μια νέα εργασία πελάτη, ένα αντικείμενο διαμόρφωσης και θα διαφημίσει τις κατηγορίες Mapper και Reducer.
Η τάξη οδηγού είναι υπεύθυνη για τη ρύθμιση της εργασίας MapReduce στο Hadoop. Σε αυτήν την τάξη, καθορίζουμε το όνομα εργασίας, τον τύπο δεδομένων εισόδου / εξόδου και τα ονόματα των τάξεων χαρτών και μειωτών .
3. Στο παρακάτω απόσπασμα κώδικα, ορίζουμε καταλόγους εισόδου και εξόδου που χρησιμοποιούνται για την κατανάλωση συνόλου δεδομένων εισόδου και την παραγωγή εξόδου, αντίστοιχα.
arg [0] και arg [1] είναι τα ορίσματα γραμμής εντολών που περνούν με μια εντολή που δίνεται στο MapReduce hands-on, δηλαδή,
$ HADOOP_HOME / bin / hadoop jar ProductSalePerCountry.jar / inputMapReduce / mapreduce_output_sales
4. Ενεργοποιήστε τη δουλειά μας
Κάτω από τον κώδικα ξεκινήστε την εκτέλεση της εργασίας MapReduce-
try {// Run the jobJobClient.runJob(job_conf);} catch (Exception e) {e.printStackTrace();}