Skip to content
Snippets Groups Projects
Commit 1fcefef0 authored by Liang-Chi Hsieh's avatar Liang-Chi Hsieh Committed by Reynold Xin
Browse files

[SPARK-10446][SQL] Support to specify join type when calling join with usingColumns

JIRA: https://issues.apache.org/jira/browse/SPARK-10446

Currently the method `join(right: DataFrame, usingColumns: Seq[String])` only supports inner join. It is more convenient to have it support other join types.

Author: Liang-Chi Hsieh <viirya@appier.com>

Closes #8600 from viirya/usingcolumns_df.
parent 781b21ba
No related branches found
No related tags found
No related merge requests found
...@@ -567,7 +567,11 @@ class DataFrame(object): ...@@ -567,7 +567,11 @@ class DataFrame(object):
if on is None or len(on) == 0: if on is None or len(on) == 0:
jdf = self._jdf.join(other._jdf) jdf = self._jdf.join(other._jdf)
elif isinstance(on[0], basestring): elif isinstance(on[0], basestring):
jdf = self._jdf.join(other._jdf, self._jseq(on)) if how is None:
jdf = self._jdf.join(other._jdf, self._jseq(on), "inner")
else:
assert isinstance(how, basestring), "how should be basestring"
jdf = self._jdf.join(other._jdf, self._jseq(on), how)
else: else:
assert isinstance(on[0], Column), "on should be Column or list of Column" assert isinstance(on[0], Column), "on should be Column or list of Column"
if len(on) > 1: if len(on) > 1:
......
...@@ -484,6 +484,26 @@ class DataFrame private[sql]( ...@@ -484,6 +484,26 @@ class DataFrame private[sql](
* @since 1.4.0 * @since 1.4.0
*/ */
def join(right: DataFrame, usingColumns: Seq[String]): DataFrame = { def join(right: DataFrame, usingColumns: Seq[String]): DataFrame = {
join(right, usingColumns, "inner")
}
/**
* Equi-join with another [[DataFrame]] using the given columns.
*
* Different from other join functions, the join columns will only appear once in the output,
* i.e. similar to SQL's `JOIN USING` syntax.
*
* Note that if you perform a self-join using this function without aliasing the input
* [[DataFrame]]s, you will NOT be able to reference any columns after the join, since
* there is no way to disambiguate which side of the join you would like to reference.
*
* @param right Right side of the join operation.
* @param usingColumns Names of the columns to join on. This columns must exist on both sides.
* @param joinType One of: `inner`, `outer`, `left_outer`, `right_outer`, `leftsemi`.
* @group dfops
* @since 1.6.0
*/
def join(right: DataFrame, usingColumns: Seq[String], joinType: String): DataFrame = {
// Analyze the self join. The assumption is that the analyzer will disambiguate left vs right // Analyze the self join. The assumption is that the analyzer will disambiguate left vs right
// by creating a new instance for one of the branch. // by creating a new instance for one of the branch.
val joined = sqlContext.executePlan( val joined = sqlContext.executePlan(
...@@ -502,7 +522,7 @@ class DataFrame private[sql]( ...@@ -502,7 +522,7 @@ class DataFrame private[sql](
Join( Join(
joined.left, joined.left,
joined.right, joined.right,
joinType = Inner, joinType = JoinType(joinType),
condition) condition)
) )
} }
......
...@@ -42,6 +42,19 @@ class DataFrameJoinSuite extends QueryTest with SharedSQLContext { ...@@ -42,6 +42,19 @@ class DataFrameJoinSuite extends QueryTest with SharedSQLContext {
Row(1, 2, "1", "2") :: Row(2, 3, "2", "3") :: Row(3, 4, "3", "4") :: Nil) Row(1, 2, "1", "2") :: Row(2, 3, "2", "3") :: Row(3, 4, "3", "4") :: Nil)
} }
test("join - join using multiple columns and specifying join type") {
val df = Seq(1, 2, 3).map(i => (i, i + 1, i.toString)).toDF("int", "int2", "str")
val df2 = Seq(1, 2, 3).map(i => (i, i + 1, (i + 1).toString)).toDF("int", "int2", "str")
checkAnswer(
df.join(df2, Seq("int", "str"), "left"),
Row(1, 2, "1", null) :: Row(2, 3, "2", null) :: Row(3, 4, "3", null) :: Nil)
checkAnswer(
df.join(df2, Seq("int", "str"), "right"),
Row(null, null, null, 2) :: Row(null, null, null, 3) :: Row(null, null, null, 4) :: Nil)
}
test("join - join using self join") { test("join - join using self join") {
val df = Seq(1, 2, 3).map(i => (i, i.toString)).toDF("int", "str") val df = Seq(1, 2, 3).map(i => (i, i.toString)).toDF("int", "str")
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment