Fixes, logging

dev
gravel 1 year ago
parent 35fe0faaf0
commit fed9033cf3
Signed by: gravel
GPG Key ID: C0538F3C906B308F

@ -13,11 +13,9 @@
private bool $consumed = false;
/**
* @var \Closure():bool $is_valid_response
* @var \Closure():bool $response_filter
*/
private Closure $is_valid_response = function(CurlHandle $handle): bool {
return curl_getinfo($handle, CURLINFO_RESPONSE_CODE) < 300;
};
private Closure $response_filter;
/**
* Creates a new Fetching Couroutine instance.
@ -26,28 +24,38 @@
*/
public function __construct(\Generator $generator) {
$this->generator = $generator;
$this->response_filter = function(CurlHandle $handle): bool {
$code = curl_getinfo($handle, CURLINFO_RESPONSE_CODE);
$url = curl_getinfo($handle, CURLINFO_EFFECTIVE_URL);
log_debug("Got code $code for $url in default request arbitrator.");
return $code < 300 && $code != 0;
};
}
/**
* Create a new FetchingCoroutine to fetch the contents of a URL.
* @param string $url URL to fetch.
* @param array $curlopts Addition cURL options.
* @return \FetchingCoroutine<CurlHandle|false> Coroutine returning
*/
public static function from_url(string $url): \FetchingCoroutine {
public static function from_url(string $url, array $curlopts = []): \FetchingCoroutine {
/**
* @var Generator<int,CurlHandle,CurlHandle|false,CurlHandle|false> $oneshot
*/
$oneshot = (function() use ($url) {
yield make_curl_handle($url);
$oneshot = (function() use ($url, $curlopts) {
return yield make_curl_handle($url, $curlopts);
})();
return new FetchingCoroutine($oneshot);
}
/**
* Set callback deciding valid responses.
* @param callable $is_valid_response Predicate on a processed CurlHandle.
* @param Closure $response_filter Predicate on a processed CurlHandle.
* @return \FetchingCoroutine
*/
public function set_response_filter(callable $is_valid_response): void {
$this->is_valid_response = Closure::fromCallable($is_valid_response);
public function set_response_filter(Closure $response_filter): \FetchingCoroutine {
$this->response_filter = $response_filter;
return $this;
}
private function assert_not_consumed() {
@ -57,17 +65,17 @@
}
private function consume() {
$this->assert_not_consumed();
$this->consumed = true;
$this->generator = null;
}
/**
* Modifies the current coroutine to halt on failed fetches. Consumes current coroutine.
* Resulting coroutine will not produce further fetches.
* @return \FetchingCoroutine New FetchingCoroutine instance.
* @return \FetchingCoroutine<TReturn|null> New FetchingCoroutine instance.
*/
public function stop_on_failure(): \FetchingCoroutine {
$this->assert_not_consumed();
$this->consume();
$haltable = function () {
foreach ($this->generator as $id => $handle) {
if (!(yield $id => $handle)) {
@ -76,8 +84,7 @@
}
return $this->generator->getReturn();
};
$this->consume();
return $this->project_response_parameters(new FetchingCoroutine($haltable()));
return $this->project_coroutine_parameters(new FetchingCoroutine($haltable()));
}
/**
@ -85,10 +92,10 @@
* @param int $retries Number of additional retries made for curl handles returned.
* @param bool $tallied_retries If true, the retry count applies to the whole coroutine.
* If false, each request is afforded the given retries.
* @return \FetchingCoroutine New FetchingCoroutine instance.
* @return \FetchingCoroutine<TReturn> New FetchingCoroutine instance.
*/
public function retryable(int $retries, bool $tallied_retries = true): \FetchingCoroutine {
$this->assert_not_consumed();
$this->consume();
$coroutine = $this;
$retryable = function () use ($retries, $coroutine, $tallied_retries) {
processing_new_coroutine:
@ -96,6 +103,7 @@
$retries_current = $retries;
$id = $coroutine->current_key();
$handle = $coroutine->current_request();
$attempt_no = 1;
do {
if (!($attempt_handle = curl_copy_handle($handle))) {
log_error("Failed to clone cURL handle");
@ -103,9 +111,21 @@
goto processing_new_coroutine;
}
if ($coroutine->send(yield $id => $attempt_handle)) {
/** @var CurlHandle|false $response_handle */
$response_handle = yield $id => $attempt_handle;
$url = curl_getinfo($attempt_handle, CURLINFO_EFFECTIVE_URL);
if ($response_handle) {
$retcode = curl_getinfo($response_handle, CURLINFO_HTTP_CODE);
$url = curl_getinfo($response_handle, CURLINFO_EFFECTIVE_URL) ?? $url;
log_debug("Attempt #$attempt_no for $url returned code $retcode.");
$coroutine->send($response_handle);
goto processing_new_coroutine;
}
log_debug("Attempt #$attempt_no for $url failed or was rejected upstream.");
$attempt_no++;
} while ($retries_current-- > 0);
// failed to fetch handle
@ -118,18 +138,17 @@
}
return $coroutine->return_value();
};
$this->consume();
return $this->project_response_parameters(new FetchingCoroutine($retryable()));
return $this->project_coroutine_parameters(new FetchingCoroutine($retryable()));
}
/**
* Modifies the current coroutine to attempt HTTPS->HTTP downgrade after failure.
* Consumes current coroutine.
* @param bool $did_downgrade Set to true if a downgrade to HTTP has taken place.
* @return \FetchingCoroutine New FetchingCoroutine instance.
* @return \FetchingCoroutine<TReturn> New FetchingCoroutine instance.
*/
public function downgradeable(mixed &$did_downgrade = NULL): \FetchingCoroutine {
$this->assert_not_consumed();
$this->consume();
$coroutine = $this;
$has_downgrade_ref = func_num_args() >= 1;
if ($has_downgrade_ref) $did_downgrade = false;
@ -145,7 +164,8 @@
continue;
}
if ($has_downgrade_ref) $did_downgrade = false;
if ($has_downgrade_ref) $did_downgrade = true;
$handle = $handle_downgraded;
}
// Use HTTP
@ -153,21 +173,19 @@
}
return $coroutine->return_value();
};
$this->consume();
return $this->project_response_parameters(new FetchingCoroutine($downgradeable()));
return $this->project_coroutine_parameters(new FetchingCoroutine($downgradeable()));
}
/**
* Assign non-generator parameters to given FetchingCoroutine.
*/
private function project_response_parameters(\FetchingCoroutine $coroutine): \FetchingCoroutine {
$coroutine->set_response_filter($this->is_valid_response);
return $coroutine;
private function project_coroutine_parameters(\FetchingCoroutine $coroutine): \FetchingCoroutine {
return $coroutine->set_response_filter($this->response_filter);
}
private function is_valid_response(CurlHandle $handle) {
$is_valid_response = $this->is_valid_response;
return $is_valid_response($handle);
$response_filter = $this->response_filter;
return $response_filter($handle);
}
/**
@ -180,7 +198,7 @@
/**
* Get the cURL handle yielded at this point in the coroutine, if applicable.
*/
public function current_request(): CurlHandle {
public function current_request(): CurlHandle|null {
return $this->generator->current();
}
@ -188,6 +206,16 @@
return $this->generator->valid();
}
/**
* Invoke the current coroutine. Consumes coroutine.
* @return \Generator<int,CurlHandle,CurlHandle|false,TReturn>
*/
public function run() {
$this->consume();
// passthrough
return yield from $this->generator;
}
/**
* Get the return value of the wrapped generator object once finished.
* @return TReturn
@ -222,7 +250,7 @@
/**
* Collection of enroled transfers.
*/
private CurlMultiHandle $transfers = curl_multi_init();
private CurlMultiHandle $transfers;
/**
* Coroutines executed by runner.
@ -253,7 +281,7 @@
// curl_multi_select($transfers, timeout: 6.0);
}
$this->process_curl_activity();
} while ($curlm_status == CURLM_OK);
} while ($curlm_active_transfer && $curlm_status == CURLM_OK);
return $curlm_status;
}
@ -262,6 +290,8 @@
* Enrol initial transfers from all coroutines.
*/
private function initialize_coroutines() {
$this->transfers = curl_multi_init();
foreach ($this->coroutines as $id => $coroutine) {
$this->poll_coroutine_for_transfer($id);
}

@ -716,13 +716,24 @@
public static function poll_reachable(array $servers): array {
$reachable_servers = [];
// Synchronous for-loop for now.
foreach ($servers as $server) {
if (!($server->fetch_rooms())) continue;
if (!($server->fetch_pubkey())) continue;
$reachable_servers[] = $server;
$fetch_job = function() use ($server, &$reachable_servers): Generator {
if (!yield from $server->fetch_rooms_coroutine()) return;
if (!yield from $server->fetch_pubkey_coroutine()) return;
$reachable_servers[] = $server;
};
// passthrough hack
// all nested coroutines are allowed to do their own filtering
$coroutines[] = (new FetchingCoroutine($fetch_job()))
->set_response_filter(function(CurlHandle $handle) {
return true;
});
}
$runner = new FetchingCoroutineRunner($coroutines);
$runner->fetch_all();
return $reachable_servers;
}
@ -768,6 +779,14 @@
return "$base_url/rooms?all=1";
}
/**
* Returns the URL for the endpoint of the particular room.
*/
function get_room_api_url(string $token): string {
$base_url = $this->base_url;
return "$base_url/room/$token";
}
/**
* Returns the server's public key.
* @return string SOGS pubkey as used in the Session protocol.
@ -841,171 +860,175 @@
}
/**
* Attempts to fetch the current server's room listing.
* Downgrades the server's scheme to HTTP if necessary.
* @return array|false Associative data about rooms if successful.
* @return \Generator<int,CurlHandle,CurlHandle|false,array|null>
*/
private function fetch_room_list(): array|bool {
private function fetch_room_list_coroutine(): Generator {
global $FAST_FETCH_MODE;
$base_url = $this->base_url;
list($rooms, $downgrade) = curl_get_contents_downgrade($this->get_rooms_api_url(), retries: $FAST_FETCH_MODE ? 2 : 4);
if (!$rooms) {
log_info("Failed fetching /rooms.");
return false;
}
if ($downgrade) $this->downgrade_scheme();
$room_data = json_decode($rooms, true);
if ($room_data == null) {
log_info("Failed parsing /rooms.");
return false;
}
log_debug("Fetched /rooms successfully");
return $room_data;
}
/**
* @return \Generator<int,CurlHandle,CurlHandle|false,array|bool>
*/
private function fetch_room_list_coroutine(): Generator {
global $FAST_FETCH_MODE;
/** @var CurlHandle|false $rooms_api_response */
$rooms_api_response =
yield from FetchingCoroutine
::from_url($this->get_rooms_api_url())
->retryable($FAST_FETCH_MODE ? 2 : 4)
->downgradeable($did_downgrade)
->run();
$rooms_api_coroutine =
FetchingCoroutine::from_url($this->get_rooms_api_url())
->downgradeable($does_downgrade)
->retryable($FAST_FETCH_MODE ? 2 : 4);
$rooms_raw = $rooms_api_response ? curl_multi_getcontent($rooms_api_response) : null;
/** @var CurlHandle|false $rooms_api_handle */
// assemble & propagate request to runner
$rooms_api_handle = yield $rooms_api_coroutine->current_request();
$rooms_raw = $rooms_api_handle && curl_multi_getcontent($rooms_api_handle);
if (!$rooms_raw) {
log_info("Failed fetching /rooms.");
return false;
log_info("Failed fetching /rooms for $base_url.");
return null;
}
if ($does_downgrade) $this->downgrade_scheme();
if ($did_downgrade) $this->downgrade_scheme();
$room_data = json_decode($rooms_raw, true);
if ($room_data == null) {
log_info("Failed parsing /rooms.");
return false;
log_info("Failed parsing /rooms for $base_url.");
return null;
}
log_debug("Fetched /rooms successfully");
log_debug("Fetched /rooms successfully for $base_url");
// log_value($room_data);
return $room_data;
}
/**
* Attempts to fetch the current server's rooms using observed room names.
* Downgrades the server's scheme to HTTP if necessary.
* @return ?array Associative data about rooms if successful.
* @return Generator<int,CurlHandle,CurlHandle|false,array|null>
*/
private function fetch_room_hints(): ?array {
private function fetch_room_hints_coroutine(): Generator {
global $FAST_FETCH_MODE;
$base_url = $this->base_url;
$rooms = [];
if (empty($this->room_hints)) {
log_debug("No room hints to scan for $base_url.");
return null;
}
foreach ($this->room_hints as $token) {
log_debug("Testing room /$token.");
list($room_raw, $downgrade) = curl_get_contents_downgrade("$base_url/room/$token", retries: 2);
log_debug("Testing room /$token at $base_url.");
// FIXME: This fetches room hints sequentially per each server
// Would need to allow yielding handle arrays
$room_api_response = yield from FetchingCoroutine
::from_url($this->get_room_api_url($token))
// Afford more attempts thanks to reachability test
// TODO Move retryability to outer invocation
->retryable(retries: $FAST_FETCH_MODE ? 2 : 4)
->downgradeable($did_downgrade)
->run();
$room_raw = $room_api_response ? curl_multi_getcontent($room_api_response) : null;
if (!$room_raw) {
log_info("Room /$token not reachable.");
log_info("Room /$token not reachable at $base_url.");
continue;
}
if ($downgrade) $this->downgrade_scheme();
if ($did_downgrade) $this->downgrade_scheme();
$room_data = json_decode($room_raw, true);
if ($room_data == null) {
if (count($rooms) == 0) {
log_info("Room /$token not parsable.");
log_info("Room /$token not parsable at $base_url.");
break;
} else {
log_debug("Room /$token not parsable, continuing.");
log_debug("Room /$token not parsable at $base_url, continuing.");
continue;
}
}
$rooms[] = $room_data;
}
// Mark no rooms as failure.
if (count($rooms) == 0) {
log_debug("No room hints were valid.");
if (empty($rooms)) {
log_debug("No room hints were valid at $base_url.");
return null;
}
return $rooms;
}
/**
* Attempt to fetch rooms for tbe current server using SOGS API.
*
* @return bool True if successful, false otherwise.
*/
function fetch_rooms(): bool {
function check_reachability_coroutine() {
global $FAST_FETCH_MODE;
$this->log_details();
$base_url = $this->base_url;
// Check reachability before polling too much.
if (count($this->room_hints) >= 2) {
log_info("Checking reachability for $base_url first...");
if (!url_is_reachable($base_url, retries: $FAST_FETCH_MODE ? 1 : 4)) {
log_warning("Reachability test failed by $base_url.");
return false;
}
}
log_info("Fetching rooms for $base_url.");
$room_data = $this->fetch_room_list();
if (!$room_data) $room_data = $this->fetch_room_hints();
if ($room_data == null) {
log_warning("Could not fetch rooms for $base_url.");
log_info("Checking reachability for $base_url first...");
/** @var CurlHandle|false $response_handle */
$response_handle =
yield from FetchingCoroutine
::from_url($base_url, [CURLOPT_NOBODY => true])
->set_response_filter(function (CurlHandle $handle) {
$code = curl_getinfo($handle, CURLINFO_RESPONSE_CODE);
$url = curl_getinfo($handle, CURLINFO_EFFECTIVE_URL);
log_debug("Got $code for $url in custom filter.");
return $code != 0;
})
->retryable(retries: $FAST_FETCH_MODE ? 2 : 4)
->downgradeable($did_downgrade)
->run();
if (!$response_handle) {
log_warning("Reachability test failed by $base_url.");
return false;
}
$this->rooms = CommunityRoom::from_details_array($this, $room_data);
if ($did_downgrade) $this->downgrade_scheme();
return true;
}
function fetch_rooms_coroutine() {
/**
* @return \Generator<int,CurlHandle,CurlHandle|false,bool>
*/
function fetch_rooms_coroutine(): Generator {
$this->log_details();
$base_url = $this->base_url;
/*
// Check reachability before polling too much.
if (count($this->room_hints) >= 2) {
log_info("Checking reachability for $base_url first...");
if (!url_is_reachable($base_url, retries: $FAST_FETCH_MODE ? 1 : 4)) {
log_warning("Reachability test failed by $base_url.");
if (!yield from $this->check_reachability_coroutine()) {
return false;
}
}
*/
log_info("Fetching rooms for $base_url.");
yield from ($room_list_coroutine = $this->fetch_room_list_coroutine());
$room_data = $room_list_coroutine->getReturn();
if ($room_data === null) {
yield from ($room_hints_coroutine = $this->fetch_room_hints_coroutine());
$room_data = $room_hints_coroutine->getReturn();
}
/** @var array|null $room_data */
$room_data =
(yield from $this->fetch_room_list_coroutine()) ??
(yield from $this->fetch_room_hints_coroutine());
if ($room_data === null) {
log_warning("Could not fetch rooms for $base_url.");
return false;
}
$this->rooms = CommunityRoom::from_details_array($this, $room_data);
return true;
}
/**
* Attempt to fetch server public key by parsing SOGS HTML preview.
*
* @return bool True iff no key conflict has arised and we have a pubkey.
* @return \Generator<int,CurlHandle,CurlHandle|false,bool>
*/
function fetch_pubkey() {
function fetch_pubkey_coroutine(): Generator {
global $FAST_FETCH_MODE;
$base_url = $this->base_url;
if (empty($this->rooms)) {
log_warning("Server has no rooms to poll for public key");
log_warning("Server $base_url has no rooms to poll for public key");
return false;
}
@ -1018,7 +1041,14 @@
$preview_url = $this->rooms[0]->get_preview_url();
log_info("Fetching pubkey from $preview_url");
$room_view = curl_get_contents($preview_url, retries: $has_pubkey || $FAST_FETCH_MODE ? 1 : 5);
$room_view_response = yield from FetchingCoroutine
::from_url($preview_url)
->retryable($has_pubkey || $FAST_FETCH_MODE ? 1 : 5)
->run();
$room_view = $room_view_response
? curl_multi_getcontent($room_view_response)
: null;
if (!$room_view) {
log_debug("Failed to fetch room preview from $preview_url.");

Loading…
Cancel
Save