Flux
Flux - the query language builder for influxdb¶
Helper functions for building flux query expression
Source code in omicron/dal/influx/flux.py
          class Flux(object):
    """Helper functions for building flux query expression"""
    EPOCH_START = datetime.datetime(1970, 1, 1, 0, 0, 0)
    def __init__(self, auto_pivot=True, no_sys_cols=True):
        """初始化Flux对象
        Args:
            auto_pivot : 是否自动将查询列字段组装成行. Defaults to True.
            no_sys_cols: 是否自动将系统字段删除. Defaults to True.请参考[drop_sys_cols][omicron.dal.influx.flux.Flux.drop_sys_cols]
        """
        self._cols = None
        self.expressions = defaultdict(list)
        self._auto_pivot = auto_pivot
        self._last_n = None
        self.no_sys_cols = no_sys_cols
    def __str__(self):
        return self._compose()
    def __repr__(self) -> str:
        return f"<{self.__class__.__name__}>:\n{self._compose()}"
    def _compose(self):
        """将所有表达式合并为一个表达式"""
        if not all(
            [
                "bucket" in self.expressions,
                "measurement" in self.expressions,
                "range" in self.expressions,
            ]
        ):
            raise AssertionError("bucket, measurement and range must be set")
        expr = [self.expressions[k] for k in ("bucket", "range", "measurement")]
        if self.expressions.get("tags"):
            expr.append(self.expressions["tags"])
        if self.expressions.get("fields"):
            expr.append(self.expressions["fields"])
        if "drop" not in self.expressions and self.no_sys_cols:
            self.drop_sys_cols()
        if self.expressions.get("drop"):
            expr.append(self.expressions["drop"])
        if self._auto_pivot and "pivot" not in self.expressions:
            self.pivot()
        if self.expressions.get("pivot"):
            expr.append(self.expressions["pivot"])
        if self.expressions.get("group"):
            expr.append(self.expressions["group"])
        if self.expressions.get("sort"):
            expr.append(self.expressions["sort"])
        if self.expressions.get("limit"):
            expr.append(self.expressions["limit"])
        # influxdb默认按升序排列,但last_n查询的结果则必然是降序的,所以还需要再次排序
        if self._last_n:
            expr.append(
                "\n".join(
                    [
                        f' |> top(n: {self._last_n}, columns: ["_time"])',
                        ' |> sort(columns: ["_time"], desc: false)',
                    ]
                )
            )
        return "\n".join(expr)
    def bucket(self, bucket: str) -> "Flux":
        """add bucket to query expression
        Raises:
            DuplicateOperationError: 一个查询中只允许指定一个source,如果表达式中已经指定了bucket,则抛出异常
        Returns:
            Flux对象
        """
        if "bucket" in self.expressions:
            raise DuplicateOperationError("bucket has been set")
        self.expressions["bucket"] = f'from(bucket: "{bucket}")'
        return self
    def measurement(self, measurement: str) -> "Flux":
        """add measurement filter to query
        Raises:
            DuplicateOperationError: 一次查询中只允许指定一个measurement, 如果表达式中已经存在measurement, 则抛出异常
        Returns:
            Flux对象自身,以便进行管道操作
        """
        if "measurement" in self.expressions:
            raise DuplicateOperationError("measurement has been set")
        self.expressions[
            "measurement"
        ] = f'  |> filter(fn: (r) => r["_measurement"] == "{measurement}")'
        return self
    def range(
        self, start: Frame, end: Frame, right_close=True, precision="s"
    ) -> "Flux":
        """添加时间范围过滤
        必须指定的查询条件,否则influxdb会报unbound查询错,因为这种情况下,返回的数据量将非常大。
        在格式化时间时,需要根据`precision`生成时间字符串。在向Influxdb发送请求时,应该注意查询参数中指定的时间精度与这里使用的保持一致。
        Influxdb的查询结果默认不包含结束时间,当`right_close`指定为True时,我们将根据指定的精度修改`end`时间,使之仅比`end`多一个时间单位,从而保证查询结果会包含`end`。
        Raises:
            DuplicateOperationError: 一个查询中只允许指定一次时间范围,如果range表达式已经存在,则抛出异常
        Args:
            start: 开始时间
            end: 结束时间
            right_close: 查询结果是否包含结束时间。
            precision: 时间精度,默认为秒。
        Returns:
            Flux对象,以支持管道操作
        """
        if "range" in self.expressions:
            raise DuplicateOperationError("range has been set")
        if precision not in ["s", "ms", "us"]:
            raise AssertionError("precision must be 's', 'ms' or 'us'")
        end = self.format_time(end, precision, right_close)
        start = self.format_time(start, precision)
        self.expressions["range"] = f"  |> range(start: {start}, stop: {end})"
        return self
    def limit(self, limit: int) -> "Flux":
        """添加返回记录数限制
        Raises:
            DuplicateOperationError: 一个查询中只允许指定一次limit,如果limit表达式已经存在,则抛出异常
        Args:
            limit: 返回记录数限制
        Returns:
            Flux对象,以便进行管道操作
        """
        if "limit" in self.expressions:
            raise DuplicateOperationError("limit has been set")
        self.expressions["limit"] = "  |> limit(n: %d)" % limit
        return self
    @classmethod
    def to_timestamp(cls, tm: Frame, precision: str = "s") -> int:
        """将时间根据精度转换为unix时间戳
        在往influxdb写入数据时,line-protocol要求的时间戳为unix timestamp,并且与其精度对应。
        influxdb始终使用UTC时间,因此,`tm`也必须已经转换成UTC时间。
        Args:
            tm: 时间
            precision: 时间精度,默认为秒。
        Returns:
            时间戳
        """
        if precision not in ["s", "ms", "us"]:
            raise AssertionError("precision must be 's', 'ms' or 'us'")
        # get int repr of tm, in seconds unit
        if isinstance(tm, np.datetime64):
            tm = tm.astype("datetime64[s]").astype("int")
        elif isinstance(tm, datetime.datetime):
            tm = tm.timestamp()
        else:
            tm = arrow.get(tm).timestamp()
        return int(tm * 10 ** ({"s": 0, "ms": 3, "us": 6}[precision]))
    @classmethod
    def format_time(cls, tm: Frame, precision: str = "s", shift_forward=False) -> str:
        """将时间转换成客户端对应的精度,并以 RFC3339 timestamps格式串(即influxdb要求的格式)返回。
        如果这个时间是作为查询的range中的结束时间使用时,由于influx查询的时间范围是左闭右开的,因此如果你需要查询的是一个闭区间,则需要将`end`的时间向前偏移一个精度。通过传入`shift_forward = True`可以完成这种转换。
        Examples:
            >>> # by default, the precision is seconds, and convert a date
            >>> Flux.format_time(datetime.date(2019, 1, 1))
            '2019-01-01T00:00:00Z'
            >>> # set precision to ms, convert a time
            >>> Flux.format_time(datetime.datetime(1978, 7, 8, 12, 34, 56, 123456), precision="ms")
            '1978-07-08T12:34:56.123Z'
            >>> # convert and forward shift
            >>> Flux.format_time(datetime.date(1978, 7, 8), shift_forward = True)
            '1978-07-08T00:00:01Z'
        Args:
            tm : 待格式化的时间
            precision: 时间精度,可选值为:'s', 'ms', 'us'
            shift_forward: 如果为True,则将end向前偏移一个精度
        Returns:
            调整后符合influx时间规范的时间(字符串表示)
        """
        timespec = {"s": "seconds", "ms": "milliseconds", "us": "microseconds"}.get(
            precision
        )
        if timespec is None:
            raise ValueError(
                f"precision must be one of 's', 'ms', 'us', but got {precision}"
            )
        tm = arrow.get(tm).naive
        if shift_forward:
            tm = tm + datetime.timedelta(**{timespec: 1})
        return tm.isoformat(sep="T", timespec=timespec) + "Z"
    def tags(self, tags: DefaultDict[str, List[str]]) -> "Flux":
        """给查询添加tags过滤条件
        此查询条件为过滤条件,并非必须。如果查询中没有指定tags,则会返回所有记录。
        在实现上,既可以使用`contains`语法,也可以使用`or`语法(由于一条记录只能属于一个tag,所以,当指定多个tag进行查询时,它们之间的关系应该为`or`)。经验证,contains语法会始终先将所有符合条件的记录检索出来,再进行过滤。这样的效率比较低,特别是当tags的数量较少时,会远远比使用or语法慢。
        Raises:
            DuplicateOperationError: 一个查询中只允许执行一次,如果tag filter表达式已经存在,则抛出异常
        Args:
            tags : tags是一个{tagname: Union[str,[tag_values]]}对象。
        Examples:
            >>> flux = Flux()
            >>> flux.tags({"code": ["000001", "000002"], "name": ["浦发银行"]}).expressions["tags"]
            '  |> filter(fn: (r) => r["code"] == "000001" or r["code"] == "000002" or r["name"] == "浦发银行")'
        Returns:
            Flux对象,以便进行管道操作
        """
        if "tags" in self.expressions:
            raise DuplicateOperationError("tags has been set")
        filters = []
        for tag, values in tags.items():
            assert (
                isinstance(values, str) or len(values) > 0
            ), f"tag {tag} should not be empty or None"
            if isinstance(values, str):
                values = [values]
            for v in values:
                filters.append(f'r["{tag}"] == "{v}"')
        op_expression = " or ".join(filters)
        self.expressions["tags"] = f"  |> filter(fn: (r) => {op_expression})"
        return self
    def fields(self, fields: List, reserve_time_stamp: bool = True) -> "Flux":
        """给查询添加field过滤条件
        此查询条件为过滤条件,用以指定哪些field会出现在查询结果中,并非必须。如果查询中没有指定tags,则会返回所有记录。
        由于一条记录只能属于一个_field,所以,当指定多个_field进行查询时,它们之间的关系应该为`or`。
        Raises:
            DuplicateOperationError: 一个查询中只允许执行一次,如果filed filter表达式已经存在,则抛出异常
        Args:
            fields: 待查询的field列表
            reserve_time_stamp: 是否保留时间戳`_time`,默认为True
        Returns:
            Flux对象,以便进行管道操作
        """
        if "fields" in self.expressions:
            raise DuplicateOperationError("fields has been set")
        self._cols = fields.copy()
        if reserve_time_stamp and "_time" not in self._cols:
            self._cols.append("_time")
        self._cols = sorted(self._cols)
        filters = [f'r["_field"] == "{name}"' for name in self._cols]
        self.expressions["fields"] = f"  |> filter(fn: (r) => {' or '.join(filters)})"
        return self
    def pivot(
        self,
        row_keys: List[str] = ["_time"],
        column_keys=["_field"],
        value_column: str = "_value",
    ) -> "Flux":
        """pivot用来将以列为单位的数据转换为以行为单位的数据
        Flux查询返回的结果通常都是以列为单位的数据,增加本pivot条件后,结果将被转换成为以行为单位的数据再返回。
        这里实现的是measurement内的转换,请参考 [pivot](https://docs.influxdata.com/flux/v0.x/stdlib/universe/pivot/#align-fields-within-each-measurement-that-have-the-same-timestamp)
        Args:
            row_keys: 惟一确定输出中一行数据的列名字, 默认为["_time"]
            column_keys: 列名称列表,默认为["_field"]
            value_column: 值列名,默认为"_value"
        Returns:
            Flux对象,以便进行管道操作
        """
        if "pivot" in self.expressions:
            raise DuplicateOperationError("pivot has been set")
        columns = ",".join([f'"{name}"' for name in column_keys])
        rowkeys = ",".join([f'"{name}"' for name in row_keys])
        self.expressions[
            "pivot"
        ] = f'  |> pivot(columnKey: [{columns}], rowKey: [{rowkeys}], valueColumn: "{value_column}")'
        return self
    def sort(self, by: List[str] = None, desc: bool = False) -> "Flux":
        """按照指定的列进行排序
        根据[influxdb doc](https://docs.influxdata.com/influxdb/v2.0/query-data/flux/first-last/), 查询返回值默认地按时间排序。因此,如果仅仅是要求查询结果按时间排序,无须调用此API,但是,此API提供了按其它字段排序的能力。
        另外,在一个有5000多个tag,共返回1M条记录的测试中,测试验证返回记录确实按_time升序排列。
        Args:
            by: 指定排序的列名称列表
        Returns:
            Flux对象,以便进行管道操作
        """
        if "sort" in self.expressions:
            raise DuplicateOperationError("sort has been set")
        if by is None:
            by = ["_value"]
        if isinstance(by, str):
            by = [by]
        columns_ = ",".join([f'"{name}"' for name in by])
        desc = "true" if desc else "false"
        self.expressions["sort"] = f"  |> sort(columns: [{columns_}], desc: {desc})"
        return self
    def group(self, by: Tuple[str]) -> "Flux":
        """[summary]
        Returns:
            [description]
        """
        if "group" in self.expressions:
            raise DuplicateOperationError("group has been set")
        if isinstance(by, str):
            by = [by]
        cols = ",".join([f'"{col}"' for col in by])
        self.expressions["group"] = f"  |> group(columns: [{cols}])"
        return self
    def latest(self, n: int) -> "Flux":
        """获取最后n条数据,按时间增序返回
        Flux查询的增强功能,相当于top + sort + limit
        Args:
            n: 最后n条数据
        Returns:
            Flux对象,以便进行管道操作
        """
        assert "top" not in self.expressions, "top and last_n can not be used together"
        assert (
            "sort" not in self.expressions
        ), "sort and last_n can not be used together"
        assert (
            "limit" not in self.expressions
        ), "limit and last_n can not be used together"
        self._last_n = n
        return self
    @property
    def cols(self) -> List[str]:
        """the columns or the return records
        the implementation is buggy. Influx doesn't tell us in which order these columns are.
        Returns:
            the columns name of the return records
        """
        # fixme: if keep in expression, then return group key + tag key + value key
        # if keep not in expression, then stream, table, _time, ...
        return sorted(self._cols)
    def delete(
        self,
        measurement: str,
        stop: datetime.datetime,
        tags: dict = {},
        start: datetime.datetime = None,
        precision: str = "s",
    ) -> dict:
        """构建删除语句。
        according to [delete-predicate](https://docs.influxdata.com/influxdb/v2.1/reference/syntax/delete-predicate/), delete只支持AND逻辑操作,只支持“=”操作,不支持“!=”操作,可以使用任何字段或者tag,但不包括_time和_value字段。
        由于influxdb这一段文档不是很清楚,根据试验结果,目前仅支持按时间范围和tags进行删除较好。如果某个column的值类型是字符串,则也可以通过`tags`参数传入,匹配后删除。但如果传入了非字符串类型的column,则将得到无法预料的结果。
        Args:
            measurement : [description]
            stop : [description]
            tags : 按tags和匹配的值进行删除。传入的tags中,key为tag名称,value为tag要匹配的取值,可以为str或者List[str]。
            start : 起始时间。如果省略,则使用EPOCH_START.
            precision : 时间精度。可以为“s”,“ms”,“us”
        Returns:
            删除语句
        """
        timespec = {"s": "seconds", "ms": "milliseconds", "us": "microseconds"}.get(
            precision
        )
        if start is None:
            start = self.EPOCH_START.isoformat(timespec=timespec) + "Z"
        predicate = [f'_measurement="{measurement}"']
        for key, value in tags.items():
            if isinstance(value, list):
                predicate.extend([f'{key} = "{v}"' for v in value])
            else:
                predicate.append(f'{key} = "{value}"')
        command = {
            "start": start,
            "stop": f"{stop.isoformat(timespec=timespec)}Z",
            "predicate": " AND ".join(predicate),
        }
        return command
    def drop(self, cols: List[str]) -> "Flux":
        """use this to drop columns before return result
        Args:
            cols : the name of columns to be dropped
        Returns:
            Flux object, to support pipe operation
        """
        if "drop" in self.expressions:
            raise DuplicateOperationError("drop operation has been set already")
        # add surrounding quotes
        _cols = [f'"{c}"' for c in cols]
        self.expressions["drop"] = f"  |> drop(columns: [{','.join(_cols)}])"
        return self
    def drop_sys_cols(self, cols: List[str] = None) -> "Flux":
        """use this to drop ["_start", "_stop", "_measurement"], plus columns specified in `cols`, before return query result
        please be noticed, after drop sys columns, there's still two sys columns left, which is "_time" and "table", and "_time" should usually be kept, "table" is one we're not able to removed. If you don't like _time in return result, you can specify it in `cols` parameter.
        Args:
            cols : the extra columns to be dropped
        Returns:
            Flux query object
        """
        _cols = ["_start", "_stop", "_measurement"]
        if cols is not None:
            _cols.extend(cols)
        return self.drop(_cols)
cols: List[str]
  
      property
      readonly
  
¶
    the columns or the return records
the implementation is buggy. Influx doesn't tell us in which order these columns are.
Returns:
| Type | Description | 
|---|---|
| List[str] | the columns name of the return records | 
__init__(self, auto_pivot=True, no_sys_cols=True)
  
      special
  
¶
    初始化Flux对象
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| auto_pivot | 是否自动将查询列字段组装成行. Defaults to True. | True | |
| no_sys_cols | 是否自动将系统字段删除. Defaults to True.请参考drop_sys_cols | True | 
Source code in omicron/dal/influx/flux.py
          def __init__(self, auto_pivot=True, no_sys_cols=True):
    """初始化Flux对象
    Args:
        auto_pivot : 是否自动将查询列字段组装成行. Defaults to True.
        no_sys_cols: 是否自动将系统字段删除. Defaults to True.请参考[drop_sys_cols][omicron.dal.influx.flux.Flux.drop_sys_cols]
    """
    self._cols = None
    self.expressions = defaultdict(list)
    self._auto_pivot = auto_pivot
    self._last_n = None
    self.no_sys_cols = no_sys_cols
bucket(self, bucket)
¶
    add bucket to query expression
Exceptions:
| Type | Description | 
|---|---|
| DuplicateOperationError | 一个查询中只允许指定一个source,如果表达式中已经指定了bucket,则抛出异常 | 
Returns:
| Type | Description | 
|---|---|
| Flux | Flux对象 | 
Source code in omicron/dal/influx/flux.py
          def bucket(self, bucket: str) -> "Flux":
    """add bucket to query expression
    Raises:
        DuplicateOperationError: 一个查询中只允许指定一个source,如果表达式中已经指定了bucket,则抛出异常
    Returns:
        Flux对象
    """
    if "bucket" in self.expressions:
        raise DuplicateOperationError("bucket has been set")
    self.expressions["bucket"] = f'from(bucket: "{bucket}")'
    return self
