From 9a3a2e3b9390a7a3edd9c24e7013c535b2e22240 Mon Sep 17 00:00:00 2001 From: Thomas Harning Jr Date: Sat, 18 Aug 2007 20:57:33 +0000 Subject: [PATCH] * Committing what will be version 0.1.2 Main feature: Callback/coroutine issues resolved as described in COROUTINE_MANAGEMENT --- luaevent/CHANGELOG | 9 +++ luaevent/COROUTINE_MANAGEMENT | 39 +++++++++ luaevent/include/luaevent.h | 8 +- luaevent/luaevent.lua | 84 ++++++++++++-------- luaevent/src/luaevent.c | 179 ++++++++++++++++++------------------------ luaevent/test/test.lua | 34 +++++--- luaevent/test/testClient.lua | 48 +++++------ 7 files changed, 223 insertions(+), 178 deletions(-) create mode 100644 luaevent/COROUTINE_MANAGEMENT rewrite luaevent/test/testClient.lua (91%) diff --git a/luaevent/CHANGELOG b/luaevent/CHANGELOG index c3622c8..f2f964d 100644 --- a/luaevent/CHANGELOG +++ b/luaevent/CHANGELOG @@ -1,4 +1,13 @@ ====== +0.1.2 - Revision 15 - 2007-08-18 ++ Setup system to use new coro management as described in COROUTINE_MANAGEMENT + The callbacks are called from the event_loop 'thread' rather than that which they are + created in. This will prevent the self-resume problem as well as dead-thread problems. +- Recognized issues to fix in next release: + * Socket/event closing needs to be cleaned + * luaevent.lua needs refactoring + * luaevent.[ch] need to be cleaned up +====== 0.1.1 - Revision 14 - 2007-06-13 + Fixed event-handling code to cancel events on nothing being returned + Added socket/object cleanup. diff --git a/luaevent/COROUTINE_MANAGEMENT b/luaevent/COROUTINE_MANAGEMENT new file mode 100644 index 0000000..1af0cb4 --- /dev/null +++ b/luaevent/COROUTINE_MANAGEMENT @@ -0,0 +1,39 @@ +Due to the issue w/ self-resuming threads and crashing out threads, +a management system needs to be in place. + +Example thread system: + +MAIN +EVENT_LOOP --------running--- +WAITING ON READ +WAITING ON WRITE +WAITING ON CONNECT + + +Since main and the other 'waiting' threads are yielded, it is unsafe to call things arbitrarily on them +or resume them from themselves... +However the EVENT_LOOP one is running and thus can execute the callbacks (which can resume the threads) +Each of the 'waiting' events are attached to an event and contain a pointer, this pointer can be setup to point +to a per event_base item which will be updated w/ the lua_State of whatever calls EVENT_LOOP... +this will guarantee that the thread will be resumed from the currently running EVENT_LOOP + + +Other system that's more complicated and less likely: + +MAIN +EVENT_LOOP a -----running--- + +WAITING ON READ a +WAITING ON WRITE a + +EVENT_LOOP b ----yielded +WAITING ON READ b + + +Since there can only be one event_loop running per event_base, you do not have to worry about +cross-pollination of the different waits... + +NOTES: +If the event_loop thread goes away... then the waiting coroutines will have no way to get back... +though in this case, they are dead in the water anyways.. until a new event_loop starts... +in which case the lua_State references has been updated... \ No newline at end of file diff --git a/luaevent/include/luaevent.h b/luaevent/include/luaevent.h index 8b97be7..2878632 100644 --- a/luaevent/include/luaevent.h +++ b/luaevent/include/luaevent.h @@ -9,10 +9,14 @@ #include typedef struct { + struct event_base* base; + lua_State* loop_L; +} le_base; + +typedef struct { struct event ev; - lua_State* L; + le_base* base; int callbackRef; - int objectRef; /* TEMP */ } le_callback; int luaopen_luaevent(lua_State* L); diff --git a/luaevent/luaevent.lua b/luaevent/luaevent.lua index c6bd015..ec9bc15 100644 --- a/luaevent/luaevent.lua +++ b/luaevent/luaevent.lua @@ -7,9 +7,37 @@ require("luaevent.core") local EV_READ = luaevent.core.EV_READ local EV_WRITE = luaevent.core.EV_WRITE -local fair = false +local fair = false -- Not recommended for most cases... +local base = luaevent.core.new() +local sockMap = setmetatable({}, {'__mode', 'kv'}) +local function addevent(sock, ...) + local item = base:addevent(sock, ...) + if not item then print("FAILED TO SETUP ITEM") return item end + local fd = sock:getfd() + sockMap[item] = fd + print("SETUP ITEM FOR: ", fd) + if not hookedObjectMt then + hookedObjectMt = true + --[[ + local mt = debug.getmetatable(item) + local oldGC = mt.__gc + mt.__gc = function(...) + print("RELEASING ITEM FOR: ", sockMap[(...)]) + return oldGC(...) + end]] + end + return item +end +-- Weak keys.. the keys are the client sockets +local clientTable = setmetatable({}, {'__mode', 'kv'}) -local hookedObjectMt = false +local function getWrapper() + local running = coroutine.running() + return function(...) + if coroutine.running() == running then return end + return select(2, coroutine.resume(running, ...)) + end +end function send(sock, data, start, stop) local s, err @@ -21,9 +49,11 @@ function send(sock, data, start, stop) -- Add extra coro swap for fairness -- CURRENTLY DISABLED FOR TESTING...... if fair and math.random(100) > 90 then + if not clientTable[sock] then clientTable[sock] = addevent(sock, EV_WRITE, getWrapper()) end coroutine.yield(EV_WRITE) end if s or err ~= "timeout" then return s, err, sent end + if not clientTable[sock] then clientTable[sock] = addevent(sock, EV_WRITE, getWrapper()) end coroutine.yield(EV_WRITE) until false end @@ -33,6 +63,7 @@ function receive(sock, pattern, part) repeat s, err, part = sock:receive(pattern, part) if s or err ~= "timeout" then return s, err, part end + if not clientTable[sock] then clientTable[sock] = addevent(sock, EV_READ, getWrapper()) end coroutine.yield(EV_READ) until false end @@ -45,6 +76,7 @@ function receivePartial(client, pattern) s, err, part = client:receive(pattern) if s or ( (type(pattern)=="number") and part~="" and part ~=nil ) or err ~= "timeout" then return s, err, part end + if not clientTable[sock] then clientTable[sock] = addevent(sock, EV_READ, getWrapper()) end coroutine.yield(EV_READ) until false end @@ -52,6 +84,7 @@ function connect(sock, ...) sock:settimeout(0) local ret, err = sock:connect(...) if ret or err ~= "timeout" then return ret, err end + if not clientTable[sock] then clientTable[sock] = addevent(sock, EV_WRITE, getWrapper()) end coroutine.yield(EV_WRITE) ret, err = sock:connect(...) if err == "already connected" then @@ -66,74 +99,59 @@ local function clientCoroutine(sock, handler) -- Figure out what to do ...... return handler(sock) end - +local function handleClient(co, client, handler) + local ok, res, event = coroutine.resume(co, client, handler) +end local function serverCoroutine(sock, callback) + local listenItem = addevent(sock, EV_READ, getWrapper()) repeat local event = coroutine.yield(EV_READ) -- Get new socket local client = sock:accept() if client then - --cl[#cl + 1] = client client:settimeout(0) - local coFunc = coroutine.wrap(clientCoroutine) - luaevent.core.addevent(client, coFunc, client, callback) + local co = coroutine.create(clientCoroutine) + handleClient(co, client, callback) end until false end - -local oldAddEvent = luaevent.core.addevent -luaevent.core.addevent = function(...) - local item = oldAddEvent(...) - if not item then print("FAILED TO SETUP ITEM") return item end - print("SETUP ITEM FOR: ", debug.getmetatable(item).getfd(item)) - if not hookedObjectMt then - hookedObjectMt = true - local mt = debug.getmetatable(item) - local oldGC = mt.__gc - mt.__gc = function(...) - print("RELEASING ITEM FOR: ", mt.getfd(...)) - return oldGC(...) - end - end - return item -end - function addserver(sock, callback) - local coFunc = coroutine.wrap(serverCoroutine) - luaevent.core.addevent(sock, coFunc, sock, callback) + local coro = coroutine.create(serverCoroutine) + assert(coroutine.resume(coro, sock, callback)) end -function addthread(sock, func, ...) - local coFunc = coroutine.wrap(func) - luaevent.core.addevent(sock, coFunc, ...) +function addthread(func, ...) + return coroutine.resume(coroutine.create(func), ...) end local _skt_mt = {__index = { connect = function(self, ...) return connect(self.socket, ...) end, send = function (self, data) - return send (self.socket, data) + return send(self.socket, data) end, receive = function (self, pattern) if (self.timeout==0) then return receivePartial(self.socket, pattern) end - return receive (self.socket, pattern) + return receive(self.socket, pattern) end, flush = function (self) - return flush (self.socket) + return flush(self.socket) end, settimeout = function (self,time) self.timeout=time return end, + close = function(self) + clientTable[self.socket]:close() self.socket:close() end }} function wrap(sock) return setmetatable({socket = sock}, _skt_mt) end -loop = luaevent.core.loop \ No newline at end of file +loop = function(...) base:loop(...) end \ No newline at end of file diff --git a/luaevent/src/luaevent.c b/luaevent/src/luaevent.c index e5e33cf..38f19ce 100644 --- a/luaevent/src/luaevent.c +++ b/luaevent/src/luaevent.c @@ -1,111 +1,87 @@ /* LuaEvent - Copyright (C) 2007 Thomas Harning * Licensed as LGPL - See doc/COPYING for details */ - -#include "luaevent.h" + #include "luaevent.h" #include #include +#include #define EVENT_BASE_MT "EVENT_BASE_MT" #define EVENT_CALLBACK_ARG_MT "EVENT_CALLBACK_ARG_MT" -#define EVENT_BASE_LOCATION 1 +#define MAIN_THREAD_LOCATION 1 -static void setEventBase(lua_State* L, struct event_base* base) { - struct event_base** pbase = lua_newuserdata(L, sizeof(base)); - *pbase = base; - luaL_getmetatable(L, EVENT_BASE_MT); - lua_setmetatable(L, -2); - lua_rawseti(L, LUA_ENVIRONINDEX, EVENT_BASE_LOCATION); +void setMainThread(lua_State* L) { + lua_pushthread(L); + lua_rawseti(L, LUA_ENVIRONINDEX, MAIN_THREAD_LOCATION); } -struct event_base* getEventBase(lua_State* L) { - struct event_base* base; - lua_rawgeti(L, LUA_ENVIRONINDEX, EVENT_BASE_LOCATION); - base = *(struct event_base**)lua_topointer(L, -1); +lua_State* getMainThread(lua_State* L) { + lua_State* g_L; + lua_rawgeti(L, LUA_ENVIRONINDEX, MAIN_THREAD_LOCATION); + g_L = lua_tothread(L, -1); lua_pop(L, 1); - return base; + return g_L; } -static void freeCallbackArgs(le_callback* arg) { - if(arg->L) { - lua_State* L = arg->L; - arg->L = NULL; - event_del(&arg->ev); - luaL_unref(L, LUA_REGISTRYINDEX, arg->callbackRef); - luaL_unref(L, LUA_REGISTRYINDEX, arg->objectRef); - } +int luaevent_newbase(lua_State* L) { + le_base *base = (le_base*)lua_newuserdata(L, sizeof(le_base)); + base->loop_L = NULL; /* No running loop */ + base->base = event_init(); + luaL_getmetatable(L, EVENT_BASE_MT); + lua_setmetatable(L, -2); + return 1; } -static int call_callback_function(lua_State* L, int argCount) { - int ret; - if(lua_pcall(L, argCount, 1, 0) || !(lua_isnil(L, -1) || lua_isnumber(L, -1))) { - printf("ERROR IN INIT: %s\n", lua_tostring(L, -1)); - lua_pop(L, 1); - return -1; - } - /* Lua_isnil returns 1 if the value is nil... */ - ret = lua_tointeger(L, -1) | -lua_isnil(L, -1); - lua_pop(L, 1); - if(ret < 0) { /* Done, no need to setup event */ - return -1; - } - if(ret != EV_READ && ret != EV_WRITE) { - printf("BAD RET_VAL IN INIT: %i\n", ret); +void freeCallbackArgs(le_callback* arg, lua_State* L) { + if(arg->base) { + arg->base = NULL; + event_del(&arg->ev); + luaL_unref(L, LUA_REGISTRYINDEX, arg->callbackRef); } - return ret; } - -static void luaevent_callback(int fd, short event, void* p); - -static void setup_event(le_callback* arg, int fd, short event, int resetEvent) { - /* Setup event... */ - if(resetEvent) event_del(&arg->ev); - event_set(&arg->ev, fd, event| EV_PERSIST, luaevent_callback, arg); - if(!resetEvent) event_base_set(getEventBase(arg->L), &arg->ev); - event_add(&arg->ev, NULL); -} - /* le_callback is allocated at the beginning of the coroutine in which it is used, no need to manually de-allocate */ /* Index for coroutine is fd as integer for *nix, as lightuserdata for Win */ static void luaevent_callback(int fd, short event, void* p) { le_callback* arg = p; - lua_State* L = arg->L; + lua_State* L; int ret; + assert(arg && arg->base && arg->base->loop_L); + L = arg->base->loop_L; lua_rawgeti(L, LUA_REGISTRYINDEX, arg->callbackRef); lua_pushinteger(L, event); - - if(-1 == (ret = call_callback_function(L, 1))) { - freeCallbackArgs(arg); - return; + lua_call(L, 1, 1); + ret = lua_tointeger(L, -1); + lua_pop(L, 1); + if(ret == -1) { + freeCallbackArgs(arg, L); + } else { + struct event *ev = &arg->ev; + int newEvent = ret; + if(newEvent != event) { // Need to hook up new event... + event_del(ev); + event_set(ev, fd, EV_PERSIST | newEvent, luaevent_callback, arg); + event_add(ev, NULL); + } } - - if(event != ret) - setup_event(arg, fd, ret, 1); } static int luaevent_base_gc(lua_State* L) { - struct event_base** pbase = luaL_checkudata(L, 1, EVENT_BASE_MT); - if(*pbase) { - event_base_free(*pbase); - *pbase = NULL; + le_base *base = luaL_checkudata(L, 1, EVENT_BASE_MT); + if(base->base) { + event_base_free(base->base); + base->base = NULL; } return 0; } static int luaevent_cb_gc(lua_State* L) { le_callback* arg = luaL_checkudata(L, 1, EVENT_CALLBACK_ARG_MT); - freeCallbackArgs(arg); + freeCallbackArgs(arg, L); return 0; } -static int luaevent_cb_getfd(lua_State* L) { - le_callback* arg = luaL_checkudata(L, 1, EVENT_CALLBACK_ARG_MT); - lua_pushinteger(L, arg->ev.ev_fd); - return 1; -} - -static int getSocketFd(lua_State* L, int idx) { +int getSocketFd(lua_State* L, int idx) { int fd; luaL_checktype(L, idx, LUA_TUSERDATA); lua_getfield(L, idx, "getfd"); @@ -118,52 +94,48 @@ static int getSocketFd(lua_State* L, int idx) { return fd; } -static void push_new_callback(lua_State* L, int callbackRef, int fd, short event) { - le_callback* arg = lua_newuserdata(L, sizeof(*arg)); +/* sock, event, callback */ +static int luaevent_addevent(lua_State* L) { + int fd, event, callbackRef; + le_callback* arg; + le_base *base = luaL_checkudata(L, 1, EVENT_BASE_MT); + fd = getSocketFd(L, 2); + event = luaL_checkinteger(L, 3); + luaL_checktype(L, 4, LUA_TFUNCTION); + lua_pushvalue(L, 4); + callbackRef = luaL_ref(L, LUA_REGISTRYINDEX); + arg = lua_newuserdata(L, sizeof(*arg)); luaL_getmetatable(L, EVENT_CALLBACK_ARG_MT); lua_setmetatable(L, -2); - arg->L = L; + arg->base = base; arg->callbackRef = callbackRef; - lua_pushvalue(L, -1); - arg->objectRef = luaL_ref(L, LUA_REGISTRYINDEX); - setup_event(arg, fd, event, 0); -} -/* Expected to be called at the beginning of the coro that uses it.. -Value must be kept until coro is complete.... -*/ -/* sock, callback */ -static int luaevent_addevent(lua_State* L) { - int fd, callbackRef; - int top, ret; - fd = getSocketFd(L, 1); - luaL_checktype(L, 2, LUA_TFUNCTION); - top = lua_gettop(L); - /* Preserve the callback function */ - lua_pushvalue(L, 2); - callbackRef = luaL_ref(L, LUA_REGISTRYINDEX); - /* Call the callback with all arguments after it to get the loop primed.. */ - if(-1 == (ret = call_callback_function(L, top - 2))) { - luaL_unref(L, LUA_REGISTRYINDEX, callbackRef); - return 0; - } - - push_new_callback(L, callbackRef, fd, ret); + /* Setup event... */ + event_set(&arg->ev, fd, event | EV_PERSIST, luaevent_callback, arg); + event_base_set(base->base, &arg->ev); + event_add(&arg->ev, NULL); return 1; } static int luaevent_loop(lua_State* L) { - int ret = event_base_loop(getEventBase(L), 0); + le_base *base = luaL_checkudata(L, 1, EVENT_BASE_MT); + base->loop_L = L; + int ret = event_base_loop(base->base, 0); lua_pushinteger(L, ret); return 1; } -static luaL_Reg funcs[] = { +static luaL_Reg base_funcs[] = { { "addevent", luaevent_addevent }, { "loop", luaevent_loop }, { NULL, NULL } }; +static luaL_Reg funcs[] = { + { "new", luaevent_newbase }, + { NULL, NULL } +}; + typedef struct { const char* name; int value; @@ -191,18 +163,21 @@ int luaopen_luaevent_core(lua_State* L) { lua_replace(L, LUA_ENVIRONINDEX); /* Setup metatable */ luaL_newmetatable(L, EVENT_BASE_MT); + lua_newtable(L); + luaL_register(L, NULL, base_funcs); + lua_setfield(L, -2, "__index"); lua_pushcfunction(L, luaevent_base_gc); lua_setfield(L, -2, "__gc"); lua_pop(L, 1); luaL_newmetatable(L, EVENT_CALLBACK_ARG_MT); lua_pushcfunction(L, luaevent_cb_gc); lua_setfield(L, -2, "__gc"); - lua_pushcfunction(L, luaevent_cb_getfd); - lua_setfield(L, -2, "getfd"); + lua_newtable(L); + lua_pushcfunction(L, luaevent_cb_gc); + lua_setfield(L, -2, "close"); + lua_setfield(L, -2, "__index"); lua_pop(L, 1); - setEventBase(L, event_init()); - luaL_register(L, "luaevent.core", funcs); setNamedIntegers(L, consts); return 1; diff --git a/luaevent/test/test.lua b/luaevent/test/test.lua index fd9919d..412857e 100644 --- a/luaevent/test/test.lua +++ b/luaevent/test/test.lua @@ -5,20 +5,30 @@ require"luaevent" require"socket" -local function echoHandler(skt) - while true do - local data,ret = luaevent.receive(skt, 10) - if data == "quit" or ret == 'closed' or not data then - break - end - --collectgarbage() - if not luaevent.send(skt, data) then return end - end - if skt then skt:close() end +local oldPrint = print +print = function(...) + oldPrint("SRV", ...) end +local function echoHandler(skt) + while true do + local data,ret = luaevent.receive(skt, 10) + --print("GOT: ", data, ret) + if data == "quit" or ret == 'closed' then + break + end + luaevent.send(skt, data) + collectgarbage() + end + skt:close() + --print("DONE") +end local server = assert(socket.bind("localhost", 20000)) server:settimeout(0) - +local coro = coroutine.create +coroutine.create = function(...) + local ret = coro(...) + return ret +end luaevent.addserver(server, echoHandler) -luaevent.loop() \ No newline at end of file +luaevent.loop() diff --git a/luaevent/test/testClient.lua b/luaevent/test/testClient.lua dissimilarity index 91% index 8d14887..fbbcf87 100644 --- a/luaevent/test/testClient.lua +++ b/luaevent/test/testClient.lua @@ -1,29 +1,19 @@ -require"luaevent" -require"socket" -local function setupHook(thread) - if not thread then debug.sethook(function(event) print("TRACE >: ", debug.getinfo(2, 'n').name) end, 'c') - else debug.sethook(thread, function(event) print("TRACE ", thread,">: ", debug.getinfo(2, 'n').name) end, 'c') end -end -local count = 100 -local function func(sock) - sock = luaevent.wrap(sock) - assert(sock:connect("localhost", 20000)) - for i = 1, 2 do - local maxZ = 10 - for z = 1, maxZ do - assert(sock:send("Greet me ")) - end - assert(sock:receive(10 * maxZ)) - end - if skt then skt:close() end - count = count - 1 - if count > 0 then - --local sock = assert(socket.tcp()) - --luaevent.addthread(sock, func, sock) - end -end -for i = 1, 500 do - local sock = assert(socket.tcp()) - luaevent.addthread(sock, func, sock) -end -luaevent.loop() +require"luaevent" +require"socket" +local oldPrint = print +print = function(...) + oldPrint("CLT", ...) +end + +local function func() + print("ACTIVATED") + local sock = socket.tcp() + --sock: + sock = luaevent.wrap(sock) + print(assert(sock:connect("localhost", 20000))) + for i = 1, 100 do assert(sock:send("Greet me ")) assert(sock:receive(10)) collectgarbage() end +end + +luaevent.addthread(func) + +luaevent.loop() \ No newline at end of file -- 2.11.4.GIT