Campustream 1.0
A social network MQP for WPI
|
00001 <? 00002 00003 group('facebook'); 00004 00005 desc('Status message queue'); 00006 task('status_queue', function ($args) { 00007 00008 $start = time(); 00009 $runtime = 290; 00010 $status_queue = 'queue:facebook:statuses'; 00011 00012 $r = RedisManager::connection(); 00013 00014 while (($start + $runtime) > time()) { 00015 if ($r->llen($status_queue) === 0) { 00016 sleep(5); 00017 continue; 00018 } 00019 00020 $data = $r->lpop($status_queue); 00021 if (!$data) { 00022 sleep(2); 00023 continue; 00024 } 00025 00026 $data = unserialize($data); 00027 $user = ActiveCache::find('User_Model', $data['user'], 43200)->sql( 00028 "SELECT * FROM users WHERE id = {$data['user']} LIMIT 1" 00029 ); 00030 00031 if (!$user->is_loaded()) { 00032 continue; 00033 } 00034 00035 if (!$user->has_facebook() || $user->facebook()->export_enabled == 0) { 00036 continue; 00037 } 00038 00039 $facebook = $user->facebook_api(); 00040 $message = $data['payload']; 00041 00042 $resp = $facebook->api('/me/feed', 'POST', $user->fb_prepare(array('message'=>$message))); 00043 Logger::debug($resp); 00044 /*if ($resp && $face->http_code === 200) { 00045 $user->twitter()->last_tweet_id = $resp->id_str; 00046 $user->twitter()->save(); 00047 00048 Logger::tweet_export("Posted status update for {$user->username} to @{$user->twitter()->username}"); 00049 } elseif ($twitter->http_code >= 500) { 00050 Logger::tweet_export("Error for {$user->username}: {$twitter->http_code}"); 00051 Logger::tweet_export($twitter->http_info); 00052 $r->rpush($status_queue, serialize($data)); 00053 } else { 00054 Logger::tweet_export("Error for {$user->username}: {$twitter->http_code}"); 00055 }*/ 00056 } 00057 });