Spark Import

What follows is a series of examples of importing files and saving the results (to the local file system or HDFS) with the Spark 1.6 shell.

Starting the shell:

simon@Itaca ~: spark-shell
log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Using Spark's repl log4j profile: org/apache/spark/log4j-defaults-repl.properties
To adjust logging level use sc.setLogLevel("INFO")
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.6.3
      /_/

Using Scala version 2.10.5 (OpenJDK 64-Bit Server VM, Java 1.8.0_171)
Type in expressions to have them evaluated.
Type :help for more information.
18/06/22 23:42:08 WARN Utils: Your hostname, Itaca resolves to a loopback address: 127.0.0.1; using 192.168.1.102 instead (on interface wlan0)
18/06/22 23:42:08 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Spark context available as sc.
18/06/22 23:42:11 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
18/06/22 23:42:11 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
18/06/22 23:42:15 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0
18/06/22 23:42:15 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException
18/06/22 23:42:17 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
18/06/22 23:42:17 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
SQL context available as sqlContext.

scala> 

Note:

  • start the shell with "spark-shell --packages com.databricks:spark-avro_2.10:2.0.1,com.databricks:spark-csv_2.10:1.5.0" to enable saving in the CSV and Avro formats.
  • in general, extra libraries must be passed when the shell is launched: use --jars for local jar files or --packages for artifacts from a remote repository.

Prologue:

To put the various import options in context, let us work through an example. We read a CSV file, employees.csv, count the employees by gender, and save the result in several formats.

The logical steps are:

  • reading the file as an RDD
  • creating a case class and mapping each line onto it
  • converting to a DataFrame
  • registering it as a temporary table
  • business logic
  • saving the result (in the various formats)

The file has the following format:

10001,1953-09-02,Georgi,Facello,M,1986-06-26
10002,1964-06-02,Bezalel,Simmel,F,1985-11-21
10003,1959-12-03,Parto,Bamford,M,1986-08-28
10004,1954-05-01,Chirstian,Koblick,M,1986-12-01
10005,1955-01-21,Kyoichi,Maliniak,M,1989-09-12
10006,1953-04-20,Anneke,Preusig,F,1989-06-02
10007,1957-05-23,Tzvetan,Zielinski,F,1989-02-10
10008,1958-02-19,Saniya,Kalloufi,M,1994-09-15
...
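
Before moving to Spark, the logic of the example can be checked on the eight sample rows above with plain Scala collections. This is only a sketch for orientation: the names sample and byGender are made up here, and the counts refer to these eight rows, not to the whole file.

```scala
// Plain Scala, no Spark needed: the eight sample rows shown above.
val sample = Seq(
  "10001,1953-09-02,Georgi,Facello,M,1986-06-26",
  "10002,1964-06-02,Bezalel,Simmel,F,1985-11-21",
  "10003,1959-12-03,Parto,Bamford,M,1986-08-28",
  "10004,1954-05-01,Chirstian,Koblick,M,1986-12-01",
  "10005,1955-01-21,Kyoichi,Maliniak,M,1989-09-12",
  "10006,1953-04-20,Anneke,Preusig,F,1989-06-02",
  "10007,1957-05-23,Tzvetan,Zielinski,F,1989-02-10",
  "10008,1958-02-19,Saniya,Kalloufi,M,1994-09-15"
)

// Field 4 (zero-based) is the gender column; group on it and count each group.
val byGender: Map[String, Int] =
  sample.map(_.split(",")).groupBy(_(4)).map { case (g, rows) => (g, rows.size) }

// Prints one "gender,count" line per group, sorted by gender.
byGender.toSeq.sortBy(_._1).foreach { case (g, n) => println(g + "," + n) }
```

The Spark version below does the same thing, with the grouping expressed as a SQL query over a temporary table.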

Reading the file

scala> val rdd =sc.textFile("/home/simon/resources/employees.csv")
rdd: org.apache.spark.rdd.RDD[String] = /home/simon/resources/employees.csv MapPartitionsRDD[7] at textFile at <console>:27

scala> rdd.take(1)
res3: Array[String] = Array(10001,1953-09-02,Georgi,Facello,M,1986-06-26)

Splitting each line into an array of fields, creating a case class, and binding each array to the case class

scala> case class Record(c_0:String,c_1:String,c_2:String,c_3:String,c_4:String,c_5:String)
defined class Record

scala> val rdd2 = rdd.map(_.split(",")).map(x=>Record(x(0),x(1),x(2),x(3),x(4),x(5)))
rdd2: org.apache.spark.rdd.RDD[Record] = MapPartitionsRDD[28] at map at <console>:31
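
The mapping above assumes that every line has exactly six fields; a shorter line would fail with an ArrayIndexOutOfBoundsException when the job runs. A possible defensive variant is sketched below in plain Scala. The helper name parseLine is hypothetical, and Record is redefined only to keep the sketch self-contained.

```scala
// Same case class as in the shell session above.
case class Record(c_0: String, c_1: String, c_2: String, c_3: String, c_4: String, c_5: String)

// Hypothetical helper: returns None for lines that do not have exactly six fields.
def parseLine(line: String): Option[Record] = {
  // A limit of -1 keeps trailing empty fields, which split(",") alone would drop.
  val f = line.split(",", -1)
  if (f.length == 6) Some(Record(f(0), f(1), f(2), f(3), f(4), f(5))) else None
}

println(parseLine("10001,1953-09-02,Georgi,Facello,M,1986-06-26"))
println(parseLine("10001,1953-09-02,Georgi"))
```

In the shell, rdd.flatMap(line => parseLine(line)) should then keep only the well-formed lines, at the cost of silently discarding the others.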

Conversion to a DataFrame

scala> val df = rdd2.toDF
df: org.apache.spark.sql.DataFrame = [c_0: string, c_1: string, c_2: string, c_3: string, c_4: string, c_5: string]

scala> df.show(1)
+-----+----------+------+-------+---+----------+
|  c_0|       c_1|   c_2|    c_3|c_4|       c_5|
+-----+----------+------+-------+---+----------+
|10001|1953-09-02|Georgi|Facello|  M|1986-06-26|
+-----+----------+------+-------+---+----------+
only showing top 1 row

Registering as a table and extracting the information

scala> df.registerTempTable("employees")

scala> val dfresult = sqlContext.sql("select c_4 , count(*) as totale from employees group by c_4")
dfresult: org.apache.spark.sql.DataFrame = [c_4: string, totale: bigint]


scala> dfresult.show(2)
+---+------+
|c_4|totale|
+---+------+
|  F|120051|
|  M|179973|
+---+------+
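
The same aggregation can also be written with the DataFrame API, without registering a temporary table. The sketch below assumes the spark-shell session above, where df is already defined; the name dfresult2 is made up here to avoid overwriting dfresult.

```scala
// Assumes the spark-shell 1.6 session above, where `df` is already defined.
// groupBy(...).count() produces a column named "count"; rename it to match the SQL version.
val dfresult2 = df.groupBy("c_4").count().withColumnRenamed("count", "totale")
dfresult2.show()
```

Either form should give the same two rows; the SQL version is handy when the logic is already available as a query.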

Saving the data

The commands so far, in sequence, for quick re-entry in the terminal.

val rdd =sc.textFile("/home/simon/resources/employees.csv") 
case class Record(c_0:String,c_1:String,c_2:String,c_3:String,c_4:String,c_5:String) 
val rdd2 = rdd.map(_.split(",")).map(x=>Record(x(0),x(1),x(2),x(3),x(4),x(5)))
val df = rdd2.toDF
df.registerTempTable("employees")
val dfresult = sqlContext.sql("select c_4 , count(*) as totale from employees group by c_4")

TXT

dfresult.map(x=> x.get(0)+"-"+x.get(1)).saveAsTextFile("hdfs://localhost:9000/cca175/ouput5")

CSV

  • the library must be loaded at startup: "spark-shell --packages com.databricks:spark-csv_2.10:1.5.0"
dfresult.rdd.map(_.mkString(";")).saveAsTextFile("hdfs://localhost:9000/cca175/ouput")

dfresult.repartition(1).write.format("com.databricks.spark.csv").option("header","true").save("hdfs://localhost:9000/cca175/output")

dfresult.repartition(1).write.format("com.databricks.spark.csv").option("header","true").option("delimiter", "|").save("hdfs://localhost:9000/cca175/output")
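
Note that the two commands above write to the same directory. With the default save mode the second one fails if the directory already exists. A minimal sketch of one way around this, assuming the shell session above (dfresult defined, spark-csv on the classpath), is to set the save mode explicitly:

```scala
// Assumes the spark-shell 1.6 session above, started with the spark-csv package.
// Trailing dots keep the expression open, so it can be pasted line by line in the REPL.
dfresult.repartition(1).write.
  format("com.databricks.spark.csv").
  option("header", "true").
  option("delimiter", "|").
  mode("overwrite"). // replace the target directory instead of failing if it exists
  save("hdfs://localhost:9000/cca175/output")
```

Overwriting deletes whatever was in the directory, so a separate output path per format is the safer choice when the results need to be compared.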

JSON

dfresult.repartition(1).write.format("json").save("hdfs://localhost:9000/cca175/output2")

PARQUET

dfresult.repartition(1).write.format("parquet").save("hdfs://localhost:9000/cca175/output3")

AVRO

  • the library must be loaded at startup: "spark-shell --packages com.databricks:spark-avro_2.10:2.0.1"
  • for Spark 2.0 use --packages com.databricks:spark-avro_2.11:3.2.0
import com.databricks.spark.avro._
dfresult.repartition(1).write.format("com.databricks.spark.avro").save("hdfs://localhost:9000/cca175/output4")

Data compression

The commands in sequence, for quick re-entry in the terminal (this time keeping all the data, so that the size of the files produced can be compared).

val rdd =sc.textFile("/home/simon/resources/employees.csv") 
case class Record(c_0:String,c_1:String,c_2:String,c_3:String,c_4:String,c_5:String) 
val rdd2 = rdd.map(_.split(",")).map(x=>Record(x(0),x(1),x(2),x(3),x(4),x(5)))
val dfresult = rdd2.toDF

Compression format   Hadoop CompressionCodec
DEFLATE              org.apache.hadoop.io.compress.DefaultCodec
gzip                 org.apache.hadoop.io.compress.GzipCodec
bzip2                org.apache.hadoop.io.compress.BZip2Codec
LZO                  com.hadoop.compression.lzo.LzopCodec
LZ4                  org.apache.hadoop.io.compress.Lz4Codec
Snappy               org.apache.hadoop.io.compress.SnappyCodec
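
For convenience, the table can be kept in the session as a small lookup from short name to class name. This is only a sketch: codecs and codecClassName are hypothetical names, and the map holds plain strings, so it does not check that the codec is actually available on the classpath.

```scala
// Short name -> Hadoop codec class name, copied from the table above.
val codecs: Map[String, String] = Map(
  "deflate" -> "org.apache.hadoop.io.compress.DefaultCodec",
  "gzip"    -> "org.apache.hadoop.io.compress.GzipCodec",
  "bzip2"   -> "org.apache.hadoop.io.compress.BZip2Codec",
  "lzo"     -> "com.hadoop.compression.lzo.LzopCodec",
  "lz4"     -> "org.apache.hadoop.io.compress.Lz4Codec",
  "snappy"  -> "org.apache.hadoop.io.compress.SnappyCodec"
)

// Hypothetical helper: case-insensitive lookup that fails loudly on unknown names.
def codecClassName(name: String): String =
  codecs.getOrElse(name.toLowerCase, sys.error("unknown codec: " + name))

println(codecClassName("gzip"))
```

The returned string can be passed wherever a fully qualified codec class name is expected, for example as the value of the "codec" option in the CSV section.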

TXT

dfresult.map(x=> x.get(0)+"-"+x.get(1)).saveAsTextFile("/home/simon/output/gzip", classOf[org.apache.hadoop.io.compress.GzipCodec])

dfresult.map(x=> x.get(0)+"-"+x.get(1)).saveAsTextFile("/home/simon/output/bzip2", classOf[org.apache.hadoop.io.compress.BZip2Codec])

// the following codecs need dependencies that are missing in the spark-shell
dfresult.map(x=> x.get(0)+"-"+x.get(1)).saveAsTextFile("/home/simon/output/lzo", classOf[com.hadoop.compression.lzo.LzopCodec])
dfresult.map(x=> x.get(0)+"-"+x.get(1)).saveAsTextFile("/home/simon/output/lz4", classOf[org.apache.hadoop.io.compress.Lz4Codec])
dfresult.map(x=> x.get(0)+"-"+x.get(1)).saveAsTextFile("/home/simon/output/snappy", classOf[org.apache.hadoop.io.compress.SnappyCodec])

CSV

codec: the compression codec to use when saving to file. It should be the fully qualified name of a class implementing org.apache.hadoop.io.compress.CompressionCodec, or one of the case-insensitive short names (bzip2, gzip, lz4, and snappy). If no codec is specified, the output is not compressed.

dfresult.repartition(1).write.format("com.databricks.spark.csv").option("header","true").option("codec", "org.apache.hadoop.io.compress.GzipCodec").save("/home/simon/output/gzip")

dfresult.repartition(1).write.format("com.databricks.spark.csv").option("header","true").option("codec", "org.apache.hadoop.io.compress.BZip2Codec").save("/home/simon/output/bzip2")

...


OR (to be tested)

import org.apache.spark.sql.hive.HiveContext
// sc - existing spark context
val sqlContext = new HiveContext(sc)
val df = sqlContext.sql("SELECT * FROM testtable")
df.write.format("com.databricks.spark.csv").save("/data/home/csv")

To write the contents into a single file

import org.apache.spark.sql.hive.HiveContext
// sc - existing spark context
val sqlContext = new HiveContext(sc)
val df = sqlContext.sql("SELECT * FROM testtable")
df.coalesce(1).write.format("com.databricks.spark.csv").save("/data/home/sample.csv")

PARQUET

The default compression is gzip. To change it, set:

sqlContext.setConf("spark.sql.parquet.compression.codec", "uncompressed")

The value can be one of four: uncompressed, snappy, gzip, lzo.

dfresult.repartition(1).write.format("parquet").option("codec", "org.apache.hadoop.io.compress.GzipCodec").save("/home/simon/output/gzip")

dfresult.repartition(1).write.format("parquet").option("codec", "org.apache.hadoop.io.compress.BZip2Codec").save("/home/simon/output/bzip")
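
As far as I can tell, in Spark 1.6 the Parquet writer takes its codec from the spark.sql.parquet.compression.codec setting mentioned above rather than from a "codec" writer option, and bzip2 is not among the four accepted values. A sketch that follows the setting instead, assuming the shell session above; the output path is made up for the example:

```scala
// Assumes the spark-shell 1.6 session above (sqlContext and dfresult defined).
// The codec is set on the SQL context before writing; "snappy" is one of the four accepted values.
sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy")

// Hypothetical output path, kept separate from the other examples.
dfresult.repartition(1).write.format("parquet").save("/home/simon/output/parquet_snappy")
```

Comparing the size of the files written under each setting is a quick way to confirm which codec was actually applied.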

AVRO

Compression: you can specify the type of compression to use when writing Avro to disk. The supported types are uncompressed, snappy, and deflate; you can also specify the deflate level. The default is snappy.

import com.databricks.spark.avro._

// configuration to use deflate compression
sqlContext.setConf("spark.sql.avro.compression.codec", "deflate")
sqlContext.setConf("spark.sql.avro.deflate.level", "5")

dfresult.repartition(1).write.format("com.databricks.spark.avro").save("/home/simon/output/deflate")

// configuration to use snappy compression
sqlContext.setConf("spark.sql.avro.compression.codec", "snappy")

dfresult.repartition(1).write.format("com.databricks.spark.avro").save("/home/simon/output/snappy")

// configuration to write without compression
sqlContext.setConf("spark.sql.avro.compression.codec", "uncompressed")

dfresult.repartition(1).write.format("com.databricks.spark.avro").save("/home/simon/output/uncompressed")