add utils for downloading from canvas and for creating timelapses based on historical view

This commit is contained in:
HF 2020-05-02 00:11:42 +02:00
parent 2149326b1a
commit 11672715e2
7 changed files with 433 additions and 0 deletions

2
.gitignore vendored
View File

@ -1,4 +1,6 @@
build
timelapse
output.webm
chunks
players.json
.digitalocean

View File

@ -35,5 +35,21 @@ Converts a proxy list in specific txt format to a better readable list
## imageClean.py
python3 script that takes an input image and cleares spare pixels and bot remains
## areaDownload.py
downloads an area of the canvas into a png file.
Usage: `areaDownload.py startX_startY endX_endY filename.png`
(note that you can copy the current coordinates in this format on the site by pressing R)
## historyDownload.py
downloads the history from an canvas area between two dates.
Useage: `historyDownload.py startX_startY endX_endY start_date end_date
This is used for creating timelapses, see the cmd help to know how
## historyCopy.py
same as historyDownload, just that its designed for running on the storage server by copying the chunks. Also instead of time you define the amount of days you want to make a timelapse of.
## backupSync.sh
shell script that can be launched with backup.js to sync to a storage server after every backup. It uses rsync which is much faster than ftp, sftp or any other methode
## pp-center\*.png
center logo of pixelplanet

183
utils/areaDownload.py Executable file
View File

@ -0,0 +1,183 @@
#!/usr/bin/python3
import PIL.Image
import sys, os, io
import asyncio
import aiohttp
class Color(object):
def __init__(self, index, name, rgb):
self.name = name
self.rgb = rgb
self.index = index
class EnumColorPixelplanet:
ENUM = [
#Color 0 and 1 are unset colors.
#Bot adds +2 to color number. So subtract 2 from the browser inspector to match.
Color(0, 'aryan white', (255, 255, 255, 255)), #HEX FFFFFF
Color(1, 'light gray', (228, 228, 228, 255)), #HEX E4E4E4
Color(2, 'mid gray', (196, 196, 196, 255)), #HEX C4C4C4
Color(3, 'dark gray', (136, 136, 136, 255)), #HEX 888888
Color(4, 'darker gray', (78, 78, 78, 255)), #HEX 4E4E4E
Color(5, 'black', (0, 0, 0, 255)), #HEX 000000
Color(6, 'light peach', (244, 179, 174, 255)), #HEX F4B3AE
Color(7, 'light pink', (255, 167, 209, 255)), #HEX FFA7D1
Color(8, 'pink', (255, 84, 178, 255)), #HEX FF54B2
Color(9, 'peach', (255, 101, 101, 255)), #HEX FF6565
Color(10, 'windmill red', (229, 0, 0, 255)), #HEX E50000
Color(11, 'blood red', (154, 0, 0, 255)), #HEX 9A0000
Color(12, 'orange', (254, 164, 96, 255)), #HEX FEA460
Color(13, 'light brown', (229, 149, 0, 255)), #HEX E59500
Color(14, 'brazil skin', (160, 106, 66, 255)), #HEX A06A42
Color(15, 'nig skin', (96, 64, 40, 255)), #HEX 604028
Color(16, 'normal skin', (245, 223, 176, 255)), #HEX FEDFB0
Color(17, 'yellow', (255, 248, 137, 255)), #HEX FFF889
Color(18, 'dark yellow', (229, 217, 0, 255)), #HEX E5D900
Color(19, 'light green', (148, 224, 68, 255)), #HEX 94E044
Color(20, 'green', (2, 190, 1, 255)), #HEX 02BE01
Color(21, 'dark green', (104, 131, 56, 255)), #HEX 688338
Color(22, 'darker green', (0, 101, 19, 255)), #HEX 006513
Color(23, 'sky blew', (202, 227, 255, 255)), #HEX CAE3FF
Color(24, 'lite blew', (0, 211, 221, 255)), #HEX 00D3DD
Color(25, 'dark blew', (0, 131, 199, 255)), #HEX 0083C7
Color(26, 'blew', (0, 0, 234, 255)), #HEX 0000EA
Color(27, 'darker blew', (25, 25, 115, 255)), #HEX 191973
Color(28, 'light violette', (207, 110, 228, 255)), #HEX CF6EE4
Color(29, 'violette', (130, 0, 128, 255)) #HEX 820080
]
@staticmethod
def index(i):
for color in EnumColorPixelplanet.ENUM:
if i == color.index:
return color
# White is default color
return EnumColorPixelplanet.ENUM[0]
class Matrix:
def __init__(self):
self.start_x = None
self.start_y = None
self.width = None
self.height = None
self.matrix = {}
def add_coords(self, x, y, w, h):
if self.start_x is None or self.start_x > x:
self.start_x = x
if self.start_y is None or self.start_y > y:
self.start_y = y
end_x_a = x + w
end_y_a = y + h
if self.width is None or self.height is None:
self.width = w
self.height = h
else:
end_x_b = self.start_x + self.width
end_y_b = self.start_y + self.height
self.width = max(end_x_b, end_x_a) - self.start_x
self.height = max(end_y_b, end_y_a) - self.start_y
def create_image(self, filename = None):
img = PIL.Image.new('RGBA', (self.width, self.height), (255, 0, 0, 0))
pxls = img.load()
for x in range(self.width):
for y in range(self.height):
try:
color = self.matrix[x + self.start_x][y + self.start_y].rgb
pxls[x, y] = color
except (IndexError, KeyError, AttributeError):
pass
if filename is not None:
if filename == 'b':
b = io.BytesIO()
img.save(b, "PNG")
b.seek(0)
return b
else:
img.save(filename)
else:
img.show()
img.close()
def set_pixel(self, x, y, color):
if x >= self.start_x and x < (self.start_x + self.width) and y >= self.start_y and y < (self.start_y + self.height):
if x not in self.matrix:
self.matrix[x] = {}
self.matrix[x][y] = color
async def fetch(session, ix, iy, target_matrix):
url = 'https://pixelplanet.fun/chunks/0/%s/%s.bmp' % (ix, iy)
attempts = 0
while True:
try:
async with session.get(url) as resp:
data = await resp.read()
offset = int(-256 * 256 / 2)
off_x = ix * 256 + offset
off_y = iy * 256 + offset
if len(data) == 0:
clr = EnumColorPixelplanet.index(23)
for i in range(256*256):
tx = off_x + i % 256
ty = off_y + i // 256
target_matrix.set_pixel(tx, ty, clr)
else:
c = 0
i = 0
for b in data:
tx = off_x + i % 256
ty = off_y + i // 256
if b == 0:
c = 23
elif b == 1:
c = 0
else:
c = b - 2;
target_matrix.set_pixel(tx, ty, EnumColorPixelplanet.index(c))
i += 1
print("Loaded %s with %s pixels" % (url, i))
break
except:
if attempts > 3:
raise
attempts += 1
pass
async def get_area(x, y, w, h):
target_matrix = Matrix()
target_matrix.add_coords(x, y, w, h)
offset = int(-256 * 256 / 2)
xc = (x - offset) // 256
wc = (x + w - offset) // 256
yc = (y - offset) // 256
hc = (y + h - offset) // 256
print("Load from %s / %s to %s / %s" % (xc, yc, wc + 1, hc + 1), "PixelGetter")
tasks = []
async with aiohttp.ClientSession() as session:
for iy in range(yc, hc + 1):
for ix in range(xc, wc + 1):
tasks.append(fetch(session, ix, iy, target_matrix))
await asyncio.gather(*tasks)
return target_matrix
if __name__ == "__main__":
if len(sys.argv) != 4:
print("Download an area of pixelplanet")
print("Usage: areaDownload.py startX_startY endX_endY filename.png")
print("(user R key on pixelplanet to copy coordinates)")
else:
start = sys.argv[1].split('_')
end = sys.argv[2].split('_')
filename = sys.argv[3]
x = int(start[0])
y = int(start[1])
w = int(end[0]) - x
h =int( end[1]) - y
loop = asyncio.get_event_loop()
matrix = loop.run_until_complete(get_area(x, y, w, h))
matrix.create_image(filename)
print("Done!")

109
utils/historyCopy.py Executable file
View File

@ -0,0 +1,109 @@
#!/usr/bin/python3
import PIL.Image
import sys, io, os
import datetime
import json
import threading
# minus half the canvas size
offset = int(-256 * 256 / 2)
class GetDay(threading.Thread):
def __init__(self, x, y, w, h, iter_date, cnt):
threading.Thread.__init__(self)
self.x = x
self.y = y
self.w = w
self.h = h
self.iter_date = iter_date
self.cnt = cnt * 1000
self.daemon = True
def run(self):
iter_date = self.iter_date
x = self.x
y = self.y
w = self.w
h = self.h
cnt = self.cnt
xc = (x - offset) // 256
wc = (x + w - offset) // 256
yc = (y - offset) // 256
hc = (y + h - offset) // 256
print('------------------------------------------------')
print('Getting frames for date %s' % (iter_date))
image = PIL.Image.new('RGBA', (w, h))
for iy in range(yc, hc + 1):
for ix in range(xc, wc + 1):
path = './canvas/%s/0/tiles/%s/%s.png' % (iter_date, ix, iy)
offx = ix * 256 + offset - x
offy = iy * 256 + offset - y
img = PIL.Image.open(path).convert('RGBA')
image.paste(img, (offx, offy), img)
img.close()
print('Got start of day')
cnt += 1
image.save('./timelapse/t%06d.png' % (cnt))
time_list = os.listdir('./canvas/%s/%s' % (iter_date, 0))
for time in time_list:
if time == 'tiles':
continue
for iy in range(yc, hc + 1):
for ix in range(xc, wc + 1):
path = './canvas/%s/0/%s/%s/%s.png' % (iter_date, time, ix, iy)
if not os.path.exists(path):
continue
offx = ix * 256 + offset - x
offy = iy * 256 + offset - y
img = PIL.Image.open(path).convert('RGBA')
image.paste(img, (offx, offy), img)
img.close()
print('Got time %s' % (time))
cnt += 1
image.save('./timelapse/t%06d.png' % (cnt))
image.close()
def get_area(x, y, w, h, start_date, end_date):
delta = datetime.timedelta(days=1)
end_date = end_date.strftime("%Y%m%d")
iter_date = None
cnt = 0
threads = []
while iter_date != end_date:
iter_date = start_date.strftime("%Y%m%d")
start_date = start_date + delta
thread = GetDay(x, y, w, h, iter_date, cnt)
thread.start()
threads.append(thread)
cnt += 1
for t in threads:
t.join()
if __name__ == "__main__":
if len(sys.argv) != 4 and len(sys.argv) != 5:
print("Download history of an area of pixelplanet - useful for timelapses")
print("Usage: historyDownload.py startX_startY endX_endY amount_days")
print("→start_date and end_date are in YYYY-MM-dd formate")
print("→user R key on pixelplanet to copy coordinates)")
print("→images will be saved into timelapse folder)")
print("-----------")
print("You can create a timelapse from the resulting files with ffmpeg like that:")
print("ffmpeg -framerate 15 -f image2 -i timelapse/t%06d.png -c:v libx264 -pix_fmt yuva420p output.mp4")
else:
start = sys.argv[1].split('_')
end = sys.argv[2].split('_')
amount_days = datetime.timedelta(days=int(sys.argv[3]));
end_date = datetime.date.today()
start_date = end_date - amount_days
x = int(start[0])
y = int(start[1])
w = int(end[0]) - x
h =int( end[1]) - y
if not os.path.exists('./timelapse'):
os.mkdir('./timelapse')
get_area(x, y, w, h, start_date, end_date)
print("Done!")
print("to create a timelapse from it:")
print("ffmpeg -framerate 15 -f image2 -i timelapse/t%06d.png -c:v libx264 -pix_fmt yuva420p output.mp4")

123
utils/historyDownload.py Executable file
View File

@ -0,0 +1,123 @@
#!/usr/bin/python3
import PIL.Image
import sys, io, os
import datetime
import asyncio
import aiohttp
import json
canvas_size = 256*256
canvas_id = 0
frameskip = 1
async def fetch(session, url, offx, offy, image, needed = False):
attempts = 0
while True:
try:
async with session.get(url) as resp:
if resp.status == 404:
if needed:
img = PIL.Image.new('RGB', (256, 256), color = (202, 227, 255))
image.paste(img, (offx, offy))
img.close()
return
if resp.status != 200:
if needed:
continue
return
data = await resp.read()
img = PIL.Image.open(io.BytesIO(data)).convert('RGBA')
image.paste(img, (offx, offy), img)
img.close()
return
except:
if attempts > 3:
raise
attempts += 1
pass
async def get_area(x, y, w, h, start_date, end_date):
offset = int(-canvas_size / 2)
xc = (x - offset) // 256
wc = (x + w - offset) // 256
yc = (y - offset) // 256
hc = (y + h - offset) // 256
print("Load from %s / %s to %s / %s" % (xc, yc, wc + 1, hc + 1))
delta = datetime.timedelta(days=1)
end_date = end_date.strftime("%Y%m%d")
iter_date = None
cnt = 0
#frames = []
while iter_date != end_date:
iter_date = start_date.strftime("%Y%m%d")
print('------------------------------------------------')
print('Getting frames for date %s' % (iter_date))
start_date = start_date + delta
tasks = []
async with aiohttp.ClientSession() as session:
image = PIL.Image.new('RGBA', (w, h))
for iy in range(yc, hc + 1):
for ix in range(xc, wc + 1):
url = 'https://storage.pixelplanet.fun/%s/%s/tiles/%s/%s.png' % (iter_date, canvas_id, ix, iy)
offx = ix * 256 + offset - x
offy = iy * 256 + offset - y
tasks.append(fetch(session, url, offx, offy, image, True))
await asyncio.gather(*tasks)
print('Got start of day')
cnt += 1
#frames.append(image.copy())
image.save('./timelapse/t%s.png' % (cnt))
async with session.get('https://pixelplanet.fun/api/history?day=%s&id=%s' % (iter_date, canvas_id)) as resp:
time_list = json.loads(await resp.text())
i = 0
for time in time_list:
i += 1
if (i % frameskip) != 0:
continue
tasks = []
for iy in range(yc, hc + 1):
for ix in range(xc, wc + 1):
url = 'https://storage.pixelplanet.fun/%s/%s/%s/%s/%s.png' % (iter_date, canvas_id, time, ix, iy)
offx = ix * 256 + offset - x
offy = iy * 256 + offset - y
tasks.append(fetch(session, url, offx, offy, image))
await asyncio.gather(*tasks)
print('Got time %s' % (time))
cnt += 1
#frames.append(image.copy())
image.save('./timelapse/t%s.png' % (cnt))
image.close()
# this would save a gif right out of the script, but giffs are huge and not good
#frames[0].save('timelapse.png', save_all=True, append_images=frames[1:], duration=100, loop=0, default_image=False, blend=1)
if __name__ == "__main__":
if len(sys.argv) != 4 and len(sys.argv) != 5:
print("Download history of an area of pixelplanet - useful for timelapses")
print("Usage: historyDownload.py startX_startY endX_endY start_date [end_date]")
print("→start_date and end_date are in YYYY-MM-dd formate")
print("→user R key on pixelplanet to copy coordinates)")
print("→images will be saved into timelapse folder)")
print("-----------")
print("You can create a timelapse from the resulting files with ffmpeg like that:")
print("ffmpeg -framerate 15 -f image2 -i timelapse/t%d.png -c:v libvpx-vp9 -pix_fmt yuva420p output.webm")
else:
start = sys.argv[1].split('_')
end = sys.argv[2].split('_')
start_date = datetime.date.fromisoformat(sys.argv[3])
if len(sys.argv) == 5:
end_date = datetime.date.fromisoformat(sys.argv[4])
else:
end_date = datetime.date.today()
x = int(start[0])
y = int(start[1])
w = int(end[0]) - x
h =int( end[1]) - y
loop = asyncio.get_event_loop()
if not os.path.exists('./timelapse'):
os.mkdir('./timelapse')
loop.run_until_complete(get_area(x, y, w, h, start_date, end_date))
print("Done!")
print("to create a timelapse from it:")
print("ffmpeg -framerate 15 -f image2 -i timelapse/t%d.png -c:v libvpx-vp9 -pix_fmt yuva420p output.webm")

Binary file not shown.

After

Width:  |  Height:  |  Size: 26 KiB

BIN
utils/pp-center-331-337.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 32 KiB