1 // Copyright 2011 Google Inc. All rights reserved.
3 package com
.google
.appengine
.api
.files
;
5 import static com
.google
.appengine
.api
.files
.RecordConstants
.BLOCK_SIZE
;
6 import static com
.google
.appengine
.api
.files
.RecordConstants
.HEADER_LENGTH
;
7 import static com
.google
.appengine
.api
.files
.RecordConstants
.maskCrc
;
9 import com
.google
.appengine
.api
.files
.RecordConstants
.RecordType
;
10 import com
.google
.common
.base
.Preconditions
;
12 import java
.io
.IOException
;
13 import java
.nio
.ByteBuffer
;
14 import java
.nio
.ByteOrder
;
17 * An implementation of a {@link RecordWriteChannel}.
21 final class RecordWriteChannelImpl
implements RecordWriteChannel
{
24 * A class that holds information needed to write a physical record.
26 private static final class Record
{
27 private final RecordType type
;
28 private final int bytes
;
31 type
= RecordType
.NONE
;
35 private Record(RecordType type
, int bytes
) {
36 Preconditions
.checkArgument(type
!= RecordType
.UNKNOWN
);
37 Preconditions
.checkArgument(bytes
>= 0);
43 * Returns the number of bytes that needs to be written.
45 * @return the number of bytes.
52 * Returns the type of record that needs to be written.
56 RecordType
getType() {
62 protected static final String LOWEST_SEQUENCE_KEY
= "\u0000";
64 private final Object lock
= new Object();
65 private final FileWriteChannel output
;
66 private ByteBuffer writeBuffer
;
67 private String nextSequenceKey
;
68 private String lastSequenceKey
;
71 * @param output a {@link FileWriteChannel} to write the record to.
73 public RecordWriteChannelImpl(FileWriteChannel output
) {
75 writeBuffer
= ByteBuffer
.allocate(BLOCK_SIZE
);
76 writeBuffer
.order(ByteOrder
.LITTLE_ENDIAN
);
83 public int write(ByteBuffer data
) throws IOException
{
84 return write(data
, null);
91 public boolean isOpen() {
93 return output
.isOpen();
98 * Verify the sequence key is strictly ascending. On the first non-null key, it will retrieve the
99 * latest sequence key from the file service by issuing a write with no bytes using the lowest
100 * possible sequence key.
102 * @param sequenceKey the sequence key to be checked
103 * @throws IOException if there is a problem retrieving the current sequence key from the file
105 * @throws KeyOrderingException if the key provided is not strictly ascending from prior records
107 private void verifySequenceKey(String sequenceKey
) throws IOException
, KeyOrderingException
{
108 if (sequenceKey
!= null) {
109 if (lastSequenceKey
== null) {
110 ByteBuffer emptyData
= ByteBuffer
.wrap(new byte[0]);
112 output
.write(emptyData
, LOWEST_SEQUENCE_KEY
);
113 } catch (KeyOrderingException exception
) {
114 lastSequenceKey
= exception
.getLastGoodSequenceKey();
118 if ((lastSequenceKey
!= null) && (sequenceKey
.compareTo(lastSequenceKey
) <= 0)) {
119 throw new KeyOrderingException(null, lastSequenceKey
);
121 lastSequenceKey
= sequenceKey
;
129 public int write(ByteBuffer data
, String sequenceKey
) throws IOException
{
130 synchronized (lock
) {
131 verifySequenceKey(sequenceKey
);
132 if ((sequenceKey
!= null) && (sequenceKey
.equals(LOWEST_SEQUENCE_KEY
))) {
136 int bytesWritten
= 0;
137 Record lastRecord
= new Record();
140 int bytesToBlockEnd
= writeBuffer
.remaining();
141 Record currentRecord
= createRecord(data
, bytesToBlockEnd
, lastRecord
);
142 writePhysicalRecord(data
, currentRecord
);
143 bytesWritten
+= currentRecord
.getBytes() + HEADER_LENGTH
;
144 lastRecord
= currentRecord
;
146 if ((lastRecord
.getType() == RecordType
.FULL
)
147 || (lastRecord
.getType() == RecordType
.LAST
)) {
148 nextSequenceKey
= sequenceKey
;
151 bytesToBlockEnd
= writeBuffer
.remaining();
152 if ((bytesToBlockEnd
< HEADER_LENGTH
) && (bytesToBlockEnd
> 0)) {
153 writeBlanks(bytesToBlockEnd
);
154 bytesWritten
+= bytesToBlockEnd
;
158 if (bytesToBlockEnd
== 0) {
160 output
.write(writeBuffer
, nextSequenceKey
);
162 nextSequenceKey
= null;
164 } while (data
.hasRemaining());
173 public void closeFinally() throws IllegalStateException
, IOException
{
174 synchronized (lock
) {
176 output
.closeFinally();
181 * Closes and finalizes the {@link RecordWriteChannel}.
184 public void close() throws IOException
{
185 synchronized (lock
) {
192 * Fills a {@link Record} object with data about the physical record to write.
194 * @param data the users data.
195 * @param bytesToBlockEnd remaining bytes in the current block.
196 * @param lastRecord a {@link Record} representing the last physical record written.
197 * @return the {@link Record} with new write data.
199 private static Record
createRecord(ByteBuffer data
, int bytesToBlockEnd
, Record lastRecord
) {
200 int bytesToDataEnd
= data
.remaining();
201 RecordType type
= RecordType
.UNKNOWN
;
203 if ((lastRecord
.getType() == RecordType
.NONE
)
204 && ((bytesToDataEnd
+ HEADER_LENGTH
) <= bytesToBlockEnd
)) {
205 type
= RecordType
.FULL
;
206 bytes
= bytesToDataEnd
;
207 } else if (lastRecord
.getType() == RecordType
.NONE
) {
208 type
= RecordType
.FIRST
;
209 bytes
= bytesToBlockEnd
- HEADER_LENGTH
;
210 } else if (bytesToDataEnd
<= bytesToBlockEnd
) {
211 type
= RecordType
.LAST
;
212 bytes
= bytesToDataEnd
;
214 type
= RecordType
.MIDDLE
;
215 bytes
= bytesToBlockEnd
- HEADER_LENGTH
;
217 return new Record(type
, bytes
);
221 * This method creates a record inside of a {@link ByteBuffer}
223 * @param data The data to output.
224 * @param record A {@link RecordWriteChannelImpl.Record} object that describes
225 * which data to write.
227 private void writePhysicalRecord(ByteBuffer data
, Record record
) {
228 writeBuffer
.putInt(generateCrc(data
.array(), data
.position(), record
.getBytes(),
230 writeBuffer
.putShort((short) record
.getBytes());
231 writeBuffer
.put(record
.getType().value());
232 writeBuffer
.put(data
.array(), data
.position(), record
.getBytes());
233 data
.position(data
.position() + record
.getBytes());
237 * Fills the {@link ByteBuffer} with 0x00;
239 * @param numBlanks the number of bytes to pad.
241 private void writeBlanks(int numBlanks
) {
242 for (int i
= 0; i
< numBlanks
; i
++) {
243 writeBuffer
.put((byte) 0x00);
248 * Generates a CRC32C checksum using {@link Crc32c} for a specific record.
250 * @param data The user data over which the checksum will be generated.
251 * @param off The offset into the user data at which to begin the computation.
252 * @param len The length of user data to use in the computation.
253 * @param type The {@link RecordType} of the record, which is included in the
255 * @return the masked checksum.
257 private int generateCrc(byte[] data
, int off
, int len
, RecordType type
) {
258 Crc32c crc
= new Crc32c();
259 crc
.update(type
.value());
260 crc
.update(data
, off
, len
);
261 return (int) maskCrc(crc
.getValue());
265 * Closes the stream and adds padding to the end of the block without closing
266 * the underlying {@link AppEngineFile}.
268 * @throws IOException
270 private void closeStream(boolean pad
) throws IOException
{
271 int bytesToBlockEnd
= writeBuffer
.remaining();
272 if (bytesToBlockEnd
< BLOCK_SIZE
) {
274 writeBlanks(bytesToBlockEnd
);
277 output
.write(writeBuffer
, nextSequenceKey
);
278 nextSequenceKey
= null;