Skip to content

Trade

交易模块。定义了交易代理和基本类型。

broker

Broker是一个交易代理。每一个交易代理对应一个账户,记录了该账户下的交易记录、每日持仓记录和每日市值记录等数据,并提供交易撮合的具体实现。

Broker

Source code in backtest/trade/broker.py
class Broker:
    def __init__(
        self,
        account_name: str,
        principal: float,
        commission: float,
        bt_start: datetime.date = None,
        bt_end: datetime.date = None,
    ):
        """创建一个Broker对象

        Args:
            account_name : 账号/策略名
            principal : 初始本金
            commission : 佣金率
            start : 开始日期(回测时使用)
            end : 结束日期(回测时使用)
        """
        if bt_start is not None and bt_end is not None:
            self.mode = "bt"
            self.bt_start = bt_start
            self.bt_stop = bt_end
            # 回测是否终止?
            self._bt_stopped = False
        else:
            self.mode = "mock"
            self._bt_stopped = False
            self.bt_start = None
            self.bt_stop = None

        # 最后交易时间
        self._last_trade_time: datetime.datetime = None
        self._first_trade_time: datetime.datetime = None

        self.account_name = account_name
        self.commission = commission

        # 初始本金
        self.principal = principal
        # 每日盘后可用资金
        self._cash = np.array([], dtype=cash_dtype)
        # 每日总资产, 包括本金和持仓资产
        self._assets = np.array([], dtype=assets_dtype)

        self._positions = np.array([], dtype=daily_position_dtype)  # 每日持仓
        self._unclosed_trades = {}  # 未平仓的交易

        # 委托列表,包括废单和未成交委托
        self.entrusts = {}

        # 所有的成交列表,包括买入和卖出,已关闭和未关闭的
        self.trades = {}

        # trasaction = buy + sell trade
        self.transactions: List[Transaction] = []

        self._lock = asyncio.Lock()

    def __getstate__(self):
        # self._lock is not pickable
        state = self.__dict__.copy()
        del state["_lock"]

        return state

    def __setstate__(self, state):
        self.__dict__.update(state)
        self._lock = asyncio.Lock()

    @property
    def lock(self):
        return self._lock

    @property
    def cash(self):
        if self._cash.size == 0:
            return self.principal

        return self._cash[-1]["cash"].item()

    @property
    def account_start_date(self) -> datetime.date:
        if self.mode == "bt":
            return self.bt_start
        else:
            return (
                None
                if self._first_trade_time is None
                else self._first_trade_time.date()
            )

    @property
    def account_end_date(self) -> datetime.date:
        if self.mode == "bt":
            return self.bt_stop
        else:
            return (
                None if self._last_trade_time is None else self._last_trade_time.date()
            )

    @property
    def last_trade_date(self):
        return None if self._last_trade_time is None else self._last_trade_time.date()

    @property
    def first_trade_date(self):
        return None if self._first_trade_time is None else self._first_trade_time.date()

    def get_cash(self, dt: datetime.date) -> float:
        """获取`dt`当天的可用资金

        在查询时,如果`dt`小于首次交易日,则返回空,否则,如果当日无数据,将从上一个有数据之日起,进行补齐填充。
        Args:
            dt (datetime.date): 日期

        Returns:
            float: 某日可用资金
        """
        if self._cash.size == 0:
            return self.principal

        if dt > self._cash[-1]["date"]:
            return self._cash[-1]["cash"].item()
        elif dt < self._cash[0]["date"]:
            return self.principal

        result = self._cash[self._cash["date"] == dt]["cash"]
        if result.size == 0:
            raise ValueError(f"{dt} not found")
        else:
            return result.item()

    def get_unclosed_trades(self, dt: datetime.date) -> set:
        """获取`dt`当天未平仓的交易

        如果`dt`小于首次交易日,则返回空,否则,如果当日无数据,将从上一个有数据之日起,进行补齐填充。
        """
        if len(self._unclosed_trades) == 0:
            return set()

        result = self._unclosed_trades.get(dt)
        if result is None:
            start = sorted(self._unclosed_trades.keys())[0]
            if dt < start:
                return set()
            else:
                self._fillup_unclosed_trades(dt)

        return self._unclosed_trades.get(dt)

    def get_position(self, dt: datetime.date, dtype=position_dtype) -> np.ndarray:
        """获取`dt`日持仓

        如果传入的`dt`大于持仓数据的最后一天,将返回最后一天的持仓数据,并且所有持仓均为可售状态
        如果传入的`dt`小于持仓数据的第一天,将返回空。

        Args:
            dt : 查询哪一天的持仓
            dtype : 返回的数据类型,可为[position_dtype][backtest.trade.datatypes.position_dtype]或[daily_position_dtype][backtest.trade.datatypes.daily_position_dtype],后者用于日志输出

        Returns:
            返回结果为dtype为`dtype`的一维numpy structured array,其中price为该批持仓的均价。
        """
        if self._positions.size == 0:
            return np.array([], dtype=dtype)

        if dt < self._positions[0]["date"]:
            return np.array([], dtype=dtype)

        last_date = self._positions[-1]["date"]
        if dt > last_date:
            result = self._positions[self._positions["date"] == last_date]
            result["sellable"] = result["shares"]
            return result[list(dtype.names)].astype(dtype)

        result = self._positions[self._positions["date"] == dt]
        if result.size == 0:
            raise ValueError(f"{dt} not found")

        return result[list(dtype.names)].astype(dtype)

    async def recalc_assets(
        self, start: Optional[datetime.date] = None, end: Optional[datetime.date] = None
    ):
        """重新计算账户的每日资产

        计算完成后,资产表将包括从账户开始前一日,到`end`日的资产数据。从账户开始前一日起,是为了方便计算首个交易日的收益。

        Args:
            end: 计算到哪一天的资产,默认为空,即计算到最后一个交易日(非回测),或者回测结束日。

        """
        if end is None:
            if self.mode != "bt":  # 非回测下计算到当下
                end = arrow.now().date()
            else:  # 回测时计算到bt_stop
                end = self.bt_stop

        # 把期初资产加进来
        if self._assets.size == 0:
            start = self.account_start_date
            if start is None:
                return np.array([], dtype=rich_assets_dtype)

            _before_start = tf.day_shift(start, -1)
            self._assets = np.array(
                [(_before_start, self.principal)], dtype=assets_dtype
            )

        start = start or tf.day_shift(self._assets[-1]["date"], 1)
        if start >= end:
            return

        # 待补齐的资产日
        frames = [tf.int2date(d) for d in tf.get_frames(start, end, FrameType.DAY)]
        # 从最后一个资产日到`end`,持仓应都是一样的
        position = self.get_position(end, position_dtype)
        if position.size == 0:
            assets = self._assets[-1]["assets"]
            self._assets = np.concatenate(
                (
                    self._assets,
                    np.array([(frame, assets) for frame in frames], dtype=assets_dtype),
                )
            )

            return

        secs = position[position["shares"] != 0]["security"]
        shares = {
            sec: position[position["security"] == sec]["shares"][0] for sec in secs
        }

        if len(secs):
            feed = get_app_context().feed
            closes = await feed.batch_get_close_price_in_range(secs, frames)

        for frame in frames:
            cash = self.get_cash(frame)

            mv = 0
            for sec in secs:
                if closes.get(sec) is None:
                    price = position[position["security"] == sec]["price"].item()
                    mv += shares.get(sec, 0) * price
                else:
                    iclose = self._index_of(closes[sec], frame, "frame")
                    mv += closes[sec][iclose]["close"] * shares.get(sec, 0)

            i = self._index_of(self._assets, frame)
            if i is None:
                self._assets = np.append(
                    self._assets,
                    np.array([(frame, float(cash + mv))], dtype=assets_dtype),
                    axis=0,
                )
            else:
                self._assets[i]["assets"] = float(cash + mv)

    async def info(self, dt: datetime.date = None) -> Dict:
        """`dt`日的账号相关信息

        Returns:
            Dict: 账号相关信息:

            - name: str, 账户名
            - principal: float, 初始资金
            - assets: float, `dt`日资产
            - start: datetime.date, 账户创建时间
            - end: 账户结束时间,仅在回测模式下有效
            - bt_stopped: 回测是否结束,仅在回测模式下有效。
            - last_trade: datetime.datetime, 最后一笔交易时间
            - available: float, `dt`日可用资金
            - market_value: `dt`日股票市值
            - pnl: `dt`盈亏(绝对值)
            - ppnl: 盈亏(百分比),即pnl/principal
            - positions: 当前持仓,dtype为position_dtype的numpy structured array

        """
        dt = dt or self.last_trade_date

        cash = self.get_cash(dt)
        assets = await self.get_assets(dt)

        return {
            "name": self.account_name,
            "principal": self.principal,
            "start": self.account_start_date,
            "end": self.bt_stop,
            "bt_stopped": self._bt_stopped,
            "last_trade": self.last_trade_date,
            "assets": assets,
            "available": cash,
            "market_value": assets - cash,
            "pnl": assets - self.principal,
            "ppnl": assets / self.principal - 1,
            "positions": self.get_position(dt),
        }

    async def get_returns(
        self, start_date: datetime.date = None, end_date: datetime.date = None
    ) -> np.ndarray:
        """求截止`end_date`时的每日回报

        Args:
            start_date: 计算回报的起始日期
            end_date : 计算回报的结束日期

        Returns:
            以百分比为单位的每日回报率,索引为对应日期
        """
        start = start_date or self.account_start_date

        # 当计算[start, end]之间的每日回报时,需要取多一日,即`start`之前一日的总资产
        _start = tf.day_shift(start, -1)
        end = end_date or self.account_end_date

        assert self.account_start_date <= start <= end
        assert start <= end <= self.account_end_date

        if not self._bt_stopped:
            await self.recalc_assets()

        assets = self._assets[
            (self._assets["date"] >= _start) & (self._assets["date"] <= end)
        ]

        if assets.size == 0:
            raise ValueError(f"date range error: {start} - {end} contains no data")

        return assets["assets"][1:] / assets["assets"][:-1] - 1

    @property
    def assets(self) -> float:
        """当前总资产。

        如果要获取历史上某天的总资产,请使用`get_assets`方法。
        """
        if self._assets.size == 0:
            return self.principal
        else:
            return self._assets[-1]["assets"]

    async def get_assets(self, date: datetime.date) -> float:
        """查询某日的总资产

        当日总资产 = 当日可用资金 + 持仓市值

        Args:
            date: 查询哪一天的资产

        Returns:
            返回某一日的总资产

        """
        if self._assets.size == 0:
            return self.principal

        if date is None:
            return self._assets[-1]["assets"]

        result = self._assets[self._assets["date"] == date]
        if result.size == 1:
            return result["assets"].item()

        assets, *_ = await self._calc_assets(date)
        return assets

    def _index_of(
        self, arr: np.ndarray, date: datetime.date, index: str = "date"
    ) -> int:
        """查找`arr`中其`index`字段等于`date`的索引

            注意数组中`date`字段取值必须惟一。

        Args:
            arr: numpy array, 需要存在`index`字段
            date: datetime.date, 查找的日期

        Returns:
            如果存在,返回索引,否则返回None
        """
        pos = np.argwhere(arr[index] == date).ravel()

        assert len(pos) <= 1, "date should be unique"
        if len(pos) == 0:
            return None

        return pos[0]

    async def _calc_assets(self, date: datetime.date) -> Tuple[float]:
        """计算某日的总资产

        此函数不更新资产表,以避免资产表中留下空洞。比如:
        当前最后交易日为4月10日,4月17日发生一笔委卖,导致cash/position记录更新到4/17,但资产表仍然保持在4月10日,此时如果缓存该记录,将导致资产表中留下空洞。

        Args:
            date: 计算哪一天的资产

        Returns:
            返回总资产, 可用资金, 持仓市值
        """
        if date < self.account_start_date:
            return self.principal, 0, 0

        if (self.mode == "bt" and date > self.bt_stop) or date > arrow.now().date():
            raise ValueError(
                f"wrong date: {date}, date must be before {self.bt_stop} or {arrow.now().date()}"
            )

        cash = self.get_cash(date)
        positions = self.get_position(date)
        # this also exclude empty entry (which security is None)
        heldings = positions[positions["shares"] > 0]["security"]

        market_value = 0
        if heldings.size > 0:
            feed = get_app_context().feed

            for sec in heldings:
                shares = positions[positions["security"] == sec]["shares"].item()
                price = await feed.get_close_price(sec, date)

                if price is not None:
                    market_value += shares * price
                else:
                    price = positions[positions["security"] == sec]["price"].item()
                    market_value += shares * price

        assets = cash + market_value

        return assets, cash, market_value

    @property
    def position(self) -> np.ndarray:
        """获取当前持仓

        如果要获取历史上某天的持仓,请使用`get_position`方法。
        如果当天个股曾有持仓,但被清仓,持仓表仍保留entry,但shares将置为空。如果当天没有任何持仓(不包括当天清空的情况),则会留一个`security`字段为None的空entry。

        Returns:
            返回dtype为[position_dtype][backtest.trade.datatypes.position_dtype]的numpy structure array
        """
        if self._positions.size == 0:
            return np.array([], dtype=position_dtype)

        last_day = self._positions[-1]["date"]
        result = self._positions[self._positions["date"] == last_day]

        return result[list(position_dtype.names)].astype(position_dtype)

    def __str__(self):
        s = (
            f"账户:{self.account_name}:\n"
            + f"    总资产:{self.assets:,.2f}\n"
            + f"    本金:{self.principal:,.2f}\n"
            + f"    可用资金:{self.cash:,.2f}\n"
            + f"    持仓:{self.position}\n"
        )

        return s

    def __repr__(self) -> str:
        return f"<{self.__class__.__name__}>{self}"

    async def _calendar_validation(self, bid_time: datetime.datetime):
        """更新和校准交易日期

        如果是回测模式,则在进入_bt_stopped状态时,还要完整计算一次assets,此后不再重复计算。

        Args:
            bid_time : 交易发生的时间
        """
        if self.mode == "bt" and self._bt_stopped:
            logger.warning("委托时间超过回测结束时间: %s, %s", bid_time, self.bt_stop)
            raise AccountError(f"下单时间为{bid_time},而账户已于{self.bt_stop}冻结。")

        if self._first_trade_time is None:
            self._first_trade_time = bid_time
        elif bid_time < self._first_trade_time:
            logger.warning("委托时间必须递增出现: %s -> %s", self._first_trade_time, bid_time)
            raise EntrustError(
                EntrustError.TIME_REWIND,
                time=bid_time,
                last_trade_time=self._first_trade_time,
            )

        if self._last_trade_time is None or bid_time >= self._last_trade_time:
            self._last_trade_time = bid_time
        else:
            logger.warning("委托时间必须递增出现:%s -> %s", self._last_trade_time, bid_time)
            raise EntrustError(
                EntrustError.TIME_REWIND,
                time=bid_time,
                last_trade_time=self._last_trade_time,
            )

        if self.mode == "bt" and bid_time.date() > self.bt_stop:
            self._bt_stopped = True
            await self.recalc_assets()
            logger.warning("委托时间超过回测结束时间: %s, %s", bid_time, self.bt_stop)
            raise AccountError(f"下单时间为{bid_time},而账户已于{self.bt_stop}冻结。")

    async def buy(
        self,
        security: str,
        bid_price: float,
        bid_shares: int,
        bid_time: datetime.datetime,
    ) -> Trade:
        """买入委托

        买入以尽可能实现委托为目标。如果可用资金不足,但能买入部分股票,则部分买入。

        如果bid_price为None,则使用涨停价买入。

        Args:
            security str: 证券代码
            bid_price float: 委托价格。如果为None,则为市价委托
            bid_shares int: 询买的股数
            bid_time datetime.datetime: 委托时间

        Returns:
            [Trade][backtest.trade.trade.Trade]对象
        """
        # 同一个账户,也可能出现并发的买单和卖单,这些操作必须串行化
        async with self.lock:
            return await self._buy(security, bid_price, bid_shares, bid_time)

    async def _buy(
        self,
        security: str,
        bid_price: float,
        bid_shares: int,
        bid_time: datetime.datetime,
    ) -> Trade:
        entrustlog.info(
            f"{bid_time}\t{security}\t{bid_shares}\t{bid_price}\t{EntrustSide.BUY}"
        )
        assert (
            type(bid_time) is datetime.datetime
        ), f"{bid_time} is not type of datetime"

        await self._before_trade(bid_time)

        feed = get_app_context().feed

        en = Entrust(
            security,
            EntrustSide.BUY,
            bid_shares,
            bid_price,
            bid_time,
            BidType.LIMIT if bid_price is not None else BidType.MARKET,
        )

        logger.info(
            "买入委托(%s): %s %d %s, 单号:%s",
            bid_time,
            security,
            bid_shares,
            bid_price,
            en.eid,
        )

        self.entrusts[en.eid] = en

        _, buy_limit_price, _ = await feed.get_trade_price_limits(
            security, bid_time.date()
        )

        bid_price = bid_price or buy_limit_price

        # 获取用以撮合的数据
        bars = await feed.get_price_for_match(security, bid_time)
        if bars.size == 0:
            logger.warning("failed to match %s, no data at %s", security, bid_time)
            raise EntrustError(
                EntrustError.NODATA_FOR_MATCH, security=security, time=bid_time
            )

        # 移除掉涨停和价格高于委买价的bar后,看还能买多少股
        bars = self._remove_for_buy(
            security, bid_time, bars, bid_price, buy_limit_price
        )

        # 将买入数限制在可用资金范围内
        shares_to_buy = min(
            bid_shares, self.cash // (bid_price * (1 + self.commission))
        )

        # 必须以手为单位买入,否则委托会失败
        shares_to_buy = shares_to_buy // 100 * 100
        if shares_to_buy < 100:
            logger.info("委买失败:%s(%s), 资金(%s)不足", security, self.cash, en.eid)
            raise EntrustError(
                EntrustError.NO_CASH,
                account=self.account_name,
                required=100 * bid_price,
                available=self.cash,
            )

        mean_price, filled, close_time = self._match_buy(bars, shares_to_buy)
        if filled == 0:
            raise EntrustError(
                EntrustError.VOLUME_NOT_ENOUGH, security=security, price=bid_price
            )
        return await self._fill_buy_order(en, mean_price, filled, close_time)

    def _match_buy(
        self, bid_queue, shares_to_buy
    ) -> Tuple[float, float, datetime.datetime]:
        """计算此次买入的成交均价和成交量

        Args:
            bid_queue : 撮合数据
            shares_to_buy : 要买入的股数

        Returns:
            成交均价、可埋单股数和最后成交时间
        """
        c, v = bid_queue["price"], bid_queue["volume"]

        cum_v = np.cumsum(v)

        # until i the order can be filled
        where_total_filled = np.argwhere(cum_v >= shares_to_buy)
        if len(where_total_filled) == 0:
            i = len(v) - 1
        else:
            i = np.min(where_total_filled)

        # 也许到当天结束,都没有足够的股票
        filled = min(cum_v[i], shares_to_buy) // 100 * 100

        # 最后一周期,只需要成交剩余的部分
        vol = v[: i + 1].copy()
        vol[-1] = filled - np.sum(vol[:-1])

        money = sum(c[: i + 1] * vol)
        mean_price = money / filled

        return mean_price, filled, bid_queue["frame"][i]

    def _fillup_unclosed_trades(self, dt: datetime.date):
        if len(self._unclosed_trades) != 0 and self._unclosed_trades.get(dt) is None:
            days = sorted(list(self._unclosed_trades.keys()))
            frames = tf.get_frames(days[-1], dt, FrameType.DAY)
            for src, dst in zip(frames[:-1], frames[1:]):
                src = tf.int2date(src)
                dst = tf.int2date(dst)
                self._unclosed_trades[dst] = self._unclosed_trades[src].copy()

    def _update_unclosed_trades(self, tid, date: datetime.date):
        """记录每日持有的未平仓交易

        Args:
            trades: 交易列表
        """
        unclosed = self._unclosed_trades.get(date, [])
        if len(unclosed):
            unclosed.append(tid)

            return

        if len(self._unclosed_trades) == 0:
            self._unclosed_trades[date] = [tid]
            return

        # 记录还未创建,需要复制前一日记录
        self._fillup_unclosed_trades(date)

        self._unclosed_trades[date].append(tid)

    async def _fill_buy_order(
        self, en: Entrust, price: float, filled: float, close_time: datetime.datetime
    ) -> Trade:
        """生成trade,更新交易、持仓和assets

        Args:
            en : _description_
            price : _description_
            filled : _description_
            close_time : _description_

        Returns:
            成交记录
        """
        money = price * filled
        fee = math_round(money * self.commission, 2)

        trade = Trade(en.eid, en.security, price, filled, fee, en.side, close_time)
        self.trades[trade.tid] = trade
        self._update_unclosed_trades(trade.tid, close_time.date())
        await self._update_positions(trade, close_time.date())

        logger.info(
            "买入成交(%s): %s (%d %.2f %.2f),委单号: %s, 成交号: %s",
            close_time,
            en.security,
            filled,
            price,
            fee,
            en.eid,
            trade.tid,
        )
        tradelog.info(
            f"{en.bid_time.date()}\t{en.side}\t{en.security}\t{filled}\t{price}\t{fee}"
        )

        logger.info(
            "%s 买入后持仓: \n%s",
            close_time.date(),
            tabulate_numpy_array(
                self.get_position(close_time.date(), daily_position_dtype)
            ),
        )

        # 当发生新的买入时,更新资产
        cash_change = -1 * (money + fee)
        await self._update_assets(cash_change, close_time)

        await emit.emit(E_BACKTEST, {"buy": jsonify(trade)})
        return trade

    async def _before_trade(self, bid_time: datetime.datetime):
        """交易前的准备工作

        在每次交易前,补齐每日现金数据和持仓数据到`bid_time`,更新账户生命期等。

        Args:
            bid_time: 委托时间

        Returns:

        """
        logger.info("bid_time is %s", bid_time)
        await self._calendar_validation(bid_time)

        # 补齐可用将资金表
        if self._cash.size == 0:
            start = tf.day_shift(self.account_start_date, -1)
            end = bid_time.date()
            frames = tf.get_frames(start, end, FrameType.DAY)
            _cash = [(tf.int2date(frame), self.principal) for frame in frames]
            self._cash = np.array(_cash, dtype=cash_dtype)
        else:
            prev, cash = self._cash[-1]

            frames = tf.get_frames(prev, bid_time, FrameType.DAY)[1:]
            if len(frames) > 0:
                recs = [(tf.int2date(date), cash) for date in frames]

                self._cash = np.concatenate(
                    (self._cash, np.array(recs, dtype=cash_dtype))
                )

        await self._fillup_positions(bid_time)

    async def _fillup_positions(self, bid_time: datetime.datetime):
        # 补齐持仓表(需要处理复权)
        feed = get_app_context().feed

        if self._positions.size == 0:
            return

        prev = self._positions[-1]["date"]
        logger.info("handling positions fillup from %s to %s", prev, bid_time)
        frames = [
            tf.int2date(frame) for frame in tf.get_frames(prev, bid_time, FrameType.DAY)
        ]
        if len(frames) == 1:
            return

        last_day_position = self._positions[self._positions["date"] == prev]
        if np.all(last_day_position["security"] is None):
            # empty entries, no need to be extended
            return

        last_held_position = last_day_position[last_day_position["shares"] != 0]

        if last_held_position.size == 0:
            empty = np.array(
                [(frame, None, 0, 0, 0) for frame in frames[1:]],
                dtype=daily_position_dtype,
            )
            self._positions = np.hstack((self._positions, empty))
            return

        # 已清空股票不需要展仓, issue 9
        secs = last_held_position["security"].tolist()
        dr_info = await feed.get_dr_factor(secs, frames)

        padding_positions = []
        for position in last_held_position:
            sec = position["security"]
            if sec in dr_info:
                dr = dr_info[sec]
            else:
                dr = None

            paddings = np.array(
                [position.item()] * len(frames), dtype=daily_position_dtype
            )
            paddings["date"] = frames

            if dr is not None:
                adjust_shares = array_math_round(
                    paddings["shares"][1:] * np.diff(dr), 2
                )
                paddings["shares"] = paddings["shares"] * dr
                paddings["price"] = paddings["price"] / dr

                # 模拟一笔买单,以便此后卖出时能对应到买单。否则,将卖不出去。
                # https://github.com/zillionare/trader-client/issues/10
                for i, adjust_share in enumerate(adjust_shares):
                    if abs(adjust_share) < 1e-5:
                        continue

                    order_time = tf.combine_time(frames[i + 1], 15)
                    trade = Trade(
                        uuid.uuid4(),
                        sec,
                        paddings["price"][i + 1].item(),
                        adjust_share,
                        0,
                        EntrustSide.XDXR,
                        order_time,
                    )
                    self.trades[trade.tid] = trade
                    self._update_unclosed_trades(trade.tid, bid_time.date())

            paddings["sellable"][1:] = paddings["shares"][:-1]

            padding_positions.extend(paddings[1:])

        if len(padding_positions):
            padding_positions.sort(key=lambda x: x[0])
            self._positions = np.concatenate((self._positions, padding_positions))

    async def _update_positions(self, trade: Trade, bid_date: datetime.date):
        """更新持仓信息

        持仓信息为一维numpy数组,其类型为daily_position_dtype。如果某支股票在某日被清空,则当日持仓表保留记录,置shares为零,方便通过持仓表看出股票的进场出场时间,另一方面,如果不保留这条记录(而是删除),则在所有股票都被清空的情况下,会导致持仓表出现空洞,从而导致下一次交易时,误将更早之前的持仓记录复制到当日的持仓表中(在_before_trade中),而这些持仓实际上已经被清空。

        Args:
            trade: 交易信息
            bid_date: 买入/卖出日期
        """
        if type(bid_date) == datetime.datetime:
            bid_date = bid_date.date()

        if self._positions.size == 0:
            self._positions = np.array(
                [(bid_date, trade.security, trade.shares, 0, trade.price)],
                dtype=daily_position_dtype,
            )

            return

        # find if the security is already in the position (same day)
        pos = np.argwhere(
            (self._positions["security"] == trade.security)
            & (self._positions["date"] == bid_date)
        )

        if pos.size == 0:
            self._positions = np.append(
                self._positions,
                np.array(
                    [(bid_date, trade.security, trade.shares, 0, trade.price)],
                    dtype=daily_position_dtype,
                ),
            )
        else:
            i = pos[0].item()
            *_, old_shares, old_sellable, old_price = self._positions[i]
            new_shares, new_price = trade.shares, trade.price

            if trade.side == EntrustSide.BUY:
                self._positions[i] = (
                    bid_date,
                    trade.security,
                    old_shares + trade.shares,
                    old_sellable,
                    (old_price * old_shares + new_shares * new_price)
                    / (old_shares + new_shares),
                )
            else:
                shares = old_shares - trade.shares
                sellable = old_sellable - trade.shares
                if shares <= 0.1:
                    old_price = 0
                    shares = 0
                    sellable = 0
                self._positions[i] = (
                    bid_date,
                    trade.security,
                    shares,
                    sellable,
                    old_price,  # 卖出时成本不变,除非已清空
                )

        return

    async def _update_assets(self, cash_change: float, bid_time: datetime.datetime):
        """更新当前资产(含持仓)

        在每次资产变动时进行计算和更新,并对之前的资产表进行补全。

        Args:
            cash_change : 变动的现金
            bid_time: 委托时间
        """
        logger.info("cash change: %s", cash_change)

        # 补齐资产表到上一个交易日
        if self._assets.size == 0:
            _before_start = tf.day_shift(self.account_start_date, -1)
            self._assets = np.array(
                [(_before_start, self.principal)], dtype=assets_dtype
            )

        start = tf.day_shift(self._assets[-1]["date"], 1)
        end = tf.day_shift(bid_time, -1)
        if start < end:
            await self.recalc_assets(start, end)

        bid_date = bid_time.date()

        # _before_trade应该已经为当日交易准备好了可用资金数据
        assert self._cash[-1]["date"] == bid_date
        self._cash[-1]["cash"] += cash_change

        assets, cash, mv = await self._calc_assets(bid_date)

        i = self._index_of(self._assets, bid_date)
        if i is None:
            self._assets = np.append(
                self._assets, np.array([(bid_date, assets)], dtype=assets_dtype)
            )
        else:
            # don't use self._assets[self._assets["date"] == date], this always return copy
            self._assets[i]["assets"] = assets

        info = np.array(
            [(bid_date, assets, cash, mv, cash_change)],
            dtype=[
                ("date", "O"),
                ("assets", float),
                ("cash", float),
                ("market value", float),
                ("change", float),
            ],
        )
        logger.info("\n%s", tabulate_numpy_array(info))

    async def _fill_sell_order(
        self, en: Entrust, price: float, to_sell: float
    ) -> List[Trade]:
        """从positions中扣减股票、增加可用现金

        Args:
            en : 委卖单
            price : 成交均价
            filled : 回报的卖出数量

        Returns:
            成交记录列表
        """
        dt = en.bid_time.date()

        money = price * to_sell
        fee = math_round(money * self.commission, 2)

        security = en.security

        unclosed_trades = self.get_unclosed_trades(dt)
        closed_trades = []
        exit_trades = []
        refund = 0
        while to_sell > 0:
            for tid in unclosed_trades:
                trade: Trade = self.trades[tid]
                if trade.security != security:
                    continue

                if trade.time.date() >= dt:
                    # not T + 1
                    continue

                to_sell, fee, exit_trade, tx = trade.sell(
                    to_sell, price, fee, en.bid_time
                )

                logger.info(
                    "卖出成交(%s): %s (%d %.2f %.2f),委单号: %s, 成交号: %s",
                    exit_trade.time,
                    en.security,
                    exit_trade.shares,
                    exit_trade.price,
                    exit_trade.fee,
                    en.eid,
                    exit_trade.tid,
                )
                tradelog.info(
                    f"{en.bid_time.date()}\t{exit_trade.side}\t{exit_trade.security}\t{exit_trade.shares}\t{exit_trade.price}\t{exit_trade.fee}"
                )
                await self._update_positions(exit_trade, exit_trade.time)
                exit_trades.append(exit_trade)
                self.trades[exit_trade.tid] = exit_trade
                self.transactions.append(tx)

                refund += exit_trade.shares * exit_trade.price - exit_trade.fee

                if trade.closed:
                    closed_trades.append(tid)

                if to_sell == 0:
                    break
            else:  # no more unclosed trades, even if to_sell > 0
                break

        unclosed_trades = [tid for tid in unclosed_trades if tid not in closed_trades]
        self._unclosed_trades[dt] = unclosed_trades

        logger.info(
            "%s 卖出后持仓: \n%s",
            dt,
            tabulate_numpy_array(self.get_position(dt, daily_position_dtype)),
        )

        await self._update_assets(refund, en.bid_time)

        await emit.emit(E_BACKTEST, {"sell": jsonify(exit_trades)})
        return exit_trades

    async def sell(
        self,
        security: str,
        bid_price: Union[None, float],
        bid_shares: float,
        bid_time: datetime.datetime,
    ) -> List[Trade]:
        """卖出委托

        Args:
            security str: 委托证券代码
            bid_price float: 出售价格,如果为None,则为市价委托
            bid_shares float: 询卖股数。注意我们不限制必须以100的倍数卖出。
            bid_time datetime.datetime: 委托时间

        Returns:
            成交记录列表,每个元素都是一个[Trade][backtest.trade.trade.Trade]对象

        """
        # 同一个账户,也可能出现并发的买单和卖单,这些操作必须串行化
        async with self.lock:
            return await self._sell(security, bid_price, bid_shares, bid_time)

    async def _sell(
        self,
        security: str,
        bid_price: Union[None, float],
        bid_shares: float,
        bid_time: datetime.datetime,
    ) -> List[Trade]:
        await self._before_trade(bid_time)

        feed = get_app_context().feed

        entrustlog.info(
            f"{bid_time}\t{security}\t{bid_shares}\t{bid_price}\t{EntrustSide.SELL}"
        )
        logger.info("卖出委托(%s): %s %s %s", bid_time, security, bid_price, bid_shares)
        _, _, sell_limit_price = await feed.get_trade_price_limits(
            security, bid_time.date()
        )

        if bid_price is None:
            bid_type = BidType.MARKET
            bid_price = sell_limit_price
        else:
            bid_type = BidType.LIMIT

        # fill the order, get mean price
        bars = await feed.get_price_for_match(security, bid_time)
        if bars.size == 0:
            logger.warning("failed to match: %s, no data at %s", security, bid_time)
            raise EntrustError(
                EntrustError.NODATA_FOR_MATCH, security=security, time=bid_time
            )

        bars = self._remove_for_sell(
            security, bid_time, bars, bid_price, sell_limit_price
        )

        c, v = bars["price"], bars["volume"]

        cum_v = np.cumsum(v)

        shares_to_sell = self._get_sellable_shares(security, bid_shares, bid_time)
        if shares_to_sell == 0:
            logger.info("卖出失败: %s %s %s, 可用股数为0", security, bid_shares, bid_time)
            logger.info("%s", self.get_unclosed_trades(bid_time.date()))
            raise EntrustError(
                EntrustError.NO_POSITION, security=security, time=bid_time
            )

        # until i the order can be filled
        where_total_filled = np.argwhere(cum_v >= shares_to_sell)
        if len(where_total_filled) == 0:
            i = len(v) - 1
        else:
            i = np.min(where_total_filled)

        close_time = bars[i]["frame"]
        # 也许到当天结束,都没有足够的股票
        filled = min(cum_v[i], shares_to_sell)

        # 最后一周期,只需要成交剩余的部分
        vol = v[: i + 1].copy()
        vol[-1] = filled - np.sum(vol[:-1])

        money = sum(c[: i + 1] * vol)
        mean_price = money / filled

        en = Entrust(
            security, EntrustSide.SELL, bid_shares, bid_price, bid_time, bid_type
        )

        logger.info(
            "委卖%s(%s), 成交%s股,均价%.2f, 成交时间%s",
            en.security,
            en.eid,
            filled,
            mean_price,
            close_time,
        )

        return await self._fill_sell_order(en, mean_price, filled)

    def _get_sellable_shares(
        self, security: str, shares_asked: int, bid_time: datetime.datetime
    ) -> int:
        """获取可卖股数

        如果shares_asked与可售之间的差不足1股,则自动加上零头,确保可以卖完。

        Args:
            security: 证券代码

        Returns:
            可卖股数
        """
        shares = 0
        for tid in self.get_unclosed_trades(bid_time.date()):
            t = self.trades[tid]
            if t.security == security and t.time.date() < bid_time.date():
                assert t.closed is False
                shares += t._unsell

        if shares - shares_asked < 100:
            return shares
        return min(shares_asked, shares)

    def _remove_for_buy(
        self,
        security: str,
        order_time: datetime.datetime,
        bars: np.ndarray,
        price: float,
        limit_price: float,
    ) -> np.ndarray:
        """
        去掉已达到涨停时的分钟线,或者价格高于买入价的bars
        """
        reach_limit = array_price_equal(bars["price"], limit_price)
        bars = bars[(~reach_limit)]

        if bars.size == 0:
            raise EntrustError(
                EntrustError.REACH_BUY_LIMIT, security=security, time=order_time
            )

        bars = bars[(bars["price"] <= price)]
        if bars.size == 0:
            raise EntrustError(
                EntrustError.PRICE_NOT_MEET,
                security=security,
                time=order_time,
                entrust=price,
            )

        return bars

    def _remove_for_sell(
        self,
        security: str,
        order_time: datetime.datetime,
        bars: np.ndarray,
        price: float,
        limit_price: float,
    ) -> np.ndarray:
        """去掉当前价格低于price,或者已经达到跌停时的bars,这些bars上无法成交"""
        reach_limit = array_price_equal(bars["price"], limit_price)
        bars = bars[(~reach_limit)]

        if bars.size == 0:
            raise EntrustError(
                EntrustError.REACH_SELL_LIMIT, security=security, time=order_time
            )

        bars = bars[(bars["price"] >= price)]
        if bars.size == 0:
            raise EntrustError(
                EntrustError.PRICE_NOT_MEET, security=security, entrust=price
            )

        return bars

    def freeze(self):
        """冻结账户,停止接收新的委托"""
        self._bt_stopped = True

    async def metrics(
        self,
        start: Optional[datetime.date] = None,
        end: Optional[datetime.date] = None,
        baseline: Optional[str] = "399300.XSHE",
    ) -> Dict:
        """获取指定时间段的账户指标

        Args:
            start: 开始时间
            end: 结束时间
            baseline: 参考标的

        Returns:
            Dict: 指标字典,其key为

            - start 回测起始时间
            - end   回测结束时间
            - window 资产暴露时间
            - total_tx 发生的配对交易次数
            - total_profit 总盈亏
            - total_profit_rate 总盈亏率
            - win_rate 胜率
            - mean_return 每笔配对交易平均回报率
            - sharpe    夏普比率
            - max_drawdown 最大回撤
            - sortino
            - calmar
            - annual_return 年化收益率
            - volatility 波动率
            - baseline: dict
                - win_rate
                - sharpe
                - max_drawdown
                - sortino
                - annual_return
                - total_profit_rate
                - volatility

        """
        try:
            rf = cfg.metrics.risk_free_rate / cfg.metrics.annual_days
        except Exception:
            rf = 0

        start = min(start or self.account_start_date, self.account_start_date)
        end = max(end or self.account_end_date, self.account_end_date)

        tx = []
        logger.info("%s tx in total", len(self.transactions))
        for t in self.transactions:
            if t.entry_time.date() >= start and t.exit_time.date() <= end:
                tx.append(t)
            else:
                logger.info(
                    "tx %s not in range, start: %s, end: %s",
                    t.sec,
                    t.entry_time,
                    t.exit_time,
                )

        # 资产暴露时间
        window = tf.count_day_frames(start, end)
        total_tx = len(tx)

        if total_tx == 0:
            return {
                "start": start,
                "end": end,
                "window": window,
                "total_tx": total_tx,
                "total_profit": None,
                "total_profit_rate": None,
                "win_rate": None,
                "mean_return": None,
                "sharpe": None,
                "sortino": None,
                "calmar": None,
                "max_drawdown": None,
                "annual_return": None,
                "volatility": None,
                "baseline": None,
            }

        # win_rate
        wr = len([t for t in tx if t.profit > 0]) / total_tx

        if not self._bt_stopped:
            await self.recalc_assets()

        # 当计算[start, end]之间的盈亏时,我们实际上要多取一个交易日,即start之前一个交易日的资产数据
        _start = tf.day_shift(start, -1)
        total_profit = await self.get_assets(end) - await self.get_assets(_start)

        returns = await self.get_returns(start, end)
        mean_return = np.mean(returns)

        sharpe = sharpe_ratio(returns, rf)
        sortino = sortino_ratio(returns, rf)
        calma = calmar_ratio(returns)
        mdd = max_drawdown(returns)

        # 年化收益率
        ar = annual_return(returns)

        # 年化波动率
        vr = annual_volatility(returns)

        # 计算参考标的的相关指标
        if baseline is not None:
            ref_bars = await Stock.get_bars_in_range(
                baseline, FrameType.DAY, start, end
            )

            if ref_bars.size < 2:
                ref_results = None
            else:
                returns = ref_bars["close"][1:] / ref_bars["close"][:-1] - 1

                ref_results = {
                    "total_profit_rate": cum_returns_final(returns),
                    "win_rate": np.count_nonzero(returns > 0) / len(returns),
                    "mean_return": np.mean(returns).item(),
                    "sharpe": sharpe_ratio(returns, rf),
                    "sortino": sortino_ratio(returns, rf),
                    "calmar": calmar_ratio(returns),
                    "max_drawdown": max_drawdown(returns),
                    "annual_return": annual_return(returns),
                    "volatility": annual_volatility(returns),
                }
        else:
            ref_results = None

        return {
            "start": start,
            "end": end,
            "window": window,
            "total_tx": total_tx,
            "total_profit": total_profit,
            "total_profit_rate": total_profit / self.principal,
            "win_rate": wr,
            "mean_return": mean_return,
            "sharpe": sharpe,
            "sortino": sortino,
            "calmar": calma,
            "max_drawdown": mdd,
            "annual_return": ar,
            "volatility": vr,
            "baseline": ref_results,
        }

