
Omicron provides data persistence, time handling (calendars, triggers), market-data models, basic computation, and basic quantitative factors.

close() async

Close the connection to the cache.

Source code in omicron/__init__.py
async def close():
    """Close the connection to the cache."""

    try:
        await cache.close()
    except Exception as e:  # noqa
        pass

init(app_cache=5) async

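Initialize Omicron.

Initializes the InfluxDB and cache connections, and loads the calendar and the securities list.

Connections opened here should be closed by calling close() when the program exits.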

Source code in omicron/__init__.py
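async def init(app_cache: int = 5):
    """Initialize Omicron.

    Initializes the InfluxDB and cache connections, and loads the calendar and the securities list.

    Connections opened here should be closed by calling `close()` when the program exits.
    """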
    global cache

    await cache.init(app=app_cache)
    await tf.init()

    from omicron.models.security import Security

    await Security.init()

Extensions package

decimals

math_round(x, digits)

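Because of how floating-point numbers are represented, the round function in many languages does not agree with mathematical rounding. This function returns the same result as mathematical rounding (ties are rounded away from zero).

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| x | float | The number to round | required |
| digits | int | Number of digits to keep after the decimal point | required |

Source code in omicron/extensions/decimals.py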
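def math_round(x: float, digits: int):
    """Because of how floating-point numbers are represented, the round function in many languages does not agree with mathematical rounding. This function returns the same result as mathematical rounding.

    Args:
        x: the number to round
        digits: number of digits to keep after the decimal point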

    """

    return int(x * (10**digits) + copysign(0.5, x)) / (10**digits)
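
To make the difference concrete, the sketch below compares Python's built-in round with a local copy of math_round taken from the listing above. The inputs are deliberately values that are exactly representable in binary floating point; a value such as 1.005, which is stored slightly below its decimal form, may still round down.

```python
from math import copysign


def math_round(x: float, digits: int) -> float:
    # Local copy of the listing above: add +/-0.5 at the target digit,
    # then truncate toward zero with int().
    return int(x * (10**digits) + copysign(0.5, x)) / (10**digits)


# Built-in round() sends ties to the nearest even digit (banker's rounding).
builtin_results = [round(0.5), round(2.5), round(0.125, 2)]

# math_round sends ties away from zero.
math_results = [math_round(0.5, 0), math_round(2.5, 0), math_round(0.125, 2)]
negative_result = math_round(-2.5, 0)

print(builtin_results)  # [0, 2, 0.12]
print(math_results)     # [1.0, 3.0, 0.13]
print(negative_result)  # -3.0
```

Note that math_round always returns a float, even when digits is 0.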

price_equal(x, y)

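Determine whether two stock prices are equal. Both prices are rounded to two decimal places before they are compared.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| x | | Price 1 | required |
| y | | Price 2 | required |

Returns:

| Type | Description |
| --- | --- |
| bool | True if the prices are equal, otherwise False |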

Source code in omicron/extensions/decimals.py
def price_equal(x: float, y: float) -> bool:
    """Determine whether two stock prices are equal

    Args:
        x : price 1
        y : price 2

    Returns:
        True if the prices are equal, otherwise False
    """
    return abs(math_round(x, 2) - math_round(y, 2)) < 1e-2

np

Extension function related to numpy

array_math_round(arr, digits)

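Round the elements of the one-dimensional array arr.

numpy.around does not perform mathematical rounding: it rounds both 1.5 and 2.5 to 2. Financial calculations require rounding in the mathematical sense.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| arr | ArrayLike | Input array | required |
| digits | int | | required |

Returns:

| Type | Description |
| --- | --- |
| np.ndarray | The rounded one-dimensional array |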

Source code in omicron/extensions/np.py
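def array_math_round(arr: Union[float, ArrayLike], digits: int) -> np.ndarray:
    """Round the elements of the one-dimensional array `arr`

    numpy.around does not perform mathematical rounding: it rounds both 1.5 and 2.5 to 2. Financial calculations require rounding in the mathematical sense.

    Args:
        arr (ArrayLike): input array
        digits (int):

    Returns:
        np.ndarray: the rounded one-dimensional array
    """
    # a single float is rounded directly and returned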
    if isinstance(arr, float):
        return decimals.math_round(arr, digits)

    f = np.vectorize(lambda x: decimals.math_round(x, digits))
    return f(arr)
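
The claim about numpy.around can be checked directly. The sketch below is a minimal illustration, assuming a local copy of math_round from the decimals listing rather than an import from omicron; it applies the same np.vectorize pattern as array_math_round.

```python
from math import copysign

import numpy as np


def math_round(x: float, digits: int) -> float:
    # Local copy of omicron.extensions.decimals.math_round.
    return int(x * (10**digits) + copysign(0.5, x)) / (10**digits)


values = np.array([1.5, 2.5, 3.5])

# numpy.around rounds ties to the nearest even value.
around_result = np.around(values)

# Same pattern as array_math_round: wrap the scalar function with np.vectorize.
rounded = np.vectorize(lambda v: math_round(v, 0))(values)

print(around_result)  # [2. 2. 4.]
print(rounded)        # [2. 3. 4.]
```

np.vectorize is a convenience wrapper around a Python-level loop, so this pattern favors correctness over speed on large arrays.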

array_price_equal(price1, price2)

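Determine, element by element, whether two price arrays are equal.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| price1 | ArrayLike | Price array | required |
| price2 | ArrayLike | Price array | required |

Returns:

| Type | Description |
| --- | --- |
| np.ndarray | The comparison result |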

Source code in omicron/extensions/np.py
def array_price_equal(price1: ArrayLike, price2: ArrayLike) -> np.ndarray:
    """Determine whether two price arrays are equal

    Args:
        price1 (ArrayLike): price array
        price2 (ArrayLike): price array

    Returns:
        np.ndarray: the comparison result
    """
    price1 = array_math_round(price1, 2)
    price2 = array_math_round(price2, 2)

    return abs(price1 - price2) < 1e-2

bars_since(condition, default=None)

Return the number of bars since condition sequence was last True, or if never, return default.

>>> condition = [True, True, False]
>>> bars_since(condition)
1
Source code in omicron/extensions/np.py
def bars_since(condition: Sequence[bool], default=None) -> int:
    """
    Return the number of bars since `condition` sequence was last `True`,
    or if never, return `default`.

        >>> condition = [True, True, False]
        >>> bars_since(condition)
        1
    """
    return next(compress(range(len(condition)), reversed(condition)), default)
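
The one-line implementation is compact, so here is a small sketch of how it behaves on a few inputs, using a local copy of the function from the listing above. Reversing the sequence means position 0 corresponds to the most recent bar, and compress keeps only the positions where the condition is True.

```python
from itertools import compress


def bars_since(condition, default=None):
    # Local copy of the listing above.
    return next(compress(range(len(condition)), reversed(condition)), default)


# The condition is True on the latest bar: zero bars have passed.
on_last_bar = bars_since([False, True])

# The condition was last True one bar before the latest bar.
one_bar_ago = bars_since([True, True, False])

# The condition was never True: the default is returned.
never = bars_since([False, False])
never_with_default = bars_since([False, False], default=-1)

print(on_last_bar, one_bar_ago, never, never_with_default)  # 0 1 None -1
```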

bin_cut(arr, n)

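Split the array arr into n parts. Elements are distributed round-robin (the element at position i goes to part i % n), and empty parts are dropped.

todo: use padding + reshape to boost performance

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| arr | list | The sequence to split | required |
| n | int | Number of parts | required |

Returns:

| Type | Description |
| --- | --- |
| list | The non-empty parts, each a list |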

Source code in omicron/extensions/np.py
def bin_cut(arr: list, n: int):
    """Split the array `arr` into `n` parts

    todo: use padding + reshape to boost performance
    Args:
        arr (list): the sequence to split
        n (int): number of parts

    Returns:
        list: the non-empty parts, each a list
    """
    result = [[] for i in range(n)]

    for i, e in enumerate(arr):
        result[i % n].append(e)

    return [e for e in result if len(e)]
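
Because the parts are filled round-robin, they are interleaved rather than contiguous slices. A short sketch with a local copy of the function from the listing above shows this, along with the case where n exceeds the number of elements.

```python
def bin_cut(arr: list, n: int):
    # Local copy of the listing above.
    result = [[] for i in range(n)]

    for i, e in enumerate(arr):
        result[i % n].append(e)

    return [e for e in result if len(e)]


# Seven elements into three parts: element i lands in part i % 3.
bins = bin_cut(list(range(7)), 3)

# More parts than elements: the empty parts are dropped.
sparse = bin_cut([1, 2], 3)

print(bins)    # [[0, 3, 6], [1, 4], [2, 5]]
print(sparse)  # [[1], [2]]
```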

count_between(arr, start, end)

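Count how many elements of the array lie between the start element and the end element.

arr must be sorted. The count includes both endpoints of the interval.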

Examples:

>>> arr = [20050104, 20050105, 20050106, 20050107, 20050110, 20050111]
>>> count_between(arr, 20050104, 20050111)
6
>>> count_between(arr, 20050104, 20050109)
4
Source code in omicron/extensions/np.py
def count_between(arr, start, end):
    """Count how many elements of the array lie between `start` and `end`

    `arr` must be sorted. The count includes both endpoints of the interval.

    Examples:
        >>> arr = [20050104, 20050105, 20050106, 20050107, 20050110, 20050111]
        >>> count_between(arr, 20050104, 20050111)
        6

        >>> count_between(arr, 20050104, 20050109)
        4
    """
    pos_start = np.searchsorted(arr, start, side="right")
    pos_end = np.searchsorted(arr, end, side="right")

    counter = pos_end - pos_start + 1
    if start < arr[0]:
        counter -= 1
    if end > arr[-1]:
        counter -= 1

    return counter

dataframe_to_structured_array(df, dtypes=None)

Convert a DataFrame (all columns, and optionally the index) to a NumPy structured array.

len(dtypes) should equal either len(df.columns) or len(df.columns) + 1. In the latter case, df.index is included in the converted array.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| df | DataFrame | The DataFrame to convert | required |
| dtypes | List[Tuple] | Defaults to None. If None, the dtypes of df are used, and the index of df is not converted. | None |

Returns:

| Type | Description |
| --- | --- |
| ArrayLike | The converted structured array |

Source code in omicron/extensions/np.py
def dataframe_to_structured_array(
    df: DataFrame, dtypes: List[Tuple] = None
) -> ArrayLike:
    """convert dataframe (with all columns, and index possibly) to numpy structured arrays

    `len(dtypes)` should be either equal to `len(df.columns)` or `len(df.columns) + 1`. In the latter case, `df.index` is included in the converted array.

    Args:
        df: the DataFrame to convert
        dtypes: Defaults to None. If it's `None`, then dtypes of `df` is used, in such case, the `index` of `df` will not be converted.

    Returns:
        ArrayLike: the converted structured array
    """
    v = df
    if dtypes is not None:
        dtypes_in_dict = {key: value for key, value in dtypes}

        col_len = len(df.columns)
        if len(dtypes) == col_len + 1:
            v = df.reset_index()

            rename_index_to = set(dtypes_in_dict.keys()).difference(set(df.columns))
            v.rename(columns={"index": list(rename_index_to)[0]}, inplace=True)
        elif col_len != len(dtypes):
            raise ValueError(
                f"length of dtypes should be either {col_len} or {col_len + 1}, is {len(dtypes)}"
            )

        # re-arrange order of dtypes, in order to align with df.columns
        dtypes = []
        for name in v.columns:
            dtypes.append((name, dtypes_in_dict[name]))
    else:
        dtypes = df.dtypes

    return np.array(np.rec.fromrecords(v.values), dtype=dtypes)

dict_to_numpy_array(d, dtype)

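Convert a dictionary to a NumPy structured array. Each key/value pair becomes one record.

Examples:

>>> d = {"aaron": 5, "jack": 6}
>>> dtype = [("name", "S8"), ("score", "<i4")]
>>> dict_to_numpy_array(d, dtype)
array([(b'aaron', 5), (b'jack', 6)],
      dtype=[('name', 'S8'), ('score', '<i4')])

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| d | dict | The dictionary to convert | required |
| dtype | List[Tuple] | The dtype of the resulting array: one field for the keys, one for the values | required |

Returns:

| Type | Description |
| --- | --- |
| np.array | The converted structured array |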

Source code in omicron/extensions/np.py
def dict_to_numpy_array(d: dict, dtype: List[Tuple]) -> np.array:
    """convert dictionary to numpy array

    Examples:

    >>> d = {"aaron": 5, "jack": 6}
    >>> dtype = [("name", "S8"), ("score", "<i4")]
    >>> dict_to_numpy_array(d, dtype)
    array([(b'aaron', 5), (b'jack', 6)],
          dtype=[('name', 'S8'), ('score', '<i4')])

    Args:
        d (dict): [description]
        dtype (List[Tuple]): [description]

    Returns:
        np.array: [description]
    """
    return np.fromiter(d.items(), dtype=dtype, count=len(d))

fill_nan(ts)

Replace each NaN in ts with the value that precedes it.

If ts starts with NaN, the leading NaN values are replaced with the first non-NaN element.

If every element is NaN, no replacement is possible and a ValueError is raised.

Examples:

>>> arr = np.arange(6, dtype=np.float32)
>>> arr[3:5] = np.nan
>>> fill_nan(arr)
array([0., 1., 2., 2., 2., 5.], dtype=float32)
>>> arr = np.arange(6, dtype=np.float32)
>>> arr[0:2] = np.nan
>>> fill_nan(arr)
array([2., 2., 2., 3., 4., 5.], dtype=float32)

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| ts | np.array | The input array | required |

Source code in omicron/extensions/np.py
def fill_nan(ts: np.ndarray):
    """Replace each NaN in `ts` with the value that precedes it

    If `ts` starts with NaN, the leading NaN values are replaced with the first non-NaN element.

    If every element is NaN, no replacement is possible.

    Example:
        >>> arr = np.arange(6, dtype=np.float32)
        >>> arr[3:5] = np.nan
        >>> fill_nan(arr)
        ... # doctest: +NORMALIZE_WHITESPACE
        array([0., 1., 2., 2., 2., 5.], dtype=float32)

        >>> arr = np.arange(6, dtype=np.float32)
        >>> arr[0:2] = np.nan
        >>> fill_nan(arr)
        ... # doctest: +NORMALIZE_WHITESPACE
        array([2., 2., 2., 3., 4., 5.], dtype=float32)

    Args:
        ts (np.array): the input array
    """
    if np.all(np.isnan(ts)):
        raise ValueError("all of ts are NaN")

    if ts[0] is None or math.isnan(ts[0]):
        idx = np.argwhere(~np.isnan(ts))[0]
        ts[0] = ts[idx]

    mask = np.isnan(ts)
    idx = np.where(~mask, np.arange(mask.size), 0)
    np.maximum.accumulate(idx, out=idx)
    return ts[idx]
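
The last four lines of fill_nan implement a forward fill without a Python loop. The sketch below walks through the intermediate arrays on a small input; the variable raw_idx is introduced here only to show the index array before accumulation.

```python
import numpy as np

ts = np.array([0.0, 1.0, 2.0, np.nan, np.nan, 5.0])

# True where a value is missing.
mask = np.isnan(ts)

# Valid positions keep their own index; NaN positions get 0.
idx = np.where(~mask, np.arange(mask.size), 0)
raw_idx = idx.copy()

# A running maximum turns each 0 into the index of the last valid position.
np.maximum.accumulate(idx, out=idx)

# Fancy indexing then copies the last valid value forward.
filled = ts[idx]

print(raw_idx)  # [0 1 2 0 0 5]
print(idx)      # [0 1 2 2 2 5]
print(filled)   # [0. 1. 2. 2. 2. 5.]
```

This is why the function first patches ts[0]: a leading NaN would otherwise map to index 0 and stay NaN.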

find_runs(x)

Find runs of consecutive items in an array.

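Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| x | ArrayLike | the sequence to find runs in | required |

Returns:

| Type | Description |
| --- | --- |
| Tuple[np.ndarray, np.ndarray, np.ndarray] | A tuple of run values, run start indices, and run lengths. A value appears once per run, so it can occur more than once. |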

Source code in omicron/extensions/np.py
def find_runs(x: ArrayLike) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    """Find runs of consecutive items in an array.

    Args:
        x: the sequence to find runs in

    Returns:
        A tuple of unique values, start indices, and length of runs
    """

    # ensure array
    x = np.asanyarray(x)
    if x.ndim != 1:
        raise ValueError("only 1D array supported")
    n = x.shape[0]

    # handle empty array
    if n == 0:
        return np.array([]), np.array([]), np.array([])

    else:
        # find run starts
        loc_run_start = np.empty(n, dtype=bool)
        loc_run_start[0] = True
        np.not_equal(x[:-1], x[1:], out=loc_run_start[1:])
        run_starts = np.nonzero(loc_run_start)[0]

        # find run values
        run_values = x[loc_run_start]

        # find run lengths
        run_lengths = np.diff(np.append(run_starts, n))

        return run_values, run_starts, run_lengths
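
The listing has no usage example, so here is a small sketch. It uses a condensed local copy of find_runs that omits the dimension and empty-input checks, and is meant only to show the shape of the three returned arrays.

```python
import numpy as np


def find_runs(x):
    # Condensed copy of the listing above (no ndim or empty-array handling).
    x = np.asanyarray(x)
    n = x.shape[0]

    # A run starts at position 0 and wherever an element differs from its predecessor.
    loc_run_start = np.empty(n, dtype=bool)
    loc_run_start[0] = True
    np.not_equal(x[:-1], x[1:], out=loc_run_start[1:])
    run_starts = np.nonzero(loc_run_start)[0]

    run_values = x[loc_run_start]
    run_lengths = np.diff(np.append(run_starts, n))
    return run_values, run_starts, run_lengths


values, starts, lengths = find_runs([1, 1, 2, 2, 2, 3, 1])

print(values)   # [1 2 3 1]
print(starts)   # [0 2 5 6]
print(lengths)  # [2 3 1 1]
```

The value 1 appears twice in the first array because it forms two separate runs.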

floor(arr, item)

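Find the value in arr that is less than or equal to item. If item is smaller than every element of arr, return arr[0]; if item is greater than every element of arr, return arr[-1].

Unlike minute_frames_floor, this function does not wrap around or carry.

Examples:

>>> a = [3, 6, 9]
>>> floor(a, -1)
3
>>> floor(a, 9)
9
>>> floor(a, 10)
9
>>> floor(a, 4)
3

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| arr | | | required |
| item | | | required |

Source code in omicron/extensions/np.py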
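def floor(arr, item):
    """
    Find the value in `arr` that is less than or equal to `item`. If `item` is smaller
    than every element of `arr`, return `arr[0]`; if `item` is greater than every element of `arr`, return `arr[-1]`

    Unlike `minute_frames_floor`, this function does not wrap around or carry.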

    Examples:
        >>> a = [3, 6, 9]
        >>> floor(a, -1)
        3
        >>> floor(a, 9)
        9
        >>> floor(a, 10)
        9
        >>> floor(a, 4)
        3
        >>> floor(a,10)
        9

    Args:
        arr:
        item:

    Returns:

    """
    if item < arr[0]:
        return arr[0]
    index = np.searchsorted(arr, item, side="right")
    return arr[index - 1]

join_by_left(key, r1, r2, mask=True)

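Left join r1 and r2 on key.

If r1 contains rows that have no match in r2, the r2 fields of those rows are masked, or filled with arbitrary values when mask is False. This is the same as numpy.lib.recfunctions.join_by(key, r1, r2, jointype='leftouter'), except that r1 may contain duplicate keys.

Reference: stackoverflow (https://stackoverflow.com/a/53261882/13395693)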

Examples:

>>> # to join the following
>>> # [[ 1, 2],
>>> #  [ 1, 3],   x   [[1, 5],
>>> #  [ 2, 3]]        [4, 7]]
>>> # only first two rows in left will be joined
>>> r1 = np.array([(1, 2), (1,3), (2,3)], dtype=[('seq', 'i4'), ('score', 'i4')])
>>> r2 = np.array([(1, 5), (4,7)], dtype=[('seq', 'i4'), ('age', 'i4')])
>>> joined = join_by_left('seq', r1, r2)
>>> print(joined)
[(1, 2, 5) (1, 3, 5) (2, 3, --)]
>>> print(joined.dtype)
(numpy.record, [('seq', '<i4'), ('score', '<i4'), ('age', '<i4')])
>>> joined[2][2]
masked
>>> joined.tolist()[2][2] == None
True

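Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| key | | The join key | required |
| r1 | | Dataset 1 | required |
| r2 | | Dataset 2 | required |
| mask | | Whether to return a masked array in which unmatched r2 fields are masked | True |

Returns:

| Type | Description |
| --- | --- |
| | a numpy array |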

Source code in omicron/extensions/np.py
def join_by_left(key, r1, r2, mask=True):
    """Left join `r1` and `r2` on `key`

    If `r1` contains rows that have no match in `r2`, the `r2` fields of those rows are masked, or filled with arbitrary values.
    same as numpy.lib.recfunctions.join_by(key, r1, r2, jointype='leftouter'), but allows r1 to have duplicate keys

    [Reference: stackoverflow](https://stackoverflow.com/a/53261882/13395693)

    Examples:
        >>> # to join the following
        >>> # [[ 1, 2],
        >>> #  [ 1, 3],   x   [[1, 5],
        >>> #  [ 2, 3]]        [4, 7]]
        >>> # only first two rows in left will be joined

        >>> r1 = np.array([(1, 2), (1,3), (2,3)], dtype=[('seq', 'i4'), ('score', 'i4')])
        >>> r2 = np.array([(1, 5), (4,7)], dtype=[('seq', 'i4'), ('age', 'i4')])
        >>> joined = join_by_left('seq', r1, r2)
        >>> print(joined)
        [(1, 2, 5) (1, 3, 5) (2, 3, --)]

        >>> print(joined.dtype)
        (numpy.record, [('seq', '<i4'), ('score', '<i4'), ('age', '<i4')])

        >>> joined[2][2]
        masked

        >>> joined.tolist()[2][2] == None
        True

    Args:
        key : the join key
        r1 : dataset 1
        r2 : dataset 2

    Returns:
        a numpy array
    """
    # figure out the dtype of the result array
    descr1 = r1.dtype.descr
    descr2 = [d for d in r2.dtype.descr if d[0] not in r1.dtype.names]
    descrm = descr1 + descr2

    # figure out the fields we'll need from each array
    f1 = [d[0] for d in descr1]
    f2 = [d[0] for d in descr2]

    # cache the number of columns in f1
    ncol1 = len(f1)

    # get a dict of the rows of r2 grouped by key
    rows2 = {}
    for row2 in r2:
        rows2.setdefault(row2[key], []).append(row2)

    # figure out how many rows will be in the result
    nrowm = 0
    for k1 in r1[key]:
        if k1 in rows2:
            nrowm += len(rows2[k1])
        else:
            nrowm += 1

    # allocate the return array
    # ret = np.full((nrowm, ), fill, dtype=descrm)
    _ret = np.recarray(nrowm, dtype=descrm)
    if mask:
        ret = np.ma.array(_ret, mask=True)
    else:
        ret = _ret

    # merge the data into the return array
    i = 0
    for row1 in r1:
        if row1[key] in rows2:
            for row2 in rows2[row1[key]]:
                ret[i] = tuple(row1[f1]) + tuple(row2[f2])
                i += 1
        else:
            for j in range(ncol1):
                ret[i][j] = row1[j]
            i += 1

    return ret

numpy_append_fields(base, names, data, dtypes)

Append new fields to the existing array base.

This provides the functionality of numpy.lib.recfunctions.rec_append_fields. It exists because rec_append_fields cannot handle data whose elements are of type Object.

The new columns are placed, in order, to the right of the existing columns.

Examples:

>>> # append a single field
>>> import numpy as np
>>> old = np.array([i for i in range(3)], dtype=[('col1', '<f4')])
>>> new_list = [2 * i for i in range(3)]
>>> res = numpy_append_fields(old, 'new_col', new_list, [('new_col', '<f4')])
>>> print(res)
[(0., 0.) (1., 2.) (2., 4.)]
>>> # append multiple fields
>>> data = [res['col1'].tolist(), res['new_col'].tolist()]
>>> print(numpy_append_fields(old, ('col3', 'col4'), data, [('col3', '<f4'), ('col4', '<f4')]))
[(0., 0., 0.) (1., 1., 2.) (2., 2., 4.)]

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| base | numpy.array | The base array | required |
| names | Union[str, List[str]] | Names of the new fields: a string for a single field, or a list of strings | required |
| data | list | Data for the new fields, as a list | required |
| dtypes | List | dtypes of the new fields | required |

Source code in omicron/extensions/np.py
def numpy_append_fields(
    base: np.ndarray, names: Union[str, List[str]], data: List, dtypes: List
) -> np.ndarray:
    """Append new fields to the existing array `base`

    This provides the functionality of `numpy.lib.recfunctions.rec_append_fields`. It exists because `rec_append_fields` cannot handle `data` whose elements are of type Object.

    The new columns are placed, in order, to the right of the existing columns.

    Example:
        >>> # append a single field
        >>> import numpy as np
        >>> old = np.array([i for i in range(3)], dtype=[('col1', '<f4')])
        >>> new_list = [2 * i for i in range(3)]
        >>> res = numpy_append_fields(old, 'new_col', new_list, [('new_col', '<f4')])
        >>> print(res)
        ... # doctest: +NORMALIZE_WHITESPACE
        [(0., 0.) (1., 2.) (2., 4.)]

        >>> # append multiple fields
        >>> data = [res['col1'].tolist(), res['new_col'].tolist()]
        >>> print(numpy_append_fields(old, ('col3', 'col4'), data, [('col3', '<f4'), ('col4', '<f4')]))
        ... # doctest: +NORMALIZE_WHITESPACE
        [(0., 0., 0.) (1., 1., 2.) (2., 2., 4.)]

    Args:
        base ([numpy.array]): the base array
        names ([type]): names of the new fields: a string for a single field, or a list of strings
        data (list): data for the new fields, as a list
        dtypes ([type]): dtypes of the new fields
    """
    if isinstance(names, str):
        names = [names]
        data = [data]

    result = np.empty(base.shape, dtype=base.dtype.descr + dtypes)
    for col in base.dtype.names:
        result[col] = base[col]

    for i in range(len(names)):
        result[names[i]] = data[i]

    return result

remove_nan(ts)

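Remove NaN values from ts.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| ts | np.array | The input array | required |

Returns:

| Type | Description |
| --- | --- |
| np.array | ts with the NaN elements removed |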

Source code in omicron/extensions/np.py
def remove_nan(ts: np.ndarray) -> np.ndarray:
    """Remove NaN values from `ts`

    Args:
        ts (np.array): [description]

    Returns:
        np.array: [description]
    """
    return ts[~np.isnan(ts.astype(float))]

replace_zero(ts, replacement=None)

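Replace each 0 in ts with the value that precedes it. This is commonly needed when processing volume data.

If replacement is provided, zeros are replaced with replacement instead.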

Source code in omicron/extensions/np.py
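def replace_zero(ts: np.ndarray, replacement=None) -> np.ndarray:
    """Replace each 0 in `ts` with the value that precedes it. Commonly needed when processing volume data

    If `replacement` is provided, zeros are replaced with `replacement` instead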

    """
    if replacement is not None:
        return np.where(ts == 0, replacement, ts)

    if np.all(ts == 0):
        raise ValueError("all of ts are 0")

    if ts[0] == 0:
        idx = np.argwhere(ts != 0)[0]
        ts[0] = ts[idx]

    mask = ts == 0
    idx = np.where(~mask, np.arange(mask.size), 0)
    np.maximum.accumulate(idx, out=idx)
    return ts[idx]

rolling(x, win, func)

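Apply func over a sliding window of the sequence x.

If func computes argmax, argmin, max, mean, median, min, rank, std, sum, or var, use the corresponding bottleneck functions instead: move_argmax, move_argmin, move_max, move_mean, move_median, move_min, move_rank, move_std, move_sum, move_var. They perform better.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| x | | The input sequence | required |
| win | | The window size | required |
| func | | The function applied to each window | required |

Returns:

| Type | Description |
| --- | --- |
| np.ndarray | The result of func for each window |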

Source code in omicron/extensions/np.py
def rolling(x, win, func):
    """Apply `func` over a sliding window of the sequence `x`.

    If `func` computes argmax, argmin, max, mean, median, min, rank, std, sum, or var, use the corresponding bottleneck functions instead: move_argmax, move_argmin, move_max, move_mean, move_median, move_min, move_rank, move_std, move_sum, move_var. They perform better.

    Args:
        x: the input sequence
        win: the window size
        func: the function applied to each window

    Returns:
        np.ndarray: the result of `func` for each window
    """
    results = []
    for subarray in sliding_window_view(x, window_shape=win):
        results.append(func(subarray))

    return np.array(results)
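
As an illustration of when rolling is useful, the sketch below applies a custom function that has no bottleneck equivalent in the list above: the high-low range of each window. It uses a local copy of rolling; the array name close and its values are made up for the example.

```python
import numpy as np
from numpy.lib.stride_tricks import sliding_window_view


def rolling(x, win, func):
    # Local copy of the listing above.
    results = []
    for subarray in sliding_window_view(x, window_shape=win):
        results.append(func(subarray))

    return np.array(results)


# Hypothetical closing prices.
close = np.array([1.0, 3.0, 2.0, 5.0, 4.0])

# Range (max - min) of every 3-bar window.
spread = rolling(close, 3, lambda w: w.max() - w.min())

print(spread)       # [2. 3. 3.]
print(len(spread))  # 3, i.e. len(close) - win + 1
```

The result is shorter than the input by win - 1 elements, because only complete windows are evaluated.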

shift(arr, start, offset)

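In the NumPy array arr, locate start (or the element closest to it) and return the element offset positions away from it.

arr must be sorted. A positive offset moves toward later elements; a negative offset moves toward earlier elements.

Examples:

>>> arr = [20050104, 20050105, 20050106, 20050107, 20050110, 20050111]
>>> shift(arr, 20050104, 1)
20050105
>>> shift(arr, 20050105, -1)
20050104
>>> # start is already past the right end and the shift is to the right, so start is returned
>>> shift(arr, 20050120, 1)
20050120

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| arr | | The sorted array | required |
| start | | Any data type NumPy accepts | required |
| offset | int | Number of positions to shift | required |

Returns:

| Type | Description |
| --- | --- |
| | The element found after shifting |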

Source code in omicron/extensions/np.py
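def shift(arr, start, offset):
    """In the NumPy array `arr`, locate `start` (or the element closest to it) and return the element `offset` positions away from it.

    `arr` must be sorted. A positive `offset` moves toward later elements; a negative `offset` moves toward earlier elements

    Examples:
        >>> arr = [20050104, 20050105, 20050106, 20050107, 20050110, 20050111]
        >>> shift(arr, 20050104, 1)
        20050105

        >>> shift(arr, 20050105, -1)
        20050104

        >>> # start is already past the right end and the shift is to the right, so start is returned
        >>> shift(arr, 20050120, 1)
        20050120


    Args:
        arr : the sorted array
        start : any data type NumPy accepts
        offset (int): number of positions to shift

    Returns:
        the element found after shifting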
    """
    pos = np.searchsorted(arr, start, side="right")

    if pos + offset - 1 >= len(arr):
        return start
    else:
        return arr[pos + offset - 1]

smallest_n_argpos(ts, n)

Return the positions of the n smallest elements, ordered by ascending value.

Examples:

>>> smallest_n_argpos([np.nan, 4, 3, 9, 8, 5, 2, 1, 0, 6, 7], 2)
array([8, 7])

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| ts | np.array | The input array | required |
| n | int | Number of smallest elements to take | required |

Returns:

| Type | Description |
| --- | --- |
| np.array | Positions of the n smallest elements |

Source code in omicron/extensions/np.py
def smallest_n_argpos(ts: np.array, n: int) -> np.array:
    """Return the positions of the `n` smallest elements, ordered by ascending value

    Example:
        >>> smallest_n_argpos([np.nan, 4, 3, 9, 8, 5, 2, 1, 0, 6, 7], 2)
        array([8, 7])

    Args:
        ts (np.array): the input array
        n (int): number of smallest elements to take

    Returns:
        np.array: positions of the `n` smallest elements
    """
    return np.argsort(ts)[:n]

to_pydatetime(tm)

Convert a numpy.datetime64 object to a Python datetime object.

The numpy.ndarray.item() method converts any NumPy object to a Python object. Prefer .item() over this function wherever it applies. Example:

    arr = np.array(['2022-09-08', '2022-09-09'], dtype='datetime64[s]')
    arr.item(0) # output is datetime.datetime(2022, 9, 8, 0, 0)

    arr[1].item() # output is datetime.datetime(2022, 9, 9, 0, 0)

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| tm | | the input numpy datetime object | required |

Returns:

| Type | Description |
| --- | --- |
| datetime.datetime | python datetime object |

.. deprecated:: 2.0.0 use tm.item() instead

Source code in omicron/extensions/np.py
@deprecated("2.0.0", details="use `tm.item()` instead")
def to_pydatetime(tm: np.datetime64) -> datetime.datetime:
    """Convert a numpy.datetime64 object to a Python datetime object

    The numpy.ndarray.item() method converts any NumPy object to a Python object. Prefer .item() over this function wherever it applies. Example:
    ```
        arr = np.array(['2022-09-08', '2022-09-09'], dtype='datetime64[s]')
        arr.item(0) # output is datetime.datetime(2022, 9, 8, 0, 0)

        arr[1].item() # output is datetime.datetime(2022, 9, 9, 0, 0)
    ```

    Args:
        tm : the input numpy datetime object

    Returns:
        python datetime object
    """
    unix_epoch = np.datetime64(0, "s")
    one_second = np.timedelta64(1, "s")
    seconds_since_epoch = (tm - unix_epoch) / one_second

    return datetime.datetime.utcfromtimestamp(seconds_since_epoch)
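
Since the function is deprecated in favor of .item(), a short sketch of the replacement may help. One caveat, stated here as general NumPy behavior rather than anything specific to Omicron: .item() returns a datetime only for units Python's datetime can represent, so a nanosecond-resolution value is cast to a coarser unit first.

```python
import datetime

import numpy as np

# Second resolution converts directly to datetime.datetime.
tm = np.datetime64("2022-09-08T09:30:00", "s")
as_py = tm.item()

# Nanosecond resolution yields a plain integer (nanoseconds since the epoch).
tm_ns = np.datetime64("2022-09-08T09:30:00", "ns")
raw = tm_ns.item()

# Casting to microseconds first gives a datetime again.
converted = tm_ns.astype("datetime64[us]").item()

print(as_py)      # 2022-09-08 09:30:00
print(type(raw))  # <class 'int'>
print(converted)  # 2022-09-08 09:30:00
```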

top_n_argpos(ts, n)

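Return the positions of the n largest elements, ordered by descending value. NaN values are treated as the smallest.

Examples:

>>> top_n_argpos([np.nan, 4, 3, 9, 8, 5, 2, 1, 0, 6, 7], 2)
array([3, 4])

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| ts | np.array | The input array | required |
| n | int | Number of largest elements to take | required |

Returns:

| Type | Description |
| --- | --- |
| np.array | Positions of the n largest elements |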

Source code in omicron/extensions/np.py
def top_n_argpos(ts: np.array, n: int) -> np.array:
    """Return the positions of the `n` largest elements, ordered by descending value

    Example:
        >>> top_n_argpos([np.nan, 4, 3, 9, 8, 5, 2, 1, 0, 6, 7], 2)
        array([3, 4])

    Args:
        ts (np.array): [description]
        n (int): [description]

    Returns:
        np.array: [description]
    """
    ts_ = np.copy(ts)
    ts_[np.isnan(ts_)] = -np.inf
    return np.argsort(ts_)[-n:][::-1]

Notify package

dingtalk

DingTalkMessage

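Message-push class for DingTalk robots. It wraps the common message types and the signing algorithm.

The robot's access_token must be set in the configuration file. If signing is enabled, the robot's secret must also be configured. If custom keywords are enabled, the robot's keyword must be configured, with multiple keywords separated by commas.

A complete configuration example follows. secret and keyword are optional; access_token is required.

notify:
  dingtalk_access_token: xxxx
  dingtalk_secret: xxxx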

Source code in omicron/notify/dingtalk.py
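class DingTalkMessage:
    """
    Message-push class for DingTalk robots. It wraps the common message types and the signing algorithm.
    The robot's access_token must be set in the configuration file.
    If signing is enabled, the robot's secret must also be configured.
    If custom keywords are enabled, the robot's keyword must be configured, with multiple keywords separated by commas.
    A complete configuration example follows. secret and keyword are optional; access_token is required.
    notify:
      dingtalk_access_token: xxxx
      dingtalk_secret: xxxx
    """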

    url = "https://oapi.dingtalk.com/robot/send"

    @classmethod
    def _get_access_token(cls):
        """Get the DingTalk robot's access_token"""
        if hasattr(cfg.notify, "dingtalk_access_token"):
            return cfg.notify.dingtalk_access_token
        else:
            logger.error(
                "Dingtalk not configured, please add the following items:\n"
                "notify:\n"
                "  dingtalk_access_token: xxxx\n"
                "  dingtalk_secret: xxxx\n"
            )
            raise ConfigError("dingtalk_access_token not found")

    @classmethod
    def _get_secret(cls):
        """Get the DingTalk robot's secret"""
        if hasattr(cfg.notify, "dingtalk_secret"):
            return cfg.notify.dingtalk_secret
        else:
            return None

    @classmethod
    def _get_url(cls):
        """Build the DingTalk robot's push URL, appending the signature and timestamp as query parameters"""
        access_token = cls._get_access_token()
        url = f"{cls.url}?access_token={access_token}"
        secret = cls._get_secret()
        if secret:
            timestamp, sign = cls._get_sign(secret)
            url = f"{url}&timestamp={timestamp}&sign={sign}"
        return url

    @classmethod
    def _get_sign(cls, secret: str):
        """Compute the signature sent to the DingTalk robot"""
        timestamp = str(round(time.time() * 1000))
        secret_enc = secret.encode("utf-8")
        string_to_sign = "{}\n{}".format(timestamp, secret)
        string_to_sign_enc = string_to_sign.encode("utf-8")
        hmac_code = hmac.new(
            secret_enc, string_to_sign_enc, digestmod=hashlib.sha256
        ).digest()
        sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))
        return timestamp, sign

    @classmethod
    def _send(cls, msg):
        """Send a message to the DingTalk robot (synchronous)"""
        url = cls._get_url()
        response = httpx.post(url, json=msg, timeout=30)
        if response.status_code != 200:
            logger.error(
                f"failed to send message, content: {msg}, response from Dingtalk: {response.content.decode()}"
            )
            return
        rsp = json.loads(response.content)
        if rsp.get("errcode") != 0:
            logger.error(
                f"failed to send message, content: {msg}, response from Dingtalk: {rsp}"
            )
        return response.content.decode()

    @classmethod
    async def _send_async(cls, msg):
        """Send a message to the DingTalk robot (asynchronous)"""
        url = cls._get_url()
        async with httpx.AsyncClient() as client:
            r = await client.post(url, json=msg, timeout=30)
            if r.status_code != 200:
                logger.error(
                    f"failed to send message, content: {msg}, response from Dingtalk: {r.content.decode()}"
                )
                return
            rsp = json.loads(r.content)
            if rsp.get("errcode") != 0:
                logger.error(
                    f"failed to send message, content: {msg}, response from Dingtalk: {rsp}"
                )
            return r.content.decode()

    @classmethod
    @deprecated("2.0.0", details="use function `ding` instead")
    def text(cls, content):
        msg = {"text": {"content": content}, "msgtype": "text"}
        return cls._send(msg)
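
The signing step in _get_sign and _get_url can be sketched on its own with the standard library. The function name sign, the secret, the token, and the fixed timestamp below are placeholders chosen for illustration; the real class reads the secret from the configuration and uses the current time in milliseconds.

```python
import base64
import hashlib
import hmac
import urllib.parse


def sign(secret: str, timestamp_ms: str) -> str:
    # Same steps as _get_sign: HMAC-SHA256 over "timestamp\nsecret", keyed by
    # the secret, then base64-encoded and URL-quoted.
    string_to_sign = f"{timestamp_ms}\n{secret}"
    digest = hmac.new(
        secret.encode("utf-8"), string_to_sign.encode("utf-8"), digestmod=hashlib.sha256
    ).digest()
    return urllib.parse.quote_plus(base64.b64encode(digest))


timestamp = "1700000000000"  # fixed placeholder instead of time.time() * 1000
signature = sign("SEC-placeholder", timestamp)

# Same URL layout as _get_url, with a placeholder access token.
url = (
    "https://oapi.dingtalk.com/robot/send"
    f"?access_token=TOKEN&timestamp={timestamp}&sign={signature}"
)
print(url)
```

URL-quoting matters because base64 output can contain the characters +, / and =, which are not safe inside a query string.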

text(cls, content) classmethod

.. deprecated:: 2.0.0 use function ding instead

Source code in omicron/notify/dingtalk.py
@classmethod
@deprecated("2.0.0", details="use function `ding` instead")
def text(cls, content):
    msg = {"text": {"content": content}, "msgtype": "text"}
    return cls._send(msg)

ding(msg)

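Send a message to the DingTalk robot.

Both plain-text messages and markdown-formatted messages are supported. To send a markdown message, pass a dict that contains the two fields "title" and "text". For details, see the DingTalk open platform documentation (https://open.dingtalk.com/document/orgapp-server/message-type).

Important

This function must be called from the async thread (the thread that runs the asyncio loop); otherwise an exception is raised. It returns an Awaitable. You can await it, or ignore the return value, in which case it runs as a background task that completes at an unspecified time.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| msg | Union[str, dict] | The message to send | required |

Returns:

| Type | Description |
| --- | --- |
| Awaitable | The background task that sends the message. You can use this handle to cancel the task. |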

Source code in omicron/notify/dingtalk.py
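def ding(msg: Union[str, dict]) -> Awaitable:
    """Send a message to the DingTalk robot

    Both plain-text messages and markdown-formatted messages are supported. To send a markdown message, pass a dict that contains the two fields "title" and "text". For details, see the [DingTalk open platform documentation](https://open.dingtalk.com/document/orgapp-server/message-type)

    ???+ Important
        This function must be called from the async thread (the thread that runs the asyncio loop); otherwise an exception is raised.
        It returns an Awaitable. You can await it, or ignore the return value, in which case it runs as a background task that completes at an unspecified time.

    Args:
        msg: the message to send

    Returns:
        The background task that sends the message. You can use this handle to cancel the task.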
    """
    if isinstance(msg, str):
        msg_ = {"text": {"content": msg}, "msgtype": "text"}
    elif isinstance(msg, dict):
        msg_ = {
            "msgtype": "markdown",
            "markdown": {"title": msg["title"], "text": msg["text"]},
        }
    else:
        raise TypeError

    task = asyncio.create_task(DingTalkMessage._send_async(msg_))
    return task
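
The requirement to call ding from the thread running the event loop follows from asyncio.create_task, which needs a running loop. The sketch below illustrates both the payload shapes and that requirement without touching the network; build_payload and fake_send are hypothetical stand-ins, not part of Omicron.

```python
import asyncio


def build_payload(msg):
    # Hypothetical helper mirroring the str/dict branches of ding.
    if isinstance(msg, str):
        return {"text": {"content": msg}, "msgtype": "text"}
    if isinstance(msg, dict):
        return {
            "msgtype": "markdown",
            "markdown": {"title": msg["title"], "text": msg["text"]},
        }
    raise TypeError("msg must be str or dict")


async def fake_send(payload):
    # Stand-in for the HTTP request made by DingTalkMessage._send_async.
    await asyncio.sleep(0)
    return payload["msgtype"]


async def main():
    # Inside a running loop, create_task schedules the send in the background.
    text_task = asyncio.create_task(fake_send(build_payload("price alert")))
    md_task = asyncio.create_task(
        fake_send(build_payload({"title": "Alert", "text": "**000001** moved"}))
    )
    return await text_task, await md_task


sent = asyncio.run(main())
print(sent)  # ('text', 'markdown')

# Outside a running loop, create_task raises RuntimeError.
coro = fake_send(build_payload("no loop"))
try:
    asyncio.create_task(coro)
    raised = False
except RuntimeError:
    coro.close()  # avoid a "never awaited" warning
    raised = True
print(raised)  # True
```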

mail

compose(subject, plain_txt=None, html=None, attachment=None)

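Compose a MIME mail.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| subject | str | Mail subject | required |
| plain_txt | str | Mail content in plain-text format | None |
| html | str | Mail content in HTML format. Defaults to None. | None |
| attachment | str | File name of the attachment | None |

Returns:

| Type | Description |
| --- | --- |
| EmailMessage | MIME mail |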

Source code in omicron/notify/mail.py
def compose(
    subject: str, plain_txt: str = None, html: str = None, attachment: str = None
) -> EmailMessage:
    """Compose a MIME mail.

    Args:
        subject (str): mail subject
        plain_txt (str): mail content in plain-text format
        html (str, optional): mail content in HTML format. Defaults to None.
        attachment (str, optional): file name of the attachment
    Returns:
        MIME mail
    """
    msg = EmailMessage()

    msg["Subject"] = subject

    if html:
        msg.preamble = plain_txt or ""
        msg.set_content(html, subtype="html")
    else:
        assert plain_txt, "Either plain_txt or html is required."
        msg.set_content(plain_txt)

    if attachment:
        ctype, encoding = mimetypes.guess_type(attachment)
        if ctype is None or encoding is not None:
            ctype = "application/octet-stream"

        maintype, subtype = ctype.split("/", 1)
        with open(attachment, "rb") as f:
            msg.add_attachment(
                f.read(), maintype=maintype, subtype=subtype, filename=attachment
            )

    return msg
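
To show what the HTML-plus-attachment branch produces, here is a standalone sketch that repeats the same EmailMessage calls with in-memory data instead of a file on disk. The subject, body, and attachment bytes are made up for the example.

```python
from email.message import EmailMessage

msg = EmailMessage()
msg["Subject"] = "daily report"

# Same calls as the html branch of compose.
msg.preamble = "plain-text fallback"
msg.set_content("<h1>PnL</h1>", subtype="html")

# compose reads the bytes from a file; here they are supplied inline.
msg.add_attachment(
    b"date,pnl\n2022-09-08,0.01\n",
    maintype="application",
    subtype="octet-stream",
    filename="report.csv",
)

html_part = msg.get_body(preferencelist=("html",))
attachments = list(msg.iter_attachments())

print(msg.get_content_type())         # multipart/mixed
print(html_part.get_content_type())   # text/html
print(attachments[0].get_filename())  # report.csv
```

Adding an attachment converts the message to multipart/mixed, with the HTML body as its first part.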

mail_notify(subject=None, body=None, msg=None, html=False, receivers=None)

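Send a mail notification.

Configure the sender, the receivers, and the mail server through cfg4py:

notify:
    mail_from: aaron_yang@jieyu.ai
    mail_to:
        - code@jieyu.ai
    mail_server: smtp.ym.163.com

Set the authentication password through the environment variable MAIL_PASSWORD.

Provide either subject/body or msg, but not both.

Important

This function must be called from the async thread (the thread that runs the asyncio loop); otherwise an exception is raised. It returns an Awaitable. You can await it, or ignore the return value, in which case it runs as a background task that completes at an unspecified time.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| msg | EmailMessage | A pre-built message; cannot be combined with subject/body. Defaults to None. | None |
| subject | str | Mail subject. Defaults to None. | None |
| body | str | Mail body. Defaults to None. | None |
| html | bool | Whether body is treated as HTML. Defaults to False. | False |
| receivers | List[str], Optional | The receivers. If omitted, the preconfigured receivers are used. | None |

Returns:

| Type | Description |
| --- | --- |
| Awaitable | The background task that sends the message. You can use this handle to cancel the task. |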

Source code in omicron/notify/mail.py
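def mail_notify(
    subject: str = None,
    body: str = None,
    msg: EmailMessage = None,
    html=False,
    receivers=None,
) -> Awaitable:
    """Send a mail notification.

    Configure the sender, the receivers, and the mail server through cfg4py:

    ```
    notify:
        mail_from: aaron_yang@jieyu.ai
        mail_to:
            - code@jieyu.ai
        mail_server: smtp.ym.163.com
    ```
    Set the authentication password through the environment variable `MAIL_PASSWORD`.

    Provide either subject/body or msg, but not both.

    ???+ Important
        This function must be called from the async thread (the thread that runs the asyncio loop); otherwise an exception is raised.
        It returns an Awaitable. You can await it, or ignore the return value, in which case it runs as a background task that completes at an unspecified time.

    Args:
        msg (EmailMessage, optional): a pre-built message. Defaults to None.
        subject (str, optional): mail subject. Defaults to None.
        body (str, optional): mail body. Defaults to None.
        html (bool, optional): whether body is treated as HTML. Defaults to False.
        receivers (List[str], Optional): the receivers. If omitted, the preconfigured receivers are used.

    Returns:
        The background task that sends the message. You can use this handle to cancel the task.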
    """
    if all([msg is not None, subject or body]):
        raise TypeError("msg参数与subject/body只能提供其中之一")
    elif all([msg is None, subject is None, body is None]):
        raise TypeError("必须提供msg参数或者subjecdt/body参数")

    if msg is None:
        if html:
            msg = compose(subject, html=body)
        else:
            msg = compose(subject, plain_txt=body)

    cfg = cfg4py.get_instance()
    if not receivers:
        receivers = cfg.notify.mail_to

    password = os.environ.get("MAIL_PASSWORD")
    return send_mail(
        cfg.notify.mail_from, receivers, password, msg, host=cfg.notify.mail_server
    )
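
The argument check at the top of mail_notify can be isolated as follows. This is a sketch, not the library's code: `check_args` is a hypothetical name, and it tests `is not None` throughout (as send_mail does), whereas mail_notify tests `subject or body` by truthiness, so an empty string next to msg passes there.

```python
from email.message import EmailMessage
from typing import Optional


def check_args(
    msg: Optional[EmailMessage], subject: Optional[str], body: Optional[str]
) -> None:
    """Raise TypeError unless exactly one style (msg, or subject/body) is used."""
    has_text = subject is not None or body is not None
    if msg is not None and has_text:
        raise TypeError("provide either msg or subject/body, not both")
    if msg is None and not has_text:
        raise TypeError("either msg or subject/body must be provided")


def is_rejected(*args) -> bool:
    """Return True if check_args refuses the given combination."""
    try:
        check_args(*args)
    except TypeError:
        return True
    return False


both_styles = is_rejected(EmailMessage(), "subject", None)
nothing_given = is_rejected(None, None, None)
text_only = is_rejected(None, "subject", "body")
```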

send_mail(sender, receivers, password, msg=None, host=None, port=25, cc=None, bcc=None, subject=None, body=None, username=None)

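Send a mail notification.

To send a simple text mail, call send_mail(sender, receivers, password, subject=subject, body=body). To send a more complex mail with HTML and attachments, first call compose() to build an EmailMessage, then call send_mail(sender, receivers, password, msg) to send it.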
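
The two calling styles differ only in who builds the message. The sketch below reproduces the message preparation with the standard library alone and sends nothing; `prepare_message` and `default_host` are hypothetical helpers modelled on the source listing in this section, not part of omicron.

```python
from email.message import EmailMessage
from typing import List, Optional


def prepare_message(
    sender: str,
    receivers: List[str],
    msg: Optional[EmailMessage] = None,
    subject: Optional[str] = None,
    body: Optional[str] = None,
    cc: Optional[List[str]] = None,
) -> EmailMessage:
    """Fill in the headers of a new or pre-composed message."""
    if msg is None:
        msg = EmailMessage()
    msg["From"] = sender
    msg["To"] = ", ".join(receivers)
    if subject:
        msg["Subject"] = subject
    if body:
        msg.set_content(body)
    if cc:
        msg["Cc"] = ", ".join(cc)
    return msg


def default_host(sender: str) -> str:
    # With no host given, the domain part of the sender address is used.
    return sender.split("@")[-1]


# Style 1: a simple text mail built from subject and body.
simple = prepare_message(
    "me@example.com", ["you@example.com"], subject="Hi", body="Plain text"
)

# Style 2: a pre-composed message (here HTML) passed in as msg.
rich = EmailMessage()
rich["Subject"] = "Report"
rich.set_content("<p>done</p>", subtype="html")
rich = prepare_message(
    "me@example.com", ["you@example.com", "ops@example.com"], msg=rich
)
```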

Important

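This function must be called from the async thread (that is, the thread running the asyncio loop); otherwise it raises an exception. It returns an Awaitable: you can await it, or ignore the return value, in which case it runs as a background task that completes at an unspecified time.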

Parameters:

Name Type Description Default
sender str

Mail address of the sender.

required
receivers List[str]

Mail addresses of the receivers.

required
password str

Password used to log on to the mail server.

required
msg EmailMessage

A pre-composed message to send. Defaults to None.

None
host str

Host name of the mail server. If not provided, the domain part of sender is used. Defaults to None.

None
port int

Port of the mail server. Defaults to 25.

25
cc List[str]

Addresses to copy on the mail. Defaults to None.

None
bcc List[str]

Addresses to blind-copy on the mail. Defaults to None.

None
subject str

Subject of the mail. Defaults to None.

None
body str

Plain-text body of the mail. Defaults to None.

None
username str

The user name used to log on to the mail server. If not provided, sender is used.

None

Returns:

Type Description
Awaitable

The background task that sends the message. You can use the returned handle to cancel the task.

Source code in omicron/notify/mail.py
@retry(aiosmtplib.errors.SMTPConnectError, tries=3, backoff=2, delay=30, logger=logger)
def send_mail(
    sender: str,
    receivers: List[str],
    password: str,
    msg: EmailMessage = None,
    host: str = None,
    port: int = 25,
    cc: List[str] = None,
    bcc: List[str] = None,
    subject: str = None,
    body: str = None,
    username: str = None,
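) -> Awaitable:
    """Send a mail notification.

    To send a simple text mail, call send_mail(sender, receivers, password, subject=subject, body=body). To send a more complex mail with HTML and attachments, first call compose() to build an EmailMessage, then call send_mail(sender, receivers, password, msg) to send it.

    ???+ Important
        This function must be called from the async thread (that is, the thread running the asyncio loop); otherwise it raises an exception.
        It returns an Awaitable: you can await it, or ignore the return value, in which case it runs as a background task that completes at an unspecified time.

    Args:
        sender (str): mail address of the sender.
        receivers (List[str]): mail addresses of the receivers.
        password (str): password used to log on to the mail server.
        msg (EmailMessage, optional): a pre-composed message to send. Defaults to None.
        host (str, optional): host name of the mail server. If not provided, the domain part of `sender` is used. Defaults to None.
        port (int, optional): port of the mail server. Defaults to 25.
        cc (List[str], optional): addresses to copy on the mail. Defaults to None.
        bcc (List[str], optional): addresses to blind-copy on the mail. Defaults to None.
        subject (str, optional): subject of the mail. Defaults to None.
        body (str, optional): plain-text body of the mail. Defaults to None.
        username (str, optional): the user name used to log on to the mail server. If not provided, `sender` is used.

    Returns:
        The background task that sends the message. You can use the returned handle to cancel the task.
    """
    if all([msg is not None, subject is not None or body is not None]):
        raise TypeError("provide either msg or subject/body, not both")
    elif all([msg is None, subject is None, body is None]):
        raise TypeError("either msg or subject/body must be provided")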

    msg = msg or EmailMessage()

    if isinstance(receivers, str):
        receivers = [receivers]

    msg["From"] = sender
    msg["To"] = ", ".join(receivers)

    if subject:
        msg["subject"] = subject

    if body:
        msg.set_content(body)

    if cc:
        msg["Cc"] = ", ".join(cc)
    if bcc:
        msg["Bcc"] = ", ".join(bcc)

    username = username or sender

    if host is None:
        host = sender.split("@")[-1]

    # log on as `username`, which falls back to `sender` when not provided
    task = asyncio.create_task(
        aiosmtplib.send(
            msg, hostname=host, port=port, username=username, password=password
        )
    )

    return task
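
The `@retry(..., tries=3, backoff=2, delay=30)` decorator on send_mail sets a retry schedule. Assuming it comes from the `retry` package and follows that package's usual semantics (wait `delay` seconds after the first failure, then multiply the wait by `backoff` after each further failure), the waits can be sketched as below; `retry_delays` is a hypothetical helper for illustration only.

```python
from typing import List


def retry_delays(tries: int, delay: float, backoff: float) -> List[float]:
    """Waits between attempts: `delay` first, multiplied by `backoff` each time."""
    waits = []
    current = delay
    for _ in range(tries - 1):  # no wait after the final attempt
        waits.append(current)
        current *= backoff
    return waits


# Three attempts in total, so at most two waits.
schedule = retry_delays(tries=3, delay=30, backoff=2)
```

Under these assumptions the call is attempted at most three times, with waits of 30 and 60 seconds in between.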