# # Python3 Module for matrix-synapse # # All the self.api._underline using methods could break on updates # from typing import Awaitable, Callable, Optional, Tuple, List import logging import synapse from synapse import module_api from synapse.types import RoomAlias from synapse.push.bulk_push_rule_evaluator import BulkPushRuleEvaluator from synapse.api.constants import EventTypes, Membership logger = logging.getLogger(__name__) def monkeyPatchPushRules(store): """Monkey patching BulkPushRuleEvaluator to not send notifications if message is in pp_ channel, otherwise event_push_actions will get filled up with unread notification for pp_ users, creating and infinite growing database """ originalFunction = BulkPushRuleEvaluator._action_for_event_by_user # cache for whether room is bridge room that gets notifications blocked or not room_block_notif = {} async def monkeyPatch(self, event, context, event_id_to_event): # always notify on invites if event.type != EventTypes.Member or event.membership != Membership.INVITE: room_id = event.room_id if room_id in room_block_notif: if room_block_notif[room_id]: return else: aliases = await store.get_aliases_for_room(room_id) for alias in aliases: if alias.startswith('#pp_'): room_block_notif[room_id] = True return room_block_notif[room_id] = False return await originalFunction(self, event, context, event_id_to_event) BulkPushRuleEvaluator._action_for_event_by_user = monkeyPatch logger.info('Monkey patched BulkPushRuleEvaluator') class PPfunAuthProvider: def __init__(self, config: dict, api: module_api): self.api = api if 'ppfunurl' not in config: raise Exception('Pixelplanet ppfunurl not configured') self.ppfunurl = config["ppfunurl"] if 'apisocketkey' not in config: raise Exception('Pixelplanet apisocketkey not configured') self.apisocketkey = config["apisocketkey"] if 'verified' in config: self.check_verified = config['verified'] else: self.check_verified = False if 'block_notify' in config and config['block_notify']: monkeyPatchPushRules(api._store) self.autojoin_rooms = [] if 'autojoin_rooms' in config: self.parsed_rooms = False self.autojoin_rooms = config["autojoin_rooms"] api.register_password_auth_provider_callbacks( check_3pid_auth = self.check_3pid_pass, auth_checkers = { ("m.login.password", ("password",)): self.check_pass, }, ) async def translate_room_aliases_to_ids( self, room_aliases: List[str], ) -> List[str]: """Resole list of room_aliases to room_ids """ room_ids = [] logger.info('Translating room_aliases to ids') for alias in room_aliases: try: (room_id, hosts) = await self.api.lookup_room_alias(alias) logger.info('Map alias %s to room %s', alias, room_id) room_ids.append(room_id) except: logger.warning('Could not resolve room %s', alias) return room_ids; async def check_credentials( self, query, ) -> Optional[ Tuple[ int, str, Optional[str] ] ]: try: resp = await self.api.http_client.post_json_get_json( self.ppfunurl + '/adminapi/checklogin', query, [[ "authorization", ['Bearer ' + self.apisocketkey] ]], ) if not resp["success"]: raise Exception(resp["errors"][0]) userdata = resp['userdata'] if self.check_verified and not userdata['verified']: raise Exception('User is not mail verified'); return (userdata['id'], userdata['name'], userdata['email']) except Exception as e: logger.warning('Could not login via ppfun: %s', e) return None async def set_email( self, user_id: str, email: str ) -> bool: """making sure that email is the same between ppfun and matrix user """ threepid_dict_list = await self.api.get_threepids_for_user(user_id) for threepid_dict in threepid_dict_list: if threepid_dict['address'] == email: logger.info('User %s has same email as in ppfun. Nice', user_id) return False logger.info('User %s did not have email set, setting it to %s', user_id, email) current_time = self.api._hs.get_clock().time_msec() threepid_dict = { "medium": "email", "address": email, "validated_at": current_time, } registration_handler = self.api._hs.get_registration_handler() await registration_handler._register_email_threepid(user_id, threepid_dict, None) return True async def join_auto_rooms( self, user_id: str, ) -> None: if not self.autojoin_rooms: return if not self.parsed_rooms: self.autojoin_rooms = await self.translate_room_aliases_to_ids(self.autojoin_rooms) self.parsed_rooms = True for room_id in self.autojoin_rooms: await self.api.update_room_membership(user_id, user_id, room_id, 'join') async def login( self, ppfun_id: str, ppfun_name: str, ppfun_email: str, ) -> Optional[ Tuple[ str, Optional[Callable[["synapse.module_api.LoginResponse"], Awaitable[None]]], ] ]: localpart = f'pp_{ppfun_id}' user_id = self.api.get_qualified_user_id(localpart) logger.info('check if user %s exists', user_id) does_exist = await self.api.check_user_exists(user_id) if does_exist is None: logger.info('User %s does not exist yet, registering new user', user_id) emails = None if ppfun_email: emails = [ppfun_email] try: user_id = await self.api.register_user(localpart, ppfun_name, emails) except Exception as e: logger.warning('Could not create user %s, because %s', user_id, e) return None else: await self.set_email(user_id, ppfun_email) await self.join_auto_rooms(user_id) logger.info('User %s logged in via ppfun %s', user_id, ppfun_name) return user_id, None async def check_3pid_pass( self, medium: str, address: str, password: str, ) -> Optional[ Tuple[ str, Optional[Callable[["synapse.module_api.LoginResponse"], Awaitable[None]]], ] ]: if medium != "email" or not address: return None ppfun_userdata = await self.check_credentials({ "email": address, "password": password }) if ppfun_userdata is not None: logger.info('User %s logging in with ppfun credentials', ppfun_userdata) ret = await self.login(*ppfun_userdata) return ret return None async def check_pass( self, username: str, login_type: str, login_dict: "synapse.module_api.JsonDict", ) -> Optional[ Tuple[ str, Optional[Callable[["synapse.module_api.LoginResponse"], Awaitable[None]]], ] ]: if login_type != "m.login.password": return None query = { "password": login_dict.get('password') } if username.startswith('@pp_'): query['id'] = username[4: username.index(':')] elif username.startswith('pp_') and username.endswith(f':{self.api.server_name}'): query['id'] = username[3: username.index(':')] else: query['name'] = username ppfun_userdata = await self.check_credentials(query) if ppfun_userdata is not None: logger.info('User %s logging in with ppfun credentials', ppfun_userdata) ret = await self.login(*ppfun_userdata) return ret return None