assets: float property readonly

当前总资产。

如果要获取历史上某天的总资产,请使用get_assets方法。

position: ndarray property readonly

获取当前持仓

如果要获取历史上某天的持仓,请使用get_position方法。 如果当天个股曾有持仓,但被清仓,持仓表仍保留entry,但shares将置为空。如果当天没有任何持仓(不包括当天清空的情况),则会留一个security字段为None的空entry。

Returns:

Type Description
ndarray

返回dtype为position_dtype的numpy structure array

__init__(self, account_name, principal, commission, bt_start=None, bt_end=None) special

创建一个Broker对象

Parameters:

Name Type Description Default
account_name

账号/策略名

required
principal

初始本金

required
commission

佣金率

required
start

开始日期(回测时使用)

required
end

结束日期(回测时使用)

required
Source code in backtest/trade/broker.py
def __init__(
    self,
    account_name: str,
    principal: float,
    commission: float,
    bt_start: datetime.date = None,
    bt_end: datetime.date = None,
):
    """创建一个Broker对象

    Args:
        account_name : 账号/策略名
        principal : 初始本金
        commission : 佣金率
        start : 开始日期(回测时使用)
        end : 结束日期(回测时使用)
    """
    if bt_start is not None and bt_end is not None:
        self.mode = "bt"
        self.bt_start = bt_start
        self.bt_stop = bt_end
        # 回测是否终止?
        self._bt_stopped = False
    else:
        self.mode = "mock"
        self._bt_stopped = False
        self.bt_start = None
        self.bt_stop = None

    # 最后交易时间
    self._last_trade_time: datetime.datetime = None
    self._first_trade_time: datetime.datetime = None

    self.account_name = account_name
    self.commission = commission

    # 初始本金
    self.principal = principal
    # 每日盘后可用资金
    self._cash = np.array([], dtype=cash_dtype)
    # 每日总资产, 包括本金和持仓资产
    self._assets = np.array([], dtype=assets_dtype)

    self._positions = np.array([], dtype=daily_position_dtype)  # 每日持仓
    self._unclosed_trades = {}  # 未平仓的交易

    # 委托列表,包括废单和未成交委托
    self.entrusts = {}

    # 所有的成交列表,包括买入和卖出,已关闭和未关闭的
    self.trades = {}

    # trasaction = buy + sell trade
    self.transactions: List[Transaction] = []

    self._lock = asyncio.Lock()

