Skip to content

strategy

base

策略基类

子类派生

  1. 从此基类派生出一个策略子类,比如sma.py
  2. 子类需要重载predict方法,根据当前传入的时间帧和帧类型参数,获取数据并进行处理,评估出交易信号
  3. 子类根据交易信号,在predict方法里,调用基类的buysell方法来进行交易
  4. 子类调用backtest方法来进行回测,该方法将根据策略构建时指定的回测起始时间、终止时间、帧类型,逐帧生成各个时间帧,并调用子类的predict方法
  5. 在交易结束时,调用plot_metrics方法来获取如下所示的回测指标图

如何派生子类,可以参考sma源代码。

回测

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
from omicron.strategy.sma import SMAStrategy
sma = SMAStrategy(
    "600000.XSHG",
    url="", # the url of either backtest server, or trade server
    is_backtest=True,
    start=datetime.date(2023, 2, 3),
    end=datetime.date(2023, 4, 28),
    frame_type=FrameType.DAY,
)

await sma.backtest(stop_on_error=True)

实盘

在实盘环境下,你还需要在子类中加入周期性任务(比如每分钟执行一次),在该任务中调用predict方法来完成交易。

BaseStrategy

Source code in omicron/strategy/base.py
class BaseStrategy:
    def __init__(
        self,
        url: str,
        account: Optional[str] = None,
        token: Optional[str] = None,
        name: Optional[str] = None,
        ver: Optional[str] = None,
        is_backtest: bool = True,
        start: Optional[Frame] = None,
        end: Optional[Frame] = None,
        frame_type: Optional[Frame] = None,
        baseline: Optional[str] = "399300.XSHE",
    ):
        """构造函数

        Args:
            url: 实盘/回测服务器的地址。
            start: 回测起始日期。回测模式下必须传入。
            end: 回测结束日期。回测模式下必须传入。
            account: 实盘/回测账号。实盘模式下必须传入。在回测模式下,如果未传入,将以策略名+随机字符构建账号。
            token: 实盘/回测时用的token。实盘模式下必须传入。在回测模式下,如果未传入,将自动生成。
            is_backtest: 是否为回测模式。
            name: 策略名。如果不传入,则使用类名字小写
            ver: 策略版本号。如果不传入,则默认为0.1.
            start: 如果是回测模式,则需要提供回测起始时间
            end: 如果是回测模式,则需要提供回测结束时间
            frame_type: 如果是回测模式,则需要提供回测时使用的主周期
            baseline: 如果是回测模式,则可以提供此参数作为回测基准
        """
        self.ver = ver or "0.1"
        self.name = name or self.__class__.__name__.lower() + f"_v{self.ver}"

        self.token = token or uuid.uuid4().hex
        self.account = account or f"smallcap-{self.token[-4:]}"

        self.url = url

        if is_backtest:
            self.bills = None
            self.metrics = None
            self._bt_start = start
            self._bt_end = end
            self._frame_type = frame_type
            self.broker = TraderClient(
                url,
                self.account,
                self.token,
                is_backtest=True,
                start=self._bt_start,
                end=self._bt_end,
            )
            self._baseline = baseline
        else:
            if account is None or token is None:
                raise ValueError("account and token must be presented.")

            self.broker = TraderClient(url, self.account, self.token, is_backtest=False)

    async def backtest(self, stop_on_error: bool = False, **kwargs):
        converter = (
            tf.int2date if self._frame_type in tf.day_level_frames else tf.int2time
        )
        for i, frame in enumerate(
            tf.get_frames(self._bt_start, self._bt_end, self._frame_type)  # type: ignore
        ):
            try:
                await self.predict(
                    converter(frame), self._frame_type, i, **kwargs  # type: ignore
                )
            except Exception as e:
                logger.exception(e)
                if stop_on_error:
                    raise e

        self.broker.stop_backtest()
        self.bills = self.broker.bills()
        self.metrics = self.broker.metrics(baseline=self._baseline)

    @property
    def cash(self):
        """返回当前可用现金"""
        return self.broker.available_money

    def positions(self, dt: Optional[datetime.date] = None):
        """返回当前持仓"""
        return self.broker.positions(dt)

    def available_shares(self, sec: str, dt: Optional[Frame] = None):
        """返回给定股票当前可售股数。"""
        return self.broker.available_shares(sec, dt)

    async def buy(
        self,
        sec: str,
        price: Optional[float] = None,
        vol: Optional[int] = None,
        money: Optional[float] = None,
        order_time: Optional[datetime.datetime] = None,
    ) -> Dict:
        """买入股票

        Args:
            sec: 证券代码
            price: 委买价。如果为None,则自动转市价买入。
            vol: 委买股数。请自行保证为100的整数。如果为None, 则money必须传入。
            money: 委买金额。如果同时传入了vol,则此参数自动忽略
            order_time: 仅在回测模式下需要提供。实盘模式下,此参数自动被忽略
        Returns:
            见traderclient中的`buy`方法。
        """
        if vol is None:
            if money is None:
                raise ValueError("parameter `mnoey` must be presented!")
            return await self.broker.buy_by_money(
                sec, money, price, order_time=order_time
            )
        elif price is None:
            return self.broker.market_buy(sec, vol, order_time=order_time)
        else:
            return self.broker.buy(sec, price, vol, order_time=order_time)

    async def sell(
        self,
        sec: str,
        price: Optional[float] = None,
        vol: Optional[float] = None,
        percent: Optional[float] = None,
        order_time: Optional[datetime.datetime] = None,
    ) -> Union[List, Dict]:
        """卖出股票

        Args:
            sec: 证券代码
            price: 委卖价,如果未提供,则转为市价单
            vol: 委卖股数。如果为None,则percent必须传入
            percent: 卖出一定比例的持仓,取值介于0与1之间。如果与vol同时提供,此参数将被忽略。请自行保证按比例换算后的卖出数据是符合要求的(比如不为100的倍数,但有些情况下这是允许的,所以程序这里无法帮你判断)
            order_time: 仅在回测模式下需要提供。实盘模式下,此参数自动被忽略

        Returns:
            Union[List, Dict]: 成交返回,详见traderclient中的`buy`方法,trade server只返回一个委托单信息
        """
        if vol is None and percent is None:
            raise ValueError("either vol or percent must be presented")

        if vol is None:
            if price is None:
                price = await self.broker._get_market_sell_price(
                    sec, order_time=order_time
                )
            # there's no market_sell_percent API in traderclient
            return self.broker.sell_percent(sec, price, percent, order_time=order_time)  # type: ignore
        else:
            if price is None:
                return self.broker.market_sell(sec, vol, order_time=order_time)
            else:
                return self.broker.sell(sec, price, vol, order_time=order_time)

    async def filter_paused_stock(self, buylist: List[str], dt: datetime.date):
        secs = await Security.select(dt).eval()
        in_trading = jq.get_price(
            secs, fields=["paused"], start_date=dt, end_date=dt, skip_paused=True
        )["code"].to_numpy()

        return np.intersect1d(buylist, in_trading)

    async def predict(self, frame: Frame, frame_type: FrameType, i: int, **kwargs):
        """策略评估函数。在此函数中实现交易信号检测和处理。

        Args:
            frame: 当前时间帧
            frame_type: 处理的数据主周期
            i: 当前时间离回测起始的单位数
        """
        raise NotImplementedError

    async def plot_metrics(self):
        if self.bills is None or self.metrics is None:
            raise ValueError("Please run `start_backtest` first.")

        mg = MetricsGraph(self.bills, self.metrics, baseline_code=self._baseline)
        await mg.plot()

