Liberamente tratto e tradotto da:http://homepage.cs.latrobe.edu.au/zhe/ZhenHeSparkRDDAPIExamples.html (con eventuali modifiche, revisioni ed integrazioni)
RDD e' l'acronimo di Resilient Distributed Dataset. I RDD sono il punto focale e di partenza del sistema Spark. Come utente, si puo' considerare un RDD come il contenitore per una raccolta di singole partizioni di dati, che sono il risultato di alcuni calcoli.
Tuttavia, un RDD e' in realta' piu' di questo. Nelle installazioni cluster, partizioni dati separate possono essere su nodi separati. Utilizzando l'RDD come contenitore, e' possibile accedere a tutte le partizioni ed eseguire calcoli e trasformazioni utilizzando i dati contenuti. Ogni volta che si perde una parte di un RDD o di un intero RDD, il sistema e' in grado di ricostruire i dati delle partizioni perse utilizzando le informazioni di derivazione.
Tutti gli RDD disponibili in Spark derivano direttamente o indirettamente dalla classe RDD. Questa classe viene fornita con un ampio set di metodi che eseguono operazioni sui dati all'interno delle partizioni associate. La classe RDD e' astratta. Ogni volta che si utilizza un RDD, si sta effettivamente utilizzando un'implementazione concertata di RDD. Queste implementazioni devono sovrascrivere alcune funzioni di base per rendere il comportamento dell'RDD come previsto.
Uno dei motivi per cui Spark e' diventato recentemente un sistema molto popolare per l'elaborazione di big data e' che non impone restrizioni riguardo a quali dati possono essere memorizzati all'interno delle partizioni RDD. L'API RDD contiene gia' molte operazioni utili. Tuttavia, poiche' i creatori di Spark dovevano mantenere l'API di base dei RDD abbastanza generica da gestire tipi di dati arbitrari, mancano molte funzioni di convenienza.
L'API RDD di base considera ogni elemento di dati come un singolo valore. Tuttavia, gli utenti spesso desiderano lavorare con coppie chiave-valore. Pertanto Spark ha esteso l'interfaccia di RDD per fornire funzioni aggiuntive (PairRDDFunctions), che funzionano esplicitamente sulle coppie chiave-valore. Attualmente, ci sono quattro estensioni dell'API RDD disponibili in spark. Sono come segue:
Applica una trasformazione ad ogni elemento dell'RDD
val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3) val b = a.map(_.length) val c = a.zip(b) c.collect res0: Array[(String, Int)] = Array((dog,3), (salmon,6), (salmon,6), (rat,3), (elephant,8))
Operazione che viene eseguita una sola volta per partizione
val a = sc.parallelize(1 to 9, 3) def myfunc[T](iter: Iterator[T]) : Iterator[(T, T)] = { var res = List[(T, T)]() var pre = iter.next while (iter.hasNext) { val cur = iter.next; res .::= (pre, cur) pre = cur; } res.iterator } a.mapPartitions(myfunc).collect res0: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8))
Operazione di map su un pair RDD lavorando sui valori
val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2) val b = a.map(x => (x.length, x)) b.mapValues("x" + _ + "x").collect res5: Array[(Int, String)] = Array((3,xdogx), (5,xtigerx), (4,xlionx), (3,xcatx), (7,xpantherx), (5,xeaglex))
Inserisce un punto di persistenza tra le trasformazioni facendo ripartire un eventuale seconda action da questo punto senza ripetere tutte le trasformazioni.Si puo' persistere in ram o su disco in formato compresso o meno (persist(StorageLevel.MEMORY_ONLY))
Esegue una operazione f sui dati
val a = sc.parallelize(1 to 100, 3) a.reduce(_ + _) res41: Int = 5050
Esegue una operazione per chiave
val a = sc.parallelize(List("dog", "cat", "owl", "gnu", "ant"), 2) val b = a.map(x => (x.length, x)) b.reduceByKey(_ + _).collect res86: Array[(Int, String)] = Array((3,dogcatowlgnuant))
Ripartisce i dati per uno specifico numero di partizioni
val rdd = sc.parallelize(List(1, 2, 10, 4, 5, 2, 1, 1, 1), 3) rdd.partitions.length res2: Int = 3 val rdd2 = rdd.repartition(5) rdd2.partitions.length res6: Int = 5
Campiona il dato
val a = sc.parallelize(1 to 10000, 3) a.sample(false, 0.1, 0).count res24: Long = 960 a.sample(true, 0.3, 0).count res25: Long = 2888 a.sample(true, 0.3, 13).count res26: Long = 2985
Campionamento per chiave
val randRDD = sc.parallelize(List( (7,"cat"), (6, "mouse"),(7, "cup"), (6, "book"), (7, "tv"), (6, "screen"), (7, "heater"))) val sampleMap = List((7, 0.4), (6, 0.6)).toMap randRDD.sampleByKey(false, sampleMap,42).collect res6: Array[(Int, String)] = Array((7,cat), (6,mouse), (6,book), (6,screen), (7,heater))
Numero degli elementi di un RDD
val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog"), 2) c.count res2: Long = 4
Conta gli elemti per chiave
val c = sc.parallelize(List((3, "Gnu"), (3, "Yak"), (5, "Mouse"), (3, "Dog")), 2) c.countByKey res3: scala.collection.Map[Int,Long] = Map(3 -> 3, 5 -> 1)
Conta le occorrenze di un valore
val b = sc.parallelize(List(1,2,3,4,5,6,7,8,2,4,2,1,1,1,1,1)) b.countByValue res27: scala.collection.Map[Int,Long] = Map(5 -> 1, 8 -> 1, 3 -> 1, 6 -> 1, 1 -> 6, 2 -> 3, 4 -> 2, 7 -> 1)
Torna un RDD con i valori distinti
val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog", "Gnu", "Rat"), 2) c.distinct.collect res6: Array[String] = Array(Dog, Gnu, Cat, Rat)
Primo elemento dell'RDD
val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog"), 2) c.first res1: String = Gnu
Filtra il dato
val a = sc.parallelize(1 to 10, 3) val b = a.filter(_ % 2 == 0) b.collect res3: Array[Int] = Array(2, 4, 6, 8, 10)
Filtra per intervallo
val randRDD = sc.parallelize(List( (2,"cat"), (6, "mouse"),(7, "cup"), (3, "book"), (4, "tv"), (1, "screen"), (5, "heater")), 3) val sortedRDD = randRDD.sortByKey() sortedRDD.filterByRange(1, 3).collect res66: Array[(Int, String)] = Array((1,screen), (2,cat), (3,book))
A-B
val a = sc.parallelize(1 to 9, 3) val b = sc.parallelize(1 to 3, 3) val c = a.subtract(b) c.collect res3: Array[Int] = Array(6, 9, 4, 7, 5, 8)
A-B per chiave
val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "spider", "eagle"), 2) val b = a.keyBy(_.length) val c = sc.parallelize(List("ant", "falcon", "squid"), 2) val d = c.keyBy(_.length) b.subtractByKey(d).collect res15: Array[(Int, String)] = Array((4,lion))
prende "n" elementi dell'RDD
val b = sc.parallelize(List("dog", "cat", "ape", "salmon", "gnu"), 2) b.take(2) res18: Array[String] = Array(dog, cat) val b = sc.parallelize(1 to 10000, 5000) b.take(100) res6: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100)
Unisce due RDD (di stessa dimensione)
val a = sc.parallelize(1 to 100, 3) val b = sc.parallelize(101 to 200, 3) a.zip(b).collect res1: Array[(Int, Int)] = Array((1,101), (2,102), (3,103), (4,104), (5,105), (6,106), (7,107), (8,108), (9,109), (10,110), (11,111), (12,112), (13,113), (14,114), (15,115), (16,116), (17,117), (18,118), (19,119), (20,120), (21,121), (22,122), (23,123), (24,124), (25,125), (26,126), (27,127), (28,128), (29,129), (30,130), (31,131), (32,132), (33,133), (34,134), (35,135), (36,136), (37,137), (38,138), (39,139), (40,140), (41,141), (42,142), (43,143), (44,144), (45,145), (46,146), (47,147), (48,148), (49,149), (50,150), (51,151), (52,152), (53,153), (54,154), (55,155), (56,156), (57,157), (58,158), (59,159), (60,160), (61,161), (62,162), (63,163), (64,164), (65,165), (66,166), (67,167), (68,168), (69,169), (70,170), (71,171), (72,172), (73,173), (74,174), (75,175), (76,176), (77,177), (78,... val a = sc.parallelize(1 to 100, 3) val b = sc.parallelize(101 to 200, 3) val c = sc.parallelize(201 to 300, 3) a.zip(b).zip(c).map((x) => (x._1._1, x._1._2, x._2 )).collect res12: Array[(Int, Int, Int)] = Array((1,101,201), (2,102,202), (3,103,203), (4,104,204), (5,105,205), (6,106,206), (7,107,207), (8,108,208), (9,109,209), (10,110,210), (11,111,211), (12,112,212), (13,113,213), (14,114,214), (15,115,215), (16,116,216), (17,117,217), (18,118,218), (19,119,219), (20,120,220), (21,121,221), (22,122,222), (23,123,223), (24,124,224), (25,125,225), (26,126,226), (27,127,227), (28,128,228), (29,129,229), (30,130,230), (31,131,231), (32,132,232), (33,133,233), (34,134,234), (35,135,235), (36,136,236), (37,137,237), (38,138,238), (39,139,239), (40,140,240), (41,141,241), (42,142,242), (43,143,243), (44,144,244), (45,145,245), (46,146,246), (47,147,247), (48,148,248), (49,149,249), (50,150,250), (51,151,251), (52,152,252), (53,153,253), (54,154,254), (55,155,255)...
Unione tra due RDD
val a = sc.parallelize(1 to 3, 1) val b = sc.parallelize(5 to 7, 1) (a ++ b).collect res0: Array[Int] = Array(1, 2, 3, 5, 6, 7)
Inner join tra pair RDD
val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3) val b = a.keyBy(_.length) val c = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), 3) val d = c.keyBy(_.length) b.join(d).collect res0: Array[(Int, (String, String))] = Array((6,(salmon,salmon)), (6,(salmon,rabbit)), (6,(salmon,turkey)), (6,(salmon,salmon)), (6,(salmon,rabbit)), (6,(salmon,turkey)), (3,(dog,dog)), (3,(dog,cat)), (3,(dog,gnu)), (3,(dog,bee)), (3,(rat,dog)), (3,(rat,cat)), (3,(rat,gnu)), (3,(rat,bee)))