Bug #1507 CkIO: Avoid race condition of trying to send messages to an array that...
[charm.git] / src / libs / ck-libs / io / ckio.ci
blobd3674bced2606dfc1aaf05fecb09790880e4d00d
1 module CkIO {
/// Message types of the public CkIO module. Only the names are declared
/// here; the message classes themselves are defined in the C++ sources.
/// SessionReadyMsg is the message Director::prepareWriteSession sends to the
/// caller-supplied `ready` callback.
namespace Ck { namespace IO {
    message FileReadyMsg;
    message SessionReadyMsg;
    message SessionCommitMsg;
  }
}
/// Node-level initialization hook (Charm++ runs initnode routines during
/// startup). NOTE(review): judging by the name it registers the CkIO_impl
/// module declared below -- confirm against the generated code.
initnode _registerCkIO_impl();
13 module CkIO_impl {
14   include "ckio.h";
16   namespace Ck { namespace IO {
17       namespace impl {
/// Readonly proxy to the Director chare declared in this namespace.
/// NOTE(review): the assignment is not visible in this file; presumably it
/// happens in the Director constructor -- confirm.
readonly CProxy_Director director;
/// Main chare through which file opens, write-session setup and file closes
/// are requested. Session setup is written in Structured Dagger: each
/// prepareWriteSession overload starts the setup in a serial block and then
/// waits for the matching sessionReady (and, for the commit variant,
/// sessionDone) message, selected by the reference number
/// files[file].sessionID.
mainchare [migratable] Director
{
  entry Director(CkArgMsg *);

  /// Serialize setting up each file through this chare, so that all PEs
  /// have the same sequence
  entry void openFile(std::string name, CkCallback opened, Options opts);
  entry [reductiontarget] void fileOpened(FileToken file);
  entry [reductiontarget] void sessionComplete(FileToken file);

  /// Set up a write session covering `bytes` bytes of `file` starting at
  /// `offset`.
  ///   ready    - receives a SessionReadyMsg wrapping the new Session once
  ///              the matching sessionReady message has arrived.
  ///   complete - handed unchanged to prepareWriteSession_helper.
  entry void prepareWriteSession(FileToken file, size_t bytes, size_t offset,
                                 CkCallback ready, CkCallback complete) {
    serial {
      prepareWriteSession_helper(file, bytes, offset, ready, complete);
    }
    when sessionReady[files[file].sessionID](CkReductionMsg *m) serial {
      delete m;
      // Only now is the session handle given to the caller.
      ready.send(new SessionReadyMsg(Session(file, bytes, offset,
                                             files[file].session)));
    }
  };

  /// Same as above, plus a final commit write: after the matching
  /// sessionDone message arrives, `commitBytes` bytes of `commitData` are
  /// written to the file at `commitOffset`, and only then is `complete`
  /// invoked. A failed or short commit write is reported via fatalError.
  entry void prepareWriteSession(FileToken file, size_t bytes, size_t offset,
                                 CkCallback ready, const char commitData[commitBytes],
                                 size_t commitBytes, size_t commitOffset,
                                 CkCallback complete) {
    serial {
      // Route session completion back to this chare (sessionDone) instead of
      // straight to the caller, so the commit write can happen first.
      CkCallback committed(CkIndex_Director::sessionDone(NULL), thisProxy);
      committed.setRefnum(++sessionID);
      prepareWriteSession_helper(file, bytes, offset, ready, committed);
    }
    when sessionReady[files[file].sessionID](CkReductionMsg *m) serial {
      delete m;
      ready.send(new SessionReadyMsg(Session(file, bytes, offset,
                                             files[file].session)));
    }
    when sessionDone[files[file].sessionID](CkReductionMsg *m) serial {
      delete m;
      impl::FileInfo* info = CkpvAccess(manager)->get(file);
      CmiInt8 ret = CmiPwrite(info->fd, commitData, commitBytes, commitOffset);
      // ret is signed and commitBytes is unsigned: test the error case
      // explicitly rather than relying on implicit conversion.
      if (ret < 0 || static_cast<size_t>(ret) != commitBytes)
        fatalError("Commit write failed", info->name);
      complete.send(CkReductionMsg::buildNew(0, NULL));
    }
  };

  /// Entry methods awaited by the `when` clauses above.
  entry void sessionReady(CkReductionMsg *);
  entry void sessionDone(CkReductionMsg *);
  entry void close(FileToken token, CkCallback closed);
}
/// Per-PE group that carries out file open and close operations.
group [migratable] Manager
{
  entry Manager();

  /// Endless service loop. Each iteration accepts exactly one operation --
  /// an openFile or a close -- whose reference number matches the current
  /// `opnum`, handles it, and then advances `opnum`. Operations are thus
  /// processed in opnum order.
  entry void run() {
    while (true) {
      case {
        // The message parameter is named opnum_ so that the bracketed
        // expression refers to the opnum counter, not to the parameter.
        when openFile[opnum](unsigned int opnum_,
                             FileToken token, std::string name, Options opts)
          serial { prepareFile(token, name, opts); }
        when close[opnum](unsigned int opnum_, FileToken token, CkCallback closed)
          serial { doClose(token, closed); }
      }
      serial { opnum++; }
    }
  };

  /// Entry methods consumed by run().
  entry void openFile(unsigned int opnum,
                      FileToken token, std::string name, Options opts);
  entry void close(unsigned int opnum, FileToken token, CkCallback closed);
};
/// 1D chare array representing one write session on a file.
array [1D] WriteSession
{
  /// Note the parameter order (offset, bytes); Director::prepareWriteSession
  /// takes (bytes, offset).
  entry WriteSession(FileToken file, size_t offset, size_t bytes);
  /// Delivers `bytes` bytes of `data` together with an `offset`.
  /// NOTE(review): presumably the offset is a position within the file --
  /// confirm against the implementation.
  entry void forwardData(const char data[bytes], size_t bytes, size_t offset);
  entry void syncData();
};
/// Array map group derived from CkArrayMap.
/// NOTE(review): presumably used to place WriteSession elements on PEs; the
/// mapping logic is not visible in this file -- confirm.
group Map : CkArrayMap
{
  entry Map();
};
100       }
101     }
102   }