DPDK  20.11.0
rte_rcu_qsbr.h
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright (c) 2018-2020 Arm Limited
3  */
4 
5 #ifndef _RTE_RCU_QSBR_H_
6 #define _RTE_RCU_QSBR_H_
7 
28 #ifdef __cplusplus
29 extern "C" {
30 #endif
31 
32 #include <stdbool.h>
33 #include <stdio.h>
34 #include <stdint.h>
35 #include <inttypes.h>
36 #include <errno.h>
37 #include <rte_common.h>
38 #include <rte_memory.h>
39 #include <rte_lcore.h>
40 #include <rte_debug.h>
41 #include <rte_atomic.h>
42 #include <rte_ring.h>
43 
44 extern int rte_rcu_log_type;
45 
46 #if RTE_LOG_DP_LEVEL >= RTE_LOG_DEBUG
47 #define __RTE_RCU_DP_LOG(level, fmt, args...) \
48  rte_log(RTE_LOG_ ## level, rte_rcu_log_type, \
49  "%s(): " fmt "\n", __func__, ## args)
50 #else
51 #define __RTE_RCU_DP_LOG(level, fmt, args...)
52 #endif
53 
54 #if defined(RTE_LIBRTE_RCU_DEBUG)
55 #define __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, level, fmt, args...) do {\
56  if (v->qsbr_cnt[thread_id].lock_cnt) \
57  rte_log(RTE_LOG_ ## level, rte_rcu_log_type, \
58  "%s(): " fmt "\n", __func__, ## args); \
59 } while (0)
60 #else
61 #define __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, level, fmt, args...)
62 #endif
63 
64 /* Registered thread IDs are stored as a bitmap in an array of 64b elements.
65  * A given thread ID needs to be converted to an index into the array and
66  * a bit position within that array element.
67  */
68 #define __RTE_QSBR_THRID_ARRAY_ELM_SIZE (sizeof(uint64_t) * 8)
69 #define __RTE_QSBR_THRID_ARRAY_SIZE(max_threads) \
70  RTE_ALIGN(RTE_ALIGN_MUL_CEIL(max_threads, \
71  __RTE_QSBR_THRID_ARRAY_ELM_SIZE) >> 3, RTE_CACHE_LINE_SIZE)
72 #define __RTE_QSBR_THRID_ARRAY_ELM(v, i) ((uint64_t *) \
73  ((struct rte_rcu_qsbr_cnt *)(v + 1) + v->max_threads) + i)
74 #define __RTE_QSBR_THRID_INDEX_SHIFT 6
75 #define __RTE_QSBR_THRID_MASK 0x3f
76 #define RTE_QSBR_THRID_INVALID 0xffffffff
77 
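As a worked illustration of this mapping (a minimal sketch, not part of the header; the literal shift and mask values mirror __RTE_QSBR_THRID_INDEX_SHIFT and __RTE_QSBR_THRID_MASK, and thread ID 70 is an arbitrary example):

#include <stdint.h>
#include <stdio.h>

int main(void)
{
        unsigned int thread_id = 70;      /* arbitrary example ID */
        uint32_t elem = thread_id >> 6;   /* index into the 64b element array */
        uint32_t bit = thread_id & 0x3f;  /* bit position within that element */

        /* Thread 70 is tracked by bit 6 of array element 1 */
        printf("element %u, bit %u\n", elem, bit);
        return 0;
}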
78 /* Worker thread counter */
79 struct rte_rcu_qsbr_cnt {
80  uint64_t cnt;
86  uint32_t lock_cnt;
89 } __rte_cache_aligned;
90 #define __RTE_QSBR_CNT_THR_OFFLINE 0
91 #define __RTE_QSBR_CNT_INIT 1
92 #define __RTE_QSBR_CNT_MAX ((uint64_t)~0)
93 #define __RTE_QSBR_TOKEN_SIZE sizeof(uint64_t)
94 
95 /* RTE Quiescent State variable structure.
96  * This structure has two elements that vary in size based on the
97  * 'max_threads' parameter.
98  * 1) Quiescent state counter array
99  * 2) Register thread ID array
100  */
101 struct rte_rcu_qsbr {
102  uint64_t token __rte_cache_aligned;
104  uint64_t acked_token;
109  uint32_t num_elems __rte_cache_aligned;
111  uint32_t num_threads;
113  uint32_t max_threads;
116  struct rte_rcu_qsbr_cnt qsbr_cnt[0] __rte_cache_aligned;
123 } __rte_cache_aligned;
137 typedef void (*rte_rcu_qsbr_free_resource_t)(void *p, void *e, unsigned int n);
138 
139 #define RTE_RCU_QSBR_DQ_NAMESIZE RTE_RING_NAMESIZE
140 
149 #define RTE_RCU_QSBR_DQ_MT_UNSAFE 1
150 
154 struct rte_rcu_qsbr_dq_parameters {
155  const char *name;
157  uint32_t flags;
159  uint32_t size;
166  uint32_t esize;
186  rte_rcu_qsbr_free_resource_t free_fn;
188  void *p;
193  struct rte_rcu_qsbr *v;
195 };
196 
197 /* RTE defer queue structure.
198  * This structure holds the defer queue. The defer queue is used to
199  * hold the deleted entries from the data structure that are not
200  * yet freed.
201  */
202 struct rte_rcu_qsbr_dq;
203 
215 size_t
216 rte_rcu_qsbr_get_memsize(uint32_t max_threads);
217 
233 int
234 rte_rcu_qsbr_init(struct rte_rcu_qsbr *v, uint32_t max_threads);
235 
256 int
257 rte_rcu_qsbr_thread_register(struct rte_rcu_qsbr *v, unsigned int thread_id);
258 
274 int
275 rte_rcu_qsbr_thread_unregister(struct rte_rcu_qsbr *v, unsigned int thread_id);
276 
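A minimal allocation sketch (assuming the EAL is already initialized and rte_malloc.h is available): the application sizes the variable-length structure with rte_rcu_qsbr_get_memsize(), allocates cache-aligned memory, initializes it, and registers a reader thread ID. The helper name app_create_qsbr() is hypothetical.

#include <rte_malloc.h>
#include <rte_rcu_qsbr.h>

static struct rte_rcu_qsbr *
app_create_qsbr(uint32_t max_threads)
{
        struct rte_rcu_qsbr *v;
        size_t sz;

        /* Size covers the per-thread counter array and thread-ID bitmap */
        sz = rte_rcu_qsbr_get_memsize(max_threads);
        v = rte_zmalloc(NULL, sz, RTE_CACHE_LINE_SIZE);
        if (v == NULL)
                return NULL;

        if (rte_rcu_qsbr_init(v, max_threads) != 0) {
                rte_free(v);
                return NULL;
        }

        /* Each reader thread registers the ID it will report with */
        if (rte_rcu_qsbr_thread_register(v, 0) != 0) {
                rte_free(v);
                return NULL;
        }

        return v;
}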
302 static __rte_always_inline void
303 rte_rcu_qsbr_thread_online(struct rte_rcu_qsbr *v, unsigned int thread_id)
304 {
305  uint64_t t;
306 
307  RTE_ASSERT(v != NULL && thread_id < v->max_threads);
308 
309  __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u\n",
310  v->qsbr_cnt[thread_id].lock_cnt);
311 
312  /* Copy the current value of token.
313  * The fence at the end of the function will ensure that
314  * the following will not move down after the load of any shared
315  * data structure.
316  */
317  t = __atomic_load_n(&v->token, __ATOMIC_RELAXED);
318 
319  /* __atomic_store_n(cnt, __ATOMIC_RELAXED) is used to ensure
320  * 'cnt' (64b) is accessed atomically.
321  */
322  __atomic_store_n(&v->qsbr_cnt[thread_id].cnt,
323  t, __ATOMIC_RELAXED);
324 
325  /* The subsequent load of the data structure should not
326  * move above the store. Hence a store-load barrier
327  * is required.
328  * If the load of the data structure moves above the store,
329  * writer might not see that the reader is online, even though
330  * the reader is referencing the shared data structure.
331  */
332 #ifdef RTE_ARCH_X86_64
333  /* rte_smp_mb() for x86 is lighter */
334  rte_smp_mb();
335 #else
336  __atomic_thread_fence(__ATOMIC_SEQ_CST);
337 #endif
338 }
339 
360 static __rte_always_inline void
361 rte_rcu_qsbr_thread_offline(struct rte_rcu_qsbr *v, unsigned int thread_id)
362 {
363  RTE_ASSERT(v != NULL && thread_id < v->max_threads);
364 
365  __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u\n",
366  v->qsbr_cnt[thread_id].lock_cnt);
367 
368  /* The reader can go offline only after the load of the
369  * data structure is completed, i.e. any load of the
370  * data structure cannot move after this store.
371  */
372 
373  __atomic_store_n(&v->qsbr_cnt[thread_id].cnt,
374  __RTE_QSBR_CNT_THR_OFFLINE, __ATOMIC_RELEASE);
375 }
376 
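A reader that accesses the shared data structure only occasionally can bracket each access with online/offline, so the writer does not have to wait for it in between. A sketch, assuming 'v' was set up as above and this thread is registered as 'thread_id'; app_lookup() is a hypothetical application function:

#include <rte_rcu_qsbr.h>

extern void app_lookup(void);   /* hypothetical read-side work */

static void
app_infrequent_reader(struct rte_rcu_qsbr *v, unsigned int thread_id)
{
        /* Announce that this thread will start referencing shared memory */
        rte_rcu_qsbr_thread_online(v, thread_id);

        app_lookup();           /* dereference shared entries here only */

        /* No references are held past this point */
        rte_rcu_qsbr_thread_offline(v, thread_id);
}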
397 static __rte_always_inline void
398 rte_rcu_qsbr_lock(__rte_unused struct rte_rcu_qsbr *v,
399  __rte_unused unsigned int thread_id)
400 {
401  RTE_ASSERT(v != NULL && thread_id < v->max_threads);
402 
403 #if defined(RTE_LIBRTE_RCU_DEBUG)
404  /* Increment the lock counter */
405  __atomic_fetch_add(&v->qsbr_cnt[thread_id].lock_cnt,
406  1, __ATOMIC_ACQUIRE);
407 #endif
408 }
409 
430 static __rte_always_inline void
431 rte_rcu_qsbr_unlock(__rte_unused struct rte_rcu_qsbr *v,
432  __rte_unused unsigned int thread_id)
433 {
434  RTE_ASSERT(v != NULL && thread_id < v->max_threads);
435 
436 #if defined(RTE_LIBRTE_RCU_DEBUG)
437  /* Decrement the lock counter */
438  __atomic_fetch_sub(&v->qsbr_cnt[thread_id].lock_cnt,
439  1, __ATOMIC_RELEASE);
440 
441  __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, WARNING,
442  "Lock counter %u. Nested locks?\n",
443  v->qsbr_cnt[thread_id].lock_cnt);
444 #endif
445 }
446 
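Both helpers compile to empty inlines unless RTE_LIBRTE_RCU_DEBUG is defined; in debug builds they let the library warn if a quiescent state is reported from inside a critical section. A sketch of the intended pairing (app_checked_read() is a hypothetical helper):

#include <rte_rcu_qsbr.h>

static void
app_checked_read(struct rte_rcu_qsbr *v, unsigned int thread_id)
{
        rte_rcu_qsbr_lock(v, thread_id);

        /* ... read-side critical section over the shared structure ... */

        rte_rcu_qsbr_unlock(v, thread_id);

        /* Reporting the quiescent state outside the lock/unlock pair keeps
         * the debug lock counter at zero, so no warning is raised.
         */
        rte_rcu_qsbr_quiescent(v, thread_id);
}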
460 static __rte_always_inline uint64_t
461 rte_rcu_qsbr_start(struct rte_rcu_qsbr *v)
462 {
463  uint64_t t;
464 
465  RTE_ASSERT(v != NULL);
466 
467  /* Release the changes to the shared data structure.
468  * This store release will ensure that changes to any data
469  * structure are visible to the workers before the token
470  * update is visible.
471  */
472  t = __atomic_add_fetch(&v->token, 1, __ATOMIC_RELEASE);
473 
474  return t;
475 }
476 
489 static __rte_always_inline void
490 rte_rcu_qsbr_quiescent(struct rte_rcu_qsbr *v, unsigned int thread_id)
491 {
492  uint64_t t;
493 
494  RTE_ASSERT(v != NULL && thread_id < v->max_threads);
495 
496  __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u\n",
497  v->qsbr_cnt[thread_id].lock_cnt);
498 
499  /* Acquire the changes to the shared data structure released
500  * by rte_rcu_qsbr_start.
501  * Later loads of the shared data structure should not move
502  * above this load. Hence, use load-acquire.
503  */
504  t = __atomic_load_n(&v->token, __ATOMIC_ACQUIRE);
505 
506  /* Check if there are updates available from the writer.
507  * Inform the writer that updates are visible to this reader.
508  * Prior loads of the shared data structure should not move
509  * beyond this store. Hence use store-release.
510  */
511  if (t != __atomic_load_n(&v->qsbr_cnt[thread_id].cnt, __ATOMIC_RELAXED))
512  __atomic_store_n(&v->qsbr_cnt[thread_id].cnt,
513  t, __ATOMIC_RELEASE);
514 
515  __RTE_RCU_DP_LOG(DEBUG, "%s: update: token = %" PRIu64 ", Thread ID = %d",
516  __func__, t, thread_id);
517 }
518 
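A typical polling reader stays online and reports a quiescent state once per loop iteration, after it has dropped every reference taken in that iteration. A sketch; app_quit and app_process_burst() are hypothetical:

#include <rte_rcu_qsbr.h>

extern volatile bool app_quit;          /* hypothetical termination flag */
extern void app_process_burst(void);    /* hypothetical per-iteration work */

static void
app_reader_loop(struct rte_rcu_qsbr *v, unsigned int thread_id)
{
        rte_rcu_qsbr_thread_register(v, thread_id);
        rte_rcu_qsbr_thread_online(v, thread_id);

        while (!app_quit) {
                app_process_burst();    /* look up and use shared entries */

                /* All references from this iteration have been dropped */
                rte_rcu_qsbr_quiescent(v, thread_id);
        }

        rte_rcu_qsbr_thread_offline(v, thread_id);
        rte_rcu_qsbr_thread_unregister(v, thread_id);
}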
519 /* Check the quiescent state counter for registered threads only, assuming
520  * that not all threads have registered.
521  */
522 static __rte_always_inline int
523 __rte_rcu_qsbr_check_selective(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
524 {
525  uint32_t i, j, id;
526  uint64_t bmap;
527  uint64_t c;
528  uint64_t *reg_thread_id;
529  uint64_t acked_token = __RTE_QSBR_CNT_MAX;
530 
531  for (i = 0, reg_thread_id = __RTE_QSBR_THRID_ARRAY_ELM(v, 0);
532  i < v->num_elems;
533  i++, reg_thread_id++) {
534  /* Load the current registered thread bit map before
535  * loading the reader thread quiescent state counters.
536  */
537  bmap = __atomic_load_n(reg_thread_id, __ATOMIC_ACQUIRE);
538  id = i << __RTE_QSBR_THRID_INDEX_SHIFT;
539 
540  while (bmap) {
541  j = __builtin_ctzl(bmap);
542  __RTE_RCU_DP_LOG(DEBUG,
543  "%s: check: token = %" PRIu64 ", wait = %d, Bit Map = 0x%" PRIx64 ", Thread ID = %d",
544  __func__, t, wait, bmap, id + j);
545  c = __atomic_load_n(
546  &v->qsbr_cnt[id + j].cnt,
547  __ATOMIC_ACQUIRE);
548  __RTE_RCU_DP_LOG(DEBUG,
549  "%s: status: token = %" PRIu64 ", wait = %d, Thread QS cnt = %" PRIu64 ", Thread ID = %d",
550  __func__, t, wait, c, id+j);
551 
552  /* Counter is not checked for wrap-around condition
553  * as it is a 64b counter.
554  */
555  if (unlikely(c !=
556  __RTE_QSBR_CNT_THR_OFFLINE && c < t)) {
557  /* This thread is not in quiescent state */
558  if (!wait)
559  return 0;
560 
561  rte_pause();
562  /* This thread might have unregistered.
563  * Re-read the bitmap.
564  */
565  bmap = __atomic_load_n(reg_thread_id,
566  __ATOMIC_ACQUIRE);
567 
568  continue;
569  }
570 
571  /* This thread is in quiescent state. Use the counter
572  * to find the least acknowledged token among all the
573  * readers.
574  */
575  if (c != __RTE_QSBR_CNT_THR_OFFLINE && acked_token > c)
576  acked_token = c;
577 
578  bmap &= ~(1UL << j);
579  }
580  }
581 
582  /* All readers are checked, update least acknowledged token.
583  * There might be multiple writers trying to update this. There is
584  * no need to update this very accurately using compare-and-swap.
585  */
586  if (acked_token != __RTE_QSBR_CNT_MAX)
587  __atomic_store_n(&v->acked_token, acked_token,
588  __ATOMIC_RELAXED);
589 
590  return 1;
591 }
592 
593 /* Check the quiescent state counter for all threads, assuming that
594  * all the threads have registered.
595  */
596 static __rte_always_inline int
597 __rte_rcu_qsbr_check_all(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
598 {
599  uint32_t i;
600  struct rte_rcu_qsbr_cnt *cnt;
601  uint64_t c;
602  uint64_t acked_token = __RTE_QSBR_CNT_MAX;
603 
604  for (i = 0, cnt = v->qsbr_cnt; i < v->max_threads; i++, cnt++) {
605  __RTE_RCU_DP_LOG(DEBUG,
606  "%s: check: token = %" PRIu64 ", wait = %d, Thread ID = %d",
607  __func__, t, wait, i);
608  while (1) {
609  c = __atomic_load_n(&cnt->cnt, __ATOMIC_ACQUIRE);
610  __RTE_RCU_DP_LOG(DEBUG,
611  "%s: status: token = %" PRIu64 ", wait = %d, Thread QS cnt = %" PRIu64 ", Thread ID = %d",
612  __func__, t, wait, c, i);
613 
614  /* Counter is not checked for wrap-around condition
615  * as it is a 64b counter.
616  */
617  if (likely(c == __RTE_QSBR_CNT_THR_OFFLINE || c >= t))
618  break;
619 
620  /* This thread is not in quiescent state */
621  if (!wait)
622  return 0;
623 
624  rte_pause();
625  }
626 
627  /* This thread is in quiescent state. Use the counter to find
628  * the least acknowledged token among all the readers.
629  */
630  if (likely(c != __RTE_QSBR_CNT_THR_OFFLINE && acked_token > c))
631  acked_token = c;
632  }
633 
634  /* All readers are checked, update least acknowledged token.
635  * There might be multiple writers trying to update this. There is
636  * no need to update this very accurately using compare-and-swap.
637  */
638  if (acked_token != __RTE_QSBR_CNT_MAX)
639  __atomic_store_n(&v->acked_token, acked_token,
640  __ATOMIC_RELAXED);
641 
642  return 1;
643 }
644 
676 static __rte_always_inline int
677 rte_rcu_qsbr_check(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
678 {
679  RTE_ASSERT(v != NULL);
680 
681  /* Check if all the readers have already acknowledged this token */
682  if (likely(t <= v->acked_token)) {
683  __RTE_RCU_DP_LOG(DEBUG,
684  "%s: check: token = %" PRIu64 ", wait = %d",
685  __func__, t, wait);
686  __RTE_RCU_DP_LOG(DEBUG,
687  "%s: status: least acked token = %" PRIu64,
688  __func__, v->acked_token);
689  return 1;
690  }
691 
692  if (likely(v->num_threads == v->max_threads))
693  return __rte_rcu_qsbr_check_all(v, t, wait);
694  else
695  return __rte_rcu_qsbr_check_selective(v, t, wait);
696 }
697 
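On the writer side the usual sequence is: unlink the entry so new readers cannot reach it, publish a new token with rte_rcu_qsbr_start(), poll or block in rte_rcu_qsbr_check() until every registered reader has acknowledged it, and only then free the memory. A sketch; app_unlink_entry() and app_free_entry() are hypothetical:

#include <rte_rcu_qsbr.h>

extern void app_unlink_entry(void *entry);  /* hypothetical: remove from structure */
extern void app_free_entry(void *entry);    /* hypothetical: release the memory */

static void
app_writer_delete(struct rte_rcu_qsbr *v, void *entry)
{
        uint64_t token;

        app_unlink_entry(entry);        /* new readers can no longer reach it */

        token = rte_rcu_qsbr_start(v);  /* start a new grace period */

        /* Block until all current readers have passed a quiescent state */
        rte_rcu_qsbr_check(v, token, true);

        app_free_entry(entry);          /* safe: no reader holds a reference */
}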
716 void
717 rte_rcu_qsbr_synchronize(struct rte_rcu_qsbr *v, unsigned int thread_id);
718 
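rte_rcu_qsbr_synchronize() is the blocking shorthand for the start/check pair above. A minimal sketch, assuming the caller is not itself a registered reader (a registered reader would pass its own thread ID instead):

#include <rte_rcu_qsbr.h>

static void
app_wait_for_readers(struct rte_rcu_qsbr *v)
{
        rte_rcu_qsbr_synchronize(v, RTE_QSBR_THRID_INVALID);
}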
734 int
735 rte_rcu_qsbr_dump(FILE *f, struct rte_rcu_qsbr *v);
736 
753 __rte_experimental
754 struct rte_rcu_qsbr_dq *
755 rte_rcu_qsbr_dq_create(const struct rte_rcu_qsbr_dq_parameters *params);
756 
788 __rte_experimental
789 int
790 rte_rcu_qsbr_dq_enqueue(struct rte_rcu_qsbr_dq *dq, void *e);
791 
817 __rte_experimental
818 int
819 rte_rcu_qsbr_dq_reclaim(struct rte_rcu_qsbr_dq *dq, unsigned int n,
820  unsigned int *freed, unsigned int *pending, unsigned int *available);
821 
843 __rte_experimental
844 int
845 rte_rcu_qsbr_dq_delete(struct rte_rcu_qsbr_dq *dq);
846 
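The defer queue wraps the token handling: deleted entries are enqueued together with the current token and reclaimed later, once their grace period has elapsed. A sketch under these assumptions: 'v' is an initialized QS variable, each queued element is a single pointer allocated with rte_malloc, and the trigger_reclaim_limit / max_reclaim_size members set below are further fields of the 20.11 parameters structure; all app_* names are hypothetical.

#include <rte_malloc.h>
#include <rte_rcu_qsbr.h>

/* Hypothetical callback: frees 'n' pointers stored back-to-back at 'e' */
static void
app_free_deferred(void *p, void *e, unsigned int n)
{
        void **entries = e;
        unsigned int i;

        RTE_SET_USED(p);
        for (i = 0; i < n; i++)
                rte_free(entries[i]);
}

static int
app_deferred_delete(struct rte_rcu_qsbr *v, void *entry)
{
        struct rte_rcu_qsbr_dq_parameters params = {
                .name = "app_dq",               /* hypothetical queue name */
                .flags = 0,                     /* multi-thread safe enqueue/reclaim */
                .size = 1024,                   /* entries the queue can hold */
                .esize = sizeof(void *),        /* each element stores one pointer */
                .trigger_reclaim_limit = 256,   /* auto-reclaim once this many pend */
                .max_reclaim_size = 32,         /* reclaim at most this many per pass */
                .free_fn = app_free_deferred,
                .p = NULL,
                .v = v,
        };
        struct rte_rcu_qsbr_dq *dq;
        unsigned int freed, pending, available;

        dq = rte_rcu_qsbr_dq_create(&params);
        if (dq == NULL)
                return -1;

        /* 'entry' must already be unreachable for new readers */
        if (rte_rcu_qsbr_dq_enqueue(dq, &entry) != 0) {
                rte_rcu_qsbr_dq_delete(dq);
                return -1;
        }

        /* Reclaim up to 32 entries whose readers have all acknowledged */
        rte_rcu_qsbr_dq_reclaim(dq, 32, &freed, &pending, &available);

        return rte_rcu_qsbr_dq_delete(dq);      /* tear down the queue when done */
}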
847 #ifdef __cplusplus
848 }
849 #endif
850 
851 #endif /* _RTE_RCU_QSBR_H_ */