diff --git a/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java b/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java
index 1c16da982923bc31aea70b33e760b48f7e35af07..0d6b215fe5aaf5355dd1fd198ab54a12c90ccb99 100644
--- a/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java
+++ b/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java
@@ -107,12 +107,27 @@ public final class Platform {
 
   public static void copyMemory(
     Object src, long srcOffset, Object dst, long dstOffset, long length) {
-    while (length > 0) {
-      long size = Math.min(length, UNSAFE_COPY_THRESHOLD);
-      _UNSAFE.copyMemory(src, srcOffset, dst, dstOffset, size);
-      length -= size;
-      srcOffset += size;
-      dstOffset += size;
+    // Check if dstOffset is before or after srcOffset to determine if we should copy
+    // forward or backwards. This is necessary in case src and dst overlap.
+    if (dstOffset < srcOffset) {
+      while (length > 0) {
+        long size = Math.min(length, UNSAFE_COPY_THRESHOLD);
+        _UNSAFE.copyMemory(src, srcOffset, dst, dstOffset, size);
+        length -= size;
+        srcOffset += size;
+        dstOffset += size;
+      }
+    } else {
+      srcOffset += length;
+      dstOffset += length;
+      while (length > 0) {
+        long size = Math.min(length, UNSAFE_COPY_THRESHOLD);
+        srcOffset -= size;
+        dstOffset -= size;
+        _UNSAFE.copyMemory(src, srcOffset, dst, dstOffset, size);
+        length -= size;
+      }
+
     }
   }
 
diff --git a/unsafe/src/test/java/org/apache/spark/unsafe/PlatformUtilSuite.java b/unsafe/src/test/java/org/apache/spark/unsafe/PlatformUtilSuite.java
new file mode 100644
index 0000000000000000000000000000000000000000..693ec6ec58dbdf71347569a4ab3fd95eadc34911
--- /dev/null
+++ b/unsafe/src/test/java/org/apache/spark/unsafe/PlatformUtilSuite.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.unsafe;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class PlatformUtilSuite {
+
+  @Test
+  public void overlappingCopyMemory() {
+    byte[] data = new byte[3 * 1024 * 1024];
+    int size = 2 * 1024 * 1024;
+    for (int i = 0; i < data.length; ++i) {
+      data[i] = (byte)i;
+    }
+
+    Platform.copyMemory(data, Platform.BYTE_ARRAY_OFFSET, data, Platform.BYTE_ARRAY_OFFSET, size);
+    for (int i = 0; i < data.length; ++i) {
+      Assert.assertEquals((byte)i, data[i]);
+    }
+
+    Platform.copyMemory(
+        data,
+        Platform.BYTE_ARRAY_OFFSET + 1,
+        data,
+        Platform.BYTE_ARRAY_OFFSET,
+        size);
+    for (int i = 0; i < size; ++i) {
+      Assert.assertEquals((byte)(i + 1), data[i]);
+    }
+
+    for (int i = 0; i < data.length; ++i) {
+      data[i] = (byte)i;
+    }
+    Platform.copyMemory(
+        data,
+        Platform.BYTE_ARRAY_OFFSET,
+        data,
+        Platform.BYTE_ARRAY_OFFSET + 1,
+        size);
+    for (int i = 0; i < size; ++i) {
+      Assert.assertEquals((byte)i, data[i + 1]);
+    }
+  }
+}