RDD API

Liberamente tratto e tradotto da:http://homepage.cs.latrobe.edu.au/zhe/ZhenHeSparkRDDAPIExamples.html (con eventuali modifiche, revisioni ed integrazioni)

RDD e' l'acronimo di Resilient Distributed Dataset. I RDD sono il punto focale e di partenza del sistema Spark. Come utente, si puo' considerare un RDD come il contenitore per una raccolta di singole partizioni di dati, che sono il risultato di alcuni calcoli.

Tuttavia, un RDD e' in realta' piu' di questo. Nelle installazioni cluster, partizioni dati separate possono essere su nodi separati. Utilizzando l'RDD come contenitore, e' possibile accedere a tutte le partizioni ed eseguire calcoli e trasformazioni utilizzando i dati contenuti. Ogni volta che si perde una parte di un RDD o di un intero RDD, il sistema e' in grado di ricostruire i dati delle partizioni perse utilizzando le informazioni di derivazione.

Tutti gli RDD disponibili in Spark derivano direttamente o indirettamente dalla classe RDD. Questa classe viene fornita con un ampio set di metodi che eseguono operazioni sui dati all'interno delle partizioni associate. La classe RDD e' astratta. Ogni volta che si utilizza un RDD, si sta effettivamente utilizzando un'implementazione concertata di RDD. Queste implementazioni devono sovrascrivere alcune funzioni di base per rendere il comportamento dell'RDD come previsto.

Uno dei motivi per cui Spark e' diventato recentemente un sistema molto popolare per l'elaborazione di big data e' che non impone restrizioni riguardo a quali dati possono essere memorizzati all'interno delle partizioni RDD. L'API RDD contiene gia' molte operazioni utili. Tuttavia, poiche' i creatori di Spark dovevano mantenere l'API di base dei RDD abbastanza generica da gestire tipi di dati arbitrari, mancano molte funzioni di convenienza.

L'API RDD di base considera ogni elemento di dati come un singolo valore. Tuttavia, gli utenti spesso desiderano lavorare con coppie chiave-valore. Pertanto Spark ha esteso l'interfaccia di RDD per fornire funzioni aggiuntive (PairRDDFunctions), che funzionano esplicitamente sulle coppie chiave-valore. Attualmente, ci sono quattro estensioni dell'API RDD disponibili in spark. Sono come segue:

  • DoubleRDDFunctions: Questa estensione contiene molti metodi utili per l'aggregazione di valori numerici. Diventano disponibili se gli elementi di dati di un RDD sono implicitamente convertibili nel Double di Scala.
  • PairRDDFunctions: I metodi definiti in questa interfaccia diventano disponibili quando gli elementi di dati hanno una struttura di tupla a due componenti (chiave,valore).
  • OrderedRDDFunctions: I metodi definiti in questa interfaccia diventano disponibili se gli elementi di dati sono tuple a due componenti (chiave,valore) in cui la chiave e' implicitamente ordinabile.
  • SequenceFileRDDFunctions: Questa estensione contiene diversi metodi che consentono agli utenti di creare sequenze Hadoop da RDD. I dati devono essere PairRDDFunctions con particolari condizioni.

map

Applica una trasformazione ad ogni elemento dell'RDD

val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
val b = a.map(_.length)
val c = a.zip(b)
c.collect
res0: Array[(String, Int)] = Array((dog,3), (salmon,6), (salmon,6), (rat,3), (elephant,8))

mapPartitions

Operazione che viene eseguita una sola volta per partizione

val a = sc.parallelize(1 to 9, 3)
def myfunc[T](iter: Iterator[T]) : Iterator[(T, T)] = {
  var res = List[(T, T)]()
  var pre = iter.next
  while (iter.hasNext)
  {
    val cur = iter.next;
    res .::= (pre, cur)
    pre = cur;
  }
  res.iterator
}
a.mapPartitions(myfunc).collect
res0: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8))

mapValues

Operazione di map su un pair RDD lavorando sui valori

