Campustream 1.0
A social network MQP for WPI
|
00001 <?php 00002 namespace Predis; 00003 use Predis\Shared\Utils, Predis\Distribution\IDistributionStrategy; 00004 00005 class PredisException extends \Exception { } 00006 class ClientException extends PredisException { } // Client-side errors 00007 class AbortedMultiExec extends PredisException { } // Aborted multi/exec 00008 00009 class ServerException extends PredisException { // Server-side errors 00010 public function toResponseError() { 00011 return new ResponseError($this->getMessage()); 00012 } 00013 } 00014 00015 class CommunicationException extends PredisException { // Communication errors 00016 private $_connection; 00017 00018 public function __construct(IConnectionSingle $connection, 00019 $message = null, $code = null) { 00020 00021 $this->_connection = $connection; 00022 parent::__construct($message, $code); 00023 } 00024 00025 public function getConnection() { return $this->_connection; } 00026 public function shouldResetConnection() { return true; } 00027 } 00028 00029 class MalformedServerResponse extends CommunicationException { } // Unexpected responses 00030 00031 /* ------------------------------------------------------------------------- */ 00032 00033 class Client { 00034 private $_options, $_connection, $_serverProfile, $_responseReader; 00035 00036 public function __construct($parameters = null, $clientOptions = null) { 00037 $this->setupClient($clientOptions ?: new ClientOptions()); 00038 $this->setupConnection($parameters); 00039 } 00040 00041 private static function filterClientOptions($options) { 00042 if ($options instanceof ClientOptions) { 00043 return $options; 00044 } 00045 if (is_array($options)) { 00046 return new ClientOptions($options); 00047 } 00048 if ($options instanceof RedisServerProfile) { 00049 return new ClientOptions(array( 00050 'profile' => $options 00051 )); 00052 } 00053 if (is_string($options)) { 00054 return new ClientOptions(array( 00055 'profile' => RedisServerProfile::get($options) 00056 )); 00057 } 00058 
throw new \InvalidArgumentException("Invalid type for client options"); 00059 } 00060 00061 private function setupClient($options) { 00062 $this->_responseReader = new ResponseReader(); 00063 $this->_options = self::filterClientOptions($options); 00064 $this->setProfile($this->_options->profile); 00065 if ($this->_options->iterable_multibulk === true) { 00066 $this->_responseReader->setHandler( 00067 Protocol::PREFIX_MULTI_BULK, 00068 new ResponseMultiBulkStreamHandler() 00069 ); 00070 } 00071 if ($this->_options->throw_on_error === false) { 00072 $this->_responseReader->setHandler( 00073 Protocol::PREFIX_ERROR, 00074 new ResponseErrorSilentHandler() 00075 ); 00076 } 00077 } 00078 00079 private function setupConnection($parameters) { 00080 if ($parameters === null) { 00081 return $this->setConnection($this->createConnection(null)); 00082 } 00083 if (!(is_array($parameters) || is_string($parameters) || $parameters instanceof IConnection 00084 || $parameters instanceof ConnectionParameters)) { 00085 throw new \InvalidArgumentException( 00086 'Array, String, Predis\ConnectionParameters or Predis\IConnection expected' 00087 ); 00088 } 00089 if (is_array($parameters) && isset($parameters[0])) { 00090 $cluster = new ConnectionCluster($this->_options->key_distribution); 00091 foreach ($parameters as $shardParams) { 00092 $cluster->add($this->createConnection($shardParams)); 00093 } 00094 $this->setConnection($cluster); 00095 } 00096 else { 00097 $this->setConnection($this->createConnection($parameters)); 00098 } 00099 } 00100 00101 private function createConnection($parameters) { 00102 $params = null; 00103 $connection = null; 00104 if ($parameters instanceof IConnectionSingle) { 00105 $connection = $parameters; 00106 $params = $connection->getParameters(); 00107 } 00108 else { 00109 $params = $parameters instanceof ConnectionParameters 00110 ? 
$parameters 00111 : new ConnectionParameters($parameters); 00112 $connection = ConnectionFactory::create($params, $this->_responseReader); 00113 } 00114 return $this->pushInitCommands($connection, $params); 00115 } 00116 00117 private function pushInitCommands(IConnectionSingle $connection, ConnectionParameters $params) { 00118 if (isset($params->password)) { 00119 $connection->pushInitCommand($this->createCommand( 00120 'auth', array($params->password) 00121 )); 00122 } 00123 if (isset($params->database)) { 00124 $connection->pushInitCommand($this->createCommand( 00125 'select', array($params->database) 00126 )); 00127 } 00128 return $connection; 00129 } 00130 00131 private function setConnection(IConnection $connection) { 00132 $this->_connection = $connection; 00133 } 00134 00135 public function setProfile($serverProfile) { 00136 if (!($serverProfile instanceof RedisServerProfile || is_string($serverProfile))) { 00137 throw new \InvalidArgumentException( 00138 "Invalid type for server profile, \Predis\RedisServerProfile or string expected" 00139 ); 00140 } 00141 $this->_serverProfile = (is_string($serverProfile) 00142 ? 
RedisServerProfile::get($serverProfile) 00143 : $serverProfile 00144 ); 00145 } 00146 00147 public function getProfile() { 00148 return $this->_serverProfile; 00149 } 00150 00151 public function getResponseReader() { 00152 return $this->_responseReader; 00153 } 00154 00155 public function getClientFor($connectionAlias) { 00156 if (!Utils::isCluster($this->_connection)) { 00157 throw new ClientException( 00158 'This method is supported only when the client is connected to a cluster of connections' 00159 ); 00160 } 00161 00162 $connection = $this->_connection->getConnectionById($connectionAlias); 00163 if ($connection === null) { 00164 throw new \InvalidArgumentException( 00165 "Invalid connection alias: '$connectionAlias'" 00166 ); 00167 } 00168 00169 $newClient = new Client(); 00170 $newClient->setupClient($this->_options); 00171 $newClient->setConnection($connection); 00172 return $newClient; 00173 } 00174 00175 public function connect() { 00176 $this->_connection->connect(); 00177 } 00178 00179 public function disconnect() { 00180 $this->_connection->disconnect(); 00181 } 00182 00183 public function isConnected() { 00184 return $this->_connection->isConnected(); 00185 } 00186 00187 public function getConnection($id = null) { 00188 if (!isset($id)) { 00189 return $this->_connection; 00190 } 00191 else { 00192 return Utils::isCluster($this->_connection) 00193 ? 
$this->_connection->getConnectionById($id) 00194 : $this->_connection; 00195 } 00196 } 00197 00198 public function __call($method, $arguments) { 00199 $command = $this->_serverProfile->createCommand($method, $arguments); 00200 return $this->_connection->executeCommand($command); 00201 } 00202 00203 public function createCommand($method, $arguments = array()) { 00204 return $this->_serverProfile->createCommand($method, $arguments); 00205 } 00206 00207 public function executeCommand(ICommand $command) { 00208 return $this->_connection->executeCommand($command); 00209 } 00210 00211 public function executeCommandOnShards(ICommand $command) { 00212 $replies = array(); 00213 if (Utils::isCluster($this->_connection)) { 00214 foreach($this->_connection as $connection) { 00215 $replies[] = $connection->executeCommand($command); 00216 } 00217 } 00218 else { 00219 $replies[] = $this->_connection->executeCommand($command); 00220 } 00221 return $replies; 00222 } 00223 00224 private function sharedInitializer($argv, $initializer) { 00225 $argc = count($argv); 00226 if ($argc === 0) { 00227 return $this->$initializer(); 00228 } 00229 else if ($argc === 1) { 00230 list($arg0) = $argv; 00231 return is_array($arg0) ? $this->$initializer($arg0) : $this->$initializer(null, $arg0); 00232 } 00233 else if ($argc === 2) { 00234 list($arg0, $arg1) = $argv; 00235 return $this->$initializer($arg0, $arg1); 00236 } 00237 return $this->$initializer($this, $arguments); 00238 } 00239 00240 public function pipeline(/* arguments */) { 00241 return $this->sharedInitializer(func_get_args(), 'initPipeline'); 00242 } 00243 00244 private function initPipeline(Array $options = null, $pipelineBlock = null) { 00245 $pipeline = null; 00246 if (isset($options)) { 00247 if (isset($options['safe']) && $options['safe'] == true) { 00248 $connection = $this->getConnection(); 00249 $pipeline = new CommandPipeline($this, $connection instanceof Connection 00250 ? 
new Pipeline\SafeExecutor($connection) 00251 : new Pipeline\SafeClusterExecutor($connection) 00252 ); 00253 } 00254 else { 00255 $pipeline = new CommandPipeline($this); 00256 } 00257 } 00258 else { 00259 $pipeline = new CommandPipeline($this); 00260 } 00261 return $this->pipelineExecute($pipeline, $pipelineBlock); 00262 } 00263 00264 private function pipelineExecute(CommandPipeline $pipeline, $block) { 00265 return $block !== null ? $pipeline->execute($block) : $pipeline; 00266 } 00267 00268 public function multiExec(/* arguments */) { 00269 return $this->sharedInitializer(func_get_args(), 'initMultiExec'); 00270 } 00271 00272 private function initMultiExec(Array $options = null, $transBlock = null) { 00273 $multi = isset($options) ? new MultiExecBlock($this, $options) : new MultiExecBlock($this); 00274 return $transBlock !== null ? $multi->execute($transBlock) : $multi; 00275 } 00276 00277 public function pubSubContext() { 00278 return new PubSubContext($this); 00279 } 00280 } 00281 00282 /* ------------------------------------------------------------------------- */ 00283 00284 interface IClientOptionsHandler { 00285 public function validate($option, $value); 00286 public function getDefault(); 00287 } 00288 00289 class ClientOptionsProfile implements IClientOptionsHandler { 00290 public function validate($option, $value) { 00291 if ($value instanceof RedisServerProfile) { 00292 return $value; 00293 } 00294 if (is_string($value)) { 00295 return RedisServerProfile::get($value); 00296 } 00297 throw new \InvalidArgumentException("Invalid value for option $option"); 00298 } 00299 00300 public function getDefault() { 00301 return RedisServerProfile::getDefault(); 00302 } 00303 } 00304 00305 class ClientOptionsKeyDistribution implements IClientOptionsHandler { 00306 public function validate($option, $value) { 00307 if ($value instanceof IDistributionStrategy) { 00308 return $value; 00309 } 00310 if (is_string($value)) { 00311 $valueReflection = new 
\ReflectionClass($value); 00312 if ($valueReflection->isSubclassOf('\Predis\Distribution\IDistributionStrategy')) { 00313 return new $value; 00314 } 00315 } 00316 throw new \InvalidArgumentException("Invalid value for option $option"); 00317 } 00318 00319 public function getDefault() { 00320 return new Distribution\HashRing(); 00321 } 00322 } 00323 00324 class ClientOptionsIterableMultiBulk implements IClientOptionsHandler { 00325 public function validate($option, $value) { 00326 return (bool) $value; 00327 } 00328 00329 public function getDefault() { 00330 return false; 00331 } 00332 } 00333 00334 class ClientOptionsThrowOnError implements IClientOptionsHandler { 00335 public function validate($option, $value) { 00336 return (bool) $value; 00337 } 00338 00339 public function getDefault() { 00340 return true; 00341 } 00342 } 00343 00344 class ClientOptions { 00345 private static $_optionsHandlers; 00346 private $_options; 00347 00348 public function __construct($options = null) { 00349 self::initializeOptionsHandlers(); 00350 $this->initializeOptions($options ?: array()); 00351 } 00352 00353 private static function initializeOptionsHandlers() { 00354 if (!isset(self::$_optionsHandlers)) { 00355 self::$_optionsHandlers = self::getOptionsHandlers(); 00356 } 00357 } 00358 00359 private static function getOptionsHandlers() { 00360 return array( 00361 'profile' => new ClientOptionsProfile(), 00362 'key_distribution' => new ClientOptionsKeyDistribution(), 00363 'iterable_multibulk' => new ClientOptionsIterableMultiBulk(), 00364 'throw_on_error' => new ClientOptionsThrowOnError(), 00365 ); 00366 } 00367 00368 private function initializeOptions($options) { 00369 foreach ($options as $option => $value) { 00370 if (isset(self::$_optionsHandlers[$option])) { 00371 $handler = self::$_optionsHandlers[$option]; 00372 $this->_options[$option] = $handler->validate($option, $value); 00373 } 00374 } 00375 } 00376 00377 public function __get($option) { 00378 if 
(!isset($this->_options[$option])) { 00379 $defaultValue = self::$_optionsHandlers[$option]->getDefault(); 00380 $this->_options[$option] = $defaultValue; 00381 } 00382 return $this->_options[$option]; 00383 } 00384 00385 public function __isset($option) { 00386 return isset(self::$_optionsHandlers[$option]); 00387 } 00388 } 00389 00390 /* ------------------------------------------------------------------------- */ 00391 00392 class Protocol { 00393 const NEWLINE = "\r\n"; 00394 const OK = 'OK'; 00395 const ERROR = 'ERR'; 00396 const QUEUED = 'QUEUED'; 00397 const NULL = 'nil'; 00398 00399 const PREFIX_STATUS = '+'; 00400 const PREFIX_ERROR = '-'; 00401 const PREFIX_INTEGER = ':'; 00402 const PREFIX_BULK = '$'; 00403 const PREFIX_MULTI_BULK = '*'; 00404 } 00405 00406 interface ICommand { 00407 public function getCommandId(); 00408 public function canBeHashed(); 00409 public function closesConnection(); 00410 public function getHash(IDistributionStrategy $distributor); 00411 public function setArgumentsArray(Array $arguments); 00412 public function getArguments(); 00413 public function parseResponse($data); 00414 public function serialize(); 00415 } 00416 00417 abstract class Command implements ICommand { 00418 private $_arguments, $_hash; 00419 00420 protected function serializeRequest($command, $arguments) { 00421 $newline = Protocol::NEWLINE; 00422 $cmdlen = strlen($command); 00423 $reqlen = count($arguments) + 1; 00424 00425 $buffer = "*{$reqlen}{$newline}\${$cmdlen}{$newline}{$command}{$newline}"; 00426 foreach ($arguments as $argument) { 00427 $arglen = strlen($argument); 00428 $buffer .= "\${$arglen}{$newline}{$argument}{$newline}"; 00429 } 00430 00431 return $buffer; 00432 } 00433 00434 public function canBeHashed() { 00435 return true; 00436 } 00437 00438 public function getHash(IDistributionStrategy $distributor) { 00439 if (isset($this->_hash)) { 00440 return $this->_hash; 00441 } 00442 else { 00443 if (isset($this->_arguments[0])) { 00444 // TODO: should 
we throw an exception if the command does 00445 // not support sharding? 00446 $key = $this->_arguments[0]; 00447 00448 $start = strpos($key, '{'); 00449 $end = strpos($key, '}'); 00450 if ($start !== false && $end !== false) { 00451 $key = substr($key, ++$start, $end - $start); 00452 } 00453 00454 $this->_hash = $distributor->generateKey($key); 00455 return $this->_hash; 00456 } 00457 } 00458 return null; 00459 } 00460 00461 public function closesConnection() { 00462 return false; 00463 } 00464 00465 protected function filterArguments(Array $arguments) { 00466 return $arguments; 00467 } 00468 00469 public function setArguments(/* arguments */) { 00470 $this->_arguments = $this->filterArguments(func_get_args()); 00471 unset($this->_hash); 00472 } 00473 00474 public function setArgumentsArray(Array $arguments) { 00475 $this->_arguments = $this->filterArguments($arguments); 00476 unset($this->_hash); 00477 } 00478 00479 public function getArguments() { 00480 return isset($this->_arguments) ? $this->_arguments : array(); 00481 } 00482 00483 public function getArgument($index = 0) { 00484 return isset($this->_arguments[$index]) ? 
$this->_arguments[$index] : null; 00485 } 00486 00487 public function parseResponse($data) { 00488 return $data; 00489 } 00490 00491 public final function serialize() { 00492 return $this->serializeRequest($this->getCommandId(), $this->getArguments()); 00493 } 00494 } 00495 00496 /* ------------------------------------------------------------------------- */ 00497 00498 interface IResponseHandler { 00499 function handle(IConnectionSingle $connection, $payload); 00500 } 00501 00502 class ResponseStatusHandler implements IResponseHandler { 00503 public function handle(IConnectionSingle $connection, $status) { 00504 if ($status === Protocol::OK) { 00505 return true; 00506 } 00507 else if ($status === Protocol::QUEUED) { 00508 return new ResponseQueued(); 00509 } 00510 return $status; 00511 } 00512 } 00513 00514 class ResponseErrorHandler implements IResponseHandler { 00515 public function handle(IConnectionSingle $connection, $errorMessage) { 00516 throw new ServerException(substr($errorMessage, 4)); 00517 } 00518 } 00519 00520 class ResponseErrorSilentHandler implements IResponseHandler { 00521 public function handle(IConnectionSingle $connection, $errorMessage) { 00522 return new ResponseError(substr($errorMessage, 4)); 00523 } 00524 } 00525 00526 class ResponseBulkHandler implements IResponseHandler { 00527 public function handle(IConnectionSingle $connection, $dataLength) { 00528 if (!is_numeric($dataLength)) { 00529 Utils::onCommunicationException(new MalformedServerResponse( 00530 $connection, "Cannot parse '$dataLength' as data length" 00531 )); 00532 } 00533 00534 if ($dataLength > 0) { 00535 $value = $connection->readBytes($dataLength); 00536 self::discardNewLine($connection); 00537 return $value; 00538 } 00539 else if ($dataLength == 0) { 00540 self::discardNewLine($connection); 00541 return ''; 00542 } 00543 00544 return null; 00545 } 00546 00547 private static function discardNewLine(IConnectionSingle $connection) { 00548 if ($connection->readBytes(2) !== 
Protocol::NEWLINE) { 00549 Utils::onCommunicationException(new MalformedServerResponse( 00550 $connection, 'Did not receive a new-line at the end of a bulk response' 00551 )); 00552 } 00553 } 00554 } 00555 00556 class ResponseMultiBulkHandler implements IResponseHandler { 00557 public function handle(IConnectionSingle $connection, $rawLength) { 00558 if (!is_numeric($rawLength)) { 00559 Utils::onCommunicationException(new MalformedServerResponse( 00560 $connection, "Cannot parse '$rawLength' as data length" 00561 )); 00562 } 00563 00564 $listLength = (int) $rawLength; 00565 if ($listLength === -1) { 00566 return null; 00567 } 00568 00569 $list = array(); 00570 00571 if ($listLength > 0) { 00572 $reader = $connection->getResponseReader(); 00573 for ($i = 0; $i < $listLength; $i++) { 00574 $list[] = $reader->read($connection); 00575 } 00576 } 00577 00578 return $list; 00579 } 00580 } 00581 00582 class ResponseMultiBulkStreamHandler implements IResponseHandler { 00583 public function handle(IConnectionSingle $connection, $rawLength) { 00584 if (!is_numeric($rawLength)) { 00585 Utils::onCommunicationException(new MalformedServerResponse( 00586 $connection, "Cannot parse '$rawLength' as data length" 00587 )); 00588 } 00589 return new Shared\MultiBulkResponseIterator($connection, (int)$rawLength); 00590 } 00591 } 00592 00593 class ResponseIntegerHandler implements IResponseHandler { 00594 public function handle(IConnectionSingle $connection, $number) { 00595 if (is_numeric($number)) { 00596 return (int) $number; 00597 } 00598 else { 00599 if ($number !== Protocol::NULL) { 00600 Utils::onCommunicationException(new MalformedServerResponse( 00601 $connection, "Cannot parse '$number' as numeric response" 00602 )); 00603 } 00604 return null; 00605 } 00606 } 00607 } 00608 00609 class ResponseReader { 00610 private $_prefixHandlers; 00611 00612 public function __construct() { 00613 $this->initializePrefixHandlers(); 00614 } 00615 00616 private function 
initializePrefixHandlers() { 00617 $this->_prefixHandlers = array( 00618 Protocol::PREFIX_STATUS => new ResponseStatusHandler(), 00619 Protocol::PREFIX_ERROR => new ResponseErrorHandler(), 00620 Protocol::PREFIX_INTEGER => new ResponseIntegerHandler(), 00621 Protocol::PREFIX_BULK => new ResponseBulkHandler(), 00622 Protocol::PREFIX_MULTI_BULK => new ResponseMultiBulkHandler(), 00623 ); 00624 } 00625 00626 public function setHandler($prefix, IResponseHandler $handler) { 00627 $this->_prefixHandlers[$prefix] = $handler; 00628 } 00629 00630 public function getHandler($prefix) { 00631 if (isset($this->_prefixHandlers[$prefix])) { 00632 return $this->_prefixHandlers[$prefix]; 00633 } 00634 } 00635 00636 public function read(IConnectionSingle $connection) { 00637 $header = $connection->readLine(); 00638 if ($header === '') { 00639 Utils::onCommunicationException(new MalformedServerResponse( 00640 $connection, 'Unexpected empty header' 00641 )); 00642 } 00643 00644 $prefix = $header[0]; 00645 $payload = strlen($header) > 1 ? 
substr($header, 1) : ''; 00646 00647 if (!isset($this->_prefixHandlers[$prefix])) { 00648 Utils::onCommunicationException(new MalformedServerResponse( 00649 $connection, "Unknown prefix '$prefix'" 00650 )); 00651 } 00652 00653 $handler = $this->_prefixHandlers[$prefix]; 00654 return $handler->handle($connection, $payload); 00655 } 00656 } 00657 00658 class ResponseError { 00659 private $_message; 00660 00661 public function __construct($message) { 00662 $this->_message = $message; 00663 } 00664 00665 public function __get($property) { 00666 if ($property == 'error') { 00667 return true; 00668 } 00669 if ($property == 'message') { 00670 return $this->_message; 00671 } 00672 } 00673 00674 public function __isset($property) { 00675 return $property === 'error'; 00676 } 00677 00678 public function __toString() { 00679 return $this->_message; 00680 } 00681 } 00682 00683 class ResponseQueued { 00684 public $queued = true; 00685 00686 public function __toString() { 00687 return Protocol::QUEUED; 00688 } 00689 } 00690 00691 /* ------------------------------------------------------------------------- */ 00692 00693 use Predis\Pipeline\IPipelineExecutor; 00694 00695 class CommandPipeline { 00696 private $_redisClient, $_pipelineBuffer, $_returnValues, $_running, $_executor; 00697 00698 public function __construct(Client $redisClient, IPipelineExecutor $executor = null) { 00699 $this->_redisClient = $redisClient; 00700 $this->_executor = $executor ?: new Pipeline\StandardExecutor(); 00701 $this->_pipelineBuffer = array(); 00702 $this->_returnValues = array(); 00703 } 00704 00705 public function __call($method, $arguments) { 00706 $command = $this->_redisClient->createCommand($method, $arguments); 00707 $this->recordCommand($command); 00708 return $this; 00709 } 00710 00711 private function recordCommand(ICommand $command) { 00712 $this->_pipelineBuffer[] = $command; 00713 } 00714 00715 private function getRecordedCommands() { 00716 return $this->_pipelineBuffer; 00717 } 00718 
00719 public function flushPipeline() { 00720 if (count($this->_pipelineBuffer) > 0) { 00721 $connection = $this->_redisClient->getConnection(); 00722 $this->_returnValues = array_merge( 00723 $this->_returnValues, 00724 $this->_executor->execute($connection, $this->_pipelineBuffer) 00725 ); 00726 $this->_pipelineBuffer = array(); 00727 } 00728 return $this; 00729 } 00730 00731 private function setRunning($bool) { 00732 if ($bool == true && $this->_running == true) { 00733 throw new ClientException("This pipeline is already opened"); 00734 } 00735 $this->_running = $bool; 00736 } 00737 00738 public function execute($block = null) { 00739 if ($block && !is_callable($block)) { 00740 throw new \InvalidArgumentException('Argument passed must be a callable object'); 00741 } 00742 00743 // TODO: do not reuse previously executed pipelines 00744 $this->setRunning(true); 00745 $pipelineBlockException = null; 00746 00747 try { 00748 if ($block !== null) { 00749 $block($this); 00750 } 00751 $this->flushPipeline(); 00752 } 00753 catch (\Exception $exception) { 00754 $pipelineBlockException = $exception; 00755 } 00756 00757 $this->setRunning(false); 00758 00759 if ($pipelineBlockException !== null) { 00760 throw $pipelineBlockException; 00761 } 00762 00763 return $this->_returnValues; 00764 } 00765 } 00766 00767 class MultiExecBlock { 00768 private $_initialized, $_discarded, $_insideBlock; 00769 private $_redisClient, $_options, $_commands; 00770 private $_supportsWatch; 00771 00772 public function __construct(Client $redisClient, Array $options = null) { 00773 $this->checkCapabilities($redisClient); 00774 $this->_initialized = false; 00775 $this->_discarded = false; 00776 $this->_insideBlock = false; 00777 $this->_redisClient = $redisClient; 00778 $this->_options = $options ?: array(); 00779 $this->_commands = array(); 00780 } 00781 00782 private function checkCapabilities(Client $redisClient) { 00783 if (Utils::isCluster($redisClient->getConnection())) { 00784 throw new 
ClientException( 00785 'Cannot initialize a MULTI/EXEC context over a cluster of connections' 00786 ); 00787 } 00788 $profile = $redisClient->getProfile(); 00789 if ($profile->supportsCommands(array('multi', 'exec', 'discard')) === false) { 00790 throw new ClientException( 00791 'The current profile does not support MULTI, EXEC and DISCARD commands' 00792 ); 00793 } 00794 $this->_supportsWatch = $profile->supportsCommands(array('watch', 'unwatch')); 00795 } 00796 00797 private function isWatchSupported() { 00798 if ($this->_supportsWatch === false) { 00799 throw new ClientException( 00800 'The current profile does not support WATCH and UNWATCH commands' 00801 ); 00802 } 00803 } 00804 00805 private function initialize() { 00806 if ($this->_initialized === false) { 00807 if (isset($this->_options['watch'])) { 00808 $this->watch($this->_options['watch']); 00809 } 00810 $this->_redisClient->multi(); 00811 $this->_initialized = true; 00812 $this->_discarded = false; 00813 } 00814 } 00815 00816 private function setInsideBlock($value) { 00817 $this->_insideBlock = $value; 00818 } 00819 00820 public function __call($method, $arguments) { 00821 $this->initialize(); 00822 $command = $this->_redisClient->createCommand($method, $arguments); 00823 $response = $this->_redisClient->executeCommand($command); 00824 if (isset($response->queued)) { 00825 $this->_commands[] = $command; 00826 return $this; 00827 } 00828 else { 00829 $this->malformedServerResponse('The server did not respond with a QUEUED status reply'); 00830 } 00831 } 00832 00833 public function watch($keys) { 00834 $this->isWatchSupported(); 00835 if ($this->_initialized === true) { 00836 throw new ClientException('WATCH inside MULTI is not allowed'); 00837 } 00838 00839 $reply = null; 00840 if (is_array($keys)) { 00841 $reply = array(); 00842 foreach ($keys as $key) { 00843 $reply = $this->_redisClient->watch($keys); 00844 } 00845 } 00846 else { 00847 $reply = $this->_redisClient->watch($keys); 00848 } 00849 return 
$reply; 00850 } 00851 00852 public function multi() { 00853 $this->initialize(); 00854 } 00855 00856 public function unwatch() { 00857 $this->isWatchSupported(); 00858 $this->_redisClient->unwatch(); 00859 } 00860 00861 public function discard() { 00862 $this->_redisClient->discard(); 00863 $this->_commands = array(); 00864 $this->_initialized = false; 00865 $this->_discarded = true; 00866 } 00867 00868 public function exec() { 00869 return $this->execute(); 00870 } 00871 00872 public function execute($block = null) { 00873 if ($this->_insideBlock === true) { 00874 throw new ClientException( 00875 "Cannot invoke 'execute' or 'exec' inside an active client transaction block" 00876 ); 00877 } 00878 00879 if ($block && !is_callable($block)) { 00880 throw new \InvalidArgumentException('Argument passed must be a callable object'); 00881 } 00882 00883 $blockException = null; 00884 $returnValues = array(); 00885 00886 if ($block !== null) { 00887 $this->setInsideBlock(true); 00888 try { 00889 $block($this); 00890 } 00891 catch (CommunicationException $exception) { 00892 $blockException = $exception; 00893 } 00894 catch (ServerException $exception) { 00895 $blockException = $exception; 00896 } 00897 catch (\Exception $exception) { 00898 $blockException = $exception; 00899 if ($this->_initialized === true) { 00900 $this->discard(); 00901 } 00902 } 00903 $this->setInsideBlock(false); 00904 if ($blockException !== null) { 00905 throw $blockException; 00906 } 00907 } 00908 00909 if ($this->_initialized === false) { 00910 return; 00911 } 00912 00913 $reply = $this->_redisClient->exec(); 00914 if ($reply === null) { 00915 throw new AbortedMultiExec('The current transaction has been aborted by the server'); 00916 } 00917 00918 $execReply = $reply instanceof \Iterator ? 
iterator_to_array($reply) : $reply; 00919 $commands = &$this->_commands; 00920 $sizeofReplies = count($execReply); 00921 00922 if ($sizeofReplies !== count($commands)) { 00923 $this->malformedServerResponse('Unexpected number of responses for a MultiExecBlock'); 00924 } 00925 00926 for ($i = 0; $i < $sizeofReplies; $i++) { 00927 $returnValues[] = $commands[$i]->parseResponse($execReply[$i] instanceof \Iterator 00928 ? iterator_to_array($execReply[$i]) 00929 : $execReply[$i] 00930 ); 00931 unset($commands[$i]); 00932 } 00933 00934 return $returnValues; 00935 } 00936 00937 private function malformedServerResponse($message) { 00938 // NOTE: a MULTI/EXEC block cannot be initialized on a clustered 00939 // connection, which means that Predis\Client::getConnection 00940 // will always return an instance of Predis\Connection. 00941 Utils::onCommunicationException(new MalformedServerResponse( 00942 $this->_redisClient->getConnection(), $message 00943 )); 00944 } 00945 } 00946 00947 class PubSubContext implements \Iterator { 00948 const SUBSCRIBE = 'subscribe'; 00949 const UNSUBSCRIBE = 'unsubscribe'; 00950 const PSUBSCRIBE = 'psubscribe'; 00951 const PUNSUBSCRIBE = 'punsubscribe'; 00952 const MESSAGE = 'message'; 00953 const PMESSAGE = 'pmessage'; 00954 00955 const STATUS_VALID = 0x0001; 00956 const STATUS_SUBSCRIBED = 0x0010; 00957 const STATUS_PSUBSCRIBED = 0x0100; 00958 00959 private $_redisClient, $_subscriptions, $_isStillValid, $_position; 00960 00961 public function __construct(Client $redisClient) { 00962 $this->checkCapabilities($redisClient); 00963 $this->_redisClient = $redisClient; 00964 $this->_statusFlags = self::STATUS_VALID; 00965 } 00966 00967 public function __destruct() { 00968 $this->closeContext(); 00969 } 00970 00971 private function checkCapabilities(Client $redisClient) { 00972 if (Utils::isCluster($redisClient->getConnection())) { 00973 throw new ClientException( 00974 'Cannot initialize a PUB/SUB context over a cluster of connections' 00975 ); 
00976 } 00977 $profile = $redisClient->getProfile(); 00978 $commands = array('publish', 'subscribe', 'unsubscribe', 'psubscribe', 'punsubscribe'); 00979 if ($profile->supportsCommands($commands) === false) { 00980 throw new ClientException( 00981 'The current profile does not support PUB/SUB related commands' 00982 ); 00983 } 00984 } 00985 00986 private function isFlagSet($value) { 00987 return ($this->_statusFlags & $value) === $value; 00988 } 00989 00990 public function subscribe(/* arguments */) { 00991 $this->writeCommand(self::SUBSCRIBE, func_get_args()); 00992 $this->_statusFlags |= self::STATUS_SUBSCRIBED; 00993 } 00994 00995 public function unsubscribe(/* arguments */) { 00996 $this->writeCommand(self::UNSUBSCRIBE, func_get_args()); 00997 } 00998 00999 public function psubscribe(/* arguments */) { 01000 $this->writeCommand(self::PSUBSCRIBE, func_get_args()); 01001 $this->_statusFlags |= self::STATUS_PSUBSCRIBED; 01002 } 01003 01004 public function punsubscribe(/* arguments */) { 01005 $this->writeCommand(self::PUNSUBSCRIBE, func_get_args()); 01006 } 01007 01008 public function closeContext() { 01009 if ($this->valid()) { 01010 if ($this->isFlagSet(self::STATUS_SUBSCRIBED)) { 01011 $this->unsubscribe(); 01012 } 01013 if ($this->isFlagSet(self::STATUS_PSUBSCRIBED)) { 01014 $this->punsubscribe(); 01015 } 01016 } 01017 } 01018 01019 private function writeCommand($method, $arguments) { 01020 if (count($arguments) === 1 && is_array($arguments[0])) { 01021 $arguments = $arguments[0]; 01022 } 01023 $command = $this->_redisClient->createCommand($method, $arguments); 01024 $this->_redisClient->getConnection()->writeCommand($command); 01025 } 01026 01027 public function rewind() { 01028 // NOOP 01029 } 01030 01031 public function current() { 01032 return $this->getValue(); 01033 } 01034 01035 public function key() { 01036 return $this->_position; 01037 } 01038 01039 public function next() { 01040 if ($this->isFlagSet(self::STATUS_VALID)) { 01041 $this->_position++; 
}
        return $this->_position;
    }

    /**
     * The context is valid while STATUS_VALID is set and at least one of the
     * subscription flags is set.
     */
    public function valid() {
        if (!$this->isFlagSet(self::STATUS_VALID)) {
            return false;
        }
        $anySubscription = self::STATUS_SUBSCRIBED | self::STATUS_PSUBSCRIBED;
        return ($this->_statusFlags & $anySubscription) > 0;
    }

    /**
     * Clears every status flag.
     */
    private function invalidate() {
        $this->_statusFlags = 0x0000;
    }

    /**
     * Reads one reply from the connection and converts it to a message object.
     *
     * @return object Object with kind/channel/payload (plus pattern for pmessage).
     * @throws ClientException When the first element of the reply is not a known kind.
     */
    private function getValue() {
        $response = $this->_redisClient->getResponseReader()->read(
            $this->_redisClient->getConnection()
        );

        switch ($response[0]) {
            case self::SUBSCRIBE:
            case self::UNSUBSCRIBE:
            case self::PSUBSCRIBE:
            case self::PUNSUBSCRIBE:
                // The third element being 0 invalidates the context.
                if ($response[2] === 0) {
                    $this->invalidate();
                }
                // Intentional fall-through: these replies share the shape of
                // a plain message.
            case self::MESSAGE:
                $message = array(
                    'kind'    => $response[0],
                    'channel' => $response[1],
                    'payload' => $response[2],
                );
                return (object) $message;

            case self::PMESSAGE:
                $message = array(
                    'kind'    => $response[0],
                    'pattern' => $response[1],
                    'channel' => $response[2],
                    'payload' => $response[3],
                );
                return (object) $message;

            default:
                throw new ClientException(
                    "Received an unknown message type {$response[0]} inside of a pubsub context"
                );
        }
    }
}

