1 // Copyright 2011 Google Inc. All rights reserved.
3 package com
.google
.appengine
.api
.files
;
5 import static com
.google
.appengine
.api
.files
.RecordConstants
.unmaskCrc
;
7 import com
.google
.appengine
.api
.files
.RecordConstants
.RecordType
;
9 import java
.io
.IOException
;
10 import java
.nio
.ByteBuffer
;
11 import java
.nio
.ByteOrder
;
12 import java
.util
.logging
.Level
;
13 import java
.util
.logging
.Logger
;
16 final class RecordReadChannelImpl
implements RecordReadChannel
{
17 private static final Logger log
= Logger
.getLogger(RecordReadChannelImpl
.class.getName());
// Checked exception (it extends Exception) that this class throws, with a
// descriptive message, when record data fails one of its validation checks.
// The constructor body is not visible in this excerpt.
19 static final class RecordReadException
extends Exception
{
20 public RecordReadException(String errorMessage
) {
// Pairs the RecordType of one physical record with the ByteBuffer holding its
// data. Both fields are final. Note that readPhysicalRecord() can construct a
// Record whose data is null (the RecordType.NONE case), so data() may be null.
25 private static final class Record
{
26 private final ByteBuffer data
;
27 private final RecordType type
;
29 public Record(RecordType type
, ByteBuffer data
) {
// Accessor for the data buffer (body not visible in this excerpt).
34 public ByteBuffer
data() {
// Accessor for the record type (body not visible in this excerpt).
38 public RecordType
type() {
42 private final Object lock
= new Object();
43 private final FileReadChannel input
;
44 private ByteBuffer blockBuffer
;
45 private ByteBuffer finalRecord
;
48 * @param input a {@link FileReadChannel} that holds Records to read from.
// Constructor: allocates the two working buffers, each RecordConstants.BLOCK_SIZE
// bytes with little-endian byte order. Any handling of the `input` argument is
// not visible in this excerpt.
50 RecordReadChannelImpl(FileReadChannel input
) {
// Scratch buffer that readPhysicalRecord() and validateRemainderIsEmpty() read into.
52 blockBuffer
= ByteBuffer
.allocate(RecordConstants
.BLOCK_SIZE
);
53 blockBuffer
.order(ByteOrder
.LITTLE_ENDIAN
);
// Accumulator that readRecord() appends record data to via appendToBuffer().
54 finalRecord
= ByteBuffer
.allocate(RecordConstants
.BLOCK_SIZE
);
55 finalRecord
.order(ByteOrder
.LITTLE_ENDIAN
);
// Reads the next record by pulling physical records from readPhysicalRecord()
// and dispatching on their RecordType. The case labels and the surrounding
// loop/try structure are not visible in this excerpt, so the notes below only
// describe what each visible statement does.
62 public ByteBuffer
readRecord() throws IOException
{
// Type of the previously processed physical record (updated after the switch);
// starts as NONE.
65 RecordType lastRead
= RecordType
.NONE
;
68 Record record
= readPhysicalRecord();
72 switch (record
.type()) {
// Branch that checks the rest of the current block via validateRemainderIsEmpty().
74 validateRemainderIsEmpty();
// Branch that is only legal when nothing is pending (lastRead == NONE); it
// returns a slice of the physical record's own buffer.
77 if (lastRead
!= RecordType
.NONE
) {
78 throw new RecordReadException("Invalid RecordType: " + record
.type
);
80 return record
.data().slice();
// Branch that is only legal when nothing is pending (lastRead == NONE); it
// appends this record's data to finalRecord.
82 if (lastRead
!= RecordType
.NONE
) {
83 throw new RecordReadException("Invalid RecordType: " + record
.type
);
85 finalRecord
= appendToBuffer(finalRecord
, record
.data());
// Branch that is only legal after an earlier record (lastRead != NONE); it
// appends this record's data to finalRecord.
88 if (lastRead
== RecordType
.NONE
) {
89 throw new RecordReadException("Invalid RecordType: " + record
.type
);
91 finalRecord
= appendToBuffer(finalRecord
, record
.data());
// Branch that is only legal after an earlier record (lastRead != NONE); it
// appends this record's data and then returns a slice of finalRecord.
94 if (lastRead
== RecordType
.NONE
) {
95 throw new RecordReadException("Invalid RecordType: " + record
.type
);
97 finalRecord
= appendToBuffer(finalRecord
, record
.data());
99 return finalRecord
.slice();
// Presumably the default branch (label not visible): rejects the record type.
// Unlike the messages above, this one reports type.value() rather than the type itself.
101 throw new RecordReadException("Invalid RecordType: " + record
.type
.value());
103 lastRead
= record
.type();
104 } catch (RecordReadException e
) {
// Logs the failure message together with the current position and the input channel.
// NOTE(review): only e.getMessage() is passed to the logger, so the exception's
// stack trace is not recorded by this call. What happens after logging is not
// visible in this excerpt.
105 log
.log(Level
.SEVERE
, e
.getMessage() + " At pos " + position() + " in " + input
);
// Reads from the current input position up to the end of the current block and
// inspects the bytes read. Throws RecordReadException if fewer (or more) bytes
// than expected were read, or — judging by the error message — if a non-zero
// byte is found; the condition guarding that second throw is not visible in
// this excerpt.
113 private void validateRemainderIsEmpty() throws IOException
, RecordReadException
{
// Distance from the current position to the next BLOCK_SIZE boundary. When the
// position is already aligned this evaluates to a full BLOCK_SIZE.
114 int bytesToBlockEnd
=
115 (int) (RecordConstants
.BLOCK_SIZE
- (input
.position() % RecordConstants
.BLOCK_SIZE
));
// Cap the scratch buffer so the read cannot go past the block boundary.
// NOTE(review): any reset of blockBuffer's position before this is not visible here.
117 blockBuffer
.limit(bytesToBlockEnd
);
118 int read
= input
.read(blockBuffer
);
// A short read is treated as an error rather than retried.
119 if (read
!= bytesToBlockEnd
) {
120 throw new RecordReadException(
121 "There are " + bytesToBlockEnd
+ " but " + read
+ " were read.");
// Examine each byte that was read, using absolute indexing from 0.
124 for (int i
= 0; i
< bytesToBlockEnd
; i
++) {
125 byte b
= blockBuffer
.get(i
);
127 throw new RecordReadException("Found a non-zero byte: " + b
128 + " before the end of the block " + i
129 + " bytes after encountering a RecordType of NONE");
137 * @throws IOException
// Returns the current position of the underlying input channel. The query is
// made while holding `lock`.
140 public long position() throws IOException
{
141 synchronized (lock
) {
142 return input
.position();
// Moves the underlying input channel to newPosition. The update is made while
// holding `lock`. No validation of newPosition is visible here.
150 public void position(long newPosition
) throws IOException
{
151 synchronized (lock
) {
152 input
.position(newPosition
);
157 * Reads the next record from the RecordIO data stream.
159 * @return Record data about the physical record read.
160 * @throws IOException
// Reads one physical record (header plus data) from the input channel into
// blockBuffer and returns it as a Record. The returned Record wraps blockBuffer
// itself, not a copy, so its contents are only valid until blockBuffer is
// reused. Throws RecordReadException when the length field is out of range or
// the checksum does not validate.
162 private Record
readPhysicalRecord() throws IOException
, RecordReadException
{
// Distance from the current position to the next BLOCK_SIZE boundary.
163 int bytesToBlockEnd
=
164 (int) (RecordConstants
.BLOCK_SIZE
- (input
.position() % RecordConstants
.BLOCK_SIZE
));
// Not enough room left in this block for a header: report a NONE record with
// null data instead of reading.
166 if (bytesToBlockEnd
< RecordConstants
.HEADER_LENGTH
) {
167 return new Record(RecordType
.NONE
, null);
// Read exactly one header's worth of bytes.
// NOTE(review): any reset of blockBuffer before this is not visible in this excerpt.
171 blockBuffer
.limit(RecordConstants
.HEADER_LENGTH
);
172 int bytesRead
= input
.read(blockBuffer
);
// Short-header handling: the body of this branch is not visible in this excerpt.
173 if (bytesRead
!= RecordConstants
.HEADER_LENGTH
) {
// Header fields are consumed in order: an int checksum, a short length, then
// one byte that is mapped to a RecordType.
177 int checksum
= blockBuffer
.getInt();
178 short length
= blockBuffer
.getShort();
179 RecordType type
= RecordType
.get(blockBuffer
.get());
// NOTE(review): bytesToBlockEnd was computed before the header was read, so this
// bound does not subtract the header bytes already consumed; negative lengths
// are also reported with the "too large" message — confirm both are intended.
180 if (length
> bytesToBlockEnd
|| length
< 0) {
181 throw new RecordReadException("Length is too large:" + length
);
// Read the record data, limited to the declared length.
185 blockBuffer
.limit(length
);
186 bytesRead
= input
.read(blockBuffer
);
// Short-data handling: the body of this branch is not visible in this excerpt.
187 if (bytesRead
!= length
) {
190 if (!isValidCrc(checksum
, blockBuffer
, type
.value())) {
191 throw new RecordReadException("Checksum doesn't validate.");
195 return new Record(type
, blockBuffer
);
199 * Moves to the start of the next block.
201 * @throws IOException
// Repositions the input channel at the next multiple of RecordConstants.BLOCK_SIZE.
203 private void sync() throws IOException
{
// NOTE(review): when the position is already block-aligned the modulo is 0, so
// padLength equals BLOCK_SIZE and a whole block is skipped — confirm intended.
204 long padLength
= RecordConstants
.BLOCK_SIZE
- (input
.position() % RecordConstants
.BLOCK_SIZE
);
205 input
.position(input
.position() + padLength
);
209 * Validates the record's {@link Crc32c} checksum.
211 * @param checksum the checksum in the record.
212 * @param data the {@link ByteBuffer} of the data in the record.
213 * @param type the byte representing the {@link RecordType} of the record.
214 * @return true if the {@link Crc32c} checksum is valid.
// Checks a record's stored checksum against a Crc32c computed here. The result
// is true when unmaskCrc(checksum) equals the computed CRC value.
216 private static boolean isValidCrc(int checksum
, ByteBuffer data
, byte type
) {
// Special case: zero checksum, zero type and an empty buffer. The action taken
// for this case is not visible in this excerpt.
217 if (checksum
== 0 && type
== 0 && data
.limit() == 0) {
220 Crc32c crc
= new Crc32c();
// Feeds bytes [0, data.limit()) of the buffer's backing array into the CRC.
// Assumes `data` is array-backed with an array offset of 0 — TODO confirm.
// Whether `type` is also fed into the CRC is not visible in this excerpt.
222 crc
.update(data
.array(), 0, data
.limit());
// unmaskCrc presumably reverses a masking applied when the checksum was written
// — verify against RecordConstants.
224 return unmaskCrc(checksum
) == crc
.getValue();
228 * Appends a {@link ByteBuffer} to another. This may modify the input buffer that will be
231 * @param to the {@link ByteBuffer} to append to.
232 * @param from the {@link ByteBuffer} to append.
233 * @return the resulting appended {@link ByteBuffer}
235 private static ByteBuffer
appendToBuffer(ByteBuffer to
, ByteBuffer from
) {
236 if (to
.remaining() < from
.remaining()) {
237 int capacity
= to
.capacity();
238 while (capacity
- to
.position() < from
.remaining()) {
241 ByteBuffer newBuffer
= ByteBuffer
.allocate(capacity
);
245 to
.order(ByteOrder
.LITTLE_ENDIAN
);