66#include <linux/errno.h>
77#include <linux/bug.h>
88#include "printk_ringbuffer.h"
9+ #include "internal.h"
910
1011/**
1112 * DOC: printk_ringbuffer overview
303304 *
304305 * desc_push_tail:B / desc_reserve:D
305306 * set descriptor reusable (state), then push descriptor tail (id)
307+ *
308+ * desc_update_last_finalized:A / desc_last_finalized_seq:A
309+ * store finalized record, then set new highest finalized sequence number
306310 */
307311
308312#define DATA_SIZE (data_ring ) _DATA_SIZE((data_ring)->size_bits)
@@ -1441,20 +1445,118 @@ bool prb_reserve_in_last(struct prb_reserved_entry *e, struct printk_ringbuffer
14411445 return false;
14421446}
14431447
1448+ /*
1449+ * @last_finalized_seq value guarantees that all records up to and including
1450+ * this sequence number are finalized and can be read. The only exception are
1451+ * too old records which have already been overwritten.
1452+ *
1453+ * It is also guaranteed that @last_finalized_seq only increases.
1454+ *
1455+ * Be aware that finalized records following non-finalized records are not
1456+ * reported because they are not yet available to the reader. For example,
1457+ * a new record stored via printk() will not be available to a printer if
1458+ * it follows a record that has not been finalized yet. However, once that
1459+ * non-finalized record becomes finalized, @last_finalized_seq will be
1460+ * appropriately updated and the full set of finalized records will be
1461+ * available to the printer. And since each printk() caller will either
1462+ * directly print or trigger deferred printing of all available unprinted
1463+ * records, all printk() messages will get printed.
1464+ */
1465+ static u64 desc_last_finalized_seq (struct printk_ringbuffer * rb )
1466+ {
1467+ struct prb_desc_ring * desc_ring = & rb -> desc_ring ;
1468+ unsigned long ulseq ;
1469+
1470+ /*
1471+ * Guarantee the sequence number is loaded before loading the
1472+ * associated record in order to guarantee that the record can be
1473+ * seen by this CPU. This pairs with desc_update_last_finalized:A.
1474+ */
1475+ ulseq = atomic_long_read_acquire (& desc_ring -> last_finalized_seq
1476+ ); /* LMM(desc_last_finalized_seq:A) */
1477+
1478+ return __ulseq_to_u64seq (rb , ulseq );
1479+ }
1480+
1481+ static bool _prb_read_valid (struct printk_ringbuffer * rb , u64 * seq ,
1482+ struct printk_record * r , unsigned int * line_count );
1483+
1484+ /*
1485+ * Check if there are records directly following @last_finalized_seq that are
1486+ * finalized. If so, update @last_finalized_seq to the latest of these
1487+ * records. It is not allowed to skip over records that are not yet finalized.
1488+ */
1489+ static void desc_update_last_finalized (struct printk_ringbuffer * rb )
1490+ {
1491+ struct prb_desc_ring * desc_ring = & rb -> desc_ring ;
1492+ u64 old_seq = desc_last_finalized_seq (rb );
1493+ unsigned long oldval ;
1494+ unsigned long newval ;
1495+ u64 finalized_seq ;
1496+ u64 try_seq ;
1497+
1498+ try_again :
1499+ finalized_seq = old_seq ;
1500+ try_seq = finalized_seq + 1 ;
1501+
1502+ /* Try to find later finalized records. */
1503+ while (_prb_read_valid (rb , & try_seq , NULL , NULL )) {
1504+ finalized_seq = try_seq ;
1505+ try_seq ++ ;
1506+ }
1507+
1508+ /* No update needed if no later finalized record was found. */
1509+ if (finalized_seq == old_seq )
1510+ return ;
1511+
1512+ oldval = __u64seq_to_ulseq (old_seq );
1513+ newval = __u64seq_to_ulseq (finalized_seq );
1514+
1515+ /*
1516+ * Set the sequence number of a later finalized record that has been
1517+ * seen.
1518+ *
1519+ * Guarantee the record data is visible to other CPUs before storing
1520+ * its sequence number. This pairs with desc_last_finalized_seq:A.
1521+ *
1522+ * Memory barrier involvement:
1523+ *
1524+ * If desc_last_finalized_seq:A reads from
1525+ * desc_update_last_finalized:A, then desc_read:A reads from
1526+ * _prb_commit:B.
1527+ *
1528+ * Relies on:
1529+ *
1530+ * RELEASE from _prb_commit:B to desc_update_last_finalized:A
1531+ * matching
1532+ * ACQUIRE from desc_last_finalized_seq:A to desc_read:A
1533+ *
1534+ * Note: _prb_commit:B and desc_update_last_finalized:A can be
1535+ * different CPUs. However, the desc_update_last_finalized:A
1536+ * CPU (which performs the release) must have previously seen
1537+ * _prb_commit:B.
1538+ */
1539+ if (!atomic_long_try_cmpxchg_release (& desc_ring -> last_finalized_seq ,
1540+ & oldval , newval )) { /* LMM(desc_update_last_finalized:A) */
1541+ old_seq = __ulseq_to_u64seq (rb , oldval );
1542+ goto try_again ;
1543+ }
1544+ }
1545+
14441546/*
14451547 * Attempt to finalize a specified descriptor. If this fails, the descriptor
14461548 * is either already final or it will finalize itself when the writer commits.
14471549 */
1448- static void desc_make_final (struct prb_desc_ring * desc_ring , unsigned long id )
1550+ static void desc_make_final (struct printk_ringbuffer * rb , unsigned long id )
14491551{
1552+ struct prb_desc_ring * desc_ring = & rb -> desc_ring ;
14501553 unsigned long prev_state_val = DESC_SV (id , desc_committed );
14511554 struct prb_desc * d = to_desc (desc_ring , id );
14521555
1453- atomic_long_cmpxchg_relaxed (& d -> state_var , prev_state_val ,
1454- DESC_SV (id , desc_finalized )); /* LMM(desc_make_final:A) */
1455-
1456- /* Best effort to remember the last finalized @id. */
1457- atomic_long_set (& desc_ring -> last_finalized_id , id );
1556+ if (atomic_long_try_cmpxchg_relaxed (& d -> state_var , & prev_state_val ,
1557+ DESC_SV (id , desc_finalized ))) { /* LMM(desc_make_final:A) */
1558+ desc_update_last_finalized (rb );
1559+ }
14581560}
14591561
14601562/**
@@ -1550,7 +1652,7 @@ bool prb_reserve(struct prb_reserved_entry *e, struct printk_ringbuffer *rb,
15501652 * readers. (For seq==0 there is no previous descriptor.)
15511653 */
15521654 if (info -> seq > 0 )
1553- desc_make_final (desc_ring , DESC_ID (id - 1 ));
1655+ desc_make_final (rb , DESC_ID (id - 1 ));
15541656
15551657 r -> text_buf = data_alloc (rb , r -> text_buf_size , & d -> text_blk_lpos , id );
15561658 /* If text data allocation fails, a data-less record is committed. */
@@ -1643,7 +1745,7 @@ void prb_commit(struct prb_reserved_entry *e)
16431745 */
16441746 head_id = atomic_long_read (& desc_ring -> head_id ); /* LMM(prb_commit:A) */
16451747 if (head_id != e -> id )
1646- desc_make_final (desc_ring , e -> id );
1748+ desc_make_final (e -> rb , e -> id );
16471749}
16481750
16491751/**
@@ -1663,12 +1765,9 @@ void prb_commit(struct prb_reserved_entry *e)
16631765 */
16641766void prb_final_commit (struct prb_reserved_entry * e )
16651767{
1666- struct prb_desc_ring * desc_ring = & e -> rb -> desc_ring ;
1667-
16681768 _prb_commit (e , desc_finalized );
16691769
1670- /* Best effort to remember the last finalized @id. */
1671- atomic_long_set (& desc_ring -> last_finalized_id , e -> id );
1770+ desc_update_last_finalized (e -> rb );
16721771}
16731772
16741773/*
@@ -2008,42 +2107,29 @@ u64 prb_first_valid_seq(struct printk_ringbuffer *rb)
20082107 * newest sequence number available to readers will be.
20092108 *
20102109 * This provides readers a sequence number to jump to if all currently
2011- * available records should be skipped.
2110+ * available records should be skipped. It is guaranteed that all records
2111+ * previous to the returned value have been finalized and are (or were)
2112+ * available to the reader.
20122113 *
20132114 * Context: Any context.
20142115 * Return: The sequence number of the next newest (not yet available) record
20152116 * for readers.
20162117 */
20172118u64 prb_next_seq (struct printk_ringbuffer * rb )
20182119{
2019- struct prb_desc_ring * desc_ring = & rb -> desc_ring ;
2020- enum desc_state d_state ;
2021- unsigned long id ;
20222120 u64 seq ;
20232121
2024- /* Check if the cached @id still points to a valid @seq. */
2025- id = atomic_long_read (& desc_ring -> last_finalized_id );
2026- d_state = desc_read (desc_ring , id , NULL , & seq , NULL );
2122+ seq = desc_last_finalized_seq (rb );
20272123
2028- if (d_state == desc_finalized || d_state == desc_reusable ) {
2029- /*
2030- * Begin searching after the last finalized record.
2031- *
2032- * On 0, the search must begin at 0 because of hack#2
2033- * of the bootstrapping phase it is not known if a
2034- * record at index 0 exists.
2035- */
2036- if (seq != 0 )
2037- seq ++ ;
2038- } else {
2039- /*
2040- * The information about the last finalized sequence number
2041- * has gone. It should happen only when there is a flood of
2042- * new messages and the ringbuffer is rapidly recycled.
2043- * Give up and start from the beginning.
2044- */
2045- seq = 0 ;
2046- }
2124+ /*
2125+ * Begin searching after the last finalized record.
2126+ *
2127+ * On 0, the search must begin at 0 because of hack#2
2128+ * of the bootstrapping phase it is not known if a
2129+ * record at index 0 exists.
2130+ */
2131+ if (seq != 0 )
2132+ seq ++ ;
20472133
20482134 /*
20492135 * The information about the last finalized @seq might be inaccurate.
@@ -2085,7 +2171,7 @@ void prb_init(struct printk_ringbuffer *rb,
20852171 rb -> desc_ring .infos = infos ;
20862172 atomic_long_set (& rb -> desc_ring .head_id , DESC0_ID (descbits ));
20872173 atomic_long_set (& rb -> desc_ring .tail_id , DESC0_ID (descbits ));
2088- atomic_long_set (& rb -> desc_ring .last_finalized_id , DESC0_ID ( descbits ) );
2174+ atomic_long_set (& rb -> desc_ring .last_finalized_seq , 0 );
20892175
20902176 rb -> text_data_ring .size_bits = textbits ;
20912177 rb -> text_data_ring .data = text_buf ;
0 commit comments