异步并发利器:实际项目中使用CompletionService提升系统性能

场景

随着互联网应用的深入,很多传统行业也都需要接入到互联网。我们公司也是这样,保险核心需要和很多保险中介对接,比如阿里、京东等等。这些公司对于接口服务的性能有些比较高的要求,传统的核心无法满足要求,所以信息技术部领导高瞻远瞩,决定开发互联网接入服务,满足来自性能的需求。

概念

CompletionServiceExecutorBlockingQueue的功能融合在一起,将Callable任务提交给CompletionService来执行,然后使用类似于队列操作的take和poll等方法来获得已完成的结果,而这些结果会在完成时被封装为Future。对于更多的概念,请参阅其他网络文档。

线程池的设计,阿里开发手册说过不要使用Java Executors 提供的默认线程池,因此需要更接近实际的情况来自定义一个线程池,根据多次压测,采用的线程池如下:

<code>publicExecutorServicegetThreadPool(){
returnnewThreadPoolExecutor(75,
125,
180000,
TimeUnit.MILLISECONDS,
newLinkedBlockingDeque<>(450),
newThreadPoolExecutor.CallerRunsPolicy());
}/<code>

说明:公司的业务为低频交易,对于单次调用性能要求高,但是并发压力根本不大,所以 阻塞队列已满且线程数达到最大值时所采取的饱和策略为调用者执行。

实现

业务

投保业务主要涉及这几个大的方面:投保校验、核保校验、保费试算

  • 投保校验:最主要的是要查询客户黑名单和风险等级,都是千万级的表。而且投保人和被保人都需要校验
  • 核保校验:除了常规的核保规则校验,查询千万级的大表,还需要调用外部智能核保接口获得用户的风险等级,投保人和被保人都需要校验
  • 保费试算:需要计算每个险种的保费

设计

根据上面的业务,如果串行执行的话,单次性能肯定不高,所以考虑多线程异步执行获得校验结果,再对结果综合判断

  • 投保校验:采用一个线程(也可以根据投保人和被保人数量来采用几个线程)
  • 核保校验:
    • 常规校验:采用一个线程
    • 外部调用:有几个用户(指投保人和被保人)就采用几个线程
  • 保费计算:有几个险种就采用几个线程,最后合并得到整个的保费

代码

以下代码是样例,实际逻辑已经去掉

先创建投保、核保(常规、外部调用)、保费计算4个业务服务类:

投保服务类:InsuranceVerificationServiceImpl,假设耗时50ms

<code>@Service
publicclassInsuranceVerificationServiceImplimplementsInsuranceVerificationService{
privatestaticfinalLoggerlogger=LoggerFactory.getLogger(InsuranceVerificationServiceImpl.class);
@Override

publicTaskResponseModel<object>insuranceCheck(Stringkey,PolicyModelpolicyModel){
try{
//假设耗时50ms
Thread.sleep(50);
returnTaskResponseModel.success().setKey(key).setData(policyModel);
}catch(InterruptedExceptione){
logger.warn(e.getMessage());
returnTaskResponseModel.failure().setKey(key).setResultMessage(e.getMessage());
}
}
}/<object>/<code>

核保常规校验服务类:UnderwritingCheckServiceImpl,假设耗时50ms

<code>@Service
publicclassUnderwritingCheckServiceImplimplementsUnderwritingCheckService{
privatestaticfinalLoggerlogger=LoggerFactory.getLogger(UnderwritingCheckServiceImpl.class);
@Override
publicTaskResponseModel<object>underwritingCheck(Stringkey,PolicyModelpolicyModel){
try{
//假设耗时50ms
Thread.sleep(50);
returnTaskResponseModel.success().setKey(key).setData(policyModel);
}catch(InterruptedExceptione){
logger.warn(e.getMessage());
returnTaskResponseModel.failure().setKey(key).setResultMessage(e.getMessage());
}
}
}/<object>/<code>

核保外部调用服务类:ExternalCallServiceImpl,假设耗时200ms

<code>@Service
publicclassExternalCallServiceImplimplementsExternalCallService{
privatestaticfinalLoggerlogger=LoggerFactory.getLogger(ExternalCallServiceImpl.class);
@Override
publicTaskResponseModel<object>externalCall(Stringkey,Insuredinsured){
try{
//假设耗时200ms
Thread.sleep(200);
ExternalCallResultModelexternalCallResultModel=newExternalCallResultModel();
externalCallResultModel.setIdcard(insured.getIdcard());

externalCallResultModel.setScore(200);
returnTaskResponseModel.success().setKey(key).setData(externalCallResultModel);
}catch(InterruptedExceptione){
logger.warn(e.getMessage());
returnTaskResponseModel.failure().setKey(key).setResultMessage(e.getMessage());
}
}
}/<object>/<code>

