|
9 | 9 | package org.elasticsearch.client;
|
10 | 10 |
|
11 | 11 | import org.apache.http.HttpEntity;
|
| 12 | +import org.apache.logging.log4j.LogManager; |
| 13 | +import org.apache.logging.log4j.Logger; |
| 14 | +import org.elasticsearch.Build; |
12 | 15 | import org.elasticsearch.ElasticsearchException;
|
13 | 16 | import org.elasticsearch.ElasticsearchStatusException;
|
14 | 17 | import org.elasticsearch.action.ActionListener;
|
|
64 | 67 | import org.elasticsearch.client.core.TermVectorsRequest;
|
65 | 68 | import org.elasticsearch.client.core.TermVectorsResponse;
|
66 | 69 | import org.elasticsearch.client.tasks.TaskSubmissionResponse;
|
| 70 | +import org.elasticsearch.common.util.concurrent.FutureUtils; |
67 | 71 | import org.elasticsearch.core.CheckedConsumer;
|
68 | 72 | import org.elasticsearch.core.CheckedFunction;
|
69 | 73 | import org.elasticsearch.common.xcontent.ParseField;
|
| 74 | +import org.elasticsearch.common.Strings; |
| 75 | +import org.elasticsearch.common.util.concurrent.ListenableFuture; |
70 | 76 | import org.elasticsearch.common.xcontent.ContextParser;
|
71 | 77 | import org.elasticsearch.common.xcontent.DeprecationHandler;
|
72 | 78 | import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
|
205 | 211 | import java.util.Optional;
|
206 | 212 | import java.util.ServiceLoader;
|
207 | 213 | import java.util.Set;
|
| 214 | +import java.util.concurrent.CompletableFuture; |
| 215 | +import java.util.concurrent.ExecutionException; |
208 | 216 | import java.util.function.Function;
|
209 | 217 | import java.util.stream.Collectors;
|
210 | 218 | import java.util.stream.Stream;
|
|
244 | 252 | */
|
245 | 253 | public class RestHighLevelClient implements Closeable {
|
246 | 254 |
|
| 255 | + private static final Logger logger = LogManager.getLogger(RestHighLevelClient.class); |
| 256 | + |
| 257 | + // To be called using performClientRequest and performClientRequestAsync to ensure version compatibility check |
247 | 258 | private final RestClient client;
|
248 | 259 | private final NamedXContentRegistry registry;
|
249 | 260 | private final CheckedConsumer<RestClient, IOException> doClose;
|
250 | 261 |
|
| 262 | + /** Do not access directly but through getVersionValidationFuture() */ |
| 263 | + private volatile ListenableFuture<Optional<String>> versionValidationFuture; |
| 264 | + |
251 | 265 | private final IndicesClient indicesClient = new IndicesClient(this);
|
252 | 266 | private final ClusterClient clusterClient = new ClusterClient(this);
|
253 | 267 | private final IngestClient ingestClient = new IngestClient(this);
|
@@ -1715,7 +1729,7 @@ private <Req, Resp> Resp internalPerformRequest(Req request,
|
1715 | 1729 | req.setOptions(options);
|
1716 | 1730 | Response response;
|
1717 | 1731 | try {
|
1718 |
| - response = client.performRequest(req); |
| 1732 | + response = performClientRequest(req); |
1719 | 1733 | } catch (ResponseException e) {
|
1720 | 1734 | if (ignores.contains(e.getResponse().getStatusLine().getStatusCode())) {
|
1721 | 1735 | try {
|
@@ -1755,7 +1769,7 @@ protected final <Req extends Validatable, Resp> Optional<Resp> performRequestAnd
|
1755 | 1769 | req.setOptions(options);
|
1756 | 1770 | Response response;
|
1757 | 1771 | try {
|
1758 |
| - response = client.performRequest(req); |
| 1772 | + response = performClientRequest(req); |
1759 | 1773 | } catch (ResponseException e) {
|
1760 | 1774 | if (RestStatus.NOT_FOUND.getStatus() == e.getResponse().getStatusLine().getStatusCode()) {
|
1761 | 1775 | return Optional.empty();
|
@@ -1854,7 +1868,7 @@ private <Req, Resp> Cancellable internalPerformRequestAsync(Req request,
|
1854 | 1868 | req.setOptions(options);
|
1855 | 1869 |
|
1856 | 1870 | ResponseListener responseListener = wrapResponseListener(responseConverter, listener, ignores);
|
1857 |
| - return client.performRequestAsync(req, responseListener); |
| 1871 | + return performClientRequestAsync(req, responseListener); |
1858 | 1872 | }
|
1859 | 1873 |
|
1860 | 1874 |
|
@@ -1920,7 +1934,7 @@ protected final <Req extends Validatable, Resp> Cancellable performRequestAsyncA
|
1920 | 1934 | req.setOptions(options);
|
1921 | 1935 | ResponseListener responseListener = wrapResponseListener404sOptional(response -> parseEntity(response.getEntity(),
|
1922 | 1936 | entityParser), listener);
|
1923 |
| - return client.performRequestAsync(req, responseListener); |
| 1937 | + return performClientRequestAsync(req, responseListener); |
1924 | 1938 | }
|
1925 | 1939 |
|
1926 | 1940 | final <Resp> ResponseListener wrapResponseListener404sOptional(CheckedFunction<Response, Resp, IOException> responseConverter,
|
@@ -2002,6 +2016,204 @@ protected static boolean convertExistsResponse(Response response) {
|
2002 | 2016 | return response.getStatusLine().getStatusCode() == 200;
|
2003 | 2017 | }
|
2004 | 2018 |
|
| 2019 | + private Cancellable performClientRequestAsync(Request request, ResponseListener listener) { |
| 2020 | + |
| 2021 | + ListenableFuture<Optional<String>> versionCheck = getVersionValidationFuture(); |
| 2022 | + |
| 2023 | + // Create a future that tracks cancellation of this method's result and forwards cancellation to the actual LLRC request. |
| 2024 | + CompletableFuture<Void> cancellationForwarder = new CompletableFuture<Void>(); |
| 2025 | + Cancellable result = new Cancellable() { |
| 2026 | + @Override |
| 2027 | + public void cancel() { |
| 2028 | + // Raise the flag by completing the future |
| 2029 | + FutureUtils.cancel(cancellationForwarder); |
| 2030 | + } |
| 2031 | + |
| 2032 | + @Override |
| 2033 | + void runIfNotCancelled(Runnable runnable) { |
| 2034 | + if (cancellationForwarder.isCancelled()) { |
| 2035 | + throw newCancellationException(); |
| 2036 | + } |
| 2037 | + runnable.run(); |
| 2038 | + } |
| 2039 | + }; |
| 2040 | + |
| 2041 | + // Send the request after we have done the version compatibility check. Note that if it has already happened, the listener will |
| 2042 | + // be called immediately on the same thread with no asynchronous scheduling overhead. |
| 2043 | + versionCheck.addListener(new ActionListener<Optional<String>>() { |
| 2044 | + @Override |
| 2045 | + public void onResponse(Optional<String> validation) { |
| 2046 | + if (validation.isEmpty()) { |
| 2047 | + // Send the request and propagate cancellation |
| 2048 | + Cancellable call = client.performRequestAsync(request, listener); |
| 2049 | + cancellationForwarder.whenComplete((r, t) -> |
| 2050 | + // Forward cancellation to the actual request (no need to check parameters as the |
| 2051 | + // only way for cancellationForwarder to be completed is by being cancelled). |
| 2052 | + call.cancel() |
| 2053 | + ); |
| 2054 | + } else { |
| 2055 | + // Version validation wasn't successful, fail the request with the validation result. |
| 2056 | + listener.onFailure(new ElasticsearchException(validation.get())); |
| 2057 | + } |
| 2058 | + } |
| 2059 | + |
| 2060 | + @Override |
| 2061 | + public void onFailure(Exception e) { |
| 2062 | + // Propagate validation request failure. This will be transient since `getVersionValidationFuture` clears the validation |
| 2063 | + // future if the request fails, leading to retries at the next HLRC request (see comments below). |
| 2064 | + listener.onFailure(e); |
| 2065 | + } |
| 2066 | + }); |
| 2067 | + |
| 2068 | + return result; |
| 2069 | + }; |
| 2070 | + |
| 2071 | + private Response performClientRequest(Request request) throws IOException { |
| 2072 | + |
| 2073 | + Optional<String> versionValidation; |
| 2074 | + try { |
| 2075 | + versionValidation = getVersionValidationFuture().get(); |
| 2076 | + } catch (InterruptedException | ExecutionException e) { |
| 2077 | + // Unlikely to happen |
| 2078 | + throw new ElasticsearchException(e); |
| 2079 | + } |
| 2080 | + |
| 2081 | + if (versionValidation.isEmpty()) { |
| 2082 | + return client.performRequest(request); |
| 2083 | + } else { |
| 2084 | + throw new ElasticsearchException(versionValidation.get()); |
| 2085 | + } |
| 2086 | + } |
| 2087 | + |
| 2088 | + /** |
| 2089 | + * Returns a future that asynchronously validates the Elasticsearch product version. Its result is an optional string: if empty then |
| 2090 | + * validation was successful, if present it contains the validation error. API requests should be chained to this future and check |
| 2091 | + * the validation result before going further. |
| 2092 | + * <p> |
| 2093 | + * This future is a memoization of the first successful request to the "/" endpoint and the subsequent compatibility check |
| 2094 | + * ({@see #versionValidationFuture}). Further client requests reuse its result. |
| 2095 | + * <p> |
| 2096 | + * If the version check request fails (e.g. network error), {@link #versionValidationFuture} is cleared so that a new validation |
| 2097 | + * request is sent at the next HLRC request. This allows retries to happen while avoiding a busy retry loop (LLRC retries on the node |
| 2098 | + * pool still happen). |
| 2099 | + */ |
| 2100 | + private ListenableFuture<Optional<String>> getVersionValidationFuture() { |
| 2101 | + ListenableFuture<Optional<String>> currentFuture = this.versionValidationFuture; |
| 2102 | + if (currentFuture != null) { |
| 2103 | + return currentFuture; |
| 2104 | + } else { |
| 2105 | + synchronized (this) { |
| 2106 | + // Re-check in synchronized block |
| 2107 | + currentFuture = this.versionValidationFuture; |
| 2108 | + if (currentFuture != null) { |
| 2109 | + return currentFuture; |
| 2110 | + } |
| 2111 | + ListenableFuture<Optional<String>> future = new ListenableFuture<>(); |
| 2112 | + this.versionValidationFuture = future; |
| 2113 | + |
| 2114 | + // Asynchronously call the info endpoint and complete the future with the version validation result. |
| 2115 | + Request req = new Request("GET", "/"); |
| 2116 | + // These status codes are nominal in the context of product version verification |
| 2117 | + req.addParameter("ignore", "401,403"); |
| 2118 | + client.performRequestAsync(req, new ResponseListener() { |
| 2119 | + @Override |
| 2120 | + public void onSuccess(Response response) { |
| 2121 | + Optional<String> validation; |
| 2122 | + try { |
| 2123 | + validation = getVersionValidation(response); |
| 2124 | + } catch (Exception e) { |
| 2125 | + logger.error("Failed to parse info response", e); |
| 2126 | + validation = Optional.of("Failed to parse info response. Check logs for detailed information - " + |
| 2127 | + e.getMessage()); |
| 2128 | + } |
| 2129 | + future.onResponse(validation); |
| 2130 | + } |
| 2131 | + |
| 2132 | + @Override |
| 2133 | + public void onFailure(Exception exception) { |
| 2134 | + |
| 2135 | + // Fail the requests (this one and the ones waiting for it) and clear the future |
| 2136 | + // so that we retry the next time the client executes a request. |
| 2137 | + versionValidationFuture = null; |
| 2138 | + future.onFailure(exception); |
| 2139 | + } |
| 2140 | + }); |
| 2141 | + |
| 2142 | + return future; |
| 2143 | + } |
| 2144 | + } |
| 2145 | + } |
| 2146 | + |
| 2147 | + /** |
| 2148 | + * Validates that the response info() is a compatible Elasticsearch version. |
| 2149 | + * |
| 2150 | + * @return an optional string. If empty, version is compatible. Otherwise, it's the message to return to the application. |
| 2151 | + */ |
| 2152 | + private Optional<String> getVersionValidation(Response response) throws IOException { |
| 2153 | + // Let requests go through if the client doesn't have permissions for the info endpoint. |
| 2154 | + int statusCode = response.getStatusLine().getStatusCode(); |
| 2155 | + if (statusCode == 401 || statusCode == 403) { |
| 2156 | + return Optional.empty(); |
| 2157 | + } |
| 2158 | + |
| 2159 | + MainResponse mainResponse; |
| 2160 | + try { |
| 2161 | + mainResponse = parseEntity(response.getEntity(), MainResponse::fromXContent); |
| 2162 | + } catch (ResponseException e) { |
| 2163 | + throw parseResponseException(e); |
| 2164 | + } |
| 2165 | + |
| 2166 | + String version = mainResponse.getVersion().getNumber(); |
| 2167 | + if (Strings.hasLength(version) == false) { |
| 2168 | + return Optional.of("Missing version.number in info response"); |
| 2169 | + } |
| 2170 | + |
| 2171 | + String[] parts = version.split("\\."); |
| 2172 | + if (parts.length < 2) { |
| 2173 | + return Optional.of("Wrong version.number format in info response"); |
| 2174 | + } |
| 2175 | + |
| 2176 | + int major = Integer.parseInt(parts[0]); |
| 2177 | + int minor = Integer.parseInt(parts[1]); |
| 2178 | + |
| 2179 | + if (major < 6) { |
| 2180 | + return Optional.of("Elasticsearch version 6 or more is required"); |
| 2181 | + } |
| 2182 | + |
| 2183 | + if (major == 6 || (major == 7 && minor < 14)) { |
| 2184 | + if ("You Know, for Search".equalsIgnoreCase(mainResponse.getTagline()) == false) { |
| 2185 | + return Optional.of("Invalid or missing tagline [" + mainResponse.getTagline() + "]"); |
| 2186 | + } |
| 2187 | + |
| 2188 | + if (major == 7) { |
| 2189 | + // >= 7.0 and < 7.14 |
| 2190 | + String responseFlavor = mainResponse.getVersion().getBuildFlavor(); |
| 2191 | + if ("default".equals(responseFlavor) == false) { |
| 2192 | + // Flavor is unknown when running tests, and non-mocked responses will return an unknown flavor |
| 2193 | + if (Build.CURRENT.flavor() != Build.Flavor.UNKNOWN || "unknown".equals(responseFlavor) == false) { |
| 2194 | + return Optional.of("Invalid or missing build flavor [" + responseFlavor + "]"); |
| 2195 | + } |
| 2196 | + } |
| 2197 | + } |
| 2198 | + |
| 2199 | + return Optional.empty(); |
| 2200 | + } |
| 2201 | + |
| 2202 | + String header = response.getHeader("X-Elastic-Product"); |
| 2203 | + if (header == null) { |
| 2204 | + return Optional.of( |
| 2205 | + "Missing [X-Elastic-Product] header. Please check that you are connecting to an Elasticsearch " + |
| 2206 | + "instance, and that any networking filters are preserving that header." |
| 2207 | + ); |
| 2208 | + } |
| 2209 | + |
| 2210 | + if ("Elasticsearch".equals(header) == false) { |
| 2211 | + return Optional.of("Invalid value [" + header + "] for [X-Elastic-Product] header."); |
| 2212 | + } |
| 2213 | + |
| 2214 | + return Optional.empty(); |
| 2215 | + } |
| 2216 | + |
2005 | 2217 | /**
|
2006 | 2218 | * Ignores deprecation warnings. This is appropriate because it is only
|
2007 | 2219 | * used to parse responses from Elasticsearch. Any deprecation warnings
|
|
0 commit comments