stm32/adc: Add read_timed_multi() static method, with docs and tests.

This commit is contained in:
Peter Hinch 2018-03-04 09:07:40 +00:00 committed by Damien George
parent 0096a4bd00
commit 4f40fa5cf4
3 changed files with 187 additions and 1 deletions

View File

@ -76,7 +76,58 @@ Methods
for val in buf: # loop over all values
print(val) # print the value out
This function does not allocate any memory.
This function does not allocate any heap memory. It has blocking behaviour:
it does not return to the calling program until the buffer is full.
.. method:: ADC.read_timed_multi((adcx, adcy, ...), (bufx, bufy, ...), timer)
This is a static method. It can be used to extract relative timing or
phase data from multiple ADC's.
It reads analog values from multiple ADC's into buffers at a rate set by
the *timer* object. Each time the timer triggers a sample is rapidly
read from each ADC in turn.
ADC and buffer instances are passed in tuples with each ADC having an
associated buffer. All buffers must be of the same type and length and
the number of buffers must equal the number of ADC's.
Buffers can be ``bytearray`` or ``array.array`` for example. The ADC values
have 12-bit resolution and are stored directly into the buffer if its element
size is 16 bits or greater. If buffers have only 8-bit elements (eg a
``bytearray``) then the sample resolution will be reduced to 8 bits.
*timer* must be a Timer object. The timer must already be initialised
and running at the desired sampling frequency.
Example reading 3 ADC's::
adc0 = pyb.ADC(pyb.Pin.board.X1) # Create ADC's
adc1 = pyb.ADC(pyb.Pin.board.X2)
adc2 = pyb.ADC(pyb.Pin.board.X3)
tim = pyb.Timer(8, freq=100) # Create timer
rx0 = array.array('H', (0 for i in range(100))) # ADC buffers of
rx1 = array.array('H', (0 for i in range(100))) # 100 16-bit words
rx2 = array.array('H', (0 for i in range(100)))
# read analog values into buffers at 100Hz (takes one second)
pyb.ADC.read_timed_multi((adc0, adc1, adc2), (rx0, rx1, rx2), tim)
for n in range(len(rx0)):
print(rx0[n], rx1[n], rx2[n])
This function does not allocate any heap memory. It has blocking behaviour:
it does not return to the calling program until the buffers are full.
The function returns ``True`` if all samples were acquired with correct
timing. At high sample rates the time taken to acquire a set of samples
can exceed the timer period. In this case the function returns ``False``,
indicating a loss of precision in the sample interval. In extreme cases
samples may be missed.
The maximum rate depends on factors including the data width and the
number of ADC's being read. In testing two ADC's were sampled at a timer
rate of 140KHz without overrun. Samples were missed at 180KHz. At high
sample rates disabling interrupts for the duration can reduce the risk
of sporadic data loss.
The ADCAll Object
-----------------

View File

