Jul 7, 2016

Basic Spark operations

Here are a few basic Spark operations, such as filtering, grouping, and joining, demonstrated against a MySQL database over JDBC.
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

object MySQLSpark {
  def main(args: Array[String]): Unit = {
    println("Testing MySQL Connection from Spark")
    // Required on Windows so Spark can locate the Hadoop binaries (winutils.exe).
    System.setProperty("hadoop.home.dir", "D:/sw/hadoop/hadoop-common-2.2.0-bin-master/")
    val cfg = new SparkConf().setMaster("local").setAppName("Spark MySQL Example in Scala")
    val ctx = new SparkContext(cfg)
    val sqlCtx = new SQLContext(ctx)
    // Load the orders and customers tables from the MySQL northwind database over JDBC.
    val orderDF = sqlCtx.read.format("jdbc")
      .option("url", "jdbc:mysql://localhost/northwind")
      .option("driver", "com.mysql.jdbc.Driver")
      .option("dbtable", "orders").option("user", "root").option("password", "").load()
    val customerDF = sqlCtx.read.format("jdbc")
      .option("url", "jdbc:mysql://localhost/northwind")
      .option("driver", "com.mysql.jdbc.Driver")
      .option("dbtable", "customers").option("user", "root").option("password", "").load()
    // Filtering: orders with a non-null order_date shipped by shipper 2, first five rows.
    orderDF.select("id", "employee_id", "customer_id", "ship_name", "order_date", "shipper_id")
      .filter("order_date is not null").filter("shipper_id = 2").limit(5).show()
    // Grouping: count the same filtered orders per ship_state_province.
    orderDF.filter("order_date is not null").filter("shipper_id = 2")
      .groupBy("ship_state_province").count().show()
    // Joining: match each order to its customer record.
    orderDF.join(customerDF, orderDF("customer_id") === customerDF("id")).show()
  }
}
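
The same filters can also be written with Spark's Column expressions instead of SQL strings, which catches operator typos at compile time. Below is a minimal sketch, assuming the orderDF defined above and the same Northwind column names; the sort on the grouped counts is an added illustration, not part of the original example.

import org.apache.spark.sql.functions._

// Equivalent filtering with Column expressions instead of SQL strings.
orderDF.filter(col("order_date").isNotNull && col("shipper_id") === 2)
  .select("id", "ship_name", "order_date")
  .limit(5)
  .show()

// Grouping again, this time sorting the per-shipper counts, largest first.
orderDF.groupBy("shipper_id")
  .count()
  .orderBy(desc("count"))
  .show()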
Ensure the following JVM arguments are set in the run configuration (for example, as VM options in the IDE):
-Xms128m -Xmx512m
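The MySQL Connector/J jar (mysql-connector-java) must also be on the classpath, since it provides the com.mysql.jdbc.Driver class used above.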