1 package org.apache.commons.jcs3.auxiliary.disk.block;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import java.io.File;
23 import java.io.IOException;
24 import java.nio.ByteBuffer;
25 import java.nio.channels.FileChannel;
26 import java.nio.file.StandardOpenOption;
27 import java.util.concurrent.ConcurrentLinkedQueue;
28 import java.util.concurrent.atomic.AtomicInteger;
29 import java.util.concurrent.atomic.AtomicLong;
30
31 import org.apache.commons.jcs3.engine.behavior.IElementSerializer;
32 import org.apache.commons.jcs3.log.Log;
33 import org.apache.commons.jcs3.log.LogManager;
34 import org.apache.commons.jcs3.utils.serialization.StandardSerializer;
35
36
37
38
39
40 public class BlockDisk implements AutoCloseable
41 {
42
43 private static final Log log = LogManager.getLog(BlockDisk.class);
44
45
46 public static final byte HEADER_SIZE_BYTES = 4;
47
48
49
50 private static final int DEFAULT_BLOCK_SIZE_BYTES = 4 * 1024;
51
52
53 private final int blockSizeBytes;
54
55
56
57
58
59 private final AtomicInteger numberOfBlocks = new AtomicInteger(0);
60
61
62 private final ConcurrentLinkedQueue<Integer> emptyBlocks = new ConcurrentLinkedQueue<>();
63
64
65 private final IElementSerializer elementSerializer;
66
67
68 private final String filepath;
69
70
71 private final FileChannel fc;
72
73
74 private final AtomicLong putBytes = new AtomicLong();
75
76
77 private final AtomicLong putCount = new AtomicLong();
78
79
80
81
82
83
84
85
86 public BlockDisk(final File file, final IElementSerializer elementSerializer)
87 throws IOException
88 {
89 this(file, DEFAULT_BLOCK_SIZE_BYTES, elementSerializer);
90 }
91
92
93
94
95
96
97
98
99 public BlockDisk(final File file, final int blockSizeBytes)
100 throws IOException
101 {
102 this(file, blockSizeBytes, new StandardSerializer());
103 }
104
105
106
107
108
109
110
111
112
113 public BlockDisk(final File file, final int blockSizeBytes, final IElementSerializer elementSerializer)
114 throws IOException
115 {
116 this.filepath = file.getAbsolutePath();
117 this.fc = FileChannel.open(file.toPath(),
118 StandardOpenOption.CREATE,
119 StandardOpenOption.READ,
120 StandardOpenOption.WRITE);
121 this.numberOfBlocks.set((int) Math.ceil(1f * this.fc.size() / blockSizeBytes));
122
123 log.info("Constructing BlockDisk, blockSizeBytes [{0}]", blockSizeBytes);
124
125 this.blockSizeBytes = blockSizeBytes;
126 this.elementSerializer = elementSerializer;
127 }
128
129
130
131
132
133
134
135 private int[] allocateBlocks(final int numBlocksNeeded)
136 {
137 assert numBlocksNeeded >= 1;
138
139 final int[] blocks = new int[numBlocksNeeded];
140
141 for (int i = 0; i < numBlocksNeeded; i++)
142 {
143 Integer emptyBlock = emptyBlocks.poll();
144 if (emptyBlock == null)
145 {
146 emptyBlock = Integer.valueOf(numberOfBlocks.getAndIncrement());
147 }
148 blocks[i] = emptyBlock.intValue();
149 }
150
151 return blocks;
152 }
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171 protected <T> int[] write(final T object)
172 throws IOException
173 {
174
175 final byte[] data = elementSerializer.serialize(object);
176
177 log.debug("write, total pre-chunking data.length = {0}", data.length);
178
179 this.putBytes.addAndGet(data.length);
180 this.putCount.incrementAndGet();
181
182
183 final int numBlocksNeeded = calculateTheNumberOfBlocksNeeded(data);
184
185 log.debug("numBlocksNeeded = {0}", numBlocksNeeded);
186
187
188 final int[] blocks = allocateBlocks(numBlocksNeeded);
189
190 int offset = 0;
191 final int maxChunkSize = blockSizeBytes - HEADER_SIZE_BYTES;
192 final ByteBuffer headerBuffer = ByteBuffer.allocate(HEADER_SIZE_BYTES);
193 final ByteBuffer dataBuffer = ByteBuffer.wrap(data);
194
195 for (int i = 0; i < numBlocksNeeded; i++)
196 {
197 headerBuffer.clear();
198 final int length = Math.min(maxChunkSize, data.length - offset);
199 headerBuffer.putInt(length);
200 headerBuffer.flip();
201
202 dataBuffer.position(offset).limit(offset + length);
203 final ByteBuffer slice = dataBuffer.slice();
204
205 final long position = calculateByteOffsetForBlockAsLong(blocks[i]);
206
207 int written = fc.write(headerBuffer, position);
208 assert written == HEADER_SIZE_BYTES;
209
210
211 written = fc.write(slice, position + HEADER_SIZE_BYTES);
212 assert written == length;
213
214 offset += length;
215 }
216
217
218
219 return blocks;
220 }
221
222
223
224
225
226
227
228
229 protected byte[][] getBlockChunks(final byte[] complete, final int numBlocksNeeded)
230 {
231 final byte[][] chunks = new byte[numBlocksNeeded][];
232
233 if (numBlocksNeeded == 1)
234 {
235 chunks[0] = complete;
236 }
237 else
238 {
239 final int maxChunkSize = this.blockSizeBytes - HEADER_SIZE_BYTES;
240 final int totalBytes = complete.length;
241 int totalUsed = 0;
242 for (short i = 0; i < numBlocksNeeded; i++)
243 {
244
245
246 final int chunkSize = Math.min(maxChunkSize, totalBytes - totalUsed);
247 final byte[] chunk = new byte[chunkSize];
248
249
250 System.arraycopy(complete, totalUsed, chunk, 0, chunkSize);
251 chunks[i] = chunk;
252 totalUsed += chunkSize;
253 }
254 }
255
256 return chunks;
257 }
258
259
260
261
262
263
264
265
266
267 protected <T> T read(final int[] blockNumbers)
268 throws IOException, ClassNotFoundException
269 {
270 final ByteBuffer data;
271
272 if (blockNumbers.length == 1)
273 {
274 data = readBlock(blockNumbers[0]);
275 }
276 else
277 {
278 data = ByteBuffer.allocate(blockNumbers.length * getBlockSizeBytes());
279
280 for (short i = 0; i < blockNumbers.length; i++)
281 {
282 final ByteBuffer chunk = readBlock(blockNumbers[i]);
283 data.put(chunk);
284 }
285
286 data.flip();
287 }
288
289 log.debug("read, total post combination data.length = {0}", () -> data.limit());
290
291 return elementSerializer.deSerialize(data.array(), null);
292 }
293
294
295
296
297
298
299
300
301
302
303
304 private ByteBuffer readBlock(final int block)
305 throws IOException
306 {
307 int datalen = 0;
308
309 String message = null;
310 boolean corrupted = false;
311 final long fileLength = fc.size();
312
313 final long position = calculateByteOffsetForBlockAsLong(block);
314
315
316
317
318
319
320 {
321 final ByteBuffer datalength = ByteBuffer.allocate(HEADER_SIZE_BYTES);
322 fc.read(datalength, position);
323 datalength.flip();
324 datalen = datalength.getInt();
325 if (position + datalen > fileLength)
326 {
327 corrupted = true;
328 message = "Record " + position + " exceeds file length.";
329 }
330 }
331
332 if (corrupted)
333 {
334 log.warn("\n The file is corrupt: \n {0}", message);
335 throw new IOException("The File Is Corrupt, need to reset");
336 }
337
338 final ByteBuffer data = ByteBuffer.allocate(datalen);
339 fc.read(data, position + HEADER_SIZE_BYTES);
340 data.flip();
341
342 return data;
343 }
344
345
346
347
348
349
350 protected void freeBlocks(final int[] blocksToFree)
351 {
352 if (blocksToFree != null)
353 {
354 for (short i = 0; i < blocksToFree.length; i++)
355 {
356 emptyBlocks.offer(Integer.valueOf(blocksToFree[i]));
357 }
358 }
359 }
360
361
362
363
364
365
366
367
368 protected long calculateByteOffsetForBlockAsLong(final int block)
369 {
370 return (long) block * blockSizeBytes;
371 }
372
373
374
375
376
377
378
379 protected int calculateTheNumberOfBlocksNeeded(final byte[] data)
380 {
381 final int dataLength = data.length;
382
383 final int oneBlock = blockSizeBytes - HEADER_SIZE_BYTES;
384
385
386 if (dataLength <= oneBlock)
387 {
388 return 1;
389 }
390
391 int dividend = dataLength / oneBlock;
392
393 if (dataLength % oneBlock != 0)
394 {
395 dividend++;
396 }
397 return dividend;
398 }
399
400
401
402
403
404
405
406 protected long length()
407 throws IOException
408 {
409 return fc.size();
410 }
411
412
413
414
415
416
417 @Override
418 public void close()
419 throws IOException
420 {
421 this.numberOfBlocks.set(0);
422 this.emptyBlocks.clear();
423 fc.close();
424 }
425
426
427
428
429
430
431 protected synchronized void reset()
432 throws IOException
433 {
434 this.numberOfBlocks.set(0);
435 this.emptyBlocks.clear();
436 fc.truncate(0);
437 fc.force(true);
438 }
439
440
441
442
443 protected int getNumberOfBlocks()
444 {
445 return numberOfBlocks.get();
446 }
447
448
449
450
451 protected int getBlockSizeBytes()
452 {
453 return blockSizeBytes;
454 }
455
456
457
458
459 protected long getAveragePutSizeBytes()
460 {
461 final long count = this.putCount.get();
462
463 if (count == 0)
464 {
465 return 0;
466 }
467 return this.putBytes.get() / count;
468 }
469
470
471
472
473 protected int getEmptyBlocks()
474 {
475 return this.emptyBlocks.size();
476 }
477
478
479
480
481
482
483 @Override
484 public String toString()
485 {
486 final StringBuilder buf = new StringBuilder();
487 buf.append("\nBlock Disk ");
488 buf.append("\n Filepath [" + filepath + "]");
489 buf.append("\n NumberOfBlocks [" + this.numberOfBlocks.get() + "]");
490 buf.append("\n BlockSizeBytes [" + this.blockSizeBytes + "]");
491 buf.append("\n Put Bytes [" + this.putBytes + "]");
492 buf.append("\n Put Count [" + this.putCount + "]");
493 buf.append("\n Average Size [" + getAveragePutSizeBytes() + "]");
494 buf.append("\n Empty Blocks [" + this.getEmptyBlocks() + "]");
495 try
496 {
497 buf.append("\n Length [" + length() + "]");
498 }
499 catch (final IOException e)
500 {
501
502 }
503 return buf.toString();
504 }
505
506
507
508
509
510
511 protected String getFilePath()
512 {
513 return filepath;
514 }
515 }