Apache Spark
Vorteil ggü. MapReduce:
- Inmemory
- Weitere Operationen (Join, Filter, …)
Resilient Distributed Datasets
- Können nach dem Erstellen nicht mehr geändert werden
- Anzahl der Partitionen wird durch Programmierer bestimmt
- Zwei Arten von Operationen:
- Transformationen: Lazy, d.h. nicht direkt berechnet; Transformierte RDD wird ausgeführt, wenn eine Aktion darauf ausgeführt wird.
- Actions
- Persistieren von RDDs in Memory oder auf Disk.
Workflow:
- Erstellen eines RDD von Datenquelle (Datei, Liste)
- Anwenden von Transformationen auf RDD: map, filter
- Anwenden von Actions auf RDD: collect, count
- Collect löst parallelize, filter und map aus.
Erstellen von RDD:
fillde = sc.textFile(“…”, 4)
Transformation:
Spark optimiert die notwendigen Schritte
- map(f): Neues RDD in dem jedes Element der Data Source durch Funktion f gegeben wird.
- filter(f): Neues RDD in dem alle Elemente enthalten sind, für die f true zurückgibt.
- distinct([numTasks]): Neues RDD, das eindeutige Elemente enthält.
- flatMap(f): Ähnlich zu map, nur dass jedes Element zu 0 oder mehreren Ausgabeelementen gemappt werden kann (f gibt eine Sequenz zurück anstelle eines einzelnen Elements)
Beispiele:
rdd = sc.parallelize([1,2,3,4]) rdd.map(lambda x: x * 2); rdd.filter(lambda x: x % 2 == 0)
(Funktionen werden auf die Worker verteilt).
rdd.Map(lambda x: [x, x+5]) 1_6_2_7_3_8 rdd.flatMap(lambda x: [x, x+5]) [1,6,2,7,3,8]
Key-Value Transformationen rdd = sc.parallelize([(1,2), (3,4)])
- reduceByKey(f): RDD mit (K,V) Paaren, bei dem die Werte jedes Keys mit der reduce Funktion f aggregiert wurden (V,V) → V
- sortByKey(): RDD mit nach Key sortierten (K,V) Paaren
- groupByKey(): neues RDD mit (K, Iterable<V>) Paaren
Beispiel: rdd = sc.parallelize([(1,2), (3,4), (3,6)]) rdd.reduceByKey(lambda a, b: a + b) [(1,2), (3,4), (3,6)] → [(1,2), (3,10)]
rdd2 = sc.parallelize([(1,'a'), (2,'c'), (1,'b')]) rdd.groupByKey() [(1,'a'), (2,'c'), (1,'b')] → [(1,['a','b']), (2,'c')]
Spark Actions
- reduce(f): Assoziative, kommutative Funktion f aggregiert Elemente in dem sie zwei Argumente entgegennimmt.
- take(n): Array mit ersten n Elementen.
- collect(): Gibt alle Elemente als Array zurück.
- takeOrdered(n, key=f): Gibt n Elemente geordnet nach f zurück.
.cache() zum Cachen von Objekten.
Spark Programmlebenszyklus
1. RDDs erstellen 2. RDDs transformieren 3. Cachen 4. Actions ausführen
pySpark SharedVariables
Broadcast Variables:
- Senden großer read-only Werte zu Worker (z.B. Datenset)
- Gespeichert auf jedem Worker für ein oder mehrere Operationen
signPrefixes = sc.broadcast(…) signPrefixes.value
Accumulator:
- Aggregierte Werte von Worker zu Driver
- Write-Only, nur Driver können auf den Wert zurückgreifen.
accum = sc.accumulator(0) … def f(x):
global accum accum += x
…
accum.value