Commit bc5d6c03 authored by Reynold Xin

[SPARK-11541][SQL] Break JdbcDialects.scala into multiple files and mark various dialects as private.

Author: Reynold Xin <rxin@databricks.com>

Closes #9511 from rxin/SPARK-11541.
parent 363a476c
Showing 332 additions and 187 deletions
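
What the diff below amounts to: each built-in dialect moves into its own file and is demoted from a public case object to a package-private object, while the supported extension surface, the abstract JdbcDialect class plus JdbcDialects.registerDialect and unregisterDialect, stays @DeveloperApi public. A minimal sketch of that extension path; the dialect object and the jdbc:h2 prefix are hypothetical:

import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects, JdbcType}
import org.apache.spark.sql.types.{DataType, StringType}

// Hypothetical third-party dialect; only JdbcDialect, JdbcDialects and JdbcType
// are Spark API, everything else here is illustrative.
object MyH2Dialect extends JdbcDialect {
  override def canHandle(url: String): Boolean = url.startsWith("jdbc:h2")
  // Write Spark strings as CLOB, mirroring what the DB2 and Derby dialects below do.
  override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
    case StringType => Some(JdbcType("CLOB", java.sql.Types.CLOB))
    case _ => None
  }
}

object MyH2DialectApp {
  def main(args: Array[String]): Unit = {
    JdbcDialects.registerDialect(MyH2Dialect)   // re-adding an existing dialect moves it to the front
    JdbcDialects.unregisterDialect(MyH2Dialect) // does nothing if it was never registered
  }
}
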
@@ -116,7 +116,24 @@ object MimaExcludes {
           "org.apache.spark.rdd.MapPartitionsWithPreparationRDD$")
       ) ++ Seq(
         // SPARK-11485
-        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.sql.DataFrameHolder.df")
+        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.sql.DataFrameHolder.df"),
+        // SPARK-11541 mark various JDBC dialects as private
+        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.sql.jdbc.NoopDialect.productElement"),
+        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.sql.jdbc.NoopDialect.productArity"),
+        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.sql.jdbc.NoopDialect.canEqual"),
+        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.sql.jdbc.NoopDialect.productIterator"),
+        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.sql.jdbc.NoopDialect.productPrefix"),
+        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.sql.jdbc.NoopDialect.toString"),
+        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.sql.jdbc.NoopDialect.hashCode"),
+        ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.sql.jdbc.PostgresDialect$"),
+        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.sql.jdbc.PostgresDialect.productElement"),
+        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.sql.jdbc.PostgresDialect.productArity"),
+        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.sql.jdbc.PostgresDialect.canEqual"),
+        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.sql.jdbc.PostgresDialect.productIterator"),
+        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.sql.jdbc.PostgresDialect.productPrefix"),
+        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.sql.jdbc.PostgresDialect.toString"),
+        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.sql.jdbc.PostgresDialect.hashCode"),
+        ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.sql.jdbc.NoopDialect$")
       )
     case v if v.startsWith("1.5") =>
       Seq(
 ...
@@ -40,7 +40,7 @@ import org.apache.spark.sql.types.NumericType
 class GroupedData protected[sql](
     df: DataFrame,
     groupingExprs: Seq[Expression],
-    private val groupType: GroupedData.GroupType) {
+    groupType: GroupedData.GroupType) {

   private[this] def toDF(aggExprs: Seq[Expression]): DataFrame = {
     val aggregates = if (df.sqlContext.conf.dataFrameRetainGroupColumns) {
 ...
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.jdbc

import org.apache.spark.sql.types.{DataType, MetadataBuilder}

/**
 * AggregatedDialect can unify multiple dialects into one virtual dialect.
 * Dialects are tried in order, and the first dialect that does not return a
 * neutral element wins.
 *
 * @param dialects List of dialects.
 */
private class AggregatedDialect(dialects: List[JdbcDialect]) extends JdbcDialect {

  require(dialects.nonEmpty)

  override def canHandle(url : String): Boolean =
    dialects.map(_.canHandle(url)).reduce(_ && _)

  override def getCatalystType(
      sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
    dialects.flatMap(_.getCatalystType(sqlType, typeName, size, md)).headOption
  }

  override def getJDBCType(dt: DataType): Option[JdbcType] = {
    dialects.flatMap(_.getJDBCType(dt)).headOption
  }
}
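
Because AggregatedDialect is now package-private, only code inside org.apache.spark.sql.jdbc can compose dialects this way; a sketch of the intended semantics using two dialects from this patch (the sketch object itself is hypothetical):

package org.apache.spark.sql.jdbc

object AggregatedDialectSketch {
  def main(args: Array[String]): Unit = {
    val combined = new AggregatedDialect(List(PostgresDialect, NoopDialect))
    // canHandle requires every member dialect to accept the URL.
    assert(combined.canHandle("jdbc:postgresql://host/db"))
    // Lookups take the first non-neutral (non-None) answer in list order, so
    // PostgresDialect's type mappings win and NoopDialect never contributes.
  }
}
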
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.jdbc

import org.apache.spark.sql.types.{BooleanType, StringType, DataType}

private object DB2Dialect extends JdbcDialect {

  override def canHandle(url: String): Boolean = url.startsWith("jdbc:db2")

  override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
    case StringType => Option(JdbcType("CLOB", java.sql.Types.CLOB))
    case BooleanType => Option(JdbcType("CHAR(1)", java.sql.Types.CHAR))
    case _ => None
  }
}
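
The mapping overrides Spark's defaults only for the two types DB2 rejects; everything else stays neutral and falls through. A quick same-package check (the sketch object is hypothetical, since DB2Dialect is private):

package org.apache.spark.sql.jdbc

import org.apache.spark.sql.types.{BooleanType, IntegerType, StringType}

object DB2DialectSketch {
  def main(args: Array[String]): Unit = {
    // Strings become CLOB and booleans CHAR(1); other types defer to the defaults.
    assert(DB2Dialect.getJDBCType(StringType) == Option(JdbcType("CLOB", java.sql.Types.CLOB)))
    assert(DB2Dialect.getJDBCType(BooleanType) == Option(JdbcType("CHAR(1)", java.sql.Types.CHAR)))
    assert(DB2Dialect.getJDBCType(IntegerType).isEmpty)
  }
}
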
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.jdbc

import java.sql.Types

import org.apache.spark.sql.types._

private object DerbyDialect extends JdbcDialect {

  override def canHandle(url: String): Boolean = url.startsWith("jdbc:derby")

  override def getCatalystType(
      sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
    if (sqlType == Types.REAL) Option(FloatType) else None
  }

  override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
    case StringType => Option(JdbcType("CLOB", java.sql.Types.CLOB))
    case ByteType => Option(JdbcType("SMALLINT", java.sql.Types.SMALLINT))
    case ShortType => Option(JdbcType("SMALLINT", java.sql.Types.SMALLINT))
    case BooleanType => Option(JdbcType("BOOLEAN", java.sql.Types.BOOLEAN))
    // 31 is the maximum precision and 5 is the default scale for a Derby DECIMAL
    case t: DecimalType if t.precision > 31 =>
      Option(JdbcType("DECIMAL(31,5)", java.sql.Types.DECIMAL))
    case _ => None
  }
}
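
The DECIMAL branch fires only when a Catalyst decimal cannot fit Derby's 31-digit ceiling; both sides of the guard, as a hypothetical same-package sketch:

package org.apache.spark.sql.jdbc

import org.apache.spark.sql.types.DecimalType

object DerbyDecimalSketch {
  def main(args: Array[String]): Unit = {
    // Precision 38 exceeds Derby's maximum of 31, so the type is clamped to DECIMAL(31,5).
    assert(DerbyDialect.getJDBCType(DecimalType(38, 10)) ==
      Option(JdbcType("DECIMAL(31,5)", java.sql.Types.DECIMAL)))
    // Precision 20 fits, so the dialect stays neutral and Spark's default mapping applies.
    assert(DerbyDialect.getJDBCType(DecimalType(20, 5)).isEmpty)
  }
}
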
@@ -17,8 +17,6 @@
 package org.apache.spark.sql.jdbc

-import java.sql.Types
-
 import org.apache.spark.sql.types._
 import org.apache.spark.annotation.DeveloperApi
@@ -115,11 +113,10 @@ abstract class JdbcDialect {
 @DeveloperApi
 object JdbcDialects {

-  private var dialects = List[JdbcDialect]()
-
   /**
    * Register a dialect for use on all new matching jdbc [[org.apache.spark.sql.DataFrame]].
    * Re-adding an existing dialect will cause a move-to-front.
+   *
    * @param dialect The new dialect.
    */
   def registerDialect(dialect: JdbcDialect) : Unit = {
@@ -128,12 +125,15 @@ object JdbcDialects {
   /**
    * Unregister a dialect. Does nothing if the dialect is not registered.
+   *
    * @param dialect The jdbc dialect.
    */
   def unregisterDialect(dialect : JdbcDialect) : Unit = {
     dialects = dialects.filterNot(_ == dialect)
   }

+  private[this] var dialects = List[JdbcDialect]()
+
   registerDialect(MySQLDialect)
   registerDialect(PostgresDialect)
   registerDialect(DB2Dialect)
@@ -141,7 +141,6 @@ object JdbcDialects {
   registerDialect(DerbyDialect)
   registerDialect(OracleDialect)
-
   /**
    * Fetch the JdbcDialect class corresponding to a given database url.
    */
@@ -156,187 +155,8 @@ object JdbcDialects {
 }

 /**
- * :: DeveloperApi ::
- * AggregatedDialect can unify multiple dialects into one virtual dialect.
- * Dialects are tried in order, and the first dialect that does not return a
- * neutral element wins.
- * @param dialects List of dialects.
- */
-@DeveloperApi
-class AggregatedDialect(dialects: List[JdbcDialect]) extends JdbcDialect {
-
-  require(dialects.nonEmpty)
-
-  override def canHandle(url : String): Boolean =
-    dialects.map(_.canHandle(url)).reduce(_ && _)
-
-  override def getCatalystType(
-      sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
-    dialects.flatMap(_.getCatalystType(sqlType, typeName, size, md)).headOption
-  }
-
-  override def getJDBCType(dt: DataType): Option[JdbcType] = {
-    dialects.flatMap(_.getJDBCType(dt)).headOption
-  }
-}
-
-/**
- * :: DeveloperApi ::
  * NOOP dialect object, always returning the neutral element.
  */
-@DeveloperApi
-case object NoopDialect extends JdbcDialect {
+private object NoopDialect extends JdbcDialect {
   override def canHandle(url : String): Boolean = true
 }
-
-/**
- * :: DeveloperApi ::
- * Default postgres dialect, mapping bit/cidr/inet on read and string/binary/boolean on write.
- */
-@DeveloperApi
-case object PostgresDialect extends JdbcDialect {
-
-  override def canHandle(url: String): Boolean = url.startsWith("jdbc:postgresql")
-
-  override def getCatalystType(
-      sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
-    if (sqlType == Types.BIT && typeName.equals("bit") && size != 1) {
-      Option(BinaryType)
-    } else if (sqlType == Types.OTHER && typeName.equals("cidr")) {
-      Option(StringType)
-    } else if (sqlType == Types.OTHER && typeName.equals("inet")) {
-      Option(StringType)
-    } else if (sqlType == Types.OTHER && typeName.equals("json")) {
-      Option(StringType)
-    } else if (sqlType == Types.OTHER && typeName.equals("jsonb")) {
-      Option(StringType)
-    } else None
-  }
-
-  override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
-    case StringType => Some(JdbcType("TEXT", java.sql.Types.CHAR))
-    case BinaryType => Some(JdbcType("BYTEA", java.sql.Types.BINARY))
-    case BooleanType => Some(JdbcType("BOOLEAN", java.sql.Types.BOOLEAN))
-    case _ => None
-  }
-
-  override def getTableExistsQuery(table: String): String = {
-    s"SELECT 1 FROM $table LIMIT 1"
-  }
-}
-
-/**
- * :: DeveloperApi ::
- * Default mysql dialect to read bit/bitsets correctly.
- */
-@DeveloperApi
-case object MySQLDialect extends JdbcDialect {
-
-  override def canHandle(url : String): Boolean = url.startsWith("jdbc:mysql")
-
-  override def getCatalystType(
-      sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
-    if (sqlType == Types.VARBINARY && typeName.equals("BIT") && size != 1) {
-      // This could instead be a BinaryType if we'd rather return bit-vectors of up to 64 bits as
-      // byte arrays instead of longs.
-      md.putLong("binarylong", 1)
-      Option(LongType)
-    } else if (sqlType == Types.BIT && typeName.equals("TINYINT")) {
-      Option(BooleanType)
-    } else None
-  }
-
-  override def quoteIdentifier(colName: String): String = {
-    s"`$colName`"
-  }
-
-  override def getTableExistsQuery(table: String): String = {
-    s"SELECT 1 FROM $table LIMIT 1"
-  }
-}
-
-/**
- * :: DeveloperApi ::
- * Default DB2 dialect, mapping string/boolean on write to valid DB2 types.
- * By default string, and boolean gets mapped to db2 invalid types TEXT, and BIT(1).
- */
-@DeveloperApi
-case object DB2Dialect extends JdbcDialect {
-
-  override def canHandle(url: String): Boolean = url.startsWith("jdbc:db2")
-
-  override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
-    case StringType => Some(JdbcType("CLOB", java.sql.Types.CLOB))
-    case BooleanType => Some(JdbcType("CHAR(1)", java.sql.Types.CHAR))
-    case _ => None
-  }
-}
-
-/**
- * :: DeveloperApi ::
- * Default Microsoft SQL Server dialect, mapping the datetimeoffset types to a String on read.
- */
-@DeveloperApi
-case object MsSqlServerDialect extends JdbcDialect {
-
-  override def canHandle(url: String): Boolean = url.startsWith("jdbc:sqlserver")
-
-  override def getCatalystType(
-      sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
-    if (typeName.contains("datetimeoffset")) {
-      // String is recommended by Microsoft SQL Server for datetimeoffset types in non-MS clients
-      Option(StringType)
-    } else None
-  }
-
-  override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
-    case TimestampType => Some(JdbcType("DATETIME", java.sql.Types.TIMESTAMP))
-    case _ => None
-  }
-}
-
-/**
- * :: DeveloperApi ::
- * Default Apache Derby dialect, mapping real on read
- * and string/byte/short/boolean/decimal on write.
- */
-@DeveloperApi
-case object DerbyDialect extends JdbcDialect {
-
-  override def canHandle(url: String): Boolean = url.startsWith("jdbc:derby")
-
-  override def getCatalystType(
-      sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
-    if (sqlType == Types.REAL) Option(FloatType) else None
-  }
-
-  override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
-    case StringType => Some(JdbcType("CLOB", java.sql.Types.CLOB))
-    case ByteType => Some(JdbcType("SMALLINT", java.sql.Types.SMALLINT))
-    case ShortType => Some(JdbcType("SMALLINT", java.sql.Types.SMALLINT))
-    case BooleanType => Some(JdbcType("BOOLEAN", java.sql.Types.BOOLEAN))
-    // 31 is the maximum precision and 5 is the default scale for a Derby DECIMAL
-    case (t: DecimalType) if (t.precision > 31) =>
-      Some(JdbcType("DECIMAL(31,5)", java.sql.Types.DECIMAL))
-    case _ => None
-  }
-}
-
-/**
- * :: DeveloperApi ::
- * Default Oracle dialect, mapping a nonspecific numeric type to a general decimal type.
- */
-@DeveloperApi
-case object OracleDialect extends JdbcDialect {
-
-  override def canHandle(url: String): Boolean = url.startsWith("jdbc:oracle")
-
-  override def getCatalystType(
-      sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
-    // Handle NUMBER fields that have no precision/scale in a special way
-    // because JDBC ResultSetMetaData converts this to 0 precision and -127 scale.
-    // For more details, please see
-    // https://github.com/apache/spark/pull/8780#issuecomment-145598968
-    // and
-    // https://github.com/apache/spark/pull/8780#issuecomment-144541760
-    if (sqlType == Types.NUMERIC && size == 0) {
-      // This is sub-optimal as we have to pick a precision/scale in advance whereas the data
-      // in Oracle is allowed to have different precision/scale for each value.
-      Some(DecimalType(DecimalType.MAX_PRECISION, 10))
-    } else {
-      None
-    }
-  }
-}
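
With the concrete dialects gone from this file, NoopDialect is what JdbcDialects falls back to when no registered dialect claims a URL, so callers never have to handle a missing dialect. A sketch, assuming the get(url) lookup elided from the hunk above:

package org.apache.spark.sql.jdbc

object FallbackSketch {
  def main(args: Array[String]): Unit = {
    // Assumption: JdbcDialects.get(url): JdbcDialect is the accessor elided above.
    val dialect = JdbcDialects.get("jdbc:unknown-vendor://host/db")
    // No registered dialect matched, so the neutral dialect answers: every
    // type-mapping hook returns None and Spark's generic JDBC mapping applies.
    assert(dialect.getJDBCType(org.apache.spark.sql.types.StringType).isEmpty)
  }
}
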
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.jdbc

import org.apache.spark.sql.types._

private object MsSqlServerDialect extends JdbcDialect {

  override def canHandle(url: String): Boolean = url.startsWith("jdbc:sqlserver")

  override def getCatalystType(
      sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
    if (typeName.contains("datetimeoffset")) {
      // String is recommended by Microsoft SQL Server for datetimeoffset types in non-MS clients
      Option(StringType)
    } else {
      None
    }
  }

  override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
    case TimestampType => Some(JdbcType("DATETIME", java.sql.Types.TIMESTAMP))
    case _ => None
  }
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.jdbc

import java.sql.Types

import org.apache.spark.sql.types.{BooleanType, LongType, DataType, MetadataBuilder}

private case object MySQLDialect extends JdbcDialect {

  override def canHandle(url : String): Boolean = url.startsWith("jdbc:mysql")

  override def getCatalystType(
      sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
    if (sqlType == Types.VARBINARY && typeName.equals("BIT") && size != 1) {
      // This could instead be a BinaryType if we'd rather return bit-vectors of up to 64 bits as
      // byte arrays instead of longs.
      md.putLong("binarylong", 1)
      Option(LongType)
    } else if (sqlType == Types.BIT && typeName.equals("TINYINT")) {
      Option(BooleanType)
    } else None
  }

  override def quoteIdentifier(colName: String): String = {
    s"`$colName`"
  }

  override def getTableExistsQuery(table: String): String = {
    s"SELECT 1 FROM $table LIMIT 1"
  }
}
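
Two of the overrides exist because MySQL diverges from ANSI defaults: identifiers are quoted with backticks rather than double quotes, and the existence probe reads at most one row. Illustrative outputs (hypothetical same-package sketch, since MySQLDialect is private):

package org.apache.spark.sql.jdbc

object MySQLDialectSketch {
  def main(args: Array[String]): Unit = {
    // Backticks keep reserved words usable as column names in generated SQL.
    assert(MySQLDialect.quoteIdentifier("order") == "`order`")
    assert(MySQLDialect.getTableExistsQuery("db.events") == "SELECT 1 FROM db.events LIMIT 1")
  }
}
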
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.jdbc

import java.sql.Types

import org.apache.spark.sql.types._

private case object OracleDialect extends JdbcDialect {

  override def canHandle(url: String): Boolean = url.startsWith("jdbc:oracle")

  override def getCatalystType(
      sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
    // Handle NUMBER fields that have no precision/scale in a special way
    // because JDBC ResultSetMetaData converts this to 0 precision and -127 scale.
    // For more details, please see
    // https://github.com/apache/spark/pull/8780#issuecomment-145598968
    // and
    // https://github.com/apache/spark/pull/8780#issuecomment-144541760
    if (sqlType == Types.NUMERIC && size == 0) {
      // This is sub-optimal as we have to pick a precision/scale in advance whereas the data
      // in Oracle is allowed to have different precision/scale for each value.
      Option(DecimalType(DecimalType.MAX_PRECISION, 10))
    } else {
      None
    }
  }
}
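
Concretely, an unconstrained Oracle NUMBER reports precision 0 through ResultSetMetaData, which is exactly what the size == 0 guard catches, while a declared NUMBER(p,s) reports its real precision and falls through. A hypothetical same-package sketch:

package org.apache.spark.sql.jdbc

import java.sql.Types
import org.apache.spark.sql.types.{DecimalType, MetadataBuilder}

object OracleNumberSketch {
  def main(args: Array[String]): Unit = {
    // Unconstrained NUMBER: metadata precision 0, pinned to DecimalType(38, 10).
    assert(OracleDialect.getCatalystType(Types.NUMERIC, "NUMBER", 0, new MetadataBuilder) ==
      Option(DecimalType(DecimalType.MAX_PRECISION, 10)))
    // NUMBER(12, 2) reports precision 12, so the dialect stays neutral.
    assert(OracleDialect.getCatalystType(Types.NUMERIC, "NUMBER", 12, new MetadataBuilder).isEmpty)
  }
}
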
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.jdbc

import java.sql.Types

import org.apache.spark.sql.types._

private object PostgresDialect extends JdbcDialect {

  override def canHandle(url: String): Boolean = url.startsWith("jdbc:postgresql")

  override def getCatalystType(
      sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
    if (sqlType == Types.BIT && typeName.equals("bit") && size != 1) {
      Option(BinaryType)
    } else if (sqlType == Types.OTHER && typeName.equals("cidr")) {
      Option(StringType)
    } else if (sqlType == Types.OTHER && typeName.equals("inet")) {
      Option(StringType)
    } else if (sqlType == Types.OTHER && typeName.equals("json")) {
      Option(StringType)
    } else if (sqlType == Types.OTHER && typeName.equals("jsonb")) {
      Option(StringType)
    } else None
  }

  override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
    case StringType => Some(JdbcType("TEXT", java.sql.Types.CHAR))
    case BinaryType => Some(JdbcType("BYTEA", java.sql.Types.BINARY))
    case BooleanType => Some(JdbcType("BOOLEAN", java.sql.Types.BOOLEAN))
    case _ => None
  }

  override def getTableExistsQuery(table: String): String = {
    s"SELECT 1 FROM $table LIMIT 1"
  }
}
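
On read, the dialect collapses Postgres-specific types (cidr, inet, json, jsonb) to strings and multi-bit bit columns to binary; getTableExistsQuery keeps the existence check to a single-row probe. A hypothetical same-package sketch, since PostgresDialect is private:

package org.apache.spark.sql.jdbc

import java.sql.Types
import org.apache.spark.sql.types.{BinaryType, MetadataBuilder, StringType}

object PostgresDialectSketch {
  def main(args: Array[String]): Unit = {
    val md = new MetadataBuilder
    // jsonb columns surface as plain strings on read.
    assert(PostgresDialect.getCatalystType(Types.OTHER, "jsonb", 0, md) == Option(StringType))
    // bit(n) with n != 1 is read as a byte array rather than a boolean.
    assert(PostgresDialect.getCatalystType(Types.BIT, "bit", 8, md) == Option(BinaryType))
    // The existence probe reads at most one row.
    assert(PostgresDialect.getTableExistsQuery("public.events") == "SELECT 1 FROM public.events LIMIT 1")
  }
}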