207 lines
7.3 KiB
Python
207 lines
7.3 KiB
Python
#!/usr/bin/env python3
|
|
|
|
import json
|
|
import uuid
|
|
import random
|
|
import shutil
|
|
import urllib
|
|
import requests
|
|
|
|
import urls
|
|
|
|
class HuamiAmazfit:
    """Client for the Huami/Amazfit account endpoints described in ``urls``.

    The class obtains an access token from e-mail/password credentials, logs
    in, lists the auth keys of linked wearables, downloads AGPS packs and
    logs out.  Endpoint URLs and request payload templates are read from
    ``urls.URLS`` and ``urls.PAYLOADS``; the templates are copied before
    per-request values are filled in, so the shared module-level dicts are
    never modified and never retain credentials.
    """

    # Pack name sent to the AGPS endpoint -> local file the pack is saved to.
    AGPS_PACKS = {
        "AGPS_ALM": "cep_1week.zip",
        "AGPSZIP": "cep_7days.zip",
        "LLE": "lle_1week.zip",
        "AGPS": "cep_pak.bin",
        "EPO": "epo.zip",
    }

    # Server names substituted into the AGPS URL, tried in this order.
    AGPS_SERVERS = (
        'api-mifit-de2',
        'api-mifit-us2',
        'api-mifit-cn2',
        'api-mifit-sg2',
        'api-mifit',
    )

    def __init__(self, email=None, password=None):
        """Store the credentials and initialise empty session state.

        Args:
            email: Account e-mail; must be non-empty.
            password: Account password; must be non-empty.

        Raises:
            ValueError: If either ``email`` or ``password`` is missing/empty.
        """
        if not email or not password:
            raise ValueError("For Amazfit method E-Mail and Password can not be null.")

        self.method = 'amazfit'
        self.email = email
        self.password = password
        self.access_token = None
        self.country_code = None

        self.app_token = None
        self.login_token = None
        self.user_id = None

        self.result = None

        self.r = str(uuid.uuid4())

        # IMEI or something unique: fixed "02:00:00" prefix plus three
        # random bytes rendered as two-digit hex.
        self.device_id = "02:00:00:" + ":".join(
            f"{random.randint(0, 255):02x}" for _ in range(3)
        )

        # Per-instance copy so that customising the packs of one instance
        # does not alter the class-level default shared by all instances.
        self.agps_packs = dict(self.AGPS_PACKS)

    def __str__(self):
        """Return e-mail, password, method and access token, space-separated."""
        return f"{self.email} {self.password} {self.method} {self.access_token}"

    @staticmethod
    def _require(mapping, key, where):
        """Return ``mapping[key]`` or raise ``ValueError`` naming the missing key."""
        if key not in mapping:
            raise ValueError(f"No '{key}' parameter in {where}.")
        return mapping[key]

    def parse_token(self, token_url):
        """Take the ``code`` query parameter of ``token_url`` as access token.

        When the URL carries a ``code`` parameter it is stored in
        ``self.access_token`` and ``self.country_code`` is set to ``'US'``.
        When it does not, nothing is changed and ``None`` is returned.

        Args:
            token_url: URL whose query string is inspected.
        """
        query = urllib.parse.urlparse(token_url).query
        token_url_parameters = urllib.parse.parse_qs(query)

        if 'code' not in token_url_parameters:
            return

        # parse_qs maps every key to a *list* of values; the list is stored
        # as-is so the attribute keeps the shape existing callers receive.
        self.access_token = token_url_parameters['code']
        self.country_code = 'US'

    def get_access_token(self):
        """Request an access token using the stored e-mail and password.

        Posts the credentials to the ``tokens_amazfit`` URL without following
        redirects and reads the outcome from the query string of the
        ``Location`` response header.  On success ``self.access_token`` and
        ``self.country_code`` are set (as ``parse_qs`` value lists).

        Returns:
            ``True`` on success.

        Raises:
            ValueError: If the redirect reports an error or lacks the
                ``access`` or ``country_code`` parameter.
            requests.HTTPError: If the server answers with an error status.
        """
        print("Getting access token ...")

        auth_url = urls.URLS['tokens_amazfit'].format(user_email=urllib.parse.quote(self.email))

        # Copy the template: the password must not end up in the shared dict.
        data = dict(urls.PAYLOADS['tokens_amazfit'])
        data['password'] = self.password

        response = requests.post(auth_url, data=data, allow_redirects=False)
        response.raise_for_status()

        # 'Location' parameter contains url with login status.  A missing
        # header is treated as an empty URL and fails the checks below.
        redirect_url = urllib.parse.urlparse(response.headers.get('Location', ''))
        redirect_url_parameters = urllib.parse.parse_qs(redirect_url.query)

        if 'error' in redirect_url_parameters:
            raise ValueError(f"Wrong E-mail or Password. Error: {redirect_url_parameters['error']}")

        # Validate both parameters before storing either of them.
        access = self._require(redirect_url_parameters, 'access', 'login url')
        country_code = self._require(redirect_url_parameters, 'country_code', 'login url')

        self.access_token = access
        self.country_code = country_code

        return True

    def login(self, external_token=None):
        """Log in with the access token and store the returned session tokens.

        Args:
            external_token: Optional token that replaces ``self.access_token``
                before the login request is sent.

        Returns:
            ``True`` on success; ``self.app_token``, ``self.login_token`` and
            ``self.user_id`` are set from the response's ``token_info``.

        Raises:
            ValueError: If the response contains ``error_code`` or lacks
                ``token_info``, ``app_token``, ``login_token`` or ``user_id``.
            requests.HTTPError: If the server answers with an error status.
        """
        print("Logging in...")
        if external_token:
            self.access_token = external_token

        login_url = urls.URLS['login_amazfit']
        is_amazfit = self.method == 'amazfit'

        # Copy the template so tokens are not left behind in the shared dict.
        data = dict(urls.PAYLOADS['login_amazfit'])
        data['country_code'] = self.country_code
        data['device_id'] = self.device_id
        data['third_name'] = 'huami' if is_amazfit else 'mi-watch'
        data['code'] = self.access_token
        data['grant_type'] = 'access_token' if is_amazfit else 'request_token'

        response = requests.post(login_url, data=data, allow_redirects=False)
        response.raise_for_status()
        login_result = response.json()

        if 'error_code' in login_result:
            raise ValueError(f"Login error. Error: {login_result['error_code']}")

        token_info = self._require(login_result, 'token_info', 'login data')
        # Each value is stored as soon as it is validated, in this order.
        self.app_token = self._require(token_info, 'app_token', 'login data')
        self.login_token = self._require(token_info, 'login_token', 'login data')
        self.user_id = self._require(token_info, 'user_id', 'login data')

        print(f"Logged in! User id: {self.user_id}")
        return True

    def get_wearable_auth_keys(self):
        """Fetch the auth keys of the wearables linked to the logged-in user.

        Returns:
            Dict mapping each device's ``macAddress`` (as ``str``) to its
            auth key prefixed with ``'0x'``; an empty key becomes ``'0x00'``.

        Raises:
            ValueError: If the response lacks ``items`` or a device entry
                lacks ``macAddress``, ``additionalInfo`` or ``auth_key``.
            requests.HTTPError: If the server answers with an error status.
        """
        print("Getting linked wearables...")
        print(self.user_id)

        devices_url = urls.URLS['devices'].format(user_id=urllib.parse.quote(self.user_id))

        # Copy the template so the app token is not kept in the shared dict.
        headers = dict(urls.PAYLOADS['devices'])
        headers['apptoken'] = self.app_token

        response = requests.get(devices_url, headers=headers)
        response.raise_for_status()
        devices = self._require(response.json(), 'items', 'devices data')

        devices_dict = {}
        for wearable in devices:
            mac_address = self._require(wearable, 'macAddress', 'device data')
            # 'additionalInfo' holds a JSON document encoded as a string.
            additional_info = self._require(wearable, 'additionalInfo', 'device data')
            device_info = json.loads(additional_info)
            key_str = self._require(device_info, 'auth_key', 'device data')

            devices_dict[str(mac_address)] = '0x' + (key_str if key_str != '' else '00')

        return devices_dict

    def get_gps_data(self) -> None:
        """Download GPS packs: almanac and AGPS.

        For every entry of ``self.agps_packs`` the servers in
        ``AGPS_SERVERS`` are tried in order; a 401 response moves on to the
        next server.  The first other response is expected to be a JSON list
        whose first element carries ``fileUrl``; that file is streamed into
        the pack's local file name and the remaining servers are skipped.

        Raises:
            ValueError: If the file listing lacks ``fileUrl``.
        """
        agps_link = urls.URLS['agps']

        # Copy the template so the app token is not kept in the shared dict.
        headers = dict(urls.PAYLOADS['agps'])
        headers['apptoken'] = self.app_token

        for agps_pack_name, agps_file_name in self.agps_packs.items():
            print(f"Downloading {agps_pack_name}...")
            for server in self.AGPS_SERVERS:
                print(f"trying server: {server}")
                pack_url = agps_link.format(pack_name=agps_pack_name, server=server)
                response = requests.get(pack_url, headers=headers)
                print(f"status: {response.status_code}")
                if response.status_code == 401:
                    print("try new server")
                    continue

                agps_result = response.json()[0]
                file_url = self._require(agps_result, 'fileUrl', 'files request')
                with requests.get(file_url, stream=True) as request:
                    with open(agps_file_name, 'wb') as gps_file:
                        shutil.copyfileobj(request.raw, gps_file)
                break
            else:
                # Every server answered 401: report it instead of silently
                # leaving the pack undownloaded.
                print(f"No server accepted the request for {agps_pack_name}.")

    def logout(self):
        """Invalidate the login token and print whether the logout succeeded."""
        logout_url = urls.URLS['logout']

        # Copy the template so the login token is not kept in the shared dict.
        data = dict(urls.PAYLOADS['logout'])
        data['login_token'] = self.login_token

        response = requests.post(logout_url, data=data)
        logout_result = response.json()

        # A response without 'result' is reported as a failed logout.
        if logout_result.get('result') == 'ok':
            print("\nLogged out.")
        else:
            print("\nError logging out.")
|
|
|
|
|