Below is a series of examples of importing files (and saving them to the file system or HDFS) with the Spark 1.6 shell.
Starting the shell:
simon@Itaca ~: spark-shell
log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Using Spark's repl log4j profile: org/apache/spark/log4j-defaults-repl.properties
To adjust logging level use sc.setLogLevel("INFO")
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.6.3
      /_/

Using Scala version 2.10.5 (OpenJDK 64-Bit Server VM, Java 1.8.0_171)
Type in expressions to have them evaluated.
Type :help for more information.
18/06/22 23:42:08 WARN Utils: Your hostname, Itaca resolves to a loopback address: 127.0.0.1; using 192.168.1.102 instead (on interface wlan0)
18/06/22 23:42:08 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Spark context available as sc.
18/06/22 23:42:11 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
18/06/22 23:42:11 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
18/06/22 23:42:15 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0
18/06/22 23:42:15 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException
18/06/22 23:42:17 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
18/06/22 23:42:17 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
SQL context available as sqlContext.

scala>
Note:
To put the various possible forms of import in context, we proceed with an example: we read a csv file, employees.csv, count the number of employees by gender, and save the result in various formats.
The logical steps are: read the file, split each line into an array and bind it to a case class, convert the RDD to a DataFrame, register it as a temporary table and run the aggregation, and finally save the result in the various formats.
The file has the following format:
10001,1953-09-02,Georgi,Facello,M,1986-06-26
10002,1964-06-02,Bezalel,Simmel,F,1985-11-21
10003,1959-12-03,Parto,Bamford,M,1986-08-28
10004,1954-05-01,Chirstian,Koblick,M,1986-12-01
10005,1955-01-21,Kyoichi,Maliniak,M,1989-09-12
10006,1953-04-20,Anneke,Preusig,F,1989-06-02
10007,1957-05-23,Tzvetan,Zielinski,F,1989-02-10
10008,1958-02-19,Saniya,Kalloufi,M,1994-09-15
...
Reading the file
scala> val rdd = sc.textFile("/home/simon/resources/employees.csv")
rdd: org.apache.spark.rdd.RDD[String] = /home/simon/resources/employees.csv MapPartitionsRDD[7] at textFile at <console>:27

scala> rdd.take(1)
res3: Array[String] = Array(10001,1953-09-02,Georgi,Facello,M,1986-06-26)
Splitting each line into an array, defining a case class, and binding each array to the case class
scala> case class Record(c_0:String,c_1:String,c_2:String,c_3:String,c_4:String,c_5:String)
defined class Record

scala> val rdd2 = rdd.map(_.split(",")).map(x=>Record(x(0),x(1),x(2),x(3),x(4),x(5)))
rdd2: org.apache.spark.rdd.RDD[Record] = MapPartitionsRDD[28] at map at <console>:31
Converting to a DataFrame
scala> val df = rdd2.toDF
df: org.apache.spark.sql.DataFrame = [c_0: string, c_1: string, c_2: string, c_3: string, c_4: string, c_5: string]

scala> df.show(1)
+-----+----------+------+-------+---+----------+
|  c_0|       c_1|   c_2|    c_3|c_4|       c_5|
+-----+----------+------+-------+---+----------+
|10001|1953-09-02|Georgi|Facello|  M|1986-06-26|
+-----+----------+------+-------+---+----------+
only showing top 1 row
Registering the DataFrame as a table and extracting the information
scala> df.registerTempTable("employees")

scala> val dfresult = sqlContext.sql("select c_4, count(*) as totale from employees group by c_4")
dfresult: org.apache.spark.sql.DataFrame = [c_4: string, totale: bigint]

scala> dfresult.show(2)
+---+------+
|c_4|totale|
+---+------+
|  F|120051|
|  M|179973|
+---+------+
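For reference, the same aggregation can also be expressed with the DataFrame API instead of SQL; a minimal sketch equivalent to the query above (the name dfresult2 is illustrative):

// Group by gender and count, renaming the generated "count" column to match the SQL version
val dfresult2 = df.groupBy("c_4").count().withColumnRenamed("count", "totale")
dfresult2.show(2)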
The proposed commands in sequence, for quick re-entry into the terminal.
val rdd = sc.textFile("/home/simon/resources/employees.csv")
case class Record(c_0:String,c_1:String,c_2:String,c_3:String,c_4:String,c_5:String)
val rdd2 = rdd.map(_.split(",")).map(x=>Record(x(0),x(1),x(2),x(3),x(4),x(5)))
val df = rdd2.toDF
df.registerTempTable("employees")
val dfresult = sqlContext.sql("select c_4, count(*) as totale from employees group by c_4")
Saving the result as a text file and as csv (with header, and with a custom delimiter):

dfresult.rdd.map(_.mkString(";")).saveAsTextFile("hdfs://localhost:9000/cca175/ouput")
dfresult.repartition(1).write.format("com.databricks.spark.csv").option("header","true").save("hdfs://localhost:9000/cca175/output")
dfresult.repartition(1).write.format("com.databricks.spark.csv").option("header","true").option("delimiter", "|").save("hdfs://localhost:9000/cca175/output")
dfresult.repartition(1).write.format("parquet").save("hdfs://localhost:9000/cca175/output3")
Saving as avro (using the spark-avro package):

import com.databricks.spark.avro._
dfresult.repartition(1).write.format("com.databricks.spark.avro").save("hdfs://localhost:9000/cca175/output4")
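To verify what was written, each output can be read back from the same shell; a minimal sketch reusing the paths from the commands above (the avro read also requires the spark-avro package on the classpath):

// Read back the text output written with saveAsTextFile
sc.textFile("hdfs://localhost:9000/cca175/ouput").take(2).foreach(println)

// Read back the parquet output
sqlContext.read.format("parquet").load("hdfs://localhost:9000/cca175/output3").show(2)

// Read back the avro output
sqlContext.read.format("com.databricks.spark.avro").load("hdfs://localhost:9000/cca175/output4").show(2)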
The proposed commands in sequence, for quick re-entry into the terminal (this time keeping all the data, so the size of the produced files can be checked).
val rdd = sc.textFile("/home/simon/resources/employees.csv")
case class Record(c_0:String,c_1:String,c_2:String,c_3:String,c_4:String,c_5:String)
val rdd2 = rdd.map(_.split(",")).map(x=>Record(x(0),x(1),x(2),x(3),x(4),x(5)))
val dfresult = rdd2.toDF
Compression format | Hadoop CompressionCodec |
DEFLATE | org.apache.hadoop.io.compress.DefaultCodec |
gzip | org.apache.hadoop.io.compress.GzipCodec |
bzip2 | org.apache.hadoop.io.compress.BZip2Codec |
LZO | com.hadoop.compression.lzo.LzopCodec |
LZ4 | org.apache.hadoop.io.compress.Lz4Codec |
Snappy | org.apache.hadoop.io.compress.SnappyCodec |
dfresult.map(x=> x.get(0)+"-"+x.get(1)).saveAsTextFile("/home/simon/output/gzip", classOf[org.apache.hadoop.io.compress.GzipCodec])
dfresult.map(x=> x.get(0)+"-"+x.get(1)).saveAsTextFile("/home/simon/output/bzip2", classOf[org.apache.hadoop.io.compress.BZip2Codec])
// the dependencies for LZO are missing in the spark-shell
dfresult.map(x=> x.get(0)+"-"+x.get(1)).saveAsTextFile("/home/simon/output/lzo", classOf[com.hadoop.compression.lzo.LzopCodec])
dfresult.map(x=> x.get(0)+"-"+x.get(1)).saveAsTextFile("/home/simon/output/lz4", classOf[org.apache.hadoop.io.compress.Lz4Codec])
dfresult.map(x=> x.get(0)+"-"+x.get(1)).saveAsTextFile("/home/simon/output/snappy", classOf[org.apache.hadoop.io.compress.SnappyCodec])
codec: compression codec to use when saving to file. Should be the fully qualified name of a class implementing org.apache.hadoop.io.compress.CompressionCodec or one of case-insensitive shorten names (bzip2, gzip, lz4, and snappy). Defaults to no compression when a codec is not specified.
dfresult.repartition(1).write.format("com.databricks.spark.csv").option("header","true").option("codec", "org.apache.hadoop.io.compress.GzipCodec").save("/home/simon/output/gzip") dfresult.repartition(1).write.format("com.databricks.spark.csv").option("header","true").option("codec", "org.apache.hadoop.io.compress.BZip2Codec").save("/home/simon/output/bzip2") ... OPPURE (da testare) import org.apache.spark.sql.hive.HiveContext // sc - existing spark context val sqlContext = new HiveContext(sc) val df = sqlContext.sql("SELECT * FROM testtable") df.write.format("com.databricks.spark.csv").save("/data/home/csv") To write the contents into a single file import org.apache.spark.sql.hive.HiveContext // sc - existing spark context val sqlContext = new HiveContext(sc) val df = sqlContext.sql("SELECT * FROM testtable") df.coalesce(1).write.format("com.databricks.spark.csv").save("/data/home/sample.csv")
The default compression is gzip.

sqlContext.setConf("spark.sql.parquet.compression.codec", "uncompressed")

The value can be one of the four: uncompressed, snappy, gzip, lzo.
dfresult.repartition(1).write.format("parquet").option("codec", "org.apache.hadoop.io.compress.GzipCodec").save("/home/simon/output/gzip") dfresult.repartition(1).write.format("parquet").option("codec", "org.apache.hadoop.io.compress.BZip2Codec").save("/home/simon/output/bzip")
Compression: You can specify the type of compression to use when writing Avro out to disk. The supported types are uncompressed, snappy, and deflate. You can also specify the deflate level. (The default is snappy.)
import com.databricks.spark.avro._

// configuration to use deflate compression
sqlContext.setConf("spark.sql.avro.compression.codec", "deflate")
sqlContext.setConf("spark.sql.avro.deflate.level", "5")
dfresult.repartition(1).write.format("com.databricks.spark.avro").save("/home/simon/output/deflate")

// configuration to use snappy compression
sqlContext.setConf("spark.sql.avro.compression.codec", "snappy")
dfresult.repartition(1).write.format("com.databricks.spark.avro").save("/home/simon/output/snappy")

// configuration to write uncompressed
sqlContext.setConf("spark.sql.avro.compression.codec", "uncompressed")
dfresult.repartition(1).write.format("com.databricks.spark.avro").save("/home/simon/output/uncompressed")