Skip to content

InfluxClient

InfluxClient - the performant async client for influxdb

Source code in omicron/dal/influx/influxclient.py
class InfluxClient:
    """A performant async client for InfluxDB 2.x, speaking the v2 HTTP API.

    The client is bound to one bucket/org pair at construction time and
    offers write (line protocol), flux query, delete, and a few schema
    management helpers (buckets/organizations).
    """

    def __init__(
        self,
        url: str,
        token: str,
        bucket: str,
        org: str = None,
        enable_compress=False,
        chunk_size: int = 5000,
        precision: str = "s",
    ):
        """Create a client bound to `bucket` under `org` at `url`.

        Args:
            url: base url of the influxdb server, e.g. "http://localhost:8086"
            token: the API token used in the `Authorization` header
            bucket: name of the bucket this client reads/writes
            org: name of the organization. Defaults to None.
            enable_compress: if True, gzip-compress write bodies and accept
                gzip-encoded query responses. Defaults to False.
            chunk_size: number of lines to be saved in one request
            precision: timestamp precision, one of "s", "ms" or "us"

        Raises:
            ValueError: if `precision` is not one of "s", "ms", "us"
        """
        self._url = url
        self._bucket = bucket
        self._enable_compress = enable_compress
        self._org = org
        self._org_id = None  # fetched lazily when needed, never refreshed afterwards
        self._token = token

        # influxdb 2.0+ supports the precisions ns, us, ms and s; this client
        # only supports s, ms and us
        self._precision = precision.lower()
        if self._precision not in ["s", "ms", "us"]:  # pragma: no cover
            raise ValueError("precision must be one of ['s', 'ms', 'us']")

        self._chunk_size = chunk_size

        # write endpoint and headers, precomputed once
        self._write_url = f"{self._url}/api/v2/write?org={self._org}&bucket={self._bucket}&precision={self._precision}"

        self._write_headers = {
            "Content-Type": "text/plain; charset=utf-8",
            "Authorization": f"Token {token}",
            "Accept": "application/json",
        }

        if self._enable_compress:
            self._write_headers["Content-Encoding"] = "gzip"

        self._query_url = f"{self._url}/api/v2/query?org={self._org}"
        self._query_headers = {
            "Authorization": f"Token {token}",
            "Content-Type": "application/vnd.flux",
            # regardless of what Accept we request (or none at all), influxdb
            # 2.1 always returns query results as csv
            "Accept": "text/csv",
        }

        if self._enable_compress:
            self._query_headers["Accept-Encoding"] = "gzip"

        self._delete_url = (
            f"{self._url}/api/v2/delete?org={self._org}&bucket={self._bucket}"
        )
        self._delete_headers = {
            "Authorization": f"Token {token}",
            "Content-Type": "application/json",
        }

    async def save(
        self,
        data: Union[np.ndarray, DataFrame],
        measurement: str = None,
        tag_keys: List[str] = [],
        time_key: str = None,
        global_tags: Dict = {},
        chunk_size: int = None,
    ) -> None:
        """save `data` into influxdb

        if `data` is a pandas.DataFrame or numpy structured array, it will be converted to line protocol and saved. If `data` is str, use `write` method instead.

        Args:
            data: data to be saved
            measurement: the name of measurement
            tag_keys: which columns name will be used as tags
            time_key: name of the column that holds the timestamp; required
                when `data` is a numpy array
            global_tags: tags applied to every line in addition to `tag_keys`
            chunk_size: number of lines to be saved in one request. if it's -1, then all data will be written in one request. If it's None, then it will be set to `self._chunk_size`

        Raises:
            InfluxDBWriteError: if write failed
            TypeError: if `data` is neither a DataFrame nor a numpy array

        """
        # todo: add more errors raise
        if isinstance(data, DataFrame):
            assert (
                measurement is not None
            ), "measurement must be specified when data is a DataFrame"

            if tag_keys:
                # bugfix: the original used `set(tag_keys) in set(...)`, which
                # asks whether the *set object* is an element of the column
                # set — always False for non-empty tag_keys. We need subset
                # containment instead.
                assert set(tag_keys).issubset(
                    set(data.columns.tolist())
                ), "tag_keys must be in data.columns"

            serializer = DataframeSerializer(
                data,
                measurement,
                time_key,
                tag_keys,
                global_tags,
                precision=self._precision,
            )
            if chunk_size == -1:
                chunk_size = len(data)

            for lines in serializer.serialize(chunk_size or self._chunk_size):
                await self.write(lines)
        elif isinstance(data, np.ndarray):
            assert (
                measurement is not None
            ), "measurement must be specified when data is a numpy array"
            assert (
                time_key is not None
            ), "time_key must be specified when data is a numpy array"
            serializer = NumpySerializer(
                data,
                measurement,
                time_key,
                tag_keys,
                global_tags,
                time_precision=self._precision,
            )
            if chunk_size == -1:
                chunk_size = len(data)
            for lines in serializer.serialize(chunk_size or self._chunk_size):
                await self.write(lines)
        else:
            raise TypeError(
                f"data must be pandas.DataFrame, numpy array, got {type(data)}"
            )

    async def write(self, line_protocol: str):
        """Write a batch of line-protocol records to influxdb.

        Args:
            line_protocol: the data to write, as line-protocol text

        Raises:
            InfluxDBWriteError: if the server responds with anything other
                than 204 No Content
        """
        # todo: add raise error declaration
        if self._enable_compress:
            line_protocol_ = gzip.compress(line_protocol.encode("utf-8"))
        else:
            line_protocol_ = line_protocol

        async with ClientSession() as session:
            async with session.post(
                self._write_url, data=line_protocol_, headers=self._write_headers
            ) as resp:
                if resp.status != 204:
                    err = await resp.json()
                    # bugfix: the original wrapped the slice in {...}, which
                    # built a one-element *set* and logged its repr
                    logger.warning(
                        "influxdb write error when processing: %s, err code: %s, message: %s",
                        line_protocol[:100],
                        err["code"],
                        err["message"],
                    )
                    logger.debug("data caused error:%s", line_protocol)
                    raise InfluxDBWriteError(
                        f"influxdb write failed, err: {err['message']}"
                    )

    async def query(self, flux: Union[Flux, str], deserializer: Callable = None) -> Any:
        """Run a flux query.

        The flux query result is annotated-csv data, for example:
        ```
        ,result,table,_time,code,amount,close,factor,high,low,open,volume
        ,_result,0,2019-01-01T00:00:00Z,000001.XSHE,100000000,5.15,1.23,5.2,5,5.1,1000000
        ```

        In the `result` above, the returned fields were restricted via
        Flux.keep() to _time,code,amount,close,factor,high,low,open,volume.
        influxdb always returns fields sorted by name in ascending order, and
        always adds the extra _result and table columns.

        If `deserializer` is given, it is called to turn the response into a
        python object; otherwise raw bytes are returned.

        Args:
            flux: the flux query statement
            deserializer: deserialization callable

        Returns:
            If no deserializer is provided, the result is a bytes array (when
            compress=True it is the gzip-decompressed bytes array); otherwise
            the deserialized python object.

        Raises:
            InfluxDBQueryError: if the server responds with a non-200 status
        """
        if isinstance(flux, Flux):
            flux = str(flux)

        async with ClientSession() as session:
            async with session.post(
                self._query_url, data=flux, headers=self._query_headers
            ) as resp:
                if resp.status != 200:
                    err = await resp.json()
                    logger.warning(
                        f"influxdb query error: {err} when processing {flux[:500]}"
                    )
                    logger.debug("data caused error:%s", flux)
                    raise InfluxDBQueryError(
                        f"influxdb query failed, status code: {err['message']}"
                    )
                else:
                    # aiohttp transparently decompresses gzip responses
                    body = await resp.read()
                    if deserializer:
                        try:
                            return deserializer(body)
                        except Exception as e:
                            logger.exception(e)
                            logger.warning(
                                "failed to deserialize data: %s, the query is:%s",
                                body,
                                flux[:500],
                            )
                            raise
                    else:
                        return body

    async def drop_measurement(self, measurement: str):
        """Delete all data of a measurement from influxdb.

        After this call the measurement itself still exists, it just holds no
        data (influxdb deletes records, not schema).
        """
        # todo: add raise error declaration
        await self.delete(measurement, arrow.now().naive)

    async def delete(
        self,
        measurement: str,
        stop: datetime.datetime,
        tags: Optional[Dict[str, str]] = {},
        start: datetime.datetime = None,
        precision: str = "s",
    ):
        """Delete data within a time range from influxdb.

        For the parameters, see
        [Flux.delete][omicron.dal.influx.flux.Flux.delete].

        Args:
            measurement: name of the measurement
            stop: end time of the records to delete
            start: start time of the records to delete; if not given,
                EPOCH_START is used
            tags: tag-based filter conditions
            precision: used to format the start and stop timestamps

        Raises:
            InfluxDeleteError: raised if the delete fails
        """
        # todo: add raise error declaration
        command = Flux().delete(
            measurement, stop, tags, start=start, precision=precision
        )

        async with ClientSession() as session:
            async with session.post(
                self._delete_url, data=json.dumps(command), headers=self._delete_headers
            ) as resp:
                if resp.status != 204:
                    err = await resp.json()
                    logger.warning(
                        "influxdb delete error: %s when processing command %s",
                        err["message"],
                        command,
                    )
                    raise InfluxDeleteError(
                        f"influxdb delete failed, status code: {err['message']}"
                    )

    async def list_buckets(self) -> List[Dict]:
        """List all buckets visible to this client's token.

        Returns:
            list of buckets, each bucket is a dict with keys:
            ```
            id
            orgID, a 16 bytes hex string
            type, system or user
            description
            name
            retentionRules
            createdAt
            updatedAt
            links
            labels
        ```

        Raises:
            InfluxSchemaError: if the server responds with a non-200 status
        """
        url = f"{self._url}/api/v2/buckets"
        headers = {"Authorization": f"Token {self._token}"}
        async with ClientSession() as session:
            async with session.get(url, headers=headers) as resp:
                if resp.status != 200:
                    err = await resp.json()
                    raise InfluxSchemaError(
                        f"influxdb list bucket failed, status code: {err['message']}"
                    )
                else:
                    return (await resp.json())["buckets"]

    async def delete_bucket(self, bucket_id: str = None):
        """Delete the specified bucket from influxdb.

        Args:
            bucket_id: id of the bucket. If None, the bucket this client was
                constructed with is deleted.

        Raises:
            BadParameterError: if `bucket_id` is None and no user bucket with
                this client's bucket name can be found
            InfluxSchemaError: if the server responds with a non-204 status
        """
        if bucket_id is None:
            buckets = await self.list_buckets()
            for bucket in buckets:
                if bucket["type"] == "user" and bucket["name"] == self._bucket:
                    bucket_id = bucket["id"]
                    break
            else:
                # for/else: only reached when no matching bucket was found
                raise BadParameterError(
                    "bucket_id is None, and we can't find bucket with name: %s"
                    % self._bucket
                )

        url = f"{self._url}/api/v2/buckets/{bucket_id}"
        headers = {"Authorization": f"Token {self._token}"}
        async with ClientSession() as session:
            async with session.delete(url, headers=headers) as resp:
                if resp.status != 204:
                    err = await resp.json()
                    logger.warning(
                        "influxdb delete bucket error: %s when processing command %s",
                        err["message"],
                        bucket_id,
                    )
                    raise InfluxSchemaError(
                        f"influxdb delete bucket failed, status code: {err['message']}"
                    )

    async def create_bucket(
        self, description=None, retention_rules: List[Dict] = None, org_id: str = None
    ) -> str:
        """Create this client's bucket in influxdb.

        Args:
            description: description of the bucket
            retention_rules: retention rules for the bucket, as a list of
                dicts per the influxdb v2 API
            org_id: id of the organization that owns the bucket; if not
                given, the organization id of this client is used.

        Raises:
            InfluxSchemaError: raised when influxdb returns an error, for
                example when creating a bucket that already exists
        Returns:
            the id of the newly created bucket
        """
        if org_id is None:
            org_id = await self.query_org_id()

        url = f"{self._url}/api/v2/buckets"
        headers = {"Authorization": f"Token {self._token}"}
        data = {
            "name": self._bucket,
            "orgID": org_id,
            "description": description,
            "retentionRules": retention_rules,
        }
        async with ClientSession() as session:
            async with session.post(
                url, data=json.dumps(data), headers=headers
            ) as resp:
                if resp.status != 201:
                    err = await resp.json()
                    logger.warning(
                        "influxdb create bucket error: %s when processing command %s",
                        err["message"],
                        data,
                    )
                    raise InfluxSchemaError(
                        f"influxdb create bucket failed, status code: {err['message']}"
                    )
                else:
                    result = await resp.json()
                    return result["id"]

    async def list_organizations(self, offset: int = 0, limit: int = 100) -> List[Dict]:
        """List all organizations this client is allowed to query.

        Args:
            offset : pagination start
            limit : page size

        Raises:
            InfluxSchemaError: the error returned by influxdb

        Returns:
            list of organizations, each organization is a dict with keys:
            ```
            id      : the id of the org
            links
            name    : the name of the org
            description
            createdAt
            updatedAt
            ```
        """
        url = f"{self._url}/api/v2/orgs?offset={offset}&limit={limit}"
        headers = {"Authorization": f"Token {self._token}"}

        async with ClientSession() as session:
            async with session.get(url, headers=headers) as resp:
                if resp.status != 200:
                    err = await resp.json()
                    logger.warning("influxdb query orgs err: %s", err["message"])
                    raise InfluxSchemaError(
                        f"influxdb query orgs failed, status code: {err['message']}"
                    )
                else:
                    return (await resp.json())["orgs"]

    async def query_org_id(self, name: str = None) -> str:
        """Look up an organization id by name.

        Only organizations visible to this client can be found. If `name` is
        not given, the organization name this client was constructed with is
        used.

        Args:
            name: the organization name

        Returns:
            the organization id

        Raises:
            BadParameterError: if no organization with `name` is visible
        """
        if name is None:
            name = self._org
        orgs = await self.list_organizations()
        for org in orgs:
            if org["name"] == name:
                return org["id"]

        raise BadParameterError(f"can't find org with name: {name}")

