问题排查:
1. 缓存命中时验证价格有效性(Price > 0)
2. 外部 API 返回无效价格时拒绝写入缓存
3. 缓存查询层添加详细日志
4. 捕获缓存查询异常并返回 null

改进:
- 缓存价格 <= 0 时忽略缓存重新获取
- 外部 API 价格 <= 0 时抛出异常,避免污染缓存
- 详细日志追踪价格获取全流程
303 lines
11 KiB
C#
Executable File
303 lines
11 KiB
C#
Executable File
using AssetManager.Data;
|
||
using AssetManager.Data.Repositories;
|
||
using AssetManager.Models.DTOs;
|
||
using Microsoft.Extensions.Logging;
|
||
using System.Collections.Concurrent;
|
||
|
||
namespace AssetManager.Infrastructure.Services;
|
||
|
||
/// <summary>
|
||
/// 市场数据服务实现(组合模式)
|
||
/// <para>数据源优先级:</para>
|
||
/// <para>- 实时价格:Yahoo → 腾讯 → Tiingo</para>
|
||
/// <para>- 历史K线:Yahoo → Tiingo(腾讯历史接口已废弃)</para>
|
||
/// <para>- 加密货币:OKX</para>
|
||
/// </summary>
|
||
public class MarketDataService : IMarketDataService
|
||
{
|
||
// Logger, one client per upstream market-data provider, and the cache repository.
private readonly ILogger<MarketDataService> _logger;
private readonly ITencentMarketService _tencentService;
private readonly IYahooMarketService _yahooService;
private readonly ITiingoMarketService _tiingoService;
private readonly IOkxMarketService _okxService;
private readonly IMarketDataRepository _marketDataRepo;

// In-flight price fetches, used to keep concurrent callers from requesting the same instrument twice.
private readonly ConcurrentDictionary<string, Task<MarketPriceResponse>> _pendingPriceRequests = new();

/// <summary>
/// Creates the service and stores the injected logger, provider clients and cache repository.
/// </summary>
/// <param name="logger">Logger used for all diagnostics emitted by this service.</param>
/// <param name="tencentService">Tencent market-data client.</param>
/// <param name="yahooService">Yahoo market-data client.</param>
/// <param name="tiingoService">Tiingo market-data client.</param>
/// <param name="okxService">OKX market-data client.</param>
/// <param name="marketDataRepo">Repository backing the price and K-line caches.</param>
public MarketDataService(
    ILogger<MarketDataService> logger,
    ITencentMarketService tencentService,
    IYahooMarketService yahooService,
    ITiingoMarketService tiingoService,
    IOkxMarketService okxService,
    IMarketDataRepository marketDataRepo)
{
    (_logger, _tencentService, _yahooService, _tiingoService, _okxService, _marketDataRepo) =
        (logger, tencentService, yahooService, tiingoService, okxService, marketDataRepo);
}
|
||
|
||
/// <summary>
/// Returns the latest price for an instrument. A cached entry is used when one exists and its
/// price is positive; otherwise the price is fetched from the upstream source, and concurrent
/// callers for the same symbol/asset type share the same in-flight fetch task.
/// </summary>
/// <param name="symbol">Instrument symbol.</param>
/// <param name="assetType">Asset type used for the cache lookup and for source routing.</param>
/// <returns>
/// The cached or freshly fetched price. For cache hits, <c>PreviousClose</c> is 0 when the cache
/// entry has none and <c>Timestamp</c> is the time the entry was fetched.
/// </returns>
/// <remarks>Any exception raised by the cache lookup or the fetch is logged and rethrown unchanged.</remarks>
public async Task<MarketPriceResponse> GetPriceAsync(string symbol, string assetType)
{
    // Invariant casing keeps the de-duplication key stable regardless of the current culture.
    var cacheKey = $"{symbol.ToUpperInvariant()}_{assetType.ToUpperInvariant()}";
    _logger.LogInformation("[价格查询开始] Symbol={Symbol}, AssetType={AssetType}, CacheKey={CacheKey}", symbol, assetType, cacheKey);

    try
    {
        // Cache first.
        var cached = await _marketDataRepo.GetPriceCacheAsync(symbol, assetType);

        if (cached != null && cached.Price > 0)
        {
            _logger.LogInformation("[缓存命中] Symbol={Symbol}, Price={Price}, ExpiredAt={ExpiredAt}", symbol, cached.Price, cached.ExpiredAt);
            return new MarketPriceResponse
            {
                Symbol = cached.Symbol,
                Price = cached.Price,
                PreviousClose = cached.PreviousClose ?? 0,
                Timestamp = cached.FetchedAt,
                AssetType = cached.AssetType
            };
        }

        if (cached != null && cached.Price <= 0)
        {
            // A non-positive cached price is treated as corrupt and ignored.
            _logger.LogWarning("[缓存命中但价格无效] Symbol={Symbol}, Price={Price},忽略缓存重新获取", symbol, cached.Price);
        }
        else
        {
            _logger.LogWarning("[缓存未命中] Symbol={Symbol}, AssetType={AssetType},需要从数据源获取", symbol, assetType);
        }

        // Share one fetch task between concurrent callers of the same key.
        // NOTE(review): GetOrAdd may invoke the factory more than once under contention, so a
        // duplicate fetch is still possible in a tight race; only one task is ever published.
        var priceTask = _pendingPriceRequests.GetOrAdd(cacheKey, _ => FetchPriceFromSourceAsync(symbol, assetType));

        try
        {
            var result = await priceTask;
            _logger.LogInformation("[价格获取成功] Symbol={Symbol}, Price={Price}", symbol, result.Price);
            return result;
        }
        finally
        {
            // Remove only the task this caller awaited. Removing by key alone would let a late
            // waiter on a finished task evict a newer in-flight task registered under the same key.
            _pendingPriceRequests.TryRemove(KeyValuePair.Create(cacheKey, priceTask));
        }
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "[价格获取异常] Symbol={Symbol}, AssetType={AssetType}", symbol, assetType);
        throw;
    }
}
|
||
|
||
/// <summary>
/// Fetches a price from the upstream providers and writes it to the price cache.
/// "Crypto" (case-insensitive) is served by OKX; every other asset type is treated as a stock
/// and tried against Yahoo, then Tencent, then Tiingo.
/// </summary>
/// <param name="symbol">Instrument symbol passed to the provider clients.</param>
/// <param name="assetType">Asset type that selects the provider chain.</param>
/// <returns>The provider response, after it has been written to the cache.</returns>
/// <exception cref="InvalidOperationException">
/// The selected provider (or the last provider in the stock chain) returned no response or a non-positive price.
/// </exception>
private async Task<MarketPriceResponse> FetchPriceFromSourceAsync(string symbol, string assetType)
{
    _logger.LogInformation("从数据源获取价格: {Symbol}, 资产类型: {AssetType}", symbol, assetType);

    MarketPriceResponse response;
    string source;

    if (assetType.Equals("Crypto", StringComparison.OrdinalIgnoreCase))
    {
        response = await _okxService.GetCryptoPriceAsync(symbol);
        source = "OKX";
    }
    else
    {
        // Stocks: Yahoo first, then Tencent, then Tiingo. A missing or non-positive price is
        // treated like a provider failure so the next provider is tried, matching how the
        // historical-data path treats an empty Yahoo result.
        try
        {
            response = await _yahooService.GetStockPriceAsync(symbol);
            source = "Yahoo";
            if (response is null || response.Price <= 0)
            {
                throw new InvalidOperationException("Yahoo财经返回无效价格");
            }
        }
        catch (Exception ex)
        {
            _logger.LogWarning(ex, "Yahoo财经获取失败,降级使用 腾讯: {Symbol}", symbol);
            try
            {
                response = await _tencentService.GetStockPriceAsync(symbol);
                source = "Tencent";
                if (response is null || response.Price <= 0)
                {
                    throw new InvalidOperationException("腾讯财经返回无效价格");
                }
            }
            catch (Exception tencentEx)
            {
                _logger.LogWarning(tencentEx, "腾讯财经获取失败,降级使用 Tiingo: {Symbol}", symbol);
                response = await _tiingoService.GetStockPriceAsync(symbol);
                source = "Tiingo";
            }
        }
    }

    // Final validation: never cache or return a missing or non-positive price.
    if (response is null || response.Price <= 0)
    {
        _logger.LogError("[价格无效] Symbol={Symbol}, Price={Price}, Source={Source},不写入缓存", symbol, response?.Price, source);
        throw new InvalidOperationException($"获取到的价格无效: {symbol} = {response?.Price}");
    }

    // Persist the validated price.
    await SavePriceCacheAsync(symbol, assetType, response, source);

    return response;
}
|
||
|
||
/// <summary>
/// Returns historical K-line bars for an instrument. The K-line cache is used when it holds at
/// least <paramref name="limit"/> rows; otherwise the bars are fetched from a provider
/// (OKX for "Crypto", Yahoo with a Tiingo fallback for everything else), written to the cache
/// and returned as received.
/// </summary>
/// <param name="symbol">Instrument symbol.</param>
/// <param name="assetType">Asset type; "Crypto" is matched case-insensitively.</param>
/// <param name="timeframe">Bar timeframe, passed through to the cache and providers.</param>
/// <param name="limit">Number of bars requested.</param>
/// <returns>Cached bars ordered by timestamp, or the provider's response.</returns>
public async Task<List<MarketDataResponse>> GetHistoricalDataAsync(string symbol, string assetType, string timeframe, int limit)
{
    _logger.LogInformation("获取历史数据: {Symbol}, 资产类型: {AssetType}, 周期: {Timeframe}, 数量: {Limit}",
        symbol, assetType, timeframe, limit);

    // Cache lookup comes first.
    var cachedBars = await _marketDataRepo.GetKlineCacheAsync(symbol, assetType, timeframe, limit);

    if (cachedBars.Count >= limit)
    {
        // Enough rows cached: map them to response DTOs, oldest first.
        _logger.LogDebug("历史K线缓存命中: {Symbol} {AssetType} {Timeframe}, 数量: {Count}", symbol, assetType, timeframe, cachedBars.Count);

        var fromCache =
            from bar in cachedBars
            select new MarketDataResponse
            {
                Symbol = bar.Symbol,
                Open = bar.Open,
                High = bar.High,
                Low = bar.Low,
                Close = bar.Close,
                Volume = bar.Volume ?? 0,
                Timestamp = bar.Timestamp,
                AssetType = bar.AssetType
            }
            into mapped
            orderby mapped.Timestamp
            select mapped;

        return fromCache.ToList();
    }

    // Cache is short: go to the provider for this asset type.
    List<MarketDataResponse> bars;
    string provider;

    var isCrypto = assetType.Equals("Crypto", StringComparison.OrdinalIgnoreCase);
    if (!isCrypto)
    {
        // Stocks: Yahoo first, Tiingo as the fallback. An empty Yahoo result counts as a failure.
        try
        {
            bars = await _yahooService.GetStockHistoricalDataAsync(symbol, timeframe, limit);
            if (bars.Count <= 0)
            {
                throw new InvalidOperationException("Yahoo财经返回空数据");
            }

            provider = "Yahoo";
        }
        catch (Exception ex)
        {
            _logger.LogWarning(ex, "Yahoo财经历史数据获取失败,降级使用 Tiingo: {Symbol}", symbol);
            bars = await _tiingoService.GetStockHistoricalDataAsync(symbol, timeframe, limit);
            provider = "Tiingo";
        }
    }
    else
    {
        bars = await _okxService.GetCryptoHistoricalDataAsync(symbol, timeframe, limit);
        provider = "OKX";
    }

    // Store the fetched bars in one batch before handing them back.
    await SaveKlineCacheAsync(symbol, assetType, timeframe, bars, provider);

    return bars;
}
|
||
|
||
/// <summary>
/// Returns the latest price for a stock by delegating to <c>GetPriceAsync</c> with asset type "Stock".
/// </summary>
/// <param name="symbol">Stock symbol.</param>
public async Task<MarketPriceResponse> GetStockPriceAsync(string symbol)
    => await GetPriceAsync(symbol, "Stock");