delete(self, measurement, stop, tags={}, start=None, precision='s')
¶
    构建删除语句。
according to delete-predicate, delete只支持AND逻辑操作,只支持“=”操作,不支持“!=”操作,可以使用任何字段或者tag,但不包括_time和_value字段。
由于influxdb这一段文档不是很清楚,根据试验结果,目前仅支持按时间范围和tags进行删除较好。如果某个column的值类型是字符串,则也可以通过tags参数传入,匹配后删除。但如果传入了非字符串类型的column,则将得到无法预料的结果。
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| measurement | [description] | required | |
| stop | [description] | required | |
| tags | 按tags和匹配的值进行删除。传入的tags中,key为tag名称,value为tag要匹配的取值,可以为str或者List[str]。 | {} | |
| start | 起始时间。如果省略,则使用EPOCH_START. | None | |
| precision | 时间精度。可以为“s”,“ms”,“us” | 's' | 
Returns:
| Type | Description | 
|---|---|
| dict | 删除语句 | 
Source code in omicron/dal/influx/flux.py
          def delete(
    self,
    measurement: str,
    stop: datetime.datetime,
    tags: dict = {},
    start: datetime.datetime = None,
    precision: str = "s",
) -> dict:
    """构建删除语句。
    according to [delete-predicate](https://docs.influxdata.com/influxdb/v2.1/reference/syntax/delete-predicate/), delete只支持AND逻辑操作,只支持“=”操作,不支持“!=”操作,可以使用任何字段或者tag,但不包括_time和_value字段。
    由于influxdb这一段文档不是很清楚,根据试验结果,目前仅支持按时间范围和tags进行删除较好。如果某个column的值类型是字符串,则也可以通过`tags`参数传入,匹配后删除。但如果传入了非字符串类型的column,则将得到无法预料的结果。
    Args:
        measurement : [description]
        stop : [description]
        tags : 按tags和匹配的值进行删除。传入的tags中,key为tag名称,value为tag要匹配的取值,可以为str或者List[str]。
        start : 起始时间。如果省略,则使用EPOCH_START.
        precision : 时间精度。可以为“s”,“ms”,“us”
    Returns:
        删除语句
    """
    timespec = {"s": "seconds", "ms": "milliseconds", "us": "microseconds"}.get(
        precision
    )
    if start is None:
        start = self.EPOCH_START.isoformat(timespec=timespec) + "Z"
    predicate = [f'_measurement="{measurement}"']
    for key, value in tags.items():
        if isinstance(value, list):
            predicate.extend([f'{key} = "{v}"' for v in value])
        else:
            predicate.append(f'{key} = "{value}"')
    command = {
        "start": start,
        "stop": f"{stop.isoformat(timespec=timespec)}Z",
        "predicate": " AND ".join(predicate),
    }
    return command
