前段时间在群里面发现有个群友抛出一个实际需求:需要通过一个接口拉取数据,这个接口有每秒10QPS限制,请问如何实现数据拉去效率最大化且限制调用拉取接口每秒10PQPS?我觉得这个需求挺有意思的,跟某群友讨论,发现可以利用JUC包下的Semaphore实现,几行代码就能搞定。这里将实现方案做下整理,算是抛砖引玉吧~
利用Semaphore实现多线程调用接口
- 一、代码实现
- 1.自定义线程池ExecutorConfig
- 2.获取数据接口DataFetchService
- 3.拉取数据接口核心实现RateLimitedDataFetcher
- 4.接口实现类DataFetchServiceImpl
- 5.UserController控制层
- 二、项目结构及源码下载地址
一、代码实现
1.自定义线程池ExecutorConfig
/*** 线程池配置*/
@Component
public class ExecutorConfig {private static int maxPoolSize = Runtime.getRuntime().availableProcessors();private volatile static ExecutorService executorService;public static ExecutorService getThreadPool() {if (executorService == null){synchronized (ExecutorConfig.class){if (executorService == null){executorService = newThreadPool();}}}return executorService;}private static ExecutorService newThreadPool(){int queueSize = 1000;int corePool = Math.min(10, maxPoolSize);return new ThreadPoolExecutor(corePool, maxPoolSize, 10000L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<>(queueSize),new ThreadPoolExecutor.AbortPolicy());}private ExecutorConfig(){}
}
2.获取数据接口DataFetchService
/*** 获取数据*/
public interface DataFetchService {/*** 获取数据* @return List<User>*/List<User> dataFetchTask();
}
3.拉取数据接口核心实现RateLimitedDataFetcher
@Service
@Slf4j
public class RateLimitedDataFetcher {@Autowiredprivate UserMapper userMapper;private static final int MAX_REQUESTS_PER_SECOND = 10;private Semaphore semaphore = new Semaphore(MAX_REQUESTS_PER_SECOND);private ExecutorService executorService = ExecutorConfig.getThreadPool();public Future<List<User>> fetchData(Integer id) {return executorService.submit((Callable<List<User>>) () -> {try {// 获取许可semaphore.acquire();// 执行网络请求,这里简化为一个延迟操作QueryWrapper qw = new QueryWrapper();//lt是小于,id小于5qw.lt("id", id);return userMapper.selectList(qw);} catch (InterruptedException e) {e.printStackTrace();return null;} finally {// 释放许可semaphore.release();}});}
}
4.接口实现类DataFetchServiceImpl
@Service
@Slf4j
public class DataFetchServiceImpl implements DataFetchService {@Autowiredprivate UserMapper userMapper;@Autowiredprivate RateLimitedDataFetcher rateLimitedDataFetcher;@Overridepublic List<User> dataFetchTask() {List<User> userList = null;try {userList = rateLimitedDataFetcher.fetchData(2).get();} catch (InterruptedException | ExecutionException e) {log.error("DataFetchServiceImpl dataFetchTask error:{}",e.getMessage());}return userList;}
}
5.UserController控制层
/*** 用户控制层** @author hua*/
@RestController
@RequestMapping(value = "/user")
public class UserController {@Autowiredprivate DataFetchService dataFetchService;@GetMapping(value = "/getBatchUser")public ResponseEntity<List<User>> getBatchUser() {List<User> users = dataFetchService.dataFetchTask();HttpStatus status = users == null ? HttpStatus.NOT_FOUND: HttpStatus.OK;return new ResponseEntity<>(users, status);}
}
二、项目结构及源码下载地址
下载地址 springboot-cacheable 欢迎star哦~