正如许多人所建议的那样,我使用的是客户端池-特别是Apache PoolingHttpClientConnectionManager。为简单起见,我将其包装在我自己的简单单例类中。对不起OTT Stop机制:

public final class HttpClientPool {
  private static final Logger log = LoggerFactory.getLogger(HttpClientPool.class);

  // Single-element enum to implement Singleton.
  private static enum Singleton {
    // Just one of me so constructor will be called once.
    Client;
    // The thread-safe client.
    private final CloseableHttpClient threadSafeClient;
    // The pool monitor.
    private final IdleConnectionMonitorThread monitor;

    // The constructor creates it - thus late
    private Singleton() {
      PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager();
      // Increase max total connection to 200
      cm.setMaxTotal(200);
      // Increase default max connection per route to 20
      cm.setDefaultMaxPerRoute(20);
      // Build the client.
      threadSafeClient = HttpClients.custom()
              .setConnectionManager(cm)
              .build();
      // Start up an eviction thread.
      monitor = new IdleConnectionMonitorThread(cm);
      // Don't stop quitting.
      monitor.setDaemon(true);
      monitor.start();
    }

    public CloseableHttpClient get() {
      return threadSafeClient;
    }

  }

  public static CloseableHttpClient getClient() {
    // The thread safe client is held by the singleton.
    return Singleton.Client.get();
  }

  // Watches for stale connections and evicts them.
  private static class IdleConnectionMonitorThread extends Thread {
    // The manager to watch.
    private final PoolingHttpClientConnectionManager cm;
    // Use a BlockingQueue to stop everything.
    private final BlockingQueue<Stop> stopSignal = new ArrayBlockingQueue<Stop>(1);

    // Pushed up the queue.
    private static class Stop {
      // The return queue.
      private final BlockingQueue<Stop> stop = new ArrayBlockingQueue<Stop>(1);

      // Called by the process that is being told to stop.
      public void stopped() {
        // Push me back up the queue to indicate we are now stopped.
        stop.add(this);
      }

      // Called by the process requesting the stop.
      public void waitForStopped() throws InterruptedException {
        // Wait until the callee acknowledges that it has stopped.
        stop.take();
      }

    }

    IdleConnectionMonitorThread(PoolingHttpClientConnectionManager cm) {
      super();
      this.cm = cm;
    }

    @Override
    public void run() {
      try {
        // Holds the stop request that stopped the process.
        Stop stopRequest;
        // Every 5 seconds.
        while ((stopRequest = stopSignal.poll(5, TimeUnit.SECONDS)) == null) {
          // Close expired connections
          cm.closeExpiredConnections();
          // Optionally, close connections that have been idle too long.
          cm.closeIdleConnections(60, TimeUnit.SECONDS);
          // Look at pool stats.
          log.trace("Stats: {}", cm.getTotalStats());
        }
        // Acknowledge the stop request.
        stopRequest.stopped();
      } catch (InterruptedException ex) {
        // terminate
      }
    }

    public void shutdown() throws InterruptedException {
      log.trace("Shutting down client pool");
      // Signal the stop to the thread.
      Stop stop = new Stop();
      stopSignal.add(stop);
      // Wait for the stop to complete.
      stop.waitForStopped();
      // Close the pool - Added
      threadSafeClient.close();
      // Close the connection manager.
      cm.close();
      log.trace("Client pool shut down");
    }

  }

  public static void shutdown() throws InterruptedException {
    // Shutdown the monitor.
    Singleton.Client.monitor.shutdown();
  }

}


我仅将其与JSON请求一起使用:

  // General query of the website. Takes an object of type Q and returns one of class R.
  public static <Q extends JSONObject, R> R query(String urlBase, String op, Q q, Class<R> r) throws IOException {
    // The request.
    final HttpRequestBase request;
    //postRequest.addHeader("Accept-Encoding", "gzip,deflate");
    if (q != null) {
      // Prepare the post.
      HttpPost postRequest = new HttpPost(urlBase + op);
      // Get it all into a JSON string.
      StringEntity input = new StringEntity(asJSONString(q));
      input.setContentType("application/json");
      postRequest.setEntity(input);
      // Use that one.
      request = postRequest;
    } else {
      // Just get.
      request = new HttpGet(urlBase + op);
    }
    log.debug("> " + urlBase + op + (q == null ? "" : " " + q));
    // Post it and wait.
    return readResponse(request, HttpClientPool.getClient().execute(request), r);
  }
  public static <R> R readResponse(HttpRequestBase request, CloseableHttpResponse response, Class<R> r) throws IOException {
    // What was read.
    R red = null;
    try {
      // What happened?
      if (response.getStatusLine().getStatusCode() == 200) {
        // Roll out the results
        HttpEntity entity = response.getEntity();
        if (entity != null) {
          InputStream content = entity.getContent();
          try {
            // Roll it directly from the response stream.
            JsonParser rsp = getFactory().createJsonParser(content);
            // Bring back the response.
            red = rsp.readValueAs(r);
          } finally {
            // Always close the content.
            content.close();
          }
        }
      } else {
        // The finally below will clean up.
        throw new IOException("HTTP Response: " + response.getStatusLine().getStatusCode());
      }
    } finally {
      // Always close the response.
      response.close();
    }

    if (red == null) {
      log.debug("< {null}");
    } else {
      log.debug("< {}", red.getClass().isArray() ? Arrays.toString((Object[]) red) : red.toString());
    }
    return red;
  }


