View file install/import/DbAbstract.php

File size: 9.04Kb
<?php

/**
 * Base class for import steps that copy rows from a table in a source
 * ("from") database into a table in the target ("to") database.
 *
 * Rows are read from the source in pages of getSelectCount() rows, each row
 * is passed through _translateRow(), and any non-empty array it returns is
 * inserted into the target table. Progress counters are kept on the instance
 * and included in __sleep(), so an import can be serialized between runs and
 * resumed where it left off.
 *
 * Members used here but not declared in this class (_toTableTruncate,
 * _isComplete, _messages, getToDb(), getToTable(), getBatchCount(),
 * getParam(), _message(), _warning(), _error(), _tableExists(),
 * _translateRow(), _initPre(), _initPost(), _runPre(), _runPost(),
 * _truncateToTable()) are presumably provided by Install_Import_Abstract or
 * by concrete subclasses.
 */
abstract class Install_Import_DbAbstract extends Install_Import_Abstract
{
  /**
   * Source database adapter. Not serialized; see __sleep().
   *
   * @var Zend_Db_Adapter_Abstract|null
   */
  protected $_fromDb;

  /**
   * Name of the source table rows are selected from.
   *
   * @var string|null
   */
  protected $_fromTable;

  /**
   * Optional map of table => join condition; each entry is LEFT JOINed onto
   * the source select. Takes precedence over _fromJoinTable.
   *
   * @var array|null
   */
  protected $_fromJoins;

  /**
   * Single table to LEFT JOIN, used only when _fromJoins is not a non-empty
   * array and _fromJoinCondition is also set.
   *
   * @var string|null
   */
  protected $_fromJoinTable;

  /**
   * Join condition that goes with _fromJoinTable.
   *
   * @var string|null
   */
  protected $_fromJoinCondition;

  /**
   * Optional map of condition => value; each entry is passed to
   * Zend_Db_Select::where().
   *
   * @var array|null
   */
  protected $_fromWhere;

  /**
   * Cached result of the source table existence check.
   *
   * @var mixed|null
   */
  protected $_fromTableExists;

  /**
   * Number of rows fetched per SELECT.
   *
   * @var int
   */
  protected $_selectCount = 100;

  /**
   * Number of source rows processed so far; also used as the SELECT offset.
   *
   * @var int
   */
  protected $_runCount = 0;

  /**
   * Set to true once the source rows have been exhausted.
   *
   * @var bool
   */
  protected $_runComplete = false;

  /**
   * Number of rows processed without an exception, across all runs.
   *
   * @var int
   */
  protected $_successCount = 0;

  /**
   * Number of rows whose processing threw an exception, across all runs.
   *
   * @var int
   */
  protected $_failureCount = 0;

  /**
   * COUNT(*) of the source table, or null if it has not been determined yet.
   *
   * @var int|null
   */
  protected $_totalFromRecords;



  /**
   * Lists the properties to serialize: the parent's list plus this class's
   * configuration and progress counters.
   *
   * @return array
   */
  public function __sleep()
  {
    // _fromDb is intentionally left out, so it has to be re-initialized after
    // unserializing - as do (parent) _toDb, _log, _cache.
    return array_merge(parent::__sleep(), array(
      '_fromTable', '_fromJoins',
      '_fromJoinTable', '_fromJoinCondition', '_fromWhere', '_fromTableExists',
      '_selectCount', '_runCount', '_runComplete', '_successCount',
      '_failureCount', '_totalFromRecords'
    ));
  }


  /**
   * Sets the source database adapter.
   *
   * @param Zend_Db_Adapter_Abstract $fromDb
   * @return Install_Import_DbAbstract
   */
  public function setFromDb(Zend_Db_Adapter_Abstract $fromDb)
  {
    $this->_fromDb = $fromDb;
    return $this;
  }

  /**
   * Returns the source database adapter.
   *
   * @return Zend_Db_Adapter_Abstract
   * @throws Engine_Exception If no source adapter has been set
   */
  public function getFromDb()
  {
    if( null === $this->_fromDb ) {
      throw new Engine_Exception('No from database');
    }
    return $this->_fromDb;
  }