drop(self, cols)
¶
    use this to drop columns before return result
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| cols | the name of columns to be dropped | required | 
Returns:
| Type | Description | 
|---|---|
| Flux | Flux object, to support pipe operation | 
Source code in omicron/dal/influx/flux.py
          def drop(self, cols: List[str]) -> "Flux":
    """use this to drop columns before return result
    Args:
        cols : the name of columns to be dropped
    Returns:
        Flux object, to support pipe operation
    """
    if "drop" in self.expressions:
        raise DuplicateOperationError("drop operation has been set already")
    # add surrounding quotes
    _cols = [f'"{c}"' for c in cols]
    self.expressions["drop"] = f"  |> drop(columns: [{','.join(_cols)}])"
    return self
drop_sys_cols(self, cols=None)
¶
    use this to drop ["_start", "_stop", "_measurement"], plus columns specified in cols, before return query result
please be noticed, after drop sys columns, there's still two sys columns left, which is "_time" and "table", and "_time" should usually be kept, "table" is one we're not able to removed. If you don't like _time in return result, you can specify it in cols parameter.
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| cols | the extra columns to be dropped | None | 
Returns:
| Type | Description | 
|---|---|
| Flux | Flux query object | 
Source code in omicron/dal/influx/flux.py
          def drop_sys_cols(self, cols: List[str] = None) -> "Flux":
    """use this to drop ["_start", "_stop", "_measurement"], plus columns specified in `cols`, before return query result
    please be noticed, after drop sys columns, there's still two sys columns left, which is "_time" and "table", and "_time" should usually be kept, "table" is one we're not able to removed. If you don't like _time in return result, you can specify it in `cols` parameter.
    Args:
        cols : the extra columns to be dropped
    Returns:
        Flux query object
    """
    _cols = ["_start", "_stop", "_measurement"]
    if cols is not None:
        _cols.extend(cols)
    return self.drop(_cols)
