Skip to content

Commit f22a3e7

Browse files
committed
CXF-7396: CachedOutputStream doesn't delete temp files
1 parent e094bb4 commit f22a3e7

File tree

10 files changed

+466
-4
lines changed

10 files changed

+466
-4
lines changed

core/pom.xml

+5
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,11 @@
175175
<artifactId>saaj-impl</artifactId>
176176
<scope>test</scope>
177177
</dependency>
178+
<dependency>
179+
<groupId>org.awaitility</groupId>
180+
<artifactId>awaitility</artifactId>
181+
<scope>test</scope>
182+
</dependency>
178183
</dependencies>
179184
<build>
180185
<plugins>

core/src/main/java/org/apache/cxf/bus/extension/ExtensionManagerBus.java

+1
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,7 @@ public InputStream getAsStream(String name) {
141141
if (null == this.getExtension(BindingFactoryManager.class)) {
142142
new BindingFactoryManagerImpl(this);
143143
}
144+
144145
extensionManager.load(new String[] {ExtensionManagerImpl.BUS_EXTENSION_RESOURCE});
145146
extensionManager.activateAllByType(ResourceResolver.class);
146147

core/src/main/java/org/apache/cxf/io/CachedConstants.java

+8
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,14 @@ public final class CachedConstants {
7171
public static final String CIPHER_TRANSFORMATION_BUS_PROP =
7272
"bus.io.CachedOutputStream.CipherTransformation";
7373

74+
/**
75+
* The delay (in ms) for cleaning up unclosed {@code CachedOutputStream} instances. 30 minutes
76+
* is specified by default. If the value of the delay is set to 0 (or is negative), the cleaner
77+
* will be disabled.
78+
*/
79+
public static final String CLEANER_DELAY_BUS_PROP =
80+
"bus.io.CachedOutputStreamCleaner.Delay";
81+
7482
private CachedConstants() {
7583
// complete
7684
}

core/src/main/java/org/apache/cxf/io/CachedOutputStream.java

+21-1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.io.BufferedOutputStream;
2323
import java.io.ByteArrayInputStream;
2424
import java.io.ByteArrayOutputStream;
25+
import java.io.Closeable;
2526
import java.io.File;
2627
import java.io.FileInputStream;
2728
import java.io.FileNotFoundException;
@@ -93,6 +94,7 @@ public class CachedOutputStream extends OutputStream {
9394
private List<CachedOutputStreamCallback> callbacks;
9495

9596
private List<Object> streamList = new ArrayList<>();
97+
private CachedOutputStreamCleaner cachedOutputStreamCleaner;
9698

9799
public CachedOutputStream() {
98100
this(defaultThreshold);
@@ -127,6 +129,8 @@ private void readBusProperties() {
127129
outputDir = f;
128130
}
129131
}
132+
133+
cachedOutputStreamCleaner = b.getExtension(CachedOutputStreamCleaner.class);
130134
}
131135
}
132136

@@ -279,6 +283,9 @@ public void resetOut(OutputStream out, boolean copyOldContent) throws IOExceptio
279283
}
280284
} finally {
281285
streamList.remove(currentStream);
286+
if (cachedOutputStreamCleaner != null) {
287+
cachedOutputStreamCleaner.unregister(currentStream);
288+
}
282289
deleteTempFile();
283290
inmem = true;
284291
}
@@ -481,6 +488,9 @@ private void createFileOutputStream() throws IOException {
481488
bout.writeTo(currentStream);
482489
inmem = false;
483490
streamList.add(currentStream);
491+
if (cachedOutputStreamCleaner != null) {
492+
cachedOutputStreamCleaner.register(this);
493+
}
484494
} catch (Exception ex) {
485495
//Could be IOException or SecurityException or other issues.
486496
//Don't care what, just keep it in memory.
@@ -512,6 +522,10 @@ public InputStream getInputStream() throws IOException {
512522
try {
513523
InputStream fileInputStream = new TransferableFileInputStream(tempFile);
514524
streamList.add(fileInputStream);
525+
if (cachedOutputStreamCleaner != null) {
526+
cachedOutputStreamCleaner.register(fileInputStream);
527+
}
528+
515529
if (cipherTransformation != null) {
516530
fileInputStream = new CipherInputStream(fileInputStream, ciphers.getDecryptor()) {
517531
boolean closed;
@@ -537,7 +551,7 @@ private synchronized void deleteTempFile() {
537551
FileUtils.delete(file);
538552
}
539553
}
540-
private boolean maybeDeleteTempFile(Object stream) {
554+
private boolean maybeDeleteTempFile(Closeable stream) {
541555
boolean postClosedInvoked = false;
542556
streamList.remove(stream);
543557
if (!inmem && tempFile != null && streamList.isEmpty() && allowDeleteOfFile) {
@@ -549,6 +563,9 @@ private boolean maybeDeleteTempFile(Object stream) {
549563
//ignore
550564
}
551565
postClosedInvoked = true;
566+
if (cachedOutputStreamCleaner != null) {
567+
cachedOutputStreamCleaner.unregister(this);
568+
}
552569
}
553570
deleteTempFile();
554571
currentStream = new LoadingByteArrayOutputStream(1024);
@@ -665,6 +682,9 @@ public void close() throws IOException {
665682
if (!closed) {
666683
super.close();
667684
maybeDeleteTempFile(this);
685+
if (cachedOutputStreamCleaner != null) {
686+
cachedOutputStreamCleaner.unregister(this);
687+
}
668688
}
669689
closed = true;
670690
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.cxf.io;
21+
22+
import java.io.Closeable;
23+
24+
/**
25+
* The {@link Bus} extension to clean up unclosed {@link CachedOutputStream} instances (and alike) backed by
26+
* temporary files (leading to disk fill, see https://issues.apache.org/jira/browse/CXF-7396.
27+
*/
28+
public interface CachedOutputStreamCleaner {
29+
/**
30+
* Run the clean up
31+
*/
32+
void clean();
33+
34+
/**
35+
* Register the stream instance for the clean up
36+
*/
37+
void unregister(Closeable closeable);
38+
39+
/**
40+
* Unregister the stream instance from the clean up (closed properly)
41+
*/
42+
void register(Closeable closeable);
43+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,180 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.cxf.io;
21+
22+
import java.io.Closeable;
23+
import java.io.IOException;
24+
import java.util.ArrayList;
25+
import java.util.Collection;
26+
import java.util.Iterator;
27+
import java.util.Objects;
28+
import java.util.Timer;
29+
import java.util.TimerTask;
30+
import java.util.concurrent.DelayQueue;
31+
import java.util.concurrent.Delayed;
32+
import java.util.concurrent.TimeUnit;
33+
import java.util.logging.Logger;
34+
35+
import jakarta.annotation.Resource;
36+
import org.apache.cxf.Bus;
37+
import org.apache.cxf.buslifecycle.BusLifeCycleListener;
38+
import org.apache.cxf.buslifecycle.BusLifeCycleManager;
39+
import org.apache.cxf.common.logging.LogUtils;
40+
41+
public final class DelayedCachedOutputStreamCleaner implements CachedOutputStreamCleaner, BusLifeCycleListener {
42+
private static final Logger LOG = LogUtils.getL7dLogger(DelayedCachedOutputStreamCleaner.class);
43+
44+
private long delay; /* default is 30 minutes, in milliseconds */
45+
private final DelayQueue<DelayedCloseable> queue = new DelayQueue<>();
46+
private Timer timer;
47+
48+
private static final class DelayedCloseable implements Delayed {
49+
private final Closeable closeable;
50+
private final long expiredAt;
51+
52+
DelayedCloseable(final Closeable closeable, final long delay) {
53+
this.closeable = closeable;
54+
this.expiredAt = System.nanoTime() + delay;
55+
}
56+
57+
@Override
58+
public int compareTo(Delayed o) {
59+
return Long.compare(getDelay(TimeUnit.NANOSECONDS), o.getDelay(TimeUnit.NANOSECONDS));
60+
}
61+
62+
@Override
63+
public long getDelay(TimeUnit unit) {
64+
return unit.convert(expiredAt - System.nanoTime(), TimeUnit.NANOSECONDS);
65+
}
66+
67+
@Override
68+
public int hashCode() {
69+
return Objects.hash(closeable);
70+
}
71+
72+
@Override
73+
public boolean equals(Object obj) {
74+
if (this == obj) {
75+
return true;
76+
}
77+
78+
if (obj == null) {
79+
return false;
80+
}
81+
82+
if (getClass() != obj.getClass()) {
83+
return false;
84+
}
85+
86+
final DelayedCloseable other = (DelayedCloseable) obj;
87+
return Objects.equals(closeable, other.closeable);
88+
}
89+
}
90+
91+
@Resource
92+
public void setBus(Bus bus) {
93+
Number delayValue = null;
94+
BusLifeCycleManager busLifeCycleManager = null;
95+
96+
if (bus != null) {
97+
delayValue = (Number) bus.getProperty(CachedConstants.CLEANER_DELAY_BUS_PROP);
98+
busLifeCycleManager = bus.getExtension(BusLifeCycleManager.class);
99+
}
100+
101+
if (delayValue == null) {
102+
delay = TimeUnit.MILLISECONDS.convert(30, TimeUnit.MINUTES);
103+
} else {
104+
final long value = delayValue.longValue();
105+
if (value > 0) {
106+
delay = value; /* already in milliseconds */
107+
}
108+
}
109+
110+
if (busLifeCycleManager != null) {
111+
busLifeCycleManager.registerLifeCycleListener(this);
112+
}
113+
114+
if (timer != null) {
115+
timer.cancel();
116+
timer = null;
117+
}
118+
119+
if (delay > 0) {
120+
timer = new Timer("DelayedCachedOutputStreamCleaner", true);
121+
timer.scheduleAtFixedRate(new TimerTask() {
122+
@Override
123+
public void run() {
124+
clean();
125+
}
126+
}, 0, Math.max(1, delay >> 1));
127+
}
128+
129+
}
130+
131+
@Override
132+
public void register(Closeable closeable) {
133+
queue.put(new DelayedCloseable(closeable, delay));
134+
}
135+
136+
@Override
137+
public void unregister(Closeable closeable) {
138+
queue.remove(new DelayedCloseable(closeable, delay));
139+
}
140+
141+
@Override
142+
public void clean() {
143+
final Collection<DelayedCloseable> closeables = new ArrayList<>();
144+
queue.drainTo(closeables);
145+
clean(closeables);
146+
}
147+
148+
@Override
149+
public void initComplete() {
150+
}
151+
152+
@Override
153+
public void postShutdown() {
154+
}
155+
156+
@Override
157+
public void preShutdown() {
158+
if (timer != null) {
159+
timer.cancel();
160+
}
161+
}
162+
163+
public void forceClean() {
164+
clean(queue);
165+
}
166+
167+
private void clean(Collection<DelayedCloseable> closeables) {
168+
final Iterator<DelayedCloseable> iterator = closeables.iterator();
169+
while (iterator.hasNext()) {
170+
final DelayedCloseable next = iterator.next();
171+
try {
172+
iterator.remove();
173+
LOG.warning("Unclosed (leaked?) stream detected: " + next.closeable);
174+
next.closeable.close();
175+
} catch (final IOException | RuntimeException ex) {
176+
LOG.warning("Unable to close (leaked?) stream: " + ex.getMessage());
177+
}
178+
}
179+
}
180+
}

core/src/main/resources/META-INF/cxf/bus-extensions.txt

+1-1
Original file line numberDiff line numberDiff line change
@@ -11,4 +11,4 @@ org.apache.cxf.bus.resource.ResourceManagerImpl:org.apache.cxf.resource.Resource
1111
org.apache.cxf.catalog.OASISCatalogManager:org.apache.cxf.catalog.OASISCatalogManager:true
1212
org.apache.cxf.common.util.ASMHelperImpl:org.apache.cxf.common.util.ASMHelper:true
1313
org.apache.cxf.common.spi.ClassLoaderProxyService:org.apache.cxf.common.spi.ClassLoaderService:true
14-
14+
org.apache.cxf.io.DelayedCachedOutputStreamCleaner:org.apache.cxf.io.CachedOutputStreamCleaner:true

0 commit comments

Comments
 (0)