@ -450,9 +450,116 @@ STATIC mp_obj_t adc_read_timed(mp_obj_t self_in, mp_obj_t buf_in, mp_obj_t freq_
}
STATIC MP_DEFINE_CONST_FUN_OBJ_3(adc_read_timed_obj, adc_read_timed);
// read_timed_multi((adcx, adcy, ...), (bufx, bufy, ...), timer)
//
// Read analog values from multiple ADC's into buffers at a rate set by the
// timer. The ADC values have 12-bit resolution and are stored directly into
// the corresponding buffer if its element size is 16 bits or greater, otherwise
// the sample resolution will be reduced to 8 bits.
//
// This function should not allocate any heap memory.
STATIC mp_obj_t adc_read_timed_multi(mp_obj_t adc_array_in, mp_obj_t buf_array_in, mp_obj_t tim_in) {
size_t nadcs, nbufs;
mp_obj_t *adc_array, *buf_array;
mp_obj_get_array(adc_array_in, &nadcs, &adc_array);
mp_obj_get_array(buf_array_in, &nbufs, &buf_array);
if (nadcs < 1) {
mp_raise_ValueError("need at least 1 ADC");
}
if (nadcs != nbufs) {
mp_raise_ValueError("length of ADC and buffer lists differ");
}
// Get buf for first ADC, get word size, check other buffers match in type
mp_buffer_info_t bufinfo;
mp_get_buffer_raise(buf_array[0], &bufinfo, MP_BUFFER_WRITE);
size_t typesize = mp_binary_get_size('@', bufinfo.typecode, NULL);
for (uint array_index = 0; array_index < nbufs; array_index++) {
mp_buffer_info_t bufinfo_curr;
mp_get_buffer_raise(buf_array[array_index], &bufinfo_curr, MP_BUFFER_WRITE);
if ((bufinfo.len != bufinfo_curr.len) || (bufinfo.typecode != bufinfo_curr.typecode)) {
mp_raise_ValueError("size and type of buffers must match");
}
}
// Use the supplied timer object as the sampling time base
TIM_HandleTypeDef *tim;
tim = pyb_timer_get_handle(tim_in);
// Start adc; this is slow so wait for it to start
pyb_obj_adc_t *adc0 = adc_array[0];
adc_config_channel(&adc0->handle, adc0->channel);
HAL_ADC_Start(&adc0->handle);
// Wait for sample to complete and discard
#define READ_TIMED_TIMEOUT (10) // in ms
adc_wait_for_eoc_or_timeout(READ_TIMED_TIMEOUT);
// Read (and discard) value
uint value = ADCx->DR;
// Ensure first sample is on a timer tick
__HAL_TIM_CLEAR_FLAG(tim, TIM_FLAG_UPDATE);
while (__HAL_TIM_GET_FLAG(tim, TIM_FLAG_UPDATE) == RESET) {
}
__HAL_TIM_CLEAR_FLAG(tim, TIM_FLAG_UPDATE);
// Overrun check: assume success
bool success = true;
size_t nelems = bufinfo.len / typesize;
for (size_t elem_index = 0; elem_index < nelems; elem_index++) {
if (__HAL_TIM_GET_FLAG(tim, TIM_FLAG_UPDATE) != RESET) {
// Timer has already triggered
success = false;
} else {
// Wait for the timer to trigger so we sample at the correct frequency
while (__HAL_TIM_GET_FLAG(tim, TIM_FLAG_UPDATE) == RESET) {
}
}
__HAL_TIM_CLEAR_FLAG(tim, TIM_FLAG_UPDATE);
for (size_t array_index = 0; array_index < nadcs; array_index++) {
pyb_obj_adc_t *adc = adc_array[array_index];
// configure the ADC channel
adc_config_channel(&adc->handle, adc->channel);
// for the first sample we need to turn the ADC on
// ADC is started: set the "start sample" bit
#if defined(STM32F4) || defined(STM32F7)
ADCx->CR2 |= (uint32_t)ADC_CR2_SWSTART;
#elif defined(STM32L4)
SET_BIT(ADCx->CR, ADC_CR_ADSTART);
#else
#error Unsupported processor
#endif
// wait for sample to complete
#define READ_TIMED_TIMEOUT (10) // in ms
adc_wait_for_eoc_or_timeout(READ_TIMED_TIMEOUT);
// read value
value = ADCx->DR;
// store values in buffer
if (typesize == 1) {
value >>= 4;
}
mp_buffer_info_t bufinfo_curr; // Get buf for current ADC
mp_get_buffer_raise(buf_array[array_index], &bufinfo_curr, MP_BUFFER_WRITE);
mp_binary_set_val_array_from_int(bufinfo_curr.typecode, bufinfo_curr.buf, elem_index, value);
}
}
// Turn the ADC off
adc0 = adc_array[0];
HAL_ADC_Stop(&adc0->handle);
return mp_obj_new_bool(success);
}
STATIC MP_DEFINE_CONST_FUN_OBJ_3(adc_read_timed_multi_fun_obj, adc_read_timed_multi);
STATIC MP_DEFINE_CONST_STATICMETHOD_OBJ(adc_read_timed_multi_obj, MP_ROM_PTR(&adc_read_timed_multi_fun_obj));
STATIC const mp_rom_map_elem_t adc_locals_dict_table[] = {
{ MP_ROM_QSTR(MP_QSTR_read), MP_ROM_PTR(&adc_read_obj) },
{ MP_ROM_QSTR(MP_QSTR_read_timed), MP_ROM_PTR(&adc_read_timed_obj) },
{ MP_ROM_QSTR(MP_QSTR_read_timed_multi), MP_ROM_PTR(&adc_read_timed_multi_obj) },
};
STATIC MP_DEFINE_CONST_DICT(adc_locals_dict, adc_locals_dict_table);

View File

@ -32,3 +32,31 @@ adcv.read_timed(arv, tim)
print(len(arv))
for i in arv:
assert i > 1000 and i < 2000
# Test read_timed_multi
arv = bytearray(b'\xff'*50)
art = bytearray(b'\xff'*50)
ADC.read_timed_multi((adcv, adct), (arv, art), tim)
for i in arv:
assert i > 60 and i < 125
# Wide range: unsure of accuracy of temp sensor.
for i in art:
assert i > 15 and i < 200
arv = array.array('i', 25 * [-1])
art = array.array('i', 25 * [-1])
ADC.read_timed_multi((adcv, adct), (arv, art), tim)
for i in arv:
assert i > 1000 and i < 2000
# Wide range: unsure of accuracy of temp sensor.
for i in art:
assert i > 50 and i < 2000
arv = array.array('h', 25 * [0x7fff])
art = array.array('h', 25 * [0x7fff])
ADC.read_timed_multi((adcv, adct), (arv, art), tim)
for i in arv:
assert i > 1000 and i < 2000
# Wide range: unsure of accuracy of temp sensor.
for i in art:
assert i > 50 and i < 2000