class CurlMulti

Send {@see RequestInterface} objects in parallel using curl_multi.

This implementation allows callers to send blocking requests that return to the caller when their own requests complete, regardless of whether requests that were previously being sent through the curl_multi object have completed. The implementation relies on managing the recursion scope in which a caller adds a request to the CurlMulti object, and on tracking the requests in the current scope until they complete. Although the CurlMulti object only tracks whether or not requests in the current scope have completed, it still sends all requests added to the object in parallel.


Expanded class hierarchy of CurlMulti

See also

RequestInterface

1 file declares its use of CurlMulti
Client.php in drupal/core/vendor/guzzle/http/Guzzle/Http/Client.php


drupal/core/vendor/guzzle/http/Guzzle/Http/Curl/CurlMulti.php, line 19


View source
class CurlMulti extends AbstractHasDispatcher implements CurlMultiInterface {
  // NOTE(review): this listing is missing docblock delimiters, closing braces
  // and a number of statements, so it does not parse as PHP in its current
  // form. The comments added here describe only what is visible.

   * @var resource cURL multi handle.
  // Assigned from curl_multi_init() in createMultiHandle().
  protected $multiHandle;

   * @var string The current state of the pool
  // Starts idle; send() sets STATE_SENDING and, when the scope equals -1,
  // STATE_COMPLETE. reset() puts it back to STATE_IDLE.
  protected $state = self::STATE_IDLE;

   * @var array Attached {@see RequestInterface} objects.
  // Two-level array keyed by scope number: add() appends to
  // $this->requests[$scope].
  protected $requests;

   * @var array Cache of all requests currently in any scope
  // Memoised result of all(); add(), remove() and reset() set it to null.
  protected $requestCache;

   * @var \SplObjectStorage {@see RequestInterface} to {@see CurlHandle} storage
  // Indexed by request object, e.g. $this->handles[$request].
  protected $handles;

   * @var array Hash mapping curl handle resource IDs to request objects
  // Keys are the (int) cast of the curl handle; see createCurlHandle() and
  // processMessages().
  protected $resourceHash;

   * @var array Queued exceptions
  // Each entry is an array with 'request' and 'exception' keys.
  protected $exceptions = array();

   * @var array Queue of handles to remove once everything completes
  // Appended to by removeHandle(); drained in reset().
  protected $removeHandles;

