Page MenuHomestyx hydra

No OneTemporary

diff --git a/src/applications/feed/PhabricatorFeedStoryPublisher.php b/src/applications/feed/PhabricatorFeedStoryPublisher.php
index 509ded6f52..552dc795b3 100644
--- a/src/applications/feed/PhabricatorFeedStoryPublisher.php
+++ b/src/applications/feed/PhabricatorFeedStoryPublisher.php
@@ -1,232 +1,235 @@
<?php
final class PhabricatorFeedStoryPublisher {
private $relatedPHIDs;
private $storyType;
private $storyData;
private $storyTime;
private $storyAuthorPHID;
private $primaryObjectPHID;
private $subscribedPHIDs = array();
private $mailRecipientPHIDs = array();
private $notifyAuthor;
/**
 * Choose whether the story author stays in the notification recipient list.
 *
 * When the stored value is falsy, insertNotifications() removes the story
 * author PHID from the subscribed PHIDs before writing notification rows.
 *
 * @param wild Truthy to keep the author as a notification recipient.
 * @return this
 */
public function setNotifyAuthor($notify_author) {
$this->notifyAuthor = $notify_author;
return $this;
}
/**
 * Get the value last passed to setNotifyAuthor().
 *
 * @return wild Stored flag, or null if setNotifyAuthor() was never called.
 */
public function getNotifyAuthor() {
return $this->notifyAuthor;
}
/**
 * Set the objects this story relates to.
 *
 * publish() writes one story reference row (objectPHID, chronologicalKey)
 * for each unique PHID in this list.
 *
 * @param list<string> PHIDs of related objects.
 * @return this
 */
public function setRelatedPHIDs(array $phids) {
$this->relatedPHIDs = $phids;
return $this;
}
/**
 * Set the users who should get a notification for this story.
 *
 * If any PHIDs remain after the optional author removal,
 * setPrimaryObjectPHID() must also have been called or publishing throws.
 *
 * @param list<string> PHIDs of subscribed users.
 * @return this
 */
public function setSubscribedPHIDs(array $phids) {
$this->subscribedPHIDs = $phids;
return $this;
}
/**
 * Set the object recorded as `primaryObjectPHID` on each notification row.
 *
 * @param string PHID of the primary object.
 * @return this
 */
public function setPrimaryObjectPHID($phid) {
$this->primaryObjectPHID = $phid;
return $this;
}
/**
 * Set the story type.
 *
 * publish() requires this to be the name of a loadable class which
 * subclasses PhabricatorFeedStory.
 *
 * @param string Story class name.
 * @return this
 */
public function setStoryType($story_type) {
$this->storyType = $story_type;
return $this;
}
/**
 * Set the data stored on the PhabricatorFeedStoryData row for this story.
 *
 * @param dict Story data.
 * @return this
 */
public function setStoryData(array $data) {
$this->storyData = $data;
return $this;
}
/**
 * Set an explicit timestamp for the story.
 *
 * The value becomes the upper 32 bits of the chronological key. When it is
 * empty, the current time is used instead.
 *
 * @param int Epoch timestamp.
 * @return this
 */
public function setStoryTime($time) {
$this->storyTime = $time;
return $this;
}
/**
 * Set the author of the story.
 *
 * The value is cast to a string when the story row is saved. It is also
 * the PHID removed from the notification recipients unless
 * setNotifyAuthor() was given a truthy value.
 *
 * @param string Author PHID.
 * @return this
 */
public function setStoryAuthorPHID($phid) {
$this->storyAuthorPHID = $phid;
return $this;
}
/**
 * Set the users who are being sent mail about this story.
 *
 * Notification rows for these users are inserted with `hasViewed` set to 1;
 * rows for every other subscriber are inserted with 0.
 *
 * @param list<string> PHIDs of mail recipients.
 * @return this
 */
public function setMailRecipientPHIDs(array $phids) {
$this->mailRecipientPHIDs = $phids;
return $this;
}
/**
 * Validate the configured story type, save the story, and fan it out.
 *
 * In order: the story row is saved, one reference row is inserted per unique
 * related PHID, notification rows are inserted for subscribers, a realtime
 * notification is posted when `notification.enabled` is set, and a
 * FeedPublisherWorker task is scheduled with the story's chronological key.
 *
 * @return PhabricatorFeedStoryData The saved story row.
 * @throws Exception If no story type was set, or the story type is not a
 *                   loadable subclass of PhabricatorFeedStory.
 */
public function publish() {
  $story_class = $this->storyType;
  if (!$story_class) {
    throw new Exception("Call setStoryType() before publishing!");
  }

  // Both validation failures open with the same explanation.
  $requirement =
    "Story type must be a valid class name and must subclass ".
    "PhabricatorFeedStory. ";

  if (!class_exists($story_class)) {
    throw new Exception(
      $requirement.
      "'{$story_class}' is not a loadable class.");
  }

  if (!is_subclass_of($story_class, 'PhabricatorFeedStory')) {
    throw new Exception(
      $requirement.
      "'{$story_class}' is not a subclass of PhabricatorFeedStory.");
  }

  $key = $this->generateChronologicalKey();

  $story = new PhabricatorFeedStoryData();
  $story->setStoryType($this->storyType);
  $story->setStoryData($this->storyData);
  $story->setAuthorPHID((string)$this->storyAuthorPHID);
  $story->setChronologicalKey($key);
  $story->save();

  if ($this->relatedPHIDs) {
    $reference = new PhabricatorFeedStoryReference();
    $tuples = array();
    $conn_w = $reference->establishConnection('w');

    // Each related object gets exactly one reference row.
    $related = array_unique($this->relatedPHIDs);
    foreach ($related as $object_phid) {
      $tuples[] = qsprintf(
        $conn_w,
        '(%s, %s)',
        $object_phid,
        $key);
    }

    queryfx(
      $conn_w,
      'INSERT INTO %T (objectPHID, chronologicalKey) VALUES %Q',
      $reference->getTableName(),
      implode(', ', $tuples));
  }

  $this->insertNotifications($key);

  if (PhabricatorEnv::getEnvConfig('notification.enabled')) {
    $this->sendNotification($key);
  }

  $task_data = array(
    'key' => $key,
  );
  PhabricatorWorker::scheduleTask('FeedPublisherWorker', $task_data);

  return $story;
}
/**
 * Insert one notification row per unique subscriber of this story.
 *
 * The story author is dropped from the recipients unless notifyAuthor is
 * truthy. Recipients who are also mail recipients get their row inserted
 * with `hasViewed` = 1; everyone else gets 0.
 *
 * @param string Chronological key of the story being published.
 * @return void
 * @throws Exception If there are recipients but no primary object PHID.
 */
private function insertNotifications($chrono_key) {
  $recipients = $this->subscribedPHIDs;

  if (!$this->notifyAuthor) {
    $author = array($this->storyAuthorPHID);
    $recipients = array_diff($recipients, $author);
  }

  // Nobody left to notify.
  if (!$recipients) {
    return;
  }

  if (!$this->primaryObjectPHID) {
    throw new Exception(
      "You must call setPrimaryObjectPHID() if you setSubscribedPHIDs()!");
  }

  $table = new PhabricatorFeedStoryNotification();
  $rows = array();
  $conn_w = $table->establishConnection('w');

  // Keyed by PHID so membership is a plain isset() check.
  $mailed = array_fill_keys($this->mailRecipientPHIDs, true);

  foreach (array_unique($recipients) as $recipient_phid) {
    $has_viewed = isset($mailed[$recipient_phid]) ? 1 : 0;

    $rows[] = qsprintf(
      $conn_w,
      '(%s, %s, %s, %d)',
      $this->primaryObjectPHID,
      $recipient_phid,
      $chrono_key,
      $has_viewed);
  }

  // The statement text below is reproduced verbatim, line breaks included.
  queryfx(
    $conn_w,
    'INSERT INTO %T
(primaryObjectPHID, userPHID, chronologicalKey, hasViewed)
VALUES %Q',
    $table->getTableName(),
    implode(', ', $rows));
}
/**
 * Post a realtime message about this story to the notification server.
 *
 * The payload carries the chronological key cast to a string and is handed
 * to PhabricatorNotificationClient::postMessage(). publish() only calls
 * this when `notification.enabled` is set.
 *
 * @param string Chronological key of the story being published.
 * @return void
 */
private function sendNotification($chrono_key) {
-
$data = array(
- 'key' => (string)$chrono_key,
+ 'data' => array(
+ 'key' => (string)$chrono_key,
+ 'type' => 'notification',
+ ),
+ 'subscribers' => $this->subscribedPHIDs,
);
try {
PhabricatorNotificationClient::postMessage($data);
} catch (Exception $ex) {
// Ignore, these are not critical.
// Any exception raised while posting is deliberately discarded here, so
// it never propagates to publish().
}
}
/**
 * Build the chronological key which identifies a story in the feed.
 *
 * The key is a 64-bit value: the story's epoch timestamp in the upper 32
 * bits and 32 random bits in the lower 32 bits. Keys therefore sort by time
 * while staying unique per story, which lets clients page through the
 * stream with a cursor ("stories after key X") and filter after selecting,
 * without handling several stories that share one ID.
 *
 * The primary key is not used for this because it would prevent publishing
 * stories which happened in the past (for example, when importing another
 * data source).
 *
 * @return string|int A unique, time-ordered key which identifies the story.
 *                    Native integer on 64-bit PHP; numeric string on 32-bit
 *                    PHP, where the arithmetic is delegated to bc or MySQL.
 */
private function generateChronologicalKey() {
  // Upper 32 bits: the explicit story time, or "now" when none was set.
  $epoch = nonempty($this->storyTime, time());

  // Lower 32 bits: random.
  $noise = head(unpack('L', Filesystem::readRandomBytes(4)));

  // 64-bit machine: native integers are wide enough, just do the math.
  if (PHP_INT_SIZE >= 8) {
    return ($epoch << 32) + $noise;
  }

  // 32-bit machine: prefer the 'bc' extension when it is available.
  if (function_exists('bcadd')) {
    return bcadd(bcmul($epoch, bcpow(2, 32)), $noise);
  }

  // Otherwise do the math in MySQL. TODO: If we formalize a bc dependency,
  // get rid of this.
  $conn_r = id(new PhabricatorFeedStoryData())->establishConnection('r');
  $row = queryfx_one(
    $conn_r,
    'SELECT (%d << 32) + %d as N',
    $epoch,
    $noise);

  return $row['N'];
}
}
diff --git a/src/applications/notification/client/PhabricatorNotificationClient.php b/src/applications/notification/client/PhabricatorNotificationClient.php
index 132e76e3f8..ab7ee1c6d0 100644
--- a/src/applications/notification/client/PhabricatorNotificationClient.php
+++ b/src/applications/notification/client/PhabricatorNotificationClient.php
@@ -1,37 +1,37 @@
<?php
final class PhabricatorNotificationClient {
- const EXPECT_VERSION = 4;
+ const EXPECT_VERSION = 5;
/**
 * Fetch the status dictionary from the notification server.
 *
 * Requests the `/status/` path on the URI configured as
 * `notification.server-uri`, with a 3 second timeout, and decodes the
 * response body as JSON.
 *
 * @return dict Decoded status information.
 * @throws Exception If the response body does not decode to an array.
 */
public static function getServerStatus() {
  $base_uri = PhabricatorEnv::getEnvConfig('notification.server-uri');

  $status_uri = new PhutilURI($base_uri);
  $status_uri->setPath('/status/');

  $future = id(new HTTPSFuture($status_uri))
    ->setTimeout(3);
  list($body) = $future->resolvex();

  $decoded = json_decode($body, true);
  if (is_array($decoded)) {
    return $decoded;
  }

  throw new Exception(
    pht(
      'Expected JSON response from notification server, received: %s',
      $body));
}
/**
 * POST a JSON-encoded message to the notification server.
 *
 * The request goes to the URI configured as `notification.server-uri` with
 * a 1 second timeout. Failures surface as whatever resolvex() throws.
 *
 * @param dict Message to encode and send.
 * @return void
 */
public static function postMessage(array $data) {
  $endpoint = PhabricatorEnv::getEnvConfig('notification.server-uri');
  $payload = json_encode($data);

  $request = id(new HTTPSFuture($endpoint, $payload))
    ->setMethod('POST')
    ->setTimeout(1);

  $request->resolvex();
}
}
diff --git a/support/aphlict/server/aphlict_server.js b/support/aphlict/server/aphlict_server.js
index d7fc272d1c..d2212fcfa1 100644
--- a/support/aphlict/server/aphlict_server.js
+++ b/support/aphlict/server/aphlict_server.js
@@ -1,178 +1,178 @@
/**
* Notification server. Launch with:
*
* sudo node aphlict_server.js --user=aphlict
*
* You can also specify `port`, `admin`, `host` and `log`.
*/
var JX = require('./lib/javelin').JX;
JX.require('lib/AphlictFlashPolicyServer', __dirname);
JX.require('lib/AphlictListenerList', __dirname);
JX.require('lib/AphlictLog', __dirname);
// Log to the console from the start; a logfile is attached below once the
// command line has been parsed.
var debug = new JX.AphlictLog()
  .addConsole(console);

