@@ -17,7 +17,7 @@
import socket
import threading
import time
-from typing import Optional
+from typing import Deque, Optional
class ConsoleSocket(socket.socket):
@@ -30,11 +30,11 @@ class ConsoleSocket(socket.socket):
Optionally a file path can be passed in and we will also
dump the characters to this file for debugging purposes.
"""
- def __init__(self, address, file=None, drain=False):
- self._recv_timeout_sec = 300
+ def __init__(self, address: str, file: Optional[str] = None,
+ drain: bool = False):
self._recv_timeout_sec = 300.0
self._sleep_time = 0.5
- self._buffer = deque()
+ self._buffer: Deque[str] = deque()
socket.socket.__init__(self, socket.AF_UNIX, socket.SOCK_STREAM)
self.connect(address)
self._logfile = None
@@ -45,7 +45,7 @@ def __init__(self, address, file=None, drain=False):
if drain:
self._drain_thread = self._thread_start()
- def _drain_fn(self):
+ def _drain_fn(self) -> None:
"""Drains the socket and runs while the socket is open."""
while self._open:
try:
@@ -56,7 +56,7 @@ def _drain_fn(self):
# self._open is set to False.
time.sleep(self._sleep_time)
- def _thread_start(self):
+ def _thread_start(self) -> threading.Thread:
"""Kick off a thread to drain the socket."""
# Configure socket to not block and timeout.
# This allows our drain thread to not block
@@ -68,7 +68,7 @@ def _thread_start(self):
drain_thread.start()
return drain_thread
- def close(self):
+ def close(self) -> None:
"""Close the base object and wait for the thread to terminate"""
if self._open:
self._open = False
@@ -80,7 +80,7 @@ def close(self):
self._logfile.close()
self._logfile = None
- def _drain_socket(self):
+ def _drain_socket(self) -> None:
"""process arriving characters into in memory _buffer"""
data = socket.socket.recv(self, 1)
# latin1 is needed since there are some chars
@@ -114,7 +114,7 @@ def recv(self, bufsize: int = 1, flags: int = 0) -> bytes:
# socket w/o our intervention.
return chars.encode("latin1")
- def setblocking(self, value):
+ def setblocking(self, value: bool) -> None:
"""When not draining we pass thru to the socket,
since when draining we control socket blocking.
"""