hf-cli/package/huami_token.py

207 lines
7.3 KiB
Python
Raw Permalink Normal View History

2024-11-10 21:18:00 +01:00
#!/usr/bin/env python3
import json
import uuid
import random
import shutil
import urllib
import requests
import urls
class HuamiAmazfit:
    """Session helper for the Amazfit (Huami) account endpoints listed in ``urls``.

    Typical call order, as implied by the state each method reads and writes:
    ``get_access_token()`` (or ``parse_token()``), then ``login()``, then
    ``get_wearable_auth_keys()`` and/or ``get_gps_data()``, then ``logout()``.
    """

    # Pack name requested from the AGPS endpoint -> local file name the
    # downloaded pack is written to.
    AGPS_PACKS = {
        "AGPS_ALM": "cep_1week.zip",
        "AGPSZIP": "cep_7days.zip",
        "LLE": "lle_1week.zip",
        "AGPS": "cep_pak.bin",
        "EPO": "epo.zip",
    }

    # Values substituted, in order, for the ``server`` placeholder of the AGPS
    # URL template until one of them does not answer with HTTP 401.
    AGPS_SERVERS = (
        'api-mifit-de2',
        'api-mifit-us2',
        'api-mifit-cn2',
        'api-mifit-sg2',
        'api-mifit',
    )

    def __init__(self, email=None, password=None):
        """Store the credentials and initialise an empty session.

        Raises:
            ValueError: if ``email`` or ``password`` is missing or empty.
        """
        if not email or not password:
            raise ValueError("For Amazfit method E-Mail and Password can not be null.")

        self.method = 'amazfit'
        self.email = email
        self.password = password

        # Filled in by get_access_token() / parse_token() / login().
        self.access_token = None
        self.country_code = None
        self.app_token = None
        self.login_token = None
        self.user_id = None
        self.result = None  # not written anywhere else in this class

        self.r = str(uuid.uuid4())  # random UUID string; not read by this class

        # IMEI or something unique
        self.device_id = "02:00:00:%02x:%02x:%02x" % (random.randint(0, 255),
                                                      random.randint(0, 255),
                                                      random.randint(0, 255))

        # Per-instance copy so that editing one instance's pack table cannot
        # alter the class-level default shared by every other instance.
        self.agps_packs = dict(self.AGPS_PACKS)

    def __str__(self):
        """Return e-mail, password, method and access token separated by spaces.

        NOTE(review): the password is included in clear text; avoid logging
        the string form of this object.
        """
        return f"{self.email} {self.password} {self.method} {self.access_token}"

    def parse_token(self, token_url):
        """Extract the ``code`` query parameter of ``token_url`` as the access token.

        Does nothing when the URL carries no ``code`` parameter. On success the
        country code is set to ``'US'``.
        """
        query = urllib.parse.urlparse(token_url).query
        codes = urllib.parse.parse_qs(query).get('code')
        if not codes:
            return
        # parse_qs() maps every key to a *list* of values; keep the plain
        # string so the attribute has the same type as an external token
        # handed to login().
        self.access_token = codes[0]
        self.country_code = 'US'

    def get_access_token(self):
        """Post the credentials and read the access token from the redirect URL.

        Returns:
            True on success.

        Raises:
            ValueError: if the redirect URL reports an error or lacks the
                ``access`` or ``country_code`` parameter.
            requests.HTTPError: if the server answers with an error status.
        """
        print("Getting access token ...")

        auth_url = urls.URLS['tokens_amazfit'].format(user_email=urllib.parse.quote(self.email))

        # Work on a copy: writing the password into the shared module-level
        # payload would leave it there for every later user of that dict.
        data = dict(urls.PAYLOADS['tokens_amazfit'])
        data['password'] = self.password

        response = requests.post(auth_url, data=data, allow_redirects=False)
        response.raise_for_status()

        # 'Location' parameter contains url with login status
        redirect_url = urllib.parse.urlparse(response.headers.get('Location', ''))
        redirect_url_parameters = urllib.parse.parse_qs(redirect_url.query)

        if 'error' in redirect_url_parameters:
            raise ValueError(f"Wrong E-mail or Password. Error: {redirect_url_parameters['error']}")

        if 'access' not in redirect_url_parameters:
            raise ValueError("No 'access' parameter in login url.")

        if 'country_code' not in redirect_url_parameters:
            raise ValueError("No 'country_code' parameter in login url.")

        # parse_qs() yields lists; store the single string value.
        self.access_token = redirect_url_parameters['access'][0]
        self.country_code = redirect_url_parameters['country_code'][0]
        return True

    def login(self, external_token=None):
        """Exchange the access token for app/login tokens and the user id.

        Args:
            external_token: optional token that replaces ``self.access_token``
                before the request is made.

        Returns:
            True on success.

        Raises:
            ValueError: if the response reports an error code or lacks one of
                ``token_info``, ``app_token``, ``login_token`` or ``user_id``.
            requests.HTTPError: if the server answers with an error status.
        """
        print("Logging in...")
        if external_token:
            self.access_token = external_token

        login_url = urls.URLS['login_amazfit']

        # Copy so the shared payload template is not polluted with tokens.
        data = dict(urls.PAYLOADS['login_amazfit'])
        is_amazfit = self.method == 'amazfit'
        data['country_code'] = self.country_code
        data['device_id'] = self.device_id
        data['third_name'] = 'huami' if is_amazfit else 'mi-watch'
        data['code'] = self.access_token
        data['grant_type'] = 'access_token' if is_amazfit else 'request_token'

        response = requests.post(login_url, data=data, allow_redirects=False)
        response.raise_for_status()
        login_result = response.json()

        if 'error_code' in login_result:
            raise ValueError(f"Login error. Error: {login_result['error_code']}")

        if 'token_info' not in login_result:
            raise ValueError("No 'token_info' parameter in login data.")
        token_info = login_result['token_info']

        if 'app_token' not in token_info:
            raise ValueError("No 'app_token' parameter in login data.")
        self.app_token = token_info['app_token']

        if 'login_token' not in token_info:
            raise ValueError("No 'login_token' parameter in login data.")
        self.login_token = token_info['login_token']

        if 'user_id' not in token_info:
            raise ValueError("No 'user_id' parameter in login data.")
        self.user_id = token_info['user_id']

        print(f"Logged in! User id: {self.user_id}")
        return True

    def get_wearable_auth_keys(self):
        """Fetch the devices linked to the account and return their auth keys.

        Returns:
            dict mapping each device's MAC address (as a string) to its auth
            key prefixed with ``0x``; an empty key is reported as ``0x00``.

        Raises:
            ValueError: if the response lacks ``items`` or a device entry lacks
                ``macAddress``, ``additionalInfo`` or ``auth_key``.
            requests.HTTPError: if the server answers with an error status.
        """
        print("Getting linked wearables...")
        print(self.user_id)

        devices_url = urls.URLS['devices'].format(user_id=urllib.parse.quote(self.user_id))

        # Copy so the app token is not left behind in the shared template.
        headers = dict(urls.PAYLOADS['devices'])
        headers['apptoken'] = self.app_token

        response = requests.get(devices_url, headers=headers)
        response.raise_for_status()
        device_request = response.json()

        if 'items' not in device_request:
            raise ValueError("No 'items' parameter in devices data.")

        devices_dict = {}
        for wearable in device_request['items']:
            if 'macAddress' not in wearable:
                raise ValueError("No 'macAddress' parameter in device data.")
            mac_address = wearable['macAddress']

            if 'additionalInfo' not in wearable:
                raise ValueError("No 'additionalInfo' parameter in device data.")
            # 'additionalInfo' is itself a JSON document stored as a string.
            device_info = json.loads(wearable['additionalInfo'])

            if 'auth_key' not in device_info:
                raise ValueError("No 'auth_key' parameter in device data.")
            key_str = device_info['auth_key']

            devices_dict[f'{mac_address}'] = '0x' + (key_str if key_str != '' else '00')

        return devices_dict

    def get_gps_data(self) -> None:
        """Download GPS packs: almanac and AGPS

        Every pack in ``self.agps_packs`` is requested from the entries of
        ``AGPS_SERVERS`` in turn; an HTTP 401 answer moves on to the next
        entry. The first usable answer is downloaded to the pack's file name
        in the current working directory.

        Raises:
            ValueError: if the file listing lacks ``fileUrl``.
            requests.HTTPError: if a server answers with an error status other
                than 401, or the file download itself fails.
        """
        agps_link = urls.URLS['agps']

        # Copy so the app token is not left behind in the shared template.
        headers = dict(urls.PAYLOADS['agps'])
        headers['apptoken'] = self.app_token

        for agps_pack_name, agps_file_name in self.agps_packs.items():
            print(f"Downloading {agps_pack_name}...")
            for server in self.AGPS_SERVERS:
                print(f"trying server: {server}")
                response = requests.get(
                    agps_link.format(pack_name=agps_pack_name, server=server),
                    headers=headers,
                )
                print(f"status: {response.status_code}")
                if response.status_code == 401:
                    print("try new server")
                    continue
                # Any other error status cannot carry a file listing; fail
                # here with the HTTP status instead of a confusing JSON or
                # indexing error further down.
                response.raise_for_status()

                agps_result = response.json()[0]
                if 'fileUrl' not in agps_result:
                    raise ValueError("No 'fileUrl' parameter in files request.")

                with requests.get(agps_result['fileUrl'], stream=True) as download:
                    # Check before opening the target so an error page is
                    # never written to disk in place of the pack.
                    download.raise_for_status()
                    with open(agps_file_name, 'wb') as gps_file:
                        shutil.copyfileobj(download.raw, gps_file)
                break
            else:
                # Every server answered 401: keep going with the next pack,
                # but do not stay silent about the missing file.
                print(f"Could not download {agps_pack_name}: no server accepted the request.")

    def logout(self):
        """Invalidate the login token and print whether the server confirmed it."""
        logout_url = urls.URLS['logout']

        # Copy so the login token is not left behind in the shared template.
        data = dict(urls.PAYLOADS['logout'])
        data['login_token'] = self.login_token

        response = requests.post(logout_url, data=data)
        logout_result = response.json()

        # .get(): an answer without a 'result' field counts as a failed logout
        # instead of raising KeyError.
        if logout_result.get('result') == 'ok':
            print("\nLogged out.")
        else:
            print("\nError logging out.")