Jaguar Integration with SparkR

Once you have the R and SparkR packages installed, you can start SparkR by executing the following launch script:

#!/bin/bash

export JAVA_HOME=/usr/lib/java/jdk1.7.0_75
LIBPATH=/usr/lib/R/site-library/rJava/libs:$HOME/jaguar/lib
LDLIBPATH=$LIBPATH:$JAVA_HOME/jre/lib/amd64:$JAVA_HOME/jre/lib/amd64/server
JDBCJAR=$HOME/jaguar/lib/jaguar-jdbc-2.0.jar

sparkR \
  --driver-class-path $JDBCJAR \
  --driver-library-path $LDLIBPATH \
  --conf spark.executor.extraClassPath=$JDBCJAR \
  --conf spark.executor.extraLibraryPath=$LDLIBPATH

Then, at the SparkR prompt, you can execute the following R commands:

library(RJDBC)
library(SparkR)

sc <- sparkR.init(master="spark://mymaster:7077", appName="MyTest")

sqlContext <- sparkRSQL.init(sc)

drv <- JDBC("com.jaguar.jdbc.JaguarDriver", "/home/exeray/jaguar/lib/jaguar-jdbc-2.0.jar", "`")

conn <- dbConnect(drv, "jdbc:jaguar://localhost:8888/test", "test", "test")

dbListTables(conn)

df <- dbGetQuery(conn, "select * from int10k where uid > 'anxnfkjj2329' limit 5000;")

head(df)

# correlation
> cor(df$uid, df$score)
[1] 0.05107418

# build a simple linear regression
> model <- lm(uid ~ score, data = df)
> model

Call:
lm(formula = uid ~ score, data = df)

Coefficients:
(Intercept)        score
  2.115e+07    1.025e-03

# get the names of all attributes of the model object
> attributes(model)
$names
 [1] "coefficients"  "residuals"     "effects"       "rank"
 [5] "fitted.values" "assign"        "qr"            "df.residual"
 [9] "xlevels"       "call"          "terms"         "model"

$class
[1] "lm"
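
The fitted model can then be used to score new data. The following is a minimal sketch, assuming the model and df objects from the session above; the score values passed to predict() are hypothetical, chosen only for illustration:

# predict uid for a few hypothetical score values
> newdata <- data.frame(score = c(10.5, 55.2, 98.7))
> predict(model, newdata = newdata)

# check how much of the variance the regression explains
> summary(model)$r.squared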

Jaguar's successful integration with Spark and SparkR allows a wide range of data analytics over the underlying fast Jaguar data engine.
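
As a further illustration, a data frame fetched through RJDBC can be promoted to a distributed SparkR DataFrame so that aggregations run on the cluster rather than in the local R session. This is a minimal sketch, assuming the SparkR 1.x API (createDataFrame, groupBy, summarize, n, avg) and the sqlContext and df objects created above:

# distribute the locally fetched data frame across the cluster
sdf <- createDataFrame(sqlContext, df)

# aggregate per city on the executors, then pull back the small result
cityStats <- summarize(groupBy(sdf, sdf$city),
                       rows = n(sdf$uid),
                       avg_score = avg(sdf$score))
head(cityStats)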

Jaguar Supports R

R is a powerful language and environment for statistical computing and graphics. Jaguar's JDBC API integrates with R for extensive data modelling and analysis. To use R with Jaguar, first install the RJDBC library:

$ sudo apt-get install r-cran-rjava

$ sudo R

> install.packages("RJDBC", dep=TRUE)

> q()

$ unset JAVA_HOME

$ R

> library(RJDBC)

> drv <- JDBC("com.jaguar.jdbc.JaguarDriver", "/pathtomy/jaguar-jdbc-2.0.jar", "`")

> conn <- dbConnect(drv, "jdbc:jaguar://localhost:8888/test", "test", "test")

> dbListTables(conn)

> dbGetQuery(conn, "select count(*) from mytable;")

> d <- dbReadTable(conn, "mytable")

> q()
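
Once a table has been pulled into a plain R data frame this way, all of R's standard tooling applies to it. A small sketch, assuming the d data frame read above contains the numeric score column used by the other examples in this article (adjust the column name to your own table):

# basic exploratory checks on the locally loaded table
> str(d)
> summary(d$score)

# plot the distribution of scores
> hist(d$score, main = "Score distribution", xlab = "score")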

Jaguar Supports Spark

Since Jaguar now provides JDBC connectivity, developers can use Apache Spark to load data from Jaguar and perform data analytics and machine learning. The advantage Jaguar provides is that Spark can load data from it faster than from other data sources, especially when the data must satisfy complex conditions. The following code is based on two tables with the following structure:

create table int10k ( key: uid int(16), score float(16.3), value: city char(32) );
create table int10k_2 ( key: uid int(16), score float(16.3), value: city char(32) );

Scala program:

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import scala.collection._
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.log4j.Logger
import org.apache.log4j.Level
import com.jaguar.jdbc.internal.jaguar._
import com.jaguar.jdbc.JaguarDataSource

object TestScalaJDBC {
  def main(args: Array[String]) {
    sparkfunc()
  }

  def sparkfunc() {
    Class.forName("com.jaguar.jdbc.JaguarDriver")
    val sparkConf = new SparkConf().setAppName("TestScalaJDBC")
    val sc = new SparkContext(sparkConf)
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.implicits._

    Logger.getLogger("org").setLevel(Level.OFF)
    Logger.getLogger("akka").setLevel(Level.OFF)

    val people = sqlContext.read.format("jdbc")
      .options(
        Map( "url" -> "jdbc:jaguar://127.0.0.1:8888/test",
             "dbtable" -> "int10k",
             "user" -> "test",
             "password" -> "test",
             "partitionColumn" -> "uid",
             "lowerBound" -> "2",
             "upperBound" -> "2000000",
             "numPartitions" -> "4",
             "driver" -> "com.jaguar.jdbc.JaguarDriver"
        )).load()

    // register the DataFrame as a temporary table
    people.registerTempTable("int10k")
    people.printSchema()

    val people2 = sqlContext.read.format("jdbc")
      .options(
        Map( "url" -> "jdbc:jaguar://127.0.0.1:8888/test",
             "dbtable" -> "int10k_2",
             "user" -> "test",
             "password" -> "test",
             "partitionColumn" -> "uid",
             "lowerBound" -> "2",
             "upperBound" -> "2000000",
             "numPartitions" -> "4",
             "driver" -> "com.jaguar.jdbc.JaguarDriver"
        )).load()
    people2.registerTempTable("int10k_2")

    // sort by columns
    people.sort("score").show()
    people.sort($"score".desc).show()
    people.sort($"score".desc, $"uid".asc).show()
    people.orderBy($"score".desc, $"uid".asc).show()

    // select by expression
    people.selectExpr("score", "uid").show()
    people.selectExpr("score", "uid as keyone").show()
    people.selectExpr("score", "uid as keyone", "abs(score)").show()

    // select a few columns
    val uid2 = people.select("uid", "score")
    uid2.show()

    // filter rows
    people.filter(people("uid") > 20990397).show()

    // group by
    people.groupBy("city").count().show()

    // group by and average
    people.groupBy("city").avg().show()

    people.groupBy(people("city"))
      .agg(
        Map(
          "score" -> "avg",
          "uid" -> "max"
        )
      )
      .show()

    // rollup
    people.rollup("city").avg().show()
    people.rollup($"city")
      .agg(
        Map(
          "uid" -> "avg",
          "score" -> "max"
        )
      )
      .show()

    // cube
    people.cube($"city").avg().show()
    people.cube($"city")
      .agg(
        Map(
          "uid" -> "avg",
          "score" -> "max"
        )
      )
      .show()

    // describe statistics
    people.describe("uid", "score").show()

    // find frequent items
    people.stat.freqItems(Seq("uid")).show()

    // join two tables
    people.join(people2, "uid").show()
    people.join(people2, "score").show()
    people.join(people2).where(people("uid") === people2("uid")).show()
    people.join(people2).where(people("city") === people2("city")).show()
    people.join(people2).where(people("uid") === people2("uid") and people("city") === people2("city")).show()
    people.join(people2).where(people("uid") === people2("uid") && people("city") === people2("city")).show()
    people.join(people2).where(people("uid") === people2("uid") && people("city") === people2("city")).limit(3).show()

    // union
    people.unionAll(people2).show()

    // intersection
    people.intersect(people2).show()

    // set difference (except)
    people.except(people2).show()

    // take a sample
    people.sample(true, 0.1, 100).show()

    // distinct
    people.distinct.show()

    // same as distinct
    people.dropDuplicates().show()

    // cache and persist
    people.dropDuplicates.cache.show()
    people.dropDuplicates.persist.show()

    // SQL on the registered temp table
    val df = sqlContext.sql("SELECT * FROM int10k where uid < 200000000 and city between 'Alameda' and 'Berkeley'")
    df.distinct.show()
  }
}

The class generated from the above Scala program can be submitted to Spark as follows:

/bin/spark-submit --class TestScalaJDBC \
  --master spark://masterhost:7077 \
  --driver-class-path /path/to/your/jaguar-jdbc-2.0.jar \
  --driver-library-path $HOME/jaguar/lib \
  --conf spark.executor.extraClassPath=/path/to/your/jaguar-jdbc-2.0.jar \
  --conf spark.executor.extraLibraryPath=$HOME/jaguar/lib \
  /path/to/your_project/target/scala-2.10/testjdbc_2.10-1.0.jar

Source: http://www.datajaguar.com