AssetManager.API/AssetManager.Infrastructure/Services/MarketDataService.cs
OpenClaw Agent 39808c6d5d refactor: 调整实时价格数据源优先级
调整:
- 实时价格:腾讯 → Yahoo → Tiingo
- 历史K线:Yahoo → Tiingo(不变)

原因:
- Yahoo 经常 429 限流
- 腾讯财经对国内用户更稳定
- 腾讯支持美股 ETF(如 UPRO、TMF)
2026-03-24 10:57:24 +00:00

375 lines
15 KiB
C#
Executable File
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

using AssetManager.Data;
using AssetManager.Data.Repositories;
using AssetManager.Models.DTOs;
using Microsoft.Extensions.Logging;
using System.Collections.Concurrent;
namespace AssetManager.Infrastructure.Services;
/// <summary>
/// 市场数据服务实现(组合模式)
/// <para>数据源优先级:</para>
/// <para>- 实时价格:腾讯 → Yahoo → Tiingo腾讯国内稳定Yahoo 常限流)</para>
/// <para>- 历史K线Yahoo → Tiingo腾讯历史接口已废弃</para>
/// <para>- 加密货币OKX</para>
/// </summary>
public class MarketDataService : IMarketDataService
{
private readonly ILogger<MarketDataService> _logger;
private readonly ITencentMarketService _tencentService;
private readonly IYahooMarketService _yahooService;
private readonly ITiingoMarketService _tiingoService;
private readonly IOkxMarketService _okxService;
private readonly IMarketDataRepository _marketDataRepo;
// 静态内存缓存层(跨请求共享,避免并发数据库查询导致连接池冲突)
private static readonly ConcurrentDictionary<string, (MarketPriceCache cache, DateTime expireAt)> _memoryCache = new();
// 静态 pending 请求字典(跨请求共享,防止并发请求同一股票)
private static readonly ConcurrentDictionary<string, Lazy<Task<MarketPriceResponse>>> _pendingPriceRequests = new();
// 静态写入锁(串行化数据库写入)
private static readonly SemaphoreSlim _writeLock = new(1, 1);
public MarketDataService(
ILogger<MarketDataService> logger,
ITencentMarketService tencentService,
IYahooMarketService yahooService,
ITiingoMarketService tiingoService,
IOkxMarketService okxService,
IMarketDataRepository marketDataRepo)
{
_logger = logger;
_tencentService = tencentService;
_yahooService = yahooService;
_tiingoService = tiingoService;
_okxService = okxService;
_marketDataRepo = marketDataRepo;
}
/// <summary>
/// 获取实时价格(自动根据资产类型路由到对应数据源)
/// 使用静态缓存 + Lazy 确保跨请求并发安全
/// </summary>
public async Task<MarketPriceResponse> GetPriceAsync(string symbol, string assetType)
{
var cacheKey = $"{symbol.ToUpper()}_{assetType.ToUpper()}";
_logger.LogInformation("[价格查询开始] Symbol={Symbol}, AssetType={AssetType}, CacheKey={CacheKey}", symbol, assetType, cacheKey);
try
{
// 第一步:查静态内存缓存(快速返回)
if (_memoryCache.TryGetValue(cacheKey, out var memoryCached) && memoryCached.expireAt > DateTime.Now)
{
_logger.LogInformation("[内存缓存命中] Symbol={Symbol}, Price={Price}", symbol, memoryCached.cache.Price);
return new MarketPriceResponse
{
Symbol = memoryCached.cache.Symbol,
Price = memoryCached.cache.Price,
PreviousClose = memoryCached.cache.PreviousClose ?? 0,
Timestamp = memoryCached.cache.FetchedAt,
AssetType = memoryCached.cache.AssetType
};
}
// 第二步:创建 Lazy 并尝试添加到字典
// 先创建 Lazy再 GetOrAdd确保所有线程使用同一个 Lazy
var lazy = new Lazy<Task<MarketPriceResponse>>(() => GetPriceInternalAsync(symbol, assetType, cacheKey));
var actualLazy = _pendingPriceRequests.GetOrAdd(cacheKey, lazy);
try
{
var result = await actualLazy.Value;
_logger.LogInformation("[价格获取成功] Symbol={Symbol}, Price={Price}", symbol, result.Price);
return result;
}
finally
{
// 请求完成后移除
_pendingPriceRequests.TryRemove(cacheKey, out _);
}
}
catch (Exception ex)
{
_logger.LogError(ex, "[价格获取异常] Symbol={Symbol}, AssetType={AssetType}", symbol, assetType);
throw;
}
}
/// <summary>
/// 内部获取价格方法(查数据库缓存 → 获取价格)
/// </summary>
private async Task<MarketPriceResponse> GetPriceInternalAsync(string symbol, string assetType, string cacheKey)
{
_logger.LogInformation("[内部查询] Symbol={Symbol}, 查数据库缓存", symbol);
// 查数据库缓存
var cached = await _marketDataRepo.GetPriceCacheAsync(symbol, assetType);
if (cached != null && cached.Price > 0)
{
_logger.LogInformation("[数据库缓存命中] Symbol={Symbol}, Price={Price}, ExpiredAt={ExpiredAt}", symbol, cached.Price, cached.ExpiredAt);
// 写入内存缓存
_memoryCache[cacheKey] = (cached, cached.ExpiredAt);
return new MarketPriceResponse
{
Symbol = cached.Symbol,
Price = cached.Price,
PreviousClose = cached.PreviousClose ?? 0,
Timestamp = cached.FetchedAt,
AssetType = cached.AssetType
};
}
if (cached != null && cached.Price <= 0)
{
_logger.LogWarning("[数据库缓存价格无效] Symbol={Symbol}, Price={Price}", symbol, cached.Price);
}
else
{
_logger.LogWarning("[数据库缓存未命中] Symbol={Symbol}", symbol);
}
// 从数据源获取
return await FetchPriceFromSourceAsync(symbol, assetType);
}
/// <summary>
/// 从数据源获取价格(内部方法)
/// </summary>
private async Task<MarketPriceResponse> FetchPriceFromSourceAsync(string symbol, string assetType)
{
_logger.LogInformation("[数据源获取开始] Symbol={Symbol}, AssetType={AssetType}", symbol, assetType);
MarketPriceResponse response;
string source;
if (assetType.Equals("Crypto", StringComparison.OrdinalIgnoreCase))
{
_logger.LogInformation("[数据源] 使用 OKX 获取 {Symbol}", symbol);
response = await _okxService.GetCryptoPriceAsync(symbol);
source = "OKX";
}
else
{
// 股票实时价格:优先腾讯财经(国内稳定),失败降级 Yahoo最后 Tiingo
try
{
_logger.LogInformation("[数据源] 尝试腾讯获取 {Symbol}", symbol);
response = await _tencentService.GetStockPriceAsync(symbol);
source = "Tencent";
_logger.LogInformation("[数据源] 腾讯成功: {Symbol} = {Price}", symbol, response.Price);
}
catch (Exception ex)
{
_logger.LogWarning("[数据源] 腾讯失败: {Symbol}, 错误: {Error}", symbol, ex.Message);
try
{
_logger.LogInformation("[数据源] 降级使用 Yahoo 获取 {Symbol}", symbol);
response = await _yahooService.GetStockPriceAsync(symbol);
source = "Yahoo";
_logger.LogInformation("[数据源] Yahoo 成功: {Symbol} = {Price}", symbol, response.Price);
}
catch (Exception yahooEx)
{
_logger.LogWarning("[数据源] Yahoo 失败: {Symbol}, 错误: {Error}", symbol, yahooEx.Message);
_logger.LogInformation("[数据源] 最后降级使用 Tiingo 获取 {Symbol}", symbol);
response = await _tiingoService.GetStockPriceAsync(symbol);
source = "Tiingo";
_logger.LogInformation("[数据源] Tiingo 成功: {Symbol} = {Price}", symbol, response.Price);
}
}
}
// 验证价格有效性
if (response.Price <= 0)
{
_logger.LogError("[价格无效] Symbol={Symbol}, Price={Price}, Source={Source},不写入缓存", symbol, response.Price, source);
throw new InvalidOperationException($"获取到的价格无效: {symbol} = {response.Price}");
}
// 先写入内存缓存(确保返回给调用者)
var cacheKey = $"{symbol.ToUpper()}_{assetType.ToUpper()}";
var expireAt = DateTime.Now.AddMinutes(5);
_memoryCache[cacheKey] = (new MarketPriceCache
{
Symbol = symbol.ToUpper(),
AssetType = assetType.ToUpper(),
Price = response.Price,
PreviousClose = response.PreviousClose,
FetchedAt = DateTime.Now,
ExpiredAt = expireAt
}, expireAt);
// 写入数据库缓存(串行化,避免连接冲突)
_ = Task.Run(async () =>
{
await _writeLock.WaitAsync();
try
{
await SavePriceCacheAsync(symbol, assetType, response, source);
}
catch (Exception ex)
{
_logger.LogWarning(ex, "[数据库缓存写入失败] Symbol={Symbol},忽略(内存缓存已生效)", symbol);
}
finally
{
_writeLock.Release();
}
});
return response;
}
/// <summary>
/// 获取历史数据(自动根据资产类型路由到对应数据源)
/// </summary>
public async Task<List<MarketDataResponse>> GetHistoricalDataAsync(string symbol, string assetType, string timeframe, int limit)
{
_logger.LogInformation("获取历史数据: {Symbol}, 资产类型: {AssetType}, 周期: {Timeframe}, 数量: {Limit}",
symbol, assetType, timeframe, limit);
// 先查缓存
var cachedKlines = await _marketDataRepo.GetKlineCacheAsync(symbol, assetType, timeframe, limit);
// 缓存足够,直接返回
if (cachedKlines.Count >= limit)
{
_logger.LogDebug("历史K线缓存命中: {Symbol} {AssetType} {Timeframe}, 数量: {Count}", symbol, assetType, timeframe, cachedKlines.Count);
return cachedKlines.Select(k => new MarketDataResponse
{
Symbol = k.Symbol,
Open = k.Open,
High = k.High,
Low = k.Low,
Close = k.Close,
Volume = k.Volume ?? 0,
Timestamp = k.Timestamp,
AssetType = k.AssetType
}).OrderBy(k => k.Timestamp).ToList();
}
// 缓存不足调用API补全
List<MarketDataResponse> response;
string source;
if (assetType.Equals("Crypto", StringComparison.OrdinalIgnoreCase))
{
response = await _okxService.GetCryptoHistoricalDataAsync(symbol, timeframe, limit);
source = "OKX";
}
else
{
// 股票优先Yahoo财经失败降级 Tiingo
// 注腾讯财经历史K线接口已废弃不再作为降级数据源
try
{
response = await _yahooService.GetStockHistoricalDataAsync(symbol, timeframe, limit);
if (response.Count > 0)
{
source = "Yahoo";
}
else
{
throw new InvalidOperationException("Yahoo财经返回空数据");
}
}
catch (Exception ex)
{
_logger.LogWarning(ex, "Yahoo财经历史数据获取失败降级使用 Tiingo: {Symbol}", symbol);
response = await _tiingoService.GetStockHistoricalDataAsync(symbol, timeframe, limit);
source = "Tiingo";
}
}
// 批量写入缓存
await SaveKlineCacheAsync(symbol, assetType, timeframe, response, source);
return response;
}
/// <summary>
/// 获取股票实时价格
/// </summary>
public async Task<MarketPriceResponse> GetStockPriceAsync(string symbol)
{
return await GetPriceAsync(symbol, "Stock");
}
/// <summary>
/// 获取加密货币实时价格
/// </summary>
public async Task<MarketPriceResponse> GetCryptoPriceAsync(string symbol)
{
return await GetPriceAsync(symbol, "Crypto");
}
/// <summary>
/// 获取股票历史数据
/// </summary>
public async Task<List<MarketDataResponse>> GetStockHistoricalDataAsync(string symbol, string timeframe, int limit)
{
return await GetHistoricalDataAsync(symbol, "Stock", timeframe, limit);
}
/// <summary>
/// 获取加密货币历史数据
/// </summary>
public async Task<List<MarketDataResponse>> GetCryptoHistoricalDataAsync(string symbol, string timeframe, int limit)
{
return await GetHistoricalDataAsync(symbol, "Crypto", timeframe, limit);
}
// ===== 私有辅助方法 =====
private async Task SavePriceCacheAsync(string symbol, string assetType, MarketPriceResponse response, string source)
{
var cacheEntity = new MarketPriceCache
{
Symbol = symbol.ToUpper(),
AssetType = assetType.ToUpper(),
Price = Math.Round(response.Price, 8),
PreviousClose = response.PreviousClose > 0 ? Math.Round(response.PreviousClose, 8) : null,
Source = source,
FetchedAt = DateTime.Now,
ExpiredAt = GetCacheExpirationTime(assetType)
};
await _marketDataRepo.SavePriceCacheAsync(cacheEntity);
_logger.LogDebug("缓存写入: {Symbol} {AssetType}, 过期时间: {ExpiredAt}", symbol, assetType, cacheEntity.ExpiredAt);
}
private async Task SaveKlineCacheAsync(string symbol, string assetType, string timeframe, List<MarketDataResponse> data, string source)
{
var cacheEntities = data.Select(k => new MarketKlineCache
{
Symbol = symbol.ToUpper(),
AssetType = assetType.ToUpper(),
Timeframe = timeframe.ToUpper(),
Timestamp = k.Timestamp,
Open = k.Open,
High = k.High,
Low = k.Low,
Close = k.Close,
Volume = k.Volume,
Source = source,
FetchedAt = DateTime.Now
}).ToList();
await _marketDataRepo.SaveKlineCacheBatchAsync(cacheEntities);
_logger.LogDebug("历史K线缓存写入: {Symbol} {AssetType} {Timeframe}, 数量: {Count}", symbol, assetType, timeframe, cacheEntities.Count);
}
private DateTime GetCacheExpirationTime(string assetType)
{
return assetType.ToLower() switch
{
"crypto" => DateTime.Now.AddMinutes(1),
_ => DateTime.Now.AddMinutes(5)
};
}
}