Campustream 1.0
A social network MQP for WPI
core/lib/activequeue.php
Go to the documentation of this file.
00001 <?php
00002 
/**
 * Work queue of record references stored in a Kestrel server, which is
 * reached through the memcache protocol using the Memcached extension.
 *
 * Each queued item is a JSON array of four elements:
 * [ class name, record id, attempt count, metadata ].
 */
class ActiveQueue {

    /** Callback status: the item was handled. */
    const FINISHED = 3;
    /** Callback status: the item must be queued again for another attempt. */
    const RETRY    = 4;
    /** Callback status: the item could not be handled and is discarded. */
    const FAILED   = 5;

    /** Items whose attempt count is greater than this are discarded unprocessed. */
    const MAX_RETRIES = 5;

    /** Option key: when set to true, RETRY items are written to "<queue>_retry". */
    const OPT_RETRY_QUEUE = 'retry_queue';

    /** Shared client instance, created on the first call to kestrel(). */
    public static $kestrel = null;

    /** Per-instance options, keyed by the OPT_* constants. */
    private $options = array(ActiveQueue::OPT_RETRY_QUEUE => false);

    /** Name of the queue this instance reads from and writes to. */
    private $queue_name;

    /**
     * Returns the shared Memcached client, creating it on first use.
     *
     * The arguments are only used by the call that creates the client; later
     * calls return the existing instance regardless of what is passed.
     *
     * @param string     $server Host name of the queue server.
     * @param int|string $port   Port of the queue server.
     * @return Memcached
     */
    public static function kestrel($server='localhost', $port="22133") {
        if (self::$kestrel === null) {
            $client = new Memcached();
            $client->setOption( Memcached::OPT_CONNECT_TIMEOUT, 3000 );
            $client->setOption( Memcached::OPT_COMPRESSION, false );
            // addServer() expects an integer port; the default is a numeric string.
            $client->addServer( $server, (int) $port, 1 );
            self::$kestrel = $client;
        }

        return self::$kestrel;
    }

    /**
     * Sets one option on this queue instance.
     *
     * @param string $option One of the OPT_* constants.
     * @param mixed  $value  New value for the option.
     */
    public function setOption($option, $value) {
        $this->options[$option] = $value;
    }

    /**
     * @param string $name Name of the queue to operate on.
     */
    public function __construct( $name ) {
        $this->queue_name = $name;
    }

    /**
     * Appends a reference to $payload to the queue with an attempt count of 0.
     *
     * Only the payload's class name and its "id" property are stored.
     *
     * @param object $payload  Object exposing a public "id" property.
     * @param mixed  $metadata Extra data handed back to the process() callback
     *                         (after a JSON round trip).
     * @return mixed Whatever the client's set() call returns.
     */
    public function push( $payload, $metadata=array() ) {
        $item = self::encodeItem( get_class( $payload ), $payload->id, 0, $metadata );

        return self::kestrel()->set( $this->queue_name, $item, 0 );
    }

    /**
     * Consumes items from the queue and hands each one to $block.
     *
     * The loop starts a new iteration as long as the number of whole seconds
     * elapsed since the call began is not greater than $seconds; a blocking
     * read or a sleep may therefore run past that limit.
     *
     * $block is invoked as $block( $object, $metadata ). When it returns
     * ActiveQueue::RETRY the item is queued again with an incremented attempt
     * count. Any other return value (FINISHED, FAILED or anything else)
     * removes the item from the queue.
     *
     * @param int      $seconds Time budget for the loop, in seconds.
     * @param callable $block   Callback that handles one item.
     */
    public function process( $seconds, $block ) {
        $start   = time();
        $kestrel = self::kestrel();

        $open_key  = "{$this->queue_name}/t=2000/open";
        $close_key = "{$this->queue_name}/close";

        while ( $seconds >= (time() - $start) ) {

            // Reliable read: the item stays reserved until it is closed.
            $head = $kestrel->get( $open_key );

            if ( $head === false ) {
                // Queue is empty.
                sleep(2);
                continue;
            }

            $decoded = json_decode( $head );

            if ( !is_array( $decoded ) ) {
                // Not an item written by this class; discard it instead of
                // handing null fields to the record lookup below.
                $kestrel->get( $close_key );
                continue;
            }

            list( $class, $id, $attempts, $metadata ) = array_pad( $decoded, 4, null );

            if ( $attempts > self::MAX_RETRIES ) {
                // Retried too often: drop the item.
                $kestrel->get( $close_key );
                continue;
            }

            $attempts++;

            if ( $class !== "stdClass" ) {
                $object = ActiveRecord::find( $class, $id );

                // Payload is invalid, probably due to slave lag.
                // Remove it from the top and reinsert it.
                if ( $object->is_loaded() === false ) {
                    $kestrel->get( $close_key );
                    sleep(2);
                    $kestrel->set( $this->queue_name, self::encodeItem( $class, $id, $attempts, $metadata ), 0 );
                    continue;
                }
            } else {
                $object = new stdClass;
                $object->id = $id;
            }

            $status = $block( $object, $metadata );

            // Always confirm the read. Leaving it open when the callback
            // returns an unrecognised status would keep the item reserved on
            // the shared connection.
            $kestrel->get( $close_key );

            if ( $status === self::RETRY ) {
                $item = self::encodeItem( $class, $id, $attempts, $metadata );

                if ( $this->options[self::OPT_RETRY_QUEUE] === true ) {
                    // Retry queue entries are written with an expiration value of 60.
                    $kestrel->set( $this->queue_name . "_retry", $item, 60 );
                } else {
                    $kestrel->set( $this->queue_name, $item, 0 );
                }
            }
        }
    }

    /**
     * Serialises one queue item to its JSON wire format.
     */
    private static function encodeItem( $class, $id, $attempts, $metadata ) {
        return json_encode( array( $class, $id, $attempts, $metadata ) );
    }
}