App Engine Java SDK version 1.7.0
[gae.git] / java / src / main / com / google / appengine / api / files / RecordWriteChannelImpl.java
blob867f16c977c3f3e406f1a9c37d566ebab02c3a95
1 // Copyright 2011 Google Inc. All rights reserved.
3 package com.google.appengine.api.files;
5 import static com.google.appengine.api.files.RecordConstants.BLOCK_SIZE;
6 import static com.google.appengine.api.files.RecordConstants.HEADER_LENGTH;
7 import static com.google.appengine.api.files.RecordConstants.maskCrc;
9 import com.google.appengine.api.files.RecordConstants.RecordType;
10 import com.google.common.base.Preconditions;
12 import java.io.IOException;
13 import java.nio.ByteBuffer;
14 import java.nio.ByteOrder;
16 /**
17 * An implementation of a {@link RecordWriteChannel}.
20 final class RecordWriteChannelImpl implements RecordWriteChannel {
22 /**
23 * A class that holds information needed to write a physical record.
25 private static final class Record {
26 private final RecordType type;
27 private final int bytes;
29 private Record() {
30 type = RecordType.NONE;
31 bytes = 0;
34 private Record(RecordType type, int bytes) {
35 Preconditions.checkArgument(type != RecordType.UNKNOWN);
36 Preconditions.checkArgument(bytes >= 0);
37 this.type = type;
38 this.bytes = bytes;
41 /**
42 * Returns the number of bytes that needs to be written.
44 * @return the number of bytes.
46 int getBytes() {
47 return bytes;
50 /**
51 * Returns the type of record that needs to be written.
53 * @return the type.
55 RecordType getType() {
56 return type;
61 private final FileWriteChannel output;
62 private int position;
63 private ByteBuffer writeBuffer;
65 /**
66 * @param output a {@link FileWriteChannel} to write the record to.
68 public RecordWriteChannelImpl(FileWriteChannel output) {
69 this.output = output;
70 position = 0;
71 writeBuffer = ByteBuffer.allocate(BLOCK_SIZE);
72 writeBuffer.order(ByteOrder.LITTLE_ENDIAN);
75 /**
76 * {@inheritDoc}
78 @Override
79 public int write(ByteBuffer data) throws IOException {
80 return write(data, null);
83 /**
84 * {@inheritDoc}
86 @Override
87 public boolean isOpen() {
88 return output.isOpen();
91 /**
92 * {@inheritDoc}
94 @Override
95 public int write(ByteBuffer data, String sequenceKey) throws IOException {
96 writeBuffer.clear();
97 extendBufferIfNecessary(data);
98 int oldPosition = position;
99 Record lastRecord = new Record();
100 do {
101 Record currentRecord = createRecord(data, lastRecord);
102 if (currentRecord.getType() == RecordType.NONE) {
103 writeBlanks(currentRecord.getBytes());
104 } else {
105 writePhysicalRecord(data, currentRecord);
107 position = oldPosition + writeBuffer.position();
108 lastRecord = currentRecord;
109 } while (data.hasRemaining());
111 writeBuffer.flip();
112 if (sequenceKey == null) {
113 return output.write(writeBuffer);
115 return output.write(writeBuffer, sequenceKey);
119 * {@inheritDoc}
121 @Override
122 public void closeFinally() throws IllegalStateException, IOException {
123 closeStream();
124 output.closeFinally();
128 * Closes and finalizes the {@link RecordWriteChannel}.
130 @Override
131 public void close() throws IOException {
132 closeStream();
133 output.close();
137 * Fills a {@link Record} object with data about the physical record to write.
139 * @param data the users data.
140 * @param lastRecord a {@link Record} representing the last physical record written.
141 * @return the {@link Record} with new write data.
143 private Record createRecord(ByteBuffer data, Record lastRecord) {
144 int bytesToBlockEnd = BLOCK_SIZE - (position % BLOCK_SIZE);
145 int minBytesToWrite = data.limit() + HEADER_LENGTH - data.position();
146 RecordType type = RecordType.UNKNOWN;
147 int bytes = -1;
148 if (bytesToBlockEnd < HEADER_LENGTH) {
149 type = lastRecord.getType();
150 bytes = bytesToBlockEnd;
151 } else if (lastRecord.getType() == RecordType.NONE && minBytesToWrite <= bytesToBlockEnd) {
152 type = RecordType.FULL;
153 bytes = minBytesToWrite - HEADER_LENGTH;
154 } else if (lastRecord.getType() == RecordType.NONE) {
155 type = RecordType.FIRST;
156 bytes = bytesToBlockEnd - HEADER_LENGTH;
157 } else if (minBytesToWrite <= bytesToBlockEnd) {
158 type = RecordType.LAST;
159 bytes = data.limit() - data.position();
160 } else {
161 type = RecordType.MIDDLE;
162 bytes = bytesToBlockEnd - HEADER_LENGTH;
164 return new Record(type, bytes);
168 * This method creates a record inside of a {@link ByteBuffer}
170 * @param data The data to output.
171 * @param record A {@link RecordWriteChannelImpl.Record} object that describes
172 * which data to write.
174 private void writePhysicalRecord(ByteBuffer data, Record record) {
175 writeBuffer.putInt(generateCrc(data.array(), data.position(), record.getBytes(),
176 record.getType()));
177 writeBuffer.putShort((short) record.getBytes());
178 writeBuffer.put(record.getType().value());
179 writeBuffer.put(data.array(), data.position(), record.getBytes());
180 data.position(data.position() + record.getBytes());
184 * Fills the {@link ByteBuffer} with 0x00;
186 * @param numBlanks the number of bytes to pad.
188 private void writeBlanks(int numBlanks) {
189 for (int i = 0; i < numBlanks; i++) {
190 writeBuffer.put((byte) 0x00);
195 * Generates a CRC32C checksum using {@link Crc32c} for a specific record.
197 * @param data The user data over which the checksum will be generated.
198 * @param off The offset into the user data at which to begin the computation.
199 * @param len The length of user data to use in the computation.
200 * @param type The {@link RecordType} of the record, which is included in the
201 * checksum.
202 * @return the masked checksum.
204 private int generateCrc(byte[] data, int off, int len, RecordType type) {
205 Crc32c crc = new Crc32c();
206 crc.update(type.value());
207 crc.update(data, off, len);
208 return (int) maskCrc(crc.getValue());
211 private void extendBufferIfNecessary(ByteBuffer record) {
212 int maxNumHeaders = 1 + (int) Math.ceil(record.limit() / (BLOCK_SIZE - HEADER_LENGTH));
213 int maxRecordSize = record.limit() + maxNumHeaders * HEADER_LENGTH;
214 int capacity = writeBuffer.capacity();
215 while (capacity < maxRecordSize) {
216 capacity *= 2;
218 writeBuffer = ByteBuffer.allocate(capacity);
219 writeBuffer.order(ByteOrder.LITTLE_ENDIAN);
224 * Closes the stream and adds padding to the end of the block without closing
225 * the underlying {@link AppEngineFile}.
227 * @throws IOException
229 private void closeStream() throws IOException {
230 writeBuffer.clear();
231 int bytesToBlockEnd = BLOCK_SIZE - (position % BLOCK_SIZE);
232 if (bytesToBlockEnd == BLOCK_SIZE) {
233 return;
235 writeBlanks(bytesToBlockEnd);
236 writeBuffer.flip();
237 position += writeBuffer.limit();
238 output.write(writeBuffer);