/* ------------------------------------------------------------------------- */

/**
 * Normalized set of connection parameters built from an array or a URI string.
 */
class ConnectionParameters {
    const DEFAULT_SCHEME  = 'tcp';
    const DEFAULT_HOST    = '127.0.0.1';
    const DEFAULT_PORT    = 6379;
    const DEFAULT_TIMEOUT = 5;
    private $_parameters;

    /**
     * @param array|string|null $parameters Named parameters or a connection URI.
     */
    public function __construct($parameters = null) {
        $parameters = $parameters ?: array();
        $this->_parameters = is_array($parameters)
            ?
self::filterConnectionParams($parameters)
            : self::parseURI($parameters);
    }

    /**
     * Reducer that adds one "key=value" query-string pair to $params.
     *
     * Only the first '=' separates key from value, so values that contain
     * '=' themselves are kept whole; a pair without '=' maps the key to null.
     *
     * @param array  $params Parameters accumulated so far.
     * @param string $kv     One "key=value" pair.
     * @return array
     */
    private static function paramsExtractor($params, $kv) {
        $pair = explode('=', $kv, 2);
        $params[$pair[0]] = isset($pair[1]) ? $pair[1] : null;
        return $params;
    }

    /**
     * Parses a connection URI; query-string pairs are merged into the result.
     *
     * @param string $uri
     * @return array Filtered connection parameters.
     * @throws ClientException When the URI cannot be parsed or has no host.
     */
    private static function parseURI($uri) {
        $parsed = @parse_url($uri);
        // parse_url() omits the 'host' key when the URI has none, so check
        // for its presence before reading it.
        if ($parsed == false || !isset($parsed['host']) || $parsed['host'] == null) {
            throw new ClientException("Invalid URI: $uri");
        }
        if (array_key_exists('query', $parsed)) {
            $query = explode('&', $parsed['query']);
            // Array callable instead of the 'self::method' string form.
            $parsed = array_reduce($query, array(__CLASS__, 'paramsExtractor'), $parsed);
        }
        return self::filterConnectionParams($parsed);
    }

