iceberg/writer/file_writer/
location_generator.rs1use std::sync::Arc;
21use std::sync::atomic::AtomicU64;
22
23use crate::Result;
24use crate::spec::{DataFileFormat, PartitionKey, TableMetadata};
25
26pub trait LocationGenerator: Clone + Send + 'static {
28 fn generate_location(&self, partition_key: Option<&PartitionKey>, file_name: &str) -> String;
42}
43
/// Suffix appended to the table location when no data location property is configured.
const DEFAULT_DATA_DIR: &str = "/data";
/// Table property that, when set, is used verbatim as the data file directory.
const WRITE_DATA_LOCATION: &str = "write.data.path";
/// Table property consulted only when `write.data.path` is absent.
const WRITE_FOLDER_STORAGE_LOCATION: &str = "write.folder-storage.path";
47
/// Location generator that places every data file under one base directory, adding a
/// partition sub-path when the file belongs to a partition.
#[derive(Debug, Clone)]
pub struct DefaultLocationGenerator {
    /// Base directory for generated file locations.
    data_location: String,
}
54
55impl DefaultLocationGenerator {
56 pub fn new(table_metadata: TableMetadata) -> Result<Self> {
58 let table_location = table_metadata.location();
59 let prop = table_metadata.properties();
60 let configured_data_location = prop
61 .get(WRITE_DATA_LOCATION)
62 .or(prop.get(WRITE_FOLDER_STORAGE_LOCATION));
63 let data_location = if let Some(data_location) = configured_data_location {
64 data_location.clone()
65 } else {
66 format!("{}{}", table_location, DEFAULT_DATA_DIR)
67 };
68 Ok(Self { data_location })
69 }
70
71 pub fn with_data_location(data_location: String) -> Self {
77 Self { data_location }
78 }
79}
80
81impl LocationGenerator for DefaultLocationGenerator {
82 fn generate_location(&self, partition_key: Option<&PartitionKey>, file_name: &str) -> String {
83 if PartitionKey::is_effectively_none(partition_key) {
84 format!("{}/{}", self.data_location, file_name)
85 } else {
86 format!(
87 "{}/{}/{}",
88 self.data_location,
89 partition_key.unwrap().to_path(),
90 file_name
91 )
92 }
93 }
94}
95
/// Supplies names for newly written data files.
///
/// Implementors must be `Clone + Send + 'static`, so a generator can be copied into and moved
/// between writers.
pub trait FileNameGenerator
where
    Self: Clone + Send + 'static,
{
    /// Produces the name (not the full location) of the next file to write.
    fn generate_file_name(&self) -> String;
}
101
/// File name generator that builds names from a fixed prefix, a shared sequence counter, an
/// optional suffix and a format extension.
#[derive(Debug, Clone)]
pub struct DefaultFileNameGenerator {
    /// Leading component of every generated name.
    prefix: String,
    /// Either empty or `-<suffix>`, stored with its separator already applied.
    suffix: String,
    /// Extension appended after the final `.`.
    format: String,
    /// Sequence counter; held in an `Arc` so clones of the generator share one counter.
    file_count: Arc<AtomicU64>,
}
112
113impl DefaultFileNameGenerator {
114 pub fn new(prefix: String, suffix: Option<String>, format: DataFileFormat) -> Self {
116 let suffix = if let Some(suffix) = suffix {
117 format!("-{}", suffix)
118 } else {
119 "".to_string()
120 };
121
122 Self {
123 prefix,
124 suffix,
125 format: format.to_string(),
126 file_count: Arc::new(AtomicU64::new(0)),
127 }
128 }
129}
130
131impl FileNameGenerator for DefaultFileNameGenerator {
132 fn generate_file_name(&self) -> String {
133 let file_id = self
134 .file_count
135 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
136 format!(
137 "{}-{:05}{}.{}",
138 self.prefix, file_id, self.suffix, self.format
139 )
140 }
141}
142
#[cfg(test)]
pub(crate) mod test {
    use std::collections::HashMap;
    use std::sync::Arc;

    use uuid::Uuid;

    use super::{
        DefaultFileNameGenerator, DefaultLocationGenerator, FileNameGenerator, LocationGenerator,
        WRITE_DATA_LOCATION, WRITE_FOLDER_STORAGE_LOCATION,
    };
    use crate::spec::{
        DataFileFormat, FormatVersion, Literal, NestedField, PartitionKey, PartitionSpec,
        PrimitiveType, Schema, Struct, StructType, TableMetadata, Transform, Type,
    };

