2 * JVSTM: a Java library for Software Transactional Memory
3 * Copyright (C) 2005 INESC-ID Software Engineering Group
4 * http://www.esw.inesc-id.pt
6 * This library is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Lesser General Public
8 * License as published by the Free Software Foundation; either
9 * version 2.1 of the License, or (at your option) any later version.
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Lesser General Public License for more details.
16 * You should have received a copy of the GNU Lesser General Public
17 * License along with this library; if not, write to the Free Software
18 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
21 * INESC-ID Software Engineering Group
28 import java
.util
.HashMap
;
30 import java
.util
.concurrent
.atomic
.AtomicInteger
;
32 import jvstm
.util
.Cons
;
35 * Parallel Nested Transaction used to represent a part of a transaction that is
36 * running (potentially) in parallel with other subparts of the same
37 * transaction. The programmer is responsible for identifying the parts of a
38 * transaction that he wants to run concurrently. Consequently, those parts may
39 * not run in program order. The only guarantee is that their execution will be
40 * equivalent to some sequential order (plus the properties of opacity). If that
41 * guarantee is already provided by the disjoint accesses of each subpart,
42 * consider using UnsafeParallelTransaction.
47 public class ParallelNestedTransaction
extends ReadWriteTransaction
{
49 protected ThreadLocal
<AtomicInteger
> blocksFree
= new ThreadLocal
<AtomicInteger
>() {
51 protected AtomicInteger
initialValue() {
52 return new AtomicInteger(0);
56 protected ThreadLocal
<Cons
<ReadBlock
>> blocksPool
= new ThreadLocal
<Cons
<ReadBlock
>>() {
58 protected Cons
<ReadBlock
> initialValue() {
63 protected Cons
<ReadBlock
> globalReads
;
64 protected Map
<VBox
, InplaceWrite
> nestedReads
;
66 public ParallelNestedTransaction(ReadWriteTransaction parent
) {
69 int[] parentVers
= parent
.ancVersions
;
70 super.ancVersions
= new int[parentVers
.length
+ 1];
71 super.ancVersions
[0] = parent
.nestedCommitQueue
.commitNumber
;
72 for (int i
= 0; i
< parentVers
.length
; i
++) {
73 this.ancVersions
[i
+ 1] = parentVers
[i
];
76 this.nestedReads
= new HashMap
<VBox
, InplaceWrite
>();
77 this.globalReads
= Cons
.empty();
78 this.boxesWritten
= parent
.boxesWritten
;
81 public ParallelNestedTransaction(ReadWriteTransaction parent
, boolean multithreaded
) {
83 super.ancVersions
= EMPTY_VERSIONS
;
84 this.nestedReads
= ReadWriteTransaction
.EMPTY_MAP
;
85 this.globalReads
= Cons
.empty();
89 public Transaction
makeUnsafeMultithreaded() {
90 throw new Error("An Unsafe Parallel Transaction may only be spawned by another Unsafe or a Top-Level transaction");
94 public Transaction
makeNestedTransaction(boolean readOnly
) {
96 "A Parallel Nested Transaction cannot spawn a Linear Nested Transaction yet. Consider using a single Parallel Nested Transaction instead.");
100 protected Transaction
commitAndBeginTx(boolean readOnly
) {
102 return beginWithActiveRecord(readOnly
, null);
105 // Returns -2 if self; -1 if not anc; >= 0 as version on anc otherwise
106 protected int retrieveAncestorVersion(Transaction tx
) {
111 Transaction nextParent
= parent
;
112 while (nextParent
!= null) {
113 if (nextParent
== tx
) {
114 return ancVersions
[i
];
116 nextParent
= nextParent
.parent
;
122 private Transaction
retrieveLowestCommonAncestor(Transaction tx
) {
123 Transaction current
= tx
;
124 while (current
!= null) {
125 if (retrieveAncestorVersion(current
) >= 0) {
128 current
= current
.parent
;
134 protected void abortTx() {
135 if (this.orec
.version
!= OwnershipRecord
.ABORTED
) {
138 Transaction
.current
.set(parent
);
141 private void manualAbort() {
142 for (ParallelNestedTransaction mergedIntoParent
: getRWParent().mergedTxs
) {
143 for (VBox vboxMergedIntoParent
: mergedIntoParent
.boxesWrittenInPlace
) {
144 InplaceWrite inplaceWrite
= vboxMergedIntoParent
.inplace
;
145 if (inplaceWrite
.orec
.owner
== this && inplaceWrite
.next
!= null) {
146 InplaceWrite newWriteAtHead
= inplaceWrite
.next
;
147 while (newWriteAtHead
.next
!= null && newWriteAtHead
.next
.orec
.owner
== this) {
148 newWriteAtHead
= newWriteAtHead
.next
;
150 vboxMergedIntoParent
.inplace
= newWriteAtHead
;
154 for (VBox vboxMergedIntoParent
: getRWParent().boxesWrittenInPlace
) {
155 InplaceWrite inplaceWrite
= vboxMergedIntoParent
.inplace
;
156 if (inplaceWrite
.orec
.owner
== this && inplaceWrite
.next
!= null) {
157 InplaceWrite newWriteAtHead
= inplaceWrite
.next
;
158 while (newWriteAtHead
.next
!= null && newWriteAtHead
.next
.orec
.owner
== this) {
159 newWriteAtHead
= newWriteAtHead
.next
;
161 vboxMergedIntoParent
.inplace
= newWriteAtHead
;
165 this.orec
.version
= OwnershipRecord
.ABORTED
;
166 for (ReadWriteTransaction child
: mergedTxs
) {
167 child
.orec
.version
= OwnershipRecord
.ABORTED
;
169 super.boxesWritten
= null;
172 for (ReadBlock block
: globalReads
) {
176 blocksFree
.get().addAndGet(i
);
178 this.globalReads
= null;
179 this.nestedReads
= null;
180 super.mergedTxs
= null;
183 protected <T
> T
readGlobal(VBox
<T
> vbox
) {
184 VBoxBody
<T
> body
= vbox
.body
;
185 if (body
.version
> number
) {
186 throw new EarlyAbortException(body
.version
);
189 ReadBlock readBlock
= null;
191 if (blocksFree
.get().get() > 0) {
192 for (ReadBlock poolBlock
: blocksPool
.get()) {
193 if (poolBlock
.free
) {
194 poolBlock
.free
= false;
195 readBlock
= poolBlock
;
196 blocksFree
.get().decrementAndGet();
201 readBlock
= new ReadBlock(blocksFree
.get());
204 globalReads
= globalReads
.cons(readBlock
);
206 readBlock
= globalReads
.first();
208 readBlock
.entries
[next
--] = vbox
;
213 public <T
> T
getBoxValue(VBox
<T
> vbox
) {
214 InplaceWrite
<T
> inplaceWrite
= vbox
.inplace
;
215 T value
= inplaceWrite
.tempValue
;
216 OwnershipRecord inplaceOrec
= inplaceWrite
.orec
;
218 if (inplaceOrec
.version
> 0 && inplaceOrec
.version
<= number
) {
219 value
= readGlobal(vbox
);
224 int entryNestedVersion
= inplaceOrec
.nestedVersion
;
225 int versionOnAnc
= retrieveAncestorVersion(inplaceOrec
.owner
);
226 if (versionOnAnc
>= 0) {
227 if (entryNestedVersion
> versionOnAnc
) {
228 // eager w-r conflict, may restart immediately
230 throw new CommitException(inplaceOrec
.owner
);
232 nestedReads
.put(vbox
, inplaceWrite
);
233 return (value
== NULL_VALUE
) ?
null : value
;
235 if (versionOnAnc
== -2) {
236 return (value
== NULL_VALUE
) ?
null : value
;
238 inplaceWrite
= inplaceWrite
.next
;
239 if (inplaceWrite
== null) {
242 value
= inplaceWrite
.tempValue
;
243 inplaceOrec
= inplaceWrite
.orec
;
246 if (boxesWritten
!= EMPTY_MAP
) {
247 value
= (T
) boxesWritten
.get(vbox
);
249 return (value
== NULL_VALUE
) ?
null : value
;
253 value
= readGlobal(vbox
);
259 public <T
> void setBoxValue(jvstm
.VBox
<T
> vbox
, T value
) {
260 InplaceWrite
<T
> inplaceWrite
= vbox
.inplace
;
261 OwnershipRecord currentOwner
= inplaceWrite
.orec
;
262 if (currentOwner
.owner
== this) { // we are already the current writer
263 inplaceWrite
.tempValue
= (value
== null ?
(T
) NULL_VALUE
: value
);
268 if (currentOwner
.version
!= 0) {
269 if (currentOwner
.version
<= this.number
) {
270 if (inplaceWrite
.CASowner(currentOwner
, this.orec
)) {
271 inplaceWrite
.tempValue
= (value
== null ?
(T
) NULL_VALUE
: value
);
272 boxesWrittenInPlace
= boxesWrittenInPlace
.cons(vbox
);
275 currentOwner
= inplaceWrite
.orec
;
278 // more recent than my number
281 if (retrieveAncestorVersion(currentOwner
.owner
) >= 0) {
282 if (vbox
.CASinplace(inplaceWrite
, new InplaceWrite
<T
>(this.orec
, (value
== null ?
(T
) NULL_VALUE
: value
),
286 inplaceWrite
= vbox
.inplace
;
287 currentOwner
= inplaceWrite
.orec
;
290 Transaction abortUpTo
= retrieveLowestCommonAncestor(currentOwner
.owner
);
291 // owner is not from this nesting tree
298 throw new WriteOnRootWriteSetException();
302 * Here we ensure that the array read is consistent with concurrent nested
306 protected <T
> T
getLocalArrayValue(VArrayEntry
<T
> entry
) {
307 if (this.arrayWrites
!= EMPTY_MAP
) {
308 VArrayEntry
<T
> wsEntry
= (VArrayEntry
<T
>) this.arrayWrites
.get(entry
);
309 if (wsEntry
!= null) {
310 return (wsEntry
.getWriteValue() == null ?
(T
) NULL_VALUE
: wsEntry
.getWriteValue());
314 ReadWriteTransaction iter
= getRWParent();
315 while (iter
!= null) {
316 if (iter
.arrayWrites
!= EMPTY_MAP
) {
317 VArrayEntry
<T
> wsEntry
= (VArrayEntry
<T
>) iter
.arrayWrites
.get(entry
);
318 if (wsEntry
== null) {
319 iter
= iter
.getRWParent();
323 if (wsEntry
.nestedVersion
<= retrieveAncestorVersion(iter
)) {
324 this.arraysRead
= this.arraysRead
.cons(entry
);
325 entry
.setReadOwner(iter
);
326 return (wsEntry
.getWriteValue() == null ?
(T
) NULL_VALUE
: wsEntry
.getWriteValue());
328 throw new CommitException(iter
);
331 iter
= iter
.getRWParent();
338 protected void finish() {
345 protected void doCommit() {
348 perTxValues
= EMPTY_MAP
;
353 protected void cleanUp() {
354 boxesWrittenInPlace
= null;
356 for (ReadBlock block
: globalReads
) {
358 block
.freeBlocks
.incrementAndGet();
364 protected NestedCommitRecord
helpCommitAll(NestedCommitRecord start
) {
365 NestedCommitRecord lastSeen
= start
;
366 NestedCommitRecord current
= lastSeen
.next
.get();
367 while (current
!= null) {
368 if (!current
.recordCommitted
) {
369 current
.helpCommit();
372 current
= current
.next
.get();
378 protected void tryCommit() {
379 ReadWriteTransaction parent
= getRWParent();
381 NestedCommitRecord lastSeen
;
382 NestedCommitRecord newCommit
;
385 lastSeen
= helpCommitAll(parent
.nestedCommitQueue
);
386 snapshotValidation();
387 newCommit
= new NestedCommitRecord(this, this.mergedTxs
, parent
.mergedTxs
, lastSeen
.commitNumber
);
388 } while (!lastSeen
.next
.compareAndSet(null, newCommit
));
390 lastSeen
= parent
.nestedCommitQueue
;
391 while ((lastSeen
!= null) && (lastSeen
.commitNumber
<= newCommit
.commitNumber
)) {
392 lastSeen
.helpCommit();
393 lastSeen
= lastSeen
.next
.get();
396 // Validate array reads and propagate them to the parent. Only a
397 // subset is propagated.
398 // At this point this transaction can no longer fail, thus the
399 // propagation is correct.
401 // Not supported at the moment
402 // parent.arraysRead = validateNestedArrayReads();
406 protected void snapshotValidation() {
407 if (retrieveAncestorVersion(parent
) == ((ReadWriteTransaction
) parent
).nestedCommitQueue
.commitNumber
) {
411 for (Map
.Entry
<VBox
, InplaceWrite
> read
: nestedReads
.entrySet()) {
412 validateNestedRead(read
);
415 for (ParallelNestedTransaction mergedTx
: mergedTxs
) {
416 for (Map
.Entry
<VBox
, InplaceWrite
> read
: mergedTx
.nestedReads
.entrySet()) {
417 validateNestedRead(read
);
421 if (!this.globalReads
.isEmpty()) {
422 validateGlobalReads(globalReads
, next
);
425 for (ParallelNestedTransaction mergedTx
: mergedTxs
) {
426 if (!mergedTx
.globalReads
.isEmpty()) {
427 validateGlobalReads(mergedTx
.globalReads
, mergedTx
.next
);
434 * Validate a single read that was a read-after-write over some ancestor
435 * write. Iterate over the inplace writes of that VBox: if an entry is found
436 * belonging to an ancestor, it must be the one that it was read, in which
437 * case the search stops.
439 protected void validateNestedRead(Map
.Entry
<VBox
, InplaceWrite
> read
) {
440 InplaceWrite inplaceRead
= read
.getValue();
441 InplaceWrite iter
= read
.getKey().inplace
;
443 if (iter
== inplaceRead
) {
446 int maxVersion
= retrieveAncestorVersion(iter
.orec
.owner
);
447 if (maxVersion
>= 0) {
449 throw new CommitException(iter
.orec
.owner
);
452 } while (iter
!= null);
456 * Validate a single read that obtained a VBoxBody Iterate over the inplace
457 * writes of that VBox: no entry may be found that belonged to an ancestor
459 protected void validateGlobalReads(Cons
<ReadBlock
> reads
, int startIdx
) {
460 VBox
[] array
= reads
.first().entries
;
461 // the first may not be full
462 for (int i
= startIdx
+ 1; i
< array
.length
; i
++) {
463 InplaceWrite iter
= array
[i
].inplace
;
465 int maxVersion
= retrieveAncestorVersion(iter
.orec
.owner
);
466 if (maxVersion
>= 0) {
468 throw new CommitException(iter
.orec
.owner
);
471 } while (iter
!= null);
475 for (ReadBlock block
: reads
.rest()) {
476 array
= block
.entries
;
477 for (int i
= 0; i
< array
.length
; i
++) {
478 InplaceWrite iter
= array
[i
].inplace
;
480 int maxVersion
= retrieveAncestorVersion(iter
.orec
.owner
);
481 if (maxVersion
>= 0) {
483 throw new CommitException(iter
.orec
.owner
);
486 } while (iter
!= null);
491 protected Cons
<VArrayEntry
<?
>> validateNestedArrayReads() {
492 Map
<VArrayEntry
<?
>, VArrayEntry
<?
>> parentArrayWrites
= getRWParent().arrayWrites
;
493 Cons
<VArrayEntry
<?
>> parentArrayReads
= getRWParent().arraysRead
;
494 int maxVersionOnParent
= retrieveAncestorVersion(parent
);
495 for (VArrayEntry
<?
> entry
: arraysRead
) {
497 // If the read was performed on an ancestor of the parent, then
499 // for further validation
500 if (entry
.owner
!= parent
) {
501 parentArrayReads
= parentArrayReads
.cons(entry
);
504 if (parentArrayWrites
!= EMPTY_MAP
) {
505 // Verify if the parent contains a more recent write for the
506 // read that we performed
507 // somewhere in our ancestors
508 VArrayEntry
<?
> parentWrite
= parentArrayWrites
.get(entry
);
509 if (parentWrite
== null) {
512 if (parentWrite
.nestedVersion
> maxVersionOnParent
) {
513 throw new CommitException(parent
);
518 return parentArrayReads
;