From 319d7329bfa618babdcaeb30de61bde351fc8013 Mon Sep 17 00:00:00 2001 From: Gabor Adam Toth Date: Wed, 24 Aug 2011 10:57:50 +0200 Subject: [PATCH] twitter user stream --- world/net/http/fetch_stream.c | 44 ++++++ world/net/http/oauth.c | 78 +++++++---- world/net/twitter/client.c | 251 +++++++++++++++++++++++++++++++--- 3 files changed, 326 insertions(+), 47 deletions(-) create mode 100644 world/net/http/fetch_stream.c diff --git a/world/net/http/fetch_stream.c b/world/net/http/fetch_stream.c new file mode 100644 index 0000000..e406bf9 --- /dev/null +++ b/world/net/http/fetch_stream.c @@ -0,0 +1,44 @@ +#include +#include + +inherit NET_PATH "http/fetch"; + +int buffer_content(string data) { + P2(("%O got data:\n%O\n", ME, data)) + + mixed *waiter; + foreach (waiter : qToArray(ME)) { + funcall(waiter[0], data, waiter[1] ? fheaders : copy(fheaders), http_status, 1); + } + next_input_to(#'buffer_content); //' + return 1; +} + +disconnected(string data) { + P2(("%O got disconnected:\n%O\n", ME, data)) + headers["_fetchtime"] = isotime(ctime(time()), 1); + if (headers["last-modified"]) + rheaders["if-modified-since"] = headers["last-modified"]; + //if (headers["etag"]) + // rheaders["if-none-match"] = headers["etag"]; // heise does not work with etag + + fheaders = headers; + buffer = headers = 0; + switch (http_status) { + default: + mixed *waiter; + while (qSize(ME)) { + waiter = shift(ME); + P2(("%O calls back.. body is %O\n", ME, data)) + funcall(waiter[0], data, waiter[1] ? fheaders : copy(fheaders), http_status, 0); + } + if (http_status == R_OK) break; + // doesn't seem to get here when HTTP returns 301 or 302. strange. + // fall thru + case R_NOTMODIFIED: + qDel(ME); + qInit(ME, 150, 5); + } + fetching = 0; + return 1; // presume this disc was expected +} diff --git a/world/net/http/oauth.c b/world/net/http/oauth.c index a8cb21c..751477a 100644 --- a/world/net/http/oauth.c +++ b/world/net/http/oauth.c @@ -11,15 +11,19 @@ #include #include -string consumer_key; -string consumer_secret; -string request_token_url; -mapping request_params = ([ ]); -mapping access_params = ([ ]); -string access_token_url; -string authorize_url; -string callback_url = HTTPS_OR_HTTP_URL + "/oauth"; -object user; +volatile string consumer_key; +volatile string consumer_secret; +volatile string request_token_url; +persistent mapping request_params = ([ ]); +persistent mapping access_params = ([ ]); +volatile string access_token_url; +volatile string authorize_url; +volatile string callback_url = HTTPS_OR_HTTP_URL + "/oauth"; +volatile object user; +volatile int authorized = 0; + +oauth_success() {} +oauth_error() {} varargs void fetch(object ua, string url, string method, mapping post, mapping oauth) { P3((">> oauth:fetch(%O, %O, %O)\n", url, method, oauth)) @@ -60,12 +64,16 @@ void parse_request_token(string body, mapping headers, int http_status) { if (strlen(request_params["oauth_token"]) && strlen(request_params["oauth_token_secret"])) { shared_memory("oauth_request_tokens")[request_params["oauth_token"]] = ME; //P3((">>> adding token: %O to shm: %O\n", request_params["oauth_token"], shared_memory("oauth_request_tokens"))) - sendmsg(user, "_notice_oauth_authorize_url", "Open [_url] to perform authorization.", - (["_url": authorize_url + "?oauth_token=" + request_params["oauth_token"]])); + if (user) sendmsg(user, "_notice_oauth_authorize_url", "Open [_url] to perform authorization.", + (["_url": authorize_url + "?oauth_token=" + request_params["oauth_token"]])); + P1(("OAuth: open %s to perform authorization.\n", authorize_url + "?oauth_token=" + request_params["oauth_token"])); return; } } - sendmsg(user, "_error_oauth_token_request", "OAuth failed: could not get a request token."); + if (user) sendmsg(user, "_error_oauth_token_request", "OAuth failed: could not get a request token."); + P1(("OAuth failed: could not get a request token.\n")); + authorized = -1; + oauth_error(); } void parse_access_token(string body, mapping headers, int http_status) { @@ -74,11 +82,17 @@ void parse_access_token(string body, mapping headers, int http_status) { access_params = ([]); url_parse_query(access_params, body); if (strlen(access_params["oauth_token"]) && strlen(access_params["oauth_token_secret"])) { - sendmsg(user, "_notice_oauth_success", "OAuth successful."); + if (user) sendmsg(user, "_notice_oauth_success", "OAuth successful."); + P1(("OAuth successful.\n")); + authorized = 1; + oauth_success(); return; } } - sendmsg(user, "_error_oauth_token_access", "OAuth failed: could not get an access token."); + if (user) sendmsg(user, "_error_oauth_token_access", "OAuth failed: could not get an access token."); + P1(("OAuth failed: could not get an access token.\n")); + authorized = -1; + oauth_error(); } void verified(string verifier) { @@ -88,18 +102,30 @@ void verified(string verifier) { fetch(ua, access_token_url, "POST", 0, (["oauth_verifier": verifier])); } -object load(object usr, string key, string secret, string request, string access, string authorize) { - if (usr) user = usr; - if (key) consumer_key = key; - if (secret) consumer_secret = secret; - if (request) request_token_url = request; - if (access) access_token_url = access; - if (authorize) authorize_url = authorize; +void oauth() { + if (!request_token_url) return; - if (request_token_url && user) { - object ua = clone_object(NET_PATH "http/fetch"); - ua->content(#'parse_request_token, 1, 1); //'); - fetch(ua, request_token_url, "POST", 0, (["oauth_callback": callback_url])); - } + request_params = ([ ]); + access_params = ([ ]); + authorized = 0; + + object ua = clone_object(NET_PATH "http/fetch"); + ua->content(#'parse_request_token, 1, 1); //'); + fetch(ua, request_token_url, "POST", 0, (["oauth_callback": callback_url])); +} + +object load(object usr, mapping opts) { + if (usr) user = usr; + unless (mappingp(opts)) opts = ([]); + if (opts["consumer_key"]) consumer_key = opts["consumer_key"]; + if (opts["consumer_secret"]) consumer_secret = opts["consumer_secret"]; + if (opts["request_token_url"]) request_token_url = opts["request_token_url"]; + if (opts["access_token_url"]) access_token_url = opts["access_token_url"]; + if (opts["authorize_url"]) authorize_url = opts["authorize_url"]; + + if (access_params["oauth_token"]) + authorized = 1; + else + oauth(); return ME; } diff --git a/world/net/twitter/client.c b/world/net/twitter/client.c index e3a46ee..4776ae2 100644 --- a/world/net/twitter/client.c +++ b/world/net/twitter/client.c @@ -7,34 +7,72 @@ * - app type: browser * - callback url: http://your.host/oauth * (actually the url psyced sends will be used but you have to type in something) - * - access type: read/write * - then in local.h #define TWITTER_KEY & TWITTER_SECRET + * + * - enabling the user stream + * - #define TWITTER_ADMIN which should contain a user name who will receive oauth messages + * - #define USE_TWITTER_STREAM + * - add this to local/config.c: +# ifdef USE_TWITTER_STREAM + D(" " NET_PATH "twitter/client\n"); + load_object(NET_PATH "twitter/client")->home_stream(); +# endif */ #include #include inherit NET_PATH "http/oauth"; +inherit NET_PATH "queue"; -string name = "twitter"; -string display_name = "twitter"; -string api_base_url = "http://api.twitter.com/1"; +persistent mixed lastid; -int status_max_len = 140; +volatile string api_url = "https://api.twitter.com/1"; +volatile string userstream_url = "https://userstream.twitter.com/2"; -object load(object usr, string key, string secret, string request, string access, string authorize) { - unless (consumer_key) consumer_key = TWITTER_KEY; - unless (consumer_secret) consumer_secret = TWITTER_SECRET; - unless (request_token_url) request_token_url = "http://twitter.com/oauth/request_token"; - unless (access_token_url) access_token_url = "http://twitter.com/oauth/access_token"; - unless (authorize_url) authorize_url = "http://twitter.com/oauth/authorize"; +volatile int status_max_len = 140; +volatile int send_to_user = 0; +volatile int wait = 0; +volatile mapping friends; - return ::load(usr, key, secret, request, access, authorize); +user_stream(); + +string object_file_name() { + return DATA_PATH "twitter/" + (user ? user->qNameLower() : "-default"); } -void parse_status_update(string body, string headers, int http_status) { +save() { + mkdir(DATA_PATH "twitter"); + save_object(object_file_name()); +} + +create() { + return load(); +} + +object load(object usr, mapping opts) { +#ifdef TWITTER_ADMIN + unless (usr) usr = user = summon_person(TWITTER_ADMIN, NET_PATH "user"); +#endif + unless (mappingp(opts)) opts = ([]); + send_to_user = opts["send_to_user"]; + + unless (consumer_key) consumer_key = TWITTER_KEY; + unless (consumer_secret) consumer_secret = TWITTER_SECRET; + unless (request_token_url) request_token_url = "https://twitter.com/oauth/request_token"; + unless (access_token_url) access_token_url = "https://twitter.com/oauth/access_token"; + unless (authorize_url) authorize_url = "https://twitter.com/oauth/authorize"; + + restore_object(object_file_name()); + qCreate(); + qInit(ME, 100, 5); + + return ::load(usr, opts); +} + +void check_status_update(string body, string headers, int http_status) { P3(("twitter/client:parse_status_update(%O, %O, %O)\n", body, headers, http_status)) if (http_status != R_OK) - sendmsg(user, "_error_"+name+"_status_update", "Error: failed to post status update on [_name].", (["_name": display_name])); + sendmsg(user, "_error_twitter_status_update", "Error: failed to post status update on twitter."); } void status_update(string text) { @@ -42,19 +80,190 @@ void status_update(string text) { if (status_max_len && strlen(text) > status_max_len) text = text[0..status_max_len-4] + "..."; object ua = clone_object(NET_PATH "http/fetch"); - ua->content(#'parse_status_update, 1, 1); //'); - fetch(ua, api_base_url + "/statuses/update.json", "POST", (["status": text])); + ua->content(#'check_status_update, 1, 1); //'); + fetch(ua, api_url + "/statuses/update.json", "POST", (["status": text])); } -#if 0 //not used, just an example -void parse_home_timeline(string body, string headers, int http_status) { +parse_statuses(string data) { + mixed wurst; + string nick; + object o; + mapping d, p; + int i; + + if (!data || data == "") return; + + wurst = parse_json(data); + if (mappingp(wurst)) + wurst = ({ wurst }); + else unless (pointerp(wurst)) { + monitor_report("_failure_network_fetch_twitter_broken", + "[_source] failed to parse a status message"); + return; + } + unless (sizeof(wurst)) { + monitor_report("_failure_network_fetch_twitter_empty", + "[_source] received an empty structure."); + return; + } + // this used to fail on MAX_INT turning the ints to negative.. it would work for + // a while longer using floats, but since floating point mantissa in lpc is only + // 32 bits wide, it's just a question of time until we hit that roof again (when + // status_id reaches 4294967296). so let's use bignums instead. funny to run into + // such a weird problem only after years that twitter has been in existence. + if (lastid && bignum_cmp(to_string(wurst[0]["id"]), to_string(lastid)) <= 0) { + P1(("%O received %d old updates (id0 %O <= lastid %O).\n", + ME, sizeof(wurst), wurst[0]["id"], lastid)) + return; + } + lastid = wurst[0]["id"]; + save(); + for (i=sizeof(wurst)-1; i>=0; i--) { + d = wurst[i]; + unless (mappingp(d)) { + P1(("%O got a broken tweet: %O.\n", ME, d)) + continue; + } + p = d["user"]; + unless (mappingp(p)) { + P1(("%O got a userless tweet.\n", ME)) + continue; + } + unless (nick = p["screen_name"]) { + P1(("%O got a nickless tweeter.\n", ME)) + continue; + } + + o = send_to_user ? user : find_place(nick); + + // _message_twitter ? not so convincing.. a lot of the + // things are converted rss newsfeeds, and when private + // people are "chatting" over twitter, they are still + // "broadcasting" each message to a random conjunction + // of friends and strangers (we don't follow private + // twitters with this gateway!) ... thus it is quite + // appropriate that twitters are not given the same + // relevance as a _message. still you can /highlight + // particular senders in your client... + // + sendmsg(o, + // "_notice_headline_twitter", "([_nick]) [_headline]", + "_message_twitter", d["text"], + ([ + "_headline": d["text"], // should i send text as _action? + "_nick": nick, + // _count seems to be the better word for this + "_amount_updates": p["statuses_count"], + "_amount_followers": p["followers_count"], + "_amount_sources": p["friends_count"], + "_color": "#"+ p["profile_sidebar_fill_color"], + "_description": p["description"] || "", + "_page": p["url"] || "", + "_name": p["name"] || "", + // "_contact_twitter": p["id"], + "_description_agent_HTML": d["source"], + "_reference_reply": d["in_reply_to_screen_name"], + // "_twit": d["id"], + "_uniform_photo": p["profile_image_url"] || "", + "_uniform_photo_background": + p["profile_background_image_url"] || "" + ]), "/"); // send as root + + // der spiegel u.a. twittern übrigens in latin-1 + // während psyc utf-8 erwartet.. eine char guess engine + // muss her.. FIXME + } +} + +parse_home_timeline(string body, string headers, int http_status) { P3(("twitter/client:parse_home_timeline(%O, %O, %O)\n", body, headers, http_status)) + if (http_status == 401) { + oauth(); + home_timeline(); + } + if (http_status != R_OK || !body || body == "") return; + parse_statuses(body); } -void home_timeline() { +home_timeline(mixed *next) { P3(("twitter/client:home_timeline()\n")) + if (!authorized) return enqueue(ME, ({ #'home_timeline })); //'})); + object ua = clone_object(NET_PATH "http/fetch"); ua->content(#'parse_home_timeline, 1, 1); //'); - fetch(ua, api_base_url + "/statuses/home_timeline.json"); + if (next) ua->content(next, 1, 1); //'); + fetch(ua, api_url + "/statuses/home_timeline.json"); + return ua; +} + +home_stream() { + home_timeline(#'user_stream); +} + +// handle one line in the user stream which contains a full message in json format +// or the user's friend if this is the first line in the stream +user_stream_data(string data, string headers, int http_status, int fetching) { + P3(("twitter/client:user_stream_data(..., %O, %O, %O)\n%O\n", headers, http_status, fetching, data)) + + if (http_status == R_OK && data && data != "") { + if (!friends) + friends = parse_json(data); + else + parse_statuses(data); + } + + if (fetching) { + wait = 0; + return; + } else { + P1(("%O disconnected with status %d, headers: %O\n", ME, http_status, headers)) + } + + switch (http_status) { + case 401: // unauthorized + oauth(); + home_stream(); + case 403: // forbidden + case 404: // unknown + case 406: // not acceptable + case 413: // too long + case 416: // range unacceptable + return; + case 200: + break; + case 420: // rate limited + case 500: // server internal error + case 503: // service overloaded + default: + wait *= 2; + } + if (!wait) wait = 10; + if (wait > 240) wait = 240; + + P1(("%O reconnecting in %d seconds.\n", ME, wait)) + call_out(#'home_stream, wait); //'); +} + +user_stream() { + P3(("twitter/client:user_stream()\n")) + if (!authorized) return enqueue(ME, ({ #'user_stream })); //'})); + friends = 0; + object user_ua = clone_object(NET_PATH "http/fetch_stream"); + user_ua->content(#'user_stream_data, 1, 1); //'); + fetch(user_ua, userstream_url + "/user.json"); +} + +oauth_success() { + P3(("twitter/client:oauth_success()\n")) + save(); + mixed *waiter; + while (qSize(ME)) { + waiter = shift(ME); + funcall(waiter[0]); + } +} + +oauth_error() { + P3(("twitter/client:oauth_error()\n")) + call_out(#'oauth, 60); //'); } -#endif