http/fetch: added stream param

This commit is contained in:
Gabor Adam Toth 2011-08-24 11:33:04 +02:00
parent e440532c58
commit 595ce50f5c
4 changed files with 23 additions and 54 deletions

View File

@ -30,6 +30,7 @@ volatile string http_message;
volatile int http_status, port, fetching, ssl; volatile int http_status, port, fetching, ssl;
volatile string buffer, thehost, url, fetched, host, resource, method; volatile string buffer, thehost, url, fetched, host, resource, method;
volatile mixed rbody; volatile mixed rbody;
volatile int stream;
int parse_status(string all); int parse_status(string all);
int parse_header(string all); int parse_header(string all);
@ -37,9 +38,10 @@ int buffer_content(string all);
string qHost() { return thehost; } string qHost() { return thehost; }
varargs void fetch(string murl, string meth, mixed body, mapping hdrs) { varargs void fetch(string murl, string meth, mixed body, mapping hdrs, int strm) {
method = meth || "GET"; method = meth || "GET";
rbody = body; rbody = body;
stream = strm;
if (hdrs) rheaders += hdrs; if (hdrs) rheaders += hdrs;
if (url != murl) { if (url != murl) {
// accept.c does this for us: // accept.c does this for us:
@ -183,9 +185,16 @@ int parse_header(string all) {
return 1; return 1;
} }
int buffer_content(string all) { int buffer_content(string data) {
P2(("%O body %O\n", ME, all)) P2(("%O body %O\n", ME, data))
buffer += all + "\n"; if (stream) {
mixed *waiter;
foreach (waiter : qToArray(ME)) {
funcall(waiter[0], data, waiter[1] ? fheaders : copy(fheaders), http_status, 1);
}
} else {
buffer += data + "\n";
}
next_input_to(#'buffer_content); next_input_to(#'buffer_content);
return 1; return 1;
} }
@ -198,8 +207,12 @@ disconnected(remainder) {
//if (headers["etag"]) //if (headers["etag"])
// rheaders["if-none-match"] = headers["etag"]; // heise does not work with etag // rheaders["if-none-match"] = headers["etag"]; // heise does not work with etag
fetched = buffer; if (stream) {
if (remainder) fetched += remainder; fetched = remainder;
} else {
fetched = buffer;
if (remainder) fetched += remainder;
}
fheaders = headers; fheaders = headers;
buffer = headers = 0; buffer = headers = 0;
switch (http_status) { switch (http_status) {

View File

@ -1,44 +0,0 @@
#include <net.h>
#include <ht/http.h>
inherit NET_PATH "http/fetch";
int buffer_content(string data) {
P2(("%O got data:\n%O\n", ME, data))
mixed *waiter;
foreach (waiter : qToArray(ME)) {
funcall(waiter[0], data, waiter[1] ? fheaders : copy(fheaders), http_status, 1);
}
next_input_to(#'buffer_content); //'
return 1;
}
disconnected(string data) {
P2(("%O got disconnected:\n%O\n", ME, data))
headers["_fetchtime"] = isotime(ctime(time()), 1);
if (headers["last-modified"])
rheaders["if-modified-since"] = headers["last-modified"];
//if (headers["etag"])
// rheaders["if-none-match"] = headers["etag"]; // heise does not work with etag
fheaders = headers;
buffer = headers = 0;
switch (http_status) {
default:
mixed *waiter;
while (qSize(ME)) {
waiter = shift(ME);
P2(("%O calls back.. body is %O\n", ME, data))
funcall(waiter[0], data, waiter[1] ? fheaders : copy(fheaders), http_status, 0);
}
if (http_status == R_OK) break;
// doesn't seem to get here when HTTP returns 301 or 302. strange.
// fall thru
case R_NOTMODIFIED:
qDel(ME);
qInit(ME, 150, 5);
}
fetching = 0;
return 1; // presume this disc was expected
}

View File

@ -25,7 +25,7 @@ volatile int authorized = 0;
oauth_success() {} oauth_success() {}
oauth_error() {} oauth_error() {}
varargs void fetch(object ua, string url, string method, mapping post, mapping oauth) { varargs void fetch(object ua, string url, string method, mapping post, mapping oauth, int stream) {
P3((">> oauth:fetch(%O, %O, %O)\n", url, method, oauth)) P3((">> oauth:fetch(%O, %O, %O)\n", url, method, oauth))
unless (method) method = "GET"; unless (method) method = "GET";
unless (post) post = ([]); unless (post) post = ([]);
@ -53,7 +53,7 @@ varargs void fetch(object ua, string url, string method, mapping post, mapping o
foreach (string key, string value : oauth) foreach (string key, string value : oauth)
p += (strlen(p) ? "," : "") + key + "=\"" + urlencode(to_string(value)) + "\""; p += (strlen(p) ? "," : "") + key + "=\"" + urlencode(to_string(value)) + "\"";
ua->fetch(url, method, post, (["authorization": "OAuth " + p])); ua->fetch(url, method, post, (["authorization": "OAuth " + p]), stream);
} }
void parse_request_token(string body, mapping headers, int http_status) { void parse_request_token(string body, mapping headers, int http_status) {

View File

@ -250,9 +250,9 @@ user_stream() {
P3(("twitter/client:user_stream()\n")) P3(("twitter/client:user_stream()\n"))
if (!authorized) return enqueue(ME, ({ #'user_stream })); //'})); if (!authorized) return enqueue(ME, ({ #'user_stream })); //'}));
friends = 0; friends = 0;
object user_ua = clone_object(NET_PATH "http/fetch_stream"); object user_ua = clone_object(NET_PATH "http/fetch");
user_ua->content(#'user_stream_data, 1, 1); //'); user_ua->content(#'user_stream_data, 1, 1); //');
fetch(user_ua, userstream_url + "/user.json"); fetch(user_ua, userstream_url + "/user.json", "GET", 0, 0, 1);
} }
oauth_success() { oauth_success() {