fields(self, fields, reserve_time_stamp=True)
¶
    给查询添加field过滤条件
此查询条件为过滤条件,用以指定哪些field会出现在查询结果中,并非必须。如果查询中没有指定tags,则会返回所有记录。
由于一条记录只能属于一个_field,所以,当指定多个_field进行查询时,它们之间的关系应该为or。
Exceptions:
| Type | Description | 
|---|---|
| DuplicateOperationError | 一个查询中只允许执行一次,如果filed filter表达式已经存在,则抛出异常 | 
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| fields | List | 待查询的field列表 | required | 
| reserve_time_stamp | bool | 是否保留时间戳 | True | 
Returns:
| Type | Description | 
|---|---|
| Flux | Flux对象,以便进行管道操作 | 
Source code in omicron/dal/influx/flux.py
          def fields(self, fields: List, reserve_time_stamp: bool = True) -> "Flux":
    """给查询添加field过滤条件
    此查询条件为过滤条件,用以指定哪些field会出现在查询结果中,并非必须。如果查询中没有指定tags,则会返回所有记录。
    由于一条记录只能属于一个_field,所以,当指定多个_field进行查询时,它们之间的关系应该为`or`。
    Raises:
        DuplicateOperationError: 一个查询中只允许执行一次,如果filed filter表达式已经存在,则抛出异常
    Args:
        fields: 待查询的field列表
        reserve_time_stamp: 是否保留时间戳`_time`,默认为True
    Returns:
        Flux对象,以便进行管道操作
    """
    if "fields" in self.expressions:
        raise DuplicateOperationError("fields has been set")
    self._cols = fields.copy()
    if reserve_time_stamp and "_time" not in self._cols:
        self._cols.append("_time")
    self._cols = sorted(self._cols)
    filters = [f'r["_field"] == "{name}"' for name in self._cols]
    self.expressions["fields"] = f"  |> filter(fn: (r) => {' or '.join(filters)})"
    return self
format_time(tm, precision='s', shift_forward=False)
  
      classmethod
  
¶
    将时间转换成客户端对应的精度,并以 RFC3339 timestamps格式串(即influxdb要求的格式)返回。
如果这个时间是作为查询的range中的结束时间使用时,由于influx查询的时间范围是左闭右开的,因此如果你需要查询的是一个闭区间,则需要将end的时间向前偏移一个精度。通过传入shift_forward = True可以完成这种转换。
Examples:
>>> # by default, the precision is seconds, and convert a date
>>> Flux.format_time(datetime.date(2019, 1, 1))
'2019-01-01T00:00:00Z'
>>> # set precision to ms, convert a time
>>> Flux.format_time(datetime.datetime(1978, 7, 8, 12, 34, 56, 123456), precision="ms")
'1978-07-08T12:34:56.123Z'
>>> # convert and forward shift
>>> Flux.format_time(datetime.date(1978, 7, 8), shift_forward = True)
'1978-07-08T00:00:01Z'
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| tm | 待格式化的时间 | required | |
| precision | str | 时间精度,可选值为:'s', 'ms', 'us' | 's' | 
| shift_forward | 如果为True,则将end向前偏移一个精度 | False | 
Returns:
| Type | Description | 
|---|---|
| str | 调整后符合influx时间规范的时间(字符串表示) | 
Source code in omicron/dal/influx/flux.py
          @classmethod