__init__(self, url, token, bucket, org=None, enable_compress=False, chunk_size=5000, precision='s') special

[summary]

Parameters:

Name Type Description Default
url [type]

[description]

required
token [type]

[description]

required
bucket [type]

[description]

required
org [type]

[description]. Defaults to None.

None
enable_compress [type]

[description]. Defaults to False.

False
chunk_size int

number of lines to be saved in one request

5000
precision str

支持的时间精度

's'
Source code in omicron/dal/influx/influxclient.py
def __init__(
    self,
    url: str,
    token: str,
    bucket: str,
    org: str = None,
    enable_compress=False,
    chunk_size: int = 5000,
    precision: str = "s",
):
    """[summary]

    Args:
        url ([type]): [description]
        token ([type]): [description]
        bucket ([type]): [description]
        org ([type], optional): [description]. Defaults to None.
        enable_compress ([type], optional): [description]. Defaults to False.
        chunk_size: number of lines to be saved in one request
        precision: 支持的时间精度
    """
    self._url = url
    self._bucket = bucket
    self._enable_compress = enable_compress
    self._org = org
    self._org_id = None  # 需要时通过查询获取,此后不再更新
    self._token = token

    # influxdb 2.0起支持的时间精度有:ns, us, ms, s。本客户端只支持s, ms和us
    self._precision = precision.lower()
    if self._precision not in ["s", "ms", "us"]:  # pragma: no cover
        raise ValueError("precision must be one of ['s', 'ms', 'us']")

    self._chunk_size = chunk_size

    # write
    self._write_url = f"{self._url}/api/v2/write?org={self._org}&bucket={self._bucket}&precision={self._precision}"

    self._write_headers = {
        "Content-Type": "text/plain; charset=utf-8",
        "Authorization": f"Token {token}",
        "Accept": "application/json",
    }

    if self._enable_compress:
        self._write_headers["Content-Encoding"] = "gzip"

    self._query_url = f"{self._url}/api/v2/query?org={self._org}"
    self._query_headers = {
        "Authorization": f"Token {token}",
        "Content-Type": "application/vnd.flux",
        # influx查询结果格式,无论如何指定(或者不指定),在2.1中始终是csv格式
        "Accept": "text/csv",
    }

    if self._enable_compress:
        self._query_headers["Accept-Encoding"] = "gzip"

    self._delete_url = (
        f"{self._url}/api/v2/delete?org={self._org}&bucket={self._bucket}"
    )
    self._delete_headers = {
        "Authorization": f"Token {token}",
        "Content-Type": "application/json",
    }

