Skip to content

在apscheduler.triggers的基础上提供了FrameTrigger和IntervalTrigger,使得它们只在交易日(或者 基于交易日+延时)时激发。

FrameTrigger (BaseTrigger)

A cron like trigger fires on each valid Frame

Source code in omicron/core/triggers.py
class FrameTrigger(BaseTrigger):
    """
    A cron like trigger fires on each valid Frame
    """

    def __init__(self, frame_type: Union[str, FrameType], jitter: str = None):
        """构造函数

        jitter的格式用正则式表达为`r"([-]?)(\\d+)([mshd])"`,其中第一组为符号,'-'表示提前;
        第二组为数字,第三组为单位,可以为`m`(分钟), `s`(秒), `h`(小时),`d`(天)。

        下面的示例构造了一个只在交易日,每30分钟触发一次,每次提前15秒触的trigger。即它的触发时
        间是每个交易日的09:29:45, 09:59:45, ...

        Examples:
            >>> FrameTrigger(FrameType.MIN30, '-15s') # doctest: +ELLIPSIS
            <omicron.core.triggers.FrameTrigger object at 0x...>

        Args:
            frame_type:
            jitter: 单位秒。其中offset必须在一个FrameType的长度以内
        """
        self.frame_type = FrameType(frame_type)
        if jitter is None:
            _jitter = 0
        else:
            matched = re.match(r"([-]?)(\d+)([mshd])", jitter)
            if matched is None:  # pragma: no cover
                raise ValueError(
                    "malformed. jitter should be [-](number)(unit), "
                    "for example, -30m, or 30s"
                )
            sign, num, unit = matched.groups()
            num = int(num)
            if unit.lower() == "m":
                _jitter = 60 * num
            elif unit.lower() == "s":
                _jitter = num
            elif unit.lower() == "h":
                _jitter = 3600 * num
            elif unit.lower() == "d":
                _jitter = 3600 * 24 * num
            else:  # pragma: no cover
                raise ValueError("bad time unit. only s,h,m,d is acceptable")

            if sign == "-":
                _jitter = -_jitter

        self.jitter = datetime.timedelta(seconds=_jitter)
        if (
            frame_type == FrameType.MIN1
            and abs(_jitter) >= 60
            or frame_type == FrameType.MIN5
            and abs(_jitter) >= 300
            or frame_type == FrameType.MIN15
            and abs(_jitter) >= 900
            or frame_type == FrameType.MIN30
            and abs(_jitter) >= 1800
            or frame_type == FrameType.MIN60
            and abs(_jitter) >= 3600
            or frame_type == FrameType.DAY
            and abs(_jitter) >= 24 * 3600
            # it's still not allowed if offset > week, month, etc. Would anybody
            # really specify an offset longer than that?
        ):
            raise ValueError("offset must be less than frame length")

    def __str__(self):
        return f"{self.__class__.__name__}:{self.frame_type.value}:{self.jitter}"

    def get_next_fire_time(
        self,
        previous_fire_time: Union[datetime.date, datetime.datetime],
        now: Union[datetime.date, datetime.datetime],
    ):
        """"""
        ft = self.frame_type

        # `now` is timezone aware, while ceiling isn't
        now = now.replace(tzinfo=None)
        next_tick = now
        next_frame = TimeFrame.ceiling(now, ft)
        while next_tick <= now:
            if ft in TimeFrame.day_level_frames:
                next_tick = TimeFrame.combine_time(next_frame, 15) + self.jitter
            else:
                next_tick = next_frame + self.jitter

            if next_tick > now:
                tz = tzlocal.get_localzone()
                return next_tick.astimezone(tz)
            else:
                next_frame = TimeFrame.shift(next_frame, 1, ft)

__init__(self, frame_type, jitter=None) special

构造函数

jitter的格式用正则式表达为r"([-]?)(\d+)([mshd])",其中第一组为符号,'-'表示提前; 第二组为数字,第三组为单位,可以为m(分钟), s(秒), h(小时),d(天)。

下面的示例构造了一个只在交易日,每30分钟触发一次,每次提前15秒触的trigger。即它的触发时 间是每个交易日的09:29:45, 09:59:45, ...

Examples:

>>> FrameTrigger(FrameType.MIN30, '-15s')
<omicron.core.triggers.FrameTrigger object at 0x...>

Parameters:

Name Type Description Default
frame_type Union[str, coretypes.types.FrameType] required
jitter str

单位秒。其中offset必须在一个FrameType的长度以内

