Calling DebugQuery#debug gives us:

Debug
+- *(1) Debug
   +- *(1) SerializeFromObject [assertnotnull(input[0, org.jetbrains.spark.api.Arity1, true]).getA.intValue AS a#18]
      +- *(1) Debug
         +- *(1) MapElements org.jetbrains.spark.api.examples.NonWorkingExampleKt$inlined$sam$i$org_apache_spark_api_java_function_MapFunction$0@687d31a9, obj#17: org.jetbrains.spark.api.Arity1
            +- *(1) Debug
               +- *(1) DeserializeToObject staticinvoke(class java.lang.Integer, ObjectType(class java.lang.Integer), valueOf, value#12, true, false), obj#16: java.lang.Integer
                  +- *(1) Debug
                     +- *(1) LocalTableScan [value#12]

The .intValue call in the SerializeFromObject node above ↑ is obviously invalid when the value can be null.

It appears there because of this code:

case t if isSubtype(t, localTypeOf[java.lang.Integer]) =>
        createSerializerForInteger(inputObject)

which explicitly unboxes the value by calling intValue. Moreover, the same code exists in both JavaReflection and ScalaReflection.
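
To make the failure mode concrete, here is a minimal plain-Java sketch of what unboxing via intValue does to a null value. It is independent of Spark; the class UnboxingDemo and both method names are hypothetical and only illustrate the behaviour described above.

```java
public class UnboxingDemo {
    // Roughly what a serializer that unboxes via intValue ends up doing.
    static int unbox(Integer boxed) {
        return boxed.intValue(); // throws NullPointerException when boxed is null
    }

    // A null-aware alternative (an assumption, not Spark's code): keep the null.
    static Integer unboxOrNull(Integer boxed) {
        return boxed == null ? null : Integer.valueOf(boxed.intValue());
    }

    public static void main(String[] args) {
        System.out.println(unbox(1));
        try {
            unbox(null);
        } catch (NullPointerException e) {
            System.out.println("NPE on null input");
        }
        System.out.println(unboxOrNull(null));
    }
}
```

The second method mirrors the intent of a null check placed in front of the conversion: the value is only unboxed once it is known to be non-null.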

Changing this to something more sophisticated:

case t if isSubtype(t, localTypeOf[java.lang.Integer]) =>
        org.apache.spark.sql.catalyst.expressions.If(
          IsNull(inputObject),
          org.apache.spark.sql.catalyst.expressions.Literal.create(null, IntegerType),
          UpCast(inputObject, IntegerType)
        )

changes the error to: Cannot up cast assertnotnull(input[0, org.jetbrains.spark.api.Arity1, true]).getA from java.lang.Integer to int. That is a bit crazy, because it means a primitive int serializer is still involved somewhere.

Interesting thing: the generated code is worth a look. To see it, change the code the following way:

fun main() {
    withSpark(props = mapOf("spark.sql.codegen.wholeStage" to true)) {
        dsOf(1, null, 2)
                .map { c(it) }
                .also { it.printSchema() }
                .debugCodegen()
                .debug()
                .show()
    }
}

The generated code shows that this can be thrown only when the tuple (Arity1) itself is null.

Here is the only data flow to get null at this point:

private void serializefromobject_doConsume_0(org.jetbrains.spark.api.Arity1 serializefromobject_expr_0_0, boolean serializefromobject_exprIsNull_0_0) throws java.io.IOException {
    serializefromobject_doConsume_0(mapelements_value_1, mapelements_isNull_1);
        mapelements_isNull_1 = mapelements_resultIsNull_0;
            mapelements_resultIsNull_0 = mapelements_exprIsNull_0_0;
                private void mapelements_doConsume_0(java.lang.Integer mapelements_expr_0_0, boolean mapelements_exprIsNull_0_0) throws java.io.IOException {
                    mapelements_doConsume_0(deserializetoobject_value_0, deserializetoobject_isNull_0);
                        deserializetoobject_resultIsNull_0 = deserializetoobject_exprIsNull_0_0;
                            private void deserializetoobject_doConsume_0(InternalRow localtablescan_row_0, int deserializetoobject_expr_0_0, boolean deserializetoobject_exprIsNull_0_0) throws java.io.IOException {
                                deserializetoobject_doConsume_0(localtablescan_row_0, localtablescan_value_0, localtablescan_isNull_0);
                                    boolean localtablescan_isNull_0 = localtablescan_row_0.isNullAt(0);
        mapelements_isNull_1 = true;

Here ↑ we can see that if there is any null element in the Dataset, it is propagated through the whole pipeline, and this leads to an NPE. serializefromobject_exprIsNull_0_0 is the argument stating that the input element was null.
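
The same flow can be modelled in a few lines of plain Java. This is a heavily simplified sketch of the generated code above, not Spark's actual codegen: the class NullPropagationDemo, the Box stand-in for Arity1, and all method names are hypothetical, and the failure in the last stage is modelled as a plain NullPointerException.

```java
public class NullPropagationDemo {
    // Hypothetical stand-in for org.jetbrains.spark.api.Arity1.
    static final class Box {
        private final Integer a;
        Box(Integer a) { this.a = a; }
        Integer getA() { return a; }
    }

    // Stand-in for the user lambda passed to map.
    static Box userMap(Integer in) {
        return new Box(in);
    }

    // Like the MapElements stage: for a null input the lambda is skipped
    // and the result is simply flagged as null.
    static Box mapElements(Integer value, boolean isNull) {
        return isNull ? null : userMap(value);
    }

    // Like the SerializeFromObject stage: the top-level object must not be null,
    // and the field is then unboxed with intValue.
    static int serialize(Box box, boolean isNull) {
        if (isNull) {
            throw new NullPointerException("top-level object is null");
        }
        return box.getA().intValue();
    }

    // Threads the null flag from the scan to the serializer.
    static int runPipeline(Integer element) {
        boolean isNull = (element == null); // analogous to isNullAt(0)
        Box mapped = mapElements(element, isNull);
        return serialize(mapped, isNull);
    }

    public static void main(String[] args) {
        System.out.println(runPipeline(1));
        try {
            runPipeline(null);
        } catch (NullPointerException e) {
            System.out.println("NPE: " + e.getMessage());
        }
    }
}
```

In this model the null flag set at the scan is the only way the serializer ever sees a null object, which matches the single data flow traced in the generated code.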

Now it's pretty obvious that we can add one more filter as a workaround:

withSpark {
    dsOf(1, null, 2)
            .filterNotNull()
            .map { c(it) }
            .also { it.printSchema() }
            .debugCodegen()
            .debug()
            .show()
}