buy(self, security, bid_price, bid_shares, bid_time) async

买入委托

买入以尽可能实现委托为目标。如果可用资金不足,但能买入部分股票,则部分买入。

如果bid_price为None,则使用涨停价买入。

Parameters:

Name Type Description Default
security str

证券代码

required
bid_price float

委托价格。如果为None,则为市价委托

required
bid_shares int

询买的股数

required
bid_time datetime.datetime

委托时间

required

Returns:

Type Description
Trade

Trade对象

Source code in backtest/trade/broker.py
async def buy(
    self,
    security: str,
    bid_price: float,
    bid_shares: int,
    bid_time: datetime.datetime,
) -> Trade:
    """买入委托

    买入以尽可能实现委托为目标。如果可用资金不足,但能买入部分股票,则部分买入。

    如果bid_price为None,则使用涨停价买入。

    Args:
        security str: 证券代码
        bid_price float: 委托价格。如果为None,则为市价委托
        bid_shares int: 询买的股数
        bid_time datetime.datetime: 委托时间

    Returns:
        [Trade][backtest.trade.trade.Trade]对象
    """
    # 同一个账户,也可能出现并发的买单和卖单,这些操作必须串行化
    async with self.lock:
        return await self._buy(security, bid_price, bid_shares, bid_time)

