rumble/rumble.py

102 lines
3.2 KiB
Python

#!/usr/bin/env python3
#-*- coding: utf-8 -*-
from buttplug.client import (
ButtplugClient,
ButtplugClientDevice,
ButtplugClientConnectorError,
ButtplugClientWebsocketConnector,
)
from buttplug.core import ButtplugLogLevel
import asyncio
from datetime import datetime
from collections.abc import Iterator
from re import match
async def cancel_me():
print('cancel_me(): before sleep')
try:
await asyncio.sleep(3600)
except asyncio.CancelledError:
pass
async def device_added_task(dev: ButtplugClientDevice):
print("Device Added: {}".format(dev.name))
if "VibrateCmd" in dev.allowed_messages.keys():
# stolen from <https://stackoverflow.com/a/54263201>
def follow(file, sleep_sec=0.1) -> Iterator[str]:
""" Yield each line from a file as they are written.
`sleep_sec` is the time to sleep after empty reads. """
line = ''
while True:
tmp = file.readline()
if tmp is not None:
line += tmp
if line.endswith("\n"):
yield line
line = ''
elif sleep_sec:
time.sleep(sleep_sec)
with open("../../debug.txt", 'r') as file:
for line in follow(file):
pattern = '^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}: ACTION\[Main\]: \[rumble\] (queue|instant) \d+(.\d+)? \d+(.\d+)?$'
if match(pattern, line) is None:
continue
parts = line.split(" ")
datetime_log_string = " ".join([
parts[0],
parts[1],
])
datetime_log = datetime.strptime(
datetime_log_string,
'%Y-%m-%d %H:%M:%S:'
)
elapsed_time = datetime.now() - datetime_log
if elapsed_time.seconds > 0:
continue # skip old lines
strength = float(parts[5])
duration = float(parts[6])
# TODO: implement instant/queue distinction
await dev.send_vibrate_cmd(strength)
await asyncio.sleep(duration)
await dev.send_vibrate_cmd(0.0)
await dev.send_stop_device_cmd()
def device_added(emitter, dev: ButtplugClientDevice):
asyncio.create_task(device_added_task(dev))
def device_removed(emitter, dev: ButtplugClientDevice):
print("Device removed: ", dev)
async def main():
client = ButtplugClient("Minetest Client ♥ 🍆 💦 🍑")
connector = ButtplugClientWebsocketConnector("ws://127.0.0.1:12345")
client.device_added_handler += device_added
client.device_removed_handler += device_removed
try:
await client.connect(connector)
except ButtplugClientConnectorError as e:
print("Could not connect to server, exiting: {}".format(e.message))
return
await client.start_scanning()
task = asyncio.create_task(cancel_me())
try:
await task
except asyncio.CancelledError:
pass
await client.stop_scanning()
await client.disconnect()
print("Disconnected, quitting")
asyncio.run(main(), debug=True)