  /**
   * Sets the source table name.
   *
   * @param string $fromTable
   * @return Install_Import_DbAbstract
   */
  public function setFromTable($fromTable)
  {
    $this->_fromTable = $fromTable;
    return $this;
  }

  /**
   * Returns the source table name.
   *
   * @return string
   * @throws Engine_Exception If no source table has been set
   */
  public function getFromTable()
  {
    if( null === $this->_fromTable ) {
      throw new Engine_Exception('No from table');
    }
    return $this->_fromTable;
  }

  /**
   * Tells whether the source table exists. Returns false when no source
   * table is configured; otherwise the result of _tableExists() is computed
   * once and cached.
   *
   * @return mixed False if no table is set, else the cached _tableExists() result
   * @throws Engine_Exception If the check is needed and no source adapter is set
   */
  public function getFromTableExists()
  {
    if( null === $this->_fromTable ) {
      return false;
    }
    if( null === $this->_fromTableExists ) {
      $this->_fromTableExists = $this->_tableExists($this->getFromDb(), $this->getFromTable());
    }
    return $this->_fromTableExists;
  }

  /**
   * Sets the number of rows fetched per SELECT. The value is cast to int;
   * non-positive values are replaced by 100 in getSelectCount().
   *
   * @param int $selectCount
   * @return Install_Import_DbAbstract
   */
  public function setSelectCount($selectCount)
  {
    $this->_selectCount = (int) $selectCount;
    return $this;
  }

  /**
   * Returns the number of rows fetched per SELECT, resetting it to 100 first
   * if the stored value is not a positive integer.
   *
   * @return int
   */
  public function getSelectCount()
  {
    if( !is_int($this->_selectCount) || $this->_selectCount <= 0 ) {
      $this->_selectCount = 100;
    }
    return $this->_selectCount;
  }

  /**
   * Returns the number of source rows processed so far.
   *
   * @return int
   */
  public function getRunCount()
  {
    return $this->_runCount;
  }

  /**
   * Returns the source table's row count, or null if it is not known yet.
   *
   * @return int|null
   */
  public function getTotalFromRecords()
  {
    return $this->_totalFromRecords;
  }



  // Custom

  /**
   * Prepares the import: calls _initPre(), truncates the target table when
   * _toTableTruncate is set, calls _initPost(), then tries to determine the
   * total number of source rows.
   *
   * @return void
   */
  public function init()
  {
    $this->_initPre();

    if( $this->_toTableTruncate ) {
      $this->_truncateToTable();
    }

    $this->_initPost();

    // Try to get total rows now
    try {
      $this->_loadTotalFromRecords();
    } catch( Exception $e ) {
      // Deliberately best-effort: the total is only used for reporting, and
      // _run() attempts the same count again if it is still unknown.
    }
  }

  /**
   * Runs the import (or the next batch of it), wrapped in START/END messages.
   *
   * _runPre() is called when no batch count is set or nothing has been
   * processed yet; _runPost() is called when no batch count is set or the
   * import is complete. Any exception is reported through _error() instead
   * of being rethrown.
   *
   * @return void
   */
  public function run()
  {
    $this->_message('(START)', 2);

    // This is to trick things that overrode _run to be complete
    $this->_isComplete = true;

    try {

      // Only run this when starting
      if( null === $this->getBatchCount() || $this->_runCount == 0 ) {
        $this->_runPre();
      }

      // Main
      $this->_run();

      // Only run this when ending
      if( null === $this->getBatchCount() || $this->_isComplete ) {
        $this->_runPost();
      }

    } catch( Exception $e ) {
      $this->_error($e->getMessage(), 0);
    }
    
    $this->_message('(END)', 2);
  }

