App Engine Java SDK version 1.7.0
[gae.git] / java / src / main / com / google / appengine / api / files / RecordReadChannelImpl.java
blobe5ce709111755210dd81f3e94ed6a8f275aa8017
1 // Copyright 2011 Google Inc. All rights reserved.
3 package com.google.appengine.api.files;
5 import static com.google.appengine.api.files.RecordConstants.unmaskCrc;
7 import com.google.appengine.api.files.RecordConstants.RecordType;
9 import java.io.IOException;
10 import java.nio.ByteBuffer;
11 import java.nio.ByteOrder;
12 import java.util.logging.Logger;
14 final class RecordReadChannelImpl implements RecordReadChannel {
15 private Logger log = Logger.getLogger(RecordReadChannelImpl.class.getName());
17 static final class RecordReadException extends Exception {
18 public RecordReadException(String errorMessage) {
19 super(errorMessage);
23 private static final class Record {
24 private final ByteBuffer data;
25 private final RecordType type;
27 public Record(RecordType type, ByteBuffer data) {
28 this.type = type;
29 this.data = data;
31 public ByteBuffer data() {
32 return this.data;
34 public RecordType type() {
35 return this.type;
39 private final FileReadChannel input;
40 private ByteBuffer blockBuffer;
41 private ByteBuffer finalRecord;
43 /**
44 * @param input a {@link FileReadChannel} that holds Records to read from.
46 RecordReadChannelImpl(FileReadChannel input) {
47 this.input = input;
48 blockBuffer = ByteBuffer.allocate(RecordConstants.BLOCK_SIZE);
49 blockBuffer.order(ByteOrder.LITTLE_ENDIAN);
50 finalRecord = ByteBuffer.allocate(RecordConstants.BLOCK_SIZE);
51 finalRecord.order(ByteOrder.LITTLE_ENDIAN);
54 /**
55 * {@inheritDoc}
57 @Override
58 public ByteBuffer readRecord() throws IOException {
59 finalRecord.clear();
60 RecordType lastRead = RecordType.NONE;
61 while (true) {
62 try {
63 Record record = readPhysicalRecord();
64 if (record == null) {
65 return null;
67 switch (record.type()) {
68 case NONE:
69 sync();
70 break;
71 case FULL:
72 if (lastRead != RecordType.NONE) {
73 throw new RecordReadException("Invalid RecordType: "
74 + record.type);
76 return record.data().slice();
77 case FIRST:
78 if (lastRead != RecordType.NONE) {
79 throw new RecordReadException("Invalid RecordType: "
80 + record.type);
82 finalRecord = appendToBuffer(finalRecord, record.data());
83 break;
84 case MIDDLE:
85 if (lastRead == RecordType.NONE) {
86 throw new RecordReadException("Invalid RecordType: "
87 + record.type);
89 finalRecord = appendToBuffer(finalRecord, record.data());
90 break;
91 case LAST:
92 if (lastRead == RecordType.NONE) {
93 throw new RecordReadException("Invalid RecordType: "
94 + record.type);
96 finalRecord = appendToBuffer(finalRecord, record.data());
97 finalRecord.flip();
98 return finalRecord.slice();
99 default:
100 throw new RecordReadException("Invalid RecordType: " + record.type.value());
102 lastRead = record.type();
103 } catch (RecordReadException e) {
104 log.warning(e.getMessage());
105 finalRecord.clear();
106 sync();
112 * {@inheritDoc}
114 @Override
115 public long position() throws IOException {
116 return input.position();
120 * {@inheritDoc}
122 @Override
123 public void position(long newPosition) throws IOException {
124 input.position(newPosition);
128 * Reads the next record from the RecordIO data stream.
130 * @return Record data about the physical record read.
131 * @throws IOException
133 private Record readPhysicalRecord()
134 throws IOException, RecordReadException {
135 int bytesToBlockEnd = (int) (RecordConstants.BLOCK_SIZE -
136 (input.position() % RecordConstants.BLOCK_SIZE));
138 if (bytesToBlockEnd < RecordConstants.HEADER_LENGTH) {
139 return new Record(RecordType.NONE, null);
142 blockBuffer.clear();
143 blockBuffer.limit(RecordConstants.HEADER_LENGTH);
144 int bytesRead = input.read(blockBuffer);
145 if (bytesRead != RecordConstants.HEADER_LENGTH) {
146 return null;
148 blockBuffer.flip();
149 int checksum = blockBuffer.getInt();
150 short length = blockBuffer.getShort();
151 RecordType type = RecordType.get(blockBuffer.get());
152 if (length > bytesToBlockEnd || length < 0) {
153 throw new RecordReadException("Length is too large:" + length);
156 blockBuffer.clear();
157 blockBuffer.limit(length);
158 bytesRead = input.read(blockBuffer);
159 if (bytesRead != length) {
160 return null;
162 if (!isValidCrc(checksum, blockBuffer, type.value())) {
163 throw new RecordReadException("Checksum doesn't validate.");
166 blockBuffer.flip();
167 return new Record(type, blockBuffer);
171 * Moves to the start of the next block.
172 * @throws IOException
174 private void sync() throws IOException {
175 long padLength = RecordConstants.BLOCK_SIZE -
176 (input.position() % RecordConstants.BLOCK_SIZE);
177 input.position(input.position() + padLength);
181 * Validates that the {@link Crc32c} validates.
182 * @param checksum the checksum in the record.
183 * @param data the {@link ByteBuffer} of the data in the record.
184 * @param type the byte representing the {@link RecordType} of the record.
185 * @return true if the {@link Crc32c} validates.
187 private static boolean isValidCrc(int checksum, ByteBuffer data, byte type) {
188 Crc32c crc = new Crc32c();
189 crc.update(type);
190 crc.update(data.array(), 0, data.limit());
192 return unmaskCrc(checksum) == crc.getValue();
196 * Appends a {@link ByteBuffer} to another. This may modify
197 * the inputed buffer that will be appended to.
198 * @param to the {@link ByteBuffer} to append to.
199 * @param from the {@link ByteBuffer} to append.
200 * @return the resulting appended {@link ByteBuffer}
202 private static ByteBuffer appendToBuffer(ByteBuffer to, ByteBuffer from) {
203 if (to.remaining() < from.remaining()) {
204 int capacity = to.capacity();
205 while (capacity - to.position() < from.remaining()) {
206 capacity *= 2;
208 ByteBuffer newBuffer = ByteBuffer.allocate(capacity);
209 to.flip();
210 newBuffer.put(to);
211 to = newBuffer;
212 to.order(ByteOrder.LITTLE_ENDIAN);
214 to.put(from);
215 return to;