    /**
     * Returns $parameters[$param] when the key exists (even if null),
     * $default otherwise.
     */
    private static function getParamOrDefault(array $parameters, $param, $default = null) {
        return array_key_exists($param, $parameters) ? $parameters[$param] : $default;
    }

    /**
     * Keeps only the known parameters, filling in defaults for missing ones.
     *
     * @param array $parameters Raw parameters.
     * @return array
     */
    private static function filterConnectionParams($parameters) {
        return array(
            'scheme' => self::getParamOrDefault($parameters, 'scheme', self::DEFAULT_SCHEME),
            'host' => self::getParamOrDefault($parameters, 'host', self::DEFAULT_HOST),
            'port' => (int) self::getParamOrDefault($parameters, 'port', self::DEFAULT_PORT),
            'database' => self::getParamOrDefault($parameters, 'database'),
            'password' => self::getParamOrDefault($parameters, 'password'),
            'connection_async' => self::getParamOrDefault($parameters, 'connection_async', false),
            'connection_persistent' => self::getParamOrDefault($parameters, 'connection_persistent', false),
            'connection_timeout' => self::getParamOrDefault($parameters, 'connection_timeout', self::DEFAULT_TIMEOUT),
            'read_write_timeout' => self::getParamOrDefault($parameters, 'read_write_timeout'),
            'alias' => self::getParamOrDefault($parameters, 'alias'),
            'weight' => self::getParamOrDefault($parameters, 'weight'),
        );
    }

    /**
     * Exposes the stored parameters as read-only properties.
     */
    public function __get($parameter) {
        return
$this->_parameters[$parameter];
    }

    /**
     * Tells whether a parameter is stored with a non-null value.
     */
    public function __isset($parameter) {
        return isset($this->_parameters[$parameter]);
    }
}

/**
 * Operations common to single and clustered connections.
 */
interface IConnection {
    public function connect();
    public function disconnect();
    public function isConnected();
    public function writeCommand(ICommand $command);
    public function readResponse(ICommand $command);
    public function executeCommand(ICommand $command);
}

/**
 * A connection to one server, with raw byte/line level access.
 */
