1 // Copyright 2011 Google Inc. All rights reserved.
3 package com
.google
.appengine
.api
.files
;
5 import static com
.google
.appengine
.api
.files
.RecordConstants
.unmaskCrc
;
7 import com
.google
.appengine
.api
.files
.RecordConstants
.RecordType
;
9 import java
.io
.IOException
;
10 import java
.nio
.ByteBuffer
;
11 import java
.nio
.ByteOrder
;
12 import java
.util
.logging
.Logger
;
14 final class RecordReadChannelImpl
implements RecordReadChannel
{
// Logger named after this class; readRecord() uses it to emit the message of a caught
// RecordReadException at warning level.
15 private Logger log
= Logger
.getLogger(RecordReadChannelImpl
.class.getName());
/**
 * Checked exception (it extends {@link Exception}) thrown by the record-parsing methods of
 * this class, e.g. for an unexpected record type, an out-of-range length or a checksum that
 * does not validate.
 */
17 static final class RecordReadException
extends Exception
{
// NOTE(review): the constructor body is not visible in this excerpt; presumably it forwards
// errorMessage to Exception, since readRecord() later logs getMessage() -- confirm.
18 public RecordReadException(String errorMessage
) {
/**
 * Pairs the {@link RecordType} of a physical record with the {@link ByteBuffer} holding its
 * data. This is what readPhysicalRecord() returns to readRecord().
 */
23 private static final class Record
{
// Record contents; readPhysicalRecord() passes null here when it reports a NONE record.
24 private final ByteBuffer data
;
// Type tag that readRecord() switches on.
25 private final RecordType type
;
// NOTE(review): constructor body not visible in this excerpt; presumably it stores both
// arguments in the fields above -- confirm.
27 public Record(RecordType type
, ByteBuffer data
) {
// NOTE(review): accessor bodies are not visible here; presumably plain getters for the
// fields of the same name -- confirm.
32 public ByteBuffer
data() {
36 public RecordType
type() {
// Monitor that position() and position(long) synchronize on.
40 private final Object lock
= new Object();
// Underlying channel that record bytes are read from and whose position is tracked.
41 private final FileReadChannel input
;
// Working buffer that readPhysicalRecord() and validateRemainderIsEmpty() read into;
// allocated with BLOCK_SIZE bytes and little-endian order by the constructor.
42 private ByteBuffer blockBuffer
;
// Buffer that readRecord() appends record data to via appendToBuffer() and eventually
// returns a slice of; also BLOCK_SIZE bytes, little-endian, at construction.
43 private ByteBuffer finalRecord
;
46 * @param input a {@link FileReadChannel} that holds Records to read from.
// Package-private constructor: allocates the two working buffers.
// NOTE(review): the statement storing `input` in the field is not visible in this excerpt.
48 RecordReadChannelImpl(FileReadChannel input
) {
// Both buffers start at BLOCK_SIZE bytes and use little-endian byte order, which governs
// how getInt()/getShort() decode the record header in readPhysicalRecord().
50 blockBuffer
= ByteBuffer
.allocate(RecordConstants
.BLOCK_SIZE
);
51 blockBuffer
.order(ByteOrder
.LITTLE_ENDIAN
);
52 finalRecord
= ByteBuffer
.allocate(RecordConstants
.BLOCK_SIZE
);
53 finalRecord
.order(ByteOrder
.LITTLE_ENDIAN
);
/**
 * Reads records via readPhysicalRecord() and returns record data as a {@link ByteBuffer}
 * slice: either the data of a single physical record, or the contents accumulated in
 * finalRecord from several of them.
 *
 * NOTE(review): the switch labels, the enclosing loop/try header and some statements are not
 * visible in this excerpt, so the mapping of branches to specific RecordType values below is
 * not asserted -- confirm against the full file.
 *
 * @return a slice over the data of the record that was read
 * @throws IOException declared by this method; readPhysicalRecord() and
 *     validateRemainderIsEmpty() both declare it as well
 */
60 public ByteBuffer
readRecord() throws IOException
{
// Type of the most recently handled physical record; starts as NONE and is updated after
// each record that does not return or throw.
63 RecordType lastRead
= RecordType
.NONE
;
66 Record record
= readPhysicalRecord();
70 switch (record
.type()) {
// Branch that consumes the rest of the current block (its case label is not visible here).
72 validateRemainderIsEmpty();
// Branch returning the record's own data directly: rejected unless lastRead is NONE.
75 if (lastRead
!= RecordType
.NONE
) {
76 throw new RecordReadException("Invalid RecordType: " + record
.type
);
78 return record
.data().slice();
// Branch that appends data: it too is rejected unless lastRead is NONE.
80 if (lastRead
!= RecordType
.NONE
) {
81 throw new RecordReadException("Invalid RecordType: " + record
.type
);
83 finalRecord
= appendToBuffer(finalRecord
, record
.data());
// The next two branches require the opposite: lastRead must NOT be NONE before appending.
86 if (lastRead
== RecordType
.NONE
) {
87 throw new RecordReadException("Invalid RecordType: " + record
.type
);
89 finalRecord
= appendToBuffer(finalRecord
, record
.data());
92 if (lastRead
== RecordType
.NONE
) {
93 throw new RecordReadException("Invalid RecordType: " + record
.type
);
95 finalRecord
= appendToBuffer(finalRecord
, record
.data());
// After this append the accumulated buffer is handed back to the caller as a slice.
97 return finalRecord
.slice();
// Remaining branch (presumably the default label -- confirm): the type is reported via value().
99 throw new RecordReadException("Invalid RecordType: " + record
.type
.value());
101 lastRead
= record
.type();
// A RecordReadException is not propagated from here; its message is logged as a warning.
// NOTE(review): whatever follows the log call (e.g. recovery) is not visible in this excerpt.
102 } catch (RecordReadException e
) {
103 log
.warning(e
.getMessage());
/**
 * Reads from the current input position up to the end of the current block and inspects the
 * bytes that were read.
 *
 * @throws IOException declared by this method; it calls input.position() and input.read()
 * @throws RecordReadException if the number of bytes read differs from the number remaining
 *     in the block, or from the byte check in the loop below
 */
111 private void validateRemainderIsEmpty() throws IOException
, RecordReadException
{
// Distance from the current position to the next multiple of BLOCK_SIZE; equals a full
// BLOCK_SIZE when the position is already on a block boundary.
112 int bytesToBlockEnd
=
113 (int) (RecordConstants
.BLOCK_SIZE
- (input
.position() % RecordConstants
.BLOCK_SIZE
));
// NOTE(review): any buffer reset preceding this limit() call is not visible in this excerpt.
115 blockBuffer
.limit(bytesToBlockEnd
);
116 int read
= input
.read(blockBuffer
);
// A read that returns a different count than requested is treated as an error.
117 if (read
!= bytesToBlockEnd
) {
118 throw new RecordReadException(
119 "There are " + bytesToBlockEnd
+ " but " + read
+ " were read.");
// Inspect every byte read, using absolute indexing from 0.
122 for (int i
= 0; i
< bytesToBlockEnd
; i
++) {
123 byte b
= blockBuffer
.get(i
);
// NOTE(review): the condition guarding this throw is not visible here; judging by the
// message it is presumably a non-zero check on b -- confirm.
125 throw new RecordReadException("Found a non-zero byte: " + b
126 + " before the end of the block " + i
127 + " bytes after encountering a RecordType of NONE");
135 * @throws IOException
// Returns the current position of the underlying input channel, read while holding `lock`.
138 public long position() throws IOException
{
139 synchronized (lock
) {
140 return input
.position();
/**
 * Repositions the underlying input channel, while holding {@code lock}.
 *
 * @param newPosition value passed straight through to {@code input.position(long)}
 * @throws IOException declared by this method; it calls input.position(long)
 */
148 public void position(long newPosition
) throws IOException
{
149 synchronized (lock
) {
150 input
.position(newPosition
);
155 * Reads the next record from the RecordIO data stream.
157 * @return Record data about the physical record read.
158 * @throws IOException
// Reads one physical record from `input`: a header of HEADER_LENGTH bytes (an int checksum,
// a short length and one type byte, decoded in blockBuffer's byte order) followed by `length`
// bytes of data, then validates the checksum with isValidCrc().
// Throws RecordReadException for an out-of-range length or a failed checksum.
160 private Record
readPhysicalRecord() throws IOException
, RecordReadException
{
// Bytes left before the next BLOCK_SIZE boundary, measured BEFORE the header is read.
161 int bytesToBlockEnd
=
162 (int) (RecordConstants
.BLOCK_SIZE
- (input
.position() % RecordConstants
.BLOCK_SIZE
));
// Too little room left in this block for a header: report a NONE record with null data.
164 if (bytesToBlockEnd
< RecordConstants
.HEADER_LENGTH
) {
165 return new Record(RecordType
.NONE
, null);
// Read the header bytes.
// NOTE(review): any buffer reset/flip around these reads is not visible in this excerpt.
169 blockBuffer
.limit(RecordConstants
.HEADER_LENGTH
);
170 int bytesRead
= input
.read(blockBuffer
);
// NOTE(review): what happens on a short header read is not visible in this excerpt.
171 if (bytesRead
!= RecordConstants
.HEADER_LENGTH
) {
// Header fields are consumed in this order: checksum, length, type.
175 int checksum
= blockBuffer
.getInt();
176 short length
= blockBuffer
.getShort();
177 RecordType type
= RecordType
.get(blockBuffer
.get());
// Rejects a negative length or one above bytesToBlockEnd. Note that bytesToBlockEnd was
// computed before the header was consumed, so the bound still includes the header bytes.
178 if (length
> bytesToBlockEnd
|| length
< 0) {
179 throw new RecordReadException("Length is too large:" + length
);
// Read the record data itself.
183 blockBuffer
.limit(length
);
184 bytesRead
= input
.read(blockBuffer
);
// NOTE(review): what happens on a short data read is not visible in this excerpt.
185 if (bytesRead
!= length
) {
188 if (!isValidCrc(checksum
, blockBuffer
, type
.value())) {
189 throw new RecordReadException("Checksum doesn't validate.");
// The returned Record wraps blockBuffer itself (no copy); the same buffer is reused by
// subsequent reads.
193 return new Record(type
, blockBuffer
);
197 * Moves to the start of the next block.
199 * @throws IOException
// Advances `input` to the next multiple of BLOCK_SIZE.
201 private void sync() throws IOException
{
// Distance to the next block boundary. When the position is already on a boundary the
// modulo is 0, so padLength is a full BLOCK_SIZE and the channel skips an entire block.
202 long padLength
= RecordConstants
.BLOCK_SIZE
- (input
.position() % RecordConstants
.BLOCK_SIZE
);
203 input
.position(input
.position() + padLength
);
207 * Checks whether the {@link Crc32c} checksum of a record is valid.
209 * @param checksum the checksum in the record.
210 * @param data the {@link ByteBuffer} of the data in the record.
211 * @param type the byte representing the {@link RecordType} of the record.
212 * @return true if the {@link Crc32c} validates.
// Compares the checksum stored in a record header against a Crc32c computed here.
214 private static boolean isValidCrc(int checksum
, ByteBuffer data
, byte type
) {
// Special case: checksum, type and data.limit() are all zero.
// NOTE(review): the value returned for this case is not visible in this excerpt.
215 if (checksum
== 0 && type
== 0 && data
.limit() == 0) {
218 Crc32c crc
= new Crc32c();
// NOTE(review): a statement between the construction above and this update is not visible
// here (possibly mixing `type` into the CRC) -- confirm.
// Feeds the buffer's backing array from index 0 up to its limit; array() requires an
// array-backed (heap) buffer.
220 crc
.update(data
.array(), 0, data
.limit());
// The stored checksum is unmasked before being compared with the computed value.
222 return unmaskCrc(checksum
) == crc
.getValue();
226 * Appends a {@link ByteBuffer} to another. This may modify the input buffer that will be
229 * @param to the {@link ByteBuffer} to append to.
230 * @param from the {@link ByteBuffer} to append.
231 * @return the resulting appended {@link ByteBuffer}
233 private static ByteBuffer
appendToBuffer(ByteBuffer to
, ByteBuffer from
) {
234 if (to
.remaining() < from
.remaining()) {
235 int capacity
= to
.capacity();
236 while (capacity
- to
.position() < from
.remaining()) {
239 ByteBuffer newBuffer
= ByteBuffer
.allocate(capacity
);
243 to
.order(ByteOrder
.LITTLE_ENDIAN
);