Parallel nested transactions had a bug on the abort procedure.
[jvstm.git] / src / main / java / jvstm / ParallelNestedTransaction.java
blob1f299d603eadc11395527f7a85fcbcede372df71
1 /*
2 * JVSTM: a Java library for Software Transactional Memory
3 * Copyright (C) 2005 INESC-ID Software Engineering Group
4 * http://www.esw.inesc-id.pt
6 * This library is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Lesser General Public
8 * License as published by the Free Software Foundation; either
9 * version 2.1 of the License, or (at your option) any later version.
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Lesser General Public License for more details.
16 * You should have received a copy of the GNU Lesser General Public
17 * License along with this library; if not, write to the Free Software
18 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
20 * Author's contact:
21 * INESC-ID Software Engineering Group
22 * Rua Alves Redol 9
23 * 1000 - 029 Lisboa
24 * Portugal
26 package jvstm;
28 import java.util.HashMap;
29 import java.util.Map;
30 import java.util.concurrent.atomic.AtomicInteger;
32 import jvstm.util.Cons;
34 /**
35 * Parallel Nested Transaction is used to represent a part of a transaction that is
36 * running (potentially) in parallel with other subparts of the same
37 * transaction. The programmer is responsible for identifying the parts of a
38 * transaction that he wants to run concurrently. Consequently, those parts may
39 * not run in program order. The only guarantee is that their execution will be
40 * equivalent to some sequential order (plus the properties of opacity). If that
41 * guarantee is already provided by the disjoint accesses of each subpart,
42 * consider using UnsafeParallelTransaction.
44 * @author nmld
47 public class ParallelNestedTransaction extends ReadWriteTransaction {
49 protected static final ExecuteParallelNestedTxSequentiallyException EXECUTE_SEQUENTIALLY_EXCEPTION = new ExecuteParallelNestedTxSequentiallyException();
51 protected ThreadLocal<AtomicInteger> blocksFree = new ThreadLocal<AtomicInteger>() {
52 @Override
53 protected AtomicInteger initialValue() {
54 return new AtomicInteger(0);
58 protected ThreadLocal<Cons<ReadBlock>> blocksPool = new ThreadLocal<Cons<ReadBlock>>() {
59 @Override
60 protected Cons<ReadBlock> initialValue() {
61 return Cons.empty();
65 protected Cons<ReadBlock> globalReads;
66 protected Map<VBox, InplaceWrite> nestedReads;
68 public ParallelNestedTransaction(ReadWriteTransaction parent) {
69 super(parent);
71 int[] parentVers = parent.ancVersions;
72 super.ancVersions = new int[parentVers.length + 1];
73 super.ancVersions[0] = parent.nestedCommitQueue.commitNumber;
74 for (int i = 0; i < parentVers.length; i++) {
75 this.ancVersions[i + 1] = parentVers[i];
78 this.nestedReads = new HashMap<VBox, InplaceWrite>();
79 this.globalReads = Cons.empty();
80 this.boxesWritten = parent.boxesWritten;
83 public ParallelNestedTransaction(ReadWriteTransaction parent, boolean multithreaded) {
84 super(parent);
85 super.ancVersions = EMPTY_VERSIONS;
86 this.nestedReads = ReadWriteTransaction.EMPTY_MAP;
87 this.globalReads = Cons.empty();
90 @Override
91 public Transaction makeUnsafeMultithreaded() {
92 throw new Error("An Unsafe Parallel Transaction may only be spawned by another Unsafe or a Top-Level transaction");
95 @Override
96 public Transaction makeNestedTransaction(boolean readOnly) {
97 throw new Error(
98 "A Parallel Nested Transaction cannot spawn a Linear Nested Transaction yet. Consider using a single Parallel Nested Transaction instead.");
101 @Override
102 protected Transaction commitAndBeginTx(boolean readOnly) {
103 commitTx(true);
104 return beginWithActiveRecord(readOnly, null);
107 // Returns -2 if self; -1 if not anc; >= 0 as version on anc otherwise
108 protected int retrieveAncestorVersion(Transaction tx) {
109 if (tx == this)
110 return -2;
112 int i = 0;
113 Transaction nextParent = parent;
114 while (nextParent != null) {
115 if (nextParent == tx) {
116 return ancVersions[i];
118 nextParent = nextParent.parent;
119 i++;
121 return -1;
124 private Transaction retrieveLowestCommonAncestor(Transaction tx) {
125 Transaction current = tx;
126 while (current != null) {
127 if (retrieveAncestorVersion(current) >= 0) {
128 return current;
130 current = current.parent;
132 return null;
135 @Override
136 public void abortTx() {
137 if (this.orec.version != OwnershipRecord.ABORTED) {
138 manualAbort();
140 Transaction.current.set(parent);
143 private void manualAbort() {
144 ReadWriteTransaction parent = getRWParent();
145 while (parent != null) {
146 for (ParallelNestedTransaction mergedIntoParent : parent.mergedTxs) {
147 for (VBox vboxMergedIntoParent : mergedIntoParent.boxesWrittenInPlace) {
148 revertOverwrite(vboxMergedIntoParent);
151 for (VBox vboxMergedIntoParent : parent.boxesWrittenInPlace) {
152 revertOverwrite(vboxMergedIntoParent);
154 parent = parent.getRWParent();
157 this.orec.version = OwnershipRecord.ABORTED;
158 for (ReadWriteTransaction child : mergedTxs) {
159 child.orec.version = OwnershipRecord.ABORTED;
161 super.boxesWritten = null;
163 int i = 0;
164 for (ReadBlock block : globalReads) {
165 block.free = true;
166 i++;
168 blocksFree.get().addAndGet(i);
170 this.globalReads = null;
171 this.nestedReads = null;
172 super.mergedTxs = null;
175 protected void revertOverwrite(VBox vboxWritten) {
176 InplaceWrite write = vboxWritten.inplace;
177 if (write.orec.owner != this) {
178 return;
180 InplaceWrite overwritten = write;
181 while (overwritten.next != null) {
182 overwritten = overwritten.next;
183 if (overwritten.orec.owner != this && overwritten.orec.version == OwnershipRecord.RUNNING) {
184 write.tempValue = overwritten.tempValue;
185 write.next = overwritten.next;
186 overwritten.orec.owner = overwritten.orec.owner; // enforce
187 // visibility
188 write.orec = overwritten.orec;
189 return;
194 protected <T> T readGlobal(VBox<T> vbox) {
195 VBoxBody<T> body = vbox.body;
196 if (body.version > number) {
197 TransactionSignaller.SIGNALLER.signalEarlyAbort();
200 ReadBlock readBlock = null;
201 if (next < 0) {
202 if (blocksFree.get().get() > 0) {
203 for (ReadBlock poolBlock : blocksPool.get()) {
204 if (poolBlock.free) {
205 poolBlock.free = false;
206 readBlock = poolBlock;
207 blocksFree.get().decrementAndGet();
208 break;
211 } else {
212 readBlock = new ReadBlock(blocksFree.get());
214 next = 999;
215 globalReads = globalReads.cons(readBlock);
216 } else {
217 readBlock = globalReads.first();
219 readBlock.entries[next--] = vbox;
220 return body.value;
223 @Override
224 public <T> T getBoxValue(VBox<T> vbox) {
225 InplaceWrite<T> inplaceWrite = vbox.inplace;
226 T value = inplaceWrite.tempValue;
227 OwnershipRecord inplaceOrec = inplaceWrite.orec;
229 if (inplaceOrec.version > 0 && inplaceOrec.version <= number) {
230 value = readGlobal(vbox);
231 return value;
234 do {
235 int entryNestedVersion = inplaceOrec.nestedVersion;
236 int versionOnAnc = retrieveAncestorVersion(inplaceOrec.owner);
237 if (versionOnAnc >= 0) {
238 if (entryNestedVersion > versionOnAnc) {
239 // eager w-r conflict, may restart immediately
240 manualAbort();
241 TransactionSignaller.SIGNALLER.signalCommitFail(inplaceOrec.owner);
243 nestedReads.put(vbox, inplaceWrite);
244 return (value == NULL_VALUE) ? null : value;
246 if (versionOnAnc == -2) {
247 return (value == NULL_VALUE) ? null : value;
249 inplaceWrite = inplaceWrite.next;
250 if (inplaceWrite == null) {
251 break;
253 value = inplaceWrite.tempValue;
254 inplaceOrec = inplaceWrite.orec;
255 } while (true);
257 if (boxesWritten != EMPTY_MAP) {
258 value = (T) boxesWritten.get(vbox);
259 if (value != null) {
260 return (value == NULL_VALUE) ? null : value;
264 value = readGlobal(vbox);
265 return value;
269 @Override
270 public <T> void setBoxValue(jvstm.VBox<T> vbox, T value) {
271 InplaceWrite<T> inplaceWrite = vbox.inplace;
272 OwnershipRecord currentOwner = inplaceWrite.orec;
273 if (currentOwner.owner == this) { // we are already the current writer
274 inplaceWrite.tempValue = (value == null ? (T) NULL_VALUE : value);
275 return;
278 while (true) {
279 if (currentOwner.version != 0) {
280 if (currentOwner.version <= this.number) {
281 if (inplaceWrite.CASowner(currentOwner, this.orec)) {
282 inplaceWrite.tempValue = (value == null ? (T) NULL_VALUE : value);
283 boxesWrittenInPlace = boxesWrittenInPlace.cons(vbox);
284 return;
286 currentOwner = inplaceWrite.orec;
287 continue;
289 // more recent than my number
290 break;
291 } else {
292 if (retrieveAncestorVersion(currentOwner.owner) >= 0) {
293 if (vbox.CASinplace(inplaceWrite, new InplaceWrite<T>(this.orec, (value == null ? (T) NULL_VALUE : value),
294 inplaceWrite))) {
295 return;
297 inplaceWrite = vbox.inplace;
298 currentOwner = inplaceWrite.orec;
299 continue;
301 break;
305 manualAbort();
306 throw EXECUTE_SEQUENTIALLY_EXCEPTION;
310 * VArrays:
311 * Here we ensure that the local array read over ancestors is consistent with concurrent nested commits
312 * This procedure is blocking, accordingly to the support provided to VArrays.
314 @Override
315 protected <T> T getLocalArrayValue(VArrayEntry<T> entry) {
316 if (this.arrayWrites != EMPTY_MAP) {
317 VArrayEntry<T> wsEntry = (VArrayEntry<T>) this.arrayWrites.get(entry);
318 if (wsEntry != null) {
319 return (wsEntry.getWriteValue() == null ? (T) NULL_VALUE : wsEntry.getWriteValue());
323 ReadWriteTransaction iter = getRWParent();
324 while (iter != null) {
325 synchronized (iter) {
326 if (iter.arrayWrites != EMPTY_MAP) {
327 VArrayEntry<T> wsEntry = (VArrayEntry<T>) iter.arrayWrites.get(entry);
328 if (wsEntry == null) {
329 iter = iter.getRWParent();
330 continue;
333 if (wsEntry.nestedVersion <= retrieveAncestorVersion(iter)) {
334 this.arraysRead = this.arraysRead.cons(entry);
335 entry.setReadOwner(iter);
336 return (wsEntry.getWriteValue() == null ? (T) NULL_VALUE : wsEntry.getWriteValue());
337 } else {
338 TransactionSignaller.SIGNALLER.signalCommitFail(iter);
342 iter = iter.getRWParent();
345 return null;
349 * Both parallel nested transactions and perTxBoxes may be seen as alternatives to work
350 * around inherently-conflicting workloads. An important question may be posed if we put
351 * them together: when should a perTxBox be committed, if write by a parallel nested
352 * transaction? Is it solving a conflict at top-level, or nested level of parallelism?
354 * Should the need for perTxBoxes arise in parNesting, that question shall have to be addressed.
356 @Override
357 public <T> T getPerTxValue(PerTxBox<T> box, T initial) {
358 throw new RuntimeException("Parallel Nested Transactions do not support PerTxBoxes");
361 @Override
362 public <T> void setPerTxValue(PerTxBox<T> box, T value) {
363 throw new RuntimeException("Parallel Nested Transactions do not support PerTxBoxes");
366 @Override
367 protected void finish() {
368 boxesWritten = null;
369 perTxValues = null;
370 mergedTxs = null;
373 @Override
374 protected void doCommit() {
375 tryCommit();
376 boxesWritten = null;
377 perTxValues = EMPTY_MAP;
378 mergedTxs = null;
381 @Override
382 protected void cleanUp() {
383 boxesWrittenInPlace = null;
384 nestedReads = null;
385 for (ReadBlock block : globalReads) {
386 block.free = true;
387 block.freeBlocks.incrementAndGet();
389 globalReads = null;
393 protected NestedCommitRecord helpCommitAll(NestedCommitRecord start) {
394 NestedCommitRecord lastSeen = start;
395 NestedCommitRecord current = lastSeen.next.get();
396 while (current != null) {
397 if (!current.recordCommitted) {
398 current.helpCommit();
400 lastSeen = current;
401 current = current.next.get();
403 return lastSeen;
406 @Override
407 protected void tryCommit() {
408 ReadWriteTransaction parent = getRWParent();
409 NestedCommitRecord lastSeen;
410 NestedCommitRecord newCommit;
412 do {
413 lastSeen = helpCommitAll(parent.nestedCommitQueue);
414 snapshotValidation(lastSeen.commitNumber);
415 Cons<VArrayEntry<?>> varrayReadsToPropagate = validateNestedArrayReads();
416 newCommit = new NestedCommitRecord(this, this.mergedTxs, parent.mergedTxs, varrayReadsToPropagate, arrayWrites, arrayWritesCount, lastSeen.commitNumber + 1);
417 } while (!lastSeen.next.compareAndSet(null, newCommit));
419 lastSeen = parent.nestedCommitQueue;
420 while ((lastSeen != null) && (lastSeen.commitNumber <= newCommit.commitNumber)) {
421 if (!lastSeen.recordCommitted) {
422 lastSeen.helpCommit();
423 parent.nestedCommitQueue = lastSeen;
425 lastSeen = lastSeen.next.get();
430 @Override
431 protected void snapshotValidation(int lastSeenNumber) {
432 if (retrieveAncestorVersion(parent) == lastSeenNumber) {
433 return;
436 for (Map.Entry<VBox, InplaceWrite> read : nestedReads.entrySet()) {
437 validateNestedRead(read);
440 for (ParallelNestedTransaction mergedTx : mergedTxs) {
441 for (Map.Entry<VBox, InplaceWrite> read : mergedTx.nestedReads.entrySet()) {
442 validateNestedRead(read);
446 if (!this.globalReads.isEmpty()) {
447 validateGlobalReads(globalReads, next);
450 for (ParallelNestedTransaction mergedTx : mergedTxs) {
451 if (!mergedTx.globalReads.isEmpty()) {
452 validateGlobalReads(mergedTx.globalReads, mergedTx.next);
459 * Validate a single read that was a read-after-write over some ancestor
460 * write. Iterate over the inplace writes of that VBox: if an entry is found
461 * belonging to an ancestor, it must be the one that it was read, in which
462 * case the search stops.
464 protected void validateNestedRead(Map.Entry<VBox, InplaceWrite> read) {
465 InplaceWrite inplaceRead = read.getValue();
466 InplaceWrite iter = read.getKey().inplace;
467 do {
468 if (iter == inplaceRead) {
469 break;
471 int maxVersion = retrieveAncestorVersion(iter.orec.owner);
472 if (maxVersion >= 0) {
473 manualAbort();
474 TransactionSignaller.SIGNALLER.signalCommitFail(iter.orec.owner);
476 iter = iter.next;
477 } while (iter != null);
481 * Validate a single read that obtained a VBoxBody Iterate over the inplace
482 * writes of that VBox: no entry may be found that belonged to an ancestor
484 protected void validateGlobalReads(Cons<ReadBlock> reads, int startIdx) {
485 VBox[] array = reads.first().entries;
486 // the first may not be full
487 for (int i = startIdx + 1; i < array.length; i++) {
488 InplaceWrite iter = array[i].inplace;
489 do {
490 int maxVersion = retrieveAncestorVersion(iter.orec.owner);
491 if (maxVersion >= 0) {
492 manualAbort();
493 TransactionSignaller.SIGNALLER.signalCommitFail(iter.orec.owner);
495 iter = iter.next;
496 } while (iter != null);
499 // the rest are full
500 for (ReadBlock block : reads.rest()) {
501 array = block.entries;
502 for (int i = 0; i < array.length; i++) {
503 InplaceWrite iter = array[i].inplace;
504 do {
505 int maxVersion = retrieveAncestorVersion(iter.orec.owner);
506 if (maxVersion >= 0) {
507 manualAbort();
508 TransactionSignaller.SIGNALLER.signalCommitFail(iter.orec.owner);
510 iter = iter.next;
511 } while (iter != null);
516 protected Cons<VArrayEntry<?>> validateNestedArrayReads() {
517 Map<VArrayEntry<?>, VArrayEntry<?>> parentArrayWrites = getRWParent().arrayWrites;
518 Cons<VArrayEntry<?>> parentArrayReads = getRWParent().arraysRead;
519 int maxVersionOnParent = retrieveAncestorVersion(parent);
520 for (VArrayEntry<?> entry : arraysRead) {
522 // If the read was performed on an ancestor of the parent, then
523 // propagate it for further validation
524 if (entry.owner != parent) {
525 parentArrayReads = parentArrayReads.cons(entry);
528 synchronized (parent) {
529 if (parentArrayWrites != EMPTY_MAP) {
530 // Verify if the parent contains a more recent write for the
531 // read that we performed somewhere in our ancestors
532 VArrayEntry<?> parentWrite = parentArrayWrites.get(entry);
533 if (parentWrite == null) {
534 continue;
536 if (parentWrite.nestedVersion > maxVersionOnParent) {
537 TransactionSignaller.SIGNALLER.signalCommitFail(parent);
543 return parentArrayReads;