#!/usr/bin/env python3
#-*- coding: utf-8 -*-
"""Drive vibrating Buttplug devices from ``[rumble]`` lines in a Minetest log.

The script connects to a Buttplug websocket server, and for every device that
accepts ``VibrateCmd`` it tails ``debug.txt`` and replays each fresh
``[rumble] (queue|instant) <strength> <duration>`` log entry on the device.
"""

import asyncio
from collections.abc import AsyncIterator, Iterator
from datetime import datetime
from re import match
from typing import Optional

from buttplug.client import (
    ButtplugClient,
    ButtplugClientDevice,
    ButtplugClientConnectorError,
    ButtplugClientWebsocketConnector,
)
from buttplug.core import ButtplugLogLevel

# Log file that is tailed for rumble requests (relative to the working dir).
DEBUG_LOG_PATH = "debug.txt"

# Address of the Buttplug websocket server.
SERVER_URL = "ws://127.0.0.1:12345"

# Example of an accepted line:
#   2023-01-31 12:34:56: ACTION[Main]: [rumble] queue 0.5 1.25
# Raw strings keep the ``\d`` escapes valid, and the decimal point is escaped
# so that only real numbers reach ``float()``.
RUMBLE_PATTERN = (
    r'^(?P<timestamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}): ACTION\[Main\]: '
    r'\[rumble\] (?P<mode>queue|instant) '
    r'(?P<strength>\d+(?:\.\d+)?) (?P<duration>\d+(?:\.\d+)?)$'
)
TIMESTAMP_FORMAT = '%Y-%m-%d %H:%M:%S'

# Lines at least this old (in seconds) are treated as history and skipped.
MAX_LINE_AGE_SECONDS = 1.0

# Strong references to the per-device tasks. The event loop only keeps weak
# references, so an unreferenced task may be garbage-collected mid-run.
_background_tasks: set = set()


async def cancel_me():
    """Sleep for up to an hour, returning quietly if cancelled.

    ``main`` awaits this coroutine to keep the program alive while the
    per-device tasks do the actual work.
    """
    print('cancel_me(): before sleep')
    try:
        await asyncio.sleep(3600)
    except asyncio.CancelledError:
        pass


async def follow(file, sleep_sec=0.1) -> AsyncIterator[str]:
    """Yield each complete line from *file* as it is written.

    Based on a borrowed tail-follow recipe (its attribution link is missing
    from this file), converted to an async generator so that waiting for new
    data does not block the event loop.

    Args:
        file: An open text file object positioned where reading should start.
        sleep_sec: Seconds to wait after a read that returned no data. A
            falsy value still yields control to the event loop once.

    Yields:
        Lines including their trailing newline. A partial line at the end of
        the file is buffered until its newline has been written.
    """
    line = ''
    while True:
        tmp = file.readline()
        if tmp:
            line += tmp
            if line.endswith("\n"):
                yield line
                line = ''
        else:
            # readline() returns '' (never None) at end of file; wait for
            # more data without spinning or blocking other tasks.
            await asyncio.sleep(sleep_sec or 0)


def parse_rumble_line(
    line: str, now: Optional[datetime] = None
) -> Optional[tuple[float, float]]:
    """Extract ``(strength, duration)`` from a fresh rumble log line.

    Args:
        line: One line of the log, with or without its trailing newline.
        now: Reference time used for the freshness check; defaults to
            ``datetime.now()``.

    Returns:
        ``(strength, duration)`` as floats, or ``None`` when the line is not
        a rumble entry, carries an impossible timestamp, or is at least
        ``MAX_LINE_AGE_SECONDS`` old.
    """
    found = match(RUMBLE_PATTERN, line)
    if found is None:
        return None

    try:
        logged_at = datetime.strptime(found['timestamp'], TIMESTAMP_FORMAT)
    except ValueError:
        # Shape matched but the date itself is invalid (e.g. month 13).
        return None

    if now is None:
        now = datetime.now()
    # total_seconds() includes the day component; ``timedelta.seconds`` does
    # not, which would let a line that is exactly N days old look fresh.
    if (now - logged_at).total_seconds() >= MAX_LINE_AGE_SECONDS:
        return None  # skip old lines

    return float(found['strength']), float(found['duration'])


async def device_added_task(dev: ButtplugClientDevice):
    """Replay rumble requests from the log on a newly added device.

    Devices that do not list ``VibrateCmd`` among their allowed messages are
    only announced. For the others the log is followed indefinitely; each
    fresh rumble line vibrates the device at the requested strength for the
    requested duration and then sets the vibration back to zero.

    Args:
        dev: The device reported by the Buttplug client.
    """
    print("Device Added: {}".format(dev.name))
    if "VibrateCmd" not in dev.allowed_messages:
        return

    # errors="replace" keeps stray non-UTF-8 bytes in unrelated log lines
    # from aborting the task; rumble lines themselves are plain ASCII.
    with open(DEBUG_LOG_PATH, 'r', encoding="utf-8", errors="replace") as file:
        async for line in follow(file):
            request = parse_rumble_line(line)
            if request is None:
                continue
            strength, duration = request

            # TODO: implement instant/queue distinction
            await dev.send_vibrate_cmd(strength)
            await asyncio.sleep(duration)
            await dev.send_vibrate_cmd(0.0)

    # NOTE(review): follow() never finishes on its own, so this is only
    # reached if the loop above is ever changed to terminate.
    await dev.send_stop_device_cmd()


def device_added(emitter, dev: ButtplugClientDevice):
    """Start a background task for a device announced by the client.

    Args:
        emitter: The object that fired the event (unused).
        dev: The device that was added.
    """
    task = asyncio.create_task(device_added_task(dev))
    # Hold a reference until the task completes so it cannot be collected
    # while still running.
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)


def device_removed(emitter, dev: ButtplugClientDevice):
    """Report that a device has been removed.

    Args:
        emitter: The object that fired the event (unused).
        dev: The device that was removed.
    """
    print("Device removed: ", dev)


async def main():
    """Connect to the Buttplug server, scan for devices, then shut down.

    Registers the device handlers, connects to ``SERVER_URL`` and starts
    scanning. It then waits on ``cancel_me`` (until that sleep ends or is
    cancelled) before stopping the scan and disconnecting. Returns early,
    after printing the error, if the connection cannot be established.
    """
    client = ButtplugClient("Minetest Client ♥ 🍆 💦 🍑")
    connector = ButtplugClientWebsocketConnector(SERVER_URL)

    client.device_added_handler += device_added
    client.device_removed_handler += device_removed

    try:
        await client.connect(connector)
    except ButtplugClientConnectorError as e:
        print("Could not connect to server, exiting: {}".format(e.message))
        return

    await client.start_scanning()

    task = asyncio.create_task(cancel_me())
    try:
        await task
    except asyncio.CancelledError:
        pass

    await client.stop_scanning()
    await client.disconnect()
    print("Disconnected, quitting")


asyncio.run(main(), debug=True)