using AssetManager.Data;
using AssetManager.Data.Repositories;
using AssetManager.Models.DTOs;
using Microsoft.Extensions.Logging;
using System.Collections.Concurrent;
namespace AssetManager.Infrastructure.Services;
///
/// 市场数据服务实现(组合模式)
/// 数据源优先级:
/// - 实时价格:Yahoo → 腾讯 → Tiingo
/// - 历史K线:Yahoo → Tiingo(腾讯历史接口已废弃)
/// - 加密货币:OKX
///
public class MarketDataService : IMarketDataService
{
private readonly ILogger _logger;
private readonly ITencentMarketService _tencentService;
private readonly IYahooMarketService _yahooService;
private readonly ITiingoMarketService _tiingoService;
private readonly IOkxMarketService _okxService;
private readonly IMarketDataRepository _marketDataRepo;
// 静态内存缓存层(跨请求共享,避免并发数据库查询导致连接池冲突)
private static readonly ConcurrentDictionary _memoryCache = new();
// 静态 pending 请求字典(跨请求共享,防止并发请求同一股票)
private static readonly ConcurrentDictionary>> _pendingPriceRequests = new();
public MarketDataService(
ILogger logger,
ITencentMarketService tencentService,
IYahooMarketService yahooService,
ITiingoMarketService tiingoService,
IOkxMarketService okxService,
IMarketDataRepository marketDataRepo)
{
_logger = logger;
_tencentService = tencentService;
_yahooService = yahooService;
_tiingoService = tiingoService;
_okxService = okxService;
_marketDataRepo = marketDataRepo;
}
///
/// 获取实时价格(自动根据资产类型路由到对应数据源)
/// 使用静态缓存 + Lazy 确保跨请求并发安全
///
public async Task GetPriceAsync(string symbol, string assetType)
{
var cacheKey = $"{symbol.ToUpper()}_{assetType.ToUpper()}";
_logger.LogInformation("[价格查询开始] Symbol={Symbol}, AssetType={AssetType}, CacheKey={CacheKey}", symbol, assetType, cacheKey);
try
{
// 第一步:查静态内存缓存(快速返回)
if (_memoryCache.TryGetValue(cacheKey, out var memoryCached) && memoryCached.expireAt > DateTime.Now)
{
_logger.LogInformation("[内存缓存命中] Symbol={Symbol}, Price={Price}", symbol, memoryCached.cache.Price);
return new MarketPriceResponse
{
Symbol = memoryCached.cache.Symbol,
Price = memoryCached.cache.Price,
PreviousClose = memoryCached.cache.PreviousClose ?? 0,
Timestamp = memoryCached.cache.FetchedAt,
AssetType = memoryCached.cache.AssetType
};
}
// 第二步:创建 Lazy 并尝试添加到字典
// 先创建 Lazy,再 GetOrAdd,确保所有线程使用同一个 Lazy
var lazy = new Lazy>(() => GetPriceInternalAsync(symbol, assetType, cacheKey));
var actualLazy = _pendingPriceRequests.GetOrAdd(cacheKey, lazy);
try
{
var result = await actualLazy.Value;
_logger.LogInformation("[价格获取成功] Symbol={Symbol}, Price={Price}", symbol, result.Price);
return result;
}
finally
{
// 请求完成后移除
_pendingPriceRequests.TryRemove(cacheKey, out _);
}
}
catch (Exception ex)
{
_logger.LogError(ex, "[价格获取异常] Symbol={Symbol}, AssetType={AssetType}", symbol, assetType);
throw;
}
}
///
/// 内部获取价格方法(查数据库缓存 → 获取价格)
///
private async Task GetPriceInternalAsync(string symbol, string assetType, string cacheKey)
{
_logger.LogInformation("[内部查询] Symbol={Symbol}, 查数据库缓存", symbol);
// 查数据库缓存
var cached = await _marketDataRepo.GetPriceCacheAsync(symbol, assetType);
if (cached != null && cached.Price > 0)
{
_logger.LogInformation("[数据库缓存命中] Symbol={Symbol}, Price={Price}, ExpiredAt={ExpiredAt}", symbol, cached.Price, cached.ExpiredAt);
// 写入内存缓存
_memoryCache[cacheKey] = (cached, cached.ExpiredAt);
return new MarketPriceResponse
{
Symbol = cached.Symbol,
Price = cached.Price,
PreviousClose = cached.PreviousClose ?? 0,
Timestamp = cached.FetchedAt,
AssetType = cached.AssetType
};
}
if (cached != null && cached.Price <= 0)
{
_logger.LogWarning("[数据库缓存价格无效] Symbol={Symbol}, Price={Price}", symbol, cached.Price);
}
else
{
_logger.LogWarning("[数据库缓存未命中] Symbol={Symbol}", symbol);
}
// 从数据源获取
return await FetchPriceFromSourceAsync(symbol, assetType);
}
///
/// 从数据源获取价格(内部方法)
///
private async Task FetchPriceFromSourceAsync(string symbol, string assetType)
{
_logger.LogInformation("[数据源获取开始] Symbol={Symbol}, AssetType={AssetType}", symbol, assetType);
MarketPriceResponse response;
string source;
if (assetType.Equals("Crypto", StringComparison.OrdinalIgnoreCase))
{
_logger.LogInformation("[数据源] 使用 OKX 获取 {Symbol}", symbol);
response = await _okxService.GetCryptoPriceAsync(symbol);
source = "OKX";
}
else
{
// 股票:优先Yahoo财经,失败降级腾讯财经,最后降级 Tiingo
try
{
_logger.LogInformation("[数据源] 尝试 Yahoo 获取 {Symbol}", symbol);
response = await _yahooService.GetStockPriceAsync(symbol);
source = "Yahoo";
_logger.LogInformation("[数据源] Yahoo 成功: {Symbol} = {Price}", symbol, response.Price);
}
catch (Exception ex)
{
_logger.LogWarning("[数据源] Yahoo 失败: {Symbol}, 错误: {Error}", symbol, ex.Message);
try
{
_logger.LogInformation("[数据源] 降级使用腾讯获取 {Symbol}", symbol);
response = await _tencentService.GetStockPriceAsync(symbol);
source = "Tencent";
_logger.LogInformation("[数据源] 腾讯成功: {Symbol} = {Price}", symbol, response.Price);
}
catch (Exception tencentEx)
{
_logger.LogWarning("[数据源] 腾讯失败: {Symbol}, 错误: {Error}", symbol, tencentEx.Message);
_logger.LogInformation("[数据源] 最后降级使用 Tiingo 获取 {Symbol}", symbol);
response = await _tiingoService.GetStockPriceAsync(symbol);
source = "Tiingo";
_logger.LogInformation("[数据源] Tiingo 成功: {Symbol} = {Price}", symbol, response.Price);
}
}
}
// 验证价格有效性
if (response.Price <= 0)
{
_logger.LogError("[价格无效] Symbol={Symbol}, Price={Price}, Source={Source},不写入缓存", symbol, response.Price, source);
throw new InvalidOperationException($"获取到的价格无效: {symbol} = {response.Price}");
}
// 先写入内存缓存(确保返回给调用者)
var cacheKey = $"{symbol.ToUpper()}_{assetType.ToUpper()}";
var expireAt = DateTime.Now.AddMinutes(5);
_memoryCache[cacheKey] = (new MarketPriceCache
{
Symbol = symbol.ToUpper(),
AssetType = assetType.ToUpper(),
Price = response.Price,
PreviousClose = response.PreviousClose,
FetchedAt = DateTime.Now,
ExpiredAt = expireAt
}, expireAt);
// 写入数据库缓存(Fire-and-Forget,失败不影响主流程)
_ = Task.Run(async () =>
{
try
{
await SavePriceCacheAsync(symbol, assetType, response, source);
}
catch (Exception ex)
{
_logger.LogWarning(ex, "[数据库缓存写入失败] Symbol={Symbol},忽略(内存缓存已生效)", symbol);
}
});
return response;
}
///
/// 获取历史数据(自动根据资产类型路由到对应数据源)
///
public async Task> GetHistoricalDataAsync(string symbol, string assetType, string timeframe, int limit)
{
_logger.LogInformation("获取历史数据: {Symbol}, 资产类型: {AssetType}, 周期: {Timeframe}, 数量: {Limit}",
symbol, assetType, timeframe, limit);
// 先查缓存
var cachedKlines = await _marketDataRepo.GetKlineCacheAsync(symbol, assetType, timeframe, limit);
// 缓存足够,直接返回
if (cachedKlines.Count >= limit)
{
_logger.LogDebug("历史K线缓存命中: {Symbol} {AssetType} {Timeframe}, 数量: {Count}", symbol, assetType, timeframe, cachedKlines.Count);
return cachedKlines.Select(k => new MarketDataResponse
{
Symbol = k.Symbol,
Open = k.Open,
High = k.High,
Low = k.Low,
Close = k.Close,
Volume = k.Volume ?? 0,
Timestamp = k.Timestamp,
AssetType = k.AssetType
}).OrderBy(k => k.Timestamp).ToList();
}
// 缓存不足,调用API补全
List response;
string source;
if (assetType.Equals("Crypto", StringComparison.OrdinalIgnoreCase))
{
response = await _okxService.GetCryptoHistoricalDataAsync(symbol, timeframe, limit);
source = "OKX";
}
else
{
// 股票:优先Yahoo财经,失败降级 Tiingo
// 注:腾讯财经历史K线接口已废弃,不再作为降级数据源
try
{
response = await _yahooService.GetStockHistoricalDataAsync(symbol, timeframe, limit);
if (response.Count > 0)
{
source = "Yahoo";
}
else
{
throw new InvalidOperationException("Yahoo财经返回空数据");
}
}
catch (Exception ex)
{
_logger.LogWarning(ex, "Yahoo财经历史数据获取失败,降级使用 Tiingo: {Symbol}", symbol);
response = await _tiingoService.GetStockHistoricalDataAsync(symbol, timeframe, limit);
source = "Tiingo";
}
}
// 批量写入缓存
await SaveKlineCacheAsync(symbol, assetType, timeframe, response, source);
return response;
}
///
/// 获取股票实时价格
///
public async Task GetStockPriceAsync(string symbol)
{
return await GetPriceAsync(symbol, "Stock");
}
///
/// 获取加密货币实时价格
///
public async Task GetCryptoPriceAsync(string symbol)
{
return await GetPriceAsync(symbol, "Crypto");
}
///
/// 获取股票历史数据
///
public async Task> GetStockHistoricalDataAsync(string symbol, string timeframe, int limit)
{
return await GetHistoricalDataAsync(symbol, "Stock", timeframe, limit);
}
///
/// 获取加密货币历史数据
///
public async Task> GetCryptoHistoricalDataAsync(string symbol, string timeframe, int limit)
{
return await GetHistoricalDataAsync(symbol, "Crypto", timeframe, limit);
}
// ===== 私有辅助方法 =====
private async Task SavePriceCacheAsync(string symbol, string assetType, MarketPriceResponse response, string source)
{
var cacheEntity = new MarketPriceCache
{
Symbol = symbol.ToUpper(),
AssetType = assetType.ToUpper(),
Price = Math.Round(response.Price, 8),
PreviousClose = response.PreviousClose > 0 ? Math.Round(response.PreviousClose, 8) : null,
Source = source,
FetchedAt = DateTime.Now,
ExpiredAt = GetCacheExpirationTime(assetType)
};
await _marketDataRepo.SavePriceCacheAsync(cacheEntity);
_logger.LogDebug("缓存写入: {Symbol} {AssetType}, 过期时间: {ExpiredAt}", symbol, assetType, cacheEntity.ExpiredAt);
}
private async Task SaveKlineCacheAsync(string symbol, string assetType, string timeframe, List data, string source)
{
var cacheEntities = data.Select(k => new MarketKlineCache
{
Symbol = symbol.ToUpper(),
AssetType = assetType.ToUpper(),
Timeframe = timeframe.ToUpper(),
Timestamp = k.Timestamp,
Open = k.Open,
High = k.High,
Low = k.Low,
Close = k.Close,
Volume = k.Volume,
Source = source,
FetchedAt = DateTime.Now
}).ToList();
await _marketDataRepo.SaveKlineCacheBatchAsync(cacheEntities);
_logger.LogDebug("历史K线缓存写入: {Symbol} {AssetType} {Timeframe}, 数量: {Count}", symbol, assetType, timeframe, cacheEntities.Count);
}
private DateTime GetCacheExpirationTime(string assetType)
{
return assetType.ToLower() switch
{
"crypto" => DateTime.Now.AddMinutes(1),
_ => DateTime.Now.AddMinutes(5)
};
}
}