Page MenuHomestyx hydra

No OneTemporary

diff --git a/src/infrastructure/daemon/workers/PhabricatorWorker.php b/src/infrastructure/daemon/workers/PhabricatorWorker.php
index c448e7221a..473649ba19 100644
--- a/src/infrastructure/daemon/workers/PhabricatorWorker.php
+++ b/src/infrastructure/daemon/workers/PhabricatorWorker.php
@@ -1,286 +1,288 @@
<?php
/**
* @task config Configuring Retries and Failures
*/
abstract class PhabricatorWorker extends Phobject {
private $data;
private static $runAllTasksInProcess = false;
private $queuedTasks = array();
private $currentWorkerTask;
// NOTE: Lower priority numbers execute first. The priority numbers have to
// have the same ordering that IDs do (lowest first) so MySQL can use a
// multipart key across both of them efficiently.
const PRIORITY_ALERTS = 1000;
const PRIORITY_DEFAULT = 2000;
const PRIORITY_BULK = 3000;
const PRIORITY_IMPORT = 4000;
/**
* Special owner indicating that the task has yielded.
*/
const YIELD_OWNER = '(yield)';
/* -( Configuring Retries and Failures )----------------------------------- */
/**
* Return the number of seconds this worker needs hold a lease on the task for
* while it performs work. For most tasks you can leave this at `null`, which
* will give you a default lease (currently 2 hours).
*
* For tasks which may take a very long time to complete, you should return
* an upper bound on the amount of time the task may require.
*
* @return int|null Number of seconds this task needs to remain leased for,
* or null for a default lease.
*
* @task config
*/
public function getRequiredLeaseTime() {
return null;
}
/**
* Return the maximum number of times this task may be retried before it is
* considered permanently failed. By default, tasks retry indefinitely. You
* can throw a @{class:PhabricatorWorkerPermanentFailureException} to cause an
* immediate permanent failure.
*
* @return int|null Number of times the task will retry before permanent
* failure. Return `null` to retry indefinitely.
*
* @task config
*/
public function getMaximumRetryCount() {
return null;
}
/**
* Return the number of seconds a task should wait after a failure before
* retrying. For most tasks you can leave this at `null`, which will give you
* a short default retry period (currently 60 seconds).
*
* @param PhabricatorWorkerTask The task itself. This object is probably
* useful mostly to examine the failure count
* if you want to implement staggered retries,
* or to examine the execution exception if
* you want to react to different failures in
* different ways.
* @return int|null Number of seconds to wait between retries,
* or null for a default retry period
* (currently 60 seconds).
*
* @task config
*/
public function getWaitBeforeRetry(PhabricatorWorkerTask $task) {
return null;
}
public function setCurrentWorkerTask(PhabricatorWorkerTask $task) {
$this->currentWorkerTask = $task;
return $this;
}
public function getCurrentWorkerTask() {
return $this->currentWorkerTask;
}
public function getCurrentWorkerTaskID() {
$task = $this->getCurrentWorkerTask();
if (!$task) {
return null;
}
return $task->getID();
}
abstract protected function doWork();
final public function __construct($data) {
$this->data = $data;
}
final protected function getTaskData() {
return $this->data;
}
final protected function getTaskDataValue($key, $default = null) {
$data = $this->getTaskData();
if (!is_array($data)) {
throw new PhabricatorWorkerPermanentFailureException(
pht('Expected task data to be a dictionary.'));
}
return idx($data, $key, $default);
}
final public function executeTask() {
$this->doWork();
}
final public static function scheduleTask(
$task_class,
$data,
$options = array()) {
PhutilTypeSpec::checkMap(
$options,
array(
'priority' => 'optional int|null',
'objectPHID' => 'optional string|null',
'delayUntil' => 'optional int|null',
));
$priority = idx($options, 'priority');
if ($priority === null) {
$priority = self::PRIORITY_DEFAULT;
}
$object_phid = idx($options, 'objectPHID');
$task = id(new PhabricatorWorkerActiveTask())
->setTaskClass($task_class)
->setData($data)
->setPriority($priority)
->setObjectPHID($object_phid);
$delay = idx($options, 'delayUntil');
if ($delay) {
$task->setLeaseExpires($delay);
}
if (self::$runAllTasksInProcess) {
// Do the work in-process.
$worker = newv($task_class, array($data));
while (true) {
try {
$worker->doWork();
foreach ($worker->getQueuedTasks() as $queued_task) {
- list($queued_class, $queued_data, $queued_priority) = $queued_task;
- $queued_options = array('priority' => $queued_priority);
+ list($queued_class, $queued_data, $queued_options) = $queued_task;
self::scheduleTask($queued_class, $queued_data, $queued_options);
}
break;
} catch (PhabricatorWorkerYieldException $ex) {
phlog(
pht(
'In-process task "%s" yielded for %s seconds, sleeping...',
$task_class,
$ex->getDuration()));
sleep($ex->getDuration());
}
}
// Now, save a task row and immediately archive it so we can return an
// object with a valid ID.
$task->openTransaction();
$task->save();
$archived = $task->archiveTask(
PhabricatorWorkerArchiveTask::RESULT_SUCCESS,
0);
$task->saveTransaction();
return $archived;
} else {
$task->save();
return $task;
}
}
public function renderForDisplay(PhabricatorUser $viewer) {
return null;
}
/**
* Set this flag to execute scheduled tasks synchronously, in the same
* process. This is useful for debugging, and otherwise dramatically worse
* in every way imaginable.
*/
public static function setRunAllTasksInProcess($all) {
self::$runAllTasksInProcess = $all;
}
final protected function log($pattern /* , ... */) {
$console = PhutilConsole::getConsole();
$argv = func_get_args();
call_user_func_array(array($console, 'writeLog'), $argv);
return $this;
}
/**
* Queue a task to be executed after this one succeeds.
*
* The followup task will be queued only if this task completes cleanly.
*
* @param string Task class to queue.
* @param array Data for the followup task.
- * @param int|null Priority for the followup task.
+ * @param array Options for the followup task.
* @return this
*/
- final protected function queueTask($class, array $data, $priority = null) {
- $this->queuedTasks[] = array($class, $data, $priority);
+ final protected function queueTask(
+ $class,
+ array $data,
+ array $options = array()) {
+ $this->queuedTasks[] = array($class, $data, $options);
return $this;
}
/**
* Get tasks queued as followups by @{method:queueTask}.
*
* @return list<tuple<string, wild, int|null>> Queued task specifications.
*/
final public function getQueuedTasks() {
return $this->queuedTasks;
}
/**
* Awaken tasks that have yielded.
*
* Reschedules the specified tasks if they are currently queued in a yielded,
* unleased, unretried state so they'll execute sooner. This can let the
* queue avoid unnecessary waits.
*
* This method does not provide any assurances about when these tasks will
* execute, or even guarantee that it will have any effect at all.
*
* @param list<id> List of task IDs to try to awaken.
* @return void
*/
final public static function awakenTaskIDs(array $ids) {
if (!$ids) {
return;
}
$table = new PhabricatorWorkerActiveTask();
$conn_w = $table->establishConnection('w');
// NOTE: At least for now, we're keeping these tasks yielded, just
// pretending that they threw a shorter yield than they really did.
// Overlap the windows here to handle minor client/server time differences
// and because it's likely correct to push these tasks to the head of their
// respective priorities. There is a good chance they are ready to execute.
$window = phutil_units('1 hour in seconds');
$epoch_ago = (PhabricatorTime::getNow() - $window);
queryfx(
$conn_w,
'UPDATE %T SET leaseExpires = %d
WHERE id IN (%Ld)
AND leaseOwner = %s
AND leaseExpires > %d
AND failureCount = 0',
$table->getTableName(),
$epoch_ago,
$ids,
self::YIELD_OWNER,
$epoch_ago);
}
}
diff --git a/src/infrastructure/daemon/workers/storage/PhabricatorWorkerActiveTask.php b/src/infrastructure/daemon/workers/storage/PhabricatorWorkerActiveTask.php
index 6c2f5b4461..57a1794ec3 100644
--- a/src/infrastructure/daemon/workers/storage/PhabricatorWorkerActiveTask.php
+++ b/src/infrastructure/daemon/workers/storage/PhabricatorWorkerActiveTask.php
@@ -1,233 +1,234 @@
<?php
final class PhabricatorWorkerActiveTask extends PhabricatorWorkerTask {
protected $failureTime;
private $serverTime;
private $localTime;
protected function getConfiguration() {
$parent = parent::getConfiguration();
$config = array(
self::CONFIG_IDS => self::IDS_COUNTER,
self::CONFIG_TIMESTAMPS => false,
self::CONFIG_KEY_SCHEMA => array(
'dataID' => array(
'columns' => array('dataID'),
'unique' => true,
),
'taskClass' => array(
'columns' => array('taskClass'),
),
'leaseExpires' => array(
'columns' => array('leaseExpires'),
),
'leaseOwner' => array(
'columns' => array('leaseOwner(16)'),
),
'key_failuretime' => array(
'columns' => array('failureTime'),
),
'leaseOwner_2' => array(
'columns' => array('leaseOwner', 'priority', 'id'),
),
) + $parent[self::CONFIG_KEY_SCHEMA],
);
$config[self::CONFIG_COLUMN_SCHEMA] = array(
// T6203/NULLABILITY
// This isn't nullable in the archive table, so at a minimum these
// should be the same.
'dataID' => 'uint32?',
) + $parent[self::CONFIG_COLUMN_SCHEMA];
return $config + $parent;
}
public function setServerTime($server_time) {
$this->serverTime = $server_time;
$this->localTime = time();
return $this;
}
public function setLeaseDuration($lease_duration) {
$this->checkLease();
$server_lease_expires = $this->serverTime + $lease_duration;
$this->setLeaseExpires($server_lease_expires);
// NOTE: This is primarily to allow unit tests to set negative lease
// durations so they don't have to wait around for leases to expire. We
// check that the lease is valid above.
return $this->forceSaveWithoutLease();
}
public function save() {
$this->checkLease();
return $this->forceSaveWithoutLease();
}
public function forceSaveWithoutLease() {
$is_new = !$this->getID();
if ($is_new) {
$this->failureCount = 0;
}
if ($is_new && ($this->getData() !== null)) {
$data = new PhabricatorWorkerTaskData();
$data->setData($this->getData());
$data->save();
$this->setDataID($data->getID());
}
return parent::save();
}
protected function checkLease() {
$owner = $this->leaseOwner;
if (!$owner) {
return;
}
if ($owner == PhabricatorWorker::YIELD_OWNER) {
return;
}
$current_server_time = $this->serverTime + (time() - $this->localTime);
if ($current_server_time >= $this->leaseExpires) {
throw new Exception(
pht(
'Trying to update Task %d (%s) after lease expiration!',
$this->getID(),
$this->getTaskClass()));
}
}
public function delete() {
throw new Exception(
pht(
'Active tasks can not be deleted directly. '.
'Use %s to move tasks to the archive.',
'archiveTask()'));
}
public function archiveTask($result, $duration) {
if ($this->getID() === null) {
throw new Exception(
pht("Attempting to archive a task which hasn't been saved!"));
}
$this->checkLease();
$archive = id(new PhabricatorWorkerArchiveTask())
->setID($this->getID())
->setTaskClass($this->getTaskClass())
->setLeaseOwner($this->getLeaseOwner())
->setLeaseExpires($this->getLeaseExpires())
->setFailureCount($this->getFailureCount())
->setDataID($this->getDataID())
->setPriority($this->getPriority())
->setObjectPHID($this->getObjectPHID())
->setResult($result)
->setDuration($duration);
// NOTE: This deletes the active task (this object)!
$archive->save();
return $archive;
}
public function executeTask() {
// We do this outside of the try .. catch because we don't have permission
// to release the lease otherwise.
$this->checkLease();
$did_succeed = false;
$worker = null;
try {
$worker = $this->getWorkerInstance();
$worker->setCurrentWorkerTask($this);
$maximum_failures = $worker->getMaximumRetryCount();
if ($maximum_failures !== null) {
if ($this->getFailureCount() > $maximum_failures) {
throw new PhabricatorWorkerPermanentFailureException(
pht(
'Task % has exceeded the maximum number of failures (%d).',
$this->getID(),
$maximum_failures));
}
}
$lease = $worker->getRequiredLeaseTime();
if ($lease !== null) {
$this->setLeaseDuration($lease);
}
$t_start = microtime(true);
$worker->executeTask();
$t_end = microtime(true);
$duration = (int)(1000000 * ($t_end - $t_start));
$result = $this->archiveTask(
PhabricatorWorkerArchiveTask::RESULT_SUCCESS,
$duration);
$did_succeed = true;
} catch (PhabricatorWorkerPermanentFailureException $ex) {
$result = $this->archiveTask(
PhabricatorWorkerArchiveTask::RESULT_FAILURE,
0);
$result->setExecutionException($ex);
} catch (PhabricatorWorkerYieldException $ex) {
$this->setExecutionException($ex);
$this->setLeaseOwner(PhabricatorWorker::YIELD_OWNER);
$retry = $ex->getDuration();
$retry = max($retry, 5);
// NOTE: As a side effect, this saves the object.
$this->setLeaseDuration($retry);
$result = $this;
} catch (Exception $ex) {
$this->setExecutionException($ex);
$this->setFailureCount($this->getFailureCount() + 1);
$this->setFailureTime(time());
$retry = null;
if ($worker) {
$retry = $worker->getWaitBeforeRetry($this);
}
$retry = coalesce(
$retry,
PhabricatorWorkerLeaseQuery::getDefaultWaitBeforeRetry());
// NOTE: As a side effect, this saves the object.
$this->setLeaseDuration($retry);
$result = $this;
}
// NOTE: If this throws, we don't want it to cause the task to fail again,
// so execute it out here and just let the exception escape.
if ($did_succeed) {
foreach ($worker->getQueuedTasks() as $task) {
- list($class, $data) = $task;
- PhabricatorWorker::scheduleTask(
- $class,
- $data,
- array(
- 'priority' => (int)$this->getPriority(),
- ));
+ list($class, $data, $options) = $task;
+
+ // Default the new task priority to our own priority.
+ $options = $options + array(
+ 'priority' => (int)$this->getPriority(),
+ );
+
+ PhabricatorWorker::scheduleTask($class, $data, $options);
}
}
return $result;
}
}

File Metadata

Mime Type
text/x-diff
Expires
Tue, Mar 17, 1:56 AM (1 h, 19 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
964199
Default Alt Text
(16 KB)

Event Timeline