1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.commons.net.telnet;
19
20 import java.io.BufferedInputStream;
21 import java.io.IOException;
22 import java.io.InputStream;
23 import java.io.InterruptedIOException;
24
25 final class TelnetInputStream extends BufferedInputStream implements Runnable {
26
27 private static final int EOF = -1;
28
29
30 private static final int WOULD_BLOCK = -2;
31
32
33 static final int STATE_DATA = 0, STATE_IAC = 1, STATE_WILL = 2, STATE_WONT = 3, STATE_DO = 4, STATE_DONT = 5, STATE_SB = 6, STATE_SE = 7, STATE_CR = 8,
34 STATE_IAC_SB = 9;
35
36 private boolean hasReachedEOF;
37 private volatile boolean isClosed;
38 private boolean readIsWaiting;
39 private int receiveState, queueHead, queueTail, bytesAvailable;
40 private final int[] queue;
41 private final TelnetClient client;
42 private final Thread thread;
43 private IOException ioException;
44
45
46 private final int suboption[];
47 private int suboptionCount;
48
49
50 private volatile boolean threaded;
51
52 TelnetInputStream(final InputStream input, final TelnetClient client) {
53 this(input, client, true);
54 }
55
56 TelnetInputStream(final InputStream input, final TelnetClient client, final boolean readerThread) {
57 super(input);
58 this.client = client;
59 this.receiveState = STATE_DATA;
60 this.isClosed = true;
61 this.hasReachedEOF = false;
62
63
64 this.queue = new int[2049];
65 this.queueHead = 0;
66 this.queueTail = 0;
67 this.suboption = new int[client.maxSubnegotiationLength];
68 this.bytesAvailable = 0;
69 this.ioException = null;
70 this.readIsWaiting = false;
71 this.threaded = false;
72 if (readerThread) {
73 this.thread = new Thread(this);
74 } else {
75 this.thread = null;
76 }
77 }
78
79 @Override
80 public int available() throws IOException {
81
82 synchronized (queue) {
83 if (threaded) {
84 return bytesAvailable;
85 }
86 return bytesAvailable + super.available();
87 }
88 }
89
90
91
92 @Override
93 public void close() throws IOException {
94
95
96
97
98 super.close();
99
100 synchronized (queue) {
101 hasReachedEOF = true;
102 isClosed = true;
103
104 if (thread != null && thread.isAlive()) {
105 thread.interrupt();
106 }
107
108 queue.notifyAll();
109 }
110
111 }
112
113
114 @Override
115 public boolean markSupported() {
116 return false;
117 }
118
119
120
121
122
123 private boolean processChar(final int ch) throws InterruptedException {
124
125
126 final boolean bufferWasEmpty;
127 synchronized (queue) {
128 bufferWasEmpty = bytesAvailable == 0;
129 while (bytesAvailable >= queue.length - 1) {
130
131
132 if (!threaded) {
133
134
135 throw new IllegalStateException("Queue is full! Cannot process another character.");
136 }
137 queue.notify();
138 try {
139 queue.wait();
140 } catch (final InterruptedException e) {
141 throw e;
142 }
143 }
144
145
146 if (readIsWaiting && threaded) {
147 queue.notify();
148 }
149
150 queue[queueTail] = ch;
151 ++bytesAvailable;
152
153 if (++queueTail >= queue.length) {
154 queueTail = 0;
155 }
156 }
157 return bufferWasEmpty;
158 }
159
160 @Override
161 public int read() throws IOException {
162
163
164
165 synchronized (queue) {
166
167 while (true) {
168 if (ioException != null) {
169 final IOException e;
170 e = ioException;
171 ioException = null;
172 throw e;
173 }
174
175 if (bytesAvailable == 0) {
176
177 if (hasReachedEOF) {
178 return EOF;
179 }
180
181
182 if (threaded) {
183 queue.notify();
184 try {
185 readIsWaiting = true;
186 queue.wait();
187 readIsWaiting = false;
188 } catch (final InterruptedException e) {
189 throw new InterruptedIOException("Fatal thread interruption during read.");
190 }
191 } else {
192
193 readIsWaiting = true;
194 int ch;
195 boolean mayBlock = true;
196
197 do {
198 try {
199 if ((ch = read(mayBlock)) < 0) {
200 if (ch != WOULD_BLOCK) {
201 return ch;
202 }
203 }
204 } catch (final InterruptedIOException e) {
205 synchronized (queue) {
206 ioException = e;
207 queue.notifyAll();
208 try {
209 queue.wait(100);
210 } catch (final InterruptedException interrupted) {
211
212 }
213 }
214 return EOF;
215 }
216
217 try {
218 if (ch != WOULD_BLOCK) {
219 processChar(ch);
220 }
221 } catch (final InterruptedException e) {
222 if (isClosed) {
223 return EOF;
224 }
225 }
226
227
228
229 mayBlock = false;
230
231 }
232
233 while (super.available() > 0 && bytesAvailable < queue.length - 1);
234
235 readIsWaiting = false;
236 }
237 continue;
238 }
239 final int ch;
240
241 ch = queue[queueHead];
242
243 if (++queueHead >= queue.length) {
244 queueHead = 0;
245 }
246
247 --bytesAvailable;
248
249
250 if (bytesAvailable == 0 && threaded) {
251 queue.notify();
252 }
253
254 return ch;
255 }
256 }
257 }
258
259
260
261
262
263
264
265
266
267
268
269 private int read(final boolean mayBlock) throws IOException {
270 int ch;
271
272 while (true) {
273
274
275
276 if (!mayBlock && super.available() == 0) {
277 return WOULD_BLOCK;
278 }
279
280
281 if ((ch = super.read()) < 0) {
282 return EOF;
283 }
284
285 ch &= 0xff;
286
287
288 synchronized (client) {
289 client.processAYTResponse();
290 }
291
292
293
294 client.spyRead(ch);
295
296
297 switch (receiveState) {
298
299 case STATE_CR:
300 if (ch == '\0') {
301
302 continue;
303 }
304
305
306
307
308
309
310 case STATE_DATA:
311 if (ch == TelnetCommand.IAC) {
312 receiveState = STATE_IAC;
313 continue;
314 }
315
316 if (ch == '\r') {
317 synchronized (client) {
318 if (client.requestedDont(TelnetOption.BINARY)) {
319 receiveState = STATE_CR;
320 } else {
321 receiveState = STATE_DATA;
322 }
323 }
324 } else {
325 receiveState = STATE_DATA;
326 }
327 break;
328
329 case STATE_IAC:
330 switch (ch) {
331 case TelnetCommand.WILL:
332 receiveState = STATE_WILL;
333 continue;
334 case TelnetCommand.WONT:
335 receiveState = STATE_WONT;
336 continue;
337 case TelnetCommand.DO:
338 receiveState = STATE_DO;
339 continue;
340 case TelnetCommand.DONT:
341 receiveState = STATE_DONT;
342 continue;
343
344 case TelnetCommand.SB:
345 suboptionCount = 0;
346 receiveState = STATE_SB;
347 continue;
348
349 case TelnetCommand.IAC:
350 receiveState = STATE_DATA;
351 break;
352 case TelnetCommand.SE:
353 receiveState = STATE_DATA;
354 continue;
355 default:
356 receiveState = STATE_DATA;
357 client.processCommand(ch);
358 continue;
359 }
360 break;
361 case STATE_WILL:
362 synchronized (client) {
363 client.processWill(ch);
364 client.flushOutputStream();
365 }
366 receiveState = STATE_DATA;
367 continue;
368 case STATE_WONT:
369 synchronized (client) {
370 client.processWont(ch);
371 client.flushOutputStream();
372 }
373 receiveState = STATE_DATA;
374 continue;
375 case STATE_DO:
376 synchronized (client) {
377 client.processDo(ch);
378 client.flushOutputStream();
379 }
380 receiveState = STATE_DATA;
381 continue;
382 case STATE_DONT:
383 synchronized (client) {
384 client.processDont(ch);
385 client.flushOutputStream();
386 }
387 receiveState = STATE_DATA;
388 continue;
389
390 case STATE_SB:
391 switch (ch) {
392 case TelnetCommand.IAC:
393 receiveState = STATE_IAC_SB;
394 continue;
395 default:
396
397 if (suboptionCount < suboption.length) {
398 suboption[suboptionCount++] = ch;
399 }
400 break;
401 }
402 receiveState = STATE_SB;
403 continue;
404 case STATE_IAC_SB:
405 switch (ch) {
406 case TelnetCommand.SE:
407 synchronized (client) {
408 client.processSuboption(suboption, suboptionCount);
409 client.flushOutputStream();
410 }
411 receiveState = STATE_DATA;
412 continue;
413 case TelnetCommand.IAC:
414 if (suboptionCount < suboption.length) {
415 suboption[suboptionCount++] = ch;
416 }
417 break;
418 default:
419 break;
420 }
421 receiveState = STATE_SB;
422 continue;
423
424 }
425
426 break;
427 }
428
429 return ch;
430 }
431
432
433
434
435
436
437
438
439 @Override
440 public int read(final byte buffer[]) throws IOException {
441 return read(buffer, 0, buffer.length);
442 }
443
444
445
446
447
448
449
450
451
452
453
454 @Override
455 public int read(final byte buffer[], int offset, int length) throws IOException {
456 int ch;
457 final int off;
458
459 if (length < 1) {
460 return 0;
461 }
462
463
464 synchronized (queue) {
465 if (length > bytesAvailable) {
466 length = bytesAvailable;
467 }
468 }
469
470 if ((ch = read()) == EOF) {
471 return EOF;
472 }
473
474 off = offset;
475
476 do {
477 buffer[offset++] = (byte) ch;
478 } while (--length > 0 && (ch = read()) != EOF);
479
480
481 return offset - off;
482 }
483
484 @Override
485 public void run() {
486 int ch;
487
488 try {
489 _outerLoop: while (!isClosed) {
490 try {
491 if ((ch = read(true)) < 0) {
492 break;
493 }
494 } catch (final InterruptedIOException e) {
495 synchronized (queue) {
496 ioException = e;
497 queue.notifyAll();
498 try {
499 queue.wait(100);
500 } catch (final InterruptedException interrupted) {
501 if (isClosed) {
502 break _outerLoop;
503 }
504 }
505 continue;
506 }
507 } catch (final RuntimeException re) {
508
509
510
511 super.close();
512
513
514 break _outerLoop;
515 }
516
517
518 boolean notify = false;
519 try {
520 notify = processChar(ch);
521 } catch (final InterruptedException e) {
522 if (isClosed) {
523 break _outerLoop;
524 }
525 }
526
527
528 if (notify) {
529 client.notifyInputListener();
530 }
531 }
532 } catch (final IOException ioe) {
533 synchronized (queue) {
534 ioException = ioe;
535 }
536 client.notifyInputListener();
537 }
538
539 synchronized (queue) {
540 isClosed = true;
541 hasReachedEOF = true;
542 queue.notify();
543 }
544
545 threaded = false;
546 }
547
548 void start() {
549 if (thread == null) {
550 return;
551 }
552
553 int priority;
554 isClosed = false;
555
556
557
558
559 priority = Thread.currentThread().getPriority() + 1;
560 if (priority > Thread.MAX_PRIORITY) {
561 priority = Thread.MAX_PRIORITY;
562 }
563 thread.setPriority(priority);
564 thread.setDaemon(true);
565 thread.start();
566 threaded = true;
567 }
568 }