Spark's spark

2024-07-18 00:43 | Source: compiled from the web

blockmgr-*

Who creates blockmgr-*

    grep blockmgr-83325444-bc05-491e-8db4-752c5b282d0a stderr
    21/09/01 11:17:15 INFO storage.DiskBlockManager: Created local directory at /var/tmp/mesos/slaves/2a06b6b8-3000-40f9-9008-51caaef53399-S1/frameworks/2a06b6b8-3000-40f9-9008-51caaef53399-0000/executors/0/runs/155527cb-0086-46b9-9de5-ff718c092c8b/blockmgr-83325444-bc05-491e-8db4-752c5b282d0a

The log shows that blockmgr-83325444-bc05-491e-8db4-752c5b282d0a was created by DiskBlockManager.

    /**
     * Create local directories for storing block data. These directories are
     * located inside configured local directories and won't
     * be deleted on JVM exit when using the external shuffle service.
     */
    private def createLocalDirs(conf: SparkConf): Array[File] = {
      Utils.getConfiguredLocalDirs(conf).flatMap { rootDir =>
        try {
          val localDir = Utils.createDirectory(rootDir, "blockmgr")
          // This log line stands out: it is exactly the one printed above.
          logInfo(s"Created local directory at $localDir")
          Some(localDir)
        } catch {
          case e: IOException =>
            logError(s"Failed to create local dir in $rootDir. Ignoring this directory.", e)
            None
        }
      }
    }
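To make the naming concrete, here is a minimal sketch of the same pattern using only the JDK. It assumes that the directory name is the prefix followed by a random UUID, which is what the name in the log looks like; `LocalDirSketch` and both of its methods are hypothetical stand-ins, not Spark's `Utils`.

```scala
import java.io.{File, IOException}
import java.util.UUID

object LocalDirSketch {
  /** Create `<root>/<prefix>-<random UUID>` (assumed naming, modelled on the log line). */
  def createDirectory(root: File, prefix: String): File = {
    val dir = new File(root, s"$prefix-${UUID.randomUUID()}")
    // mkdirs() returns false if the directory could not be created.
    if (!dir.mkdirs()) {
      throw new IOException(s"Failed to create directory $dir")
    }
    dir
  }

  /** Try every root and keep only the ones that worked, like the flatMap over Some/None. */
  def createLocalDirs(roots: Seq[File]): Seq[File] =
    roots.flatMap { root =>
      try Some(createDirectory(root, "blockmgr"))
      catch { case _: IOException => None }
    }
}
```

A root that cannot be used is simply skipped, so the result may be shorter than the list of configured roots.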

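The method's comment makes two main points:

This directory stores block data (the description here is not very precise).
When the external shuffle service is enabled, the blockmgr-* directory is not deleted on JVM exit.

One open question: where is this directory configured?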

    /**
     * Creates and maintains the logical mapping between logical blocks and physical on-disk
     * locations. One block is mapped to one file with a name given by its BlockId.
     *
     * Block files are hashed among the directories listed in spark.local.dir (or in
     * SPARK_LOCAL_DIRS, if it's set).
     */
    private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolean) extends Logging {

      private[spark] val subDirsPerLocalDir = conf.getInt("spark.diskStore.subDirectories", 64)

      /* Create one local directory for each path mentioned in spark.local.dir; then, inside this
       * directory, create multiple subdirectories that we will hash files into, in order to avoid
       * having really large inodes at the top level. */
      private[spark] val localDirs: Array[File] = createLocalDirs(conf)
      if (localDirs.isEmpty) {
        logError("Failed to create any local dir.")
        System.exit(ExecutorExitCode.DISK_STORE_FAILED_TO_CREATE_DIR)
      }

This code answers the question above: the directory is configured through spark.local.dir or the SPARK_LOCAL_DIRS environment variable.
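The precedence described in that comment can be sketched as a small pure function. This is a simplified illustration, not Spark's `Utils.getConfiguredLocalDirs`: the helper name, the map-based inputs, and the fallback to `java.io.tmpdir` are assumptions made for the example, and cluster-manager-specific handling is left out entirely.

```scala
object LocalDirConfigSketch {
  /**
   * Hypothetical helper: the SPARK_LOCAL_DIRS environment variable wins if it is set,
   * then the spark.local.dir conf key, then (assumed fallback) the JVM temp dir.
   * The value is treated as a comma-separated list of root directories.
   */
  def configuredLocalDirs(env: Map[String, String], conf: Map[String, String]): Seq[String] = {
    val raw = env.get("SPARK_LOCAL_DIRS")
      .orElse(conf.get("spark.local.dir"))
      .getOrElse(System.getProperty("java.io.tmpdir"))
    raw.split(",").map(_.trim).filter(_.nonEmpty).toSeq
  }
}
```

Passing the environment and the configuration in as plain maps keeps the sketch testable without touching the real process environment.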

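What is stored in blockmgr-*

Cached blocks and shuffle files. One of the directories contains rdd_59_12, an RDD that I cached myself.

[Two screenshots not preserved]

The block id matches.

    du -h -d 1
    81M ./blockmgr-83325444-bc05-491e-8db4-752c5b282d0a

The sizes are roughly equal as well.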
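How a file such as rdd_59_12 ends up in one particular subdirectory can be sketched as follows. The source comment only says that block files are hashed among the local directories and that each directory gets 64 subdirectories by default; the exact formula below (non-negative hash, modulo for the directory, integer division for the subdirectory, two-digit hex name) is an assumption for illustration, and `BlockFileLayoutSketch` is a hypothetical name.

```scala
object BlockFileLayoutSketch {
  /** Non-negative hash of the file name; Int.MinValue has no positive counterpart, so map it to 0. */
  def nonNegativeHash(name: String): Int = {
    val h = name.hashCode
    if (h == Int.MinValue) 0 else math.abs(h)
  }

  /** Returns (index of the local dir, two-digit hex name of the subdirectory). */
  def locate(fileName: String, numLocalDirs: Int, subDirsPerLocalDir: Int = 64): (Int, String) = {
    val hash = nonNegativeHash(fileName)
    val dirId = hash % numLocalDirs
    val subDirId = (hash / numLocalDirs) % subDirsPerLocalDir
    (dirId, "%02x".format(subDirId))
  }
}
```

Because the placement depends only on the file name, the same block id always maps to the same directory, which is why the file can be found again without any index on disk.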

As for shuffle files, I still need to find a chance to build an example.

When is blockmgr-* created

    /**
     * Manager running on every node (driver and executors) which provides interfaces for putting and
     * retrieving blocks both locally and remotely into various stores (memory, disk, and off-heap).
     *
     * Note that [[initialize()]] must be called before the BlockManager is usable.
     */
    private[spark] class BlockManager(
        executorId: String,
        rpcEnv: RpcEnv,
        val master: BlockManagerMaster,
        val serializerManager: SerializerManager,
        val conf: SparkConf,
        memoryManager: MemoryManager,
        mapOutputTracker: MapOutputTracker,
        shuffleManager: ShuffleManager,
        val blockTransferService: BlockTransferService,
        securityManager: SecurityManager,
        numUsableCores: Int)
      extends BlockDataManager with BlockEvictionHandler with Logging {

      private[spark] val externalShuffleServiceEnabled =
        conf.get(config.SHUFFLE_SERVICE_ENABLED)
      private val remoteReadNioBufferConversion =
        conf.getBoolean("spark.network.remoteReadNioBufferConversion", false)

      val diskBlockManager = {
        // Only perform cleanup if an external service is not serving our shuffle files.
        val deleteFilesOnStop =
          !externalShuffleServiceEnabled || executorId == SparkContext.DRIVER_IDENTIFIER
        // Initialize the DiskBlockManager
        new DiskBlockManager(conf, deleteFilesOnStop)
      }

This shows that the DiskBlockManager is created when the BlockManager itself is constructed.
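The cleanup rule in that snippet is just a boolean expression, so it can be checked in isolation. The sketch below assumes that `SparkContext.DRIVER_IDENTIFIER` is the string "driver"; `CleanupRuleSketch` is a hypothetical name used only for this example.

```scala
object CleanupRuleSketch {
  // Assumption: SparkContext.DRIVER_IDENTIFIER is the string "driver".
  val DriverIdentifier = "driver"

  /**
   * Files are deleted on stop unless an external shuffle service may still need them.
   * The driver always cleans up, whether or not the service is enabled.
   */
  def deleteFilesOnStop(externalShuffleServiceEnabled: Boolean, executorId: String): Boolean =
    !externalShuffleServiceEnabled || executorId == DriverIdentifier
}
```

The only combination that keeps the files is an executor running with the external shuffle service enabled, which matches the second point in the createLocalDirs comment.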

    /**
     * Spark executor, backed by a threadpool to run tasks.
     *
     * This can be used with Mesos, YARN, and the standalone scheduler.
     * An internal RPC interface is used for communication with the driver,
     * except in the case of Mesos fine-grained mode.
     */
    private[spark] class Executor(
        executorId: String,
        executorHostname: String,
        env: SparkEnv,
        userClassPath: Seq[URL] = Nil,
        isLocal: Boolean = false,
        uncaughtExceptionHandler: UncaughtExceptionHandler = new SparkUncaughtExceptionHandler)
      extends Logging {

      logInfo(s"Starting executor ID $executorId on host $executorHostname")
      ...
      if (!isLocal) {
        // Call initialize
        env.blockManager.initialize(conf.getAppId)
        env.metricsSystem.registerSource(executorSource)
        env.metricsSystem.registerSource(env.blockManager.shuffleMetricsSource)
      }

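While the Executor is being constructed, it calls the instance method initialize on env.blockManager, which means the BlockManager already exists before the Executor is created. Looking at the code, it is created in SparkEnv.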

    object SparkEnv extends Logging {
      @volatile private var env: SparkEnv = _

      private[spark] val driverSystemName = "sparkDriver"
      private[spark] val executorSystemName = "sparkExecutor"

      def set(e: SparkEnv) {
        env = e
      }

      /**
       * Returns the SparkEnv.
       */
      def get: SparkEnv = {
        env
      }

      /**
       * Create a SparkEnv for an executor.
       * In coarse-grained mode, the executor provides an RpcEnv that is already instantiated.
       */
      private[spark] def createExecutorEnv(
          conf: SparkConf,
          executorId: String,
          hostname: String,
          numCores: Int,
          ioEncryptionKey: Option[Array[Byte]],
          isLocal: Boolean): SparkEnv = {
        val env = create(
          conf,
          executorId,
          hostname,
          hostname,
          None,
          isLocal,
          numCores,
          ioEncryptionKey
        )
        SparkEnv.set(env)
        env
      }

      /**
       * Helper method to create a SparkEnv for a driver or an executor.
       */
      private def create(
          conf: SparkConf,
          executorId: String,
          bindAddress: String,
          advertiseAddress: String,
          port: Option[Int],
          isLocal: Boolean,
          numUsableCores: Int,
          ioEncryptionKey: Option[Array[Byte]],
          listenerBus: LiveListenerBus = null,
          mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {

        // NB: blockManager is not valid until initialize() is called later.
        val blockManager = new BlockManager(executorId, rpcEnv, blockManagerMaster,
          serializerManager, conf, memoryManager, mapOutputTracker, shuffleManager,
          blockTransferService, securityManager, numUsableCores)

So who calls SparkEnv.createExecutorEnv?

    private[spark] object CoarseGrainedExecutorBackend extends Logging {

      private def run(
          driverUrl: String,
          executorId: String,
          hostname: String,
          cores: Int,
          appId: String,
          workerUrl: Option[String],
          userClassPath: Seq[URL]) {

        Utils.initDaemon(log)

        SparkHadoopUtil.get.runAsSparkUser { () =>
          // Debug code
          Utils.checkHost(hostname)

          // Create the executor's environment
          val env = SparkEnv.createExecutorEnv(
            driverConf, executorId, hostname, cores, cfg.ioEncryptionKey, isLocal = false)
        }
      }

      def main(args: Array[String]) {
        var driverUrl: String = null
        var executorId: String = null
        var hostname: String = null
        var cores: Int = 0
        var appId: String = null
        var workerUrl: Option[String] = None
        val userClassPath = new mutable.ListBuffer[URL]()

        // Called when main starts
        run(driverUrl, executorId, hostname, cores, appId, workerUrl, userClassPath)
        System.exit(0)
      }

The trail finally leads to the CoarseGrainedExecutorBackend class. To trace further back, see https://blog.csdn.net/chic_data/article/details/77317730

spark-*

Who creates spark-*

    grep spark-ef0819de-ff5e-49eb-b3f8-5daa990a37a2 stderr
    21/09/01 11:19:08 INFO util.Utils: Fetching spark://ip-10-211-173-146.ap-northeast-2.compute.internal:31266/files/com.eclipsesource.minimal-json_minimal-json-0.9.4.jar to /var/tmp/mesos/slaves/2a06b6b8-3000-40f9-9008-51caaef53399-S1/frameworks/2a06b6b8-3000-40f9-9008-51caaef53399-0000/executors/0/runs/155527cb-0086-46b9-9de5-ff718c092c8b/spark-ef0819de-ff5e-49eb-b3f8-5daa990a37a2/fetchFileTemp4775646530125357578.tmp
    21/09/01 11:19:08 INFO util.Utils: Copying /var/tmp/mesos/slaves/2a06b6b8-3000-40f9-9008-51caaef53399-S1/frameworks/2a06b6b8-3000-40f9-9008-51caaef53399-0000/executors/0/runs/155527cb-0086-46b9-9de5-ff718c092c8b/spark-ef0819de-ff5e-49eb-b3f8-5daa990a37a2/-18725443601630462630480_cache to /var/tmp/mesos/slaves/2a06b6b8-3000-40f9-9008-51caaef53399-S1/frameworks/2a06b6b8-3000-40f9-9008-51caaef53399-0000/executors/0/runs/155527cb-0086-46b9-9de5-ff718c092c8b/./com.eclipsesource.minimal-json_minimal-json-0.9.4.jar
    ...

Open the log file and take a closer look:

    21/09/01 11:19:08 INFO executor.Executor: Fetching spark://ip-10-211-173-146.ap-northeast-2.compute.internal:31266/files/com.eclipsesource.minimal-json_minimal-json-0.9.4.jar with timestamp 1630462630480
    21/09/01 11:19:08 INFO client.TransportClientFactory: Successfully created connection to ip-10-211-173-146.ap-northeast-2.compute.internal/10.211.173.146:31266 after 2 ms (0 ms spent in bootstraps)
    21/09/01 11:19:08 INFO util.Utils: Fetching spark://ip-10-211-173-146.ap-northeast-2.compute.internal:31266/files/com.eclipsesource.minimal-json_minimal-json-0.9.4.jar to /var/tmp/mesos/slaves/2a06b6b8-3000-40f9-9008-51caaef53399-S1/frameworks/2a06b6b8-3000-40f9-9008-51caaef53399-0000/executors/0/runs/155527cb-0086-46b9-9de5-ff718c092c8b/spark-ef0819de-ff5e-49eb-b3f8-5daa990a37a2/fetchFileTemp4775646530125357578.tmp
    21/09/01 11:19:08 INFO util.Utils: Copying /var/tmp/mesos/slaves/2a06b6b8-3000-40f9-9008-51caaef53399-S1/frameworks/2a06b6b8-3000-40f9-9008-51caaef53399-0000/executors/0/runs/155527cb-0086-46b9-9de5-ff718c092c8b/spark-ef0819de-ff5e-49eb-b3f8-5daa990a37a2/-18725443601630462630480_cache to /var/tmp/mesos/slaves/2a06b6b8-3000-40f9-9008-51caaef53399-S1/frameworks/2a06b6b8-3000-40f9-9008-51caaef53399-0000/executors/0/runs/155527cb-0086-46b9-9de5-ff718c092c8b/./com.eclipsesource.minimal-json_minimal-json-0.9.4.jar
    21/09/01 11:19:08 INFO executor.Executor: Fetching spark://ip-10-211-173-146.ap-northeast-2.compute.internal:31266/files/org.antlr_ST4-4.0.8.jar with timestamp 1630462630521
    ...

The log clearly shows that the lines mentioning the spark-* directory were printed by the Utils class, so let's look there.

    /**
     * Download a file or directory to target directory. Supports fetching the file in a variety of
     * ways, including HTTP, Hadoop-compatible filesystems, and files on a standard filesystem, based
     * on the URL parameter. Fetching directories is only supported from Hadoop-compatible
     * filesystems.
     *
     * If `useCache` is true, first attempts to fetch the file to a local cache that's shared
     * across executors running the same application. `useCache` is used mainly for
     * the executors, and not in local mode.
     *
     * Throws SparkException if the target file already exists and has different contents than
     * the requested file.
     */
    def fetchFile(
        url: String,
        targetDir: File,
        conf: SparkConf,
        securityMgr: SecurityManager,
        hadoopConf: Configuration,
        timestamp: Long,
        useCache: Boolean): File = {
      val fileName = decodeFileNameInURI(new URI(url))
      // The target file
      val targetFile = new File(targetDir, fileName)
      val fetchCacheEnabled = conf.getBoolean("spark.files.useFetchCache", defaultValue = true)
      if (useCache && fetchCacheEnabled) {
        // The cache file and the lock file
        val cachedFileName = s"${url.hashCode}${timestamp}_cache"
        val lockFileName = s"${url.hashCode}${timestamp}_lock"
        // Set the cachedLocalDir for the first time and re-use it later
        if (cachedLocalDir.isEmpty) {
          this.synchronized {
            if (cachedLocalDir.isEmpty) {
              cachedLocalDir = getLocalDir(conf)
            }
          }
        }
        val localDir = new File(cachedLocalDir)
        val lockFile = new File(localDir, lockFileName)
        val lockFileChannel = new RandomAccessFile(lockFile, "rw").getChannel()
        // Only one executor entry.
        // The FileLock is only used to control synchronization for executors download file,
        // it's always safe regardless of lock type (mandatory or advisory).
        val lock = lockFileChannel.lock()
        val cachedFile = new File(localDir, cachedFileName)
        try {
          if (!cachedFile.exists()) {
            // Download the file
            doFetchFile(url, localDir, cachedFileName, conf, securityMgr, hadoopConf)
          }
        } finally {
          lock.release()
          lockFileChannel.close()
        }
        // After the download, copy the cache file to the target file. removeSourceFile is
        // left at its default of false here, so copyFile prints a "Copying" log line.
        copyFile(
          url,
          cachedFile,
          targetFile,
          conf.getBoolean("spark.files.overwrite", false)
        )
      } else {
        doFetchFile(url, targetDir, fileName, conf, securityMgr, hadoopConf)
      }

      // Decompress the file if it's a .tar or .tar.gz
      if (fileName.endsWith(".tar.gz") || fileName.endsWith(".tgz")) {
        logInfo("Untarring " + fileName)
        executeAndGetOutput(Seq("tar", "-xzf", fileName), targetDir)
      } else if (fileName.endsWith(".tar")) {
        logInfo("Untarring " + fileName)
        executeAndGetOutput(Seq("tar", "-xf", fileName), targetDir)
      }
      // Make the file executable - That's necessary for scripts
      FileUtil.chmod(targetFile.getAbsolutePath, "a+x")

      // Windows does not grant read permission by default to non-admin users
      // Add read permission to owner explicitly
      if (isWindows) {
        FileUtil.chmod(targetFile.getAbsolutePath, "u+r")
      }

      targetFile
    }
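The cache name in the log, -18725443601630462630480_cache, reads as the URL's hashCode (-1872544360) followed by the timestamp 1630462630480 that the executor logged, which is exactly what the string interpolation above produces. The following sketch isolates the naming plus the lock-then-check pattern with plain JDK classes; `FetchCacheSketch` and the `download` callback are hypothetical stand-ins for doFetchFile, not Spark's API.

```scala
import java.io.{File, RandomAccessFile}

object FetchCacheSketch {
  // Same naming scheme as in fetchFile: <url.hashCode><timestamp>_cache / _lock.
  def cachedFileName(url: String, timestamp: Long): String = s"${url.hashCode}${timestamp}_cache"
  def lockFileName(url: String, timestamp: Long): String = s"${url.hashCode}${timestamp}_lock"

  /**
   * Hold an exclusive lock on the lock file, run `download` only if the cache file is
   * missing, and return the cache file. `download` is a hypothetical callback that is
   * expected to write the given file.
   */
  def fetchThroughCache(url: String, timestamp: Long, localDir: File)(download: File => Unit): File = {
    val lockFile = new File(localDir, lockFileName(url, timestamp))
    val channel = new RandomAccessFile(lockFile, "rw").getChannel
    val lock = channel.lock() // blocks until other processes release the lock
    val cachedFile = new File(localDir, cachedFileName(url, timestamp))
    try {
      if (!cachedFile.exists()) download(cachedFile)
    } finally {
      lock.release()
      channel.close()
    }
    cachedFile
  }
}
```

Calling `fetchThroughCache` twice with the same URL and timestamp runs the download only once; the second call finds the cache file and returns it directly. The lock file itself is never removed in this sketch, which is consistent with seeing both a cache file and a lock file in the directory.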

Next, look at doFetchFile(url, localDir, cachedFileName, conf, securityMgr, hadoopConf):

    /**
     * Download a file or directory to target directory. Supports fetching the file in a variety of
     * ways, including HTTP, Hadoop-compatible filesystems, and files on a standard filesystem, based
     * on the URL parameter. Fetching directories is only supported from Hadoop-compatible
     * filesystems.
     *
     * Throws SparkException if the target file already exists and has different contents than
     * the requested file.
     */
    def doFetchFile(
        url: String,
        targetDir: File,
        filename: String,
        conf: SparkConf,
        securityMgr: SecurityManager,
        hadoopConf: Configuration): File = {
      val targetFile = new File(targetDir, filename)
      val uri = new URI(url)
      val fileOverwrite = conf.getBoolean("spark.files.overwrite", defaultValue = false)
      Option(uri.getScheme).getOrElse("file") match {
        case "spark" =>
          if (SparkEnv.get == null) {
            throw new IllegalStateException(
              "Cannot retrieve files with 'spark' scheme without an active SparkEnv.")
          }
          val source = SparkEnv.get.rpcEnv.openChannel(url)
          val is = Channels.newInputStream(source)
          // Download the file
          downloadFile(url, is, targetFile, fileOverwrite)
        case "http" | "https" | "ftp" =>
          var uc: URLConnection = null
          if (securityMgr.isAuthenticationEnabled()) {
            logDebug("fetchFile with security enabled")
            val newuri = constructURIForAuthentication(uri, securityMgr)
            uc = newuri.toURL().openConnection()
            uc.setAllowUserInteraction(false)
          } else {
            logDebug("fetchFile not using security")
            uc = new URL(url).openConnection()
          }

          val timeoutMs =
            conf.getTimeAsSeconds("spark.files.fetchTimeout", "60s").toInt * 1000
          uc.setConnectTimeout(timeoutMs)
          uc.setReadTimeout(timeoutMs)
          uc.connect()
          val in = uc.getInputStream()
          downloadFile(url, in, targetFile, fileOverwrite)
        case "file" =>
          // In the case of a local file, copy the local file to the target directory.
          // Note the difference between uri vs url.
          val sourceFile = if (uri.isAbsolute) new File(uri) else new File(url)
          copyFile(url, sourceFile, targetFile, fileOverwrite)
        case _ =>
          val fs = getHadoopFileSystem(uri, hadoopConf)
          val path = new Path(uri)
          fetchHcfsFile(path, targetDir, fs, conf, hadoopConf, fileOverwrite,
            filename = Some(filename))
      }

      targetFile
    }

    /**
     * Download `in` to `tempFile`, then move it to `destFile`.
     *
     * If `destFile` already exists:
     *   - no-op if its contents equal those of `sourceFile`,
     *   - throw an exception if `fileOverwrite` is false,
     *   - attempt to overwrite it otherwise.
     *
     * @param url URL that `sourceFile` originated from, for logging purposes.
     * @param in InputStream to download.
     * @param destFile File path to move `tempFile` to.
     * @param fileOverwrite Whether to delete/overwrite an existing `destFile` that does not match
     *                      `sourceFile`
     */
    private def downloadFile(
        url: String,
        in: InputStream,
        destFile: File,
        fileOverwrite: Boolean): Unit = {
      val tempFile = File.createTempFile("fetchFileTemp", null,
        new File(destFile.getParentFile.getAbsolutePath))
      // This is the first Utils log line quoted above: the JAR is fetched into a tmp file.
      logInfo(s"Fetching $url to $tempFile")

      try {
        val out = new FileOutputStream(tempFile)
        Utils.copyStream(in, out, closeStreams = true)
        // This call moves the tmp file to the cache file. Note that removeSourceFile is true
        // here, which is why copyFile prints no log line for this step.
        copyFile(url, tempFile, destFile, fileOverwrite, removeSourceFile = true)
      } finally {
        // Catch-all for the couple of cases where for some reason we didn't move `tempFile` to
        // `destFile`.
        // The tmp file is then deleted if it still exists. We can verify later that the spark-*
        // directory contains no tmp file, only the cache file and the lock file.
        if (tempFile.exists()) {
          tempFile.delete()
        }
      }
    }

    /**
     * Copy `sourceFile` to `destFile`.
     *
     * If `destFile` already exists:
     *   - no-op if its contents equal those of `sourceFile`,
     *   - throw an exception if `fileOverwrite` is false,
     *   - attempt to overwrite it otherwise.
     *
     * @param url URL that `sourceFile` originated from, for logging purposes.
     * @param sourceFile File path to copy/move from.
     * @param destFile File path to copy/move to.
     * @param fileOverwrite Whether to delete/overwrite an existing `destFile` that does not match
     *                      `sourceFile`
     * @param removeSourceFile Whether to remove `sourceFile` after / as part of moving/copying it to
     *                         `destFile`.
     */
    private def copyFile(
        url: String,
        sourceFile: File,
        destFile: File,
        fileOverwrite: Boolean,
        removeSourceFile: Boolean = false): Unit = {

      // On the first fetch the destination does not exist yet, so this branch is skipped.
      if (destFile.exists) {
        if (!filesEqualRecursive(sourceFile, destFile)) {
          if (fileOverwrite) {
            logInfo(
              s"File $destFile exists and does not match contents of $url, replacing it with $url"
            )
            if (!destFile.delete()) {
              throw new SparkException(
                "Failed to delete %s while attempting to overwrite it with %s".format(
                  destFile.getAbsolutePath,
                  sourceFile.getAbsolutePath
                )
              )
            }
          } else {
            throw new SparkException(
              s"File $destFile exists and does not match contents of $url")
          }
        } else {
          // Do nothing if the file contents are the same, i.e. this file has been copied
          // previously.
          logInfo(
            "%s has been previously copied to %s".format(
              sourceFile.getAbsolutePath,
              destFile.getAbsolutePath
            )
          )
          return
        }
      }

      // The file does not exist in the target directory. Copy or move it there.
      // The first copyFile call goes from tmp to cache with removeSourceFile = true:
      // the tmp file is moved away and nothing is logged.
      if (removeSourceFile) {
        Files.move(sourceFile.toPath, destFile.toPath)
      } else {
        // The second copyFile call goes from the cache file to the actual JAR, in this log
        // com.eclipsesource.minimal-json_minimal-json-0.9.4.jar, which produces the "Copying" line:
        // util.Utils: Copying /var/tmp/mesos/slaves/2a06b6b8-3000-40f9-9008-51caaef53399-S1/frameworks/2a06b6b8-3000-40f9-9008-51caaef53399-0000/executors/0/runs/155527cb-0086-46b9-9de5-ff718c092c8b/spark-ef0819de-ff5e-49eb-b3f8-5daa990a37a2/-18725443601630462630480_cache to /var/tmp/mesos/slaves/2a06b6b8-3000-40f9-9008-51caaef53399-S1/frameworks/2a06b6b8-3000-40f9-9008-51caaef53399-0000/executors/0/runs/155527cb-0086-46b9-9de5-ff718c092c8b/./com.eclipsesource.minimal-json_minimal-json-0.9.4.jar
        logInfo(s"Copying ${sourceFile.getAbsolutePath} to ${destFile.getAbsolutePath}")
        copyRecursive(sourceFile, destFile)
      }
    }

What is stored in spark-*

So what is stored there? Judging from the log, JAR files. Is there anything else? We saw that it is the executor that calls Utils.fetchFile:

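    /**
     * Download any missing dependencies if we receive a new set of files and JARs from the
     * SparkContext. Also adds any new JARs we fetched to the class loader.
     */
    private def updateDependencies(newFiles: Map[String, Long], newJars: Map[String, Long]) {
      lazy val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
      synchronized {
        // Fetch missing dependencies
        for ((name, timestamp) <- newFiles if currentFiles.getOrElse(name, -1L) < timestamp) {
          logInfo("Fetching " + name + " with timestamp " + timestamp)
          // Fetch file with useCache mode, close cache for local mode.
          Utils.fetchFile(name, new File(SparkFiles.getRootDirectory()), conf,
            env.securityManager, hadoopConf, timestamp, useCache = !isLocal)
          currentFiles(name) = timestamp
        }
        for ((name, timestamp) <- newJars) {
          val localName = new URI(name).getPath.split("/").last
          val currentTimeStamp = currentJars.get(name)
            .orElse(currentJars.get(localName))
            .getOrElse(-1L)
          if (currentTimeStamp < timestamp) {
            logInfo("Fetching " + name + " with timestamp " + timestamp)
            // Fetch file with useCache mode, close cache for local mode.
            Utils.fetchFile(name, new File(SparkFiles.getRootDirectory()), conf,
              env.securityManager, hadoopConf, timestamp, useCache = !isLocal)
            currentJars(name) = timestamp
            // Add it to our class loader
            val url = new File(SparkFiles.getRootDirectory(), localName).toURI.toURL
            if (!urlClassLoader.getURLs().contains(url)) {
              logInfo("Adding " + url + " to class loader")
              urlClassLoader.addURL(url)
            }
          }
        }
      }
    }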

According to the comment, it downloads all missing dependencies, both files and JARs. In today's example there happen to be only JARs.
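The "missing" check boils down to comparing timestamps against what has already been fetched. Here is a minimal sketch of that bookkeeping, with the actual download left out; `DependencyTrackerSketch` and its `update` method are hypothetical names, and the behaviour shown (fetch only when the incoming timestamp is newer than the recorded one) is my reading of the method above rather than a copy of it.

```scala
import scala.collection.mutable

final class DependencyTrackerSketch {
  // name -> timestamp of the version that was fetched last
  private val currentJars = mutable.HashMap.empty[String, Long]

  /** Returns the names that need fetching (sorted for stable output) and records them. */
  def update(newJars: Map[String, Long]): Seq[String] = synchronized {
    val toFetch = newJars.collect {
      case (name, ts) if currentJars.getOrElse(name, -1L) < ts => name
    }.toSeq.sorted
    toFetch.foreach(name => currentJars(name) = newJars(name))
    toFetch
  }
}
```

With this bookkeeping, a second call that carries the same names and timestamps returns an empty list, so nothing is downloaded again until a dependency arrives with a newer timestamp.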

When is spark-* created

To find out when it is created, keep tracing backwards: who calls the executor's updateDependencies method?

    override def run(): Unit = {
      threadId = Thread.currentThread.getId
      Thread.currentThread.setName(threadName)
      val threadMXBean = ManagementFactory.getThreadMXBean
      val taskMemoryManager = new TaskMemoryManager(env.memoryManager, taskId)
      val deserializeStartTime = System.currentTimeMillis()
      val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
        threadMXBean.getCurrentThreadCpuTime
      } else 0L
      Thread.currentThread.setContextClassLoader(replClassLoader)
      val ser = env.closureSerializer.newInstance()
      logInfo(s"Running $taskName (TID $taskId)")
      execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)
      var taskStartTime: Long = 0
      var taskStartCpu: Long = 0
      startGCTime = computeTotalGcTime()

      try {
        // Must be set before updateDependencies() is called, in case fetching dependencies
        // requires access to properties contained within (e.g. for access control).
        Executor.taskDeserializationProps.set(taskDescription.properties)

        // updateDependencies is called here
        updateDependencies(taskDescription.addedFiles, taskDescription.addedJars)
        ...

Which class does this run method belong to, and who calls it?

    class TaskRunner(
        execBackend: ExecutorBackend,
        private val taskDescription: TaskDescription)
      extends Runnable {

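It belongs to TaskRunner. Combine this with what I observed in /var/tmp/mesos/slaves/2a06b6b8-3000-40f9-9008-51caaef53399-S1/frameworks/2a06b6b8-3000-40f9-9008-51caaef53399-0000/executors/0/runs/155527cb-0086-46b9-9de5-ff718c092c8b/: when the executor joined there was no spark-* directory, and it appeared once the first task ran. When I ran a second task (job), the log showed no download messages. So the directory is created when the first task runs.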

Verifying this at the code level would mean digging deeper into TaskRunner and task execution. I will do that when I have time.