  /**
   * Copies source rows into the target table, page by page.
   *
   * Stops when the source is exhausted, when the configured batch count has
   * been reached, or when the next page is estimated to exceed the
   * 'maxAllowedTime' param. Per-row exceptions are reported through _error()
   * and counted as failures; they do not abort the run.
   *
   * @return mixed Null, or whatever _error() returns when the source or
   *               target adapter/table is not configured
   */
  protected function _run()
  {
    // We're done
    if( $this->_runComplete ) {
      return;
    }

    // Get from db/table, check if it exists
    try {
      $fromDb = $this->getFromDb();
      $fromTable = $this->getFromTable();
    } catch( Exception $e ) {
      return $this->_error($e);
    }

    // NOTE(review): _fromJoinTable is existence-checked here even in cases
    // where _buildFromSelect() does not join it (when _fromJoins is set or
    // _fromJoinCondition is missing) - confirm this is intended.
    $fromTables = array_filter(array_merge(
      array($fromTable),
      array_keys((array) $this->_fromJoins),
      array($this->_fromJoinTable)
    ));

    if( empty($fromTables) ) {
      $this->_error('No from table specified.', 0);
      return;
    }

    if( !$this->_tableExists($fromDb, $fromTables) ) {
      $this->_warning(sprintf('One of the source tables does not exist (%s). This usually means you did not have the plugin installed.', join(', ', $fromTables)), 0);
      return;
    }
    
    // Get to db/table, check if it exists
    try {
      $toDb = $this->getToDb();
      $toTable = $this->getToTable();
    } catch( Exception $e ) {
      return $this->_error($e);
    }

    if( !$this->_tableExists($toDb, $toTable) ) {
      $this->_warning(sprintf('One of the target tables does not exist (%s). This usually means you do not have the plugin installed.', $toTable), 0);
      return;
    }
    
    // From here on there is real work to do, so undo the "complete" flag
    // that run() set before calling us
    $this->_runComplete = $this->_isComplete = false;

    // Get a count of rows
    $this->_loadTotalFromRecords();
    
    // stats
    $cSuccess = 0;
    $cFail = 0;

    // Import
    $select = $this->_buildFromSelect($fromDb, $fromTable);
    
    // Get ready to select
    $break = false;
    $currentRunCount = 0;
    $batchCount = 0;
    $runStartTime = time();
    $maxAllowedTime = $this->getParam('maxAllowedTime');
    // Stays 0 unless the time limit check below runs at least once
    $avgTimeForBatch = 0;
    
    $this->_message($select->__toString(), 2);

    // Select in batches of $this->_selectCount
    // NOTE(review): no ORDER BY is applied, so paging by offset relies on the
    // database returning rows in a stable order between queries - confirm.
    do {

      // Select
      $tmpSelect = clone $select;
      $dataSet = $tmpSelect->limit($this->getSelectCount(), $this->_runCount)
        ->query(Zend_Db::FETCH_ASSOC)
        ->fetchAll(Zend_Db::FETCH_ASSOC);

      // Nothing left
      if( !is_array($dataSet) || empty($dataSet) ) {
        $this->_runComplete = $this->_isComplete = true;
        break;
      }

      // Iterate over each item in batch
      foreach( $dataSet as $data ) {

        try {

          $newData = $this->_translateRow($data);

          // A row counts as a success even when nothing is inserted here
          if( !empty($newData) && is_array($newData) ) {
            $toDb->insert($toTable, $newData);
          }

          $cSuccess++;

        } catch( Exception $e ) {

          $this->_error($e);

          $cFail++;

        }

      }

      // Prepare next round
      $num = count($dataSet);
      $this->_runCount += $num;
      $currentRunCount += $num;
      $batchCount++;

      // Ran out of stuff
      if( $num < $this->getSelectCount() ) {
        $this->_runComplete = $this->_isComplete = true;
        $break = true;
      }
      // Finished the batch
      else if( null !== $this->getBatchCount() && $currentRunCount >= $this->getBatchCount() ) {
        $break = true;
      }

      // Check maxAllowedTime
      if( $maxAllowedTime > 0 ) {
        $timeSinceStart = time() - $runStartTime;
        $avgTimeForBatch = $timeSinceStart / $batchCount;
        $approxNextBatchEnd = $timeSinceStart + $avgTimeForBatch;
        // Break if we might go over
        if( $approxNextBatchEnd > $maxAllowedTime ) {
          $break = true;
        }
      }

    } while( !$break );

    $this->_successCount += $cSuccess;
    $this->_failureCount += $cFail;

    $this->_reportRunResults($avgTimeForBatch, $runStartTime, $maxAllowedTime);
  }