val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)
val b = a.map(x => (x.length, x))
b.mapValues("x" + _ + "x").collect
res5: Array[(Int, String)] = Array((3,xdogx), (5,xtigerx), (4,xlionx), (3,xcatx), (7,xpantherx), (5,xeaglex))

persist, cache

Inserisce un punto di persistenza tra le trasformazioni facendo ripartire un eventuale seconda action da questo punto senza ripetere tutte le trasformazioni.Si puo' persistere in ram o su disco in formato compresso o meno (persist(StorageLevel.MEMORY_ONLY))

reduce

Esegue una operazione f sui dati

val a = sc.parallelize(1 to 100, 3)
a.reduce(_ + _)
res41: Int = 5050

reduceByKey

Esegue una operazione per chiave

val a = sc.parallelize(List("dog", "cat", "owl", "gnu", "ant"), 2)
val b = a.map(x => (x.length, x))
b.reduceByKey(_ + _).collect
res86: Array[(Int, String)] = Array((3,dogcatowlgnuant))

repartition

Ripartisce i dati per uno specifico numero di partizioni

val rdd = sc.parallelize(List(1, 2, 10, 4, 5, 2, 1, 1, 1), 3)
rdd.partitions.length
res2: Int = 3
val rdd2  = rdd.repartition(5)
rdd2.partitions.length
res6: Int = 5

sample

Campiona il dato

val a = sc.parallelize(1 to 10000, 3)
a.sample(false, 0.1, 0).count
res24: Long = 960

a.sample(true, 0.3, 0).count
res25: Long = 2888

a.sample(true, 0.3, 13).count
res26: Long = 2985

sampleByKey

Campionamento per chiave

val randRDD = sc.parallelize(List( (7,"cat"), (6, "mouse"),(7, "cup"), (6, "book"), (7, "tv"), (6, "screen"), (7, "heater")))
val sampleMap = List((7, 0.4), (6, 0.6)).toMap
randRDD.sampleByKey(false, sampleMap,42).collect

res6: Array[(Int, String)] = Array((7,cat), (6,mouse), (6,book), (6,screen), (7,heater))

count

Numero degli elementi di un RDD

val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog"), 2)
c.count
res2: Long = 4

countByKey

Conta gli elemti per chiave

val c = sc.parallelize(List((3, "Gnu"), (3, "Yak"), (5, "Mouse"), (3, "Dog")), 2)
c.countByKey
res3: scala.collection.Map[Int,Long] = Map(3 -> 3, 5 -> 1)

countByValue

Conta le occorrenze di un valore

val b = sc.parallelize(List(1,2,3,4,5,6,7,8,2,4,2,1,1,1,1,1))
b.countByValue
res27: scala.collection.Map[Int,Long] = Map(5 -> 1, 8 -> 1, 3 -> 1, 6 -> 1, 1 -> 6, 2 -> 3, 4 -> 2, 7 -> 1)

distinct

Torna un RDD con i valori distinti

val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog", "Gnu", "Rat"), 2)
c.distinct.collect
res6: Array[String] = Array(Dog, Gnu, Cat, Rat)

first

Primo elemento dell'RDD

val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog"), 2)
c.first
res1: String = Gnu

filter

Filtra il dato

val a = sc.parallelize(1 to 10, 3)
val b = a.filter(_ % 2 == 0)
b.collect
res3: Array[Int] = Array(2, 4, 6, 8, 10)

filterByRange

Filtra per intervallo

val randRDD = sc.parallelize(List( (2,"cat"), (6, "mouse"),(7, "cup"), (3, "book"), (4, "tv"), (1, "screen"), (5, "heater")), 3)
val sortedRDD = randRDD.sortByKey()

sortedRDD.filterByRange(1, 3).collect
res66: Array[(Int, String)] = Array((1,screen), (2,cat), (3,book))

subtract

A-B

val a = sc.parallelize(1 to 9, 3)
val b = sc.parallelize(1 to 3, 3)
val c = a.subtract(b)
c.collect
res3: Array[Int] = Array(6, 9, 4, 7, 5, 8)

