diff --git a/python/pyspark/ml/param/__init__.py b/python/pyspark/ml/param/__init__.py
index dc3d23ff1661dd73c8de3d30f4ab6e1a4213efed..99d8fa3a5b73e06e7a9442cfd0cf3d983b60ef7f 100644
--- a/python/pyspark/ml/param/__init__.py
+++ b/python/pyspark/ml/param/__init__.py
@@ -372,6 +372,7 @@ class Params(Identifiable):
             extra = dict()
         that = copy.copy(self)
         that._paramMap = {}
+        that._defaultParamMap = {}
         return self._copyValues(that, extra)
 
     def _shouldOwn(self, param):
@@ -452,12 +453,16 @@ class Params(Identifiable):
         :param extra: extra params to be copied
         :return: the target instance with param values copied
         """
-        if extra is None:
-            extra = dict()
-        paramMap = self.extractParamMap(extra)
-        for p in self.params:
-            if p in paramMap and to.hasParam(p.name):
-                to._set(**{p.name: paramMap[p]})
+        paramMap = self._paramMap.copy()
+        if extra is not None:
+            paramMap.update(extra)
+        for param in self.params:
+            # copy default params
+            if param in self._defaultParamMap and to.hasParam(param.name):
+                to._defaultParamMap[to.getParam(param.name)] = self._defaultParamMap[param]
+            # copy explicitly set params
+            if param in paramMap and to.hasParam(param.name):
+                to._set(**{param.name: paramMap[param]})
         return to
 
     def _resetUid(self, newUid):
diff --git a/python/pyspark/ml/tests.py b/python/pyspark/ml/tests.py
index 53204cde29b74bd394d94cd19c186b3074fe0316..293c6c0b0f36a3168f89f8e3fc3f714fafab6000 100755
--- a/python/pyspark/ml/tests.py
+++ b/python/pyspark/ml/tests.py
@@ -389,6 +389,22 @@ class ParamTests(PySparkTestCase):
         # Check windowSize is set properly
         self.assertEqual(model.getWindowSize(), 6)
 
+    def test_copy_param_extras(self):
+        tp = TestParams(seed=42)
+        extra = {tp.getParam(TestParams.inputCol.name): "copy_input"}
+        tp_copy = tp.copy(extra=extra)
+        self.assertEqual(tp.uid, tp_copy.uid)
+        self.assertEqual(tp.params, tp_copy.params)
+        for k, v in extra.items():
+            self.assertTrue(tp_copy.isDefined(k))
+            self.assertEqual(tp_copy.getOrDefault(k), v)
+        copied_no_extra = {}
+        for k, v in tp_copy._paramMap.items():
+            if k not in extra:
+                copied_no_extra[k] = v
+        self.assertEqual(tp._paramMap, copied_no_extra)
+        self.assertEqual(tp._defaultParamMap, tp_copy._defaultParamMap)
+
 
 class EvaluatorTests(SparkSessionTestCase):