/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.commons.vfs2.util;

import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

/**
 * An InputStream that provides buffering and end-of-stream monitoring.
 * <p>
 * The stream keeps a running count of the bytes returned by its {@code read} methods and invokes
 * {@link #onClose()} the first time {@link #close()} is called. Once closed, the {@code read} methods report
 * end-of-stream and {@link #available()} reports {@code 0}.
 * </p>
 */
public class MonitorInputStream extends BufferedInputStream {

    /** Value returned by the {@code read} methods to signal end-of-stream. */
    private static final int EOF_CHAR = -1;

    /** Number of bytes returned so far by the {@code read} methods. */
    private final AtomicLong atomicCount = new AtomicLong();

    /** Set to {@code true} by the first call to {@link #close()}. */
    private final AtomicBoolean finished = new AtomicBoolean(false);

    /**
     * Constructs a MonitorInputStream from the passed InputStream.
     *
     * @param in The input stream to wrap.
     */
    public MonitorInputStream(final InputStream in) {
        super(in);
    }

    /**
     * Constructs a MonitorInputStream from the passed InputStream and with the specified buffer size.
     *
     * @param in The input stream to wrap.
     * @param bufferSize The buffer size to use.
     * @since 2.4
     */
    public MonitorInputStream(final InputStream in, final int bufferSize) {
        super(in, bufferSize);
    }

    /**
     * Returns 0 if this stream has been closed, else the buffered stream will be queried.
     *
     * @return The number of bytes that are available.
     * @throws IOException if an error occurs.
     * @since 2.0
     */
    @Override
    public synchronized int available() throws IOException {
        if (finished.get()) {
            return 0;
        }

        return super.available();
    }

    /**
     * Closes this input stream and releases any system resources associated with the stream.
     * <p>
     * Only the first call has an effect; subsequent calls return immediately. {@link #onClose()} is invoked even if
     * {@link #closeSuper()} fails. If both fail, the exception from {@link #onClose()} is thrown and the exception
     * from {@link #closeSuper()} is attached to it as a suppressed exception, so neither failure is lost.
     * </p>
     *
     * @throws IOException if an error occurs.
     */
    @Override
    public void close() throws IOException {
        // The atomic flip guarantees that only one caller performs the close/notify sequence.
        if (finished.getAndSet(true)) {
            return;
        }

        // Close the stream
        IOException failure = null;
        try {
            closeSuper();
        } catch (final IOException ioe) {
            failure = ioe;
        }

        // Notify that the stream has been closed
        try {
            onClose();
        } catch (final IOException ioe) {
            // Keep the earlier close failure visible instead of silently replacing it.
            // The identity check avoids the IllegalArgumentException raised by self-suppression.
            if (failure != null && failure != ioe) {
                ioe.addSuppressed(failure);
            }
            failure = ioe;
        }

        if (failure != null) {
            throw failure;
        }
    }

    /**
     * Gets the number of bytes read by this input stream.
     *
     * @return The number of bytes read by this input stream.
     */
    public long getCount() {
        return atomicCount.get();
    }

    /**
     * This method exists in order to allow overriding whether to actually close
     * the underlying stream (VFS-805). There are cases where closing that stream will
     * consume any amount of remaining data. In such cases closing a different
     * entity instead (such as an HttpResponse) may be more appropriate.
     *
     * @throws IOException if an IO error occurs.
     */
    protected void closeSuper() throws IOException {
        super.close();
    }

    /**
     * Called after the stream has been closed. This implementation does nothing.
     *
     * @throws IOException if an error occurs.
     */
    protected void onClose() throws IOException {
        // noop
    }

    /**
     * Reads a single byte and adds it to the byte count.
     *
     * @return The byte that was read as an integer, or {@code -1} if the end of the stream has been reached or this
     *         stream has been closed.
     * @throws IOException if an IO error occurs.
     */
    @Override
    public int read() throws IOException { // lgtm [java/non-sync-override]
        if (finished.get()) {
            return EOF_CHAR;
        }

        final int ch = super.read();
        if (ch != EOF_CHAR) {
            atomicCount.incrementAndGet();
        }

        return ch;
    }

    /**
     * Reads bytes from this input stream and adds the number of bytes read to the byte count.
     *
     * @param buffer A byte array in which to place the bytes read.
     * @param offset The offset at which to start storing bytes.
     * @param length The maximum number of bytes to read.
     * @return The number of bytes read, or {@code -1} if the end of the stream has been reached or this stream has
     *         been closed.
     * @throws IOException if an IO error occurs.
     */
    @Override
    public int read(final byte[] buffer, final int offset, final int length) throws IOException { // lgtm [java/non-sync-override]
        if (finished.get()) {
            return EOF_CHAR;
        }

        final int nread = super.read(buffer, offset, length);
        if (nread != EOF_CHAR) {
            atomicCount.addAndGet(nread);
        }
        return nread;
    }
}