interface IConnectionSingle extends IConnection {
    public function writeBytes($buffer);
    public function readBytes($length);
    public function readLine();
}

/**
 * A group of connections among which commands are distributed.
 */
interface IConnectionCluster extends IConnection {
    public function getConnection(ICommand $command);
    public function getConnectionById($connectionId);
}

/**
 * Static registry mapping URI schemes to connection classes.
 */
final class ConnectionFactory {
    private static $_registeredSchemes;

    private function __construct() {
        // NOOP
    }

    /**
     * Fills the registry with the default schemes on first use.
     */
    private static function ensureInitialized() {
        if (isset(self::$_registeredSchemes)) {
            return;
        }
        self::$_registeredSchemes = self::getDefaultSchemes();
    }

    /**
     * @return array Scheme name => connection class name.
     */
    private static function getDefaultSchemes() {
        $tcpConnection = '\Predis\TcpConnection';
        return array(
            'tcp'   => $tcpConnection,
            'redis' => $tcpConnection, // compatibility with older versions
        );
    }

    /**
     * Associates a scheme with a connection class.
     *
     * @param string $scheme
     * @param string $connectionClass Class implementing IConnectionSingle.
     * @throws ClientException When the class is not a subclass of IConnectionSingle.
     */
    public static function registerScheme($scheme, $connectionClass) {
        self::ensureInitialized();
        $reflection = new \ReflectionClass($connectionClass);
        if ($reflection->isSubclassOf('\Predis\IConnectionSingle') === false) {
            throw new ClientException(
                "Cannot register '$connectionClass' as it is not a valid connection class"
            );
        }
        self::$_registeredSchemes[$scheme] = $connectionClass;
    }

    /**
     * Instantiates the connection class registered for the parameters' scheme.
     *
     * @throws ClientException When the scheme is not registered.
     */
    public static function create(ConnectionParameters $parameters, ResponseReader $reader) {
        self::ensureInitialized();
if (!isset(self::$_registeredSchemes[$parameters->scheme])) {
            throw new ClientException("Unknown connection scheme: {$parameters->scheme}");
        }
        $connection = self::$_registeredSchemes[$parameters->scheme];
        return new $connection($parameters, $reader);
    }
}

/**
 * Base class for single-server connections backed by a stream resource.
 */
abstract class Connection implements IConnectionSingle {
    protected $_params, $_socket, $_initCmds, $_reader;

    /**
     * @param ConnectionParameters $parameters
     * @param ResponseReader|null  $reader     A new ResponseReader is created when omitted.
     */
    public function __construct(ConnectionParameters $parameters, ResponseReader $reader = null) {
        $this->_params   = $parameters;
        $this->_initCmds = array();
        $this->_reader   = $reader ?: new ResponseReader();
    }

    public function __destruct() {
        $this->disconnect();
    }

    /**
     * @return bool True while the underlying socket is an open resource.
     */
    public function isConnected() {
        return is_resource($this->_socket);
    }

    /**
     * Opens the underlying resource and stores it in $_socket.
     */
    protected abstract function createResource();

    /**
     * Opens the connection.
     *
     * @throws ClientException When the connection is already open.
     */
    public function connect() {
        if ($this->isConnected()) {
            // Message spelling corrected (was 'estabilished').
            throw new ClientException('Connection already established');
        }
        $this->createResource();
    }

    /**
     * Closes the socket if it is open; does nothing otherwise.
     */
    public function disconnect() {
        if ($this->isConnected()) {
            fclose($this->_socket);
        }
    }

    /**
     * Queues a command in the list of initialization commands.
     */
    public function pushInitCommand(ICommand $command) {
        $this->_initCmds[] = $command;
    }

    /**
     * Writes a command and returns its response, or disconnects (returning
     * nothing) when the command closes the connection.
     */
    public function executeCommand(ICommand $command) {
        $this->writeCommand($command);
        if ($command->closesConnection()) {
            return $this->disconnect();
        }
        return $this->readResponse($command);
    }

    /**
     * Wraps the message in a CommunicationException bound to this connection
     * and hands it to Utils::onCommunicationException().
     */
    protected function onCommunicationException($message, $code = null) {
        Utils::onCommunicationException(
            new CommunicationException($this, $message, $code)
        );
    }

    /**
     * Returns the socket, connecting first when not yet connected.
     */
    public function getSocket() {
        if (!$this->isConnected()) {
            $this->connect();
        }
        return $this->_socket;
    }

    public function getResponseReader() {
        return $this->_reader;
    }

    public
function getParameters() {
        return $this->_params;
    }

    /**
     * @return string "host:port" of this connection.
     */
    public function __toString() {
        return sprintf('%s:%d', $this->_params->host, $this->_params->port);
    }
}

/**
 * Connection over a TCP stream socket.
 */
class TcpConnection extends Connection implements IConnectionSingle {
    /**
     * @throws \InvalidArgumentException When the scheme is neither 'tcp' nor 'redis'.
     */
    public function __construct(ConnectionParameters $parameters, ResponseReader $reader = null) {
        parent::__construct($this->checkParameters($parameters), $reader);
    }

    /**
     * Persistent connections are left open on destruction.
     */
    public function __destruct() {
        if (!$this->_params->connection_persistent) {
            $this->disconnect();
        }
    }

    /**
     * Returns the parameters unchanged after validating their scheme.
     */
    private function checkParameters(ConnectionParameters $parameters) {
        if ($parameters->scheme != 'tcp' && $parameters->scheme != 'redis') {
            throw new \InvalidArgumentException("Invalid scheme: {$parameters->scheme}");
        }
        return $parameters;
    }

    /**
     * Opens the stream socket and applies the optional read/write timeout.
     * A failed connection attempt is reported through onCommunicationException().
     */
    protected function createResource() {
        $uri = sprintf('tcp://%s:%d/', $this->_params->host, $this->_params->port);
        $connectFlags = STREAM_CLIENT_CONNECT;
        if ($this->_params->connection_async) {
            $connectFlags |= STREAM_CLIENT_ASYNC_CONNECT;
        }
        if ($this->_params->connection_persistent) {
            $connectFlags |= STREAM_CLIENT_PERSISTENT;
        }
        $this->_socket = @stream_socket_client(
            $uri, $errno, $errstr, $this->_params->connection_timeout, $connectFlags
        );

        if (!$this->_socket) {
            $this->onCommunicationException(trim($errstr), $errno);
        }

        if (isset($this->_params->read_write_timeout)) {
            // stream_set_timeout() takes integer seconds and microseconds, so
            // split the (possibly fractional) timeout and cast both parts
            // explicitly instead of passing floats.
            $timeout = (float) $this->_params->read_write_timeout;
            $timeoutSeconds  = (int) floor($timeout);
            $timeoutUSeconds = (int) (($timeout - $timeoutSeconds) * 1000000);
            stream_set_timeout($this->_socket, $timeoutSeconds, $timeoutUSeconds);
        }
    }

    /**
     * Writes every queued initialization command, then reads their responses.
     */
    private function sendInitializationCommands() {
        foreach ($this->_initCmds as $command) {
            $this->writeCommand($command);
        }
        foreach
($this->_initCmds as $command) {
            $this->readResponse($command);
        }
    }

    /**
     * Opens the connection and sends the initialization commands, if any.
     */
    public function connect() {
        parent::connect();
        if (count($this->_initCmds) > 0) {
            $this->sendInitializationCommands();
        }
    }

    /**
     * Serializes the command and writes it to the socket.
     */
    public function writeCommand(ICommand $command) {
        $this->writeBytes($command->serialize());
    }

    /**
     * Reads one response. Responses exposing a `queued` or `error` property
     * are returned as-is; anything else goes through the command's parser.
     */
    public function readResponse(ICommand $command) {
        $response = $this->_reader->read($this);
        $skipparse = isset($response->queued) || isset($response->error);
        return $skipparse ? $response : $command->parseResponse($response);
    }

    /**
     * Writes raw data and returns the raw response, or disconnects and
     * returns nothing when $closesConnection is true.
     */
    public function rawCommand($rawCommandData, $closesConnection = false) {
        $this->writeBytes($rawCommandData);
        if ($closesConnection) {
            $this->disconnect();
            return;
        }
        return $this->_reader->read($this);
    }

    /**
     * Writes the whole buffer, looping over partial writes.
     *
     * @param string $value
     * @return bool Always true; failures go through onCommunicationException().
     */
    public function writeBytes($value) {
        $socket = $this->getSocket();
        while (($length = strlen($value)) > 0) {
            $written = fwrite($socket, $value);
            if ($length === $written) {
                return true;
            }
            if ($written === false || $written === 0) {
                $this->onCommunicationException('Error while writing bytes to the server');
            }
            // Partial write: retry with the bytes not yet sent.
            $value = substr($value, $written);
        }
        return true;
    }

    /**
     * Reads exactly $length bytes from the socket.
     *
     * @param int $length Number of bytes, must be positive.
     * @return string
     * @throws \InvalidArgumentException When $length is zero or negative.
     */
    public function readBytes($length) {
        // Negative lengths are rejected as well, matching the message below.
        if ($length <= 0) {
            throw new \InvalidArgumentException('Length parameter must be greater than 0');
        }
        $socket = $this->getSocket();
        $value = '';
        do {
            $chunk = fread($socket, $length);
            if ($chunk === false || $chunk === '') {
                $this->onCommunicationException('Error while reading bytes from the server');
            }
            $value .= $chunk;
        }
        while (($length -= strlen($chunk)) > 0);
        return $value;
    }

    /**
     * Reads from the socket until the protocol newline and returns the line
     * without it.
     */
    public function readLine() {
        $socket = $this->getSocket();
        $value = '';
        do {
            $chunk = fgets($socket);
if ($chunk === false || strlen($chunk) == 0) {
                $this->onCommunicationException('Error while reading line from the server');
            }
            $value .= $chunk;
        }
        while (substr($value, -2) !== Protocol::NEWLINE);
        return substr($value, 0, -2);
    }
}

/**
 * Pool of single connections; a distribution strategy picks the connection
 * for each command.
 */
class ConnectionCluster implements IConnectionCluster, \IteratorAggregate {
    private $_pool, $_distributor;

    /**
     * @param IDistributionStrategy|null $distributor A HashRing is used when omitted.
     */
    public function __construct(IDistributionStrategy $distributor = null) {
        $this->_distributor = $distributor ?: new Distribution\HashRing();
        $this->_pool = array();
    }

    /**
     * @return bool True as soon as one pooled connection is connected.
     */
    public function isConnected() {
        foreach ($this->_pool as $pooled) {
            if ($pooled->isConnected()) {
                return true;
            }
        }
        return false;
    }

    /**
     * Calls connect() on every pooled connection.
     */
    public function connect() {
        foreach ($this->_pool as $pooled) {
            $pooled->connect();
        }
    }

    /**
     * Calls disconnect() on every pooled connection.
     */
    public function disconnect() {
        foreach ($this->_pool as $pooled) {
            $pooled->disconnect();
        }
    }

    /**
     * Adds a connection to the pool (keyed by its alias when one is set) and
     * registers it with the distributor using its weight parameter.
     */
    public function add(IConnectionSingle $connection) {
        $parameters = $connection->getParameters();
        if (!isset($parameters->alias)) {
            $this->_pool[] = $connection;
        } else {
            $this->_pool[$parameters->alias] = $connection;
        }
        $this->_distributor->add($connection, $parameters->weight);
    }

    /**
     * Returns the connection the distributor selects for the command's hash.
     *
     * @throws ClientException When the command cannot be hashed.
     */
    public function getConnection(ICommand $command) {
        if ($command->canBeHashed() === false) {
            throw new ClientException(
                sprintf("Cannot send '%s' commands to a cluster of connections", $command->getCommandId())
            );
        }
        $hash = $command->getHash($this->_distributor);
        return $this->_distributor->get($hash);
    }

    /**
     * Looks a connection up by pool key; a falsy id maps to key 0.
     *
     * @return IConnectionSingle|null
     */
    public function getConnectionById($id = null) {
        $alias = $id ?: 0;
        return isset($this->_pool[$alias]) ?
$this->_pool[$alias] : null;
    }

    /**
     * @return \ArrayIterator Iterator over the pooled connections.
     */
    public function getIterator() {
        return new \ArrayIterator($this->_pool);
    }

    /**
     * Writes the command on the connection selected for it.
     */
    public function writeCommand(ICommand $command) {
        $target = $this->getConnection($command);
        $target->writeCommand($command);
    }

    /**
     * Reads the command's response from the connection selected for it.
     */
    public function readResponse(ICommand $command) {
        $target = $this->getConnection($command);
        return $target->readResponse($command);
    }

    /**
     * Writes the command and reads its response on the same connection.
     */
    public function executeCommand(ICommand $command) {
        $target = $this->getConnection($command);
        $target->writeCommand($command);
        return $target->readResponse($command);
    }
}

/* ------------------------------------------------------------------------- */

/**
 * Maps command identifiers to command classes for one server version and
 * keeps a static registry of the known profiles.
 */
abstract class RedisServerProfile {
    private static $_serverProfiles;
    private $_registeredCommands;

    public function __construct() {
        $this->_registeredCommands = $this->getSupportedCommands();
    }

    /**
     * @return string Version string of the profile.
     */
    public abstract function getVersion();