freeze(self)

冻结账户,停止接收新的委托

Source code in backtest/trade/broker.py
def freeze(self):
    """冻结账户,停止接收新的委托"""
    self._bt_stopped = True

get_assets(self, date) async

查询某日的总资产

当日总资产 = 当日可用资金 + 持仓市值

Parameters:

Name Type Description Default
date date

查询哪一天的资产

required

Returns:

Type Description
float

返回某一日的总资产

Source code in backtest/trade/broker.py
async def get_assets(self, date: datetime.date) -> float:
    """查询某日的总资产

    当日总资产 = 当日可用资金 + 持仓市值

    Args:
        date: 查询哪一天的资产

    Returns:
        返回某一日的总资产

    """
    if self._assets.size == 0:
        return self.principal

    if date is None:
        return self._assets[-1]["assets"]

    result = self._assets[self._assets["date"] == date]
    if result.size == 1:
        return result["assets"].item()

    assets, *_ = await self._calc_assets(date)
    return assets

get_cash(self, dt)

获取dt当天的可用资金

在查询时,如果dt小于首次交易日,则返回空,否则,如果当日无数据,将从上一个有数据之日起,进行补齐填充。

Parameters:

Name Type Description Default
dt datetime.date

日期

required

Returns:

Type Description
float

某日可用资金

