private static HttpClient initHttpClient(ReactorResourceFactory resourceFactory) {
ConnectionProvider provider = resourceFactory.getConnectionProvider();
LoopResources resources = resourceFactory.getLoopResources();
Assert.notNull(provider, "No ConnectionProvider: is ReactorResourceFactory not initialized yet?");
Assert.notNull(resources, "No LoopResources: is ReactorResourceFactory not initialized yet?");
return HttpClient.create(provider).tcpConfiguration(tcpClient -> tcpClient.runOn(resources));
private static HttpClient initHttpClient(ReactorResourceFactory resourceFactory) {
ConnectionProvider provider = resourceFactory.getConnectionProvider();
LoopResources resources = resourceFactory.getLoopResources();
Assert.notNull(provider, "No ConnectionProvider: is ReactorResourceFactory not initialized yet?");
Assert.notNull(resources, "No LoopResources: is ReactorResourceFactory not initialized yet?");
return HttpClient.create(provider).tcpConfiguration(tcpClient -> tcpClient.runOn(resources));
private static WebClient.Builder createDefaultWebClient(Duration connectTimeout, Duration readTimeout) { HttpClient httpClient = HttpClient.create() .compress(true) .tcpConfiguration(tcp -> tcp.bootstrap(bootstrap -> bootstrap.option( ChannelOption.CONNECT_TIMEOUT_MILLIS, (int) connectTimeout.toMillis() )).observe((connection, newState) -> { if (ConnectionObserver.State.CONNECTED.equals(newState)) { connection.addHandlerLast(new ReadTimeoutHandler(readTimeout.toMillis(), TimeUnit.MILLISECONDS ReactorClientHttpConnector connector = new ReactorClientHttpConnector(httpClient); return WebClient.builder().clientConnector(connector);
private ReactorClientHttpConnector initConnector() {
if (bufferFactory instanceof NettyDataBufferFactory) {
ByteBufAllocator allocator = ((NettyDataBufferFactory) bufferFactory).getByteBufAllocator();
return new ReactorClientHttpConnector(this.factory, httpClient ->
httpClient.tcpConfiguration
(tcpClient -> tcpClient.option(ChannelOption.ALLOCATOR, allocator)));
else {
return new ReactorClientHttpConnector();
.tcpConfiguration(tcpClient -> {
@Override protected TcpClient tcpConfiguration() { return source.tcpConfiguration();
@Override protected TcpClient tcpConfiguration() { return source.tcpConfiguration();
@Override protected TcpClient tcpConfiguration() { return Objects.requireNonNull(tcpClientMapper.apply(source.tcpConfiguration()), "tcpClientMapper");
@Override protected TcpClient tcpConfiguration() { return source.tcpConfiguration() .bootstrap(this);
@Override protected TcpClient tcpConfiguration() { return source.tcpConfiguration().bootstrap(this);
@Override protected TcpClient tcpConfiguration() { return source.tcpConfiguration() .bootstrap(this);
@Override protected TcpClient tcpConfiguration() { return source.tcpConfiguration() .bootstrap(this);
@Override protected TcpClient tcpConfiguration() { return new OnErrorTcpClient(source.tcpConfiguration(), onRequestError, onResponseError);
/**
* The address to which this client should connect for each subscribe.
* @param connectAddressSupplier A supplier of the address to connect to.
* @return a new {@link HttpClient}
public final HttpClient addressSupplier(Supplier<? extends SocketAddress> connectAddressSupplier) {
Objects.requireNonNull(connectAddressSupplier, "connectAddressSupplier");
return tcpConfiguration(tcpClient -> tcpClient.addressSupplier(connectAddressSupplier));
/**
* The port to which this client should connect.
* @param port The port to connect to.
* @return a new {@link HttpClient}
public final HttpClient port(int port) {
return tcpConfiguration(tcpClient -> tcpClient.port(port));
@Override protected TcpClient tcpConfiguration() { if (sslProvider == null) { return source.tcpConfiguration() .secure(DEFAULT_HTTP_SSL_PROVIDER); return source.tcpConfiguration().secure( SslProvider.addHandlerConfigurator(sslProvider, DEFAULT_HOSTNAME_VERIFICATION));
/**
* Apply a wire logger configuration using {@link HttpClient} category
* and {@code DEBUG} logger level
* @return a new {@link HttpClient}
* @deprecated Use {@link HttpClient#wiretap(boolean)}
@Deprecated
public final HttpClient wiretap() {
return tcpConfiguration(tcpClient -> tcpClient.bootstrap(
b -> BootstrapHandlers.updateLogSupport(b, LOGGING_HANDLER)));
/**
* The HTTP protocol to support. Default is {@link HttpProtocol#HTTP11}.
* @param supportedProtocols The various {@link HttpProtocol} this server will support
* @return a new {@link HttpClient}
public final HttpClient protocol(HttpProtocol... supportedProtocols) {
return tcpConfiguration(tcpClient -> tcpClient.bootstrap(b -> HttpClientConfiguration.protocols(b, supportedProtocols)));
/**
* Use the passed HTTP method to connect the {@link HttpClient}.
* @param method the HTTP method to send
* @return a {@link RequestSender} ready to finalize request and consume for response
public RequestSender request(HttpMethod method) {
Objects.requireNonNull(method, "method");
TcpClient tcpConfiguration = tcpConfiguration().bootstrap(b -> HttpClientConfiguration.method(b, method));
return new HttpClientFinalizer(tcpConfiguration);
@Test public void simpleTest404_1() { ConnectionProvider pool = ConnectionProvider.fixed("http", 1); HttpClient client = HttpClient.create(pool) .port(80) .tcpConfiguration(tcpClient -> tcpClient.host("google.com")) .wiretap(true); doSimpleTest404(client); doSimpleTest404(client); pool.dispose();