Page MenuHomestyx hydra

No OneTemporary

diff --git a/src/applications/diffusion/protocol/DiffusionRepositoryClusterEngine.php b/src/applications/diffusion/protocol/DiffusionRepositoryClusterEngine.php
index d5ba74a30e..b09b3960a1 100644
--- a/src/applications/diffusion/protocol/DiffusionRepositoryClusterEngine.php
+++ b/src/applications/diffusion/protocol/DiffusionRepositoryClusterEngine.php
@@ -1,824 +1,826 @@
<?php
/**
* Manages repository synchronization for cluster repositories.
*
* @task config Configuring Synchronization
* @task sync Cluster Synchronization
* @task internal Internals
*/
final class DiffusionRepositoryClusterEngine extends Phobject {
private $repository;
private $viewer;
private $logger;
private $clusterWriteLock;
private $clusterWriteVersion;
private $clusterWriteOwner;
/* -( Configuring Synchronization )---------------------------------------- */
public function setRepository(PhabricatorRepository $repository) {
$this->repository = $repository;
return $this;
}
public function getRepository() {
return $this->repository;
}
public function setViewer(PhabricatorUser $viewer) {
$this->viewer = $viewer;
return $this;
}
public function getViewer() {
return $this->viewer;
}
public function setLog(DiffusionRepositoryClusterEngineLogInterface $log) {
$this->logger = $log;
return $this;
}
/* -( Cluster Synchronization )-------------------------------------------- */
/**
* Synchronize repository version information after creating a repository.
*
* This initializes working copy versions for all currently bound devices to
* 0, so that we don't get stuck making an ambiguous choice about which
* devices are leaders when we later synchronize before a read.
*
* @task sync
*/
public function synchronizeWorkingCopyAfterCreation() {
if (!$this->shouldEnableSynchronization(false)) {
return;
}
$repository = $this->getRepository();
$repository_phid = $repository->getPHID();
$service = $repository->loadAlmanacService();
if (!$service) {
throw new Exception(pht('Failed to load repository cluster service.'));
}
$bindings = $service->getActiveBindings();
foreach ($bindings as $binding) {
PhabricatorRepositoryWorkingCopyVersion::updateVersion(
$repository_phid,
$binding->getDevicePHID(),
0);
}
return $this;
}
/**
* @task sync
*/
public function synchronizeWorkingCopyAfterHostingChange() {
if (!$this->shouldEnableSynchronization(false)) {
return;
}
$repository = $this->getRepository();
$repository_phid = $repository->getPHID();
$versions = PhabricatorRepositoryWorkingCopyVersion::loadVersions(
$repository_phid);
$versions = mpull($versions, null, 'getDevicePHID');
// After converting a hosted repository to observed, or vice versa, we
// need to reset version numbers because the clocks for observed and hosted
// repositories run on different units.
// We identify all the cluster leaders and reset their version to 0.
// We identify all the cluster followers and demote them.
// This allows the cluster to start over again at version 0 but keep the
// same leaders.
if ($versions) {
$max_version = (int)max(mpull($versions, 'getRepositoryVersion'));
foreach ($versions as $version) {
$device_phid = $version->getDevicePHID();
if ($version->getRepositoryVersion() == $max_version) {
PhabricatorRepositoryWorkingCopyVersion::updateVersion(
$repository_phid,
$device_phid,
0);
} else {
PhabricatorRepositoryWorkingCopyVersion::demoteDevice(
$repository_phid,
$device_phid);
}
}
}
return $this;
}
/**
* @task sync
*/
public function synchronizeWorkingCopyBeforeRead() {
if (!$this->shouldEnableSynchronization(true)) {
return;
}
$repository = $this->getRepository();
$repository_phid = $repository->getPHID();
$device = AlmanacKeys::getLiveDevice();
$device_phid = $device->getPHID();
$read_lock = PhabricatorRepositoryWorkingCopyVersion::getReadLock(
$repository_phid,
$device_phid);
$lock_wait = phutil_units('2 minutes in seconds');
$this->logLine(
pht(
'Acquiring read lock for repository "%s" on device "%s"...',
$repository->getDisplayName(),
$device->getName()));
try {
$start = PhabricatorTime::getNow();
$read_lock->lock($lock_wait);
$waited = (PhabricatorTime::getNow() - $start);
if ($waited) {
$this->logLine(
pht(
'Acquired read lock after %s second(s).',
new PhutilNumber($waited)));
} else {
$this->logLine(
pht(
'Acquired read lock immediately.'));
}
- } catch (Exception $ex) {
+ } catch (PhutilLockException $ex) {
throw new PhutilProxyException(
pht(
'Failed to acquire read lock after waiting %s second(s). You '.
- 'may be able to retry later.',
- new PhutilNumber($lock_wait)),
+ 'may be able to retry later. (%s)',
+ new PhutilNumber($lock_wait),
+ $ex->getHint()),
$ex);
}
$versions = PhabricatorRepositoryWorkingCopyVersion::loadVersions(
$repository_phid);
$versions = mpull($versions, null, 'getDevicePHID');
$this_version = idx($versions, $device_phid);
if ($this_version) {
$this_version = (int)$this_version->getRepositoryVersion();
} else {
$this_version = -1;
}
if ($versions) {
// This is the normal case, where we have some version information and
// can identify which nodes are leaders. If the current node is not a
// leader, we want to fetch from a leader and then update our version.
$max_version = (int)max(mpull($versions, 'getRepositoryVersion'));
if ($max_version > $this_version) {
if ($repository->isHosted()) {
$fetchable = array();
foreach ($versions as $version) {
if ($version->getRepositoryVersion() == $max_version) {
$fetchable[] = $version->getDevicePHID();
}
}
$this->synchronizeWorkingCopyFromDevices($fetchable);
} else {
$this->synchornizeWorkingCopyFromRemote();
}
PhabricatorRepositoryWorkingCopyVersion::updateVersion(
$repository_phid,
$device_phid,
$max_version);
} else {
$this->logLine(
pht(
'Device "%s" is already a cluster leader and does not need '.
'to be synchronized.',
$device->getName()));
}
$result_version = $max_version;
} else {
// If no version records exist yet, we need to be careful, because we
// can not tell which nodes are leaders.
// There might be several nodes with arbitrary existing data, and we have
// no way to tell which one has the "right" data. If we pick wrong, we
// might erase some or all of the data in the repository.
// Since this is dangerous, we refuse to guess unless there is only one
// device. If we're the only device in the group, we obviously must be
// a leader.
$service = $repository->loadAlmanacService();
if (!$service) {
throw new Exception(pht('Failed to load repository cluster service.'));
}
$bindings = $service->getActiveBindings();
$device_map = array();
foreach ($bindings as $binding) {
$device_map[$binding->getDevicePHID()] = true;
}
if (count($device_map) > 1) {
throw new Exception(
pht(
'Repository "%s" exists on more than one device, but no device '.
'has any repository version information. Phabricator can not '.
'guess which copy of the existing data is authoritative. Promote '.
'a device or see "Ambiguous Leaders" in the documentation.',
$repository->getDisplayName()));
}
if (empty($device_map[$device->getPHID()])) {
throw new Exception(
pht(
'Repository "%s" is being synchronized on device "%s", but '.
'this device is not bound to the corresponding cluster '.
'service ("%s").',
$repository->getDisplayName(),
$device->getName(),
$service->getName()));
}
// The current device is the only device in service, so it must be a
// leader. We can safely have any future nodes which come online read
// from it.
PhabricatorRepositoryWorkingCopyVersion::updateVersion(
$repository_phid,
$device_phid,
0);
$result_version = 0;
}
$read_lock->unlock();
return $result_version;
}
/**
* @task sync
*/
public function synchronizeWorkingCopyBeforeWrite() {
if (!$this->shouldEnableSynchronization(true)) {
return;
}
$repository = $this->getRepository();
$viewer = $this->getViewer();
$repository_phid = $repository->getPHID();
$device = AlmanacKeys::getLiveDevice();
$device_phid = $device->getPHID();
$table = new PhabricatorRepositoryWorkingCopyVersion();
$locked_connection = $table->establishConnection('w');
$write_lock = PhabricatorRepositoryWorkingCopyVersion::getWriteLock(
$repository_phid);
$write_lock->useSpecificConnection($locked_connection);
$this->logLine(
pht(
'Acquiring write lock for repository "%s"...',
$repository->getDisplayName()));
$lock_wait = phutil_units('2 minutes in seconds');
try {
$write_wait_start = microtime(true);
$start = PhabricatorTime::getNow();
$step_wait = 1;
while (true) {
try {
$write_lock->lock((int)floor($step_wait));
$write_wait_end = microtime(true);
break;
} catch (PhutilLockException $ex) {
$waited = (PhabricatorTime::getNow() - $start);
if ($waited > $lock_wait) {
throw $ex;
}
$this->logActiveWriter($viewer, $repository);
}
// Wait a little longer before the next message we print.
$step_wait = $step_wait + 0.5;
$step_wait = min($step_wait, 3);
}
$waited = (PhabricatorTime::getNow() - $start);
if ($waited) {
$this->logLine(
pht(
'Acquired write lock after %s second(s).',
new PhutilNumber($waited)));
} else {
$this->logLine(
pht(
'Acquired write lock immediately.'));
}
- } catch (Exception $ex) {
+ } catch (PhutilLockException $ex) {
throw new PhutilProxyException(
pht(
'Failed to acquire write lock after waiting %s second(s). You '.
- 'may be able to retry later.',
- new PhutilNumber($lock_wait)),
+ 'may be able to retry later. (%s)',
+ new PhutilNumber($lock_wait),
+ $ex->getHint()),
$ex);
}
$versions = PhabricatorRepositoryWorkingCopyVersion::loadVersions(
$repository_phid);
foreach ($versions as $version) {
if (!$version->getIsWriting()) {
continue;
}
throw new Exception(
pht(
'An previous write to this repository was interrupted; refusing '.
'new writes. This issue requires operator intervention to resolve, '.
'see "Write Interruptions" in the "Cluster: Repositories" in the '.
'documentation for instructions.'));
}
$read_wait_start = microtime(true);
try {
$max_version = $this->synchronizeWorkingCopyBeforeRead();
} catch (Exception $ex) {
$write_lock->unlock();
throw $ex;
}
$read_wait_end = microtime(true);
$pid = getmypid();
$hash = Filesystem::readRandomCharacters(12);
$this->clusterWriteOwner = "{$pid}.{$hash}";
PhabricatorRepositoryWorkingCopyVersion::willWrite(
$locked_connection,
$repository_phid,
$device_phid,
array(
'userPHID' => $viewer->getPHID(),
'epoch' => PhabricatorTime::getNow(),
'devicePHID' => $device_phid,
),
$this->clusterWriteOwner);
$this->clusterWriteVersion = $max_version;
$this->clusterWriteLock = $write_lock;
$write_wait = ($write_wait_end - $write_wait_start);
$read_wait = ($read_wait_end - $read_wait_start);
$log = $this->logger;
if ($log) {
$log->writeClusterEngineLogProperty('readWait', $read_wait);
$log->writeClusterEngineLogProperty('writeWait', $write_wait);
}
}
public function synchronizeWorkingCopyAfterDiscovery($new_version) {
if (!$this->shouldEnableSynchronization(true)) {
return;
}
$repository = $this->getRepository();
$repository_phid = $repository->getPHID();
if ($repository->isHosted()) {
return;
}
$viewer = $this->getViewer();
$device = AlmanacKeys::getLiveDevice();
$device_phid = $device->getPHID();
// NOTE: We are not holding a lock here because this method is only called
// from PhabricatorRepositoryDiscoveryEngine, which already holds a device
// lock. Even if we do race here and record an older version, the
// consequences are mild: we only do extra work to correct it later.
$versions = PhabricatorRepositoryWorkingCopyVersion::loadVersions(
$repository_phid);
$versions = mpull($versions, null, 'getDevicePHID');
$this_version = idx($versions, $device_phid);
if ($this_version) {
$this_version = (int)$this_version->getRepositoryVersion();
} else {
$this_version = -1;
}
if ($new_version > $this_version) {
PhabricatorRepositoryWorkingCopyVersion::updateVersion(
$repository_phid,
$device_phid,
$new_version);
}
}
/**
* @task sync
*/
public function synchronizeWorkingCopyAfterWrite() {
if (!$this->shouldEnableSynchronization(true)) {
return;
}
if (!$this->clusterWriteLock) {
throw new Exception(
pht(
'Trying to synchronize after write, but not holding a write '.
'lock!'));
}
$repository = $this->getRepository();
$repository_phid = $repository->getPHID();
$device = AlmanacKeys::getLiveDevice();
$device_phid = $device->getPHID();
// It is possible that we've lost the global lock while receiving the push.
// For example, the master database may have been restarted between the
// time we acquired the global lock and now, when the push has finished.
// We wrote a durable lock while we were holding the the global lock,
// essentially upgrading our lock. We can still safely release this upgraded
// lock even if we're no longer holding the global lock.
// If we fail to release the lock, the repository will be frozen until
// an operator can figure out what happened, so we try pretty hard to
// reconnect to the database and release the lock.
$now = PhabricatorTime::getNow();
$duration = phutil_units('5 minutes in seconds');
$try_until = $now + $duration;
$did_release = false;
$already_failed = false;
while (PhabricatorTime::getNow() <= $try_until) {
try {
// NOTE: This means we're still bumping the version when pushes fail. We
// could select only un-rejected events instead to bump a little less
// often.
$new_log = id(new PhabricatorRepositoryPushEventQuery())
->setViewer(PhabricatorUser::getOmnipotentUser())
->withRepositoryPHIDs(array($repository_phid))
->setLimit(1)
->executeOne();
$old_version = $this->clusterWriteVersion;
if ($new_log) {
$new_version = $new_log->getID();
} else {
$new_version = $old_version;
}
PhabricatorRepositoryWorkingCopyVersion::didWrite(
$repository_phid,
$device_phid,
$this->clusterWriteVersion,
$new_version,
$this->clusterWriteOwner);
$did_release = true;
break;
} catch (AphrontConnectionQueryException $ex) {
$connection_exception = $ex;
} catch (AphrontConnectionLostQueryException $ex) {
$connection_exception = $ex;
}
if (!$already_failed) {
$already_failed = true;
$this->logLine(
pht('CRITICAL. Failed to release cluster write lock!'));
$this->logLine(
pht(
'The connection to the master database was lost while receiving '.
'the write.'));
$this->logLine(
pht(
'This process will spend %s more second(s) attempting to '.
'recover, then give up.',
new PhutilNumber($duration)));
}
sleep(1);
}
if ($did_release) {
if ($already_failed) {
$this->logLine(
pht('RECOVERED. Link to master database was restored.'));
}
$this->logLine(pht('Released cluster write lock.'));
} else {
throw new Exception(
pht(
'Failed to reconnect to master database and release held write '.
'lock ("%s") on device "%s" for repository "%s" after trying '.
'for %s seconds(s). This repository will be frozen.',
$this->clusterWriteOwner,
$device->getName(),
$this->getDisplayName(),
new PhutilNumber($duration)));
}
// We can continue even if we've lost this lock, everything is still
// consistent.
try {
$this->clusterWriteLock->unlock();
} catch (Exception $ex) {
// Ignore.
}
$this->clusterWriteLock = null;
$this->clusterWriteOwner = null;
}
/* -( Internals )---------------------------------------------------------- */
/**
* @task internal
*/
private function shouldEnableSynchronization($require_device) {
$repository = $this->getRepository();
$service_phid = $repository->getAlmanacServicePHID();
if (!$service_phid) {
return false;
}
if (!$repository->supportsSynchronization()) {
return false;
}
if ($require_device) {
$device = AlmanacKeys::getLiveDevice();
if (!$device) {
return false;
}
}
return true;
}
/**
* @task internal
*/
private function synchornizeWorkingCopyFromRemote() {
$repository = $this->getRepository();
$device = AlmanacKeys::getLiveDevice();
$local_path = $repository->getLocalPath();
$fetch_uri = $repository->getRemoteURIEnvelope();
if ($repository->isGit()) {
$this->requireWorkingCopy();
$argv = array(
'fetch --prune -- %P %s',
$fetch_uri,
'+refs/*:refs/*',
);
} else {
throw new Exception(pht('Remote sync only supported for git!'));
}
$future = DiffusionCommandEngine::newCommandEngine($repository)
->setArgv($argv)
->setSudoAsDaemon(true)
->setCredentialPHID($repository->getCredentialPHID())
->setURI($repository->getRemoteURIObject())
->newFuture();
$future->setCWD($local_path);
try {
$future->resolvex();
} catch (Exception $ex) {
$this->logLine(
pht(
'Synchronization of "%s" from remote failed: %s',
$device->getName(),
$ex->getMessage()));
throw $ex;
}
}
/**
* @task internal
*/
private function synchronizeWorkingCopyFromDevices(array $device_phids) {
$repository = $this->getRepository();
$service = $repository->loadAlmanacService();
if (!$service) {
throw new Exception(pht('Failed to load repository cluster service.'));
}
$device_map = array_fuse($device_phids);
$bindings = $service->getActiveBindings();
$fetchable = array();
foreach ($bindings as $binding) {
// We can't fetch from nodes which don't have the newest version.
$device_phid = $binding->getDevicePHID();
if (empty($device_map[$device_phid])) {
continue;
}
// TODO: For now, only fetch over SSH. We could support fetching over
// HTTP eventually.
if ($binding->getAlmanacPropertyValue('protocol') != 'ssh') {
continue;
}
$fetchable[] = $binding;
}
if (!$fetchable) {
throw new Exception(
pht(
'Leader lost: no up-to-date nodes in repository cluster are '.
'fetchable.'));
}
$caught = null;
foreach ($fetchable as $binding) {
try {
$this->synchronizeWorkingCopyFromBinding($binding);
$caught = null;
break;
} catch (Exception $ex) {
$caught = $ex;
}
}
if ($caught) {
throw $caught;
}
}
/**
* @task internal
*/
private function synchronizeWorkingCopyFromBinding($binding) {
$repository = $this->getRepository();
$device = AlmanacKeys::getLiveDevice();
$this->logLine(
pht(
'Synchronizing this device ("%s") from cluster leader ("%s") before '.
'read.',
$device->getName(),
$binding->getDevice()->getName()));
$fetch_uri = $repository->getClusterRepositoryURIFromBinding($binding);
$local_path = $repository->getLocalPath();
if ($repository->isGit()) {
$this->requireWorkingCopy();
$argv = array(
'fetch --prune -- %s %s',
$fetch_uri,
'+refs/*:refs/*',
);
} else {
throw new Exception(pht('Binding sync only supported for git!'));
}
$future = DiffusionCommandEngine::newCommandEngine($repository)
->setArgv($argv)
->setConnectAsDevice(true)
->setSudoAsDaemon(true)
->setURI($fetch_uri)
->newFuture();
$future->setCWD($local_path);
try {
$future->resolvex();
} catch (Exception $ex) {
$this->logLine(
pht(
'Synchronization of "%s" from leader "%s" failed: %s',
$device->getName(),
$binding->getDevice()->getName(),
$ex->getMessage()));
throw $ex;
}
}
/**
* @task internal
*/
private function logLine($message) {
return $this->logText("# {$message}\n");
}
/**
* @task internal
*/
private function logText($message) {
$log = $this->logger;
if ($log) {
$log->writeClusterEngineLogMessage($message);
}
return $this;
}
private function requireWorkingCopy() {
$repository = $this->getRepository();
$local_path = $repository->getLocalPath();
if (!Filesystem::pathExists($local_path)) {
$device = AlmanacKeys::getLiveDevice();
throw new Exception(
pht(
'Repository "%s" does not have a working copy on this device '.
'yet, so it can not be synchronized. Wait for the daemons to '.
'construct one or run `bin/repository update %s` on this host '.
'("%s") to build it explicitly.',
$repository->getDisplayName(),
$repository->getMonogram(),
$device->getName()));
}
}
private function logActiveWriter(
PhabricatorUser $viewer,
PhabricatorRepository $repository) {
$writer = PhabricatorRepositoryWorkingCopyVersion::loadWriter(
$repository->getPHID());
if (!$writer) {
$this->logLine(pht('Waiting on another user to finish writing...'));
return;
}
$user_phid = $writer->getWriteProperty('userPHID');
$device_phid = $writer->getWriteProperty('devicePHID');
$epoch = $writer->getWriteProperty('epoch');
$phids = array($user_phid, $device_phid);
$handles = $viewer->loadHandles($phids);
$duration = (PhabricatorTime::getNow() - $epoch) + 1;
$this->logLine(
pht(
'Waiting for %s to finish writing (on device "%s" for %ss)...',
$handles[$user_phid]->getName(),
$handles[$device_phid]->getName(),
new PhutilNumber($duration)));
}
}
diff --git a/src/infrastructure/util/PhabricatorGlobalLock.php b/src/infrastructure/util/PhabricatorGlobalLock.php
index 827d7e0e3d..221a75b37c 100644
--- a/src/infrastructure/util/PhabricatorGlobalLock.php
+++ b/src/infrastructure/util/PhabricatorGlobalLock.php
@@ -1,231 +1,375 @@
<?php
/**
* Global, MySQL-backed lock. This is a high-reliability, low-performance
* global lock.
*
* The lock is maintained by using GET_LOCK() in MySQL, and automatically
* released when the connection terminates. Thus, this lock can safely be used
* to control access to shared resources without implementing any sort of
* timeout or override logic: the lock can't normally be stuck in a locked state
* with no process actually holding the lock.
*
* However, acquiring the lock is moderately expensive (several network
* roundtrips). This makes it unsuitable for tasks where lock performance is
* important.
*
* $lock = PhabricatorGlobalLock::newLock('example');
* $lock->lock();
* do_contentious_things();
* $lock->unlock();
*
* NOTE: This lock is not completely global; it is namespaced to the active
* storage namespace so that unit tests running in separate table namespaces
* are isolated from one another.
*
* @task construct Constructing Locks
* @task impl Implementation
*/
final class PhabricatorGlobalLock extends PhutilLock {
private $parameters;
private $conn;
private $isExternalConnection = false;
private $log;
private $disableLogging;
private static $pool = array();
/* -( Constructing Locks )------------------------------------------------- */
public static function newLock($name, $parameters = array()) {
$namespace = PhabricatorLiskDAO::getStorageNamespace();
$namespace = PhabricatorHash::digestToLength($namespace, 20);
$parts = array();
ksort($parameters);
foreach ($parameters as $key => $parameter) {
if (!preg_match('/^[a-zA-Z0-9]+\z/', $key)) {
throw new Exception(
pht(
'Lock parameter key "%s" must be alphanumeric.',
$key));
}
if (!is_scalar($parameter) && !is_null($parameter)) {
throw new Exception(
pht(
'Lock parameter for key "%s" must be a scalar.',
$key));
}
$value = phutil_json_encode($parameter);
$parts[] = "{$key}={$value}";
}
$parts = implode(', ', $parts);
$local = "{$name}({$parts})";
$local = PhabricatorHash::digestToLength($local, 20);
$full_name = "ph:{$namespace}:{$local}";
$lock = self::getLock($full_name);
if (!$lock) {
$lock = new PhabricatorGlobalLock($full_name);
self::registerLock($lock);
$lock->parameters = $parameters;
}
return $lock;
}
/**
* Use a specific database connection for locking.
*
* By default, `PhabricatorGlobalLock` will lock on the "repository" database
* (somewhat arbitrarily). In most cases this is fine, but this method can
* be used to lock on a specific connection.
*
* @param AphrontDatabaseConnection
* @return this
*/
public function useSpecificConnection(AphrontDatabaseConnection $conn) {
$this->conn = $conn;
$this->isExternalConnection = true;
return $this;
}
public function setDisableLogging($disable) {
$this->disableLogging = $disable;
return $this;
}
/* -( Implementation )----------------------------------------------------- */
protected function doLock($wait) {
$conn = $this->conn;
if (!$conn) {
// Try to reuse a connection from the connection pool.
$conn = array_pop(self::$pool);
}
if (!$conn) {
// NOTE: Using the 'repository' database somewhat arbitrarily, mostly
// because the first client of locks is the repository daemons. We must
// always use the same database for all locks, but don't access any
// tables so we could use any valid database. We could build a
// database-free connection instead, but that's kind of messy and we
// might forget about it in the future if we vertically partition the
// application.
$dao = new PhabricatorRepository();
// NOTE: Using "force_new" to make sure each lock is on its own
// connection.
$conn = $dao->establishConnection('w', $force_new = true);
}
// NOTE: Since MySQL will disconnect us if we're idle for too long, we set
// the wait_timeout to an enormous value, to allow us to hold the
// connection open indefinitely (or, at least, for 24 days).
$max_allowed_timeout = 2147483;
queryfx($conn, 'SET wait_timeout = %d', $max_allowed_timeout);
$lock_name = $this->getName();
$result = queryfx_one(
$conn,
'SELECT GET_LOCK(%s, %f)',
$lock_name,
$wait);
$ok = head($result);
if (!$ok) {
- throw new PhutilLockException($lock_name);
+ throw id(new PhutilLockException($lock_name))
+ ->setHint($this->newHint($lock_name, $wait));
}
$conn->rememberLock($lock_name);
$this->conn = $conn;
if ($this->shouldLogLock()) {
- global $argv;
-
- $lock_context = array(
- 'pid' => getmypid(),
- 'host' => php_uname('n'),
- 'argv' => $argv,
- );
+ $lock_context = $this->newLockContext();
$log = id(new PhabricatorDaemonLockLog())
->setLockName($lock_name)
->setLockParameters($this->parameters)
->setLockContext($lock_context)
->save();
$this->log = $log;
}
}
protected function doUnlock() {
$lock_name = $this->getName();
$conn = $this->conn;
try {
$result = queryfx_one(
$conn,
'SELECT RELEASE_LOCK(%s)',
$lock_name);
$conn->forgetLock($lock_name);
} catch (Exception $ex) {
$result = array(null);
}
$ok = head($result);
if (!$ok) {
// TODO: We could throw here, but then this lock doesn't get marked
// unlocked and we throw again later when exiting. It also doesn't
// particularly matter for any current applications. For now, just
// swallow the error.
}
$this->conn = null;
$this->isExternalConnection = false;
if (!$this->isExternalConnection) {
$conn->close();
self::$pool[] = $conn;
}
if ($this->log) {
$log = $this->log;
$this->log = null;
$conn = $log->establishConnection('w');
queryfx(
$conn,
'UPDATE %T SET lockReleased = UNIX_TIMESTAMP() WHERE id = %d',
$log->getTableName(),
$log->getID());
}
}
private function shouldLogLock() {
if ($this->disableLogging) {
return false;
}
$policy = id(new PhabricatorDaemonLockLogGarbageCollector())
->getRetentionPolicy();
if (!$policy) {
return false;
}
return true;
}
+ private function newLockContext() {
+ $context = array(
+ 'pid' => getmypid(),
+ 'host' => php_uname('n'),
+ 'sapi' => php_sapi_name(),
+ );
+
+ global $argv;
+ if ($argv) {
+ $context['argv'] = $argv;
+ }
+
+ $access_log = null;
+
+ // TODO: There's currently no cohesive way to get the parameterized access
+ // log for the current request across different request types. Web requests
+ // have an "AccessLog", SSH requests have an "SSHLog", and other processes
+ // (like scripts) have no log. But there's no method to say "give me any
+ // log you've got". For now, just test if we have a web request and use the
+ // "AccessLog" if we do, since that's the only one we actually read any
+ // parameters from.
+
+ // NOTE: "PhabricatorStartup" is only available from web requests, not
+ // from CLI scripts.
+ if (class_exists('PhabricatorStartup', false)) {
+ $access_log = PhabricatorAccessLog::getLog();
+ }
+
+ if ($access_log) {
+ $controller = $access_log->getData('C');
+ if ($controller) {
+ $context['controller'] = $controller;
+ }
+
+ $method = $access_log->getData('m');
+ if ($method) {
+ $context['method'] = $method;
+ }
+ }
+
+ return $context;
+ }
+
+ private function newHint($lock_name, $wait) {
+ if (!$this->shouldLogLock()) {
+ return pht(
+ 'Enable the lock log for more detailed information about '.
+ 'which process is holding this lock.');
+ }
+
+ $now = PhabricatorTime::getNow();
+
+ // First, look for recent logs. If other processes have been acquiring and
+ // releasing this lock while we've been waiting, this is more likely to be
+ // a contention/throughput issue than an issue with something hung while
+ // holding the lock.
+ $limit = 100;
+ $logs = id(new PhabricatorDaemonLockLog())->loadAllWhere(
+ 'lockName = %s AND dateCreated >= %d ORDER BY id ASC LIMIT %d',
+ $lock_name,
+ ($now - $wait),
+ $limit);
+
+ if ($logs) {
+ if (count($logs) === $limit) {
+ return pht(
+ 'During the last %s second(s) spent waiting for the lock, more '.
+ 'than %s other process(es) acquired it, so this is likely a '.
+ 'bottleneck. Use "bin/lock log --name %s" to review log activity.',
+ new PhutilNumber($wait),
+ new PhutilNumber($limit),
+ $lock_name);
+ } else {
+ return pht(
+ 'During the last %s second(s) spent waiting for the lock, %s '.
+ 'other process(es) acquired it, so this is likely a '.
+ 'bottleneck. Use "bin/lock log --name %s" to review log activity.',
+ new PhutilNumber($wait),
+ phutil_count($logs),
+ $lock_name);
+ }
+ }
+
+ $last_log = id(new PhabricatorDaemonLockLog())->loadOneWhere(
+ 'lockName = %s ORDER BY id DESC LIMIT 1',
+ $lock_name);
+
+ if ($last_log) {
+ $info = array();
+
+ $acquired = $last_log->getDateCreated();
+ $context = $last_log->getLockContext();
+
+ $process_info = array();
+
+ $pid = idx($context, 'pid');
+ if ($pid) {
+ $process_info[] = 'pid='.$pid;
+ }
+
+ $host = idx($context, 'host');
+ if ($host) {
+ $process_info[] = 'host='.$host;
+ }
+
+ $sapi = idx($context, 'sapi');
+ if ($sapi) {
+ $process_info[] = 'sapi='.$sapi;
+ }
+
+ $argv = idx($context, 'argv');
+ if ($argv) {
+ $process_info[] = 'argv='.(string)csprintf('%LR', $argv);
+ }
+
+ $controller = idx($context, 'controller');
+ if ($controller) {
+ $process_info[] = 'controller='.$controller;
+ }
+
+ $method = idx($context, 'method');
+ if ($method) {
+ $process_info[] = 'method='.$method;
+ }
+
+ $process_info = implode(', ', $process_info);
+
+ $info[] = pht(
+ 'This lock was most recently acquired by a process (%s) '.
+ '%s second(s) ago.',
+ $process_info,
+ new PhutilNumber($now - $acquired));
+
+ $released = $last_log->getLockReleased();
+ if ($released) {
+ $info[] = pht(
+ 'This lock was released %s second(s) ago.',
+ new PhutilNumber($now - $released));
+ } else {
+ $info[] = pht('There is no record of this lock being released.');
+ }
+
+ return implode(' ', $info);
+ }
+
+ return pht(
+ 'Found no records of processes acquiring or releasing this lock.');
+ }
+
}

File Metadata

Mime Type
text/x-diff
Expires
Fri, Aug 15, 2:31 PM (6 d, 18 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
202457
Default Alt Text
(35 KB)

Event Timeline