Revision created by MOE tool push_codebase.
[gae.git] / java / src / main / com / google / appengine / api / files / RecordWriteChannelImpl.java
blobb07b36fa53cf7297fba68404375170ca3b2173e5
1 // Copyright 2011 Google Inc. All rights reserved.
3 package com.google.appengine.api.files;
5 import static com.google.appengine.api.files.RecordConstants.BLOCK_SIZE;
6 import static com.google.appengine.api.files.RecordConstants.HEADER_LENGTH;
7 import static com.google.appengine.api.files.RecordConstants.maskCrc;
9 import com.google.appengine.api.files.RecordConstants.RecordType;
10 import com.google.common.base.Preconditions;
12 import java.io.IOException;
13 import java.nio.ByteBuffer;
14 import java.nio.ByteOrder;
16 /**
17 * An implementation of a {@link RecordWriteChannel}.
20 final class RecordWriteChannelImpl implements RecordWriteChannel {
22 /**
23 * A class that holds information needed to write a physical record.
25 private static final class Record {
26 private final RecordType type;
27 private final int bytes;
29 private Record() {
30 type = RecordType.NONE;
31 bytes = 0;
34 private Record(RecordType type, int bytes) {
35 Preconditions.checkArgument(type != RecordType.UNKNOWN);
36 Preconditions.checkArgument(bytes >= 0);
37 this.type = type;
38 this.bytes = bytes;
41 /**
42 * Returns the number of bytes that needs to be written.
44 * @return the number of bytes.
46 int getBytes() {
47 return bytes;
50 /**
51 * Returns the type of record that needs to be written.
53 * @return the type.
55 RecordType getType() {
56 return type;
// Monitor held by write(), isOpen(), close() and closeFinally() while they run.
60 private final Object lock = new Object();
// Underlying channel that receives the framed bytes staged in writeBuffer.
61 private final FileWriteChannel output;
// Count of bytes staged by this channel so far; position % BLOCK_SIZE is the offset inside the
// current block. Starts at 0 - presumably the output is block-aligned on open; TODO confirm.
62 private int position;
// Little-endian staging buffer; replaced by extendBufferIfNecessary() before each write.
63 private ByteBuffer writeBuffer;
65 /**
66 * @param output a {@link FileWriteChannel} to write the record to.
68 public RecordWriteChannelImpl(FileWriteChannel output) {
69 this.output = output;
70 position = 0;
71 writeBuffer = ByteBuffer.allocate(BLOCK_SIZE);
72 writeBuffer.order(ByteOrder.LITTLE_ENDIAN);
75 /**
76 * {@inheritDoc}
78 @Override
79 public int write(ByteBuffer data) throws IOException {
80 synchronized (lock) {
81 return write(data, null);
85 /**
86 * {@inheritDoc}
88 @Override
89 public boolean isOpen() {
90 synchronized (lock) {
91 return output.isOpen();
95 /**
96 * {@inheritDoc}
98 @Override
99 public int write(ByteBuffer data, String sequenceKey) throws IOException {
100 synchronized (lock) {
101 writeBuffer.clear();
102 extendBufferIfNecessary(data);
103 int oldPosition = position;
104 Record lastRecord = new Record();
105 do {
106 Record currentRecord = createRecord(data, lastRecord);
107 if (currentRecord.getType() == RecordType.NONE) {
108 writeBlanks(currentRecord.getBytes());
109 } else {
110 writePhysicalRecord(data, currentRecord);
112 position = oldPosition + writeBuffer.position();
113 lastRecord = currentRecord;
114 } while (data.hasRemaining());
116 writeBuffer.flip();
117 if (sequenceKey == null) {
118 return output.write(writeBuffer);
120 return output.write(writeBuffer, sequenceKey);
125 * {@inheritDoc}
127 @Override
128 public void closeFinally() throws IllegalStateException, IOException {
129 synchronized (lock) {
130 closeStream();
131 output.closeFinally();
136 * Closes and finalizes the {@link RecordWriteChannel}.
138 @Override
139 public void close() throws IOException {
140 synchronized (lock) {
141 closeStream();
142 output.close();
147 * Fills a {@link Record} object with data about the physical record to write.
149 * @param data the users data.
150 * @param lastRecord a {@link Record} representing the last physical record written.
151 * @return the {@link Record} with new write data.
153 private Record createRecord(ByteBuffer data, Record lastRecord) {
154 int bytesToBlockEnd = BLOCK_SIZE - (position % BLOCK_SIZE);
155 int minBytesToWrite = data.limit() + HEADER_LENGTH - data.position();
156 RecordType type = RecordType.UNKNOWN;
157 int bytes = -1;
158 if (bytesToBlockEnd < HEADER_LENGTH) {
159 type = lastRecord.getType();
160 bytes = bytesToBlockEnd;
161 } else if (lastRecord.getType() == RecordType.NONE && minBytesToWrite <= bytesToBlockEnd) {
162 type = RecordType.FULL;
163 bytes = minBytesToWrite - HEADER_LENGTH;
164 } else if (lastRecord.getType() == RecordType.NONE) {
165 type = RecordType.FIRST;
166 bytes = bytesToBlockEnd - HEADER_LENGTH;
167 } else if (minBytesToWrite <= bytesToBlockEnd) {
168 type = RecordType.LAST;
169 bytes = data.limit() - data.position();
170 } else {
171 type = RecordType.MIDDLE;
172 bytes = bytesToBlockEnd - HEADER_LENGTH;
174 return new Record(type, bytes);
178 * This method creates a record inside of a {@link ByteBuffer}
180 * @param data The data to output.
181 * @param record A {@link RecordWriteChannelImpl.Record} object that describes
182 * which data to write.
184 private void writePhysicalRecord(ByteBuffer data, Record record) {
185 writeBuffer.putInt(generateCrc(data.array(), data.position(), record.getBytes(),
186 record.getType()));
187 writeBuffer.putShort((short) record.getBytes());
188 writeBuffer.put(record.getType().value());
189 writeBuffer.put(data.array(), data.position(), record.getBytes());
190 data.position(data.position() + record.getBytes());
194 * Fills the {@link ByteBuffer} with 0x00;
196 * @param numBlanks the number of bytes to pad.
198 private void writeBlanks(int numBlanks) {
199 for (int i = 0; i < numBlanks; i++) {
200 writeBuffer.put((byte) 0x00);
205 * Generates a CRC32C checksum using {@link Crc32c} for a specific record.
207 * @param data The user data over which the checksum will be generated.
208 * @param off The offset into the user data at which to begin the computation.
209 * @param len The length of user data to use in the computation.
210 * @param type The {@link RecordType} of the record, which is included in the
211 * checksum.
212 * @return the masked checksum.
214 private int generateCrc(byte[] data, int off, int len, RecordType type) {
215 Crc32c crc = new Crc32c();
216 crc.update(type.value());
217 crc.update(data, off, len);
218 return (int) maskCrc(crc.getValue());
221 private void extendBufferIfNecessary(ByteBuffer record) {
222 int maxNumHeaders = 1 + (int) Math.ceil(record.limit() / (BLOCK_SIZE - HEADER_LENGTH));
223 int maxRecordSize = record.limit() + maxNumHeaders * HEADER_LENGTH;
224 int capacity = writeBuffer.capacity();
225 while (capacity < maxRecordSize) {
226 capacity *= 2;
228 writeBuffer = ByteBuffer.allocate(capacity);
229 writeBuffer.order(ByteOrder.LITTLE_ENDIAN);
234 * Closes the stream and adds padding to the end of the block without closing
235 * the underlying {@link AppEngineFile}.
237 * @throws IOException
239 private void closeStream() throws IOException {
240 writeBuffer.clear();
241 int bytesToBlockEnd = BLOCK_SIZE - (position % BLOCK_SIZE);
242 if (bytesToBlockEnd == BLOCK_SIZE) {
243 return;
245 writeBlanks(bytesToBlockEnd);
246 writeBuffer.flip();
247 position += writeBuffer.limit();
248 output.write(writeBuffer);