View file application/modules/Core/Model/DbTable/Tasks.php

File size: 32.52Kb
<?php
/**
 * SocialEngine
 *
 * @category   Application_Core
 * @package    Core
 * @copyright  Copyright 2006-2020 Webligo Developments
 * @license    http://www.socialengine.com/license/
 * @version    $Id: Tasks.php 9747 2012-07-26 02:08:08Z john $
 * @author     John
 */

/**
 * @category   Application_Core
 * @package    Core
 * @copyright  Copyright 2006-2020 Webligo Developments
 * @license    http://www.socialengine.com/license/
 */
class Core_Model_DbTable_Tasks extends Engine_Db_Table
{
  // Properties
  
  protected $_mode;
  
  protected $_pid;

  protected $_config;

  protected $_external;

  protected $_log;

  protected $_tasks;

  protected $_isExecuting = false;

  protected $_isShutdownRegistered = false;

  protected $_executingTask;

  protected $_runCount = 0;



  // Methods

  // General
  
  public function init()
  {
    // Get configuration
    $this->_config = (array) Engine_Api::_()->getApi('settings', 'core')->getSetting('core.tasks');
    $this->_config = array_merge(array(
      'count'     => 1,       // Max number of tasks run per request
      'countidle' => false,   // Count idle tasks towards the tasks per request
      'interval'  => 15,      // Minimum interval between triggers
      'jobs'      => 3,       // Max number of jobs run per request
      'key'       => '',      // Access key
      'last'      => '',      // Last time trigger was run (or execute in cron mode)
      'mode'      => 'curl',  // Method to trigger execution
      'pid'       => '',      // Random id for trigger and execution mutex
      'processes' => 2,       // Number of allowed concurrent processes
      // @todo add usec support?
      'sleeppre'  => 0,       // (For debug) Seconds to sleep between acquiring task lock and executing task
      'sleepint'  => 0,       // (For debug) Seconds to sleep inside tasks between sub-tasks
      'sleeppost' => 0,       // (For debug) Seconds to sleep after executing task and clearing lock
      'time'      => 120,     // Max time allowed per request (auto adjusts if ini_get('max_execution_time') is available)
      'timeout'   => 900,     // Time before a process is considered rogue
    ), array_filter($this->_config));

    // Make pid
    $this->_pid = $this->_generatePid();
    
    // Get mode
    $this->_mode = ( !empty($this->_config['mode']) ? $this->_config['mode'] : 'curl' );

    // Generate key if missing
    if( empty($this->_config['key']) ) {
      $this->_config['key'] = $this->_generatePid(true);
      Engine_Api::_()->getApi('settings', 'core')->setSetting('core.tasks.key', $this->_config['key']);
    }

    // Adjust time limit if possible
    if( empty($this->_config['time']) || $this->_config['time'] <= 0 ) {
      $this->_config['time'] = 120;
    }
    if( function_exists('ini_get') &&
        ($max_execution_time = ini_get('max_execution_time')) &&
        'cli' !== PHP_SAPI ) {
      if( -1 == $max_execution_time ) {
        // What should we do here
        //$this->_config['time'] = 600;
      } else if( $max_execution_time < $this->_config['time'] ) {
        $this->_config['time'] = floor(0.8 * $max_execution_time);
      }
    }

    // Get default external
    $this->_external = array(
      'key' => null,    // The external access key passed to this request
      'pid' => null,    // The external process id passed to this request
    );
  }

  

  // Config
  
  public function getParam($key, $default = null)
  {
    if( isset($this->_config[$key]) ) {
      return $this->_config[$key];
    } else {
      return $default;
    }
  }
  
  public function getTriggerType()
  {
    switch( $this->_mode ) {
      case 'none':
      case 'cron':
        return false;
        break;
      case 'fork':
      case 'asset':
        return 'pre';
        break;
      case 'curl':
      case 'socket':
      case 'exec':
      default:
        return 'post';
        break;
    }
  }


  
  // Log
  
  /**
   * Get our logger
   * 
   * @return Zend_Log
   */
  public function getLog()
  {
    if( null === $this->_log ) {
      $logAdapter = Engine_Api::_()->getDbtable('settings', 'core')
        ->getSetting('core.log.adapter', 'file');
      $log = new Zend_Log();
      $log->setEventItem('domain', 'tasks');
      try {
        switch( $logAdapter ) {
          case 'file': default:
            $log->addWriter(new Zend_Log_Writer_Stream(APPLICATION_PATH . '/temporary/log/tasks.log'));
            break;
          case 'database':
            $log->addWriter(new Zend_Log_Writer_Db($this->getAdapter(), 'engine4_core_log'));
            break;
          case 'none':
            $log->addWriter(new Zend_Log_Writer_Null());
            break;
        }
      } catch( Exception $e ) {
        $log->addWriter(new Zend_Log_Writer_Null());
      }
      $this->_log = $log;
    }
    return $this->_log;
  }

