根因分析:
- MarketDataRepository 使用注入的 ISqlSugarClient(Scoped)
- 多个 Task.Run 并发调用 SavePriceCacheAsync
- Storageable 操作使用同一个 SqlSugarScope 实例
- 连接在 await 边界被复用 → 冲突

正确方案:
- SavePriceCacheAsync 每次创建新的 SqlSugarClient 实例
- MySQL 连接池会复用底层 TCP 连接,性能开销很小
- 不再需要 SemaphoreSlim 锁

优点:
- 完全避免连接冲突
- 代码更简洁
- 并发写入无限制
147 lines
4.8 KiB
C#
147 lines
4.8 KiB
C#
using System.Security.Cryptography;
|
|
using System.Text;
|
|
using AssetManager.Data;
|
|
using Microsoft.Extensions.Logging;
|
|
using SqlSugar;
|
|
|
|
namespace AssetManager.Data.Repositories;
|
|
|
|
/// <summary>
/// SqlSugar-backed implementation of <see cref="IMarketDataRepository"/> covering the
/// price cache, the K-line cache and Tiingo ticker records.
/// </summary>
/// <remarks>
/// Every method uses the injected <see cref="ISqlSugarClient"/> except
/// <see cref="SavePriceCacheAsync"/>, which creates its own client per call.
/// Cache ids are MD5 hashes of upper-cased, underscore-joined key parts; the
/// normalisation is culture-invariant so the same logical key always produces
/// the same id regardless of the process culture.
/// </remarks>
public class MarketDataRepository : IMarketDataRepository
{
    private readonly ISqlSugarClient _db;
    private readonly ILogger<MarketDataRepository> _logger;

    /// <summary>
    /// Creates the repository.
    /// </summary>
    /// <param name="db">SqlSugar client used for all queries and for the K-line / ticker writes.</param>
    /// <param name="logger">Logger used for price-cache lookup diagnostics.</param>
    public MarketDataRepository(ISqlSugarClient db, ILogger<MarketDataRepository> logger)
    {
        _db = db;
        _logger = logger;
    }

    // ===== Price cache =====

    /// <summary>
    /// Looks up the non-expired cached price for a symbol / asset type pair.
    /// </summary>
    /// <param name="symbol">Instrument symbol; compared case-insensitively via the generated key.</param>
    /// <param name="assetType">Asset type; compared case-insensitively via the generated key.</param>
    /// <returns>
    /// The cache row whose <c>ExpiredAt</c> is later than the current local time, or
    /// <c>null</c> when there is no such row or the query throws (the exception is logged).
    /// </returns>
    public async Task<MarketPriceCache?> GetPriceCacheAsync(string symbol, string assetType)
    {
        var cacheKey = GenerateCacheKey(symbol, assetType);
        _logger.LogDebug("[缓存查询] CacheKey={CacheKey}, Symbol={Symbol}, AssetType={AssetType}", cacheKey, symbol, assetType);

        try
        {
            var result = await _db.Queryable<MarketPriceCache>()
                .Where(p => p.Id == cacheKey && p.ExpiredAt > DateTime.Now)
                .FirstAsync();

            if (result is null)
            {
                _logger.LogDebug("[缓存查询无结果] CacheKey={CacheKey}", cacheKey);
            }
            else
            {
                _logger.LogDebug("[缓存查询成功] CacheKey={CacheKey}, Price={Price}, ExpiredAt={ExpiredAt}", cacheKey, result.Price, result.ExpiredAt);
            }

            return result;
        }
        catch (Exception ex)
        {
            // Deliberate best-effort: a failed cache read is reported as a cache miss.
            _logger.LogError(ex, "[缓存查询异常] CacheKey={CacheKey}", cacheKey);
            return null;
        }
    }

    /// <summary>
    /// Stores a price cache entry, assigning its <c>Id</c> from the symbol and asset type.
    /// </summary>
    /// <param name="cache">Entry to store; its <c>Id</c> is overwritten with the generated key.</param>
    /// <returns><c>true</c> when the storage command reports at least one affected row.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="cache"/> is <c>null</c>.</exception>
    public async Task<bool> SavePriceCacheAsync(MarketPriceCache cache)
    {
        ArgumentNullException.ThrowIfNull(cache);

        cache.Id = GenerateCacheKey(cache.Symbol, cache.AssetType);

        // Use a fresh SqlSugarClient per call to avoid connection conflicts under
        // concurrent writes. The connection pool reuses the underlying connection,
        // so the overhead is small.
        using var db = SqlSugarConfig.GetSqlSugarClient();
        var affected = await db.Storageable(cache).ExecuteCommandAsync();
        return affected > 0;
    }

    // ===== K-line cache =====