subtractByKey

A-B per chiave

val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "spider", "eagle"), 2)
val b = a.keyBy(_.length)
val c = sc.parallelize(List("ant", "falcon", "squid"), 2)
val d = c.keyBy(_.length)
b.subtractByKey(d).collect
res15: Array[(Int, String)] = Array((4,lion))

take

prende "n" elementi dell'RDD

val b = sc.parallelize(List("dog", "cat", "ape", "salmon", "gnu"), 2)
b.take(2)
res18: Array[String] = Array(dog, cat)

val b = sc.parallelize(1 to 10000, 5000)
b.take(100)
res6: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100)

zip

Unisce due RDD (di stessa dimensione)

val a = sc.parallelize(1 to 100, 3)
val b = sc.parallelize(101 to 200, 3)
a.zip(b).collect
res1: Array[(Int, Int)] = Array((1,101), (2,102), (3,103), (4,104), (5,105), (6,106), (7,107), (8,108), (9,109), (10,110), (11,111), (12,112), (13,113), (14,114), (15,115), (16,116), (17,117), (18,118), (19,119), (20,120), (21,121), (22,122), (23,123), (24,124), (25,125), (26,126), (27,127), (28,128), (29,129), (30,130), (31,131), (32,132), (33,133), (34,134), (35,135), (36,136), (37,137), (38,138), (39,139), (40,140), (41,141), (42,142), (43,143), (44,144), (45,145), (46,146), (47,147), (48,148), (49,149), (50,150), (51,151), (52,152), (53,153), (54,154), (55,155), (56,156), (57,157), (58,158), (59,159), (60,160), (61,161), (62,162), (63,163), (64,164), (65,165), (66,166), (67,167), (68,168), (69,169), (70,170), (71,171), (72,172), (73,173), (74,174), (75,175), (76,176), (77,177), (78,...

val a = sc.parallelize(1 to 100, 3)
val b = sc.parallelize(101 to 200, 3)
val c = sc.parallelize(201 to 300, 3)
a.zip(b).zip(c).map((x) => (x._1._1, x._1._2, x._2 )).collect
res12: Array[(Int, Int, Int)] = Array((1,101,201), (2,102,202), (3,103,203), (4,104,204), (5,105,205), (6,106,206), (7,107,207), (8,108,208), (9,109,209), (10,110,210), (11,111,211), (12,112,212), (13,113,213), (14,114,214), (15,115,215), (16,116,216), (17,117,217), (18,118,218), (19,119,219), (20,120,220), (21,121,221), (22,122,222), (23,123,223), (24,124,224), (25,125,225), (26,126,226), (27,127,227), (28,128,228), (29,129,229), (30,130,230), (31,131,231), (32,132,232), (33,133,233), (34,134,234), (35,135,235), (36,136,236), (37,137,237), (38,138,238), (39,139,239), (40,140,240), (41,141,241), (42,142,242), (43,143,243), (44,144,244), (45,145,245), (46,146,246), (47,147,247), (48,148,248), (49,149,249), (50,150,250), (51,151,251), (52,152,252), (53,153,253), (54,154,254), (55,155,255)...

union, ++

Unione tra due RDD

val a = sc.parallelize(1 to 3, 1)
val b = sc.parallelize(5 to 7, 1)
(a ++ b).collect
res0: Array[Int] = Array(1, 2, 3, 5, 6, 7)

join

Inner join tra pair RDD

val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
val b = a.keyBy(_.length)
val c = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), 3)
val d = c.keyBy(_.length)
b.join(d).collect

res0: Array[(Int, (String, String))] = Array((6,(salmon,salmon)), (6,(salmon,rabbit)), (6,(salmon,turkey)), (6,(salmon,salmon)), (6,(salmon,rabbit)), (6,(salmon,turkey)), (3,(dog,dog)), (3,(dog,cat)), (3,(dog,gnu)), (3,(dog,bee)), (3,(rat,dog)), (3,(rat,cat)), (3,(rat,gnu)), (3,(rat,bee)))