19 #ifndef avro_DataFile_hh__
20 #define avro_DataFile_hh__
24 #include "Specific.hh"
26 #include "ValidSchema.hh"
27 #include "buffer/Buffer.hh"
34 #include "boost/utility.hpp"
35 #include <boost/iostreams/filtering_stream.hpp>
44 #ifdef SNAPPY_CODEC_AVAILABLE
50 const int SyncSize = 16;
62 const std::string filename_;
65 const size_t syncInterval_;
68 std::unique_ptr<OutputStream> stream_;
69 std::unique_ptr<OutputStream> buffer_;
73 typedef std::map<std::string, std::vector<uint8_t>> Metadata;
78 static std::unique_ptr<OutputStream> makeStream(
const char *filename);
82 void setMetadata(
const std::string &key,
const std::string &value);
92 void init(
const ValidSchema &schema,
size_t syncInterval,
const Codec &codec);
121 size_t syncInterval,
Codec codec = NULL_CODEC);
148 std::unique_ptr<DataFileWriterBase> base_;
158 size_t syncInterval = 16 * 1024,
Codec codec = NULL_CODEC) : base_(new
DataFileWriterBase(std::move(outputStream),
schema, syncInterval, codec)) {}
164 base_->syncIfNeeded();
195 const std::string filename_;
196 const std::unique_ptr<InputStream> stream_;
198 int64_t objectCount_;
201 int64_t blockStart_{};
207 std::unique_ptr<InputStream> dataStream_;
208 typedef std::map<std::string, std::vector<uint8_t>> Metadata;
214 std::unique_ptr<boost::iostreams::filtering_istream> os_;
215 std::vector<char> compressed_;
216 std::string uncompressed;
219 void readDataBlock();
220 void doSeek(int64_t position);
236 void decr() { --objectCount_; } // Decrements objectCount_, the number of objects yet to read.
307 std::unique_ptr<DataFileReaderBase> base_;
343 explicit DataFileReader(std::unique_ptr<DataFileReaderBase> base) : base_(std::move(base)) {
367 if (base_->hasMore()) {
388 void close() { // Closes the reader by delegating to the underlying reader base.
return base_->close(); }
394 void seek(int64_t position) { base_->seek(position); } // Moves to a specific, known synchronization point; forwards to the reader base.
401 void sync(int64_t position) { base_->sync(position); } // Moves to the next synchronization point after `position`; forwards to the reader base.
406 bool pastSync(int64_t position) { // Returns true if past the next synchronization point after `position`.
return base_->pastSync(position); } // The answer comes from the reader base.
Low level support for encoding avro values.
The type independent portion of reader.
Definition: DataFile.hh:194
int64_t previousSync() const
Return the last synchronization point before our current position.
const ValidSchema & readerSchema()
Returns the schema for this object.
Definition: DataFile.hh:266
const ValidSchema & dataSchema()
Returns the schema stored with the data file.
Definition: DataFile.hh:271
void init(const ValidSchema &readerSchema)
Initializes the reader to read objects according to the given schema.
void init()
Initializes the reader so that the reader and writer schemas are the same.
void seek(int64_t position)
Move to a specific, known synchronization point, for example one returned from tell() after sync().
bool hasMore()
Returns true if and only if there is more to read.
bool pastSync(int64_t position)
Return true if past the next synchronization point after a position.
void close()
Closes the reader.
Decoder & decoder()
Returns the current decoder for this reader.
Definition: DataFile.hh:226
void sync(int64_t position)
Move to the next synchronization point after a position.
void decr()
Decrements the number of objects yet to read.
Definition: DataFile.hh:236
DataFileReaderBase(const char *filename)
Constructs the reader for the given file; the reader is expected to use the schema that is used with the data file.
Reads the contents of data file one after another.
Definition: DataFile.hh:306
DataFileReader(std::unique_ptr< DataFileReaderBase > base)
Constructs a reader using the reader base.
Definition: DataFile.hh:343
bool pastSync(int64_t position)
Return true if past the next synchronization point after a position.
Definition: DataFile.hh:406
int64_t previousSync()
Return the last synchronization point before our current position.
Definition: DataFile.hh:411
const ValidSchema & dataSchema()
Returns the schema stored with the data file.
Definition: DataFile.hh:383
DataFileReader(std::unique_ptr< DataFileReaderBase > base, const ValidSchema &readerSchema)
Constructs a reader using the reader base.
Definition: DataFile.hh:356
void sync(int64_t position)
Move to the next synchronization point after a position.
Definition: DataFile.hh:401
bool read(T &datum)
Reads the next entry from the data file.
Definition: DataFile.hh:366
void seek(int64_t position)
Move to a specific, known synchronization point, for example one returned from previousSync().
Definition: DataFile.hh:394
DataFileReader(const char *filename)
Constructs the reader for the given file; the reader is expected to use the schema that is used with the data file.
Definition: DataFile.hh:326
DataFileReader(const char *filename, const ValidSchema &readerSchema)
Constructs the reader for the given file and the reader is expected to use the given schema.
Definition: DataFile.hh:314
const ValidSchema & readerSchema()
Returns the schema for this object.
Definition: DataFile.hh:378
void close()
Closes the reader.
Definition: DataFile.hh:388
Type-independent portion of DataFileWriter.
Definition: DataFile.hh:61
void flush()
Flushes any unwritten data into the file.
const ValidSchema & schema() const
Returns the schema for this data file.
Definition: DataFile.hh:135
DataFileWriterBase(const char *filename, const ValidSchema &schema, size_t syncInterval, Codec codec=NULL_CODEC)
Constructs a data file writer with the given sync interval and name.
void incr()
Increments the object count.
Definition: DataFile.hh:114
void syncIfNeeded()
Performs a sync if the buffer has sufficient data for a sync to be inserted.
void close()
Closes the current file.
uint64_t getCurrentBlockStart() const
Returns the byte offset (within the current file) of the start of the current block being written.
Encoder & encoder() const
Returns the current encoder for this writer.
Definition: DataFile.hh:98
An Avro datafile that can store objects of type T.
Definition: DataFile.hh:147
void close()
Closes the current file.
Definition: DataFile.hh:178
void flush()
Flushes any unwritten data into the file.
Definition: DataFile.hh:188
uint64_t getCurrentBlockStart()
Returns the byte offset (within the current file) of the start of the current block being written.
Definition: DataFile.hh:172
const ValidSchema & schema() const
Returns the schema for this data file.
Definition: DataFile.hh:183
DataFileWriter(const char *filename, const ValidSchema &schema, size_t syncInterval=16 *1024, Codec codec=NULL_CODEC)
Constructs a new data file.
Definition: DataFile.hh:154
void write(const T &datum)
Writes the given piece of data into the file.
Definition: DataFile.hh:163
Decoder is an interface implemented by every decoder capable of decoding Avro data.
Definition: Decoder.hh:48
The abstract base class for all Avro encoders.
Definition: Encoder.hh:52
A ValidSchema is basically a non-mutable Schema that has passed some minimum of sanity checks.
Definition: ValidSchema.hh:40
A bunch of templates and specializations for encoding and decoding specific types.
Definition: AvroParse.hh:30
void encode(Encoder &e, const T &t)
Generic encoder function that makes use of the codec_traits.
Definition: Specific.hh:343
std::shared_ptr< Encoder > EncoderPtr
Shared pointer to Encoder.
Definition: Encoder.hh:147
std::shared_ptr< Decoder > DecoderPtr
Shared pointer to Decoder.
Definition: Decoder.hh:177
Codec
Specify type of compression to use when writing data files.
Definition: DataFile.hh:40
std::array< uint8_t, SyncSize > DataFileSync
The sync value.
Definition: DataFile.hh:54
void decode(Decoder &d, T &t)
Generic decoder function that makes use of the codec_traits.
Definition: Specific.hh:351