Is there a better way to load a huge tar file in Spark while avoiding OutOfMemoryError?

I have a single tar file mytar.tar that is 40 GB in size. Inside this tar file are 500 tar.gz files, and inside each one of these tar.gz files are a bunch of JSON files. I have written up the code to process this single tar file and attempt to get the list of JSON string contents. My code looks like the following.



import java.io.ByteArrayInputStream

import org.apache.commons.compress.archivers.tar.TarArchiveInputStream
import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream

// Open the outer tar and collect one TarArchiveInputStream per inner .tar.gz entry.
val isRdd = sc.binaryFiles("/mnt/mytar.tar")
  .flatMap(t => {
    val buf = scala.collection.mutable.ListBuffer.empty[TarArchiveInputStream]
    val stream = t._2
    val is = new TarArchiveInputStream(stream.open())
    var entry = is.getNextTarEntry()
    while (entry != null) {
      val name = entry.getName()
      val size = entry.getSize.toInt

      if (entry.isFile() && size > -1) {
        // read the entire inner .tar.gz into memory
        val content = new Array[Byte](size)
        is.read(content, 0, content.length)

        val tgIs = new TarArchiveInputStream(
          new GzipCompressorInputStream(new ByteArrayInputStream(content)))
        buf += tgIs
      }
      entry = is.getNextTarEntry()
    }
    buf.toList
  })
  .cache

// For each inner tar.gz stream, collect the raw bytes of every .json entry.
val byteRdd = isRdd.flatMap(is => {
  val buf = scala.collection.mutable.ListBuffer.empty[Array[Byte]]
  var entry = is.getNextTarEntry()
  while (entry != null) {
    val name = entry.getName()
    val size = entry.getSize.toInt

    if (entry.isFile() && name.endsWith(".json") && size > -1) {
      val data = new Array[Byte](size)
      is.read(data, 0, data.length)
      buf += data
    }
    entry = is.getNextTarEntry()
  }
  buf.toList
})
.cache

// Decode each byte array into its JSON string.
val jsonRdd = byteRdd
  .map(arr => getJson(arr))
  .filter(_.length > 0)
  .cache

jsonRdd.count // action just to execute the code


When I execute this code, I get an OutOfMemoryError (OOME).




org.apache.spark.SparkException: Job aborted due to stage failure:
Task 0 in stage 24.0 failed 4 times, most recent failure:
Lost task 0.3 in stage 24.0 (TID 137, 10.162.224.171, executor 13):
java.lang.OutOfMemoryError: Java heap space


My EC2 cluster has 1 driver and 2 worker nodes of type i3.xlarge (30.5 GB memory, 4 cores). From looking at the logs and just thinking about it, I believe the OOME happens during the creation of isRdd (the input-stream RDD).



Is there anything else in the code or in the setup of my Spark cluster that I can do to mitigate this problem? Should I select an EC2 instance type with more memory (e.g. a memory-optimized instance like r5.2xlarge)? FWIW, I upgraded the cluster to r5.2xlarge and still saw the OOME.



One thing I have thought about doing is to untar mytar.tar and start with the .tar.gz files inside instead. I am thinking that each .tar.gz inside the tar file would have to be smaller than 30 GB to avoid the OOME (on the i3.xlarge).
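
If I go down that route, I imagine the job would look roughly like the sketch below. This is only a sketch, not working code: it assumes the 500 .tar.gz files have already been extracted to a directory such as /mnt/extracted/ (that path and the minPartitions hint are placeholders), it reuses my existing getJson helper, and it keeps everything inside a single flatMap so that no input streams are cached or passed between stages.

import java.io.ByteArrayOutputStream

import org.apache.commons.compress.archivers.tar.TarArchiveInputStream
import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream

// Sketch only: assumes the inner archives were extracted beforehand to /mnt/extracted/.
val jsonRdd = sc.binaryFiles("/mnt/extracted/*.tar.gz", minPartitions = 500)
  .flatMap { case (path, portable) =>
    val tis = new TarArchiveInputStream(
      new GzipCompressorInputStream(portable.open()))
    val jsons = scala.collection.mutable.ListBuffer.empty[String]
    var entry = tis.getNextTarEntry()
    while (entry != null) {
      if (entry.isFile() && entry.getName.endsWith(".json")) {
        // stream the entry out in chunks instead of pre-sizing one big array
        val out = new ByteArrayOutputStream()
        val chunk = new Array[Byte](8192)
        var n = tis.read(chunk)
        while (n != -1) {
          out.write(chunk, 0, n)
          n = tis.read(chunk)
        }
        jsons += getJson(out.toByteArray)
      }
      entry = tis.getNextTarEntry()
    }
    tis.close()
    jsons.toList
  }
  .filter(_.length > 0)

jsonRdd.count // action just to execute the code

Since each task would then touch only one archive of roughly 80 MB on average (40 GB / 500) instead of the whole tar, I would expect the per-task memory footprint to be much smaller.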



Any tips or advice is appreciated.

apache-spark tar

edited Nov 9 at 23:53
asked Nov 9 at 22:56

Jane Wayne
2,686

  • I would suggest first separating out the 500 tar.gz files inside the 40 GB file and writing them to disk, then processing the 500 files as an RDD. It seems all the major work would be happening on the master. Also, you can try to find out the uncompressed size of the whole file; I think the compression ratio might be too high.
    – user238607
    Nov 10 at 12:25
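
On the commenter's last point, one rough (non-Spark) way to estimate the compression ratio might be to stream through mytar.tar locally and count the decompressed bytes of each inner .tar.gz, along the lines of the sketch below (the local path is an assumption):

import java.io.FileInputStream

import org.apache.commons.compress.archivers.tar.TarArchiveInputStream
import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream

// Rough local check: report the decompressed size of each inner .tar.gz without
// extracting anything to disk. The path is assumed.
val outer = new TarArchiveInputStream(new FileInputStream("/mnt/mytar.tar"))
var entry = outer.getNextTarEntry()
var totalUncompressed = 0L
while (entry != null) {
  if (entry.isFile() && entry.getName.endsWith(".tar.gz")) {
    // wrap the current entry's bytes; do not close gz, that would also close outer
    val gz = new GzipCompressorInputStream(outer)
    val buf = new Array[Byte](8192)
    var size = 0L
    var n = gz.read(buf)
    while (n != -1) { size += n; n = gz.read(buf) }
    totalUncompressed += size
    println(s"${entry.getName}: ${entry.getSize} B compressed -> $size B uncompressed")
  }
  entry = outer.getNextTarEntry()
}
outer.close()
println(s"Total uncompressed size: $totalUncompressed B")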