Commit 8db0f7e2 authored by Thomas Graves

SPARK-1557 Set permissions on event log files/directories

This adds minimal setting of permissions on the event log directory and files. To have a secure environment, the user must manually create the top-level event log directory and set its permissions. We can add logic to do that automatically later if we want.

Author: Thomas Graves <tgraves@apache.org>

Closes #538 from tgravescs/SPARK-1557 and squashes the following commits:

e471d8e [Thomas Graves] rework
d8b6620 [Thomas Graves] update use of octal
3ca9b79 [Thomas Graves] Updated based on comments
5a09709 [Thomas Graves] add in missing import
3150ed6 [Thomas Graves] SPARK-1557 Set permissions on event log files/directories
parent 9a1184a8
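For context, the permissions added here only come into play when an application has event logging turned on. A minimal sketch of enabling it (the log directory value is illustrative and not part of this commit):

import org.apache.spark.{SparkConf, SparkContext}

// Event logging is opt-in per application; only then does EventLoggingListener
// write to spark.eventLog.dir and these permissions matter.
val conf = new SparkConf()
  .setMaster("local[*]")
  .setAppName("event-log-demo")
  .set("spark.eventLog.enabled", "true")
  .set("spark.eventLog.dir", "/user/spark/applicationHistory") // illustrative path
val sc = new SparkContext(conf)
// ... run jobs ...
sc.stop() // the APPLICATION_COMPLETE marker file is written when the app finishes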
@@ -21,6 +21,7 @@ import scala.collection.mutable
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.fs.permission.FsPermission
 import org.json4s.jackson.JsonMethods._
 
 import org.apache.spark.{Logging, SparkConf, SparkContext}
@@ -54,7 +55,7 @@ private[spark] class EventLoggingListener(
   private val logger =
     new FileLogger(logDir, conf, hadoopConfiguration, outputBufferSize, shouldCompress,
-      shouldOverwrite)
+      shouldOverwrite, Some(LOG_FILE_PERMISSIONS))
 
   /**
    * Begin logging events.
@@ -124,6 +125,9 @@ private[spark] object EventLoggingListener extends Logging {
   val SPARK_VERSION_PREFIX = "SPARK_VERSION_"
   val COMPRESSION_CODEC_PREFIX = "COMPRESSION_CODEC_"
   val APPLICATION_COMPLETE = "APPLICATION_COMPLETE"
+  val LOG_FILE_PERMISSIONS: FsPermission =
+    FsPermission.createImmutable(Integer.parseInt("770", 8).toShort)
 
   // A cache for compression codecs to avoid creating the same codec many times
   private val codecMap = new mutable.HashMap[String, CompressionCodec]
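The new LOG_FILE_PERMISSIONS constant encodes octal 770: read/write/execute for the owner and group, nothing for others. A small sketch, assuming only Hadoop's FsPermission is on the classpath, of what that parse produces:

import org.apache.hadoop.fs.permission.FsPermission

// "770" parsed in base 8 is 504 decimal (0x1F8): rwx for owner and group, --- for others.
val mode = Integer.parseInt("770", 8).toShort
val perm = FsPermission.createImmutable(mode)
println(perm)          // rwxrwx---
println(perm.toShort)  // 504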
@@ -24,6 +24,7 @@ import java.util.Date
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path}
+import org.apache.hadoop.fs.permission.FsPermission
 
 import org.apache.spark.{Logging, SparkConf}
 import org.apache.spark.io.CompressionCodec
@@ -42,7 +43,8 @@ private[spark] class FileLogger(
     hadoopConfiguration: Configuration,
     outputBufferSize: Int = 8 * 1024, // 8 KB
     compress: Boolean = false,
-    overwrite: Boolean = true)
+    overwrite: Boolean = true,
+    dirPermissions: Option[FsPermission] = None)
   extends Logging {
 
   private val dateFormat = new ThreadLocal[SimpleDateFormat]() {
@@ -79,16 +81,25 @@ private[spark] class FileLogger(
     if (!fileSystem.mkdirs(path)) {
       throw new IOException("Error in creating log directory: %s".format(logDir))
     }
+    if (dirPermissions.isDefined) {
+      val fsStatus = fileSystem.getFileStatus(path)
+      if (fsStatus.getPermission().toShort() != dirPermissions.get.toShort) {
+        fileSystem.setPermission(path, dirPermissions.get)
+      }
+    }
   }
 
   /**
    * Create a new writer for the file identified by the given path.
+   * If the permissions are not passed in, it will default to use the permissions
+   * (dirpermissions) used when class was instantiated.
    */
-  private def createWriter(fileName: String): PrintWriter = {
+  private def createWriter(fileName: String, perms: Option[FsPermission] = None): PrintWriter = {
     val logPath = logDir + "/" + fileName
     val uri = new URI(logPath)
     val defaultFs = FileSystem.getDefaultUri(hadoopConfiguration).getScheme
     val isDefaultLocal = (defaultFs == null || defaultFs == "file")
+    val path = new Path(logPath)
 
     /* The Hadoop LocalFileSystem (r1.0.4) has known issues with syncing (HADOOP-7844).
      * Therefore, for local files, use FileOutputStream instead. */
@@ -97,11 +108,11 @@ private[spark] class FileLogger(
       // Second parameter is whether to append
       new FileOutputStream(uri.getPath, !overwrite)
     } else {
-      val path = new Path(logPath)
       hadoopDataStream = Some(fileSystem.create(path, overwrite))
       hadoopDataStream.get
     }
 
+    perms.orElse(dirPermissions).foreach {p => fileSystem.setPermission(path, p)}
     val bstream = new BufferedOutputStream(dstream, outputBufferSize)
     val cstream = if (compress) compressionCodec.compressedOutputStream(bstream) else bstream
     new PrintWriter(cstream)
@@ -152,15 +163,16 @@ private[spark] class FileLogger(
   /**
    * Start a writer for a new file, closing the existing one if it exists.
    * @param fileName Name of the new file, defaulting to the file index if not provided.
+   * @param perms Permissions to put on the new file.
    */
-  def newFile(fileName: String = "") {
+  def newFile(fileName: String = "", perms: Option[FsPermission] = None) {
     fileIndex += 1
     writer.foreach(_.close())
     val name = fileName match {
       case "" => fileIndex.toString
       case _ => fileName
     }
-    writer = Some(createWriter(name))
+    writer = Some(createWriter(name, perms))
   }
 
   /**
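Taken together, the FileLogger changes mean the directory permissions are applied in createLogDir, every new file defaults to the same permissions, and callers can override them per file. A hypothetical caller (FileLogger is private[spark], so this would have to live under the org.apache.spark package; the path and file names are illustrative):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.permission.FsPermission
import org.apache.spark.SparkConf
import org.apache.spark.util.FileLogger

val dirPerms = FsPermission.createImmutable(Integer.parseInt("770", 8).toShort)

// The log directory is created with 770 (applied in createLogDir); files default to the same mode.
val logger = new FileLogger("/tmp/spark-events/app-1234", new SparkConf(),
  new Configuration(), dirPermissions = Some(dirPerms))

logger.newFile("EVENT_LOG_1")                 // inherits dirPermissions (770)
logger.newFile("APPLICATION_COMPLETE",        // per-file override, e.g. 660
  Some(FsPermission.createImmutable(Integer.parseInt("660", 8).toShort)))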
@@ -7,6 +7,8 @@ Spark currently supports authentication via a shared secret. Authentication can
 The Spark UI can also be secured by using javax servlet filters. A user may want to secure the UI if it has data that other users should not be allowed to see. The javax servlet filter specified by the user can authenticate the user, and once the user is logged in, Spark can compare that user against the view acls to make sure they are authorized to view the UI. The configs 'spark.ui.acls.enable' and 'spark.ui.view.acls' control the behavior of the acls. Note that the person who started the application always has view access to the UI.
 
+If your applications are using event logging, the directory where the event logs go (`spark.eventLog.dir`) should be manually created and have the proper permissions set on it. If you want those log files secured, the permissions should be set to drwxrwxrwt (i.e. 1777, world-writable with the sticky bit) for that directory. The owner of the directory should be the super user who is running the history server, and the group permissions should be restricted to the super user's group. This will allow all users to write to the directory but will prevent unprivileged users from removing or renaming a file unless they own the file or directory. The event log files will be created by Spark with permissions such that only the user and group have read and write access.
+
 For Spark on Yarn deployments, configuring `spark.authenticate` to true will automatically handle generating and distributing the shared secret. Each application will use a unique shared secret. The Spark UI uses the standard YARN web application proxy mechanism and will authenticate via any installed Hadoop filters. If an authentication filter is enabled, the acls controls can be used to control which users can view the Spark UI.
 
 For other types of Spark deployments, the Spark config `spark.authenticate.secret` should be configured on each of the nodes. This secret will be used by all the Master/Workers and applications. The UI can be secured using a javax servlet filter installed via `spark.ui.filters`. If an authentication filter is enabled, the acls controls can be used to control which users can view the Spark UI.
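The manual step described above could look roughly like the following, assuming HDFS, a Hadoop version whose FsPermission supports the sticky bit, and an illustrative path of /user/spark/applicationHistory (use whatever spark.eventLog.dir points to in your deployment):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.fs.permission.{FsAction, FsPermission}

object PrepareEventLogDir {
  def main(args: Array[String]): Unit = {
    val fs = FileSystem.get(new Configuration())
    val eventLogDir = new Path("/user/spark/applicationHistory") // illustrative

    // 1777: world-writable with the sticky bit, so any user can create logs but
    // cannot remove or rename files owned by someone else.
    val perms = new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL, true)

    if (!fs.exists(eventLogDir)) {
      fs.mkdirs(eventLogDir)
    }
    fs.setPermission(eventLogDir, perms) // set explicitly; mkdirs is subject to the umask
  }
}

The same result can typically be had from the shell by running `hadoop fs -chmod 1777` on the directory as the super user.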