133 lines
4.6 KiB
Python
133 lines
4.6 KiB
Python
|
#!/usr/bin/env python3
|
||
|
import logging
|
||
|
import os
|
||
|
import tempfile
|
||
|
|
||
|
from config_handler import ConfigHandler
|
||
|
|
||
|
class Fetcher:
|
||
|
def __init__(self, args):
|
||
|
self.config = ConfigHandler(args)
|
||
|
self.huamidevice = self.config.configureHuamiDevice()
|
||
|
self.log = logging.getLogger(__name__)
|
||
|
|
||
|
def exit_with_error(e):
|
||
|
self.log.error(f"{e}")
|
||
|
exit(1)
|
||
|
|
||
|
|
||
|
def get_token(self):
|
||
|
try:
|
||
|
res = self.huamidevice.get_access_token()
|
||
|
if not res:
|
||
|
exit_with_error("Couldn't acquire Amazfit access token")
|
||
|
self.log.debug("Amazfit access token acquired")
|
||
|
except Exception as e:
|
||
|
exit_with_error(e)
|
||
|
|
||
|
def set_token(self, text):
|
||
|
self.huamidevice.parse_token(text)
|
||
|
if self.huamidevice.access_token is None:
|
||
|
exit_with_error('token not found in the url, repeat sign in a copy/paste url')
|
||
|
|
||
|
def fetch_key(self):
|
||
|
res = False
|
||
|
try:
|
||
|
res = self.huamidevice.get_access_token()
|
||
|
if not res:
|
||
|
exit_with_error("amazfit login failed")
|
||
|
except Exception as e:
|
||
|
exit_with_error(e)
|
||
|
fetch_keys()
|
||
|
|
||
|
def fetch_keys(self):
|
||
|
try:
|
||
|
res = self.huamidevice.login()
|
||
|
if res:
|
||
|
self.log.debug(f"Signed in as: {self.huamidevice.user_id}, getting data")
|
||
|
except Exception as e:
|
||
|
self.log.error(f"{e}")
|
||
|
return
|
||
|
|
||
|
device_keys = self.huamidevice.get_wearable_auth_keys()
|
||
|
if device_keys:
|
||
|
for device_key in device_keys:
|
||
|
print(f"{device_key} {device_keys[device_key]}")
|
||
|
else:
|
||
|
print("No keys on the server")
|
||
|
|
||
|
def get_agps_files(self):
|
||
|
import zipfile
|
||
|
import shutil
|
||
|
try:
|
||
|
res = self.huamidevice.login()
|
||
|
if res:
|
||
|
self.log.debug(f"Signed in as: {self.huamidevice.user_id}, getting data")
|
||
|
except Exception as e:
|
||
|
exit_with_error(e)
|
||
|
|
||
|
data_dir = os.path.abspath(self.config.target_dir or "./tmp")
|
||
|
if not os.path.exists(data_dir):
|
||
|
os.mkdir(data_dir)
|
||
|
with tempfile.TemporaryDirectory() as tmpdir:
|
||
|
popd = os.getcwd()
|
||
|
os.chdir(tmpdir)
|
||
|
|
||
|
self.huamidevice.get_gps_data()
|
||
|
for filename in self.huamidevice.agps_packs.values():
|
||
|
sdpathfile = os.path.join(data_dir, filename)
|
||
|
shutil.copyfile(filename, sdpathfile)
|
||
|
self.log.info(f"Processing {filename}")
|
||
|
if "zip" not in filename:
|
||
|
continue
|
||
|
if filename == "epo.zip":
|
||
|
# epo zip files should not be extracted
|
||
|
continue
|
||
|
with zipfile.ZipFile(filename, "r") as zip_f:
|
||
|
zip_f.extractall(data_dir)
|
||
|
os.chdir(popd)
|
||
|
|
||
|
self.log.debug(f'File(s) downloaded and extracted to {data_dir}')
|
||
|
|
||
|
def create_uihh_agps_file(self):
|
||
|
import typemap as tm
|
||
|
import pathlib
|
||
|
from binascii import crc32
|
||
|
data_dir = os.path.abspath(config.target_dir or "./tmp")
|
||
|
if not os.path.exists(data_dir):
|
||
|
self.log.info(f"Data dir {data_dir} doesn't exist")
|
||
|
return
|
||
|
content = b""
|
||
|
|
||
|
for typeID, inputfilename in tm.typemap.items():
|
||
|
fullPathName = pathlib.Path(data_dir).joinpath(inputfilename)
|
||
|
if not fullPathName.is_file():
|
||
|
self.log.warn(f"File not found: {fullPathName}. Skipping")
|
||
|
return
|
||
|
|
||
|
with open(fullPathName, "rb") as f:
|
||
|
filecontent = f.read()
|
||
|
|
||
|
self.log.info(f"Packing {inputfilename}")
|
||
|
fileheader = chr(1).encode() + typeID.to_bytes(1,"big") + len(filecontent).to_bytes(4,"little") + crc32(filecontent).to_bytes(4,"little")
|
||
|
content += fileheader + filecontent
|
||
|
|
||
|
self.log.info("Adding header")
|
||
|
header = ["UIHH" , chr(0x04) , chr(0x00) , chr(0x00) , chr(0x00) , chr(0x00) , chr(0x00) , chr(0x00) , chr(0x01) , crc32(content).to_bytes(4,"little") , chr(0x00) , chr(0x00) , chr(0x00) , chr(0x00) , chr(0x00) , chr(0x00) , len(content).to_bytes(4,"little") , chr(0x00) , chr(0x00) , chr(0x00) , chr(0x00) , chr(0x00) , chr(0x00)]
|
||
|
|
||
|
merged_header = b""
|
||
|
for i in header:
|
||
|
if isinstance(i, str):
|
||
|
i = i.encode()
|
||
|
merged_header += i
|
||
|
|
||
|
content = merged_header+content
|
||
|
|
||
|
outputfile = pathlib.Path(data_dir).joinpath("aGPS_UIHH.bin")
|
||
|
self.log.info(f"Writing {outputfile}")
|
||
|
with open(outputfile, "wb") as f:
|
||
|
f.write(content)
|
||
|
|
||
|
self.log.info("aGPS UIHH created")
|
||
|
|