Page Menu | Home | styx hydra

No One | Temporary

diff --git a/src/applications/notification/client/PhabricatorNotificationServerRef.php b/src/applications/notification/client/PhabricatorNotificationServerRef.php
index cfdbb5e8eb..b183221eee 100644
--- a/src/applications/notification/client/PhabricatorNotificationServerRef.php
+++ b/src/applications/notification/client/PhabricatorNotificationServerRef.php
@@ -1,234 +1,234 @@
<?php
final class PhabricatorNotificationServerRef
extends Phobject {
/** Server role; isAdminServer() compares it against 'admin'. */
private $type;

/** Host name used by getURI() when building the server URI. */
private $host;

/** Port used by getURI() when building the server URI. */
private $port;

/** Protocol; getWebsocketURI() maps 'https' to 'wss', all else to 'ws'. */
private $protocol;

/** Optional base path prepended to request paths by getURI(). */
private $path;

/** When truthy, getEnabledServers() drops this server. */
private $isDisabled;

/** Request cache key under which getLiveServers() stores the ref list. */
const KEY_REFS = 'notification.refs';

/**
 * @param mixed $type Server role, read from the 'type' config key.
 * @return $this
 */
public function setType($type) {
  $this->type = $type;
  return $this;
}

/** @return mixed The value given to setType(). */
public function getType() {
  return $this->type;
}

/**
 * @param mixed $host Read from the 'host' config key.
 * @return $this
 */
public function setHost($host) {
  $this->host = $host;
  return $this;
}

/** @return mixed The value given to setHost(). */
public function getHost() {
  return $this->host;
}

/**
 * @param mixed $port Read from the 'port' config key.
 * @return $this
 */
public function setPort($port) {
  $this->port = $port;
  return $this;
}

/** @return mixed The value given to setPort(). */
public function getPort() {
  return $this->port;
}

/**
 * @param mixed $protocol Read from the 'protocol' config key.
 * @return $this
 */
public function setProtocol($protocol) {
  $this->protocol = $protocol;
  return $this;
}

/** @return mixed The value given to setProtocol(). */
public function getProtocol() {
  return $this->protocol;
}

/**
 * @param mixed $path Read from the optional 'path' config key; may be null.
 * @return $this
 */
public function setPath($path) {
  $this->path = $path;
  return $this;
}

/** @return mixed The value given to setPath(), possibly null. */
public function getPath() {
  return $this->path;
}

/**
 * @param mixed $is_disabled Read from the optional 'disabled' config key.
 * @return $this
 */
public function setIsDisabled($is_disabled) {
  $this->isDisabled = $is_disabled;
  return $this;
}

/** @return mixed The value given to setIsDisabled(). */
public function getIsDisabled() {
  return $this->isDisabled;
}
/**
 * Get a ref for every configured notification server, including disabled
 * ones, building the list at most once per request.
 *
 * The list is stored in the request cache under KEY_REFS.
 *
 * @return array List of PhabricatorNotificationServerRef objects.
 */
public static function getLiveServers() {
  $cache = PhabricatorCaches::getRequestCache();

  $refs = $cache->getKey(self::KEY_REFS);

  // Test for "not an array" rather than "falsey": an empty server list is a
  // legitimate cached value, and a truthiness check would treat it as a miss
  // and rebuild the refs on every call.
  if (!is_array($refs)) {
    $refs = self::newRefs();
    $cache->setKey(self::KEY_REFS, $refs);
  }

  return $refs;
}
/**
 * Build a fresh ref for each entry of the 'notification.servers' config.
 *
 * The 'type', 'host', 'port' and 'protocol' keys are read directly from each
 * entry; 'path' and 'disabled' are optional and default to null and false.
 *
 * @return array List of PhabricatorNotificationServerRef objects, in
 *   configuration order.
 */
public static function newRefs() {
  $server_specs = PhabricatorEnv::getEnvConfig('notification.servers');

  $results = array();
  foreach ($server_specs as $spec) {
    $server = new self();

    $server->setType($spec['type']);
    $server->setHost($spec['host']);
    $server->setPort($spec['port']);
    $server->setProtocol($spec['protocol']);
    $server->setPath(idx($spec, 'path'));
    $server->setIsDisabled(idx($spec, 'disabled', false));

    $results[] = $server;
  }

  return $results;
}
/**
 * Get the servers from getLiveServers() which are not marked disabled.
 *
 * @return array Sequentially indexed list of server refs.
 */
public static function getEnabledServers() {
  $enabled = array();

  foreach (self::getLiveServers() as $candidate) {
    if (!$candidate->getIsDisabled()) {
      $enabled[] = $candidate;
    }
  }

  return $enabled;
}
/**
 * Get the enabled servers for which isAdminServer() is true.
 *
 * @return array Sequentially indexed list of server refs.
 */
public static function getEnabledAdminServers() {
  $admins = array();

  foreach (self::getEnabledServers() as $candidate) {
    if ($candidate->isAdminServer()) {
      $admins[] = $candidate;
    }
  }

  return $admins;
}
/**
 * Get the enabled non-admin servers which use a particular protocol.
 *
 * @param mixed $with_protocol Protocol to match; compared loosely (==)
 *   against each server's getProtocol() value.
 * @return array Sequentially indexed list of server refs.
 */
public static function getEnabledClientServers($with_protocol) {
  $clients = array();

  foreach (self::getEnabledServers() as $candidate) {
    if ($candidate->isAdminServer()) {
      continue;
    }

    // Deliberately a loose comparison, matching the historical behavior.
    if ($candidate->getProtocol() == $with_protocol) {
      $clients[] = $candidate;
    }
  }

  return $clients;
}
/**
 * Test whether this server's type is 'admin'.
 *
 * @return bool True when the type loosely equals 'admin'.
 */
public function isAdminServer() {
  $type = $this->type;
  return ($type == 'admin');
}
/**
 * Build the URI for a path on this server.
 *
 * The result combines this ref's protocol, host and port with the configured
 * base path followed by `$to_path`. When the 'cluster.instance' config is a
 * nonempty string, it is added as the 'instance' query parameter.
 *
 * @param string|null $to_path Path below the server's base path; leading
 *   slashes are ignored. Null is treated as an empty path.
 * @return PhutilURI
 */
public function getURI($to_path = null) {
  // The base path is optional in the server config and `$to_path` defaults
  // to null. Normalize both to strings before trimming: passing null to
  // rtrim()/ltrim() is deprecated as of PHP 8.1.
  $base_path = rtrim((string)$this->getPath(), '/');
  $sub_path = ltrim((string)$to_path, '/');
  $full_path = $base_path.'/'.$sub_path;

  $uri = id(new PhutilURI('http://'.$this->getHost()))
    ->setProtocol($this->getProtocol())
    ->setPort($this->getPort())
    ->setPath($full_path);

  // Guard against null here for the same reason: strlen(null) is deprecated
  // as of PHP 8.1.
  $instance = PhabricatorEnv::getEnvConfig('cluster.instance');
  if ($instance !== null && strlen($instance)) {
    $uri->setQueryParam('instance', $instance);
  }

  return $uri;
}
/**
 * Build the WebSocket URI for a path on this server.
 *
 * When the 'cluster.instance' config is nonempty, the instance name is
 * appended to the path before the URI is built with getURI(). The "-" and
 * "+" lines below are the two sides of this patch: the old side appends the
 * bare instance name, the new side prefixes it with a "~" marker.
 *
 * @param string|null $to_path Path below the server's base path.
 * @return PhutilURI URI from getURI() with the protocol replaced by 'wss'
 *   when this ref's protocol is 'https', and by 'ws' otherwise.
 */
public function getWebsocketURI($to_path = null) {
$instance = PhabricatorEnv::getEnvConfig('cluster.instance');
// NOTE(review): if 'cluster.instance' can be null, strlen() on it is
// deprecated on PHP 8.1+ -- confirm what the config returns when unset.
if (strlen($instance)) {
- $to_path = $to_path.$instance.'/';
+ $to_path = $to_path.'~'.$instance.'/';
}
$uri = $this->getURI($to_path);
// Any protocol other than 'https' falls through to plain 'ws'.
if ($this->getProtocol() == 'https') {
$uri->setProtocol('wss');
} else {
$uri->setProtocol('ws');
}
return $uri;
}
/**
 * Probe a client server with a plain HTTP request.
 *
 * A working client server is expected to reject plain HTTP with a 501
 * status, so that response counts as success.
 *
 * @return bool True when the server answers with HTTP 501.
 * @throws Exception When this ref is an admin server, or when the request
 *   completes without raising an HTTP status exception.
 * @throws HTTPFutureHTTPResponseStatus For any status other than 501.
 */
public function testClient() {
  $is_admin = $this->isAdminServer();
  if ($is_admin) {
    throw new Exception(
      pht('Unable to test client on an admin server!'));
  }

  $probe_uri = $this->getURI();

  try {
    $future = new HTTPSFuture($probe_uri);
    $future->setTimeout(2);
    $future->resolvex();
  } catch (HTTPFutureHTTPResponseStatus $status) {
    // Anything other than the expected 501 is a real failure.
    if ($status->getStatusCode() != 501) {
      throw $status;
    }
    return true;
  }

  // The request succeeded outright, which a WebSocket endpoint should not do.
  throw new Exception(
    pht('Got HTTP 200, but expected HTTP 501 (WebSocket Upgrade)!'));
}
/**
 * Fetch and decode the JSON served at '/status/' on an admin server.
 *
 * The request uses a 2 second timeout.
 *
 * @return mixed Result of phutil_json_decode() on the response body.
 * @throws Exception When this ref is not an admin server.
 */
public function loadServerStatus() {
  $is_admin = $this->isAdminServer();
  if (!$is_admin) {
    throw new Exception(
      pht(
        'Unable to load server status: this is not an admin server!'));
  }

  $status_uri = $this->getURI('/status/');

  $future = new HTTPSFuture($status_uri);
  $future->setTimeout(2);

  // Only the first element of the resolved result (the body) is used.
  list($body) = $future->resolvex();

  return phutil_json_decode($body);
}
/**
 * POST a JSON-encoded message to the root path of an admin server.
 *
 * The request uses a 2 second timeout and its response is discarded.
 *
 * @param array $data Message to encode with phutil_json_encode().
 * @return void
 * @throws Exception When this ref is not an admin server.
 */
public function postMessage(array $data) {
  $is_admin = $this->isAdminServer();
  if (!$is_admin) {
    throw new Exception(
      pht('Unable to post message: this is not an admin server!'));
  }

  $target_uri = $this->getURI('/');
  $body = phutil_json_encode($data);

  $future = new HTTPSFuture($target_uri, $body);
  $future->setMethod('POST');
  $future->setTimeout(2);
  $future->resolvex();
}
}
diff --git a/support/aphlict/server/lib/AphlictClientServer.js b/support/aphlict/server/lib/AphlictClientServer.js
index 4d173171c2..1d4375cbba 100644
--- a/support/aphlict/server/lib/AphlictClientServer.js
+++ b/support/aphlict/server/lib/AphlictClientServer.js
@@ -1,121 +1,136 @@
'use strict';
var JX = require('./javelin').JX;
require('./AphlictListenerList');
require('./AphlictLog');
var url = require('url');
var util = require('util');
var WebSocket = require('ws');
JX.install('AphlictClientServer', {
construct: function(server) {
server.on('request', JX.bind(this, this._onrequest));
this._server = server;
this._lists = {};
},
// Declared properties. `logger` is read through this.getLogger() in log();
// presumably JX.install() generates that accessor from this declaration --
// confirm against the Javelin documentation. A null logger disables logging.
properties: {
logger: null,
},
members: {
// Server object handed to the constructor; listen() calls listen() on it.
_server: null,
// Map from instance name to its JX.AphlictListenerList; the constructor
// resets it to {} and getListenerList() fills it on demand.
_lists: null,
getListenerList: function(instance) {
if (!this._lists[instance]) {
this._lists[instance] = new JX.AphlictListenerList(instance);
}
return this._lists[instance];
},
log: function() {
var logger = this.getLogger();
if (!logger) {
return;
}
logger.log.apply(logger, arguments);
return this;
},
_onrequest: function(request, response) {
// The websocket code upgrades connections before they get here, so
// this only handles normal HTTP connections. We just fail them with
// a 501 response.
response.writeHead(501);
response.end('HTTP/501 Use Websockets\n');
},
+ _parseInstanceFromPath: function(path) {
+ // A path without a "~" marker carries no instance name: users sometimes
+ // configure nginx or Apache to proxy based on the path, so the path may
+ // only be a routing prefix. Such connections use the "default" instance.
+ if (path.indexOf('~') === -1) {
+ return 'default';
+ }
+
+ var instance = path.split('~')[1];
+
+ // Strip every "/" from the text after the first "~"; an empty result also means "default".
+ instance = instance.replace(/\//g, '');
+ if (!instance.length) {
+ return 'default';
+ }
+
+ return instance;
+ },
+
// Start the wrapped server listening (all arguments are passed through to
// its listen() method) and attach a WebSocket server to it. Each new
// connection is registered as a listener in the list for its instance, its
// JSON messages are dispatched as subscribe/unsubscribe commands, and it is
// removed from the list again when the socket closes. Returns nothing.
//
// The "-" and "+" lines below are the two sides of this patch: the old side
// used the whole request path (minus slashes) as the instance name, the new
// side delegates to _parseInstanceFromPath().
listen: function() {
var self = this;
var server = this._server.listen.apply(this._server, arguments);
var wss = new WebSocket.Server({server: server});
wss.on('connection', function(ws) {
- var instance = url.parse(ws.upgradeReq.url).pathname;
-
- instance = instance.replace(/\//g, '');
- if (!instance.length) {
- instance = 'default';
- }
+ var path = url.parse(ws.upgradeReq.url).pathname;
+ var instance = self._parseInstanceFromPath(path);
var listener = self.getListenerList(instance).addListener(ws);
// Per-connection logger: prefixes each line with "<listener description>".
function log() {
self.log(
util.format('<%s>', listener.getDescription()) +
' ' +
util.format.apply(null, arguments));
}
log('Connected from %s.', ws._socket.remoteAddress);
ws.on('message', function(data) {
log('Received message: %s', data);
var message;
// Payloads which are not valid JSON are logged and otherwise ignored.
try {
message = JSON.parse(data);
} catch (err) {
log('Message is invalid: %s', err.message);
return;
}
// NOTE(review): JSON.parse() returns null for the payload "null", in which
// case reading message.command below would throw -- confirm whether a
// guard is needed here.
switch (message.command) {
case 'subscribe':
log(
'Subscribed to: %s',
JSON.stringify(message.data));
listener.subscribe(message.data);
break;
case 'unsubscribe':
log(
'Unsubscribed from: %s',
JSON.stringify(message.data));
listener.unsubscribe(message.data);
break;
default:
log(
'Unrecognized command "%s".',
message.command || '<undefined>');
}
});
// Drop the listener from its instance's list once the socket closes.
ws.on('close', function() {
self.getListenerList(instance).removeListener(listener);
log('Disconnected.');
});
});
}
}
});

File Metadata

Mime Type
text/x-diff
Expires
Tue, Mar 17, 1:42 AM (8 h, 47 s)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
964163
Default Alt Text
(9 KB)

Event Timeline