  public function setLog(Zend_Log $log)
  {
    $this->_log = $log;
    return $this;
  }



  // Tasks

  public function getTasks()
  {
    if( null === $this->_tasks ) {
      $select = $this->select()
        ->where('module IN(?)', (array) Engine_Api::_()->getDbtable('modules', 'core')->getEnabledModuleNames())
        ;

      // Order by last?
      
      if( engine_in_array('priority', $this->info('cols')) ) {
        $select->order('priority DESC');
      }

      $this->_tasks = $this->fetchAll($select);
    }
    return $this->_tasks;
  }
  
  public function getTaskPlugin($task)
  {
    // Must be a row of tasks or jobs
    if( !is_object($task) ) {
      throw new Core_Model_Exception(sprintf('Must be given a row of ' .
        'Core_Model_DbTable_Tasks, given ' .
        '%s', gettype($task)));
    } else if( !($task->getTable() instanceof Core_Model_DbTable_Tasks) ) {
      throw new Core_Model_Exception(sprintf('Must be given a row of ' .
          'Core_Model_DbTable_Tasks, given ' .
          '%s', get_class($task)));
    }
    
    // Get plugin class
    $class = $task->plugin;
    
    // Load class
    Engine_Loader::loadClass($class);

    // Make sure is a subclass of Core_Plugin_Task_Abstract
    if( !is_subclass_of($class, 'Core_Plugin_Task_Abstract') ) {
      throw new Core_Model_Exception(sprintf('Task plugin %1$s should extend Core_Plugin_Task_Abstract', $class));
    }

    // Check for execute method?
    if( !method_exists($class, 'execute') ) {
      throw new Core_Model_Exception(sprintf('Task plugin %1$s does not have an execute method', $class));
    }

    // Get plugin object
    $plugin = new $class($task);

    // Set the log
    $plugin->setLog($this->getLog());

    return $plugin;
  }


  
  // Triggering

  public function trigger()
  {
    // Benchmark trigger
    if( 'development' == APPLICATION_ENV ) {
      $start = microtime(true);
    }

    // Do not trigger in cron mode
    if( 'cron' === $this->_mode ) {
      return $this;
    }

    // Check if we should trigger
    if( !$this->_triggerCheck() ) {
      return $this;
    }

    // Get method
    $method = '_triggerMethod' . ucfirst($this->_mode);

    // Unknown mode
    if( !method_exists($this, $method) ) {
      throw new Core_Model_Exception('Unsupported mode: ' . $this->_mode);
    }

    // Set ignore user abort
    $prev = ignore_user_abort(true);

    // Trigger
    $this->$method();

    // Reset ignore user abort
    ignore_user_abort($prev);

    // Benchmark trigger
    if( 'development' == APPLICATION_ENV ) {
      $end = microtime(true);
      $delta = $end - $start;
      $this->getLog()->log(sprintf('Trigger Benchmark [%d] : %f seconds', $this->_pid, $delta), Zend_Log::DEBUG);
    }

    return $this;
  }