这在正常负载下似乎可以正常工作-但是,最近的高负载时期导致一切崩溃。我们甚至干扰了其他托管应用。不知道是什么造成了干扰,但我想确保它的这一方面正确进行。

我看到的异常是:

org.apache.http.NoHttpResponseException: The target server failed to respond
java.net.BindException: Address already in use: connect
java.net.SocketException: No buffer space available (maximum connections reached?): connect -- Thousands of these per hour!!!


所以-我的问题:


我是否正确使用了泳池?
我是否在正确的时间关闭/不关闭?
我应该重置请求( request.reset())?
我有什么遗漏吗?


添加了

发现故意的错误-不会在threadSafeClient时间关闭shutdown。与该问题无关,但很重要。已修复!

还-无法关闭流并检查实体是否为null。已修复!

评论

您可以指定所用库的版本吗?

@ Marc-Andre-httpcomponents版本4.3.3-感谢您的关注。

@马克-安德烈-我忘了!当我使用4.2.5和稍微简化的关闭机制时,发生了问题。我主要是想确认这种方式是否正确。

由于我怀疑Http Apache的每个版本都会有很大变化,所以这就是为什么我要求提供该版本的原因,很高兴您解决了您的问题。我会看看是否能给出答案,但到目前为止一切都很好。

@ Marc-Andre-在Java 5下运行可能也很重要。

#1 楼

我不是库或多线程方面的专家,因此某些建议可能不适用。

TL; DR
我在您的代码中没有发现任何错误。实际上,我看得越多,就越发现只能挑剔的东西,而且我真的在强迫自己。我会很高兴自己维护该代码。一切都很干净,异常管理非常出色,而且很容易阅读。

readResponse对我来说看起来很干净。处理请求时,重要的是始终“使用”实体和/或关闭请求。起初,我以为您在请求未返回200时遇到了问题。当返回码不是200时,您不会使用通常会阻塞连接的Entity,但是如果您对Entity的请求似乎不需要使用close()。在我的代码中,我仍在使用EntityUtils.consume(),但这可能有点过分。

query对我来说似乎有点奇怪。我总是看到getpost两者都不是。我发现query可以同时执行这两项操作很奇怪,因为通常这是非常不同的两件事。尽管如此,它仍然处理得很好。

初读IMO时,很难完全掌握这行内容:


return readResponse(request, HttpClientPool.getClient().execute(request), r);



我很难发现这实际上是您发出请求的地方。可以单独使用它,以更好地发挥其重要作用。我发现它也更容易调试。


final HttpRequestBase request;
//postRequest.addHeader("Accept-Encoding", "gzip,deflate");



我想这条评论简直是死话。可以将其删除,因为它没有用。总的来说,我发现我的口味有太多注释,但这基本上是因为您的代码非常简洁,注释只是通过阅读代码来表达我的理解。 (质量给我留下了深刻的印象)


// Post it and wait.



这不是很正确,因为您可能正在执行Get请求,而不是Post。我开始认为这些方法最初是与Post请求一起使用的,并发展为两种类型的请求。 (基于无效评论和该评论)


 if (response.getStatusLine().getStatusCode() == 200)



我个人会使用200而不是HttpStatus.OK

评论


\ $ \ begingroup \ $
谢谢Marc的有用评论。您在所有情况下都是正确的-我将相应地调整代码。
\ $ \ endgroup \ $
–OldCurmudgeon
2014年5月10日14:06

#2 楼

这里的一个小更正:

  // Close the pool - Added
  threadSafeClient.close();


将其更改为:

// Close the pool - Added
Singleton.Client.threadSafeClient.close();


说明:

threadSafeClient类中没有IdleConnectionMonitorThread字段,因此代码无法编译。由于此字段在Singleton中存在,因此关闭此字段的正确方法是:

Singleton.Client.threadSafeClient.close();


这可以修复编译错误。

#3 楼

您只能创建一个连接。
请尝试以下操作:

private static enum Singleton {
    // Just one of me so constructor will be called once.
    Client;
    // The pool
    private PoolingHttpClientConnectionManager cm;

    // The constructor creates it - thus late
    private Singleton() {
      cm = new PoolingHttpClientConnectionManager();
      // Increase max total connection to 200
      cm.setMaxTotal(200);
      // Increase default max connection per route to 20
      cm.setDefaultMaxPerRoute(20);

    }

    public CloseableHttpClient get() {
      CloseableHttpClient threadSafeClient = HttpClients.custom()
              .setConnectionManager(cm)
              .build();
              return threadSafeClient;
    }

}


`