📘 Day 1 — CCXT 入门 + fetch_ohlcv Demo

# day1_ccxt_demo.ipynb

import ccxt
import pandas as pd
from datetime import datetime

# Exchange client (Binance is used as the example)
exchange = ccxt.binance()

# Request 500 candles on the 1h timeframe
symbol = 'BTC/USDT'
timeframe = '1h'

ohlcv = exchange.fetch_ohlcv(symbol, timeframe, limit=500)

# Turn the raw rows into a DataFrame: decode the millisecond epoch column
# into datetimes and promote it to the index.
df = pd.DataFrame(
    ohlcv,
    columns=['timestamp','open','high','low','close','volume'],
)
df = df.assign(
    timestamp=pd.to_datetime(df['timestamp'], unit='ms')
).set_index('timestamp')

print(df.head())


📘 Day 2 — 写下载脚本 download_data.py

# download_data.py

import ccxt
import pandas as pd
from datetime import datetime
import argparse

def fetch_ohlcv(symbol="BTC/USDT", timeframe="1h", since=None, limit=1000):
    """Download OHLCV candles from Binance and return them as a DataFrame.

    Args:
        symbol: Market symbol forwarded to ccxt.
        timeframe: Candle interval string forwarded to ccxt.
        since: Value forwarded to ccxt as ``since`` (None by default).
        limit: Value forwarded to ccxt as ``limit``.

    Returns:
        DataFrame with open/high/low/close/volume columns, indexed by the
        'timestamp' column decoded from epoch milliseconds.
    """
    client = ccxt.binance()
    rows = client.fetch_ohlcv(symbol, timeframe, since=since, limit=limit)

    frame = pd.DataFrame(
        rows,
        columns=['timestamp','open','high','low','close','volume'],
    )
    frame['timestamp'] = pd.to_datetime(frame['timestamp'], unit='ms')
    return frame.set_index('timestamp')

def save_data(df, path):
    """Write ``df`` to ``path`` as CSV, then print a confirmation line."""
    df.to_csv(path)
    confirmation = f"Saved to {path}"
    print(confirmation)

if __name__ == "__main__":
    # Script entry point: declare the CLI options from a table, then
    # download the candles and write them to the requested CSV file.
    cli_parser = argparse.ArgumentParser()
    option_table = (
        ("--symbol", {"default": "BTC/USDT"}),
        ("--timeframe", {"default": "1h"}),
        ("--since", {"type": int, "default": None}),
        ("--limit", {"type": int, "default": 1000}),
        ("--output", {"default": "data.csv"}),
    )
    for flag, settings in option_table:
        cli_parser.add_argument(flag, **settings)
    args = cli_parser.parse_args()

    df = fetch_ohlcv(
        symbol=args.symbol,
        timeframe=args.timeframe,
        since=args.since,
        limit=args.limit,
    )
    save_data(df, args.output)


📘 Day 3 — 数据增量更新逻辑

# day3_incremental_update.py

import ccxt
import pandas as pd
import os

# Module-level Binance client; fetch_new_data() issues its requests through it.
exchange = ccxt.binance()

def load_existing(path):
    """Load the CSV stored at ``path``, or return None when it does not exist.

    The 'timestamp' column is parsed as datetimes and used as the index.
    """
    if os.path.exists(path):
        return pd.read_csv(path, parse_dates=['timestamp'], index_col='timestamp')
    return None

def fetch_new_data(symbol, timeframe, last_ts):
    """Fetch candles starting at ``last_ts`` from the module-level exchange.

    Args:
        symbol: Market symbol forwarded to ccxt.
        timeframe: Candle interval string forwarded to ccxt.
        last_ts: Start time as a datetime or pandas Timestamp. A naive value
            is interpreted as UTC, matching the naive timestamps produced by
            ``pd.to_datetime(..., unit='ms')`` when the data was saved.

    Returns:
        DataFrame with open/high/low/close/volume columns, indexed by the
        'timestamp' column decoded from epoch milliseconds. A single request
        with ``limit=1000`` is issued.
    """
    # Normalise through pd.Timestamp: its timestamp() reads a naive value as
    # UTC, whereas datetime.datetime.timestamp() would apply the machine's
    # local timezone and shift ``since`` by the UTC offset.
    since_ms = int(round(pd.Timestamp(last_ts).timestamp() * 1000))
    ohlcv = exchange.fetch_ohlcv(symbol, timeframe, since=since_ms, limit=1000)

    df = pd.DataFrame(ohlcv, columns=['timestamp','open','high','low','close','volume'])
    df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
    df.set_index('timestamp', inplace=True)
    return df