  protected function _triggerCheck()
  {
    // Log
    if( APPLICATION_ENV == 'development' ) {
      $this->getLog()->log(sprintf('Trigger Check [%d] ', $this->_pid), Zend_Log::DEBUG);
    }
    
    // Get settings table
    $table = Engine_Api::_()->getDbtable('settings', 'core');

    // Get last
    $last = $table->fetchRow(array('name = ?' => 'core.tasks.last'));
    if( null === $last ) {
      try {
        $table->insert(array(
          'name' => 'core.tasks.last',
          'value' => '',
        ));
      } catch( Exception $e ) {}
    } else {
      $last = $last->value;
    }

    // Get pid
    $pid = $table->fetchRow(array('name = ?' => 'core.tasks.pid'));
    if( null === $pid ) {
      try {
        $table->insert(array(
          'name' => 'core.tasks.pid',
          'value' => '',
        ));
      } catch( Exception $e ) {}
    } else {
      $pid = $pid->value;
    }
    
    // If we are still triggering, make sure delta is larger than the ther interval or timeout
      if(  (int)$pid && time() <  (int)$last + max( (int)$this->_config['interval'],  (int)$this->_config['timeout']) ) {
      // Log
      if( APPLICATION_ENV == 'development' ) {
        $this->getLog()->log(sprintf('Trigger Check Failed - Still Triggering + Timeout [%d] - %d < %d',
            $this->_pid, time(), $last + max($this->_config['interval'], $this->_config['timeout'])), Zend_Log::DEBUG);
      }
      return false;
    }
    // Otherwise, if empty, make sure delta is larger than the min of interval and timeout
    else if( !(int)$pid && time() < (int)$last + min((int)$this->_config['interval'], (int)$this->_config['timeout']) ) {
      // Log
      if( APPLICATION_ENV == 'development' ) {
        $this->getLog()->log(sprintf('Trigger Check Failed - Timeout [%d] - %d < %d',
            $this->_pid, time(), $last + min($this->_config['interval'], $this->_config['timeout'])), Zend_Log::DEBUG);
      }
      return false;
    }

    // Acquire lock (pid)
    $affected = $table->update(array(
      'value' => $this->_pid,
    ), array(
      'name = ?' => 'core.tasks.pid',
      'value = ?' => (string) $pid,
    ));

    if( 1 !== $affected ) {
      // Log
      if( APPLICATION_ENV == 'development' ) {
        $this->getLog()->log(sprintf('Trigger Lock Failed [%d] ', $this->_pid), Zend_Log::DEBUG);
      }
      return false;
    }

    // Update last
    $affected = $table->update(array(
      'value' => time(),
    ), array(
      'name = ?' => 'core.tasks.last',
    ));

    // Clear pid lock
    $affected = $table->update(array(
      'value' => '',
    ), array(
      'name = ?' => 'core.tasks.pid',
      'value = ?' => (string) $this->_pid,
    ));

    if( 1 !== $affected ) {
      // Log
      if( APPLICATION_ENV == 'development' ) {
        $this->getLog()->log(sprintf('Trigger Lock Clear Failed [%d] ', $this->_pid), Zend_Log::DEBUG);
      }
    }

    // Check the processes to see if we are above the limit, and whether or not
    // they have timed out
    $processes = $this->getAdapter()->select()
      ->from('engine4_core_processes')
      ->query()
      ->fetchAll()
      ;

    $activeProcesess = 0;
    foreach( $processes as $process ) {
      $started = ( !empty($process['started']) ? $process['started'] : 0 );
      $timeout = ( !empty($process['timeout']) ? $process['timeout'] : $this->_config['timeout'] );
      
      // It's timed out
      if( time() > $started + $timeout ) {
        // Delete
        $this->getAdapter()->delete('engine4_core_processes', array(
          'pid' => $process['pid'],
        ));
        // Log
        if( APPLICATION_ENV == 'development' ) {
          $this->getLog()->log(sprintf('Process Timeout [%d] : %d ', $this->_pid, $process['pid']), Zend_Log::DEBUG);
        }
        continue;
      }

      $activeProcesess++;
    }

    // Process limit reached
    if( $activeProcesess >= $this->_config['processes'] ) {
      // Log
      if( APPLICATION_ENV == 'development' ) {
        $this->getLog()->log(sprintf('Process Limit Reached [%d] : %d ', $this->_pid, $activeProcesess), Zend_Log::DEBUG);
      }
      return false;
    }
    
    // Log
    if( APPLICATION_ENV == 'development' ) {
      $this->getLog()->log(sprintf('Trigger Pass [%d] ', $this->_pid), Zend_Log::DEBUG);
    }
    
    // Okay, let's go!
    return true;
  }