    /**
     * @return array Command identifier => command class name.
     */
    protected abstract function getSupportedCommands();

    /**
     * @return RedisServerProfile Instance of the profile registered as 'default'.
     */
    public static function getDefault() {
        return self::get('default');
    }

    /**
     * @return RedisServerProfile Instance of the profile registered as 'dev'.
     */
    public static function getDevelopment() {
        return self::get('dev');
    }

    /**
     * @return array Built-in alias => profile class name.
     */
    private static function predisServerProfiles() {
        return array(
            '1.2'     => '\Predis\RedisServer_v1_2',
            '2.0'     => '\Predis\RedisServer_v2_0',
            'default' => '\Predis\RedisServer_v2_0',
            'dev'     => '\Predis\RedisServer_vNext',
        );
    }

    /**
     * Registers a profile class under one alias or a list of aliases.
     *
     * @param string       $profileClass
     * @param string|array $aliases
     * @throws ClientException When the class does not extend RedisServerProfile.
     */
    public static function registerProfile($profileClass, $aliases) {
        if (!isset(self::$_serverProfiles)) {
            self::$_serverProfiles = self::predisServerProfiles();
        }

        $reflection = new \ReflectionClass($profileClass);
        $isProfile = $reflection->isSubclassOf('\Predis\RedisServerProfile');

        if (!$isProfile) {
            throw new ClientException("Cannot register '$profileClass' as it is not a valid profile class");
        }
if (is_array($aliases)) {
            foreach ($aliases as $alias) {
                self::$_serverProfiles[$alias] = $profileClass;
            }
        } else {
            self::$_serverProfiles[$aliases] = $profileClass;
        }
    }

    /**
     * Instantiates the profile registered under the given alias.
     *
     * @param string $version
     * @return RedisServerProfile
     * @throws ClientException When no profile is registered under that alias.
     */
    public static function get($version) {
        if (!isset(self::$_serverProfiles)) {
            self::$_serverProfiles = self::predisServerProfiles();
        }
        if (isset(self::$_serverProfiles[$version])) {
            $profileClass = self::$_serverProfiles[$version];
            return new $profileClass();
        }
        throw new ClientException("Unknown server profile: $version");
    }

    /**
     * @return bool True only when every listed command is supported.
     */
    public function supportsCommands(array $commands) {
        foreach ($commands as $commandId) {
            if ($this->supportsCommand($commandId) === false) {
                return false;
            }
        }
        return true;
    }

    /**
     * @return bool True when the command identifier is registered.
     */
    public function supportsCommand($command) {
        return isset($this->_registeredCommands[$command]);
    }

    /**
     * Instantiates the command class registered for $method and assigns its
     * arguments.
     *
     * @throws ClientException When $method is not registered.
     */
    public function createCommand($method, $arguments = array()) {
        if (!isset($this->_registeredCommands[$method])) {
            throw new ClientException("'$method' is not a registered Redis command");
        }
        $className = $this->_registeredCommands[$method];
        $instance = new $className();
        $instance->setArgumentsArray($arguments);
        return $instance;
    }

    /**
     * Registers several commands; keys are class names, values are aliases.
     */
    public function registerCommands(array $commands) {
        foreach ($commands as $commandClass => $commandAliases) {
            $this->registerCommand($commandClass, $commandAliases);
        }
    }

    /**
     * Registers a command class under one alias or a list of aliases.
     *
     * @param string       $command Command class name.
     * @param string|array $aliases
     * @throws ClientException When the class does not extend \Predis\Command.
     */
    public function registerCommand($command, $aliases) {
        $commandReflection = new \ReflectionClass($command);

        if (!$commandReflection->isSubclassOf('\Predis\Command')) {
            throw new ClientException("Cannot register '$command' as it is not a valid Redis command");
        }

        if (is_array($aliases)) {
            foreach ($aliases as $alias) {
                $this->_registeredCommands[$alias] = $command;
            }
        }
        else {
$this->_registeredCommands[$aliases] = $command;
        }
    }

    /**
     * @return string The profile's version string.
     */
    public function __toString() {
        return $this->getVersion();
    }
}

/**
 * Profile listing the commands registered for version '1.2'.
 */
class RedisServer_v1_2 extends RedisServerProfile {
    public function getVersion() {
        return '1.2';
    }

    /**
     * @return array Command identifier => command class name.
     */
    public function getSupportedCommands() {
        return array(
            /* miscellaneous commands */
            'ping'      => '\Predis\Commands\Ping',
            'echo'      => '\Predis\Commands\DoEcho',
            'auth'      => '\Predis\Commands\Auth',

            /* connection handling */
            'quit'      => '\Predis\Commands\Quit',

            /* commands operating on string values */
            'set'       => '\Predis\Commands\Set',
            'setnx'     => '\Predis\Commands\SetPreserve',
            'mset'      => '\Predis\Commands\SetMultiple',
            'msetnx'    => '\Predis\Commands\SetMultiplePreserve',
            'get'       => '\Predis\Commands\Get',
            'mget'      => '\Predis\Commands\GetMultiple',
            'getset'    => '\Predis\Commands\GetSet',
            'incr'      => '\Predis\Commands\Increment',
            'incrby'    => '\Predis\Commands\IncrementBy',
            'decr'      => '\Predis\Commands\Decrement',
            'decrby'    => '\Predis\Commands\DecrementBy',
            'exists'    => '\Predis\Commands\Exists',
            'del'       => '\Predis\Commands\Delete',
            'type'      => '\Predis\Commands\Type',

            /* commands operating on the key space */
            'keys'      => '\Predis\Commands\Keys',
            'randomkey' => '\Predis\Commands\RandomKey',
            'rename'    => '\Predis\Commands\Rename',
            'renamenx'  => '\Predis\Commands\RenamePreserve',
            'expire'    => '\Predis\Commands\Expire',
            'expireat'  => '\Predis\Commands\ExpireAt',
            'dbsize'    => '\Predis\Commands\DatabaseSize',
            'ttl'       => '\Predis\Commands\TimeToLive',

            /* commands operating on lists */
            'rpush'     => '\Predis\Commands\ListPushTail',
            'lpush'     => '\Predis\Commands\ListPushHead',
            'llen'      => '\Predis\Commands\ListLength',
            'lrange'    => '\Predis\Commands\ListRange',
            'ltrim'     =>
'\Predis\Commands\ListTrim',
            'lindex'           => '\Predis\Commands\ListIndex',
            'lset'             => '\Predis\Commands\ListSet',
            'lrem'             => '\Predis\Commands\ListRemove',
            'lpop'             => '\Predis\Commands\ListPopFirst',
            'rpop'             => '\Predis\Commands\ListPopLast',
            'rpoplpush'        => '\Predis\Commands\ListPopLastPushHead',

            /* commands operating on sets */
            'sadd'             => '\Predis\Commands\SetAdd',
            'srem'             => '\Predis\Commands\SetRemove',
            'spop'             => '\Predis\Commands\SetPop',
            'smove'            => '\Predis\Commands\SetMove',
            'scard'            => '\Predis\Commands\SetCardinality',
            'sismember'        => '\Predis\Commands\SetIsMember',
            'sinter'           => '\Predis\Commands\SetIntersection',
            'sinterstore'      => '\Predis\Commands\SetIntersectionStore',
            'sunion'           => '\Predis\Commands\SetUnion',
            'sunionstore'      => '\Predis\Commands\SetUnionStore',
            'sdiff'            => '\Predis\Commands\SetDifference',
            'sdiffstore'       => '\Predis\Commands\SetDifferenceStore',
            'smembers'         => '\Predis\Commands\SetMembers',
            'srandmember'      => '\Predis\Commands\SetRandomMember',

            /* commands operating on sorted sets */
            'zadd'             => '\Predis\Commands\ZSetAdd',
            'zincrby'          => '\Predis\Commands\ZSetIncrementBy',
            'zrem'             => '\Predis\Commands\ZSetRemove',
            'zrange'           => '\Predis\Commands\ZSetRange',
            'zrevrange'        => '\Predis\Commands\ZSetReverseRange',
            'zrangebyscore'    => '\Predis\Commands\ZSetRangeByScore',
            'zcard'            => '\Predis\Commands\ZSetCardinality',
            'zscore'           => '\Predis\Commands\ZSetScore',
            'zremrangebyscore' => '\Predis\Commands\ZSetRemoveRangeByScore',

            /* multiple databases handling commands */
            'select'           => '\Predis\Commands\SelectDatabase',
            'move'             => '\Predis\Commands\MoveKey',
            'flushdb'          => '\Predis\Commands\FlushDatabase',
            'flushall'         => '\Predis\Commands\FlushAll',

            /* sorting */
            'sort'             => '\Predis\Commands\Sort',

            /* remote server control
commands */
            'info'         => '\Predis\Commands\Info',
            'slaveof'      => '\Predis\Commands\SlaveOf',

            /* persistence control commands */
            'save'         => '\Predis\Commands\Save',
            'bgsave'       => '\Predis\Commands\BackgroundSave',
            'lastsave'     => '\Predis\Commands\LastSave',
            'shutdown'     => '\Predis\Commands\Shutdown',
            'bgrewriteaof' => '\Predis\Commands\BackgroundRewriteAppendOnlyFile',
        );
    }
}

/**
 * Profile for version '2.0': the 1.2 command set plus the entries below.
 */
class RedisServer_v2_0 extends RedisServer_v1_2 {
    public function getVersion() {
        return '2.0';
    }

    /**
     * @return array The parent's commands merged with the 2.0 additions.
     */
    public function getSupportedCommands() {
        return array_merge(parent::getSupportedCommands(), array(
            /* transactions */
            'multi'           => '\Predis\Commands\Multi',
            'exec'            => '\Predis\Commands\Exec',
            'discard'         => '\Predis\Commands\Discard',

            /* commands operating on string values */
            'setex'           => '\Predis\Commands\SetExpire',
            'append'          => '\Predis\Commands\Append',
            'substr'          => '\Predis\Commands\Substr',

            /* commands operating on lists */
            'blpop'           => '\Predis\Commands\ListPopFirstBlocking',
            'brpop'           => '\Predis\Commands\ListPopLastBlocking',

            /* commands operating on sorted sets */
            'zunionstore'     => '\Predis\Commands\ZSetUnionStore',
            'zinterstore'     => '\Predis\Commands\ZSetIntersectionStore',
            'zcount'          => '\Predis\Commands\ZSetCount',
            'zrank'           => '\Predis\Commands\ZSetRank',
            'zrevrank'        => '\Predis\Commands\ZSetReverseRank',
            'zremrangebyrank' => '\Predis\Commands\ZSetRemoveRangeByRank',

            /* commands operating on hashes */
            'hset'            => '\Predis\Commands\HashSet',
            'hsetnx'          => '\Predis\Commands\HashSetPreserve',
            'hmset'           => '\Predis\Commands\HashSetMultiple',
            'hincrby'         => '\Predis\Commands\HashIncrementBy',
            'hget'            => '\Predis\Commands\HashGet',
            'hmget'           => '\Predis\Commands\HashGetMultiple',
            'hdel'            => '\Predis\Commands\HashDelete',
            'hexists'         =>
'\Predis\Commands\HashExists',
            'hlen'         => '\Predis\Commands\HashLength',
            'hkeys'        => '\Predis\Commands\HashKeys',
            'hvals'        => '\Predis\Commands\HashValues',
            'hgetall'      => '\Predis\Commands\HashGetAll',

            /* publish - subscribe */
            'subscribe'    => '\Predis\Commands\Subscribe',
            'unsubscribe'  => '\Predis\Commands\Unsubscribe',
            'psubscribe'   => '\Predis\Commands\SubscribeByPattern',
            'punsubscribe' => '\Predis\Commands\UnsubscribeByPattern',
            'publish'      => '\Predis\Commands\Publish',

            /* remote server control commands */
            'config'       => '\Predis\Commands\Config',
        ));
    }
}

/**
 * Development profile (reports version '2.1'): the 2.0 command set plus the
 * entries below.
 */
class RedisServer_vNext extends RedisServer_v2_0 {
    public function getVersion() {
        return '2.1';
    }

    /**
     * @return array The parent's commands merged with the additions below.
     */
    public function getSupportedCommands() {
        return array_merge(parent::getSupportedCommands(), array(
            /* transactions */
            'watch'   => '\Predis\Commands\Watch',
            'unwatch' => '\Predis\Commands\Unwatch',

            /* commands operating on string values */
            'strlen'  => '\Predis\Commands\Strlen',

            /* commands operating on lists */
            'rpushx'  => '\Predis\Commands\ListPushTailX',
            'lpushx'  => '\Predis\Commands\ListPushHeadX',
            'linsert' => '\Predis\Commands\ListInsert',
        ));
    }
}

/* ------------------------------------------------------------------------- */

namespace Predis\Pipeline;
use Predis\IConnection, Predis\ServerException, Predis\CommunicationException;

/**
 * Strategy that sends a list of commands over a connection and returns their
 * responses.
 */
interface IPipelineExecutor {
    public function execute(IConnection $connection, &$commands);
}

/**
 * Writes every command first, then reads the responses in order. A
 * ServerException disconnects the connection and is rethrown.
 */
