Initial commit

This commit is contained in:
Daniel Demus 2024-11-10 21:18:00 +01:00
commit b3902a7691
9 changed files with 584 additions and 0 deletions

6
.gitignore vendored Normal file
View File

@ -0,0 +1,6 @@
__pycache__/
res/
*.log

6
README.md Normal file
View File

@ -0,0 +1,6 @@
# Huafetcher CLI
If you use [GadgetBridge](https://codeberg.org/Freeyourgadget/Gadgetbridge) with an AmazFit gadget, you have probably used [Huafetcher](https://codeberg.org/vanous/huafetcher) to get keys and update aGPS.
This project extracts the non-Kivy parts of Huafetcher into a cli app, that can be used to get keys or various forms of aGPS files from the command-line or for example on a daily schedule.

73
config_handler.py Normal file
View File

@ -0,0 +1,73 @@
#!/usr/bin/env python3
import json
import os
import logging
from types import SimpleNamespace
from huami_token import HuamiAmazfit
class ConfigHandler:
def __init__(self, args, config_path='./config.json'):
self.config_path = config_path
new_config = self.load_config()
config_change = False
if args.logfile:
self.config.logfile = args.logfile
config_change = True
if hasattr(self.config, 'logfile'):
logging.basicConfig(filename=self.config.logfile, level=logging.DEBUG if args.verbose else logging.INFO)
else:
logging.basicConfig(level=logging.DEBUG if args.verbose else logging.INFO)
if not hasattr(self.config, 'credentials'):
self.config.credentials = SimpleNamespace()
config_change = True
if not hasattr(self.config.credentials, 'email'):
self.config.credentials.email = input('Username/Email: ')
config_change = True
if not hasattr(self.config.credentials, 'password'):
self.config.credentials.password = getpass('Password: ')
config_change = True
if args.targets:
self.config.targets = args.targets
config_change = True
elif not args.use_config:
self.config.targets = []
if args.target_dir:
self.config.target_dir = os.path.abspath(args.target_dir)
config_change = True
elif not self.config.target_dir:
self.config.target_dir = ''
if config_change and (new_config or args.save):
save_config()
@property
def target_dir(self):
return self.config.target_dir
@target_dir.setter
def target_dir(self, value):
self.config.target_dir = value
def load_config(self):
try:
with open(self.config_path) as config_file:
self.config = json.load(config_file, object_hook=lambda d: SimpleNamespace(**d))
return False
except IOError:
self.config = SimpleNamespace()
return True
def save_config(self):
log.debug('Saving config')
with open(self.config_path, 'w') as config_file:
json.dump(self.config, config_file, default=lambda x: vars(x))
def configureHuamiDevice(self):
huamidevice = HuamiAmazfit(email=self.config.credentials.email, password=self.config.credentials.password)
huamidevice.method = 'amazfit'
if hasattr(self.config, 'targets') and self.config.targets:
huamidevice.agps_packs = dict(filter(lambda entry: entry[0] in self.config.targets, huamidevice.AGPS_PACKS.items()))
return huamidevice

132
fetcher.py Normal file
View File

@ -0,0 +1,132 @@
#!/usr/bin/env python3
import logging
import os
import tempfile
from config_handler import ConfigHandler
class Fetcher:
def __init__(self, args):
self.config = ConfigHandler(args)
self.huamidevice = self.config.configureHuamiDevice()
self.log = logging.getLogger(__name__)
def exit_with_error(e):
self.log.error(f"{e}")
exit(1)
def get_token(self):
try:
res = self.huamidevice.get_access_token()
if not res:
exit_with_error("Couldn't acquire Amazfit access token")
self.log.debug("Amazfit access token acquired")
except Exception as e:
exit_with_error(e)
def set_token(self, text):
self.huamidevice.parse_token(text)
if self.huamidevice.access_token is None:
exit_with_error('token not found in the url, repeat sign in a copy/paste url')
def fetch_key(self):
res = False
try:
res = self.huamidevice.get_access_token()
if not res:
exit_with_error("amazfit login failed")
except Exception as e:
exit_with_error(e)
fetch_keys()
def fetch_keys(self):
try:
res = self.huamidevice.login()
if res:
self.log.debug(f"Signed in as: {self.huamidevice.user_id}, getting data")
except Exception as e:
self.log.error(f"{e}")
return
device_keys = self.huamidevice.get_wearable_auth_keys()
if device_keys:
for device_key in device_keys:
print(f"{device_key} {device_keys[device_key]}")
else:
print("No keys on the server")
def get_agps_files(self):
import zipfile
import shutil
try:
res = self.huamidevice.login()
if res:
self.log.debug(f"Signed in as: {self.huamidevice.user_id}, getting data")
except Exception as e:
exit_with_error(e)
data_dir = os.path.abspath(self.config.target_dir or "./tmp")
if not os.path.exists(data_dir):
os.mkdir(data_dir)
with tempfile.TemporaryDirectory() as tmpdir:
popd = os.getcwd()
os.chdir(tmpdir)
self.huamidevice.get_gps_data()
for filename in self.huamidevice.agps_packs.values():
sdpathfile = os.path.join(data_dir, filename)
shutil.copyfile(filename, sdpathfile)
self.log.info(f"Processing {filename}")
if "zip" not in filename:
continue
if filename == "epo.zip":
# epo zip files should not be extracted
continue
with zipfile.ZipFile(filename, "r") as zip_f:
zip_f.extractall(data_dir)
os.chdir(popd)
self.log.debug(f'File(s) downloaded and extracted to {data_dir}')
def create_uihh_agps_file(self):
import typemap as tm
import pathlib
from binascii import crc32
data_dir = os.path.abspath(config.target_dir or "./tmp")
if not os.path.exists(data_dir):
self.log.info(f"Data dir {data_dir} doesn't exist")
return
content = b""
for typeID, inputfilename in tm.typemap.items():
fullPathName = pathlib.Path(data_dir).joinpath(inputfilename)
if not fullPathName.is_file():
self.log.warn(f"File not found: {fullPathName}. Skipping")
return
with open(fullPathName, "rb") as f:
filecontent = f.read()
self.log.info(f"Packing {inputfilename}")
fileheader = chr(1).encode() + typeID.to_bytes(1,"big") + len(filecontent).to_bytes(4,"little") + crc32(filecontent).to_bytes(4,"little")
content += fileheader + filecontent
self.log.info("Adding header")
header = ["UIHH" , chr(0x04) , chr(0x00) , chr(0x00) , chr(0x00) , chr(0x00) , chr(0x00) , chr(0x00) , chr(0x01) , crc32(content).to_bytes(4,"little") , chr(0x00) , chr(0x00) , chr(0x00) , chr(0x00) , chr(0x00) , chr(0x00) , len(content).to_bytes(4,"little") , chr(0x00) , chr(0x00) , chr(0x00) , chr(0x00) , chr(0x00) , chr(0x00)]
merged_header = b""
for i in header:
if isinstance(i, str):
i = i.encode()
merged_header += i
content = merged_header+content
outputfile = pathlib.Path(data_dir).joinpath("aGPS_UIHH.bin")
self.log.info(f"Writing {outputfile}")
with open(outputfile, "wb") as f:
f.write(content)
self.log.info("aGPS UIHH created")

41
hf-cli Executable file
View File

@ -0,0 +1,41 @@
#!/usr/bin/env python3
"""
AmazFit key and agps fetcher, that can also be used to schedule daily updates
"""
import urls as urls
import argparse
import tempfile
from getpass import getpass
from huami_token import HuamiAmazfit
from fetcher import Fetcher
parser = argparse.ArgumentParser(
prog = 'hf-cli',
description = 'Can aquire a amazfit or huami key for your gadget and downloads amazfit and huami agps firmware updates')
parser.add_argument('-k', '--keys', action = 'store_true', help = 'Fetch keys')
parser.add_argument('-g', '--agps', action = 'store_true', help = 'Fetch agps files')
parser.add_argument('-u', '--uihh', action = 'store_true', help = 'Create UIHH agps file')
parser.add_argument('-s', '--save', action = 'store_true', help = 'Automatically save changes to the config.json file')
parser.add_argument('-t', '--target-dir', help = 'Move results to rhis directory after download')
parser.add_argument('-c', '--use-config', action='store_true', help = 'Load targets from the config file if not specified')
parser.add_argument('-l', '--logfile', help = 'Write log messages to this file')
parser.add_argument('-v', '--verbose', action='store_true', help = 'Write each operation to the console')
parser.add_argument('targets', help='Only download from the given targets. If none are specified, all will be tried', nargs='*', choices=HuamiAmazfit.AGPS_PACKS.keys())
args = parser.parse_args()
fetcher = Fetcher(args)
fetcher.get_token()
if args.keys:
fetcher.fetch_key()
with tempfile.TemporaryDirectory() as tmpdir:
if args.uihh or args.agps or args.targets:
fetcher.get_agps_files()
if args.uihh:
fetcher.create_uihh_agps_file()

206
huami_token.py Normal file
View File

@ -0,0 +1,206 @@
#!/usr/bin/env python3
import json
import uuid
import random
import shutil
import urllib
import requests
import urls
class HuamiAmazfit:
AGPS_PACKS = {
"AGPS_ALM": "cep_1week.zip",
"AGPSZIP": "cep_7days.zip",
"LLE": "lle_1week.zip",
"AGPS": "cep_pak.bin",
"EPO": "epo.zip"
}
def __init__(self, email=None, password=None):
if not email or not password:
raise ValueError("For Amazfit method E-Mail and Password can not be null.")
self.method = 'amazfit'
self.email = email
self.password = password
self.access_token = None
self.country_code = None
self.app_token = None
self.login_token = None
self.user_id = None
self.result = None
self.r = str(uuid.uuid4())
# IMEI or something unique
self.device_id = "02:00:00:%02x:%02x:%02x" % (random.randint(0, 255),
random.randint(0, 255),
random.randint(0, 255))
self.agps_packs = self.AGPS_PACKS
def __str__(self):
return f"{self.email} {self.password} {self.method} {self.access_token}"
def parse_token(self, token_url):
parsed_token_url = urllib.parse.urlparse(token_url)
token_url_parameters = urllib.parse.parse_qs(parsed_token_url.query)
if 'code' not in token_url_parameters:
return
self.access_token = token_url_parameters['code']
self.country_code = 'US'
def get_access_token(self):
print("Getting access token ...")
auth_url = urls.URLS['tokens_amazfit'].format(user_email=urllib.parse.quote(self.email))
data = urls.PAYLOADS['tokens_amazfit']
data['password'] = self.password
response = requests.post(auth_url, data=data, allow_redirects=False)
response.raise_for_status()
# 'Location' parameter contains url with login status
redirect_url = urllib.parse.urlparse(response.headers.get('Location'))
redirect_url_parameters = urllib.parse.parse_qs(redirect_url.query)
if 'error' in redirect_url_parameters:
raise ValueError(f"Wrong E-mail or Password. Error: {redirect_url_parameters['error']}")
if 'access' not in redirect_url_parameters:
raise ValueError("No 'access' parameter in login url.")
if 'country_code' not in redirect_url_parameters:
raise ValueError("No 'country_code' parameter in login url.")
self.access_token = redirect_url_parameters['access']
self.country_code = redirect_url_parameters['country_code']
return True
def login(self, external_token=None):
print("Logging in...")
if external_token:
self.access_token = external_token
login_url = urls.URLS['login_amazfit']
data = urls.PAYLOADS['login_amazfit']
data['country_code'] = self.country_code
data['device_id'] = self.device_id
data['third_name'] = 'huami' if self.method == 'amazfit' else 'mi-watch'
data['code'] = self.access_token
data['grant_type'] = 'access_token' if self.method == 'amazfit' else 'request_token'
response = requests.post(login_url, data=data, allow_redirects=False)
response.raise_for_status()
login_result = response.json()
if 'error_code' in login_result:
raise ValueError(f"Login error. Error: {login_result['error_code']}")
if 'token_info' not in login_result:
raise ValueError("No 'token_info' parameter in login data.")
else:
token_info = login_result['token_info']
if 'app_token' not in token_info:
raise ValueError("No 'app_token' parameter in login data.")
self.app_token = token_info['app_token']
if 'login_token' not in token_info:
raise ValueError("No 'login_token' parameter in login data.")
self.login_token = token_info['login_token']
if 'user_id' not in token_info:
raise ValueError("No 'user_id' parameter in login data.")
self.user_id = token_info['user_id']
print("Logged in! User id: {}".format(self.user_id))
return True
def get_wearable_auth_keys(self):
print("Getting linked wearables...")
print(self.user_id)
devices_url = urls.URLS['devices'].format(user_id=urllib.parse.quote(self.user_id))
headers = urls.PAYLOADS['devices']
headers['apptoken'] = self.app_token
response = requests.get(devices_url, headers=headers)
response.raise_for_status()
device_request = response.json()
if 'items' not in device_request:
raise ValueError("No 'items' parameter in devices data.")
devices = device_request['items']
devices_dict = {}
for idx, wearable in enumerate(devices):
if 'macAddress' not in wearable:
raise ValueError("No 'macAddress' parameter in device data.")
mac_address = wearable['macAddress']
if 'additionalInfo' not in wearable:
raise ValueError("No 'additionalInfo' parameter in device data.")
device_info = json.loads(wearable['additionalInfo'])
if 'auth_key' not in device_info:
raise ValueError("No 'auth_key' parameter in device data.")
key_str = device_info['auth_key']
auth_key = '0x' + (key_str if key_str != '' else '00')
devices_dict[f'{mac_address}'] = auth_key
return devices_dict
def get_gps_data(self) -> None:
"""Download GPS packs: almanac and AGPS"""
agps_packs = self.agps_packs.keys()
agps_file_names = self.agps_packs.values()
agps_link = urls.URLS['agps']
headers = urls.PAYLOADS['agps']
headers['apptoken'] = self.app_token
for agps_pack_name, agps_file_name in self.agps_packs.items():
print(f"Downloading {agps_pack_name}...")
servers=['api-mifit-de2', 'api-mifit-us2', 'api-mifit-cn2', 'api-mifit-sg2','api-mifit']
for server in servers:
print(f"trying server: {server}")
response = requests.get(agps_link.format(pack_name=agps_pack_name, server=server), headers=headers)
print(f"status: {response.status_code}")
#response.raise_for_status()
if response.status_code == 401:
print("try new server")
continue
agps_result = response.json()[0]
if 'fileUrl' not in agps_result:
raise ValueError("No 'fileUrl' parameter in files request.")
with requests.get(agps_result['fileUrl'], stream=True) as request:
with open(agps_file_name, 'wb') as gps_file:
shutil.copyfileobj(request.raw, gps_file)
break
def logout(self):
logout_url = urls.URLS['logout']
data = urls.PAYLOADS['logout']
data['login_token'] = self.login_token
response = requests.post(logout_url, data=data)
logout_result = response.json()
if logout_result['result'] == 'ok':
print("\nLogged out.")
else:
print("\nError logging out.")

7
typemap.json Normal file
View File

@ -0,0 +1,7 @@
{ "0x05" : "gps_alm.bin",
"0x0f" : "gln_alm.bin",
"0x86" : "lle_bds.lle",
"0x87" : "lle_gps.lle",
"0x88" : "lle_glo.lle",
"0x89" : "lle_gal.lle",
"0x8a" : "lle_qzss.lle"}

24
typemap.py Normal file
View File

@ -0,0 +1,24 @@
#Copyright (C) 2021 Andreas Shimokawa
#
#This file is part of Gadgetbridge-tools.
#
#Gadgetbridge is free software: you can redistribute it and/or modify
#it under the terms of the GNU Affero General Public License as published
#by the Free Software Foundation, either version 3 of the License, or
#(at your option) any later version.
#
#Gadgetbridge is distributed in the hope that it will be useful,
#but WITHOUT ANY WARRANTY; without even the implied warranty of
#MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
#GNU Affero General Public License for more details.
#
#You should have received a copy of the GNU Affero General Public License
#along with this program. If not, see <http://www.gnu.org/licenses/>.
import json
with open("typemap.json") as f:
jsonmap = json.load(f)
typemap={int(k,16):v for k,v in jsonmap.items()}

89
urls.py Normal file
View File

@ -0,0 +1,89 @@
# Copyright (c) 2020 Kirill Snezhko
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
"""Module for storin urls and payloads fro different requests"""
URLS = {
'login_xiaomi': 'https://account.xiaomi.com/oauth2/authorize?skip_confirm=false&'
'client_id=2882303761517383915&pt=0&scope=1+6000+16001+20000&'
'redirect_uri=https%3A%2F%2Fhm.xiaomi.com%2Fwatch.do&'
'_locale=en_US&response_type=code',
'tokens_amazfit': 'https://api-user.huami.com/registrations/{user_email}/tokens',
'login_amazfit': 'https://account.huami.com/v2/client/login',
'devices': 'https://api-mifit-us2.huami.com/users/{user_id}/devices',
'agps': 'https://{server}.huami.com/apps/com.huami.midong/fileTypes/{pack_name}/files',
'data_short': 'https://api-mifit-us2.huami.com/users/{user_id}/deviceTypes/4/data',
'logout': 'https://account-us2.huami.com/v1/client/logout',
'fw_updates': 'https://api-mifit-us2.huami.com/devices/ALL/hasNewVersion'
}
PAYLOADS = {
'login_xiaomi': None,
'tokens_amazfit': {
'state': 'REDIRECTION',
'client_id': 'HuaMi',
'password': None,
'redirect_uri': 'https://s3-us-west-2.amazonws.com/hm-registration/successsignin.html',
'region': 'us-west-2',
'token': 'access',
'country_code': 'US'
},
'login_amazfit': {
'dn': 'account.huami.com,api-user.huami.com,app-analytics.huami.com,'
'api-watch.huami.com,'
'api-analytics.huami.com,api-mifit.huami.com',
'app_version': '5.9.2-play_100355',
'source': 'com.huami.watch.hmwatchmanager',
'country_code': None,
'device_id': None,
'third_name': None,
'lang': 'en',
'device_model': 'android_phone',
'allow_registration': 'false',
'app_name': 'com.huami.midong',
'code': None,
'grant_type': None
},
'devices': {
'apptoken': None,
# 'enableMultiDevice': 'true'
},
'agps': {
'apptoken': None
},
'data_short': {
'apptoken': None,
'startDay': None,
'endDay': None
},
'logout': {
'login_token': None
},
'fw_updates': {
'productionSource': None,
'deviceSource': None,
'fontVersion': '0',
'fontFlag': '0',
'appVersion': '5.9.2-play_100355',
'firmwareVersion': None,
'hardwareVersion': None,
'support8Bytes': 'true'
}
}