Avoid specifying schema twice (Spark/scala)












I need to iterate over a data frame in a specific order and apply some complex logic to calculate a new column.

Also, my strong preference is to do it in a generic way, so that I do not have to list all the columns of a row and do df.as[my_record] or case Row(...) => as shown here. Instead, I want to access row columns by their names and just add the result column(s) to the source row.

The approach below works just fine, but I'd like to avoid specifying the schema twice: the first time so that I can access columns by name while iterating, and the second time to process the output.



import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema

val q = """
select 2 part, 1 id
union all select 2 part, 4 id
union all select 2 part, 3 id
union all select 2 part, 2 id
"""
val df = spark.sql(q)

def f_row(iter: Iterator[Row]) : Iterator[Row] = {
  if (iter.hasNext) {
    def complex_logic(p: Int): Integer = if (p == 3) null else p * 10;

    val head = iter.next
    val schema = StructType(head.schema.fields :+ StructField("result", IntegerType))
    val r =
      new GenericRowWithSchema((head.toSeq :+ complex_logic(head.getAs("id"))).toArray, schema)

    iter.scanLeft(r)((r1, r2) =>
      new GenericRowWithSchema((r2.toSeq :+ complex_logic(r2.getAs("id"))).toArray, schema)
    )
  } else iter
}

val schema = StructType(df.schema.fields :+ StructField("result", IntegerType))
val encoder = RowEncoder(schema)
df.repartition($"part").sortWithinPartitions($"id").mapPartitions(f_row)(encoder).show


What information is lost after applying mapPartitions so output cannot be processed without explicit encoder? How to avoid specifying it?



Update



Well, maybe it's tricky to avoid because the result is produced in the executor(s) and the driver (which displays the output) does not know about the result's structure, but I'm still not convinced that it's impossible to avoid.











  • Can you use Dataset and provide a function that maps from T => U?
    – Terry Dactyl
    Nov 8 at 13:00










  • @TerryDactyl, can you elaborate a bit more? I use df.repartition($"part").sortWithinPartitions($"id") which is org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] and I supply f_row(iter: Iterator[Row]) : Iterator[Row] for mapPartitions.
    – Dr Y Wit
    Nov 8 at 13:09












  • A DataFrame is a Dataset[Row] and, as discussed below, Row is untyped. If you were to provide case classes T and U which correspond to the input and output shapes of your map function Iterator[T] => Iterator[U] and import spark.implicits._, then Spark may be able to provide an Encoder and you would effectively map from Dataset[T] => Dataset[U]. This is just a guess but may be worth a try.
    – Terry Dactyl
    Nov 8 at 13:16












  • See below......
    – Terry Dactyl
    Nov 8 at 14:13
















scala apache-spark apache-spark-sql apache-spark-dataset






asked Nov 8 at 11:10 by Dr Y Wit; edited Nov 8 at 16:10












3 Answers

What information is lost after applying mapPartitions so output cannot be processed without explicit encoder?




The information is hardly lost - it wasn't there from the beginning. Subclasses of Row or InternalRow are basically untyped, variable-shape containers, which don't provide any useful type information that could be used to derive an Encoder.

The schema in GenericRowWithSchema is inconsequential, as it describes the content in terms of metadata, not types.




How to avoid specifying it?




Sorry, you're out of luck. If you want to use dynamically typed constructs (a bag of Any) in a statically typed language you have to pay the price, which here is providing an Encoder.






  • I do not agree that "it wasn't there from the beginning". I can show the result after repartitioning and ordering with df.repartition($"part").sortWithinPartitions($"id").show, but it's not possible after mapPartitions.
    – Dr Y Wit
    Nov 8 at 12:35












  • And the function used in mapPartitions is (func: (Iterator[T]) ⇒ Iterator[U]). So why can't show be used if the function produces pretty much the same records (with new columns, to be precise) after iterating?
    – Dr Y Wit
    Nov 8 at 12:38


















OK - I have checked some of my Spark code, and using .mapPartitions with the Dataset API does not require me to explicitly build or pass an encoder.



You need something like:



case class Before(part: Int, id: Int)
case class After(part: Int, id: Int, newCol: String)

import spark.implicits._

// Note column names/types must match case class constructor parameters.
val beforeDS = <however you obtain your input DF>.as[Before]

def f_row(it: Iterator[Before]): Iterator[After] = ???

beforeDS.repartition($"part").sortWithinPartitions($"id").mapPartitions(f_row).show
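
To make the ??? placeholder concrete, here is a minimal sketch of the per-partition function in plain Scala, so it can be tried without a cluster. The result field (an Option[Int] standing in for the question's nullable Integer), the complexLogic helper and the TypedPartitionLogic object are assumptions for illustration, not part of the original answer; Before and After are redefined here with that result field.

```scala
// Hypothetical record shapes; field names follow the sample query (part, id).
final case class Before(part: Int, id: Int)
final case class After(part: Int, id: Int, result: Option[Int])

object TypedPartitionLogic {
  // Stand-in for the question's complex_logic: no result for id == 3, otherwise id * 10.
  def complexLogic(id: Int): Option[Int] = if (id == 3) None else Some(id * 10)

  // Processes one partition lazily, row by row, in the order the iterator yields them.
  def fRow(it: Iterator[Before]): Iterator[After] =
    it.map(b => After(b.part, b.id, complexLogic(b.id)))
}
```

With import spark.implicits._ in scope, Spark can derive encoders for case classes such as After, so passing TypedPartitionLogic.fRow to mapPartitions on a Dataset[Before] should not need an explicit encoder. The trade-off is that every column has to be listed in the case classes.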





  • As I mentioned in the original post, "my strong preference is to do it in generic way so I do not have to list all columns of a row and do df.as[my_record] or case Row(...) =>". I want to avoid any changes if new columns are added to the data frame. See stackoverflow.com/questions/53159461/…
    – Dr Y Wit
    Nov 8 at 15:50




















I found the explanation below sufficient; maybe it will be useful for others.

mapPartitions requires an Encoder because otherwise it cannot construct a Dataset from an iterator of Rows. Even though each row has a schema, that schema cannot be derived (used) by the constructor of Dataset[U].



def mapPartitions[U : Encoder](func: Iterator[T] => Iterator[U]): Dataset[U] = {
  new Dataset[U](
    sparkSession,
    MapPartitions[T, U](func, logicalPlan),
    implicitly[Encoder[U]])
}


On the other hand, without calling mapPartitions, Spark can use the schema derived from the initial query, because the structure (metadata) of the original columns is not changed.
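
The U : Encoder context bound in the mapPartitions signature is ordinary Scala sugar for an extra implicit parameter, which is why the compiler, not the runtime, has to find the encoder. A minimal plain-Scala sketch of that mechanism follows; ToyEncoder, ToyRow, ContextBoundDemo and ContextBoundExample are made-up stand-ins for illustration and are not Spark classes.

```scala
// Toy stand-in for an encoder type class; NOT org.apache.spark.sql.Encoder.
trait ToyEncoder[A] { def describe: String }

object ToyEncoder {
  // Found automatically by the compiler whenever a ToyEncoder[Int] is required.
  implicit val intEncoder: ToyEncoder[Int] =
    new ToyEncoder[Int] { val describe: String = "int" }

  // An encoder for the untyped row has to be built by hand from runtime field names.
  def forFields(fields: Seq[String]): ToyEncoder[ToyRow] =
    new ToyEncoder[ToyRow] { val describe: String = fields.mkString("row(", ", ", ")") }
}

// Untyped container in the spirit of Row: its static type says nothing about its fields.
final case class ToyRow(values: Seq[Any])

object ContextBoundDemo {
  // `U : ToyEncoder` desugars to an extra implicit parameter of type ToyEncoder[U].
  def mapAll[T, U : ToyEncoder](in: Seq[T])(f: T => U): (Seq[U], String) =
    (in.map(f), implicitly[ToyEncoder[U]].describe)
}

object ContextBoundExample {
  // Compiles as is: the implicit ToyEncoder[Int] is resolved at compile time.
  val ints: (Seq[Int], String) = ContextBoundDemo.mapAll(Seq(1, 2))(_ * 10)

  // No implicit ToyEncoder[ToyRow] exists by default, so one is brought into scope by hand.
  val rows: (Seq[ToyRow], String) = {
    implicit val rowEncoder: ToyEncoder[ToyRow] = ToyEncoder.forFields(Seq("id", "result"))
    ContextBoundDemo.mapAll(Seq(1, 2))(i => ToyRow(Seq(i, i * 10)))
  }
}
```

Removing the local implicit val makes the second call fail to compile, which mirrors what happens when mapPartitions returns Row and no encoder for it is in scope.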



The entire transformation on the DataFrame can be encapsulated in a function, especially taking into account that f_row does not make much sense without partitioning and sorting.



def addResult(df: org.apache.spark.sql.DataFrame) = {
  val schema = StructType(df.schema.fields :+ StructField("result", IntegerType))
  implicit val encoder = RowEncoder(schema)

  def f_row(iter: Iterator[Row]) : Iterator[Row] = {
    if (iter.hasNext) {
      def complex_logic(p: Int): Integer = if (p == 3) null else p * 10;

      val head = iter.next
      val r =
        new GenericRowWithSchema((head.toSeq :+ complex_logic(head.getAs("id"))).toArray, schema)

      iter.scanLeft(r)((r1, r2) =>
        new GenericRowWithSchema((r2.toSeq :+ complex_logic(r2.getAs("id"))).toArray, schema)
      )
    } else iter
  }

  df.repartition($"part").sortWithinPartitions($"id").mapPartitions(f_row)
}


Test



scala> addResult(df).show
+----+---+------+
|part| id|result|
+----+---+------+
|   2|  1|    10|
|   2|  2|    20|
|   2|  3|  null|
|   2|  4|    40|
+----+---+------+
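
One note on the scanLeft call in f_row: its first lambda argument (r1, the previously produced row) is ignored, so as far as I can tell a plain map over the iterator would give the same rows here. scanLeft starts to pay off when the new column depends on the previous output row. A plain-Scala sketch of that pattern, using a running total as a made-up example of such logic (Out, withRunningTotal and ScanLeftDemo are hypothetical names):

```scala
object ScanLeftDemo {
  // Hypothetical output shape: the id plus the running total of all ids seen so far.
  final case class Out(id: Int, runningTotal: Int)

  def withRunningTotal(ids: Iterator[Int]): Iterator[Out] =
    if (ids.hasNext) {
      val first = ids.next()
      // Seed with the first element, then derive each output from the previous output.
      ids.scanLeft(Out(first, first))((prev, id) => Out(id, prev.runningTotal + id))
    } else Iterator.empty
}
```

Because the value of each row depends on the rows before it, this kind of logic only gives meaningful results after repartition and sortWithinPartitions have fixed the iteration order.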





First answer: answered Nov 8 at 12:27 by user10623686












Second answer: answered Nov 8 at 14:13 by Terry Dactyl












    • As I mentioned in the original post, "my strong preference is to do it in generic way so I do not have to list all columns of a row and do df.as[my_record] or case Row(...) =>". I want to avoid any changes if new columns added to data frame. See stackoverflow.com/questions/53159461/…
      – Dr Y Wit
      Nov 8 at 15:50




















    • As I mentioned in the original post, "my strong preference is to do it in generic way so I do not have to list all columns of a row and do df.as[my_record] or case Row(...) =>". I want to avoid any changes if new columns added to data frame. See stackoverflow.com/questions/53159461/…
      – Dr Y Wit
      Nov 8 at 15:50


















    As I mentioned in the original post, "my strong preference is to do it in generic way so I do not have to list all columns of a row and do df.as[my_record] or case Row(...) =>". I want to avoid any changes if new columns added to data frame. See stackoverflow.com/questions/53159461/…
    – Dr Y Wit
    Nov 8 at 15:50






    As I mentioned in the original post, "my strong preference is to do it in generic way so I do not have to list all columns of a row and do df.as[my_record] or case Row(...) =>". I want to avoid any changes if new columns added to data frame. See stackoverflow.com/questions/53159461/…
    – Dr Y Wit
    Nov 8 at 15:50












    up vote
    0
    down vote













    I found below explanation sufficient, maybe it will be useful for others.



    mapPartitions requires Encoder because otherwise it cannot construct Dataset from iterator or Rows. Even though each row has a schema, that shema cannot be derived (used) by constructor of Dataset[U].



    def mapPartitions[U : Encoder](func: Iterator[T] => Iterator[U]): Dataset[U] = {
      new Dataset[U](
        sparkSession,
        MapPartitions[T, U](func, logicalPlan),
        implicitly[Encoder[U]])
    }
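The role of the `U : Encoder` context bound can be seen in isolation with a plain-Scala analogy. This is only a sketch: `ToyEncoder`, `ToyDataset`, `ToyRow` and `EncoderSketch` are hypothetical stand-ins invented for illustration, not Spark classes, and they mimic only the shape of the signature above.

```scala
// Toy stand-ins (assumptions, NOT Spark classes): an "encoder" is just a list of column names.
final case class ToyEncoder[U](schema: Seq[String])

final case class ToyDataset[U](rows: List[U], encoder: ToyEncoder[U]) {
  // Same shape as the signature above: the context bound demands an
  // implicit ToyEncoder[V] at the call site.
  def mapPartitions[V: ToyEncoder](func: Iterator[U] => Iterator[V]): ToyDataset[V] =
    ToyDataset(func(rows.iterator).toList, implicitly[ToyEncoder[V]])
}

object EncoderSketch {
  type ToyRow = Seq[Any]

  def addResult(ds: ToyDataset[ToyRow]): ToyDataset[ToyRow] = {
    // The output schema is derived once from the input schema and put in
    // implicit scope, so the call below needs no explicit encoder argument.
    implicit val outEncoder: ToyEncoder[ToyRow] =
      ToyEncoder[ToyRow](ds.encoder.schema :+ "result")

    // Appends a placeholder value for the new column to every row.
    ds.mapPartitions[ToyRow](_.map(row => row :+ "computed"))
  }
}
```

Without the `implicit val` in scope, the `mapPartitions` call in this sketch does not compile, which mirrors why the encoder has to come from somewhere on the driver side.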


    On the other hand, without calling mapPartitions, Spark can use the schema derived from the initial query, because the structure (metadata) of the original columns is unchanged.



    The entire transformation on the DataFrame can be encapsulated in a function, especially given that f_row does not make much sense without the partitioning and sorting.



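    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types._
    import org.apache.spark.sql.catalyst.encoders.RowEncoder
    import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema

    // Written for spark-shell, where spark.implicits._ (needed for $"...") is already imported.
    def addResult(df: org.apache.spark.sql.DataFrame) = {
      // Output schema = input schema plus the new column. It is defined once and
      // shared by the row construction and by the implicit encoder.
      val schema = StructType(df.schema.fields :+ StructField("result", IntegerType))
      implicit val encoder = RowEncoder(schema)

      def f_row(iter: Iterator[Row]): Iterator[Row] = {
        if (iter.hasNext) {
          def complex_logic(p: Int): Integer = if (p == 3) null else p * 10

          val head = iter.next()
          val r =
            new GenericRowWithSchema((head.toSeq :+ complex_logic(head.getAs("id"))).toArray, schema)

          // r1 is the previously emitted row, r2 is the current input row.
          iter.scanLeft(r)((r1, r2) =>
            new GenericRowWithSchema((r2.toSeq :+ complex_logic(r2.getAs("id"))).toArray, schema)
          )
        } else iter
      }

      // mapPartitions picks up the implicit encoder defined above.
      df.repartition($"part").sortWithinPartitions($"id").mapPartitions(f_row)
    }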
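The iteration pattern inside f_row can be tried without a cluster. Below is a minimal plain-Scala sketch, assuming bare Int ids in place of Rows; `ScanLeftSketch`, `complexLogic` and `withResult` are hypothetical names, and `Option[Int]` replaces the nullable `Integer` of the code above.

```scala
object ScanLeftSketch {
  // Stand-in for complex_logic; None plays the role of null.
  def complexLogic(p: Int): Option[Int] = if (p == 3) None else Some(p * 10)

  // Pairs every id with its result, walking the iterator once and in order.
  def withResult(ids: Iterator[Int]): Iterator[(Int, Option[Int])] =
    if (ids.hasNext) {
      val head = ids.next()
      val seed = (head, complexLogic(head))
      // scanLeft emits the seed first, then one element per remaining id.
      // prev is the previously emitted pair; it is unused in this sketch.
      ids.scanLeft(seed)((prev, id) => (id, complexLogic(id)))
    } else Iterator.empty
}
```

Because scanLeft emits its initial value as the first output element, the head has to be processed separately to build that value. The `prev` parameter is what makes this pattern attractive when the logic for a row depends on the row emitted before it.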


    Test



    scala> addResult(df).show
    +----+---+------+
    |part| id|result|
    +----+---+------+
    |   2|  1|    10|
    |   2|  2|    20|
    |   2|  3|  null|
    |   2|  4|    40|
    +----+---+------+





        edited 2 days ago

























        answered 2 days ago









        Dr Y Wit

        1,260311



