1.9.30 sync.
[gae.git] / java / src / main / com / google / appengine / api / files / RecordWriteChannelImpl.java
blob2a7fae85b255a0d72792b17c32bfdfc701b3cea4
1 // Copyright 2011 Google Inc. All rights reserved.
3 package com.google.appengine.api.files;
5 import static com.google.appengine.api.files.RecordConstants.BLOCK_SIZE;
6 import static com.google.appengine.api.files.RecordConstants.HEADER_LENGTH;
7 import static com.google.appengine.api.files.RecordConstants.maskCrc;
9 import com.google.appengine.api.files.RecordConstants.RecordType;
10 import com.google.common.base.Preconditions;
12 import java.io.IOException;
13 import java.nio.ByteBuffer;
14 import java.nio.ByteOrder;
16 /**
17 * An implementation of a {@link RecordWriteChannel}.
20 @Deprecated
21 final class RecordWriteChannelImpl implements RecordWriteChannel {
23 /**
24 * A class that holds information needed to write a physical record.
26 private static final class Record {
27 private final RecordType type;
28 private final int bytes;
30 private Record() {
31 type = RecordType.NONE;
32 bytes = 0;
35 private Record(RecordType type, int bytes) {
36 Preconditions.checkArgument(type != RecordType.UNKNOWN);
37 Preconditions.checkArgument(bytes >= 0);
38 this.type = type;
39 this.bytes = bytes;
42 /**
43 * Returns the number of bytes that needs to be written.
45 * @return the number of bytes.
47 int getBytes() {
48 return bytes;
51 /**
52 * Returns the type of record that needs to be written.
54 * @return the type.
56 RecordType getType() {
57 return type;
/** Sequence key used for the empty probe write issued while verifying sequence keys. */
protected static final String LOWEST_SEQUENCE_KEY = "\u0000";

/** Monitor guarding the synchronized sections of this channel. */
private final Object lock = new Object();

/** Channel that receives the buffered blocks. */
private final FileWriteChannel output;

/** Staging buffer of {@code BLOCK_SIZE} bytes, configured for little-endian writes. */
private ByteBuffer writeBuffer;

/** Sequence key passed to {@code output} with the next buffer flush; may be {@code null}. */
private String nextSequenceKey;

/** Most recent sequence key recorded by this channel, or {@code null} if there is none. */
private String lastSequenceKey;

/**
 * Creates a record channel on top of the given file channel.
 *
 * @param output a {@link FileWriteChannel} to write the records to.
 */
