Skip to content

Flux

Flux - the query language builder for influxdb

Helper functions for building flux query expression

Source code in omicron/dal/influx/flux.py
class Flux(object):
    """Helper functions for building flux query expression"""

    EPOCH_START = datetime.datetime(1970, 1, 1, 0, 0, 0)

    def __init__(self, auto_pivot=True, no_sys_cols=True):
        """初始化Flux对象

        Args:
            auto_pivot : 是否自动将查询列字段组装成行. Defaults to True.
            no_sys_cols: 是否自动将系统字段删除. Defaults to True.请参考[drop_sys_cols][omicron.dal.influx.flux.Flux.drop_sys_cols]
        """
        self._cols = None
        self.expressions = defaultdict(list)
        self._auto_pivot = auto_pivot
        self._last_n = None
        self.no_sys_cols = no_sys_cols

    def __str__(self):
        return self._compose()

    def __repr__(self) -> str:
        return f"<{self.__class__.__name__}>:\n{self._compose()}"

    def _compose(self):
        """将所有表达式合并为一个表达式"""
        if not all(
            [
                "bucket" in self.expressions,
                "measurement" in self.expressions,
                "range" in self.expressions,
            ]
        ):
            raise AssertionError("bucket, measurement and range must be set")

        expr = [self.expressions[k] for k in ("bucket", "range", "measurement")]

        if self.expressions.get("tags"):
            expr.append(self.expressions["tags"])

        if self.expressions.get("fields"):
            expr.append(self.expressions["fields"])

        if "drop" not in self.expressions and self.no_sys_cols:
            self.drop_sys_cols()

        if self.expressions.get("drop"):
            expr.append(self.expressions["drop"])

        if self._auto_pivot and "pivot" not in self.expressions:
            self.pivot()

        if self.expressions.get("pivot"):
            expr.append(self.expressions["pivot"])

        if self.expressions.get("group"):
            expr.append(self.expressions["group"])

        if self.expressions.get("sort"):
            expr.append(self.expressions["sort"])

        if self.expressions.get("limit"):
            expr.append(self.expressions["limit"])

        # influxdb默认按升序排列,但last_n查询的结果则必然是降序的,所以还需要再次排序
        if self._last_n:
            expr.append(
                "\n".join(
                    [
                        f' |> top(n: {self._last_n}, columns: ["_time"])',
                        ' |> sort(columns: ["_time"], desc: false)',
                    ]
                )
            )

        return "\n".join(expr)

    def bucket(self, bucket: str) -> "Flux":
        """add bucket to query expression

        Raises:
            DuplicateOperationError: 一个查询中只允许指定一个source,如果表达式中已经指定了bucket,则抛出异常

        Returns:
            Flux对象
        """
        if "bucket" in self.expressions:
            raise DuplicateOperationError("bucket has been set")

        self.expressions["bucket"] = f'from(bucket: "{bucket}")'

        return self

    def measurement(self, measurement: str) -> "Flux":
        """add measurement filter to query

        Raises:
            DuplicateOperationError: 一次查询中只允许指定一个measurement, 如果表达式中已经存在measurement, 则抛出异常

        Returns:
            Flux对象自身,以便进行管道操作
        """
        if "measurement" in self.expressions:
            raise DuplicateOperationError("measurement has been set")

        self.expressions[
            "measurement"
        ] = f'  |> filter(fn: (r) => r["_measurement"] == "{measurement}")'

        return self

    def range(
        self, start: Frame, end: Frame, right_close=True, precision="s"
    ) -> "Flux":
        """添加时间范围过滤

        必须指定的查询条件,否则influxdb会报unbound查询错,因为这种情况下,返回的数据量将非常大。

        在格式化时间时,需要根据`precision`生成时间字符串。在向Influxdb发送请求时,应该注意查询参数中指定的时间精度与这里使用的保持一致。

        Influxdb的查询结果默认不包含结束时间,当`right_close`指定为True时,我们将根据指定的精度修改`end`时间,使之仅比`end`多一个时间单位,从而保证查询结果会包含`end`。

        Raises:
            DuplicateOperationError: 一个查询中只允许指定一次时间范围,如果range表达式已经存在,则抛出异常
        Args:
            start: 开始时间
            end: 结束时间
            right_close: 查询结果是否包含结束时间。
            precision: 时间精度,默认为秒。

        Returns:
            Flux对象,以支持管道操作
        """
        if "range" in self.expressions:
            raise DuplicateOperationError("range has been set")

        if precision not in ["s", "ms", "us"]:
            raise AssertionError("precision must be 's', 'ms' or 'us'")

        end = self.format_time(end, precision, right_close)
        start = self.format_time(start, precision)

        self.expressions["range"] = f"  |> range(start: {start}, stop: {end})"
        return self

    def limit(self, limit: int) -> "Flux":
        """添加返回记录数限制

        Raises:
            DuplicateOperationError: 一个查询中只允许指定一次limit,如果limit表达式已经存在,则抛出异常

        Args:
            limit: 返回记录数限制

        Returns:
            Flux对象,以便进行管道操作
        """
        if "limit" in self.expressions:
            raise DuplicateOperationError("limit has been set")

        self.expressions["limit"] = "  |> limit(n: %d)" % limit
        return self

    @classmethod
    def to_timestamp(cls, tm: Frame, precision: str = "s") -> int:
        """将时间根据精度转换为unix时间戳

        在往influxdb写入数据时,line-protocol要求的时间戳为unix timestamp,并且与其精度对应。

        influxdb始终使用UTC时间,因此,`tm`也必须已经转换成UTC时间。

        Args:
            tm: 时间
            precision: 时间精度,默认为秒。

        Returns:
            时间戳
        """
        if precision not in ["s", "ms", "us"]:
            raise AssertionError("precision must be 's', 'ms' or 'us'")

        # get int repr of tm, in seconds unit
        if isinstance(tm, np.datetime64):
            tm = tm.astype("datetime64[s]").astype("int")
        elif isinstance(tm, datetime.datetime):
            tm = tm.timestamp()
        else:
            tm = arrow.get(tm).timestamp()

        return int(tm * 10 ** ({"s": 0, "ms": 3, "us": 6}[precision]))

    @classmethod
    def format_time(cls, tm: Frame, precision: str = "s", shift_forward=False) -> str:
        """将时间转换成客户端对应的精度,并以 RFC3339 timestamps格式串(即influxdb要求的格式)返回。

        如果这个时间是作为查询的range中的结束时间使用时,由于influx查询的时间范围是左闭右开的,因此如果你需要查询的是一个闭区间,则需要将`end`的时间向前偏移一个精度。通过传入`shift_forward = True`可以完成这种转换。

        Examples:
            >>> # by default, the precision is seconds, and convert a date
            >>> Flux.format_time(datetime.date(2019, 1, 1))
            '2019-01-01T00:00:00Z'

            >>> # set precision to ms, convert a time
            >>> Flux.format_time(datetime.datetime(1978, 7, 8, 12, 34, 56, 123456), precision="ms")
            '1978-07-08T12:34:56.123Z'

            >>> # convert and forward shift
            >>> Flux.format_time(datetime.date(1978, 7, 8), shift_forward = True)
            '1978-07-08T00:00:01Z'

        Args:
            tm : 待格式化的时间
            precision: 时间精度,可选值为:'s', 'ms', 'us'
            shift_forward: 如果为True,则将end向前偏移一个精度

        Returns:
            调整后符合influx时间规范的时间(字符串表示)
        """
        timespec = {"s": "seconds", "ms": "milliseconds", "us": "microseconds"}.get(
            precision
        )

        if timespec is None:
            raise ValueError(
                f"precision must be one of 's', 'ms', 'us', but got {precision}"
            )

        tm = arrow.get(tm).naive

        if shift_forward:
            tm = tm + datetime.timedelta(**{timespec: 1})

        return tm.isoformat(sep="T", timespec=timespec) + "Z"

    def tags(self, tags: DefaultDict[str, List[str]]) -> "Flux":
        """给查询添加tags过滤条件

        此查询条件为过滤条件,并非必须。如果查询中没有指定tags,则会返回所有记录。

        在实现上,既可以使用`contains`语法,也可以使用`or`语法(由于一条记录只能属于一个tag,所以,当指定多个tag进行查询时,它们之间的关系应该为`or`)。经验证,contains语法会始终先将所有符合条件的记录检索出来,再进行过滤。这样的效率比较低,特别是当tags的数量较少时,会远远比使用or语法慢。

        Raises:
            DuplicateOperationError: 一个查询中只允许执行一次,如果tag filter表达式已经存在,则抛出异常

        Args:
            tags : tags是一个{tagname: Union[str,[tag_values]]}对象。

        Examples:
            >>> flux = Flux()
            >>> flux.tags({"code": ["000001", "000002"], "name": ["浦发银行"]}).expressions["tags"]
            '  |> filter(fn: (r) => r["code"] == "000001" or r["code"] == "000002" or r["name"] == "浦发银行")'


        Returns:
            Flux对象,以便进行管道操作
        """
        if "tags" in self.expressions:
            raise DuplicateOperationError("tags has been set")

        filters = []
        for tag, values in tags.items():
            assert (
                isinstance(values, str) or len(values) > 0
            ), f"tag {tag} should not be empty or None"
            if isinstance(values, str):
                values = [values]

            for v in values:
                filters.append(f'r["{tag}"] == "{v}"')

        op_expression = " or ".join(filters)

        self.expressions["tags"] = f"  |> filter(fn: (r) => {op_expression})"

        return self

    def fields(self, fields: List, reserve_time_stamp: bool = True) -> "Flux":
        """给查询添加field过滤条件

        此查询条件为过滤条件,用以指定哪些field会出现在查询结果中,并非必须。如果查询中没有指定tags,则会返回所有记录。

        由于一条记录只能属于一个_field,所以,当指定多个_field进行查询时,它们之间的关系应该为`or`。

        Raises:
            DuplicateOperationError: 一个查询中只允许执行一次,如果filed filter表达式已经存在,则抛出异常
        Args:
            fields: 待查询的field列表
            reserve_time_stamp: 是否保留时间戳`_time`,默认为True

        Returns:
            Flux对象,以便进行管道操作
        """
        if "fields" in self.expressions:
            raise DuplicateOperationError("fields has been set")

        self._cols = fields.copy()

        if reserve_time_stamp and "_time" not in self._cols:
            self._cols.append("_time")

        self._cols = sorted(self._cols)

        filters = [f'r["_field"] == "{name}"' for name in self._cols]

        self.expressions["fields"] = f"  |> filter(fn: (r) => {' or '.join(filters)})"

        return self

    def pivot(
        self,
        row_keys: List[str] = ["_time"],
        column_keys=["_field"],
        value_column: str = "_value",
    ) -> "Flux":
        """pivot用来将以列为单位的数据转换为以行为单位的数据

        Flux查询返回的结果通常都是以列为单位的数据,增加本pivot条件后,结果将被转换成为以行为单位的数据再返回。

        这里实现的是measurement内的转换,请参考 [pivot](https://docs.influxdata.com/flux/v0.x/stdlib/universe/pivot/#align-fields-within-each-measurement-that-have-the-same-timestamp)


        Args:
            row_keys: 惟一确定输出中一行数据的列名字, 默认为["_time"]
            column_keys: 列名称列表,默认为["_field"]
            value_column: 值列名,默认为"_value"

        Returns:
            Flux对象,以便进行管道操作
        """
        if "pivot" in self.expressions:
            raise DuplicateOperationError("pivot has been set")

        columns = ",".join([f'"{name}"' for name in column_keys])
        rowkeys = ",".join([f'"{name}"' for name in row_keys])

        self.expressions[
            "pivot"
        ] = f'  |> pivot(columnKey: [{columns}], rowKey: [{rowkeys}], valueColumn: "{value_column}")'

        return self

    def sort(self, by: List[str] = None, desc: bool = False) -> "Flux":
        """按照指定的列进行排序

        根据[influxdb doc](https://docs.influxdata.com/influxdb/v2.0/query-data/flux/first-last/), 查询返回值默认地按时间排序。因此,如果仅仅是要求查询结果按时间排序,无须调用此API,但是,此API提供了按其它字段排序的能力。

        另外,在一个有5000多个tag,共返回1M条记录的测试中,测试验证返回记录确实按_time升序排列。

        Args:
            by: 指定排序的列名称列表

        Returns:
            Flux对象,以便进行管道操作
        """
        if "sort" in self.expressions:
            raise DuplicateOperationError("sort has been set")

        if by is None:
            by = ["_value"]
        if isinstance(by, str):
            by = [by]

        columns_ = ",".join([f'"{name}"' for name in by])

        desc = "true" if desc else "false"
        self.expressions["sort"] = f"  |> sort(columns: [{columns_}], desc: {desc})"

        return self

    def group(self, by: Tuple[str]) -> "Flux":
        """[summary]

        Returns:
            [description]
        """
        if "group" in self.expressions:
            raise DuplicateOperationError("group has been set")

        if isinstance(by, str):
            by = [by]
        cols = ",".join([f'"{col}"' for col in by])
        self.expressions["group"] = f"  |> group(columns: [{cols}])"

        return self

    def latest(self, n: int) -> "Flux":
        """获取最后n条数据,按时间增序返回

        Flux查询的增强功能,相当于top + sort + limit

        Args:
            n: 最后n条数据

        Returns:
            Flux对象,以便进行管道操作
        """
        assert "top" not in self.expressions, "top and last_n can not be used together"
        assert (
            "sort" not in self.expressions
        ), "sort and last_n can not be used together"
        assert (
            "limit" not in self.expressions
        ), "limit and last_n can not be used together"

        self._last_n = n

        return self

    @property
    def cols(self) -> List[str]:
        """the columns or the return records

        the implementation is buggy. Influx doesn't tell us in which order these columns are.


        Returns:
            the columns name of the return records
        """
        # fixme: if keep in expression, then return group key + tag key + value key
        # if keep not in expression, then stream, table, _time, ...
        return sorted(self._cols)

    def delete(
        self,
        measurement: str,
        stop: datetime.datetime,
        tags: dict = {},
        start: datetime.datetime = None,
        precision: str = "s",
    ) -> dict:
        """构建删除语句。

        according to [delete-predicate](https://docs.influxdata.com/influxdb/v2.1/reference/syntax/delete-predicate/), delete只支持AND逻辑操作,只支持“=”操作,不支持“!=”操作,可以使用任何字段或者tag,但不包括_time和_value字段。

        由于influxdb这一段文档不是很清楚,根据试验结果,目前仅支持按时间范围和tags进行删除较好。如果某个column的值类型是字符串,则也可以通过`tags`参数传入,匹配后删除。但如果传入了非字符串类型的column,则将得到无法预料的结果。

        Args:
            measurement : [description]
            stop : [description]
            tags : 按tags和匹配的值进行删除。传入的tags中,key为tag名称,value为tag要匹配的取值,可以为str或者List[str]。
            start : 起始时间。如果省略,则使用EPOCH_START.
            precision : 时间精度。可以为“s”,“ms”,“us”
        Returns:
            删除语句
        """
        timespec = {"s": "seconds", "ms": "milliseconds", "us": "microseconds"}.get(
            precision
        )

        if start is None:
            start = self.EPOCH_START.isoformat(timespec=timespec) + "Z"

        predicate = [f'_measurement="{measurement}"']
        for key, value in tags.items():
            if isinstance(value, list):
                predicate.extend([f'{key} = "{v}"' for v in value])
            else:
                predicate.append(f'{key} = "{value}"')

        command = {
            "start": start,
            "stop": f"{stop.isoformat(timespec=timespec)}Z",
            "predicate": " AND ".join(predicate),
        }

        return command

    def drop(self, cols: List[str]) -> "Flux":
        """use this to drop columns before return result

        Args:
            cols : the name of columns to be dropped

        Returns:
            Flux object, to support pipe operation
        """
        if "drop" in self.expressions:
            raise DuplicateOperationError("drop operation has been set already")

        # add surrounding quotes
        _cols = [f'"{c}"' for c in cols]
        self.expressions["drop"] = f"  |> drop(columns: [{','.join(_cols)}])"

        return self

    def drop_sys_cols(self, cols: List[str] = None) -> "Flux":
        """use this to drop ["_start", "_stop", "_measurement"], plus columns specified in `cols`, before return query result

        please be noticed, after drop sys columns, there's still two sys columns left, which is "_time" and "table", and "_time" should usually be kept, "table" is one we're not able to removed. If you don't like _time in return result, you can specify it in `cols` parameter.

        Args:
            cols : the extra columns to be dropped

        Returns:
            Flux query object
        """
        _cols = ["_start", "_stop", "_measurement"]
        if cols is not None:
            _cols.extend(cols)

        return self.drop(_cols)

cols: List[str] property readonly

the columns or the return records

the implementation is buggy. Influx doesn't tell us in which order these columns are.

Returns:

Type Description
List[str]

the columns name of the return records

__init__(self, auto_pivot=True, no_sys_cols=True) special

初始化Flux对象

Parameters:

Name Type Description Default
auto_pivot

是否自动将查询列字段组装成行. Defaults to True.

True
no_sys_cols

是否自动将系统字段删除. Defaults to True.请参考drop_sys_cols

True
Source code in omicron/dal/influx/flux.py
def __init__(self, auto_pivot=True, no_sys_cols=True):
    """初始化Flux对象

    Args:
        auto_pivot : 是否自动将查询列字段组装成行. Defaults to True.
        no_sys_cols: 是否自动将系统字段删除. Defaults to True.请参考[drop_sys_cols][omicron.dal.influx.flux.Flux.drop_sys_cols]
    """
    self._cols = None
    self.expressions = defaultdict(list)
    self._auto_pivot = auto_pivot
    self._last_n = None
    self.no_sys_cols = no_sys_cols

bucket(self, bucket)

add bucket to query expression

Exceptions:

Type Description
DuplicateOperationError

一个查询中只允许指定一个source,如果表达式中已经指定了bucket,则抛出异常

Returns:

Type Description
Flux

Flux对象

Source code in omicron/dal/influx/flux.py
def bucket(self, bucket: str) -> "Flux":
    """add bucket to query expression

    Raises:
        DuplicateOperationError: 一个查询中只允许指定一个source,如果表达式中已经指定了bucket,则抛出异常

    Returns:
        Flux对象
    """
    if "bucket" in self.expressions:
        raise DuplicateOperationError("bucket has been set")

    self.expressions["bucket"] = f'from(bucket: "{bucket}")'

    return self

delete(self, measurement, stop, tags={}, start=None, precision='s')

构建删除语句。

according to delete-predicate, delete只支持AND逻辑操作,只支持“=”操作,不支持“!=”操作,可以使用任何字段或者tag,但不包括_time和_value字段。

由于influxdb这一段文档不是很清楚,根据试验结果,目前仅支持按时间范围和tags进行删除较好。如果某个column的值类型是字符串,则也可以通过tags参数传入,匹配后删除。但如果传入了非字符串类型的column,则将得到无法预料的结果。

Parameters:

Name Type Description Default
measurement

[description]

required
stop

[description]

required
tags

按tags和匹配的值进行删除。传入的tags中,key为tag名称,value为tag要匹配的取值,可以为str或者List[str]。

{}
start

起始时间。如果省略,则使用EPOCH_START.

None
precision

时间精度。可以为“s”,“ms”,“us”

's'

Returns:

Type Description
dict

删除语句

Source code in omicron/dal/influx/flux.py
def delete(
    self,
    measurement: str,
    stop: datetime.datetime,
    tags: dict = {},
    start: datetime.datetime = None,
    precision: str = "s",
) -> dict:
    """构建删除语句。

    according to [delete-predicate](https://docs.influxdata.com/influxdb/v2.1/reference/syntax/delete-predicate/), delete只支持AND逻辑操作,只支持“=”操作,不支持“!=”操作,可以使用任何字段或者tag,但不包括_time和_value字段。

    由于influxdb这一段文档不是很清楚,根据试验结果,目前仅支持按时间范围和tags进行删除较好。如果某个column的值类型是字符串,则也可以通过`tags`参数传入,匹配后删除。但如果传入了非字符串类型的column,则将得到无法预料的结果。

    Args:
        measurement : [description]
        stop : [description]
        tags : 按tags和匹配的值进行删除。传入的tags中,key为tag名称,value为tag要匹配的取值,可以为str或者List[str]。
        start : 起始时间。如果省略,则使用EPOCH_START.
        precision : 时间精度。可以为“s”,“ms”,“us”
    Returns:
        删除语句
    """
    timespec = {"s": "seconds", "ms": "milliseconds", "us": "microseconds"}.get(
        precision
    )

    if start is None:
        start = self.EPOCH_START.isoformat(timespec=timespec) + "Z"

    predicate = [f'_measurement="{measurement}"']
    for key, value in tags.items():
        if isinstance(value, list):
            predicate.extend([f'{key} = "{v}"' for v in value])
        else:
            predicate.append(f'{key} = "{value}"')

    command = {
        "start": start,
        "stop": f"{stop.isoformat(timespec=timespec)}Z",
        "predicate": " AND ".join(predicate),
    }

    return command

drop(self, cols)

use this to drop columns before return result

Parameters:

Name Type Description Default
cols

the name of columns to be dropped

required

Returns:

Type Description
Flux

Flux object, to support pipe operation

Source code in omicron/dal/influx/flux.py
def drop(self, cols: List[str]) -> "Flux":
    """use this to drop columns before return result

    Args:
        cols : the name of columns to be dropped

    Returns:
        Flux object, to support pipe operation
    """
    if "drop" in self.expressions:
        raise DuplicateOperationError("drop operation has been set already")

    # add surrounding quotes
    _cols = [f'"{c}"' for c in cols]
    self.expressions["drop"] = f"  |> drop(columns: [{','.join(_cols)}])"

    return self

drop_sys_cols(self, cols=None)

use this to drop ["_start", "_stop", "_measurement"], plus columns specified in cols, before return query result

please be noticed, after drop sys columns, there's still two sys columns left, which is "_time" and "table", and "_time" should usually be kept, "table" is one we're not able to removed. If you don't like _time in return result, you can specify it in cols parameter.

Parameters:

Name Type Description Default
cols

the extra columns to be dropped

None

Returns:

Type Description
Flux

Flux query object

Source code in omicron/dal/influx/flux.py
def drop_sys_cols(self, cols: List[str] = None) -> "Flux":
    """use this to drop ["_start", "_stop", "_measurement"], plus columns specified in `cols`, before return query result

    please be noticed, after drop sys columns, there's still two sys columns left, which is "_time" and "table", and "_time" should usually be kept, "table" is one we're not able to removed. If you don't like _time in return result, you can specify it in `cols` parameter.

    Args:
        cols : the extra columns to be dropped

    Returns:
        Flux query object
    """
    _cols = ["_start", "_stop", "_measurement"]
    if cols is not None:
        _cols.extend(cols)

    return self.drop(_cols)

fields(self, fields, reserve_time_stamp=True)

给查询添加field过滤条件

此查询条件为过滤条件,用以指定哪些field会出现在查询结果中,并非必须。如果查询中没有指定tags,则会返回所有记录。

由于一条记录只能属于一个_field,所以,当指定多个_field进行查询时,它们之间的关系应该为or

Exceptions:

Type Description
DuplicateOperationError

一个查询中只允许执行一次,如果filed filter表达式已经存在,则抛出异常

Parameters:

Name Type Description Default
fields List

待查询的field列表

required
reserve_time_stamp bool

是否保留时间戳_time,默认为True

True

Returns:

Type Description
Flux

Flux对象,以便进行管道操作

Source code in omicron/dal/influx/flux.py
def fields(self, fields: List, reserve_time_stamp: bool = True) -> "Flux":
    """给查询添加field过滤条件

    此查询条件为过滤条件,用以指定哪些field会出现在查询结果中,并非必须。如果查询中没有指定tags,则会返回所有记录。

    由于一条记录只能属于一个_field,所以,当指定多个_field进行查询时,它们之间的关系应该为`or`。

    Raises:
        DuplicateOperationError: 一个查询中只允许执行一次,如果filed filter表达式已经存在,则抛出异常
    Args:
        fields: 待查询的field列表
        reserve_time_stamp: 是否保留时间戳`_time`,默认为True

    Returns:
        Flux对象,以便进行管道操作
    """
    if "fields" in self.expressions:
        raise DuplicateOperationError("fields has been set")

    self._cols = fields.copy()

    if reserve_time_stamp and "_time" not in self._cols:
        self._cols.append("_time")

    self._cols = sorted(self._cols)

    filters = [f'r["_field"] == "{name}"' for name in self._cols]

    self.expressions["fields"] = f"  |> filter(fn: (r) => {' or '.join(filters)})"

    return self

format_time(tm, precision='s', shift_forward=False) classmethod

将时间转换成客户端对应的精度,并以 RFC3339 timestamps格式串(即influxdb要求的格式)返回。

如果这个时间是作为查询的range中的结束时间使用时,由于influx查询的时间范围是左闭右开的,因此如果你需要查询的是一个闭区间,则需要将end的时间向前偏移一个精度。通过传入shift_forward = True可以完成这种转换。

Examples:

>>> # by default, the precision is seconds, and convert a date
>>> Flux.format_time(datetime.date(2019, 1, 1))
'2019-01-01T00:00:00Z'
>>> # set precision to ms, convert a time
>>> Flux.format_time(datetime.datetime(1978, 7, 8, 12, 34, 56, 123456), precision="ms")
'1978-07-08T12:34:56.123Z'
>>> # convert and forward shift
>>> Flux.format_time(datetime.date(1978, 7, 8), shift_forward = True)
'1978-07-08T00:00:01Z'

Parameters:

Name Type Description Default
tm

待格式化的时间

required
precision str

时间精度,可选值为:'s', 'ms', 'us'

's'
shift_forward

如果为True,则将end向前偏移一个精度

False

Returns:

Type Description
str

调整后符合influx时间规范的时间(字符串表示)

Source code in omicron/dal/influx/flux.py
@classmethod
def format_time(cls, tm: Frame, precision: str = "s", shift_forward=False) -> str:
    """将时间转换成客户端对应的精度,并以 RFC3339 timestamps格式串(即influxdb要求的格式)返回。

    如果这个时间是作为查询的range中的结束时间使用时,由于influx查询的时间范围是左闭右开的,因此如果你需要查询的是一个闭区间,则需要将`end`的时间向前偏移一个精度。通过传入`shift_forward = True`可以完成这种转换。

    Examples:
        >>> # by default, the precision is seconds, and convert a date
        >>> Flux.format_time(datetime.date(2019, 1, 1))
        '2019-01-01T00:00:00Z'

        >>> # set precision to ms, convert a time
        >>> Flux.format_time(datetime.datetime(1978, 7, 8, 12, 34, 56, 123456), precision="ms")
        '1978-07-08T12:34:56.123Z'

        >>> # convert and forward shift
        >>> Flux.format_time(datetime.date(1978, 7, 8), shift_forward = True)
        '1978-07-08T00:00:01Z'

    Args:
        tm : 待格式化的时间
        precision: 时间精度,可选值为:'s', 'ms', 'us'
        shift_forward: 如果为True,则将end向前偏移一个精度

    Returns:
        调整后符合influx时间规范的时间(字符串表示)
    """
    timespec = {"s": "seconds", "ms": "milliseconds", "us": "microseconds"}.get(
        precision
    )

    if timespec is None:
        raise ValueError(
            f"precision must be one of 's', 'ms', 'us', but got {precision}"
        )

    tm = arrow.get(tm).naive

    if shift_forward:
        tm = tm + datetime.timedelta(**{timespec: 1})

    return tm.isoformat(sep="T", timespec=timespec) + "Z"

group(self, by)

[summary]

Returns:

Type Description
Flux

[description]

Source code in omicron/dal/influx/flux.py
def group(self, by: Tuple[str]) -> "Flux":
    """[summary]

    Returns:
        [description]
    """
    if "group" in self.expressions:
        raise DuplicateOperationError("group has been set")

    if isinstance(by, str):
        by = [by]
    cols = ",".join([f'"{col}"' for col in by])
    self.expressions["group"] = f"  |> group(columns: [{cols}])"

    return self

latest(self, n)

获取最后n条数据,按时间增序返回

Flux查询的增强功能,相当于top + sort + limit

Parameters:

Name Type Description Default
n int

最后n条数据

required

Returns:

Type Description
Flux

Flux对象,以便进行管道操作

Source code in omicron/dal/influx/flux.py
def latest(self, n: int) -> "Flux":
    """获取最后n条数据,按时间增序返回

    Flux查询的增强功能,相当于top + sort + limit

    Args:
        n: 最后n条数据

    Returns:
        Flux对象,以便进行管道操作
    """
    assert "top" not in self.expressions, "top and last_n can not be used together"
    assert (
        "sort" not in self.expressions
    ), "sort and last_n can not be used together"
    assert (
        "limit" not in self.expressions
    ), "limit and last_n can not be used together"

    self._last_n = n

    return self

limit(self, limit)

添加返回记录数限制

Exceptions:

Type Description
DuplicateOperationError

一个查询中只允许指定一次limit,如果limit表达式已经存在,则抛出异常

Parameters:

Name Type Description Default
limit int

返回记录数限制

required

Returns:

Type Description
Flux

Flux对象,以便进行管道操作

Source code in omicron/dal/influx/flux.py
def limit(self, limit: int) -> "Flux":
    """添加返回记录数限制

    Raises:
        DuplicateOperationError: 一个查询中只允许指定一次limit,如果limit表达式已经存在,则抛出异常

    Args:
        limit: 返回记录数限制

    Returns:
        Flux对象,以便进行管道操作
    """
    if "limit" in self.expressions:
        raise DuplicateOperationError("limit has been set")

    self.expressions["limit"] = "  |> limit(n: %d)" % limit
    return self

measurement(self, measurement)

add measurement filter to query

Exceptions:

Type Description
DuplicateOperationError

一次查询中只允许指定一个measurement, 如果表达式中已经存在measurement, 则抛出异常

Returns:

Type Description
Flux

Flux对象自身,以便进行管道操作

Source code in omicron/dal/influx/flux.py
def measurement(self, measurement: str) -> "Flux":
    """add measurement filter to query

    Raises:
        DuplicateOperationError: 一次查询中只允许指定一个measurement, 如果表达式中已经存在measurement, 则抛出异常

    Returns:
        Flux对象自身,以便进行管道操作
    """
    if "measurement" in self.expressions:
        raise DuplicateOperationError("measurement has been set")

    self.expressions[
        "measurement"
    ] = f'  |> filter(fn: (r) => r["_measurement"] == "{measurement}")'

    return self

pivot(self, row_keys=['_time'], column_keys=['_field'], value_column='_value')

pivot用来将以列为单位的数据转换为以行为单位的数据

Flux查询返回的结果通常都是以列为单位的数据,增加本pivot条件后,结果将被转换成为以行为单位的数据再返回。

这里实现的是measurement内的转换,请参考 pivot

Parameters:

Name Type Description Default
row_keys List[str]

惟一确定输出中一行数据的列名字, 默认为["_time"]

['_time']
column_keys

列名称列表,默认为["_field"]

['_field']
value_column str

值列名,默认为"_value"

'_value'

Returns:

Type Description
Flux

Flux对象,以便进行管道操作

Source code in omicron/dal/influx/flux.py
def pivot(
    self,
    row_keys: List[str] = ["_time"],
    column_keys=["_field"],
    value_column: str = "_value",
) -> "Flux":
    """pivot用来将以列为单位的数据转换为以行为单位的数据

    Flux查询返回的结果通常都是以列为单位的数据,增加本pivot条件后,结果将被转换成为以行为单位的数据再返回。

    这里实现的是measurement内的转换,请参考 [pivot](https://docs.influxdata.com/flux/v0.x/stdlib/universe/pivot/#align-fields-within-each-measurement-that-have-the-same-timestamp)


    Args:
        row_keys: 惟一确定输出中一行数据的列名字, 默认为["_time"]
        column_keys: 列名称列表,默认为["_field"]
        value_column: 值列名,默认为"_value"

    Returns:
        Flux对象,以便进行管道操作
    """
    if "pivot" in self.expressions:
        raise DuplicateOperationError("pivot has been set")

    columns = ",".join([f'"{name}"' for name in column_keys])
    rowkeys = ",".join([f'"{name}"' for name in row_keys])

    self.expressions[
        "pivot"
    ] = f'  |> pivot(columnKey: [{columns}], rowKey: [{rowkeys}], valueColumn: "{value_column}")'

    return self

range(self, start, end, right_close=True, precision='s')

添加时间范围过滤

必须指定的查询条件,否则influxdb会报unbound查询错,因为这种情况下,返回的数据量将非常大。

在格式化时间时,需要根据precision生成时间字符串。在向Influxdb发送请求时,应该注意查询参数中指定的时间精度与这里使用的保持一致。

Influxdb的查询结果默认不包含结束时间,当right_close指定为True时,我们将根据指定的精度修改end时间,使之仅比end多一个时间单位,从而保证查询结果会包含end

Exceptions:

Type Description
DuplicateOperationError

一个查询中只允许指定一次时间范围,如果range表达式已经存在,则抛出异常

Parameters:

Name Type Description Default
start Union[datetime.date, datetime.datetime]

开始时间

required
end Union[datetime.date, datetime.datetime]

结束时间

required
right_close

查询结果是否包含结束时间。

True
precision

时间精度,默认为秒。

's'

Returns:

Type Description
Flux

Flux对象,以支持管道操作

Source code in omicron/dal/influx/flux.py
def range(
    self, start: Frame, end: Frame, right_close=True, precision="s"
) -> "Flux":
    """添加时间范围过滤

    必须指定的查询条件,否则influxdb会报unbound查询错,因为这种情况下,返回的数据量将非常大。

    在格式化时间时,需要根据`precision`生成时间字符串。在向Influxdb发送请求时,应该注意查询参数中指定的时间精度与这里使用的保持一致。

    Influxdb的查询结果默认不包含结束时间,当`right_close`指定为True时,我们将根据指定的精度修改`end`时间,使之仅比`end`多一个时间单位,从而保证查询结果会包含`end`。

    Raises:
        DuplicateOperationError: 一个查询中只允许指定一次时间范围,如果range表达式已经存在,则抛出异常
    Args:
        start: 开始时间
        end: 结束时间
        right_close: 查询结果是否包含结束时间。
        precision: 时间精度,默认为秒。

    Returns:
        Flux对象,以支持管道操作
    """
    if "range" in self.expressions:
        raise DuplicateOperationError("range has been set")

    if precision not in ["s", "ms", "us"]:
        raise AssertionError("precision must be 's', 'ms' or 'us'")

    end = self.format_time(end, precision, right_close)
    start = self.format_time(start, precision)

    self.expressions["range"] = f"  |> range(start: {start}, stop: {end})"
    return self

sort(self, by=None, desc=False)

按照指定的列进行排序

根据influxdb doc, 查询返回值默认地按时间排序。因此,如果仅仅是要求查询结果按时间排序,无须调用此API,但是,此API提供了按其它字段排序的能力。

另外,在一个有5000多个tag,共返回1M条记录的测试中,测试验证返回记录确实按_time升序排列。

Parameters:

Name Type Description Default
by List[str]

指定排序的列名称列表

None

Returns:

Type Description
Flux

Flux对象,以便进行管道操作

