Skip to content
Snippets Groups Projects
Commit b104c7f5 authored by Rohit Rai's avatar Rohit Rai
Browse files

Example showing how to write the output to Cassandra

parent 56c64c40
No related branches found
No related tags found
No related merge requests found
package spark.examples
import org.apache.hadoop.mapreduce.Job
import org.apache.cassandra.hadoop.{ConfigHelper, ColumnFamilyInputFormat}
import org.apache.cassandra.thrift.{IndexExpression, SliceRange, SlicePredicate}
import org.apache.cassandra.hadoop.{ColumnFamilyOutputFormat, ConfigHelper, ColumnFamilyInputFormat}
import org.apache.cassandra.thrift._
import spark.{RDD, SparkContext}
import SparkContext._
import spark.SparkContext._
import java.nio.ByteBuffer
import java.util.SortedMap
import org.apache.cassandra.db.IColumn
import org.apache.cassandra.utils.ByteBufferUtil
import scala.collection.JavaConversions._
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/*
......@@ -44,8 +43,15 @@ object CassandraTest {
// NOTE(review): partial diff hunk — configures the Cassandra Hadoop job.
// Input and output both target the same node: args(1) = host, args(2) = Thrift RPC port.
ConfigHelper.setInputRpcPort(job.getConfiguration(), args(2))
ConfigHelper.setOutputInitialAddress(job.getConfiguration(), args(1))
ConfigHelper.setOutputRpcPort(job.getConfiguration(), args(2))
// Read from keyspace "casDemo" / column family "Words"; write results into "WordCount".
ConfigHelper.setInputColumnFamily(job.getConfiguration(), "casDemo", "Words")
ConfigHelper.setOutputColumnFamily(job.getConfiguration(), "casDemo", "WordCount")
// Slice predicate selects which columns to read per row; an empty start byte array
// means "begin at the first column" (the finish bound is set past this hunk — not visible here).
val predicate = new SlicePredicate()
val sliceRange = new SliceRange()
sliceRange.setStart(Array.empty[Byte])
......@@ -55,6 +61,8 @@ object CassandraTest {
// Both input splits and output writes use Murmur3Partitioner.
// NOTE(review): this string must match the partitioner the Cassandra cluster
// actually runs with — confirm against the cluster config.
ConfigHelper.setInputPartitioner(job.getConfiguration(), "Murmur3Partitioner")
ConfigHelper.setOutputPartitioner(job.getConfiguration(), "Murmur3Partitioner")
//Make a new Hadoop RDD
// Reads the Words column family via ColumnFamilyInputFormat
// (the remaining type arguments of this call continue past this diff hunk).
val casRdd = sc.newAPIHadoopRDD(job.getConfiguration(),
classOf[ColumnFamilyInputFormat],
......@@ -74,6 +82,33 @@ object CassandraTest {
// Pull the word counts back to the driver and print each as "word:count".
for ((word, count) <- counts.collect()) {
  println(word + ":" + count)
}
// Persist the word counts back to Cassandra through the Thrift/Hadoop output format.
// Each output record is (row key, list of mutations) applied to the WordCount CF.
counts.map {
case (word, count) => {
// "word" column: the word itself as UTF-8 bytes.
val colWord = new org.apache.cassandra.thrift.Column()
colWord.setName(ByteBufferUtil.bytes("word"))
colWord.setValue(ByteBufferUtil.bytes(word))
colWord.setTimestamp(System.currentTimeMillis)
// "wcount" column: the count widened to Long, matching the CF's LongType
// validator declared in the schema comment further down the file.
val colCount = new org.apache.cassandra.thrift.Column()
colCount.setName(ByteBufferUtil.bytes("wcount"))
colCount.setValue(ByteBufferUtil.bytes(count.toLong))
colCount.setTimestamp(System.currentTimeMillis)
// Row key embeds a timestamp, so repeated runs create distinct rows instead of overwriting.
val outputkey = ByteBufferUtil.bytes(word + "-COUNT-" + System.currentTimeMillis)
// NOTE(review): relies on the implicit JavaConversions import to turn this
// Scala List into the java.util.List[Mutation] the output format expects.
val mutations: java.util.List[Mutation] = new Mutation() :: new Mutation() :: Nil
// One mutation per column; each wraps its Column in a ColumnOrSuperColumn envelope.
mutations.get(0).setColumn_or_supercolumn(new ColumnOrSuperColumn())
mutations.get(0).column_or_supercolumn.setColumn(colWord)
mutations.get(1).setColumn_or_supercolumn(new ColumnOrSuperColumn())
mutations.get(1).column_or_supercolumn.setColumn(colCount)
(outputkey, mutations)
}
// "casDemo" here matches the output keyspace configured via ConfigHelper above;
// key/value class tokens are erased at runtime by ColumnFamilyOutputFormat.
}.saveAsNewAPIHadoopFile("casDemo", classOf[ByteBuffer], classOf[List[Mutation]],
classOf[ColumnFamilyOutputFormat], job.getConfiguration)
}
}
......@@ -81,6 +116,9 @@ object CassandraTest {
create keyspace casDemo;
use casDemo;
create column family WordCount with comparator = UTF8Type;
update column family WordCount with column_metadata = [{column_name: word, validation_class: UTF8Type}, {column_name: wcount, validation_class: LongType}];
create column family Words with comparator = UTF8Type;
update column family Words with column_metadata = [{column_name: book, validation_class: UTF8Type}, {column_name: para, validation_class: UTF8Type}];
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment