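InfluxDB connection issues
The embedded-systems company where I previously interned recently changed the database behind its product, moving from MySQL to InfluxDB. I ran into a few problems while getting to know InfluxDB, and I've collected them here:
Token 401 issue
401 on login
At first I tested the connection with the token the company gave me: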
    [InfluxDB]
    Token = ***
    ServerUrl = http://localhost:8086
    Bucket = ***
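The connection was established without errors, which suggested the token itself was fine, but as soon as I tried to run a query it failed with a 401: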
    2024-09-13 18:14:47,261 - ERROR - Error querying InfluxDB: (401)
    Reason: Unauthorized
    HTTP response headers: HTTPHeaderDict({'Content-Type': 'application/json; charset=utf-8', 'X-Influxdb-Build': 'OSS', 'X-Influxdb-Version': 'v2.7.8', 'X-Platform-Error-Code': 'unauthorized', 'Date': 'Fri, 13 Sep 2024 09:44:47 GMT', 'Content-Length': '55'})
    HTTP response body: b'{"code":"unauthorized","message":"unauthorized access"}'
Searching online showed that many people had hit the same problem, and I eventually found a lead in a comment thread.
So I changed the ini file to use a username and password instead of the token:

    [InfluxDB]
    Username = ***
    Password = ***
    Org = ***
    ServerUrl = http://localhost:8086
    Bucket = ***
After that, reads worked normally.
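For reference, here is a minimal sketch of how such an ini file could be turned into client arguments. The sample values (`demo_user`, `demo_org`, and so on) and the helper name `load_influx_settings` are made up for illustration; only the section and key names come from the file above.

```python
import configparser

# Placeholder values standing in for the masked entries in the real ini file.
SAMPLE_INI = """
[InfluxDB]
Username = demo_user
Password = demo_password
Org = demo_org
ServerUrl = http://localhost:8086
Bucket = demo_bucket
"""


def load_influx_settings(ini_text):
    """Parse the [InfluxDB] section into keyword arguments for the client."""
    parser = configparser.ConfigParser()
    parser.read_string(ini_text)
    section = parser["InfluxDB"]  # key lookup is case-insensitive
    return {
        "url": section["ServerUrl"],
        "username": section["Username"],
        "password": section["Password"],
        "org": section["Org"],
    }


settings = load_influx_settings(SAMPLE_INI)
print(settings["url"])

# With the influxdb-client package installed, the client would then be built as:
#   from influxdb_client import InfluxDBClient
#   client = InfluxDBClient(**settings)
```

Keeping the parsing separate from the client construction makes it easy to check the configuration without a running server.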
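401 on expiry
Later, while testing at the company, the 401 came back with exactly the same error. Restarting the program made queries work again, so my guess was that the connection (or the login session behind it) has a time limit.
I therefore changed how the connection is established: a new client is created right before each query. That solved the problem.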
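    from influxdb_client import InfluxDBClient

    # influxdb_url, influxdb_username, influxdb_password, influxdb_org and
    # logger are module-level values (the first four are read from the ini file).
    def get_influxdb_client():
        """Create a new client; called right before every query."""
        try:
            client = InfluxDBClient(url=influxdb_url,
                                    username=influxdb_username,
                                    password=influxdb_password,
                                    org=influxdb_org)
            logger.info("Successfully connected to InfluxDB.")
            return client
        except Exception as e:
            logger.error(f"Failed to connect to InfluxDB: {str(e)}")
            return None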
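A possible variation on the same idea is to build a fresh client per attempt and retry once when the server answers 401. This is only a sketch: `query_with_reconnect`, `FakeClient` and `FakeUnauthorized` are hypothetical names, and the fake classes exist only so the example runs without a server. It relies on the exception carrying an HTTP status in a `status` attribute, as the `ApiException` of influxdb-client does.

```python
def query_with_reconnect(make_client, run_query, max_attempts=2):
    """Run run_query(client) on a fresh client, retrying after a 401."""
    last_error = None
    for _ in range(max_attempts):
        client = make_client()
        try:
            return run_query(client)
        except Exception as exc:
            # Only an "unauthorized" answer triggers a retry; anything else
            # is re-raised unchanged.
            if getattr(exc, "status", None) != 401:
                raise
            last_error = exc
        finally:
            client.close()  # always release the connection
    raise last_error


# --- stand-ins so the sketch can run without InfluxDB ---
class FakeUnauthorized(Exception):
    status = 401


class FakeClient:
    created = 0

    def __init__(self):
        FakeClient.created += 1
        self.serial = FakeClient.created

    def close(self):
        pass


def flaky_query(client):
    # The first client simulates an expired session.
    if client.serial == 1:
        raise FakeUnauthorized("unauthorized access")
    return ["row"]


rows = query_with_reconnect(FakeClient, flaky_query)
print(rows, FakeClient.created)
```

In real use, `make_client` would be something like `get_influxdb_client` and `run_query` a function that calls `client.query_api().query(...)`.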
Data retrieval issue
The original database was MySQL, and the query looked like this:
    def fetch_device_events(cursor, f_device_id, start_date, end_date):
        """
        Fetch device event data.
        """
        query = f"""
            SELECT
                f_dtc_index, f_notify, f_create_time, f_report_time, f_channel, f_desc
            FROM
                t_mqtt_event
            WHERE
                f_device_id = '{f_device_id}'
                AND f_report_time BETWEEN '{start_date}' AND '{end_date}'
            ORDER BY
                f_id DESC
        """
        cursor.execute(query)
        return cursor.fetchall()
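This now had to become an InfluxDB query. InfluxDB has its own query syntax, and the hard part is understanding how InfluxDB stores and returns data.
My own understanding is that a field plays the role of a column, and each field comes back as its own table. The query returns rows for all the requested fields, but each row carries only one field and its value.
That creates a problem: the data has to be merged. The original MySQL query returned several columns per row, so here all the fields have to be merged by timestamp to get the same result.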
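To make the shape of the problem concrete, here is a toy sketch that uses plain tuples in place of InfluxDB records. The sample rows and the helper name `merge_by_time` are invented for illustration; the point is only that several one-field rows sharing a timestamp collapse into one multi-column row.

```python
from collections import defaultdict

# Hypothetical rows shaped like (timestamp, field name, value), i.e. one
# field per row, as described above.
records = [
    ("2024-09-13T10:14:47Z", "code", 17),
    ("2024-09-13T10:14:47Z", "channel", 2),
    ("2024-09-13T10:15:02Z", "code", 21),
    ("2024-09-13T10:14:47Z", "desc", "overheat"),
]


def merge_by_time(rows, fields):
    """Group one-field rows into one dict per timestamp."""
    # Every timestamp starts with all fields set to None.
    merged = defaultdict(lambda: dict.fromkeys(fields))
    for ts, field, value in rows:
        if field in fields:
            merged[ts][field] = value
    return dict(merged)


events = merge_by_time(records, ("code", "channel", "desc"))
for ts, row in events.items():
    print(ts, row)
```

Fields that never arrive for a given timestamp simply stay `None`, which mirrors a NULL column in the old MySQL rows.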
    from datetime import datetime

    import pytz

    # Fields requested from InfluxDB; each one becomes a column of the result.
    EVENT_FIELDS = ("fqIndex", "code", "reportTime", "channel", "desc")

    def fetch_device_events(influxdb_client, f_device_id, start_date, end_date):
        """
        Fetch device event data from InfluxDB and return it as tuples.
        """
        cst_tz = pytz.timezone('Asia/Shanghai')

        # Accept ISO-format strings as well as datetime objects.
        if isinstance(start_date, str):
            start_date = datetime.fromisoformat(start_date)
        if isinstance(end_date, str):
            end_date = datetime.fromisoformat(end_date)

        # Interpret the inputs as China Standard Time and convert them to UTC.
        start_date = cst_tz.localize(start_date).astimezone(pytz.utc)
        end_date = cst_tz.localize(end_date).astimezone(pytz.utc)

        # Flux expects RFC 3339 timestamps in range().
        start_date_rfc3339 = start_date.strftime("%Y-%m-%dT%H:%M:%SZ")
        end_date_rfc3339 = end_date.strftime("%Y-%m-%dT%H:%M:%SZ")

        query = f'''
        from(bucket: "***")
          |> range(start: {start_date_rfc3339}, stop: {end_date_rfc3339})
          |> filter(fn: (r) => r["guid"] == "{f_device_id}")
          |> filter(fn: (r) => r._measurement == "device_history")
          |> filter(fn: (r) => r["_field"] == "fqIndex" or r["_field"] == "code" or r["_field"] == "reportTime" or r["_field"] == "channel" or r["_field"] == "desc")
          |> sort(columns: ["_time"], desc: true)
        '''

        try:
            result = influxdb_client.query_api().query(query, org="shike")

            # Merge the one-field-per-row records into one dict per timestamp.
            events = {}
            for table in result:
                for record in table.records:
                    timestamp = record.get_time()
                    if timestamp not in events:
                        events[timestamp] = {
                            **dict.fromkeys(EVENT_FIELDS),
                            "f_create_time": timestamp,
                        }

                    field_name = record.get_field()
                    if field_name in EVENT_FIELDS:
                        events[timestamp][field_name] = record.get_value()

            # Emit tuples in the same column order as the old MySQL query.
            event_list = []
            for timestamp, event in events.items():
                event_list.append((
                    event["fqIndex"],
                    event["code"],
                    event["f_create_time"],
                    # reportTime is a millisecond epoch timestamp.
                    datetime.fromtimestamp(event["reportTime"] / 1000) if event["reportTime"] else None,
                    event["channel"],
                    event["desc"],
                ))

            return event_list

        except Exception as e:
            logger.error(f"Error querying InfluxDB: {e}")
            return []
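As a possible alternative to merging in Python, Flux has a `pivot()` function that can turn the per-field rows into one row per timestamp on the server side. The sketch below only builds the query string; `build_pivot_query`, the bucket name and the device ID are placeholders, and I have not run this against the company database. Like the function above, it interpolates values directly into the query, so it assumes the device ID comes from a trusted source.

```python
EVENT_FIELDS = ("fqIndex", "code", "reportTime", "channel", "desc")


def build_pivot_query(bucket, device_id, start_rfc3339, stop_rfc3339):
    """Return a Flux query that yields one row per timestamp."""
    # Same "field A or field B or ..." condition as the hand-written filter.
    field_filter = " or ".join(
        f'r["_field"] == "{name}"' for name in EVENT_FIELDS
    )
    return f'''
from(bucket: "{bucket}")
  |> range(start: {start_rfc3339}, stop: {stop_rfc3339})
  |> filter(fn: (r) => r["guid"] == "{device_id}")
  |> filter(fn: (r) => r._measurement == "device_history")
  |> filter(fn: (r) => {field_filter})
  |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
  |> sort(columns: ["_time"], desc: true)
'''


query = build_pivot_query(
    "demo_bucket", "device-001",
    "2024-09-12T16:00:00Z", "2024-09-13T16:00:00Z",
)
print(query)
```

With a pivoted result, each record should expose the fields as columns (for example `record["code"]`), so the timestamp-keyed dictionary would no longer be needed.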
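Other issues
The code above raised a further question: the time zone of InfluxDB timestamps. A senior colleague at the company told me that timestamps should be in UTC, otherwise all kinds of problems show up, so I added a time-zone conversion before the query.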
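The conversion can also be sketched with the standard library alone. This version assumes the local times are China Standard Time and models it as a fixed UTC+8 offset, which is a simplification compared with the `pytz` zone used above; `local_to_rfc3339_utc` is a name chosen for this example.

```python
from datetime import datetime, timedelta, timezone

# Assumption: local time is China Standard Time, modelled as a fixed UTC+8.
CST = timezone(timedelta(hours=8))


def local_to_rfc3339_utc(value):
    """Convert a local datetime (or ISO string) to an RFC 3339 UTC string."""
    if isinstance(value, str):
        value = datetime.fromisoformat(value)
    if value.tzinfo is None:
        # Naive values are interpreted as local (CST) time.
        value = value.replace(tzinfo=CST)
    return value.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


print(local_to_rfc3339_utc("2024-09-13 18:14:47"))  # 2024-09-13T10:14:47Z
```

The resulting string can be dropped directly into the `range(start: ..., stop: ...)` call of a Flux query.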
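In addition, InfluxDB does not support integer indexes: only tags are indexed, and tag values are always stored as strings.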