# Incremental update: extend the saved CSV with candles newer than its last row.
path = "BTC_1h.csv"
old = load_existing(path)

if old is None or old.empty:
    # A missing file or a header-only file gives no timestamp to resume from.
    print("No old data, download fresh.")
else:
    # max() rather than [-1] so an unsorted file still resumes from its
    # newest candle.
    last_ts = old.index.max()
    print("Last timestamp:", last_ts)

    new = fetch_new_data("BTC/USDT", "1h", last_ts)
    combined = pd.concat([old, new])
    # De-duplicate on the timestamp index, not on row values: the candle at
    # last_ts is fetched again and its values may have changed since it was
    # saved, so a value-based drop_duplicates() would keep both copies.
    # keep='last' prefers the freshly fetched row.
    combined = combined[~combined.index.duplicated(keep='last')].sort_index()
    combined.to_csv(path)

    print("Done incremental update!")


📘 Day 4 — 数据校验(空值、重复、连续性)+ DataClient 类

# day4_data_client.py

import ccxt
import pandas as pd

class DataClient:
    """Download and validate OHLCV candle data through a ccxt exchange.

    Args:
        exchange: Name of the ccxt exchange class to instantiate; it is
            looked up as an attribute of the ``ccxt`` module.
    """

    def __init__(self, exchange="binance"):
        self.exchange = getattr(ccxt, exchange)()

    def fetch(self, symbol, timeframe, since=None, limit=1000):
        """Download OHLCV candles and return them as a DataFrame.

        Args:
            symbol: Market symbol forwarded to ccxt.
            timeframe: Candle interval string forwarded to ccxt.
            since: Value forwarded to ccxt as ``since`` (None by default).
            limit: Value forwarded to ccxt as ``limit``.

        Returns:
            DataFrame with open/high/low/close/volume columns, indexed by
            the 'timestamp' column decoded from epoch milliseconds.
        """
        ohlcv = self.exchange.fetch_ohlcv(symbol, timeframe, since=since, limit=limit)
        df = pd.DataFrame(ohlcv, columns=['timestamp','open','high','low','close','volume'])
        df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
        df.set_index('timestamp', inplace=True)
        return df

    def validate(self, df):
        """Check data quality and return a cleaned copy plus a report.

        Args:
            df: DataFrame indexed by timestamp.

        Returns:
            Tuple ``(cleaned, report)``. ``cleaned`` has duplicate index
            entries removed (first occurrence kept) and is sorted by index.
            ``report`` has three keys: ``nulls`` (per-column null counts,
            computed on the input), ``duplicates`` (number of repeated
            index entries) and ``gap_counts`` (number of steps between
            consecutive timestamps that exceed the most common step).
        """
        report = {}

        # Null check, on the data as received
        report["nulls"] = df.isnull().sum().to_dict()

        # Duplicate timestamps
        dup_mask = df.index.duplicated()
        report["duplicates"] = int(dup_mask.sum())
        df = df[~dup_mask]

        # Continuity check. The most common step stands in for the expected
        # interval, so this is only strict for a fixed timeframe.
        df = df.sort_index()
        # Compare timedeltas directly: casting to 'timedelta64[m]' is
        # rejected by pandas 2.x. Dropping the leading NaT also lets frames
        # with fewer than two rows report zero gaps instead of failing on
        # an empty mode().
        steps = df.index.to_series().diff().dropna()
        if steps.empty:
            report["gap_counts"] = 0
        else:
            expected_step = steps.mode().iloc[0]
            report["gap_counts"] = int((steps > expected_step).sum())

        return df, report


