From b6a81f4720752efe459860d28d7f8f738b2944c3 Mon Sep 17 00:00:00 2001
From: Burak Yavuz <brkyvz@gmail.com>
Date: Thu, 15 Dec 2016 14:26:54 -0800
Subject: [PATCH] [SPARK-18888] partitionBy in DataStreamWriter in Python
 throws _to_seq not defined

## What changes were proposed in this pull request?

`_to_seq` wasn't imported.

## How was this patch tested?

Added partitionBy to existing write path unit test

Author: Burak Yavuz <brkyvz@gmail.com>

Closes #16297 from brkyvz/SPARK-18888.
---
 python/pyspark/sql/streaming.py | 1 +
 python/pyspark/sql/tests.py     | 7 ++++---
 2 files changed, 5 insertions(+), 3 deletions(-)

diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index eabd5ef54c..5014299ad2 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -28,6 +28,7 @@ from abc import ABCMeta, abstractmethod
 
 from pyspark import since, keyword_only
 from pyspark.rdd import ignore_unicode_prefix
+from pyspark.sql.column import _to_seq
 from pyspark.sql.readwriter import OptionUtils, to_str
 from pyspark.sql.types import *
 from pyspark.sql.utils import StreamingQueryException
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 115b4a9bef..6de63e6493 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -50,7 +50,7 @@ from pyspark.sql import SparkSession, HiveContext, Column, Row
 from pyspark.sql.types import *
 from pyspark.sql.types import UserDefinedType, _infer_type
 from pyspark.tests import ReusedPySparkTestCase, SparkSubmitTests
-from pyspark.sql.functions import UserDefinedFunction, sha2
+from pyspark.sql.functions import UserDefinedFunction, sha2, lit
 from pyspark.sql.window import Window
 from pyspark.sql.utils import AnalysisException, ParseException, IllegalArgumentException
 
@@ -1056,7 +1056,8 @@ class SQLTests(ReusedPySparkTestCase):
         self.assertEqual(df.schema.simpleString(), "struct<data:string>")
 
     def test_stream_save_options(self):
-        df = self.spark.readStream.format('text').load('python/test_support/sql/streaming')
+        df = self.spark.readStream.format('text').load('python/test_support/sql/streaming') \
+            .withColumn('id', lit(1))
         for q in self.spark._wrapped.streams.active:
             q.stop()
         tmpPath = tempfile.mkdtemp()
@@ -1065,7 +1066,7 @@ class SQLTests(ReusedPySparkTestCase):
         out = os.path.join(tmpPath, 'out')
         chk = os.path.join(tmpPath, 'chk')
         q = df.writeStream.option('checkpointLocation', chk).queryName('this_query') \
-            .format('parquet').outputMode('append').option('path', out).start()
+            .format('parquet').partitionBy('id').outputMode('append').option('path', out).start()
         try:
             self.assertEqual(q.name, 'this_query')
             self.assertTrue(q.isActive)
-- 
GitLab