diff --git a/examples/pom.xml b/examples/pom.xml index fcd60e3b776a2cf57d77bb37c4908c5044d7af37..43f3d2e4de8635342a783916917a0e43c3fe3812 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -35,6 +35,10 @@ <sbt.project.name>examples</sbt.project.name> <build.testJarPhase>none</build.testJarPhase> <build.copyDependenciesPhase>package</build.copyDependenciesPhase> + <flume.deps.scope>provided</flume.deps.scope> + <hadoop.deps.scope>provided</hadoop.deps.scope> + <hive.deps.scope>provided</hive.deps.scope> + <parquet.deps.scope>provided</parquet.deps.scope> </properties> <dependencies> @@ -72,211 +76,39 @@ <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-flume_${scala.binary.version}</artifactId> <version>${project.version}</version> + <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka_${scala.binary.version}</artifactId> <version>${project.version}</version> - </dependency> - <dependency> - <groupId>org.apache.hbase</groupId> - <artifactId>hbase-protocol</artifactId> - <version>${hbase.version}</version> - <scope>${hbase.deps.scope}</scope> - </dependency> - <dependency> - <groupId>org.apache.hbase</groupId> - <artifactId>hbase-common</artifactId> - <version>${hbase.version}</version> - <scope>${hbase.deps.scope}</scope> - <exclusions> - <exclusion> - <!-- SPARK-4455 --> - <groupId>org.apache.hbase</groupId> - <artifactId>hbase-annotations</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> - <groupId>org.apache.hbase</groupId> - <artifactId>hbase-client</artifactId> - <version>${hbase.version}</version> - <scope>${hbase.deps.scope}</scope> - <exclusions> - <exclusion> - <!-- SPARK-4455 --> - <groupId>org.apache.hbase</groupId> - <artifactId>hbase-annotations</artifactId> - </exclusion> - <exclusion> - <groupId>io.netty</groupId> - <artifactId>netty</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> - <groupId>org.apache.hbase</groupId> - <artifactId>hbase-server</artifactId> - <version>${hbase.version}</version> - <scope>${hbase.deps.scope}</scope> - <exclusions> - <exclusion> - <!-- SPARK-4455 --> - <groupId>org.apache.hbase</groupId> - <artifactId>hbase-annotations</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.hbase</groupId> - <artifactId>hbase-common</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-core</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-client</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-mapreduce-client-jobclient</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-mapreduce-client-core</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-auth</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-annotations</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-hdfs</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.hbase</groupId> - <artifactId>hbase-hadoop1-compat</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.commons</groupId> - <artifactId>commons-math</artifactId> - </exclusion> - <exclusion> - <groupId>com.sun.jersey</groupId> - <artifactId>jersey-core</artifactId> - </exclusion> - <exclusion> - <groupId>org.slf4j</groupId> - 
<artifactId>slf4j-api</artifactId> - </exclusion> - <exclusion> - <groupId>com.sun.jersey</groupId> - <artifactId>jersey-server</artifactId> - </exclusion> - <exclusion> - <groupId>com.sun.jersey</groupId> - <artifactId>jersey-core</artifactId> - </exclusion> - <exclusion> - <groupId>com.sun.jersey</groupId> - <artifactId>jersey-json</artifactId> - </exclusion> - <exclusion> - <!-- hbase uses v2.4, which is better, but ...--> - <groupId>commons-io</groupId> - <artifactId>commons-io</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> - <groupId>org.apache.hbase</groupId> - <artifactId>hbase-hadoop-compat</artifactId> - <version>${hbase.version}</version> - <scope>${hbase.deps.scope}</scope> + <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-math3</artifactId> <scope>provided</scope> </dependency> - <dependency> - <groupId>com.twitter</groupId> - <artifactId>algebird-core_${scala.binary.version}</artifactId> - <version>0.11.0</version> - </dependency> <dependency> <groupId>org.scalacheck</groupId> <artifactId>scalacheck_${scala.binary.version}</artifactId> <scope>test</scope> </dependency> <dependency> - <groupId>org.apache.cassandra</groupId> - <artifactId>cassandra-all</artifactId> - <version>1.2.19</version> - <exclusions> - <exclusion> - <groupId>com.google.guava</groupId> - <artifactId>guava</artifactId> - </exclusion> - <exclusion> - <groupId>com.googlecode.concurrentlinkedhashmap</groupId> - <artifactId>concurrentlinkedhashmap-lru</artifactId> - </exclusion> - <exclusion> - <groupId>com.ning</groupId> - <artifactId>compress-lzf</artifactId> - </exclusion> - <exclusion> - <groupId>commons-cli</groupId> - <artifactId>commons-cli</artifactId> - </exclusion> - <exclusion> - <groupId>commons-codec</groupId> - <artifactId>commons-codec</artifactId> - </exclusion> - <exclusion> - <groupId>commons-lang</groupId> - <artifactId>commons-lang</artifactId> - </exclusion> - <exclusion> - <groupId>commons-logging</groupId> - <artifactId>commons-logging</artifactId> - </exclusion> - <exclusion> - <groupId>io.netty</groupId> - <artifactId>netty</artifactId> - </exclusion> - <exclusion> - <groupId>jline</groupId> - <artifactId>jline</artifactId> - </exclusion> - <exclusion> - <groupId>net.jpountz.lz4</groupId> - <artifactId>lz4</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.cassandra.deps</groupId> - <artifactId>avro</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.commons</groupId> - <artifactId>commons-math3</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.thrift</groupId> - <artifactId>libthrift</artifactId> - </exclusion> - </exclusions> + <groupId>org.scala-lang</groupId> + <artifactId>scala-library</artifactId> + <scope>provided</scope> </dependency> <dependency> <groupId>com.github.scopt</groupId> <artifactId>scopt_${scala.binary.version}</artifactId> <version>3.3.0</version> </dependency> + <dependency> + <groupId>com.twitter</groupId> + <artifactId>parquet-hadoop-bundle</artifactId> + <scope>provided</scope> + </dependency> </dependencies> <build> @@ -314,40 +146,9 @@ <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kinesis-asl_${scala.binary.version}</artifactId> <version>${project.version}</version> + <scope>provided</scope> </dependency> </dependencies> </profile> - - <!-- Profiles that disable inclusion of certain dependencies. 
--> - <profile> - <id>flume-provided</id> - <properties> - <flume.deps.scope>provided</flume.deps.scope> - </properties> - </profile> - <profile> - <id>hadoop-provided</id> - <properties> - <hadoop.deps.scope>provided</hadoop.deps.scope> - </properties> - </profile> - <profile> - <id>hbase-provided</id> - <properties> - <hbase.deps.scope>provided</hbase.deps.scope> - </properties> - </profile> - <profile> - <id>hive-provided</id> - <properties> - <hive.deps.scope>provided</hive.deps.scope> - </properties> - </profile> - <profile> - <id>parquet-provided</id> - <properties> - <parquet.deps.scope>provided</parquet.deps.scope> - </properties> - </profile> </profiles> </project> diff --git a/examples/src/main/python/cassandra_inputformat.py b/examples/src/main/python/cassandra_inputformat.py deleted file mode 100644 index 93ca0cfcc93025ea0ac54f5d0b5e782d1c9d870c..0000000000000000000000000000000000000000 --- a/examples/src/main/python/cassandra_inputformat.py +++ /dev/null @@ -1,84 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -from __future__ import print_function - -import sys - -from pyspark import SparkContext - -""" -Create data in Cassandra fist -(following: https://wiki.apache.org/cassandra/GettingStarted) - -cqlsh> CREATE KEYSPACE test - ... WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }; -cqlsh> use test; -cqlsh:test> CREATE TABLE users ( - ... user_id int PRIMARY KEY, - ... fname text, - ... lname text - ... ); -cqlsh:test> INSERT INTO users (user_id, fname, lname) - ... VALUES (1745, 'john', 'smith'); -cqlsh:test> INSERT INTO users (user_id, fname, lname) - ... VALUES (1744, 'john', 'doe'); -cqlsh:test> INSERT INTO users (user_id, fname, lname) - ... 
VALUES (1746, 'john', 'smith'); -cqlsh:test> SELECT * FROM users; - - user_id | fname | lname ----------+-------+------- - 1745 | john | smith - 1744 | john | doe - 1746 | john | smith -""" -if __name__ == "__main__": - if len(sys.argv) != 4: - print(""" - Usage: cassandra_inputformat <host> <keyspace> <cf> - - Run with example jar: - ./bin/spark-submit --driver-class-path /path/to/example/jar \ - /path/to/examples/cassandra_inputformat.py <host> <keyspace> <cf> - Assumes you have some data in Cassandra already, running on <host>, in <keyspace> and <cf> - """, file=sys.stderr) - exit(-1) - - host = sys.argv[1] - keyspace = sys.argv[2] - cf = sys.argv[3] - sc = SparkContext(appName="CassandraInputFormat") - - conf = {"cassandra.input.thrift.address": host, - "cassandra.input.thrift.port": "9160", - "cassandra.input.keyspace": keyspace, - "cassandra.input.columnfamily": cf, - "cassandra.input.partitioner.class": "Murmur3Partitioner", - "cassandra.input.page.row.size": "3"} - cass_rdd = sc.newAPIHadoopRDD( - "org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat", - "java.util.Map", - "java.util.Map", - keyConverter="org.apache.spark.examples.pythonconverters.CassandraCQLKeyConverter", - valueConverter="org.apache.spark.examples.pythonconverters.CassandraCQLValueConverter", - conf=conf) - output = cass_rdd.collect() - for (k, v) in output: - print((k, v)) - - sc.stop() diff --git a/examples/src/main/python/cassandra_outputformat.py b/examples/src/main/python/cassandra_outputformat.py deleted file mode 100644 index 5d643eac92f943d867aa7ee91a0473ebe66ce81e..0000000000000000000000000000000000000000 --- a/examples/src/main/python/cassandra_outputformat.py +++ /dev/null @@ -1,88 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -from __future__ import print_function - -import sys - -from pyspark import SparkContext - -""" -Create data in Cassandra fist -(following: https://wiki.apache.org/cassandra/GettingStarted) - -cqlsh> CREATE KEYSPACE test - ... WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }; -cqlsh> use test; -cqlsh:test> CREATE TABLE users ( - ... user_id int PRIMARY KEY, - ... fname text, - ... lname text - ... 
); - -> cassandra_outputformat <host> test users 1745 john smith -> cassandra_outputformat <host> test users 1744 john doe -> cassandra_outputformat <host> test users 1746 john smith - -cqlsh:test> SELECT * FROM users; - - user_id | fname | lname ----------+-------+------- - 1745 | john | smith - 1744 | john | doe - 1746 | john | smith -""" -if __name__ == "__main__": - if len(sys.argv) != 7: - print(""" - Usage: cassandra_outputformat <host> <keyspace> <cf> <user_id> <fname> <lname> - - Run with example jar: - ./bin/spark-submit --driver-class-path /path/to/example/jar \ - /path/to/examples/cassandra_outputformat.py <args> - Assumes you have created the following table <cf> in Cassandra already, - running on <host>, in <keyspace>. - - cqlsh:<keyspace>> CREATE TABLE <cf> ( - ... user_id int PRIMARY KEY, - ... fname text, - ... lname text - ... ); - """, file=sys.stderr) - exit(-1) - - host = sys.argv[1] - keyspace = sys.argv[2] - cf = sys.argv[3] - sc = SparkContext(appName="CassandraOutputFormat") - - conf = {"cassandra.output.thrift.address": host, - "cassandra.output.thrift.port": "9160", - "cassandra.output.keyspace": keyspace, - "cassandra.output.partitioner.class": "Murmur3Partitioner", - "cassandra.output.cql": "UPDATE " + keyspace + "." + cf + " SET fname = ?, lname = ?", - "mapreduce.output.basename": cf, - "mapreduce.outputformat.class": "org.apache.cassandra.hadoop.cql3.CqlOutputFormat", - "mapreduce.job.output.key.class": "java.util.Map", - "mapreduce.job.output.value.class": "java.util.List"} - key = {"user_id": int(sys.argv[4])} - sc.parallelize([(key, sys.argv[5:])]).saveAsNewAPIHadoopDataset( - conf=conf, - keyConverter="org.apache.spark.examples.pythonconverters.ToCassandraCQLKeyConverter", - valueConverter="org.apache.spark.examples.pythonconverters.ToCassandraCQLValueConverter") - - sc.stop() diff --git a/examples/src/main/python/hbase_inputformat.py b/examples/src/main/python/hbase_inputformat.py deleted file mode 100644 index c5ae5d043b8ea227e7a849eae1661c86602ce23a..0000000000000000000000000000000000000000 --- a/examples/src/main/python/hbase_inputformat.py +++ /dev/null @@ -1,90 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-# - -from __future__ import print_function - -import sys -import json - -from pyspark import SparkContext - -""" -Create test data in HBase first: - -hbase(main):016:0> create 'test', 'f1' -0 row(s) in 1.0430 seconds - -hbase(main):017:0> put 'test', 'row1', 'f1:a', 'value1' -0 row(s) in 0.0130 seconds - -hbase(main):018:0> put 'test', 'row1', 'f1:b', 'value2' -0 row(s) in 0.0030 seconds - -hbase(main):019:0> put 'test', 'row2', 'f1', 'value3' -0 row(s) in 0.0050 seconds - -hbase(main):020:0> put 'test', 'row3', 'f1', 'value4' -0 row(s) in 0.0110 seconds - -hbase(main):021:0> scan 'test' -ROW COLUMN+CELL - row1 column=f1:a, timestamp=1401883411986, value=value1 - row1 column=f1:b, timestamp=1401883415212, value=value2 - row2 column=f1:, timestamp=1401883417858, value=value3 - row3 column=f1:, timestamp=1401883420805, value=value4 -4 row(s) in 0.0240 seconds -""" -if __name__ == "__main__": - if len(sys.argv) != 3: - print(""" - Usage: hbase_inputformat <host> <table> - - Run with example jar: - ./bin/spark-submit --driver-class-path /path/to/example/jar \ - /path/to/examples/hbase_inputformat.py <host> <table> [<znode>] - Assumes you have some data in HBase already, running on <host>, in <table> - optionally, you can specify parent znode for your hbase cluster - <znode> - """, file=sys.stderr) - exit(-1) - - host = sys.argv[1] - table = sys.argv[2] - sc = SparkContext(appName="HBaseInputFormat") - - # Other options for configuring scan behavior are available. More information available at - # https://github.com/apache/hbase/blob/master/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormat.java - conf = {"hbase.zookeeper.quorum": host, "hbase.mapreduce.inputtable": table} - if len(sys.argv) > 3: - conf = {"hbase.zookeeper.quorum": host, "zookeeper.znode.parent": sys.argv[3], - "hbase.mapreduce.inputtable": table} - keyConv = "org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter" - valueConv = "org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter" - - hbase_rdd = sc.newAPIHadoopRDD( - "org.apache.hadoop.hbase.mapreduce.TableInputFormat", - "org.apache.hadoop.hbase.io.ImmutableBytesWritable", - "org.apache.hadoop.hbase.client.Result", - keyConverter=keyConv, - valueConverter=valueConv, - conf=conf) - hbase_rdd = hbase_rdd.flatMapValues(lambda v: v.split("\n")).mapValues(json.loads) - - output = hbase_rdd.collect() - for (k, v) in output: - print((k, v)) - - sc.stop() diff --git a/examples/src/main/python/hbase_outputformat.py b/examples/src/main/python/hbase_outputformat.py deleted file mode 100644 index 9e5641789a976ad6bce7dfedf4399381572b0382..0000000000000000000000000000000000000000 --- a/examples/src/main/python/hbase_outputformat.py +++ /dev/null @@ -1,73 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
-# See the License for the specific language governing permissions and -# limitations under the License. -# - -from __future__ import print_function - -import sys - -from pyspark import SparkContext - -""" -Create test table in HBase first: - -hbase(main):001:0> create 'test', 'f1' -0 row(s) in 0.7840 seconds - -> hbase_outputformat <host> test row1 f1 q1 value1 -> hbase_outputformat <host> test row2 f1 q1 value2 -> hbase_outputformat <host> test row3 f1 q1 value3 -> hbase_outputformat <host> test row4 f1 q1 value4 - -hbase(main):002:0> scan 'test' -ROW COLUMN+CELL - row1 column=f1:q1, timestamp=1405659615726, value=value1 - row2 column=f1:q1, timestamp=1405659626803, value=value2 - row3 column=f1:q1, timestamp=1405659640106, value=value3 - row4 column=f1:q1, timestamp=1405659650292, value=value4 -4 row(s) in 0.0780 seconds -""" -if __name__ == "__main__": - if len(sys.argv) != 7: - print(""" - Usage: hbase_outputformat <host> <table> <row> <family> <qualifier> <value> - - Run with example jar: - ./bin/spark-submit --driver-class-path /path/to/example/jar \ - /path/to/examples/hbase_outputformat.py <args> - Assumes you have created <table> with column family <family> in HBase - running on <host> already - """, file=sys.stderr) - exit(-1) - - host = sys.argv[1] - table = sys.argv[2] - sc = SparkContext(appName="HBaseOutputFormat") - - conf = {"hbase.zookeeper.quorum": host, - "hbase.mapred.outputtable": table, - "mapreduce.outputformat.class": "org.apache.hadoop.hbase.mapreduce.TableOutputFormat", - "mapreduce.job.output.key.class": "org.apache.hadoop.hbase.io.ImmutableBytesWritable", - "mapreduce.job.output.value.class": "org.apache.hadoop.io.Writable"} - keyConv = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter" - valueConv = "org.apache.spark.examples.pythonconverters.StringListToPutConverter" - - sc.parallelize([sys.argv[3:]]).map(lambda x: (x[0], x)).saveAsNewAPIHadoopDataset( - conf=conf, - keyConverter=keyConv, - valueConverter=valueConv) - - sc.stop() diff --git a/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala b/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala deleted file mode 100644 index ca4eea235683ada22fe9a4a484b86ede88847f09..0000000000000000000000000000000000000000 --- a/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala +++ /dev/null @@ -1,135 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - - // scalastyle:off println -package org.apache.spark.examples - -import java.nio.ByteBuffer -import java.util.Collections - -import org.apache.cassandra.hadoop.ConfigHelper -import org.apache.cassandra.hadoop.cql3.CqlConfigHelper -import org.apache.cassandra.hadoop.cql3.CqlOutputFormat -import org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat -import org.apache.cassandra.utils.ByteBufferUtil -import org.apache.hadoop.mapreduce.Job - -import org.apache.spark.{SparkConf, SparkContext} - -/* - Need to create following keyspace and column family in cassandra before running this example - Start CQL shell using ./bin/cqlsh and execute following commands - CREATE KEYSPACE retail WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}; - use retail; - CREATE TABLE salecount (prod_id text, sale_count int, PRIMARY KEY (prod_id)); - CREATE TABLE ordercf (user_id text, - time timestamp, - prod_id text, - quantity int, - PRIMARY KEY (user_id, time)); - INSERT INTO ordercf (user_id, - time, - prod_id, - quantity) VALUES ('bob', 1385983646000, 'iphone', 1); - INSERT INTO ordercf (user_id, - time, - prod_id, - quantity) VALUES ('tom', 1385983647000, 'samsung', 4); - INSERT INTO ordercf (user_id, - time, - prod_id, - quantity) VALUES ('dora', 1385983648000, 'nokia', 2); - INSERT INTO ordercf (user_id, - time, - prod_id, - quantity) VALUES ('charlie', 1385983649000, 'iphone', 2); -*/ - -/** - * This example demonstrates how to read and write to cassandra column family created using CQL3 - * using Spark. - * Parameters : <cassandra_node> <cassandra_port> - * Usage: ./bin/spark-submit examples.jar \ - * --class org.apache.spark.examples.CassandraCQLTest localhost 9160 - */ -object CassandraCQLTest { - - def main(args: Array[String]) { - val sparkConf = new SparkConf().setAppName("CQLTestApp") - - val sc = new SparkContext(sparkConf) - val cHost: String = args(0) - val cPort: String = args(1) - val KeySpace = "retail" - val InputColumnFamily = "ordercf" - val OutputColumnFamily = "salecount" - - val job = Job.getInstance() - job.setInputFormatClass(classOf[CqlPagingInputFormat]) - val configuration = job.getConfiguration - ConfigHelper.setInputInitialAddress(job.getConfiguration(), cHost) - ConfigHelper.setInputRpcPort(job.getConfiguration(), cPort) - ConfigHelper.setInputColumnFamily(job.getConfiguration(), KeySpace, InputColumnFamily) - ConfigHelper.setInputPartitioner(job.getConfiguration(), "Murmur3Partitioner") - CqlConfigHelper.setInputCQLPageRowSize(job.getConfiguration(), "3") - - /** CqlConfigHelper.setInputWhereClauses(job.getConfiguration(), "user_id='bob'") */ - - /** An UPDATE writes one or more columns to a record in a Cassandra column family */ - val query = "UPDATE " + KeySpace + "." + OutputColumnFamily + " SET sale_count = ? 
" - CqlConfigHelper.setOutputCql(job.getConfiguration(), query) - - job.setOutputFormatClass(classOf[CqlOutputFormat]) - ConfigHelper.setOutputColumnFamily(job.getConfiguration(), KeySpace, OutputColumnFamily) - ConfigHelper.setOutputInitialAddress(job.getConfiguration(), cHost) - ConfigHelper.setOutputRpcPort(job.getConfiguration(), cPort) - ConfigHelper.setOutputPartitioner(job.getConfiguration(), "Murmur3Partitioner") - - val casRdd = sc.newAPIHadoopRDD(job.getConfiguration(), - classOf[CqlPagingInputFormat], - classOf[java.util.Map[String, ByteBuffer]], - classOf[java.util.Map[String, ByteBuffer]]) - - println("Count: " + casRdd.count) - val productSaleRDD = casRdd.map { - case (key, value) => - (ByteBufferUtil.string(value.get("prod_id")), ByteBufferUtil.toInt(value.get("quantity"))) - } - val aggregatedRDD = productSaleRDD.reduceByKey(_ + _) - aggregatedRDD.collect().foreach { - case (productId, saleCount) => println(productId + ":" + saleCount) - } - - val casoutputCF = aggregatedRDD.map { - case (productId, saleCount) => - val outKey = Collections.singletonMap("prod_id", ByteBufferUtil.bytes(productId)) - val outVal = Collections.singletonList(ByteBufferUtil.bytes(saleCount)) - (outKey, outVal) - } - - casoutputCF.saveAsNewAPIHadoopFile( - KeySpace, - classOf[java.util.Map[String, ByteBuffer]], - classOf[java.util.List[ByteBuffer]], - classOf[CqlOutputFormat], - job.getConfiguration() - ) - - sc.stop() - } -} -// scalastyle:on println diff --git a/examples/src/main/scala/org/apache/spark/examples/CassandraTest.scala b/examples/src/main/scala/org/apache/spark/examples/CassandraTest.scala deleted file mode 100644 index eff840d36e8d45d3c64a997766202f6003eee96d..0000000000000000000000000000000000000000 --- a/examples/src/main/scala/org/apache/spark/examples/CassandraTest.scala +++ /dev/null @@ -1,215 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -// scalastyle:off println -package org.apache.spark.examples - -import java.nio.ByteBuffer -import java.util.Arrays -import java.util.SortedMap - -import org.apache.cassandra.db.IColumn -import org.apache.cassandra.hadoop.ColumnFamilyInputFormat -import org.apache.cassandra.hadoop.ColumnFamilyOutputFormat -import org.apache.cassandra.hadoop.ConfigHelper -import org.apache.cassandra.thrift._ -import org.apache.cassandra.utils.ByteBufferUtil -import org.apache.hadoop.mapreduce.Job - -import org.apache.spark.{SparkConf, SparkContext} - -/* - * This example demonstrates using Spark with Cassandra with the New Hadoop API and Cassandra - * support for Hadoop. 
- * - * To run this example, run this file with the following command params - - * <cassandra_node> <cassandra_port> - * - * So if you want to run this on localhost this will be, - * localhost 9160 - * - * The example makes some assumptions: - * 1. You have already created a keyspace called casDemo and it has a column family named Words - * 2. There are column family has a column named "para" which has test content. - * - * You can create the content by running the following script at the bottom of this file with - * cassandra-cli. - * - */ -object CassandraTest { - - def main(args: Array[String]) { - val sparkConf = new SparkConf().setAppName("casDemo") - // Get a SparkContext - val sc = new SparkContext(sparkConf) - - // Build the job configuration with ConfigHelper provided by Cassandra - val job = Job.getInstance() - job.setInputFormatClass(classOf[ColumnFamilyInputFormat]) - - val host: String = args(1) - val port: String = args(2) - - ConfigHelper.setInputInitialAddress(job.getConfiguration(), host) - ConfigHelper.setInputRpcPort(job.getConfiguration(), port) - ConfigHelper.setOutputInitialAddress(job.getConfiguration(), host) - ConfigHelper.setOutputRpcPort(job.getConfiguration(), port) - ConfigHelper.setInputColumnFamily(job.getConfiguration(), "casDemo", "Words") - ConfigHelper.setOutputColumnFamily(job.getConfiguration(), "casDemo", "WordCount") - - val predicate = new SlicePredicate() - val sliceRange = new SliceRange() - sliceRange.setStart(Array.empty[Byte]) - sliceRange.setFinish(Array.empty[Byte]) - predicate.setSlice_range(sliceRange) - ConfigHelper.setInputSlicePredicate(job.getConfiguration(), predicate) - - ConfigHelper.setInputPartitioner(job.getConfiguration(), "Murmur3Partitioner") - ConfigHelper.setOutputPartitioner(job.getConfiguration(), "Murmur3Partitioner") - - // Make a new Hadoop RDD - val casRdd = sc.newAPIHadoopRDD( - job.getConfiguration(), - classOf[ColumnFamilyInputFormat], - classOf[ByteBuffer], - classOf[SortedMap[ByteBuffer, IColumn]]) - - // Let us first get all the paragraphs from the retrieved rows - val paraRdd = casRdd.map { - case (key, value) => - ByteBufferUtil.string(value.get(ByteBufferUtil.bytes("para")).value()) - } - - // Lets get the word count in paras - val counts = paraRdd.flatMap(p => p.split(" ")).map(word => (word, 1)).reduceByKey(_ + _) - - counts.collect().foreach { - case (word, count) => println(word + ":" + count) - } - - counts.map { - case (word, count) => - val colWord = new org.apache.cassandra.thrift.Column() - colWord.setName(ByteBufferUtil.bytes("word")) - colWord.setValue(ByteBufferUtil.bytes(word)) - colWord.setTimestamp(System.currentTimeMillis) - - val colCount = new org.apache.cassandra.thrift.Column() - colCount.setName(ByteBufferUtil.bytes("wcount")) - colCount.setValue(ByteBufferUtil.bytes(count.toLong)) - colCount.setTimestamp(System.currentTimeMillis) - - val outputkey = ByteBufferUtil.bytes(word + "-COUNT-" + System.currentTimeMillis) - - val mutations = Arrays.asList(new Mutation(), new Mutation()) - mutations.get(0).setColumn_or_supercolumn(new ColumnOrSuperColumn()) - mutations.get(0).column_or_supercolumn.setColumn(colWord) - mutations.get(1).setColumn_or_supercolumn(new ColumnOrSuperColumn()) - mutations.get(1).column_or_supercolumn.setColumn(colCount) - (outputkey, mutations) - }.saveAsNewAPIHadoopFile("casDemo", classOf[ByteBuffer], classOf[List[Mutation]], - classOf[ColumnFamilyOutputFormat], job.getConfiguration) - - sc.stop() - } -} -// scalastyle:on println - -/* -create keyspace casDemo; -use 
casDemo; - -create column family WordCount with comparator = UTF8Type; -update column family WordCount with column_metadata = - [{column_name: word, validation_class: UTF8Type}, - {column_name: wcount, validation_class: LongType}]; - -create column family Words with comparator = UTF8Type; -update column family Words with column_metadata = - [{column_name: book, validation_class: UTF8Type}, - {column_name: para, validation_class: UTF8Type}]; - -assume Words keys as utf8; - -set Words['3musk001']['book'] = 'The Three Musketeers'; -set Words['3musk001']['para'] = 'On the first Monday of the month of April, 1625, the market - town of Meung, in which the author of ROMANCE OF THE ROSE was born, appeared to - be in as perfect a state of revolution as if the Huguenots had just made - a second La Rochelle of it. Many citizens, seeing the women flying - toward the High Street, leaving their children crying at the open doors, - hastened to don the cuirass, and supporting their somewhat uncertain - courage with a musket or a partisan, directed their steps toward the - hostelry of the Jolly Miller, before which was gathered, increasing - every minute, a compact group, vociferous and full of curiosity.'; - -set Words['3musk002']['book'] = 'The Three Musketeers'; -set Words['3musk002']['para'] = 'In those times panics were common, and few days passed without - some city or other registering in its archives an event of this kind. There were - nobles, who made war against each other; there was the king, who made - war against the cardinal; there was Spain, which made war against the - king. Then, in addition to these concealed or public, secret or open - wars, there were robbers, mendicants, Huguenots, wolves, and scoundrels, - who made war upon everybody. The citizens always took up arms readily - against thieves, wolves or scoundrels, often against nobles or - Huguenots, sometimes against the king, but never against cardinal or - Spain. It resulted, then, from this habit that on the said first Monday - of April, 1625, the citizens, on hearing the clamor, and seeing neither - the red-and-yellow standard nor the livery of the Duc de Richelieu, - rushed toward the hostel of the Jolly Miller. When arrived there, the - cause of the hubbub was apparent to all'; - -set Words['3musk003']['book'] = 'The Three Musketeers'; -set Words['3musk003']['para'] = 'You ought, I say, then, to husband the means you have, however - large the sum may be; but you ought also to endeavor to perfect yourself in - the exercises becoming a gentleman. I will write a letter today to the - Director of the Royal Academy, and tomorrow he will admit you without - any expense to yourself. Do not refuse this little service. Our - best-born and richest gentlemen sometimes solicit it without being able - to obtain it. You will learn horsemanship, swordsmanship in all its - branches, and dancing. You will make some desirable acquaintances; and - from time to time you can call upon me, just to tell me how you are - getting on, and to say whether I can be of further service to you.'; - - -set Words['thelostworld001']['book'] = 'The Lost World'; -set Words['thelostworld001']['para'] = 'She sat with that proud, delicate profile of hers outlined - against the red curtain. How beautiful she was! And yet how aloof! We had been - friends, quite good friends; but never could I get beyond the same - comradeship which I might have established with one of my - fellow-reporters upon the Gazette,--perfectly frank, perfectly kindly, - and perfectly unsexual. 
My instincts are all against a woman being too - frank and at her ease with me. It is no compliment to a man. Where - the real sex feeling begins, timidity and distrust are its companions, - heritage from old wicked days when love and violence went often hand in - hand. The bent head, the averted eye, the faltering voice, the wincing - figure--these, and not the unshrinking gaze and frank reply, are the - true signals of passion. Even in my short life I had learned as much - as that--or had inherited it in that race memory which we call instinct.'; - -set Words['thelostworld002']['book'] = 'The Lost World'; -set Words['thelostworld002']['para'] = 'I always liked McArdle, the crabbed, old, round-backed, - red-headed news editor, and I rather hoped that he liked me. Of course, Beaumont was - the real boss; but he lived in the rarefied atmosphere of some Olympian - height from which he could distinguish nothing smaller than an - international crisis or a split in the Cabinet. Sometimes we saw him - passing in lonely majesty to his inner sanctum, with his eyes staring - vaguely and his mind hovering over the Balkans or the Persian Gulf. He - was above and beyond us. But McArdle was his first lieutenant, and it - was he that we knew. The old man nodded as I entered the room, and he - pushed his spectacles far up on his bald forehead.'; - -*/ diff --git a/examples/src/main/scala/org/apache/spark/examples/HBaseTest.scala b/examples/src/main/scala/org/apache/spark/examples/HBaseTest.scala deleted file mode 100644 index 65d748958606258d570763dece8a2ea8808cbe9e..0000000000000000000000000000000000000000 --- a/examples/src/main/scala/org/apache/spark/examples/HBaseTest.scala +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -// scalastyle:off println -package org.apache.spark.examples - -import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, TableName} -import org.apache.hadoop.hbase.client.HBaseAdmin -import org.apache.hadoop.hbase.mapreduce.TableInputFormat - -import org.apache.spark._ - -object HBaseTest { - def main(args: Array[String]) { - val sparkConf = new SparkConf().setAppName("HBaseTest") - val sc = new SparkContext(sparkConf) - - // please ensure HBASE_CONF_DIR is on classpath of spark driver - // e.g: set it through spark.driver.extraClassPath property - // in spark-defaults.conf or through --driver-class-path - // command line option of spark-submit - - val conf = HBaseConfiguration.create() - - if (args.length < 1) { - System.err.println("Usage: HBaseTest <table_name>") - System.exit(1) - } - - // Other options for configuring scan behavior are available. 
More information available at - // http://hbase.apache.org/apidocs/org/apache/hadoop/hbase/mapreduce/TableInputFormat.html - conf.set(TableInputFormat.INPUT_TABLE, args(0)) - - // Initialize hBase table if necessary - val admin = new HBaseAdmin(conf) - if (!admin.isTableAvailable(args(0))) { - val tableDesc = new HTableDescriptor(TableName.valueOf(args(0))) - admin.createTable(tableDesc) - } - - val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], - classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], - classOf[org.apache.hadoop.hbase.client.Result]) - - hBaseRDD.count() - - sc.stop() - admin.close() - } -} -// scalastyle:on println diff --git a/examples/src/main/scala/org/apache/spark/examples/pythonconverters/CassandraConverters.scala b/examples/src/main/scala/org/apache/spark/examples/pythonconverters/CassandraConverters.scala deleted file mode 100644 index 00ce47af4813de162d831124976e820e512aaeed..0000000000000000000000000000000000000000 --- a/examples/src/main/scala/org/apache/spark/examples/pythonconverters/CassandraConverters.scala +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.examples.pythonconverters - -import java.nio.ByteBuffer - -import scala.collection.JavaConverters._ - -import org.apache.cassandra.utils.ByteBufferUtil - -import org.apache.spark.api.python.Converter - -/** - * Implementation of [[org.apache.spark.api.python.Converter]] that converts Cassandra - * output to a Map[String, Int] - */ -class CassandraCQLKeyConverter extends Converter[Any, java.util.Map[String, Int]] { - override def convert(obj: Any): java.util.Map[String, Int] = { - val result = obj.asInstanceOf[java.util.Map[String, ByteBuffer]] - result.asScala.mapValues(ByteBufferUtil.toInt).asJava - } -} - -/** - * Implementation of [[org.apache.spark.api.python.Converter]] that converts Cassandra - * output to a Map[String, String] - */ -class CassandraCQLValueConverter extends Converter[Any, java.util.Map[String, String]] { - override def convert(obj: Any): java.util.Map[String, String] = { - val result = obj.asInstanceOf[java.util.Map[String, ByteBuffer]] - result.asScala.mapValues(ByteBufferUtil.string).asJava - } -} - -/** - * Implementation of [[org.apache.spark.api.python.Converter]] that converts a - * Map[String, Int] to Cassandra key - */ -class ToCassandraCQLKeyConverter extends Converter[Any, java.util.Map[String, ByteBuffer]] { - override def convert(obj: Any): java.util.Map[String, ByteBuffer] = { - val input = obj.asInstanceOf[java.util.Map[String, Int]] - input.asScala.mapValues(ByteBufferUtil.bytes).asJava - } -} - -/** - * Implementation of [[org.apache.spark.api.python.Converter]] that converts a - * List[String] to Cassandra value - */ -class ToCassandraCQLValueConverter extends Converter[Any, java.util.List[ByteBuffer]] { - override def convert(obj: Any): java.util.List[ByteBuffer] = { - val input = obj.asInstanceOf[java.util.List[String]] - input.asScala.map(ByteBufferUtil.bytes).asJava - } -} diff --git a/examples/src/main/scala/org/apache/spark/examples/pythonconverters/HBaseConverters.scala b/examples/src/main/scala/org/apache/spark/examples/pythonconverters/HBaseConverters.scala deleted file mode 100644 index e252ca882e534b271a00cf06fcd25bd2ce9234c3..0000000000000000000000000000000000000000 --- a/examples/src/main/scala/org/apache/spark/examples/pythonconverters/HBaseConverters.scala +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.examples.pythonconverters - -import scala.collection.JavaConverters._ -import scala.util.parsing.json.JSONObject - -import org.apache.hadoop.hbase.CellUtil -import org.apache.hadoop.hbase.KeyValue.Type -import org.apache.hadoop.hbase.client.{Put, Result} -import org.apache.hadoop.hbase.io.ImmutableBytesWritable -import org.apache.hadoop.hbase.util.Bytes - -import org.apache.spark.api.python.Converter - -/** - * Implementation of [[org.apache.spark.api.python.Converter]] that converts all - * the records in an HBase Result to a String - */ -class HBaseResultToStringConverter extends Converter[Any, String] { - override def convert(obj: Any): String = { - val result = obj.asInstanceOf[Result] - val output = result.listCells.asScala.map(cell => - Map( - "row" -> Bytes.toStringBinary(CellUtil.cloneRow(cell)), - "columnFamily" -> Bytes.toStringBinary(CellUtil.cloneFamily(cell)), - "qualifier" -> Bytes.toStringBinary(CellUtil.cloneQualifier(cell)), - "timestamp" -> cell.getTimestamp.toString, - "type" -> Type.codeToType(cell.getTypeByte).toString, - "value" -> Bytes.toStringBinary(CellUtil.cloneValue(cell)) - ) - ) - output.map(JSONObject(_).toString()).mkString("\n") - } -} - -/** - * Implementation of [[org.apache.spark.api.python.Converter]] that converts an - * ImmutableBytesWritable to a String - */ -class ImmutableBytesWritableToStringConverter extends Converter[Any, String] { - override def convert(obj: Any): String = { - val key = obj.asInstanceOf[ImmutableBytesWritable] - Bytes.toStringBinary(key.get()) - } -} - -/** - * Implementation of [[org.apache.spark.api.python.Converter]] that converts a - * String to an ImmutableBytesWritable - */ -class StringToImmutableBytesWritableConverter extends Converter[Any, ImmutableBytesWritable] { - override def convert(obj: Any): ImmutableBytesWritable = { - val bytes = Bytes.toBytes(obj.asInstanceOf[String]) - new ImmutableBytesWritable(bytes) - } -} - -/** - * Implementation of [[org.apache.spark.api.python.Converter]] that converts a - * list of Strings to HBase Put - */ -class StringListToPutConverter extends Converter[Any, Put] { - override def convert(obj: Any): Put = { - val output = obj.asInstanceOf[java.util.ArrayList[String]].asScala.map(Bytes.toBytes).toArray - val put = new Put(output(0)) - put.add(output(1), output(2), output(3)) - } -} diff --git a/pom.xml b/pom.xml index 73334a852d39e2892cd92a04b289bff059eedc63..d3a69df7f279c0ebbb37d82a17d6235862118722 100644 --- a/pom.xml +++ b/pom.xml @@ -126,8 +126,6 @@ <hadoop.version>2.2.0</hadoop.version> <protobuf.version>2.5.0</protobuf.version> <yarn.version>${hadoop.version}</yarn.version> - <hbase.version>0.98.17-hadoop2</hbase.version> - <hbase.artifact>hbase</hbase.artifact> <flume.version>1.6.0</flume.version> <zookeeper.version>3.4.5</zookeeper.version> <curator.version>2.4.0</curator.version> @@ -205,7 +203,6 @@ --> <flume.deps.scope>compile</flume.deps.scope> <hadoop.deps.scope>compile</hadoop.deps.scope> - <hbase.deps.scope>compile</hbase.deps.scope> <hive.deps.scope>compile</hive.deps.scope> <parquet.deps.scope>compile</parquet.deps.scope> <parquet.test.deps.scope>test</parquet.test.deps.scope> @@ -739,7 +736,7 @@ <exclusion> <groupId>jline</groupId> <artifactId>jline</artifactId> - </exclusion> + </exclusion> </exclusions> </dependency> <dependency> @@ -2572,9 +2569,6 @@ <profile> <id>hadoop-provided</id> </profile> - <profile> - <id>hbase-provided</id> - </profile> <profile> <id>hive-provided</id> </profile>
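
Note on the build change above: examples/pom.xml now pins flume.deps.scope, hadoop.deps.scope, hive.deps.scope, and parquet.deps.scope to "provided" directly in its <properties>, and the opt-in flume-provided / hadoop-provided / hive-provided / parquet-provided profiles are dropped; the root pom.xml likewise loses hbase.version, hbase.deps.scope, and the hbase-provided profile now that the HBase and Cassandra examples are deleted. Below is a minimal, illustrative sketch of how such a property drives a dependency's effective scope; the hadoop-client declaration is only an example of the pattern, and the real declarations that reference ${hadoop.deps.scope} live in the parent POM, not in this patch.

    <!-- Illustrative sketch only, not a copy of the parent POM. -->
    <properties>
      <!-- The parent default is "compile"; the examples module now overrides it. -->
      <hadoop.deps.scope>provided</hadoop.deps.scope>
    </properties>

    <dependencies>
      <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop.version}</version>
        <!-- Resolves to "provided": compiled against, but not bundled into the examples jar. -->
        <scope>${hadoop.deps.scope}</scope>
      </dependency>
    </dependencies>

With the scope fixed to "provided" in the module itself, the examples assembly no longer bundles Flume, Hadoop, Hive, or Parquet classes, which is why the per-dependency "-provided" profiles removed in this patch are no longer needed.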