102 lines
3.2 KiB
Python
102 lines
3.2 KiB
Python
#!/usr/bin/env python3
|
|
#-*- coding: utf-8 -*-
|
|
from buttplug.client import (
|
|
ButtplugClient,
|
|
ButtplugClientDevice,
|
|
ButtplugClientConnectorError,
|
|
ButtplugClientWebsocketConnector,
|
|
)
|
|
from buttplug.core import ButtplugLogLevel
|
|
import asyncio
|
|
|
|
from datetime import datetime
|
|
from collections.abc import Iterator
|
|
from re import match
|
|
|
|
async def cancel_me():
    """Sleep for up to one hour, finishing quietly if cancelled first.

    Prints a marker line before sleeping. An ``asyncio.CancelledError``
    raised while sleeping is swallowed, so awaiting this coroutine never
    propagates the cancellation to the caller.
    """
    print('cancel_me(): before sleep')

    hour_in_seconds = 60 * 60
    try:
        await asyncio.sleep(hour_in_seconds)
    except asyncio.CancelledError:
        # Being cancelled is an expected way out; just finish normally.
        return
|
|
|
|
|
|
# Default location of the log file that is tailed for rumble requests.
_RUMBLE_LOG_PATH = "../../debug.txt"

# Matches lines such as
#   2024-01-31 12:34:56: ACTION[Main]: [rumble] queue 0.5 1.25
# and captures the timestamp, the mode, the strength and the duration.
# Raw strings keep the backslashes intact, and the decimal point is escaped
# so that only real numbers (not e.g. "1x5") are accepted.
_RUMBLE_LINE_PATTERN = (
    r'^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}): ACTION\[Main\]: '
    r'\[rumble\] (queue|instant) (\d+(?:\.\d+)?) (\d+(?:\.\d+)?)$'
)


# adapted from <https://stackoverflow.com/a/54263201>
async def _follow(file, sleep_sec=0.1):
    """Yield each complete line of *file* as it is written, forever.

    This is an async generator: whenever no new data is available it awaits
    ``asyncio.sleep(sleep_sec)`` so other tasks on the event loop keep
    running. A partially written line is buffered until its newline arrives.

    Args:
        file: An open text file object positioned where reading should start.
        sleep_sec: Seconds to wait after an empty read.
    """
    pending = ''
    while True:
        chunk = file.readline()
        if chunk:
            pending += chunk
            if pending.endswith("\n"):
                yield pending
                pending = ''
        else:
            # readline() signals "no more data yet" with '' (never None).
            # Always await, even for a zero delay, so the loop cannot spin
            # without handing control back to the event loop.
            await asyncio.sleep(sleep_sec or 0)


def _parse_rumble_line(line: str, now: datetime):
    """Return ``(strength, duration)`` for a fresh rumble line, else ``None``.

    ``None`` is returned when *line* is not a rumble line, when its
    timestamp is not a valid date/time, or when the line is at least one
    second older than *now*.
    """
    found = match(_RUMBLE_LINE_PATTERN, line)
    if found is None:
        return None

    timestamp, _mode, strength, duration = found.groups()
    try:
        logged_at = datetime.strptime(timestamp, '%Y-%m-%d %H:%M:%S')
    except ValueError:
        # Right shape, but not a real date/time (e.g. month 13).
        return None

    # total_seconds() covers the whole difference; the ``seconds`` attribute
    # alone would drop the ``days`` part of the timedelta.
    # NOTE(review): timestamps only have one-second resolution, so a line
    # read just after the second rolls over is also treated as old.
    if (now - logged_at).total_seconds() >= 1:
        return None

    return float(strength), float(duration)


async def device_added_task(dev: ButtplugClientDevice,
                            log_path=_RUMBLE_LOG_PATH):
    """Drive a newly added device from rumble requests found in a log file.

    Prints the device name. If the device lists ``"VibrateCmd"`` among its
    allowed messages, the log file is read from the start and then followed
    as it grows. Each fresh rumble line makes the device vibrate at the
    requested strength for the requested duration; old or unrelated lines
    are skipped.

    Args:
        dev: The device that was added.
        log_path: Path of the log file to follow.
    """
    print("Device Added: {}".format(dev.name))
    if "VibrateCmd" not in dev.allowed_messages:
        return

    with open(log_path, 'r') as file:
        async for line in _follow(file):
            rumble = _parse_rumble_line(line, datetime.now())
            if rumble is None:
                continue  # not a rumble line, or too old
            strength, duration = rumble
            # TODO: implement instant/queue distinction
            await dev.send_vibrate_cmd(strength)
            await asyncio.sleep(duration)
            await dev.send_vibrate_cmd(0.0)
    # Only reached if the follow loop above ever ends.
    await dev.send_stop_device_cmd()
|
|
|
|
# Strong references to the running per-device tasks. The event loop only
# keeps weak references to tasks, so a task nobody else refers to could be
# garbage-collected before it finishes.
_device_tasks = set()


def device_added(emitter, dev: ButtplugClientDevice):
    """Start a background task that handles a newly added device.

    Must be called while an event loop is running, because it schedules
    ``device_added_task(dev)`` with ``asyncio.create_task``.

    Args:
        emitter: Passed in by the event handler; not used here.
        dev: The device that was added.
    """
    task = asyncio.create_task(device_added_task(dev))
    _device_tasks.add(task)
    # Drop the reference again once the task has finished.
    task.add_done_callback(_device_tasks.discard)
|
|
|
|
def device_removed(emitter, dev: ButtplugClientDevice):
    """Report on standard output that a device has been removed.

    Args:
        emitter: Passed in by the event handler; not used here.
        dev: The device that was removed; printed after the label.
    """
    label = "Device removed: "
    print(label, dev)
|
|
|
|
async def main():
    """Connect to the Buttplug server, scan for devices, then disconnect.

    The device added/removed callbacks are registered before connecting.
    If the connection fails, a message is printed and the function returns.
    Otherwise scanning runs until the ``cancel_me()`` task finishes or is
    cancelled, after which scanning is stopped and the client disconnects.
    """
    client_name = "Minetest Client ♥ 🍆 💦 🍑"
    server_url = "ws://127.0.0.1:12345"

    client = ButtplugClient(client_name)
    connector = ButtplugClientWebsocketConnector(server_url)

    # Hook up the device callbacks before any device can be reported.
    client.device_added_handler += device_added
    client.device_removed_handler += device_removed

    try:
        await client.connect(connector)
    except ButtplugClientConnectorError as error:
        print("Could not connect to server, exiting: {}".format(error.message))
        return

    await client.start_scanning()

    # Keep scanning for as long as the placeholder task is alive.
    keep_alive = asyncio.create_task(cancel_me())
    try:
        await keep_alive
    except asyncio.CancelledError:
        pass

    await client.stop_scanning()

    await client.disconnect()
    print("Disconnected, quitting")
|
|
|
|
# Only start the client when this file is executed as a script, so that
# importing the module does not connect to the server as a side effect.
if __name__ == "__main__":
    # debug=True runs the event loop in asyncio's debug mode.
    asyncio.run(main(), debug=True)
|