Page MenuHomestyx hydra

No OneTemporary

diff --git a/src/infrastructure/daemon/PhutilDaemonHandle.php b/src/infrastructure/daemon/PhutilDaemonHandle.php
index 25c517b8cf..428a64a056 100644
--- a/src/infrastructure/daemon/PhutilDaemonHandle.php
+++ b/src/infrastructure/daemon/PhutilDaemonHandle.php
@@ -1,513 +1,538 @@
<?php
final class PhutilDaemonHandle extends Phobject {
const EVENT_DID_LAUNCH = 'daemon.didLaunch';
const EVENT_DID_LOG = 'daemon.didLogMessage';
const EVENT_DID_HEARTBEAT = 'daemon.didHeartbeat';
const EVENT_WILL_GRACEFUL = 'daemon.willGraceful';
const EVENT_WILL_EXIT = 'daemon.willExit';
private $pool;
private $properties;
private $future;
private $argv;
private $restartAt;
private $busyEpoch;
- private $pid;
private $daemonID;
private $deadline;
private $heartbeat;
private $stdoutBuffer;
private $shouldRestart = true;
private $shouldShutdown;
private $hibernating = false;
private $shouldSendExitEvent = false;
private function __construct() {
// <empty>
}
public static function newFromConfig(array $config) {
PhutilTypeSpec::checkMap(
$config,
array(
'class' => 'string',
'argv' => 'optional list<string>',
'load' => 'optional list<string>',
'log' => 'optional string|null',
'down' => 'optional int',
));
$config = $config + array(
'argv' => array(),
'load' => array(),
'log' => null,
'down' => 15,
);
$daemon = new self();
$daemon->properties = $config;
$daemon->daemonID = $daemon->generateDaemonID();
return $daemon;
}
public function setDaemonPool(PhutilDaemonPool $daemon_pool) {
$this->pool = $daemon_pool;
return $this;
}
public function getDaemonPool() {
return $this->pool;
}
public function getBusyEpoch() {
return $this->busyEpoch;
}
public function getDaemonClass() {
return $this->getProperty('class');
}
private function getProperty($key) {
return idx($this->properties, $key);
}
public function setCommandLineArguments(array $arguments) {
$this->argv = $arguments;
return $this;
}
public function getCommandLineArguments() {
return $this->argv;
}
public function getDaemonArguments() {
return $this->getProperty('argv');
}
public function didLaunch() {
$this->restartAt = time();
$this->shouldSendExitEvent = true;
$this->dispatchEvent(
self::EVENT_DID_LAUNCH,
array(
'argv' => $this->getCommandLineArguments(),
'explicitArgv' => $this->getDaemonArguments(),
));
return $this;
}
public function isRunning() {
- return (bool)$this->future;
+ return (bool)$this->getFuture();
}
public function isHibernating() {
return
!$this->isRunning() &&
!$this->isDone() &&
$this->hibernating;
}
public function wakeFromHibernation() {
if (!$this->isHibernating()) {
return $this;
}
$this->logMessage(
'WAKE',
pht(
'Process is being awakened from hibernation.'));
$this->restartAt = time();
$this->update();
return $this;
}
public function isDone() {
return (!$this->shouldRestart && !$this->isRunning());
}
- public function getFuture() {
- return $this->future;
- }
-
public function update() {
if (!$this->isRunning()) {
if (!$this->shouldRestart) {
return;
}
if (!$this->restartAt || (time() < $this->restartAt)) {
return;
}
if ($this->shouldShutdown) {
return;
}
$this->startDaemonProcess();
}
- $future = $this->future;
+ $future = $this->getFuture();
$result = null;
- if ($future->isReady()) {
- $result = $future->resolve();
+ $caught = null;
+ if ($future->canResolve()) {
+ $this->future = null;
+ try {
+ $result = $future->resolve();
+ } catch (Exception $ex) {
+ $caught = $ex;
+ } catch (Throwable $ex) {
+ $caught = $ex;
+ }
}
list($stdout, $stderr) = $future->read();
$future->discardBuffers();
if (strlen($stdout)) {
$this->didReadStdout($stdout);
}
$stderr = trim($stderr);
if (strlen($stderr)) {
foreach (phutil_split_lines($stderr, false) as $line) {
$this->logMessage('STDE', $line);
}
}
- if ($result !== null) {
- list($err) = $result;
+ if ($result !== null || $caught !== null) {
- if ($err) {
- $this->logMessage('FAIL', pht('Process exited with error %s.', $err));
+ if ($caught) {
+ $message = pht(
+ 'Process failed with exception: %s',
+ $caught->getMessage());
+ $this->logMessage('FAIL', $message);
} else {
- $this->logMessage('DONE', pht('Process exited normally.'));
- }
+ list($err) = $result;
- $this->future = null;
+ if ($err) {
+ $this->logMessage('FAIL', pht('Process exited with error %s.', $err));
+ } else {
+ $this->logMessage('DONE', pht('Process exited normally.'));
+ }
+ }
if ($this->shouldShutdown) {
$this->restartAt = null;
} else {
$this->scheduleRestart();
}
}
$this->updateHeartbeatEvent();
$this->updateHangDetection();
}
private function updateHeartbeatEvent() {
if ($this->heartbeat > time()) {
return;
}
$this->heartbeat = time() + $this->getHeartbeatEventFrequency();
$this->dispatchEvent(self::EVENT_DID_HEARTBEAT);
}
private function updateHangDetection() {
if (!$this->isRunning()) {
return;
}
if (time() > $this->deadline) {
$this->logMessage('HANG', pht('Hang detected. Restarting process.'));
$this->annihilateProcessGroup();
$this->scheduleRestart();
}
}
private function scheduleRestart() {
// Wait a minimum of a few sceconds before restarting, but we may wait
// longer if the daemon has initiated hibernation.
$default_restart = time() + self::getWaitBeforeRestart();
if ($default_restart >= $this->restartAt) {
$this->restartAt = $default_restart;
}
$this->logMessage(
'WAIT',
pht(
'Waiting %s second(s) to restart process.',
new PhutilNumber($this->restartAt - time())));
}
/**
* Generate a unique ID for this daemon.
*
* @return string A unique daemon ID.
*/
private function generateDaemonID() {
return substr(getmypid().':'.Filesystem::readRandomCharacters(12), 0, 12);
}
public function getDaemonID() {
return $this->daemonID;
}
- public function getPID() {
- return $this->pid;
+ private function getFuture() {
+ return $this->future;
+ }
+
+ private function getPID() {
+ $future = $this->getFuture();
+
+ if (!$future) {
+ return null;
+ }
+
+ if (!$future->hasPID()) {
+ return null;
+ }
+
+ return $future->getPID();
}
private function getCaptureBufferSize() {
return 65535;
}
private function getRequiredHeartbeatFrequency() {
return 86400;
}
public static function getWaitBeforeRestart() {
return 5;
}
public static function getHeartbeatEventFrequency() {
return 120;
}
private function getKillDelay() {
return 3;
}
private function getDaemonCWD() {
$root = dirname(phutil_get_library_root('phabricator'));
return $root.'/scripts/daemon/exec/';
}
private function newExecFuture() {
$class = $this->getDaemonClass();
$argv = $this->getCommandLineArguments();
$buffer_size = $this->getCaptureBufferSize();
// NOTE: PHP implements proc_open() by running 'sh -c'. On most systems this
// is bash, but on Ubuntu it's dash. When you proc_open() using bash, you
// get one new process (the command you ran). When you proc_open() using
// dash, you get two new processes: the command you ran and a parent
// "dash -c" (or "sh -c") process. This means that the child process's PID
// is actually the 'dash' PID, not the command's PID. To avoid this, use
// 'exec' to replace the shell process with the real process; without this,
// the child will call posix_getppid(), be given the pid of the 'sh -c'
// process, and send it SIGUSR1 to keepalive which will terminate it
// immediately. We also won't be able to do process group management because
// the shell process won't properly posix_setsid() so the pgid of the child
// won't be meaningful.
$config = $this->properties;
unset($config['class']);
$config = phutil_json_encode($config);
return id(new ExecFuture('exec ./exec_daemon.php %s %Ls', $class, $argv))
->setCWD($this->getDaemonCWD())
->setStdoutSizeLimit($buffer_size)
->setStderrSizeLimit($buffer_size)
->write($config);
}
/**
* Dispatch an event to event listeners.
*
* @param string Event type.
* @param dict Event parameters.
* @return void
*/
private function dispatchEvent($type, array $params = array()) {
$data = array(
'id' => $this->getDaemonID(),
'daemonClass' => $this->getDaemonClass(),
'childPID' => $this->getPID(),
) + $params;
$event = new PhutilEvent($type, $data);
try {
PhutilEventEngine::dispatchEvent($event);
} catch (Exception $ex) {
phlog($ex);
}
}
private function annihilateProcessGroup() {
$pid = $this->getPID();
if ($pid) {
$pgid = posix_getpgid($pid);
if ($pgid) {
posix_kill(-$pgid, SIGTERM);
sleep($this->getKillDelay());
posix_kill(-$pgid, SIGKILL);
$this->pid = null;
}
}
}
private function startDaemonProcess() {
$this->logMessage('INIT', pht('Starting process.'));
$this->deadline = time() + $this->getRequiredHeartbeatFrequency();
$this->heartbeat = time() + self::getHeartbeatEventFrequency();
$this->stdoutBuffer = '';
$this->hibernating = false;
- $this->future = $this->newExecFuture();
- $this->future->start();
+ $future = $this->newExecFuture();
+ $this->future = $future;
- $this->pid = $this->future->getPID();
+ $pool = $this->getDaemonPool();
+ $overseer = $pool->getOverseer();
+ $overseer->addFutureToPool($future);
}
private function didReadStdout($data) {
$this->stdoutBuffer .= $data;
while (true) {
$pos = strpos($this->stdoutBuffer, "\n");
if ($pos === false) {
break;
}
$message = substr($this->stdoutBuffer, 0, $pos);
$this->stdoutBuffer = substr($this->stdoutBuffer, $pos + 1);
try {
$structure = phutil_json_decode($message);
} catch (PhutilJSONParserException $ex) {
$structure = array();
}
switch (idx($structure, 0)) {
case PhutilDaemon::MESSAGETYPE_STDOUT:
$this->logMessage('STDO', idx($structure, 1));
break;
case PhutilDaemon::MESSAGETYPE_HEARTBEAT:
$this->deadline = time() + $this->getRequiredHeartbeatFrequency();
break;
case PhutilDaemon::MESSAGETYPE_BUSY:
if (!$this->busyEpoch) {
$this->busyEpoch = time();
}
break;
case PhutilDaemon::MESSAGETYPE_IDLE:
$this->busyEpoch = null;
break;
case PhutilDaemon::MESSAGETYPE_DOWN:
// The daemon is exiting because it doesn't have enough work and it
// is trying to scale the pool down. We should not restart it.
$this->shouldRestart = false;
$this->shouldShutdown = true;
break;
case PhutilDaemon::MESSAGETYPE_HIBERNATE:
$config = idx($structure, 1);
$duration = (int)idx($config, 'duration', 0);
$this->restartAt = time() + $duration;
$this->hibernating = true;
$this->busyEpoch = null;
$this->logMessage(
'ZZZZ',
pht(
'Process is preparing to hibernate for %s second(s).',
new PhutilNumber($duration)));
break;
default:
// If we can't parse this or it isn't a message we understand, just
// emit the raw message.
$this->logMessage('STDO', pht('<Malformed> %s', $message));
break;
}
}
}
public function didReceiveNotifySignal($signo) {
$pid = $this->getPID();
if ($pid) {
posix_kill($pid, $signo);
}
}
public function didReceiveReloadSignal($signo) {
$signame = phutil_get_signal_name($signo);
if ($signame) {
$sigmsg = pht(
'Reloading in response to signal %d (%s).',
$signo,
$signame);
} else {
$sigmsg = pht(
'Reloading in response to signal %d.',
$signo);
}
$this->logMessage('RELO', $sigmsg, $signo);
// This signal means "stop the current process gracefully, then launch
// a new identical process once it exits". This can be used to update
// daemons after code changes (the new processes will run the new code)
// without aborting any running tasks.
// We SIGINT the daemon but don't set the shutdown flag, so it will
// naturally be restarted after it exits, as though it had exited after an
// unhandled exception.
$pid = $this->getPID();
if ($pid) {
posix_kill($pid, SIGINT);
}
}
public function didReceiveGracefulSignal($signo) {
$this->shouldShutdown = true;
$this->shouldRestart = false;
$signame = phutil_get_signal_name($signo);
if ($signame) {
$sigmsg = pht(
'Graceful shutdown in response to signal %d (%s).',
$signo,
$signame);
} else {
$sigmsg = pht(
'Graceful shutdown in response to signal %d.',
$signo);
}
$this->logMessage('DONE', $sigmsg, $signo);
$pid = $this->getPID();
if ($pid) {
posix_kill($pid, SIGINT);
}
}
public function didReceiveTerminateSignal($signo) {
$this->shouldShutdown = true;
$this->shouldRestart = false;
$signame = phutil_get_signal_name($signo);
if ($signame) {
$sigmsg = pht(
'Shutting down in response to signal %s (%s).',
$signo,
$signame);
} else {
$sigmsg = pht('Shutting down in response to signal %s.', $signo);
}
$this->logMessage('EXIT', $sigmsg, $signo);
$this->annihilateProcessGroup();
}
private function logMessage($type, $message, $context = null) {
$this->getDaemonPool()->logMessage($type, $message, $context);
$this->dispatchEvent(
self::EVENT_DID_LOG,
array(
'type' => $type,
'message' => $message,
'context' => $context,
));
}
public function didExit() {
if ($this->shouldSendExitEvent) {
$this->dispatchEvent(self::EVENT_WILL_EXIT);
$this->shouldSendExitEvent = false;
}
return $this;
}
}
diff --git a/src/infrastructure/daemon/PhutilDaemonOverseer.php b/src/infrastructure/daemon/PhutilDaemonOverseer.php
index 77f6f6a20a..974a9cf3ed 100644
--- a/src/infrastructure/daemon/PhutilDaemonOverseer.php
+++ b/src/infrastructure/daemon/PhutilDaemonOverseer.php
@@ -1,410 +1,411 @@
<?php
/**
* Oversees a daemon and restarts it if it fails.
*
* @task signals Signal Handling
*/
final class PhutilDaemonOverseer extends Phobject {
private $argv;
private static $instance;
private $config;
private $pools = array();
private $traceMode;
private $traceMemory;
private $daemonize;
private $log;
private $libraries = array();
private $modules = array();
private $verbose;
private $startEpoch;
private $autoscale = array();
private $autoscaleConfig = array();
const SIGNAL_NOTIFY = 'signal/notify';
const SIGNAL_RELOAD = 'signal/reload';
const SIGNAL_GRACEFUL = 'signal/graceful';
const SIGNAL_TERMINATE = 'signal/terminate';
private $err = 0;
private $inAbruptShutdown;
private $inGracefulShutdown;
private $futurePool;
public function __construct(array $argv) {
PhutilServiceProfiler::getInstance()->enableDiscardMode();
$args = new PhutilArgumentParser($argv);
$args->setTagline(pht('daemon overseer'));
$args->setSynopsis(<<<EOHELP
**launch_daemon.php** [__options__] __daemon__
Launch and oversee an instance of __daemon__.
EOHELP
);
$args->parseStandardArguments();
$args->parse(
array(
array(
'name' => 'trace-memory',
'help' => pht('Enable debug memory tracing.'),
),
array(
'name' => 'verbose',
'help' => pht('Enable verbose activity logging.'),
),
array(
'name' => 'label',
'short' => 'l',
'param' => 'label',
'help' => pht(
'Optional process label. Makes "%s" nicer, no behavioral effects.',
'ps'),
),
));
$argv = array();
if ($args->getArg('trace')) {
$this->traceMode = true;
$argv[] = '--trace';
}
if ($args->getArg('trace-memory')) {
$this->traceMode = true;
$this->traceMemory = true;
$argv[] = '--trace-memory';
}
$verbose = $args->getArg('verbose');
if ($verbose) {
$this->verbose = true;
$argv[] = '--verbose';
}
$label = $args->getArg('label');
if ($label) {
$argv[] = '-l';
$argv[] = $label;
}
$this->argv = $argv;
if (function_exists('posix_isatty') && posix_isatty(STDIN)) {
fprintf(STDERR, pht('Reading daemon configuration from stdin...')."\n");
}
$config = @file_get_contents('php://stdin');
$config = id(new PhutilJSONParser())->parse($config);
$this->libraries = idx($config, 'load');
$this->log = idx($config, 'log');
$this->daemonize = idx($config, 'daemonize');
$this->config = $config;
if (self::$instance) {
throw new Exception(
pht('You may not instantiate more than one Overseer per process.'));
}
self::$instance = $this;
$this->startEpoch = time();
if (!idx($config, 'daemons')) {
throw new PhutilArgumentUsageException(
pht('You must specify at least one daemon to start!'));
}
if ($this->log) {
// NOTE: Now that we're committed to daemonizing, redirect the error
// log if we have a `--log` parameter. Do this at the last moment
// so as many setup issues as possible are surfaced.
ini_set('error_log', $this->log);
}
if ($this->daemonize) {
// We need to get rid of these or the daemon will hang when we TERM it
// waiting for something to read the buffers. TODO: Learn how unix works.
fclose(STDOUT);
fclose(STDERR);
ob_start();
$pid = pcntl_fork();
if ($pid === -1) {
throw new Exception(pht('Unable to fork!'));
} else if ($pid) {
exit(0);
}
$sid = posix_setsid();
if ($sid <= 0) {
throw new Exception(pht('Failed to create new process session!'));
}
}
$this->logMessage(
'OVER',
pht(
'Started new daemon overseer (with PID "%s").',
getmypid()));
$this->modules = PhutilDaemonOverseerModule::getAllModules();
$this->installSignalHandlers();
}
public function addLibrary($library) {
$this->libraries[] = $library;
return $this;
}
public function run() {
$this->createDaemonPools();
$future_pool = $this->getFuturePool();
while (true) {
if ($this->shouldReloadDaemons()) {
$this->didReceiveSignal(SIGHUP);
}
$running_pools = false;
foreach ($this->getDaemonPools() as $pool) {
$pool->updatePool();
if (!$this->shouldShutdown()) {
if ($pool->isHibernating()) {
if ($this->shouldWakePool($pool)) {
$pool->wakeFromHibernation();
}
}
}
- foreach ($pool->getFutures() as $future) {
- $future_pool->addFuture($future);
- }
-
if ($pool->getDaemons()) {
$running_pools = true;
}
}
$this->updateMemory();
if ($future_pool->hasFutures()) {
$future_pool->resolve();
} else {
if (!$this->shouldShutdown()) {
sleep(1);
}
}
if (!$future_pool->hasFutures() && !$running_pools) {
if ($this->shouldShutdown()) {
break;
}
}
}
exit($this->err);
}
+ public function addFutureToPool(Future $future) {
+ $this->getFuturePool()->addFuture($future);
+ return $this;
+ }
+
private function getFuturePool() {
if (!$this->futurePool) {
$pool = new FuturePool();
// TODO: This only wakes if any daemons actually exit, or 1 second
// passes. It would be a bit cleaner to wait on any I/O, but Futures
// currently can't do that.
$pool->getIteratorTemplate()
->setUpdateInterval(1);
$this->futurePool = $pool;
}
return $this->futurePool;
}
private function createDaemonPools() {
$configs = $this->config['daemons'];
$forced_options = array(
'load' => $this->libraries,
'log' => $this->log,
);
foreach ($configs as $config) {
$config = $forced_options + $config;
$pool = PhutilDaemonPool::newFromConfig($config)
->setOverseer($this)
->setCommandLineArguments($this->argv);
$this->pools[] = $pool;
}
}
private function getDaemonPools() {
return $this->pools;
}
private function updateMemory() {
if (!$this->traceMemory) {
return;
}
$this->logMessage(
'RAMS',
pht(
'Overseer Memory Usage: %s KB',
new PhutilNumber(memory_get_usage() / 1024, 1)));
}
public function logMessage($type, $message, $context = null) {
$always_log = false;
switch ($type) {
case 'OVER':
case 'SGNL':
case 'PIDF':
$always_log = true;
break;
}
if ($always_log || $this->traceMode || $this->verbose) {
error_log(date('Y-m-d g:i:s A').' ['.$type.'] '.$message);
}
}
/* -( Signal Handling )---------------------------------------------------- */
/**
* @task signals
*/
private function installSignalHandlers() {
$signals = array(
SIGUSR2,
SIGHUP,
SIGINT,
SIGTERM,
);
foreach ($signals as $signal) {
pcntl_signal($signal, array($this, 'didReceiveSignal'));
}
}
/**
* @task signals
*/
public function didReceiveSignal($signo) {
$this->logMessage(
'SGNL',
pht(
'Overseer ("%d") received signal %d ("%s").',
getmypid(),
$signo,
phutil_get_signal_name($signo)));
switch ($signo) {
case SIGUSR2:
$signal_type = self::SIGNAL_NOTIFY;
break;
case SIGHUP:
$signal_type = self::SIGNAL_RELOAD;
break;
case SIGINT:
// If we receive SIGINT more than once, interpret it like SIGTERM.
if ($this->inGracefulShutdown) {
return $this->didReceiveSignal(SIGTERM);
}
$this->inGracefulShutdown = true;
$signal_type = self::SIGNAL_GRACEFUL;
break;
case SIGTERM:
// If we receive SIGTERM more than once, terminate abruptly.
$this->err = 128 + $signo;
if ($this->inAbruptShutdown) {
exit($this->err);
}
$this->inAbruptShutdown = true;
$signal_type = self::SIGNAL_TERMINATE;
break;
default:
throw new Exception(
pht(
'Signal handler called with unknown signal type ("%d")!',
$signo));
}
foreach ($this->getDaemonPools() as $pool) {
$pool->didReceiveSignal($signal_type, $signo);
}
}
/* -( Daemon Modules )----------------------------------------------------- */
private function getModules() {
return $this->modules;
}
private function shouldReloadDaemons() {
$modules = $this->getModules();
$should_reload = false;
foreach ($modules as $module) {
try {
// NOTE: Even if one module tells us to reload, we call the method on
// each module anyway to make calls a little more predictable.
if ($module->shouldReloadDaemons()) {
$this->logMessage(
'RELO',
pht(
'Reloading daemons (triggered by overseer module "%s").',
get_class($module)));
$should_reload = true;
}
} catch (Exception $ex) {
phlog($ex);
}
}
return $should_reload;
}
private function shouldWakePool(PhutilDaemonPool $pool) {
$modules = $this->getModules();
$should_wake = false;
foreach ($modules as $module) {
try {
if ($module->shouldWakePool($pool)) {
$this->logMessage(
'WAKE',
pht(
'Waking pool "%s" (triggered by overseer module "%s").',
$pool->getPoolLabel(),
get_class($module)));
$should_wake = true;
}
} catch (Exception $ex) {
phlog($ex);
}
}
return $should_wake;
}
private function shouldShutdown() {
return $this->inGracefulShutdown || $this->inAbruptShutdown;
}
}
diff --git a/src/infrastructure/daemon/PhutilDaemonPool.php b/src/infrastructure/daemon/PhutilDaemonPool.php
index 50b22289a8..9cdc15228e 100644
--- a/src/infrastructure/daemon/PhutilDaemonPool.php
+++ b/src/infrastructure/daemon/PhutilDaemonPool.php
@@ -1,360 +1,348 @@
<?php
final class PhutilDaemonPool extends Phobject {
private $properties = array();
private $commandLineArguments;
private $overseer;
private $daemons = array();
private $argv;
private $lastAutoscaleUpdate;
private $inShutdown;
private function __construct() {
// <empty>
}
public static function newFromConfig(array $config) {
PhutilTypeSpec::checkMap(
$config,
array(
'class' => 'string',
'label' => 'string',
'argv' => 'optional list<string>',
'load' => 'optional list<string>',
'log' => 'optional string|null',
'pool' => 'optional int',
'up' => 'optional int',
'down' => 'optional int',
'reserve' => 'optional int|float',
));
$config = $config + array(
'argv' => array(),
'load' => array(),
'log' => null,
'pool' => 1,
'up' => 2,
'down' => 15,
'reserve' => 0,
);
$pool = new self();
$pool->properties = $config;
return $pool;
}
public function setOverseer(PhutilDaemonOverseer $overseer) {
$this->overseer = $overseer;
return $this;
}
public function getOverseer() {
return $this->overseer;
}
public function setCommandLineArguments(array $arguments) {
$this->commandLineArguments = $arguments;
return $this;
}
public function getCommandLineArguments() {
return $this->commandLineArguments;
}
private function shouldShutdown() {
return $this->inShutdown;
}
private function newDaemon() {
$config = $this->properties;
if (count($this->daemons)) {
$down_duration = $this->getPoolScaledownDuration();
} else {
// TODO: For now, never scale pools down to 0.
$down_duration = 0;
}
$forced_config = array(
'down' => $down_duration,
);
$config = $forced_config + $config;
$config = array_select_keys(
$config,
array(
'class',
'log',
'load',
'argv',
'down',
));
$daemon = PhutilDaemonHandle::newFromConfig($config)
->setDaemonPool($this)
->setCommandLineArguments($this->getCommandLineArguments());
$daemon_id = $daemon->getDaemonID();
$this->daemons[$daemon_id] = $daemon;
$daemon->didLaunch();
return $daemon;
}
public function getDaemons() {
return $this->daemons;
}
- public function getFutures() {
- $futures = array();
- foreach ($this->getDaemons() as $daemon) {
- $future = $daemon->getFuture();
- if ($future) {
- $futures[] = $future;
- }
- }
-
- return $futures;
- }
-
public function didReceiveSignal($signal, $signo) {
switch ($signal) {
case PhutilDaemonOverseer::SIGNAL_GRACEFUL:
case PhutilDaemonOverseer::SIGNAL_TERMINATE:
$this->inShutdown = true;
break;
}
foreach ($this->getDaemons() as $daemon) {
switch ($signal) {
case PhutilDaemonOverseer::SIGNAL_NOTIFY:
$daemon->didReceiveNotifySignal($signo);
break;
case PhutilDaemonOverseer::SIGNAL_RELOAD:
$daemon->didReceiveReloadSignal($signo);
break;
case PhutilDaemonOverseer::SIGNAL_GRACEFUL:
$daemon->didReceiveGracefulSignal($signo);
break;
case PhutilDaemonOverseer::SIGNAL_TERMINATE:
$daemon->didReceiveTerminateSignal($signo);
break;
default:
throw new Exception(
pht(
'Unknown signal "%s" ("%d").',
$signal,
$signo));
}
}
}
public function getPoolLabel() {
return $this->getPoolProperty('label');
}
public function getPoolMaximumSize() {
return $this->getPoolProperty('pool');
}
public function getPoolScaleupDuration() {
return $this->getPoolProperty('up');
}
public function getPoolScaledownDuration() {
return $this->getPoolProperty('down');
}
public function getPoolMemoryReserve() {
return $this->getPoolProperty('reserve');
}
public function getPoolDaemonClass() {
return $this->getPoolProperty('class');
}
private function getPoolProperty($key) {
return idx($this->properties, $key);
}
public function updatePool() {
$daemons = $this->getDaemons();
foreach ($daemons as $key => $daemon) {
$daemon->update();
if ($daemon->isDone()) {
$daemon->didExit();
unset($this->daemons[$key]);
if ($this->shouldShutdown()) {
$this->logMessage(
'DOWN',
pht(
'Pool "%s" is exiting, with %s daemon(s) remaining.',
$this->getPoolLabel(),
new PhutilNumber(count($this->daemons))));
} else {
$this->logMessage(
'POOL',
pht(
'Autoscale pool "%s" scaled down to %s daemon(s).',
$this->getPoolLabel(),
new PhutilNumber(count($this->daemons))));
}
}
}
$this->updateAutoscale();
}
public function isHibernating() {
foreach ($this->getDaemons() as $daemon) {
if (!$daemon->isHibernating()) {
return false;
}
}
return true;
}
public function wakeFromHibernation() {
if (!$this->isHibernating()) {
return $this;
}
$this->logMessage(
'WAKE',
pht(
'Autoscale pool "%s" is being awakened from hibernation.',
$this->getPoolLabel()));
$did_wake_daemons = false;
foreach ($this->getDaemons() as $daemon) {
if ($daemon->isHibernating()) {
$daemon->wakeFromHibernation();
$did_wake_daemons = true;
}
}
if (!$did_wake_daemons) {
// TODO: Pools currently can't scale down to 0 daemons, but we should
// scale up immediately here once they can.
}
$this->updatePool();
return $this;
}
private function updateAutoscale() {
if ($this->shouldShutdown()) {
return;
}
// Don't try to autoscale more than once per second. This mostly stops the
// logs from getting flooded in verbose mode.
$now = time();
if ($this->lastAutoscaleUpdate >= $now) {
return;
}
$this->lastAutoscaleUpdate = $now;
$daemons = $this->getDaemons();
// If this pool is already at the maximum size, we can't launch any new
// daemons.
$max_size = $this->getPoolMaximumSize();
if (count($daemons) >= $max_size) {
$this->logMessage(
'POOL',
pht(
'Autoscale pool "%s" already at maximum size (%s of %s).',
$this->getPoolLabel(),
new PhutilNumber(count($daemons)),
new PhutilNumber($max_size)));
return;
}
$scaleup_duration = $this->getPoolScaleupDuration();
foreach ($daemons as $daemon) {
$busy_epoch = $daemon->getBusyEpoch();
// If any daemons haven't started work yet, don't scale the pool up.
if (!$busy_epoch) {
$this->logMessage(
'POOL',
pht(
'Autoscale pool "%s" has an idle daemon, declining to scale.',
$this->getPoolLabel()));
return;
}
// If any daemons started work very recently, wait a little while
// to scale the pool up.
$busy_for = ($now - $busy_epoch);
if ($busy_for < $scaleup_duration) {
$this->logMessage(
'POOL',
pht(
'Autoscale pool "%s" has not been busy long enough to scale up '.
'(busy for %s of %s seconds).',
$this->getPoolLabel(),
new PhutilNumber($busy_for),
new PhutilNumber($scaleup_duration)));
return;
}
}
// If we have a configured memory reserve for this pool, it tells us that
// we should not scale up unless there's at least that much memory left
// on the system (for example, a reserve of 0.25 means that 25% of system
// memory must be free to autoscale).
// Note that the first daemon is exempt: we'll always launch at least one
// daemon, regardless of any memory reservation.
if (count($daemons)) {
$reserve = $this->getPoolMemoryReserve();
if ($reserve) {
// On some systems this may be slightly more expensive than other
// checks, so we only do it once we're prepared to scale up.
$memory = PhutilSystem::getSystemMemoryInformation();
$free_ratio = ($memory['free'] / $memory['total']);
// If we don't have enough free memory, don't scale.
if ($free_ratio <= $reserve) {
$this->logMessage(
'POOL',
pht(
'Autoscale pool "%s" does not have enough free memory to '.
'scale up (%s free of %s reserved).',
$this->getPoolLabel(),
new PhutilNumber($free_ratio, 3),
new PhutilNumber($reserve, 3)));
return;
}
}
}
$this->logMessage(
'AUTO',
pht(
'Scaling pool "%s" up to %s daemon(s).',
$this->getPoolLabel(),
new PhutilNumber(count($daemons) + 1)));
$this->newDaemon();
}
public function logMessage($type, $message, $context = null) {
return $this->getOverseer()->logMessage($type, $message, $context);
}
}

File Metadata

Mime Type
text/x-diff
Expires
Tue, Nov 26, 12:42 PM (1 d, 14 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
1199
Default Alt Text
(35 KB)

Event Timeline