public RecordWriteChannelImpl(FileWriteChannel output) {
  this.output = output;
  ByteBuffer blockBuffer = ByteBuffer.allocate(BLOCK_SIZE);
  blockBuffer.order(ByteOrder.LITTLE_ENDIAN);
  this.writeBuffer = blockBuffer;
}
79 /**
80 * {@inheritDoc}
82 @Override
83 public int write(ByteBuffer data) throws IOException {
84 return write(data, null);
87 /**
88 * {@inheritDoc}
90 @Override
91 public boolean isOpen() {
92 synchronized (lock) {
93 return output.isOpen();
97 /**
98 * Verify the sequence key is strictly ascending. On the first non-null key, it will retrieve the
99 * latest sequence key from the file service by issuing a write with no bytes using the lowest
100 * possible sequence key.
102 * @param sequenceKey the sequence key to be checked
103 * @throws IOException if there is a problem retrieving the current sequence key from the file
104 * service
105 * @throws KeyOrderingException if the key provided is not strictly ascending from prior records
107 private void verifySequenceKey(String sequenceKey) throws IOException, KeyOrderingException {
108 if (sequenceKey != null) {
109 if (lastSequenceKey == null) {
110 ByteBuffer emptyData = ByteBuffer.wrap(new byte[0]);
111 try {
112 output.write(emptyData, LOWEST_SEQUENCE_KEY);
113 } catch (KeyOrderingException exception) {
114 lastSequenceKey = exception.getLastGoodSequenceKey();
118 if ((lastSequenceKey != null) && (sequenceKey.compareTo(lastSequenceKey) <= 0)) {
119 throw new KeyOrderingException(null, lastSequenceKey);
121 lastSequenceKey = sequenceKey;
126 * {@inheritDoc}
128 @Override
129 public int write(ByteBuffer data, String sequenceKey) throws IOException {
130 synchronized (lock) {
131 verifySequenceKey(sequenceKey);
132 if ((sequenceKey != null) && (sequenceKey.equals(LOWEST_SEQUENCE_KEY))) {
133 sequenceKey = null;
136 int bytesWritten = 0;
137 Record lastRecord = new Record();
139 do {
140 int bytesToBlockEnd = writeBuffer.remaining();
141 Record currentRecord = createRecord(data, bytesToBlockEnd, lastRecord);
142 writePhysicalRecord(data, currentRecord);
143 bytesWritten += currentRecord.getBytes() + HEADER_LENGTH;
144 lastRecord = currentRecord;
146 if ((lastRecord.getType() == RecordType.FULL)
147 || (lastRecord.getType() == RecordType.LAST)) {
148 nextSequenceKey = sequenceKey;
151 bytesToBlockEnd = writeBuffer.remaining();
152 if ((bytesToBlockEnd < HEADER_LENGTH) && (bytesToBlockEnd > 0)) {
153 writeBlanks(bytesToBlockEnd);
154 bytesWritten += bytesToBlockEnd;
155 bytesToBlockEnd = 0;
158 if (bytesToBlockEnd == 0) {
159 writeBuffer.flip();
160 output.write(writeBuffer, nextSequenceKey);
161 writeBuffer.clear();
162 nextSequenceKey = null;
164 } while (data.hasRemaining());
165 return bytesWritten;
170 * {@inheritDoc}
172 @Override
173 public void closeFinally() throws IllegalStateException, IOException {
174 synchronized (lock) {
175 closeStream(false);
176 output.closeFinally();
181 * Closes and finalizes the {@link RecordWriteChannel}.
183 @Override
184 public void close() throws IOException {
185 synchronized (lock) {
186 closeStream(true);
187 output.close();
192 * Fills a {@link Record} object with data about the physical record to write.
194 * @param data the users data.
195 * @param bytesToBlockEnd remaining bytes in the current block.
196 * @param lastRecord a {@link Record} representing the last physical record written.
197 * @return the {@link Record} with new write data.
199 private static Record createRecord(ByteBuffer data, int bytesToBlockEnd, Record lastRecord) {
200 int bytesToDataEnd = data.remaining();
201 RecordType type = RecordType.UNKNOWN;
202 int bytes = -1;
203 if ((lastRecord.getType() == RecordType.NONE)
204 && ((bytesToDataEnd + HEADER_LENGTH) <= bytesToBlockEnd)) {
205 type = RecordType.FULL;
206 bytes = bytesToDataEnd;
207 } else if (lastRecord.getType() == RecordType.NONE) {
208 type = RecordType.FIRST;
209 bytes = bytesToBlockEnd - HEADER_LENGTH;
210 } else if (bytesToDataEnd <= bytesToBlockEnd) {
211 type = RecordType.LAST;
212 bytes = bytesToDataEnd;
213 } else {
214 type = RecordType.MIDDLE;
215 bytes = bytesToBlockEnd - HEADER_LENGTH;
217 return new Record(type, bytes);
221 * This method creates a record inside of a {@link ByteBuffer}
223 * @param data The data to output.
224 * @param record A {@link RecordWriteChannelImpl.Record} object that describes
225 * which data to write.
227 private void writePhysicalRecord(ByteBuffer data, Record record) {
228 writeBuffer.putInt(generateCrc(data.array(), data.position(), record.getBytes(),
229 record.getType()));
230 writeBuffer.putShort((short) record.getBytes());
231 writeBuffer.put(record.getType().value());
232 writeBuffer.put(data.array(), data.position(), record.getBytes());
233 data.position(data.position() + record.getBytes());
237 * Fills the {@link ByteBuffer} with 0x00;
239 * @param numBlanks the number of bytes to pad.
241 private void writeBlanks(int numBlanks) {
242 for (int i = 0; i < numBlanks; i++) {
243 writeBuffer.put((byte) 0x00);
248 * Generates a CRC32C checksum using {@link Crc32c} for a specific record.
250 * @param data The user data over which the checksum will be generated.
251 * @param off The offset into the user data at which to begin the computation.
252 * @param len The length of user data to use in the computation.
253 * @param type The {@link RecordType} of the record, which is included in the
254 * checksum.
255 * @return the masked checksum.
257 private int generateCrc(byte[] data, int off, int len, RecordType type) {
258 Crc32c crc = new Crc32c();
259 crc.update(type.value());
260 crc.update(data, off, len);
261 return (int) maskCrc(crc.getValue());
265 * Closes the stream and adds padding to the end of the block without closing
266 * the underlying {@link AppEngineFile}.
268 * @throws IOException
270 private void closeStream(boolean pad) throws IOException {
271 int bytesToBlockEnd = writeBuffer.remaining();
272 if (bytesToBlockEnd < BLOCK_SIZE) {
273 if (pad) {
274 writeBlanks(bytesToBlockEnd);
276 writeBuffer.flip();
277 output.write(writeBuffer, nextSequenceKey);
278 nextSequenceKey = null;
279 writeBuffer.clear();