试算服务类:TrialCalculationServiceImpl,假设耗时50ms

<code>@Service
publicclassTrialCalculationServiceImplimplementsTrialCalculationService{
privatestaticfinalLoggerlogger=LoggerFactory.getLogger(TrialCalculationServiceImpl.class);
@Override
publicTaskResponseModel<object>trialCalc(Stringkey,Riskrisk){
try{
//假设耗时50ms
Thread.sleep(50);
returnTaskResponseModel.success().setKey(key).setData(risk);
}catch(InterruptedExceptione){
logger.warn(e.getMessage());
returnTaskResponseModel.failure().setKey(key).setResultMessage(e.getMessage());
}
}
}/<object>/<code>

统一返回接口类:TaskResponseModel, 上面4个服务的方法统一返回TaskResponseModel

<code>@Data
@ToString
@NoArgsConstructor
@AllArgsConstructor
@EqualsAndHashCode
@Accessors(chain=true)
publicclassTaskResponseModel<textendsobject>implementsSerializable{
privateStringkey;//唯一调用标志
privateStringresultCode;//结果码
privateStringresultMessage;//结果信息
privateTdata;//业务处理结果


publicstaticTaskResponseModel<object>success(){
TaskResponseModel<object>taskResponseModel=newTaskResponseModel<>();
taskResponseModel.setResultCode("200");
returntaskResponseModel;
}
publicstaticTaskResponseModel<object>failure(){
TaskResponseModel<object>taskResponseModel=newTaskResponseModel<>();
taskResponseModel.setResultCode("400");
returntaskResponseModel;
}
}/<object>/<object>/<object>/<object>/<textendsobject>/<code>

注:

  1. key为这次调用的唯一标识,由调用者传进来
  2. resultCode结果码,200为成功,400表示有异常
  3. resultMessage信息,表示不成功或者异常信息
  4. data业务处理结果,如果成功的话
  5. 这些服务类都是单例模式

要使用用CompletionService的话,需要创建实现了Callable接口的线程

投保Callable:

<code>@Data
@AllArgsConstructor
publicclassInsuranceVerificationCommandimplementsCallable<taskresponsemodel>>{
privateStringkey;
privatePolicyModelpolicyModel;
privatefinalInsuranceVerificationServiceinsuranceVerificationService;
@Override
publicTaskResponseModel<object>call()throwsException{
returninsuranceVerificationService.insuranceCheck(key,policyModel);
}
}/<object>/<taskresponsemodel>/<code>

核保常规校验Callable:

<code>@Data
@AllArgsConstructor
publicclassUnderwritingCheckCommandimplementsCallable<taskresponsemodel>>{
privateStringkey;
privatePolicyModelpolicyModel;
privatefinalUnderwritingCheckServiceunderwritingCheckService;
@Override
publicTaskResponseModel<object>call()throwsException{
returnunderwritingCheckService.underwritingCheck(key,policyModel);
}
}/<object>/<taskresponsemodel>/<code>

核保外部调用Callable:

<code>@Data
@AllArgsConstructor
publicclassExternalCallCommandimplementsCallable<taskresponsemodel>>{
privateStringkey;
privateInsuredinsured;
privatefinalExternalCallServiceexternalCallService;
@Override
publicTaskResponseModel<object>call()throwsException{
returnexternalCallService.externalCall(key,insured);
}
}/<object>/<taskresponsemodel>/<code>

试算调用Callable:

<code>@Data
@AllArgsConstructor
publicclassTrialCalculationCommandimplementsCallable<taskresponsemodel>>{
privateStringkey;
privateRiskrisk;
privatefinalTrialCalculationServicetrialCalculationService;
@Override
publicTaskResponseModel<object>call()throwsException{
returntrialCalculationService.trialCalc(key,risk);
}
}/<object>/<taskresponsemodel>/<code>

  1. 每一次调用,需要创建这4种Callable
  2. 返回统一接口TaskResopnseModel

异步执行的类:TaskExecutor

