1 // Copyright 2011 Google Inc. All rights reserved.
3 package com
.google
.appengine
.api
.files
;
5 import static com
.google
.appengine
.api
.files
.RecordConstants
.BLOCK_SIZE
;
6 import static com
.google
.appengine
.api
.files
.RecordConstants
.HEADER_LENGTH
;
7 import static com
.google
.appengine
.api
.files
.RecordConstants
.maskCrc
;
9 import com
.google
.appengine
.api
.files
.RecordConstants
.RecordType
;
10 import com
.google
.common
.base
.Preconditions
;
12 import java
.io
.IOException
;
13 import java
.nio
.ByteBuffer
;
14 import java
.nio
.ByteOrder
;
17 * An implementation of a {@link RecordWriteChannel}.
20 final class RecordWriteChannelImpl
implements RecordWriteChannel
{
23 * A class that holds information needed to write a physical record.
// Value holder describing one physical record: its RecordType and its payload size in bytes.
// Both fields are final, so an instance never changes after construction.
25 private static final class Record
{
// Kind of physical record this instance describes.
26 private final RecordType type
;
// Payload length in bytes; the two-argument constructor rejects negative values.
27 private final int bytes
;
// Default state is RecordType.NONE; createRecord checks for NONE to detect that no
// fragment of the current logical record has been written yet.
30 type
= RecordType
.NONE
;
// Validating constructor: Preconditions.checkArgument fails for RecordType.UNKNOWN
// and for a negative byte count.
34 private Record(RecordType type
, int bytes
) {
35 Preconditions
.checkArgument(type
!= RecordType
.UNKNOWN
);
36 Preconditions
.checkArgument(bytes
>= 0);
42 * Returns the number of bytes that needs to be written.
44 * @return the number of bytes.
51 * Returns the type of record that needs to be written.
// Accessor for the record type; callers compare the result against RecordType constants.
55 RecordType
getType() {
// Sequence key used for the zero-byte probe write in verifySequenceKey; write(data, sequenceKey)
// also checks incoming keys against this value.
61 protected static final String LOWEST_SEQUENCE_KEY
= "\u0000";
// Monitor object used by the synchronized blocks in the write and close methods.
63 private final Object lock
= new Object();
// Underlying channel that receives the buffered blocks.
64 private final FileWriteChannel output
;
// Staging buffer for physical records; the constructor allocates it with BLOCK_SIZE bytes.
65 private ByteBuffer writeBuffer
;
// Sequence key passed to output.write together with writeBuffer; set to null after that write.
66 private String nextSequenceKey
;
// Sequence key remembered by verifySequenceKey for the strictly-ascending comparison.
67 private String lastSequenceKey
;
70 * @param output a {@link FileWriteChannel} to write the record to.
// Sets up the staging buffer: one block of BLOCK_SIZE bytes.
72 public RecordWriteChannelImpl(FileWriteChannel output
) {
74 writeBuffer
= ByteBuffer
.allocate(BLOCK_SIZE
);
// Multi-byte values written with putInt/putShort (the record header fields) are
// therefore encoded little-endian.
75 writeBuffer
.order(ByteOrder
.LITTLE_ENDIAN
);
82 public int write(ByteBuffer data
) throws IOException
{
83 return write(data
, null);
// Reports whether the underlying output channel is open; the answer comes straight from output.
90 public boolean isOpen() {
92 return output
.isOpen();
97 * Verify the sequence key is strictly ascending. On the first non-null key, it will retrieve the
98 * latest sequence key from the file service by issuing a write with no bytes using the lowest
99 * possible sequence key.
101 * @param sequenceKey the sequence key to be checked
102 * @throws IOException if there is a problem retrieving the current sequence key from the file
104 * @throws KeyOrderingException if the key provided is not strictly ascending from prior records
// Checks sequenceKey against lastSequenceKey and then remembers it as the new lastSequenceKey.
106 private void verifySequenceKey(String sequenceKey
) throws IOException
, KeyOrderingException
{
// The ordering check only applies when the caller supplied a key.
107 if (sequenceKey
!= null) {
// Nothing remembered yet: probe the channel with a zero-byte write under the lowest key.
108 if (lastSequenceKey
== null) {
109 ByteBuffer emptyData
= ByteBuffer
.wrap(new byte[0]);
111 output
.write(emptyData
, LOWEST_SEQUENCE_KEY
);
// If the probe is rejected, the exception reports the last good key, which becomes
// the baseline for the comparison below.
112 } catch (KeyOrderingException exception
) {
113 lastSequenceKey
= exception
.getLastGoodSequenceKey();
// compareTo(...) <= 0 means the new key equals or sorts before the remembered key,
// i.e. it is not strictly ascending.
117 if ((lastSequenceKey
!= null) && (sequenceKey
.compareTo(lastSequenceKey
) <= 0)) {
118 throw new KeyOrderingException(null, lastSequenceKey
);
120 lastSequenceKey
= sequenceKey
;
// Splits data into physical records staged in writeBuffer and hands writeBuffer to output
// each time the current block has no space left. Runs while holding lock.
128 public int write(ByteBuffer data
, String sequenceKey
) throws IOException
{
129 synchronized (lock
) {
// Reject keys that are not strictly ascending before touching the buffer.
130 verifySequenceKey(sequenceKey
);
// LOWEST_SEQUENCE_KEY is the key verifySequenceKey uses for its probe write, so it is
// special-cased here.
131 if ((sequenceKey
!= null) && (sequenceKey
.equals(LOWEST_SEQUENCE_KEY
))) {
135 int bytesWritten
= 0;
// Start from the no-arg Record so createRecord sees RecordType.NONE for the first fragment.
136 Record lastRecord
= new Record();
// Each pass emits one physical record into the space left in the current block.
139 int bytesToBlockEnd
= writeBuffer
.remaining();
140 Record currentRecord
= createRecord(data
, bytesToBlockEnd
, lastRecord
);
141 writePhysicalRecord(data
, currentRecord
);
// The running total counts the record header as well as the payload.
142 bytesWritten
+= currentRecord
.getBytes() + HEADER_LENGTH
;
143 lastRecord
= currentRecord
;
// FULL and LAST are the fragment types that end a logical record; only then is the
// caller's key attached to the pending block write.
145 if ((lastRecord
.getType() == RecordType
.FULL
)
146 || (lastRecord
.getType() == RecordType
.LAST
)) {
147 nextSequenceKey
= sequenceKey
;
// A tail shorter than HEADER_LENGTH cannot hold another record header, so it is zero-filled
// and counted in the total.
150 bytesToBlockEnd
= writeBuffer
.remaining();
151 if ((bytesToBlockEnd
< HEADER_LENGTH
) && (bytesToBlockEnd
> 0)) {
152 writeBlanks(bytesToBlockEnd
);
153 bytesWritten
+= bytesToBlockEnd
;
// No space left in the block: pass it to the underlying channel with the pending key,
// then drop the key.
157 if (bytesToBlockEnd
== 0) {
159 output
.write(writeBuffer
, nextSequenceKey
);
161 nextSequenceKey
= null;
// Keep emitting fragments until all of data has been consumed.
163 } while (data
.hasRemaining());
// Calls closeFinally() on the underlying output channel while holding lock.
172 public void closeFinally() throws IllegalStateException
, IOException
{
173 synchronized (lock
) {
175 output
.closeFinally();
180 * Closes and finalizes the {@link RecordWriteChannel}.
// Close entry point; its work is performed while holding lock.
183 public void close() throws IOException
{
184 synchronized (lock
) {
191 * Fills a {@link Record} object with data about the physical record to write.
193 * @param data the users data.
194 * @param bytesToBlockEnd remaining bytes in the current block.
195 * @param lastRecord a {@link Record} representing the last physical record written.
196 * @return the {@link Record} with new write data.
198 private static Record
createRecord(ByteBuffer data
, int bytesToBlockEnd
, Record lastRecord
) {
199 int bytesToDataEnd
= data
.remaining();
200 RecordType type
= RecordType
.UNKNOWN
;
202 if ((lastRecord
.getType() == RecordType
.NONE
)
203 && ((bytesToDataEnd
+ HEADER_LENGTH
) <= bytesToBlockEnd
)) {
204 type
= RecordType
.FULL
;
205 bytes
= bytesToDataEnd
;
206 } else if (lastRecord
.getType() == RecordType
.NONE
) {
207 type
= RecordType
.FIRST
;
208 bytes
= bytesToBlockEnd
- HEADER_LENGTH
;
209 } else if (bytesToDataEnd
<= bytesToBlockEnd
) {
210 type
= RecordType
.LAST
;
211 bytes
= bytesToDataEnd
;
213 type
= RecordType
.MIDDLE
;
214 bytes
= bytesToBlockEnd
- HEADER_LENGTH
;
216 return new Record(type
, bytes
);
220 * This method creates a record inside of a {@link ByteBuffer}
222 * @param data The data to output.
223 * @param record A {@link RecordWriteChannelImpl.Record} object that describes
224 * which data to write.
226 private void writePhysicalRecord(ByteBuffer data
, Record record
) {
227 writeBuffer
.putInt(generateCrc(data
.array(), data
.position(), record
.getBytes(),
229 writeBuffer
.putShort((short) record
.getBytes());
230 writeBuffer
.put(record
.getType().value());
231 writeBuffer
.put(data
.array(), data
.position(), record
.getBytes());
232 data
.position(data
.position() + record
.getBytes());
236 * Fills the {@link ByteBuffer} with 0x00;
238 * @param numBlanks the number of bytes to pad.
240 private void writeBlanks(int numBlanks
) {
241 for (int i
= 0; i
< numBlanks
; i
++) {
242 writeBuffer
.put((byte) 0x00);
247 * Generates a CRC32C checksum using {@link Crc32c} for a specific record.
249 * @param data The user data over which the checksum will be generated.
250 * @param off The offset into the user data at which to begin the computation.
251 * @param len The length of user data to use in the computation.
252 * @param type The {@link RecordType} of the record, which is included in the
254 * @return the masked checksum.
256 private int generateCrc(byte[] data
, int off
, int len
, RecordType type
) {
257 Crc32c crc
= new Crc32c();
258 crc
.update(type
.value());
259 crc
.update(data
, off
, len
);
260 return (int) maskCrc(crc
.getValue());
264 * Closes the stream and adds padding to the end of the block without closing
265 * the underlying {@link AppEngineFile}.
267 * @throws IOException if writing the buffered block to the underlying channel fails.
// Zero-fills the unused tail of the current block and writes writeBuffer to output.
// NOTE(review): how the pad flag gates the padding is not visible in this excerpt — confirm.
269 private void closeStream(boolean pad
) throws IOException
{
270 int bytesToBlockEnd
= writeBuffer
.remaining();
// remaining() below BLOCK_SIZE means part of the current block is already in use.
271 if (bytesToBlockEnd
< BLOCK_SIZE
) {
273 writeBlanks(bytesToBlockEnd
);
// Write the block with whatever sequence key is pending, then drop the pending key.
276 output
.write(writeBuffer
, nextSequenceKey
);
277 nextSequenceKey
= null;