<?php
/**
* SocialEngine
*
* @category Application_Core
* @package Core
* @copyright Copyright 2006-2020 Webligo Developments
* @license http://www.socialengine.com/license/
* @version $Id: Tasks.php 9747 2012-07-26 02:08:08Z john $
* @author John
*/
/**
* @category Application_Core
* @package Core
* @copyright Copyright 2006-2020 Webligo Developments
* @license http://www.socialengine.com/license/
*/
class Core_Model_DbTable_Tasks extends Engine_Db_Table
{
// Properties
protected $_mode;
protected $_pid;
protected $_config;
protected $_external;
protected $_log;
protected $_tasks;
protected $_isExecuting = false;
protected $_isShutdownRegistered = false;
protected $_executingTask;
protected $_runCount = 0;
// Methods
// General
public function init()
{
// Get configuration
$this->_config = (array) Engine_Api::_()->getApi('settings', 'core')->getSetting('core.tasks');
$this->_config = array_merge(array(
'count' => 1, // Max number of tasks run per request
'countidle' => false, // Count idle tasks towards the tasks per request
'interval' => 15, // Minimum interval between triggers
'jobs' => 3, // Max number of jobs run per request
'key' => '', // Access key
'last' => '', // Last time trigger was run (or execute in cron mode)
'mode' => 'curl', // Method to trigger execution
'pid' => '', // Random id for trigger and execution mutex
'processes' => 2, // Number of allowed concurrent processes
// @todo add usec support?
'sleeppre' => 0, // (For debug) Seconds to sleep between acquiring task lock and executing task
'sleepint' => 0, // (For debug) Seconds to sleep inside tasks between sub-tasks
'sleeppost' => 0, // (For debug) Seconds to sleep after executing task and clearing lock
'time' => 120, // Max time allowed per request (auto adjusts if ini_get('max_execution_time') is available)
'timeout' => 900, // Time before a process is considered rogue
), array_filter($this->_config));
// Make pid
$this->_pid = $this->_generatePid();
// Get mode
$this->_mode = ( !empty($this->_config['mode']) ? $this->_config['mode'] : 'curl' );
// Generate key if missing
if( empty($this->_config['key']) ) {
$this->_config['key'] = $this->_generatePid(true);
Engine_Api::_()->getApi('settings', 'core')->setSetting('core.tasks.key', $this->_config['key']);
}
// Adjust time limit if possible
if( empty($this->_config['time']) || $this->_config['time'] <= 0 ) {
$this->_config['time'] = 120;
}
if( function_exists('ini_get') &&
($max_execution_time = ini_get('max_execution_time')) &&
'cli' !== PHP_SAPI ) {
if( -1 == $max_execution_time ) {
// What should we do here
//$this->_config['time'] = 600;
} else if( $max_execution_time < $this->_config['time'] ) {
$this->_config['time'] = floor(0.8 * $max_execution_time);
}
}
// Get default external
$this->_external = array(
'key' => null, // The external access key passed to this request
'pid' => null, // The external process id passed to this request
);
}
// Config
public function getParam($key, $default = null)
{
if( isset($this->_config[$key]) ) {
return $this->_config[$key];
} else {
return $default;
}
}
public function getTriggerType()
{
switch( $this->_mode ) {
case 'none':
case 'cron':
return false;
break;
case 'fork':
case 'asset':
return 'pre';
break;
case 'curl':
case 'socket':
case 'exec':
default:
return 'post';
break;
}
}
// Log
/**
* Get our logger
*
* @return Zend_Log
*/
public function getLog()
{
if( null === $this->_log ) {
$logAdapter = Engine_Api::_()->getDbtable('settings', 'core')
->getSetting('core.log.adapter', 'file');
$log = new Zend_Log();
$log->setEventItem('domain', 'tasks');
try {
switch( $logAdapter ) {
case 'file': default:
$log->addWriter(new Zend_Log_Writer_Stream(APPLICATION_PATH . '/temporary/log/tasks.log'));
break;
case 'database':
$log->addWriter(new Zend_Log_Writer_Db($this->getAdapter(), 'engine4_core_log'));
break;
case 'none':
$log->addWriter(new Zend_Log_Writer_Null());
break;
}
} catch( Exception $e ) {
$log->addWriter(new Zend_Log_Writer_Null());
}
$this->_log = $log;
}
return $this->_log;
}
public function setLog(Zend_Log $log)
{
$this->_log = $log;
return $this;
}
// Tasks
public function getTasks()
{
if( null === $this->_tasks ) {
$select = $this->select()
->where('module IN(?)', (array) Engine_Api::_()->getDbtable('modules', 'core')->getEnabledModuleNames())
;
// Order by last?
if( engine_in_array('priority', $this->info('cols')) ) {
$select->order('priority DESC');
}
$this->_tasks = $this->fetchAll($select);
}
return $this->_tasks;
}
public function getTaskPlugin($task)
{
// Must be a row of tasks or jobs
if( !is_object($task) ) {
throw new Core_Model_Exception(sprintf('Must be given a row of ' .
'Core_Model_DbTable_Tasks, given ' .
'%s', gettype($task)));
} else if( !($task->getTable() instanceof Core_Model_DbTable_Tasks) ) {
throw new Core_Model_Exception(sprintf('Must be given a row of ' .
'Core_Model_DbTable_Tasks, given ' .
'%s', get_class($task)));
}
// Get plugin class
$class = $task->plugin;
// Load class
Engine_Loader::loadClass($class);
// Make sure is a subclass of Core_Plugin_Task_Abstract
if( !is_subclass_of($class, 'Core_Plugin_Task_Abstract') ) {
throw new Core_Model_Exception(sprintf('Task plugin %1$s should extend Core_Plugin_Task_Abstract', $class));
}
// Check for execute method?
if( !method_exists($class, 'execute') ) {
throw new Core_Model_Exception(sprintf('Task plugin %1$s does not have an execute method', $class));
}
// Get plugin object
$plugin = new $class($task);
// Set the log
$plugin->setLog($this->getLog());
return $plugin;
}
// Triggering
public function trigger()
{
// Benchmark trigger
if( 'development' == APPLICATION_ENV ) {
$start = microtime(true);
}
// Do not trigger in cron mode
if( 'cron' === $this->_mode ) {
return $this;
}
// Check if we should trigger
if( !$this->_triggerCheck() ) {
return $this;
}
// Get method
$method = '_triggerMethod' . ucfirst($this->_mode);
// Unknown mode
if( !method_exists($this, $method) ) {
throw new Core_Model_Exception('Unsupported mode: ' . $this->_mode);
}
// Set ignore user abort
$prev = ignore_user_abort(true);
// Trigger
$this->$method();
// Reset ignore user abort
ignore_user_abort($prev);
// Benchmark trigger
if( 'development' == APPLICATION_ENV ) {
$end = microtime(true);
$delta = $end - $start;
$this->getLog()->log(sprintf('Trigger Benchmark [%d] : %f seconds', $this->_pid, $delta), Zend_Log::DEBUG);
}
return $this;
}
protected function _triggerCheck()
{
// Log
if( APPLICATION_ENV == 'development' ) {
$this->getLog()->log(sprintf('Trigger Check [%d] ', $this->_pid), Zend_Log::DEBUG);
}
// Get settings table
$table = Engine_Api::_()->getDbtable('settings', 'core');
// Get last
$last = $table->fetchRow(array('name = ?' => 'core.tasks.last'));
if( null === $last ) {
try {
$table->insert(array(
'name' => 'core.tasks.last',
'value' => '',
));
} catch( Exception $e ) {}
} else {
$last = $last->value;
}
// Get pid
$pid = $table->fetchRow(array('name = ?' => 'core.tasks.pid'));
if( null === $pid ) {
try {
$table->insert(array(
'name' => 'core.tasks.pid',
'value' => '',
));
} catch( Exception $e ) {}
} else {
$pid = $pid->value;
}
// If we are still triggering, make sure delta is larger than the ther interval or timeout
if( (int)$pid && time() < (int)$last + max( (int)$this->_config['interval'], (int)$this->_config['timeout']) ) {
// Log
if( APPLICATION_ENV == 'development' ) {
$this->getLog()->log(sprintf('Trigger Check Failed - Still Triggering + Timeout [%d] - %d < %d',
$this->_pid, time(), $last + max($this->_config['interval'], $this->_config['timeout'])), Zend_Log::DEBUG);
}
return false;
}
// Otherwise, if empty, make sure delta is larger than the min of interval and timeout
else if( !(int)$pid && time() < (int)$last + min((int)$this->_config['interval'], (int)$this->_config['timeout']) ) {
// Log
if( APPLICATION_ENV == 'development' ) {
$this->getLog()->log(sprintf('Trigger Check Failed - Timeout [%d] - %d < %d',
$this->_pid, time(), $last + min($this->_config['interval'], $this->_config['timeout'])), Zend_Log::DEBUG);
}
return false;
}
// Acquire lock (pid)
$affected = $table->update(array(
'value' => $this->_pid,
), array(
'name = ?' => 'core.tasks.pid',
'value = ?' => (string) $pid,
));
if( 1 !== $affected ) {
// Log
if( APPLICATION_ENV == 'development' ) {
$this->getLog()->log(sprintf('Trigger Lock Failed [%d] ', $this->_pid), Zend_Log::DEBUG);
}
return false;
}
// Update last
$affected = $table->update(array(
'value' => time(),
), array(
'name = ?' => 'core.tasks.last',
));
// Clear pid lock
$affected = $table->update(array(
'value' => '',
), array(
'name = ?' => 'core.tasks.pid',
'value = ?' => (string) $this->_pid,
));
if( 1 !== $affected ) {
// Log
if( APPLICATION_ENV == 'development' ) {
$this->getLog()->log(sprintf('Trigger Lock Clear Failed [%d] ', $this->_pid), Zend_Log::DEBUG);
}
}
// Check the processes to see if we are above the limit, and whether or not
// they have timed out
$processes = $this->getAdapter()->select()
->from('engine4_core_processes')
->query()
->fetchAll()
;
$activeProcesess = 0;
foreach( $processes as $process ) {
$started = ( !empty($process['started']) ? $process['started'] : 0 );
$timeout = ( !empty($process['timeout']) ? $process['timeout'] : $this->_config['timeout'] );
// It's timed out
if( time() > $started + $timeout ) {
// Delete
$this->getAdapter()->delete('engine4_core_processes', array(
'pid' => $process['pid'],
));
// Log
if( APPLICATION_ENV == 'development' ) {
$this->getLog()->log(sprintf('Process Timeout [%d] : %d ', $this->_pid, $process['pid']), Zend_Log::DEBUG);
}
continue;
}
$activeProcesess++;
}
// Process limit reached
if( $activeProcesess >= $this->_config['processes'] ) {
// Log
if( APPLICATION_ENV == 'development' ) {
$this->getLog()->log(sprintf('Process Limit Reached [%d] : %d ', $this->_pid, $activeProcesess), Zend_Log::DEBUG);
}
return false;
}
// Log
if( APPLICATION_ENV == 'development' ) {
$this->getLog()->log(sprintf('Trigger Pass [%d] ', $this->_pid), Zend_Log::DEBUG);
}
// Okay, let's go!
return true;
}
protected function _triggerMethodCurl()
{
global $generalConfig;
$code = null;
if( !empty($generalConfig['maintenance']['code']) ) {
$code = $generalConfig['maintenance']['code'];
}
// Setup
$scheme = ( !empty($_SERVER['HTTPS']) && $_SERVER['HTTPS'] == 'on' ? 'https://' : 'http://' );
$host = $_SERVER['HTTP_HOST'];
if( false !== stripos($_SERVER['SERVER_SOFTWARE'], 'IIS') ) {
if( !empty($_SERVER['LOCAL_ADDR']) ) {
$addr = $_SERVER['LOCAL_ADDR'];
} else if( !empty($_SERVER['SERVER_ADDR']) ) {
$addr = $_SERVER['SERVER_ADDR'];
} else if( !empty($_SERVER['HTTP_HOST']) ) {
$addr = $_SERVER['HTTP_HOST'];
} else {
$addr = '127.0.0.1';
}
} else {
$addr = '127.0.0.1';
}
$port = ( !empty($_SERVER['SERVER_PORT']) ? (integer) $_SERVER['SERVER_PORT'] : 80 );
$path = Zend_Controller_Front::getInstance()->getRouter()->assemble(array('controller' => 'utility', 'action' => 'tasks'), 'default', true)
. '?notrigger=1'
. '&key=' . $this->_config['key']
. '&pid=' . $this->_pid
;
$url = $scheme . $host . $path;
// Set options
$curl_handle = curl_init();
curl_setopt($curl_handle, CURLOPT_URL, $url);
curl_setopt($curl_handle, CURLOPT_PORT, $port);
curl_setopt($curl_handle, CURLOPT_RETURNTRANSFER, true);
curl_setopt($curl_handle, CURLOPT_HTTPHEADER, array('Host: ' . $_SERVER['HTTP_HOST']));
// Try to handle basic htauth
if( !empty($_SERVER['PHP_AUTH_USER']) && !empty($_SERVER['PHP_AUTH_PW']) ) {
curl_setopt($curl_handle, CURLOPT_HTTPAUTH, CURLAUTH_BASIC);
curl_setopt($curl_handle, CURLOPT_USERPWD, $_SERVER['PHP_AUTH_USER'] . ':' . $_SERVER['PHP_AUTH_PW']);
}
// Try to handle maintenance mode
if( $code ) {
curl_setopt($curl_handle, CURLOPT_COOKIE, 'en4_maint_code=' . $code);
}
// If SSL is enabled
if ($scheme == 'https://') {
curl_setopt($curl_handle, CURLOPT_VERBOSE, true);
curl_setopt($curl_handle, CURLOPT_SSL_VERIFYPEER, false);
}
curl_exec($curl_handle);
curl_close($curl_handle);
}
protected function _triggerMethodSocket()
{
global $generalConfig;
$code = null;
if( !empty($generalConfig['maintenance']['code']) ) {
$code = $generalConfig['maintenance']['code'];
}
// Setup
$scheme = ( !empty($_SERVER['HTTPS']) && $_SERVER['HTTPS'] == 'on' ? 'https://' : 'http://' );
$host = $_SERVER['HTTP_HOST'];
if( false !== stripos($_SERVER['SERVER_SOFTWARE'], 'IIS') ) {
if( !empty($_SERVER['LOCAL_ADDR']) ) {
$addr = $_SERVER['LOCAL_ADDR'];
} else if( !empty($_SERVER['SERVER_ADDR']) ) {
$addr = $_SERVER['SERVER_ADDR'];
} else if( !empty($_SERVER['HTTP_HOST']) ) {
$addr = $_SERVER['HTTP_HOST'];
} else {
$addr = '127.0.0.1';
}
} else {
$addr = '127.0.0.1';
}
$port = ( !empty($_SERVER['SERVER_PORT']) ? (integer) $_SERVER['SERVER_PORT'] : 80 );
$path = Zend_Controller_Front::getInstance()->getRouter()->assemble(array('controller' => 'utility', 'action' => 'tasks'), 'default', true)
. '?notrigger=1'
. '&key=' . $this->_config['key']
. '&pid=' . $this->_pid
;
// If SSL is enabled, let's turn it off and hope it works
if( $scheme == 'https://' && $port == 443 ) {
$scheme = 'http://';
$port = 80;
}
$url = $scheme . $host . $path;
// Connect
$handle = fsockopen($addr, $port, $errno, $errstr, 0.5);
stream_set_blocking($handle, 1);
if( !$handle ) {
//echo "$errstr ($errno)<br />\n";
return;
} else {
$out = "GET {$path} HTTP/1.1\r\n";
$out .= "Host: {$host}\r\n";
if( !empty($code) ) {
$out .= "Cookie: en4_maint_code={$code}\r\n";
}
$out .= "Connection: Close\r\n\r\n";
fwrite($handle, $out);
// Can't close or the remote connection will cancel
//fclose($handle);
}
}
protected function _triggerMethodCron()
{
return false;
}
protected function _triggerMethodFork()
{
if( !function_exists('pcntl_fork') ) {
$this->getLog()->log('Fork not available', Zend_Log::ERR);
return;
}
// Fork
$pid = pcntl_fork();
if( $pid == -1 ) {
$this->getLog()->log('Could not fork', Zend_Log::ERR);
//die('could not fork');
} else if( $pid ) {
// we are the parent
//pcntl_wait($status); //Protect against Zombie children
} else {
// we are the child
$this->execute();
exit();
}
}
protected function _triggerMethodAsset()
{
// Get url
$path = Zend_Controller_Front::getInstance()->getRouter()->assemble(array('controller' => 'utility', 'action' => 'tasks'), 'default', true)
. '?notrigger=1'
//. '&key=' . $this->_config['key']
. '&pid=' . $this->_pid
. '&mode=' . 'js'
;
$url = $path;
// Add to headScript
$headScript = new Zend_View_Helper_HeadScript();
$headScript->appendFile($url);
}
protected function _triggerMethodExec()
{
// Check if exec exists
if( !function_exists('exec') ) {
$this->getLog()->log('Could not trigger using CLI, exec is disabled.', Zend_Log::ERR);
return false;
}
// @todo should we execute which?
// Run command
$command = 'php -f' . ' '
. escapeshellarg(APPLICATION_PATH . '/application/cli.php')
. ' '
. escapeshellarg('controller=utility,action=tasks,pid=' . $this->_pid . ',key=' . $this->_config['key'])
. ' >> ' . escapeshellarg(APPLICATION_PATH . '/temporary/log/tasks.log') . ' 2>&1 &'
;
$this->getLog()->log($command, Zend_Log::ERR);
exec($command);
}
// Execution
public function execute($params = null)
{
// Benchmark execute
if( 'development' == APPLICATION_ENV ) {
$start = microtime(true);
}
// Set time limit and ignore_user_abort
set_time_limit(0);
$prev = ignore_user_abort(true);
// Register fatal error handler
if( !$this->_isShutdownRegistered ) {
register_shutdown_function(array($this, 'handleShutdown'));
$this->_isShutdownRegistered = true;
}
// Signal execution start
$this->_isExecuting = true;
// Process params
$this->_external = array_merge(
$this->_external,
array_intersect_key(array_merge($_GET, $_POST, (array) $params), $this->_external)
);
// Update last in cron mode for consistency
if( 'cron' === $this->_mode ) {
Engine_Api::_()->getDbtable('settings', 'core')->update(array(
'value' => time(),
), array(
'name = ?' => 'core.tasks.last',
));
}
// Execute
if( $this->_executeCheck() ) {
// Inject current process identifier
$this->getAdapter()->insert('engine4_core_processes', array(
'pid' => $this->_pid,
'parent_pid' => (int) $this->_external['pid'],
'system_pid' => (int) ( function_exists('posix_getpid') ? posix_getpid() : 0 ),
'started' => time(),
'timeout' => 0, // @todo
'name' => '',
));
// Run tasks
foreach( $this->getTasks() as $task ) {
// Check if they were run in the background while other tasks were executing
$task->refresh();
if( $this->_executeTaskCheck($task) ) {
$this->_executeTask($task);
}
}
// Log
if( APPLICATION_ENV == 'development' ) {
$this->getLog()->log(sprintf('Execution Complete [%d] [%d]', $this->_pid, $this->_external['pid']), Zend_Log::DEBUG);
}
}
// Signal execution end
$this->_isExecuting = false;
// Restore abort
ignore_user_abort($prev);
// Benchmark trigger
if( 'development' == APPLICATION_ENV ) {
$end = microtime(true);
$delta = $end - $start;
if( !empty($this->slept) ) {
$deltaUnslept = $delta - $this->slept;
$this->getLog()->log(sprintf('Execution Benchmark [%d] [%d] : %f seconds (including sleep: %f)', $this->_pid, $this->_external['pid'], $deltaUnslept, $delta), Zend_Log::DEBUG);
} else {
$this->getLog()->log(sprintf('Execution Benchmark [%d] [%d] : %f seconds', $this->_pid, $this->_external['pid'], $delta), Zend_Log::DEBUG);
}
}
return $this;
}
public function executeTask(Engine_Db_Table_Row $task)
{
$data = array();
if( time() < $task->started_last + $task->timeout ) {
$data['started_last'] = time() - $task->timeout;
}
if( time() < $task->completed_last + $task->timeout ) {
$data['completed_last'] = time() - $task->timeout;
}
if( !empty($data) ) {
$this->update($data, array(
'task_id = ?' => $task->task_id,
));
}
return $this;
}
protected function _executeCheck()
{
// Log
if( APPLICATION_ENV == 'development' ) {
$this->getLog()->log(sprintf('Execution Check [%d] [%d] ', $this->_pid, $this->_external['pid']), Zend_Log::DEBUG);
}
// Check passkey
if( 'asset' !== $this->_mode && $this->_external['key'] != $this->_config['key'] ) {
return false;
}
// Check processes?
$activeProcesess = $this->getAdapter()->select()
->from('engine4_core_processes', new Zend_Db_Expr('COUNT(pid)'))
->query()
->fetchColumn()
;
// Process limit reached
if( $activeProcesess >= $this->getParam('processes', 2) ) {
// Log
if( APPLICATION_ENV == 'development' ) {
$this->getLog()->log(sprintf('Process Limit Reached [%d] : %d ', $this->_pid, $activeProcesess), Zend_Log::DEBUG);
}
return false;
}
// Log
if( APPLICATION_ENV == 'development' ) {
$this->getLog()->log(sprintf('Execution Pass [%d] [%d] ', $this->_pid, $this->_external['pid']), Zend_Log::DEBUG);
}
return true;
}
protected function _executeTask(Engine_Db_Table_Row $task)
{
// Log
if( APPLICATION_ENV == 'development' ) {
$this->getLog()->log(sprintf('Task Execution Check [%d] : %s', $this->_pid, $task->title), Zend_Log::DEBUG);
}
// Task execution semaphore
$affected = $this->update(array(
'started_last' => time(),
'started_count' => new Zend_Db_Expr('started_count + 1'),
'semaphore' => new Zend_Db_Expr('semaphore + 1'),
), array(
'task_id = ?' => $task->task_id,
'semaphore < ?' => new Zend_Db_Expr('processes'),
));
if( 1 !== $affected ) {
if( APPLICATION_ENV == 'development' ) {
$this->getLog()->log(sprintf('Task Execution Failed Semaphore [%d] : %s', $this->_pid, $task->title), Zend_Log::DEBUG);
}
return false;
}
// Update process identifier
$affected = $this->getAdapter()->update('engine4_core_processes', array(
'name' => $task->plugin,
), array(
'pid = ?' => $this->_pid,
));
if( 1 !== $affected ) {
// Wth?
if( APPLICATION_ENV == 'development' ) {
$this->getLog()->log(sprintf('Execution Failed Process Update [%d] : %s', $this->_pid, $task->plugin), Zend_Log::DEBUG);
}
}
// Debug: sleeppre
$slept = 0;
if( $this->getParam('sleeppre', 0) > 0 ) {
$slept += $this->getParam('sleeppre');
sleep($this->getParam('sleeppre'));
}
// Refresh
$task->refresh();
// Log
if( APPLICATION_ENV == 'development' ) {
$this->getLog()->log(sprintf('Task Execution Pass [%d] : %s', $this->_pid, $task->title), Zend_Log::DEBUG);
}
// ----- MAIN -----
// Set executing task
$this->_executingTask = $task;
// Invoke plugin
$status = false;
$isComplete = true;
$wasIdle = false;
try {
// Get plugin object
$plugin = $this->getTaskPlugin($task);
// Execute
$plugin->execute();
// Check was idle
$wasIdle = $plugin->wasIdle();
// Ok
$status = true;
} catch( Exception $e ) {
// Log exception
$this->getLog()->log($e->__toString(), Zend_Log::ERR);
$status = false;
}
// ----- MAIN -----
// Debug: sleeppost
if( $this->getParam('sleeppost', 0) > 0 ) {
$slept += $this->getParam('sleeppost');
sleep($this->getParam('sleeppost'));
}
if( !isset($this->slept) ) {
$this->slept = 0;
}
$this->slept += $slept;
// Update process identifier
$affected = $this->getAdapter()->update('engine4_core_processes', array(
'name' => '',
), array(
'pid = ?' => $this->_pid,
));
if( 1 !== $affected ) {
// Wth?
if( APPLICATION_ENV == 'development' ) {
$this->getLog()->log(sprintf('Execution Failed Process Update (post) [%d] : %s', $this->_pid, $task->plugin), Zend_Log::DEBUG);
}
}
// Update task and release semaphore
$statusKey = ($status ? 'success' : 'failure');
$affected = $this->update(array(
'semaphore' => new Zend_Db_Expr('semaphore - 1'),
'completed_last' => time(),
'completed_count' => new Zend_Db_Expr('completed_count + 1'),
$statusKey . '_last' => time(),
$statusKey . '_count' => new Zend_Db_Expr($statusKey . '_count + 1'),
), array(
'task_id = ?' => $task->task_id,
'semaphore > ?' => 0,
));
if( 1 !== $affected ) {
if( APPLICATION_ENV == 'development' ) {
$this->getLog()->log(sprintf('Task Execution Failed Semaphore Release [%d] : %s', $this->_pid, $task->title), Zend_Log::DEBUG);
}
return false;
}
// Update count
if( !$wasIdle ) {
$this->_runCount++;
}
// Remove executing task
$this->_executingTask = null;
// Log
if( APPLICATION_ENV == 'development' ) {
if( $status ) {
$this->getLog()->log(sprintf('Task Execution Complete [%d] : %s', $this->_pid, $task->title), Zend_Log::DEBUG);
} else {
$this->getLog()->log(sprintf('Task Execution Complete with errors [%d] : %s', $this->_pid, $task->title), Zend_Log::DEBUG);
}
}
return $this;
}
// Utility
public function handleShutdown()
{
// Clear process identifier
try {
$this->getAdapter()->delete('engine4_core_processes', array(
'pid = ?' => $this->_pid,
));
} catch( Exception $e ) {
$this->getLog()->log('Error clearing pid: ' . $e->__toString(), Zend_Log::ERR);
}
// There was no error during execution
if( !$this->_isExecuting ) {
return;
}
$this->_isExecuting = false;
// This means there was a fatal error during execution
$db = $this->getAdapter();
// Log
//if( APPLICATION_ENV == 'development' ) {
$message = '';
if( function_exists('error_get_last') ) {
$message = error_get_last();
$message = $message['type'] . ' ' . $message['message'] . ' ' . $message['file'] . ' ' . $message['line'];
}
$this->getLog()->log('Execution Error: ' . $this->_pid . ' - ' . $message, Zend_Log::ERR);
//}
// Let's call rollback just in case the fatal error happened inside a transaction
// This will restore autocommit
try {
$db->rollBack();
} catch( Exception $e ) {
if( APPLICATION_ENV == 'development' ) {
$this->getLog()->log(sprintf('Shutdown failed rollback [%d]', $this->_pid), Zend_Log::DEBUG);
}
}
// There was no task executing during error
if( !($this->_executingTask instanceof Zend_Db_Table_Row_Abstract) ) {
return;
}
// Cleanup executing task
$task = $this->_executingTask;
// Update task and release semaphore
$statusKey = (false ? 'success' : 'failure');
$affected = $this->update(array(
'semaphore' => new Zend_Db_Expr('semaphore - 1'),
'completed_last' => time(),
'completed_count' => new Zend_Db_Expr('completed_count + 1'),
$statusKey . '_last' => time(),
$statusKey . '_count' => new Zend_Db_Expr($statusKey . '_count + 1'),
), array(
'task_id = ?' => $task->task_id,
'semaphore > ?' => 0,
));
if( 1 !== $affected ) {
if( APPLICATION_ENV == 'development' ) {
$this->getLog()->log(sprintf('Task Execution Failed Semaphore Release [%d] : %s', $this->_pid, $task->title), Zend_Log::DEBUG);
}
return false;
}
}
protected function _executeTaskCheck(Engine_Db_Table_Row $task)
{
// We've executed at least as many tasks as count
if( $this->_runCount >= $this->_config['count'] ) {
return false;
}
// We've reached the time limit for this request
if( microtime(true) >= _ENGINE_REQUEST_START + $this->_config['time'] ) {
return false;
}
// Task is not ready to be executed again yet
if( $task->timeout > 0 ) {
if( time() < $task->started_last + $task->timeout ) {
return false;
}
if( time() < $task->completed_last + $task->timeout ) {
return false;
}
}
// If semaphore limit is reached, and the timeout
// has been reached, check if lock needs to be cleared
if( $task->semaphore >= $task->processes ) {
// Sanity - wth is this?
if( $task->processes < 1 ) {
$task->processes = 1;
$task->save();
return false;
}
// Get all processes matching task plugin
$taskProcesses = $this->getAdapter()->select()
->from('engine4_core_processes')
->where('name = ?', $task->plugin)
->query()
->fetchAll();
// There was nothing, flush mutexes
if( empty($taskProcesses) ) {
$affected = $this->update(array(
'semaphore' => new Zend_Db_Expr(sprintf('semaphore - %d', $task->semaphore)),
), array(
'task_id = ?' => $task->task_id,
));
if( 1 !== $affected ) {
// Log
if( APPLICATION_ENV == 'development' ) {
$this->getLog()->log(sprintf('Execution Mutex Flush Failed [%d] : %s', $this->_pid, $task->title), Zend_Log::DEBUG);
}
return false;
}
}
// Check each process
else {
$activeProcesses = 0;
foreach( $taskProcesses as $process ) {
$started = ( !empty($process['started']) ? $process['started'] : 0 );
$timeout = ( !empty($process['timeout']) ? $process['timeout'] : $this->_config['timeout'] );
// It's timed out
if( time() > $started + $timeout ) {
// Delete
$this->getAdapter()->delete('engine4_core_processes', array(
'pid' => $process['pid'],
));
// Log
if( APPLICATION_ENV == 'development' ) {
$this->getLog()->log(sprintf('Process Timeout [%d] : %d ', $this->_pid, $process['pid']), Zend_Log::DEBUG);
}
continue;
}
$activeProcesses++;
}
if( $activeProcesses >= $task->processes ) {
// Log
if( APPLICATION_ENV == 'development' ) {
$this->getLog()->log(sprintf('Execution Process Flush Failed [%d] : %d ', $this->_pid, $activeProcesses), Zend_Log::DEBUG);
}
return false;
}
}
}
// Task is ready
return true;
}
public function reset($tasks = null, $includeStats = false)
{
// Update global locks
Engine_Api::_()->getDbtable('settings', 'core')->update(array(
'value' => '',
), array(
'name IN(?)' => array('core.tasks.pid', 'core.tasks.last'),
));
if( null !== $tasks ) {
if( !is_array($tasks) && !($tasks instanceof Zend_Db_Table_Rowset_Abstract) ) {
$tasks = array($tasks);
}
$ids = array();
foreach( $tasks as $task ) {
if( is_numeric($task) ) {
$ids[] = $task;
} else if( $task instanceof Zend_Db_Table_Row_Abstract &&
$task->getTable() instanceof Core_Model_DbTable_Tasks ) {
$ids[] = $task->task_id;
}
}
$tasks = $ids;
}
if( null === $tasks || !empty($tasks) ) {
$where = array();
if( null === $tasks ) {
$where = null;
} else if( empty($tasks) ) {
return;
} else if( is_numeric($tasks) ) {
$where['task_id = ?'] = $tasks;
} else if( is_array($tasks) ) {
$where['task_id IN(?)'] = $tasks;
} else {
return;
}
$data = array(
'semaphore' => 0,
'started_last' => 0,
'completed_last' => 0,
);
if( $includeStats ) {
$data = array_merge($data, array(
'started_count' => 0,
'completed_count' => 0,
'failure_last' => 0,
'failure_count' => 0,
'success_last' => 0,
'success_count' => 0,
));
}
$this->update($data, $where);
}
return $this;
}
// Utility
protected function _generatePid($asHex = false)
{
$max = min(mt_getrandmax(), 2147483647); // Do not allow more than 32bit unsigned
$val = mt_rand(0, $max);
if( $asHex ) {
return str_pad(base_convert($val, 10, 16), 8, '0', STR_PAD_LEFT);
} else {
return str_pad(sprintf('%u', $val), 10, '0', STR_PAD_LEFT);
}
}
}