1 // Copyright 2011 Google Inc. All rights reserved.
3 package com
.google
.appengine
.api
.files
;
5 import static com
.google
.appengine
.api
.files
.RecordConstants
.BLOCK_SIZE
;
6 import static com
.google
.appengine
.api
.files
.RecordConstants
.HEADER_LENGTH
;
7 import static com
.google
.appengine
.api
.files
.RecordConstants
.maskCrc
;
9 import com
.google
.appengine
.api
.files
.RecordConstants
.RecordType
;
10 import com
.google
.common
.base
.Preconditions
;
12 import java
.io
.IOException
;
13 import java
.nio
.ByteBuffer
;
14 import java
.nio
.ByteOrder
;
17 * An implementation of a {@link RecordWriteChannel}.
20 final class RecordWriteChannelImpl
implements RecordWriteChannel
{
23 * A class that holds information needed to write a physical record.
// Value holder describing one physical record to emit: its RecordType and a byte count.
// Both fields are final, so an instance cannot change after construction.
// NOTE(review): the embedded numbering skips several lines inside this class (28-29, 31-33,
// 37-41, 45-50, 56-60), so the no-argument constructor header, the field assignments and the
// getter bodies are not visible in this text. Confirm against the full file before editing.
25 private static final class Record
{
26 private final RecordType type
;
27 private final int bytes
;
// Presumably the body of a no-argument constructor (write() calls `new Record()`): the type
// starts out as RecordType.NONE. The matching `bytes` assignment is not visible -- TODO confirm.
30 type
= RecordType
.NONE
;
// Two-argument constructor: rejects RecordType.UNKNOWN and negative byte counts through
// Preconditions.checkArgument before (presumably) storing both values.
34 private Record(RecordType type
, int bytes
) {
35 Preconditions
.checkArgument(type
!= RecordType
.UNKNOWN
);
36 Preconditions
.checkArgument(bytes
>= 0);
42 * Returns the number of bytes that needs to be written.
44 * @return the number of bytes.
51 * Returns the type of record that needs to be written.
// Accessor for the record type; its body is not visible here.
55 RecordType
getType() {
// Channel that receives the encoded bytes; write(), closeFinally() and closeStream() call it.
61 private final FileWriteChannel output
;
// NOTE(review): the embedded numbering skips 62. The `position` field used throughout the class
// is not declared in the visible text and is presumably declared there -- confirm.
// Staging buffer the records are assembled in. Not final because extendBufferIfNecessary()
// replaces it with a newly allocated buffer.
63 private ByteBuffer writeBuffer
;
66 * @param output a {@link FileWriteChannel} to write the record to.
// Creates a record channel on top of the given FileWriteChannel.
// The staging buffer is allocated with BLOCK_SIZE bytes and switched to little-endian order.
// NOTE(review): the embedded numbering jumps from 68 to 71, so two constructor lines are not
// visible -- presumably the initialisation of `output` (and of `position`). Confirm.
68 public RecordWriteChannelImpl(FileWriteChannel output
) {
71 writeBuffer
= ByteBuffer
.allocate(BLOCK_SIZE
);
72 writeBuffer
.order(ByteOrder
.LITTLE_ENDIAN
);
79 public int write(ByteBuffer data
) throws IOException
{
80 return write(data
, null);
87 public boolean isOpen() {
88 return output
.isOpen();
// Encodes `data` into writeBuffer as a sequence of physical records (plus padding where
// createRecord() asks for it) and hands the buffer to the underlying channel.
// Returns whatever output.write(...) returns; the one-argument form of output.write is used
// when sequenceKey is null, the two-argument form otherwise.
// NOTE(review): the embedded numbering skips 96, 100, 104, 106, 110-111, 114 and 116-117.
// 100/104/106/114/116 are evidently `do {`, `} else {` and closing braces. 96 and 110 are
// statements that are not visible (presumably buffer clear/flip calls) -- confirm against the
// full file before restructuring this method.
95 public int write(ByteBuffer data
, String sequenceKey
) throws IOException
{
// Make sure the staging buffer can hold the payload plus all record headers.
97 extendBufferIfNecessary(data
);
// Remember the stream position at entry; `position` is recomputed from it after every fragment.
98 int oldPosition
= position
;
99 Record lastRecord
= new Record();
// Loop body: decide the next fragment, then emit either padding or a physical record.
101 Record currentRecord
= createRecord(data
, lastRecord
);
102 if (currentRecord
.getType() == RecordType
.NONE
) {
103 writeBlanks(currentRecord
.getBytes());
105 writePhysicalRecord(data
, currentRecord
);
// The stream position is the entry position plus everything staged so far.
107 position
= oldPosition
+ writeBuffer
.position();
108 lastRecord
= currentRecord
;
109 } while (data
.hasRemaining());
112 if (sequenceKey
== null) {
113 return output
.write(writeBuffer
);
115 return output
.write(writeBuffer
, sequenceKey
);
// Calls closeFinally() on the underlying output channel.
// NOTE(review): the embedded numbering skips 123, so one statement before the delegation is not
// visible -- presumably a call to closeStream() to pad the final block. Confirm.
122 public void closeFinally() throws IllegalStateException
, IOException
{
124 output
.closeFinally();
128 * Closes and finalizes the {@link RecordWriteChannel}.
// NOTE(review): only the signature of close() is visible; the embedded numbering jumps from 131
// to 137, so the whole body is missing from this text. Presumably it pads the current block and
// closes `output` -- confirm against the full file.
131 public void close() throws IOException
{
137 * Fills a {@link Record} object with data about the physical record to write.
139 * @param data the user's data.
140 * @param lastRecord a {@link Record} representing the last physical record written.
141 * @return the {@link Record} with new write data.
143 private Record
createRecord(ByteBuffer data
, Record lastRecord
) {
144 int bytesToBlockEnd
= BLOCK_SIZE
- (position
% BLOCK_SIZE
);
145 int minBytesToWrite
= data
.limit() + HEADER_LENGTH
- data
.position();
146 RecordType type
= RecordType
.UNKNOWN
;
148 if (bytesToBlockEnd
< HEADER_LENGTH
) {
149 type
= lastRecord
.getType();
150 bytes
= bytesToBlockEnd
;
151 } else if (lastRecord
.getType() == RecordType
.NONE
&& minBytesToWrite
<= bytesToBlockEnd
) {
152 type
= RecordType
.FULL
;
153 bytes
= minBytesToWrite
- HEADER_LENGTH
;
154 } else if (lastRecord
.getType() == RecordType
.NONE
) {
155 type
= RecordType
.FIRST
;
156 bytes
= bytesToBlockEnd
- HEADER_LENGTH
;
157 } else if (minBytesToWrite
<= bytesToBlockEnd
) {
158 type
= RecordType
.LAST
;
159 bytes
= data
.limit() - data
.position();
161 type
= RecordType
.MIDDLE
;
162 bytes
= bytesToBlockEnd
- HEADER_LENGTH
;
164 return new Record(type
, bytes
);
168 * This method creates a record inside of a {@link ByteBuffer}
170 * @param data The data to output.
171 * @param record A {@link RecordWriteChannelImpl.Record} object that describes
172 * which data to write.
174 private void writePhysicalRecord(ByteBuffer data
, Record record
) {
175 writeBuffer
.putInt(generateCrc(data
.array(), data
.position(), record
.getBytes(),
177 writeBuffer
.putShort((short) record
.getBytes());
178 writeBuffer
.put(record
.getType().value());
179 writeBuffer
.put(data
.array(), data
.position(), record
.getBytes());
180 data
.position(data
.position() + record
.getBytes());
184 * Fills the {@link ByteBuffer} with 0x00.
186 * @param numBlanks the number of bytes to pad.
188 private void writeBlanks(int numBlanks
) {
189 for (int i
= 0; i
< numBlanks
; i
++) {
190 writeBuffer
.put((byte) 0x00);
195 * Generates a CRC32C checksum using {@link Crc32c} for a specific record.
197 * @param data The user data over which the checksum will be generated.
198 * @param off The offset into the user data at which to begin the computation.
199 * @param len The length of user data to use in the computation.
200 * @param type The {@link RecordType} of the record, which is included in the
202 * @return the masked checksum.
204 private int generateCrc(byte[] data
, int off
, int len
, RecordType type
) {
205 Crc32c crc
= new Crc32c();
206 crc
.update(type
.value());
207 crc
.update(data
, off
, len
);
208 return (int) maskCrc(crc
.getValue());
211 private void extendBufferIfNecessary(ByteBuffer record
) {
212 int maxNumHeaders
= 1 + (int) Math
.ceil(record
.limit() / (BLOCK_SIZE
- HEADER_LENGTH
));
213 int maxRecordSize
= record
.limit() + maxNumHeaders
* HEADER_LENGTH
;
214 int capacity
= writeBuffer
.capacity();
215 while (capacity
< maxRecordSize
) {
218 writeBuffer
= ByteBuffer
.allocate(capacity
);
219 writeBuffer
.order(ByteOrder
.LITTLE_ENDIAN
);
224 * Closes the stream and adds padding to the end of the block without closing
225 * the underlying {@link AppEngineFile}.
227 * @throws IOException
// Pads the current block with zero bytes and writes the padding to `output`; it does not call
// close on `output` itself.
// NOTE(review): the embedded numbering skips 230, 233-234 and 236. 233-234 are presumably an
// early `return;` plus closing brace for the "already on a block boundary" case. 230 and 236
// are statements that are not visible (presumably buffer clear/flip calls around the padding).
// Confirm against the full file before changing the statement order here.
229 private void closeStream() throws IOException
{
// Bytes left until the next block boundary; equals BLOCK_SIZE when position is already aligned.
231 int bytesToBlockEnd
= BLOCK_SIZE
- (position
% BLOCK_SIZE
);
232 if (bytesToBlockEnd
== BLOCK_SIZE
) {
235 writeBlanks(bytesToBlockEnd
);
// Advance the stream position by the buffer's limit, then push the buffer to the channel.
237 position
+= writeBuffer
.limit();
238 output
.write(writeBuffer
);