convert kmeans_incr threshold
[actl.git] / actl.c
blob6a1674bbf49f1f02d4be4b2763a069d9a84d8781
1 /*
2 * Copyright (c) 2016 Mohamed Aslan <maslan@sce.carleton.ca>
4 * Permission to use, copy, modify, and distribute this software for any
5 * purpose with or without fee is hereby granted, provided that the above
6 * copyright notice and this permission notice appear in all copies.
8 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
9 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
10 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
11 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
12 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
13 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
14 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17 #include <stdio.h>
18 #include <stdlib.h>
19 #include <string.h>
20 #include <pthread.h>
21 #include <err.h>
22 #include <errno.h>
23 #include <unistd.h>
24 #include <sys/wait.h>
25 #include <dlfcn.h>
26 #include <arpa/inet.h>
28 #include <hashtab.h>
29 #include <dht.h>
30 #include <libof.h>
31 #include <of10.h>
33 #include "actl.h"
34 #include "conf.h"
35 #include "utils.h"
36 #include "model.h"
37 #include "clustering.h"
40 static struct actl_ctx ctx;
43 static void
44 adapt(struct actl_ctx *c, double kpi)
46 c->target_kpi = kpi;
47 c->adaptive = 1;
50 static double
51 map(struct actl_ctx *c, double kpi)
53 if (c->conf->c_lalgo == LEARN_KMEANS_SEQ)
54 return kmeans_seq_find(&c->kseq, kpi);
55 else
56 return kmeans_incr_find(&c->kincr, kpi);
59 static int
60 put(struct actl_ctx *c, const char *key, const char *val)
62 double target_phi;
63 int r = 1, w = 1;
64 struct timespec ts;
66 if (c->adaptive) {
67 target_phi = c->learning ? c->learning_phi : map(c, c->target_kpi);
68 phi2rw(target_phi, c->conf->c_nreplicas, &r, &w);
69 printf("[INFO] target: KPI = %lf -> Phi = %lf -> (R = %d, W = %d).\n", ctx.target_kpi, target_phi, r, w);
71 (void)clock_gettime(CLOCK_REALTIME, &ts);
72 return dht_put_tunable(c->node, key, val, &ts, w);
75 static int
76 get(struct actl_ctx *c, const char *key, char **val)
78 double target_phi;
79 int r = 1, w = 1;
81 if (c->adaptive) {
82 target_phi = c->learning ? c->learning_phi : map(c, c->target_kpi);
83 phi2rw(target_phi, c->conf->c_nreplicas, &r, &w);
84 printf("[INFO] target: KPI = %lf -> Phi = %lf -> (R = %d, W = %d).\n", ctx.target_kpi, target_phi, r, w);
86 return dht_get_tunable(c->node, key, val, r);
89 static void
90 topo(struct actl_ctx *c, struct host_info ***hosts, int *n)
92 *hosts = c->conf->c_topo;
93 *n = c->conf->c_topo_n;
96 static void
97 init_context(struct actl_ctx *c)
99 if (!c)
100 return;
101 c->cntl = (struct of_controller *)xmalloc(sizeof(struct of_controller));
102 c->node = (struct dht_node *)xmalloc(sizeof(struct dht_node));
103 c->conf = (struct config *)xmalloc(sizeof(struct config));
104 c->adaptive = 0;
105 c->learning = 1;
106 c->adapt = adapt;
107 c->learning_phi = 0;
108 c->put = put;
109 c->get = get;
110 c->topo = topo;
113 static void
114 load_app(struct actl_ctx *c)
116 if (!c)
117 return;
118 /* load application */
119 c->conf->c_app_so = dlopen(c->conf->c_app_name, RTLD_NOW);
120 if (!c->conf->c_app_so)
121 errx(1, "error loading application %s: %s", c->conf->c_app_name, dlerror());
122 /* init */
123 c->conf->c_app.a_init = dlsym(c->conf->c_app_so, "init");
124 if (!(c->conf->c_app.a_init))
125 errx(1, "error loading symbol from %s: %s", c->conf->c_app_name, dlerror());
126 /* main */
127 c->conf->c_app.a_main = dlsym(c->conf->c_app_so, "main");
128 if (!c->conf->c_app.a_main)
129 errx(1, "error loading symbol from %s: %s", c->conf->c_app_name, dlerror());
130 /* handler */
131 c->conf->c_app.a_handler = dlsym(c->conf->c_app_so, "handler");
132 if (!c->conf->c_app.a_handler)
133 errx(1, "error loading symbol from %s: %s", c->conf->c_app_name, dlerror());
134 /* handler */
135 c->conf->c_app.a_kpi = dlsym(c->conf->c_app_so, "kpi");
136 if (!c->conf->c_app.a_kpi)
137 errx(1, "error loading symbol from %s: %s", c->conf->c_app_name, dlerror());
140 static void
141 poll_kpi(struct of_controller *ctl)
143 double cur_kpi, cur_phi, tar_phi;
145 if (ctx.conf->c_app_so && ctx.adaptive) {
146 cur_kpi = ctx.conf->c_app.a_kpi();
147 if (ctx.learning) {
148 if (ctx.conf->c_lalgo == LEARN_KMEANS_SEQ)
149 kmeans_seq_insert(&ctx.kseq, cur_kpi, ctx.learning_phi);
150 else
151 kmeans_incr_insert(&ctx.kincr, cur_kpi, ctx.learning_phi);
152 printf("[INFO] learned KPI = %lf -> Phi = %lf.\n", cur_kpi, ctx.learning_phi);
153 } else {
154 /* TODO: insert point */
155 cur_phi = map(&ctx, cur_kpi);
156 tar_phi = map(&ctx, ctx.target_kpi);
157 printf("[INFO] KPI = (target %lf, current %lf), Phi = (target %lf, current %lf).\n", ctx.target_kpi, cur_kpi, tar_phi, cur_phi);
158 f_log(ctx.conf->c_logfp, "%lf, %lf, %lf, %lf\n", ctx.target_kpi, tar_phi, cur_kpi, cur_phi);
163 static void
164 learn_timer(struct of_controller *ctl)
166 if (ctx.conf->c_app_so && ctx.adaptive && ctx.learning) {
167 ctx.conf->c_ltime -= ctx.learn_perval;
168 if (ctx.conf->c_ltime <= 0) {
169 ctx.learning = 0;
170 printf("[INFO] learning finished.\n");
171 if (ctx.conf->c_lalgo == LEARN_KMEANS_SEQ)
172 kmeans_seq_print(&ctx.kseq, "LEARN");
173 else
174 kmeans_incr_print(&ctx.kincr, "LEARN");
176 ctx.learning_phi += 0.1;
180 static void *
181 dht_thread(void *arg)
183 dht_event_loop(ctx.node);
184 return NULL;
187 static void *
188 controller_thread(void *arg)
190 /* init default app */
191 if (ctx.conf->c_app_so) {
192 optind = 0; /* make sure apps can use getopt(3) or getopt_long(3) */
193 ctx.conf->c_app.a_init(&ctx);
194 printf("[INFO] adaptation period is %d sec.\n", ctx.conf->c_atime);
195 ctx.conf->c_app.a_main(ctx.conf->c_app_argc, ctx.conf->c_app_argv);
197 ctx.learn_perval = ctx.conf->c_ltime / 10;
198 ctx.cntl->timer(ctx.cntl, ctx.learn_perval * 1000, learn_timer);
199 /* poll application-specific KPI */
200 ctx.cntl->timer(ctx.cntl, ctx.conf->c_atime * 1000, poll_kpi);
201 ctx.cntl->loop(ctx.cntl);
202 return NULL;
205 static void
206 handler(struct of_controller *ctl, struct of_event *ev)
208 #ifdef DEBUG
209 printf("actl: handler.\n");
210 #endif
211 if (ctx.conf->c_app_so)
212 ctx.conf->c_app.a_handler(ev);
216 main(int argc, char **argv)
218 char *homedir, *conf_file;
219 int ch, i, flag_altconf = 0, flag_error = 0;
220 pthread_t dtid, ctid;
223 /* init global context */
224 init_context(&ctx);
226 /* parse command line options */
227 while ((ch = getopt(argc, argv, "c:")) != -1) {
228 switch (ch) {
229 case 'c':
230 flag_altconf = 1;
231 conf_file = strdup(optarg);
232 break;
233 default:
234 flag_error = 1;
235 break;
238 argc -= optind;
239 argv += optind;
241 if (flag_error) {
242 return EXIT_FAILURE; /* TODO: usage */
244 if (!flag_altconf) {
245 if ((homedir = getenv("HOME")) == NULL) {
246 /* TODO: look in current directory */
247 errx(1, "failed to read environmental variable \'$HOME\'");
249 asprintf(&conf_file, "%s/%s", homedir, CONFFILENAME); /* XXX: null */
251 /* parse config file */
252 if (parse_config(ctx.conf, conf_file))
253 errx(1, "error parsing config file.");
255 /* init clustering algorithms */
256 if (ctx.conf->c_lalgo == LEARN_KMEANS_SEQ)
257 kmeans_seq_init(&ctx.kseq, ctx.conf->c_lparam);
258 else
259 kmeans_incr_init(&ctx.kincr, ((ctx.conf->c_lparam * 1.0) / 100));
261 /* open log file */
262 if (ctx.conf->c_logfile) {
263 ctx.conf->c_logfp = fopen(ctx.conf->c_logfile, "w");
264 if (!ctx.conf->c_logfp)
265 errx(1, "error creating log file.");
268 /* load application */
269 load_app(&ctx);
271 #ifdef DEBUG
272 printf("config info...\n");
273 printf("\tnode id \'%s\', control port \'%d\', switch port \'%d\'.\n",
274 ctx.conf->c_nodeid, ctx.conf->c_co_port, ctx.conf->c_sw_port);
275 printf("\topenflow version \'%s\'.\n", ctx.conf->c_ofp_verstr);
276 printf("\tnumber of replicas \'%d\'.\n", ctx.conf->c_nreplicas);
277 #endif
279 /* initialize DHT */
280 if (!dht_init(ctx.node, ctx.conf->c_nodeid, ctx.conf->c_co_port, ctx.conf->c_nreplicas, 0, NULL))
281 errx(1, "dht_init");
282 else
283 printf("dht init successful.\n");
284 for (i = 0 ; i < ctx.conf->c_npeers ; i++) {
285 dht_add_peer(ctx.node, ctx.conf->c_peers[i].node_id, inet_ntoa(ctx.conf->c_peers[i].ip_addr), ctx.conf->c_peers[i].port);
286 printf("\tpeer %d: id: \'%s\', ip=%s, port=%u.\n", (i + 1), ctx.conf->c_peers[i].node_id, inet_ntoa(ctx.conf->c_peers[i].ip_addr), ctx.conf->c_peers[i].port);
288 pthread_create(&dtid, NULL, dht_thread, NULL);
290 /* initialize OF controller */
291 if (of_controller_init(ctx.cntl, ctx.conf->c_sw_port, ctx.conf->c_ofp()))
292 errx(1, "init failed.");
293 else
294 printf("controller init successful.\n");
295 ctx.cntl->handler(ctx.cntl, handler);
296 pthread_create(&ctid, NULL, controller_thread, NULL);
298 /* wait */
299 pthread_join(dtid, NULL);
300 pthread_join(ctid, NULL);
302 fclose(ctx.conf->c_logfp);
303 return 0;