Various code cleanups.
[aesalon.git] / module / collector / Interface.c
blob0df266f24bc031666a95b8ec1fba027c33a24b35
1 #include <sys/mman.h>
2 #include <string.h>
3 #include <fcntl.h>
4 #include <stdio.h>
5 #include <unistd.h>
6 #include <stdlib.h>
7 #include <errno.h>
8 #include <time.h>
9 #include <sys/timerfd.h>
10 #include <pthread.h>
12 #include "collector/Interface.h"
14 static int AC_mmapFd, AC_heartbeatFd;
15 static uint8_t *AC_memory;
16 static AC_MemoryMapHeader *AC_header;
17 static uint8_t AC_heartbeatStatus;
18 static pthread_t AC_heartbeatThread;
20 /* Function prototypes for internally-used, non-exposed functions. */
21 static int AC_remainingSpace();
22 static void AC_writeData(void *data, size_t size);
23 static void *AC_sendHeartbeats(void *unused);
25 void AC_CONSTRUCTOR AC_constructor() {
26 char filename[64];
27 sprintf(filename, "AC-%i", getpid());
29 AC_mmapFd = shm_open(filename, O_RDWR, 0);
31 int shmSize = AC_configurationInt("global", "shmSize");
33 if(shmSize == 0) {
34 /* Default size is 128KB -- large enough not to overflow, small enough to not waste memory. */
35 shmSize = 131072;
38 AC_memory = mmap(NULL, shmSize, PROT_READ | PROT_WRITE, MAP_SHARED, AC_mmapFd, 0);
40 AC_header = (AC_MemoryMapHeader *)AC_memory;
42 /* NOTE: TFD_CLOEXEC was introduced in Linux 2.6.27; timerfd_create() is Linux-specific. */
43 if((AC_heartbeatFd = timerfd_create(CLOCK_REALTIME, TFD_CLOEXEC)) == -1) {
44 printf("Failed to create timer . . .\n");
45 return;
48 struct itimerspec its;
50 long heartbeatInterval = AC_configurationLong("global", "heartbeatInterval");
51 if(heartbeatInterval == 0) {
52 /* The default is 100 heartbeats per second, or a 10 millisecond interval. */
53 heartbeatInterval = 10;
56 printf("Heartbeat interval: %li ms.\n", heartbeatInterval);
58 /* heartbeatInterval is specified in milliseconds. Convert it to nanoseconds . . . */
59 heartbeatInterval *= 1000 * 1000;
61 its.it_interval.tv_sec = heartbeatInterval / 1000000000;
62 its.it_interval.tv_nsec = heartbeatInterval % 1000000000;
64 /* Fire off the timer as soon as is possible. */
65 its.it_value.tv_sec = 0;
66 its.it_value.tv_nsec = 1;
68 timerfd_settime(AC_heartbeatFd, 0, &its, NULL);
70 pthread_create(&AC_heartbeatThread, NULL, AC_sendHeartbeats, NULL);
73 void AC_DESTRUCTOR AC_destructor() {
74 AC_heartbeatStatus = 0;
75 pthread_join(AC_heartbeatThread, NULL);
77 char filename[64];
78 sprintf(filename, "AC-%i", getpid());
79 shm_unlink(filename);
82 uint16_t AC_registerModuleInternal(const char *name) {
83 uint16_t id = ++AC_header->latestModule;
85 printf("AC_RegisterModule: registering module %s as id %i.\n", name, id);
87 AC_DataPacket packet;
88 packet.dataSource.moduleID = 0;
89 packet.dataSource.timestamp = AC_timestamp();
90 packet.dataSize = strlen(name) + 1; /* Plus one for the NULL. */
91 packet.data = (void *)name;
92 AC_writePacket(&packet);
93 return id;
96 int AC_remainingSpace() {
97 int used = 0;
99 if(AC_header->dataStart <= AC_header->dataEnd) {
100 used = AC_header->dataEnd - AC_header->dataStart;
102 else {
103 used = AC_header->dataSize - AC_header->dataStart;
104 used += AC_header->dataEnd;
106 return (AC_header->dataSize - AC_header->dataOffset) - used;
109 void AC_writeData(void *data, size_t size) {
110 /* If dataStart <= dataEnd, then the used memory is a contigious chunk. */
111 if(AC_header->dataStart <= AC_header->dataEnd) {
112 /* Two possible scenarios: the data fits on the end . . . */
113 if(size < (AC_header->dataSize - AC_header->dataEnd)) {
114 memcpy(AC_memory + AC_header->dataEnd, data, size);
115 AC_header->dataEnd += size;
117 /* And the data does not fit on the end. */
118 else {
119 size_t over = size - (AC_header->dataSize - AC_header->dataEnd);
120 size_t under = size - over;
122 memcpy(AC_memory + AC_header->dataEnd, data, under);
124 memcpy(AC_memory + AC_header->dataOffset, data + under, over);
126 AC_header->dataEnd = AC_header->dataOffset + over;
129 /* Else the used memory is in two separate chunks. */
130 else {
131 memcpy(AC_memory + AC_header->dataEnd, data, size);
132 AC_header->dataEnd += size;
136 void AC_writePacket(AC_DataPacket *packet) {
137 sem_wait(&AC_header->dataEndSemaphore);
139 int size = sizeof(packet->dataSource) + sizeof(packet->dataSize) + packet->dataSize;
141 while(AC_remainingSpace() < size) {
142 AC_header->dataOverflow = 1;
143 sem_wait(&AC_header->dataOverflowSemaphore);
145 AC_header->dataOverflow = 0;
147 AC_writeData(&packet->dataSource, sizeof(packet->dataSource));
148 AC_writeData(&packet->dataSize, sizeof(packet->dataSize));
149 if(packet->dataSize) AC_writeData(packet->data, packet->dataSize);
151 sem_post(&AC_header->dataEndSemaphore);
152 sem_post(&AC_header->dataSempahore);
154 int value;
155 sem_getvalue(&AC_header->dataSempahore, &value);
158 void *AC_sendHeartbeats(void *unused) {
159 AC_heartbeatStatus = 1;
160 while(AC_heartbeatStatus == 1) {
161 uint64_t exp;
162 read(AC_heartbeatFd, &exp, sizeof(exp));
164 AC_DataPacket packet;
165 packet.dataSource.timestamp = AC_timestamp();
166 packet.dataSource.moduleID = 0;
167 packet.dataSize = 0;
168 packet.data = NULL;
170 AC_writePacket(&packet);
172 return NULL;
175 AC_Timestamp AC_timestamp() {
176 struct timespec t;
177 clock_gettime(CLOCK_REALTIME, &t);
178 return ((AC_Timestamp)t.tv_sec * 1000000000) + t.tv_nsec;
181 uint8_t AC_hasCollectionBegun() {
182 return AC_header->mainReached;
185 AC_Address AC_EXPORT AC_libraryOffset(const char *name) {
186 int fd = open("/proc/self/maps", O_RDONLY);
188 if(!fd) {
189 printf("open() failed: %s\n", strerror(errno));
190 return 0;
193 char buffer[256];
194 int found = 0;
195 AC_Address address = 0;
196 int ret = 1;
198 while(ret > 0) {
199 char c = 0;
200 int pos = 0;
201 while((ret = read(fd, &c, sizeof(c))) && c != '\n') buffer[pos++] = c;
202 buffer[pos] = 0;
203 /*printf("Trying line \"%s\"\n", buffer);*/
205 char mode[128], path[128];
206 sscanf(buffer, "%lx-%*x %s %*s %*s %*s %s", &address, mode, path);
208 if(strcmp(mode, "r-xp")) {
209 continue;
211 if(!strncmp(strrchr(path, '/')+1, name, strlen(name))) {
212 close(fd);
213 return address;
217 close(fd);
218 return 0;
/* Fetches a configuration value as a string.
 *
 * Values live in environment variables named "ACM_<module>_<name>".
 * Returns whatever getenv() returns for that variable: the value, or NULL
 * when it is not set.
 */
char *AC_configurationString(const char *module, const char *name) {
    char variable[256];

    snprintf(variable, sizeof(variable), "ACM_%s_%s", module, name);

    return getenv(variable);
}
/* Fetches a configuration value as an int.
 *
 * Reads the environment variable "ACM_<module>_<name>" and parses it with
 * the "%i" conversion, so decimal, 0x-prefixed hex and 0-prefixed octal are
 * accepted.
 *
 * Returns the parsed value, or 0 when the variable is unset or does not
 * start with a number.
 */
int AC_configurationInt(const char *module, const char *name) {
    char envName[256];
    snprintf(envName, sizeof(envName), "ACM_%s_%s", module, name);

    const char *envContent = getenv(envName);
    if(envContent == NULL) return 0;

    /* Initialise and check the conversion: a non-numeric value must yield 0,
     * not whatever happened to be on the stack. */
    int content = 0;
    if(sscanf(envContent, "%i", &content) != 1) return 0;

    return content;
}
/* Fetches a configuration value as a long.
 *
 * Reads the environment variable "ACM_<module>_<name>" and parses it with
 * the "%li" conversion, so decimal, 0x-prefixed hex and 0-prefixed octal are
 * accepted.
 *
 * Returns the parsed value, or 0 when the variable is unset or does not
 * start with a number.
 */
long AC_configurationLong(const char *module, const char *name) {
    char envName[256];
    snprintf(envName, sizeof(envName), "ACM_%s_%s", module, name);

    const char *envContent = getenv(envName);
    if(envContent == NULL) return 0;

    /* Initialise and check the conversion: a non-numeric value must yield 0,
     * not an indeterminate value. */
    long content = 0;
    if(sscanf(envContent, "%li", &content) != 1) return 0;

    return content;
}
/* Fetches a configuration value as a boolean.
 *
 * Reads the environment variable "ACM_<module>_<name>".
 * Returns 1 only when the value is exactly "true"; an unset variable,
 * "false", or any other text gives 0.
 */
int AC_configurationBool(const char *module, const char *name) {
    char variable[256];
    const char *setting;

    snprintf(variable, 256, "ACM_%s_%s", module, name);
    setting = getenv(variable);

    return setting != NULL && strcmp(setting, "true") == 0;
}