Source code in backtest/trade/broker.py
def get_cash(self, dt: datetime.date) -> float:
    """获取`dt`当天的可用资金

    在查询时,如果`dt`小于首次交易日,则返回空,否则,如果当日无数据,将从上一个有数据之日起,进行补齐填充。
    Args:
        dt (datetime.date): 日期

    Returns:
        float: 某日可用资金
    """
    if self._cash.size == 0:
        return self.principal

    if dt > self._cash[-1]["date"]:
        return self._cash[-1]["cash"].item()
    elif dt < self._cash[0]["date"]:
        return self.principal

    result = self._cash[self._cash["date"] == dt]["cash"]
    if result.size == 0:
        raise ValueError(f"{dt} not found")
    else:
        return result.item()

get_position(self, dt, dtype=dtype([('security', 'O'), ('shares', '<f8'), ('sellable', '<f8'), ('price', '<f8')]))

获取dt日持仓

如果传入的dt大于持仓数据的最后一天,将返回最后一天的持仓数据,并且所有持仓均为可售状态 如果传入的dt小于持仓数据的第一天,将返回空。

Parameters:

Name Type Description Default
dt

查询哪一天的持仓

required
dtype

返回的数据类型,可为position_dtypedaily_position_dtype,后者用于日志输出

dtype([('security', 'O'), ('shares', '<f8'), ('sellable', '<f8'), ('price', '<f8')])