create_bucket(self, description=None, retention_rules=None, org_id=None) async

创建influxdb中指定bucket

Parameters:

Name Type Description Default
description

指定bucket的描述

None
org_id str

指定bucket所属的组织id,如果未指定,则使用本client对应的组织id。

None

Exceptions:

Type Description
InfluxSchemaError

当influxdb返回错误时,比如重复创建bucket等,会抛出此异常

Returns:

Type Description
str

新创建的bucket的id

Source code in omicron/dal/influx/influxclient.py
async def create_bucket(
    self, description=None, retention_rules: List[Dict] = None, org_id: str = None
) -> str:
    """创建influxdb中指定bucket

    Args:
        description: 指定bucket的描述
        org_id: 指定bucket所属的组织id,如果未指定,则使用本client对应的组织id。

    Raises:
        InfluxSchemaError: 当influxdb返回错误时,比如重复创建bucket等,会抛出此异常
    Returns:
        新创建的bucket的id
    """
    if org_id is None:
        org_id = await self.query_org_id()

    url = f"{self._url}/api/v2/buckets"
    headers = {"Authorization": f"Token {self._token}"}
    data = {
        "name": self._bucket,
        "orgID": org_id,
        "description": description,
        "retentionRules": retention_rules,
    }
    async with ClientSession() as session:
        async with session.post(
            url, data=json.dumps(data), headers=headers
        ) as resp:
            if resp.status != 201:
                err = await resp.json()
                logger.warning(
                    "influxdb create bucket error: %s when processin command %s",
                    err["message"],
                    data,
                )
                raise InfluxSchemaError(
                    f"influxdb create bucket failed, status code: {err['message']}"
                )
            else:
                result = await resp.json()
                return result["id"]

