Campustream 1.0
A social network MQP for WPI
|
<?php

group('twitter');

desc('Status message queue');
/**
 * Worker task that drains the Twitter status export queue.
 *
 * For roughly $runtime seconds the task polls the Redis list
 * 'queue:twitter:statuses'. Each entry is a serialized array with the keys
 * 'user' (a users.id value) and 'payload' (the status text). For every entry
 * whose user is loaded, has a Twitter account and has export enabled, the
 * payload is truncated to 140 characters and posted via 'statuses/update'.
 *
 * Outcome handling:
 *  - HTTP 200 with a response body: the tweet id is stored on the user's
 *    Twitter record and the export is logged.
 *  - HTTP 5xx: the entry is pushed back onto the tail of the queue, at most
 *    $max_attempts times in total, with a short pause so an outage does not
 *    turn into a hot retry loop.
 *  - Anything else: the error code is logged and the entry is dropped.
 *
 * @param mixed $args Arguments supplied by the task runner (unused here).
 */
task('status_queue', function ($args) {

    $start = time();
    $runtime = 290;       // seconds to keep polling before the task returns
    $max_attempts = 5;    // total delivery attempts per message on 5xx errors
    $status_queue = 'queue:twitter:statuses';

    $r = RedisManager::connection();

    while (($start + $runtime) > time()) {
        if ($r->llen($status_queue) === 0) {
            sleep(5);
            continue;
        }

        // Another consumer may have emptied the list between LLEN and LPOP.
        $raw = $r->lpop($status_queue);
        if (!$raw) {
            sleep(2);
            continue;
        }

        // NOTE(review): unserialize() can instantiate arbitrary classes if the
        // queue is ever writable by an untrusted party. Consider moving both
        // producers and this consumer to json_encode()/json_decode().
        $data = unserialize($raw);
        if (!is_array($data) || !isset($data['user'], $data['payload'])) {
            // Corrupt or unexpected entry: drop it instead of building a
            // broken SQL statement from a missing user id.
            Logger::tweet_export("Discarding malformed status queue entry");
            continue;
        }

        // The id is interpolated into SQL below, so force it to an integer.
        $user_id = (int) $data['user'];
        if ($user_id <= 0) {
            Logger::tweet_export("Discarding status queue entry with invalid user id");
            continue;
        }

        $user = ActiveCache::find('User_Model', $user_id, 43200)->sql(
            "SELECT * FROM users WHERE id = {$user_id} LIMIT 1"
        );

        if (!$user->is_loaded()) {
            continue;
        }

        if (!$user->has_twitter()) {
            continue;
        }

        // Hold a single reference so the id assignment and save() below act
        // on the same record object.
        $account = $user->twitter();
        if ($account->export_enabled == 0) {
            continue;
        }

        $twitter = $user->twitter_api();
        // Explicit encoding so truncation counts UTF-8 characters regardless
        // of the configured mbstring internal encoding.
        $message = mb_substr((string) $data['payload'], 0, 140, 'UTF-8');
        $resp = $twitter->post('statuses/update', array('status' => $message));

        // Normalise the status code so a numeric string still compares correctly.
        $code = (int) $twitter->http_code;

        if ($resp && $code === 200) {
            $account->last_tweet_id = $resp->id_str;
            $account->save();

            Logger::tweet_export("Posted status update for {$user->username} to @{$account->username}");
        } elseif ($code >= 500) {
            Logger::tweet_export("Error for {$user->username}: {$code}");
            Logger::tweet_export($twitter->http_info);

            // Server-side failure: retry later, but only a bounded number of
            // times so one poisoned message cannot circulate forever.
            $attempts = isset($data['attempts']) ? ((int) $data['attempts']) + 1 : 1;
            if ($attempts < $max_attempts) {
                $data['attempts'] = $attempts;
                $r->rpush($status_queue, serialize($data));
                // Brief back-off; without it a lone failing message is popped
                // and re-posted in a tight loop for the rest of the run.
                sleep(1);
            } else {
                Logger::tweet_export("Giving up on status update for {$user->username} after {$attempts} attempts");
            }
        } else {
            Logger::tweet_export("Error for {$user->username}: {$code}");
        }
    }
});