   * @var array cURL multi error values and codes
  // For each code, index 0 and index 1 are the two strings that
  // checkCurlResult() interpolates into its exception message.
  // NOTE(review): the closing parentheses (and possibly further entries) are
  // missing from this listing.
  protected $multiErrors = array(
    CURLM_BAD_HANDLE => array(
      'The passed-in handle is not a valid CURLM handle.',
      "An easy handle was not good/valid. It could mean that it isn't an easy handle at all, or possibly that the handle already is in used by this or another multi handle.",
    CURLM_OUT_OF_MEMORY => array(
      'You are doomed.',
      'This can only be returned if libcurl bugs. Please report it to us!',

   * @var CurlMulti
  // Shared instance handed out by getInstance().
  private static $instance;

   * @var int
  // -1 is both the initial value and the value reset() restores; add() targets
  // scope + 1 for non-async requests.
  private $scope = -1;

   * Get a cached instance of the curl multi object
   * @return CurlMulti
  public static function getInstance() {
    // Lazily create the shared instance on the first call; later calls return
    // the object already stored in self::$instance.

    // @codeCoverageIgnoreStart
    if (!self::$instance) {
      self::$instance = new self();

    // @codeCoverageIgnoreEnd
    return self::$instance;

   * {@inheritdoc}
  public static function getAllEvents() {
    // NOTE(review): the array entries themselves are missing from this
    // listing; only their descriptive comments remain. Presumably each comment
    // preceded one event-name constant (e.g. self::ADD_REQUEST) - confirm
    // against the original file.
    return array(
      // A request was added
      // A request was removed
      // Requests are about to be sent
      // The pool finished sending the requests
      // A request is still polling (sent to request's event dispatchers)
      // A request exception occurred

   * {@inheritdoc}
  public function __construct() {
    // NOTE(review): the constructor's statements are not visible in this
    // listing; only the explanatory comment below survives.

    // You can get some weird "Too many open files" errors when sending a large amount of requests in parallel. These
    // two statements autoload classes before a system runs out of file descriptors so that you can get back
    // valuable error messages if you run out.

   * {@inheritdoc}
  public function __destruct() {
    // Only acts while the multi handle is still a live resource.
    // NOTE(review): the guarded statement is missing from this listing
    // (presumably it closes the multi handle - confirm).
    if (is_resource($this->multiHandle)) {

   * {@inheritdoc}
   * Adds a request to a batch of requests to be sent in parallel.
   * Async requests adds a request to the current scope to be executed in parallel with any currently executing cURL
   * handles. You may only add an async request while other requests are transferring. Attempting to add an async
   * request while no requests are transferring will add the request normally in the next available scope (e.g. 0).
   * @param RequestInterface $request Request to add
   * @param bool             $async   Set to TRUE to add to the current scope
   * @return self
  public function add(RequestInterface $request, $async = false) {
    // An async add is only honoured while a transfer is in progress; otherwise
    // it is downgraded to a normal add.
    if ($async && $this->state != self::STATE_SENDING) {
      $async = false;
    // The flattened list cached by all() is about to become stale.
    $this->requestCache = null;
    // Async requests join the current scope; all others go to the scope after it.
    $scope = $async ? $this->scope : $this->scope + 1;
    if (!isset($this->requests[$scope])) {
      $this->requests[$scope] = array(
    else {
      $this->requests[$scope][] = $request;
      // Announce the addition, passing the request under the 'request' key.
      // NOTE(review): the receiver of this call is missing from the listing.
      ->dispatch(self::ADD_REQUEST, array(
      'request' => $request,

    // If requests are currently transferring and this is async, then the
    // request must be prepared now as the send() method is not called.
    // NOTE(review): the statement guarded by this condition is missing from
    // the listing.
    if ($async && $this->state == self::STATE_SENDING) {
    // Returns the object itself so calls can be chained.
    return $this;

   * {@inheritdoc}
  public function all() {
    // Rebuild the flattened list whenever the cache is falsy. An empty array is
    // falsy too, so an empty pool is re-evaluated on every call.
    if (!$this->requestCache) {
      // array_merge over the per-scope arrays yields one flat list; the
      // empty() guard avoids calling array_merge with no arguments.
      $this->requestCache = empty($this->requests) ? array() : call_user_func_array('array_merge', $this->requests);
    return $this->requestCache;

   * {@inheritdoc}
  public function getState() {
    // Plain accessor: returns the $state property unchanged.
    return $this->state;

   * {@inheritdoc}
  public function remove(RequestInterface $request) {
    // Invalidate the flattened list cached by all().
    $this->requestCache = null;
    // Look for the request in every scope; the strict flag makes array_search
    // match on object identity.
    foreach ($this->requests as $scope => $scopedRequests) {
      $pos = array_search($request, $scopedRequests, true);
      if ($pos !== false) {
      // NOTE(review): the statements that act on a match are missing from this
      // listing, as is the receiver of the dispatch call below.
      ->dispatch(self::REMOVE_REQUEST, array(
      'request' => $request,
    // Returns the object itself so calls can be chained.
    return $this;

   * {@inheritdoc}
  public function reset($hard = false) {

    // Remove each request
    // NOTE(review): the loop body is missing from this listing.
    foreach ($this
      ->all() as $request) {
    // Return the bookkeeping properties to their initial values.
    $this->requests = array();
    $this->exceptions = array();
    $this->state = self::STATE_IDLE;
    $this->scope = -1;
    $this->requestCache = null;

    // Remove any curl handles that were queued for removal
    // NOTE(review): scope was assigned -1 a few lines above, so as written this
    // condition is true regardless of $hard.
    if ($this->scope == -1 || $hard) {
      foreach ($this->removeHandles as $handle) {
        curl_multi_remove_handle($this->multiHandle, $handle
      $this->removeHandles = array();
    // NOTE(review): the statement guarded by $hard is missing from this listing.
    if ($hard) {

   * {@inheritdoc}
  public function send() {
    // NOTE(review): statements are missing in several places below (the try
    // block is empty and the $multiException assignment has no call), so only
    // the visible steps are described.
    $this->state = self::STATE_SENDING;
    // Snapshot of the current scope's requests, or an empty array when the
    // scope has none.
    $requestsInScope = empty($this->requests[$this->scope]) ? array() : $this->requests[$this->scope];

    // Only prepare and send requests that are in the current recursion scope
    // Only enter the main perform() loop if there are requests in scope
    if ($requestsInScope) {

      // Any exceptions thrown from this event should break the entire flow of sending requests
        ->dispatch(self::BEFORE_SEND, array(
        'requests' => $this->requests[$this->scope],
      foreach ($this->requests[$this->scope] as $request) {
        if ($request
          ->getState() != RequestInterface::STATE_TRANSFER) {
      try {
      } catch (\Exception $e) {
        // An exception caught here is not tied to one request, so it is queued
        // with a null 'request'.
        $this->exceptions[] = array(
          'request' => null,
          'exception' => $e,

    // Aggregate exceptions into a MultiTransferException if needed
    $multiException = $this

    // Complete the transfer if this is not a nested scope
    if ($this->scope == -1) {
      $this->state = self::STATE_COMPLETE;

    // Throw any exceptions that were encountered
    if ($multiException) {
      throw $multiException;

   * {@inheritdoc}
  public function count() {
    // NOTE(review): the argument expression is truncated in this listing, so
    // what exactly is being counted cannot be confirmed from here.
    return count($this

   * Build a MultiTransferException if needed
   * @param array $requestsInScope All requests in the previous scope
   * @return MultiTransferException|null
  protected function buildMultiTransferException(array $requestsInScope) {
    // Nothing queued means there is nothing to report.
    if (empty($this->exceptions)) {
      return null;

    // Keep a list of all requests, and remove errored requests from the list
    // NOTE(review): the bodies of the loops and of the isset() branch below are
    // missing from this listing.
    $store = new \SplObjectStorage();
    foreach ($requestsInScope as $request) {
    $multiException = new MultiTransferException('Errors during multi transfer');
    // array_shift removes each entry as it is read, so the exception queue is
    // emptied by this loop.
    while ($e = array_shift($this->exceptions)) {
      // Entries queued with a null 'request' skip this branch.
      if (isset($e['request'])) {

        // Remove from the total list so that it becomes a list of successful requests

    // Add successful requests
    foreach ($store as $request) {
    return $multiException;

   * Prepare for sending
   * @param RequestInterface $request Request to prepare
  protected function beforeSend(RequestInterface $request) {
    // Any exception raised while preparing the request is routed to
    // removeErroredRequest() rather than propagated.
    // NOTE(review): call receivers and the bodies of all three branches are
    // missing from this listing.
    try {
        ->dispatch('request.before_send', array(
        'request' => $request,
      // Three-way decision: the request is no longer in STATE_TRANSFER, it
      // carries a 'queued_response', or it is handed to the multi handle.
      if ($request
        ->getState() != RequestInterface::STATE_TRANSFER) {

        // Requests might decide they don't need to be sent just before transfer (e.g. CachePlugin)
      elseif ($request
        ->get('queued_response')) {

        // Queued responses do not need to be sent using curl
      else {

        // Add the request's curl handle to the multi handle
        // The curl_multi_add_handle() return code is passed through checkCurlResult().
          ->checkCurlResult(curl_multi_add_handle($this->multiHandle, $this
    } catch (\Exception $e) {
        ->removeErroredRequest($request, $e);

   * Create a curl handle for a request
   * @param RequestInterface $request Request
   * @return CurlHandle
  protected function createCurlHandle(RequestInterface $request) {
    // Build the CurlHandle wrapper for this request.
    $wrapper = CurlHandle::factory($request);
    // NOTE(review): the receiver of attach() is missing from this listing
    // (presumably $this->handles - confirm).
      ->attach($request, $wrapper);
    // Index the request by the integer cast of the underlying curl handle so
    // processMessages() can find it again.
    $this->resourceHash[(int) $wrapper
      ->getHandle()] = $request;
    return $wrapper;

   * Get the data from the multi handle
  protected function perform() {
    // NOTE(review): many statements are missing from this listing (the bodies
    // of the guard clauses, the initial $active assignment, the loop exit, the
    // blocking/total counters and the sleep), so only the visible structure is
    // described.

    // @codeCoverageIgnoreStart
    // Weird things can happen when making HTTP requests in __destruct methods
    if (!$this->multiHandle) {

    // @codeCoverageIgnoreEnd
    // If there are no requests to send, then exit from the function
    // In scope 0 or below the check uses count(); in deeper scopes it looks
    // only at that scope's requests.
    if ($this->scope <= 0) {
      if ($this
        ->count() == 0) {
    elseif (empty($this->requests[$this->scope])) {

    // Create the polling event external to the loop
    $event = array(
      'curl_multi' => $this,
    $active = $this
    while (1) {

      // Exit the function if there are no more requests to send
      // Same scope rule as above: all() for scope <= 0, otherwise the current
      // scope's requests.
      if (!($scopedPolling = $this->scope <= 0 ? $this
        ->all() : $this->requests[$this->scope])) {

      // Notify each request as polling
      $blocking = $total = 0;
      foreach ($scopedPolling as $request) {
        // The same $event array is reused; only its 'request' entry changes.
        $event['request'] = $request;
          ->dispatch(self::POLLING_REQUEST, $event);

        // The blocking variable just has to be non-falsey to block the loop
        if ($request
          ->hasKey(self::BLOCKING)) {
      if ($blocking == $total) {

        // Sleep to prevent eating CPU because no requests are actually pending a select call
      else {

        // Select the curl handles until there is any activity on any of the open file descriptors
        // See
        // NOTE(review): the reference that followed "See" is missing from this listing.
        // Runs with select enabled and a 0.02 second timeout, carrying the
        // previous $active count forward.
        $active = $this
          ->executeHandles(true, 0.02, $active);

   * Process any received curl multi messages
  private function processMessages() {

    // Get messages from curl handles
    // Loops until curl_multi_info_read() returns a falsy value.
    while ($done = curl_multi_info_read($this->multiHandle)) {
      try {
        // Map the raw curl handle back to its request, then to its CurlHandle.
        $request = $this->resourceHash[(int) $done['handle']];
        $handle = $this->handles[$request];
          ->processResponse($request, $handle, $done);
      } catch (\Exception $e) {
        // NOTE(review): if the resourceHash lookup itself throws, $request is
        // not assigned in this iteration - confirm this cannot happen.
          ->removeErroredRequest($request, $e);

   * Execute and select curl handles until there is activity
   * @param bool $select  Set to TRUE to select the file descriptors
   * @param int  $timeout Select timeout in seconds
   * @param int  $active  Previous active value
   * @return int Returns the number of active handles
  private function executeHandles($select = false, $timeout = 1, $active = 0) {
    // NOTE(review): perform() passes 0.02 as $timeout, so the value is not
    // always an integer.
    do {

      // @codeCoverageIgnoreStart
      // curl_multi_select() is only called when selecting was requested and the
      // previous pass reported active handles.
      if ($select && $active && curl_multi_select($this->multiHandle, $timeout) == -1) {

        // Perform a usleep if a previously executed select returned -1
        // @see
        // NOTE(review): the usleep statement and the @see target are missing
        // from this listing.

      // @codeCoverageIgnoreEnd
      // Keep calling curl_multi_exec() for as long as it returns
      // CURLM_CALL_MULTI_PERFORM; $active is updated by reference.
      do {
        $mrc = curl_multi_exec($this->multiHandle, $active);
      } while ($mrc == CURLM_CALL_MULTI_PERFORM);

      // Check the return value to ensure an error did not occur
      // NOTE(review): the check on $mrc is missing from this listing.

      // Poll once if not selecting, or poll until there are no handles with activity
    } while ($select && $active);
    return $active;

   * Remove a request that encountered an exception
   * @param RequestInterface $request Request to remove
   * @param \Exception       $e       Exception encountered
  protected function removeErroredRequest(RequestInterface $request, \Exception $e) {
    // Queue the failure; the entry has the 'request' and 'exception' keys that
    // buildMultiTransferException() reads.
    $this->exceptions[] = array(
      'request' => $request,
      'exception' => $e,
      // Emit MULTI_EXCEPTION with this exception and the full queue so far.
      // NOTE(review): the receiver of this call, and any statement that
      // actually removes the request, are missing from this listing.
      ->dispatch(self::MULTI_EXCEPTION, array(
      'exception' => $e,
      'all_exceptions' => $this->exceptions,

   * Check for errors and fix headers of a request based on a curl response
   * @param RequestInterface $request Request to process
   * @param CurlHandle       $handle  Curl handle object
   * @param array            $curl    Array returned from curl_multi_info_read
   * @throws CurlException on Curl error
  protected function processResponse(RequestInterface $request, CurlHandle $handle, array $curl) {
    // NOTE(review): several statements and call receivers are missing from this
    // listing; only the visible control flow is described.

    // Set the transfer stats on the response

    // Check if a cURL exception occurred, and if so, notify things
    // isCurlException() returns false on success or an exception object.
    $curlException = $this
      ->isCurlException($request, $handle, $curl);

    // Always remove completed curl handles.  They can be added back again
    // via events if needed (e.g. ExponentialBackoffPlugin)
    if (!$curlException) {
        ->setState(RequestInterface::STATE_COMPLETE, array(
        'handle' => $handle,

      // Only remove the request if it wasn't resent as a result of the state change
      if ($request
        ->getState() != RequestInterface::STATE_TRANSFER) {
    else {

      // Set the state of the request to an error

      // Notify things that listen to the request of the failure
      // NOTE(review): the 'request' key below is given $this (this CurlMulti
      // object), whereas the other dispatch calls in this class pass the
      // request object under that key - likely should be $request; confirm.
        ->dispatch('request.exception', array(
        'request' => $this,
        'exception' => $curlException,

      // Allow things to ignore the error if possible
      $state = $request
      if ($state != RequestInterface::STATE_TRANSFER) {

      // The error was not handled, so fail
      // Only a request still in STATE_ERROR causes the exception to propagate.
      if ($state == RequestInterface::STATE_ERROR) {

        /** @var $curlException \Exception */
        throw $curlException;

   * Remove a curl handle from the curl multi object
   * Nasty things (bus errors, segmentation faults) can sometimes occur when removing curl handles when in a callback
   * or a recursive scope.  Here we are queueing all curl handles that need to be removed and closed so that this
   * happens only in the outermost scope when everything has completed sending.
   * @param RequestInterface $request Request that owns the handle
  protected function removeHandle(RequestInterface $request) {
    // Does nothing for requests that have no stored handle.
    if ($this->handles
      ->contains($request)) {
      $handle = $this->handles[$request];
      // Drop the handle-id => request mapping.
      // NOTE(review): the unset() expression is truncated in this listing.
      unset($this->resourceHash[(int) $handle
      // Defer the actual curl removal: the handle is only queued here, and
      // reset() is what walks this queue.
      $this->removeHandles[] = $handle;

   * Check if a cURL transfer resulted in what should be an exception
   * @param RequestInterface $request Request to check
   * @param CurlHandle       $handle  Curl handle object
   * @param array            $curl    Array returned from curl_multi_info_read
   * @return \Exception|bool
  private function isCurlException(RequestInterface $request, CurlHandle $handle, array $curl) {
    // A result equal to CURLM_OK or CURLM_CALL_MULTI_PERFORM means "no error".
    // NOTE(review): the PHP manual describes the 'result' entry of
    // curl_multi_info_read() as a CURLE_* code, while this compares against
    // CURLM_* constants - confirm the comparison is intended.
    if (CURLM_OK == $curl['result'] || CURLM_CALL_MULTI_PERFORM == $curl['result']) {
      return false;
    // NOTE(review): the argument list below is truncated, and getError()
    // appears twice where the format string's third placeholder is labelled
    // [url] - verify against the original file.
    $e = new CurlException(sprintf('[curl] %s: %s [url] %s', $handle
      ->getErrorNo(), $handle
      ->getError(), $handle
      ->getError(), $handle
    return $e;

   * Throw an exception for a cURL multi response if needed
   * @param int $code Curl response code
   * @throws CurlException
  private function checkCurlResult($code) {
    // CURLM_OK and CURLM_CALL_MULTI_PERFORM are the only non-error codes.
    if ($code != CURLM_OK && $code != CURLM_CALL_MULTI_PERFORM) {
      // Codes listed in $this->multiErrors get their two-part description;
      // any other code gets a generic message containing the raw code.
      throw new CurlException(isset($this->multiErrors[$code]) ? "cURL error: {$code} ({$this->multiErrors[$code][0]}): cURL message: {$this->multiErrors[$code][1]}" : 'Unexpected cURL error: ' . $code);

   * Create the new cURL multi handle with error checking
  private function createMultiHandle() {
    // NOTE(review): the statement guarded by this check is missing from the
    // listing (presumably it closes the existing handle - confirm). Also, on
    // PHP 8+ curl_multi_init() returns an object rather than a resource, so
    // is_resource() would be false there - confirm the target PHP version.
    if ($this->multiHandle && is_resource($this->multiHandle)) {
    // Start over with a new multi handle and empty bookkeeping structures.
    $this->requests = array();
    $this->multiHandle = curl_multi_init();
    $this->handles = new \SplObjectStorage();
    $this->resourceHash = array();
    $this->removeHandles = array();

    // @codeCoverageIgnoreStart
    // curl_multi_init() returning false is reported as a CurlException.
    if ($this->multiHandle === false) {
      throw new CurlException('Unable to create multi handle');

    // @codeCoverageIgnoreEnd



Namesort descending Modifiers Type Description Overrides
AbstractHasDispatcher::$eventDispatcher protected property
AbstractHasDispatcher::addSubscriber public function Add an event subscriber to the dispatcher Overrides HasDispatcherInterface::addSubscriber
AbstractHasDispatcher::dispatch public function Helper to dispatch Guzzle events and set the event name on the event Overrides HasDispatcherInterface::dispatch
AbstractHasDispatcher::getEventDispatcher public function Get the EventDispatcher of the request Overrides HasDispatcherInterface::getEventDispatcher
AbstractHasDispatcher::setEventDispatcher public function Set the EventDispatcher of the request Overrides HasDispatcherInterface::setEventDispatcher
CurlMulti::$exceptions protected property
CurlMulti::$handles protected property {@see RequestInterface} to {@see CurlHandle} storage
CurlMulti::$instance private static property
CurlMulti::$multiErrors protected property
CurlMulti::$multiHandle protected property
CurlMulti::$removeHandles protected property
CurlMulti::$requestCache protected property
CurlMulti::$requests protected property Attached {@see RequestInterface} objects.
CurlMulti::$resourceHash protected property
CurlMulti::$scope private property
CurlMulti::$state protected property
CurlMulti::add public function Adds a request to a batch of requests to be sent in parallel. Overrides CurlMultiInterface::add
CurlMulti::all public function Get an array of attached {@see RequestInterface} objects Overrides CurlMultiInterface::all
CurlMulti::beforeSend protected function Prepare for sending
CurlMulti::buildMultiTransferException protected function Build a MultiTransferException if needed
CurlMulti::checkCurlResult private function Throw an exception for a cURL multi response if needed
CurlMulti::count public function
CurlMulti::createCurlHandle protected function Create a curl handle for a request
CurlMulti::createMultiHandle private function Create the new cURL multi handle with error checking
CurlMulti::executeHandles private function Execute and select curl handles until there is activity
CurlMulti::getAllEvents public static function Get a list of all of the events emitted from the class Overrides AbstractHasDispatcher::getAllEvents
CurlMulti::getInstance public static function Get a cached instance of the curl multi object
CurlMulti::getState public function Get the current state of the Pool Overrides CurlMultiInterface::getState
CurlMulti::isCurlException private function Check if a cURL transfer resulted in what should be an exception
CurlMulti::perform protected function Get the data from the multi handle
CurlMulti::processMessages private function Process any received curl multi messages
CurlMulti::processResponse protected function Check for errors and fix headers of a request based on a curl response
CurlMulti::remove public function Remove a request from the pool. Overrides CurlMultiInterface::remove
CurlMulti::removeErroredRequest protected function Remove a request that encountered an exception
CurlMulti::removeHandle protected function Remove a curl handle from the curl multi object
CurlMulti::reset public function Reset the state and remove any attached RequestInterface objects Overrides CurlMultiInterface::reset
CurlMulti::send public function Send a pool of {@see RequestInterface} objects Overrides CurlMultiInterface::send
CurlMulti::__construct public function
CurlMulti::__destruct public function
CurlMultiInterface::ADD_REQUEST constant
CurlMultiInterface::BEFORE_SEND constant
CurlMultiInterface::BLOCKING constant
CurlMultiInterface::COMPLETE constant
CurlMultiInterface::MULTI_EXCEPTION constant
CurlMultiInterface::POLLING_REQUEST constant
CurlMultiInterface::REMOVE_REQUEST constant
CurlMultiInterface::STATE_COMPLETE constant
CurlMultiInterface::STATE_IDLE constant
CurlMultiInterface::STATE_SENDING constant