From fff8f258e11aad1a0bd420e87d996659ee41a086 Mon Sep 17 00:00:00 2001 From: Phil Miller Date: Thu, 11 May 2017 16:38:46 -0500 Subject: [PATCH] Bug #1507 CkIO: Avoid race condition of trying to send messages to an array that the local PE hasn't seen created yet Change-Id: Ia7524d52c3f09edca226ad7ea19599a49084308f --- src/libs/ck-libs/io/ckio.C | 14 ++++++++++---- src/libs/ck-libs/io/ckio.ci | 21 ++++++++++++++++++--- 2 files changed, 28 insertions(+), 7 deletions(-) diff --git a/src/libs/ck-libs/io/ckio.C b/src/libs/ck-libs/io/ckio.C index 75a9fdaecd..51f4ea1b91 100644 --- a/src/libs/ck-libs/io/ckio.C +++ b/src/libs/ck-libs/io/ckio.C @@ -36,6 +36,7 @@ namespace Ck { namespace IO { CkCallback opened; Options opts; int fd; + int sessionID; CProxy_WriteSession session; CkCallback complete; @@ -107,9 +108,10 @@ namespace Ck { namespace IO { files[file].opened.send(new FileReadyMsg(file)); } - void prepareWriteSession(FileToken file, size_t bytes, size_t offset, - CkCallback ready, CkCallback complete) { + void prepareWriteSession_helper(FileToken file, size_t bytes, size_t offset, + CkCallback ready, CkCallback complete) { Options &opts = files[file].opts; + files[file].sessionID = sessionID; int numStripes = 0; size_t bytesLeft = bytes, delta = opts.peStripe - offset % opts.peStripe; @@ -123,13 +125,17 @@ namespace Ck { namespace IO { numStripes++; CkArrayOptions sessionOpts(numStripes); + sessionOpts.setStaticInsertion(true); + + CkCallback sessionInitDone(CkIndex_Director::sessionReady(NULL), thisProxy); + sessionInitDone.setRefnum(sessionID); + sessionOpts.setInitCallback(sessionInitDone); + //sessionOpts.setMap(managers); files[file].session = CProxy_WriteSession::ckNew(file, offset, bytes, sessionOpts); CkAssert(files[file].complete.isInvalid()); files[file].complete = complete; - ready.send(new SessionReadyMsg(Session(file, bytes, offset, - files[file].session))); } void sessionComplete(FileToken token) { diff --git a/src/libs/ck-libs/io/ckio.ci b/src/libs/ck-libs/io/ckio.ci index 300fefdda6..d3674bced2 100644 --- a/src/libs/ck-libs/io/ckio.ci +++ b/src/libs/ck-libs/io/ckio.ci @@ -28,7 +28,16 @@ module CkIO_impl { entry [reductiontarget] void sessionComplete(FileToken file); entry void prepareWriteSession(FileToken file, size_t bytes, size_t offset, - CkCallback ready, CkCallback complete); + CkCallback ready, CkCallback complete) { + serial { + prepareWriteSession_helper(file, bytes, offset, ready, complete); + } + when sessionReady[files[file].sessionID](CkReductionMsg *m) serial { + delete m; + ready.send(new SessionReadyMsg(Session(file, bytes, offset, + files[file].session))); + } + }; entry void prepareWriteSession(FileToken file, size_t bytes, size_t offset, CkCallback ready, const char commitData[commitBytes], size_t commitBytes, size_t commitOffset, @@ -36,9 +45,14 @@ module CkIO_impl { serial { CkCallback committed(CkIndex_Director::sessionDone(NULL), thisProxy); committed.setRefnum(++sessionID); - prepareWriteSession(file, bytes, offset, ready, committed); + prepareWriteSession_helper(file, bytes, offset, ready, committed); + } + when sessionReady[files[file].sessionID](CkReductionMsg *m) serial { + delete m; + ready.send(new SessionReadyMsg(Session(file, bytes, offset, + files[file].session))); } - when sessionDone[sessionID](CkReductionMsg *m) serial { + when sessionDone[files[file].sessionID](CkReductionMsg *m) serial { delete m; impl::FileInfo* info = CkpvAccess(manager)->get(file); CmiInt8 ret = CmiPwrite(info->fd, commitData, commitBytes, commitOffset); @@ -47,6 +61,7 @@ module CkIO_impl { complete.send(CkReductionMsg::buildNew(0, NULL)); } }; + entry void sessionReady(CkReductionMsg *); entry void sessionDone(CkReductionMsg *); entry void close(FileToken token, CkCallback closed); } -- 2.11.4.GIT