Queue Nedir ?
Gömülü sistemlerde çok sık kullanılan bir veri yapısı olan kuyruk yapısından bahsedeceğim. Öncelikle kuyruk (queue) nedir ne değildir ile başlayıp, çeşitli kullanım alanlarından bahsedip, son olarakta bir kütüphane ile bunu realize edeceğiz.
Anlatıma geçmeden birkaç görsel ile hayal gücünüzde bu veri yapısının neye benzediği ile başlayalım.
Queue yapısı temelinde bir ilk giren ilk çıkar (First in First Out (FIFO)) temelinde bir yapıdır. Bu verinin boyutunun ne olduğundan ziyade verinin tampon bellekte (buffer) yerleştirildiği yeri belirlemek ve doğru bir şekilde ilgili yer indislerini yönetmek önünüze çıkan yegâne zorluk olacaktır.
Nerelerde Kullanılır ?
Kullanım alanı kullanıcın isteği doğrultusunda sınırsız olabileceği gibi gömülü sistemlerde en çok kullanıldığı yerler şunlardır;
- İşlemcinin ana işlemleri arasında birden çok veri giriş/çıkışının (Input/Output (I/O)) yönetildiği ve gelen verilerin anlık parlamalar(burst) ile gelip uzunca süre işlenmesi için size boşluk kaldığında,
- İşlemcinizin hızlı ama çevreselin I/O hızının yavaş olduğu durumlarda,
- UART,
- SPI,
- I2C gibi I/O modüllerinde sıkça kullanılır.
- Temelinde bir veri I/O modeli olduğu için çoklu işlem yapılı (multi-threaded) uygulamalarda threadler arasında veri paylaşımı için sıkça kullanılır.
Nasıl Yönetilir ?
Yazının başında bahsettiğim gibi queue yapısı aslında bir FIFO fakat yazılım tabanlı bir kopyası sadece… Tek farkı donanım üstünden kullanıldığında ilgili yazmaça (registera) yazmak (push, enqueue vs.) ilgili donanımın veri miktarını 1 arttırırken, okumak 1 azaltıyor ve de bize hafıza ile ilgili herhangi bir indis tutma, yönetme, buffer büyüklüğü aşıldı mı sorgusunu gerçekleştirme ihtiyacı kalmıyor. Lakin bu yazının amacı bu modülü yazılım ile yapmak olduğu için hafızayı yönetmek, ilgili queue bufferının sonuna gelip gelmediğimizi kontrol etmek, queuenun yazmaya ve/veya okumaya müsait olup/olmadığını kontrol etmek bizlere düşüyor.
Bu noktada neredeyse milyonlarca implementasyonla karşılaşabilirsiniz. Linux’ün Kernel kodlarında yapılan yaklaşım şu şekildedir.
- Yazıcı indisi(head): Verinin son yazıldığı yeri gösterir ve okuyucu indise yazma yoluyla eşit olmaması gerekir.
- Okuyucu indisi(tail): Verinin okunacağı indisi kontrol eder ve heade ulaşmışsa ilgili queueda okunacak bir şey olmadığını gösterir.
- Linux Kernel’i sanat eseri sayılabilecek (State-of-Art) bir kod olduğu için queuenun sonunu bulmak için birçok implementasyondan farklı olarak modulo (%) işlemi yerine çok daha efektif olan lojik ve (and, &) işlemini kullanır. Bunu kullanmak içinse queue için ayırdığımız bufferın boyutunun 2’nin üssü olması gerekir. (1, 2, 4, 8, 16, 32 ……256…..1024…)
Bu implementasyonda ki kayıp ilgili queueda bir elemanı asla dolduramamaktır. Örneğin üst üste yazma yapıldığında head taili ezemeyeceği için ilgili 1 alandan feragat etmeniz gerekir.
Kendi implementasyonumuzu daha da reelize etmek için konuyu derinleştirmek gerekir. Queue yapısında basitçe veri yazdığımızda 1 artan bir indisi, okuduğumuzda da 1 artan bir başka indisi bulunmakta ve de boyutunu okuyabildiğimiz bir başka indisimiz bulunmalıdır. Linux’ten gördüğümüz üzere eğer queuemuzun boyutu 2’nin üssü olursa % işlemi yerine & işlemi kullanabilmekteyiz. Eğer 2’nin üssü kullanmaz isek % işlemi gibi yaygın hiçbir işlemcide donanımsal implemente edilmeyen bir matematiksel işlem kullandığımızdan çalışma zamanında (run-time) kayba uğrarız.
Uygulama
İmplementasyon aşamasında ilgili queueya yaptığımız girdilerin bir fonksiyon çağırmada topluca atılabilmesi için variadic macro denilen değişken sayıda parametre alabilen fonksiyon tipi kullanılmıştır.
Böylelikle put_to_queue fonksiyonu 3. argüman olan variable data length (değişken veri uzunluğu) her zaman gerek duymaz hale getirmiş bulunduk. 3. parametre olan ve uint32_t tipinde 4 byte uzunluğunda olan fonksiyona geçilen değişken var olup olmadığı ise queue_s içerisinde bulunan q_inout_fixed_size değişkeninin 0xFFFF olup olmaması yardımı ile belirlenmektedir. Böylelikle okuma yönünde get_from_queue fonksiyonu da bu çok yönlülüğe uyumlu çalışmaktadır.
Kütüphanede başarılı olan her put ve get fonksiyonu 1 döndürmekte iken başarısızlar 0 döndürmektedir.
example.c (Edit 11/04/2020)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 |
#include "byte_queue.h" #define LENGTH_OF_UART_RX_QUEUE_BUFF 100 #define LENGTH_OF_UART_TX_QUEUE_BUFF 100 uint8_t uart_rx_buffer[LENGTH_OF_UART_RX_QUEUE_BUFF]; uint32_t uart_tx_buffer[LENGTH_OF_UART_TX_QUEUE_BUFF]; struct queue_s uart_rx_queue = { .q_buff_start_ptr = uart_rx_buffer, .q_buff_size = LENGTH_OF_UART_RX_QUEUE_BUFF * sizeof(uart_rx_buffer[0]), .q_inout_fixed_size = 0xFFFF }; struct queue_s uart_tx_queue = { .q_buff_start_ptr = (uint8_t*)uart_tx_buffer, .q_buff_size = LENGTH_OF_UART_TX_QUEUE_BUFF * sizeof(uart_tx_buffer[0]), .q_inout_fixed_size = sizeof(uart_tx_buffer[0]) }; void UART_Read(uint8_t *data_ptr, uint32_t size) { if (put_to_queue(&uart_rx_queue, (void*)data_ptr, size)) { /*Bu queue da bu boyutta veri alacak yer yok*/ } } int main() { uint32_t tx_test_data[3] = {0xAA55AA55, 0xDEADBEEF, 0xFACEB00C}; uint8_t read_buffer[3]; /*System Initialize*/ init_queue(&uart_rx_queue); init_queue(&uart_tx_queue); put_to_queue(&uart_tx_queue, (void*)&tx_test_data[0]);/*inout fixed length verildiği için 3. parametre umursanmaz*/ put_to_queue(&uart_tx_queue, (void*)&tx_test_data[1]);/*inout fixed length verildiği için 3. parametre umursanmaz*/ while (1) { if (get_used_place_size_of_queue(&uart_rx_queue) >= sizeof(read_buffer)) { /*Receive queue da veri var*/ get_from_queue(&uart_rx_queue, read_buffer, sizeof(read_buffer)); } } return 0; } |
byte_queue.h dosyamızın içeriği
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 |
#ifndef BYTE_QUEUE_H_INCLUDED #define BYTE_QUEUE_H_INCLUDED #include <stdint.h> #include <stdarg.h> #ifndef TRUE #define TRUE 1ULL #endif #ifndef FALSE #define FALSE 0ULL #endif #ifndef EMPTY #define EMPTY -1LL #endif #ifndef NULL #define NULL 0ULL #endif struct queue_s { uint8_t *q_buff_start_ptr; /*Queue için kullanılacak bufferın başlangıç adresi*/ uint16_t q_buff_write_index; /*Queue içerisinde yazma indisi*/ uint16_t q_buff_read_index; /*Queue içerisinde okuma indisi*/ uint16_t q_buff_size; /*Queue içerisinde kullanılan bufferın boyutu*/ uint16_t q_inout_fixed_size; /*Her veri okuma/yazmada kullanılacak sabit boyut, eğer yok ise 0xFFFF olarak tanımlanıp, queue ya içeri atılacak veri boyutu 3. parametre olarak put ve get fonksiyonlarına verilmelidir.*/ }; /*Byte Queue Functions*/ /*Queue initialize fonksiyonu*/ extern void init_queue(struct queue_s*); /*Queue'da o anda bulunan boş byte miktarı*/ extern uint16_t get_empty_place_size_of_queue(struct queue_s*); /*Queue'ya veri gönderme*/ extern uint8_t put_to_queue(struct queue_s*, void*, ...); /*Queue'da o an bulunan okunabilecek byte cinsinden veri miktarı*/ extern uint16_t get_used_place_size_of_queue(struct queue_s*); /*Queue'dan veri okuma*/ extern uint8_t get_from_queue(struct queue_s*, void*, ...); /*Queue'dan veri okuma istisnası ise aynı verinin tekrar ve tekrar okunabilmesidir. İstisnai olarak okunan bir verinin queuenun okuma indisine çaktırmadan okunabilmesi için yerleştirilmiştir.*/ extern uint8_t get_from_queue_no_inc_read_ptr(struct queue_s*, void*, ...); #endif |
byte_queue.c dosyasımızın içeriği
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 |
#include <stdint.h> #include <stdlib.h> #include <stdarg.h> #include <string.h> #include "byte_queue.h" void init_queue(struct queue_s *queue_ptr) { queue_ptr->q_buff_write_index = 0; queue_ptr->q_buff_read_index = 0; } uint16_t get_empty_place_size_of_queue(struct queue_s *queue_ptr) { if (queue_ptr -> q_buff_write_index == queue_ptr -> q_buff_read_index) return queue_ptr -> q_buff_size - 1; if (queue_ptr -> q_buff_write_index > queue_ptr -> q_buff_read_index) { return ((queue_ptr -> q_buff_size - 1) - queue_ptr -> q_buff_write_index) + queue_ptr -> q_buff_read_index; } else { return queue_ptr -> q_buff_read_index - (queue_ptr -> q_buff_write_index + 1); } } uint8_t put_to_queue(struct queue_s *queue_ptr, void *data_ptr, ...) { uint16_t queue_empty_place_cnt, remain_place, temp_write_index, data_len; uint8_t *copy_ptr; va_list arguments; queue_empty_place_cnt = get_empty_place_size_of_queue(queue_ptr); if (queue_ptr -> q_inout_fixed_size != (uint16_t)EMPTY) { data_len = queue_ptr -> q_inout_fixed_size; } else { va_start(arguments,data_ptr);/*data len i alýnýyor*/ data_len = va_arg(arguments, uint32_t); va_end(arguments); } if (queue_empty_place_cnt < data_len) return FALSE; if (queue_ptr -> q_buff_write_index >= queue_ptr -> q_buff_read_index) { remain_place = ((queue_ptr -> q_buff_size - 1) - queue_ptr -> q_buff_write_index); temp_write_index = queue_ptr -> q_buff_write_index + 1; temp_write_index = (temp_write_index == queue_ptr -> q_inout_fixed_size) ? 0:temp_write_index; if (remain_place >= data_len) { copy_ptr = (uint8_t*)( (uint32_t)queue_ptr -> q_buff_start_ptr + temp_write_index ); memcpy(copy_ptr, data_ptr, data_len ); queue_ptr -> q_buff_write_index = temp_write_index + data_len - 1; } else { copy_ptr = (uint8_t*)( (uint32_t)queue_ptr -> q_buff_start_ptr + temp_write_index ); memcpy(copy_ptr, data_ptr, remain_place ); data_len -= remain_place; copy_ptr = (uint8_t*)( (uint32_t)data_ptr + remain_place ); memcpy(queue_ptr -> q_buff_start_ptr, copy_ptr, data_len); queue_ptr -> q_buff_write_index = data_len - 1; } } else { temp_write_index = queue_ptr -> q_buff_write_index + 1; temp_write_index = (temp_write_index == queue_ptr -> q_inout_fixed_size) ? 0: temp_write_index; copy_ptr = (uint8_t*)( (uint32_t)queue_ptr -> q_buff_start_ptr + temp_write_index ); memcpy(copy_ptr, data_ptr, data_len ); queue_ptr -> q_buff_write_index = temp_write_index + data_len - 1; } return TRUE; } uint16_t get_used_place_size_of_queue(struct queue_s *queue_ptr) { if (queue_ptr -> q_buff_write_index == queue_ptr -> q_buff_read_index) return 0; if (queue_ptr -> q_buff_write_index > queue_ptr -> q_buff_read_index) return (queue_ptr -> q_buff_write_index - queue_ptr -> q_buff_read_index); else return ((queue_ptr -> q_buff_size) - queue_ptr -> q_buff_read_index) + queue_ptr -> q_buff_write_index; } uint8_t get_from_queue(struct queue_s *queue_ptr, void *data_ptr, ...) { uint16_t queue_used_place_cnt, remain_place, temp_read_index, data_len = 0; uint8_t *copy_ptr; va_list arguments; if (queue_ptr -> q_inout_fixed_size != (uint16_t)EMPTY) { data_len = queue_ptr -> q_inout_fixed_size; } else { va_start(arguments, data_ptr);/*data len i alýnýyor*/ data_len = va_arg(arguments, uint32_t); va_end(arguments); } queue_used_place_cnt = get_used_place_size_of_queue(queue_ptr); if (data_len == 0 || data_len > queue_used_place_cnt) return FALSE; if (queue_ptr -> q_buff_read_index >= queue_ptr -> q_buff_write_index) { remain_place = ( (queue_ptr -> q_buff_size - 1) - queue_ptr -> q_buff_read_index); temp_read_index = queue_ptr -> q_buff_read_index + 1; temp_read_index = (temp_read_index == queue_ptr -> q_inout_fixed_size) ? 0 : temp_read_index; if (remain_place >= data_len) { copy_ptr = (uint8_t*)( (uint32_t)queue_ptr -> q_buff_start_ptr + temp_read_index ); memcpy(data_ptr, copy_ptr, data_len ); queue_ptr -> q_buff_read_index = temp_read_index + data_len - 1; } else { copy_ptr = (uint8_t*)((uint32_t)queue_ptr -> q_buff_start_ptr + temp_read_index); memcpy(data_ptr, copy_ptr, remain_place ); data_len -= remain_place; copy_ptr = (uint8_t*)(queue_ptr -> q_buff_start_ptr); memcpy((uint8_t*)((uint32_t)data_ptr + remain_place), copy_ptr, data_len); queue_ptr -> q_buff_read_index = data_len - 1; } } else { temp_read_index = queue_ptr -> q_buff_read_index + 1; temp_read_index = (temp_read_index == queue_ptr -> q_inout_fixed_size) ? 0 : temp_read_index; copy_ptr = (uint8_t*)((uint32_t)queue_ptr -> q_buff_start_ptr + temp_read_index); memcpy(data_ptr, copy_ptr, data_len ); queue_ptr -> q_buff_read_index = temp_read_index + data_len - 1; } return TRUE; } uint8_t get_from_queue_no_inc_read_ptr(struct queue_s *queue_ptr, void *data_ptr, ...) { uint16_t queue_used_place_cnt, remain_place, temp_read_index, data_len = 0; uint8_t *copy_ptr; va_list arguments; if (queue_ptr -> q_inout_fixed_size != (uint16_t)EMPTY) { data_len = queue_ptr -> q_inout_fixed_size; } else { va_start(arguments, data_ptr);/*data len i alýnýyor*/ data_len = va_arg(arguments, uint32_t); va_end(arguments); } queue_used_place_cnt = get_used_place_size_of_queue(queue_ptr); if (data_len == 0 || data_len > queue_used_place_cnt) return FALSE; if (queue_ptr -> q_buff_read_index >= queue_ptr -> q_buff_write_index) { remain_place = ( (queue_ptr -> q_buff_size - 1) - queue_ptr -> q_buff_read_index); temp_read_index = queue_ptr -> q_buff_read_index + 1; temp_read_index = (temp_read_index == queue_ptr -> q_inout_fixed_size) ? 0 : temp_read_index; if (remain_place >= data_len) { copy_ptr = (uint8_t*)((uint32_t)queue_ptr -> q_buff_start_ptr + temp_read_index); memcpy(data_ptr, copy_ptr, data_len); } else { copy_ptr = (uint8_t*)((uint32_t)queue_ptr -> q_buff_start_ptr + temp_read_index); memcpy(data_ptr, copy_ptr, remain_place ); data_len -= remain_place; copy_ptr = (uint8_t*)(queue_ptr -> q_buff_start_ptr); memcpy((uint8_t*)((uint32_t)data_ptr + remain_place), copy_ptr, data_len); } } else { temp_read_index = queue_ptr -> q_buff_read_index + 1; temp_read_index = (temp_read_index == queue_ptr -> q_inout_fixed_size) ? 0 : temp_read_index; copy_ptr = (uint8_t*)( (uint32_t)queue_ptr -> q_buff_start_ptr + temp_read_index ); memcpy(data_ptr, copy_ptr, data_len); } return TRUE; } |