1 /* 2 * Licensed to the Apache Software Foundation (ASF) under one or more 3 * contributor license agreements. See the NOTICE file distributed with 4 * this work for additional information regarding copyright ownership. 5 * The ASF licenses this file to You under the Apache License, Version 2.0 6 * (the "License"); you may not use this file except in compliance with 7 * the License. You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 package org.apache.commons.vfs2.util; 18 19 import java.io.FilterInputStream; 20 import java.io.IOException; 21 import java.io.InputStream; 22 import java.util.concurrent.atomic.AtomicBoolean; 23 import java.util.concurrent.atomic.AtomicLong; 24 25 /** 26 * An InputStream that provides end-of-stream monitoring. 27 * <p> 28 * This is the same as {@link MonitorInputStream} but without the buffering. 29 * </p> 30 * 31 * @since 2.5.0 32 */ 33 public class RawMonitorInputStream extends FilterInputStream { 34 35 private static final int EOF_CHAR = -1; 36 private final AtomicBoolean finished = new AtomicBoolean(false); 37 private final AtomicLong atomicCount = new AtomicLong(); 38 39 // @Override 40 // public synchronized void reset() throws IOException { 41 // if (!finished.get()) { 42 // super.reset(); 43 // } 44 // } 45 // 46 // @Override 47 // public synchronized long skip(long n) throws IOException { 48 // if (finished.get()) { 49 // return 0; 50 // } 51 // return super.skip(n); 52 // } 53 54 /** 55 * Constructs a MonitorInputStream from the passed InputStream 56 * 57 * @param inputStream The input stream to wrap. 58 */ 59 public RawMonitorInputStream(final InputStream inputStream) { 60 super(inputStream); 61 } 62 63 /** 64 * Returns 0 if the stream is at EOF, else the underlying inputStream will be queried. 65 * 66 * @return The number of bytes that are available. 67 * @throws IOException if an error occurs. 68 */ 69 @Override 70 public synchronized int available() throws IOException { 71 if (finished.get()) { 72 return 0; 73 } 74 75 return super.available(); 76 } 77 78 /** 79 * Reads a character. 80 * 81 * @return The character that was read as an integer. 82 * @throws IOException if an error occurs. 83 */ 84 @Override 85 public int read() throws IOException { // lgtm [java/non-sync-override] 86 if (finished.get()) { 87 return EOF_CHAR; 88 } 89 90 final int ch = super.read(); 91 if (ch != EOF_CHAR) { 92 atomicCount.incrementAndGet(); 93 } 94 95 return ch; 96 } 97 98 /** 99 * Reads bytes from this input stream. 100 * 101 * @param buffer A byte array in which to place the characters read. 102 * @param offset The offset at which to start reading. 103 * @param length The maximum number of bytes to read. 104 * @return The number of bytes read. 105 * @throws IOException if an error occurs. 106 */ 107 @Override 108 public int read(final byte[] buffer, final int offset, final int length) throws IOException { // lgtm [java/non-sync-override] 109 if (finished.get()) { 110 return EOF_CHAR; 111 } 112 113 final int nread = super.read(buffer, offset, length); 114 if (nread != EOF_CHAR) { 115 atomicCount.addAndGet(nread); 116 } 117 return nread; 118 } 119 120 /** 121 * Closes this input stream and releases any system resources associated with the stream. 122 * 123 * @throws IOException if an error occurs. 124 */ 125 @Override 126 public void close() throws IOException { 127 final boolean closed = finished.getAndSet(true); 128 if (closed) { 129 return; 130 } 131 132 // Close the stream 133 IOException exc = null; 134 try { 135 super.close(); 136 } catch (final IOException ioe) { 137 exc = ioe; 138 } 139 140 // Notify that the stream has been closed 141 try { 142 onClose(); 143 } catch (final IOException ioe) { 144 exc = ioe; 145 } 146 147 if (exc != null) { 148 throw exc; 149 } 150 } 151 152 /** 153 * Called after the stream has been closed. This implementation does nothing. 154 * 155 * @throws IOException if an error occurs. 156 */ 157 protected void onClose() throws IOException { 158 // noop 159 } 160 161 /** 162 * Gets the number of bytes read by this input stream. 163 * 164 * @return The number of bytes read by this input stream. 165 */ 166 public long getCount() { 167 return atomicCount.get(); 168 } 169 170 @Override 171 public synchronized void mark(final int readlimit) { 172 // TODO Auto-generated method stub 173 super.mark(readlimit); 174 } 175 }