Skip to content

Commit 3ba9271

Browse files
committed
CXF-7396: CachedOutputStream doesn't delete temp files
1 parent e094bb4 commit 3ba9271

File tree

9 files changed

+480
-3
lines changed

9 files changed

+480
-3
lines changed

core/pom.xml

+5
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,11 @@
175175
<artifactId>saaj-impl</artifactId>
176176
<scope>test</scope>
177177
</dependency>
178+
<dependency>
179+
<groupId>org.awaitility</groupId>
180+
<artifactId>awaitility</artifactId>
181+
<scope>test</scope>
182+
</dependency>
178183
</dependencies>
179184
<build>
180185
<plugins>

core/src/main/java/org/apache/cxf/bus/extension/ExtensionManagerBus.java

+7
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@
4040
import org.apache.cxf.configuration.NullConfigurer;
4141
import org.apache.cxf.feature.Feature;
4242
import org.apache.cxf.interceptor.AbstractBasicInterceptorProvider;
43+
import org.apache.cxf.io.CachedOutputStreamCleaner;
44+
import org.apache.cxf.io.DelayedCachedOutputStreamCleaner;
4345
import org.apache.cxf.resource.DefaultResourceManager;
4446
import org.apache.cxf.resource.ObjectTypeResolver;
4547
import org.apache.cxf.resource.PropertiesResolver;
@@ -141,6 +143,11 @@ public InputStream getAsStream(String name) {
141143
if (null == this.getExtension(BindingFactoryManager.class)) {
142144
new BindingFactoryManagerImpl(this);
143145
}
146+
147+
if (null == this.getExtension(CachedOutputStreamCleaner.class)) {
148+
this.extensions.put(CachedOutputStreamCleaner.class, DelayedCachedOutputStreamCleaner.create(this));
149+
}
150+
144151
extensionManager.load(new String[] {ExtensionManagerImpl.BUS_EXTENSION_RESOURCE});
145152
extensionManager.activateAllByType(ResourceResolver.class);
146153

core/src/main/java/org/apache/cxf/io/CachedConstants.java

+8
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,14 @@ public final class CachedConstants {
7171
public static final String CIPHER_TRANSFORMATION_BUS_PROP =
7272
"bus.io.CachedOutputStream.CipherTransformation";
7373

74+
/**
75+
* The delay (in ms) for cleaning up unclosed {@code CachedOutputStream} instances. 30 minutes
76+
* is specified by default. If the value of the delay is set to 0 (or is negative), the cleaner
77+
* will be disabled.
78+
*/
79+
public static final String CLEANER_DELAY_BUS_PROP =
80+
"bus.io.CachedOutputStreamCleaner.Delay";
81+
7482
private CachedConstants() {
7583
// complete
7684
}

core/src/main/java/org/apache/cxf/io/CachedOutputStream.java

+21-1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.io.BufferedOutputStream;
2323
import java.io.ByteArrayInputStream;
2424
import java.io.ByteArrayOutputStream;
25+
import java.io.Closeable;
2526
import java.io.File;
2627
import java.io.FileInputStream;
2728
import java.io.FileNotFoundException;
@@ -93,6 +94,7 @@ public class CachedOutputStream extends OutputStream {
9394
private List<CachedOutputStreamCallback> callbacks;
9495

9596
private List<Object> streamList = new ArrayList<>();
97+
private CachedOutputStreamCleaner cachedOutputStreamCleaner;
9698

9799
public CachedOutputStream() {
98100
this(defaultThreshold);
@@ -127,6 +129,8 @@ private void readBusProperties() {
127129
outputDir = f;
128130
}
129131
}
132+
133+
cachedOutputStreamCleaner = b.getExtension(CachedOutputStreamCleaner.class);
130134
}
131135
}
132136

@@ -279,6 +283,9 @@ public void resetOut(OutputStream out, boolean copyOldContent) throws IOExceptio
279283
}
280284
} finally {
281285
streamList.remove(currentStream);
286+
if (cachedOutputStreamCleaner != null) {
287+
cachedOutputStreamCleaner.unregister(currentStream);
288+
}
282289
deleteTempFile();
283290
inmem = true;
284291
}
@@ -481,6 +488,9 @@ private void createFileOutputStream() throws IOException {
481488
bout.writeTo(currentStream);
482489
inmem = false;
483490
streamList.add(currentStream);
491+
if (cachedOutputStreamCleaner != null) {
492+
cachedOutputStreamCleaner.register(this);
493+
}
484494
} catch (Exception ex) {
485495
//Could be IOException or SecurityException or other issues.
486496
//Don't care what, just keep it in memory.
@@ -512,6 +522,10 @@ public InputStream getInputStream() throws IOException {
512522
try {
513523
InputStream fileInputStream = new TransferableFileInputStream(tempFile);
514524
streamList.add(fileInputStream);
525+
if (cachedOutputStreamCleaner != null) {
526+
cachedOutputStreamCleaner.register(fileInputStream);
527+
}
528+
515529
if (cipherTransformation != null) {
516530
fileInputStream = new CipherInputStream(fileInputStream, ciphers.getDecryptor()) {
517531
boolean closed;
@@ -537,7 +551,7 @@ private synchronized void deleteTempFile() {
537551
FileUtils.delete(file);
538552
}
539553
}
540-
private boolean maybeDeleteTempFile(Object stream) {
554+
private boolean maybeDeleteTempFile(Closeable stream) {
541555
boolean postClosedInvoked = false;
542556
streamList.remove(stream);
543557
if (!inmem && tempFile != null && streamList.isEmpty() && allowDeleteOfFile) {
@@ -549,6 +563,9 @@ private boolean maybeDeleteTempFile(Object stream) {
549563
//ignore
550564
}
551565
postClosedInvoked = true;
566+
if (cachedOutputStreamCleaner != null) {
567+
cachedOutputStreamCleaner.unregister(this);
568+
}
552569
}
553570
deleteTempFile();
554571
currentStream = new LoadingByteArrayOutputStream(1024);
@@ -665,6 +682,9 @@ public void close() throws IOException {
665682
if (!closed) {
666683
super.close();
667684
maybeDeleteTempFile(this);
685+
if (cachedOutputStreamCleaner != null) {
686+
cachedOutputStreamCleaner.unregister(this);
687+
}
668688
}
669689
closed = true;
670690
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.cxf.io;
21+
22+
import java.io.Closeable;
23+
24+
/**
25+
* The {@link Bus} extension to clean up unclosed {@link CachedOutputStream} instances (and alike) backed by
26+
* temporary files (leading to disk fill, see https://issues.apache.org/jira/browse/CXF-7396.
27+
*/
28+
public interface CachedOutputStreamCleaner {
29+
/**
30+
* Run the clean up
31+
*/
32+
void clean();
33+
34+
/**
35+
* Register the stream instance for the clean up
36+
*/
37+
void unregister(Closeable closeable);
38+
39+
/**
40+
* Unregister the stream instance from the clean up (closed properly)
41+
*/
42+
void register(Closeable closeable);
43+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,194 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.cxf.io;
21+
22+
import java.io.Closeable;
23+
import java.io.IOException;
24+
import java.util.ArrayList;
25+
import java.util.Collection;
26+
import java.util.Iterator;
27+
import java.util.Objects;
28+
import java.util.Timer;
29+
import java.util.TimerTask;
30+
import java.util.concurrent.DelayQueue;
31+
import java.util.concurrent.Delayed;
32+
import java.util.concurrent.TimeUnit;
33+
import java.util.logging.Logger;
34+
35+
import org.apache.cxf.Bus;
36+
import org.apache.cxf.Bus.BusState;
37+
import org.apache.cxf.buslifecycle.BusLifeCycleListener;
38+
import org.apache.cxf.buslifecycle.BusLifeCycleManager;
39+
import org.apache.cxf.common.logging.LogUtils;
40+
41+
public final class DelayedCachedOutputStreamCleaner implements CachedOutputStreamCleaner, BusLifeCycleListener {
42+
private static final Logger LOG = LogUtils.getL7dLogger(DelayedCachedOutputStreamCleaner.class);
43+
44+
private final long delay; /* default is 30 minutes */
45+
private final DelayQueue<DelayedCloseable> queue = new DelayQueue<>();
46+
private final Timer timer;
47+
48+
private static final class DelayedCloseable implements Delayed {
49+
private final Closeable closeable;
50+
private final long expiredAt;
51+
52+
DelayedCloseable(final Closeable closeable, final long delay) {
53+
this.closeable = closeable;
54+
this.expiredAt = System.nanoTime() + delay;
55+
}
56+
57+
@Override
58+
public int compareTo(Delayed o) {
59+
return Long.compare(getDelay(TimeUnit.NANOSECONDS), o.getDelay(TimeUnit.NANOSECONDS));
60+
}
61+
62+
@Override
63+
public long getDelay(TimeUnit unit) {
64+
return unit.convert(expiredAt - System.nanoTime(), TimeUnit.NANOSECONDS);
65+
}
66+
67+
@Override
68+
public int hashCode() {
69+
return Objects.hash(closeable);
70+
}
71+
72+
@Override
73+
public boolean equals(Object obj) {
74+
if (this == obj) {
75+
return true;
76+
}
77+
78+
if (obj == null) {
79+
return false;
80+
}
81+
82+
if (getClass() != obj.getClass()) {
83+
return false;
84+
}
85+
86+
final DelayedCloseable other = (DelayedCloseable) obj;
87+
return Objects.equals(closeable, other.closeable);
88+
}
89+
}
90+
91+
protected DelayedCachedOutputStreamCleaner(long delay, TimeUnit unit) {
92+
this.delay = TimeUnit.NANOSECONDS.convert(delay, unit);
93+
this.timer = new Timer("DelayedCachedOutputStreamCleaner", true);
94+
this.timer.scheduleAtFixedRate(new TimerTask() {
95+
@Override
96+
public void run() {
97+
clean();
98+
}
99+
}, 0, TimeUnit.MILLISECONDS.convert(Math.max(1, delay >> 1), unit));
100+
}
101+
102+
@Override
103+
public void register(Closeable closeable) {
104+
queue.put(new DelayedCloseable(closeable, delay));
105+
}
106+
107+
@Override
108+
public void unregister(Closeable closeable) {
109+
queue.remove(new DelayedCloseable(closeable, delay));
110+
}
111+
112+
@Override
113+
public void clean() {
114+
final Collection<DelayedCloseable> closeables = new ArrayList<>();
115+
queue.drainTo(closeables);
116+
clean(closeables);
117+
}
118+
119+
@Override
120+
public void initComplete() {
121+
}
122+
123+
@Override
124+
public void postShutdown() {
125+
}
126+
127+
@Override
128+
public void preShutdown() {
129+
timer.cancel();
130+
}
131+
132+
public void forceClean() {
133+
clean(queue);
134+
}
135+
136+
private void clean(Collection<DelayedCloseable> closeables) {
137+
final Iterator<DelayedCloseable> iterator = closeables.iterator();
138+
while (iterator.hasNext()) {
139+
final DelayedCloseable next = iterator.next();
140+
try {
141+
iterator.remove();
142+
LOG.warning("Unclosed (leaked?) stream detected: " + next.closeable);
143+
next.closeable.close();
144+
} catch (final IOException | RuntimeException ex) {
145+
LOG.warning("Unable to close (leaked?) stream: " + ex.getMessage());
146+
}
147+
}
148+
}
149+
150+
public static CachedOutputStreamCleaner create(Bus bus) {
151+
Number delayValue = null;
152+
BusLifeCycleManager busLifeCycleManager = null;
153+
154+
if (bus != null) {
155+
delayValue = (Number) bus.getProperty(CachedConstants.CLEANER_DELAY_BUS_PROP);
156+
if (bus.getState() == BusState.RUNNING) {
157+
busLifeCycleManager = bus.getExtension(BusLifeCycleManager.class);
158+
}
159+
}
160+
161+
if (delayValue == null) {
162+
final DelayedCachedOutputStreamCleaner cleaner =
163+
new DelayedCachedOutputStreamCleaner(30, TimeUnit.MINUTES);
164+
if (busLifeCycleManager != null) {
165+
busLifeCycleManager.registerLifeCycleListener(cleaner);
166+
}
167+
return cleaner;
168+
} else {
169+
final long delay = delayValue.longValue();
170+
if (delay > 0) {
171+
final DelayedCachedOutputStreamCleaner cleaner =
172+
new DelayedCachedOutputStreamCleaner(delay, TimeUnit.MILLISECONDS);
173+
if (busLifeCycleManager != null) {
174+
busLifeCycleManager.registerLifeCycleListener(cleaner);
175+
}
176+
return cleaner;
177+
} else {
178+
return new CachedOutputStreamCleaner() {
179+
@Override
180+
public void unregister(Closeable closeable) {
181+
}
182+
183+
@Override
184+
public void register(Closeable closeable) {
185+
}
186+
187+
@Override
188+
public void clean() {
189+
}
190+
}; /* noop */
191+
}
192+
}
193+
}
194+
}

0 commit comments

Comments
 (0)