1.9.30 sync.
[gae.git] / java / src / main / com / google / appengine / api / files / RecordReadChannelImpl.java
blobaa623b7ad0caad00b13d74b5f455013f0c58ceea
1 // Copyright 2011 Google Inc. All rights reserved.
3 package com.google.appengine.api.files;
5 import static com.google.appengine.api.files.RecordConstants.unmaskCrc;
7 import com.google.appengine.api.files.RecordConstants.RecordType;
9 import java.io.IOException;
10 import java.nio.ByteBuffer;
11 import java.nio.ByteOrder;
12 import java.util.logging.Level;
13 import java.util.logging.Logger;
15 @Deprecated
16 final class RecordReadChannelImpl implements RecordReadChannel {
17 private static final Logger log = Logger.getLogger(RecordReadChannelImpl.class.getName());
19 static final class RecordReadException extends Exception {
20 public RecordReadException(String errorMessage) {
21 super(errorMessage);
25 private static final class Record {
26 private final ByteBuffer data;
27 private final RecordType type;
29 public Record(RecordType type, ByteBuffer data) {
30 this.type = type;
31 this.data = data;
34 public ByteBuffer data() {
35 return this.data;
38 public RecordType type() {
39 return this.type;
42 private final Object lock = new Object();
43 private final FileReadChannel input;
44 private ByteBuffer blockBuffer;
45 private ByteBuffer finalRecord;
/**
 * Creates a reader on top of the given channel. Both working buffers are allocated with
 * {@link RecordConstants#BLOCK_SIZE} bytes and use little-endian byte order.
 *
 * @param input a {@link FileReadChannel} that holds Records to read from.
 */
RecordReadChannelImpl(FileReadChannel input) {
  this.input = input;
  this.blockBuffer =
      ByteBuffer.allocate(RecordConstants.BLOCK_SIZE).order(ByteOrder.LITTLE_ENDIAN);
  this.finalRecord =
      ByteBuffer.allocate(RecordConstants.BLOCK_SIZE).order(ByteOrder.LITTLE_ENDIAN);
}
58 /**
59 * {@inheritDoc}
61 @Override
62 public ByteBuffer readRecord() throws IOException {
63 synchronized (lock) {
64 finalRecord.clear();
65 RecordType lastRead = RecordType.NONE;
66 while (true) {
67 try {
68 Record record = readPhysicalRecord();
69 if (record == null) {
70 return null;
72 switch (record.type()) {
73 case NONE:
74 validateRemainderIsEmpty();
75 break;
76 case FULL:
77 if (lastRead != RecordType.NONE) {
78 throw new RecordReadException("Invalid RecordType: " + record.type);
80 return record.data().slice();
81 case FIRST:
82 if (lastRead != RecordType.NONE) {
83 throw new RecordReadException("Invalid RecordType: " + record.type);
85 finalRecord = appendToBuffer(finalRecord, record.data());
86 break;
87 case MIDDLE:
88 if (lastRead == RecordType.NONE) {
89 throw new RecordReadException("Invalid RecordType: " + record.type);
91 finalRecord = appendToBuffer(finalRecord, record.data());
92 break;
93 case LAST:
94 if (lastRead == RecordType.NONE) {
95 throw new RecordReadException("Invalid RecordType: " + record.type);
97 finalRecord = appendToBuffer(finalRecord, record.data());
98 finalRecord.flip();
99 return finalRecord.slice();
100 default:
101 throw new RecordReadException("Invalid RecordType: " + record.type.value());
103 lastRead = record.type();
104 } catch (RecordReadException e) {
105 log.log(Level.SEVERE, e.getMessage() + " At pos " + position() + " in " + input);
106 finalRecord.clear();
107 sync();
113 private void validateRemainderIsEmpty() throws IOException, RecordReadException {
114 int bytesToBlockEnd =
115 (int) (RecordConstants.BLOCK_SIZE - (input.position() % RecordConstants.BLOCK_SIZE));
116 blockBuffer.clear();
117 blockBuffer.limit(bytesToBlockEnd);
118 int read = input.read(blockBuffer);
119 if (read != bytesToBlockEnd) {
120 throw new RecordReadException(
121 "There are " + bytesToBlockEnd + " but " + read + " were read.");
123 blockBuffer.flip();
124 for (int i = 0; i < bytesToBlockEnd; i++) {
125 byte b = blockBuffer.get(i);
126 if (b != 0) {
127 throw new RecordReadException("Found a non-zero byte: " + b
128 + " before the end of the block " + i
129 + " bytes after encountering a RecordType of NONE");
135 * {@inheritDoc}
137 * @throws IOException
139 @Override
140 public long position() throws IOException {
141 synchronized (lock) {
142 return input.position();
147 * {@inheritDoc}
149 @Override
150 public void position(long newPosition) throws IOException {
151 synchronized (lock) {
152 input.position(newPosition);
157 * Reads the next record from the RecordIO data stream.
159 * @return Record data about the physical record read.
160 * @throws IOException
162 private Record readPhysicalRecord() throws IOException, RecordReadException {
163 int bytesToBlockEnd =
164 (int) (RecordConstants.BLOCK_SIZE - (input.position() % RecordConstants.BLOCK_SIZE));
166 if (bytesToBlockEnd < RecordConstants.HEADER_LENGTH) {
167 return new Record(RecordType.NONE, null);
170 blockBuffer.clear();
171 blockBuffer.limit(RecordConstants.HEADER_LENGTH);
172 int bytesRead = input.read(blockBuffer);
173 if (bytesRead != RecordConstants.HEADER_LENGTH) {
174 return null;
176 blockBuffer.flip();
177 int checksum = blockBuffer.getInt();
178 short length = blockBuffer.getShort();
179 RecordType type = RecordType.get(blockBuffer.get());
180 if (length > bytesToBlockEnd || length < 0) {
181 throw new RecordReadException("Length is too large:" + length);
184 blockBuffer.clear();
185 blockBuffer.limit(length);
186 bytesRead = input.read(blockBuffer);
187 if (bytesRead != length) {
188 return null;
190 if (!isValidCrc(checksum, blockBuffer, type.value())) {
191 throw new RecordReadException("Checksum doesn't validate.");
194 blockBuffer.flip();
195 return new Record(type, blockBuffer);
199 * Moves to the start of the next block.
201 * @throws IOException
203 private void sync() throws IOException {
204 long padLength = RecordConstants.BLOCK_SIZE - (input.position() % RecordConstants.BLOCK_SIZE);
205 input.position(input.position() + padLength);
209 * Validates that the {@link Crc32c} validates.
211 * @param checksum the checksum in the record.
212 * @param data the {@link ByteBuffer} of the data in the record.
213 * @param type the byte representing the {@link RecordType} of the record.
214 * @return true if the {@link Crc32c} validates.
216 private static boolean isValidCrc(int checksum, ByteBuffer data, byte type) {
217 if (checksum == 0 && type == 0 && data.limit() == 0) {
218 return true;
220 Crc32c crc = new Crc32c();
221 crc.update(type);
222 crc.update(data.array(), 0, data.limit());
224 return unmaskCrc(checksum) == crc.getValue();
228 * Appends a {@link ByteBuffer} to another. This may modify the inputed buffer that will be
229 * appended to.
231 * @param to the {@link ByteBuffer} to append to.
232 * @param from the {@link ByteBuffer} to append.
233 * @return the resulting appended {@link ByteBuffer}
235 private static ByteBuffer appendToBuffer(ByteBuffer to, ByteBuffer from) {
236 if (to.remaining() < from.remaining()) {
237 int capacity = to.capacity();
238 while (capacity - to.position() < from.remaining()) {
239 capacity *= 2;
241 ByteBuffer newBuffer = ByteBuffer.allocate(capacity);
242 to.flip();
243 newBuffer.put(to);
244 to = newBuffer;
245 to.order(ByteOrder.LITTLE_ENDIAN);
247 to.put(from);
248 return to;