2020-05-01 22:11:42 +00:00
|
|
|
#!/usr/bin/python3
|
|
|
|
|
|
|
|
import PIL.Image
|
2023-09-30 22:03:57 +00:00
|
|
|
import sys, os, io, math
|
2020-05-01 22:11:42 +00:00
|
|
|
import asyncio
|
|
|
|
import aiohttp
|
2023-09-25 21:57:31 +00:00
|
|
|
|
2023-09-30 23:48:40 +00:00
|
|
|
# User-Agent header value sent with every HTTP request: a fixed tool
# name/version followed by the command-line arguments of this invocation.
USER_AGENT = "ppfun areaDownload 1.0 " + ' '.join(sys.argv[1:])
|
2023-09-30 22:03:57 +00:00
|
|
|
# Base URL that the /api/me and /chunks/... request URLs are built from.
PPFUN_URL = "https://pixelplanet.fun"
|
2020-05-01 22:11:42 +00:00
|
|
|
|
|
|
|
class Color:
    """One entry of a canvas palette.

    Attributes:
        index: Position of the entry inside the palette.
        rgb: Colour value exactly as passed in by the caller.
    """

    def __init__(self, index, rgb):
        self.rgb = rgb
        self.index = index

    def __repr__(self):
        # Makes palette entries readable in logs and debugger output.
        return f"{type(self).__name__}(index={self.index!r}, rgb={self.rgb!r})"
|
|
|
|
|
|
|
|
class EnumColorPixelplanet:
    """Process-wide registry of the palette of the canvas being downloaded."""

    # Palette entries in palette order; populated by getColors().
    ENUM = []

    @staticmethod
    def getColors(canvas):
        """Load the palette of *canvas* into ``ENUM``.

        Args:
            canvas: Mapping with a ``'colors'`` sequence; every element is
                converted to a tuple and stored under its position.
        """
        # Replace the contents in place instead of appending: a second call
        # must not leave the previous palette in front of the new one, and
        # existing references to the list stay valid.
        EnumColorPixelplanet.ENUM[:] = [
            Color(i, tuple(color)) for i, color in enumerate(canvas['colors'])
        ]

    @staticmethod
    def index(i):
        """Return the palette entry whose ``index`` equals *i*.

        Falls back to the first palette entry when no entry matches.

        Raises:
            IndexError: If the palette is empty and nothing matches.
        """
        palette = EnumColorPixelplanet.ENUM
        # Fast path: getColors() stores every entry at the position given by
        # its index, so a direct lookup normally replaces the linear scan
        # (this method is called once per pixel).
        if isinstance(i, int) and 0 <= i < len(palette) and palette[i].index == i:
            return palette[i]
        # Slow path keeps the original semantics for palettes that were
        # filled in some other order.
        for color in palette:
            if i == color.index:
                return color
        return palette[0]
|
|
|
|
|
|
|
|
class Matrix:
    """Sparse pixel buffer for a rectangular area given in canvas coordinates.

    Attributes:
        start_x, start_y: Top-left corner of the covered area (``None`` until
            ``add_coords`` has been called).
        width, height: Extent of the covered area (``None`` until
            ``add_coords`` has been called).
        matrix: Nested dict ``matrix[x][y] -> color`` holding the pixels that
            have been set so far.
    """

    def __init__(self):
        self.start_x = None
        self.start_y = None
        self.width = None
        self.height = None
        self.matrix = {}

    def add_coords(self, x, y, w, h):
        """Grow the covered area so that it also contains the given rectangle.

        Args:
            x, y: Top-left corner of the rectangle to include.
            w, h: Width and height of the rectangle to include.
        """
        if None in (self.start_x, self.start_y, self.width, self.height):
            # First rectangle: it simply becomes the covered area.
            self.start_x, self.start_y = x, y
            self.width, self.height = w, h
            return

        # The far edges must be derived from the *old* origin. Moving the
        # origin first (as the previous implementation did) made the area
        # shrink whenever it was extended towards smaller coordinates.
        end_x = max(self.start_x + self.width, x + w)
        end_y = max(self.start_y + self.height, y + h)
        self.start_x = min(self.start_x, x)
        self.start_y = min(self.start_y, y)
        self.width = end_x - self.start_x
        self.height = end_y - self.start_y

    def create_image(self, filename = None):
        """Render the stored pixels into an RGBA image.

        Pixels that were never set keep the background value
        ``(255, 0, 0, 0)``.

        Args:
            filename: ``None`` shows the image via ``img.show()``; the literal
                ``'b'`` returns the PNG data as an in-memory buffer; any other
                value is passed to ``img.save`` as the target.

        Returns:
            An ``io.BytesIO`` positioned at offset 0 when ``filename == 'b'``,
            otherwise ``None``.
        """
        img = PIL.Image.new('RGBA', (self.width, self.height), (255, 0, 0, 0))
        try:
            pxls = img.load()
            for x in range(self.width):
                column = self.matrix.get(x + self.start_x)
                if not column:
                    # No pixel was ever stored in this column.
                    continue
                for y in range(self.height):
                    # Entries without an ``rgb`` attribute are skipped, like
                    # missing entries.
                    rgb = getattr(column.get(y + self.start_y), 'rgb', None)
                    if rgb is not None:
                        pxls[x, y] = rgb

            if filename is None:
                img.show()
                return None
            if filename == 'b':
                b = io.BytesIO()
                img.save(b, "PNG")
                b.seek(0)
                return b
            img.save(filename)
            return None
        finally:
            # Release the image on every path, including the early return of
            # the in-memory buffer.
            img.close()

    def set_pixel(self, x, y, color):
        """Store *color* at canvas position (x, y) if it lies inside the area.

        Positions outside the covered area are ignored.
        """
        inside_x = self.start_x <= x < self.start_x + self.width
        inside_y = self.start_y <= y < self.start_y + self.height
        if inside_x and inside_y:
            self.matrix.setdefault(x, {})[y] = color
|
|
|
|
|
2023-09-30 22:03:57 +00:00
|
|
|
async def fetchMe():
    """Fetch ``{PPFUN_URL}/api/me`` and return its decoded JSON body.

    A failed request is retried after a 5 second pause; the request is made
    at most five times in total (same number of tries as before, the final
    message now states the real count).

    Returns:
        The parsed JSON document.

    Raises:
        Exception: The error of the last try once all tries have failed.
    """
    url = f"{PPFUN_URL}/api/me"
    headers = {
        'User-Agent': USER_AGENT
    }
    max_tries = 5
    async with aiohttp.ClientSession() as session:
        for attempt in range(1, max_tries + 1):
            try:
                async with session.get(url, headers=headers) as resp:
                    return await resp.json()
            except asyncio.CancelledError:
                # Cancellation must never be swallowed and retried.
                raise
            except Exception:
                # KeyboardInterrupt/SystemExit are deliberately not caught.
                if attempt == max_tries:
                    print(f"Could not get {url} in {max_tries} tries, cancelling")
                    raise
                print(f"Failed to load {url}, trying again in 5s")
                await asyncio.sleep(5)
|
|
|
|
|
2023-09-30 23:48:40 +00:00
|
|
|
async def fetch(session, canvas_id, canvasoffset, ix, iy, target_matrix):
    """Download one 256x256 chunk and paint it into *target_matrix*.

    Args:
        session: Open ``aiohttp.ClientSession`` used for the request.
        canvas_id: Canvas identifier used in the chunk URL.
        canvasoffset: Square root of the canvas size, as computed by
            ``get_area``.
        ix, iy: Chunk column and row.
        target_matrix: Object with a ``set_pixel(x, y, color)`` method that
            receives the pixels.

    Raises:
        Exception: The error of the last try once five download tries have
            failed. Errors raised while painting are not retried.
    """
    url = f"{PPFUN_URL}/chunks/{canvas_id}/{ix}/{iy}.bmp"
    headers = {
        'User-Agent': USER_AGENT
    }
    chunk_pixels = 256 * 256
    # Squaring the root recovers the canvas size; half of it, negated, is the
    # canvas coordinate of chunk 0's first pixel.
    offset = int(-canvasoffset * canvasoffset / 2)
    off_x = ix * 256 + offset
    off_y = iy * 256 + offset

    # Only the network round-trip is retried.
    max_tries = 5
    data = b''
    for attempt in range(1, max_tries + 1):
        try:
            async with session.get(url, headers=headers) as resp:
                data = await resp.read()
            break
        except asyncio.CancelledError:
            # Cancellation must never be swallowed and retried.
            raise
        except Exception:
            if attempt == max_tries:
                print(f"Could not get {url} in {max_tries} tries, cancelling")
                raise
            print(f"Failed to load {url}, trying again in 3s")
            await asyncio.sleep(3)

    set_pixel = target_matrix.set_pixel
    if len(data) == 0:
        # An empty body means the whole chunk uses palette entry 0.
        clr = EnumColorPixelplanet.index(0)
        for i in range(chunk_pixels):
            set_pixel(off_x + i % 256, off_y + i // 256, clr)
        count = chunk_pixels
    else:
        # Only the low 7 bits select the palette entry (the top bit is
        # presumably a per-pixel flag - TODO confirm). Resolve the 128
        # possible values once instead of once per pixel.
        palette = [EnumColorPixelplanet.index(value) for value in range(0x80)]
        # Never paint past the chunk, even if the body is longer than expected.
        pixels = data[:chunk_pixels]
        for i, b in enumerate(pixels):
            set_pixel(off_x + i % 256, off_y + i // 256, palette[b & 0x7F])
        count = len(pixels)
    print(f"Loaded {url} with {count} pixels")
|
|
|
|
|
2023-09-30 23:48:40 +00:00
|
|
|
async def get_area(canvas_id, canvas, x, y, w, h):
    """Download every chunk that overlaps the requested area.

    Args:
        canvas_id: Canvas identifier used in the chunk URLs.
        canvas: Canvas description; its ``'size'`` entry is read.
        x, y: Top-left corner of the area in canvas coordinates.
        w, h: Width and height of the area in pixels.

    Returns:
        A ``Matrix`` covering the area, filled with the downloaded pixels.
    """
    target_matrix = Matrix()
    target_matrix.add_coords(x, y, w, h)

    canvasoffset = math.pow(canvas['size'], 0.5)
    offset = int(-canvasoffset * canvasoffset / 2)

    # First and last chunk index per axis. The last pixel of the area is at
    # x + w - 1 (resp. y + h - 1); using the exclusive end instead fetched a
    # whole extra column/row of chunks when the area ended exactly on a chunk
    # boundary.
    xc = (x - offset) // 256
    wc = (x + w - 1 - offset) // 256
    yc = (y - offset) // 256
    hc = (y + h - 1 - offset) // 256
    print(f"Loading from {xc} / {yc} to {wc + 1} / {hc + 1} PixelGetter")

    async with aiohttp.ClientSession() as session:
        tasks = [
            fetch(session, canvas_id, canvasoffset, ix, iy, target_matrix)
            for iy in range(yc, hc + 1)
            for ix in range(xc, wc + 1)
        ]
        await asyncio.gather(*tasks)
    return target_matrix
|
|
|
|
|
2023-09-25 21:57:31 +00:00
|
|
|
def validateCoorRange(ulcoor: str, brcoor: str, canvasSize: int): # stolen from hf with love
    """Parse and validate two ``x_y`` corner strings against the canvas bounds.

    Each component is read as a float and floored to an integer.

    Args:
        ulcoor: Top-left corner as ``"x_y"``.
        brcoor: Bottom-right corner as ``"x_y"``.
        canvasSize: Edge length of the canvas; valid coordinates lie in
            ``[-canvasSize / 2, canvasSize / 2)``.

    Returns:
        ``(x, y, u, v)`` with the integer corner coordinates on success,
        otherwise a ``str`` describing the problem.
    """
    if not ulcoor or not brcoor:
        return "Not all coordinates defined"

    ulParts = ulcoor.strip().split('_')
    if len(ulParts) != 2:
        return "Invalid Coordinate Format for top-left corner"
    brParts = brcoor.strip().split('_')
    if len(brParts) != 2:
        return "Invalid Coordinate Format for bottom-right corner"

    def toInt(text):
        # float() rejects non-numeric text (ValueError); math.floor() rejects
        # NaN (ValueError) and infinities (OverflowError).
        try:
            return int(math.floor(float(text)))
        except (ValueError, OverflowError):
            return None

    labelled = (
        ("x of top-left corner", ulParts[0]),
        ("y of top-left corner", ulParts[1]),
        ("x of bottom-right corner", brParts[0]),
        ("y of bottom-right corner", brParts[1]),
    )
    values = []
    for label, text in labelled:
        value = toInt(text)
        if value is None:
            return f"{label} is not a valid number"
        values.append(value)
    x, y, u, v = values

    if u < x or v < y:
        return "Corner coordinates are aligned wrong"

    canvasMaxXY = canvasSize / 2
    canvasMinXY = -canvasMaxXY

    if (x < canvasMinXY or y < canvasMinXY or x >= canvasMaxXY or y >= canvasMaxXY):
        return "Coordinates of top-left corner are outside of canvas"
    if (u < canvasMinXY or v < canvasMinXY or u >= canvasMaxXY or v >= canvasMaxXY):
        return "Coordinates of bottom-right corner are outside of canvas"

    return (x, y, u, v)
|
2020-05-01 22:11:42 +00:00
|
|
|
|
2023-09-30 22:03:57 +00:00
|
|
|
async def main():
    """Command-line entry point: download an area and write it to a file.

    Expects four arguments: canvas ID, top-left ``x_y``, bottom-right ``x_y``
    and the output filename. With any other argument count the usage text and
    the list of available canvases is printed instead. Every error path
    prints a message and returns without downloading anything.
    """
    apime = await fetchMe()

    if len(sys.argv) != 5:
        print("Download an area of pixelplanet")
        print("Usage: areaDownload.py canvasID startX_startY endX_endY filename.png")
        print("(use R key on pixelplanet to copy coordinates)")
        print("canvasID: ", end='')
        for canvas_id, canvas in apime['canvases'].items():
            # Canvases with a truthy 'v' entry are rejected below as 3D
            # canvases, so they are not offered here either.
            if canvas.get('v'):
                continue
            print(f"{canvas_id} = {canvas['title']}", end=', ')
        print()
        return

    canvas_id = sys.argv[1]
    if canvas_id not in apime['canvases']:
        print("Invalid canvas selected")
        return

    canvas = apime['canvases'][canvas_id]
    if canvas.get('v'):
        print("Can't get area for 3D canvas")
        return

    parseCoords = validateCoorRange(sys.argv[2], sys.argv[3], canvas['size'])
    if isinstance(parseCoords, str):
        # A string result is the validation error message. Return like the
        # other error paths instead of raising SystemExit inside a coroutine.
        print(parseCoords)
        return

    # Both corners are inclusive, hence the + 1.
    x, y, u, v = parseCoords
    w = u - x + 1
    h = v - y + 1

    EnumColorPixelplanet.getColors(canvas)
    filename = sys.argv[4]

    matrix = await get_area(canvas_id, canvas, x, y, w, h)
    matrix.create_image(filename)
    print("Done!")
|
|
|
|
|
|
|
|
# Start the downloader only when this file is executed as a script, not when
# it is imported as a module.
if __name__ == "__main__":
    asyncio.run(main())
|