草庐IT

spark报错:Cannot overwrite a path that is also being read from.

cclovezbf 2023-07-09 原文

Cannot overwrite a path that is also being read from.

这个错看起来很简单。代码简化为

Dataset<Row> selectBefore = session.sql("select * from table1") //表里原先的数据

Dataset<Row> dataset = session.createDataset(list,xx.class)//新增加的数据csv txt kafka

大概就是获取表里的原始数据,然后从别的地方搞来的新数据两个合起来继续存到表里去

selectBefore 
.union(dataset) --两个数据union融合
.write()
.mode(SaveMode.Overwrite) --重新写到hive
.format("hive")
.insertInto(table1);

为啥不用append 因为有时候会重复调用。。 反正就是这么个情况。就是要先查再插入。

为什么报错?

Spark SQL在雪球的实践 - 腾讯云开发者社区-腾讯云

 解决办法

第一个解决办法真不行。我查了下这两个参数大多是解决spark读取hive表数据量不对的情况用的。而且我设置之后还有报错就不贴出来了

spark sql读取不到orc格式hive表数据问题_Java小田的博客-CSDN博客_spark读取不到hive表

第二个 确实可以。

session.sparkContext().setCheckpointDir("/tmp/spark/job/OrderOnlineSparkJob");

Dataset<Row> selectBefore = session.sql("select * from table1").checkpoint();

 第三个这种lowb方法就不说了

找个时间好好学习下spark的checkpoint知识。 

备注下:这种方式有个弊端,checkpoint 在hdfs的目录不是一定会删除的,经过百度,说到GC的时候才会删除,还有什么弱引用。给两个解决办法

1..config("spark.cleaner.referenceTracking.cleanCheckpoints", "true") 这个就是checkpoint的清理线程去清理,但是不一定完全有用,建议大家可以试下System.gc(); 手动触发下。

