diff --git a/AssetManager.Infrastructure/Services/MarketDataService.cs b/AssetManager.Infrastructure/Services/MarketDataService.cs index d5e5206..a3427f4 100755 --- a/AssetManager.Infrastructure/Services/MarketDataService.cs +++ b/AssetManager.Infrastructure/Services/MarketDataService.cs @@ -20,6 +20,9 @@ public class MarketDataService : IMarketDataService private readonly ITiingoMarketService _tiingoService; private readonly IOkxMarketService _okxService; private readonly IMarketDataRepository _marketDataRepo; + + // 限制数据库并发访问,避免 SqlSugar 连接状态冲突 + private static readonly SemaphoreSlim _dbSemaphore = new(3, 3); public MarketDataService( ILogger logger, @@ -44,20 +47,29 @@ public class MarketDataService : IMarketDataService { _logger.LogInformation("获取实时价格: {Symbol}, 资产类型: {AssetType}", symbol, assetType); - // 先查缓存 - var cached = await _marketDataRepo.GetPriceCacheAsync(symbol, assetType); - - if (cached != null) + // 限制数据库并发访问 + await _dbSemaphore.WaitAsync(); + try { - _logger.LogDebug("缓存命中: {Symbol} {AssetType}, 价格: {Price}", symbol, assetType, cached.Price); - return new MarketPriceResponse + // 先查缓存 + var cached = await _marketDataRepo.GetPriceCacheAsync(symbol, assetType); + + if (cached != null) { - Symbol = cached.Symbol, - Price = cached.Price, - PreviousClose = cached.PreviousClose ?? 0, - Timestamp = cached.FetchedAt, - AssetType = cached.AssetType - }; + _logger.LogDebug("缓存命中: {Symbol} {AssetType}, 价格: {Price}", symbol, assetType, cached.Price); + return new MarketPriceResponse + { + Symbol = cached.Symbol, + Price = cached.Price, + PreviousClose = cached.PreviousClose ?? 0, + Timestamp = cached.FetchedAt, + AssetType = cached.AssetType + }; + } + } + finally + { + _dbSemaphore.Release(); } // 缓存未命中,调用API @@ -108,8 +120,17 @@ public class MarketDataService : IMarketDataService _logger.LogInformation("获取历史数据: {Symbol}, 资产类型: {AssetType}, 周期: {Timeframe}, 数量: {Limit}", symbol, assetType, timeframe, limit); - // 先查缓存 - var cachedKlines = await _marketDataRepo.GetKlineCacheAsync(symbol, assetType, timeframe, limit); + // 限制数据库并发访问 + await _dbSemaphore.WaitAsync(); + List cachedKlines; + try + { + cachedKlines = await _marketDataRepo.GetKlineCacheAsync(symbol, assetType, timeframe, limit); + } + finally + { + _dbSemaphore.Release(); + } // 缓存足够,直接返回 if (cachedKlines.Count >= limit) @@ -214,7 +235,15 @@ public class MarketDataService : IMarketDataService ExpiredAt = GetCacheExpirationTime(assetType) }; - await _marketDataRepo.SavePriceCacheAsync(cacheEntity); + await _dbSemaphore.WaitAsync(); + try + { + await _marketDataRepo.SavePriceCacheAsync(cacheEntity); + } + finally + { + _dbSemaphore.Release(); + } _logger.LogDebug("缓存写入: {Symbol} {AssetType}, 过期时间: {ExpiredAt}", symbol, assetType, cacheEntity.ExpiredAt); } @@ -235,7 +264,15 @@ public class MarketDataService : IMarketDataService FetchedAt = DateTime.Now }).ToList(); - await _marketDataRepo.SaveKlineCacheBatchAsync(cacheEntities); + await _dbSemaphore.WaitAsync(); + try + { + await _marketDataRepo.SaveKlineCacheBatchAsync(cacheEntities); + } + finally + { + _dbSemaphore.Release(); + } _logger.LogDebug("历史K线缓存写入: {Symbol} {AssetType} {Timeframe}, 数量: {Count}", symbol, assetType, timeframe, cacheEntities.Count); }