#!/usr/bin/env python3
"""Fetch Amazfit/Huami auth keys and aGPS data, and pack aGPS files as UIHH."""

import logging
import os
import tempfile

from config_handler import ConfigHandler


class Fetcher:
    """Thin driver around the Huami device object built by ``ConfigHandler``.

    Every operation delegates to ``self.huamidevice``; this class only adds
    logging, error reporting and local file handling.
    """

    def __init__(self, args):
        """Build the configuration and the Huami device object from ``args``."""
        self.config = ConfigHandler(args)
        self.huamidevice = self.config.configureHuamiDevice()
        self.log = logging.getLogger(__name__)

    def exit_with_error(self, e):
        """Log ``e`` at error level and terminate the process with status 1.

        Raises:
            SystemExit: always. It is not an ``Exception`` subclass, so the
                ``except Exception`` handlers in this class do not swallow it.
        """
        self.log.error(f"{e}")
        raise SystemExit(1)

    def get_token(self):
        """Acquire the Amazfit access token, exiting the process on failure."""
        try:
            res = self.huamidevice.get_access_token()
            if not res:
                self.exit_with_error("Couldn't acquire Amazfit access token")
            self.log.debug("Amazfit access token acquired")
        except Exception as e:
            self.exit_with_error(e)

    def set_token(self, text):
        """Parse the access token out of ``text``; exit if none was found."""
        self.huamidevice.parse_token(text)
        if self.huamidevice.access_token is None:
            self.exit_with_error(
                'token not found in the url, repeat sign in a copy/paste url'
            )

    def fetch_key(self):
        """Acquire the access token, then print the device keys.

        Exits the process when the token cannot be acquired.
        """
        try:
            res = self.huamidevice.get_access_token()
            if not res:
                self.exit_with_error("amazfit login failed")
        except Exception as e:
            self.exit_with_error(e)
        self.fetch_keys()

    def fetch_keys(self):
        """Log in and print every wearable auth key known to the server.

        A login exception is logged and the method returns without printing;
        unlike the other operations it does not terminate the process.
        """
        try:
            res = self.huamidevice.login()
            if res:
                self.log.debug(
                    f"Signed in as: {self.huamidevice.user_id}, getting data"
                )
        except Exception as e:
            self.log.error(f"{e}")
            return

        device_keys = self.huamidevice.get_wearable_auth_keys()
        if device_keys:
            for device_key in device_keys:
                print(f"{device_key} {device_keys[device_key]}")
        else:
            print("No keys on the server")

    def get_agps_files(self):
        """Download the aGPS packs into the target directory and unzip them.

        The target directory is ``self.config.target_dir`` or ``./tmp`` and is
        created when missing. Every pack is copied there; zip archives other
        than ``epo.zip`` are additionally extracted in place.
        """
        import zipfile
        import shutil

        try:
            res = self.huamidevice.login()
            if res:
                self.log.debug(
                    f"Signed in as: {self.huamidevice.user_id}, getting data"
                )
        except Exception as e:
            self.exit_with_error(e)

        data_dir = os.path.abspath(self.config.target_dir or "./tmp")
        # makedirs copes with nested targets and with the directory appearing
        # between an existence check and the creation.
        os.makedirs(data_dir, exist_ok=True)

        with tempfile.TemporaryDirectory() as tmpdir:
            popd = os.getcwd()
            # The packs are read back below by bare file name, so the download
            # presumably lands in the current directory; work inside the
            # temporary directory so nothing is left behind.
            os.chdir(tmpdir)
            try:
                self.huamidevice.get_gps_data()
                for filename in self.huamidevice.agps_packs.values():
                    sdpathfile = os.path.join(data_dir, filename)
                    shutil.copyfile(filename, sdpathfile)
                    self.log.info(f"Processing {filename}")
                    if "zip" not in filename:
                        continue
                    if filename == "epo.zip":
                        # epo zip files should not be extracted
                        continue
                    with zipfile.ZipFile(filename, "r") as zip_f:
                        zip_f.extractall(data_dir)
            finally:
                # Always leave the temporary directory before it is removed,
                # including when the download or extraction raised.
                os.chdir(popd)

        self.log.debug(f'File(s) downloaded and extracted to {data_dir}')

    def create_uihh_agps_file(self):
        """Pack the files listed in ``typemap`` into ``aGPS_UIHH.bin``.

        Input files are read from, and the result is written to, the target
        directory (``self.config.target_dir`` or ``./tmp``). Nothing is
        written when that directory or any listed input file is missing.
        """
        import typemap as tm
        import pathlib
        from binascii import crc32

        data_dir = os.path.abspath(self.config.target_dir or "./tmp")
        if not os.path.exists(data_dir):
            self.log.info(f"Data dir {data_dir} doesn't exist")
            return

        base_dir = pathlib.Path(data_dir)
        chunks = []
        for typeID, inputfilename in tm.typemap.items():
            fullPathName = base_dir / inputfilename
            if not fullPathName.is_file():
                self.log.warning(f"File not found: {fullPathName}. Skipping")
                # NOTE(review): despite the "Skipping" wording this aborts the
                # whole packing run, as the original code did.
                return

            with open(fullPathName, "rb") as f:
                filecontent = f.read()

            self.log.info(f"Packing {inputfilename}")
            # Per-file record: 0x01, one-byte type id, little-endian length,
            # little-endian CRC32, then the file bytes.
            chunks.append(
                b"\x01"
                + typeID.to_bytes(1, "big")
                + len(filecontent).to_bytes(4, "little")
                + crc32(filecontent).to_bytes(4, "little")
            )
            chunks.append(filecontent)
        content = b"".join(chunks)

        self.log.info("Adding header")
        # 32-byte container header. The constant bytes are reproduced as they
        # were; their meaning is not documented in this file.
        merged_header = b"".join((
            b"UIHH",
            b"\x04\x00\x00\x00\x00\x00\x00\x01",
            crc32(content).to_bytes(4, "little"),
            bytes(6),
            len(content).to_bytes(4, "little"),
            bytes(6),
        ))

        outputfile = base_dir / "aGPS_UIHH.bin"
        self.log.info(f"Writing {outputfile}")
        with open(outputfile, "wb") as f:
            f.write(merged_header + content)

        self.log.info("aGPS UIHH created")