Page MenuHomestyx hydra

No OneTemporary

diff --git a/src/applications/diffusion/ssh/DiffusionGitReceivePackSSHWorkflow.php b/src/applications/diffusion/ssh/DiffusionGitReceivePackSSHWorkflow.php
index abf2a4323e..f59a9b58b4 100644
--- a/src/applications/diffusion/ssh/DiffusionGitReceivePackSSHWorkflow.php
+++ b/src/applications/diffusion/ssh/DiffusionGitReceivePackSSHWorkflow.php
@@ -1,151 +1,138 @@
<?php
final class DiffusionGitReceivePackSSHWorkflow extends DiffusionGitSSHWorkflow {
protected function didConstruct() {
$this->setName('git-receive-pack');
$this->setArguments(
array(
array(
'name' => 'dir',
'wildcard' => true,
),
));
}
protected function executeRepositoryOperations() {
+ // This is a write, and must have write access.
+ $this->requireWriteAccess();
+
+ $is_proxy = $this->shouldProxy();
+ if ($is_proxy) {
+ return $this->executeRepositoryProxyOperations($for_write = true);
+ }
+
$host_wait_start = microtime(true);
$repository = $this->getRepository();
$viewer = $this->getSSHUser();
$device = AlmanacKeys::getLiveDevice();
- // This is a write, and must have write access.
- $this->requireWriteAccess();
-
$cluster_engine = id(new DiffusionRepositoryClusterEngine())
->setViewer($viewer)
->setRepository($repository)
->setLog($this);
- $is_proxy = $this->shouldProxy();
- if ($is_proxy) {
- $command = $this->getProxyCommand(true);
- $did_write = false;
-
- if ($device) {
- $this->writeClusterEngineLogMessage(
- pht(
- "# Push received by \"%s\", forwarding to cluster host.\n",
- $device->getName()));
- }
- } else {
- $command = csprintf('git-receive-pack %s', $repository->getLocalPath());
- $did_write = true;
- $cluster_engine->synchronizeWorkingCopyBeforeWrite();
-
- if ($device) {
- $this->writeClusterEngineLogMessage(
- pht(
- "# Ready to receive on cluster host \"%s\".\n",
- $device->getName()));
- }
+ $command = csprintf('git-receive-pack %s', $repository->getLocalPath());
+ $cluster_engine->synchronizeWorkingCopyBeforeWrite();
+
+ if ($device) {
+ $this->writeClusterEngineLogMessage(
+ pht(
+ "# Ready to receive on cluster host \"%s\".\n",
+ $device->getName()));
}
$log = $this->newProtocolLog($is_proxy);
if ($log) {
$this->setProtocolLog($log);
$log->didStartSession($command);
}
$caught = null;
try {
$err = $this->executeRepositoryCommand($command);
} catch (Exception $ex) {
$caught = $ex;
}
if ($log) {
$log->didEndSession();
}
// We've committed the write (or rejected it), so we can release the lock
// without waiting for the client to receive the acknowledgement.
- if ($did_write) {
- $cluster_engine->synchronizeWorkingCopyAfterWrite();
- }
+ $cluster_engine->synchronizeWorkingCopyAfterWrite();
if ($caught) {
throw $caught;
}
if (!$err) {
$this->waitForGitClient();
// When a repository is clustered, we reach this cleanup code on both
// the proxy and the actual final endpoint node. Don't do more cleanup
// or logging than we need to.
- if ($did_write) {
- $repository->writeStatusMessage(
- PhabricatorRepositoryStatusMessage::TYPE_NEEDS_UPDATE,
- PhabricatorRepositoryStatusMessage::CODE_OKAY);
-
- $host_wait_end = microtime(true);
-
- $this->updatePushLogWithTimingInformation(
- $this->getClusterEngineLogProperty('writeWait'),
- $this->getClusterEngineLogProperty('readWait'),
- ($host_wait_end - $host_wait_start));
- }
+ $repository->writeStatusMessage(
+ PhabricatorRepositoryStatusMessage::TYPE_NEEDS_UPDATE,
+ PhabricatorRepositoryStatusMessage::CODE_OKAY);
+
+ $host_wait_end = microtime(true);
+
+ $this->updatePushLogWithTimingInformation(
+ $this->getClusterEngineLogProperty('writeWait'),
+ $this->getClusterEngineLogProperty('readWait'),
+ ($host_wait_end - $host_wait_start));
}
return $err;
}
private function executeRepositoryCommand($command) {
$repository = $this->getRepository();
$command = PhabricatorDaemon::sudoCommandAsDaemonUser($command);
$future = id(new ExecFuture('%C', $command))
->setEnv($this->getEnvironment());
return $this->newPassthruCommand()
->setIOChannel($this->getIOChannel())
->setCommandChannelFromExecFuture($future)
->execute();
}
private function updatePushLogWithTimingInformation(
$write_wait,
$read_wait,
$host_wait) {
if ($write_wait !== null) {
$write_wait = (int)(1000000 * $write_wait);
}
if ($read_wait !== null) {
$read_wait = (int)(1000000 * $read_wait);
}
if ($host_wait !== null) {
$host_wait = (int)(1000000 * $host_wait);
}
$identifier = $this->getRequestIdentifier();
$event = new PhabricatorRepositoryPushEvent();
$conn = $event->establishConnection('w');
queryfx(
$conn,
'UPDATE %T SET writeWait = %nd, readWait = %nd, hostWait = %nd
WHERE requestIdentifier = %s',
$event->getTableName(),
$write_wait,
$read_wait,
$host_wait,
$identifier);
}
}
diff --git a/src/applications/diffusion/ssh/DiffusionGitSSHWorkflow.php b/src/applications/diffusion/ssh/DiffusionGitSSHWorkflow.php
index d8d0116017..292741e34d 100644
--- a/src/applications/diffusion/ssh/DiffusionGitSSHWorkflow.php
+++ b/src/applications/diffusion/ssh/DiffusionGitSSHWorkflow.php
@@ -1,149 +1,261 @@
<?php
abstract class DiffusionGitSSHWorkflow
extends DiffusionSSHWorkflow
implements DiffusionRepositoryClusterEngineLogInterface {
private $engineLogProperties = array();
private $protocolLog;
private $wireProtocol;
private $ioBytesRead = 0;
private $ioBytesWritten = 0;
+ private $requestAttempts = 0;
+ private $requestFailures = 0;
protected function writeError($message) {
// Git assumes we'll add our own newlines.
return parent::writeError($message."\n");
}
public function writeClusterEngineLogMessage($message) {
parent::writeError($message);
$this->getErrorChannel()->update();
}
public function writeClusterEngineLogProperty($key, $value) {
$this->engineLogProperties[$key] = $value;
}
protected function getClusterEngineLogProperty($key, $default = null) {
return idx($this->engineLogProperties, $key, $default);
}
protected function identifyRepository() {
$args = $this->getArgs();
$path = head($args->getArg('dir'));
return $this->loadRepositoryWithPath(
$path,
PhabricatorRepositoryType::REPOSITORY_TYPE_GIT);
}
protected function waitForGitClient() {
$io_channel = $this->getIOChannel();
// If we don't wait for the client to close the connection, `git` will
// consider it an early abort and fail. Sit around until Git is comfortable
// that it really received all the data.
while ($io_channel->isOpenForReading()) {
$io_channel->update();
$this->getErrorChannel()->flush();
PhutilChannel::waitForAny(array($io_channel));
}
}
protected function raiseWrongVCSException(
PhabricatorRepository $repository) {
throw new Exception(
pht(
'This repository ("%s") is not a Git repository. Use "%s" to '.
'interact with this repository.',
$repository->getDisplayName(),
$repository->getVersionControlSystem()));
}
protected function newPassthruCommand() {
return parent::newPassthruCommand()
->setWillWriteCallback(array($this, 'willWriteMessageCallback'))
->setWillReadCallback(array($this, 'willReadMessageCallback'));
}
protected function newProtocolLog($is_proxy) {
if ($is_proxy) {
return null;
}
// While developing, do this to write a full protocol log to disk:
//
// return new PhabricatorProtocolLog('/tmp/git-protocol.log');
return null;
}
final protected function getProtocolLog() {
return $this->protocolLog;
}
final protected function setProtocolLog(PhabricatorProtocolLog $log) {
$this->protocolLog = $log;
}
final protected function getWireProtocol() {
return $this->wireProtocol;
}
final protected function setWireProtocol(
DiffusionGitWireProtocol $protocol) {
$this->wireProtocol = $protocol;
return $this;
}
public function willWriteMessageCallback(
PhabricatorSSHPassthruCommand $command,
$message) {
$this->ioBytesWritten += strlen($message);
$log = $this->getProtocolLog();
if ($log) {
$log->didWriteBytes($message);
}
$protocol = $this->getWireProtocol();
if ($protocol) {
$message = $protocol->willWriteBytes($message);
}
return $message;
}
public function willReadMessageCallback(
PhabricatorSSHPassthruCommand $command,
$message) {
$log = $this->getProtocolLog();
if ($log) {
$log->didReadBytes($message);
}
$protocol = $this->getWireProtocol();
if ($protocol) {
$message = $protocol->willReadBytes($message);
}
// Note that bytes aren't counted until they're emittted by the protocol
// layer. This means the underlying command might emit bytes, but if they
// are buffered by the protocol layer they won't count as read bytes yet.
$this->ioBytesRead += strlen($message);
return $message;
}
final protected function getIOBytesRead() {
return $this->ioBytesRead;
}
final protected function getIOBytesWritten() {
return $this->ioBytesWritten;
}
+ final protected function executeRepositoryProxyOperations($for_write) {
+ $device = AlmanacKeys::getLiveDevice();
+
+ $refs = $this->getAlmanacServiceRefs($for_write);
+ $err = 1;
+
+ while (true) {
+ $ref = head($refs);
+
+ $command = $this->getProxyCommandForServiceRef($ref);
+
+ if ($device) {
+ $this->writeClusterEngineLogMessage(
+ pht(
+ "# Request received by \"%s\", forwarding to cluster ".
+ "host \"%s\".\n",
+ $device->getName(),
+ $ref->getDeviceName()));
+ }
+
+ $command = PhabricatorDaemon::sudoCommandAsDaemonUser($command);
+
+ $future = id(new ExecFuture('%C', $command))
+ ->setEnv($this->getEnvironment());
+
+ $this->didBeginRequest();
+
+ $err = $this->newPassthruCommand()
+ ->setIOChannel($this->getIOChannel())
+ ->setCommandChannelFromExecFuture($future)
+ ->execute();
+
+ // TODO: Currently, when proxying, we do not write an event log on the
+ // proxy. Perhaps we should write a "proxy log". This is not very useful
+ // for statistics or auditing, but could be useful for diagnostics.
+ // Marking the proxy logs as proxied (and recording devicePHID on all
+ // logs) would make differentiating between these use cases easier.
+
+ if (!$err) {
+ $this->waitForGitClient();
+ return $err;
+ }
+
+ // Throw away this service: the request failed and we're treating the
+ // failure as persistent, so we don't want to retry another request to
+ // the same host.
+ array_shift($refs);
+
+ $should_retry = $this->shouldRetryRequest($refs);
+ if (!$should_retry) {
+ return $err;
+ }
+
+ // If we haven't bailed out yet, we'll retry the request with the next
+ // service.
+ }
+
+ throw new Exception(pht('Reached an unreachable place.'));
+ }
+
+ private function didBeginRequest() {
+ $this->requestAttempts++;
+ return $this;
+ }
+
+ private function shouldRetryRequest(array $remaining_refs) {
+ $this->requestFailures++;
+
+ if ($this->requestFailures > $this->requestAttempts) {
+ throw new Exception(
+ pht(
+ "Workflow has recorded more failures than attempts; there is a ".
+ "missing call to \"didBeginRequest()\".\n"));
+ }
+
+ if (!$remaining_refs) {
+ $this->writeClusterEngineLogMessage(
+ pht(
+ "# All available services failed to serve the request, ".
+ "giving up.\n"));
+ return false;
+ }
+
+ $read_len = $this->getIOBytesRead();
+ if ($read_len) {
+ $this->writeClusterEngineLogMessage(
+ pht(
+ "# Client already read from service (%s bytes), unable to retry.\n",
+ new PhutilNumber($read_len)));
+ return false;
+ }
+
+ $write_len = $this->getIOBytesWritten();
+ if ($write_len) {
+ $this->writeClusterEngineLogMessage(
+ pht(
+ "# Client already wrote to service (%s bytes), unable to retry.\n",
+ new PhutilNumber($write_len)));
+ return false;
+ }
+
+ $this->writeClusterEngineLogMessage(
+ pht(
+ "# Service request failed, retrying (making attempt %s of %s).\n",
+ new PhutilNumber($this->requestAttempts + 1),
+ new PhutilNumber($this->requestAttempts + count($remaining_refs))));
+
+ return true;
+ }
+
}
diff --git a/src/applications/diffusion/ssh/DiffusionGitUploadPackSSHWorkflow.php b/src/applications/diffusion/ssh/DiffusionGitUploadPackSSHWorkflow.php
index 5c0e2588b7..57c43b5a12 100644
--- a/src/applications/diffusion/ssh/DiffusionGitUploadPackSSHWorkflow.php
+++ b/src/applications/diffusion/ssh/DiffusionGitUploadPackSSHWorkflow.php
@@ -1,207 +1,94 @@
<?php
final class DiffusionGitUploadPackSSHWorkflow
extends DiffusionGitSSHWorkflow {
- private $requestAttempts = 0;
- private $requestFailures = 0;
-
protected function didConstruct() {
$this->setName('git-upload-pack');
$this->setArguments(
array(
array(
'name' => 'dir',
'wildcard' => true,
),
));
}
protected function executeRepositoryOperations() {
$is_proxy = $this->shouldProxy();
if ($is_proxy) {
- return $this->executeRepositoryProxyOperations();
+ return $this->executeRepositoryProxyOperations($for_write = false);
}
$viewer = $this->getSSHUser();
$repository = $this->getRepository();
$device = AlmanacKeys::getLiveDevice();
$skip_sync = $this->shouldSkipReadSynchronization();
$command = csprintf('git-upload-pack -- %s', $repository->getLocalPath());
if (!$skip_sync) {
$cluster_engine = id(new DiffusionRepositoryClusterEngine())
->setViewer($viewer)
->setRepository($repository)
->setLog($this)
->synchronizeWorkingCopyBeforeRead();
if ($device) {
$this->writeClusterEngineLogMessage(
pht(
"# Cleared to fetch on cluster host \"%s\".\n",
$device->getName()));
}
}
$command = PhabricatorDaemon::sudoCommandAsDaemonUser($command);
$pull_event = $this->newPullEvent();
$future = id(new ExecFuture('%C', $command))
->setEnv($this->getEnvironment());
$log = $this->newProtocolLog($is_proxy);
if ($log) {
$this->setProtocolLog($log);
$log->didStartSession($command);
}
if (PhabricatorEnv::getEnvConfig('phabricator.show-prototypes')) {
$protocol = new DiffusionGitUploadPackWireProtocol();
if ($log) {
$protocol->setProtocolLog($log);
}
$this->setWireProtocol($protocol);
}
$err = $this->newPassthruCommand()
->setIOChannel($this->getIOChannel())
->setCommandChannelFromExecFuture($future)
->execute();
if ($log) {
$log->didEndSession();
}
if ($err) {
$pull_event
->setResultType(PhabricatorRepositoryPullEvent::RESULT_ERROR)
->setResultCode($err);
} else {
$pull_event
->setResultType(PhabricatorRepositoryPullEvent::RESULT_PULL)
->setResultCode(0);
}
$pull_event->save();
if (!$err) {
$this->waitForGitClient();
}
return $err;
}
- private function executeRepositoryProxyOperations() {
- $device = AlmanacKeys::getLiveDevice();
- $for_write = false;
-
- $refs = $this->getAlmanacServiceRefs($for_write);
- $err = 1;
-
- while (true) {
- $ref = head($refs);
-
- $command = $this->getProxyCommandForServiceRef($ref);
-
- if ($device) {
- $this->writeClusterEngineLogMessage(
- pht(
- "# Fetch received by \"%s\", forwarding to cluster host \"%s\".\n",
- $device->getName(),
- $ref->getDeviceName()));
- }
-
- $command = PhabricatorDaemon::sudoCommandAsDaemonUser($command);
-
- $future = id(new ExecFuture('%C', $command))
- ->setEnv($this->getEnvironment());
-
- $this->didBeginRequest();
-
- $err = $this->newPassthruCommand()
- ->setIOChannel($this->getIOChannel())
- ->setCommandChannelFromExecFuture($future)
- ->execute();
-
- // TODO: Currently, when proxying, we do not write an event log on the
- // proxy. Perhaps we should write a "proxy log". This is not very useful
- // for statistics or auditing, but could be useful for diagnostics.
- // Marking the proxy logs as proxied (and recording devicePHID on all
- // logs) would make differentiating between these use cases easier.
-
- if (!$err) {
- $this->waitForGitClient();
- return $err;
- }
-
- // Throw away this service: the request failed and we're treating the
- // failure as persistent, so we don't want to retry another request to
- // the same host.
- array_shift($refs);
-
- $should_retry = $this->shouldRetryRequest($refs);
- if (!$should_retry) {
- return $err;
- }
-
- // If we haven't bailed out yet, we'll retry the request with the next
- // service.
- }
-
- throw new Exception(pht('Reached an unreachable place.'));
- }
-
- private function didBeginRequest() {
- $this->requestAttempts++;
- return $this;
- }
-
- private function shouldRetryRequest(array $remaining_refs) {
- $this->requestFailures++;
-
- if ($this->requestFailures > $this->requestAttempts) {
- throw new Exception(
- pht(
- "Workflow has recorded more failures than attempts; there is a ".
- "missing call to \"didBeginRequest()\".\n"));
- }
-
- if (!$remaining_refs) {
- $this->writeClusterEngineLogMessage(
- pht(
- "# All available services failed to serve the request, ".
- "giving up.\n"));
- return false;
- }
-
- $read_len = $this->getIOBytesRead();
- if ($read_len) {
- $this->writeClusterEngineLogMessage(
- pht(
- "# Client already read from service (%s bytes), unable to retry.\n",
- new PhutilNumber($read_len)));
- return false;
- }
-
- $write_len = $this->getIOBytesWritten();
- if ($write_len) {
- $this->writeClusterEngineLogMessage(
- pht(
- "# Client already wrote to service (%s bytes), unable to retry.\n",
- new PhutilNumber($write_len)));
- return false;
- }
-
- $this->writeClusterEngineLogMessage(
- pht(
- "# Service request failed, retrying (making attempt %s of %s).\n",
- new PhutilNumber($this->requestAttempts + 1),
- new PhutilNumber($this->requestAttempts + count($remaining_refs))));
-
- return true;
- }
-
}

File Metadata

Mime Type
text/x-diff
Expires
Mon, Apr 28, 7:49 AM (1 d, 14 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
107876
Default Alt Text
(19 KB)

Event Timeline