2.搞个脚本跑完spark后 hdfs dfs -rm -r /checkpoint/*

___________________________________________________________________________

2022-11-15更新。为什么checkpoint一下就可以了?

我们稍微百度下,spark.checkpoint的作用?

spark checkpoint详解 - 超级核弹头 - 博客园

简单来说,

1.截断血缘关系,避免rdd从头开始继续计算,解决链路过长的问题

2.上面说的云里雾里,那就看这里,就是把数据存到了hdfs的目录。可以从磁盘文件去恢复数据

那为啥没有checkpoint就不能插入读取的表?

之前看到的一篇文章说了,spark的逻辑是没有临时目录,hive是有临时目录的。

所以这里就很清楚了

没有checkpoint= 读自己目录,写自己目录  报错

有了checkpoint=读hdfs的checkpoint dir,写自己目录,所以是可以的

——————————————————————————————————————————

稍微分析下源码

 上图中internalRdd是我们查出来的dataset然后toRdd 在copy,简单来说就是复制了一份,

reliableCheckpoint默认是true。 这个一直是true,应该就是你的checkpoint是否可靠,比如存到hdfs就可靠,你存到磁盘肯定不可靠。。。

接着看 

internalRdd.checkpoint()和 internalRdd.count()

 上图看你的checkpointDir 设置没有,没有就报错

最后new ReliableRDDCheckpointData(this)->new ReliableRDDCheckpointData(internalRdd)

我们再看ReliableRDDCheckpointData 类

 这里只是new了这个对象。并没有存储rdd

实际是在后面的action后调用该doCheckpoint方法

Materialize this RDD and write its content to a reliable DFS. This is called immediately after the first action invoked on this RDD has completed.

将rdd物化也就是存储,将它的内容写到可靠的dfs上,这个在第一个action执行后会被立马调用!

 val newRDD = ReliableCheckpointRDD.writeRDDToCheckpointDirectory(rdd, cpDir)

 这里可以看到说明是啥,不是把原来的rdd返回了,是返回了一个新的rdd!!!!所以我们这个时候使用的rdd不是我们spark读取的rdd了。

这个时候我们再看下我之前提到的问题,spark.cleaner.referenceTracking.cleanCheckpoints这个参数有什么用?

这里设置了true,看代码不就是把rdd的cleaner遍历,然后去清理checkpoint目录吗?为什么有时候不起作用?(我跑了十次大概清理了1-2次,最后hdfs checkpoint目录大概有1G的大小,如果数据特别大,这个空间不就浪费了?)

继续研究下,这个cleaner是啥?

  可以看到 spark.cleaner.referenceTracking 这个设置为true了会new一个 默认为true。然后foreach start。

 

 这个代码就有意思了。。。。

cleaningThread.setDaemon(true)//设置为守护线程,主线程没了它也没了
cleaningThread.setName("Spark Context Cleaner")//加个名字不重要。
cleaningThread.start() //start 方法 肯定要看
periodicGCService.scheduleAtFixedRate(new Runnable { 
  override def run(): Unit = System.gc()
}, periodicGCInterval, periodicGCInterval, TimeUnit.SECONDS)

//这个搞了一个定时调度的一个东西,仔细看这个是啥 

periodicGCInterval=30min periodicGCInterval=30min

意思是啥?就是每隔30min来次System.GC来清除弱引用。这就是为什么有时候checkpoint会自己删 有时候不删了。

这段代码主要两个是

1.开启线程清理

2.开启定时GC

cleaningThread是什么呢?

private val cleaningThread = new Thread() { override def run() { keepCleaning() }}

cleaningThread.start()就是keepCleaning对吧。

//开启清理线程,清理没有引用的checkpoint boradcast rdd
//怎么清理的?referenceQueue这个引用队列里获取的。
/** Keep cleaning RDD, shuffle, and broadcast state. */
private def keepCleaning(): Unit = Utils.tryOrStopSparkContext(sc) {
  while (!stopped) {//差不多就是while(true)了
    try {
      val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
        .map(_.asInstanceOf[CleanupTaskWeakReference])//原来是这里!!!
      // Synchronize here to avoid being interrupted on stop()
      synchronized {
        reference.foreach { ref =>
          logDebug("Got cleaning task " + ref.task)
          referenceBuffer.remove(ref)
          ref.task match {
            case CleanRDD(rddId) =>
              doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
            case CleanShuffle(shuffleId) =>
              doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)
            case CleanBroadcast(broadcastId) =>
              doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
            case CleanAccum(accId) =>
              doCleanupAccum(accId, blocking = blockOnCleanupTasks)
            case CleanCheckpoint(rddId) => //!!!!清理checkpoint目录
              doCleanCheckpoint(rddId)
          }
        }
      }
    } catch {
      case ie: InterruptedException if stopped => // ignore
      case e: Exception => logError("Error in cleaning thread", e)
    }
  }
}

 ——————————————————————————————————

/**
 * Clean up checkpoint files written to a reliable storage.
 * Locally checkpointed files are cleaned up separately through RDD cleanups.
 */
def doCleanCheckpoint(rddId: Int): Unit = {
  try {
    logDebug("Cleaning rdd checkpoint data " + rddId)
    ReliableRDDCheckpointData.cleanCheckpoint(sc, rddId)
    listeners.asScala.foreach(_.checkpointCleaned(rddId))
    logInfo("Cleaned rdd checkpoint data " + rddId)
  }
  catch {
    case e: Exception => logError("Error cleaning rdd checkpoint data " + rddId, e)
  }
}

———————————————清理hdfs目录———————————————

/** Clean up the files associated with the checkpoint data for this RDD. */
def cleanCheckpoint(sc: SparkContext, rddId: Int): Unit = {
  checkpointPath(sc, rddId).foreach { path =>
    path.getFileSystem(sc.hadoopConfiguration).delete(path, true)
  }
}

看着好像没问题为啥没有清理呢? 能力有限查不出来。。。但是可以借鉴别人的,别人比我写的好多了。

Spark ContextCleaner及checkpoint的clean机制分析 - 知乎

———————————————继续学习弱引用————————————————————

不清楚弱引用的可以看这个文章

Java中弱引用的概念和作用是什么 - 编程语言 - 亿速云

之前的文章分析到 checkpoint目录没有被删除是因为弱引用的问题。弱引用假装已经很熟练了。那么继续源码分析。

ContextCleaner类

//搞了一个引用队列,存放引用对象

