Files changed (61)
  1. checksums.yaml +7 -0
  2. data/.circleci/config.yml +36 -0
  3. data/.github/workflows/ruby.yml +20 -0
  4. data/.gitignore +14 -0
  5. data/.rspec +2 -0
  6. data/CHANGELOG.md +34 -0
  7. data/Gemfile +4 -0
  8. data/LICENSE.txt +22 -0
  9. data/README.md +212 -0
  10. data/Rakefile +2 -0
  11. data/lib/avro_turf.rb +116 -0
  12. data/lib/avro_turf/cached_confluent_schema_registry.rb +39 -0
  13. data/lib/avro_turf/cached_schema_registry.rb +6 -0
  14. data/lib/avro_turf/confluent_schema_registry.rb +125 -0
  15. data/lib/avro_turf/core_ext.rb +10 -0
  16. data/lib/avro_turf/core_ext/date.rb +5 -0
  17. data/lib/avro_turf/core_ext/enumerable.rb +5 -0
  18. data/lib/avro_turf/core_ext/false_class.rb +5 -0
  19. data/lib/avro_turf/core_ext/hash.rb +7 -0
  20. data/lib/avro_turf/core_ext/nil_class.rb +5 -0
  21. data/lib/avro_turf/core_ext/numeric.rb +5 -0
  22. data/lib/avro_turf/core_ext/string.rb +5 -0
  23. data/lib/avro_turf/core_ext/symbol.rb +5 -0
  24. data/lib/avro_turf/core_ext/time.rb +5 -0
  25. data/lib/avro_turf/core_ext/true_class.rb +5 -0
  26. data/lib/avro_turf/disk_cache.rb +83 -0
  27. data/lib/avro_turf/in_memory_cache.rb +38 -0
  28. data/lib/avro_turf/messaging.rb +199 -0
  29. data/lib/avro_turf/mutable_schema_store.rb +18 -0
  30. data/lib/avro_turf/schema_registry.rb +6 -0
  31. data/lib/avro_turf/schema_store.rb +75 -0
  32. data/lib/avro_turf/schema_to_avro_patch.rb +52 -0
  33. data/lib/avro_turf/test/fake_confluent_schema_registry_server.rb +141 -0
  34. data/lib/avro_turf/test/fake_schema_registry_server.rb +6 -0
  35. data/lib/avro_turf/version.rb +3 -0
  36. data/perf/address.avsc +14 -0
  37. data/perf/encoding_size.rb +26 -0
  38. data/perf/encoding_speed.rb +30 -0
  39. data/perf/person.avsc +14 -0
  40. data/ros-avro_turf.gemspec +40 -0
  41. data/spec/avro_turf_spec.rb +161 -0
  42. data/spec/cached_confluent_schema_registry_spec.rb +63 -0
  43. data/spec/confluent_schema_registry_spec.rb +21 -0
  44. data/spec/core_ext/date_spec.rb +6 -0
  45. data/spec/core_ext/enumerable_spec.rb +12 -0
  46. data/spec/core_ext/false_class_spec.rb +5 -0
  47. data/spec/core_ext/hash_spec.rb +8 -0
  48. data/spec/core_ext/nil_class_spec.rb +5 -0
  49. data/spec/core_ext/numeric_spec.rb +6 -0
  50. data/spec/core_ext/string_spec.rb +5 -0
  51. data/spec/core_ext/symbol_spec.rb +5 -0
  52. data/spec/core_ext/time_spec.rb +6 -0
  53. data/spec/core_ext/true_class_spec.rb +5 -0
  54. data/spec/disk_cached_confluent_schema_registry_spec.rb +159 -0
  55. data/spec/messaging_spec.rb +300 -0
  56. data/spec/schema_store_spec.rb +289 -0
  57. data/spec/schema_to_avro_patch_spec.rb +66 -0
  58. data/spec/spec_helper.rb +28 -0
  59. data/spec/support/confluent_schema_registry_context.rb +254 -0
  60. data/spec/test/fake_confluent_schema_registry_server_spec.rb +40 -0
  61. metadata +271 -0
checksums.yaml ADDED
@@ -0,0 +1,7 @@
1
+ ---
2
+ SHA256:
3
+ metadata.gz: 8eba15676cf47a240cbe15f6898a71e6cef1a6dc62f983aac0610cf712fcd560
4
+ data.tar.gz: 123110d82ec19c2fd087bb1c429d09ab2b44c8da22e5f4242f02c14385805c1b
5
+ SHA512:
6
+ metadata.gz: dbec021ae1ff1e141cc7eb1f2a1035f905aee67986a67399f27939527089f75558edfad078757afdf24cf6dbcc43be59aa354be1e2fa6b7c4e638b0753b810ea
7
+ data.tar.gz: a32eafbd297ed04f037a7b808950ae14f49441bc9e2ca19521650984b9430667c57a906a93d1bcc614b01553ab0a542b90a30cf2f6457b0c11ab306f5e64ee06
data/.circleci/config.yml ADDED
@@ -0,0 +1,36 @@
1
+ version: 2
2
+ jobs:
3
+ build:
4
+ environment:
5
+ CIRCLE_ARTIFACTS: /tmp/circleci-artifacts
6
+ CIRCLE_TEST_REPORTS: /tmp/circleci-test-results
7
+ docker:
8
+ - image: circleci/ruby:2.6.2
9
+ steps:
10
+ - checkout
11
+ - run: mkdir -p $CIRCLE_ARTIFACTS $CIRCLE_TEST_REPORTS
12
+ - restore_cache:
13
+ keys:
14
+ # This branch if available
15
+ - v1-dep-{{ .Branch }}-
16
+ # Default branch if not
17
+ - v1-dep-master-
18
+ # Any branch if there are none on the default branch - this should be unnecessary if you have your default branch configured correctly
19
+ - v1-dep-
20
+ - run: gem install bundler --no-document
21
+ - run: 'bundle check --path=vendor/bundle || bundle install --path=vendor/bundle --jobs=4 --retry=3'
22
+ # Save dependency cache
23
+ - save_cache:
24
+ key: v1-dep-{{ .Branch }}-{{ epoch }}
25
+ paths:
26
+ - vendor/bundle
27
+ - ~/.bundle
28
+ - run: mkdir -p $CIRCLE_TEST_REPORTS/rspec
29
+ - run:
30
+ command: bundle exec rspec --color --require spec_helper --format progress
31
+ - store_test_results:
32
+ path: /tmp/circleci-test-results
33
+ - store_artifacts:
34
+ path: /tmp/circleci-artifacts
35
+ - store_artifacts:
36
+ path: /tmp/circleci-test-results
data/.github/workflows/ruby.yml ADDED
@@ -0,0 +1,20 @@
1
+ name: Ruby
2
+
3
+ on: [push]
4
+
5
+ jobs:
6
+ build:
7
+
8
+ runs-on: ubuntu-latest
9
+
10
+ steps:
11
+ - uses: actions/checkout@v1
12
+ - name: Set up Ruby 2.6
13
+ uses: actions/setup-ruby@v1
14
+ with:
15
+ ruby-version: 2.6.x
16
+ - name: Build and test with RSpec
17
+ run: |
18
+ gem install bundler
19
+ bundle install --jobs 4 --retry 3
20
+ bundle exec rspec
data/.gitignore ADDED
@@ -0,0 +1,14 @@
1
+ /.bundle/
2
+ /.yardoc
3
+ /Gemfile.lock
4
+ /_yardoc/
5
+ /coverage/
6
+ /doc/
7
+ /pkg/
8
+ /spec/reports/
9
+ /tmp/
10
+ *.bundle
11
+ *.so
12
+ *.o
13
+ *.a
14
+ mkmf.log
data/.rspec ADDED
@@ -0,0 +1,2 @@
1
+ --color
2
+ --require spec_helper
data/CHANGELOG.md ADDED
@@ -0,0 +1,34 @@
1
+ # avro_turf
2
+
3
+ ## Unreleased
4
+
5
+ ## v0.11.0
6
+
7
+ - Add proxy support (#107)
8
+ - Add support for client certs (#109)
9
+
10
+ ## v0.10.0
11
+
12
+ - Add more disk caching (#103)
13
+ - Include schema information when decoding (#100, #101, #104)
14
+
15
+ ## v0.9.0
16
+
17
+ - Compatibility with Avro v1.9.0 (#94)
18
+ - Disable the auto registration of schemas (#95)
19
+ - Abstract caching from CachedConfluentSchemaRegistry (#74)
20
+ - Load avro-patches if installed to silence deprecation errors (#85)
21
+ - Make the schema store thread safe (#92)
22
+
23
+ ## v0.8.1
24
+
25
+ - Allow accessing schema store from outside AvroTurf (#68).
26
+
27
+ ## v0.8.0
28
+
29
+ - The names `AvroTurf::SchemaRegistry`, `AvroTurf::CachedSchemaRegistry`, and
30
+ `FakeSchemaRegistryServer` are deprecated and will be removed in a future release.
31
+ Use `AvroTurf::ConfluentSchemaRegistry`, `AvroTurf::CachedConfluentSchemaRegistry`,
32
+ and `FakeConfluentSchemaRegistryServer` instead.
33
+ - Add support for the Config API (http://docs.confluent.io/3.1.2/schema-registry/docs/api.html#config)
34
+ to `AvroTurf::ConfluentSchemaRegistry`.
data/Gemfile ADDED
@@ -0,0 +1,4 @@
1
+ source 'https://rubygems.org'
2
+
3
+ # Specify your gem's dependencies in avro_turf.gemspec
4
+ gemspec
data/LICENSE.txt ADDED
@@ -0,0 +1,22 @@
1
+ Copyright (c) 2015 Daniel Schierbeck
2
+
3
+ MIT License
4
+
5
+ Permission is hereby granted, free of charge, to any person obtaining
6
+ a copy of this software and associated documentation files (the
7
+ "Software"), to deal in the Software without restriction, including
8
+ without limitation the rights to use, copy, modify, merge, publish,
9
+ distribute, sublicense, and/or sell copies of the Software, and to
10
+ permit persons to whom the Software is furnished to do so, subject to
11
+ the following conditions:
12
+
13
+ The above copyright notice and this permission notice shall be
14
+ included in all copies or substantial portions of the Software.
15
+
16
+ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
17
+ EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
18
+ MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
19
+ NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
20
+ LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
21
+ OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
22
+ WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
data/README.md ADDED
@@ -0,0 +1,212 @@
1
+ # AvroTurf
2
+
3
+ AvroTurf is a library that makes it easier to encode and decode data using the [Apache Avro](http://avro.apache.org/) serialization format. It adds a layer on top of the official Avro gem which makes it easier to integrate Avro into your application:
4
+
5
+ * Provides an idiomatic Ruby interface.
6
+ * Allows referencing schemas defined in another file.
7
+
8
+ ## Deprecation Notice
9
+
10
+ The `AvroTurf::SchemaRegistry`, `AvroTurf::CachedSchemaRegistry`,
11
+ and `FakeSchemaRegistryServer` names have been deprecated because the Avro spec recently
12
+ introduced an incompatible [single-message encoding format](https://github.com/apache/avro/commit/30408a9c192c5f4eaaf42f01f0ffbfffd705aa57).
13
+
14
+ These classes have been renamed to `AvroTurf::ConfluentSchemaRegistry`,
15
+ `AvroTurf::CachedConfluentSchemaRegistry`, and `FakeConfluentSchemaRegistryServer`.
16
+
17
+ The aliases for the original names will be removed in a future release.
18
+
19
+ ## Installation
20
+
21
+ Add this line to your application's Gemfile:
22
+
23
+ ```ruby
24
+ gem 'avro_turf'
25
+ ```
26
+
27
+ And then execute:
28
+
29
+ $ bundle
30
+
31
+ Or install it yourself as:
32
+
33
+ $ gem install avro_turf
34
+
35
+ ## Usage
36
+
37
+ Using AvroTurf is quite simple:
38
+
39
+ ```ruby
40
+ # Schemas will be looked up from the specified directory.
41
+ avro = AvroTurf.new(schemas_path: "app/schemas/")
42
+
43
+ # Decode some data using a named schema. The schema file should exist in the
44
+ # schemas directory with the file name `<name>.avsc`.
45
+ avro.decode(encoded_data, schema_name: "person")
46
+
47
+ # Encode some data using the named schema.
48
+ avro.encode({ "name" => "Jane", "age" => 28 }, schema_name: "person")
49
+ ```
50
+
51
+ ### Inter-schema references
52
+
53
+ Unlike the official Avro library, AvroTurf allows schemas to reference each other. As an example:
54
+
55
+ ```json
56
+ // person.avsc
57
+ {
58
+ "name": "person",
59
+ "type": "record",
60
+ "fields": [
61
+ {
62
+ "name": "full_name",
63
+ "type": "string"
64
+ },
65
+ {
66
+ "name": "address",
67
+ "type": "address"
68
+ }
69
+ ]
70
+ }
71
+
72
+ // address.avsc
73
+ {
74
+ "name": "address",
75
+ "type": "record",
76
+ "fields": [
77
+ {
78
+ "name": "street",
79
+ "type": "string"
80
+ },
81
+ {
82
+ "name": "city",
83
+ "type": "string"
84
+ }
85
+ ]
86
+ }
87
+ ```
88
+
89
+ In the example above, the `person` schema references the `address` schema, even though the latter is defined in another file. This makes it possible to share types across schemas, e.g.
90
+
91
+ ```json
92
+ // person_list.avsc
93
+ {
94
+ "type": "array",
95
+ "items": "person"
96
+ }
97
+ ```
98
+
99
+ There's no reason to copy-paste the `person` schema into the `person_list` schema, as you can reference it directly.
100
+
101
+ This feature helps avoid subtle errors when the same type is represented using slightly different schemas.
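+
+ As a minimal sketch (assuming the two schema files above are stored under `app/schemas/`), the reference is resolved transparently through the normal encode/decode API:
+
+ ```ruby
+ require 'avro_turf'
+
+ avro = AvroTurf.new(schemas_path: "app/schemas/")
+
+ # The nested `address` record is resolved from address.avsc automatically.
+ person = {
+   "full_name" => "Jane Doe",
+   "address"   => { "street" => "1st st.", "city" => "Citytown" }
+ }
+
+ encoded = avro.encode(person, schema_name: "person")
+ avro.decode(encoded, schema_name: "person") #=> the original hash
+ ```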
102
+
103
+
104
+ ### Using a Schema Registry
105
+
106
+ By default, AvroTurf will encode data in the Avro data file format. This means that the schema used to encode the data is prepended to the output. If you want to decrease the size of the output, e.g. when storing data in a log such as Apache Kafka or in a database, you can use the `AvroTurf::Messaging` API. This top-level API requires the use of [Schema Registry](https://github.com/confluentinc/schema-registry), a service which allows registering and fetching Avro schemas.
107
+
108
+ The Messaging API will automatically register schemas used for encoding data, and will fetch the corresponding schema when decoding. Instead of including the full schema in the output, only a schema id generated by the registry is included. Registering the same schema twice is idempotent, so no coordination is needed.
109
+
110
+ **NOTE:** [The Messaging format](https://github.com/confluentinc/schema-registry/blob/master/docs/serializer-formatter.rst#wire-format) is _not_ compatible with the Avro data file API.
111
+
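+ For reference, here is a minimal sketch of how an encoded message can be inspected by hand. The layout (a zero magic byte, a 4-byte big-endian schema id, then the Avro binary payload) matches what `AvroTurf::Messaging#encode` writes; the variable names are illustrative:
+
+ ```ruby
+ # `data` is a String produced by AvroTurf::Messaging#encode.
+ magic_byte = data.byteslice(0, 1).unpack("C").first  #=> 0
+ schema_id  = data.byteslice(1, 4).unpack("N").first  # registry-assigned schema id
+ payload    = data.byteslice(5, data.bytesize - 5)    # Avro binary-encoded message
+ ```
+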
112
+ The Messaging API is not included by default, so you must require `avro_turf/messaging` explicitly if you want to use it.
113
+
114
+ Using the Messaging API is simple once you have set up a Schema Registry service:
115
+
116
+ ```ruby
117
+ require 'avro_turf/messaging'
118
+
119
+ # You need to pass the URL of your Schema Registry.
120
+ avro = AvroTurf::Messaging.new(registry_url: "http://my-registry:8081/")
121
+
122
+ # The API for encoding and decoding data is similar to the default one. Encoding
123
+ # data has the side effect of registering the schema. This only happens the first
124
+ # time a schema is used.
125
+ data = avro.encode({ "title" => "hello, world" }, schema_name: "greeting")
126
+
127
+ # If you don't want to automatically register new schemas, you can explicitly pass
128
+ # a subject and version to specify which schema should be used for encoding.
129
+ # The schema will be fetched from the registry and cached. Subsequent uses
130
+ # of the same schema version will be served from the cache.
131
+ data = avro.encode({ "title" => "hello, world" }, subject: 'greeting', version: 1)
132
+
133
+ # You can also explicitly pass a schema_id to specify which schema
134
+ # should be used for encoding.
135
+ # The schema will be fetched from the registry and cached. Subsequent uses
136
+ # of the same schema version will be served from the cache.
137
+ data = avro.encode({ "title" => "hello, world" }, schema_id: 2)
138
+
139
+ # When decoding, the schema will be fetched from the registry and cached. Subsequent
140
+ # instances of the same schema id will be served by the cache.
141
+ avro.decode(data) #=> { "title" => "hello, world" }
142
+
143
+ # If you want the decoded message as well as the schema used to encode it,
144
+ # you can use the `#decode_message` method.
145
+ result = avro.decode_message(data)
146
+ result.message #=> { "title" => "hello, world" }
147
+ result.schema_id #=> 3
148
+ result.writer_schema #=> #<Avro::Schema: ...>
149
+ result.reader_schema #=> nil
150
+ ```
151
+
152
+ ### Confluent Schema Registry Client
153
+
154
+ The ConfluentSchemaRegistry client used by the Messaging API can also be used directly.
155
+ It can check whether a schema is compatible with a subject in the registry using the [Compatibility API](http://docs.confluent.io/3.1.2/schema-registry/docs/api.html#compatibility):
156
+
157
+ ```ruby
158
+ require 'avro_turf'
159
+ require 'avro_turf/confluent_schema_registry'
160
+
161
+ schema = <<-JSON
162
+ {
163
+ "name": "person",
164
+ "type": "record",
165
+ "fields": [
166
+ {
167
+ "name": "full_name",
168
+ "type": "string"
169
+ },
170
+ {
171
+ "name": "address",
172
+ "type": "address"
173
+ }
174
+ ]
175
+ }
176
+ JSON
177
+
178
+ registry = AvroTurf::ConfluentSchemaRegistry.new("http://my-registry:8081/")
179
+
180
+ # Returns true if the schema is compatible, nil if the subject or version is not registered, and false if incompatible.
181
+ registry.compatible?("person", schema)
182
+ ```
183
+
184
+ The ConfluentSchemaRegistry client can also change the global compatibility level or the compatibility level for an individual subject using the [Config API](http://docs.confluent.io/3.1.2/schema-registry/docs/api.html#config):
185
+
186
+ ```ruby
187
+ registry.update_global_config(compatibility: 'FULL')
188
+ registry.update_subject_config("person", compatibility: 'NONE')
189
+ ```
190
+
191
+ ### Testing Support
192
+
193
+ AvroTurf includes a `FakeConfluentSchemaRegistryServer` that can be used in tests. The
194
+ fake schema registry server depends on Sinatra but it is _not_ listed as a runtime
195
+ dependency for AvroTurf. Sinatra must be added to your Gemfile or gemspec in order
196
+ to use the fake server.
197
+
198
+ Example using RSpec:
199
+
200
+ ```ruby
201
+ require 'avro_turf/test/fake_confluent_schema_registry_server'
202
+ require 'webmock/rspec'
203
+
204
+ # within an example
205
+ let(:registry_url) { "http://registry.example.com" }
206
+ before do
207
+ stub_request(:any, /^#{registry_url}/).to_rack(FakeConfluentSchemaRegistryServer)
208
+ FakeConfluentSchemaRegistryServer.clear
209
+ end
210
+
211
+ # Messaging objects created with the same registry_url will now use the fake server.
212
+ ```
data/Rakefile ADDED
@@ -0,0 +1,2 @@
1
+ require "bundler/gem_tasks"
2
+
data/lib/avro_turf.rb ADDED
@@ -0,0 +1,116 @@
1
+ begin
2
+ require 'avro-patches'
3
+ rescue LoadError
4
+ false
5
+ end
6
+ require 'avro_turf/version'
7
+ require 'avro'
8
+ require 'json'
9
+ require 'avro_turf/schema_store'
10
+ require 'avro_turf/core_ext'
11
+
12
+ # check for something that indicates Avro v1.9.0 or later
13
+ unless defined?(::Avro::LogicalTypes)
14
+ require 'avro_turf/schema_to_avro_patch'
15
+ end
16
+
17
+ class AvroTurf
18
+ class Error < StandardError; end
19
+ class SchemaError < Error; end
20
+ class SchemaNotFoundError < Error; end
21
+
22
+ DEFAULT_SCHEMAS_PATH = "./schemas"
23
+
24
+ # Create a new AvroTurf instance with the specified configuration.
25
+ #
26
+ # schemas_path - The String path to the root directory containing Avro schemas (default: "./schemas").
27
+ # schema_store - A schema store object that responds to #find(schema_name, namespace).
28
+ # namespace - The String namespace that should be used to qualify schema names (optional).
29
+ # codec - The String name of a codec that should be used to compress messages (optional).
30
+ #
31
+ # Currently, the only valid codec name is `deflate`.
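+ #
+ # A minimal usage sketch (the schemas_path value is illustrative):
+ #
+ #   avro = AvroTurf.new(schemas_path: "app/schemas/", codec: "deflate")
+ #   avro.encode({ "name" => "Jane", "age" => 28 }, schema_name: "person")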
32
+ def initialize(schemas_path: nil, schema_store: nil, namespace: nil, codec: nil)
33
+ @namespace = namespace
34
+ @schema_store = schema_store ||
35
+ SchemaStore.new(path: schemas_path || DEFAULT_SCHEMAS_PATH)
36
+ @codec = codec
37
+ end
38
+
39
+ # Encodes data to Avro using the specified schema.
40
+ #
41
+ # data - The data that should be encoded.
42
+ # schema_name - The name of a schema in the `schemas_path`.
43
+ #
44
+ # Returns a String containing the encoded data.
45
+ def encode(data, schema_name: nil, namespace: @namespace)
46
+ stream = StringIO.new
47
+
48
+ encode_to_stream(data, stream: stream, schema_name: schema_name, namespace: namespace)
49
+
50
+ stream.string
51
+ end
52
+
53
+ # Encodes data to Avro using the specified schema and writes it to the
54
+ # specified stream.
55
+ #
56
+ # data - The data that should be encoded.
57
+ # schema_name - The name of a schema in the `schemas_path`.
58
+ # stream - An IO object that the encoded data should be written to (optional).
59
+ #
60
+ # Returns nothing.
61
+ def encode_to_stream(data, schema_name: nil, stream: nil, namespace: @namespace)
62
+ schema = @schema_store.find(schema_name, namespace)
63
+ writer = Avro::IO::DatumWriter.new(schema)
64
+
65
+ dw = Avro::DataFile::Writer.new(stream, writer, schema, @codec)
66
+ dw << data.as_avro
67
+ dw.close
68
+ end
69
+
70
+ # Decodes Avro data.
71
+ #
72
+ # encoded_data - A String containing Avro-encoded data.
73
+ # schema_name - The String name of the schema that should be used to read
74
+ # the data. If nil, the writer schema will be used.
75
+ # namespace - The namespace of the Avro schema used to decode the data.
76
+ #
77
+ # Returns whatever is encoded in the data.
78
+ def decode(encoded_data, schema_name: nil, namespace: @namespace)
79
+ stream = StringIO.new(encoded_data)
80
+ decode_stream(stream, schema_name: schema_name, namespace: namespace)
81
+ end
82
+
83
+ # Decodes Avro data from an IO stream.
84
+ #
85
+ # stream - An IO object containing Avro data.
86
+ # schema_name - The String name of the schema that should be used to read
87
+ # the data. If nil, the writer schema will be used.
88
+ # namespace - The namespace of the Avro schema used to decode the data.
89
+ #
90
+ # Returns whatever is encoded in the stream.
91
+ def decode_stream(stream, schema_name: nil, namespace: @namespace)
92
+ schema = schema_name && @schema_store.find(schema_name, namespace)
93
+ reader = Avro::IO::DatumReader.new(nil, schema)
94
+ dr = Avro::DataFile::Reader.new(stream, reader)
95
+ dr.first
96
+ end
97
+
98
+ # Validates data against an Avro schema.
99
+ #
100
+ # data - The data that should be validated.
101
+ # schema_name - The String name of the schema that should be used to validate
102
+ # the data.
103
+ # namespace - The namespace of the Avro schema (optional).
104
+ #
105
+ # Returns true if the data is valid, false otherwise.
106
+ def valid?(data, schema_name: nil, namespace: @namespace)
107
+ schema = schema_name && @schema_store.find(schema_name, namespace)
108
+
109
+ Avro::Schema.validate(schema, data.as_avro)
110
+ end
111
+
112
+ # Loads all schema definition files in the `schemas_path`.
113
+ def load_schemas!
114
+ @schema_store.load_schemas!
115
+ end
116
+ end
data/lib/avro_turf/cached_confluent_schema_registry.rb ADDED
@@ -0,0 +1,39 @@
1
+ require 'avro_turf/confluent_schema_registry'
2
+ require 'avro_turf/in_memory_cache'
3
+ require 'avro_turf/disk_cache'
4
+
5
+ # Caches registrations and lookups to the schema registry in memory.
6
+ class AvroTurf::CachedConfluentSchemaRegistry
7
+
8
+ # Instantiate a new CachedConfluentSchemaRegistry instance with the given configuration.
9
+ # By default, an InMemoryCache is used to prevent repeated calls to the upstream registry.
10
+ #
11
+ # upstream - The upstream schema registry object that fully responds to all methods in the
12
+ # AvroTurf::ConfluentSchemaRegistry interface.
13
+ # cache - An optional user-provided cache object that responds to all methods in the AvroTurf::InMemoryCache interface.
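+ #
+ # Example (a sketch; the registry URL is illustrative):
+ #
+ #   upstream = AvroTurf::ConfluentSchemaRegistry.new("http://my-registry:8081/")
+ #   registry = AvroTurf::CachedConfluentSchemaRegistry.new(upstream)
+ #   registry.fetch(42)  # the first call hits the upstream; later calls with the same id come from the cache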
14
+ def initialize(upstream, cache: nil)
15
+ @upstream = upstream
16
+ @cache = cache || AvroTurf::InMemoryCache.new()
17
+ end
18
+
19
+ # Delegate the following methods to the upstream
20
+ %i(subjects subject_versions check compatible?
21
+ global_config update_global_config subject_config update_subject_config).each do |name|
22
+ define_method(name) do |*args|
23
+ instance_variable_get(:@upstream).send(name, *args)
24
+ end
25
+ end
26
+
27
+ def fetch(id)
28
+ @cache.lookup_by_id(id) || @cache.store_by_id(id, @upstream.fetch(id))
29
+ end
30
+
31
+ def register(subject, schema)
32
+ @cache.lookup_by_schema(subject, schema) || @cache.store_by_schema(subject, schema, @upstream.register(subject, schema))
33
+ end
34
+
35
+ def subject_version(subject, version = 'latest')
36
+ @cache.lookup_by_version(subject, version) ||
37
+ @cache.store_by_version(subject, version, @upstream.subject_version(subject, version))
38
+ end
39
+ end
data/lib/avro_turf/cached_schema_registry.rb ADDED
@@ -0,0 +1,6 @@
1
+ require 'avro_turf/cached_confluent_schema_registry'
2
+
3
+ # AvroTurf::CachedSchemaRegistry is deprecated and will be removed in a future release.
4
+ # Use AvroTurf::CachedConfluentSchemaRegistry instead.
5
+
6
+ AvroTurf::CachedSchemaRegistry = AvroTurf::CachedConfluentSchemaRegistry
data/lib/avro_turf/confluent_schema_registry.rb ADDED
@@ -0,0 +1,125 @@
1
+ require 'excon'
2
+
3
+ class AvroTurf::ConfluentSchemaRegistry
4
+ CONTENT_TYPE = "application/vnd.schemaregistry.v1+json".freeze
5
+
6
+ def initialize(
7
+ url,
8
+ logger: Logger.new($stdout),
9
+ proxy: nil,
10
+ client_cert: nil,
11
+ client_key: nil,
12
+ client_key_pass: nil,
13
+ client_cert_data: nil,
14
+ client_key_data: nil
15
+ )
16
+ @logger = logger
17
+ headers = {
18
+ "Content-Type" => CONTENT_TYPE
19
+ }
20
+ headers[:proxy] = proxy unless proxy.nil?
21
+ @connection = Excon.new(
22
+ url,
23
+ headers: headers,
24
+ client_cert: client_cert,
25
+ client_key: client_key,
26
+ client_key_pass: client_key_pass,
27
+ client_cert_data: client_cert_data,
28
+ client_key_data: client_key_data
29
+ )
30
+ end
31
+
32
+ def fetch(id)
33
+ @logger.info "Fetching schema with id #{id}"
34
+ data = get("/schemas/ids/#{id}")
35
+ data.fetch("schema")
36
+ end
37
+
38
+ def register(subject, schema)
39
+ data = post("/subjects/#{subject}/versions", body: {
40
+ schema: schema.to_s
41
+ }.to_json)
42
+
43
+ id = data.fetch("id")
44
+
45
+ @logger.info "Registered schema for subject `#{subject}`; id = #{id}"
46
+
47
+ id
48
+ end
49
+
50
+ # List all subjects
51
+ def subjects
52
+ get('/subjects')
53
+ end
54
+
55
+ # List all versions for a subject
56
+ def subject_versions(subject)
57
+ get("/subjects/#{subject}/versions")
58
+ end
59
+
60
+ # Get a specific version for a subject
61
+ def subject_version(subject, version = 'latest')
62
+ get("/subjects/#{subject}/versions/#{version}")
63
+ end
64
+
65
+ # Check if a schema exists. Returns nil if not found.
66
+ def check(subject, schema)
67
+ data = post("/subjects/#{subject}",
68
+ expects: [200, 404],
69
+ body: { schema: schema.to_s }.to_json)
70
+ data unless data.has_key?("error_code")
71
+ end
72
+
73
+ # Check if a schema is compatible with the stored version.
74
+ # Returns:
75
+ # - true if compatible
76
+ # - nil if the subject or version does not exist
77
+ # - false if incompatible
78
+ # http://docs.confluent.io/3.1.2/schema-registry/docs/api.html#compatibility
79
+ def compatible?(subject, schema, version = 'latest')
80
+ data = post("/compatibility/subjects/#{subject}/versions/#{version}",
81
+ expects: [200, 404],
82
+ body: { schema: schema.to_s }.to_json)
83
+ data.fetch('is_compatible', false) unless data.has_key?('error_code')
84
+ end
85
+
86
+ # Get global config
87
+ def global_config
88
+ get("/config")
89
+ end
90
+
91
+ # Update global config
92
+ def update_global_config(config)
93
+ put("/config", { body: config.to_json })
94
+ end
95
+
96
+ # Get config for subject
97
+ def subject_config(subject)
98
+ get("/config/#{subject}")
99
+ end
100
+
101
+ # Update config for subject
102
+ def update_subject_config(subject, config)
103
+ put("/config/#{subject}", { body: config.to_json })
104
+ end
105
+
106
+ private
107
+
108
+ def get(path, **options)
109
+ request(path, method: :get, **options)
110
+ end
111
+
112
+ def put(path, **options)
113
+ request(path, method: :put, **options)
114
+ end
115
+
116
+ def post(path, **options)
117
+ request(path, method: :post, **options)
118
+ end
119
+
120
+ def request(path, **options)
121
+ options = { expects: 200 }.merge!(options)
122
+ response = @connection.request(path: path, **options)
123
+ JSON.parse(response.body)
124
+ end
125
+ end
data/lib/avro_turf/core_ext.rb ADDED
@@ -0,0 +1,10 @@
1
+ require 'avro_turf/core_ext/string'
2
+ require 'avro_turf/core_ext/numeric'
3
+ require 'avro_turf/core_ext/enumerable'
4
+ require 'avro_turf/core_ext/hash'
5
+ require 'avro_turf/core_ext/time'
6
+ require 'avro_turf/core_ext/date'
7
+ require 'avro_turf/core_ext/symbol'
8
+ require 'avro_turf/core_ext/nil_class'
9
+ require 'avro_turf/core_ext/true_class'
10
+ require 'avro_turf/core_ext/false_class'
data/lib/avro_turf/core_ext/date.rb ADDED
@@ -0,0 +1,5 @@
1
+ class Date
2
+ def as_avro
3
+ iso8601
4
+ end
5
+ end
data/lib/avro_turf/core_ext/enumerable.rb ADDED
@@ -0,0 +1,5 @@
1
+ module Enumerable
2
+ def as_avro
3
+ map(&:as_avro)
4
+ end
5
+ end
data/lib/avro_turf/core_ext/false_class.rb ADDED
@@ -0,0 +1,5 @@
1
+ class FalseClass
2
+ def as_avro
3
+ self
4
+ end
5
+ end
data/lib/avro_turf/core_ext/hash.rb ADDED
@@ -0,0 +1,7 @@
1
+ class Hash
2
+ def as_avro
3
+ hsh = Hash.new
4
+ each {|k, v| hsh[k.as_avro] = v.as_avro }
5
+ hsh
6
+ end
7
+ end
data/lib/avro_turf/core_ext/nil_class.rb ADDED
@@ -0,0 +1,5 @@
1
+ class NilClass
2
+ def as_avro
3
+ self
4
+ end
5
+ end
data/lib/avro_turf/core_ext/numeric.rb ADDED
@@ -0,0 +1,5 @@
1
+ class Numeric
2
+ def as_avro
3
+ self
4
+ end
5
+ end
data/lib/avro_turf/core_ext/string.rb ADDED
@@ -0,0 +1,5 @@
1
+ class String
2
+ def as_avro
3
+ self
4
+ end
5
+ end
data/lib/avro_turf/core_ext/symbol.rb ADDED
@@ -0,0 +1,5 @@
1
+ class Symbol
2
+ def as_avro
3
+ to_s
4
+ end
5
+ end
data/lib/avro_turf/core_ext/time.rb ADDED
@@ -0,0 +1,5 @@
1
+ class Time
2
+ def as_avro
3
+ iso8601
4
+ end
5
+ end
data/lib/avro_turf/core_ext/true_class.rb ADDED
@@ -0,0 +1,5 @@
1
+ class TrueClass
2
+ def as_avro
3
+ self
4
+ end
5
+ end
data/lib/avro_turf/disk_cache.rb ADDED
@@ -0,0 +1,83 @@
1
+ # A cache for the CachedConfluentSchemaRegistry.
2
+ # Extends the InMemoryCache to provide a write-thru to disk for a persistent cache.
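+ #
+ # Example (a sketch; the cache directory path is illustrative and must exist):
+ #
+ #   registry = AvroTurf::CachedConfluentSchemaRegistry.new(
+ #     AvroTurf::ConfluentSchemaRegistry.new("http://my-registry:8081/"),
+ #     cache: AvroTurf::DiskCache.new("/tmp/avro_schema_cache")
+ #   )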
3
+ class AvroTurf::DiskCache < AvroTurf::InMemoryCache
4
+
5
+ def initialize(disk_path)
6
+ super()
7
+
8
+ # load the write-thru cache on startup, if it exists
9
+ @schemas_by_id_path = File.join(disk_path, 'schemas_by_id.json')
10
+ @schemas_by_id = JSON.parse(File.read(@schemas_by_id_path)) if File.exist?(@schemas_by_id_path)
11
+
12
+ @ids_by_schema_path = File.join(disk_path, 'ids_by_schema.json')
13
+ @ids_by_schema = JSON.parse(File.read(@ids_by_schema_path)) if File.exist?(@ids_by_schema_path)
14
+
15
+ @schemas_by_subject_version_path = File.join(disk_path, 'schemas_by_subject_version.json')
16
+ @schemas_by_subject_version = {}
17
+ end
18
+
19
+ # override
20
+ # the write-thru cache (json) does not store keys in numeric format
21
+ # so, convert id to a string for caching purposes
22
+ def lookup_by_id(id)
23
+ super(id.to_s)
24
+ end
25
+
26
+ # override to include write-thru cache after storing result from upstream
27
+ def store_by_id(id, schema)
28
+ # must return the value from storing the result (i.e. do not return result from file write)
29
+ value = super(id.to_s, schema)
30
+ File.write(@schemas_by_id_path, JSON.pretty_generate(@schemas_by_id))
31
+ return value
32
+ end
33
+
34
+ # override to include write-thru cache after storing result from upstream
35
+ def store_by_schema(subject, schema, id)
36
+ # must return the value from storing the result (i.e. do not return result from file write)
37
+ value = super
38
+ File.write(@ids_by_schema_path, JSON.pretty_generate(@ids_by_schema))
39
+ return value
40
+ end
41
+
42
+ # Checks the instance variable (in-memory cache) for the schema first.
43
+ # Falls back to the disk cache if the in-memory cache has no entry.
44
+ # If the file exists but the in-memory cache is empty, reads the file and syncs the in-memory cache.
45
+ # Finally, returns nil if the file doesn't exist.
46
+ def lookup_by_version(subject, version)
47
+ key = "#{subject}#{version}"
48
+ schema = @schemas_by_subject_version[key]
49
+
50
+ return schema unless schema.nil?
51
+
52
+ hash = JSON.parse(File.read(@schemas_by_subject_version_path)) if File.exist?(@schemas_by_subject_version_path)
53
+ if hash
54
+ @schemas_by_subject_version = hash
55
+ @schemas_by_subject_version[key]
56
+ end
57
+ end
58
+
59
+ # Checks if the file exists and parses its JSON into a hash.
60
+ # If the file exists, overwrites/inserts the schema at the key;
61
+ # if it doesn't exist, creates a new hash.
62
+ # Writes the new/updated hash to the file and
63
+ # updates the instance variable (in-memory cache) to match.
64
+ def store_by_version(subject, version, schema)
65
+ key = "#{subject}#{version}"
66
+ hash = JSON.parse(File.read(@schemas_by_subject_version_path)) if File.exist?(@schemas_by_subject_version_path)
67
+ hash = if hash
68
+ hash[key] = schema
69
+ hash
70
+ else
71
+ {key => schema}
72
+ end
73
+
74
+ write_to_disk_cache(@schemas_by_subject_version_path, hash)
75
+
76
+ @schemas_by_subject_version = hash
77
+ @schemas_by_subject_version[key]
78
+ end
79
+
80
+ private def write_to_disk_cache(path, hash)
81
+ File.write(path, JSON.pretty_generate(hash))
82
+ end
83
+ end
data/lib/avro_turf/in_memory_cache.rb ADDED
@@ -0,0 +1,38 @@
1
+ # A cache for the CachedConfluentSchemaRegistry.
2
+ # Simply stores the schemas and ids in in-memory hashes.
3
+ class AvroTurf::InMemoryCache
4
+
5
+ def initialize
6
+ @schemas_by_id = {}
7
+ @ids_by_schema = {}
8
+ @schema_by_subject_version = {}
9
+ end
10
+
11
+ def lookup_by_id(id)
12
+ @schemas_by_id[id]
13
+ end
14
+
15
+ def store_by_id(id, schema)
16
+ @schemas_by_id[id] = schema
17
+ end
18
+
19
+ def lookup_by_schema(subject, schema)
20
+ key = subject + schema.to_s
21
+ @ids_by_schema[key]
22
+ end
23
+
24
+ def store_by_schema(subject, schema, id)
25
+ key = subject + schema.to_s
26
+ @ids_by_schema[key] = id
27
+ end
28
+
29
+ def lookup_by_version(subject, version)
30
+ key = "#{subject}#{version}"
31
+ @schema_by_subject_version[key]
32
+ end
33
+
34
+ def store_by_version(subject, version, schema)
35
+ key = "#{subject}#{version}"
36
+ @schema_by_subject_version[key] = schema
37
+ end
38
+ end
data/lib/avro_turf/messaging.rb ADDED
@@ -0,0 +1,199 @@
1
+ require 'logger'
2
+ require 'avro_turf'
3
+ require 'avro_turf/schema_store'
4
+ require 'avro_turf/confluent_schema_registry'
5
+ require 'avro_turf/cached_confluent_schema_registry'
6
+
7
+ # For back-compatibility require the aliases along with the Messaging API.
8
+ # These names are deprecated and will be removed in a future release.
9
+ require 'avro_turf/schema_registry'
10
+ require 'avro_turf/cached_schema_registry'
11
+
12
+ class AvroTurf
13
+
14
+ # Provides a way to encode and decode messages without having to embed schemas
15
+ # in the encoded data. Confluent's Schema Registry[1] is used to register
16
+ # a schema when encoding a message -- the registry will issue a schema id that
17
+ # will be included in the encoded data alongside the actual message. When
18
+ # decoding the data, the schema id will be used to look up the writer's schema
19
+ # from the registry.
20
+ #
21
+ # 1: https://github.com/confluentinc/schema-registry
22
+ class Messaging
23
+ MAGIC_BYTE = [0].pack("C").freeze
24
+ DecodedMessage = Struct.new(:schema_id, :writer_schema, :reader_schema, :message)
25
+ private_constant(:DecodedMessage)
26
+
27
+ # Instantiate a new Messaging instance with the given configuration.
28
+ #
29
+ # registry - A schema registry object that responds to all methods in the
30
+ # AvroTurf::ConfluentSchemaRegistry interface.
31
+ # registry_url - The String URL of the schema registry that should be used.
32
+ # schema_store - A schema store object that responds to #find(schema_name, namespace).
33
+ # schemas_path - The String file system path where local schemas are stored.
34
+ # namespace - The String default schema namespace.
35
+ # logger - The Logger that should be used to log information (optional).
36
+ # proxy - Forward the request via proxy (optional).
37
+ # client_cert - Name of file containing client certificate (optional).
38
+ # client_key - Name of file containing client private key to go with client_cert (optional).
39
+ # client_key_pass - Password to go with client_key (optional).
40
+ # client_cert_data - In-memory client certificate (optional).
41
+ # client_key_data - In-memory client private key to go with client_cert_data (optional).
42
+ def initialize(
43
+ registry: nil,
44
+ registry_url: nil,
45
+ schema_store: nil,
46
+ schemas_path: nil,
47
+ namespace: nil,
48
+ logger: nil,
49
+ proxy: nil,
50
+ client_cert: nil,
51
+ client_key: nil,
52
+ client_key_pass: nil,
53
+ client_cert_data: nil,
54
+ client_key_data: nil
55
+ )
56
+ @logger = logger || Logger.new($stderr)
57
+ @namespace = namespace
58
+ @schema_store = schema_store || SchemaStore.new(path: schemas_path || DEFAULT_SCHEMAS_PATH)
59
+ @registry = registry || CachedConfluentSchemaRegistry.new(
60
+ ConfluentSchemaRegistry.new(
61
+ registry_url,
62
+ logger: @logger,
63
+ proxy: proxy,
64
+ client_cert: client_cert,
65
+ client_key: client_key,
66
+ client_key_pass: client_key_pass,
67
+ client_cert_data: client_cert_data,
68
+ client_key_data: client_key_data
69
+ )
70
+ )
71
+ @schemas_by_id = {}
72
+ end
73
+
74
+ # Encodes a message using the specified schema.
75
+ #
76
+ # message - The message that should be encoded. Must be compatible with
77
+ # the schema.
78
+ # schema_name - The String name of the schema that should be used to encode
79
+ # the data.
80
+ # namespace - The namespace of the schema (optional).
81
+ # subject - The subject name the schema should be registered under in
82
+ # the schema registry (optional).
83
+ # version - The integer version of the schema that should be used to decode
84
+ # the data. Must match the schema used when encoding (optional).
85
+ # schema_id - The integer id of the schema that should be used to encode
86
+ # the data.
87
+ #
88
+ # Returns the encoded data as a String.
89
+ def encode(message, schema_name: nil, namespace: @namespace, subject: nil, version: nil, schema_id: nil)
90
+ schema_id, schema = if schema_id
91
+ fetch_schema_by_id(schema_id)
92
+ elsif subject && version
93
+ fetch_schema(subject, version)
94
+ elsif schema_name
95
+ register_schema(subject, schema_name, namespace)
96
+ else
97
+ raise ArgumentError.new('Neither schema_name nor schema_id nor subject + version provided to determine the schema.')
98
+ end
99
+
100
+ stream = StringIO.new
101
+ writer = Avro::IO::DatumWriter.new(schema)
102
+ encoder = Avro::IO::BinaryEncoder.new(stream)
103
+
104
+ # Always start with the magic byte.
105
+ encoder.write(MAGIC_BYTE)
106
+
107
+ # The schema id is encoded as a 4-byte big-endian integer.
108
+ encoder.write([schema_id].pack("N"))
109
+
110
+ # The actual message comes last.
111
+ writer.write(message, encoder)
112
+
113
+ stream.string
114
+ rescue Excon::Error::NotFound
115
+ if schema_id
116
+ raise SchemaNotFoundError.new("Schema with id: #{schema_id} is not found on registry")
117
+ else
118
+ raise SchemaNotFoundError.new("Schema with subject: `#{subject}` version: `#{version}` is not found on registry")
119
+ end
120
+ end
121
+
122
+ # Decodes data into the original message.
123
+ #
124
+ # data - A String containing encoded data.
125
+ # schema_name - The String name of the schema that should be used to decode
126
+ # the data. Must match the schema used when encoding (optional).
127
+ # namespace - The namespace of the schema (optional).
128
+ #
129
+ # Returns the decoded message.
130
+ def decode(data, schema_name: nil, namespace: @namespace)
131
+ decode_message(data, schema_name: schema_name, namespace: namespace).message
132
+ end
133
+
134
+ # Decodes data into the original message.
135
+ #
136
+ # data - A String containing encoded data.
137
+ # schema_name - The String name of the schema that should be used to decode
138
+ # the data. Must match the schema used when encoding (optional).
139
+ # namespace - The namespace of the schema (optional).
140
+ #
141
+ # Returns Struct with the next attributes:
142
+ # schema_id - The integer id of schema used to encode the message
143
+ # message - The decoded message
144
+ def decode_message(data, schema_name: nil, namespace: @namespace)
145
+ readers_schema = schema_name && @schema_store.find(schema_name, namespace)
146
+ stream = StringIO.new(data)
147
+ decoder = Avro::IO::BinaryDecoder.new(stream)
148
+
149
+ # The first byte is MAGIC!!!
150
+ magic_byte = decoder.read(1)
151
+
152
+ if magic_byte != MAGIC_BYTE
153
+ raise "Expected data to begin with a magic byte, got `#{magic_byte.inspect}`"
154
+ end
155
+
156
+ # The schema id is a 4-byte big-endian integer.
157
+ schema_id = decoder.read(4).unpack("N").first
158
+
159
+ writers_schema = @schemas_by_id.fetch(schema_id) do
160
+ schema_json = @registry.fetch(schema_id)
161
+ @schemas_by_id[schema_id] = Avro::Schema.parse(schema_json)
162
+ end
163
+
164
+ reader = Avro::IO::DatumReader.new(writers_schema, readers_schema)
165
+ message = reader.read(decoder)
166
+
167
+ DecodedMessage.new(schema_id, writers_schema, readers_schema, message)
168
+ rescue Excon::Error::NotFound
169
+ raise SchemaNotFoundError.new("Schema with id: #{schema_id} is not found on registry")
170
+ end
171
+
172
+ private
173
+
174
+ # Fetch the schema from the registry using the provided subject name and version.
175
+ # Providing a subject and version determines the schema explicitly and skips
176
+ # the automatic registration of the schema in the schema registry.
177
+ def fetch_schema(subject, version)
178
+ schema_data = @registry.subject_version(subject, version)
179
+ schema_id = schema_data.fetch('id')
180
+ schema = Avro::Schema.parse(schema_data.fetch('schema'))
181
+ [schema_id, schema]
182
+ end
183
+
184
+ # Fetch the schema from registry with the provided schema_id.
185
+ def fetch_schema_by_id(schema_id)
186
+ schema_json = @registry.fetch(schema_id)
187
+ schema = Avro::Schema.parse(schema_json)
188
+ [schema_id, schema]
189
+ end
190
+
191
+ # Schemas are registered under the full name of the top level Avro record
192
+ # type, or `subject` if it's provided.
193
+ def register_schema(subject, schema_name, namespace)
194
+ schema = @schema_store.find(schema_name, namespace)
195
+ schema_id = @registry.register(subject || schema.fullname, schema)
196
+ [schema_id, schema]
197
+ end
198
+ end
199
+ end
data/lib/avro_turf/mutable_schema_store.rb ADDED
@@ -0,0 +1,18 @@
1
+ require 'avro_turf/schema_store'
2
+
3
+ class AvroTurf
4
+ # A schema store that allows you to add or remove schemas, and to access
5
+ # them externally.
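+ #
+ # Example (a sketch; the schema hash and registry URL are illustrative):
+ #
+ #   store = AvroTurf::MutableSchemaStore.new(path: "schemas")
+ #   store.add_schema(
+ #     "type" => "record", "name" => "greeting",
+ #     "fields" => [{ "name" => "text", "type" => "string" }]
+ #   )
+ #   AvroTurf::Messaging.new(registry_url: "http://my-registry:8081/", schema_store: store)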
6
+ class MutableSchemaStore < SchemaStore
7
+ attr_accessor :schemas
8
+
9
+ # @param schema_hash [Hash]
10
+ def add_schema(schema_hash)
11
+ name = schema_hash['name']
12
+ namespace = schema_hash['namespace']
13
+ full_name = Avro::Name.make_fullname(name, namespace)
14
+ return if @schemas.key?(full_name)
15
+ Avro::Schema.real_parse(schema_hash, @schemas)
16
+ end
17
+ end
18
+ end
data/lib/avro_turf/schema_registry.rb ADDED
@@ -0,0 +1,6 @@
1
+ require 'avro_turf/confluent_schema_registry'
2
+
3
+ # AvroTurf::SchemaRegistry is deprecated and will be removed in a future release.
4
+ # Use AvroTurf::ConfluentSchemaRegistry instead.
5
+
6
+ AvroTurf::SchemaRegistry = AvroTurf::ConfluentSchemaRegistry
data/lib/avro_turf/schema_store.rb ADDED
@@ -0,0 +1,75 @@
1
+ class AvroTurf::SchemaStore
2
+
3
+ def initialize(path: nil)
4
+ @path = path or raise "Please specify a schema path"
5
+ @schemas = Hash.new
6
+ @mutex = Mutex.new
7
+ end
8
+
9
+ # Resolves and returns a schema.
10
+ #
11
+ # schema_name - The String name of the schema to resolve.
12
+ #
13
+ # Returns an Avro::Schema.
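+ #
+ # For example (illustrative names), find("address", "contacts") loads
+ # <schemas_path>/contacts/address.avsc and caches it under the full name
+ # "contacts.address".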
14
+ def find(name, namespace = nil)
15
+ fullname = Avro::Name.make_fullname(name, namespace)
16
+ # Optimistic non-blocking read from @schemas
17
+ # There's no sense in locking the resource when the schema is already loaded
18
+ return @schemas[fullname] if @schemas.key?(fullname)
19
+
20
+ # Pessimistic blocking write to @schemas
21
+ @mutex.synchronize do
22
+ # Still need to check whether the schema is already loaded
23
+ return @schemas[fullname] if @schemas.key?(fullname)
24
+
25
+ load_schema!(fullname, namespace)
26
+ end
27
+ end
28
+
29
+ # Loads all schema definition files in the `schemas_dir`.
30
+ def load_schemas!
31
+ pattern = [@path, "**", "*.avsc"].join("/")
32
+
33
+ Dir.glob(pattern) do |schema_path|
34
+ # Remove the path prefix.
35
+ schema_path.sub!(/^\/?#{@path}\//, "")
36
+
37
+ # Replace `/` with `.` and chop off the file extension.
38
+ schema_name = File.basename(schema_path.tr("/", "."), ".avsc")
39
+
40
+ # Load and cache the schema.
41
+ find(schema_name)
42
+ end
43
+ end
44
+
45
+ private
46
+
47
+ # Loads a single schema.
48
+ # This method is not thread-safe; do not call it outside of the mutex synchronization routine.
49
+ def load_schema!(fullname, namespace = nil)
50
+ *namespace, schema_name = fullname.split(".")
51
+ schema_path = File.join(@path, *namespace, schema_name + ".avsc")
52
+ schema_json = JSON.parse(File.read(schema_path))
53
+ schema = Avro::Schema.real_parse(schema_json, @schemas)
54
+
55
+ if schema.respond_to?(:fullname) && schema.fullname != fullname
56
+ raise AvroTurf::SchemaError, "expected schema `#{schema_path}' to define type `#{fullname}'"
57
+ end
58
+
59
+ schema
60
+ rescue ::Avro::SchemaParseError => e
61
+ # This is a hack in order to figure out exactly which type was missing. The
62
+ # Avro gem ought to provide this data directly.
63
+ if e.to_s =~ /"([\w\.]+)" is not a schema we know about/
64
+ load_schema!($1)
65
+
66
+ # Re-resolve the original schema now that the dependency has been resolved.
67
+ @schemas.delete(fullname)
68
+ load_schema!(fullname)
69
+ else
70
+ raise
71
+ end
72
+ rescue Errno::ENOENT, Errno::ENAMETOOLONG
73
+ raise AvroTurf::SchemaNotFoundError, "could not find Avro schema at `#{schema_path}'"
74
+ end
75
+ end
data/lib/avro_turf/schema_to_avro_patch.rb ADDED
@@ -0,0 +1,52 @@
1
+ class AvroTurf
2
+ module AvroGemPatch
3
+ module RecordSchema
4
+ module ClassMethods
5
+ def make_field_objects(field_data, names, namespace=nil)
6
+ new_field_data = []
7
+ field_data.each do |field|
8
+ if field.respond_to?(:[]) && !field.key?('default')
9
+ field = field.clone
10
+ field['default'] = :no_default
11
+ end
12
+ new_field_data << field
13
+ end
14
+ super(new_field_data, names, namespace)
15
+ end
16
+ end
17
+
18
+ def self.prepended(base)
19
+ class << base
20
+ prepend ClassMethods
21
+ end
22
+ end
23
+ end
24
+
25
+ module Field
26
+ def initialize(type, name, default=:no_default, order=nil, names=nil, namespace=nil)
27
+ super(type, name, default, order, names, namespace)
28
+ end
29
+
30
+ def to_avro(names=Set.new)
31
+ {'name' => name, 'type' => type.to_avro(names)}.tap do |avro|
32
+ avro['default'] = default unless default == :no_default
33
+ avro['order'] = order if order
34
+ end
35
+ end
36
+ end
37
+
38
+ module DatumReader
39
+ def read_default_value(field_schema, default_value)
40
+ if default_value == :no_default
41
+ raise Avro::AvroError, "Missing data for #{field_schema} with no default"
42
+ end
43
+
44
+ super
45
+ end
46
+ end
47
+ end
48
+ end
49
+
50
+ Avro::Schema::RecordSchema.send(:prepend, AvroTurf::AvroGemPatch::RecordSchema)
51
+ Avro::Schema::Field.send(:prepend, AvroTurf::AvroGemPatch::Field)
52
+ Avro::IO::DatumReader.send(:prepend, AvroTurf::AvroGemPatch::DatumReader)
data/lib/avro_turf/test/fake_confluent_schema_registry_server.rb ADDED
@@ -0,0 +1,141 @@
1
+ require 'sinatra/base'
2
+
3
+ class FakeConfluentSchemaRegistryServer < Sinatra::Base
4
+ SUBJECTS = Hash.new { Array.new }
5
+ SCHEMAS = []
6
+ CONFIGS = Hash.new
7
+ SUBJECT_NOT_FOUND = { error_code: 40401, message: 'Subject not found' }.to_json.freeze
8
+ VERSION_NOT_FOUND = { error_code: 40402, message: 'Version not found' }.to_json.freeze
9
+ SCHEMA_NOT_FOUND = { error_code: 40403, message: 'Schema not found' }.to_json.freeze
10
+ DEFAULT_GLOBAL_CONFIG = { 'compatibility' => 'BACKWARD'.freeze }.freeze
11
+
12
+ @global_config = DEFAULT_GLOBAL_CONFIG.dup
13
+
14
+ class << self
15
+ attr_reader :global_config
16
+ end
17
+
18
+ helpers do
19
+ def parse_schema
20
+ request.body.rewind
21
+ JSON.parse(request.body.read).fetch("schema").tap do |schema|
22
+ Avro::Schema.parse(schema)
23
+ end
24
+ end
25
+
26
+ def parse_config
27
+ request.body.rewind
28
+ JSON.parse(request.body.read)
29
+ end
30
+
31
+ def global_config
32
+ self.class.global_config
33
+ end
34
+ end
35
+
36
+ post "/subjects/:subject/versions" do
37
+ schema = parse_schema
38
+ ids_for_subject = SUBJECTS[params[:subject]]
39
+
40
+ schemas_for_subject =
41
+ SCHEMAS.select
42
+ .with_index { |_, i| ids_for_subject.include?(i) }
43
+
44
+ if schemas_for_subject.include?(schema)
45
+ schema_id = SCHEMAS.index(schema)
46
+ else
47
+ SCHEMAS << schema
48
+ schema_id = SCHEMAS.size - 1
49
+ SUBJECTS[params[:subject]] = SUBJECTS[params[:subject]] << schema_id
50
+ end
51
+
52
+ { id: schema_id }.to_json
53
+ end
54
+
55
+ get "/schemas/ids/:schema_id" do
56
+ schema = SCHEMAS.at(params[:schema_id].to_i)
57
+ halt(404, SCHEMA_NOT_FOUND) unless schema
58
+ { schema: schema }.to_json
59
+ end
60
+
61
+ get "/subjects" do
62
+ SUBJECTS.keys.to_json
63
+ end
64
+
65
+ get "/subjects/:subject/versions" do
66
+ schema_ids = SUBJECTS[params[:subject]]
67
+ halt(404, SUBJECT_NOT_FOUND) if schema_ids.empty?
68
+ (1..schema_ids.size).to_a.to_json
69
+ end
70
+
71
+ get "/subjects/:subject/versions/:version" do
72
+ schema_ids = SUBJECTS[params[:subject]]
73
+ halt(404, SUBJECT_NOT_FOUND) if schema_ids.empty?
74
+
75
+ schema_id = if params[:version] == 'latest'
76
+ schema_ids.last
77
+ else
78
+ schema_ids.at(Integer(params[:version]) - 1)
79
+ end
80
+ halt(404, VERSION_NOT_FOUND) unless schema_id
81
+
82
+ schema = SCHEMAS.at(schema_id)
83
+
84
+ {
85
+ name: params[:subject],
86
+ version: schema_ids.index(schema_id) + 1,
87
+ id: schema_id,
88
+ schema: schema
89
+ }.to_json
90
+ end
91
+
92
+ post "/subjects/:subject" do
93
+ schema = parse_schema
94
+
95
+ # Note: this does not actually handle the same schema registered under
96
+ # multiple subjects
97
+ schema_id = SCHEMAS.index(schema)
98
+
99
+ halt(404, SCHEMA_NOT_FOUND) unless schema_id
100
+
101
+ {
102
+ subject: params[:subject],
103
+ id: schema_id,
104
+ version: SUBJECTS[params[:subject]].index(schema_id) + 1,
105
+ schema: schema
106
+ }.to_json
107
+ end
108
+
109
+ post "/compatibility/subjects/:subject/versions/:version" do
110
+ # The ruby avro gem does not yet include a compatibility check between schemas.
111
+ # See https://github.com/apache/avro/pull/170
112
+ raise NotImplementedError
113
+ end
114
+
115
+ get "/config" do
116
+ global_config.to_json
117
+ end
118
+
119
+ put "/config" do
120
+ global_config.merge!(parse_config).to_json
121
+ end
122
+
123
+ get "/config/:subject" do
124
+ CONFIGS.fetch(params[:subject], global_config).to_json
125
+ end
126
+
127
+ put "/config/:subject" do
128
+ config = parse_config
129
+ subject = params[:subject]
130
+ CONFIGS.fetch(subject) do
131
+ CONFIGS[subject] = {}
132
+ end.merge!(config).to_json
133
+ end
134
+
135
+ def self.clear
136
+ SUBJECTS.clear
137
+ SCHEMAS.clear
138
+ CONFIGS.clear
139
+ @global_config = DEFAULT_GLOBAL_CONFIG.dup
140
+ end
141
+ end
data/lib/avro_turf/test/fake_schema_registry_server.rb ADDED
@@ -0,0 +1,6 @@
1
+ require 'avro_turf/test/fake_confluent_schema_registry_server'
2
+
3
+ # FakeSchemaRegistryServer is deprecated and will be removed in a future release.
4
+ # Use FakeConfluentSchemaRegistryServer instead.
5
+
6
+ FakeSchemaRegistryServer = FakeConfluentSchemaRegistryServer
data/lib/avro_turf/version.rb ADDED
@@ -0,0 +1,3 @@
1
+ class AvroTurf
2
+ VERSION = "0.1.0"
3
+ end
data/perf/address.avsc ADDED
@@ -0,0 +1,14 @@
1
+ {
2
+ "name": "address",
3
+ "type": "record",
4
+ "fields": [
5
+ {
6
+ "name": "street",
7
+ "type": "string"
8
+ },
9
+ {
10
+ "name": "city",
11
+ "type": "string"
12
+ }
13
+ ]
14
+ }
data/perf/encoding_size.rb ADDED
@@ -0,0 +1,26 @@
1
+ #!/usr/bin/env ruby
2
+ #
3
+ # Measures the encoded size of messages of increasing size.
4
+
5
+ $LOAD_PATH.unshift(File.expand_path("../lib", File.dirname(__FILE__)))
6
+
7
+ require 'benchmark'
8
+ require 'avro_turf'
9
+
10
+ sizes = [1, 10, 100, 1_000, 10_000]
11
+ avro = AvroTurf.new(schemas_path: File.dirname(__FILE__))
12
+
13
+ sizes.each do |size|
14
+ data = {
15
+ "name" => "John" * size,
16
+ "address" => {
17
+ "street" => "1st st." * size,
18
+ "city" => "Citytown" * size
19
+ }
20
+ }
21
+
22
+ result = avro.encode(data, schema_name: "person")
23
+ encoded_size = result.bytesize
24
+ encode_factor = result.bytesize / size.to_f
25
+ puts "size #{size}: #{encoded_size} bytes (encoding factor #{encode_factor})"
26
+ end
data/perf/encoding_speed.rb ADDED
@@ -0,0 +1,30 @@
1
+ #!/usr/bin/env ruby
2
+ #
3
+ # Measures the time it takes to encode messages of increasing size.
4
+
5
+ $LOAD_PATH.unshift(File.expand_path("../lib", File.dirname(__FILE__)))
6
+
7
+ require 'benchmark'
8
+ require 'avro_turf'
9
+
10
+ # Number of iterations per run.
11
+ N = 10_000
12
+
13
+ Benchmark.bm(15) do |x|
14
+ sizes = [1, 10, 100, 1_000, 10_000]
15
+ avro = AvroTurf.new(schemas_path: File.dirname(__FILE__))
16
+
17
+ sizes.each do |size|
18
+ data = {
19
+ "name" => "John" * size,
20
+ "address" => {
21
+ "street" => "1st st." * size,
22
+ "city" => "Citytown" * size
23
+ }
24
+ }
25
+
26
+ x.report("size #{size}:") do
27
+ N.times { avro.encode(data, schema_name: "person") }
28
+ end
29
+ end
30
+ end
data/perf/person.avsc ADDED
@@ -0,0 +1,14 @@
1
+ {
2
+ "name": "person",
3
+ "type": "record",
4
+ "fields": [
5
+ {
6
+ "name": "name",
7
+ "type": "string"
8
+ },
9
+ {
10
+ "name": "address",
11
+ "type": "address"
12
+ }
13
+ ]
14
+ }
data/ros-avro_turf.gemspec ADDED
@@ -0,0 +1,40 @@
1
+ # coding: utf-8
2
+ lib = File.expand_path('../lib', __FILE__)
3
+ $LOAD_PATH.unshift(lib) unless $LOAD_PATH.include?(lib)
4
+ require 'avro_turf/version'
5
+
6
+ Gem::Specification.new do |spec|
7
+ spec.name = "ros-avro_turf"
8
+ spec.version = AvroTurf::VERSION
9
+ spec.authors = ["Daniel Schierbeck"]
10
+ spec.email = ["dasch@zendesk.com"]
11
+ spec.summary = "A library that makes it easier to use the Avro serialization format from Ruby"
12
+ spec.homepage = "https://github.com/dasch/avro_turf"
13
+ spec.license = "MIT"
14
+
15
+ spec.files = `git ls-files -z`.split("\x0")
16
+ spec.executables = spec.files.grep(%r{^bin/}) { |f| File.basename(f) }
17
+ spec.test_files = spec.files.grep(%r{^(test|spec|features)/})
18
+ spec.require_paths = ["lib"]
19
+ spec.add_dependency "ros-avro"
20
+ spec.add_dependency "excon", "~> 0.45"
21
+
22
+ spec.add_development_dependency "bundler", "~> 2.0"
23
+ spec.add_development_dependency "rake", "~> 13.0"
24
+ spec.add_development_dependency "rspec", "~> 3.2"
25
+ spec.add_development_dependency "fakefs", "~> 0.20.0"
26
+ spec.add_development_dependency "webmock"
27
+ spec.add_development_dependency "sinatra"
28
+ spec.add_development_dependency "json_spec"
29
+ spec.add_development_dependency "rack-test"
30
+
31
+ spec.post_install_message = %{
32
+ avro_turf v0.8.0 deprecates the names AvroTurf::SchemaRegistry,
33
+ AvroTurf::CachedSchemaRegistry, and FakeSchemaRegistryServer.
34
+
35
+ Use AvroTurf::ConfluentSchemaRegistry, AvroTurf::CachedConfluentSchemaRegistry,
36
+ and FakeConfluentSchemaRegistryServer instead.
37
+
38
+ See https://github.com/dasch/avro_turf#deprecation-notice
39
+ }
40
+ end
data/spec/avro_turf_spec.rb ADDED
@@ -0,0 +1,161 @@
1
+ describe AvroTurf do
2
+ let(:avro) { AvroTurf.new(schemas_path: "spec/schemas/") }
3
+
4
+ before do
5
+ FileUtils.mkdir_p("spec/schemas")
6
+ end
7
+
8
+ describe "#encode" do
9
+ before do
10
+ define_schema "person.avsc", <<-AVSC
11
+ {
12
+ "name": "person",
13
+ "type": "record",
14
+ "fields": [
15
+ {
16
+ "type": "string",
17
+ "name": "full_name"
18
+ }
19
+ ]
20
+ }
21
+ AVSC
22
+ end
23
+
24
+ it "encodes data with Avro" do
25
+ data = {
26
+ "full_name" => "John Doe"
27
+ }
28
+
29
+ encoded_data = avro.encode(data, schema_name: "person")
30
+
31
+ expect(avro.decode(encoded_data)).to eq(data)
32
+ end
33
+
34
+ it "allows specifying a codec that should be used to compress messages" do
35
+ compressed_avro = AvroTurf.new(schemas_path: "spec/schemas/", codec: "deflate")
36
+
37
+ data = {
38
+ "full_name" => "John Doe" * 100
39
+ }
40
+
41
+ uncompressed_data = avro.encode(data, schema_name: "person")
42
+ compressed_data = compressed_avro.encode(data, schema_name: "person")
43
+
44
+ expect(compressed_data.bytesize).to be < uncompressed_data.bytesize
45
+ expect(compressed_avro.decode(compressed_data)).to eq(data)
46
+ end
47
+ end
48
+
49
+ describe "#decode" do
50
+ it "decodes Avro data using the inlined writer's schema" do
51
+ define_schema "message.avsc", <<-AVSC
52
+ {
53
+ "name": "message",
54
+ "type": "string"
55
+ }
56
+ AVSC
57
+
58
+ encoded_data = avro.encode("hello, world", schema_name: "message")
59
+
60
+ expect(avro.decode(encoded_data)).to eq "hello, world"
61
+ end
62
+
63
+ it "decodes Avro data using a specified reader's schema" do
64
+ FileUtils.mkdir_p("spec/schemas/reader")
65
+
66
+ define_schema "point.avsc", <<-AVSC
67
+ {
68
+ "name": "point",
69
+ "type": "record",
70
+ "fields": [
71
+ { "name": "x", "type": "long" },
72
+ { "name": "y", "type": "long" }
73
+ ]
74
+ }
75
+ AVSC
76
+
77
+ define_schema "reader/point.avsc", <<-AVSC
78
+ {
79
+ "name": "point",
80
+ "type": "record",
81
+ "fields": [
82
+ { "name": "x", "type": "long" }
83
+ ]
84
+ }
85
+ AVSC
86
+
87
+ encoded_data = avro.encode({ "x" => 42, "y" => 13 }, schema_name: "point")
88
+ reader_avro = AvroTurf.new(schemas_path: "spec/schemas/reader")
89
+
90
+ expect(reader_avro.decode(encoded_data, schema_name: "point")).to eq({ "x" => 42 })
91
+ end
92
+ end
93
+
94
+ describe "#encode_to_stream" do
95
+ it "writes encoded data to an existing stream" do
96
+ define_schema "message.avsc", <<-AVSC
97
+ {
98
+ "name": "message",
99
+ "type": "string"
100
+ }
101
+ AVSC
102
+
103
+ stream = StringIO.new
104
+ avro.encode_to_stream("hello", stream: stream, schema_name: "message")
105
+
106
+ expect(avro.decode(stream.string)).to eq "hello"
107
+ end
108
+ end
109
+
110
+ describe "#decode_stream" do
111
+ it "decodes Avro data from a stream" do
112
+ define_schema "message.avsc", <<-AVSC
113
+ {
114
+ "name": "message",
115
+ "type": "string"
116
+ }
117
+ AVSC
118
+
119
+ encoded_data = avro.encode("hello", schema_name: "message")
120
+ stream = StringIO.new(encoded_data)
121
+
122
+ expect(avro.decode_stream(stream)).to eq "hello"
123
+ end
124
+ end
125
+
126
+ describe "#valid?" do
127
+ before do
128
+ define_schema "message.avsc", <<-AVSC
129
+ {
130
+ "name": "message",
131
+ "type": "string"
132
+ }
133
+ AVSC
134
+ end
135
+
136
+ it "returns true if the datum matches the schema" do
137
+ datum = "hello"
138
+ expect(avro.valid?(datum, schema_name: "message")).to eq true
139
+ end
140
+
141
+ it "returns false if the datum does not match the schema" do
142
+ datum = 42
143
+ expect(avro.valid?(datum, schema_name: "message")).to eq false
144
+ end
145
+
146
+ it "handles symbol keys in hashes" do
147
+ define_schema "postcard.avsc", <<-AVSC
148
+ {
149
+ "name": "postcard",
150
+ "type": "record",
151
+ "fields": [
152
+ { "name": "message", "type": "string" }
153
+ ]
154
+ }
155
+ AVSC
156
+
157
+ datum = { message: "hello" }
158
+ expect(avro.valid?(datum, schema_name: "postcard")).to eq true
159
+ end
160
+ end
161
+ end
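The spec above exercises the whole top-level AvroTurf API. For orientation, here is a minimal non-RSpec sketch of the same calls; the ./schemas directory and its person.avsc file are assumptions that mirror the schema defined in the spec:

    require "stringio"
    require "avro_turf"

    # Assumes ./schemas/person.avsc defines the "person" record from the spec.
    avro = AvroTurf.new(schemas_path: "./schemas", codec: "deflate")

    encoded = avro.encode({ "full_name" => "John Doe" }, schema_name: "person")
    avro.decode(encoded)                         # => { "full_name" => "John Doe" }
    avro.decode(encoded, schema_name: "person")  # decode with an explicit reader's schema
    avro.valid?({ "full_name" => "John Doe" }, schema_name: "person")  # => true

    # The streaming variants write to and read from an IO object instead.
    io = StringIO.new
    avro.encode_to_stream({ "full_name" => "John Doe" }, stream: io, schema_name: "person")
    avro.decode_stream(StringIO.new(io.string))  # => { "full_name" => "John Doe" }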
data/spec/cached_confluent_schema_registry_spec.rb ADDED
@@ -0,0 +1,63 @@
1
+ require 'webmock/rspec'
2
+ require 'avro_turf/cached_confluent_schema_registry'
3
+ require 'avro_turf/test/fake_confluent_schema_registry_server'
4
+
5
+ describe AvroTurf::CachedConfluentSchemaRegistry do
6
+ let(:upstream) { instance_double(AvroTurf::ConfluentSchemaRegistry) }
7
+ let(:registry) { described_class.new(upstream) }
8
+ let(:id) { rand(999) }
9
+ let(:schema) do
10
+ {
11
+ type: "record",
12
+ name: "person",
13
+ fields: [{ name: "name", type: "string" }]
14
+ }.to_json
15
+ end
16
+
17
+ describe "#fetch" do
18
+ it "caches the result of fetch" do
19
+ # multiple calls return the same result, with only one upstream call
20
+ allow(upstream).to receive(:fetch).with(id).and_return(schema)
21
+ expect(registry.fetch(id)).to eq(schema)
22
+ expect(registry.fetch(id)).to eq(schema)
23
+ expect(upstream).to have_received(:fetch).exactly(1).times
24
+ end
25
+ end
26
+
27
+ describe "#register" do
28
+ let(:subject_name) { "a_subject" }
29
+
30
+ it "caches the result of register" do
31
+ # multiple calls return the same result, with only one upstream call
32
+ allow(upstream).to receive(:register).with(subject_name, schema).and_return(id)
33
+ expect(registry.register(subject_name, schema)).to eq(id)
34
+ expect(registry.register(subject_name, schema)).to eq(id)
35
+ expect(upstream).to have_received(:register).exactly(1).times
36
+ end
37
+ end
38
+
39
+ describe '#subject_version' do
40
+ let(:subject_name) { 'a_subject' }
41
+ let(:version) { 1 }
42
+ let(:schema_with_meta) do
43
+ {
44
+ subject: subject_name,
45
+ id: 1,
46
+ version: 1,
47
+ schema: schema
48
+ }
49
+ end
50
+
51
+ it 'caches the result of subject_version' do
52
+ allow(upstream).to receive(:subject_version).with(subject_name, version).and_return(schema_with_meta)
53
+ registry.subject_version(subject_name, version)
54
+ registry.subject_version(subject_name, version)
55
+ expect(upstream).to have_received(:subject_version).exactly(1).times
56
+ end
57
+ end
58
+
59
+ it_behaves_like "a confluent schema registry client" do
60
+ let(:upstream) { AvroTurf::ConfluentSchemaRegistry.new(registry_url, logger: logger) }
61
+ let(:registry) { described_class.new(upstream) }
62
+ end
63
+ end
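The caching behaviour asserted above (one upstream call per schema id, per subject/schema pair, and per subject/version) can be summarised outside RSpec roughly as follows; the registry URL and subject name are placeholders:

    require "json"
    require "avro_turf/confluent_schema_registry"
    require "avro_turf/cached_confluent_schema_registry"

    schema_json = {
      type: "record",
      name: "person",
      fields: [{ name: "name", type: "string" }]
    }.to_json

    upstream = AvroTurf::ConfluentSchemaRegistry.new("http://registry.example.com")
    registry = AvroTurf::CachedConfluentSchemaRegistry.new(upstream)

    # Each operation is memoized, so repeated calls with the same arguments
    # reach the upstream registry only once.
    id = registry.register("a_subject", schema_json)  # upstream HTTP call
    registry.register("a_subject", schema_json)       # served from the cache
    registry.fetch(id)                                 # upstream HTTP call
    registry.fetch(id)                                 # served from the cache
    registry.subject_version("a_subject", 1)           # upstream HTTP call
    registry.subject_version("a_subject", 1)           # served from the cache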
data/spec/confluent_schema_registry_spec.rb ADDED
@@ -0,0 +1,21 @@
1
+ require 'webmock/rspec'
2
+ require 'avro_turf/confluent_schema_registry'
3
+ require 'avro_turf/test/fake_confluent_schema_registry_server'
4
+
5
+ describe AvroTurf::ConfluentSchemaRegistry do
6
+ let(:client_cert) { "test client cert" }
7
+ let(:client_key) { "test client key" }
8
+ let(:client_key_pass) { "test client key password" }
9
+
10
+ it_behaves_like "a confluent schema registry client" do
11
+ let(:registry) {
12
+ described_class.new(
13
+ registry_url,
14
+ logger: logger,
15
+ client_cert: client_cert,
16
+ client_key: client_key,
17
+ client_key_pass: client_key_pass
18
+ )
19
+ }
20
+ end
21
+ end
data/spec/core_ext/date_spec.rb ADDED
@@ -0,0 +1,6 @@
1
+ describe Date, "#as_avro" do
2
+ it "returns an ISO8601 string describing the time" do
3
+ date = Date.today
4
+ expect(date.as_avro).to eq(date.iso8601)
5
+ end
6
+ end
data/spec/core_ext/enumerable_spec.rb ADDED
@@ -0,0 +1,12 @@
1
+ describe Enumerable, "#as_avro" do
2
+ it "returns an array" do
3
+ expect(Set.new.as_avro).to eq []
4
+ end
5
+
6
+ it "coerces the items to Avro" do
7
+ x = double(as_avro: "x")
8
+ y = double(as_avro: "y")
9
+
10
+ expect([x, y].as_avro).to eq ["x", "y"]
11
+ end
12
+ end
data/spec/core_ext/false_class_spec.rb ADDED
@@ -0,0 +1,5 @@
1
+ describe FalseClass, "#as_avro" do
2
+ it "returns itself" do
3
+ expect(false.as_avro).to eq false
4
+ end
5
+ end
data/spec/core_ext/hash_spec.rb ADDED
@@ -0,0 +1,8 @@
1
+ describe Hash, "#as_avro" do
2
+ it "coerces the keys and values to Avro" do
3
+ x = double(as_avro: "x")
4
+ y = double(as_avro: "y")
5
+
6
+ expect({ x => y }.as_avro).to eq({ "x" => "y" })
7
+ end
8
+ end
data/spec/core_ext/nil_class_spec.rb ADDED
@@ -0,0 +1,5 @@
1
+ describe NilClass, "#as_avro" do
2
+ it "returns itself" do
3
+ expect(nil.as_avro).to eq nil
4
+ end
5
+ end
data/spec/core_ext/numeric_spec.rb ADDED
@@ -0,0 +1,6 @@
1
+ describe Numeric, "#as_avro" do
2
+ it "returns the number itself" do
3
+ expect(42.as_avro).to eq 42
4
+ expect(4.2.as_avro).to eq 4.2
5
+ end
6
+ end
data/spec/core_ext/string_spec.rb ADDED
@@ -0,0 +1,5 @@
1
+ describe String, "#as_avro" do
2
+ it "returns itself" do
3
+ expect("hello".as_avro).to eq "hello"
4
+ end
5
+ end
data/spec/core_ext/symbol_spec.rb ADDED
@@ -0,0 +1,5 @@
1
+ describe Symbol, "#as_avro" do
2
+ it "returns the String representation of the Symbol" do
3
+ expect(:hello.as_avro).to eq("hello")
4
+ end
5
+ end
data/spec/core_ext/time_spec.rb ADDED
@@ -0,0 +1,6 @@
1
+ describe Time, "#as_avro" do
2
+ it "returns an ISO8601 string describing the time" do
3
+ time = Time.now
4
+ expect(time.as_avro).to eq(time.iso8601)
5
+ end
6
+ end
data/spec/core_ext/true_class_spec.rb ADDED
@@ -0,0 +1,5 @@
1
+ describe TrueClass, "#as_avro" do
2
+ it "returns itself" do
3
+ expect(true.as_avro).to eq true
4
+ end
5
+ end
data/spec/disk_cached_confluent_schema_registry_spec.rb ADDED
@@ -0,0 +1,159 @@
1
+ require 'webmock/rspec'
2
+ require 'avro_turf/cached_confluent_schema_registry'
3
+ require 'avro_turf/test/fake_confluent_schema_registry_server'
4
+
5
+ describe AvroTurf::CachedConfluentSchemaRegistry do
6
+ let(:upstream) { instance_double(AvroTurf::ConfluentSchemaRegistry) }
7
+ let(:cache) { AvroTurf::DiskCache.new("spec/cache")}
8
+ let(:registry) { described_class.new(upstream, cache: cache) }
9
+ let(:id) { rand(999) }
10
+ let(:schema) do
11
+ {
12
+ type: "record",
13
+ name: "person",
14
+ fields: [{ name: "name", type: "string" }]
15
+ }.to_json
16
+ end
17
+
18
+ let(:city_id) { rand(999) }
19
+ let(:city_schema) do
20
+ {
21
+ type: "record",
22
+ name: "city",
23
+ fields: [{ name: "name", type: "string" }]
24
+ }.to_json
25
+ end
26
+
27
+ let(:subject) { 'subject' }
28
+ let(:version) { rand(999) }
29
+ let(:subject_version_schema) do
30
+ {
31
+ subject: subject,
32
+ version: version,
33
+ id: id,
34
+ schema: {
35
+ type: "record",
36
+ name: "city",
37
+ fields: [{ name: "name", type: "string" }]
38
+ }
39
+ }.to_json
40
+ end
41
+
42
+ before do
43
+ FileUtils.mkdir_p("spec/cache")
44
+ end
45
+
46
+ describe "#fetch" do
47
+ let(:cache_before) do
48
+ {
49
+ "#{id}" => "#{schema}"
50
+ }
51
+ end
52
+ let(:cache_after) do
53
+ {
54
+ "#{id}" => "#{schema}",
55
+ "#{city_id}" => "#{city_schema}"
56
+ }
57
+ end
58
+
59
+ # Set up the disk cache so the upstream fetch is never performed
60
+ before do
61
+ store_cache("schemas_by_id.json", cache_before)
62
+ end
63
+
64
+ it "uses preloaded disk cache" do
65
+ # multiple calls return the same result, with zero upstream calls
66
+ allow(upstream).to receive(:fetch).with(id).and_return(schema)
67
+ expect(registry.fetch(id)).to eq(schema)
68
+ expect(registry.fetch(id)).to eq(schema)
69
+ expect(upstream).to have_received(:fetch).exactly(0).times
70
+ expect(load_cache("schemas_by_id.json")).to eq cache_before
71
+ end
72
+
73
+ it "writes thru to disk cache" do
74
+ # multiple calls return the same result, with only one upstream call
75
+ allow(upstream).to receive(:fetch).with(city_id).and_return(city_schema)
76
+ expect(registry.fetch(city_id)).to eq(city_schema)
77
+ expect(registry.fetch(city_id)).to eq(city_schema)
78
+ expect(upstream).to have_received(:fetch).exactly(1).times
79
+ expect(load_cache("schemas_by_id.json")).to eq cache_after
80
+ end
81
+ end
82
+
83
+ describe "#register" do
84
+ let(:subject_name) { "a_subject" }
85
+ let(:cache_before) do
86
+ {
87
+ "#{subject_name}#{schema}" => id
88
+ }
89
+ end
90
+
91
+ let(:city_name) { "a_city" }
92
+ let(:cache_after) do
93
+ {
94
+ "#{subject_name}#{schema}" => id,
95
+ "#{city_name}#{city_schema}" => city_id
96
+ }
97
+ end
98
+
99
+ # Set up the disk cache so the upstream register call is never performed
100
+ before do
101
+ store_cache("ids_by_schema.json", cache_before)
102
+ end
103
+
104
+ it "uses preloaded disk cache" do
105
+ # multiple calls return the same result, with zero upstream calls
106
+ allow(upstream).to receive(:register).with(subject_name, schema).and_return(id)
107
+ expect(registry.register(subject_name, schema)).to eq(id)
108
+ expect(registry.register(subject_name, schema)).to eq(id)
109
+ expect(upstream).to have_received(:register).exactly(0).times
110
+ expect(load_cache("ids_by_schema.json")).to eq cache_before
111
+ end
112
+
113
+ it "writes thru to disk cache" do
114
+ # multiple calls return the same result, with only one upstream call
115
+ allow(upstream).to receive(:register).with(city_name, city_schema).and_return(city_id)
116
+ expect(registry.register(city_name, city_schema)).to eq(city_id)
117
+ expect(registry.register(city_name, city_schema)).to eq(city_id)
118
+ expect(upstream).to have_received(:register).exactly(1).times
119
+ expect(load_cache("ids_by_schema.json")).to eq cache_after
120
+ end
121
+ end
122
+
123
+ describe "#subject_version" do
124
+ it "writes thru to disk cache" do
125
+ # multiple calls return the same result, with zero upstream calls
126
+ allow(upstream).to receive(:subject_version).with(subject, version).and_return(subject_version_schema)
127
+ expect(File).not_to exist("./spec/cache/schemas_by_subject_version.json")
128
+
129
+ expect(registry.subject_version(subject, version)).to eq(subject_version_schema)
130
+
131
+ json = JSON.parse(File.read("./spec/cache/schemas_by_subject_version.json"))["#{subject}#{version}"]
132
+ expect(json).to eq(subject_version_schema)
133
+
134
+ expect(registry.subject_version(subject, version)).to eq(subject_version_schema)
135
+ expect(upstream).to have_received(:subject_version).exactly(1).times
136
+ end
137
+
138
+ it "reads from disk cache and populates mem cache" do
139
+ allow(upstream).to receive(:subject_version).with(subject, version).and_return(subject_version_schema)
140
+ key = "#{subject}#{version}"
141
+ hash = {key => subject_version_schema}
142
+ cache.send(:write_to_disk_cache, "./spec/cache/schemas_by_subject_version.json", hash)
143
+
144
+ cached_schema = cache.instance_variable_get(:@schemas_by_subject_version)
145
+ expect(cached_schema).to eq({})
146
+
147
+ expect(registry.subject_version(subject, version)).to eq(subject_version_schema)
148
+ expect(upstream).to have_received(:subject_version).exactly(0).times
149
+
150
+ cached_schema = cache.instance_variable_get(:@schemas_by_subject_version)
151
+ expect(cached_schema).to eq({key => subject_version_schema})
152
+ end
153
+ end
154
+
155
+ it_behaves_like "a confluent schema registry client" do
156
+ let(:upstream) { AvroTurf::ConfluentSchemaRegistry.new(registry_url, logger: logger) }
157
+ let(:registry) { described_class.new(upstream) }
158
+ end
159
+ end
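The same wrapper gains write-through persistence when it is constructed with a DiskCache, which is what this spec pre-seeds via JSON files. A minimal sketch, assuming a tmp/schema_cache directory (the path is illustrative) and a schema id of 42:

    require "fileutils"
    require "avro_turf/confluent_schema_registry"
    require "avro_turf/cached_confluent_schema_registry"
    require "avro_turf/disk_cache"

    FileUtils.mkdir_p("tmp/schema_cache")

    upstream = AvroTurf::ConfluentSchemaRegistry.new("http://registry.example.com")
    cache    = AvroTurf::DiskCache.new("tmp/schema_cache")
    registry = AvroTurf::CachedConfluentSchemaRegistry.new(upstream, cache: cache)

    # fetch/register/subject_version results are written through to JSON files
    # (schemas_by_id.json, ids_by_schema.json, schemas_by_subject_version.json),
    # so a freshly started process can answer the same lookups from disk
    # without contacting the registry at all.
    registry.fetch(42)  # first call hits the registry and persists the result
    registry.fetch(42)  # answered from the cache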
data/spec/messaging_spec.rb ADDED
@@ -0,0 +1,300 @@
1
+ require 'webmock/rspec'
2
+ require 'avro_turf/messaging'
3
+ require 'avro_turf/test/fake_confluent_schema_registry_server'
4
+
5
+ describe AvroTurf::Messaging do
6
+ let(:registry_url) { "http://registry.example.com" }
7
+ let(:client_cert) { "test client cert" }
8
+ let(:client_key) { "test client key" }
9
+ let(:client_key_pass) { "test client key password" }
10
+ let(:logger) { Logger.new(StringIO.new) }
11
+
12
+ let(:avro) {
13
+ AvroTurf::Messaging.new(
14
+ registry_url: registry_url,
15
+ schemas_path: "spec/schemas",
16
+ logger: logger,
17
+ client_cert: client_cert,
18
+ client_key: client_key,
19
+ client_key_pass: client_key_pass
20
+ )
21
+ }
22
+
23
+ let(:message) { { "full_name" => "John Doe" } }
24
+ let(:schema_json) do
25
+ <<-AVSC
26
+ {
27
+ "name": "person",
28
+ "type": "record",
29
+ "fields": [
30
+ {
31
+ "type": "string",
32
+ "name": "full_name"
33
+ }
34
+ ]
35
+ }
36
+ AVSC
37
+ end
38
+ let(:schema) { Avro::Schema.parse(schema_json) }
39
+
40
+ before do
41
+ FileUtils.mkdir_p("spec/schemas")
42
+ end
43
+
44
+ before do
45
+ stub_request(:any, /^#{registry_url}/).to_rack(FakeConfluentSchemaRegistryServer)
46
+ FakeConfluentSchemaRegistryServer.clear
47
+ end
48
+
49
+ before do
50
+ define_schema "person.avsc", schema_json
51
+ end
52
+
53
+ shared_examples_for "encoding and decoding with the schema from schema store" do
54
+ it "encodes and decodes messages" do
55
+ data = avro.encode(message, schema_name: "person")
56
+ expect(avro.decode(data)).to eq message
57
+ end
58
+
59
+ it "allows specifying a reader's schema" do
60
+ data = avro.encode(message, schema_name: "person")
61
+ expect(avro.decode(data, schema_name: "person")).to eq message
62
+ end
63
+
64
+ it "caches parsed schemas for decoding" do
65
+ data = avro.encode(message, schema_name: "person")
66
+ avro.decode(data)
67
+ allow(Avro::Schema).to receive(:parse).and_call_original
68
+ expect(avro.decode(data)).to eq message
69
+ expect(Avro::Schema).not_to have_received(:parse)
70
+ end
71
+ end
72
+
73
+ shared_examples_for 'encoding and decoding with the schema from registry' do
74
+ before do
75
+ registry = AvroTurf::ConfluentSchemaRegistry.new(registry_url, logger: logger)
76
+ registry.register('person', schema)
77
+ registry.register('people', schema)
78
+ end
79
+
80
+ it 'encodes and decodes messages' do
81
+ data = avro.encode(message, subject: 'person', version: 1)
82
+ expect(avro.decode(data)).to eq message
83
+ end
84
+
85
+ it "allows specifying a reader's schema by subject and version" do
86
+ data = avro.encode(message, subject: 'person', version: 1)
87
+ expect(avro.decode(data, schema_name: 'person')).to eq message
88
+ end
89
+
90
+ it 'raises AvroTurf::SchemaNotFoundError when the schema does not exist on registry' do
91
+ expect { avro.encode(message, subject: 'missing', version: 1) }.to raise_error(AvroTurf::SchemaNotFoundError)
92
+ end
93
+
94
+ it 'caches parsed schemas for decoding' do
95
+ data = avro.encode(message, subject: 'person', version: 1)
96
+ avro.decode(data)
97
+ allow(Avro::Schema).to receive(:parse).and_call_original
98
+ expect(avro.decode(data)).to eq message
99
+ expect(Avro::Schema).not_to have_received(:parse)
100
+ end
101
+ end
102
+
103
+ shared_examples_for 'encoding and decoding with the schema_id from registry' do
104
+ before do
105
+ registry = AvroTurf::ConfluentSchemaRegistry.new(registry_url, logger: logger)
106
+ registry.register('person', schema)
107
+ registry.register('people', schema)
108
+ end
109
+
110
+ it 'encodes and decodes messages' do
111
+ data = avro.encode(message, schema_id: 1)
112
+ expect(avro.decode(data)).to eq message
113
+ end
114
+
115
+ it 'raises AvroTurf::SchemaNotFoundError when the schema does not exist on registry' do
116
+ expect { avro.encode(message, schema_id: 5) }.to raise_error(AvroTurf::SchemaNotFoundError)
117
+ end
118
+
119
+ it 'caches parsed schemas for decoding' do
120
+ data = avro.encode(message, schema_id: 1)
121
+ avro.decode(data)
122
+ allow(Avro::Schema).to receive(:parse).and_call_original
123
+ expect(avro.decode(data)).to eq message
124
+ expect(Avro::Schema).not_to have_received(:parse)
125
+ end
126
+ end
127
+
128
+ it_behaves_like "encoding and decoding with the schema from schema store"
129
+
130
+ it_behaves_like 'encoding and decoding with the schema from registry'
131
+
132
+ it_behaves_like 'encoding and decoding with the schema_id from registry'
133
+
134
+ context "with a provided registry" do
135
+ let(:registry) { AvroTurf::ConfluentSchemaRegistry.new(registry_url, logger: logger) }
136
+
137
+ let(:avro) do
138
+ AvroTurf::Messaging.new(
139
+ registry: registry,
140
+ schemas_path: "spec/schemas",
141
+ logger: logger
142
+ )
143
+ end
144
+
145
+ it_behaves_like "encoding and decoding with the schema from schema store"
146
+
147
+ it_behaves_like 'encoding and decoding with the schema from registry'
148
+
149
+ it_behaves_like 'encoding and decoding with the schema_id from registry'
150
+
151
+ it "uses the provided registry" do
152
+ allow(registry).to receive(:register).and_call_original
153
+ message = { "full_name" => "John Doe" }
154
+ avro.encode(message, schema_name: "person")
155
+ expect(registry).to have_received(:register).with("person", anything)
156
+ end
157
+
158
+ it "allows specifying a schema registry subject" do
159
+ allow(registry).to receive(:register).and_call_original
160
+ message = { "full_name" => "John Doe" }
161
+ avro.encode(message, schema_name: "person", subject: "people")
162
+ expect(registry).to have_received(:register).with("people", anything)
163
+ end
164
+ end
165
+
166
+ context "with a provided schema store" do
167
+ let(:schema_store) { AvroTurf::SchemaStore.new(path: "spec/schemas") }
168
+
169
+ let(:avro) do
170
+ AvroTurf::Messaging.new(
171
+ registry_url: registry_url,
172
+ schema_store: schema_store,
173
+ logger: logger
174
+ )
175
+ end
176
+
177
+ it_behaves_like "encoding and decoding with the schema from schema store"
178
+
179
+ it "uses the provided schema store" do
180
+ allow(schema_store).to receive(:find).and_call_original
181
+ avro.encode(message, schema_name: "person")
182
+ expect(schema_store).to have_received(:find).with("person", nil)
183
+ end
184
+ end
185
+
186
+ describe 'decoding with #decode_message' do
187
+ shared_examples_for "encoding and decoding with the schema from schema store" do
188
+ it "encodes and decodes messages" do
189
+ data = avro.encode(message, schema_name: "person")
190
+ result = avro.decode_message(data)
191
+ expect(result.message).to eq message
192
+ expect(result.schema_id).to eq 0
193
+ expect(result.writer_schema).to eq schema
194
+ expect(result.reader_schema).to eq nil
195
+ end
196
+
197
+ it "allows specifying a reader's schema" do
198
+ data = avro.encode(message, schema_name: "person")
199
+ result = avro.decode_message(data, schema_name: "person")
200
+ expect(result.message).to eq message
201
+ expect(result.writer_schema).to eq schema
202
+ expect(result.reader_schema).to eq schema
203
+ end
204
+
205
+ it "caches parsed schemas for decoding" do
206
+ data = avro.encode(message, schema_name: "person")
207
+ avro.decode_message(data)
208
+ allow(Avro::Schema).to receive(:parse).and_call_original
209
+ expect(avro.decode_message(data).message).to eq message
210
+ expect(Avro::Schema).not_to have_received(:parse)
211
+ end
212
+ end
213
+
214
+ shared_examples_for 'encoding and decoding with the schema from registry' do
215
+ before do
216
+ registry = AvroTurf::ConfluentSchemaRegistry.new(registry_url, logger: logger)
217
+ registry.register('person', schema)
218
+ registry.register('people', schema)
219
+ end
220
+
221
+ it 'encodes and decodes messages' do
222
+ data = avro.encode(message, subject: 'person', version: 1)
223
+ result = avro.decode_message(data)
224
+ expect(result.message).to eq message
225
+ expect(result.schema_id).to eq 0
226
+ end
227
+
228
+ it "allows specifying a reader's schema by subject and version" do
229
+ data = avro.encode(message, subject: 'person', version: 1)
230
+ expect(avro.decode_message(data, schema_name: 'person').message).to eq message
231
+ end
232
+
233
+ it 'raises AvroTurf::SchemaNotFoundError when the schema does not exist on registry' do
234
+ expect { avro.encode(message, subject: 'missing', version: 1) }.to raise_error(AvroTurf::SchemaNotFoundError)
235
+ end
236
+
237
+ it 'caches parsed schemas for decoding' do
238
+ data = avro.encode(message, subject: 'person', version: 1)
239
+ avro.decode_message(data)
240
+ allow(Avro::Schema).to receive(:parse).and_call_original
241
+ expect(avro.decode_message(data).message).to eq message
242
+ expect(Avro::Schema).not_to have_received(:parse)
243
+ end
244
+ end
245
+
246
+ it_behaves_like "encoding and decoding with the schema from schema store"
247
+
248
+ it_behaves_like 'encoding and decoding with the schema from registry'
249
+
250
+ context "with a provided registry" do
251
+ let(:registry) { AvroTurf::ConfluentSchemaRegistry.new(registry_url, logger: logger) }
252
+
253
+ let(:avro) do
254
+ AvroTurf::Messaging.new(
255
+ registry: registry,
256
+ schemas_path: "spec/schemas",
257
+ logger: logger
258
+ )
259
+ end
260
+
261
+ it_behaves_like "encoding and decoding with the schema from schema store"
262
+
263
+ it_behaves_like 'encoding and decoding with the schema from registry'
264
+
265
+ it "uses the provided registry" do
266
+ allow(registry).to receive(:register).and_call_original
267
+ message = { "full_name" => "John Doe" }
268
+ avro.encode(message, schema_name: "person")
269
+ expect(registry).to have_received(:register).with("person", anything)
270
+ end
271
+
272
+ it "allows specifying a schema registry subject" do
273
+ allow(registry).to receive(:register).and_call_original
274
+ message = { "full_name" => "John Doe" }
275
+ avro.encode(message, schema_name: "person", subject: "people")
276
+ expect(registry).to have_received(:register).with("people", anything)
277
+ end
278
+ end
279
+
280
+ context "with a provided schema store" do
281
+ let(:schema_store) { AvroTurf::SchemaStore.new(path: "spec/schemas") }
282
+
283
+ let(:avro) do
284
+ AvroTurf::Messaging.new(
285
+ registry_url: registry_url,
286
+ schema_store: schema_store,
287
+ logger: logger
288
+ )
289
+ end
290
+
291
+ it_behaves_like "encoding and decoding with the schema from schema store"
292
+
293
+ it "uses the provided schema store" do
294
+ allow(schema_store).to receive(:find).and_call_original
295
+ avro.encode(message, schema_name: "person")
296
+ expect(schema_store).to have_received(:find).with("person", nil)
297
+ end
298
+ end
299
+ end
300
+ end
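For reference, the Messaging API that these shared examples exercise looks roughly like this in application code. The registry URL and schemas path are placeholders, and the optional TLS client-certificate arguments shown in the spec are omitted:

    require "avro_turf/messaging"

    avro = AvroTurf::Messaging.new(
      registry_url: "http://registry.example.com",  # placeholder
      schemas_path: "./schemas"                     # assumed to contain person.avsc
    )

    # The writer's schema can come from the local schema store, from a registry
    # subject and version, or from a raw schema id, mirroring the three shared examples.
    data = avro.encode({ "full_name" => "John Doe" }, schema_name: "person")
    # data = avro.encode(message, subject: "person", version: 1)
    # data = avro.encode(message, schema_id: 1)

    avro.decode(data)                    # => { "full_name" => "John Doe" }
    result = avro.decode_message(data)   # richer result than #decode
    result.message                       # the decoded datum
    result.schema_id                     # registry id embedded in the payload
    result.writer_schema                 # the parsed Avro::Schema used for writing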
data/spec/schema_store_spec.rb ADDED
@@ -0,0 +1,289 @@
1
+ require 'avro_turf/schema_store'
2
+
3
+ describe AvroTurf::SchemaStore do
4
+ let(:store) { AvroTurf::SchemaStore.new(path: "spec/schemas") }
5
+
6
+ before do
7
+ FileUtils.mkdir_p("spec/schemas")
8
+ end
9
+
10
+ describe "#find" do
11
+ it "finds schemas on the file system" do
12
+ define_schema "message.avsc", <<-AVSC
13
+ {
14
+ "name": "message",
15
+ "type": "record",
16
+ "fields": [
17
+ {
18
+ "type": "string",
19
+ "name": "message"
20
+ }
21
+ ]
22
+ }
23
+ AVSC
24
+
25
+ schema = store.find("message")
26
+ expect(schema.fullname).to eq "message"
27
+ end
28
+
29
+ it "resolves missing references" do
30
+ define_schema "person.avsc", <<-AVSC
31
+ {
32
+ "name": "person",
33
+ "type": "record",
34
+ "fields": [
35
+ {
36
+ "name": "address",
37
+ "type": "address"
38
+ }
39
+ ]
40
+ }
41
+ AVSC
42
+
43
+ define_schema "address.avsc", <<-AVSC
44
+ {
45
+ "type": "record",
46
+ "name": "address",
47
+ "fields": []
48
+ }
49
+ AVSC
50
+
51
+ schema = store.find("person")
52
+
53
+ expect(schema.fullname).to eq "person"
54
+ end
55
+
56
+ it "finds namespaced schemas" do
57
+ FileUtils.mkdir_p("spec/schemas/test/people")
58
+
59
+ define_schema "test/people/person.avsc", <<-AVSC
60
+ {
61
+ "name": "person",
62
+ "namespace": "test.people",
63
+ "type": "record",
64
+ "fields": [
65
+ {
66
+ "name": "address",
67
+ "type": "test.people.address"
68
+ }
69
+ ]
70
+ }
71
+ AVSC
72
+
73
+ define_schema "test/people/address.avsc", <<-AVSC
74
+ {
75
+ "name": "address",
76
+ "namespace": "test.people",
77
+ "type": "record",
78
+ "fields": []
79
+ }
80
+ AVSC
81
+
82
+ schema = store.find("person", "test.people")
83
+
84
+ expect(schema.fullname).to eq "test.people.person"
85
+ end
86
+
87
+ it "ignores the namespace when the name contains a dot" do
88
+ FileUtils.mkdir_p("spec/schemas/test/acme")
89
+
90
+ define_schema "test/acme/message.avsc", <<-AVSC
91
+ {
92
+ "name": "message",
93
+ "namespace": "test.acme",
94
+ "type": "record",
95
+ "fields": []
96
+ }
97
+ AVSC
98
+
99
+ schema = store.find("test.acme.message", "test.yolo")
100
+
101
+ expect(schema.fullname).to eq "test.acme.message"
102
+ end
103
+
104
+ it "raises AvroTurf::SchemaNotFoundError if there's no schema file matching the name" do
105
+ expect {
106
+ store.find("not_there")
107
+ }.to raise_error(AvroTurf::SchemaNotFoundError, "could not find Avro schema at `spec/schemas/not_there.avsc'")
108
+ end
109
+
110
+ it "raises AvroTurf::SchemaNotFoundError if a type reference cannot be resolved" do
111
+ define_schema "person.avsc", <<-AVSC
112
+ {
113
+ "name": "person",
114
+ "type": "record",
115
+ "fields": [
116
+ {
117
+ "name": "address",
118
+ "type": "address"
119
+ }
120
+ ]
121
+ }
122
+ AVSC
123
+
124
+ expect {
125
+ store.find("person")
126
+ }.to raise_exception(AvroTurf::SchemaNotFoundError)
127
+ end
128
+
129
+ it "raises AvroTurf::SchemaError if the schema's namespace doesn't match the file location" do
130
+ FileUtils.mkdir_p("spec/schemas/test/people")
131
+
132
+ define_schema "test/people/person.avsc", <<-AVSC
133
+ {
134
+ "name": "person",
135
+ "namespace": "yoyoyo.nanana",
136
+ "type": "record",
137
+ "fields": []
138
+ }
139
+ AVSC
140
+
141
+ expect {
142
+ store.find("test.people.person")
143
+ }.to raise_error(AvroTurf::SchemaError, "expected schema `spec/schemas/test/people/person.avsc' to define type `test.people.person'")
144
+ end
145
+
146
+ it "handles circular dependencies" do
147
+ define_schema "a.avsc", <<-AVSC
148
+ {
149
+ "name": "a",
150
+ "type": "record",
151
+ "fields": [
152
+ {
153
+ "type": "b",
154
+ "name": "b"
155
+ }
156
+ ]
157
+ }
158
+ AVSC
159
+
160
+ define_schema "b.avsc", <<-AVSC
161
+ {
162
+ "name": "b",
163
+ "type": "record",
164
+ "fields": [
165
+ {
166
+ "type": "a",
167
+ "name": "a"
168
+ }
169
+ ]
170
+ }
171
+ AVSC
172
+
173
+ schema = store.find("a")
174
+ expect(schema.fullname).to eq "a"
175
+ end
176
+
177
+ it "caches schemas in memory" do
178
+ define_schema "person.avsc", <<-AVSC
179
+ {
180
+ "name": "person",
181
+ "type": "record",
182
+ "fields": [
183
+ {
184
+ "type": "string",
185
+ "name": "full_name"
186
+ }
187
+ ]
188
+ }
189
+ AVSC
190
+
191
+ # Warm the schema cache.
192
+ store.find("person")
193
+
194
+ # Force a failure if the schema file is read again.
195
+ FileUtils.rm("spec/schemas/person.avsc")
196
+
197
+ schema = store.find("person")
198
+ expect(schema.fullname).to eq "person"
199
+ end
200
+
201
+ it "is thread safe" do
202
+ define_schema "address.avsc", <<-AVSC
203
+ {
204
+ "type": "record",
205
+ "name": "address",
206
+ "fields": []
207
+ }
208
+ AVSC
209
+
210
+ # Pause each thread right at the point where the race condition would occur
211
+ expect(Avro::Name)
212
+ .to receive(:add_name)
213
+ .and_wrap_original { |m, *args|
214
+ Thread.stop
215
+ m.call(*args)
216
+ }
217
+
218
+ # Run two concurring threads which both will trigger the same schema loading
219
+ threads = 2.times.map { Thread.new { store.find("address") } }
220
+ # Wait until both threads have reached the breakpoint
221
+ sleep 0.001 until threads.all?(&:stop?)
222
+
223
+ expect {
224
+ # Resume the threads one after another
225
+ threads.each do |thread|
226
+ next unless thread.status == 'sleep'
227
+
228
+ thread.run
229
+ sleep 0.001 until thread.stop?
230
+ end
231
+
232
+ # Ensure that both threads have finished
233
+ threads.each(&:join)
234
+ }.to_not raise_error
235
+ end
236
+ end
237
+
238
+ describe "#load_schemas!" do
239
+ it "loads schemas defined in the `schemas_path` directory" do
240
+ define_schema "person.avsc", <<-AVSC
241
+ {
242
+ "name": "person",
243
+ "type": "record",
244
+ "fields": [
245
+ {
246
+ "type": "string",
247
+ "name": "full_name"
248
+ }
249
+ ]
250
+ }
251
+ AVSC
252
+
253
+ # Warm the schema cache.
254
+ store.load_schemas!
255
+
256
+ # Force a failure if the schema file is read again.
257
+ FileUtils.rm("spec/schemas/person.avsc")
258
+
259
+ schema = store.find("person")
260
+ expect(schema.fullname).to eq "person"
261
+ end
262
+
263
+ it "recursively finds schema definitions in subdirectories" do
264
+ FileUtils.mkdir_p("spec/schemas/foo/bar")
265
+
266
+ define_schema "foo/bar/person.avsc", <<-AVSC
267
+ {
268
+ "name": "foo.bar.person",
269
+ "type": "record",
270
+ "fields": [
271
+ {
272
+ "type": "string",
273
+ "name": "full_name"
274
+ }
275
+ ]
276
+ }
277
+ AVSC
278
+
279
+ # Warm the schema cache.
280
+ store.load_schemas!
281
+
282
+ # Force a failure if the schema file is read again.
283
+ FileUtils.rm("spec/schemas/foo/bar/person.avsc")
284
+
285
+ schema = store.find("foo.bar.person")
286
+ expect(schema.fullname).to eq "foo.bar.person"
287
+ end
288
+ end
289
+ end
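The resolution rules asserted above (cross-file references, namespaced lookups, in-memory caching, eager loading) reduce to a small API. A hedged usage sketch, with the directory layout assumed to match the spec's fixtures:

    require "avro_turf/schema_store"

    store = AvroTurf::SchemaStore.new(path: "./schemas")

    # Lazily loads ./schemas/person.avsc, resolving references such as "address"
    # from sibling .avsc files, and caches the parsed Avro::Schema in memory.
    person = store.find("person")
    person.fullname  # => "person"

    # A namespaced lookup maps dots to directories: ./schemas/test/people/person.avsc
    store.find("person", "test.people")

    # Eagerly parse every .avsc file under the path, including subdirectories.
    store.load_schemas!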
data/spec/schema_to_avro_patch_spec.rb ADDED
@@ -0,0 +1,66 @@
1
+ require 'webmock/rspec'
2
+
3
+ # This spec verifies the monkey-patch that we have to apply until the avro
4
+ # gem releases a fix for bug AVRO-1848:
5
+ # https://issues.apache.org/jira/browse/AVRO-1848
6
+
7
+ describe Avro::Schema do
8
+ it "correctly handles falsey field defaults" do
9
+ schema = Avro::Schema.parse <<-SCHEMA
10
+ {"type": "record", "name": "Record", "namespace": "my.name.space",
11
+ "fields": [
12
+ {"name": "is_usable", "type": "boolean", "default": false}
13
+ ]
14
+ }
15
+ SCHEMA
16
+
17
+ expect(schema.to_avro).to eq({
18
+ 'type' => 'record', 'name' => 'Record', 'namespace' => 'my.name.space',
19
+ 'fields' => [
20
+ {'name' => 'is_usable', 'type' => 'boolean', 'default' => false}
21
+ ]
22
+ })
23
+ end
24
+ end
25
+
26
+
27
+ describe Avro::IO::DatumReader do
28
+ let(:writer_schema) do
29
+ Avro::Schema.parse <<-AVSC
30
+ {
31
+ "name": "no_default",
32
+ "type": "record",
33
+ "fields": [
34
+ { "type": "string", "name": "one" }
35
+ ]
36
+ }
37
+ AVSC
38
+ end
39
+ let(:reader_schema) do
40
+ Avro::Schema.parse <<-AVSC
41
+ {
42
+ "name": "no_default",
43
+ "type": "record",
44
+ "fields": [
45
+ { "type": "string", "name": "one" },
46
+ { "type": "string", "name": "two" }
47
+ ]
48
+ }
49
+ AVSC
50
+ end
51
+
52
+ it "raises an error for missing fields without a default" do
53
+ stream = StringIO.new
54
+ writer = Avro::IO::DatumWriter.new(writer_schema)
55
+ encoder = Avro::IO::BinaryEncoder.new(stream)
56
+ writer.write({ 'one' => 'first' }, encoder)
57
+ encoded = stream.string
58
+
59
+ stream = StringIO.new(encoded)
60
+ decoder = Avro::IO::BinaryDecoder.new(stream)
61
+ reader = Avro::IO::DatumReader.new(writer_schema, reader_schema)
62
+ expect do
63
+ reader.read(decoder)
64
+ end.to raise_error(Avro::AvroError, 'Missing data for "string" with no default')
65
+ end
66
+ end
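The bug being patched is that a falsey field default (such as false) was dropped when a schema was converted back to a hash. A minimal reproduction of the behaviour the first example locks down, assuming the top-level require loads the patch, as the gem's lib layout suggests:

    require "avro_turf"

    schema = Avro::Schema.parse(<<~AVSC)
      {"type": "record", "name": "Record", "namespace": "my.name.space",
       "fields": [{"name": "is_usable", "type": "boolean", "default": false}]}
    AVSC

    # With the patch in place the falsey default survives the round trip;
    # without it, the "default" key was silently dropped (AVRO-1848).
    schema.to_avro.fetch("fields").first["default"]  # => false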
data/spec/spec_helper.rb ADDED
@@ -0,0 +1,28 @@
1
+ require 'bundler/setup'
2
+ require 'logger'
3
+ require 'json_spec'
4
+ require 'fakefs/spec_helpers'
5
+ require 'avro_turf'
6
+
7
+ Dir["#{File.dirname(__FILE__)}/support/**/*.rb"].each { |f| require f }
8
+
9
+ module Helpers
10
+ def define_schema(path, content)
11
+ File.open(File.join("spec/schemas", path), "w") do |f|
12
+ f.write(content)
13
+ end
14
+ end
15
+
16
+ def store_cache(path, hash)
17
+ File.write(File.join("spec/cache", path), JSON.generate(hash))
18
+ end
19
+
20
+ def load_cache(path)
21
+ JSON.parse(File.read(File.join("spec/cache", path)))
22
+ end
23
+ end
24
+
25
+ RSpec.configure do |config|
26
+ config.include FakeFS::SpecHelpers
27
+ config.include Helpers
28
+ end
data/spec/support/confluent_schema_registry_context.rb ADDED
@@ -0,0 +1,254 @@
1
+ # This shared example expects a registry variable to be defined
2
+ # with an instance of the registry class being tested.
3
+ shared_examples_for "a confluent schema registry client" do
4
+ let(:logger) { Logger.new(StringIO.new) }
5
+ let(:registry_url) { "http://registry.example.com" }
6
+ let(:subject_name) { "some-subject" }
7
+ let(:schema) do
8
+ {
9
+ type: "record",
10
+ name: "person",
11
+ fields: [
12
+ { name: "name", type: "string" }
13
+ ]
14
+ }.to_json
15
+ end
16
+
17
+ before do
18
+ stub_request(:any, /^#{registry_url}/).to_rack(FakeConfluentSchemaRegistryServer)
19
+ FakeConfluentSchemaRegistryServer.clear
20
+ end
21
+
22
+ describe "#register and #fetch" do
23
+ it "allows registering a schema" do
24
+ id = regis