  protected function _triggerMethodCurl()
  {
    global $generalConfig;
    $code = null;
    if( !empty($generalConfig['maintenance']['code']) ) {
      $code = $generalConfig['maintenance']['code'];
    }

    // Setup
    $scheme = ( !empty($_SERVER['HTTPS']) && $_SERVER['HTTPS'] == 'on' ? 'https://' : 'http://' );
    $host = $_SERVER['HTTP_HOST'];
    if( false !== stripos($_SERVER['SERVER_SOFTWARE'], 'IIS') ) {
      if( !empty($_SERVER['LOCAL_ADDR']) ) {
        $addr = $_SERVER['LOCAL_ADDR'];
      } else if( !empty($_SERVER['SERVER_ADDR']) ) {
        $addr = $_SERVER['SERVER_ADDR'];
      } else if( !empty($_SERVER['HTTP_HOST']) ) {
        $addr = $_SERVER['HTTP_HOST'];
      } else {
        $addr = '127.0.0.1';
      }
    } else {
      $addr = '127.0.0.1';
    }
    $port = ( !empty($_SERVER['SERVER_PORT']) ? (integer) $_SERVER['SERVER_PORT'] : 80 );
    $path = Zend_Controller_Front::getInstance()->getRouter()->assemble(array('controller' => 'utility', 'action' => 'tasks'), 'default', true)
      . '?notrigger=1'
      . '&key=' . $this->_config['key']
      . '&pid=' . $this->_pid
      ;
    $url = $scheme . $host . $path;

    // Set options
    $curl_handle = curl_init();

    curl_setopt($curl_handle, CURLOPT_URL, $url);
    curl_setopt($curl_handle, CURLOPT_PORT, $port);
    curl_setopt($curl_handle, CURLOPT_RETURNTRANSFER, true);
    curl_setopt($curl_handle, CURLOPT_HTTPHEADER, array('Host: ' . $_SERVER['HTTP_HOST']));

    // Try to handle basic htauth
    if( !empty($_SERVER['PHP_AUTH_USER']) && !empty($_SERVER['PHP_AUTH_PW']) ) {
      curl_setopt($curl_handle, CURLOPT_HTTPAUTH, CURLAUTH_BASIC);
      curl_setopt($curl_handle, CURLOPT_USERPWD, $_SERVER['PHP_AUTH_USER'] . ':' . $_SERVER['PHP_AUTH_PW']);
    }

    // Try to handle maintenance mode
    if( $code ) {
      curl_setopt($curl_handle, CURLOPT_COOKIE, 'en4_maint_code=' . $code);
    }
    // If SSL is enabled
    if ($scheme == 'https://') {
      curl_setopt($curl_handle, CURLOPT_VERBOSE, true);
      curl_setopt($curl_handle, CURLOPT_SSL_VERIFYPEER, false);
    }

    curl_exec($curl_handle);
    curl_close($curl_handle);
  }

  protected function _triggerMethodSocket()
  {
    global $generalConfig;
    $code = null;
    if( !empty($generalConfig['maintenance']['code']) ) {
      $code = $generalConfig['maintenance']['code'];
    }

    // Setup
    $scheme = ( !empty($_SERVER['HTTPS']) && $_SERVER['HTTPS'] == 'on' ? 'https://' : 'http://' );
    $host = $_SERVER['HTTP_HOST'];
    if( false !== stripos($_SERVER['SERVER_SOFTWARE'], 'IIS') ) {
      if( !empty($_SERVER['LOCAL_ADDR']) ) {
        $addr = $_SERVER['LOCAL_ADDR'];
      } else if( !empty($_SERVER['SERVER_ADDR']) ) {
        $addr = $_SERVER['SERVER_ADDR'];
      } else if( !empty($_SERVER['HTTP_HOST']) ) {
        $addr = $_SERVER['HTTP_HOST'];
      } else {
        $addr = '127.0.0.1';
      }
    } else {
      $addr = '127.0.0.1';
    }
    $port = ( !empty($_SERVER['SERVER_PORT']) ? (integer) $_SERVER['SERVER_PORT'] : 80 );
    $path = Zend_Controller_Front::getInstance()->getRouter()->assemble(array('controller' => 'utility', 'action' => 'tasks'), 'default', true)
      . '?notrigger=1'
      . '&key=' . $this->_config['key']
      . '&pid=' . $this->_pid
      ;
    // If SSL is enabled, let's turn it off and hope it works
    if( $scheme == 'https://' && $port == 443 ) {
      $scheme = 'http://';
      $port = 80;
    }

    $url = $scheme . $host . $path;

    // Connect
    $handle = fsockopen($addr, $port, $errno, $errstr, 0.5);
    stream_set_blocking($handle, 1);
    if( !$handle ) {
      //echo "$errstr ($errno)<br />\n";
      return;
    } else {
      $out = "GET {$path} HTTP/1.1\r\n";
      $out .= "Host: {$host}\r\n";
      if( !empty($code) ) {
        $out .= "Cookie: en4_maint_code={$code}\r\n";
      }
      $out .= "Connection: Close\r\n\r\n";

      fwrite($handle, $out);

      // Can't close or the remote connection will cancel
      //fclose($handle);
    }
  }

  protected function _triggerMethodCron()
  {
    return false;
  }