Returns:

Type Description
ndarray

返回结果为dtype为dtype的一维numpy structured array,其中price为该批持仓的均价。

Source code in backtest/trade/broker.py
def get_position(self, dt: datetime.date, dtype=position_dtype) -> np.ndarray:
    """获取`dt`日持仓

    如果传入的`dt`大于持仓数据的最后一天,将返回最后一天的持仓数据,并且所有持仓均为可售状态
    如果传入的`dt`小于持仓数据的第一天,将返回空。

    Args:
        dt : 查询哪一天的持仓
        dtype : 返回的数据类型,可为[position_dtype][backtest.trade.datatypes.position_dtype]或[daily_position_dtype][backtest.trade.datatypes.daily_position_dtype],后者用于日志输出

    Returns:
        返回结果为dtype为`dtype`的一维numpy structured array,其中price为该批持仓的均价。
    """
    if self._positions.size == 0:
        return np.array([], dtype=dtype)

    if dt < self._positions[0]["date"]:
        return np.array([], dtype=dtype)

    last_date = self._positions[-1]["date"]
    if dt > last_date:
        result = self._positions[self._positions["date"] == last_date]
        result["sellable"] = result["shares"]
        return result[list(dtype.names)].astype(dtype)

    result = self._positions[self._positions["date"] == dt]
    if result.size == 0:
        raise ValueError(f"{dt} not found")

    return result[list(dtype.names)].astype(dtype)

get_returns(self, start_date=None, end_date=None) async

求截止end_date时的每日回报

Parameters:

Name Type Description Default
start_date date

计算回报的起始日期

None
end_date

计算回报的结束日期

None

Returns:

Type Description
ndarray

以百分比为单位的每日回报率,索引为对应日期

Source code in backtest/trade/broker.py
async def get_returns(
    self, start_date: datetime.date = None, end_date: datetime.date = None
) -> np.ndarray:
    """求截止`end_date`时的每日回报

    Args:
        start_date: 计算回报的起始日期
        end_date : 计算回报的结束日期

    Returns:
        以百分比为单位的每日回报率,索引为对应日期
    """
    start = start_date or self.account_start_date

    # 当计算[start, end]之间的每日回报时,需要取多一日,即`start`之前一日的总资产
    _start = tf.day_shift(start, -1)
    end = end_date or self.account_end_date

    assert self.account_start_date <= start <= end
    assert start <= end <= self.account_end_date

    if not self._bt_stopped:
        await self.recalc_assets()

    assets = self._assets[
        (self._assets["date"] >= _start) & (self._assets["date"] <= end)
    ]

    if assets.size == 0:
        raise ValueError(f"date range error: {start} - {end} contains no data")

    return assets["assets"][1:] / assets["assets"][:-1] - 1

get_unclosed_trades(self, dt)

获取dt当天未平仓的交易

如果dt小于首次交易日,则返回空,否则,如果当日无数据,将从上一个有数据之日起,进行补齐填充。

Source code in backtest/trade/broker.py
def get_unclosed_trades(self, dt: datetime.date) -> set:
    """获取`dt`当天未平仓的交易

    如果`dt`小于首次交易日,则返回空,否则,如果当日无数据,将从上一个有数据之日起,进行补齐填充。
    """
    if len(self._unclosed_trades) == 0:
        return set()

    result = self._unclosed_trades.get(dt)
    if result is None:
        start = sorted(self._unclosed_trades.keys())[0]
        if dt < start:
            return set()
        else:
            self._fillup_unclosed_trades(dt)

    return self._unclosed_trades.get(dt)

info(self, dt=None) async

dt日的账号相关信息

Returns:

Type Description
Dict

账号相关信息:

  • name: str, 账户名
  • principal: float, 初始资金
  • assets: float, dt日资产
  • start: datetime.date, 账户创建时间
  • end: 账户结束时间,仅在回测模式下有效
  • bt_stopped: 回测是否结束,仅在回测模式下有效。
  • last_trade: datetime.datetime, 最后一笔交易时间
  • available: float, dt日可用资金
  • market_value: dt日股票市值
  • pnl: dt盈亏(绝对值)
  • ppnl: 盈亏(百分比),即pnl/principal
  • positions: 当前持仓,dtype为position_dtype的numpy structured array
Source code in backtest/trade/broker.py
async def info(self, dt: datetime.date = None) -> Dict:
    """`dt`日的账号相关信息

    Returns:
        Dict: 账号相关信息:

        - name: str, 账户名
        - principal: float, 初始资金
        - assets: float, `dt`日资产
        - start: datetime.date, 账户创建时间
        - end: 账户结束时间,仅在回测模式下有效
        - bt_stopped: 回测是否结束,仅在回测模式下有效。
        - last_trade: datetime.datetime, 最后一笔交易时间
        - available: float, `dt`日可用资金
        - market_value: `dt`日股票市值
        - pnl: `dt`盈亏(绝对值)
        - ppnl: 盈亏(百分比),即pnl/principal
        - positions: 当前持仓,dtype为position_dtype的numpy structured array

    """
    dt = dt or self.last_trade_date

    cash = self.get_cash(dt)
    assets = await self.get_assets(dt)

    return {
        "name": self.account_name,
        "principal": self.principal,
        "start": self.account_start_date,
        "end": self.bt_stop,
        "bt_stopped": self._bt_stopped,
        "last_trade": self.last_trade_date,
        "assets": assets,
        "available": cash,
        "market_value": assets - cash,
        "pnl": assets - self.principal,
        "ppnl": assets / self.principal - 1,
        "positions": self.get_position(dt),
    }

metrics(self, start=None, end=None, baseline='399300.XSHE') async

获取指定时间段的账户指标

Parameters:

Name Type Description Default
start Optional[datetime.date]

开始时间

None
end Optional[datetime.date]

结束时间

None
baseline Optional[str]

参考标的

'399300.XSHE'

Returns:

Type Description
Dict

指标字典,其key为

  • start 回测起始时间
  • end 回测结束时间
  • window 资产暴露时间
  • total_tx 发生的配对交易次数
  • total_profit 总盈亏
  • total_profit_rate 总盈亏率
  • win_rate 胜率
  • mean_return 每笔配对交易平均回报率
  • sharpe 夏普比率
  • max_drawdown 最大回撤
  • sortino
  • calmar
  • annual_return 年化收益率
  • volatility 波动率
  • baseline: dict
    • win_rate
    • sharpe
    • max_drawdown
    • sortino
    • annual_return
    • total_profit_rate
    • volatility