📘 Day 5 — 改成命令行工具 + 加日志

# download_cli.py

import argparse
import logging
import os
from logging.handlers import RotatingFileHandler

from day4_data_client import DataClient

# Logging: write to logs/data.log, rotating at 2,000,000 bytes and keeping
# 3 backups. The handler opens the file immediately and does not create
# missing parent directories, so make sure "logs/" exists first.
os.makedirs("logs", exist_ok=True)
handler = RotatingFileHandler("logs/data.log", maxBytes=2000000, backupCount=3)
logging.basicConfig(handlers=[handler], level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s')

if __name__ == "__main__":
    # Script entry point: read the CLI options, download the candles with
    # DataClient, write them to CSV, and log both steps.
    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument("--symbol", default="BTC/USDT")
    arg_parser.add_argument("--timeframe", default="1h")
    arg_parser.add_argument("--limit", type=int, default=1000)
    arg_parser.add_argument("--output", default="data.csv")
    args = arg_parser.parse_args()

    client = DataClient()

    logging.info(f"Downloading {args.symbol} {args.timeframe}")
    candles = client.fetch(
        args.symbol,
        args.timeframe,
        since=None,
        limit=args.limit,
    )

    candles.to_csv(args.output)
    logging.info(f"Saved to {args.output}")


📘 Day 6 — 按年月存储数据(分层目录)

# day6_partition_save.py

import pandas as pd
import os

def save_partitioned(df, base_dir="data/"):
    """Write ``df`` into per-month CSV files under ``base_dir``.

    Rows are grouped by the year and month of their index and written to
    ``<base_dir>/<YYYY>/<MM>/data.csv``. A file that does not exist yet is
    created with a header; an existing file is appended to without one, so
    saving the same rows twice stores them twice.

    Args:
        df: DataFrame with a datetime index.
        base_dir: Root directory of the partition tree.
    """
    df = df.sort_index()
    if df.empty:
        return

    # Name the index column in the header so the files can be read back with
    # index_col='timestamp', as the rest of this tutorial does.
    index_label = df.index.name or "timestamp"

    # One write per month, not one file open per row.
    month_groups = df.groupby([df.index.year, df.index.month])
    for (year, month), chunk in month_groups:
        folder = os.path.join(base_dir, f"{int(year)}", f"{int(month):02d}")
        os.makedirs(folder, exist_ok=True)

        path = os.path.join(folder, "data.csv")

        # Create with a header, or append without one
        is_new_file = not os.path.exists(path)
        chunk.to_csv(
            path,
            mode='w' if is_new_file else 'a',
            header=is_new_file,
            index_label=index_label,
        )

# Usage example: load the saved hourly CSV and split it into the
# year/month directory tree.
df = pd.read_csv(
    "BTC_1h.csv",
    index_col='timestamp',
    parse_dates=['timestamp'],
)
save_partitioned(df)


📘 Day 7 — DataClient 文档模板(Markdown)

# DataClient 文档

## 类:DataClient

用于下载、校验、清洗加密货币 K 线数据。增量更新逻辑目前位于 Day 3 的独立脚本中,尚未并入本类。

---

## 方法说明

### `fetch(symbol, timeframe, since=None, limit=1000)`
下载 OHLCV 数据。

**参数:**
- `symbol: str`
- `timeframe: str`
- `since: int | None`
- `limit: int`

**返回:**
- `DataFrame`: 带时间索引

---

### `validate(df)`
检查数据质量:空值、重复、连续性。

**返回:**
- `(df_cleaned, report_dict)`:`df_cleaned` 为去除重复时间戳并按时间排序后的数据;`report_dict` 包含 `nulls`、`duplicates`、`gap_counts` 三项

---

## 异常
- 网络错误 → ccxt.NetworkError
- API 限速 → ccxt.RateLimitExceeded
- 数据为空 → ValueError(注意:当前 `fetch` 实现尚未包含此检查)