diff --git a/AssetManager.Infrastructure/Services/MarketDataService.cs b/AssetManager.Infrastructure/Services/MarketDataService.cs index cbc214f..41e02fe 100755 --- a/AssetManager.Infrastructure/Services/MarketDataService.cs +++ b/AssetManager.Infrastructure/Services/MarketDataService.cs @@ -25,8 +25,8 @@ public class MarketDataService : IMarketDataService // 内存缓存层(避免并发数据库查询导致连接池冲突) private readonly ConcurrentDictionary _memoryCache = new(); - // 防止并发请求同一股票 - private readonly ConcurrentDictionary> _pendingPriceRequests = new(); + // 防止并发请求同一股票(使用 Lazy 确保只创建一次) + private readonly ConcurrentDictionary>> _pendingPriceRequests = new(); public MarketDataService( ILogger logger, @@ -46,7 +46,7 @@ public class MarketDataService : IMarketDataService /// /// 获取实时价格(自动根据资产类型路由到对应数据源) - /// 使用内存缓存 + 并发控制防止数据库连接池冲突 + /// 使用 Lazy 确保同一时间只有一个请求在处理 /// public async Task GetPriceAsync(string symbol, string assetType) { @@ -55,7 +55,7 @@ public class MarketDataService : IMarketDataService try { - // 第一步:查内存缓存(避免数据库连接池冲突) + // 第一步:查内存缓存(快速返回) if (_memoryCache.TryGetValue(cacheKey, out var memoryCached) && memoryCached.expireAt > DateTime.Now) { _logger.LogInformation("[内存缓存命中] Symbol={Symbol}, Price={Price}", symbol, memoryCached.cache.Price); @@ -69,47 +69,19 @@ public class MarketDataService : IMarketDataService }; } - // 第二步:查数据库缓存(串行化,避免并发冲突) - var cached = await _marketDataRepo.GetPriceCacheAsync(symbol, assetType); - - if (cached != null && cached.Price > 0) - { - _logger.LogInformation("[数据库缓存命中] Symbol={Symbol}, Price={Price}, ExpiredAt={ExpiredAt}", symbol, cached.Price, cached.ExpiredAt); - - // 写入内存缓存 - _memoryCache[cacheKey] = (cached, cached.ExpiredAt); - - return new MarketPriceResponse - { - Symbol = cached.Symbol, - Price = cached.Price, - PreviousClose = cached.PreviousClose ?? 0, - Timestamp = cached.FetchedAt, - AssetType = cached.AssetType - }; - } - - if (cached != null && cached.Price <= 0) - { - _logger.LogWarning("[缓存命中但价格无效] Symbol={Symbol}, Price={Price},忽略缓存重新获取", symbol, cached.Price); - } - else - { - _logger.LogWarning("[缓存未命中] Symbol={Symbol}, AssetType={AssetType},需要从数据源获取", symbol, assetType); - } - - // 第三步:使用 GetOrAdd 模式防止并发重复请求 - var priceTask = _pendingPriceRequests.GetOrAdd(cacheKey, _ => FetchPriceFromSourceAsync(symbol, assetType)); + // 第二步:使用 Lazy 确保同一时间只有一个请求在处理(包括查数据库和获取价格) + var lazyTask = _pendingPriceRequests.GetOrAdd(cacheKey, + _ => new Lazy>(() => GetPriceInternalAsync(symbol, assetType, cacheKey))); try { - var result = await priceTask; + var result = await lazyTask.Value; _logger.LogInformation("[价格获取成功] Symbol={Symbol}, Price={Price}", symbol, result.Price); return result; } finally { - // 请求完成后移除(无论成功失败) + // 请求完成后移除 _pendingPriceRequests.TryRemove(cacheKey, out _); } } @@ -120,6 +92,46 @@ public class MarketDataService : IMarketDataService } } + /// + /// 内部获取价格方法(查数据库缓存 → 获取价格) + /// + private async Task GetPriceInternalAsync(string symbol, string assetType, string cacheKey) + { + _logger.LogInformation("[内部查询] Symbol={Symbol}, 查数据库缓存", symbol); + + // 查数据库缓存 + var cached = await _marketDataRepo.GetPriceCacheAsync(symbol, assetType); + + if (cached != null && cached.Price > 0) + { + _logger.LogInformation("[数据库缓存命中] Symbol={Symbol}, Price={Price}, ExpiredAt={ExpiredAt}", symbol, cached.Price, cached.ExpiredAt); + + // 写入内存缓存 + _memoryCache[cacheKey] = (cached, cached.ExpiredAt); + + return new MarketPriceResponse + { + Symbol = cached.Symbol, + Price = cached.Price, + PreviousClose = cached.PreviousClose ?? 0, + Timestamp = cached.FetchedAt, + AssetType = cached.AssetType + }; + } + + if (cached != null && cached.Price <= 0) + { + _logger.LogWarning("[数据库缓存价格无效] Symbol={Symbol}, Price={Price}", symbol, cached.Price); + } + else + { + _logger.LogWarning("[数据库缓存未命中] Symbol={Symbol}", symbol); + } + + // 从数据源获取 + return await FetchPriceFromSourceAsync(symbol, assetType); + } + /// /// 从数据源获取价格(内部方法) ///