Campustream 1.0
A social network MQP for WPI
|
<?php

/**
 * A named work queue stored on a Kestrel server and accessed through the
 * memcached protocol.
 *
 * Each queued item is a JSON array of the form
 * [class name, record id, attempts so far, metadata].
 * Producers call push(); workers call process() with a callback that returns
 * one of the FINISHED / RETRY / FAILED status constants.
 */
class ActiveQueue {

    /** Callback status: the item was handled; remove it from the queue. */
    const FINISHED = 3;

    /** Callback status: put the item back on a queue for another attempt. */
    const RETRY = 4;

    /** Callback status: the item cannot be handled; remove it from the queue. */
    const FAILED = 5;

    /** Items whose stored attempt counter exceeds this value are discarded unprocessed. */
    const MAX_RETRIES = 5;

    /** Option key: when true, RETRY items go to "<queue>_retry" instead of back onto the queue. */
    const OPT_RETRY_QUEUE = 'retry_queue';

    /** Shared client for the whole process; created on first use by kestrel(). */
    public static $kestrel = null;

    private $options = array(ActiveQueue::OPT_RETRY_QUEUE => false);

    private $queue_name;

    /**
     * Returns the shared Memcached client, creating it on the first call.
     *
     * The arguments are only used when the client is created; later calls
     * return the existing client whatever is passed.
     *
     * @param string     $server Host name of the queue server.
     * @param int|string $port   Port of the queue server.
     * @return Memcached
     */
    public static function kestrel($server='localhost', $port="22133") {

        if (self::$kestrel === null) {

            $client = new Memcached();
            $client->setOption( Memcached::OPT_CONNECT_TIMEOUT, 3000 );
            // Compression is switched off so the value is stored as the plain JSON string.
            $client->setOption( Memcached::OPT_COMPRESSION, false );
            // addServer() takes an integer port; the default is a numeric string.
            $client->addServer( $server, (int) $port, 1 );

            self::$kestrel = $client;
        }

        return self::$kestrel;
    }

    /**
     * Sets a queue option (see the OPT_* constants).
     *
     * @param string $option Option key.
     * @param mixed  $value  Option value.
     */
    public function setOption($option, $value) {
        $this->options[$option] = $value;
    }

    /**
     * @param string $name Name of the queue this instance reads from and writes to.
     */
    public function __construct( $name ) {
        $this->queue_name = $name;
    }

    /**
     * Appends an item describing $payload to the queue.
     *
     * Only the payload's class name and its `id` property are stored, along
     * with an attempt counter of 0 and the given metadata.
     *
     * @param object $payload  Object exposing a public `id` property.
     * @param array  $metadata Extra data handed back to the process() callback.
     */
    public function push( $payload, $metadata=array() ) {
        $this->enqueue( $this->queue_name, get_class( $payload ), $payload->id, 0, $metadata, 0 );
    }

    /**
     * Consumes items from the queue for roughly $seconds seconds.
     *
     * The time limit is only checked before each fetch, so one iteration
     * (including its sleeps and the callback) may run past it.
     *
     * For every item the callback is invoked as $block($object, $metadata),
     * where $object is the record returned by ActiveRecord::find($class, $id),
     * or a bare stdClass carrying only `id` when the stored class is
     * "stdClass". Metadata is whatever json_decode() produced, so JSON
     * objects arrive as stdClass instances.
     *
     * The callback's return value decides what happens next:
     *  - ActiveQueue::RETRY: the item is re-queued with an incremented attempt
     *    counter, either on "<queue>_retry" (OPT_RETRY_QUEUE === true) or on
     *    this queue.
     *  - anything else (FINISHED, FAILED or an unrecognised value): the item
     *    is removed.
     *
     * @param int      $seconds How long to keep polling.
     * @param callable $block   Handler returning one of the status constants.
     */
    public function process( $seconds, $block ) {

        $start = time();

        $kestrel = self::kestrel();

        while ( $seconds >= (time() - $start) ) {

            // Kestrel key suffixes: "t=2000" waits up to 2000 ms for an item,
            // "open" begins a reliable read that must later be closed.
            $head = $kestrel->get("{$this->queue_name}/t=2000/open");

            if ( $head === false ) {
                // Queue is empty.
                sleep(2);
                continue;
            }

            $fields = json_decode( $head );

            if ( !is_array( $fields ) ) {
                // Not an item this class wrote; it can never be processed, so drop it.
                $this->closeOpenRead( $kestrel );
                continue;
            }

            list( $class, $id, $attempts, $metadata ) = array_pad( $fields, 4, null );

            if ( $attempts > self::MAX_RETRIES ) {
                $this->closeOpenRead( $kestrel );
                continue;
            }

            $attempts++;

            if ($class !== "stdClass") {
                $object = ActiveRecord::find( $class, $id );

                // Payload is invalid, probably due to slave lag.
                // Remove it from the top and reinsert it.
                if ( $object->is_loaded() === false ) {
                    $this->closeOpenRead( $kestrel );
                    sleep(2);
                    $this->enqueue( $this->queue_name, $class, $id, $attempts, $metadata, 0 );
                    continue;
                }

            } else {
                $object = new stdClass;
                $object->id = $id;
            }

            $status = $block( $object, $metadata );

            // Always close the read, including for unrecognised return values,
            // so a callback that forgets to return a status cannot leave the
            // item open.
            $this->closeOpenRead( $kestrel );

            if ( $status === self::RETRY ) {
                if ($this->options[self::OPT_RETRY_QUEUE] === true) {
                    $this->enqueue( $this->queue_name . "_retry", $class, $id, $attempts, $metadata, 60 );
                } else {
                    $this->enqueue( $this->queue_name, $class, $id, $attempts, $metadata, 0 );
                }
            }
        }
    }

    /**
     * Closes the read opened by the last "/open" fetch on this queue, which
     * removes the fetched item.
     *
     * @param object $kestrel Client returned by kestrel().
     */
    private function closeOpenRead( $kestrel ) {
        $kestrel->get( "{$this->queue_name}/close" );
    }

    /**
     * Encodes an item and appends it to the named queue.
     *
     * @param string $queue      Target queue name.
     * @param string $class      Class name of the payload.
     * @param mixed  $id         Payload id.
     * @param int    $attempts   Attempt counter to store with the item.
     * @param mixed  $metadata   Metadata to store with the item.
     * @param int    $expiration Expiration value passed through to set().
     */
    private function enqueue( $queue, $class, $id, $attempts, $metadata, $expiration ) {
        $item = json_encode( array( $class, $id, $attempts, $metadata ) );
        self::kestrel()->set( $queue, $item, $expiration );
    }
}