None
Source code in omicron/core/triggers.py
def __init__(self, frame_type: Union[str, FrameType], jitter: str = None):
    """构造函数

    jitter的格式用正则式表达为`r"([-]?)(\\d+)([mshd])"`,其中第一组为符号,'-'表示提前;
    第二组为数字,第三组为单位,可以为`m`(分钟), `s`(秒), `h`(小时),`d`(天)。

    下面的示例构造了一个只在交易日,每30分钟触发一次,每次提前15秒触的trigger。即它的触发时
    间是每个交易日的09:29:45, 09:59:45, ...

    Examples:
        >>> FrameTrigger(FrameType.MIN30, '-15s') # doctest: +ELLIPSIS
        <omicron.core.triggers.FrameTrigger object at 0x...>

    Args:
        frame_type:
        jitter: 单位秒。其中offset必须在一个FrameType的长度以内
    """
    self.frame_type = FrameType(frame_type)
    if jitter is None:
        _jitter = 0
    else:
        matched = re.match(r"([-]?)(\d+)([mshd])", jitter)
        if matched is None:  # pragma: no cover
            raise ValueError(
                "malformed. jitter should be [-](number)(unit), "
                "for example, -30m, or 30s"
            )
        sign, num, unit = matched.groups()
        num = int(num)
        if unit.lower() == "m":
            _jitter = 60 * num
        elif unit.lower() == "s":
            _jitter = num
        elif unit.lower() == "h":
            _jitter = 3600 * num
        elif unit.lower() == "d":
            _jitter = 3600 * 24 * num
        else:  # pragma: no cover
            raise ValueError("bad time unit. only s,h,m,d is acceptable")

        if sign == "-":
            _jitter = -_jitter

    self.jitter = datetime.timedelta(seconds=_jitter)
    if (
        frame_type == FrameType.MIN1
        and abs(_jitter) >= 60
        or frame_type == FrameType.MIN5
        and abs(_jitter) >= 300
        or frame_type == FrameType.MIN15
        and abs(_jitter) >= 900
        or frame_type == FrameType.MIN30
        and abs(_jitter) >= 1800
        or frame_type == FrameType.MIN60
        and abs(_jitter) >= 3600
        or frame_type == FrameType.DAY
        and abs(_jitter) >= 24 * 3600
        # it's still not allowed if offset > week, month, etc. Would anybody
        # really specify an offset longer than that?
    ):
        raise ValueError("offset must be less than frame length")

TradeTimeIntervalTrigger (BaseTrigger)

只在交易时间触发的固定间隔的trigger

Source code in omicron/core/triggers.py
class TradeTimeIntervalTrigger(BaseTrigger):
    """只在交易时间触发的固定间隔的trigger"""

    def __init__(self, interval: str):
        """构造函数

        interval的格式用正则表达式表示为 `r"(\\d+)([mshd])"` 。其中第一组为数字,第二组为单位。有效的
        `interval`如 1 ,表示每1小时触发一次,则该触发器将在交易日的10:30, 11:30, 14:00和
        15:00各触发一次

        Args:
            interval : [description]

        Raises:
            ValueError: [description]
        """
        matched = re.match(r"(\d+)([mshd])", interval)
        if matched is None:
            raise ValueError(f"malform interval {interval}")

        interval, unit = matched.groups()
        interval = int(interval)
        unit = unit.lower()
        if unit == "s":
            self.interval = datetime.timedelta(seconds=interval)
        elif unit == "m":
            self.interval = datetime.timedelta(minutes=interval)
        elif unit == "h":
            self.interval = datetime.timedelta(hours=interval)
        elif unit == "d":
            self.interval = datetime.timedelta(days=interval)
        else:
            self.interval = datetime.timedelta(seconds=interval)

    def __str__(self):
        return f"{self.__class__.__name__}:{self.interval.seconds}"

    def get_next_fire_time(
        self,
        previous_fire_time: Optional[datetime.datetime],
        now: Optional[datetime.datetime],
    ):
        """"""
        if previous_fire_time is not None:
            fire_time = previous_fire_time + self.interval
        else:
            fire_time = now

        if TimeFrame.date2int(fire_time.date()) not in TimeFrame.day_frames:
            ft = TimeFrame.day_shift(now, 1)
            fire_time = datetime.datetime(
                ft.year, ft.month, ft.day, 9, 30, tzinfo=fire_time.tzinfo
            )
            return fire_time

        minutes = fire_time.hour * 60 + fire_time.minute

        if minutes < 570:
            fire_time = fire_time.replace(hour=9, minute=30, second=0, microsecond=0)
        elif 690 < minutes < 780:
            fire_time = fire_time.replace(hour=13, minute=0, second=0, microsecond=0)
        elif minutes > 900:
            ft = TimeFrame.day_shift(fire_time, 1)
            fire_time = datetime.datetime(
                ft.year, ft.month, ft.day, 9, 30, tzinfo=fire_time.tzinfo
            )

        return fire_time

__init__(self, interval) special

构造函数

interval的格式用正则表达式表示为 r"(\d+)([mshd])" 。其中第一组为数字,第二组为单位。有效的 interval如 1 ,表示每1小时触发一次,则该触发器将在交易日的10:30, 11:30, 14:00和 15:00各触发一次

Parameters:

Name Type Description Default
interval

[description]

required

Exceptions:

Type Description
ValueError

[description]

Source code in omicron/core/triggers.py
def __init__(self, interval: str):
    """构造函数

    interval的格式用正则表达式表示为 `r"(\\d+)([mshd])"` 。其中第一组为数字,第二组为单位。有效的
    `interval`如 1 ,表示每1小时触发一次,则该触发器将在交易日的10:30, 11:30, 14:00和
    15:00各触发一次

    Args:
        interval : [description]

    Raises:
        ValueError: [description]
    """
    matched = re.match(r"(\d+)([mshd])", interval)
    if matched is None:
        raise ValueError(f"malform interval {interval}")

    interval, unit = matched.groups()
    interval = int(interval)
    unit = unit.lower()
    if unit == "s":
        self.interval = datetime.timedelta(seconds=interval)
    elif unit == "m":
        self.interval = datetime.timedelta(minutes=interval)
    elif unit == "h":
        self.interval = datetime.timedelta(hours=interval)
    elif unit == "d":
        self.interval = datetime.timedelta(days=interval)
    else:
        self.interval = datetime.timedelta(seconds=interval)