class StandardExecutor implements IPipelineExecutor {
    public function execute(IConnection $connection, &$commands) {
        $sizeofPipe = count($commands);
        $values = array();

        foreach ($commands as $command) {
            $connection->writeCommand($command);
        }
        try {
            for ($i = 0; $i <
$sizeofPipe; $i++) {
                $response = $connection->readResponse($commands[$i]);
                if ($response instanceof \Iterator) {
                    // Lazy multi-bulk replies are materialized into arrays.
                    $response = iterator_to_array($response);
                }
                $values[] = $response;
                unset($commands[$i]);
            }
        }
        catch (ServerException $exception) {
            // force disconnection to prevent protocol desynchronization
            $connection->disconnect();
            throw $exception;
        }

        return $values;
    }
}

/**
 * Executor that never throws for server or communication errors: they are
 * returned in place of the affected responses.
 */
class SafeExecutor implements IPipelineExecutor {
    public function execute(IConnection $connection, &$commands) {
        $sizeofPipe = count($commands);
        $values = array();

        // Write phase: a single failed write yields the same exception for
        // every command in the pipeline.
        foreach ($commands as $command) {
            try {
                $connection->writeCommand($command);
            }
            catch (CommunicationException $exception) {
                return array_fill(0, $sizeofPipe, $exception);
            }
        }

        // Read phase: commands are removed from the queue as they are read.
        for ($i = 0; $i < $sizeofPipe; $i++) {
            $command = $commands[$i];
            unset($commands[$i]);
            try {
                $response = $connection->readResponse($command);
                $values[] = ($response instanceof \Iterator
                    ?
iterator_to_array($response)
                    : $response
                );
            }
            catch (ServerException $exception) {
                $values[] = $exception->toResponseError();
            }
            catch (CommunicationException $exception) {
                // Fill every slot that has no value yet (the failing command
                // included) with the exception. The count is derived from the
                // original pipeline size: $commands has already been shrunk by
                // the unset() calls above, so count($commands) would give too
                // few placeholders or even a negative number.
                $toAdd  = $sizeofPipe - count($values);
                $values = array_merge($values, array_fill(0, $toAdd, $exception));
                break;
            }
        }

        return $values;
    }
}

/**
 * Safe executor for clustered connections: a communication failure only
 * affects the commands routed to the failing connection.
 */
class SafeClusterExecutor implements IPipelineExecutor {
    public function execute(IConnection $connection, &$commands) {
        // Exceptions keyed by the spl_object_hash() of the failed connection.
        $connectionExceptions = array();
        $sizeofPipe = count($commands);
        $values = array();

        // Write phase: skip connections that have already failed.
        foreach ($commands as $command) {
            $cmdConnection = $connection->getConnection($command);
            if (isset($connectionExceptions[spl_object_hash($cmdConnection)])) {
                continue;
            }
            try {
                $cmdConnection->writeCommand($command);
            }
            catch (CommunicationException $exception) {
                $connectionExceptions[spl_object_hash($cmdConnection)] = $exception;
            }
        }

        // Read phase: commands on failed connections get the stored exception.
        for ($i = 0; $i < $sizeofPipe; $i++) {
            $command = $commands[$i];
            unset($commands[$i]);

            $cmdConnection = $connection->getConnection($command);
            $connectionObjectHash = spl_object_hash($cmdConnection);

            if (isset($connectionExceptions[$connectionObjectHash])) {
                $values[] = $connectionExceptions[$connectionObjectHash];
                continue;
            }

            try {
                $response = $cmdConnection->readResponse($command);
                $values[] = ($response instanceof \Iterator
                    ?
iterator_to_array($response)
                    : $response
                );
            }
            catch (ServerException $exception) {
                $values[] = $exception->toResponseError();
            }
            catch (CommunicationException $exception) {
                $values[] = $exception;
                $connectionExceptions[$connectionObjectHash] = $exception;
            }
        }

        return $values;
    }
}

/* ------------------------------------------------------------------------- */

namespace Predis\Distribution;

/**
 * Strategy that maps keys to nodes.
 */
interface IDistributionStrategy {
    public function add($node, $weight = null);
    public function remove($node);
    public function get($key);
    public function generateKey($value);
}

class EmptyRingException extends \Exception { }

/**
 * Hash ring keyed by crc32 values, built lazily from the list of weighted
 * nodes.
 */
class HashRing implements IDistributionStrategy {
    const DEFAULT_REPLICAS = 128;
    const DEFAULT_WEIGHT   = 100;
    private $_nodes, $_ring, $_ringKeys, $_ringKeysCount, $_replicas;

    /**
     * @param int $replicas Replica count used when building the ring.
     */
    public function __construct($replicas = self::DEFAULT_REPLICAS) {
        $this->_nodes    = array();
        $this->_replicas = $replicas;
    }

    /**
     * Adds a node and discards the current ring so that it gets rebuilt.
     *
     * @param mixed    $node
     * @param int|null $weight Falls back to DEFAULT_WEIGHT when it casts to 0.
     */
    public function add($node, $weight = null) {
        // NOTE: in case of collisions in the hashes of the nodes, the node added
        //       last wins, thus the order in which nodes are added is significant.
        $this->_nodes[] = array(
            'object' => $node,
            'weight' => (int) $weight ?: $this::DEFAULT_WEIGHT,
        );
        $this->reset();
    }

    /**
     * Removes the first entry holding the given node, if any.
     */
    public function remove($node) {
        // NOTE: a node is removed by resetting the ring so that it's recreated from
        //       scratch, in order to reassign possible hashes with collisions to the
        //       right node according to the order in which they were added in the
        //       first place.
for ($i = 0; $i < count($this->_nodes); ++$i) {
            if ($this->_nodes[$i]['object'] === $node) {
                array_splice($this->_nodes, $i, 1);
                $this->reset();
                break;
            }
        }
    }

    /**
     * Discards the computed ring so the next lookup rebuilds it.
     */
    private function reset() {
        unset($this->_ring, $this->_ringKeys, $this->_ringKeysCount);
    }

    /**
     * @return bool True once the ring keys have been computed.
     */
    private function isInitialized() {
        return isset($this->_ringKeys);
    }

    /**
     * @return int Sum of the weights of all nodes.
     */
    private function computeTotalWeight() {
        return array_reduce(
            $this->_nodes,
            function ($sum, $entry) {
                return $sum + $entry['weight'];
            },
            0
        );
    }

    /**
     * Builds the ring from the registered nodes unless it already exists.
     *
     * @throws EmptyRingException When no node has been added.
     */
    private function initialize() {
        if ($this->isInitialized()) {
            return;
        }
        $nodesCount = count($this->_nodes);
        if ($nodesCount === 0) {
            throw new EmptyRingException('Cannot initialize empty hashring');
        }

        $this->_ring = array();
        $totalWeight = $this->computeTotalWeight();
        foreach ($this->_nodes as $entry) {
            $this->addNodeToRing(
                $this->_ring, $entry, $nodesCount, $this->_replicas, $entry['weight'] / $totalWeight
            );
        }
        ksort($this->_ring, SORT_NUMERIC);
        $this->_ringKeys = array_keys($this->_ring);
        $this->_ringKeysCount = count($this->_ringKeys);
    }

    /**
     * Inserts the replicas of one node into the ring, each keyed by the crc32
     * of "<node as string>:<replica index>".
     *
     * @param array $ring        Ring being built (modified in place).
     * @param array $node        Entry with 'object' and 'weight' keys.
     * @param int   $totalNodes  Number of registered nodes.
     * @param int   $replicas    Base replica count.
     * @param float $weightRatio Node weight divided by the total weight.
     */
    protected function addNodeToRing(&$ring, $node, $totalNodes, $replicas, $weightRatio) {
        $nodeObject = $node['object'];
        $nodeHash = (string) $nodeObject;
        $replicas = (int) round($weightRatio * $totalNodes * $replicas);
        $i = 0;
        while ($i < $replicas) {
            $ring[crc32("$nodeHash:$i")] = $nodeObject;
            $i++;
        }
    }

    /**
     * @return int crc32 of the value.
     */
    public function generateKey($value) {
        return crc32($value);
    }

    /**
     * Returns the node stored under the ring key selected for $key.
     */
    public function get($key) {
        $nodeKey = $this->getNodeKey($key);
        return $this->_ring[$nodeKey];
    }