private val referenceQueue = new ReferenceQueue[AnyRef]
/** Register an RDD for cleanup when it is garbage collected. */
def registerRDDForCleanup(rdd: RDD[_]): Unit = {
  registerForCleanup(rdd, CleanRDD(rdd.id))
}

def registerAccumulatorForCleanup(a: AccumulatorV2[_, _]): Unit = {
  registerForCleanup(a, CleanAccum(a.id))
}

/** Register a ShuffleDependency for cleanup when it is garbage collected. */
def registerShuffleForCleanup(shuffleDependency: ShuffleDependency[_, _, _]): Unit = {
  registerForCleanup(shuffleDependency, CleanShuffle(shuffleDependency.shuffleId))
}

/** Register a Broadcast for cleanup when it is garbage collected. */
def registerBroadcastForCleanup[T](broadcast: Broadcast[T]): Unit = {
  registerForCleanup(broadcast, CleanBroadcast(broadcast.id))
}

/** Register a RDDCheckpointData for cleanup when it is garbage collected. */
def registerRDDCheckpointDataForCleanup[T](rdd: RDD[_], parentId: Int): Unit = {
  registerForCleanup(rdd, CleanCheckpoint(parentId))
}

/** Register an object for cleanup. */
private def registerForCleanup(objectForCleanup: AnyRef, task: CleanupTask): Unit = {
  referenceBuffer.add(new CleanupTaskWeakReference(task, objectForCleanup, referenceQueue))
}

这里多复制点。可以看到acc rdd RDDCheckpointData 等都是调用的registerForCleanup这个方法。最后都是变成了 new CleanupTaskWeakReference(task,acc/rdd/checkpoint,引用队列)

CleanupTaskWeakReference

/**
 * A WeakReference associated with a CleanupTask.
 *
 * When the referent object becomes only weakly reachable, the corresponding
 * CleanupTaskWeakReference is automatically added to the given reference queue.
 */
private class CleanupTaskWeakReference(
    val task: CleanupTask,
    referent: AnyRef,
    referenceQueue: ReferenceQueue[AnyRef])
  extends WeakReference(referent, referenceQueue)

 至此我们分析下checkpoint的流程。

rdd.checkpoint()->

new ReliableRDDCheckpointData(this)

ReliableRDDCheckpointData.checkpointPath(rdd.context, rdd.id)//确定checkpoint路径

spark.cleaner.referenceTracking.cleanCheckpoints=TURE会将当前rddCheckpoint放到引用队列里,

最开始我们spark代码里new SparkContext的时候

会根据spark.cleaner.referenceTracking=true去 new ContextCleaner()然后调用这个的start方法

而这个ContextCleaner的start方法开启了两个线程

一个不停的看队列里是否有需要清除弱引用对象

一个定时去GC

经过我测试 弱引用对象什么时候会被清理?

参考文章

referenceQueue用法_gmHappy的博客-CSDN博客_referencequeue

    private void test2() throws InterruptedException {
        ReferenceQueue<Object> referenceQueue = new ReferenceQueue<>();
        Object value = new Object();
        Map<Object, Object> map = new HashMap<>();
        for (int i = 0; i < 10; i++) {
            WeakReference<String> weakReference = new WeakReference<String>(String.valueOf(i), referenceQueue);
            System.out.println("创造了:"+weakReference+",value="+weakReference.get());
            map.put(weakReference, value);
        }
        System.gc();
        Thread.sleep(100);
        WeakReference<String> result=null;
        int cnt = 0;
        while ((result=(WeakReference)referenceQueue.poll())!=null){
            System.out.println((cnt++) + "回收了:" + result+",value="+result.get());
        }
    }
