#!/bin/env python3
import binascii
from collections import namedtuple
from datetime import datetime, timezone
import logging
import os
import struct
import sys
import time
import traceback
from bluepy.btle import Peripheral, DefaultDelegate, BTLEException
from influxdb import InfluxDBClient
CellVoltages = namedtuple('CellVoltages', ['dt', 'cell1', 'cell2', 'cell3', 'cell4', 'cell5', 'cell6', 'cell7', 'cell8', 'cellmin', 'cellmax', 'delta'])
SummaryStats = namedtuple('SummaryStats', ['dt', 'volts', 'amps', 'watts', 'remain', 'capacity', 'cycles', 'c01', 'c02', 'c03', 'c04', 'c05', 'c06', 'c07', 'c08'])
DetailInfo = namedtuple('DetailInfo', ['dt', 'ovp', 'uvp', 'bov', 'buv', 'cot', 'cut', 'dot', 'dut', 'coc', 'duc', 'sc', 'ic', 'cnf', 'protect', 'percent', 'fet', 'cells', 'temp1', 'temp2'])
FORMAT = ('%(asctime)-15s %(threadName)-15s'
' %(levelname)-8s %(module)-15s:%(lineno)-8s %(message)s')
logging.basicConfig(format=FORMAT)
log = logging.getLogger()
log.setLevel(logging.WARNING)
_logger = logging.getLogger(__name__)
DEVICE_ADDRESS = os.environ.get('DEVICE_ADDRESS') # "A4:C1:38:DC:71:F8"
DEVICE_NAME = os.environ.get('DEVICE_NAME') # "lfp0"
def cellinfo1(data): # process pack info
infodata = data
i = 4 # Unpack into variables, skipping header bytes 0-3
volts, amps, remain, capacity, cycles, mdate, balance1, balance2 = struct.unpack_from('>HhHHHHHH', infodata, i)
volts=volts/100
amps = amps/100
capacity = capacity/100
remain = remain/100
watts = volts*amps # adding watts field for dbase
bal1 = (format(balance1, "b").zfill(16))
c16 = int(bal1[0:1])
c15 = int(bal1[1:2]) # using balance1 bits for 16 cells
c14 = int(bal1[2:3]) # balance2 is for next 17-32 cells - not using
c13 = int(bal1[3:4])
c12 = int(bal1[4:5]) # bit shows (0,1) charging on-off
c11 = int(bal1[5:6])
c10 = int(bal1[6:7])
c09 = int(bal1[7:8])
c08 = int(bal1[8:9])
c07 = int(bal1[9:10])
c06 = int(bal1[10:11])
c05 = int(bal1[11:12])
c04 = int(bal1[12:13])
c03 = int(bal1[13:14])
c02 = int(bal1[14:15])
c01 = int(bal1[15:16])
return SummaryStats(datetime.now(), float(volts), float(amps), float(watts), float(remain), float(capacity), int(cycles), int(c01), int(c02), int(c03), int(c04), int(c05), int(c06), int(c07), int(c08))
def cellinfo2(data):
infodata = data
i = 0 # unpack into variables, ignore end of message byte '77'
protect,vers,percent,fet,cells,sensors,temp1,temp2,b77 = struct.unpack_from('>HBBBBBHHB', infodata, i)
temp1 = (temp1-2731)/10
temp2 = (temp2-2731)/10 # fet 0011 = 3 both on ; 0010 = 2 disch on ; 0001 = 1 chrg on ; 0000 = 0 both off
prt = (format(protect, "b").zfill(16)) # protect trigger (0,1)(off,on)
ovp = int(prt[0:1]) # overvoltage
uvp = int(prt[1:2]) # undervoltage
bov = int(prt[2:3]) # pack overvoltage
buv = int(prt[3:4]) # pack undervoltage
cot = int(prt[4:5]) # current over temp
cut = int(prt[5:6]) # current under temp
dot = int(prt[6:7]) # discharge over temp
dut = int(prt[7:8]) # discharge under temp
coc = int(prt[8:9]) # charge over current
duc = int(prt[9:10]) # discharge under current
sc = int(prt[10:11]) # short circuit
ic = int(prt[11:12]) # ic failure
cnf = int(prt[12:13]) # fet config problem
return DetailInfo(datetime.now(), int(ovp), int(uvp), int(bov), int(buv), int(cot), int(cut), int(dot), int(dut), int(coc), int(duc), int(sc), int(ic), int(cnf), int(protect), int(percent), int(fet), int(cells), float(temp1), float(temp2))
def cellvolts1(data): # process cell voltages
celldata = data
i = 4 # Unpack into variables, skipping header bytes 0-3
cell1, cell2, cell3, cell4, cell5, cell6, cell7, cell8 = struct.unpack_from('>HHHHHHHH', celldata, i)
cells1 = [cell1, cell2, cell3, cell4, cell5, cell6, cell7, cell8] # needed for max, min, delta calculations
cellmin = min(cells1)
cellmax = max(cells1)
delta = cellmax-cellmin
return CellVoltages(datetime.now(), int(cell1) / 1000.0, int(cell2) / 1000.0, int(cell3) / 1000.0, int(cell4) / 1000.0, int(cell5) / 1000.0, int(cell6) / 1000.0, int(cell7) / 1000.0, int(cell8) / 1000.0, int(cellmin) / 1000.0, int(cellmax) / 1000.0, int(delta) / 1000.0)
def dict_less_dt(d):
del d['dt']
return d
class MyDelegate(DefaultDelegate):
def __init__(self, influx_client):
DefaultDelegate.__init__(self)
self.influx_client = influx_client
def handleNotification(self, cHandle, data):
hex_data = binascii.hexlify(data) # Given raw bytes, get an ASCII string representing the hex values
text_string = hex_data.decode('utf-8') # check incoming data for routing to decoding routines
data_line = None
if text_string.find('dd04') != -1: # x04 (1-8 cells)
data_line = cellvolts1(data)
elif text_string.find('dd03') != -1: # x03
data_line = cellinfo1(data)
elif text_string.find('77') != -1 and len(text_string) == 28 or len(text_string) == 36: # x03
data_line = cellinfo2(data)
if data_line:
self.publish(data_line)
def publish(self, data_line):
influx_data = [
{
"measurement": "battery_data0",
"tags": {
"battery": DEVICE_NAME,
},
"time": data_line.dt.astimezone(timezone.utc).isoformat(),
"fields": dict_less_dt(data_line._asdict())
}
]
try:
self.influx_client.write_points(influx_data)
except:
print("Failed to write data to influxdb:", influx_data)
traceback.print_exc()
def main():
influx_client = InfluxDBClient('myinflux.example', 443, 'usernameZZZ', 'passwordZZZ', 'my_db_name', ssl=True, verify_ssl=True)
try:
print('attempting to connect')
bms = Peripheral(DEVICE_ADDRESS, addrType="public")
except BTLEException as ex:
time.sleep(10)
print('2nd try connect')
bms = Peripheral(DEVICE_ADDRESS, addrType="public")
except BTLEException as ex:
print('cannot connect')
exit()
else:
print('connected ', DEVICE_ADDRESS)
bms.setDelegate(MyDelegate(influx_client))
# write empty data to 0x15 for notification request -- address x03 handle for info & x04 handle for cell voltage
# using waitForNotifications(5) as less than 5 seconds has caused some missed notifications
# TODO: See https://gitlab.com/bms-tools/bms-tools/-/blob/master/JBD_REGISTER_MAP.md
while True:
result = bms.writeCharacteristic(0x15, b'\xdd\xa5\x03\x00\xff\xfd\x77', False) # write x03 w/o response cell info
bms.waitForNotifications(5)
result = bms.writeCharacteristic(0x15, b'\xdd\xa5\x04\x00\xff\xfc\x77', False) # write x04 w/o response cell voltages
bms.waitForNotifications(5)
time.sleep(5)
if __name__ == "__main__":
main()