Page MenuHomestyx hydra

No OneTemporary

diff --git a/resources/sql/autopatches/20160424.locks.1.sql b/resources/sql/autopatches/20160424.locks.1.sql
new file mode 100644
index 0000000000..0edea13689
--- /dev/null
+++ b/resources/sql/autopatches/20160424.locks.1.sql
@@ -0,0 +1,2 @@
+ALTER TABLE {$NAMESPACE}_repository.repository_workingcopyversion
+ ADD lockOwner VARCHAR(255) COLLATE {$COLLATE_TEXT};
diff --git a/src/applications/diffusion/protocol/DiffusionRepositoryClusterEngine.php b/src/applications/diffusion/protocol/DiffusionRepositoryClusterEngine.php
index f388fc2ad6..e3c70fecd9 100644
--- a/src/applications/diffusion/protocol/DiffusionRepositoryClusterEngine.php
+++ b/src/applications/diffusion/protocol/DiffusionRepositoryClusterEngine.php
@@ -1,538 +1,625 @@
<?php
/**
* Manages repository synchronization for cluster repositories.
*
* @task config Configuring Synchronization
* @task sync Cluster Synchronization
* @task internal Internals
*/
final class DiffusionRepositoryClusterEngine extends Phobject {
private $repository;
private $viewer;
+ private $logger;
+
private $clusterWriteLock;
private $clusterWriteVersion;
- private $logger;
+ private $clusterWriteOwner;
/* -( Configuring Synchronization )---------------------------------------- */
public function setRepository(PhabricatorRepository $repository) {
$this->repository = $repository;
return $this;
}
public function getRepository() {
return $this->repository;
}
public function setViewer(PhabricatorUser $viewer) {
$this->viewer = $viewer;
return $this;
}
public function getViewer() {
return $this->viewer;
}
public function setLog(DiffusionRepositoryClusterEngineLogInterface $log) {
$this->logger = $log;
return $this;
}
/* -( Cluster Synchronization )-------------------------------------------- */
/**
* Synchronize repository version information after creating a repository.
*
* This initializes working copy versions for all currently bound devices to
* 0, so that we don't get stuck making an ambiguous choice about which
* devices are leaders when we later synchronize before a read.
*
* @return this
* @task sync
*/
public function synchronizeWorkingCopyAfterCreation() {
if (!$this->shouldEnableSynchronization()) {
// Nothing to initialize, but return the engine anyway so callers get the
// same fluent return value on every path.
return $this;
}
$repository = $this->getRepository();
$repository_phid = $repository->getPHID();
$service = $repository->loadAlmanacService();
if (!$service) {
throw new Exception(pht('Failed to load repository cluster service.'));
}
// Every device currently bound to the service starts at version 0.
$bindings = $service->getActiveBindings();
foreach ($bindings as $binding) {
PhabricatorRepositoryWorkingCopyVersion::updateVersion(
$repository_phid,
$binding->getDevicePHID(),
0);
}
return $this;
}
/**
* Bring this device's working copy up to date before serving a read.
*
* Takes the per-device cluster read lock (waiting up to two minutes), loads
* the recorded working copy versions for the repository and, if some other
* device holds a newer version, fetches from a device at the newest version
* and records the new local version. If no version records exist at all, the
* current device is only accepted as the leader when it is the sole device
* bound to the cluster service.
*
* @return int|null Version this device holds after synchronizing, or nothing
* if synchronization is not enabled for this repository.
* @task sync
*/
public function synchronizeWorkingCopyBeforeRead() {
if (!$this->shouldEnableSynchronization()) {
return;
}
$repository = $this->getRepository();
$repository_phid = $repository->getPHID();
$device = AlmanacKeys::getLiveDevice();
$device_phid = $device->getPHID();
$read_lock = PhabricatorRepositoryWorkingCopyVersion::getReadLock(
$repository_phid,
$device_phid);
$lock_wait = phutil_units('2 minutes in seconds');
$this->logLine(
pht(
'Waiting up to %s second(s) for a cluster read lock on "%s"...',
new PhutilNumber($lock_wait),
$device->getName()));
try {
$start = PhabricatorTime::getNow();
$read_lock->lock($lock_wait);
$waited = (PhabricatorTime::getNow() - $start);
if ($waited) {
$this->logLine(
pht(
'Acquired read lock after %s second(s).',
new PhutilNumber($waited)));
} else {
$this->logLine(
pht(
'Acquired read lock immediately.'));
}
} catch (Exception $ex) {
// Wrap the underlying failure so the original cause is not lost.
throw new PhutilProxyException(
pht(
'Failed to acquire read lock after waiting %s second(s). You '.
'may be able to retry later.',
new PhutilNumber($lock_wait)),
$ex);
}
$versions = PhabricatorRepositoryWorkingCopyVersion::loadVersions(
$repository_phid);
$versions = mpull($versions, null, 'getDevicePHID');
// A device with no version record is treated as older than any recorded
// version.
$this_version = idx($versions, $device_phid);
if ($this_version) {
$this_version = (int)$this_version->getRepositoryVersion();
} else {
$this_version = -1;
}
if ($versions) {
// This is the normal case, where we have some version information and
// can identify which nodes are leaders. If the current node is not a
// leader, we want to fetch from a leader and then update our version.
$max_version = (int)max(mpull($versions, 'getRepositoryVersion'));
if ($max_version > $this_version) {
$fetchable = array();
foreach ($versions as $version) {
if ($version->getRepositoryVersion() == $max_version) {
$fetchable[] = $version->getDevicePHID();
}
}
$this->synchronizeWorkingCopyFromDevices($fetchable);
PhabricatorRepositoryWorkingCopyVersion::updateVersion(
$repository_phid,
$device_phid,
$max_version);
} else {
$this->logLine(
pht(
'Device "%s" is already a cluster leader and does not need '.
'to be synchronized.',
$device->getName()));
}
$result_version = $max_version;
} else {
// If no version records exist yet, we need to be careful, because we
// can not tell which nodes are leaders.
// There might be several nodes with arbitrary existing data, and we have
// no way to tell which one has the "right" data. If we pick wrong, we
// might erase some or all of the data in the repository.
// Since this is dangerous, we refuse to guess unless there is only one
// device. If we're the only device in the group, we obviously must be
// a leader.
$service = $repository->loadAlmanacService();
if (!$service) {
throw new Exception(pht('Failed to load repository cluster service.'));
}
$bindings = $service->getActiveBindings();
$device_map = array();
foreach ($bindings as $binding) {
$device_map[$binding->getDevicePHID()] = true;
}
if (count($device_map) > 1) {
throw new Exception(
pht(
'Repository "%s" exists on more than one device, but no device '.
'has any repository version information. Phabricator can not '.
'guess which copy of the existing data is authoritative. Remove '.
'all but one device from service to mark the remaining device '.
'as the authority.',
$repository->getDisplayName()));
}
if (empty($device_map[$device->getPHID()])) {
throw new Exception(
pht(
'Repository "%s" is being synchronized on device "%s", but '.
'this device is not bound to the corresponding cluster '.
'service ("%s").',
$repository->getDisplayName(),
$device->getName(),
$service->getName()));
}
// The current device is the only device in service, so it must be a
// leader. We can safely have any future nodes which come online read
// from it.
PhabricatorRepositoryWorkingCopyVersion::updateVersion(
$repository_phid,
$device_phid,
0);
$result_version = 0;
}
$read_lock->unlock();
return $result_version;
}
/**
* Prepare this device to accept a write.
*
* Acquires the cluster write lock on a dedicated connection (waiting up to
* two minutes), refuses to continue if any device still has an interrupted
* write recorded, synchronizes the working copy as for a read, and then
* records a durable "isWriting" marker tagged with a per-process owner token.
* The lock, version and owner are kept on the engine for use by
* synchronizeWorkingCopyAfterWrite().
*
* @return void
* @task sync
*/
public function synchronizeWorkingCopyBeforeWrite() {
if (!$this->shouldEnableSynchronization()) {
return;
}
$repository = $this->getRepository();
$viewer = $this->getViewer();
$repository_phid = $repository->getPHID();
$device = AlmanacKeys::getLiveDevice();
$device_phid = $device->getPHID();
+ $table = new PhabricatorRepositoryWorkingCopyVersion();
+ $locked_connection = $table->establishConnection('w');
+
$write_lock = PhabricatorRepositoryWorkingCopyVersion::getWriteLock(
$repository_phid);
+ $write_lock->useSpecificConnection($locked_connection);
+
$lock_wait = phutil_units('2 minutes in seconds');
$this->logLine(
pht(
'Waiting up to %s second(s) for a cluster write lock...',
new PhutilNumber($lock_wait)));
try {
$start = PhabricatorTime::getNow();
$write_lock->lock($lock_wait);
$waited = (PhabricatorTime::getNow() - $start);
if ($waited) {
$this->logLine(
pht(
'Acquired write lock after %s second(s).',
new PhutilNumber($waited)));
} else {
$this->logLine(
pht(
'Acquired write lock immediately.'));
}
} catch (Exception $ex) {
// Wrap the underlying failure so the original cause is not lost.
throw new PhutilProxyException(
pht(
'Failed to acquire write lock after waiting %s second(s). You '.
'may be able to retry later.',
new PhutilNumber($lock_wait)),
$ex);
}
$versions = PhabricatorRepositoryWorkingCopyVersion::loadVersions(
$repository_phid);
foreach ($versions as $version) {
if (!$version->getIsWriting()) {
continue;
}
throw new Exception(
pht(
'An previous write to this repository was interrupted; refusing '.
- 'new writes. This issue resolves operator intervention to resolve, '.
+ 'new writes. This issue requires operator intervention to resolve, '.
'see "Write Interruptions" in the "Cluster: Repositories" in the '.
'documentation for instructions.'));
}
try {
$max_version = $this->synchronizeWorkingCopyBeforeRead();
} catch (Exception $ex) {
$write_lock->unlock();
throw $ex;
}
+ $pid = getmypid();
+ $hash = Filesystem::readRandomCharacters(12);
+ $this->clusterWriteOwner = "{$pid}.{$hash}";
+
PhabricatorRepositoryWorkingCopyVersion::willWrite(
+ $locked_connection,
$repository_phid,
$device_phid,
array(
'userPHID' => $viewer->getPHID(),
'epoch' => PhabricatorTime::getNow(),
'devicePHID' => $device_phid,
- ));
+ ),
+ $this->clusterWriteOwner);
$this->clusterWriteVersion = $max_version;
$this->clusterWriteLock = $write_lock;
}
/**
* Record the result of a write and release the cluster write locks.
*
* Looks up the most recent push event for the repository to choose the new
* version (keeping the version held before the write if there is no push
* event), clears the durable "isWriting" marker owned by this process, and
* then releases the global write lock. Database connection failures while
* clearing the marker are retried once per second for up to five minutes.
*
* @return void
* @task sync
*/
public function synchronizeWorkingCopyAfterWrite() {
if (!$this->shouldEnableSynchronization()) {
return;
}
if (!$this->clusterWriteLock) {
throw new Exception(
pht(
'Trying to synchronize after write, but not holding a write '.
'lock!'));
}
$repository = $this->getRepository();
$repository_phid = $repository->getPHID();
$device = AlmanacKeys::getLiveDevice();
$device_phid = $device->getPHID();
- // NOTE: This means we're still bumping the version when pushes fail. We
- // could select only un-rejected events instead to bump a little less
- // often.
+ // It is possible that we've lost the global lock while receiving the push.
+ // For example, the master database may have been restarted between the
+ // time we acquired the global lock and now, when the push has finished.
+
+ // We wrote a durable lock while we were holding the global lock,
+ // essentially upgrading our lock. We can still safely release this upgraded
+ // lock even if we're no longer holding the global lock.
+
+ // If we fail to release the lock, the repository will be frozen until
+ // an operator can figure out what happened, so we try pretty hard to
+ // reconnect to the database and release the lock.
- $new_log = id(new PhabricatorRepositoryPushEventQuery())
- ->setViewer(PhabricatorUser::getOmnipotentUser())
- ->withRepositoryPHIDs(array($repository_phid))
- ->setLimit(1)
- ->executeOne();
+ $now = PhabricatorTime::getNow();
+ $duration = phutil_units('5 minutes in seconds');
+ $try_until = $now + $duration;
- $old_version = $this->clusterWriteVersion;
- if ($new_log) {
- $new_version = $new_log->getID();
+ $did_release = false;
+ $already_failed = false;
+ while (PhabricatorTime::getNow() <= $try_until) {
+ try {
+ // NOTE: This means we're still bumping the version when pushes fail. We
+ // could select only un-rejected events instead to bump a little less
+ // often.
+
+ $new_log = id(new PhabricatorRepositoryPushEventQuery())
+ ->setViewer(PhabricatorUser::getOmnipotentUser())
+ ->withRepositoryPHIDs(array($repository_phid))
+ ->setLimit(1)
+ ->executeOne();
+
+ $old_version = $this->clusterWriteVersion;
+ if ($new_log) {
+ $new_version = $new_log->getID();
+ } else {
+ $new_version = $old_version;
+ }
+
+ // Pass the computed version rather than dereferencing $new_log again:
+ // $new_log may be empty here, and the fallback above already covers
+ // that case.
+ PhabricatorRepositoryWorkingCopyVersion::didWrite(
+ $repository_phid,
+ $device_phid,
+ $this->clusterWriteVersion,
+ $new_version,
+ $this->clusterWriteOwner);
+ $did_release = true;
+ break;
+ } catch (AphrontConnectionQueryException $ex) {
+ $connection_exception = $ex;
+ } catch (AphrontConnectionLostQueryException $ex) {
+ $connection_exception = $ex;
+ }
+
+ if (!$already_failed) {
+ $already_failed = true;
+ $this->logLine(
+ pht('CRITICAL. Failed to release cluster write lock!'));
+
+ $this->logLine(
+ pht(
+ 'The connection to the master database was lost while receiving '.
+ 'the write.'));
+
+ $this->logLine(
+ pht(
+ 'This process will spend %s more second(s) attempting to '.
+ 'recover, then give up.',
+ new PhutilNumber($duration)));
+ }
+
+ sleep(1);
+ }
+
+ if ($did_release) {
+ if ($already_failed) {
+ $this->logLine(
+ pht('RECOVERED. Link to master database was restored.'));
+ }
+ $this->logLine(pht('Released cluster write lock.'));
} else {
- $new_version = $old_version;
+ // The display name belongs to the repository, not to this engine.
+ throw new Exception(
+ pht(
+ 'Failed to reconnect to master database and release held write '.
+ 'lock ("%s") on device "%s" for repository "%s" after trying '.
+ 'for %s second(s). This repository will be frozen.',
+ $this->clusterWriteOwner,
+ $device->getName(),
+ $repository->getDisplayName(),
+ new PhutilNumber($duration)));
}
- PhabricatorRepositoryWorkingCopyVersion::didWrite(
- $repository_phid,
- $device_phid,
- $this->clusterWriteVersion,
- $new_log->getID());
+ // We can continue even if we've lost this lock, everything is still
+ // consistent.
+ try {
+ $this->clusterWriteLock->unlock();
+ } catch (Exception $ex) {
+ // Ignore.
+ }
- $this->clusterWriteLock->unlock();
$this->clusterWriteLock = null;
+ $this->clusterWriteOwner = null;
}
/* -( Internals )---------------------------------------------------------- */
/**
* @task internal
*/
private function shouldEnableSynchronization() {
$repository = $this->getRepository();
$service_phid = $repository->getAlmanacServicePHID();
if (!$service_phid) {
return false;
}
// TODO: For now, this is only supported for Git.
if (!$repository->isGit()) {
return false;
}
// TODO: It may eventually make sense to try to version and synchronize
// observed repositories (so that daemons don't do reads against out-of
// date hosts), but don't bother for now.
if (!$repository->isHosted()) {
return false;
}
$device = AlmanacKeys::getLiveDevice();
if (!$device) {
return false;
}
return true;
}
/**
* @task internal
*/
private function synchronizeWorkingCopyFromDevices(array $device_phids) {
$repository = $this->getRepository();
$service = $repository->loadAlmanacService();
if (!$service) {
throw new Exception(pht('Failed to load repository cluster service.'));
}
$device_map = array_fuse($device_phids);
$bindings = $service->getActiveBindings();
$fetchable = array();
foreach ($bindings as $binding) {
// We can't fetch from nodes which don't have the newest version.
$device_phid = $binding->getDevicePHID();
if (empty($device_map[$device_phid])) {
continue;
}
// TODO: For now, only fetch over SSH. We could support fetching over
// HTTP eventually.
if ($binding->getAlmanacPropertyValue('protocol') != 'ssh') {
continue;
}
$fetchable[] = $binding;
}
if (!$fetchable) {
throw new Exception(
pht(
'Leader lost: no up-to-date nodes in repository cluster are '.
'fetchable.'));
}
$caught = null;
foreach ($fetchable as $binding) {
try {
$this->synchronizeWorkingCopyFromBinding($binding);
$caught = null;
break;
} catch (Exception $ex) {
$caught = $ex;
}
}
if ($caught) {
throw $caught;
}
}
/**
* @task internal
*/
private function synchronizeWorkingCopyFromBinding($binding) {
$repository = $this->getRepository();
$device = AlmanacKeys::getLiveDevice();
$this->logLine(
pht(
'Synchronizing this device ("%s") from cluster leader ("%s") before '.
'read.',
$device->getName(),
$binding->getDevice()->getName()));
$fetch_uri = $repository->getClusterRepositoryURIFromBinding($binding);
$local_path = $repository->getLocalPath();
if ($repository->isGit()) {
if (!Filesystem::pathExists($local_path)) {
throw new Exception(
pht(
'Repository "%s" does not have a working copy on this device '.
'yet, so it can not be synchronized. Wait for the daemons to '.
'construct one or run `bin/repository update %s` on this host '.
'("%s") to build it explicitly.',
$repository->getDisplayName(),
$repository->getMonogram(),
$device->getName()));
}
$argv = array(
'fetch --prune -- %s %s',
$fetch_uri,
'+refs/*:refs/*',
);
} else {
throw new Exception(pht('Binding sync only supported for git!'));
}
$future = DiffusionCommandEngine::newCommandEngine($repository)
->setArgv($argv)
->setConnectAsDevice(true)
->setSudoAsDaemon(true)
->setProtocol($fetch_uri->getProtocol())
->newFuture();
$future->setCWD($local_path);
try {
$future->resolvex();
} catch (Exception $ex) {
$this->logLine(
pht(
'Synchronization of "%s" from leader "%s" failed: %s',
$device->getName(),
$binding->getDevice()->getName(),
$ex->getMessage()));
throw $ex;
}
}
/**
* @task internal
*/
private function logLine($message) {
return $this->logText("# {$message}\n");
}
/**
* @task internal
*/
private function logText($message) {
$log = $this->logger;
if ($log) {
$log->writeClusterEngineLogMessage($message);
}
return $this;
}
}
diff --git a/src/applications/diffusion/ssh/DiffusionGitSSHWorkflow.php b/src/applications/diffusion/ssh/DiffusionGitSSHWorkflow.php
index 79c00231c7..9843ca8401 100644
--- a/src/applications/diffusion/ssh/DiffusionGitSSHWorkflow.php
+++ b/src/applications/diffusion/ssh/DiffusionGitSSHWorkflow.php
@@ -1,35 +1,36 @@
<?php
abstract class DiffusionGitSSHWorkflow
extends DiffusionSSHWorkflow
implements DiffusionRepositoryClusterEngineLogInterface {
protected function writeError($message) {
// Git assumes we'll add our own newlines.
return parent::writeError($message."\n");
}
public function writeClusterEngineLogMessage($message) {
parent::writeError($message);
+ $this->getErrorChannel()->update();
}
protected function identifyRepository() {
$args = $this->getArgs();
$path = head($args->getArg('dir'));
return $this->loadRepositoryWithPath($path);
}
protected function waitForGitClient() {
$io_channel = $this->getIOChannel();
// If we don't wait for the client to close the connection, `git` will
// consider it an early abort and fail. Sit around until Git is comfortable
// that it really received all the data.
while ($io_channel->isOpenForReading()) {
$io_channel->update();
$this->getErrorChannel()->flush();
PhutilChannel::waitForAny(array($io_channel));
}
}
}
diff --git a/src/applications/diffusion/ssh/DiffusionSSHWorkflow.php b/src/applications/diffusion/ssh/DiffusionSSHWorkflow.php
index b1694de814..60b929f7c4 100644
--- a/src/applications/diffusion/ssh/DiffusionSSHWorkflow.php
+++ b/src/applications/diffusion/ssh/DiffusionSSHWorkflow.php
@@ -1,261 +1,276 @@
<?php
abstract class DiffusionSSHWorkflow extends PhabricatorSSHWorkflow {
private $args;
private $repository;
private $hasWriteAccess;
private $proxyURI;
private $baseRequestPath;
public function getRepository() {
if (!$this->repository) {
throw new Exception(pht('Repository is not available yet!'));
}
return $this->repository;
}
private function setRepository(PhabricatorRepository $repository) {
$this->repository = $repository;
return $this;
}
public function getArgs() {
return $this->args;
}
public function getEnvironment() {
$env = array(
DiffusionCommitHookEngine::ENV_USER => $this->getUser()->getUsername(),
DiffusionCommitHookEngine::ENV_REMOTE_PROTOCOL => 'ssh',
);
$ssh_client = getenv('SSH_CLIENT');
if ($ssh_client) {
// This has the format "<ip> <remote-port> <local-port>". Grab the IP.
$remote_address = head(explode(' ', $ssh_client));
$env[DiffusionCommitHookEngine::ENV_REMOTE_ADDRESS] = $remote_address;
}
return $env;
}
/**
* Identify and load the affected repository.
*/
abstract protected function identifyRepository();
abstract protected function executeRepositoryOperations();
protected function getBaseRequestPath() {
return $this->baseRequestPath;
}
protected function writeError($message) {
$this->getErrorChannel()->write($message);
return $this;
}
+ protected function getCurrentDeviceName() {
+ $device = AlmanacKeys::getLiveDevice();
+ if ($device) {
+ return $device->getName();
+ }
+
+ return php_uname('n');
+ }
+
+ protected function getTargetDeviceName() {
+ // TODO: This should use the correct device identity.
+ $uri = new PhutilURI($this->proxyURI);
+ return $uri->getDomain();
+ }
+
protected function shouldProxy() {
return (bool)$this->proxyURI;
}
protected function getProxyCommand() {
$uri = new PhutilURI($this->proxyURI);
$username = AlmanacKeys::getClusterSSHUser();
if ($username === null) {
throw new Exception(
pht(
'Unable to determine the username to connect with when trying '.
'to proxy an SSH request within the Phabricator cluster.'));
}
$port = $uri->getPort();
$host = $uri->getDomain();
$key_path = AlmanacKeys::getKeyPath('device.key');
if (!Filesystem::pathExists($key_path)) {
throw new Exception(
pht(
'Unable to proxy this SSH request within the cluster: this device '.
'is not registered and has a missing device key (expected to '.
'find key at "%s").',
$key_path));
}
$options = array();
$options[] = '-o';
$options[] = 'StrictHostKeyChecking=no';
$options[] = '-o';
$options[] = 'UserKnownHostsFile=/dev/null';
// This is suppressing "added <address> to the list of known hosts"
// messages, which are confusing and irrelevant when they arise from
// proxied requests. It might also be suppressing lots of useful errors,
// of course. Ideally, we would enforce host keys eventually.
$options[] = '-o';
$options[] = 'LogLevel=quiet';
// NOTE: We prefix the command with "@username", which the far end of the
// connection will parse in order to act as the specified user. This
// behavior is only available to cluster requests signed by a trusted
// device key.
return csprintf(
'ssh %Ls -l %s -i %s -p %s %s -- %s %Ls',
$options,
$username,
$key_path,
$port,
$host,
'@'.$this->getUser()->getUsername(),
$this->getOriginalArguments());
}
final public function execute(PhutilArgumentParser $args) {
$this->args = $args;
$viewer = $this->getUser();
$have_diffusion = PhabricatorApplication::isClassInstalledForViewer(
'PhabricatorDiffusionApplication',
$viewer);
if (!$have_diffusion) {
throw new Exception(
pht(
'You do not have permission to access the Diffusion application, '.
'so you can not interact with repositories over SSH.'));
}
$repository = $this->identifyRepository();
$this->setRepository($repository);
$is_cluster_request = $this->getIsClusterRequest();
$uri = $repository->getAlmanacServiceURI(
$viewer,
$is_cluster_request,
array(
'ssh',
));
if ($uri) {
$this->proxyURI = $uri;
}
try {
return $this->executeRepositoryOperations();
} catch (Exception $ex) {
$this->writeError(get_class($ex).': '.$ex->getMessage());
return 1;
}
}
protected function loadRepositoryWithPath($path) {
$viewer = $this->getUser();
$info = PhabricatorRepository::parseRepositoryServicePath($path);
if ($info === null) {
throw new Exception(
pht(
'Unrecognized repository path "%s". Expected a path like "%s" '.
'or "%s".',
$path,
'/diffusion/X/',
'/diffusion/123/'));
}
$identifier = $info['identifier'];
$base = $info['base'];
$this->baseRequestPath = $base;
$repository = id(new PhabricatorRepositoryQuery())
->setViewer($viewer)
->withIdentifiers(array($identifier))
->executeOne();
if (!$repository) {
throw new Exception(
pht('No repository "%s" exists!', $identifier));
}
switch ($repository->getServeOverSSH()) {
case PhabricatorRepository::SERVE_READONLY:
case PhabricatorRepository::SERVE_READWRITE:
// If we have read or read/write access, proceed for now. We will
// check write access when the user actually issues a write command.
break;
case PhabricatorRepository::SERVE_OFF:
default:
throw new Exception(
pht(
'This repository ("%s") is not available over SSH.',
$repository->getDisplayName()));
}
return $repository;
}
protected function requireWriteAccess($protocol_command = null) {
if ($this->hasWriteAccess === true) {
return;
}
$repository = $this->getRepository();
$viewer = $this->getUser();
if ($viewer->isOmnipotent()) {
throw new Exception(
pht(
'This request is authenticated as a cluster device, but is '.
'performing a write. Writes must be performed with a real '.
'user account.'));
}
switch ($repository->getServeOverSSH()) {
case PhabricatorRepository::SERVE_READONLY:
if ($protocol_command !== null) {
throw new Exception(
pht(
'This repository is read-only over SSH (tried to execute '.
'protocol command "%s").',
$protocol_command));
} else {
throw new Exception(
pht('This repository is read-only over SSH.'));
}
break;
case PhabricatorRepository::SERVE_READWRITE:
$can_push = PhabricatorPolicyFilter::hasCapability(
$viewer,
$repository,
DiffusionPushCapability::CAPABILITY);
if (!$can_push) {
throw new Exception(
pht('You do not have permission to push to this repository.'));
}
break;
case PhabricatorRepository::SERVE_OFF:
default:
// This shouldn't be reachable because we don't get this far if the
// repository isn't enabled, but kick them out anyway.
throw new Exception(
pht('This repository is not available over SSH.'));
}
$this->hasWriteAccess = true;
return $this->hasWriteAccess;
}
protected function shouldSkipReadSynchronization() {
$viewer = $this->getUser();
// Currently, the only case where devices interact over SSH without
// assuming user credentials is when synchronizing before a read. These
// synchronizing reads do not themselves need to be synchronized.
if ($viewer->isOmnipotent()) {
return true;
}
return false;
}
}
diff --git a/src/applications/repository/storage/PhabricatorRepositoryWorkingCopyVersion.php b/src/applications/repository/storage/PhabricatorRepositoryWorkingCopyVersion.php
index 0feeec759f..51abc70d35 100644
--- a/src/applications/repository/storage/PhabricatorRepositoryWorkingCopyVersion.php
+++ b/src/applications/repository/storage/PhabricatorRepositoryWorkingCopyVersion.php
@@ -1,175 +1,186 @@
<?php
final class PhabricatorRepositoryWorkingCopyVersion
extends PhabricatorRepositoryDAO {
protected $repositoryPHID;
protected $devicePHID;
protected $repositoryVersion;
protected $isWriting;
+ protected $lockOwner;
protected $writeProperties;
protected function getConfiguration() {
return array(
self::CONFIG_TIMESTAMPS => false,
self::CONFIG_COLUMN_SCHEMA => array(
'repositoryVersion' => 'uint32',
'isWriting' => 'bool',
'writeProperties' => 'text?',
+ 'lockOwner' => 'text255?',
),
self::CONFIG_KEY_SCHEMA => array(
'key_workingcopy' => array(
'columns' => array('repositoryPHID', 'devicePHID'),
'unique' => true,
),
),
) + parent::getConfiguration();
}
public static function loadVersions($repository_phid) {
$version = new self();
$conn_w = $version->establishConnection('w');
$table = $version->getTableName();
// This is a normal read, but force it to come from the master.
$rows = queryfx_all(
$conn_w,
'SELECT * FROM %T WHERE repositoryPHID = %s',
$table,
$repository_phid);
return $version->loadAllFromArray($rows);
}
public static function getReadLock($repository_phid, $device_phid) {
$repository_hash = PhabricatorHash::digestForIndex($repository_phid);
$device_hash = PhabricatorHash::digestForIndex($device_phid);
$lock_key = "repo.read({$repository_hash}, {$device_hash})";
return PhabricatorGlobalLock::newLock($lock_key);
}
public static function getWriteLock($repository_phid) {
$repository_hash = PhabricatorHash::digestForIndex($repository_phid);
$lock_key = "repo.write({$repository_hash})";
return PhabricatorGlobalLock::newLock($lock_key);
}
/**
* Before a write, set the "isWriting" flag.
*
* This allows us to detect when we lose a node partway through a write and
* may have committed and acknowledged a write on a node that lost the lock
* partway through the write and is no longer reachable.
*
* In particular, if a node loses its connection to the database the global
* lock is released by default. This is a durable lock which stays locked
* by default.
*/
public static function willWrite(
+ AphrontDatabaseConnection $locked_connection,
$repository_phid,
$device_phid,
- array $write_properties) {
+ array $write_properties,
+ $lock_owner) {
+
$version = new self();
- $conn_w = $version->establishConnection('w');
$table = $version->getTableName();
queryfx(
- $conn_w,
+ $locked_connection,
'INSERT INTO %T
(repositoryPHID, devicePHID, repositoryVersion, isWriting,
- writeProperties)
+ writeProperties, lockOwner)
VALUES
- (%s, %s, %d, %d, %s)
+ (%s, %s, %d, %d, %s, %s)
ON DUPLICATE KEY UPDATE
isWriting = VALUES(isWriting),
- writeProperties = VALUES(writeProperties)',
+ writeProperties = VALUES(writeProperties),
+ lockOwner = VALUES(lockOwner)',
$table,
$repository_phid,
$device_phid,
0,
1,
- phutil_json_encode($write_properties));
+ phutil_json_encode($write_properties),
+ $lock_owner);
}
/**
* After a write, update the version and release the "isWriting" lock.
*/
public static function didWrite(
$repository_phid,
$device_phid,
$old_version,
- $new_version) {
+ $new_version,
+ $lock_owner) {
+
$version = new self();
$conn_w = $version->establishConnection('w');
$table = $version->getTableName();
queryfx(
$conn_w,
'UPDATE %T SET
repositoryVersion = %d,
- isWriting = 0
+ isWriting = 0,
+ lockOwner = NULL
WHERE
repositoryPHID = %s AND
devicePHID = %s AND
repositoryVersion = %d AND
- isWriting = 1',
+ isWriting = 1 AND
+ lockOwner = %s',
$table,
$new_version,
$repository_phid,
$device_phid,
- $old_version);
+ $old_version,
+ $lock_owner);
}
/**
* After a fetch, set the local version to the fetched version.
*/
public static function updateVersion(
$repository_phid,
$device_phid,
$new_version) {
$version = new self();
$conn_w = $version->establishConnection('w');
$table = $version->getTableName();
queryfx(
$conn_w,
'INSERT INTO %T
(repositoryPHID, devicePHID, repositoryVersion, isWriting)
VALUES
(%s, %s, %d, %d)
ON DUPLICATE KEY UPDATE
repositoryVersion = VALUES(repositoryVersion)',
$table,
$repository_phid,
$device_phid,
$new_version,
0);
}
/**
* Explicitly demote a device.
*/
public static function demoteDevice(
$repository_phid,
$device_phid) {
$version = new self();
$conn_w = $version->establishConnection('w');
$table = $version->getTableName();
queryfx(
$conn_w,
'DELETE FROM %T WHERE repositoryPHID = %s AND devicePHID = %s',
$table,
$repository_phid,
$device_phid);
}
}
diff --git a/src/infrastructure/util/PhabricatorGlobalLock.php b/src/infrastructure/util/PhabricatorGlobalLock.php
index 394f57d9a9..26e11d1899 100644
--- a/src/infrastructure/util/PhabricatorGlobalLock.php
+++ b/src/infrastructure/util/PhabricatorGlobalLock.php
@@ -1,137 +1,164 @@
<?php
/**
* Global, MySQL-backed lock. This is a high-reliability, low-performance
* global lock.
*
* The lock is maintained by using GET_LOCK() in MySQL, and automatically
* released when the connection terminates. Thus, this lock can safely be used
* to control access to shared resources without implementing any sort of
* timeout or override logic: the lock can't normally be stuck in a locked state
* with no process actually holding the lock.
*
* However, acquiring the lock is moderately expensive (several network
* roundtrips). This makes it unsuitable for tasks where lock performance is
* important.
*
* $lock = PhabricatorGlobalLock::newLock('example');
* $lock->lock();
* do_contentious_things();
* $lock->unlock();
*
* NOTE: This lock is not completely global; it is namespaced to the active
* storage namespace so that unit tests running in separate table namespaces
* are isolated from one another.
*
* @task construct Constructing Locks
* @task impl Implementation
*/
final class PhabricatorGlobalLock extends PhutilLock {
private $conn;
+ private $isExternalConnection = false;
private static $pool = array();
/* -( Constructing Locks )------------------------------------------------- */
public static function newLock($name) {
$namespace = PhabricatorLiskDAO::getStorageNamespace();
$namespace = PhabricatorHash::digestToLength($namespace, 20);
$full_name = 'ph:'.$namespace.':'.$name;
$length_limit = 64;
if (strlen($full_name) > $length_limit) {
throw new Exception(
pht(
'Lock name "%s" is too long (full lock name is "%s"). The '.
'full lock name must not be longer than %s bytes.',
$name,
$full_name,
new PhutilNumber($length_limit)));
}
$lock = self::getLock($full_name);
if (!$lock) {
$lock = new PhabricatorGlobalLock($full_name);
self::registerLock($lock);
}
return $lock;
}
/**
* Use a specific database connection for locking.
*
* By default, `PhabricatorGlobalLock` will lock on the "repository" database
* (somewhat arbitrarily). In most cases this is fine, but this method can
* be used to lock on a specific connection.
*
* @param AphrontDatabaseConnection
* @return this
*/
public function useSpecificConnection(AphrontDatabaseConnection $conn) {
$this->conn = $conn;
+ $this->isExternalConnection = true;
return $this;
}
/* -( Implementation )----------------------------------------------------- */
protected function doLock($wait) {
$conn = $this->conn;
if (!$conn) {
// Try to reuse a connection from the connection pool.
$conn = array_pop(self::$pool);
}
if (!$conn) {
// NOTE: Using the 'repository' database somewhat arbitrarily, mostly
// because the first client of locks is the repository daemons. We must
// always use the same database for all locks, but don't access any
// tables so we could use any valid database. We could build a
// database-free connection instead, but that's kind of messy and we
// might forget about it in the future if we vertically partition the
// application.
$dao = new PhabricatorRepository();
// NOTE: Using "force_new" to make sure each lock is on its own
// connection.
$conn = $dao->establishConnection('w', $force_new = true);
}
// NOTE: Since MySQL will disconnect us if we're idle for too long, we set
// the wait_timeout to an enormous value, to allow us to hold the
// connection open indefinitely (or, at least, for 24 days).
$max_allowed_timeout = 2147483;
queryfx($conn, 'SET wait_timeout = %d', $max_allowed_timeout);
+ $lock_name = $this->getName();
+
$result = queryfx_one(
$conn,
'SELECT GET_LOCK(%s, %f)',
- $this->getName(),
+ $lock_name,
$wait);
$ok = head($result);
if (!$ok) {
- throw new PhutilLockException($this->getName());
+ throw new PhutilLockException($lock_name);
}
+ $conn->rememberLock($lock_name);
+
$this->conn = $conn;
}
protected function doUnlock() {
- queryfx(
- $this->conn,
- 'SELECT RELEASE_LOCK(%s)',
- $this->getName());
+ // Release the named lock on the connection that holds it. A failure to
+ // release is swallowed. The connection is closed and returned to the pool
+ // only when this lock opened it; a connection supplied through
+ // useSpecificConnection() is left untouched for its owner.
+ $lock_name = $this->getName();
+
+ $conn = $this->conn;
+
+ try {
+ $result = queryfx_one(
+ $conn,
+ 'SELECT RELEASE_LOCK(%s)',
+ $lock_name);
+ $conn->forgetLock($lock_name);
+ } catch (Exception $ex) {
+ $result = array(null);
+ }
+
+ $ok = head($result);
+ if (!$ok) {
+ // TODO: We could throw here, but then this lock doesn't get marked
+ // unlocked and we throw again later when exiting. It also doesn't
+ // particularly matter for any current applications. For now, just
+ // swallow the error.
+ }
- $this->conn->close();
- self::$pool[] = $this->conn;
$this->conn = null;
+
+ // Read the flag before resetting it; testing it after the reset would
+ // always treat the connection as one of our own.
+ $is_external = $this->isExternalConnection;
+ $this->isExternalConnection = false;
+
+ if (!$is_external) {
+ $conn->close();
+ self::$pool[] = $conn;
+ }
}
}

File Metadata

Mime Type
text/x-diff
Expires
Tue, Dec 2, 11:35 AM (1 d, 3 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
431858
Default Alt Text
(40 KB)

Event Timeline