DPDK  19.08.0-rc0
rte_rcu_qsbr.h
/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright (c) 2018 Arm Limited
 */

#ifndef _RTE_RCU_QSBR_H_
#define _RTE_RCU_QSBR_H_

#ifdef __cplusplus
extern "C" {
#endif

#include <stdio.h>
#include <stdint.h>
#include <inttypes.h>
#include <errno.h>
#include <rte_common.h>
#include <rte_memory.h>
#include <rte_lcore.h>
#include <rte_debug.h>
#include <rte_atomic.h>

extern int rte_rcu_log_type;

#if RTE_LOG_DP_LEVEL >= RTE_LOG_DEBUG
#define __RTE_RCU_DP_LOG(level, fmt, args...) \
        rte_log(RTE_LOG_ ## level, rte_rcu_log_type, \
                "%s(): " fmt "\n", __func__, ## args)
#else
#define __RTE_RCU_DP_LOG(level, fmt, args...)
#endif

#if defined(RTE_LIBRTE_RCU_DEBUG)
#define __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, level, fmt, args...) do {\
        if (v->qsbr_cnt[thread_id].lock_cnt) \
                rte_log(RTE_LOG_ ## level, rte_rcu_log_type, \
                        "%s(): " fmt "\n", __func__, ## args); \
} while (0)
#else
#define __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, level, fmt, args...)
#endif

/* Registered thread IDs are stored as a bitmap in an array of 64b elements.
 * A given thread ID needs to be converted into an index into the array and
 * a bit position within that array element.
 */
#define __RTE_QSBR_THRID_ARRAY_ELM_SIZE (sizeof(uint64_t) * 8)
#define __RTE_QSBR_THRID_ARRAY_SIZE(max_threads) \
        RTE_ALIGN(RTE_ALIGN_MUL_CEIL(max_threads, \
                __RTE_QSBR_THRID_ARRAY_ELM_SIZE) >> 3, RTE_CACHE_LINE_SIZE)
#define __RTE_QSBR_THRID_ARRAY_ELM(v, i) ((uint64_t *) \
        ((struct rte_rcu_qsbr_cnt *)(v + 1) + v->max_threads) + i)
#define __RTE_QSBR_THRID_INDEX_SHIFT 6
#define __RTE_QSBR_THRID_MASK 0x3f
#define RTE_QSBR_THRID_INVALID 0xffffffff

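/* Illustrative sketch (not part of the original header): how a thread ID is
 * split into a bitmap array element and a bit position using the macros
 * above. The variable names below are hypothetical.
 */
#if 0
        unsigned int thread_id = 77;
        uint32_t elem = thread_id >> __RTE_QSBR_THRID_INDEX_SHIFT; /* 77 / 64 = 1 */
        uint32_t bit  = thread_id & __RTE_QSBR_THRID_MASK;         /* 77 % 64 = 13 */
        /* Thread 77 is tracked by bit 13 of the second 64b element of the
         * registration bitmap.
         */
#endif
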
/* Worker thread counter */
struct rte_rcu_qsbr_cnt {
        uint64_t cnt;
        /**< Quiescent state counter. Value 0 indicates the thread is offline. */
        uint32_t lock_cnt;
        /**< Lock counter. Used only when RTE_LIBRTE_RCU_DEBUG is defined. */
} __rte_cache_aligned;

#define __RTE_QSBR_CNT_THR_OFFLINE 0
#define __RTE_QSBR_CNT_INIT 1

/* RTE Quiescent State variable structure.
 * This structure has two elements that vary in size based on the
 * 'max_threads' parameter.
 * 1) Quiescent state counter array
 * 2) Registered thread ID array
 */
struct rte_rcu_qsbr {
        uint64_t token __rte_cache_aligned;
        /**< Counter to allow for multiple concurrent quiescent state queries */

        uint32_t num_elems __rte_cache_aligned;
        /**< Number of elements in the thread ID array */
        uint32_t num_threads;
        /**< Number of threads currently using this QS variable */
        uint32_t max_threads;
        /**< Maximum number of threads using this QS variable */

        struct rte_rcu_qsbr_cnt qsbr_cnt[0] __rte_cache_aligned;
        /**< Quiescent state counter array of 'max_threads' elements */
} __rte_cache_aligned;

size_t __rte_experimental
rte_rcu_qsbr_get_memsize(uint32_t max_threads);

int __rte_experimental
rte_rcu_qsbr_init(struct rte_rcu_qsbr *v, uint32_t max_threads);

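/* Illustrative sketch (not part of the original header): sizing and
 * initializing a QS variable for up to RTE_MAX_LCORE reader threads.
 * rte_zmalloc() comes from rte_malloc.h, which this header does not include;
 * error handling is abbreviated.
 */
#if 0
        struct rte_rcu_qsbr *v;
        size_t sz = rte_rcu_qsbr_get_memsize(RTE_MAX_LCORE);

        v = rte_zmalloc(NULL, sz, RTE_CACHE_LINE_SIZE); /* needs <rte_malloc.h> */
        if (v == NULL)
                rte_panic("cannot allocate QS variable\n");
        rte_rcu_qsbr_init(v, RTE_MAX_LCORE);
#endif
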
int __rte_experimental
rte_rcu_qsbr_thread_register(struct rte_rcu_qsbr *v, unsigned int thread_id);

int __rte_experimental
rte_rcu_qsbr_thread_unregister(struct rte_rcu_qsbr *v, unsigned int thread_id);

static __rte_always_inline void __rte_experimental
rte_rcu_qsbr_thread_online(struct rte_rcu_qsbr *v, unsigned int thread_id)
{
        uint64_t t;

        RTE_ASSERT(v != NULL && thread_id < v->max_threads);

        __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u\n",
                                v->qsbr_cnt[thread_id].lock_cnt);

        /* Copy the current value of token.
         * The fence at the end of the function will ensure that
         * the following will not move down after the load of any shared
         * data structure.
         */
        t = __atomic_load_n(&v->token, __ATOMIC_RELAXED);

        /* __atomic_store_n(cnt, __ATOMIC_RELAXED) is used to ensure
         * 'cnt' (64b) is accessed atomically.
         */
        __atomic_store_n(&v->qsbr_cnt[thread_id].cnt,
                t, __ATOMIC_RELAXED);

        /* The subsequent load of the data structure should not
         * move above the store. Hence a store-load barrier
         * is required.
         * If the load of the data structure moves above the store,
         * the writer might not see that the reader is online, even though
         * the reader is referencing the shared data structure.
         */
#ifdef RTE_ARCH_X86_64
        /* rte_smp_mb() for x86 is lighter */
        rte_smp_mb();
#else
        __atomic_thread_fence(__ATOMIC_SEQ_CST);
#endif
}

static __rte_always_inline void __rte_experimental
rte_rcu_qsbr_thread_offline(struct rte_rcu_qsbr *v, unsigned int thread_id)
{
        RTE_ASSERT(v != NULL && thread_id < v->max_threads);

        __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u\n",
                                v->qsbr_cnt[thread_id].lock_cnt);

        /* The reader can go offline only after the load of the
         * data structure is complete, i.e. any load of the
         * data structure cannot move after this store.
         */

        __atomic_store_n(&v->qsbr_cnt[thread_id].cnt,
                __RTE_QSBR_CNT_THR_OFFLINE, __ATOMIC_RELEASE);
}

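/* Illustrative sketch (not part of the original header): a registered reader
 * that does not poll continuously reports itself online only around its read
 * side section. do_lookup() is a hypothetical placeholder for reads of the
 * shared, RCU-protected data structure.
 */
#if 0
        rte_rcu_qsbr_thread_register(v, thread_id);

        rte_rcu_qsbr_thread_online(v, thread_id);
        do_lookup();                            /* reads the shared data structure */
        rte_rcu_qsbr_thread_offline(v, thread_id); /* no references held past here */
#endif
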
static __rte_always_inline void __rte_experimental
rte_rcu_qsbr_lock(__rte_unused struct rte_rcu_qsbr *v,
                        __rte_unused unsigned int thread_id)
{
        RTE_ASSERT(v != NULL && thread_id < v->max_threads);

#if defined(RTE_LIBRTE_RCU_DEBUG)
        /* Increment the lock counter */
        __atomic_fetch_add(&v->qsbr_cnt[thread_id].lock_cnt,
                                1, __ATOMIC_ACQUIRE);
#endif
}

static __rte_always_inline void __rte_experimental
rte_rcu_qsbr_unlock(__rte_unused struct rte_rcu_qsbr *v,
                        __rte_unused unsigned int thread_id)
{
        RTE_ASSERT(v != NULL && thread_id < v->max_threads);

#if defined(RTE_LIBRTE_RCU_DEBUG)
        /* Decrement the lock counter */
        __atomic_fetch_sub(&v->qsbr_cnt[thread_id].lock_cnt,
                                1, __ATOMIC_RELEASE);

        __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, WARNING,
                                "Lock counter %u. Nested locks?\n",
                                v->qsbr_cnt[thread_id].lock_cnt);
#endif
}

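/* Illustrative sketch (not part of the original header): lock/unlock are
 * no-ops unless RTE_LIBRTE_RCU_DEBUG is defined, in which case they track
 * nesting so that a thread reporting quiescence while still inside a critical
 * section can be flagged. do_lookup() is a hypothetical placeholder.
 */
#if 0
        rte_rcu_qsbr_lock(v, thread_id);
        do_lookup();                    /* critical section on the shared data */
        rte_rcu_qsbr_unlock(v, thread_id);
        /* Only after the outermost unlock should the thread call
         * rte_rcu_qsbr_quiescent().
         */
#endif
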
static __rte_always_inline uint64_t __rte_experimental
rte_rcu_qsbr_start(struct rte_rcu_qsbr *v)
{
        uint64_t t;

        RTE_ASSERT(v != NULL);

        /* Release the changes to the shared data structure.
         * This store release will ensure that changes to any data
         * structure are visible to the workers before the token
         * update is visible.
         */
        t = __atomic_add_fetch(&v->token, 1, __ATOMIC_RELEASE);

        return t;
}

static __rte_always_inline void __rte_experimental
rte_rcu_qsbr_quiescent(struct rte_rcu_qsbr *v, unsigned int thread_id)
{
        uint64_t t;

        RTE_ASSERT(v != NULL && thread_id < v->max_threads);

        __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u\n",
                                v->qsbr_cnt[thread_id].lock_cnt);

        /* Acquire the changes to the shared data structure released
         * by rte_rcu_qsbr_start.
         * Later loads of the shared data structure should not move
         * above this load. Hence, use load-acquire.
         */
        t = __atomic_load_n(&v->token, __ATOMIC_ACQUIRE);

        /* Inform the writer that updates are visible to this reader.
         * Prior loads of the shared data structure should not move
         * beyond this store. Hence use store-release.
         */
        __atomic_store_n(&v->qsbr_cnt[thread_id].cnt,
                         t, __ATOMIC_RELEASE);

        __RTE_RCU_DP_LOG(DEBUG, "%s: update: token = %"PRIu64", Thread ID = %d",
                __func__, t, thread_id);
}

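/* Illustrative sketch (not part of the original header): the typical reader
 * loop on a polling lcore. process_packets() and quit are hypothetical
 * placeholders for the per-iteration work that dereferences the shared data
 * structure and for the exit condition.
 */
#if 0
        rte_rcu_qsbr_thread_register(v, thread_id);
        rte_rcu_qsbr_thread_online(v, thread_id);

        while (!quit) {
                process_packets();      /* reads the shared data structure */
                /* No references are held across this call */
                rte_rcu_qsbr_quiescent(v, thread_id);
        }

        rte_rcu_qsbr_thread_offline(v, thread_id);
        rte_rcu_qsbr_thread_unregister(v, thread_id);
#endif
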
/* Check the quiescent state counter for registered threads only, assuming
 * that not all threads have registered.
 */
static __rte_always_inline int
__rte_rcu_qsbr_check_selective(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
{
        uint32_t i, j, id;
        uint64_t bmap;
        uint64_t c;
        uint64_t *reg_thread_id;

        for (i = 0, reg_thread_id = __RTE_QSBR_THRID_ARRAY_ELM(v, 0);
                i < v->num_elems;
                i++, reg_thread_id++) {
                /* Load the current registered thread bit map before
                 * loading the reader thread quiescent state counters.
                 */
                bmap = __atomic_load_n(reg_thread_id, __ATOMIC_ACQUIRE);
                id = i << __RTE_QSBR_THRID_INDEX_SHIFT;

                while (bmap) {
                        j = __builtin_ctzl(bmap);
                        __RTE_RCU_DP_LOG(DEBUG,
                                "%s: check: token = %"PRIu64", wait = %d, Bit Map = 0x%"PRIx64", Thread ID = %d",
                                __func__, t, wait, bmap, id + j);
                        c = __atomic_load_n(
                                        &v->qsbr_cnt[id + j].cnt,
                                        __ATOMIC_ACQUIRE);
                        __RTE_RCU_DP_LOG(DEBUG,
                                "%s: status: token = %"PRIu64", wait = %d, Thread QS cnt = %"PRIu64", Thread ID = %d",
                                __func__, t, wait, c, id + j);
                        /* Counter is not checked for wrap-around condition
                         * as it is a 64b counter.
                         */
                        if (unlikely(c !=
                                __RTE_QSBR_CNT_THR_OFFLINE && c < t)) {
                                /* This thread is not in quiescent state */
                                if (!wait)
                                        return 0;

                                rte_pause();
                                /* This thread might have unregistered.
                                 * Re-read the bitmap.
                                 */
                                bmap = __atomic_load_n(reg_thread_id,
                                                __ATOMIC_ACQUIRE);

                                continue;
                        }

                        bmap &= ~(1UL << j);
                }
        }

        return 1;
}

/* Check the quiescent state counter for all threads, assuming that
 * all the threads have registered.
 */
static __rte_always_inline int
__rte_rcu_qsbr_check_all(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
{
        uint32_t i;
        struct rte_rcu_qsbr_cnt *cnt;
        uint64_t c;

        for (i = 0, cnt = v->qsbr_cnt; i < v->max_threads; i++, cnt++) {
                __RTE_RCU_DP_LOG(DEBUG,
                        "%s: check: token = %"PRIu64", wait = %d, Thread ID = %d",
                        __func__, t, wait, i);
                while (1) {
                        c = __atomic_load_n(&cnt->cnt, __ATOMIC_ACQUIRE);
                        __RTE_RCU_DP_LOG(DEBUG,
                                "%s: status: token = %"PRIu64", wait = %d, Thread QS cnt = %"PRIu64", Thread ID = %d",
                                __func__, t, wait, c, i);
                        /* Counter is not checked for wrap-around condition
                         * as it is a 64b counter.
                         */
                        if (likely(c == __RTE_QSBR_CNT_THR_OFFLINE || c >= t))
                                break;

                        /* This thread is not in quiescent state */
                        if (!wait)
                                return 0;

                        rte_pause();
                }
        }

        return 1;
}

static __rte_always_inline int __rte_experimental
rte_rcu_qsbr_check(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
{
        RTE_ASSERT(v != NULL);

        if (likely(v->num_threads == v->max_threads))
                return __rte_rcu_qsbr_check_all(v, t, wait);
        else
                return __rte_rcu_qsbr_check_selective(v, t, wait);
}

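/* Illustrative sketch (not part of the original header): the writer side of a
 * deferred free. The entry is first unlinked from the shared structure, then
 * the writer waits for all registered readers to pass through a quiescent
 * state before reclaiming the memory. remove_entry(), free_entry() and entry
 * are hypothetical placeholders.
 */
#if 0
        uint64_t token;

        remove_entry(entry);                    /* make the entry unreachable */
        token = rte_rcu_qsbr_start(v);          /* begin a new grace period */
        /* Other work can be done here while readers catch up */
        rte_rcu_qsbr_check(v, token, true);     /* block until the grace period ends */
        free_entry(entry);                      /* now safe to reclaim */

        /* Alternatively, rte_rcu_qsbr_synchronize(v, RTE_QSBR_THRID_INVALID)
         * combines the start and blocking check steps.
         */
#endif
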
void __rte_experimental
rte_rcu_qsbr_synchronize(struct rte_rcu_qsbr *v, unsigned int thread_id);

int __rte_experimental
rte_rcu_qsbr_dump(FILE *f, struct rte_rcu_qsbr *v);

#ifdef __cplusplus
}
#endif

#endif /* _RTE_RCU_QSBR_H_ */