Page MenuHomestyx hydra

No OneTemporary

diff --git a/src/applications/diffusion/protocol/DiffusionRepositoryClusterEngine.php b/src/applications/diffusion/protocol/DiffusionRepositoryClusterEngine.php
index 3be69d356d..8937c1f205 100644
--- a/src/applications/diffusion/protocol/DiffusionRepositoryClusterEngine.php
+++ b/src/applications/diffusion/protocol/DiffusionRepositoryClusterEngine.php
@@ -1,764 +1,765 @@
<?php
/**
* Manages repository synchronization for cluster repositories.
*
* @task config Configuring Synchronization
* @task sync Cluster Synchronization
* @task internal Internals
*/
final class DiffusionRepositoryClusterEngine extends Phobject {
private $repository;
private $viewer;
private $logger;
private $clusterWriteLock;
private $clusterWriteVersion;
private $clusterWriteOwner;
/* -( Configuring Synchronization )---------------------------------------- */
public function setRepository(PhabricatorRepository $repository) {
$this->repository = $repository;
return $this;
}
public function getRepository() {
return $this->repository;
}
public function setViewer(PhabricatorUser $viewer) {
$this->viewer = $viewer;
return $this;
}
public function getViewer() {
return $this->viewer;
}
public function setLog(DiffusionRepositoryClusterEngineLogInterface $log) {
$this->logger = $log;
return $this;
}
/* -( Cluster Synchronization )-------------------------------------------- */
/**
* Synchronize repository version information after creating a repository.
*
* This initializes working copy versions for all currently bound devices to
* 0, so that we don't get stuck making an ambiguous choice about which
* devices are leaders when we later synchronize before a read.
*
* @task sync
*/
public function synchronizeWorkingCopyAfterCreation() {
if (!$this->shouldEnableSynchronization()) {
return;
}
$repository = $this->getRepository();
$repository_phid = $repository->getPHID();
$service = $repository->loadAlmanacService();
if (!$service) {
throw new Exception(pht('Failed to load repository cluster service.'));
}
$bindings = $service->getActiveBindings();
foreach ($bindings as $binding) {
PhabricatorRepositoryWorkingCopyVersion::updateVersion(
$repository_phid,
$binding->getDevicePHID(),
0);
}
return $this;
}
/**
* @task sync
*/
public function synchronizeWorkingCopyAfterHostingChange() {
if (!$this->shouldEnableSynchronization()) {
return;
}
$repository = $this->getRepository();
$repository_phid = $repository->getPHID();
$versions = PhabricatorRepositoryWorkingCopyVersion::loadVersions(
$repository_phid);
$versions = mpull($versions, null, 'getDevicePHID');
// After converting a hosted repository to observed, or vice versa, we
// need to reset version numbers because the clocks for observed and hosted
// repositories run on different units.
// We identify all the cluster leaders and reset their version to 0.
// We identify all the cluster followers and demote them.
// This allows the cluster to start over again at version 0 but keep the
// same leaders.
if ($versions) {
$max_version = (int)max(mpull($versions, 'getRepositoryVersion'));
foreach ($versions as $version) {
$device_phid = $version->getDevicePHID();
if ($version->getRepositoryVersion() == $max_version) {
PhabricatorRepositoryWorkingCopyVersion::updateVersion(
$repository_phid,
$device_phid,
0);
} else {
PhabricatorRepositoryWorkingCopyVersion::demoteDevice(
$repository_phid,
$device_phid);
}
}
}
return $this;
}
/**
* @task sync
*/
public function synchronizeWorkingCopyBeforeRead() {
if (!$this->shouldEnableSynchronization()) {
return;
}
$repository = $this->getRepository();
$repository_phid = $repository->getPHID();
$device = AlmanacKeys::getLiveDevice();
$device_phid = $device->getPHID();
$read_lock = PhabricatorRepositoryWorkingCopyVersion::getReadLock(
$repository_phid,
$device_phid);
$lock_wait = phutil_units('2 minutes in seconds');
$this->logLine(
pht(
'Waiting up to %s second(s) for a cluster read lock on "%s"...',
new PhutilNumber($lock_wait),
$device->getName()));
try {
$start = PhabricatorTime::getNow();
$read_lock->lock($lock_wait);
$waited = (PhabricatorTime::getNow() - $start);
if ($waited) {
$this->logLine(
pht(
'Acquired read lock after %s second(s).',
new PhutilNumber($waited)));
} else {
$this->logLine(
pht(
'Acquired read lock immediately.'));
}
} catch (Exception $ex) {
throw new PhutilProxyException(
pht(
'Failed to acquire read lock after waiting %s second(s). You '.
'may be able to retry later.',
- new PhutilNumber($lock_wait)));
+ new PhutilNumber($lock_wait)),
+ $ex);
}
$versions = PhabricatorRepositoryWorkingCopyVersion::loadVersions(
$repository_phid);
$versions = mpull($versions, null, 'getDevicePHID');
$this_version = idx($versions, $device_phid);
if ($this_version) {
$this_version = (int)$this_version->getRepositoryVersion();
} else {
$this_version = -1;
}
if ($versions) {
// This is the normal case, where we have some version information and
// can identify which nodes are leaders. If the current node is not a
// leader, we want to fetch from a leader and then update our version.
$max_version = (int)max(mpull($versions, 'getRepositoryVersion'));
if ($max_version > $this_version) {
if ($repository->isHosted()) {
$fetchable = array();
foreach ($versions as $version) {
if ($version->getRepositoryVersion() == $max_version) {
$fetchable[] = $version->getDevicePHID();
}
}
$this->synchronizeWorkingCopyFromDevices($fetchable);
} else {
$this->synchornizeWorkingCopyFromRemote();
}
PhabricatorRepositoryWorkingCopyVersion::updateVersion(
$repository_phid,
$device_phid,
$max_version);
} else {
$this->logLine(
pht(
'Device "%s" is already a cluster leader and does not need '.
'to be synchronized.',
$device->getName()));
}
$result_version = $max_version;
} else {
// If no version records exist yet, we need to be careful, because we
// can not tell which nodes are leaders.
// There might be several nodes with arbitrary existing data, and we have
// no way to tell which one has the "right" data. If we pick wrong, we
// might erase some or all of the data in the repository.
// Since this is dangeorus, we refuse to guess unless there is only one
// device. If we're the only device in the group, we obviously must be
// a leader.
$service = $repository->loadAlmanacService();
if (!$service) {
throw new Exception(pht('Failed to load repository cluster service.'));
}
$bindings = $service->getActiveBindings();
$device_map = array();
foreach ($bindings as $binding) {
$device_map[$binding->getDevicePHID()] = true;
}
if (count($device_map) > 1) {
throw new Exception(
pht(
'Repository "%s" exists on more than one device, but no device '.
'has any repository version information. Phabricator can not '.
'guess which copy of the existing data is authoritative. Remove '.
'all but one device from service to mark the remaining device '.
'as the authority.',
$repository->getDisplayName()));
}
if (empty($device_map[$device->getPHID()])) {
throw new Exception(
pht(
'Repository "%s" is being synchronized on device "%s", but '.
'this device is not bound to the corresponding cluster '.
'service ("%s").',
$repository->getDisplayName(),
$device->getName(),
$service->getName()));
}
// The current device is the only device in service, so it must be a
// leader. We can safely have any future nodes which come online read
// from it.
PhabricatorRepositoryWorkingCopyVersion::updateVersion(
$repository_phid,
$device_phid,
0);
$result_version = 0;
}
$read_lock->unlock();
return $result_version;
}
/**
* @task sync
*/
public function synchronizeWorkingCopyBeforeWrite() {
if (!$this->shouldEnableSynchronization()) {
return;
}
$repository = $this->getRepository();
$viewer = $this->getViewer();
$repository_phid = $repository->getPHID();
$device = AlmanacKeys::getLiveDevice();
$device_phid = $device->getPHID();
$table = new PhabricatorRepositoryWorkingCopyVersion();
$locked_connection = $table->establishConnection('w');
$write_lock = PhabricatorRepositoryWorkingCopyVersion::getWriteLock(
$repository_phid);
$write_lock->useSpecificConnection($locked_connection);
$lock_wait = phutil_units('2 minutes in seconds');
$this->logLine(
pht(
'Waiting up to %s second(s) for a cluster write lock...',
new PhutilNumber($lock_wait)));
try {
$start = PhabricatorTime::getNow();
$write_lock->lock($lock_wait);
$waited = (PhabricatorTime::getNow() - $start);
if ($waited) {
$this->logLine(
pht(
'Acquired write lock after %s second(s).',
new PhutilNumber($waited)));
} else {
$this->logLine(
pht(
'Acquired write lock immediately.'));
}
} catch (Exception $ex) {
throw new PhutilProxyException(
pht(
'Failed to acquire write lock after waiting %s second(s). You '.
'may be able to retry later.',
new PhutilNumber($lock_wait)));
}
$versions = PhabricatorRepositoryWorkingCopyVersion::loadVersions(
$repository_phid);
foreach ($versions as $version) {
if (!$version->getIsWriting()) {
continue;
}
throw new Exception(
pht(
'An previous write to this repository was interrupted; refusing '.
'new writes. This issue requires operator intervention to resolve, '.
'see "Write Interruptions" in the "Cluster: Repositories" in the '.
'documentation for instructions.'));
}
try {
$max_version = $this->synchronizeWorkingCopyBeforeRead();
} catch (Exception $ex) {
$write_lock->unlock();
throw $ex;
}
$pid = getmypid();
$hash = Filesystem::readRandomCharacters(12);
$this->clusterWriteOwner = "{$pid}.{$hash}";
PhabricatorRepositoryWorkingCopyVersion::willWrite(
$locked_connection,
$repository_phid,
$device_phid,
array(
'userPHID' => $viewer->getPHID(),
'epoch' => PhabricatorTime::getNow(),
'devicePHID' => $device_phid,
),
$this->clusterWriteOwner);
$this->clusterWriteVersion = $max_version;
$this->clusterWriteLock = $write_lock;
}
public function synchronizeWorkingCopyAfterDiscovery($new_version) {
if (!$this->shouldEnableSynchronization()) {
return;
}
$repository = $this->getRepository();
$repository_phid = $repository->getPHID();
if ($repository->isHosted()) {
return;
}
$viewer = $this->getViewer();
$device = AlmanacKeys::getLiveDevice();
$device_phid = $device->getPHID();
// NOTE: We are not holding a lock here because this method is only called
// from PhabricatorRepositoryDiscoveryEngine, which already holds a device
// lock. Even if we do race here and record an older version, the
// consequences are mild: we only do extra work to correct it later.
$versions = PhabricatorRepositoryWorkingCopyVersion::loadVersions(
$repository_phid);
$versions = mpull($versions, null, 'getDevicePHID');
$this_version = idx($versions, $device_phid);
if ($this_version) {
$this_version = (int)$this_version->getRepositoryVersion();
} else {
$this_version = -1;
}
if ($new_version > $this_version) {
PhabricatorRepositoryWorkingCopyVersion::updateVersion(
$repository_phid,
$device_phid,
$new_version);
}
}
/**
* @task sync
*/
public function synchronizeWorkingCopyAfterWrite() {
if (!$this->shouldEnableSynchronization()) {
return;
}
if (!$this->clusterWriteLock) {
throw new Exception(
pht(
'Trying to synchronize after write, but not holding a write '.
'lock!'));
}
$repository = $this->getRepository();
$repository_phid = $repository->getPHID();
$device = AlmanacKeys::getLiveDevice();
$device_phid = $device->getPHID();
// It is possible that we've lost the global lock while receiving the push.
// For example, the master database may have been restarted between the
// time we acquired the global lock and now, when the push has finished.
// We wrote a durable lock while we were holding the the global lock,
// essentially upgrading our lock. We can still safely release this upgraded
// lock even if we're no longer holding the global lock.
// If we fail to release the lock, the repository will be frozen until
// an operator can figure out what happened, so we try pretty hard to
// reconnect to the database and release the lock.
$now = PhabricatorTime::getNow();
$duration = phutil_units('5 minutes in seconds');
$try_until = $now + $duration;
$did_release = false;
$already_failed = false;
while (PhabricatorTime::getNow() <= $try_until) {
try {
// NOTE: This means we're still bumping the version when pushes fail. We
// could select only un-rejected events instead to bump a little less
// often.
$new_log = id(new PhabricatorRepositoryPushEventQuery())
->setViewer(PhabricatorUser::getOmnipotentUser())
->withRepositoryPHIDs(array($repository_phid))
->setLimit(1)
->executeOne();
$old_version = $this->clusterWriteVersion;
if ($new_log) {
$new_version = $new_log->getID();
} else {
$new_version = $old_version;
}
PhabricatorRepositoryWorkingCopyVersion::didWrite(
$repository_phid,
$device_phid,
$this->clusterWriteVersion,
$new_version,
$this->clusterWriteOwner);
$did_release = true;
break;
} catch (AphrontConnectionQueryException $ex) {
$connection_exception = $ex;
} catch (AphrontConnectionLostQueryException $ex) {
$connection_exception = $ex;
}
if (!$already_failed) {
$already_failed = true;
$this->logLine(
pht('CRITICAL. Failed to release cluster write lock!'));
$this->logLine(
pht(
'The connection to the master database was lost while receiving '.
'the write.'));
$this->logLine(
pht(
'This process will spend %s more second(s) attempting to '.
'recover, then give up.',
new PhutilNumber($duration)));
}
sleep(1);
}
if ($did_release) {
if ($already_failed) {
$this->logLine(
pht('RECOVERED. Link to master database was restored.'));
}
$this->logLine(pht('Released cluster write lock.'));
} else {
throw new Exception(
pht(
'Failed to reconnect to master database and release held write '.
'lock ("%s") on device "%s" for repository "%s" after trying '.
'for %s seconds(s). This repository will be frozen.',
$this->clusterWriteOwner,
$device->getName(),
$this->getDisplayName(),
new PhutilNumber($duration)));
}
// We can continue even if we've lost this lock, everything is still
// consistent.
try {
$this->clusterWriteLock->unlock();
} catch (Exception $ex) {
// Ignore.
}
$this->clusterWriteLock = null;
$this->clusterWriteOwner = null;
}
/* -( Internals )---------------------------------------------------------- */
/**
* @task internal
*/
private function shouldEnableSynchronization() {
$repository = $this->getRepository();
$service_phid = $repository->getAlmanacServicePHID();
if (!$service_phid) {
return false;
}
// TODO: For now, this is only supported for Git.
if (!$repository->isGit()) {
return false;
}
$device = AlmanacKeys::getLiveDevice();
if (!$device) {
return false;
}
return true;
}
/**
* @task internal
*/
private function synchornizeWorkingCopyFromRemote() {
$repository = $this->getRepository();
$device = AlmanacKeys::getLiveDevice();
$local_path = $repository->getLocalPath();
$fetch_uri = $repository->getRemoteURIEnvelope();
if ($repository->isGit()) {
$this->requireWorkingCopy();
$argv = array(
'fetch --prune -- %P %s',
$fetch_uri,
'+refs/*:refs/*',
);
} else {
throw new Exception(pht('Remote sync only supported for git!'));
}
$future = DiffusionCommandEngine::newCommandEngine($repository)
->setArgv($argv)
->setSudoAsDaemon(true)
->setCredentialPHID($repository->getCredentialPHID())
->setURI($repository->getRemoteURIObject())
->newFuture();
$future->setCWD($local_path);
try {
$future->resolvex();
} catch (Exception $ex) {
$this->logLine(
pht(
'Synchronization of "%s" from remote failed: %s',
$device->getName(),
$ex->getMessage()));
throw $ex;
}
}
/**
* @task internal
*/
private function synchronizeWorkingCopyFromDevices(array $device_phids) {
$repository = $this->getRepository();
$service = $repository->loadAlmanacService();
if (!$service) {
throw new Exception(pht('Failed to load repository cluster service.'));
}
$device_map = array_fuse($device_phids);
$bindings = $service->getActiveBindings();
$fetchable = array();
foreach ($bindings as $binding) {
// We can't fetch from nodes which don't have the newest version.
$device_phid = $binding->getDevicePHID();
if (empty($device_map[$device_phid])) {
continue;
}
// TODO: For now, only fetch over SSH. We could support fetching over
// HTTP eventually.
if ($binding->getAlmanacPropertyValue('protocol') != 'ssh') {
continue;
}
$fetchable[] = $binding;
}
if (!$fetchable) {
throw new Exception(
pht(
'Leader lost: no up-to-date nodes in repository cluster are '.
'fetchable.'));
}
$caught = null;
foreach ($fetchable as $binding) {
try {
$this->synchronizeWorkingCopyFromBinding($binding);
$caught = null;
break;
} catch (Exception $ex) {
$caught = $ex;
}
}
if ($caught) {
throw $caught;
}
}
/**
* @task internal
*/
private function synchronizeWorkingCopyFromBinding($binding) {
$repository = $this->getRepository();
$device = AlmanacKeys::getLiveDevice();
$this->logLine(
pht(
'Synchronizing this device ("%s") from cluster leader ("%s") before '.
'read.',
$device->getName(),
$binding->getDevice()->getName()));
$fetch_uri = $repository->getClusterRepositoryURIFromBinding($binding);
$local_path = $repository->getLocalPath();
if ($repository->isGit()) {
$this->requireWorkingCopy();
$argv = array(
'fetch --prune -- %s %s',
$fetch_uri,
'+refs/*:refs/*',
);
} else {
throw new Exception(pht('Binding sync only supported for git!'));
}
$future = DiffusionCommandEngine::newCommandEngine($repository)
->setArgv($argv)
->setConnectAsDevice(true)
->setSudoAsDaemon(true)
->setURI($fetch_uri)
->newFuture();
$future->setCWD($local_path);
try {
$future->resolvex();
} catch (Exception $ex) {
$this->logLine(
pht(
'Synchronization of "%s" from leader "%s" failed: %s',
$device->getName(),
$binding->getDevice()->getName(),
$ex->getMessage()));
throw $ex;
}
}
/**
* @task internal
*/
private function logLine($message) {
return $this->logText("# {$message}\n");
}
/**
* @task internal
*/
private function logText($message) {
$log = $this->logger;
if ($log) {
$log->writeClusterEngineLogMessage($message);
}
return $this;
}
private function requireWorkingCopy() {
$repository = $this->getRepository();
$local_path = $repository->getLocalPath();
if (!Filesystem::pathExists($local_path)) {
$device = AlmanacKeys::getLiveDevice();
throw new Exception(
pht(
'Repository "%s" does not have a working copy on this device '.
'yet, so it can not be synchronized. Wait for the daemons to '.
'construct one or run `bin/repository update %s` on this host '.
'("%s") to build it explicitly.',
$repository->getDisplayName(),
$repository->getMonogram(),
$device->getName()));
}
}
}
diff --git a/src/infrastructure/storage/management/workflow/PhabricatorStorageManagementWorkflow.php b/src/infrastructure/storage/management/workflow/PhabricatorStorageManagementWorkflow.php
index a6cd8b6e77..085debb914 100644
--- a/src/infrastructure/storage/management/workflow/PhabricatorStorageManagementWorkflow.php
+++ b/src/infrastructure/storage/management/workflow/PhabricatorStorageManagementWorkflow.php
@@ -1,1137 +1,1151 @@
<?php
abstract class PhabricatorStorageManagementWorkflow
extends PhabricatorManagementWorkflow {
private $apis = array();
private $dryRun;
private $force;
private $patches;
private $didInitialize;
final public function setAPIs(array $apis) {
$this->apis = $apis;
return $this;
}
final public function getAnyAPI() {
return head($this->getAPIs());
}
final public function getMasterAPIs() {
$apis = $this->getAPIs();
$results = array();
foreach ($apis as $api) {
if ($api->getRef()->getIsMaster()) {
$results[] = $api;
}
}
if (!$results) {
throw new PhutilArgumentUsageException(
pht(
'This command only operates on database masters, but the selected '.
'database hosts do not include any masters.'));
}
return $results;
}
final public function getSingleAPI() {
$apis = $this->getAPIs();
if (count($apis) == 1) {
return head($apis);
}
throw new PhutilArgumentUsageException(
pht(
'Phabricator is configured in cluster mode, with multiple database '.
'hosts. Use "--host" to specify which host you want to operate on.'));
}
final public function getAPIs() {
return $this->apis;
}
final protected function isDryRun() {
return $this->dryRun;
}
final protected function setDryRun($dry_run) {
$this->dryRun = $dry_run;
return $this;
}
final protected function isForce() {
return $this->force;
}
final protected function setForce($force) {
$this->force = $force;
return $this;
}
public function getPatches() {
return $this->patches;
}
public function setPatches(array $patches) {
assert_instances_of($patches, 'PhabricatorStoragePatch');
$this->patches = $patches;
return $this;
}
protected function isReadOnlyWorkflow() {
return false;
}
public function execute(PhutilArgumentParser $args) {
$this->setDryRun($args->getArg('dryrun'));
$this->setForce($args->getArg('force'));
if (!$this->isReadOnlyWorkflow()) {
if (PhabricatorEnv::isReadOnly()) {
if ($this->isForce()) {
PhabricatorEnv::setReadOnly(false, null);
} else {
throw new PhutilArgumentUsageException(
pht(
'Phabricator is currently in read-only mode. Use --force to '.
'override this mode.'));
}
}
}
return $this->didExecute($args);
}
public function didExecute(PhutilArgumentParser $args) {}
private function loadSchemata(PhabricatorStorageManagementAPI $api) {
$query = id(new PhabricatorConfigSchemaQuery());
$ref = $api->getRef();
$ref_key = $ref->getRefKey();
$query->setAPIs(array($api));
$query->setRefs(array($ref));
$actual = $query->loadActualSchemata();
$expect = $query->loadExpectedSchemata();
$comp = $query->buildComparisonSchemata($expect, $actual);
return array(
$comp[$ref_key],
$expect[$ref_key],
$actual[$ref_key],
);
}
final protected function adjustSchemata(
PhabricatorStorageManagementAPI $api,
$unsafe) {
$lock = $this->lock($api);
try {
$err = $this->doAdjustSchemata($api, $unsafe);
} catch (Exception $ex) {
$lock->unlock();
throw $ex;
}
$lock->unlock();
return $err;
}
final private function doAdjustSchemata(
PhabricatorStorageManagementAPI $api,
$unsafe) {
$console = PhutilConsole::getConsole();
$console->writeOut(
"%s\n",
pht(
'Verifying database schemata on "%s"...',
$api->getRef()->getRefKey()));
list($adjustments, $errors) = $this->findAdjustments($api);
if (!$adjustments) {
$console->writeOut(
"%s\n",
pht('Found no adjustments for schemata.'));
return $this->printErrors($errors, 0);
}
if (!$this->force && !$api->isCharacterSetAvailable('utf8mb4')) {
$message = pht(
"You have an old version of MySQL (older than 5.5) which does not ".
"support the utf8mb4 character set. We strongly recomend upgrading to ".
"5.5 or newer.\n\n".
"If you apply adjustments now and later update MySQL to 5.5 or newer, ".
"you'll need to apply adjustments again (and they will take a long ".
"time).\n\n".
"You can exit this workflow, update MySQL now, and then run this ".
"workflow again. This is recommended, but may cause a lot of downtime ".
"right now.\n\n".
"You can exit this workflow, continue using Phabricator without ".
"applying adjustments, update MySQL at a later date, and then run ".
"this workflow again. This is also a good approach, and will let you ".
"delay downtime until later.\n\n".
"You can proceed with this workflow, and then optionally update ".
"MySQL at a later date. After you do, you'll need to apply ".
"adjustments again.\n\n".
"For more information, see \"Managing Storage Adjustments\" in ".
"the documentation.");
$console->writeOut(
"\n**<bg:yellow> %s </bg>**\n\n%s\n",
pht('OLD MySQL VERSION'),
phutil_console_wrap($message));
$prompt = pht('Continue with old MySQL version?');
if (!phutil_console_confirm($prompt, $default_no = true)) {
return;
}
}
$table = id(new PhutilConsoleTable())
->addColumn('database', array('title' => pht('Database')))
->addColumn('table', array('title' => pht('Table')))
->addColumn('name', array('title' => pht('Name')))
->addColumn('info', array('title' => pht('Issues')));
foreach ($adjustments as $adjust) {
$info = array();
foreach ($adjust['issues'] as $issue) {
$info[] = PhabricatorConfigStorageSchema::getIssueName($issue);
}
$table->addRow(array(
'database' => $adjust['database'],
'table' => idx($adjust, 'table'),
'name' => idx($adjust, 'name'),
'info' => implode(', ', $info),
));
}
$console->writeOut("\n\n");
$table->draw();
if ($this->dryRun) {
$console->writeOut(
"%s\n",
pht('DRYRUN: Would apply adjustments.'));
return 0;
} else if ($this->didInitialize) {
// If we just initialized the database, continue without prompting. This
// is nicer for first-time setup and there's no reasonable reason any
// user would ever answer "no" to the prompt against an empty schema.
} else if (!$this->force) {
$console->writeOut(
"\n%s\n",
pht(
"Found %s adjustment(s) to apply, detailed above.\n\n".
"You can review adjustments in more detail from the web interface, ".
"in Config > Database Status. To better understand the adjustment ".
"workflow, see \"Managing Storage Adjustments\" in the ".
"documentation.\n\n".
"MySQL needs to copy table data to make some adjustments, so these ".
"migrations may take some time.",
phutil_count($adjustments)));
$prompt = pht('Apply these schema adjustments?');
if (!phutil_console_confirm($prompt, $default_no = true)) {
return 1;
}
}
$console->writeOut(
"%s\n",
pht('Applying schema adjustments...'));
$conn = $api->getConn(null);
if ($unsafe) {
queryfx($conn, 'SET SESSION sql_mode = %s', '');
} else {
queryfx($conn, 'SET SESSION sql_mode = %s', 'STRICT_ALL_TABLES');
}
$failed = array();
// We make changes in several phases.
$phases = array(
// Drop surplus autoincrements. This allows us to drop primary keys on
// autoincrement columns.
'drop_auto',
// Drop all keys we're going to adjust. This prevents them from
// interfering with column changes.
'drop_keys',
// Apply all database, table, and column changes.
'main',
// Restore adjusted keys.
'add_keys',
// Add missing autoincrements.
'add_auto',
);
$bar = id(new PhutilConsoleProgressBar())
->setTotal(count($adjustments) * count($phases));
foreach ($phases as $phase) {
foreach ($adjustments as $adjust) {
try {
switch ($adjust['kind']) {
case 'database':
if ($phase == 'main') {
queryfx(
$conn,
'ALTER DATABASE %T CHARACTER SET = %s COLLATE = %s',
$adjust['database'],
$adjust['charset'],
$adjust['collation']);
}
break;
case 'table':
if ($phase == 'main') {
queryfx(
$conn,
'ALTER TABLE %T.%T COLLATE = %s',
$adjust['database'],
$adjust['table'],
$adjust['collation']);
}
break;
case 'column':
$apply = false;
$auto = false;
$new_auto = idx($adjust, 'auto');
if ($phase == 'drop_auto') {
if ($new_auto === false) {
$apply = true;
$auto = false;
}
} else if ($phase == 'main') {
$apply = true;
if ($new_auto === false) {
$auto = false;
} else {
$auto = $adjust['is_auto'];
}
} else if ($phase == 'add_auto') {
if ($new_auto === true) {
$apply = true;
$auto = true;
}
}
if ($apply) {
$parts = array();
if ($auto) {
$parts[] = qsprintf(
$conn,
'AUTO_INCREMENT');
}
if ($adjust['charset']) {
$parts[] = qsprintf(
$conn,
'CHARACTER SET %Q COLLATE %Q',
$adjust['charset'],
$adjust['collation']);
}
queryfx(
$conn,
'ALTER TABLE %T.%T MODIFY %T %Q %Q %Q',
$adjust['database'],
$adjust['table'],
$adjust['name'],
$adjust['type'],
implode(' ', $parts),
$adjust['nullable'] ? 'NULL' : 'NOT NULL');
}
break;
case 'key':
if (($phase == 'drop_keys') && $adjust['exists']) {
if ($adjust['name'] == 'PRIMARY') {
$key_name = 'PRIMARY KEY';
} else {
$key_name = qsprintf($conn, 'KEY %T', $adjust['name']);
}
queryfx(
$conn,
'ALTER TABLE %T.%T DROP %Q',
$adjust['database'],
$adjust['table'],
$key_name);
}
if (($phase == 'add_keys') && $adjust['keep']) {
// Different keys need different creation syntax. Notable
// special cases are primary keys and fulltext keys.
if ($adjust['name'] == 'PRIMARY') {
$key_name = 'PRIMARY KEY';
} else if ($adjust['indexType'] == 'FULLTEXT') {
$key_name = qsprintf($conn, 'FULLTEXT %T', $adjust['name']);
} else {
if ($adjust['unique']) {
$key_name = qsprintf(
$conn,
'UNIQUE KEY %T',
$adjust['name']);
} else {
$key_name = qsprintf(
$conn,
'/* NONUNIQUE */ KEY %T',
$adjust['name']);
}
}
queryfx(
$conn,
'ALTER TABLE %T.%T ADD %Q (%Q)',
$adjust['database'],
$adjust['table'],
$key_name,
implode(', ', $adjust['columns']));
}
break;
default:
throw new Exception(
pht('Unknown schema adjustment kind "%s"!', $adjust['kind']));
}
} catch (AphrontQueryException $ex) {
$failed[] = array($adjust, $ex);
}
$bar->update(1);
}
}
$bar->done();
if (!$failed) {
$console->writeOut(
"%s\n",
pht('Completed applying all schema adjustments.'));
$err = 0;
} else {
$table = id(new PhutilConsoleTable())
->addColumn('target', array('title' => pht('Target')))
->addColumn('error', array('title' => pht('Error')));
foreach ($failed as $failure) {
list($adjust, $ex) = $failure;
$pieces = array_select_keys(
$adjust,
array('database', 'table', 'name'));
$pieces = array_filter($pieces);
$target = implode('.', $pieces);
$table->addRow(
array(
'target' => $target,
'error' => $ex->getMessage(),
));
}
$console->writeOut("\n");
$table->draw();
$console->writeOut(
"\n%s\n",
pht('Failed to make some schema adjustments, detailed above.'));
$console->writeOut(
"%s\n",
pht(
'For help troubleshooting adjustments, see "Managing Storage '.
'Adjustments" in the documentation.'));
$err = 1;
}
return $this->printErrors($errors, $err);
}
private function findAdjustments(
PhabricatorStorageManagementAPI $api) {
list($comp, $expect, $actual) = $this->loadSchemata($api);
$issue_charset = PhabricatorConfigStorageSchema::ISSUE_CHARSET;
$issue_collation = PhabricatorConfigStorageSchema::ISSUE_COLLATION;
$issue_columntype = PhabricatorConfigStorageSchema::ISSUE_COLUMNTYPE;
$issue_surpluskey = PhabricatorConfigStorageSchema::ISSUE_SURPLUSKEY;
$issue_missingkey = PhabricatorConfigStorageSchema::ISSUE_MISSINGKEY;
$issue_columns = PhabricatorConfigStorageSchema::ISSUE_KEYCOLUMNS;
$issue_unique = PhabricatorConfigStorageSchema::ISSUE_UNIQUE;
$issue_longkey = PhabricatorConfigStorageSchema::ISSUE_LONGKEY;
$issue_auto = PhabricatorConfigStorageSchema::ISSUE_AUTOINCREMENT;
$adjustments = array();
$errors = array();
foreach ($comp->getDatabases() as $database_name => $database) {
foreach ($this->findErrors($database) as $issue) {
$errors[] = array(
'database' => $database_name,
'issue' => $issue,
);
}
$expect_database = $expect->getDatabase($database_name);
$actual_database = $actual->getDatabase($database_name);
if (!$expect_database || !$actual_database) {
// If there's a real issue here, skip this stuff.
continue;
}
if ($actual_database->getAccessDenied()) {
// If we can't access the database, we can't access the tables either.
continue;
}
$issues = array();
if ($database->hasIssue($issue_charset)) {
$issues[] = $issue_charset;
}
if ($database->hasIssue($issue_collation)) {
$issues[] = $issue_collation;
}
if ($issues) {
$adjustments[] = array(
'kind' => 'database',
'database' => $database_name,
'issues' => $issues,
'charset' => $expect_database->getCharacterSet(),
'collation' => $expect_database->getCollation(),
);
}
foreach ($database->getTables() as $table_name => $table) {
foreach ($this->findErrors($table) as $issue) {
$errors[] = array(
'database' => $database_name,
'table' => $table_name,
'issue' => $issue,
);
}
$expect_table = $expect_database->getTable($table_name);
$actual_table = $actual_database->getTable($table_name);
if (!$expect_table || !$actual_table) {
continue;
}
$issues = array();
if ($table->hasIssue($issue_collation)) {
$issues[] = $issue_collation;
}
if ($issues) {
$adjustments[] = array(
'kind' => 'table',
'database' => $database_name,
'table' => $table_name,
'issues' => $issues,
'collation' => $expect_table->getCollation(),
);
}
foreach ($table->getColumns() as $column_name => $column) {
foreach ($this->findErrors($column) as $issue) {
$errors[] = array(
'database' => $database_name,
'table' => $table_name,
'name' => $column_name,
'issue' => $issue,
);
}
$expect_column = $expect_table->getColumn($column_name);
$actual_column = $actual_table->getColumn($column_name);
if (!$expect_column || !$actual_column) {
continue;
}
$issues = array();
if ($column->hasIssue($issue_collation)) {
$issues[] = $issue_collation;
}
if ($column->hasIssue($issue_charset)) {
$issues[] = $issue_charset;
}
if ($column->hasIssue($issue_columntype)) {
$issues[] = $issue_columntype;
}
if ($column->hasIssue($issue_auto)) {
$issues[] = $issue_auto;
}
if ($issues) {
if ($expect_column->getCharacterSet() === null) {
// For non-text columns, we won't be specifying a collation or
// character set.
$charset = null;
$collation = null;
} else {
$charset = $expect_column->getCharacterSet();
$collation = $expect_column->getCollation();
}
$adjustment = array(
'kind' => 'column',
'database' => $database_name,
'table' => $table_name,
'name' => $column_name,
'issues' => $issues,
'collation' => $collation,
'charset' => $charset,
'type' => $expect_column->getColumnType(),
// NOTE: We don't adjust column nullability because it is
// dangerous, so always use the current nullability.
'nullable' => $actual_column->getNullable(),
// NOTE: This always stores the current value, because we have
// to make these updates separately.
'is_auto' => $actual_column->getAutoIncrement(),
);
if ($column->hasIssue($issue_auto)) {
$adjustment['auto'] = $expect_column->getAutoIncrement();
}
$adjustments[] = $adjustment;
}
}
foreach ($table->getKeys() as $key_name => $key) {
foreach ($this->findErrors($key) as $issue) {
$errors[] = array(
'database' => $database_name,
'table' => $table_name,
'name' => $key_name,
'issue' => $issue,
);
}
$expect_key = $expect_table->getKey($key_name);
$actual_key = $actual_table->getKey($key_name);
$issues = array();
$keep_key = true;
if ($key->hasIssue($issue_surpluskey)) {
$issues[] = $issue_surpluskey;
$keep_key = false;
}
if ($key->hasIssue($issue_missingkey)) {
$issues[] = $issue_missingkey;
}
if ($key->hasIssue($issue_columns)) {
$issues[] = $issue_columns;
}
if ($key->hasIssue($issue_unique)) {
$issues[] = $issue_unique;
}
// NOTE: We can't really fix this, per se, but we may need to remove
// the key to change the column type. In the best case, the new
// column type won't be overlong and recreating the key really will
// fix the issue. In the worst case, we get the right column type and
// lose the key, which is still better than retaining the key having
// the wrong column type.
if ($key->hasIssue($issue_longkey)) {
$issues[] = $issue_longkey;
}
if ($issues) {
$adjustment = array(
'kind' => 'key',
'database' => $database_name,
'table' => $table_name,
'name' => $key_name,
'issues' => $issues,
'exists' => (bool)$actual_key,
'keep' => $keep_key,
);
if ($keep_key) {
$adjustment += array(
'columns' => $expect_key->getColumnNames(),
'unique' => $expect_key->getUnique(),
'indexType' => $expect_key->getIndexType(),
);
}
$adjustments[] = $adjustment;
}
}
}
}
return array($adjustments, $errors);
}
private function findErrors(PhabricatorConfigStorageSchema $schema) {
$result = array();
foreach ($schema->getLocalIssues() as $issue) {
$status = PhabricatorConfigStorageSchema::getIssueStatus($issue);
if ($status == PhabricatorConfigStorageSchema::STATUS_FAIL) {
$result[] = $issue;
}
}
return $result;
}
private function printErrors(array $errors, $default_return) {
if (!$errors) {
return $default_return;
}
$console = PhutilConsole::getConsole();
$table = id(new PhutilConsoleTable())
->addColumn('target', array('title' => pht('Target')))
->addColumn('error', array('title' => pht('Error')));
$any_surplus = false;
$all_surplus = true;
$any_access = false;
$all_access = true;
foreach ($errors as $error) {
$pieces = array_select_keys(
$error,
array('database', 'table', 'name'));
$pieces = array_filter($pieces);
$target = implode('.', $pieces);
$name = PhabricatorConfigStorageSchema::getIssueName($error['issue']);
$issue = $error['issue'];
if ($issue === PhabricatorConfigStorageSchema::ISSUE_SURPLUS) {
$any_surplus = true;
} else {
$all_surplus = false;
}
if ($issue === PhabricatorConfigStorageSchema::ISSUE_ACCESSDENIED) {
$any_access = true;
} else {
$all_access = false;
}
$table->addRow(
array(
'target' => $target,
'error' => $name,
));
}
$console->writeOut("\n");
$table->draw();
$console->writeOut("\n");
$message = array();
if ($all_surplus) {
$message[] = pht(
'You have surplus schemata (extra tables or columns which Phabricator '.
'does not expect). For information on resolving these '.
'issues, see the "Surplus Schemata" section in the "Managing Storage '.
'Adjustments" article in the documentation.');
} else if ($all_access) {
$message[] = pht(
'The user you are connecting to MySQL with does not have the correct '.
'permissions, and can not access some databases or tables that it '.
'needs to be able to access. GRANT the user additional permissions.');
} else {
$message[] = pht(
'The schemata have errors (detailed above) which the adjustment '.
'workflow can not fix.');
if ($any_access) {
$message[] = pht(
'Some of these errors are caused by access control problems. '.
'The user you are connecting with does not have permission to see '.
'all of the database or tables that Phabricator uses. You need to '.
'GRANT the user more permission, or use a different user.');
}
if ($any_surplus) {
$message[] = pht(
'Some of these errors are caused by surplus schemata (extra '.
'tables or columns which Phabricator does not expect). These are '.
'not serious. For information on resolving these issues, see the '.
'"Surplus Schemata" section in the "Managing Storage Adjustments" '.
'article in the documentation.');
}
$message[] = pht(
'If you are not developing Phabricator itself, report this issue to '.
'the upstream.');
$message[] = pht(
'If you are developing Phabricator, these errors usually indicate '.
'that your schema specifications do not agree with the schemata your '.
'code actually builds.');
}
$message = implode("\n\n", $message);
if ($all_surplus) {
$console->writeOut(
"**<bg:yellow> %s </bg>**\n\n%s\n",
pht('SURPLUS SCHEMATA'),
phutil_console_wrap($message));
} else if ($all_access) {
$console->writeOut(
"**<bg:yellow> %s </bg>**\n\n%s\n",
pht('ACCESS DENIED'),
phutil_console_wrap($message));
} else {
$console->writeOut(
"**<bg:red> %s </bg>**\n\n%s\n",
pht('SCHEMATA ERRORS'),
phutil_console_wrap($message));
}
return 2;
}
final protected function upgradeSchemata(
array $apis,
$apply_only = null,
$no_quickstart = false,
$init_only = false) {
$locks = array();
foreach ($apis as $api) {
- $ref_key = $api->getRef()->getRefKey();
$locks[] = $this->lock($api);
}
try {
$this->doUpgradeSchemata($apis, $apply_only, $no_quickstart, $init_only);
} catch (Exception $ex) {
foreach ($locks as $lock) {
$lock->unlock();
}
throw $ex;
}
foreach ($locks as $lock) {
$lock->unlock();
}
}
final private function doUpgradeSchemata(
array $apis,
$apply_only,
$no_quickstart,
$init_only) {
$patches = $this->patches;
$is_dryrun = $this->dryRun;
$api_map = array();
foreach ($apis as $api) {
$api_map[$api->getRef()->getRefKey()] = $api;
}
foreach ($api_map as $ref_key => $api) {
$applied = $api->getAppliedPatches();
$needs_init = ($applied === null);
if (!$needs_init) {
continue;
}
if ($is_dryrun) {
echo tsprintf(
"%s\n",
pht(
'DRYRUN: Storage on host "%s" does not exist yet, so it '.
'would be created.',
$ref_key));
continue;
}
if ($apply_only) {
throw new PhutilArgumentUsageException(
pht(
'Storage on host "%s" has not been initialized yet. You must '.
'initialize storage before selectively applying patches.',
$ref_key));
}
// If we're initializing storage for the first time on any host, track
// it so that we can give the user a nicer experience during the
// subsequent adjustment phase.
$this->didInitialize = true;
$legacy = $api->getLegacyPatches($patches);
if ($legacy || $no_quickstart || $init_only) {
// If we have legacy patches, we can't quickstart.
$api->createDatabase('meta_data');
$api->createTable(
'meta_data',
'patch_status',
array(
'patch VARCHAR(255) NOT NULL PRIMARY KEY COLLATE utf8_general_ci',
'applied INT UNSIGNED NOT NULL',
));
foreach ($legacy as $patch) {
$api->markPatchApplied($patch);
}
} else {
echo tsprintf(
"%s\n",
pht(
'Loading quickstart template onto "%s"...',
$ref_key));
$root = dirname(phutil_get_library_root('phabricator'));
$sql = $root.'/resources/sql/quickstart.sql';
$api->applyPatchSQL($sql);
}
}
if ($init_only) {
echo pht('Storage initialized.')."\n";
return 0;
}
$applied_map = array();
foreach ($api_map as $ref_key => $api) {
$applied = $api->getAppliedPatches();
// If we still have nothing applied, this is a dry run and we didn't
// actually initialize storage. Here, just do nothing.
if ($applied === null) {
if ($is_dryrun) {
continue;
} else {
throw new Exception(
pht(
'Database initialization on host "%s" applied no patches!',
$ref_key));
}
}
$applied = array_fuse($applied);
if ($apply_only) {
if (isset($applied[$apply_only])) {
if (!$this->force && !$is_dryrun) {
echo phutil_console_wrap(
pht(
'Patch "%s" has already been applied on host "%s". Are you '.
'sure you want to apply it again? This may put your storage '.
'in a state that the upgrade scripts can not automatically '.
'manage.',
$apply_only,
$ref_key));
if (!phutil_console_confirm(pht('Apply patch again?'))) {
echo pht('Cancelled.')."\n";
return 1;
}
}
// Mark this patch as not yet applied on this host.
unset($applied[$apply_only]);
}
}
$applied_map[$ref_key] = $applied;
}
// If we're applying only a specific patch, select just that patch.
if ($apply_only) {
$patches = array_select_keys($patches, array($apply_only));
}
// Apply each patch to each database. We apply patches patch-by-patch,
// not database-by-database: for each patch we apply it to every database,
// then move to the next patch.
// We must do this because ".php" patches may depend on ".sql" patches
// being up to date on all masters, and that will work fine if we put each
// patch on every host before moving on. If we try to bring database hosts
// up to date one at a time we can end up in a big mess.
$duration_map = array();
+
+ // First, find any global patches which have been applied to ANY database.
+ // We are just going to mark these as applied without actually running
+ // them. Otherwise, adding new empty masters to an existing cluster will
+ // try to apply them against invalid states.
+ foreach ($patches as $key => $patch) {
+ if ($patch->getIsGlobalPatch()) {
+ foreach ($applied_map as $ref_key => $applied) {
+ if (isset($applied[$key])) {
+ $duration_map[$key] = 1;
+ }
+ }
+ }
+ }
+
while (true) {
$applied_something = false;
foreach ($patches as $key => $patch) {
// First, check if any databases need this patch. We can just skip it
// if it has already been applied everywhere.
$need_patch = array();
foreach ($applied_map as $ref_key => $applied) {
if (isset($applied[$key])) {
continue;
}
$need_patch[] = $ref_key;
}
if (!$need_patch) {
unset($patches[$key]);
continue;
}
// Check if we can apply this patch yet. Before we can apply a patch,
// all of the dependencies for the patch must have been applied on all
// databases. Requiring that all databases stay in sync prevents one
// database from racing ahead if it happens to get a patch that nothing
// else has yet.
$missing_patch = null;
foreach ($patch->getAfter() as $after) {
foreach ($applied_map as $ref_key => $applied) {
if (isset($applied[$after])) {
// This database already has the patch. We can apply it to
// other databases but don't need to apply it here.
continue;
}
$missing_patch = $after;
break 2;
}
}
if ($missing_patch) {
if ($apply_only) {
echo tsprintf(
"%s\n",
pht(
'Unable to apply patch "%s" because it depends on patch '.
'"%s", which has not been applied on some hosts: %s.',
$apply_only,
$missing_patch,
implode(', ', $need_patch)));
return 1;
} else {
// Some databases are missing the dependencies, so keep trying
// other patches instead. If everything goes right, we'll apply the
// dependencies and then come back and apply this patch later.
continue;
}
}
$is_global = $patch->getIsGlobalPatch();
$patch_apis = array_select_keys($api_map, $need_patch);
foreach ($patch_apis as $ref_key => $api) {
if ($is_global) {
// If this is a global patch which we previously applied, just
// read the duration from the map without actually applying
// the patch.
$duration = idx($duration_map, $key);
} else {
$duration = null;
}
if ($duration === null) {
if ($is_dryrun) {
echo tsprintf(
"%s\n",
pht(
'DRYRUN: Would apply patch "%s" to host "%s".',
$key,
$ref_key));
} else {
echo tsprintf(
"%s\n",
pht(
'Applying patch "%s" to host "%s"...',
$key,
$ref_key));
}
$t_begin = microtime(true);
$api->applyPatch($patch);
$t_end = microtime(true);
$duration = ($t_end - $t_begin);
$duration_map[$key] = $duration;
}
// If we're explicitly reapplying this patch, we don't need to
// mark it as applied.
if (!isset($applied_map[$ref_key][$key])) {
$api->markPatchApplied($key, ($t_end - $t_begin));
$applied_map[$ref_key][$key] = true;
}
}
// We applied this everywhere, so we're done with the patch.
unset($patches[$key]);
$applied_something = true;
}
if (!$applied_something) {
if ($patches) {
throw new Exception(
pht(
'Some patches could not be applied: %s',
implode(', ', array_keys($patches))));
} else if (!$is_dryrun && !$apply_only) {
echo pht(
'Storage is up to date. Use "%s" for details.',
'storage status')."\n";
}
break;
}
}
}
final protected function getBareHostAndPort($host) {
// Split out port information, since the command-line client requires a
// separate flag for the port.
$uri = new PhutilURI('mysql://'.$host);
if ($uri->getPort()) {
$port = $uri->getPort();
$bare_hostname = $uri->getDomain();
} else {
$port = null;
$bare_hostname = $host;
}
return array($bare_hostname, $port);
}
/**
* Acquires a @{class:PhabricatorGlobalLock}.
*
* @return PhabricatorGlobalLock
*/
final protected function lock(PhabricatorStorageManagementAPI $api) {
// Although we're holding this lock on different databases so it could
// have the same name on each as far as the database is concerned, the
// locks would be the same within this process.
$lock_name = 'adjust/'.$api->getRef()->getRefKey();
return PhabricatorGlobalLock::newLock($lock_name)
->useSpecificConnection($api->getConn(null))
->lock();
}
}

File Metadata

Mime Type
text/x-diff
Expires
Tue, Jul 29, 9:26 AM (3 w, 1 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
188443
Default Alt Text
(58 KB)

Event Timeline