-----------
创造了:java.lang.ref.WeakReference@383534aa,value=0
创造了:java.lang.ref.WeakReference@6bc168e5,value=1
创造了:java.lang.ref.WeakReference@7b3300e5,value=2
创造了:java.lang.ref.WeakReference@2e5c649,value=3
创造了:java.lang.ref.WeakReference@136432db,value=4
0回收了:java.lang.ref.WeakReference@6bc168e5,value=null
1回收了:java.lang.ref.WeakReference@2e5c649,value=null
2回收了:java.lang.ref.WeakReference@7b3300e5,value=null
3回收了:java.lang.ref.WeakReference@136432db,value=null
4回收了:java.lang.ref.WeakReference@383534aa,value=null
    private void testBigObjectWithoutGC() throws InterruptedException {
        int _1M = 1024 * 1024;

        ReferenceQueue<Object> referenceQueue = new ReferenceQueue<>();
        Thread thread = new Thread(() -> {
            try {
                int cnt = 0;
                WeakReference<byte[]> k;
                while ((k = (WeakReference) referenceQueue.remove()) != null) {
                    System.out.println((cnt++) + "回收了:" + k);
                }
            } catch (InterruptedException e) {
                // 结束循环
            }
        });
        thread.setDaemon(true);
        thread.start();

        Object value = new Object();
        Map<Object, Object> map = new HashMap<>();
        for (int i = 0; i < 10000; i++) {
            byte[] bytes = new byte[_1M];
            WeakReference<byte[]> weakReference = new WeakReference<byte[]>(bytes, referenceQueue);
            map.put(weakReference, value);
        }
        System.out.println("map.size->" + map.size());
    }
--------
8342回收了:java.lang.ref.WeakReference@67a3bd51
8343回收了:java.lang.ref.WeakReference@2cec704c
8344回收了:java.lang.ref.WeakReference@bd1111a
8345回收了:java.lang.ref.WeakReference@5918c260
8346回收了:java.lang.ref.WeakReference@7fc7c4a
8347回收了:java.lang.ref.WeakReference@9d3c67
注意这里没有了。没有回收10000个对象 只回收了8000多个。。有时候9000多。

经过测试发现

需要使用

System.gc(); //触发GC 但是GC没有真正开始

Thread.sleep(1000);//给个时间让GC去执行。

或者

你队列里放的对象都很大,并且数量也不少 这个时候会主动触发GC

这样弱引用对象就会被清除了。。。所以我们之前如何清理checkpoint目录呢?

spark.cleaner.referenceTracking.cleanCheckpoints true

System.gc();+ Thread.sleep(1000); 也不会特别影响spark的任务。我就懒得测试了。。。各位成功了留个言让我知道下。

