3 namespace Ck { namespace IO {
5 message SessionReadyMsg;
6 message SessionCommitMsg;
10 initnode _registerCkIO_impl();
16 namespace Ck { namespace IO {
18 readonly CProxy_Director director;
20 mainchare [migratable] Director
22 entry Director(CkArgMsg *);
24 /// Serialize setting up each file through this chare, so that all PEs
25 /// have the same sequence
26 entry void openFile(std::string name, CkCallback opened, Options opts);
27 entry [reductiontarget] void fileOpened(FileToken file);
28 entry [reductiontarget] void sessionComplete(FileToken file);
30 entry void prepareWriteSession(FileToken file, size_t bytes, size_t offset,
31 CkCallback ready, CkCallback complete) {
33 prepareWriteSession_helper(file, bytes, offset, ready, complete);
35 when sessionReady[files[file].sessionID](CkReductionMsg *m) serial {
37 ready.send(new SessionReadyMsg(Session(file, bytes, offset,
38 files[file].session)));
41 entry void prepareWriteSession(FileToken file, size_t bytes, size_t offset,
42 CkCallback ready, const char commitData[commitBytes],
43 size_t commitBytes, size_t commitOffset,
44 CkCallback complete) {
46 CkCallback committed(CkIndex_Director::sessionDone(NULL), thisProxy);
47 committed.setRefnum(++sessionID);
48 prepareWriteSession_helper(file, bytes, offset, ready, committed);
50 when sessionReady[files[file].sessionID](CkReductionMsg *m) serial {
52 ready.send(new SessionReadyMsg(Session(file, bytes, offset,
53 files[file].session)));
55 when sessionDone[files[file].sessionID](CkReductionMsg *m) serial {
57 impl::FileInfo* info = CkpvAccess(manager)->get(file);
58 CmiInt8 ret = CmiPwrite(info->fd, commitData, commitBytes, commitOffset);
59 if (ret != commitBytes)
60 fatalError("Commit write failed", info->name);
61 complete.send(CkReductionMsg::buildNew(0, NULL, CkReduction::nop));
64 entry void sessionReady(CkReductionMsg *);
65 entry void sessionDone(CkReductionMsg *);
66 entry void close(FileToken token, CkCallback closed);
69 group [migratable] Manager
75 when openFile[opnum](unsigned int opnum_,
76 FileToken token, std::string name, Options opts)
77 serial { prepareFile(token, name, opts); }
78 when close[opnum](unsigned int opnum_, FileToken token, CkCallback closed)
79 serial { doClose(token, closed); }
84 entry void openFile(unsigned int opnum,
85 FileToken token, std::string name, Options opts);
86 entry void close(unsigned int opnum, FileToken token, CkCallback closed);
89 array [1D] WriteSession
91 entry WriteSession(FileToken file, size_t offset, size_t bytes);
92 entry void forwardData(const char data[bytes], size_t bytes, size_t offset);
93 entry void syncData();
96 group Map : CkArrayMap