📘 Day 1 — CCXT 入门 + fetch_ohlcv Demo
# day1_ccxt_demo.ipynb
import ccxt
import pandas as pd
from datetime import datetime
# Exchange client (Binance as the example), created without API credentials.
exchange = ccxt.binance()

# Fetch the past 500 one-hour candles.
symbol = 'BTC/USDT'
timeframe = '1h'
ohlcv = exchange.fetch_ohlcv(symbol, timeframe, limit=500)

# Convert to a DataFrame: the first field of each row is parsed as epoch
# milliseconds and becomes the index.
df = pd.DataFrame(ohlcv, columns=['timestamp','open','high','low','close','volume'])
df = df.assign(timestamp=pd.to_datetime(df['timestamp'], unit='ms'))
df = df.set_index('timestamp')
print(df.head())
📘 Day 2 — 写下载脚本 download_data.py
# download_data.py
import ccxt
import pandas as pd
from datetime import datetime
import argparse
def fetch_ohlcv(symbol="BTC/USDT", timeframe="1h", since=None, limit=1000):
    """Download OHLCV candles from Binance and return them as a DataFrame.

    Args:
        symbol: Market pair passed through to ccxt, e.g. "BTC/USDT".
        timeframe: Candle interval string passed through to ccxt, e.g. "1h".
        since: Start of the requested window, passed through to ccxt
            unchanged (the CLI below supplies an int; ccxt timestamps are
            epoch milliseconds), or None to let the exchange choose.
        limit: Maximum number of candles requested in this single call.

    Returns:
        DataFrame with columns open/high/low/close/volume, indexed by
        ``timestamp`` (parsed from epoch milliseconds).
    """
    exchange = ccxt.binance()
    data = exchange.fetch_ohlcv(symbol, timeframe, since=since, limit=limit)
    df = pd.DataFrame(
        data, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume']
    )
    df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
    return df.set_index('timestamp')
def save_data(df, path):
    """Write *df* (index included) to a CSV file at *path* and report it on stdout."""
    df.to_csv(path)
    print(f"Saved to {path}")
if __name__ == "__main__":
    # Command-line entry point: each flag maps onto a fetch_ohlcv() parameter,
    # plus --output for the destination CSV path.
    parser = argparse.ArgumentParser()
    parser.add_argument("--symbol", default="BTC/USDT")
    parser.add_argument("--timeframe", default="1h")
    parser.add_argument("--since", type=int, default=None)
    parser.add_argument("--limit", type=int, default=1000)
    parser.add_argument("--output", default="data.csv")
    args = parser.parse_args()

    df = fetch_ohlcv(args.symbol, args.timeframe, args.since, args.limit)
    save_data(df, args.output)
📘 Day 3 — 数据增量更新逻辑
# day3_incremental_update.py
import ccxt
import pandas as pd
import os
# Module-level Binance client (created without credentials); fetch_new_data()
# below reads it as a global.
exchange = ccxt.binance()
def load_existing(path):
    """Load a previously saved OHLCV CSV.

    Args:
        path: Location of the CSV file; it must contain a ``timestamp`` column.

    Returns:
        DataFrame indexed by the parsed ``timestamp`` column, or None when
        no file exists at *path*.
    """
    if not os.path.exists(path):
        return None
    return pd.read_csv(path, parse_dates=['timestamp'], index_col='timestamp')
def fetch_new_data(symbol, timeframe, last_ts):
    """Fetch up to 1000 candles starting at *last_ts* from the module-level exchange.

    Args:
        symbol: Market pair passed through to ccxt, e.g. "BTC/USDT".
        timeframe: Candle interval string passed through to ccxt, e.g. "1h".
        last_ts: Timestamp of the newest candle already stored. Accepts a
            stdlib ``datetime`` or a pandas ``Timestamp``; a naive value is
            interpreted as UTC.

    Returns:
        DataFrame with columns open/high/low/close/volume, indexed by
        ``timestamp``. The candle at *last_ts* may be included again, so
        callers should de-duplicate on the index after concatenating.
    """
    # Convert via pd.Timestamp and integer nanoseconds instead of
    # datetime.timestamp(): a naive stdlib datetime would otherwise be read
    # as *local* time, while the stored index was built from epoch
    # milliseconds (i.e. UTC). Integer maths also avoids float rounding.
    since_ms = int(pd.Timestamp(last_ts).value // 1_000_000)
    ohlcv = exchange.fetch_ohlcv(symbol, timeframe, since=since_ms, limit=1000)
    df = pd.DataFrame(
        ohlcv, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume']
    )
    df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
    return df.set_index('timestamp')
path = "BTC_1h.csv"
old = load_existing(path)
# A header-only CSV loads as an empty frame; treat it like a missing file
# instead of failing on the last-timestamp lookup.
if old is None or old.empty:
    print("No old data, download fresh.")
else:
    last_ts = old.index.max()
    print("Last timestamp:", last_ts)
    new = fetch_new_data("BTC/USDT", "1h", last_ts)
    # De-duplicate on the timestamp index, not on row values:
    # DataFrame.drop_duplicates() ignores the index, so it would keep two
    # rows for the same candle when its values changed between downloads and
    # could drop distinct candles that happen to share identical values.
    # keep='last' prefers the freshly downloaded version of a candle.
    combined = pd.concat([old, new])
    combined = combined[~combined.index.duplicated(keep='last')].sort_index()
    combined.to_csv(path)
    print("Done incremental update!")
📘 Day 4 — 数据校验(空值、重复、连续性)+ DataClient 类
# day4_data_client.py
import ccxt
import pandas as pd
class DataClient:
    """Download OHLCV candles through ccxt and run basic quality checks on them."""

    _COLUMNS = ['timestamp', 'open', 'high', 'low', 'close', 'volume']

    def __init__(self, exchange="binance"):
        """Create a client for the ccxt exchange class named *exchange*."""
        self.exchange = getattr(ccxt, exchange)()

    def fetch(self, symbol, timeframe, since=None, limit=1000):
        """Download candles and return them as a timestamp-indexed DataFrame.

        Args:
            symbol: Market pair passed through to ccxt, e.g. "BTC/USDT".
            timeframe: Candle interval string passed through to ccxt.
            since: Start of the window, passed through to ccxt unchanged,
                or None.
            limit: Maximum number of candles requested in this single call.

        Returns:
            DataFrame with columns open/high/low/close/volume, indexed by
            ``timestamp`` (parsed from epoch milliseconds).
        """
        ohlcv = self.exchange.fetch_ohlcv(symbol, timeframe, since=since, limit=limit)
        df = pd.DataFrame(ohlcv, columns=self._COLUMNS)
        df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
        return df.set_index('timestamp')

    def validate(self, df):
        """Check *df* for nulls, duplicate timestamps and gaps.

        Args:
            df: DataFrame indexed by candle timestamp.

        Returns:
            ``(cleaned, report)`` where ``cleaned`` has duplicate index
            entries removed (first occurrence kept) and is sorted by index,
            and ``report`` is a dict of plain Python values:
            ``nulls`` (null count per column), ``duplicates`` (number of
            repeated index entries) and ``gap_counts`` (number of steps
            longer than the most common step).

        Raises:
            ValueError: If *df* has no rows.
        """
        if df.empty:
            raise ValueError("cannot validate an empty DataFrame")

        report = {}

        # Null check; cast to int so the report holds no numpy scalars.
        report["nulls"] = {col: int(n) for col, n in df.isnull().sum().items()}

        # Duplicate timestamps.
        dup_mask = df.index.duplicated()
        report["duplicates"] = int(dup_mask.sum())
        df = df[~dup_mask].sort_index()

        # Continuity check. The expected step is inferred as the most common
        # difference, so it is only strict for a fixed timeframe. Timedeltas
        # are compared directly: casting to 'timedelta64[m]' is rejected by
        # pandas >= 2.
        diffs = df.index.to_series().diff().dropna()
        if diffs.empty:
            # A single row has no intervals, hence no gaps.
            report["gap_counts"] = 0
        else:
            expected_step = diffs.mode().iloc[0]
            report["gap_counts"] = int((diffs > expected_step).sum())
        return df, report
📘 Day 5 — 改成命令行工具 + 加日志
# download_cli.py
import argparse
import logging
import os
from logging.handlers import RotatingFileHandler

from day4_data_client import DataClient
# Logging: rotating file at logs/data.log (about 2 MB per file, 3 backups).
# RotatingFileHandler opens its file on construction and does not create
# missing parent directories, so make sure logs/ exists first.
os.makedirs("logs", exist_ok=True)
handler = RotatingFileHandler("logs/data.log", maxBytes=2000000, backupCount=3)
logging.basicConfig(handlers=[handler], level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s')

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--symbol", default="BTC/USDT")
    parser.add_argument("--timeframe", default="1h")
    parser.add_argument("--limit", type=int, default=1000)
    parser.add_argument("--output", default="data.csv")
    args = parser.parse_args()

    cli = DataClient()
    logging.info("Downloading %s %s", args.symbol, args.timeframe)
    try:
        df = cli.fetch(args.symbol, args.timeframe, None, args.limit)
    except Exception:
        # Top-level boundary: record the traceback in the log file, then
        # re-raise so the process still fails visibly.
        logging.exception("Download failed for %s %s", args.symbol, args.timeframe)
        raise
    df.to_csv(args.output)
    logging.info("Saved to %s", args.output)
📘 Day 6 — 按年月存储数据(分层目录)
# day6_partition_save.py
import pandas as pd
import os
def save_partitioned(df, base_dir="data/"):
    """Write *df* into per-month CSV files at ``<base_dir>/<YYYY>/<MM>/data.csv``.

    Rows are grouped by the year and month of their index and each group is
    written with a single call. If a partition file already exists, its rows
    are merged with the new ones and de-duplicated on the timestamp (the new
    row wins), so running the function again does not duplicate data.

    Args:
        df: DataFrame indexed by timestamp (a DatetimeIndex, or values that
            ``pd.to_datetime`` can parse).
        base_dir: Root directory of the partition tree.

    Returns:
        None.
    """
    frame = df.sort_index()
    if not isinstance(frame.index, pd.DatetimeIndex):
        # sort_index() returned a new object, so the caller's frame is untouched.
        frame.index = pd.to_datetime(frame.index)
    # Always write a named index column so partitions can be read back with
    # index_col='timestamp'.
    index_name = frame.index.name or "timestamp"

    month_keys = frame.index.strftime("%Y-%m").to_numpy()
    for key, chunk in frame.groupby(month_keys):
        year, month = key.split("-")
        folder = os.path.join(base_dir, year, month)
        os.makedirs(folder, exist_ok=True)
        path = os.path.join(folder, "data.csv")

        if os.path.exists(path):
            # index_col=0 also accepts files whose index column has no header.
            existing = pd.read_csv(path, index_col=0, parse_dates=True)
            existing.index.name = chunk.index.name
            chunk = pd.concat([existing, chunk])
            chunk = chunk[~chunk.index.duplicated(keep="last")].sort_index()

        chunk.to_csv(path, index_label=index_name)
# Usage example. Guarded so that importing this module does not read or
# write any files.
if __name__ == "__main__":
    df = pd.read_csv("BTC_1h.csv", parse_dates=['timestamp'], index_col='timestamp')
    save_partitioned(df)
📘 Day 7 — DataClient 文档模板(Markdown)
# DataClient 文档
## 类:DataClient
用于下载、校验、清洗、增量更新加密货币 K 线数据。
---
## 方法说明
### `fetch(symbol, timeframe, since=None, limit=1000)`
下载 OHLCV 数据
**参数:**
- `symbol: str`
- `timeframe: str`
- `since: int | None`
- `limit: int`
**返回:**
- `DataFrame`:以 `timestamp`(由毫秒时间戳解析得到)为索引,包含 `open`、`high`、`low`、`close`、`volume` 五列
---
### `validate(df)`
检查数据质量:空值、重复、连续性
**返回:**
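- `(df_cleaned, report_dict)`:`df_cleaned` 为按时间索引去重并排序后的数据;`report_dict` 包含 `nulls`(各列空值数)、`duplicates`(重复时间戳数)、`gap_counts`(间隔大于最常见间隔的次数)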
---
## 异常
- 网络错误 → ccxt.NetworkError
- API 限速 → ccxt.RateLimitExceeded
- 数据为空 → ValueError