diff --git a/python/pyspark/ml/param/__init__.py b/python/pyspark/ml/param/__init__.py
index ade4864e1d7858c03c4079fa9cd070d7c0163f6d..205b8d516a6f9e05e3f908f6607d898e603be015 100644
--- a/python/pyspark/ml/param/__init__.py
+++ b/python/pyspark/ml/param/__init__.py
@@ -385,6 +385,7 @@ class Params(Identifiable):
             extra = dict()
         that = copy.copy(self)
         that._paramMap = {}
+        that._defaultParamMap = {}
         return self._copyValues(that, extra)
 
     def _shouldOwn(self, param):
@@ -465,12 +466,16 @@ class Params(Identifiable):
         :param extra: extra params to be copied
         :return: the target instance with param values copied
         """
-        if extra is None:
-            extra = dict()
-        paramMap = self.extractParamMap(extra)
-        for p in self.params:
-            if p in paramMap and to.hasParam(p.name):
-                to._set(**{p.name: paramMap[p]})
+        paramMap = self._paramMap.copy()
+        if extra is not None:
+            paramMap.update(extra)
+        for param in self.params:
+            # copy default params
+            if param in self._defaultParamMap and to.hasParam(param.name):
+                to._defaultParamMap[to.getParam(param.name)] = self._defaultParamMap[param]
+            # copy explicitly set params
+            if param in paramMap and to.hasParam(param.name):
+                to._set(**{param.name: paramMap[param]})
         return to
 
     def _resetUid(self, newUid):
diff --git a/python/pyspark/ml/tests.py b/python/pyspark/ml/tests.py
index 68f5bc30ac57f854485c682dad3c30926b1bb313..46be031ee8ff13adedc70b505cb056c0127a5268 100755
--- a/python/pyspark/ml/tests.py
+++ b/python/pyspark/ml/tests.py
@@ -389,6 +389,22 @@ class ParamTests(PySparkTestCase):
         # Check windowSize is set properly
         self.assertEqual(model.getWindowSize(), 6)
 
+    def test_copy_param_extras(self):
+        tp = TestParams(seed=42)
+        extra = {tp.getParam(TestParams.inputCol.name): "copy_input"}
+        tp_copy = tp.copy(extra=extra)
+        self.assertEqual(tp.uid, tp_copy.uid)
+        self.assertEqual(tp.params, tp_copy.params)
+        for k, v in extra.items():
+            self.assertTrue(tp_copy.isDefined(k))
+            self.assertEqual(tp_copy.getOrDefault(k), v)
+        copied_no_extra = {}
+        for k, v in tp_copy._paramMap.items():
+            if k not in extra:
+                copied_no_extra[k] = v
+        self.assertEqual(tp._paramMap, copied_no_extra)
+        self.assertEqual(tp._defaultParamMap, tp_copy._defaultParamMap)
+
 
 class EvaluatorTests(SparkSessionTestCase):