delete(self, measurement, stop, tags={}, start=None, precision='s') async

删除influxdb中指定时间段内的数据

关于参数,请参见Flux.delete

Parameters:

Name Type Description Default
measurement str

指定measurement名字

required
stop datetime

待删除记录的结束时间

required
start datetime

待删除记录的开始时间,如果未指定,则使用EPOCH_START

None
tags Optional[Dict[str, str]]

按tag进行过滤的条件

{}
precision str

用以格式化起始和结束时间。

's'

Exceptions:

Type Description
InfluxDeleteError

如果删除失败,则抛出此异常

Source code in omicron/dal/influx/influxclient.py
async def delete(
    self,
    measurement: str,
    stop: datetime.datetime,
    tags: Optional[Dict[str, str]] = {},
    start: datetime.datetime = None,
    precision: str = "s",
):
    """删除influxdb中指定时间段内的数据

    关于参数,请参见[Flux.delete][omicron.dal.influx.flux.Flux.delete]。

    Args:
        measurement: 指定measurement名字
        stop: 待删除记录的结束时间
        start: 待删除记录的开始时间,如果未指定,则使用EPOCH_START
        tags: 按tag进行过滤的条件
        precision: 用以格式化起始和结束时间。

    Raises:
        InfluxDeleteError: 如果删除失败,则抛出此异常
    """
    # todo: add raise error declaration
    command = Flux().delete(
        measurement, stop, tags, start=start, precision=precision
    )

    async with ClientSession() as session:
        async with session.post(
            self._delete_url, data=json.dumps(command), headers=self._delete_headers
        ) as resp:
            if resp.status != 204:
                err = await resp.json()
                logger.warning(
                    "influxdb delete error: %s when processin command %s",
                    err["message"],
                    command,
                )
                raise InfluxDeleteError(
                    f"influxdb delete failed, status code: {err['message']}"
                )