def format_time(cls, tm: Frame, precision: str = "s", shift_forward=False) -> str:
    """将时间转换成客户端对应的精度,并以 RFC3339 timestamps格式串(即influxdb要求的格式)返回。
    如果这个时间是作为查询的range中的结束时间使用时,由于influx查询的时间范围是左闭右开的,因此如果你需要查询的是一个闭区间,则需要将`end`的时间向前偏移一个精度。通过传入`shift_forward = True`可以完成这种转换。
    Examples:
        >>> # by default, the precision is seconds, and convert a date
        >>> Flux.format_time(datetime.date(2019, 1, 1))
        '2019-01-01T00:00:00Z'
        >>> # set precision to ms, convert a time
        >>> Flux.format_time(datetime.datetime(1978, 7, 8, 12, 34, 56, 123456), precision="ms")
        '1978-07-08T12:34:56.123Z'
        >>> # convert and forward shift
        >>> Flux.format_time(datetime.date(1978, 7, 8), shift_forward = True)
        '1978-07-08T00:00:01Z'
    Args:
        tm : 待格式化的时间
        precision: 时间精度,可选值为:'s', 'ms', 'us'
        shift_forward: 如果为True,则将end向前偏移一个精度
    Returns:
        调整后符合influx时间规范的时间(字符串表示)
    """
    timespec = {"s": "seconds", "ms": "milliseconds", "us": "microseconds"}.get(
        precision
    )
    if timespec is None:
        raise ValueError(
            f"precision must be one of 's', 'ms', 'us', but got {precision}"
        )
    tm = arrow.get(tm).naive
    if shift_forward:
        tm = tm + datetime.timedelta(**{timespec: 1})
    return tm.isoformat(sep="T", timespec=timespec) + "Z"
group(self, by)
¶
    [summary]
Returns:
| Type | Description | 
|---|---|
| Flux | [description] | 
Source code in omicron/dal/influx/flux.py
          def group(self, by: Tuple[str]) -> "Flux":
    """[summary]
    Returns:
        [description]
    """
    if "group" in self.expressions:
        raise DuplicateOperationError("group has been set")
    if isinstance(by, str):
        by = [by]
    cols = ",".join([f'"{col}"' for col in by])
    self.expressions["group"] = f"  |> group(columns: [{cols}])"
    return self
latest(self, n)
¶
    获取最后n条数据,按时间增序返回
Flux查询的增强功能,相当于top + sort + limit
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| n | int | 最后n条数据 | required | 
Returns:
| Type | Description | 
|---|---|
| Flux | Flux对象,以便进行管道操作 | 
Source code in omicron/dal/influx/flux.py
          def latest(self, n: int) -> "Flux":
    """获取最后n条数据,按时间增序返回
    Flux查询的增强功能,相当于top + sort + limit
    Args:
        n: 最后n条数据
    Returns:
        Flux对象,以便进行管道操作
    """
    assert "top" not in self.expressions, "top and last_n can not be used together"
    assert (
        "sort" not in self.expressions
    ), "sort and last_n can not be used together"
    assert (
        "limit" not in self.expressions
    ), "limit and last_n can not be used together"
    self._last_n = n
    return self
limit(self, limit)
¶
    添加返回记录数限制
Exceptions:
| Type | Description | 
|---|---|
| DuplicateOperationError | 一个查询中只允许指定一次limit,如果limit表达式已经存在,则抛出异常 | 
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| limit | int | 返回记录数限制 | required | 
Returns:
| Type | Description | 
|---|---|
| Flux | Flux对象,以便进行管道操作 | 
Source code in omicron/dal/influx/flux.py
          def limit(self, limit: int) -> "Flux":
    """添加返回记录数限制
    Raises:
        DuplicateOperationError: 一个查询中只允许指定一次limit,如果limit表达式已经存在,则抛出异常
    Args:
        limit: 返回记录数限制
    Returns:
        Flux对象,以便进行管道操作
    """
    if "limit" in self.expressions:
        raise DuplicateOperationError("limit has been set")
    self.expressions["limit"] = "  |> limit(n: %d)" % limit
    return self
measurement(self, measurement)
¶
    add measurement filter to query
Exceptions:
| Type | Description | 
|---|---|
| DuplicateOperationError | 一次查询中只允许指定一个measurement, 如果表达式中已经存在measurement, 则抛出异常 | 
Returns:
| Type | Description | 
|---|---|
| Flux | Flux对象自身,以便进行管道操作 | 
Source code in omicron/dal/influx/flux.py
          def measurement(self, measurement: str) -> "Flux":
    """add measurement filter to query
    Raises:
        DuplicateOperationError: 一次查询中只允许指定一个measurement, 如果表达式中已经存在measurement, 则抛出异常
    Returns:
        Flux对象自身,以便进行管道操作
    """
    if "measurement" in self.expressions:
        raise DuplicateOperationError("measurement has been set")
    self.expressions[
        "measurement"
    ] = f'  |> filter(fn: (r) => r["_measurement"] == "{measurement}")'
    return self