cash property readonly

返回当前可用现金

__init__(self, url, account=None, token=None, name=None, ver=None, is_backtest=True, start=None, end=None, frame_type=None, baseline='399300.XSHE') special

构造函数

Parameters:

Name Type Description Default
url str

实盘/回测服务器的地址。

required
start Union[datetime.date, datetime.datetime]

回测起始日期。回测模式下必须传入。

None
end Union[datetime.date, datetime.datetime]

回测结束日期。回测模式下必须传入。

None
account Optional[str]

实盘/回测账号。实盘模式下必须传入。在回测模式下,如果未传入,将以策略名+随机字符构建账号。

None
token Optional[str]

实盘/回测时用的token。实盘模式下必须传入。在回测模式下,如果未传入,将自动生成。

None
is_backtest bool

是否为回测模式。

True
name Optional[str]

策略名。如果不传入,则使用类名字小写

None
ver Optional[str]

策略版本号。如果不传入,则默认为0.1.

None
start Union[datetime.date, datetime.datetime]

如果是回测模式,则需要提供回测起始时间

None
end Union[datetime.date, datetime.datetime]

如果是回测模式,则需要提供回测结束时间

None
frame_type Union[datetime.date, datetime.datetime]

如果是回测模式,则需要提供回测时使用的主周期

None
baseline Optional[str]

如果是回测模式,则可以提供此参数作为回测基准