<code>@Component
publicclassTaskExecutor{
privatestaticfinalLoggerlogger=LoggerFactory.getLogger(TaskExecutor.class);
//线程池
privatefinalExecutorServiceexecutorService;

publicTaskExecutor(ExecutorServiceexecutorService){
this.executorService=executorService;
}

//异步执行,获取所有结果后返回
publicList<taskresponsemodel>>execute(List<callable>>>commands){

//创建异步执行对象
CompletionService<taskresponsemodel>>completionService=newExecutorCompletionService<>(executorService);
for(Callable<taskresponsemodel>>command:commands){
completionService.submit(command);
}
//获取所有异步执行线程的结果
inttaskCount=commands.size();
List<taskresponsemodel>>params=newArrayList<>(taskCount);
try{
for(inti=0;i<taskcount>Future<taskresponsemodel>>future=completionService.take();
params.add(future.get());
}
}catch(InterruptedException|ExecutionExceptione){
//异常处理
params.clear();
params.add(TaskResponseModel.failure().setKey("error").setResultMessage("异步执行线程错误"));
}
//返回,如果执行中发生error, 则返回相应的key值:error
returnparams;
}
}/<taskresponsemodel>/<taskcount>/<taskresponsemodel>/<taskresponsemodel>/<taskresponsemodel>/<callable>/<taskresponsemodel>/<code>

  1. 为单例模式
  2. 接收参数为List<callable>>>,也就是上面定义的4种Callable的列表/<callable>
  3. 返回List<taskresponsemodel>>,也就是上面定义4种Callable返回的结果列表/<taskresponsemodel>
  4. 我们的业务是对返回结果统一判断,业务返回结果有因果关系
  5. 如果线程执行有异常,也返回List<taskresponsemodel>,这个时候列表中只有一个
    TaskResponseModelkey为error, 后续调用者可以通过这个来判断线程是否执行成功;/<taskresponsemodel>

调用方:CompletionServiceController

<code>@RestController
publicclassCompletionServiceController{
//投保key
privatestaticfinalStringINSURANCE_KEY="insurance_";
//核保key
privatestaticfinalStringUNDERWRITING_KEY="underwriting_";
//外部调用key
privatestaticfinalStringEXTERNALCALL_KEY="externalcall_";
//试算key
privatestaticfinalStringTRIA_KEY="trial_";

privatestaticfinalLoggerlogger=LoggerFactory.getLogger(CompletionServiceController.class);

privatefinalExternalCallServiceexternalCallService;
privatefinalInsuranceVerificationServiceinsuranceVerificationService;
privatefinalTrialCalculationServicetrialCalculationService;
privatefinalUnderwritingCheckServiceunderwritingCheckService;
privatefinalTaskExecutortaskExecutor;

publicCompletionServiceController(ExternalCallServiceexternalCallService,InsuranceVerificationServiceinsuranceVerificationService,TrialCalculationServicetrialCalculationService,UnderwritingCheckServiceunderwritingCheckService,TaskExecutortaskExecutor){
this.externalCallService=externalCallService;
this.insuranceVerificationService=insuranceVerificationService;
this.trialCalculationService=trialCalculationService;
this.underwritingCheckService=underwritingCheckService;
this.taskExecutor=taskExecutor;
}

//多线程异步并发接口
@PostMapping(value="/async",headers="Content-Type=application/json;charset=UTF-8")
publicStringasyncExec(@RequestBodyPolicyModelpolicyModel){
longstart=System.currentTimeMillis();

asyncExecute(policyModel);
logger.info("异步总共耗时:"+(System.currentTimeMillis()-start));
return"ok";
}

//串行调用接口
@PostMapping(value="/sync",headers="Content-Type=application/json;charset=UTF-8")
publicStringsyncExec(@RequestBodyPolicyModelpolicyModel){
longstart=System.currentTimeMillis();
syncExecute(policyModel);
logger.info("同步总共耗时:"+(System.currentTimeMillis()-start));
return"ok";
}
privatevoidasyncExecute(PolicyModelpolicyModel){
List<callable>>>baseTaskCallbackList=newArrayList<>();
//根据被保人外部接口调用
for(Insuredinsured:policyModel.getInsuredList()){
ExternalCallCommandexternalCallCommand=newExternalCallCommand(EXTERNALCALL_KEY+insured.getIdcard(),insured,externalCallService);
baseTaskCallbackList.add(externalCallCommand);
}
//投保校验
InsuranceVerificationCommandinsuranceVerificationCommand=newInsuranceVerificationCommand(INSURANCE_KEY,policyModel,insuranceVerificationService);
baseTaskCallbackList.add(insuranceVerificationCommand);
//核保校验
UnderwritingCheckCommandunderwritingCheckCommand=newUnderwritingCheckCommand(UNDERWRITING_KEY,policyModel,underwritingCheckService);
baseTaskCallbackList.add(underwritingCheckCommand);
//根据险种进行保费试算
for(Riskrisk:policyModel.getRiskList()){
TrialCalculationCommandtrialCalculationCommand=newTrialCalculationCommand(TRIA_KEY+risk.getRiskcode(),risk,trialCalculationService);
baseTaskCallbackList.add(trialCalculationCommand);
}
List<taskresponsemodel>>results=taskExecutor.execute(baseTaskCallbackList);
for(TaskResponseModel<object>t:results){
if(t.getKey().equals("error")){
logger.warn("线程执行失败");
logger.warn(t.toString());
}
logger.info(t.toString());
}

}
privatevoidsyncExecute(PolicyModelpolicyModel){
//根据被保人外部接口调用
for(Insuredinsured:policyModel.getInsuredList()){
TaskResponseModel<object>externalCall=externalCallService.externalCall(insured.getIdcard(),insured);
logger.info(externalCall.toString());
}
//投保校验
TaskResponseModel<object>insurance=insuranceVerificationService.insuranceCheck(INSURANCE_KEY,policyModel);

logger.info(insurance.toString());
//核保校验
TaskResponseModel<object>underwriting=underwritingCheckService.underwritingCheck(UNDERWRITING_KEY,policyModel);
logger.info(underwriting.toString());
//根据险种进行保费试算
for(Riskrisk:policyModel.getRiskList()){
TaskResponseModel<object>risktrial=trialCalculationService.trialCalc(risk.getRiskcode(),risk);
logger.info(risktrial.toString());
}

}
}/<object>/<object>/<object>/<object>/<object>/<taskresponsemodel>/<callable>/<code>

