@@ -6,10 +6,12 @@ from .chip_info import ChipInfo
from .exception import ChipClosedError
from .info_event import InfoEvent
from .internal import poll_fd
+from .line import Value
from .line_info import LineInfo
from .line_settings import LineSettings, _line_settings_to_ext
from .line_request import LineRequest
from collections import Counter
+from collections.abc import Iterable
from datetime import timedelta
from errno import ENOENT
from select import select
@@ -221,6 +223,7 @@ class Chip:
config: dict[tuple[Union[int, str]], Optional[LineSettings]],
consumer: Optional[str] = None,
event_buffer_size: Optional[int] = None,
+ output_values: Optional[dict[tuple[Union[int, str]], Value]] = None,
) -> LineRequest:
"""
Request a set of lines for exclusive usage.
@@ -234,6 +237,10 @@ class Chip:
Consumer string to use for this request.
event_buffer_size:
Size of the kernel edge event buffer to configure for this request.
+ output_values:
+ Dictionary mapping offsets or names to line.Value. This can be used
+ to set the desired output values globally while reusing LineSettings
+ for more lines.
Returns:
New LineRequest object.
@@ -260,10 +267,20 @@ class Chip:
)
)
+ # If we have global output values - map line names to offsets
+ if output_values:
+ mapped_output_values = {
+ self.line_offset_from_id(line): value
+ for line, value in output_values.items()
+ }
+ else:
+ mapped_output_values = None
+
for lines, settings in config.items():
offsets = list()
name_map = dict()
offset_map = dict()
+ global_output_values = list()
if isinstance(lines, int) or isinstance(lines, str):
lines = (lines,)
@@ -271,6 +288,16 @@ class Chip:
for line in lines:
offset = self.line_offset_from_id(line)
offsets.append(offset)
+
+ # If there's a global output value for this offset, store it in the
+ # list for later.
+ if mapped_output_values:
+ global_output_values.append(
+ mapped_output_values[offset]
+ if offset in mapped_output_values
+ else Value.INACTIVE
+ )
+
if isinstance(line, str):
name_map[line] = offset
offset_map[offset] = line
@@ -279,6 +306,9 @@ class Chip:
offsets, _line_settings_to_ext(settings or LineSettings())
)
+ if len(global_output_values):
+ line_cfg.set_output_values(global_output_values)
+
req_internal = self._chip.request_lines(line_cfg, consumer, event_buffer_size)
request = LineRequest(req_internal)
@@ -89,12 +89,76 @@ line_config_add_line_settings(line_config_object *self, PyObject *args)
Py_RETURN_NONE;
}
+static PyObject *
+line_config_set_output_values(line_config_object *self, PyObject *args)
+{
+ PyObject *values, *iter, *next, *val_stripped;
+ enum gpiod_line_value *valbuf;
+ Py_ssize_t num_values, pos;
+ int ret;
+
+ values = PyTuple_GetItem(args, 0);
+ if (!values)
+ return NULL;
+
+ num_values = PyObject_Size(values);
+ if (num_values < 0)
+ return NULL;
+
+ valbuf = PyMem_Calloc(num_values, sizeof(*valbuf));
+ if (!valbuf)
+ return PyErr_NoMemory();
+
+ iter = PyObject_GetIter(values);
+ if (!iter) {
+ PyMem_Free(valbuf);
+ return NULL;
+ }
+
+ for (pos = 0;; pos++) {
+ next = PyIter_Next(iter);
+ if (!next) {
+ Py_DECREF(iter);
+ break;
+ }
+
+ val_stripped = PyObject_GetAttrString(next, "value");
+ Py_DECREF(next);
+ if (!val_stripped) {
+ PyMem_Free(valbuf);
+ Py_DECREF(iter);
+ return NULL;
+ }
+
+ valbuf[pos] = PyLong_AsLong(val_stripped);
+ Py_DECREF(val_stripped);
+ if (PyErr_Occurred()) {
+ PyMem_Free(valbuf);
+ Py_DECREF(iter);
+ return NULL;
+ }
+ }
+
+ ret = gpiod_line_config_set_output_values(self->cfg,
+ valbuf, num_values);
+ PyMem_Free(valbuf);
+ if (ret)
+ return Py_gpiod_SetErrFromErrno();
+
+ Py_RETURN_NONE;
+}
+
static PyMethodDef line_config_methods[] = {
{
.ml_name = "add_line_settings",
.ml_meth = (PyCFunction)line_config_add_line_settings,
.ml_flags = METH_VARARGS,
},
+ {
+ .ml_name = "set_output_values",
+ .ml_meth = (PyCFunction)line_config_set_output_values,
+ .ml_flags = METH_VARARGS,
+ },
{ }
};
@@ -402,6 +402,61 @@ class LineRequestConsumerString(TestCase):
self.assertEqual(info.consumer, "?")
+class LineRequestSetOutputValues(TestCase):
+ def setUp(self):
+ self.sim = gpiosim.Chip(
+ num_lines=4, line_names={0: "foo", 1: "bar", 2: "baz", 3: "xyz"}
+ )
+
+ def tearDown(self):
+ del self.sim
+
+ def test_request_with_globally_set_output_values(self):
+ with gpiod.request_lines(
+ self.sim.dev_path,
+ config={(0, 1, 2, 3): gpiod.LineSettings(direction=Direction.OUTPUT)},
+ output_values={
+ 0: Value.ACTIVE,
+ 1: Value.INACTIVE,
+ 2: Value.ACTIVE,
+ 3: Value.INACTIVE,
+ },
+ ) as request:
+ self.assertEqual(self.sim.get_value(0), SimVal.ACTIVE)
+ self.assertEqual(self.sim.get_value(1), SimVal.INACTIVE)
+ self.assertEqual(self.sim.get_value(2), SimVal.ACTIVE)
+ self.assertEqual(self.sim.get_value(3), SimVal.INACTIVE)
+
+ def test_request_with_globally_set_output_values_with_mapping(self):
+ with gpiod.request_lines(
+ self.sim.dev_path,
+ config={(0, 1, 2, 3): gpiod.LineSettings(direction=Direction.OUTPUT)},
+ output_values={"baz": Value.ACTIVE, "foo": Value.ACTIVE},
+ ) as request:
+ self.assertEqual(self.sim.get_value(0), SimVal.ACTIVE)
+ self.assertEqual(self.sim.get_value(1), SimVal.INACTIVE)
+ self.assertEqual(self.sim.get_value(2), SimVal.ACTIVE)
+ self.assertEqual(self.sim.get_value(3), SimVal.INACTIVE)
+
+ def test_request_with_globally_set_output_values_bad_mapping(self):
+ with self.assertRaises(FileNotFoundError):
+ with gpiod.request_lines(
+ self.sim.dev_path,
+ config={(0, 1, 2, 3): gpiod.LineSettings(direction=Direction.OUTPUT)},
+ output_values={"foobar": Value.ACTIVE},
+ ) as request:
+ pass
+
+ def test_request_with_globally_set_output_values_bad_offset(self):
+ with self.assertRaises(ValueError):
+ with gpiod.request_lines(
+ self.sim.dev_path,
+ config={(0, 1, 2, 3): gpiod.LineSettings(direction=Direction.OUTPUT)},
+ output_values={5: Value.ACTIVE},
+ ) as request:
+ pass
+
+
class ReconfigureRequestedLines(TestCase):
def setUp(self):
self.sim = gpiosim.Chip(num_lines=8, line_names={3: "foo", 4: "bar", 6: "baz"})