Spark Accumlator 解析
问题:
- Accumlator 的更新粒度是 Task 级别,还是什么粒度?
- Task 失败时的累加器信息,是否仍会更新,出现重复的情况?
- Action 中的累加器如何保证只执行一次?
- Transform 中的累加器是否有办法解决重算时多次更新的问题?
大家好,我是 xliuqq.
问题:
在 Github 上发布 Release 时,默认只会发布源码的zip包,但是有时候会想要将一些编译出来的文件也放在 Release 中。虽然可以在本地编译,然后通过界面手动上传。但是可以基于 Github Action 实现自动发布。
在开发通过包/类/方法注解自动生成README的功能时,在使用时,如果使用 Jar 依赖的方式,需要为每个工程定义一个 Main 方法,并且需要手动调用。
因此想通过开发Maven 插件,在compile阶段自动调用并生成 README,因此便研究如何开发 Maven 插件。
https://community.cloudera.com/t5/Community-Articles/Starting-Spark-jobs-directly-via-YARN-REST-API/ta-p/245998 这篇文章是Spark 1.6, 已经过时,只能作为基本的参考,具体还是要阅读Spark on Yarn的提交代码。
- 本文基于 Spark 2.4.8 和 Hadoop 3.2 进行验证。
将 Spark 作业提交到 Yarn上时,只能通过命令行 spark-submit 进行操作,本文通过解析 spark-submit 的源码,探究如何使用 Yarn Rest API 进行提交 Spark 作业(仅 cluster 模式,因 client 模式 driver 运行在 client 中而不是 AM 中)。
一句话总结:还是用命令行调用spark-submit
!
在开发CRD时,定义 controller
的时候,会看到如下代码
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
// 是否进行 leader 选举
LeaderElection: enableLeaderElection,
// Namespace and name
LeaderElectionNamespace: leaderElectionNamespace,
LeaderElectionID: "alluxio.data.fluid.io",
// ...
})
对于有状态组件来说,实现高可用一般来说通过选主来达到同一时刻只能有一个组件在处理业务逻辑。
这里,会比较好奇这个选举是如何实现的,接下来的内容便从源码的角度进行解读。
当自己开发一个工具包,然后另一个项目要引用时,因此需要将 jar 包放到可访问的公网上:
Maven 官方 repo,如使用 Sonatype OSSRH,但其注册复杂,此次不进行介绍;
第三方公共仓库,如 JitPack, Github/Gitee等,后面进行介绍。
MPI Operator 在执行 MPI 作业时,通过 SSH 的方式,因此需要在不同的 MPI Workers Pod间配置免密。
本文通过源码分析,探究如何在 K8s Pod 间配置免密。
在 k8s 中挂载 configmap 时,默认情况下,会以符号链接的形式存在。
在某些场景下,如 Pod 挂载 .ssh
进行免密时,由于.ssh
的特殊权限,因此不能以符号链接的形式存在,否则不能 ssh 免密。
此时,可以使用 subpath
进行挂载。
当系统与 Yarn 集成时,一般会通过 YarnClient / AdminProtocol 以及 Restful 接口等方式跟 Yarn 通信。
那么,当系统在进行单元测试时,就需要对 Yarn 进行 Mock,来验证系统的正确性。
Yarn 提供了 MiniYarnCluster 来建立内存级的集群进行测试,但其也有一些局限性。
当磁盘满负荷时,希望能够降低读写的速率,避免 HDFS 进程卡住,整个HDFS 不可用,导致 Client Socket 异常,作业失败。
当前(2023.12.18,HDFS 3.4 版本):
dfs.client.congestion.backoff.mean.time
、dfs.client.congestion.backoff.max.time
控制写入拥塞时 Client 的等待时间,用dfs.pipeline.congestion.ratio
来控制 DataNode 被判断阻塞时的跟CPU核数的比率。