fix: 添加内存缓存层解决数据库连接池冲突
根因: - Task.WhenAll 并发获取多个股票价格 - 每个价格查询都访问数据库缓存 - SqlSugar 连接状态冲突:Cannot Open when State is Connecting 解决方案: 1. 添加 ConcurrentDictionary 内存缓存层 2. 先查内存缓存,命中则跳过数据库查询 3. 数据库缓存命中后写入内存缓存 4. API 获取成功后同时写入内存缓存 效果: - 避免并发数据库查询 - 减少数据库连接压力 - 提高响应速度
This commit is contained in:
parent
89c6ca5397
commit
ec7ed6d686
@ -22,6 +22,9 @@ public class MarketDataService : IMarketDataService
|
||||
private readonly IOkxMarketService _okxService;
|
||||
private readonly IMarketDataRepository _marketDataRepo;
|
||||
|
||||
// 内存缓存层(避免并发数据库查询导致连接池冲突)
|
||||
private readonly ConcurrentDictionary<string, (MarketPriceCache cache, DateTime expireAt)> _memoryCache = new();
|
||||
|
||||
// 防止并发请求同一股票
|
||||
private readonly ConcurrentDictionary<string, Task<MarketPriceResponse>> _pendingPriceRequests = new();
|
||||
|
||||
@ -43,7 +46,7 @@ public class MarketDataService : IMarketDataService
|
||||
|
||||
/// <summary>
|
||||
/// 获取实时价格(自动根据资产类型路由到对应数据源)
|
||||
/// 使用并发控制防止重复请求
|
||||
/// 使用内存缓存 + 并发控制防止数据库连接池冲突
|
||||
/// </summary>
|
||||
public async Task<MarketPriceResponse> GetPriceAsync(string symbol, string assetType)
|
||||
{
|
||||
@ -52,12 +55,30 @@ public class MarketDataService : IMarketDataService
|
||||
|
||||
try
|
||||
{
|
||||
// 先查缓存
|
||||
// 第一步:查内存缓存(避免数据库连接池冲突)
|
||||
if (_memoryCache.TryGetValue(cacheKey, out var memoryCached) && memoryCached.expireAt > DateTime.Now)
|
||||
{
|
||||
_logger.LogInformation("[内存缓存命中] Symbol={Symbol}, Price={Price}", symbol, memoryCached.cache.Price);
|
||||
return new MarketPriceResponse
|
||||
{
|
||||
Symbol = memoryCached.cache.Symbol,
|
||||
Price = memoryCached.cache.Price,
|
||||
PreviousClose = memoryCached.cache.PreviousClose ?? 0,
|
||||
Timestamp = memoryCached.cache.FetchedAt,
|
||||
AssetType = memoryCached.cache.AssetType
|
||||
};
|
||||
}
|
||||
|
||||
// 第二步:查数据库缓存(串行化,避免并发冲突)
|
||||
var cached = await _marketDataRepo.GetPriceCacheAsync(symbol, assetType);
|
||||
|
||||
if (cached != null && cached.Price > 0) // ← 验证价格有效
|
||||
if (cached != null && cached.Price > 0)
|
||||
{
|
||||
_logger.LogInformation("[缓存命中] Symbol={Symbol}, Price={Price}, ExpiredAt={ExpiredAt}", symbol, cached.Price, cached.ExpiredAt);
|
||||
_logger.LogInformation("[数据库缓存命中] Symbol={Symbol}, Price={Price}, ExpiredAt={ExpiredAt}", symbol, cached.Price, cached.ExpiredAt);
|
||||
|
||||
// 写入内存缓存
|
||||
_memoryCache[cacheKey] = (cached, cached.ExpiredAt);
|
||||
|
||||
return new MarketPriceResponse
|
||||
{
|
||||
Symbol = cached.Symbol,
|
||||
@ -77,7 +98,7 @@ public class MarketDataService : IMarketDataService
|
||||
_logger.LogWarning("[缓存未命中] Symbol={Symbol}, AssetType={AssetType},需要从数据源获取", symbol, assetType);
|
||||
}
|
||||
|
||||
// 使用 GetOrAdd 模式防止并发重复请求
|
||||
// 第三步:使用 GetOrAdd 模式防止并发重复请求
|
||||
var priceTask = _pendingPriceRequests.GetOrAdd(cacheKey, _ => FetchPriceFromSourceAsync(symbol, assetType));
|
||||
|
||||
try
|
||||
@ -153,9 +174,22 @@ public class MarketDataService : IMarketDataService
|
||||
throw new InvalidOperationException($"获取到的价格无效: {symbol} = {response.Price}");
|
||||
}
|
||||
|
||||
// 写入缓存
|
||||
// 写入数据库缓存
|
||||
await SavePriceCacheAsync(symbol, assetType, response, source);
|
||||
|
||||
// 同时写入内存缓存(5分钟有效)
|
||||
var cacheKey = $"{symbol.ToUpper()}_{assetType.ToUpper()}";
|
||||
var expireAt = DateTime.Now.AddMinutes(5);
|
||||
_memoryCache[cacheKey] = (new MarketPriceCache
|
||||
{
|
||||
Symbol = symbol.ToUpper(),
|
||||
AssetType = assetType.ToUpper(),
|
||||
Price = response.Price,
|
||||
PreviousClose = response.PreviousClose,
|
||||
FetchedAt = DateTime.Now,
|
||||
ExpiredAt = expireAt
|
||||
}, expireAt);
|
||||
|
||||
return response;
|
||||
}
|
||||
|
||||
|
||||
Loading…
Reference in New Issue
Block a user