// Registry of connected listeners.
var clients = new JX.AphlictListenerList();

var config = parse_command_line_arguments(process.argv);

// The logfile option is named `log`: that is the key defined in the defaults
// of parse_command_line_arguments() and the value reported by `/status/`.
// Testing `config.logfile` (a key which is never set) meant the configured
// logfile was never attached.
if (config.log) {
  debug.addLogfile(config.log);
}
/**
 * Parse `--name=value` command line arguments into a configuration object.
 *
 * Recognized names are exactly the keys of the defaults below (`port`,
 * `admin`, `host`, `user` and `log`). `port` and `admin` are converted to
 * base-10 integers; every other value is kept as the string given.
 *
 * @param  {Array.<string>} argv Argument vector in `process.argv` layout;
 *                               the first two entries are skipped.
 * @return {Object} Defaults overlaid with the values from `argv`.
 * @throws {Error} If an argument is not of the form `--name=value`, names
 *                 an unknown option, or gives a non-numeric `port`/`admin`.
 */
function parse_command_line_arguments(argv) {
  var config = {
    port : 22280,
    admin : 22281,
    host : '127.0.0.1',
    user : null,
    log: '/var/log/aphlict.log'
  };

  // argv[0] and argv[1] are the node binary and the script path.
  for (var ii = 2; ii < argv.length; ii++) {
    var arg = argv[ii];
    var matches = arg.match(/^--([^=]+)=(.*)$/);
    if (!matches) {
      throw new Error("Unknown argument '"+arg+"'!");
    }
    // Only accept the option names defined above. The `in` operator also
    // matches inherited properties, so `--toString=x` or `--constructor=x`
    // would have been accepted and written onto the config object.
    if (!Object.prototype.hasOwnProperty.call(config, matches[1])) {
      throw new Error("Unknown argument '"+matches[1]+"'!");
    }
    config[matches[1]] = matches[2];
  }

  // Ports arrive as strings from the command line; normalize to integers
  // and refuse values which do not parse instead of passing NaN along.
  var numeric = ['port', 'admin'];
  for (var jj = 0; jj < numeric.length; jj++) {
    var name = numeric[jj];
    var parsed = parseInt(config[name], 10);
    if (isNaN(parsed)) {
      throw new Error(
        "Argument '"+name+"' must be an integer, got '"+config[name]+"'!");
    }
    config[name] = parsed;
  }

  return config;
}
// Refuse to start unless running as root (uid 0). As the message explains,
// root is needed to bind the privileged Flash policy port; privileges can be
// dropped later via `--user`. Exits with status 1 otherwise.
if (process.getuid() !== 0) {
console.log(
"ERROR: "+
"This server must be run as root because it needs to bind to privileged "+
"port 843 to start a Flash policy server. It will downgrade to run as a "+
"less-privileged user after binding if you pass a user in the command "+
"line arguments with '--user=alincoln'.");
process.exit(1);
}
var net = require('net');
var http = require('http');
var url = require('url');
// Last-resort handler: log any exception nothing else caught, then exit with
// status 1. Note that this also covers exceptions thrown inside request and
// socket callbacks elsewhere in this file.
process.on('uncaughtException', function (err) {
debug.log("\n<<< UNCAUGHT EXCEPTION! >>>\n\n" + err);
process.exit(1);
});
// Start the Flash policy server, giving it the shared debug log and the
// client port (`config.port`) as its access port.
var flash_server = new JX.AphlictFlashPolicyServer()
.setDebugLog(debug)
.setAccessPort(config.port)
.start();
// Client-facing socket server on `config.port`. Every connection is
// registered as a listener so broadcast() can write to it; the listener is
// unregistered again when its socket closes. All lifecycle events are logged
// with the listener's description.
var send_server = net.createServer(function(connection) {
  var subscriber = clients.addListener(connection);

  debug.log(
    '<%s> Connected from %s',
    subscriber.getDescription(),
    connection.remoteAddress);

  // Unregister first, then log, so the list never holds a closed socket.
  function on_close() {
    clients.removeListener(subscriber);
    debug.log('<%s> Disconnected', subscriber.getDescription());
  }

  function on_timeout() {
    debug.log('<%s> Timed Out', subscriber.getDescription());
  }

  function on_end() {
    debug.log('<%s> Ended Connection', subscriber.getDescription());
  }

  function on_error(e) {
    debug.log('<%s> Error: %s', subscriber.getDescription(), e);
  }

  connection.on('close', on_close);
  connection.on('timeout', on_timeout);
  connection.on('end', on_end);
  connection.on('error', on_error);
}).listen(config.port);
// Counters reported by the `/status/` endpoint below: messages written to
// listeners, messages received over HTTP, and the server start time in
// milliseconds (used to compute uptime).
var messages_out = 0;
var messages_in = 0;
var start_time = new Date().getTime();
// Admin HTTP server, bound to `config.admin` on `config.host`. A POST to any
// path publishes a notification; any other request to `/status/` returns
// server statistics as JSON; everything else gets a "400 Bad Request" body.
var receive_server = http.createServer(function(request, response) {
response.writeHead(200, {'Content-Type' : 'text/plain'});
// Publishing a notification.
if (request.method == 'POST') {
// Accumulate the request body; it is parsed once the request has ended.
var body = '';
request.on('data', function (data) {
body += data;
});
request.on('end', function () {
++messages_in;
// NOTE(review): JSON.parse() throws on a malformed body. Nothing here
// catches that, so it would reach the process-level 'uncaughtException'
// handler, which exits the server. Consider a try/catch.
- var data = JSON.parse(body);
- debug.log('notification: ' + JSON.stringify(data));
- broadcast(data);
+ var msg = JSON.parse(body);
+ debug.log('notification: ' + JSON.stringify(msg));
+ broadcast(msg.data);
response.end();
});
} else if (request.url == '/status/') {
request.on('data', function(data) {
// We just ignore the request data, but newer versions of Node don't
// get to 'end' if we don't process the data. See T2953.
});
request.on('end', function() {
// 'uptime' is in milliseconds. 'version' is presumably the value the PHP
// client compares against its EXPECT_VERSION constant (confirm against
// the caller).
var status = {
'uptime': (new Date().getTime() - start_time),
'clients.active': clients.getActiveListenerCount(),
'clients.total': clients.getTotalListenerCount(),
'messages.in': messages_in,
'messages.out': messages_out,
'log': config.log,
- 'version': 4
+ 'version': 5
};
response.write(JSON.stringify(status));
response.end();
});
} else {
// NOTE(review): writeHead(200, ...) has already been called at the top of
// this handler, so assigning statusCode here likely has no effect and the
// client still sees a 200 status. Confirm, and set the status before
// writeHead if a real 400 is intended.
response.statusCode = 400;
response.write('400 Bad Request');
response.end();
}
}).listen(config.admin, config.host);
/**
 * Write a message to every registered listener.
 *
 * Each successful write increments the `messages_out` counter. A listener
 * whose write throws is removed from the listener list and the failure is
 * logged; the remaining listeners are still written to.
 *
 * @param {*} data Message passed unchanged to each listener's
 *                 `writeMessage()`.
 * @return {void}
 */
function broadcast(data) {
  var listeners = clients.getListeners();
  for (var id in listeners) {
    var listener = listeners[id];
    try {
      listener.writeMessage(data);
      ++messages_out;
      debug.log('<%s> Wrote Message', listener.getDescription());
    } catch (error) {
      clients.removeListener(listener);
      // The format has two placeholders: the listener description, then the
      // error. Previously only the error was passed, so it filled the first
      // placeholder and the second was left without a value.
      debug.log(
        '<%s> Write Error: %s',
        listener.getDescription(),
        error);
    }
  }
}
// If we're configured to drop permissions, get rid of them now that we've
// bound to the ports we need and opened logfiles. This has to stay after the
// server setup above: the `--user` account is expected to be less privileged
// and could not bind the privileged port itself.
if (config.user) {
process.setuid(config.user);
}
// Record the PID so the running server can be identified from the log.
debug.log('Started Server (PID %d)', process.pid);

File Metadata

Mime Type
text/x-diff
Expires
Thu, Aug 14, 9:37 PM (2 d, 18 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
198870
Default Alt Text
(13 KB)

Event Timeline