diff --git a/src/storage/chunk_engine/src/core/engine.rs b/src/storage/chunk_engine/src/core/engine.rs index 96e33a4..728e44c 100644 --- a/src/storage/chunk_engine/src/core/engine.rs +++ b/src/storage/chunk_engine/src/core/engine.rs @@ -656,22 +656,28 @@ impl Engine { end: impl AsRef<[u8]>, max_count: u64, ) -> Result { - let mut chunks = self.meta_store.query_chunks(begin, end, max_count)?; - + let chunks = self.meta_store.query_chunks(begin, end, max_count)?; + let chunks_len = chunks.len(); const BATCH_SIZE: Size = Size::mebibyte(1); let mut write_batch = RocksDB::new_write_batch(); - for (chunk_id, meta) in &mut chunks { - if write_batch.size_in_bytes() >= BATCH_SIZE.0 as usize { + let mut persist_end = 0; + for (index, (chunk_id, meta)) in chunks.iter().enumerate() { + if write_batch.size_in_bytes() >= BATCH_SIZE.0 as _ { self.meta_store.write(write_batch, true)?; + for i in persist_end..index + 1 { + self.meta_cache.remove(&chunks[i].0); + } + persist_end = index + 1; write_batch = RocksDB::new_write_batch(); } - self.meta_store - .remove_mut(chunk_id, meta, &mut write_batch)?; - self.meta_cache.remove(chunk_id); + .remove_mut(&chunk_id, &meta, &mut write_batch)?; } if !write_batch.is_empty() { self.meta_store.write(write_batch, true)?; + for i in persist_end..chunks_len { + self.meta_cache.remove(&chunks[i].0); + } } Ok(chunks.len() as _) }