* Changed nested commits to be lock-free. No support for VArrays at the moment, will...
[jvstm.git] / jvstm / src / main / java / jvstm / ParallelNestedTransaction.java
blob839754391066b7846178f7c1f14e737809fed01e
1 /*
2 * JVSTM: a Java library for Software Transactional Memory
3 * Copyright (C) 2005 INESC-ID Software Engineering Group
4 * http://www.esw.inesc-id.pt
6 * This library is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Lesser General Public
8 * License as published by the Free Software Foundation; either
9 * version 2.1 of the License, or (at your option) any later version.
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Lesser General Public License for more details.
16 * You should have received a copy of the GNU Lesser General Public
17 * License along with this library; if not, write to the Free Software
18 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
20 * Author's contact:
21 * INESC-ID Software Engineering Group
22 * Rua Alves Redol 9
23 * 1000 - 029 Lisboa
24 * Portugal
26 package jvstm;
28 import java.util.HashMap;
29 import java.util.Map;
30 import java.util.concurrent.atomic.AtomicInteger;
32 import jvstm.util.Cons;
34 /**
35 * Parallel Nested Transaction used to represent a part of a transaction that is
36 * running (potentially) in parallel with other subparts of the same
37 * transaction. The programmer is responsible for identifying the parts of a
38 * transaction that he wants to run concurrently. Consequently, those parts may
39 * not run in program order. The only guarantee is that their execution will be
40 * equivalent to some sequential order (plus the properties of opacity). If that
41 * guarantee is already provided by the disjoint accesses of each subpart,
42 * consider using UnsafeParallelTransaction.
44 * @author nmld
47 public class ParallelNestedTransaction extends ReadWriteTransaction {
49 protected ThreadLocal<AtomicInteger> blocksFree = new ThreadLocal<AtomicInteger>() {
50 @Override
51 protected AtomicInteger initialValue() {
52 return new AtomicInteger(0);
56 protected ThreadLocal<Cons<ReadBlock>> blocksPool = new ThreadLocal<Cons<ReadBlock>>() {
57 @Override
58 protected Cons<ReadBlock> initialValue() {
59 return Cons.empty();
63 protected Cons<ReadBlock> globalReads;
64 protected Map<VBox, InplaceWrite> nestedReads;
66 public ParallelNestedTransaction(ReadWriteTransaction parent) {
67 super(parent);
69 int[] parentVers = parent.ancVersions;
70 super.ancVersions = new int[parentVers.length + 1];
71 super.ancVersions[0] = parent.nestedCommitQueue.commitNumber;
72 for (int i = 0; i < parentVers.length; i++) {
73 this.ancVersions[i + 1] = parentVers[i];
76 this.nestedReads = new HashMap<VBox, InplaceWrite>();
77 this.globalReads = Cons.empty();
78 this.boxesWritten = parent.boxesWritten;
/**
 * Alternative constructor that installs the shared empty ancestor-version
 * array and the shared empty map instead of allocating fresh ones.
 *
 * @param parent        the enclosing read-write transaction
 * @param multithreaded not read by this constructor; it only selects this
 *                      overload
 */
