1 // Copyright 2011 Google Inc. All rights reserved.
3 package com
.google
.appengine
.api
.files
;
5 import static com
.google
.appengine
.api
.files
.RecordConstants
.BLOCK_SIZE
;
6 import static com
.google
.appengine
.api
.files
.RecordConstants
.HEADER_LENGTH
;
7 import static com
.google
.appengine
.api
.files
.RecordConstants
.maskCrc
;
9 import com
.google
.appengine
.api
.files
.RecordConstants
.RecordType
;
10 import com
.google
.common
.base
.Preconditions
;
12 import java
.io
.IOException
;
13 import java
.nio
.ByteBuffer
;
14 import java
.nio
.ByteOrder
;
17 * An implementation of a {@link RecordWriteChannel}.
20 final class RecordWriteChannelImpl
implements RecordWriteChannel
{
23 * A class that holds information needed to write a physical record.
25 private static final class Record
{
26 private final RecordType type
;
27 private final int bytes
;
30 type
= RecordType
.NONE
;
34 private Record(RecordType type
, int bytes
) {
35 Preconditions
.checkArgument(type
!= RecordType
.UNKNOWN
);
36 Preconditions
.checkArgument(bytes
>= 0);
42 * Returns the number of bytes that needs to be written.
44 * @return the number of bytes.
51 * Returns the type of record that needs to be written.
55 RecordType
getType() {
60 private final Object lock
= new Object();
61 private final FileWriteChannel output
;
63 private ByteBuffer writeBuffer
;
/**
 * Creates a record writer that frames data and forwards it to {@code output}. The staging
 * buffer starts at one block in size and uses little-endian byte order for the header fields.
 *
 * @param output a {@link FileWriteChannel} to write the record to.
 */
public RecordWriteChannelImpl(FileWriteChannel output) {
  // The field is final, so it has to be assigned here.
  this.output = output;
  writeBuffer = ByteBuffer.allocate(BLOCK_SIZE).order(ByteOrder.LITTLE_ENDIAN);
}
79 public int write(ByteBuffer data
) throws IOException
{
81 return write(data
, null);
89 public boolean isOpen() {
91 return output
.isOpen();
// Frames the bytes of `data` as one or more physical records staged in writeBuffer, then
// hands the staged bytes to `output` in a single call, passing `sequenceKey` along when it
// is non-null. The value returned is whatever that output.write call returns.
// NOTE(review): several lines of this method are not visible in this view (the loop opener
// matching `} while`, the `else` arms, and whatever sits between the loop and the final
// write) -- presumably `do {`, `} else {` and a buffer flip; confirm against the full file.
99 public int write(ByteBuffer data
, String sequenceKey
) throws IOException
{
// All of the work below happens while holding `lock`.
100 synchronized (lock
) {
// Replaces writeBuffer with a fresh buffer sized for this record (see extendBufferIfNecessary).
102 extendBufferIfNecessary(data
);
// Remember the offset at entry; `position` is recomputed from it after every physical record.
103 int oldPosition
= position
;
// The no-arg Record has type NONE, which createRecord reads as "no fragment written yet".
104 Record lastRecord
= new Record();
// Decide what goes next at the current block offset.
106 Record currentRecord
= createRecord(data
, lastRecord
);
// A NONE-typed record carries a padding length rather than payload.
107 if (currentRecord
.getType() == RecordType
.NONE
) {
108 writeBlanks(currentRecord
.getBytes());
// Writes the header plus record.getBytes() bytes of payload and advances data's position.
110 writePhysicalRecord(data
, currentRecord
);
// Offset at entry plus everything staged so far in writeBuffer.
112 position
= oldPosition
+ writeBuffer
.position();
113 lastRecord
= currentRecord
;
// Keep emitting physical records until data has been fully consumed.
114 } while (data
.hasRemaining());
// Choose the output.write overload based on whether a sequence key was supplied.
117 if (sequenceKey
== null) {
118 return output
.write(writeBuffer
);
120 return output
.write(writeBuffer
, sequenceKey
);
// Calls output.closeFinally() while holding `lock`.
// NOTE(review): one line of the body is not visible in this view -- presumably a call to
// closeStream() to pad the final block first; confirm against the full file.
128 public void closeFinally() throws IllegalStateException
, IOException
{
129 synchronized (lock
) {
131 output
.closeFinally();
// NOTE(review): only the synchronized opener of this method's body is visible in this view;
// what it does while holding `lock` cannot be determined from here. The summary line below
// says "finalizes", which reads more like closeFinally() -- confirm the wording against the
// full file.
136 * Closes and finalizes the {@link RecordWriteChannel}.
139 public void close() throws IOException
{
140 synchronized (lock
) {
147 * Fills a {@link Record} object with data about the physical record to write.
149 * @param data the users data.
150 * @param lastRecord a {@link Record} representing the last physical record written.
151 * @return the {@link Record} with new write data.
153 private Record
createRecord(ByteBuffer data
, Record lastRecord
) {
154 int bytesToBlockEnd
= BLOCK_SIZE
- (position
% BLOCK_SIZE
);
155 int minBytesToWrite
= data
.limit() + HEADER_LENGTH
- data
.position();
156 RecordType type
= RecordType
.UNKNOWN
;
158 if (bytesToBlockEnd
< HEADER_LENGTH
) {
159 type
= lastRecord
.getType();
160 bytes
= bytesToBlockEnd
;
161 } else if (lastRecord
.getType() == RecordType
.NONE
&& minBytesToWrite
<= bytesToBlockEnd
) {
162 type
= RecordType
.FULL
;
163 bytes
= minBytesToWrite
- HEADER_LENGTH
;
164 } else if (lastRecord
.getType() == RecordType
.NONE
) {
165 type
= RecordType
.FIRST
;
166 bytes
= bytesToBlockEnd
- HEADER_LENGTH
;
167 } else if (minBytesToWrite
<= bytesToBlockEnd
) {
168 type
= RecordType
.LAST
;
169 bytes
= data
.limit() - data
.position();
171 type
= RecordType
.MIDDLE
;
172 bytes
= bytesToBlockEnd
- HEADER_LENGTH
;
174 return new Record(type
, bytes
);
178 * This method creates a record inside of a {@link ByteBuffer}
180 * @param data The data to output.
181 * @param record A {@link RecordWriteChannelImpl.Record} object that describes
182 * which data to write.
184 private void writePhysicalRecord(ByteBuffer data
, Record record
) {
185 writeBuffer
.putInt(generateCrc(data
.array(), data
.position(), record
.getBytes(),
187 writeBuffer
.putShort((short) record
.getBytes());
188 writeBuffer
.put(record
.getType().value());
189 writeBuffer
.put(data
.array(), data
.position(), record
.getBytes());
190 data
.position(data
.position() + record
.getBytes());
194 * Fills the {@link ByteBuffer} with 0x00;
196 * @param numBlanks the number of bytes to pad.
198 private void writeBlanks(int numBlanks
) {
199 for (int i
= 0; i
< numBlanks
; i
++) {
200 writeBuffer
.put((byte) 0x00);
205 * Generates a CRC32C checksum using {@link Crc32c} for a specific record.
207 * @param data The user data over which the checksum will be generated.
208 * @param off The offset into the user data at which to begin the computation.
209 * @param len The length of user data to use in the computation.
210 * @param type The {@link RecordType} of the record, which is included in the
212 * @return the masked checksum.
214 private int generateCrc(byte[] data
, int off
, int len
, RecordType type
) {
215 Crc32c crc
= new Crc32c();
216 crc
.update(type
.value());
217 crc
.update(data
, off
, len
);
218 return (int) maskCrc(crc
.getValue());
221 private void extendBufferIfNecessary(ByteBuffer record
) {
222 int maxNumHeaders
= 1 + (int) Math
.ceil(record
.limit() / (BLOCK_SIZE
- HEADER_LENGTH
));
223 int maxRecordSize
= record
.limit() + maxNumHeaders
* HEADER_LENGTH
;
224 int capacity
= writeBuffer
.capacity();
225 while (capacity
< maxRecordSize
) {
228 writeBuffer
= ByteBuffer
.allocate(capacity
);
229 writeBuffer
.order(ByteOrder
.LITTLE_ENDIAN
);
// Pads the output with zero bytes up to the next BLOCK_SIZE boundary and writes the staged
// bytes to `output`; `output` itself is not closed here.
// NOTE(review): two pieces of this method are not visible in this view -- a statement before
// the first line of the body, and the body of the `== BLOCK_SIZE` check (presumably an early
// return, since nothing needs padding in that case); confirm against the full file.
234 * Closes the stream and adds padding to the end of the block without closing
235 * the underlying {@link AppEngineFile}.
237 * @throws IOException
239 private void closeStream() throws IOException
{
// Distance from the current offset to the end of its block.
241 int bytesToBlockEnd
= BLOCK_SIZE
- (position
% BLOCK_SIZE
);
// Equal to BLOCK_SIZE exactly when position already sits on a block boundary.
242 if (bytesToBlockEnd
== BLOCK_SIZE
) {
// Fill the remainder of the block with 0x00 bytes.
245 writeBlanks(bytesToBlockEnd
);
// Account for the padding in the running offset.
247 position
+= writeBuffer
.limit();
// The value returned by output.write is discarded.
248 output
.write(writeBuffer
);