It has been a while since I last worked on Aesalon proper.
[aesalon.git] / module / collector / Interface.c
blobafdd0d7167dde8df09f309452449999f92558187
1 #include <sys/mman.h>
2 #include <string.h>
3 #include <fcntl.h>
4 #include <stdio.h>
5 #include <unistd.h>
6 #include <stdlib.h>
7 #include <errno.h>
8 #include <time.h>
9 #include <sys/timerfd.h>
10 #include <pthread.h>
12 #include "collector/Interface.h"
14 static int AC_mmapFd, AC_heartbeatFd;
15 static uint8_t *AC_memory;
16 static AC_MemoryMapHeader *AC_header;
17 static uint8_t AC_heartbeatStatus;
18 static pthread_t AC_heartbeatThread;
20 /* Function prototypes for internally-used, non-exposed functions. */
21 static int AC_remainingSpace();
22 static void AC_writeData(void *data, size_t size);
23 static void *AC_sendHeartbeats(void *unused);
25 void AC_CONSTRUCTOR AC_constructor() {
26 char filename[64];
27 sprintf(filename, "AC-%i", getpid());
29 AC_mmapFd = shm_open(filename, O_RDWR, 0);
31 int shmSize = AC_configurationInt("global", "shmSize");
33 if(shmSize == 0) {
34 /* Default size is 128KB -- large enough not to overflow, small enough to not waste memory. */
35 shmSize = 131072;
38 AC_memory = mmap(NULL, shmSize, PROT_READ | PROT_WRITE, MAP_SHARED, AC_mmapFd, 0);
40 AC_header = (AC_MemoryMapHeader *)AC_memory;
42 /* NOTE: TFD_CLOEXEC was introduced in Linux 2.6.27; timerfd_create() is Linux-specific. */
43 if((AC_heartbeatFd = timerfd_create(CLOCK_REALTIME, TFD_CLOEXEC)) == -1) {
44 printf("Failed to create timer . . .\n");
45 return;
48 struct itimerspec its;
50 long heartbeatInterval = AC_configurationLong("global", "heartbeatInterval");
51 if(heartbeatInterval == 0) {
52 /* The default is 100 heartbeats per second, or a 10 millisecond interval. */
53 heartbeatInterval = 10;
56 /* heartbeatInterval is specified in milliseconds. Convert it to nanoseconds . . . */
57 heartbeatInterval *= 1000 * 1000;
59 its.it_interval.tv_sec = heartbeatInterval / 1000000000;
60 its.it_interval.tv_nsec = heartbeatInterval % 1000000000;
62 /* Fire off the timer as soon as is possible. */
63 its.it_value.tv_sec = 0;
64 its.it_value.tv_nsec = 1;
66 timerfd_settime(AC_heartbeatFd, 0, &its, NULL);
68 printf("Creating thread . . .\n");
69 pthread_create(&AC_heartbeatThread, NULL, AC_sendHeartbeats, NULL);
71 printf("Interface initialized.\n");
74 void AC_DESTRUCTOR AC_destructor() {
75 AC_heartbeatStatus = 0;
76 pthread_join(AC_heartbeatThread, NULL);
78 char filename[64];
79 sprintf(filename, "AC-%i", getpid());
80 shm_unlink(filename);
83 uint16_t AC_registerModuleInternal(const char *name) {
84 uint16_t id = ++AC_header->latestModule;
86 printf("AC_RegisterModule: registering module %s as id %i.\n", name, id);
88 AC_DataPacket packet;
89 packet.dataSource.moduleID = 0;
90 packet.dataSource.timestamp = AC_timestamp();
91 packet.dataSize = strlen(name) + 1; /* Plus one for the NULL. */
92 packet.data = (void *)name;
93 AC_writePacket(&packet);
94 return id;
97 int AC_remainingSpace() {
98 int used = 0;
100 if(AC_header->dataStart <= AC_header->dataEnd) {
101 used = AC_header->dataEnd - AC_header->dataStart;
103 else {
104 used = AC_header->dataSize - AC_header->dataStart;
105 used += AC_header->dataEnd;
107 return (AC_header->dataSize - AC_header->dataOffset) - used;
110 void AC_writeData(void *data, size_t size) {
111 /* If dataStart <= dataEnd, then the used memory is a contigious chunk. */
112 if(AC_header->dataStart <= AC_header->dataEnd) {
113 /* Two possible scenarios: the data fits on the end . . . */
114 if(size < (AC_header->dataSize - AC_header->dataEnd)) {
115 memcpy(AC_memory + AC_header->dataEnd, data, size);
116 AC_header->dataEnd += size;
118 /* And the data does not fit on the end. */
119 else {
120 size_t over = size - (AC_header->dataSize - AC_header->dataEnd);
121 size_t under = size - over;
123 memcpy(AC_memory + AC_header->dataEnd, data, under);
125 memcpy(AC_memory + AC_header->dataOffset, data + under, over);
127 AC_header->dataEnd = AC_header->dataOffset + over;
130 /* Else the used memory is in two separate chunks. */
131 else {
132 memcpy(AC_memory + AC_header->dataEnd, data, size);
133 AC_header->dataEnd += size;
137 void AC_writePacket(AC_DataPacket *packet) {
138 sem_wait(&AC_header->dataEndSemaphore);
140 int size = sizeof(packet->dataSource) + sizeof(packet->dataSize) + packet->dataSize;
142 while(AC_remainingSpace() < size) {
143 AC_header->dataOverflow = 1;
144 sem_wait(&AC_header->dataOverflowSemaphore);
146 AC_header->dataOverflow = 0;
148 AC_writeData(&packet->dataSource, sizeof(packet->dataSource));
149 AC_writeData(&packet->dataSize, sizeof(packet->dataSize));
150 if(packet->dataSize) AC_writeData(packet->data, packet->dataSize);
152 sem_post(&AC_header->dataEndSemaphore);
153 sem_post(&AC_header->dataSempahore);
155 int value;
156 sem_getvalue(&AC_header->dataSempahore, &value);
159 void *AC_sendHeartbeats(void *unused) {
160 AC_heartbeatStatus = 1;
161 while(AC_heartbeatStatus == 1) {
162 uint64_t exp;
163 read(AC_heartbeatFd, &exp, sizeof(exp));
165 AC_DataPacket packet;
166 packet.dataSource.timestamp = AC_timestamp();
167 packet.dataSource.moduleID = 0;
168 packet.dataSize = 0;
169 packet.data = NULL;
171 AC_writePacket(&packet);
173 return NULL;
176 AC_Timestamp AC_timestamp() {
177 struct timespec t;
178 clock_gettime(CLOCK_REALTIME, &t);
179 return ((AC_Timestamp)t.tv_sec * 1000000000) + t.tv_nsec;
182 uint8_t AC_hasCollectionBegun() {
183 return AC_header->mainReached;
186 AC_Address AC_EXPORT AC_libraryOffset(const char *name) {
187 int fd = open("/proc/self/maps", O_RDONLY);
189 if(!fd) {
190 printf("open() failed: %s\n", strerror(errno));
191 return 0;
194 char buffer[256];
195 int found = 0;
196 AC_Address address = 0;
197 int ret = 1;
199 while(ret > 0) {
200 char c = 0;
201 int pos = 0;
202 while((ret = read(fd, &c, sizeof(c))) && c != '\n') buffer[pos++] = c;
203 buffer[pos] = 0;
204 /*printf("Trying line \"%s\"\n", buffer);*/
206 char mode[128], path[128];
207 sscanf(buffer, "%lx-%*x %s %*s %*s %*s %s", &address, mode, path);
209 if(strcmp(mode, "r-xp")) {
210 continue;
212 if(!strncmp(strrchr(path, '/')+1, name, strlen(name))) {
213 close(fd);
214 return address;
218 close(fd);
219 return 0;
/*
 * Looks up the configuration value stored in the environment variable
 * "ACM_<module>_<name>".
 *
 * Returns the pointer handed back by getenv(), or NULL when the variable is
 * not set.
 */
char *AC_configurationString(const char *module, const char *name) {
    char variable[256];
    snprintf(variable, sizeof(variable), "ACM_%s_%s", module, name);

    char *value = getenv(variable);
    return value;
}
/*
 * Reads the environment variable "ACM_<module>_<name>" as an int.
 *
 * The text is parsed with "%i", so decimal, 0x-prefixed hexadecimal and
 * 0-prefixed octal are accepted.
 *
 * Returns the parsed value, or 0 when the variable is unset or does not
 * start with a number.
 */
int AC_configurationInt(const char *module, const char *name) {
    char envName[256];
    snprintf(envName, sizeof(envName), "ACM_%s_%s", module, name);
    char *envContent = getenv(envName);
    if(envContent == NULL) return 0;

    /* sscanf() leaves its target untouched when nothing matches, so the
       result must be checked instead of returning an indeterminate value. */
    int content = 0;
    if(sscanf(envContent, "%i", &content) != 1) return 0;
    return content;
}
/*
 * Reads the environment variable "ACM_<module>_<name>" as a long.
 *
 * The text is parsed with "%li", so decimal, 0x-prefixed hexadecimal and
 * 0-prefixed octal are accepted.
 *
 * Returns the parsed value, or 0 when the variable is unset or does not
 * start with a number.
 */
long AC_configurationLong(const char *module, const char *name) {
    char envName[256];
    snprintf(envName, sizeof(envName), "ACM_%s_%s", module, name);
    char *envContent = getenv(envName);
    if(envContent == NULL) return 0;

    /* sscanf() leaves its target untouched when nothing matches, so the
       result must be checked instead of returning an indeterminate value. */
    long content = 0;
    if(sscanf(envContent, "%li", &content) != 1) return 0;
    return content;
}
/*
 * Reads the environment variable "ACM_<module>_<name>" as a boolean.
 *
 * Returns 1 only when the variable is set to exactly "true"; an unset
 * variable, "false", or any other text yields 0.
 */
int AC_configurationBool(const char *module, const char *name) {
    char variable[256];
    snprintf(variable, sizeof(variable), "ACM_%s_%s", module, name);

    const char *value = getenv(variable);
    return value != NULL && strcmp(value, "true") == 0;
}