====== Apache Spark ====== Vorteil ggü. MapReduce: * Inmemory * Weitere Operationen (Join, Filter, ...) ===== Resilient Distributed Datasets ===== * Können nach dem Erstellen nicht mehr geändert werden * Anzahl der Partitionen wird durch Programmierer bestimmt * Zwei Arten von Operationen: * Transformationen: Lazy, d.h. nicht direkt berechnet; Transformierte RDD wird ausgeführt, wenn eine Aktion darauf ausgeführt wird. * Actions * Persistieren von RDDs in Memory oder auf Disk. Workflow: * Erstellen eines RDD von Datenquelle (Datei, Liste) * Anwenden von Transformationen auf RDD: map, filter * Anwenden von Actions auf RDD: collect, count * Collect löst parallelize, filter und map aus. Erstellen von RDD: fillde = sc.textFile("...", 4) ==== Transformation: ==== Spark optimiert die notwendigen Schritte * map(f): Neues RDD in dem jedes Element der Data Source durch Funktion f gegeben wird. * filter(f): Neues RDD in dem alle Elemente enthalten sind, für die f true zurückgibt. * distinct([numTasks]): Neues RDD, das eindeutige Elemente enthält. * flatMap(f): Ähnlich zu map, nur dass jedes Element zu 0 oder mehreren Ausgabeelementen gemappt werden kann (f gibt eine Sequenz zurück anstelle eines einzelnen Elements) Beispiele: rdd = sc.parallelize([1,2,3,4]) rdd.map(lambda x: x * 2); rdd.filter(lambda x: x % 2 == 0) (Funktionen werden auf die Worker verteilt). rdd.Map(lambda x: [x, x+5]) [[:1_6_2_7_3_8]] rdd.flatMap(lambda x: [x, x+5]) [1,6,2,7,3,8] Key-Value Transformationen rdd = sc.parallelize([(1,2), (3,4)]) * reduceByKey(f): RDD mit (K,V) Paaren, bei dem die Werte jedes Keys mit der reduce Funktion f aggregiert wurden (V,V) -> V * sortByKey(): RDD mit nach Key sortierten (K,V) Paaren * groupByKey(): neues RDD mit (K, Iterable) Paaren Beispiel: rdd = sc.parallelize([(1,2), (3,4), (3,6)]) rdd.reduceByKey(lambda a, b: a + b) [(1,2), (3,4), (3,6)] -> [(1,2), (3,10)] rdd2 = sc.parallelize([(1,'a'), (2,'c'), (1,'b')]) rdd.groupByKey() [(1,'a'), (2,'c'), (1,'b')] -> [(1,['a','b']), (2,'c')] ==== Spark Actions ==== * reduce(f): Assoziative, kommutative Funktion f aggregiert Elemente in dem sie zwei Argumente entgegennimmt. * take(n): Array mit ersten n Elementen. * collect(): Gibt alle Elemente als Array zurück. * takeOrdered(n, key=f): Gibt n Elemente geordnet nach f zurück. .cache() zum Cachen von Objekten. ==== Spark Programmlebenszyklus ==== 1. RDDs erstellen 2. RDDs transformieren 3. Cachen 4. Actions ausführen ==== pySpark SharedVariables ==== Broadcast Variables: * Senden großer read-only Werte zu Worker (z.B. Datenset) * Gespeichert auf jedem Worker für ein oder mehrere Operationen signPrefixes = sc.broadcast(...) signPrefixes.value Accumulator: * Aggregierte Werte von Worker zu Driver * Write-Only, nur Driver können auf den Wert zurückgreifen. accum = sc.accumulator(0) ... def f(x): global accum accum += x ... accum.value