From 5b546061c0a9c608335eff39a85a85fb87bc3a4e Mon Sep 17 00:00:00 2001 From: OpenClaw Agent Date: Tue, 24 Mar 2026 10:17:53 +0000 Subject: [PATCH] =?UTF-8?q?fix:=20=E4=BD=BF=E7=94=A8=20Lazy=20=E5=AE=8C?= =?UTF-8?q?=E5=85=A8=E8=A7=A3=E5=86=B3=E5=B9=B6=E5=8F=91=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 问题分析: 1. GetOrAdd 在并发时可能创建多个 Value 2. 两个请求同时进入 GetPriceAsync,都检查内存缓存未命中 3. 然后同时查数据库,导致连接冲突 解决方案: 1. 使用 Lazy> 确保 Value Factory 只执行一次 2. 把整个流程(查数据库缓存 → 获取价格)放在 Lazy 里 3. 内存缓存命中时快速返回,不进入 Lazy 流程: - 内存缓存命中 → 直接返回 - 内存缓存未命中 → 进入 Lazy(只执行一次) - 查数据库缓存 → 命中则返回 - 数据库缓存未命中 → 从数据源获取 --- .../Services/MarketDataService.cs | 86 +++++++++++-------- 1 file changed, 49 insertions(+), 37 deletions(-) diff --git a/AssetManager.Infrastructure/Services/MarketDataService.cs b/AssetManager.Infrastructure/Services/MarketDataService.cs index cbc214f..41e02fe 100755 --- a/AssetManager.Infrastructure/Services/MarketDataService.cs +++ b/AssetManager.Infrastructure/Services/MarketDataService.cs @@ -25,8 +25,8 @@ public class MarketDataService : IMarketDataService // 内存缓存层(避免并发数据库查询导致连接池冲突) private readonly ConcurrentDictionary _memoryCache = new(); - // 防止并发请求同一股票 - private readonly ConcurrentDictionary> _pendingPriceRequests = new(); + // 防止并发请求同一股票(使用 Lazy 确保只创建一次) + private readonly ConcurrentDictionary>> _pendingPriceRequests = new(); public MarketDataService( ILogger logger, @@ -46,7 +46,7 @@ public class MarketDataService : IMarketDataService /// /// 获取实时价格(自动根据资产类型路由到对应数据源) - /// 使用内存缓存 + 并发控制防止数据库连接池冲突 + /// 使用 Lazy 确保同一时间只有一个请求在处理 /// public async Task GetPriceAsync(string symbol, string assetType) { @@ -55,7 +55,7 @@ public class MarketDataService : IMarketDataService try { - // 第一步:查内存缓存(避免数据库连接池冲突) + // 第一步:查内存缓存(快速返回) if (_memoryCache.TryGetValue(cacheKey, out var memoryCached) && memoryCached.expireAt > DateTime.Now) { _logger.LogInformation("[内存缓存命中] Symbol={Symbol}, Price={Price}", symbol, memoryCached.cache.Price); @@ -69,47 +69,19 @@ public class MarketDataService : IMarketDataService }; } - // 第二步:查数据库缓存(串行化,避免并发冲突) - var cached = await _marketDataRepo.GetPriceCacheAsync(symbol, assetType); - - if (cached != null && cached.Price > 0) - { - _logger.LogInformation("[数据库缓存命中] Symbol={Symbol}, Price={Price}, ExpiredAt={ExpiredAt}", symbol, cached.Price, cached.ExpiredAt); - - // 写入内存缓存 - _memoryCache[cacheKey] = (cached, cached.ExpiredAt); - - return new MarketPriceResponse - { - Symbol = cached.Symbol, - Price = cached.Price, - PreviousClose = cached.PreviousClose ?? 0, - Timestamp = cached.FetchedAt, - AssetType = cached.AssetType - }; - } - - if (cached != null && cached.Price <= 0) - { - _logger.LogWarning("[缓存命中但价格无效] Symbol={Symbol}, Price={Price},忽略缓存重新获取", symbol, cached.Price); - } - else - { - _logger.LogWarning("[缓存未命中] Symbol={Symbol}, AssetType={AssetType},需要从数据源获取", symbol, assetType); - } - - // 第三步:使用 GetOrAdd 模式防止并发重复请求 - var priceTask = _pendingPriceRequests.GetOrAdd(cacheKey, _ => FetchPriceFromSourceAsync(symbol, assetType)); + // 第二步:使用 Lazy 确保同一时间只有一个请求在处理(包括查数据库和获取价格) + var lazyTask = _pendingPriceRequests.GetOrAdd(cacheKey, + _ => new Lazy>(() => GetPriceInternalAsync(symbol, assetType, cacheKey))); try { - var result = await priceTask; + var result = await lazyTask.Value; _logger.LogInformation("[价格获取成功] Symbol={Symbol}, Price={Price}", symbol, result.Price); return result; } finally { - // 请求完成后移除(无论成功失败) + // 请求完成后移除 _pendingPriceRequests.TryRemove(cacheKey, out _); } } @@ -120,6 +92,46 @@ public class MarketDataService : IMarketDataService } } + /// + /// 内部获取价格方法(查数据库缓存 → 获取价格) + /// + private async Task GetPriceInternalAsync(string symbol, string assetType, string cacheKey) + { + _logger.LogInformation("[内部查询] Symbol={Symbol}, 查数据库缓存", symbol); + + // 查数据库缓存 + var cached = await _marketDataRepo.GetPriceCacheAsync(symbol, assetType); + + if (cached != null && cached.Price > 0) + { + _logger.LogInformation("[数据库缓存命中] Symbol={Symbol}, Price={Price}, ExpiredAt={ExpiredAt}", symbol, cached.Price, cached.ExpiredAt); + + // 写入内存缓存 + _memoryCache[cacheKey] = (cached, cached.ExpiredAt); + + return new MarketPriceResponse + { + Symbol = cached.Symbol, + Price = cached.Price, + PreviousClose = cached.PreviousClose ?? 0, + Timestamp = cached.FetchedAt, + AssetType = cached.AssetType + }; + } + + if (cached != null && cached.Price <= 0) + { + _logger.LogWarning("[数据库缓存价格无效] Symbol={Symbol}, Price={Price}", symbol, cached.Price); + } + else + { + _logger.LogWarning("[数据库缓存未命中] Symbol={Symbol}", symbol); + } + + // 从数据源获取 + return await FetchPriceFromSourceAsync(symbol, assetType); + } + /// /// 从数据源获取价格(内部方法) ///