GitHub - InfluxCommunity/influxdb3-python: Python module that provides a simple and convenient way to interact with InfluxDB 3.0.
Introduction
influxdb_client_3 is a Python module that provides a simple and convenient way to interact with InfluxDB 3.0. This module supports both writing data to InfluxDB and querying data using the Flight client, which allows you to execute SQL and InfluxQL queries on InfluxDB 3.0.
We offer a "Getting Started: InfluxDB 3.0 Python Client Library" video that walks through how to use the library and its examples.
Dependencies
- pyarrow (automatically installed)
- pandas (optional)
Installation
You can install 'influxdb3-python' using pip:
    pip install influxdb3-python
Note: This does not include Pandas support. If you would like to use key features such as to_pandas() and write_file(), or to use PyArrow data conversion methods with nanosecond timestamp precision, you will need to install pandas separately.
Note: Make sure you are using Python 3.9 or above. For the best performance, use Python 3.11+.
CLI (Agent-Friendly Query Tool)
This package includes an influx3 CLI for read/query workflows.
Run a query
    influx3 query -d my_database "SELECT * FROM cpu LIMIT 5"

By default, output is JSON to stdout.
Supported formats
- json (default)
- jsonl
- csv
- pretty
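As a rough illustration of how the machine-readable formats differ in general, the sketch below serializes the same rows as JSON, JSONL, and CSV with the Python standard library. The rows and column names are made up, and the exact layout that influx3 emits is not described here and may differ.

```python
import csv
import io
import json

# Made-up rows standing in for a query result.
rows = [
    {"host": "server-a", "usage": 0.42},
    {"host": "server-b", "usage": 0.37},
]

# JSON: one document containing every row.
as_json = json.dumps(rows)

# JSONL: one JSON object per line, convenient for streaming consumers.
as_jsonl = "\n".join(json.dumps(row) for row in rows)

# CSV: a header line followed by one line per row.
buffer = io.StringIO()
writer = csv.DictWriter(buffer, fieldnames=["host", "usage"], lineterminator="\n")
writer.writeheader()
writer.writerows(rows)
as_csv = buffer.getvalue()

print(as_json)
print(as_jsonl)
print(as_csv)
```

JSONL tends to suit tools that process results row by row, while a single JSON document is easier to load in one call.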
    influx3 query -d my_database --format csv "SELECT * FROM cpu LIMIT 5"

Config precedence
Configuration values are resolved in this order:
- CLI flags
- INFLUXDB3_* environment variables
- legacy INFLUX_* environment variables
- built-in defaults (host defaults to http://127.0.0.1:8181)
Relevant environment variables:
- INFLUXDB3_HOST_URL (legacy fallback: INFLUX_HOST)
- INFLUXDB3_DATABASE_NAME (legacy fallback: INFLUX_DATABASE)
- INFLUXDB3_AUTH_TOKEN (legacy fallback: INFLUX_TOKEN)
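The precedence order above can be sketched in a few lines of Python. This is an illustration only, not the CLI's actual implementation: resolve_setting is a hypothetical helper, and treating an empty-but-present variable as "set" is an assumption.

```python
from typing import Mapping, Optional


def resolve_setting(
    cli_value: Optional[str],
    env: Mapping[str, str],
    primary_var: str,
    legacy_var: str,
    default: Optional[str] = None,
) -> Optional[str]:
    """Return the first value found: CLI flag, INFLUXDB3_* var, INFLUX_* var, default."""
    if cli_value is not None:
        return cli_value
    if primary_var in env:
        return env[primary_var]
    if legacy_var in env:
        return env[legacy_var]
    return default


# A made-up environment; pass os.environ in real use.
fake_env = {
    "INFLUX_HOST": "http://legacy:8181",
    "INFLUXDB3_DATABASE_NAME": "metrics",
}

host = resolve_setting(
    None, fake_env, "INFLUXDB3_HOST_URL", "INFLUX_HOST", "http://127.0.0.1:8181"
)
database = resolve_setting(
    "cli_db", fake_env, "INFLUXDB3_DATABASE_NAME", "INFLUX_DATABASE"
)
token = resolve_setting(None, fake_env, "INFLUXDB3_AUTH_TOKEN", "INFLUX_TOKEN")

print(host)      # legacy variable is used because INFLUXDB3_HOST_URL is unset
print(database)  # the CLI flag wins over the environment variable
print(token)     # nothing is set and there is no default
```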
Usage
One of the easiest ways to get started is to check out the "Influxdb3 Python Basic Usage" notebook. This scenario takes you through the core write and read APIs of the client library.
Additional examples that do not depend on Jupyter are available in the ./examples directory.
Importing the Module
    from influxdb_client_3 import InfluxDBClient3, Point
Initialization
If you are using InfluxDB Cloud, note that:
- Use the bucket name for the database or bucket function argument.
    client = InfluxDBClient3(token="your-token", host="your-host", database="your-database")
Writing Data
You can write data using the Point class or by supplying line protocol.
Using Points
    point = Point("measurement").tag("location", "london").field("temperature", 42)
    client.write(point)
Control tag order for first-write column order (InfluxDB 3 Enterprise)
    from influxdb_client_3 import InfluxDBClient3, Point, WriteOptions, WriteType, write_client_options

    point = Point("cpu") \
        .tag("host", "server-a") \
        .tag("region", "us-east") \
        .tag("rack", "r1") \
        .field("usage", 0.42)

    write_options = WriteOptions(
        write_type=WriteType.synchronous,
        tag_order=["region", "host"],
    )

    client = InfluxDBClient3(
        token="your-token",
        host="your-host",
        database="your-database",
        write_client_options=write_client_options(write_options=write_options),
    )

    client.write(point)
Using Line Protocol
    point = "measurement fieldname=0"
    client.write(point)
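If you build line protocol strings yourself, escaping and value formatting matter. The following is a minimal sketch of the general line protocol rules (escaped commas, spaces, and equals signs; an i suffix for integers; quoted strings). to_line_protocol is a hypothetical helper written for this illustration and is not part of the client library, which handles these details for you when you use Point.

```python
from typing import Mapping, Optional, Union

FieldValue = Union[str, int, float, bool]


def _escape(text: str, chars: str) -> str:
    """Backslash-escape each character of `chars` that occurs in `text`."""
    for ch in chars:
        text = text.replace(ch, "\\" + ch)
    return text


def _format_field(value: FieldValue) -> str:
    # bool must be checked before int because bool is a subclass of int.
    if isinstance(value, bool):
        return "true" if value else "false"
    if isinstance(value, int):
        return f"{value}i"  # integers carry an "i" suffix
    if isinstance(value, float):
        return repr(value)
    # Strings are double-quoted; backslashes and quotes are escaped.
    escaped = value.replace("\\", "\\\\").replace('"', '\\"')
    return f'"{escaped}"'


def to_line_protocol(
    measurement: str,
    tags: Mapping[str, str],
    fields: Mapping[str, FieldValue],
    timestamp: Optional[int] = None,
) -> str:
    """Render one line: measurement[,tag=value...] field=value[,...] [timestamp]."""
    if not fields:
        raise ValueError("line protocol requires at least one field")
    line = _escape(measurement, ", ")
    for key in sorted(tags):  # tags are sorted by key here
        line += "," + _escape(key, ",= ") + "=" + _escape(tags[key], ",= ")
    field_set = ",".join(
        _escape(key, ",= ") + "=" + _format_field(value)
        for key, value in fields.items()
    )
    line += " " + field_set
    if timestamp is not None:
        line += f" {timestamp}"
    return line


simple = to_line_protocol("measurement", {"location": "london"}, {"temperature": 42})
escaped = to_line_protocol(
    "home", {"room": "Living Room"}, {"temp": 21.5, "status": "ok"}, 1735545600
)
print(simple)
print(escaped)
```

The resulting strings can be passed to client.write() in the same way as the hand-written example above.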
Write from file
You can import data from CSV, JSON, Feather, ORC, and Parquet files.
    from influxdb_client_3 import (
        InfluxDBClient3,
        InfluxDBError,
        WriteOptions,
        write_client_options,
    )


    class BatchingCallback:
        """Counts successful batches and reports failures and retries."""

        def __init__(self):
            self.write_count = 0

        def success(self, conf, data: str):
            self.write_count += 1
            print(f"Written batch: {conf}, data: {data}")

        def error(self, conf, data: str, exception: InfluxDBError):
            print(f"Cannot write batch: {conf}, data: {data} due: {exception}")

        def retry(self, conf, data: str, exception: InfluxDBError):
            print(f"Retryable error occurs for batch: {conf}, data: {data} retry: {exception}")


    callback = BatchingCallback()

    # Batching and retry settings; intervals and delays are in milliseconds.
    write_options = WriteOptions(
        batch_size=100,
        flush_interval=10_000,
        jitter_interval=2_000,
        retry_interval=5_000,
        max_retries=5,
        max_retry_delay=30_000,
        exponential_base=2,
    )

    wco = write_client_options(
        success_callback=callback.success,
        error_callback=callback.error,
        retry_callback=callback.retry,
        write_options=write_options,
    )

    with InfluxDBClient3(
        token="INSERT_TOKEN",
        host="eu-central-1-1.aws.cloud2.influxdata.com",
        database="python",
        write_client_options=wco,
    ) as client:
        client.write_file(
            file='./out.csv',
            timestamp_column='time',
            tag_columns=["provider", "machineID"],
        )

    # The client is closed on leaving the with block.
    print(f'DONE writing from csv in {callback.write_count} batch(es)')
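To get a feel for how retry_interval, exponential_base, max_retries, and max_retry_delay interact, the sketch below computes a plain capped exponential backoff schedule from the values used above. This is an assumption about the general technique, not the library's exact algorithm, which may also randomize delays; retry_delays_ms is a hypothetical helper.

```python
from typing import List


def retry_delays_ms(
    retry_interval: int,
    exponential_base: int,
    max_retries: int,
    max_retry_delay: int,
) -> List[int]:
    """Capped exponential backoff: interval * base**attempt, limited to the maximum."""
    delays = []
    for attempt in range(max_retries):
        delay = retry_interval * exponential_base ** attempt
        delays.append(min(delay, max_retry_delay))
    return delays


schedule = retry_delays_ms(
    retry_interval=5_000,
    exponential_base=2,
    max_retries=5,
    max_retry_delay=30_000,
)
print(schedule)  # [5000, 10000, 20000, 30000, 30000]
```

With these settings the delay doubles on each attempt until it reaches the 30-second cap.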
Pandas DataFrame
    import pandas as pd

    # Create a DataFrame with a timestamp column
    df = pd.DataFrame({
        'time': pd.to_datetime(['2024-01-01', '2024-01-02', '2024-01-03']),
        'trainer': ['Ash', 'Misty', 'Brock'],
        'pokemon_id': [25, 120, 74],
        'pokemon_name': ['Pikachu', 'Staryu', 'Geodude']
    })

    # Write the DataFrame - timestamp_column is required for consistency
    client.write_dataframe(
        df,
        measurement='caught',
        timestamp_column='time',
        tags=['trainer', 'pokemon_id']
    )
Polars DataFrame
    import polars as pl

    # Create a DataFrame with a timestamp column
    df = pl.DataFrame({
        'time': ['2024-01-01T00:00:00Z', '2024-01-02T00:00:00Z'],
        'trainer': ['Ash', 'Misty'],
        'pokemon_id': [25, 120],
        'pokemon_name': ['Pikachu', 'Staryu']
    })

    # Write the DataFrame - same API works for both pandas and polars
    client.write_dataframe(
        df,
        measurement='caught',
        timestamp_column='time',
        tags=['trainer', 'pokemon_id']
    )
Accept partial writes and inspect failed lines
accept_partial defaults to True. When you write through the V3 API endpoint (use_v2_api=False) and a batch contains invalid lines, it allows the write to partially succeed.
On partial failure, the client raises InfluxDBPartialWriteError with structured line_errors.
    from influxdb_client_3 import InfluxDBClient3
    from influxdb_client_3.exceptions import InfluxDBPartialWriteError

    client = InfluxDBClient3(
        host="http://localhost:8181",
        token="token",
        database="db",
        write_use_v2_api=False,
    )

    lp = "home,room=Sunroom temp=96 1735545600\nhome,room=Sunroom temp=\"hi\" 1735549200"

    try:
        client.write(lp)  # accept_partial=True by default on V3 API endpoint
    except InfluxDBPartialWriteError as e:
        for line_err in e.line_errors:
            print(f"line {line_err.line_number} failed: {line_err.error_message} ({line_err.original_line})")
Disable partial writes:
    client = InfluxDBClient3(
        host="http://localhost:8181",
        token="token",
        database="db",
        write_use_v2_api=False,
        write_accept_partial=False,
    )
Compatibility with InfluxDB Clustered and InfluxDB Cloud Dedicated/Serverless
Writes use the V2 API endpoint by default, so no additional configuration is required for these products.
use_v2_api can be configured by:
- WriteOptions(use_v2_api=False) (for V3 API endpoint features)
- constructor kwarg: write_use_v2_api=False
- env var: INFLUX_WRITE_USE_V2_API=false
When use_v2_api=True:
- accept_partial is not used
- no_sync=True is invalid and rejected before dispatch with: invalid write options: no_sync cannot be used with use_v2_api
To use the no_sync or accept_partial controls, set use_v2_api=False (for example, with InfluxDB 3 Core/Enterprise).
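The rule above amounts to a check that runs before any request is sent. A minimal sketch of that kind of check follows; validate_write_options is a hypothetical function and the use of ValueError is an assumption, with only the error message taken from the text above.

```python
def validate_write_options(use_v2_api: bool = True, no_sync: bool = False) -> None:
    """Reject option combinations that the V2 API endpoint cannot honor."""
    if use_v2_api and no_sync:
        # Exception type is an assumption; the message is quoted from the README.
        raise ValueError(
            "invalid write options: no_sync cannot be used with use_v2_api"
        )


# Valid: no_sync together with the V3 API endpoint.
validate_write_options(use_v2_api=False, no_sync=True)

# Invalid: no_sync together with the (default) V2 API endpoint.
try:
    validate_write_options(use_v2_api=True, no_sync=True)
except ValueError as exc:
    print(f"rejected before dispatch: {exc}")
```

Failing early like this means a misconfiguration surfaces as a local error instead of an unexpected server response.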
Querying
Querying with SQL
    query = "select * from measurement"
    # With the default mode, query() returns a pyarrow.Table.
    table = client.query(query=query, language="sql")
    print(table.to_pandas().to_markdown())
Querying to DataFrame
    # Query directly to a pandas DataFrame (default)
    df = client.query_dataframe("SELECT * FROM caught WHERE trainer = 'Ash'")

    # Query to a polars DataFrame
    df = client.query_dataframe("SELECT * FROM caught", frame_type="polars")
Querying with InfluxQL
    query = "select * from measurement"
    # With the default mode, query() returns a pyarrow.Table.
    table = client.query(query=query, language="influxql")
    print(table.to_pandas().to_markdown())
gRPC compression
Request compression
Request compression is not supported by InfluxDB 3, so the client sends uncompressed requests.
Response compression
Response compression is enabled by default. The client sends the grpc-accept-encoding: identity, deflate, gzip header, and the server returns gzip-compressed responses (if supported). The client automatically decompresses them; no configuration is required.
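To illustrate why compressed responses can be worthwhile for repetitive time-series data, the sketch below round-trips a made-up payload through gzip using the standard library. It is only an analogy: real responses are Arrow data compressed inside the gRPC transport, and the JSON payload here is an assumption chosen for readability.

```python
import gzip
import json

# Made-up, repetitive rows similar in shape to time-series query results.
rows = [
    {"time": 1735545600 + i * 60, "room": "Sunroom", "temp": 21.0 + (i % 5) * 0.1}
    for i in range(1000)
]

raw = json.dumps(rows).encode("utf-8")
compressed = gzip.compress(raw)
restored = gzip.decompress(compressed)

print(f"raw: {len(raw)} bytes, gzip: {len(compressed)} bytes")
print("round trip intact:", restored == raw)
```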
To disable response compression:
    # Via constructor parameter
    client = InfluxDBClient3(
        host="your-host",
        token="your-token",
        database="your-database",
        disable_grpc_compression=True
    )

    # Or via environment variable
    # INFLUX_DISABLE_GRPC_COMPRESSION=true
    client = InfluxDBClient3.from_env()
Windows Users
Currently, Windows users need an extra installation step to query via Flight natively, because gRPC cannot locate the Windows root certificates. To work around this, follow these steps:
Install certifi (for example, with pip install certifi).
Next, include certifi within the flight client options:
    import certifi
    from influxdb_client_3 import InfluxDBClient3, flight_client_options

    # Read the certifi CA bundle so gRPC can verify the server certificate.
    with open(certifi.where(), "r") as fh:
        cert = fh.read()

    client = InfluxDBClient3(
        token="",
        host="b0c7cce5-8dbc-428e-98c6-7f996fb96467.a.influxdb.io",
        database="flightdemo",
        flight_client_options=flight_client_options(tls_root_certs=cert),
    )

    table = client.query(
        query="SELECT * FROM flight WHERE time > now() - 4h",
        language="influxql",
    )
    print(table.to_pandas())
You may include your own root certificate in this manner as well.
If connecting to InfluxDB by domain name (for example, www.mydomain.com) fails with the error DNS resolution failed, try setting the environment variable GRPC_DNS_RESOLVER=native.
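One way to apply this from Python, rather than from the shell, is to set the variable at the very top of your script. The sketch below assumes that gRPC reads the variable when it initializes, so it is set before the client library is imported or a client is created.

```python
import os

# Set this before importing influxdb_client_3 or creating a client, on the
# assumption that gRPC reads the variable when it initializes.
os.environ["GRPC_DNS_RESOLVER"] = "native"

print(os.environ["GRPC_DNS_RESOLVER"])
```

Setting the variable in the shell or service configuration has the same effect and avoids any dependence on import order.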
Contributing
Tests are run using pytest.
    # Clone the repository
    git clone https://github.com/InfluxCommunity/influxdb3-python
    cd influxdb3-python

    # Create a virtual environment and activate it
    python3 -m venv .venv
    source .venv/bin/activate

    # Install the package and its dependencies
    pip install -e .[pandas,polars,dataframe,test]

    # Run the tests
    python -m pytest .