'399300.XSHE'
Source code in omicron/strategy/base.py
def __init__(
    self,
    url: str,
    account: Optional[str] = None,
    token: Optional[str] = None,
    name: Optional[str] = None,
    ver: Optional[str] = None,
    is_backtest: bool = True,
    start: Optional[Frame] = None,
    end: Optional[Frame] = None,
    frame_type: Optional[Frame] = None,
    baseline: Optional[str] = "399300.XSHE",
):
    """构造函数

    Args:
        url: 实盘/回测服务器的地址。
        start: 回测起始日期。回测模式下必须传入。
        end: 回测结束日期。回测模式下必须传入。
        account: 实盘/回测账号。实盘模式下必须传入。在回测模式下,如果未传入,将以策略名+随机字符构建账号。
        token: 实盘/回测时用的token。实盘模式下必须传入。在回测模式下,如果未传入,将自动生成。
        is_backtest: 是否为回测模式。
        name: 策略名。如果不传入,则使用类名字小写
        ver: 策略版本号。如果不传入,则默认为0.1.
        start: 如果是回测模式,则需要提供回测起始时间
        end: 如果是回测模式,则需要提供回测结束时间
        frame_type: 如果是回测模式,则需要提供回测时使用的主周期
        baseline: 如果是回测模式,则可以提供此参数作为回测基准
    """
    self.ver = ver or "0.1"
    self.name = name or self.__class__.__name__.lower() + f"_v{self.ver}"

    self.token = token or uuid.uuid4().hex
    self.account = account or f"smallcap-{self.token[-4:]}"

    self.url = url

    if is_backtest:
        self.bills = None
        self.metrics = None
        self._bt_start = start
        self._bt_end = end
        self._frame_type = frame_type
        self.broker = TraderClient(
            url,
            self.account,
            self.token,
            is_backtest=True,
            start=self._bt_start,
            end=self._bt_end,
        )
        self._baseline = baseline
    else:
        if account is None or token is None:
            raise ValueError("account and token must be presented.")

        self.broker = TraderClient(url, self.account, self.token, is_backtest=False)

available_shares(self, sec, dt=None)

返回给定股票当前可售股数。

Source code in omicron/strategy/base.py
def available_shares(self, sec: str, dt: Optional[Frame] = None):
    """返回给定股票当前可售股数。"""
    return self.broker.available_shares(sec, dt)

buy(self, sec, price=None, vol=None, money=None, order_time=None) async

买入股票

Parameters:

Name Type Description Default
sec str

证券代码

required
price Optional[float]

委买价。如果为None,则自动转市价买入。

None
vol Optional[int]

委买股数。请自行保证为100的整数。如果为None, 则money必须传入。

None
money Optional[float]

委买金额。如果同时传入了vol,则此参数自动忽略

None
order_time Optional[datetime.datetime]

仅在回测模式下需要提供。实盘模式下,此参数自动被忽略

None

Returns:

Type Description
Dict

见traderclient中的buy方法。

Source code in omicron/strategy/base.py
async def buy(
    self,
    sec: str,
    price: Optional[float] = None,
    vol: Optional[int] = None,
    money: Optional[float] = None,
    order_time: Optional[datetime.datetime] = None,
) -> Dict:
    """买入股票

    Args:
        sec: 证券代码
        price: 委买价。如果为None,则自动转市价买入。
        vol: 委买股数。请自行保证为100的整数。如果为None, 则money必须传入。
        money: 委买金额。如果同时传入了vol,则此参数自动忽略
        order_time: 仅在回测模式下需要提供。实盘模式下,此参数自动被忽略
    Returns:
        见traderclient中的`buy`方法。
    """
    if vol is None:
        if money is None:
            raise ValueError("parameter `mnoey` must be presented!")
        return await self.broker.buy_by_money(
            sec, money, price, order_time=order_time
        )
    elif price is None:
        return self.broker.market_buy(sec, vol, order_time=order_time)
    else:
        return self.broker.buy(sec, price, vol, order_time=order_time)

positions(self, dt=None)

返回当前持仓

Source code in omicron/strategy/base.py
def positions(self, dt: Optional[datetime.date] = None):
    """返回当前持仓"""
    return self.broker.positions(dt)

predict(self, frame, frame_type, i, **kwargs) async

策略评估函数。在此函数中实现交易信号检测和处理。

Parameters:

Name Type Description Default
frame Union[datetime.date, datetime.datetime]

当前时间帧

required
frame_type FrameType

处理的数据主周期

required
i int

当前时间离回测起始的单位数

