AssetManager.API/AssetManager.Infrastructure/Services/MarketDataService.cs
OpenClaw Agent 267b0bd6ba feat: 添加腾讯财经历史K线数据接口
- 新增 GetStockHistoricalFromTencentAsync 方法
- 腾讯财经历史数据免费无限制,解决 Tiingo 429 限流问题
- 获取历史数据时优先使用腾讯财经,失败后降级 Tiingo
- 支持日/周/月K线数据
- 数据格式: [日期, 开盘, 收盘, 最高, 最低, 成交量]
2026-03-15 10:06:14 +00:00

704 lines
26 KiB
C#
Executable File
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

using System.Net.Http.Json;
using System.Security.Cryptography;
using System.Text;
using System.Text.Json;
using System.Text.RegularExpressions;
using AssetManager.Data;
using AssetManager.Models.DTOs;
using Microsoft.Extensions.Logging;
namespace AssetManager.Infrastructure.Services;
/// <summary>
/// Market data service implementation (stocks: Tencent Finance with Tiingo as fallback; crypto: OKX).
/// </summary>
public class MarketDataService : IMarketDataService
{
private readonly ILogger<MarketDataService> _logger;
private readonly HttpClient _httpClient;
private readonly DatabaseService _databaseService;
// Tiingo API token; empty when none is configured in the environment.
private readonly string _tiingoApiKey;
/// <summary>
/// Creates the service and reads the Tiingo API token from the environment
/// (<c>Tiingo__ApiKey</c> first, then <c>TIINGO_API_KEY</c>).
/// </summary>
/// <param name="logger">Logger.</param>
/// <param name="httpClientFactory">HTTP client factory; one client is created and reused for all providers.</param>
/// <param name="databaseService">Database service used for the price and K-line caches.</param>
public MarketDataService(
    ILogger<MarketDataService> logger,
    IHttpClientFactory httpClientFactory,
    DatabaseService databaseService)
{
    _logger = logger;
    _httpClient = httpClientFactory.CreateClient();
    _databaseService = databaseService;

    // The token is read from the environment only. No secret is embedded in source:
    // a key committed to the repository is effectively public and cannot be rotated per deployment.
    var configuredKey = Environment.GetEnvironmentVariable("Tiingo__ApiKey");
    if (string.IsNullOrWhiteSpace(configuredKey))
    {
        configuredKey = Environment.GetEnvironmentVariable("TIINGO_API_KEY");
    }
    _tiingoApiKey = configuredKey?.Trim() ?? string.Empty;

    if (_tiingoApiKey.Length == 0)
    {
        _logger.LogWarning("Tiingo API key is not configured (Tiingo__ApiKey / TIINGO_API_KEY); Tiingo fallback requests will fail");
    }

    // Deliberately no default Authorization header: this client is shared with the Tencent
    // (plain HTTP) and OKX requests, so a default header would send the Tiingo token to those
    // hosts as well. Every Tiingo URL built in this class already carries the token as a
    // query parameter.
}
/// <summary>
/// Gets the latest price of a US stock from the Tencent Finance quote endpoint.
/// Any failure on that path (HTTP error, malformed payload, unparsable price) is logged
/// and the request is retried against Tiingo.
/// </summary>
/// <param name="symbol">Ticker symbol; it is upper-cased and prefixed with "us" for the Tencent request.</param>
/// <returns>Latest price and previous close for <paramref name="symbol"/>.</returns>
public async Task<MarketPriceResponse> GetStockPriceAsync(string symbol)
{
    try
    {
        _logger.LogInformation("Requesting stock price for symbol: {Symbol} (腾讯财经接口)", symbol);
        // The response is decoded as GBK, which requires the code-pages provider.
        Encoding.RegisterProvider(CodePagesEncodingProvider.Instance);
        // Invariant upper-casing keeps the ticker stable under cultures with special casing
        // rules (e.g. Turkish "i"); escaping keeps unexpected characters out of the URL.
        var url = $"http://qt.gtimg.cn/q=us{Uri.EscapeDataString(symbol.ToUpperInvariant())}";
        var responseBytes = await _httpClient.GetByteArrayAsync(url);
        var response = Encoding.GetEncoding("GBK").GetString(responseBytes);
        if (string.IsNullOrEmpty(response) || !response.Contains("~"))
        {
            throw new InvalidOperationException($"腾讯财经接口返回无效数据,标的: {symbol}");
        }
        // The payload is a quoted, '~'-separated record; take the text between the first pair of quotes.
        var quoted = response.Split('"');
        if (quoted.Length < 2)
        {
            throw new InvalidOperationException($"腾讯财经接口返回无效数据,标的: {symbol}");
        }
        var parts = quoted[1].Split('~');
        if (parts.Length < 36)
        {
            throw new InvalidOperationException($"腾讯财经返回字段不足,标的: {symbol}");
        }
        // Field [3] = latest price, field [4] = previous close. The feed uses '.' as the decimal
        // separator, so parsing must not depend on the server's current culture.
        var invariant = System.Globalization.CultureInfo.InvariantCulture;
        const System.Globalization.NumberStyles numberStyle = System.Globalization.NumberStyles.Number;
        if (!decimal.TryParse(parts[3], numberStyle, invariant, out var currentPrice) || currentPrice <= 0)
        {
            throw new InvalidOperationException($"解析最新价失败,标的: {symbol},返回值: {parts[3]}");
        }
        if (!decimal.TryParse(parts[4], numberStyle, invariant, out var prevClose) || prevClose <= 0)
        {
            // No usable previous close: fall back to the current price (reported change becomes 0).
            prevClose = currentPrice;
        }
        _logger.LogDebug("腾讯财经接口返回 {Symbol}:最新价 {CurrentPrice},昨收价 {PrevClose},涨跌额 {Change}",
            symbol, currentPrice, prevClose, currentPrice - prevClose);
        return new MarketPriceResponse
        {
            Symbol = symbol,
            Price = currentPrice,
            PreviousClose = prevClose,
            Timestamp = DateTime.UtcNow,
            AssetType = "Stock"
        };
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "腾讯财经接口获取价格失败,标的: {Symbol}降级使用Tiingo接口", symbol);
        // Fall back to Tiingo; its exceptions propagate to the caller.
        return await GetStockPriceFromTiingoAsync(symbol);
    }
}
/// <summary>
/// Gets the latest daily close of a stock from Tiingo (fallback path for <see cref="GetStockPriceAsync"/>).
/// </summary>
/// <param name="symbol">Ticker symbol.</param>
/// <returns>Latest close as price and the preceding bar's close as previous close (0 when unavailable).</returns>
/// <exception cref="Exception">Tiingo returned no rows for the symbol.</exception>
private async Task<MarketPriceResponse> GetStockPriceFromTiingoAsync(string symbol)
{
    _logger.LogInformation("Requesting stock price for symbol: {Symbol} (Tiingo接口)", symbol);
    // Without a date range the daily prices endpoint returns only the most recent bar, which
    // leaves nothing to use as the previous close. A short trailing window yields the last
    // few trading days even across weekends and holidays.
    var windowStart = DateTime.UtcNow.AddDays(-10)
        .ToString("yyyy-MM-dd", System.Globalization.CultureInfo.InvariantCulture);
    var url = $"https://api.tiingo.com/tiingo/daily/{Uri.EscapeDataString(symbol)}/prices" +
              $"?startDate={windowStart}&token={Uri.EscapeDataString(_tiingoApiKey)}";
    var response = await _httpClient.GetFromJsonAsync<List<TiingoDailyResponse>>(url);
    if (response == null || response.Count == 0)
    {
        throw new Exception($"No data found for {symbol}");
    }
    // Order explicitly so "latest" and "previous" do not depend on the response ordering.
    var bars = response.OrderBy(bar => bar.date).ToList();
    var latest = bars[^1];
    decimal? prevClose = bars.Count >= 2 ? bars[^2].close : null;
    return new MarketPriceResponse
    {
        Symbol = symbol,
        Price = latest.close ?? 0,
        PreviousClose = prevClose ?? 0,
        Timestamp = latest.date ?? DateTime.UtcNow,
        AssetType = "Stock"
    };
}
/// <summary>
/// Gets the latest price of a crypto instrument from the OKX ticker endpoint.
/// </summary>
/// <param name="symbol">OKX instrument id, e.g. BTC-USDT.</param>
/// <returns>
/// Latest price, with the UTC+0 day-open price used as previous close.
/// Either value is 0 when the corresponding field cannot be parsed.
/// </returns>
/// <exception cref="Exception">OKX returned no data or a non-zero result code; other failures are logged and rethrown.</exception>
public async Task<MarketPriceResponse> GetCryptoPriceAsync(string symbol)
{
    try
    {
        _logger.LogInformation("Requesting crypto price for symbol: {Symbol}", symbol);
        var url = $"https://www.okx.com/api/v5/market/ticker?instId={Uri.EscapeDataString(symbol)}";
        var response = await _httpClient.GetFromJsonAsync<OkxTickerResponse>(url);
        if (response == null || response.code != "0" || response.data == null || response.data.Count == 0)
        {
            throw new Exception($"No data found for {symbol}, code: {response?.code}, msg: {response?.msg}");
        }
        var latest = response.data[0];
        // OKX sends numbers as '.'-separated strings; parse them independently of the server culture.
        var invariant = System.Globalization.CultureInfo.InvariantCulture;
        const System.Globalization.NumberStyles numberStyle = System.Globalization.NumberStyles.Number;
        var price = decimal.TryParse(latest.last, numberStyle, invariant, out var parsedLast) ? parsedLast : 0;
        var previousClose = decimal.TryParse(latest.sodUtc0, numberStyle, invariant, out var parsedOpen) ? parsedOpen : 0;
        return new MarketPriceResponse
        {
            Symbol = symbol,
            Price = price,
            PreviousClose = previousClose,
            Timestamp = DateTime.UtcNow,
            AssetType = "Crypto"
        };
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "Error getting crypto price for {Symbol}", symbol);
        throw;
    }
}
/// <summary>
/// Issues an HTTP GET and retries on 429 (Too Many Requests) and on request failures.
/// </summary>
/// <param name="url">Absolute request URL.</param>
/// <param name="maxRetries">Maximum number of attempts.</param>
/// <returns>A successful response; the caller owns it and must dispose it.</returns>
/// <exception cref="HttpRequestException">All attempts failed or were rate limited.</exception>
private async Task<HttpResponseMessage> GetWithRetryAsync(string url, int maxRetries = 3)
{
    for (int attempt = 1; attempt <= maxRetries; attempt++)
    {
        var isLastAttempt = attempt == maxRetries;
        try
        {
            var response = await _httpClient.GetAsync(url);
            if (response.StatusCode == System.Net.HttpStatusCode.TooManyRequests)
            {
                // Honour the server's Retry-After delta when present, otherwise back off linearly.
                var retryAfter = response.Headers.RetryAfter?.Delta?.TotalSeconds ?? 2 * attempt;
                // The rate-limited response is discarded; release its connection and content now.
                response.Dispose();
                if (isLastAttempt)
                {
                    // No attempt left: fail immediately instead of sleeping first.
                    break;
                }
                _logger.LogWarning("Tiingo API 限流,等待 {Seconds} 秒后重试 (第 {Attempt}/{Max} 次)",
                    retryAfter, attempt, maxRetries);
                // TimeSpan avoids the int overflow of a manual seconds-to-milliseconds cast.
                await Task.Delay(TimeSpan.FromSeconds(retryAfter));
                continue;
            }
            try
            {
                response.EnsureSuccessStatusCode();
            }
            catch
            {
                // A failed response is never handed to the caller, so dispose it here.
                response.Dispose();
                throw;
            }
            return response;
        }
        catch (HttpRequestException ex) when (!isLastAttempt)
        {
            _logger.LogWarning(ex, "HTTP 请求失败,重试中 (第 {Attempt}/{Max} 次)", attempt, maxRetries);
            await Task.Delay(1000 * attempt);
        }
    }
    throw new HttpRequestException($"请求失败,已重试 {maxRetries} 次");
}
/// <summary>
/// Gets historical bars for a stock: Tencent Finance is tried first, and Tiingo is used
/// when Tencent throws or yields no rows.
/// </summary>
/// <param name="symbol">Ticker symbol.</param>
/// <param name="timeframe">Bar period.</param>
/// <param name="limit">Number of bars requested.</param>
/// <returns>Historical bars from the first provider that produced any.</returns>
public async Task<List<MarketDataResponse>> GetStockHistoricalDataAsync(string symbol, string timeframe, int limit)
{
    try
    {
        // Primary provider.
        var bars = await GetStockHistoricalFromTencentAsync(symbol, timeframe, limit);
        var hasBars = bars.Any();
        if (hasBars)
        {
            return bars;
        }
    }
    catch (Exception fetchError)
    {
        _logger.LogWarning(fetchError, "腾讯财经历史数据获取失败,降级使用 Tiingo: {Symbol}", symbol);
    }

    // Primary provider failed or returned nothing: use the fallback provider.
    var fallbackBars = await GetStockHistoricalFromTiingoAsync(symbol, timeframe, limit);
    return fallbackBars;
}
/// <summary>
/// Gets historical K-line bars for a US stock from the Tencent Finance K-line endpoint.
/// This method never throws: every failure is logged and reported as an empty list so the
/// caller can fall back to another provider.
/// </summary>
/// <param name="symbol">Ticker symbol; upper-cased and prefixed with "us".</param>
/// <param name="timeframe">"1d"/"daily", "1w"/"weekly" or "1M"/"monthly"; anything else is treated as daily.</param>
/// <param name="limit">Number of bars requested; at most 320 are requested from the endpoint.</param>
/// <returns>Bars in ascending time order, or an empty list when nothing usable was returned.</returns>
private async Task<List<MarketDataResponse>> GetStockHistoricalFromTencentAsync(string symbol, string timeframe, int limit)
{
    // Reads a JSON value that may be encoded either as a number or as a numeric string.
    static bool TryReadDecimal(JsonElement element, out decimal value)
    {
        switch (element.ValueKind)
        {
            case JsonValueKind.Number:
                return element.TryGetDecimal(out value);
            case JsonValueKind.String:
                return decimal.TryParse(
                    element.GetString(),
                    System.Globalization.NumberStyles.Float,
                    System.Globalization.CultureInfo.InvariantCulture,
                    out value);
            default:
                value = 0;
                return false;
        }
    }

    // Property lookup that tolerates non-object parents (TryGetProperty throws on those).
    static bool TryGetChild(JsonElement parent, string name, out JsonElement child)
    {
        child = default;
        return parent.ValueKind == JsonValueKind.Object && parent.TryGetProperty(name, out child);
    }

    try
    {
        _logger.LogInformation("获取腾讯财经历史数据: {Symbol}", symbol);
        if (limit <= 0)
        {
            // Nothing requested; the final TakeLast(limit) would yield an empty list anyway.
            return new List<MarketDataResponse>();
        }
        // The response is decoded as GBK, which requires the code-pages provider.
        Encoding.RegisterProvider(CodePagesEncodingProvider.Instance);
        // "us{symbol}" addresses a US stock; day/week/month selects the K-line period.
        var klineType = timeframe switch
        {
            "1d" or "daily" => "day",
            "1w" or "weekly" => "week",
            "1M" or "monthly" => "month",
            _ => "day"
        };
        var stockCode = $"us{symbol.ToUpperInvariant()}";
        // NOTE(review): the comma-separated "param" layout is kept exactly as it was; verify the
        // position of the count field against a live response.
        var url = $"http://web.ifzq.gtimg.cn/appstock/app/fqkline/get?_var=kline_{klineType}qfq&param={Uri.EscapeDataString(stockCode)},{klineType},,{Math.Min(limit, 320)}&qrcode=1&asrd=1";
        var responseBytes = await _httpClient.GetByteArrayAsync(url);
        var response = Encoding.GetEncoding("GBK").GetString(responseBytes);
        if (string.IsNullOrEmpty(response) || !response.Contains("kline"))
        {
            _logger.LogWarning("腾讯财经返回无效数据: {Symbol}", symbol);
            return new List<MarketDataResponse>();
        }
        // The body is "<var>=<json>"; cut out the JSON object and drop any trailing characters.
        var jsonStart = response.IndexOf('{');
        var jsonEnd = response.LastIndexOf('}');
        if (jsonStart < 0 || jsonEnd < jsonStart) return new List<MarketDataResponse>();
        var jsonStr = response.Substring(jsonStart, jsonEnd - jsonStart + 1);
        // JsonDocument rents pooled buffers and must be disposed.
        using var jsonDoc = JsonDocument.Parse(jsonStr);
        var root = jsonDoc.RootElement;
        // Expected path: data -> us{symbol} -> qfq -> {klineType}
        if (!TryGetChild(root, "data", out var data) ||
            !TryGetChild(data, stockCode, out var stockData))
        {
            _logger.LogWarning("腾讯财经数据结构异常: {Symbol}", symbol);
            return new List<MarketDataResponse>();
        }
        JsonElement klineData = default;
        bool hasSeries;
        if (TryGetChild(stockData, "qfq", out var qfq))
        {
            hasSeries = TryGetChild(qfq, klineType, out klineData);
        }
        else
        {
            // NOTE(review): some public samples of this endpoint expose the series as a flat
            // "qfq{type}" or "{type}" property of the stock node instead of nesting it under
            // "qfq"; both layouts are accepted here - confirm against a live response.
            hasSeries = TryGetChild(stockData, "qfq" + klineType, out klineData) ||
                        TryGetChild(stockData, klineType, out klineData);
            if (!hasSeries)
            {
                _logger.LogWarning("腾讯财经数据结构异常: {Symbol}", symbol);
                return new List<MarketDataResponse>();
            }
        }
        if (!hasSeries || klineData.ValueKind != JsonValueKind.Array)
        {
            _logger.LogWarning("腾讯财经无K线数据: {Symbol}", symbol);
            return new List<MarketDataResponse>();
        }
        var result = new List<MarketDataResponse>();
        foreach (var item in klineData.EnumerateArray())
        {
            // Row layout: [date, open, close, high, low, volume]
            if (item.ValueKind != JsonValueKind.Array || item.GetArrayLength() < 6) continue;
            var dateStr = item[0].ValueKind == JsonValueKind.String ? item[0].GetString() : null;
            if (!DateTime.TryParse(
                    dateStr,
                    System.Globalization.CultureInfo.InvariantCulture,
                    System.Globalization.DateTimeStyles.None,
                    out var date))
            {
                continue;
            }
            // A malformed row is skipped instead of discarding the whole series.
            if (!TryReadDecimal(item[1], out var open) ||
                !TryReadDecimal(item[2], out var close) ||
                !TryReadDecimal(item[3], out var high) ||
                !TryReadDecimal(item[4], out var low) ||
                !TryReadDecimal(item[5], out var volume))
            {
                continue;
            }
            if (close <= 0) continue;
            result.Add(new MarketDataResponse
            {
                Symbol = symbol,
                Open = open,
                High = high,
                Low = low,
                Close = close,
                Volume = volume,
                Timestamp = date,
                AssetType = "Stock"
            });
        }
        // Ascending by time, keeping only the most recent `limit` bars.
        result = result.OrderBy(x => x.Timestamp).TakeLast(limit).ToList();
        _logger.LogInformation("腾讯财经获取 {Symbol} 历史数据 {Count} 条", symbol, result.Count);
        return result;
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "腾讯财经历史数据解析失败: {Symbol}", symbol);
        return new List<MarketDataResponse>();
    }
}
/// <summary>
/// Gets historical bars for a stock from the Tiingo daily prices endpoint (fallback provider).
/// </summary>
/// <param name="symbol">Ticker symbol.</param>
/// <param name="timeframe">Bar period; "1w"/"weekly" and "1m"/"monthly" are resampled by Tiingo, everything else is daily.</param>
/// <param name="limit">Number of bars requested.</param>
/// <returns>Up to <paramref name="limit"/> most recent bars in ascending time order.</returns>
/// <exception cref="Exception">Any failure is logged and rethrown.</exception>
private async Task<List<MarketDataResponse>> GetStockHistoricalFromTiingoAsync(string symbol, string timeframe, int limit)
{
    try
    {
        _logger.LogInformation("Requesting stock historical data from Tiingo: {Symbol}", symbol);
        if (limit <= 0)
        {
            // Take(limit) below would yield nothing; skip the network round trip.
            return new List<MarketDataResponse>();
        }

        // Map the timeframe to Tiingo's resampling frequency and to the window used for the
        // start date. Without resampleFreq the endpoint returns daily bars, which would be
        // handed back as if they were weekly or monthly.
        string resampleFreq;
        string windowTimeframe;
        var windowLimit = limit;
        switch (timeframe?.ToLowerInvariant())
        {
            case "1w":
            case "weekly":
                resampleFreq = "weekly";
                windowTimeframe = "1w";
                break;
            case "1m":
            case "monthly":
                resampleFreq = "monthly";
                windowTimeframe = "1m";
                break;
            case "1d":
            case "daily":
                resampleFreq = "daily";
                windowTimeframe = "1d";
                // `limit` calendar days contain fewer than `limit` trading days (weekends and
                // holidays), so widen the window; Take(limit) below trims any surplus.
                windowLimit = (int)Math.Min((long)limit * 3 / 2 + 10, int.MaxValue);
                break;
            default:
                resampleFreq = "daily";
                windowTimeframe = timeframe;
                break;
        }

        var invariant = System.Globalization.CultureInfo.InvariantCulture;
        var endDate = DateTime.UtcNow;
        var startDate = CalculateStartDate(endDate, windowTimeframe, windowLimit);
        // Format with the invariant culture so a non-Gregorian server calendar cannot change the dates.
        var url = $"https://api.tiingo.com/tiingo/daily/{Uri.EscapeDataString(symbol)}/prices" +
                  $"?startDate={startDate.ToString("yyyy-MM-dd", invariant)}" +
                  $"&endDate={endDate.ToString("yyyy-MM-dd", invariant)}" +
                  $"&resampleFreq={resampleFreq}" +
                  $"&token={Uri.EscapeDataString(_tiingoApiKey)}";
        // The response is owned here and must be disposed once the body has been read.
        using var response = await GetWithRetryAsync(url);
        var data = await response.Content.ReadFromJsonAsync<List<TiingoDailyResponse>>();
        if (data == null)
        {
            return new List<MarketDataResponse>();
        }
        return data
            .OrderByDescending(x => x.date)
            .Take(limit)
            .OrderBy(x => x.date)
            .Select(x => new MarketDataResponse
            {
                Symbol = symbol,
                Open = x.open ?? 0,
                High = x.high ?? 0,
                Low = x.low ?? 0,
                Close = x.close ?? 0,
                Volume = x.volume ?? 0,
                Timestamp = x.date ?? DateTime.UtcNow,
                AssetType = "Stock"
            })
            .ToList();
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "Tiingo 历史数据获取失败: {Symbol}", symbol);
        throw;
    }
}
/// <summary>
/// Gets historical candles for a crypto instrument from the OKX candles endpoint.
/// </summary>
/// <param name="symbol">OKX instrument id, e.g. BTC-USDT.</param>
/// <param name="timeframe">Bar period; converted with <see cref="ConvertToOkxBar"/>.</param>
/// <param name="limit">Number of candles requested; at most one OKX page (300) is requested.</param>
/// <returns>Candles in ascending time order; rows that cannot be parsed are skipped.</returns>
/// <exception cref="Exception">OKX returned no data or a non-zero result code; other failures are logged and rethrown.</exception>
public async Task<List<MarketDataResponse>> GetCryptoHistoricalDataAsync(string symbol, string timeframe, int limit)
{
    try
    {
        _logger.LogInformation("Requesting crypto historical data for symbol: {Symbol}, timeframe: {Timeframe}, limit: {Limit}",
            symbol, timeframe, limit);
        var bar = ConvertToOkxBar(timeframe);
        // The OKX candles endpoint documents a maximum of 300 rows per request.
        var pageSize = Math.Min(limit, 300);
        var url = $"https://www.okx.com/api/v5/market/candles?instId={Uri.EscapeDataString(symbol)}&bar={bar}&limit={pageSize}";
        var response = await _httpClient.GetFromJsonAsync<OkxCandlesResponse>(url);
        if (response == null || response.code != "0" || response.data == null)
        {
            throw new Exception($"No data found for {symbol}, code: {response?.code}, msg: {response?.msg}");
        }
        // OKX sends every field as a '.'-separated string; parse independently of the server culture.
        var invariant = System.Globalization.CultureInfo.InvariantCulture;
        const System.Globalization.NumberStyles numberStyle = System.Globalization.NumberStyles.Number;
        var result = new List<MarketDataResponse>();
        foreach (var item in response.data)
        {
            // Row layout used here: [timestamp(ms), open, high, low, close, volume, ...]
            if (item.Length < 6) continue;
            if (long.TryParse(item[0], System.Globalization.NumberStyles.Integer, invariant, out var ts) &&
                decimal.TryParse(item[1], numberStyle, invariant, out var open) &&
                decimal.TryParse(item[2], numberStyle, invariant, out var high) &&
                decimal.TryParse(item[3], numberStyle, invariant, out var low) &&
                decimal.TryParse(item[4], numberStyle, invariant, out var close) &&
                decimal.TryParse(item[5], numberStyle, invariant, out var volume))
            {
                result.Add(new MarketDataResponse
                {
                    Symbol = symbol,
                    Open = open,
                    High = high,
                    Low = low,
                    Close = close,
                    Volume = volume,
                    Timestamp = DateTimeOffset.FromUnixTimeMilliseconds(ts).UtcDateTime,
                    AssetType = "Crypto"
                });
            }
        }
        // Ascending by time.
        result = result.OrderBy(x => x.Timestamp).ToList();
        return result;
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "Error getting crypto historical data for {Symbol}", symbol);
        throw;
    }
}
/// <summary>
/// Converts a timeframe string to the bar value used in OKX candle requests.
/// Matching is case-insensitive, so "1m" and "1M" both select the monthly bar;
/// one-minute bars are requested with "1min".
/// </summary>
/// <param name="timeframe">Timeframe such as "1min", "1h", "1d" or "1w".</param>
/// <returns>The OKX bar value, or "1D" for null or unrecognised input.</returns>
private static string ConvertToOkxBar(string timeframe)
{
    // Invariant lower-casing: culture-specific casing rules (e.g. Turkish dotless "i")
    // would otherwise keep "1MIN" from matching "1min".
    return timeframe?.ToLowerInvariant() switch
    {
        "1min" => "1m",
        "5min" => "5m",
        "15min" => "15m",
        "1h" => "1H",
        "4h" => "4H",
        "1d" => "1D",
        "1w" => "1W",
        "1m" => "1M",
        _ => "1D"
    };
}
/// <summary>
/// Gets the current price of an asset, serving it from the price cache while the cached row
/// has not expired and otherwise fetching it from the provider for the asset type.
/// </summary>
/// <param name="symbol">Asset symbol.</param>
/// <param name="assetType">"Crypto" (case-insensitive) routes to OKX; anything else is treated as a stock.</param>
/// <returns>The cached or freshly fetched price.</returns>
public async Task<MarketPriceResponse> GetPriceAsync(string symbol, string assetType)
{
    _logger.LogInformation("获取实时价格: {Symbol}, 资产类型: {AssetType}", symbol, assetType);
    var db = _databaseService.GetDb();
    // Invariant upper-casing keeps the cache key stable regardless of the server culture.
    var symbolKey = symbol.ToUpperInvariant();
    var assetTypeKey = assetType.ToUpperInvariant();
    var cacheKey = GenerateMd5Hash($"{symbolKey}_{assetTypeKey}");
    // Look in the cache first.
    var cached = await db.Queryable<MarketPriceCache>()
        .FirstAsync(p => p.Id == cacheKey && p.ExpiredAt > DateTime.Now);
    if (cached != null)
    {
        _logger.LogDebug("缓存命中: {Symbol} {AssetType}, 价格: {Price}", symbol, assetType, cached.Price);
        return new MarketPriceResponse
        {
            Symbol = cached.Symbol,
            Price = cached.Price,
            PreviousClose = cached.PreviousClose ?? 0,
            Timestamp = cached.FetchedAt,
            AssetType = cached.AssetType
        };
    }
    // Cache miss: call the provider.
    MarketPriceResponse response;
    string source;
    if (assetType.Equals("Crypto", StringComparison.OrdinalIgnoreCase))
    {
        response = await GetCryptoPriceAsync(symbol);
        source = "OKX";
    }
    else
    {
        // Everything else goes to the stock providers.
        response = await GetStockPriceAsync(symbol);
        // NOTE(review): GetStockPriceAsync tries Tencent first and only falls back to Tiingo,
        // so this label may not name the provider that actually answered.
        source = "Tiingo";
    }
    if (response.Price <= 0)
    {
        // The fetchers report an unparsable price as 0; caching it would serve a bogus
        // price until the entry expires, so return it uncached.
        _logger.LogWarning("Skipping cache write for {Symbol} {AssetType}: non-positive price {Price}",
            symbol, assetType, response.Price);
        return response;
    }
    // Write to the cache, keeping 8 decimal places.
    var cacheEntity = new MarketPriceCache
    {
        Id = cacheKey,
        Symbol = symbolKey,
        AssetType = assetTypeKey,
        Price = Math.Round(response.Price, 8),
        PreviousClose = response.PreviousClose > 0 ? Math.Round(response.PreviousClose, 8) : null,
        Source = source,
        FetchedAt = DateTime.Now,
        ExpiredAt = GetCacheExpirationTime(assetType)
    };
    // Update the row when it exists, insert it otherwise.
    await db.Storageable(cacheEntity).ExecuteCommandAsync();
    _logger.LogDebug("缓存写入: {Symbol} {AssetType}, 过期时间: {ExpiredAt}", symbol, assetType, cacheEntity.ExpiredAt);
    return response;
}
/// <summary>
/// Gets historical bars for an asset. Cached bars are returned when the cache holds at least
/// <paramref name="limit"/> rows for the symbol/asset type/timeframe; otherwise the provider
/// for the asset type is called and its result is written to the cache.
/// </summary>
/// <param name="symbol">Asset symbol.</param>
/// <param name="assetType">"Crypto" (case-insensitive) routes to OKX; anything else is treated as a stock.</param>
/// <param name="timeframe">Bar period.</param>
/// <param name="limit">Number of bars requested.</param>
/// <returns>Bars in ascending time order.</returns>
public async Task<List<MarketDataResponse>> GetHistoricalDataAsync(string symbol, string assetType, string timeframe, int limit)
{
    _logger.LogInformation("获取历史数据: {Symbol}, 资产类型: {AssetType}, 周期: {Timeframe}, 数量: {Limit}",
        symbol, assetType, timeframe, limit);
    var db = _databaseService.GetDb();
    // Invariant upper-casing keeps the cache keys stable regardless of the server culture.
    var symbolUpper = symbol.ToUpperInvariant();
    var assetTypeUpper = assetType.ToUpperInvariant();
    // NOTE(review): upper-casing folds "1m" and "1M" into one cache key although the Tencent
    // stock path treats "1M" as monthly and any other value as daily - confirm callers never mix them.
    var timeframeUpper = timeframe.ToUpperInvariant();
    // Look in the cache first.
    // NOTE(review): the lookup only counts rows; it does not check how old they are, so a
    // filled cache is served without ever being refreshed - confirm this is intended.
    var cachedKlines = await db.Queryable<MarketKlineCache>()
        .Where(k => k.Symbol == symbolUpper
            && k.AssetType == assetTypeUpper
            && k.Timeframe == timeframeUpper)
        .OrderByDescending(k => k.Timestamp)
        .Take(limit)
        .ToListAsync();
    // Enough cached rows: return them.
    if (cachedKlines.Count >= limit)
    {
        _logger.LogDebug("历史K线缓存命中: {Symbol} {AssetType} {Timeframe}, 数量: {Count}", symbol, assetType, timeframe, cachedKlines.Count);
        return cachedKlines.Select(k => new MarketDataResponse
        {
            Symbol = k.Symbol,
            Open = k.Open,
            High = k.High,
            Low = k.Low,
            Close = k.Close,
            Volume = k.Volume ?? 0,
            Timestamp = k.Timestamp,
            AssetType = k.AssetType
        }).OrderBy(k => k.Timestamp).ToList();
    }
    // Not enough cached rows: call the provider.
    List<MarketDataResponse> response;
    string source;
    if (assetType.Equals("Crypto", StringComparison.OrdinalIgnoreCase))
    {
        response = await GetCryptoHistoricalDataAsync(symbol, timeframe, limit);
        source = "OKX";
    }
    else
    {
        // Everything else goes to the stock providers.
        response = await GetStockHistoricalDataAsync(symbol, timeframe, limit);
        // NOTE(review): the stock path tries Tencent first and only falls back to Tiingo,
        // so this label may not name the provider that actually answered.
        source = "Tiingo";
    }
    if (response.Count == 0)
    {
        // Nothing to cache; skip the storage round trip.
        return response;
    }
    // Build the cache rows. The id is derived from the bar timestamp, so duplicate timestamps
    // in the provider response would collide; keep one row per id.
    var cacheEntities = response.Select(k => new MarketKlineCache
    {
        Id = GenerateMd5Hash($"{symbolUpper}_{assetTypeUpper}_{timeframeUpper}_{k.Timestamp:yyyyMMddHHmmss}"),
        Symbol = symbolUpper,
        AssetType = assetTypeUpper,
        Timeframe = timeframeUpper,
        Timestamp = k.Timestamp,
        Open = k.Open,
        High = k.High,
        Low = k.Low,
        Close = k.Close,
        Volume = k.Volume,
        Source = source,
        FetchedAt = DateTime.Now
    }).DistinctBy(entity => entity.Id).ToList();
    // Batch write keyed on Id.
    await db.Storageable(cacheEntities)
        .WhereColumns(it => new { it.Id })
        .ExecuteCommandAsync();
    _logger.LogDebug("历史K线缓存写入: {Symbol} {AssetType} {Timeframe}, 数量: {Count}", symbol, assetType, timeframe, cacheEntities.Count);
    return response;
}
/// <summary>
/// Computes the start of a window that ends at <paramref name="endDate"/> and spans
/// <paramref name="limit"/> periods of the given timeframe.
/// </summary>
/// <param name="endDate">End of the window.</param>
/// <param name="timeframe">"1min", "5min", "15min", "1h", "1d", "1w" or "1m" (months); matched case-insensitively.</param>
/// <param name="limit">Number of periods to go back.</param>
/// <returns>The start date; null or unrecognised timeframes are treated as days.</returns>
private static DateTime CalculateStartDate(DateTime endDate, string timeframe, int limit)
{
    // Invariant lower-casing: culture-specific casing rules (e.g. Turkish dotless "i")
    // would otherwise keep "1MIN" from matching "1min".
    return timeframe?.ToLowerInvariant() switch
    {
        "1min" => endDate.AddMinutes(-limit),
        "5min" => endDate.AddMinutes(-limit * 5),
        "15min" => endDate.AddMinutes(-limit * 15),
        "1h" => endDate.AddHours(-limit),
        "1d" => endDate.AddDays(-limit),
        "1w" => endDate.AddDays(-limit * 7),
        "1m" => endDate.AddMonths(-limit),
        _ => endDate.AddDays(-limit)
    };
}
/// <summary>
/// Returns the MD5 digest of the UTF-8 encoded input as a lowercase hexadecimal string.
/// The callers in this class use it to derive cache row ids.
/// </summary>
/// <param name="input">Text to hash.</param>
/// <returns>32-character lowercase hex digest.</returns>
private string GenerateMd5Hash(string input)
{
    using var hasher = MD5.Create();
    byte[] digest = hasher.ComputeHash(Encoding.UTF8.GetBytes(input));
    // Two lowercase hex digits per byte, concatenated in order.
    return string.Concat(digest.Select(octet => octet.ToString("x2")));
}
/// <summary>
/// Computes when a price cache entry written now should expire.
/// </summary>
/// <param name="assetType">Asset type; "crypto" (case-insensitive) gets the short lifetime.</param>
/// <returns>Local time one minute from now for crypto, five minutes from now for everything else.</returns>
private DateTime GetCacheExpirationTime(string assetType)
{
    var isCrypto = assetType.ToLower() == "crypto";
    // Crypto entries live for 1 minute, all other asset types for 5 minutes.
    var lifetimeMinutes = isCrypto ? 1 : 5;
    return DateTime.Now.AddMinutes(lifetimeMinutes);
}
}
// Tiingo response models
/// <summary>
/// Price snapshot model with last, close and previous-close values.
/// It is not referenced by the service code visible in this file.
/// NOTE(review): the lowercase property names presumably mirror Tiingo JSON field names - confirm.
/// </summary>
internal class TiingoPriceResponse
{
public decimal? tngoLast { get; set; }
public decimal? close { get; set; }
public decimal? prevClose { get; set; }
public DateTime? date { get; set; }
}
/// <summary>
/// One row of the Tiingo daily prices response, as deserialized by the Tiingo stock price
/// and stock history methods of MarketDataService. Every field is nullable; the service
/// substitutes 0 (or the current UTC time for the date) when a value is missing.
/// </summary>
internal class TiingoDailyResponse
{
public decimal? open { get; set; }
public decimal? high { get; set; }
public decimal? low { get; set; }
public decimal? close { get; set; }
public decimal? volume { get; set; }
public DateTime? date { get; set; }
}
/// <summary>
/// Wrapper holding a list of <see cref="TiingoCryptoBar"/> rows.
/// It is not referenced by the service code visible in this file.
/// </summary>
internal class TiingoCryptoPriceResponse
{
public List<TiingoCryptoBar>? priceData { get; set; }
}
/// <summary>
/// One OHLCV bar inside <see cref="TiingoCryptoPriceResponse"/>; every field is nullable.
/// </summary>
internal class TiingoCryptoBar
{
public decimal? open { get; set; }
public decimal? high { get; set; }
public decimal? low { get; set; }
public decimal? close { get; set; }
public decimal? volume { get; set; }
public DateTime? date { get; set; }
}
// OKX response models
/// <summary>
/// Envelope of the OKX ticker response. The service treats a <c>code</c> other than "0"
/// or a missing/empty <c>data</c> list as "no data".
/// </summary>
internal class OkxTickerResponse
{
    // All members are nullable: they stay null when the JSON omits them, and callers null-check them.
    public string? code { get; set; }
    public string? msg { get; set; }
    public List<OkxTickerData>? data { get; set; }
}
/// <summary>
/// One ticker row of the OKX ticker response. Prices are kept as the strings OKX sends
/// and parsed by the service.
/// </summary>
internal class OkxTickerData
{
    // All members are nullable: they stay null when the JSON omits them.
    public string? instId { get; set; }
    public string? last { get; set; }
    // Open price at UTC+0; the service uses it as the previous close.
    public string? sodUtc0 { get; set; }
}
/// <summary>
/// Envelope of the OKX candles response. Each <c>data</c> row is a string array; the service
/// reads index 0 as a millisecond timestamp and indexes 1-5 as open, high, low, close, volume.
/// </summary>
internal class OkxCandlesResponse
{
    // All members are nullable: they stay null when the JSON omits them, and callers null-check them.
    public string? code { get; set; }
    public string? msg { get; set; }
    public List<string[]>? data { get; set; }
}