Ingesting unique records in Kafka-Spark Streaming











I have a Kafka topic receiving 10K events per minute and a Spark Streaming 2.3 consumer written in Scala that receives them and ingests them into Cassandra. Incoming events are JSON with a 'userid' field, among others. However, if an event with the same userid comes along again (even with a different message body), I don't want it to be ingested into Cassandra. The Cassandra table grows every minute and every day, so looking up all userids encountered so far by loading the table into an in-memory Spark DataFrame is not feasible, as the table will become huge. How can I best ingest only unique records?
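
For context, the consumer is shaped roughly like this (a simplified sketch: broker and topic names, the keyspace/table, and the JSON parsing are placeholders):

    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
    import org.apache.spark.streaming.kafka010.KafkaUtils
    import com.datastax.spark.connector._ // spark-cassandra-connector

    object IngestEvents {

      // Placeholder for the real JSON parsing: extract (userid, rawJson).
      def parseEvent(json: String): (String, String) = (json, json)

      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setAppName("ingest-events")
          .set("spark.cassandra.connection.host", "cassandra-host") // placeholder
        val ssc = new StreamingContext(conf, Seconds(60))

        val kafkaParams = Map[String, Object](
          "bootstrap.servers"  -> "broker:9092", // placeholder
          "key.deserializer"   -> classOf[StringDeserializer],
          "value.deserializer" -> classOf[StringDeserializer],
          "group.id"           -> "ingest-events")

        val stream = KafkaUtils.createDirectStream[String, String](
          ssc, PreferConsistent, Subscribe[String, String](Seq("events"), kafkaParams))

        // (userid, rawJson) pairs
        val events = stream.map(record => parseEvent(record.value()))

        // Today every event is written; duplicates by userid are not filtered out.
        events.foreachRDD { rdd =>
          rdd.saveToCassandra("mykeyspace", "events", SomeColumns("userid", "payload"))
        }

        ssc.start()
        ssc.awaitTermination()
      }
    }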



Can updateStateByKey work? And how long can state be maintained? Because if the same userid comes along after one year, I don't want to ingest it into Cassandra.
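
This is the kind of updateStateByKey dedup I have in mind, building on the (userid, rawJson) stream from the sketch above (again only a sketch; the part that worries me is that the state never expires):

    import org.apache.spark.streaming.dstream.DStream
    import com.datastax.spark.connector._

    object DedupWithState {

      // events: the DStream of (userid, rawJson) pairs built in the sketch above.
      // Requires ssc.checkpoint(...) to be set, since updateStateByKey is stateful.
      def ingestFirstSeenOnly(events: DStream[(String, String)]): Unit = {

        // Per-userid state: (payload to emit this batch if the id is new, seenBefore).
        // The update function runs over every key in state on every batch, and the
        // state is checkpointed and kept for the lifetime of the application, so it
        // grows with the number of distinct userids and never expires.
        val updated = events.updateStateByKey[(Option[String], Boolean)] {
          (newPayloads: Seq[String], prev: Option[(Option[String], Boolean)]) =>
            prev match {
              case Some(_)                      => Some((None, true))                    // already seen: emit nothing
              case None if newPayloads.nonEmpty => Some((Some(newPayloads.head), false)) // first occurrence: emit once
              case None                         => None
            }
        }

        // Only userids seen for the first time in this batch carry a payload.
        val firstSeen = updated.flatMap { case (userid, (maybePayload, _)) =>
          maybePayload.map(payload => (userid, payload)).toSeq
        }

        firstSeen.foreachRDD(_.saveToCassandra("mykeyspace", "events", SomeColumns("userid", "payload")))
      }
    }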










      scala cassandra apache-kafka spark-streaming






edited Nov 11 at 23:45
asked Nov 8 at 19:26
Steven Park




          1 Answer













Use an external low-latency DB like Aerospike, or, if the rate of duplicates is low, use an in-memory Bloom/cuckoo filter (roughly 4 GB for one year at a 10K-per-minute rate), re-checking matches against Cassandra so that events are not discarded on false positives.
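
A rough sketch of the filter option, assuming Guava's BloomFilter and the DataStax Java driver (sizing, the false-positive rate, and keyspace/table/column names are placeholders, with userid assumed to be the partition key of the target table):

    import java.nio.charset.StandardCharsets

    import com.datastax.driver.core.Cluster
    import com.google.common.hash.{BloomFilter, Funnels}

    class UserIdDeduplicator(cassandraHost: String, keyspace: String) {

      private val session =
        Cluster.builder().addContactPoint(cassandraHost).build().connect(keyspace)

      // Upper bound of ~5.3 billion ids for a year at 10K events/min; a few
      // percent of false positives keeps the filter at a few GB, as estimated above.
      private val seen: BloomFilter[String] =
        BloomFilter.create(Funnels.stringFunnel(StandardCharsets.UTF_8), 5300000000L, 0.03)

      /** True if the userid has not been ingested yet; also records it as seen. */
      def isNew(userid: String): Boolean = synchronized {
        if (!seen.mightContain(userid)) { // definitely never seen before
          seen.put(userid)
          true
        } else {
          // Possibly a false positive: confirm against Cassandra so that new
          // events are never dropped by mistake.
          val alreadyIngested =
            session.execute("SELECT userid FROM events WHERE userid = ?", userid).one() != null
          if (!alreadyIngested) seen.put(userid)
          !alreadyIngested
        }
      }
    }

Per micro-batch, the (userid, payload) pairs would be filtered through isNew before the Cassandra write; at only 10K events per minute this check can even run in one place (e.g. on the driver), which sidesteps the problem of keeping a separate filter per executor consistent.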






answered Nov 8 at 21:23
Andriy Plokhotnyuk
• Will the Bloom filter then contain all the userids seen to date? So with every streaming interval the Bloom filter keeps growing, appending the new ids it encounters?
– Steven Park
Nov 11 at 22:17