pivot(self, row_keys=['_time'], column_keys=['_field'], value_column='_value')
¶
    pivot用来将以列为单位的数据转换为以行为单位的数据
Flux查询返回的结果通常都是以列为单位的数据,增加本pivot条件后,结果将被转换成为以行为单位的数据再返回。
这里实现的是measurement内的转换,请参考 pivot
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| row_keys | List[str] | 惟一确定输出中一行数据的列名字, 默认为["_time"] | ['_time'] | 
| column_keys | 列名称列表,默认为["_field"] | ['_field'] | |
| value_column | str | 值列名,默认为"_value" | '_value' | 
Returns:
| Type | Description | 
|---|---|
| Flux | Flux对象,以便进行管道操作 | 
Source code in omicron/dal/influx/flux.py
          def pivot(
    self,
    row_keys: List[str] = ["_time"],
    column_keys=["_field"],
    value_column: str = "_value",
) -> "Flux":
    """pivot用来将以列为单位的数据转换为以行为单位的数据
    Flux查询返回的结果通常都是以列为单位的数据,增加本pivot条件后,结果将被转换成为以行为单位的数据再返回。
    这里实现的是measurement内的转换,请参考 [pivot](https://docs.influxdata.com/flux/v0.x/stdlib/universe/pivot/#align-fields-within-each-measurement-that-have-the-same-timestamp)
    Args:
        row_keys: 惟一确定输出中一行数据的列名字, 默认为["_time"]
        column_keys: 列名称列表,默认为["_field"]
        value_column: 值列名,默认为"_value"
    Returns:
        Flux对象,以便进行管道操作
    """
    if "pivot" in self.expressions:
        raise DuplicateOperationError("pivot has been set")
    columns = ",".join([f'"{name}"' for name in column_keys])
    rowkeys = ",".join([f'"{name}"' for name in row_keys])
    self.expressions[
        "pivot"
    ] = f'  |> pivot(columnKey: [{columns}], rowKey: [{rowkeys}], valueColumn: "{value_column}")'
    return self
range(self, start, end, right_close=True, precision='s')
¶
    添加时间范围过滤
必须指定的查询条件,否则influxdb会报unbound查询错,因为这种情况下,返回的数据量将非常大。
在格式化时间时,需要根据precision生成时间字符串。在向Influxdb发送请求时,应该注意查询参数中指定的时间精度与这里使用的保持一致。
Influxdb的查询结果默认不包含结束时间,当right_close指定为True时,我们将根据指定的精度修改end时间,使之仅比end多一个时间单位,从而保证查询结果会包含end。
Exceptions:
| Type | Description | 
|---|---|
| DuplicateOperationError | 一个查询中只允许指定一次时间范围,如果range表达式已经存在,则抛出异常 | 
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| start | Union[datetime.date, datetime.datetime] | 开始时间 | required | 
| end | Union[datetime.date, datetime.datetime] | 结束时间 | required | 
| right_close | 查询结果是否包含结束时间。 | True | |
| precision | 时间精度,默认为秒。 | 's' | 
Returns:
| Type | Description | 
|---|---|
| Flux | Flux对象,以支持管道操作 | 
Source code in omicron/dal/influx/flux.py
          def range(
    self, start: Frame, end: Frame, right_close=True, precision="s"
) -> "Flux":
    """添加时间范围过滤
    必须指定的查询条件,否则influxdb会报unbound查询错,因为这种情况下,返回的数据量将非常大。
    在格式化时间时,需要根据`precision`生成时间字符串。在向Influxdb发送请求时,应该注意查询参数中指定的时间精度与这里使用的保持一致。
    Influxdb的查询结果默认不包含结束时间,当`right_close`指定为True时,我们将根据指定的精度修改`end`时间,使之仅比`end`多一个时间单位,从而保证查询结果会包含`end`。
    Raises:
        DuplicateOperationError: 一个查询中只允许指定一次时间范围,如果range表达式已经存在,则抛出异常
    Args:
        start: 开始时间
        end: 结束时间
        right_close: 查询结果是否包含结束时间。
        precision: 时间精度,默认为秒。
    Returns:
        Flux对象,以支持管道操作
    """
    if "range" in self.expressions:
        raise DuplicateOperationError("range has been set")
    if precision not in ["s", "ms", "us"]:
        raise AssertionError("precision must be 's', 'ms' or 'us'")
    end = self.format_time(end, precision, right_close)
    start = self.format_time(start, precision)
    self.expressions["range"] = f"  |> range(start: {start}, stop: {end})"
    return self
sort(self, by=None, desc=False)
¶
    按照指定的列进行排序
