diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
index 86de90984ca00045c4e9ef1502fb7b9f85615e89..56994fafe064b3890b75d1a0c73dc500f0325e7f 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
@@ -550,7 +550,7 @@ public final class UnsafeRow extends InternalRow implements Externalizable, Kryo
    */
   public void writeToStream(OutputStream out, byte[] writeBuffer) throws IOException {
     if (baseObject instanceof byte[]) {
-      int offsetInByteArray = (int) (Platform.BYTE_ARRAY_OFFSET - baseOffset);
+      int offsetInByteArray = (int) (baseOffset - Platform.BYTE_ARRAY_OFFSET);
       out.write((byte[]) baseObject, offsetInByteArray, sizeInBytes);
     } else {
       int dataRemaining = sizeInBytes;
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UnsafeRowSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/UnsafeRowSuite.scala
index a32763db054f3da1c1c643e34bf6d36ac5ea8643..a5f904c621e6ece72cbefd81c61f79985534a232 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/UnsafeRowSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/UnsafeRowSuite.scala
@@ -101,9 +101,22 @@ class UnsafeRowSuite extends SparkFunSuite {
         MemoryAllocator.UNSAFE.free(offheapRowPage)
       }
     }
+    val (bytesFromArrayBackedRowWithOffset, field0StringFromArrayBackedRowWithOffset) = {
+      val baos = new ByteArrayOutputStream()
+      val numBytes = arrayBackedUnsafeRow.getSizeInBytes
+      val bytesWithOffset = new Array[Byte](numBytes + 100)
+      System.arraycopy(arrayBackedUnsafeRow.getBaseObject.asInstanceOf[Array[Byte]], 0,
+        bytesWithOffset, 100, numBytes)
+      val arrayBackedRow = new UnsafeRow(arrayBackedUnsafeRow.numFields())
+      arrayBackedRow.pointTo(bytesWithOffset, Platform.BYTE_ARRAY_OFFSET + 100, numBytes)
+      arrayBackedRow.writeToStream(baos, null)
+      (baos.toByteArray, arrayBackedRow.getString(0))
+    }
 
     assert(bytesFromArrayBackedRow === bytesFromOffheapRow)
     assert(field0StringFromArrayBackedRow === field0StringFromOffheapRow)
+    assert(bytesFromArrayBackedRow === bytesFromArrayBackedRowWithOffset)
+    assert(field0StringFromArrayBackedRow === field0StringFromArrayBackedRowWithOffset)
   }
 
   test("calling getDouble() and getFloat() on null columns") {