delete_bucket(self, bucket_id=None) async

删除influxdb中指定bucket

Parameters:

Name Type Description Default
bucket_id str

指定bucket的id。如果为None,则会删除本client对应的bucket。

None
Source code in omicron/dal/influx/influxclient.py
async def delete_bucket(self, bucket_id: str = None):
    """删除influxdb中指定bucket

    Args:
        bucket_id: 指定bucket的id。如果为None,则会删除本client对应的bucket。
    """
    if bucket_id is None:
        buckets = await self.list_buckets()
        for bucket in buckets:
            if bucket["type"] == "user" and bucket["name"] == self._bucket:
                bucket_id = bucket["id"]
                break
        else:
            raise BadParameterError(
                "bucket_id is None, and we can't find bucket with name: %s"
                % self._bucket
            )

    url = f"{self._url}/api/v2/buckets/{bucket_id}"
    headers = {"Authorization": f"Token {self._token}"}
    async with ClientSession() as session:
        async with session.delete(url, headers=headers) as resp:
            if resp.status != 204:
                err = await resp.json()
                logger.warning(
                    "influxdb delete bucket error: %s when processin command %s",
                    err["message"],
                    bucket_id,
                )
                raise InfluxSchemaError(
                    f"influxdb delete bucket failed, status code: {err['message']}"
                )

