Hadoop & Παραδείγματα Mapreduce: Δημιουργία πρώτου προγράμματος στην Java

Πίνακας περιεχομένων:

Anonim

Σε αυτό το σεμινάριο, θα μάθετε να χρησιμοποιείτε το Hadoop με παραδείγματα MapReduce. Τα δεδομένα εισαγωγής που χρησιμοποιούνται είναι SalesJan2009.csv. Περιέχει πληροφορίες σχετικά με τις πωλήσεις όπως όνομα προϊόντος, τιμή, τρόπος πληρωμής, πόλη, χώρα πελάτη κ.λπ. Ο στόχος είναι να μάθετε τον αριθμό των προϊόντων που πωλούνται σε κάθε χώρα.

Σε αυτό το σεμινάριο, θα μάθετε-

  • Πρώτο πρόγραμμα HadRop MapReduce
  • Επεξήγηση της κατηγορίας SalesMapper
  • Επεξήγηση της κλάσης SalesCountryReducer
  • Επεξήγηση της κλάσης SalesCountryDriver

Πρώτο πρόγραμμα HadRop MapReduce

Τώρα σε αυτό το σεμινάριο MapReduce, θα δημιουργήσουμε το πρώτο μας πρόγραμμα Java MapReduce:

Δεδομένα του SalesJan2009

Βεβαιωθείτε ότι έχετε εγκαταστήσει το Hadoop. Πριν ξεκινήσετε με την πραγματική διαδικασία, αλλάξτε τον χρήστη σε «hduser» (το αναγνωριστικό που χρησιμοποιείται κατά τη διαμόρφωση Hadoop, μπορείτε να μεταβείτε στο userid που χρησιμοποιήθηκε κατά τη διαμόρφωση προγραμματισμού Hadoop).

su - hduser_

Βήμα 1)

Δημιουργήστε έναν νέο κατάλογο με το όνομα MapReduceTutorial ως shwon στο παρακάτω παράδειγμα MapReduce

sudo mkdir MapReduceTutorial

Δώστε δικαιώματα

sudo chmod -R 777 MapReduceTutorial

SalesMapper.java

package SalesCountry;import java.io.IOException;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapred.*;public class SalesMapper extends MapReduceBase implements Mapper  {private final static IntWritable one = new IntWritable(1);public void map(LongWritable key, Text value, OutputCollector  output, Reporter reporter) throws IOException {String valueString = value.toString();String[] SingleCountryData = valueString.split(",");output.collect(new Text(SingleCountryData[7]), one);}}

SalesCountryReducer.java