  /**
   * Stores COUNT(*) of the source table in _totalFromRecords. Does nothing
   * when the total is already known or no source table is configured.
   *
   * @return void
   * @throws Exception Whatever getFromDb() or the count query throws
   */
  private function _loadTotalFromRecords()
  {
    if( null !== $this->_totalFromRecords || null === $this->_fromTable ) {
      return;
    }

    $this->_totalFromRecords = (int) $this->getFromDb()->select()
      ->from($this->getFromTable(), new Zend_Db_Expr('COUNT(*)'))
      ->query()
      ->fetchColumn(0);
  }

  /**
   * Builds the un-paged source select: the source table, the configured
   * LEFT JOINs and the configured WHERE conditions.
   *
   * @param Zend_Db_Adapter_Abstract $fromDb Source adapter
   * @param string $fromTable Source table name
   * @return Zend_Db_Select
   */
  private function _buildFromSelect($fromDb, $fromTable)
  {
    $select = $fromDb->select();
    $select->from($fromTable);

    // _fromJoins wins over the single _fromJoinTable/_fromJoinCondition pair
    if( $this->_fromJoins && is_array($this->_fromJoins) ) {
      foreach( $this->_fromJoins as $table => $condition ) {
        $select->joinLeft($table, $condition);
      }
    } else if( $this->_fromJoinTable && $this->_fromJoinCondition ) {
      $select->joinLeft($this->_fromJoinTable, $this->_fromJoinCondition);
    }

    if( $this->_fromWhere && is_array($this->_fromWhere) ) {
      foreach( $this->_fromWhere as $cond => $value ) {
        $select->where($cond, $value);
      }
    }

    return $select;
  }

  /**
   * Emits the timing message and the cumulative success/failure summary.
   *
   * In batch mode the summaries left by earlier batches are removed from
   * _messages first, so only the latest cumulative summary remains.
   *
   * @param int|float $avgTimeForBatch Average seconds per page, 0 if not measured
   * @param int $runStartTime Unix timestamp at which paging started
   * @param mixed $maxAllowedTime Value of the 'maxAllowedTime' param
   * @return void
   */
  private function _reportRunResults($avgTimeForBatch, $runStartTime, $maxAllowedTime)
  {
    // If we're in batch mode, strip success and failure messages and re-add
    if( null !== $this->getBatchCount() ) {
      foreach( $this->_messages as $index => $message ) {
        if( strpos($message, 'Success - ') !== false || strpos($message, 'Failure - ') !== false ) {
          unset($this->_messages[$index]);
        }
      }
      $this->_messages = array_values($this->_messages);
    }

    $this->_message(sprintf('Average batch time: %d, Total time on batches: %d, Max allowed time: %d', $avgTimeForBatch, (time() - $runStartTime), $maxAllowedTime), 2);

    if( $this->_failureCount <= 0 ) {
      if( null !== $this->_totalFromRecords ) {
        $this->_message(sprintf('Success - %1$d records imported of %2$d total records.', $this->_successCount, $this->_totalFromRecords), 1);
      } else {
        $this->_message(sprintf('Success - %d records imported.', $this->_successCount), 1);
      }
    } else {
      if( null !== $this->_totalFromRecords ) {
        $this->_warning(sprintf('Failure - %1$d records imported, %2$d records failed of %3$d total records.', $this->_successCount, $this->_failureCount, $this->_totalFromRecords), 0);
      } else {
        $this->_warning(sprintf('Failure - %1$d records imported, %2$d records failed.', $this->_successCount, $this->_failureCount), 0);
      }
    }
  }

}