[SparkMicroBatchStream] Executors prematurely close I/O client during Spark broadcast cleanup #12858

bk-mz opened this issue Apr 21, 2025 · 6 comments · May be fixed by #12868
Labels
bug Something isn't working

Comments

@bk-mz

bk-mz commented Apr 21, 2025

Apache Iceberg version

1.8.1 (latest release)

Query engine

Spark

Please describe the bug 🐞

Affected Code
In SerializableTableWithSize (and SerializableMetadataTableWithSize), the serializationMarker field is transient. Deserialized copies on executors therefore see it as null, and when Spark triggers broadcast cleanup, close() takes the executor-side branch and calls io().close(), as sketched below.
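
For context, a minimal sketch of the marker pattern described above (simplified and hypothetical, not the actual Iceberg source); the AutoCloseable stands in for the I/O client that the real class exposes via io():

import java.io.Serializable;

// Hypothetical, simplified sketch of the transient-marker pattern.
public class SerializableTableSketch implements AutoCloseable, Serializable {

  // transient: initialized only on the driver-side instance; after broadcast
  // deserialization on an executor, Java serialization leaves it null.
  private final transient Object serializationMarker = new Object();

  // placeholder for the lazily (re)built I/O client of the real class
  private transient AutoCloseable lazyIo;

  private AutoCloseable io() {
    if (lazyIo == null) {
      lazyIo = () -> {}; // the real class constructs the actual FileIO client here
    }
    return lazyIo;
  }

  @Override
  public void close() throws Exception {
    if (serializationMarker == null) {
      // Deserialized (executor-side) copy: this is the branch that shuts down
      // the shared I/O client when Spark cleans up the broadcast variable.
      io().close();
    }
  }
}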

Observed Behavior

  • Stream runs normally for some time.
  • Shortly after the connection reaper logs appear, Spark GC/broadcast cleanup invokes SerializableTableWithSize.close() on an executor.
  • That call shuts down the shared I/O client and the streaming job fails.

Workaround
Disable the executor-side io().close():

@Override
public void close() throws Exception {
  // a null marker means this is a deserialized, executor-side copy
  if (serializationMarker == null) {
    LOG.info("Releasing resources");
    // io().close(); // commented out so broadcast cleanup cannot shut down the shared I/O client
  }
  invalidateCache(name());
}

Logs

2025-04-19 12:57:32.801 INFO  org.apache.spark.executor.Executor [Executor task launch worker for task 82.0 in stage 0.0 (TID 82)] - Running task 82.0 in stage 0.0 (TID 82)
2025-04-19 12:57:32.818 DEBUG org.apache.iceberg.spark.source.RowDataReader [Executor task launch worker for task 82.0 in stage 0.0 (TID 82)] - Reading 1 file split(s) for table messaging_prod.messaging_data_platform.mdru_raw_json
2025-04-19 12:57:32.818 DEBUG org.apache.iceberg.spark.source.RowDataReader [Executor task launch worker for task 82.0 in stage 0.0 (TID 82)] - Opening data file s3://mytable/table/data/timestamp_hour=2025-04-18-14/00010-3371252-48d6e634-de53-4d7e-b0f3-5f581c671ee8-0-00001.parquet
2025-04-19 12:57:32.818 DEBUG org.apache.iceberg.aws.s3.S3InputStream [Executor task launch worker for task 82.0 in stage 0.0 (TID 82)] - Seek with new stream for s3://mytable/table/data/timestamp_hour=2025-04-18-14/00010-3371252-48d6e634-de53-4d7e-b0f3-5f581c671ee8-0-00001.parquet to offset 241151296
2025-04-19 12:57:32.820 DEBUG software.amazon.awssdk.thirdparty.org.apache.http.impl.conn.PoolingHttpClientConnectionManager [Executor task launch worker for task 82.0 in stage 0.0 (TID 82)] - Connection request: [route: {s}->https://s3.us-east-1.amazonaws.com:443][total available: 0; route allocated: 2 of 50000; total allocated: 2 of 50000]
2025-04-19 12:57:32.820 DEBUG software.amazon.awssdk.thirdparty.org.apache.http.impl.conn.PoolingHttpClientConnectionManager [Executor task launch worker for task 82.0 in stage 0.0 (TID 82)] - Connection leased: [id: 34][route: {s}->https://s3.us-east-1.amazonaws.com:443][total available: 0; route allocated: 3 of 50000; total allocated: 3 of 50000]
2025-04-19 12:57:32.869 DEBUG software.amazon.awssdk.thirdparty.org.apache.http.impl.conn.PoolingHttpClientConnectionManager [Executor task launch worker for task 82.0 in stage 0.0 (TID 82)] - Connection [id: 34][route: {s}->https://s3.us-east-1.amazonaws.com:443] can be kept alive for 60.0 seconds
2025-04-19 12:57:32.870 DEBUG software.amazon.awssdk.thirdparty.org.apache.http.impl.conn.PoolingHttpClientConnectionManager [Executor task launch worker for task 82.0 in stage 0.0 (TID 82)] - Connection released: [id: 34][route: {s}->https://s3.us-east-1.amazonaws.com:443][total available: 2; route allocated: 4 of 50000; total allocated: 4 of 50000]
2025-04-19 12:57:32.870 DEBUG org.apache.iceberg.aws.s3.S3InputStream [Executor task launch worker for task 82.0 in stage 0.0 (TID 82)] - Seek with new stream for s3://mytable/table/data/timestamp_hour=2025-04-18-14/00010-3371252-48d6e634-de53-4d7e-b0f3-5f581c671ee8-0-00001.parquet to offset 241144615
2025-04-19 12:57:32.871 DEBUG software.amazon.awssdk.thirdparty.org.apache.http.impl.conn.PoolingHttpClientConnectionManager [Executor task launch worker for task 82.0 in stage 0.0 (TID 82)] - Connection request: [route: {s}->https://s3.us-east-1.amazonaws.com:443][total available: 2; route allocated: 4 of 50000; total allocated: 4 of 50000]
2025-04-19 12:57:32.872 DEBUG software.amazon.awssdk.thirdparty.org.apache.http.impl.conn.PoolingHttpClientConnectionManager [Executor task launch worker for task 82.0 in stage 0.0 (TID 82)] - Connection leased: [id: 34][route: {s}->https://s3.us-east-1.amazonaws.com:443][total available: 1; route allocated: 4 of 50000; total allocated: 4 of 50000]
2025-04-19 12:57:32.901 DEBUG org.apache.iceberg.aws.s3.S3InputStream [Executor task launch worker for task 82.0 in stage 0.0 (TID 82)] - Seek with new stream for s3://mytable/table/data/timestamp_hour=2025-04-18-14/00010-3371252-48d6e634-de53-4d7e-b0f3-5f581c671ee8-0-00001.parquet to offset 133086016
2025-04-19 12:57:32.902 DEBUG software.amazon.awssdk.thirdparty.org.apache.http.impl.conn.PoolingHttpClientConnectionManager [Executor task launch worker for task 82.0 in stage 0.0 (TID 82)] - Connection released: [id: 34][route: {s}->https://s3.us-east-1.amazonaws.com:443][total available: 0; route allocated: 4 of 50000; total allocated: 4 of 50000]
2025-04-19 12:57:32.904 DEBUG software.amazon.awssdk.thirdparty.org.apache.http.impl.conn.PoolingHttpClientConnectionManager [Executor task launch worker for task 82.0 in stage 0.0 (TID 82)] - Connection request: [route: {s}->https://s3.us-east-1.amazonaws.com:443][total available: 0; route allocated: 4 of 50000; total allocated: 4 of 50000]
2025-04-19 12:57:32.904 DEBUG software.amazon.awssdk.thirdparty.org.apache.http.impl.conn.PoolingHttpClientConnectionManager [Executor task launch worker for task 82.0 in stage 0.0 (TID 82)] - Connection leased: [id: 39][route: {s}->https://s3.us-east-1.amazonaws.com:443][total available: 0; route allocated: 5 of 50000; total allocated: 5 of 50000]
2025-04-19 12:57:34.456 INFO  org.apache.hadoop.io.compress.CodecPool [Executor task launch worker for task 82.0 in stage 0.0 (TID 82)] - Got brand-new decompressor [.zstd]
2025-04-19 12:57:38.914 DEBUG software.amazon.awssdk.thirdparty.org.apache.http.impl.conn.PoolingHttpClientConnectionManager [Executor task launch worker for task 82.0 in stage 0.0 (TID 82)] - Connection released: [id: 39][route: {s}->https://s3.us-east-1.amazonaws.com:443][total available: 0; route allocated: 14 of 50000; total allocated: 14 of 50000]
2025-04-19 12:57:38.917 INFO  org.apache.spark.storage.memory.MemoryStore [Executor task launch worker for task 82.0 in stage 0.0 (TID 82)] - Block rdd_6_82 stored as values in memory (estimated size 1469.6 MiB, free 22.1 GiB)
2025-04-19 12:57:40.183 INFO  org.apache.spark.storage.memory.MemoryStore [Executor task launch worker for task 82.0 in stage 0.0 (TID 82)] - 5 blocks selected for dropping (1485.0 MiB bytes)
2025-04-19 12:57:40.188 INFO  org.apache.spark.storage.BlockManager [Executor task launch worker for task 82.0 in stage 0.0 (TID 82)] - Dropping block broadcast_2 from memory
2025-04-19 12:57:40.189 INFO  org.apache.spark.storage.BlockManager [Executor task launch worker for task 82.0 in stage 0.0 (TID 82)] - Writing block broadcast_2 to disk
2025-04-19 12:57:40.427 INFO  org.apache.spark.storage.BlockManager [Executor task launch worker for task 82.0 in stage 0.0 (TID 82)] - Dropping block broadcast_2_piece0 from memory
2025-04-19 12:57:40.427 INFO  org.apache.spark.storage.BlockManager [Executor task launch worker for task 82.0 in stage 0.0 (TID 82)] - Writing block broadcast_2_piece0 to disk
2025-04-19 12:57:40.431 INFO  org.apache.spark.storage.BlockManager [Executor task launch worker for task 82.0 in stage 0.0 (TID 82)] - Dropping block broadcast_1 from memory
2025-04-19 12:57:40.431 INFO  org.apache.spark.storage.BlockManager [Executor task launch worker for task 82.0 in stage 0.0 (TID 82)] - Writing block broadcast_1 to disk
👇 SUDDENLY 👇
2025-04-19 12:57:40.433 INFO  org.apache.iceberg.spark.source.SerializableTableWithSize [Executor task launch worker for task 82.0 in stage 0.0 (TID 82)] - Releasing resources
2025-04-19 12:57:40.433 DEBUG software.amazon.awssdk.thirdparty.org.apache.http.impl.conn.PoolingHttpClientConnectionManager [Executor task launch worker for task 82.0 in stage 0.0 (TID 82)] - Connection manager is shutting down
2025-04-19 12:57:40.550 DEBUG software.amazon.awssdk.thirdparty.org.apache.http.impl.conn.PoolingHttpClientConnectionManager [Executor task launch worker for task 82.0 in stage 0.0 (TID 82)] - Connection manager shut down
👆 SUDDENLY 👆
2025-04-19 12:57:40.551 INFO  org.apache.spark.storage.BlockManager [Executor task launch worker for task 82.0 in stage 0.0 (TID 82)] - Dropping block broadcast_1_piece0 from memory
2025-04-19 12:57:40.551 INFO  org.apache.spark.storage.BlockManager [Executor task launch worker for task 82.0 in stage 0.0 (TID 82)] - Writing block broadcast_1_piece0 to disk
2025-04-19 12:57:40.553 INFO  org.apache.spark.storage.BlockManager [Executor task launch worker for task 82.0 in stage 0.0 (TID 82)] - Dropping block rdd_6_25 from memory
2025-04-19 12:57:40.553 INFO  org.apache.spark.storage.BlockManager [Executor task launch worker for task 82.0 in stage 0.0 (TID 82)] - Writing block rdd_6_25 to disk
2025-04-19 12:57:41.370 INFO  org.apache.spark.storage.memory.MemoryStore [Executor task launch worker for task 82.0 in stage 0.0 (TID 82)] - After dropping 5 blocks, free memory is 15.2 GiB
2025-04-19 12:57:51.504 INFO  org.apache.spark.executor.Executor [dispatcher-Executor] - Executor is trying to kill task 82.0 in stage 0.0 (TID 82), reason: Stage cancelled: Job aborted due to stage failure: Task 96 in stage 0.0 failed 4 times, most recent failure: Lost task 96.3 in stage 0.0 (TID 102) (ip-172-31-7-100.ec2.internal executor 5): java.lang.IllegalStateException: Connection pool shut down
2025-04-19 12:57:51.511 INFO  org.apache.spark.executor.Executor [Executor task launch worker for task 82.0 in stage 0.0 (TID 82)] - Executor interrupted and killed task 82.0 in stage 0.0 (TID 82), reason: Stage cancelled: Job aborted due to stage failure: Task 96 in stage 0.0 failed 4 times, most recent failure: Lost task 96.3 in stage 0.0 (TID 102) (ip-172-31-7-100.ec2.internal executor 5): java.lang.IllegalStateException: Connection pool shut down

Willingness to contribute

  • I can contribute a fix for this bug independently
  • I would be willing to contribute a fix for this bug with guidance from the Iceberg community
  • I cannot contribute a fix for this bug at this time
bk-mz added the bug label on Apr 21, 2025
bk-mz changed the title from "Streaming job terminates after HTTP connection reaper triggers premature I/O close" to "[SparkMicroBatchStream] Executors prematurely close I/O client during Spark broadcast cleanup" on Apr 21, 2025
@singhpk234
Contributor

@bk-mz Is this issue happening only with SparkMicroBatchStream, or with anything that uses SerializableTableWithSize?

@bk-mz
Author

bk-mz commented Apr 21, 2025

@singhpk234 Hey.

So far this happens only in Iceberg-to-Iceberg streaming, and only under high memory usage and high load. Small or moderate loads don't trigger it, or maybe I just haven't found the exact condition that triggers the close.

When it does trigger, everything is dead.

Note: we run a decent number of Iceberg applications, most of them Kafka-to-Iceberg streaming. There's no issue there, even under high load.

@singhpk234
Contributor

I see this was originally intended to be called when the executor is done processing (5a98aef). If GC/broadcast cleanup is triggering it, we need a way to protect against that case while still ensuring that io() is eventually closed.

That call shuts down the shared I/O client and the streaming job fails.

Is the IO shared between tasks or between executors? Do we need to implement reference counting before we call close() on the IO?
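
For illustration, a rough sketch of the reference-counting idea (hypothetical, not existing Iceberg code): each task retains the shared client before use and releases it when done, so only the final release actually closes the underlying IO.

import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical ref-counted wrapper around a shared I/O client.
public class RefCountedIO implements AutoCloseable {
  private final AutoCloseable delegate;
  private final AtomicInteger refs = new AtomicInteger(1);

  public RefCountedIO(AutoCloseable delegate) {
    this.delegate = delegate;
  }

  // called by each task (or table copy) that starts using the client
  public RefCountedIO retain() {
    refs.incrementAndGet();
    return this;
  }

  @Override
  public void close() throws Exception {
    // only the last release shuts down the underlying client
    if (refs.decrementAndGet() == 0) {
      delegate.close();
    }
  }
}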

@bk-mz
Author

bk-mz commented Apr 21, 2025

Is the IO shared between tasks or between executors?

Between tasks.

Do we need to implement reference counting before we call close() on the IO?

TBH I don't know; the matter looks too complicated for a simple solution. Just commenting out io().close() works, and there doesn't seem to be any noticeable memory leak.

Though yeah, just commenting it out is not a proper solution.

@xiaoxuandev
Contributor

Hi @singhpk234, after looking into the code, I believe the issue is that the underlying HTTP connection pool is shared. #12868 should fix the problem. Could you help take a look? Thanks!
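
For anyone reproducing the failure mode outside Spark, a small standalone illustration (assuming Apache HttpClient 4.x, which the shaded AWS SDK classes in the log wrap): once a shared PoolingHttpClientConnectionManager is shut down, any client still using it fails with the same "Connection pool shut down" IllegalStateException seen above.

import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;

public class SharedPoolShutdownDemo {
  public static void main(String[] args) throws Exception {
    // one connection pool, potentially shared by several clients
    PoolingHttpClientConnectionManager pool = new PoolingHttpClientConnectionManager();

    CloseableHttpClient client =
        HttpClients.custom().setConnectionManager(pool).setConnectionManagerShared(true).build();

    // simulates another user of the shared pool shutting it down,
    // which is what the executor-side io().close() effectively does
    pool.shutdown();

    // the client itself was never closed, yet its next request fails with
    // java.lang.IllegalStateException: Connection pool shut down
    client.execute(new HttpGet("https://example.com")).close();
  }
}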

@bk-mz
Author

bk-mz commented Apr 23, 2025

@xiaoxuandev Correct, this will definitely fix it.