    /// Builds a minimal unpartitioned V2 table located at `s3://data.db/table` with no
    /// properties. Individual tests override fields with struct-update syntax.
    fn base_table_metadata() -> TableMetadata {
        TableMetadata {
            format_version: FormatVersion::V2,
            table_uuid: Uuid::parse_str("fb072c92-a02b-11e9-ae9c-1bb7bc9eca94").unwrap(),
            location: "s3://data.db/table".to_string(),
            last_updated_ms: 1515100955770,
            last_column_id: 1,
            schemas: HashMap::new(),
            current_schema_id: 1,
            partition_specs: HashMap::new(),
            default_spec: PartitionSpec::unpartition_spec().into(),
            default_partition_type: StructType::new(vec![]),
            last_partition_id: 1000,
            default_sort_order_id: 0,
            sort_orders: HashMap::default(),
            snapshots: HashMap::default(),
            current_snapshot_id: None,
            last_sequence_number: 1,
            properties: HashMap::new(),
            snapshot_log: Vec::new(),
            metadata_log: vec![],
            refs: HashMap::new(),
            statistics: HashMap::new(),
            partition_statistics: HashMap::new(),
            encryption_keys: HashMap::new(),
        }
    }

    #[test]
    fn test_default_location_generate() {
        let mut table_metadata = base_table_metadata();

        let file_name_generator = DefaultFileNameGenerator::new(
            "part".to_string(),
            Some("test".to_string()),
            DataFileFormat::Parquet,
        );

        // Builds a fresh location generator from `metadata` and places the next file name.
        // The shared file name generator advances its counter on every call.
        let next_location = |metadata: &TableMetadata| {
            DefaultLocationGenerator::new(metadata.clone())
                .unwrap()
                .generate_location(None, &file_name_generator.generate_file_name())
        };

        // No location properties: `<table location>/data`.
        assert_eq!(
            next_location(&table_metadata),
            "s3://data.db/table/data/part-00000-test.parquet"
        );

        // `write.folder-storage.path` is used when `write.data.path` is absent.
        table_metadata.properties.insert(
            WRITE_FOLDER_STORAGE_LOCATION.to_string(),
            "s3://data.db/table/data_1".to_string(),
        );
        assert_eq!(
            next_location(&table_metadata),
            "s3://data.db/table/data_1/part-00001-test.parquet"
        );

        // `write.data.path` takes precedence over `write.folder-storage.path`.
        table_metadata.properties.insert(
            WRITE_DATA_LOCATION.to_string(),
            "s3://data.db/table/data_2".to_string(),
        );
        assert_eq!(
            next_location(&table_metadata),
            "s3://data.db/table/data_2/part-00002-test.parquet"
        );

        // The data location need not live under the table location.
        table_metadata.properties.insert(
            WRITE_DATA_LOCATION.to_string(),
            "s3://data.db/data_3".to_string(),
        );
        assert_eq!(
            next_location(&table_metadata),
            "s3://data.db/data_3/part-00003-test.parquet"
        );
    }

    #[test]
    fn test_location_generate_with_partition() {
        let schema = Arc::new(
            Schema::builder()
                .with_schema_id(1)
                .with_fields(vec![
                    NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
                    NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
                ])
                .build()
                .unwrap(),
        );

        let partition_spec = PartitionSpec::builder(schema.clone())
            .add_partition_field("id", "id", Transform::Identity)
            .unwrap()
            .add_partition_field("name", "name", Transform::Identity)
            .unwrap()
            .build()
            .unwrap();

        let partition_data =
            Struct::from_iter([Some(Literal::int(42)), Some(Literal::string("alice"))]);

        let partition_key = PartitionKey::new(partition_spec, schema, partition_data);
        let file_name = "data-00000.parquet";

        // Explicit data location.
        let location_gen = DefaultLocationGenerator::with_data_location("/base/path".to_string());
        let location = location_gen.generate_location(Some(&partition_key), file_name);
        assert_eq!(location, "/base/path/id=42/name=alice/data-00000.parquet");

        // Data location derived from table metadata.
        let table_metadata = TableMetadata {
            last_column_id: 2,
            ..base_table_metadata()
        };
        let default_location_gen = DefaultLocationGenerator::new(table_metadata).unwrap();
        let location = default_location_gen.generate_location(Some(&partition_key), file_name);
        assert_eq!(
            location,
            "s3://data.db/table/data/id=42/name=alice/data-00000.parquet"
        );
    }
}