
SparkExecutorCache causes slowness of RewriteDataFilesSparkAction #11648



Open

davseitsev opened this issue Nov 25, 2024 · 5 comments · May be fixed by #12893
Labels
bug Something isn't working

Comments

@davseitsev

Apache Iceberg version

1.7.0 (latest release)

Query engine

Spark

Please describe the bug 🐞

We have scheduled Spark maintenance jobs which run all the necessary Spark actions to keep our data lake clean and healthy. Recently we upgraded Iceberg from 1.1.0 to 1.7.0 and enabled deletes compaction.
We noticed that data compaction of tables with position deletes is really slow, and it blocks almost all other data compaction jobs on the cluster. Slow here means it almost never finishes; you can wait a few hours and see zero progress.

Investigation showed that there are threads on the Spark workers, named iceberg-delete-worker-pool-%, that consume all the CPU. Here is a flame graph:
[flame graph image]
Thread dump example:

"iceberg-delete-worker-pool-0" #190 daemon prio=5 os_prio=0 cpu=1108299.90ms elapsed=7369.57s tid=0x0000aaaaed08d370 nid=0x37a0 runnable  [0x0000ffff2a0da000]
   java.lang.Thread.State: RUNNABLE
	at org.apache.iceberg.types.JavaHashes.hashCode(JavaHashes.java:34)
	at org.apache.iceberg.util.CharSequenceWrapper.hashCode(CharSequenceWrapper.java:85)
	at java.util.HashMap.hash([email protected]/HashMap.java:338)
	at java.util.HashMap.getNode([email protected]/HashMap.java:568)
	at java.util.HashMap.get([email protected]/HashMap.java:556)
	at org.apache.iceberg.util.CharSequenceMap.get(CharSequenceMap.java:92)
	at java.util.Map.computeIfAbsent([email protected]/Map.java:1052)
	at org.apache.iceberg.deletes.Deletes.toPositionIndexes(Deletes.java:152)
	at org.apache.iceberg.data.BaseDeleteLoader.readPosDeletes(BaseDeleteLoader.java:169)
	at org.apache.iceberg.data.BaseDeleteLoader.lambda$getOrReadPosDeletes$3(BaseDeleteLoader.java:160)
	at org.apache.iceberg.data.BaseDeleteLoader$$Lambda$3227/0x0000009002085c10.get(Unknown Source)
	at org.apache.iceberg.spark.SparkExecutorCache.lambda$loadFunc$0(SparkExecutorCache.java:122)
	at org.apache.iceberg.spark.SparkExecutorCache$$Lambda$3230/0x000000900208f190.apply(Unknown Source)
	at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.LocalCache.lambda$statsAware$0(LocalCache.java:139)
	at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.LocalCache$$Lambda$3231/0x000000900208f3d8.apply(Unknown Source)
	at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache.lambda$doComputeIfAbsent$14(BoundedLocalCache.java:2406)
	at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache$$Lambda$2184/0x0000009001d62478.apply(Unknown Source)
	at java.util.concurrent.ConcurrentHashMap.compute([email protected]/ConcurrentHashMap.java:1916)
	- locked <0x000000068b0009a0> (a java.util.concurrent.ConcurrentHashMap$ReservationNode)
	at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache.doComputeIfAbsent(BoundedLocalCache.java:2404)
	at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache.computeIfAbsent(BoundedLocalCache.java:2387)
	at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.LocalCache.computeIfAbsent(LocalCache.java:108)
	at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.LocalManualCache.get(LocalManualCache.java:62)
	at org.apache.iceberg.spark.SparkExecutorCache.getOrLoad(SparkExecutorCache.java:114)
	at org.apache.iceberg.spark.source.BaseReader$SparkDeleteFilter$CachingDeleteLoader.getOrLoad(BaseReader.java:297)
	at org.apache.iceberg.data.BaseDeleteLoader.getOrReadPosDeletes(BaseDeleteLoader.java:160)
	at org.apache.iceberg.data.BaseDeleteLoader.lambda$loadPositionDeletes$2(BaseDeleteLoader.java:150)
	at org.apache.iceberg.data.BaseDeleteLoader$$Lambda$3224/0x0000009002084498.apply(Unknown Source)
	at org.apache.iceberg.data.BaseDeleteLoader.lambda$execute$7(BaseDeleteLoader.java:236)
	at org.apache.iceberg.data.BaseDeleteLoader$$Lambda$3226/0x0000009002085408.run(Unknown Source)
	at org.apache.iceberg.util.Tasks$Builder.runTaskWithRetry(Tasks.java:413)
	at org.apache.iceberg.util.Tasks$Builder$1.run(Tasks.java:315)
	at java.util.concurrent.Executors$RunnableAdapter.call([email protected]/Executors.java:539)
	at java.util.concurrent.FutureTask.run([email protected]/FutureTask.java:264)
	at java.util.concurrent.ThreadPoolExecutor.runWorker([email protected]/ThreadPoolExecutor.java:1136)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run([email protected]/ThreadPoolExecutor.java:635)
	at java.lang.Thread.run([email protected]/Thread.java:840)

After some research I figured out that there is a cache of delete files which loads the whole delete file even if you need position deletes for only a single data file. As far as I understand, in our case the size of the cache is really small and it's constantly evicted, which dramatically slows down reading deletes. When I turned it off with spark.sql.iceberg.executor-cache.enabled=false, jobs which had run for a few hours without progress started to finish in about 1 minute.
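For reference, a minimal Java sketch of that workaround, assuming the maintenance job builds its own SparkSession (the app name is illustrative, and the property can equally be passed via --conf on spark-submit):

import org.apache.spark.sql.SparkSession;

public class RewriteWithoutExecutorCache {
  public static void main(String[] args) {
    // Workaround sketch: disable the Iceberg executor cache for the maintenance job,
    // using the property name mentioned above.
    SparkSession spark = SparkSession.builder()
        .appName("iceberg-maintenance")
        .config("spark.sql.iceberg.executor-cache.enabled", "false")
        .getOrCreate();

    // ... run RewriteDataFilesSparkAction as usual ...

    spark.stop();
  }
}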

I don't see any benefit from having this cache for RewriteDataFilesSparkAction, so I suggest disabling it by default for this action.

Willingness to contribute

  • I can contribute a fix for this bug independently
  • I would be willing to contribute a fix for this bug with guidance from the Iceberg community
  • I cannot contribute a fix for this bug at this time
@davseitsev davseitsev added the bug Something isn't working label Nov 25, 2024
@nastra
Contributor

nastra commented Nov 25, 2024

@davseitsev I think this is a good topic to bring up on the Dev mailing list to reach a broader audience and facilitate a discussion

@singhpk234
Contributor

@davseitsev can you please also try setting this to true?

spark.sql.iceberg.executor-cache.locality.enabled

Please see #9563 (comment) for reference.

Also, regarding this:

As far as I understand in our case the size of the cache is really small and it's constantly evicted

we can increase the cache size via:

public static final String EXECUTOR_CACHE_MAX_TOTAL_SIZE =
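For illustration, a minimal Java sketch of setting both knobs on the session; the property keys are assumed to match the Iceberg SparkSQLProperties constants, so please double-check the names and value format against your Iceberg version:

import org.apache.spark.sql.SparkSession;

// Sketch only: enable cache locality and raise the total executor cache size above
// the 128 MB default. Property keys are assumptions based on the constants referenced above.
public class ExecutorCacheTuningSketch {
  public static void apply(SparkSession spark) {
    spark.conf().set("spark.sql.iceberg.executor-cache.locality.enabled", "true");
    // Value assumed to be a size in bytes (512 MB here); verify how your Iceberg version parses it.
    spark.conf().set("spark.sql.iceberg.executor-cache.max-total-size", String.valueOf(512L * 1024 * 1024));
  }
}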

Do you have memory profiles / heap dumps for it? 128 MB is decent. What I am a bit skeptical about is that deletes from different partitions may have landed on the same executor, in which case cache hits would be minimized; the locality setting above might help with that.
There are some gotchas for it, of course, for example #8755 (comment).

As @nastra recommended, a discussion on the mailing list would be good.

@aokolnychyi
Contributor

aokolnychyi commented Nov 27, 2024

Having as much detail as possible about the state of the table would help (e.g. the total file size, the total record count, file-scoped vs partition-scoped deletes, the total number of delete files, the number of delete files per partition).

@aokolnychyi
Contributor

Unfortunately, we haven't heard back. That said, I have a guess: I believe it is related to the connection pool we use for reading deletes. The rewrite action submits multiple actions at the same time, and they may use the same connection pool.

Overall, I agree we shouldn't be using the executor cache for this action, as each partition is rewritten separately. We need a config to disable the executor cache for deletes, and the action should set it automatically.
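To make the proposal concrete, a rough sketch of the shape such a change could take; the property and class names below are hypothetical illustrations, not the actual Iceberg API or the change in the linked PR:

// Hypothetical sketch: a dedicated property for delete-file caching that the rewrite
// action could turn off for its own jobs, leaving the executor cache on for normal reads.
// The names below are made up for illustration.
import java.util.Map;

public class DeleteCacheConfigSketch {
  // Made-up key, modeled on the existing executor-cache property names.
  public static final String EXECUTOR_CACHE_DELETE_FILES_ENABLED =
      "spark.sql.iceberg.executor-cache.delete-files.enabled";

  // The rewrite action could apply this override to the SQL conf of the jobs it submits.
  public static Map<String, String> rewriteJobOverrides() {
    return Map.of(EXECUTOR_CACHE_DELETE_FILES_ENABLED, "false");
  }
}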

@anuragmantri
Contributor

anuragmantri commented Jan 22, 2025

I will work on this and create a config for disabling the executor cache for deletes. Thanks for the bug report and the inputs.
