Revision created by MOE tool push_codebase.
[gae.git] / java / src / main / com / google / appengine / api / files / RecordWriteChannelImpl.java
blobe27541dffd707500c86177c07436ee0b0bafc61a
1 // Copyright 2011 Google Inc. All rights reserved.
3 package com.google.appengine.api.files;
5 import static com.google.appengine.api.files.RecordConstants.BLOCK_SIZE;
6 import static com.google.appengine.api.files.RecordConstants.HEADER_LENGTH;
7 import static com.google.appengine.api.files.RecordConstants.maskCrc;
9 import com.google.appengine.api.files.RecordConstants.RecordType;
10 import com.google.common.base.Preconditions;
12 import java.io.IOException;
13 import java.nio.ByteBuffer;
14 import java.nio.ByteOrder;
16 /**
17 * An implementation of a {@link RecordWriteChannel}.
20 final class RecordWriteChannelImpl implements RecordWriteChannel {
22 /**
23 * A class that holds information needed to write a physical record.
25 private static final class Record {
26 private final RecordType type;
27 private final int bytes;
29 private Record() {
30 type = RecordType.NONE;
31 bytes = 0;
34 private Record(RecordType type, int bytes) {
35 Preconditions.checkArgument(type != RecordType.UNKNOWN);
36 Preconditions.checkArgument(bytes >= 0);
37 this.type = type;
38 this.bytes = bytes;
41 /**
42 * Returns the number of bytes that needs to be written.
44 * @return the number of bytes.
46 int getBytes() {
47 return bytes;
50 /**
51 * Returns the type of record that needs to be written.
53 * @return the type.
55 RecordType getType() {
56 return type;
61 protected static final String LOWEST_SEQUENCE_KEY = "\u0000";
63 private final Object lock = new Object();
64 private final FileWriteChannel output;
65 private ByteBuffer writeBuffer;
66 private String nextSequenceKey;
67 private String lastSequenceKey;
69 /**
70 * @param output a {@link FileWriteChannel} to write the record to.
72 public RecordWriteChannelImpl(FileWriteChannel output) {
73 this.output = output;
74 writeBuffer = ByteBuffer.allocate(BLOCK_SIZE);
75 writeBuffer.order(ByteOrder.LITTLE_ENDIAN);
78 /**
79 * {@inheritDoc}
81 @Override
82 public int write(ByteBuffer data) throws IOException {
83 return write(data, null);
86 /**
87 * {@inheritDoc}
89 @Override
90 public boolean isOpen() {
91 synchronized (lock) {
92 return output.isOpen();
96 /**
97 * Verify the sequence key is strictly ascending. On the first non-null key, it will retrieve the
98 * latest sequence key from the file service by issuing a write with no bytes using the lowest
99 * possible sequence key.
101 * @param sequenceKey the sequence key to be checked
102 * @throws IOException if there is a problem retrieving the current sequence key from the file
103 * service
104 * @throws KeyOrderingException if the key provided is not strictly ascending from prior records
106 private void verifySequenceKey(String sequenceKey) throws IOException, KeyOrderingException {
107 if (sequenceKey != null) {
108 if (lastSequenceKey == null) {
109 ByteBuffer emptyData = ByteBuffer.wrap(new byte[0]);
110 try {
111 output.write(emptyData, LOWEST_SEQUENCE_KEY);
112 } catch (KeyOrderingException exception) {
113 lastSequenceKey = exception.getLastGoodSequenceKey();
117 if ((lastSequenceKey != null) && (sequenceKey.compareTo(lastSequenceKey) <= 0)) {
118 throw new KeyOrderingException(null, lastSequenceKey);
120 lastSequenceKey = sequenceKey;
125 * {@inheritDoc}
127 @Override
128 public int write(ByteBuffer data, String sequenceKey) throws IOException {
129 synchronized (lock) {
130 verifySequenceKey(sequenceKey);
131 if ((sequenceKey != null) && (sequenceKey.equals(LOWEST_SEQUENCE_KEY))) {
132 sequenceKey = null;
135 int bytesWritten = 0;
136 Record lastRecord = new Record();
138 do {
139 int bytesToBlockEnd = writeBuffer.remaining();
140 Record currentRecord = createRecord(data, bytesToBlockEnd, lastRecord);
141 writePhysicalRecord(data, currentRecord);
142 bytesWritten += currentRecord.getBytes() + HEADER_LENGTH;
143 lastRecord = currentRecord;
145 if ((lastRecord.getType() == RecordType.FULL)
146 || (lastRecord.getType() == RecordType.LAST)) {
147 nextSequenceKey = sequenceKey;
150 bytesToBlockEnd = writeBuffer.remaining();
151 if ((bytesToBlockEnd < HEADER_LENGTH) && (bytesToBlockEnd > 0)) {
152 writeBlanks(bytesToBlockEnd);
153 bytesWritten += bytesToBlockEnd;
154 bytesToBlockEnd = 0;
157 if (bytesToBlockEnd == 0) {
158 writeBuffer.flip();
159 output.write(writeBuffer, nextSequenceKey);
160 writeBuffer.clear();
161 nextSequenceKey = null;
163 } while (data.hasRemaining());
164 return bytesWritten;
169 * {@inheritDoc}
171 @Override
172 public void closeFinally() throws IllegalStateException, IOException {
173 synchronized (lock) {
174 closeStream(false);
175 output.closeFinally();
180 * Closes and finalizes the {@link RecordWriteChannel}.
182 @Override
183 public void close() throws IOException {
184 synchronized (lock) {
185 closeStream(true);
186 output.close();
191 * Fills a {@link Record} object with data about the physical record to write.
193 * @param data the users data.
194 * @param bytesToBlockEnd remaining bytes in the current block.
195 * @param lastRecord a {@link Record} representing the last physical record written.
196 * @return the {@link Record} with new write data.
198 private static Record createRecord(ByteBuffer data, int bytesToBlockEnd, Record lastRecord) {
199 int bytesToDataEnd = data.remaining();
200 RecordType type = RecordType.UNKNOWN;
201 int bytes = -1;
202 if ((lastRecord.getType() == RecordType.NONE)
203 && ((bytesToDataEnd + HEADER_LENGTH) <= bytesToBlockEnd)) {
204 type = RecordType.FULL;
205 bytes = bytesToDataEnd;
206 } else if (lastRecord.getType() == RecordType.NONE) {
207 type = RecordType.FIRST;
208 bytes = bytesToBlockEnd - HEADER_LENGTH;
209 } else if (bytesToDataEnd <= bytesToBlockEnd) {
210 type = RecordType.LAST;
211 bytes = bytesToDataEnd;
212 } else {
213 type = RecordType.MIDDLE;
214 bytes = bytesToBlockEnd - HEADER_LENGTH;
216 return new Record(type, bytes);
220 * This method creates a record inside of a {@link ByteBuffer}
222 * @param data The data to output.
223 * @param record A {@link RecordWriteChannelImpl.Record} object that describes
224 * which data to write.
226 private void writePhysicalRecord(ByteBuffer data, Record record) {
227 writeBuffer.putInt(generateCrc(data.array(), data.position(), record.getBytes(),
228 record.getType()));
229 writeBuffer.putShort((short) record.getBytes());
230 writeBuffer.put(record.getType().value());
231 writeBuffer.put(data.array(), data.position(), record.getBytes());
232 data.position(data.position() + record.getBytes());
236 * Fills the {@link ByteBuffer} with 0x00;
238 * @param numBlanks the number of bytes to pad.
240 private void writeBlanks(int numBlanks) {
241 for (int i = 0; i < numBlanks; i++) {
242 writeBuffer.put((byte) 0x00);
247 * Generates a CRC32C checksum using {@link Crc32c} for a specific record.
249 * @param data The user data over which the checksum will be generated.
250 * @param off The offset into the user data at which to begin the computation.
251 * @param len The length of user data to use in the computation.
252 * @param type The {@link RecordType} of the record, which is included in the
253 * checksum.
254 * @return the masked checksum.
256 private int generateCrc(byte[] data, int off, int len, RecordType type) {
257 Crc32c crc = new Crc32c();
258 crc.update(type.value());
259 crc.update(data, off, len);
260 return (int) maskCrc(crc.getValue());
264 * Closes the stream and adds padding to the end of the block without closing
265 * the underlying {@link AppEngineFile}.
267 * @throws IOException
269 private void closeStream(boolean pad) throws IOException {
270 int bytesToBlockEnd = writeBuffer.remaining();
271 if (bytesToBlockEnd < BLOCK_SIZE) {
272 if (pad) {
273 writeBlanks(bytesToBlockEnd);
275 writeBuffer.flip();
276 output.write(writeBuffer, nextSequenceKey);
277 nextSequenceKey = null;
278 writeBuffer.clear();