Source code in omicron/dal/influx/flux.py
def sort(self, by: List[str] = None, desc: bool = False) -> "Flux":
    """按照指定的列进行排序

    根据[influxdb doc](https://docs.influxdata.com/influxdb/v2.0/query-data/flux/first-last/), 查询返回值默认地按时间排序。因此,如果仅仅是要求查询结果按时间排序,无须调用此API,但是,此API提供了按其它字段排序的能力。

    另外,在一个有5000多个tag,共返回1M条记录的测试中,测试验证返回记录确实按_time升序排列。

    Args:
        by: 指定排序的列名称列表

    Returns:
        Flux对象,以便进行管道操作
    """
    if "sort" in self.expressions:
        raise DuplicateOperationError("sort has been set")

    if by is None:
        by = ["_value"]
    if isinstance(by, str):
        by = [by]

    columns_ = ",".join([f'"{name}"' for name in by])

    desc = "true" if desc else "false"
    self.expressions["sort"] = f"  |> sort(columns: [{columns_}], desc: {desc})"

    return self

tags(self, tags)

给查询添加tags过滤条件

此查询条件为过滤条件,并非必须。如果查询中没有指定tags,则会返回所有记录。

在实现上,既可以使用contains语法,也可以使用or语法(由于一条记录只能属于一个tag,所以,当指定多个tag进行查询时,它们之间的关系应该为or)。经验证,contains语法会始终先将所有符合条件的记录检索出来,再进行过滤。这样的效率比较低,特别是当tags的数量较少时,会远远比使用or语法慢。

Exceptions:

Type Description
DuplicateOperationError

一个查询中只允许执行一次,如果tag filter表达式已经存在,则抛出异常

Parameters:

Name Type Description Default
tags

tags是一个{tagname: Union[str,[tag_values]]}对象。

required

Examples:

>>> flux = Flux()
>>> flux.tags({"code": ["000001", "000002"], "name": ["浦发银行"]}).expressions["tags"]
'  |> filter(fn: (r) => r["code"] == "000001" or r["code"] == "000002" or r["name"] == "浦发银行")'

Returns:

Type Description
Flux

Flux对象,以便进行管道操作

Source code in omicron/dal/influx/flux.py
def tags(self, tags: DefaultDict[str, List[str]]) -> "Flux":
    """给查询添加tags过滤条件

    此查询条件为过滤条件,并非必须。如果查询中没有指定tags,则会返回所有记录。

    在实现上,既可以使用`contains`语法,也可以使用`or`语法(由于一条记录只能属于一个tag,所以,当指定多个tag进行查询时,它们之间的关系应该为`or`)。经验证,contains语法会始终先将所有符合条件的记录检索出来,再进行过滤。这样的效率比较低,特别是当tags的数量较少时,会远远比使用or语法慢。

    Raises:
        DuplicateOperationError: 一个查询中只允许执行一次,如果tag filter表达式已经存在,则抛出异常

    Args:
        tags : tags是一个{tagname: Union[str,[tag_values]]}对象。

    Examples:
        >>> flux = Flux()
        >>> flux.tags({"code": ["000001", "000002"], "name": ["浦发银行"]}).expressions["tags"]
        '  |> filter(fn: (r) => r["code"] == "000001" or r["code"] == "000002" or r["name"] == "浦发银行")'


    Returns:
        Flux对象,以便进行管道操作
    """
    if "tags" in self.expressions:
        raise DuplicateOperationError("tags has been set")

    filters = []
    for tag, values in tags.items():
        assert (
            isinstance(values, str) or len(values) > 0
        ), f"tag {tag} should not be empty or None"
        if isinstance(values, str):
            values = [values]

        for v in values:
            filters.append(f'r["{tag}"] == "{v}"')

    op_expression = " or ".join(filters)

    self.expressions["tags"] = f"  |> filter(fn: (r) => {op_expression})"

    return self

to_timestamp(tm, precision='s') classmethod

将时间根据精度转换为unix时间戳

在往influxdb写入数据时,line-protocol要求的时间戳为unix timestamp,并且与其精度对应。

influxdb始终使用UTC时间,因此,tm也必须已经转换成UTC时间。

Parameters:

Name Type Description Default
tm Union[datetime.date, datetime.datetime]

时间

required
precision str

时间精度,默认为秒。

's'

Returns:

Type Description
int

时间戳

Source code in omicron/dal/influx/flux.py
@classmethod
def to_timestamp(cls, tm: Frame, precision: str = "s") -> int:
    """将时间根据精度转换为unix时间戳

    在往influxdb写入数据时,line-protocol要求的时间戳为unix timestamp,并且与其精度对应。

    influxdb始终使用UTC时间,因此,`tm`也必须已经转换成UTC时间。

    Args:
        tm: 时间
        precision: 时间精度,默认为秒。

    Returns:
        时间戳
    """
    if precision not in ["s", "ms", "us"]:
        raise AssertionError("precision must be 's', 'ms' or 'us'")

    # get int repr of tm, in seconds unit
    if isinstance(tm, np.datetime64):
        tm = tm.astype("datetime64[s]").astype("int")
    elif isinstance(tm, datetime.datetime):
        tm = tm.timestamp()
    else:
        tm = arrow.get(tm).timestamp()

    return int(tm * 10 ** ({"s": 0, "ms": 3, "us": 6}[precision]))