pixelplanet/utils/historyDownload.py
HF e108a23a83 use api/me for historyDownload.py
camel_case canvasID in areaDownload.py
add parameters to USER_AGENT in both
2023-10-01 01:48:40 +02:00

223 lines
8.7 KiB
Python
Executable File

#!/usr/bin/python3
import PIL.Image
import sys, io, os
import datetime
import asyncio
import aiohttp
# Value of the User-Agent header sent with every request; the command line
# arguments of this run are appended to it.
USER_AGENT = f"ppfun historyDownload 1.0 {' '.join(sys.argv[1:])}"
# Base url of the site (api/me and history listing) and of the tile storage.
PPFUN_URL = "https://pixelplanet.fun"
PPFUN_STORAGE_URL = "https://storage.pixelplanet.fun"
# Only every frameskip-th incremental frame of a day is captured:
#   1 captures every frame (nothing is skipped)
#   2 captures every second frame
#   3 captures every third frame
#   [...]
frameskip = 1
async def fetchMe():
    """
    Fetch the api/me document of the site and return it as decoded JSON.

    The request is tried up to five times with a pause of five seconds
    between tries. When the fifth try fails as well, the error of that try
    is re-raised to the caller.
    """
    url = f"{PPFUN_URL}/api/me"
    headers = {
        'User-Agent': USER_AGENT
    }
    max_tries = 5
    async with aiohttp.ClientSession() as session:
        for tries in range(1, max_tries + 1):
            try:
                async with session.get(url, headers=headers) as resp:
                    return await resp.json()
            except Exception:
                # Exception rather than a bare except, so that Ctrl-C and
                # task cancellation end the script instead of being retried
                if tries == max_tries:
                    print(f"Could not get {url} in {max_tries} tries, cancelling")
                    raise
                print(f"Failed to load {url}, trying again in 5s")
                await asyncio.sleep(5)
async def fetch(session, url, offx, offy, image, bkg, needed = False):
    """
    Download one tile and paste it into image at position (offx, offy).

    session: client session that performs the GET request
    url: address of the tile png
    offx, offy: paste position of the tile inside image
    image: PIL image that gets modified in place
    bkg: color of the replacement tile for a needed tile that answers 404
    needed: if True, a 404 is replaced by a 256x256 tile filled with bkg and
        every other non-200 answer is requested again until it succeeds;
        if False, any non-200 answer leaves image untouched

    Errors while requesting or decoding are retried; once more than four
    tries raised, the last error is passed on to the caller.
    """
    # seconds to wait before a tile is requested again, so that a struggling
    # server is not hit in a tight loop
    retry_pause = 1
    attempts = 0
    headers = {
        'User-Agent': USER_AGENT
    }
    while True:
        try:
            async with session.get(url, headers=headers) as resp:
                if resp.status == 404:
                    if needed:
                        img = PIL.Image.new('RGB', (256, 256), color=bkg)
                        image.paste(img, (offx, offy))
                        img.close()
                    return
                if resp.status == 200:
                    data = await resp.read()
                    img = PIL.Image.open(io.BytesIO(data)).convert('RGBA')
                    # the tile is its own mask, transparent pixels keep
                    # what is already in image
                    image.paste(img, (offx, offy), img)
                    img.close()
                    return
                if not needed:
                    return
                # needed tile with an unexpected status: ask again below
        except Exception:
            # Exception rather than a bare except, so that Ctrl-C and task
            # cancellation are not swallowed by the retry
            if attempts > 3:
                raise
            attempts += 1
        await asyncio.sleep(retry_pause)
async def get_area(canvas_id, canvas, x, y, w, h, start_date, end_date):
    """
    Download the history of an area day by day and save every captured
    frame as ./timelapse/t<N>.png, with N counting up from 1.

    canvas_id: id of the canvas as used in the storage and history urls
    canvas: canvas description; 'size' and 'colors' are read, and
        'historicalSizes' when present
    x, y: canvas coordinates of the top left pixel of the area
    w, h: width and height of the area in pixels
    start_date, end_date: first and last day to download, both inclusive;
        need strftime and support for adding a datetime.timedelta

    For each day the full backup of the start of the day is saved first,
    then one frame per incremental backup listed by the history api
    (thinned out by frameskip). Nothing is downloaded when start_date lies
    after end_date.
    """
    canvas_size = canvas["size"]
    bkg = tuple(canvas['colors'][0])
    delta = datetime.timedelta(days=1)
    headers = {
        'User-Agent': USER_AGENT
    }
    last_date = end_date.strftime("%Y%m%d")
    cnt = 0
    previous_day = PIL.Image.new('RGB', (w, h), color=bkg)
    day = start_date
    while True:
        iter_date = day.strftime("%Y%m%d")
        # YYYYMMDD strings sort chronologically; testing for "after the end"
        # instead of "equal to the end" also terminates when start_date is
        # later than end_date
        if iter_date > last_date:
            break
        day = day + delta
        print('------------------------------------------------')
        print('Getting frames for date %s' % (iter_date))

        # the canvas may have had a different size on that day
        fetch_canvas_size = canvas_size
        if 'historicalSizes' in canvas:
            for ts in canvas['historicalSizes']:
                if iter_date <= ts[0]:
                    fetch_canvas_size = ts[1]
        offset = int(-fetch_canvas_size / 2)
        xc = (x - offset) // 256
        wc = (x + w - offset) // 256
        yc = (y - offset) // 256
        hc = (y + h - offset) // 256
        print("Load from %s / %s to %s / %s with canvas size %s" % (xc, yc, wc + 1, hc + 1, fetch_canvas_size))

        # every 256x256 tile that touches the area, with its paste position
        # relative to the top left corner of the area
        tiles = [
            (ix, iy, ix * 256 + offset - x, iy * 256 + offset - y)
            for iy in range(yc, hc + 1)
            for ix in range(xc, wc + 1)
        ]
        day_url = '%s/%s/%s/%s/%s' % (PPFUN_STORAGE_URL, iter_date[0:4], iter_date[4:6], iter_date[6:], canvas_id)

        async with aiohttp.ClientSession() as session:
            image = PIL.Image.new('RGBA', (w, h))
            tasks = [
                fetch(session, '%s/tiles/%s/%s.png' % (day_url, ix, iy), offx, offy, image, bkg, True)
                for ix, iy, offx, offy in tiles
            ]
            await asyncio.gather(*tasks)
            print('Got start of day')
            # check if image is all just one color to lazily detect if whole full backup was 404
            clr = image.getcolors(1)
            if clr is not None:
                print("Got faulty full-backup frame, using last frame from previous day instead.")
                # release the discarded frame before replacing it
                image.close()
                image = previous_day.copy()
            cnt += 1
            image.save('./timelapse/t%s.png' % (cnt))

            # list of the incremental backups of that day
            history_url = '%s/history?day=%s&id=%s' % (PPFUN_URL, iter_date, canvas_id)
            while True:
                async with session.get(history_url, headers=headers) as resp:
                    try:
                        time_list = await resp.json()
                        break
                    except Exception:
                        print('Couldn\'t decode json for day %s, trying again' % (iter_date))
                # short pause, so that a broken answer is not re-requested
                # in a tight loop
                await asyncio.sleep(5)

            for i, time in enumerate(time_list, start=1):
                if (i % frameskip) != 0:
                    continue
                if time == '0000':
                    # 0000 incremential backups are faulty
                    continue
                # incremental tiles are drawn on top of a copy of the
                # start-of-day frame
                image_rel = image.copy()
                tasks = [
                    fetch(session, '%s/%s/%s/%s.png' % (day_url, time, ix, iy), offx, offy, image_rel, bkg)
                    for ix, iy, offx, offy in tiles
                ]
                await asyncio.gather(*tasks)
                print('Got time %s' % (time))
                cnt += 1
                image_rel.save('./timelapse/t%s.png' % (cnt))
                if time == time_list[-1]:
                    # if last element of day, copy it to previous_day to reuse it when needed
                    print("Remembering last frame of day.")
                    previous_day.close()
                    previous_day = image_rel.copy()
                image_rel.close()
            image.close()
    previous_day.close()
    # frames are written to disk one by one and not collected into a gif,
    # gifs are huge and not good
def _print_usage(canvases):
    """Print the command line help, listing the ids of all non-3D canvases."""
    print("Download history of an area of pixelplanet - useful for timelapses")
    print("")
    print("Usage: historyDownload.py canvasID startX_startY endX_endY start_date [end_date]")
    print("")
    print("→start_date and end_date are in YYYY-MM-dd formate")
    print("→user R key on pixelplanet to copy coordinates)")
    print("→images will be saved into timelapse folder)")
    print("canvasID: ", end='')
    for canvas_id, canvas in canvases.items():
        # canvases with a truthy 'v' entry are 3D and can't be downloaded
        if 'v' in canvas and canvas['v']:
            continue
        print(f"{canvas_id} = {canvas['title']}", end=', ')
    print()
    print("-----------")
    print("You can create a timelapse from the resulting files with ffmpeg like that:")
    print("ffmpeg -framerate 15 -f image2 -i timelapse/t%d.png -c:v libvpx-vp9 -pix_fmt yuva420p output.webm")
    print("or lossless example:")
    print("ffmpeg -framerate 15 -f image2 -i timelapse/t%d.png -c:v libvpx-vp9 -pix_fmt yuv444p -qmin 0 -qmax 0 -lossless 1 -an output.webm")


async def main():
    """
    Command line entry point.

    Arguments: canvasID startX_startY endX_endY start_date [end_date].
    With any other number of arguments the usage text is printed. end_date
    defaults to today. The frames are written into the ./timelapse folder,
    which is created when missing.

    Invalid arguments (unknown or 3D canvas, unparsable coordinates or
    dates, an end corner before the start corner, an end date before the
    start date) print a message and return without downloading anything.
    """
    apime = await fetchMe()

    if len(sys.argv) not in (5, 6):
        _print_usage(apime['canvases'])
        return

    canvas_id = sys.argv[1]
    if canvas_id not in apime['canvases']:
        print("Invalid canvas selected")
        return
    canvas = apime['canvases'][canvas_id]
    if 'v' in canvas and canvas['v']:
        print("Can\'t get area for 3D canvas")
        return

    try:
        start = sys.argv[2].split('_')
        end = sys.argv[3].split('_')
        x = int(start[0])
        y = int(start[1])
        # both corners belong to the area, hence the + 1
        w = int(end[0]) - x + 1
        h = int(end[1]) - y + 1
        start_date = datetime.date.fromisoformat(sys.argv[4])
        if len(sys.argv) == 6:
            end_date = datetime.date.fromisoformat(sys.argv[5])
        else:
            end_date = datetime.date.today()
    except (ValueError, IndexError) as err:
        print(f"Invalid coordinates or date: {err}")
        return

    if w < 1 or h < 1:
        print("Invalid area, endX_endY has to be right of and below startX_startY")
        return
    if start_date > end_date:
        print("Invalid dates, start_date has to be before end_date")
        return

    # exist_ok avoids the race between checking for and creating the folder
    os.makedirs('./timelapse', exist_ok=True)

    await get_area(canvas_id, canvas, x, y, w, h, start_date, end_date)

    print("Done!")
    print("to create a timelapse from it:")
    print("ffmpeg -framerate 15 -f image2 -i timelapse/t%d.png -c:v libvpx-vp9 -pix_fmt yuva420p output.webm")
    print("example with scaling *3 and audio track:")
    print("ffmpeg -i ./audio.mp3 -framerate 8 -f image2 -i timelapse/t%d.png -map 0:a -map 1:v -vf scale=iw*3:-1 -shortest -c:v libvpx-vp9 -c:a libvorbis -pix_fmt yuva420p output.webm")
    print("lossless example:")
    print("ffmpeg -framerate 15 -f image2 -i timelapse/t%d.png -c:v libvpx-vp9 -pix_fmt yuv444p -qmin 0 -qmax 0 -lossless 1 -an output.webm")
# Start the downloader only when this file is executed as a script,
# not when it is imported.
if __name__ == "__main__":
    asyncio.run(main())