diff --git a/php/utils/fetching-coroutines.php b/php/utils/fetching-coroutines.php index af9e8d6..bbc5576 100644 --- a/php/utils/fetching-coroutines.php +++ b/php/utils/fetching-coroutines.php @@ -13,11 +13,9 @@ private bool $consumed = false; /** - * @var \Closure():bool $is_valid_response + * @var \Closure():bool $response_filter */ - private Closure $is_valid_response = function(CurlHandle $handle): bool { - return curl_getinfo($handle, CURLINFO_RESPONSE_CODE) < 300; - }; + private Closure $response_filter; /** * Creates a new Fetching Couroutine instance. @@ -26,28 +24,38 @@ */ public function __construct(\Generator $generator) { $this->generator = $generator; + $this->response_filter = function(CurlHandle $handle): bool { + $code = curl_getinfo($handle, CURLINFO_RESPONSE_CODE); + $url = curl_getinfo($handle, CURLINFO_EFFECTIVE_URL); + log_debug("Got code $code for $url in default request arbitrator."); + return $code < 300 && $code != 0; + }; } /** * Create a new FetchingCoroutine to fetch the contents of a URL. * @param string $url URL to fetch. + * @param array $curlopts Additional cURL options. + * @return \FetchingCoroutine Coroutine returning the value sent back for the yielded cURL handle. */ - public static function from_url(string $url): \FetchingCoroutine { + public static function from_url(string $url, array $curlopts = []): \FetchingCoroutine { /** * @var Generator $oneshot */ - $oneshot = (function() use ($url) { - yield make_curl_handle($url); + $oneshot = (function() use ($url, $curlopts) { + return yield make_curl_handle($url, $curlopts); })(); return new FetchingCoroutine($oneshot); } /** * Set callback deciding valid responses. - * @param callable $is_valid_response Predicate on a processed CurlHandle. + * @param Closure $response_filter Predicate on a processed CurlHandle. 
+ * @return \FetchingCoroutine */ - public function set_response_filter(callable $is_valid_response): void { - $this->is_valid_response = Closure::fromCallable($is_valid_response); + public function set_response_filter(Closure $response_filter): \FetchingCoroutine { + $this->response_filter = $response_filter; + return $this; } private function assert_not_consumed() { @@ -57,17 +65,17 @@ } private function consume() { + $this->assert_not_consumed(); $this->consumed = true; - $this->generator = null; } /** * Modifies the current coroutine to halt on failed fetches. Consumes current coroutine. * Resulting coroutine will not produce further fetches. - * @return \FetchingCoroutine New FetchingCoroutine instance. + * @return \FetchingCoroutine New FetchingCoroutine instance. */ public function stop_on_failure(): \FetchingCoroutine { - $this->assert_not_consumed(); + $this->consume(); $haltable = function () { foreach ($this->generator as $id => $handle) { if (!(yield $id => $handle)) { @@ -76,8 +84,7 @@ } return $this->generator->getReturn(); }; - $this->consume(); - return $this->project_response_parameters(new FetchingCoroutine($haltable())); + return $this->project_coroutine_parameters(new FetchingCoroutine($haltable())); } /** @@ -85,10 +92,10 @@ * @param int $retries Number of additional retries made for curl handles returned. * @param bool $tallied_retries If true, the retry count applies to the whole coroutine. * If false, each request is afforded the given retries. - * @return \FetchingCoroutine New FetchingCoroutine instance. + * @return \FetchingCoroutine New FetchingCoroutine instance. 
*/ public function retryable(int $retries, bool $tallied_retries = true): \FetchingCoroutine { - $this->assert_not_consumed(); + $this->consume(); $coroutine = $this; $retryable = function () use ($retries, $coroutine, $tallied_retries) { processing_new_coroutine: @@ -96,6 +103,7 @@ $retries_current = $retries; $id = $coroutine->current_key(); $handle = $coroutine->current_request(); + $attempt_no = 1; do { if (!($attempt_handle = curl_copy_handle($handle))) { log_error("Failed to clone cURL handle"); @@ -103,9 +111,21 @@ goto processing_new_coroutine; } - if ($coroutine->send(yield $id => $attempt_handle)) { + /** @var CurlHandle|false $response_handle */ + $response_handle = yield $id => $attempt_handle; + $url = curl_getinfo($attempt_handle, CURLINFO_EFFECTIVE_URL); + + if ($response_handle) { + $retcode = curl_getinfo($response_handle, CURLINFO_HTTP_CODE); + $url = curl_getinfo($response_handle, CURLINFO_EFFECTIVE_URL) ?? $url; + log_debug("Attempt #$attempt_no for $url returned code $retcode."); + $coroutine->send($response_handle); goto processing_new_coroutine; } + + log_debug("Attempt #$attempt_no for $url failed or was rejected upstream."); + + $attempt_no++; } while ($retries_current-- > 0); // failed to fetch handle @@ -118,18 +138,17 @@ } return $coroutine->return_value(); }; - $this->consume(); - return $this->project_response_parameters(new FetchingCoroutine($retryable())); + return $this->project_coroutine_parameters(new FetchingCoroutine($retryable())); } /** * Modifies the current coroutine to attempt HTTPS->HTTP downgrade after failure. * Consumes current coroutine. * @param bool $did_downgrade Set to true if a downgrade to HTTP has taken place. - * @return \FetchingCoroutine New FetchingCoroutine instance. + * @return \FetchingCoroutine New FetchingCoroutine instance. 
*/ public function downgradeable(mixed &$did_downgrade = NULL): \FetchingCoroutine { - $this->assert_not_consumed(); + $this->consume(); $coroutine = $this; $has_downgrade_ref = func_num_args() >= 1; if ($has_downgrade_ref) $did_downgrade = false; @@ -145,7 +164,8 @@ continue; } - if ($has_downgrade_ref) $did_downgrade = false; + if ($has_downgrade_ref) $did_downgrade = true; + $handle = $handle_downgraded; } // Use HTTP @@ -153,21 +173,19 @@ } return $coroutine->return_value(); }; - $this->consume(); - return $this->project_response_parameters(new FetchingCoroutine($downgradeable())); + return $this->project_coroutine_parameters(new FetchingCoroutine($downgradeable())); } /** * Assign non-generator parameters to given FetchingCoroutine. */ - private function project_response_parameters(\FetchingCoroutine $coroutine): \FetchingCoroutine { - $coroutine->set_response_filter($this->is_valid_response); - return $coroutine; + private function project_coroutine_parameters(\FetchingCoroutine $coroutine): \FetchingCoroutine { + return $coroutine->set_response_filter($this->response_filter); } private function is_valid_response(CurlHandle $handle) { - $is_valid_response = $this->is_valid_response; - return $is_valid_response($handle); + $response_filter = $this->response_filter; + return $response_filter($handle); } /** @@ -180,7 +198,7 @@ /** * Get the cURL handle yielded at this point in the coroutine, if applicable. */ - public function current_request(): CurlHandle { + public function current_request(): CurlHandle|null { return $this->generator->current(); } @@ -188,6 +206,16 @@ return $this->generator->valid(); } + /** + * Invoke the current coroutine. Consumes coroutine. + * @return \Generator + */ + public function run() { + $this->consume(); + // passthrough + return yield from $this->generator; + } + /** * Get the return value of the wrapped generator object once finished. * @return TReturn @@ -222,7 +250,7 @@ /** * Collection of enroled transfers. 
*/ - private CurlMultiHandle $transfers = curl_multi_init(); + private CurlMultiHandle $transfers; /** * Coroutines executed by runner. @@ -253,7 +281,7 @@ // curl_multi_select($transfers, timeout: 6.0); } $this->process_curl_activity(); - } while ($curlm_status == CURLM_OK); + } while ($curlm_active_transfer && $curlm_status == CURLM_OK); return $curlm_status; } @@ -262,6 +290,8 @@ * Enrol initial transfers from all coroutines. */ private function initialize_coroutines() { + $this->transfers = curl_multi_init(); + foreach ($this->coroutines as $id => $coroutine) { $this->poll_coroutine_for_transfer($id); } diff --git a/php/utils/servers-rooms.php b/php/utils/servers-rooms.php index 68e242f..1383c81 100644 --- a/php/utils/servers-rooms.php +++ b/php/utils/servers-rooms.php @@ -716,13 +716,24 @@ public static function poll_reachable(array $servers): array { $reachable_servers = []; - // Synchronous for-loop for now. foreach ($servers as $server) { - if (!($server->fetch_rooms())) continue; - if (!($server->fetch_pubkey())) continue; - $reachable_servers[] = $server; + $fetch_job = function() use ($server, &$reachable_servers): Generator { + if (!yield from $server->fetch_rooms_coroutine()) return; + if (!yield from $server->fetch_pubkey_coroutine()) return; + $reachable_servers[] = $server; + }; + // passthrough hack + // all nested coroutines are allowed to do their own filtering + $coroutines[] = (new FetchingCoroutine($fetch_job())) + ->set_response_filter(function(CurlHandle $handle) { + return true; + }); } + $runner = new FetchingCoroutineRunner($coroutines); + + $runner->fetch_all(); + return $reachable_servers; } @@ -768,6 +779,14 @@ return "$base_url/rooms?all=1"; } + /** + * Returns the URL for the endpoint of the particular room. + */ + function get_room_api_url(string $token): string { + $base_url = $this->base_url; + return "$base_url/room/$token"; + } + /** * Returns the server's public key. * @return string SOGS pubkey as used in the Session protocol. 
@@ -841,171 +860,175 @@ } /** - * Attempts to fetch the current server's room listing. - * Downgrades the server's scheme to HTTP if necessary. - * @return array|false Associative data about rooms if successful. + * @return \Generator */ - private function fetch_room_list(): array|bool { + private function fetch_room_list_coroutine(): Generator { global $FAST_FETCH_MODE; $base_url = $this->base_url; - list($rooms, $downgrade) = curl_get_contents_downgrade($this->get_rooms_api_url(), retries: $FAST_FETCH_MODE ? 2 : 4); - if (!$rooms) { - log_info("Failed fetching /rooms."); - return false; - } - if ($downgrade) $this->downgrade_scheme(); - $room_data = json_decode($rooms, true); - if ($room_data == null) { - log_info("Failed parsing /rooms."); - return false; - } - log_debug("Fetched /rooms successfully"); - return $room_data; - } - /** - * @return \Generator - */ - private function fetch_room_list_coroutine(): Generator { - global $FAST_FETCH_MODE; + /** @var CurlHandle|false $rooms_api_response */ + $rooms_api_response = + yield from FetchingCoroutine + ::from_url($this->get_rooms_api_url()) + ->retryable($FAST_FETCH_MODE ? 2 : 4) + ->downgradeable($did_downgrade) + ->run(); - $rooms_api_coroutine = - FetchingCoroutine::from_url($this->get_rooms_api_url()) - ->downgradeable($does_downgrade) - ->retryable($FAST_FETCH_MODE ? 2 : 4); + $rooms_raw = $rooms_api_response ? 
curl_multi_getcontent($rooms_api_response) : null; - /** @var CurlHandle|false $rooms_api_handle */ - // assemble & propagate request to runner - $rooms_api_handle = yield $rooms_api_coroutine->current_request(); - $rooms_raw = $rooms_api_handle && curl_multi_getcontent($rooms_api_handle); if (!$rooms_raw) { - log_info("Failed fetching /rooms."); - return false; + log_info("Failed fetching /rooms for $base_url."); + return null; } - if ($does_downgrade) $this->downgrade_scheme(); + + if ($did_downgrade) $this->downgrade_scheme(); + $room_data = json_decode($rooms_raw, true); + if ($room_data == null) { - log_info("Failed parsing /rooms."); - return false; + log_info("Failed parsing /rooms for $base_url."); + return null; } - log_debug("Fetched /rooms successfully"); + + log_debug("Fetched /rooms successfully for $base_url"); + // log_value($room_data); + return $room_data; } /** - * Attempts to fetch the current server's rooms using observed room names. - * Downgrades the server's scheme to HTTP if necessary. - * @return ?array Associative data about rooms if successful. + * @return Generator */ - private function fetch_room_hints(): ?array { + private function fetch_room_hints_coroutine(): Generator { + global $FAST_FETCH_MODE; + $base_url = $this->base_url; $rooms = []; + if (empty($this->room_hints)) { + log_debug("No room hints to scan for $base_url."); + return null; + } + foreach ($this->room_hints as $token) { - log_debug("Testing room /$token."); - list($room_raw, $downgrade) = curl_get_contents_downgrade("$base_url/room/$token", retries: 2); + log_debug("Testing room /$token at $base_url."); + + // FIXME: This fetches room hints sequentially per each server + // Would need to allow yielding handle arrays + + $room_api_response = yield from FetchingCoroutine + ::from_url($this->get_room_api_url($token)) + // Afford more attempts thanks to reachability test + // TODO Move retryability to outer invocation + ->retryable(retries: $FAST_FETCH_MODE ? 
2 : 4) + ->downgradeable($did_downgrade) + ->run(); + + $room_raw = $room_api_response ? curl_multi_getcontent($room_api_response) : null; + if (!$room_raw) { - log_info("Room /$token not reachable."); + log_info("Room /$token not reachable at $base_url."); continue; } - if ($downgrade) $this->downgrade_scheme(); + + if ($did_downgrade) $this->downgrade_scheme(); + $room_data = json_decode($room_raw, true); + if ($room_data == null) { if (count($rooms) == 0) { - log_info("Room /$token not parsable."); + log_info("Room /$token not parsable at $base_url."); break; } else { - log_debug("Room /$token not parsable, continuing."); + log_debug("Room /$token not parsable at $base_url, continuing."); continue; } } + $rooms[] = $room_data; } // Mark no rooms as failure. - if (count($rooms) == 0) { - log_debug("No room hints were valid."); + if (empty($rooms)) { + log_debug("No room hints were valid at $base_url."); return null; } return $rooms; } - /** - * Attempt to fetch rooms for tbe current server using SOGS API. - * - * @return bool True if successful, false otherwise. - */ - function fetch_rooms(): bool { + function check_reachability_coroutine() { global $FAST_FETCH_MODE; - - $this->log_details(); $base_url = $this->base_url; - // Check reachability before polling too much. - if (count($this->room_hints) >= 2) { - log_info("Checking reachability for $base_url first..."); - if (!url_is_reachable($base_url, retries: $FAST_FETCH_MODE ? 
1 : 4)) { - log_warning("Reachability test failed by $base_url."); - return false; - } - } - - log_info("Fetching rooms for $base_url."); - $room_data = $this->fetch_room_list(); - if (!$room_data) $room_data = $this->fetch_room_hints(); - if ($room_data == null) { - log_warning("Could not fetch rooms for $base_url."); + log_info("Checking reachability for $base_url first..."); + + /** @var CurlHandle|false $response_handle */ + $response_handle = + yield from FetchingCoroutine + ::from_url($base_url, [CURLOPT_NOBODY => true]) + ->set_response_filter(function (CurlHandle $handle) { + $code = curl_getinfo($handle, CURLINFO_RESPONSE_CODE); + $url = curl_getinfo($handle, CURLINFO_EFFECTIVE_URL); + log_debug("Got $code for $url in custom filter."); + return $code != 0; + }) + ->retryable(retries: $FAST_FETCH_MODE ? 2 : 4) + ->downgradeable($did_downgrade) + ->run(); + + if (!$response_handle) { + log_warning("Reachability test failed by $base_url."); return false; } - $this->rooms = CommunityRoom::from_details_array($this, $room_data); + + if ($did_downgrade) $this->downgrade_scheme(); return true; } - function fetch_rooms_coroutine() { + /** + * @return \Generator + */ + function fetch_rooms_coroutine(): Generator { $this->log_details(); $base_url = $this->base_url; - /* // Check reachability before polling too much. if (count($this->room_hints) >= 2) { - log_info("Checking reachability for $base_url first..."); - if (!url_is_reachable($base_url, retries: $FAST_FETCH_MODE ? 
1 : 4)) { - log_warning("Reachability test failed by $base_url."); + if (!yield from $this->check_reachability_coroutine()) { return false; } } - */ log_info("Fetching rooms for $base_url."); - yield from ($room_list_coroutine = $this->fetch_room_list_coroutine()); - $room_data = $room_list_coroutine->getReturn(); - if ($room_data === null) { - yield from ($room_hints_coroutine = $this->fetch_room_hints_coroutine()); - $room_data = $room_hints_coroutine->getReturn(); - } + /** @var array|null $room_data */ + $room_data = + (yield from $this->fetch_room_list_coroutine()) ?? + (yield from $this->fetch_room_hints_coroutine()); + if ($room_data === null) { log_warning("Could not fetch rooms for $base_url."); return false; } + $this->rooms = CommunityRoom::from_details_array($this, $room_data); return true; } /** - * Attempt to fetch server public key by parsing SOGS HTML preview. - * - * @return bool True iff no key conflict has arised and we have a pubkey. + * @return \Generator */ - function fetch_pubkey() { + function fetch_pubkey_coroutine(): Generator { global $FAST_FETCH_MODE; + $base_url = $this->base_url; + if (empty($this->rooms)) { - log_warning("Server has no rooms to poll for public key"); + log_warning("Server $base_url has no rooms to poll for public key"); return false; } @@ -1018,7 +1041,14 @@ $preview_url = $this->rooms[0]->get_preview_url(); log_info("Fetching pubkey from $preview_url"); - $room_view = curl_get_contents($preview_url, retries: $has_pubkey || $FAST_FETCH_MODE ? 1 : 5); + $room_view_response = yield from FetchingCoroutine + ::from_url($preview_url) + ->retryable($has_pubkey || $FAST_FETCH_MODE ? 1 : 5) + ->run(); + + $room_view = $room_view_response + ? curl_multi_getcontent($room_view_response) + : null; if (!$room_view) { log_debug("Failed to fetch room preview from $preview_url.");