Refactor and modularize downloader

Modularize downloader. Handle Retry-After better. Unpack archives
This commit is contained in:
Daniel Demus 2022-05-27 16:41:34 +02:00
parent 85ac11ba2d
commit 7c4e3aa4a7
6 changed files with 349 additions and 137 deletions

161
.gitignore vendored Normal file
View File

@ -0,0 +1,161 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/#use-with-ide
.pdm.toml
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/

19
danfoss.py Normal file
View File

@ -0,0 +1,19 @@
#!/usr/bin/env python3
from urllib.request import urlopen
from lxml.html import parse
from downloader import Downloader
class Danfoss(Downloader):
def getUrlList(self):
res = []
page = parse(urlopen('https://files.danfoss.com/download/Heating/Ally/')).getroot()
page.make_links_absolute('https://files.danfoss.com/download/Heating/Ally/')
for link in page.cssselect('a'):
if "OTA" in link.text_content():
res.append((link.get('href'), link.text_content()))
return res

116
downloader.py Normal file
View File

@ -0,0 +1,116 @@
#!/usr/bin/env python3
from abc import ABC, abstractmethod
from datetime import datetime, timedelta, timezone
from email.utils import parsedate_to_datetime
import os, requests, re, time
import tempfile
import shutil
class Downloader(ABC):
extensions = ('.ota', '.ota.signed', '.zigbee', '.fw2', '.sbl-ota')
def __init__(self):
self.otauPath = os.path.expanduser('~/otau')
@abstractmethod
def getUrlList(self):
pass
def performDownloads(self):
print("")
print(f"Putting {self.__class__.__name__} updates in {self.otauPath}")
if not os.path.exists(self.otauPath):
os.makedirs(self.otauPath)
cnt = 0
retries = self.getUrlList()
delay = None
while cnt == 0 or (cnt < 50 and delay):
retries, delay = self.handleDownloads(retries, delay)
cnt += 1
def handleDownloads(self, lst, delay):
retries = []
if delay:
nowish = datetime.now(timezone.utc) + timedelta(seconds=-2)
if delay > nowish:
wait = (delay - nowish).seconds + 1
print(f"Some downloads were deferred by the server. Waiting {wait} seconds until retry", end='', flush=True)
ix = 0
while ix < wait:
print('.', end='', flush=True)
ix += 1
time.sleep(1)
print("", flush=True)
newDelay = None
for (url, filename) in lst:
ret = self.downloadFile(url, filename, retries)
if ret is None or isinstance(ret, datetime):
if ret is not None or newDelay is None or ret > newDelay:
newDelay = ret
continue
fname, firmwarecontent = ret
self.handleContent(fname, firmwarecontent)
return retries, newDelay
def downloadFile(self, url, filename, retries):
if filename and os.path.isfile(os.path.join(self.otauPath, filename)):
print(f"{filename} skipped. A file with that name already exists")
return None
response = requests.get(url)
if 'Retry-After' in response.headers:
timestamp = parsedate_to_datetime(response.headers['Date'])
delay = timedelta(seconds=int(response.headers['Retry-After']) + 1)
retries.append((url, filename))
return timestamp + delay
fname: str = filename
if 'Content-Disposition' in response.headers:
contentDisposition = response.headers['Content-Disposition']
contentDisposition = re.findall("filename=(.+)", contentDisposition)[0]
contentDisposition = contentDisposition.split(";")
fname = contentDisposition[0]
return fname, response.content
def handleContent(self, fname, firmwarecontent):
if fname.endswith(self.extensions):
fullname = os.path.join(self.otauPath, fname)
if not os.path.isfile(fullname):
file = open(fullname, "wb")
file.write(firmwarecontent)
file.close()
print(f"{fname} downloaded")
else:
print(f"{fname} skipped. A file with that name already exists")
else:
with tempfile.TemporaryDirectory() as tmpdirname:
fullname = os.path.join(tmpdirname, fname)
if not os.path.isfile(fullname):
file = open(fullname, 'wb')
file.write(firmwarecontent)
file.close()
shutil.unpack_archive(fullname, tmpdirname)
print(f"Downloaded and unpacked {fname}")
for f in self.filteredFilelist(tmpdirname):
target = os.path.join(self.otauPath, os.path.basename(f))
if not os.path.isfile(target):
shutil.copyfile(f, target)
print('Extracted %s' % os.path.basename(f))
else:
print('%s skipped. A file with that name already exists' % os.path.basename(f))
def filteredFilelist(self, rootDir):
return [os.path.join(r, fn)
for r, ds, fs in os.walk(rootDir)
for fn in fs if fn.endswith(self.extensions)]

View File

@ -1,75 +1,19 @@
#!/usr/bin/env python
#!/usr/bin/env python3
"""
Snipped to download current IKEA ZLL OTA files into ~/otau
compatible with python 3.
"""
import os, json, requests, re, time, datetime
try:
from urllib.request import urlopen, urlretrieve
except ImportError:
from urllib2 import urlopen
from urllib import urlretrieve
import datetime
from danfoss import Danfoss
from ikea import Ikea
from ligthify import Lightify
print ('%s - Downloadscript started' % f"{datetime.datetime.now():%Y-%m-%d %H:%M:%S}")
f = urlopen("http://fw.ota.homesmart.ikea.net/feed/version_info.json")
data = f.read()
Danfoss().performDownloads()
Ikea().performDownloads()
Lightify().performDownloads()
arr = json.loads(data)
"""
otapath = '%s/otau' % os.path.expanduser('~')
"""
otapath = '/otau'
if not os.path.exists(otapath):
os.makedirs(otapath)
for i in arr:
if 'fw_binary_url' in i:
url = i['fw_binary_url']
ls = url.split('/')
fname = ls[len(ls) - 1]
path = '%s/%s' % (otapath, fname)
if not os.path.isfile(path):
urlretrieve(url, path)
print('%s - %s downloaded' % (f"{datetime.datetime.now():%Y-%m-%d %H:%M:%S}", path))
else:
print('%s - %s already exists' % (f"{datetime.datetime.now():%Y-%m-%d %H:%M:%S}", fname))
"""
Snipped to download current OSRAM OTA files into ~/otau
compatible with python 3.
"""
response = requests.get("https://api.update.ledvance.com/v1/zigbee/products")
response = json.loads(response.content)
productlist = response['products']
for x in range(len(productlist)):
time.sleep(5)
company = productlist[x].get('id').get('company')
product = productlist[x].get('id').get('product')
url = 'https://api.update.ledvance.com/v1/zigbee/firmwares/download/%s/%s/latest' % (company, product)
response = requests.get(url)
firmwarecontent = response.content
fname = response.headers['Content-Disposition']
fname = re.findall("filename=(.+)", fname)[0]
fname = fname.split(";")
fname = fname[0]
path = '%s/%s' % (otapath, fname)
if not os.path.isfile(path):
file = open(path, "wb")
file.write(firmwarecontent)
file.close()
print('%s - %s downloaded' % (f"{datetime.datetime.now():%Y-%m-%d %H:%M:%S}", path))
else:
print('%s - %s already exists' % (f"{datetime.datetime.now():%Y-%m-%d %H:%M:%S}", fname))
print ('%s - Downloadscript stopped' % f"{datetime.datetime.now():%Y-%m-%d %H:%M:%S}")
print ('%s - Downloadscript finished' % f"{datetime.datetime.now():%Y-%m-%d %H:%M:%S}")

63
ikea.py
View File

@ -1,43 +1,26 @@
#!/usr/bin/env python
"""
Snipped to download current IKEA ZLL OTA files into ~/otau
compatible with python 3.
"""
import os
#!/usr/bin/env python3
import json
try:
from urllib.request import urlopen, urlretrieve
except ImportError:
from urllib2 import urlopen
from urllib import urlretrieve
f = urlopen("http://fw.ota.homesmart.ikea.net/feed/version_info.json")
data = f.read()
arr = json.loads(data)
"""
otapath = '%s/otau' % os.path.expanduser('~')
"""
otapath = '/otau'
if not os.path.exists(otapath):
os.makedirs(otapath)
for i in arr:
if 'fw_binary_url' in i:
url = i['fw_binary_url']
ls = url.split('/')
fname = ls[len(ls) - 1]
path = '%s/%s' % (otapath, fname)
if not os.path.isfile(path):
urlretrieve(url, path)
print(path)
else:
print('%s already exists' % fname)
from urllib.request import urlopen
from downloader import Downloader
class Ikea(Downloader):
def getUrlList(self):
f = urlopen("http://fw.ota.homesmart.ikea.net/feed/version_info.json")
data = f.read()
arr = json.loads(data)
res = []
for i in arr:
if 'fw_binary_url' in i:
url = i['fw_binary_url']
ls = url.split('/')
fname = ls[len(ls) - 1]
res.append((url, fname))
return res

View File

@ -1,36 +1,25 @@
import requests
import json
import re
import time
import os
#!/usr/bin/env python3
otapath = '/otau'
import requests, json, time
from downloader import Downloader
if not os.path.exists(otapath):
os.makedirs(otapath)
class Lightify(Downloader):
response = requests.get("https://api.update.ledvance.com/v1/zigbee/products")
response = json.loads(response.content)
def getUrlList(self):
response = requests.get("https://api.update.ledvance.com/v1/zigbee/products")
if 'Retry-After' in response.headers:
defer = int(response.headers['Retry-After']) + 1
print(f"Waiting {defer} seconds to get ledvance list")
time.sleep(defer)
response = requests.get("https://api.update.ledvance.com/v1/zigbee/products")
productlist = response['products']
for x in range(len(productlist)):
time.sleep(5)
company = productlist[x].get('id').get('company')
product = productlist[x].get('id').get('product')
url = 'https://api.update.ledvance.com/v1/zigbee/firmwares/download/%s/%s/latest' % (company, product)
response = requests.get(url)
firmwarecontent = response.content
fname = response.headers['Content-Disposition']
fname = re.findall("filename=(.+)", fname)[0]
fname = fname.split(";")
fname = fname[0]
path = '%s/%s' % (otapath, fname)
if not os.path.isfile(path):
file = open(path, "wb")
file.write(firmwarecontent)
file.close()
print(path)
else:
print('%s already exists' % fname)
response = json.loads(response.content)
productlist = response['products']
res = []
for x in productlist:
company = x.get('id').get('company')
product = x.get('id').get('product')
res.append(('https://api.update.ledvance.com/v1/zigbee/firmwares/download/%s/%s/latest' % (company, product), None))
return res