Omicron provides data persistence, time handling (calendars, triggers), market-data models, basic operations, and basic quantitative factors.
close() (async)
    Close the connection to the cache.
Source code in omicron/__init__.py
async def close():
    """Close the connection to the cache."""
    try:
        await cache.close()
    except Exception as e:  # noqa
        pass
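init(app_cache=5) (async)
    Initialize Omicron.
Initializes the connections to InfluxDB, the cache, etc., and loads the trading calendar and the list of securities.
The connections opened here should be closed by calling close() when the program exits.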
Source code in omicron/__init__.py
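async def init(app_cache: int = 5):
    """Initialize Omicron.
    Initializes the connections to InfluxDB, the cache, etc., and loads the trading calendar and the list of securities.
    The connections opened here should be closed by calling `close()` when the program exits.
    """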
    global cache
    await cache.init(app=app_cache)
    await tf.init()
    from omicron.models.security import Security
    await Security.init()
Extensions package
decimals
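math_round(x, digits)
    Because of how floating-point numbers are represented, the round function of many languages does not agree with mathematical rounding. The result of this function agrees with mathematical rounding: halves are rounded away from zero.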
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| x | float | the number to round | required | 
| digits | int | number of digits to keep after the decimal point | required | 
Source code in omicron/extensions/decimals.py
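def math_round(x: float, digits: int):
    """Because of how floating-point numbers are represented, the round function of many languages does not agree with mathematical rounding. The result of this function agrees with mathematical rounding (halves are rounded away from zero).
    Args:
        x: the number to round
        digits: number of digits to keep after the decimal point
    """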
    return int(x * (10**digits) + copysign(0.5, x)) / (10**digits)
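As a quick illustration of the difference described above, the sketch below copies the same half-away-from-zero formula into a standalone function (`math_round_sketch` is a hypothetical name) and compares it with Python's built-in `round`, which rounds exact halves to the nearest even digit. One limitation remains with either approach: the input is still a binary float, so a literal such as `1.005` is stored as slightly less than 1.005 and rounds down.

```python
from math import copysign


def math_round_sketch(x: float, digits: int) -> float:
    """Round half away from zero, using the same formula as math_round."""
    scale = 10**digits
    # shift by +0.5 (or -0.5 for negative x), then truncate toward zero with int()
    return int(x * scale + copysign(0.5, x)) / scale


# the built-in round() uses round-half-to-even ("banker's rounding")
print(round(2.5), math_round_sketch(2.5, 0))          # 2 3.0
print(round(0.125, 2), math_round_sketch(0.125, 2))   # 0.12 0.13
print(math_round_sketch(-2.5, 0))                     # -3.0
# 1.005 is stored as 1.00499999999999989..., so both give 1.0
print(round(1.005, 2), math_round_sketch(1.005, 2))   # 1.0 1.0
```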
price_equal(x, y)
    Check whether two stock prices are equal, comparing them after rounding both to two decimal places.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| x | float | price 1 | required | 
| y | float | price 2 | required | 
Returns:
| Type | Description | 
|---|---|
| bool | True if the prices are equal, otherwise False | 
Source code in omicron/extensions/decimals.py
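def price_equal(x: float, y: float) -> bool:
    """Check whether two stock prices are equal.
    Args:
        x : price 1
        y : price 2
    Returns:
        True if the prices are equal, otherwise False
    """
    # After rounding to 2 decimals, two different prices differ by at least one
    # tick (0.01), give or take floating-point error. A threshold of exactly 1e-2
    # is too tight: 10.01 - 10.0 evaluates to slightly less than 0.01, so a
    # one-tick difference could compare as equal. Half a tick is a safe threshold.
    return abs(math_round(x, 2) - math_round(y, 2)) < 5e-3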
np
    Extension functions related to numpy.
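array_math_round(arr, digits)
    Round the elements of the one-dimensional array arr.
numpy.around does not perform mathematical rounding: it rounds halves to the nearest even number, so both 1.5 and 2.5 become 2. Financial calculations require rounding in the mathematical sense.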
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| arr | ArrayLike | input array | required | 
| digits | int | number of digits to keep after the decimal point | required | 
Returns:
| Type | Description | 
|---|---|
| np.ndarray | the rounded one-dimensional array | 
Source code in omicron/extensions/np.py
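def array_math_round(arr: Union[float, ArrayLike], digits: int) -> np.ndarray:
    """Round the elements of the one-dimensional array arr.
    numpy.around does not perform mathematical rounding: both 1.5 and 2.5 are rounded to 2. Financial calculations require rounding in the mathematical sense.
    Args:
        arr (ArrayLike): input array
        digits (int): number of digits to keep after the decimal point
    Returns:
        np.ndarray: the rounded one-dimensional array
    """
    # a single float is rounded directly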
    if isinstance(arr, float):
        return decimals.math_round(arr, digits)
    f = np.vectorize(lambda x: decimals.math_round(x, digits))
    return f(arr)
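`np.vectorize` is essentially a Python-level loop. A sketch of a fully vectorized alternative, assuming the same half-away-from-zero rule as math_round: `np.trunc` plays the role of `int()` (truncation toward zero) and `np.copysign` supplies the ±0.5 shift. The name `array_round_half_away` is hypothetical and not part of omicron.

```python
import numpy as np


def array_round_half_away(arr, digits: int) -> np.ndarray:
    """Vectorized round-half-away-from-zero (hypothetical helper)."""
    x = np.asarray(arr, dtype=float)
    scale = 10.0**digits
    # same formula as math_round: shift by +/-0.5, then truncate toward zero
    return np.trunc(x * scale + np.copysign(0.5, x)) / scale


values = np.array([0.5, 1.5, 2.5, -2.5])
print(np.around(values))                 # values 0, 2, 2, -2 (half to even)
print(array_round_half_away(values, 0))  # values 1, 2, 3, -3 (half away from zero)
```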
array_price_equal(price1, price2)
    Check element-wise whether two price arrays are equal (after rounding to two decimal places).
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| price1 | ArrayLike | price array | required | 
| price2 | ArrayLike | price array | required | 
Returns:
| Type | Description | 
|---|---|
| np.ndarray | boolean array, True where the prices are equal | 
Source code in omicron/extensions/np.py
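def array_price_equal(price1: ArrayLike, price2: ArrayLike) -> np.ndarray:
    """Check element-wise whether two price arrays are equal.
    Args:
        price1 (ArrayLike): price array
        price2 (ArrayLike): price array
    Returns:
        np.ndarray: boolean array, True where the prices are equal
    """
    price1 = array_math_round(price1, 2)
    price2 = array_math_round(price2, 2)
    # compare against half a tick: rounded prices one tick (0.01) apart can
    # differ by slightly less than 1e-2 in floating point
    return abs(price1 - price2) < 5e-3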
bars_since(condition, default=None)
    Return the number of bars since the condition sequence was last True,
or default if it was never True.
>>> condition = [True, True, False]
>>> bars_since(condition)
1
Source code in omicron/extensions/np.py
def bars_since(condition: Sequence[bool], default=None) -> int:
    """
    Return the number of bars since `condition` sequence was last `True`,
    or if never, return `default`.
        >>> condition = [True, True, False]
        >>> bars_since(condition)
        1
    """
    return next(compress(range(len(condition)), reversed(condition)), default)
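The one-liner walks the sequence backwards: `reversed(condition)` puts the latest bar first, `compress` keeps only the positions whose flag is True, and `next` takes the first of them (the most recent True) or falls back to `default`. A sketch of an equivalent explicit loop for readability; `bars_since_loop` is a hypothetical name, and `bars_since` is copied from the listing above.

```python
from itertools import compress
from typing import Optional, Sequence


def bars_since(condition: Sequence[bool], default=None):
    # same one-liner as the listing above
    return next(compress(range(len(condition)), reversed(condition)), default)


def bars_since_loop(condition: Sequence[bool], default=None) -> Optional[int]:
    """Explicit-loop version: scan from the latest bar backwards."""
    for bars_ago, flag in enumerate(reversed(condition)):
        if flag:
            return bars_ago
    return default


signals = [True, False, True, False, False]
print(bars_since(signals), bars_since_loop(signals))  # 2 2
```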
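bin_cut(arr, n)
    Split the array arr into n parts. Elements are dealt out round-robin (element i goes to part i % n), and empty parts are dropped, so fewer than n parts are returned when len(arr) < n.
todo: use padding + reshape to boost performance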
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| arr | list | the array to split | required | 
| n | int | number of parts | required | 
Returns:
| Type | Description | 
|---|---|
| List[list] | the non-empty parts; element i of arr goes to part i % n | 
Source code in omicron/extensions/np.py
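def bin_cut(arr: list, n: int):
    """Split the array arr into n parts.
    Elements are dealt out round-robin (element i goes to part i % n); empty parts are dropped.
    todo: use padding + reshape to boost performance
    Args:
        arr (list): the array to split
        n (int): number of parts
    Returns:
        List[list]: the non-empty parts
    """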
    result = [[] for i in range(n)]
    for i, e in enumerate(arr):
        result[i % n].append(e)
    return [e for e in result if len(e)]
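Because `bin_cut` deals elements out round-robin, part j is simply every n-th element starting at j. A sketch using extended slicing, which avoids the per-element `append`; note this is plain slicing, not the padding + reshape approach mentioned in the todo, and `bin_cut_sliced` is a hypothetical name.

```python
from typing import List


def bin_cut_sliced(arr: list, n: int) -> List[list]:
    """Round-robin split of arr into at most n non-empty parts."""
    # arr[j::n] picks elements j, j + n, j + 2n, ...;
    # parts with j >= len(arr) would be empty, so they are never built
    return [list(arr[j::n]) for j in range(min(n, len(arr)))]


print(bin_cut_sliced([1, 2, 3, 4, 5], 2))  # [[1, 3, 5], [2, 4]]
print(bin_cut_sliced([1, 2], 3))           # [[1], [2]]
```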
count_between(arr, start, end)
    Count how many elements of arr lie between start and end.
arr must be sorted. Both boundary points are included in the count.
Examples:
>>> arr = [20050104, 20050105, 20050106, 20050107, 20050110, 20050111]
>>> count_between(arr, 20050104, 20050111)
6
>>> count_between(arr, 20050104, 20050109)
4
Source code in omicron/extensions/np.py
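def count_between(arr, start, end):
    """Count how many elements of `arr` lie between `start` and `end`.
    `arr` must be sorted. Both boundary points are included in the count.
    Examples:
        >>> arr = [20050104, 20050105, 20050106, 20050107, 20050110, 20050111]
        >>> count_between(arr, 20050104, 20050111)
        6
        >>> count_between(arr, 20050104, 20050109)
        4
    """
    # side="left": first position whose element is >= start, so start itself is counted
    pos_start = np.searchsorted(arr, start, side="left")
    # side="right": one past the last element <= end, so end itself is counted
    pos_end = np.searchsorted(arr, end, side="right")
    # bounds that are not elements of arr, or that lie outside its range,
    # need no special handling
    return int(pos_end - pos_start)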
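A quick way to validate the boundary handling of count_between is to compare a `searchsorted`-based count against a brute-force count, including bounds that are not themselves elements of `arr` and bounds outside its range. A sketch; both helper names are hypothetical.

```python
import numpy as np


def count_between_sorted(arr, start, end) -> int:
    # elements >= start begin at the side="left" position of start;
    # elements <= end stop just before the side="right" position of end
    return int(np.searchsorted(arr, end, side="right") - np.searchsorted(arr, start, side="left"))


def count_between_brute(arr, start, end) -> int:
    # reference implementation: check every element
    return sum(1 for x in arr if start <= x <= end)


arr = [20050104, 20050105, 20050106, 20050107, 20050110, 20050111]
cases = [
    (20050104, 20050111),  # both bounds in arr
    (20050104, 20050109),  # end falls in a gap
    (20050108, 20050111),  # start falls in a gap
    (20050101, 20050120),  # both bounds outside arr
]
for start, end in cases:
    print(start, end, count_between_sorted(arr, start, end), count_between_brute(arr, start, end))
```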
dataframe_to_structured_array(df, dtypes=None)
    Convert a DataFrame (all of its columns, and possibly its index) to a numpy structured array.
len(dtypes) should be either len(df.columns) or len(df.columns) + 1. In the latter case, df.index is included in the converted array.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| df | DataFrame | the one needs to be converted | required | 
| dtypes | List[Tuple] | Defaults to None. If it's None, the dtypes of df are used; in that case the index of df is not converted. | None | 
Returns:
| Type | Description | 
|---|---|
| ArrayLike | the converted structured array | 
Source code in omicron/extensions/np.py
def dataframe_to_structured_array(
    df: DataFrame, dtypes: List[Tuple] = None
) -> ArrayLike:
    """Convert a DataFrame (all of its columns, and possibly its index) to a numpy structured array.
    `len(dtypes)` should be either `len(df.columns)` or `len(df.columns) + 1`. In the latter case, `df.index` is included in the converted array.
    Args:
        df: the DataFrame to convert
        dtypes: Defaults to None. If it's `None`, the dtypes of `df` are used; in that case the `index` of `df` is not converted.
    Returns:
        ArrayLike: the converted structured array
    """
    v = df
    if dtypes is not None:
        dtypes_in_dict = {key: value for key, value in dtypes}
        col_len = len(df.columns)
        if len(dtypes) == col_len + 1:
            v = df.reset_index()
            rename_index_to = set(dtypes_in_dict.keys()).difference(set(df.columns))
            v.rename(columns={"index": list(rename_index_to)[0]}, inplace=True)
        elif col_len != len(dtypes):
            raise ValueError(
                f"length of dtypes should be either {col_len} or {col_len + 1}, is {len(dtypes)}"
            )
        # re-arrange order of dtypes, in order to align with df.columns
        dtypes = []
        for name in v.columns:
            dtypes.append((name, dtypes_in_dict[name]))
    else:
        dtypes = df.dtypes
    return np.array(np.rec.fromrecords(v.values), dtype=dtypes)
dict_to_numpy_array(d, dtype)
    Convert a dictionary to a numpy structured array, one record per (key, value) pair.
Examples:
>>> d = {"aaron": 5, "jack": 6}
>>> dtype = [("name", "S8"), ("score", "<i4")]
>>> dict_to_numpy_array(d, dtype)
array([(b'aaron', 5), (b'jack', 6)],
      dtype=[('name', 'S8'), ('score', '<i4')])
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
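| d | dict | the dictionary to convert | required | 
| dtype | List[Tuple] | structured dtype with two fields: one for the key, one for the value | required | 
Returns:
| Type | Description | 
|---|---|
| np.array | structured array with one record per item of d | 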
Source code in omicron/extensions/np.py
def dict_to_numpy_array(d: dict, dtype: List[Tuple]) -> np.array:
    """Convert a dictionary to a numpy structured array, one record per (key, value) pair.
    Examples:
    >>> d = {"aaron": 5, "jack": 6}
    >>> dtype = [("name", "S8"), ("score", "<i4")]
    >>> dict_to_numpy_array(d, dtype)
    array([(b'aaron', 5), (b'jack', 6)],
          dtype=[('name', 'S8'), ('score', '<i4')])
    Args:
        d (dict): the dictionary to convert
        dtype (List[Tuple]): structured dtype with two fields: one for the key, one for the value
    Returns:
        np.array: structured array with one record per item of d
    """
    return np.fromiter(d.items(), dtype=dtype, count=len(d))
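fill_nan(ts)
    Replace each NaN in ts with the value before it (forward fill).
If ts starts with NaN, the leading NaNs are replaced with the first non-NaN element; ts[0] is overwritten in place in that case.
If all elements are NaN, no replacement is possible and a ValueError is raised.
Examples:
>>> arr = np.arange(6, dtype=np.float32)
>>> arr[3:5] = np.nan
>>> fill_nan(arr)
array([0., 1., 2., 2., 2., 5.], dtype=float32)
>>> arr = np.arange(6, dtype=np.float32)
>>> arr[0:2] = np.nan
>>> fill_nan(arr)
array([2., 2., 2., 3., 4., 5.], dtype=float32)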
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| ts | np.ndarray | one-dimensional float array to fill | required | 
Source code in omicron/extensions/np.py
def fill_nan(ts: np.ndarray):
    """Replace each NaN in ts with the value before it (forward fill).
    If ts starts with NaN, the leading NaNs are replaced with the first non-NaN element.
    If all elements are NaN, no replacement is possible and a ValueError is raised.
    Example:
        >>> arr = np.arange(6, dtype=np.float32)
        >>> arr[3:5] = np.nan
        >>> fill_nan(arr)
        ... # doctest: +NORMALIZE_WHITESPACE
        array([0., 1., 2., 2., 2., 5.], dtype=float32)
        >>> arr = np.arange(6, dtype=np.float32)
        >>> arr[0:2] = np.nan
        >>> fill_nan(arr)
        ... # doctest: +NORMALIZE_WHITESPACE
        array([2., 2., 2., 3., 4., 5.], dtype=float32)
    Args:
        ts (np.ndarray): one-dimensional float array to fill
    """
    if np.all(np.isnan(ts)):
        raise ValueError("all of ts are NaN")
    if ts[0] is None or math.isnan(ts[0]):
        # index of the first non-NaN element, as a scalar
        idx = np.flatnonzero(~np.isnan(ts))[0]
        ts[0] = ts[idx]
    mask = np.isnan(ts)
    idx = np.where(~mask, np.arange(mask.size), 0)
    np.maximum.accumulate(idx, out=idx)
    return ts[idx]
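The core trick is a forward fill by index: every non-NaN position keeps its own index, every NaN position gets 0, and `np.maximum.accumulate` turns that into "index of the most recent non-NaN value". Since the listed function writes into `ts[0]` when it is NaN, the caller's array is modified. A sketch of a variant that leaves its input untouched; `ffill_nan` is a hypothetical name.

```python
import numpy as np


def ffill_nan(ts) -> np.ndarray:
    """Forward-fill NaNs without modifying the input (hypothetical helper)."""
    ts = np.asarray(ts, dtype=float)
    mask = np.isnan(ts)
    if mask.all():
        raise ValueError("all of ts are NaN")
    # non-NaN positions keep their own index, NaN positions get 0
    idx = np.where(~mask, np.arange(ts.size), 0)
    # running maximum = index of the most recent non-NaN value so far
    np.maximum.accumulate(idx, out=idx)
    out = ts[idx]  # fancy indexing returns a copy, ts is never written
    # leading NaNs have no earlier value: fill them from the first valid one
    first = np.flatnonzero(~mask)[0]
    out[:first] = ts[first]
    return out


prices = np.array([np.nan, np.nan, 2.0, 3.0, np.nan, 5.0])
print(ffill_nan(prices))  # [2. 2. 2. 3. 3. 5.]
print(prices[0])          # nan (input unchanged)
```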
find_runs(x)
    Find runs of consecutive items in an array.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| x | ArrayLike | the sequence to find runs in | required | 
Returns:
| Type | Description | 
|---|---|
| Tuple[np.ndarray, np.ndarray, np.ndarray] | A tuple of unique values, start indices, and length of runs | 
Source code in omicron/extensions/np.py
def find_runs(x: ArrayLike) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    """Find runs of consecutive items in an array.
    Args:
        x: the sequence to find runs in
    Returns:
        A tuple of unique values, start indices, and length of runs
    """
    # ensure array
    x = np.asanyarray(x)
    if x.ndim != 1:
        raise ValueError("only 1D array supported")
    n = x.shape[0]
    # handle empty array
    if n == 0:
        return np.array([]), np.array([]), np.array([])
    else:
        # find run starts
        loc_run_start = np.empty(n, dtype=bool)
        loc_run_start[0] = True
        np.not_equal(x[:-1], x[1:], out=loc_run_start[1:])
        run_starts = np.nonzero(loc_run_start)[0]
        # find run values
        run_values = x[loc_run_start]
        # find run lengths
        run_lengths = np.diff(np.append(run_starts, n))
        return run_values, run_starts, run_lengths
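A typical use of run detection on market data is measuring streaks, for instance the longest run of consecutive rising closes. The sketch below applies the same steps as find_runs (mark a run start wherever the value changes, take run lengths as differences between starts) to the boolean series `np.diff(closes) > 0`; the function name `longest_rising_streak` and the sample prices are made up for illustration.

```python
import numpy as np


def longest_rising_streak(closes) -> int:
    """Length of the longest run of consecutive up-moves (close > previous close)."""
    up = np.diff(np.asarray(closes, dtype=float)) > 0
    if up.size == 0:
        return 0
    # a run starts at index 0 and wherever the flag differs from the previous one
    starts = np.flatnonzero(np.concatenate(([True], up[1:] != up[:-1])))
    lengths = np.diff(np.append(starts, up.size))
    values = up[starts]
    # keep only runs of True (up-moves)
    return int(lengths[values].max()) if values.any() else 0


print(longest_rising_streak([1, 2, 3, 2, 3, 4, 5, 4]))  # 3
```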
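floor(arr, item)
    Find the largest value in arr that is less than or equal to item. If item is smaller than every element of arr, return arr[0]; if item is larger than every element, return arr[-1].
arr must be sorted in ascending order. Unlike minute_frames_floor, this function does not wrap around or carry over.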
Examples:
>>> a = [3, 6, 9]
>>> floor(a, -1)
3
>>> floor(a, 9)
9
>>> floor(a, 10)
9
>>> floor(a, 4)
3
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| arr | ArrayLike | array sorted in ascending order | required | 
| item | | the value to look up | required | 
Source code in omicron/extensions/np.py
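def floor(arr, item):
    """
    Find the largest value in arr that is less than or equal to item. If item is smaller than
    every element of arr, return arr[0]; if item is larger than every element, return arr[-1].
    Unlike `minute_frames_floor`, this function does not wrap around or carry over.
    Examples:
        >>> a = [3, 6, 9]
        >>> floor(a, -1)
        3
        >>> floor(a, 9)
        9
        >>> floor(a, 10)
        9
        >>> floor(a, 4)
        3
    Args:
        arr: array sorted in ascending order
        item: the value to look up
    Returns:
        the largest element of arr that is <= item, or arr[0] if item < arr[0]
    """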
    if item < arr[0]:
        return arr[0]
    index = np.searchsorted(arr, item, side="right")
    return arr[index - 1]
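join_by_left(key, r1, r2, mask=True)
    Left-join r1 and r2 on key.
If a row of r1 has no matching row in r2, the fields that come from r2 are masked in that row (mask=True), or left holding arbitrary, uninitialized values (mask=False).
Same as numpy.lib.recfunctions.join_by(key, r1, r2, jointype='leftouter'), but allows r1 to have duplicate keys.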
Examples:
>>> # to join the following
>>> # [[ 1, 2],
>>> #  [ 1, 3],   x   [[1, 5],
>>> #  [ 2, 3]]        [4, 7]]
>>> # only first two rows in left will be joined
>>> r1 = np.array([(1, 2), (1,3), (2,3)], dtype=[('seq', 'i4'), ('score', 'i4')])
>>> r2 = np.array([(1, 5), (4,7)], dtype=[('seq', 'i4'), ('age', 'i4')])
>>> joined = join_by_left('seq', r1, r2)
>>> print(joined)
[(1, 2, 5) (1, 3, 5) (2, 3, --)]
>>> print(joined.dtype)
(numpy.record, [('seq', '<i4'), ('score', '<i4'), ('age', '<i4')])
>>> joined[2][2]
masked
>>> joined.tolist()[2][2] == None
True
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
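| key | str | name of the field to join on | required | 
| r1 | np.ndarray | dataset 1 (the left side) | required | 
| r2 | np.ndarray | dataset 2 | required | 
| mask | bool | whether to mask the r2 fields of unmatched rows | True | 
Returns:
| Type | Description | 
|---|---|
| np.ndarray | the joined array (a masked array when mask=True) | 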
Source code in omicron/extensions/np.py
def join_by_left(key, r1, r2, mask=True):
    """Left-join `r1` and `r2` on `key`.
    If a row of `r1` has no matching row in `r2`, the fields that come from `r2` are masked in that row, or left holding arbitrary (uninitialized) values when mask=False.
    same as numpy.lib.recfunctions.join_by(key, r1, r2, jointype='leftouter'), but allows r1 to have duplicate keys
    [Reference: stackoverflow](https://stackoverflow.com/a/53261882/13395693)
    Examples:
        >>> # to join the following
        >>> # [[ 1, 2],
        >>> #  [ 1, 3],   x   [[1, 5],
        >>> #  [ 2, 3]]        [4, 7]]
        >>> # only first two rows in left will be joined
        >>> r1 = np.array([(1, 2), (1,3), (2,3)], dtype=[('seq', 'i4'), ('score', 'i4')])
        >>> r2 = np.array([(1, 5), (4,7)], dtype=[('seq', 'i4'), ('age', 'i4')])
        >>> joined = join_by_left('seq', r1, r2)
        >>> print(joined)
        [(1, 2, 5) (1, 3, 5) (2, 3, --)]
        >>> print(joined.dtype)
        (numpy.record, [('seq', '<i4'), ('score', '<i4'), ('age', '<i4')])
        >>> joined[2][2]
        masked
        >>> joined.tolist()[2][2] == None
        True
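    Args:
        key : name of the field to join on
        r1 : dataset 1 (the left side)
        r2 : dataset 2
        mask : whether to mask the r2 fields of unmatched rows
    Returns:
        a numpy array (a masked array when mask=True)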
    """
    # figure out the dtype of the result array
    descr1 = r1.dtype.descr
    descr2 = [d for d in r2.dtype.descr if d[0] not in r1.dtype.names]
    descrm = descr1 + descr2
    # figure out the fields we'll need from each array
    f1 = [d[0] for d in descr1]
    f2 = [d[0] for d in descr2]
    # cache the number of columns in f1
    ncol1 = len(f1)
    # get a dict of the rows of r2 grouped by key
    rows2 = {}
    for row2 in r2:
        rows2.setdefault(row2[key], []).append(row2)
    # figure out how many rows will be in the result
    nrowm = 0
    for k1 in r1[key]:
        if k1 in rows2:
            nrowm += len(rows2[k1])
        else:
            nrowm += 1
    # allocate the return array
    # ret = np.full((nrowm, ), fill, dtype=descrm)
    _ret = np.recarray(nrowm, dtype=descrm)
    if mask:
        ret = np.ma.array(_ret, mask=True)
    else:
        ret = _ret
    # merge the data into the return array
    i = 0
    for row1 in r1:
        if row1[key] in rows2:
            for row2 in rows2[row1[key]]:
                ret[i] = tuple(row1[f1]) + tuple(row2[f2])
                i += 1
        else:
            for j in range(ncol1):
                ret[i][j] = row1[j]
            i += 1
    return ret
numpy_append_fields(base, names, data, dtypes)
    Add new fields to the existing array base.
Implements the functionality of numpy.lib.recfunctions.rec_append_fields. It is provided because rec_append_fields cannot handle data whose elements are of type Object.
The new columns are appended, in order, to the right of the existing columns.
Examples:
>>> # add a single field
>>> import numpy
>>> old = np.array([i for i in range(3)], dtype=[('col1', '<f4')])
>>> new_list = [2 * i for i in range(3)]
>>> res = numpy_append_fields(old, 'new_col', new_list, [('new_col', '<f4')])
>>> print(res)
[(0., 0.) (1., 2.) (2., 4.)]
>>> # add multiple fields
>>> data = [res['col1'].tolist(), res['new_col'].tolist()]
>>> print(numpy_append_fields(old, ('col3', 'col4'), data, [('col3', '<f4'), ('col4', '<f4')]))
[(0., 0., 0.) (1., 1., 2.) (2., 2., 4.)]
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
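| base | np.ndarray | the base (structured) array | required | 
| names | Union[str, List[str]] | name(s) of the new field(s): a string for a single field, or a list of strings | required | 
| data | list | data of the new field(s), as a list | required | 
| dtypes | List[Tuple] | dtypes of the new fields | required | 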
Source code in omicron/extensions/np.py
def numpy_append_fields(
    base: np.ndarray, names: Union[str, List[str]], data: List, dtypes: List
) -> np.ndarray:
    """Add new fields to the existing array `base`.
    Implements the functionality of `numpy.lib.recfunctions.rec_append_fields`. It is provided because `rec_append_fields` cannot handle `data` whose elements are of type Object.
    The new columns are appended, in order, to the right of the existing columns.
    Example:
        >>> # add a single field
        >>> import numpy
        >>> old = np.array([i for i in range(3)], dtype=[('col1', '<f4')])
        >>> new_list = [2 * i for i in range(3)]
        >>> res = numpy_append_fields(old, 'new_col', new_list, [('new_col', '<f4')])
        >>> print(res)
        ... # doctest: +NORMALIZE_WHITESPACE
        [(0., 0.) (1., 2.) (2., 4.)]
        >>> # add multiple fields
        >>> data = [res['col1'].tolist(), res['new_col'].tolist()]
        >>> print(numpy_append_fields(old, ('col3', 'col4'), data, [('col3', '<f4'), ('col4', '<f4')]))
        ... # doctest: +NORMALIZE_WHITESPACE
        [(0., 0., 0.) (1., 1., 2.) (2., 2., 4.)]
    Args:
        base (np.ndarray): the base (structured) array
        names (Union[str, List[str]]): name(s) of the new field(s): a string for a single field, or a list of strings
        data (list): data of the new field(s), as a list
        dtypes (List[Tuple]): dtypes of the new fields
    """
    if isinstance(names, str):
        names = [names]
        data = [data]
    result = np.empty(base.shape, dtype=base.dtype.descr + dtypes)
    for col in base.dtype.names:
        result[col] = base[col]
    for i in range(len(names)):
        result[names[i]] = data[i]
    return result
remove_nan(ts)
    Remove NaN values from ts.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| ts | np.ndarray | input array | required | 
Returns:
| Type | Description | 
|---|---|
| np.ndarray | ts with the NaN elements removed | 
Source code in omicron/extensions/np.py
def remove_nan(ts: np.ndarray) -> np.ndarray:
    """Remove NaN values from `ts`.
    Args:
        ts (np.ndarray): input array
    Returns:
        np.ndarray: `ts` with the NaN elements removed
    """
    return ts[~np.isnan(ts.astype(float))]
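replace_zero(ts, replacement=None)
    Replace each 0 in ts with the value before it; commonly used when processing volume data.
If replacement is provided, zeros are replaced with replacement instead.
Without replacement, leading zeros are replaced with the first non-zero element (ts[0] is overwritten in place), and a ValueError is raised if all elements are 0.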
Source code in omicron/extensions/np.py
def replace_zero(ts: np.ndarray, replacement=None) -> np.ndarray:
    """Replace each 0 in ts with the value before it; commonly used when processing volume data.
    If replacement is provided, zeros are replaced with replacement instead.
    """
    if replacement is not None:
        return np.where(ts == 0, replacement, ts)
    if np.all(ts == 0):
        raise ValueError("all of ts are 0")
    if ts[0] == 0:
        # index of the first non-zero element, as a scalar
        idx = np.flatnonzero(ts != 0)[0]
        ts[0] = ts[idx]
    mask = ts == 0
    idx = np.where(~mask, np.arange(mask.size), 0)
    np.maximum.accumulate(idx, out=idx)
    return ts[idx]
rolling(x, win, func)
    Apply func to every sliding window of length win over the sequence x.
If func computes argmax, argmin, max, mean, median, min, rank, std, sum, var, etc., use the corresponding bottleneck functions instead (move_argmax, move_argmin, move_max, move_mean, move_median, move_min, move_rank, move_std, move_sum, move_var); they perform better.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| x | ArrayLike | the input sequence | required | 
| win | int | window length | required | 
| func | Callable | function applied to each window | required | 
Returns:
| Type | Description | 
|---|---|
| np.ndarray | one result per window, len(x) - win + 1 in total | 
Source code in omicron/extensions/np.py
def rolling(x, win, func):
    """Apply `func` to every sliding window of length `win` over the sequence `x`.
    If `func` computes argmax, argmin, max, mean, median, min, rank, std, sum, var, etc., use the corresponding bottleneck functions instead (move_argmax, move_argmin, move_max, move_mean, move_median, move_min, move_rank, move_std, move_sum, move_var); they perform better.
    Args:
        x (ArrayLike): the input sequence
        win (int): window length
        func (Callable): function applied to each window
    Returns:
        np.ndarray: one result per window, len(x) - win + 1 in total
    """
    results = []
    for subarray in sliding_window_view(x, window_shape=win):
        results.append(func(subarray))
    return np.array(results)
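For reductions NumPy already provides, the window view can be reduced in a single call instead of calling `func` once per window from Python. A sketch comparing the generic loop (copied from the listing) with a vectorized moving mean over the same `sliding_window_view`, which was added in NumPy 1.20:

```python
import numpy as np
from numpy.lib.stride_tricks import sliding_window_view


def rolling(x, win, func):
    # same approach as the listing above: call func once per window
    return np.array([func(w) for w in sliding_window_view(x, window_shape=win)])


x = np.array([1.0, 2.0, 3.0, 4.0, 5.0])
print(rolling(x, 3, np.mean))                   # [2. 3. 4.]
# vectorized: reduce the (len(x) - win + 1, win) view along its last axis
print(sliding_window_view(x, 3).mean(axis=-1))  # [2. 3. 4.]
```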
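shift(arr, start, offset)
    In the numpy array arr, locate start (or, if start is not an element of arr, the last element smaller than it) and return the element offset positions away from it.
arr must be sorted. A positive offset shifts forward (towards later elements); a negative offset shifts backward.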
Examples:
>>> arr = [20050104, 20050105, 20050106, 20050107, 20050110, 20050111]
>>> shift(arr, 20050104, 1)
20050105
>>> shift(arr, 20050105, -1)
20050104
>>> # start is already beyond the right end and the shift is to the right, so start is returned
>>> shift(arr, 20050120, 1)
20050120
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
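| arr | ArrayLike | the sorted array | required | 
| start | | a value of any type numpy accepts | required | 
| offset | int | number of positions to shift: positive shifts forward, negative shifts backward | required | 
Returns:
| Type | Description | 
|---|---|
| | the element found after shifting | 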
Source code in omicron/extensions/np.py
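def shift(arr, start, offset):
    """In the numpy array arr, locate start (or, if start is not an element of arr, the last element smaller than it) and return the element offset positions away from it.
    `arr` must be sorted. A positive `offset` shifts forward; a negative `offset` shifts backward.
    Examples:
        >>> arr = [20050104, 20050105, 20050106, 20050107, 20050110, 20050111]
        >>> shift(arr, 20050104, 1)
        20050105
        >>> shift(arr, 20050105, -1)
        20050104
        >>> # start is already beyond the right end and the shift is to the right, so start is returned
        >>> shift(arr, 20050120, 1)
        20050120
    Args:
        arr : the sorted array
        start : a value of any type numpy accepts
        offset (int): number of positions to shift: positive shifts forward, negative shifts backward
    Returns:
        the element found after shifting
    """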
    pos = np.searchsorted(arr, start, side="right")
    if pos + offset - 1 >= len(arr):
        return start
    else:
        return arr[pos + offset - 1]
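One edge case is worth knowing: when the target position falls before the start of `arr` (for example `shift(arr, 20050104, -1)`), the listed code computes index -1, which Python and NumPy interpret as counting from the end, so it returns the last element (20050111). A sketch with an added guard that returns `start` in that case, mirroring the right-edge behaviour; the guard and the name `shift_guarded` are assumptions, not omicron's documented behaviour.

```python
import numpy as np


def shift_guarded(arr, start, offset):
    # position just after the last element <= start
    pos = np.searchsorted(arr, start, side="right")
    idx = pos + offset - 1
    # out of range on either side: return start unchanged
    # (the left-side rule is an assumption added in this sketch)
    if idx >= len(arr) or idx < 0:
        return start
    return arr[idx]


arr = [20050104, 20050105, 20050106, 20050107, 20050110, 20050111]
print(shift_guarded(arr, 20050104, -1))  # 20050104 instead of wrapping to 20050111
print(shift_guarded(arr, 20050108, 1))   # 20050110 (starts from 20050107)
```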
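smallest_n_argpos(ts, n)
    Get the n smallest elements and return their positions, ordered by value ascending (min to max). NaN values sort last, so they are only returned when n exceeds the number of non-NaN elements.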
Examples:
>>> smallest_n_argpos([np.nan, 4, 3, 9, 8, 5, 2, 1, 0, 6, 7], 2)
array([8, 7])
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| ts | np.array | the input array | required | 
| n | int | number of smallest elements to take | required | 
Returns:
| Type | Description | 
|---|---|
| np.array | positions of the n smallest elements, in ascending order of value | 
Source code in omicron/extensions/np.py
def smallest_n_argpos(ts: np.array, n: int) -> np.array:
    """get the smallest n elements (min->max) and return their positions, ordered by value ascending
    Example:
        >>> smallest_n_argpos([np.nan, 4, 3, 9, 8, 5, 2, 1, 0, 6, 7], 2)
        array([8, 7])
    Args:
        ts (np.array): the input array
        n (int): number of smallest elements to take
    Returns:
        np.array: positions of the n smallest elements, in ascending order of value
    """
    return np.argsort(ts)[:n]
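to_pydatetime(tm)
    Convert a numpy.datetime64 object to a Python datetime object.
The numpy.ndarray.item() method converts numpy values to the corresponding Python objects; use .item() instead of this function wherever it applies. Example:
arr = np.array(['2022-09-08', '2022-09-09'], dtype='datetime64[s]')
arr.item(0) # output is datetime.datetime(2022, 9, 8, 0, 0)
arr[1].item() # output is datetime.datetime(2022, 9, 9, 0, 0)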
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| tm | np.datetime64 | the input numpy datetime object | required | 
Returns:
| Type | Description | 
|---|---|
| datetime.datetime | python datetime object | 
Deprecated since 2.0.0: use tm.item() instead.
Source code in omicron/extensions/np.py
@deprecated("2.0.0", details="use `tm.item()` instead")
def to_pydatetime(tm: np.datetime64) -> datetime.datetime:
    """Convert a numpy.datetime64 object to a Python datetime object.
    numpy.ndarray.item() converts numpy values to the corresponding Python objects; use .item() instead of this method wherever it applies. Example:
    ```
        arr = np.array(['2022-09-08', '2022-09-09'], dtype='datetime64[s]')
        arr.item(0) # output is datetime.datetime(2022, 9, 8, 0, 0)
        arr[1].item() # output is datetime.datetime(2022, 9, 9, 0, 0)
    ```
    Args:
        tm : the input numpy datetime object
    Returns:
        python datetime object
    """
    unix_epoch = np.datetime64(0, "s")
    one_second = np.timedelta64(1, "s")
    seconds_since_epoch = (tm - unix_epoch) / one_second
    return datetime.datetime.utcfromtimestamp(seconds_since_epoch)
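As the deprecation note suggests, `.item()` is the simpler route. Two details are worth checking in practice: `.item()` returns a `datetime.datetime` only for units down to microseconds (a `datetime64[ns]` value comes back as an `int` of nanoseconds, since `datetime` cannot hold nanoseconds), and `datetime.utcfromtimestamp`, used in the listing, is deprecated since Python 3.12 in favour of a timezone-aware call. A short sketch with a made-up timestamp:

```python
import datetime

import numpy as np

tm = np.datetime64("2022-09-08T09:30:00", "s")

# preferred: let numpy convert the scalar (naive datetime, no tz attached)
print(tm.item())  # 2022-09-08 09:30:00

# nanosecond precision cannot be represented by datetime, so .item() yields an int
print(type(np.datetime64("2022-09-08T09:30:00", "ns").item()))  # <class 'int'>

# timezone-aware replacement for the utcfromtimestamp() call in the listing
seconds = (tm - np.datetime64(0, "s")) / np.timedelta64(1, "s")
print(datetime.datetime.fromtimestamp(seconds, tz=datetime.timezone.utc))  # 2022-09-08 09:30:00+00:00
```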
top_n_argpos(ts, n)
    Get the n largest elements and return their positions, ordered by value descending (max to min). NaN values are treated as -inf, so they are never ranked above a number.
Examples:
>>> top_n_argpos([np.nan, 4, 3, 9, 8, 5, 2, 1, 0, 6, 7], 2)
array([3, 4])
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| ts | np.array | the input array | required | 
| n | int | number of largest elements to take | required | 
Returns:
| Type | Description | 
|---|---|
| np.array | positions of the n largest elements, in descending order of value | 
Source code in omicron/extensions/np.py
def top_n_argpos(ts: np.array, n: int) -> np.array:
    """get the top n elements (max->min) and return their positions, ordered by value descending
    Example:
        >>> top_n_argpos([np.nan, 4, 3, 9, 8, 5, 2, 1, 0, 6, 7], 2)
        array([3, 4])
    Args:
        ts (np.array): the input array
        n (int): number of largest elements to take
    Returns:
        np.array: positions of the n largest elements, in descending order of value
    """
    ts_ = np.copy(ts)
    ts_[np.isnan(ts_)] = -np.inf
    return np.argsort(ts_)[-n:][::-1]
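`np.argsort` sorts the whole array even though only n positions are kept. For long arrays and small n, a sketch using `np.argpartition` selects the n largest values first and sorts just those, with the same NaN-as-`-inf` treatment. Ties may come out in a different order than with a full `argsort`. The name `top_n_argpos_partition` is hypothetical, and the sketch assumes 1 <= n <= len(ts).

```python
import numpy as np


def top_n_argpos_partition(ts, n: int) -> np.ndarray:
    """Positions of the n largest values, largest first; NaN counts as -inf (hypothetical helper)."""
    ts_ = np.array(ts, dtype=float)  # np.array copies, so the caller's data is untouched
    ts_[np.isnan(ts_)] = -np.inf
    # argpartition puts the n largest values (in no particular order) in the last n slots
    candidates = np.argpartition(ts_, -n)[-n:]
    # sort only those n candidates, descending by value
    return candidates[np.argsort(ts_[candidates])[::-1]]


print(top_n_argpos_partition([np.nan, 4, 3, 9, 8, 5, 2, 1, 0, 6, 7], 2))  # [3 4]
```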
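Notify package
dingtalk
DingTalkMessage
    DingTalk robot message-push class. It wraps the commonly used message types and the signing algorithm.
The robot's access_token must be set in the configuration file.
If signing is enabled for the robot, its secret must also be set in the configuration file.
If custom keywords are enabled for the robot, its keyword must also be set in the configuration file; separate multiple keywords with commas.
A complete configuration example is shown below. secret and keyword are optional; access_token is required.
notify:
  dingtalk_access_token: xxxx
  dingtalk_secret: xxxx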
Source code in omicron/notify/dingtalk.py
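class DingTalkMessage:
    """
    DingTalk robot message-push class. It wraps the commonly used message types and the signing algorithm.
    The robot's access_token must be set in the configuration file.
    If signing is enabled for the robot, its secret must also be set in the configuration file.
    If custom keywords are enabled for the robot, its keyword must also be set in the configuration file; separate multiple keywords with commas.
    A complete configuration example is shown below. secret and keyword are optional; access_token is required.
    notify:
      dingtalk_access_token: xxxx
      dingtalk_secret: xxxx
    """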
    url = "https://oapi.dingtalk.com/robot/send"
    @classmethod
    def _get_access_token(cls):
        """Get the access_token of the DingTalk robot."""
        if hasattr(cfg.notify, "dingtalk_access_token"):
            return cfg.notify.dingtalk_access_token
        else:
            logger.error(
                "Dingtalk not configured, please add the following items:\n"
                "notify:\n"
                "  dingtalk_access_token: xxxx\n"
                "  dingtalk_secret: xxxx\n"
            )
            raise ConfigError("dingtalk_access_token not found")
    @classmethod
    def _get_secret(cls):
        """Get the secret of the DingTalk robot."""
        if hasattr(cfg.notify, "dingtalk_secret"):
            return cfg.notify.dingtalk_secret
        else:
            return None
    @classmethod
    def _get_url(cls):
        """Get the robot's push URL, with the timestamp and signature appended as query parameters."""
        access_token = cls._get_access_token()
        url = f"{cls.url}?access_token={access_token}"
        secret = cls._get_secret()
        if secret:
            timestamp, sign = cls._get_sign(secret)
            url = f"{url}&timestamp={timestamp}&sign={sign}"
        return url
    @classmethod
    def _get_sign(cls, secret: str):
        """Compute the signature sent to the DingTalk robot."""
        timestamp = str(round(time.time() * 1000))
        secret_enc = secret.encode("utf-8")
        string_to_sign = "{}\n{}".format(timestamp, secret)
        string_to_sign_enc = string_to_sign.encode("utf-8")
        hmac_code = hmac.new(
            secret_enc, string_to_sign_enc, digestmod=hashlib.sha256
        ).digest()
        sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))
        return timestamp, sign
    @classmethod
    def _send(cls, msg):
        """Send a message to the DingTalk robot."""
        url = cls._get_url()
        response = httpx.post(url, json=msg, timeout=30)
        if response.status_code != 200:
            logger.error(
                f"failed to send message, content: {msg}, response from Dingtalk: {response.content.decode()}"
            )
            return
        rsp = json.loads(response.content)
        if rsp.get("errcode") != 0:
            logger.error(
                f"failed to send message, content: {msg}, response from Dingtalk: {rsp}"
            )
        return response.content.decode()
    @classmethod
    async def _send_async(cls, msg):
        """Send a message to the DingTalk robot (asynchronously)."""
        url = cls._get_url()
        async with httpx.AsyncClient() as client:
            r = await client.post(url, json=msg, timeout=30)
            if r.status_code != 200:
                logger.error(
                    f"failed to send message, content: {msg}, response from Dingtalk: {r.content.decode()}"
                )
                return
            rsp = json.loads(r.content)
            if rsp.get("errcode") != 0:
                logger.error(
                    f"failed to send message, content: {msg}, response from Dingtalk: {rsp}"
                )
            return r.content.decode()
    @classmethod
    @deprecated("2.0.0", details="use function `ding` instead")
    def text(cls, content):
        msg = {"text": {"content": content}, "msgtype": "text"}
        return cls._send(msg)
text(cls, content) (classmethod)
    Deprecated since 2.0.0: use function ding instead.
Source code in omicron/notify/dingtalk.py
@classmethod
@deprecated("2.0.0", details="use function `ding` instead")
def text(cls, content):
    msg = {"text": {"content": content}, "msgtype": "text"}
    return cls._send(msg)
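The signing done by `_get_sign` and `_get_url` can be reproduced standalone: HMAC-SHA256 over `"{timestamp}\n{secret}"` keyed with the secret, Base64-encoded, then URL-encoded, with `timestamp` and `sign` appended to the robot URL as query parameters. A sketch with a fixed timestamp so the result is reproducible; the function name `signed_webhook_url`, the token and the secret are placeholders.

```python
import base64
import hashlib
import hmac
import urllib.parse


def signed_webhook_url(access_token: str, secret: str, timestamp_ms: int) -> str:
    """Build a signed robot URL the same way _get_url/_get_sign do (hypothetical helper)."""
    timestamp = str(timestamp_ms)
    string_to_sign = f"{timestamp}\n{secret}"
    digest = hmac.new(
        secret.encode("utf-8"), string_to_sign.encode("utf-8"), digestmod=hashlib.sha256
    ).digest()
    # Base64 may contain '+', '/' and '=', so the signature must be URL-encoded
    sign = urllib.parse.quote_plus(base64.b64encode(digest))
    return (
        f"https://oapi.dingtalk.com/robot/send?access_token={access_token}"
        f"&timestamp={timestamp}&sign={sign}"
    )


url = signed_webhook_url("YOUR_TOKEN", "SECxxxx", 1662600000000)
print(url)
```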
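ding(msg)
    Send a message to the DingTalk robot.
Both plain-text and markdown messages are supported. To send a markdown message, pass a dict that contains the two fields "title" and "text". For details, see the DingTalk Open Platform documentation.
Important
This function must be called from the asynchronous thread (the thread running the asyncio event loop); otherwise an exception is raised. It returns an Awaitable: you can await it, or ignore the return value, in which case it runs as a background task and finishes at an unspecified time.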
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| msg | Union[str, dict] | the message to send | required | 
Returns:
| Type | Description | 
|---|---|
| Awaitable | the background task that sends the message; you can use this handle to cancel it | 
Source code in omicron/notify/dingtalk.py
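def ding(msg: Union[str, dict]) -> Awaitable:
    """Send a message to the DingTalk robot.
    Both plain-text and markdown messages are supported. To send a markdown message, pass a dict that contains the two fields "title" and "text". For details, see the [DingTalk Open Platform documentation](https://open.dingtalk.com/document/orgapp-server/message-type)
    ???+ Important
        This function must be called from the asynchronous thread (the thread running the asyncio event loop); otherwise an exception is raised.
        It returns an Awaitable: you can await it, or ignore the return value, in which case it runs as a background task and finishes at an unspecified time.
    Args:
        msg: the message to send.
    Returns:
        the background task that sends the message; you can use this handle to cancel it.
    """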
    if isinstance(msg, str):
        msg_ = {"text": {"content": msg}, "msgtype": "text"}
    elif isinstance(msg, dict):
        msg_ = {
            "msgtype": "markdown",
            "markdown": {"title": msg["title"], "text": msg["text"]},
        }
    else:
        raise TypeError
    task = asyncio.create_task(DingTalkMessage._send_async(msg_))
    return task
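The requirement to call `ding` from the event-loop thread follows from `asyncio.create_task`, which raises `RuntimeError` when no loop is running in the current thread. A sketch of the same pattern with the HTTP call replaced by a stand-in coroutine, showing the two payload shapes `ding` builds and how the returned task can be awaited; `fake_send` and `ding_sketch` are hypothetical names.

```python
import asyncio
from typing import Union


async def fake_send(payload: dict) -> dict:
    # stand-in for DingTalkMessage._send_async: yields control once and echoes the payload
    await asyncio.sleep(0)
    return payload


def ding_sketch(msg: Union[str, dict]) -> asyncio.Task:
    if isinstance(msg, str):
        payload = {"text": {"content": msg}, "msgtype": "text"}
    elif isinstance(msg, dict):
        payload = {"msgtype": "markdown", "markdown": {"title": msg["title"], "text": msg["text"]}}
    else:
        raise TypeError(f"unsupported message type: {type(msg)!r}")
    # create_task raises RuntimeError when no event loop is running in this thread
    return asyncio.create_task(fake_send(payload))


async def main():
    text_task = ding_sketch("closing price alert")
    md_task = ding_sketch({"title": "Daily report", "text": "## Summary\n- close: 10.01"})
    return await text_task, await md_task


text_payload, md_payload = asyncio.run(main())
print(text_payload["msgtype"], md_payload["msgtype"])  # text markdown
```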
mail
compose(subject, plain_txt=None, html=None, attachment=None)
    Compose a MIME email message.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
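| subject | str | subject of the email | required | 
| plain_txt | str | body of the email as plain text; required when html is not given | None | 
| html | str | body of the email as HTML. Defaults to None. | None | 
| attachment | str | file name of the attachment | None | 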
Returns:
| Type | Description | 
|---|---|
| EmailMessage | MIME mail | 
Source code in omicron/notify/mail.py
def compose(
    subject: str, plain_txt: str = None, html: str = None, attachment: str = None
) -> EmailMessage:
    """Compose a MIME email message.
    Args:
        subject (str): subject of the email
        plain_txt (str, optional): body of the email as plain text; required when html is not given
        html (str, optional): body of the email as HTML. Defaults to None.
        attachment (str, optional): file name of the attachment
    Returns:
        MIME mail
    """
    msg = EmailMessage()
    msg["Subject"] = subject
    if html:
        msg.preamble = plain_txt or ""
        msg.set_content(html, subtype="html")
    else:
        assert plain_txt, "Either plain_txt or html is required."
        msg.set_content(plain_txt)
    if attachment:
        ctype, encoding = mimetypes.guess_type(attachment)
        if ctype is None or encoding is not None:
            ctype = "application/octet-stream"
        maintype, subtype = ctype.split("/", 1)
        with open(attachment, "rb") as f:
            msg.add_attachment(
                f.read(), maintype=maintype, subtype=subtype, filename=attachment
            )
    return msg
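In the listing, when `html` is given, `plain_txt` is stored as `msg.preamble`, which mail clients normally do not display. If a plain-text fallback that clients can actually show is wanted, the standard `email.message.EmailMessage` API can build a `multipart/alternative` message with `add_alternative`. A sketch of that variant; the name `compose_alternative` is hypothetical, and it also passes only the base name of the attachment path as the displayed filename.

```python
import mimetypes
import os
from email.message import EmailMessage


def compose_alternative(subject: str, plain_txt: str, html: str = None, attachment: str = None) -> EmailMessage:
    """Plain-text body with an optional HTML alternative and attachment (hypothetical helper)."""
    msg = EmailMessage()
    msg["Subject"] = subject
    msg.set_content(plain_txt)  # text/plain part, shown by clients that cannot render HTML
    if html:
        # turns the message into multipart/alternative: text/plain + text/html
        msg.add_alternative(html, subtype="html")
    if attachment:
        ctype, encoding = mimetypes.guess_type(attachment)
        if ctype is None or encoding is not None:
            ctype = "application/octet-stream"
        maintype, subtype = ctype.split("/", 1)
        with open(attachment, "rb") as f:
            # add_attachment wraps the existing content into multipart/mixed
            msg.add_attachment(
                f.read(), maintype=maintype, subtype=subtype,
                filename=os.path.basename(attachment),
            )
    return msg


msg = compose_alternative("Daily report", "close: 10.01", "<b>close: 10.01</b>")
print(msg.get_content_type())  # multipart/alternative
```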
mail_notify(subject=None, body=None, msg=None, html=False, receivers=None)
    Send an email notification.
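Configure the sender, receivers, and mail server through cfg4py:
    notify:
        mail_from: aaron_yang@jieyu.ai
        mail_to:
            - code@jieyu.ai
        mail_server: smtp.ym.163.com
Set the password used to log on to the mail server through the environment variable MAIL_PASSWORD.
Exactly one of subject/body or msg must be provided.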
Important
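This method must be called from the asynchronous thread (the thread running the asyncio event loop); otherwise it raises an exception. It returns an Awaitable: you can await it, or ignore the return value, in which case the mail is sent as a background task whose completion time is not guaranteed.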
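The requirement above follows from asyncio.create_task(), which send_mail() uses to schedule the SMTP call: it needs a running event loop in the current thread and raises RuntimeError otherwise. The sketch below reproduces both cases; `fake_send` and `schedule_send` are hypothetical stand-ins for the real send path, so no mail server is needed.

```python
import asyncio


async def fake_send() -> str:
    """Hypothetical stand-in for the coroutine that send_mail() schedules."""
    return "sent"


def schedule_send() -> asyncio.Task:
    """Schedules fake_send() the same way send_mail() does, via asyncio.create_task()."""
    coro = fake_send()
    try:
        return asyncio.create_task(coro)
    except RuntimeError:
        coro.close()  # no running loop: discard the coroutine to avoid a "never awaited" warning
        raise


# 1) Called from ordinary synchronous code: there is no running event loop.
try:
    schedule_send()
    outside_loop_error = None
except RuntimeError as e:
    outside_loop_error = str(e)


# 2) Called from inside a coroutine: the task is scheduled and can be awaited.
async def main() -> str:
    task = schedule_send()
    return await task


inside_loop_result = asyncio.run(main())
print(inside_loop_result)  # sent
```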
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| msg | EmailMessage | A pre-composed message, e.g. one built with compose() | None | 
| subject | str | Email subject | None | 
| body | str | Email body | None | 
| html | bool | Whether body is treated as HTML | False | 
| receivers | List[str], Optional | Recipients. If not provided, the preconfigured recipients are used. | None | 
Returns:
| Type | Description | 
|---|---|
| Awaitable | The background task that sends the message. You can use this handle to cancel the task. | 
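The argument rules of mail_notify() can be summarized as a small pure function. `resolve_mail_args` and its `configured_receivers` parameter are hypothetical: they stand in for mail_notify()'s checks and for the configured notify.mail_to list. The sketch does not read cfg4py or send anything.

```python
import os
from email.message import EmailMessage
from typing import List, Optional, Tuple


def resolve_mail_args(
    subject: Optional[str] = None,
    body: Optional[str] = None,
    msg: Optional[EmailMessage] = None,
    receivers: Optional[List[str]] = None,
    configured_receivers: Optional[List[str]] = None,
) -> Tuple[List[str], Optional[str]]:
    """Hypothetical helper mirroring mail_notify()'s checks; not part of omicron."""
    if msg is not None and (subject or body):
        raise TypeError("provide either msg or subject/body, not both")
    if msg is None and subject is None and body is None:
        raise TypeError("either msg or subject/body is required")
    # Fall back to the configured recipients when none are passed explicitly.
    resolved = receivers or configured_receivers or []
    # The SMTP password comes from the environment, not from the config file.
    password = os.environ.get("MAIL_PASSWORD")
    return resolved, password
```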
Source code in omicron/notify/mail.py
          def mail_notify(
    subject: str = None,
    body: str = None,
    msg: EmailMessage = None,
    html=False,
    receivers=None,
) -> Awaitable:
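    """Send an email notification.
    Sender, receivers, and mail server are configured through cfg4py:
    ```
    notify:
        mail_from: aaron_yang@jieyu.ai
        mail_to:
            - code@jieyu.ai
        mail_server: smtp.ym.163.com
    ```
    Set the mail server password through the environment variable `MAIL_PASSWORD`.
    Exactly one of subject/body or msg must be provided.
    ???+ Important
        This method must be called from the asynchronous thread (the thread running the asyncio loop); otherwise an exception is raised.
        It returns an Awaitable: you can await it, or ignore the return value, in which case it runs as a background task whose completion time is not guaranteed.
    Args:
        msg (EmailMessage, optional): a pre-composed message, e.g. built with compose(). Defaults to None.
        subject (str, optional): email subject. Defaults to None.
        body (str, optional): email body. Defaults to None.
        html (bool, optional): whether body is treated as HTML. Defaults to False.
        receivers (List[str], Optional): recipients. If not provided, the preconfigured recipients are used.
    Returns:
        The background task that sends the message. You can use this handle to cancel the task.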
    """
    if all([msg is not None, subject or body]):
        raise TypeError("provide either msg or subject/body, not both")
    elif all([msg is None, subject is None, body is None]):
        raise TypeError("either msg or subject/body is required")
    if msg is None:
        if html:
            msg = compose(subject, html=body)
        else:
            msg = compose(subject, plain_txt=body)
    cfg = cfg4py.get_instance()
    if not receivers:
        receivers = cfg.notify.mail_to
    password = os.environ.get("MAIL_PASSWORD")
    return send_mail(
        cfg.notify.mail_from, receivers, password, msg, host=cfg.notify.mail_server
    )
send_mail(sender, receivers, password, msg=None, host=None, port=25, cc=None, bcc=None, subject=None, body=None, username=None)
    Send an email notification.
To send a simple plain-text email, call send_mail(sender, receivers, password, subject=subject, body=body). For a more complex email with HTML or attachments, first build an EmailMessage with compose(), then send it with send_mail(sender, receivers, password, msg).
Important
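This method must be called from the asynchronous thread (the thread running the asyncio event loop); otherwise it raises an exception. It returns an Awaitable: you can await it, or ignore the return value, in which case the mail is sent as a background task whose completion time is not guaranteed.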
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| sender | str | Sender address | required | 
| receivers | List[str] | Recipient addresses; a single address string is also accepted | required | 
| password | str | Password used to log on to the mail server | required | 
| msg | EmailMessage | A pre-composed message, e.g. one built with compose() | None | 
| host | str | SMTP server host. If not provided, the domain part of sender is used. | None | 
| port | int | SMTP server port | 25 | 
| cc | List[str] | Carbon-copy recipients | None | 
| bcc | List[str] | Blind carbon-copy recipients | None | 
| subject | str | Email subject | None | 
| body | str | Plain-text email body | None | 
| username | str | The username used to log on to the mail server. If not provided, sender is used. | None | 
Returns:
| Type | Description | 
|---|---|
| Awaitable | The background task that sends the message. You can use this handle to cancel the task. | 
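Before handing the message to aiosmtplib.send(), send_mail() normalizes receivers, fills the From/To/Cc headers, and derives a default host from the sender address. The hypothetical `prepare_mail` helper below reproduces just that preparation step, without any network I/O, so the effect of each argument can be inspected.

```python
from email.message import EmailMessage
from typing import List, Optional, Tuple, Union


def prepare_mail(
    sender: str,
    receivers: Union[str, List[str]],
    subject: str,
    body: str,
    cc: Optional[List[str]] = None,
    host: Optional[str] = None,
) -> Tuple[EmailMessage, str]:
    """Hypothetical helper reproducing send_mail()'s message preparation (no sending)."""
    if isinstance(receivers, str):
        receivers = [receivers]  # a single address is accepted as well
    msg = EmailMessage()
    msg["From"] = sender
    msg["To"] = ", ".join(receivers)
    msg["Subject"] = subject
    msg.set_content(body)
    if cc:
        msg["Cc"] = ", ".join(cc)
    # Without an explicit host, fall back to the domain part of the sender address.
    host = host or sender.split("@")[-1]
    return msg, host


msg, host = prepare_mail("aaron_yang@jieyu.ai", "code@jieyu.ai", "Alert", "Price crossed MA20")
print(host)  # jieyu.ai
```

Because the default host is just the domain of sender, which is not necessarily an SMTP server, you will usually want to pass host explicitly.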
Source code in omicron/notify/mail.py
          @retry(aiosmtplib.errors.SMTPConnectError, tries=3, backoff=2, delay=30, logger=logger)
def send_mail(
    sender: str,
    receivers: List[str],
    password: str,
    msg: EmailMessage = None,
    host: str = None,
    port: int = 25,
    cc: List[str] = None,
    bcc: List[str] = None,
    subject: str = None,
    body: str = None,
    username: str = None,
) -> Awaitable:
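    """Send an email notification.
    To send a simple plain-text email, call send_mail(sender, receivers, password, subject=subject, body=body). For a more complex email with HTML or attachments, first build an EmailMessage with compose(), then send it with send_mail(sender, receivers, password, msg).
    ???+ Important
        This method must be called from the asynchronous thread (the thread running the asyncio loop); otherwise an exception is raised.
        It returns an Awaitable: you can await it, or ignore the return value, in which case it runs as a background task whose completion time is not guaranteed.
    Args:
        sender (str): sender address
        receivers (List[str]): recipient addresses; a single address string is also accepted
        password (str): password used to log on to the mail server
        msg (EmailMessage, optional): a pre-composed message, e.g. built with compose(). Defaults to None.
        host (str, optional): SMTP server host. Defaults to None, in which case the domain part of `sender` is used.
        port (int, optional): SMTP server port. Defaults to 25.
        cc (List[str], optional): carbon-copy recipients. Defaults to None.
        bcc (List[str], optional): blind carbon-copy recipients. Defaults to None.
        subject (str, optional): email subject. Defaults to None.
        body (str, optional): plain-text email body. Defaults to None.
        username (str, optional): the username used to log on to the mail server. If not provided, `sender` is used.
    Returns:
        The background task that sends the message. You can use this handle to cancel the task.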
    """
    if all([msg is not None, subject is not None or body is not None]):
        raise TypeError("provide either msg or subject/body, not both")
    elif all([msg is None, subject is None, body is None]):
        raise TypeError("either msg or subject/body is required")
    msg = msg or EmailMessage()
    if isinstance(receivers, str):
        receivers = [receivers]
    msg["From"] = sender
    msg["To"] = ", ".join(receivers)
    if subject:
        msg["subject"] = subject
    if body:
        msg.set_content(body)
    if cc:
        msg["Cc"] = ", ".join(cc)
    if bcc:
        msg["Bcc"] = ", ".join(bcc)
    username = username or sender
    if host is None:
        host = sender.split("@")[-1]
    task = asyncio.create_task(
        aiosmtplib.send(
            msg, hostname=host, port=port, username=username, password=password
        )
    )
    return task