    /**
     * Finds the ring key for $key with a binary search over the sorted keys.
     */
    private function getNodeKey($key) {
01995 $this->initialize(); 01996 $ringKeys = $this->_ringKeys; 01997 $upper = $this->_ringKeysCount - 1; 01998 $lower = 0; 01999 02000 while ($lower <= $upper) { 02001 $index = ($lower + $upper) >> 1; 02002 $item = $ringKeys[$index]; 02003 if ($item > $key) { 02004 $upper = $index - 1; 02005 } 02006 else if ($item < $key) { 02007 $lower = $index + 1; 02008 } 02009 else { 02010 return $item; 02011 } 02012 } 02013 return $ringKeys[$this->wrapAroundStrategy($upper, $lower, $this->_ringKeysCount)]; 02014 } 02015 02016 protected function wrapAroundStrategy($upper, $lower, $ringKeysCount) { 02017 // NOTE: binary search for the last item in _ringkeys with a value 02018 // less or equal to the key. If no such item exists, return the 02019 // last item. 02020 return $upper >= 0 ? $upper : $ringKeysCount - 1; 02021 } 02022 } 02023 02024 class KetamaPureRing extends HashRing { 02025 const DEFAULT_REPLICAS = 160; 02026 02027 public function __construct() { 02028 parent::__construct($this::DEFAULT_REPLICAS); 02029 } 02030 02031 protected function addNodeToRing(&$ring, $node, $totalNodes, $replicas, $weightRatio) { 02032 $nodeObject = $node['object']; 02033 $nodeHash = (string) $nodeObject; 02034 $replicas = (int) floor($weightRatio * $totalNodes * ($replicas / 4)); 02035 for ($i = 0; $i < $replicas; $i++) { 02036 $unpackedDigest = unpack('V4', md5("$nodeHash-$i", true)); 02037 foreach ($unpackedDigest as $key) { 02038 $ring[$key] = $nodeObject; 02039 } 02040 } 02041 } 02042 02043 public function generateKey($value) { 02044 $hash = unpack('V', md5($value, true)); 02045 return $hash[1]; 02046 } 02047 02048 protected function wrapAroundStrategy($upper, $lower, $ringKeysCount) { 02049 // NOTE: binary search for the first item in _ringkeys with a value 02050 // greater or equal to the key. If no such item exists, return the 02051 // first item. 02052 return $lower < $ringKeysCount ? 
$lower : 0; 02053 } 02054 } 02055 02056 /* ------------------------------------------------------------------------- */ 02057 02058 namespace Predis\Shared; 02059 use Predis\IConnection, Predis\IConnectionSingle, Predis\IConnectionCluster, 02060 Predis\CommunicationException; 02061 02062 class Utils { 02063 public static function isCluster(IConnection $connection) { 02064 return $connection instanceof IConnectionCluster; 02065 } 02066 02067 public static function onCommunicationException(CommunicationException $exception) { 02068 if ($exception->shouldResetConnection()) { 02069 $connection = $exception->getConnection(); 02070 if ($connection->isConnected()) { 02071 $connection->disconnect(); 02072 } 02073 } 02074 throw $exception; 02075 } 02076 02077 public static function filterArrayArguments(Array $arguments) { 02078 if (count($arguments) === 1 && is_array($arguments[0])) { 02079 return $arguments[0]; 02080 } 02081 return $arguments; 02082 } 02083 } 02084 02085 abstract class MultiBulkResponseIteratorBase implements \Iterator, \Countable { 02086 protected $_position, $_current, $_replySize; 02087 02088 public function rewind() { 02089 // NOOP 02090 } 02091 02092 public function current() { 02093 return $this->_current; 02094 } 02095 02096 public function key() { 02097 return $this->_position; 02098 } 02099 02100 public function next() { 02101 if (++$this->_position < $this->_replySize) { 02102 $this->_current = $this->getValue(); 02103 } 02104 return $this->_position; 02105 } 02106 02107 public function valid() { 02108 return $this->_position < $this->_replySize; 02109 } 02110 02111 public function count() { 02112 // NOTE: use count if you want to get the size of the current multi-bulk 02113 // response without using iterator_count (which actually consumes 02114 // our iterator to calculate the size, and we cannot perform a rewind) 02115 return $this->_replySize; 02116 } 02117 02118 protected abstract function getValue(); 02119 } 02120 02121 class 
MultiBulkResponseIterator extends MultiBulkResponseIteratorBase { 02122 private $_connection; 02123 02124 public function __construct(IConnectionSingle $connection, $size) { 02125 $this->_connection = $connection; 02126 $this->_reader = $connection->getResponseReader(); 02127 $this->_position = 0; 02128 $this->_current = $size > 0 ? $this->getValue() : null; 02129 $this->_replySize = $size; 02130 } 02131 02132 public function __destruct() { 02133 // when the iterator is garbage-collected (e.g. it goes out of the 02134 // scope of a foreach) but it has not reached its end, we must sync 02135 // the client with the queued elements that have not been read from 02136 // the connection with the server. 02137 $this->sync(); 02138 } 02139 02140 public function sync($drop = false) { 02141 if ($drop == true) { 02142 if ($this->valid()) { 02143 $this->_position = $this->_replySize; 02144 $this->_connection->disconnect(); 02145 } 02146 } 02147 else { 02148 while ($this->valid()) { 02149 $this->next(); 02150 } 02151 } 02152 } 02153 02154 protected function getValue() { 02155 return $this->_reader->read($this->_connection); 02156 } 02157 } 02158 02159 class MultiBulkResponseKVIterator extends MultiBulkResponseIteratorBase { 02160 private $_iterator; 02161 02162 public function __construct(MultiBulkResponseIterator $iterator) { 02163 $virtualSize = count($iterator) / 2; 02164 02165 $this->_iterator = $iterator; 02166 $this->_position = 0; 02167 $this->_current = $virtualSize > 0 ? 
$this->getValue() : null; 02168 $this->_replySize = $virtualSize; 02169 } 02170 02171 public function __destruct() { 02172 $this->_iterator->sync(); 02173 } 02174 02175 protected function getValue() { 02176 $k = $this->_iterator->current(); 02177 $this->_iterator->next(); 02178 $v = $this->_iterator->current(); 02179 $this->_iterator->next(); 02180 return array($k, $v); 02181 } 02182 } 02183 02184 /* ------------------------------------------------------------------------- */ 02185 02186 namespace Predis\Commands; 02187 use Predis\Command, Predis\Shared\Utils, Predis\Shared\MultiBulkResponseKVIterator; 02188 02189 /* miscellaneous commands */ 02190 class Ping extends Command { 02191 public function canBeHashed() { return false; } 02192 public function getCommandId() { return 'PING'; } 02193 public function parseResponse($data) { 02194 return $data === 'PONG' ? true : false; 02195 } 02196 } 02197 02198 class DoEcho extends Command { 02199 public function canBeHashed() { return false; } 02200 public function getCommandId() { return 'ECHO'; } 02201 } 02202 02203 class Auth extends Command { 02204 public function canBeHashed() { return false; } 02205 public function getCommandId() { return 'AUTH'; } 02206 } 02207 02208 /* connection handling */ 02209 class Quit extends Command { 02210 public function canBeHashed() { return false; } 02211 public function getCommandId() { return 'QUIT'; } 02212 public function closesConnection() { return true; } 02213 } 02214 02215 /* commands operating on string values */ 02216 class Set extends Command { 02217 public function getCommandId() { return 'SET'; } 02218 } 02219 02220 class SetExpire extends Command { 02221 public function getCommandId() { return 'SETEX'; } 02222 } 02223 02224 class SetPreserve extends Command { 02225 public function getCommandId() { return 'SETNX'; } 02226 public function parseResponse($data) { return (bool) $data; } 02227 } 02228 02229 class SetMultiple extends Command { 02230 public function canBeHashed() 
{ return false; } 02231 public function getCommandId() { return 'MSET'; } 02232 public function filterArguments(Array $arguments) { 02233 if (count($arguments) === 1 && is_array($arguments[0])) { 02234 $flattenedKVs = array(); 02235 $args = &$arguments[0]; 02236 foreach ($args as $k => $v) { 02237 $flattenedKVs[] = $k; 02238 $flattenedKVs[] = $v; 02239 } 02240 return $flattenedKVs; 02241 } 02242 return $arguments; 02243 } 02244 } 02245 02246 class SetMultiplePreserve extends SetMultiple { 02247 public function canBeHashed() { return false; } 02248 public function getCommandId() { return 'MSETNX'; } 02249 public function parseResponse($data) { return (bool) $data; } 02250 } 02251 02252 class Get extends Command { 02253 public function getCommandId() { return 'GET'; } 02254 } 02255 02256 class GetMultiple extends Command { 02257 public function canBeHashed() { return false; } 02258 public function getCommandId() { return 'MGET'; } 02259 public function filterArguments(Array $arguments) { 02260 return Utils::filterArrayArguments($arguments); 02261 } 02262 } 02263 02264 class GetSet extends Command { 02265 public function getCommandId() { return 'GETSET'; } 02266 } 02267 02268 class Increment extends Command { 02269 public function getCommandId() { return 'INCR'; } 02270 } 02271 02272 class IncrementBy extends Command { 02273 public function getCommandId() { return 'INCRBY'; } 02274 } 02275 02276 class Decrement extends Command { 02277 public function getCommandId() { return 'DECR'; } 02278 } 02279 02280 class DecrementBy extends Command { 02281 public function getCommandId() { return 'DECRBY'; } 02282 } 02283 02284 class Exists extends Command { 02285 public function getCommandId() { return 'EXISTS'; } 02286 public function parseResponse($data) { return (bool) $data; } 02287 } 02288 02289 class Delete extends Command { 02290 public function getCommandId() { return 'DEL'; } 02291 public function filterArguments(Array $arguments) { 02292 return 
Utils::filterArrayArguments($arguments); 02293 } 02294 } 02295 02296 class Type extends Command { 02297 public function getCommandId() { return 'TYPE'; } 02298 } 02299 02300 class Append extends Command { 02301 public function getCommandId() { return 'APPEND'; } 02302 } 02303 02304 class Substr extends Command { 02305 public function getCommandId() { return 'SUBSTR'; } 02306 } 02307 02308 class Strlen extends Command { 02309 public function getCommandId() { return 'STRLEN'; } 02310 } 02311 02312 /* commands operating on the key space */ 02313 class Keys extends Command { 02314 public function canBeHashed() { return false; } 02315 public function getCommandId() { return 'KEYS'; } 02316 public function parseResponse($data) { 02317 // TODO: is this behaviour correct? 02318 if (is_array($data) || $data instanceof \Iterator) { 02319 return $data; 02320 } 02321 return strlen($data) > 0 ? explode(' ', $data) : array(); 02322 } 02323 } 02324 02325 class RandomKey extends Command { 02326 public function canBeHashed() { return false; } 02327 public function getCommandId() { return 'RANDOMKEY'; } 02328 public function parseResponse($data) { return $data !== '' ? 
$data : null; } 02329 } 02330 02331 class Rename extends Command { 02332 public function canBeHashed() { return false; } 02333 public function getCommandId() { return 'RENAME'; } 02334 } 02335 02336 class RenamePreserve extends Command { 02337 public function canBeHashed() { return false; } 02338 public function getCommandId() { return 'RENAMENX'; } 02339 public function parseResponse($data) { return (bool) $data; } 02340 } 02341 02342 class Expire extends Command { 02343 public function getCommandId() { return 'EXPIRE'; } 02344 public function parseResponse($data) { return (bool) $data; } 02345 } 02346 02347 class ExpireAt extends Command { 02348 public function getCommandId() { return 'EXPIREAT'; } 02349 public function parseResponse($data) { return (bool) $data; } 02350 } 02351 02352 class DatabaseSize extends Command { 02353 public function canBeHashed() { return false; } 02354 public function getCommandId() { return 'DBSIZE'; } 02355 } 02356 02357 class TimeToLive extends Command { 02358 public function getCommandId() { return 'TTL'; } 02359 } 02360 02361 /* commands operating on lists */ 02362 class ListPushTail extends Command { 02363 public function getCommandId() { return 'RPUSH'; } 02364 } 02365 02366 class ListPushTailX extends Command { 02367 public function getCommandId() { return 'RPUSHX'; } 02368 } 02369 02370 class ListPushHead extends Command { 02371 public function getCommandId() { return 'LPUSH'; } 02372 } 02373 02374 class ListPushHeadX extends Command { 02375 public function getCommandId() { return 'LPUSHX'; } 02376 } 02377 02378 class ListLength extends Command { 02379 public function getCommandId() { return 'LLEN'; } 02380 } 02381 02382 class ListRange extends Command { 02383 public function getCommandId() { return 'LRANGE'; } 02384 } 02385 02386 class ListTrim extends Command { 02387 public function getCommandId() { return 'LTRIM'; } 02388 } 02389 02390 class ListIndex extends Command { 02391 public function getCommandId() { return 'LINDEX'; 
} 02392 } 02393 02394 class ListSet extends Command { 02395 public function getCommandId() { return 'LSET'; } 02396 } 02397 02398 class ListRemove extends Command { 02399 public function getCommandId() { return 'LREM'; } 02400 } 02401 02402 class ListPopLastPushHead extends Command { 02403 public function getCommandId() { return 'RPOPLPUSH'; } 02404 } 02405 02406 class ListPopLastPushHeadBulk extends Command { 02407 public function getCommandId() { return 'RPOPLPUSH'; } 02408 } 02409 02410 class ListPopFirst extends Command { 02411 public function getCommandId() { return 'LPOP'; } 02412 } 02413 02414 class ListPopLast extends Command { 02415 public function getCommandId() { return 'RPOP'; } 02416 } 02417 02418 class ListPopFirstBlocking extends Command { 02419 public function getCommandId() { return 'BLPOP'; } 02420 } 02421 02422 class ListPopLastBlocking extends Command { 02423 public function getCommandId() { return 'BRPOP'; } 02424 } 02425 02426 class ListInsert extends Command { 02427 public function getCommandId() { return 'LINSERT'; } 02428 } 02429 02430 /* commands operating on sets */ 02431 class SetAdd extends Command { 02432 public function getCommandId() { return 'SADD'; } 02433 public function parseResponse($data) { return (bool) $data; } 02434 } 02435 02436 class SetRemove extends Command { 02437 public function getCommandId() { return 'SREM'; } 02438 public function parseResponse($data) { return (bool) $data; } 02439 } 02440 02441 class SetPop extends Command { 02442 public function getCommandId() { return 'SPOP'; } 02443 } 02444 02445 class SetMove extends Command { 02446 public function canBeHashed() { return false; } 02447 public function getCommandId() { return 'SMOVE'; } 02448 public function parseResponse($data) { return (bool) $data; } 02449 } 02450 02451 class SetCardinality extends Command { 02452 public function getCommandId() { return 'SCARD'; } 02453 } 02454 02455 class SetIsMember extends Command { 02456 public function getCommandId() { 
return 'SISMEMBER'; } 02457 public function parseResponse($data) { return (bool) $data; } 02458 } 02459 02460 class SetIntersection extends Command { 02461 public function getCommandId() { return 'SINTER'; } 02462 public function filterArguments(Array $arguments) { 02463 return Utils::filterArrayArguments($arguments); 02464 } 02465 } 02466 02467 class SetIntersectionStore extends Command { 02468 public function getCommandId() { return 'SINTERSTORE'; } 02469 public function filterArguments(Array $arguments) { 02470 return Utils::filterArrayArguments($arguments); 02471 } 02472 } 02473 02474 class SetUnion extends SetIntersection { 02475 public function getCommandId() { return 'SUNION'; } 02476 } 02477 02478 class SetUnionStore extends SetIntersectionStore { 02479 public function getCommandId() { return 'SUNIONSTORE'; } 02480 } 02481 02482 class SetDifference extends SetIntersection { 02483 public function getCommandId() { return 'SDIFF'; } 02484 } 02485 02486 class SetDifferenceStore extends SetIntersectionStore { 02487 public function getCommandId() { return 'SDIFFSTORE'; } 02488 } 02489 02490 class SetMembers extends Command { 02491 public function getCommandId() { return 'SMEMBERS'; } 02492 } 02493 02494 class SetRandomMember extends Command { 02495 public function getCommandId() { return 'SRANDMEMBER'; } 02496 } 02497 02498 /* commands operating on sorted sets */ 02499 class ZSetAdd extends Command { 02500 public function getCommandId() { return 'ZADD'; } 02501 public function parseResponse($data) { return (bool) $data; } 02502 } 02503 02504 class ZSetIncrementBy extends Command { 02505 public function getCommandId() { return 'ZINCRBY'; } 02506 } 02507 02508 class ZSetRemove extends Command { 02509 public function getCommandId() { return 'ZREM'; } 02510 public function parseResponse($data) { return (bool) $data; } 02511 } 02512 02513 class ZSetUnionStore extends Command { 02514 public function getCommandId() { return 'ZUNIONSTORE'; } 02515 public function 
filterArguments(Array $arguments) { 02516 $options = array(); 02517 $argc = count($arguments); 02518 if ($argc > 1 && is_array($arguments[$argc - 1])) { 02519 $options = $this->prepareOptions(array_pop($arguments)); 02520 } 02521 $args = is_array($arguments[0]) ? $arguments[0] : $arguments; 02522 return array_merge($args, $options); 02523 } 02524 private function prepareOptions($options) { 02525 $opts = array_change_key_case($options, CASE_UPPER); 02526 $finalizedOpts = array(); 02527 if (isset($opts['WEIGHTS']) && is_array($opts['WEIGHTS'])) { 02528 $finalizedOpts[] = 'WEIGHTS'; 02529 foreach ($opts['WEIGHTS'] as $weight) { 02530 $finalizedOpts[] = $weight; 02531 } 02532 } 02533 if (isset($opts['AGGREGATE'])) { 02534 $finalizedOpts[] = 'AGGREGATE'; 02535 $finalizedOpts[] = $opts['AGGREGATE']; 02536 } 02537 return $finalizedOpts; 02538 } 02539 } 02540 02541 class ZSetIntersectionStore extends ZSetUnionStore { 02542 public function getCommandId() { return 'ZINTERSTORE'; } 02543 } 02544 02545 class ZSetRange extends Command { 02546 private $_withScores = false; 02547 public function getCommandId() { return 'ZRANGE'; } 02548 public function filterArguments(Array $arguments) { 02549 if (count($arguments) === 4) { 02550 $lastType = gettype($arguments[3]); 02551 if ($lastType === 'string' && strtolower($arguments[3]) === 'withscores') { 02552 // used for compatibility with older versions 02553 $arguments[3] = array('WITHSCORES' => true); 02554 $lastType = 'array'; 02555 } 02556 if ($lastType === 'array') { 02557 $options = $this->prepareOptions(array_pop($arguments)); 02558 return array_merge($arguments, $options); 02559 } 02560 } 02561 return $arguments; 02562 } 02563 protected function prepareOptions($options) { 02564 $opts = array_change_key_case($options, CASE_UPPER); 02565 $finalizedOpts = array(); 02566 if (isset($opts['WITHSCORES'])) { 02567 $finalizedOpts[] = 'WITHSCORES'; 02568 $this->_withScores = true; 02569 } 02570 return $finalizedOpts; 02571 } 02572 public 
function parseResponse($data) { 02573 if ($this->_withScores) { 02574 if ($data instanceof \Iterator) { 02575 return new MultiBulkResponseKVIterator($data); 02576 } 02577 $result = array(); 02578 for ($i = 0; $i < count($data); $i++) { 02579 $result[] = array($data[$i], $data[++$i]); 02580 } 02581 return $result; 02582 } 02583 return $data; 02584 } 02585 } 02586 02587 class ZSetReverseRange extends ZSetRange { 02588 public function getCommandId() { return 'ZREVRANGE'; } 02589 } 02590 02591 class ZSetRangeByScore extends ZSetRange { 02592 public function getCommandId() { return 'ZRANGEBYSCORE'; } 02593 protected function prepareOptions($options) { 02594 $opts = array_change_key_case($options, CASE_UPPER); 02595 $finalizedOpts = array(); 02596 if (isset($opts['LIMIT']) && is_array($opts['LIMIT'])) { 02597 $limit = array_change_key_case($opts['LIMIT'], CASE_UPPER); 02598 $finalizedOpts[] = 'LIMIT'; 02599 $finalizedOpts[] = isset($limit['OFFSET']) ? $limit['OFFSET'] : $limit[0]; 02600 $finalizedOpts[] = isset($limit['COUNT']) ? 
$limit['COUNT'] : $limit[1]; 02601 } 02602 return array_merge($finalizedOpts, parent::prepareOptions($options)); 02603 } 02604 } 02605 02606 class ZSetCount extends Command { 02607 public function getCommandId() { return 'ZCOUNT'; } 02608 } 02609 02610 class ZSetCardinality extends Command { 02611 public function getCommandId() { return 'ZCARD'; } 02612 } 02613 02614 class ZSetScore extends Command { 02615 public function getCommandId() { return 'ZSCORE'; } 02616 } 02617 02618 class ZSetRemoveRangeByScore extends Command { 02619 public function getCommandId() { return 'ZREMRANGEBYSCORE'; } 02620 } 02621 02622 class ZSetRank extends Command { 02623 public function getCommandId() { return 'ZRANK'; } 02624 } 02625 02626 class ZSetReverseRank extends Command { 02627 public function getCommandId() { return 'ZREVRANK'; } 02628 } 02629 02630 class ZSetRemoveRangeByRank extends Command { 02631 public function getCommandId() { return 'ZREMRANGEBYRANK'; } 02632 } 02633 02634 /* commands operating on hashes */ 02635 class HashSet extends Command { 02636 public function getCommandId() { return 'HSET'; } 02637 public function parseResponse($data) { return (bool) $data; } 02638 } 02639 02640 class HashSetPreserve extends Command { 02641 public function getCommandId() { return 'HSETNX'; } 02642 public function parseResponse($data) { return (bool) $data; } 02643 } 02644 02645 class HashSetMultiple extends Command { 02646 public function getCommandId() { return 'HMSET'; } 02647 public function filterArguments(Array $arguments) { 02648 if (count($arguments) === 2 && is_array($arguments[1])) { 02649 $flattenedKVs = array($arguments[0]); 02650 $args = &$arguments[1]; 02651 foreach ($args as $k => $v) { 02652 $flattenedKVs[] = $k; 02653 $flattenedKVs[] = $v; 02654 } 02655 return $flattenedKVs; 02656 } 02657 return $arguments; 02658 } 02659 } 02660 02661 class HashIncrementBy extends Command { 02662 public function getCommandId() { return 'HINCRBY'; } 02663 } 02664 02665 class HashGet 
extends Command { 02666 public function getCommandId() { return 'HGET'; } 02667 } 02668 02669 class HashGetMultiple extends Command { 02670 public function getCommandId() { return 'HMGET'; } 02671 public function filterArguments(Array $arguments) { 02672 if (count($arguments) === 2 && is_array($arguments[1])) { 02673 $flattenedKVs = array($arguments[0]); 02674 $args = &$arguments[1]; 02675 foreach ($args as $v) { 02676 $flattenedKVs[] = $v; 02677 } 02678 return $flattenedKVs; 02679 } 02680 return $arguments; 02681 } 02682 } 02683 02684 class HashDelete extends Command { 02685 public function getCommandId() { return 'HDEL'; } 02686 public function parseResponse($data) { return (bool) $data; } 02687 } 02688 02689 class HashExists extends Command { 02690 public function getCommandId() { return 'HEXISTS'; } 02691 public function parseResponse($data) { return (bool) $data; } 02692 } 02693 02694 class HashLength extends Command { 02695 public function getCommandId() { return 'HLEN'; } 02696 } 02697 02698 class HashKeys extends Command { 02699 public function getCommandId() { return 'HKEYS'; } 02700 } 02701 02702 class HashValues extends Command { 02703 public function getCommandId() { return 'HVALS'; } 02704 } 02705 02706 class HashGetAll extends Command { 02707 public function getCommandId() { return 'HGETALL'; } 02708 public function parseResponse($data) { 02709 if ($data instanceof \Iterator) { 02710 return new MultiBulkResponseKVIterator($data); 02711 } 02712 $result = array(); 02713 for ($i = 0; $i < count($data); $i++) { 02714 $result[$data[$i]] = $data[++$i]; 02715 } 02716 return $result; 02717 } 02718 } 02719 02720 /* multiple databases handling commands */ 02721 class SelectDatabase extends Command { 02722 public function canBeHashed() { return false; } 02723 public function getCommandId() { return 'SELECT'; } 02724 } 02725 02726 class MoveKey extends Command { 02727 public function canBeHashed() { return false; } 02728 public function getCommandId() { return 
'MOVE'; } 02729 public function parseResponse($data) { return (bool) $data; } 02730 } 02731 02732 class FlushDatabase extends Command { 02733 public function canBeHashed() { return false; } 02734 public function getCommandId() { return 'FLUSHDB'; } 02735 } 02736 02737 class FlushAll extends Command { 02738 public function canBeHashed() { return false; } 02739 public function getCommandId() { return 'FLUSHALL'; } 02740 } 02741 02742 /* sorting */ 02743 class Sort extends Command { 02744 public function getCommandId() { return 'SORT'; } 02745 public function filterArguments(Array $arguments) { 02746 if (count($arguments) === 1) { 02747 return $arguments; 02748 } 02749 02750 $query = array($arguments[0]); 02751 $sortParams = array_change_key_case($arguments[1], CASE_UPPER); 02752 02753 if (isset($sortParams['BY'])) { 02754 $query[] = 'BY'; 02755 $query[] = $sortParams['BY']; 02756 } 02757 if (isset($sortParams['GET'])) { 02758 $getargs = $sortParams['GET']; 02759 if (is_array($getargs)) { 02760 foreach ($getargs as $getarg) { 02761 $query[] = 'GET'; 02762 $query[] = $getarg; 02763 } 02764 } 02765 else { 02766 $query[] = 'GET'; 02767 $query[] = $getargs; 02768 } 02769 } 02770 if (isset($sortParams['LIMIT']) && is_array($sortParams['LIMIT']) 02771 && count($sortParams['LIMIT']) == 2) { 02772 02773 $query[] = 'LIMIT'; 02774 $query[] = $sortParams['LIMIT'][0]; 02775 $query[] = $sortParams['LIMIT'][1]; 02776 } 02777 if (isset($sortParams['SORT'])) { 02778 $query[] = strtoupper($sortParams['SORT']); 02779 } 02780 if (isset($sortParams['ALPHA']) && $sortParams['ALPHA'] == true) { 02781 $query[] = 'ALPHA'; 02782 } 02783 if (isset($sortParams['STORE'])) { 02784 $query[] = 'STORE'; 02785 $query[] = $sortParams['STORE']; 02786 } 02787 02788 return $query; 02789 } 02790 } 02791 02792 /* transactions */ 02793 class Multi extends Command { 02794 public function canBeHashed() { return false; } 02795 public function getCommandId() { return 'MULTI'; } 02796 } 02797 02798 class Exec 
extends Command { 02799 public function canBeHashed() { return false; } 02800 public function getCommandId() { return 'EXEC'; } 02801 } 02802 02803 class Discard extends Command { 02804 public function canBeHashed() { return false; } 02805 public function getCommandId() { return 'DISCARD'; } 02806 } 02807 02808 class Watch extends Command { 02809 public function canBeHashed() { return false; } 02810 public function getCommandId() { return 'WATCH'; } 02811 public function filterArguments(Array $arguments) { 02812 if (isset($arguments[0]) && is_array($arguments[0])) { 02813 return $arguments[0]; 02814 } 02815 return $arguments; 02816 } 02817 public function parseResponse($data) { return (bool) $data; } 02818 } 02819 02820 class Unwatch extends Command { 02821 public function canBeHashed() { return false; } 02822 public function getCommandId() { return 'UNWATCH'; } 02823 public function parseResponse($data) { return (bool) $data; } 02824 } 02825 02826 /* publish/subscribe */ 02827 class Subscribe extends Command { 02828 public function canBeHashed() { return false; } 02829 public function getCommandId() { return 'SUBSCRIBE'; } 02830 } 02831 02832 class Unsubscribe extends Command { 02833 public function canBeHashed() { return false; } 02834 public function getCommandId() { return 'UNSUBSCRIBE'; } 02835 } 02836 02837 class SubscribeByPattern extends Command { 02838 public function canBeHashed() { return false; } 02839 public function getCommandId() { return 'PSUBSCRIBE'; } 02840 } 02841 02842 class UnsubscribeByPattern extends Command { 02843 public function canBeHashed() { return false; } 02844 public function getCommandId() { return 'PUNSUBSCRIBE'; } 02845 } 02846 02847 class Publish extends Command { 02848 public function canBeHashed() { return false; } 02849 public function getCommandId() { return 'PUBLISH'; } 02850 } 02851 02852 /* persistence control commands */ 02853 class Save extends Command { 02854 public function canBeHashed() { return false; } 02855 public 
function getCommandId() { return 'SAVE'; } 02856 } 02857 02858 class BackgroundSave extends Command { 02859 public function canBeHashed() { return false; } 02860 public function getCommandId() { return 'BGSAVE'; } 02861 public function parseResponse($data) { 02862 if ($data == 'Background saving started') { 02863 return true; 02864 } 02865 return $data; 02866 } 02867 } 02868 02869 class BackgroundRewriteAppendOnlyFile extends Command { 02870 public function canBeHashed() { return false; } 02871 public function getCommandId() { return 'BGREWRITEAOF'; } 02872 public function parseResponse($data) { 02873 return $data == 'Background append only file rewriting started'; 02874 } 02875 } 02876 02877 class LastSave extends Command { 02878 public function canBeHashed() { return false; } 02879 public function getCommandId() { return 'LASTSAVE'; } 02880 } 02881 02882 class Shutdown extends Command { 02883 public function canBeHashed() { return false; } 02884 public function getCommandId() { return 'SHUTDOWN'; } 02885 public function closesConnection() { return true; } 02886 } 02887 02888 /* remote server control commands */ 02889 class Info extends Command { 02890 public function canBeHashed() { return false; } 02891 public function getCommandId() { return 'INFO'; } 02892 public function parseResponse($data) { 02893 $info = array(); 02894 $infoLines = explode("\r\n", $data, -1); 02895 foreach ($infoLines as $row) { 02896 list($k, $v) = explode(':', $row); 02897 if (!preg_match('/^db\d+$/', $k)) { 02898 $info[$k] = $v; 02899 } 02900 else { 02901 $db = array(); 02902 foreach (explode(',', $v) as $dbvar) { 02903 list($dbvk, $dbvv) = explode('=', $dbvar); 02904 $db[trim($dbvk)] = $dbvv; 02905 } 02906 $info[$k] = $db; 02907 } 02908 } 02909 return $info; 02910 } 02911 } 02912 02913 class SlaveOf extends Command { 02914 public function canBeHashed() { return false; } 02915 public function getCommandId() { return 'SLAVEOF'; } 02916 public function filterArguments(Array $arguments) { 
02917 if (count($arguments) === 0 || $arguments[0] === 'NO ONE') { 02918 return array('NO', 'ONE'); 02919 } 02920 return $arguments; 02921 } 02922 } 02923 02924 class Config extends Command { 02925 public function canBeHashed() { return false; } 02926 public function getCommandId() { return 'CONFIG'; } 02927 } 02928 ?>