fix: 添加数据库并发限制,解决SqlSugar连接状态冲突
This commit is contained in:
parent
7abb8796ec
commit
d30c3076bd
@ -20,6 +20,9 @@ public class MarketDataService : IMarketDataService
|
|||||||
private readonly ITiingoMarketService _tiingoService;
|
private readonly ITiingoMarketService _tiingoService;
|
||||||
private readonly IOkxMarketService _okxService;
|
private readonly IOkxMarketService _okxService;
|
||||||
private readonly IMarketDataRepository _marketDataRepo;
|
private readonly IMarketDataRepository _marketDataRepo;
|
||||||
|
|
||||||
|
// 限制数据库并发访问,避免 SqlSugar 连接状态冲突
|
||||||
|
private static readonly SemaphoreSlim _dbSemaphore = new(3, 3);
|
||||||
|
|
||||||
public MarketDataService(
|
public MarketDataService(
|
||||||
ILogger<MarketDataService> logger,
|
ILogger<MarketDataService> logger,
|
||||||
@ -44,20 +47,29 @@ public class MarketDataService : IMarketDataService
|
|||||||
{
|
{
|
||||||
_logger.LogInformation("获取实时价格: {Symbol}, 资产类型: {AssetType}", symbol, assetType);
|
_logger.LogInformation("获取实时价格: {Symbol}, 资产类型: {AssetType}", symbol, assetType);
|
||||||
|
|
||||||
// 先查缓存
|
// 限制数据库并发访问
|
||||||
var cached = await _marketDataRepo.GetPriceCacheAsync(symbol, assetType);
|
await _dbSemaphore.WaitAsync();
|
||||||
|
try
|
||||||
if (cached != null)
|
|
||||||
{
|
{
|
||||||
_logger.LogDebug("缓存命中: {Symbol} {AssetType}, 价格: {Price}", symbol, assetType, cached.Price);
|
// 先查缓存
|
||||||
return new MarketPriceResponse
|
var cached = await _marketDataRepo.GetPriceCacheAsync(symbol, assetType);
|
||||||
|
|
||||||
|
if (cached != null)
|
||||||
{
|
{
|
||||||
Symbol = cached.Symbol,
|
_logger.LogDebug("缓存命中: {Symbol} {AssetType}, 价格: {Price}", symbol, assetType, cached.Price);
|
||||||
Price = cached.Price,
|
return new MarketPriceResponse
|
||||||
PreviousClose = cached.PreviousClose ?? 0,
|
{
|
||||||
Timestamp = cached.FetchedAt,
|
Symbol = cached.Symbol,
|
||||||
AssetType = cached.AssetType
|
Price = cached.Price,
|
||||||
};
|
PreviousClose = cached.PreviousClose ?? 0,
|
||||||
|
Timestamp = cached.FetchedAt,
|
||||||
|
AssetType = cached.AssetType
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
_dbSemaphore.Release();
|
||||||
}
|
}
|
||||||
|
|
||||||
// 缓存未命中,调用API
|
// 缓存未命中,调用API
|
||||||
@ -108,8 +120,17 @@ public class MarketDataService : IMarketDataService
|
|||||||
_logger.LogInformation("获取历史数据: {Symbol}, 资产类型: {AssetType}, 周期: {Timeframe}, 数量: {Limit}",
|
_logger.LogInformation("获取历史数据: {Symbol}, 资产类型: {AssetType}, 周期: {Timeframe}, 数量: {Limit}",
|
||||||
symbol, assetType, timeframe, limit);
|
symbol, assetType, timeframe, limit);
|
||||||
|
|
||||||
// 先查缓存
|
// 限制数据库并发访问
|
||||||
var cachedKlines = await _marketDataRepo.GetKlineCacheAsync(symbol, assetType, timeframe, limit);
|
await _dbSemaphore.WaitAsync();
|
||||||
|
List<MarketKlineCache> cachedKlines;
|
||||||
|
try
|
||||||
|
{
|
||||||
|
cachedKlines = await _marketDataRepo.GetKlineCacheAsync(symbol, assetType, timeframe, limit);
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
_dbSemaphore.Release();
|
||||||
|
}
|
||||||
|
|
||||||
// 缓存足够,直接返回
|
// 缓存足够,直接返回
|
||||||
if (cachedKlines.Count >= limit)
|
if (cachedKlines.Count >= limit)
|
||||||
@ -214,7 +235,15 @@ public class MarketDataService : IMarketDataService
|
|||||||
ExpiredAt = GetCacheExpirationTime(assetType)
|
ExpiredAt = GetCacheExpirationTime(assetType)
|
||||||
};
|
};
|
||||||
|
|
||||||
await _marketDataRepo.SavePriceCacheAsync(cacheEntity);
|
await _dbSemaphore.WaitAsync();
|
||||||
|
try
|
||||||
|
{
|
||||||
|
await _marketDataRepo.SavePriceCacheAsync(cacheEntity);
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
_dbSemaphore.Release();
|
||||||
|
}
|
||||||
_logger.LogDebug("缓存写入: {Symbol} {AssetType}, 过期时间: {ExpiredAt}", symbol, assetType, cacheEntity.ExpiredAt);
|
_logger.LogDebug("缓存写入: {Symbol} {AssetType}, 过期时间: {ExpiredAt}", symbol, assetType, cacheEntity.ExpiredAt);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -235,7 +264,15 @@ public class MarketDataService : IMarketDataService
|
|||||||
FetchedAt = DateTime.Now
|
FetchedAt = DateTime.Now
|
||||||
}).ToList();
|
}).ToList();
|
||||||
|
|
||||||
await _marketDataRepo.SaveKlineCacheBatchAsync(cacheEntities);
|
await _dbSemaphore.WaitAsync();
|
||||||
|
try
|
||||||
|
{
|
||||||
|
await _marketDataRepo.SaveKlineCacheBatchAsync(cacheEntities);
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
_dbSemaphore.Release();
|
||||||
|
}
|
||||||
_logger.LogDebug("历史K线缓存写入: {Symbol} {AssetType} {Timeframe}, 数量: {Count}", symbol, assetType, timeframe, cacheEntities.Count);
|
_logger.LogDebug("历史K线缓存写入: {Symbol} {AssetType} {Timeframe}, 数量: {Count}", symbol, assetType, timeframe, cacheEntities.Count);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user