根据influxdb doc, 查询返回值默认地按时间排序。因此,如果仅仅是要求查询结果按时间排序,无须调用此API,但是,此API提供了按其它字段排序的能力。
另外,在一个有5000多个tag,共返回1M条记录的测试中,测试验证返回记录确实按_time升序排列。
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| by | List[str] | 指定排序的列名称列表 | None | 
Returns:
| Type | Description | 
|---|---|
| Flux | Flux对象,以便进行管道操作 | 
Source code in omicron/dal/influx/flux.py
          def sort(self, by: List[str] = None, desc: bool = False) -> "Flux":
    """按照指定的列进行排序
    根据[influxdb doc](https://docs.influxdata.com/influxdb/v2.0/query-data/flux/first-last/), 查询返回值默认地按时间排序。因此,如果仅仅是要求查询结果按时间排序,无须调用此API,但是,此API提供了按其它字段排序的能力。
    另外,在一个有5000多个tag,共返回1M条记录的测试中,测试验证返回记录确实按_time升序排列。
    Args:
        by: 指定排序的列名称列表
    Returns:
        Flux对象,以便进行管道操作
    """
    if "sort" in self.expressions:
        raise DuplicateOperationError("sort has been set")
    if by is None:
        by = ["_value"]
    if isinstance(by, str):
        by = [by]
    columns_ = ",".join([f'"{name}"' for name in by])
    desc = "true" if desc else "false"
    self.expressions["sort"] = f"  |> sort(columns: [{columns_}], desc: {desc})"
    return self
tags(self, tags)
¶
    给查询添加tags过滤条件
此查询条件为过滤条件,并非必须。如果查询中没有指定tags,则会返回所有记录。
在实现上,既可以使用contains语法,也可以使用or语法(由于一条记录只能属于一个tag,所以,当指定多个tag进行查询时,它们之间的关系应该为or)。经验证,contains语法会始终先将所有符合条件的记录检索出来,再进行过滤。这样的效率比较低,特别是当tags的数量较少时,会远远比使用or语法慢。
Exceptions:
| Type | Description | 
|---|---|
| DuplicateOperationError | 一个查询中只允许执行一次,如果tag filter表达式已经存在,则抛出异常 | 
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| tags | tags是一个{tagname: Union[str,[tag_values]]}对象。 | required | 
Examples:
>>> flux = Flux()
>>> flux.tags({"code": ["000001", "000002"], "name": ["浦发银行"]}).expressions["tags"]
'  |> filter(fn: (r) => r["code"] == "000001" or r["code"] == "000002" or r["name"] == "浦发银行")'
Returns:
| Type | Description | 
|---|---|
| Flux | Flux对象,以便进行管道操作 | 
Source code in omicron/dal/influx/flux.py
          def tags(self, tags: DefaultDict[str, List[str]]) -> "Flux":
    """给查询添加tags过滤条件
    此查询条件为过滤条件,并非必须。如果查询中没有指定tags,则会返回所有记录。
    在实现上,既可以使用`contains`语法,也可以使用`or`语法(由于一条记录只能属于一个tag,所以,当指定多个tag进行查询时,它们之间的关系应该为`or`)。经验证,contains语法会始终先将所有符合条件的记录检索出来,再进行过滤。这样的效率比较低,特别是当tags的数量较少时,会远远比使用or语法慢。
    Raises:
        DuplicateOperationError: 一个查询中只允许执行一次,如果tag filter表达式已经存在,则抛出异常
    Args:
        tags : tags是一个{tagname: Union[str,[tag_values]]}对象。
    Examples:
        >>> flux = Flux()
        >>> flux.tags({"code": ["000001", "000002"], "name": ["浦发银行"]}).expressions["tags"]
        '  |> filter(fn: (r) => r["code"] == "000001" or r["code"] == "000002" or r["name"] == "浦发银行")'
    Returns:
        Flux对象,以便进行管道操作
    """
    if "tags" in self.expressions:
        raise DuplicateOperationError("tags has been set")
    filters = []
    for tag, values in tags.items():
        assert (
            isinstance(values, str) or len(values) > 0
        ), f"tag {tag} should not be empty or None"
        if isinstance(values, str):
            values = [values]
        for v in values:
            filters.append(f'r["{tag}"] == "{v}"')
    op_expression = " or ".join(filters)
    self.expressions["tags"] = f"  |> filter(fn: (r) => {op_expression})"
    return self
to_timestamp(tm, precision='s')
  
      classmethod
  
¶
    将时间根据精度转换为unix时间戳
在往influxdb写入数据时,line-protocol要求的时间戳为unix timestamp,并且与其精度对应。
influxdb始终使用UTC时间,因此,tm也必须已经转换成UTC时间。
Parameters:
| Name | Type | Description | Default | 
|---|---|---|---|
| tm | Union[datetime.date, datetime.datetime] | 时间 | required | 
| precision | str | 时间精度,默认为秒。 | 's' | 
Returns:
| Type | Description | 
|---|---|
| int | 时间戳 | 
Source code in omicron/dal/influx/flux.py
          @classmethod
def to_timestamp(cls, tm: Frame, precision: str = "s") -> int:
    """将时间根据精度转换为unix时间戳
    在往influxdb写入数据时,line-protocol要求的时间戳为unix timestamp,并且与其精度对应。
    influxdb始终使用UTC时间,因此,`tm`也必须已经转换成UTC时间。
    Args:
        tm: 时间
        precision: 时间精度,默认为秒。
    Returns:
        时间戳
    """
    if precision not in ["s", "ms", "us"]:
        raise AssertionError("precision must be 's', 'ms' or 'us'")
    # get int repr of tm, in seconds unit
    if isinstance(tm, np.datetime64):
        tm = tm.astype("datetime64[s]").astype("int")
    elif isinstance(tm, datetime.datetime):
        tm = tm.timestamp()
    else:
        tm = arrow.get(tm).timestamp()
    return int(tm * 10 ** ({"s": 0, "ms": 3, "us": 6}[precision]))