public ParallelNestedTransaction(ReadWriteTransaction parent, boolean multithreaded) {
    super(parent);
    this.globalReads = Cons.empty();
    this.nestedReads = EMPTY_MAP;
    super.ancVersions = EMPTY_VERSIONS;
}
88 @Override
89 public Transaction makeUnsafeMultithreaded() {
90 throw new Error("An Unsafe Parallel Transaction may only be spawned by another Unsafe or a Top-Level transaction");
93 @Override
94 public Transaction makeNestedTransaction(boolean readOnly) {
95 throw new Error(
96 "A Parallel Nested Transaction cannot spawn a Linear Nested Transaction yet. Consider using a single Parallel Nested Transaction instead.");
99 @Override
100 protected Transaction commitAndBeginTx(boolean readOnly) {
101 commitTx(true);
102 return beginWithActiveRecord(readOnly, null);
105 // Returns -2 if self; -1 if not anc; >= 0 as version on anc otherwise
106 protected int retrieveAncestorVersion(Transaction tx) {
107 if (tx == this)
108 return -2;
110 int i = 0;
111 Transaction nextParent = parent;
112 while (nextParent != null) {
113 if (nextParent == tx) {
114 return ancVersions[i];
116 nextParent = nextParent.parent;
117 i++;
119 return -1;
122 private Transaction retrieveLowestCommonAncestor(Transaction tx) {
123 Transaction current = tx;
124 while (current != null) {
125 if (retrieveAncestorVersion(current) >= 0) {
126 return current;
128 current = current.parent;
130 return null;
133 @Override
134 protected void abortTx() {
135 if (this.orec.version != OwnershipRecord.ABORTED) {
136 manualAbort();
138 Transaction.current.set(parent);
141 private void manualAbort() {
142 for (ParallelNestedTransaction mergedIntoParent : getRWParent().mergedTxs) {
143 for (VBox vboxMergedIntoParent : mergedIntoParent.boxesWrittenInPlace) {
144 InplaceWrite inplaceWrite = vboxMergedIntoParent.inplace;
145 if (inplaceWrite.orec.owner == this && inplaceWrite.next != null) {
146 InplaceWrite newWriteAtHead = inplaceWrite.next;
147 while (newWriteAtHead.next != null && newWriteAtHead.next.orec.owner == this) {
148 newWriteAtHead = newWriteAtHead.next;
150 vboxMergedIntoParent.inplace = newWriteAtHead;
154 for (VBox vboxMergedIntoParent : getRWParent().boxesWrittenInPlace) {
155 InplaceWrite inplaceWrite = vboxMergedIntoParent.inplace;
156 if (inplaceWrite.orec.owner == this && inplaceWrite.next != null) {
157 InplaceWrite newWriteAtHead = inplaceWrite.next;
158 while (newWriteAtHead.next != null && newWriteAtHead.next.orec.owner == this) {
159 newWriteAtHead = newWriteAtHead.next;
161 vboxMergedIntoParent.inplace = newWriteAtHead;
165 this.orec.version = OwnershipRecord.ABORTED;
166 for (ReadWriteTransaction child : mergedTxs) {
167 child.orec.version = OwnershipRecord.ABORTED;
169 super.boxesWritten = null;
171 int i = 0;
172 for (ReadBlock block : globalReads) {
173 block.free = true;
174 i++;
176 blocksFree.get().addAndGet(i);
178 this.globalReads = null;
179 this.nestedReads = null;
180 super.mergedTxs = null;
183 protected <T> T readGlobal(VBox<T> vbox) {
184 VBoxBody<T> body = vbox.body;
185 if (body.version > number) {
186 throw new EarlyAbortException(body.version);
189 ReadBlock readBlock = null;
190 if (next < 0) {
191 if (blocksFree.get().get() > 0) {
192 for (ReadBlock poolBlock : blocksPool.get()) {
193 if (poolBlock.free) {
194 poolBlock.free = false;
195 readBlock = poolBlock;
196 blocksFree.get().decrementAndGet();
197 break;
200 } else {
201 readBlock = new ReadBlock(blocksFree.get());
203 next = 999;
204 globalReads = globalReads.cons(readBlock);
205 } else {
206 readBlock = globalReads.first();
208 readBlock.entries[next--] = vbox;
209 return body.value;
212 @Override
213 public <T> T getBoxValue(VBox<T> vbox) {
214 InplaceWrite<T> inplaceWrite = vbox.inplace;
215 T value = inplaceWrite.tempValue;
216 OwnershipRecord inplaceOrec = inplaceWrite.orec;
218 if (inplaceOrec.version > 0 && inplaceOrec.version <= number) {
219 value = readGlobal(vbox);
220 return value;
223 do {
224 int entryNestedVersion = inplaceOrec.nestedVersion;
225 int versionOnAnc = retrieveAncestorVersion(inplaceOrec.owner);
226 if (versionOnAnc >= 0) {
227 if (entryNestedVersion > versionOnAnc) {
228 // eager w-r conflict, may restart immediately
229 manualAbort();
230 throw new CommitException(inplaceOrec.owner);
232 nestedReads.put(vbox, inplaceWrite);
233 return (value == NULL_VALUE) ? null : value;
235 if (versionOnAnc == -2) {
236 return (value == NULL_VALUE) ? null : value;
238 inplaceWrite = inplaceWrite.next;
239 if (inplaceWrite == null) {
240 break;
242 value = inplaceWrite.tempValue;
243 inplaceOrec = inplaceWrite.orec;
244 } while (true);
246 if (boxesWritten != EMPTY_MAP) {
247 value = (T) boxesWritten.get(vbox);
248 if (value != null) {
249 return (value == NULL_VALUE) ? null : value;
253 value = readGlobal(vbox);
254 return value;
258 @Override
259 public <T> void setBoxValue(jvstm.VBox<T> vbox, T value) {
260 InplaceWrite<T> inplaceWrite = vbox.inplace;
261 OwnershipRecord currentOwner = inplaceWrite.orec;
262 if (currentOwner.owner == this) { // we are already the current writer
263 inplaceWrite.tempValue = (value == null ? (T) NULL_VALUE : value);
264 return;
267 while (true) {
268 if (currentOwner.version != 0) {
269 if (currentOwner.version <= this.number) {
270 if (inplaceWrite.CASowner(currentOwner, this.orec)) {
271 inplaceWrite.tempValue = (value == null ? (T) NULL_VALUE : value);
272 boxesWrittenInPlace = boxesWrittenInPlace.cons(vbox);
273 return;
275 currentOwner = inplaceWrite.orec;
276 continue;
278 // more recent than my number
279 break;
280 } else {
281 if (retrieveAncestorVersion(currentOwner.owner) >= 0) {
282 if (vbox.CASinplace(inplaceWrite, new InplaceWrite<T>(this.orec, (value == null ? (T) NULL_VALUE : value),
283 inplaceWrite))) {
284 return;
286 inplaceWrite = vbox.inplace;
287 currentOwner = inplaceWrite.orec;
288 continue;
289 } else {
290 Transaction abortUpTo = retrieveLowestCommonAncestor(currentOwner.owner);
291 // owner is not from this nesting tree
292 break;
297 manualAbort();
298 throw new WriteOnRootWriteSetException();
302 * Here we ensure that the array read is consistent with concurrent nested
303 * commits
305 @Override
306 protected <T> T getLocalArrayValue(VArrayEntry<T> entry) {
307 if (this.arrayWrites != EMPTY_MAP) {
308 VArrayEntry<T> wsEntry = (VArrayEntry<T>) this.arrayWrites.get(entry);
309 if (wsEntry != null) {
310 return (wsEntry.getWriteValue() == null ? (T) NULL_VALUE : wsEntry.getWriteValue());
314 ReadWriteTransaction iter = getRWParent();
315 while (iter != null) {
316 if (iter.arrayWrites != EMPTY_MAP) {
317 VArrayEntry<T> wsEntry = (VArrayEntry<T>) iter.arrayWrites.get(entry);
318 if (wsEntry == null) {
319 iter = iter.getRWParent();
320 continue;
323 if (wsEntry.nestedVersion <= retrieveAncestorVersion(iter)) {
324 this.arraysRead = this.arraysRead.cons(entry);
325 entry.setReadOwner(iter);
326 return (wsEntry.getWriteValue() == null ? (T) NULL_VALUE : wsEntry.getWriteValue());
327 } else {
328 throw new CommitException(iter);
331 iter = iter.getRWParent();
334 return null;
337 @Override
338 protected void finish() {
339 boxesWritten = null;
340 perTxValues = null;
341 mergedTxs = null;
344 @Override
345 protected void doCommit() {
346 tryCommit();
347 boxesWritten = null;
348 perTxValues = EMPTY_MAP;
349 mergedTxs = null;
352 @Override
353 protected void cleanUp() {
354 boxesWrittenInPlace = null;
355 nestedReads = null;
356 for (ReadBlock block : globalReads) {
357 block.free = true;
358 block.freeBlocks.incrementAndGet();
360 globalReads = null;
364 protected NestedCommitRecord helpCommitAll(NestedCommitRecord start) {
365 NestedCommitRecord lastSeen = start;
366 NestedCommitRecord current = lastSeen.next.get();
367 while (current != null) {
368 if (!current.recordCommitted) {
369 current.helpCommit();
371 lastSeen = current;
372 current = current.next.get();
374 return lastSeen;
377 @Override
378 protected void tryCommit() {
379 ReadWriteTransaction parent = getRWParent();
381 NestedCommitRecord lastSeen;
382 NestedCommitRecord newCommit;
384 do {
385 lastSeen = helpCommitAll(parent.nestedCommitQueue);
386 snapshotValidation();
387 newCommit = new NestedCommitRecord(this, this.mergedTxs, parent.mergedTxs, lastSeen.commitNumber);
388 } while (!lastSeen.next.compareAndSet(null, newCommit));
390 lastSeen = parent.nestedCommitQueue;
391 while ((lastSeen != null) && (lastSeen.commitNumber <= newCommit.commitNumber)) {
392 lastSeen.helpCommit();
393 lastSeen = lastSeen.next.get();
396 // Validate array reads and propagate them to the parent. Only a
397 // subset is propagated.
398 // At this point this transaction can no longer fail, thus the
399 // propagation is correct.
401 // Not supported at the moment
402 // parent.arraysRead = validateNestedArrayReads();
406 protected void snapshotValidation() {
407 if (retrieveAncestorVersion(parent) == ((ReadWriteTransaction) parent).nestedCommitQueue.commitNumber) {
408 return;
411 for (Map.Entry<VBox, InplaceWrite> read : nestedReads.entrySet()) {
412 validateNestedRead(read);
415 for (ParallelNestedTransaction mergedTx : mergedTxs) {
416 for (Map.Entry<VBox, InplaceWrite> read : mergedTx.nestedReads.entrySet()) {
417 validateNestedRead(read);
421 if (!this.globalReads.isEmpty()) {
422 validateGlobalReads(globalReads, next);
425 for (ParallelNestedTransaction mergedTx : mergedTxs) {
426 if (!mergedTx.globalReads.isEmpty()) {
427 validateGlobalReads(mergedTx.globalReads, mergedTx.next);
434 * Validate a single read that was a read-after-write over some ancestor
435 * write. Iterate over the inplace writes of that VBox: if an entry is found
436 * belonging to an ancestor, it must be the one that it was read, in which
437 * case the search stops.
439 protected void validateNestedRead(Map.Entry<VBox, InplaceWrite> read) {
440 InplaceWrite inplaceRead = read.getValue();
441 InplaceWrite iter = read.getKey().inplace;
442 do {
443 if (iter == inplaceRead) {
444 break;
446 int maxVersion = retrieveAncestorVersion(iter.orec.owner);
447 if (maxVersion >= 0) {
448 manualAbort();
449 throw new CommitException(iter.orec.owner);
451 iter = iter.next;
452 } while (iter != null);
456 * Validate a single read that obtained a VBoxBody Iterate over the inplace
457 * writes of that VBox: no entry may be found that belonged to an ancestor
459 protected void validateGlobalReads(Cons<ReadBlock> reads, int startIdx) {
460 VBox[] array = reads.first().entries;
461 // the first may not be full
462 for (int i = startIdx + 1; i < array.length; i++) {
463 InplaceWrite iter = array[i].inplace;
464 do {
465 int maxVersion = retrieveAncestorVersion(iter.orec.owner);
466 if (maxVersion >= 0) {
467 manualAbort();
468 throw new CommitException(iter.orec.owner);
470 iter = iter.next;
471 } while (iter != null);
474 // the rest are full
475 for (ReadBlock block : reads.rest()) {
476 array = block.entries;
477 for (int i = 0; i < array.length; i++) {
478 InplaceWrite iter = array[i].inplace;
479 do {
480 int maxVersion = retrieveAncestorVersion(iter.orec.owner);
481 if (maxVersion >= 0) {
482 manualAbort();
483 throw new CommitException(iter.orec.owner);
485 iter = iter.next;
486 } while (iter != null);
491 protected Cons<VArrayEntry<?>> validateNestedArrayReads() {
492 Map<VArrayEntry<?>, VArrayEntry<?>> parentArrayWrites = getRWParent().arrayWrites;
493 Cons<VArrayEntry<?>> parentArrayReads = getRWParent().arraysRead;
494 int maxVersionOnParent = retrieveAncestorVersion(parent);
495 for (VArrayEntry<?> entry : arraysRead) {
497 // If the read was performed on an ancestor of the parent, then
498 // propagate it
499 // for further validation
500 if (entry.owner != parent) {
501 parentArrayReads = parentArrayReads.cons(entry);
504 if (parentArrayWrites != EMPTY_MAP) {
505 // Verify if the parent contains a more recent write for the
506 // read that we performed
507 // somewhere in our ancestors
508 VArrayEntry<?> parentWrite = parentArrayWrites.get(entry);
509 if (parentWrite == null) {
510 continue;
512 if (parentWrite.nestedVersion > maxVersionOnParent) {
513 throw new CommitException(parent);
518 return parentArrayReads;