Campustream 1.0
A social network MQP for WPI
core/lib/predis.php
Go to the documentation of this file.
00001 <?php
00002 namespace Predis;
00003 use Predis\Shared\Utils, Predis\Distribution\IDistributionStrategy;
00004 
00005 class PredisException extends \Exception { }
00006 class ClientException extends PredisException { }                   // Client-side errors
00007 class AbortedMultiExec extends PredisException { }                  // Aborted multi/exec
00008 
00009 class ServerException extends PredisException {                     // Server-side errors
00010     public function toResponseError() {
00011         return new ResponseError($this->getMessage());
00012     }
00013 }
00014 
00015 class CommunicationException extends PredisException {              // Communication errors
00016     private $_connection;
00017 
00018     public function __construct(IConnectionSingle $connection, 
00019         $message = null, $code = null) {
00020 
00021         $this->_connection = $connection;
00022         parent::__construct($message, $code);
00023     }
00024 
00025     public function getConnection() { return $this->_connection; }
00026     public function shouldResetConnection() {  return true; }
00027 }
00028 
00029 class MalformedServerResponse extends CommunicationException { }    // Unexpected responses
00030 
00031 /* ------------------------------------------------------------------------- */
00032 
00033 class Client {
00034     private $_options, $_connection, $_serverProfile, $_responseReader;
00035 
00036     public function __construct($parameters = null, $clientOptions = null) {
00037         $this->setupClient($clientOptions ?: new ClientOptions());
00038         $this->setupConnection($parameters);
00039     }
00040 
00041     private static function filterClientOptions($options) {
00042         if ($options instanceof ClientOptions) {
00043             return $options;
00044         }
00045         if (is_array($options)) {
00046             return new ClientOptions($options);
00047         }
00048         if ($options instanceof RedisServerProfile) {
00049             return new ClientOptions(array(
00050                 'profile' => $options
00051             ));
00052         }
00053         if (is_string($options)) {
00054             return new ClientOptions(array(
00055                 'profile' => RedisServerProfile::get($options)
00056             ));
00057         }
00058         throw new \InvalidArgumentException("Invalid type for client options");
00059     }
00060 
00061     private function setupClient($options) {
00062         $this->_responseReader = new ResponseReader();
00063         $this->_options = self::filterClientOptions($options);
00064         $this->setProfile($this->_options->profile);
00065         if ($this->_options->iterable_multibulk === true) {
00066             $this->_responseReader->setHandler(
00067                 Protocol::PREFIX_MULTI_BULK, 
00068                 new ResponseMultiBulkStreamHandler()
00069             );
00070         }
00071         if ($this->_options->throw_on_error === false) {
00072             $this->_responseReader->setHandler(
00073                 Protocol::PREFIX_ERROR, 
00074                 new ResponseErrorSilentHandler()
00075             );
00076         }
00077     }
00078 
00079     private function setupConnection($parameters) {
00080         if ($parameters === null) {
00081             return $this->setConnection($this->createConnection(null));
00082         }
00083         if (!(is_array($parameters) || is_string($parameters) || $parameters instanceof IConnection 
00084             || $parameters instanceof ConnectionParameters)) {
00085             throw new \InvalidArgumentException(
00086                 'Array, String, Predis\ConnectionParameters or Predis\IConnection expected'
00087             );
00088         }
00089         if (is_array($parameters) && isset($parameters[0])) {
00090             $cluster = new ConnectionCluster($this->_options->key_distribution);
00091             foreach ($parameters as $shardParams) {
00092                 $cluster->add($this->createConnection($shardParams));
00093             }
00094             $this->setConnection($cluster);
00095         }
00096         else {
00097             $this->setConnection($this->createConnection($parameters));
00098         }
00099     }
00100 
00101     private function createConnection($parameters) {
00102         $params = null;
00103         $connection = null;
00104         if ($parameters instanceof IConnectionSingle) {
00105             $connection = $parameters;
00106             $params = $connection->getParameters();
00107         }
00108         else {
00109             $params = $parameters instanceof ConnectionParameters 
00110                         ? $parameters 
00111                         : new ConnectionParameters($parameters);
00112             $connection = ConnectionFactory::create($params, $this->_responseReader);
00113         }
00114         return $this->pushInitCommands($connection, $params);
00115     }
00116 
00117     private function pushInitCommands(IConnectionSingle $connection, ConnectionParameters $params) {
00118         if (isset($params->password)) {
00119             $connection->pushInitCommand($this->createCommand(
00120                 'auth', array($params->password)
00121             ));
00122         }
00123         if (isset($params->database)) {
00124             $connection->pushInitCommand($this->createCommand(
00125                 'select', array($params->database)
00126             ));
00127         }
00128         return $connection;
00129     }
00130 
00131     private function setConnection(IConnection $connection) {
00132         $this->_connection = $connection;
00133     }
00134 
00135     public function setProfile($serverProfile) {
00136         if (!($serverProfile instanceof RedisServerProfile || is_string($serverProfile))) {
00137             throw new \InvalidArgumentException(
00138                 "Invalid type for server profile, \Predis\RedisServerProfile or string expected"
00139             );
00140         }
00141         $this->_serverProfile = (is_string($serverProfile) 
00142             ? RedisServerProfile::get($serverProfile)
00143             : $serverProfile
00144         );
00145     }
00146 
00147     public function getProfile() {
00148         return $this->_serverProfile;
00149     }
00150 
00151     public function getResponseReader() {
00152         return $this->_responseReader;
00153     }
00154 
00155     public function getClientFor($connectionAlias) {
00156         if (!Utils::isCluster($this->_connection)) {
00157             throw new ClientException(
00158                 'This method is supported only when the client is connected to a cluster of connections'
00159             );
00160         }
00161 
00162         $connection = $this->_connection->getConnectionById($connectionAlias);
00163         if ($connection === null) {
00164             throw new \InvalidArgumentException(
00165                 "Invalid connection alias: '$connectionAlias'"
00166             );
00167         }
00168 
00169         $newClient = new Client();
00170         $newClient->setupClient($this->_options);
00171         $newClient->setConnection($connection);
00172         return $newClient;
00173     }
00174 
00175     public function connect() {
00176         $this->_connection->connect();
00177     }
00178 
00179     public function disconnect() {
00180         $this->_connection->disconnect();
00181     }
00182 
00183     public function isConnected() {
00184         return $this->_connection->isConnected();
00185     }
00186 
00187     public function getConnection($id = null) {
00188         if (!isset($id)) {
00189             return $this->_connection;
00190         }
00191         else {
00192             return Utils::isCluster($this->_connection)
00193                 ? $this->_connection->getConnectionById($id)
00194                 : $this->_connection;
00195         }
00196     }
00197 
00198     public function __call($method, $arguments) {
00199         $command = $this->_serverProfile->createCommand($method, $arguments);
00200         return $this->_connection->executeCommand($command);
00201     }
00202 
00203     public function createCommand($method, $arguments = array()) {
00204         return $this->_serverProfile->createCommand($method, $arguments);
00205     }
00206 
00207     public function executeCommand(ICommand $command) {
00208         return $this->_connection->executeCommand($command);
00209     }
00210 
00211     public function executeCommandOnShards(ICommand $command) {
00212         $replies = array();
00213         if (Utils::isCluster($this->_connection)) {
00214             foreach($this->_connection as $connection) {
00215                 $replies[] = $connection->executeCommand($command);
00216             }
00217         }
00218         else {
00219             $replies[] = $this->_connection->executeCommand($command);
00220         }
00221         return $replies;
00222     }
00223 
00224     private function sharedInitializer($argv, $initializer) {
00225         $argc = count($argv);
00226         if ($argc === 0) {
00227             return $this->$initializer();
00228         }
00229         else if ($argc === 1) {
00230             list($arg0) = $argv;
00231             return is_array($arg0) ? $this->$initializer($arg0) : $this->$initializer(null, $arg0);
00232         }
00233         else if ($argc === 2) {
00234             list($arg0, $arg1) = $argv;
00235             return $this->$initializer($arg0, $arg1);
00236         }
00237         return $this->$initializer($this, $arguments);
00238     }
00239 
00240     public function pipeline(/* arguments */) {
00241         return $this->sharedInitializer(func_get_args(), 'initPipeline');
00242     }
00243 
00244     private function initPipeline(Array $options = null, $pipelineBlock = null) {
00245         $pipeline = null;
00246         if (isset($options)) {
00247             if (isset($options['safe']) && $options['safe'] == true) {
00248                 $connection = $this->getConnection();
00249                 $pipeline   = new CommandPipeline($this, $connection instanceof Connection
00250                     ? new Pipeline\SafeExecutor($connection)
00251                     : new Pipeline\SafeClusterExecutor($connection)
00252                 );
00253             }
00254             else {
00255                 $pipeline = new CommandPipeline($this);
00256             }
00257         }
00258         else {
00259             $pipeline = new CommandPipeline($this);
00260         }
00261         return $this->pipelineExecute($pipeline, $pipelineBlock);
00262     }
00263 
00264     private function pipelineExecute(CommandPipeline $pipeline, $block) {
00265         return $block !== null ? $pipeline->execute($block) : $pipeline;
00266     }
00267 
00268     public function multiExec(/* arguments */) {
00269         return $this->sharedInitializer(func_get_args(), 'initMultiExec');
00270     }
00271 
00272     private function initMultiExec(Array $options = null, $transBlock = null) {
00273         $multi = isset($options) ? new MultiExecBlock($this, $options) : new MultiExecBlock($this);
00274         return $transBlock !== null ? $multi->execute($transBlock) : $multi;
00275     }
00276 
00277     public function pubSubContext() {
00278         return new PubSubContext($this);
00279     }
00280 }
00281 
00282 /* ------------------------------------------------------------------------- */
00283 
00284 interface IClientOptionsHandler {
00285     public function validate($option, $value);
00286     public function getDefault();
00287 }
00288 
00289 class ClientOptionsProfile implements IClientOptionsHandler {
00290     public function validate($option, $value) {
00291         if ($value instanceof RedisServerProfile) {
00292             return $value;
00293         }
00294         if (is_string($value)) {
00295             return RedisServerProfile::get($value);
00296         }
00297         throw new \InvalidArgumentException("Invalid value for option $option");
00298     }
00299 
00300     public function getDefault() {
00301         return RedisServerProfile::getDefault();
00302     }
00303 }
00304 
00305 class ClientOptionsKeyDistribution implements IClientOptionsHandler {
00306     public function validate($option, $value) {
00307         if ($value instanceof IDistributionStrategy) {
00308             return $value;
00309         }
00310         if (is_string($value)) {
00311             $valueReflection = new \ReflectionClass($value);
00312             if ($valueReflection->isSubclassOf('\Predis\Distribution\IDistributionStrategy')) {
00313                 return new $value;
00314             }
00315         }
00316         throw new \InvalidArgumentException("Invalid value for option $option");
00317     }
00318 
00319     public function getDefault() {
00320         return new Distribution\HashRing();
00321     }
00322 }
00323 
00324 class ClientOptionsIterableMultiBulk implements IClientOptionsHandler {
00325     public function validate($option, $value) {
00326         return (bool) $value;
00327     }
00328 
00329     public function getDefault() {
00330         return false;
00331     }
00332 }
00333 
00334 class ClientOptionsThrowOnError implements IClientOptionsHandler {
00335     public function validate($option, $value) {
00336         return (bool) $value;
00337     }
00338 
00339     public function getDefault() {
00340         return true;
00341     }
00342 }
00343 
00344 class ClientOptions {
00345     private static $_optionsHandlers;
00346     private $_options;
00347 
00348     public function __construct($options = null) {
00349         self::initializeOptionsHandlers();
00350         $this->initializeOptions($options ?: array());
00351     }
00352 
00353     private static function initializeOptionsHandlers() {
00354         if (!isset(self::$_optionsHandlers)) {
00355             self::$_optionsHandlers = self::getOptionsHandlers();
00356         }
00357     }
00358 
00359     private static function getOptionsHandlers() {
00360         return array(
00361             'profile' => new ClientOptionsProfile(),
00362             'key_distribution' => new ClientOptionsKeyDistribution(),
00363             'iterable_multibulk' => new ClientOptionsIterableMultiBulk(),
00364             'throw_on_error' => new ClientOptionsThrowOnError(),
00365         );
00366     }
00367 
00368     private function initializeOptions($options) {
00369         foreach ($options as $option => $value) {
00370             if (isset(self::$_optionsHandlers[$option])) {
00371                 $handler = self::$_optionsHandlers[$option];
00372                 $this->_options[$option] = $handler->validate($option, $value);
00373             }
00374         }
00375     }
00376 
00377     public function __get($option) {
00378         if (!isset($this->_options[$option])) {
00379             $defaultValue = self::$_optionsHandlers[$option]->getDefault();
00380             $this->_options[$option] = $defaultValue;
00381         }
00382         return $this->_options[$option];
00383     }
00384 
00385     public function __isset($option) {
00386         return isset(self::$_optionsHandlers[$option]);
00387     }
00388 }
00389 
00390 /* ------------------------------------------------------------------------- */
00391 
00392 class Protocol {
00393     const NEWLINE = "\r\n";
00394     const OK      = 'OK';
00395     const ERROR   = 'ERR';
00396     const QUEUED  = 'QUEUED';
00397     const NULL    = 'nil';
00398 
00399     const PREFIX_STATUS     = '+';
00400     const PREFIX_ERROR      = '-';
00401     const PREFIX_INTEGER    = ':';
00402     const PREFIX_BULK       = '$';
00403     const PREFIX_MULTI_BULK = '*';
00404 }
00405 
00406 interface ICommand {
00407     public function getCommandId();
00408     public function canBeHashed();
00409     public function closesConnection();
00410     public function getHash(IDistributionStrategy $distributor);
00411     public function setArgumentsArray(Array $arguments);
00412     public function getArguments();
00413     public function parseResponse($data);
00414     public function serialize();
00415 }
00416 
00417 abstract class Command implements ICommand {
00418     private $_arguments, $_hash;
00419 
00420     protected function serializeRequest($command, $arguments) {
00421         $newline = Protocol::NEWLINE;
00422         $cmdlen  = strlen($command);
00423         $reqlen  = count($arguments) + 1;
00424 
00425         $buffer = "*{$reqlen}{$newline}\${$cmdlen}{$newline}{$command}{$newline}";
00426         foreach ($arguments as $argument) {
00427             $arglen  = strlen($argument);
00428             $buffer .= "\${$arglen}{$newline}{$argument}{$newline}";
00429         }
00430 
00431         return $buffer;
00432     }
00433 
00434     public function canBeHashed() {
00435         return true;
00436     }
00437 
00438     public function getHash(IDistributionStrategy $distributor) {
00439         if (isset($this->_hash)) {
00440             return $this->_hash;
00441         }
00442         else {
00443             if (isset($this->_arguments[0])) {
00444                 // TODO: should we throw an exception if the command does 
00445                 //       not support sharding?
00446                 $key = $this->_arguments[0];
00447 
00448                 $start = strpos($key, '{');
00449                 $end   = strpos($key, '}');
00450                 if ($start !== false && $end !== false) {
00451                     $key = substr($key, ++$start, $end - $start);
00452                 }
00453 
00454                 $this->_hash = $distributor->generateKey($key);
00455                 return $this->_hash;
00456             }
00457         }
00458         return null;
00459     }
00460 
00461     public function closesConnection() {
00462         return false;
00463     }
00464 
00465     protected function filterArguments(Array $arguments) {
00466         return $arguments;
00467     }
00468 
00469     public function setArguments(/* arguments */) {
00470         $this->_arguments = $this->filterArguments(func_get_args());
00471         unset($this->_hash);
00472     }
00473 
00474     public function setArgumentsArray(Array $arguments) {
00475         $this->_arguments = $this->filterArguments($arguments);
00476         unset($this->_hash);
00477     }
00478 
00479     public function getArguments() {
00480         return isset($this->_arguments) ? $this->_arguments : array();
00481     }
00482 
00483     public function getArgument($index = 0) {
00484         return isset($this->_arguments[$index]) ? $this->_arguments[$index] : null;
00485     }
00486 
00487     public function parseResponse($data) {
00488         return $data;
00489     }
00490 
00491     public final function serialize() {
00492         return $this->serializeRequest($this->getCommandId(), $this->getArguments());
00493     }
00494 }
00495 
00496 /* ------------------------------------------------------------------------- */
00497 
00498 interface IResponseHandler {
00499     function handle(IConnectionSingle $connection, $payload);
00500 }
00501 
00502 class ResponseStatusHandler implements IResponseHandler {
00503     public function handle(IConnectionSingle $connection, $status) {
00504         if ($status === Protocol::OK) {
00505             return true;
00506         }
00507         else if ($status === Protocol::QUEUED) {
00508             return new ResponseQueued();
00509         }
00510         return $status;
00511     }
00512 }
00513 
00514 class ResponseErrorHandler implements IResponseHandler {
00515     public function handle(IConnectionSingle $connection, $errorMessage) {
00516         throw new ServerException(substr($errorMessage, 4));
00517     }
00518 }
00519 
00520 class ResponseErrorSilentHandler implements IResponseHandler {
00521     public function handle(IConnectionSingle $connection, $errorMessage) {
00522         return new ResponseError(substr($errorMessage, 4));
00523     }
00524 }
00525 
00526 class ResponseBulkHandler implements IResponseHandler {
00527     public function handle(IConnectionSingle $connection, $dataLength) {
00528         if (!is_numeric($dataLength)) {
00529             Utils::onCommunicationException(new MalformedServerResponse(
00530                 $connection, "Cannot parse '$dataLength' as data length"
00531             ));
00532         }
00533 
00534         if ($dataLength > 0) {
00535             $value = $connection->readBytes($dataLength);
00536             self::discardNewLine($connection);
00537             return $value;
00538         }
00539         else if ($dataLength == 0) {
00540             self::discardNewLine($connection);
00541             return '';
00542         }
00543 
00544         return null;
00545     }
00546 
00547     private static function discardNewLine(IConnectionSingle $connection) {
00548         if ($connection->readBytes(2) !== Protocol::NEWLINE) {
00549             Utils::onCommunicationException(new MalformedServerResponse(
00550                 $connection, 'Did not receive a new-line at the end of a bulk response'
00551             ));
00552         }
00553     }
00554 }
00555 
00556 class ResponseMultiBulkHandler implements IResponseHandler {
00557     public function handle(IConnectionSingle $connection, $rawLength) {
00558         if (!is_numeric($rawLength)) {
00559             Utils::onCommunicationException(new MalformedServerResponse(
00560                 $connection, "Cannot parse '$rawLength' as data length"
00561             ));
00562         }
00563 
00564         $listLength = (int) $rawLength;
00565         if ($listLength === -1) {
00566             return null;
00567         }
00568 
00569         $list = array();
00570 
00571         if ($listLength > 0) {
00572             $reader = $connection->getResponseReader();
00573             for ($i = 0; $i < $listLength; $i++) {
00574                 $list[] = $reader->read($connection);
00575             }
00576         }
00577 
00578         return $list;
00579     }
00580 }
00581 
00582 class ResponseMultiBulkStreamHandler implements IResponseHandler {
00583     public function handle(IConnectionSingle $connection, $rawLength) {
00584         if (!is_numeric($rawLength)) {
00585             Utils::onCommunicationException(new MalformedServerResponse(
00586                 $connection, "Cannot parse '$rawLength' as data length"
00587             ));
00588         }
00589         return new Shared\MultiBulkResponseIterator($connection, (int)$rawLength);
00590     }
00591 }
00592 
00593 class ResponseIntegerHandler implements IResponseHandler {
00594     public function handle(IConnectionSingle $connection, $number) {
00595         if (is_numeric($number)) {
00596             return (int) $number;
00597         }
00598         else {
00599             if ($number !== Protocol::NULL) {
00600                 Utils::onCommunicationException(new MalformedServerResponse(
00601                     $connection, "Cannot parse '$number' as numeric response"
00602                 ));
00603             }
00604             return null;
00605         }
00606     }
00607 }
00608 
00609 class ResponseReader {
00610     private $_prefixHandlers;
00611 
00612     public function __construct() {
00613         $this->initializePrefixHandlers();
00614     }
00615 
00616     private function initializePrefixHandlers() {
00617         $this->_prefixHandlers = array(
00618             Protocol::PREFIX_STATUS     => new ResponseStatusHandler(), 
00619             Protocol::PREFIX_ERROR      => new ResponseErrorHandler(), 
00620             Protocol::PREFIX_INTEGER    => new ResponseIntegerHandler(), 
00621             Protocol::PREFIX_BULK       => new ResponseBulkHandler(), 
00622             Protocol::PREFIX_MULTI_BULK => new ResponseMultiBulkHandler(), 
00623         );
00624     }
00625 
00626     public function setHandler($prefix, IResponseHandler $handler) {
00627         $this->_prefixHandlers[$prefix] = $handler;
00628     }
00629 
00630     public function getHandler($prefix) {
00631         if (isset($this->_prefixHandlers[$prefix])) {
00632             return $this->_prefixHandlers[$prefix];
00633         }
00634     }
00635 
00636     public function read(IConnectionSingle $connection) {
00637         $header = $connection->readLine();
00638         if ($header === '') {
00639             Utils::onCommunicationException(new MalformedServerResponse(
00640                 $connection, 'Unexpected empty header'
00641             ));
00642         }
00643 
00644         $prefix  = $header[0];
00645         $payload = strlen($header) > 1 ? substr($header, 1) : '';
00646 
00647         if (!isset($this->_prefixHandlers[$prefix])) {
00648             Utils::onCommunicationException(new MalformedServerResponse(
00649                 $connection, "Unknown prefix '$prefix'"
00650             ));
00651         }
00652 
00653         $handler = $this->_prefixHandlers[$prefix];
00654         return $handler->handle($connection, $payload);
00655     }
00656 }
00657 
00658 class ResponseError {
00659     private $_message;
00660 
00661     public function __construct($message) {
00662         $this->_message = $message;
00663     }
00664 
00665     public function __get($property) {
00666         if ($property == 'error') {
00667             return true;
00668         }
00669         if ($property == 'message') {
00670             return $this->_message;
00671         }
00672     }
00673 
00674     public function __isset($property) {
00675         return $property === 'error';
00676     }
00677 
00678     public function __toString() {
00679         return $this->_message;
00680     }
00681 }
00682 
00683 class ResponseQueued {
00684     public $queued = true;
00685 
00686     public function __toString() {
00687         return Protocol::QUEUED;
00688     }
00689 }
00690 
00691 /* ------------------------------------------------------------------------- */
00692 
00693 use Predis\Pipeline\IPipelineExecutor;
00694 
00695 class CommandPipeline {
00696     private $_redisClient, $_pipelineBuffer, $_returnValues, $_running, $_executor;
00697 
00698     public function __construct(Client $redisClient, IPipelineExecutor $executor = null) {
00699         $this->_redisClient    = $redisClient;
00700         $this->_executor       = $executor ?: new Pipeline\StandardExecutor();
00701         $this->_pipelineBuffer = array();
00702         $this->_returnValues   = array();
00703     }
00704 
00705     public function __call($method, $arguments) {
00706         $command = $this->_redisClient->createCommand($method, $arguments);
00707         $this->recordCommand($command);
00708         return $this;
00709     }
00710 
00711     private function recordCommand(ICommand $command) {
00712         $this->_pipelineBuffer[] = $command;
00713     }
00714 
00715     private function getRecordedCommands() {
00716         return $this->_pipelineBuffer;
00717     }
00718 
00719     public function flushPipeline() {
00720         if (count($this->_pipelineBuffer) > 0) {
00721             $connection = $this->_redisClient->getConnection();
00722             $this->_returnValues = array_merge(
00723                 $this->_returnValues, 
00724                 $this->_executor->execute($connection, $this->_pipelineBuffer)
00725             );
00726             $this->_pipelineBuffer = array();
00727         }
00728         return $this;
00729     }
00730 
00731     private function setRunning($bool) {
00732         if ($bool == true && $this->_running == true) {
00733             throw new ClientException("This pipeline is already opened");
00734         }
00735         $this->_running = $bool;
00736     }
00737 
00738     public function execute($block = null) {
00739         if ($block && !is_callable($block)) {
00740             throw new \InvalidArgumentException('Argument passed must be a callable object');
00741         }
00742 
00743         // TODO: do not reuse previously executed pipelines
00744         $this->setRunning(true);
00745         $pipelineBlockException = null;
00746 
00747         try {
00748             if ($block !== null) {
00749                 $block($this);
00750             }
00751             $this->flushPipeline();
00752         }
00753         catch (\Exception $exception) {
00754             $pipelineBlockException = $exception;
00755         }
00756 
00757         $this->setRunning(false);
00758 
00759         if ($pipelineBlockException !== null) {
00760             throw $pipelineBlockException;
00761         }
00762 
00763         return $this->_returnValues;
00764     }
00765 }
00766 
00767 class MultiExecBlock {
00768     private $_initialized, $_discarded, $_insideBlock;
00769     private $_redisClient, $_options, $_commands;
00770     private $_supportsWatch;
00771 
00772     public function __construct(Client $redisClient, Array $options = null) {
00773         $this->checkCapabilities($redisClient);
00774         $this->_initialized = false;
00775         $this->_discarded   = false;
00776         $this->_insideBlock = false;
00777         $this->_redisClient = $redisClient;
00778         $this->_options     = $options ?: array();
00779         $this->_commands    = array();
00780     }
00781 
00782     private function checkCapabilities(Client $redisClient) {
00783         if (Utils::isCluster($redisClient->getConnection())) {
00784             throw new ClientException(
00785                 'Cannot initialize a MULTI/EXEC context over a cluster of connections'
00786             );
00787         }
00788         $profile = $redisClient->getProfile();
00789         if ($profile->supportsCommands(array('multi', 'exec', 'discard')) === false) {
00790             throw new ClientException(
00791                 'The current profile does not support MULTI, EXEC and DISCARD commands'
00792             );
00793         }
00794         $this->_supportsWatch = $profile->supportsCommands(array('watch', 'unwatch'));
00795     }
00796 
00797     private function isWatchSupported() {
00798         if ($this->_supportsWatch === false) {
00799             throw new ClientException(
00800                 'The current profile does not support WATCH and UNWATCH commands'
00801             );
00802         }
00803     }
00804 
00805     private function initialize() {
00806         if ($this->_initialized === false) {
00807             if (isset($this->_options['watch'])) {
00808                 $this->watch($this->_options['watch']);
00809             }
00810             $this->_redisClient->multi();
00811             $this->_initialized = true;
00812             $this->_discarded   = false;
00813         }
00814     }
00815 
00816     private function setInsideBlock($value) {
00817         $this->_insideBlock = $value;
00818     }
00819 
00820     public function __call($method, $arguments) {
00821         $this->initialize();
00822         $command  = $this->_redisClient->createCommand($method, $arguments);
00823         $response = $this->_redisClient->executeCommand($command);
00824         if (isset($response->queued)) {
00825             $this->_commands[] = $command;
00826             return $this;
00827         }
00828         else {
00829             $this->malformedServerResponse('The server did not respond with a QUEUED status reply');
00830         }
00831     }
00832 
00833     public function watch($keys) {
00834         $this->isWatchSupported();
00835         if ($this->_initialized === true) {
00836             throw new ClientException('WATCH inside MULTI is not allowed');
00837         }
00838 
00839         $reply = null;
00840         if (is_array($keys)) {
00841             $reply = array();
00842             foreach ($keys as $key) {
00843                 $reply = $this->_redisClient->watch($keys);
00844             }
00845         }
00846         else {
00847             $reply = $this->_redisClient->watch($keys);
00848         }
00849         return $reply;
00850     }
00851 
00852     public function multi() {
00853         $this->initialize();
00854     }
00855 
00856     public function unwatch() {
00857         $this->isWatchSupported();
00858         $this->_redisClient->unwatch();
00859     }
00860 
00861     public function discard() {
00862         $this->_redisClient->discard();
00863         $this->_commands    = array();
00864         $this->_initialized = false;
00865         $this->_discarded   = true;
00866     }
00867 
00868     public function exec() {
00869         return $this->execute();
00870     }
00871 
00872     public function execute($block = null) {
00873         if ($this->_insideBlock === true) {
00874             throw new ClientException(
00875                 "Cannot invoke 'execute' or 'exec' inside an active client transaction block"
00876             );
00877         }
00878 
00879         if ($block && !is_callable($block)) {
00880             throw new \InvalidArgumentException('Argument passed must be a callable object');
00881         }
00882 
00883         $blockException = null;
00884         $returnValues   = array();
00885 
00886         if ($block !== null) {
00887             $this->setInsideBlock(true);
00888             try {
00889                 $block($this);
00890             }
00891             catch (CommunicationException $exception) {
00892                 $blockException = $exception;
00893             }
00894             catch (ServerException $exception) {
00895                 $blockException = $exception;
00896             }
00897             catch (\Exception $exception) {
00898                 $blockException = $exception;
00899                 if ($this->_initialized === true) {
00900                     $this->discard();
00901                 }
00902             }
00903             $this->setInsideBlock(false);
00904             if ($blockException !== null) {
00905                 throw $blockException;
00906             }
00907         }
00908 
00909         if ($this->_initialized === false) {
00910             return;
00911         }
00912 
00913         $reply = $this->_redisClient->exec();
00914         if ($reply === null) {
00915             throw new AbortedMultiExec('The current transaction has been aborted by the server');
00916         }
00917 
00918         $execReply = $reply instanceof \Iterator ? iterator_to_array($reply) : $reply;
00919         $commands  = &$this->_commands;
00920         $sizeofReplies = count($execReply);
00921 
00922         if ($sizeofReplies !== count($commands)) {
00923             $this->malformedServerResponse('Unexpected number of responses for a MultiExecBlock');
00924         }
00925 
00926         for ($i = 0; $i < $sizeofReplies; $i++) {
00927             $returnValues[] = $commands[$i]->parseResponse($execReply[$i] instanceof \Iterator
00928                 ? iterator_to_array($execReply[$i])
00929                 : $execReply[$i]
00930             );
00931             unset($commands[$i]);
00932         }
00933 
00934         return $returnValues;
00935     }
00936 
00937     private function malformedServerResponse($message) {
00938         // NOTE: a MULTI/EXEC block cannot be initialized on a clustered 
00939         //       connection, which means that Predis\Client::getConnection 
00940         //       will always return an instance of Predis\Connection.
00941         Utils::onCommunicationException(new MalformedServerResponse(
00942             $this->_redisClient->getConnection(), $message
00943         ));
00944     }
00945 }
00946 
00947 class PubSubContext implements \Iterator {
00948     const SUBSCRIBE    = 'subscribe';
00949     const UNSUBSCRIBE  = 'unsubscribe';
00950     const PSUBSCRIBE   = 'psubscribe';
00951     const PUNSUBSCRIBE = 'punsubscribe';
00952     const MESSAGE      = 'message';
00953     const PMESSAGE     = 'pmessage';
00954 
00955     const STATUS_VALID       = 0x0001;
00956     const STATUS_SUBSCRIBED  = 0x0010;
00957     const STATUS_PSUBSCRIBED = 0x0100;
00958 
00959     private $_redisClient, $_subscriptions, $_isStillValid, $_position;
00960 
00961     public function __construct(Client $redisClient) {
00962         $this->checkCapabilities($redisClient);
00963         $this->_redisClient = $redisClient;
00964         $this->_statusFlags = self::STATUS_VALID;
00965     }
00966 
00967     public function __destruct() {
00968         $this->closeContext();
00969     }
00970 
00971     private function checkCapabilities(Client $redisClient) {
00972         if (Utils::isCluster($redisClient->getConnection())) {
00973             throw new ClientException(
00974                 'Cannot initialize a PUB/SUB context over a cluster of connections'
00975             );
00976         }
00977         $profile = $redisClient->getProfile();
00978         $commands = array('publish', 'subscribe', 'unsubscribe', 'psubscribe', 'punsubscribe');
00979         if ($profile->supportsCommands($commands) === false) {
00980             throw new ClientException(
00981                 'The current profile does not support PUB/SUB related commands'
00982             );
00983         }
00984     }
00985 
00986     private function isFlagSet($value) {
00987         return ($this->_statusFlags & $value) === $value;
00988     }
00989 
00990     public function subscribe(/* arguments */) {
00991         $this->writeCommand(self::SUBSCRIBE, func_get_args());
00992         $this->_statusFlags |= self::STATUS_SUBSCRIBED;
00993     }
00994 
00995     public function unsubscribe(/* arguments */) {
00996         $this->writeCommand(self::UNSUBSCRIBE, func_get_args());
00997     }
00998 
00999     public function psubscribe(/* arguments */) {
01000         $this->writeCommand(self::PSUBSCRIBE, func_get_args());
01001         $this->_statusFlags |= self::STATUS_PSUBSCRIBED;
01002     }
01003 
01004     public function punsubscribe(/* arguments */) {
01005         $this->writeCommand(self::PUNSUBSCRIBE, func_get_args());
01006     }
01007 
01008     public function closeContext() {
01009         if ($this->valid()) {
01010             if ($this->isFlagSet(self::STATUS_SUBSCRIBED)) {
01011                 $this->unsubscribe();
01012             }
01013             if ($this->isFlagSet(self::STATUS_PSUBSCRIBED)) {
01014                 $this->punsubscribe();
01015             }
01016         }
01017     }
01018 
01019     private function writeCommand($method, $arguments) {
01020         if (count($arguments) === 1 && is_array($arguments[0])) {
01021             $arguments = $arguments[0];
01022         }
01023         $command = $this->_redisClient->createCommand($method, $arguments);
01024         $this->_redisClient->getConnection()->writeCommand($command);
01025     }
01026 
01027     public function rewind() {
01028         // NOOP
01029     }
01030 
01031     public function current() {
01032         return $this->getValue();
01033     }
01034 
01035     public function key() {
01036         return $this->_position;
01037     }
01038 
01039     public function next() {
01040         if ($this->isFlagSet(self::STATUS_VALID)) {
01041             $this->_position++;
01042         }
01043         return $this->_position;
01044     }
01045 
01046     public function valid() {
01047         $subscriptions = self::STATUS_SUBSCRIBED + self::STATUS_PSUBSCRIBED;
01048         return $this->isFlagSet(self::STATUS_VALID) 
01049             && ($this->_statusFlags & $subscriptions) > 0;
01050     }
01051 
01052     private function invalidate() {
01053         $this->_statusFlags = 0x0000;
01054     }
01055 
01056     private function getValue() {
01057         $reader     = $this->_redisClient->getResponseReader();
01058         $connection = $this->_redisClient->getConnection();
01059         $response   = $reader->read($connection);
01060 
01061         switch ($response[0]) {
01062             case self::SUBSCRIBE:
01063             case self::UNSUBSCRIBE:
01064             case self::PSUBSCRIBE:
01065             case self::PUNSUBSCRIBE:
01066                 if ($response[2] === 0) {
01067                     $this->invalidate();
01068                 }
01069             case self::MESSAGE:
01070                 return (object) array(
01071                     'kind'    => $response[0],
01072                     'channel' => $response[1],
01073                     'payload' => $response[2],
01074                 );
01075             case self::PMESSAGE:
01076                 return (object) array(
01077                     'kind'    => $response[0],
01078                     'pattern' => $response[1],
01079                     'channel' => $response[2],
01080                     'payload' => $response[3],
01081                 );
01082             default:
01083                 throw new ClientException(
01084                     "Received an unknown message type {$response[0]} inside of a pubsub context"
01085                 );
01086         }
01087     }
01088 }
01089 
01090 /* ------------------------------------------------------------------------- */
01091 
01092 class ConnectionParameters {
01093     const DEFAULT_SCHEME = 'tcp';
01094     const DEFAULT_HOST = '127.0.0.1';
01095     const DEFAULT_PORT = 6379;
01096     const DEFAULT_TIMEOUT = 5;
01097     private $_parameters;
01098 
01099     public function __construct($parameters = null) {
01100         $parameters = $parameters ?: array();
01101         $this->_parameters = is_array($parameters) 
01102             ? self::filterConnectionParams($parameters) 
01103             : self::parseURI($parameters);
01104     }
01105 
01106     private static function paramsExtractor($params, $kv) {
01107         @list($k, $v) = explode('=', $kv);
01108         $params[$k] = $v;
01109         return $params;
01110     }
01111 
01112     private static function parseURI($uri) {
01113         $parsed = @parse_url($uri);
01114         if ($parsed == false || $parsed['host'] == null) {
01115             throw new ClientException("Invalid URI: $uri");
01116         }
01117         if (array_key_exists('query', $parsed)) {
01118             $query  = explode('&', $parsed['query']);
01119             $parsed = array_reduce($query, 'self::paramsExtractor', $parsed);
01120         }
01121         return self::filterConnectionParams($parsed);
01122     }
01123 
01124     private static function getParamOrDefault(Array $parameters, $param, $default = null) {
01125         return array_key_exists($param, $parameters) ? $parameters[$param] : $default;
01126     }
01127 
01128     private static function filterConnectionParams($parameters) {
01129         return array(
01130             'scheme' => self::getParamOrDefault($parameters, 'scheme', self::DEFAULT_SCHEME), 
01131             'host' => self::getParamOrDefault($parameters, 'host', self::DEFAULT_HOST), 
01132             'port' => (int) self::getParamOrDefault($parameters, 'port', self::DEFAULT_PORT), 
01133             'database' => self::getParamOrDefault($parameters, 'database'), 
01134             'password' => self::getParamOrDefault($parameters, 'password'), 
01135             'connection_async'   => self::getParamOrDefault($parameters, 'connection_async', false), 
01136             'connection_persistent' => self::getParamOrDefault($parameters, 'connection_persistent', false), 
01137             'connection_timeout' => self::getParamOrDefault($parameters, 'connection_timeout', self::DEFAULT_TIMEOUT), 
01138             'read_write_timeout' => self::getParamOrDefault($parameters, 'read_write_timeout'), 
01139             'alias'  => self::getParamOrDefault($parameters, 'alias'), 
01140             'weight' => self::getParamOrDefault($parameters, 'weight'), 
01141         );
01142     }
01143 
01144     public function __get($parameter) {
01145         return $this->_parameters[$parameter];
01146     }
01147 
01148     public function __isset($parameter) {
01149         return isset($this->_parameters[$parameter]);
01150     }
01151 }
01152 
01153 interface IConnection {
01154     public function connect();
01155     public function disconnect();
01156     public function isConnected();
01157     public function writeCommand(ICommand $command);
01158     public function readResponse(ICommand $command);
01159     public function executeCommand(ICommand $command);
01160 }
01161 
01162 interface IConnectionSingle extends IConnection {
01163     public function writeBytes($buffer);
01164     public function readBytes($length);
01165     public function readLine();
01166 }
01167 
01168 interface IConnectionCluster extends IConnection {
01169     public function getConnection(ICommand $command);
01170     public function getConnectionById($connectionId);
01171 }
01172 
01173 final class ConnectionFactory {
01174     private static $_registeredSchemes;
01175 
01176     private function __construct() {
01177         // NOOP
01178     }
01179 
01180     private static function ensureInitialized() {
01181         if (!isset(self::$_registeredSchemes)) {
01182             self::$_registeredSchemes = self::getDefaultSchemes();
01183         }
01184     }
01185 
01186     private static function getDefaultSchemes() {
01187         return array(
01188             'tcp'   => '\Predis\TcpConnection',
01189             'redis' => '\Predis\TcpConnection', // compatibility with older versions
01190         );
01191     }
01192 
01193     public static function registerScheme($scheme, $connectionClass) {
01194         self::ensureInitialized();
01195         $connectionReflection = new \ReflectionClass($connectionClass);
01196         if (!$connectionReflection->isSubclassOf('\Predis\IConnectionSingle')) {
01197             throw new ClientException(
01198                 "Cannot register '$connectionClass' as it is not a valid connection class"
01199             );
01200         }
01201         self::$_registeredSchemes[$scheme] = $connectionClass;
01202     }
01203 
01204     public static function create(ConnectionParameters $parameters, ResponseReader $reader) {
01205         self::ensureInitialized();
01206         if (!isset(self::$_registeredSchemes[$parameters->scheme])) {
01207             throw new ClientException("Unknown connection scheme: {$parameters->scheme}");
01208         }
01209         $connection = self::$_registeredSchemes[$parameters->scheme];
01210         return new $connection($parameters, $reader);
01211     }
01212 }
01213 
01214 abstract class Connection implements IConnectionSingle {
01215     protected $_params, $_socket, $_initCmds, $_reader;
01216 
01217     public function __construct(ConnectionParameters $parameters, ResponseReader $reader = null) {
01218         $this->_params   = $parameters;
01219         $this->_initCmds = array();
01220         $this->_reader   = $reader ?: new ResponseReader();
01221     }
01222 
01223     public function __destruct() {
01224         $this->disconnect();
01225     }
01226 
01227     public function isConnected() {
01228         return is_resource($this->_socket);
01229     }
01230 
01231     protected abstract function createResource();
01232 
01233     public function connect() {
01234         if ($this->isConnected()) {
01235             throw new ClientException('Connection already estabilished');
01236         }
01237         $this->createResource();
01238     }
01239 
01240     public function disconnect() {
01241         if ($this->isConnected()) {
01242             fclose($this->_socket);
01243         }
01244     }
01245 
01246     public function pushInitCommand(ICommand $command){
01247         $this->_initCmds[] = $command;
01248     }
01249 
01250     public function executeCommand(ICommand $command) {
01251         $this->writeCommand($command);
01252         if ($command->closesConnection()) {
01253             return $this->disconnect();
01254         }
01255         return $this->readResponse($command);
01256     }
01257 
01258     protected function onCommunicationException($message, $code = null) {
01259         Utils::onCommunicationException(
01260             new CommunicationException($this, $message, $code)
01261         );
01262     }
01263 
01264     public function getSocket() {
01265         if (!$this->isConnected()) {
01266             $this->connect();
01267         }
01268         return $this->_socket;
01269     }
01270 
01271     public function getResponseReader() {
01272         return $this->_reader;
01273     }
01274 
01275     public function getParameters() {
01276         return $this->_params;
01277     }
01278 
01279     public function __toString() {
01280         return sprintf('%s:%d', $this->_params->host, $this->_params->port);
01281     }
01282 }
01283 
01284 class TcpConnection extends Connection implements IConnectionSingle {
01285     public function __construct(ConnectionParameters $parameters, ResponseReader $reader = null) {
01286         parent::__construct($this->checkParameters($parameters), $reader);
01287     }
01288 
01289     public function __destruct() {
01290         if (!$this->_params->connection_persistent) {
01291             $this->disconnect();
01292         }
01293     }
01294 
01295     private function checkParameters(ConnectionParameters $parameters) {
01296         if ($parameters->scheme != 'tcp' && $parameters->scheme != 'redis') {
01297             throw new \InvalidArgumentException("Invalid scheme: {$parameters->scheme}");
01298         }
01299         return $parameters;
01300     }
01301 
01302     protected function createResource() {
01303         $uri = sprintf('tcp://%s:%d/', $this->_params->host, $this->_params->port);
01304         $connectFlags = STREAM_CLIENT_CONNECT;
01305         if ($this->_params->connection_async) {
01306             $connectFlags |= STREAM_CLIENT_ASYNC_CONNECT;
01307         }
01308         if ($this->_params->connection_persistent) {
01309             $connectFlags |= STREAM_CLIENT_PERSISTENT;
01310         }
01311         $this->_socket = @stream_socket_client(
01312             $uri, $errno, $errstr, $this->_params->connection_timeout, $connectFlags
01313         );
01314 
01315         if (!$this->_socket) {
01316             $this->onCommunicationException(trim($errstr), $errno);
01317         }
01318 
01319         if (isset($this->_params->read_write_timeout)) {
01320             $timeoutSeconds  = floor($this->_params->read_write_timeout);
01321             $timeoutUSeconds = ($this->_params->read_write_timeout - $timeoutSeconds) * 1000000;
01322             stream_set_timeout($this->_socket, $timeoutSeconds, $timeoutUSeconds);
01323         }
01324     }
01325 
01326     private function sendInitializationCommands() {
01327         foreach ($this->_initCmds as $command) {
01328             $this->writeCommand($command);
01329         }
01330         foreach ($this->_initCmds as $command) {
01331             $this->readResponse($command);
01332         }
01333     }
01334 
01335     public function connect() {
01336         parent::connect();
01337         if (count($this->_initCmds) > 0){
01338             $this->sendInitializationCommands();
01339         }
01340     }
01341 
01342     public function writeCommand(ICommand $command) {
01343         $this->writeBytes($command->serialize());
01344     }
01345 
01346     public function readResponse(ICommand $command) {
01347         $response = $this->_reader->read($this);
01348         $skipparse = isset($response->queued) || isset($response->error);
01349         return $skipparse ? $response : $command->parseResponse($response);
01350     }
01351 
01352     public function rawCommand($rawCommandData, $closesConnection = false) {
01353         $this->writeBytes($rawCommandData);
01354         if ($closesConnection) {
01355             $this->disconnect();
01356             return;
01357         }
01358         return $this->_reader->read($this);
01359     }
01360 
01361     public function writeBytes($value) {
01362         $socket = $this->getSocket();
01363         while (($length = strlen($value)) > 0) {
01364             $written = fwrite($socket, $value);
01365             if ($length === $written) {
01366                 return true;
01367             }
01368             if ($written === false || $written === 0) {
01369                 $this->onCommunicationException('Error while writing bytes to the server');
01370             }
01371             $value = substr($value, $written);
01372         }
01373         return true;
01374     }
01375 
01376     public function readBytes($length) {
01377         if ($length == 0) {
01378             throw new \InvalidArgumentException('Length parameter must be greater than 0');
01379         }
01380         $socket = $this->getSocket();
01381         $value  = '';
01382         do {
01383             $chunk = fread($socket, $length);
01384             if ($chunk === false || $chunk === '') {
01385                 $this->onCommunicationException('Error while reading bytes from the server');
01386             }
01387             $value .= $chunk;
01388         }
01389         while (($length -= strlen($chunk)) > 0);
01390         return $value;
01391     }
01392 
01393     public function readLine() {
01394         $socket = $this->getSocket();
01395         $value  = '';
01396         do {
01397             $chunk = fgets($socket);
01398             if ($chunk === false || strlen($chunk) == 0) {
01399                 $this->onCommunicationException('Error while reading line from the server');
01400             }
01401             $value .= $chunk;
01402         }
01403         while (substr($value, -2) !== Protocol::NEWLINE);
01404         return substr($value, 0, -2);
01405     }
01406 }
01407 
01408 class ConnectionCluster implements IConnectionCluster, \IteratorAggregate {
01409     private $_pool, $_distributor;
01410 
01411     public function __construct(IDistributionStrategy $distributor = null) {
01412         $this->_pool = array();
01413         $this->_distributor = $distributor ?: new Distribution\HashRing();
01414     }
01415 
01416     public function isConnected() {
01417         foreach ($this->_pool as $connection) {
01418             if ($connection->isConnected()) {
01419                 return true;
01420             }
01421         }
01422         return false;
01423     }
01424 
01425     public function connect() {
01426         foreach ($this->_pool as $connection) {
01427             $connection->connect();
01428         }
01429     }
01430 
01431     public function disconnect() {
01432         foreach ($this->_pool as $connection) {
01433             $connection->disconnect();
01434         }
01435     }
01436 
01437     public function add(IConnectionSingle $connection) {
01438         $parameters = $connection->getParameters();
01439         if (isset($parameters->alias)) {
01440             $this->_pool[$parameters->alias] = $connection;
01441         }
01442         else {
01443             $this->_pool[] = $connection;
01444         }
01445         $this->_distributor->add($connection, $parameters->weight);
01446     }
01447 
01448     public function getConnection(ICommand $command) {
01449         if ($command->canBeHashed() === false) {
01450             throw new ClientException(
01451                 sprintf("Cannot send '%s' commands to a cluster of connections", $command->getCommandId())
01452             );
01453         }
01454         return $this->_distributor->get($command->getHash($this->_distributor));
01455     }
01456 
01457     public function getConnectionById($id = null) {
01458         $alias = $id ?: 0;
01459         return isset($this->_pool[$alias]) ? $this->_pool[$alias] : null;
01460     }
01461 
01462     public function getIterator() {
01463         return new \ArrayIterator($this->_pool);
01464     }
01465 
01466     public function writeCommand(ICommand $command) {
01467         $this->getConnection($command)->writeCommand($command);
01468     }
01469 
01470     public function readResponse(ICommand $command) {
01471         return $this->getConnection($command)->readResponse($command);
01472     }
01473 
01474     public function executeCommand(ICommand $command) {
01475         $connection = $this->getConnection($command);
01476         $connection->writeCommand($command);
01477         return $connection->readResponse($command);
01478     }
01479 }
01480 
01481 /* ------------------------------------------------------------------------- */
01482 
01483 abstract class RedisServerProfile {
01484     private static $_serverProfiles;
01485     private $_registeredCommands;
01486 
01487     public function __construct() {
01488         $this->_registeredCommands = $this->getSupportedCommands();
01489     }
01490 
01491     public abstract function getVersion();
01492 
01493     protected abstract function getSupportedCommands();
01494 
01495     public static function getDefault() {
01496         return self::get('default');
01497     }
01498 
01499     public static function getDevelopment() {
01500         return self::get('dev');
01501     }
01502 
01503     private static function predisServerProfiles() {
01504         return array(
01505             '1.2'     => '\Predis\RedisServer_v1_2',
01506             '2.0'     => '\Predis\RedisServer_v2_0',
01507             'default' => '\Predis\RedisServer_v2_0',
01508             'dev'     => '\Predis\RedisServer_vNext',
01509         );
01510     }
01511 
01512     public static function registerProfile($profileClass, $aliases) {
01513         if (!isset(self::$_serverProfiles)) {
01514             self::$_serverProfiles = self::predisServerProfiles();
01515         }
01516 
01517         $profileReflection = new \ReflectionClass($profileClass);
01518 
01519         if (!$profileReflection->isSubclassOf('\Predis\RedisServerProfile')) {
01520             throw new ClientException("Cannot register '$profileClass' as it is not a valid profile class");
01521         }
01522 
01523         if (is_array($aliases)) {
01524             foreach ($aliases as $alias) {
01525                 self::$_serverProfiles[$alias] = $profileClass;
01526             }
01527         }
01528         else {
01529             self::$_serverProfiles[$aliases] = $profileClass;
01530         }
01531     }
01532 
01533     public static function get($version) {
01534         if (!isset(self::$_serverProfiles)) {
01535             self::$_serverProfiles = self::predisServerProfiles();
01536         }
01537         if (!isset(self::$_serverProfiles[$version])) {
01538             throw new ClientException("Unknown server profile: $version");
01539         }
01540         $profile = self::$_serverProfiles[$version];
01541         return new $profile();
01542     }
01543 
01544     public function supportsCommands(Array $commands) {
01545         foreach ($commands as $command) {
01546             if ($this->supportsCommand($command) === false) {
01547                 return false;
01548             }
01549         }
01550         return true;
01551     }
01552 
01553     public function supportsCommand($command) {
01554         return isset($this->_registeredCommands[$command]);
01555     }
01556 
01557     public function createCommand($method, $arguments = array()) {
01558         if (!isset($this->_registeredCommands[$method])) {
01559             throw new ClientException("'$method' is not a registered Redis command");
01560         }
01561         $commandClass = $this->_registeredCommands[$method];
01562         $command = new $commandClass();
01563         $command->setArgumentsArray($arguments);
01564         return $command;
01565     }
01566 
01567     public function registerCommands(Array $commands) {
01568         foreach ($commands as $command => $aliases) {
01569             $this->registerCommand($command, $aliases);
01570         }
01571     }
01572 
01573     public function registerCommand($command, $aliases) {
01574         $commandReflection = new \ReflectionClass($command);
01575 
01576         if (!$commandReflection->isSubclassOf('\Predis\Command')) {
01577             throw new ClientException("Cannot register '$command' as it is not a valid Redis command");
01578         }
01579 
01580         if (is_array($aliases)) {
01581             foreach ($aliases as $alias) {
01582                 $this->_registeredCommands[$alias] = $command;
01583             }
01584         }
01585         else {
01586             $this->_registeredCommands[$aliases] = $command;
01587         }
01588     }
01589 
01590     public function __toString() {
01591         return $this->getVersion();
01592     }
01593 }
01594 
01595 class RedisServer_v1_2 extends RedisServerProfile {
01596     public function getVersion() { return '1.2'; }
01597     public function getSupportedCommands() {
01598         return array(
01599             /* miscellaneous commands */
01600             'ping'                      => '\Predis\Commands\Ping',
01601             'echo'                      => '\Predis\Commands\DoEcho',
01602             'auth'                      => '\Predis\Commands\Auth',
01603 
01604             /* connection handling */
01605             'quit'                      => '\Predis\Commands\Quit',
01606 
01607             /* commands operating on string values */
01608             'set'                       => '\Predis\Commands\Set',
01609             'setnx'                     => '\Predis\Commands\SetPreserve',
01610             'mset'                      => '\Predis\Commands\SetMultiple',
01611             'msetnx'                    => '\Predis\Commands\SetMultiplePreserve',
01612             'get'                       => '\Predis\Commands\Get',
01613             'mget'                      => '\Predis\Commands\GetMultiple',
01614             'getset'                    => '\Predis\Commands\GetSet',
01615             'incr'                      => '\Predis\Commands\Increment',
01616             'incrby'                    => '\Predis\Commands\IncrementBy',
01617             'decr'                      => '\Predis\Commands\Decrement',
01618             'decrby'                    => '\Predis\Commands\DecrementBy',
01619             'exists'                    => '\Predis\Commands\Exists',
01620             'del'                       => '\Predis\Commands\Delete',
01621             'type'                      => '\Predis\Commands\Type',
01622 
01623             /* commands operating on the key space */
01624             'keys'                      => '\Predis\Commands\Keys',
01625             'randomkey'                 => '\Predis\Commands\RandomKey',
01626             'rename'                    => '\Predis\Commands\Rename',
01627             'renamenx'                  => '\Predis\Commands\RenamePreserve',
01628             'expire'                    => '\Predis\Commands\Expire',
01629             'expireat'                  => '\Predis\Commands\ExpireAt',
01630             'dbsize'                    => '\Predis\Commands\DatabaseSize',
01631             'ttl'                       => '\Predis\Commands\TimeToLive',
01632 
01633             /* commands operating on lists */
01634             'rpush'                     => '\Predis\Commands\ListPushTail',
01635             'lpush'                     => '\Predis\Commands\ListPushHead',
01636             'llen'                      => '\Predis\Commands\ListLength',
01637             'lrange'                    => '\Predis\Commands\ListRange',
01638             'ltrim'                     => '\Predis\Commands\ListTrim',
01639             'lindex'                    => '\Predis\Commands\ListIndex',
01640             'lset'                      => '\Predis\Commands\ListSet',
01641             'lrem'                      => '\Predis\Commands\ListRemove',
01642             'lpop'                      => '\Predis\Commands\ListPopFirst',
01643             'rpop'                      => '\Predis\Commands\ListPopLast',
01644             'rpoplpush'                 => '\Predis\Commands\ListPopLastPushHead',
01645 
01646             /* commands operating on sets */
01647             'sadd'                      => '\Predis\Commands\SetAdd',
01648             'srem'                      => '\Predis\Commands\SetRemove',
01649             'spop'                      => '\Predis\Commands\SetPop',
01650             'smove'                     => '\Predis\Commands\SetMove',
01651             'scard'                     => '\Predis\Commands\SetCardinality',
01652             'sismember'                 => '\Predis\Commands\SetIsMember',
01653             'sinter'                    => '\Predis\Commands\SetIntersection',
01654             'sinterstore'               => '\Predis\Commands\SetIntersectionStore',
01655             'sunion'                    => '\Predis\Commands\SetUnion',
01656             'sunionstore'               => '\Predis\Commands\SetUnionStore',
01657             'sdiff'                     => '\Predis\Commands\SetDifference',
01658             'sdiffstore'                => '\Predis\Commands\SetDifferenceStore',
01659             'smembers'                  => '\Predis\Commands\SetMembers',
01660             'srandmember'               => '\Predis\Commands\SetRandomMember',
01661 
01662             /* commands operating on sorted sets */
01663             'zadd'                      => '\Predis\Commands\ZSetAdd',
01664             'zincrby'                   => '\Predis\Commands\ZSetIncrementBy',
01665             'zrem'                      => '\Predis\Commands\ZSetRemove',
01666             'zrange'                    => '\Predis\Commands\ZSetRange',
01667             'zrevrange'                 => '\Predis\Commands\ZSetReverseRange',
01668             'zrangebyscore'             => '\Predis\Commands\ZSetRangeByScore',
01669             'zcard'                     => '\Predis\Commands\ZSetCardinality',
01670             'zscore'                    => '\Predis\Commands\ZSetScore',
01671             'zremrangebyscore'          => '\Predis\Commands\ZSetRemoveRangeByScore',
01672 
01673             /* multiple databases handling commands */
01674             'select'                    => '\Predis\Commands\SelectDatabase',
01675             'move'                      => '\Predis\Commands\MoveKey',
01676             'flushdb'                   => '\Predis\Commands\FlushDatabase',
01677             'flushall'                  => '\Predis\Commands\FlushAll',
01678 
01679             /* sorting */
01680             'sort'                      => '\Predis\Commands\Sort',
01681 
01682             /* remote server control commands */
01683             'info'                      => '\Predis\Commands\Info',
01684             'slaveof'                   => '\Predis\Commands\SlaveOf',
01685 
01686             /* persistence control commands */
01687             'save'                      => '\Predis\Commands\Save',
01688             'bgsave'                    => '\Predis\Commands\BackgroundSave',
01689             'lastsave'                  => '\Predis\Commands\LastSave',
01690             'shutdown'                  => '\Predis\Commands\Shutdown',
01691             'bgrewriteaof'              =>  '\Predis\Commands\BackgroundRewriteAppendOnlyFile',
01692         );
01693     }
01694 }
01695 
01696 class RedisServer_v2_0 extends RedisServer_v1_2 {
01697     public function getVersion() { return '2.0'; }
01698     public function getSupportedCommands() {
01699         return array_merge(parent::getSupportedCommands(), array(
01700             /* transactions */
01701             'multi'                     => '\Predis\Commands\Multi',
01702             'exec'                      => '\Predis\Commands\Exec',
01703             'discard'                   => '\Predis\Commands\Discard',
01704 
01705             /* commands operating on string values */
01706             'setex'                     => '\Predis\Commands\SetExpire',
01707             'append'                    => '\Predis\Commands\Append',
01708             'substr'                    => '\Predis\Commands\Substr',
01709 
01710             /* commands operating on lists */
01711             'blpop'                     => '\Predis\Commands\ListPopFirstBlocking',
01712             'brpop'                     => '\Predis\Commands\ListPopLastBlocking',
01713 
01714             /* commands operating on sorted sets */
01715             'zunionstore'               => '\Predis\Commands\ZSetUnionStore',
01716             'zinterstore'               => '\Predis\Commands\ZSetIntersectionStore',
01717             'zcount'                    => '\Predis\Commands\ZSetCount',
01718             'zrank'                     => '\Predis\Commands\ZSetRank',
01719             'zrevrank'                  => '\Predis\Commands\ZSetReverseRank',
01720             'zremrangebyrank'           => '\Predis\Commands\ZSetRemoveRangeByRank',
01721 
01722             /* commands operating on hashes */
01723             'hset'                      => '\Predis\Commands\HashSet',
01724             'hsetnx'                    => '\Predis\Commands\HashSetPreserve',
01725             'hmset'                     => '\Predis\Commands\HashSetMultiple',
01726             'hincrby'                   => '\Predis\Commands\HashIncrementBy',
01727             'hget'                      => '\Predis\Commands\HashGet',
01728             'hmget'                     => '\Predis\Commands\HashGetMultiple',
01729             'hdel'                      => '\Predis\Commands\HashDelete',
01730             'hexists'                   => '\Predis\Commands\HashExists',
01731             'hlen'                      => '\Predis\Commands\HashLength',
01732             'hkeys'                     => '\Predis\Commands\HashKeys',
01733             'hvals'                     => '\Predis\Commands\HashValues',
01734             'hgetall'                   => '\Predis\Commands\HashGetAll',
01735 
01736             /* publish - subscribe */
01737             'subscribe'                 => '\Predis\Commands\Subscribe',
01738             'unsubscribe'               => '\Predis\Commands\Unsubscribe',
01739             'psubscribe'                => '\Predis\Commands\SubscribeByPattern',
01740             'punsubscribe'              => '\Predis\Commands\UnsubscribeByPattern',
01741             'publish'                   => '\Predis\Commands\Publish',
01742 
01743             /* remote server control commands */
01744             'config'                    => '\Predis\Commands\Config',
01745         ));
01746     }
01747 }
01748 
01749 class RedisServer_vNext extends RedisServer_v2_0 {
01750     public function getVersion() { return '2.1'; }
01751     public function getSupportedCommands() {
01752         return array_merge(parent::getSupportedCommands(), array(
01753             /* transactions */
01754             'watch'                     => '\Predis\Commands\Watch',
01755             'unwatch'                   => '\Predis\Commands\Unwatch',
01756 
01757             /* commands operating on string values */
01758             'strlen'                    => '\Predis\Commands\Strlen',
01759 
01760             /* commands operating on lists */
01761             'rpushx'                    => '\Predis\Commands\ListPushTailX',
01762             'lpushx'                    => '\Predis\Commands\ListPushHeadX',
01763             'linsert'                   => '\Predis\Commands\ListInsert',
01764         ));
01765     }
01766 }
01767 
01768 /* ------------------------------------------------------------------------- */
01769 
01770 namespace Predis\Pipeline;
01771 use Predis\IConnection, Predis\ServerException, Predis\CommunicationException;
01772 
01773 interface IPipelineExecutor {
01774     public function execute(IConnection $connection, &$commands);
01775 }
01776 
01777 class StandardExecutor implements IPipelineExecutor {
01778     public function execute(IConnection $connection, &$commands) {
01779         $sizeofPipe = count($commands);
01780         $values = array();
01781 
01782         foreach ($commands as $command) {
01783             $connection->writeCommand($command);
01784         }
01785         try {
01786             for ($i = 0; $i < $sizeofPipe; $i++) {
01787                 $response = $connection->readResponse($commands[$i]);
01788                 $values[] = $response instanceof \Iterator
01789                     ? iterator_to_array($response)
01790                     : $response;
01791                 unset($commands[$i]);
01792             }
01793         }
01794         catch (ServerException $exception) {
01795             // force disconnection to prevent protocol desynchronization
01796             $connection->disconnect();
01797             throw $exception;
01798         }
01799 
01800         return $values;
01801     }
01802 }
01803 
01804 class SafeExecutor implements IPipelineExecutor {
01805     public function execute(IConnection $connection, &$commands) {
01806         $sizeofPipe = count($commands);
01807         $values = array();
01808 
01809         foreach ($commands as $command) {
01810             try {
01811                 $connection->writeCommand($command);
01812             }
01813             catch (CommunicationException $exception) {
01814                 return array_fill(0, $sizeofPipe, $exception);
01815             }
01816         }
01817 
01818         for ($i = 0; $i < $sizeofPipe; $i++) {
01819             $command = $commands[$i];
01820             unset($commands[$i]);
01821             try {
01822                 $response = $connection->readResponse($command);
01823                 $values[] = ($response instanceof \Iterator
01824                     ? iterator_to_array($response)
01825                     : $response
01826                 );
01827             }
01828             catch (ServerException $exception) {
01829                 $values[] = $exception->toResponseError();
01830             }
01831             catch (CommunicationException $exception) {
01832                 $toAdd  = count($commands) - count($values);
01833                 $values = array_merge($values, array_fill(0, $toAdd, $exception));
01834                 break;
01835             }
01836         }
01837 
01838         return $values;
01839     }
01840 }
01841 
01842 class SafeClusterExecutor implements IPipelineExecutor {
01843     public function execute(IConnection $connection, &$commands) {
01844         $connectionExceptions = array();
01845         $sizeofPipe = count($commands);
01846         $values = array();
01847 
01848         foreach ($commands as $command) {
01849             $cmdConnection = $connection->getConnection($command);
01850             if (isset($connectionExceptions[spl_object_hash($cmdConnection)])) {
01851                 continue;
01852             }
01853             try {
01854                 $cmdConnection->writeCommand($command);
01855             }
01856             catch (CommunicationException $exception) {
01857                 $connectionExceptions[spl_object_hash($cmdConnection)] = $exception;
01858             }
01859         }
01860 
01861         for ($i = 0; $i < $sizeofPipe; $i++) {
01862             $command = $commands[$i];
01863             unset($commands[$i]);
01864 
01865             $cmdConnection = $connection->getConnection($command);
01866             $connectionObjectHash = spl_object_hash($cmdConnection);
01867 
01868             if (isset($connectionExceptions[$connectionObjectHash])) {
01869                 $values[] = $connectionExceptions[$connectionObjectHash];
01870                 continue;
01871             }
01872 
01873             try {
01874                 $response = $cmdConnection->readResponse($command);
01875                 $values[] = ($response instanceof \Iterator
01876                     ? iterator_to_array($response)
01877                     : $response
01878                 );
01879             }
01880             catch (ServerException $exception) {
01881                 $values[] = $exception->toResponseError();
01882             }
01883             catch (CommunicationException $exception) {
01884                 $values[] = $exception;
01885                 $connectionExceptions[$connectionObjectHash] = $exception;
01886             }
01887         }
01888 
01889         return $values;
01890     }
01891 }
01892 
01893 /* ------------------------------------------------------------------------- */
01894 
01895 namespace Predis\Distribution;
01896 
01897 interface IDistributionStrategy {
01898     public function add($node, $weight = null);
01899     public function remove($node);
01900     public function get($key);
01901     public function generateKey($value);
01902 }
01903 
01904 class EmptyRingException extends \Exception { }
01905 
01906 class HashRing implements IDistributionStrategy {
01907     const DEFAULT_REPLICAS = 128;
01908     const DEFAULT_WEIGHT   = 100;
01909     private $_nodes, $_ring, $_ringKeys, $_ringKeysCount, $_replicas;
01910 
01911     public function __construct($replicas = self::DEFAULT_REPLICAS) {
01912         $this->_replicas = $replicas;
01913         $this->_nodes    = array();
01914     }
01915 
01916     public function add($node, $weight = null) {
01917         // NOTE: in case of collisions in the hashes of the nodes, the node added
01918         //       last wins, thus the order in which nodes are added is significant.
01919         $this->_nodes[] = array('object' => $node, 'weight' => (int) $weight ?: $this::DEFAULT_WEIGHT);
01920         $this->reset();
01921     }
01922 
01923     public function remove($node) {
01924         // NOTE: a node is removed by resetting the ring so that it's recreated from 
01925         //       scratch, in order to reassign possible hashes with collisions to the 
01926         //       right node according to the order in which they were added in the 
01927         //       first place.
01928         for ($i = 0; $i < count($this->_nodes); ++$i) {
01929             if ($this->_nodes[$i]['object'] === $node) {
01930                 array_splice($this->_nodes, $i, 1);
01931                 $this->reset();
01932                 break;
01933             }
01934         }
01935     }
01936 
01937     private function reset() {
01938         unset($this->_ring);
01939         unset($this->_ringKeys);
01940         unset($this->_ringKeysCount);
01941     }
01942 
01943     private function isInitialized() {
01944         return isset($this->_ringKeys);
01945     }
01946 
01947     private function computeTotalWeight() {
01948         // TODO: array_reduce + lambda for PHP 5.3
01949         $totalWeight = 0;
01950         foreach ($this->_nodes as $node) {
01951             $totalWeight += $node['weight'];
01952         }
01953         return $totalWeight;
01954     }
01955 
01956     private function initialize() {
01957         if ($this->isInitialized()) {
01958             return;
01959         }
01960         if (count($this->_nodes) === 0) {
01961             throw new EmptyRingException('Cannot initialize empty hashring');
01962         }
01963 
01964         $this->_ring = array();
01965         $totalWeight = $this->computeTotalWeight();
01966         $nodesCount  = count($this->_nodes);
01967         foreach ($this->_nodes as $node) {
01968             $weightRatio = $node['weight'] / $totalWeight;
01969             $this->addNodeToRing($this->_ring, $node, $nodesCount, $this->_replicas, $weightRatio);
01970         }
01971         ksort($this->_ring, SORT_NUMERIC);
01972         $this->_ringKeys = array_keys($this->_ring);
01973         $this->_ringKeysCount = count($this->_ringKeys);
01974     }
01975 
01976     protected function addNodeToRing(&$ring, $node, $totalNodes, $replicas, $weightRatio) {
01977         $nodeObject = $node['object'];
01978         $nodeHash = (string) $nodeObject;
01979         $replicas = (int) round($weightRatio * $totalNodes * $replicas);
01980         for ($i = 0; $i < $replicas; $i++) {
01981             $key = crc32("$nodeHash:$i");
01982             $ring[$key] = $nodeObject;
01983         }
01984     }
01985 
01986     public function generateKey($value) {
01987         return crc32($value);
01988     }
01989 
01990     public function get($key) {
01991         return $this->_ring[$this->getNodeKey($key)];
01992     }
01993 
01994     private function getNodeKey($key) {
01995         $this->initialize();
01996         $ringKeys = $this->_ringKeys;
01997         $upper = $this->_ringKeysCount - 1;
01998         $lower = 0;
01999 
02000         while ($lower <= $upper) {
02001             $index = ($lower + $upper) >> 1;
02002             $item  = $ringKeys[$index];
02003             if ($item > $key) {
02004                 $upper = $index - 1;
02005             }
02006             else if ($item < $key) {
02007                 $lower = $index + 1;
02008             }
02009             else {
02010                 return $item;
02011             }
02012         }
02013         return $ringKeys[$this->wrapAroundStrategy($upper, $lower, $this->_ringKeysCount)];
02014     }
02015 
02016     protected function wrapAroundStrategy($upper, $lower, $ringKeysCount) {
02017         // NOTE: binary search for the last item in _ringkeys with a value 
02018         //       less or equal to the key. If no such item exists, return the 
02019         //       last item.
02020         return $upper >= 0 ? $upper : $ringKeysCount - 1;
02021     }
02022 }
02023 
02024 class KetamaPureRing extends HashRing {
02025     const DEFAULT_REPLICAS = 160;
02026 
02027     public function __construct() {
02028         parent::__construct($this::DEFAULT_REPLICAS);
02029     }
02030 
02031     protected function addNodeToRing(&$ring, $node, $totalNodes, $replicas, $weightRatio) {
02032         $nodeObject = $node['object'];
02033         $nodeHash = (string) $nodeObject;
02034         $replicas = (int) floor($weightRatio * $totalNodes * ($replicas / 4));
02035         for ($i = 0; $i < $replicas; $i++) {
02036             $unpackedDigest = unpack('V4', md5("$nodeHash-$i", true));
02037             foreach ($unpackedDigest as $key) {
02038                 $ring[$key] = $nodeObject;
02039             }
02040         }
02041     }
02042 
02043     public function generateKey($value) {
02044         $hash = unpack('V', md5($value, true));
02045         return $hash[1];
02046     }
02047 
02048     protected function wrapAroundStrategy($upper, $lower, $ringKeysCount) {
02049         // NOTE: binary search for the first item in _ringkeys with a value 
02050         //       greater or equal to the key. If no such item exists, return the 
02051         //       first item.
02052         return $lower < $ringKeysCount ? $lower : 0;
02053     }
02054 }
02055 
02056 /* ------------------------------------------------------------------------- */
02057 
02058 namespace Predis\Shared;
02059 use Predis\IConnection, Predis\IConnectionSingle, Predis\IConnectionCluster, 
02060     Predis\CommunicationException;
02061 
02062 class Utils {
02063     public static function isCluster(IConnection $connection) {
02064         return $connection instanceof IConnectionCluster;
02065     }
02066 
02067     public static function onCommunicationException(CommunicationException $exception) {
02068         if ($exception->shouldResetConnection()) {
02069             $connection = $exception->getConnection();
02070             if ($connection->isConnected()) {
02071                 $connection->disconnect();
02072             }
02073         }
02074         throw $exception;
02075     }
02076 
02077     public static function filterArrayArguments(Array $arguments) {
02078         if (count($arguments) === 1 && is_array($arguments[0])) {
02079             return $arguments[0];
02080         }
02081         return $arguments;
02082     }
02083 }
02084 
02085 abstract class MultiBulkResponseIteratorBase implements \Iterator, \Countable {
02086     protected $_position, $_current, $_replySize;
02087 
02088     public function rewind() {
02089         // NOOP
02090     }
02091 
02092     public function current() {
02093         return $this->_current;
02094     }
02095 
02096     public function key() {
02097         return $this->_position;
02098     }
02099 
02100     public function next() {
02101         if (++$this->_position < $this->_replySize) {
02102             $this->_current = $this->getValue();
02103         }
02104         return $this->_position;
02105     }
02106 
02107     public function valid() {
02108         return $this->_position < $this->_replySize;
02109     }
02110 
02111     public function count() {
02112         // NOTE: use count if you want to get the size of the current multi-bulk 
02113         //       response without using iterator_count (which actually consumes 
02114         //       our iterator to calculate the size, and we cannot perform a rewind)
02115         return $this->_replySize;
02116     }
02117 
02118     protected abstract function getValue();
02119 }
02120 
02121 class MultiBulkResponseIterator extends MultiBulkResponseIteratorBase {
02122     private $_connection;
02123 
02124     public function __construct(IConnectionSingle $connection, $size) {
02125         $this->_connection = $connection;
02126         $this->_reader     = $connection->getResponseReader();
02127         $this->_position   = 0;
02128         $this->_current    = $size > 0 ? $this->getValue() : null;
02129         $this->_replySize  = $size;
02130     }
02131 
02132     public function __destruct() {
02133         // when the iterator is garbage-collected (e.g. it goes out of the
02134         // scope of a foreach) but it has not reached its end, we must sync
02135         // the client with the queued elements that have not been read from
02136         // the connection with the server.
02137         $this->sync();
02138     }
02139 
02140     public function sync($drop = false) {
02141         if ($drop == true) {
02142             if ($this->valid()) {
02143                 $this->_position = $this->_replySize;
02144                 $this->_connection->disconnect();
02145             }
02146         }
02147         else {
02148             while ($this->valid()) {
02149                 $this->next();
02150             }
02151         }
02152     }
02153 
02154     protected function getValue() {
02155         return $this->_reader->read($this->_connection);
02156     }
02157 }
02158 
02159 class MultiBulkResponseKVIterator extends MultiBulkResponseIteratorBase {
02160     private $_iterator;
02161 
02162     public function __construct(MultiBulkResponseIterator $iterator) {
02163         $virtualSize = count($iterator) / 2;
02164 
02165         $this->_iterator   = $iterator;
02166         $this->_position   = 0;
02167         $this->_current    = $virtualSize > 0 ? $this->getValue() : null;
02168         $this->_replySize  = $virtualSize;
02169     }
02170 
02171     public function __destruct() {
02172         $this->_iterator->sync();
02173     }
02174 
02175     protected function getValue() {
02176         $k = $this->_iterator->current();
02177         $this->_iterator->next();
02178         $v = $this->_iterator->current();
02179         $this->_iterator->next();
02180         return array($k, $v);
02181     }
02182 }
02183 
02184 /* ------------------------------------------------------------------------- */
02185 
02186 namespace Predis\Commands;
02187 use Predis\Command, Predis\Shared\Utils, Predis\Shared\MultiBulkResponseKVIterator;
02188 
02189 /* miscellaneous commands */
02190 class Ping extends Command {
02191     public function canBeHashed()  { return false; }
02192     public function getCommandId() { return 'PING'; }
02193     public function parseResponse($data) {
02194         return $data === 'PONG' ? true : false;
02195     }
02196 }
02197 
02198 class DoEcho extends Command {
02199     public function canBeHashed()  { return false; }
02200     public function getCommandId() { return 'ECHO'; }
02201 }
02202 
02203 class Auth extends Command {
02204     public function canBeHashed()  { return false; }
02205     public function getCommandId() { return 'AUTH'; }
02206 }
02207 
02208 /* connection handling */
02209 class Quit extends Command {
02210     public function canBeHashed()  { return false; }
02211     public function getCommandId() { return 'QUIT'; }
02212     public function closesConnection() { return true; }
02213 }
02214 
02215 /* commands operating on string values */
02216 class Set extends Command {
02217     public function getCommandId() { return 'SET'; }
02218 }
02219 
02220 class SetExpire extends Command {
02221     public function getCommandId() { return 'SETEX'; }
02222 }
02223 
02224 class SetPreserve extends Command {
02225     public function getCommandId() { return 'SETNX'; }
02226     public function parseResponse($data) { return (bool) $data; }
02227 }
02228 
02229 class SetMultiple extends Command {
02230     public function canBeHashed()  { return false; }
02231     public function getCommandId() { return 'MSET'; }
02232     public function filterArguments(Array $arguments) {
02233         if (count($arguments) === 1 && is_array($arguments[0])) {
02234             $flattenedKVs = array();
02235             $args = &$arguments[0];
02236             foreach ($args as $k => $v) {
02237                 $flattenedKVs[] = $k;
02238                 $flattenedKVs[] = $v;
02239             }
02240             return $flattenedKVs;
02241         }
02242         return $arguments;
02243     }
02244 }
02245 
02246 class SetMultiplePreserve extends SetMultiple {
02247     public function canBeHashed()  { return false; }
02248     public function getCommandId() { return 'MSETNX'; }
02249     public function parseResponse($data) { return (bool) $data; }
02250 }
02251 
02252 class Get extends Command {
02253     public function getCommandId() { return 'GET'; }
02254 }
02255 
02256 class GetMultiple extends Command {
02257     public function canBeHashed()  { return false; }
02258     public function getCommandId() { return 'MGET'; }
02259     public function filterArguments(Array $arguments) {
02260         return Utils::filterArrayArguments($arguments);
02261     }
02262 }
02263 
02264 class GetSet extends Command {
02265     public function getCommandId() { return 'GETSET'; }
02266 }
02267 
02268 class Increment extends Command {
02269     public function getCommandId() { return 'INCR'; }
02270 }
02271 
02272 class IncrementBy extends Command {
02273     public function getCommandId() { return 'INCRBY'; }
02274 }
02275 
02276 class Decrement extends Command {
02277     public function getCommandId() { return 'DECR'; }
02278 }
02279 
02280 class DecrementBy extends Command {
02281     public function getCommandId() { return 'DECRBY'; }
02282 }
02283 
02284 class Exists extends Command {
02285     public function getCommandId() { return 'EXISTS'; }
02286     public function parseResponse($data) { return (bool) $data; }
02287 }
02288 
02289 class Delete extends Command {
02290     public function getCommandId() { return 'DEL'; }
02291     public function filterArguments(Array $arguments) {
02292         return Utils::filterArrayArguments($arguments);
02293     }
02294 }
02295 
02296 class Type extends Command {
02297     public function getCommandId() { return 'TYPE'; }
02298 }
02299 
02300 class Append extends Command {
02301     public function getCommandId() { return 'APPEND'; }
02302 }
02303 
02304 class Substr extends Command {
02305     public function getCommandId() { return 'SUBSTR'; }
02306 }
02307 
02308 class Strlen extends Command {
02309     public function getCommandId() { return 'STRLEN'; }
02310 }
02311 
02312 /* commands operating on the key space */
02313 class Keys extends Command {
02314     public function canBeHashed()  { return false; }
02315     public function getCommandId() { return 'KEYS'; }
02316     public function parseResponse($data) { 
02317         // TODO: is this behaviour correct?
02318         if (is_array($data) || $data instanceof \Iterator) {
02319             return $data;
02320         }
02321         return strlen($data) > 0 ? explode(' ', $data) : array();
02322     }
02323 }
02324 
02325 class RandomKey extends Command {
02326     public function canBeHashed()  { return false; }
02327     public function getCommandId() { return 'RANDOMKEY'; }
02328     public function parseResponse($data) { return $data !== '' ? $data : null; }
02329 }
02330 
02331 class Rename extends Command {
02332     public function canBeHashed()  { return false; }
02333     public function getCommandId() { return 'RENAME'; }
02334 }
02335 
02336 class RenamePreserve extends Command {
02337     public function canBeHashed()  { return false; }
02338     public function getCommandId() { return 'RENAMENX'; }
02339     public function parseResponse($data) { return (bool) $data; }
02340 }
02341 
02342 class Expire extends Command {
02343     public function getCommandId() { return 'EXPIRE'; }
02344     public function parseResponse($data) { return (bool) $data; }
02345 }
02346 
02347 class ExpireAt extends Command {
02348     public function getCommandId() { return 'EXPIREAT'; }
02349     public function parseResponse($data) { return (bool) $data; }
02350 }
02351 
02352 class DatabaseSize extends Command {
02353     public function canBeHashed()  { return false; }
02354     public function getCommandId() { return 'DBSIZE'; }
02355 }
02356 
02357 class TimeToLive extends Command {
02358     public function getCommandId() { return 'TTL'; }
02359 }
02360 
02361 /* commands operating on lists */
02362 class ListPushTail extends Command {
02363     public function getCommandId() { return 'RPUSH'; }
02364 }
02365 
02366 class ListPushTailX extends Command {
02367     public function getCommandId() { return 'RPUSHX'; }
02368 }
02369 
02370 class ListPushHead extends Command {
02371     public function getCommandId() { return 'LPUSH'; }
02372 }
02373 
02374 class ListPushHeadX extends Command {
02375     public function getCommandId() { return 'LPUSHX'; }
02376 }
02377 
02378 class ListLength extends Command {
02379     public function getCommandId() { return 'LLEN'; }
02380 }
02381 
02382 class ListRange extends Command {
02383     public function getCommandId() { return 'LRANGE'; }
02384 }
02385 
02386 class ListTrim extends Command {
02387     public function getCommandId() { return 'LTRIM'; }
02388 }
02389 
02390 class ListIndex extends Command {
02391     public function getCommandId() { return 'LINDEX'; }
02392 }
02393 
02394 class ListSet extends Command {
02395     public function getCommandId() { return 'LSET'; }
02396 }
02397 
02398 class ListRemove extends Command {
02399     public function getCommandId() { return 'LREM'; }
02400 }
02401 
02402 class ListPopLastPushHead extends Command {
02403     public function getCommandId() { return 'RPOPLPUSH'; }
02404 }
02405 
02406 class ListPopLastPushHeadBulk extends Command {
02407     public function getCommandId() { return 'RPOPLPUSH'; }
02408 }
02409 
02410 class ListPopFirst extends Command {
02411     public function getCommandId() { return 'LPOP'; }
02412 }
02413 
02414 class ListPopLast extends Command {
02415     public function getCommandId() { return 'RPOP'; }
02416 }
02417 
02418 class ListPopFirstBlocking extends Command {
02419     public function getCommandId() { return 'BLPOP'; }
02420 }
02421 
02422 class ListPopLastBlocking extends Command {
02423     public function getCommandId() { return 'BRPOP'; }
02424 }
02425 
02426 class ListInsert extends Command {
02427     public function getCommandId() { return 'LINSERT'; }
02428 }
02429 
02430 /* commands operating on sets */
02431 class SetAdd extends Command {
02432     public function getCommandId() { return 'SADD'; }
02433     public function parseResponse($data) { return (bool) $data; }
02434 }
02435 
02436 class SetRemove extends Command {
02437     public function getCommandId() { return 'SREM'; }
02438     public function parseResponse($data) { return (bool) $data; }
02439 }
02440 
02441 class SetPop  extends Command {
02442     public function getCommandId() { return 'SPOP'; }
02443 }
02444 
02445 class SetMove extends Command {
02446     public function canBeHashed()  { return false; }
02447     public function getCommandId() { return 'SMOVE'; }
02448     public function parseResponse($data) { return (bool) $data; }
02449 }
02450 
02451 class SetCardinality extends Command {
02452     public function getCommandId() { return 'SCARD'; }
02453 }
02454 
02455 class SetIsMember extends Command {
02456     public function getCommandId() { return 'SISMEMBER'; }
02457     public function parseResponse($data) { return (bool) $data; }
02458 }
02459 
02460 class SetIntersection extends Command {
02461     public function getCommandId() { return 'SINTER'; }
02462     public function filterArguments(Array $arguments) {
02463         return Utils::filterArrayArguments($arguments);
02464     }
02465 }
02466 
02467 class SetIntersectionStore extends Command {
02468     public function getCommandId() { return 'SINTERSTORE'; }
02469     public function filterArguments(Array $arguments) {
02470         return Utils::filterArrayArguments($arguments);
02471     }
02472 }
02473 
02474 class SetUnion extends SetIntersection {
02475     public function getCommandId() { return 'SUNION'; }
02476 }
02477 
02478 class SetUnionStore extends SetIntersectionStore {
02479     public function getCommandId() { return 'SUNIONSTORE'; }
02480 }
02481 
02482 class SetDifference extends SetIntersection {
02483     public function getCommandId() { return 'SDIFF'; }
02484 }
02485 
02486 class SetDifferenceStore extends SetIntersectionStore {
02487     public function getCommandId() { return 'SDIFFSTORE'; }
02488 }
02489 
02490 class SetMembers extends Command {
02491     public function getCommandId() { return 'SMEMBERS'; }
02492 }
02493 
02494 class SetRandomMember extends Command {
02495     public function getCommandId() { return 'SRANDMEMBER'; }
02496 }
02497 
02498 /* commands operating on sorted sets */
02499 class ZSetAdd extends Command {
02500     public function getCommandId() { return 'ZADD'; }
02501     public function parseResponse($data) { return (bool) $data; }
02502 }
02503 
02504 class ZSetIncrementBy extends Command {
02505     public function getCommandId() { return 'ZINCRBY'; }
02506 }
02507 
02508 class ZSetRemove extends Command {
02509     public function getCommandId() { return 'ZREM'; }
02510     public function parseResponse($data) { return (bool) $data; }
02511 }
02512 
02513 class ZSetUnionStore extends Command {
02514     public function getCommandId() { return 'ZUNIONSTORE'; }
02515     public function filterArguments(Array $arguments) {
02516         $options = array();
02517         $argc    = count($arguments);
02518         if ($argc > 1 && is_array($arguments[$argc - 1])) {
02519             $options = $this->prepareOptions(array_pop($arguments));
02520         }
02521         $args = is_array($arguments[0]) ? $arguments[0] : $arguments;
02522         return  array_merge($args, $options);
02523     }
02524     private function prepareOptions($options) {
02525         $opts = array_change_key_case($options, CASE_UPPER);
02526         $finalizedOpts = array();
02527         if (isset($opts['WEIGHTS']) && is_array($opts['WEIGHTS'])) {
02528             $finalizedOpts[] = 'WEIGHTS';
02529             foreach ($opts['WEIGHTS'] as $weight) {
02530                 $finalizedOpts[] = $weight;
02531             }
02532         }
02533         if (isset($opts['AGGREGATE'])) {
02534             $finalizedOpts[] = 'AGGREGATE';
02535             $finalizedOpts[] = $opts['AGGREGATE'];
02536         }
02537         return $finalizedOpts;
02538     }
02539 }
02540 
02541 class ZSetIntersectionStore extends ZSetUnionStore {
02542     public function getCommandId() { return 'ZINTERSTORE'; }
02543 }
02544 
02545 class ZSetRange extends Command {
02546     private $_withScores = false;
02547     public function getCommandId() { return 'ZRANGE'; }
02548     public function filterArguments(Array $arguments) {
02549         if (count($arguments) === 4) {
02550             $lastType = gettype($arguments[3]);
02551             if ($lastType === 'string' && strtolower($arguments[3]) === 'withscores') {
02552                 // used for compatibility with older versions
02553                 $arguments[3] = array('WITHSCORES' => true);
02554                 $lastType = 'array';
02555             }
02556             if ($lastType === 'array') {
02557                 $options = $this->prepareOptions(array_pop($arguments));
02558                 return array_merge($arguments, $options);
02559             }
02560         }
02561         return $arguments;
02562     }
02563     protected function prepareOptions($options) {
02564         $opts = array_change_key_case($options, CASE_UPPER);
02565         $finalizedOpts = array();
02566         if (isset($opts['WITHSCORES'])) {
02567             $finalizedOpts[] = 'WITHSCORES';
02568             $this->_withScores = true;
02569         }
02570         return $finalizedOpts;
02571     }
02572     public function parseResponse($data) {
02573         if ($this->_withScores) {
02574             if ($data instanceof \Iterator) {
02575                 return new MultiBulkResponseKVIterator($data);
02576             }
02577             $result = array();
02578             for ($i = 0; $i < count($data); $i++) {
02579                 $result[] = array($data[$i], $data[++$i]);
02580             }
02581             return $result;
02582         }
02583         return $data;
02584     }
02585 }
02586 
02587 class ZSetReverseRange extends ZSetRange {
02588     public function getCommandId() { return 'ZREVRANGE'; }
02589 }
02590 
02591 class ZSetRangeByScore extends ZSetRange {
02592     public function getCommandId() { return 'ZRANGEBYSCORE'; }
02593     protected function prepareOptions($options) {
02594         $opts = array_change_key_case($options, CASE_UPPER);
02595         $finalizedOpts = array();
02596         if (isset($opts['LIMIT']) && is_array($opts['LIMIT'])) {
02597             $limit = array_change_key_case($opts['LIMIT'], CASE_UPPER);
02598             $finalizedOpts[] = 'LIMIT';
02599             $finalizedOpts[] = isset($limit['OFFSET']) ? $limit['OFFSET'] : $limit[0];
02600             $finalizedOpts[] = isset($limit['COUNT']) ? $limit['COUNT'] : $limit[1];
02601         }
02602         return array_merge($finalizedOpts, parent::prepareOptions($options));
02603     }
02604 }
02605 
02606 class ZSetCount extends Command {
02607     public function getCommandId() { return 'ZCOUNT'; }
02608 }
02609 
02610 class ZSetCardinality extends Command {
02611     public function getCommandId() { return 'ZCARD'; }
02612 }
02613 
02614 class ZSetScore extends Command {
02615     public function getCommandId() { return 'ZSCORE'; }
02616 }
02617 
02618 class ZSetRemoveRangeByScore extends Command {
02619     public function getCommandId() { return 'ZREMRANGEBYSCORE'; }
02620 }
02621 
02622 class ZSetRank extends Command {
02623     public function getCommandId() { return 'ZRANK'; }
02624 }
02625 
02626 class ZSetReverseRank extends Command {
02627     public function getCommandId() { return 'ZREVRANK'; }
02628 }
02629 
02630 class ZSetRemoveRangeByRank extends Command {
02631     public function getCommandId() { return 'ZREMRANGEBYRANK'; }
02632 }
02633 
02634 /* commands operating on hashes */
02635 class HashSet extends Command {
02636     public function getCommandId() { return 'HSET'; }
02637     public function parseResponse($data) { return (bool) $data; }
02638 }
02639 
02640 class HashSetPreserve extends Command {
02641     public function getCommandId() { return 'HSETNX'; }
02642     public function parseResponse($data) { return (bool) $data; }
02643 }
02644 
02645 class HashSetMultiple extends Command {
02646     public function getCommandId() { return 'HMSET'; }
02647     public function filterArguments(Array $arguments) {
02648         if (count($arguments) === 2 && is_array($arguments[1])) {
02649             $flattenedKVs = array($arguments[0]);
02650             $args = &$arguments[1];
02651             foreach ($args as $k => $v) {
02652                 $flattenedKVs[] = $k;
02653                 $flattenedKVs[] = $v;
02654             }
02655             return $flattenedKVs;
02656         }
02657         return $arguments;
02658     }
02659 }
02660 
02661 class HashIncrementBy extends Command {
02662     public function getCommandId() { return 'HINCRBY'; }
02663 }
02664 
02665 class HashGet extends Command {
02666     public function getCommandId() { return 'HGET'; }
02667 }
02668 
02669 class HashGetMultiple extends Command {
02670     public function getCommandId() { return 'HMGET'; }
02671     public function filterArguments(Array $arguments) {
02672         if (count($arguments) === 2 && is_array($arguments[1])) {
02673             $flattenedKVs = array($arguments[0]);
02674             $args = &$arguments[1];
02675             foreach ($args as $v) {
02676                 $flattenedKVs[] = $v;
02677             }
02678             return $flattenedKVs;
02679         }
02680         return $arguments;
02681     }
02682 }
02683 
02684 class HashDelete extends Command {
02685     public function getCommandId() { return 'HDEL'; }
02686     public function parseResponse($data) { return (bool) $data; }
02687 }
02688 
02689 class HashExists extends Command {
02690     public function getCommandId() { return 'HEXISTS'; }
02691     public function parseResponse($data) { return (bool) $data; }
02692 }
02693 
02694 class HashLength extends Command {
02695     public function getCommandId() { return 'HLEN'; }
02696 }
02697 
02698 class HashKeys extends Command {
02699     public function getCommandId() { return 'HKEYS'; }
02700 }
02701 
02702 class HashValues extends Command {
02703     public function getCommandId() { return 'HVALS'; }
02704 }
02705 
02706 class HashGetAll extends Command {
02707     public function getCommandId() { return 'HGETALL'; }
02708     public function parseResponse($data) {
02709         if ($data instanceof \Iterator) {
02710             return new MultiBulkResponseKVIterator($data);
02711         }
02712         $result = array();
02713         for ($i = 0; $i < count($data); $i++) {
02714             $result[$data[$i]] = $data[++$i];
02715         }
02716         return $result;
02717     }
02718 }
02719 
02720 /* multiple databases handling commands */
02721 class SelectDatabase extends Command {
02722     public function canBeHashed()  { return false; }
02723     public function getCommandId() { return 'SELECT'; }
02724 }
02725 
02726 class MoveKey extends Command {
02727     public function canBeHashed()  { return false; }
02728     public function getCommandId() { return 'MOVE'; }
02729     public function parseResponse($data) { return (bool) $data; }
02730 }
02731 
02732 class FlushDatabase extends Command {
02733     public function canBeHashed()  { return false; }
02734     public function getCommandId() { return 'FLUSHDB'; }
02735 }
02736 
02737 class FlushAll extends Command {
02738     public function canBeHashed()  { return false; }
02739     public function getCommandId() { return 'FLUSHALL'; }
02740 }
02741 
02742 /* sorting */
02743 class Sort extends Command {
02744     public function getCommandId() { return 'SORT'; }
02745     public function filterArguments(Array $arguments) {
02746         if (count($arguments) === 1) {
02747             return $arguments;
02748         }
02749 
02750         $query = array($arguments[0]);
02751         $sortParams = array_change_key_case($arguments[1], CASE_UPPER);
02752 
02753         if (isset($sortParams['BY'])) {
02754             $query[] = 'BY';
02755             $query[] = $sortParams['BY'];
02756         }
02757         if (isset($sortParams['GET'])) {
02758             $getargs = $sortParams['GET'];
02759             if (is_array($getargs)) {
02760                 foreach ($getargs as $getarg) {
02761                     $query[] = 'GET';
02762                     $query[] = $getarg;
02763                 }
02764             }
02765             else {
02766                 $query[] = 'GET';
02767                 $query[] = $getargs;
02768             }
02769         }
02770         if (isset($sortParams['LIMIT']) && is_array($sortParams['LIMIT']) 
02771             && count($sortParams['LIMIT']) == 2) {
02772 
02773             $query[] = 'LIMIT';
02774             $query[] = $sortParams['LIMIT'][0];
02775             $query[] = $sortParams['LIMIT'][1];
02776         }
02777         if (isset($sortParams['SORT'])) {
02778             $query[] = strtoupper($sortParams['SORT']);
02779         }
02780         if (isset($sortParams['ALPHA']) && $sortParams['ALPHA'] == true) {
02781             $query[] = 'ALPHA';
02782         }
02783         if (isset($sortParams['STORE'])) {
02784             $query[] = 'STORE';
02785             $query[] = $sortParams['STORE'];
02786         }
02787 
02788         return $query;
02789     }
02790 }
02791 
02792 /* transactions */
02793 class Multi extends Command {
02794     public function canBeHashed()  { return false; }
02795     public function getCommandId() { return 'MULTI'; }
02796 }
02797 
02798 class Exec extends Command {
02799     public function canBeHashed()  { return false; }
02800     public function getCommandId() { return 'EXEC'; }
02801 }
02802 
02803 class Discard extends Command {
02804     public function canBeHashed()  { return false; }
02805     public function getCommandId() { return 'DISCARD'; }
02806 }
02807 
02808 class Watch extends Command {
02809     public function canBeHashed()  { return false; }
02810     public function getCommandId() { return 'WATCH'; }
02811     public function filterArguments(Array $arguments) {
02812         if (isset($arguments[0]) && is_array($arguments[0])) {
02813             return $arguments[0];
02814         }
02815         return $arguments;
02816     }
02817     public function parseResponse($data) { return (bool) $data; }
02818 }
02819 
02820 class Unwatch extends Command {
02821     public function canBeHashed()  { return false; }
02822     public function getCommandId() { return 'UNWATCH'; }
02823     public function parseResponse($data) { return (bool) $data; }
02824 }
02825 
02826 /* publish/subscribe */
02827 class Subscribe extends Command {
02828     public function canBeHashed()  { return false; }
02829     public function getCommandId() { return 'SUBSCRIBE'; }
02830 }
02831 
02832 class Unsubscribe extends Command {
02833     public function canBeHashed()  { return false; }
02834     public function getCommandId() { return 'UNSUBSCRIBE'; }
02835 }
02836 
02837 class SubscribeByPattern extends Command {
02838     public function canBeHashed()  { return false; }
02839     public function getCommandId() { return 'PSUBSCRIBE'; }
02840 }
02841 
02842 class UnsubscribeByPattern extends Command {
02843     public function canBeHashed()  { return false; }
02844     public function getCommandId() { return 'PUNSUBSCRIBE'; }
02845 }
02846 
02847 class Publish extends Command {
02848     public function canBeHashed()  { return false; }
02849     public function getCommandId() { return 'PUBLISH'; }
02850 }
02851 
02852 /* persistence control commands */
02853 class Save extends Command {
02854     public function canBeHashed()  { return false; }
02855     public function getCommandId() { return 'SAVE'; }
02856 }
02857 
02858 class BackgroundSave extends Command {
02859     public function canBeHashed()  { return false; }
02860     public function getCommandId() { return 'BGSAVE'; }
02861     public function parseResponse($data) {
02862         if ($data == 'Background saving started') {
02863             return true;
02864         }
02865         return $data;
02866     }
02867 }
02868 
02869 class BackgroundRewriteAppendOnlyFile extends Command {
02870     public function canBeHashed()  { return false; }
02871     public function getCommandId() { return 'BGREWRITEAOF'; }
02872     public function parseResponse($data) {
02873         return $data == 'Background append only file rewriting started';
02874     }
02875 }
02876 
02877 class LastSave extends Command {
02878     public function canBeHashed()  { return false; }
02879     public function getCommandId() { return 'LASTSAVE'; }
02880 }
02881 
02882 class Shutdown extends Command {
02883     public function canBeHashed()  { return false; }
02884     public function getCommandId() { return 'SHUTDOWN'; }
02885     public function closesConnection() { return true; }
02886 }
02887 
02888 /* remote server control commands */
02889 class Info extends Command {
02890     public function canBeHashed()  { return false; }
02891     public function getCommandId() { return 'INFO'; }
02892     public function parseResponse($data) {
02893         $info      = array();
02894         $infoLines = explode("\r\n", $data, -1);
02895         foreach ($infoLines as $row) {
02896             list($k, $v) = explode(':', $row);
02897             if (!preg_match('/^db\d+$/', $k)) {
02898                 $info[$k] = $v;
02899             }
02900             else {
02901                 $db = array();
02902                 foreach (explode(',', $v) as $dbvar) {
02903                     list($dbvk, $dbvv) = explode('=', $dbvar);
02904                     $db[trim($dbvk)] = $dbvv;
02905                 }
02906                 $info[$k] = $db;
02907             }
02908         }
02909         return $info;
02910     }
02911 }
02912 
02913 class SlaveOf extends Command {
02914     public function canBeHashed()  { return false; }
02915     public function getCommandId() { return 'SLAVEOF'; }
02916     public function filterArguments(Array $arguments) {
02917         if (count($arguments) === 0 || $arguments[0] === 'NO ONE') {
02918             return array('NO', 'ONE');
02919         }
02920         return $arguments;
02921     }
02922 }
02923 
02924 class Config extends Command {
02925     public function canBeHashed()  { return false; }
02926     public function getCommandId() { return 'CONFIG'; }
02927 }
02928 ?>