有关spark报错:Cannot overwrite a path that is also being read from.的更多相关文章

  1. 深度学习部署:Windows安装pycocotools报错解决方法 - 2

    深度学习部署:Windows安装pycocotools报错解决方法1.pycocotools库的简介2.pycocotools安装的坑3.解决办法更多Ai资讯:公主号AiCharm本系列是作者在跑一些深度学习实例时,遇到的各种各样的问题及解决办法,希望能够帮助到大家。ERROR:Commanderroredoutwithexitstatus1:'D:\Anaconda3\python.exe'-u-c'importsys,setuptools,tokenize;sys.argv[0]='"'"'C:\\Users\\46653\\AppData\\Local\\Temp\\pip-instal

  2. ruby-on-rails - 报错 - 在 Snow Leopard 上安装 RVM - 2

    我正在尝试在我的SnowLeopard10.6.8上安装RVM,方法是:\curl-Lhttps://get.rvm.io|bash-sstable--ruby我得到这个错误:InstallingRubyfromsourceto:/Users/Villa/.rvm/rubies/ruby-2.0.0-p0,thismaytakeawhiledependingonyourcpu(s)...ruby-2.0.0-p0-#downloadingruby-2.0.0-p0,thismaytakeawhiledependingonyourconnection...ruby-2.0.0-p0-#e

  3. Unity 报错No ‘git‘ executable was found. Please install Git on your system then restart - 2

    亲测可用。Anerroroccurredwhileresolvingpackages:Projecthasinvaliddependencies: com.unity.xxx:No'git'executablewasfound.PleaseinstallGitonyour  systemthenrestartUnityandUnityHub在我们使用PackageManager时,Unity允许我们使用Git上的package(点击加号,选择addpackagefromgitURL,或者是直接在Asset/Packages/manifest.json中添加包名)。但是这种操作需要我们事先装好g

  4. ruby-on-rails - Rails/Ruby创建数据库报错: Unable to load the EventMachine C extension - 2

    更新:eventmachinegem已安装并在我的gemfile中:eventmachine(1.0.0,0.12.10)请帮忙!尝试使用以下内容创建数据库:Fitzs-MacBook-Pro:twilio_insanityFitz$rakedb:create'返回以下错误:UnabletoloadtheEventMachineCextension;Tousethepure-rubyreactor,require'em/pure_ruby'rakeaborted!cannotloadsuchfile--rubyeventmachine/Users/Fitz/.rvm/gems/ruby

  5. nginx配置https后报错nginx: [emerg] https protocol requires SSL support in XXX.conf详细解决方法 - 2

    一、前言最近,在测试环境的nginx里增加了一个https配置:location/api-meeting-qq/{proxy_passhttps://api.meeting.qq.com/;}然后,执行命令://这个是nginx启动文件的路径,根据实际情况自行更改sudo/home/useradmin/nginx/sbin/nginx-sreload结果,nginx就报错了:nginx:[emerg]httpsprotocolrequiresSSLsupportin/home/useradmin/nginx/conf.d/trainNginx.conf:9二、解决方法百度发现,是之前安装ngi

  6. Spark的常用SQL日期函数 - 2

    一、获取当前时间1、current_date当前日期(年月日)Examples:SELECTcurrent_date;2、current_timestamp/now()当前日期(时间戳)Examples:SELECTcurrent_timestamp;二、从日期字段中提取时间1、year,month,day/dayofmonth,hour,minute,secondExamples:SELECTyear(now());其他的日期函数以此类推month:1day:12(当月的第几天)dayofmonth:12hour,minute,second:分别对应时分秒2、dayofweek、dayofm

  7. nvm报错Now using node v版本号 (64-bit)解决方法 - 2

    nvm报错Nowusingnodev版本号(64-bit)解决方法先上报错(安装后的一些问题请直接跳到尾部查看)安装NVM的原因是使用React时addreact-redux时提示我node版本问题,遂打算安装一Node版本管理工具因为我电脑上很早就安装了Node,安装NVM时提示我是否覆盖并管理本地已有版本,我选了Yes之后安装成功(后来检查发现和版本没关系,是因为我在node里去ADD真离谱自己这操作)安装NVM注意问题1.若修改安装路径一定补上nodejs2.打开安装文件位置3.增加以下映射node_mirror:npm.taobao.org/mirrors/node/npm_mirro

  8. git push报错:fatal: Authentication failed for ‘https://github.com/... - 2

    第一次用git传代码到GitHub时,填写用户名和密码出现报错:fatal:Authenticationfailedfor'https://github.com/试了下面的没用😢gitconfig-–globaluser.name"xxx"gitconfig--globaluser.email"xxx@xx.com"查看报错原因发现是因为git更新了认证方式在错误提示(糟糕忘截图)的网站里有说明-->https://docs.github.com/en/get-started/getting-started-with-git/about-remote-repositories#cloning-

  9. ruby - 在 Ruby 中,为什么在启动 irb 之后出现 foo.nil?说未定义的错误,@foo.nil?给出 "true"和 @@wah.nil?又报错了? - 2

    在Ruby1.8.7和1.9.2中相同:$irbruby-1.8.7-p302>foo.nil?NameError:undefinedlocalvariableormethod`foo'for#from(irb):1ruby-1.8.7-p302>@bar.nil?=>trueruby-1.8.7-p302>@@wah.nil?NameError:uninitializedclassvariable@@wahinObjectfrom(irb):3为什么实例变量与局部变量和类变量的处理方式不同? 最佳答案 在Ruby中,大多数未初始化

  10. Fiddler手机抓包网络报错解决办法 - 2

    首先打开fiddler,点击Tools-Options-Connections一、这里有两个注意点点击HTTPS,左边选项选择如图,右边Actions点击如图第二项会提示Success,点击确定点击Connections,这里注意Fiddlerlistensonport这里面填写默认8888即可,左边三个选项选择如图,以上操作完成后,重启Fiddler二、手机打开WiFi1.长按或者点击Wifi进行修改网络(如不会操作,此处根据具体机型自行百度)修改代理为手动,服务器主机名两种方式可以查到:①win+R,在输入框输入cmd,在弹窗中输入ipconfig,此时IPv4后面的地址就是你的主机ip,

随机推荐