  protected function _triggerMethodFork()
  {
    if( !function_exists('pcntl_fork') ) {
      $this->getLog()->log('Fork not available', Zend_Log::ERR);
      return;
    }

    // Fork
    $pid = pcntl_fork();
    if( $pid == -1 ) {
      $this->getLog()->log('Could not fork', Zend_Log::ERR);
      //die('could not fork');
    } else if( $pid ) {
      // we are the parent
      //pcntl_wait($status); //Protect against Zombie children
    } else {
      // we are the child
      $this->execute();
      exit();
    }
  }

  protected function _triggerMethodAsset()
  {
    // Get url
    $path = Zend_Controller_Front::getInstance()->getRouter()->assemble(array('controller' => 'utility', 'action' => 'tasks'), 'default', true)
      . '?notrigger=1'
      //. '&key=' . $this->_config['key']
      . '&pid=' . $this->_pid
      . '&mode=' . 'js'
      ;
    $url = $path;

    // Add to headScript
    $headScript = new Zend_View_Helper_HeadScript();
    $headScript->appendFile($url);
  }

  protected function _triggerMethodExec()
  {
    // Check if exec exists
    if( !function_exists('exec') ) {
      $this->getLog()->log('Could not trigger using CLI, exec is disabled.', Zend_Log::ERR);
      return false;
    }
    
    // @todo should we execute which?
    
    // Run command
    $command = 'php -f' . ' '
      . escapeshellarg(APPLICATION_PATH . '/application/cli.php')
      . ' '
      . escapeshellarg('controller=utility,action=tasks,pid=' . $this->_pid . ',key=' . $this->_config['key'])
      . ' >> ' . escapeshellarg(APPLICATION_PATH . '/temporary/log/tasks.log') . ' 2>&1 &'
      ;

    $this->getLog()->log($command, Zend_Log::ERR);
    
    exec($command);
  }



  // Execution
  
  public function execute($params = null)
  {
    // Benchmark execute
    if( 'development' == APPLICATION_ENV ) {
      $start = microtime(true);
    }

    // Set time limit and ignore_user_abort
    set_time_limit(0);
    $prev = ignore_user_abort(true);
    
    // Register fatal error handler
    if( !$this->_isShutdownRegistered ) {
      register_shutdown_function(array($this, 'handleShutdown'));
      $this->_isShutdownRegistered = true;
    }

    // Signal execution start
    $this->_isExecuting = true;

    // Process params
    $this->_external = array_merge(
      $this->_external,
      array_intersect_key(array_merge($_GET, $_POST, (array) $params), $this->_external)
    );

    // Update last in cron mode for consistency
    if( 'cron' === $this->_mode ) {
      Engine_Api::_()->getDbtable('settings', 'core')->update(array(
        'value' => time(),
      ), array(
        'name = ?' => 'core.tasks.last',
      ));
    }

    // Execute
    if( $this->_executeCheck() ) {

      // Inject current process identifier
      $this->getAdapter()->insert('engine4_core_processes', array(
        'pid' => $this->_pid,
        'parent_pid' => (int) $this->_external['pid'],
        'system_pid' => (int) ( function_exists('posix_getpid') ? posix_getpid() : 0 ),
        'started' => time(),
        'timeout' => 0, // @todo
        'name' => '',
      ));
      
      // Run tasks
      foreach( $this->getTasks() as $task ) {
        // Check if they were run in the background while other tasks were executing
        $task->refresh();
        if( $this->_executeTaskCheck($task) ) {
          $this->_executeTask($task);
        }
      }
      
      // Log
      if( APPLICATION_ENV == 'development' ) {
        $this->getLog()->log(sprintf('Execution Complete [%d] [%d]', $this->_pid, $this->_external['pid']), Zend_Log::DEBUG);
      }
    }

    // Signal execution end
    $this->_isExecuting = false;

    // Restore abort
    ignore_user_abort($prev);

    // Benchmark trigger
    if( 'development' == APPLICATION_ENV ) {
      $end = microtime(true);
      $delta = $end - $start;
      if( !empty($this->slept) ) {
        $deltaUnslept = $delta - $this->slept;
        $this->getLog()->log(sprintf('Execution Benchmark [%d] [%d] : %f seconds (including sleep: %f)', $this->_pid, $this->_external['pid'], $deltaUnslept, $delta), Zend_Log::DEBUG);
      } else {
        $this->getLog()->log(sprintf('Execution Benchmark [%d] [%d] : %f seconds', $this->_pid, $this->_external['pid'], $delta), Zend_Log::DEBUG);
      }
    }

    return $this;
  }

