@@ -15,26 +15,18 @@ use crate::storage::state_backend::key_builder::{build_key, increment_key, is_al
1515use crate :: storage:: state_backend:: store:: { StateIterator , StateStore } ;
1616use rocksdb:: {
1717 BlockBasedOptions , Cache , ColumnFamilyDescriptor , DB , DBCompressionType , Direction ,
18- IteratorMode , Options , ReadOptions , WriteBatch ,
18+ IteratorMode , Options , ReadOptions , WriteBatch , WriteOptions ,
1919} ;
2020use std:: path:: Path ;
2121use std:: sync:: Arc ;
2222
2323pub struct RocksDBStateStore {
2424 db : Arc < DB > ,
2525 cf_name : String ,
26+ write_opts : WriteOptions ,
2627}
2728
2829impl RocksDBStateStore {
29- /// Create a new state store instance from factory
30- ///
31- /// # Arguments
32- /// - `db`: database instance
33- /// - `column_family`: optional column family name
34- ///
35- /// # Returns
36- /// - `Ok(Box<dyn StateStore>)`: successfully created
37- /// - `Err(BackendError)`: creation failed
3830 pub fn new_with_factory (
3931 db : Arc < DB > ,
4032 column_family : Option < String > ,
@@ -46,7 +38,16 @@ impl RocksDBStateStore {
4638 cf_name
4739 ) ) ) ;
4840 }
49- Ok ( Box :: new ( Self { db, cf_name } ) )
41+
42+ let mut write_opts = WriteOptions :: default ( ) ;
43+ write_opts. set_sync ( false ) ;
44+ write_opts. disable_wal ( false ) ;
45+
46+ Ok ( Box :: new ( Self {
47+ db,
48+ cf_name,
49+ write_opts,
50+ } ) )
5051 }
5152
5253 pub fn open < P : AsRef < Path > > ( path : P , cf_name : Option < String > ) -> Result < Self , BackendError > {
@@ -55,11 +56,14 @@ impl RocksDBStateStore {
5556 opts. create_missing_column_families ( true ) ;
5657 opts. set_merge_operator_associative ( "appendOp" , merge_operator) ;
5758 opts. set_compression_type ( DBCompressionType :: Lz4 ) ;
59+ opts. set_enable_pipelined_write ( true ) ;
60+ opts. increase_parallelism ( num_cpus:: get ( ) as i32 ) ;
5861
5962 let mut block_opts = BlockBasedOptions :: default ( ) ;
6063 block_opts. set_block_size ( 16 * 1024 ) ;
6164 block_opts. set_cache_index_and_filter_blocks ( true ) ;
62- block_opts. set_block_cache ( & Cache :: new_lru_cache ( 128 * 1024 * 1024 ) ) ;
65+ block_opts. set_pin_l0_filter_and_index_blocks_in_cache ( true ) ;
66+ block_opts. set_block_cache ( & Cache :: new_lru_cache ( 256 * 1024 * 1024 ) ) ;
6367 opts. set_block_based_table_factory ( & block_opts) ;
6468
6569 let target_cf = cf_name. unwrap_or_else ( || "default" . to_string ( ) ) ;
@@ -72,9 +76,13 @@ impl RocksDBStateStore {
7276 let db = DB :: open_cf_descriptors ( & opts, path, cf_descriptors)
7377 . map_err ( |e| BackendError :: IoError ( e. to_string ( ) ) ) ?;
7478
79+ let mut write_opts = WriteOptions :: default ( ) ;
80+ write_opts. set_sync ( false ) ;
81+
7582 Ok ( Self {
7683 db : Arc :: new ( db) ,
7784 cf_name : target_cf,
85+ write_opts,
7886 } )
7987 }
8088
@@ -90,7 +98,7 @@ impl StateStore for RocksDBStateStore {
9098 fn put_state ( & self , key : Vec < u8 > , value : Vec < u8 > ) -> Result < ( ) , BackendError > {
9199 let cf = self . cf_handle ( ) ?;
92100 self . db
93- . put_cf ( & cf, key, value)
101+ . put_cf_opt ( & cf, key, value, & self . write_opts )
94102 . map_err ( |e| BackendError :: IoError ( e. to_string ( ) ) )
95103 }
96104
@@ -104,14 +112,15 @@ impl StateStore for RocksDBStateStore {
104112 fn delete_state ( & self , key : Vec < u8 > ) -> Result < ( ) , BackendError > {
105113 let cf = self . cf_handle ( ) ?;
106114 self . db
107- . delete_cf ( & cf, key)
115+ . delete_cf_opt ( & cf, key, & self . write_opts )
108116 . map_err ( |e| BackendError :: IoError ( e. to_string ( ) ) )
109117 }
110118
111119 fn list_states ( & self , start : Vec < u8 > , end : Vec < u8 > ) -> Result < Vec < Vec < u8 > > , BackendError > {
112120 let cf = self . cf_handle ( ) ?;
113121 let mut ropts = ReadOptions :: default ( ) ;
114122 ropts. set_iterate_upper_bound ( end. clone ( ) ) ;
123+ ropts. set_readahead_size ( 2 * 1024 * 1024 ) ;
115124
116125 let iter =
117126 self . db
@@ -139,7 +148,7 @@ impl StateStore for RocksDBStateStore {
139148 let cf = self . cf_handle ( ) ?;
140149 let full_key = build_key ( & key_group, & key, & namespace, & user_key) ;
141150 self . db
142- . merge_cf ( & cf, full_key, value)
151+ . merge_cf_opt ( & cf, full_key, value, & self . write_opts )
143152 . map_err ( |e| BackendError :: IoError ( e. to_string ( ) ) )
144153 }
145154
@@ -152,7 +161,7 @@ impl StateStore for RocksDBStateStore {
152161 if !is_all_0xff ( & prefix) {
153162 let end_key = increment_key ( & prefix) ;
154163 self . db
155- . delete_range_cf ( & cf, & prefix, & end_key)
164+ . delete_range_cf_opt ( & cf, & prefix, & end_key, & self . write_opts )
156165 . map_err ( |e| BackendError :: IoError ( e. to_string ( ) ) ) ?;
157166 return Ok ( 0 ) ;
158167 }
@@ -170,13 +179,13 @@ impl StateStore for RocksDBStateStore {
170179 count += 1 ;
171180 if count % 1000 == 0 {
172181 self . db
173- . write ( batch)
182+ . write_opt ( batch, & self . write_opts )
174183 . map_err ( |e| BackendError :: IoError ( e. to_string ( ) ) ) ?;
175184 batch = WriteBatch :: default ( ) ;
176185 }
177186 }
178187 self . db
179- . write ( batch)
188+ . write_opt ( batch, & self . write_opts )
180189 . map_err ( |e| BackendError :: IoError ( e. to_string ( ) ) ) ?;
181190 Ok ( count)
182191 }
@@ -204,6 +213,7 @@ impl RocksDBStateIterator {
204213 . ok_or_else ( || BackendError :: Other ( "CF missing" . into ( ) ) ) ?;
205214 let mut ropts = ReadOptions :: default ( ) ;
206215 ropts. set_prefix_same_as_start ( true ) ;
216+ ropts. set_readahead_size ( 1024 * 1024 ) ;
207217
208218 let iter =
209219 db. iterator_cf_opt ( & cf, ropts, IteratorMode :: From ( & prefix, Direction :: Forward ) ) ;
0 commit comments