drop_measurement(self, measurement) async

从influxdb中删除一个measurement

调用此方法后,实际上该measurement仍然存在,只是没有数据。

Source code in omicron/dal/influx/influxclient.py
async def drop_measurement(self, measurement: str):
    """从influxdb中删除一个measurement

    调用此方法后,实际上该measurement仍然存在,只是没有数据。

    """
    # todo: add raise error declaration
    await self.delete(measurement, arrow.now().naive)

list_buckets(self) async

列出influxdb中对应token能看到的所有的bucket

Returns:

Type Description
list of buckets, each bucket is a dict with keys

``` id orgID, a 16 bytes hex string type, system or user description name retentionRules createdAt updatedAt links labels

```

Source code in omicron/dal/influx/influxclient.py
async def list_buckets(self) -> List[Dict]:
    """列出influxdb中对应token能看到的所有的bucket

    Returns:
        list of buckets, each bucket is a dict with keys:
        ```
        id
        orgID, a 16 bytes hex string
        type, system or user
        description
        name
        retentionRules
        createdAt
        updatedAt
        links
        labels
    ```
    """
    url = f"{self._url}/api/v2/buckets"
    headers = {"Authorization": f"Token {self._token}"}
    async with ClientSession() as session:
        async with session.get(url, headers=headers) as resp:
            if resp.status != 200:
                err = await resp.json()
                raise InfluxSchemaError(
                    f"influxdb list bucket failed, status code: {err['message']}"
                )
            else:
                return (await resp.json())["buckets"]

list_organizations(self, offset=0, limit=100) async

列出本客户端允许查询的所有组织

Parameters:

Name Type Description Default
offset

分页起点

0
limit

每页size

100

Exceptions:

Type Description
InfluxSchemaError

influxdb返回的错误

Returns:

Type Description
list of organizations, each organization is a dict with keys
1
2
3
4
5
6
id      : the id of the org
links
name    : the name of the org
description
createdAt
updatedAt
Source code in omicron/dal/influx/influxclient.py
async def list_organizations(self, offset: int = 0, limit: int = 100) -> List[Dict]:
    """列出本客户端允许查询的所组织

    Args:
        offset : 分页起点
        limit : 每页size

    Raises:
        InfluxSchemaError: influxdb返回的错误

    Returns:
        list of organizations, each organization is a dict with keys:
        ```
        id      : the id of the org
        links
        name    : the name of the org
        description
        createdAt
        updatedAt
        ```
    """
    url = f"{self._url}/api/v2/orgs?offset={offset}&limit={limit}"
    headers = {"Authorization": f"Token {self._token}"}

    async with ClientSession() as session:
        async with session.get(url, headers=headers) as resp:
            if resp.status != 200:
                err = await resp.json()
                logger.warning("influxdb query orgs err: %s", err["message"])
                raise InfluxSchemaError(
                    f"influxdb query orgs failed, status code: {err['message']}"
                )
            else:
                return (await resp.json())["orgs"]

query(self, flux, deserializer=None) async

flux查询

flux查询结果是一个以annotated csv格式存储的数据,例如:

1
2
,result,table,_time,code,amount,close,factor,high,low,open,volume
,_result,0,2019-01-01T00:00:00Z,000001.XSHE,100000000,5.15,1.23,5.2,5,5.1,1000000

上述result中,事先通过Flux.keep()限制了返回的字段为_time,code,amount,close,factor,high,low,open,volume。influxdb查询返回结果时,总是按照字段名称升序排列。此外,总是会额外地返回_result, table两个字段。

如果传入了deserializer,则会调用deserializer将其解析成为python对象。否则,返回bytes数据。

Parameters:

Name Type Description Default
flux Union[omicron.dal.influx.flux.Flux, str]

flux查询语句

required
deserializer Callable

反序列化函数

None

Returns:

Type Description
Any

如果未提供反序列化函数,则返回结果为bytes array(如果指定了compress=True,返回结果为gzip解压缩后的bytes array),否则返回反序列化后的python对象

Source code in omicron/dal/influx/influxclient.py
async def query(self, flux: Union[Flux, str], deserializer: Callable = None) -> Any:
    """flux查询

    flux查询结果是一个以annotated csv格式存储的数据,例如:
    ```
    ,result,table,_time,code,amount,close,factor,high,low,open,volume
    ,_result,0,2019-01-01T00:00:00Z,000001.XSHE,100000000,5.15,1.23,5.2,5,5.1,1000000
    ```

    上述`result`中,事先通过Flux.keep()限制了返回的字段为_time,code,amount,close,factor,high,low,open,volume。influxdb查询返回结果时,总是按照字段名称升序排列。此外,总是会额外地返回_result, table两个字段。

    如果传入了deserializer,则会调用deserializer将其解析成为python对象。否则,返回bytes数据。

    Args:
        flux: flux查询语句
        deserializer: 反序列化函数

    Returns:
        如果未提供反序列化函数,则返回结果为bytes array(如果指定了compress=True,返回结果为gzip解压缩后的bytes array),否则返回反序列化后的python对象
    """
    if isinstance(flux, Flux):
        flux = str(flux)

    async with ClientSession() as session:
        async with session.post(
            self._query_url, data=flux, headers=self._query_headers
        ) as resp:
            if resp.status != 200:
                err = await resp.json()
                logger.warning(
                    f"influxdb query error: {err} when processing {flux[:500]}"
                )
                logger.debug("data caused error:%s", flux)
                raise InfluxDBQueryError(
                    f"influxdb query failed, status code: {err['message']}"
                )
            else:
                # auto-unzip
                body = await resp.read()
                if deserializer:
                    try:
                        return deserializer(body)
                    except Exception as e:
                        logger.exception(e)
                        logger.warning(
                            "failed to deserialize data: %s, the query is:%s",
                            body,
                            flux[:500],
                        )
                        raise
                else:
                    return body

query_org_id(self, name=None) async

通过组织名查找组织id

只能查的本客户端允许查询的组织。如果name未提供,则使用本客户端创建时传入的组织名。

Parameters:

Name Type Description Default
name str

指定组织名

None

Returns:

Type Description
str

组织id

