#!/usr/bin/env python3
# getChargeryData.py
# Description: open RS232 serial port and read Chargery BMS data.
# Output data in a format to be consumed by nodeexporter/prometheus/grafana
# in folder /ramdisk
# v0.4b 5-16-20 Joe Elliott joe@inetd.com
# modified 2-July-20 cass3825 for new protocol compatibility
# Protocol at http://chargery.com/uploadFiles/bms24_additional_protocol%20V1.25.pdf
import serial
import sys, os, io
import time
import binascii
devName='/dev/ttyUSB0'
modeList= ["Discharge", "Charge", "Storage"]
gotCellData = False;
gotSysData = False;
gotIRData = False;
debug=False
test=False
if (len(sys.argv) > 1):
if (sys.argv[1] == "-d"):
debug=True
print("sys.argv[1]: Debug: enabled")
if (sys.argv[1] == "-t"):
debug=True
print("sys.argv[1]: Test Data: enabled")
if (len(sys.argv) > 2):
if (sys.argv[2] == "-d"):
debug=True
print("sys.argv[2]: Debug: enabled")
if (sys.argv[2] == "-t"):
test=True
print("sys.argv[2]: Test Data: enabled")
def bin2hex(str1):
bytes_str = bytes(str1)
return binascii.hexlify(bytes_str)
def get_voltage_value(byte1, byte2):
return float((float(byte1 * 256) + float(byte2)) / 1000)
def get_current_value(byte1, byte2):
return float((float(byte1 * 256) + float(byte2)) / 10)
def get_temp_value(byte1, byte2):
return float((float(byte1 * 256) + float(byte2)) / 10)
def get_xh_value(byte1, byte2, byte3, byte4):
return float((float(byte1) + (float(byte2) * 256) + (float(byte3) * 256 * 256) + (float(byte4) * 256 * 256 * 256)) / 1000)
def get_imped_value(byte1, byte2):
return float((float(byte2 * 256) + float(byte1)) / 10) # note byte swap order
def getCellData(fileObj, hexLine, strLen):
decStrLen = len(hexLine)
minLen = 44 # minimal bytes for the 8s inc header (each byte is 2 chars)
dataStart = 0 # cell voltage data starts at byte 9 in 2 byte chunks (hi-lo) was 8
cellNum = 1
aggVolts = 0 # total voltage of the battery
global gotCellData
if (debug): print("getCellData: called - ", hexLine)
# if (debug):
# header = hexLine[0:4] # header
# command = hexLine[0:2] # command
# dataLen = hexLine[2:4] # data length
# print("header:", header)
# print("command:", command)
# print("dataLen:", int(dataLen, 16), "bytes")
if (decStrLen < strLen) or (decStrLen < minLen):
if (debug): print("Truncated cell block - len:", len(hexLine), "Expected:", strLen)
return(True)
else:
if (debug): print("hexLine len", len(hexLine))
for cell in range(dataStart, decStrLen-18, 4): # incorporate datalength for variable cell count
cellVolts = get_voltage_value(int(hexLine[cell:cell+2], 16), int(hexLine[cell+2:cell+4], 16))
if (debug): print("Cell ", cellNum, ":", cellVolts, "v")
# format the data for node_exporter to read into prometheus
#valName = "mode={}{}".format("CellNum", cellNum)
valName = "mode=\"CellNum" + str(cellNum) + "\""
valName = "{" + valName + "}"
dataStr = f"BMS_A{valName} {cellVolts}"
print(dataStr, file=fileObj)
aggVolts += cellVolts
cellNum += 1
aggVolts = "{:4.2f}".format(aggVolts)
valName = "mode=\"aggVolts\""
valName = "{" + valName + "}"
dataStr = f"BMS_A{valName} {aggVolts}"
print(dataStr, file=fileObj)
# soc = int(hexLine[cell], 16)
Wh = get_xh_value(int(hexLine[cell+4:cell+6], 16), int(hexLine[cell+6:cell+8], 16), int(hexLine[cell+8:cell+10], 16), int(hexLine[cell+10:cell+12], 16))
valName = "mode=\"Wh\""
valName = "{" + valName + "}"
dataStr = f"BMS_A{valName} {Wh}"
print(dataStr, file=fileObj)
Ah = get_xh_value(int(hexLine[cell+12:cell+14], 16), int(hexLine[cell+14:cell+16], 16), int(hexLine[cell+16:cell+18], 16), int(hexLine[cell+18:cell+20], 16))
valName = "mode=\"Ah\""
valName = "{" + valName + "}"
dataStr = f"BMS_A{valName} {Ah}"
print(dataStr, file=fileObj)
if (debug):
print("Battery Wh:", Wh, "Wh")
print("Battery Ah:", Ah, "Ah")
# print("Checksum:", int(hexLine[cell+16:cell+18], 16))
print("Battery voltage:", aggVolts, "v")
gotCellData = True;
return(False)
def getSysData(fileObj, hexLine, strLen):
decStrLen = len(hexLine)
dataStart = 4 # data starts at byte 8 in 2 byte chunks (hi-lo)
minLen = 20 # minimal bytes inc header (each byte is 2 chars)
global gotSysData
if (debug): print("getSysData: called - ", hexLine)
if (decStrLen < strLen) or (decStrLen < minLen):
if (debug): print("Truncated system block - len:", len(hexLine), "Expected:", strLen)
return(True)
else:
if (debug): print("hexLine len", len(hexLine))
# first 3 fields for debug only)
# header = hexLine[0:4] # header
# command = hexLine[0:2] # command
# dataLen = hexLine[2:4] # data length
endVolt_hi = hexLine[0:2] # End voltage of cell
endVolt_lo = hexLine[2:4] # End voltage of cell
mode = hexLine[4:6] # Current mode
amps_hi = hexLine[6:8] # Current amps
amps_lo = hexLine[8:10] # Current amps
temp1_hi = hexLine[10:12] # Temp 1
temp1_lo = hexLine[12:14] # Temp 1
temp2_hi = hexLine[14:16] # Temp 2
temp2_lo = hexLine[16:18] # Temp 2
soc = hexLine[18:20] # SOC
chksum = hexLine[20:22] # Checksum
maxVolts = get_voltage_value(int(endVolt_hi, 16), int(endVolt_lo, 16))
currentFlow = get_current_value(int(amps_hi, 16), int(amps_lo, 16))
modeName = modeList[int(mode, 16)]
modeInt = int(mode, 16)
temp1 = get_temp_value(int(temp1_hi, 16), int(temp1_lo, 16))
temp2 = get_temp_value(int(temp2_hi, 16), int(temp2_lo, 16))
socInt = int(soc, 16)
if (debug):
# print("dataLen:", int(dataLen, 16), "bytes")
print("End voltage of cell:", maxVolts, "v")
print("mode:", modeInt, modeName)
print("Current:", currentFlow, "A")
print("Temp 1:", temp1, "c, ", temp1 * 9 / 5 + 32, "f")
print("Temp 2:", temp2, "c, ", temp2 * 9 / 5 + 32, "f")
print("SOC:", socInt, "%")
print("Checksum:", int(chksum, 16))
if (int(mode) == 0):
currentFlow = currentFlow * -1 # flow is in or out of the battery?
#print("currentFlow:", currentFlow)
valName = "mode=\"current\""
valName = "{" + valName + "}"
dataStr = f"BMS_A{valName} {currentFlow}"
print(dataStr, file=fileObj)
valName = "mode=\"maxVolts\""
valName = "{" + valName + "}"
dataStr = f"BMS_A{valName} {maxVolts}"
print(dataStr, file=fileObj)
valName = "mode=\"modeInt\", myStr=\""
valName = valName + modeName + "\""
valName = "{" + valName + "}"
dataStr = f"BMS_A{valName} {modeInt}"
print(dataStr, file=fileObj)
valName = "mode=\"temp1\""
valName = "{" + valName + "}"
dataStr = f"BMS_A{valName} {temp1}"
print(dataStr, file=fileObj)
valName = "mode=\"temp2\""
valName = "{" + valName + "}"
dataStr = f"BMS_A{valName} {temp2}"
print(dataStr, file=fileObj)
valName = "mode=\"SOC\""
valName = "{" + valName + "}"
dataStr = f"BMS_A{valName} {socInt}"
print(dataStr, file=fileObj)
if (debug):
# print("header:", header)
# print("command:", command)
# print("dataLen:", int(dataLen, 16), "bytes")
print("End voltage of cell_hi:", int(endVolt_hi, 16), "v")
print("End voltage of cell_lo:", int(endVolt_lo, 16), "v")
print("mode:", int(mode, 16))
print("amps_hi:", int(amps_hi, 16), "a")
print("amps_lo:", int(amps_lo, 16), "a")
print("Temp 1_hi:", int(temp1_hi, 16), "c")
print("Temp 1_lo:", int(temp1_lo, 16), "c")
print("Temp 2_hi:", int(temp2_hi, 16), "c")
print("Temp 2_lo:", int(temp2_lo, 16), "c")
print("SOC:", int(soc, 16), "%")
print("Checksum:", int(chksum, 16))
gotSysData = True;
return(False)