Debugging with DebugQuery#debug gives us:
Debug
+- *(1) Debug
   +- *(1) SerializeFromObject [assertnotnull(input[0, org.jetbrains.spark.api.Arity1, true]).getA.intValue AS a#18]
      +- *(1) Debug
         +- *(1) MapElements org.jetbrains.spark.api.examples.NonWorkingExampleKt$inlined$sam$i$org_apache_spark_api_java_function_MapFunction$0@687d31a9, obj#17: org.jetbrains.spark.api.Arity1
            +- *(1) Debug
               +- *(1) DeserializeToObject staticinvoke(class java.lang.Integer, ObjectType(class java.lang.Integer), valueOf, value#12, true, false), obj#16: java.lang.Integer
                  +- *(1) Debug
                     +- *(1) LocalTableScan [value#12]
The .intValue here ↑ is obviously invalid. It appears because of the following code:
case t if isSubtype(t, localTypeOf[java.lang.Integer]) =>
  createSerializerForInteger(inputObject)
which explicitly performs the unboxing with intValue. Moreover, the same code exists both in JavaReflection and in ScalaReflection.
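For context, here is a minimal sketch of what createSerializerForInteger appears to boil down to in Spark's SerializerBuildHelper (paraphrased from memory; the exact shape may differ between Spark versions): it emits an Invoke node that calls intValue() on the boxed input, which is exactly the .intValue visible in the debug plan above.

import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.expressions.objects.Invoke
import org.apache.spark.sql.types.IntegerType

object SerializerSketch {
  // Sketch only: roughly what createSerializerForInteger produces, an unboxing
  // call that assumes inputObject is never null.
  def createSerializerForIntegerSketch(inputObject: Expression): Expression =
    Invoke(inputObject, "intValue", IntegerType)
}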
Changing this to something more sophisticated:
case t if isSubtype(t, localTypeOf[java.lang.Integer]) =>
  org.apache.spark.sql.catalyst.expressions.If(
    IsNull(inputObject),
    org.apache.spark.sql.catalyst.expressions.Literal.create(null, IntegerType),
    UpCast(inputObject, IntegerType)
  )
changes the error to Cannot up cast assertnotnull(input[0, org.jetbrains.spark.api.Arity1, true]).getA from java.lang.Integer to int., which is a bit crazy because it means that a primitive int serializer is involved somewhere.
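A hedged side note: the Cannot up cast error comes from the UpCast node itself, so one could also imagine keeping the original intValue unboxing and only guarding it with a null check, as sketched below. This is an untested illustration in the same spirit as the change above, not a verified patch, and it is unclear whether the planner would accept the resulting nullability.

import org.apache.spark.sql.catalyst.expressions.{Expression, If, IsNull, Literal}
import org.apache.spark.sql.catalyst.expressions.objects.Invoke
import org.apache.spark.sql.types.IntegerType

object NullSafeIntegerSerializerSketch {
  // Untested sketch: a null boxed Integer becomes a null column value,
  // otherwise it is unboxed with intValue as the original serializer does.
  def apply(inputObject: Expression): Expression =
    If(
      IsNull(inputObject),
      Literal.create(null, IntegerType),
      Invoke(inputObject, "intValue", IntegerType))
}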
An interesting detail: if one looks at the generated code (to see it, change the code in the following way):
fun main() {
    withSpark(props = mapOf("spark.sql.codegen.wholeStage" to true)) {
        dsOf(1, null, 2)
            .map { c(it) }
            .also { it.printSchema() }
            .debugCodegen()
            .debug()
            .show()
    }
}
one will see that this can be thrown only when the tuple (Arity1) is null. Here is the only data flow that leads to null at this point:
private void serializefromobject_doConsume_0(org.jetbrains.spark.api.Arity1 serializefromobject_expr_0_0, boolean serializefromobject_exprIsNull_0_0) throws java.io.IOException {
serializefromobject_doConsume_0(mapelements_value_1, mapelements_isNull_1);
mapelements_isNull_1 = mapelements_resultIsNull_0;
mapelements_resultIsNull_0 = mapelements_exprIsNull_0_0;
private void mapelements_doConsume_0(java.lang.Integer mapelements_expr_0_0, boolean mapelements_exprIsNull_0_0) throws java.io.IOException {
mapelements_doConsume_0(deserializetoobject_value_0, deserializetoobject_isNull_0);
deserializetoobject_resultIsNull_0 = deserializetoobject_exprIsNull_0_0;
private void deserializetoobject_doConsume_0(InternalRow localtablescan_row_0, int deserializetoobject_expr_0_0, boolean deserializetoobject_exprIsNull_0_0) throws java.io.IOException {
deserializetoobject_doConsume_0(localtablescan_row_0, localtablescan_value_0, localtablescan_isNull_0);
boolean localtablescan_isNull_0 = localtablescan_row_0.isNullAt(0);
mapelements_isNull_1 = true;
Here ↑ we can see that if there is any null element in the Dataset, it will be propagated through the whole pipeline, and this will lead to an NPE. serializefromobject_exprIsNull_0_0 is the argument stating that the input element was null.
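To make the flow easier to follow, here is a hand-simplified, hedged paraphrase of the doConsume chain above as plain Scala (illustration only; names are shortened and the real generated class is different). It shows the isNull flag travelling untouched from the table scan down to the serializer, where the non-null assertion finally blows up.

object NullFlowSketch {
  // Stand-in for org.jetbrains.spark.api.Arity1
  final case class Arity1(a: java.lang.Integer)

  def localTableScan(values: Seq[java.lang.Integer]): Unit =
    values.foreach { v =>
      val isNull = v == null                      // localtablescan_isNull_0
      deserializeToObject(if (isNull) 0 else v.intValue(), isNull)
    }

  // int -> java.lang.Integer via valueOf; the null flag is simply forwarded
  def deserializeToObject(value: Int, isNull: Boolean): Unit =
    mapElements(if (isNull) null else Integer.valueOf(value), isNull)

  // If the input is flagged null, the MapFunction is not called at all and the
  // resulting tuple is flagged null too (mapelements_isNull_1 = true above)
  def mapElements(value: java.lang.Integer, isNull: Boolean): Unit =
    if (isNull) serializeFromObject(null, isNull = true)
    else serializeFromObject(Arity1(value), isNull = false)

  // assertnotnull(...).getA.intValue from the plan: a null tuple means NPE here
  def serializeFromObject(obj: Arity1, isNull: Boolean): Unit =
    if (isNull || obj == null) throw new NullPointerException("Null value appeared in non-nullable field (message paraphrased)")
    else println(obj.a.intValue())

  def main(args: Array[String]): Unit =
    localTableScan(Seq(Integer.valueOf(1), null, Integer.valueOf(2)))
}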
Now it's pretty obvious that we can add one more filter as a workaround:
withSpark {
    dsOf(1, null, 2)
        .filterNotNull()
        .map { c(it) }
        .also { it.printSchema() }
        .debugCodegen()
        .debug()
        .show()
}