
[SPARK-29641][PYTHON][CORE] Stage Level Sched: Add python api's and tests #28085

Closed
wants to merge 31 commits

Conversation

tgravescs
Contributor

What changes were proposed in this pull request?

As part of the stage-level scheduling feature, add the Python APIs to set resource profiles.
This also adds the functionality to properly apply the PySpark memory configuration when it is specified in the ResourceProfile. The PySpark memory configuration is passed in the task local properties; this was an easy way to get it to the PythonRunner that needs it, and I modeled it on how barrier task scheduling passes the addresses. As part of this I also added the JavaRDD APIs, because those are needed by Python.
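
For context, a minimal sketch of how the new Python API is intended to be used (class and method names follow the pyspark.resource module this PR introduces; the yarn master, resource amounts, and GPU resource are illustrative assumptions, and stage-level scheduling only works on YARN at this point):

```python
from pyspark import SparkContext
from pyspark.resource import (ExecutorResourceRequests, TaskResourceRequests,
                              ResourceProfileBuilder)

# Stage-level scheduling is only supported on YARN at this point.
sc = SparkContext(master="yarn", appName="stage-level-scheduling-demo")

# Executor requirements for stages using this profile, including the
# pyspark memory that is passed down to the PythonRunner.
exec_reqs = (ExecutorResourceRequests()
             .cores(4)
             .memory("6g")
             .pysparkMemory("2g")
             .resource("gpu", 1))  # illustrative; needs a GPU-configured cluster

# Per-task requirements.
task_reqs = TaskResourceRequests().cpus(1).resource("gpu", 1)

# In PySpark, `build` is a property rather than a method.
rp = ResourceProfileBuilder().require(exec_reqs).require(task_reqs).build

# Attach the profile to an RDD; stages computing it are scheduled on
# executors that satisfy the profile.
rdd = sc.parallelize(range(100), 4).withResources(rp)
print(rdd.getResourceProfile())
```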

Why are the changes needed?

Adds the Python API for this feature.

Does this PR introduce any user-facing change?

Yes. It adds the Java and Python APIs for users to specify a ResourceProfile to use with stage-level scheduling.

How was this patch tested?

Unit tests, plus manual testing on YARN. Tests were also run to verify that it errors properly in standalone and local mode, where it is not yet supported.

@SparkQA

SparkQA commented Mar 31, 2020

Test build #120650 has finished for PR 28085 at commit 2b515c8.

  • This patch fails Python style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Mar 31, 2020

Test build #120651 has finished for PR 28085 at commit a052427.

  • This patch fails Python style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Apr 1, 2020

Test build #120653 has finished for PR 28085 at commit 6a90fbe.

  • This patch fails from timeout after a configured wait of 400m.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tgravescs
Contributor Author

test this please

@SparkQA

SparkQA commented Apr 1, 2020

Test build #120677 has finished for PR 28085 at commit 6a90fbe.

  • This patch fails from timeout after a configured wait of 400m.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tgravescs
Contributor Author

The tests are timing out while running these:
-- Gauges ----------------------------------------------------------------------
master.aliveWorkers
4/1/20 12:13:40 PM =============================================================

-- Gauges ----------------------------------------------------------------------
master.aliveWorkers
4/1/20 12:33:40 PM =============================================================

-- Gauges ----------------------------------------------------------------------
master.aliveWorkers
4/1/20 12:53:40 PM =============================================================

I'm not sure what those tests are from. @dongjoon-hyun would you happen to know?

@dongjoon-hyun
Member

dongjoon-hyun commented Apr 1, 2020

Ur, is this a consistent failure on this PR? Let me check. I have seen the same failure from time to time before, but not consistently.

python/pyspark/rdd.py
@dongjoon-hyun
Member

Retest this please.

@SparkQA

SparkQA commented Apr 2, 2020

Test build #120692 has finished for PR 28085 at commit 6a90fbe.

  • This patch fails from timeout after a configured wait of 400m.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Apr 2, 2020

Test build #120693 has finished for PR 28085 at commit 2d754f7.

  • This patch fails from timeout after a configured wait of 400m.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun
Member

The running Jenkins job still seems to fail.

-- Gauges ----------------------------------------------------------------------
master.aliveWorkers
4/2/20 9:15:05 AM ==============================================================

@dongjoon-hyun
Member

For now, I have no clue, @tgravescs.

@tgravescs
Contributor Author

Do you know which test is outputting that?

@SparkQA

SparkQA commented Apr 2, 2020

Test build #120723 has finished for PR 28085 at commit 1071f40.

  • This patch fails from timeout after a configured wait of 400m.
  • This patch merges cleanly.
  • This patch adds no public classes.

Hit some issues with switching between objects created before
SparkContext and ones created after. Easier to understand this way. Add tests.
@tgravescs
Contributor Author

@HyukjinKwon I believe I addressed all your comments. I made it so the Python classes can be called before SparkContext creation, with the exception of the ResourceProfile.id function.
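
To illustrate that ordering, a small sketch (assuming the profile's id is resolved lazily once a SparkContext exists; the local[2] master and the amounts are placeholders):

```python
from pyspark import SparkContext
from pyspark.resource import (ExecutorResourceRequests, TaskResourceRequests,
                              ResourceProfileBuilder)

# The requests and the builder can all be constructed before any SparkContext.
rp = (ResourceProfileBuilder()
      .require(ExecutorResourceRequests().cores(2).pysparkMemory("1g"))
      .require(TaskResourceRequests().cpus(1))
      .build)

# ResourceProfile.id is the exception: it needs a live SparkContext,
# since profile ids are handed out on the JVM side.
sc = SparkContext(master="local[2]", appName="rp-id-demo")
print(rp.id)
```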

@SparkQA

SparkQA commented Apr 16, 2020

Test build #121377 has finished for PR 28085 at commit a6e9ac2.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tgravescs
Contributor Author

test this please

@SparkQA

SparkQA commented Apr 17, 2020

Test build #121383 has finished for PR 28085 at commit a6e9ac2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Member

That's nice, thanks @tgravescs for addressing my comments.

@HyukjinKwon
Member

Looks pretty fine to me. I will do a final review and leave my sign-off within a few days.

@Ngone51
Member

Ngone51 commented Apr 20, 2020

I am basically OK with the change now. But I'm not good at Python, so it still depends on @HyukjinKwon.

@SparkQA

SparkQA commented Apr 20, 2020

Test build #121535 has finished for PR 28085 at commit 528094c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tgravescs
Contributor Author

Looks like the PySpark resource test ran now. Let me know if I missed anything with that.

Starting test(pypy): pyspark.resource.tests.test_resources
Finished test(pypy): pyspark.profiler (8s)
Starting test(pypy): pyspark.serializers
Finished test(pypy): pyspark.resource.tests.test_resources (6s)

@HyukjinKwon
Member

retest this please

@HyukjinKwon
Member

LGTM. Several nits.

python/pyspark/resource/executorrequests.py
python/pyspark/rdd.py
@SparkQA

SparkQA commented Apr 22, 2020

Test build #121604 has finished for PR 28085 at commit 528094c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Apr 22, 2020

Test build #121624 has finished for PR 28085 at commit 89be02e.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Apr 22, 2020

Test build #121627 has finished for PR 28085 at commit 354fb0c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Member

Merged to master.

Thank you @tgravescs.

agrawaldevesh pushed a commit to agrawaldevesh/spark that referenced this pull request Aug 13, 2020
The DecommissionWorkerSuite started becoming flaky, and it revealed a real regression. Recent PRs (apache#28085 and apache#29211) necessitate a small reworking of the decommissioning logic.

Before getting into that, let me describe the intended behavior of decommissioning:

If a fetch failure happens where the source executor was decommissioned, we want to treat that as an eager signal to clear all shuffle state associated with that executor. In addition if we know that the host was decommissioned, we want to forget about all map statuses from all other executors on that decommissioned host. This is what the test "decommission workers ensure that fetch failures lead to rerun" is trying to test. This invariant is important to ensure that decommissioning a host does not lead to multiple fetch failures that might fail the job.

- Per apache#29211, the executors now eagerly exit on decommissioning and thus the executor is lost before the fetch failure even happens. (I tested this by waiting some seconds before triggering the fetch failure). When an executor is lost, we forget its decommissioning information. The fix is to keep the decommissioning information around for some time after removal with some extra logic to finally purge it after a timeout.

  - Per apache#28085, when the executor is lost, it forgets the shuffle state about just that executor and increments the shuffleFileLostEpoch. This incrementing precludes clearing the state of the entire host when the fetch failure happens. I elected to change only this codepath for the special case of decommissioning, without any other side effects. This whole version-keeping logic is complex and has effectively not been semantically changed since 2013! The fix here is also simple: ignore the shuffleFileLostEpoch when the shuffle status is being cleared due to a fetch failure resulting from host decommission.

These two fixes are local to decommissioning only and don't change other behavior.

I also added some more tests to TaskSchedulerImpl to ensure that the decommissioning information is indeed purged after a timeout.
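
Purely as an illustration of the decision described in the second bullet above (the real logic lives in Spark's Scala scheduler internals; the function and parameter names below are hypothetical):

```python
def should_clear_host_shuffle_state(current_epoch, shuffle_file_lost_epoch,
                                    fetch_failure_from_decommissioned_host):
    """Hypothetical sketch of the epoch check described in the commit message."""
    if fetch_failure_from_decommissioned_host:
        # Host decommission: clear the whole host's shuffle state and skip
        # the shuffleFileLostEpoch check that would normally suppress it.
        return True
    # Normal path: only clear once per epoch bump.
    return current_epoch > shuffle_file_lost_epoch
```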
agrawaldevesh pushed a commit to agrawaldevesh/spark that referenced this pull request Aug 13, 2020
agrawaldevesh pushed a commit to agrawaldevesh/spark that referenced this pull request Aug 14, 2020
agrawaldevesh pushed a commit to agrawaldevesh/spark that referenced this pull request Aug 15, 2020
agrawaldevesh pushed a commit to agrawaldevesh/spark that referenced this pull request Aug 17, 2020