From d7592254d29bd09b23c7a585435133108534746c Mon Sep 17 00:00:00 2001 From: Jeff Darcy Date: Wed, 4 Aug 2010 13:23:26 -0400 Subject: [PATCH] Added manual re-replication trigger. --- notes.txt | 43 ++++++++++++++------ rest.c | 133 ++++++++++++++++++++++++++++++++++++++++++++++++++------------ 2 files changed, 139 insertions(+), 37 deletions(-) diff --git a/notes.txt b/notes.txt index 17f73d0..b1d7448 100644 --- a/notes.txt +++ b/notes.txt @@ -1,10 +1,13 @@ Commands: GET / - list buckets + list top-level objects POST / - create bucket (must not exist) + global control operations (none defined) + + POST /_factory bucket=x,... + create bucket with attributes GET /bucket list bucket contents @@ -13,7 +16,10 @@ Commands: query bucket contents POST /bucket - create object (must not exist) + bucket-level control operations + + POST /bucket/_factory key=x,... + create object with attributes DELETE /bucket delete bucket (must be empty) @@ -21,8 +27,8 @@ Commands: GET /bucket/object get body/attr* list - POST /bucket/object - set multiple attributes (object must exist) + POST /bucket/object op=repl + trigger re-replication DELETE /bucket/object delete object @@ -36,6 +42,9 @@ Commands: GET /bucket/object/attrs get all attributes + POST /bucket/object/attrs + set multiple attributes + GET /bucket/object/attr_X get attribute X @@ -45,7 +54,7 @@ Commands: DELETE /bucket/object/attr_X delete attribute X - GET /bucket/object/repl + POST /bucket/object/_control op=repl trigger re-replication Formats: @@ -53,10 +62,16 @@ Formats: JSON bucket list [ { + "type": "bucket_factory", + "path": ".../_factory" + }, + { + "type": "bucket", "name": "bucketA", "path": ".../bucketA" }, { + "type": "bucket", "name": "bucketB", "path": ".../bucketB" } @@ -65,10 +80,16 @@ Formats: JSON object list [ { + "type": "object_factory", + "path": ".../bucketA/_factory" + }, + { + "type": "object", "name": "objectC", "path": ".../bucketA/objectC" }, { + "type": "object", "name": "objectD", "path": ".../bucketA/objectD" } @@ -81,10 +102,6 @@ Formats: "path": ".../bucketA/objectC/body" }, { - "type": "repl_trigger", - "path": ".../bucketA/objectC/repl" - }, - { "type": "multi_attributes", "path": ".../bucketA/objectC/attrs" }, @@ -105,11 +122,13 @@ To Do - priority (1 highest) work (5 largest) desc: 1 1 policy inheritance 1 2 delete metadata as well as data 1 3 re-replicate on policy change (single object) - --- done 2 2 manual re-replication trigger + --- done + 1 5 auth 2 4 writes proxied upstream + 2 5 reconcile dispatch with object with commands/format above 3 3 content types - 3 3 get rid of strtok + 3 3 fix string handling (eliminate strtok) 3 4 re-replicate on policy change (default) 3 5 replicated DB 4 2 check for existence before queuing repl job diff --git a/rest.c b/rest.c index 73c129f..101e26b 100644 --- a/rest.c +++ b/rest.c @@ -141,13 +141,13 @@ validate_put (struct MHD_Connection *conn) } int -is_reserved (char *cand, char **resv_list) +is_reserved (char *cand, char **resv_list, char *allow) { int i; for (i = 0; resv_list[i]; ++i) { if (!strcmp(cand,resv_list[i])) { - return 1; + return !allow || strcmp(cand,allow); } } @@ -164,7 +164,7 @@ validate_url (const char *url) return 0; } - return !is_reserved(slash+1,reserved_name); + return !is_reserved(slash+1,reserved_name,NULL); } /********** @@ -965,7 +965,7 @@ proxy_put_attr (void *cctx, struct MHD_Connection *conn, const char *url, return MHD_NO; } ms->buf_ptr[ms->buf_len-1] = '\0'; - if (is_reserved(ms->attr,reserved_attr)) { + if (is_reserved(ms->attr,reserved_attr,NULL)) { resp = MHD_create_response_from_data( 0,NULL,MHD_NO,MHD_NO); if (!resp) { @@ -1254,20 +1254,21 @@ post_iterator (void *ctx, enum MHD_ValueKind kind, const char *key, return MHD_YES; } +/* Returns TRUE if we found an *invalid* key. */ gboolean post_find (gpointer key, gpointer value, gpointer ctx) { my_state *ms = ctx; + char fixed[1024]; + char *bucket; + char *stctx; - if (is_reserved(key,reserved_attr)) { - if (strcmp(key,"key")) { - DPRINTF("bad attr %s\n",key); - return TRUE; - } - strncpy(ms->key,value,MAX_FIELD_LEN-1); + if (!is_reserved(key,reserved_attr,"key")) { + return FALSE; } - return FALSE; + DPRINTF("bad attr %s\n",key); + return TRUE; } void @@ -1291,13 +1292,14 @@ post_foreach (gpointer key, gpointer value, gpointer ctx) } int -proxy_post (void *cctx, struct MHD_Connection *conn, const char *url, - const char *method, const char *version, const char *data, - size_t *data_size, void **rctx) +proxy_bucket_post (void *cctx, struct MHD_Connection *conn, const char *url, + const char *method, const char *version, const char *data, + size_t *data_size, void **rctx) { struct MHD_Response *resp; my_state *ms = *rctx; int rc; + char *key; DPRINTF("PROXY POST (%s, %llu)\n",url,*data_size); @@ -1316,7 +1318,9 @@ proxy_post (void *cctx, struct MHD_Connection *conn, const char *url, else { rc = MHD_HTTP_BAD_REQUEST; if (!g_hash_table_find(ms->dict,post_find,ms)) { - if (ms->key[0]) { + key = g_hash_table_lookup(ms->dict,"key"); + if (key) { + strncpy(ms->key,key,MAX_FIELD_LEN-1); g_hash_table_remove(ms->dict,"key"); g_hash_table_foreach(ms->dict,post_foreach,ms); rc = MHD_HTTP_OK; @@ -1336,30 +1340,109 @@ proxy_post (void *cctx, struct MHD_Connection *conn, const char *url, } return MHD_YES; +} + +void +do_replicate (my_state * ms) +{ + int rc; + char *policy = NULL; + + DPRINTF("fetching policy for %s/%s\n",ms->bucket,ms->key); + rc = meta_get_value(ms->bucket,ms->key, + "_policy", &policy); + if (rc != 0) { + DPRINTF(" inheriting policy from %s\n",ms->bucket); + rc = meta_get_value(ms->bucket, + "_default", "_policy", &policy); + } + + if (policy) { + DPRINTF(" implementing policy %s\n",policy); + replicate(ms->url,0,policy); + free(policy); + } +} + +int +proxy_object_post (void *cctx, struct MHD_Connection *conn, const char *url, + const char *method, const char *version, const char *data, + size_t *data_size, void **rctx) +{ + struct MHD_Response *resp; + my_state *ms = *rctx; + int rc; + char *op; + + DPRINTF("PROXY POST (%s, %llu)\n",url,*data_size); + + if (ms->state == MS_NEW) { + ms->state = MS_NORMAL; + ms->url = (char *)url; + ms->dict = g_hash_table_new_full( + g_str_hash,g_str_equal,free,free); + ms->post = MHD_create_post_processor(conn,4096, + post_iterator,ms->dict); + } + else if (*data_size) { + MHD_post_process(ms->post,data,*data_size); + *data_size = 0; + } + else { + rc = MHD_HTTP_BAD_REQUEST; + if (!g_hash_table_find(ms->dict,post_find,ms)) { + op = g_hash_table_lookup(ms->dict,"op"); + if (op) { + if (!strcmp(op,"repl")) { + do_replicate(ms); + } + else { + DPRINTF("unknown op %s for %s/%s\n", + op, ms->bucket, ms->key); + } + rc = MHD_HTTP_OK; + } + else { + DPRINTF("op is MISSING (fail)\n"); + } + } + g_hash_table_destroy(ms->dict); + resp = MHD_create_response_from_data(0,NULL,MHD_NO,MHD_NO); + if (!resp) { + fprintf(stderr,"MHD_crfd failed\n"); + return MHD_NO; + } + MHD_queue_response(conn,rc,resp); + MHD_destroy_response(resp); + } + + return MHD_YES; } rule proxy_rules[] = { { /* get bucket list */ - "GET", URL_ROOT, proxy_api_root }, + "GET", URL_ROOT, proxy_api_root }, { /* get object list */ - "GET", URL_BUCKET, proxy_list_objs }, + "GET", URL_BUCKET, proxy_list_objs }, { /* get object data */ - "GET", URL_OBJECT, proxy_get_data }, + "GET", URL_OBJECT, proxy_get_data }, { /* get attribute data */ - "GET", URL_ATTR, proxy_get_attr }, + "GET", URL_ATTR, proxy_get_attr }, { /* put object data */ - "PUT", URL_OBJECT, proxy_put_data }, + "PUT", URL_OBJECT, proxy_put_data }, { /* put attribute data */ - "PUT", URL_ATTR, proxy_put_attr }, + "PUT", URL_ATTR, proxy_put_attr }, { /* create object and/or modify attributes */ - "POST", URL_BUCKET, proxy_post }, + "POST", URL_BUCKET, proxy_bucket_post }, + { /* perform control operations on an object */ + "POST", URL_OBJECT, proxy_object_post }, { /* query */ - "POST", URL_QUERY, proxy_query }, + "POST", URL_QUERY, proxy_query }, { /* delete object */ - "DELETE", URL_OBJECT, proxy_delete }, + "DELETE", URL_OBJECT, proxy_delete }, { /* delete attribute (TBD) */ - "DELETE", URL_ATTR, NULL }, + "DELETE", URL_ATTR, NULL }, {} }; -- 2.11.4.GIT