    /// <summary>
    /// Returns the most recent cached K-line rows for a symbol / asset type / timeframe.
    /// </summary>
    /// <param name="symbol">Instrument symbol; upper-cased before comparison.</param>
    /// <param name="assetType">Asset type; upper-cased before comparison.</param>
    /// <param name="timeframe">Timeframe identifier; upper-cased before comparison.</param>
    /// <param name="limit">Maximum number of rows to return.</param>
    /// <returns>Up to <paramref name="limit"/> rows ordered by <c>Timestamp</c> descending.</returns>
    public async Task<List<MarketKlineCache>> GetKlineCacheAsync(string symbol, string assetType, string timeframe, int limit)
    {
        // Normalise once, outside the expression tree, with culture-invariant casing.
        var symbolKey = symbol.ToUpperInvariant();
        var assetTypeKey = assetType.ToUpperInvariant();
        var timeframeKey = timeframe.ToUpperInvariant();

        return await _db.Queryable<MarketKlineCache>()
            .Where(k => k.Symbol == symbolKey
                        && k.AssetType == assetTypeKey
                        && k.Timeframe == timeframeKey)
            .OrderByDescending(k => k.Timestamp)
            .Take(limit)
            .ToListAsync();
    }

    /// <summary>
    /// Stores a batch of K-line rows, matching existing rows on <c>Id</c>.
    /// </summary>
    /// <param name="cacheList">Rows to store; each row's <c>Id</c> is overwritten with its generated key.</param>
    /// <returns>
    /// <c>false</c> when the list is <c>null</c> or empty; otherwise <c>true</c> when the
    /// storage command reports at least one affected row.
    /// </returns>
    public async Task<bool> SaveKlineCacheBatchAsync(List<MarketKlineCache> cacheList)
    {
        if (cacheList is null || cacheList.Count == 0)
        {
            return false;
        }

        // Give every row a deterministic id derived from its identifying fields.
        foreach (var cache in cacheList)
        {
            cache.Id = GenerateKlineCacheKey(cache.Symbol, cache.AssetType, cache.Timeframe, cache.Timestamp);
        }

        var affected = await _db.Storageable(cacheList)
            .WhereColumns(it => new { it.Id })
            .ExecuteCommandAsync();

        return affected > 0;
    }

    // ===== Tiingo Ticker =====

    /// <summary>
    /// Finds the Tiingo ticker record whose <c>Ticker</c> equals the upper-cased symbol.
    /// </summary>
    /// <param name="symbol">Ticker symbol; upper-cased before comparison.</param>
    /// <returns>The first matching record, or <c>null</c> when none matches.</returns>
    public async Task<TiingoTicker?> GetTiingoTickerAsync(string symbol)
    {
        var tickerKey = symbol.ToUpperInvariant();

        return await _db.Queryable<TiingoTicker>()
            .Where(t => t.Ticker == tickerKey)
            .FirstAsync();
    }

    /// <summary>
    /// Stores a Tiingo ticker record, matching an existing row on <c>Ticker</c>.
    /// </summary>
    /// <param name="ticker">Record to store.</param>
    /// <returns><c>true</c> when the storage command reports at least one affected row.</returns>
    public async Task<bool> SaveTiingoTickerAsync(TiingoTicker ticker)
    {
        var affected = await _db.Storageable(ticker)
            .WhereColumns(it => new { it.Ticker })
            .ExecuteCommandAsync();

        return affected > 0;
    }

    /// <summary>
    /// Searches Tiingo tickers whose <c>Ticker</c> contains the upper-cased keyword or
    /// whose <c>Name</c> contains the keyword as given.
    /// </summary>
    /// <param name="keyword">Search text.</param>
    /// <param name="limit">Maximum number of records to return.</param>
    /// <returns>Up to <paramref name="limit"/> matching records; no ordering is applied.</returns>
    public async Task<List<TiingoTicker>> SearchTiingoTickersAsync(string keyword, int limit)
    {
        var keywordUpper = keyword.ToUpperInvariant();

        return await _db.Queryable<TiingoTicker>()
            .Where(t => (t.Ticker != null && t.Ticker.Contains(keywordUpper))
                        || (t.Name != null && t.Name.Contains(keyword)))
            .Take(limit)
            .ToListAsync();
    }

    // ===== Private helpers =====

    /// <summary>
    /// Builds the price-cache id: MD5 of <c>SYMBOL_ASSETTYPE</c> (both upper-cased).
    /// </summary>
    private static string GenerateCacheKey(string symbol, string assetType)
    {
        return GenerateMd5Hash($"{symbol.ToUpperInvariant()}_{assetType.ToUpperInvariant()}");
    }

    /// <summary>
    /// Builds the K-line cache id: MD5 of <c>SYMBOL_ASSETTYPE_TIMEFRAME_yyyyMMddHHmmss</c>.
    /// </summary>
    private static string GenerateKlineCacheKey(string symbol, string assetType, string timeframe, DateTime timestamp)
    {
        // Format with the invariant culture so the key does not depend on the
        // process culture's calendar or digit settings.
        var stamp = timestamp.ToString("yyyyMMddHHmmss", System.Globalization.CultureInfo.InvariantCulture);
        return GenerateMd5Hash($"{symbol.ToUpperInvariant()}_{assetType.ToUpperInvariant()}_{timeframe.ToUpperInvariant()}_{stamp}");
    }

    /// <summary>
    /// Returns the MD5 digest of the UTF-8 bytes of <paramref name="input"/> as
    /// 32 lowercase hexadecimal characters. Used only to derive row ids.
    /// </summary>
    private static string GenerateMd5Hash(string input)
    {
        var hashBytes = MD5.HashData(Encoding.UTF8.GetBytes(input));

        // Convert.ToHexString emits upper-case digits; existing ids are lower-case ("x2").
        return Convert.ToHexString(hashBytes).ToLowerInvariant();
    }
}
|