原始设计:
我正在使用
RestTemplate
作为我的HttpClient
执行URL,然后我的服务器将返回JSON字符串作为回应。客户将通过传递其中包含DataKey
的userId
对象来调用此库。使用给定的
userId
,我将找出可以用来获取数据的机器以及然后将这些机器存储在ArrayList
中,以便可以顺序执行它们。之后,我将检查第一个主机名是否在阻止列表中。如果它不在阻止列表中,那么我将使用列表中的第一个主机名创建一个URL并执行它,如果响应成功,则返回响应。但是,如果说第一个主机名在阻止列表中,那么我将尝试获取列表中的第二个主机名并生成url并执行它,因此,基本上,首先在创建URL之前找到不在阻止列表中的主机名。
现在,假设我们选择了不在阻止列表中的第一个主机名并执行了URL,并且服务器以某种方式关闭或没有响应,那么我将执行列表中的第二个主机名,并一直执行直到您得到成功的回应。但是请确保它们也不在阻止列表中,所以我们需要遵循上述要点。
如果所有服务器都已关闭或在阻止列表中,那么我可以简单地记录并返回错误:无法使用。
我正在制作一个库,其中将具有同步(
getSyncData
)和异步(getAsyncData
)方法。getSyncData()
-等待直到得到结果,然后返回结果。getAsyncData()
-立即返回Future
,如果需要,可以在完成其他操作后进行处理。下面是我的
DataClient
类,它将由客户调用,他们将根据要调用的内容将DataKey
对象传递给getSyncData
或getAsyncData
方法。通常,某些客户会调用getSyncData
方法,而有些客户可能会调用getAsyncData
方法。public class DataClient implements Client {
private RestTemplate restTemplate = new RestTemplate();
private ExecutorService service = Executors.newFixedThreadPool(15);
@Override
public DataResponse getSyncData(DataKey key) {
DataResponse response = null;
Future<DataResponse> responseFuture = null;
try {
responseFuture = getAsyncData(key);
response = responseFuture.get(key.getTimeout(), key.getTimeoutUnit());
} catch (TimeoutException ex) {
response = new DataResponse(DataErrorEnum.CLIENT_TIMEOUT, DataStatusEnum.ERROR);
responseFuture.cancel(true); // terminating the tasks that have got timed out
// logging exception here
} catch (Exception ex) {
response = new DataResponse(DataErrorEnum.ERROR_CLIENT, DataStatusEnum.ERROR);
// logging exception here
}
return response;
}
@Override
public Future<DataResponse> getAsyncData(DataKey key) {
DataFetcherTask task = new DataFetcherTask(key, restTemplate);
Future<DataResponse> future = service.submit(task);
return future;
}
}
DataFetcherTask
类:public class DataFetcherTask implements Callable<DataResponse> {
private DataKey key;
private RestTemplate restTemplate;
public DataFetcherTask(DataKey key, RestTemplate restTemplate) {
this.key = key;
this.restTemplate = restTemplate;
}
@Override
public DataResponse call() throws Exception {
ResponseEntity<String> response = null;
Mappings mappings = ShardMapping.getMappings(key.getFlowType());
List<String> hostnames = mappings.getAllHostnames(key);
for (String hostname : hostnames) {
if (DataUtils.isEmpty(hostname) || ShardMapping.isBlockHost(hostname)) {
continue;
}
try {
String url = generateUrl(hostname);
URI uri = URI.create(url);
response = restTemplate.exchange(uri, HttpMethod.GET, key.getEntity(), String.class);
ShardMapping.unblockHost(hostname);
if (response.getStatusCode() == HttpStatus.NO_CONTENT) {
return new DataResponse(response.getBody(), DataErrorEnum.NO_CONTENT,
DataStatusEnum.SUCCESS);
} else {
return new DataResponse(response.getBody(), DataErrorEnum.OK, DataStatusEnum.SUCCESS);
}
} catch (HttpClientErrorException ex) {
HttpStatusCodeException httpException = ex;
DataErrorEnum error = DataErrorEnum.getErrorEnumByException(httpException);
String errorMessage = httpException.getResponseBodyAsString();
return new DataResponse(errorMessage, error, DataStatusEnum.ERROR);
// logging exception here
} catch (HttpServerErrorException ex) {
HttpStatusCodeException httpException = ex;
DataErrorEnum error = DataErrorEnum.getErrorEnumByException(httpException);
String errorMessage = httpException.getResponseBodyAsString();
return new DataResponse(errorMessage, error, DataStatusEnum.ERROR);
// logging exception here
} catch (RestClientException ex) {
ShardMapping.blockHost(hostname);
// logging exception here
}
}
return new DataResponse(DataErrorEnum.SERVICE_UNAVAILABLE, DataStatusEnum.ERROR);
}
}
我的阻止列表每隔1分钟就会从另一个后台线程进行更新。如果任何服务器已关闭且没有响应,那么我需要使用此服务器来阻止该服务器-
ShardMapping.blockHost(hostname);
并检查是否有任何服务器在阻止列表中,我使用它-
ShardMapping.isBlockHost(hostname);
如果所有服务器都已关闭或最后位于阻止列表中,我将返回
SERVICE_UNAVAILABLE
。现在,上面的代码很长一段时间都可以在生产环境中正常工作。设计更改:
更改是-人们会将
DataKey
(例如keyA)对象传递给我的库,并且然后我将使用DataKey (keyA)
对象中存在的用户ID对另一个服务(我在当前设计中没有做)进行http调用,这将向我返回用户ID的列表,因此我将使用这些用户ID并创建另一个DataKey( keyB,keyC,keyD)对象,用于响应中返回的每个用户ID。然后我将拥有List<DataKey>
对象,该对象将具有keyB,keyC和keyD DataKey
对象。现在,对于List<DataKey>
中的每个DataKey
对象,我想并行执行以上List<DataKey>
方法,然后通过添加每个DataFetcherTask.call
来构成List<DataResponse>
。因此,现在我的DataResponse
类将返回DataFetcherTask
对象而不是List<DataResponse>
,然后DataResponse
和getSyncData
方法的签名也将更改。并行调用的目的是在相同的全局超时值中获取所有这三个最大键的数据。如您所见,我在
getAsyncData
方法中设置了全局超时,因此我想保留这种方式,因为我想限制请求,而不是要求人们轰炸我们的服务器。因此,我也想对这个新的http调用应用相同的全局超时,这将获得用户ID的列表。意思是说如果这个http调用花了一些时间,那么我想超时,因为这是为什么我以前在客户端执行bcoz的原因,这就是原因。下面是我在代码中所做的更改:
public class DataFetcherTask implements Callable<List<DataResponse>> {
private DataKey key;
private RestTemplate restTemplate;
private ExecutorService executorService = Executors.newFixedThreadPool(10);
public DataFetcherTask(DataKey key, RestTemplate restTemplate) {
this.key = key;
this.restTemplate = restTemplate;
}
@Override
public List<DataResponse> call() throws Exception {
List<DataKey> keys = performKeyRequest();
List<Future<DataResponse>> responseFutureList = new ArrayList<Future<DataResponse>>();
for (final DataKey key : keys) {
responseFutureList.add(executorService.submit(new Callable<DataResponse>() {
@Override
public DataResponse call() throws Exception {
return performDataRequest(key);
}
}));
}
List<DataResponse> responseList = new ArrayList<DataResponse>();
for (Future<DataResponse> future : responseFutureList) {
responseList.add(future.get());
}
return responseList;
}
// In this method I am making an HTTP call to another service
// and then I will make List<DataKey> accordingly.
private List<DataKey> performKeyRequest() {
List<DataKey> keys = new ArrayList<>();
// use key object which is passed in contructor to make HTTP call to another service
// and then make List of DataKey object and return keys.
return keys;
}
private DataResponse performDataRequest(DataKey key) {
Mappings mappings = ShardMapping.getMappings(key.getFlowType());
List<String> hostnames = mappings.getAllHostnames(key);
for (String hostname : hostnames) {
if (DataUtils.isEmpty(hostname) || ShardMapping.isBlockHost(hostname)) {
continue;
}
try {
String url = generateUrl(hostname);
URI uri = URI.create(url);
ResponseEntity<String> response = restTemplate.exchange(uri, HttpMethod.GET, key.getEntity(), String.class);
ShardMapping.unblockHost(hostname);
if (response.getStatusCode() == HttpStatus.NO_CONTENT) {
return new DataResponse(response.getBody(), DataErrorEnum.NO_CONTENT,
DataStatusEnum.SUCCESS);
} else {
return new DataResponse(response.getBody(), DataErrorEnum.OK, DataStatusEnum.SUCCESS);
}
} catch (HttpClientErrorException ex) {
HttpStatusCodeException httpException = ex;
DataErrorEnum error = DataErrorEnum.getErrorEnumByException(httpException);
String errorMessage = httpException.getResponseBodyAsString();
return new DataResponse(errorMessage, error, DataStatusEnum.ERROR);
// logging exception here
} catch (HttpServerErrorException ex) {
HttpStatusCodeException httpException = ex;
DataErrorEnum error = DataErrorEnum.getErrorEnumByException(httpException);
String errorMessage = httpException.getResponseBodyAsString();
return new DataResponse(errorMessage, error, DataStatusEnum.ERROR);
// logging exception here
} catch (RestClientException ex) {
ShardMapping.blockHost(hostname);
// logging exception here
}
}
return new DataResponse(DataErrorEnum.SERVICE_UNAVAILABLE, DataStatusEnum.ERROR);
}
}
所以现在您可以看到我有两个执行器,一个在
getSyncData
类之外,另一个在DataFetcherTask
类中。外面的一项是控制可以完成该任务并具有全局级别超时的线程数量。内部是将对DataFetcherTask
方法的调用并行化,以便在相同的全局超时值中,我可以并行查询每个performDataRequest
对象并获取响应。我们是否需要像我这样的两个执行程序?在我的代码中?是否有解决此问题的更好方法,或者我们可以做的任何简化/设计更改?我不确定在另一个调用方法中包含调用方法,这很奇怪。不确定是否是一个好的设计。
请查看我的代码。如果您有任何问题,请询问。如果您有任何意见,请给他们。我仍在使用Java7。
#1 楼
在call()
中,您有重复的代码: } catch (HttpClientErrorException ex) {
HttpStatusCodeException httpException = ex;
DataErrorEnum error = DataErrorEnum.getErrorEnumByException(httpException);
String errorMessage = httpException.getResponseBodyAsString();
return new DataResponse(errorMessage, error, DataStatusEnum.ERROR);
// logging exception here
} catch (HttpServerErrorException ex) {
HttpStatusCodeException httpException = ex;
DataErrorEnum error = DataErrorEnum.getErrorEnumByException(httpException);
String errorMessage = httpException.getResponseBodyAsString();
return new DataResponse(errorMessage, error, DataStatusEnum.ERROR);
// logging exception here
}
由于Java 7,您可以使用多次捕获来删除重复的代码:
} catch (HttpClientErrorException | HttpServerErrorException ex) {
HttpStatusCodeException httpException = ex;
DataErrorEnum error = DataErrorEnum.getErrorEnumByException(httpException);
String errorMessage = httpException.getResponseBodyAsString();
return new DataResponse(errorMessage, error, DataStatusEnum.ERROR);
// logging exception here
}
在设计更改中,它位于
performDataRequest()
中。在设计更改中的
call()
方法中,您具有以下内容: List<Future<DataResponse>> responseFutureList = new ArrayList<Future<DataResponse>>();
如果调用该方法的代码对列表执行的
get
最少,则最好改用LinkedList
,因为LinkedList
的添加速度较快,但获取速度较慢。这里有同样的事情:
List<DataKey> keys = new ArrayList<>();
我对这个话题了解不多;我会把它留给更有经验的程序员。
#2 楼
在每个getAsyncData调用上,使用private ExecutorService executorService = Executors.newFixedThreadPool(10);
(为什么10?)创建新的DataFetcherTask,然后创建3个线程,然后全部放弃。 (请注意,手动关闭线程池比依赖
ThreadPoolExecutor.finalize()
方法要好。)在每个getAsyncData调用上创建线程都非常昂贵。使此线程可重用,例如,通过在DataClient中创建另一个线程池,并将其传递到DataFetcherTask构造函数中。我的阻止列表不断从另一个后台线程更新。 >每1分钟。如果有任何服务器宕机并且没有响应,那么我需要
通过使用此服务器来阻止该服务器...
但是您没有提供有关后台线程的更多详细信息,因此我不知道它是如何工作的。在performDataRequest函数中,您调用
ShardMapping.unblockHost(hostname);
,而是更安全地依赖于后台线程中的运行状况检查。我将简要说明可能发生的情况:在第一个线程中收到主机的响应后,该主机关闭,第二个线程检测到它并将主机添加到blockList中,只有在此之后(线程调度非常复杂),第一个线程调用ShardMapping.unblockHost(hostname);
并从blockList中删除主机(当前处于关闭状态)。因此,再有一个请求将无法到达该主机。在其中包含service .shutdown()或service.shutdownNow()的DataClient中创建关闭函数将很方便。
希望您在最终版本中不要使用硬编码常量
Executors.newFixedThreadPool(10);
,并让用户能够调整您的库。评论
\ $ \ begingroup \ $
欢迎使用代码审查!不错的第一答案!
\ $ \ endgroup \ $
– Heslacher
2015年12月17日下午13:46
#3 楼
CompletionService可以使用
call()
简化您的CompletionService
方法。如果需要的话,这可以使代码在第一人称视角下完成。@Override
public List<DataResponse> call() throws Exception {
List<DataKey> keys = performKeyRequest();
CompletionService<DataResponse> comp = new ExecutorCompletionService<>(executorService);
int count = 0;
for (final DataKey key : keys) {
comp.submit(new Callable<DataResponse>() {
@Override
public DataResponse call() throws Exception {
return performDataRequest(key);
}
});
}
List<DataResponse> responseList = new ArrayList<DataResponse>();
while (count-- > 0) {
Future<DataResponse> future = comp.take();
responseList.add(future.get());
}
return responseList;
}
请注意,
comp.take()
将如何在完成时返回(已完成的)期货order。捕获异常
此“正当”使
performDataRequest
方法仍然是一个问题。它具有太多的错误处理;-)这种处理方式很难阅读。您已经指出了多重捕获系统,因此我不再重复。但是,人们错过了什么,您是自动将Client或Server异常分配为HttpStatusCodeException
....因此,我看了一下,这是文档:Direct已知子类:
HttpClientErrorException
,HttpServerErrorException
所以,由于您的两个“捕获”异常都是基类的子代,因此您最好还是捕获基本异常:
} catch (HttpStatusCodeException ex) {
现在您也不需要分配...摆脱:
HttpStatusCodeException httpException = ex;
无用的变量
您不必将所有内容都设置为变量-一次性使用变量有时会使内容难以阅读-并暗示数据“粘滞”周围”。这些行在这里:
Mappings mappings = ShardMapping.getMappings(key.getFlowType());
List<String> hostnames = mappings.getAllHostnames(key);
for (String hostname : hostnames) {
我将重组为:
Mappings mappings = ShardMapping.getMappings(key.getFlowType());
for (String hostname : mappings.getAllHostnames(key)) {
之所以“排除”主机名变量,是因为
List<String>
通用值使其相对复杂。现在只有mappings
和String
循环。没有明显的泛型。同样,我将切除
uri
变量。它不能用作独立值。if-return
最里面的if / else块应还原为if块。之前:
if (response.getStatusCode() == HttpStatus.NO_CONTENT) {
return new DataResponse(response.getBody(), DataErrorEnum.NO_CONTENT,
DataStatusEnum.SUCCESS);
} else {
return new DataResponse(response.getBody(), DataErrorEnum.OK, DataStatusEnum.SUCCESS);
}
之后:
if (response.getStatusCode() == HttpStatus.NO_CONTENT) {
return new DataResponse(response.getBody(), DataErrorEnum.NO_CONTENT,
DataStatusEnum.SUCCESS);
}
return new DataResponse(response.getBody(), DataErrorEnum.OK, DataStatusEnum.SUCCESS);
畅通无阻
无需取消设置“阻止”状态。如果主机被阻止,您将永远不会运行内部循环,因此您将永远无法解开它。该代码是永远无法运行的代码:
ShardMapping.unblockHost(hostname);
将其删除。
摘要
这将方法简化为:
private DataResponse performDataRequest(DataKey key) {
Mappings mappings = ShardMapping.getMappings(key.getFlowType());
for (String hostname : mappings.getAllHostnames(key)) {
if (DataUtils.isEmpty(hostname) || ShardMapping.isBlockHost(hostname)) {
continue;
}
try {
String url = generateUrl(hostname);
ResponseEntity<String> response = restTemplate.exchange(
URI.create(url), HttpMethod.GET, key.getEntity(), String.class);
if (response.getStatusCode() == HttpStatus.NO_CONTENT) {
return new DataResponse(response.getBody(), DataErrorEnum.NO_CONTENT, DataStatusEnum.SUCCESS);
}
return new DataResponse(response.getBody(), DataErrorEnum.OK, DataStatusEnum.SUCCESS);
} catch (HttpStatusCodeException httpException) {
DataErrorEnum error = DataErrorEnum.getErrorEnumByException(httpException);
String errorMessage = httpException.getResponseBodyAsString();
return new DataResponse(errorMessage, error, DataStatusEnum.ERROR);
// logging exception here
} catch (RestClientException ex) {
ShardMapping.blockHost(hostname);
// logging exception here
}
}
return new DataResponse(DataErrorEnum.SERVICE_UNAVAILABLE, DataStatusEnum.ERROR);
}
评论
\ $ \ begingroup \ $
感谢您建议CompletionService。我将对此进行调查。因此,正如dezhik在下面指出的那样,我是否应该在DataFetcherTask类中定义executorservice,因为他说在每个getAsyncData调用上创建线程都很昂贵。还是应该在DataClient中创建一个新的并将其传递给DataFetcherTask类?
\ $ \ endgroup \ $
–大卫
15年12月18日在2:50
\ $ \ begingroup \ $
如果您有一个通用的执行程序服务就可以了,但是考虑到您正在对某个远程服务器进行REST API调用,则在某个地方,线程创建是您性能问题中最少的。
\ $ \ endgroup \ $
–rolfl
2015年12月18日,下午3:10
\ $ \ begingroup \ $
无需取消设置“阻止”状态。如果主机实际上被阻止(并且在某个时间点上),线程1和线程2将获得同一主机,然后线程1发现该主机已关闭并对其进行阻止,则您将永远不会运行内部循环。紧随该主机可以启动之后,线程2将收到它的响应。但是正如我在回答中提到的那样,还有另一种可能的情况,其中此代码将导致另一个请求处理的延迟。是的,最好将其删除
\ $ \ endgroup \ $
– dezhik
15年12月18日在12:05
评论
\ $ \ begingroup \ $
是的,关于catch块,我已经意识到了这一小小的更改。我尚未在master分支中进行过更改,但是我已经在我的开发人员中进行了计划,我的计划是将其与此代码审查合并,但感谢您让我知道..赞赏
\ $ \ endgroup \ $
–大卫
2015年12月14日23:00