|
||
|
||
/// <summary>
/// Returns the latest price for a cryptocurrency by delegating to <c>GetPriceAsync</c> with asset type "Crypto".
/// </summary>
/// <param name="symbol">Cryptocurrency symbol.</param>
public async Task<MarketPriceResponse> GetCryptoPriceAsync(string symbol)
    => await GetPriceAsync(symbol, "Crypto");
|
||
|
||
/// <summary>
/// Returns historical bars for a stock by delegating to <c>GetHistoricalDataAsync</c> with asset type "Stock".
/// </summary>
/// <param name="symbol">Stock symbol.</param>
/// <param name="timeframe">Bar timeframe.</param>
/// <param name="limit">Number of bars requested.</param>
public async Task<List<MarketDataResponse>> GetStockHistoricalDataAsync(string symbol, string timeframe, int limit)
    => await GetHistoricalDataAsync(symbol, "Stock", timeframe, limit);
|
||
|
||
/// <summary>
/// Returns historical bars for a cryptocurrency by delegating to <c>GetHistoricalDataAsync</c> with asset type "Crypto".
/// </summary>
/// <param name="symbol">Cryptocurrency symbol.</param>
/// <param name="timeframe">Bar timeframe.</param>
/// <param name="limit">Number of bars requested.</param>
public async Task<List<MarketDataResponse>> GetCryptoHistoricalDataAsync(string symbol, string timeframe, int limit)
    => await GetHistoricalDataAsync(symbol, "Crypto", timeframe, limit);
