// $Id: udp.c,v 1.15 2008/03/29 20:05:32 lynx Exp $ // vim:syntax=lpc // // interface to the erq (external request daemon) to handle // multi-protocol UDP, since the driver only has one port // which we use for native PSYC. okay fippo also uses it for SIP, // but i'm not sure if that's going to stay that way. #include #ifdef __ERQ_MAX_SEND__ #include #include #include //#include NET_PATH "queue.c" inherit NET_PATH "queue"; volatile private mapping udp_ports, alias; #endif // port || alias -> data // port -> alias public varargs int ticket(closure callback, int localport, string remotehost, int remoteport); public int send(int localport, string host, int remoteport, string data); private void sendUDP(int localport); public varargs mixed listenPort(int port, mixed param, varargs mixed * args); public int closePort(mixed port); void process_erq(int port, string type, int * response_data, int len); public void getRPort(int port); public varargs mixed listPorts(int m); private void reset(); #ifdef __ERQ_MAX_SEND__ #define TICKETS_MAX 100 // maximum size of the tickets array #define TICKETS_PRE 50 // size to allocate #define QUEUE_MAX 100 #define QUEUE_PRE 50 // sockets states #define S_WAITING -1 #define S_ERROR 0 #define S_OK 1 #define S_BLOCKED 2 #define S_SENDING 3 #define S_CLOSED 4 // send-queue packet-array #define P_LOCALPORT 0 #define P_REMOTEHOST 1 #define P_REMOTEPORT 2 #define P_DATA 3 // socket-array #define STATE 0 #define TICKET 1 #define CALLBACK 2 #define PORT 3 //load() { return ME; } /* public void reset(int a) { if(!mappingp(udp_ports)) udp_ports = ([ ]); if(!mappingp(alias)) alias = ([ ]); ::reset(a); } */ public varargs mixed listPorts(int m) { string message, port; mapping states; reset(); states = ([ -1 : "S_WAITING", 0 : "S_ERROR", 1 : "S_OK", 2 : "S_BLOCKED", 3 : "S_SENDING", 4 : "S_CLOSED" ]); if (m) { mixed value, ports; ports = ([ ]); foreach (port, value : udp_ports) { ports += ([ port : ([ "_port" : port, "_state" : states[value[STATE]], "_callback" : to_string(value[CALLBACK]) ]) ]); } return ports; } message = "===================\n\tPORT\tSTATE\tCALLBACK\n"; foreach(port : m_indices(udp_ports)) { message += "\t"+port+"\t"+states[udp_ports[port][STATE]]+"\t" +to_string(udp_ports[port][CALLBACK])+"\n"; } return message; } public varargs int ticket(closure callback, mixed localport, string remotehost, int remoteport) { mixed key; reset(); unless(udp_ports[to_string(localport)]) { if(!alias[to_string(localport)]) { if(stringp(localport)) { unless (listenPort(0,localport,callback)) return 0; } else { unless (listenPort(localport,callback)) return 0; } } else { localport = alias[to_string(localport)]; } } localport = to_string(localport); if(remotehost && sscanf(remotehost,"%~D.%~D.%~D.%~D") != 4) { closure c = lambda( ({ 'name }), ({ CL_IF, 'name, ({ (#'funcall), #'ticket, callback, localport, 'name, remoteport }) })); dns_resolve(remotehost, c); return 1; } if(remoteport) { key = localport + ":" + remotehost + ":" + remoteport; } else if(remotehost) { key = localport + ":" + remotehost; } else { key = localport; } unless(qExists( key )) { qInit(key, TICKETS_MAX, TICKETS_PRE); } enqueue(key, callback); return 1; } public int send(mixed localport, string remotehost, int remoteport, string data) { unless(udp_ports[to_string(localport)]) { if(!alias[to_string(localport)]) { if(stringp(localport)) { unless (listenPort(0,localport)) return 0; } else { unless (listenPort(localport)) return 0; } } else { localport = alias[to_string(localport)]; } } localport = to_string(localport); if(remotehost && sscanf(remotehost,"%~D.%~D.%~D.%~D") != 4) { closure c = lambda( ({ 'name }), ({ CL_IF, 'name, ({ (#'funcall), #'send, localport, 'name, remoteport, data }) }) ); dns_resolve(remotehost, c); return 1; } if(udp_ports[localport][STATE] != S_BLOCKED) { if(!enqueue(":" + localport,({ localport,remotehost,remoteport, to_array(data)[0..strlen(data)-1], }) )) return 0; sendUDP(localport); return 1; } return 0; } private void sendUDP(mixed localport) { string packet; mixed data; int * ip; unless(udp_ports[localport]) return 0; switch(udp_ports[localport][STATE]) { case S_OK: if(!qSize( ":" + localport )) return; data = shift( ":" + localport ); ip = map(explode(data[P_REMOTEHOST],"."),#'to_int); packet = udp_ports[localport][TICKET] + ip[0..3] + ({ data[P_REMOTEPORT] / 256, data[P_REMOTEPORT] & 255 }) + data[P_DATA]; unless (send_erq(ERQ_SEND, packet, lambda(({ 'data, 'len }), ({ #'process_erq, localport, ERQ_SEND, 'data, 'len }) ))) { P0(("%O failed to ERQ_SEND!\n", ME)) } udp_ports[localport][STATE] = S_SENDING; D2(D("============\nOutgoing UDP-packet on port "+localport +"\nto: "+data[P_REMOTEHOST] +":"+data[P_REMOTEPORT]+"\n============\n");) return; case S_ERROR: return; case S_WAITING: return; } } public void getRPort(int port) { // first we will try to bind port 3645 // if that fails ++ until it works fine ,) // then we will send a packet over the new // unknown port which is still represented by udp_ports[0] // to our new port 3645+n. Tada! we know our former unknown // port. Then we just change 0 to x. // // and everything put into one large lambda.. .] // -el if (send_erq(ERQ_OPEN_UDP,({ port/256, port&255 }), lambda(({ 'data, 'len }), ({ (#',), ({ (#'=),'data,({ (#'map),'data,#'&,255 }) }), ({ #'switch, ({ CL_INDEX, 'data, 0 }), ({ ERQ_OK }), ({ (#',), ({ #'send,0,"127.0.0.1",port,"PORT" }), ({ (#'=), ({ CL_INDEX, ({ CL_INDEX, udp_ports, port }), TICKET }), ({ CL_L_RANGE, 'data, 1 }) }), ({ (#'=), ({ CL_INDEX, ({ CL_INDEX, udp_ports, port }), STATE }), S_OK }), }), (#'break), ({ ERQ_E_UNKNOWN }), ({ CL_IF, ({ #'==,({ CL_INDEX, 'data, 1}), 98 }), ({ #'getRPort, port+1 }), ({ (#'return),0 }), }), (#'break), ({ ERQ_STDOUT }), ({ (#',), ({ (#'=),'r_host,({ (#'sprintf),"%d.%d.%d.%d", ({ CL_INDEX, 'data, 1 }), ({ CL_INDEX, 'data, 2 }), ({ CL_INDEX, 'data, 3 }), ({ CL_INDEX, 'data, 4 }) }) }), ({ CL_IF, ({ #'&&, ({ #'==,'r_host,"127.0.0.1" }),({ #'==,({ (#'to_string),({ CL_RANGE,'data,7,11 }) }),"PORT" }), }), ({ (#',), ({ (#'=),'l_port, ({ (#'+), ({ CL_INDEX,'data,6 }),({ (#'*),256,({ CL_INDEX,'data,5 }) }) }) }), ({ #'qRename,":0", ({ (#'+),":",'l_port }) }), ({ (#'=), ({ CL_INDEX, udp_ports, 'l_port }), ({ CL_INDEX, udp_ports, to_string(0) }) }), ({ (#'m_delete), udp_ports, to_string(0) }), ({ #'closePort, port}), ({ (#'=), 'callback, ({ CL_INDEX, ({ CL_INDEX, udp_ports, 'l_port }), CALLBACK }) }), ({ #'closePort, 'l_port }), ({ #'listenPort, 'l_port, 'callback }), ({ (#'funcall), 'callback, 0, 'l_port }), }), ({ (#'return),0 }) }), }), (#'break), }) })))) { qInit(":" + port, QUEUE_MAX, QUEUE_PRE); udp_ports += ([ port : ({ S_WAITING,0,lambda(({ 'data, 'len }), ({ #'process_erq, port, ERQ_OPEN_UDP, 'data, 'len }) )}) ]); } else { P0(("%O failed to ERQ_OPEN_UDP!\n", ME)) return 0; } /* switch(data[0]) { * case ERQ_OK: * send data to that port over unknown port which is still * represented by udp_ports[0] callback must be the 0 to x changer! * break; * case ERQ_E_UNKNOW: * if(data[1] == 98) { // 98 == Adress allready in use * bind port+1 * recall myself * } * break; * case ERQ_STDOUT: * extract port and make the movement! * break; * } */ } public varargs mixed listenPort(int port, mixed param, varargs mixed * args) { unless (mappingp(udp_ports)) reset(); if(qExists(":" + port) && udp_ports[to_string(port)]) return 0; if(!port && !param) return 0; // doesnt make any sence to bind to a // random port without getting to know // which it is. mixed id; closure callback; if(!port && stringp(param)) { while (udp_ports[param]) { param = to_string(({ random(150)+1,random(150)+1 })); } id = param; if(sizeof(args) && closurep(args[0])) callback = args[0]; } else { id = to_string(port); } if(closurep(param)) callback = param; D2(D("============\nlistenPort("+(port || id) +") called!\n============\n");) closure c = lambda( ({ 'data, 'len }), ({ #'process_erq, id, ERQ_OPEN_UDP, 'data, 'len }) ); if (send_erq(ERQ_OPEN_UDP,({ port/256, port&255 }),c)) { qInit(":" + id, QUEUE_MAX, QUEUE_PRE); udp_ports += ([ id : ({ S_WAITING,0,callback || 0 }) ]); if(stringp(param)) return id; return 1; } else { P0(("%O failed to ERQ_OPEN_UDP!\n", ME)) return 0; } } public int closePort(mixed port) { port = to_string(port); if(!udp_ports[port] && (!alias[port] && !udp_ports[alias[port]])) return 0; D2(D("============\nclosePort("+port+") called!\n============\n");) send_erq(ERQ_KILL, (udp_ports[port][TICKET] || udp_ports[alias[port]][TICKET]) + ({0,0,0,0}), lambda(({ 'data, 'len }), ({ #'process_erq, port, ERQ_KILL, 'data, 'len }) )); udp_ports -= ([ port ]); udp_ports -= ([ alias[port] ]); alias -= ([ port ]); qDel(":" + port); } void process_erq(mixed port, string type, int * response_data, int len) { response_data = map(response_data,#'&,255); port = to_string(port); int remoteport; string remotehost; mixed callback; switch (response_data[0]) { // BIND and SEND case ERQ_OK: // port bound || package sent ! switch (type) { case ERQ_OPEN_UDP: D2(D("============\nPORT \""+port+"\" bound for ticket: \""+to_string(response_data[1..])+"\"\n============\n");) udp_ports[port][TICKET] = response_data[1..]; udp_ports[port][STATE] = S_OK; if(port == "0" && udp_ports[port][CALLBACK]) { // no alias, cb getRPort(3654); } else { sendUDP( port ); } break; case ERQ_SEND: udp_ports[port][STATE] = S_OK; sendUDP( port ); break; case ERQ_KILL: break; } return; case ERQ_E_UNKNOWN: // unknown error while binding || sending switch(response_data[1]) { case EADDRINUSE: D2(D("Port "+port+" allready in use by someone else!\n");) default: D2(D("Unknown error while Binding or Sending!" +to_string(response_data[1..])+"\n");) } return; // BIND case ERQ_E_NSLOTS:// The max number of child processes is exhausted. udp_ports -= ([ port ]); return; case ERQ_E_ARGLENGTH: // The port number given does not consist of two bytes return; // SEND case ERQ_E_TICKET: // ticket invalid! if(!unshift(":" + port )) { D2(D("PANIC! Mysteriously a UDP-packet got lost in the" "queue!\n");) } if(!to_int(port) || to_string(to_int(port)) != port) { listenPort(0, port ); } else { listenPort( to_int(port), udp_ports[port][CALLBACK] || 0); } udp_ports -= ([ port ]); return; case ERQ_E_INCOMPLETE: // only a part of the message has been sent return; case ERQ_E_WOULDBLOCK: // erq allready has a packet in the queue.. if(!unshift( port )) { D2(D("You are too fast, young Jedi!\n");) } return; case ERQ_E_PIPE: // pipe error. return; // INCOMING UDP case ERQ_STDOUT: // incoming data on udp-port remotehost = sprintf("%d.%d.%d.%d", response_data[1], response_data[2], response_data[3], response_data[4]); remoteport = response_data[5] * 256 + response_data[6]; string key = qSize(port + ":" + remotehost + ":" + remoteport, port + ":" + remotehost, port); if(key) { callback = shift( key ); } else { callback = udp_ports[port][CALLBACK]; } D2(D("============\nIncoming UDP-packet:\n from: "+port + ":" + remotehost + ":" + remoteport +"\nticket: \""+ key + "\" callback: "+to_string(callback)+"\n============\n");) if(closurep(callback)) funcall(callback,1,to_string(response_data[7..]), port, remotehost, remoteport); return; } } void create() { alias = ([ ]); udp_ports = ([ ]); } load() { return ME; } #else public varargs int ticket(closure callback, int localport, string remotehost, int remoteport) { D2(D("You need an erq to use d/udp.c\n");) return 0; } public int send(int localport, string host, int remoteport, string data) { D2(D("You need an erq to use d/udp.c\n");) return 0; } public varargs mixed listenPort(int port, mixed param) { D2(D("You need an erq to use d/udp.c\n");) return 0; } public void getRPort(int port) { D2(D("You need an erq to use d/udp.c\n");) } public varargs mixed listPorts(int m) { D2(D("You need an erq to use d/udp.c\n");) if (m) return ([ ]); return ""; } load() { return ME; } #endif void reset() { }