using AssetManager.Data; using AssetManager.Data.Repositories; using AssetManager.Models.DTOs; using Microsoft.Extensions.Logging; using System.Collections.Concurrent; namespace AssetManager.Infrastructure.Services; /// /// 市场数据服务实现(组合模式) /// 数据源优先级: /// - 实时价格:Yahoo → 腾讯 → Tiingo /// - 历史K线:Yahoo → Tiingo(腾讯历史接口已废弃) /// - 加密货币:OKX /// public class MarketDataService : IMarketDataService { private readonly ILogger _logger; private readonly ITencentMarketService _tencentService; private readonly IYahooMarketService _yahooService; private readonly ITiingoMarketService _tiingoService; private readonly IOkxMarketService _okxService; private readonly IMarketDataRepository _marketDataRepo; // 内存缓存层(避免并发数据库查询导致连接池冲突) private readonly ConcurrentDictionary _memoryCache = new(); // 防止并发请求同一股票(使用 Lazy 确保只创建一次) private readonly ConcurrentDictionary>> _pendingPriceRequests = new(); public MarketDataService( ILogger logger, ITencentMarketService tencentService, IYahooMarketService yahooService, ITiingoMarketService tiingoService, IOkxMarketService okxService, IMarketDataRepository marketDataRepo) { _logger = logger; _tencentService = tencentService; _yahooService = yahooService; _tiingoService = tiingoService; _okxService = okxService; _marketDataRepo = marketDataRepo; } /// /// 获取实时价格(自动根据资产类型路由到对应数据源) /// 使用 Lazy 确保同一时间只有一个请求在处理 /// public async Task GetPriceAsync(string symbol, string assetType) { var cacheKey = $"{symbol.ToUpper()}_{assetType.ToUpper()}"; _logger.LogInformation("[价格查询开始] Symbol={Symbol}, AssetType={AssetType}, CacheKey={CacheKey}", symbol, assetType, cacheKey); try { // 第一步:查内存缓存(快速返回) if (_memoryCache.TryGetValue(cacheKey, out var memoryCached) && memoryCached.expireAt > DateTime.Now) { _logger.LogInformation("[内存缓存命中] Symbol={Symbol}, Price={Price}", symbol, memoryCached.cache.Price); return new MarketPriceResponse { Symbol = memoryCached.cache.Symbol, Price = memoryCached.cache.Price, PreviousClose = memoryCached.cache.PreviousClose ?? 0, Timestamp = memoryCached.cache.FetchedAt, AssetType = memoryCached.cache.AssetType }; } // 第二步:使用 Lazy 确保同一时间只有一个请求在处理(包括查数据库和获取价格) var lazyTask = _pendingPriceRequests.GetOrAdd(cacheKey, _ => new Lazy>(() => GetPriceInternalAsync(symbol, assetType, cacheKey))); try { var result = await lazyTask.Value; _logger.LogInformation("[价格获取成功] Symbol={Symbol}, Price={Price}", symbol, result.Price); return result; } finally { // 请求完成后移除 _pendingPriceRequests.TryRemove(cacheKey, out _); } } catch (Exception ex) { _logger.LogError(ex, "[价格获取异常] Symbol={Symbol}, AssetType={AssetType}", symbol, assetType); throw; } } /// /// 内部获取价格方法(查数据库缓存 → 获取价格) /// private async Task GetPriceInternalAsync(string symbol, string assetType, string cacheKey) { _logger.LogInformation("[内部查询] Symbol={Symbol}, 查数据库缓存", symbol); // 查数据库缓存 var cached = await _marketDataRepo.GetPriceCacheAsync(symbol, assetType); if (cached != null && cached.Price > 0) { _logger.LogInformation("[数据库缓存命中] Symbol={Symbol}, Price={Price}, ExpiredAt={ExpiredAt}", symbol, cached.Price, cached.ExpiredAt); // 写入内存缓存 _memoryCache[cacheKey] = (cached, cached.ExpiredAt); return new MarketPriceResponse { Symbol = cached.Symbol, Price = cached.Price, PreviousClose = cached.PreviousClose ?? 0, Timestamp = cached.FetchedAt, AssetType = cached.AssetType }; } if (cached != null && cached.Price <= 0) { _logger.LogWarning("[数据库缓存价格无效] Symbol={Symbol}, Price={Price}", symbol, cached.Price); } else { _logger.LogWarning("[数据库缓存未命中] Symbol={Symbol}", symbol); } // 从数据源获取 return await FetchPriceFromSourceAsync(symbol, assetType); } /// /// 从数据源获取价格(内部方法) /// private async Task FetchPriceFromSourceAsync(string symbol, string assetType) { _logger.LogInformation("[数据源获取开始] Symbol={Symbol}, AssetType={AssetType}", symbol, assetType); MarketPriceResponse response; string source; if (assetType.Equals("Crypto", StringComparison.OrdinalIgnoreCase)) { _logger.LogInformation("[数据源] 使用 OKX 获取 {Symbol}", symbol); response = await _okxService.GetCryptoPriceAsync(symbol); source = "OKX"; } else { // 股票:优先Yahoo财经,失败降级腾讯财经,最后降级 Tiingo try { _logger.LogInformation("[数据源] 尝试 Yahoo 获取 {Symbol}", symbol); response = await _yahooService.GetStockPriceAsync(symbol); source = "Yahoo"; _logger.LogInformation("[数据源] Yahoo 成功: {Symbol} = {Price}", symbol, response.Price); } catch (Exception ex) { _logger.LogWarning("[数据源] Yahoo 失败: {Symbol}, 错误: {Error}", symbol, ex.Message); try { _logger.LogInformation("[数据源] 降级使用腾讯获取 {Symbol}", symbol); response = await _tencentService.GetStockPriceAsync(symbol); source = "Tencent"; _logger.LogInformation("[数据源] 腾讯成功: {Symbol} = {Price}", symbol, response.Price); } catch (Exception tencentEx) { _logger.LogWarning("[数据源] 腾讯失败: {Symbol}, 错误: {Error}", symbol, tencentEx.Message); _logger.LogInformation("[数据源] 最后降级使用 Tiingo 获取 {Symbol}", symbol); response = await _tiingoService.GetStockPriceAsync(symbol); source = "Tiingo"; _logger.LogInformation("[数据源] Tiingo 成功: {Symbol} = {Price}", symbol, response.Price); } } } // 验证价格有效性 if (response.Price <= 0) { _logger.LogError("[价格无效] Symbol={Symbol}, Price={Price}, Source={Source},不写入缓存", symbol, response.Price, source); throw new InvalidOperationException($"获取到的价格无效: {symbol} = {response.Price}"); } // 写入数据库缓存 await SavePriceCacheAsync(symbol, assetType, response, source); // 同时写入内存缓存(5分钟有效) var cacheKey = $"{symbol.ToUpper()}_{assetType.ToUpper()}"; var expireAt = DateTime.Now.AddMinutes(5); _memoryCache[cacheKey] = (new MarketPriceCache { Symbol = symbol.ToUpper(), AssetType = assetType.ToUpper(), Price = response.Price, PreviousClose = response.PreviousClose, FetchedAt = DateTime.Now, ExpiredAt = expireAt }, expireAt); return response; } /// /// 获取历史数据(自动根据资产类型路由到对应数据源) /// public async Task> GetHistoricalDataAsync(string symbol, string assetType, string timeframe, int limit) { _logger.LogInformation("获取历史数据: {Symbol}, 资产类型: {AssetType}, 周期: {Timeframe}, 数量: {Limit}", symbol, assetType, timeframe, limit); // 先查缓存 var cachedKlines = await _marketDataRepo.GetKlineCacheAsync(symbol, assetType, timeframe, limit); // 缓存足够,直接返回 if (cachedKlines.Count >= limit) { _logger.LogDebug("历史K线缓存命中: {Symbol} {AssetType} {Timeframe}, 数量: {Count}", symbol, assetType, timeframe, cachedKlines.Count); return cachedKlines.Select(k => new MarketDataResponse { Symbol = k.Symbol, Open = k.Open, High = k.High, Low = k.Low, Close = k.Close, Volume = k.Volume ?? 0, Timestamp = k.Timestamp, AssetType = k.AssetType }).OrderBy(k => k.Timestamp).ToList(); } // 缓存不足,调用API补全 List response; string source; if (assetType.Equals("Crypto", StringComparison.OrdinalIgnoreCase)) { response = await _okxService.GetCryptoHistoricalDataAsync(symbol, timeframe, limit); source = "OKX"; } else { // 股票:优先Yahoo财经,失败降级 Tiingo // 注:腾讯财经历史K线接口已废弃,不再作为降级数据源 try { response = await _yahooService.GetStockHistoricalDataAsync(symbol, timeframe, limit); if (response.Count > 0) { source = "Yahoo"; } else { throw new InvalidOperationException("Yahoo财经返回空数据"); } } catch (Exception ex) { _logger.LogWarning(ex, "Yahoo财经历史数据获取失败,降级使用 Tiingo: {Symbol}", symbol); response = await _tiingoService.GetStockHistoricalDataAsync(symbol, timeframe, limit); source = "Tiingo"; } } // 批量写入缓存 await SaveKlineCacheAsync(symbol, assetType, timeframe, response, source); return response; } /// /// 获取股票实时价格 /// public async Task GetStockPriceAsync(string symbol) { return await GetPriceAsync(symbol, "Stock"); } /// /// 获取加密货币实时价格 /// public async Task GetCryptoPriceAsync(string symbol) { return await GetPriceAsync(symbol, "Crypto"); } /// /// 获取股票历史数据 /// public async Task> GetStockHistoricalDataAsync(string symbol, string timeframe, int limit) { return await GetHistoricalDataAsync(symbol, "Stock", timeframe, limit); } /// /// 获取加密货币历史数据 /// public async Task> GetCryptoHistoricalDataAsync(string symbol, string timeframe, int limit) { return await GetHistoricalDataAsync(symbol, "Crypto", timeframe, limit); } // ===== 私有辅助方法 ===== private async Task SavePriceCacheAsync(string symbol, string assetType, MarketPriceResponse response, string source) { var cacheEntity = new MarketPriceCache { Symbol = symbol.ToUpper(), AssetType = assetType.ToUpper(), Price = Math.Round(response.Price, 8), PreviousClose = response.PreviousClose > 0 ? Math.Round(response.PreviousClose, 8) : null, Source = source, FetchedAt = DateTime.Now, ExpiredAt = GetCacheExpirationTime(assetType) }; await _marketDataRepo.SavePriceCacheAsync(cacheEntity); _logger.LogDebug("缓存写入: {Symbol} {AssetType}, 过期时间: {ExpiredAt}", symbol, assetType, cacheEntity.ExpiredAt); } private async Task SaveKlineCacheAsync(string symbol, string assetType, string timeframe, List data, string source) { var cacheEntities = data.Select(k => new MarketKlineCache { Symbol = symbol.ToUpper(), AssetType = assetType.ToUpper(), Timeframe = timeframe.ToUpper(), Timestamp = k.Timestamp, Open = k.Open, High = k.High, Low = k.Low, Close = k.Close, Volume = k.Volume, Source = source, FetchedAt = DateTime.Now }).ToList(); await _marketDataRepo.SaveKlineCacheBatchAsync(cacheEntities); _logger.LogDebug("历史K线缓存写入: {Symbol} {AssetType} {Timeframe}, 数量: {Count}", symbol, assetType, timeframe, cacheEntities.Count); } private DateTime GetCacheExpirationTime(string assetType) { return assetType.ToLower() switch { "crypto" => DateTime.Now.AddMinutes(1), _ => DateTime.Now.AddMinutes(5) }; } }