  public function executeTask(Engine_Db_Table_Row $task)
  {
    $data = array();

    if( time() < $task->started_last + $task->timeout ) {
      $data['started_last'] = time() - $task->timeout;
    }

    if( time() < $task->completed_last + $task->timeout ) {
      $data['completed_last'] = time() - $task->timeout;
    }

    if( !empty($data) ) {
      $this->update($data, array(
        'task_id = ?' => $task->task_id,
      ));
    }
    
    return $this;
  }

  protected function _executeCheck()
  {
    // Log
    if( APPLICATION_ENV == 'development' ) {
      $this->getLog()->log(sprintf('Execution Check [%d] [%d] ', $this->_pid, $this->_external['pid']), Zend_Log::DEBUG);
    }

    // Check passkey
    if( 'asset' !== $this->_mode && $this->_external['key'] != $this->_config['key'] ) {
      return false;
    }
    
    // Check processes?
    $activeProcesess = $this->getAdapter()->select()
      ->from('engine4_core_processes', new Zend_Db_Expr('COUNT(pid)'))
      ->query()
      ->fetchColumn()
      ;

    // Process limit reached
    if( $activeProcesess >= $this->getParam('processes', 2) ) {
      // Log
      if( APPLICATION_ENV == 'development' ) {
        $this->getLog()->log(sprintf('Process Limit Reached [%d] : %d ', $this->_pid, $activeProcesess), Zend_Log::DEBUG);
      }
      return false;
    }
    
    // Log
    if( APPLICATION_ENV == 'development' ) {
      $this->getLog()->log(sprintf('Execution Pass [%d] [%d] ', $this->_pid, $this->_external['pid']), Zend_Log::DEBUG);
    }

    return true;
  }
  
  protected function _executeTask(Engine_Db_Table_Row $task)
  {
    // Log
    if( APPLICATION_ENV == 'development' ) {
      $this->getLog()->log(sprintf('Task Execution Check [%d] : %s', $this->_pid, $task->title), Zend_Log::DEBUG);
    }

    // Task execution semaphore
    $affected = $this->update(array(
      'started_last' => time(),
      'started_count' => new Zend_Db_Expr('started_count + 1'),
      'semaphore' => new Zend_Db_Expr('semaphore + 1'),
    ), array(
      'task_id = ?' => $task->task_id,
      'semaphore < ?' => new Zend_Db_Expr('processes'),
    ));

    if( 1 !== $affected ) {
      if( APPLICATION_ENV == 'development' ) {
        $this->getLog()->log(sprintf('Task Execution Failed Semaphore [%d] : %s', $this->_pid, $task->title), Zend_Log::DEBUG);
      }
      return false;
    }

    // Update process identifier
    $affected = $this->getAdapter()->update('engine4_core_processes', array(
      'name' => $task->plugin,
    ), array(
      'pid = ?' => $this->_pid,
    ));

    if( 1 !== $affected ) {
      // Wth?
      if( APPLICATION_ENV == 'development' ) {
        $this->getLog()->log(sprintf('Execution Failed Process Update [%d] : %s', $this->_pid, $task->plugin), Zend_Log::DEBUG);
      }
    }

    // Debug: sleeppre
    $slept = 0;
    if( $this->getParam('sleeppre', 0) > 0 ) {
      $slept += $this->getParam('sleeppre');
      sleep($this->getParam('sleeppre'));
    }
    
    // Refresh
    $task->refresh();
    
    // Log
    if( APPLICATION_ENV == 'development' ) {
      $this->getLog()->log(sprintf('Task Execution Pass [%d] : %s', $this->_pid, $task->title), Zend_Log::DEBUG);
    }



    // ----- MAIN -----
    
    // Set executing task
    $this->_executingTask = $task;

    // Invoke plugin
    $status = false;
    $isComplete = true;
    $wasIdle = false;
    
    try {
      // Get plugin object
      $plugin = $this->getTaskPlugin($task);

      // Execute
      $plugin->execute();

      // Check was idle
      $wasIdle = $plugin->wasIdle();

      // Ok
      $status = true;

    } catch( Exception $e ) {
      // Log exception
      $this->getLog()->log($e->__toString(), Zend_Log::ERR);
      $status = false;
    }

    // ----- MAIN -----



    // Debug: sleeppost
    if( $this->getParam('sleeppost', 0) > 0 ) {
      $slept += $this->getParam('sleeppost');
      sleep($this->getParam('sleeppost'));
    }
    if( !isset($this->slept) ) {
      $this->slept = 0;
    }
    $this->slept += $slept;
    
    // Update process identifier
    $affected = $this->getAdapter()->update('engine4_core_processes', array(
      'name' => '',
    ), array(
      'pid = ?' => $this->_pid,
    ));

    if( 1 !== $affected ) {
      // Wth?
      if( APPLICATION_ENV == 'development' ) {
        $this->getLog()->log(sprintf('Execution Failed Process Update (post) [%d] : %s', $this->_pid, $task->plugin), Zend_Log::DEBUG);
      }
    }

    // Update task and release semaphore
    $statusKey = ($status ? 'success' : 'failure');
    $affected = $this->update(array(
      'semaphore' => new Zend_Db_Expr('semaphore - 1'),
      'completed_last' => time(),
      'completed_count' => new Zend_Db_Expr('completed_count + 1'),
      $statusKey . '_last' => time(),
      $statusKey . '_count' => new Zend_Db_Expr($statusKey . '_count + 1'),
    ), array(
      'task_id = ?' => $task->task_id,
      'semaphore > ?' => 0,
    ));

    if( 1 !== $affected ) {
      if( APPLICATION_ENV == 'development' ) {
        $this->getLog()->log(sprintf('Task Execution Failed Semaphore Release [%d] : %s', $this->_pid, $task->title), Zend_Log::DEBUG);
      }
      return false;
    }
    
    // Update count
    if( !$wasIdle ) {
      $this->_runCount++;
    }
    
    // Remove executing task
    $this->_executingTask = null;

    // Log
    if( APPLICATION_ENV == 'development' ) {
      if( $status ) {
        $this->getLog()->log(sprintf('Task Execution Complete [%d] : %s', $this->_pid, $task->title), Zend_Log::DEBUG);
      } else {
        $this->getLog()->log(sprintf('Task Execution Complete with errors [%d] : %s', $this->_pid, $task->title), Zend_Log::DEBUG);
      }
    }

    return $this;
  }