required
Source code in omicron/strategy/base.py
async def predict(self, frame: Frame, frame_type: FrameType, i: int, **kwargs):
    """策略评估函数。在此函数中实现交易信号检测和处理。

    Args:
        frame: 当前时间帧
        frame_type: 处理的数据主周期
        i: 当前时间离回测起始的单位数
    """
    raise NotImplementedError

sell(self, sec, price=None, vol=None, percent=None, order_time=None) async

卖出股票

Parameters:

Name Type Description Default
sec str

证券代码

required
price Optional[float]

委卖价,如果未提供,则转为市价单

None
vol Optional[float]

委卖股数。如果为None,则percent必须传入

None
percent Optional[float]

卖出一定比例的持仓,取值介于0与1之间。如果与vol同时提供,此参数将被忽略。请自行保证按比例换算后的卖出数据是符合要求的(比如不为100的倍数,但有些情况下这是允许的,所以程序这里无法帮你判断)

None
order_time Optional[datetime.datetime]

仅在回测模式下需要提供。实盘模式下,此参数自动被忽略

None

Returns:

Type Description
Union[List, Dict]

成交返回,详见traderclient中的buy方法,trade server只返回一个委托单信息

Source code in omicron/strategy/base.py
async def sell(
    self,
    sec: str,
    price: Optional[float] = None,
    vol: Optional[float] = None,
    percent: Optional[float] = None,
    order_time: Optional[datetime.datetime] = None,
) -> Union[List, Dict]:
    """卖出股票

    Args:
        sec: 证券代码
        price: 委卖价,如果未提供,则转为市价单
        vol: 委卖股数。如果为None,则percent必须传入
        percent: 卖出一定比例的持仓,取值介于0与1之间。如果与vol同时提供,此参数将被忽略。请自行保证按比例换算后的卖出数据是符合要求的(比如不为100的倍数,但有些情况下这是允许的,所以程序这里无法帮你判断)
        order_time: 仅在回测模式下需要提供。实盘模式下,此参数自动被忽略

    Returns:
        Union[List, Dict]: 成交返回,详见traderclient中的`buy`方法,trade server只返回一个委托单信息
    """
    if vol is None and percent is None:
        raise ValueError("either vol or percent must be presented")

    if vol is None:
        if price is None:
            price = await self.broker._get_market_sell_price(
                sec, order_time=order_time
            )
        # there's no market_sell_percent API in traderclient
        return self.broker.sell_percent(sec, price, percent, order_time=order_time)  # type: ignore
    else:
        if price is None:
            return self.broker.market_sell(sec, vol, order_time=order_time)
        else:
            return self.broker.sell(sec, price, vol, order_time=order_time)

sma

SMAStrategy (BaseStrategy)

Source code in omicron/strategy/sma.py
class SMAStrategy(BaseStrategy):
    def __init__(self, sec: str, n_short: int = 5, n_long: int = 10, *args, **kwargs):
        self._sec = sec
        self._n_short = n_short
        self._n_long = n_long

        super().__init__(*args, **kwargs)

    async def predict(self, frame: Frame, frame_type: FrameType, i: int):
        n = max(self._n_short, self._n_long) - 1
        bars = await Stock.get_bars(self._sec, n, frame_type, end=frame)

        if len(bars) < n:
            return

        ma_short = np.mean(bars["close"][-self._n_short :])
        ma_long = np.mean(bars["close"][-self._n_long :])

        if ma_short > ma_long:
            await self.buy(
                self._sec, money=self.cash, order_time=tf.combine_time(frame, 14, 55)
            )
        elif ma_short < ma_long:
            await self.sell(
                self._sec, percent=1.0, order_time=tf.combine_time(frame, 14, 55)
            )

predict(self, frame, frame_type, i) async

策略评估函数。在此函数中实现交易信号检测和处理。

Parameters:

Name Type Description Default
frame Union[datetime.date, datetime.datetime]

当前时间帧

required
frame_type FrameType

处理的数据主周期

required
i int

当前时间离回测起始的单位数

required
Source code in omicron/strategy/sma.py
async def predict(self, frame: Frame, frame_type: FrameType, i: int):
    n = max(self._n_short, self._n_long) - 1
    bars = await Stock.get_bars(self._sec, n, frame_type, end=frame)

    if len(bars) < n:
        return

    ma_short = np.mean(bars["close"][-self._n_short :])
    ma_long = np.mean(bars["close"][-self._n_long :])

    if ma_short > ma_long:
        await self.buy(
            self._sec, money=self.cash, order_time=tf.combine_time(frame, 14, 55)
        )
    elif ma_short < ma_long:
        await self.sell(
            self._sec, percent=1.0, order_time=tf.combine_time(frame, 14, 55)
        )