package SalesCountry;import java.io.IOException;import java.util.*;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapred.*;public class SalesCountryReducer extends MapReduceBase implements Reducer {public void reduce(Text t_key, Iterator values, OutputCollector output, Reporter reporter) throws IOException {Text key = t_key;int frequencyForCountry = 0;while (values.hasNext()) {// replace type of value with the actual type of our valueIntWritable value = (IntWritable) values.next();frequencyForCountry += value.get();}output.collect(key, new IntWritable(frequencyForCountry));}}

SalesCountryDriver.java

package SalesCountry;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.*;import org.apache.hadoop.mapred.*;public class SalesCountryDriver {public static void main(String[] args) {JobClient my_client = new JobClient();// Create a configuration object for the jobJobConf job_conf = new JobConf(SalesCountryDriver.class);// Set a name of the Jobjob_conf.setJobName("SalePerCountry");// Specify data type of output key and valuejob_conf.setOutputKeyClass(Text.class);job_conf.setOutputValueClass(IntWritable.class);// Specify names of Mapper and Reducer Classjob_conf.setMapperClass(SalesCountry.SalesMapper.class);job_conf.setReducerClass(SalesCountry.SalesCountryReducer.class);// Specify formats of the data type of Input and outputjob_conf.setInputFormat(TextInputFormat.class);job_conf.setOutputFormat(TextOutputFormat.class);// Set input and output directories using command line arguments,//arg[0] = name of input directory on HDFS, and arg[1] = name of output directory to be created to store the output file.FileInputFormat.setInputPaths(job_conf, new Path(args[0]));FileOutputFormat.setOutputPath(job_conf, new Path(args[1]));my_client.setConf(job_conf);try {// Run the jobJobClient.runJob(job_conf);} catch (Exception e) {e.printStackTrace();}}}

Λήψη αρχείων εδώ

Ελέγξτε τα δικαιώματα αρχείων όλων αυτών των αρχείων

και εάν λείπουν δικαιώματα «ανάγνωσης», παραχωρήστε το ίδιο

Βήμα 2)

Εξαγωγή classpath όπως φαίνεται στο παρακάτω παράδειγμα Hadoop

export CLASSPATH="$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.2.0.jar:$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-common-2.2.0.jar:$HADOOP_HOME/share/hadoop/common/hadoop-common-2.2.0.jar:~/MapReduceTutorial/SalesCountry/*:$HADOOP_HOME/lib/*"

Βήμα 3)

Μεταγλώττιση αρχείων Java (αυτά τα αρχεία υπάρχουν στον κατάλογο Final-MapReduceHandsOn ). Τα αρχεία της κατηγορίας θα τοποθετηθούν στον κατάλογο πακέτων

javac -d . SalesMapper.java SalesCountryReducer.java SalesCountryDriver.java

Αυτή η προειδοποίηση μπορεί να αγνοηθεί με ασφάλεια.

Αυτή η συλλογή θα δημιουργήσει έναν κατάλογο σε έναν τρέχοντα κατάλογο που θα ονομάζεται με το όνομα πακέτου που καθορίζεται στο αρχείο προέλευσης java (δηλ. SalesCountry στην περίπτωσή μας) και θα βάλει όλα τα μεταγλωττισμένα αρχεία κλάσης σε αυτό.

Βήμα 4)

Δημιουργήστε ένα νέο αρχείο Manifest.txt

sudo gedit Manifest.txt

προσθέστε τις ακόλουθες γραμμές σε αυτό,

Main-Class: SalesCountry.SalesCountryDriver

SalesCountry.SalesCountryDriver είναι το όνομα της κύριας τάξης. Λάβετε υπόψη ότι πρέπει να πατήσετε το πλήκτρο enter στο τέλος αυτής της γραμμής.

Βήμα 5)

Δημιουργήστε ένα αρχείο Jar

jar cfm ProductSalePerCountry.jar Manifest.txt SalesCountry/*.class

Ελέγξτε ότι έχει δημιουργηθεί το αρχείο βάζου

Βήμα 6)

Ξεκινήστε το Hadoop

$HADOOP_HOME/sbin/start-dfs.sh
$HADOOP_HOME/sbin/start-yarn.sh

Βήμα 7)

Αντιγράψτε το αρχείο SalesJan2009.csv στο ~ / inputMapReduce

Τώρα χρησιμοποιήστε την παρακάτω εντολή για να αντιγράψετε ~ / inputMapReduce σε HDFS.

$HADOOP_HOME/bin/hdfs dfs -copyFromLocal ~/inputMapReduce /

Μπορούμε να αγνοήσουμε με ασφάλεια αυτήν την προειδοποίηση.

Επαληθεύστε εάν ένα αρχείο έχει αντιγραφεί ή όχι.

$HADOOP_HOME/bin/hdfs dfs -ls /inputMapReduce

Βήμα 8)

Εκτελέστε την εργασία MapReduce

$HADOOP_HOME/bin/hadoop jar ProductSalePerCountry.jar /inputMapReduce /mapreduce_output_sales

Αυτό θα δημιουργήσει έναν κατάλογο εξόδου με το όνομα mapreduce_output_sales στο HDFS. Τα περιεχόμενα αυτού του καταλόγου θα είναι ένα αρχείο που περιέχει πωλήσεις προϊόντων ανά χώρα.

Βήμα 9)

Το αποτέλεσμα μπορεί να φανεί μέσω διεπαφής εντολών ως,

$HADOOP_HOME/bin/hdfs dfs -cat /mapreduce_output_sales/part-00000

Τα αποτελέσματα μπορούν επίσης να δουν μέσω μιας διεπαφής Ιστού ως

Ανοίξτε το r σε ένα πρόγραμμα περιήγησης ιστού.

Τώρα επιλέξτε «Περιήγηση στο σύστημα αρχείων» και πλοηγηθείτε στο / mapreduce_output_sales

Ανοίξτε το μέρος-r-00000

Επεξήγηση της κατηγορίας SalesMapper

Σε αυτήν την ενότητα, θα κατανοήσουμε την εφαρμογή της κλάσης SalesMapper .

1. Ξεκινάμε καθορίζοντας ένα όνομα πακέτου για την τάξη μας. Το SalesCountry είναι ένα όνομα του πακέτου μας. Σημειώστε ότι η έξοδος της συλλογής, το SalesMapper.class θα μεταβεί σε έναν κατάλογο που ονομάζεται με αυτό το όνομα πακέτου: SalesCountry .

Μετά από αυτό, εισάγουμε πακέτα βιβλιοθηκών.

Παρακάτω στιγμιότυπο δείχνει μια υλοποίηση της SalesMapper ταξικές

Επεξήγηση δείγματος κώδικα:

1. Ορισμός κατηγορίας SalesMapper-

Δημόσια τάξη SalesMapper επεκτείνει το MapReduceBase υλοποιεί το Mapper {

Κάθε τάξη χαρτογράφησης πρέπει να επεκταθεί από την κατηγορία MapReduceBase και πρέπει να εφαρμόσει τη διεπαφή Mapper .

2. Ορισμός της λειτουργίας «χάρτη »-

public void map(LongWritable key,Text value,OutputCollector output,Reporter reporter) throws IOException

Το κύριο μέρος της κλάσης Mapper είναι μια μέθοδος «map ()» που δέχεται τέσσερα επιχειρήματα.

Σε κάθε μέθοδο κλήσης σε «χάρτη ()» , ένα ζεύγος κλειδιού-τιμής ( «κλειδί» και «τιμή» σε αυτόν τον κώδικα) μεταβιβάζεται.

Η μέθοδος 'map ()' ξεκινά διαχωρίζοντας το κείμενο εισαγωγής που λαμβάνεται ως επιχείρημα. Χρησιμοποιεί το tokenizer για να χωρίσει αυτές τις γραμμές σε λέξεις.

String valueString = value.toString();String[] SingleCountryData = valueString.split(",");

Εδώ, το "," χρησιμοποιείται ως οριοθέτης.

Μετά από αυτό, σχηματίζεται ένα ζεύγος χρησιμοποιώντας μια εγγραφή στον 7ο δείκτη του πίνακα «SingleCountryData» και μια τιμή «1» .

output.collect (νέο κείμενο (SingleCountryData [7]), ένα);

Επιλέγουμε εγγραφή στο 7ο ευρετήριο επειδή χρειαζόμαστε δεδομένα χώρας και βρίσκεται στο 7ο ευρετήριο στον πίνακα «SingleCountryData» .

Παρακαλείστε να σημειώσετε ότι τα δεδομένα εισόδου μας στην παρακάτω μορφή (όπου Χώρα βρίσκεται στο 7 ο δείκτης, με το 0 ως δείκτης εκκίνησης) -

Ημερομηνία συναλλαγής, Προϊόν, Τιμή, Τύπος_ Πληρωμής, Όνομα, Πόλη, Πολιτεία, Χώρα , Λογαριασμός_Δημιούργησε, Τελευταία_Login, Γεωγραφικό πλάτος, Μήκος

Η έξοδος του mapper είναι και πάλι ένα ζεύγος κλειδιού-τιμής που εξάγεται χρησιμοποιώντας τη μέθοδο «collect ()» του «OutputCollector» .

Επεξήγηση της κλάσης SalesCountryReducer

Σε αυτήν την ενότητα, θα κατανοήσουμε την εφαρμογή της κλάσης SalesCountryReducer .

1. Ξεκινάμε καθορίζοντας ένα όνομα του πακέτου για την τάξη μας. Το SalesCountry είναι ένα όνομα πακέτου εκτός. Λάβετε υπόψη ότι το αποτέλεσμα της συλλογής, το SalesCountryReducer.class θα μεταβεί σε έναν κατάλογο που ονομάζεται με αυτό το όνομα πακέτου: SalesCountry .

Μετά από αυτό, εισάγουμε πακέτα βιβλιοθηκών.

Παρακάτω στιγμιότυπο δείχνει μια υλοποίηση της SalesCountryReducer ταξικές

Επεξήγηση κώδικα:

1. Ορισμός κατηγορίας SalesCountryReducer-

δημόσια κλάση SalesCountryReducer επεκτείνει το MapReduceBase υλοποιεί Reducer {

Εδώ, οι δύο πρώτοι τύποι δεδομένων, «Κείμενο» και «IntWritable» είναι τύπος δεδομένων εισόδου κλειδιού-τιμής στον μειωτή.

Η έξοδος του mapper έχει τη μορφή , . Αυτή η έξοδος του mapper γίνεται είσοδος στον μειωτή. Έτσι, για να ευθυγραμμιστούν με τον τύπο δεδομένων του, το κείμενο και το IntWritable χρησιμοποιούνται ως τύπος δεδομένων εδώ

Οι δύο τελευταίοι τύποι δεδομένων, «Κείμενο» και «IntWritable» είναι τύπος δεδομένων εξόδου που παράγεται από μειωτή με τη μορφή ζεύγους κλειδιού-τιμής.

Κάθε κλάση μειωτή πρέπει να επεκταθεί από την κατηγορία MapReduceBase και πρέπει να εφαρμόσει διεπαφή Reducer .

2. Ορισμός της λειτουργίας «μείωση»

public void reduce( Text t_key,Iterator values,OutputCollector output,Reporter reporter) throws IOException {

Μια εισαγωγή στη μέθοδο μείωσης () είναι ένα κλειδί με μια λίστα πολλαπλών τιμών.

Για παράδειγμα, στην περίπτωσή μας, θα είναι-

<Ηνωμένα Αραβικά Εμιράτα, 1>, <Ηνωμένα Αραβικά Εμιράτα, 1>, <Ηνωμένα Αραβικά Εμιράτα, 1>, <Ηνωμένα Αραβικά Εμιράτα, 1>, <Ηνωμένα Αραβικά Εμιράτα, 1>, <Ηνωμένα Αραβικά Εμιράτα, 1>.

Αυτό δίνεται στον μειωτή ως <Ηνωμένα Αραβικά Εμιράτα, {1,1,1,1,1,1}>

Έτσι, για να αποδεχτείτε επιχειρήματα αυτής της φόρμας, χρησιμοποιούνται δύο πρώτοι τύποι δεδομένων, δηλαδή, Κείμενο και Iterator . Το κείμενο είναι ένας τύπος δεδομένων κλειδιού και το Iterator είναι ένας τύπος δεδομένων για μια λίστα τιμών για αυτό το κλειδί.

Το επόμενο όρισμα είναι του τύπου OutputCollector που συλλέγει την έξοδο της φάσης μειωτή.

η μέθοδος μείωση () ξεκινά αντιγράφοντας την τιμή κλειδιού και αρχικοποιώντας τον αριθμό συχνότητας στο 0

Κλειδί κειμένου = t_key; int συχνότηταForCountry = 0;

Στη συνέχεια, χρησιμοποιώντας το βρόχο « while» , επαναλαμβάνουμε τη λίστα τιμών που σχετίζονται με το κλειδί και υπολογίζουμε την τελική συχνότητα αθροίζοντας όλες τις τιμές.

 while (values.hasNext()) {// replace type of value with the actual type of our valueIntWritable value = (IntWritable) values.next();frequencyForCountry += value.get();}

Τώρα, προωθούμε το αποτέλεσμα στον συλλέκτη εξόδου με τη μορφή κλειδιού και αποκτήσαμε τον αριθμό συχνότητας .

Ο παρακάτω κώδικας κάνει αυτό-

output.collect(key, new IntWritable(frequencyForCountry));

Επεξήγηση της κλάσης SalesCountryDriver

Σε αυτήν την ενότητα, θα κατανοήσουμε την εφαρμογή της κλάσης SalesCountryDriver

1. Ξεκινάμε καθορίζοντας ένα όνομα πακέτου για την τάξη μας. Το SalesCountry είναι ένα όνομα πακέτου εκτός. Λάβετε υπόψη ότι η έξοδος της συλλογής, το SalesCountryDriver.class θα μεταβεί στον κατάλογο που ονομάζεται με αυτό το όνομα πακέτου: SalesCountry .

Ακολουθεί μια γραμμή που καθορίζει το όνομα του πακέτου και ακολουθεί ο κωδικός για την εισαγωγή πακέτων βιβλιοθήκης.

2. Ορίστε μια κλάση προγράμματος οδήγησης που θα δημιουργήσει μια νέα εργασία πελάτη, ένα αντικείμενο διαμόρφωσης και θα διαφημίσει τις κατηγορίες Mapper και Reducer.

Η τάξη οδηγού είναι υπεύθυνη για τη ρύθμιση της εργασίας MapReduce στο Hadoop. Σε αυτήν την τάξη, καθορίζουμε το όνομα εργασίας, τον τύπο δεδομένων εισόδου / εξόδου και τα ονόματα των τάξεων χαρτών και μειωτών .

3. Στο παρακάτω απόσπασμα κώδικα, ορίζουμε καταλόγους εισόδου και εξόδου που χρησιμοποιούνται για την κατανάλωση συνόλου δεδομένων εισόδου και την παραγωγή εξόδου, αντίστοιχα.

arg [0] και arg [1] είναι τα ορίσματα γραμμής εντολών που περνούν με μια εντολή που δίνεται στο MapReduce hands-on, δηλαδή,

$ HADOOP_HOME / bin / hadoop jar ProductSalePerCountry.jar / inputMapReduce / mapreduce_output_sales

4. Ενεργοποιήστε τη δουλειά μας

Κάτω από τον κώδικα ξεκινήστε την εκτέλεση της εργασίας MapReduce-

try {// Run the jobJobClient.runJob(job_conf);} catch (Exception e) {e.printStackTrace();}