Revision created by MOE tool push_codebase.
[gae.git] / java / src / main / com / google / appengine / api / files / RecordReadChannelImpl.java
blob8e15f7f4e7452624e6774a567a73e59c2bf9dc68
1 // Copyright 2011 Google Inc. All rights reserved.
3 package com.google.appengine.api.files;
5 import static com.google.appengine.api.files.RecordConstants.unmaskCrc;
7 import com.google.appengine.api.files.RecordConstants.RecordType;
9 import java.io.IOException;
10 import java.nio.ByteBuffer;
11 import java.nio.ByteOrder;
12 import java.util.logging.Logger;
14 final class RecordReadChannelImpl implements RecordReadChannel {
15 private Logger log = Logger.getLogger(RecordReadChannelImpl.class.getName());
17 static final class RecordReadException extends Exception {
18 public RecordReadException(String errorMessage) {
19 super(errorMessage);
23 private static final class Record {
24 private final ByteBuffer data;
25 private final RecordType type;
27 public Record(RecordType type, ByteBuffer data) {
28 this.type = type;
29 this.data = data;
32 public ByteBuffer data() {
33 return this.data;
36 public RecordType type() {
37 return this.type;
40 private final Object lock = new Object();
41 private final FileReadChannel input;
42 private ByteBuffer blockBuffer;
43 private ByteBuffer finalRecord;
45 /**
46 * @param input a {@link FileReadChannel} that holds Records to read from.
48 RecordReadChannelImpl(FileReadChannel input) {
49 this.input = input;
50 blockBuffer = ByteBuffer.allocate(RecordConstants.BLOCK_SIZE);
51 blockBuffer.order(ByteOrder.LITTLE_ENDIAN);
52 finalRecord = ByteBuffer.allocate(RecordConstants.BLOCK_SIZE);
53 finalRecord.order(ByteOrder.LITTLE_ENDIAN);
56 /**
57 * {@inheritDoc}
59 @Override
60 public ByteBuffer readRecord() throws IOException {
61 synchronized (lock) {
62 finalRecord.clear();
63 RecordType lastRead = RecordType.NONE;
64 while (true) {
65 try {
66 Record record = readPhysicalRecord();
67 if (record == null) {
68 return null;
70 switch (record.type()) {
71 case NONE:
72 validateRemainderIsEmpty();
73 break;
74 case FULL:
75 if (lastRead != RecordType.NONE) {
76 throw new RecordReadException("Invalid RecordType: " + record.type);
78 return record.data().slice();
79 case FIRST:
80 if (lastRead != RecordType.NONE) {
81 throw new RecordReadException("Invalid RecordType: " + record.type);
83 finalRecord = appendToBuffer(finalRecord, record.data());
84 break;
85 case MIDDLE:
86 if (lastRead == RecordType.NONE) {
87 throw new RecordReadException("Invalid RecordType: " + record.type);
89 finalRecord = appendToBuffer(finalRecord, record.data());
90 break;
91 case LAST:
92 if (lastRead == RecordType.NONE) {
93 throw new RecordReadException("Invalid RecordType: " + record.type);
95 finalRecord = appendToBuffer(finalRecord, record.data());
96 finalRecord.flip();
97 return finalRecord.slice();
98 default:
99 throw new RecordReadException("Invalid RecordType: " + record.type.value());
101 lastRead = record.type();
102 } catch (RecordReadException e) {
103 log.warning(e.getMessage());
104 finalRecord.clear();
105 sync();
111 private void validateRemainderIsEmpty() throws IOException, RecordReadException {
112 int bytesToBlockEnd =
113 (int) (RecordConstants.BLOCK_SIZE - (input.position() % RecordConstants.BLOCK_SIZE));
114 blockBuffer.clear();
115 blockBuffer.limit(bytesToBlockEnd);
116 int read = input.read(blockBuffer);
117 if (read != bytesToBlockEnd) {
118 throw new RecordReadException(
119 "There are " + bytesToBlockEnd + " but " + read + " were read.");
121 blockBuffer.flip();
122 for (int i = 0; i < bytesToBlockEnd; i++) {
123 byte b = blockBuffer.get(i);
124 if (b != 0) {
125 throw new RecordReadException("Found a non-zero byte: " + b
126 + " before the end of the block " + i
127 + " bytes after encountering a RecordType of NONE");
133 * {@inheritDoc}
135 * @throws IOException
137 @Override
138 public long position() throws IOException {
139 synchronized (lock) {
140 return input.position();
145 * {@inheritDoc}
147 @Override
148 public void position(long newPosition) throws IOException {
149 synchronized (lock) {
150 input.position(newPosition);
155 * Reads the next record from the RecordIO data stream.
157 * @return Record data about the physical record read.
158 * @throws IOException
160 private Record readPhysicalRecord() throws IOException, RecordReadException {
161 int bytesToBlockEnd =
162 (int) (RecordConstants.BLOCK_SIZE - (input.position() % RecordConstants.BLOCK_SIZE));
164 if (bytesToBlockEnd < RecordConstants.HEADER_LENGTH) {
165 return new Record(RecordType.NONE, null);
168 blockBuffer.clear();
169 blockBuffer.limit(RecordConstants.HEADER_LENGTH);
170 int bytesRead = input.read(blockBuffer);
171 if (bytesRead != RecordConstants.HEADER_LENGTH) {
172 return null;
174 blockBuffer.flip();
175 int checksum = blockBuffer.getInt();
176 short length = blockBuffer.getShort();
177 RecordType type = RecordType.get(blockBuffer.get());
178 if (length > bytesToBlockEnd || length < 0) {
179 throw new RecordReadException("Length is too large:" + length);
182 blockBuffer.clear();
183 blockBuffer.limit(length);
184 bytesRead = input.read(blockBuffer);
185 if (bytesRead != length) {
186 return null;
188 if (!isValidCrc(checksum, blockBuffer, type.value())) {
189 throw new RecordReadException("Checksum doesn't validate.");
192 blockBuffer.flip();
193 return new Record(type, blockBuffer);
197 * Moves to the start of the next block.
199 * @throws IOException
201 private void sync() throws IOException {
202 long padLength = RecordConstants.BLOCK_SIZE - (input.position() % RecordConstants.BLOCK_SIZE);
203 input.position(input.position() + padLength);
207 * Validates that the {@link Crc32c} validates.
209 * @param checksum the checksum in the record.
210 * @param data the {@link ByteBuffer} of the data in the record.
211 * @param type the byte representing the {@link RecordType} of the record.
212 * @return true if the {@link Crc32c} validates.
214 private static boolean isValidCrc(int checksum, ByteBuffer data, byte type) {
215 if (checksum == 0 && type == 0 && data.limit() == 0) {
216 return true;
218 Crc32c crc = new Crc32c();
219 crc.update(type);
220 crc.update(data.array(), 0, data.limit());
222 return unmaskCrc(checksum) == crc.getValue();
226 * Appends a {@link ByteBuffer} to another. This may modify the inputed buffer that will be
227 * appended to.
229 * @param to the {@link ByteBuffer} to append to.
230 * @param from the {@link ByteBuffer} to append.
231 * @return the resulting appended {@link ByteBuffer}
233 private static ByteBuffer appendToBuffer(ByteBuffer to, ByteBuffer from) {
234 if (to.remaining() < from.remaining()) {
235 int capacity = to.capacity();
236 while (capacity - to.position() < from.remaining()) {
237 capacity *= 2;
239 ByteBuffer newBuffer = ByteBuffer.allocate(capacity);
240 to.flip();
241 newBuffer.put(to);
242 to = newBuffer;
243 to.order(ByteOrder.LITTLE_ENDIAN);
245 to.put(from);
246 return to;