Source code in backtest/trade/broker.py
async def metrics(
    self,
    start: Optional[datetime.date] = None,
    end: Optional[datetime.date] = None,
    baseline: Optional[str] = "399300.XSHE",
) -> Dict:
    """获取指定时间段的账户指标

    Args:
        start: 开始时间
        end: 结束时间
        baseline: 参考标的

    Returns:
        Dict: 指标字典,其key为

        - start 回测起始时间
        - end   回测结束时间
        - window 资产暴露时间
        - total_tx 发生的配对交易次数
        - total_profit 总盈亏
        - total_profit_rate 总盈亏率
        - win_rate 胜率
        - mean_return 每笔配对交易平均回报率
        - sharpe    夏普比率
        - max_drawdown 最大回撤
        - sortino
        - calmar
        - annual_return 年化收益率
        - volatility 波动率
        - baseline: dict
            - win_rate
            - sharpe
            - max_drawdown
            - sortino
            - annual_return
            - total_profit_rate
            - volatility

    """
    try:
        rf = cfg.metrics.risk_free_rate / cfg.metrics.annual_days
    except Exception:
        rf = 0

    start = min(start or self.account_start_date, self.account_start_date)
    end = max(end or self.account_end_date, self.account_end_date)

    tx = []
    logger.info("%s tx in total", len(self.transactions))
    for t in self.transactions:
        if t.entry_time.date() >= start and t.exit_time.date() <= end:
            tx.append(t)
        else:
            logger.info(
                "tx %s not in range, start: %s, end: %s",
                t.sec,
                t.entry_time,
                t.exit_time,
            )

    # 资产暴露时间
    window = tf.count_day_frames(start, end)
    total_tx = len(tx)

    if total_tx == 0:
        return {
            "start": start,
            "end": end,
            "window": window,
            "total_tx": total_tx,
            "total_profit": None,
            "total_profit_rate": None,
            "win_rate": None,
            "mean_return": None,
            "sharpe": None,
            "sortino": None,
            "calmar": None,
            "max_drawdown": None,
            "annual_return": None,
            "volatility": None,
            "baseline": None,
        }

    # win_rate
    wr = len([t for t in tx if t.profit > 0]) / total_tx

    if not self._bt_stopped:
        await self.recalc_assets()

    # 当计算[start, end]之间的盈亏时,我们实际上要多取一个交易日,即start之前一个交易日的资产数据
    _start = tf.day_shift(start, -1)
    total_profit = await self.get_assets(end) - await self.get_assets(_start)

    returns = await self.get_returns(start, end)
    mean_return = np.mean(returns)

    sharpe = sharpe_ratio(returns, rf)
    sortino = sortino_ratio(returns, rf)
    calma = calmar_ratio(returns)
    mdd = max_drawdown(returns)

    # 年化收益率
    ar = annual_return(returns)

    # 年化波动率
    vr = annual_volatility(returns)

    # 计算参考标的的相关指标
    if baseline is not None:
        ref_bars = await Stock.get_bars_in_range(
            baseline, FrameType.DAY, start, end
        )

        if ref_bars.size < 2:
            ref_results = None
        else:
            returns = ref_bars["close"][1:] / ref_bars["close"][:-1] - 1

            ref_results = {
                "total_profit_rate": cum_returns_final(returns),
                "win_rate": np.count_nonzero(returns > 0) / len(returns),
                "mean_return": np.mean(returns).item(),
                "sharpe": sharpe_ratio(returns, rf),
                "sortino": sortino_ratio(returns, rf),
                "calmar": calmar_ratio(returns),
                "max_drawdown": max_drawdown(returns),
                "annual_return": annual_return(returns),
                "volatility": annual_volatility(returns),
            }
    else:
        ref_results = None

    return {
        "start": start,
        "end": end,
        "window": window,
        "total_tx": total_tx,
        "total_profit": total_profit,
        "total_profit_rate": total_profit / self.principal,
        "win_rate": wr,
        "mean_return": mean_return,
        "sharpe": sharpe,
        "sortino": sortino,
        "calmar": calma,
        "max_drawdown": mdd,
        "annual_return": ar,
        "volatility": vr,
        "baseline": ref_results,
    }

recalc_assets(self, start=None, end=None) async

重新计算账户的每日资产

计算完成后,资产表将包括从账户开始前一日,到end日的资产数据。从账户开始前一日起,是为了方便计算首个交易日的收益。

Parameters:

Name Type Description Default
end Optional[datetime.date]

计算到哪一天的资产,默认为空,即计算到最后一个交易日(非回测),或者回测结束日。

None
Source code in backtest/trade/broker.py
async def recalc_assets(
    self, start: Optional[datetime.date] = None, end: Optional[datetime.date] = None
):
    """重新计算账户的每日资产

    计算完成后,资产表将包括从账户开始前一日,到`end`日的资产数据。从账户开始前一日起,是为了方便计算首个交易日的收益。

    Args:
        end: 计算到哪一天的资产,默认为空,即计算到最后一个交易日(非回测),或者回测结束日。

    """
    if end is None:
        if self.mode != "bt":  # 非回测下计算到当下
            end = arrow.now().date()
        else:  # 回测时计算到bt_stop
            end = self.bt_stop

    # 把期初资产加进来
    if self._assets.size == 0:
        start = self.account_start_date
        if start is None:
            return np.array([], dtype=rich_assets_dtype)

        _before_start = tf.day_shift(start, -1)
        self._assets = np.array(
            [(_before_start, self.principal)], dtype=assets_dtype
        )

    start = start or tf.day_shift(self._assets[-1]["date"], 1)
    if start >= end:
        return

    # 待补齐的资产日
    frames = [tf.int2date(d) for d in tf.get_frames(start, end, FrameType.DAY)]
    # 从最后一个资产日到`end`,持仓应都是一样的
    position = self.get_position(end, position_dtype)
    if position.size == 0:
        assets = self._assets[-1]["assets"]
        self._assets = np.concatenate(
            (
                self._assets,
                np.array([(frame, assets) for frame in frames], dtype=assets_dtype),
            )
        )

        return

    secs = position[position["shares"] != 0]["security"]
    shares = {
        sec: position[position["security"] == sec]["shares"][0] for sec in secs
    }

    if len(secs):
        feed = get_app_context().feed
        closes = await feed.batch_get_close_price_in_range(secs, frames)

    for frame in frames:
        cash = self.get_cash(frame)

        mv = 0
        for sec in secs:
            if closes.get(sec) is None:
                price = position[position["security"] == sec]["price"].item()
                mv += shares.get(sec, 0) * price
            else:
                iclose = self._index_of(closes[sec], frame, "frame")
                mv += closes[sec][iclose]["close"] * shares.get(sec, 0)

        i = self._index_of(self._assets, frame)
        if i is None:
            self._assets = np.append(
                self._assets,
                np.array([(frame, float(cash + mv))], dtype=assets_dtype),
                axis=0,
            )
        else:
            self._assets[i]["assets"] = float(cash + mv)

sell(self, security, bid_price, bid_shares, bid_time) async

卖出委托

Parameters:

Name Type Description Default
security str

委托证券代码

required
bid_price float

出售价格,如果为None,则为市价委托

required
bid_shares float

询卖股数。注意我们不限制必须以100的倍数卖出。

required
bid_time datetime.datetime

委托时间

required

Returns:

Type Description
List[backtest.trade.trade.Trade]

成交记录列表,每个元素都是一个Trade对象

Source code in backtest/trade/broker.py
async def sell(
    self,
    security: str,
    bid_price: Union[None, float],
    bid_shares: float,
    bid_time: datetime.datetime,
) -> List[Trade]:
    """卖出委托

    Args:
        security str: 委托证券代码
        bid_price float: 出售价格,如果为None,则为市价委托
        bid_shares float: 询卖股数。注意我们不限制必须以100的倍数卖出。
        bid_time datetime.datetime: 委托时间

    Returns:
        成交记录列表,每个元素都是一个[Trade][backtest.trade.trade.Trade]对象

    """
    # 同一个账户,也可能出现并发的买单和卖单,这些操作必须串行化
    async with self.lock:
        return await self._sell(security, bid_price, bid_shares, bid_time)

datatypes

assets_dtype

the assets dtype as the following:

``` np.dtype( [ ("date", "O"), ("assets", "<f8") ] )

daily_position_dtype

the position dtype which usually used in backtest server internally:

1
2
3
4
5
6
7
8
9
np.dtype(
    [
        ("date", "O"),
        ("security", "O"),
        ("shares", "<f8"),
        ("sellable", "<f8"),
        ("price", "<f8"),
    ]
)

float_ts_dtype

generic date-float dtype as the following:

``` np.dtype( [ ("date", "O"), ("value", "<f8") ] )

position_dtype

the position dtype which will return back to trader-client

1
2
3
4
5
6
7
8
np.dtype(
    [
        ("security", "O"),
        ("shares", "<f8"),
        ("sellable", "<f8"),
        ("price", "<f8")
    ]
)

rich_assets_dtype

the rich assets dtype as the following:

1
2
3
4
5
6
7
8
np.dtype(
    [
        ("date", "O"),
        ("assets", "<f8"),
        ("cash", "<f8"),
        ("mv", "<f8")
    ]
)