|
||
|
||
// ===== 私有辅助方法 =====
|
||
|
||
/// <summary>
/// Builds a price-cache entry from a provider response and saves it through the repository.
/// Symbol and asset type are stored upper-cased, prices are rounded to 8 decimal places, and a
/// non-positive previous close is stored as null.
/// </summary>
/// <param name="symbol">Instrument symbol.</param>
/// <param name="assetType">Asset type; also selects the expiry window.</param>
/// <param name="response">Provider response carrying the price and previous close.</param>
/// <param name="source">Name of the provider that produced the response.</param>
private async Task SavePriceCacheAsync(string symbol, string assetType, MarketPriceResponse response, string source)
{
    var fetchedAt = DateTime.Now;
    var expiresAt = GetCacheExpirationTime(assetType);

    var entry = new MarketPriceCache
    {
        Symbol = symbol.ToUpper(),
        AssetType = assetType.ToUpper(),
        Price = Math.Round(response.Price, 8),
        // Only a positive previous close is worth keeping.
        PreviousClose = response.PreviousClose > 0 ? Math.Round(response.PreviousClose, 8) : null,
        Source = source,
        FetchedAt = fetchedAt,
        ExpiredAt = expiresAt
    };

    await _marketDataRepo.SavePriceCacheAsync(entry);

    _logger.LogDebug("缓存写入: {Symbol} {AssetType}, 过期时间: {ExpiredAt}", symbol, assetType, entry.ExpiredAt);
}
|
||
|
||
/// <summary>
/// Converts provider bars into K-line cache rows and saves them through the repository in one batch.
/// Symbol, asset type and timeframe are stored upper-cased; each row records the provider name
/// and the time it was created.
/// </summary>
/// <param name="symbol">Instrument symbol.</param>
/// <param name="assetType">Asset type.</param>
/// <param name="timeframe">Bar timeframe.</param>
/// <param name="data">Bars returned by the provider.</param>
/// <param name="source">Name of the provider that produced the bars.</param>
private async Task SaveKlineCacheAsync(string symbol, string assetType, string timeframe, List<MarketDataResponse> data, string source)
{
    var rows =
        (from bar in data
         select new MarketKlineCache
         {
             Symbol = symbol.ToUpper(),
             AssetType = assetType.ToUpper(),
             Timeframe = timeframe.ToUpper(),
             Timestamp = bar.Timestamp,
             Open = bar.Open,
             High = bar.High,
             Low = bar.Low,
             Close = bar.Close,
             Volume = bar.Volume,
             Source = source,
             FetchedAt = DateTime.Now
         }).ToList();

    await _marketDataRepo.SaveKlineCacheBatchAsync(rows);

    _logger.LogDebug("历史K线缓存写入: {Symbol} {AssetType} {Timeframe}, 数量: {Count}", symbol, assetType, timeframe, rows.Count);
}
|
||
|
||
/// <summary>
/// Computes when a price-cache entry written now should expire: one minute from the current
/// local time for "Crypto" (case-insensitive), five minutes for every other asset type.
/// </summary>
/// <param name="assetType">Asset type of the entry being cached.</param>
/// <returns>The local time at which the entry expires.</returns>
private DateTime GetCacheExpirationTime(string assetType)
{
    const double CryptoTtlMinutes = 1;
    const double DefaultTtlMinutes = 5;

    // Ordinal, case-insensitive comparison matches the other asset-type checks in this class
    // and avoids a culture-sensitive lower-casing allocation.
    var isCrypto = assetType.Equals("Crypto", StringComparison.OrdinalIgnoreCase);

    return DateTime.Now.AddMinutes(isCrypto ? CryptoTtlMinutes : DefaultTtlMinutes);
}
|
||
}
|