Source code in omicron/dal/influx/influxclient.py
async def query_org_id(self, name: str = None) -> str:
    """通过组织名查找组织id

    只能查的本客户端允许查询的组织。如果name未提供,则使用本客户端创建时传入的组织名。

    Args:
        name: 指定组织名

    Returns:
        组织id
    """
    if name is None:
        name = self._org
    orgs = await self.list_organizations()
    for org in orgs:
        if org["name"] == name:
            return org["id"]

    raise BadParameterError(f"can't find org with name: {name}")

save(self, data, measurement=None, tag_keys=[], time_key=None, global_tags={}, chunk_size=None) async

save data into influxdb

if data is a pandas.DataFrame or numpy structured array, it will be converted to line protocol and saved. If data is str, use write method instead.

Parameters:

Name Type Description Default
data Union[numpy.ndarray, pandas.core.frame.DataFrame]

data to be saved

required
measurement str

the name of measurement

None
tag_keys List[str]

which columns name will be used as tags

[]
chunk_size int

number of lines to be saved in one request. if it's -1, then all data will be written in one request. If it's None, then it will be set to self._chunk_size

None

Exceptions:

Type Description
InfluxDBWriteError

if write failed

Source code in omicron/dal/influx/influxclient.py
async def save(
    self,
    data: Union[np.ndarray, DataFrame],
    measurement: str = None,
    tag_keys: List[str] = [],
    time_key: str = None,
    global_tags: Dict = {},
    chunk_size: int = None,
) -> None:
    """save `data` into influxdb

    if `data` is a pandas.DataFrame or numy structured array, it will be converted to line protocol and saved. If `data` is str, use `write` method instead.

    Args:
        data: data to be saved
        measurement: the name of measurement
        tag_keys: which columns name will be used as tags
        chunk_size: number of lines to be saved in one request. if it's -1, then all data will be written in one request. If it's None, then it will be set to `self._chunk_size`

    Raises:
        InfluxDBWriteError: if write failed

    """
    # todo: add more errors raise
    if isinstance(data, DataFrame):
        assert (
            measurement is not None
        ), "measurement must be specified when data is a DataFrame"

        if tag_keys:
            assert set(tag_keys) in set(
                data.columns.tolist()
            ), "tag_keys must be in data.columns"

        serializer = DataframeSerializer(
            data,
            measurement,
            time_key,
            tag_keys,
            global_tags,
            precision=self._precision,
        )
        if chunk_size == -1:
            chunk_size = len(data)

        for lines in serializer.serialize(chunk_size or self._chunk_size):
            await self.write(lines)
    elif isinstance(data, np.ndarray):
        assert (
            measurement is not None
        ), "measurement must be specified when data is a numpy array"
        assert (
            time_key is not None
        ), "time_key must be specified when data is a numpy array"
        serializer = NumpySerializer(
            data,
            measurement,
            time_key,
            tag_keys,
            global_tags,
            time_precision=self._precision,
        )
        if chunk_size == -1:
            chunk_size = len(data)
        for lines in serializer.serialize(chunk_size or self._chunk_size):
            await self.write(lines)
    else:
        raise TypeError(
            f"data must be pandas.DataFrame, numpy array, got {type(data)}"
        )

write(self, line_protocol) async

将line-protocol数组写入influxdb

Parameters:

Name Type Description Default
line_protocol str

待写入的数据,以line-protocol数组形式存在

required
Source code in omicron/dal/influx/influxclient.py
async def write(self, line_protocol: str):
    """将line-protocol数组写入influxdb

    Args:
        line_protocol: 待写入的数据,以line-protocol数组形式存在

    """
    # todo: add raise error declaration
    if self._enable_compress:
        line_protocol_ = gzip.compress(line_protocol.encode("utf-8"))
    else:
        line_protocol_ = line_protocol

    async with ClientSession() as session:
        async with session.post(
            self._write_url, data=line_protocol_, headers=self._write_headers
        ) as resp:
            if resp.status != 204:
                err = await resp.json()
                logger.warning(
                    "influxdb write error when processing: %s, err code: %s, message: %s",
                    {line_protocol[:100]},
                    err["code"],
                    err["message"],
                )
                logger.debug("data caused error:%s", line_protocol)
                raise InfluxDBWriteError(
                    f"influxdb write failed, err: {err['message']}"
                )