From b3902a769104bf250252ca53ff8ab45ef102d850 Mon Sep 17 00:00:00 2001 From: Daniel Demus Date: Sun, 10 Nov 2024 21:18:00 +0100 Subject: [PATCH] Initial commit --- .gitignore | 6 ++ README.md | 6 ++ config_handler.py | 73 ++++++++++++++++ fetcher.py | 132 +++++++++++++++++++++++++++++ hf-cli | 41 +++++++++ huami_token.py | 206 ++++++++++++++++++++++++++++++++++++++++++++++ typemap.json | 7 ++ typemap.py | 24 ++++++ urls.py | 89 ++++++++++++++++++++ 9 files changed, 584 insertions(+) create mode 100644 .gitignore create mode 100644 README.md create mode 100644 config_handler.py create mode 100644 fetcher.py create mode 100755 hf-cli create mode 100644 huami_token.py create mode 100644 typemap.json create mode 100644 typemap.py create mode 100644 urls.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..58ba3a9 --- /dev/null +++ b/.gitignore @@ -0,0 +1,6 @@ + +__pycache__/ + +res/ + +*.log diff --git a/README.md b/README.md new file mode 100644 index 0000000..555f56f --- /dev/null +++ b/README.md @@ -0,0 +1,6 @@ +# Huafetcher CLI + +If you use [GadgetBridge](https://codeberg.org/Freeyourgadget/Gadgetbridge) with an AmazFit gadget, you have probably used [Huafetcher](https://codeberg.org/vanous/huafetcher) to get keys and update aGPS. + +This project extracts the non-Kivy parts of Huafetcher into a cli app, that can be used to get keys or various forms of aGPS files from the command-line or for example on a daily schedule. + diff --git a/config_handler.py b/config_handler.py new file mode 100644 index 0000000..8c6057a --- /dev/null +++ b/config_handler.py @@ -0,0 +1,73 @@ +#!/usr/bin/env python3 + +import json +import os +import logging +from types import SimpleNamespace +from huami_token import HuamiAmazfit + +class ConfigHandler: + def __init__(self, args, config_path='./config.json'): + self.config_path = config_path + new_config = self.load_config() + config_change = False + if args.logfile: + self.config.logfile = args.logfile + config_change = True + if hasattr(self.config, 'logfile'): + logging.basicConfig(filename=self.config.logfile, level=logging.DEBUG if args.verbose else logging.INFO) + else: + logging.basicConfig(level=logging.DEBUG if args.verbose else logging.INFO) + + if not hasattr(self.config, 'credentials'): + self.config.credentials = SimpleNamespace() + config_change = True + if not hasattr(self.config.credentials, 'email'): + self.config.credentials.email = input('Username/Email: ') + config_change = True + if not hasattr(self.config.credentials, 'password'): + self.config.credentials.password = getpass('Password: ') + config_change = True + if args.targets: + self.config.targets = args.targets + config_change = True + elif not args.use_config: + self.config.targets = [] + + if args.target_dir: + self.config.target_dir = os.path.abspath(args.target_dir) + config_change = True + elif not self.config.target_dir: + self.config.target_dir = '' + + if config_change and (new_config or args.save): + save_config() + + @property + def target_dir(self): + return self.config.target_dir + + @target_dir.setter + def target_dir(self, value): + self.config.target_dir = value + + def load_config(self): + try: + with open(self.config_path) as config_file: + self.config = json.load(config_file, object_hook=lambda d: SimpleNamespace(**d)) + return False + except IOError: + self.config = SimpleNamespace() + return True + + def save_config(self): + log.debug('Saving config') + with open(self.config_path, 'w') as config_file: + json.dump(self.config, config_file, default=lambda x: vars(x)) + + def configureHuamiDevice(self): + huamidevice = HuamiAmazfit(email=self.config.credentials.email, password=self.config.credentials.password) + huamidevice.method = 'amazfit' + if hasattr(self.config, 'targets') and self.config.targets: + huamidevice.agps_packs = dict(filter(lambda entry: entry[0] in self.config.targets, huamidevice.AGPS_PACKS.items())) + return huamidevice diff --git a/fetcher.py b/fetcher.py new file mode 100644 index 0000000..f8107b0 --- /dev/null +++ b/fetcher.py @@ -0,0 +1,132 @@ +#!/usr/bin/env python3 +import logging +import os +import tempfile + +from config_handler import ConfigHandler + +class Fetcher: + def __init__(self, args): + self.config = ConfigHandler(args) + self.huamidevice = self.config.configureHuamiDevice() + self.log = logging.getLogger(__name__) + + def exit_with_error(e): + self.log.error(f"{e}") + exit(1) + + + def get_token(self): + try: + res = self.huamidevice.get_access_token() + if not res: + exit_with_error("Couldn't acquire Amazfit access token") + self.log.debug("Amazfit access token acquired") + except Exception as e: + exit_with_error(e) + + def set_token(self, text): + self.huamidevice.parse_token(text) + if self.huamidevice.access_token is None: + exit_with_error('token not found in the url, repeat sign in a copy/paste url') + + def fetch_key(self): + res = False + try: + res = self.huamidevice.get_access_token() + if not res: + exit_with_error("amazfit login failed") + except Exception as e: + exit_with_error(e) + fetch_keys() + + def fetch_keys(self): + try: + res = self.huamidevice.login() + if res: + self.log.debug(f"Signed in as: {self.huamidevice.user_id}, getting data") + except Exception as e: + self.log.error(f"{e}") + return + + device_keys = self.huamidevice.get_wearable_auth_keys() + if device_keys: + for device_key in device_keys: + print(f"{device_key} {device_keys[device_key]}") + else: + print("No keys on the server") + + def get_agps_files(self): + import zipfile + import shutil + try: + res = self.huamidevice.login() + if res: + self.log.debug(f"Signed in as: {self.huamidevice.user_id}, getting data") + except Exception as e: + exit_with_error(e) + + data_dir = os.path.abspath(self.config.target_dir or "./tmp") + if not os.path.exists(data_dir): + os.mkdir(data_dir) + with tempfile.TemporaryDirectory() as tmpdir: + popd = os.getcwd() + os.chdir(tmpdir) + + self.huamidevice.get_gps_data() + for filename in self.huamidevice.agps_packs.values(): + sdpathfile = os.path.join(data_dir, filename) + shutil.copyfile(filename, sdpathfile) + self.log.info(f"Processing {filename}") + if "zip" not in filename: + continue + if filename == "epo.zip": + # epo zip files should not be extracted + continue + with zipfile.ZipFile(filename, "r") as zip_f: + zip_f.extractall(data_dir) + os.chdir(popd) + + self.log.debug(f'File(s) downloaded and extracted to {data_dir}') + + def create_uihh_agps_file(self): + import typemap as tm + import pathlib + from binascii import crc32 + data_dir = os.path.abspath(config.target_dir or "./tmp") + if not os.path.exists(data_dir): + self.log.info(f"Data dir {data_dir} doesn't exist") + return + content = b"" + + for typeID, inputfilename in tm.typemap.items(): + fullPathName = pathlib.Path(data_dir).joinpath(inputfilename) + if not fullPathName.is_file(): + self.log.warn(f"File not found: {fullPathName}. Skipping") + return + + with open(fullPathName, "rb") as f: + filecontent = f.read() + + self.log.info(f"Packing {inputfilename}") + fileheader = chr(1).encode() + typeID.to_bytes(1,"big") + len(filecontent).to_bytes(4,"little") + crc32(filecontent).to_bytes(4,"little") + content += fileheader + filecontent + + self.log.info("Adding header") + header = ["UIHH" , chr(0x04) , chr(0x00) , chr(0x00) , chr(0x00) , chr(0x00) , chr(0x00) , chr(0x00) , chr(0x01) , crc32(content).to_bytes(4,"little") , chr(0x00) , chr(0x00) , chr(0x00) , chr(0x00) , chr(0x00) , chr(0x00) , len(content).to_bytes(4,"little") , chr(0x00) , chr(0x00) , chr(0x00) , chr(0x00) , chr(0x00) , chr(0x00)] + + merged_header = b"" + for i in header: + if isinstance(i, str): + i = i.encode() + merged_header += i + + content = merged_header+content + + outputfile = pathlib.Path(data_dir).joinpath("aGPS_UIHH.bin") + self.log.info(f"Writing {outputfile}") + with open(outputfile, "wb") as f: + f.write(content) + + self.log.info("aGPS UIHH created") + diff --git a/hf-cli b/hf-cli new file mode 100755 index 0000000..f80d13a --- /dev/null +++ b/hf-cli @@ -0,0 +1,41 @@ +#!/usr/bin/env python3 +""" +AmazFit key and agps fetcher, that can also be used to schedule daily updates +""" + +import urls as urls +import argparse +import tempfile + +from getpass import getpass +from huami_token import HuamiAmazfit +from fetcher import Fetcher + +parser = argparse.ArgumentParser( + prog = 'hf-cli', + description = 'Can aquire a amazfit or huami key for your gadget and downloads amazfit and huami agps firmware updates') +parser.add_argument('-k', '--keys', action = 'store_true', help = 'Fetch keys') +parser.add_argument('-g', '--agps', action = 'store_true', help = 'Fetch agps files') +parser.add_argument('-u', '--uihh', action = 'store_true', help = 'Create UIHH agps file') +parser.add_argument('-s', '--save', action = 'store_true', help = 'Automatically save changes to the config.json file') +parser.add_argument('-t', '--target-dir', help = 'Move results to rhis directory after download') +parser.add_argument('-c', '--use-config', action='store_true', help = 'Load targets from the config file if not specified') +parser.add_argument('-l', '--logfile', help = 'Write log messages to this file') +parser.add_argument('-v', '--verbose', action='store_true', help = 'Write each operation to the console') +parser.add_argument('targets', help='Only download from the given targets. If none are specified, all will be tried', nargs='*', choices=HuamiAmazfit.AGPS_PACKS.keys()) +args = parser.parse_args() + +fetcher = Fetcher(args) +fetcher.get_token() + +if args.keys: + fetcher.fetch_key() + +with tempfile.TemporaryDirectory() as tmpdir: + if args.uihh or args.agps or args.targets: + fetcher.get_agps_files() + + if args.uihh: + fetcher.create_uihh_agps_file() + + diff --git a/huami_token.py b/huami_token.py new file mode 100644 index 0000000..60e909f --- /dev/null +++ b/huami_token.py @@ -0,0 +1,206 @@ +#!/usr/bin/env python3 + +import json +import uuid +import random +import shutil +import urllib +import requests + +import urls + +class HuamiAmazfit: + AGPS_PACKS = { + "AGPS_ALM": "cep_1week.zip", + "AGPSZIP": "cep_7days.zip", + "LLE": "lle_1week.zip", + "AGPS": "cep_pak.bin", + "EPO": "epo.zip" + } + + def __init__(self, email=None, password=None): + + if not email or not password: + raise ValueError("For Amazfit method E-Mail and Password can not be null.") + self.method = 'amazfit' + self.email = email + self.password = password + self.access_token = None + self.country_code = None + + self.app_token = None + self.login_token = None + self.user_id = None + + self.result = None + + self.r = str(uuid.uuid4()) + + # IMEI or something unique + self.device_id = "02:00:00:%02x:%02x:%02x" % (random.randint(0, 255), + random.randint(0, 255), + random.randint(0, 255)) + self.agps_packs = self.AGPS_PACKS + + def __str__(self): + return f"{self.email} {self.password} {self.method} {self.access_token}" + + def parse_token(self, token_url): + + parsed_token_url = urllib.parse.urlparse(token_url) + token_url_parameters = urllib.parse.parse_qs(parsed_token_url.query) + + if 'code' not in token_url_parameters: + return + + self.access_token = token_url_parameters['code'] + self.country_code = 'US' + + def get_access_token(self): + print("Getting access token ...") + + auth_url = urls.URLS['tokens_amazfit'].format(user_email=urllib.parse.quote(self.email)) + + data = urls.PAYLOADS['tokens_amazfit'] + data['password'] = self.password + + response = requests.post(auth_url, data=data, allow_redirects=False) + response.raise_for_status() + + # 'Location' parameter contains url with login status + redirect_url = urllib.parse.urlparse(response.headers.get('Location')) + redirect_url_parameters = urllib.parse.parse_qs(redirect_url.query) + + if 'error' in redirect_url_parameters: + raise ValueError(f"Wrong E-mail or Password. Error: {redirect_url_parameters['error']}") + + if 'access' not in redirect_url_parameters: + raise ValueError("No 'access' parameter in login url.") + + if 'country_code' not in redirect_url_parameters: + raise ValueError("No 'country_code' parameter in login url.") + + self.access_token = redirect_url_parameters['access'] + self.country_code = redirect_url_parameters['country_code'] + + return True + + + def login(self, external_token=None): + print("Logging in...") + if external_token: + self.access_token = external_token + + login_url = urls.URLS['login_amazfit'] + + data = urls.PAYLOADS['login_amazfit'] + data['country_code'] = self.country_code + data['device_id'] = self.device_id + data['third_name'] = 'huami' if self.method == 'amazfit' else 'mi-watch' + data['code'] = self.access_token + data['grant_type'] = 'access_token' if self.method == 'amazfit' else 'request_token' + + response = requests.post(login_url, data=data, allow_redirects=False) + response.raise_for_status() + login_result = response.json() + + if 'error_code' in login_result: + raise ValueError(f"Login error. Error: {login_result['error_code']}") + + if 'token_info' not in login_result: + raise ValueError("No 'token_info' parameter in login data.") + else: + token_info = login_result['token_info'] + if 'app_token' not in token_info: + raise ValueError("No 'app_token' parameter in login data.") + self.app_token = token_info['app_token'] + + if 'login_token' not in token_info: + raise ValueError("No 'login_token' parameter in login data.") + self.login_token = token_info['login_token'] + + if 'user_id' not in token_info: + raise ValueError("No 'user_id' parameter in login data.") + self.user_id = token_info['user_id'] + print("Logged in! User id: {}".format(self.user_id)) + return True + + def get_wearable_auth_keys(self): + print("Getting linked wearables...") + print(self.user_id) + + devices_url = urls.URLS['devices'].format(user_id=urllib.parse.quote(self.user_id)) + + headers = urls.PAYLOADS['devices'] + headers['apptoken'] = self.app_token + + response = requests.get(devices_url, headers=headers) + response.raise_for_status() + device_request = response.json() + if 'items' not in device_request: + raise ValueError("No 'items' parameter in devices data.") + devices = device_request['items'] + + devices_dict = {} + + for idx, wearable in enumerate(devices): + if 'macAddress' not in wearable: + raise ValueError("No 'macAddress' parameter in device data.") + mac_address = wearable['macAddress'] + + if 'additionalInfo' not in wearable: + raise ValueError("No 'additionalInfo' parameter in device data.") + device_info = json.loads(wearable['additionalInfo']) + + if 'auth_key' not in device_info: + raise ValueError("No 'auth_key' parameter in device data.") + key_str = device_info['auth_key'] + auth_key = '0x' + (key_str if key_str != '' else '00') + + devices_dict[f'{mac_address}'] = auth_key + + return devices_dict + + def get_gps_data(self) -> None: + """Download GPS packs: almanac and AGPS""" + agps_packs = self.agps_packs.keys() + agps_file_names = self.agps_packs.values() + agps_link = urls.URLS['agps'] + + headers = urls.PAYLOADS['agps'] + headers['apptoken'] = self.app_token + + for agps_pack_name, agps_file_name in self.agps_packs.items(): + print(f"Downloading {agps_pack_name}...") + servers=['api-mifit-de2', 'api-mifit-us2', 'api-mifit-cn2', 'api-mifit-sg2','api-mifit'] + for server in servers: + print(f"trying server: {server}") + response = requests.get(agps_link.format(pack_name=agps_pack_name, server=server), headers=headers) + print(f"status: {response.status_code}") + #response.raise_for_status() + if response.status_code == 401: + print("try new server") + continue + agps_result = response.json()[0] + if 'fileUrl' not in agps_result: + raise ValueError("No 'fileUrl' parameter in files request.") + with requests.get(agps_result['fileUrl'], stream=True) as request: + with open(agps_file_name, 'wb') as gps_file: + shutil.copyfileobj(request.raw, gps_file) + break + + def logout(self): + logout_url = urls.URLS['logout'] + + data = urls.PAYLOADS['logout'] + data['login_token'] = self.login_token + + response = requests.post(logout_url, data=data) + logout_result = response.json() + + if logout_result['result'] == 'ok': + print("\nLogged out.") + else: + print("\nError logging out.") + + diff --git a/typemap.json b/typemap.json new file mode 100644 index 0000000..fb362c0 --- /dev/null +++ b/typemap.json @@ -0,0 +1,7 @@ +{ "0x05" : "gps_alm.bin", + "0x0f" : "gln_alm.bin", + "0x86" : "lle_bds.lle", + "0x87" : "lle_gps.lle", + "0x88" : "lle_glo.lle", + "0x89" : "lle_gal.lle", + "0x8a" : "lle_qzss.lle"} diff --git a/typemap.py b/typemap.py new file mode 100644 index 0000000..ce88db2 --- /dev/null +++ b/typemap.py @@ -0,0 +1,24 @@ +#Copyright (C) 2021 Andreas Shimokawa +# +#This file is part of Gadgetbridge-tools. +# +#Gadgetbridge is free software: you can redistribute it and/or modify +#it under the terms of the GNU Affero General Public License as published +#by the Free Software Foundation, either version 3 of the License, or +#(at your option) any later version. +# +#Gadgetbridge is distributed in the hope that it will be useful, +#but WITHOUT ANY WARRANTY; without even the implied warranty of +#MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +#GNU Affero General Public License for more details. +# +#You should have received a copy of the GNU Affero General Public License +#along with this program. If not, see . + +import json + +with open("typemap.json") as f: + jsonmap = json.load(f) + +typemap={int(k,16):v for k,v in jsonmap.items()} + diff --git a/urls.py b/urls.py new file mode 100644 index 0000000..259f400 --- /dev/null +++ b/urls.py @@ -0,0 +1,89 @@ +# Copyright (c) 2020 Kirill Snezhko + +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: + +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. + +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. + +"""Module for storin urls and payloads fro different requests""" + +URLS = { + 'login_xiaomi': 'https://account.xiaomi.com/oauth2/authorize?skip_confirm=false&' + 'client_id=2882303761517383915&pt=0&scope=1+6000+16001+20000&' + 'redirect_uri=https%3A%2F%2Fhm.xiaomi.com%2Fwatch.do&' + '_locale=en_US&response_type=code', + 'tokens_amazfit': 'https://api-user.huami.com/registrations/{user_email}/tokens', + 'login_amazfit': 'https://account.huami.com/v2/client/login', + 'devices': 'https://api-mifit-us2.huami.com/users/{user_id}/devices', + 'agps': 'https://{server}.huami.com/apps/com.huami.midong/fileTypes/{pack_name}/files', + 'data_short': 'https://api-mifit-us2.huami.com/users/{user_id}/deviceTypes/4/data', + 'logout': 'https://account-us2.huami.com/v1/client/logout', + 'fw_updates': 'https://api-mifit-us2.huami.com/devices/ALL/hasNewVersion' +} + +PAYLOADS = { + 'login_xiaomi': None, + 'tokens_amazfit': { + 'state': 'REDIRECTION', + 'client_id': 'HuaMi', + 'password': None, + 'redirect_uri': 'https://s3-us-west-2.amazonws.com/hm-registration/successsignin.html', + 'region': 'us-west-2', + 'token': 'access', + 'country_code': 'US' + }, + 'login_amazfit': { + 'dn': 'account.huami.com,api-user.huami.com,app-analytics.huami.com,' + 'api-watch.huami.com,' + 'api-analytics.huami.com,api-mifit.huami.com', + 'app_version': '5.9.2-play_100355', + 'source': 'com.huami.watch.hmwatchmanager', + 'country_code': None, + 'device_id': None, + 'third_name': None, + 'lang': 'en', + 'device_model': 'android_phone', + 'allow_registration': 'false', + 'app_name': 'com.huami.midong', + 'code': None, + 'grant_type': None + }, + 'devices': { + 'apptoken': None, + # 'enableMultiDevice': 'true' + }, + 'agps': { + 'apptoken': None + }, + 'data_short': { + 'apptoken': None, + 'startDay': None, + 'endDay': None + }, + 'logout': { + 'login_token': None + }, + 'fw_updates': { + 'productionSource': None, + 'deviceSource': None, + 'fontVersion': '0', + 'fontFlag': '0', + 'appVersion': '5.9.2-play_100355', + 'firmwareVersion': None, + 'hardwareVersion': None, + 'support8Bytes': 'true' + } +}