1.为测试方便,提供两个接口调用:一个是串行执行,一个是异步并发执行

2.在异步并发执行函数asyncExecute中:

  1. 根据有多少个被保人,创建多少个外部调用的Callable实例,key值为EXTERNALCALL_KEY + insured.getIdcard(),在一次保单投保调用中,每一个被保人Callablekey是不一样的。
  2. 根据有多少个险种,创建多少个试算的Callable实例,key
    TRIA_KEY + risk.getRiskcode(),在一次保单投保调用中,每一个险种的Callable的key是不一样的
  3. 创建投保校验的Callable实例,业务上只需要一个
  4. 创建核保校验的Callable实例,业务上只需要一个
  5. 将Callable列表传入到TaskExecutor执行异步并发调用
  6. 根据返回结果来判断,通过判断返回的TaskResponseModelkey值可以知道是哪类业务校验,分别进行判断,还可以交叉判断(公司的业务就是要交叉判断)

验证

验证数据:

<code>{
"insuredList":
[{"idcard":"laza","name":"320106"},
{"idcard":"ranran","name":"120102"}],
"policyHolder":"lazasha","policyNo":"345000987","riskList":
[{"mainFlag":1,"premium":300,"riskcode":"risk001","riskname":"险种一"},
{"mainFlag":0,"premium":400,"riskcode":"risk002","riskname":"险种二"}]

}/<code>

上面数据表明:有两个被保人,两个险种。按照我们上面的定义,会调用两次外部接口,两次试算,一次投保,一次核保。而在样例代码中,一次外部接口调用耗时为200ms, 其他都为50ms.

本地开发的配置为8C16G:

  • 同步串行接口调用计算:2 * 200 + 2 * 50 + 50 + 50 = 600ms
  • 多线程异步执行调用计算:按照多线程并发执行原理,取耗时最长的200ms

验证:同步接口

异步并发利器:实际项目中使用CompletionService提升系统性能

输出耗时:可以看到耗时601ms

异步并发利器:实际项目中使用CompletionService提升系统性能

验证:多线程异步执行接口

异步并发利器:实际项目中使用CompletionService提升系统性能

输出耗时:可以看到为204ms

异步并发利器:实际项目中使用CompletionService提升系统性能

结果:基本和我们的预期相符合。

结束

这是将实际生产中的例子简化出来,具体生产的业务比较复杂,不便于展示。

实际情况下,原来的接口需要1000ms以上才能完成单次调用,有的需要2000-3000ms。现在的接口,在生产两台8c16g的虚拟机, 经过4个小时的简单压测能够支持2000用户并发,单次返回时长为350ms左右,服务很稳定,完全能够满足公司的业务发展需求。

Java知音,专注于Java实用文章推送,不容错过!


分享到:


相關文章: