75 lines
2.5 KiB
Python
75 lines
2.5 KiB
Python
import asyncio
|
|
import sys
|
|
import psycopg
|
|
|
|
import matrix
|
|
import mq
|
|
import protocol
|
|
import config
|
|
import db
|
|
import log
|
|
|
|
|
|
async def matrixmain(matrix_client: matrix.MatrixClient, mq_client: mq.MQClient, db_connection: psycopg.AsyncConnection):
|
|
from nio import RoomMessageText, MatrixRoom
|
|
|
|
last_seen = await db.last_seen(db_connection)
|
|
client = matrix_client.client
|
|
|
|
async def callback(room: MatrixRoom, event: RoomMessageText):
|
|
import datetime
|
|
server_timestamp = event.server_timestamp / 1000
|
|
dt_received = datetime.datetime.fromtimestamp(server_timestamp)
|
|
if dt_received > last_seen:
|
|
handled = await matrix.message_received_cb(matrix_client, mq_client, room, event)
|
|
if handled:
|
|
await db.update_last_seen(db_connection, dt_received)
|
|
|
|
client.add_event_callback(callback, RoomMessageText)
|
|
|
|
try:
|
|
await client.sync_forever(timeout=30000, full_state=True) # milliseconds
|
|
log.info('exiting from the matrix loop')
|
|
client.logout()
|
|
except Exception as e:
|
|
log.error(f'exception in matrix loop: {e}')
|
|
|
|
|
|
async def mqmain(rabbit_client: mq.MQClient, matrix_client: matrix.MatrixClient):
|
|
async def loop():
|
|
async with rabbit_client.queue.iterator() as queue_iter:
|
|
async for message in queue_iter:
|
|
async with message.process():
|
|
body = message.body.decode()
|
|
br = protocol.json_to_bot_response(body)
|
|
# logging.info(f'New message from MQ: {str(br)[:24]}...')
|
|
await matrix.send_text(matrix_client, br)
|
|
|
|
try:
|
|
async with rabbit_client.connection:
|
|
await loop()
|
|
except Exception as e:
|
|
log.error(f'exception in mq loop: {e}')
|
|
|
|
|
|
async def main(conf: config.Configuration):
|
|
import echobot
|
|
|
|
mq_client = await mq.initialize(conf)
|
|
matrix_client = await matrix.initialize(conf)
|
|
|
|
async with await psycopg.AsyncConnection.connect(conf.db_url, autocommit=True) as db_connection:
|
|
await asyncio.gather(
|
|
echobot.run(conf.mq_url),
|
|
mqmain(mq_client, matrix_client),
|
|
matrixmain(matrix_client, mq_client, db_connection)
|
|
)
|
|
|
|
if __name__ == '__main__':
|
|
configuration = config.init()
|
|
if isinstance(configuration, str): # it's an error!
|
|
print(configuration, file=sys.stderr)
|
|
sys.exit(2)
|
|
else:
|
|
loop = asyncio.get_event_loop()
|
|
loop.run_until_complete(main(configuration))
|