Stop
机制:public final class HttpClientPool {
private static final Logger log = LoggerFactory.getLogger(HttpClientPool.class);
// Single-element enum to implement Singleton.
private static enum Singleton {
// Just one of me so constructor will be called once.
Client;
// The thread-safe client.
private final CloseableHttpClient threadSafeClient;
// The pool monitor.
private final IdleConnectionMonitorThread monitor;
// The constructor creates it - thus late
private Singleton() {
PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager();
// Increase max total connection to 200
cm.setMaxTotal(200);
// Increase default max connection per route to 20
cm.setDefaultMaxPerRoute(20);
// Build the client.
threadSafeClient = HttpClients.custom()
.setConnectionManager(cm)
.build();
// Start up an eviction thread.
monitor = new IdleConnectionMonitorThread(cm);
// Don't stop quitting.
monitor.setDaemon(true);
monitor.start();
}
public CloseableHttpClient get() {
return threadSafeClient;
}
}
public static CloseableHttpClient getClient() {
// The thread safe client is held by the singleton.
return Singleton.Client.get();
}
// Watches for stale connections and evicts them.
private static class IdleConnectionMonitorThread extends Thread {
// The manager to watch.
private final PoolingHttpClientConnectionManager cm;
// Use a BlockingQueue to stop everything.
private final BlockingQueue<Stop> stopSignal = new ArrayBlockingQueue<Stop>(1);
// Pushed up the queue.
private static class Stop {
// The return queue.
private final BlockingQueue<Stop> stop = new ArrayBlockingQueue<Stop>(1);
// Called by the process that is being told to stop.
public void stopped() {
// Push me back up the queue to indicate we are now stopped.
stop.add(this);
}
// Called by the process requesting the stop.
public void waitForStopped() throws InterruptedException {
// Wait until the callee acknowledges that it has stopped.
stop.take();
}
}
IdleConnectionMonitorThread(PoolingHttpClientConnectionManager cm) {
super();
this.cm = cm;
}
@Override
public void run() {
try {
// Holds the stop request that stopped the process.
Stop stopRequest;
// Every 5 seconds.
while ((stopRequest = stopSignal.poll(5, TimeUnit.SECONDS)) == null) {
// Close expired connections
cm.closeExpiredConnections();
// Optionally, close connections that have been idle too long.
cm.closeIdleConnections(60, TimeUnit.SECONDS);
// Look at pool stats.
log.trace("Stats: {}", cm.getTotalStats());
}
// Acknowledge the stop request.
stopRequest.stopped();
} catch (InterruptedException ex) {
// terminate
}
}
public void shutdown() throws InterruptedException {
log.trace("Shutting down client pool");
// Signal the stop to the thread.
Stop stop = new Stop();
stopSignal.add(stop);
// Wait for the stop to complete.
stop.waitForStopped();
// Close the pool - Added
threadSafeClient.close();
// Close the connection manager.
cm.close();
log.trace("Client pool shut down");
}
}
public static void shutdown() throws InterruptedException {
// Shutdown the monitor.
Singleton.Client.monitor.shutdown();
}
}
我仅将其与JSON请求一起使用:
// General query of the website. Takes an object of type Q and returns one of class R.
public static <Q extends JSONObject, R> R query(String urlBase, String op, Q q, Class<R> r) throws IOException {
// The request.
final HttpRequestBase request;
//postRequest.addHeader("Accept-Encoding", "gzip,deflate");
if (q != null) {
// Prepare the post.
HttpPost postRequest = new HttpPost(urlBase + op);
// Get it all into a JSON string.
StringEntity input = new StringEntity(asJSONString(q));
input.setContentType("application/json");
postRequest.setEntity(input);
// Use that one.
request = postRequest;
} else {
// Just get.
request = new HttpGet(urlBase + op);
}
log.debug("> " + urlBase + op + (q == null ? "" : " " + q));
// Post it and wait.
return readResponse(request, HttpClientPool.getClient().execute(request), r);
}
public static <R> R readResponse(HttpRequestBase request, CloseableHttpResponse response, Class<R> r) throws IOException {
// What was read.
R red = null;
try {
// What happened?
if (response.getStatusLine().getStatusCode() == 200) {
// Roll out the results
HttpEntity entity = response.getEntity();
if (entity != null) {
InputStream content = entity.getContent();
try {
// Roll it directly from the response stream.
JsonParser rsp = getFactory().createJsonParser(content);
// Bring back the response.
red = rsp.readValueAs(r);
} finally {
// Always close the content.
content.close();
}
}
} else {
// The finally below will clean up.
throw new IOException("HTTP Response: " + response.getStatusLine().getStatusCode());
}
} finally {
// Always close the response.
response.close();
}
if (red == null) {
log.debug("< {null}");
} else {
log.debug("< {}", red.getClass().isArray() ? Arrays.toString((Object[]) red) : red.toString());
}
return red;
}
这在正常负载下似乎可以正常工作-但是,最近的高负载时期导致一切崩溃。我们甚至干扰了其他托管应用。不知道是什么造成了干扰,但我想确保它的这一方面正确进行。
我看到的异常是:
org.apache.http.NoHttpResponseException: The target server failed to respond
java.net.BindException: Address already in use: connect
java.net.SocketException: No buffer space available (maximum connections reached?): connect -- Thousands of these per hour!!!
所以-我的问题:
我是否正确使用了泳池?
我是否在正确的时间关闭/不关闭?
我应该重置请求(
request.reset()
)?我有什么遗漏吗?
添加了
发现故意的错误-不会在
threadSafeClient
时间关闭shutdown
。与该问题无关,但很重要。已修复!还-无法关闭流并检查实体是否为null。已修复!
#1 楼
我不是库或多线程方面的专家,因此某些建议可能不适用。TL; DR
我在您的代码中没有发现任何错误。实际上,我看得越多,就越发现只能挑剔的东西,而且我真的在强迫自己。我会很高兴自己维护该代码。一切都很干净,异常管理非常出色,而且很容易阅读。
readResponse
对我来说看起来很干净。处理请求时,重要的是始终“使用”实体和/或关闭请求。起初,我以为您在请求未返回200
时遇到了问题。当返回码不是200
时,您不会使用通常会阻塞连接的Entity
,但是如果您对Entity
的请求似乎不需要使用close()
。在我的代码中,我仍在使用EntityUtils.consume()
,但这可能有点过分。query
对我来说似乎有点奇怪。我总是看到get
或post
两者都不是。我发现query
可以同时执行这两项操作很奇怪,因为通常这是非常不同的两件事。尽管如此,它仍然处理得很好。初读IMO时,很难完全掌握这行内容:
return readResponse(request, HttpClientPool.getClient().execute(request), r);
我很难发现这实际上是您发出请求的地方。可以单独使用它,以更好地发挥其重要作用。我发现它也更容易调试。
final HttpRequestBase request;
//postRequest.addHeader("Accept-Encoding", "gzip,deflate");
我想这条评论简直是死话。可以将其删除,因为它没有用。总的来说,我发现我的口味有太多注释,但这基本上是因为您的代码非常简洁,注释只是通过阅读代码来表达我的理解。 (质量给我留下了深刻的印象)
// Post it and wait.
这不是很正确,因为您可能正在执行
Get
请求,而不是Post
。我开始认为这些方法最初是与Post
请求一起使用的,并发展为两种类型的请求。 (基于无效评论和该评论) if (response.getStatusLine().getStatusCode() == 200)
我个人会使用
200
而不是HttpStatus.OK
。评论
\ $ \ begingroup \ $
谢谢Marc的有用评论。您在所有情况下都是正确的-我将相应地调整代码。
\ $ \ endgroup \ $
–OldCurmudgeon
2014年5月10日14:06
#2 楼
这里的一个小更正: // Close the pool - Added
threadSafeClient.close();
将其更改为:
// Close the pool - Added
Singleton.Client.threadSafeClient.close();
说明:
threadSafeClient
类中没有IdleConnectionMonitorThread
字段,因此代码无法编译。由于此字段在Singleton中存在,因此关闭此字段的正确方法是:Singleton.Client.threadSafeClient.close();
这可以修复编译错误。
#3 楼
您只能创建一个连接。请尝试以下操作:
private static enum Singleton {
// Just one of me so constructor will be called once.
Client;
// The pool
private PoolingHttpClientConnectionManager cm;
// The constructor creates it - thus late
private Singleton() {
cm = new PoolingHttpClientConnectionManager();
// Increase max total connection to 200
cm.setMaxTotal(200);
// Increase default max connection per route to 20
cm.setDefaultMaxPerRoute(20);
}
public CloseableHttpClient get() {
CloseableHttpClient threadSafeClient = HttpClients.custom()
.setConnectionManager(cm)
.build();
return threadSafeClient;
}
}
`
评论
您可以指定所用库的版本吗?@ Marc-Andre-httpcomponents版本4.3.3-感谢您的关注。
@马克-安德烈-我忘了!当我使用4.2.5和稍微简化的关闭机制时,发生了问题。我主要是想确认这种方式是否正确。
由于我怀疑Http Apache的每个版本都会有很大变化,所以这就是为什么我要求提供该版本的原因,很高兴您解决了您的问题。我会看看是否能给出答案,但到目前为止一切都很好。
@ Marc-Andre-在Java 5下运行可能也很重要。