#!/usr/bin/env python3 import json import uuid import random import shutil import urllib import requests import urls class HuamiAmazfit: AGPS_PACKS = { "AGPS_ALM": "cep_1week.zip", "AGPSZIP": "cep_7days.zip", "LLE": "lle_1week.zip", "AGPS": "cep_pak.bin", "EPO": "epo.zip" } def __init__(self, email=None, password=None): if not email or not password: raise ValueError("For Amazfit method E-Mail and Password can not be null.") self.method = 'amazfit' self.email = email self.password = password self.access_token = None self.country_code = None self.app_token = None self.login_token = None self.user_id = None self.result = None self.r = str(uuid.uuid4()) # IMEI or something unique self.device_id = "02:00:00:%02x:%02x:%02x" % (random.randint(0, 255), random.randint(0, 255), random.randint(0, 255)) self.agps_packs = self.AGPS_PACKS def __str__(self): return f"{self.email} {self.password} {self.method} {self.access_token}" def parse_token(self, token_url): parsed_token_url = urllib.parse.urlparse(token_url) token_url_parameters = urllib.parse.parse_qs(parsed_token_url.query) if 'code' not in token_url_parameters: return self.access_token = token_url_parameters['code'] self.country_code = 'US' def get_access_token(self): print("Getting access token ...") auth_url = urls.URLS['tokens_amazfit'].format(user_email=urllib.parse.quote(self.email)) data = urls.PAYLOADS['tokens_amazfit'] data['password'] = self.password response = requests.post(auth_url, data=data, allow_redirects=False) response.raise_for_status() # 'Location' parameter contains url with login status redirect_url = urllib.parse.urlparse(response.headers.get('Location')) redirect_url_parameters = urllib.parse.parse_qs(redirect_url.query) if 'error' in redirect_url_parameters: raise ValueError(f"Wrong E-mail or Password. Error: {redirect_url_parameters['error']}") if 'access' not in redirect_url_parameters: raise ValueError("No 'access' parameter in login url.") if 'country_code' not in redirect_url_parameters: raise ValueError("No 'country_code' parameter in login url.") self.access_token = redirect_url_parameters['access'] self.country_code = redirect_url_parameters['country_code'] return True def login(self, external_token=None): print("Logging in...") if external_token: self.access_token = external_token login_url = urls.URLS['login_amazfit'] data = urls.PAYLOADS['login_amazfit'] data['country_code'] = self.country_code data['device_id'] = self.device_id data['third_name'] = 'huami' if self.method == 'amazfit' else 'mi-watch' data['code'] = self.access_token data['grant_type'] = 'access_token' if self.method == 'amazfit' else 'request_token' response = requests.post(login_url, data=data, allow_redirects=False) response.raise_for_status() login_result = response.json() if 'error_code' in login_result: raise ValueError(f"Login error. Error: {login_result['error_code']}") if 'token_info' not in login_result: raise ValueError("No 'token_info' parameter in login data.") else: token_info = login_result['token_info'] if 'app_token' not in token_info: raise ValueError("No 'app_token' parameter in login data.") self.app_token = token_info['app_token'] if 'login_token' not in token_info: raise ValueError("No 'login_token' parameter in login data.") self.login_token = token_info['login_token'] if 'user_id' not in token_info: raise ValueError("No 'user_id' parameter in login data.") self.user_id = token_info['user_id'] print("Logged in! User id: {}".format(self.user_id)) return True def get_wearable_auth_keys(self): print("Getting linked wearables...") print(self.user_id) devices_url = urls.URLS['devices'].format(user_id=urllib.parse.quote(self.user_id)) headers = urls.PAYLOADS['devices'] headers['apptoken'] = self.app_token response = requests.get(devices_url, headers=headers) response.raise_for_status() device_request = response.json() if 'items' not in device_request: raise ValueError("No 'items' parameter in devices data.") devices = device_request['items'] devices_dict = {} for idx, wearable in enumerate(devices): if 'macAddress' not in wearable: raise ValueError("No 'macAddress' parameter in device data.") mac_address = wearable['macAddress'] if 'additionalInfo' not in wearable: raise ValueError("No 'additionalInfo' parameter in device data.") device_info = json.loads(wearable['additionalInfo']) if 'auth_key' not in device_info: raise ValueError("No 'auth_key' parameter in device data.") key_str = device_info['auth_key'] auth_key = '0x' + (key_str if key_str != '' else '00') devices_dict[f'{mac_address}'] = auth_key return devices_dict def get_gps_data(self) -> None: """Download GPS packs: almanac and AGPS""" agps_packs = self.agps_packs.keys() agps_file_names = self.agps_packs.values() agps_link = urls.URLS['agps'] headers = urls.PAYLOADS['agps'] headers['apptoken'] = self.app_token for agps_pack_name, agps_file_name in self.agps_packs.items(): print(f"Downloading {agps_pack_name}...") servers=['api-mifit-de2', 'api-mifit-us2', 'api-mifit-cn2', 'api-mifit-sg2','api-mifit'] for server in servers: print(f"trying server: {server}") response = requests.get(agps_link.format(pack_name=agps_pack_name, server=server), headers=headers) print(f"status: {response.status_code}") #response.raise_for_status() if response.status_code == 401: print("try new server") continue agps_result = response.json()[0] if 'fileUrl' not in agps_result: raise ValueError("No 'fileUrl' parameter in files request.") with requests.get(agps_result['fileUrl'], stream=True) as request: with open(agps_file_name, 'wb') as gps_file: shutil.copyfileobj(request.raw, gps_file) break def logout(self): logout_url = urls.URLS['logout'] data = urls.PAYLOADS['logout'] data['login_token'] = self.login_token response = requests.post(logout_url, data=data) logout_result = response.json() if logout_result['result'] == 'ok': print("\nLogged out.") else: print("\nError logging out.")