- MapReduce
-
MapReduce ist ein von Google Inc. eingeführtes Framework für nebenläufige Berechnungen über große (mehrere Petabyte[1]) Datenmengen auf Computerclustern. Dieses Framework wurde durch die in der funktionalen Programmierung häufig verwendeten Funktionen map und reduce inspiriert,[2] auch wenn die Semantik des Frameworks von diesen abweicht.[3] 2010 wurde für MapReduce ein US-Patent erteilt[4]
MapReduce-Implementierungen wurden in C++, Erlang, Java, Python und vielen anderen Programmiersprachen realisiert.
Inhaltsverzeichnis
Arbeitsweise
Illustration des Datenflusses
Das obige Bild illustriert den Datenfluss bei der MapReduce-Berechnung.
- Die Eingabedaten ("D, A, T, A") werden auf eine Reihe von Map-Prozessen verteilt (bunte Rechtecke), welche jeweils die vom Nutzer bereitgestellte Map-Funktion berechnen.
- Die Map-Prozesse werden idealerweise parallel ausgeführt.
- Jede dieser Map-Instanzen legt Zwischenergebnisse ab (dargestellt durch die gelben Dreiecke).
- Von jeder Map-Instanz fließen Daten in eventuell verschiedene Zwischenergebnisspeicher.
- Sind alle Zwischenergebnisse berechnet, ist diese sogenannte Map-Phase beendet und die Reduce-Phase beginnt.
- Für jeden Satz an Zwischenergebnissen berechnet jeweils genau ein Reduce-Prozess (graue Rechtecke) die vom Nutzer bereitgestellte Reduce-Funktion und damit die Ausgabedaten (violette Ovale).
- Die Reduce-Prozesse werden idealerweise auch parallel ausgeführt.
Definition der MapReduce-Funktion
Das MapReduce-Framework realisiert eine Funktion, welche aus einer Liste von Schlüssel-Wert-Paaren (Eingabeliste) eine neue Liste von Schlüssel-Wert-Paaren (Ausgabeliste) berechnet:
Erläuterung:
- Die Mengen K und L enthalten Schlüssel, die Mengen V und W enthalten Werte.
- Alle Schlüssel sind vom gleichen Typ, z. B. Strings.
- Alle Schlüssel sind vom gleichen Typ, z. B. ganze Zahlen.
- Alle Werte sind vom gleichen Typ, z. B. Atome.
- Alle Werte sind vom gleichen Typ, z. B. Gleitkommazahlen.
- Wenn A und B Mengen sind, so ist mit die Menge aller Paare (a,b) gemeint, wobei und (Kartesisches Produkt).
- Wenn M eine Menge ist, so ist mit M * die Menge aller endlichen Listen mit Elementen aus M gemeint (angelehnt an den Kleene-Stern) - die Liste kann auch leer sein.
Definition der Map- und Reduce-Funktionen
Der Nutzer konfiguriert das Framework über die Bereitstellung der beiden Funktionen Map und Reduce, die wie folgt definiert sind:
bzw.
Map-Phase
- Map bildet ein Paar, bestehend aus einem Schlüssel k und einem Wert v, auf eine Liste von neuen Paaren (lr,xr) ab, welche die Rolle von Zwischenergebnissen spielen, die Werte xr sind vom gleichen Typ wie die Endergebnisse wi.
- Bei einem neuen Paar (l,x) verweist der von Map vergebene Schlüssel l dabei auf eine Liste Tl von Zwischenergebnissen, in welcher der von Map berechnete Wert x gesammelt wird.
- Das Framework ruft für jedes Paar in der Eingabeliste die Funktion Map auf.
- All diese Map-Berechnungen sind voneinander unabhängig, so dass man sie nebenläufig und verteilt auf einem Computercluster ausführen kann.
Reduce-Phase
- Sind alle Map-Aufrufe erfolgt bzw. liegen alle Zwischenergebnisse in Tl vor, so ruft das Framework für jede Zwischenwertliste die Funktion Reduce auf, welche daraus eine Liste von Ergebniswerten wj berechnet, die vom Framework in der Ausgabeliste als Paare (l,wj) gesammelt werden.
- Auch die Aufrufe von Reduce können unabhängig auf verschiedene Prozesse im Computercluster verteilt werden.
Anmerkung: Diese Darstellung war etwas vereinfacht, denn in der Regel wird die Steuerung des MapReduce Verfahrens eine Anzahl R von Reduce-Prozessen anstreben, so dass, wenn es mehr Zwischenergebnisse (l,x) als Reduce-Prozesse gibt, mehrere l Zwischenergebnisse in einer Liste gespeichert werden. Die entsprechenden Paare werden vor der Reduce-Berechnung nach Schlüsseln sortiert.
Combine-Phase
Optional kann zwischen der Map- und der Reduce-Phase noch eine Combine-Phase erfolgen. Diese hat in der Regel die gleiche Funktionalität wie die Reducefunktion, wird aber auf dem gleichen Knoten wie die Map-Phase ausgeführt. Dabei geht es darum, die Netzwerklast zu reduzieren.[5] Der Sinn der Combine-Phase erschließt sich sofort bei der Betrachtung des Wordcount-Beispiels: Auf Grund der unterschiedlichen Häufigkeit von Wörtern in natürlicher Sprache, würde bei einem deutschen Text beispielsweise sehr oft eine Ausgabe der Form ("und", 1) erzeugt (gleiches gilt für Artikel und Hilfsverben). Durch die Combine-Phase wird nun aus 100 Nachrichten der Form ("und", 1) lediglich eine Nachricht der Form ("und", 100). Dies kann die Netzwerkbelastung signifikant reduzieren, ist aber nicht in allen Anwendungsfällen möglich.
Beispiel: Verteilte Häufigkeitsanalyse mit MapReduce
Problem
Man möchte für umfangreiche Texte herausfinden, wie oft welche Wörter vorkommen.
Angabe der Map- und Reduce-Funktionen
map(String name, String document): // name: document name // document: document contents for each word w in document: EmitIntermediate(w, 1); reduce(String word, Iterator partialCounts): // word: a word // partialCounts: a list of aggregated partial counts int result = 0; for each v in partialCounts: result +=v; Emit(result);
Map-Phase
- Map bekommt jeweils einen Dokumentnamen name und ein Dokument document als Zeichenkette übergeben.
- Map durchläuft das Dokument Wort für Wort.
- Jedes Mal, wenn ein Wort w angetroffen wird, wandert eine 1 in die w-Zwischenergebnisliste (falls diese noch nicht existiert, wird sie angelegt).
- Ist man mit allen Wörtern durch und hat der Text insgesamt n verschiedene Wörter, so endet die Map-Phase mit n Zwischenergebnislisten, jede für ein anderes Wort sammelnd, welche so viele 1-Einträge enthält, wie das entsprechende Wort im Dokument gefunden wurde.
- Eventuell liefen viele Map-Instanzen gleichzeitig, falls dem Framework mehrere Worte und Dokumente übergeben wurden.
Reduce-Phase
- Reduce wird für das Wort word und die Zwischenergebnisliste partialCounts aufgerufen.
- Reduce durchläuft die Zwischenergebnisliste und addiert alle gefundenen Zahlen auf.
- Die Summe result wird an das Framework zurückgegeben, sie enthält, wie oft das Wort word im Dokument gefunden wurde.
- Die Zwischenergebnisse konnten parallel, durch gleichzeitige Reduce-Aufrufe, berechnet werden.
Insgesamt:
- Aus einer Liste von Dokumentnamen und Dokumenten, wird eine Liste von Worten und Worthäufigkeiten generiert.
Beispielhafte Berechnung
Zum Beispiel wäre folgende Berechnung auf einem klassischen Text denkbar:
Text = "Fest gemauert in der Erden
Steht die Form, aus Lehm gebrannt.
Heute muß die Glocke werden,
Frisch, Gesellen! seid zur Hand.
Von der Stirne heiß
Rinnen muß der Schweiß,
Soll das Werk den Meister loben,
Doch der Segen kommt von oben."Der Text wird in Sätze aufgeteilt, dabei bietet sich eine Normalisierung an, indem man alles klein schreibt und die Satzzeichen entfernt:
Eingabeliste = [ (satz_1, "fest gemauert in der erden steht die form aus lehm gebrannt"), (satz_2, "heute muß die glocke werden frisch gesellen seid zur hand"), (satz_3, "von der stirne heiß rinnen muß der schweiß soll das werk den meister loben doch der " "segen kommt von oben") ]
Die Eingabeliste hat drei Paare als Elemente, wir können daher drei Map-Prozesse starten:
P1 = Map(satz_1, "fest gemauert in der erden steht die form aus lehm gebrannt") P2 = Map(satz_2, "heute muß die glocke werden frisch gesellen seid zur hand") P3 = Map(satz_3, "von der stirne heiß rinnen muß der schweiß soll das werk den meister loben doch der segen " "kommt von oben")
Die Map-Aufrufe generieren diese Zwischenergebnispaare:
P1 = [ ("fest", 1), ("gemauert", 1), ("in", 1), ("der", 1), ("erden", 1), ("steht", 1), ("die", 1), ("form", 1), ("aus", 1), ("lehm, 1), ("gebrannt", 1) ] P2 = [ ("heute", 1), ("muß", 1), ("die", 1), ("glocke", 1), ("werden", 1), ("frisch", 1), ("gesellen", 1), ("seid", 1), ("zur", 1), ("hand", 1) ] P3 = [ ("von", 1), ("der", 1), ("stirne", 1), ("heiß", 1), ("rinnen", 1), ("muß, 1), ("der", 1), ("schweiß", 1), ("soll", 1), ("das", 1), ("werk", 1), ("den", 1), ("meister", 1), ("loben", 1), ("doch", 1), ("der", 1), ("segen", 1), ("kommt", 1), ("von", 1), ("oben", 1) ]
Die Map-Prozesse liefern ihre Paare an das MapReduce Framework, welches diese in den Zwischenergebnislisten sammelt. Parallel könnte folgendes geschehen (Die gleiche Taktung der 3 Map-Prozesse ist unrealistisch, tatsächlich überlappen sich die Ausführungen. Die T_wort-Listen sind lokal pro Map-Prozess vorhanden und werden *nicht* zwischen den Schritten synchronisiert):
1. Schritt: T_fest = [ 1 ], neu angelegt vom Framework T_heute = [ 1 ], neu T_von = [ 1 ], neu 2. Schritt: T_gemauert = [ 1 ], neu T_muß = [ 1 ], neu T_der = [ 1 ], neu 3. Schritt: T_in = [ 1 ], neu T_die = [ 1 ], neu T_stirne = [ 1 ], neu
Im vierten Schritt sieht man, dass Zwischenergebnislisten lokal für jeden Map-Prozess existieren und nicht global wiederverwendet werden können:
4. Schritt: T_der = [ 1 ], neu (der 1. Map-Prozess hat noch kein T_der, nur der 3.!) T_glocke = [ 1 ], neu T_heiss = [ 1], neu 5. Schritt T_erden = [ 1 ], neu T_werden = [ 1 ], neu T_rinnen = [ 1 ], neu 6. Schritt T_steht = [ 1 ], neu T_frisch = [ 1 ], neu T_muß = [ 1 ], neu (der 3. Map-Prozess hat noch kein T_muß!)
Im siebten Schritt kommt dann zum ersten Mal vor, dass ein weiteres Vorkommen in einer bereits angelegten Zwischenergebnisliste gesammelt wird:
7. Schritt T_die = [ 1 ], neu (der 1. Map-Prozess hat noch kein T_die!) T_gesellen = [ 1 ], neu T_der = [ 1, 1 ], beim 3. Map-Prozess seit Schritt 2 vorhandene Liste verwenden
usw.
Nach 21 Schritten sind alle drei Map-Prozesse mit ihrer Arbeit durch, die Map-Phase endet und es beginnt die Reduce-Phase. Die Zwischenergebnislisten, die von verschiedenen Map-Prozessen zu demselben Wort angelegt wurden, werden zusammengefügt. Für jede der entstandenen Zwischenergebnislisten (hier sortiert aufgeführt)
reduce T_der = [ 1 ] ++ [ 1, 1, 1 ] -> [ 4 ] T_die = [ 1, 1 ] -> [ 2 ] T_fest = [ 1 ] -> [ 1 ] T_gemauert = [ 1 ] -> [ 1 ] T_glocke = [ 1 ] -> [ 1 ] T_heiss = [ 1 ] -> [ 1 ] T_heute = [ 1 ] -> [ 1 ] T_in = [ 1 ] -> [ 1 ] T_muß = [ 1 ] ++ [ 1 ] -> [ 2 ] T_stirne = [ 1 ] -> [ 1 ] T_von = [ 1, 1 ] -> [ 2 ]
usw.
können wir parallel einen Reduce-Prozess starten, der jeweils die Elemente aufzählt. Das Ergebnis von MapReduce sieht in etwa so aus:
Ausgabeliste = [ ("fest", 1), ("heute", 1), ("von", 2), ("gemauert", 1), ("muß", 2), ("der", 4), ("in", 1), ("die", 2), .. ]
Fazit
Die MapReduce-Formulierung hat den Vorteil, dass sich mit den zwei Phasen in natürlicher Weise jeweils eine Parallelisierungsmöglichkeit ergibt, welche man mit einem Cluster für eine beschleunigte Berechnung verwenden kann. Bei sehr großen Datenmengen ist die Parallelisierung unter Umständen bereits erforderlich, weil die Datenmengen für einen einzelnen Prozess (und das ausführende Rechnersystem) zu groß sind.
Mehr Beispiele
Verfahren Map-Funktion Reduce-Funktion Verteiltes grep Gibt die gefundene Zeile (hit) in einen Zwischenergebnisspeicher Reicht durch (Identische Abbildung, genauer: Projektion der 2. Komponente) Weblinks
Fachartikel
- "MapReduce: Simplified Data Processing on Large Clusters" – Paper von Jeffrey Dean und Sanjay Ghemawat, Google Labs
- "Interpreting the Data: Parallel Analysis with Sawzall" – Paper von Rob Pike, Sean Dorward, Robert Griesemer, Sean Quinlan, Google Labs
- "Evaluating MapReduce for Multi-core and Multiprocessor Systems" – Paper von Colby Ranger, Ramanan Raghuraman, Arun Penmetsa, Gary Bradski, und Christos Kozyrakis, Stanford University (PDF-Datei; 353 kB)
- "Why MapReduce Matters to SQL Data Warehousing" – Analyse zur Einführung von MapReduce/SQL seitens Aster Data Systems und Greenplum
- "MapReduce for the Cell B.E. Architecture" – Paper von Marc de Kruijf und Karthikeyan Sankaralingam, University of Wisconsin-Madison (PDF-Datei; 528 kB)
- "Mars: A MapReduce Framework on Graphics Processors" – Paper von Bingsheng He, Wenbin Fang, Qiong Luo, Naga K. Govindaraju, Tuyong Wang, Hong Kong University of Science and Technology, veröffentlicht in Proc. PACT 2008. Es führt Design und Implementierung von MapReduce auf Graphikprozessoren vor. (PDF-Datei; 326 kB)
- "Map-Reduce-Merge: Simplified Relational Data Processing on Large Clusters" – Paper von Hung-Chih Yang, Ali Dasdan, Ruey-Lung Hsiao und D. Stott Parker, Yahoo und UCLA, veröffentlicht in Proc. of ACM SIGMOD, pp. 1029--1040, 2007. (Dieses Paper zeigt, wie man MapReduce auf relationale Datenverarbeitung ausweitet)
- FLuX: Der Fault-tolerant, Load Balancing eXchange operator der UC Berkeley bietet eine Alternative zu Googles MapReduce, mit Failover aber zusätzlichen Implementierungskosten.
Software
- Apache Hadoop MapReduce
- disco Open Source Projekt (Python und Erlang) des Nokia Research Center
- Greenplum MapReduce von Greenplum
- IBM MapReduce Tools for Eclipse – ein Plug-in, welches die Erstellung von MapReduce Anwendungen mit Eclipse unterstützt.
- QtConcurrent Open Source C++ MapReduce Implementierung (nicht-verteilt) des Qt_Development_Frameworks von Nokia
- Skynet Ruby Map/Reduce Framework
- Plasma MapReduce ist eine Open Source MapReduce Implementierung in Ocaml mit einem eigenen verteilten Dateisystem, PlasmaFS. Plasma MapReduce wurde von Gerd Stolpmann (Darmstadt) entwickelt.
- Stratosphere PACT Programmiermodell: Erweiterung und Generalisierung des MapReduce Programmiermodells
- [1] Data Management und Analyse Engine für Big Data welche auf MapReduce basiert
Quellen
- ↑ Google spotlights data center inner workings | Tech news blog - CNET News.com
- ↑ "Our abstraction is inspired by the map and reduce primitives present in Lisp and many other functional languages." -"MapReduce: Simplified Data Processing on Large Clusters", von Jeffrey Dean und Sanjay Ghemawat, Google Labs
- ↑ "Google's MapReduce Programming Model -- Revisited" – Paper von Ralf Lämmel, Microsoft
- ↑ Patentanmeldung (US Patent Office)
- ↑ MapReduce-Paper
Kategorien:- Computercluster
- Parallelverarbeitung
- Softwarearchitektur
- Verteiltes System
Wikimedia Foundation.