BidType (IntEnum)

An enumeration.

Source code in backtest/trade/datatypes.py
class BidType(IntEnum):
    LIMIT = 1
    MARKET = 2

    def __str__(self):
        return {BidType.LIMIT: "限价委托", BidType.MARKET: "市价委托"}.get(self)

EntrustSide (IntEnum)

An enumeration.

Source code in backtest/trade/datatypes.py
class EntrustSide(IntEnum):
    BUY = 1
    SELL = -1
    XDXR = 0

    def __str__(self):
        return {
            EntrustSide.BUY: "买入",
            EntrustSide.SELL: "卖出",
            EntrustSide.XDXR: "分红配股",
        }[self]

trade

Trade

Trade对象代表了一笔已成功完成的委托。一个委托可能对应多个Trade,特别是当卖出的时候

Source code in backtest/trade/trade.py
class Trade:
    """Trade对象代表了一笔已成功完成的委托。一个委托可能对应多个Trade,特别是当卖出的时候"""

    def __init__(
        self,
        eid: str,
        security: str,
        price: float,
        shares: int,
        fee: float,
        side: EntrustSide,
        time: datetime.datetime,
    ):
        """Trade对象代表了一笔已成功的委托(即已完成的交易)

        Args:
            eid: 对应的委托号
            security: 证券代码
            price: 交易价格
            shares: 交易数量
            fee: 交易手续费
            time: 交易时间
        """
        self.eid = eid
        self.tid = str(uuid.uuid4())
        self.security = security

        self.fee = fee
        self.price = price
        self.shares = shares
        self.time = time

        self.side = side

        # only for buying trade
        self._unsell = shares
        self._unamortized_fee = fee
        self.closed = False

        if side == EntrustSide.XDXR:
            logger.info("XDXR entrust: %s", self)

    def __str__(self):
        return f"证券代码: {self.security}\n成交方向: {self.side}\n成交均价: {self.price}\n数量: {self.shares}\n手续费: {self.fee}\n委托号: {self.eid}\n成交号: {self.tid}\n成交时间: {self.time}\n"

    def to_dict(self) -> dict:
        """将Trade对象转换为字典格式。

        Returns:
            Dict: 返回值,其key为

            - tid: 交易号
            - eid: 委托号
            - security: 证券代码
            - price: 交易价格
            - filled: 居交数量
            - trade_fees: 交易手续费
            - order_side: 交易方向
            - time: 交易时间

        """
        return {
            "tid": str(self.tid),
            "eid": str(self.eid),
            "security": self.security,
            "order_side": str(self.side),
            "price": self.price,
            "filled": self.shares,
            "time": self.time.isoformat(),
            "trade_fees": self.fee,
        }

    def sell(
        self, shares: float, price: float, fee: float, close_time: datetime.datetime
    ):
        """从当前未售出股中售出。

        计算时将根据售出的股数,分摊买入和卖的交易成本。返回未售出的股份和未分摊的成本。

        Args:
            shares: 待出售股数
            price: 出售价格
            fee: 交易手续费
            close_time: 成交日期
        """
        assert self.side in (EntrustSide.BUY, EntrustSide.XDXR)

        if not self.closed:
            sec = self.security
            assert self._unsell > 0, str(self) + "状态错误,无法售出,请检查代码"

            sellable = min(shares, self._unsell)

            # 计算本次交易的收益,并分摊交易成本
            amortized_buy_fee = self.fee * sellable / self.shares
            amortized_sell_fee = fee * sellable / shares

            self._unsell -= sellable
            self._unamortized_fee -= amortized_buy_fee

            if self._unsell == 0:
                logger.debug("交易%s (%s)已close.", self.security, self.tid)
                self.closed = True

            trade = Trade(
                self.eid,
                sec,
                price,
                sellable,
                amortized_sell_fee,
                EntrustSide.SELL,
                close_time,
            )

            tx = Transaction(
                sec,
                self.time,
                close_time,
                self.price,
                price,
                sellable,
                amortized_buy_fee + amortized_sell_fee,
            )

            return shares - sellable, fee - amortized_sell_fee, trade, tx

__init__(self, eid, security, price, shares, fee, side, time) special

Trade对象代表了一笔已成功的委托(即已完成的交易)

Parameters:

Name Type Description Default
eid str

对应的委托号

required
security str

证券代码

required
price float

交易价格

required
shares int

交易数量

required
fee float

交易手续费

required
time datetime

交易时间

required
Source code in backtest/trade/trade.py
def __init__(
    self,
    eid: str,
    security: str,
    price: float,
    shares: int,
    fee: float,
    side: EntrustSide,
    time: datetime.datetime,
):
    """Trade对象代表了一笔已成功的委托(即已完成的交易)

    Args:
        eid: 对应的委托号
        security: 证券代码
        price: 交易价格
        shares: 交易数量
        fee: 交易手续费
        time: 交易时间
    """
    self.eid = eid
    self.tid = str(uuid.uuid4())
    self.security = security

    self.fee = fee
    self.price = price
    self.shares = shares
    self.time = time

    self.side = side

    # only for buying trade
    self._unsell = shares
    self._unamortized_fee = fee
    self.closed = False

    if side == EntrustSide.XDXR:
        logger.info("XDXR entrust: %s", self)

sell(self, shares, price, fee, close_time)

从当前未售出股中售出。

计算时将根据售出的股数,分摊买入和卖的交易成本。返回未售出的股份和未分摊的成本。

Parameters:

Name Type Description Default
shares float

待出售股数

required
price float

出售价格

required
fee float

交易手续费

required
close_time datetime

成交日期

required
Source code in backtest/trade/trade.py
def sell(
    self, shares: float, price: float, fee: float, close_time: datetime.datetime
):
    """从当前未售出股中售出。

    计算时将根据售出的股数,分摊买入和卖的交易成本。返回未售出的股份和未分摊的成本。

    Args:
        shares: 待出售股数
        price: 出售价格
        fee: 交易手续费
        close_time: 成交日期
    """
    assert self.side in (EntrustSide.BUY, EntrustSide.XDXR)

    if not self.closed:
        sec = self.security
        assert self._unsell > 0, str(self) + "状态错误,无法售出,请检查代码"

        sellable = min(shares, self._unsell)

        # 计算本次交易的收益,并分摊交易成本
        amortized_buy_fee = self.fee * sellable / self.shares
        amortized_sell_fee = fee * sellable / shares

        self._unsell -= sellable
        self._unamortized_fee -= amortized_buy_fee

        if self._unsell == 0:
            logger.debug("交易%s (%s)已close.", self.security, self.tid)
            self.closed = True

        trade = Trade(
            self.eid,
            sec,
            price,
            sellable,
            amortized_sell_fee,
            EntrustSide.SELL,
            close_time,
        )

        tx = Transaction(
            sec,
            self.time,
            close_time,
            self.price,
            price,
            sellable,
            amortized_buy_fee + amortized_sell_fee,
        )

        return shares - sellable, fee - amortized_sell_fee, trade, tx

to_dict(self)

将Trade对象转换为字典格式。

Returns:

Type Description
Dict

返回值,其key为

  • tid: 交易号
  • eid: 委托号
  • security: 证券代码
  • price: 交易价格
  • filled: 居交数量
  • trade_fees: 交易手续费
  • order_side: 交易方向
  • time: 交易时间
Source code in backtest/trade/trade.py
def to_dict(self) -> dict:
    """将Trade对象转换为字典格式。

    Returns:
        Dict: 返回值,其key为

        - tid: 交易号
        - eid: 委托号
        - security: 证券代码
        - price: 交易价格
        - filled: 居交数量
        - trade_fees: 交易手续费
        - order_side: 交易方向
        - time: 交易时间

    """
    return {
        "tid": str(self.tid),
        "eid": str(self.eid),
        "security": self.security,
        "order_side": str(self.side),
        "price": self.price,
        "filled": self.shares,
        "time": self.time.isoformat(),
        "trade_fees": self.fee,
    }

transaction

Transaction

包括了买和卖的一次完整交易

Source code in backtest/trade/transaction.py
class Transaction:
    """包括了买和卖的一次完整交易"""

    def __init__(
        self,
        sec: str,
        entry_time: datetime.datetime,
        exit_time: datetime.datetime,
        entry_price: float,
        exit_price: float,
        shares: float,
        fee: float,
    ):
        self.sec = sec
        self.entry_time = entry_time
        self.exit_time = exit_time
        self.entry_price = entry_price
        self.exit_price = exit_price
        self.shares = shares
        self.fee = fee

        self.profit = (exit_price - entry_price) * shares - fee
        self.pprofit = self.profit / (entry_price * shares)

        try:  # 如果omicron未初始化,则不计算资产暴露窗口
            self.window = tf.count_day_frames(entry_time, exit_time)
        except Exception:
            pass

    def __str__(self):
        return f"{self.sec} {self.entry_time}买入({self.entry_price}, {self.exit_time}卖出({self.exit_price}), profit {self.exit_price/self.entry_price - 1:.2%}"