From 1e45f45f4481129709af145051aedd54fb733c28 Mon Sep 17 00:00:00 2001 From: Vinnam Kim Date: Fri, 22 Sep 2023 09:59:38 +0900 Subject: [PATCH] Enhance Datumaro data format stream importer performance (#1153) - Ticket no. 120785 - Change streaming import logic with DatumPageMapper implemented in Rust | Before | After | | :-: | :-: | | ![image](https://github.com/openvinotoolkit/datumaro/assets/26541465/0a06ddc0-5256-45b4-af03-e9299b8e61b8) | ![image](https://github.com/openvinotoolkit/datumaro/assets/26541465/af76210b-8fb5-4b30-aec1-2b5a22856ef7) | Signed-off-by: Kim, Vinnam --- CHANGELOG.md | 4 + rust/Cargo.toml | 1 + rust/src/coco_page_mapper.rs | 246 +++++++++--------- rust/src/datum_page_mapper.rs | 236 +++++++++++++++++ rust/src/lib.rs | 105 +------- rust/src/page_mapper.rs | 62 +++++ rust/src/page_maps.rs | 77 ++++-- rust/src/test_helpers.rs | 18 ++ rust/src/utils.rs | 40 +++ .../plugins/data_formats/datumaro/base.py | 72 ++--- .../data_formats/datumaro/page_mapper.py | 71 +++++ tests/conftest.py | 4 + tests/utils/test_utils.py | 34 ++- 13 files changed, 659 insertions(+), 311 deletions(-) create mode 100644 rust/src/datum_page_mapper.rs create mode 100644 rust/src/page_mapper.rs create mode 100644 rust/src/test_helpers.rs create mode 100644 src/datumaro/plugins/data_formats/datumaro/page_mapper.py diff --git a/CHANGELOG.md b/CHANGELOG.md index ffa20eccfd..3e6697c4d7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Coco exporter can export annotations even if there is no media, except for mask annotations which require media info. 
()() +### Enhancements +- Enhance Datumaro data format stream importer performance + () + ## 15/09/2023 - Release 1.5.0 ### New features - Add tabular data import/export diff --git a/rust/Cargo.toml b/rust/Cargo.toml index 8b5f36c84a..a4c50ac0d2 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -11,3 +11,4 @@ pyo3 = "0.19.2" serde = { version = "1.0.180", features = ["derive"] } serde_json = "1.0.104" strum = { version = "0.25", features = ["derive"] } +tempfile = "3.8.0" diff --git a/rust/src/coco_page_mapper.rs b/rust/src/coco_page_mapper.rs index 8d91ef49e1..d7a1813708 100644 --- a/rust/src/coco_page_mapper.rs +++ b/rust/src/coco_page_mapper.rs @@ -9,12 +9,15 @@ use std::{ use strum::EnumString; use crate::{ + page_mapper::{JsonPageMapper, ParsedJsonSection}, page_maps::{AnnPageMap, ImgPageMap, JsonDict}, - utils::{invalid_data, parse_serde_json_value, read_skipping_ws}, + utils::{convert_to_py_object, invalid_data, parse_serde_json_value, read_skipping_ws}, }; +use pyo3::{prelude::*, types::PyList}; +use std::{fs::File, io::BufReader, path::Path}; #[derive(EnumString, Debug)] -pub enum CocoJsonSection { +enum CocoJsonSection { #[strum(ascii_case_insensitive)] LICENSES(JsonDict), #[strum(ascii_case_insensitive)] @@ -22,21 +25,67 @@ pub enum CocoJsonSection { #[strum(ascii_case_insensitive)] CATEGORIES(JsonDict), #[strum(ascii_case_insensitive)] - IMAGES(ImgPageMap), + IMAGES(ImgPageMap), #[strum(ascii_case_insensitive)] ANNOTATIONS(AnnPageMap), } +impl ParsedJsonSection for CocoJsonSection { + fn parse( + buf_key: String, + mut reader: impl Read + Seek, + ) -> Result, io::Error> { + match CocoJsonSection::from_str(buf_key.as_str()) { + Ok(curr_key) => { + while let Ok(c) = read_skipping_ws(&mut reader) { + if c == b':' { + break; + } + } + match curr_key { + CocoJsonSection::LICENSES(_) => { + let v = parse_serde_json_value(reader)?; + Ok(Box::new(CocoJsonSection::LICENSES(v))) + } + CocoJsonSection::INFO(_) => { + let v = parse_serde_json_value(reader)?; + 
Ok(Box::new(CocoJsonSection::INFO(v))) + } + CocoJsonSection::CATEGORIES(_) => { + let v = parse_serde_json_value(reader)?; + Ok(Box::new(CocoJsonSection::CATEGORIES(v))) + } + CocoJsonSection::IMAGES(_) => { + let v = ImgPageMap::from_reader(reader)?; + Ok(Box::new(CocoJsonSection::IMAGES(v))) + } + CocoJsonSection::ANNOTATIONS(_) => { + let v = AnnPageMap::from_reader(reader)?; + Ok(Box::new(CocoJsonSection::ANNOTATIONS(v))) + } + } + } + Err(e) => { + let cur_pos = reader.stream_position()?; + let msg = format!("Unknown key: {} at pos: {}", e, cur_pos); + Err(invalid_data(msg.as_str())) + } + } + } +} + #[derive(Debug)] -pub struct CocoPageMapper { +struct CocoPageMapperImpl { licenses: JsonDict, info: JsonDict, categories: JsonDict, - images: ImgPageMap, + images: ImgPageMap, annotations: AnnPageMap, } -impl CocoPageMapper { +impl JsonPageMapper for CocoPageMapperImpl {} + +impl CocoPageMapperImpl { pub fn licenses(&self) -> &JsonDict { return &self.licenses; } @@ -51,7 +100,7 @@ impl CocoPageMapper { } pub fn get_item_dict( &self, - img_id: i64, + img_id: &i64, mut reader: impl Read + Seek, ) -> Result { self.images.get_dict(&mut reader, img_id) @@ -63,7 +112,6 @@ impl CocoPageMapper { ) -> Result, io::Error> { self.annotations.get_anns(&mut reader, img_id) } - pub fn new(mut reader: impl Read + Seek) -> Result { let sections = Self::parse_json(&mut reader)?; @@ -74,7 +122,7 @@ impl CocoPageMapper { let mut annotations = None; for section in sections { - match section { + match *section { CocoJsonSection::LICENSES(v) => { licenses = Some(v); } @@ -100,7 +148,7 @@ impl CocoPageMapper { let annotations = annotations.ok_or(invalid_data("Cannot find the annotations section."))?; - Ok(CocoPageMapper { + Ok(CocoPageMapperImpl { licenses, info, categories, @@ -108,133 +156,72 @@ impl CocoPageMapper { annotations, }) } +} - fn parse_json(mut reader: impl Read + Seek) -> Result, io::Error> { - let mut brace_level = 0; - let mut coco_json_sections = Vec::new(); - - 
while let Ok(c) = read_skipping_ws(&mut reader) { - match c { - b'{' => brace_level += 1, - b'"' => { - let mut buf_key = Vec::new(); - while let Ok(c) = read_skipping_ws(&mut reader) { - if c == b'"' { - break; - } - buf_key.push(c); - } - match String::from_utf8(buf_key.clone()) { - Ok(key) => { - let section = Self::parse_section_from_key(key, &mut reader)?; - coco_json_sections.push(section); - } - Err(e) => { - let cur_pos = reader.stream_position()?; - let msg = format!( - "Section key buffer, {:?} is invalid at pos: {}. {}", - buf_key, cur_pos, e - ); - let err = invalid_data(msg.as_str()); - return Err(err); - } - } - } - b',' => { - continue; - } - b'}' => { - brace_level -= 1; - if brace_level == 0 { - break; - } - } - _ => { - let cur_pos = reader.stream_position()?; - let msg = format!("{} is invalid character at pos: {}", c, cur_pos); - let err = invalid_data(msg.as_str()); - return Err(err); - } - } - } - Ok(coco_json_sections) +#[pyclass] +pub struct CocoPageMapper { + reader: BufReader, + mapper: CocoPageMapperImpl, +} + +#[pymethods] +impl CocoPageMapper { + #[new] + fn py_new(path: String) -> PyResult { + let file = File::open(Path::new(&path))?; + let mut reader = BufReader::new(file); + let mapper = CocoPageMapperImpl::new(&mut reader)?; + + Ok(CocoPageMapper { reader, mapper }) } - fn parse_section_from_key( - buf_key: String, - mut reader: impl Read + Seek, - ) -> Result { - match CocoJsonSection::from_str(buf_key.as_str()) { - Ok(curr_key) => { - while let Ok(c) = read_skipping_ws(&mut reader) { - if c == b':' { - break; - } - } - match curr_key { - CocoJsonSection::LICENSES(_) => { - let v = parse_serde_json_value(reader)?; - Ok(CocoJsonSection::LICENSES(v)) - } - CocoJsonSection::INFO(_) => { - let v = parse_serde_json_value(reader)?; - Ok(CocoJsonSection::INFO(v)) - } - CocoJsonSection::CATEGORIES(_) => { - let v = parse_serde_json_value(reader)?; - Ok(CocoJsonSection::CATEGORIES(v)) - } - CocoJsonSection::IMAGES(_) => { - let v = 
ImgPageMap::from_reader(reader)?; - Ok(CocoJsonSection::IMAGES(v)) - } - CocoJsonSection::ANNOTATIONS(_) => { - let v = AnnPageMap::from_reader(reader)?; - Ok(CocoJsonSection::ANNOTATIONS(v)) - } - } - } - Err(e) => { - let cur_pos = reader.stream_position()?; - let msg = format!("Unknown key: {} at pos: {}", e, cur_pos); - Err(invalid_data(msg.as_str())) - } - } + fn licenses(self_: PyRef) -> PyResult { + convert_to_py_object(self_.mapper.licenses(), self_.py()) + } + + fn info(self_: PyRef) -> PyResult { + convert_to_py_object(self_.mapper.info(), self_.py()) + } + + fn categories(self_: PyRef) -> PyResult { + convert_to_py_object(self_.mapper.categories(), self_.py()) + } + + fn get_item_dict(&mut self, py: Python<'_>, img_id: i64) -> PyResult { + let item_dict = self.mapper.get_item_dict(&img_id, &mut self.reader)?; + Ok(convert_to_py_object(&item_dict, py)?) + } + + fn get_anns_dict(&mut self, py: Python<'_>, img_id: i64) -> PyResult { + let anns_list = PyList::new( + py, + self.mapper + .get_anns_dict(img_id, &mut self.reader)? 
+ .iter() + .map(|child| convert_to_py_object(child, py).unwrap()), + ); + Ok(anns_list.into()) + } + + fn get_img_ids(&self) -> Vec { + self.mapper.get_img_ids().to_owned() + } + + fn __len__(&self) -> PyResult { + Ok(self.mapper.get_img_ids().len()) } } #[cfg(test)] mod tests { - use std::{ - env::temp_dir, - fs::{File, OpenOptions}, - io::{BufReader, Write}, - }; - use super::*; - - fn prepare(example: &str) -> (BufReader, CocoPageMapper) { - let filepath = temp_dir().join("tmp.json"); - - let mut f = OpenOptions::new() - .read(false) - .write(true) - .create(true) - .open(&filepath) - .expect("cannot open file"); - let _ = f.write_all(example.as_bytes()); - let f = File::open(&filepath).expect("cannot open file"); - let mut reader = BufReader::new(f); - let coco_page_mapper = CocoPageMapper::new(&mut reader).unwrap(); - - (reader, coco_page_mapper) - } + use crate::test_helpers::prepare_reader; #[test] fn test_instance() { const EXAMPLE: &str = r#" { - "licenses":[{"name":"","id":0,"url":""}], + "licenses":[{"name":"test_instance()","id":0,"url":""}], "info":{"contributor":"","date_created":"","description":"","url":"","version":"","year":""}, "categories":[ {"id":1,"name":"a","supercategory":""}, @@ -254,12 +241,15 @@ mod tests { ] }"#; - let (mut reader, coco_page_mapper) = prepare(EXAMPLE); + let (tempfile, mut reader) = prepare_reader(EXAMPLE); + let coco_page_mapper = CocoPageMapperImpl::new(&mut reader).unwrap(); println!("{:?}", coco_page_mapper); for img_id in [5, 6] { - let item = coco_page_mapper.get_item_dict(img_id, &mut reader).unwrap(); + let item = coco_page_mapper + .get_item_dict(&img_id, &mut reader) + .unwrap(); assert_eq!(item["id"].as_i64(), Some(img_id)); @@ -278,7 +268,8 @@ mod tests { {"licenses": [{"name": "", "id": 0, "url": ""}], "info": {"contributor": "", "date_created": "", "description": "", "url": "", "version": "", "year": ""}, "categories": [], "images": [{"id": 1, "width": 2, "height": 4, "file_name": "1.jpg", "license": 0, 
"flickr_url": "", "coco_url": "", "date_captured": 0}], "annotations": []} "#; - let (mut reader, coco_page_mapper) = prepare(EXAMPLE); + let (tempfile, mut reader) = prepare_reader(EXAMPLE); + let coco_page_mapper = CocoPageMapperImpl::new(&mut reader).unwrap(); println!("{:?}", coco_page_mapper); } @@ -289,7 +280,8 @@ mod tests { {"licenses":[{"name":"","id":0,"url":""}],"info":{"contributor":"","date_created":"","description":"","url":"","version":"","year":""},"categories":[{"id":1,"name":"0","supercategory":"","isthing":0},{"id":2,"name":"1","supercategory":"","isthing":0},{"id":3,"name":"2","supercategory":"","isthing":0},{"id":4,"name":"3","supercategory":"","isthing":0},{"id":5,"name":"4","supercategory":"","isthing":0},{"id":6,"name":"5","supercategory":"","isthing":0},{"id":7,"name":"6","supercategory":"","isthing":0},{"id":8,"name":"7","supercategory":"","isthing":0},{"id":9,"name":"8","supercategory":"","isthing":0},{"id":10,"name":"9","supercategory":"","isthing":0}],"images":[{"id":1,"width":4,"height":4,"file_name":"1.jpg","license":0,"flickr_url":"","coco_url":"","date_captured":0}],"annotations":[{"image_id":1,"file_name":"1.png","segments_info":[{"id":3,"category_id":5,"area":5.0,"bbox":[1.0,0.0,2.0,2.0],"iscrowd":0}]}]} "#; - let (mut reader, coco_page_mapper) = prepare(EXAMPLE); + let (tempfile, mut reader) = prepare_reader(EXAMPLE); + let coco_page_mapper = CocoPageMapperImpl::new(&mut reader).unwrap(); println!("{:?}", coco_page_mapper); } diff --git a/rust/src/datum_page_mapper.rs b/rust/src/datum_page_mapper.rs new file mode 100644 index 0000000000..1a4171dc8d --- /dev/null +++ b/rust/src/datum_page_mapper.rs @@ -0,0 +1,236 @@ +// Copyright (C) 2023 Intel Corporation +// +// SPDX-License-Identifier: MIT + +use std::{ + fs::File, + io::{self, BufReader, Read, Seek}, + path::Path, + str::FromStr, +}; +use strum::EnumString; + +use crate::{ + page_mapper::{JsonPageMapper, ParsedJsonSection}, + page_maps::{ImgPageMap, JsonDict}, + 
utils::{convert_to_py_object, invalid_data, parse_serde_json_value, read_skipping_ws}, +}; +use pyo3::prelude::*; +use serde_json::json; +#[derive(EnumString, Debug)] +pub enum DatumJsonSection { + #[strum(ascii_case_insensitive)] + DM_FORMAT_VERSION(String), + #[strum(ascii_case_insensitive)] + MEDIA_TYPE(i64), + #[strum(ascii_case_insensitive)] + INFOS(JsonDict), + #[strum(ascii_case_insensitive)] + CATEGORIES(JsonDict), + #[strum(ascii_case_insensitive)] + ITEMS(ImgPageMap), +} + +impl ParsedJsonSection for DatumJsonSection { + fn parse( + buf_key: String, + mut reader: impl Read + Seek, + ) -> Result, io::Error> { + match DatumJsonSection::from_str(buf_key.as_str()) { + Ok(curr_key) => { + while let Ok(c) = read_skipping_ws(&mut reader) { + if c == b':' { + break; + } + } + match curr_key { + DatumJsonSection::DM_FORMAT_VERSION(_) => { + let v = parse_serde_json_value(reader)? + .as_str() + .ok_or(invalid_data( + "Cannot parse datumaro format version from the json file", + ))? + .to_string(); + Ok(Box::new(DatumJsonSection::DM_FORMAT_VERSION(v))) + } + DatumJsonSection::MEDIA_TYPE(_) => { + let v = parse_serde_json_value(reader)? 
+ .as_i64() + .ok_or(invalid_data("Cannot parse media type from the json file"))?; + Ok(Box::new(DatumJsonSection::MEDIA_TYPE(v))) + } + DatumJsonSection::INFOS(_) => { + let v = parse_serde_json_value(reader)?; + Ok(Box::new(DatumJsonSection::INFOS(v))) + } + DatumJsonSection::CATEGORIES(_) => { + let v = parse_serde_json_value(reader)?; + Ok(Box::new(DatumJsonSection::CATEGORIES(v))) + } + DatumJsonSection::ITEMS(_) => { + let v = ImgPageMap::from_reader(reader)?; + Ok(Box::new(DatumJsonSection::ITEMS(v))) + } + } + } + Err(e) => { + let cur_pos = reader.stream_position()?; + let msg = format!("Unknown key: {} at pos: {}", e, cur_pos); + Err(invalid_data(msg.as_str())) + } + } + } +} + +#[derive(Debug)] +pub struct DatumPageMapperImpl { + dm_format_version: Option, + media_type: Option, + infos: JsonDict, + categories: JsonDict, + items: ImgPageMap, +} + +impl JsonPageMapper for DatumPageMapperImpl {} + +impl DatumPageMapperImpl { + pub fn dm_format_version(&self) -> &Option { + return &self.dm_format_version; + } + pub fn media_type(&self) -> &Option { + return &self.media_type; + } + pub fn infos(&self) -> &JsonDict { + return &self.infos; + } + pub fn categories(&self) -> &JsonDict { + return &self.categories; + } + pub fn get_img_ids(&self) -> &Vec { + self.items.ids() + } + pub fn get_item_dict( + &self, + img_id: &String, + mut reader: impl Read + Seek, + ) -> Result { + self.items.get_dict(&mut reader, img_id) + } + + pub fn new(mut reader: impl Read + Seek) -> Result { + let sections = Self::parse_json(&mut reader)?; + + let mut dm_format_version = None; + let mut media_type = None; + let mut infos = None; + let mut categories = None; + let mut items = None; + + for section in sections { + match *section { + DatumJsonSection::DM_FORMAT_VERSION(v) => { + dm_format_version = Some(v); + } + DatumJsonSection::MEDIA_TYPE(v) => { + media_type = Some(v); + } + DatumJsonSection::INFOS(v) => { + infos = Some(v); + } + DatumJsonSection::CATEGORIES(v) => { + 
categories = Some(v); + } + DatumJsonSection::ITEMS(v) => { + items = Some(v); + } + } + } + let infos = infos.unwrap_or(json!({})); + let categories = categories.ok_or(invalid_data("Cannot find the categories section."))?; + let items = items.ok_or(invalid_data("Cannot find the items section."))?; + + Ok(DatumPageMapperImpl { + dm_format_version, + media_type, + infos, + categories, + items, + }) + } +} + +#[pyclass] +pub struct DatumPageMapper { + reader: BufReader, + mapper: DatumPageMapperImpl, +} + +#[pymethods] +impl DatumPageMapper { + #[new] + fn py_new(path: String) -> PyResult { + let file = File::open(Path::new(&path))?; + let mut reader = BufReader::new(file); + let mapper = DatumPageMapperImpl::new(&mut reader)?; + + Ok(DatumPageMapper { reader, mapper }) + } + + fn dm_format_version(self_: PyRef) -> Option { + self_.mapper.dm_format_version().clone() + } + + fn media_type(self_: PyRef) -> Option { + self_.mapper.media_type().clone() + } + + fn infos(self_: PyRef) -> PyResult { + convert_to_py_object(self_.mapper.infos(), self_.py()) + } + + fn categories(self_: PyRef) -> PyResult { + convert_to_py_object(self_.mapper.categories(), self_.py()) + } + + fn get_item_dict(&mut self, py: Python<'_>, img_id: String) -> PyResult { + let item_dict = self.mapper.get_item_dict(&img_id, &mut self.reader)?; + Ok(convert_to_py_object(&item_dict, py)?) 
+ } + + fn get_img_ids(&self) -> Vec { + self.mapper.get_img_ids().to_owned() + } + + fn __len__(&self) -> PyResult { + Ok(self.mapper.get_img_ids().len()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::test_helpers::prepare_reader; + + #[test] + fn test_instance() { + const EXAMPLE: &str = r#"{"dm_format_version": "1.0", "media_type": 2, "infos": {"string": "test", "int": 0, "float": 0.0, "string_list": ["test0", "test1", "test2"], "int_list": [0, 1, 2], "float_list": [0.0, 0.1, 0.2]}, "categories": {"label": {"labels": [{"name": "cat0", "parent": "", "attributes": ["x", "y"]}, {"name": "cat1", "parent": "", "attributes": ["x", "y"]}, {"name": "cat2", "parent": "", "attributes": ["x", "y"]}, {"name": "cat3", "parent": "", "attributes": ["x", "y"]}, {"name": "cat4", "parent": "", "attributes": ["x", "y"]}], "label_groups": [], "attributes": ["a", "b", "score"]}, "mask": {"colormap": [{"label_id": 0, "r": 0, "g": 0, "b": 0}, {"label_id": 1, "r": 128, "g": 0, "b": 0}, {"label_id": 2, "r": 0, "g": 128, "b": 0}, {"label_id": 3, "r": 128, "g": 128, "b": 0}, {"label_id": 4, "r": 0, "g": 0, "b": 128}]}, "points": {"items": [{"label_id": 0, "labels": ["cat1", "cat2"], "joints": [[0, 1]]}, {"label_id": 1, "labels": ["cat1", "cat2"], "joints": [[0, 1]]}, {"label_id": 2, "labels": ["cat1", "cat2"], "joints": [[0, 1]]}, {"label_id": 3, "labels": ["cat1", "cat2"], "joints": [[0, 1]]}, {"label_id": 4, "labels": ["cat1", "cat2"], "joints": [[0, 1]]}]}}, "items": [{"id": "42", "annotations": [{"id": 900100087038, "type": "mask", "attributes": {}, "group": 900100087038, "label_id": null, "rle": {"counts": "06", "size": [2, 3]}, "z_order": 0}, {"id": 900100087038, "type": "mask", "attributes": {}, "group": 900100087038, "label_id": null, "rle": {"counts": "06", "size": [2, 3]}, "z_order": 0}], "image": {"path": "42.jpg", "size": [10, 6]}}, {"id": "43", "annotations": [], "image": {"path": "43.qq", "size": [2, 4]}}]} + "#; + + let (tempfile, mut reader) = 
prepare_reader(EXAMPLE); + let datum_page_mapper = DatumPageMapperImpl::new(&mut reader).unwrap(); + + println!("{:?}", datum_page_mapper); + + let expected_ids = vec!["42".to_string(), "43".to_string()]; + assert_eq!(datum_page_mapper.get_img_ids(), &expected_ids); + + for img_id in expected_ids { + let item = datum_page_mapper + .get_item_dict(&img_id.to_string(), &mut reader) + .unwrap(); + + assert_eq!(item["id"].as_str(), Some(img_id.as_str())); + println!("{:?}", item); + } + } +} diff --git a/rust/src/lib.rs b/rust/src/lib.rs index 1cae4d58be..b7fb71ae0b 100644 --- a/rust/src/lib.rs +++ b/rust/src/lib.rs @@ -3,113 +3,22 @@ // SPDX-License-Identifier: MIT mod coco_page_mapper; +mod datum_page_mapper; +mod page_mapper; mod page_maps; +mod test_helpers; mod utils; +use pyo3::prelude::*; -use std::{fs::File, io::BufReader, path::Path}; - -use crate::coco_page_mapper::CocoPageMapper as CocoPageMapperImpl; -use pyo3::{ - exceptions::PyValueError, - prelude::*, - types::{PyBool, PyDict, PyFloat, PyList, PyUnicode}, -}; -use serde_json; - -#[pyclass] -struct CocoPageMapper { - reader: BufReader, - mapper: CocoPageMapperImpl, -} - -fn convert_to_py_object(value: &serde_json::Value, py: Python<'_>) -> PyResult { - if value.is_array() { - let list = PyList::empty(py); - - for child in value.as_array().unwrap() { - list.append(convert_to_py_object(child, py)?)?; - } - - return Ok(list.into()); - } else if value.is_object() { - let dict = PyDict::new(py); - - for (key, child) in value.as_object().unwrap().iter() { - let child = convert_to_py_object(child, py)?; - dict.set_item(key, child)?; - } - - return Ok(dict.into()); - } else if value.is_boolean() { - return Ok(PyBool::new(py, value.as_bool().unwrap()).into()); - } else if value.is_f64() { - return Ok(PyFloat::new(py, value.as_f64().unwrap()).into()); - } else if value.is_i64() { - return Ok(value.as_i64().unwrap().to_object(py)); - } else if value.is_u64() { - return Ok(value.as_u64().unwrap().to_object(py)); - } 
else if value.is_string() { - return Ok(PyUnicode::new(py, value.as_str().unwrap()).into()); - } else if value.is_null() { - return Ok(PyUnicode::new(py, "null").into()); - } else { - return Err(PyValueError::new_err("Unknown value type")); - } -} - -#[pymethods] -impl CocoPageMapper { - #[new] - fn py_new(path: String) -> PyResult { - let file = File::open(Path::new(&path))?; - let mut reader = BufReader::new(file); - let mapper = CocoPageMapperImpl::new(&mut reader)?; - - Ok(CocoPageMapper { reader, mapper }) - } - - fn licenses(self_: PyRef) -> PyResult { - convert_to_py_object(self_.mapper.licenses(), self_.py()) - } - - fn info(self_: PyRef) -> PyResult { - convert_to_py_object(self_.mapper.info(), self_.py()) - } - - fn categories(self_: PyRef) -> PyResult { - convert_to_py_object(self_.mapper.categories(), self_.py()) - } - - fn get_item_dict(&mut self, py: Python<'_>, img_id: i64) -> PyResult { - let item_dict = self.mapper.get_item_dict(img_id, &mut self.reader)?; - Ok(convert_to_py_object(&item_dict, py)?) - } - - fn get_anns_dict(&mut self, py: Python<'_>, img_id: i64) -> PyResult { - let anns_list = PyList::new( - py, - self.mapper - .get_anns_dict(img_id, &mut self.reader)? 
- .iter() - .map(|child| convert_to_py_object(child, py).unwrap()), - ); - Ok(anns_list.into()) - } - - fn get_img_ids(&self) -> Vec { - self.mapper.get_img_ids().to_owned() - } - - fn __len__(&self) -> PyResult { - Ok(self.mapper.get_img_ids().len()) - } -} +use crate::coco_page_mapper::CocoPageMapper; +use crate::datum_page_mapper::DatumPageMapper; /// Datumaro Rust API #[pymodule] #[pyo3(name = "rust_api")] fn rust_api(_py: Python<'_>, m: &PyModule) -> PyResult<()> { m.add_class::()?; + m.add_class::()?; Ok(()) } diff --git a/rust/src/page_mapper.rs b/rust/src/page_mapper.rs new file mode 100644 index 0000000000..9946df1eed --- /dev/null +++ b/rust/src/page_mapper.rs @@ -0,0 +1,62 @@ +use crate::utils::{invalid_data, read_skipping_ws}; +use std::io::{Error, Read, Seek}; + +pub trait ParsedJsonSection: Sized { + fn parse(buf_key: String, reader: impl Read + Seek) -> Result, Error>; +} + +pub trait JsonPageMapper: Sized +where + T: ParsedJsonSection, +{ + fn parse_json(mut reader: impl Read + Seek) -> Result>, Error> { + let mut brace_level = 0; + let mut json_sections = Vec::new(); + + while let Ok(c) = read_skipping_ws(&mut reader) { + match c { + b'{' => brace_level += 1, + b'"' => { + let mut buf_key = Vec::new(); + while let Ok(c) = read_skipping_ws(&mut reader) { + if c == b'"' { + break; + } + buf_key.push(c); + } + match String::from_utf8(buf_key.clone()) { + Ok(key) => { + let section = T::parse(key, &mut reader)?; + json_sections.push(section); + } + Err(e) => { + let cur_pos = reader.stream_position()?; + let msg = format!( + "Section key buffer, {:?} is invalid at pos: {}. 
{}", + buf_key, cur_pos, e + ); + let err = invalid_data(msg.as_str()); + return Err(err); + } + } + } + b',' => { + continue; + } + b'}' => { + brace_level -= 1; + if brace_level == 0 { + break; + } + } + _ => { + let cur_pos = reader.stream_position()?; + let msg = format!("{} is invalid character at pos: {}", c, cur_pos); + let err = invalid_data(msg.as_str()); + return Err(err); + } + } + } + Ok(json_sections) + } +} diff --git a/rust/src/page_maps.rs b/rust/src/page_maps.rs index 4d2c10fc80..d7b779898e 100644 --- a/rust/src/page_maps.rs +++ b/rust/src/page_maps.rs @@ -34,18 +34,53 @@ pub struct ImgPage { pub size: u32, } +pub trait ItemPageMapKeyTrait: + std::cmp::Eq + std::hash::Hash + std::clone::Clone + std::fmt::Display +{ + fn get_id(parsed_map: HashMap, offset: u64) -> Result + where + Self: Sized; +} + +impl ItemPageMapKeyTrait for i64 { + fn get_id(parsed_map: HashMap, offset: u64) -> Result { + parsed_map + .get("id") + .ok_or(stream_error("Cannot find an image id", offset))? + .as_i64() + .ok_or(stream_error("The image id is not an integer.", offset)) + } +} +// String-keyed ids: the "id" field must be a JSON string, so a non-string value is reported as such. +impl ItemPageMapKeyTrait for String { + fn get_id(parsed_map: HashMap, offset: u64) -> Result { + Ok(parsed_map + .get("id") + .ok_or(stream_error("Cannot find an image id", offset))? + .as_str() + .ok_or(stream_error("The image id is not a string.", offset))? 
+ .to_string()) + } +} + #[derive(Debug)] -pub struct ImgPageMap { - ids: Vec, - pages: HashMap, +pub struct ImgPageMap +where + T: ItemPageMapKeyTrait, +{ + ids: Vec, + pages: HashMap, } -impl ImgPageMap { - pub fn get_dict(&self, reader: &mut R, img_id: i64) -> Result +impl ImgPageMap +where + T: ItemPageMapKeyTrait, +{ + pub fn get_dict(&self, reader: &mut R, img_id: &T) -> Result where R: io::Read + io::Seek, { - match self.pages.get(&img_id) { + match self.pages.get(img_id) { Some(page) => parse_serde_json_value_from_page(reader, page.offset, page.size as u64), None => Err(invalid_data( format!("Image id: {} is not on the page map", img_id).as_str(), @@ -53,12 +88,12 @@ impl ImgPageMap { } } - pub fn push(&mut self, img_id: i64, page: ImgPage) { - self.ids.push(img_id); - self.pages.insert(img_id, page); + pub fn push(&mut self, img_id: T, page: ImgPage) { + self.ids.push(img_id.clone()); + self.pages.insert(img_id.clone(), page); } - pub fn from_reader(mut reader: impl io::Read + io::Seek) -> Result { + pub fn from_reader(mut reader: impl io::Read + io::Seek) -> Result, io::Error> { let mut page_map = ImgPageMap::default(); let (empty, rewind_pos) = is_empty_list(&mut reader)?; @@ -79,11 +114,7 @@ impl ImgPageMap { match stream.next().unwrap() { Ok(parsed_map) => { - let id = parsed_map - .get("id") - .ok_or(stream_error("Cannot find an image id", offset))? 
- .as_i64() - .ok_or(stream_error("The image id is not an integer.", offset))?; + let id = ItemPageMapKeyTrait::get_id(parsed_map, offset)?; let size = (curr_pos + stream.byte_offset() as u64 - offset) as u32; page_map.push(id, ImgPage { offset, size }); @@ -100,22 +131,28 @@ impl ImgPageMap { Ok(page_map) } - pub fn ids(&self) -> &Vec { + pub fn ids(&self) -> &Vec { return &self.ids; } } -impl IntoIterator for ImgPageMap { - type Item = (i64, ImgPage); +impl IntoIterator for ImgPageMap +where + T: ItemPageMapKeyTrait, +{ + type Item = (T, ImgPage); - type IntoIter = as IntoIterator>::IntoIter; + type IntoIter = as IntoIterator>::IntoIter; fn into_iter(self) -> Self::IntoIter { self.pages.into_iter() } } -impl Default for ImgPageMap { +impl Default for ImgPageMap +where + T: ItemPageMapKeyTrait, +{ fn default() -> Self { Self { ids: Vec::with_capacity(0), diff --git a/rust/src/test_helpers.rs b/rust/src/test_helpers.rs new file mode 100644 index 0000000000..7b761f9bf7 --- /dev/null +++ b/rust/src/test_helpers.rs @@ -0,0 +1,18 @@ +// Copyright (C) 2023 Intel Corporation +// +// SPDX-License-Identifier: MIT + +use std::{ + fs::File, + io::{BufReader, Write}, +}; +use tempfile::NamedTempFile; +/// Writes `example` into a new temporary file and returns that file together with a buffered reader opened on its path. +pub fn prepare_reader(example: &str) -> (NamedTempFile, BufReader) { + let mut tempfile = NamedTempFile::new().expect("cannot open file"); + tempfile.write_all(example.as_bytes()).expect("cannot write file"); + let f = File::open(tempfile.path()).expect("cannot open file"); + let reader = BufReader::new(f); + + (tempfile, reader) +} diff --git a/rust/src/utils.rs b/rust/src/utils.rs index 1c061a97c3..cc1c9f9e13 100644 --- a/rust/src/utils.rs +++ b/rust/src/utils.rs @@ -3,6 +3,11 @@ // SPDX-License-Identifier: MIT use std::io::{self}; +use pyo3::{ + exceptions::PyValueError, + prelude::*, + types::{PyBool, PyDict, PyFloat, PyList, PyUnicode}, +}; pub fn read_skipping_ws(mut reader: impl io::Read) -> io::Result { loop { @@ -61,3 +66,38 @@ pub fn parse_serde_json_value( } } } + +pub fn 
convert_to_py_object(value: &serde_json::Value, py: Python<'_>) -> PyResult { + if value.is_array() { + let list = PyList::empty(py); + + for child in value.as_array().unwrap() { + list.append(convert_to_py_object(child, py)?)?; + } + + return Ok(list.into()); + } else if value.is_object() { + let dict = PyDict::new(py); + + for (key, child) in value.as_object().unwrap().iter() { + let child = convert_to_py_object(child, py)?; + dict.set_item(key, child)?; + } + + return Ok(dict.into()); + } else if value.is_boolean() { + return Ok(PyBool::new(py, value.as_bool().unwrap()).into()); + } else if value.is_f64() { + return Ok(PyFloat::new(py, value.as_f64().unwrap()).into()); + } else if value.is_i64() { + return Ok(value.as_i64().unwrap().to_object(py)); + } else if value.is_u64() { + return Ok(value.as_u64().unwrap().to_object(py)); + } else if value.is_string() { + return Ok(PyUnicode::new(py, value.as_str().unwrap()).into()); + } else if value.is_null() { + return Ok(py.None()); + } else { + return Err(PyValueError::new_err("Unknown value type")); + } +} diff --git a/src/datumaro/plugins/data_formats/datumaro/base.py b/src/datumaro/plugins/data_formats/datumaro/base.py index 6f923fd6bd..6caa060043 100644 --- a/src/datumaro/plugins/data_formats/datumaro/base.py +++ b/src/datumaro/plugins/data_formats/datumaro/base.py @@ -6,9 +6,6 @@ import re from typing import Dict, List, Optional, Type -import json_stream -from json_stream.base import StreamingJSONObject - from datumaro.components.annotation import ( NO_OBJECT_ID, AnnotationType, @@ -30,7 +27,8 @@ from datumaro.components.errors import DatasetImportError, MediaTypeError from datumaro.components.importer import ImportContext from datumaro.components.media import Image, MediaElement, MediaType, PointCloud, Video, VideoFrame -from datumaro.util import parse_json_file, to_dict_from_streaming_json +from datumaro.plugins.data_formats.datumaro.page_mapper import DatumPageMapper +from datumaro.util import parse_json_file 
from datumaro.version import __version__ from .format import DATUMARO_FORMAT_VERSION, DatumaroPath @@ -372,67 +370,35 @@ def __init__( self._length = None def __len__(self): - if self._length is None: - self._length = sum(1 for _ in self) - return self._length + return len(self._reader) def __iter__(self): pbar = self._ctx.progress_reporter - with open(self._reader, "rb") as fp: - data = json_stream.load(fp) - items = data.get("items", None) - if items is None: - raise DatasetImportError('Annotation JSON file should have "items" entity.') - - length = 0 - for item in pbar.iter(items): - item_desc = to_dict_from_streaming_json(item) - length += 1 - yield self._parse_item(item_desc) - - if self._length != length: - self._length = length + for item_desc in pbar.iter( + self._reader, + desc=f"Importing '{self._subset}'", + ): + yield self._parse_item(item_desc) - def _init_reader(self, path: str): - return path + def _init_reader(self, path: str) -> DatumPageMapper: + return DatumPageMapper(path) @staticmethod - def _load_media_type(path) -> Type[MediaElement]: - # We can assume that the media_type information will be within the first 1 KB of the file. - # This is because we are the producer of Datumaro format. 
- search_size = 1024 # 1 KB + def _load_media_type(page_mapper: DatumPageMapper) -> Type[MediaElement]: + media_type = page_mapper.media_type - pattern = '"media_type"\s*:\s*(\d+)' + if media_type is None: + return MediaType.IMAGE.media - with open(path, "r", encoding="utf-8") as fp: - out = fp.read(search_size) - found = re.search(pattern, out) - - if found: - int_type = int(found.group(1)) - return MediaType(int_type).IMAGE.media - - return MediaType.IMAGE.media + return media_type.media @staticmethod - def _load_infos(path) -> Dict: - with open(path, "r", encoding="utf-8") as fp: - data = json_stream.load(fp) - infos = data.get("infos", {}) - if isinstance(infos, StreamingJSONObject): - infos = to_dict_from_streaming_json(infos) - - return infos + def _load_infos(page_mapper: DatumPageMapper) -> Dict: + return page_mapper.infos @staticmethod - def _load_categories(path) -> Dict: - with open(path, "r", encoding="utf-8") as fp: - data = json_stream.load(fp) - categories = data.get("categories", {}) - if isinstance(categories, StreamingJSONObject): - categories = to_dict_from_streaming_json(categories) - - return JsonReader._load_categories({"categories": categories}) + def _load_categories(page_mapper: DatumPageMapper) -> Dict: + return JsonReader._load_categories({"categories": page_mapper.categories}) def _load_items(self, parsed) -> List: return [] diff --git a/src/datumaro/plugins/data_formats/datumaro/page_mapper.py b/src/datumaro/plugins/data_formats/datumaro/page_mapper.py new file mode 100644 index 0000000000..44b3a9731d --- /dev/null +++ b/src/datumaro/plugins/data_formats/datumaro/page_mapper.py @@ -0,0 +1,71 @@ +# Copyright (C) 2023 Intel Corporation +# +# SPDX-License-Identifier: MIT + +import logging as log +from typing import Any, Dict, Iterator, Optional + +from datumaro.components.media import MediaType +from datumaro.rust_api import DatumPageMapper as DatumPageMapperImpl + +__all__ = ["DatumPageMapper"] + + +class DatumPageMapper: + """Construct 
page maps for items and annotations from the JSON file, + which are used for the stream importer. + + It also provides __iter__() to produce item and annotation dictionaries + in stream manner after constructing the page map. + """ + + def __init__(self, path: str) -> None: + self._path = path + self._impl = DatumPageMapperImpl(path) + + def __iter__(self) -> Iterator[Dict]: + for item_key in self.iter_item_ids(): + yield self._impl.get_item_dict(item_key) + + def get_item_dict(self, item_key: str) -> Optional[Dict]: + try: + return self._impl.get_item_dict(item_key) + except Exception as e: + log.error(e) + return None + + def __len__(self) -> int: + return len(self._impl) + + def iter_item_ids(self) -> Iterator[str]: + for item_id in self._impl.get_img_ids(): + yield item_id + + def __del__(self): + pass + + @property + def dm_format_version(self) -> Optional[str]: + """Parse "dm_format_version" section from the given JSON file using the stream json parser""" + return self._impl.dm_format_version() + + @property + def media_type(self) -> Optional[MediaType]: + """Parse "media_type" section from the given JSON file using the stream json parser""" + media_type = self._impl.media_type() + if media_type is not None: + return MediaType(media_type) + return None + + @property + def infos(self) -> Dict[str, Any]: + """Parse "infos" section from the given JSON file using the stream json parser""" + return self._impl.infos() + + @property + def categories(self) -> Dict[str, Any]: + """Parse "categories" section from the given JSON file using the stream json parser""" + return self._impl.categories() + + def __reduce__(self): + return (self.__class__, (self._path,)) diff --git a/tests/conftest.py b/tests/conftest.py index 9217fd88d0..6f1b90a6e1 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -2,8 +2,12 @@ # # SPDX-License-Identifier: MIT +from time import sleep + import pytest +from datumaro.util.os_util import rmtree + from tests.utils.test_utils import 
TestCaseHelper, TestDir diff --git a/tests/utils/test_utils.py b/tests/utils/test_utils.py index 2144d9e59e..9be67da70a 100644 --- a/tests/utils/test_utils.py +++ b/tests/utils/test_utils.py @@ -10,10 +10,9 @@ import tempfile import unittest import unittest.mock -import warnings from enum import Enum, auto from glob import glob -from tempfile import TemporaryDirectory +from time import sleep from typing import Any, Collection, List, Optional, Union import pytest @@ -45,15 +44,20 @@ def __enter__(self): return self.path def __exit__(self, exc_type=None, exc_value=None, traceback=None): - if self.is_dir: - try: - rmtree(self.path) - except unittest.SkipTest: - # Suppress skip test errors from git.util.rmtree - if not exc_type: - raise - else: - rmfile(self.path) + for _ in range(10): + if self.is_dir: + try: + rmtree(self.path) + except unittest.SkipTest: + # Suppress skip test errors from git.util.rmtree + if not exc_type: + raise + else: + rmfile(self.path) + + if not os.path.exists(self.path): + return + sleep(0.5) class TestDir(FileRemover): @@ -334,7 +338,7 @@ def _change_path_in_items(dataset, source_path, target_path): new_images.append(image) item.media._extra_images = new_images - with TemporaryDirectory(prefix=test_dir) as tmp_dir: + with TestDir() as tmp_dir: converter(source_dataset, test_dir, stream=stream) if move_save_dir: save_dir = tmp_dir @@ -364,7 +368,11 @@ def _change_path_in_items(dataset, source_path, target_path): del cmp_kwargs["dimension"] elif not compare: compare = compare_datasets - compare(test, expected=target_dataset, actual=parsed_dataset, **cmp_kwargs) + + try: + compare(test, expected=target_dataset, actual=parsed_dataset, **cmp_kwargs) + finally: + del parsed_dataset def compare_dirs(test, expected: str, actual: str):