  // Utility
  
  public function handleShutdown()
  {
    // Clear process identifier
    try {
      $this->getAdapter()->delete('engine4_core_processes', array(
        'pid = ?' => $this->_pid,
      ));
    } catch( Exception $e ) {
      $this->getLog()->log('Error clearing pid: ' . $e->__toString(), Zend_Log::ERR);
    }

    // There was no error during execution
    if( !$this->_isExecuting ) {
      return;
    }
    $this->_isExecuting = false;

    // This means there was a fatal error during execution
    $db = $this->getAdapter();

    // Log
    //if( APPLICATION_ENV == 'development' ) {
      $message = '';
      if( function_exists('error_get_last') ) {
        $message = error_get_last();
        $message = $message['type'] . ' ' . $message['message'] . ' ' . $message['file'] . ' ' . $message['line'];
      }
      $this->getLog()->log('Execution Error: ' . $this->_pid . ' - ' . $message, Zend_Log::ERR);
    //}

    // Let's call rollback just in case the fatal error happened inside a transaction
    // This will restore autocommit
    try {
      $db->rollBack();
    } catch( Exception $e ) {
      if( APPLICATION_ENV == 'development' ) {
        $this->getLog()->log(sprintf('Shutdown failed rollback [%d]', $this->_pid), Zend_Log::DEBUG);
      }
    }

    // There was no task executing during error
    if( !($this->_executingTask instanceof Zend_Db_Table_Row_Abstract) ) {
      return;
    }

    // Cleanup executing task
    $task = $this->_executingTask;

    // Update task and release semaphore
    $statusKey = (false ? 'success' : 'failure');
    $affected = $this->update(array(
      'semaphore' => new Zend_Db_Expr('semaphore - 1'),
      'completed_last' => time(),
      'completed_count' => new Zend_Db_Expr('completed_count + 1'),
      $statusKey . '_last' => time(),
      $statusKey . '_count' => new Zend_Db_Expr($statusKey . '_count + 1'),
    ), array(
      'task_id = ?' => $task->task_id,
      'semaphore > ?' => 0,
    ));

    if( 1 !== $affected ) {
      if( APPLICATION_ENV == 'development' ) {
        $this->getLog()->log(sprintf('Task Execution Failed Semaphore Release [%d] : %s', $this->_pid, $task->title), Zend_Log::DEBUG);
      }
      return false;
    }
  }
  
