import socket
import bluetooth
from binascii import unhexlify
import time
import codecs
import logging
import struct
from paho.mqtt import client as mqtt_client
import random
import json
_LOGGER = logging.getLogger(__name__)
_LOGGER.setLevel(logging.DEBUG)
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
# create formatter
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
# add formatter to ch
ch.setFormatter(formatter)
_LOGGER.addHandler(ch)
battery = {}
mqtt_host = '122.122.122.122'
mqtt_port = 1883
mqtt_user = "admin"
mqtt_password = "password"
mqtt_topic = "batt"
mqtt_telegraf_name = "batt1" # table name for telegraf -> influxdb logging
battery_addr = "AA:BB:CC:00:00:00"
battery_port = 1
cell_count = 8
connect_retry_count = 15
connect_retry_delay = 1
discover = True
def bluetooth_discover():
_LOGGER.info("Discovering bluetooth devices")
nearby_devices = bluetooth.discover_devices(lookup_names = True, flush_cache = True, duration = 20)
_LOGGER.info("found %d devices" % len(nearby_devices))
for addr, name in nearby_devices:
_LOGGER.info("-------------------------- {} - {} --------------------------".format(addr, name))
if name[0:7] == "BMS-ANT":
_LOGGER.info("============================== Found BMS-ANT device! Name: {} Addres: {} ==============================".format(name,addr))
s = bluetooth.find_service(address = addr)
for services in s:
_LOGGER.info(" Protocol: {}, Port: {}, host: {}".format(services["protocol"],services["port"],services["host"]))
def ant_connect_socket(serverMACAddress, port, retry_count):
_LOGGER.info("Trying to connect addres: {} port: {} retry count: {}".format(serverMACAddress,port,retry_count))
try:
s = socket.socket(socket.AF_BLUETOOTH, socket.SOCK_STREAM, socket.BTPROTO_RFCOMM)
except Exception as ex:
_LOGGER.error("Socket error: " + ex)
return None
try:
s.connect((serverMACAddress,port))
_LOGGER.info("Connected!")
connected = True
except Exception as ex:
connected = False
s.close()
_LOGGER.error("Connect error:" + str(ex))
if not connected:
retry_count += 1
_LOGGER.error("Connect error, retrying: " + str(retry_count))
if retry_count <= connect_retry_count:
time.sleep(connect_retry_delay)
_LOGGER.debug("Waiting {} sec: ".format(connect_retry_delay))
ant_connect_socket(serverMACAddress, port, retry_count)
else:
_LOGGER.error("Connect retry {} > {} exiting".format(retry_count,connect_retry_count))
return None
return s
def read_and_decode_ant_answer(s):
try:
test_word = 'DBDB00000000'
_LOGGER.debug("Sending test word: " + str(test_word))
s.send(codecs.decode(test_word, 'hex'))
time.sleep(3)
data = s.recv(140)
_LOGGER.debug("Got result: " + str(data))
except Exception as ex:
_LOGGER.error("Data send/receive error: " + str(ex))
return None
s.close()
if len(data) > 0:
return codecs.encode(data, 'hex')
else:
return None
def decode_data(response_data,cell_count):
try:
# SoC (1)
battery['soc'] = int(response_data[(74 * 2):(75 * 2)],16)
# Power (2)
data = (response_data[(111 * 2):(114 * 2 + 2)])
if int(data, 16) > 2147483648:
battery['power'] = int(-(2 * 2147483648) + int(data, 16))
else:
battery['power'] = int(data, 16)
# BMS current (3)
data = (response_data[(70 * 2):(73 * 2 + 2)])
if int(data, 16) > 2147483648:
battery['bms_current'] = float((-(2 * 2147483648) + int(data, 16)) / 10 )
else:
battery['bms_current'] = float(int(data, 16) / 10)
# BMS V (4)
data = response_data[8:12]
data = struct.unpack('>H', unhexlify(data))[0] * 0.1
battery['bms_v'] = int(data + 0.7)
# 0.7 was added as BMS low.
# Cell_avg (5)
data = (response_data[(121 * 2):(122 * 2 + 2)])
battery['cell_avg'] = float(struct.unpack('>H', unhexlify(data))[0] / 1000)
# Cell_min (6)
data = (response_data[(119 * 2):(120 * 2 + 2)])
battery['cell_min'] = float(struct.unpack('>H', unhexlify(data))[0] / 1000)
# Cell_max (7)
data = (response_data[(116 * 2):(117 * 2 + 2)])
battery['cell_max'] = float(struct.unpack('>H', unhexlify(data))[0] / 1000)
for i in range(cell_count):
data = response_data[((6+i*2) * 2):((7+i*2) * 2 + 2)]
battery['cell_amps' + str(i+1)] = float(struct.unpack('>H', unhexlify(data))[0] / 1000)
return battery
except Exception as ex:
_LOGGER.error("Decode_data error: " + str(ex))
return None
def connect_mqtt(host,port,username,password) -> mqtt_client:
def on_connect(client, userdata, flags, rc):
if rc == 0:
_LOGGER.info("Connected to MQTT Broker!")
else:
_LOGGER.error("Failed to connect, return code %d\n", rc)
return None
try:
client_id = f'python-mqtt-{random.randint(0, 100)}'
_LOGGER.info("Trying mqtt connect host: {} port: {} username: {} client_id: {}".format(host,port,username,client_id))
client = mqtt_client.Client(client_id)
client.username_pw_set(username, password)
client.on_connect = on_connect
client.connect(host, port)
except Exception as ex:
_LOGGER.error("connect mqtt error: " + ex)
return None
return client
def mqtt_send_battery_data(client, topic, battery_data):
_LOGGER.info("Trying mqtt publics topic: {} ".format(topic))
json_battery_data = json.dumps(battery_data, indent=4)
_LOGGER.debug(json_battery_data)
client.publish(topic + "/" + "battery_data", json_battery_data)
battery_data['name'] = mqtt_telegraf_name
for key in battery_data:
#print(key + "/" + key str(battery_data[key]))
client.publish(topic + "/" + key, str(battery_data[key]))
if discover:
bluetooth_discover()
ant_s = ant_connect_socket(battery_addr, battery_port, 0)
while True:
ant_s = ant_connect_socket(battery_addr, battery_port, 0)
if ant_s is not None:
response_data = read_and_decode_ant_answer(ant_s)
if response_data is not None:
battery_data = decode_data(response_data,cell_count)
if battery_data is not None:
client = connect_mqtt(mqtt_host, mqtt_port, mqtt_user, mqtt_password)
if client is not None:
mqtt_send_battery_data(client, mqtt_topic, battery_data)
time.sleep(60) #mqtt send repeat timeout