  protected function _executeTaskCheck(Engine_Db_Table_Row $task)
  {
    // We've executed at least as many tasks as count
    if( $this->_runCount >= $this->_config['count'] ) {
      return false;
    }

    // We've reached the time limit for this request
    if( microtime(true) >= _ENGINE_REQUEST_START + $this->_config['time'] ) {
      return false;
    }
    
    // Task is not ready to be executed again yet
    if( $task->timeout > 0 ) {
      if( time() < $task->started_last + $task->timeout ) {
        return false;
      }
      if( time() < $task->completed_last + $task->timeout ) {
        return false;
      }
    }

    // If semaphore limit is reached, and the timeout
    // has been reached, check if lock needs to be cleared
    if( $task->semaphore >= $task->processes ) {
      // Sanity - wth is this?
      if( $task->processes < 1 ) {
        $task->processes = 1;
        $task->save();
        return false;
      }

      // Get all processes matching task plugin
      $taskProcesses = $this->getAdapter()->select()
        ->from('engine4_core_processes')
        ->where('name = ?', $task->plugin)
        ->query()
        ->fetchAll();

      // There was nothing, flush mutexes
      if( empty($taskProcesses) ) {
        $affected = $this->update(array(
          'semaphore' => new Zend_Db_Expr(sprintf('semaphore - %d', $task->semaphore)),
        ), array(
          'task_id = ?' => $task->task_id,
        ));
        
        if( 1 !== $affected ) {
          // Log
          if( APPLICATION_ENV == 'development' ) {
            $this->getLog()->log(sprintf('Execution Mutex Flush Failed [%d] : %s', $this->_pid, $task->title), Zend_Log::DEBUG);
          }
          return false;
        }
      }

      // Check each process
      else {
        $activeProcesses = 0;
        foreach( $taskProcesses as $process ) {
          $started = ( !empty($process['started']) ? $process['started'] : 0 );
          $timeout = ( !empty($process['timeout']) ? $process['timeout'] : $this->_config['timeout'] );

          // It's timed out
          if( time() > $started + $timeout ) {
            // Delete
            $this->getAdapter()->delete('engine4_core_processes', array(
              'pid' => $process['pid'],
            ));
            // Log
            if( APPLICATION_ENV == 'development' ) {
              $this->getLog()->log(sprintf('Process Timeout [%d] : %d ', $this->_pid, $process['pid']), Zend_Log::DEBUG);
            }
            continue;
          }
          
          $activeProcesses++;
        }
        if( $activeProcesses >= $task->processes ) {
          // Log
          if( APPLICATION_ENV == 'development' ) {
            $this->getLog()->log(sprintf('Execution Process Flush Failed [%d] : %d ', $this->_pid, $activeProcesses), Zend_Log::DEBUG);
          }
          return false;
        }
      }
    }
    
    // Task is ready
    return true;
  }

  public function reset($tasks = null, $includeStats = false)
  {
    // Update global locks
    Engine_Api::_()->getDbtable('settings', 'core')->update(array(
      'value' => '',
    ), array(
      'name IN(?)' => array('core.tasks.pid', 'core.tasks.last'),
    ));
    
    if( null !== $tasks ) {

      if( !is_array($tasks) && !($tasks instanceof Zend_Db_Table_Rowset_Abstract) ) {
        $tasks = array($tasks);
      }

      $ids = array();
      foreach( $tasks as $task ) {
        if( is_numeric($task) ) {
          $ids[] = $task;
        } else if( $task instanceof Zend_Db_Table_Row_Abstract &&
            $task->getTable() instanceof Core_Model_DbTable_Tasks ) {
          $ids[] = $task->task_id;
        }
      }
      $tasks = $ids;
    }

    if( null === $tasks || !empty($tasks) ) {
      $where = array();
      if( null === $tasks ) {
        $where = null;
      } else if( empty($tasks) ) {
        return;
      } else if( is_numeric($tasks) ) {
        $where['task_id = ?'] = $tasks;
      } else if( is_array($tasks) ) {
        $where['task_id IN(?)'] = $tasks;
      } else {
        return;
      }

      $data = array(
        'semaphore' => 0,
        'started_last' => 0,
        'completed_last' => 0,
      );

      if( $includeStats ) {
        $data = array_merge($data, array(
          'started_count' => 0,
          'completed_count' => 0,
          'failure_last' => 0,
          'failure_count' => 0,
          'success_last' => 0,
          'success_count' => 0,
        ));
      }
      
      $this->update($data, $where);
    }
    
    return $this;
  }



  // Utility

  protected function _generatePid($asHex = false)
  {
    $max = min(mt_getrandmax(), 2147483647); // Do not allow more than 32bit unsigned
    $val = mt_rand(0, $max);
    if( $asHex ) {
      return str_pad(base_convert($val, 10, 16), 8, '0', STR_PAD_LEFT);
    } else {
      return str_pad(sprintf('%u', $val), 10, '0', STR_PAD_LEFT);
    }
  }
}